8019231a16
ci-arm64-advisory / fast-checks (push) Waiting to run
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (push) Successful in 8s
Block internal-flavored paths / Block forbidden paths (push) Successful in 8s
CI / Detect changes (push) Successful in 9s
CI / Python Lint & Test (push) Successful in 5s
E2E API Smoke Test / detect-changes (push) Successful in 9s
E2E Chat / detect-changes (push) Successful in 8s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (local) (push) Successful in 49s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 12s
publish-workspace-server-image / build-and-push (push) Successful in 3m12s
E2E Staging SaaS (full lifecycle) / pr-validate (push) Successful in 39s
Handlers Postgres Integration / detect-changes (push) Successful in 4s
Harness Replays / detect-changes (push) Successful in 5s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (push) Successful in 6s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (push) Successful in 4s
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (push) Successful in 3s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (push) Successful in 3s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (push) Successful in 1m6s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 14s
CI / Canvas (Next.js) (push) Successful in 3s
CI / Shellcheck (E2E scripts) (push) Successful in 2s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (push) Successful in 1m25s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (push) Successful in 5m19s
E2E Staging External Runtime / E2E Staging External Runtime (push) Successful in 5m30s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 2m23s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (push) Successful in 6m5s
E2E Chat / E2E Chat (push) Successful in 4m6s
CI / Platform (Go) (push) Successful in 5m0s
CI / all-required (push) Successful in 9m45s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 2s
publish-workspace-server-image / Production auto-deploy (push) Successful in 8m32s
Harness Replays / Harness Replays (push) Successful in 12s
CI / Canvas Deploy Reminder (push) Successful in 2s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 1m37s
Sweep stale Cloudflare Tunnels / Sweep CF tunnels (push) Successful in 8s
Sweep stale e2e-* orgs (staging) / Sweep e2e orgs (push) Successful in 12s
Staging SaaS smoke (every 30 min) / Staging SaaS smoke (push) Successful in 5m9s
main-red-watchdog / watchdog (push) Successful in 32s
gate-check-v3 / gate-check (push) Successful in 25s
Continuous synthetic E2E (staging) / Synthetic E2E against staging (push) Successful in 6m10s
CTO-bypass merge 2026-05-24: #1760 Go module rename to git.moleculesai.app path
79 lines
2.5 KiB
Go
79 lines
2.5 KiB
Go
package registry
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"strings"
|
|
|
|
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
|
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
|
|
)
|
|
|
|
type (
	// OfflineHandler is the callback StartLivenessMonitor invokes when a
	// workspace's liveness key expires and the follow-up status update ran
	// without error. It receives the monitor's context and the workspace ID
	// taken from the expired key.
	OfflineHandler func(ctx context.Context, workspaceID string)
)
|
|
|
|
// StartLivenessMonitor subscribes to Redis keyspace expiry events.
|
|
// When a workspace's liveness key (ws:{id}) expires, it marks the workspace offline
|
|
// and calls the onOffline handler.
|
|
func StartLivenessMonitor(ctx context.Context, onOffline OfflineHandler) {
|
|
sub := db.RDB.PSubscribe(ctx, "__keyevent@0__:expired")
|
|
|
|
log.Println("Liveness monitor started — listening for Redis key expirations")
|
|
|
|
ch := sub.Channel()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
sub.Close()
|
|
return
|
|
case msg := <-ch:
|
|
if msg == nil {
|
|
continue
|
|
}
|
|
key := msg.Payload
|
|
if !strings.HasPrefix(key, "ws:") {
|
|
continue
|
|
}
|
|
parts := strings.SplitN(key, ":", 3)
|
|
if len(parts) != 2 {
|
|
continue
|
|
}
|
|
workspaceID := parts[1]
|
|
|
|
log.Printf("Liveness: workspace %s TTL expired", workspaceID)
|
|
|
|
// Status target depends on runtime:
|
|
// external → 'awaiting_agent' (re-registrable via
|
|
// /registry/register; `molecule connect` brings it
|
|
// back online on next invocation — typical case is
|
|
// the operator closed their laptop overnight).
|
|
// non-external → 'offline' (terminal-feeling status
|
|
// consistent with Docker/CP-managed runtimes whose
|
|
// recovery path is restart, not re-register).
|
|
//
|
|
// The conditional flip is done in a single UPDATE so the
|
|
// non-external case stays cheap (no extra round-trip)
|
|
// and there's no TOCTOU between the runtime read and the
|
|
// status write.
|
|
// CASE branches use placeholders so the typed constants drive
|
|
// the values — keeps the atomicity of the single UPDATE while
|
|
// pinning external→awaiting_agent and other→offline at compile
|
|
// time. $2 = external arm, $3 = non-external arm.
|
|
_, err := db.DB.ExecContext(ctx, `
|
|
UPDATE workspaces
|
|
SET status = CASE WHEN runtime = 'external' THEN $2 ELSE $3 END,
|
|
updated_at = now()
|
|
WHERE id = $1 AND status NOT IN ('removed', 'paused', 'hibernated')
|
|
`, workspaceID, models.StatusAwaitingAgent, models.StatusOffline)
|
|
if err != nil {
|
|
log.Printf("Liveness: failed to mark %s offline: %v", workspaceID, err)
|
|
continue
|
|
}
|
|
|
|
if onOffline != nil {
|
|
onOffline(ctx, workspaceID)
|
|
}
|
|
}
|
|
}
|
|
}
|