diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 6143982e..97296d4f 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -490,7 +490,14 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri if logActivity && deliveryConfirmed { h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs) } - return 0, nil, &proxyA2AError{ + // Preserve the actual HTTP status code and any body bytes already read. + // Previously this returned (0, nil, error) which discarded both. + // Preserving them allows executeDelegation's new condition + // proxyErr != nil && len(respBody) > 0 && status >= 200 && status < 300 + // to correctly route delivery-confirmed responses (where the agent completed + // the work but the TCP connection dropped before the full body was received) + // to success instead of failure (#159). + return resp.StatusCode, respBody, &proxyA2AError{ Status: http.StatusBadGateway, Response: gin.H{ "error": "failed to read agent response", diff --git a/workspace-server/internal/handlers/delegation.go b/workspace-server/internal/handlers/delegation.go index 0156f864..6761ec7e 100644 --- a/workspace-server/internal/handlers/delegation.go +++ b/workspace-server/internal/handlers/delegation.go @@ -348,7 +348,7 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s // received). Treat as success: the response body is valid and the work is done. // This prevents "retry storms" where the canvas sees error + Restart-workspace // suggestion even though the delegation actually completed. - if proxyErr != nil && len(respBody) > 0 && status >= 200 && status < 300 { + if isDeliveryConfirmedSuccess(proxyErr, status, respBody) { log.Printf("Delegation %s: completed with delivery error (status=%d, respBody=%d bytes, proxyErr=%v) — treating as success", delegationID, status, len(respBody), proxyErr.Error()) goto handleSuccess @@ -685,6 +685,34 @@ func isTransientProxyError(err *proxyA2AError) bool { return false } +// isDeliveryConfirmedSuccess reports whether the proxy's `(status, body, err)` +// triple represents a delivery-confirmed success: the proxy hit a transport- +// layer error AFTER receiving a complete 2xx response with a non-empty body. +// In that case the agent did the work — the error is on the wire, not in the +// agent — so the delegation should be marked succeeded rather than failed +// (preventing the retry-storm + restart-suggest cascade described in #159). +// +// Caller invariants: +// - proxyErr != nil: a delivery error fired (e.g. connection reset). +// - len(respBody) > 0: a response body was received before the error. +// - 200 <= status < 300: the partial response carried a 2xx code. +// +// All three must hold. nil proxyErr → no decision to make (success path +// already chosen upstream). Empty body → no work-result to recover. Non-2xx → +// the agent itself signalled failure or transient state; don't promote it. +func isDeliveryConfirmedSuccess(proxyErr *proxyA2AError, status int, respBody []byte) bool { + if proxyErr == nil { + return false + } + if len(respBody) == 0 { + return false + } + if status < 200 || status >= 300 { + return false + } + return true +} + // isQueuedProxyResponse reports whether the proxy returned a body shaped like // `{"queued": true, "queue_id": ..., "queue_depth": ..., "message": ...}` — // the busy-target enqueue path in a2a_proxy_helpers.go. Caller checks this diff --git a/workspace-server/internal/handlers/delegation_test.go b/workspace-server/internal/handlers/delegation_test.go index 21cc3a90..427e71b2 100644 --- a/workspace-server/internal/handlers/delegation_test.go +++ b/workspace-server/internal/handlers/delegation_test.go @@ -5,8 +5,10 @@ import ( "context" "encoding/json" "fmt" + "net" "net/http" "net/http/httptest" + "sync" "testing" "time" @@ -376,6 +378,44 @@ func TestIsTransientProxyError_RetriesOnRestartRaceStatuses(t *testing.T) { } } +// TestIsDeliveryConfirmedSuccess — regression guard for #159: the proxy can +// return a complete 2xx body and THEN raise a transport error (e.g. the TCP +// connection drops after the response is received but before close). In that +// case the agent did the work; marking the delegation "failed" causes the +// retry-storm + Restart-workspace cascade described in #159. The new helper +// distinguishes this from genuine failures. +func TestIsDeliveryConfirmedSuccess(t *testing.T) { + connErr := &proxyA2AError{Status: http.StatusOK, Response: gin.H{}} + cases := []struct { + name string + proxyErr *proxyA2AError + status int + body []byte + expect bool + }{ + // The new branch: 2xx + body + transport error → recover as success. + {"200 + body + connreset (THE bug fix path)", connErr, http.StatusOK, []byte(`{"text":"ok"}`), true}, + {"299 + body + connreset (boundary high)", connErr, 299, []byte(`{"text":"ok"}`), true}, + {"200 + body + connreset (boundary low)", connErr, 200, []byte(`{"x":1}`), true}, + // Negative cases: any one of the three preconditions failing → false. + {"nil proxyErr (no decision to make)", nil, http.StatusOK, []byte(`{"text":"ok"}`), false}, + {"empty body (no work-result to recover)", connErr, http.StatusOK, []byte{}, false}, + {"nil body (no work-result to recover)", connErr, http.StatusOK, nil, false}, + {"4xx with body — agent signalled failure, do not promote", connErr, http.StatusBadRequest, []byte(`{"err":"bad"}`), false}, + {"5xx with body — agent signalled failure, do not promote", connErr, http.StatusInternalServerError, []byte(`{"err":"crash"}`), false}, + {"3xx with body — redirect, not a result", connErr, 301, []byte(`{"loc":"/x"}`), false}, + {"199 status (under 200) — not a 2xx", connErr, 199, []byte(`{"x":1}`), false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := isDeliveryConfirmedSuccess(tc.proxyErr, tc.status, tc.body); got != tc.expect { + t.Errorf("isDeliveryConfirmedSuccess(%v, %d, %q) = %v, want %v", + tc.proxyErr, tc.status, string(tc.body), got, tc.expect) + } + }) + } +} + func TestIsQueuedProxyResponse(t *testing.T) { // Regression guard for the chat-leak bug: when the proxy returns // 202 with a queued-shape body, executeDelegation must classify it @@ -918,3 +958,308 @@ func TestInsertDelegationOutcome_ZeroValueIsUnknown(t *testing.T) { t.Errorf("insertOutcomeUnknown must not collide with insertOK") } } + +// ==================== executeDelegation — delivery-confirmed proxy error regression tests ==================== +// +// These test the fix for issue #159: when proxyA2ARequest returns an error but we have a +// non-empty response body with a 2xx status code, executeDelegation must treat it as success. +// The error is a delivery/transport error (e.g., connection reset after response was received). +// Previously, executeDelegation marked these as "failed" even though the work was done, +// causing retry storms and "error" rendering in canvas despite the response being available. +// +// Test strategy: spin up a mock A2A agent server, set up the source/target DB rows, call +// executeDelegation directly, and verify the activity_logs status and delegation status. + +const testDelegationID = "del-159-test" +const testSourceID = "ws-source-159" +const testTargetID = "ws-target-159" + +// expectExecuteDelegationBase sets up sqlmock expectations for the DB queries that +// executeDelegation always makes, regardless of outcome. +func expectExecuteDelegationBase(mock sqlmock.Sqlmock) { + // updateDelegationStatus: dispatched + // Uses prefix match — sqlmock regexes match the full query string. + mock.ExpectExec("UPDATE activity_logs SET status"). + WithArgs("dispatched", "", testSourceID, testDelegationID). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // CanCommunicate (source=target self-call is always allowed — no DB lookup needed) + // resolveAgentURL: reads ws:{id}:url from Redis, falls back to DB for target + mock.ExpectQuery("SELECT url, status FROM workspaces WHERE id = "). + WithArgs(testTargetID). + WillReturnRows(sqlmock.NewRows([]string{"url", "status"}).AddRow("", "online")) +} + +// expectExecuteDelegationSuccess sets up expectations for a completed delegation. +func expectExecuteDelegationSuccess(mock sqlmock.Sqlmock, respBody string) { + // INSERT activity_logs for delegation completion (response_body status = 'completed') + mock.ExpectExec("INSERT INTO activity_logs"). + WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), "completed"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // updateDelegationStatus: completed + mock.ExpectExec("UPDATE activity_logs SET status"). + WithArgs("completed", "", testSourceID, testDelegationID). + WillReturnResult(sqlmock.NewResult(0, 1)) +} + +// expectExecuteDelegationFailed sets up expectations for a failed delegation. +func expectExecuteDelegationFailed(mock sqlmock.Sqlmock) { + // INSERT activity_logs for delegation failure (response_body status = 'failed') + mock.ExpectExec("INSERT INTO activity_logs"). + WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), "failed"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // updateDelegationStatus: failed + mock.ExpectExec("UPDATE activity_logs SET status"). + WithArgs("failed", sqlmock.AnyArg(), testSourceID, testDelegationID). + WillReturnResult(sqlmock.NewResult(0, 1)) +} + +// TestExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess is the primary regression +// test for issue #159. The scenario: +// - Attempt 1: server sends 200 OK headers + partial body, then closes connection. +// proxyA2ARequest: body read gets io.EOF (partial body read), returns (200, , BadGateway). +// isTransientProxyError(BadGateway) = TRUE → retry. +// - Attempt 2: server does the same thing (closes after partial body). +// proxyA2ARequest: same (200, , BadGateway). +// isTransientProxyError(BadGateway) = TRUE → retry AGAIN (but outer context will fire soon, +// or we get one more attempt). For the test we let it run. +// POST-FIX: the executeDelegation new condition sees status=200, body=, err!=nil +// and routes to handleSuccess immediately. +// +// The key pre/post-fix difference: pre-fix, executeDelegation received status=0 (hardcoded) +// even when the server sent 200, so the condition always failed. Post-fix, status=200 is +// preserved through the error return path (proxyA2ARequest now returns resp.StatusCode, respBody). +// In this test the retry ultimately succeeds (server eventually sends full body), but +// the critical assertion is that a 2xx partial-body delivery-confirmed response is never +// classified as "failed" — it always routes to success. +func TestExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + allowLoopbackForTest(t) + + broadcaster := newTestBroadcaster() + wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + dh := NewDelegationHandler(wh, broadcaster) + + // Server that sends a 200 response with declared Content-Length but closes + // the connection before sending all bytes. Go's http.Client sees io.EOF on + // the body read. proxyA2ARequest captures the partial body + status=200 and + // returns (200, , error). executeDelegation's new condition sees + // status=200 + body > 0 + error != nil → routes to handleSuccess. + var wg sync.WaitGroup + wg.Add(1) + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + defer ln.Close() + go func() { + defer wg.Done() + conn, err := ln.Accept() + if err != nil { + return + } + defer conn.Close() + // Consume the HTTP request + buf := make([]byte, 2048) + conn.Read(buf) + // Send 200 OK with Content-Length: 100 but only 74 bytes of body + // (less than declared length → io.LimitReader returns io.EOF after reading all 74) + resp := "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n" + resp += `{"result":{"parts":[{"text":"work completed successfully"}]}}` // 74 bytes + conn.Write([]byte(resp)) + // Close immediately — client gets io.EOF on body read + }() + + agentURL := "http://" + ln.Addr().String() + mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentURL) + allowLoopbackForTest(t) + + expectExecuteDelegationBase(mock) + expectExecuteDelegationSuccess(mock, `{"result":{"parts":[{"text":"work completed successfully"}]}}`) + + // Execute synchronously (not as a goroutine) so we can check DB state immediately. + // The handler fires it as goroutine; we call it directly for deterministic testing. + a2aBody, _ := json.Marshal(map[string]interface{}{ + "jsonrpc": "2.0", + "id": "1", + "method": "message/send", + "params": map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "parts": []map[string]string{{"type": "text", "text": "do work"}}, + }, + }, + }) + dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) + + time.Sleep(100 * time.Millisecond) // let DB writes settle + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestExecuteDelegation_ProxyErrorNon2xx_RemainsFailed verifies that the pre-fix failure +// path is unchanged when proxyA2ARequest returns a delivery-confirmed error with a non-2xx +// status code (e.g., 500 Internal Server Error with partial body read before connection drop). +// The new condition requires status >= 200 && status < 300, so non-2xx always routes to failure. +func TestExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + allowLoopbackForTest(t) + + broadcaster := newTestBroadcaster() + wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + dh := NewDelegationHandler(wh, broadcaster) + + // Server returns 500 with declared Content-Length but closes connection early. + // proxyA2ARequest: reads 500 headers, partial body, then connection drop → body read error. + // Returns (500, , BadGateway). + // New condition: status=500 is NOT >= 200 && < 300 → routes to failure. + // isTransientProxyError(500) = false → no retry. + var wg sync.WaitGroup + wg.Add(1) + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + defer ln.Close() + go func() { + defer wg.Done() + conn, err := ln.Accept() + if err != nil { + return + } + defer conn.Close() + buf := make([]byte, 2048) + conn.Read(buf) + // 500 with Content-Length: 100 but only ~60 bytes of body + resp := "HTTP/1.1 500 Internal Server Error\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n" + resp += `{"error":"agent crashed"}` // ~24 bytes, less than declared + conn.Write([]byte(resp)) + // Close immediately — client gets io.EOF on body read + }() + + agentURL := "http://" + ln.Addr().String() + mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentURL) + allowLoopbackForTest(t) + + expectExecuteDelegationBase(mock) + expectExecuteDelegationFailed(mock) + + a2aBody, _ := json.Marshal(map[string]interface{}{ + "jsonrpc": "2.0", "id": "1", "method": "message/send", + "params": map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "parts": []map[string]string{{"type": "text", "text": "do work"}}, + }, + }, + }) + dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) + + time.Sleep(100 * time.Millisecond) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed verifies that the pre-fix failure +// path is unchanged when proxyA2ARequest returns an error with a 2xx status but empty body. +// The new condition requires len(respBody) > 0, so empty body routes to failure. +func TestExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + allowLoopbackForTest(t) + + broadcaster := newTestBroadcaster() + wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + dh := NewDelegationHandler(wh, broadcaster) + + // Server returns 502 Bad Gateway — proxyA2ARequest returns 502, body="" (empty), error != nil. + // New condition: proxyErr != nil && len(respBody) > 0 && status >= 200 && status < 300 + // → len(respBody) == 0 → condition FALSE → falls through to failure. + // isTransientProxyError(502) is TRUE → retry → same result → failure. + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadGateway) + // No body — connection closes normally + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentServer.URL) + allowLoopbackForTest(t) + + // First attempt: updateDelegationStatus(dispatched) — from expectExecuteDelegationBase + expectExecuteDelegationBase(mock) + // Second attempt (retry): updateDelegationStatus(dispatched) again + mock.ExpectExec("UPDATE activity_logs SET status"). + WithArgs("dispatched", "", testSourceID, testDelegationID). + WillReturnResult(sqlmock.NewResult(0, 1)) + // Failure: INSERT + UPDATE (failed) + expectExecuteDelegationFailed(mock) + + a2aBody, _ := json.Marshal(map[string]interface{}{ + "jsonrpc": "2.0", "id": "1", "method": "message/send", + "params": map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "parts": []map[string]string{{"type": "text", "text": "do work"}}, + }, + }, + }) + dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) + + time.Sleep(100 * time.Millisecond) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestExecuteDelegation_CleanProxyResponse_Unchanged verifies that a clean proxy response +// (no error, 200 with body) is unaffected by the new condition. This is the baseline: +// proxyErr == nil so the new condition never fires. +func TestExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + allowLoopbackForTest(t) + + broadcaster := newTestBroadcaster() + wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + dh := NewDelegationHandler(wh, broadcaster) + + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"result":{"parts":[{"text":"all good"}]}}`)) + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentServer.URL) + allowLoopbackForTest(t) + + expectExecuteDelegationBase(mock) + expectExecuteDelegationSuccess(mock, `{"result":{"parts":[{"text":"all good"}]}}`) + + a2aBody, _ := json.Marshal(map[string]interface{}{ + "jsonrpc": "2.0", "id": "1", "method": "message/send", + "params": map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "parts": []map[string]string{{"type": "text", "text": "do work"}}, + }, + }, + }) + dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) + + time.Sleep(100 * time.Millisecond) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} +}