diff --git a/workspace-server/internal/handlers/delegation_executor_integration_test.go b/workspace-server/internal/handlers/delegation_executor_integration_test.go index 2874f37b..6edbd885 100644 --- a/workspace-server/internal/handlers/delegation_executor_integration_test.go +++ b/workspace-server/internal/handlers/delegation_executor_integration_test.go @@ -26,13 +26,17 @@ package handlers import ( + "bufio" "context" "database/sql" "encoding/json" "fmt" "io" + "net" "net/http" - "net/http/httptest" + "net/textproto" + "strings" + "sync" "testing" "time" @@ -135,45 +139,88 @@ func readDelegationRow(t *testing.T, conn *sql.DB) (status, preview, errorDetail return status, prev.String, errDet.String } -// mockAgentWithPartialBody creates an httptest.Server that: -// - Hijacks the raw TCP connection after the HTTP parser consumes request -// headers (so r.Body is still in-flight from the client). -// - Writes raw HTTP response bytes directly to the raw conn so they bypass -// httptest's buffered writer (which Hijack() discards, losing any unflushed -// data that was written via w.WriteHeader/w.Write). -// - Closes the raw conn while the client is mid-read on the body. +// rawTCPMockServer starts a raw TCP listener. It returns the server URL, +// a wait function, and a done function. // -// Critical insight: httptest's Hijack() discards the buffered writer, which -// contains any bytes written via w.WriteHeader/w.Write that weren't flushed to -// the raw TCP conn. This means the HTTP client NEVER receives the response -// headers (blocked by ResponseHeaderTimeout = 3 minutes). -// Fix: write the raw HTTP response directly to the raw conn AFTER Hijack(), -// bypassing the buffered writer entirely. -func mockAgentWithPartialBody(t *testing.T, statusCode int, declaredLength int, actualBody string) *httptest.Server { +// The server handles ONE connection then stops listening. On that connection: +// 1. Read HTTP request headers (stop at blank line). +// 2. DRAIN the request body (Content-Length bytes) in the background so the +// client doesn't hit a broken-pipe when we close the connection. +// 3. Write raw HTTP response (headers + partial body) directly to the raw conn. +// 4. Close the raw conn immediately. +// +// This avoids httptest's buffered writer + Hijack() deadlock: with raw TCP, we +// fully control when the response is written and when the connection is closed, +// and we drain the request body so the client can finish its write cleanly. +func rawTCPMockServer(t *testing.T, statusCode int, declaredLength int, actualBody string) (url string, wait, cleanup func()) { t.Helper() - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Do NOT touch r.Body — server manages it. - if hj, ok := w.(http.Hijacker); ok { - conn, bufWriter, _ := hj.Hijack() - if conn != nil { - // Close the buffered writer (no-op on already-flushed data). - if bw, ok := bufWriter.(io.Closer); ok { - bw.Close() - } - // Write raw HTTP response bytes DIRECTLY to the raw conn. - // Bypasses httptest's buffered writer so Hijack() can't lose them. - // Content-Length > actualBody: server announces more bytes than it - // sends before closing — simulates a mid-stream connection drop. - header := fmt.Sprintf("HTTP/1.1 %d %s\r\nContent-Type: application/json\r\nContent-Length: %d\r\n\r\n", - statusCode, http.StatusText(statusCode), declaredLength) - conn.Write([]byte(header)) //nolint:errcheck - conn.Write([]byte(actualBody)) //nolint:errcheck - // Brief delay so client reads headers + partial body before close. - time.Sleep(50 * time.Millisecond) - conn.Close() // FIN/RST → client's next Read() errors - } + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + url = "http://" + ln.Addr().String() + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + conn, err := ln.Accept() + ln.Close() // stop listening; we only handle one connection + if err != nil { + return } - })) + defer conn.Close() + + // Read HTTP request line + headers. + reader := bufio.NewReader(conn) + reqLine, _ := reader.ReadString('\n') + _ = reqLine // we don't care about the request line + + // Read headers. + tp := textproto.NewReader(reader) + headers := make(textproto.MIMEHeader) + for { + line, err := tp.ReadLine() + if err != nil { + return + } + if line == "" { + break // blank line: end of headers + } + k, v, _ := strings.Cut(line, ": ") + headers.Set(k, v) + } + + // Drain the request body so the client can finish sending it. + // Without this, closing the conn while the client is mid-write causes + // a broken-pipe error on the client side (request writer goroutine hangs). + if cl := headers.Get("Content-Length"); cl != "" { + var n int + fmt.Sscanf(cl, "%d", &n) + io.Copy(io.Discard, io.LimitReader(conn, int64(n))) //nolint:errcheck + } + + // Write raw HTTP response directly to the raw conn. + // This bypasses httptest's buffered writer entirely. + statusText := http.StatusText(statusCode) + resp := fmt.Sprintf( + "HTTP/1.1 %d %s\r\nContent-Type: application/json\r\nContent-Length: %d\r\n\r\n%s", + statusCode, statusText, declaredLength, actualBody, + ) + conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + conn.Write([]byte(resp)) //nolint:errcheck + + // Brief pause so the client kernel TCP buffer drains before we close. + // The client reads the response headers + partial body in this window. + time.Sleep(50 * time.Millisecond) + conn.Close() // sends FIN; client's Read() returns io.EOF + close(done) + }() + return url, func() { wg.Wait() }, func() { + conn, err := net.DialTimeout("tcp", ln.Addr().String(), 100*time.Millisecond) + if err == nil { + conn.Close() + } + } } // TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess @@ -188,19 +235,19 @@ func mockAgentWithPartialBody(t *testing.T, statusCode int, declaredLength int, // Here we verify the ledger row landed at 'completed' with the response body // as result_preview. func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) { + allowLoopbackForTest(t) // raw TCP mock uses 127.0.0.1; SSRF guard must permit it conn := integrationDB(t) cleanup := setupIntegrationFixtures(t, conn) defer cleanup() t.Setenv("DELEGATION_LEDGER_WRITE", "1") - // Mock server: 200 OK, Content-Length:100 declared, only 74 bytes of body sent, - // then connection closed. httptest.Server + Hijack ensures the HTTP parser has - // consumed the headers before we close, and buf.Flush() ensures data is in the - // kernel TCP send buffer before Close(). - ts := mockAgentWithPartialBody(t, 200, 100, `{"result":{"parts":[{"text":"work completed successfully"}]}}`) - defer ts.Close() + // Raw TCP mock: Content-Length:100 declared, 74 bytes sent, then close. + // The server drains the request body before writing the response so the + // client doesn't get a broken-pipe on its request-body write. + url, serverWait, serverCleanup := rawTCPMockServer(t, 200, 100, `{"result":{"parts":[{"text":"work completed successfully"}]}}`) + defer serverCleanup() - mr := setupIntegrationRedis(t, ts.URL) + mr := setupIntegrationRedis(t, url) defer mr.Close() broadcaster := newTestBroadcaster() @@ -219,18 +266,13 @@ func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSucce }, }) dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) - - // Wait for goroutine + DB writes to settle. - time.Sleep(500 * time.Millisecond) + serverWait() status, preview, errDet := readDelegationRow(t, conn) if status != "completed" { t.Errorf("status: want completed, got %q", status) } if preview == "" { - // The response body should land as result_preview. - // Note: the partial body "work completed successfully" is what was read - // before the connection dropped. t.Logf("result_preview (partial body expected): %q", preview) } if errDet != "" { @@ -243,18 +285,18 @@ func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSucce // not success. isDeliveryConfirmedSuccess requires status>=200 && <300, so 500 // always fails the guard regardless of body length. func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) { + allowLoopbackForTest(t) // raw TCP mock uses 127.0.0.1; SSRF guard must permit it conn := integrationDB(t) cleanup := setupIntegrationFixtures(t, conn) defer cleanup() t.Setenv("DELEGATION_LEDGER_WRITE", "1") - // Mock server: 500, Content-Length:100 declared, ~24 bytes of body sent. - ts := mockAgentWithPartialBody(t, 500, 100, `{"error":"agent crashed"}`) - defer ts.Close() + url, serverWait, serverCleanup := rawTCPMockServer(t, 500, 100, `{"error":"agent crashed"}`) + defer serverCleanup() mr := setupTestRedis(t) defer mr.Close() - db.CacheURL(context.Background(), testTargetID, ts.URL) + db.CacheURL(context.Background(), testTargetID, url) broadcaster := newTestBroadcaster() wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) @@ -270,7 +312,7 @@ func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing }, }) dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) - time.Sleep(500 * time.Millisecond) + serverWait() status, _, errDet := readDelegationRow(t, conn) if status != "failed" { @@ -286,22 +328,18 @@ func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing // routes to failure. isDeliveryConfirmedSuccess requires len(body) > 0, so an // empty body always fails the guard regardless of status. func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) { + allowLoopbackForTest(t) // raw TCP mock uses 127.0.0.1; SSRF guard must permit it conn := integrationDB(t) cleanup := setupIntegrationFixtures(t, conn) defer cleanup() t.Setenv("DELEGATION_LEDGER_WRITE", "1") - // Mock server: 200, Content-Length:0 declared, no body sent, then close. - // mockAgentWithPartialBody with empty actualBody writes no body but still - // sets Content-Length: 0 in the headers, flushes them, then hijacks and - // closes the connection. The HTTP client sees headers + empty body, then - // a connection-drop → Read() error. - ts := mockAgentWithPartialBody(t, 200, 0, "") - defer ts.Close() + url, serverWait, serverCleanup := rawTCPMockServer(t, 200, 0, "") + defer serverCleanup() mr := setupTestRedis(t) defer mr.Close() - db.CacheURL(context.Background(), testTargetID, ts.URL) + db.CacheURL(context.Background(), testTargetID, url) broadcaster := newTestBroadcaster() wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) @@ -317,7 +355,7 @@ func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *test }, }) dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) - time.Sleep(500 * time.Millisecond) + serverWait() status, _, errDet := readDelegationRow(t, conn) if status != "failed" { @@ -333,20 +371,16 @@ func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *test // This was always the behavior; the integration test confirms executeDelegation // correctly records the ledger entry on the happy path. func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) { + allowLoopbackForTest(t) // raw TCP mock uses 127.0.0.1; SSRF guard must permit it conn := integrationDB(t) cleanup := setupIntegrationFixtures(t, conn) defer cleanup() t.Setenv("DELEGATION_LEDGER_WRITE", "1") - // Use httptest.Server for the clean success case (no connection drop). - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Header().Set("Content-Type", "application/json") - w.Write([]byte(`{"result":{"parts":[{"text":"all good"}]}}`)) - })) - defer ts.Close() + url, serverWait, serverCleanup := rawTCPMockServer(t, 200, 36, `{"result":{"parts":[{"text":"all good"}]}}`) + defer serverCleanup() - mr := setupIntegrationRedis(t, ts.URL) + mr := setupIntegrationRedis(t, url) defer mr.Close() broadcaster := newTestBroadcaster() @@ -363,14 +397,13 @@ func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T }, }) dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) - time.Sleep(500 * time.Millisecond) + serverWait() status, preview, errDet := readDelegationRow(t, conn) if status != "completed" { t.Errorf("status: want completed, got %q", status) } if preview == "" { - // result_preview should carry the response body t.Logf("result_preview: %q", preview) } if errDet != "" { @@ -407,7 +440,7 @@ func TestIntegration_ExecuteDelegation_RedisDown_FallsBackToDB(t *testing.T) { }) // No URL available — delegation should fail gracefully (target unreachable). dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody) - time.Sleep(500 * time.Millisecond) + // No serverWait() needed — the server was never started. status, _, errDet := readDelegationRow(t, conn) if status != "failed" {