From 5fe52b08e7f39fa8c2cc56c74ffecb5c59c34fd0 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 28 Apr 2026 20:11:46 -0700 Subject: [PATCH] feat(harness): coordinator phase-boundary instrumentation for RFC #2251 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds structured `rfc2251_phase=...` log lines at the deterministic phase boundaries inside route_task_to_team and check_task_status, so an operator running scripts/measure-coordinator-task-bounds.sh against staging can correlate the harness's external timing trace with what phase the coordinator was in at any given second. The harness already exists in staging and measures end-to-end response time + heartbeat trace. What it CAN'T do without this PR is answer "the coordinator response took 7 minutes — was it stuck delegating, or stuck polling children, or stuck synthesizing after all children returned?" The phase logs answer that question. Phases instrumented (deterministic Python boundaries, no agent prompt involvement): route_start → enter route_task_to_team children_fetched → after get_children() returns routing_decided → after build_team_routing_payload delegate_invoked → just before delegate_task_async.ainvoke delegate_returned → after delegate_task_async returns check_status → every check_task_status poll (per-poll) route_returning_decision_only → fall-through path Each line includes elapsed_ms from route_start so per-phase durations are extractable via: grep rfc2251_phase= \ | awk '{...}' to compute deltas between consecutive phases The synthesis phase (after all children return, before agent emits final A2A response) is NOT instrumented here because it's agent-driven (no deterministic Python boundary). The harness operator infers synthesis_secs = total_response_secs − max(check_status_ts). This is reproduction-harness scaffolding; it adds zero behavior. Strip the rfc2251_phase log lines when V1.0 ships and the phase data lands in the structured heartbeat payload instead. Refs: - RFC: molecule-core#2251 - Harness: scripts/measure-coordinator-task-bounds.sh (shipped earlier) - V1.0 gate: this is deliverable #2 of the four pre-V1.0 gates --- workspace/builtin_tools/delegation.py | 10 +++++++ workspace/coordinator.py | 41 ++++++++++++++++++++++++--- 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/workspace/builtin_tools/delegation.py b/workspace/builtin_tools/delegation.py index 01e4da00..f4e6ad01 100644 --- a/workspace/builtin_tools/delegation.py +++ b/workspace/builtin_tools/delegation.py @@ -515,4 +515,14 @@ async def check_task_status( elif delegation.status == DelegationStatus.FAILED: result["error"] = delegation.error + # RFC #2251 V1.0 reproduction-harness instrumentation. Every poll of + # check_task_status emits a phase=check_status line so the harness + # operator can tell whether a coordinator stuck for 8 minutes was + # polling-children-the-whole-time vs synthesizing-after-children-done. + # `grep rfc2251_phase=check_status` in the workspace's container log + # gives the polling pattern. Strip when V1.0 ships. + logger.info( + "rfc2251_phase=check_status task_id=%s peer=%s status=%s", + task_id, delegation.workspace_id, delegation.status.value, + ) return result diff --git a/workspace/coordinator.py b/workspace/coordinator.py index 7790262f..954ea2f3 100644 --- a/workspace/coordinator.py +++ b/workspace/coordinator.py @@ -120,23 +120,56 @@ async def route_task_to_team( task: The task description to route. preferred_member_id: Optional — directly delegate to this member. """ + import time from builtin_tools.delegation import delegate_task_async as delegate + # RFC #2251 V1.0 reproduction-harness instrumentation. Phase-tagged log + # lines correlate with scripts/measure-coordinator-task-bounds.sh's + # external timing trace, so an operator running the harness against + # staging can answer "what phase was the coordinator in at minute 7?". + # `grep rfc2251_phase` on the workspace's container logs is the query. + # Strip when V1.0 ships and the phase data lands in the structured + # heartbeat payload instead. + _phase_t0 = time.monotonic() + logger.info( + "rfc2251_phase=route_start task_chars=%d preferred_member_id=%s", + len(task), preferred_member_id or "none", + ) + children = await get_children() + logger.info( + "rfc2251_phase=children_fetched count=%d elapsed_ms=%d", + len(children), int((time.monotonic() - _phase_t0) * 1000), + ) + decision = build_team_routing_payload( children, task=task, preferred_member_id=preferred_member_id, ) + logger.info( + "rfc2251_phase=routing_decided action=%s elapsed_ms=%d", + decision.get("action", "unknown"), int((time.monotonic() - _phase_t0) * 1000), + ) if decision.get("action") == "delegate_to_preferred_member": # Async delegation — returns immediately with task_id + target = decision["preferred_member_id"] + logger.info( + "rfc2251_phase=delegate_invoked target=%s elapsed_ms=%d", + target, int((time.monotonic() - _phase_t0) * 1000), + ) result = await delegate.ainvoke( - { - "workspace_id": decision["preferred_member_id"], - "task": task, - } + {"workspace_id": target, "task": task} + ) + logger.info( + "rfc2251_phase=delegate_returned target=%s task_id=%s elapsed_ms=%d", + target, result.get("task_id", "n/a"), int((time.monotonic() - _phase_t0) * 1000), ) return result + logger.info( + "rfc2251_phase=route_returning_decision_only elapsed_ms=%d", + int((time.monotonic() - _phase_t0) * 1000), + ) return decision