molecule-core/workspace-server/internal/handlers/a2a_proxy.go
rabbitblood ce52b67d62 fix(build): add missing fmt import to a2a_proxy.go
Build broken on main since d86b8fe — a2a_proxy.go uses fmt.Errorf()
(8 call sites) but the import was dropped during an isSafeURL refactor
merge. CI fails with "undefined: fmt" at lines 743-775.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-21 11:17:54 -07:00

875 lines
33 KiB
Go

package handlers
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
// platformInDocker caches whether THIS process is running inside a
// Docker container. The a2a proxy uses this to decide whether stored
// agent URLs like "http://127.0.0.1:<ephemeral>" need to be rewritten
// to the Docker-DNS form "http://ws-<id>:8000". When the platform is
// on the host, 127.0.0.1 IS the host and the ephemeral-port URL works
// as-is; rewriting to container DNS would then break (host can't
// resolve Docker bridge hostnames).
//
// Detection: /.dockerenv is the canonical marker inside the default
// Docker runtime. MOLECULE_IN_DOCKER is an explicit override for
// environments where /.dockerenv is absent (Podman, custom runtimes).
// Accepts any value strconv.ParseBool recognises — 1, 0, t, f, T, F,
// true, false, TRUE, FALSE, True, False. Anything else (including
// "yes"/"on") is treated as unset and falls through to the /.dockerenv
// check.
//
// Exposed as a var (not a const) so tests can toggle it via
// setPlatformInDockerForTest without fiddling with real filesystem
// markers or env vars. Production callers never mutate it.
var platformInDocker = detectPlatformInDocker()

// detectPlatformInDocker reports whether this process appears to run inside a
// Docker container. A MOLECULE_IN_DOCKER value that strconv.ParseBool accepts
// decides the answer outright; any other value (or none) falls back to
// checking whether /.dockerenv can be stat'ed.
func detectPlatformInDocker() bool {
	override := os.Getenv("MOLECULE_IN_DOCKER")
	if override != "" {
		parsed, parseErr := strconv.ParseBool(override)
		if parseErr == nil {
			return parsed
		}
	}
	_, statErr := os.Stat("/.dockerenv")
	return statErr == nil
}
// setPlatformInDockerForTest swaps platformInDocker to v and hands back a
// closure that reinstates the value seen at call time. Intended for
// `defer setPlatformInDockerForTest(x)()` in *_test.go files only.
func setPlatformInDockerForTest(v bool) func() {
	saved := platformInDocker
	restore := func() {
		platformInDocker = saved
	}
	platformInDocker = v
	return restore
}
// maxProxyRequestBody is the byte limit applied when reading an incoming A2A
// proxy request body: 1 << 20 bytes (1 MiB).
const maxProxyRequestBody = 1 << 20
// systemCallerPrefixes lists the caller-ID prefixes that mark a non-workspace
// internal caller (webhooks, system services, tests, channels). Callers
// matching one of them bypass workspace access control.
var systemCallerPrefixes = []string{"webhook:", "system:", "test:", "channel:"}

