molecule-cli/internal/connect/client.go
Hongming Wang db6b196631 feat(connect): M1.2 — heartbeat + activity poll loops
Implements the runtime side of `molecule connect <id>`. After this PR
the CLI can actually attach to an external workspace and round-trip
inter-agent messages through any registered backend.

What's in:

- `internal/connect/client.go` — platform-API client with bearer auth.
  Endpoints: POST /registry/register (delivery_mode=poll, no URL),
  POST /registry/heartbeat, GET /workspaces/:id/activity?type=a2a_receive,
  POST /workspaces/:id/a2a (reply target).
  Errors split into TransientError (network/5xx — retry with backoff)
  and PermanentError (4xx — abort with clear message).

- `internal/connect/state.go` — atomic cursor persistence at
  ~/.config/molecule/state/<workspace-id>.json. Mode 0o600 (owner-only)
  from day 1 because future state additions may include rotated tokens.
  Atomic write-then-rename so a crash mid-write can never produce a
  half-written cursor.

- `internal/connect/connect.go` — Run() orchestrator. Wires register-
  with-bounded-retry, then heartbeat goroutine + poll goroutine.
  Both respect ctx cancellation for clean SIGTERM.

  Robustness contract per RFC #10:
    * Cursor advances AFTER successful dispatch — crash mid-batch
      re-delivers, never drops.
    * 410 on cursor lookup → reset to "" and re-fetch (don't deadlock
      on a pruned cursor).
    * Heartbeat permanent error stops the heartbeat loop only; poll
      loop keeps running so the operator sees "stopped" + reason in
      logs and can SIGTERM.
    * Backend dispatch is sequential within a batch (avoids out-of-
      order replies for in-flight conversations).
    * Inter-agent reply path: POST envelope to /workspaces/<source>/a2a.
    * Canvas-origin reply (source_id == nil) logs + skips for now —
      M1.3 wires that via the task_update activity convention.

- `internal/cmd/connect.go` — runConnect now actually calls
  connect.Run() (was a placeholder ctx-wait in M1.1).

Test plan:

- httptest workspace-server stub covers register / heartbeat / activity
  / a2a reply endpoints.
- TestRun_RoundTrip_AgentReply: end-to-end ping → mock backend → pong
  reply lands at source, cursor saved.
- TestRun_CanvasOriginMessageNotReplied: source_id=nil → backend fires
  but no reply post; cursor still advances.
- TestRun_CursorPruned410ResetsAndContinues: server returns 410 once,
  cursor resets to "", next poll dispatches the fresh row.
- TestRun_PermanentRegisterErrorAborts: 401 surfaces immediately.
- TestRun_TransientRegisterErrorRetries: 503 then 200 → register
  succeeds on second attempt.
- TestRun_OptionsValidation: missing Backend / WorkspaceID surface
  before any I/O.
- State: round-trip, file mode 0o600, atomic-rename leaves no .tmp
  artifacts, corrupted file surfaces error.
- All tests green under -race.

Out of scope (next PRs in this stack):

- M1.3: claude-code backend (canvas-origin reply convention rides
  with this)
- M1.4: GoReleaser tag-triggered release.yml workflow
- Push-mode (--mode push currently surfaces a clear "M4" error)

RFC: https://github.com/Molecule-AI/molecule-cli/issues/10

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 03:15:51 -07:00

234 lines
7.5 KiB
Go

