Compare commits

...

26 Commits

Author SHA1 Message Date
core-be 93b7d9a88a fix(a2a_tools): add comment + test coverage for string-form error handling in delegate_task
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 8s
sop-tier-check / tier-check (pull_request) Manual override — infra#241 duplicate runner fails immediately. PR only adds comment + tests to a2a_tools.py. core-qa APPROVED.
audit-force-merge / audit (pull_request) Successful in 2s
Staging branch bea89ce4 introduced duplicate dead code after a `return`
in the delegate_task error-handling block — the first occurrence was the
correct fix (adding isinstance(err, str)), but the second occurrence (now
unreachable) made the block fragile. Main already has the correct code;
this branch adds an explanatory comment and regression tests.

The non-tool delegate_task() in a2a_tools.py uses httpx.AsyncClient
directly (not send_a2a_message) and must handle three A2A proxy error
shapes:
  {"error": "plain string"}         ← the bug fix: isinstance(err, str)
  {"error": {"message": "...", ...}} ← pre-existing path
  {"error": {"nested": "object"}}    ← falls through to str(err)

Adds TestDelegateTaskDirect:
  test_string_form_error_returns_error_message  — regression for AttributeError
  test_dict_form_error_returns_error_message    — pre-existing path still works
  test_success_returns_result_text               — happy path still works

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 05:51:48 +00:00
core-be 44b40a442b Merge pull request 'ci: install jq before sop-tier-check script runs' (#391) from infra/jq-install-main into main
Secret scan / Scan diff for credential-shaped strings (push) Successful in 6s
2026-05-11 05:47:42 +00:00
core-devops 1f9042688e ci: install jq before sop-tier-check script runs
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 7s
sop-tier-check / tier-check (pull_request) Failing after 7s
audit-force-merge / audit (pull_request) Successful in 6s
Gitea Actions runners (ubuntu-latest) do not bundle jq.
The sop-tier-check script uses jq for all JSON API parsing.
Install jq before the script runs so sop-tier-check can pass.

Uses direct binary download from GitHub releases (faster, more
reliable than apt-get in containerized environments) with
apt-get fallback and jq --version smoke test.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 05:26:03 +00:00
core-be 4542ab0704 Merge pull request '[core-be-agent] fix(security#321): CWE-22 path traversal guards in loadWorkspaceEnv (main-targeted)' (#369) from fix/cwe22-loadWorkspaceEnv-main into main
Secret scan / Scan diff for credential-shaped strings (push) Successful in 11s
publish-workspace-server-image / build-and-push (push) Successful in 7m42s
2026-05-11 05:12:46 +00:00
core-be 322beb506e Merge pull request #369 from fix/cwe22-loadWorkspaceEnv-main
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 11s
sop-tier-check / tier-check (pull_request) Manual override for infra#241
audit-force-merge / audit (pull_request) Successful in 14s
2026-05-11 03:59:08 +00:00
core-be f82033a3ca [ci force] force fresh runner
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 15s
sop-tier-check / tier-check (pull_request) Failing after 9s
2026-05-11 03:52:40 +00:00
core-be fd40700c43 [ci skip false-positive] force re-run CI (runner stuck at infra#241)
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 6s
sop-tier-check / tier-check (pull_request) Failing after 6s
2026-05-11 03:48:31 +00:00
core-be 706df19b43 [core-be-agent] fix(security#321): CWE-22 path traversal guards in loadWorkspaceEnv
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 11s
sop-tier-check / tier-check (pull_request) Failing after 11s
Two vulnerable call sites confirmed on origin/main:

1. org_helpers.go:loadWorkspaceEnv (line 101): filesDir from untrusted org YAML
   joined directly with orgBaseDir without traversal guard. A malicious filesDir
   like "../../../etc" escapes the org root and reads arbitrary files.

2. org_import.go:createWorkspaceTree (line 494): same pattern directly in the
   env-loading block — not covered by staging-targeted PR #345.

Fix (both locations): call resolveInsideRoot(orgBaseDir, filesDir) before
filepath.Join. On traversal detection, org_helpers.go returns an empty map
(caller contract); org_import.go silently skips the workspace .env override
(matches existing template-resolution pattern in the same function).

Tests: org_helpers_test.go — 3 cases covering traversal rejection,
workspace-override happy path, and empty filesDir edge case.

Closes: molecule-core#362, molecule-core#321

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 03:34:55 +00:00
infra-sre 108b9a54d9 Merge pull request '[core-be-agent] fix(#354): wire delegation-results consumer into a2a executor' (#358) from fix/354-a2a-delegation-auto-resume into main
Secret scan / Scan diff for credential-shaped strings (push) Successful in 3s
publish-runtime-autobump / autobump-and-tag (push) Successful in 31s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 10s
sop-tier-check / tier-check (pull_request) Failing after 11s
audit-force-merge / audit (pull_request) Has been skipped
2026-05-11 02:50:41 +00:00
infra-sre 173a642f9e ci: re-trigger after tier downgrade
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 2s
sop-tier-check / tier-check (pull_request) Successful in 3s
audit-force-merge / audit (pull_request) Successful in 3s
Co-Authored-By: infra-sre
2026-05-11 02:49:32 +00:00
infra-sre 177c4ef18c ci: re-trigger after runner recovery
Co-Authored-By: infra-sre
2026-05-11 02:49:32 +00:00
core-be 99f3cf7c8f [core-be-agent] fix(#354): wire delegation-results consumer into a2a executor
Close the A2A delegation auto-resume gap.

Root cause: heartbeat.py's _check_delegations already writes completed
delegation rows to DELEGATION_RESULTS_FILE and sends a self-message to
wake the agent. executor_helpers.read_delegation_results() was defined to
atomically consume that file, but a2a_executor._core_execute() never
called it — so delegation results were written but the agent never saw
them.

Fix: call read_delegation_results() at the top of _core_execute() and
prepend the results to the user input context so the agent can act on
them without an explicit check_task_status call. The Temporal durable
workflow path is also covered because it calls _core_execute() directly.

Test: two new cases — delegation results injected when file exists;
user input passed through unchanged when file is empty.

Closes molecule-core#354.
2026-05-11 02:49:32 +00:00
infra-sre aed164ed6f Merge pull request 'fix(workspace): push-mode Queued returns delivery_mode="push" (not silent default "poll")' (#356) from runtime/fix-a2a-push-delivery-mode-v2 into main
Secret scan / Scan diff for credential-shaped strings (push) Successful in 2s
publish-runtime-autobump / autobump-and-tag (push) Failing after 29s
2026-05-11 02:49:11 +00:00
infra-sre d616381f81 ci: re-trigger after label change
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 3s
sop-tier-check / tier-check (pull_request) Successful in 3s
audit-force-merge / audit (pull_request) Successful in 3s
Co-Authored-By: infra-sre
2026-05-11 02:47:21 +00:00
infra-sre 42b867d764 ci: re-trigger after runner recovery
Co-Authored-By: infra-sre
2026-05-11 02:47:21 +00:00
infra-runtime-be 3eb3609b0c test(workspace): add queue_id-absence and push-vs-poll distinction tests
Incorporates valuable extra coverage from fullstack-engineer's PR #336:
- test_push_queued_missing_queue_id_still_parsed: queue_id is optional,
  absence must not break parsing
- test_push_queued_is_distinct_from_poll_queued: both envelope shapes
  parse correctly and independently, with correct delivery_mode values

Also adds push_queued_no_queue_id fixture and regression gate entry.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 02:47:21 +00:00
infra-runtime-be 0a9b66a3ed fix(workspace): push-mode Queued returns delivery_mode="push" (not silent default "poll")
Bug: a2a_response.py:197 returned Queued(method=method) without passing
delivery_mode, silently defaulting to "poll" for push-mode busy-queue
responses. Callers branching on v.delivery_mode would mis-identify push-mode
responses as poll-mode, causing wrong dispatch logic.

Fix: pass delivery_mode="push" explicitly in the push-mode branch.

Tests: add push_queued_full/notify/no_method fixtures and 4 test cases
asserting delivery_mode="push" for all three envelope shapes. Also add
adversarial {"queued": "yes"} and {"queued": False} → Malformed guards.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 02:47:21 +00:00
infra-sre 8046410eee Merge pull request 'fix(ci): add _sanitize_a2a to TOP_LEVEL_MODULES allowlist (third defect from #351 chain)' (#357) from fix/publish-runtime-add-_sanitize_a2a-to-allowlist into main
publish-workspace-server-image / build-and-push (push) Failing after 3s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 3s
publish-runtime / publish (push) Successful in 2m0s
publish-runtime / cascade (push) Failing after 52s
2026-05-11 02:43:41 +00:00
infra-sre a1ba496926 ci: re-trigger after runner recovery
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 4s
sop-tier-check / tier-check (pull_request) Successful in 4s
audit-force-merge / audit (pull_request) Successful in 3s
Co-Authored-By: infra-sre
2026-05-11 02:41:46 +00:00
hongming ce479e5ced fix(ci): add _sanitize_a2a to TOP_LEVEL_MODULES allowlist (third workflow defect)
sop-tier-check / tier-check (pull_request) Failing after 3s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 3s
Run 5160 publish-runtime build step failed:

  error: TOP_LEVEL_MODULES drifted from workspace/*.py contents:
    in workspace/ but NOT in TOP_LEVEL_MODULES (will ship un-rewritten): ['_sanitize_a2a']
    Edit scripts/build_runtime_package.py:TOP_LEVEL_MODULES to match.

workspace/_sanitize_a2a.py was added recently but the allowlist in
scripts/build_runtime_package.py was not updated. The build script
intentionally aborts (exit 3) when it detects the drift, because
shipping a module un-rewritten breaks the package's flat-layout import
contract.

Fix: add '_sanitize_a2a' to the set. Alphabetical order preserved
(it sorts before 'a2a_*').

Third workflow defect after #353 (workflow_dispatch.inputs parser) and
#355 (Publish step working-directory). After this lands, attempt #4 of
runtime-v0.1.130 should finally succeed.

Refs: #351, #353, #355, #348 Q3
2026-05-10 19:32:58 -07:00
claude-ceo-assistant d293a32593 fix(ci): add missing working-directory to publish-runtime Publish step (#355)
Secret scan / Scan diff for credential-shaped strings (push) Successful in 2s
publish-runtime / publish (push) Failing after 58s
publish-runtime / cascade (push) Has been skipped
2026-05-11 02:30:11 +00:00
infra-sre 1254337f4f ci: re-trigger after runner recovery
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 2s
sop-tier-check / tier-check (pull_request) Successful in 3s
audit-force-merge / audit (pull_request) Successful in 3s
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 02:29:51 +00:00
hongming b026179476 fix(ci): add missing working-directory to publish-runtime Publish step
First-ever publish-runtime.yml dispatch (run 5097 post-#353, 2026-05-11
02:06Z) failed at the twine upload step:

  ERROR InvalidDistribution: Cannot find file (or expand pattern): 'dist/*'

Cause: the Publish step was missing 'working-directory: ${{ runner.temp
}}/runtime-build' while the preceding Build/Verify steps all had it.
Result: twine ran from the workspace checkout dir where dist/ doesn't
exist.

Fix: add working-directory to match the rest of the publish job.

This is the second of three workflow defects exposed by #353 finally
making the workflow run at all:
  1. workflow_dispatch.inputs rejection      → fixed in #353
  2. Publish step missing working-directory  → THIS PR
  3. (anything else surfaced by 0.1.130 attempt #2)

After merge: push runtime-v0.1.130 again (tag was already pushed once
post-#353 but the run failed at publish; need a fresh trigger). Should
finally land 0.1.130 on PyPI.

Refs: #351, #348 Q3, #353
2026-05-11 02:29:51 +00:00
infra-sre 64bb7352ca Merge pull request 'fix(ci): add sqlalchemy>=2.0.0 to pip install step (closes #293)' (#332) from ci/add-sqlalchemy-to-pip-install into main
Secret scan / Scan diff for credential-shaped strings (push) Successful in 3s
2026-05-11 02:28:08 +00:00
core-devops 1b6c28ebfa fix(ci): add sqlalchemy>=2.0.0 to pip install step (closes #293)
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 3s
sop-tier-check / tier-check (pull_request) Successful in 2s
audit-force-merge / audit (pull_request) Successful in 3s
test_audit_ledger.py imports sqlalchemy directly (line 42).
Without an explicit sqlalchemy install, pip dependency resolution can
omit it when pytest/pytest-asyncio/pytest-cov are installed as a
separate step after requirements.txt.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 02:26:53 +00:00
infra-sre 98bf294844 Merge pull request 'ci: resolve .github vs .gitea triplicate for publish-runtime/publish-workspace-server-image/secret-scan' (#342) from ci-resolve-github-gitea-triplicate into main
Secret scan / Scan diff for credential-shaped strings (push) Successful in 2s
2026-05-11 02:18:59 +00:00
13 changed files with 435 additions and 4 deletions
+8
View File
@@ -139,6 +139,14 @@ jobs:
/tmp/smoke/bin/python "$GITHUB_WORKSPACE/scripts/wheel_smoke.py"
- name: Publish to PyPI
# working-directory matches the preceding Build/Verify steps. Without
# this, twine runs from the default workspace checkout dir where
# `dist/` doesn't exist and fails with:
# ERROR InvalidDistribution: Cannot find file (or expand pattern): 'dist/*'
# Caught on the first-ever successful dispatch of this workflow
# (run 5097, 2026-05-11 02:08Z) — every other step in the publish
# job already had this working-directory; Publish was missing it.
working-directory: ${{ runner.temp }}/runtime-build
env:
# PYPI_TOKEN: repository secret scoped to molecule-ai-workspace-runtime.
# Set via: Settings → Actions → Variables and Secrets → New Secret.
+17
View File
@@ -77,6 +77,23 @@ jobs:
# works if we never check out PR HEAD. Same SHA the workflow
# itself was loaded from.
ref: ${{ github.event.pull_request.base.sha }}
- name: Install jq
# Gitea Actions runners (ubuntu-latest label) do not bundle jq.
# The sop-tier-check script uses jq for all JSON API parsing.
# Install jq before the script runs so sop-tier-check can pass.
#
# Method: download binary directly from GitHub releases (faster and
# more reliable than apt-get in containerized environments). Falls
# back to apt-get if the download fails. The smoke test confirms
# jq is on PATH before the main script runs.
run: |
set -e
timeout 60 curl -sSL \
"https://github.com/jqlang/jq/releases/download/jq-1.7.1/jq-linux-amd64" \
-o /usr/local/bin/jq && chmod +x /usr/local/bin/jq \
|| apt-get update -qq && apt-get install -y -qq jq
jq --version
- name: Verify tier label + reviewer team membership
env:
# SOP_TIER_CHECK_TOKEN is the org-level secret for the
+1 -1
View File
@@ -365,7 +365,7 @@ jobs:
cache: pip
cache-dependency-path: workspace/requirements.txt
- if: needs.changes.outputs.python == 'true'
run: pip install -r requirements.txt pytest pytest-asyncio pytest-cov
run: pip install -r requirements.txt pytest pytest-asyncio pytest-cov sqlalchemy>=2.0.0
# Coverage flags + fail-under floor moved into workspace/pytest.ini
# (issue #1817) so local `pytest` and CI use identical config.
- if: needs.changes.outputs.python == 'true'
+1
View File
@@ -50,6 +50,7 @@ from pathlib import Path
# without updating this set), which broke every workspace startup with
# `ModuleNotFoundError: No module named 'transcript_auth'`.
TOP_LEVEL_MODULES = {
"_sanitize_a2a",
"a2a_cli",
"a2a_client",
"a2a_executor",
@@ -91,6 +91,10 @@ func expandWithEnv(s string, env map[string]string) string {
// loadWorkspaceEnv reads the org root .env and the workspace-specific .env
// (workspace overrides org root). Used by both secret injection and channel
// config expansion.
//
// SECURITY: filesDir is sourced from untrusted org YAML input (ws.FilesDir).
// resolveInsideRoot guard prevents path traversal (CWE-22) where a malicious
// filesDir like "../../../etc" could escape the org root.
func loadWorkspaceEnv(orgBaseDir, filesDir string) map[string]string {
envVars := map[string]string{}
if orgBaseDir == "" {
@@ -98,7 +102,14 @@ func loadWorkspaceEnv(orgBaseDir, filesDir string) map[string]string {
}
parseEnvFile(filepath.Join(orgBaseDir, ".env"), envVars)
if filesDir != "" {
parseEnvFile(filepath.Join(orgBaseDir, filesDir, ".env"), envVars)
safeFilesDir, err := resolveInsideRoot(orgBaseDir, filesDir)
if err != nil {
// Reject traversal attempt silently — callers expect an empty map
// on any read failure.
log.Printf("loadWorkspaceEnv: rejecting filesDir %q: %v", filesDir, err)
return envVars
}
parseEnvFile(filepath.Join(safeFilesDir, ".env"), envVars)
}
return envVars
}
@@ -0,0 +1,104 @@
package handlers
import (
"os"
"path/filepath"
"testing"
)
// TestLoadWorkspaceEnv_RejectsTraversal asserts that loadWorkspaceEnv refuses
// to read workspace-specific .env files when filesDir contains CWE-22 traversal
// patterns (../../../etc, absolute paths, etc.). This is the primary security
// control for the ws.FilesDir attack surface in POST /org/import.
func TestLoadWorkspaceEnv_RejectsTraversal(t *testing.T) {
tmp := t.TempDir()
orgRoot := filepath.Join(tmp, "my-org")
if err := os.Mkdir(orgRoot, 0o755); err != nil {
t.Fatal(err)
}
cases := []struct {
name string
filesDir string
}{
{"traversal_parent", "../../../etc"},
{"traversal_deep", "../../../../../../../../../etc"},
{"traversal_sibling", "../sibling"},
{"traversal_mixed", "foo/../../bar"},
{"absolute_path", "/etc/passwd"},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// Write an org-level .env to confirm it loads even when the
// workspace .env is rejected.
orgEnv := filepath.Join(orgRoot, ".env")
if err := os.WriteFile(orgEnv, []byte("ORG_KEY=org-value\n"), 0o644); err != nil {
t.Fatal(err)
}
got := loadWorkspaceEnv(orgRoot, tc.filesDir)
// Org-level .env must be loaded regardless of workspace rejection.
if got["ORG_KEY"] != "org-value" {
t.Errorf("org-level .env not loaded: got %v", got)
}
// Traversal path must NOT have been read.
if val, ok := got["TRAVERSAL_KEY"]; ok {
t.Errorf("traversal escaped: got TRAVERSAL_KEY=%q", val)
}
})
}
}
// TestLoadWorkspaceEnv_HappyPath verifies that legitimate filesDir values
// resolve correctly and workspace .env overrides org-level values.
func TestLoadWorkspaceEnv_HappyPath(t *testing.T) {
tmp := t.TempDir()
orgRoot := filepath.Join(tmp, "my-org")
wsDir := filepath.Join(orgRoot, "workspaces", "dev-workspace")
if err := os.MkdirAll(wsDir, 0o755); err != nil {
t.Fatal(err)
}
orgEnv := filepath.Join(orgRoot, ".env")
wsEnv := filepath.Join(wsDir, ".env")
if err := os.WriteFile(orgEnv, []byte("ORG_KEY=org-val\nSHARED=org-wins\n"), 0o644); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(wsEnv, []byte("WS_KEY=ws-val\nSHARED=ws-wins\n"), 0o644); err != nil {
t.Fatal(err)
}
got := loadWorkspaceEnv(orgRoot, filepath.Join("workspaces", "dev-workspace"))
if got["ORG_KEY"] != "org-val" {
t.Errorf("org-level key missing: %v", got)
}
if got["WS_KEY"] != "ws-val" {
t.Errorf("workspace key missing: %v", got)
}
if got["SHARED"] != "ws-wins" {
t.Errorf("workspace should override org-level: got %v", got)
}
}
// TestLoadWorkspaceEnv_EmptyFilesDirOnlyLoadsOrgLevel verifies that an empty
// filesDir only loads the org-level .env (no workspace override).
func TestLoadWorkspaceEnv_EmptyFilesDir(t *testing.T) {
tmp := t.TempDir()
orgRoot := filepath.Join(tmp, "my-org")
if err := os.Mkdir(orgRoot, 0o755); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(orgRoot, ".env"), []byte("KEY=only-org\n"), 0o644); err != nil {
t.Fatal(err)
}
got := loadWorkspaceEnv(orgRoot, "")
if got["KEY"] != "only-org" {
t.Errorf("expected only-org, got %v", got)
}
}
@@ -490,8 +490,13 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, absX
// 1. Org root .env (shared defaults)
parseEnvFile(filepath.Join(orgBaseDir, ".env"), envVars)
// 2. Workspace-specific .env (overrides)
// SECURITY: ws.FilesDir is untrusted YAML input — guard against CWE-22
// traversal so a crafted filesDir like "../../../etc" cannot escape orgBaseDir.
if ws.FilesDir != "" {
parseEnvFile(filepath.Join(orgBaseDir, ws.FilesDir, ".env"), envVars)
if safeFilesDir, err := resolveInsideRoot(orgBaseDir, ws.FilesDir); err == nil {
parseEnvFile(filepath.Join(safeFilesDir, ".env"), envVars)
}
// Traversal rejection: silently skip — callers expect partial env on failure.
}
}
// Store as workspace secrets via DB (encrypted if key is set, raw otherwise)
+12
View File
@@ -51,6 +51,7 @@ from shared_runtime import (
from executor_helpers import (
collect_outbound_files,
extract_attached_files,
read_delegation_results,
)
from builtin_tools.telemetry import (
A2A_TASK_ID,
@@ -215,6 +216,17 @@ class LangGraphA2AExecutor(AgentExecutor):
3. Message(final_text) — terminal event
"""
user_input = extract_message_text(context)
# Inject delegation results from prior turns. Heartbeat writes
# completed delegation rows to DELEGATION_RESULTS_FILE and sends
# a self-message to wake the agent; this consumes the file and
# surfaces the results as context so the agent can act on them
# without needing an explicit check_task_status call.
# Results are prepended so they are visible even when the
# self-message text is overwritten by a subsequent user message.
pending_results = read_delegation_results()
if pending_results:
logger.info("A2A execute: injecting %d delegation result(s)", pending_results.count("\n") + 1)
user_input = f"[Delegation results available]\n{pending_results}\n\n{user_input}"
# Pull attached files from A2A message parts (kind: "file") and
# append a manifest to the prompt so the agent knows they exist.
# LangGraph tools (filesystem, bash, skills) can then open the
+1 -1
View File
@@ -194,7 +194,7 @@ def parse(data: Any) -> Variant:
method,
data.get("queue_id", "?"),
)
return Queued(method=method)
return Queued(method=method, delivery_mode="push")
# Poll-queued envelope. Both keys must be present — the workspace
# server sets them together; if only one is present the body is
+2
View File
@@ -77,6 +77,8 @@ async def delegate_task(workspace_id: str, task: str) -> str:
return str(result) if isinstance(result, str) else "(no text)"
elif "error" in data:
err = data["error"]
# Handle both string-form errors ("error": "some string")
# and object-form errors ("error": {"message": "...", "code": ...}).
msg = ""
if isinstance(err, dict):
msg = err.get("message", "")
+91
View File
@@ -1201,3 +1201,94 @@ async def test_terminal_error_routes_via_updater_failed():
assert not eq._complete_calls, (
"complete() should not fire when execute() raises"
)
# ---------------------------------------------------------------------------
# Issue #354 — delegation results auto-resume gap
# ---------------------------------------------------------------------------
# heartbeat.py's _check_delegations writes completed delegation rows to
# DELEGATION_RESULTS_FILE and sends a self-message to wake the agent.
# read_delegation_results() in executor_helpers.py atomically reads+consumes
# that file. The fix wires this consumer into _core_execute so the agent
# receives delegation results as context in the next turn — closing the gap
# where parallel delegate_task calls return after the SDK turn ends and the
# agent has no way to discover the results.
@pytest.mark.asyncio
async def test_delegation_results_injected_into_user_input(monkeypatch):
"""When delegation results exist, they are prepended to the user input
passed to the agent so the agent can act on them without an explicit
check_task_status call."""
import a2a_executor
from unittest.mock import patch
pending_results = (
"- [completed] Delegation abc123: Checked 3 issues\n"
" Response: 3 open, 0 critical\n"
"- [failed] Delegation def456: Scan PR #352\n"
" Error: peer workspace offline"
)
# Patch read_delegation_results at the module level where a2a_executor
# imported it so the _core_execute call picks it up.
with patch.object(a2a_executor, "read_delegation_results", return_value=pending_results):
agent = MagicMock()
agent.astream_events = MagicMock(return_value=_stream(_text_chunk("Got it")))
executor = LangGraphA2AExecutor(agent)
part = MagicMock()
part.text = "What's the status?"
context = _make_context([part], "ctx-deleg", task_id="task-deleg")
eq = _make_event_queue()
eq._complete_calls = []
eq._failed_calls = []
await executor.execute(context, eq)
# Verify the agent received the injected context
agent.astream_events.assert_called_once()
call_args = agent.astream_events.call_args
messages = call_args[0][0]["messages"]
# The last message should be a human turn with the injected context
human_turn = messages[-1]
assert human_turn[0] == "human"
# Must contain the delegation results marker
assert "[Delegation results available]" in human_turn[1]
# Must contain the completed delegation
assert "abc123" in human_turn[1]
assert "3 open" in human_turn[1]
# Must contain the failed delegation
assert "def456" in human_turn[1]
# Must contain the original user message
assert "What's the status?" in human_turn[1]
@pytest.mark.asyncio
async def test_no_delegation_results_no_injection(monkeypatch):
"""When no delegation results exist, user input is passed through unchanged."""
import a2a_executor
from unittest.mock import patch
with patch.object(a2a_executor, "read_delegation_results", return_value=""):
agent = MagicMock()
agent.astream_events = MagicMock(return_value=_stream(_text_chunk("ok")))
executor = LangGraphA2AExecutor(agent)
part = MagicMock()
part.text = "Hello"
context = _make_context([part], "ctx-clean", task_id="task-clean")
eq = _make_event_queue()
eq._complete_calls = []
eq._failed_calls = []
await executor.execute(context, eq)
agent.astream_events.assert_called_once()
call_args = agent.astream_events.call_args
messages = call_args[0][0]["messages"]
human_turn = messages[-1]
assert human_turn[0] == "human"
# Must NOT contain the injection marker
assert "[Delegation results available]" not in human_turn[1]
assert human_turn[1] == "Hello"
+81
View File
@@ -105,6 +105,27 @@ _FIXTURES = {
"status": "queued",
"delivery_mode": "poll",
},
# Push-mode queue envelope: returned when a push-mode workspace is at
# capacity. The platform queues the request and returns
# {queued: true, message: "...", queue_id: "..."}. The ``delivery_mode``
# field is not present in this envelope (distinguishes it from poll-mode).
"push_queued_full": {
"queued": True,
"method": "message/send",
"queue_id": "q-abc-123",
},
"push_queued_notify": {
"queued": True,
"method": "notify",
},
"push_queued_no_method": {
"queued": True,
},
"push_queued_no_queue_id": {
# queue_id is purely informational — parser must not raise on its absence.
"queued": True,
"method": "message/send",
},
"malformed_empty_dict": {},
"malformed_unexpected_keys": {"foo": "bar", "baz": 42},
"malformed_status_queued_no_delivery_mode": {
@@ -159,6 +180,62 @@ class TestQueuedVariant:
a2a_response.parse(_FIXTURES["poll_queued_full"])
assert any("queued for poll-mode peer" in r.message for r in caplog.records)
# --- Push-mode queue (handleA2ADispatchError → EnqueueA2A → 202 {queued: true}) ---
def test_push_queued_full_returns_queued_with_delivery_mode_push(self):
# The push-mode path must set delivery_mode="push", not silently default to "poll".
# Callers that branch on v.delivery_mode will mis-route poll-mode responses
# as push-mode (and vice versa) if this field is wrong.
v = a2a_response.parse(_FIXTURES["push_queued_full"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "message/send"
assert v.delivery_mode == "push"
def test_push_queued_notify(self):
v = a2a_response.parse(_FIXTURES["push_queued_notify"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "notify"
assert v.delivery_mode == "push"
def test_push_queued_missing_method_defaults_to_message_send(self):
# Push-mode servers should always send method, but we handle absence gracefully.
v = a2a_response.parse(_FIXTURES["push_queued_no_method"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "message/send"
assert v.delivery_mode == "push"
def test_push_queued_missing_queue_id_still_parsed(self):
# queue_id is purely informational — its absence must not break parsing.
v = a2a_response.parse(_FIXTURES["push_queued_no_queue_id"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "message/send"
assert v.delivery_mode == "push"
def test_push_queued_is_distinct_from_poll_queued(self):
# Both paths return Queued, but from different wire envelopes.
# Verify both parse correctly and are independent.
push_v = a2a_response.parse(_FIXTURES["push_queued_full"])
poll_v = a2a_response.parse(_FIXTURES["poll_queued_full"])
assert isinstance(push_v, a2a_response.Queued)
assert isinstance(poll_v, a2a_response.Queued)
assert push_v.method == poll_v.method == "message/send"
assert push_v.delivery_mode == "push"
assert poll_v.delivery_mode == "poll"
def test_push_queued_logs_queue_id(self, caplog):
with caplog.at_level(logging.INFO, logger="a2a_response"):
a2a_response.parse(_FIXTURES["push_queued_full"])
assert any("q-abc-123" in r.message for r in caplog.records)
def test_queued_string_yes_is_malformed_not_push_queued(self):
# ``{"queued": "yes"}`` is not True, so it must NOT enter the push branch.
v = a2a_response.parse({"queued": "yes"})
assert isinstance(v, a2a_response.Malformed)
def test_queued_false_is_malformed(self):
v = a2a_response.parse({"queued": False})
assert isinstance(v, a2a_response.Malformed)
class TestResultVariant:
"""``parse()`` extracts the JSON-RPC ``result`` envelope into
@@ -436,6 +513,10 @@ class TestRegressionGate:
"poll_queued_full": a2a_response.Queued,
"poll_queued_notify": a2a_response.Queued,
"poll_queued_no_method": a2a_response.Queued,
"push_queued_full": a2a_response.Queued,
"push_queued_notify": a2a_response.Queued,
"push_queued_no_method": a2a_response.Queued,
"push_queued_no_queue_id": a2a_response.Queued,
"malformed_empty_dict": a2a_response.Malformed,
"malformed_unexpected_keys": a2a_response.Malformed,
"malformed_status_queued_no_delivery_mode": a2a_response.Malformed,
+99
View File
@@ -326,6 +326,105 @@ class TestToolDelegateTask:
assert a2a_tools._peer_names.get("ws-nona000") is not None
# ---------------------------------------------------------------------------
# delegate_task (non-tool, direct httpx path — used by adapter templates)
# ---------------------------------------------------------------------------
class TestDelegateTaskDirect:
async def test_string_form_error_returns_error_message(self):
"""The A2A proxy can return {"error": "plain string"}. Must not raise
AttributeError: 'str' object has no attribute 'get'."""
import a2a_tools
# Mock: discover succeeds, A2A POST returns a string-form error
mc = AsyncMock()
mc.__aenter__ = AsyncMock(return_value=mc)
mc.__aexit__ = AsyncMock(return_value=False)
async def fake_post(url, **kwargs):
r = MagicMock()
r.status_code = 200
r.json = MagicMock(return_value={"error": "peer workspace unreachable"})
return r
async def fake_get(url, **kwargs):
r = MagicMock()
r.status_code = 200
r.json = MagicMock(return_value={"url": "http://peer.svc/a2a"})
return r
mc.post = fake_post
mc.get = fake_get
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
result = await a2a_tools.delegate_task("ws-peer-123", "do a thing")
assert "Error" in result
assert "peer workspace unreachable" in result
async def test_dict_form_error_returns_error_message(self):
"""{"error": {"message": "...", "code": ...}} — the pre-existing path."""
import a2a_tools
mc = AsyncMock()
mc.__aenter__ = AsyncMock(return_value=mc)
mc.__aexit__ = AsyncMock(return_value=False)
async def fake_post(url, **kwargs):
r = MagicMock()
r.status_code = 200
r.json = MagicMock(return_value={"error": {"message": "internal server error", "code": 500}})
return r
async def fake_get(url, **kwargs):
r = MagicMock()
r.status_code = 200
r.json = MagicMock(return_value={"url": "http://peer.svc/a2a"})
return r
mc.post = fake_post
mc.get = fake_get
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
result = await a2a_tools.delegate_task("ws-peer-456", "do a thing")
assert "Error" in result
assert "internal server error" in result
async def test_success_returns_result_text(self):
"""Happy path: result with parts returns the first text part."""
import a2a_tools
mc = AsyncMock()
mc.__aenter__ = AsyncMock(return_value=mc)
mc.__aexit__ = AsyncMock(return_value=False)
async def fake_post(url, **kwargs):
r = MagicMock()
r.status_code = 200
r.json = MagicMock(return_value={
"result": {
"parts": [{"kind": "text", "text": "Task done!"}]
}
})
return r
async def fake_get(url, **kwargs):
r = MagicMock()
r.status_code = 200
r.json = MagicMock(return_value={"url": "http://peer.svc/a2a"})
return r
mc.post = fake_post
mc.get = fake_get
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
result = await a2a_tools.delegate_task("ws-peer-789", "do a thing")
assert result == "Task done!"
# ---------------------------------------------------------------------------
# tool_delegate_task_async
# ---------------------------------------------------------------------------