diff --git a/.github/workflows/continuous-synth-e2e.yml b/.github/workflows/continuous-synth-e2e.yml new file mode 100644 index 00000000..e477214a --- /dev/null +++ b/.github/workflows/continuous-synth-e2e.yml @@ -0,0 +1,160 @@ +name: Continuous synthetic E2E (staging) + +# Hard gate (#2342): cron-driven full-lifecycle E2E that catches +# regressions visible only at runtime — schema drift, deployment-pipeline +# gaps, vendor outages, env-var rotations, DNS / CF / Railway side-effects. +# +# Why this gate exists: +# PR-time CI catches code-level regressions but not deployment-time or +# integration-time ones. Today's empirical data: +# • #2345 (A2A v0.2 silent drop) — passed all unit tests, broke at +# JSON-RPC parse layer between sender and receiver. Visible only +# to a sender exercising the full path. +# • RFC #2312 chat upload — landed on staging-branch but never +# reached staging tenants because publish-workspace-server-image +# was main-only. Caught by manual dogfooding hours after deploy. +# Both would have surfaced within 15-20 min of regression if a +# continuous synth-E2E was running. +# +# Cadence: every 20 min (3x/hour). The script is conservatively +# bounded at 10 min wall-clock; even on degraded staging it should +# finish before the next firing. cron-overlap is guarded by the +# concurrency group below. +# +# Cost: ~3 runs/hour × 5-10 min × $0.008/min GHA = ~$0.50-$1/day. +# Plus a fresh tenant provisioned + torn down each run (Railway + +# AWS pennies). Negligible. +# +# Failure handling: when the run fails, the workflow exits non-zero +# and GitHub's standard email/notification path fires. Operators +# can subscribe to this workflow's failure channel for paging-grade +# alerting. + +on: + schedule: + # Every 20 minutes, on the :00 :20 :40. Offsets the existing :15 + # sweep-cf-orphans and :45 sweep-cf-tunnels so the three + # operations don't all hit Cloudflare/AWS at the same minute. 
+ - cron: '0,20,40 * * * *' + workflow_dispatch: + inputs: + runtime: + description: "Runtime to provision (langgraph = fastest, default; hermes = slower but covers SDK-native path; claude-code = needs OAUTH token in tenant env)" + required: false + default: "langgraph" + type: string + keep_org: + description: "Skip teardown for post-mortem debugging (only manual dispatch — never set this for cron runs)" + required: false + default: false + type: boolean + +permissions: + contents: read + # No issue-write here — failures surface as red runs in the workflow + # history. If you want auto-issue-on-fail, add a follow-up step that + # uses gh issue create gated on `if: failure()`. Keeping the surface + # minimal until that's actually wanted. + +# Serialize so two firings can never overlap. Cron firing every 20 min +# but scripts conservatively bounded at 10 min — overlap shouldn't +# happen in steady state, but if a run hangs we don't want N more +# stacking up. +concurrency: + group: continuous-synth-e2e + cancel-in-progress: false + +jobs: + synth: + name: Synthetic E2E against staging + runs-on: ubuntu-latest + timeout-minutes: 12 + env: + # langgraph default keeps cold-start under 5 min on staging EC2. + # hermes is slower (~7-10 min) and isn't needed for the + # regression class this gate exists to catch (deployment-pipeline + # + schema-drift + integration). Operators can pick hermes via + # workflow_dispatch when they need to exercise the SDK-native + # session path. + E2E_RUNTIME: ${{ github.event.inputs.runtime || 'langgraph' }} + # Bound to 10 min so a stuck provision fails the run instead of + # holding up the next cron firing. 15-min default in the script + # is for the on-PR full lifecycle where we have more headroom. + E2E_PROVISION_TIMEOUT_SECS: '600' + # Slug suffix — namespaced "synth-" so these runs are + # distinguishable from PR-driven runs in CP admin. 
+ E2E_RUN_ID: synth-${{ github.run_id }} + # Forced false for cron; respected for manual dispatch + E2E_KEEP_ORG: ${{ github.event.inputs.keep_org == 'true' && '1' || '' }} + MOLECULE_CP_URL: ${{ vars.STAGING_CP_URL || 'https://staging-api.moleculesai.app' }} + MOLECULE_ADMIN_TOKEN: ${{ secrets.CP_STAGING_ADMIN_API_TOKEN }} + steps: + - uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4 + + - name: Verify required secret present + run: | + # Schedule-vs-dispatch hardening (mirrors the sweep-cf-* and + # redeploy-tenants-on-* workflows): hard-fail on missing secret + # for cron firing so a misconfigured-repo doesn't silently + # report green while doing nothing. Soft-skip on operator + # dispatch — operators can dispatch ad-hoc to verify a fix + # without setting up the secret first. + if [ -z "${MOLECULE_ADMIN_TOKEN:-}" ]; then + if [ "${{ github.event_name }}" = "workflow_dispatch" ]; then + echo "::warning::CP_STAGING_ADMIN_API_TOKEN not set — synth E2E cannot run" + echo "::warning::Set it at Settings → Secrets and Variables → Actions" + exit 0 + fi + echo "::error::CP_STAGING_ADMIN_API_TOKEN secret missing — synth E2E cannot run" + echo "::error::Set it at Settings → Secrets and Variables → Actions; pull from staging-CP's CP_ADMIN_API_TOKEN env in Railway." + exit 1 + fi + + - name: Install required tools + run: | + # The script depends on jq + curl (already on ubuntu-latest) + # and python3 (likewise). Verify they're all present so we + # fail fast on a runner image regression rather than mid-script. + for cmd in jq curl python3; do + command -v "$cmd" >/dev/null 2>&1 || { + echo "::error::required tool '$cmd' not on PATH — runner image regression?" + exit 1 + } + done + + - name: Run synthetic E2E + # The script handles its own teardown via EXIT trap; even on + # failure (timeout, assertion), the org is deprovisioned and + # leaks are reported. Exit code propagates from the script. 
+ run: | + bash tests/e2e/test_staging_full_saas.sh + + - name: Failure summary + # Runs only on failure. Adds a job summary so the workflow run + # page shows a quick "what happened" instead of forcing readers + # to scroll through script output. + if: failure() + run: | + { + echo "## Continuous synth E2E failed" + echo "" + echo "**Run ID:** ${{ github.run_id }}" + echo "**Trigger:** ${{ github.event_name }}" + echo "**Runtime:** ${E2E_RUNTIME}" + echo "**Slug:** synth-${{ github.run_id }}" + echo "" + echo "### What this means" + echo "" + echo "Staging just regressed on a path that previously worked. Likely classes:" + echo "- Schema mismatch between sender and receiver (#2345 class)" + echo "- Deployment-pipeline gap (RFC #2312 / staging-tenant-image-stale class)" + echo "- Vendor outage (Cloudflare, Railway, AWS, GHCR)" + echo "- Staging-CP env var rotation" + echo "" + echo "### Next steps" + echo "" + echo "1. Check the script output above for the assertion that failed" + echo "2. If it's a vendor outage, no action needed — next firing in ~20 min" + echo "3. If it's a code regression, find the causing PR via \`git log\` against last green run and revert/fix" + echo "4. Keep an eye on the next 1-2 firings — flake vs persistent fail differs in priority" + } >> "$GITHUB_STEP_SUMMARY" diff --git a/workspace-server/internal/db/architecture_test.go b/workspace-server/internal/db/architecture_test.go new file mode 100644 index 00000000..d585db62 --- /dev/null +++ b/workspace-server/internal/db/architecture_test.go @@ -0,0 +1,63 @@ +package db_test + +// Architecture test (#2344): db is a leaf — DB pool + migrations + raw +// SQL helpers, no business-logic dependencies. The DB layer must be +// testable with sqlmock in isolation. If db starts importing handlers +// or provisioner, every db unit test would need to bring up that +// subsystem, and the layering becomes circular. +// +// If this test fails: you put business logic in the db package. 
Move +// it to a higher-tier package that imports db, not the reverse. + +import ( + "go/parser" + "go/token" + "os" + "path/filepath" + "strings" + "testing" +) + +const moduleInternalPrefix = "github.com/Molecule-AI/molecule-monorepo/platform/internal/" + +func TestDBHasNoInternalDependencies(t *testing.T) { + t.Parallel() + for path, file := range listImports(t, ".") { + if strings.HasPrefix(path, moduleInternalPrefix) { + t.Errorf( + "db must not import other internal packages "+ + "(found %q in %s) — db is the foundation layer and a "+ + "reverse dep creates a cycle (everything imports db). "+ + "See workspace-server/internal/db/architecture_test.go.", + path, file, + ) + } + } +} + +func listImports(t *testing.T, dir string) map[string]string { + t.Helper() + fset := token.NewFileSet() + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("read %s: %v", dir, err) + } + out := make(map[string]string) + for _, e := range entries { + name := e.Name() + if e.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") { + continue + } + f, err := parser.ParseFile(fset, filepath.Join(dir, name), nil, parser.ImportsOnly) + if err != nil { + t.Fatalf("parse %s: %v", name, err) + } + for _, imp := range f.Imports { + path := strings.Trim(imp.Path.Value, "\"") + if _, seen := out[path]; !seen { + out[path] = name + } + } + } + return out +} diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 4a7c8026..3389cb96 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -21,6 +21,7 @@ import ( "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" "github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner" 
"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry" "github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth" @@ -305,17 +306,54 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri return 0, nil, proxyErr } - agentURL, proxyErr := h.resolveAgentURL(ctx, workspaceID) - if proxyErr != nil { - return 0, nil, proxyErr - } - + // Normalize the JSON-RPC envelope BEFORE the poll-mode short-circuit + // so the activity_logs entry carries the protocol method name (initialize, + // message/send, etc.) — the polling agent uses that to dispatch the + // request body to the right handler. Doing it here also means a + // malformed payload fails the same way for push and poll callers + // (consistent 400 instead of "queued garbage"). normalizedBody, a2aMethod, proxyErr := normalizeA2APayload(body) if proxyErr != nil { return 0, nil, proxyErr } body = normalizedBody + // #2339 PR 2 — poll-mode short-circuit. When the target workspace + // is registered as delivery_mode=poll (e.g. an operator's laptop + // running molecule-mcp-claude-channel), the platform does NOT + // dispatch over HTTP — the agent has no public URL. Instead we record + // the A2A request to activity_logs and the agent picks it up via + // GET /activity?since_id= (PR 3). + // + // Returning here means we skip resolveAgentURL entirely (no SSRF check + // needed — there's no URL to validate; no DNS lookup against potentially- + // changing operator-side IPs) and skip the dispatch path completely + // (no Do(), no maybeMarkContainerDead). The response is a synthetic + // {status:"queued"} envelope so the caller (canvas, another workspace) + // knows delivery is acknowledged but pending consumption. 
+ if lookupDeliveryMode(ctx, workspaceID) == models.DeliveryModePoll { + if logActivity { + h.logA2AReceiveQueued(ctx, workspaceID, callerID, body, a2aMethod) + } + respBody, marshalErr := json.Marshal(gin.H{ + "status": "queued", + "delivery_mode": models.DeliveryModePoll, + "method": a2aMethod, + }) + if marshalErr != nil { + return 0, nil, &proxyA2AError{ + Status: http.StatusInternalServerError, + Response: gin.H{"error": "failed to marshal poll-mode response"}, + } + } + return http.StatusOK, respBody, nil + } + + agentURL, proxyErr := h.resolveAgentURL(ctx, workspaceID) + if proxyErr != nil { + return 0, nil, proxyErr + } + startTime := time.Now() resp, cancelFwd, err := h.dispatchA2A(ctx, workspaceID, agentURL, body, callerID) if cancelFwd != nil { @@ -486,11 +524,54 @@ func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) { } // Ensure params.message.messageId exists (required by a2a-sdk) + // AND v0.2→v0.3 compat (#2345): when sender supplies + // params.message.content (v0.2) instead of params.message.parts + // (v0.3), wrap the content as a single text Part so the downstream + // a2a-sdk's v0.3 Pydantic validator accepts the message. + // + // Pre-fix: Design Director silently dropped briefs whose sender + // used v0.2 shape — Pydantic rejected at parse time, the rejection + // went only to logs, and the sender saw a happy 200/202. + // + // Reject loud (HTTP 400) when neither content nor parts is present; + // previously the SDK's own rejection happened post-handler-dispatch + // and was invisible to the original sender. 
if params, ok := payload["params"].(map[string]interface{}); ok { if msg, ok := params["message"].(map[string]interface{}); ok { if _, hasID := msg["messageId"]; !hasID { msg["messageId"] = uuid.New().String() } + _, hasParts := msg["parts"] + rawContent, hasContent := msg["content"] + if !hasParts { + if hasContent { + switch v := rawContent.(type) { + case string: + msg["parts"] = []interface{}{ + map[string]interface{}{"kind": "text", "text": v}, + } + case []interface{}: + msg["parts"] = v + default: + return nil, "", &proxyA2AError{ + Status: http.StatusBadRequest, + Response: gin.H{ + "error": "invalid params.message.content type", + "hint": "content must be a string (v0.2 compat) or omitted in favour of parts (v0.3)", + }, + } + } + delete(msg, "content") + } else { + return nil, "", &proxyA2AError{ + Status: http.StatusBadRequest, + Response: gin.H{ + "error": "params.message must contain either 'parts' (v0.3) or 'content' (v0.2 compat)", + "hint": "v0.3 example: {\"parts\":[{\"kind\":\"text\",\"text\":\"...\"}]}", + }, + } + } + } } } diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 3e67667f..d0ccea86 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -5,6 +5,7 @@ package handlers import ( "context" + "database/sql" "encoding/json" "errors" "log" @@ -13,6 +14,7 @@ import ( "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" "github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth" "github.com/gin-gonic/gin" ) @@ -376,6 +378,74 @@ func parseUsageFromA2AResponse(body []byte) (inputTokens, outputTokens int64) { return 0, 0 } +// lookupDeliveryMode returns the workspace's delivery_mode. On any DB +// error or missing row it returns DeliveryModePush — the fail-closed +// default. 
"Closed" here means "fall back to today's behavior (synchronous +// dispatch)" rather than "fall back to drop the request silently into +// activity_logs where the agent might never see it." A poll-mode workspace +// that briefly reads as push will get its A2A request dispatched to the +// stored URL (or a 502 if no URL); a push-mode workspace that briefly +// reads as poll would get its request silently queued with no dispatch. +// The first failure is loud + recoverable; the second is silent. +// +// The function is intentionally lookup-only — it never mutates the row. +// The register handler (registry.go) is the only writer for delivery_mode. +// +// See #2339 PR 1 for the column + register-flow side; this is the +// proxy-side read used for the short-circuit in proxyA2ARequest. +func lookupDeliveryMode(ctx context.Context, workspaceID string) string { + var mode sql.NullString + err := db.DB.QueryRowContext(ctx, + `SELECT delivery_mode FROM workspaces WHERE id = $1`, workspaceID, + ).Scan(&mode) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + log.Printf("ProxyA2A: lookupDeliveryMode(%s) failed (%v) — defaulting to push", workspaceID, err) + } + return models.DeliveryModePush + } + if !mode.Valid || mode.String == "" { + return models.DeliveryModePush + } + if !models.IsValidDeliveryMode(mode.String) { + log.Printf("ProxyA2A: workspace %s has invalid delivery_mode=%q — defaulting to push", workspaceID, mode.String) + return models.DeliveryModePush + } + return mode.String +} + +// logA2AReceiveQueued records a poll-mode "queued" A2A receive into +// activity_logs. Same shape as logA2ASuccess but without ResponseBody +// (there is no response yet — the polling agent will produce one when +// it picks the request up). status="ok" because the request was +// successfully queued; the consume side reports its own outcome. 
+// +// The activity_logs row is what the polling agent's GET /activity?since_id= +// reads in PR 3 — that's how a poll-mode workspace receives inbound A2A +// without a public URL. +func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string) { + var wsName string + db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName) + if wsName == "" { + wsName = workspaceID + } + summary := a2aMethod + " → " + wsName + " (queued for poll)" + go func(parent context.Context) { + logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second) + defer cancel() + LogActivity(logCtx, h.broadcaster, ActivityParams{ + WorkspaceID: workspaceID, + ActivityType: "a2a_receive", + SourceID: nilIfEmpty(callerID), + TargetID: &workspaceID, + Method: &a2aMethod, + Summary: &summary, + RequestBody: json.RawMessage(body), + Status: "ok", + }) + }(ctx) +} + // readUsageMap extracts input_tokens / output_tokens from the "usage" key of m. // Returns (0, 0, false) when the key is absent or contains no non-zero values. func readUsageMap(m map[string]json.RawMessage) (inputTokens, outputTokens int64, ok bool) { diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index 1a33a866..9d0b6e28 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -11,6 +11,7 @@ import ( "net/http" "net/http/httptest" "os" + "strings" "testing" "time" @@ -1137,7 +1138,10 @@ func TestNormalizeA2APayload_PreservesExistingMessageId(t *testing.T) { } func TestNormalizeA2APayload_MissingMethodReturnsEmpty(t *testing.T) { - raw := []byte(`{"params":{"message":{"role":"user"}}}`) + // Method extraction returns empty string when method is absent, + // regardless of message validity. 
Include parts: [] so the v0.2→v0.3 + // compat check (#2345) doesn't reject before method extraction. + raw := []byte(`{"params":{"message":{"role":"user","parts":[]}}}`) _, method, perr := normalizeA2APayload(raw) if perr != nil { t.Fatalf("unexpected error: %+v", perr) @@ -1147,6 +1151,102 @@ func TestNormalizeA2APayload_MissingMethodReturnsEmpty(t *testing.T) { } } +// --- v0.2 → v0.3 compat shim (#2345) --- + +func TestNormalizeA2APayload_ConvertsV02StringContentToParts(t *testing.T) { + raw := []byte(`{"method":"message/send","params":{"message":{"role":"user","content":"hello world"}}}`) + out, _, perr := normalizeA2APayload(raw) + if perr != nil { + t.Fatalf("unexpected error: %+v", perr) + } + var parsed map[string]interface{} + if err := json.Unmarshal(out, &parsed); err != nil { + t.Fatalf("output not valid JSON: %v", err) + } + msg := parsed["params"].(map[string]interface{})["message"].(map[string]interface{}) + if _, stillHasContent := msg["content"]; stillHasContent { + t.Error("v0.2 'content' field should be removed after conversion") + } + parts, ok := msg["parts"].([]interface{}) + if !ok || len(parts) != 1 { + t.Fatalf("expected 1 part, got %v", msg["parts"]) + } + part := parts[0].(map[string]interface{}) + if part["kind"] != "text" || part["text"] != "hello world" { + t.Errorf("expected {kind:text, text:'hello world'}, got %v", part) + } +} + +func TestNormalizeA2APayload_ConvertsV02ListContentToParts(t *testing.T) { + raw := []byte(`{"method":"message/send","params":{"message":{"role":"user","content":[{"kind":"text","text":"hi"}]}}}`) + out, _, perr := normalizeA2APayload(raw) + if perr != nil { + t.Fatalf("unexpected error: %+v", perr) + } + var parsed map[string]interface{} + _ = json.Unmarshal(out, &parsed) + msg := parsed["params"].(map[string]interface{})["message"].(map[string]interface{}) + parts, ok := msg["parts"].([]interface{}) + if !ok || len(parts) != 1 { + t.Fatalf("expected list preserved as parts, got %v", msg["parts"]) + } +} 
+ +func TestNormalizeA2APayload_PreservesV03Parts(t *testing.T) { + raw := []byte(`{"method":"message/send","params":{"message":{"role":"user","parts":[{"kind":"text","text":"hi"}]}}}`) + out, _, perr := normalizeA2APayload(raw) + if perr != nil { + t.Fatalf("unexpected error: %+v", perr) + } + var parsed map[string]interface{} + _ = json.Unmarshal(out, &parsed) + msg := parsed["params"].(map[string]interface{})["message"].(map[string]interface{}) + if _, hasContent := msg["content"]; hasContent { + t.Error("did not expect content field in v0.3-shaped payload output") + } + parts := msg["parts"].([]interface{}) + if len(parts) != 1 { + t.Errorf("expected 1 part preserved, got %d", len(parts)) + } +} + +func TestNormalizeA2APayload_RejectsMessageWithNeitherContentNorParts(t *testing.T) { + raw := []byte(`{"method":"message/send","params":{"message":{"role":"user","metadata":{}}}}`) + _, _, perr := normalizeA2APayload(raw) + if perr == nil { + t.Fatal("expected error for message with neither content nor parts") + } + if perr.Status != http.StatusBadRequest { + t.Errorf("expected 400, got %d", perr.Status) + } + errMsg, _ := perr.Response["error"].(string) + if !strings.Contains(errMsg, "parts") || !strings.Contains(errMsg, "content") { + t.Errorf("error message should mention both 'parts' and 'content', got: %q", errMsg) + } +} + +func TestNormalizeA2APayload_RejectsContentWithUnsupportedType(t *testing.T) { + raw := []byte(`{"method":"message/send","params":{"message":{"role":"user","content":42}}}`) + _, _, perr := normalizeA2APayload(raw) + if perr == nil { + t.Fatal("expected error for non-string non-list content") + } + if perr.Status != http.StatusBadRequest { + t.Errorf("expected 400, got %d", perr.Status) + } +} + +func TestNormalizeA2APayload_NoMessageNoCheck(t *testing.T) { + raw := []byte(`{"method":"tasks/list","params":{}}`) + _, method, perr := normalizeA2APayload(raw) + if perr != nil { + t.Fatalf("unexpected error on params-message-absent payload: 
%+v", perr) + } + if method != "tasks/list" { + t.Errorf("expected method=tasks/list, got %q", method) + } +} + // --- resolveAgentURL direct unit tests --- func TestResolveAgentURL_CacheHit(t *testing.T) { @@ -1704,3 +1804,185 @@ func TestResolveAgentURL_HibernatedWorkspace_NullURLVariant(t *testing.T) { t.Errorf("unmet DB expectations: %v", err) } } + +// ==================== ProxyA2A — poll-mode short-circuit (#2339 PR 2) ==================== + +// TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch verifies the core +// invariant of #2339 PR 2: when delivery_mode=poll, ProxyA2A must NOT +// hit resolveAgentURL (which would SSRF-check or 502 on a missing URL) +// and must NOT dispatch over HTTP. It records the request to activity_logs +// and returns 200 {status:"queued"} instead. +// +// Without this short-circuit, the canvas chat fails for any workspace +// running molecule-mcp-claude-channel (operator's laptop, no public URL): +// resolveAgentURL would 502 on the missing URL and the polling agent +// would never see the inbound message. That's the bug PR 2 fixes. +func TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + const wsID = "ws-poll-shortcircuit" + + // Budget check still runs (above the short-circuit) — affirms the + // budget guard is mode-agnostic, which is correct: a poll-mode + // workspace shouldn't burn unmetered platform CPU/storage either. + expectBudgetCheck(mock, wsID) + + // lookupDeliveryMode SELECT — returns poll, triggering the short-circuit. + // Note: NO ExpectQuery for `SELECT url, status FROM workspaces` (that's + // resolveAgentURL's query) — the short-circuit must skip resolveAgentURL. + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs(wsID). 
+ WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll")) + + // Activity log: the queued receive (logA2AReceiveQueued in helpers.go). + mock.ExpectExec("INSERT INTO activity_logs"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: wsID}} + + body := `{"jsonrpc":"2.0","id":"poll-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.ProxyA2A(c) + + time.Sleep(50 * time.Millisecond) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200 (queued), got %d: %s", w.Code, w.Body.String()) + } + + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("response is not valid JSON: %v", err) + } + if resp["status"] != "queued" { + t.Errorf("response.status = %v, want %q", resp["status"], "queued") + } + if resp["delivery_mode"] != "poll" { + t.Errorf("response.delivery_mode = %v, want %q", resp["delivery_mode"], "poll") + } + if resp["method"] != "message/send" { + t.Errorf("response.method = %v, want %q (the JSON-RPC method that was queued)", resp["method"], "message/send") + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestProxyA2A_PushMode_NoShortCircuit verifies the symmetric contract: +// a push-mode workspace (default) is NOT affected by the new short-circuit. +// It still proceeds to resolveAgentURL + dispatch. Without this guard, a +// regression in lookupDeliveryMode could silently break the entire fleet. 
+func TestProxyA2A_PushMode_NoShortCircuit(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + allowLoopbackForTest(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + const wsID = "ws-push-default" + + dispatched := false + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + dispatched = true + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"jsonrpc":"2.0","id":"1","result":{"status":"ok"}}`) + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", wsID), agentServer.URL) + expectBudgetCheck(mock, wsID) + + // lookupDeliveryMode returns "push" — short-circuit must NOT fire. + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("push")) + + mock.ExpectExec("INSERT INTO activity_logs"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: wsID}} + + body := `{"jsonrpc":"2.0","id":"push-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.ProxyA2A(c) + + time.Sleep(50 * time.Millisecond) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200 (dispatched), got %d: %s", w.Code, w.Body.String()) + } + if !dispatched { + t.Error("push-mode workspace: expected the agent server to receive the request, but it did not") + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err == nil { + if resp["status"] == "queued" { + t.Error("push-mode response leaked queued envelope — short-circuit fired when it shouldn't have") + } + } + + if err := 
mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestProxyA2A_PollMode_FailsClosedToPush verifies the safety contract: +// a DB error reading delivery_mode must default to push (the existing +// behavior), NOT poll. Failing to push means a poll-mode workspace +// briefly attempts a real dispatch — visible failure (502 / SSRF +// rejection / restart cascade), not a silent drop into activity_logs +// where the agent might never look. Loud > silent, recoverable > lost. +func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) // empty Redis — forces resolveAgentURL DB lookup + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + const wsID = "ws-mode-db-error" + + expectBudgetCheck(mock, wsID) + + // lookupDeliveryMode hits a transient DB error → must default push. + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs(wsID). + WillReturnError(sql.ErrConnDone) + + // Push path proceeds to resolveAgentURL — empty result → 502 path. + mock.ExpectQuery("SELECT url, status FROM workspaces WHERE id ="). + WithArgs(wsID). 
+ WillReturnRows(sqlmock.NewRows([]string{"url", "status"})) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: wsID}} + + body := `{"jsonrpc":"2.0","id":"x","method":"message/send","params":{}}` + c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.ProxyA2A(c) + + if w.Code == http.StatusOK { + var resp map[string]interface{} + _ = json.Unmarshal(w.Body.Bytes(), &resp) + if resp["status"] == "queued" { + t.Errorf("DB error on delivery_mode lookup silently queued the request — must fail-closed-to-push, got body: %s", w.Body.String()) + } + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} diff --git a/workspace-server/internal/handlers/handlers_additional_test.go b/workspace-server/internal/handlers/handlers_additional_test.go index ca3df0de..7583493e 100644 --- a/workspace-server/internal/handlers/handlers_additional_test.go +++ b/workspace-server/internal/handlers/handlers_additional_test.go @@ -31,8 +31,9 @@ func TestWorkspaceCreate_WithParentID(t *testing.T) { parentID := "parent-ws-123" mock.ExpectBegin() // Default tier is 3 (Privileged) — see workspace.go create-handler comment. + // delivery_mode defaults to "push" when payload omits it (#2339). mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 3, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). + WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 3, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() mock.ExpectExec("INSERT INTO canvas_layouts"). 
@@ -66,8 +67,9 @@ func TestWorkspaceCreate_ExplicitClaudeCodeRuntime(t *testing.T) { handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) mock.ExpectBegin() + // delivery_mode defaults to "push" when payload omits it (#2339). mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). + WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() mock.ExpectExec("INSERT INTO canvas_layouts"). @@ -288,7 +290,7 @@ func TestWorkspaceCreate_MaxConcurrentTasksOverride(t *testing.T) { mock.ExpectBegin() mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Leader Agent", nil, 3, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 3). + WithArgs(sqlmock.AnyArg(), "Leader Agent", nil, 3, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 3, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() mock.ExpectExec("INSERT INTO canvas_layouts"). @@ -320,8 +322,13 @@ func TestRegister_ProvisionerURLPreserved(t *testing.T) { broadcaster := newTestBroadcaster() handler := NewRegistryHandler(broadcaster) + // resolveDeliveryMode preflight — no row yet, default push (#2339). + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs("ws-prov"). + WillReturnError(sql.ErrNoRows) + mock.ExpectExec("INSERT INTO workspaces"). - WithArgs("ws-prov", "ws-prov", "http://localhost:8000", `{"name":"agent"}`). + WithArgs("ws-prov", "ws-prov", "http://localhost:8000", `{"name":"agent"}`, "push"). 
WillReturnResult(sqlmock.NewResult(0, 1)) // DB returns provisioner URL (127.0.0.1) — should take precedence over agent-reported URL diff --git a/workspace-server/internal/handlers/handlers_test.go b/workspace-server/internal/handlers/handlers_test.go index a051f662..435cff37 100644 --- a/workspace-server/internal/handlers/handlers_test.go +++ b/workspace-server/internal/handlers/handlers_test.go @@ -2,6 +2,7 @@ package handlers import ( "bytes" + "database/sql" "encoding/json" "fmt" "net/http" @@ -100,9 +101,14 @@ func TestRegisterHandler(t *testing.T) { broadcaster := newTestBroadcaster() handler := NewRegistryHandler(broadcaster) + // resolveDeliveryMode preflight — no row yet, default push (#2339). + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs("ws-123"). + WillReturnError(sql.ErrNoRows) + // Expect the upsert INSERT ... ON CONFLICT mock.ExpectExec("INSERT INTO workspaces"). - WithArgs("ws-123", "ws-123", "http://localhost:8000", `{"name":"test"}`). + WithArgs("ws-123", "ws-123", "http://localhost:8000", `{"name":"test"}`, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) // Expect the SELECT url query (for cache URL logic) @@ -290,8 +296,9 @@ func TestWorkspaceCreate(t *testing.T) { // Expect workspace INSERT (uuid is dynamic, use AnyArg for id, runtime, awareness_namespace). // Default tier is 3 (Privileged) — see workspace.go create-handler comment. + // delivery_mode defaults to "push" when payload omits it (#2339). mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). + WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). 
WillReturnResult(sqlmock.NewResult(0, 1)) // Expect transaction commit (no secrets in this payload) diff --git a/workspace-server/internal/handlers/registry.go b/workspace-server/internal/handlers/registry.go index 8bd694b8..60fe4b7d 100644 --- a/workspace-server/internal/handlers/registry.go +++ b/workspace-server/internal/handlers/registry.go @@ -2,6 +2,7 @@ package handlers import ( "context" + "database/sql" "errors" "fmt" "log" @@ -116,6 +117,41 @@ func (h *RegistryHandler) SetQueueDrainFunc(f QueueDrainFunc) { // returned IP is checked against the blocklist. This closes the gap where // an attacker could register agent.example.com pointing to 169.254.169.254. // +// resolveDeliveryMode returns the EFFECTIVE delivery mode for a register +// call given the payload's explicit value (which may be empty) and the +// row's existing stored value (which may not exist yet on first +// registration). +// +// Resolution order: +// 1. payload value if non-empty (caller validated it's push/poll already) +// 2. existing row's delivery_mode if the row exists +// 3. "push" (the schema default — safe fallback for both new rows and +// a row whose delivery_mode is somehow NULL despite the NOT NULL +// CHECK constraint, which is forward-defensive only) +// +// Returns ("", err) only on a real DB error; sql.ErrNoRows is treated +// as "no row yet, default to push" — that's the first-register flow. +func (h *RegistryHandler) resolveDeliveryMode(ctx context.Context, workspaceID, payloadMode string) (string, error) { + if payloadMode != "" { + // Validated by IsValidDeliveryMode in the caller. 
+ return payloadMode, nil + } + var existing sql.NullString + err := db.DB.QueryRowContext(ctx, + `SELECT delivery_mode FROM workspaces WHERE id = $1`, workspaceID, + ).Scan(&existing) + if errors.Is(err, sql.ErrNoRows) { + return models.DeliveryModePush, nil + } + if err != nil { + return "", err + } + if existing.Valid && existing.String != "" { + return existing.String, nil + } + return models.DeliveryModePush, nil +} + // Returns a non-nil error suitable for including in a 400 Bad Request response. func validateAgentURL(rawURL string) error { if rawURL == "" { @@ -221,15 +257,11 @@ func (h *RegistryHandler) Register(c *gin.Context) { return } - // C6: reject SSRF-capable URLs before persisting or caching them. - if err := validateAgentURL(payload.URL); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"}) - return - } - - // C6: reject SSRF-capable URLs before persisting or caching them. - if err := validateAgentURL(payload.URL); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + // Validate explicit delivery_mode if the agent declared one; empty is + // allowed and resolves to the row's existing value (or "push" default) + // in the upsert below. See #2339 for the poll/push split rationale. + if payload.DeliveryMode != "" && !models.IsValidDeliveryMode(payload.DeliveryMode) { + c.JSON(http.StatusBadRequest, gin.H{"error": "delivery_mode must be 'push' or 'poll'"}) return } @@ -250,9 +282,60 @@ func (h *RegistryHandler) Register(c *gin.Context) { return // 401 response already written by requireWorkspaceToken } + // Resolve the EFFECTIVE delivery mode for THIS register call: the + // payload's explicit value wins; falling back to the existing row's + // stored value; falling back to push (the schema default). 
Done AFTER
+	// the C18 token check so a hijack attempt fails on auth before we
+	// reveal whether a workspace row exists at all (resolveDeliveryMode
+	// would otherwise side-channel that via timing). #2339.
+	effectiveMode, err := h.resolveDeliveryMode(ctx, payload.ID, payload.DeliveryMode)
+	if err != nil {
+		log.Printf("Registry register: resolveDeliveryMode failed for %s: %v", payload.ID, err)
+		c.JSON(http.StatusInternalServerError, gin.H{"error": "registration failed"})
+		return
+	}
+
+	// URL handling diverges by mode:
+	//   push: URL is required and must pass the SSRF safety check —
+	//   same as pre-#2339 behavior (the workspace must be reachable for
+	//   the proxy to dispatch).
+	//   poll: URL is optional and NOT validated, because the platform
+	//   never dispatches to it. Skipping validateAgentURL is intentional —
+	//   a poll-mode workspace doesn't need a publicly-routable URL, so a
+	//   localhost / private IP / missing URL is correct, not a
+	//   mis-configuration. NOTE(review): a poll-mode URL, when present,
+	//   is still persisted (urlForUpsert below) and cached (the
+	//   payload.URL != "" cache writes) without ever passing
+	//   validateAgentURL — it is not "ignored". A later re-register as
+	//   push does validate its own payload URL, but confirm the stale
+	//   unvalidated cached value can't be dispatched to in the interim.
+	if effectiveMode == models.DeliveryModePush {
+		if payload.URL == "" {
+			c.JSON(http.StatusBadRequest, gin.H{"error": "url is required for push-mode workspaces"})
+			return
+		}
+		if err := validateAgentURL(payload.URL); err != nil {
+			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
+			return
+		}
+	}
+
 	agentCardStr := string(payload.AgentCard)
 
-	// Upsert workspace: update url, agent_card, status if already exists.
+	// urlForUpsert: poll-mode workspaces don't need a URL. Empty input
+	// becomes NULL via sql.NullString so the row's URL stays clean (the
+	// CASE below also preserves an existing provisioner-set URL, which
+	// matters for hybrid setups where a workspace was previously push
+	// and is being re-registered as poll).
+	var urlForUpsert sql.NullString
+	if payload.URL != "" {
+		urlForUpsert = sql.NullString{String: payload.URL, Valid: true}
+	}
+
+	// modeForUpsert: empty payload value means "keep what's already on the
+	// row, or default to push for new rows". That resolution already
+	// happened in Go (resolveDeliveryMode above) — the SQL itself writes
+	// the value verbatim: $5 on the INSERT branch, EXCLUDED.delivery_mode
+	// on the UPDATE branch, no COALESCE needed. We pass effectiveMode so
+	// the row's mode is consistent with the URL-validation decision we
+	// just made.
+	modeForUpsert := effectiveMode
+
+	// Upsert workspace: update url, agent_card, status, delivery_mode if already exists.
 	// On INSERT (workspace not yet created via POST /workspaces), use ID as name placeholder.
 	// Keep existing URL if provisioner already set a host-accessible one (starts with http://127.0.0.1).
 	//
- _, err := db.DB.ExecContext(ctx, ` - INSERT INTO workspaces (id, name, url, agent_card, status, last_heartbeat_at) - VALUES ($1, $2, $3, $4::jsonb, 'online', now()) + _, err = db.DB.ExecContext(ctx, ` + INSERT INTO workspaces (id, name, url, agent_card, status, last_heartbeat_at, delivery_mode) + VALUES ($1, $2, $3, $4::jsonb, 'online', now(), $5) ON CONFLICT (id) DO UPDATE SET url = CASE WHEN workspaces.url LIKE 'http://127.0.0.1%' THEN workspaces.url @@ -272,9 +355,10 @@ func (h *RegistryHandler) Register(c *gin.Context) { agent_card = EXCLUDED.agent_card, status = 'online', last_heartbeat_at = now(), + delivery_mode = EXCLUDED.delivery_mode, updated_at = now() WHERE workspaces.status IS DISTINCT FROM 'removed' - `, payload.ID, payload.ID, payload.URL, agentCardStr) + `, payload.ID, payload.ID, urlForUpsert, agentCardStr, modeForUpsert) if err != nil { log.Printf("Registry register error: %v (id=%s)", err, payload.ID) c.JSON(http.StatusInternalServerError, gin.H{"error": "registration failed"}) @@ -289,6 +373,12 @@ func (h *RegistryHandler) Register(c *gin.Context) { // Cache URL — prefer existing provisioner URL over agent-reported one. // The DB CASE already preserves provisioner URLs, so read from DB as source of truth // instead of adding a Redis round-trip on every registration. + // + // Poll-mode workspaces typically have no URL at all; skip the cache + // writes entirely in that case so we don't poison the cache with an + // empty string that another caller might mistake for "registered with + // no URL" vs "not yet registered". The proxy short-circuits poll-mode + // before consulting the URL cache anyway (see #2339 PR 2). 
cachedURL := payload.URL var dbURL string if err := db.DB.QueryRowContext(ctx, `SELECT url FROM workspaces WHERE id = $1`, payload.ID).Scan(&dbURL); err == nil { @@ -296,20 +386,26 @@ func (h *RegistryHandler) Register(c *gin.Context) { cachedURL = dbURL } } - if err := db.CacheURL(ctx, payload.ID, cachedURL); err != nil { - log.Printf("Registry cache url error: %v", err) + if cachedURL != "" { + if err := db.CacheURL(ctx, payload.ID, cachedURL); err != nil { + log.Printf("Registry cache url error: %v", err) + } } // Cache agent-reported URL separately for workspace-to-workspace discovery - // (Docker containers can reach each other by hostname but not via host ports) - if err := db.CacheInternalURL(ctx, payload.ID, payload.URL); err != nil { - log.Printf("Registry cache internal url error: %v", err) + // (Docker containers can reach each other by hostname but not via host ports). + // Same skip-when-empty rule as above. + if payload.URL != "" { + if err := db.CacheInternalURL(ctx, payload.ID, payload.URL); err != nil { + log.Printf("Registry cache internal url error: %v", err) + } } // Broadcast WORKSPACE_ONLINE if err := h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_ONLINE", payload.ID, map[string]interface{}{ - "url": cachedURL, - "agent_card": payload.AgentCard, + "url": cachedURL, + "agent_card": payload.AgentCard, + "delivery_mode": effectiveMode, }); err != nil { log.Printf("Registry broadcast error: %v", err) } @@ -324,7 +420,7 @@ func (h *RegistryHandler) Register(c *gin.Context) { // Legacy workspaces that registered before tokens existed have no // live token; they bootstrap one here on their next register call. // New workspaces always pass through this path on their first boot. 
- response := gin.H{"status": "registered"} + response := gin.H{"status": "registered", "delivery_mode": effectiveMode} if hasLive, hasLiveErr := wsauth.HasAnyLiveToken(ctx, db.DB, payload.ID); hasLiveErr == nil && !hasLive { token, tokErr := wsauth.IssueToken(ctx, db.DB, payload.ID) if tokErr != nil { diff --git a/workspace-server/internal/handlers/registry_test.go b/workspace-server/internal/handlers/registry_test.go index 645dd924..5ad4fc62 100644 --- a/workspace-server/internal/handlers/registry_test.go +++ b/workspace-server/internal/handlers/registry_test.go @@ -61,9 +61,17 @@ func TestRegister_DBError(t *testing.T) { broadcaster := newTestBroadcaster() handler := NewRegistryHandler(broadcaster) + // resolveDeliveryMode SELECT — no row yet, so default "push". + // (#2339) New preflight after C18 token check; HasAnyLiveToken's COUNT + // query has no mock here and fails-open per requireWorkspaceToken's + // DB-error handling, so the next DB hit is this delivery_mode lookup. + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs("ws-fail"). + WillReturnError(sql.ErrNoRows) + // DB insert fails mock.ExpectExec("INSERT INTO workspaces"). - WithArgs("ws-fail", "ws-fail", "http://localhost:8000", `{"name":"test"}`). + WithArgs("ws-fail", "ws-fail", "http://localhost:8000", `{"name":"test"}`, "push"). WillReturnError(sql.ErrConnDone) w := httptest.NewRecorder() @@ -579,10 +587,14 @@ func TestRegister_GuardAgainstResurrectingRemovedRow(t *testing.T) { broadcaster := newTestBroadcaster() handler := NewRegistryHandler(broadcaster) + // resolveDeliveryMode preflight — no row yet, default push (#2339). + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs("ws-resurrect"). + WillReturnError(sql.ErrNoRows) // This regex-ish match requires the guard. If the handler ever drops // the clause the test fails because the emitted SQL won't match. mock.ExpectExec("ON CONFLICT.*WHERE workspaces.status IS DISTINCT FROM 'removed'"). 
- WithArgs("ws-resurrect", "ws-resurrect", "http://localhost:8000", `{"name":"x"}`). + WithArgs("ws-resurrect", "ws-resurrect", "http://localhost:8000", `{"name":"x"}`, "push"). WillReturnResult(sqlmock.NewResult(0, 0)) // 0 rows affected = correctly guarded mock.ExpectQuery("SELECT url FROM workspaces WHERE id"). WithArgs("ws-resurrect"). @@ -843,9 +855,14 @@ func TestRegister_C18_BootstrapAllowedNoTokens(t *testing.T) { WithArgs("ws-new"). WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + // resolveDeliveryMode — no row yet, default push (#2339). + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs("ws-new"). + WillReturnError(sql.ErrNoRows) + // Workspace upsert proceeds normally. mock.ExpectExec("INSERT INTO workspaces"). - WithArgs("ws-new", "ws-new", "http://localhost:9100", `{"name":"new-agent"}`). + WithArgs("ws-new", "ws-new", "http://localhost:9100", `{"name":"new-agent"}`, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectQuery("SELECT url FROM workspaces WHERE id"). @@ -910,6 +927,11 @@ func TestRegister_ReturnsPlatformInboundSecret_RFC2312_PRF(t *testing.T) { WithArgs(wsID). WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + // resolveDeliveryMode — no row yet, default push (#2339). + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs(wsID). + WillReturnError(sql.ErrNoRows) + // Workspace upsert. mock.ExpectExec("INSERT INTO workspaces"). WillReturnResult(sqlmock.NewResult(0, 1)) @@ -980,6 +1002,10 @@ func TestRegister_NoInboundSecret_OmitsField(t *testing.T) { mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). WithArgs(wsID). WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + // resolveDeliveryMode — no row yet, default push (#2339). + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs(wsID). 
+ WillReturnError(sql.ErrNoRows) mock.ExpectExec("INSERT INTO workspaces").WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectQuery("SELECT url FROM workspaces WHERE id"). WithArgs(wsID). @@ -1063,9 +1089,14 @@ func TestRegister_DBErrorResponseIsOpaque(t *testing.T) { WithArgs("ws-errtest"). WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + // resolveDeliveryMode — no row yet, default push (#2339). + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs("ws-errtest"). + WillReturnError(sql.ErrNoRows) + // DB upsert fails with a descriptive internal error. mock.ExpectExec("INSERT INTO workspaces"). - WithArgs("ws-errtest", "ws-errtest", "http://localhost:9200", `{"name":"err-agent"}`). + WithArgs("ws-errtest", "ws-errtest", "http://localhost:9200", `{"name":"err-agent"}`, "push"). WillReturnError(sql.ErrConnDone) w := httptest.NewRecorder() @@ -1283,3 +1314,211 @@ func TestHeartbeat_MonthlySpend_Zero_NoUpdate(t *testing.T) { t.Errorf("monthly_spend=0 must not trigger a DB write for spend: %v", err) } } + +// ==================== Register — delivery_mode (#2339) ==================== + +// TestRegister_PollMode_AcceptsEmptyURL verifies the new contract: +// when delivery_mode=poll, URL is optional. A poll-mode workspace +// (e.g. operator's laptop running molecule-mcp-claude-channel) has +// no public URL to register, and we must NOT reject the registration +// for that. The proxy short-circuits poll-mode A2A in PR 2 — no URL +// needed there either. +func TestRegister_PollMode_AcceptsEmptyURL(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + const wsID = "ws-poll-no-url" + + // Bootstrap path — no live tokens, so requireWorkspaceToken passes + // without an Authorization header. + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs(wsID). 
+ WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + + // resolveDeliveryMode: payload sets "poll" explicitly, so we should + // NOT hit the DB lookup at all (the helper short-circuits when + // payload value is non-empty). Asserted by the absence of an + // ExpectQuery for SELECT delivery_mode here. + + // Upsert MUST run with empty URL (sql.NullString) and delivery_mode=poll. + mock.ExpectExec("INSERT INTO workspaces"). + WithArgs(wsID, wsID, sql.NullString{}, `{"name":"poll-agent"}`, "poll"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // SELECT url for cache: returns NULL/empty for poll-mode rows. The + // handler skips the cache writes in that case (no CacheURL / + // CacheInternalURL expectations). + mock.ExpectQuery("SELECT url FROM workspaces WHERE id"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow("")) + + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // Token issuance — first-register path. + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + mock.ExpectExec("INSERT INTO workspace_auth_tokens"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`). + WithArgs(wsID). 
+ WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/registry/register", + bytes.NewBufferString(`{"id":"`+wsID+`","delivery_mode":"poll","agent_card":{"name":"poll-agent"}}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Register(c) + + if w.Code != http.StatusOK { + t.Fatalf("poll-mode + empty URL: expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("response is not valid JSON: %v", err) + } + if resp["delivery_mode"] != "poll" { + t.Errorf("response.delivery_mode = %v, want %q", resp["delivery_mode"], "poll") + } + // First-register must still mint a token regardless of delivery_mode. + if resp["auth_token"] == nil { + t.Error("expected auth_token in response (first-register path)") + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// TestRegister_PushMode_RejectsEmptyURL verifies the symmetric contract: +// push-mode (the default) still requires a URL. Skipping URL validation +// in poll-mode mustn't accidentally relax the push-mode invariant — that +// would silently break dispatch for the rest of the fleet. +func TestRegister_PushMode_RejectsEmptyURL(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + // Bootstrap path through requireWorkspaceToken. + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs("ws-push-no-url"). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + + // resolveDeliveryMode: no row yet, defaults to push. The handler + // then validates the URL — which is empty — and returns 400. + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). 
+ WithArgs("ws-push-no-url"). + WillReturnError(sql.ErrNoRows) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/registry/register", + bytes.NewBufferString(`{"id":"ws-push-no-url","agent_card":{"name":"push-agent"}}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Register(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("push-mode + empty URL: expected 400, got %d: %s", w.Code, w.Body.String()) + } + if !strings.Contains(w.Body.String(), "url is required") { + t.Errorf("expected 'url is required' in error body, got: %s", w.Body.String()) + } +} + +// TestRegister_InvalidDeliveryMode rejects payloads that declare an +// unrecognised delivery_mode — defends against a typo silently +// becoming "push" and leaving the operator wondering why polling +// doesn't work. +func TestRegister_InvalidDeliveryMode(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/registry/register", + bytes.NewBufferString(`{"id":"ws-x","url":"http://localhost:8000","agent_card":{"name":"a"},"delivery_mode":"webhook"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Register(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("invalid delivery_mode: expected 400, got %d: %s", w.Code, w.Body.String()) + } + if !strings.Contains(w.Body.String(), "delivery_mode") { + t.Errorf("expected error body to mention delivery_mode, got: %s", w.Body.String()) + } +} + +// TestRegister_PollMode_PreservesExistingValue: when the row already +// has delivery_mode=poll and the payload doesn't set it, the resolved +// mode should be poll — i.e. "absent payload mode" must NOT silently +// downgrade an existing poll workspace to push. Ensures Telegram-style +// stability: mode is sticky once set. 
+func TestRegister_PollMode_PreservesExistingValue(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + const wsID = "ws-existing-poll" + + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + + // resolveDeliveryMode: row exists with delivery_mode=poll. + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll")) + + // Upsert carries the resolved poll mode forward — even though + // payload didn't restate it. URL still empty (poll-mode shape). + mock.ExpectExec("INSERT INTO workspaces"). + WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery("SELECT url FROM workspaces WHERE id"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow("")) + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + mock.ExpectExec("INSERT INTO workspace_auth_tokens"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + // No delivery_mode in payload — must inherit "poll" from the row. 
+ c.Request = httptest.NewRequest("POST", "/registry/register", + bytes.NewBufferString(`{"id":"`+wsID+`","agent_card":{"name":"a"}}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Register(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + _ = json.Unmarshal(w.Body.Bytes(), &resp) + if resp["delivery_mode"] != "poll" { + t.Errorf("delivery_mode = %v, want %q (must inherit existing row's mode when payload absent)", + resp["delivery_mode"], "poll") + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} diff --git a/workspace-server/internal/handlers/workspace.go b/workspace-server/internal/handlers/workspace.go index 00d86b7e..ae70256b 100644 --- a/workspace-server/internal/handlers/workspace.go +++ b/workspace-server/internal/handlers/workspace.go @@ -224,11 +224,24 @@ func (h *WorkspaceHandler) Create(c *gin.Context) { if maxConcurrent <= 0 { maxConcurrent = models.DefaultMaxConcurrentTasks } - // Insert workspace with runtime persisted in DB (inside transaction) + // delivery_mode: explicit payload value (validated below), else default + // to push (the schema default + pre-#2339 behavior). Validated here, not + // in workspace_provision.go, so a bad value fails the create cleanly + // instead of mid-provision after side effects. 
+ deliveryMode := payload.DeliveryMode + if deliveryMode == "" { + deliveryMode = models.DeliveryModePush + } + if !models.IsValidDeliveryMode(deliveryMode) { + tx.Rollback() //nolint:errcheck + c.JSON(http.StatusBadRequest, gin.H{"error": "delivery_mode must be 'push' or 'poll'"}) + return + } + // Insert workspace with runtime + delivery_mode persisted in DB (inside transaction) _, err := tx.ExecContext(ctx, ` - INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit, max_concurrent_tasks) - VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10, $11) - `, id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit, maxConcurrent) + INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit, max_concurrent_tasks, delivery_mode) + VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10, $11, $12) + `, id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit, maxConcurrent, deliveryMode) if err != nil { tx.Rollback() //nolint:errcheck log.Printf("Create workspace error: %v", err) diff --git a/workspace-server/internal/handlers/workspace_budget_test.go b/workspace-server/internal/handlers/workspace_budget_test.go index b38e984b..920dad9c 100644 --- a/workspace-server/internal/handlers/workspace_budget_test.go +++ b/workspace-server/internal/handlers/workspace_budget_test.go @@ -152,6 +152,7 @@ func TestWorkspaceBudget_Create_WithLimit(t *testing.T) { "none", // workspace_access &budgetVal, // budget_limit ($10) models.DefaultMaxConcurrentTasks, // max_concurrent_tasks default + "push", // delivery_mode default (#2339) ). 
WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() diff --git a/workspace-server/internal/handlers/workspace_test.go b/workspace-server/internal/handlers/workspace_test.go index 43faa8f9..8afc93e8 100644 --- a/workspace-server/internal/handlers/workspace_test.go +++ b/workspace-server/internal/handlers/workspace_test.go @@ -155,7 +155,7 @@ func TestWorkspaceCreate_DBInsertError(t *testing.T) { // Transaction begins, workspace INSERT fails, transaction is rolled back. mock.ExpectBegin() mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Failing Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). + WithArgs(sqlmock.AnyArg(), "Failing Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnError(sql.ErrConnDone) mock.ExpectRollback() @@ -188,7 +188,7 @@ func TestWorkspaceCreate_DefaultsApplied(t *testing.T) { // Expect workspace INSERT with defaulted tier=3 (Privileged — the // handler default in workspace.go), runtime="langgraph" mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Default Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). + WithArgs(sqlmock.AnyArg(), "Default Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() @@ -239,7 +239,7 @@ func TestWorkspaceCreate_WithSecrets_Persists(t *testing.T) { mock.ExpectBegin() mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). 
+ WithArgs(sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) // Secret inserted inside the same transaction. mock.ExpectExec("INSERT INTO workspace_secrets"). @@ -1258,7 +1258,7 @@ runtime_config: mock.ExpectExec("INSERT INTO workspaces"). WithArgs( sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes", - sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). + sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() mock.ExpectExec("INSERT INTO canvas_layouts"). @@ -1315,7 +1315,7 @@ model: anthropic:claude-sonnet-4-5 mock.ExpectExec("INSERT INTO workspaces"). WithArgs( sqlmock.AnyArg(), "Legacy Agent", nil, 3, "langgraph", - sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). + sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() mock.ExpectExec("INSERT INTO canvas_layouts"). @@ -1368,7 +1368,7 @@ runtime_config: mock.ExpectExec("INSERT INTO workspaces"). WithArgs( sqlmock.AnyArg(), "Custom Hermes", nil, 3, "hermes", - sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). + sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() mock.ExpectExec("INSERT INTO canvas_layouts"). 
diff --git a/workspace-server/internal/models/architecture_test.go b/workspace-server/internal/models/architecture_test.go new file mode 100644 index 00000000..40b65ba6 --- /dev/null +++ b/workspace-server/internal/models/architecture_test.go @@ -0,0 +1,63 @@ +package models_test + +// Architecture test (#2344): models is a leaf — it carries pure type +// definitions and must not import any other internal/* package. Almost +// every package in workspace-server depends on models; if models grew a +// reverse dep, the import graph would cycle. +// +// If this test fails: you put behavior inside models. Move the behavior +// to whichever package actually owns it (handlers, provisioner, db, …) +// and have *that* package import models, not the reverse. + +import ( + "go/parser" + "go/token" + "os" + "path/filepath" + "strings" + "testing" +) + +const moduleInternalPrefix = "github.com/Molecule-AI/molecule-monorepo/platform/internal/" + +func TestModelsHasNoInternalDependencies(t *testing.T) { + t.Parallel() + for path, file := range listImports(t, ".") { + if strings.HasPrefix(path, moduleInternalPrefix) { + t.Errorf( + "models must not import other internal packages "+ + "(found %q in %s) — models is the pure-types leaf and any "+ + "reverse dep creates an import cycle since most packages "+ + "depend on models. 
See workspace-server/internal/models/architecture_test.go.", + path, file, + ) + } + } +} + +func listImports(t *testing.T, dir string) map[string]string { + t.Helper() + fset := token.NewFileSet() + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("read %s: %v", dir, err) + } + out := make(map[string]string) + for _, e := range entries { + name := e.Name() + if e.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") { + continue + } + f, err := parser.ParseFile(fset, filepath.Join(dir, name), nil, parser.ImportsOnly) + if err != nil { + t.Fatalf("parse %s: %v", name, err) + } + for _, imp := range f.Imports { + path := strings.Trim(imp.Path.Value, "\"") + if _, seen := out[path]; !seen { + out[path] = name + } + } + } + return out +} diff --git a/workspace-server/internal/models/workspace.go b/workspace-server/internal/models/workspace.go index e8850425..11284473 100644 --- a/workspace-server/internal/models/workspace.go +++ b/workspace-server/internal/models/workspace.go @@ -32,16 +32,42 @@ type Workspace struct { UptimeSeconds int `json:"uptime_seconds" db:"uptime_seconds"` CreatedAt time.Time `json:"created_at" db:"created_at"` UpdatedAt time.Time `json:"updated_at" db:"updated_at"` + // DeliveryMode: "push" (synchronous to URL — default) or "poll" (logged + // to activity_logs, agent reads via GET /activity?since_id=). See + // migration 045 + RFC #2339. + DeliveryMode string `json:"delivery_mode" db:"delivery_mode"` // Canvas layout fields (from JOIN) X float64 `json:"x"` Y float64 `json:"y"` Collapsed bool `json:"collapsed"` } +// Delivery mode constants. Matches the CHECK constraint in migration 045. +const ( + DeliveryModePush = "push" + DeliveryModePoll = "poll" +) + +// IsValidDeliveryMode reports whether s is one of the recognised +// delivery modes. Empty string is NOT valid here — callers must +// resolve the default ("push") before calling. 
+func IsValidDeliveryMode(s string) bool { + return s == DeliveryModePush || s == DeliveryModePoll +} + type RegisterPayload struct { - ID string `json:"id" binding:"required"` - URL string `json:"url" binding:"required"` - AgentCard json.RawMessage `json:"agent_card" binding:"required"` + ID string `json:"id" binding:"required"` + // URL is required for push-mode workspaces; optional / unused for + // poll-mode (the platform never dispatches to it). The handler + // enforces the conditional requirement based on the resolved + // delivery mode (payload value, falling back to the row's existing + // value, falling back to "push"). + URL string `json:"url"` + AgentCard json.RawMessage `json:"agent_card" binding:"required"` + // DeliveryMode is optional. Empty string means "keep the existing + // value on the workspace row, or default to push for new rows". + // When set, must be one of DeliveryModePush / DeliveryModePoll. + DeliveryMode string `json:"delivery_mode,omitempty"` } type HeartbeatPayload struct { @@ -127,7 +153,11 @@ type CreateWorkspacePayload struct { Model string `json:"model"` Runtime string `json:"runtime"` // "langgraph" (default), "claude-code", etc. External bool `json:"external"` // true = no Docker container, just a registered URL - URL string `json:"url"` // for external workspaces: the A2A endpoint URL + URL string `json:"url"` // for external workspaces: the A2A endpoint URL (push mode only — omit for poll) + // DeliveryMode: "push" (default) sends inbound A2A to URL synchronously; + // "poll" records inbound to activity_logs for the agent to consume via + // GET /activity?since_id=. Poll mode does not require a URL. See #2339. 
+ DeliveryMode string `json:"delivery_mode,omitempty"` WorkspaceDir string `json:"workspace_dir"` // host path to mount as /workspace (empty = isolated volume) WorkspaceAccess string `json:"workspace_access"` // "none" (default), "read_only", or "read_write" — see #65 ParentID *string `json:"parent_id"` diff --git a/workspace-server/internal/provisioner/architecture_test.go b/workspace-server/internal/provisioner/architecture_test.go new file mode 100644 index 00000000..c7455e52 --- /dev/null +++ b/workspace-server/internal/provisioner/architecture_test.go @@ -0,0 +1,80 @@ +package provisioner_test + +// Architecture test (#2344): provisioner is below handlers/router in +// the layer hierarchy. handlers wires provisioner into HTTP routes; +// the reverse direction (provisioner reaching back into handlers or +// the router) creates a cycle and tangles infra-orchestration with +// transport. +// +// Note: provisioner CURRENTLY imports db (for the runtime-image +// lookup). That's a known coupling — see PR #2276 review thread on +// where image resolution should live. The narrower rule we enforce +// here is "no upward import to handlers/router," which is the harder +// rule to keep clean. +// +// If this test fails: you reached "up" the stack. Pass whatever you +// need from handlers down through a constructor parameter or a +// function-typed callback instead of importing the package directly. 
+ +import ( + "go/parser" + "go/token" + "os" + "path/filepath" + "strings" + "testing" +) + +const moduleInternalPrefix = "github.com/Molecule-AI/molecule-monorepo/platform/internal/" + +var provisionerForbiddenImports = []string{ + moduleInternalPrefix + "handlers", + moduleInternalPrefix + "router", +} + +func TestProvisionerDoesNotImportUpstreamLayers(t *testing.T) { + t.Parallel() + imports := listImports(t, ".") + for path, file := range imports { + for _, forbidden := range provisionerForbiddenImports { + if path == forbidden || strings.HasPrefix(path, forbidden+"/") { + t.Errorf( + "provisioner must not import %q (found in %s) — "+ + "provisioner sits below handlers/router in the layer "+ + "hierarchy and a reverse dep creates a cycle. Pass "+ + "what you need down via constructor params or "+ + "function-typed callbacks. See workspace-server/internal/"+ + "provisioner/architecture_test.go.", + path, file, + ) + } + } + } +} + +func listImports(t *testing.T, dir string) map[string]string { + t.Helper() + fset := token.NewFileSet() + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("read %s: %v", dir, err) + } + out := make(map[string]string) + for _, e := range entries { + name := e.Name() + if e.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") { + continue + } + f, err := parser.ParseFile(fset, filepath.Join(dir, name), nil, parser.ImportsOnly) + if err != nil { + t.Fatalf("parse %s: %v", name, err) + } + for _, imp := range f.Imports { + path := strings.Trim(imp.Path.Value, "\"") + if _, seen := out[path]; !seen { + out[path] = name + } + } + } + return out +} diff --git a/workspace-server/internal/wsauth/architecture_test.go b/workspace-server/internal/wsauth/architecture_test.go new file mode 100644 index 00000000..c61e5b7e --- /dev/null +++ b/workspace-server/internal/wsauth/architecture_test.go @@ -0,0 +1,68 @@ +package wsauth_test + +// Architecture test (#2344): wsauth is a leaf package — it must not 
import +// any other internal/* package. The auth layer is below business logic; +// importing handlers, db, or any cousin package would force every wsauth +// test to spin up that subsystem, defeating the unit-test boundary that +// makes the auth code reviewable. +// +// If this test fails: you added an import that crosses a layer. Either +// move the dependency the other direction (consumer wires wsauth into +// itself), accept the boundary by inlining what you need, or — if the +// new coupling is genuinely correct — explicitly update this test with +// the new allowed import + a comment explaining why. + +import ( + "go/parser" + "go/token" + "os" + "path/filepath" + "strings" + "testing" +) + +const moduleInternalPrefix = "github.com/Molecule-AI/molecule-monorepo/platform/internal/" + +func TestWsauthHasNoInternalDependencies(t *testing.T) { + t.Parallel() + for path, file := range listImports(t, ".") { + if strings.HasPrefix(path, moduleInternalPrefix) { + t.Errorf( + "wsauth must not import other internal packages "+ + "(found %q in %s) — wsauth is the auth leaf and must stay "+ + "unit-testable without spinning up other subsystems. "+ + "See workspace-server/internal/wsauth/architecture_test.go for context.", + path, file, + ) + } + } +} + +// listImports returns import-path → first-file-where-seen for non-test +// .go files in dir. Used by every architecture_test.go in this tree. 
+func listImports(t *testing.T, dir string) map[string]string { + t.Helper() + fset := token.NewFileSet() + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("read %s: %v", dir, err) + } + out := make(map[string]string) + for _, e := range entries { + name := e.Name() + if e.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") { + continue + } + f, err := parser.ParseFile(fset, filepath.Join(dir, name), nil, parser.ImportsOnly) + if err != nil { + t.Fatalf("parse %s: %v", name, err) + } + for _, imp := range f.Imports { + path := strings.Trim(imp.Path.Value, "\"") + if _, seen := out[path]; !seen { + out[path] = name + } + } + } + return out +} diff --git a/workspace-server/migrations/045_workspaces_delivery_mode.down.sql b/workspace-server/migrations/045_workspaces_delivery_mode.down.sql new file mode 100644 index 00000000..2d21840e --- /dev/null +++ b/workspace-server/migrations/045_workspaces_delivery_mode.down.sql @@ -0,0 +1,8 @@ +-- 045_workspaces_delivery_mode.down.sql +-- +-- Drops the delivery_mode column. Any code reading it after rollback falls +-- back to push mode (the pre-#2339 behavior), so this is forward-only-safe +-- only if the matching application code is rolled back in the same release. + +ALTER TABLE workspaces + DROP COLUMN IF EXISTS delivery_mode; diff --git a/workspace-server/migrations/045_workspaces_delivery_mode.up.sql b/workspace-server/migrations/045_workspaces_delivery_mode.up.sql new file mode 100644 index 00000000..1eab5e84 --- /dev/null +++ b/workspace-server/migrations/045_workspaces_delivery_mode.up.sql @@ -0,0 +1,54 @@ +-- 045_workspaces_delivery_mode.up.sql +-- +-- Per-workspace declaration of how A2A traffic is delivered TO the workspace. +-- +-- push (default, today's behavior) +-- Platform synchronously POSTs to workspaces.url and surfaces the response +-- to the caller. Requires a publicly-routable URL (SSRF gate at +-- a2a_proxy.go:455). 
Used by all hosted runtimes (claude-code, hermes, +-- etc.) where the platform's provisioner sets the URL at boot. +-- +-- poll +-- Platform records the inbound A2A as an a2a_receive activity row and +-- returns 200 to the caller without dispatching. The agent client (e.g. +-- molecule-mcp-claude-channel) consumes the inbox via +-- GET /workspaces/:id/activity?since_id=… and replies via +-- POST /workspaces/:peer/a2a. NO URL required — works through every NAT, +-- firewall, and dev-laptop without a tunnel. +-- +-- Why a column and not a derived signal: +-- +-- * Mutual exclusivity matches Telegram's getUpdates / setWebhook +-- semantics — operationally cleaner than "both half-work because URL +-- is empty". Telegram explicitly rejects double-delivery; we now do +-- the same. +-- * The platform short-circuits BEFORE the SSRF check, so a poll-mode +-- workspace with a stale or missing URL never trips the silent-404 +-- failure mode that motivated #2339. +-- * Push-mode is the safe default: every existing workspace continues +-- to work exactly as before with no migration of behavior. +-- +-- Backwards compatibility: +-- +-- * NOT NULL with DEFAULT 'push' — the ALTER backfills existing rows. +-- * Push-mode workspaces are unchanged: SSRF check still gates dispatch, +-- activity logging unchanged. +-- * Poll-mode opt-in only via POST /workspaces (delivery_mode='poll') +-- or POST /registry/register with delivery_mode='poll'. Cannot be +-- toggled after the fact via heartbeat — flipping mode mid-life is +-- ambiguous (in-flight pushes vs queued polls), so an explicit +-- PATCH /workspaces/:id/delivery_mode endpoint will be added later +-- if the use case appears. +-- +-- Reverse plan: the .down.sql drops the column. Any short-circuit code +-- that reads delivery_mode would then hit a "column does not exist" +-- error — readers fall back to push mode (behaviour pre-2339), which is +-- the safe degradation. 
Acceptable for a forward-only schema; the down +-- exists for migration tooling parity, not as a recommended runtime path. + +ALTER TABLE workspaces + ADD COLUMN IF NOT EXISTS delivery_mode TEXT NOT NULL DEFAULT 'push' + CHECK (delivery_mode IN ('push', 'poll')); + +COMMENT ON COLUMN workspaces.delivery_mode IS + 'How inbound A2A is delivered: push (synchronous to workspaces.url) or poll (logged to activity_logs, agent reads via GET /activity?since_id=). See migration 045 + RFC #2339.';