From 91a1d5377d0ae009d70c1daaff42737398ec5d8b Mon Sep 17 00:00:00 2001
From: Hongming Wang
Date: Wed, 29 Apr 2026 22:22:28 -0700
Subject: [PATCH] feat(a2a): poll-mode short-circuit in ProxyA2A (#2339 PR 2)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Skip SSRF/dispatch and queue to activity_logs for delivery_mode=poll
workspaces. The polling agent (e.g. molecule-mcp-claude-channel on an
operator's laptop) consumes via GET /activity?since_id= in PR 3 — no
public URL needed.

Order: budget -> normalize -> lookupDeliveryMode short-circuit ->
resolveAgentURL. Normalizing before the short-circuit keeps the JSON-RPC
method name on the activity_logs row so the polling agent can dispatch
correctly.

Fail-closed-to-push: any DB error reading delivery_mode defaults to push
(loud + recoverable) rather than poll (silent drop).

The poll-mode queue write is unconditional: the activity_logs row is the
delivery itself, so it is not gated on logActivity.

Tests:
- TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch — core invariant:
  no resolveAgentURL, no Do(), records to activity_logs, returns 200
  {status:"queued",delivery_mode:"poll",method:"message/send"}.
- TestProxyA2A_PushMode_NoShortCircuit — push path unaffected; the agent
  server actually receives the request.
- TestProxyA2A_PollMode_FailsClosedToPush — DB error on mode lookup must
  NOT silently queue; falls through to the push path.

Stacked on #2348 (PR 1: schema + register flow).

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 .../internal/handlers/a2a_proxy.go            |  48 ++++-
 .../internal/handlers/a2a_proxy_helpers.go    |  70 +++++++
 .../internal/handlers/a2a_proxy_test.go       | 182 ++++++++++++++++++
 3 files changed, 295 insertions(+), 5 deletions(-)

diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go
index 4a7c8026..2961a89c 100644
--- a/workspace-server/internal/handlers/a2a_proxy.go
+++ b/workspace-server/internal/handlers/a2a_proxy.go
@@ -21,6 +21,7 @@ import (
 
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
@@ -305,17 +306,54 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
 		return 0, nil, proxyErr
 	}
 
-	agentURL, proxyErr := h.resolveAgentURL(ctx, workspaceID)
-	if proxyErr != nil {
-		return 0, nil, proxyErr
-	}
-
+	// Normalize the JSON-RPC envelope BEFORE the poll-mode short-circuit
+	// so the activity_logs entry carries the protocol method name (initialize,
+	// message/send, etc.) — the polling agent uses that to dispatch the
+	// request body to the right handler. Doing it here also means a
+	// malformed payload fails the same way for push and poll callers
+	// (consistent 400 instead of "queued garbage").
 	normalizedBody, a2aMethod, proxyErr := normalizeA2APayload(body)
 	if proxyErr != nil {
 		return 0, nil, proxyErr
 	}
 	body = normalizedBody
 
+	// #2339 PR 2 — poll-mode short-circuit. When the target workspace
+	// is registered as delivery_mode=poll (e.g. an operator's laptop
+	// running molecule-mcp-claude-channel), the platform does NOT
+	// dispatch over HTTP — the agent has no public URL. Instead we record
+	// the A2A request to activity_logs and the agent picks it up via
+	// GET /activity?since_id= (PR 3).
+	//
+	// Returning here means we skip resolveAgentURL entirely (no SSRF check
+	// needed — there's no URL to validate; no DNS lookup against potentially-
+	// changing operator-side IPs) and skip the dispatch path completely
+	// (no Do(), no maybeMarkContainerDead). The response is a synthetic
+	// {status:"queued"} envelope so the caller (canvas, another workspace)
+	// knows delivery is acknowledged but pending consumption.
+	if lookupDeliveryMode(ctx, workspaceID) == models.DeliveryModePoll {
+		// Queue unconditionally: in poll mode the activity_logs row IS the
+		// delivery, so honoring logActivity=false would drop the message.
+		h.logA2AReceiveQueued(ctx, workspaceID, callerID, body, a2aMethod)
+		respBody, marshalErr := json.Marshal(gin.H{
+			"status":        "queued",
+			"delivery_mode": models.DeliveryModePoll,
+			"method":        a2aMethod,
+		})
+		if marshalErr != nil {
+			return 0, nil, &proxyA2AError{
+				Status:   http.StatusInternalServerError,
+				Response: gin.H{"error": "failed to marshal poll-mode response"},
+			}
+		}
+		return http.StatusOK, respBody, nil
+	}
+
+	agentURL, proxyErr := h.resolveAgentURL(ctx, workspaceID)
+	if proxyErr != nil {
+		return 0, nil, proxyErr
+	}
+
 	startTime := time.Now()
 	resp, cancelFwd, err := h.dispatchA2A(ctx, workspaceID, agentURL, body, callerID)
 	if cancelFwd != nil {
diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go
index 3e67667f..d0ccea86 100644
--- a/workspace-server/internal/handlers/a2a_proxy_helpers.go
+++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go
@@ -5,6 +5,7 @@ package handlers
 
 import (
 	"context"
+	"database/sql"
 	"encoding/json"
 	"errors"
 	"log"
@@ -13,6 +14,7 @@ import (
 	"time"
 
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
 	"github.com/gin-gonic/gin"
 )
@@ -376,6 +378,74 @@ func parseUsageFromA2AResponse(body []byte) (inputTokens, outputTokens int64) {
 	return 0, 0
 }
 
+// lookupDeliveryMode returns the workspace's delivery_mode. On any DB
+// error or missing row it returns DeliveryModePush — the fail-closed
+// default. "Closed" here means "fall back to today's behavior (synchronous
+// dispatch)" rather than "fall back to drop the request silently into
+// activity_logs where the agent might never see it." A poll-mode workspace
+// that briefly reads as push will get its A2A request dispatched to the
+// stored URL (or a 502 if no URL); a push-mode workspace that briefly
+// reads as poll would get its request silently queued with no dispatch.
+// The first failure is loud + recoverable; the second is silent.
+//
+// The function is intentionally lookup-only — it never mutates the row.
+// The register handler (registry.go) is the only writer for delivery_mode.
+//
+// See #2339 PR 1 for the column + register-flow side; this is the
+// proxy-side read used for the short-circuit in proxyA2ARequest.
+func lookupDeliveryMode(ctx context.Context, workspaceID string) string {
+	var mode sql.NullString
+	err := db.DB.QueryRowContext(ctx,
+		`SELECT delivery_mode FROM workspaces WHERE id = $1`, workspaceID,
+	).Scan(&mode)
+	if err != nil {
+		if !errors.Is(err, sql.ErrNoRows) {
+			log.Printf("ProxyA2A: lookupDeliveryMode(%s) failed (%v) — defaulting to push", workspaceID, err)
+		}
+		return models.DeliveryModePush
+	}
+	if !mode.Valid || mode.String == "" {
+		return models.DeliveryModePush
+	}
+	if !models.IsValidDeliveryMode(mode.String) {
+		log.Printf("ProxyA2A: workspace %s has invalid delivery_mode=%q — defaulting to push", workspaceID, mode.String)
+		return models.DeliveryModePush
+	}
+	return mode.String
+}
+
+// logA2AReceiveQueued records a poll-mode "queued" A2A receive into
+// activity_logs. Same shape as logA2ASuccess but without ResponseBody
+// (there is no response yet — the polling agent will produce one when
+// it picks the request up). status="ok" because the request was
+// successfully queued; the consume side reports its own outcome.
+//
+// The activity_logs row is what the polling agent's GET /activity?since_id=
+// reads in PR 3 — that's how a poll-mode workspace receives inbound A2A
+// without a public URL.
+func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string) {
+	var wsName string
+	db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
+	if wsName == "" {
+		wsName = workspaceID
+	}
+	summary := a2aMethod + " → " + wsName + " (queued for poll)"
+	go func(parent context.Context) {
+		logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
+		defer cancel()
+		LogActivity(logCtx, h.broadcaster, ActivityParams{
+			WorkspaceID:  workspaceID,
+			ActivityType: "a2a_receive",
+			SourceID:     nilIfEmpty(callerID),
+			TargetID:     &workspaceID,
+			Method:       &a2aMethod,
+			Summary:      &summary,
+			RequestBody:  json.RawMessage(body),
+			Status:       "ok",
+		})
+	}(ctx)
+}
+
 // readUsageMap extracts input_tokens / output_tokens from the "usage" key of m.
 // Returns (0, 0, false) when the key is absent or contains no non-zero values.
 func readUsageMap(m map[string]json.RawMessage) (inputTokens, outputTokens int64, ok bool) {
diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go
index 1a33a866..7bf95818 100644
--- a/workspace-server/internal/handlers/a2a_proxy_test.go
+++ b/workspace-server/internal/handlers/a2a_proxy_test.go
@@ -1704,3 +1704,185 @@ func TestResolveAgentURL_HibernatedWorkspace_NullURLVariant(t *testing.T) {
 		t.Errorf("unmet DB expectations: %v", err)
 	}
 }
+
+// ==================== ProxyA2A — poll-mode short-circuit (#2339 PR 2) ====================
+
+// TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch verifies the core
+// invariant of #2339 PR 2: when delivery_mode=poll, ProxyA2A must NOT
+// hit resolveAgentURL (which would SSRF-check or 502 on a missing URL)
+// and must NOT dispatch over HTTP. It records the request to activity_logs
+// and returns 200 {status:"queued"} instead.
+//
+// Without this short-circuit, the canvas chat fails for any workspace
+// running molecule-mcp-claude-channel (operator's laptop, no public URL):
+// resolveAgentURL would 502 on the missing URL and the polling agent
+// would never see the inbound message. That's the bug PR 2 fixes.
+func TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch(t *testing.T) {
+	mock := setupTestDB(t)
+	setupTestRedis(t)
+	broadcaster := newTestBroadcaster()
+	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
+
+	const wsID = "ws-poll-shortcircuit"
+
+	// Budget check still runs (above the short-circuit) — affirms the
+	// budget guard is mode-agnostic, which is correct: a poll-mode
+	// workspace shouldn't burn unmetered platform CPU/storage either.
+	expectBudgetCheck(mock, wsID)
+
+	// lookupDeliveryMode SELECT — returns poll, triggering the short-circuit.
+	// Note: NO ExpectQuery for `SELECT url, status FROM workspaces` (that's
+	// resolveAgentURL's query) — the short-circuit must skip resolveAgentURL.
+	mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
+		WithArgs(wsID).
+		WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll"))
+
+	// Activity log: the queued receive (logA2AReceiveQueued in helpers.go).
+	mock.ExpectExec("INSERT INTO activity_logs").
+		WillReturnResult(sqlmock.NewResult(0, 1))
+
+	w := httptest.NewRecorder()
+	c, _ := gin.CreateTestContext(w)
+	c.Params = gin.Params{{Key: "id", Value: wsID}}
+
+	body := `{"jsonrpc":"2.0","id":"poll-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}`
+	c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body))
+	c.Request.Header.Set("Content-Type", "application/json")
+
+	handler.ProxyA2A(c)
+
+	time.Sleep(50 * time.Millisecond)
+
+	if w.Code != http.StatusOK {
+		t.Fatalf("expected 200 (queued), got %d: %s", w.Code, w.Body.String())
+	}
+
+	var resp map[string]interface{}
+	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+		t.Fatalf("response is not valid JSON: %v", err)
+	}
+	if resp["status"] != "queued" {
+		t.Errorf("response.status = %v, want %q", resp["status"], "queued")
+	}
+	if resp["delivery_mode"] != "poll" {
+		t.Errorf("response.delivery_mode = %v, want %q", resp["delivery_mode"], "poll")
+	}
+	if resp["method"] != "message/send" {
+		t.Errorf("response.method = %v, want %q (the JSON-RPC method that was queued)", resp["method"], "message/send")
+	}
+
+	if err := mock.ExpectationsWereMet(); err != nil {
+		t.Errorf("unmet sqlmock expectations: %v", err)
+	}
+}
+
+// TestProxyA2A_PushMode_NoShortCircuit verifies the symmetric contract:
+// a push-mode workspace (default) is NOT affected by the new short-circuit.
+// It still proceeds to resolveAgentURL + dispatch. Without this guard, a
+// regression in lookupDeliveryMode could silently break the entire fleet.
+func TestProxyA2A_PushMode_NoShortCircuit(t *testing.T) {
+	mock := setupTestDB(t)
+	mr := setupTestRedis(t)
+	allowLoopbackForTest(t)
+	broadcaster := newTestBroadcaster()
+	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
+
+	const wsID = "ws-push-default"
+
+	dispatched := false
+	agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		dispatched = true
+		w.Header().Set("Content-Type", "application/json")
+		fmt.Fprint(w, `{"jsonrpc":"2.0","id":"1","result":{"status":"ok"}}`)
+	}))
+	defer agentServer.Close()
+
+	mr.Set(fmt.Sprintf("ws:%s:url", wsID), agentServer.URL)
+	expectBudgetCheck(mock, wsID)
+
+	// lookupDeliveryMode returns "push" — short-circuit must NOT fire.
+	mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
+		WithArgs(wsID).
+		WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("push"))
+
+	mock.ExpectExec("INSERT INTO activity_logs").
+		WillReturnResult(sqlmock.NewResult(0, 1))
+
+	w := httptest.NewRecorder()
+	c, _ := gin.CreateTestContext(w)
+	c.Params = gin.Params{{Key: "id", Value: wsID}}
+
+	body := `{"jsonrpc":"2.0","id":"push-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}`
+	c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body))
+	c.Request.Header.Set("Content-Type", "application/json")
+
+	handler.ProxyA2A(c)
+
+	time.Sleep(50 * time.Millisecond)
+
+	if w.Code != http.StatusOK {
+		t.Fatalf("expected 200 (dispatched), got %d: %s", w.Code, w.Body.String())
+	}
+	if !dispatched {
+		t.Error("push-mode workspace: expected the agent server to receive the request, but it did not")
+	}
+	var resp map[string]interface{}
+	if err := json.Unmarshal(w.Body.Bytes(), &resp); err == nil {
+		if resp["status"] == "queued" {
+			t.Error("push-mode response leaked queued envelope — short-circuit fired when it shouldn't have")
+		}
+	}
+
+	if err := mock.ExpectationsWereMet(); err != nil {
+		t.Errorf("unmet sqlmock expectations: %v", err)
+	}
+}
+
+// TestProxyA2A_PollMode_FailsClosedToPush verifies the safety contract:
+// a DB error reading delivery_mode must default to push (the existing
+// behavior), NOT poll. Failing to push means a poll-mode workspace
+// briefly attempts a real dispatch — visible failure (502 / SSRF
+// rejection / restart cascade), not a silent drop into activity_logs
+// where the agent might never look. Loud > silent, recoverable > lost.
+func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) {
+	mock := setupTestDB(t)
+	setupTestRedis(t) // empty Redis — forces resolveAgentURL DB lookup
+	broadcaster := newTestBroadcaster()
+	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
+
+	const wsID = "ws-mode-db-error"
+
+	expectBudgetCheck(mock, wsID)
+
+	// lookupDeliveryMode hits a transient DB error → must default push.
+	mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
+		WithArgs(wsID).
+		WillReturnError(sql.ErrConnDone)
+
+	// Push path proceeds to resolveAgentURL — empty result → 502 path.
+	mock.ExpectQuery("SELECT url, status FROM workspaces WHERE id =").
+		WithArgs(wsID).
+		WillReturnRows(sqlmock.NewRows([]string{"url", "status"}))
+
+	w := httptest.NewRecorder()
+	c, _ := gin.CreateTestContext(w)
+	c.Params = gin.Params{{Key: "id", Value: wsID}}
+
+	body := `{"jsonrpc":"2.0","id":"x","method":"message/send","params":{}}`
+	c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body))
+	c.Request.Header.Set("Content-Type", "application/json")
+
+	handler.ProxyA2A(c)
+
+	if w.Code == http.StatusOK {
+		var resp map[string]interface{}
+		_ = json.Unmarshal(w.Body.Bytes(), &resp)
+		if resp["status"] == "queued" {
+			t.Errorf("DB error on delivery_mode lookup silently queued the request — must fail-closed-to-push, got body: %s", w.Body.String())
+		}
+	}
+
+	if err := mock.ExpectationsWereMet(); err != nil {
+		t.Errorf("unmet sqlmock expectations: %v", err)
+	}
+}