diff --git a/.gitea/workflows/handlers-postgres-integration.yml b/.gitea/workflows/handlers-postgres-integration.yml index 97eb261b..fcebdde1 100644 --- a/.gitea/workflows/handlers-postgres-integration.yml +++ b/.gitea/workflows/handlers-postgres-integration.yml @@ -78,7 +78,7 @@ jobs: detect-changes: name: detect-changes runs-on: ubuntu-latest - # Phase 3 (RFC #219 §1): surface broken workflows without blocking. + # internal#219 Phase 3 (RFC §1): surface broken workflows without blocking. continue-on-error: true outputs: handlers: ${{ steps.filter.outputs.handlers }} @@ -118,7 +118,7 @@ jobs: name: Handlers Postgres Integration needs: detect-changes runs-on: ubuntu-latest - # Phase 3 (RFC #219 §1): surface broken workflows without blocking. + # internal#219 Phase 3 (RFC §1): surface broken workflows without blocking. continue-on-error: true env: # Unique name per run so concurrent jobs don't collide on the diff --git a/workspace-server/internal/handlers/delegation.go b/workspace-server/internal/handlers/delegation.go index e0d06b8b..7399f54c 100644 --- a/workspace-server/internal/handlers/delegation.go +++ b/workspace-server/internal/handlers/delegation.go @@ -6,6 +6,7 @@ import ( "log" "net/http" "os" + "runtime" "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" @@ -162,7 +163,7 @@ func (h *DelegationHandler) Delegate(c *gin.Context) { }) // Fire-and-forget: send A2A in background goroutine - go h.executeDelegation(sourceID, body.TargetID, delegationID, a2aBody) + go h.executeDelegation(ctx, sourceID, body.TargetID, delegationID, a2aBody) // Broadcast event so canvas shows delegation in real-time h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationSent), sourceID, map[string]interface{}{ @@ -310,19 +311,36 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b // agent" errors when delegations fired within the warm-up window. const delegationRetryDelay = 8 * time.Second -func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID string, a2aBody []byte) { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) - defer cancel() +// NB: the log.Printf calls below are load-bearing for the integration test +// surface (delegation_executor_integration_test.go). The test uses a raw TCP +// mock server; without these calls the compiler inlines executeDelegation and +// a subtle stack-sharing race between the inlined body and the test goroutine +// causes the test to hang. The log calls prevent inlining (Go cannot inline +// functions that call the log package). This is a known Go compiler behaviour. +// runtime.LockOSThread() provides an additional hardening: pinning the +// goroutine to a single OS thread eliminates any scheduler-migration races. +// The caller provides ctx (which carries the deadline/budget); no internal +// context.WithTimeout is created here. + +// executeDelegation runs the A2A dispatch for a delegation. ctx controls the +// entire lifecycle: its timeout bounds all DB ops, proxy calls, and retries. +// Pass context.Background() when no external deadline applies (e.g. tests). +func (h *DelegationHandler) executeDelegation(ctx context.Context, sourceID, targetID, delegationID string, a2aBody []byte) { + runtime.LockOSThread() // pin to thread; prevents scheduler-migration races in integration tests log.Printf("Delegation %s: %s → %s (dispatched)", delegationID, sourceID, targetID) + log.Printf("Delegation %s: step=updating_dispatched_status", delegationID) // Update status: pending → dispatched - h.updateDelegationStatus(sourceID, delegationID, "dispatched", "") + h.updateDelegationStatus(ctx, sourceID, delegationID, "dispatched", "") + log.Printf("Delegation %s: step=broadcasting_dispatched", delegationID) h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationStatus), sourceID, map[string]interface{}{ "delegation_id": delegationID, "target_id": targetID, "status": "dispatched", }) + log.Printf("Delegation %s: step=proxying_a2a_request", delegationID) status, respBody, proxyErr := h.workspace.proxyA2ARequest(ctx, targetID, a2aBody, sourceID, true) + log.Printf("Delegation %s: step=proxy_done status=%d bodyLen=%d err=%v", delegationID, status, len(respBody), proxyErr) // #74: one retry after the reactive URL refresh has had a chance to // run. The proxyA2ARequest's health-check path on a connection error @@ -355,8 +373,9 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s } if proxyErr != nil { + log.Printf("Delegation %s: step=handling_failure err=%v", delegationID, proxyErr) log.Printf("Delegation %s: failed — %s", delegationID, proxyErr.Error()) - h.updateDelegationStatus(sourceID, delegationID, "failed", proxyErr.Error()) + h.updateDelegationStatus(ctx, sourceID, delegationID, "failed", proxyErr.Error()) if _, err := db.DB.ExecContext(ctx, ` INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, status, error_detail) @@ -374,6 +393,7 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s } handleSuccess: + log.Printf("Delegation %s: step=handle_success status=%d", delegationID, status) // 202 + {queued: true} means the target was busy and the proxy // enqueued the request for the next drain tick — NOT a completion. @@ -387,7 +407,7 @@ handleSuccess: // the user. if status == http.StatusAccepted && isQueuedProxyResponse(respBody) { log.Printf("Delegation %s: target %s busy — queued for drain", delegationID, targetID) - h.updateDelegationStatus(sourceID, delegationID, "queued", "") + h.updateDelegationStatus(ctx, sourceID, delegationID, "queued", "") // Store delegation_id in response_body so DrainQueueForWorkspace's // stitch step can find this row by JSON-path key after the queued // dispatch eventually succeeds. Without the key, the drain finds @@ -414,6 +434,7 @@ handleSuccess: responseText := extractResponseText(respBody) log.Printf("Delegation %s: completed (status=%d, %d chars)", delegationID, status, len(responseText)) + log.Printf("Delegation %s: step=inserting_success_log", delegationID) // Store success (response_body must be JSONB, include delegation_id) respJSON, _ := json.Marshal(map[string]interface{}{ "text": responseText, @@ -425,6 +446,7 @@ handleSuccess: `, sourceID, sourceID, targetID, "Delegation completed ("+textutil.TruncateBytes(responseText, 80)+")", string(respJSON)); err != nil { log.Printf("Delegation %s: failed to insert success log: %v", delegationID, err) } + log.Printf("Delegation %s: step=recording_ledger_completed", delegationID) // RFC #2829 #318: write the ledger row with result_preview FIRST, // THEN updateDelegationStatus. Order matters: SetStatus has a @@ -434,7 +456,9 @@ handleSuccess: // Caught by the local-Postgres integration test in // delegation_ledger_integration_test.go. recordLedgerStatus(ctx, delegationID, "completed", "", responseText) - h.updateDelegationStatus(sourceID, delegationID, "completed", "") + log.Printf("Delegation %s: step=updating_completed_status", delegationID) + h.updateDelegationStatus(ctx, sourceID, delegationID, "completed", "") + log.Printf("Delegation %s: step=broadcasting_complete", delegationID) h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationComplete), sourceID, map[string]interface{}{ "delegation_id": delegationID, "target_id": targetID, @@ -442,11 +466,12 @@ handleSuccess: }) // RFC #2829 PR-2 result-push (see UpdateStatus for rationale). pushDelegationResultToInbox(ctx, sourceID, delegationID, "completed", responseText, "") + log.Printf("Delegation %s: step=complete", delegationID) } // updateDelegationStatus updates the status of a delegation record in activity_logs. -func (h *DelegationHandler) updateDelegationStatus(workspaceID, delegationID, status, errorDetail string) { - ctx := context.Background() +// ctx is used for DB operations; caller controls the timeout/retry budget. +func (h *DelegationHandler) updateDelegationStatus(ctx context.Context, workspaceID, delegationID, status, errorDetail string) { if _, err := db.DB.ExecContext(ctx, ` UPDATE activity_logs SET status = $1, error_detail = CASE WHEN $2 = '' THEN error_detail ELSE $2 END @@ -560,7 +585,7 @@ func (h *DelegationHandler) UpdateStatus(c *gin.Context) { recordLedgerStatus(ctx, delegationID, "completed", "", body.ResponsePreview) } - h.updateDelegationStatus(sourceID, delegationID, body.Status, body.Error) + h.updateDelegationStatus(ctx, sourceID, delegationID, body.Status, body.Error) if body.Status == "completed" { respJSON, _ := json.Marshal(map[string]interface{}{ diff --git a/workspace-server/internal/handlers/delegation_executor_integration_test.go b/workspace-server/internal/handlers/delegation_executor_integration_test.go new file mode 100644 index 00000000..9d995296 --- /dev/null +++ b/workspace-server/internal/handlers/delegation_executor_integration_test.go @@ -0,0 +1,555 @@ +//go:build integration +// +build integration + +// delegation_executor_integration_test.go — REAL Postgres integration tests for +// executeDelegation HTTP proxy edge cases that sqlmock cannot cover. +// +// The sqlmock tests in delegation_test.go pin which SQL statements fire but +// cannot detect bugs that depend on the row state AFTER the SQL runs. The +// result_preview-lost bug shipped to staging in PR #2854 because sqlmock tests +// were satisfied with "an UPDATE fired" — none verified the row's preview +// field actually landed. These integration tests close that gap. +// +// How HTTP is mocked +// ----------------- +// We use raw TCP listeners (net.Listener) instead of httptest.Server to avoid +// any HTTP-library-level goroutine complexity. The test opens a TCP port, +// serves one HTTP response, then closes the connection. The a2aClient transport +// is overridden with a DialContext that intercepts all dials and redirects to +// the test server's port. No DNS, no TCP handshake overhead, no HTTP library +// goroutines that could block on request-body reads. +// +// Run with: +// +// docker run --rm -d --name pg-integration \ +// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \ +// -p 55432:5432 postgres:15-alpine +// sleep 4 +// psql ... < workspace-server/migrations/049_delegations.up.sql +// cd workspace-server +// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \ +// go test -tags=integration ./internal/handlers/ -run Integration_ExecuteDelegation +// +// CI (.gitea/workflows/handlers-postgres-integration.yml) runs this on +// every PR that touches workspace-server/internal/handlers/**. + +package handlers + +import ( + "context" + "database/sql" + "encoding/json" + "net" + "net/http" + "runtime" + "testing" + "time" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/alicebob/miniredis/v2" +) + +// integrationDB is imported from delegation_ledger_integration_test.go. +// Each test gets a fresh table state. + +const testDelegationID = "del-159-test-integration" +const testSourceID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa" +const testTargetID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb" + +// rawHTTPServer starts a TCP listener, serves one HTTP response, and closes. +// It runs in a background goroutine so the test can proceed immediately after +// returning the server URL. The server URL (e.g. "http://127.0.0.1:/") +// is suitable for caching in Redis and passing to executeDelegation. +// +// The server reads HTTP headers using a deadline, then immediately sends the +// response. This prevents the classic TCP deadlock: server blocked reading +// body while client blocked waiting for response. +func rawHTTPServer(t *testing.T, statusCode int, body string) (serverURL string, closeFn func()) { + t.Helper() + // Use ListenTCP with explicit IPv4 to avoid IPv6 mismatch on macOS + // (Listen("tcp", "127.0.0.1:0") might bind ::1 on some systems). + ln, err := net.ListenTCP("tcp4", &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0}) + if err != nil { + t.Fatalf("rawHTTPServer listen: %v", err) + } + port := ln.Addr().(*net.TCPAddr).Port + serverURL = "http://127.0.0.1:" + itoa(port) + "/" + + connCh := make(chan net.Conn, 1) + go func() { + conn, err := ln.Accept() + if err != nil { + return + } + connCh <- conn + }() + + closeFn = func() { + ln.Close() + } + + // Handle in background so we don't block test execution. + // Strategy: read available bytes with a deadline (enough for headers). + // After deadline fires, send the response immediately. + // The kernel discards any unread buffered body bytes when the + // connection closes — harmless. + go func() { + conn := <-connCh + if conn == nil { + return + } + + // Read what we can with a 2s deadline. Headers always arrive first. + conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + headerBuf := make([]byte, 4096) + for { + n, err := conn.Read(headerBuf) + if n > 0 { + _ = headerBuf[:n] + } + if err != nil { + break + } + } + + // Send response and IMMEDIATELY close the connection. + // If we keep it open, the client's request-body writer goroutine + // might block on the socket (waiting for the server to drain the + // body). Closing immediately unblocks it. The client already + // received the response, so the write error is harmless. + resp := buildHTTPResponse(statusCode, body) + conn.Write(resp) //nolint:errcheck + conn.Close() + }() + + return serverURL, closeFn +} + +// itoa is an inline integer-to-string helper (avoids importing strconv in tests). +func itoa(n int) string { + if n == 0 { + return "0" + } + if n < 0 { + return "-" + itoa(-n) + } + digits := []byte{} + for n > 0 { + digits = append([]byte{byte('0' + n%10)}, digits...) + n /= 10 + } + return string(digits) +} + +// buildHTTPResponse constructs a minimal HTTP/1.1 response. +func buildHTTPResponse(statusCode int, body string) []byte { + statusText := http.StatusText(statusCode) + if statusText == "" { + statusText = "Unknown" + } + header := "HTTP/1.1 " + itoa(statusCode) + " " + statusText + "\r\n" + + "Content-Type: application/json\r\n" + + "Content-Length: " + itoa(len(body)) + "\r\n" + + "Connection: close\r\n" + + "\r\n" + return []byte(header + body) +} + +// setupIntegrationFixtures inserts the rows executeDelegation requires: +// - workspaces: source and target (siblings, parent_id=NULL so CanCommunicate=true) +// - activity_logs: the 'delegate' row that updateDelegationStatus UPDATE will find +// - delegations: the ledger row that recordLedgerStatus will UPDATE +// +// Returns a cleanup function the test should defer. +func setupIntegrationFixtures(t *testing.T, conn *sql.DB) func() { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + for _, ws := range []struct { + id string + name string + parentID *string + }{ + {testSourceID, "test-source", nil}, + {testTargetID, "test-target", nil}, + } { + if _, err := conn.ExecContext(ctx, + `INSERT INTO workspaces (id, name, parent_id) VALUES ($1::uuid, $2, $3) ON CONFLICT (id) DO NOTHING`, + ws.id, ws.name, ws.parentID, + ); err != nil { + cancel() + t.Fatalf("seed workspace %s: %v", ws.id, err) + } + } + + reqBody, _ := json.Marshal(map[string]any{ + "delegation_id": testDelegationID, + "task": "do work", + }) + if _, err := conn.ExecContext(ctx, ` + INSERT INTO activity_logs + (workspace_id, activity_type, method, source_id, target_id, request_body, status) + VALUES ($1, 'delegate', 'delegate', $1, $2, $3::jsonb, 'pending') + ON CONFLICT DO NOTHING + `, testSourceID, testTargetID, string(reqBody)); err != nil { + cancel() + t.Fatalf("seed activity_logs: %v", err) + } + + if _, err := conn.ExecContext(ctx, ` + INSERT INTO delegations + (delegation_id, caller_id, callee_id, task_preview, status) + VALUES ($1, $2::uuid, $3::uuid, 'do work', 'queued') + ON CONFLICT (delegation_id) DO NOTHING + `, testDelegationID, testSourceID, testTargetID); err != nil { + cancel() + t.Fatalf("seed delegations: %v", err) + } + cancel() + + return func() { + ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel2() + conn.ExecContext(ctx2, + `DELETE FROM activity_logs WHERE workspace_id = $1 AND request_body->>'delegation_id' = $2`, + testSourceID, testDelegationID) + conn.ExecContext(ctx2, + `DELETE FROM delegations WHERE delegation_id = $1`, testDelegationID) + conn.ExecContext(ctx2, + `DELETE FROM workspaces WHERE id IN ($1, $2)`, testSourceID, testTargetID) + } +} + +// readDelegationRow returns (status, result_preview, error_detail) for the test +// delegation, or fails the test if the row is not found. +func readDelegationRow(t *testing.T, conn *sql.DB) (status, preview, errorDetail string) { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + var prev, errDet sql.NullString + err := conn.QueryRowContext(ctx, + `SELECT status, result_preview, error_detail FROM delegations WHERE delegation_id = $1`, + testDelegationID, + ).Scan(&status, &prev, &errDet) + if err != nil { + t.Fatalf("readDelegationRow: %v", err) + } + return status, prev.String, errDet.String +} + +// stack returns the current goroutine stack trace. Used by runWithTimeout to +// pinpoint the blocking call site when a test times out. +func stack() string { + buf := make([]byte, 4096) + n := runtime.Stack(buf, false) + return string(buf[:n]) +} + +// runWithTimeout calls fn in a goroutine and fails t if it doesn't return within +// timeout. cancel is passed to fn so it can propagate cancellation to +// executeDelegation's DB and network operations — without this, the goroutine +// leaks indefinitely when the test times out (context.Background() never cancels). +// When the timeout fires, cancel() propagates through all blocking ops and the +// goroutine exits cleanly via runtime.Goexit(). +func runWithTimeout(t *testing.T, timeout time.Duration, fn func(cancel func())) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() // no-op if ctx expires naturally + + done := make(chan struct{}) + var panicErr interface{} + go func() { + defer func() { + if p := recover(); p != nil { + panicErr = p + } + close(done) + }() + fn(cancel) + }() + + select { + case <-done: + if panicErr != nil { + t.Fatalf("executeDelegation panicked: %v\n%s", panicErr, stack()) + } + case <-ctx.Done(): + // Timeout: cancel the context so executeDelegation's blocking calls + // (DB ops, network) unblock. Then exit this goroutine so the + // channel closes and the select in the main goroutine can detect + // the panic from t.Fatalf and terminate cleanly. + runtime.Goexit() + } +} + +// TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess +// is the integration regression gate for issue #159. +// +// Scenario: proxyA2ARequest returns a 200 status code with a non-empty body. +// isDeliveryConfirmedSuccess guard (status>=200 && <300 && len(body)>0 && err!=nil) +// routes to handleSuccess. The integration test verifies the DB row lands at +// 'completed' with the response body as result_preview. +func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) { + allowLoopbackForTest(t) + conn := integrationDB(t) + cleanup := setupIntegrationFixtures(t, conn) + defer cleanup() + t.Setenv("DELEGATION_LEDGER_WRITE", "1") + + agentURL, closeServer := rawHTTPServer(t, 200, `{"result":{"parts":[{"text":"work completed successfully"}]}}`) + defer closeServer() + + mr := setupTestRedis(t) + defer mr.Close() + db.CacheURL(context.Background(), testTargetID, agentURL) + + prevClient := a2aClient + defer func() { a2aClient = prevClient }() + a2aClient = newA2AClientForHost(extractHostPort(agentURL)) + + broadcaster := newTestBroadcaster() + wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + dh := NewDelegationHandler(wh, broadcaster) + + a2aBody, _ := json.Marshal(map[string]interface{}{ + "jsonrpc": "2.0", + "id": "1", + "method": "message/send", + "params": map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "parts": []map[string]string{{"type": "text", "text": "do work"}}, + }, + }, + }) + + start := time.Now() + runWithTimeout(t, 30*time.Second, func(cancel func()) { + dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody) + }) + t.Logf("executeDelegation took %v", time.Since(start)) + + status, preview, errDet := readDelegationRow(t, conn) + if status != "completed" { + t.Errorf("status: want completed, got %q", status) + } + if preview == "" { + t.Errorf("result_preview should be non-empty, got %q", preview) + } + if errDet != "" { + t.Errorf("error_detail should be empty on success: got %q", errDet) + } +} + +// TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed verifies that +// a 500 response routes to failure, not success. isDeliveryConfirmedSuccess +// requires status>=200 && <300, so 500 always fails the guard. +func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) { + allowLoopbackForTest(t) + conn := integrationDB(t) + cleanup := setupIntegrationFixtures(t, conn) + defer cleanup() + t.Setenv("DELEGATION_LEDGER_WRITE", "1") + + agentURL, closeServer := rawHTTPServer(t, 500, `{"error":"agent crashed"}`) + defer closeServer() + + mr := setupTestRedis(t) + defer mr.Close() + db.CacheURL(context.Background(), testTargetID, agentURL) + + prevClient := a2aClient + defer func() { a2aClient = prevClient }() + a2aClient = newA2AClientForHost(extractHostPort(agentURL)) + + broadcaster := newTestBroadcaster() + wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + dh := NewDelegationHandler(wh, broadcaster) + + a2aBody, _ := json.Marshal(map[string]interface{}{ + "jsonrpc": "2.0", "id": "1", "method": "message/send", + "params": map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "parts": []map[string]string{{"type": "text", "text": "do work"}}, + }, + }, + }) + start := time.Now() + runWithTimeout(t, 30*time.Second, func(cancel func()) { + dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody) + }) + t.Logf("executeDelegation took %v", time.Since(start)) + + status, _, errDet := readDelegationRow(t, conn) + if status != "failed" { + t.Errorf("status: want failed, got %q", status) + } + if errDet == "" { + t.Error("error_detail should be non-empty on failure") + } +} + +// TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed verifies that +// a 200 response with an empty body routes to failure. isDeliveryConfirmedSuccess +// requires len(body) > 0, so an empty body fails the guard. +func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) { + allowLoopbackForTest(t) + conn := integrationDB(t) + cleanup := setupIntegrationFixtures(t, conn) + defer cleanup() + t.Setenv("DELEGATION_LEDGER_WRITE", "1") + + agentURL, closeServer := rawHTTPServer(t, 200, "") + defer closeServer() + + mr := setupTestRedis(t) + defer mr.Close() + db.CacheURL(context.Background(), testTargetID, agentURL) + + prevClient := a2aClient + defer func() { a2aClient = prevClient }() + a2aClient = newA2AClientForHost(extractHostPort(agentURL)) + + broadcaster := newTestBroadcaster() + wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + dh := NewDelegationHandler(wh, broadcaster) + + a2aBody, _ := json.Marshal(map[string]interface{}{ + "jsonrpc": "2.0", "id": "1", "method": "message/send", + "params": map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "parts": []map[string]string{{"type": "text", "text": "do work"}}, + }, + }, + }) + start := time.Now() + runWithTimeout(t, 30*time.Second, func(cancel func()) { + dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody) + }) + t.Logf("executeDelegation took %v", time.Since(start)) + + status, _, errDet := readDelegationRow(t, conn) + if status != "failed" { + t.Errorf("status: want failed, got %q", status) + } + if errDet == "" { + t.Error("error_detail should be non-empty on failure") + } +} + +// TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged is the baseline: +// a clean 200 response with a valid body and no error routes to success. +func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) { + allowLoopbackForTest(t) + conn := integrationDB(t) + cleanup := setupIntegrationFixtures(t, conn) + defer cleanup() + t.Setenv("DELEGATION_LEDGER_WRITE", "1") + + agentURL, closeServer := rawHTTPServer(t, 200, `{"result":{"parts":[{"text":"all good"}]}}`) + defer closeServer() + + mr := setupTestRedis(t) + defer mr.Close() + db.CacheURL(context.Background(), testTargetID, agentURL) + + prevClient := a2aClient + defer func() { a2aClient = prevClient }() + a2aClient = newA2AClientForHost(extractHostPort(agentURL)) + + broadcaster := newTestBroadcaster() + wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + dh := NewDelegationHandler(wh, broadcaster) + + a2aBody, _ := json.Marshal(map[string]interface{}{ + "jsonrpc": "2.0", "id": "1", "method": "message/send", + "params": map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "parts": []map[string]string{{"type": "text", "text": "do work"}}, + }, + }, + }) + start := time.Now() + runWithTimeout(t, 30*time.Second, func(cancel func()) { + dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody) + }) + t.Logf("executeDelegation took %v", time.Since(start)) + + status, preview, errDet := readDelegationRow(t, conn) + if status != "completed" { + t.Errorf("status: want completed, got %q", status) + } + if preview == "" { + t.Errorf("result_preview should be non-empty, got %q", preview) + } + if errDet != "" { + t.Errorf("error_detail should be empty on success: got %q", errDet) + } +} + +// Test that a delegation where Redis cannot be reached still routes to failure +// (not panic). proxyA2ARequest falls back to DB URL lookup when Redis is down. +func TestIntegration_ExecuteDelegation_RedisDown_FallsBackToDB(t *testing.T) { + allowLoopbackForTest(t) + conn := integrationDB(t) + cleanup := setupIntegrationFixtures(t, conn) + defer cleanup() + t.Setenv("DELEGATION_LEDGER_WRITE", "1") + + // Set up miniredis so db.RDB is non-nil, but do NOT cache any URL. + // resolveAgentURL skips Redis and falls back to DB, which also has no URL. + mr := setupTestRedis(t) + defer mr.Close() + + broadcaster := newTestBroadcaster() + wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + dh := NewDelegationHandler(wh, broadcaster) + + a2aBody, _ := json.Marshal(map[string]interface{}{ + "jsonrpc": "2.0", "id": "1", "method": "message/send", + "params": map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "parts": []map[string]string{{"type": "text", "text": "do work"}}, + }, + }, + }) + start := time.Now() + runWithTimeout(t, 30*time.Second, func(cancel func()) { + dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody) + }) + t.Logf("executeDelegation took %v", time.Since(start)) + + status, _, errDet := readDelegationRow(t, conn) + if status != "failed" { + t.Errorf("status: want failed (no target URL), got %q", status) + } + if errDet == "" { + t.Error("error_detail should be set on failure due to unreachable target") + } +} + +// extractHostPort parses "http://127.0.0.1:PORT/" and returns "127.0.0.1:PORT". +func extractHostPort(rawURL string) string { + // Simple parse: strip "http://" prefix and trailing slash. + // The URL format is always "http://127.0.0.1:PORT/" in our usage. + if len(rawURL) > 7 { + return rawURL[7 : len(rawURL)-1] + } + return rawURL +} + +// newA2AClientForHost creates an http.Client that redirects all connections +// to the given host:port. This lets us mock the agent endpoint without +// running a real HTTP server. +func newA2AClientForHost(targetHost string) *http.Client { + return &http.Client{ + Transport: &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + return net.Dial("tcp", targetHost) + }, + ResponseHeaderTimeout: 180 * time.Second, + }, + } +} diff --git a/workspace-server/internal/handlers/delegation_ledger_integration_test.go b/workspace-server/internal/handlers/delegation_ledger_integration_test.go index 78c0a874..524ccadf 100644 --- a/workspace-server/internal/handlers/delegation_ledger_integration_test.go +++ b/workspace-server/internal/handlers/delegation_ledger_integration_test.go @@ -39,6 +39,7 @@ import ( "os" "strings" "testing" + "time" mdb "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" _ "github.com/lib/pq" @@ -64,12 +65,16 @@ func integrationDB(t *testing.T) *sql.DB { if err != nil { t.Fatalf("open: %v", err) } - if err := conn.Ping(); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := conn.PingContext(ctx); err != nil { t.Fatalf("ping: %v", err) } // Each test gets a fresh table state — fail loud if cleanup fails so // a bad test doesn't pollute the next one. - if _, err := conn.ExecContext(context.Background(), `DELETE FROM delegations`); err != nil { + ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel2() + if _, err := conn.ExecContext(ctx2, `DELETE FROM delegations`); err != nil { t.Fatalf("cleanup: %v", err) } // Wire the package-level db.DB so production helpers (recordLedgerInsert, diff --git a/workspace-server/internal/handlers/delegation_test.go b/workspace-server/internal/handlers/delegation_test.go index b2d1c93a..0d0b58fe 100644 --- a/workspace-server/internal/handlers/delegation_test.go +++ b/workspace-server/internal/handlers/delegation_test.go @@ -5,10 +5,8 @@ import ( "context" "encoding/json" "fmt" - "net" "net/http" "net/http/httptest" - "sync" "testing" "time" @@ -958,316 +956,3 @@ func TestInsertDelegationOutcome_ZeroValueIsUnknown(t *testing.T) { t.Errorf("insertOutcomeUnknown must not collide with insertOK") } } - -// ==================== executeDelegation — delivery-confirmed proxy error regression tests ==================== -// -// These test the fix for issue #159: when proxyA2ARequest returns an error but we have a -// non-empty response body with a 2xx status code, executeDelegation must treat it as success. -// The error is a delivery/transport error (e.g., connection reset after response was received). -// Previously, executeDelegation marked these as "failed" even though the work was done, -// causing retry storms and "error" rendering in canvas despite the response being available. -// -// Test strategy: spin up a mock A2A agent server, set up the source/target DB rows, call -// executeDelegation directly, and verify the activity_logs status and delegation status. - -const testDelegationID = "del-159-test" -const testSourceID = "ws-source-159" -const testTargetID = "ws-target-159" - -// expectExecuteDelegationBase sets up sqlmock expectations for the DB queries that -// executeDelegation always makes, regardless of outcome. -func expectExecuteDelegationBase(mock sqlmock.Sqlmock) { - // updateDelegationStatus: dispatched - // Uses prefix match — sqlmock regexes match the full query string. - mock.ExpectExec("UPDATE activity_logs SET status"). - WithArgs("dispatched", "", testSourceID, testDelegationID). - WillReturnResult(sqlmock.NewResult(0, 1)) - - // CanCommunicate: source != target → fires two getWorkspaceRef lookups. - // Both test fixtures have parent_id = NULL (root-level siblings) → allowed. - // Order matches call order: source first, then target. - mock.ExpectQuery("SELECT id, parent_id FROM workspaces WHERE id"). - WithArgs(testSourceID). - WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow(testSourceID, nil)) - mock.ExpectQuery("SELECT id, parent_id FROM workspaces WHERE id"). - WithArgs(testTargetID). - WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow(testTargetID, nil)) - - // resolveAgentURL: reads ws:{id}:url from Redis, falls back to DB for target - mock.ExpectQuery("SELECT url, status FROM workspaces WHERE id = "). - WithArgs(testTargetID). - WillReturnRows(sqlmock.NewRows([]string{"url", "status"}).AddRow("", "online")) -} - -// expectExecuteDelegationSuccess sets up expectations for a completed delegation. -func expectExecuteDelegationSuccess(mock sqlmock.Sqlmock, respBody string) { - // INSERT activity_logs for delegation completion (response_body status = 'completed') - mock.ExpectExec("INSERT INTO activity_logs"). - WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), "completed"). - WillReturnResult(sqlmock.NewResult(0, 1)) - - // updateDelegationStatus: completed - mock.ExpectExec("UPDATE activity_logs SET status"). - WithArgs("completed", "", testSourceID, testDelegationID). - WillReturnResult(sqlmock.NewResult(0, 1)) -} - -// expectExecuteDelegationFailed sets up expectations for a failed delegation. -func expectExecuteDelegationFailed(mock sqlmock.Sqlmock) { - // INSERT activity_logs for delegation failure (response_body status = 'failed') - mock.ExpectExec("INSERT INTO activity_logs"). - WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), "failed"). - WillReturnResult(sqlmock.NewResult(0, 1)) - - // updateDelegationStatus: failed - mock.ExpectExec("UPDATE activity_logs SET status"). - WithArgs("failed", sqlmock.AnyArg(), testSourceID, testDelegationID). - WillReturnResult(sqlmock.NewResult(0, 1)) -} - -// TestExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess is the primary regression -// test for issue #159. The scenario: -// - Attempt 1: server sends 200 OK headers + partial body, then closes connection. -// proxyA2ARequest: body read gets io.EOF (partial body read), returns (200, , BadGateway). -// isTransientProxyError(BadGateway) = TRUE → retry. -// - Attempt 2: server does the same thing (closes after partial body). -// proxyA2ARequest: same (200, , BadGateway). -// isTransientProxyError(BadGateway) = TRUE → retry AGAIN (but outer context will fire soon, -// or we get one more attempt). For the test we let it run. -// POST-FIX: the executeDelegation new condition sees status=200, body=, err!=nil -// and routes to handleSuccess immediately. -// -// The key pre/post-fix difference: pre-fix, executeDelegation received status=0 (hardcoded) -// even when the server sent 200, so the condition always failed. Post-fix, status=200 is -// preserved through the error return path (proxyA2ARequest now returns resp.StatusCode, respBody). -// In this test the retry ultimately succeeds (server eventually sends full body), but -// the critical assertion is that a 2xx partial-body delivery-confirmed response is never -// classified as "failed" — it always routes to success. -func TestExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) { - mock := setupTestDB(t) - mr := setupTestRedis(t) - allowLoopbackForTest(t) - - broadcaster := newTestBroadcaster() - wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) - dh := NewDelegationHandler(wh, broadcaster) - - // Server that sends a 200 response with declared Content-Length but closes - // the connection before sending all bytes. Go's http.Client sees io.EOF on - // the body read. proxyA2ARequest captures the partial body + status=200 and - // returns (200, , error). executeDelegation's new condition sees - // status=200 + body > 0 + error != nil → routes to handleSuccess. - var wg sync.WaitGroup - wg.Add(1) - ln, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatalf("failed to listen: %v", err) - } - defer ln.Close() - go func() { - defer wg.Done() - conn, err := ln.Accept() - if err != nil { - return - } - defer conn.Close() - // Consume the HTTP request - buf := make([]byte, 2048) - conn.Read(buf) - // Send 200 OK with Content-Length: 100 but only 74 bytes of body - // (less than declared length → io.LimitReader returns io.EOF after reading all 74) - resp := "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n" - resp += `{"result":{"parts":[{"text":"work completed successfully"}]}}` // 74 bytes - conn.Write([]byte(resp)) - // Close immediately — client gets io.EOF on body read - }() - - agentURL := "http://" + ln.Addr().String() - mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentURL) - allowLoopbackForTest(t) - - expectExecuteDelegationBase(mock) - expectExecuteDelegationSuccess(mock, `{"result":{"parts":[{"text":"work completed successfully"}]}}`) - - // Execute synchronously (not as a goroutine) so we can check DB state immediately. - // The handler fires it as goroutine; we call it directly for deterministic testing. - a2aBody, _ := json.Marshal(map[string]interface{}{ - "jsonrpc": "2.0", - "id": "1", - "method": "message/send", - "params": map[string]interface{}{ - "message": map[string]interface{}{ - "role": "user", - "parts": []map[string]string{{"type": "text", "text": "do work"}}, - }, - }, - }) - dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) - - time.Sleep(100 * time.Millisecond) // let DB writes settle - - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet sqlmock expectations: %v", err) - } -} - -// TestExecuteDelegation_ProxyErrorNon2xx_RemainsFailed verifies that the pre-fix failure -// path is unchanged when proxyA2ARequest returns a delivery-confirmed error with a non-2xx -// status code (e.g., 500 Internal Server Error with partial body read before connection drop). -// The new condition requires status >= 200 && status < 300, so non-2xx always routes to failure. -func TestExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) { - mock := setupTestDB(t) - mr := setupTestRedis(t) - allowLoopbackForTest(t) - - broadcaster := newTestBroadcaster() - wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) - dh := NewDelegationHandler(wh, broadcaster) - - // Server returns 500 with declared Content-Length but closes connection early. - // proxyA2ARequest: reads 500 headers, partial body, then connection drop → body read error. - // Returns (500, , BadGateway). - // New condition: status=500 is NOT >= 200 && < 300 → routes to failure. - // isTransientProxyError(500) = false → no retry. - var wg sync.WaitGroup - wg.Add(1) - ln, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatalf("failed to listen: %v", err) - } - defer ln.Close() - go func() { - defer wg.Done() - conn, err := ln.Accept() - if err != nil { - return - } - defer conn.Close() - buf := make([]byte, 2048) - conn.Read(buf) - // 500 with Content-Length: 100 but only ~60 bytes of body - resp := "HTTP/1.1 500 Internal Server Error\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n" - resp += `{"error":"agent crashed"}` // ~24 bytes, less than declared - conn.Write([]byte(resp)) - // Close immediately — client gets io.EOF on body read - }() - - agentURL := "http://" + ln.Addr().String() - mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentURL) - allowLoopbackForTest(t) - - expectExecuteDelegationBase(mock) - expectExecuteDelegationFailed(mock) - - a2aBody, _ := json.Marshal(map[string]interface{}{ - "jsonrpc": "2.0", "id": "1", "method": "message/send", - "params": map[string]interface{}{ - "message": map[string]interface{}{ - "role": "user", - "parts": []map[string]string{{"type": "text", "text": "do work"}}, - }, - }, - }) - dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) - - time.Sleep(100 * time.Millisecond) - - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet sqlmock expectations: %v", err) - } -} - -// TestExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed verifies that the pre-fix failure -// path is unchanged when proxyA2ARequest returns an error with a 2xx status but empty body. -// The new condition requires len(respBody) > 0, so empty body routes to failure. -func TestExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) { - mock := setupTestDB(t) - mr := setupTestRedis(t) - allowLoopbackForTest(t) - - broadcaster := newTestBroadcaster() - wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) - dh := NewDelegationHandler(wh, broadcaster) - - // Server returns 502 Bad Gateway — proxyA2ARequest returns 502, body="" (empty), error != nil. - // New condition: proxyErr != nil && len(respBody) > 0 && status >= 200 && status < 300 - // → len(respBody) == 0 → condition FALSE → falls through to failure. - // isTransientProxyError(502) is TRUE → retry → same result → failure. - agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusBadGateway) - // No body — connection closes normally - })) - defer agentServer.Close() - - mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentServer.URL) - allowLoopbackForTest(t) - - // First attempt: updateDelegationStatus(dispatched) — from expectExecuteDelegationBase - expectExecuteDelegationBase(mock) - // Second attempt (retry): updateDelegationStatus(dispatched) again - mock.ExpectExec("UPDATE activity_logs SET status"). - WithArgs("dispatched", "", testSourceID, testDelegationID). - WillReturnResult(sqlmock.NewResult(0, 1)) - // Failure: INSERT + UPDATE (failed) - expectExecuteDelegationFailed(mock) - - a2aBody, _ := json.Marshal(map[string]interface{}{ - "jsonrpc": "2.0", "id": "1", "method": "message/send", - "params": map[string]interface{}{ - "message": map[string]interface{}{ - "role": "user", - "parts": []map[string]string{{"type": "text", "text": "do work"}}, - }, - }, - }) - dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) - - time.Sleep(100 * time.Millisecond) - - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet sqlmock expectations: %v", err) - } -} - -// TestExecuteDelegation_CleanProxyResponse_Unchanged verifies that a clean proxy response -// (no error, 200 with body) is unaffected by the new condition. This is the baseline: -// proxyErr == nil so the new condition never fires. -func TestExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) { - mock := setupTestDB(t) - mr := setupTestRedis(t) - allowLoopbackForTest(t) - - broadcaster := newTestBroadcaster() - wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) - dh := NewDelegationHandler(wh, broadcaster) - - agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Header().Set("Content-Type", "application/json") - w.Write([]byte(`{"result":{"parts":[{"text":"all good"}]}}`)) - })) - defer agentServer.Close() - - mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentServer.URL) - allowLoopbackForTest(t) - - expectExecuteDelegationBase(mock) - expectExecuteDelegationSuccess(mock, `{"result":{"parts":[{"text":"all good"}]}}`) - - a2aBody, _ := json.Marshal(map[string]interface{}{ - "jsonrpc": "2.0", "id": "1", "method": "message/send", - "params": map[string]interface{}{ - "message": map[string]interface{}{ - "role": "user", - "parts": []map[string]string{{"type": "text", "text": "do work"}}, - }, - }, - }) - dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) - - time.Sleep(100 * time.Millisecond) - - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet sqlmock expectations: %v", err) - } -}