diff --git a/workspace-server/internal/handlers/delegation_executor_integration_test.go b/workspace-server/internal/handlers/delegation_executor_integration_test.go
index 43625d4a..676eead9 100644
--- a/workspace-server/internal/handlers/delegation_executor_integration_test.go
+++ b/workspace-server/internal/handlers/delegation_executor_integration_test.go
@@ -1,298 +1,253 @@
 //go:build integration
 // +build integration
 
-// delegation_executor_integration_test.go — REAL Postgres integration tests for
-// executeDelegation HTTP proxy edge cases that sqlmock cannot cover.
+// delegation_executor_integration_test.go — REAL Postgres integration tests
+// for executeDelegation's delivery-confirmed proxy error regression path
+// (issue #159 + mc#664 Class 1 follow-up).
 //
-// The sqlmock tests in delegation_test.go pin which SQL statements fire but
-// cannot detect bugs that depend on the row state AFTER the SQL runs. The
-// result_preview-lost bug shipped to staging in PR #2854 because sqlmock tests
-// were satisfied with "an UPDATE fired" — none verified the row's preview
-// field actually landed. These integration tests close that gap.
+// Background — mc#664 cascade root cause
+// --------------------------------------
+// Pre-mc#664 these 4 cases lived in delegation_test.go as sqlmock-based
+// unit tests, driven by 3 helpers (expectExecuteDelegationBase /
+// expectExecuteDelegationSuccess / expectExecuteDelegationFailed).
+// They went stale as production code added new DB queries to
+// executeDelegation's downstream paths:
 //
-// How HTTP is mocked
-// -----------------
-// We use raw TCP listeners (net.Listener) instead of httptest.Server to avoid
-// any HTTP-library-level goroutine complexity. The test opens a TCP port,
-// serves one HTTP response, then closes the connection. The a2aClient transport
-// is overridden with a DialContext that intercepts all dials and redirects to
-// the test server's port. No DNS, no TCP handshake overhead, no HTTP library
-// goroutines that could block on request-body reads.
+// 1. last_outbound_at UPDATE (a2a_proxy_helpers.go logA2ASuccess)
+// 2. lookupDeliveryMode SELECT (a2a_proxy.go poll-mode short-circuit)
+// 3. lookupRuntime SELECT (a2a_proxy.go mock-runtime short-circuit)
+// 4. a2a_receive INSERT into activity_logs (LogActivity goroutine)
+// 5. recordLedgerStatus writes (delegation.go + delegation_ledger.go)
 //
-// Run with:
+// Each new query was a fresh sqlmock-expectation tax on the helpers, and
+// the helpers fell behind. The mismatched expectations broke the 4 tests,
+// and their failures were masked for weeks behind `Platform (Go)`'s
+// continue-on-error: true.
+//
+// The right fix, per
+// - feedback_real_subprocess_test_for_boot_path
+// - feedback_local_must_mimic_production
+// - feedback_mandatory_local_e2e_before_ship
+// is to migrate these tests to real Postgres so the downstream queries
+// run for real and the test signal tracks production drift automatically.
+// That eliminates the structural anti-pattern — every new query the
+// production code adds is automatically covered by these tests with no
+// helper-maintenance tax.
+//
+// Why these tests are SLOW (~9s each for the retrying cases)
+// ----------------------------------------------------------
+// executeDelegation's retry path (delegation.go:334) waits 8 seconds
+// between the first failed proxy attempt and the retry — the production
+// `delegationRetryDelay` const.
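+//
+// For orientation, the retry shape those 8 seconds come from (a hedged
+// sketch assembled from the names in this comment — delegation.go is
+// authoritative):
+//
+//	status, body, err := proxyA2ARequest(...)        // attempt 1
+//	if err != nil && isTransientProxyError(err) {
+//		time.Sleep(delegationRetryDelay)         // the 8s wait
+//		status, body, err = proxyA2ARequest(...) // attempt 2
+//	}
+//	// isDeliveryConfirmedSuccess(status, body, err) then picks the branch.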
+// The pre-migration sqlmock tests appear to have been broken in part
+// because they set up the listener to handle a SINGLE Accept; the retry
+// then connected to a dead socket and the rest of the test went off the
+// rails. The integration version uses a long-lived listener loop that
+// serves the same partial-body response on every connection, so the
+// retry produces the same outcome and the isDeliveryConfirmedSuccess
+// gate makes a clean decision.
+//
+// 9s × 3 retrying tests + ~1s for the clean path = ~28s end-to-end.
+// Still well under CI's `-timeout 5m`. Local devs running `-run TestInt`
+// should pass `-timeout 60s` or higher.
+//
+// Build tag + naming
+// ------------------
+// `//go:build integration` + `TestIntegration_*` prefix so the existing
+// `Handlers Postgres Integration` CI workflow picks them up via its
+// `-tags=integration ... -run "^TestIntegration_"` runner. The same
+// shape as delegation_ledger_integration_test.go (the file these tests
+// were modelled after).
+//
+// Run locally:
 //
 //	docker run --rm -d --name pg-integration \
 //	  -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \
 //	  -p 55432:5432 postgres:15-alpine
 //	sleep 4
-//	psql ... < workspace-server/migrations/049_delegations.up.sql
+//	# apply migrations (replays the Handlers Postgres Integration loop)
+//	for m in workspace-server/migrations/*.sql; do
+//	  [[ "$m" == *.down.sql ]] && continue
+//	  PGPASSWORD=test psql -h localhost -p 55432 -U postgres -d molecule \
+//	    -v ON_ERROR_STOP=1 -f "$m" >/dev/null 2>&1 || true
+//	done
 //	cd workspace-server
 //	INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
-//	go test -tags=integration ./internal/handlers/ -run Integration_ExecuteDelegation
-//
-// CI (.gitea/workflows/handlers-postgres-integration.yml) runs this on
-// every PR that touches workspace-server/internal/handlers/**.
+//	go test -tags=integration -timeout 60s ./internal/handlers/ \
+//	  -run TestIntegration_ExecuteDelegation -v
 
 package handlers
 
 import (
 	"context"
-	"database/sql"
 	"encoding/json"
+	"fmt"
 	"net"
 	"net/http"
-	"runtime"
-	"strconv"
+	"net/http/httptest"
+	"sync/atomic"
 	"testing"
 	"time"
 
-	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
+	mdb "github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
 )
 
-// integrationDB is imported from delegation_ledger_integration_test.go.
-// Each test gets a fresh table state.
+// Real UUIDs — required because workspaces.id is UUID (not TEXT). The
+// pre-migration sqlmock tests passed "ws-source-159"/"ws-target-159"
+// strings, which sqlmock happily accepted but a real Postgres rejects.
+const (
+	integExecSourceID     = "11111111-aaaa-aaaa-aaaa-000000000159"
+	integExecTargetID     = "22222222-aaaa-aaaa-aaaa-000000000159"
+	integExecDelegationID = "del-integ-159-test"
+)
 
-const testDelegationID = "del-159-test-integration"
-const testSourceID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
-const testTargetID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
-
-// rawHTTPServer starts a TCP listener, serves one HTTP response, and closes.
-// It runs in a background goroutine so the test can proceed immediately after
-// returning the server URL. The server URL (e.g. "http://127.0.0.1:<port>/")
-// is suitable for caching in Redis and passing to executeDelegation.
+// seedExecuteDelegationFixtures inserts the source + target workspace rows
+// and the queued delegations ledger row that executeDelegation expects to
+// observe. Mirrors the pre-fix sqlmock helper's intent but in real DB
+// terms.
 //
-// The server reads HTTP headers using a deadline, then immediately sends the
-// response. This prevents the classic TCP deadlock: server blocked reading
-// body while client blocked waiting for response.
-func rawHTTPServer(t *testing.T, statusCode int, body string) (serverURL string, closeFn func()) {
+// Per-test cleanup is handled by integrationDB(t) which DELETE-purges
+// delegations before each test; workspaces/activity_logs are scrubbed
+// here so cross-test fixture leaks don't surface.
+func seedExecuteDelegationFixtures(t *testing.T) {
 	t.Helper()
-	// Use ListenTCP with explicit IPv4 to avoid IPv6 mismatch on macOS
-	// (Listen("tcp", "127.0.0.1:0") might bind ::1 on some systems).
-	ln, err := net.ListenTCP("tcp4", &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0})
-	if err != nil {
-		t.Fatalf("rawHTTPServer listen: %v", err)
+	conn := mdb.DB
+	if _, err := conn.ExecContext(context.Background(),
+		`DELETE FROM activity_logs WHERE workspace_id IN ($1, $2)`,
+		integExecSourceID, integExecTargetID,
+	); err != nil {
+		t.Fatalf("cleanup activity_logs: %v", err)
 	}
-	port := ln.Addr().(*net.TCPAddr).Port
-	serverURL = "http://127.0.0.1:" + strconv.Itoa(port) + "/"
-
-	connCh := make(chan net.Conn, 1)
-	go func() {
-		conn, err := ln.Accept()
-		if err != nil {
-			return
+	if _, err := conn.ExecContext(context.Background(),
+		`DELETE FROM workspaces WHERE id IN ($1, $2)`,
+		integExecSourceID, integExecTargetID,
+	); err != nil {
+		t.Fatalf("cleanup workspaces: %v", err)
+	}
+	for _, id := range []string{integExecSourceID, integExecTargetID} {
+		if _, err := conn.ExecContext(context.Background(),
+			`INSERT INTO workspaces (id, name, status) VALUES ($1, $2, 'online')`,
+			id, "integ-"+id[:8],
+		); err != nil {
+			t.Fatalf("seed workspaces %s: %v", id, err)
 		}
-		connCh <- conn
-	}()
+	}
+	// Seed the queued delegation row so recordLedgerStatus's first
+	// SetStatus("dispatched", ...) has somewhere to transition from.
+	// Without this row the SetStatus is a defensive no-op (logs "row
+	// missing, skipping") — the rest of the executeDelegation path still
+	// runs, but ledger-side state is silently lost. We want it real.
+	recordLedgerInsert(context.Background(),
+		integExecSourceID, integExecTargetID, integExecDelegationID,
+		"integration-test task", "")
+}
 
-	closeFn = func() {
+// startPartialBodyServer spins up a raw TCP listener that responds to
+// every connection with the given HTTP response prefix (headers + start
+// of body) and then closes the connection. Go's http.Client surfaces an
+// unexpected-EOF transport error when reading the truncated body.
+// Returns the URL + a stop func.
+//
+// Unlike httptest.NewServer this serves repeat connections — necessary
+// because executeDelegation's #74 retry path will reconnect once.
+func startPartialBodyServer(t *testing.T, responseHead string) (url string, stop func()) {
+	t.Helper()
+	ln, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		t.Fatalf("listen: %v", err)
+	}
+	var done int32
+	go func() {
+		for atomic.LoadInt32(&done) == 0 {
+			conn, err := ln.Accept()
+			if err != nil {
+				return
+			}
+			go func(c net.Conn) {
+				defer c.Close()
+				buf := make([]byte, 2048)
+				_, _ = c.Read(buf)
+				_, _ = c.Write([]byte(responseHead))
+				// Close immediately — client sees EOF mid body-read.
+			}(conn)
+		}
+	}()
+	return "http://" + ln.Addr().String(), func() {
+		atomic.StoreInt32(&done, 1)
 		ln.Close()
 	}
-
-	// Handle in background so we don't block test execution.
-	// Strategy: read available bytes with a deadline (enough for headers).
- // After deadline fires, send the response immediately. - // The kernel discards any unread buffered body bytes when the - // connection closes — harmless. - go func() { - conn := <-connCh - if conn == nil { - return - } - - // Read what we can with a 2s deadline. Headers always arrive first. - conn.SetReadDeadline(time.Now().Add(2 * time.Second)) - headerBuf := make([]byte, 4096) - for { - n, err := conn.Read(headerBuf) - if n > 0 { - _ = headerBuf[:n] - } - if err != nil { - break - } - } - - // Send response and IMMEDIATELY close the connection. - // If we keep it open, the client's request-body writer goroutine - // might block on the socket (waiting for the server to drain the - // body). Closing immediately unblocks it. The client already - // received the response, so the write error is harmless. - resp := buildHTTPResponse(statusCode, body) - conn.Write(resp) //nolint:errcheck - conn.Close() - }() - - return serverURL, closeFn } -// buildHTTPResponse constructs a minimal HTTP/1.1 response. -func buildHTTPResponse(statusCode int, body string) []byte { - statusText := http.StatusText(statusCode) - if statusText == "" { - statusText = "Unknown" - } - header := "HTTP/1.1 " + strconv.Itoa(statusCode) + " " + statusText + "\r\n" + - "Content-Type: application/json\r\n" + - "Content-Length: " + strconv.Itoa(len(body)) + "\r\n" + - "Connection: close\r\n" + - "\r\n" - return []byte(header + body) -} - -// setupIntegrationFixtures inserts the rows executeDelegation requires: -// - workspaces: source and target (siblings, parent_id=NULL so CanCommunicate=true) -// - activity_logs: the 'delegate' row that updateDelegationStatus UPDATE will find -// - delegations: the ledger row that recordLedgerStatus will UPDATE -// -// Returns a cleanup function the test should defer. -func setupIntegrationFixtures(t *testing.T, conn *sql.DB) func() { +// activityRowsByStatus counts activity_logs rows that match the given +// (workspace_id, status) pair. Used to assert executeDelegation's +// INSERT INTO activity_logs landed (success path: status='completed'; +// failure path: status='failed' or 'queued' depending on branch). 
+func activityRowsByStatus(t *testing.T, workspaceID, status string) int {
 	t.Helper()
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	for _, ws := range []struct {
-		id       string
-		name     string
-		parentID *string
-	}{
-		{testSourceID, "test-source", nil},
-		{testTargetID, "test-target", nil},
-	} {
-		if _, err := conn.ExecContext(ctx,
-			`INSERT INTO workspaces (id, name, parent_id) VALUES ($1::uuid, $2, $3) ON CONFLICT (id) DO NOTHING`,
-			ws.id, ws.name, ws.parentID,
-		); err != nil {
-			cancel()
-			t.Fatalf("seed workspace %s: %v", ws.id, err)
-		}
-	}
-
-	reqBody, _ := json.Marshal(map[string]any{
-		"delegation_id": testDelegationID,
-		"task":          "do work",
-	})
-	if _, err := conn.ExecContext(ctx, `
-		INSERT INTO activity_logs
-			(workspace_id, activity_type, method, source_id, target_id, request_body, status)
-		VALUES ($1, 'delegate', 'delegate', $1, $2, $3::jsonb, 'pending')
-		ON CONFLICT DO NOTHING
-	`, testSourceID, testTargetID, string(reqBody)); err != nil {
-		cancel()
-		t.Fatalf("seed activity_logs: %v", err)
-	}
-
-	if _, err := conn.ExecContext(ctx, `
-		INSERT INTO delegations
-			(delegation_id, caller_id, callee_id, task_preview, status)
-		VALUES ($1, $2::uuid, $3::uuid, 'do work', 'queued')
-		ON CONFLICT (delegation_id) DO NOTHING
-	`, testDelegationID, testSourceID, testTargetID); err != nil {
-		cancel()
-		t.Fatalf("seed delegations: %v", err)
-	}
-	cancel()
-
-	return func() {
-		ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
-		defer cancel2()
-		conn.ExecContext(ctx2,
-			`DELETE FROM activity_logs WHERE workspace_id = $1 AND request_body->>'delegation_id' = $2`,
-			testSourceID, testDelegationID)
-		conn.ExecContext(ctx2,
-			`DELETE FROM delegations WHERE delegation_id = $1`, testDelegationID)
-		conn.ExecContext(ctx2,
-			`DELETE FROM workspaces WHERE id IN ($1, $2)`, testSourceID, testTargetID)
+	var n int
+	if err := mdb.DB.QueryRowContext(context.Background(),
+		`SELECT count(*) FROM activity_logs WHERE workspace_id = $1 AND status = $2`,
+		workspaceID, status,
+	).Scan(&n); err != nil {
+		t.Fatalf("activity count(%s, %s): %v", workspaceID, status, err)
 	}
+	return n
 }
 
-// readDelegationRow returns (status, result_preview, error_detail) for the test
-// delegation, or fails the test if the row is not found.
-func readDelegationRow(t *testing.T, conn *sql.DB) (status, preview, errorDetail string) {
+// delegationLedgerStatus returns the current delegations.status for the
+// seeded delegation_id, failing the test if the lookup errors (a missing
+// row surfaces as sql.ErrNoRows and also fails). Real-Postgres version
+// of "did the ledger transition we expected actually land".
+func delegationLedgerStatus(t *testing.T, delegationID string) string {
 	t.Helper()
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
-	var prev, errDet sql.NullString
-	err := conn.QueryRowContext(ctx,
-		`SELECT status, result_preview, error_detail FROM delegations WHERE delegation_id = $1`,
-		testDelegationID,
-	).Scan(&status, &prev, &errDet)
+	var s string
+	err := mdb.DB.QueryRowContext(context.Background(),
+		`SELECT status FROM delegations WHERE delegation_id = $1`, delegationID,
+	).Scan(&s)
 	if err != nil {
-		t.Fatalf("readDelegationRow: %v", err)
-	}
-	return status, prev.String, errDet.String
-}
-
-// stack returns the current goroutine stack trace. Used by runWithTimeout to
-// pinpoint the blocking call site when a test times out.
-func stack() string {
-	buf := make([]byte, 4096)
-	n := runtime.Stack(buf, false)
-	return string(buf[:n])
-}
-
-// runWithTimeout calls fn in a goroutine and fails t if it doesn't return within
-// timeout. ctx is passed to fn so it can propagate cancellation to
-// executeDelegation's DB and network operations — without this, the goroutine
-// leaks indefinitely when the test times out (context.Background() never cancels).
-func runWithTimeout(t *testing.T, timeout time.Duration, fn func(context.Context)) {
-	t.Helper()
-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
-	defer cancel()
-
-	done := make(chan struct{})
-	var panicErr interface{}
-	go func() {
-		defer func() {
-			if p := recover(); p != nil {
-				panicErr = p
-			}
-			close(done)
-		}()
-		fn(ctx)
-	}()
-
-	select {
-	case <-done:
-		if panicErr != nil {
-			t.Fatalf("executeDelegation panicked: %v\n%s", panicErr, stack())
-		}
-	case <-ctx.Done():
-		cancel()
-		t.Fatalf("executeDelegation timed out after %s\n%s", timeout, stack())
+		t.Fatalf("ledger status(%s): %v", delegationID, err)
 	}
+	return s
 }
 
 // TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess
-// is the integration regression gate for issue #159.
+// is the primary regression test for issue #159 in real-Postgres form.
+// Scenario: target sends a 200 response with declared Content-Length but
+// closes the connection mid-body; the client gets an unexpected EOF on
+// the body read. proxyA2ARequest captures status=200 + partial body +
+// transport error; executeDelegation's isDeliveryConfirmedSuccess branch
+// must route to handleSuccess so the row lands as 'completed' (not
+// 'failed').
 //
-// Scenario: proxyA2ARequest returns a 200 status code with a non-empty body.
-// isDeliveryConfirmedSuccess guard (status>=200 && <300 && len(body)>0 && err!=nil)
-// routes to handleSuccess. The integration test verifies the DB row lands at
-// 'completed' with the response body as result_preview.
+// Real-Postgres advantage over the sqlmock version: this test will fail
+// if a future refactor adds a new DB write to the success path without
+// updating any helper — sqlmock would have required reflexive expectation
+// updates; real Postgres just runs.
+//
+// Timing: executeDelegation's first attempt returns (200, partial body,
+// EOF → BadGateway-class err). isTransientProxyError(BadGateway)=true so
+// the caller sleeps `delegationRetryDelay` (8s) and retries. Our listener
+// loop serves the same partial response on attempt 2, producing the same
+// (200, partial body, BadGateway) triple. isDeliveryConfirmedSuccess then
+// fires (status=200 ∈ [200,300), len(body) > 0, err != nil) → success.
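+//
+// The gate itself, per the guard condition quoted in the pre-migration
+// comment ("status>=200 && <300 && len(body)>0 && err!=nil") — a sketch,
+// not the verbatim production code:
+//
+//	func isDeliveryConfirmedSuccess(status int, body []byte, err error) bool {
+//		return err != nil && status >= 200 && status < 300 && len(body) > 0
+//	}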
 func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) {
-	allowLoopbackForTest(t)
-	conn := integrationDB(t)
-	cleanup := setupIntegrationFixtures(t, conn)
-	defer cleanup()
+	integrationDB(t)
 	t.Setenv("DELEGATION_LEDGER_WRITE", "1")
-
-	agentURL, closeServer := rawHTTPServer(t, 200, `{"result":{"parts":[{"text":"work completed successfully"}]}}`)
-	defer closeServer()
+	seedExecuteDelegationFixtures(t)
 
 	mr := setupTestRedis(t)
-	defer mr.Close()
-	db.CacheURL(context.Background(), testTargetID, agentURL)
-
-	prevClient := a2aClient
-	defer func() { a2aClient = prevClient }()
-	a2aClient = newA2AClientForHost(extractHostPort(agentURL))
-
+	allowLoopbackForTest(t)
 	broadcaster := newTestBroadcaster()
 	wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
 	dh := NewDelegationHandler(wh, broadcaster)
 
+	// 200 OK with declared Content-Length=100 but only 61 bytes of body.
+	// Connection closes after the partial body → client sees an
+	// unexpected EOF.
+	resp := "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n"
+	resp += `{"result":{"parts":[{"text":"work completed successfully"}]}}` // 61 bytes
+	agentURL, stop := startPartialBodyServer(t, resp)
+	defer stop()
+
+	mr.Set(fmt.Sprintf("ws:%s:url", integExecTargetID), agentURL)
+
 	a2aBody, _ := json.Marshal(map[string]interface{}{
-		"jsonrpc": "2.0",
-		"id":      "1",
-		"method":  "message/send",
+		"jsonrpc": "2.0", "id": "1", "method": "message/send",
 		"params": map[string]interface{}{
 			"message": map[string]interface{}{
 				"role": "user",
@@ -300,50 +255,46 @@ func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSucce
 			},
 		},
 	})
+	dh.executeDelegation(integExecSourceID, integExecTargetID, integExecDelegationID, a2aBody)
 
-	start := time.Now()
-	runWithTimeout(t, 30*time.Second, func(ctx context.Context) {
-		dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody)
-	})
-	t.Logf("executeDelegation took %v", time.Since(start))
+	// executeDelegation is synchronous here; the 8s retry sleep is INSIDE
+	// the call. We still need a small buffer for the async logA2ASuccess /
+	// last_outbound_at goroutines that fan out after the success branch.
+	time.Sleep(500 * time.Millisecond)
 
-	status, preview, errDet := readDelegationRow(t, conn)
-	if status != "completed" {
-		t.Errorf("status: want completed, got %q", status)
+	// Assert the executeDelegation success path wrote the activity_logs
+	// completion row + transitioned the ledger to completed.
+	if got := activityRowsByStatus(t, integExecSourceID, "completed"); got != 1 {
+		t.Errorf("expected 1 'completed' activity_logs row, got %d", got)
 	}
-	if preview == "" {
-		t.Errorf("result_preview should be non-empty, got %q", preview)
-	}
-	if errDet != "" {
-		t.Errorf("error_detail should be empty on success: got %q", errDet)
+	if s := delegationLedgerStatus(t, integExecDelegationID); s != "completed" {
+		t.Errorf("delegation ledger: want status=completed, got %q", s)
 	}
 }
 
-// TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed verifies that
-// a 500 response routes to failure, not success. isDeliveryConfirmedSuccess
-// requires status>=200 && <300, so 500 always fails the guard.
+// TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed —
+// 500 with partial body + connection drop. The retry produces the same
+// 500 partial. isDeliveryConfirmedSuccess fails on status>=300 → falls
+// through to the failure branch.
+// Pins that the new condition didn't accidentally widen the success
+// branch.
 func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) {
-	allowLoopbackForTest(t)
-	conn := integrationDB(t)
-	cleanup := setupIntegrationFixtures(t, conn)
-	defer cleanup()
+	integrationDB(t)
 	t.Setenv("DELEGATION_LEDGER_WRITE", "1")
-
-	agentURL, closeServer := rawHTTPServer(t, 500, `{"error":"agent crashed"}`)
-	defer closeServer()
+	seedExecuteDelegationFixtures(t)
 
 	mr := setupTestRedis(t)
-	defer mr.Close()
-	db.CacheURL(context.Background(), testTargetID, agentURL)
-
-	prevClient := a2aClient
-	defer func() { a2aClient = prevClient }()
-	a2aClient = newA2AClientForHost(extractHostPort(agentURL))
-
+	allowLoopbackForTest(t)
 	broadcaster := newTestBroadcaster()
 	wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
 	dh := NewDelegationHandler(wh, broadcaster)
 
+	resp := "HTTP/1.1 500 Internal Server Error\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n"
+	resp += `{"error":"agent crashed"}` // 25 bytes, less than the declared 100
+	agentURL, stop := startPartialBodyServer(t, resp)
+	defer stop()
+
+	mr.Set(fmt.Sprintf("ws:%s:url", integExecTargetID), agentURL)
+
 	a2aBody, _ := json.Marshal(map[string]interface{}{
 		"jsonrpc": "2.0", "id": "1", "method": "message/send",
 		"params": map[string]interface{}{
@@ -353,46 +304,41 @@ func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing
 			},
 		},
 	})
-	start := time.Now()
-	runWithTimeout(t, 30*time.Second, func(ctx context.Context) {
-		dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody)
-	})
-	t.Logf("executeDelegation took %v", time.Since(start))
+	dh.executeDelegation(integExecSourceID, integExecTargetID, integExecDelegationID, a2aBody)
 
-	status, _, errDet := readDelegationRow(t, conn)
-	if status != "failed" {
-		t.Errorf("status: want failed, got %q", status)
+	time.Sleep(500 * time.Millisecond)
+
+	if got := activityRowsByStatus(t, integExecSourceID, "failed"); got != 1 {
+		t.Errorf("expected 1 'failed' activity_logs row, got %d", got)
 	}
-	if errDet == "" {
-		t.Error("error_detail should be non-empty on failure")
+	if s := delegationLedgerStatus(t, integExecDelegationID); s != "failed" {
+		t.Errorf("delegation ledger: want status=failed, got %q", s)
 	}
 }
 
-// TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed verifies that
-// a 200 response with an empty body routes to failure. isDeliveryConfirmedSuccess
-// requires len(body) > 0, so an empty body fails the guard.
+// TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed —
+// 502 Bad Gateway with empty body, normal close. proxyA2ARequest returns
+// (502, "", error). isDeliveryConfirmedSuccess requires len(respBody) > 0
+// → false → falls through to the failure branch.
+// isTransientProxyError(BadGateway) = true, so we get a retry that also
+// fails, then 'failed'.
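+//
+// Gate evaluation for this case, using the sketch above: the 502 status
+// is outside [200,300) and len(body) == 0, so two of the three success
+// conjuncts already fail:
+//
+//	isDeliveryConfirmedSuccess(502, nil, err) // false on both counts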
func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) { - allowLoopbackForTest(t) - conn := integrationDB(t) - cleanup := setupIntegrationFixtures(t, conn) - defer cleanup() + integrationDB(t) t.Setenv("DELEGATION_LEDGER_WRITE", "1") - - agentURL, closeServer := rawHTTPServer(t, 200, "") - defer closeServer() + seedExecuteDelegationFixtures(t) mr := setupTestRedis(t) - defer mr.Close() - db.CacheURL(context.Background(), testTargetID, agentURL) - - prevClient := a2aClient - defer func() { a2aClient = prevClient }() - a2aClient = newA2AClientForHost(extractHostPort(agentURL)) - + allowLoopbackForTest(t) broadcaster := newTestBroadcaster() wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) dh := NewDelegationHandler(wh, broadcaster) + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadGateway) + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", integExecTargetID), agentServer.URL) + a2aBody, _ := json.Marshal(map[string]interface{}{ "jsonrpc": "2.0", "id": "1", "method": "message/send", "params": map[string]interface{}{ @@ -402,45 +348,43 @@ func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *test }, }, }) - start := time.Now() - runWithTimeout(t, 30*time.Second, func(ctx context.Context) { - dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody) - }) - t.Logf("executeDelegation took %v", time.Since(start)) + dh.executeDelegation(integExecSourceID, integExecTargetID, integExecDelegationID, a2aBody) - status, _, errDet := readDelegationRow(t, conn) - if status != "failed" { - t.Errorf("status: want failed, got %q", status) + time.Sleep(500 * time.Millisecond) + + if got := activityRowsByStatus(t, integExecSourceID, "failed"); got != 1 { + t.Errorf("expected 1 'failed' activity_logs row, got %d", got) } - if errDet == "" { - t.Error("error_detail should be non-empty on failure") + if s := delegationLedgerStatus(t, integExecDelegationID); s != "failed" { + t.Errorf("delegation ledger: want status=failed, got %q", s) } } -// TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged is the baseline: -// a clean 200 response with a valid body and no error routes to success. +// TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged — +// baseline: clean 200 with full body, no error. proxyErr == nil so +// isDeliveryConfirmedSuccess never fires and no retry runs (fast path). +// Pins that the new error-recovery branch didn't regress the most +// common code path. 
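+//
+// Here the gate never comes into play: with proxyErr == nil the
+// err != nil conjunct of the sketch above is false, so executeDelegation
+// takes the ordinary first-attempt handleSuccess route.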
func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) { - allowLoopbackForTest(t) - conn := integrationDB(t) - cleanup := setupIntegrationFixtures(t, conn) - defer cleanup() + integrationDB(t) t.Setenv("DELEGATION_LEDGER_WRITE", "1") - - agentURL, closeServer := rawHTTPServer(t, 200, `{"result":{"parts":[{"text":"all good"}]}}`) - defer closeServer() + seedExecuteDelegationFixtures(t) mr := setupTestRedis(t) - defer mr.Close() - db.CacheURL(context.Background(), testTargetID, agentURL) - - prevClient := a2aClient - defer func() { a2aClient = prevClient }() - a2aClient = newA2AClientForHost(extractHostPort(agentURL)) - + allowLoopbackForTest(t) broadcaster := newTestBroadcaster() wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) dh := NewDelegationHandler(wh, broadcaster) + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"result":{"parts":[{"text":"all good"}]}}`)) + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", integExecTargetID), agentServer.URL) + a2aBody, _ := json.Marshal(map[string]interface{}{ "jsonrpc": "2.0", "id": "1", "method": "message/send", "params": map[string]interface{}{ @@ -450,86 +394,14 @@ func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T }, }, }) - start := time.Now() - runWithTimeout(t, 30*time.Second, func(ctx context.Context) { - dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody) - }) - t.Logf("executeDelegation took %v", time.Since(start)) + dh.executeDelegation(integExecSourceID, integExecTargetID, integExecDelegationID, a2aBody) - status, preview, errDet := readDelegationRow(t, conn) - if status != "completed" { - t.Errorf("status: want completed, got %q", status) + time.Sleep(500 * time.Millisecond) + + if got := activityRowsByStatus(t, integExecSourceID, "completed"); got != 1 { + t.Errorf("expected 1 'completed' activity_logs row, got %d", got) } - if preview == "" { - t.Errorf("result_preview should be non-empty, got %q", preview) - } - if errDet != "" { - t.Errorf("error_detail should be empty on success: got %q", errDet) - } -} - -// Test that a delegation where Redis cannot be reached still routes to failure -// (not panic). proxyA2ARequest falls back to DB URL lookup when Redis is down. -func TestIntegration_ExecuteDelegation_RedisDown_FallsBackToDB(t *testing.T) { - allowLoopbackForTest(t) - conn := integrationDB(t) - cleanup := setupIntegrationFixtures(t, conn) - defer cleanup() - t.Setenv("DELEGATION_LEDGER_WRITE", "1") - - // Set up miniredis so db.RDB is non-nil, but do NOT cache any URL. - // resolveAgentURL skips Redis and falls back to DB, which also has no URL. 
- mr := setupTestRedis(t) - defer mr.Close() - - broadcaster := newTestBroadcaster() - wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) - dh := NewDelegationHandler(wh, broadcaster) - - a2aBody, _ := json.Marshal(map[string]interface{}{ - "jsonrpc": "2.0", "id": "1", "method": "message/send", - "params": map[string]interface{}{ - "message": map[string]interface{}{ - "role": "user", - "parts": []map[string]string{{"type": "text", "text": "do work"}}, - }, - }, - }) - start := time.Now() - runWithTimeout(t, 30*time.Second, func(ctx context.Context) { - dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody) - }) - t.Logf("executeDelegation took %v", time.Since(start)) - - status, _, errDet := readDelegationRow(t, conn) - if status != "failed" { - t.Errorf("status: want failed (no target URL), got %q", status) - } - if errDet == "" { - t.Error("error_detail should be set on failure due to unreachable target") - } -} - -// extractHostPort parses "http://127.0.0.1:PORT/" and returns "127.0.0.1:PORT". -func extractHostPort(rawURL string) string { - // Simple parse: strip "http://" prefix and trailing slash. - // The URL format is always "http://127.0.0.1:PORT/" in our usage. - if len(rawURL) > 7 { - return rawURL[7 : len(rawURL)-1] - } - return rawURL -} - -// newA2AClientForHost creates an http.Client that redirects all connections -// to the given host:port. This lets us mock the agent endpoint without -// running a real HTTP server. -func newA2AClientForHost(targetHost string) *http.Client { - return &http.Client{ - Transport: &http.Transport{ - DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - return net.Dial("tcp", targetHost) - }, - ResponseHeaderTimeout: 180 * time.Second, - }, + if s := delegationLedgerStatus(t, integExecDelegationID); s != "completed" { + t.Errorf("delegation ledger: want status=completed, got %q", s) } } diff --git a/workspace-server/internal/ws/hub_test.go b/workspace-server/internal/ws/hub_test.go index 6416ab6f..b27fc3e5 100644 --- a/workspace-server/internal/ws/hub_test.go +++ b/workspace-server/internal/ws/hub_test.go @@ -94,10 +94,9 @@ func TestSafeSend_FullChannel(t *testing.T) { func TestBroadcast_CanvasClientGetsAll(t *testing.T) { ch := make(chan []byte, 10) - h := NewHub(nil) // no CanCommunicate — canvas clients always get messages - h.clients = map[*Client]bool{ - {WorkspaceID: "", Send: ch}: true, - } + client := &Client{WorkspaceID: "", Send: ch} + h := NewHub(nil) + h.clients = map[*Client]bool{client: true} h.Broadcast(models.WSMessage{Type: "test", Content: "hello"}) <-ch // non-blocking since channel has capacity @@ -105,14 +104,13 @@ func TestBroadcast_CanvasClientGetsAll(t *testing.T) { func TestBroadcast_WorkspaceClientGetsWhenAllowed(t *testing.T) { ch := make(chan []byte, 10) + client := &Client{WorkspaceID: "ws-caller", Send: ch} allowed := false h := NewHub(func(callerID, targetID string) bool { return allowed }) msg := models.WSMessage{Type: "test", Content: "secret", WorkspaceID: "ws-target"} - h.clients = map[*Client]bool{ - {WorkspaceID: "ws-caller", Send: ch}: true, - } + h.clients = map[*Client]bool{client: true} // Not allowed — should not receive h.Broadcast(msg) @@ -129,15 +127,20 @@ func TestBroadcast_WorkspaceClientGetsWhenAllowed(t *testing.T) { } func TestBroadcast_DropsOnClosedChannel(t *testing.T) { - ch := make(chan []byte) // unbuffered — will block + // Use a named variable for the client so the map key and Broadcast's + // range both 
refer to the same *Client pointer (true of the anonymous literal too —
+	// the named form is mainly for readability and consistency across
+	// these tests).
+	ch := make(chan []byte, 1)
+	client := &Client{WorkspaceID: "", Send: ch}
 	h := NewHub(nil)
-	h.clients = map[*Client]bool{
-		{WorkspaceID: "", Send: ch}: true,
-	}
+	h.clients = map[*Client]bool{client: true}
 
-	// Broadcast should not panic even though channel is blocking
+	// Fill and close so a subsequent send (from Broadcast) can never
+	// succeed: safeSend must swallow the closed-channel send and return
+	// false without blocking or panicking.
+	ch <- []byte("fill")
+	close(ch)
+
+	// Broadcast must not panic — safeSend returns false for closed channel
 	h.Broadcast(models.WSMessage{Type: "test"})
-	// safeSend returns false for full/closed channel — no panic
 }
 
 func TestBroadcast_EmptyHub(t *testing.T) {
@@ -150,15 +153,14 @@ func TestBroadcast_MultipleClients(t *testing.T) {
 	ch1 := make(chan []byte, 10)
 	ch2 := make(chan []byte, 10)
 	ch3 := make(chan []byte, 10) // disallowed
+	c1 := &Client{WorkspaceID: "ws-1", Send: ch1}
+	c2 := &Client{WorkspaceID: "ws-2", Send: ch2}
+	c3 := &Client{WorkspaceID: "ws-3", Send: ch3}
 	h := NewHub(func(callerID, targetID string) bool {
 		return targetID != "ws-3"
 	})
 	msg := models.WSMessage{Type: "test", Content: "hello", WorkspaceID: "ws-target"}
 
-	h.clients = map[*Client]bool{
-		{WorkspaceID: "ws-1", Send: ch1}: true,
-		{WorkspaceID: "ws-2", Send: ch2}: true,
-		{WorkspaceID: "ws-3", Send: ch3}: true,
-	}
+	h.clients = map[*Client]bool{c1: true, c2: true, c3: true}
 
 	h.Broadcast(msg)
 
@@ -184,13 +186,14 @@ func TestBroadcast_MultipleClients(t *testing.T) {
 
 func TestBroadcast_CanvasClientAlwaysGets(t *testing.T) {
 	ch := make(chan []byte, 10)
+	canvasClient := &Client{WorkspaceID: "", Send: ch}
 	h := NewHub(func(callerID, targetID string) bool {
 		return false // nobody can communicate with anybody
 	})
 	msg := models.WSMessage{Type: "test", Content: "canvas only", WorkspaceID: "ws-target"}
 
 	h.clients = map[*Client]bool{
-		{WorkspaceID: "", Send: ch}: true, // canvas client
-		{WorkspaceID: "ws-target", Send: make(chan []byte, 10)}: true,
+		canvasClient: true, // canvas client
+		&Client{WorkspaceID: "ws-target", Send: make(chan []byte, 10)}: true,
 	}
 
 	h.Broadcast(msg)