forked from molecule-ai/molecule-core
Merge pull request #2352 from Molecule-AI/staging
staging → main: auto-promote 0b83faa
commit c140ad28ae

.github/workflows/continuous-synth-e2e.yml — new file (160 lines)
@@ -0,0 +1,160 @@
name: Continuous synthetic E2E (staging)

# Hard gate (#2342): cron-driven full-lifecycle E2E that catches
# regressions visible only at runtime — schema drift, deployment-pipeline
# gaps, vendor outages, env-var rotations, DNS / CF / Railway side-effects.
#
# Why this gate exists:
# PR-time CI catches code-level regressions but not deployment-time or
# integration-time ones. Today's empirical data:
#   • #2345 (A2A v0.2 silent drop) — passed all unit tests, broke at the
#     JSON-RPC parse layer between sender and receiver. Visible only
#     to a sender exercising the full path.
#   • RFC #2312 chat upload — landed on staging-branch but never
#     reached staging tenants because publish-workspace-server-image
#     was main-only. Caught by manual dogfooding hours after deploy.
# Both would have surfaced within 15-20 min of regression if a
# continuous synth-E2E had been running.
#
# Cadence: every 20 min (3x/hour). The script is conservatively
# bounded at 10 min wall-clock; even on degraded staging it should
# finish before the next firing. Cron overlap is guarded by the
# concurrency group below.
#
# Cost: ~3 runs/hour × 5-10 min × $0.008/min GHA = ~$0.50-$1/day.
# Plus a fresh tenant provisioned + torn down each run (Railway +
# AWS pennies). Negligible.
#
# Failure handling: when the run fails, the workflow exits non-zero
# and GitHub's standard email/notification path fires. Operators
# can subscribe to this workflow's failure channel for paging-grade
# alerting.

on:
  schedule:
    # Every 20 minutes, on the :00, :20, and :40. Offsets the existing :15
    # sweep-cf-orphans and :45 sweep-cf-tunnels so the three
    # operations don't all hit Cloudflare/AWS at the same minute.
    - cron: '0,20,40 * * * *'
  workflow_dispatch:
    inputs:
      runtime:
        description: "Runtime to provision (langgraph = fastest, default; hermes = slower but covers SDK-native path; claude-code = needs OAUTH token in tenant env)"
        required: false
        default: "langgraph"
        type: string
      keep_org:
        description: "Skip teardown for post-mortem debugging (only manual dispatch — never set this for cron runs)"
        required: false
        default: false
        type: boolean

permissions:
  contents: read
  # No issue-write here — failures surface as red runs in the workflow
  # history. If you want auto-issue-on-fail, add a follow-up step that
  # uses gh issue create gated on `if: failure()`. Keeping the surface
  # minimal until that's actually wanted.

# Serialize so two firings can never overlap. Cron fires every 20 min
# and the script is conservatively bounded at 10 min — overlap shouldn't
# happen in steady state, but if a run hangs we don't want N more
# stacking up.
concurrency:
  group: continuous-synth-e2e
  cancel-in-progress: false

jobs:
  synth:
    name: Synthetic E2E against staging
    runs-on: ubuntu-latest
    timeout-minutes: 12
    env:
      # langgraph default keeps cold-start under 5 min on staging EC2.
      # hermes is slower (~7-10 min) and isn't needed for the
      # regression class this gate exists to catch (deployment-pipeline
      # + schema-drift + integration). Operators can pick hermes via
      # workflow_dispatch when they need to exercise the SDK-native
      # session path.
      E2E_RUNTIME: ${{ github.event.inputs.runtime || 'langgraph' }}
      # Bound to 10 min so a stuck provision fails the run instead of
      # holding up the next cron firing. The 15-min default in the script
      # is for the on-PR full lifecycle where we have more headroom.
      E2E_PROVISION_TIMEOUT_SECS: '600'
      # Slug suffix — namespaced "synth-" so these runs are
      # distinguishable from PR-driven runs in CP admin.
      E2E_RUN_ID: synth-${{ github.run_id }}
      # Forced false for cron; respected for manual dispatch
      E2E_KEEP_ORG: ${{ github.event.inputs.keep_org == 'true' && '1' || '' }}
      MOLECULE_CP_URL: ${{ vars.STAGING_CP_URL || 'https://staging-api.moleculesai.app' }}
      MOLECULE_ADMIN_TOKEN: ${{ secrets.CP_STAGING_ADMIN_API_TOKEN }}
    steps:
      - uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4

      - name: Verify required secret present
        run: |
          # Schedule-vs-dispatch hardening (mirrors the sweep-cf-* and
          # redeploy-tenants-on-* workflows): hard-fail on a missing secret
          # for cron firings so a misconfigured repo doesn't silently
          # report green while doing nothing. Soft-skip on operator
          # dispatch — operators can dispatch ad-hoc to verify a fix
          # without setting up the secret first.
          if [ -z "${MOLECULE_ADMIN_TOKEN:-}" ]; then
            if [ "${{ github.event_name }}" = "workflow_dispatch" ]; then
              echo "::warning::CP_STAGING_ADMIN_API_TOKEN not set — synth E2E cannot run"
              echo "::warning::Set it at Settings → Secrets and Variables → Actions"
              exit 0
            fi
            echo "::error::CP_STAGING_ADMIN_API_TOKEN secret missing — synth E2E cannot run"
            echo "::error::Set it at Settings → Secrets and Variables → Actions; pull from staging-CP's CP_ADMIN_API_TOKEN env in Railway."
            exit 1
          fi

      - name: Install required tools
        run: |
          # The script depends on jq + curl (already on ubuntu-latest)
          # and python3 (likewise). Verify they're all present so we
          # fail fast on a runner image regression rather than mid-script.
          for cmd in jq curl python3; do
            command -v "$cmd" >/dev/null 2>&1 || {
              echo "::error::required tool '$cmd' not on PATH — runner image regression?"
              exit 1
            }
          done

      - name: Run synthetic E2E
        # The script handles its own teardown via EXIT trap; even on
        # failure (timeout, assertion), the org is deprovisioned and
        # leaks are reported. Exit code propagates from the script.
        run: |
          bash tests/e2e/test_staging_full_saas.sh

      - name: Failure summary
        # Runs only on failure. Adds a job summary so the workflow run
        # page shows a quick "what happened" instead of forcing readers
        # to scroll through script output.
        if: failure()
        run: |
          {
            echo "## Continuous synth E2E failed"
            echo ""
            echo "**Run ID:** ${{ github.run_id }}"
            echo "**Trigger:** ${{ github.event_name }}"
            echo "**Runtime:** ${E2E_RUNTIME}"
            echo "**Slug:** synth-${{ github.run_id }}"
            echo ""
            echo "### What this means"
            echo ""
            echo "Staging just regressed on a path that previously worked. Likely classes:"
            echo "- Schema mismatch between sender and receiver (#2345 class)"
            echo "- Deployment-pipeline gap (RFC #2312 / staging-tenant-image-stale class)"
            echo "- Vendor outage (Cloudflare, Railway, AWS, GHCR)"
            echo "- Staging-CP env var rotation"
            echo ""
            echo "### Next steps"
            echo ""
            echo "1. Check the script output above for the assertion that failed"
            echo "2. If it's a vendor outage, no action needed — next firing in ~20 min"
            echo "3. If it's a code regression, find the causing PR via \`git log\` against last green run and revert/fix"
            echo "4. Keep an eye on the next 1-2 firings — flake vs persistent fail differs in priority"
          } >> "$GITHUB_STEP_SUMMARY"
workspace-server/internal/db/architecture_test.go — new file (63 lines)
@@ -0,0 +1,63 @@
package db_test

// Architecture test (#2344): db is a leaf — DB pool + migrations + raw
// SQL helpers, no business-logic dependencies. The DB layer must be
// testable with sqlmock in isolation. If db starts importing handlers
// or provisioner, every db unit test would need to bring up that
// subsystem, and the layering becomes circular.
//
// If this test fails: you put business logic in the db package. Move
// it to a higher-tier package that imports db, not the reverse.

import (
    "go/parser"
    "go/token"
    "os"
    "path/filepath"
    "strings"
    "testing"
)

const moduleInternalPrefix = "github.com/Molecule-AI/molecule-monorepo/platform/internal/"

func TestDBHasNoInternalDependencies(t *testing.T) {
    t.Parallel()
    for path, file := range listImports(t, ".") {
        if strings.HasPrefix(path, moduleInternalPrefix) {
            t.Errorf(
                "db must not import other internal packages "+
                    "(found %q in %s) — db is the foundation layer and a "+
                    "reverse dep creates a cycle (everything imports db). "+
                    "See workspace-server/internal/db/architecture_test.go.",
                path, file,
            )
        }
    }
}

func listImports(t *testing.T, dir string) map[string]string {
    t.Helper()
    fset := token.NewFileSet()
    entries, err := os.ReadDir(dir)
    if err != nil {
        t.Fatalf("read %s: %v", dir, err)
    }
    out := make(map[string]string)
    for _, e := range entries {
        name := e.Name()
        if e.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") {
            continue
        }
        f, err := parser.ParseFile(fset, filepath.Join(dir, name), nil, parser.ImportsOnly)
        if err != nil {
            t.Fatalf("parse %s: %v", name, err)
        }
        for _, imp := range f.Imports {
            path := strings.Trim(imp.Path.Value, "\"")
            if _, seen := out[path]; !seen {
                out[path] = name
            }
        }
    }
    return out
}
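For illustration only (not part of this commit): the property the leaf rule protects is that a db-level query can be unit-tested with sqlmock alone, with no handlers or provisioner in the picture. A minimal sketch, assuming the standard `go-sqlmock` API; the test name and query are illustrative.

```go
package db_test

import (
    "testing"

    "github.com/DATA-DOG/go-sqlmock"
)

// Illustrative only: a db-layer query exercised against sqlmock in
// isolation — exactly what breaks once db imports higher-tier packages.
func TestLeafLayerQueriesWithSqlmockOnly(t *testing.T) {
    conn, mock, err := sqlmock.New()
    if err != nil {
        t.Fatalf("sqlmock: %v", err)
    }
    defer conn.Close()

    mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
        WithArgs("ws-1").
        WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("push"))

    var mode string
    if err := conn.QueryRow("SELECT delivery_mode FROM workspaces WHERE id = $1", "ws-1").Scan(&mode); err != nil {
        t.Fatalf("query: %v", err)
    }
    if mode != "push" {
        t.Fatalf("mode = %q, want push", mode)
    }
    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet expectations: %v", err)
    }
}
```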
@@ -21,6 +21,7 @@ import (

    "github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
    "github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
    "github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
    "github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
    "github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
    "github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
@@ -305,17 +306,54 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri

        return 0, nil, proxyErr
    }

-   agentURL, proxyErr := h.resolveAgentURL(ctx, workspaceID)
-   if proxyErr != nil {
-       return 0, nil, proxyErr
-   }

    // Normalize the JSON-RPC envelope BEFORE the poll-mode short-circuit
    // so the activity_logs entry carries the protocol method name (initialize,
    // message/send, etc.) — the polling agent uses that to dispatch the
    // request body to the right handler. Doing it here also means a
    // malformed payload fails the same way for push and poll callers
    // (consistent 400 instead of "queued garbage").
    normalizedBody, a2aMethod, proxyErr := normalizeA2APayload(body)
    if proxyErr != nil {
        return 0, nil, proxyErr
    }
    body = normalizedBody

    // #2339 PR 2 — poll-mode short-circuit. When the target workspace
    // is registered as delivery_mode=poll (e.g. an operator's laptop
    // running molecule-mcp-claude-channel), the platform does NOT
    // dispatch over HTTP — the agent has no public URL. Instead we record
    // the A2A request to activity_logs and the agent picks it up via
    // GET /activity?since_id= (PR 3).
    //
    // Returning here means we skip resolveAgentURL entirely (no SSRF check
    // needed — there's no URL to validate; no DNS lookup against potentially-
    // changing operator-side IPs) and skip the dispatch path completely
    // (no Do(), no maybeMarkContainerDead). The response is a synthetic
    // {status:"queued"} envelope so the caller (canvas, another workspace)
    // knows delivery is acknowledged but pending consumption.
    if lookupDeliveryMode(ctx, workspaceID) == models.DeliveryModePoll {
        if logActivity {
            h.logA2AReceiveQueued(ctx, workspaceID, callerID, body, a2aMethod)
        }
        respBody, marshalErr := json.Marshal(gin.H{
            "status":        "queued",
            "delivery_mode": models.DeliveryModePoll,
            "method":        a2aMethod,
        })
        if marshalErr != nil {
            return 0, nil, &proxyA2AError{
                Status:   http.StatusInternalServerError,
                Response: gin.H{"error": "failed to marshal poll-mode response"},
            }
        }
        return http.StatusOK, respBody, nil
    }

+   agentURL, proxyErr := h.resolveAgentURL(ctx, workspaceID)
+   if proxyErr != nil {
+       return 0, nil, proxyErr
+   }

    startTime := time.Now()
    resp, cancelFwd, err := h.dispatchA2A(ctx, workspaceID, agentURL, body, callerID)
    if cancelFwd != nil {
@@ -486,11 +524,54 @@ func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) {

    }

    // Ensure params.message.messageId exists (required by a2a-sdk)
    // AND v0.2→v0.3 compat (#2345): when the sender supplies
    // params.message.content (v0.2) instead of params.message.parts
    // (v0.3), wrap the content as a single text Part so the downstream
    // a2a-sdk's v0.3 Pydantic validator accepts the message.
    //
    // Pre-fix: Design Director silently dropped briefs whose sender
    // used the v0.2 shape — Pydantic rejected at parse time, the rejection
    // went only to logs, and the sender saw a happy 200/202.
    //
    // Reject loudly (HTTP 400) when neither content nor parts is present;
    // previously the SDK's own rejection happened post-handler-dispatch
    // and was invisible to the original sender.
    if params, ok := payload["params"].(map[string]interface{}); ok {
        if msg, ok := params["message"].(map[string]interface{}); ok {
            if _, hasID := msg["messageId"]; !hasID {
                msg["messageId"] = uuid.New().String()
            }
            _, hasParts := msg["parts"]
            rawContent, hasContent := msg["content"]
            if !hasParts {
                if hasContent {
                    switch v := rawContent.(type) {
                    case string:
                        msg["parts"] = []interface{}{
                            map[string]interface{}{"kind": "text", "text": v},
                        }
                    case []interface{}:
                        msg["parts"] = v
                    default:
                        return nil, "", &proxyA2AError{
                            Status: http.StatusBadRequest,
                            Response: gin.H{
                                "error": "invalid params.message.content type",
                                "hint":  "content must be a string (v0.2 compat) or omitted in favour of parts (v0.3)",
                            },
                        }
                    }
                    delete(msg, "content")
                } else {
                    return nil, "", &proxyA2AError{
                        Status: http.StatusBadRequest,
                        Response: gin.H{
                            "error": "params.message must contain either 'parts' (v0.3) or 'content' (v0.2 compat)",
                            "hint":  "v0.3 example: {\"parts\":[{\"kind\":\"text\",\"text\":\"...\"}]}",
                        },
                    }
                }
            }
        }
    }
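As a companion to the shim above, a minimal sender-side sketch (not part of this PR) that always emits the v0.3 shape so callers never depend on the compat path. The package and function names are illustrative; the field names (parts, kind, text, messageId) mirror the handler code above.

```go
package a2aclient

import (
    "encoding/json"

    "github.com/google/uuid"
)

// BuildMessageSendParams builds the params object for a message/send call
// in the v0.3 shape: content travels as parts and messageId is always set,
// so the server-side v0.2 compat shim never has to kick in.
func BuildMessageSendParams(text string) (json.RawMessage, error) {
    return json.Marshal(map[string]interface{}{
        "message": map[string]interface{}{
            "role":      "user",
            "messageId": uuid.New().String(),
            "parts": []interface{}{
                map[string]interface{}{"kind": "text", "text": text},
            },
        },
    })
}
```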
@@ -5,6 +5,7 @@ package handlers

import (
    "context"
    "database/sql"
    "encoding/json"
    "errors"
    "log"
@@ -13,6 +14,7 @@ import (
    "time"

    "github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
    "github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
    "github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
    "github.com/gin-gonic/gin"
)
@@ -376,6 +378,74 @@ func parseUsageFromA2AResponse(body []byte) (inputTokens, outputTokens int64) {

    return 0, 0
}

// lookupDeliveryMode returns the workspace's delivery_mode. On any DB
// error or missing row it returns DeliveryModePush — the fail-closed
// default. "Closed" here means "fall back to today's behavior (synchronous
// dispatch)" rather than "fall back to dropping the request silently into
// activity_logs where the agent might never see it." A poll-mode workspace
// that briefly reads as push will get its A2A request dispatched to the
// stored URL (or a 502 if no URL); a push-mode workspace that briefly
// reads as poll would get its request silently queued with no dispatch.
// The first failure is loud + recoverable; the second is silent.
//
// The function is intentionally lookup-only — it never mutates the row.
// The register handler (registry.go) is the only writer for delivery_mode.
//
// See #2339 PR 1 for the column + register-flow side; this is the
// proxy-side read used for the short-circuit in proxyA2ARequest.
func lookupDeliveryMode(ctx context.Context, workspaceID string) string {
    var mode sql.NullString
    err := db.DB.QueryRowContext(ctx,
        `SELECT delivery_mode FROM workspaces WHERE id = $1`, workspaceID,
    ).Scan(&mode)
    if err != nil {
        if !errors.Is(err, sql.ErrNoRows) {
            log.Printf("ProxyA2A: lookupDeliveryMode(%s) failed (%v) — defaulting to push", workspaceID, err)
        }
        return models.DeliveryModePush
    }
    if !mode.Valid || mode.String == "" {
        return models.DeliveryModePush
    }
    if !models.IsValidDeliveryMode(mode.String) {
        log.Printf("ProxyA2A: workspace %s has invalid delivery_mode=%q — defaulting to push", workspaceID, mode.String)
        return models.DeliveryModePush
    }
    return mode.String
}

// logA2AReceiveQueued records a poll-mode "queued" A2A receive into
// activity_logs. Same shape as logA2ASuccess but without ResponseBody
// (there is no response yet — the polling agent will produce one when
// it picks the request up). status="ok" because the request was
// successfully queued; the consume side reports its own outcome.
//
// The activity_logs row is what the polling agent's GET /activity?since_id=
// reads in PR 3 — that's how a poll-mode workspace receives inbound A2A
// without a public URL.
func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string) {
    var wsName string
    db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
    if wsName == "" {
        wsName = workspaceID
    }
    summary := a2aMethod + " → " + wsName + " (queued for poll)"
    go func(parent context.Context) {
        logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
        defer cancel()
        LogActivity(logCtx, h.broadcaster, ActivityParams{
            WorkspaceID:  workspaceID,
            ActivityType: "a2a_receive",
            SourceID:     nilIfEmpty(callerID),
            TargetID:     &workspaceID,
            Method:       &a2aMethod,
            Summary:      &summary,
            RequestBody:  json.RawMessage(body),
            Status:       "ok",
        })
    }(ctx)
}
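For context, a rough sketch of the consumer side that logA2AReceiveQueued feeds — the PR 3 polling agent tailing GET /activity?since_id= and dispatching queued a2a_receive rows. None of this is in the diff; the endpoint path, auth header, and row field names are assumptions stated here as such.

```go
// Hypothetical poll-mode consumer loop (PR 3 side — not part of this diff).
// The activity row fields mirror the ActivityParams written above, but the
// exact wire shape belongs to PR 3 and is assumed here.
package pollagent

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

type activityRow struct {
    ID           int64           `json:"id"`
    ActivityType string          `json:"activity_type"`
    Method       string          `json:"method"`
    RequestBody  json.RawMessage `json:"request_body"`
}

// Poll tails the workspace's activity feed and hands each queued A2A
// receive to handle. It never exposes a public URL — that is the point.
func Poll(baseURL, workspaceID, token string, handle func(activityRow)) error {
    var sinceID int64
    for {
        url := fmt.Sprintf("%s/workspaces/%s/activity?since_id=%d", baseURL, workspaceID, sinceID)
        req, err := http.NewRequest(http.MethodGet, url, nil)
        if err != nil {
            return err
        }
        req.Header.Set("Authorization", "Bearer "+token)
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            time.Sleep(5 * time.Second) // transient platform error — retry
            continue
        }
        var rows []activityRow
        decodeErr := json.NewDecoder(resp.Body).Decode(&rows)
        resp.Body.Close()
        if decodeErr != nil {
            return decodeErr
        }
        for _, row := range rows {
            if row.ActivityType == "a2a_receive" {
                handle(row) // dispatch on row.Method; reply via the agent's own path
            }
            if row.ID > sinceID {
                sinceID = row.ID
            }
        }
        time.Sleep(2 * time.Second)
    }
}
```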
// readUsageMap extracts input_tokens / output_tokens from the "usage" key of m.
// Returns (0, 0, false) when the key is absent or contains no non-zero values.
func readUsageMap(m map[string]json.RawMessage) (inputTokens, outputTokens int64, ok bool) {
@@ -11,6 +11,7 @@ import (
    "net/http"
    "net/http/httptest"
    "os"
    "strings"
    "testing"
    "time"

@@ -1137,7 +1138,10 @@ func TestNormalizeA2APayload_PreservesExistingMessageId(t *testing.T) {
}

func TestNormalizeA2APayload_MissingMethodReturnsEmpty(t *testing.T) {
-   raw := []byte(`{"params":{"message":{"role":"user"}}}`)
    // Method extraction returns empty string when method is absent,
    // regardless of message validity. Include parts: [] so the v0.2→v0.3
    // compat check (#2345) doesn't reject before method extraction.
+   raw := []byte(`{"params":{"message":{"role":"user","parts":[]}}}`)
    _, method, perr := normalizeA2APayload(raw)
    if perr != nil {
        t.Fatalf("unexpected error: %+v", perr)
@@ -1147,6 +1151,102 @@ func TestNormalizeA2APayload_MissingMethodReturnsEmpty(t *testing.T) {
    }
}

// --- v0.2 → v0.3 compat shim (#2345) ---

func TestNormalizeA2APayload_ConvertsV02StringContentToParts(t *testing.T) {
    raw := []byte(`{"method":"message/send","params":{"message":{"role":"user","content":"hello world"}}}`)
    out, _, perr := normalizeA2APayload(raw)
    if perr != nil {
        t.Fatalf("unexpected error: %+v", perr)
    }
    var parsed map[string]interface{}
    if err := json.Unmarshal(out, &parsed); err != nil {
        t.Fatalf("output not valid JSON: %v", err)
    }
    msg := parsed["params"].(map[string]interface{})["message"].(map[string]interface{})
    if _, stillHasContent := msg["content"]; stillHasContent {
        t.Error("v0.2 'content' field should be removed after conversion")
    }
    parts, ok := msg["parts"].([]interface{})
    if !ok || len(parts) != 1 {
        t.Fatalf("expected 1 part, got %v", msg["parts"])
    }
    part := parts[0].(map[string]interface{})
    if part["kind"] != "text" || part["text"] != "hello world" {
        t.Errorf("expected {kind:text, text:'hello world'}, got %v", part)
    }
}

func TestNormalizeA2APayload_ConvertsV02ListContentToParts(t *testing.T) {
    raw := []byte(`{"method":"message/send","params":{"message":{"role":"user","content":[{"kind":"text","text":"hi"}]}}}`)
    out, _, perr := normalizeA2APayload(raw)
    if perr != nil {
        t.Fatalf("unexpected error: %+v", perr)
    }
    var parsed map[string]interface{}
    _ = json.Unmarshal(out, &parsed)
    msg := parsed["params"].(map[string]interface{})["message"].(map[string]interface{})
    parts, ok := msg["parts"].([]interface{})
    if !ok || len(parts) != 1 {
        t.Fatalf("expected list preserved as parts, got %v", msg["parts"])
    }
}

func TestNormalizeA2APayload_PreservesV03Parts(t *testing.T) {
    raw := []byte(`{"method":"message/send","params":{"message":{"role":"user","parts":[{"kind":"text","text":"hi"}]}}}`)
    out, _, perr := normalizeA2APayload(raw)
    if perr != nil {
        t.Fatalf("unexpected error: %+v", perr)
    }
    var parsed map[string]interface{}
    _ = json.Unmarshal(out, &parsed)
    msg := parsed["params"].(map[string]interface{})["message"].(map[string]interface{})
    if _, hasContent := msg["content"]; hasContent {
        t.Error("did not expect content field in v0.3-shaped payload output")
    }
    parts := msg["parts"].([]interface{})
    if len(parts) != 1 {
        t.Errorf("expected 1 part preserved, got %d", len(parts))
    }
}

func TestNormalizeA2APayload_RejectsMessageWithNeitherContentNorParts(t *testing.T) {
    raw := []byte(`{"method":"message/send","params":{"message":{"role":"user","metadata":{}}}}`)
    _, _, perr := normalizeA2APayload(raw)
    if perr == nil {
        t.Fatal("expected error for message with neither content nor parts")
    }
    if perr.Status != http.StatusBadRequest {
        t.Errorf("expected 400, got %d", perr.Status)
    }
    errMsg, _ := perr.Response["error"].(string)
    if !strings.Contains(errMsg, "parts") || !strings.Contains(errMsg, "content") {
        t.Errorf("error message should mention both 'parts' and 'content', got: %q", errMsg)
    }
}

func TestNormalizeA2APayload_RejectsContentWithUnsupportedType(t *testing.T) {
    raw := []byte(`{"method":"message/send","params":{"message":{"role":"user","content":42}}}`)
    _, _, perr := normalizeA2APayload(raw)
    if perr == nil {
        t.Fatal("expected error for non-string non-list content")
    }
    if perr.Status != http.StatusBadRequest {
        t.Errorf("expected 400, got %d", perr.Status)
    }
}

func TestNormalizeA2APayload_NoMessageNoCheck(t *testing.T) {
    raw := []byte(`{"method":"tasks/list","params":{}}`)
    _, method, perr := normalizeA2APayload(raw)
    if perr != nil {
        t.Fatalf("unexpected error on params-message-absent payload: %+v", perr)
    }
    if method != "tasks/list" {
        t.Errorf("expected method=tasks/list, got %q", method)
    }
}
// --- resolveAgentURL direct unit tests ---

func TestResolveAgentURL_CacheHit(t *testing.T) {
@@ -1704,3 +1804,185 @@ func TestResolveAgentURL_HibernatedWorkspace_NullURLVariant(t *testing.T) {
        t.Errorf("unmet DB expectations: %v", err)
    }
}

// ==================== ProxyA2A — poll-mode short-circuit (#2339 PR 2) ====================

// TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch verifies the core
// invariant of #2339 PR 2: when delivery_mode=poll, ProxyA2A must NOT
// hit resolveAgentURL (which would SSRF-check or 502 on a missing URL)
// and must NOT dispatch over HTTP. It records the request to activity_logs
// and returns 200 {status:"queued"} instead.
//
// Without this short-circuit, the canvas chat fails for any workspace
// running molecule-mcp-claude-channel (operator's laptop, no public URL):
// resolveAgentURL would 502 on the missing URL and the polling agent
// would never see the inbound message. That's the bug PR 2 fixes.
func TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch(t *testing.T) {
    mock := setupTestDB(t)
    setupTestRedis(t)
    broadcaster := newTestBroadcaster()
    handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

    const wsID = "ws-poll-shortcircuit"

    // Budget check still runs (above the short-circuit) — affirms the
    // budget guard is mode-agnostic, which is correct: a poll-mode
    // workspace shouldn't burn unmetered platform CPU/storage either.
    expectBudgetCheck(mock, wsID)

    // lookupDeliveryMode SELECT — returns poll, triggering the short-circuit.
    // Note: NO ExpectQuery for `SELECT url, status FROM workspaces` (that's
    // resolveAgentURL's query) — the short-circuit must skip resolveAgentURL.
    mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll"))

    // Activity log: the queued receive (logA2AReceiveQueued in helpers.go).
    mock.ExpectExec("INSERT INTO activity_logs").
        WillReturnResult(sqlmock.NewResult(0, 1))

    w := httptest.NewRecorder()
    c, _ := gin.CreateTestContext(w)
    c.Params = gin.Params{{Key: "id", Value: wsID}}

    body := `{"jsonrpc":"2.0","id":"poll-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}`
    c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body))
    c.Request.Header.Set("Content-Type", "application/json")

    handler.ProxyA2A(c)

    time.Sleep(50 * time.Millisecond)

    if w.Code != http.StatusOK {
        t.Fatalf("expected 200 (queued), got %d: %s", w.Code, w.Body.String())
    }

    var resp map[string]interface{}
    if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
        t.Fatalf("response is not valid JSON: %v", err)
    }
    if resp["status"] != "queued" {
        t.Errorf("response.status = %v, want %q", resp["status"], "queued")
    }
    if resp["delivery_mode"] != "poll" {
        t.Errorf("response.delivery_mode = %v, want %q", resp["delivery_mode"], "poll")
    }
    if resp["method"] != "message/send" {
        t.Errorf("response.method = %v, want %q (the JSON-RPC method that was queued)", resp["method"], "message/send")
    }

    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet sqlmock expectations: %v", err)
    }
}

// TestProxyA2A_PushMode_NoShortCircuit verifies the symmetric contract:
// a push-mode workspace (default) is NOT affected by the new short-circuit.
// It still proceeds to resolveAgentURL + dispatch. Without this guard, a
// regression in lookupDeliveryMode could silently break the entire fleet.
func TestProxyA2A_PushMode_NoShortCircuit(t *testing.T) {
    mock := setupTestDB(t)
    mr := setupTestRedis(t)
    allowLoopbackForTest(t)
    broadcaster := newTestBroadcaster()
    handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

    const wsID = "ws-push-default"

    dispatched := false
    agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        dispatched = true
        w.Header().Set("Content-Type", "application/json")
        fmt.Fprint(w, `{"jsonrpc":"2.0","id":"1","result":{"status":"ok"}}`)
    }))
    defer agentServer.Close()

    mr.Set(fmt.Sprintf("ws:%s:url", wsID), agentServer.URL)
    expectBudgetCheck(mock, wsID)

    // lookupDeliveryMode returns "push" — short-circuit must NOT fire.
    mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("push"))

    mock.ExpectExec("INSERT INTO activity_logs").
        WillReturnResult(sqlmock.NewResult(0, 1))

    w := httptest.NewRecorder()
    c, _ := gin.CreateTestContext(w)
    c.Params = gin.Params{{Key: "id", Value: wsID}}

    body := `{"jsonrpc":"2.0","id":"push-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}`
    c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body))
    c.Request.Header.Set("Content-Type", "application/json")

    handler.ProxyA2A(c)

    time.Sleep(50 * time.Millisecond)

    if w.Code != http.StatusOK {
        t.Fatalf("expected 200 (dispatched), got %d: %s", w.Code, w.Body.String())
    }
    if !dispatched {
        t.Error("push-mode workspace: expected the agent server to receive the request, but it did not")
    }
    var resp map[string]interface{}
    if err := json.Unmarshal(w.Body.Bytes(), &resp); err == nil {
        if resp["status"] == "queued" {
            t.Error("push-mode response leaked queued envelope — short-circuit fired when it shouldn't have")
        }
    }

    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet sqlmock expectations: %v", err)
    }
}

// TestProxyA2A_PollMode_FailsClosedToPush verifies the safety contract:
// a DB error reading delivery_mode must default to push (the existing
// behavior), NOT poll. Failing closed to push means a poll-mode workspace
// briefly attempts a real dispatch — visible failure (502 / SSRF
// rejection / restart cascade), not a silent drop into activity_logs
// where the agent might never look. Loud > silent, recoverable > lost.
func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) {
    mock := setupTestDB(t)
    setupTestRedis(t) // empty Redis — forces resolveAgentURL DB lookup
    broadcaster := newTestBroadcaster()
    handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

    const wsID = "ws-mode-db-error"

    expectBudgetCheck(mock, wsID)

    // lookupDeliveryMode hits a transient DB error → must default to push.
    mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
        WithArgs(wsID).
        WillReturnError(sql.ErrConnDone)

    // Push path proceeds to resolveAgentURL — empty result → 502 path.
    mock.ExpectQuery("SELECT url, status FROM workspaces WHERE id =").
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"url", "status"}))

    w := httptest.NewRecorder()
    c, _ := gin.CreateTestContext(w)
    c.Params = gin.Params{{Key: "id", Value: wsID}}

    body := `{"jsonrpc":"2.0","id":"x","method":"message/send","params":{}}`
    c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body))
    c.Request.Header.Set("Content-Type", "application/json")

    handler.ProxyA2A(c)

    if w.Code == http.StatusOK {
        var resp map[string]interface{}
        _ = json.Unmarshal(w.Body.Bytes(), &resp)
        if resp["status"] == "queued" {
            t.Errorf("DB error on delivery_mode lookup silently queued the request — must fail-closed-to-push, got body: %s", w.Body.String())
        }
    }

    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet sqlmock expectations: %v", err)
    }
}
@@ -31,8 +31,9 @@ func TestWorkspaceCreate_WithParentID(t *testing.T) {
    parentID := "parent-ws-123"
    mock.ExpectBegin()
    // Default tier is 3 (Privileged) — see workspace.go create-handler comment.
    // delivery_mode defaults to "push" when payload omits it (#2339).
    mock.ExpectExec("INSERT INTO workspaces").
-       WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 3, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
+       WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 3, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
        WillReturnResult(sqlmock.NewResult(0, 1))
    mock.ExpectCommit()
    mock.ExpectExec("INSERT INTO canvas_layouts").
@@ -66,8 +67,9 @@ func TestWorkspaceCreate_ExplicitClaudeCodeRuntime(t *testing.T) {
    handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

    mock.ExpectBegin()
    // delivery_mode defaults to "push" when payload omits it (#2339).
    mock.ExpectExec("INSERT INTO workspaces").
-       WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
+       WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
        WillReturnResult(sqlmock.NewResult(0, 1))
    mock.ExpectCommit()
    mock.ExpectExec("INSERT INTO canvas_layouts").
@@ -288,7 +290,7 @@ func TestWorkspaceCreate_MaxConcurrentTasksOverride(t *testing.T) {

    mock.ExpectBegin()
    mock.ExpectExec("INSERT INTO workspaces").
-       WithArgs(sqlmock.AnyArg(), "Leader Agent", nil, 3, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 3).
+       WithArgs(sqlmock.AnyArg(), "Leader Agent", nil, 3, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 3, "push").
        WillReturnResult(sqlmock.NewResult(0, 1))
    mock.ExpectCommit()
    mock.ExpectExec("INSERT INTO canvas_layouts").
@@ -320,8 +322,13 @@ func TestRegister_ProvisionerURLPreserved(t *testing.T) {
    broadcaster := newTestBroadcaster()
    handler := NewRegistryHandler(broadcaster)

    // resolveDeliveryMode preflight — no row yet, default push (#2339).
    mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
        WithArgs("ws-prov").
        WillReturnError(sql.ErrNoRows)

    mock.ExpectExec("INSERT INTO workspaces").
-       WithArgs("ws-prov", "ws-prov", "http://localhost:8000", `{"name":"agent"}`).
+       WithArgs("ws-prov", "ws-prov", "http://localhost:8000", `{"name":"agent"}`, "push").
        WillReturnResult(sqlmock.NewResult(0, 1))

    // DB returns provisioner URL (127.0.0.1) — should take precedence over agent-reported URL
@@ -2,6 +2,7 @@ package handlers

import (
    "bytes"
    "database/sql"
    "encoding/json"
    "fmt"
    "net/http"
@@ -100,9 +101,14 @@ func TestRegisterHandler(t *testing.T) {
    broadcaster := newTestBroadcaster()
    handler := NewRegistryHandler(broadcaster)

    // resolveDeliveryMode preflight — no row yet, default push (#2339).
    mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
        WithArgs("ws-123").
        WillReturnError(sql.ErrNoRows)

    // Expect the upsert INSERT ... ON CONFLICT
    mock.ExpectExec("INSERT INTO workspaces").
-       WithArgs("ws-123", "ws-123", "http://localhost:8000", `{"name":"test"}`).
+       WithArgs("ws-123", "ws-123", "http://localhost:8000", `{"name":"test"}`, "push").
        WillReturnResult(sqlmock.NewResult(0, 1))

    // Expect the SELECT url query (for cache URL logic)
@@ -290,8 +296,9 @@ func TestWorkspaceCreate(t *testing.T) {

    // Expect workspace INSERT (uuid is dynamic, use AnyArg for id, runtime, awareness_namespace).
    // Default tier is 3 (Privileged) — see workspace.go create-handler comment.
    // delivery_mode defaults to "push" when payload omits it (#2339).
    mock.ExpectExec("INSERT INTO workspaces").
-       WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
+       WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
        WillReturnResult(sqlmock.NewResult(0, 1))

    // Expect transaction commit (no secrets in this payload)
@@ -2,6 +2,7 @@ package handlers

import (
    "context"
    "database/sql"
    "errors"
    "fmt"
    "log"
@@ -116,6 +117,41 @@ func (h *RegistryHandler) SetQueueDrainFunc(f QueueDrainFunc) {
// returned IP is checked against the blocklist. This closes the gap where
// an attacker could register agent.example.com pointing to 169.254.169.254.
//
// resolveDeliveryMode returns the EFFECTIVE delivery mode for a register
// call given the payload's explicit value (which may be empty) and the
// row's existing stored value (which may not exist yet on first
// registration).
//
// Resolution order:
//  1. payload value if non-empty (caller validated it's push/poll already)
//  2. existing row's delivery_mode if the row exists
//  3. "push" (the schema default — safe fallback for both new rows and
//     a row whose delivery_mode is somehow NULL despite the NOT NULL
//     CHECK constraint, which is forward-defensive only)
//
// Returns ("", err) only on a real DB error; sql.ErrNoRows is treated
// as "no row yet, default to push" — that's the first-register flow.
func (h *RegistryHandler) resolveDeliveryMode(ctx context.Context, workspaceID, payloadMode string) (string, error) {
    if payloadMode != "" {
        // Validated by IsValidDeliveryMode in the caller.
        return payloadMode, nil
    }
    var existing sql.NullString
    err := db.DB.QueryRowContext(ctx,
        `SELECT delivery_mode FROM workspaces WHERE id = $1`, workspaceID,
    ).Scan(&existing)
    if errors.Is(err, sql.ErrNoRows) {
        return models.DeliveryModePush, nil
    }
    if err != nil {
        return "", err
    }
    if existing.Valid && existing.String != "" {
        return existing.String, nil
    }
    return models.DeliveryModePush, nil
}

// Returns a non-nil error suitable for including in a 400 Bad Request response.
func validateAgentURL(rawURL string) error {
    if rawURL == "" {
@@ -221,15 +257,11 @@ func (h *RegistryHandler) Register(c *gin.Context) {

        return
    }

-   // C6: reject SSRF-capable URLs before persisting or caching them.
-   if err := validateAgentURL(payload.URL); err != nil {
-       c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
-       return
-   }
-
-   // C6: reject SSRF-capable URLs before persisting or caching them.
-   if err := validateAgentURL(payload.URL); err != nil {
-       c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
+   // Validate explicit delivery_mode if the agent declared one; empty is
+   // allowed and resolves to the row's existing value (or "push" default)
+   // in the upsert below. See #2339 for the poll/push split rationale.
+   if payload.DeliveryMode != "" && !models.IsValidDeliveryMode(payload.DeliveryMode) {
+       c.JSON(http.StatusBadRequest, gin.H{"error": "delivery_mode must be 'push' or 'poll'"})
        return
    }
@@ -250,9 +282,60 @@ func (h *RegistryHandler) Register(c *gin.Context) {

        return // 401 response already written by requireWorkspaceToken
    }

    // Resolve the EFFECTIVE delivery mode for THIS register call: the
    // payload's explicit value wins; falling back to the existing row's
    // stored value; falling back to push (the schema default). Done AFTER
    // the C18 token check so a hijack attempt fails on auth before we
    // reveal whether a workspace row exists at all (resolveDeliveryMode
    // would otherwise side-channel that via timing). #2339.
    effectiveMode, err := h.resolveDeliveryMode(ctx, payload.ID, payload.DeliveryMode)
    if err != nil {
        log.Printf("Registry register: resolveDeliveryMode failed for %s: %v", payload.ID, err)
        c.JSON(http.StatusInternalServerError, gin.H{"error": "registration failed"})
        return
    }

    // URL handling diverges by mode:
    //   push: URL is required and must pass the SSRF safety check —
    //   same as pre-#2339 behavior (the workspace must be reachable for
    //   the proxy to dispatch).
    //   poll: URL is optional and ignored when present. We don't even
    //   validate it because the platform never dispatches to it. Skipping
    //   validateAgentURL is intentional — a poll-mode workspace doesn't
    //   need a publicly-routable URL, so a localhost / private IP /
    //   missing URL is correct, not a mis-configuration.
    if effectiveMode == models.DeliveryModePush {
        if payload.URL == "" {
            c.JSON(http.StatusBadRequest, gin.H{"error": "url is required for push-mode workspaces"})
            return
        }
        if err := validateAgentURL(payload.URL); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
            return
        }
    }

    agentCardStr := string(payload.AgentCard)

    // Upsert workspace: update url, agent_card, status if already exists.
    // urlForUpsert: poll-mode workspaces don't need a URL. Empty input
    // becomes NULL via sql.NullString so the row's URL stays clean (the
    // CASE below also preserves an existing provisioner-set URL, which
    // matters for hybrid setups where a workspace was previously push
    // and is being re-registered as poll).
    var urlForUpsert sql.NullString
    if payload.URL != "" {
        urlForUpsert = sql.NullString{String: payload.URL, Valid: true}
    }

    // modeForUpsert: empty payload value means "keep what's already on the
    // row, or default to push for new rows". The COALESCE in the CASE on
    // the UPDATE branch and the EXCLUDED.delivery_mode on the INSERT branch
    // implement that. We pass effectiveMode (already resolved above) so
    // the row's mode is consistent with the URL-validation decision we
    // just made.
    modeForUpsert := effectiveMode

    // Upsert workspace: update url, agent_card, status, delivery_mode if already exists.
    // On INSERT (workspace not yet created via POST /workspaces), use ID as name placeholder.
    // Keep existing URL if provisioner already set a host-accessible one (starts with http://127.0.0.1).
    //
@@ -261,9 +344,9 @@ func (h *RegistryHandler) Register(c *gin.Context) {

    // the row. Without this guard, bulk deletes left tier-3 stragglers because
    // the last pre-teardown heartbeat flipped status back to 'online' after
    // Delete's UPDATE.
-   _, err := db.DB.ExecContext(ctx, `
-       INSERT INTO workspaces (id, name, url, agent_card, status, last_heartbeat_at)
-       VALUES ($1, $2, $3, $4::jsonb, 'online', now())
+   _, err = db.DB.ExecContext(ctx, `
+       INSERT INTO workspaces (id, name, url, agent_card, status, last_heartbeat_at, delivery_mode)
+       VALUES ($1, $2, $3, $4::jsonb, 'online', now(), $5)
        ON CONFLICT (id) DO UPDATE SET
            url = CASE
                WHEN workspaces.url LIKE 'http://127.0.0.1%' THEN workspaces.url
@@ -272,9 +355,10 @@ func (h *RegistryHandler) Register(c *gin.Context) {

            agent_card = EXCLUDED.agent_card,
            status = 'online',
            last_heartbeat_at = now(),
+           delivery_mode = EXCLUDED.delivery_mode,
            updated_at = now()
        WHERE workspaces.status IS DISTINCT FROM 'removed'
-   `, payload.ID, payload.ID, payload.URL, agentCardStr)
+   `, payload.ID, payload.ID, urlForUpsert, agentCardStr, modeForUpsert)
    if err != nil {
        log.Printf("Registry register error: %v (id=%s)", err, payload.ID)
        c.JSON(http.StatusInternalServerError, gin.H{"error": "registration failed"})
@@ -289,6 +373,12 @@ func (h *RegistryHandler) Register(c *gin.Context) {

    // Cache URL — prefer existing provisioner URL over agent-reported one.
    // The DB CASE already preserves provisioner URLs, so read from DB as source of truth
    // instead of adding a Redis round-trip on every registration.
    //
    // Poll-mode workspaces typically have no URL at all; skip the cache
    // writes entirely in that case so we don't poison the cache with an
    // empty string that another caller might mistake for "registered with
    // no URL" vs "not yet registered". The proxy short-circuits poll-mode
    // before consulting the URL cache anyway (see #2339 PR 2).
    cachedURL := payload.URL
    var dbURL string
    if err := db.DB.QueryRowContext(ctx, `SELECT url FROM workspaces WHERE id = $1`, payload.ID).Scan(&dbURL); err == nil {
@@ -296,20 +386,26 @@ func (h *RegistryHandler) Register(c *gin.Context) {

            cachedURL = dbURL
        }
    }
-   if err := db.CacheURL(ctx, payload.ID, cachedURL); err != nil {
-       log.Printf("Registry cache url error: %v", err)
+   if cachedURL != "" {
+       if err := db.CacheURL(ctx, payload.ID, cachedURL); err != nil {
+           log.Printf("Registry cache url error: %v", err)
+       }
    }

    // Cache agent-reported URL separately for workspace-to-workspace discovery
-   // (Docker containers can reach each other by hostname but not via host ports)
-   if err := db.CacheInternalURL(ctx, payload.ID, payload.URL); err != nil {
-       log.Printf("Registry cache internal url error: %v", err)
+   // (Docker containers can reach each other by hostname but not via host ports).
+   // Same skip-when-empty rule as above.
+   if payload.URL != "" {
+       if err := db.CacheInternalURL(ctx, payload.ID, payload.URL); err != nil {
+           log.Printf("Registry cache internal url error: %v", err)
+       }
    }

    // Broadcast WORKSPACE_ONLINE
    if err := h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_ONLINE", payload.ID, map[string]interface{}{
-       "url":        cachedURL,
-       "agent_card": payload.AgentCard,
+       "url":           cachedURL,
+       "agent_card":    payload.AgentCard,
+       "delivery_mode": effectiveMode,
    }); err != nil {
        log.Printf("Registry broadcast error: %v", err)
    }
@@ -324,7 +420,7 @@ func (h *RegistryHandler) Register(c *gin.Context) {

    // Legacy workspaces that registered before tokens existed have no
    // live token; they bootstrap one here on their next register call.
    // New workspaces always pass through this path on their first boot.
-   response := gin.H{"status": "registered"}
+   response := gin.H{"status": "registered", "delivery_mode": effectiveMode}
    if hasLive, hasLiveErr := wsauth.HasAnyLiveToken(ctx, db.DB, payload.ID); hasLiveErr == nil && !hasLive {
        token, tokErr := wsauth.IssueToken(ctx, db.DB, payload.ID)
        if tokErr != nil {
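To make the new register contract concrete, a minimal agent-side sketch (not part of this PR) of what a poll-mode workspace sends: delivery_mode is "poll" and url is omitted on purpose. The endpoint path and Authorization header are assumptions here; the payload fields match the tests that follow.

```go
package pollagent

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

// RegisterPollMode registers a workspace that has no public URL.
// delivery_mode:"poll" makes the platform queue inbound A2A to
// activity_logs instead of dispatching over HTTP, so no url is sent.
func RegisterPollMode(cpURL, workspaceID, token string) error {
    body, err := json.Marshal(map[string]interface{}{
        "id":            workspaceID,
        "delivery_mode": "poll",
        "agent_card":    map[string]string{"name": "laptop-agent"},
    })
    if err != nil {
        return err
    }
    req, err := http.NewRequest(http.MethodPost, cpURL+"/registry/register", bytes.NewReader(body))
    if err != nil {
        return err
    }
    req.Header.Set("Content-Type", "application/json")
    if token != "" {
        // Empty on the very first boot — the bootstrap path mints one.
        req.Header.Set("Authorization", "Bearer "+token)
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("register failed: %s", resp.Status)
    }
    return nil
}
```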
@@ -61,9 +61,17 @@ func TestRegister_DBError(t *testing.T) {
    broadcaster := newTestBroadcaster()
    handler := NewRegistryHandler(broadcaster)

    // resolveDeliveryMode SELECT — no row yet, so default "push".
    // (#2339) New preflight after C18 token check; HasAnyLiveToken's COUNT
    // query has no mock here and fails-open per requireWorkspaceToken's
    // DB-error handling, so the next DB hit is this delivery_mode lookup.
    mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
        WithArgs("ws-fail").
        WillReturnError(sql.ErrNoRows)

    // DB insert fails
    mock.ExpectExec("INSERT INTO workspaces").
-       WithArgs("ws-fail", "ws-fail", "http://localhost:8000", `{"name":"test"}`).
+       WithArgs("ws-fail", "ws-fail", "http://localhost:8000", `{"name":"test"}`, "push").
        WillReturnError(sql.ErrConnDone)

    w := httptest.NewRecorder()
@@ -579,10 +587,14 @@ func TestRegister_GuardAgainstResurrectingRemovedRow(t *testing.T) {
    broadcaster := newTestBroadcaster()
    handler := NewRegistryHandler(broadcaster)

    // resolveDeliveryMode preflight — no row yet, default push (#2339).
    mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
        WithArgs("ws-resurrect").
        WillReturnError(sql.ErrNoRows)
    // This regex-ish match requires the guard. If the handler ever drops
    // the clause the test fails because the emitted SQL won't match.
    mock.ExpectExec("ON CONFLICT.*WHERE workspaces.status IS DISTINCT FROM 'removed'").
-       WithArgs("ws-resurrect", "ws-resurrect", "http://localhost:8000", `{"name":"x"}`).
+       WithArgs("ws-resurrect", "ws-resurrect", "http://localhost:8000", `{"name":"x"}`, "push").
        WillReturnResult(sqlmock.NewResult(0, 0)) // 0 rows affected = correctly guarded
    mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
        WithArgs("ws-resurrect").
@@ -843,9 +855,14 @@ func TestRegister_C18_BootstrapAllowedNoTokens(t *testing.T) {
        WithArgs("ws-new").
        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))

    // resolveDeliveryMode — no row yet, default push (#2339).
    mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
        WithArgs("ws-new").
        WillReturnError(sql.ErrNoRows)

    // Workspace upsert proceeds normally.
    mock.ExpectExec("INSERT INTO workspaces").
-       WithArgs("ws-new", "ws-new", "http://localhost:9100", `{"name":"new-agent"}`).
+       WithArgs("ws-new", "ws-new", "http://localhost:9100", `{"name":"new-agent"}`, "push").
        WillReturnResult(sqlmock.NewResult(0, 1))

    mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
@@ -910,6 +927,11 @@ func TestRegister_ReturnsPlatformInboundSecret_RFC2312_PRF(t *testing.T) {
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))

    // resolveDeliveryMode — no row yet, default push (#2339).
    mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
        WithArgs(wsID).
        WillReturnError(sql.ErrNoRows)

    // Workspace upsert.
    mock.ExpectExec("INSERT INTO workspaces").
        WillReturnResult(sqlmock.NewResult(0, 1))
@@ -980,6 +1002,10 @@ func TestRegister_NoInboundSecret_OmitsField(t *testing.T) {
    mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
    // resolveDeliveryMode — no row yet, default push (#2339).
    mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
        WithArgs(wsID).
        WillReturnError(sql.ErrNoRows)
    mock.ExpectExec("INSERT INTO workspaces").WillReturnResult(sqlmock.NewResult(0, 1))
    mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
        WithArgs(wsID).
@@ -1063,9 +1089,14 @@ func TestRegister_DBErrorResponseIsOpaque(t *testing.T) {
        WithArgs("ws-errtest").
        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))

    // resolveDeliveryMode — no row yet, default push (#2339).
    mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
        WithArgs("ws-errtest").
        WillReturnError(sql.ErrNoRows)

    // DB upsert fails with a descriptive internal error.
    mock.ExpectExec("INSERT INTO workspaces").
-       WithArgs("ws-errtest", "ws-errtest", "http://localhost:9200", `{"name":"err-agent"}`).
+       WithArgs("ws-errtest", "ws-errtest", "http://localhost:9200", `{"name":"err-agent"}`, "push").
        WillReturnError(sql.ErrConnDone)

    w := httptest.NewRecorder()
@@ -1283,3 +1314,211 @@ func TestHeartbeat_MonthlySpend_Zero_NoUpdate(t *testing.T) {
        t.Errorf("monthly_spend=0 must not trigger a DB write for spend: %v", err)
    }
}

// ==================== Register — delivery_mode (#2339) ====================

// TestRegister_PollMode_AcceptsEmptyURL verifies the new contract:
// when delivery_mode=poll, URL is optional. A poll-mode workspace
// (e.g. operator's laptop running molecule-mcp-claude-channel) has
// no public URL to register, and we must NOT reject the registration
// for that. The proxy short-circuits poll-mode A2A in PR 2 — no URL
// needed there either.
func TestRegister_PollMode_AcceptsEmptyURL(t *testing.T) {
    mock := setupTestDB(t)
    setupTestRedis(t)
    broadcaster := newTestBroadcaster()
    handler := NewRegistryHandler(broadcaster)

    const wsID = "ws-poll-no-url"

    // Bootstrap path — no live tokens, so requireWorkspaceToken passes
    // without an Authorization header.
    mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))

    // resolveDeliveryMode: payload sets "poll" explicitly, so we should
    // NOT hit the DB lookup at all (the helper short-circuits when
    // payload value is non-empty). Asserted by the absence of an
    // ExpectQuery for SELECT delivery_mode here.

    // Upsert MUST run with empty URL (sql.NullString) and delivery_mode=poll.
    mock.ExpectExec("INSERT INTO workspaces").
        WithArgs(wsID, wsID, sql.NullString{}, `{"name":"poll-agent"}`, "poll").
        WillReturnResult(sqlmock.NewResult(0, 1))

    // SELECT url for cache: returns NULL/empty for poll-mode rows. The
    // handler skips the cache writes in that case (no CacheURL /
    // CacheInternalURL expectations).
    mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow(""))

    mock.ExpectExec("INSERT INTO structure_events").
        WillReturnResult(sqlmock.NewResult(0, 1))

    // Token issuance — first-register path.
    mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
    mock.ExpectExec("INSERT INTO workspace_auth_tokens").
        WillReturnResult(sqlmock.NewResult(1, 1))
    mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil))

    w := httptest.NewRecorder()
    c, _ := gin.CreateTestContext(w)
    c.Request = httptest.NewRequest("POST", "/registry/register",
        bytes.NewBufferString(`{"id":"`+wsID+`","delivery_mode":"poll","agent_card":{"name":"poll-agent"}}`))
    c.Request.Header.Set("Content-Type", "application/json")

    handler.Register(c)

    if w.Code != http.StatusOK {
        t.Fatalf("poll-mode + empty URL: expected 200, got %d: %s", w.Code, w.Body.String())
    }

    var resp map[string]interface{}
    if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
        t.Fatalf("response is not valid JSON: %v", err)
    }
    if resp["delivery_mode"] != "poll" {
        t.Errorf("response.delivery_mode = %v, want %q", resp["delivery_mode"], "poll")
    }
    // First-register must still mint a token regardless of delivery_mode.
    if resp["auth_token"] == nil {
        t.Error("expected auth_token in response (first-register path)")
    }

    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet expectations: %v", err)
    }
}

// TestRegister_PushMode_RejectsEmptyURL verifies the symmetric contract:
// push-mode (the default) still requires a URL. Skipping URL validation
// in poll-mode mustn't accidentally relax the push-mode invariant — that
// would silently break dispatch for the rest of the fleet.
func TestRegister_PushMode_RejectsEmptyURL(t *testing.T) {
    mock := setupTestDB(t)
    setupTestRedis(t)
    broadcaster := newTestBroadcaster()
    handler := NewRegistryHandler(broadcaster)

    // Bootstrap path through requireWorkspaceToken.
    mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
        WithArgs("ws-push-no-url").
        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))

    // resolveDeliveryMode: no row yet, defaults to push. The handler
    // then validates the URL — which is empty — and returns 400.
    mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
        WithArgs("ws-push-no-url").
        WillReturnError(sql.ErrNoRows)

    w := httptest.NewRecorder()
    c, _ := gin.CreateTestContext(w)
    c.Request = httptest.NewRequest("POST", "/registry/register",
        bytes.NewBufferString(`{"id":"ws-push-no-url","agent_card":{"name":"push-agent"}}`))
    c.Request.Header.Set("Content-Type", "application/json")

    handler.Register(c)

    if w.Code != http.StatusBadRequest {
        t.Errorf("push-mode + empty URL: expected 400, got %d: %s", w.Code, w.Body.String())
    }
    if !strings.Contains(w.Body.String(), "url is required") {
        t.Errorf("expected 'url is required' in error body, got: %s", w.Body.String())
    }
}

// TestRegister_InvalidDeliveryMode verifies that payloads declaring an
// unrecognised delivery_mode are rejected — defending against a typo
// silently becoming "push" and leaving the operator wondering why
// polling doesn't work.
func TestRegister_InvalidDeliveryMode(t *testing.T) {
    setupTestDB(t)
    setupTestRedis(t)
    broadcaster := newTestBroadcaster()
    handler := NewRegistryHandler(broadcaster)

    w := httptest.NewRecorder()
    c, _ := gin.CreateTestContext(w)
    c.Request = httptest.NewRequest("POST", "/registry/register",
        bytes.NewBufferString(`{"id":"ws-x","url":"http://localhost:8000","agent_card":{"name":"a"},"delivery_mode":"webhook"}`))
    c.Request.Header.Set("Content-Type", "application/json")

    handler.Register(c)

    if w.Code != http.StatusBadRequest {
        t.Errorf("invalid delivery_mode: expected 400, got %d: %s", w.Code, w.Body.String())
    }
    if !strings.Contains(w.Body.String(), "delivery_mode") {
        t.Errorf("expected error body to mention delivery_mode, got: %s", w.Body.String())
    }
}

// TestRegister_PollMode_PreservesExistingValue: when the row already
// has delivery_mode=poll and the payload doesn't set it, the resolved
// mode should be poll — i.e. "absent payload mode" must NOT silently
// downgrade an existing poll workspace to push. Ensures Telegram-style
// stability: mode is sticky once set.
func TestRegister_PollMode_PreservesExistingValue(t *testing.T) {
    mock := setupTestDB(t)
    setupTestRedis(t)
    broadcaster := newTestBroadcaster()
    handler := NewRegistryHandler(broadcaster)

    const wsID = "ws-existing-poll"

    mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))

    // resolveDeliveryMode: row exists with delivery_mode=poll.
    mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll"))

    // Upsert carries the resolved poll mode forward — even though
    // payload didn't restate it. URL still empty (poll-mode shape).
    mock.ExpectExec("INSERT INTO workspaces").
        WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll").
        WillReturnResult(sqlmock.NewResult(0, 1))
    mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow(""))
    mock.ExpectExec("INSERT INTO structure_events").
        WillReturnResult(sqlmock.NewResult(0, 1))
    mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
    mock.ExpectExec("INSERT INTO workspace_auth_tokens").
        WillReturnResult(sqlmock.NewResult(1, 1))
    mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil))

    w := httptest.NewRecorder()
    c, _ := gin.CreateTestContext(w)
    // No delivery_mode in payload — must inherit "poll" from the row.
    c.Request = httptest.NewRequest("POST", "/registry/register",
        bytes.NewBufferString(`{"id":"`+wsID+`","agent_card":{"name":"a"}}`))
    c.Request.Header.Set("Content-Type", "application/json")

    handler.Register(c)

    if w.Code != http.StatusOK {
        t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
    }
    var resp map[string]interface{}
    _ = json.Unmarshal(w.Body.Bytes(), &resp)
    if resp["delivery_mode"] != "poll" {
        t.Errorf("delivery_mode = %v, want %q (must inherit existing row's mode when payload absent)",
            resp["delivery_mode"], "poll")
    }
    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet expectations: %v", err)
    }
}

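The four registration tests above pin down a single precedence rule for delivery_mode: an explicit payload value wins (and skips the DB lookup), otherwise the workspace row's existing value is kept, otherwise the default is push. A minimal sketch of a helper with that behavior follows; the package name, signature, and error handling are assumptions, and only the precedence order and the queried statement come from the tests.

    // Hypothetical sketch — not the actual helper from this PR.
    package handlers

    import (
        "context"
        "database/sql"
        "errors"
        "fmt"

        "github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
    )

    func resolveDeliveryMode(ctx context.Context, db *sql.DB, workspaceID, payloadMode string) (string, error) {
        // 1. An explicit payload value wins and short-circuits the lookup
        //    (TestRegister_PollMode_AcceptsEmptyURL asserts no SELECT fires).
        if payloadMode != "" {
            if !models.IsValidDeliveryMode(payloadMode) {
                return "", fmt.Errorf("invalid delivery_mode %q", payloadMode)
            }
            return payloadMode, nil
        }

        // 2. Otherwise keep the row's existing mode, so an absent payload never
        //    downgrades a poll workspace (TestRegister_PollMode_PreservesExistingValue).
        var existing string
        err := db.QueryRowContext(ctx,
            "SELECT delivery_mode FROM workspaces WHERE id = $1", workspaceID).Scan(&existing)
        if errors.Is(err, sql.ErrNoRows) {
            // 3. No row yet: default to push, the pre-#2339 behavior.
            return models.DeliveryModePush, nil
        }
        if err != nil {
            return "", err
        }
        return existing, nil
    }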
@@ -224,11 +224,24 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
    if maxConcurrent <= 0 {
        maxConcurrent = models.DefaultMaxConcurrentTasks
    }
    // Insert workspace with runtime persisted in DB (inside transaction)
    // delivery_mode: explicit payload value (validated below), else default
    // to push (the schema default + pre-#2339 behavior). Validated here, not
    // in workspace_provision.go, so a bad value fails the create cleanly
    // instead of mid-provision after side effects.
    deliveryMode := payload.DeliveryMode
    if deliveryMode == "" {
        deliveryMode = models.DeliveryModePush
    }
    if !models.IsValidDeliveryMode(deliveryMode) {
        tx.Rollback() //nolint:errcheck
        c.JSON(http.StatusBadRequest, gin.H{"error": "delivery_mode must be 'push' or 'poll'"})
        return
    }
    // Insert workspace with runtime + delivery_mode persisted in DB (inside transaction)
    _, err := tx.ExecContext(ctx, `
        INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit, max_concurrent_tasks)
        VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10, $11)
    `, id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit, maxConcurrent)
        INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit, max_concurrent_tasks, delivery_mode)
        VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10, $11, $12)
    `, id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit, maxConcurrent, deliveryMode)
    if err != nil {
        tx.Rollback() //nolint:errcheck
        log.Printf("Create workspace error: %v", err)

@@ -152,6 +152,7 @@ func TestWorkspaceBudget_Create_WithLimit(t *testing.T) {
            "none",                           // workspace_access
            &budgetVal,                       // budget_limit ($10)
            models.DefaultMaxConcurrentTasks, // max_concurrent_tasks default
            "push",                           // delivery_mode default (#2339)
        ).
        WillReturnResult(sqlmock.NewResult(0, 1))
    mock.ExpectCommit()

@@ -155,7 +155,7 @@ func TestWorkspaceCreate_DBInsertError(t *testing.T) {
    // Transaction begins, workspace INSERT fails, transaction is rolled back.
    mock.ExpectBegin()
    mock.ExpectExec("INSERT INTO workspaces").
        WithArgs(sqlmock.AnyArg(), "Failing Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
        WithArgs(sqlmock.AnyArg(), "Failing Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
        WillReturnError(sql.ErrConnDone)
    mock.ExpectRollback()

@@ -188,7 +188,7 @@ func TestWorkspaceCreate_DefaultsApplied(t *testing.T) {
    // Expect workspace INSERT with defaulted tier=3 (Privileged — the
    // handler default in workspace.go), runtime="langgraph"
    mock.ExpectExec("INSERT INTO workspaces").
        WithArgs(sqlmock.AnyArg(), "Default Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
        WithArgs(sqlmock.AnyArg(), "Default Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
        WillReturnResult(sqlmock.NewResult(0, 1))
    mock.ExpectCommit()

@@ -239,7 +239,7 @@ func TestWorkspaceCreate_WithSecrets_Persists(t *testing.T) {

    mock.ExpectBegin()
    mock.ExpectExec("INSERT INTO workspaces").
        WithArgs(sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
        WithArgs(sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
        WillReturnResult(sqlmock.NewResult(0, 1))
    // Secret inserted inside the same transaction.
    mock.ExpectExec("INSERT INTO workspace_secrets").
@@ -1258,7 +1258,7 @@ runtime_config:
    mock.ExpectExec("INSERT INTO workspaces").
        WithArgs(
            sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes",
            sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
            sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
        WillReturnResult(sqlmock.NewResult(0, 1))
    mock.ExpectCommit()
    mock.ExpectExec("INSERT INTO canvas_layouts").
@@ -1315,7 +1315,7 @@ model: anthropic:claude-sonnet-4-5
    mock.ExpectExec("INSERT INTO workspaces").
        WithArgs(
            sqlmock.AnyArg(), "Legacy Agent", nil, 3, "langgraph",
            sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
            sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
        WillReturnResult(sqlmock.NewResult(0, 1))
    mock.ExpectCommit()
    mock.ExpectExec("INSERT INTO canvas_layouts").
@@ -1368,7 +1368,7 @@ runtime_config:
    mock.ExpectExec("INSERT INTO workspaces").
        WithArgs(
            sqlmock.AnyArg(), "Custom Hermes", nil, 3, "hermes",
            sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
            sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
        WillReturnResult(sqlmock.NewResult(0, 1))
    mock.ExpectCommit()
    mock.ExpectExec("INSERT INTO canvas_layouts").

63
workspace-server/internal/models/architecture_test.go
Normal file
@@ -0,0 +1,63 @@
package models_test

// Architecture test (#2344): models is a leaf — it carries pure type
// definitions and must not import any other internal/* package. Almost
// every package in workspace-server depends on models; if models grew a
// reverse dep, the import graph would cycle.
//
// If this test fails: you put behavior inside models. Move the behavior
// to whichever package actually owns it (handlers, provisioner, db, …)
// and have *that* package import models, not the reverse.

import (
    "go/parser"
    "go/token"
    "os"
    "path/filepath"
    "strings"
    "testing"
)

const moduleInternalPrefix = "github.com/Molecule-AI/molecule-monorepo/platform/internal/"

func TestModelsHasNoInternalDependencies(t *testing.T) {
    t.Parallel()
    for path, file := range listImports(t, ".") {
        if strings.HasPrefix(path, moduleInternalPrefix) {
            t.Errorf(
                "models must not import other internal packages "+
                    "(found %q in %s) — models is the pure-types leaf and any "+
                    "reverse dep creates an import cycle since most packages "+
                    "depend on models. See workspace-server/internal/models/architecture_test.go.",
                path, file,
            )
        }
    }
}

func listImports(t *testing.T, dir string) map[string]string {
    t.Helper()
    fset := token.NewFileSet()
    entries, err := os.ReadDir(dir)
    if err != nil {
        t.Fatalf("read %s: %v", dir, err)
    }
    out := make(map[string]string)
    for _, e := range entries {
        name := e.Name()
        if e.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") {
            continue
        }
        f, err := parser.ParseFile(fset, filepath.Join(dir, name), nil, parser.ImportsOnly)
        if err != nil {
            t.Fatalf("parse %s: %v", name, err)
        }
        for _, imp := range f.Imports {
            path := strings.Trim(imp.Path.Value, "\"")
            if _, seen := out[path]; !seen {
                out[path] = name
            }
        }
    }
    return out
}
@@ -32,16 +32,42 @@ type Workspace struct {
    UptimeSeconds int       `json:"uptime_seconds" db:"uptime_seconds"`
    CreatedAt     time.Time `json:"created_at" db:"created_at"`
    UpdatedAt     time.Time `json:"updated_at" db:"updated_at"`
    // DeliveryMode: "push" (synchronous to URL — default) or "poll" (logged
    // to activity_logs, agent reads via GET /activity?since_id=). See
    // migration 045 + RFC #2339.
    DeliveryMode string `json:"delivery_mode" db:"delivery_mode"`
    // Canvas layout fields (from JOIN)
    X         float64 `json:"x"`
    Y         float64 `json:"y"`
    Collapsed bool    `json:"collapsed"`
}

// Delivery mode constants. Matches the CHECK constraint in migration 045.
const (
    DeliveryModePush = "push"
    DeliveryModePoll = "poll"
)

// IsValidDeliveryMode reports whether s is one of the recognised
// delivery modes. Empty string is NOT valid here — callers must
// resolve the default ("push") before calling.
func IsValidDeliveryMode(s string) bool {
    return s == DeliveryModePush || s == DeliveryModePoll
}

type RegisterPayload struct {
    ID        string          `json:"id" binding:"required"`
    URL       string          `json:"url" binding:"required"`
    AgentCard json.RawMessage `json:"agent_card" binding:"required"`
    ID string `json:"id" binding:"required"`
    // URL is required for push-mode workspaces; optional / unused for
    // poll-mode (the platform never dispatches to it). The handler
    // enforces the conditional requirement based on the resolved
    // delivery mode (payload value, falling back to the row's existing
    // value, falling back to "push").
    URL       string          `json:"url"`
    AgentCard json.RawMessage `json:"agent_card" binding:"required"`
    // DeliveryMode is optional. Empty string means "keep the existing
    // value on the workspace row, or default to push for new rows".
    // When set, must be one of DeliveryModePush / DeliveryModePoll.
    DeliveryMode string `json:"delivery_mode,omitempty"`
}

type HeartbeatPayload struct {
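The URL comment above describes a requirement that a binding tag cannot express: the field is mandatory only once the delivery mode has resolved to push. A hedged sketch of a handler-side check follows; the helper name and placement are invented, and only the "url is required" wording and the 400 response are fixed by TestRegister_PushMode_RejectsEmptyURL.

    // Illustrative only — not the actual handler code from this PR.
    package handlers

    import (
        "errors"

        "github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
    )

    // validateRegisterURL keeps the pre-#2339 invariant for push mode
    // (URL mandatory) while letting poll-mode registrations omit it.
    func validateRegisterURL(resolvedMode string, p models.RegisterPayload) error {
        if resolvedMode == models.DeliveryModePush && p.URL == "" {
            return errors.New("url is required")
        }
        return nil
    }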
@@ -127,7 +153,11 @@ type CreateWorkspacePayload struct {
    Model    string `json:"model"`
    Runtime  string `json:"runtime"`  // "langgraph" (default), "claude-code", etc.
    External bool   `json:"external"` // true = no Docker container, just a registered URL
    URL      string `json:"url"`      // for external workspaces: the A2A endpoint URL
    URL      string `json:"url"`      // for external workspaces: the A2A endpoint URL (push mode only — omit for poll)
    // DeliveryMode: "push" (default) sends inbound A2A to URL synchronously;
    // "poll" records inbound to activity_logs for the agent to consume via
    // GET /activity?since_id=. Poll mode does not require a URL. See #2339.
    DeliveryMode    string  `json:"delivery_mode,omitempty"`
    WorkspaceDir    string  `json:"workspace_dir"`    // host path to mount as /workspace (empty = isolated volume)
    WorkspaceAccess string  `json:"workspace_access"` // "none" (default), "read_only", or "read_write" — see #65
    ParentID        *string `json:"parent_id"`

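For illustration, creating an external poll-mode workspace through POST /workspaces (the opt-in path named in migration 045) would send delivery_mode and omit url entirely. The base URL, name, and runtime values below are placeholders, not documented defaults.

    // Illustrative client call, not part of the PR.
    package main

    import (
        "bytes"
        "fmt"
        "net/http"
    )

    func main() {
        // Placeholder base URL for the workspace-server.
        platformURL := "http://localhost:8080"

        // delivery_mode and external come from CreateWorkspacePayload above;
        // note the absence of a url field for the poll-mode case.
        body := []byte(`{"name":"laptop-claude","external":true,"runtime":"claude-code","delivery_mode":"poll"}`)

        resp, err := http.Post(platformURL+"/workspaces", "application/json", bytes.NewReader(body))
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()
        fmt.Println("status:", resp.Status)
    }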
80
workspace-server/internal/provisioner/architecture_test.go
Normal file
@@ -0,0 +1,80 @@
package provisioner_test

// Architecture test (#2344): provisioner is below handlers/router in
// the layer hierarchy. handlers wires provisioner into HTTP routes;
// the reverse direction (provisioner reaching back into handlers or
// the router) creates a cycle and tangles infra-orchestration with
// transport.
//
// Note: provisioner CURRENTLY imports db (for the runtime-image
// lookup). That's a known coupling — see PR #2276 review thread on
// where image resolution should live. The narrower rule we enforce
// here is "no upward import to handlers/router," which is the harder
// rule to keep clean.
//
// If this test fails: you reached "up" the stack. Pass whatever you
// need from handlers down through a constructor parameter or a
// function-typed callback instead of importing the package directly.

import (
    "go/parser"
    "go/token"
    "os"
    "path/filepath"
    "strings"
    "testing"
)

const moduleInternalPrefix = "github.com/Molecule-AI/molecule-monorepo/platform/internal/"

var provisionerForbiddenImports = []string{
    moduleInternalPrefix + "handlers",
    moduleInternalPrefix + "router",
}

func TestProvisionerDoesNotImportUpstreamLayers(t *testing.T) {
    t.Parallel()
    imports := listImports(t, ".")
    for path, file := range imports {
        for _, forbidden := range provisionerForbiddenImports {
            if path == forbidden || strings.HasPrefix(path, forbidden+"/") {
                t.Errorf(
                    "provisioner must not import %q (found in %s) — "+
                        "provisioner sits below handlers/router in the layer "+
                        "hierarchy and a reverse dep creates a cycle. Pass "+
                        "what you need down via constructor params or "+
                        "function-typed callbacks. See workspace-server/internal/"+
                        "provisioner/architecture_test.go.",
                    path, file,
                )
            }
        }
    }
}

func listImports(t *testing.T, dir string) map[string]string {
    t.Helper()
    fset := token.NewFileSet()
    entries, err := os.ReadDir(dir)
    if err != nil {
        t.Fatalf("read %s: %v", dir, err)
    }
    out := make(map[string]string)
    for _, e := range entries {
        name := e.Name()
        if e.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") {
            continue
        }
        f, err := parser.ParseFile(fset, filepath.Join(dir, name), nil, parser.ImportsOnly)
        if err != nil {
            t.Fatalf("parse %s: %v", name, err)
        }
        for _, imp := range f.Imports {
            path := strings.Trim(imp.Path.Value, "\"")
            if _, seen := out[path]; !seen {
                out[path] = name
            }
        }
    }
    return out
}
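The constructor-parameter fix the comment prescribes usually looks like a function-typed callback injected from the layer above, so the provisioner never names handlers at all. The sketch below is illustrative of the pattern only; none of these identifiers are the actual provisioner API.

    // Illustrative pattern, assumed names throughout.
    package provisioner

    // NotifyFunc is whatever behavior the upper layer wants to run; the
    // provisioner only sees a function value, never the handlers package.
    type NotifyFunc func(workspaceID, event string)

    type Provisioner struct {
        notify NotifyFunc // injected at wiring time by the caller
    }

    // NewProvisioner accepts the callback instead of reaching up the stack.
    func NewProvisioner(notify NotifyFunc) *Provisioner {
        return &Provisioner{notify: notify}
    }

    func (p *Provisioner) Provision(workspaceID string) error {
        // ... infra orchestration elided ...
        if p.notify != nil {
            p.notify(workspaceID, "provisioned")
        }
        return nil
    }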
68
workspace-server/internal/wsauth/architecture_test.go
Normal file
@@ -0,0 +1,68 @@
package wsauth_test

// Architecture test (#2344): wsauth is a leaf package — it must not import
// any other internal/* package. The auth layer is below business logic;
// importing handlers, db, or any cousin package would force every wsauth
// test to spin up that subsystem, defeating the unit-test boundary that
// makes the auth code reviewable.
//
// If this test fails: you added an import that crosses a layer. Either
// move the dependency in the other direction (the consumer wires wsauth into
// itself), accept the boundary by inlining what you need, or — if the
// new coupling is genuinely correct — explicitly update this test with
// the new allowed import + a comment explaining why.

import (
    "go/parser"
    "go/token"
    "os"
    "path/filepath"
    "strings"
    "testing"
)

const moduleInternalPrefix = "github.com/Molecule-AI/molecule-monorepo/platform/internal/"

func TestWsauthHasNoInternalDependencies(t *testing.T) {
    t.Parallel()
    for path, file := range listImports(t, ".") {
        if strings.HasPrefix(path, moduleInternalPrefix) {
            t.Errorf(
                "wsauth must not import other internal packages "+
                    "(found %q in %s) — wsauth is the auth leaf and must stay "+
                    "unit-testable without spinning up other subsystems. "+
                    "See workspace-server/internal/wsauth/architecture_test.go for context.",
                path, file,
            )
        }
    }
}

// listImports returns import-path → first-file-where-seen for non-test
// .go files in dir. Used by every architecture_test.go in this tree.
func listImports(t *testing.T, dir string) map[string]string {
    t.Helper()
    fset := token.NewFileSet()
    entries, err := os.ReadDir(dir)
    if err != nil {
        t.Fatalf("read %s: %v", dir, err)
    }
    out := make(map[string]string)
    for _, e := range entries {
        name := e.Name()
        if e.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") {
            continue
        }
        f, err := parser.ParseFile(fset, filepath.Join(dir, name), nil, parser.ImportsOnly)
        if err != nil {
            t.Fatalf("parse %s: %v", name, err)
        }
        for _, imp := range f.Imports {
            path := strings.Trim(imp.Path.Value, "\"")
            if _, seen := out[path]; !seen {
                out[path] = name
            }
        }
    }
    return out
}
@@ -0,0 +1,8 @@
-- 045_workspaces_delivery_mode.down.sql
--
-- Drops the delivery_mode column. Code that still reads it after rollback
-- falls back to push mode (the pre-#2339 behavior), so this down migration
-- is only safe if the matching application code is rolled back in the same
-- release.

ALTER TABLE workspaces
    DROP COLUMN IF EXISTS delivery_mode;
@@ -0,0 +1,54 @@
-- 045_workspaces_delivery_mode.up.sql
--
-- Per-workspace declaration of how A2A traffic is delivered TO the workspace.
--
-- push (default, today's behavior)
--   Platform synchronously POSTs to workspaces.url and surfaces the response
--   to the caller. Requires a publicly-routable URL (SSRF gate at
--   a2a_proxy.go:455). Used by all hosted runtimes (claude-code, hermes,
--   etc.) where the platform's provisioner sets the URL at boot.
--
-- poll
--   Platform records the inbound A2A as an a2a_receive activity row and
--   returns 200 to the caller without dispatching. The agent client (e.g.
--   molecule-mcp-claude-channel) consumes the inbox via
--   GET /workspaces/:id/activity?since_id=… and replies via
--   POST /workspaces/:peer/a2a. NO URL required — works through every NAT,
--   firewall, and dev-laptop without a tunnel.
--
-- Why a column and not a derived signal:
--
--   * Mutual exclusivity matches Telegram's getUpdates / setWebhook
--     semantics — operationally cleaner than "both half-work because URL
--     is empty". Telegram explicitly rejects double-delivery; we now do
--     the same.
--   * The platform short-circuits BEFORE the SSRF check, so a poll-mode
--     workspace with a stale or missing URL never trips the silent-404
--     failure mode that motivated #2339.
--   * Push-mode is the safe default: every existing workspace continues
--     to work exactly as before with no migration of behavior.
--
-- Backwards compatibility:
--
--   * NOT NULL with DEFAULT 'push' — the ALTER backfills existing rows.
--   * Push-mode workspaces are unchanged: SSRF check still gates dispatch,
--     activity logging unchanged.
--   * Poll-mode opt-in only via POST /workspaces (delivery_mode='poll')
--     or POST /registry/register with delivery_mode='poll'. Cannot be
--     toggled after the fact via heartbeat — flipping mode mid-life is
--     ambiguous (in-flight pushes vs queued polls), so an explicit
--     PATCH /workspaces/:id/delivery_mode endpoint will be added later
--     if the use case appears.
--
-- Reverse plan: the .down.sql drops the column. Any short-circuit code
-- that reads delivery_mode would then hit a "column does not exist"
-- error; the intended fallback is push mode (the pre-#2339 behavior),
-- which is the safe degradation. Acceptable for a forward-only schema;
-- the down exists for migration tooling parity, not as a recommended
-- runtime path.

ALTER TABLE workspaces
    ADD COLUMN IF NOT EXISTS delivery_mode TEXT NOT NULL DEFAULT 'push'
    CHECK (delivery_mode IN ('push', 'poll'));

COMMENT ON COLUMN workspaces.delivery_mode IS
    'How inbound A2A is delivered: push (synchronous to workspaces.url) or poll (logged to activity_logs, agent reads via GET /activity?since_id=). See migration 045 + RFC #2339.';
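On the agent side, the poll flow described above reduces to a cursor loop: read new activity rows since the last seen id, handle any a2a_receive entries, and reply over POST /workspaces/:peer/a2a. The sketch below assumes a minimal JSON shape for the activity rows and omits authentication; both are assumptions, and only the two endpoints and the a2a_receive activity type come from the migration comment.

    // Illustrative poll-mode consumer, not the molecule-mcp-claude-channel code.
    package main

    import (
        "encoding/json"
        "fmt"
        "net/http"
        "time"
    )

    // Activity is an assumed minimal shape for one inbox row; the real
    // activity_logs payload is richer.
    type Activity struct {
        ID   int64           `json:"id"`
        Type string          `json:"type"`
        Body json.RawMessage `json:"body"`
    }

    func main() {
        base := "http://localhost:8080" // assumed workspace-server base URL
        wsID := "ws-poll-no-url"        // the poll-mode workspace
        var sinceID int64               // cursor: highest activity id seen so far

        for {
            url := fmt.Sprintf("%s/workspaces/%s/activity?since_id=%d", base, wsID, sinceID)
            resp, err := http.Get(url) // auth header omitted in this sketch
            if err != nil {
                time.Sleep(5 * time.Second)
                continue
            }
            var items []Activity
            _ = json.NewDecoder(resp.Body).Decode(&items)
            resp.Body.Close()

            for _, a := range items {
                if a.Type == "a2a_receive" {
                    // Handle the inbound message here, then reply via
                    // POST {base}/workspaces/{peer}/a2a (omitted).
                    fmt.Println("inbound a2a:", string(a.Body))
                }
                if a.ID > sinceID {
                    sinceID = a.ID
                }
            }
            time.Sleep(5 * time.Second)
        }
    }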