feat(a2a): poll-mode short-circuit in ProxyA2A (#2339 PR 2)

Skip SSRF/dispatch and queue to activity_logs for delivery_mode=poll
workspaces. The polling agent (e.g. molecule-mcp-claude-channel on an
operator's laptop) consumes via GET /activity?since_id= in PR 3 — no
public URL needed.

Order: budget -> normalize -> lookupDeliveryMode short-circuit ->
resolveAgentURL. Normalizing before the short-circuit keeps the
JSON-RPC method name on the activity_logs row so the polling agent
can dispatch correctly.

Fail-closed-to-push: any DB error reading delivery_mode defaults to
push (loud + recoverable) rather than poll (silent drop).

Tests:
- TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch — core invariant:
  no resolveAgentURL, no Do(), records to activity_logs, returns 200
  {status:"queued",delivery_mode:"poll",method:"message/send"}.
- TestProxyA2A_PushMode_NoShortCircuit — push path unaffected; the agent
  server actually receives the request.
- TestProxyA2A_PollMode_FailsClosedToPush — DB error on mode lookup
  must NOT silently queue; falls through to the push path.

Stacked on #2348 (PR 1: schema + register flow).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hongming Wang 2026-04-29 22:22:28 -07:00
parent d5b00d6ac1
commit 91a1d5377d
3 changed files with 295 additions and 5 deletions

View File

@ -21,6 +21,7 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
@ -305,17 +306,54 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
return 0, nil, proxyErr
}
agentURL, proxyErr := h.resolveAgentURL(ctx, workspaceID)
if proxyErr != nil {
return 0, nil, proxyErr
}
// Normalize the JSON-RPC envelope BEFORE the poll-mode short-circuit
// so the activity_logs entry carries the protocol method name (initialize,
// message/send, etc.) — the polling agent uses that to dispatch the
// request body to the right handler. Doing it here also means a
// malformed payload fails the same way for push and poll callers
// (consistent 400 instead of "queued garbage").
normalizedBody, a2aMethod, proxyErr := normalizeA2APayload(body)
if proxyErr != nil {
return 0, nil, proxyErr
}
body = normalizedBody
// #2339 PR 2 — poll-mode short-circuit. When the target workspace
// is registered as delivery_mode=poll (e.g. an operator's laptop
// running molecule-mcp-claude-channel), the platform does NOT
// dispatch over HTTP — the agent has no public URL. Instead we record
// the A2A request to activity_logs and the agent picks it up via
// GET /activity?since_id= (PR 3).
//
// Returning here means we skip resolveAgentURL entirely (no SSRF check
// needed — there's no URL to validate; no DNS lookup against potentially-
// changing operator-side IPs) and skip the dispatch path completely
// (no Do(), no maybeMarkContainerDead). The response is a synthetic
// {status:"queued"} envelope so the caller (canvas, another workspace)
// knows delivery is acknowledged but pending consumption.
if lookupDeliveryMode(ctx, workspaceID) == models.DeliveryModePoll {
if logActivity {
h.logA2AReceiveQueued(ctx, workspaceID, callerID, body, a2aMethod)
}
respBody, marshalErr := json.Marshal(gin.H{
"status": "queued",
"delivery_mode": models.DeliveryModePoll,
"method": a2aMethod,
})
if marshalErr != nil {
return 0, nil, &proxyA2AError{
Status: http.StatusInternalServerError,
Response: gin.H{"error": "failed to marshal poll-mode response"},
}
}
return http.StatusOK, respBody, nil
}
agentURL, proxyErr := h.resolveAgentURL(ctx, workspaceID)
if proxyErr != nil {
return 0, nil, proxyErr
}
startTime := time.Now()
resp, cancelFwd, err := h.dispatchA2A(ctx, workspaceID, agentURL, body, callerID)
if cancelFwd != nil {

View File

@ -5,6 +5,7 @@ package handlers
import (
"context"
"database/sql"
"encoding/json"
"errors"
"log"
@ -13,6 +14,7 @@ import (
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
"github.com/gin-gonic/gin"
)
@ -376,6 +378,74 @@ func parseUsageFromA2AResponse(body []byte) (inputTokens, outputTokens int64) {
return 0, 0
}
// lookupDeliveryMode returns the workspace's delivery_mode. On any DB
// error or missing row it returns DeliveryModePush — the fail-closed
// default. "Closed" here means "fall back to today's behavior (synchronous
// dispatch)" rather than "fall back to drop the request silently into
// activity_logs where the agent might never see it." A poll-mode workspace
// that briefly reads as push will get its A2A request dispatched to the
// stored URL (or a 502 if no URL); a push-mode workspace that briefly
// reads as poll would get its request silently queued with no dispatch.
// The first failure is loud + recoverable; the second is silent.
//
// The function is intentionally lookup-only — it never mutates the row.
// The register handler (registry.go) is the only writer for delivery_mode.
//
// See #2339 PR 1 for the column + register-flow side; this is the
// proxy-side read used for the short-circuit in proxyA2ARequest.
func lookupDeliveryMode(ctx context.Context, workspaceID string) string {
var mode sql.NullString
err := db.DB.QueryRowContext(ctx,
`SELECT delivery_mode FROM workspaces WHERE id = $1`, workspaceID,
).Scan(&mode)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
log.Printf("ProxyA2A: lookupDeliveryMode(%s) failed (%v) — defaulting to push", workspaceID, err)
}
return models.DeliveryModePush
}
if !mode.Valid || mode.String == "" {
return models.DeliveryModePush
}
if !models.IsValidDeliveryMode(mode.String) {
log.Printf("ProxyA2A: workspace %s has invalid delivery_mode=%q — defaulting to push", workspaceID, mode.String)
return models.DeliveryModePush
}
return mode.String
}
// logA2AReceiveQueued records a poll-mode "queued" A2A receive into
// activity_logs. Same shape as logA2ASuccess but without ResponseBody
// (there is no response yet — the polling agent will produce one when
// it picks the request up). status="ok" because the request was
// successfully queued; the consume side reports its own outcome.
//
// The activity_logs row is what the polling agent's GET /activity?since_id=
// reads in PR 3 — that's how a poll-mode workspace receives inbound A2A
// without a public URL.
func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string) {
var wsName string
db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
if wsName == "" {
wsName = workspaceID
}
summary := a2aMethod + " → " + wsName + " (queued for poll)"
go func(parent context.Context) {
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
defer cancel()
LogActivity(logCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "a2a_receive",
SourceID: nilIfEmpty(callerID),
TargetID: &workspaceID,
Method: &a2aMethod,
Summary: &summary,
RequestBody: json.RawMessage(body),
Status: "ok",
})
}(ctx)
}
// readUsageMap extracts input_tokens / output_tokens from the "usage" key of m.
// Returns (0, 0, false) when the key is absent or contains no non-zero values.
func readUsageMap(m map[string]json.RawMessage) (inputTokens, outputTokens int64, ok bool) {

View File

@ -1704,3 +1704,185 @@ func TestResolveAgentURL_HibernatedWorkspace_NullURLVariant(t *testing.T) {
t.Errorf("unmet DB expectations: %v", err)
}
}
// ==================== ProxyA2A — poll-mode short-circuit (#2339 PR 2) ====================
// TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch verifies the core
// invariant of #2339 PR 2: when delivery_mode=poll, ProxyA2A must NOT
// hit resolveAgentURL (which would SSRF-check or 502 on a missing URL)
// and must NOT dispatch over HTTP. It records the request to activity_logs
// and returns 200 {status:"queued"} instead.
//
// Without this short-circuit, the canvas chat fails for any workspace
// running molecule-mcp-claude-channel (operator's laptop, no public URL):
// resolveAgentURL would 502 on the missing URL and the polling agent
// would never see the inbound message. That's the bug PR 2 fixes.
func TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
const wsID = "ws-poll-shortcircuit"
// Budget check still runs (above the short-circuit) — affirms the
// budget guard is mode-agnostic, which is correct: a poll-mode
// workspace shouldn't burn unmetered platform CPU/storage either.
expectBudgetCheck(mock, wsID)
// lookupDeliveryMode SELECT — returns poll, triggering the short-circuit.
// Note: NO ExpectQuery for `SELECT url, status FROM workspaces` (that's
// resolveAgentURL's query) — the short-circuit must skip resolveAgentURL.
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll"))
// Activity log: the queued receive (logA2AReceiveQueued in helpers.go).
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
body := `{"jsonrpc":"2.0","id":"poll-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.ProxyA2A(c)
time.Sleep(50 * time.Millisecond)
if w.Code != http.StatusOK {
t.Fatalf("expected 200 (queued), got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("response is not valid JSON: %v", err)
}
if resp["status"] != "queued" {
t.Errorf("response.status = %v, want %q", resp["status"], "queued")
}
if resp["delivery_mode"] != "poll" {
t.Errorf("response.delivery_mode = %v, want %q", resp["delivery_mode"], "poll")
}
if resp["method"] != "message/send" {
t.Errorf("response.method = %v, want %q (the JSON-RPC method that was queued)", resp["method"], "message/send")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestProxyA2A_PushMode_NoShortCircuit verifies the symmetric contract:
// a push-mode workspace (default) is NOT affected by the new short-circuit.
// It still proceeds to resolveAgentURL + dispatch. Without this guard, a
// regression in lookupDeliveryMode could silently break the entire fleet.
func TestProxyA2A_PushMode_NoShortCircuit(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
const wsID = "ws-push-default"
dispatched := false
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
dispatched = true
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"jsonrpc":"2.0","id":"1","result":{"status":"ok"}}`)
}))
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", wsID), agentServer.URL)
expectBudgetCheck(mock, wsID)
// lookupDeliveryMode returns "push" — short-circuit must NOT fire.
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("push"))
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
body := `{"jsonrpc":"2.0","id":"push-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.ProxyA2A(c)
time.Sleep(50 * time.Millisecond)
if w.Code != http.StatusOK {
t.Fatalf("expected 200 (dispatched), got %d: %s", w.Code, w.Body.String())
}
if !dispatched {
t.Error("push-mode workspace: expected the agent server to receive the request, but it did not")
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err == nil {
if resp["status"] == "queued" {
t.Error("push-mode response leaked queued envelope — short-circuit fired when it shouldn't have")
}
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestProxyA2A_PollMode_FailsClosedToPush verifies the safety contract:
// a DB error reading delivery_mode must default to push (the existing
// behavior), NOT poll. Failing to push means a poll-mode workspace
// briefly attempts a real dispatch — visible failure (502 / SSRF
// rejection / restart cascade), not a silent drop into activity_logs
// where the agent might never look. Loud > silent, recoverable > lost.
func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t) // empty Redis — forces resolveAgentURL DB lookup
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
const wsID = "ws-mode-db-error"
expectBudgetCheck(mock, wsID)
// lookupDeliveryMode hits a transient DB error → must default push.
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs(wsID).
WillReturnError(sql.ErrConnDone)
// Push path proceeds to resolveAgentURL — empty result → 502 path.
mock.ExpectQuery("SELECT url, status FROM workspaces WHERE id =").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"url", "status"}))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
body := `{"jsonrpc":"2.0","id":"x","method":"message/send","params":{}}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.ProxyA2A(c)
if w.Code == http.StatusOK {
var resp map[string]interface{}
_ = json.Unmarshal(w.Body.Bytes(), &resp)
if resp["status"] == "queued" {
t.Errorf("DB error on delivery_mode lookup silently queued the request — must fail-closed-to-push, got body: %s", w.Body.String())
}
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}