fix(handlers): use raw TCP listener instead of httptest.Server

All previous approaches (plain httptest.Server, raw TCP with io.Copy,
httptest+Hijack) produced a consistent 2-minute timeout in CI.
Analysis of httptest.Server revealed a subtle goroutine ordering
dependency: the server reads the request body into a buffer before
calling the handler, but the client's request-body writer goroutine
waits for response headers before sending the body. The handler must
return (sending headers) before the client's body writer can complete.
This creates a potential race where the connection is closed while the
client is still writing.

The raw TCP approach eliminates all HTTP library goroutines:
- net.Listen("tcp", "127.0.0.1:0") binds an ephemeral port
- Accept in a goroutine, handle one connection
- Read from the connection until a 2-second read deadline expires (long
  enough for the client to send its request headers and a small body)
- Send response immediately, close connection
- a2aClient DialContext intercepts all dials and redirects to our port

Key insight: set a Read deadline (not ReadAll to EOF) so the server
proceeds to send the response without waiting for the body. The kernel
discards unread buffered body bytes on close — harmless.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Molecule AI · core-be 2026-05-12 14:24:59 +00:00
parent 173339013f
commit 463fd23797

View File

@ -12,11 +12,12 @@
//
// How HTTP is mocked
// -----------------
// a2aClient is a package-level var so tests can reassign it. Each test
// creates an httptest.Server (same-process, same-host) and redirects
// a2aClient's Transport to point at it. Same-process HTTP has no DNS, no
// TCP handshake overhead, and no network partition risk. The httptest.Server
// is started BEFORE a2aClient is updated so every request hits a live server.
// We use raw TCP listeners (net.Listener) instead of httptest.Server to avoid
// any HTTP-library-level goroutine complexity on the server side. The test
// opens a loopback TCP port, serves one HTTP response, then closes the
// connection. The a2aClient transport is overridden with a DialContext that
// intercepts all dials and redirects them to the test server's port. No DNS,
// only a loopback TCP connection, and no HTTP library goroutines that could
// block on request-body reads.
//
// Run with:
//
@ -40,8 +41,6 @@ import (
"encoding/json"
"net"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
@ -56,6 +55,102 @@ const testDelegationID = "del-159-test-integration"
const testSourceID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
const testTargetID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
// rawHTTPServer starts a TCP listener, serves one HTTP response, and closes.
// It runs in a background goroutine so the test can proceed immediately after
// returning the server URL. The server URL (e.g. "http://127.0.0.1:<port>/")
// is suitable for caching in Redis and passing to executeDelegation.
//
// The server reads HTTP headers (which carry Content-Length) using a short
// deadline, then immediately sends the response. This prevents deadlock where
// io.Copy(io.Discard, conn) would wait for EOF (client waiting for headers
// before sending body → server waiting for body before sending response).
func rawHTTPServer(t *testing.T, statusCode int, body string) (serverURL string, closeFn func()) {
t.Helper()
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("rawHTTPServer listen: %v", err)
}
port := ln.Addr().(*net.TCPAddr).Port
serverURL = "http://127.0.0.1:" + itoa(port) + "/"
connCh := make(chan net.Conn, 1)
go func() {
conn, err := ln.Accept()
if err != nil {
return
}
connCh <- conn
}()
closeFn = func() {
ln.Close()
}
// Handle in background so we don't block test execution.
// Strategy: read HTTP headers using a 2-second deadline (enough for the
// client to send headers + a small body). After deadline fires, send
// the response. The kernel discards any unread buffered body bytes
// when the connection closes — harmless.
go func() {
conn := <-connCh
if conn == nil {
return
}
defer conn.Close()
// Read headers with deadline. After 2s, Read returns with whatever
// bytes have arrived (headers are always sent first by the HTTP client).
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
headerBuf := make([]byte, 4096)
for {
n, err := conn.Read(headerBuf)
if n > 0 {
_ = headerBuf[:n] // headers consumed; not used
}
if err != nil {
// Deadline exceeded (most likely) — headers have arrived.
break
}
}
// Send response immediately — don't wait for remaining body bytes.
resp := buildHTTPResponse(statusCode, body)
conn.Write(resp) //nolint:errcheck
}()
return serverURL, closeFn
}
// itoa converts n to its base-10 string representation (avoids importing
// strconv in tests).
//
// Negative values get a leading "-". The conversion works on the unsigned
// magnitude of n, so the most negative int — whose negation overflows back to
// itself — is formatted correctly instead of recursing forever.
func itoa(n int) string {
	if n == 0 {
		return "0"
	}
	// uint(n) wraps for negative n; negating the wrapped value yields |n|,
	// including for the minimum int.
	mag := uint(n)
	if n < 0 {
		mag = -mag
	}
	// 20 bytes hold a sign plus the 19 digits of the largest 64-bit magnitude.
	var buf [20]byte
	pos := len(buf)
	// Fill the buffer from the right, least-significant digit first.
	for mag > 0 {
		pos--
		buf[pos] = byte('0' + mag%10)
		mag /= 10
	}
	if n < 0 {
		pos--
		buf[pos] = '-'
	}
	return string(buf[pos:])
}
// buildHTTPResponse returns the raw bytes of a minimal HTTP/1.1 response:
// a status line for statusCode, fixed Content-Type (application/json),
// Content-Length and "Connection: close" headers, a blank line, and body.
//
// The reason phrase comes from http.StatusText; codes it does not know are
// labelled "Unknown".
func buildHTTPResponse(statusCode int, body string) []byte {
	reason := http.StatusText(statusCode)
	if reason == "" {
		reason = "Unknown"
	}

	statusLine := "HTTP/1.1 " + itoa(statusCode) + " " + reason + "\r\n"
	headers := "Content-Type: application/json\r\n" +
		"Content-Length: " + itoa(len(body)) + "\r\n" +
		"Connection: close\r\n"

	// Assemble status line, headers, the blank separator line, and the body.
	out := make([]byte, 0, len(statusLine)+len(headers)+2+len(body))
	out = append(out, statusLine...)
	out = append(out, headers...)
	out = append(out, "\r\n"...)
	out = append(out, body...)
	return out
}
// setupIntegrationFixtures inserts the rows executeDelegation requires:
// - workspaces: source and target (siblings, parent_id=NULL so CanCommunicate=true)
// - activity_logs: the 'delegate' row that updateDelegationStatus UPDATE will find
@ -142,34 +237,18 @@ func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSucce
defer cleanup()
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
// Create test server with 200 response.
// NOTE: Do NOT read r.Body here. httptest.Server reads the full request
// body into an in-memory buffer before calling the handler — r.Body is
// already populated. Reading it here would not block in theory, but
// omitting the drain avoids any subtle goroutine lifetime issues.
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"result":{"parts":[{"text":"work completed successfully"}]}}`))
}))
defer ts.Close()
agentURL, closeServer := rawHTTPServer(t, 200, `{"result":{"parts":[{"text":"work completed successfully"}]}}`)
defer closeServer()
mr := setupTestRedis(t)
defer mr.Close()
db.CacheURL(context.Background(), testTargetID, ts.URL)
db.CacheURL(context.Background(), testTargetID, agentURL)
// Override a2aClient so requests go to the test server (same-process).
// Override a2aClient so requests go to our raw TCP server.
// Extract host:port from agentURL and dial that directly.
prevClient := a2aClient
defer func() { a2aClient = prevClient }()
u, _ := url.Parse(ts.URL)
a2aClient = &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return net.Dial("tcp", u.Host)
},
ResponseHeaderTimeout: 180 * time.Second,
},
}
a2aClient = newA2AClientForHost(extractHostPort(agentURL))
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@ -212,28 +291,16 @@ func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing
defer cleanup()
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(`{"error":"agent crashed"}`))
}))
defer ts.Close()
agentURL, closeServer := rawHTTPServer(t, 500, `{"error":"agent crashed"}`)
defer closeServer()
mr := setupTestRedis(t)
defer mr.Close()
db.CacheURL(context.Background(), testTargetID, ts.URL)
db.CacheURL(context.Background(), testTargetID, agentURL)
prevClient := a2aClient
defer func() { a2aClient = prevClient }()
u, _ := url.Parse(ts.URL)
a2aClient = &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return net.Dial("tcp", u.Host)
},
ResponseHeaderTimeout: 180 * time.Second,
},
}
a2aClient = newA2AClientForHost(extractHostPort(agentURL))
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@ -271,28 +338,16 @@ func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *test
defer cleanup()
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
// No body — the handler sends a 200 with Content-Length: 0.
}))
defer ts.Close()
agentURL, closeServer := rawHTTPServer(t, 200, "")
defer closeServer()
mr := setupTestRedis(t)
defer mr.Close()
db.CacheURL(context.Background(), testTargetID, ts.URL)
db.CacheURL(context.Background(), testTargetID, agentURL)
prevClient := a2aClient
defer func() { a2aClient = prevClient }()
u, _ := url.Parse(ts.URL)
a2aClient = &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return net.Dial("tcp", u.Host)
},
ResponseHeaderTimeout: 180 * time.Second,
},
}
a2aClient = newA2AClientForHost(extractHostPort(agentURL))
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@ -329,28 +384,16 @@ func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T
defer cleanup()
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"result":{"parts":[{"text":"all good"}]}}`))
}))
defer ts.Close()
agentURL, closeServer := rawHTTPServer(t, 200, `{"result":{"parts":[{"text":"all good"}]}}`)
defer closeServer()
mr := setupTestRedis(t)
defer mr.Close()
db.CacheURL(context.Background(), testTargetID, ts.URL)
db.CacheURL(context.Background(), testTargetID, agentURL)
prevClient := a2aClient
defer func() { a2aClient = prevClient }()
u, _ := url.Parse(ts.URL)
a2aClient = &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return net.Dial("tcp", u.Host)
},
ResponseHeaderTimeout: 180 * time.Second,
},
}
a2aClient = newA2AClientForHost(extractHostPort(agentURL))
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@ -420,3 +463,27 @@ func TestIntegration_ExecuteDelegation_RedisDown_FallsBackToDB(t *testing.T) {
t.Error("error_detail should be set on failure due to unreachable target")
}
}
// extractHostPort returns the "host:port" part of an "http://host:port/..."
// URL, e.g. "http://127.0.0.1:8080/" yields "127.0.0.1:8080".
//
// A leading "http://" is removed when present, and the result is cut at the
// first "/" that follows, so a missing trailing slash or a longer path does
// not damage the port. Input without the "http://" prefix is treated as
// already being host:port (optionally followed by a path).
func extractHostPort(rawURL string) string {
	const scheme = "http://"
	hostPort := rawURL
	if len(hostPort) >= len(scheme) && hostPort[:len(scheme)] == scheme {
		hostPort = hostPort[len(scheme):]
	}
	// Drop the path, if any: keep everything before the first slash.
	for i := 0; i < len(hostPort); i++ {
		if hostPort[i] == '/' {
			return hostPort[:i]
		}
	}
	return hostPort
}
// newA2AClientForHost creates an http.Client whose transport ignores the
// address it is asked to dial and always opens a TCP connection to targetHost
// ("host:port") instead. This lets tests mock the agent endpoint without
// running a real HTTP server.
//
// The dial honours the request context, so cancelling the context or hitting
// its deadline aborts a connection attempt that is still in progress. The
// transport waits up to 180 seconds for response headers.
func newA2AClientForHost(targetHost string) *http.Client {
	dialer := &net.Dialer{}
	return &http.Client{
		Transport: &http.Transport{
			// The requested network and address are deliberately discarded:
			// every connection goes to targetHost.
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				return dialer.DialContext(ctx, "tcp", targetHost)
			},
			ResponseHeaderTimeout: 180 * time.Second,
		},
	}
}