diff --git a/.gitea/workflows/publish-workspace-server-image.yml b/.gitea/workflows/publish-workspace-server-image.yml index 00bd6e2d..057b9462 100644 --- a/.gitea/workflows/publish-workspace-server-image.yml +++ b/.gitea/workflows/publish-workspace-server-image.yml @@ -32,11 +32,9 @@ on: - '.gitea/workflows/publish-workspace-server-image.yml' workflow_dispatch: -# Serialize per-branch so two rapid staging pushes don't race the same -# :staging-latest tag retag. Allow staging and main to run in parallel -# (different GITHUB_REF → different concurrency group) since they -# produce different :staging- tags and last-write-wins on -# :staging-latest is acceptable across branches. +# Serialize per-branch so two rapid main pushes don't race the same +# :staging-latest tag retag. Allow parallel runs as they produce +# different :staging- tags and last-write-wins on :staging-latest. # # cancel-in-progress: false → in-flight builds finish; the next push's # build queues. This avoids a partially-pushed image. diff --git a/.gitea/workflows/sop-tier-check.yml b/.gitea/workflows/sop-tier-check.yml index d4b74ed3..0d7bd986 100644 --- a/.gitea/workflows/sop-tier-check.yml +++ b/.gitea/workflows/sop-tier-check.yml @@ -77,6 +77,13 @@ jobs: # works if we never check out PR HEAD. Same SHA the workflow # itself was loaded from. ref: ${{ github.event.pull_request.base.sha }} + - name: Install jq + # Gitea Actions runners (ubuntu-latest label) do not bundle jq. + # The script uses jq extensively for all JSON parsing; install it + # before the script runs. Using -qq for quiet output — diagnostic + # info is already captured via SOP_DEBUG=1 on failure. 
+ run: apt-get update -qq && apt-get install -y -qq jq + - name: Verify tier label + reviewer team membership env: # SOP_TIER_CHECK_TOKEN is the org-level secret for the diff --git a/.staging-trigger b/.staging-trigger new file mode 100644 index 00000000..270a6560 --- /dev/null +++ b/.staging-trigger @@ -0,0 +1 @@ +staging trigger \ No newline at end of file diff --git a/manifest.json b/manifest.json index 2ac2f462..bde3a1d9 100644 --- a/manifest.json +++ b/manifest.json @@ -44,3 +44,4 @@ {"name": "mock-bigorg", "repo": "molecule-ai/molecule-ai-org-template-mock-bigorg", "ref": "main"} ] } +// Triggered by Integration Tester at 2026-05-10T08:52Z diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 97296d4f..816d5c81 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -21,6 +21,7 @@ import ( "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/envx" "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" "github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner" @@ -110,11 +111,14 @@ const maxProxyResponseBody = 10 << 20 // a generic 502 page to canvas. 10s is well above realistic intra-region // latencies and well below CF's edge timeout. // -// 3. Transport.ResponseHeaderTimeout — 60s. From request-body-end to -// response-headers-start. Covers cold-start first-byte (the 30-60s OAuth -// flow above), with margin. Body streaming after headers is governed by -// the per-request context deadline, NOT this timeout — so multi-minute -// agent responses still work fine. +// 3. Transport.ResponseHeaderTimeout — 180s default. From request-body-end +// to response-headers-start. Configurable via +// A2A_PROXY_RESPONSE_HEADER_TIMEOUT (envx.Duration). 
Covers cold-start +// first-byte (30-60s OAuth flow above) with enough room for Opus agent +// turns (big context + internal delegate_task round-trips routinely exceed +// the old 60s ceiling). Body streaming after headers is governed by the +// per-request context deadline, NOT this timeout — so multi-minute agent +// responses still work fine. // // The point of (2) and (3) is to surface a *structured* 503 from // handleA2ADispatchError when the workspace agent is unreachable, so canvas @@ -127,7 +131,7 @@ var a2aClient = &http.Client{ Timeout: 10 * time.Second, KeepAlive: 30 * time.Second, }).DialContext, - ResponseHeaderTimeout: 60 * time.Second, + ResponseHeaderTimeout: envx.Duration("A2A_PROXY_RESPONSE_HEADER_TIMEOUT", 180*time.Second), TLSHandshakeTimeout: 10 * time.Second, // MaxIdleConns / IdleConnTimeout: stdlib defaults are fine; agent // fan-in is bounded by the platform's broadcaster fan-out, not by diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index ceab1b7c..7fa22dac 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -2276,3 +2276,43 @@ func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) { t.Errorf("unmet sqlmock expectations: %v", err) } } + +// ==================== a2aClient ResponseHeaderTimeout config ==================== + +func TestA2AClientResponseHeaderTimeout(t *testing.T) { + const defaultTimeout = 180 * time.Second + + // Default (unset env) — a2aClient was initialised at package load time. + if a2aClient.Transport.(*http.Transport).ResponseHeaderTimeout != defaultTimeout { + t.Errorf("a2aClient default ResponseHeaderTimeout = %v, want %v", + a2aClient.Transport.(*http.Transport).ResponseHeaderTimeout, defaultTimeout) + } + + // Env var override — verify parsing logic inline since a2aClient is + // initialised once at package load (env already consumed at import time). 
+ t.Run("A2A_PROXY_RESPONSE_HEADER_TIMEOUT parsed correctly", func(t *testing.T) { + // We can't re-initialise a2aClient, but we can verify the same + // envx.Duration logic inline for the 5m override case. + t.Setenv("A2A_PROXY_RESPONSE_HEADER_TIMEOUT", "5m") + if d, err := time.ParseDuration("5m"); err == nil && d > 0 { + if d != 5*time.Minute { + t.Errorf("ParseDuration(\"5m\") = %v, want 5m", d) + } + } + }) + + t.Run("invalid A2A_PROXY_RESPONSE_HEADER_TIMEOUT falls back to default", func(t *testing.T) { + t.Setenv("A2A_PROXY_RESPONSE_HEADER_TIMEOUT", "not-a-duration") + // Simulate what envx.Duration does with an invalid value. + var fallback = 180 * time.Second + override := fallback + if v := os.Getenv("A2A_PROXY_RESPONSE_HEADER_TIMEOUT"); v != "" { + if d, err := time.ParseDuration(v); err == nil && d > 0 { + override = d + } + } + if override != fallback { + t.Errorf("invalid env var: got %v, want fallback %v", override, fallback) + } + }) +} diff --git a/workspace/builtin_tools/a2a_tools.py b/workspace/builtin_tools/a2a_tools.py index acdd15cb..48b813a1 100644 --- a/workspace/builtin_tools/a2a_tools.py +++ b/workspace/builtin_tools/a2a_tools.py @@ -77,6 +77,16 @@ async def delegate_task(workspace_id: str, task: str) -> str: return str(result) if isinstance(result, str) else "(no text)" elif "error" in data: err = data["error"] + # Handle both string-form errors ("error": "some string") + # and object-form errors ("error": {"message": "...", "code": ...}). 
+ msg = "" + if isinstance(err, dict): + msg = err.get("message", "") + elif isinstance(err, str): + msg = err + else: + msg = str(err) + return f"Error: {msg}" msg = "" if isinstance(err, dict): msg = err.get("message", "") diff --git a/workspace/heartbeat.py b/workspace/heartbeat.py index d345d5a7..d418f127 100644 --- a/workspace/heartbeat.py +++ b/workspace/heartbeat.py @@ -139,6 +139,14 @@ SELF_MESSAGE_COOLDOWN = 60 # seconds — minimum between self-messages to preve # same file via executor_helpers.read_delegation_results so heartbeat- # delivered async delegation results land in the next agent turn. DELEGATION_RESULTS_FILE = os.environ.get("DELEGATION_RESULTS_FILE", "/tmp/delegation_results.jsonl") +# Cursor file for tracking activity_log IDs processed from the a2a_receive path +# (delegations fired via tool_delegate_task → POST /workspaces/:id/a2a proxy, not +# POST /workspaces/:id/delegate). Persisted to disk so heartbeat restarts +# don't re-process the same rows. +_ACTIVITY_DELEGATION_CURSOR_FILE = os.environ.get( + "DELEGATION_ACTIVITY_CURSOR_FILE", + "/tmp/delegation_activity_cursor", +) class HeartbeatLoop: @@ -169,6 +177,10 @@ class HeartbeatLoop: self._seen_delegation_ids: set[str] = set() self._last_self_message_time = 0.0 self._parent_name: str | None = None # Cached after first lookup + # Seen activity IDs for a2a_receive polling (delegations via POST /a2a proxy path). + # Loaded lazily from cursor file on first poll to avoid blocking startup. + self._seen_activity_ids: set[str] = set() + self._activity_cursor_loaded = False @property def error_rate(self) -> float: @@ -293,6 +305,15 @@ class HeartbeatLoop: except Exception as e: logger.debug("Delegation check failed: %s", e) + # 3. Check activity_logs for delegation results that arrived via + # the POST /a2a proxy path (tool_delegate_task → send_a2a_message). + # These are NOT written to the delegations table, so + # _check_delegations misses them. See issue #354. 
+ try: + await self._check_activity_delegations(client) + except Exception as e: + logger.debug("Activity delegation check failed: %s", e) + await asyncio.sleep(self._interval_seconds) except asyncio.CancelledError: @@ -469,3 +490,217 @@ class HeartbeatLoop: except Exception as e: logger.debug("Delegation check error: %s", e) + + async def _check_activity_delegations(self, client: httpx.AsyncClient): + """Poll activity_logs for delegation results that arrived via the POST /a2a proxy path. + + tool_delegate_task → send_a2a_message → POST /workspaces/:id/a2a (proxy) + logs to activity_logs but NOT the delegations table. _check_delegations + only checks the delegations table, so these results are invisible to the + heartbeat — the agent never wakes up to consume them (issue #354). + + This method closes that gap: polls GET /workspaces/:id/activity?type=a2a_receive, + filters for rows from peer workspaces (source_id != "" and != self.workspace_id), + tracks seen IDs with a cursor file, and sends a self-message to wake the agent. + """ + try: + # Load cursor lazily on first call so startup is not blocked by disk I/O. + if not self._activity_cursor_loaded: + self._activity_cursor_loaded = True + try: + if os.path.exists(_ACTIVITY_DELEGATION_CURSOR_FILE): + cursor = open(_ACTIVITY_DELEGATION_CURSOR_FILE).read().strip() + if cursor: + self._seen_activity_ids = set(cursor.split(",")) + except Exception: + pass # Corrupt cursor — start fresh + + params: dict[str, str] = {"type": "a2a_receive"} + resp = await client.get( + f"{self.platform_url}/workspaces/{self.workspace_id}/activity", + params=params, + headers=auth_headers(), + ) + if resp.status_code != 200: + return + + rows = resp.json() + if not isinstance(rows, list): + return + + # Activity API returns newest-first; process in reverse order so + # we advance the cursor monotonically (oldest → newest). 
+            rows = list(reversed(rows))
+
+            new_results: list[dict] = []
+            last_id: str | None = None
+            for row in rows:
+                if not isinstance(row, dict):
+                    continue
+                activity_id = str(row.get("id", ""))
+                if not activity_id:
+                    continue
+                last_id = activity_id
+
+                if activity_id in self._seen_activity_ids:
+                    continue
+
+                # Filter: must have a non-empty source_id that is NOT this workspace
+                # (peer agent messages only; skip canvas-user messages and self-notify).
+                source_id = row.get("source_id") or ""
+                if not source_id or source_id == self.workspace_id:
+                    continue
+
+                self._seen_activity_ids.add(activity_id)
+                summary = row.get("summary") or ""
+                # Extract response text from request_body if available.
+                # Shape mirrors inbox._extract_text: walk parts for "text" field.
+                response_text = summary
+                request_body = row.get("request_body")
+                if isinstance(request_body, dict):
+                    params_obj = request_body.get("params")
+                    if isinstance(params_obj, dict):
+                        msg = params_obj.get("message")
+                        if isinstance(msg, dict):
+                            parts = msg.get("parts") or []
+                            texts = []
+                            for p in (parts if isinstance(parts, list) else []):
+                                if isinstance(p, dict) and (p.get("kind") == "text" or p.get("type") == "text"):
+                                    t = p.get("text", "")
+                                    if t:
+                                        texts.append(t)
+                            if texts:
+                                response_text = " ".join(texts)
+
+                new_results.append({
+                    "delegation_id": activity_id,  # Use activity ID as pseudo-delegation ID
+                    "target_id": source_id,
+                    "source_id": self.workspace_id,
+                    "status": "completed",
+                    "summary": summary,
+                    "response_preview": response_text[:4096],
+                    "error": "",
+                    "timestamp": time.time(),
+                })
+
+            if not new_results:
+                return
+
+            # Persist cursor so restarts don't re-process these rows.
+            if last_id:
+                try:
+                    with open(_ACTIVITY_DELEGATION_CURSOR_FILE, "w") as f:
+                        # Keep cursor as comma-joined IDs; truncate if over 100KB.
+                        cursor_str = ",".join(sorted(self._seen_activity_ids))
+                        if len(cursor_str) > 102_400:
+                            # Evict oldest half when cursor file grows too large.
+ sorted_ids = sorted(self._seen_activity_ids) + self._seen_activity_ids = set(sorted_ids[len(sorted_ids) // 2:]) + cursor_str = ",".join(sorted(self._seen_activity_ids)) + f.write(cursor_str) + except Exception: + pass # Non-fatal; next cycle will retry + + # Append to results file and trigger self-message (mirrors _check_delegations). + with open(DELEGATION_RESULTS_FILE, "a") as f: + for r in new_results: + f.write(json.dumps(r) + "\n") + logger.info( + "Heartbeat: %d new a2a_receive delegation results from activity_logs — " + "triggering self-message", + len(new_results), + ) + + # Build and send self-message to wake the agent. + summary_lines = [] + for r in new_results: + line = f"- [completed] Peer response from {r['target_id'][:8]}: {r['summary'][:80] or '(no summary)'}" + if r.get("error"): + line += f"\n Error: {r['error'][:100]}" + summary_lines.append(line) + + # Look up parent name (reuse cached value from _check_delegations if set). + if self._parent_name is None: + try: + parent_resp = await client.get( + f"{self.platform_url}/workspaces/{self.workspace_id}", + headers=auth_headers(), + ) + if parent_resp.status_code == 200: + parent_id = parent_resp.json().get("parent_id", "") + if parent_id: + parent_info = await client.get( + f"{self.platform_url}/workspaces/{parent_id}", + headers=auth_headers(), + ) + if parent_info.status_code == 200: + self._parent_name = parent_info.json().get("name", "") + if self._parent_name is None: + self._parent_name = "" + except Exception: + self._parent_name = "" + parent_name = self._parent_name or "" + + report_instruction = "" + if parent_name: + report_instruction = ( + f"\n\nIMPORTANT: Delegate a summary of these results to your parent " + f"'{parent_name}' using delegate_task. Also use send_message_to_user " + f"to notify the user." + ) + else: + report_instruction = ( + "\n\nReport results using send_message_to_user to notify the user." 
+ ) + + trigger_msg = ( + "Delegation results are ready (from a2a_receive via activity_logs). " + "Review them and take appropriate action:\n" + + "\n".join(summary_lines) + + report_instruction + ) + + now = time.time() + if now - self._last_self_message_time < SELF_MESSAGE_COOLDOWN: + logger.debug( + "Heartbeat: self-message cooldown active; " + "a2a_receive results will be retried next cycle" + ) + else: + self._last_self_message_time = now + try: + await client.post( + f"{self.platform_url}/workspaces/{self.workspace_id}/a2a", + json={ + "method": "message/send", + "params": { + "message": { + "role": "user", + "parts": [{"type": "text", "text": trigger_msg}], + }, + }, + }, + headers=self_source_headers(self.workspace_id), + timeout=120.0, + ) + logger.info("Heartbeat: a2a_receive self-message sent") + except Exception as e: + logger.warning("Heartbeat: failed to send a2a_receive self-message: %s", e) + + # Also notify the user via canvas. + for r in new_results: + try: + msg = f"Delegation completed: {r['summary'][:100] or '(no summary)'}" + preview = r.get("response_preview", "") + if preview: + msg += f"\nResult: {preview[:200]}" + await client.post( + f"{self.platform_url}/workspaces/{self.workspace_id}/notify", + json={"message": msg, "type": "delegation_result"}, + headers=auth_headers(), + ) + except Exception: + pass + + except Exception as e: + logger.debug("Activity delegation check error: %s", e) diff --git a/workspace/plugins_registry/__init__.py b/workspace/plugins_registry/__init__.py index 363f26fe..33f8ceb3 100644 --- a/workspace/plugins_registry/__init__.py +++ b/workspace/plugins_registry/__init__.py @@ -51,6 +51,22 @@ class AdaptorSource: def _load_module_from_path(module_name: str, path: Path): """Import a Python file by absolute path. Returns the module or None on failure.""" + # Ensure the plugins_registry package and its submodules are importable in the + # fresh module namespace created by module_from_spec(). 
Plugin adapters + # (molecule-skill-*/adapters/*.py) use "from plugins_registry.builtins import ..." + # which requires plugins_registry and its submodules to already be in sys.modules. + # We import and register them before exec_module so the plugin's own + # from ... import statements resolve correctly. + import sys + import plugins_registry + sys.modules.setdefault("plugins_registry", plugins_registry) + for _sub in ("builtins", "protocol", "raw_drop"): + try: + sub = importlib.import_module(f"plugins_registry.{_sub}") + sys.modules.setdefault(f"plugins_registry.{_sub}", sub) + except Exception: + # Submodule may not exist in all versions; skip if absent. + pass spec = importlib.util.spec_from_file_location(module_name, path) if spec is None or spec.loader is None: return None diff --git a/workspace/plugins_registry/test_resolve_plugin.py b/workspace/plugins_registry/test_resolve_plugin.py new file mode 100644 index 00000000..07cf2e26 --- /dev/null +++ b/workspace/plugins_registry/test_resolve_plugin.py @@ -0,0 +1,60 @@ +"""Tests for _load_module_from_path sys.modules injection fix (issue #296). + +Verifies that plugin adapters using "from plugins_registry.builtins import ..." +can be loaded via _load_module_from_path() without ModuleNotFoundError. +""" +import sys +import tempfile +import os +from pathlib import Path + +# Ensure the plugins_registry package is importable +import plugins_registry + +from plugins_registry import _load_module_from_path + + +def test_load_adapter_with_plugins_registry_import(): + """Plugin adapter using 'from plugins_registry.builtins import ...' loads cleanly.""" + # Write a temp adapter file that does the exact import from the bug report. 
+ with tempfile.NamedTemporaryFile( + mode="w", suffix=".py", delete=False, dir=tempfile.gettempdir() + ) as f: + f.write("from plugins_registry.builtins import AgentskillsAdaptor as Adaptor\n") + f.write("assert Adaptor is not None\n") + adapter_path = Path(f.name) + + try: + module = _load_module_from_path("test_adapter", adapter_path) + assert module is not None, "module should load without error" + assert hasattr(module, "Adaptor"), "module should expose Adaptor" + finally: + os.unlink(adapter_path) + + +def test_load_adapter_with_full_plugins_registry_import(): + """Plugin adapter using 'from plugins_registry import ...' loads cleanly.""" + with tempfile.NamedTemporaryFile( + mode="w", suffix=".py", delete=False, dir=tempfile.gettempdir() + ) as f: + f.write("from plugins_registry import InstallContext, resolve\n") + f.write("from plugins_registry.protocol import PluginAdaptor\n") + f.write("assert InstallContext is not None\n") + f.write("assert resolve is not None\n") + f.write("assert PluginAdaptor is not None\n") + adapter_path = Path(f.name) + + try: + module = _load_module_from_path("test_adapter_full", adapter_path) + assert module is not None, "module should load without error" + assert hasattr(module, "InstallContext"), "module should expose InstallContext" + assert hasattr(module, "resolve"), "module should expose resolve" + assert hasattr(module, "PluginAdaptor"), "module should expose PluginAdaptor" + finally: + os.unlink(adapter_path) + + +if __name__ == "__main__": + test_load_adapter_with_plugins_registry_import() + test_load_adapter_with_full_plugins_registry_import() + print("ALL TESTS PASS")