feat(workspaces): delivery_mode column + poll-mode register flow (#2339 PR 1)

Adds workspaces.delivery_mode (push, default | poll) and lets the register
handler accept poll-mode workspaces with no URL. This is the foundation
for the unified poll/push delivery design in #2339 — Telegram-getUpdates
shape for external runtimes that have no public URL.

What this PR does:

  - Migration 045: NOT NULL TEXT column, default 'push', CHECK constraint
    on the two valid values.
  - models.Workspace + RegisterPayload + CreateWorkspacePayload gain a
    DeliveryMode field. RegisterPayload.URL drops the `binding:"required"`
    tag — the handler now enforces it conditionally on the resolved mode.
  - Register handler: validates explicit delivery_mode if set; resolves
    effective mode (payload value, else stored row value, else push) AFTER
    the C18 token check; validates URL only when effective mode is push;
    persists delivery_mode in the upsert; returns it in the response;
    skips URL caching when payload.URL is empty.
  - CreateWorkspace handler: persists delivery_mode (defaults to push) in
    the same INSERT, validates it before any side effects.

What this PR does NOT do (intentional, follow-up PRs):

  - PR 2: short-circuit ProxyA2A for poll-mode workspaces (skip SSRF +
    dispatch, log a2a_receive activity, return 200).
  - PR 3: since_id cursor on GET /activity for lossless polling.
  - Plugin v0.2 in molecule-mcp-claude-channel: cursor persistence + a
    register helper that creates poll-mode workspaces.

Backwards compatibility: every existing workspace stays push-mode (schema
default) with identical behavior. New tests:
TestRegister_PollMode_AcceptsEmptyURL,
TestRegister_PushMode_RejectsEmptyURL,
TestRegister_InvalidDeliveryMode,
TestRegister_PollMode_PreservesExistingValue. All existing register +
create tests updated to expect the new delivery_mode column in the
INSERT args.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hongming Wang 2026-04-29 21:46:23 -07:00
parent 21ed74c76a
commit d5b00d6ac1
10 changed files with 501 additions and 46 deletions

View File

@ -31,8 +31,9 @@ func TestWorkspaceCreate_WithParentID(t *testing.T) {
parentID := "parent-ws-123"
mock.ExpectBegin()
// Default tier is 3 (Privileged) — see workspace.go create-handler comment.
// delivery_mode defaults to "push" when payload omits it (#2339).
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 3, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 3, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
mock.ExpectExec("INSERT INTO canvas_layouts").
@ -66,8 +67,9 @@ func TestWorkspaceCreate_ExplicitClaudeCodeRuntime(t *testing.T) {
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
mock.ExpectBegin()
// delivery_mode defaults to "push" when payload omits it (#2339).
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
mock.ExpectExec("INSERT INTO canvas_layouts").
@ -288,7 +290,7 @@ func TestWorkspaceCreate_MaxConcurrentTasksOverride(t *testing.T) {
mock.ExpectBegin()
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "Leader Agent", nil, 3, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 3).
WithArgs(sqlmock.AnyArg(), "Leader Agent", nil, 3, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 3, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
mock.ExpectExec("INSERT INTO canvas_layouts").
@ -320,8 +322,13 @@ func TestRegister_ProvisionerURLPreserved(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
// resolveDeliveryMode preflight — no row yet, default push (#2339).
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs("ws-prov").
WillReturnError(sql.ErrNoRows)
mock.ExpectExec("INSERT INTO workspaces").
WithArgs("ws-prov", "ws-prov", "http://localhost:8000", `{"name":"agent"}`).
WithArgs("ws-prov", "ws-prov", "http://localhost:8000", `{"name":"agent"}`, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
// DB returns provisioner URL (127.0.0.1) — should take precedence over agent-reported URL

View File

@ -2,6 +2,7 @@ package handlers
import (
"bytes"
"database/sql"
"encoding/json"
"fmt"
"net/http"
@ -100,9 +101,14 @@ func TestRegisterHandler(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
// resolveDeliveryMode preflight — no row yet, default push (#2339).
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs("ws-123").
WillReturnError(sql.ErrNoRows)
// Expect the upsert INSERT ... ON CONFLICT
mock.ExpectExec("INSERT INTO workspaces").
WithArgs("ws-123", "ws-123", "http://localhost:8000", `{"name":"test"}`).
WithArgs("ws-123", "ws-123", "http://localhost:8000", `{"name":"test"}`, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
// Expect the SELECT url query (for cache URL logic)
@ -290,8 +296,9 @@ func TestWorkspaceCreate(t *testing.T) {
// Expect workspace INSERT (uuid is dynamic, use AnyArg for id, runtime, awareness_namespace).
// Default tier is 3 (Privileged) — see workspace.go create-handler comment.
// delivery_mode defaults to "push" when payload omits it (#2339).
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
// Expect transaction commit (no secrets in this payload)

View File

@ -2,6 +2,7 @@ package handlers
import (
"context"
"database/sql"
"errors"
"fmt"
"log"
@ -116,6 +117,41 @@ func (h *RegistryHandler) SetQueueDrainFunc(f QueueDrainFunc) {
// returned IP is checked against the blocklist. This closes the gap where
// an attacker could register agent.example.com pointing to 169.254.169.254.
//
// resolveDeliveryMode returns the EFFECTIVE delivery mode for a register
// call given the payload's explicit value (which may be empty) and the
// row's existing stored value (which may not exist yet on first
// registration).
//
// Resolution order:
// 1. payload value if non-empty (caller validated it's push/poll already)
// 2. existing row's delivery_mode if the row exists
// 3. "push" (the schema default — safe fallback for both new rows and
// a row whose delivery_mode is somehow NULL despite the NOT NULL
// CHECK constraint, which is forward-defensive only)
//
// Returns ("", err) only on a real DB error; sql.ErrNoRows is treated
// as "no row yet, default to push" — that's the first-register flow.
func (h *RegistryHandler) resolveDeliveryMode(ctx context.Context, workspaceID, payloadMode string) (string, error) {
if payloadMode != "" {
// Validated by IsValidDeliveryMode in the caller.
return payloadMode, nil
}
var existing sql.NullString
err := db.DB.QueryRowContext(ctx,
`SELECT delivery_mode FROM workspaces WHERE id = $1`, workspaceID,
).Scan(&existing)
if errors.Is(err, sql.ErrNoRows) {
return models.DeliveryModePush, nil
}
if err != nil {
return "", err
}
if existing.Valid && existing.String != "" {
return existing.String, nil
}
return models.DeliveryModePush, nil
}
// Returns a non-nil error suitable for including in a 400 Bad Request response.
func validateAgentURL(rawURL string) error {
if rawURL == "" {
@ -221,15 +257,11 @@ func (h *RegistryHandler) Register(c *gin.Context) {
return
}
// C6: reject SSRF-capable URLs before persisting or caching them.
if err := validateAgentURL(payload.URL); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
}
// C6: reject SSRF-capable URLs before persisting or caching them.
if err := validateAgentURL(payload.URL); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
// Validate explicit delivery_mode if the agent declared one; empty is
// allowed and resolves to the row's existing value (or "push" default)
// in the upsert below. See #2339 for the poll/push split rationale.
if payload.DeliveryMode != "" && !models.IsValidDeliveryMode(payload.DeliveryMode) {
c.JSON(http.StatusBadRequest, gin.H{"error": "delivery_mode must be 'push' or 'poll'"})
return
}
@ -250,9 +282,60 @@ func (h *RegistryHandler) Register(c *gin.Context) {
return // 401 response already written by requireWorkspaceToken
}
// Resolve the EFFECTIVE delivery mode for THIS register call: the
// payload's explicit value wins; falling back to the existing row's
// stored value; falling back to push (the schema default). Done AFTER
// the C18 token check so a hijack attempt fails on auth before we
// reveal whether a workspace row exists at all (resolveDeliveryMode
// would otherwise side-channel that via timing). #2339.
effectiveMode, err := h.resolveDeliveryMode(ctx, payload.ID, payload.DeliveryMode)
if err != nil {
log.Printf("Registry register: resolveDeliveryMode failed for %s: %v", payload.ID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "registration failed"})
return
}
// URL handling diverges by mode:
// push: URL is required and must pass the SSRF safety check —
// same as pre-#2339 behavior (the workspace must be reachable for
// the proxy to dispatch).
// poll: URL is optional and ignored when present. We don't even
// validate it because the platform never dispatches to it. Skipping
// validateAgentURL is intentional — a poll-mode workspace doesn't
// need a publicly-routable URL, so a localhost / private IP /
// missing URL is correct, not a mis-configuration.
if effectiveMode == models.DeliveryModePush {
if payload.URL == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "url is required for push-mode workspaces"})
return
}
if err := validateAgentURL(payload.URL); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
}
agentCardStr := string(payload.AgentCard)
// Upsert workspace: update url, agent_card, status if already exists.
// urlForUpsert: poll-mode workspaces don't need a URL. Empty input
// becomes NULL via sql.NullString so the row's URL stays clean (the
// CASE below also preserves an existing provisioner-set URL, which
// matters for hybrid setups where a workspace was previously push
// and is being re-registered as poll).
var urlForUpsert sql.NullString
if payload.URL != "" {
urlForUpsert = sql.NullString{String: payload.URL, Valid: true}
}
// modeForUpsert: empty payload value means "keep what's already on the
// row, or default to push for new rows". The COALESCE in the CASE on
// the UPDATE branch and the EXCLUDED.delivery_mode on the INSERT branch
// implement that. We pass effectiveMode (already resolved above) so
// the row's mode is consistent with the URL-validation decision we
// just made.
modeForUpsert := effectiveMode
// Upsert workspace: update url, agent_card, status, delivery_mode if already exists.
// On INSERT (workspace not yet created via POST /workspaces), use ID as name placeholder.
// Keep existing URL if provisioner already set a host-accessible one (starts with http://127.0.0.1).
//
@ -261,9 +344,9 @@ func (h *RegistryHandler) Register(c *gin.Context) {
// the row. Without this guard, bulk deletes left tier-3 stragglers because
// the last pre-teardown heartbeat flipped status back to 'online' after
// Delete's UPDATE.
_, err := db.DB.ExecContext(ctx, `
INSERT INTO workspaces (id, name, url, agent_card, status, last_heartbeat_at)
VALUES ($1, $2, $3, $4::jsonb, 'online', now())
_, err = db.DB.ExecContext(ctx, `
INSERT INTO workspaces (id, name, url, agent_card, status, last_heartbeat_at, delivery_mode)
VALUES ($1, $2, $3, $4::jsonb, 'online', now(), $5)
ON CONFLICT (id) DO UPDATE SET
url = CASE
WHEN workspaces.url LIKE 'http://127.0.0.1%' THEN workspaces.url
@ -272,9 +355,10 @@ func (h *RegistryHandler) Register(c *gin.Context) {
agent_card = EXCLUDED.agent_card,
status = 'online',
last_heartbeat_at = now(),
delivery_mode = EXCLUDED.delivery_mode,
updated_at = now()
WHERE workspaces.status IS DISTINCT FROM 'removed'
`, payload.ID, payload.ID, payload.URL, agentCardStr)
`, payload.ID, payload.ID, urlForUpsert, agentCardStr, modeForUpsert)
if err != nil {
log.Printf("Registry register error: %v (id=%s)", err, payload.ID)
c.JSON(http.StatusInternalServerError, gin.H{"error": "registration failed"})
@ -289,6 +373,12 @@ func (h *RegistryHandler) Register(c *gin.Context) {
// Cache URL — prefer existing provisioner URL over agent-reported one.
// The DB CASE already preserves provisioner URLs, so read from DB as source of truth
// instead of adding a Redis round-trip on every registration.
//
// Poll-mode workspaces typically have no URL at all; skip the cache
// writes entirely in that case so we don't poison the cache with an
// empty string that another caller might mistake for "registered with
// no URL" vs "not yet registered". The proxy short-circuits poll-mode
// before consulting the URL cache anyway (see #2339 PR 2).
cachedURL := payload.URL
var dbURL string
if err := db.DB.QueryRowContext(ctx, `SELECT url FROM workspaces WHERE id = $1`, payload.ID).Scan(&dbURL); err == nil {
@ -296,20 +386,26 @@ func (h *RegistryHandler) Register(c *gin.Context) {
cachedURL = dbURL
}
}
if err := db.CacheURL(ctx, payload.ID, cachedURL); err != nil {
log.Printf("Registry cache url error: %v", err)
if cachedURL != "" {
if err := db.CacheURL(ctx, payload.ID, cachedURL); err != nil {
log.Printf("Registry cache url error: %v", err)
}
}
// Cache agent-reported URL separately for workspace-to-workspace discovery
// (Docker containers can reach each other by hostname but not via host ports)
if err := db.CacheInternalURL(ctx, payload.ID, payload.URL); err != nil {
log.Printf("Registry cache internal url error: %v", err)
// (Docker containers can reach each other by hostname but not via host ports).
// Same skip-when-empty rule as above.
if payload.URL != "" {
if err := db.CacheInternalURL(ctx, payload.ID, payload.URL); err != nil {
log.Printf("Registry cache internal url error: %v", err)
}
}
// Broadcast WORKSPACE_ONLINE
if err := h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_ONLINE", payload.ID, map[string]interface{}{
"url": cachedURL,
"agent_card": payload.AgentCard,
"url": cachedURL,
"agent_card": payload.AgentCard,
"delivery_mode": effectiveMode,
}); err != nil {
log.Printf("Registry broadcast error: %v", err)
}
@ -324,7 +420,7 @@ func (h *RegistryHandler) Register(c *gin.Context) {
// Legacy workspaces that registered before tokens existed have no
// live token; they bootstrap one here on their next register call.
// New workspaces always pass through this path on their first boot.
response := gin.H{"status": "registered"}
response := gin.H{"status": "registered", "delivery_mode": effectiveMode}
if hasLive, hasLiveErr := wsauth.HasAnyLiveToken(ctx, db.DB, payload.ID); hasLiveErr == nil && !hasLive {
token, tokErr := wsauth.IssueToken(ctx, db.DB, payload.ID)
if tokErr != nil {

View File

@ -61,9 +61,17 @@ func TestRegister_DBError(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
// resolveDeliveryMode SELECT — no row yet, so default "push".
// (#2339) New preflight after C18 token check; HasAnyLiveToken's COUNT
// query has no mock here and fails-open per requireWorkspaceToken's
// DB-error handling, so the next DB hit is this delivery_mode lookup.
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs("ws-fail").
WillReturnError(sql.ErrNoRows)
// DB insert fails
mock.ExpectExec("INSERT INTO workspaces").
WithArgs("ws-fail", "ws-fail", "http://localhost:8000", `{"name":"test"}`).
WithArgs("ws-fail", "ws-fail", "http://localhost:8000", `{"name":"test"}`, "push").
WillReturnError(sql.ErrConnDone)
w := httptest.NewRecorder()
@ -579,10 +587,14 @@ func TestRegister_GuardAgainstResurrectingRemovedRow(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
// resolveDeliveryMode preflight — no row yet, default push (#2339).
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs("ws-resurrect").
WillReturnError(sql.ErrNoRows)
// This regex-ish match requires the guard. If the handler ever drops
// the clause the test fails because the emitted SQL won't match.
mock.ExpectExec("ON CONFLICT.*WHERE workspaces.status IS DISTINCT FROM 'removed'").
WithArgs("ws-resurrect", "ws-resurrect", "http://localhost:8000", `{"name":"x"}`).
WithArgs("ws-resurrect", "ws-resurrect", "http://localhost:8000", `{"name":"x"}`, "push").
WillReturnResult(sqlmock.NewResult(0, 0)) // 0 rows affected = correctly guarded
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
WithArgs("ws-resurrect").
@ -843,9 +855,14 @@ func TestRegister_C18_BootstrapAllowedNoTokens(t *testing.T) {
WithArgs("ws-new").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// resolveDeliveryMode — no row yet, default push (#2339).
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs("ws-new").
WillReturnError(sql.ErrNoRows)
// Workspace upsert proceeds normally.
mock.ExpectExec("INSERT INTO workspaces").
WithArgs("ws-new", "ws-new", "http://localhost:9100", `{"name":"new-agent"}`).
WithArgs("ws-new", "ws-new", "http://localhost:9100", `{"name":"new-agent"}`, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
@ -910,6 +927,11 @@ func TestRegister_ReturnsPlatformInboundSecret_RFC2312_PRF(t *testing.T) {
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// resolveDeliveryMode — no row yet, default push (#2339).
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs(wsID).
WillReturnError(sql.ErrNoRows)
// Workspace upsert.
mock.ExpectExec("INSERT INTO workspaces").
WillReturnResult(sqlmock.NewResult(0, 1))
@ -980,6 +1002,10 @@ func TestRegister_NoInboundSecret_OmitsField(t *testing.T) {
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// resolveDeliveryMode — no row yet, default push (#2339).
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs(wsID).
WillReturnError(sql.ErrNoRows)
mock.ExpectExec("INSERT INTO workspaces").WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
WithArgs(wsID).
@ -1063,9 +1089,14 @@ func TestRegister_DBErrorResponseIsOpaque(t *testing.T) {
WithArgs("ws-errtest").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// resolveDeliveryMode — no row yet, default push (#2339).
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs("ws-errtest").
WillReturnError(sql.ErrNoRows)
// DB upsert fails with a descriptive internal error.
mock.ExpectExec("INSERT INTO workspaces").
WithArgs("ws-errtest", "ws-errtest", "http://localhost:9200", `{"name":"err-agent"}`).
WithArgs("ws-errtest", "ws-errtest", "http://localhost:9200", `{"name":"err-agent"}`, "push").
WillReturnError(sql.ErrConnDone)
w := httptest.NewRecorder()
@ -1283,3 +1314,211 @@ func TestHeartbeat_MonthlySpend_Zero_NoUpdate(t *testing.T) {
t.Errorf("monthly_spend=0 must not trigger a DB write for spend: %v", err)
}
}
// ==================== Register — delivery_mode (#2339) ====================
// TestRegister_PollMode_AcceptsEmptyURL verifies the new contract:
// when delivery_mode=poll, URL is optional. A poll-mode workspace
// (e.g. operator's laptop running molecule-mcp-claude-channel) has
// no public URL to register, and we must NOT reject the registration
// for that. The proxy short-circuits poll-mode A2A in PR 2 — no URL
// needed there either.
func TestRegister_PollMode_AcceptsEmptyURL(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
const wsID = "ws-poll-no-url"
// Bootstrap path — no live tokens, so requireWorkspaceToken passes
// without an Authorization header.
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// resolveDeliveryMode: payload sets "poll" explicitly, so we should
// NOT hit the DB lookup at all (the helper short-circuits when
// payload value is non-empty). Asserted by the absence of an
// ExpectQuery for SELECT delivery_mode here.
// Upsert MUST run with empty URL (sql.NullString) and delivery_mode=poll.
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"poll-agent"}`, "poll").
WillReturnResult(sqlmock.NewResult(0, 1))
// SELECT url for cache: returns NULL/empty for poll-mode rows. The
// handler skips the cache writes in that case (no CacheURL /
// CacheInternalURL expectations).
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow(""))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
// Token issuance — first-register path.
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
mock.ExpectExec("INSERT INTO workspace_auth_tokens").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/registry/register",
bytes.NewBufferString(`{"id":"`+wsID+`","delivery_mode":"poll","agent_card":{"name":"poll-agent"}}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.Register(c)
if w.Code != http.StatusOK {
t.Fatalf("poll-mode + empty URL: expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("response is not valid JSON: %v", err)
}
if resp["delivery_mode"] != "poll" {
t.Errorf("response.delivery_mode = %v, want %q", resp["delivery_mode"], "poll")
}
// First-register must still mint a token regardless of delivery_mode.
if resp["auth_token"] == nil {
t.Error("expected auth_token in response (first-register path)")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestRegister_PushMode_RejectsEmptyURL verifies the symmetric contract:
// push-mode (the default) still requires a URL. Skipping URL validation
// in poll-mode mustn't accidentally relax the push-mode invariant — that
// would silently break dispatch for the rest of the fleet.
func TestRegister_PushMode_RejectsEmptyURL(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
// Bootstrap path through requireWorkspaceToken.
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs("ws-push-no-url").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// resolveDeliveryMode: no row yet, defaults to push. The handler
// then validates the URL — which is empty — and returns 400.
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs("ws-push-no-url").
WillReturnError(sql.ErrNoRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/registry/register",
bytes.NewBufferString(`{"id":"ws-push-no-url","agent_card":{"name":"push-agent"}}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.Register(c)
if w.Code != http.StatusBadRequest {
t.Errorf("push-mode + empty URL: expected 400, got %d: %s", w.Code, w.Body.String())
}
if !strings.Contains(w.Body.String(), "url is required") {
t.Errorf("expected 'url is required' in error body, got: %s", w.Body.String())
}
}
// TestRegister_InvalidDeliveryMode rejects payloads that declare an
// unrecognised delivery_mode — defends against a typo silently
// becoming "push" and leaving the operator wondering why polling
// doesn't work.
func TestRegister_InvalidDeliveryMode(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/registry/register",
bytes.NewBufferString(`{"id":"ws-x","url":"http://localhost:8000","agent_card":{"name":"a"},"delivery_mode":"webhook"}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.Register(c)
if w.Code != http.StatusBadRequest {
t.Errorf("invalid delivery_mode: expected 400, got %d: %s", w.Code, w.Body.String())
}
if !strings.Contains(w.Body.String(), "delivery_mode") {
t.Errorf("expected error body to mention delivery_mode, got: %s", w.Body.String())
}
}
// TestRegister_PollMode_PreservesExistingValue: when the row already
// has delivery_mode=poll and the payload doesn't set it, the resolved
// mode should be poll — i.e. "absent payload mode" must NOT silently
// downgrade an existing poll workspace to push. Ensures Telegram-style
// stability: mode is sticky once set.
func TestRegister_PollMode_PreservesExistingValue(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
const wsID = "ws-existing-poll"
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// resolveDeliveryMode: row exists with delivery_mode=poll.
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll"))
// Upsert carries the resolved poll mode forward — even though
// payload didn't restate it. URL still empty (poll-mode shape).
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow(""))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
mock.ExpectExec("INSERT INTO workspace_auth_tokens").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
// No delivery_mode in payload — must inherit "poll" from the row.
c.Request = httptest.NewRequest("POST", "/registry/register",
bytes.NewBufferString(`{"id":"`+wsID+`","agent_card":{"name":"a"}}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.Register(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
_ = json.Unmarshal(w.Body.Bytes(), &resp)
if resp["delivery_mode"] != "poll" {
t.Errorf("delivery_mode = %v, want %q (must inherit existing row's mode when payload absent)",
resp["delivery_mode"], "poll")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}

View File

@ -224,11 +224,24 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
if maxConcurrent <= 0 {
maxConcurrent = models.DefaultMaxConcurrentTasks
}
// Insert workspace with runtime persisted in DB (inside transaction)
// delivery_mode: explicit payload value (validated below), else default
// to push (the schema default + pre-#2339 behavior). Validated here, not
// in workspace_provision.go, so a bad value fails the create cleanly
// instead of mid-provision after side effects.
deliveryMode := payload.DeliveryMode
if deliveryMode == "" {
deliveryMode = models.DeliveryModePush
}
if !models.IsValidDeliveryMode(deliveryMode) {
tx.Rollback() //nolint:errcheck
c.JSON(http.StatusBadRequest, gin.H{"error": "delivery_mode must be 'push' or 'poll'"})
return
}
// Insert workspace with runtime + delivery_mode persisted in DB (inside transaction)
_, err := tx.ExecContext(ctx, `
INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit, max_concurrent_tasks)
VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10, $11)
`, id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit, maxConcurrent)
INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit, max_concurrent_tasks, delivery_mode)
VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10, $11, $12)
`, id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit, maxConcurrent, deliveryMode)
if err != nil {
tx.Rollback() //nolint:errcheck
log.Printf("Create workspace error: %v", err)

View File

@ -152,6 +152,7 @@ func TestWorkspaceBudget_Create_WithLimit(t *testing.T) {
"none", // workspace_access
&budgetVal, // budget_limit ($10)
models.DefaultMaxConcurrentTasks, // max_concurrent_tasks default
"push", // delivery_mode default (#2339)
).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()

View File

@ -155,7 +155,7 @@ func TestWorkspaceCreate_DBInsertError(t *testing.T) {
// Transaction begins, workspace INSERT fails, transaction is rolled back.
mock.ExpectBegin()
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "Failing Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
WithArgs(sqlmock.AnyArg(), "Failing Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
WillReturnError(sql.ErrConnDone)
mock.ExpectRollback()
@ -188,7 +188,7 @@ func TestWorkspaceCreate_DefaultsApplied(t *testing.T) {
// Expect workspace INSERT with defaulted tier=3 (Privileged — the
// handler default in workspace.go), runtime="langgraph"
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "Default Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
WithArgs(sqlmock.AnyArg(), "Default Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
@ -239,7 +239,7 @@ func TestWorkspaceCreate_WithSecrets_Persists(t *testing.T) {
mock.ExpectBegin()
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
WithArgs(sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
// Secret inserted inside the same transaction.
mock.ExpectExec("INSERT INTO workspace_secrets").
@ -1258,7 +1258,7 @@ runtime_config:
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(
sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes",
sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
mock.ExpectExec("INSERT INTO canvas_layouts").
@ -1315,7 +1315,7 @@ model: anthropic:claude-sonnet-4-5
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(
sqlmock.AnyArg(), "Legacy Agent", nil, 3, "langgraph",
sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
mock.ExpectExec("INSERT INTO canvas_layouts").
@ -1368,7 +1368,7 @@ runtime_config:
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(
sqlmock.AnyArg(), "Custom Hermes", nil, 3, "hermes",
sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks).
sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
mock.ExpectExec("INSERT INTO canvas_layouts").

View File

@ -32,16 +32,42 @@ type Workspace struct {
UptimeSeconds int `json:"uptime_seconds" db:"uptime_seconds"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
// DeliveryMode: "push" (synchronous to URL — default) or "poll" (logged
// to activity_logs, agent reads via GET /activity?since_id=). See
// migration 045 + RFC #2339.
DeliveryMode string `json:"delivery_mode" db:"delivery_mode"`
// Canvas layout fields (from JOIN)
X float64 `json:"x"`
Y float64 `json:"y"`
Collapsed bool `json:"collapsed"`
}
// Delivery mode constants. Matches the CHECK constraint in migration 045.
const (
DeliveryModePush = "push"
DeliveryModePoll = "poll"
)
// IsValidDeliveryMode reports whether s is one of the recognised
// delivery modes. Empty string is NOT valid here — callers must
// resolve the default ("push") before calling.
func IsValidDeliveryMode(s string) bool {
return s == DeliveryModePush || s == DeliveryModePoll
}
type RegisterPayload struct {
ID string `json:"id" binding:"required"`
URL string `json:"url" binding:"required"`
AgentCard json.RawMessage `json:"agent_card" binding:"required"`
ID string `json:"id" binding:"required"`
// URL is required for push-mode workspaces; optional / unused for
// poll-mode (the platform never dispatches to it). The handler
// enforces the conditional requirement based on the resolved
// delivery mode (payload value, falling back to the row's existing
// value, falling back to "push").
URL string `json:"url"`
AgentCard json.RawMessage `json:"agent_card" binding:"required"`
// DeliveryMode is optional. Empty string means "keep the existing
// value on the workspace row, or default to push for new rows".
// When set, must be one of DeliveryModePush / DeliveryModePoll.
DeliveryMode string `json:"delivery_mode,omitempty"`
}
type HeartbeatPayload struct {
@ -127,7 +153,11 @@ type CreateWorkspacePayload struct {
Model string `json:"model"`
Runtime string `json:"runtime"` // "langgraph" (default), "claude-code", etc.
External bool `json:"external"` // true = no Docker container, just a registered URL
URL string `json:"url"` // for external workspaces: the A2A endpoint URL
URL string `json:"url"` // for external workspaces: the A2A endpoint URL (push mode only — omit for poll)
// DeliveryMode: "push" (default) sends inbound A2A to URL synchronously;
// "poll" records inbound to activity_logs for the agent to consume via
// GET /activity?since_id=. Poll mode does not require a URL. See #2339.
DeliveryMode string `json:"delivery_mode,omitempty"`
WorkspaceDir string `json:"workspace_dir"` // host path to mount as /workspace (empty = isolated volume)
WorkspaceAccess string `json:"workspace_access"` // "none" (default), "read_only", or "read_write" — see #65
ParentID *string `json:"parent_id"`

View File

@ -0,0 +1,8 @@
-- 045_workspaces_delivery_mode.down.sql
--
-- Drops the delivery_mode column. Any code reading it after rollback falls
-- back to push mode (the pre-#2339 behavior), so this is forward-only-safe
-- only if the matching application code is rolled back in the same release.
ALTER TABLE workspaces
DROP COLUMN IF EXISTS delivery_mode;

View File

@ -0,0 +1,54 @@
-- 045_workspaces_delivery_mode.up.sql
--
-- Per-workspace declaration of how A2A traffic is delivered TO the workspace.
--
-- push (default, today's behavior)
-- Platform synchronously POSTs to workspaces.url and surfaces the response
-- to the caller. Requires a publicly-routable URL (SSRF gate at
-- a2a_proxy.go:455). Used by all hosted runtimes (claude-code, hermes,
-- etc.) where the platform's provisioner sets the URL at boot.
--
-- poll
-- Platform records the inbound A2A as an a2a_receive activity row and
-- returns 200 to the caller without dispatching. The agent client (e.g.
-- molecule-mcp-claude-channel) consumes the inbox via
-- GET /workspaces/:id/activity?since_id=… and replies via
-- POST /workspaces/:peer/a2a. NO URL required — works through every NAT,
-- firewall, and dev-laptop without a tunnel.
--
-- Why a column and not a derived signal:
--
-- * Mutual exclusivity matches Telegram's getUpdates / setWebhook
-- semantics — operationally cleaner than "both half-work because URL
-- is empty". Telegram explicitly rejects double-delivery; we now do
-- the same.
-- * The platform short-circuits BEFORE the SSRF check, so a poll-mode
-- workspace with a stale or missing URL never trips the silent-404
-- failure mode that motivated #2339.
-- * Push-mode is the safe default: every existing workspace continues
-- to work exactly as before with no migration of behavior.
--
-- Backwards compatibility:
--
-- * NOT NULL with DEFAULT 'push' — the ALTER backfills existing rows.
-- * Push-mode workspaces are unchanged: SSRF check still gates dispatch,
-- activity logging unchanged.
-- * Poll-mode opt-in only via POST /workspaces (delivery_mode='poll')
-- or POST /registry/register with delivery_mode='poll'. Cannot be
-- toggled after the fact via heartbeat — flipping mode mid-life is
-- ambiguous (in-flight pushes vs queued polls), so an explicit
-- PATCH /workspaces/:id/delivery_mode endpoint will be added later
-- if the use case appears.
--
-- Reverse plan: the .down.sql drops the column. Any short-circuit code
-- that reads delivery_mode would then hit a "column does not exist"
-- error — readers fall back to push mode (behaviour pre-2339), which is
-- the safe degradation. Acceptable for a forward-only schema; the down
-- exists for migration tooling parity, not as a recommended runtime path.
ALTER TABLE workspaces
ADD COLUMN IF NOT EXISTS delivery_mode TEXT NOT NULL DEFAULT 'push'
CHECK (delivery_mode IN ('push', 'poll'));
COMMENT ON COLUMN workspaces.delivery_mode IS
'How inbound A2A is delivered: push (synchronous to workspaces.url) or poll (logged to activity_logs, agent reads via GET /activity?since_id=). See migration 045 + RFC #2339.';