molecule-cli/internal/connect/connect.go
Hongming Wang db6b196631 feat(connect): M1.2 — heartbeat + activity poll loops
Implements the runtime side of `molecule connect <id>`. After this PR
the CLI can actually attach to an external workspace and round-trip
inter-agent messages through any registered backend.

What's in:

- `internal/connect/client.go` — platform-API client with bearer auth.
  Endpoints: POST /registry/register (delivery_mode=poll, no URL),
  POST /registry/heartbeat, GET /workspaces/:id/activity?type=a2a_receive,
  POST /workspaces/:id/a2a (reply target).
  Errors split into TransientError (network/5xx — retry with backoff)
  and PermanentError (4xx — abort with clear message).
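  The transient/permanent split can be sketched roughly like this (the
  `Status` field is implied by the 410 handling in connect.go; other field
  names and methods here are assumptions, the actual client.go may differ):

  ```go
  package main

  import (
  	"errors"
  	"fmt"
  )

  // TransientError: network failures / 5xx; callers retry with backoff.
  type TransientError struct{ Err error }

  func (e *TransientError) Error() string { return "transient: " + e.Err.Error() }
  func (e *TransientError) Unwrap() error { return e.Err }

  // PermanentError: 4xx; callers abort with a clear message. Status is
  // consulted by the poll loop for the 410 cursor-pruned case.
  type PermanentError struct {
  	Status int
  	Msg    string
  }

  func (e *PermanentError) Error() string {
  	return fmt.Sprintf("permanent: %d %s", e.Status, e.Msg)
  }

  func main() {
  	// errors.As sees through fmt.Errorf's %w wrapping.
  	err := fmt.Errorf("register: %w", &PermanentError{Status: 401, Msg: "bad token"})
  	var perm *PermanentError
  	fmt.Println(errors.As(err, &perm), perm.Status) // true 401
  }
  ```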

- `internal/connect/state.go` — atomic cursor persistence at
  ~/.config/molecule/state/<workspace-id>.json. Mode 0o600 (owner-only)
  from day 1 because future state additions may include rotated tokens.
  Atomic write-then-rename so a crash mid-write can never produce a
  half-written cursor.
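  A minimal sketch of the write-then-rename pattern, using the `State` /
  `SaveState` names visible in connect.go (the real state.go likely adds
  directory creation and fsync details not shown here):

  ```go
  package main

  import (
  	"encoding/json"
  	"fmt"
  	"os"
  	"path/filepath"
  )

  type State struct {
  	WorkspaceID string `json:"workspace_id"`
  	LastSinceID string `json:"last_since_id"`
  }

  // SaveState writes the cursor atomically: marshal, write a sibling .tmp
  // with owner-only permissions (0o600), then rename over the final path.
  // A crash mid-write leaves either the old file or the .tmp, never a
  // torn cursor.
  func SaveState(dir string, s State) error {
  	final := filepath.Join(dir, s.WorkspaceID+".json")
  	tmp := final + ".tmp"
  	data, err := json.Marshal(s)
  	if err != nil {
  		return err
  	}
  	if err := os.WriteFile(tmp, data, 0o600); err != nil {
  		return err
  	}
  	return os.Rename(tmp, final) // atomic on POSIX filesystems
  }

  func main() {
  	dir, _ := os.MkdirTemp("", "molecule-state")
  	defer os.RemoveAll(dir)
  	if err := SaveState(dir, State{WorkspaceID: "ws-1", LastSinceID: "42"}); err != nil {
  		panic(err)
  	}
  	data, _ := os.ReadFile(filepath.Join(dir, "ws-1.json"))
  	fmt.Println(string(data)) // {"workspace_id":"ws-1","last_since_id":"42"}
  }
  ```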

- `internal/connect/connect.go` — Run() orchestrator. Wires register-
  with-bounded-retry, then heartbeat goroutine + poll goroutine.
  Both respect ctx cancellation for clean SIGTERM.

  Robustness contract per RFC #10:
    * Cursor advances AFTER successful dispatch — crash mid-batch
      re-delivers, never drops.
    * 410 on cursor lookup → reset to "" and re-fetch (don't deadlock
      on a pruned cursor).
    * Heartbeat permanent error stops the heartbeat loop only; poll
      loop keeps running so the operator sees "stopped" + reason in
      logs and can SIGTERM.
    * Backend dispatch is sequential within a batch (avoids out-of-
      order replies for in-flight conversations).
    * Inter-agent reply path: POST envelope to /workspaces/<source>/a2a.
    * Canvas-origin reply (source_id == nil) logs + skips for now —
      M1.3 wires that via the task_update activity convention.

- `internal/cmd/connect.go` — runConnect now actually calls
  connect.Run() (was a placeholder ctx-wait in M1.1).

Test plan:

- httptest workspace-server stub covers register / heartbeat / activity
  / a2a reply endpoints.
- TestRun_RoundTrip_AgentReply: end-to-end ping → mock backend → pong
  reply lands at source, cursor saved.
- TestRun_CanvasOriginMessageNotReplied: source_id=nil → backend fires
  but no reply post; cursor still advances.
- TestRun_CursorPruned410ResetsAndContinues: server returns 410 once,
  cursor resets to "", next poll dispatches the fresh row.
- TestRun_PermanentRegisterErrorAborts: 401 surfaces immediately.
- TestRun_TransientRegisterErrorRetries: 503 then 200 → register
  succeeds on second attempt.
- TestRun_OptionsValidation: missing Backend / WorkspaceID surface
  before any I/O.
- State: round-trip, file mode 0o600, atomic-rename leaves no .tmp
  artifacts, corrupted file surfaces error.
- All tests green under -race.

Out of scope (next PRs in this stack):

- M1.3: claude-code backend (canvas-origin reply convention rides
  with this)
- M1.4: GoReleaser tag-triggered release.yml workflow
- Push-mode (--mode push currently surfaces a clear "M4" error)

RFC: https://github.com/Molecule-AI/molecule-cli/issues/10

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 03:15:51 -07:00


package connect

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/Molecule-AI/molecule-cli/internal/backends"
)

// Options carries the runtime knobs Run needs. Constructed by the cmd
// layer from cobra flags.
type Options struct {
	APIURL         string
	WorkspaceID    string
	Token          string
	AgentName      string        // sent in agent_card on register; default "molecule-connect"
	HeartbeatEvery time.Duration // default 30s
	PollEvery      time.Duration // default 1s
	StateDir       string        // default DefaultStateDir()
	Backend        backends.Backend
	Logger         *log.Logger // default log.Default()
	OnError        func(error) // optional observer (tests use this)
}
// Run wires register → heartbeat goroutine + poll goroutine, returns
// when ctx is cancelled. Both goroutines drain on shutdown.
//
// Crash semantics: cursor is saved AFTER successful dispatch, so a
// SIGTERM mid-dispatch re-delivers on next start. Idempotency dedup
// (MessageID + IdempotencyKey) is the backend's responsibility — Run
// passes the keys through but does not enforce uniqueness across
// process restarts.
func Run(ctx context.Context, opts Options) error {
	if opts.Backend == nil {
		return fmt.Errorf("connect.Run: Backend is required")
	}
	if opts.WorkspaceID == "" {
		return fmt.Errorf("connect.Run: WorkspaceID is required")
	}
	if opts.HeartbeatEvery == 0 {
		opts.HeartbeatEvery = 30 * time.Second
	}
	if opts.PollEvery == 0 {
		opts.PollEvery = time.Second
	}
	if opts.AgentName == "" {
		opts.AgentName = "molecule-connect"
	}
	if opts.Logger == nil {
		opts.Logger = log.Default()
	}
	if opts.StateDir == "" {
		dir, err := DefaultStateDir()
		if err != nil {
			opts.Logger.Printf("connect: state dir unavailable, continuing without persistence: %v", err)
		}
		opts.StateDir = dir
	}

	state, err := LoadState(opts.StateDir, opts.WorkspaceID)
	if err != nil {
		opts.Logger.Printf("connect: load state failed (starting fresh): %v", err)
		state = State{WorkspaceID: opts.WorkspaceID}
	}

	cl := NewClient(opts.APIURL, opts.WorkspaceID, opts.Token)

	// Register is one-shot; failure here is fatal — we have no auth/identity
	// without it. Retry on transient errors with a short bounded backoff so
	// a flaky network doesn't immediately abort.
	if err := registerWithRetry(ctx, cl, opts); err != nil {
		return fmt.Errorf("register: %w", err)
	}
	opts.Logger.Printf("connect: registered workspace=%s mode=poll", opts.WorkspaceID)

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		runHeartbeatLoop(ctx, cl, opts)
	}()
	go func() {
		defer wg.Done()
		runPollLoop(ctx, cl, opts, state)
	}()
	wg.Wait()
	return nil
}
// registerWithRetry handles the bootstrap call. Transient errors retry
// up to 5 times with exponential backoff (1s, 2s, 4s, 8s, 16s, capped
// at 30s); permanent errors abort.
func registerWithRetry(ctx context.Context, cl *Client, opts Options) error {
	const maxAttempts = 5
	delay := time.Second
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err := cl.Register(ctx, opts.AgentName)
		if err == nil {
			return nil
		}
		var perm *PermanentError
		if errors.As(err, &perm) {
			return err
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}
		opts.Logger.Printf("connect: register attempt %d/%d failed: %v (retry in %s)",
			attempt, maxAttempts, err, delay)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
		delay *= 2
		if delay > 30*time.Second {
			delay = 30 * time.Second
		}
	}
	return fmt.Errorf("register: gave up after %d attempts", maxAttempts)
}
// runHeartbeatLoop pings /registry/heartbeat every opts.HeartbeatEvery.
// Transient failures log and continue with exponential backoff;
// permanent failures (e.g. 401 token revoked) log loudly and stop the
// loop — but Run() doesn't return until ctx is cancelled, so the user
// sees the error and SIGTERMs.
func runHeartbeatLoop(ctx context.Context, cl *Client, opts Options) {
	ticker := time.NewTicker(opts.HeartbeatEvery)
	defer ticker.Stop()
	failures := 0
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		err := cl.Heartbeat(ctx)
		if err == nil {
			if failures > 0 {
				opts.Logger.Printf("connect: heartbeat recovered after %d failures", failures)
			}
			failures = 0
			continue
		}
		var perm *PermanentError
		if errors.As(err, &perm) {
			opts.Logger.Printf("connect: heartbeat permanent error (loop stopping): %v", err)
			notifyError(opts, err)
			return
		}
		failures++
		opts.Logger.Printf("connect: heartbeat transient error #%d: %v", failures, err)
		notifyError(opts, err)
		// At 5+ failures, slow down the cadence to reduce log spam — the
		// platform will mark us awaiting_agent after ~60s anyway.
		if failures >= 5 {
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Duration(failures) * opts.HeartbeatEvery):
			}
		}
	}
}
// runPollLoop fetches activity_logs since cursor, dispatches each row to
// the backend, posts the reply (when source is an agent — canvas-origin
// reply needs a different convention, deferred to M1.3+), then advances
// + persists the cursor.
//
// A poll batch is processed sequentially: the backend may be expensive
// (LLM call) and parallelism inside one batch invites out-of-order
// responses for in-flight conversations. Future: per-source serialization
// queue if the backend can be safely parallelized across sources.
func runPollLoop(ctx context.Context, cl *Client, opts Options, state State) {
	ticker := time.NewTicker(opts.PollEvery)
	defer ticker.Stop()
	transientFails := 0
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		rows, err := cl.Activity(ctx, state.LastSinceID, 50)
		if err != nil {
			var perm *PermanentError
			if errors.As(err, &perm) {
				if perm.Status == 410 {
					// Cursor pruned — reset and re-fetch backlog.
					opts.Logger.Printf("connect: cursor %s pruned (410), resetting",
						state.LastSinceID)
					state.LastSinceID = ""
					_ = SaveState(opts.StateDir, state)
					continue
				}
				opts.Logger.Printf("connect: poll permanent error (loop stopping): %v", err)
				notifyError(opts, err)
				return
			}
			transientFails++
			opts.Logger.Printf("connect: poll transient error #%d: %v", transientFails, err)
			notifyError(opts, err)
			continue
		}
		transientFails = 0
		for _, row := range rows {
			if err := dispatchOne(ctx, cl, opts, row); err != nil {
				opts.Logger.Printf("connect: dispatch %s failed: %v", row.ID, err)
				notifyError(opts, err)
				// The cursor still points BEFORE the failed row, so the
				// next poll re-fetches it. If the failure is deterministic
				// this will spin — that's the operator's signal to fix the
				// backend or the message. Break rather than return: a
				// dispatch failure aborts the batch, not the poll loop.
				break
			}
			state.LastSinceID = row.ID
			if err := SaveState(opts.StateDir, state); err != nil {
				opts.Logger.Printf("connect: save cursor failed (continuing): %v", err)
			}
		}
	}
}
// dispatchOne hands one activity row to the backend, posts the reply if
// applicable, and returns. Errors abort the current batch (caller saves
// cursor up to but not including this row).
func dispatchOne(ctx context.Context, cl *Client, opts Options, row ActivityRow) error {
	req, err := parseRequest(row)
	if err != nil {
		// Malformed inbound — log + skip (advance past it). A permanent
		// payload bug shouldn't deadlock the queue.
		opts.Logger.Printf("connect: parse row %s failed (skipping): %v", row.ID, err)
		return nil
	}
	resp, err := opts.Backend.HandleA2A(ctx, req)
	if err != nil {
		return fmt.Errorf("backend: %w", err)
	}
	// Inter-agent reply path. Canvas-origin (source_id == nil) reply
	// uses the activity_logs "task_update" convention (M1.3+); for now,
	// log + skip so the dispatch isn't lost silently.
	if row.SourceID == nil || *row.SourceID == "" {
		opts.Logger.Printf("connect: canvas-origin reply not yet wired (msg=%s parts=%d) — TODO M1.3",
			row.ID, len(resp.Parts))
		return nil
	}
	envelope := buildReplyEnvelope(req, resp)
	raw, err := json.Marshal(envelope)
	if err != nil {
		return fmt.Errorf("marshal reply envelope: %w", err)
	}
	if err := cl.ReplyA2A(ctx, *row.SourceID, raw); err != nil {
		return fmt.Errorf("reply to %s: %w", *row.SourceID, err)
	}
	return nil
}
// parseRequest converts an activity_logs row's request_body into a
// backends.Request. Tolerates the common JSON-RPC shape:
//
// {"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[...]}}}
func parseRequest(row ActivityRow) (backends.Request, error) {
	method := ""
	if row.Method != nil {
		method = *row.Method
	}
	caller := ""
	if row.SourceID != nil {
		caller = *row.SourceID
	}
	out := backends.Request{
		WorkspaceID: row.WorkspaceID,
		CallerID:    caller,
		MessageID:   row.ID,
		Method:      method,
		Raw:         row.RequestBody,
	}
	if len(row.RequestBody) == 0 {
		return out, nil
	}
	var env struct {
		Params struct {
			Message struct {
				Parts          []backends.Part `json:"parts"`
				IdempotencyKey string          `json:"idempotency_key"`
				TaskID         string          `json:"task_id"`
			} `json:"message"`
		} `json:"params"`
	}
	if err := json.Unmarshal(row.RequestBody, &env); err != nil {
		// Not a JSON-RPC envelope — pass raw through, backend handles.
		return out, nil
	}
	out.Parts = env.Params.Message.Parts
	out.IdempotencyKey = env.Params.Message.IdempotencyKey
	out.TaskID = env.Params.Message.TaskID
	return out, nil
}
// buildReplyEnvelope shapes resp into the JSON-RPC reply that the
// platform's /workspaces/<source_id>/a2a expects. Mirrors the v0.3
// message/send shape — the source workspace's adapter parses parts the
// same way it parses any inbound message.
func buildReplyEnvelope(req backends.Request, resp backends.Response) map[string]interface{} {
	message := map[string]interface{}{
		"role":  "assistant",
		"parts": resp.Parts,
	}
	if req.TaskID != "" {
		message["task_id"] = req.TaskID
	}
	return map[string]interface{}{
		"jsonrpc": "2.0",
		"id":      req.MessageID,
		"method":  "message/send",
		"params": map[string]interface{}{
			"message": message,
		},
	}
}

func notifyError(opts Options, err error) {
	if opts.OnError != nil {
		opts.OnError(err)
	}
}