fix(handlers): raw TCP mock server with proper request-body drain

Abandon httptest+Hijack — it has two fundamental problems for this use case:

1. Buffered-writer loss: httptest's Hijack() discards the buffered writer,
   losing any bytes written via w.WriteHeader/w.Write that weren't already
   flushed to the raw conn. The HTTP client never receives response headers,
   blocking on ResponseHeaderTimeout=180s (the 2m8s hang).

2. Request-read deadlock: Go's httptest server keeps a read goroutine waiting
   for the request body after the handler returns. Calling Hijack() while that
   goroutine is still waiting causes a deadlock with the client's request-body
   writer.

Fix: use raw TCP with net.Listener directly. The server:
  1. Accepts one connection.
  2. Reads HTTP request headers (blank line terminates).
  3. Drains Content-Length bytes from the connection (prevents broken-pipe on
     client request-body writer when we close).
  4. Writes raw HTTP response directly to the raw conn (no buffered writer).
  5. Brief sleep so client reads headers+body before FIN fires.
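  6. Close() sends FIN → the raw socket read hits EOF; because fewer than
     Content-Length bytes arrived, the HTTP client's body Read() fails with
     io.ErrUnexpectedEOF.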

Also add allowLoopbackForTest() to each test so the SSRF guard permits
127.0.0.1 mock server URLs (same pattern as a2a_proxy_test.go).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Molecule AI · core-be 2026-05-12 13:19:19 +00:00
parent 56fd24d339
commit 668abce81e

View File

@ -26,13 +26,17 @@
package handlers
import (
"bufio"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"net/textproto"
"strings"
"sync"
"testing"
"time"
@ -135,45 +139,88 @@ func readDelegationRow(t *testing.T, conn *sql.DB) (status, preview, errorDetail
return status, prev.String, errDet.String
}
// rawTCPMockServer starts a single-use HTTP mock on a raw loopback TCP
// listener. It returns the server's base URL, a wait function, and a cleanup
// function.
//
// The server accepts ONE connection and then stops listening. On that
// connection it:
//  1. Reads the HTTP request line and headers (a blank line ends them).
//  2. Drains Content-Length bytes of request body, synchronously, so the
//     client can finish writing its request before the connection is closed.
//  3. Writes a raw HTTP/1.1 response straight to the connection. The
//     Content-Length header is declaredLength while the body is actualBody;
//     when declaredLength > len(actualBody) the client observes a connection
//     drop in the middle of the response body.
//  4. Sleeps briefly so the client can read what was sent, then closes.
//
// wait blocks until the server goroutine has exited. cleanup closes the
// listener (releasing a goroutine still blocked in Accept) and then waits for
// the goroutine; it is safe to call after wait and more than once.
func rawTCPMockServer(t *testing.T, statusCode int, declaredLength int, actualBody string) (url string, wait, cleanup func()) {
	t.Helper()

	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("listen: %v", err)
	}
	url = "http://" + ln.Addr().String()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()

		conn, err := ln.Accept()
		ln.Close() // single-use server: stop listening after the first Accept
		if err != nil {
			return
		}
		// Deferred after wg.Done, so the close runs first on return.
		defer conn.Close()

		// Bound every read and write so a silent or stalled client cannot
		// keep this goroutine (and therefore wait/cleanup) blocked forever.
		conn.SetDeadline(time.Now().Add(5 * time.Second)) //nolint:errcheck

		// All reads go through br: the first socket read usually pulls the
		// headers AND part (or all) of the body into its buffer.
		br := bufio.NewReader(conn)
		tp := textproto.NewReader(br)
		if _, err := tp.ReadLine(); err != nil { // request line; content is irrelevant
			return
		}
		headers, err := tp.ReadMIMEHeader()
		if err != nil {
			return
		}

		// Drain the request body from the buffered reader, not the raw conn.
		// Reading from conn would skip bytes already buffered alongside the
		// headers and then block waiting for body bytes that never arrive.
		if cl := strings.TrimSpace(headers.Get("Content-Length")); cl != "" {
			var n int64
			if _, err := fmt.Sscanf(cl, "%d", &n); err == nil && n > 0 {
				io.CopyN(io.Discard, br, n) //nolint:errcheck // best-effort drain
			}
		}

		resp := fmt.Sprintf(
			"HTTP/1.1 %d %s\r\nContent-Type: application/json\r\nContent-Length: %d\r\n\r\n%s",
			statusCode, http.StatusText(statusCode), declaredLength, actualBody,
		)
		// Write errors are deliberately ignored: the test asserts on the
		// client-visible outcome, and logging via t from this goroutine could
		// race with test completion.
		conn.Write([]byte(resp)) //nolint:errcheck

		// Give the client a moment to read headers + partial body before the
		// deferred Close sends FIN.
		time.Sleep(50 * time.Millisecond)
	}()

	return url, wg.Wait, func() {
		ln.Close() //nolint:errcheck // already closed once a connection was accepted
		wg.Wait()
	}
}
// TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess
@ -188,19 +235,19 @@ func mockAgentWithPartialBody(t *testing.T, statusCode int, declaredLength int,
// Here we verify the ledger row landed at 'completed' with the response body
// as result_preview.
func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) {
allowLoopbackForTest(t) // raw TCP mock uses 127.0.0.1; SSRF guard must permit it
conn := integrationDB(t)
cleanup := setupIntegrationFixtures(t, conn)
defer cleanup()
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
// Mock server: 200 OK, Content-Length:100 declared, only 74 bytes of body sent,
// then connection closed. httptest.Server + Hijack ensures the HTTP parser has
// consumed the headers before we close, and buf.Flush() ensures data is in the
// kernel TCP send buffer before Close().
ts := mockAgentWithPartialBody(t, 200, 100, `{"result":{"parts":[{"text":"work completed successfully"}]}}`)
defer ts.Close()
// Raw TCP mock: Content-Length:100 declared, 61 bytes sent, then close.
// The server drains the request body before writing the response so the
// client doesn't get a broken-pipe on its request-body write.
url, serverWait, serverCleanup := rawTCPMockServer(t, 200, 100, `{"result":{"parts":[{"text":"work completed successfully"}]}}`)
defer serverCleanup()
mr := setupIntegrationRedis(t, ts.URL)
mr := setupIntegrationRedis(t, url)
defer mr.Close()
broadcaster := newTestBroadcaster()
@ -219,18 +266,13 @@ func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSucce
},
})
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
// Wait for goroutine + DB writes to settle.
time.Sleep(500 * time.Millisecond)
serverWait()
status, preview, errDet := readDelegationRow(t, conn)
if status != "completed" {
t.Errorf("status: want completed, got %q", status)
}
if preview == "" {
// The response body should land as result_preview.
// Note: the partial body "work completed successfully" is what was read
// before the connection dropped.
t.Logf("result_preview (partial body expected): %q", preview)
}
if errDet != "" {
@ -243,18 +285,18 @@ func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSucce
// not success. isDeliveryConfirmedSuccess requires status>=200 && <300, so 500
// always fails the guard regardless of body length.
func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) {
allowLoopbackForTest(t) // raw TCP mock uses 127.0.0.1; SSRF guard must permit it
conn := integrationDB(t)
cleanup := setupIntegrationFixtures(t, conn)
defer cleanup()
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
// Mock server: 500, Content-Length:100 declared, ~24 bytes of body sent.
ts := mockAgentWithPartialBody(t, 500, 100, `{"error":"agent crashed"}`)
defer ts.Close()
url, serverWait, serverCleanup := rawTCPMockServer(t, 500, 100, `{"error":"agent crashed"}`)
defer serverCleanup()
mr := setupTestRedis(t)
defer mr.Close()
db.CacheURL(context.Background(), testTargetID, ts.URL)
db.CacheURL(context.Background(), testTargetID, url)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@ -270,7 +312,7 @@ func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing
},
})
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
time.Sleep(500 * time.Millisecond)
serverWait()
status, _, errDet := readDelegationRow(t, conn)
if status != "failed" {
@ -286,22 +328,18 @@ func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing
// routes to failure. isDeliveryConfirmedSuccess requires len(body) > 0, so an
// empty body always fails the guard regardless of status.
func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) {
allowLoopbackForTest(t) // raw TCP mock uses 127.0.0.1; SSRF guard must permit it
conn := integrationDB(t)
cleanup := setupIntegrationFixtures(t, conn)
defer cleanup()
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
// Mock server: 200, Content-Length:0 declared, no body sent, then close.
// mockAgentWithPartialBody with empty actualBody writes no body but still
// sets Content-Length: 0 in the headers, flushes them, then hijacks and
// closes the connection. The HTTP client sees headers + empty body, then
// a connection-drop → Read() error.
ts := mockAgentWithPartialBody(t, 200, 0, "")
defer ts.Close()
url, serverWait, serverCleanup := rawTCPMockServer(t, 200, 0, "")
defer serverCleanup()
mr := setupTestRedis(t)
defer mr.Close()
db.CacheURL(context.Background(), testTargetID, ts.URL)
db.CacheURL(context.Background(), testTargetID, url)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@ -317,7 +355,7 @@ func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *test
},
})
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
time.Sleep(500 * time.Millisecond)
serverWait()
status, _, errDet := readDelegationRow(t, conn)
if status != "failed" {
@ -333,20 +371,16 @@ func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *test
// This was always the behavior; the integration test confirms executeDelegation
// correctly records the ledger entry on the happy path.
func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) {
allowLoopbackForTest(t) // raw TCP mock uses 127.0.0.1; SSRF guard must permit it
conn := integrationDB(t)
cleanup := setupIntegrationFixtures(t, conn)
defer cleanup()
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
// Use httptest.Server for the clean success case (no connection drop).
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"result":{"parts":[{"text":"all good"}]}}`))
}))
defer ts.Close()
url, serverWait, serverCleanup := rawTCPMockServer(t, 200, 36, `{"result":{"parts":[{"text":"all good"}]}}`)
defer serverCleanup()
mr := setupIntegrationRedis(t, ts.URL)
mr := setupIntegrationRedis(t, url)
defer mr.Close()
broadcaster := newTestBroadcaster()
@ -363,14 +397,13 @@ func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T
},
})
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
time.Sleep(500 * time.Millisecond)
serverWait()
status, preview, errDet := readDelegationRow(t, conn)
if status != "completed" {
t.Errorf("status: want completed, got %q", status)
}
if preview == "" {
// result_preview should carry the response body
t.Logf("result_preview: %q", preview)
}
if errDet != "" {
@ -407,7 +440,7 @@ func TestIntegration_ExecuteDelegation_RedisDown_FallsBackToDB(t *testing.T) {
})
// No URL available — delegation should fail gracefully (target unreachable).
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
time.Sleep(500 * time.Millisecond)
// No serverWait() needed — the server was never started.
status, _, errDet := readDelegationRow(t, conn)
if status != "failed" {