Every workspace in the cross-EC2 SaaS provisioning shape was failing
registration, heartbeat, or A2A routing. Four distinct blockers sat
between "EC2 is up" and "agent responds"; three are platform-side and
fixed here (the fourth is in the CP user-data, separate PR).
1. SSRF validator blocked RFC-1918 (registry.go + mcp.go)
validateAgentURL and isPrivateOrMetadataIP rejected 172.16.0.0/12,
which contains the AWS default VPC range (172.31.x.x) that every
sibling workspace EC2 registers from. Registration returned 400 and
the 10-min provision sweep flipped status to failed. RFC-1918 +
IPv6 ULA are now gated behind saasMode(); link-local (169.254/16),
loopback (127/8, ::1), IPv6 link-local (fe80::/10), and TEST-NET ranges
stay blocked unconditionally in both modes.
saasMode() resolution order (sketched below):
1. MOLECULE_DEPLOY_MODE=saas|self-hosted (explicit operator flag)
2. MOLECULE_ORG_ID presence (legacy implicit signal, kept for
back-compat so existing deployments don't need a config change)
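A rough sketch of that ladder (illustrative only — the exact trimming and
case handling live in registry.go):

```go
// saasMode reports whether the platform runs in SaaS cross-EC2 mode.
// Explicit MOLECULE_DEPLOY_MODE wins; MOLECULE_ORG_ID presence is the
// legacy implicit signal.
func saasMode() bool {
	switch strings.ToLower(strings.TrimSpace(os.Getenv("MOLECULE_DEPLOY_MODE"))) {
	case "saas":
		return true
	case "self-hosted":
		return false
	}
	return os.Getenv("MOLECULE_ORG_ID") != ""
}
```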
isPrivateOrMetadataIP now actually checks IPv6 — previously it
returned false on any non-IPv4 input, which would let a registered
[::1] or [fe80::...] URL bypass the SSRF check entirely.
2. Orphan auth-token minting (workspace_provision.go)
issueAndInjectToken mints a token and stuffs it into
cfg.ConfigFiles[".auth_token"]. The Docker provisioner writes that
file into the /configs volume — the CP provisioner ignores it
(only cfg.EnvVars crosses the wire). Result: live token in DB, no
plaintext on disk, RegistryHandler.requireWorkspaceToken 401s every
/registry/register attempt because the workspace is no longer in
the "no live token → bootstrap-allowed" state. Now no-ops in SaaS
mode; the register handler already mints on first successful
register and returns the plaintext in the response body for the
runtime to persist locally.
Also removes the redundant wsauth.IssueToken call at the bottom of
provisionWorkspaceCP, which created the same orphan-token pattern
a second time.
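The shape of the fix, as a sketch (ProvisionConfig and mintToken are
assumed stand-ins, not the real workspace_provision.go API):

```go
// ProvisionConfig stands in for the real provisioner config (assumed shape).
type ProvisionConfig struct {
	ConfigFiles map[string]string
	EnvVars     map[string]string
}

func issueAndInjectToken(ctx context.Context, cfg *ProvisionConfig, workspaceID string) error {
	if saasMode() {
		// SaaS: no-op. Only cfg.EnvVars crosses the wire to the CP
		// provisioner, so a file-injected token would be a live DB token
		// with no plaintext on disk — 401ing every bootstrap register.
		// The register handler mints on first successful register instead.
		return nil
	}
	tok, err := mintToken(ctx, workspaceID) // assumed stand-in for wsauth.IssueToken
	if err != nil {
		return err
	}
	cfg.ConfigFiles[".auth_token"] = tok // Docker provisioner writes /configs
	return nil
}
```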
3. Compaction artefacts (bundle/importer.go, handlers/org_tokens.go,
scheduler.go, workspace_provision.go)
Four pre-existing compile errors on main from an earlier session's
code truncation: missing tuple destructuring on ExecContext /
redactSecrets / orgTokenActor, missing close-brace in
Scheduler.fireSchedule's panic recovery. All one-line mechanical
fixes; without them the binary would not build.
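The destructuring class of fix, for flavour (illustrative shape only, not
the actual diff):

```go
func touch(ctx context.Context, db *sql.DB, id string) error {
	// Before (truncated): err := db.ExecContext(ctx, q, id)
	// → compile error: multiple-value db.ExecContext() in single-value context
	_, err := db.ExecContext(ctx,
		`UPDATE workspaces SET updated_at = now() WHERE id = $1`, id)
	return err
}
```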
Tests
-----
ssrf_test.go adds:
* TestSaasMode — covers the env resolution ladder (explicit flag
wins over legacy signal, case-insensitive, whitespace tolerant — sketched below)
* TestIsPrivateOrMetadataIP_SaaSMode — asserts RFC-1918 + IPv6 ULA
flip to allowed, metadata/loopback/TEST-NET still blocked
* TestIsPrivateOrMetadataIP_IPv6 — regression guard for the old
"returns false for all IPv6" behaviour
A follow-up issue for CP-sourced workspace_id attestation will be filed
separately, to close the residual intra-VPC SSRF and token-race windows
the SaaS-mode relaxation introduces.
Verified end-to-end today on workspace 6565a2e0 (hermes runtime, OpenAI
provider) — agent returned "PONG" in 1.4s after register → heartbeat →
A2A proxy → runtime.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
package handlers

import (
	"bytes"
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"net/url"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
	"github.com/gin-gonic/gin"
	"github.com/google/uuid"
)

// platformInDocker caches whether THIS process is running inside a
// Docker container. The a2a proxy uses this to decide whether stored
// agent URLs like "http://127.0.0.1:<ephemeral>" need to be rewritten
// to the Docker-DNS form "http://ws-<id>:8000". When the platform is
// on the host, 127.0.0.1 IS the host and the ephemeral-port URL works
// as-is; rewriting to container DNS would then break (host can't
// resolve Docker bridge hostnames).
//
// Detection: /.dockerenv is the canonical marker inside the default
// Docker runtime. MOLECULE_IN_DOCKER is an explicit override for
// environments where /.dockerenv is absent (Podman, custom runtimes).
// Accepts any value strconv.ParseBool recognises — 1, 0, t, f, T, F,
// true, false, TRUE, FALSE, True, False. Anything else (including
// "yes"/"on") is treated as unset and falls through to the /.dockerenv
// check.
//
// Exposed as a var (not a const) so tests can toggle it via
// setPlatformInDockerForTest without fiddling with real filesystem
// markers or env vars. Production callers never mutate it.
var platformInDocker = detectPlatformInDocker()

func detectPlatformInDocker() bool {
	if v := os.Getenv("MOLECULE_IN_DOCKER"); v != "" {
		if b, err := strconv.ParseBool(v); err == nil {
			return b
		}
	}
	if _, err := os.Stat("/.dockerenv"); err == nil {
		return true
	}
	return false
}

// setPlatformInDockerForTest overrides platformInDocker for the duration of
// a test and returns a function to restore the previous value. Use with
// defer in *_test.go only.
func setPlatformInDockerForTest(v bool) func() {
	prev := platformInDocker
	platformInDocker = v
	return func() { platformInDocker = prev }
}

// maxProxyRequestBody is the maximum size of an A2A proxy request body (1MB).
const maxProxyRequestBody = 1 << 20

// systemCallerPrefixes are caller IDs that bypass workspace access control.
// These are non-workspace internal callers (webhooks, system services, tests).
var systemCallerPrefixes = []string{"webhook:", "system:", "test:", "channel:"}

// isSystemCaller returns true if callerID is a non-workspace internal caller.
func isSystemCaller(callerID string) bool {
	for _, prefix := range systemCallerPrefixes {
		if strings.HasPrefix(callerID, prefix) {
			return true
		}
	}
	return false
}

// maxProxyResponseBody is the maximum size of an A2A proxy response body (10MB).
const maxProxyResponseBody = 10 << 20

// a2aClient is a shared HTTP client for proxying A2A requests to workspace agents.
// No client-level timeout — timeouts are enforced per-request via context deadlines:
// canvas = 5 min (Rule 3), agent-to-agent = 30 min (DoS cap). A client-level
// Timeout would cap every request (including long syntheses) at that value,
// so dispatchA2A installs a deadline instead whenever the caller's context
// lacks one.
var a2aClient = &http.Client{}

type proxyA2AError struct {
	Status   int
	Response gin.H
	// Optional response headers (e.g. Retry-After on 503-busy). Kept separate
	// from Response so the handler can set real HTTP headers, not just JSON.
	Headers map[string]string
}

// busyRetryAfterSeconds is the Retry-After hint returned with 503-busy
// responses when an upstream workspace agent is overloaded (single-threaded
// mid-synthesis). Chosen to be long enough for typical PM synthesis work
// to complete but short enough that a caller's retry loop won't stall
// coordination. See issue #110.
const busyRetryAfterSeconds = 30

// isUpstreamBusyError classifies an http.Client.Do error as a transient
// "upstream busy" condition — a timeout or connection-reset while the
// container is still alive. Distinguishes legitimate busy-agent failures
// from fatal network errors so callers can retry with Retry-After.
func isUpstreamBusyError(err error) bool {
	if err == nil {
		return false
	}
	if errors.Is(err, context.DeadlineExceeded) {
		return true
	}
	if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
		return true
	}
	// url.Error wraps "read tcp … EOF" and "Post …: context deadline
	// exceeded" strings from the stdlib HTTP client without typing the
	// inner cause. Fall back to substring match for those.
	msg := err.Error()
	return strings.Contains(msg, "context deadline exceeded") ||
		strings.Contains(msg, "EOF") ||
		strings.Contains(msg, "connection reset")
}

func (e *proxyA2AError) Error() string {
	if e == nil || e.Response == nil {
		return "proxy a2a error"
	}
	if msg, ok := e.Response["error"].(string); ok && msg != "" {
		return msg
	}
	return "proxy a2a error"
}

// ProxyA2ARequest is the public wrapper for proxyA2ARequest, used by the
// cron scheduler and other internal callers that need to send A2A messages
// to workspaces programmatically (not from an HTTP handler).
func (h *WorkspaceHandler) ProxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error) {
	status, resp, proxyErr := h.proxyA2ARequest(ctx, workspaceID, body, callerID, logActivity)
	if proxyErr != nil {
		return status, resp, proxyErr
	}
	// Explicit nil return: passing the (nil) *proxyA2AError through directly
	// would produce a non-nil error interface holding a typed nil.
	return status, resp, nil
}

// ProxyA2A handles POST /workspaces/:id/a2a
// Proxies A2A JSON-RPC requests from the canvas to workspace agents,
// avoiding CORS and Docker network issues.
func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
	workspaceID := c.Param("id")
	ctx := c.Request.Context()

	// X-Timeout: caller-specified timeout in seconds (0 = no timeout).
	// Overrides the default canvas (5 min) / agent (30 min) timeouts.
	if tStr := c.GetHeader("X-Timeout"); tStr != "" {
		if tSec, err := strconv.Atoi(tStr); err == nil && tSec > 0 {
			var cancel context.CancelFunc
			ctx, cancel = context.WithTimeout(ctx, time.Duration(tSec)*time.Second)
			defer cancel()
		}
		// tSec == 0 means no timeout — use the raw context (no deadline)
	}

	// Read the incoming request body (capped at 1MB)
	body, err := io.ReadAll(io.LimitReader(c.Request.Body, maxProxyRequestBody))
	if err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "failed to read request body"})
		return
	}

	callerID := c.GetHeader("X-Workspace-ID")

	// #761 SECURITY: reject requests where the client-supplied X-Workspace-ID
	// contains a system-caller prefix. isSystemCaller() bypasses both token
	// validation and CanCommunicate. On the public /a2a endpoint, system-caller
	// semantics only apply to callerIDs set by trusted server-side code
	// (ProxyA2ARequest), never to HTTP header values. Legitimate system callers
	// (webhooks, scheduler, restart_context) call proxyA2ARequest directly and
	// never go through this HTTP handler.
	if isSystemCaller(callerID) {
		log.Printf("security: system-caller prefix forge attempt — remote=%q header=%q",
			c.ClientIP(), callerID)
		c.JSON(http.StatusForbidden, gin.H{"error": "invalid caller ID"})
		return
	}

	// Phase 30.5 — validate the caller's auth token when the caller IS
	// a workspace (not canvas or a system caller). Canvas requests have
	// no X-Workspace-ID so they bypass this check (the existing
	// access-control layer already trusts them). System callers
	// (webhook:* / system:* / test:*) only reach proxyA2ARequest via
	// the server-side ProxyA2ARequest wrapper, never via this HTTP path.
	//
	// The bind is strict: the token must match `callerID`, not
	// `workspaceID` (the target). A compromised token from workspace A
	// must never authenticate calls from A pretending to be B.
	if callerID != "" && callerID != workspaceID {
		if err := validateCallerToken(ctx, c, callerID); err != nil {
			return // response already written with 401
		}
	}

	status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, body, callerID, true)
	if proxyErr != nil {
		for k, v := range proxyErr.Headers {
			c.Header(k, v)
		}
		c.JSON(proxyErr.Status, proxyErr.Response)
		return
	}

	c.Data(status, "application/json", respBody)
}

// checkWorkspaceBudget returns a proxyA2AError with 402 when the workspace
// has a budget_limit set and monthly_spend has reached or exceeded it.
// DB errors are logged and treated as fail-open — a budget check failure
// must not block legitimate A2A traffic.
func (h *WorkspaceHandler) checkWorkspaceBudget(ctx context.Context, workspaceID string) *proxyA2AError {
	var budgetLimit sql.NullInt64
	var monthlySpend int64
	err := db.DB.QueryRowContext(ctx,
		`SELECT budget_limit, COALESCE(monthly_spend, 0) FROM workspaces WHERE id = $1`,
		workspaceID,
	).Scan(&budgetLimit, &monthlySpend)
	if err != nil {
		if err != sql.ErrNoRows {
			log.Printf("ProxyA2A: budget check failed for %s: %v", workspaceID, err)
		}
		return nil // fail-open
	}
	if budgetLimit.Valid && monthlySpend >= budgetLimit.Int64 {
		log.Printf("ProxyA2A: budget exceeded for %s (spend=%d limit=%d)", workspaceID, monthlySpend, budgetLimit.Int64)
		return &proxyA2AError{
			Status:   http.StatusPaymentRequired,
			Response: gin.H{"error": "workspace budget limit exceeded"},
		}
	}
	return nil
}

func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, *proxyA2AError) {
	// Access control: workspace-to-workspace requests must pass CanCommunicate check.
	// Canvas requests (callerID == "") and system callers (webhook:*, system:*, test:*)
	// are trusted. Self-calls (callerID == workspaceID) are always allowed.
	if callerID != "" && callerID != workspaceID && !isSystemCaller(callerID) {
		if !registry.CanCommunicate(callerID, workspaceID) {
			log.Printf("ProxyA2A: access denied %s → %s", callerID, workspaceID)
			return 0, nil, &proxyA2AError{
				Status:   http.StatusForbidden,
				Response: gin.H{"error": "access denied: workspaces cannot communicate per hierarchy rules"},
			}
		}
	}

	// Budget enforcement: reject A2A calls when the workspace has exceeded its
	// monthly spend ceiling. Checked after access control so unauthorized calls
	// are rejected first (403 before 402 in the denial hierarchy). Fail-open on
	// DB errors so a budget check failure never blocks legitimate traffic.
	if proxyErr := h.checkWorkspaceBudget(ctx, workspaceID); proxyErr != nil {
		return 0, nil, proxyErr
	}

	agentURL, proxyErr := h.resolveAgentURL(ctx, workspaceID)
	if proxyErr != nil {
		return 0, nil, proxyErr
	}

	normalizedBody, a2aMethod, proxyErr := normalizeA2APayload(body)
	if proxyErr != nil {
		return 0, nil, proxyErr
	}
	body = normalizedBody

	startTime := time.Now()
	resp, cancelFwd, err := h.dispatchA2A(ctx, agentURL, body, callerID)
	if cancelFwd != nil {
		defer cancelFwd()
	}
	durationMs := int(time.Since(startTime).Milliseconds())
	if err != nil {
		return h.handleA2ADispatchError(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, logActivity)
	}
	defer func() { _ = resp.Body.Close() }()

	// Read agent response (capped at 10MB).
	// #689: if this read fails, Do() has still succeeded — the target received
	// the request and sent back response headers, so delivery is confirmed even
	// though the body couldn't be fully read (connection drop, timeout
	// mid-stream). Surface delivery_confirmed so callers can distinguish
	// "not delivered" from "delivered, but response body lost". When delivery
	// is confirmed, log the activity as successful (delivery happened) rather
	// than leaving a false "failed" entry in the audit trail.
	respBody, readErr := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
	if readErr != nil {
		deliveryConfirmed := resp.StatusCode >= 200 && resp.StatusCode < 400
		log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v",
			workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr)
		if logActivity && deliveryConfirmed {
			h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
		}
		return 0, nil, &proxyA2AError{
			Status: http.StatusBadGateway,
			Response: gin.H{
				"error":              "failed to read agent response",
				"delivery_confirmed": deliveryConfirmed,
			},
		}
	}

	if logActivity {
		h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
	}

	// Track LLM token usage for cost transparency (#593).
	// Fires in a detached goroutine so token accounting never adds latency
	// to the critical A2A path.
	go extractAndUpsertTokenUsage(context.WithoutCancel(ctx), workspaceID, respBody)

	return resp.StatusCode, respBody, nil
}

// resolveAgentURL returns a routable URL for the target workspace agent. It
// checks the Redis URL cache first, then falls back to a DB lookup, caching
// the result on success. When the platform runs inside Docker, 127.0.0.1:<host
// port> is rewritten to the container's Docker-bridge hostname (host-side
// platforms keep the original URL because the bridge name wouldn't resolve).
func (h *WorkspaceHandler) resolveAgentURL(ctx context.Context, workspaceID string) (string, *proxyA2AError) {
	agentURL, err := db.GetCachedURL(ctx, workspaceID)
	if err != nil {
		var urlNullable sql.NullString
		var status string
		err := db.DB.QueryRowContext(ctx,
			`SELECT url, status FROM workspaces WHERE id = $1`, workspaceID,
		).Scan(&urlNullable, &status)
		if err == sql.ErrNoRows {
			return "", &proxyA2AError{
				Status:   http.StatusNotFound,
				Response: gin.H{"error": "workspace not found"},
			}
		}
		if err != nil {
			log.Printf("ProxyA2A lookup error: %v", err)
			return "", &proxyA2AError{
				Status:   http.StatusInternalServerError,
				Response: gin.H{"error": "lookup failed"},
			}
		}
		if !urlNullable.Valid || urlNullable.String == "" {
			// Auto-wake hibernated workspace on incoming A2A message (#711).
			// Re-provision asynchronously and return 503 with a retry hint so
			// the caller can retry once the workspace is back online (~10s).
			if status == "hibernated" {
				log.Printf("ProxyA2A: waking hibernated workspace %s", workspaceID)
				go h.RestartByID(workspaceID)
				return "", &proxyA2AError{
					Status:  http.StatusServiceUnavailable,
					Headers: map[string]string{"Retry-After": "15"},
					Response: gin.H{
						"error":       "workspace is waking from hibernation — retry in ~15 seconds",
						"waking":      true,
						"retry_after": 15,
					},
				}
			}
			return "", &proxyA2AError{
				Status:   http.StatusServiceUnavailable,
				Response: gin.H{"error": "workspace has no URL", "status": status},
			}
		}
		agentURL = urlNullable.String
		_ = db.CacheURL(ctx, workspaceID, agentURL)
	}

	// When the platform runs inside Docker, 127.0.0.1:{host_port} is
	// unreachable (it's the platform container's own localhost, not the
	// Docker host). Rewrite to the container's Docker-bridge hostname.
	if strings.HasPrefix(agentURL, "http://127.0.0.1:") && h.provisioner != nil && platformInDocker {
		agentURL = provisioner.InternalURL(workspaceID)
	}
	// SSRF defence: reject private/metadata URLs before making outbound call.
	if err := isSafeURL(agentURL); err != nil {
		log.Printf("ProxyA2A: unsafe URL for workspace %s: %v", workspaceID, err)
		return "", &proxyA2AError{
			Status:   http.StatusBadGateway,
			Response: gin.H{"error": "workspace URL is not publicly routable"},
		}
	}
	return agentURL, nil
}

// normalizeA2APayload parses the incoming body, wraps it in a JSON-RPC 2.0
// envelope if absent, ensures params.message.messageId is set, and re-marshals
// to bytes. Also returns the A2A method name (for logging) extracted from the
// payload.
func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) {
	var payload map[string]interface{}
	if err := json.Unmarshal(body, &payload); err != nil {
		return nil, "", &proxyA2AError{
			Status:   http.StatusBadRequest,
			Response: gin.H{"error": "invalid JSON"},
		}
	}

	// Wrap in JSON-RPC envelope if missing
	if _, hasJSONRPC := payload["jsonrpc"]; !hasJSONRPC {
		payload = map[string]interface{}{
			"jsonrpc": "2.0",
			"id":      uuid.New().String(),
			"method":  payload["method"],
			"params":  payload["params"],
		}
	}

	// Ensure params.message.messageId exists (required by a2a-sdk)
	if params, ok := payload["params"].(map[string]interface{}); ok {
		if msg, ok := params["message"].(map[string]interface{}); ok {
			if _, hasID := msg["messageId"]; !hasID {
				msg["messageId"] = uuid.New().String()
			}
		}
	}

	marshaledBody, marshalErr := json.Marshal(payload)
	if marshalErr != nil {
		return nil, "", &proxyA2AError{
			Status:   http.StatusInternalServerError,
			Response: gin.H{"error": "failed to marshal request"},
		}
	}

	var a2aMethod string
	if m, ok := payload["method"].(string); ok {
		a2aMethod = m
	}
	return marshaledBody, a2aMethod, nil
}

// dispatchA2A POSTs `body` to `agentURL`. Uses WithoutCancel so delegation
// chains survive client disconnect (browser tab close). Default timeouts:
// canvas (callerID == "") = 5 min, agent-to-agent = 30 min. Callers can
// override via the X-Timeout header (applied to ctx upstream in ProxyA2A).
func (h *WorkspaceHandler) dispatchA2A(ctx context.Context, agentURL string, body []byte, callerID string) (*http.Response, context.CancelFunc, error) {
	forwardCtx := context.WithoutCancel(ctx)
	var cancel context.CancelFunc
	if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
		// Re-apply the caller's deadline (e.g. X-Timeout) — WithoutCancel
		// strips deadlines as well as cancellation.
		forwardCtx, cancel = context.WithDeadline(forwardCtx, deadline)
	} else if callerID == "" {
		forwardCtx, cancel = context.WithTimeout(forwardCtx, 5*time.Minute)
	} else {
		forwardCtx, cancel = context.WithTimeout(forwardCtx, 30*time.Minute)
	}
	req, err := http.NewRequestWithContext(forwardCtx, "POST", agentURL, bytes.NewReader(body))
	if err != nil {
		if cancel != nil {
			cancel()
		}
		// Wrap the construction failure so the caller can distinguish it
		// from an upstream Do() error and produce the correct 500 response.
		return nil, nil, &proxyDispatchBuildError{err: err}
	}
	req.Header.Set("Content-Type", "application/json")
	resp, doErr := a2aClient.Do(req)
	return resp, cancel, doErr
}

// proxyDispatchBuildError is a sentinel wrapper for failures inside
// http.NewRequestWithContext. handleA2ADispatchError unwraps it to emit the
// "failed to create proxy request" 500 instead of the standard 502/503 paths.
type proxyDispatchBuildError struct{ err error }

func (e *proxyDispatchBuildError) Error() string { return e.err.Error() }

// handleA2ADispatchError translates a forward-call failure into a proxyA2AError,
// runs the reactive container-health check, and (when `logActivity` is true)
// schedules a detached LogActivity goroutine for the failed attempt.
func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, logActivity bool) (int, []byte, *proxyA2AError) {
	// Build-time failure (couldn't even create the http.Request) — return
	// a 500 without the reactive-health / busy-retry paths.
	if _, ok := err.(*proxyDispatchBuildError); ok {
		return 0, nil, &proxyA2AError{
			Status:   http.StatusInternalServerError,
			Response: gin.H{"error": "failed to create proxy request"},
		}
	}

	log.Printf("ProxyA2A forward error: %v", err)

	containerDead := h.maybeMarkContainerDead(ctx, workspaceID)

	if logActivity {
		h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs)
	}
	if containerDead {
		return 0, nil, &proxyA2AError{
			Status:   http.StatusServiceUnavailable,
			Response: gin.H{"error": "workspace agent unreachable — container restart triggered", "restarting": true},
		}
	}
	// Container is alive but upstream Do() failed with a timeout/EOF-
	// shaped error — the agent is most likely mid-synthesis on a
	// previous request (single-threaded main loop). Surface as 503
	// Busy with a Retry-After hint so callers can distinguish this
	// from a real unreachable-agent (502) and retry with backoff.
	// Issue #110.
	if isUpstreamBusyError(err) {
		return 0, nil, &proxyA2AError{
			Status:  http.StatusServiceUnavailable,
			Headers: map[string]string{"Retry-After": strconv.Itoa(busyRetryAfterSeconds)},
			Response: gin.H{
				"error":       "workspace agent busy — retry after a short backoff",
				"busy":        true,
				"retry_after": busyRetryAfterSeconds,
			},
		}
	}
	return 0, nil, &proxyA2AError{
		Status:   http.StatusBadGateway,
		Response: gin.H{"error": "failed to reach workspace agent"},
	}
}

// maybeMarkContainerDead runs the reactive health check after a forward error.
// If the workspace's Docker container is no longer running (and the workspace
// isn't external), it marks the workspace offline, clears Redis state,
// broadcasts WORKSPACE_OFFLINE, and triggers an async restart. Returns true
// when the container was found dead.
func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool {
	var wsRuntime string
	db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime)
	if h.provisioner == nil || wsRuntime == "external" {
		return false
	}
	running, inspectErr := h.provisioner.IsRunning(ctx, workspaceID)
	if inspectErr != nil {
		// Transient Docker-daemon error (timeout, socket EOF, etc.). Post-
		// #386, IsRunning returns (true, err) in this case — caller stays
		// on the alive path and does not trigger a restart cascade. Log
		// so the defect is visible without being destructive.
		log.Printf("ProxyA2A: IsRunning for %s returned transient error (assuming alive): %v", workspaceID, inspectErr)
	}
	if running {
		return false
	}
	log.Printf("ProxyA2A: container for %s is dead — marking offline and triggering restart", workspaceID)
	if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'offline', updated_at = now() WHERE id = $1 AND status NOT IN ('removed', 'provisioning')`, workspaceID); err != nil {
		log.Printf("ProxyA2A: failed to mark workspace %s offline: %v", workspaceID, err)
	}
	db.ClearWorkspaceKeys(ctx, workspaceID)
	h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_OFFLINE", workspaceID, map[string]interface{}{})
	go h.RestartByID(workspaceID)
	return true
}

// logA2AFailure records a failed A2A attempt to activity_logs in a detached
// goroutine (the request context may already be done by the time it runs).
func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int) {
	errMsg := err.Error()
	var errWsName string
	db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&errWsName)
	if errWsName == "" {
		errWsName = workspaceID
	}
	summary := "A2A request to " + errWsName + " failed: " + errMsg
	go func(parent context.Context) {
		logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
		defer cancel()
		LogActivity(logCtx, h.broadcaster, ActivityParams{
			WorkspaceID:  workspaceID,
			ActivityType: "a2a_receive",
			SourceID:     nilIfEmpty(callerID),
			TargetID:     &workspaceID,
			Method:       &a2aMethod,
			Summary:      &summary,
			RequestBody:  json.RawMessage(body),
			DurationMs:   &durationMs,
			Status:       "error",
			ErrorDetail:  &errMsg,
		})
	}(ctx)
}

// logA2ASuccess records a successful A2A round-trip and (for canvas-initiated
// 2xx/3xx responses) broadcasts an A2A_RESPONSE event so the frontend can
// receive the reply without polling.
func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int) {
	logStatus := "ok"
	if statusCode >= 400 {
		logStatus = "error"
	}
	var wsNameForLog string
	db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsNameForLog)
	if wsNameForLog == "" {
		wsNameForLog = workspaceID
	}

	// #817: track outbound activity on the CALLER so orchestrators can detect
	// silent workspaces. Only update when callerID is a real workspace (not
	// canvas, not a system caller) and the target returned 2xx/3xx.
	if callerID != "" && !isSystemCaller(callerID) && statusCode < 400 {
		go func() {
			bgCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			if _, err := db.DB.ExecContext(bgCtx,
				`UPDATE workspaces SET last_outbound_at = NOW() WHERE id = $1`, callerID); err != nil {
				log.Printf("last_outbound_at update failed for %s: %v", callerID, err)
			}
		}()
	}
	summary := a2aMethod + " → " + wsNameForLog
	go func(parent context.Context) {
		logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
		defer cancel()
		LogActivity(logCtx, h.broadcaster, ActivityParams{
			WorkspaceID:  workspaceID,
			ActivityType: "a2a_receive",
			SourceID:     nilIfEmpty(callerID),
			TargetID:     &workspaceID,
			Method:       &a2aMethod,
			Summary:      &summary,
			RequestBody:  json.RawMessage(body),
			ResponseBody: json.RawMessage(respBody),
			DurationMs:   &durationMs,
			Status:       logStatus,
		})
	}(ctx)

	if callerID == "" && statusCode < 400 {
		h.broadcaster.BroadcastOnly(workspaceID, "A2A_RESPONSE", map[string]interface{}{
			"response_body": json.RawMessage(respBody),
			"method":        a2aMethod,
			"duration_ms":   durationMs,
		})
	}
}

func nilIfEmpty(s string) *string {
	if s == "" {
		return nil
	}
	return &s
}

// validateCallerToken enforces the Phase 30.5 auth-token contract on the
// caller of an A2A proxy request. Same lazy-bootstrap shape as
// registry.requireWorkspaceToken: if the caller workspace has any live
// token on file, the Authorization header is mandatory and must match;
// if the caller has zero live tokens, they're grandfathered through
// (their next /registry/register will mint their first token, after
// which this branch never fires again for them).
//
// On auth failure this writes the 401 via c and returns an error so the
// handler aborts without running the proxy.
func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) error {
	hasLive, err := wsauth.HasAnyLiveToken(ctx, db.DB, callerID)
	if err != nil {
		// Fail-open here matches the heartbeat path — A2A caller auth is
		// defense-in-depth on top of the access-control hierarchy, not the
		// sole gate on the secret material. A DB hiccup shouldn't take
		// the whole A2A path down.
		log.Printf("wsauth: caller HasAnyLiveToken(%s) failed: %v — allowing A2A", callerID, err)
		return nil
	}
	if !hasLive {
		return nil // legacy / pre-upgrade caller
	}
	tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
	if tok == "" {
		c.JSON(http.StatusUnauthorized, gin.H{"error": "missing caller auth token"})
		return errInvalidCallerToken
	}
	if err := wsauth.ValidateToken(ctx, db.DB, callerID, tok); err != nil {
		c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid caller auth token"})
		return err
	}
	return nil
}

// errInvalidCallerToken is a sentinel for validateCallerToken's "missing
// token" branch so the handler-level guard can detect it without string
// matching (the wsauth errors are typed for the invalid case).
var errInvalidCallerToken = errors.New("missing caller auth token")

// extractAndUpsertTokenUsage parses LLM usage from a raw A2A response body
// and persists it via upsertTokenUsage. Safe to call in a goroutine — logs
// errors but never panics. ctx must already be detached from the request.
func extractAndUpsertTokenUsage(ctx context.Context, workspaceID string, respBody []byte) {
	in, out := parseUsageFromA2AResponse(respBody)
	if in > 0 || out > 0 {
		upsertTokenUsage(ctx, workspaceID, in, out)
	}
}

// parseUsageFromA2AResponse extracts input_tokens / output_tokens from an A2A
// JSON-RPC response. Inspects two locations in order of preference:
//  1. result.usage — the JSON-RPC 2.0 result envelope from workspace agents.
//  2. usage — top-level, for non-JSON-RPC or direct Anthropic-shaped payloads.
//
// Returns (0, 0) when no recognisable usage data is found.
func parseUsageFromA2AResponse(body []byte) (inputTokens, outputTokens int64) {
	if len(body) == 0 {
		return 0, 0
	}
	var top map[string]json.RawMessage
	if err := json.Unmarshal(body, &top); err != nil {
		return 0, 0
	}

	// 1. result.usage (JSON-RPC 2.0 wrapper produced by workspace agents).
	if rawResult, ok := top["result"]; ok {
		var result map[string]json.RawMessage
		if err := json.Unmarshal(rawResult, &result); err == nil {
			if in, out, ok := readUsageMap(result); ok {
				return in, out
			}
		}
	}

	// 2. Fallback: top-level usage (direct Anthropic or non-JSON-RPC response).
	if in, out, ok := readUsageMap(top); ok {
		return in, out
	}
	return 0, 0
}

// isSafeURL validates that a URL resolves to a publicly-routable address,
// preventing A2A requests from being redirected to internal/cloud-metadata
// infrastructure (SSRF, CWE-918). Workspace URLs come from DB/Redis caches
// so we validate before making any outbound HTTP call.
func isSafeURL(rawURL string) error {
	u, err := url.Parse(rawURL)
	if err != nil {
		return fmt.Errorf("invalid URL: %w", err)
	}
	// Reject non-HTTP(S) schemes.
	if u.Scheme != "http" && u.Scheme != "https" {
		return fmt.Errorf("forbidden scheme: %s (only http/https allowed)", u.Scheme)
	}
	host := u.Hostname()
	if host == "" {
		return fmt.Errorf("empty hostname")
	}
	// Block direct IP addresses.
	if ip := net.ParseIP(host); ip != nil {
		if ip.IsLoopback() || ip.IsUnspecified() || ip.IsLinkLocalUnicast() {
			return fmt.Errorf("forbidden loopback/unspecified IP: %s", ip)
		}
		if isPrivateOrMetadataIP(ip) {
			return fmt.Errorf("forbidden private/metadata IP: %s", ip)
		}
		return nil
	}
	// For hostnames, resolve and validate each returned IP.
	addrs, err := net.LookupHost(host)
	if err != nil {
		// DNS resolution failure — block it. Could be an internal hostname.
		return fmt.Errorf("DNS resolution blocked for hostname: %s (%v)", host, err)
	}
	if len(addrs) == 0 {
		return fmt.Errorf("DNS returned no addresses for: %s", host)
	}
	for _, addr := range addrs {
		ip := net.ParseIP(addr)
		if ip != nil && (ip.IsLoopback() || ip.IsUnspecified() || ip.IsLinkLocalUnicast() || isPrivateOrMetadataIP(ip)) {
			return fmt.Errorf("hostname %s resolves to forbidden IP: %s", host, ip)
		}
	}
	return nil
}

// isPrivateOrMetadataIP returns true for cloud-metadata / loopback / link-local
// ranges (always) and RFC-1918 / IPv6 ULA ranges (self-hosted only).
//
// In SaaS cross-EC2 mode (see saasMode() in registry.go) the tenant platform
// and its workspaces share a VPC, so workspaces register with their
// VPC-private IP — typically 172.31.x.x on AWS default VPCs. Blocking RFC-1918
// unconditionally would reject every legitimate registration. Cloud metadata
// (169.254.0.0/16, fe80::/10), loopback, and TEST-NET ranges stay blocked in
// both modes; they are never a legitimate agent URL.
//
// Both IPv4 and IPv6 are checked. The previous implementation returned false
// for every non-IPv4 input, which meant a registered `[::1]` or `[fe80::…]`
// URL would bypass the SSRF gate entirely.
func isPrivateOrMetadataIP(ip net.IP) bool {
	// Always blocked — IPv4 cloud metadata + network-test ranges.
	metadataRangesV4 := []string{
		"169.254.0.0/16",  // link-local / IMDSv1-v2
		"100.64.0.0/10",   // CGNAT — reachable via some VPC configs, not a legit agent URL
		"192.0.2.0/24",    // TEST-NET-1
		"198.51.100.0/24", // TEST-NET-2
		"203.0.113.0/24",  // TEST-NET-3
	}
	// Always blocked — IPv6 cloud-metadata / loopback equivalents.
	metadataRangesV6 := []string{
		"::1/128",       // loopback
		"fe80::/10",     // link-local (IMDS analogue)
		"::ffff:0:0/96", // IPv4-mapped loopback (defence-in-depth; To4() below usually normalises first)
	}
	// RFC-1918 private — blocked in self-hosted, allowed in SaaS.
	rfc1918RangesV4 := []string{
		"10.0.0.0/8",
		"172.16.0.0/12",
		"192.168.0.0/16",
	}
	// RFC-4193 ULA — IPv6 analogue of RFC-1918. Same SaaS-mode treatment.
	ulaRangesV6 := []string{
		"fc00::/7",
	}

	contains := func(cidrs []string, target net.IP) bool {
		for _, c := range cidrs {
			_, n, err := net.ParseCIDR(c)
			if err != nil {
				continue
			}
			if n.Contains(target) {
				return true
			}
		}
		return false
	}

	// Prefer IPv4 semantics when the input is an IPv4 address encoded in any
	// form (raw v4, ::ffff:a.b.c.d, etc.) — To4() normalises all of them.
	if ip4 := ip.To4(); ip4 != nil {
		if contains(metadataRangesV4, ip4) {
			return true
		}
		if saasMode() {
			return false
		}
		return contains(rfc1918RangesV4, ip4)
	}

	// True IPv6 path.
	if contains(metadataRangesV6, ip) {
		return true
	}
	if saasMode() {
		return false
	}
	return contains(ulaRangesV6, ip)
}

// readUsageMap extracts input_tokens / output_tokens from the "usage" key of m.
// Returns (0, 0, false) when the key is absent or contains no non-zero values.
func readUsageMap(m map[string]json.RawMessage) (inputTokens, outputTokens int64, ok bool) {
	rawUsage, has := m["usage"]
	if !has {
		return 0, 0, false
	}
	var usage struct {
		InputTokens  int64 `json:"input_tokens"`
		OutputTokens int64 `json:"output_tokens"`
	}
	if err := json.Unmarshal(rawUsage, &usage); err != nil {
		return 0, 0, false
	}
	if usage.InputTokens == 0 && usage.OutputTokens == 0 {
		return 0, 0, false
	}
	return usage.InputTokens, usage.OutputTokens, true
}