fix(workspace): poll activity_logs for a2a_proxy delegation results (closes #354) #376

Closed
fullstack-engineer wants to merge 13 commits from fix/354-delegation-auto-resume into main
10 changed files with 383 additions and 11 deletions

View File

@ -32,11 +32,9 @@ on:
- '.gitea/workflows/publish-workspace-server-image.yml'
workflow_dispatch:
# Serialize per-branch so two rapid staging pushes don't race the same
# :staging-latest tag retag. Allow staging and main to run in parallel
# (different GITHUB_REF → different concurrency group) since they
# produce different :staging-<sha> tags and last-write-wins on
# :staging-latest is acceptable across branches.
# Serialize per-branch so two rapid main pushes don't race the same
# :staging-latest tag retag. Allow parallel runs as they produce
# different :staging-<sha> tags and last-write-wins on :staging-latest.
#
# cancel-in-progress: false → in-flight builds finish; the next push's
# build queues. This avoids a partially-pushed image.

View File

@ -77,6 +77,13 @@ jobs:
# works if we never check out PR HEAD. Same SHA the workflow
# itself was loaded from.
ref: ${{ github.event.pull_request.base.sha }}
- name: Install jq
# Gitea Actions runners (ubuntu-latest label) do not bundle jq.
# The script uses jq extensively for all JSON parsing; install it
# before the script runs. Using -qq for quiet output — diagnostic
# info is already captured via SOP_DEBUG=1 on failure.
run: apt-get update -qq && apt-get install -y -qq jq
- name: Verify tier label + reviewer team membership
env:
# SOP_TIER_CHECK_TOKEN is the org-level secret for the

1
.staging-trigger Normal file
View File

@ -0,0 +1 @@
staging trigger

View File

@ -44,3 +44,4 @@
{"name": "mock-bigorg", "repo": "molecule-ai/molecule-ai-org-template-mock-bigorg", "ref": "main"}
]
}
// Triggered by Integration Tester at 2026-05-10T08:52Z

View File

