Eliminate raw 'awaiting_agent'/'hibernating'/'failed'/etc string literals from production status writes. Adds models.WorkspaceStatus typed alias and models.AllWorkspaceStatuses canonical slice; every UPDATE workspaces SET status = ... now passes a parameterized $N typed value rather than a hard-coded SQL literal. Defense-in-depth follow-up to migration 046 (#2388): the Postgres enum type was missing 'awaiting_agent' + 'hibernating' for ~5 days because sqlmock regex matching cannot enforce live enum constraints. The drift gate is now a proper Go AST + SQL parser (no regex), asserting the codebase ⊆ migration enum and every const appears in the canonical slice. With status as a parameterized typed value, future enum mismatches fail at the SQL layer in tests, not silently in prod. Test coverage: full suite passes with -race; drift gate green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
79 lines
2.5 KiB
Go
79 lines
2.5 KiB
Go
package registry
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"strings"
|
|
|
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
|
|
)
|
|
|
|
// OfflineHandler is called when a workspace's liveness key expires.
// workspaceID is the {id} portion of the expired "ws:{id}" Redis key;
// the handler runs synchronously on the monitor goroutine, after the
// workspace row's status has already been flipped in the database.
type OfflineHandler func(ctx context.Context, workspaceID string)
|
|
|
|
// StartLivenessMonitor subscribes to Redis keyspace expiry events.
|
|
// When a workspace's liveness key (ws:{id}) expires, it marks the workspace offline
|
|
// and calls the onOffline handler.
|
|
func StartLivenessMonitor(ctx context.Context, onOffline OfflineHandler) {
|
|
sub := db.RDB.PSubscribe(ctx, "__keyevent@0__:expired")
|
|
|
|
log.Println("Liveness monitor started — listening for Redis key expirations")
|
|
|
|
ch := sub.Channel()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
sub.Close()
|
|
return
|
|
case msg := <-ch:
|
|
if msg == nil {
|
|
continue
|
|
}
|
|
key := msg.Payload
|
|
if !strings.HasPrefix(key, "ws:") {
|
|
continue
|
|
}
|
|
parts := strings.SplitN(key, ":", 3)
|
|
if len(parts) != 2 {
|
|
continue
|
|
}
|
|
workspaceID := parts[1]
|
|
|
|
log.Printf("Liveness: workspace %s TTL expired", workspaceID)
|
|
|
|
// Status target depends on runtime:
|
|
// external → 'awaiting_agent' (re-registrable via
|
|
// /registry/register; `molecule connect` brings it
|
|
// back online on next invocation — typical case is
|
|
// the operator closed their laptop overnight).
|
|
// non-external → 'offline' (terminal-feeling status
|
|
// consistent with Docker/CP-managed runtimes whose
|
|
// recovery path is restart, not re-register).
|
|
//
|
|
// The conditional flip is done in a single UPDATE so the
|
|
// non-external case stays cheap (no extra round-trip)
|
|
// and there's no TOCTOU between the runtime read and the
|
|
// status write.
|
|
// CASE branches use placeholders so the typed constants drive
|
|
// the values — keeps the atomicity of the single UPDATE while
|
|
// pinning external→awaiting_agent and other→offline at compile
|
|
// time. $2 = external arm, $3 = non-external arm.
|
|
_, err := db.DB.ExecContext(ctx, `
|
|
UPDATE workspaces
|
|
SET status = CASE WHEN runtime = 'external' THEN $2 ELSE $3 END,
|
|
updated_at = now()
|
|
WHERE id = $1 AND status NOT IN ('removed', 'paused', 'hibernated')
|
|
`, workspaceID, models.StatusAwaitingAgent, models.StatusOffline)
|
|
if err != nil {
|
|
log.Printf("Liveness: failed to mark %s offline: %v", workspaceID, err)
|
|
continue
|
|
}
|
|
|
|
if onOffline != nil {
|
|
onOffline(ctx, workspaceID)
|
|
}
|
|
}
|
|
}
|
|
}
|