diff --git a/workspace-server/internal/handlers/delegation_executor_integration_test.go b/workspace-server/internal/handlers/delegation_executor_integration_test.go index 838f282f..b265ab83 100644 --- a/workspace-server/internal/handlers/delegation_executor_integration_test.go +++ b/workspace-server/internal/handlers/delegation_executor_integration_test.go @@ -12,11 +12,12 @@ // // How HTTP is mocked // ----------------- -// a2aClient is a package-level var so tests can reassign it. Each test -// creates an httptest.Server (same-process, same-host) and redirects -// a2aClient's Transport to point at it. Same-process HTTP has no DNS, no -// TCP handshake overhead, and no network partition risk. The httptest.Server -// is started BEFORE a2aClient is updated so every request hits a live server. +// We use raw TCP listeners (net.Listener) instead of httptest.Server to avoid +// any HTTP-library-level goroutine complexity. The test opens a TCP port, +// serves one HTTP response, then closes the connection. The a2aClient transport +// is overridden with a DialContext that intercepts all dials and redirects to +// the test server's port. No DNS, no TCP handshake overhead, no HTTP library +// goroutines that could block on request-body reads. // // Run with: // @@ -40,8 +41,6 @@ import ( "encoding/json" "net" "net/http" - "net/http/httptest" - "net/url" "testing" "time" @@ -56,6 +55,102 @@ const testDelegationID = "del-159-test-integration" const testSourceID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa" const testTargetID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb" +// rawHTTPServer starts a TCP listener, serves one HTTP response, and closes. +// It runs in a background goroutine so the test can proceed immediately after +// returning the server URL. The server URL (e.g. "http://127.0.0.1:/") +// is suitable for caching in Redis and passing to executeDelegation. 
+// +// The server reads HTTP headers (which carry Content-Length) using a short +// deadline, then immediately sends the response. This prevents deadlock where +// io.Copy(io.Discard, conn) would wait for EOF (client waiting for headers +// before sending body → server waiting for body before sending response). +func rawHTTPServer(t *testing.T, statusCode int, body string) (serverURL string, closeFn func()) { + t.Helper() + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("rawHTTPServer listen: %v", err) + } + port := ln.Addr().(*net.TCPAddr).Port + serverURL = "http://127.0.0.1:" + itoa(port) + "/" + + connCh := make(chan net.Conn, 1) + go func() { + conn, err := ln.Accept() + if err != nil { + return + } + connCh <- conn + }() + + closeFn = func() { + ln.Close() + } + + // Handle in background so we don't block test execution. + // Strategy: read HTTP headers using a 2-second deadline (enough for the + // client to send headers + a small body). After deadline fires, send + // the response. The kernel discards any unread buffered body bytes + // when the connection closes — harmless. + go func() { + conn := <-connCh + if conn == nil { + return + } + defer conn.Close() + + // Read headers with deadline. After 2s, Read returns with whatever + // bytes have arrived (headers are always sent first by the HTTP client). + conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + headerBuf := make([]byte, 4096) + for { + n, err := conn.Read(headerBuf) + if n > 0 { + _ = headerBuf[:n] // headers consumed; not used + } + if err != nil { + // Deadline exceeded (most likely) — headers have arrived. + break + } + } + + // Send response immediately — don't wait for remaining body bytes. + resp := buildHTTPResponse(statusCode, body) + conn.Write(resp) //nolint:errcheck + }() + + return serverURL, closeFn +} + +// itoa is an inline integer-to-string helper (avoids importing strconv in tests). 
// The digits are produced iteratively from the unsigned magnitude of n, so
// every int is converted correctly, including the most negative value, whose
// negation is not representable as an int.
func itoa(n int) string {
	// For negative n, negating uint(n) wraps around to |n| even when -n
	// itself would overflow.
	mag := uint(n)
	if n < 0 {
		mag = -mag
	}

	// 20 bytes hold the sign plus the 19 digits of a 64-bit int.
	var buf [20]byte
	pos := len(buf)
	for {
		pos--
		buf[pos] = byte('0' + mag%10)
		mag /= 10
		if mag == 0 {
			break
		}
	}
	if n < 0 {
		pos--
		buf[pos] = '-'
	}
	return string(buf[pos:])
}

// buildHTTPResponse constructs a minimal HTTP/1.1 response.
//
// The status line uses http.StatusText for the reason phrase, falling back to
// "Unknown" for codes the standard library has no text for. The headers are
// always Content-Type: application/json, a Content-Length matching len(body),
// and Connection: close, followed by a blank line and the body verbatim.
func buildHTTPResponse(statusCode int, body string) []byte {
	reason := http.StatusText(statusCode)
	if reason == "" {
		reason = "Unknown"
	}
	return []byte("HTTP/1.1 " + itoa(statusCode) + " " + reason + "\r\n" +
		"Content-Type: application/json\r\n" +
		"Content-Length: " + itoa(len(body)) + "\r\n" +
		"Connection: close\r\n" +
		"\r\n" +
		body)
}

