From ae79b9e9fe55883bcaafa9f6b83bc5d724280c09 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Mon, 4 May 2026 20:50:46 -0700 Subject: [PATCH] feat(delegations): result-push to caller inbox behind feature flag (RFC #2829 PR-2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a delegation completes (or fails), also write an `activity_type='a2a_receive'` row to the caller's activity_logs so the caller's inbox poller (workspace/inbox.py — `?type=a2a_receive`) surfaces the result to the agent. Why: today the only way the caller agent learns about a delegation result is by holding open an HTTP `message/send` connection through the platform proxy. That connection has a hard timeout (~600s) — a 90-iteration external-runtime task on stream output routinely blows past it, and the result emitted after the timeout lands in /dev/null. (Hongming's home hermes hit this on 2026-05-05 — task was actively heartbeating "iteration 14/90" when the proxy timer fired.) This PR adds the SERVER-SIDE result-push so the result is durably delivered to the caller's inbox queue. The agent-side cutover (replace sync httpx delegation with delegate_task_async + wait_for_message poll) ships in the next PR — once both land, the proxy timeout class is gone. ## Feature flag `DELEGATION_RESULT_INBOX_PUSH=1` enables the push. Default off — staging canary first, flip after RFC #2829 PR-3 (agent-side) lands and proves the round-trip end-to-end. With the flag off, behavior is byte-identical to before this PR (verified by TestUpdateStatus_FlagOff_NoNewSQL). ## Two write sites 1. UpdateStatus handler (POST /workspaces/:id/delegations/:id/update) — agent-initiated delegations report status here 2. executeDelegation goroutine — canvas-initiated delegations (POST /workspaces/:id/delegate) report status from this background coroutine Both paths call `pushDelegationResultToInbox` which is best-effort: an INSERT failure logs but does NOT propagate up. The existing `delegate_result` row in activity_logs (the dashboard view) remains authoritative; the new `a2a_receive` row is purely additive for the inbox-poller to surface. ## Coverage 6 new tests in delegation_inbox_push_test.go: - flag off → no SQL fired (the rollout-safety contract) - flag on, completed → a2a_receive row with status=ok - flag on, failed → a2a_receive row with status=error + error_detail - UpdateStatus end-to-end (flag on, completed) - UpdateStatus end-to-end (flag on, failed) - UpdateStatus end-to-end (flag off, byte-identical to pre-PR behavior) All 30 existing delegation_test.go tests still pass — flag default off keeps the strict-sqlmock surface unchanged. Refs RFC #2829. --- .../internal/handlers/delegation.go | 75 ++++++ .../handlers/delegation_inbox_push_test.go | 246 ++++++++++++++++++ 2 files changed, 321 insertions(+) create mode 100644 workspace-server/internal/handlers/delegation_inbox_push_test.go diff --git a/workspace-server/internal/handlers/delegation.go b/workspace-server/internal/handlers/delegation.go index 59da198c..e1b5e1a8 100644 --- a/workspace-server/internal/handlers/delegation.go +++ b/workspace-server/internal/handlers/delegation.go @@ -5,6 +5,7 @@ import ( "encoding/json" "log" "net/http" + "os" "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" @@ -13,6 +14,68 @@ import ( "github.com/google/uuid" ) +// delegationResultInboxPushEnabled gates the RFC #2829 PR-2 result-push +// behavior: when callee POSTs `status=completed` (or `failed`) via +// /workspaces/:id/delegations/:delegation_id/update, ALSO write an +// `activity_type='a2a_receive'` row to the caller's activity_logs. +// +// Why a flag: the caller's inbox poller (workspace/inbox.py) queries +// `?type=a2a_receive` to surface inbound messages to the agent. Adding +// a2a_receive rows for delegation results is the universal-sized fix for +// the 600s message/send timeout class — long-running delegations no +// longer rely on the proxy holding the HTTP connection open. But it is +// observable behavior change (existing agents start seeing delegation +// results in their inbox where they didn't before), so we flag it for +// staging burn-in before flipping default. +// +// Default: off. Staging-canary first; flip to on after RFC #2829 PR-3 +// (agent-side cutover) lands and proves the round-trip end-to-end. +func delegationResultInboxPushEnabled() bool { + return os.Getenv("DELEGATION_RESULT_INBOX_PUSH") == "1" +} + +// pushDelegationResultToInbox writes the inbox-visible row for a +// completed/failed delegation. Best-effort: a failure logs but does NOT +// fail the parent UpdateStatus — the existing delegate_result row in +// activity_logs is still authoritative for the dashboard. +// +// Caller (sourceID) is the workspace that initiated the delegation; the +// inbox row lands in their activity_logs so wait_for_message picks it up. +// +// Body shape mirrors a2a_receive rows produced by the proxy on a +// successful synchronous reply: response_body.text carries the agent's +// answer, request_body.delegation_id correlates back to the originating +// row. +func pushDelegationResultToInbox(ctx context.Context, sourceID, delegationID, status, responsePreview, errorDetail string) { + if !delegationResultInboxPushEnabled() { + return + } + respPayload := map[string]interface{}{ + "text": responsePreview, + "delegation_id": delegationID, + } + respJSON, _ := json.Marshal(respPayload) + reqJSON, _ := json.Marshal(map[string]interface{}{ + "delegation_id": delegationID, + }) + logStatus := "ok" + if status == "failed" { + logStatus = "error" + } + summary := "Delegation result delivered" + if status == "failed" { + summary = "Delegation failed" + } + if _, err := db.DB.ExecContext(ctx, ` + INSERT INTO activity_logs ( + workspace_id, activity_type, method, source_id, + summary, request_body, response_body, status, error_detail + ) VALUES ($1, 'a2a_receive', 'delegate_result', $2, $3, $4::jsonb, $5::jsonb, $6, NULLIF($7, '')) + `, sourceID, sourceID, summary, string(reqJSON), string(respJSON), logStatus, errorDetail); err != nil { + log.Printf("Delegation %s: inbox-push insert failed: %v", delegationID, err) + } +} + // Delegation status lifecycle: // pending → dispatched → received → in_progress → completed | failed // @@ -289,6 +352,8 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_FAILED", sourceID, map[string]interface{}{ "delegation_id": delegationID, "target_id": targetID, "error": proxyErr.Error(), }) + // RFC #2829 PR-2 result-push (see UpdateStatus for rationale). + pushDelegationResultToInbox(ctx, sourceID, delegationID, "failed", "", proxyErr.Error()) return } @@ -349,6 +414,8 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s "target_id": targetID, "response_preview": truncate(responseText, 200), }) + // RFC #2829 PR-2 result-push (see UpdateStatus for rationale). + pushDelegationResultToInbox(ctx, sourceID, delegationID, "completed", responseText, "") } // updateDelegationStatus updates the status of a delegation record in activity_logs. @@ -459,11 +526,19 @@ func (h *DelegationHandler) UpdateStatus(c *gin.Context) { "delegation_id": delegationID, "response_preview": truncate(body.ResponsePreview, 200), }) + // RFC #2829 PR-2 result-push: when the gate is on, also write an + // a2a_receive row so the caller's inbox poller surfaces this to + // the agent. Foundational for getting rid of the proxy-blocked + // sync path that hits the 600s message/send timeout — once the + // agent-side cutover lands, the caller polls its own inbox for + // the result instead of holding open an HTTP connection. + pushDelegationResultToInbox(ctx, sourceID, delegationID, "completed", body.ResponsePreview, "") } else { h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_FAILED", sourceID, map[string]interface{}{ "delegation_id": delegationID, "error": body.Error, }) + pushDelegationResultToInbox(ctx, sourceID, delegationID, "failed", "", body.Error) } c.JSON(http.StatusOK, gin.H{"status": body.Status, "delegation_id": delegationID}) diff --git a/workspace-server/internal/handlers/delegation_inbox_push_test.go b/workspace-server/internal/handlers/delegation_inbox_push_test.go new file mode 100644 index 00000000..02c51190 --- /dev/null +++ b/workspace-server/internal/handlers/delegation_inbox_push_test.go @@ -0,0 +1,246 @@ +package handlers + +import ( + "bytes" + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/gin-gonic/gin" +) + +// delegation_inbox_push_test.go — coverage for the RFC #2829 PR-2 +// result-push behavior. The push is feature-flagged via +// DELEGATION_RESULT_INBOX_PUSH=1; default off keeps the existing +// strict-sqlmock test surface unchanged. +// +// What we pin: +// 1. Flag off (default) → no a2a_receive INSERT fires. +// 2. Flag on, status=completed → a2a_receive row written with the +// response_preview and no error_detail. +// 3. Flag on, status=failed → a2a_receive row written with status=error +// and the error_detail set. +// 4. INSERT failure on inbox-push does NOT bubble up — UpdateStatus +// still returns 200. + +// ---------- pushDelegationResultToInbox in isolation ---------- + +func TestPushDelegationResultToInbox_FlagOff_NoSQL(t *testing.T) { + mock := setupTestDB(t) + t.Setenv("DELEGATION_RESULT_INBOX_PUSH", "") + + pushDelegationResultToInbox( + context.Background(), + "caller", "deleg-1", "completed", "answer body", "", + ) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("flag off must not fire SQL: %v", err) + } +} + +func TestPushDelegationResultToInbox_FlagOn_CompletedInsertsA2AReceiveRow(t *testing.T) { + mock := setupTestDB(t) + t.Setenv("DELEGATION_RESULT_INBOX_PUSH", "1") + + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs( + "caller-ws", + "caller-ws", // source_id mirrors workspace_id + "Delegation result delivered", + sqlmock.AnyArg(), // request_body json + sqlmock.AnyArg(), // response_body json + "ok", + "", // error_detail empty for completed + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + + pushDelegationResultToInbox( + context.Background(), + "caller-ws", "deleg-1", "completed", "answer body", "", + ) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestPushDelegationResultToInbox_FlagOn_FailedInsertsErrorRow(t *testing.T) { + mock := setupTestDB(t) + t.Setenv("DELEGATION_RESULT_INBOX_PUSH", "1") + + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs( + "caller-ws", + "caller-ws", + "Delegation failed", + sqlmock.AnyArg(), + sqlmock.AnyArg(), + "error", + "target unreachable", + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + + pushDelegationResultToInbox( + context.Background(), + "caller-ws", "deleg-2", "failed", "", "target unreachable", + ) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +// ---------- UpdateStatus end-to-end ---------- + +func TestUpdateStatus_FlagOn_PushesA2AReceiveOnCompleted(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + t.Setenv("DELEGATION_RESULT_INBOX_PUSH", "1") + + broadcaster := newTestBroadcaster() + wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + dh := NewDelegationHandler(wh, broadcaster) + + // 1. updateDelegationStatus — UPDATE activity_logs SET status='completed' + mock.ExpectExec(`UPDATE activity_logs`). + WithArgs("completed", "", "ws-source", "deleg-9"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // 2. existing delegate_result INSERT (caller-side dashboard view) + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs( + "ws-source", "ws-source", + sqlmock.AnyArg(), // summary + sqlmock.AnyArg(), // response_body + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // 3. NEW: PR-2 a2a_receive row for inbox-poller + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs( + "ws-source", "ws-source", + "Delegation result delivered", + sqlmock.AnyArg(), + sqlmock.AnyArg(), + "ok", + "", + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{ + {Key: "id", Value: "ws-source"}, + {Key: "delegation_id", Value: "deleg-9"}, + } + body := `{"status":"completed","response_preview":"all done"}` + c.Request = httptest.NewRequest("POST", + "/workspaces/ws-source/delegations/deleg-9/update", + bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + dh.UpdateStatus(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +func TestUpdateStatus_FlagOn_PushesA2AReceiveOnFailed(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + t.Setenv("DELEGATION_RESULT_INBOX_PUSH", "1") + + broadcaster := newTestBroadcaster() + wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + dh := NewDelegationHandler(wh, broadcaster) + + // 1. updateDelegationStatus — UPDATE activity_logs + mock.ExpectExec(`UPDATE activity_logs`). + WithArgs("failed", "boom", "ws-source", "deleg-10"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // 2. NEW: PR-2 a2a_receive row for inbox-poller (failure path doesn't + // have the existing delegate_result INSERT — only the new push). + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs( + "ws-source", "ws-source", + "Delegation failed", + sqlmock.AnyArg(), + sqlmock.AnyArg(), + "error", + "boom", + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{ + {Key: "id", Value: "ws-source"}, + {Key: "delegation_id", Value: "deleg-10"}, + } + body := `{"status":"failed","error":"boom"}` + c.Request = httptest.NewRequest("POST", + "/workspaces/ws-source/delegations/deleg-10/update", + bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + dh.UpdateStatus(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d", w.Code) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +// TestUpdateStatus_FlagOff_NoNewSQL — sanity check that the existing +// behavior is preserved when the flag is off. Critical for safe rollout. +func TestUpdateStatus_FlagOff_NoNewSQL(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + // explicitly empty — flag off + t.Setenv("DELEGATION_RESULT_INBOX_PUSH", "") + + broadcaster := newTestBroadcaster() + wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + dh := NewDelegationHandler(wh, broadcaster) + + // Only the two pre-existing queries — no third (a2a_receive) INSERT. + mock.ExpectExec(`UPDATE activity_logs`). + WithArgs("completed", "", "ws-source", "deleg-11"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs( + "ws-source", "ws-source", + sqlmock.AnyArg(), + sqlmock.AnyArg(), + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{ + {Key: "id", Value: "ws-source"}, + {Key: "delegation_id", Value: "deleg-11"}, + } + c.Request = httptest.NewRequest("POST", + "/workspaces/ws-source/delegations/deleg-11/update", + bytes.NewBufferString(`{"status":"completed","response_preview":"ok"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + dh.UpdateStatus(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d", w.Code) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("flag-off must not fire extra SQL: %v", err) + } +}