feat(harness): coordinator phase-boundary instrumentation for RFC #2251
Adds structured `rfc2251_phase=...` log lines at the deterministic phase
boundaries inside route_task_to_team and check_task_status, so an
operator running scripts/measure-coordinator-task-bounds.sh against
staging can correlate the harness's external timing trace with what
phase the coordinator was in at any given second.
The harness already exists in staging and measures end-to-end response
time + heartbeat trace. What it CAN'T do without this PR is answer
"the coordinator response took 7 minutes — was it stuck delegating, or
stuck polling children, or stuck synthesizing after all children
returned?" The phase logs answer that question.
Phases instrumented (deterministic Python boundaries, no agent prompt
involvement):
route_start → enter route_task_to_team
children_fetched → after get_children() returns
routing_decided → after build_team_routing_payload
delegate_invoked → just before delegate_task_async.ainvoke
delegate_returned → after delegate_task_async returns
check_status → every check_task_status poll (per-poll)
route_returning_decision_only → fall-through path
Each line includes elapsed_ms from route_start so per-phase durations
are extractable via:
grep rfc2251_phase= <container.log> \
| awk '{...}' to compute deltas between consecutive phases
The synthesis phase (after all children return, before agent emits
final A2A response) is NOT instrumented here because it's
agent-driven (no deterministic Python boundary). The harness operator
infers synthesis_secs = total_response_secs − max(check_status_ts).
This is reproduction-harness scaffolding; it adds zero behavior. Strip
the rfc2251_phase log lines when V1.0 ships and the phase data lands
in the structured heartbeat payload instead.
Refs:
- RFC: molecule-core#2251
- Harness: scripts/measure-coordinator-task-bounds.sh (shipped earlier)
- V1.0 gate: this is deliverable #2 of the four pre-V1.0 gates
This commit is contained in:
parent
daea27641f
commit
5fe52b08e7
@ -515,4 +515,14 @@ async def check_task_status(
|
|||||||
elif delegation.status == DelegationStatus.FAILED:
|
elif delegation.status == DelegationStatus.FAILED:
|
||||||
result["error"] = delegation.error
|
result["error"] = delegation.error
|
||||||
|
|
||||||
|
# RFC #2251 V1.0 reproduction-harness instrumentation. Every poll of
|
||||||
|
# check_task_status emits a phase=check_status line so the harness
|
||||||
|
# operator can tell whether a coordinator stuck for 8 minutes was
|
||||||
|
# polling-children-the-whole-time vs synthesizing-after-children-done.
|
||||||
|
# `grep rfc2251_phase=check_status` in the workspace's container log
|
||||||
|
# gives the polling pattern. Strip when V1.0 ships.
|
||||||
|
logger.info(
|
||||||
|
"rfc2251_phase=check_status task_id=%s peer=%s status=%s",
|
||||||
|
task_id, delegation.workspace_id, delegation.status.value,
|
||||||
|
)
|
||||||
return result
|
return result
|
||||||
|
|||||||
@ -120,23 +120,56 @@ async def route_task_to_team(
|
|||||||
task: The task description to route.
|
task: The task description to route.
|
||||||
preferred_member_id: Optional — directly delegate to this member.
|
preferred_member_id: Optional — directly delegate to this member.
|
||||||
"""
|
"""
|
||||||
|
import time
|
||||||
from builtin_tools.delegation import delegate_task_async as delegate
|
from builtin_tools.delegation import delegate_task_async as delegate
|
||||||
|
|
||||||
|
# RFC #2251 V1.0 reproduction-harness instrumentation. Phase-tagged log
|
||||||
|
# lines correlate with scripts/measure-coordinator-task-bounds.sh's
|
||||||
|
# external timing trace, so an operator running the harness against
|
||||||
|
# staging can answer "what phase was the coordinator in at minute 7?".
|
||||||
|
# `grep rfc2251_phase` on the workspace's container logs is the query.
|
||||||
|
# Strip when V1.0 ships and the phase data lands in the structured
|
||||||
|
# heartbeat payload instead.
|
||||||
|
_phase_t0 = time.monotonic()
|
||||||
|
logger.info(
|
||||||
|
"rfc2251_phase=route_start task_chars=%d preferred_member_id=%s",
|
||||||
|
len(task), preferred_member_id or "none",
|
||||||
|
)
|
||||||
|
|
||||||
children = await get_children()
|
children = await get_children()
|
||||||
|
logger.info(
|
||||||
|
"rfc2251_phase=children_fetched count=%d elapsed_ms=%d",
|
||||||
|
len(children), int((time.monotonic() - _phase_t0) * 1000),
|
||||||
|
)
|
||||||
|
|
||||||
decision = build_team_routing_payload(
|
decision = build_team_routing_payload(
|
||||||
children,
|
children,
|
||||||
task=task,
|
task=task,
|
||||||
preferred_member_id=preferred_member_id,
|
preferred_member_id=preferred_member_id,
|
||||||
)
|
)
|
||||||
|
logger.info(
|
||||||
|
"rfc2251_phase=routing_decided action=%s elapsed_ms=%d",
|
||||||
|
decision.get("action", "unknown"), int((time.monotonic() - _phase_t0) * 1000),
|
||||||
|
)
|
||||||
|
|
||||||
if decision.get("action") == "delegate_to_preferred_member":
|
if decision.get("action") == "delegate_to_preferred_member":
|
||||||
# Async delegation — returns immediately with task_id
|
# Async delegation — returns immediately with task_id
|
||||||
|
target = decision["preferred_member_id"]
|
||||||
|
logger.info(
|
||||||
|
"rfc2251_phase=delegate_invoked target=%s elapsed_ms=%d",
|
||||||
|
target, int((time.monotonic() - _phase_t0) * 1000),
|
||||||
|
)
|
||||||
result = await delegate.ainvoke(
|
result = await delegate.ainvoke(
|
||||||
{
|
{"workspace_id": target, "task": task}
|
||||||
"workspace_id": decision["preferred_member_id"],
|
)
|
||||||
"task": task,
|
logger.info(
|
||||||
}
|
"rfc2251_phase=delegate_returned target=%s task_id=%s elapsed_ms=%d",
|
||||||
|
target, result.get("task_id", "n/a"), int((time.monotonic() - _phase_t0) * 1000),
|
||||||
)
|
)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"rfc2251_phase=route_returning_decision_only elapsed_ms=%d",
|
||||||
|
int((time.monotonic() - _phase_t0) * 1000),
|
||||||
|
)
|
||||||
return decision
|
return decision
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user