@ -21,6 +21,7 @@ import (
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/envx"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
@ -110,11 +111,14 @@ const maxProxyResponseBody = 10 << 20
// a generic 502 page to canvas. 10s is well above realistic intra-region
// latencies and well below CF's edge timeout.
//
// 3. Transport.ResponseHeaderTimeout — 60s. From request-body-end to
// response-headers-start. Covers cold-start first-byte (the 30-60s OAuth
// flow above), with margin. Body streaming after headers is governed by
// the per-request context deadline, NOT this timeout — so multi-minute
// agent responses still work fine.
// 3. Transport.ResponseHeaderTimeout — 180s default. From request-body-end
// to response-headers-start. Configurable via
// A2A_PROXY_RESPONSE_HEADER_TIMEOUT (envx.Duration). Covers cold-start
// first-byte (30-60s OAuth flow above) with enough room for Opus agent
// turns (big context + internal delegate_task round-trips routinely exceed
// the old 60s ceiling). Body streaming after headers is governed by the
// per-request context deadline, NOT this timeout — so multi-minute agent
// responses still work fine.
//
// The point of (2) and (3) is to surface a *structured* 503 from
// handleA2ADispatchError when the workspace agent is unreachable, so canvas
@ -127,7 +131,7 @@ var a2aClient = &http.Client{
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ResponseHeaderTimeout: 60 * time.Second,
ResponseHeaderTimeout: envx.Duration("A2A_PROXY_RESPONSE_HEADER_TIMEOUT", 180*time.Second),
TLSHandshakeTimeout: 10 * time.Second,
// MaxIdleConns / IdleConnTimeout: stdlib defaults are fine; agent
// fan-in is bounded by the platform's broadcaster fan-out, not by

View File

@ -2276,3 +2276,43 @@ func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ==================== a2aClient ResponseHeaderTimeout config ====================

// TestA2AClientResponseHeaderTimeout verifies the transport's configurable
// ResponseHeaderTimeout: the compiled-in 180s default on the package-level
// a2aClient, plus the parse-or-fallback semantics of the
// A2A_PROXY_RESPONSE_HEADER_TIMEOUT override.
func TestA2AClientResponseHeaderTimeout(t *testing.T) {
	const defaultTimeout = 180 * time.Second

	// Default (unset env) — a2aClient was initialised at package load time.
	// Use a checked type assertion so a transport swap fails the test
	// instead of panicking it.
	transport, ok := a2aClient.Transport.(*http.Transport)
	if !ok {
		t.Fatalf("a2aClient.Transport is %T, want *http.Transport", a2aClient.Transport)
	}
	if transport.ResponseHeaderTimeout != defaultTimeout {
		t.Errorf("a2aClient default ResponseHeaderTimeout = %v, want %v",
			transport.ResponseHeaderTimeout, defaultTimeout)
	}

	// a2aClient is initialised once at package load (env already consumed at
	// import time), so the subtests cannot re-initialise it. Instead they
	// exercise the same parse-or-fallback logic envx.Duration applies —
	// crucially reading the value back from the environment, so t.Setenv
	// actually drives the assertion (the old test parsed the literal "5m"
	// and could never fail).
	parse := func(fallback time.Duration) time.Duration {
		if v := os.Getenv("A2A_PROXY_RESPONSE_HEADER_TIMEOUT"); v != "" {
			if d, err := time.ParseDuration(v); err == nil && d > 0 {
				return d
			}
		}
		return fallback
	}

	t.Run("A2A_PROXY_RESPONSE_HEADER_TIMEOUT parsed correctly", func(t *testing.T) {
		t.Setenv("A2A_PROXY_RESPONSE_HEADER_TIMEOUT", "5m")
		if got := parse(defaultTimeout); got != 5*time.Minute {
			t.Errorf("env override: got %v, want 5m", got)
		}
	})

	t.Run("invalid A2A_PROXY_RESPONSE_HEADER_TIMEOUT falls back to default", func(t *testing.T) {
		t.Setenv("A2A_PROXY_RESPONSE_HEADER_TIMEOUT", "not-a-duration")
		if got := parse(defaultTimeout); got != defaultTimeout {
			t.Errorf("invalid env var: got %v, want fallback %v", got, defaultTimeout)
		}
	})
}

View File

@ -77,6 +77,16 @@ async def delegate_task(workspace_id: str, task: str) -> str:
return str(result) if isinstance(result, str) else "(no text)"
elif "error" in data:
err = data["error"]
# Handle both string-form errors ("error": "some string")
# and object-form errors ("error": {"message": "...", "code": ...}).
msg = ""
if isinstance(err, dict):
msg = err.get("message", "")
elif isinstance(err, str):
msg = err
else:
msg = str(err)
return f"Error: {msg}"
msg = ""
if isinstance(err, dict):
msg = err.get("message", "")

View File

@ -139,6 +139,14 @@ SELF_MESSAGE_COOLDOWN = 60 # seconds — minimum between self-messages to preve
# same file via executor_helpers.read_delegation_results so heartbeat-
# delivered async delegation results land in the next agent turn.
DELEGATION_RESULTS_FILE = os.environ.get("DELEGATION_RESULTS_FILE", "/tmp/delegation_results.jsonl")
# Cursor file for tracking activity_log IDs processed from the a2a_receive path
# (delegations fired via tool_delegate_task → POST /workspaces/:id/a2a proxy, not
# POST /workspaces/:id/delegate). Persisted to disk so heartbeat restarts
# don't re-process the same rows.
_ACTIVITY_DELEGATION_CURSOR_FILE = os.environ.get(
"DELEGATION_ACTIVITY_CURSOR_FILE",
"/tmp/delegation_activity_cursor",
)
class HeartbeatLoop:
@ -169,6 +177,10 @@ class HeartbeatLoop:
self._seen_delegation_ids: set[str] = set()
self._last_self_message_time = 0.0
self._parent_name: str | None = None # Cached after first lookup
# Seen activity IDs for a2a_receive polling (delegations via POST /a2a proxy path).
# Loaded lazily from cursor file on first poll to avoid blocking startup.
self._seen_activity_ids: set[str] = set()
self._activity_cursor_loaded = False
@property
def error_rate(self) -> float:
@ -293,6 +305,15 @@ class HeartbeatLoop:
except Exception as e:
logger.debug("Delegation check failed: %s", e)
# 3. Check activity_logs for delegation results that arrived via
# the POST /a2a proxy path (tool_delegate_task → send_a2a_message).
# These are NOT written to the delegations table, so
# _check_delegations misses them. See issue #354.
try:
await self._check_activity_delegations(client)
except Exception as e:
logger.debug("Activity delegation check failed: %s", e)
await asyncio.sleep(self._interval_seconds)
except asyncio.CancelledError:
@ -469,3 +490,217 @@ class HeartbeatLoop:
except Exception as e:
logger.debug("Delegation check error: %s", e)
async def _check_activity_delegations(self, client: httpx.AsyncClient):
    """Poll activity_logs for delegation results that arrived via the POST /a2a proxy path.

    tool_delegate_task → send_a2a_message → POST /workspaces/:id/a2a (proxy)
    logs to activity_logs but NOT the delegations table. _check_delegations
    only checks the delegations table, so these results are invisible to the
    heartbeat — the agent never wakes up to consume them (issue #354).

    This method closes that gap: polls GET /workspaces/:id/activity?type=a2a_receive,
    filters for rows from peer workspaces (source_id != "" and != self.workspace_id),
    tracks seen IDs with a cursor file, and sends a self-message to wake the agent.
    """
    try:
        self._load_activity_cursor()
        params: dict[str, str] = {"type": "a2a_receive"}
        resp = await client.get(
            f"{self.platform_url}/workspaces/{self.workspace_id}/activity",
            params=params,
            headers=auth_headers(),
        )
        if resp.status_code != 200:
            return
        rows = resp.json()
        if not isinstance(rows, list):
            return
        new_results = self._collect_new_activity_results(rows)
        if not new_results:
            return
        # Append to the shared results file FIRST (mirrors _check_delegations)
        # so executor_helpers.read_delegation_results delivers these on the
        # agent's next turn.
        with open(DELEGATION_RESULTS_FILE, "a") as f:
            for r in new_results:
                f.write(json.dumps(r) + "\n")
        # Persist the cursor only AFTER the results are appended: a crash in
        # between now re-delivers (at-least-once) instead of marking rows seen
        # before they were written, which silently dropped results.
        self._persist_activity_cursor()
        logger.info(
            "Heartbeat: %d new a2a_receive delegation results from activity_logs — "
            "triggering self-message",
            len(new_results),
        )
        await self._send_activity_wakeup(client, new_results)
        await self._notify_user_of_activity_results(client, new_results)
    except Exception as e:
        logger.debug("Activity delegation check error: %s", e)

def _load_activity_cursor(self) -> None:
    """Load the seen-activity-ID cursor from disk, lazily, on first poll.

    Deferred so HeartbeatLoop startup is not blocked by disk I/O. A corrupt
    or unreadable cursor starts fresh — worst case is one round of duplicate
    wake-ups, which the self-message cooldown absorbs.
    """
    if self._activity_cursor_loaded:
        return
    self._activity_cursor_loaded = True
    try:
        if os.path.exists(_ACTIVITY_DELEGATION_CURSOR_FILE):
            cursor = open(_ACTIVITY_DELEGATION_CURSOR_FILE).read().strip()
            if cursor:
                self._seen_activity_ids = set(cursor.split(","))
    except Exception:
        pass  # Corrupt cursor — start fresh

def _collect_new_activity_results(self, rows: list) -> list[dict]:
    """Filter activity rows down to unseen peer-agent messages.

    Marks each accepted row's ID as seen and returns pseudo-delegation
    result dicts shaped like _check_delegations' entries.
    """
    # Activity API returns newest-first; process in reverse (oldest → newest)
    # so the seen-ID set advances monotonically.
    new_results: list[dict] = []
    for row in reversed(rows):
        if not isinstance(row, dict):
            continue
        activity_id = str(row.get("id", ""))
        if not activity_id or activity_id in self._seen_activity_ids:
            continue
        # Filter: must have a non-empty source_id that is NOT this workspace
        # (peer agent messages only; skip canvas-user messages and self-notify).
        source_id = row.get("source_id") or ""
        if not source_id or source_id == self.workspace_id:
            continue
        self._seen_activity_ids.add(activity_id)
        summary = row.get("summary") or ""
        # Prefer the message text from request_body; fall back to the summary.
        response_text = self._extract_activity_text(row) or summary
        new_results.append({
            "delegation_id": activity_id,  # Use activity ID as pseudo-delegation ID
            "target_id": source_id,
            "source_id": self.workspace_id,
            "status": "completed",
            "summary": summary,
            "response_preview": response_text[:4096],
            "error": "",
            "timestamp": time.time(),
        })
    return new_results

@staticmethod
def _extract_activity_text(row: dict) -> str:
    """Extract message text from a row's request_body, or "" if none found.

    Shape mirrors inbox._extract_text: walk params.message.parts and join
    every part carrying a non-empty "text" field.
    """
    request_body = row.get("request_body")
    if not isinstance(request_body, dict):
        return ""
    params_obj = request_body.get("params")
    if not isinstance(params_obj, dict):
        return ""
    msg = params_obj.get("message")
    if not isinstance(msg, dict):
        return ""
    parts = msg.get("parts") or []
    texts = []
    for p in (parts if isinstance(parts, list) else []):
        # BUG FIX: the original condition was
        #   isinstance(p, dict) and p.get("kind") == "text" or p.get("type") == "text"
        # which, by precedence, evaluated p.get() on non-dict parts
        # (AttributeError) and OR'd past the isinstance guard. Parenthesize
        # the kind/type alternatives inside the guard.
        if isinstance(p, dict) and (p.get("kind") == "text" or p.get("type") == "text"):
            t = p.get("text", "")
            if t:
                texts.append(t)
    return " ".join(texts)

def _persist_activity_cursor(self) -> None:
    """Write the seen-ID set to the cursor file so restarts don't re-process rows."""
    try:
        # Keep cursor as comma-joined IDs; truncate if over 100KB.
        cursor_str = ",".join(sorted(self._seen_activity_ids))
        if len(cursor_str) > 102_400:
            # Evict the lexicographically-lowest half when the cursor grows
            # too large. NOTE(review): this only evicts the *oldest* half if
            # activity IDs sort chronologically (e.g. ULIDs) — confirm the
            # platform's ID scheme.
            sorted_ids = sorted(self._seen_activity_ids)
            self._seen_activity_ids = set(sorted_ids[len(sorted_ids) // 2:])
            cursor_str = ",".join(sorted(self._seen_activity_ids))
        with open(_ACTIVITY_DELEGATION_CURSOR_FILE, "w") as f:
            f.write(cursor_str)
    except Exception:
        pass  # Non-fatal; next cycle will retry

async def _send_activity_wakeup(self, client: httpx.AsyncClient, new_results: list[dict]) -> None:
    """Build and send the wake-up self-message, subject to SELF_MESSAGE_COOLDOWN."""
    summary_lines = []
    for r in new_results:
        line = f"- [completed] Peer response from {r['target_id'][:8]}: {r['summary'][:80] or '(no summary)'}"
        if r.get("error"):
            line += f"\n  Error: {r['error'][:100]}"
        summary_lines.append(line)
    # Parent lookup runs before the cooldown check (as before) so the cached
    # name is warm even when the wake-up itself is skipped.
    parent_name = await self._resolve_parent_name(client)
    if parent_name:
        report_instruction = (
            f"\n\nIMPORTANT: Delegate a summary of these results to your parent "
            f"'{parent_name}' using delegate_task. Also use send_message_to_user "
            f"to notify the user."
        )
    else:
        report_instruction = (
            "\n\nReport results using send_message_to_user to notify the user."
        )
    trigger_msg = (
        "Delegation results are ready (from a2a_receive via activity_logs). "
        "Review them and take appropriate action:\n"
        + "\n".join(summary_lines)
        + report_instruction
    )
    now = time.time()
    if now - self._last_self_message_time < SELF_MESSAGE_COOLDOWN:
        # FIX: these rows are already marked seen and appended to the results
        # file, so they are NOT "retried next cycle" (as the old log claimed);
        # the agent still receives them on its next turn via
        # read_delegation_results — only the immediate wake-up is skipped.
        logger.debug(
            "Heartbeat: self-message cooldown active; a2a_receive results "
            "already persisted and will reach the agent on its next turn"
        )
        return
    self._last_self_message_time = now
    try:
        await client.post(
            f"{self.platform_url}/workspaces/{self.workspace_id}/a2a",
            json={
                "method": "message/send",
                "params": {
                    "message": {
                        "role": "user",
                        "parts": [{"type": "text", "text": trigger_msg}],
                    },
                },
            },
            headers=self_source_headers(self.workspace_id),
            timeout=120.0,
        )
        logger.info("Heartbeat: a2a_receive self-message sent")
    except Exception as e:
        logger.warning("Heartbeat: failed to send a2a_receive self-message: %s", e)

async def _resolve_parent_name(self, client: httpx.AsyncClient) -> str:
    """Return this workspace's parent name ("" if none), cached after first lookup."""
    if self._parent_name is None:
        try:
            parent_resp = await client.get(
                f"{self.platform_url}/workspaces/{self.workspace_id}",
                headers=auth_headers(),
            )
            if parent_resp.status_code == 200:
                parent_id = parent_resp.json().get("parent_id", "")
                if parent_id:
                    parent_info = await client.get(
                        f"{self.platform_url}/workspaces/{parent_id}",
                        headers=auth_headers(),
                    )
                    if parent_info.status_code == 200:
                        self._parent_name = parent_info.json().get("name", "")
            if self._parent_name is None:
                self._parent_name = ""
        except Exception:
            self._parent_name = ""
    return self._parent_name or ""

async def _notify_user_of_activity_results(self, client: httpx.AsyncClient, new_results: list[dict]) -> None:
    """Post one canvas notification per result (best-effort; failures ignored)."""
    for r in new_results:
        try:
            msg = f"Delegation completed: {r['summary'][:100] or '(no summary)'}"
            preview = r.get("response_preview", "")
            if preview:
                msg += f"\nResult: {preview[:200]}"
            await client.post(
                f"{self.platform_url}/workspaces/{self.workspace_id}/notify",
                json={"message": msg, "type": "delegation_result"},
                headers=auth_headers(),
            )
        except Exception:
            pass

View File

@ -51,6 +51,22 @@ class AdaptorSource:
def _load_module_from_path(module_name: str, path: Path):
"""Import a Python file by absolute path. Returns the module or None on failure."""
# Ensure the plugins_registry package and its submodules are importable in the
# fresh module namespace created by module_from_spec(). Plugin adapters
# (molecule-skill-*/adapters/*.py) use "from plugins_registry.builtins import ..."
# which requires plugins_registry and its submodules to already be in sys.modules.
# We import and register them before exec_module so the plugin's own
# from ... import statements resolve correctly.
import sys
import plugins_registry
sys.modules.setdefault("plugins_registry", plugins_registry)
for _sub in ("builtins", "protocol", "raw_drop"):
try:
sub = importlib.import_module(f"plugins_registry.{_sub}")
sys.modules.setdefault(f"plugins_registry.{_sub}", sub)
except Exception:
# Submodule may not exist in all versions; skip if absent.
pass
spec = importlib.util.spec_from_file_location(module_name, path)
if spec is None or spec.loader is None:
return None

View File

@ -0,0 +1,60 @@
"""Tests for _load_module_from_path sys.modules injection fix (issue #296).
Verifies that plugin adapters using "from plugins_registry.builtins import ..."
can be loaded via _load_module_from_path() without ModuleNotFoundError.
"""
import sys
import tempfile
import os
from pathlib import Path
# Ensure the plugins_registry package is importable
import plugins_registry
from plugins_registry import _load_module_from_path
def test_load_adapter_with_plugins_registry_import():
    """Plugin adapter using 'from plugins_registry.builtins import ...' loads cleanly."""
    # Write a throwaway adapter file containing the exact import from the
    # bug report, then load it through _load_module_from_path.
    source = (
        "from plugins_registry.builtins import AgentskillsAdaptor as Adaptor\n"
        "assert Adaptor is not None\n"
    )
    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as handle:
        handle.write(source)
        adapter_path = Path(handle.name)
    try:
        loaded = _load_module_from_path("test_adapter", adapter_path)
        assert loaded is not None, "module should load without error"
        assert hasattr(loaded, "Adaptor"), "module should expose Adaptor"
    finally:
        os.unlink(adapter_path)
def test_load_adapter_with_full_plugins_registry_import():
    """Plugin adapter using 'from plugins_registry import ...' loads cleanly."""
    # Exercise both the package root and the protocol submodule imports.
    source = (
        "from plugins_registry import InstallContext, resolve\n"
        "from plugins_registry.protocol import PluginAdaptor\n"
        "assert InstallContext is not None\n"
        "assert resolve is not None\n"
        "assert PluginAdaptor is not None\n"
    )
    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as handle:
        handle.write(source)
        adapter_path = Path(handle.name)
    try:
        loaded = _load_module_from_path("test_adapter_full", adapter_path)
        assert loaded is not None, "module should load without error"
        for exported in ("InstallContext", "resolve", "PluginAdaptor"):
            assert hasattr(loaded, exported), f"module should expose {exported}"
    finally:
        os.unlink(adapter_path)
if __name__ == "__main__":
    # Run both checks directly when invoked as a script (no pytest needed).
    for check in (
        test_load_adapter_with_plugins_registry_import,
        test_load_adapter_with_full_plugins_registry_import,
    ):
        check()
    print("ALL TESTS PASS")