Implements the runtime side of `molecule connect <id>`. After this PR the CLI can actually attach to an external workspace and round-trip inter-agent messages through any registered backend.

What's in:

- `internal/connect/client.go` — platform-API client with bearer auth. Endpoints: POST /registry/register (delivery_mode=poll, no URL), POST /registry/heartbeat, GET /workspaces/:id/activity?type=a2a_receive, POST /workspaces/:id/a2a (reply target). Errors split into TransientError (network/5xx — retry with backoff) and PermanentError (4xx — abort with a clear message).
- `internal/connect/state.go` — atomic cursor persistence at ~/.config/molecule/state/<workspace-id>.json. Mode 0o600 (owner-only) from day 1, because future state additions may include rotated tokens. Atomic write-then-rename, so a crash mid-write can never produce a half-written cursor.
- `internal/connect/connect.go` — Run() orchestrator. Wires register-with-bounded-retry, then a heartbeat goroutine + a poll goroutine. Both respect ctx cancellation for a clean SIGTERM. Robustness contract per RFC #10:
  * Cursor advances AFTER successful dispatch — a crash mid-batch re-delivers, never drops.
  * 410 on cursor lookup → reset to "" and re-fetch (don't deadlock on a pruned cursor).
  * A permanent heartbeat error stops the heartbeat loop only; the poll loop keeps running so the operator sees "stopped" + reason in logs and can SIGTERM.
  * Backend dispatch is sequential within a batch (avoids out-of-order replies for in-flight conversations).
  * Inter-agent reply path: POST envelope to /workspaces/<source>/a2a.
  * Canvas-origin reply (source_id == nil) logs + skips for now — M1.3 wires that via the task_update activity convention.
- `internal/cmd/connect.go` — runConnect now actually calls connect.Run() (was a placeholder ctx-wait in M1.1).

Test plan:

- httptest workspace-server stub covers register / heartbeat / activity / a2a reply endpoints.
- TestRun_RoundTrip_AgentReply: end-to-end ping → mock backend → pong reply lands at source, cursor saved.
- TestRun_CanvasOriginMessageNotReplied: source_id=nil → backend fires but no reply is posted; cursor still advances.
- TestRun_CursorPruned410ResetsAndContinues: server returns 410 once, cursor resets to "", next poll dispatches the fresh row.
- TestRun_PermanentRegisterErrorAborts: 401 surfaces immediately.
- TestRun_TransientRegisterErrorRetries: 503 then 200 → register succeeds on the second attempt.
- TestRun_OptionsValidation: a missing Backend / WorkspaceID surfaces before any I/O.
- State: round-trip, file mode 0o600, atomic rename leaves no .tmp artifacts, corrupted file surfaces an error.
- All tests green under -race.

Out of scope (next PRs in this stack):

- M1.3: claude-code backend (the canvas-origin reply convention rides with this)
- M1.4: GoReleaser tag-triggered release.yml workflow
- Push mode (--mode push currently surfaces a clear "M4" error)

RFC: https://github.com/Molecule-AI/molecule-cli/issues/10

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
90 lines
2.8 KiB
Go
90 lines
2.8 KiB
Go
package connect
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
)
|
|
|
|
// State is the per-workspace state persisted between runs so a restarted
// `molecule connect` can resume where it left off. Today it holds only the
// activity cursor.
//
// New keys (for example a rotated auth token or the last successful
// heartbeat) can be appended without a schema version. When decoding into
// a struct, json.Unmarshal ignores unknown fields and leaves fields that
// are absent from the input at their zero value. Older and newer state
// files therefore stay mutually readable.
type State struct {
	// WorkspaceID identifies the workspace this state belongs to; it also
	// names the file on disk (see StatePath).
	WorkspaceID string `json:"workspace_id"`
	// LastSinceID is the activity cursor; omitted from the file when empty.
	LastSinceID string `json:"last_since_id,omitempty"`
}
|
|
|
|
// StatePath reports where workspaceID's state file lives inside dir, namely
// "<dir>/<workspaceID>.json". Callers usually get dir from DefaultStateDir()
// (~/.config/molecule/state) unless the user supplied a custom directory.
func StatePath(dir, workspaceID string) string {
	fileName := workspaceID + ".json"
	return filepath.Join(dir, fileName)
}
|
|
|
|
// DefaultStateDir resolves ~/.config/molecule/state and creates any missing
// directories in that hierarchy with mode 0o700.
//
// If the home directory cannot be determined, it returns "" and the error.
// If mkdir fails, it still returns the computed path together with the
// error, so the caller can name the directory in a "could not write state"
// message. Persistence is meant to be best-effort; the connect loops keep
// running either way.
func DefaultStateDir() (string, error) {
	home, homeErr := os.UserHomeDir()
	if homeErr != nil {
		return "", fmt.Errorf("home dir: %w", homeErr)
	}
	stateDir := filepath.Join(home, ".config", "molecule", "state")
	mkErr := os.MkdirAll(stateDir, 0o700)
	if mkErr == nil {
		return stateDir, nil
	}
	return stateDir, fmt.Errorf("mkdir %s: %w", stateDir, mkErr)
}
|
|
|
|
// LoadState reads workspaceID's state from dir. Returns a zero-value
|
|
// State (no error) when the file is missing — first run is the same
|
|
// as "fresh state". Other errors (parse failure, permission denied)
|
|
// surface so the user knows their state is corrupt.
|
|
func LoadState(dir, workspaceID string) (State, error) {
|
|
path := StatePath(dir, workspaceID)
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return State{WorkspaceID: workspaceID}, nil
|
|
}
|
|
return State{}, fmt.Errorf("read %s: %w", path, err)
|
|
}
|
|
var s State
|
|
if err := json.Unmarshal(data, &s); err != nil {
|
|
return State{}, fmt.Errorf("parse %s: %w", path, err)
|
|
}
|
|
if s.WorkspaceID == "" {
|
|
s.WorkspaceID = workspaceID
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
// SaveState writes workspaceID's state atomically (write to .tmp, rename
|
|
// over). The rename is atomic on POSIX so a crash mid-write can never
|
|
// produce a half-written cursor file.
|
|
func SaveState(dir string, s State) error {
|
|
if s.WorkspaceID == "" {
|
|
return fmt.Errorf("save state: workspace_id is required")
|
|
}
|
|
if err := os.MkdirAll(dir, 0o700); err != nil {
|
|
return fmt.Errorf("mkdir %s: %w", dir, err)
|
|
}
|
|
path := StatePath(dir, s.WorkspaceID)
|
|
tmp := path + ".tmp"
|
|
data, err := json.MarshalIndent(s, "", " ")
|
|
if err != nil {
|
|
return fmt.Errorf("marshal: %w", err)
|
|
}
|
|
if err := os.WriteFile(tmp, data, 0o600); err != nil {
|
|
return fmt.Errorf("write %s: %w", tmp, err)
|
|
}
|
|
if err := os.Rename(tmp, path); err != nil {
|
|
return fmt.Errorf("rename %s -> %s: %w", tmp, path, err)
|
|
}
|
|
return nil
|
|
}
|