Merge pull request #2382 from Molecule-AI/auto/external-defaults-poll-and-awaiting

feat(external): default external runtime to poll-mode + awaiting_agent
This commit is contained in:
Hongming Wang 2026-04-30 14:43:03 +00:00 committed by GitHub
commit 7cb8b476ad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 196 additions and 30 deletions

View File

@ -323,7 +323,7 @@ func TestRegister_ProvisionerURLPreserved(t *testing.T) {
handler := NewRegistryHandler(broadcaster)
// resolveDeliveryMode preflight — no row yet, default push (#2339).
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs("ws-prov").
WillReturnError(sql.ErrNoRows)

View File

@ -102,7 +102,7 @@ func TestRegisterHandler(t *testing.T) {
handler := NewRegistryHandler(broadcaster)
// resolveDeliveryMode preflight — no row yet, default push (#2339).
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs("ws-123").
WillReturnError(sql.ErrNoRows)

View File

@ -125,21 +125,30 @@ func (h *RegistryHandler) SetQueueDrainFunc(f QueueDrainFunc) {
// Resolution order:
// 1. payload value if non-empty (caller validated it's push/poll already)
// 2. existing row's delivery_mode if the row exists
// 3. "push" (the schema default — safe fallback for both new rows and
// a row whose delivery_mode is somehow NULL despite the NOT NULL
// CHECK constraint, which is forward-defensive only)
// 3. "poll" if the existing row's runtime is "external" — most external
// operators run on a laptop without public HTTPS; poll is the
// no-public-URL path. This default flipped 2026-04-30 (issue #10
// in molecule-cli) when `molecule connect` shipped — push-mode
// stays available via explicit payload.delivery_mode="push" for
// VM/server operators who opt in.
// 4. "push" (the schema default — safe fallback for non-external
// runtimes whose row exists with NULL delivery_mode, which is
// forward-defensive only)
//
// Returns ("", err) only on a real DB error; sql.ErrNoRows is treated
// as "no row yet, default to push" — that's the first-register flow.
// as "no row yet, default to push" — that's the first-register flow,
// and at that point we don't know the runtime yet so push is the
// historical compatible default.
func (h *RegistryHandler) resolveDeliveryMode(ctx context.Context, workspaceID, payloadMode string) (string, error) {
if payloadMode != "" {
// Validated by IsValidDeliveryMode in the caller.
return payloadMode, nil
}
var existing sql.NullString
var runtime sql.NullString
err := db.DB.QueryRowContext(ctx,
`SELECT delivery_mode FROM workspaces WHERE id = $1`, workspaceID,
).Scan(&existing)
`SELECT delivery_mode, runtime FROM workspaces WHERE id = $1`, workspaceID,
).Scan(&existing, &runtime)
if errors.Is(err, sql.ErrNoRows) {
return models.DeliveryModePush, nil
}
@ -149,6 +158,9 @@ func (h *RegistryHandler) resolveDeliveryMode(ctx context.Context, workspaceID,
if existing.Valid && existing.String != "" {
return existing.String, nil
}
if runtime.Valid && runtime.String == "external" {
return models.DeliveryModePoll, nil
}
return models.DeliveryModePush, nil
}

View File

@ -65,7 +65,7 @@ func TestRegister_DBError(t *testing.T) {
// (#2339) New preflight after C18 token check; HasAnyLiveToken's COUNT
// query has no mock here and fails-open per requireWorkspaceToken's
// DB-error handling, so the next DB hit is this delivery_mode lookup.
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs("ws-fail").
WillReturnError(sql.ErrNoRows)
@ -588,7 +588,7 @@ func TestRegister_GuardAgainstResurrectingRemovedRow(t *testing.T) {
handler := NewRegistryHandler(broadcaster)
// resolveDeliveryMode preflight — no row yet, default push (#2339).
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs("ws-resurrect").
WillReturnError(sql.ErrNoRows)
// This regex-ish match requires the guard. If the handler ever drops
@ -856,7 +856,7 @@ func TestRegister_C18_BootstrapAllowedNoTokens(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// resolveDeliveryMode — no row yet, default push (#2339).
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs("ws-new").
WillReturnError(sql.ErrNoRows)
@ -928,7 +928,7 @@ func TestRegister_ReturnsPlatformInboundSecret_RFC2312_PRF(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// resolveDeliveryMode — no row yet, default push (#2339).
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs(wsID).
WillReturnError(sql.ErrNoRows)
@ -1014,7 +1014,7 @@ func TestRegister_NoInboundSecret_LazyHeals(t *testing.T) {
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs(wsID).
WillReturnError(sql.ErrNoRows)
mock.ExpectExec("INSERT INTO workspaces").WillReturnResult(sqlmock.NewResult(0, 1))
@ -1077,7 +1077,7 @@ func TestRegister_NoInboundSecret_LazyHealMintFailureOmitsField(t *testing.T) {
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs(wsID).
WillReturnError(sql.ErrNoRows)
mock.ExpectExec("INSERT INTO workspaces").WillReturnResult(sqlmock.NewResult(0, 1))
@ -1167,7 +1167,7 @@ func TestRegister_DBErrorResponseIsOpaque(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// resolveDeliveryMode — no row yet, default push (#2339).
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs("ws-errtest").
WillReturnError(sql.ErrNoRows)
@ -1490,7 +1490,7 @@ func TestRegister_PushMode_RejectsEmptyURL(t *testing.T) {
// resolveDeliveryMode: no row yet, defaults to push. The handler
// then validates the URL — which is empty — and returns 400.
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs("ws-push-no-url").
WillReturnError(sql.ErrNoRows)
@ -1554,9 +1554,9 @@ func TestRegister_PollMode_PreservesExistingValue(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// resolveDeliveryMode: row exists with delivery_mode=poll.
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll"))
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode", "runtime"}).AddRow("poll", "langgraph"))
// Upsert carries the resolved poll mode forward — even though
// payload didn't restate it. URL still empty (poll-mode shape).
@ -1599,3 +1599,131 @@ func TestRegister_PollMode_PreservesExistingValue(t *testing.T) {
t.Errorf("unmet expectations: %v", err)
}
}
// TestRegister_ExternalRuntime_DefaultsToPoll covers the 2026-04-30
// flip: a workspace with runtime='external' and an empty
// delivery_mode (existing or payload) defaults to poll instead of
// push. Rationale: external workspaces are operator-driven (laptops,
// no public HTTPS) — push-mode would hard-fail at register time
// because validateAgentURL rejects RFC1918 / loopback. The CLI
// (`molecule connect`) registers without --mode and expects this
// default to land it in poll-mode.
func TestRegister_ExternalRuntime_DefaultsToPoll(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
const wsID = "ws-external-default-poll"
// requireWorkspaceToken: no live tokens yet (first register).
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// resolveDeliveryMode: row exists with empty delivery_mode + runtime=external.
// Branch under test: delivery_mode is empty → fall through to runtime
// check → return poll.
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode", "runtime"}).
AddRow(sql.NullString{}, "external"))
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow(""))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
mock.ExpectExec("INSERT INTO workspace_auth_tokens").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/registry/register",
bytes.NewBufferString(`{"id":"`+wsID+`","agent_card":{"name":"a"}}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.Register(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
_ = json.Unmarshal(w.Body.Bytes(), &resp)
if resp["delivery_mode"] != "poll" {
t.Errorf("delivery_mode = %v, want %q (external runtime + empty mode → poll)",
resp["delivery_mode"], "poll")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestRegister_NonExternalRuntime_StillDefaultsToPush guards the
// inverse: a non-external runtime (langgraph, hermes, etc.) with
// empty delivery_mode keeps the historical push default. Catches
// any future "all empty modes default to poll" overshoot.
func TestRegister_NonExternalRuntime_StillDefaultsToPush(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
const wsID = "ws-langgraph-default-push"
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode", "runtime"}).
AddRow(sql.NullString{}, "langgraph"))
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(wsID, wsID, "http://localhost:8000", `{"name":"a"}`, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow("http://localhost:8000"))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
mock.ExpectExec("INSERT INTO workspace_auth_tokens").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/registry/register",
bytes.NewBufferString(`{"id":"`+wsID+`","url":"http://localhost:8000","agent_card":{"name":"a"}}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.Register(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
_ = json.Unmarshal(w.Body.Bytes(), &resp)
if resp["delivery_mode"] != "push" {
t.Errorf("delivery_mode = %v, want %q (non-external runtime keeps push default)",
resp["delivery_mode"], "push")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}

View File

@ -153,13 +153,21 @@ func sweepStaleRemoteWorkspaces(ctx context.Context, onOffline OfflineHandler) {
}
for _, id := range ids {
log.Printf("Health sweep (remote): %s heartbeat stale (>%s) — marking offline", id, staleAfter)
// External workspaces flip to 'awaiting_agent' (re-registrable
// via /registry/register) instead of 'offline' (which was the
// terminal-feeling status used pre-2026-04-30). The CLI's
// `molecule connect` command (RFC #10 in molecule-cli) re-
// registers on each invocation, bringing the workspace back
// online. 'offline' was confusing because it implied "agent
// crashed and needs operator intervention" when often the
// operator simply closed their laptop overnight.
log.Printf("Health sweep (remote): %s heartbeat stale (>%s) — marking awaiting_agent", id, staleAfter)
_, err = db.DB.ExecContext(ctx,
`UPDATE workspaces SET status = 'offline', updated_at = now()
`UPDATE workspaces SET status = 'awaiting_agent', updated_at = now()
WHERE id = $1 AND status NOT IN ('removed', 'provisioning', 'paused')`, id)
if err != nil {
log.Printf("Health sweep (remote): failed to mark %s offline: %v", id, err)
log.Printf("Health sweep (remote): failed to mark %s awaiting_agent: %v", id, err)
continue
}

View File

@ -60,7 +60,8 @@ func TestSweepOnlineWorkspaces_DeadContainer(t *testing.T) {
mock.ExpectQuery("SELECT id FROM workspaces WHERE status IN").
WillReturnRows(rows)
// Mock: update to offline
// Mock: update to offline (Docker sweep keeps 'offline' status —
// 'awaiting_agent' is the external-runtime path).
mock.ExpectExec("UPDATE workspaces SET status = 'offline'").
WithArgs("ws-dead-123").
WillReturnResult(sqlmock.NewResult(0, 1))
@ -155,7 +156,7 @@ func TestStartHealthSweep_NilChecker(t *testing.T) {
// verify the SQL shape, the offline-path side effects, and the
// environment-variable override for the staleness window.
func TestSweepStaleRemoteWorkspaces_MarksStaleOffline(t *testing.T) {
func TestSweepStaleRemoteWorkspaces_MarksStaleAwaitingAgent(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
@ -164,10 +165,10 @@ func TestSweepStaleRemoteWorkspaces_MarksStaleOffline(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{"id"}).
AddRow("ws-stale-1").
AddRow("ws-stale-2"))
mock.ExpectExec(`UPDATE workspaces SET status = 'offline'`).
mock.ExpectExec(`UPDATE workspaces SET status = 'awaiting_agent'`).
WithArgs("ws-stale-1").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`UPDATE workspaces SET status = 'offline'`).
mock.ExpectExec(`UPDATE workspaces SET status = 'awaiting_agent'`).
WithArgs("ws-stale-2").
WillReturnResult(sqlmock.NewResult(0, 1))
@ -209,7 +210,7 @@ func TestSweepStaleRemoteWorkspaces_NilCallbackNoPanic(t *testing.T) {
mock.ExpectQuery(`FROM workspaces`).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-x"))
mock.ExpectExec(`UPDATE workspaces SET status = 'offline'`).
mock.ExpectExec(`UPDATE workspaces SET status = 'awaiting_agent'`).
WithArgs("ws-x").
WillReturnResult(sqlmock.NewResult(0, 1))

View File

@ -41,9 +41,23 @@ func StartLivenessMonitor(ctx context.Context, onOffline OfflineHandler) {
log.Printf("Liveness: workspace %s TTL expired", workspaceID)
// Mark offline in Postgres — skip paused and hibernated workspaces (no active container)
// Status target depends on runtime:
// external → 'awaiting_agent' (re-registrable via
// /registry/register; `molecule connect` brings it
// back online on next invocation — typical case is
// the operator closed their laptop overnight).
// non-external → 'offline' (terminal-feeling status
// consistent with Docker/CP-managed runtimes whose
// recovery path is restart, not re-register).
//
// The conditional flip is done in a single UPDATE so the
// non-external case stays cheap (no extra round-trip)
// and there's no TOCTOU between the runtime read and the
// status write.
_, err := db.DB.ExecContext(ctx, `
UPDATE workspaces SET status = 'offline', updated_at = now()
UPDATE workspaces
SET status = CASE WHEN runtime = 'external' THEN 'awaiting_agent' ELSE 'offline' END,
updated_at = now()
WHERE id = $1 AND status NOT IN ('removed', 'paused', 'hibernated')
`, workspaceID)
if err != nil {

View File

@ -80,8 +80,11 @@ func TestStartLivenessMonitor_KeyExpiryTriggersOffline(t *testing.T) {
called <- wsID
}
// Expect the UPDATE when liveness key expires
mock.ExpectExec("UPDATE workspaces SET status = 'offline'").
// Expect the UPDATE when liveness key expires. The status is now
// CASE-expression-driven on runtime: external → 'awaiting_agent',
// other → 'offline'. sqlmock matches on regex so the SET clause
// just needs to mention the conditional.
mock.ExpectExec(`UPDATE workspaces\s+SET status = CASE WHEN runtime = 'external' THEN 'awaiting_agent' ELSE 'offline' END`).
WithArgs("ws-expire-test").
WillReturnResult(sqlmock.NewResult(0, 1))