// isSystemCaller reports whether callerID starts with any entry of
// systemCallerPrefixes.
func isSystemCaller(callerID string) bool {
	matched := false
	for i := 0; i < len(systemCallerPrefixes) && !matched; i++ {
		matched = strings.HasPrefix(callerID, systemCallerPrefixes[i])
	}
	return matched
}
// maxProxyResponseBody is the byte limit applied when reading an agent's A2A
// response body: 10 << 20 bytes (10 MiB).
const maxProxyResponseBody = 10 << 20
// a2aClient is the shared HTTP client used to forward A2A requests to
// workspace agents.
//
// NOTE(review): the client sets Timeout to 60s. http.Client.Timeout limits the
// whole exchange (connect, redirects, reading the response body), so it
// presumably fires before the longer per-request context deadlines applied in
// dispatchA2A (canvas = 5 min, agent-to-agent = 30 min) — confirm that a 60s
// effective ceiling is intended.
var a2aClient = &http.Client{
Timeout: 60 * time.Second, // client-wide cap; also applies when the request context has no deadline
}
// proxyA2AError describes a failed proxy attempt in a form the HTTP handler
// can write straight back to the client. It satisfies error through its
// Error method.
type proxyA2AError struct {
// Status is the HTTP status code to respond with.
Status int
// Response is the JSON body to respond with; Error() reads its "error" key.
Response gin.H
// Optional response headers (e.g. Retry-After on 503-busy). Kept separate
// from Response so the handler can set real HTTP headers, not just JSON.
Headers map[string]string
}
// busyRetryAfterSeconds is the Retry-After hint, in seconds, returned with
// 503-busy responses when an upstream workspace agent is overloaded
// (single-threaded mid-synthesis). handleA2ADispatchError sends it both as the
// Retry-After header and as the "retry_after" JSON field. Chosen to be long
// enough for typical PM synthesis work to complete but short enough that a
// caller's retry loop won't stall coordination. See issue #110.
const busyRetryAfterSeconds = 30
// isUpstreamBusyError decides whether an http.Client.Do failure looks like a
// transient "agent is busy" condition (deadline hit, EOF, connection reset)
// rather than a fatal network error, so the caller can answer with a
// retryable response.
//
// Typed sentinels are checked first; because the stdlib HTTP client sometimes
// surfaces these conditions only as text inside a url.Error, a substring
// match on the error message is used as a fallback. A nil error is never
// busy.
func isUpstreamBusyError(err error) bool {
	if err == nil {
		return false
	}
	for _, sentinel := range []error{context.DeadlineExceeded, io.EOF, io.ErrUnexpectedEOF} {
		if errors.Is(err, sentinel) {
			return true
		}
	}
	text := err.Error()
	for _, marker := range []string{"context deadline exceeded", "EOF", "connection reset"} {
		if strings.Contains(text, marker) {
			return true
		}
	}
	return false
}
// Error implements the error interface. It returns the non-empty string stored
// under Response["error"], or the generic "proxy a2a error" when the receiver
// is nil, has no Response, or carries no usable message.
func (e *proxyA2AError) Error() string {
	const fallback = "proxy a2a error"
	if e == nil || e.Response == nil {
		return fallback
	}
	text, isString := e.Response["error"].(string)
	if !isString || text == "" {
		return fallback
	}
	return text
}
// ProxyA2ARequest exposes proxyA2ARequest to callers outside the HTTP handler
// path (the cron scheduler and other internal code that sends A2A messages
// programmatically). It returns the agent's status code and response body, or
// the proxy failure as a plain error.
func (h *WorkspaceHandler) ProxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error) {
	code, payload, failure := h.proxyA2ARequest(ctx, workspaceID, body, callerID, logActivity)
	if failure == nil {
		// Return a literal nil: handing back the nil *proxyA2AError would
		// produce a non-nil error interface value.
		return code, payload, nil
	}
	return code, payload, failure
}
// ProxyA2A handles POST /workspaces/:id/a2a.
//
// It proxies A2A JSON-RPC requests from the canvas (or from another
// workspace) to the target workspace agent, avoiding CORS and Docker network
// issues. The handler:
//  1. applies an optional caller-supplied X-Timeout to the request context,
//  2. reads the body, rejecting anything larger than maxProxyRequestBody with
//     413 instead of silently truncating it,
//  3. rejects forged system-caller IDs (403) and validates the caller's auth
//     token when the caller is another workspace,
//  4. delegates to proxyA2ARequest and relays either the agent response or
//     the proxyA2AError (status, JSON body and headers).
func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
	workspaceID := c.Param("id")
	ctx := c.Request.Context()

	// X-Timeout: caller-specified timeout in whole seconds. Only positive
	// integers take effect; zero, negative or unparsable values are ignored
	// and the context is left without a deadline, in which case dispatchA2A
	// applies its defaults (canvas 5 min / agent 30 min).
	if tStr := c.GetHeader("X-Timeout"); tStr != "" {
		// Largest second count that still fits in a time.Duration. The header
		// is untrusted, and an unclamped value would overflow the
		// multiplication below and produce a bogus (possibly negative) timeout.
		const maxTimeoutSeconds = int64(1<<63-1) / int64(time.Second)
		if tSec, err := strconv.ParseInt(tStr, 10, 64); err == nil && tSec > 0 {
			if tSec > maxTimeoutSeconds {
				tSec = maxTimeoutSeconds
			}
			var cancel context.CancelFunc
			ctx, cancel = context.WithTimeout(ctx, time.Duration(tSec)*time.Second)
			defer cancel()
		}
	}

	// Read one byte past the cap so an oversized body can be told apart from
	// one that is exactly at the limit.
	body, err := io.ReadAll(io.LimitReader(c.Request.Body, maxProxyRequestBody+1))
	if err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "failed to read request body"})
		return
	}
	if len(body) > maxProxyRequestBody {
		c.JSON(http.StatusRequestEntityTooLarge, gin.H{"error": "request body too large"})
		return
	}

	callerID := c.GetHeader("X-Workspace-ID")

	// #761 SECURITY: reject requests where the client-supplied X-Workspace-ID
	// contains a system-caller prefix. isSystemCaller() bypasses both token
	// validation and CanCommunicate. On the public /a2a endpoint, system-caller
	// semantics only apply to callerIDs set by trusted server-side code
	// (ProxyA2ARequest), never to HTTP header values. Legitimate system callers
	// (webhooks, scheduler, restart_context) call proxyA2ARequest directly and
	// never go through this HTTP handler.
	if isSystemCaller(callerID) {
		log.Printf("security: system-caller prefix forge attempt — remote=%q header=%q",
			c.ClientIP(), callerID)
		c.JSON(http.StatusForbidden, gin.H{"error": "invalid caller ID"})
		return
	}

	// Phase 30.5 — validate the caller's auth token when the caller IS a
	// workspace (not canvas or a system caller). Canvas requests have no
	// X-Workspace-ID so they bypass this check (the existing access-control
	// layer already trusts them).
	//
	// The bind is strict: the token must match `callerID`, not `workspaceID`
	// (the target). A compromised token from workspace A must never
	// authenticate calls from A pretending to be B.
	if callerID != "" && callerID != workspaceID {
		if err := validateCallerToken(ctx, c, callerID); err != nil {
			return // response already written with 401
		}
	}

	status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, body, callerID, true)
	if proxyErr != nil {
		for k, v := range proxyErr.Headers {
			c.Header(k, v)
		}
		c.JSON(proxyErr.Status, proxyErr.Response)
		return
	}
	c.Data(status, "application/json", respBody)
}
// checkWorkspaceBudget enforces the workspace's monthly spend ceiling. It
// returns a 402 proxyA2AError when budget_limit is set and monthly_spend has
// reached or passed it, and nil otherwise.
//
// The check fails open: a missing row or any query error yields nil (query
// errors other than sql.ErrNoRows are logged) so a budget lookup problem never
// blocks A2A traffic.
func (h *WorkspaceHandler) checkWorkspaceBudget(ctx context.Context, workspaceID string) *proxyA2AError {
	var (
		limit sql.NullInt64
		spent int64
	)
	row := db.DB.QueryRowContext(ctx,
		`SELECT budget_limit, COALESCE(monthly_spend, 0) FROM workspaces WHERE id = $1`,
		workspaceID,
	)
	switch scanErr := row.Scan(&limit, &spent); {
	case scanErr == sql.ErrNoRows:
		return nil // fail-open, nothing worth logging
	case scanErr != nil:
		log.Printf("ProxyA2A: budget check failed for %s: %v", workspaceID, scanErr)
		return nil // fail-open
	}

	// No limit configured, or still under it.
	if !limit.Valid || spent < limit.Int64 {
		return nil
	}

	log.Printf("ProxyA2A: budget exceeded for %s (spend=%d limit=%d)", workspaceID, spent, limit.Int64)
	return &proxyA2AError{
		Status:   http.StatusPaymentRequired,
		Response: gin.H{"error": "workspace budget limit exceeded"},
	}
}
// proxyA2ARequest forwards one A2A request to the target workspace agent and
// returns the agent's HTTP status code and raw response body.
//
// Steps, in order: access control (registry.CanCommunicate), budget check,
// agent URL resolution, payload normalisation, dispatch, response read (capped
// at maxProxyResponseBody), optional activity logging, and detached
// token-usage accounting. On any failure it returns (0, nil, *proxyA2AError)
// carrying the HTTP status and JSON body to send back to the caller.
func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, *proxyA2AError) {
// Access control: workspace-to-workspace requests must pass CanCommunicate check.
// Canvas requests (callerID == "") and system callers (webhook:*, system:*,
// test:*, channel:*) are trusted. Self-calls (callerID == workspaceID) are
// always allowed.
if callerID != "" && callerID != workspaceID && !isSystemCaller(callerID) {
if !registry.CanCommunicate(callerID, workspaceID) {
log.Printf("ProxyA2A: access denied %s → %s", callerID, workspaceID)
return 0, nil, &proxyA2AError{
Status: http.StatusForbidden,
Response: gin.H{"error": "access denied: workspaces cannot communicate per hierarchy rules"},
}
}
}
// Budget enforcement: reject A2A calls when the workspace has exceeded its
// monthly spend ceiling. Checked after access control so unauthorized calls
// are rejected first (403 takes precedence over the 402 budget rejection).
// Fail-open on DB errors so a budget check failure never blocks legitimate
// traffic.
if proxyErr := h.checkWorkspaceBudget(ctx, workspaceID); proxyErr != nil {
return 0, nil, proxyErr
}
agentURL, proxyErr := h.resolveAgentURL(ctx, workspaceID)
if proxyErr != nil {
return 0, nil, proxyErr
}
normalizedBody, a2aMethod, proxyErr := normalizeA2APayload(body)
if proxyErr != nil {
return 0, nil, proxyErr
}
body = normalizedBody
startTime := time.Now()
resp, cancelFwd, err := h.dispatchA2A(ctx, agentURL, body, callerID)
if cancelFwd != nil {
// Deferred so the forward context stays alive while the body is read below.
defer cancelFwd()
}
// Measured up to the point response headers arrive (or dispatch fails);
// the time spent reading the body is not included.
durationMs := int(time.Since(startTime).Milliseconds())
if err != nil {
return h.handleA2ADispatchError(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, logActivity)
}
defer func() { _ = resp.Body.Close() }()
// Read agent response (capped at 10MB).
// #689: Do() succeeded, which means the target received the request and sent
// back response headers — delivery is confirmed. If the body then can't be
// fully read (connection drop, timeout mid-stream), surface
// delivery_confirmed so callers can distinguish "not delivered" from
// "delivered, but response body lost". When delivery is confirmed,
// log the activity as successful (delivery happened) rather than leaving
// a false "failed" entry in the audit trail.
respBody, readErr := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
if readErr != nil {
deliveryConfirmed := resp.StatusCode >= 200 && resp.StatusCode < 400
log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v",
workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr)
if logActivity && deliveryConfirmed {
// respBody holds only the bytes read before the failure.
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
}
return 0, nil, &proxyA2AError{
Status: http.StatusBadGateway,
Response: gin.H{
"error": "failed to read agent response",
"delivery_confirmed": deliveryConfirmed,
},
}
}
if logActivity {
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
}
// Track LLM token usage for cost transparency (#593).
// Fires in a detached goroutine so token accounting never adds latency
// to the critical A2A path.
go extractAndUpsertTokenUsage(context.WithoutCancel(ctx), workspaceID, respBody)
return resp.StatusCode, respBody, nil
}
// resolveAgentURL produces the URL to forward an A2A request to for the given
// workspace.
//
// Lookup order: the URL cache (db.GetCachedURL) first; on a cache error the
// workspaces table is queried and a non-empty URL is written back to the
// cache. Failure mapping for the DB path:
//   - no row                     → 404 "workspace not found"
//   - query error                → 500 "lookup failed"
//   - empty URL, hibernated      → 503 + Retry-After, async restart triggered
//   - empty URL, any other state → 503 "workspace has no URL"
//
// When the platform itself runs in Docker and a provisioner is configured, a
// "http://127.0.0.1:" URL is swapped for provisioner.InternalURL, since that
// loopback address would point at the platform container itself. The final
// URL must pass isSafeURL or a 502 is returned.
//
// NOTE(review): isSafeURL rejects loopback literals, so a URL that is not
// rewritten (platform on the host) and still starts with http://127.0.0.1:
// appears to be refused here — confirm against isSafeURL's intended scope.
func (h *WorkspaceHandler) resolveAgentURL(ctx context.Context, workspaceID string) (string, *proxyA2AError) {
	target, cacheErr := db.GetCachedURL(ctx, workspaceID)
	if cacheErr != nil {
		var (
			storedURL sql.NullString
			wsStatus  string
		)
		lookupErr := db.DB.QueryRowContext(ctx,
			`SELECT url, status FROM workspaces WHERE id = $1`, workspaceID,
		).Scan(&storedURL, &wsStatus)
		switch {
		case lookupErr == sql.ErrNoRows:
			return "", &proxyA2AError{
				Status:   http.StatusNotFound,
				Response: gin.H{"error": "workspace not found"},
			}
		case lookupErr != nil:
			log.Printf("ProxyA2A lookup error: %v", lookupErr)
			return "", &proxyA2AError{
				Status:   http.StatusInternalServerError,
				Response: gin.H{"error": "lookup failed"},
			}
		}

		if !storedURL.Valid || storedURL.String == "" {
			if wsStatus != "hibernated" {
				return "", &proxyA2AError{
					Status:   http.StatusServiceUnavailable,
					Response: gin.H{"error": "workspace has no URL", "status": wsStatus},
				}
			}
			// Auto-wake on incoming A2A traffic (#711): kick off an async
			// re-provision and tell the caller when to come back.
			log.Printf("ProxyA2A: waking hibernated workspace %s", workspaceID)
			go h.RestartByID(workspaceID)
			return "", &proxyA2AError{
				Status:  http.StatusServiceUnavailable,
				Headers: map[string]string{"Retry-After": "15"},
				Response: gin.H{
					"error":       "workspace is waking from hibernation — retry in ~15 seconds",
					"waking":      true,
					"retry_after": 15,
				},
			}
		}

		target = storedURL.String
		_ = db.CacheURL(ctx, workspaceID, target) // best-effort cache fill
	}

	// Inside Docker, 127.0.0.1:{host_port} is the platform container's own
	// localhost, not the Docker host — use the bridge hostname instead.
	rewriteToBridge := platformInDocker && h.provisioner != nil && strings.HasPrefix(target, "http://127.0.0.1:")
	if rewriteToBridge {
		target = provisioner.InternalURL(workspaceID)
	}

	// SSRF defence: vet the destination before any outbound call is made.
	if unsafeErr := isSafeURL(target); unsafeErr != nil {
		log.Printf("ProxyA2A: unsafe URL for workspace %s: %v", workspaceID, unsafeErr)
		return "", &proxyA2AError{
			Status:   http.StatusBadGateway,
			Response: gin.H{"error": "workspace URL is not publicly routable"},
		}
	}
	return target, nil
}
// normalizeA2APayload turns an incoming request body into the JSON-RPC 2.0
// shape forwarded to agents and re-encodes it.
//
// The body must decode into a JSON object (400 "invalid JSON" otherwise). If
// it has no "jsonrpc" key it is wrapped in a fresh envelope with a generated
// "id", carrying over only the original "method" and "params". When
// params.message is an object lacking "messageId" (required by a2a-sdk), one
// is generated. Returns the encoded payload and the "method" value when it is
// a string (empty otherwise; used for logging).
func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) {
	var envelope map[string]interface{}
	if json.Unmarshal(body, &envelope) != nil {
		return nil, "", &proxyA2AError{
			Status:   http.StatusBadRequest,
			Response: gin.H{"error": "invalid JSON"},
		}
	}

	// Bare {method, params} payloads get a JSON-RPC envelope.
	if _, alreadyWrapped := envelope["jsonrpc"]; !alreadyWrapped {
		envelope = map[string]interface{}{
			"jsonrpc": "2.0",
			"id":      uuid.New().String(),
			"method":  envelope["method"],
			"params":  envelope["params"],
		}
	}

	// Fill in params.message.messageId when the message object omits it.
	if params, isObject := envelope["params"].(map[string]interface{}); isObject {
		message, isMessage := params["message"].(map[string]interface{})
		if isMessage {
			if _, present := message["messageId"]; !present {
				message["messageId"] = uuid.New().String()
			}
		}
	}

	encoded, encodeErr := json.Marshal(envelope)
	if encodeErr != nil {
		return nil, "", &proxyA2AError{
			Status:   http.StatusInternalServerError,
			Response: gin.H{"error": "failed to marshal request"},
		}
	}

	// A non-string or missing method leaves the name empty.
	methodName, _ := envelope["method"].(string)
	return encoded, methodName, nil
}
// dispatchA2A POSTs `body` to `agentURL` with Content-Type application/json
// using the shared a2aClient.
//
// The forward context is derived with context.WithoutCancel so delegation
// chains survive client disconnect (browser tab close). WithoutCancel also
// strips the parent's deadline, so the deadline is re-applied explicitly:
//   - ctx carries a deadline (e.g. X-Timeout applied in ProxyA2A) → the same
//     deadline is attached to the forward context;
//   - otherwise canvas calls (callerID == "") get 5 minutes and
//     agent-to-agent calls get 30 minutes.
//
// Returns the response, the cancel func for the forward context (the caller
// must invoke it once it has finished with the response body; it is nil only
// when request construction failed), and any error. A request-construction
// failure is returned as *proxyDispatchBuildError.
func (h *WorkspaceHandler) dispatchA2A(ctx context.Context, agentURL string, body []byte, callerID string) (*http.Response, context.CancelFunc, error) {
	forwardCtx := context.WithoutCancel(ctx)
	var cancel context.CancelFunc
	switch deadline, hasDeadline := ctx.Deadline(); {
	case hasDeadline:
		// Carry the caller's deadline across the WithoutCancel boundary;
		// without this the forward call would have no deadline at all.
		forwardCtx, cancel = context.WithDeadline(forwardCtx, deadline)
	case callerID == "":
		forwardCtx, cancel = context.WithTimeout(forwardCtx, 5*time.Minute)
	default:
		forwardCtx, cancel = context.WithTimeout(forwardCtx, 30*time.Minute)
	}

	req, err := http.NewRequestWithContext(forwardCtx, "POST", agentURL, bytes.NewReader(body))
	if err != nil {
		cancel()
		// Wrap the construction failure so the caller can distinguish it
		// from an upstream Do() error and produce the correct 500 response.
		return nil, nil, &proxyDispatchBuildError{err: err}
	}
	req.Header.Set("Content-Type", "application/json")

	resp, doErr := a2aClient.Do(req)
	return resp, cancel, doErr
}
// proxyDispatchBuildError wraps a failure from http.NewRequestWithContext.
// handleA2ADispatchError detects it to emit the "failed to create proxy
// request" 500 instead of the standard 502/503 paths.
type proxyDispatchBuildError struct{ err error }

