fix(handlers): raw TCP mock server with proper request-body drain
Some checks failed
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 4s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 10s
sop-checklist / all-items-acked (pull_request) [soft-fail tier:low] acked: 0/7 — missing: comprehensive-testing, local-postgres-e2e, staging-smoke, +4 — body-unfilled: 2
qa-review / approved (pull_request) Failing after 10s
Harness Replays / detect-changes (pull_request) Successful in 12s
security-review / approved (pull_request) Failing after 10s
sop-checklist-gate / gate (pull_request) Successful in 10s
CI / Detect changes (pull_request) Successful in 15s
E2E API Smoke Test / detect-changes (pull_request) Successful in 17s
sop-tier-check / tier-check (pull_request) Successful in 11s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 17s
gate-check-v3 / gate-check (pull_request) Successful in 17s
Harness Replays / Harness Replays (pull_request) Successful in 4s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 19s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 18s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 3s
CI / Canvas (Next.js) (pull_request) Successful in 4s
CI / Python Lint & Test (pull_request) Successful in 4s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 4s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 3s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m4s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Failing after 2m10s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Failing after 2m13s
CI / Platform (Go) (pull_request) Failing after 4m6s
CI / all-required (pull_request) Failing after 1s
Some checks failed
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 4s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 10s
sop-checklist / all-items-acked (pull_request) [soft-fail tier:low] acked: 0/7 — missing: comprehensive-testing, local-postgres-e2e, staging-smoke, +4 — body-unfilled: 2
qa-review / approved (pull_request) Failing after 10s
Harness Replays / detect-changes (pull_request) Successful in 12s
security-review / approved (pull_request) Failing after 10s
sop-checklist-gate / gate (pull_request) Successful in 10s
CI / Detect changes (pull_request) Successful in 15s
E2E API Smoke Test / detect-changes (pull_request) Successful in 17s
sop-tier-check / tier-check (pull_request) Successful in 11s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 17s
gate-check-v3 / gate-check (pull_request) Successful in 17s
Harness Replays / Harness Replays (pull_request) Successful in 4s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 19s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 18s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 3s
CI / Canvas (Next.js) (pull_request) Successful in 4s
CI / Python Lint & Test (pull_request) Successful in 4s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 4s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 3s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m4s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Failing after 2m10s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Failing after 2m13s
CI / Platform (Go) (pull_request) Failing after 4m6s
CI / all-required (pull_request) Failing after 1s
Abandon httptest+Hijack — it has two fundamental problems for this use case:
1. Buffered-writer loss: httptest's Hijack() discards the buffered writer,
losing any bytes written via w.WriteHeader/w.Write that weren't already
flushed to the raw conn. The HTTP client never receives response headers,
blocking on ResponseHeaderTimeout=180s (the 2m8s hang).
2. Request-read deadlock: Go's httptest server keeps a read goroutine waiting
for the request body after the handler returns. Calling Hijack() while that
goroutine is still waiting causes a deadlock with the client's request-body
writer.
Fix: use raw TCP with net.Listener directly. The server:
1. Accepts one connection.
2. Reads HTTP request headers (blank line terminates).
3. Drains Content-Length bytes from the connection (prevents broken-pipe on
client request-body writer when we close).
4. Writes raw HTTP response directly to the raw conn (no buffered writer).
5. Brief sleep so client reads headers+body before FIN fires.
6. Close() sends FIN → client Read() returns io.EOF.
Also add allowLoopbackForTest() to each test so the SSRF guard permits
127.0.0.1 mock server URLs (same pattern as a2a_proxy_test.go).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
2198b874bf
commit
4530b67336
@ -26,13 +26,17 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/textproto"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -135,45 +139,88 @@ func readDelegationRow(t *testing.T, conn *sql.DB) (status, preview, errorDetail
|
||||
return status, prev.String, errDet.String
|
||||
}
|
||||
|
||||
// mockAgentWithPartialBody creates an httptest.Server that:
|
||||
// - Hijacks the raw TCP connection after the HTTP parser consumes request
|
||||
// headers (so r.Body is still in-flight from the client).
|
||||
// - Writes raw HTTP response bytes directly to the raw conn so they bypass
|
||||
// httptest's buffered writer (which Hijack() discards, losing any unflushed
|
||||
// data that was written via w.WriteHeader/w.Write).
|
||||
// - Closes the raw conn while the client is mid-read on the body.
|
||||
// rawTCPMockServer starts a raw TCP listener. It returns the server URL,
|
||||
// a wait function, and a done function.
|
||||
//
|
||||
// Critical insight: httptest's Hijack() discards the buffered writer, which
|
||||
// contains any bytes written via w.WriteHeader/w.Write that weren't flushed to
|
||||
// the raw TCP conn. This means the HTTP client NEVER receives the response
|
||||
// headers (blocked by ResponseHeaderTimeout = 3 minutes).
|
||||
// Fix: write the raw HTTP response directly to the raw conn AFTER Hijack(),
|
||||
// bypassing the buffered writer entirely.
|
||||
func mockAgentWithPartialBody(t *testing.T, statusCode int, declaredLength int, actualBody string) *httptest.Server {
|
||||
// The server handles ONE connection then stops listening. On that connection:
|
||||
// 1. Read HTTP request headers (stop at blank line).
|
||||
// 2. DRAIN the request body (Content-Length bytes) in the background so the
|
||||
// client doesn't hit a broken-pipe when we close the connection.
|
||||
// 3. Write raw HTTP response (headers + partial body) directly to the raw conn.
|
||||
// 4. Close the raw conn immediately.
|
||||
//
|
||||
// This avoids httptest's buffered writer + Hijack() deadlock: with raw TCP, we
|
||||
// fully control when the response is written and when the connection is closed,
|
||||
// and we drain the request body so the client can finish its write cleanly.
|
||||
func rawTCPMockServer(t *testing.T, statusCode int, declaredLength int, actualBody string) (url string, wait, cleanup func()) {
|
||||
t.Helper()
|
||||
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// Do NOT touch r.Body — server manages it.
|
||||
if hj, ok := w.(http.Hijacker); ok {
|
||||
conn, bufWriter, _ := hj.Hijack()
|
||||
if conn != nil {
|
||||
// Close the buffered writer (no-op on already-flushed data).
|
||||
if bw, ok := bufWriter.(io.Closer); ok {
|
||||
bw.Close()
|
||||
}
|
||||
// Write raw HTTP response bytes DIRECTLY to the raw conn.
|
||||
// Bypasses httptest's buffered writer so Hijack() can't lose them.
|
||||
// Content-Length > actualBody: server announces more bytes than it
|
||||
// sends before closing — simulates a mid-stream connection drop.
|
||||
header := fmt.Sprintf("HTTP/1.1 %d %s\r\nContent-Type: application/json\r\nContent-Length: %d\r\n\r\n",
|
||||
statusCode, http.StatusText(statusCode), declaredLength)
|
||||
conn.Write([]byte(header)) //nolint:errcheck
|
||||
conn.Write([]byte(actualBody)) //nolint:errcheck
|
||||
// Brief delay so client reads headers + partial body before close.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
conn.Close() // FIN/RST → client's next Read() errors
|
||||
}
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("listen: %v", err)
|
||||
}
|
||||
url = "http://" + ln.Addr().String()
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
conn, err := ln.Accept()
|
||||
ln.Close() // stop listening; we only handle one connection
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}))
|
||||
defer conn.Close()
|
||||
|
||||
// Read HTTP request line + headers.
|
||||
reader := bufio.NewReader(conn)
|
||||
reqLine, _ := reader.ReadString('\n')
|
||||
_ = reqLine // we don't care about the request line
|
||||
|
||||
// Read headers.
|
||||
tp := textproto.NewReader(reader)
|
||||
headers := make(textproto.MIMEHeader)
|
||||
for {
|
||||
line, err := tp.ReadLine()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if line == "" {
|
||||
break // blank line: end of headers
|
||||
}
|
||||
k, v, _ := strings.Cut(line, ": ")
|
||||
headers.Set(k, v)
|
||||
}
|
||||
|
||||
// Drain the request body so the client can finish sending it.
|
||||
// Without this, closing the conn while the client is mid-write causes
|
||||
// a broken-pipe error on the client side (request writer goroutine hangs).
|
||||
if cl := headers.Get("Content-Length"); cl != "" {
|
||||
var n int
|
||||
fmt.Sscanf(cl, "%d", &n)
|
||||
io.Copy(io.Discard, io.LimitReader(conn, int64(n))) //nolint:errcheck
|
||||
}
|
||||
|
||||
// Write raw HTTP response directly to the raw conn.
|
||||
// This bypasses httptest's buffered writer entirely.
|
||||
statusText := http.StatusText(statusCode)
|
||||
resp := fmt.Sprintf(
|
||||
"HTTP/1.1 %d %s\r\nContent-Type: application/json\r\nContent-Length: %d\r\n\r\n%s",
|
||||
statusCode, statusText, declaredLength, actualBody,
|
||||
)
|
||||
conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
|
||||
conn.Write([]byte(resp)) //nolint:errcheck
|
||||
|
||||
// Brief pause so the client kernel TCP buffer drains before we close.
|
||||
// The client reads the response headers + partial body in this window.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
conn.Close() // sends FIN; client's Read() returns io.EOF
|
||||
close(done)
|
||||
}()
|
||||
return url, func() { wg.Wait() }, func() {
|
||||
conn, err := net.DialTimeout("tcp", ln.Addr().String(), 100*time.Millisecond)
|
||||
if err == nil {
|
||||
conn.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess
|
||||
@ -188,19 +235,19 @@ func mockAgentWithPartialBody(t *testing.T, statusCode int, declaredLength int,
|
||||
// Here we verify the ledger row landed at 'completed' with the response body
|
||||
// as result_preview.
|
||||
func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) {
|
||||
allowLoopbackForTest(t) // raw TCP mock uses 127.0.0.1; SSRF guard must permit it
|
||||
conn := integrationDB(t)
|
||||
cleanup := setupIntegrationFixtures(t, conn)
|
||||
defer cleanup()
|
||||
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
|
||||
|
||||
// Mock server: 200 OK, Content-Length:100 declared, only 74 bytes of body sent,
|
||||
// then connection closed. httptest.Server + Hijack ensures the HTTP parser has
|
||||
// consumed the headers before we close, and buf.Flush() ensures data is in the
|
||||
// kernel TCP send buffer before Close().
|
||||
ts := mockAgentWithPartialBody(t, 200, 100, `{"result":{"parts":[{"text":"work completed successfully"}]}}`)
|
||||
defer ts.Close()
|
||||
// Raw TCP mock: Content-Length:100 declared, 74 bytes sent, then close.
|
||||
// The server drains the request body before writing the response so the
|
||||
// client doesn't get a broken-pipe on its request-body write.
|
||||
url, serverWait, serverCleanup := rawTCPMockServer(t, 200, 100, `{"result":{"parts":[{"text":"work completed successfully"}]}}`)
|
||||
defer serverCleanup()
|
||||
|
||||
mr := setupIntegrationRedis(t, ts.URL)
|
||||
mr := setupIntegrationRedis(t, url)
|
||||
defer mr.Close()
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
@ -219,18 +266,13 @@ func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSucce
|
||||
},
|
||||
})
|
||||
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
|
||||
|
||||
// Wait for goroutine + DB writes to settle.
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
serverWait()
|
||||
|
||||
status, preview, errDet := readDelegationRow(t, conn)
|
||||
if status != "completed" {
|
||||
t.Errorf("status: want completed, got %q", status)
|
||||
}
|
||||
if preview == "" {
|
||||
// The response body should land as result_preview.
|
||||
// Note: the partial body "work completed successfully" is what was read
|
||||
// before the connection dropped.
|
||||
t.Logf("result_preview (partial body expected): %q", preview)
|
||||
}
|
||||
if errDet != "" {
|
||||
@ -243,18 +285,18 @@ func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSucce
|
||||
// not success. isDeliveryConfirmedSuccess requires status>=200 && <300, so 500
|
||||
// always fails the guard regardless of body length.
|
||||
func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) {
|
||||
allowLoopbackForTest(t) // raw TCP mock uses 127.0.0.1; SSRF guard must permit it
|
||||
conn := integrationDB(t)
|
||||
cleanup := setupIntegrationFixtures(t, conn)
|
||||
defer cleanup()
|
||||
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
|
||||
|
||||
// Mock server: 500, Content-Length:100 declared, ~24 bytes of body sent.
|
||||
ts := mockAgentWithPartialBody(t, 500, 100, `{"error":"agent crashed"}`)
|
||||
defer ts.Close()
|
||||
url, serverWait, serverCleanup := rawTCPMockServer(t, 500, 100, `{"error":"agent crashed"}`)
|
||||
defer serverCleanup()
|
||||
|
||||
mr := setupTestRedis(t)
|
||||
defer mr.Close()
|
||||
db.CacheURL(context.Background(), testTargetID, ts.URL)
|
||||
db.CacheURL(context.Background(), testTargetID, url)
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
@ -270,7 +312,7 @@ func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing
|
||||
},
|
||||
})
|
||||
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
serverWait()
|
||||
|
||||
status, _, errDet := readDelegationRow(t, conn)
|
||||
if status != "failed" {
|
||||
@ -286,22 +328,18 @@ func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing
|
||||
// routes to failure. isDeliveryConfirmedSuccess requires len(body) > 0, so an
|
||||
// empty body always fails the guard regardless of status.
|
||||
func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) {
|
||||
allowLoopbackForTest(t) // raw TCP mock uses 127.0.0.1; SSRF guard must permit it
|
||||
conn := integrationDB(t)
|
||||
cleanup := setupIntegrationFixtures(t, conn)
|
||||
defer cleanup()
|
||||
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
|
||||
|
||||
// Mock server: 200, Content-Length:0 declared, no body sent, then close.
|
||||
// mockAgentWithPartialBody with empty actualBody writes no body but still
|
||||
// sets Content-Length: 0 in the headers, flushes them, then hijacks and
|
||||
// closes the connection. The HTTP client sees headers + empty body, then
|
||||
// a connection-drop → Read() error.
|
||||
ts := mockAgentWithPartialBody(t, 200, 0, "")
|
||||
defer ts.Close()
|
||||
url, serverWait, serverCleanup := rawTCPMockServer(t, 200, 0, "")
|
||||
defer serverCleanup()
|
||||
|
||||
mr := setupTestRedis(t)
|
||||
defer mr.Close()
|
||||
db.CacheURL(context.Background(), testTargetID, ts.URL)
|
||||
db.CacheURL(context.Background(), testTargetID, url)
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
@ -317,7 +355,7 @@ func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *test
|
||||
},
|
||||
})
|
||||
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
serverWait()
|
||||
|
||||
status, _, errDet := readDelegationRow(t, conn)
|
||||
if status != "failed" {
|
||||
@ -333,20 +371,16 @@ func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *test
|
||||
// This was always the behavior; the integration test confirms executeDelegation
|
||||
// correctly records the ledger entry on the happy path.
|
||||
func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) {
|
||||
allowLoopbackForTest(t) // raw TCP mock uses 127.0.0.1; SSRF guard must permit it
|
||||
conn := integrationDB(t)
|
||||
cleanup := setupIntegrationFixtures(t, conn)
|
||||
defer cleanup()
|
||||
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
|
||||
|
||||
// Use httptest.Server for the clean success case (no connection drop).
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(`{"result":{"parts":[{"text":"all good"}]}}`))
|
||||
}))
|
||||
defer ts.Close()
|
||||
url, serverWait, serverCleanup := rawTCPMockServer(t, 200, 36, `{"result":{"parts":[{"text":"all good"}]}}`)
|
||||
defer serverCleanup()
|
||||
|
||||
mr := setupIntegrationRedis(t, ts.URL)
|
||||
mr := setupIntegrationRedis(t, url)
|
||||
defer mr.Close()
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
@ -363,14 +397,13 @@ func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T
|
||||
},
|
||||
})
|
||||
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
serverWait()
|
||||
|
||||
status, preview, errDet := readDelegationRow(t, conn)
|
||||
if status != "completed" {
|
||||
t.Errorf("status: want completed, got %q", status)
|
||||
}
|
||||
if preview == "" {
|
||||
// result_preview should carry the response body
|
||||
t.Logf("result_preview: %q", preview)
|
||||
}
|
||||
if errDet != "" {
|
||||
@ -407,7 +440,7 @@ func TestIntegration_ExecuteDelegation_RedisDown_FallsBackToDB(t *testing.T) {
|
||||
})
|
||||
// No URL available — delegation should fail gracefully (target unreachable).
|
||||
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
// No serverWait() needed — the server was never started.
|
||||
|
||||
status, _, errDet := readDelegationRow(t, conn)
|
||||
if status != "failed" {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user