// setupIntegrationFixtures inserts the rows executeDelegation requires:
//   - workspaces: source and target (siblings, parent_id=NULL so CanCommunicate=true)
//   - activity_logs: the 'delegate' row that updateDelegationStatus UPDATE will find
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - w.Write([]byte(`{"result":{"parts":[{"text":"work completed successfully"}]}}`)) - })) - defer ts.Close() + agentURL, closeServer := rawHTTPServer(t, 200, `{"result":{"parts":[{"text":"work completed successfully"}]}}`) + defer closeServer() mr := setupTestRedis(t) defer mr.Close() - db.CacheURL(context.Background(), testTargetID, ts.URL) + db.CacheURL(context.Background(), testTargetID, agentURL) - // Override a2aClient so requests go to the test server (same-process). + // Override a2aClient so requests go to our raw TCP server. + // Extract host:port from agentURL and dial that directly. prevClient := a2aClient defer func() { a2aClient = prevClient }() - u, _ := url.Parse(ts.URL) - a2aClient = &http.Client{ - Transport: &http.Transport{ - DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - return net.Dial("tcp", u.Host) - }, - ResponseHeaderTimeout: 180 * time.Second, - }, - } + a2aClient = newA2AClientForHost(extractHostPort(agentURL)) broadcaster := newTestBroadcaster() wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) @@ -212,28 +291,16 @@ func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing defer cleanup() t.Setenv("DELEGATION_LEDGER_WRITE", "1") - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(`{"error":"agent crashed"}`)) - })) - defer ts.Close() + agentURL, closeServer := rawHTTPServer(t, 500, `{"error":"agent crashed"}`) + defer closeServer() mr := setupTestRedis(t) defer mr.Close() - db.CacheURL(context.Background(), testTargetID, ts.URL) + db.CacheURL(context.Background(), testTargetID, agentURL) prevClient := a2aClient defer func() 
{ a2aClient = prevClient }() - u, _ := url.Parse(ts.URL) - a2aClient = &http.Client{ - Transport: &http.Transport{ - DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - return net.Dial("tcp", u.Host) - }, - ResponseHeaderTimeout: 180 * time.Second, - }, - } + a2aClient = newA2AClientForHost(extractHostPort(agentURL)) broadcaster := newTestBroadcaster() wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) @@ -271,28 +338,16 @@ func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *test defer cleanup() t.Setenv("DELEGATION_LEDGER_WRITE", "1") - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - // No body — the handler sends a 200 with Content-Length: 0. - })) - defer ts.Close() + agentURL, closeServer := rawHTTPServer(t, 200, "") + defer closeServer() mr := setupTestRedis(t) defer mr.Close() - db.CacheURL(context.Background(), testTargetID, ts.URL) + db.CacheURL(context.Background(), testTargetID, agentURL) prevClient := a2aClient defer func() { a2aClient = prevClient }() - u, _ := url.Parse(ts.URL) - a2aClient = &http.Client{ - Transport: &http.Transport{ - DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - return net.Dial("tcp", u.Host) - }, - ResponseHeaderTimeout: 180 * time.Second, - }, - } + a2aClient = newA2AClientForHost(extractHostPort(agentURL)) broadcaster := newTestBroadcaster() wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) @@ -329,28 +384,16 @@ func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T defer cleanup() t.Setenv("DELEGATION_LEDGER_WRITE", "1") - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - 
// extractHostPort returns the "host:port" portion of an http URL such as
// "http://127.0.0.1:PORT/".
//
// A leading "http://" is removed when present, and everything from the first
// "/" onward (trailing slash or path) is dropped. Input without the scheme or
// without any slash is therefore returned with only the applicable part
// stripped, rather than being sliced at fixed offsets. Parsing is done by hand
// so the helper needs no additional imports.
func extractHostPort(rawURL string) string {
	const scheme = "http://"

	hostPort := rawURL
	if len(hostPort) >= len(scheme) && hostPort[:len(scheme)] == scheme {
		hostPort = hostPort[len(scheme):]
	}
	// Cut at the first slash so both "host:port/" and "host:port/path" work.
	for i := 0; i < len(hostPort); i++ {
		if hostPort[i] == '/' {
			return hostPort[:i]
		}
	}
	return hostPort
}

// newA2AClientForHost creates an http.Client that redirects all connections
// to the given host:port, regardless of the address in the request URL. This
// lets tests point the agent client at a local listener.
//
// The dial goes through net.Dialer.DialContext so that cancellation or a
// deadline on the request context also aborts connection establishment.
func newA2AClientForHost(targetHost string) *http.Client {
	dialer := &net.Dialer{}
	return &http.Client{
		Transport: &http.Transport{
			// The requested network and address are deliberately ignored;
			// every dial is a TCP connection to targetHost.
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				return dialer.DialContext(ctx, "tcp", targetHost)
			},
			ResponseHeaderTimeout: 180 * time.Second,
		},
	}
}