Merge pull request #2255 from Molecule-AI/feat/rfc2251-coordinator-phase-instrumentation

feat(harness): coordinator phase-boundary instrumentation for RFC #2251
This commit is contained in:
Hongming Wang 2026-04-29 03:18:08 +00:00 committed by GitHub
commit 54ea64bb01
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 47 additions and 4 deletions

View File

@ -515,4 +515,14 @@ async def check_task_status(
elif delegation.status == DelegationStatus.FAILED:
result["error"] = delegation.error
# RFC #2251 V1.0 reproduction-harness instrumentation. Every poll of
# check_task_status emits a phase=check_status line so the harness
# operator can tell whether a coordinator stuck for 8 minutes was
# polling-children-the-whole-time vs synthesizing-after-children-done.
# `grep rfc2251_phase=check_status` in the workspace's container log
# gives the polling pattern. Strip when V1.0 ships.
logger.info(
"rfc2251_phase=check_status task_id=%s peer=%s status=%s",
task_id, delegation.workspace_id, delegation.status.value,
)
return result

View File

@ -120,23 +120,56 @@ async def route_task_to_team(
task: The task description to route.
preferred_member_id: Optional directly delegate to this member.
"""
import time
from builtin_tools.delegation import delegate_task_async as delegate
# RFC #2251 V1.0 reproduction-harness instrumentation. Phase-tagged log
# lines correlate with scripts/measure-coordinator-task-bounds.sh's
# external timing trace, so an operator running the harness against
# staging can answer "what phase was the coordinator in at minute 7?".
# `grep rfc2251_phase` on the workspace's container logs is the query.
# Strip when V1.0 ships and the phase data lands in the structured
# heartbeat payload instead.
_phase_t0 = time.monotonic()
logger.info(
"rfc2251_phase=route_start task_chars=%d preferred_member_id=%s",
len(task), preferred_member_id or "none",
)
children = await get_children()
logger.info(
"rfc2251_phase=children_fetched count=%d elapsed_ms=%d",
len(children), int((time.monotonic() - _phase_t0) * 1000),
)
decision = build_team_routing_payload(
children,
task=task,
preferred_member_id=preferred_member_id,
)
logger.info(
"rfc2251_phase=routing_decided action=%s elapsed_ms=%d",
decision.get("action", "unknown"), int((time.monotonic() - _phase_t0) * 1000),
)
if decision.get("action") == "delegate_to_preferred_member":
# Async delegation — returns immediately with task_id
target = decision["preferred_member_id"]
logger.info(
"rfc2251_phase=delegate_invoked target=%s elapsed_ms=%d",
target, int((time.monotonic() - _phase_t0) * 1000),
)
result = await delegate.ainvoke(
{
"workspace_id": decision["preferred_member_id"],
"task": task,
}
{"workspace_id": target, "task": task}
)
logger.info(
"rfc2251_phase=delegate_returned target=%s task_id=%s elapsed_ms=%d",
target, result.get("task_id", "n/a"), int((time.monotonic() - _phase_t0) * 1000),
)
return result
logger.info(
"rfc2251_phase=route_returning_decision_only elapsed_ms=%d",
int((time.monotonic() - _phase_t0) * 1000),
)
return decision