// Error returns the message of the wrapped construction error.
func (e *proxyDispatchBuildError) Error() string { return e.err.Error() }

// Unwrap exposes the wrapped construction error so errors.Is and errors.As
// can inspect the underlying cause.
func (e *proxyDispatchBuildError) Unwrap() error { return e.err }
// handleA2ADispatchError translates a failed forward call into the
// (0, nil, *proxyA2AError) result returned to the proxy's caller.
//
// Outcomes, in order of precedence:
//   - request could not be constructed (*proxyDispatchBuildError) → 500; the
//     cause is logged and no health check or activity log is performed;
//   - container found dead by maybeMarkContainerDead → 503 with
//     "restarting": true;
//   - timeout/EOF/reset-shaped error (isUpstreamBusyError) → 503 with a
//     Retry-After header of busyRetryAfterSeconds;
//   - anything else → 502.
//
// For every outcome except the build failure, the error is logged and, when
// logActivity is true, logA2AFailure records the failed attempt.
func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, logActivity bool) (int, []byte, *proxyA2AError) {
	// Build-time failure (couldn't even create the http.Request) — return
	// a 500 without the reactive-health / busy-retry paths. Log the cause:
	// the client only sees a generic message, so this is the sole record.
	var buildErr *proxyDispatchBuildError
	if errors.As(err, &buildErr) {
		log.Printf("ProxyA2A: failed to build forward request for %s: %v", workspaceID, buildErr)
		return 0, nil, &proxyA2AError{
			Status:   http.StatusInternalServerError,
			Response: gin.H{"error": "failed to create proxy request"},
		}
	}

	log.Printf("ProxyA2A forward error: %v", err)

	containerDead := h.maybeMarkContainerDead(ctx, workspaceID)

	if logActivity {
		h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs)
	}

	if containerDead {
		return 0, nil, &proxyA2AError{
			Status:   http.StatusServiceUnavailable,
			Response: gin.H{"error": "workspace agent unreachable — container restart triggered", "restarting": true},
		}
	}

	// Container is alive but upstream Do() failed with a timeout/EOF-
	// shaped error — the agent is most likely mid-synthesis on a
	// previous request (single-threaded main loop). Surface as 503
	// Busy with a Retry-After hint so callers can distinguish this
	// from a real unreachable-agent (502) and retry with backoff.
	// Issue #110.
	if isUpstreamBusyError(err) {
		return 0, nil, &proxyA2AError{
			Status:  http.StatusServiceUnavailable,
			Headers: map[string]string{"Retry-After": strconv.Itoa(busyRetryAfterSeconds)},
			Response: gin.H{
				"error":       "workspace agent busy — retry after a short backoff",
				"busy":        true,
				"retry_after": busyRetryAfterSeconds,
			},
		}
	}

	return 0, nil, &proxyA2AError{
		Status:   http.StatusBadGateway,
		Response: gin.H{"error": "failed to reach workspace agent"},
	}
}
// maybeMarkContainerDead runs the reactive health check after a forward error
// and reports whether the workspace's container was found dead.
//
// It returns false without side effects when no provisioner is configured,
// when the workspace runtime is "external", when the container is running, or
// when the running state could not be determined (IsRunning returned an
// error). Otherwise it marks the workspace offline (unless its status is
// 'removed' or 'provisioning'), clears the workspace's cached keys, broadcasts
// WORKSPACE_OFFLINE, triggers an asynchronous restart, and returns true.
func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool {
	var wsRuntime string
	if err := db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime); err != nil && !errors.Is(err, sql.ErrNoRows) {
		// Best-effort lookup: keep going with an empty runtime, but make the
		// failure visible instead of swallowing it.
		log.Printf("ProxyA2A: runtime lookup failed for %s: %v", workspaceID, err)
	}
	if h.provisioner == nil || wsRuntime == "external" {
		return false
	}

	running, inspectErr := h.provisioner.IsRunning(ctx, workspaceID)
	if inspectErr != nil {
		// Transient Docker-daemon error (timeout, socket EOF, etc.). The
		// running state is unknown, so stay on the alive path explicitly
		// rather than relying on the value returned alongside the error —
		// a restart cascade on a daemon hiccup would be destructive.
		log.Printf("ProxyA2A: IsRunning for %s returned transient error (assuming alive): %v", workspaceID, inspectErr)
		return false
	}
	if running {
		return false
	}

	log.Printf("ProxyA2A: container for %s is dead — marking offline and triggering restart", workspaceID)
	if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'offline', updated_at = now() WHERE id = $1 AND status NOT IN ('removed', 'provisioning')`, workspaceID); err != nil {
		log.Printf("ProxyA2A: failed to mark workspace %s offline: %v", workspaceID, err)
	}
	db.ClearWorkspaceKeys(ctx, workspaceID)
	h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_OFFLINE", workspaceID, map[string]interface{}{})
	go h.RestartByID(workspaceID)
	return true
}
// logA2AFailure writes an "error" activity entry for a failed A2A attempt.
// The workspace's display name is looked up for the summary (falling back to
// the ID when the lookup yields nothing); the LogActivity call itself runs in
// a goroutine on a context detached from the request and bounded to 30s,
// because the request context may already be done by then.
func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int) {
	detail := err.Error()

	var targetName string
	// Best-effort: a failed lookup just leaves targetName empty.
	_ = db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&targetName)
	if targetName == "" {
		targetName = workspaceID
	}
	summary := "A2A request to " + targetName + " failed: " + detail

	entry := ActivityParams{
		WorkspaceID:  workspaceID,
		ActivityType: "a2a_receive",
		SourceID:     nilIfEmpty(callerID),
		TargetID:     &workspaceID,
		Method:       &a2aMethod,
		Summary:      &summary,
		RequestBody:  json.RawMessage(body),
		DurationMs:   &durationMs,
		Status:       "error",
		ErrorDetail:  &detail,
	}
	go func() {
		logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
		defer cancel()
		LogActivity(logCtx, h.broadcaster, entry)
	}()
}
// logA2ASuccess records a completed A2A round-trip. The activity entry gets
// status "ok" for responses below 400 and "error" otherwise. For responses
// below 400 it additionally:
//   - stamps last_outbound_at on the caller when the caller is a real
//     workspace (not canvas, not a system caller) (#817), and
//   - for canvas-initiated calls (empty callerID), broadcasts A2A_RESPONSE so
//     the frontend receives the reply without polling.
//
// Database writes happen in detached goroutines with their own timeouts.
func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int) {
	belowError := statusCode < 400
	outcome := "error"
	if belowError {
		outcome = "ok"
	}

	var targetName string
	// Best-effort: a failed lookup just leaves targetName empty.
	_ = db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&targetName)
	if targetName == "" {
		targetName = workspaceID
	}

	// #817: outbound-activity tracking on the CALLER lets orchestrators spot
	// silent workspaces.
	if belowError && callerID != "" && !isSystemCaller(callerID) {
		go func() {
			bgCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			_, execErr := db.DB.ExecContext(bgCtx,
				`UPDATE workspaces SET last_outbound_at = NOW() WHERE id = $1`, callerID)
			if execErr != nil {
				log.Printf("last_outbound_at update failed for %s: %v", callerID, execErr)
			}
		}()
	}

	summary := a2aMethod + " → " + targetName
	entry := ActivityParams{
		WorkspaceID:  workspaceID,
		ActivityType: "a2a_receive",
		SourceID:     nilIfEmpty(callerID),
		TargetID:     &workspaceID,
		Method:       &a2aMethod,
		Summary:      &summary,
		RequestBody:  json.RawMessage(body),
		ResponseBody: json.RawMessage(respBody),
		DurationMs:   &durationMs,
		Status:       outcome,
	}
	go func() {
		logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
		defer cancel()
		LogActivity(logCtx, h.broadcaster, entry)
	}()

	if belowError && callerID == "" {
		h.broadcaster.BroadcastOnly(workspaceID, "A2A_RESPONSE", map[string]interface{}{
			"response_body": json.RawMessage(respBody),
			"method":        a2aMethod,
			"duration_ms":   durationMs,
		})
	}
}
// nilIfEmpty returns nil for the empty string and otherwise a pointer to a
// copy of s.
func nilIfEmpty(s string) *string {
	if len(s) > 0 {
		return &s
	}
	return nil
}
// validateCallerToken applies the Phase 30.5 auth-token contract to the
// workspace that is calling the A2A proxy.
//
// Behaviour:
//   - token lookup fails          → logged, request allowed (fail-open; this
//     check is defense-in-depth on top of the access-control hierarchy);
//   - caller has no live token    → allowed (legacy / pre-upgrade caller);
//   - no bearer token in the Authorization header → 401 written,
//     errInvalidCallerToken returned;
//   - wsauth.ValidateToken rejects the token → 401 written, its error returned.
//
// A non-nil return means the response has already been written through c and
// the handler must stop.
func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) error {
	liveTokens, lookupErr := wsauth.HasAnyLiveToken(ctx, db.DB, callerID)
	switch {
	case lookupErr != nil:
		// A DB hiccup shouldn't take the whole A2A path down.
		log.Printf("wsauth: caller HasAnyLiveToken(%s) failed: %v — allowing A2A", callerID, lookupErr)
		return nil
	case !liveTokens:
		return nil // grandfathered until the caller has a token on file
	}

	bearer := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
	if bearer == "" {
		c.JSON(http.StatusUnauthorized, gin.H{"error": "missing caller auth token"})
		return errInvalidCallerToken
	}

	verifyErr := wsauth.ValidateToken(ctx, db.DB, callerID, bearer)
	if verifyErr == nil {
		return nil
	}
	c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid caller auth token"})
	return verifyErr
}

// errInvalidCallerToken is the sentinel returned by validateCallerToken when
// the Authorization header carries no bearer token, so callers can detect that
// case without string matching.
var errInvalidCallerToken = errors.New("missing caller auth token")
// extractAndUpsertTokenUsage pulls token counts out of a raw A2A response body
// and, when at least one of them is positive, hands them to upsertTokenUsage.
// It is launched in a goroutine by proxyA2ARequest, which passes a context
// already detached from the request.
func extractAndUpsertTokenUsage(ctx context.Context, workspaceID string, respBody []byte) {
	inTok, outTok := parseUsageFromA2AResponse(respBody)
	if inTok <= 0 && outTok <= 0 {
		return // nothing to record
	}
	upsertTokenUsage(ctx, workspaceID, inTok, outTok)
}
// parseUsageFromA2AResponse reads input_tokens / output_tokens from an A2A
// response body. Two places are tried, first match wins:
//  1. result.usage — the JSON-RPC 2.0 result envelope from workspace agents.
//  2. usage — top level, for non-JSON-RPC or direct Anthropic-shaped payloads.
//
// An empty body, a body that is not a JSON object, or one without usable
// usage data yields (0, 0).
func parseUsageFromA2AResponse(body []byte) (inputTokens, outputTokens int64) {
	if len(body) == 0 {
		return 0, 0
	}
	var envelope map[string]json.RawMessage
	if json.Unmarshal(body, &envelope) != nil {
		return 0, 0
	}

	// Candidate objects in order of preference: "result" (when it decodes as
	// an object) and then the top-level object itself.
	candidates := make([]map[string]json.RawMessage, 0, 2)
	if rawResult, found := envelope["result"]; found {
		var inner map[string]json.RawMessage
		if json.Unmarshal(rawResult, &inner) == nil {
			candidates = append(candidates, inner)
		}
	}
	candidates = append(candidates, envelope)

	for _, candidate := range candidates {
		if in, out, found := readUsageMap(candidate); found {
			return in, out
		}
	}
	return 0, 0
}
// isSafeURL checks that rawURL is an acceptable target for an outbound A2A
// call, guarding against SSRF (CWE-918) towards internal or cloud-metadata
// infrastructure. Workspace URLs come from DB/Redis caches, so they are
// validated before any outbound HTTP request.
//
// It returns a non-nil error when:
//   - the URL does not parse, has a scheme other than http/https, or has an
//     empty hostname;
//   - the host is an IP literal that is loopback, unspecified, link-local
//     unicast, or rejected by isPrivateOrMetadataIP;
//   - the hostname fails to resolve, resolves to no addresses, or any
//     resolved address is forbidden or cannot be parsed.
//
// IPv6 zone identifiers ("fe80::1%eth0") are stripped before parsing.
// net.ParseIP does not accept zones, while the resolver hands zone-scoped
// literals back unchanged, so without this step such an address would be
// treated as "not an IP" and skipped. Unparseable resolved addresses fail
// closed for the same reason.
func isSafeURL(rawURL string) error {
	u, err := url.Parse(rawURL)
	if err != nil {
		return fmt.Errorf("invalid URL: %w", err)
	}
	// Reject non-HTTP(S) schemes.
	if u.Scheme != "http" && u.Scheme != "https" {
		return fmt.Errorf("forbidden scheme: %s (only http/https allowed)", u.Scheme)
	}
	host := u.Hostname()
	if host == "" {
		return fmt.Errorf("empty hostname")
	}

	// parseIP parses s as an IP address after dropping any IPv6 zone suffix.
	parseIP := func(s string) net.IP {
		addr, _, _ := strings.Cut(s, "%")
		return net.ParseIP(addr)
	}
	// reserved covers the ranges that are never valid regardless of mode.
	reserved := func(ip net.IP) bool {
		return ip.IsLoopback() || ip.IsUnspecified() || ip.IsLinkLocalUnicast()
	}

	// Direct IP literal: validate it without touching DNS.
	if ip := parseIP(host); ip != nil {
		if reserved(ip) {
			return fmt.Errorf("forbidden loopback/unspecified IP: %s", ip)
		}
		if isPrivateOrMetadataIP(ip) {
			return fmt.Errorf("forbidden private/metadata IP: %s", ip)
		}
		return nil
	}

	// Hostname: resolve and validate every returned address.
	addrs, err := net.LookupHost(host)
	if err != nil {
		// DNS resolution failure — block it. Could be an internal hostname.
		return fmt.Errorf("DNS resolution blocked for hostname: %s (%w)", host, err)
	}
	if len(addrs) == 0 {
		return fmt.Errorf("DNS returned no addresses for: %s", host)
	}
	for _, addr := range addrs {
		ip := parseIP(addr)
		if ip == nil {
			// Fail closed: an address that cannot be classified must not be
			// assumed safe.
			return fmt.Errorf("hostname %s resolved to unparseable address: %q", host, addr)
		}
		if reserved(ip) || isPrivateOrMetadataIP(ip) {
			return fmt.Errorf("hostname %s resolves to forbidden IP: %s", host, ip)
		}
	}
	return nil
}
// isPrivateOrMetadataIP classifies ip against two tiers of CIDR ranges.
//
// Always rejected (returns true in every mode):
//   - IPv4: 169.254.0.0/16 (link-local / cloud metadata), 100.64.0.0/10
//     (CGNAT) and the three TEST-NET ranges;
//   - IPv6: ::1, fe80::/10 and ::ffff:0:0/96.
//
// Rejected only when saasMode() is false (self-hosted):
//   - IPv4 RFC-1918 (10/8, 172.16/12, 192.168/16);
//   - IPv6 ULA (fc00::/7).
//
// In SaaS cross-EC2 mode the tenant platform and its workspaces share a VPC,
// so workspaces register with VPC-private addresses; rejecting RFC-1918 there
// would refuse every legitimate registration.
//
// Any address for which To4() succeeds (raw IPv4 or IPv4-mapped IPv6) is
// judged by the IPv4 lists only. IPv4 loopback (127.0.0.0/8) is not in these
// lists; isSafeURL covers it separately through net.IP.IsLoopback.
func isPrivateOrMetadataIP(ip net.IP) bool {
	var (
		// Never a legitimate agent URL, whatever the deployment mode.
		alwaysBlockedV4 = []string{
			"169.254.0.0/16",  // link-local / IMDSv1-v2
			"100.64.0.0/10",   // CGNAT — reachable via some VPC configs, not a legit agent URL
			"192.0.2.0/24",    // TEST-NET-1
			"198.51.100.0/24", // TEST-NET-2
			"203.0.113.0/24",  // TEST-NET-3
		}
		alwaysBlockedV6 = []string{
			"::1/128",       // loopback
			"fe80::/10",     // link-local (IMDS analogue)
			"::ffff:0:0/96", // IPv4-mapped (defence-in-depth; To4() normally catches these first)
		}
		// Private address space: rejected when self-hosted, accepted in SaaS.
		privateV4 = []string{
			"10.0.0.0/8",
			"172.16.0.0/12",
			"192.168.0.0/16",
		}
		privateV6 = []string{
			"fc00::/7", // RFC-4193 ULA
		}
	)

	// within reports whether addr falls inside any of the listed CIDRs;
	// entries that fail to parse are ignored.
	within := func(addr net.IP, cidrs []string) bool {
		for i := range cidrs {
			_, network, parseErr := net.ParseCIDR(cidrs[i])
			if parseErr == nil && network.Contains(addr) {
				return true
			}
		}
		return false
	}

	// Pick the address form and range lists for the family in play.
	addr, always, private := ip, alwaysBlockedV6, privateV6
	if v4 := ip.To4(); v4 != nil {
		addr, always, private = v4, alwaysBlockedV4, privateV4
	}

	switch {
	case within(addr, always):
		return true
	case saasMode():
		return false
	default:
		return within(addr, private)
	}
}
// readUsageMap decodes the "usage" entry of m and returns its input_tokens and
// output_tokens counts. ok is false — with both counts zero — when the entry
// is missing, does not decode into the expected shape, or has both counts
// equal to zero.
func readUsageMap(m map[string]json.RawMessage) (inputTokens, outputTokens int64, ok bool) {
	raw, present := m["usage"]
	if !present {
		return 0, 0, false
	}

	var counts struct {
		In  int64 `json:"input_tokens"`
		Out int64 `json:"output_tokens"`
	}
	decodeErr := json.Unmarshal(raw, &counts)
	if decodeErr != nil || (counts.In == 0 && counts.Out == 0) {
		return 0, 0, false
	}
	return counts.In, counts.Out, true
}