From 4c43f54f0c8f7bb1c92b209fe0ff5151dfe30a68 Mon Sep 17 00:00:00 2001 From: Molecule AI Fullstack Engineer Date: Mon, 11 May 2026 13:37:08 +0000 Subject: [PATCH] fix(#376): store proxy-path delegation results in activity_logs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a workspace delegates a task via POST /workspaces/:id/a2a, the proxy records the response via logA2ASuccess which writes activity_type='a2a_receive'. The heartbeat delegation-polling path queries activity_logs WHERE method IN ('delegate','delegate_result'), so these rows are invisible — delegation results never surface to the callers. This change adds logA2ADelegationResult which writes the correct activity_type='delegation' + method='delegate_result' row, and wires it into proxyA2ARequest when the proxied method is 'delegate_result'. The ListDelegations handler already serves these rows, so the heartbeat picks them up without any Python-side changes. Co-Authored-By: Claude Opus 4.7 --- .../internal/handlers/a2a_proxy.go | 7 + .../internal/handlers/a2a_proxy_helpers.go | 87 ++++++++++++ .../internal/handlers/a2a_proxy_test.go | 125 ++++++++++++++++++ 3 files changed, 219 insertions(+) diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 816d5c81..11106061 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -512,6 +512,13 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri if logActivity { h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs) + // Fix #376: when the proxied method is 'delegate_result', also write + // the delegation row so heartbeat delegation polling can find it. + // Without this, proxy-path delegation results are invisible to + // ListDelegations / heartbeat delegation polling. + if a2aMethod == "delegate_result" { + h.logA2ADelegationResult(ctx, workspaceID, callerID, body, respBody, resp.StatusCode) + } } // Track LLM token usage for cost transparency (#593). diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index ded26ec5..c2cb5ab3 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -336,6 +336,93 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle } } +// logA2ADelegationResult records a delegation result into activity_logs +// with method='delegate_result' and activity_type='delegation' so that +// ListDelegations (and therefore the heartbeat delegation-polling path) +// can surface it to the caller. +// +// This bridges the gap for proxy-path delegations: when a workspace +// sends a delegate_task via POST /workspaces/:id/a2a, the proxy stores +// the response here with the correct method so heartbeat polling finds it. +// (The non-proxy path via executeDelegation already writes correctly via +// its own INSERT at delegation.go:422.) +// +// Fire-and-forget: runs in a goroutine so it never adds latency to the +// critical A2A response path. Errors are logged but non-fatal. +func (h *WorkspaceHandler) logA2ADelegationResult(ctx context.Context, callerID, targetID string, reqBody, respBody []byte, statusCode int) { + // Extract delegation_id from the request body (JSON-RPC delegate_result). + var req struct { + Params struct { + Data struct { + DelegationID string `json:"delegation_id"` + } `json:"data"` + } `json:"params"` + } + if err := json.Unmarshal(reqBody, &req); err != nil { + log.Printf("logA2ADelegationResult: failed to parse req body: %v", err) + return + } + delegationID := req.Params.Data.DelegationID + if delegationID == "" { + log.Printf("logA2ADelegationResult: no delegation_id in request body") + return + } + + // Extract text from the response body — the delegate_result response + // carries the agent's answer in result.data.text or result.text. + var responseText string + var respTop map[string]json.RawMessage + if json.Unmarshal(respBody, &respTop) == nil { + if result, ok := respTop["result"]; ok { + var resultObj map[string]json.RawMessage + if json.Unmarshal(result, &resultObj) == nil { + if textRaw, ok := resultObj["text"]; ok { + json.Unmarshal(textRaw, &responseText) + } else if dataRaw, ok := resultObj["data"]; ok { + var dataObj map[string]json.RawMessage + if json.Unmarshal(dataRaw, &dataObj) == nil { + if textRaw, ok := dataObj["text"]; ok { + json.Unmarshal(textRaw, &responseText) + } + } + } + } + } + if responseText == "" { + if textRaw, ok := respTop["text"]; ok { + json.Unmarshal(textRaw, &responseText) + } + } + } + + status := "completed" + if statusCode >= 300 { + status = "failed" + } + + summary := "Delegation completed" + if status == "failed" { + summary = "Delegation failed" + } + + go func(parent context.Context) { + logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second) + defer cancel() + respJSON, _ := json.Marshal(map[string]interface{}{ + "text": responseText, + "delegation_id": delegationID, + }) + if _, err := db.DB.ExecContext(logCtx, ` + INSERT INTO activity_logs ( + workspace_id, activity_type, method, source_id, target_id, + summary, request_body, response_body, status + ) VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, $6::jsonb, $7) + `, callerID, callerID, targetID, summary, string(reqBody), string(respJSON), status); err != nil { + log.Printf("logA2ADelegationResult: INSERT failed for delegation %s: %v", delegationID, err) + } + }(ctx) +} + func nilIfEmpty(s string) *string { if s == "" { return nil diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index 7fa22dac..35610984 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -2017,6 +2017,131 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) { time.Sleep(80 * time.Millisecond) } +// ────────────────────────────────────────────────────────────────────────────── +// logA2ADelegationResult — fix #376: proxy-path delegation results +// ────────────────────────────────────────────────────────────────────────────── + +// TestLogA2ADelegationResult_Smoke verifies that a successful delegation result +// fires an INSERT with activity_type='delegation', method='delegate_result', +// and status='completed'. The response text is extracted from result.data.text. +func TestLogA2ADelegationResult_Smoke(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + // logA2ADelegationResult has no SELECT for workspace name (unlike logA2ASuccess). + // It fires the INSERT directly in a goroutine. + mock.ExpectExec(`^INSERT INTO activity_logs`). + WithArgs( + "ws-caller", // workspace_id ($1) + "ws-caller", // source_id ($2) + "ws-target", // target_id ($3) + "Delegation completed", // summary ($4) + sqlmock.AnyArg(), // request_body ($5) + sqlmock.AnyArg(), // response_body ($6) + "completed", // status ($7) + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + + handler.logA2ADelegationResult( + context.Background(), + "ws-caller", "ws-target", + []byte(`{"method":"delegate_task","params":{"data":{"delegation_id":"del-abc123"}}}`), + []byte(`{"jsonrpc":"2.0","id":"1","result":{"data":{"text":"the answer"}}}`), + 200, + ) + time.Sleep(80 * time.Millisecond) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// TestLogA2ADelegationResult_FailedStatus verifies that a 4xx/5xx response +// from the target is recorded with status='failed' and summary='Delegation failed'. +func TestLogA2ADelegationResult_FailedStatus(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + mock.ExpectExec(`^INSERT INTO activity_logs`). + WithArgs( + "ws-a", "ws-a", "ws-b", + "Delegation failed", + sqlmock.AnyArg(), + sqlmock.AnyArg(), + "failed", + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + + handler.logA2ADelegationResult( + context.Background(), + "ws-a", "ws-b", + []byte(`{"method":"delegate_task","params":{"data":{"delegation_id":"del-xyz"}}}`), + []byte(`{"jsonrpc":"2.0","id":"2","error":{"code":-32600,"message":"bad request"}}`), + 400, + ) + time.Sleep(80 * time.Millisecond) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// TestLogA2ADelegationResult_NoDelegationID skips the INSERT when the +// request body carries no delegation_id (logically impossible but defensive). +func TestLogA2ADelegationResult_NoDelegationID(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + // No ExpectExec — the function must return early without any DB write. + + handler.logA2ADelegationResult( + context.Background(), + "ws-x", "ws-y", + []byte(`{"method":"delegate_task","params":{"data":{}}}`), + []byte(`{}`), + 200, + ) + time.Sleep(80 * time.Millisecond) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unexpected DB call: %v", err) + } +} + +// TestLogA2ADelegationResult_TextFromResultText verifies that when the +// response text lives at result.text (flat JSON-RPC), it is still captured. +func TestLogA2ADelegationResult_TextFromResultText(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + mock.ExpectExec(`^INSERT INTO activity_logs`). + WithArgs( + "ws-1", "ws-1", "ws-2", + "Delegation completed", + sqlmock.AnyArg(), + sqlmock.AnyArg(), + "completed", + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + + handler.logA2ADelegationResult( + context.Background(), + "ws-1", "ws-2", + []byte(`{"method":"delegate_task","params":{"data":{"delegation_id":"del-flat"}}}`), + []byte(`{"jsonrpc":"2.0","id":"3","result":{"text":"flat response"}}`), + 200, + ) + time.Sleep(80 * time.Millisecond) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + // ────────────────────────────────────────────────────────────────────────────── // A2A auto-wake: hibernated workspace (#711) // ──────────────────────────────────────────────────────────────────────────────