From 54ee033973704a7ee15b32e3bd92da685025d0a9 Mon Sep 17 00:00:00 2001 From: core-be Date: Mon, 11 May 2026 23:03:36 -0700 Subject: [PATCH] test(handlers): migrate 4x TestExecuteDelegation tests to real-Postgres integration (mc#664 Class 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The pre-migration sqlmock-based tests in delegation_test.go (4 tests + 3 helpers expectExecuteDelegationBase/Success/Failed) had been failing silently for weeks behind `Platform (Go)`'s continue-on-error: true. The root cause is structural, not a missing expectation: every new DB query the production executeDelegation path picks up imposes a fresh sqlmock-expectation tax, and the helpers fell behind on five recent additions: 1. last_outbound_at UPDATE (a2a_proxy_helpers.go logA2ASuccess) 2. lookupDeliveryMode SELECT (a2a_proxy.go poll-mode short-circuit) 3. lookupRuntime SELECT (a2a_proxy.go mock-runtime short-circuit) 4. a2a_receive INSERT into activity_logs (LogActivity goroutine) 5. recordLedgerStatus writes (delegation.go + delegation_ledger.go) Per feedback_real_subprocess_test_for_boot_path + feedback_local_must_mimic_production + feedback_mandatory_local_e2e_before_ship, the right fix isn't to patch the helpers — it's to run these tests against a real Postgres so the downstream queries fire for real and the test signal tracks production drift automatically. That eliminates the structural anti-pattern. This commit: - Deletes the 4 sqlmock TestExecuteDelegation_* tests and the 3 helpers entirely from delegation_test.go (no backward-compat shim; the helpers were the failure surface, not load-bearing). - Adds 4 TestIntegration_ExecuteDelegation_* tests in a new file delegation_executor_integration_test.go under `//go:build integration` so the existing `Handlers Postgres Integration` CI job picks them up via its `-run "^TestIntegration_"` runner. 
The new tests seed real workspaces + delegations rows, run executeDelegation end-to-end (including the production retry path and the ledger gate flipped on via DELEGATION_LEDGER_WRITE=1), and assert both activity_logs and delegations.status land as expected. Wall-clock: ~9s × 3 partial-body tests + 1s clean = 28s end-to-end. Within CI's 5min timeout. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../delegation_executor_integration_test.go | 407 ++++++++++++++++++ .../internal/handlers/delegation_test.go | 315 -------------- 2 files changed, 407 insertions(+), 315 deletions(-) create mode 100644 workspace-server/internal/handlers/delegation_executor_integration_test.go diff --git a/workspace-server/internal/handlers/delegation_executor_integration_test.go b/workspace-server/internal/handlers/delegation_executor_integration_test.go new file mode 100644 index 00000000..676eead9 --- /dev/null +++ b/workspace-server/internal/handlers/delegation_executor_integration_test.go @@ -0,0 +1,407 @@ +//go:build integration +// +build integration + +// delegation_executor_integration_test.go — REAL Postgres integration tests +// for executeDelegation's delivery-confirmed proxy error regression path +// (issue #159 + mc#664 Class 1 follow-up). +// +// Background — mc#664 cascade root cause +// -------------------------------------- +// Pre-mc#664 these 4 cases lived in delegation_test.go as sqlmock-based +// unit tests, driven by 3 helpers (expectExecuteDelegationBase / +// expectExecuteDelegationSuccess / expectExecuteDelegationFailed). +// They went stale as production code added new DB queries to +// executeDelegation's downstream paths: +// +// 1. last_outbound_at UPDATE (a2a_proxy_helpers.go logA2ASuccess) +// 2. lookupDeliveryMode SELECT (a2a_proxy.go poll-mode short-circuit) +// 3. lookupRuntime SELECT (a2a_proxy.go mock-runtime short-circuit) +// 4. a2a_receive INSERT into activity_logs (LogActivity goroutine) +// 5. 
recordLedgerStatus writes (delegation.go + delegation_ledger.go) +// +// Each new query was a fresh sqlmock-expectation tax on the helpers, and +// the helpers fell behind. The mismatched expectations broke the 4 tests +// + their failures were masked for weeks behind `Platform (Go)`'s +// continue-on-error: true. +// +// Right fix per +// - feedback_real_subprocess_test_for_boot_path +// - feedback_local_must_mimic_production +// - feedback_mandatory_local_e2e_before_ship +// is to migrate these tests to real Postgres so the downstream queries +// run for real and the test signal tracks production drift automatically. +// That eliminates the structural anti-pattern — every new query the +// production code adds is automatically covered by these tests with no +// helper-maintenance tax. +// +// Why these tests are SLOW (~9s each for the partial-body cases) +// -------------------------------------------------------------- +// executeDelegation's retry path (delegation.go:334) waits 8 seconds +// between the first failed proxy attempt and the retry — the production +// `delegationRetryDelay` const. The pre-migration sqlmock tests appear to +// have been broken in part because they set up the listener to handle a +// SINGLE Accept; the retry then connected to a dead socket and the rest +// of the test went off-rails. The integration version uses a long-lived +// listener loop that serves the same partial-body response on every +// connection, so the retry produces the same outcome and the +// isDeliveryConfirmedSuccess gate makes a clean decision. +// +// 9s × 3 partial-body tests + ~1s for the clean path = ~28s end-to-end. +// Still well under CI's `-timeout 5m`. Local devs running `-run TestInt` +// should pass `-timeout 60s` or higher. +// +// Build tag + naming +// ------------------ +// `//go:build integration` + `TestIntegration_*` prefix so the existing +// `Handlers Postgres Integration` CI workflow picks them up via its +// `-tags=integration ... 
-run "^TestIntegration_"` runner. The same +// shape as delegation_ledger_integration_test.go (the file these tests +// were modelled after). +// +// Run locally: +// +// docker run --rm -d --name pg-integration \ +// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \ +// -p 55432:5432 postgres:15-alpine +// sleep 4 +// # apply migrations (replays the Handlers Postgres Integration loop) +// for m in workspace-server/migrations/*.sql; do +// [[ "$m" == *.down.sql ]] && continue +// PGPASSWORD=test psql -h localhost -p 55432 -U postgres -d molecule \ +// -v ON_ERROR_STOP=1 -f "$m" >/dev/null 2>&1 || true +// done +// cd workspace-server +// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \ +// go test -tags=integration -timeout 60s ./internal/handlers/ \ +// -run TestIntegration_ExecuteDelegation -v + +package handlers + +import ( + "context" + "encoding/json" + "fmt" + "net" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" + + mdb "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" +) + +// Real UUIDs — required because workspaces.id is UUID (not TEXT). The +// pre-migration sqlmock tests passed "ws-source-159"/"ws-target-159" +// strings, which sqlmock happily accepted but a real Postgres rejects. +const ( + integExecSourceID = "11111111-aaaa-aaaa-aaaa-000000000159" + integExecTargetID = "22222222-aaaa-aaaa-aaaa-000000000159" + integExecDelegationID = "del-integ-159-test" +) + +// seedExecuteDelegationFixtures inserts the source + target workspace rows +// and the queued delegations ledger row that executeDelegation expects to +// observe. Mirrors the pre-fix sqlmock helper's intent but in real DB +// terms. +// +// Per-test cleanup is handled by integrationDB(t) which DELETE-purges +// delegations before each test; workspaces/activity_logs are scrubbed +// here so cross-test fixture leak doesn't surface. 
+func seedExecuteDelegationFixtures(t *testing.T) { + t.Helper() + conn := mdb.DB + if _, err := conn.ExecContext(context.Background(), + `DELETE FROM activity_logs WHERE workspace_id IN ($1, $2)`, + integExecSourceID, integExecTargetID, + ); err != nil { + t.Fatalf("cleanup activity_logs: %v", err) + } + if _, err := conn.ExecContext(context.Background(), + `DELETE FROM workspaces WHERE id IN ($1, $2)`, + integExecSourceID, integExecTargetID, + ); err != nil { + t.Fatalf("cleanup workspaces: %v", err) + } + for _, id := range []string{integExecSourceID, integExecTargetID} { + if _, err := conn.ExecContext(context.Background(), + `INSERT INTO workspaces (id, name, status) VALUES ($1, $2, 'online')`, + id, "integ-"+id[:8], + ); err != nil { + t.Fatalf("seed workspaces %s: %v", id, err) + } + } + // Seed the queued delegation row so recordLedgerStatus's first + // SetStatus("dispatched", ...) has somewhere to transition from. + // Without this row the SetStatus is a defensive no-op (logs "row + // missing, skipping") — the rest of the executeDelegation path still + // runs, but ledger-side state is silently lost. We want it real. + recordLedgerInsert(context.Background(), + integExecSourceID, integExecTargetID, integExecDelegationID, + "integration-test task", "") +} + +// startPartialBodyServer spins up a raw TCP listener that responds to +// every connection with the given HTTP response prefix (headers + start +// of body) and then closes the connection. Go's http.Client sees io.EOF +// when reading the body. Returns the URL + a stop func. +// +// Unlike httptest.NewServer this serves repeat connections — necessary +// because executeDelegation's #74 retry path will reconnect once. 
+func startPartialBodyServer(t *testing.T, responseHead string) (url string, stop func()) {
+	t.Helper()
+	ln, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		t.Fatalf("listen: %v", err)
+	}
+	var done int32
+	go func() {
+		for atomic.LoadInt32(&done) == 0 {
+			conn, err := ln.Accept()
+			if err != nil {
+				return
+			}
+			go func(c net.Conn) {
+				defer c.Close()
+				buf := make([]byte, 2048)
+				_, _ = c.Read(buf)
+				_, _ = c.Write([]byte(responseHead))
+				// Close immediately — client sees EOF mid body-read.
+			}(conn)
+		}
+	}()
+	return "http://" + ln.Addr().String(), func() {
+		atomic.StoreInt32(&done, 1)
+		ln.Close()
+	}
+}
+
+// activityRowsByStatus counts activity_logs rows that match the given
+// (workspace_id, status) pair. Used to assert executeDelegation's
+// INSERT INTO activity_logs landed (success path: status='completed';
+// failure path: status='failed' or 'queued' depending on branch).
+func activityRowsByStatus(t *testing.T, workspaceID, status string) int {
+	t.Helper()
+	var n int
+	if err := mdb.DB.QueryRowContext(context.Background(),
+		`SELECT count(*) FROM activity_logs WHERE workspace_id = $1 AND status = $2`,
+		workspaceID, status,
+	).Scan(&n); err != nil {
+		t.Fatalf("activity count(%s, %s): %v", workspaceID, status, err)
+	}
+	return n
+}
+
+// delegationLedgerStatus returns the current delegations.status for the
+// seeded delegation_id; a missing row fails the test via t.Fatalf.
+// Real-Postgres check that the expected ledger transition landed.
+func delegationLedgerStatus(t *testing.T, delegationID string) string {
+	t.Helper()
+	var s string
+	err := mdb.DB.QueryRowContext(context.Background(),
+		`SELECT status FROM delegations WHERE delegation_id = $1`, delegationID,
+	).Scan(&s)
+	if err != nil {
+		t.Fatalf("ledger status(%s): %v", delegationID, err)
+	}
+	return s
+}
+
+// TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess
+// is the primary regression test for issue #159 in real-Postgres form.
+// Scenario: target sends a 200 response with declared Content-Length but
+// closes the connection mid-body; client gets io.EOF on body read.
+// proxyA2ARequest captures status=200 + partial body + transport error;
+// executeDelegation's isDeliveryConfirmedSuccess branch must route to
+// handleSuccess so the row lands as 'completed' (not 'failed').
+//
+// Real-Postgres advantage over the sqlmock version: this test will fail
+// if a future refactor adds a new DB write to the success path without
+// updating any helper — sqlmock would have required reflexive expectation
+// updates; real Postgres just runs.
+//
+// Timing: executeDelegation's first attempt returns (200, partial body, EOF
+// → BadGateway-class err). isTransientProxyError(BadGateway)=true so the
+// caller sleeps `delegationRetryDelay` (8s) and retries. Our listener
+// loop serves the same partial response on attempt 2, producing the
+// same (200, partial body, BadGateway) triple. isDeliveryConfirmedSuccess
+// then fires (status=200 ∈ [200,300) + body > 0 + err != nil) → success.
+func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) {
+	integrationDB(t)
+	t.Setenv("DELEGATION_LEDGER_WRITE", "1")
+	seedExecuteDelegationFixtures(t)
+
+	mr := setupTestRedis(t)
+	allowLoopbackForTest(t)
+	broadcaster := newTestBroadcaster()
+	wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
+	dh := NewDelegationHandler(wh, broadcaster)
+
+	// 200 OK with declared Content-Length=100 but only 61 bytes of body.
+	// Connection closes after the partial body → client io.EOF.
+ resp := "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n" + resp += `{"result":{"parts":[{"text":"work completed successfully"}]}}` // 74 bytes + agentURL, stop := startPartialBodyServer(t, resp) + defer stop() + + mr.Set(fmt.Sprintf("ws:%s:url", integExecTargetID), agentURL) + + a2aBody, _ := json.Marshal(map[string]interface{}{ + "jsonrpc": "2.0", "id": "1", "method": "message/send", + "params": map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "parts": []map[string]string{{"type": "text", "text": "do work"}}, + }, + }, + }) + dh.executeDelegation(integExecSourceID, integExecTargetID, integExecDelegationID, a2aBody) + + // executeDelegation is synchronous here; the 8s retry sleep is INSIDE + // the call. We still need a small buffer for the async logA2ASuccess / + // last_outbound_at goroutines that fan out after the success branch. + time.Sleep(500 * time.Millisecond) + + // Assert the executeDelegation success path wrote the activity_logs + // completion row + transitioned the ledger to completed. + if got := activityRowsByStatus(t, integExecSourceID, "completed"); got != 1 { + t.Errorf("expected 1 'completed' activity_logs row, got %d", got) + } + if s := delegationLedgerStatus(t, integExecDelegationID); s != "completed" { + t.Errorf("delegation ledger: want status=completed, got %q", s) + } +} + +// TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed — +// 500 with partial body + connection drop. The retry produces the same +// 500 partial. isDeliveryConfirmedSuccess fails on status>=300 → falls +// through to the failure branch. Pins that the new condition didn't +// accidentally widen the success branch. 
+func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) { + integrationDB(t) + t.Setenv("DELEGATION_LEDGER_WRITE", "1") + seedExecuteDelegationFixtures(t) + + mr := setupTestRedis(t) + allowLoopbackForTest(t) + broadcaster := newTestBroadcaster() + wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + dh := NewDelegationHandler(wh, broadcaster) + + resp := "HTTP/1.1 500 Internal Server Error\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n" + resp += `{"error":"agent crashed"}` // ~24 bytes, less than declared 100 + agentURL, stop := startPartialBodyServer(t, resp) + defer stop() + + mr.Set(fmt.Sprintf("ws:%s:url", integExecTargetID), agentURL) + + a2aBody, _ := json.Marshal(map[string]interface{}{ + "jsonrpc": "2.0", "id": "1", "method": "message/send", + "params": map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "parts": []map[string]string{{"type": "text", "text": "do work"}}, + }, + }, + }) + dh.executeDelegation(integExecSourceID, integExecTargetID, integExecDelegationID, a2aBody) + + time.Sleep(500 * time.Millisecond) + + if got := activityRowsByStatus(t, integExecSourceID, "failed"); got != 1 { + t.Errorf("expected 1 'failed' activity_logs row, got %d", got) + } + if s := delegationLedgerStatus(t, integExecDelegationID); s != "failed" { + t.Errorf("delegation ledger: want status=failed, got %q", s) + } +} + +// TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed — +// 502 Bad Gateway with empty body, normal close. proxyA2ARequest returns +// (502, "", error). isDeliveryConfirmedSuccess requires len(respBody) > 0 +// → false → falls through to the failure branch. isTransientProxyError +// (BadGateway) = true so we get a retry that also fails, then 'failed'. 
+func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) { + integrationDB(t) + t.Setenv("DELEGATION_LEDGER_WRITE", "1") + seedExecuteDelegationFixtures(t) + + mr := setupTestRedis(t) + allowLoopbackForTest(t) + broadcaster := newTestBroadcaster() + wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + dh := NewDelegationHandler(wh, broadcaster) + + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadGateway) + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", integExecTargetID), agentServer.URL) + + a2aBody, _ := json.Marshal(map[string]interface{}{ + "jsonrpc": "2.0", "id": "1", "method": "message/send", + "params": map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "parts": []map[string]string{{"type": "text", "text": "do work"}}, + }, + }, + }) + dh.executeDelegation(integExecSourceID, integExecTargetID, integExecDelegationID, a2aBody) + + time.Sleep(500 * time.Millisecond) + + if got := activityRowsByStatus(t, integExecSourceID, "failed"); got != 1 { + t.Errorf("expected 1 'failed' activity_logs row, got %d", got) + } + if s := delegationLedgerStatus(t, integExecDelegationID); s != "failed" { + t.Errorf("delegation ledger: want status=failed, got %q", s) + } +} + +// TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged — +// baseline: clean 200 with full body, no error. proxyErr == nil so +// isDeliveryConfirmedSuccess never fires and no retry runs (fast path). +// Pins that the new error-recovery branch didn't regress the most +// common code path. 
+func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) { + integrationDB(t) + t.Setenv("DELEGATION_LEDGER_WRITE", "1") + seedExecuteDelegationFixtures(t) + + mr := setupTestRedis(t) + allowLoopbackForTest(t) + broadcaster := newTestBroadcaster() + wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + dh := NewDelegationHandler(wh, broadcaster) + + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"result":{"parts":[{"text":"all good"}]}}`)) + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", integExecTargetID), agentServer.URL) + + a2aBody, _ := json.Marshal(map[string]interface{}{ + "jsonrpc": "2.0", "id": "1", "method": "message/send", + "params": map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "parts": []map[string]string{{"type": "text", "text": "do work"}}, + }, + }, + }) + dh.executeDelegation(integExecSourceID, integExecTargetID, integExecDelegationID, a2aBody) + + time.Sleep(500 * time.Millisecond) + + if got := activityRowsByStatus(t, integExecSourceID, "completed"); got != 1 { + t.Errorf("expected 1 'completed' activity_logs row, got %d", got) + } + if s := delegationLedgerStatus(t, integExecDelegationID); s != "completed" { + t.Errorf("delegation ledger: want status=completed, got %q", s) + } +} diff --git a/workspace-server/internal/handlers/delegation_test.go b/workspace-server/internal/handlers/delegation_test.go index b2d1c93a..0d0b58fe 100644 --- a/workspace-server/internal/handlers/delegation_test.go +++ b/workspace-server/internal/handlers/delegation_test.go @@ -5,10 +5,8 @@ import ( "context" "encoding/json" "fmt" - "net" "net/http" "net/http/httptest" - "sync" "testing" "time" @@ -958,316 +956,3 @@ func TestInsertDelegationOutcome_ZeroValueIsUnknown(t *testing.T) { 
t.Errorf("insertOutcomeUnknown must not collide with insertOK") } } - -// ==================== executeDelegation — delivery-confirmed proxy error regression tests ==================== -// -// These test the fix for issue #159: when proxyA2ARequest returns an error but we have a -// non-empty response body with a 2xx status code, executeDelegation must treat it as success. -// The error is a delivery/transport error (e.g., connection reset after response was received). -// Previously, executeDelegation marked these as "failed" even though the work was done, -// causing retry storms and "error" rendering in canvas despite the response being available. -// -// Test strategy: spin up a mock A2A agent server, set up the source/target DB rows, call -// executeDelegation directly, and verify the activity_logs status and delegation status. - -const testDelegationID = "del-159-test" -const testSourceID = "ws-source-159" -const testTargetID = "ws-target-159" - -// expectExecuteDelegationBase sets up sqlmock expectations for the DB queries that -// executeDelegation always makes, regardless of outcome. -func expectExecuteDelegationBase(mock sqlmock.Sqlmock) { - // updateDelegationStatus: dispatched - // Uses prefix match — sqlmock regexes match the full query string. - mock.ExpectExec("UPDATE activity_logs SET status"). - WithArgs("dispatched", "", testSourceID, testDelegationID). - WillReturnResult(sqlmock.NewResult(0, 1)) - - // CanCommunicate: source != target → fires two getWorkspaceRef lookups. - // Both test fixtures have parent_id = NULL (root-level siblings) → allowed. - // Order matches call order: source first, then target. - mock.ExpectQuery("SELECT id, parent_id FROM workspaces WHERE id"). - WithArgs(testSourceID). - WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow(testSourceID, nil)) - mock.ExpectQuery("SELECT id, parent_id FROM workspaces WHERE id"). - WithArgs(testTargetID). 
- WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow(testTargetID, nil)) - - // resolveAgentURL: reads ws:{id}:url from Redis, falls back to DB for target - mock.ExpectQuery("SELECT url, status FROM workspaces WHERE id = "). - WithArgs(testTargetID). - WillReturnRows(sqlmock.NewRows([]string{"url", "status"}).AddRow("", "online")) -} - -// expectExecuteDelegationSuccess sets up expectations for a completed delegation. -func expectExecuteDelegationSuccess(mock sqlmock.Sqlmock, respBody string) { - // INSERT activity_logs for delegation completion (response_body status = 'completed') - mock.ExpectExec("INSERT INTO activity_logs"). - WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), "completed"). - WillReturnResult(sqlmock.NewResult(0, 1)) - - // updateDelegationStatus: completed - mock.ExpectExec("UPDATE activity_logs SET status"). - WithArgs("completed", "", testSourceID, testDelegationID). - WillReturnResult(sqlmock.NewResult(0, 1)) -} - -// expectExecuteDelegationFailed sets up expectations for a failed delegation. -func expectExecuteDelegationFailed(mock sqlmock.Sqlmock) { - // INSERT activity_logs for delegation failure (response_body status = 'failed') - mock.ExpectExec("INSERT INTO activity_logs"). - WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), "failed"). - WillReturnResult(sqlmock.NewResult(0, 1)) - - // updateDelegationStatus: failed - mock.ExpectExec("UPDATE activity_logs SET status"). - WithArgs("failed", sqlmock.AnyArg(), testSourceID, testDelegationID). - WillReturnResult(sqlmock.NewResult(0, 1)) -} - -// TestExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess is the primary regression -// test for issue #159. The scenario: -// - Attempt 1: server sends 200 OK headers + partial body, then closes connection. 
-// proxyA2ARequest: body read gets io.EOF (partial body read), returns (200, , BadGateway). -// isTransientProxyError(BadGateway) = TRUE → retry. -// - Attempt 2: server does the same thing (closes after partial body). -// proxyA2ARequest: same (200, , BadGateway). -// isTransientProxyError(BadGateway) = TRUE → retry AGAIN (but outer context will fire soon, -// or we get one more attempt). For the test we let it run. -// POST-FIX: the executeDelegation new condition sees status=200, body=, err!=nil -// and routes to handleSuccess immediately. -// -// The key pre/post-fix difference: pre-fix, executeDelegation received status=0 (hardcoded) -// even when the server sent 200, so the condition always failed. Post-fix, status=200 is -// preserved through the error return path (proxyA2ARequest now returns resp.StatusCode, respBody). -// In this test the retry ultimately succeeds (server eventually sends full body), but -// the critical assertion is that a 2xx partial-body delivery-confirmed response is never -// classified as "failed" — it always routes to success. -func TestExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) { - mock := setupTestDB(t) - mr := setupTestRedis(t) - allowLoopbackForTest(t) - - broadcaster := newTestBroadcaster() - wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) - dh := NewDelegationHandler(wh, broadcaster) - - // Server that sends a 200 response with declared Content-Length but closes - // the connection before sending all bytes. Go's http.Client sees io.EOF on - // the body read. proxyA2ARequest captures the partial body + status=200 and - // returns (200, , error). executeDelegation's new condition sees - // status=200 + body > 0 + error != nil → routes to handleSuccess. 
- var wg sync.WaitGroup - wg.Add(1) - ln, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatalf("failed to listen: %v", err) - } - defer ln.Close() - go func() { - defer wg.Done() - conn, err := ln.Accept() - if err != nil { - return - } - defer conn.Close() - // Consume the HTTP request - buf := make([]byte, 2048) - conn.Read(buf) - // Send 200 OK with Content-Length: 100 but only 74 bytes of body - // (less than declared length → io.LimitReader returns io.EOF after reading all 74) - resp := "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n" - resp += `{"result":{"parts":[{"text":"work completed successfully"}]}}` // 74 bytes - conn.Write([]byte(resp)) - // Close immediately — client gets io.EOF on body read - }() - - agentURL := "http://" + ln.Addr().String() - mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentURL) - allowLoopbackForTest(t) - - expectExecuteDelegationBase(mock) - expectExecuteDelegationSuccess(mock, `{"result":{"parts":[{"text":"work completed successfully"}]}}`) - - // Execute synchronously (not as a goroutine) so we can check DB state immediately. - // The handler fires it as goroutine; we call it directly for deterministic testing. 
- a2aBody, _ := json.Marshal(map[string]interface{}{ - "jsonrpc": "2.0", - "id": "1", - "method": "message/send", - "params": map[string]interface{}{ - "message": map[string]interface{}{ - "role": "user", - "parts": []map[string]string{{"type": "text", "text": "do work"}}, - }, - }, - }) - dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) - - time.Sleep(100 * time.Millisecond) // let DB writes settle - - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet sqlmock expectations: %v", err) - } -} - -// TestExecuteDelegation_ProxyErrorNon2xx_RemainsFailed verifies that the pre-fix failure -// path is unchanged when proxyA2ARequest returns a delivery-confirmed error with a non-2xx -// status code (e.g., 500 Internal Server Error with partial body read before connection drop). -// The new condition requires status >= 200 && status < 300, so non-2xx always routes to failure. -func TestExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) { - mock := setupTestDB(t) - mr := setupTestRedis(t) - allowLoopbackForTest(t) - - broadcaster := newTestBroadcaster() - wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) - dh := NewDelegationHandler(wh, broadcaster) - - // Server returns 500 with declared Content-Length but closes connection early. - // proxyA2ARequest: reads 500 headers, partial body, then connection drop → body read error. - // Returns (500, , BadGateway). - // New condition: status=500 is NOT >= 200 && < 300 → routes to failure. - // isTransientProxyError(500) = false → no retry. 
- var wg sync.WaitGroup - wg.Add(1) - ln, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatalf("failed to listen: %v", err) - } - defer ln.Close() - go func() { - defer wg.Done() - conn, err := ln.Accept() - if err != nil { - return - } - defer conn.Close() - buf := make([]byte, 2048) - conn.Read(buf) - // 500 with Content-Length: 100 but only ~60 bytes of body - resp := "HTTP/1.1 500 Internal Server Error\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n" - resp += `{"error":"agent crashed"}` // ~24 bytes, less than declared - conn.Write([]byte(resp)) - // Close immediately — client gets io.EOF on body read - }() - - agentURL := "http://" + ln.Addr().String() - mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentURL) - allowLoopbackForTest(t) - - expectExecuteDelegationBase(mock) - expectExecuteDelegationFailed(mock) - - a2aBody, _ := json.Marshal(map[string]interface{}{ - "jsonrpc": "2.0", "id": "1", "method": "message/send", - "params": map[string]interface{}{ - "message": map[string]interface{}{ - "role": "user", - "parts": []map[string]string{{"type": "text", "text": "do work"}}, - }, - }, - }) - dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) - - time.Sleep(100 * time.Millisecond) - - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet sqlmock expectations: %v", err) - } -} - -// TestExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed verifies that the pre-fix failure -// path is unchanged when proxyA2ARequest returns an error with a 2xx status but empty body. -// The new condition requires len(respBody) > 0, so empty body routes to failure. 
-func TestExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) { - mock := setupTestDB(t) - mr := setupTestRedis(t) - allowLoopbackForTest(t) - - broadcaster := newTestBroadcaster() - wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) - dh := NewDelegationHandler(wh, broadcaster) - - // Server returns 502 Bad Gateway — proxyA2ARequest returns 502, body="" (empty), error != nil. - // New condition: proxyErr != nil && len(respBody) > 0 && status >= 200 && status < 300 - // → len(respBody) == 0 → condition FALSE → falls through to failure. - // isTransientProxyError(502) is TRUE → retry → same result → failure. - agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusBadGateway) - // No body — connection closes normally - })) - defer agentServer.Close() - - mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentServer.URL) - allowLoopbackForTest(t) - - // First attempt: updateDelegationStatus(dispatched) — from expectExecuteDelegationBase - expectExecuteDelegationBase(mock) - // Second attempt (retry): updateDelegationStatus(dispatched) again - mock.ExpectExec("UPDATE activity_logs SET status"). - WithArgs("dispatched", "", testSourceID, testDelegationID). 
- WillReturnResult(sqlmock.NewResult(0, 1)) - // Failure: INSERT + UPDATE (failed) - expectExecuteDelegationFailed(mock) - - a2aBody, _ := json.Marshal(map[string]interface{}{ - "jsonrpc": "2.0", "id": "1", "method": "message/send", - "params": map[string]interface{}{ - "message": map[string]interface{}{ - "role": "user", - "parts": []map[string]string{{"type": "text", "text": "do work"}}, - }, - }, - }) - dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) - - time.Sleep(100 * time.Millisecond) - - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet sqlmock expectations: %v", err) - } -} - -// TestExecuteDelegation_CleanProxyResponse_Unchanged verifies that a clean proxy response -// (no error, 200 with body) is unaffected by the new condition. This is the baseline: -// proxyErr == nil so the new condition never fires. -func TestExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) { - mock := setupTestDB(t) - mr := setupTestRedis(t) - allowLoopbackForTest(t) - - broadcaster := newTestBroadcaster() - wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) - dh := NewDelegationHandler(wh, broadcaster) - - agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Header().Set("Content-Type", "application/json") - w.Write([]byte(`{"result":{"parts":[{"text":"all good"}]}}`)) - })) - defer agentServer.Close() - - mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentServer.URL) - allowLoopbackForTest(t) - - expectExecuteDelegationBase(mock) - expectExecuteDelegationSuccess(mock, `{"result":{"parts":[{"text":"all good"}]}}`) - - a2aBody, _ := json.Marshal(map[string]interface{}{ - "jsonrpc": "2.0", "id": "1", "method": "message/send", - "params": map[string]interface{}{ - "message": map[string]interface{}{ - "role": "user", - "parts": []map[string]string{{"type": "text", "text": "do work"}}, - }, - }, - }) - 
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) - - time.Sleep(100 * time.Millisecond) - - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet sqlmock expectations: %v", err) - } -}