molecule-cli/internal/connect/state.go
Hongming Wang db6b196631 feat(connect): M1.2 — heartbeat + activity poll loops
Implements the runtime side of `molecule connect <id>`. After this PR
the CLI can actually attach to an external workspace and round-trip
inter-agent messages through any registered backend.

What's in:

- `internal/connect/client.go` — platform-API client with bearer auth.
  Endpoints: POST /registry/register (delivery_mode=poll, no URL),
  POST /registry/heartbeat, GET /workspaces/:id/activity?type=a2a_receive,
  POST /workspaces/:id/a2a (reply target).
  Errors split into TransientError (network/5xx — retry with backoff)
  and PermanentError (4xx — abort with clear message).

- `internal/connect/state.go` — atomic cursor persistence at
  ~/.config/molecule/state/<workspace-id>.json. Mode 0o600 (owner-only)
  from day 1 because future state additions may include rotated tokens.
  Atomic write-then-rename so a crash mid-write can never produce a
  half-written cursor.

- `internal/connect/connect.go` — Run() orchestrator. Wires register-
  with-bounded-retry, then heartbeat goroutine + poll goroutine.
  Both respect ctx cancellation for clean SIGTERM.

  Robustness contract per RFC #10:
    * Cursor advances AFTER successful dispatch — crash mid-batch
      re-delivers, never drops.
    * 410 on cursor lookup → reset to "" and re-fetch (don't deadlock
      on a pruned cursor).
    * Heartbeat permanent error stops the heartbeat loop only; poll
      loop keeps running so the operator sees "stopped" + reason in
      logs and can SIGTERM.
    * Backend dispatch is sequential within a batch (avoids out-of-
      order replies for in-flight conversations).
    * Inter-agent reply path: POST envelope to /workspaces/<source>/a2a.
    * Canvas-origin reply (source_id == nil) logs + skips for now —
      M1.3 wires that via the task_update activity convention.

- `internal/cmd/connect.go` — runConnect now actually calls
  connect.Run() (was a placeholder ctx-wait in M1.1).

Test plan:

- httptest workspace-server stub covers register / heartbeat / activity
  / a2a reply endpoints.
- TestRun_RoundTrip_AgentReply: end-to-end ping → mock backend → pong
  reply lands at source, cursor saved.
- TestRun_CanvasOriginMessageNotReplied: source_id=nil → backend fires
  but no reply post; cursor still advances.
- TestRun_CursorPruned410ResetsAndContinues: server returns 410 once,
  cursor resets to "", next poll dispatches the fresh row.
- TestRun_PermanentRegisterErrorAborts: 401 surfaces immediately.
- TestRun_TransientRegisterErrorRetries: 503 then 200 → register
  succeeds on second attempt.
- TestRun_OptionsValidation: missing Backend / WorkspaceID surface
  before any I/O.
- State: round-trip, file mode 0o600, atomic-rename leaves no .tmp
  artifacts, corrupted file surfaces error.
- All tests green under -race.

Out of scope (next PRs in this stack):

- M1.3: claude-code backend (canvas-origin reply convention rides
  with this)
- M1.4: GoReleaser tag-triggered release.yml workflow
- Push-mode (--mode push currently surfaces a clear "M4" error)

RFC: https://github.com/Molecule-AI/molecule-cli/issues/10

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 03:15:51 -07:00


package connect

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// State is the persisted per-workspace state for crash-resume. Currently
// it holds just the activity cursor; future keys (auth token rotation, last
// successful heartbeat) can be appended without a schema bump because
// json.Unmarshal ignores unknown fields and leaves missing ones at their
// zero value.
type State struct {
	WorkspaceID string `json:"workspace_id"`
	LastSinceID string `json:"last_since_id,omitempty"`
}

// StatePath returns the on-disk path for workspaceID's state file.
// dir is the directory root (typically `~/.config/molecule/state`); the
// caller resolves it via DefaultStateDir() unless the user passed a
// custom one.
func StatePath(dir, workspaceID string) string {
	return filepath.Join(dir, workspaceID+".json")
}

// DefaultStateDir returns ~/.config/molecule/state, creating the
// hierarchy if missing. It returns the path even on mkdir error so callers
// can surface a meaningful "could not write state" message; the loops
// run regardless because persistence is best-effort.
func DefaultStateDir() (string, error) {
	home, err := os.UserHomeDir()
	if err != nil {
		return "", fmt.Errorf("home dir: %w", err)
	}
	dir := filepath.Join(home, ".config", "molecule", "state")
	if err := os.MkdirAll(dir, 0o700); err != nil {
		return dir, fmt.Errorf("mkdir %s: %w", dir, err)
	}
	return dir, nil
}

// LoadState reads workspaceID's state from dir. When the file is missing
// it returns a fresh State carrying only workspaceID and no error, so a
// first run looks the same as "fresh state". Other errors (parse failure,
// permission denied) surface so the user knows their state file is
// unreadable or corrupt.
func LoadState(dir, workspaceID string) (State, error) {
	path := StatePath(dir, workspaceID)
	data, err := os.ReadFile(path)
	if err != nil {
		if os.IsNotExist(err) {
			return State{WorkspaceID: workspaceID}, nil
		}
		return State{}, fmt.Errorf("read %s: %w", path, err)
	}
	var s State
	if err := json.Unmarshal(data, &s); err != nil {
		return State{}, fmt.Errorf("parse %s: %w", path, err)
	}
	if s.WorkspaceID == "" {
		s.WorkspaceID = workspaceID
	}
	return s, nil
}

// SaveState writes s to dir atomically: it writes a .tmp sibling, then
// renames it over the real file. rename(2) is atomic on POSIX, so a process
// crash mid-write leaves either the previous file or the new one, never a
// half-written cursor. Surviving a power loss as well would additionally
// require an fsync of the temp file before the rename.
func SaveState(dir string, s State) error {
	if s.WorkspaceID == "" {
		return fmt.Errorf("save state: workspace_id is required")
	}
	if err := os.MkdirAll(dir, 0o700); err != nil {
		return fmt.Errorf("mkdir %s: %w", dir, err)
	}
	path := StatePath(dir, s.WorkspaceID)
	tmp := path + ".tmp"
	data, err := json.MarshalIndent(s, "", " ")
	if err != nil {
		return fmt.Errorf("marshal: %w", err)
	}
	if err := os.WriteFile(tmp, data, 0o600); err != nil {
		return fmt.Errorf("write %s: %w", tmp, err)
	}
	if err := os.Rename(tmp, path); err != nil {
		return fmt.Errorf("rename %s -> %s: %w", tmp, path, err)
	}
	return nil
}