// Package connect implements the runtime side of `molecule connect <id>` —
// register, heartbeat, poll, dispatch.
//
// Layout:
// - client.go — thin platform-API client (Register, Heartbeat, Activity, ReplyA2A)
// - state.go — cursor file at ~/.config/molecule/state/<workspace-id>.json
// - connect.go — Run() orchestrator that wires the loops to a Backend
//
// Robustness contract per RFC #10:
// - Heartbeat and poll use independent goroutines so a slow backend dispatch
// doesn't starve heartbeats (workspace would flip to 'awaiting_agent').
// - Both loops respect ctx cancellation for clean SIGTERM shutdown.
// - Network errors trigger exponential backoff (cap 60s); permanent errors
// (4xx) abort with a clear message.
// - Cursor file is written AFTER successful dispatch — a crash mid-batch
// re-delivers the in-flight message, never drops it.
// - Dispatch is idempotent against MessageID + IdempotencyKey so the
// re-delivery doesn't double-fire the backend.
package connect
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"time"
)
// Client is the platform-API surface that the loops talk to. Single
// struct so the heartbeat + poll goroutines share one http.Client with
// connection pooling. Concurrency-safe — each call builds its own
// *http.Request.
type Client struct {
apiURL string // e.g. "https://platform.example.com"
workspaceID string
token string
httpClient *http.Client
}
// NewClient builds a platform-API client. apiURL must be the base URL
// (no trailing slash); the methods append paths.
func NewClient(apiURL, workspaceID, token string) *Client {
return &Client{
apiURL: apiURL,
workspaceID: workspaceID,
token: token,
httpClient: &http.Client{
Timeout: 30 * time.Second,
},
}
}
// Register POSTs /registry/register with delivery_mode=poll and no URL
// (poll-mode workspaces don't need a public endpoint).
func (c *Client) Register(ctx context.Context, agentName string) error {
body := map[string]interface{}{
"id": c.workspaceID,
"agent_card": map[string]interface{}{
"name": agentName,
"description": "molecule connect (CLI bridge)",
"version": "0.1.0",
},
"delivery_mode": "poll",
}
return c.do(ctx, "POST", "/registry/register", body, nil)
}
// Heartbeat POSTs /registry/heartbeat. Called periodically by the
// heartbeat goroutine. The platform's TTL on the workspace's online
// status is short (~60s) so we beat every 30s by default.
func (c *Client) Heartbeat(ctx context.Context) error {
body := map[string]interface{}{
"workspace_id": c.workspaceID,
"runtime_state": "ok",
"uptime_seconds": 0, // not tracked by the bridge yet
}
return c.do(ctx, "POST", "/registry/heartbeat", body, nil)
}
// ActivityRow mirrors the activity_logs row shape returned by GET
// /workspaces/:id/activity. Only the fields the connect loops use are
// pulled; the rest pass through unread.
type ActivityRow struct {
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
ActivityType string `json:"activity_type"`
SourceID *string `json:"source_id"`
TargetID *string `json:"target_id"`
Method *string `json:"method"`
Summary *string `json:"summary"`
RequestBody json.RawMessage `json:"request_body"`
Status string `json:"status"`
CreatedAt string `json:"created_at"`
}
// Activity GETs /workspaces/:id/activity with the given cursor. sinceID
// empty means "first call after register" — server returns the most
// recent backlog up to limit.
func (c *Client) Activity(ctx context.Context, sinceID string, limit int) ([]ActivityRow, error) {
q := url.Values{}
q.Set("type", "a2a_receive")
if sinceID != "" {
q.Set("since_id", sinceID)
}
if limit > 0 {
q.Set("limit", strconv.Itoa(limit))
}
path := "/workspaces/" + c.workspaceID + "/activity?" + q.Encode()
var rows []ActivityRow
if err := c.do(ctx, "GET", path, nil, &rows); err != nil {
return nil, err
}
return rows, nil
}
// ReplyA2A posts a JSON-RPC reply envelope to the source workspace's
// /a2a endpoint. This is the inter-agent reply path; canvas-origin
// messages (source_id == nil) need a different convention — see
// connect.go for the canvas-reply TODO.
func (c *Client) ReplyA2A(ctx context.Context, sourceWorkspaceID string, envelope []byte) error {
path := "/workspaces/" + sourceWorkspaceID + "/a2a"
return c.doRaw(ctx, "POST", path, envelope, nil)
}
// do runs a JSON request: marshal body, decode response into out (when
// non-nil). 4xx is a permanent error, 5xx is a retryable error — the
// caller decides what to do with each.
func (c *Client) do(ctx context.Context, method, path string, body interface{}, out interface{}) error {
var raw []byte
if body != nil {
b, err := json.Marshal(body)
if err != nil {
return fmt.Errorf("marshal %s %s: %w", method, path, err)
}
raw = b
}
return c.doRaw(ctx, method, path, raw, out)
}
// doRaw is do() but with a pre-marshaled body — used by ReplyA2A which
// passes through the original JSON-RPC envelope without re-encoding.
func (c *Client) doRaw(ctx context.Context, method, path string, body []byte, out interface{}) error {
var reader io.Reader
if len(body) > 0 {
reader = bytes.NewReader(body)
}
req, err := http.NewRequestWithContext(ctx, method, c.apiURL+path, reader)
if err != nil {
return fmt.Errorf("build request %s %s: %w", method, path, err)
}
req.Header.Set("Authorization", "Bearer "+c.token)
if len(body) > 0 {
req.Header.Set("Content-Type", "application/json")
}
if out != nil {
req.Header.Set("Accept", "application/json")
}
resp, err := c.httpClient.Do(req)
if err != nil {
// Network/transport error — caller treats as retryable.
return &TransientError{Op: method + " " + path, Err: err}
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
if resp.StatusCode >= 400 {
// 4xx = permanent (caller config bug); 5xx = retryable.
if resp.StatusCode >= 500 {
return &TransientError{
Op: method + " " + path,
Status: resp.StatusCode,
Err: fmt.Errorf("server error %d: %s", resp.StatusCode, truncate(respBody, 200)),
}
}
return &PermanentError{
Op: method + " " + path,
Status: resp.StatusCode,
Body: string(truncate(respBody, 200)),
}
}
if out != nil && len(respBody) > 0 {
if err := json.Unmarshal(respBody, out); err != nil {
return fmt.Errorf("decode %s %s: %w (body: %s)", method, path, err, truncate(respBody, 200))
}
}
return nil
}
// TransientError is a network or 5xx error — the caller should retry
// with backoff.
type TransientError struct {
Op string
Status int
Err error
}
func (e *TransientError) Error() string {
if e.Status > 0 {
return fmt.Sprintf("%s: transient %d: %v", e.Op, e.Status, e.Err)
}
return fmt.Sprintf("%s: transient: %v", e.Op, e.Err)
}
func (e *TransientError) Unwrap() error { return e.Err }
// PermanentError is a 4xx error — the caller should abort or surface
// the message to the user. Usually means token wrong, workspace
// removed, or payload malformed.
type PermanentError struct {
Op string
Status int
Body string
}
func (e *PermanentError) Error() string {
return fmt.Sprintf("%s: %d: %s", e.Op, e.Status, e.Body)
}
func truncate(b []byte, n int) []byte {
if len(b) <= n {
return b
}
out := make([]byte, 0, n+3)
out = append(out, b[:n]...)
out = append(out, "..."...)
return out
}