Merge pull request #1725 from Molecule-AI/fix/platform-go-ci-tests

fix(handlers): unblock Platform (Go) CI — sqlmock budget-check + test loopback
This commit is contained in:
Hongming Wang 2026-04-22 20:03:06 -07:00 committed by GitHub
commit 64e4c7b661
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 88 additions and 14 deletions

View File

@ -22,11 +22,13 @@ import (
func TestProxyA2A_InvalidJSON(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
// Cache a URL so the handler doesn't fall back to DB
mr.Set(fmt.Sprintf("ws:%s:url", "ws-badjson"), "http://localhost:9999")
expectBudgetCheck(mock, "ws-badjson")
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@ -59,6 +61,7 @@ func TestProxyA2A_InvalidJSON(t *testing.T) {
func TestProxyA2A_AlreadyWrappedJSONRPC(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@ -73,6 +76,7 @@ func TestProxyA2A_AlreadyWrappedJSONRPC(t *testing.T) {
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", "ws-wrapped"), agentServer.URL)
expectBudgetCheck(mock, "ws-wrapped")
// Expect async activity log
mock.ExpectExec("INSERT INTO activity_logs").
@ -114,6 +118,7 @@ func TestProxyA2A_AlreadyWrappedJSONRPC(t *testing.T) {
func TestProxyA2A_DBLookupFallback(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t) // empty Redis — no cached URL
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@ -124,6 +129,9 @@ func TestProxyA2A_DBLookupFallback(t *testing.T) {
}))
defer agentServer.Close()
// Budget check runs first (before URL resolution)
expectBudgetCheck(mock, "ws-db-fallback")
// Redis miss → DB lookup → returns URL
mock.ExpectQuery("SELECT url, status FROM workspaces WHERE id =").
WithArgs("ws-db-fallback").
@ -162,6 +170,9 @@ func TestProxyA2A_DBLookupError(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
// Budget check runs first (before URL resolution)
expectBudgetCheck(mock, "ws-dberr")
// Redis miss → DB lookup → error
mock.ExpectQuery("SELECT url, status FROM workspaces WHERE id =").
WithArgs("ws-dberr").
@ -191,6 +202,7 @@ func TestProxyA2A_DBLookupError(t *testing.T) {
func TestProxyA2A_AgentReturnsError(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@ -202,6 +214,7 @@ func TestProxyA2A_AgentReturnsError(t *testing.T) {
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", "ws-agent-err"), agentServer.URL)
expectBudgetCheck(mock, "ws-agent-err")
// Expect async activity log (with "error" status since agent returned 500)
mock.ExpectExec("INSERT INTO activity_logs").
@ -234,6 +247,7 @@ func TestProxyA2A_AgentReturnsError(t *testing.T) {
func TestProxyA2A_MessageIDInjected(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@ -246,6 +260,7 @@ func TestProxyA2A_MessageIDInjected(t *testing.T) {
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", "ws-msgid"), agentServer.URL)
expectBudgetCheck(mock, "ws-msgid")
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
@ -284,6 +299,7 @@ func TestProxyA2A_MessageIDInjected(t *testing.T) {
func TestProxyA2A_CallerIDPropagated(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@ -303,6 +319,8 @@ func TestProxyA2A_CallerIDPropagated(t *testing.T) {
WithArgs("ws-target").
WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow("ws-target", "ws-parent"))
expectBudgetCheck(mock, "ws-target")
// Expect activity log with source_id set
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
@ -377,6 +395,7 @@ func TestProxyA2A_AccessDenied_DifferentParents(t *testing.T) {
func TestProxyA2A_AllowedSelf_SkipsAccessCheck(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@ -386,6 +405,7 @@ func TestProxyA2A_AllowedSelf_SkipsAccessCheck(t *testing.T) {
}))
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", "ws-self"), agentServer.URL)
expectBudgetCheck(mock, "ws-self")
mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
@ -659,6 +679,7 @@ func TestProxyA2AError_BusyShape(t *testing.T) {
func TestProxyA2A_BodyReadFailure_DeliveryConfirmed(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@ -687,6 +708,7 @@ func TestProxyA2A_BodyReadFailure_DeliveryConfirmed(t *testing.T) {
wsID := "ws-bodyreadfail"
mr.Set(fmt.Sprintf("ws:%s:url", wsID), agentServer.URL)
expectBudgetCheck(mock, wsID)
// Expect async activity log INSERT (logA2ASuccess is called because
// delivery_confirmed is true and the handler detected a 2xx status).
@ -941,14 +963,18 @@ func TestNormalizeA2APayload_MissingMethodReturnsEmpty(t *testing.T) {
func TestResolveAgentURL_CacheHit(t *testing.T) {
setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mr.Set("ws:ws-cached:url", "http://cached.example/a2a")
// Use loopback IP (unlocked by allowLoopbackForTest) so isSafeURL passes —
// cached.example does not resolve and would trip the DNS guard.
cached := "http://127.0.0.1:9999/a2a"
mr.Set("ws:ws-cached:url", cached)
url, perr := handler.resolveAgentURL(context.Background(), "ws-cached")
if perr != nil {
t.Fatalf("unexpected error: %+v", perr)
}
if url != "http://cached.example/a2a" {
if url != cached {
t.Errorf("got %q, want cached URL", url)
}
}
@ -956,21 +982,24 @@ func TestResolveAgentURL_CacheHit(t *testing.T) {
func TestResolveAgentURL_CacheMissDBHit(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
// Use loopback IP (unlocked by allowLoopbackForTest) so isSafeURL passes.
dbURL := "http://127.0.0.1:9998"
mock.ExpectQuery("SELECT url, status FROM workspaces WHERE id =").
WithArgs("ws-dbhit").
WillReturnRows(sqlmock.NewRows([]string{"url", "status"}).AddRow("http://dbhit.example", "online"))
WillReturnRows(sqlmock.NewRows([]string{"url", "status"}).AddRow(dbURL, "online"))
url, perr := handler.resolveAgentURL(context.Background(), "ws-dbhit")
if perr != nil {
t.Fatalf("unexpected error: %+v", perr)
}
if url != "http://dbhit.example" {
t.Errorf("got %q, want http://dbhit.example", url)
if url != dbURL {
t.Errorf("got %q, want %q", url, dbURL)
}
// Verify cached now
if v, err := mr.Get("ws:ws-dbhit:url"); err != nil || v != "http://dbhit.example" {
if v, err := mr.Get("ws:ws-dbhit:url"); err != nil || v != dbURL {
t.Errorf("expected Redis cache populated; got v=%q err=%v", v, err)
}
}
@ -1020,6 +1049,7 @@ func TestResolveAgentURL_DockerRewrite(t *testing.T) {
// covered by TestResolveAgentURL_DockerRewrite_NilProvisionerNoRewrite.
mr := setupTestRedis(t)
setupTestDB(t)
allowLoopbackForTest(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mr.Set("ws:ws-dock:url", "http://127.0.0.1:55555")

View File

@ -182,9 +182,9 @@ func TestAdminMemories_Import_Success(t *testing.T) {
WithArgs("ws-uuid-1", sqlmock.AnyArg(), "LOCAL").
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(false))
// Insert succeeds.
// Insert succeeds. Handler uses 4-arg INSERT when created_at is absent.
mock.ExpectExec("INSERT INTO agent_memories").
WithArgs("ws-uuid-1", sqlmock.AnyArg(), "LOCAL", "general", sqlmock.AnyArg()).
WithArgs("ws-uuid-1", sqlmock.AnyArg(), "LOCAL", "general").
WillReturnResult(sqlmock.NewResult(1, 1))
w := adminPost(t, h, []map[string]interface{}{
@ -326,9 +326,10 @@ func TestAdminMemories_Import_RedactsSecretsBeforeDedup(t *testing.T) {
WithArgs("ws-uuid-1", redacted, "LOCAL").
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(false))
// Insert — receives the redacted content (not raw).
// Insert — receives the redacted content (not raw). Handler uses the
// 4-arg INSERT when created_at is absent from the payload.
mock.ExpectExec("INSERT INTO agent_memories").
WithArgs("ws-uuid-1", redacted, "LOCAL", "general", sqlmock.AnyArg()).
WithArgs("ws-uuid-1", redacted, "LOCAL", "general").
WillReturnResult(sqlmock.NewResult(1, 1))
w := adminPost(t, h, []map[string]interface{}{

View File

@ -399,11 +399,13 @@ func TestProxyA2A_WorkspaceNoURL(t *testing.T) {
func TestProxyA2A_AgentUnreachable(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
// Point to an unreachable address
mr.Set(fmt.Sprintf("ws:%s:url", "ws-dead"), "http://127.0.0.1:1")
expectBudgetCheck(mock, "ws-dead")
// Expect workspace name query for error activity log
mock.ExpectQuery("SELECT name FROM workspaces WHERE id =").

View File

@ -55,6 +55,34 @@ func newTestBroadcaster() *events.Broadcaster {
return events.NewBroadcaster(hub)
}
// allowLoopbackForTest flips the ssrf.go testAllowLoopback escape hatch
// for the duration of the test, so httptest.NewServer's loopback URLs
// don't trip the SSRF guard. The 169.254 metadata, RFC-1918, TEST-NET,
// CGNAT, and link-local guards stay active — only 127.0.0.0/8 and ::1
// are relaxed. Always paired with t.Cleanup to restore; multiple
// parallel tests won't race because Go test flips it sequentially per
// test unless t.Parallel() is used, and these tests don't parallelize.
func allowLoopbackForTest(t *testing.T) {
t.Helper()
prev := testAllowLoopback
testAllowLoopback = true
t.Cleanup(func() { testAllowLoopback = prev })
}
// expectBudgetCheck adds the sqlmock expectation for the budget-check
// query that ProxyA2A runs before forwarding. checkWorkspaceBudget
// fails-open on sql.ErrNoRows, so we return a deliberately-empty
// result — budget_limit NULL + monthly_spend 0 means "no limit".
// All a2a_proxy_test.go tests that run ProxyA2A (not just
// dispatchA2A unit tests) need this expectation; it was added to the
// handler in the 2026-04-18 restructure but the tests never caught up,
// leaving Platform (Go) CI red for weeks.
func expectBudgetCheck(mock sqlmock.Sqlmock, workspaceID string) {
mock.ExpectQuery(`SELECT budget_limit, COALESCE\(monthly_spend, 0\) FROM workspaces WHERE id = \$1`).
WithArgs(workspaceID).
WillReturnRows(sqlmock.NewRows([]string{"budget_limit", "monthly_spend"}))
}
// ---------- TestRegisterHandler ----------
func TestRegisterHandler(t *testing.T) {
@ -385,6 +413,7 @@ func TestWorkspaceList(t *testing.T) {
func TestProxyA2A_JSONRPCWrapping(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", "/tmp/configs")
@ -400,6 +429,7 @@ func TestProxyA2A_JSONRPCWrapping(t *testing.T) {
// Cache the agent URL in Redis so the handler finds it
mr.Set(fmt.Sprintf("ws:%s:url", "ws-proxy"), agentServer.URL)
expectBudgetCheck(mock, "ws-proxy")
// Expect async activity log INSERT from the LogActivity goroutine
mock.ExpectExec("INSERT INTO activity_logs").

View File

@ -30,7 +30,7 @@ func isSafeURL(rawURL string) error {
return fmt.Errorf("empty hostname")
}
if ip := net.ParseIP(host); ip != nil {
if ip.IsLoopback() || ip.IsUnspecified() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() || ip.IsInterfaceLocalMulticast() {
if (ip.IsLoopback() && !testAllowLoopback) || ip.IsUnspecified() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() || ip.IsInterfaceLocalMulticast() {
return fmt.Errorf("forbidden loopback/unspecified/link-local IP: %s", ip)
}
if isPrivateOrMetadataIP(ip) {
@ -50,7 +50,7 @@ func isSafeURL(rawURL string) error {
if ip == nil {
continue
}
if ip.IsLoopback() || ip.IsUnspecified() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() || ip.IsInterfaceLocalMulticast() {
if (ip.IsLoopback() && !testAllowLoopback) || ip.IsUnspecified() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() || ip.IsInterfaceLocalMulticast() {
return fmt.Errorf("hostname %s resolves to forbidden link-local/loopback IP: %s", host, ip)
}
if isPrivateOrMetadataIP(ip) {
@ -60,6 +60,16 @@ func isSafeURL(rawURL string) error {
return nil
}
// testAllowLoopback is a test-only escape hatch. When true, isSafeURL
// accepts 127.0.0.0/8 and ::1 so unit tests that stub workspace URLs
// with httptest.NewServer (which binds to loopback) can reach their
// own mock backends. Flipped via allowLoopbackForTest(t) in tests —
// never set in production code paths.
//
// The 169.254 metadata, RFC-1918, TEST-NET, CGNAT, and link-local
// guards are NOT relaxed by this flag — only loopback.
var testAllowLoopback = false
// isPrivateOrMetadataIP returns true for IPs that must not be reached via A2A.
//
// Always blocked (both modes):
@ -106,8 +116,9 @@ func isPrivateOrMetadataIP(ip net.IP) bool {
}
// IPv6 path — .To4() was nil so this is a real v6 address.
// ::1 (loopback) — treat as blocked here too for defense-in-depth.
if ip.IsLoopback() {
// ::1 (loopback) — treat as blocked here too for defense-in-depth,
// unless tests have opted into loopback via testAllowLoopback.
if ip.IsLoopback() && !testAllowLoopback {
return true
}
// Link-local fe80::/10 — always blocked.