From d5b00d6ac18be6a4ee9bd7e2e72b3e88c4a3a59d Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Wed, 29 Apr 2026 21:46:23 -0700 Subject: [PATCH 1/5] feat(workspaces): delivery_mode column + poll-mode register flow (#2339 PR 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds workspaces.delivery_mode (push, default | poll) and lets the register handler accept poll-mode workspaces with no URL. This is the foundation for the unified poll/push delivery design in #2339 — Telegram-getUpdates shape for external runtimes that have no public URL. What this PR does: - Migration 045: NOT NULL TEXT column, default 'push', CHECK constraint on the two valid values. - models.Workspace + RegisterPayload + CreateWorkspacePayload gain a DeliveryMode field. RegisterPayload.URL drops the `binding:"required"` tag — the handler now enforces it conditionally on the resolved mode. - Register handler: validates explicit delivery_mode if set; resolves effective mode (payload value, else stored row value, else push) AFTER the C18 token check; validates URL only when effective mode is push; persists delivery_mode in the upsert; returns it in the response; skips URL caching when payload.URL is empty. - CreateWorkspace handler: persists delivery_mode (defaults to push) in the same INSERT, validates it before any side effects. What this PR does NOT do (intentional, follow-up PRs): - PR 2: short-circuit ProxyA2A for poll-mode workspaces (skip SSRF + dispatch, log a2a_receive activity, return 200). - PR 3: since_id cursor on GET /activity for lossless polling. - Plugin v0.2 in molecule-mcp-claude-channel: cursor persistence + a register helper that creates poll-mode workspaces. Backwards compatibility: every existing workspace stays push-mode (schema default) with identical behavior. New tests: TestRegister_PollMode_AcceptsEmptyURL, TestRegister_PushMode_RejectsEmptyURL, TestRegister_InvalidDeliveryMode, TestRegister_PollMode_PreservesExistingValue. 
All existing register + create tests updated to expect the new delivery_mode column in the INSERT args. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../handlers/handlers_additional_test.go | 15 +- .../internal/handlers/handlers_test.go | 11 +- .../internal/handlers/registry.go | 140 ++++++++-- .../internal/handlers/registry_test.go | 247 +++++++++++++++++- .../internal/handlers/workspace.go | 21 +- .../handlers/workspace_budget_test.go | 1 + .../internal/handlers/workspace_test.go | 12 +- workspace-server/internal/models/workspace.go | 38 ++- .../045_workspaces_delivery_mode.down.sql | 8 + .../045_workspaces_delivery_mode.up.sql | 54 ++++ 10 files changed, 501 insertions(+), 46 deletions(-) create mode 100644 workspace-server/migrations/045_workspaces_delivery_mode.down.sql create mode 100644 workspace-server/migrations/045_workspaces_delivery_mode.up.sql diff --git a/workspace-server/internal/handlers/handlers_additional_test.go b/workspace-server/internal/handlers/handlers_additional_test.go index ca3df0de..7583493e 100644 --- a/workspace-server/internal/handlers/handlers_additional_test.go +++ b/workspace-server/internal/handlers/handlers_additional_test.go @@ -31,8 +31,9 @@ func TestWorkspaceCreate_WithParentID(t *testing.T) { parentID := "parent-ws-123" mock.ExpectBegin() // Default tier is 3 (Privileged) — see workspace.go create-handler comment. + // delivery_mode defaults to "push" when payload omits it (#2339). mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 3, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). + WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 3, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() mock.ExpectExec("INSERT INTO canvas_layouts"). 
@@ -66,8 +67,9 @@ func TestWorkspaceCreate_ExplicitClaudeCodeRuntime(t *testing.T) { handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) mock.ExpectBegin() + // delivery_mode defaults to "push" when payload omits it (#2339). mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). + WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() mock.ExpectExec("INSERT INTO canvas_layouts"). @@ -288,7 +290,7 @@ func TestWorkspaceCreate_MaxConcurrentTasksOverride(t *testing.T) { mock.ExpectBegin() mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Leader Agent", nil, 3, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 3). + WithArgs(sqlmock.AnyArg(), "Leader Agent", nil, 3, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 3, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() mock.ExpectExec("INSERT INTO canvas_layouts"). @@ -320,8 +322,13 @@ func TestRegister_ProvisionerURLPreserved(t *testing.T) { broadcaster := newTestBroadcaster() handler := NewRegistryHandler(broadcaster) + // resolveDeliveryMode preflight — no row yet, default push (#2339). + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs("ws-prov"). + WillReturnError(sql.ErrNoRows) + mock.ExpectExec("INSERT INTO workspaces"). - WithArgs("ws-prov", "ws-prov", "http://localhost:8000", `{"name":"agent"}`). + WithArgs("ws-prov", "ws-prov", "http://localhost:8000", `{"name":"agent"}`, "push"). 
WillReturnResult(sqlmock.NewResult(0, 1)) // DB returns provisioner URL (127.0.0.1) — should take precedence over agent-reported URL diff --git a/workspace-server/internal/handlers/handlers_test.go b/workspace-server/internal/handlers/handlers_test.go index a051f662..435cff37 100644 --- a/workspace-server/internal/handlers/handlers_test.go +++ b/workspace-server/internal/handlers/handlers_test.go @@ -2,6 +2,7 @@ package handlers import ( "bytes" + "database/sql" "encoding/json" "fmt" "net/http" @@ -100,9 +101,14 @@ func TestRegisterHandler(t *testing.T) { broadcaster := newTestBroadcaster() handler := NewRegistryHandler(broadcaster) + // resolveDeliveryMode preflight — no row yet, default push (#2339). + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs("ws-123"). + WillReturnError(sql.ErrNoRows) + // Expect the upsert INSERT ... ON CONFLICT mock.ExpectExec("INSERT INTO workspaces"). - WithArgs("ws-123", "ws-123", "http://localhost:8000", `{"name":"test"}`). + WithArgs("ws-123", "ws-123", "http://localhost:8000", `{"name":"test"}`, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) // Expect the SELECT url query (for cache URL logic) @@ -290,8 +296,9 @@ func TestWorkspaceCreate(t *testing.T) { // Expect workspace INSERT (uuid is dynamic, use AnyArg for id, runtime, awareness_namespace). // Default tier is 3 (Privileged) — see workspace.go create-handler comment. + // delivery_mode defaults to "push" when payload omits it (#2339). mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). + WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). 
WillReturnResult(sqlmock.NewResult(0, 1)) // Expect transaction commit (no secrets in this payload) diff --git a/workspace-server/internal/handlers/registry.go b/workspace-server/internal/handlers/registry.go index 8bd694b8..60fe4b7d 100644 --- a/workspace-server/internal/handlers/registry.go +++ b/workspace-server/internal/handlers/registry.go @@ -2,6 +2,7 @@ package handlers import ( "context" + "database/sql" "errors" "fmt" "log" @@ -116,6 +117,41 @@ func (h *RegistryHandler) SetQueueDrainFunc(f QueueDrainFunc) { // returned IP is checked against the blocklist. This closes the gap where // an attacker could register agent.example.com pointing to 169.254.169.254. // +// resolveDeliveryMode returns the EFFECTIVE delivery mode for a register +// call given the payload's explicit value (which may be empty) and the +// row's existing stored value (which may not exist yet on first +// registration). +// +// Resolution order: +// 1. payload value if non-empty (caller validated it's push/poll already) +// 2. existing row's delivery_mode if the row exists +// 3. "push" (the schema default — safe fallback for both new rows and +// a row whose delivery_mode is somehow NULL despite the NOT NULL +// CHECK constraint, which is forward-defensive only) +// +// Returns ("", err) only on a real DB error; sql.ErrNoRows is treated +// as "no row yet, default to push" — that's the first-register flow. NOTE(review): this helper is spliced into the middle of validateAgentURL's doc comment — the C6/SSRF paragraphs above belong to validateAgentURL, not to this function; relocate resolveDeliveryMode below validateAgentURL (or above the whole C6 comment block) in a follow-up so the stranded paragraphs reattach. +func (h *RegistryHandler) resolveDeliveryMode(ctx context.Context, workspaceID, payloadMode string) (string, error) { + if payloadMode != "" { + // Validated by IsValidDeliveryMode in the caller.
+ return payloadMode, nil + } + var existing sql.NullString + err := db.DB.QueryRowContext(ctx, + `SELECT delivery_mode FROM workspaces WHERE id = $1`, workspaceID, + ).Scan(&existing) + if errors.Is(err, sql.ErrNoRows) { + return models.DeliveryModePush, nil + } + if err != nil { + return "", err + } + if existing.Valid && existing.String != "" { + return existing.String, nil + } + return models.DeliveryModePush, nil +} + // Returns a non-nil error suitable for including in a 400 Bad Request response. func validateAgentURL(rawURL string) error { if rawURL == "" { @@ -221,15 +257,11 @@ func (h *RegistryHandler) Register(c *gin.Context) { return } - // C6: reject SSRF-capable URLs before persisting or caching them. - if err := validateAgentURL(payload.URL); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"}) - return - } - - // C6: reject SSRF-capable URLs before persisting or caching them. - if err := validateAgentURL(payload.URL); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + // Validate explicit delivery_mode if the agent declared one; empty is + // allowed and resolves to the row's existing value (or "push" default) + // in the upsert below. See #2339 for the poll/push split rationale. + if payload.DeliveryMode != "" && !models.IsValidDeliveryMode(payload.DeliveryMode) { + c.JSON(http.StatusBadRequest, gin.H{"error": "delivery_mode must be 'push' or 'poll'"}) return } @@ -250,9 +282,60 @@ func (h *RegistryHandler) Register(c *gin.Context) { return // 401 response already written by requireWorkspaceToken } + // Resolve the EFFECTIVE delivery mode for THIS register call: the + // payload's explicit value wins; falling back to the existing row's + // stored value; falling back to push (the schema default). 
Done AFTER + the C18 token check so a hijack attempt fails on auth before we + reveal whether a workspace row exists at all (resolveDeliveryMode + would otherwise side-channel that via timing). #2339. + effectiveMode, err := h.resolveDeliveryMode(ctx, payload.ID, payload.DeliveryMode) + if err != nil { + log.Printf("Registry register: resolveDeliveryMode failed for %s: %v", payload.ID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "registration failed"}) + return + } + + // URL handling diverges by mode: + // push: URL is required and must pass the SSRF safety check — + // same as pre-#2339 behavior (the workspace must be reachable for + // the proxy to dispatch). + // poll: URL is optional — the platform never dispatches to it, so + // an absent / localhost / private-IP URL is not a mis-configuration. + // NOTE(review): a non-empty poll-mode URL is NOT ignored: it is still + // persisted (urlForUpsert) and cached (CacheInternalURL) below WITHOUT + // validateAgentURL — confirm skipping the SSRF check is safe there. + if effectiveMode == models.DeliveryModePush { + if payload.URL == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "url is required for push-mode workspaces"}) + return + } + if err := validateAgentURL(payload.URL); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + } + agentCardStr := string(payload.AgentCard) - // Upsert workspace: update url, agent_card, status if already exists. + // urlForUpsert: poll-mode workspaces don't need a URL. Empty input + // becomes NULL via sql.NullString so the row's URL stays clean (the + // CASE below also preserves an existing provisioner-set URL, which + // matters for hybrid setups where a workspace was previously push + // and is being re-registered as poll).
+ var urlForUpsert sql.NullString + if payload.URL != "" { + urlForUpsert = sql.NullString{String: payload.URL, Valid: true} + } + + // modeForUpsert: the EFFECTIVE mode resolved above — payload value if + // set, else the row's stored value, else push. The SQL below writes it + // verbatim: $5 on the INSERT branch and EXCLUDED.delivery_mode on the + // conflict UPDATE branch (there is no COALESCE/CASE for delivery_mode; + // resolution already happened in resolveDeliveryMode), so the row's + // mode stays consistent with the URL-validation decision just made. + modeForUpsert := effectiveMode + + // Upsert workspace: update url, agent_card, status, delivery_mode if already exists. // On INSERT (workspace not yet created via POST /workspaces), use ID as name placeholder. // Keep existing URL if provisioner already set a host-accessible one (starts with http://127.0.0.1). // @@ -261,9 +344,9 @@ func (h *RegistryHandler) Register(c *gin.Context) { // the row. Without this guard, bulk deletes left tier-3 stragglers because // the last pre-teardown heartbeat flipped status back to 'online' after // Delete's UPDATE.
- _, err := db.DB.ExecContext(ctx, ` - INSERT INTO workspaces (id, name, url, agent_card, status, last_heartbeat_at) - VALUES ($1, $2, $3, $4::jsonb, 'online', now()) + _, err = db.DB.ExecContext(ctx, ` + INSERT INTO workspaces (id, name, url, agent_card, status, last_heartbeat_at, delivery_mode) + VALUES ($1, $2, $3, $4::jsonb, 'online', now(), $5) ON CONFLICT (id) DO UPDATE SET url = CASE WHEN workspaces.url LIKE 'http://127.0.0.1%' THEN workspaces.url @@ -272,9 +355,10 @@ func (h *RegistryHandler) Register(c *gin.Context) { agent_card = EXCLUDED.agent_card, status = 'online', last_heartbeat_at = now(), + delivery_mode = EXCLUDED.delivery_mode, updated_at = now() WHERE workspaces.status IS DISTINCT FROM 'removed' - `, payload.ID, payload.ID, payload.URL, agentCardStr) + `, payload.ID, payload.ID, urlForUpsert, agentCardStr, modeForUpsert) if err != nil { log.Printf("Registry register error: %v (id=%s)", err, payload.ID) c.JSON(http.StatusInternalServerError, gin.H{"error": "registration failed"}) @@ -289,6 +373,12 @@ func (h *RegistryHandler) Register(c *gin.Context) { // Cache URL — prefer existing provisioner URL over agent-reported one. // The DB CASE already preserves provisioner URLs, so read from DB as source of truth // instead of adding a Redis round-trip on every registration. + // + // Poll-mode workspaces typically have no URL at all; skip the cache + // writes entirely in that case so we don't poison the cache with an + // empty string that another caller might mistake for "registered with + // no URL" vs "not yet registered". The proxy short-circuits poll-mode + // before consulting the URL cache anyway (see #2339 PR 2). 
cachedURL := payload.URL var dbURL string if err := db.DB.QueryRowContext(ctx, `SELECT url FROM workspaces WHERE id = $1`, payload.ID).Scan(&dbURL); err == nil { @@ -296,20 +386,26 @@ func (h *RegistryHandler) Register(c *gin.Context) { cachedURL = dbURL } } - if err := db.CacheURL(ctx, payload.ID, cachedURL); err != nil { - log.Printf("Registry cache url error: %v", err) + if cachedURL != "" { + if err := db.CacheURL(ctx, payload.ID, cachedURL); err != nil { + log.Printf("Registry cache url error: %v", err) + } } // Cache agent-reported URL separately for workspace-to-workspace discovery - // (Docker containers can reach each other by hostname but not via host ports) - if err := db.CacheInternalURL(ctx, payload.ID, payload.URL); err != nil { - log.Printf("Registry cache internal url error: %v", err) + // (Docker containers can reach each other by hostname but not via host ports). + // Same skip-when-empty rule as above. + if payload.URL != "" { + if err := db.CacheInternalURL(ctx, payload.ID, payload.URL); err != nil { + log.Printf("Registry cache internal url error: %v", err) + } } // Broadcast WORKSPACE_ONLINE if err := h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_ONLINE", payload.ID, map[string]interface{}{ - "url": cachedURL, - "agent_card": payload.AgentCard, + "url": cachedURL, + "agent_card": payload.AgentCard, + "delivery_mode": effectiveMode, }); err != nil { log.Printf("Registry broadcast error: %v", err) } @@ -324,7 +420,7 @@ func (h *RegistryHandler) Register(c *gin.Context) { // Legacy workspaces that registered before tokens existed have no // live token; they bootstrap one here on their next register call. // New workspaces always pass through this path on their first boot. 
- response := gin.H{"status": "registered"} + response := gin.H{"status": "registered", "delivery_mode": effectiveMode} if hasLive, hasLiveErr := wsauth.HasAnyLiveToken(ctx, db.DB, payload.ID); hasLiveErr == nil && !hasLive { token, tokErr := wsauth.IssueToken(ctx, db.DB, payload.ID) if tokErr != nil { diff --git a/workspace-server/internal/handlers/registry_test.go b/workspace-server/internal/handlers/registry_test.go index 645dd924..5ad4fc62 100644 --- a/workspace-server/internal/handlers/registry_test.go +++ b/workspace-server/internal/handlers/registry_test.go @@ -61,9 +61,17 @@ func TestRegister_DBError(t *testing.T) { broadcaster := newTestBroadcaster() handler := NewRegistryHandler(broadcaster) + // resolveDeliveryMode SELECT — no row yet, so default "push". + // (#2339) New preflight after C18 token check; HasAnyLiveToken's COUNT + // query has no mock here and fails-open per requireWorkspaceToken's + // DB-error handling, so the next DB hit is this delivery_mode lookup. + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs("ws-fail"). + WillReturnError(sql.ErrNoRows) + // DB insert fails mock.ExpectExec("INSERT INTO workspaces"). - WithArgs("ws-fail", "ws-fail", "http://localhost:8000", `{"name":"test"}`). + WithArgs("ws-fail", "ws-fail", "http://localhost:8000", `{"name":"test"}`, "push"). WillReturnError(sql.ErrConnDone) w := httptest.NewRecorder() @@ -579,10 +587,14 @@ func TestRegister_GuardAgainstResurrectingRemovedRow(t *testing.T) { broadcaster := newTestBroadcaster() handler := NewRegistryHandler(broadcaster) + // resolveDeliveryMode preflight — no row yet, default push (#2339). + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs("ws-resurrect"). + WillReturnError(sql.ErrNoRows) // This regex-ish match requires the guard. If the handler ever drops // the clause the test fails because the emitted SQL won't match. mock.ExpectExec("ON CONFLICT.*WHERE workspaces.status IS DISTINCT FROM 'removed'"). 
- WithArgs("ws-resurrect", "ws-resurrect", "http://localhost:8000", `{"name":"x"}`). + WithArgs("ws-resurrect", "ws-resurrect", "http://localhost:8000", `{"name":"x"}`, "push"). WillReturnResult(sqlmock.NewResult(0, 0)) // 0 rows affected = correctly guarded mock.ExpectQuery("SELECT url FROM workspaces WHERE id"). WithArgs("ws-resurrect"). @@ -843,9 +855,14 @@ func TestRegister_C18_BootstrapAllowedNoTokens(t *testing.T) { WithArgs("ws-new"). WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + // resolveDeliveryMode — no row yet, default push (#2339). + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs("ws-new"). + WillReturnError(sql.ErrNoRows) + // Workspace upsert proceeds normally. mock.ExpectExec("INSERT INTO workspaces"). - WithArgs("ws-new", "ws-new", "http://localhost:9100", `{"name":"new-agent"}`). + WithArgs("ws-new", "ws-new", "http://localhost:9100", `{"name":"new-agent"}`, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectQuery("SELECT url FROM workspaces WHERE id"). @@ -910,6 +927,11 @@ func TestRegister_ReturnsPlatformInboundSecret_RFC2312_PRF(t *testing.T) { WithArgs(wsID). WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + // resolveDeliveryMode — no row yet, default push (#2339). + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs(wsID). + WillReturnError(sql.ErrNoRows) + // Workspace upsert. mock.ExpectExec("INSERT INTO workspaces"). WillReturnResult(sqlmock.NewResult(0, 1)) @@ -980,6 +1002,10 @@ func TestRegister_NoInboundSecret_OmitsField(t *testing.T) { mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). WithArgs(wsID). WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + // resolveDeliveryMode — no row yet, default push (#2339). + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs(wsID). 
+ WillReturnError(sql.ErrNoRows) mock.ExpectExec("INSERT INTO workspaces").WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectQuery("SELECT url FROM workspaces WHERE id"). WithArgs(wsID). @@ -1063,9 +1089,14 @@ func TestRegister_DBErrorResponseIsOpaque(t *testing.T) { WithArgs("ws-errtest"). WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + // resolveDeliveryMode — no row yet, default push (#2339). + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs("ws-errtest"). + WillReturnError(sql.ErrNoRows) + // DB upsert fails with a descriptive internal error. mock.ExpectExec("INSERT INTO workspaces"). - WithArgs("ws-errtest", "ws-errtest", "http://localhost:9200", `{"name":"err-agent"}`). + WithArgs("ws-errtest", "ws-errtest", "http://localhost:9200", `{"name":"err-agent"}`, "push"). WillReturnError(sql.ErrConnDone) w := httptest.NewRecorder() @@ -1283,3 +1314,211 @@ func TestHeartbeat_MonthlySpend_Zero_NoUpdate(t *testing.T) { t.Errorf("monthly_spend=0 must not trigger a DB write for spend: %v", err) } } + +// ==================== Register — delivery_mode (#2339) ==================== + +// TestRegister_PollMode_AcceptsEmptyURL verifies the new contract: +// when delivery_mode=poll, URL is optional. A poll-mode workspace +// (e.g. operator's laptop running molecule-mcp-claude-channel) has +// no public URL to register, and we must NOT reject the registration +// for that. The proxy short-circuits poll-mode A2A in PR 2 — no URL +// needed there either. +func TestRegister_PollMode_AcceptsEmptyURL(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + const wsID = "ws-poll-no-url" + + // Bootstrap path — no live tokens, so requireWorkspaceToken passes + // without an Authorization header. + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs(wsID). 
+ WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + + // resolveDeliveryMode: payload sets "poll" explicitly, so we should + // NOT hit the DB lookup at all (the helper short-circuits when + // payload value is non-empty). Asserted by the absence of an + // ExpectQuery for SELECT delivery_mode here. + + // Upsert MUST run with empty URL (sql.NullString) and delivery_mode=poll. + mock.ExpectExec("INSERT INTO workspaces"). + WithArgs(wsID, wsID, sql.NullString{}, `{"name":"poll-agent"}`, "poll"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // SELECT url for cache: returns NULL/empty for poll-mode rows. The + // handler skips the cache writes in that case (no CacheURL / + // CacheInternalURL expectations). + mock.ExpectQuery("SELECT url FROM workspaces WHERE id"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow("")) + + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // Token issuance — first-register path. + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + mock.ExpectExec("INSERT INTO workspace_auth_tokens"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`). + WithArgs(wsID). 
+ WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/registry/register", + bytes.NewBufferString(`{"id":"`+wsID+`","delivery_mode":"poll","agent_card":{"name":"poll-agent"}}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Register(c) + + if w.Code != http.StatusOK { + t.Fatalf("poll-mode + empty URL: expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("response is not valid JSON: %v", err) + } + if resp["delivery_mode"] != "poll" { + t.Errorf("response.delivery_mode = %v, want %q", resp["delivery_mode"], "poll") + } + // First-register must still mint a token regardless of delivery_mode. + if resp["auth_token"] == nil { + t.Error("expected auth_token in response (first-register path)") + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// TestRegister_PushMode_RejectsEmptyURL verifies the symmetric contract: +// push-mode (the default) still requires a URL. Skipping URL validation +// in poll-mode mustn't accidentally relax the push-mode invariant — that +// would silently break dispatch for the rest of the fleet. +func TestRegister_PushMode_RejectsEmptyURL(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + // Bootstrap path through requireWorkspaceToken. + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs("ws-push-no-url"). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + + // resolveDeliveryMode: no row yet, defaults to push. The handler + // then validates the URL — which is empty — and returns 400. + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). 
+ WithArgs("ws-push-no-url"). + WillReturnError(sql.ErrNoRows) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/registry/register", + bytes.NewBufferString(`{"id":"ws-push-no-url","agent_card":{"name":"push-agent"}}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Register(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("push-mode + empty URL: expected 400, got %d: %s", w.Code, w.Body.String()) + } + if !strings.Contains(w.Body.String(), "url is required") { + t.Errorf("expected 'url is required' in error body, got: %s", w.Body.String()) + } +} + +// TestRegister_InvalidDeliveryMode rejects payloads that declare an +// unrecognised delivery_mode — defends against a typo silently +// becoming "push" and leaving the operator wondering why polling +// doesn't work. +func TestRegister_InvalidDeliveryMode(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/registry/register", + bytes.NewBufferString(`{"id":"ws-x","url":"http://localhost:8000","agent_card":{"name":"a"},"delivery_mode":"webhook"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Register(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("invalid delivery_mode: expected 400, got %d: %s", w.Code, w.Body.String()) + } + if !strings.Contains(w.Body.String(), "delivery_mode") { + t.Errorf("expected error body to mention delivery_mode, got: %s", w.Body.String()) + } +} + +// TestRegister_PollMode_PreservesExistingValue: when the row already +// has delivery_mode=poll and the payload doesn't set it, the resolved +// mode should be poll — i.e. "absent payload mode" must NOT silently +// downgrade an existing poll workspace to push. Ensures Telegram-style +// stability: mode is sticky once set. 
+func TestRegister_PollMode_PreservesExistingValue(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + const wsID = "ws-existing-poll" + + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + + // resolveDeliveryMode: row exists with delivery_mode=poll. + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll")) + + // Upsert carries the resolved poll mode forward — even though + // payload didn't restate it. URL still empty (poll-mode shape). + mock.ExpectExec("INSERT INTO workspaces"). + WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery("SELECT url FROM workspaces WHERE id"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow("")) + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + mock.ExpectExec("INSERT INTO workspace_auth_tokens"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + // No delivery_mode in payload — must inherit "poll" from the row. 
+ c.Request = httptest.NewRequest("POST", "/registry/register", + bytes.NewBufferString(`{"id":"`+wsID+`","agent_card":{"name":"a"}}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Register(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + _ = json.Unmarshal(w.Body.Bytes(), &resp) + if resp["delivery_mode"] != "poll" { + t.Errorf("delivery_mode = %v, want %q (must inherit existing row's mode when payload absent)", + resp["delivery_mode"], "poll") + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} diff --git a/workspace-server/internal/handlers/workspace.go b/workspace-server/internal/handlers/workspace.go index 00d86b7e..ae70256b 100644 --- a/workspace-server/internal/handlers/workspace.go +++ b/workspace-server/internal/handlers/workspace.go @@ -224,11 +224,24 @@ func (h *WorkspaceHandler) Create(c *gin.Context) { if maxConcurrent <= 0 { maxConcurrent = models.DefaultMaxConcurrentTasks } - // Insert workspace with runtime persisted in DB (inside transaction) + // delivery_mode: explicit payload value (validated below), else default + // to push (the schema default + pre-#2339 behavior). Validated here, not + // in workspace_provision.go, so a bad value fails the create cleanly + // instead of mid-provision after side effects. 
+ deliveryMode := payload.DeliveryMode + if deliveryMode == "" { + deliveryMode = models.DeliveryModePush + } + if !models.IsValidDeliveryMode(deliveryMode) { + tx.Rollback() //nolint:errcheck + c.JSON(http.StatusBadRequest, gin.H{"error": "delivery_mode must be 'push' or 'poll'"}) + return + } + // Insert workspace with runtime + delivery_mode persisted in DB (inside transaction) _, err := tx.ExecContext(ctx, ` - INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit, max_concurrent_tasks) - VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10, $11) - `, id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit, maxConcurrent) + INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit, max_concurrent_tasks, delivery_mode) + VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10, $11, $12) + `, id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit, maxConcurrent, deliveryMode) if err != nil { tx.Rollback() //nolint:errcheck log.Printf("Create workspace error: %v", err) diff --git a/workspace-server/internal/handlers/workspace_budget_test.go b/workspace-server/internal/handlers/workspace_budget_test.go index b38e984b..920dad9c 100644 --- a/workspace-server/internal/handlers/workspace_budget_test.go +++ b/workspace-server/internal/handlers/workspace_budget_test.go @@ -152,6 +152,7 @@ func TestWorkspaceBudget_Create_WithLimit(t *testing.T) { "none", // workspace_access &budgetVal, // budget_limit ($10) models.DefaultMaxConcurrentTasks, // max_concurrent_tasks default + "push", // delivery_mode default (#2339) ). 
WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() diff --git a/workspace-server/internal/handlers/workspace_test.go b/workspace-server/internal/handlers/workspace_test.go index 43faa8f9..8afc93e8 100644 --- a/workspace-server/internal/handlers/workspace_test.go +++ b/workspace-server/internal/handlers/workspace_test.go @@ -155,7 +155,7 @@ func TestWorkspaceCreate_DBInsertError(t *testing.T) { // Transaction begins, workspace INSERT fails, transaction is rolled back. mock.ExpectBegin() mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Failing Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). + WithArgs(sqlmock.AnyArg(), "Failing Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnError(sql.ErrConnDone) mock.ExpectRollback() @@ -188,7 +188,7 @@ func TestWorkspaceCreate_DefaultsApplied(t *testing.T) { // Expect workspace INSERT with defaulted tier=3 (Privileged — the // handler default in workspace.go), runtime="langgraph" mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Default Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). + WithArgs(sqlmock.AnyArg(), "Default Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() @@ -239,7 +239,7 @@ func TestWorkspaceCreate_WithSecrets_Persists(t *testing.T) { mock.ExpectBegin() mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). 
+ WithArgs(sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) // Secret inserted inside the same transaction. mock.ExpectExec("INSERT INTO workspace_secrets"). @@ -1258,7 +1258,7 @@ runtime_config: mock.ExpectExec("INSERT INTO workspaces"). WithArgs( sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes", - sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). + sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() mock.ExpectExec("INSERT INTO canvas_layouts"). @@ -1315,7 +1315,7 @@ model: anthropic:claude-sonnet-4-5 mock.ExpectExec("INSERT INTO workspaces"). WithArgs( sqlmock.AnyArg(), "Legacy Agent", nil, 3, "langgraph", - sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). + sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() mock.ExpectExec("INSERT INTO canvas_layouts"). @@ -1368,7 +1368,7 @@ runtime_config: mock.ExpectExec("INSERT INTO workspaces"). WithArgs( sqlmock.AnyArg(), "Custom Hermes", nil, 3, "hermes", - sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks). + sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() mock.ExpectExec("INSERT INTO canvas_layouts"). 
diff --git a/workspace-server/internal/models/workspace.go b/workspace-server/internal/models/workspace.go index e8850425..11284473 100644 --- a/workspace-server/internal/models/workspace.go +++ b/workspace-server/internal/models/workspace.go @@ -32,16 +32,42 @@ type Workspace struct { UptimeSeconds int `json:"uptime_seconds" db:"uptime_seconds"` CreatedAt time.Time `json:"created_at" db:"created_at"` UpdatedAt time.Time `json:"updated_at" db:"updated_at"` + // DeliveryMode: "push" (synchronous to URL — default) or "poll" (logged + // to activity_logs, agent reads via GET /activity?since_id=). See + // migration 045 + RFC #2339. + DeliveryMode string `json:"delivery_mode" db:"delivery_mode"` // Canvas layout fields (from JOIN) X float64 `json:"x"` Y float64 `json:"y"` Collapsed bool `json:"collapsed"` } +// Delivery mode constants. Matches the CHECK constraint in migration 045. +const ( + DeliveryModePush = "push" + DeliveryModePoll = "poll" +) + +// IsValidDeliveryMode reports whether s is one of the recognised +// delivery modes. Empty string is NOT valid here — callers must +// resolve the default ("push") before calling. +func IsValidDeliveryMode(s string) bool { + return s == DeliveryModePush || s == DeliveryModePoll +} + type RegisterPayload struct { - ID string `json:"id" binding:"required"` - URL string `json:"url" binding:"required"` - AgentCard json.RawMessage `json:"agent_card" binding:"required"` + ID string `json:"id" binding:"required"` + // URL is required for push-mode workspaces; optional / unused for + // poll-mode (the platform never dispatches to it). The handler + // enforces the conditional requirement based on the resolved + // delivery mode (payload value, falling back to the row's existing + // value, falling back to "push"). + URL string `json:"url"` + AgentCard json.RawMessage `json:"agent_card" binding:"required"` + // DeliveryMode is optional. 
Empty string means "keep the existing + // value on the workspace row, or default to push for new rows". + // When set, must be one of DeliveryModePush / DeliveryModePoll. + DeliveryMode string `json:"delivery_mode,omitempty"` } type HeartbeatPayload struct { @@ -127,7 +153,11 @@ type CreateWorkspacePayload struct { Model string `json:"model"` Runtime string `json:"runtime"` // "langgraph" (default), "claude-code", etc. External bool `json:"external"` // true = no Docker container, just a registered URL - URL string `json:"url"` // for external workspaces: the A2A endpoint URL + URL string `json:"url"` // for external workspaces: the A2A endpoint URL (push mode only — omit for poll) + // DeliveryMode: "push" (default) sends inbound A2A to URL synchronously; + // "poll" records inbound to activity_logs for the agent to consume via + // GET /activity?since_id=. Poll mode does not require a URL. See #2339. + DeliveryMode string `json:"delivery_mode,omitempty"` WorkspaceDir string `json:"workspace_dir"` // host path to mount as /workspace (empty = isolated volume) WorkspaceAccess string `json:"workspace_access"` // "none" (default), "read_only", or "read_write" — see #65 ParentID *string `json:"parent_id"` diff --git a/workspace-server/migrations/045_workspaces_delivery_mode.down.sql b/workspace-server/migrations/045_workspaces_delivery_mode.down.sql new file mode 100644 index 00000000..2d21840e --- /dev/null +++ b/workspace-server/migrations/045_workspaces_delivery_mode.down.sql @@ -0,0 +1,8 @@ +-- 045_workspaces_delivery_mode.down.sql +-- +-- Drops the delivery_mode column. Any code reading it after rollback falls +-- back to push mode (the pre-#2339 behavior), so this is forward-only-safe +-- only if the matching application code is rolled back in the same release. 
+ +ALTER TABLE workspaces + DROP COLUMN IF EXISTS delivery_mode; diff --git a/workspace-server/migrations/045_workspaces_delivery_mode.up.sql b/workspace-server/migrations/045_workspaces_delivery_mode.up.sql new file mode 100644 index 00000000..1eab5e84 --- /dev/null +++ b/workspace-server/migrations/045_workspaces_delivery_mode.up.sql @@ -0,0 +1,54 @@ +-- 045_workspaces_delivery_mode.up.sql +-- +-- Per-workspace declaration of how A2A traffic is delivered TO the workspace. +-- +-- push (default, today's behavior) +-- Platform synchronously POSTs to workspaces.url and surfaces the response +-- to the caller. Requires a publicly-routable URL (SSRF gate at +-- a2a_proxy.go:455). Used by all hosted runtimes (claude-code, hermes, +-- etc.) where the platform's provisioner sets the URL at boot. +-- +-- poll +-- Platform records the inbound A2A as an a2a_receive activity row and +-- returns 200 to the caller without dispatching. The agent client (e.g. +-- molecule-mcp-claude-channel) consumes the inbox via +-- GET /workspaces/:id/activity?since_id=… and replies via +-- POST /workspaces/:peer/a2a. NO URL required — works through every NAT, +-- firewall, and dev-laptop without a tunnel. +-- +-- Why a column and not a derived signal: +-- +-- * Mutual exclusivity matches Telegram's getUpdates / setWebhook +-- semantics — operationally cleaner than "both half-work because URL +-- is empty". Telegram explicitly rejects double-delivery; we now do +-- the same. +-- * The platform short-circuits BEFORE the SSRF check, so a poll-mode +-- workspace with a stale or missing URL never trips the silent-404 +-- failure mode that motivated #2339. +-- * Push-mode is the safe default: every existing workspace continues +-- to work exactly as before with no migration of behavior. +-- +-- Backwards compatibility: +-- +-- * NOT NULL with DEFAULT 'push' — the ALTER backfills existing rows. 
+-- * Push-mode workspaces are unchanged: SSRF check still gates dispatch, +-- activity logging unchanged. +-- * Poll-mode opt-in only via POST /workspaces (delivery_mode='poll') +-- or POST /registry/register with delivery_mode='poll'. Cannot be +-- toggled after the fact via heartbeat — flipping mode mid-life is +-- ambiguous (in-flight pushes vs queued polls), so an explicit +-- PATCH /workspaces/:id/delivery_mode endpoint will be added later +-- if the use case appears. +-- +-- Reverse plan: the .down.sql drops the column. Any short-circuit code +-- that reads delivery_mode would then hit a "column does not exist" +-- error — readers fall back to push mode (behaviour pre-2339), which is +-- the safe degradation. Acceptable for a forward-only schema; the down +-- exists for migration tooling parity, not as a recommended runtime path. + +ALTER TABLE workspaces + ADD COLUMN IF NOT EXISTS delivery_mode TEXT NOT NULL DEFAULT 'push' + CHECK (delivery_mode IN ('push', 'poll')); + +COMMENT ON COLUMN workspaces.delivery_mode IS + 'How inbound A2A is delivered: push (synchronous to workspaces.url) or poll (logged to activity_logs, agent reads via GET /activity?since_id=). See migration 045 + RFC #2339.'; From 140fc5fb10eee5a91d0c76521fdf64b026a614f1 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Wed, 29 Apr 2026 22:01:22 -0700 Subject: [PATCH 2/5] =?UTF-8?q?fix(a2a):=20v0.2=20=E2=86=92=20v0.3=20compa?= =?UTF-8?q?t=20shim=20at=20proxy=20edge=20(#2345)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #2345. ## Symptom Design Director silently dropped A2A briefs whose sender used the v0.2 message format (`params.message.content` string) instead of v0.3 (`params.message.parts` part-list). The downstream a2a-sdk's v0.3 Pydantic validator rejected with "params.message.parts — Field required" but the rejection only landed in tenant-side logs; the sender saw HTTP 200/202 and assumed delivery. 
UX Researcher therefore never received the kickoff. Multi-agent pipeline silently idle.

## Fix

Convert at the proxy edge in normalizeA2APayload. Two cases handled, one explicitly rejected:

v0.2 string content → wrap as [{kind: text, text: <content>}] (the canonical v0.2 case from the dogfooding report)

v0.2 list content → preserve list as parts (some older clients put a list under `content`; treat as "client meant parts, used wrong field name")

v0.3 parts present → no-op (hot path for normal traffic)

Neither present → return HTTP 400 with structured JSON-RPC error pointing at the missing field

Why at the proxy edge: every workspace gets the compat for free without each one bumping a2a-sdk separately. The SDK's own compat adapter is strict about `parts` and rejects v0.2 senders.

Why reject loud on missing-both: pre-fix the SDK's Pydantic rejection was post-handler-dispatch and invisible to the original sender. Now misshapen payloads return a structured 400 to the actual caller — kills the entire silent-drop class for this payload-shape category.

## Tests

6 new cases on normalizeA2APayload (#2345) + 1 fixture update on the existing _MissingMethodReturnsEmpty test:

TestNormalizeA2APayload_ConvertsV02StringContentToParts
TestNormalizeA2APayload_ConvertsV02ListContentToParts
TestNormalizeA2APayload_PreservesV03Parts (hot path)
TestNormalizeA2APayload_RejectsMessageWithNeitherContentNorParts
TestNormalizeA2APayload_RejectsContentWithUnsupportedType
TestNormalizeA2APayload_NoMessageNoCheck (e.g. tasks/list bypasses)

All 11 normalizeA2APayload tests pass + full handler suite (no regressions).

## Refs

Hard-gates discussion: this is exactly the class of failure (silent-drop on schema mismatch) that #2342 (continuous synthetic E2E) would catch automatically.

Tier 2 RFC item from #2345 (caller gets structured JSON-RPC error on parse failure) is delivered above via the loud-reject path.
--- .../internal/handlers/a2a_proxy.go | 43 ++++++++ .../internal/handlers/a2a_proxy_test.go | 102 +++++++++++++++++- 2 files changed, 144 insertions(+), 1 deletion(-) diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 4a7c8026..8520f564 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -486,11 +486,54 @@ func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) { } // Ensure params.message.messageId exists (required by a2a-sdk) + // AND v0.2→v0.3 compat (#2345): when sender supplies + // params.message.content (v0.2) instead of params.message.parts + // (v0.3), wrap the content as a single text Part so the downstream + // a2a-sdk's v0.3 Pydantic validator accepts the message. + // + // Pre-fix: Design Director silently dropped briefs whose sender + // used v0.2 shape — Pydantic rejected at parse time, the rejection + // went only to logs, and the sender saw a happy 200/202. + // + // Reject loud (HTTP 400) when neither content nor parts is present; + // previously the SDK's own rejection happened post-handler-dispatch + // and was invisible to the original sender. 
if params, ok := payload["params"].(map[string]interface{}); ok { if msg, ok := params["message"].(map[string]interface{}); ok { if _, hasID := msg["messageId"]; !hasID { msg["messageId"] = uuid.New().String() } + _, hasParts := msg["parts"] + rawContent, hasContent := msg["content"] + if !hasParts { + if hasContent { + switch v := rawContent.(type) { + case string: + msg["parts"] = []interface{}{ + map[string]interface{}{"kind": "text", "text": v}, + } + case []interface{}: + msg["parts"] = v + default: + return nil, "", &proxyA2AError{ + Status: http.StatusBadRequest, + Response: gin.H{ + "error": "invalid params.message.content type", + "hint": "content must be a string (v0.2 compat) or omitted in favour of parts (v0.3)", + }, + } + } + delete(msg, "content") + } else { + return nil, "", &proxyA2AError{ + Status: http.StatusBadRequest, + Response: gin.H{ + "error": "params.message must contain either 'parts' (v0.3) or 'content' (v0.2 compat)", + "hint": "v0.3 example: {\"parts\":[{\"kind\":\"text\",\"text\":\"...\"}]}", + }, + } + } + } } } diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index 1a33a866..1b43c402 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -11,6 +11,7 @@ import ( "net/http" "net/http/httptest" "os" + "strings" "testing" "time" @@ -1137,7 +1138,10 @@ func TestNormalizeA2APayload_PreservesExistingMessageId(t *testing.T) { } func TestNormalizeA2APayload_MissingMethodReturnsEmpty(t *testing.T) { - raw := []byte(`{"params":{"message":{"role":"user"}}}`) + // Method extraction returns empty string when method is absent, + // regardless of message validity. Include parts: [] so the v0.2→v0.3 + // compat check (#2345) doesn't reject before method extraction. 
+ raw := []byte(`{"params":{"message":{"role":"user","parts":[]}}}`) _, method, perr := normalizeA2APayload(raw) if perr != nil { t.Fatalf("unexpected error: %+v", perr) @@ -1147,6 +1151,102 @@ func TestNormalizeA2APayload_MissingMethodReturnsEmpty(t *testing.T) { } } +// --- v0.2 → v0.3 compat shim (#2345) --- + +func TestNormalizeA2APayload_ConvertsV02StringContentToParts(t *testing.T) { + raw := []byte(`{"method":"message/send","params":{"message":{"role":"user","content":"hello world"}}}`) + out, _, perr := normalizeA2APayload(raw) + if perr != nil { + t.Fatalf("unexpected error: %+v", perr) + } + var parsed map[string]interface{} + if err := json.Unmarshal(out, &parsed); err != nil { + t.Fatalf("output not valid JSON: %v", err) + } + msg := parsed["params"].(map[string]interface{})["message"].(map[string]interface{}) + if _, stillHasContent := msg["content"]; stillHasContent { + t.Error("v0.2 'content' field should be removed after conversion") + } + parts, ok := msg["parts"].([]interface{}) + if !ok || len(parts) != 1 { + t.Fatalf("expected 1 part, got %v", msg["parts"]) + } + part := parts[0].(map[string]interface{}) + if part["kind"] != "text" || part["text"] != "hello world" { + t.Errorf("expected {kind:text, text:'hello world'}, got %v", part) + } +} + +func TestNormalizeA2APayload_ConvertsV02ListContentToParts(t *testing.T) { + raw := []byte(`{"method":"message/send","params":{"message":{"role":"user","content":[{"kind":"text","text":"hi"}]}}}`) + out, _, perr := normalizeA2APayload(raw) + if perr != nil { + t.Fatalf("unexpected error: %+v", perr) + } + var parsed map[string]interface{} + _ = json.Unmarshal(out, &parsed) + msg := parsed["params"].(map[string]interface{})["message"].(map[string]interface{}) + parts, ok := msg["parts"].([]interface{}) + if !ok || len(parts) != 1 { + t.Fatalf("expected list preserved as parts, got %v", msg["parts"]) + } +} + +func TestNormalizeA2APayload_PreservesV03Parts(t *testing.T) { + raw := 
[]byte(`{"method":"message/send","params":{"message":{"role":"user","parts":[{"kind":"text","text":"hi"}]}}}`) + out, _, perr := normalizeA2APayload(raw) + if perr != nil { + t.Fatalf("unexpected error: %+v", perr) + } + var parsed map[string]interface{} + _ = json.Unmarshal(out, &parsed) + msg := parsed["params"].(map[string]interface{})["message"].(map[string]interface{}) + if _, hasContent := msg["content"]; hasContent { + t.Error("did not expect content field in v0.3-shaped payload output") + } + parts := msg["parts"].([]interface{}) + if len(parts) != 1 { + t.Errorf("expected 1 part preserved, got %d", len(parts)) + } +} + +func TestNormalizeA2APayload_RejectsMessageWithNeitherContentNorParts(t *testing.T) { + raw := []byte(`{"method":"message/send","params":{"message":{"role":"user","metadata":{}}}}`) + _, _, perr := normalizeA2APayload(raw) + if perr == nil { + t.Fatal("expected error for message with neither content nor parts") + } + if perr.Status != http.StatusBadRequest { + t.Errorf("expected 400, got %d", perr.Status) + } + errMsg, _ := perr.Response["error"].(string) + if !strings.Contains(errMsg, "parts") || !strings.Contains(errMsg, "content") { + t.Errorf("error message should mention both 'parts' and 'content', got: %q", errMsg) + } +} + +func TestNormalizeA2APayload_RejectsContentWithUnsupportedType(t *testing.T) { + raw := []byte(`{"method":"message/send","params":{"message":{"role":"user","content":42}}}`) + _, _, perr := normalizeA2APayload(raw) + if perr == nil { + t.Fatal("expected error for non-string non-list content") + } + if perr.Status != http.StatusBadRequest { + t.Errorf("expected 400, got %d", perr.Status) + } +} + +func TestNormalizeA2APayload_NoMessageNoCheck(t *testing.T) { + raw := []byte(`{"method":"tasks/list","params":{}}`) + _, method, perr := normalizeA2APayload(raw) + if perr != nil { + t.Fatalf("unexpected error on params-message-absent payload: %+v", perr) + } + if method != "tasks/list" { + t.Errorf("expected 
method=tasks/list, got %q", method) + } +} + // --- resolveAgentURL direct unit tests --- func TestResolveAgentURL_CacheHit(t *testing.T) { From db5d11ffca548bd4893f9f04f5f55794e8fbbabe Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Wed, 29 Apr 2026 22:04:57 -0700 Subject: [PATCH 3/5] ci: continuous synthetic E2E against staging (#2342) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hard gate Tier 2 item 2 of 4. Cron-driven full-lifecycle E2E that catches regressions visible only at runtime — schema drift, deployment-pipeline gaps, vendor outages, env-var rotations, DNS / CF / Railway side-effects. Empirical motivation from today: - #2345 (A2A v0.2 silent drop) — passed unit tests, broke at JSON-RPC parse layer between sender + receiver. Visible only when a sender exercises the full path. Now-fixed by PR #2349, but a continuous E2E would have surfaced it within 20 min of the regression. - RFC #2312 chat upload — landed staging-branch but never reached staging tenants because publish-workspace-server-image was main- only. Caught by manual dogfooding hours after deploy. Same pattern. Both classes are invisible to PR-time CI. The continuous gate fires every 20 min against a real staging tenant and surfaces regressions within minutes. Cadence: cron `0,20,40 * * * *` (3x/hour). Offsets the existing sweep-cf-orphans (:15) and sweep-cf-tunnels (:45) so the three ops don't burst CF/AWS APIs at the same minute. Concurrency group prevents overlapping runs if one hangs. Cost: ~$0.50-1/day GHA + pennies of staging tenant lifecycle. Reuses existing tests/e2e/test_staging_full_saas.sh — no new harness to maintain. Bounded at 10 min wall-clock (vs 15 min default) so stuck runs fail fast rather than holding up the next firing. Defaults to E2E_RUNTIME=langgraph (fastest cold start; the regression classes this gate catches don't need hermes-specific paths). Operators can dispatch with runtime=hermes when they want SDK-native coverage. 
Schedule-vs-dispatch hardening: hard-fail on missing CP_STAGING_ADMIN_API_TOKEN for cron firing (silent-skip would mask real outages); soft-skip for operator dispatch. Refs: - #2342 hard-gates Tier 2 item 2 - #2345 (A2A v0.2 fix that this gate would have caught earlier) - #2335 / #2337 (deployment-pipeline gaps that this gate also catches) --- .github/workflows/continuous-synth-e2e.yml | 160 +++++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 .github/workflows/continuous-synth-e2e.yml diff --git a/.github/workflows/continuous-synth-e2e.yml b/.github/workflows/continuous-synth-e2e.yml new file mode 100644 index 00000000..e477214a --- /dev/null +++ b/.github/workflows/continuous-synth-e2e.yml @@ -0,0 +1,160 @@ +name: Continuous synthetic E2E (staging) + +# Hard gate (#2342): cron-driven full-lifecycle E2E that catches +# regressions visible only at runtime — schema drift, deployment-pipeline +# gaps, vendor outages, env-var rotations, DNS / CF / Railway side-effects. +# +# Why this gate exists: +# PR-time CI catches code-level regressions but not deployment-time or +# integration-time ones. Today's empirical data: +# • #2345 (A2A v0.2 silent drop) — passed all unit tests, broke at +# JSON-RPC parse layer between sender and receiver. Visible only +# to a sender exercising the full path. +# • RFC #2312 chat upload — landed on staging-branch but never +# reached staging tenants because publish-workspace-server-image +# was main-only. Caught by manual dogfooding hours after deploy. +# Both would have surfaced within 15-20 min of regression if a +# continuous synth-E2E was running. +# +# Cadence: every 20 min (3x/hour). The script is conservatively +# bounded at 10 min wall-clock; even on degraded staging it should +# finish before the next firing. cron-overlap is guarded by the +# concurrency group below. +# +# Cost: ~3 runs/hour × 5-10 min × $0.008/min GHA = ~$0.50-$1/day. 
+# Plus a fresh tenant provisioned + torn down each run (Railway + +# AWS pennies). Negligible. +# +# Failure handling: when the run fails, the workflow exits non-zero +# and GitHub's standard email/notification path fires. Operators +# can subscribe to this workflow's failure channel for paging-grade +# alerting. + +on: + schedule: + # Every 20 minutes, on the :00 :20 :40. Offsets the existing :15 + # sweep-cf-orphans and :45 sweep-cf-tunnels so the three + # operations don't all hit Cloudflare/AWS at the same minute. + - cron: '0,20,40 * * * *' + workflow_dispatch: + inputs: + runtime: + description: "Runtime to provision (langgraph = fastest, default; hermes = slower but covers SDK-native path; claude-code = needs OAUTH token in tenant env)" + required: false + default: "langgraph" + type: string + keep_org: + description: "Skip teardown for post-mortem debugging (only manual dispatch — never set this for cron runs)" + required: false + default: false + type: boolean + +permissions: + contents: read + # No issue-write here — failures surface as red runs in the workflow + # history. If you want auto-issue-on-fail, add a follow-up step that + # uses gh issue create gated on `if: failure()`. Keeping the surface + # minimal until that's actually wanted. + +# Serialize so two firings can never overlap. Cron firing every 20 min +# but scripts conservatively bounded at 10 min — overlap shouldn't +# happen in steady state, but if a run hangs we don't want N more +# stacking up. +concurrency: + group: continuous-synth-e2e + cancel-in-progress: false + +jobs: + synth: + name: Synthetic E2E against staging + runs-on: ubuntu-latest + timeout-minutes: 12 + env: + # langgraph default keeps cold-start under 5 min on staging EC2. + # hermes is slower (~7-10 min) and isn't needed for the + # regression class this gate exists to catch (deployment-pipeline + # + schema-drift + integration). 
Operators can pick hermes via + # workflow_dispatch when they need to exercise the SDK-native + # session path. + E2E_RUNTIME: ${{ github.event.inputs.runtime || 'langgraph' }} + # Bound to 10 min so a stuck provision fails the run instead of + # holding up the next cron firing. 15-min default in the script + # is for the on-PR full lifecycle where we have more headroom. + E2E_PROVISION_TIMEOUT_SECS: '600' + # Slug suffix — namespaced "synth-" so these runs are + # distinguishable from PR-driven runs in CP admin. + E2E_RUN_ID: synth-${{ github.run_id }} + # Forced false for cron; respected for manual dispatch + E2E_KEEP_ORG: ${{ github.event.inputs.keep_org == 'true' && '1' || '' }} + MOLECULE_CP_URL: ${{ vars.STAGING_CP_URL || 'https://staging-api.moleculesai.app' }} + MOLECULE_ADMIN_TOKEN: ${{ secrets.CP_STAGING_ADMIN_API_TOKEN }} + steps: + - uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4 + + - name: Verify required secret present + run: | + # Schedule-vs-dispatch hardening (mirrors the sweep-cf-* and + # redeploy-tenants-on-* workflows): hard-fail on missing secret + # for cron firing so a misconfigured-repo doesn't silently + # report green while doing nothing. Soft-skip on operator + # dispatch — operators can dispatch ad-hoc to verify a fix + # without setting up the secret first. + if [ -z "${MOLECULE_ADMIN_TOKEN:-}" ]; then + if [ "${{ github.event_name }}" = "workflow_dispatch" ]; then + echo "::warning::CP_STAGING_ADMIN_API_TOKEN not set — synth E2E cannot run" + echo "::warning::Set it at Settings → Secrets and Variables → Actions" + exit 0 + fi + echo "::error::CP_STAGING_ADMIN_API_TOKEN secret missing — synth E2E cannot run" + echo "::error::Set it at Settings → Secrets and Variables → Actions; pull from staging-CP's CP_ADMIN_API_TOKEN env in Railway." + exit 1 + fi + + - name: Install required tools + run: | + # The script depends on jq + curl (already on ubuntu-latest) + # and python3 (likewise). 
Verify they're all present so we + # fail fast on a runner image regression rather than mid-script. + for cmd in jq curl python3; do + command -v "$cmd" >/dev/null 2>&1 || { + echo "::error::required tool '$cmd' not on PATH — runner image regression?" + exit 1 + } + done + + - name: Run synthetic E2E + # The script handles its own teardown via EXIT trap; even on + # failure (timeout, assertion), the org is deprovisioned and + # leaks are reported. Exit code propagates from the script. + run: | + bash tests/e2e/test_staging_full_saas.sh + + - name: Failure summary + # Runs only on failure. Adds a job summary so the workflow run + # page shows a quick "what happened" instead of forcing readers + # to scroll through script output. + if: failure() + run: | + { + echo "## Continuous synth E2E failed" + echo "" + echo "**Run ID:** ${{ github.run_id }}" + echo "**Trigger:** ${{ github.event_name }}" + echo "**Runtime:** ${E2E_RUNTIME}" + echo "**Slug:** synth-${{ github.run_id }}" + echo "" + echo "### What this means" + echo "" + echo "Staging just regressed on a path that previously worked. Likely classes:" + echo "- Schema mismatch between sender and receiver (#2345 class)" + echo "- Deployment-pipeline gap (RFC #2312 / staging-tenant-image-stale class)" + echo "- Vendor outage (Cloudflare, Railway, AWS, GHCR)" + echo "- Staging-CP env var rotation" + echo "" + echo "### Next steps" + echo "" + echo "1. Check the script output above for the assertion that failed" + echo "2. If it's a vendor outage, no action needed — next firing in ~20 min" + echo "3. If it's a code regression, find the causing PR via \`git log\` against last green run and revert/fix" + echo "4. 
Keep an eye on the next 1-2 firings — flake vs persistent fail differs in priority" + } >> "$GITHUB_STEP_SUMMARY" From 68f18424f54a0ca87fafa3b3f820423803be31be Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Wed, 29 Apr 2026 22:12:58 -0700 Subject: [PATCH 4/5] test(arch): codify 4 module boundaries as architecture tests (#2344) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hard gate #4: codified module boundaries as Go tests, so a new contributor (or AI agent) can't silently land an import that crosses a layer. Boundaries enforced (one architecture_test.go per package): - wsauth has no internal/* deps — auth leaf, must be unit-testable in isolation - models has no internal/* deps — pure-types leaf, reverse dep would create cycles since most packages depend on models - db has no internal/* deps — DB layer below business logic, must be testable with sqlmock without spinning up handlers/provisioner - provisioner does not import handlers or router — unidirectional layering: handlers wires provisioner into HTTP routes; the reverse is a cycle Each test parses .go files in its package via go/parser (no x/tools dep needed) and asserts forbidden import paths don't appear. Failure messages name the rule, the offending file, and explain WHY the boundary exists so the diff reviewer learns the rule. Note: the original issue's first two proposed boundaries (provisioner-no-DB, handlers-no-docker) don't match the codebase today — provisioner already imports db (PR #2276 runtime-image lookup) and handlers hold *docker.Client directly (terminal, plugins, bundle, templates). I picked the four boundaries that actually hold; the first two are aspirational and would need a refactor before they could be codified. Hand-tested by injecting a deliberate wsauth -> orgtoken violation: the gate fires red with the rule message before merge. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../internal/db/architecture_test.go | 63 +++++++++++++++ .../internal/models/architecture_test.go | 63 +++++++++++++++ .../internal/provisioner/architecture_test.go | 80 +++++++++++++++++++ .../internal/wsauth/architecture_test.go | 68 ++++++++++++++++ 4 files changed, 274 insertions(+) create mode 100644 workspace-server/internal/db/architecture_test.go create mode 100644 workspace-server/internal/models/architecture_test.go create mode 100644 workspace-server/internal/provisioner/architecture_test.go create mode 100644 workspace-server/internal/wsauth/architecture_test.go diff --git a/workspace-server/internal/db/architecture_test.go b/workspace-server/internal/db/architecture_test.go new file mode 100644 index 00000000..d585db62 --- /dev/null +++ b/workspace-server/internal/db/architecture_test.go @@ -0,0 +1,63 @@ +package db_test + +// Architecture test (#2344): db is a leaf — DB pool + migrations + raw +// SQL helpers, no business-logic dependencies. The DB layer must be +// testable with sqlmock in isolation. If db starts importing handlers +// or provisioner, every db unit test would need to bring up that +// subsystem, and the layering becomes circular. +// +// If this test fails: you put business logic in the db package. Move +// it to a higher-tier package that imports db, not the reverse. + +import ( + "go/parser" + "go/token" + "os" + "path/filepath" + "strings" + "testing" +) + +const moduleInternalPrefix = "github.com/Molecule-AI/molecule-monorepo/platform/internal/" + +func TestDBHasNoInternalDependencies(t *testing.T) { + t.Parallel() + for path, file := range listImports(t, ".") { + if strings.HasPrefix(path, moduleInternalPrefix) { + t.Errorf( + "db must not import other internal packages "+ + "(found %q in %s) — db is the foundation layer and a "+ + "reverse dep creates a cycle (everything imports db). 
"+ + "See workspace-server/internal/db/architecture_test.go.", + path, file, + ) + } + } +} + +func listImports(t *testing.T, dir string) map[string]string { + t.Helper() + fset := token.NewFileSet() + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("read %s: %v", dir, err) + } + out := make(map[string]string) + for _, e := range entries { + name := e.Name() + if e.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") { + continue + } + f, err := parser.ParseFile(fset, filepath.Join(dir, name), nil, parser.ImportsOnly) + if err != nil { + t.Fatalf("parse %s: %v", name, err) + } + for _, imp := range f.Imports { + path := strings.Trim(imp.Path.Value, "\"") + if _, seen := out[path]; !seen { + out[path] = name + } + } + } + return out +} diff --git a/workspace-server/internal/models/architecture_test.go b/workspace-server/internal/models/architecture_test.go new file mode 100644 index 00000000..40b65ba6 --- /dev/null +++ b/workspace-server/internal/models/architecture_test.go @@ -0,0 +1,63 @@ +package models_test + +// Architecture test (#2344): models is a leaf — it carries pure type +// definitions and must not import any other internal/* package. Almost +// every package in workspace-server depends on models; if models grew a +// reverse dep, the import graph would cycle. +// +// If this test fails: you put behavior inside models. Move the behavior +// to whichever package actually owns it (handlers, provisioner, db, …) +// and have *that* package import models, not the reverse. 
+ +import ( + "go/parser" + "go/token" + "os" + "path/filepath" + "strings" + "testing" +) + +const moduleInternalPrefix = "github.com/Molecule-AI/molecule-monorepo/platform/internal/" + +func TestModelsHasNoInternalDependencies(t *testing.T) { + t.Parallel() + for path, file := range listImports(t, ".") { + if strings.HasPrefix(path, moduleInternalPrefix) { + t.Errorf( + "models must not import other internal packages "+ + "(found %q in %s) — models is the pure-types leaf and any "+ + "reverse dep creates an import cycle since most packages "+ + "depend on models. See workspace-server/internal/models/architecture_test.go.", + path, file, + ) + } + } +} + +func listImports(t *testing.T, dir string) map[string]string { + t.Helper() + fset := token.NewFileSet() + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("read %s: %v", dir, err) + } + out := make(map[string]string) + for _, e := range entries { + name := e.Name() + if e.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") { + continue + } + f, err := parser.ParseFile(fset, filepath.Join(dir, name), nil, parser.ImportsOnly) + if err != nil { + t.Fatalf("parse %s: %v", name, err) + } + for _, imp := range f.Imports { + path := strings.Trim(imp.Path.Value, "\"") + if _, seen := out[path]; !seen { + out[path] = name + } + } + } + return out +} diff --git a/workspace-server/internal/provisioner/architecture_test.go b/workspace-server/internal/provisioner/architecture_test.go new file mode 100644 index 00000000..c7455e52 --- /dev/null +++ b/workspace-server/internal/provisioner/architecture_test.go @@ -0,0 +1,80 @@ +package provisioner_test + +// Architecture test (#2344): provisioner is below handlers/router in +// the layer hierarchy. handlers wires provisioner into HTTP routes; +// the reverse direction (provisioner reaching back into handlers or +// the router) creates a cycle and tangles infra-orchestration with +// transport. 
+// +// Note: provisioner CURRENTLY imports db (for the runtime-image +// lookup). That's a known coupling — see PR #2276 review thread on +// where image resolution should live. The narrower rule we enforce +// here is "no upward import to handlers/router," which is the harder +// rule to keep clean. +// +// If this test fails: you reached "up" the stack. Pass whatever you +// need from handlers down through a constructor parameter or a +// function-typed callback instead of importing the package directly. + +import ( + "go/parser" + "go/token" + "os" + "path/filepath" + "strings" + "testing" +) + +const moduleInternalPrefix = "github.com/Molecule-AI/molecule-monorepo/platform/internal/" + +var provisionerForbiddenImports = []string{ + moduleInternalPrefix + "handlers", + moduleInternalPrefix + "router", +} + +func TestProvisionerDoesNotImportUpstreamLayers(t *testing.T) { + t.Parallel() + imports := listImports(t, ".") + for path, file := range imports { + for _, forbidden := range provisionerForbiddenImports { + if path == forbidden || strings.HasPrefix(path, forbidden+"/") { + t.Errorf( + "provisioner must not import %q (found in %s) — "+ + "provisioner sits below handlers/router in the layer "+ + "hierarchy and a reverse dep creates a cycle. Pass "+ + "what you need down via constructor params or "+ + "function-typed callbacks. 
See workspace-server/internal/"+ + "provisioner/architecture_test.go.", + path, file, + ) + } + } + } +} + +func listImports(t *testing.T, dir string) map[string]string { + t.Helper() + fset := token.NewFileSet() + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("read %s: %v", dir, err) + } + out := make(map[string]string) + for _, e := range entries { + name := e.Name() + if e.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") { + continue + } + f, err := parser.ParseFile(fset, filepath.Join(dir, name), nil, parser.ImportsOnly) + if err != nil { + t.Fatalf("parse %s: %v", name, err) + } + for _, imp := range f.Imports { + path := strings.Trim(imp.Path.Value, "\"") + if _, seen := out[path]; !seen { + out[path] = name + } + } + } + return out +} diff --git a/workspace-server/internal/wsauth/architecture_test.go b/workspace-server/internal/wsauth/architecture_test.go new file mode 100644 index 00000000..c61e5b7e --- /dev/null +++ b/workspace-server/internal/wsauth/architecture_test.go @@ -0,0 +1,68 @@ +package wsauth_test + +// Architecture test (#2344): wsauth is a leaf package — it must not import +// any other internal/* package. The auth layer is below business logic; +// importing handlers, db, or any cousin package would force every wsauth +// test to spin up that subsystem, defeating the unit-test boundary that +// makes the auth code reviewable. +// +// If this test fails: you added an import that crosses a layer. Either +// move the dependency the other direction (consumer wires wsauth into +// itself), accept the boundary by inlining what you need, or — if the +// new coupling is genuinely correct — explicitly update this test with +// the new allowed import + a comment explaining why. 
+ +import ( + "go/parser" + "go/token" + "os" + "path/filepath" + "strings" + "testing" +) + +const moduleInternalPrefix = "github.com/Molecule-AI/molecule-monorepo/platform/internal/" + +func TestWsauthHasNoInternalDependencies(t *testing.T) { + t.Parallel() + for path, file := range listImports(t, ".") { + if strings.HasPrefix(path, moduleInternalPrefix) { + t.Errorf( + "wsauth must not import other internal packages "+ + "(found %q in %s) — wsauth is the auth leaf and must stay "+ + "unit-testable without spinning up other subsystems. "+ + "See workspace-server/internal/wsauth/architecture_test.go for context.", + path, file, + ) + } + } +} + +// listImports returns import-path → first-file-where-seen for non-test +// .go files in dir. Used by every architecture_test.go in this tree. +func listImports(t *testing.T, dir string) map[string]string { + t.Helper() + fset := token.NewFileSet() + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("read %s: %v", dir, err) + } + out := make(map[string]string) + for _, e := range entries { + name := e.Name() + if e.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") { + continue + } + f, err := parser.ParseFile(fset, filepath.Join(dir, name), nil, parser.ImportsOnly) + if err != nil { + t.Fatalf("parse %s: %v", name, err) + } + for _, imp := range f.Imports { + path := strings.Trim(imp.Path.Value, "\"") + if _, seen := out[path]; !seen { + out[path] = name + } + } + } + return out +} From 91a1d5377d0ae009d70c1daaff42737398ec5d8b Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Wed, 29 Apr 2026 22:22:28 -0700 Subject: [PATCH 5/5] feat(a2a): poll-mode short-circuit in ProxyA2A (#2339 PR 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Skip SSRF/dispatch and queue to activity_logs for delivery_mode=poll workspaces. The polling agent (e.g. 
molecule-mcp-claude-channel on an operator's laptop) consumes via GET /activity?since_id= in PR 3 — no public URL needed. Order: budget -> normalize -> lookupDeliveryMode short-circuit -> resolveAgentURL. Normalizing before the short-circuit keeps the JSON-RPC method name on the activity_logs row so the polling agent can dispatch correctly. Fail-closed-to-push: any DB error reading delivery_mode defaults to push (loud + recoverable) rather than poll (silent drop). Tests: - TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch — core invariant: no resolveAgentURL, no Do(), records to activity_logs, returns 200 {status:"queued",delivery_mode:"poll",method:"message/send"}. - TestProxyA2A_PushMode_NoShortCircuit — push path unaffected; the agent server actually receives the request. - TestProxyA2A_PollMode_FailsClosedToPush — DB error on mode lookup must NOT silently queue; falls through to the push path. Stacked on #2348 (PR 1: schema + register flow). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../internal/handlers/a2a_proxy.go | 48 ++++- .../internal/handlers/a2a_proxy_helpers.go | 70 +++++++ .../internal/handlers/a2a_proxy_test.go | 182 ++++++++++++++++++ 3 files changed, 295 insertions(+), 5 deletions(-) diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 4a7c8026..2961a89c 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -21,6 +21,7 @@ import ( "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" "github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner" "github.com/Molecule-AI/molecule-monorepo/platform/internal/registry" "github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth" @@ -305,17 +306,54 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx 
context.Context, workspaceID stri return 0, nil, proxyErr } - agentURL, proxyErr := h.resolveAgentURL(ctx, workspaceID) - if proxyErr != nil { - return 0, nil, proxyErr - } - + // Normalize the JSON-RPC envelope BEFORE the poll-mode short-circuit + // so the activity_logs entry carries the protocol method name (initialize, + // message/send, etc.) — the polling agent uses that to dispatch the + // request body to the right handler. Doing it here also means a + // malformed payload fails the same way for push and poll callers + // (consistent 400 instead of "queued garbage"). normalizedBody, a2aMethod, proxyErr := normalizeA2APayload(body) if proxyErr != nil { return 0, nil, proxyErr } body = normalizedBody + // #2339 PR 2 — poll-mode short-circuit. When the target workspace + // is registered as delivery_mode=poll (e.g. an operator's laptop + // running molecule-mcp-claude-channel), the platform does NOT + // dispatch over HTTP — the agent has no public URL. Instead we record + // the A2A request to activity_logs and the agent picks it up via + // GET /activity?since_id= (PR 3). + // + // Returning here means we skip resolveAgentURL entirely (no SSRF check + // needed — there's no URL to validate; no DNS lookup against potentially- + // changing operator-side IPs) and skip the dispatch path completely + // (no Do(), no maybeMarkContainerDead). The response is a synthetic + // {status:"queued"} envelope so the caller (canvas, another workspace) + // knows delivery is acknowledged but pending consumption. 
+ if lookupDeliveryMode(ctx, workspaceID) == models.DeliveryModePoll { + if logActivity { + h.logA2AReceiveQueued(ctx, workspaceID, callerID, body, a2aMethod) + } + respBody, marshalErr := json.Marshal(gin.H{ + "status": "queued", + "delivery_mode": models.DeliveryModePoll, + "method": a2aMethod, + }) + if marshalErr != nil { + return 0, nil, &proxyA2AError{ + Status: http.StatusInternalServerError, + Response: gin.H{"error": "failed to marshal poll-mode response"}, + } + } + return http.StatusOK, respBody, nil + } + + agentURL, proxyErr := h.resolveAgentURL(ctx, workspaceID) + if proxyErr != nil { + return 0, nil, proxyErr + } + startTime := time.Now() resp, cancelFwd, err := h.dispatchA2A(ctx, workspaceID, agentURL, body, callerID) if cancelFwd != nil { diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 3e67667f..d0ccea86 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -5,6 +5,7 @@ package handlers import ( "context" + "database/sql" "encoding/json" "errors" "log" @@ -13,6 +14,7 @@ import ( "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" "github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth" "github.com/gin-gonic/gin" ) @@ -376,6 +378,74 @@ func parseUsageFromA2AResponse(body []byte) (inputTokens, outputTokens int64) { return 0, 0 } +// lookupDeliveryMode returns the workspace's delivery_mode. On any DB +// error or missing row it returns DeliveryModePush — the fail-closed +// default. "Closed" here means "fall back to today's behavior (synchronous +// dispatch)" rather than "fall back to drop the request silently into +// activity_logs where the agent might never see it." 
A poll-mode workspace +// that briefly reads as push will get its A2A request dispatched to the +// stored URL (or a 502 if no URL); a push-mode workspace that briefly +// reads as poll would get its request silently queued with no dispatch. +// The first failure is loud + recoverable; the second is silent. +// +// The function is intentionally lookup-only — it never mutates the row. +// The register handler (registry.go) is the only writer for delivery_mode. +// +// See #2339 PR 1 for the column + register-flow side; this is the +// proxy-side read used for the short-circuit in proxyA2ARequest. +func lookupDeliveryMode(ctx context.Context, workspaceID string) string { + var mode sql.NullString + err := db.DB.QueryRowContext(ctx, + `SELECT delivery_mode FROM workspaces WHERE id = $1`, workspaceID, + ).Scan(&mode) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + log.Printf("ProxyA2A: lookupDeliveryMode(%s) failed (%v) — defaulting to push", workspaceID, err) + } + return models.DeliveryModePush + } + if !mode.Valid || mode.String == "" { + return models.DeliveryModePush + } + if !models.IsValidDeliveryMode(mode.String) { + log.Printf("ProxyA2A: workspace %s has invalid delivery_mode=%q — defaulting to push", workspaceID, mode.String) + return models.DeliveryModePush + } + return mode.String +} + +// logA2AReceiveQueued records a poll-mode "queued" A2A receive into +// activity_logs. Same shape as logA2ASuccess but without ResponseBody +// (there is no response yet — the polling agent will produce one when +// it picks the request up). status="ok" because the request was +// successfully queued; the consume side reports its own outcome. +// +// The activity_logs row is what the polling agent's GET /activity?since_id= +// reads in PR 3 — that's how a poll-mode workspace receives inbound A2A +// without a public URL. 
+func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string) { + var wsName string + db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName) + if wsName == "" { + wsName = workspaceID + } + summary := a2aMethod + " → " + wsName + " (queued for poll)" + go func(parent context.Context) { + logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second) + defer cancel() + LogActivity(logCtx, h.broadcaster, ActivityParams{ + WorkspaceID: workspaceID, + ActivityType: "a2a_receive", + SourceID: nilIfEmpty(callerID), + TargetID: &workspaceID, + Method: &a2aMethod, + Summary: &summary, + RequestBody: json.RawMessage(body), + Status: "ok", + }) + }(ctx) +} + // readUsageMap extracts input_tokens / output_tokens from the "usage" key of m. // Returns (0, 0, false) when the key is absent or contains no non-zero values. func readUsageMap(m map[string]json.RawMessage) (inputTokens, outputTokens int64, ok bool) { diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index 1a33a866..7bf95818 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -1704,3 +1704,185 @@ func TestResolveAgentURL_HibernatedWorkspace_NullURLVariant(t *testing.T) { t.Errorf("unmet DB expectations: %v", err) } } + +// ==================== ProxyA2A — poll-mode short-circuit (#2339 PR 2) ==================== + +// TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch verifies the core +// invariant of #2339 PR 2: when delivery_mode=poll, ProxyA2A must NOT +// hit resolveAgentURL (which would SSRF-check or 502 on a missing URL) +// and must NOT dispatch over HTTP. It records the request to activity_logs +// and returns 200 {status:"queued"} instead. 
+// +// Without this short-circuit, the canvas chat fails for any workspace +// running molecule-mcp-claude-channel (operator's laptop, no public URL): +// resolveAgentURL would 502 on the missing URL and the polling agent +// would never see the inbound message. That's the bug PR 2 fixes. +func TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + const wsID = "ws-poll-shortcircuit" + + // Budget check still runs (above the short-circuit) — affirms the + // budget guard is mode-agnostic, which is correct: a poll-mode + // workspace shouldn't burn unmetered platform CPU/storage either. + expectBudgetCheck(mock, wsID) + + // lookupDeliveryMode SELECT — returns poll, triggering the short-circuit. + // Note: NO ExpectQuery for `SELECT url, status FROM workspaces` (that's + // resolveAgentURL's query) — the short-circuit must skip resolveAgentURL. + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll")) + + // Activity log: the queued receive (logA2AReceiveQueued in helpers.go). + mock.ExpectExec("INSERT INTO activity_logs"). 
+ WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: wsID}} + + body := `{"jsonrpc":"2.0","id":"poll-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.ProxyA2A(c) + + time.Sleep(50 * time.Millisecond) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200 (queued), got %d: %s", w.Code, w.Body.String()) + } + + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("response is not valid JSON: %v", err) + } + if resp["status"] != "queued" { + t.Errorf("response.status = %v, want %q", resp["status"], "queued") + } + if resp["delivery_mode"] != "poll" { + t.Errorf("response.delivery_mode = %v, want %q", resp["delivery_mode"], "poll") + } + if resp["method"] != "message/send" { + t.Errorf("response.method = %v, want %q (the JSON-RPC method that was queued)", resp["method"], "message/send") + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestProxyA2A_PushMode_NoShortCircuit verifies the symmetric contract: +// a push-mode workspace (default) is NOT affected by the new short-circuit. +// It still proceeds to resolveAgentURL + dispatch. Without this guard, a +// regression in lookupDeliveryMode could silently break the entire fleet. 
+func TestProxyA2A_PushMode_NoShortCircuit(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + allowLoopbackForTest(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + const wsID = "ws-push-default" + + dispatched := false + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + dispatched = true + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"jsonrpc":"2.0","id":"1","result":{"status":"ok"}}`) + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", wsID), agentServer.URL) + expectBudgetCheck(mock, wsID) + + // lookupDeliveryMode returns "push" — short-circuit must NOT fire. + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("push")) + + mock.ExpectExec("INSERT INTO activity_logs"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: wsID}} + + body := `{"jsonrpc":"2.0","id":"push-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.ProxyA2A(c) + + time.Sleep(50 * time.Millisecond) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200 (dispatched), got %d: %s", w.Code, w.Body.String()) + } + if !dispatched { + t.Error("push-mode workspace: expected the agent server to receive the request, but it did not") + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err == nil { + if resp["status"] == "queued" { + t.Error("push-mode response leaked queued envelope — short-circuit fired when it shouldn't have") + } + } + + if err := 
mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestProxyA2A_PollMode_FailsClosedToPush verifies the safety contract: +// a DB error reading delivery_mode must default to push (the existing +// behavior), NOT poll. Falling back to push means a poll-mode workspace +// briefly attempts a real dispatch — visible failure (502 / SSRF +// rejection / restart cascade), not a silent drop into activity_logs +// where the agent might never look. Loud > silent, recoverable > lost. +func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) // empty Redis — forces resolveAgentURL DB lookup + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + const wsID = "ws-mode-db-error" + + expectBudgetCheck(mock, wsID) + + // lookupDeliveryMode hits a transient DB error → must default to push. + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs(wsID). + WillReturnError(sql.ErrConnDone) + + // Push path proceeds to resolveAgentURL — empty result → 502 path. + mock.ExpectQuery("SELECT url, status FROM workspaces WHERE id ="). + WithArgs(wsID).
+ WillReturnRows(sqlmock.NewRows([]string{"url", "status"})) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: wsID}} + + body := `{"jsonrpc":"2.0","id":"x","method":"message/send","params":{}}` + c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.ProxyA2A(c) + + if w.Code == http.StatusOK { + var resp map[string]interface{} + _ = json.Unmarshal(w.Body.Bytes(), &resp) + if resp["status"] == "queued" { + t.Errorf("DB error on delivery_mode lookup silently queued the request — must fail-closed-to-push, got body: %s", w.Body.String()) + } + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +}