Merge pull request #11 from Molecule-AI/auto/connect-m1-foundation

feat(connect): M1.1 — Backend interface + connect skeleton + mock backend
This commit is contained in:
Hongming Wang 2026-04-30 03:08:00 -07:00 committed by GitHub
commit 268e0cd9ed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 674 additions and 0 deletions

1
go.mod
View File

@ -5,6 +5,7 @@ go 1.25.0
require (
github.com/spf13/cobra v1.10.2
github.com/spf13/viper v1.21.0
gopkg.in/yaml.v3 v3.0.1
)
require (

View File

@ -0,0 +1,171 @@
// Package backends defines the pluggable handler interface that
// `molecule connect` dispatches inbound A2A messages to.
//
// Each backend impl is a sub-package (claude_code/, exec/, mock/, etc.)
// that registers itself via `Register()` from an `init()` block.
// Runtime selection is done via the --backend flag.
//
// See RFC: https://github.com/Molecule-AI/molecule-cli/issues/10
package backends
import (
"context"
"fmt"
"sort"
"sync"
)
// Request is the inbound A2A message handed to a Backend. Mirrors the
// JSON-RPC `params` shape that workspace-server's /workspaces/:id/a2a
// endpoint consumes — kept lossless so backends can re-issue the request
// to a downstream system without re-parsing.
//
// The fields here are the stable contract; new optional fields can be
// added but must be additive.
type Request struct {
// WorkspaceID is the ID of the receiving workspace (this side).
WorkspaceID string `json:"workspace_id"`
// CallerID is the workspace ID of the sender, when known. Empty for
// canvas-originated messages.
CallerID string `json:"caller_id,omitempty"`
// MessageID is the per-message UUID. Unique per send; backends use
// this for idempotency dedupe.
MessageID string `json:"message_id,omitempty"`
// IdempotencyKey is the caller-supplied dedupe key. If set, prefer
// it over MessageID for de-dup.
IdempotencyKey string `json:"idempotency_key,omitempty"`
// TaskID is the long-running task this message belongs to (when the
// caller is in a delegation flow).
TaskID string `json:"task_id,omitempty"`
// Parts carries the message content (text/file/data parts per A2A
// v0.3). Backends that only handle text concatenate the text parts.
Parts []Part `json:"parts"`
// Method is the JSON-RPC method ("message/send", "message/stream",
// etc.) — backends that can stream may branch on this.
Method string `json:"method,omitempty"`
// Raw is the unparsed JSON-RPC envelope, kept for backends that need
// to forward the full request shape (mcp, openai-passthrough).
Raw []byte `json:"-"`
}
// Part is one A2A message part. Type is "text", "file", "data".
type Part struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
MimeType string `json:"mime_type,omitempty"`
URI string `json:"uri,omitempty"`
Data map[string]interface{} `json:"data,omitempty"`
}
// Response is the backend's reply to an A2A request.
type Response struct {
// Parts is the response content. At least one part is required;
// most backends produce a single text part.
Parts []Part `json:"parts"`
// Final indicates this is the terminal response for the request
// (vs an intermediate streaming chunk). Single-shot backends
// always set true.
Final bool `json:"final"`
}
// TextResponse is a convenience constructor for the common case: a
// single text part, terminal response.
func TextResponse(text string) Response {
return Response{
Parts: []Part{{Type: "text", Text: text}},
Final: true,
}
}
// Backend is the seam every concrete handler implements. Two methods,
// no inheritance, no surprise side effects: HandleA2A is called once
// per inbound message, Close once at shutdown.
//
// Backends MUST be safe for concurrent HandleA2A calls — `molecule
// connect` may dispatch poll-batch messages in parallel.
type Backend interface {
// HandleA2A processes one inbound message and returns the reply.
// Implementations should respect ctx cancellation; the caller may
// cancel on shutdown.
HandleA2A(ctx context.Context, req Request) (Response, error)
// Close releases backend resources (subprocess, network conn, etc.).
// Called exactly once during graceful shutdown. Must be idempotent.
Close() error
}
// Factory builds a Backend from per-backend config. Returned by
// each backend impl's `init()`-time registration.
type Factory func(cfg Config) (Backend, error)
// Config is the loosely-typed bag of per-backend options. Each backend
// documents the keys it consumes in its package-level doc. Unknown
// keys are ignored so adding a key doesn't break existing setups.
type Config map[string]string
// Get returns cfg[key], or fallback if unset.
func (c Config) Get(key, fallback string) string {
if v, ok := c[key]; ok && v != "" {
return v
}
return fallback
}
// Require returns cfg[key], or an error if unset/empty. Use for keys
// the backend cannot start without.
func (c Config) Require(key string) (string, error) {
v := c[key]
if v == "" {
return "", fmt.Errorf("backend config: %q is required", key)
}
return v, nil
}
var (
registryMu sync.RWMutex
registry = map[string]Factory{}
)
// Register adds a backend Factory under name. Called from each backend
// impl's init() block. Panics on duplicate name — registration drift
// is a programming error and should fail loudly at startup.
func Register(name string, factory Factory) {
if name == "" {
panic("backends.Register: name must be non-empty")
}
if factory == nil {
panic("backends.Register: factory must be non-nil")
}
registryMu.Lock()
defer registryMu.Unlock()
if _, dup := registry[name]; dup {
panic("backends.Register: duplicate backend name " + name)
}
registry[name] = factory
}
// Build instantiates the named backend with cfg. Returns an error if
// no backend is registered under that name (typo, missing build tag,
// etc.) — callers should surface the error with a clear message that
// includes the list from `Names()`.
func Build(name string, cfg Config) (Backend, error) {
registryMu.RLock()
factory, ok := registry[name]
registryMu.RUnlock()
if !ok {
return nil, fmt.Errorf("backends.Build: unknown backend %q (registered: %v)", name, Names())
}
return factory(cfg)
}
// Names returns the sorted list of registered backend names. Used in
// `--help` rendering and error messages.
func Names() []string {
registryMu.RLock()
out := make([]string, 0, len(registry))
for k := range registry {
out = append(out, k)
}
registryMu.RUnlock()
sort.Strings(out)
return out
}

View File

@ -0,0 +1,184 @@
package backends_test
import (
"context"
"strings"
"testing"
"github.com/Molecule-AI/molecule-cli/internal/backends"
_ "github.com/Molecule-AI/molecule-cli/internal/backends/mock" // register
)
func TestRegister_DuplicatePanics(t *testing.T) {
defer func() {
r := recover()
if r == nil {
t.Fatal("expected panic on duplicate registration")
}
msg, ok := r.(string)
if !ok || !strings.Contains(msg, "duplicate backend name") {
t.Fatalf("unexpected panic value: %v", r)
}
}()
// "mock" is already registered via the package-level import above.
backends.Register("mock", func(backends.Config) (backends.Backend, error) {
return nil, nil
})
}
func TestRegister_EmptyNamePanics(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Fatal("expected panic on empty name")
}
}()
backends.Register("", func(backends.Config) (backends.Backend, error) {
return nil, nil
})
}
func TestRegister_NilFactoryPanics(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Fatal("expected panic on nil factory")
}
}()
backends.Register("nilfactory", nil)
}
func TestBuild_UnknownBackend(t *testing.T) {
_, err := backends.Build("does-not-exist", nil)
if err == nil {
t.Fatal("expected error on unknown backend")
}
// Error must include the known list so users can see what's available.
if !strings.Contains(err.Error(), "registered:") {
t.Errorf("error missing registered-list hint: %v", err)
}
}
func TestBuild_MockBackend(t *testing.T) {
b, err := backends.Build("mock", backends.Config{"reply": "pong: %s"})
if err != nil {
t.Fatalf("Build(mock): %v", err)
}
defer b.Close()
resp, err := b.HandleA2A(context.Background(), backends.Request{
WorkspaceID: "ws-test",
Parts: []backends.Part{{Type: "text", Text: "ping"}},
})
if err != nil {
t.Fatalf("HandleA2A: %v", err)
}
if !resp.Final {
t.Error("expected Final=true on terminal response")
}
if len(resp.Parts) != 1 {
t.Fatalf("expected 1 part, got %d", len(resp.Parts))
}
if got, want := resp.Parts[0].Text, "pong: ping"; got != want {
t.Errorf("reply: got %q, want %q", got, want)
}
}
func TestBuild_MockBackend_DefaultTemplate(t *testing.T) {
b, err := backends.Build("mock", nil)
if err != nil {
t.Fatalf("Build(mock, nil cfg): %v", err)
}
defer b.Close()
resp, err := b.HandleA2A(context.Background(), backends.Request{
Parts: []backends.Part{{Type: "text", Text: "hi"}},
})
if err != nil {
t.Fatal(err)
}
if got, want := resp.Parts[0].Text, "echo: hi"; got != want {
t.Errorf("default template: got %q, want %q", got, want)
}
}
func TestBuild_MockBackend_ConcatenatesTextParts(t *testing.T) {
b, err := backends.Build("mock", backends.Config{"reply": "%s"})
if err != nil {
t.Fatal(err)
}
defer b.Close()
resp, _ := b.HandleA2A(context.Background(), backends.Request{
Parts: []backends.Part{
{Type: "text", Text: "hello "},
{Type: "data", Data: map[string]interface{}{"k": "v"}}, // ignored
{Type: "text", Text: "world"},
},
})
if got, want := resp.Parts[0].Text, "hello world"; got != want {
t.Errorf("concatenation: got %q, want %q", got, want)
}
}
func TestNames_IncludesMock(t *testing.T) {
names := backends.Names()
found := false
for _, n := range names {
if n == "mock" {
found = true
break
}
}
if !found {
t.Errorf("Names() missing 'mock': %v", names)
}
}
func TestConfig_GetWithFallback(t *testing.T) {
cfg := backends.Config{"present": "yes"}
if got := cfg.Get("present", "fb"); got != "yes" {
t.Errorf("Get present: got %q, want yes", got)
}
if got := cfg.Get("absent", "fb"); got != "fb" {
t.Errorf("Get absent: got %q, want fb", got)
}
if got := cfg.Get("present", "fb"); got != "yes" {
t.Errorf("Get present consistent")
}
// Empty value triggers fallback (treated as unset).
cfg["empty"] = ""
if got := cfg.Get("empty", "fb"); got != "fb" {
t.Errorf("Get empty: got %q, want fb", got)
}
}
func TestConfig_RequireMissing(t *testing.T) {
cfg := backends.Config{}
_, err := cfg.Require("nope")
if err == nil {
t.Fatal("expected error on missing required key")
}
if !strings.Contains(err.Error(), "nope") {
t.Errorf("error should name missing key: %v", err)
}
}
func TestConfig_RequirePresent(t *testing.T) {
cfg := backends.Config{"k": "v"}
got, err := cfg.Require("k")
if err != nil {
t.Fatal(err)
}
if got != "v" {
t.Errorf("got %q, want v", got)
}
}
func TestTextResponse_Shape(t *testing.T) {
resp := backends.TextResponse("hello")
if !resp.Final {
t.Error("TextResponse should be Final")
}
if len(resp.Parts) != 1 || resp.Parts[0].Type != "text" || resp.Parts[0].Text != "hello" {
t.Errorf("bad shape: %+v", resp)
}
}

View File

@ -0,0 +1,51 @@
// Package mock implements a deterministic Backend for tests, demos,
// and CI smoke checks.
//
// Config keys:
// - reply: response template. `%s` is replaced with the concatenated
// inbound text parts. Default: "echo: %s".
//
// Registers itself as "mock". Activate via:
//
// molecule connect <id> --backend mock --backend-opt reply="pong"
package mock
import (
"context"
"strings"
"github.com/Molecule-AI/molecule-cli/internal/backends"
)
func init() {
backends.Register("mock", New)
}
// New builds a mock backend from cfg.
func New(cfg backends.Config) (backends.Backend, error) {
return &Backend{
template: cfg.Get("reply", "echo: %s"),
}, nil
}
// Backend is the mock implementation. Pure function: no state, no I/O,
// safe for concurrent use without locks.
type Backend struct {
template string
}
// HandleA2A renders the reply template against the request's text
// parts and returns it as a single-part terminal response.
func (b *Backend) HandleA2A(_ context.Context, req backends.Request) (backends.Response, error) {
var sb strings.Builder
for _, p := range req.Parts {
if p.Type == "text" {
sb.WriteString(p.Text)
}
}
reply := strings.ReplaceAll(b.template, "%s", sb.String())
return backends.TextResponse(reply), nil
}
// Close is a no-op — nothing to release.
func (b *Backend) Close() error { return nil }

144
internal/cmd/connect.go Normal file
View File

@ -0,0 +1,144 @@
package cmd
import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"github.com/Molecule-AI/molecule-cli/internal/backends"
_ "github.com/Molecule-AI/molecule-cli/internal/backends/mock" // register backend
"github.com/spf13/cobra"
)
// ---------------------------------------------------------------------------
// molecule connect — bridge an external-runtime workspace to a local backend.
//
// The full M1+ design lives in the RFC at
// https://github.com/Molecule-AI/molecule-cli/issues/10. This file owns the
// command surface; the wiring (heartbeat, activity poll, dispatch) lands in
// internal/connect/ in subsequent PRs.
// ---------------------------------------------------------------------------
var connectFlags struct {
backend string
backendOpts []string // KEY=VALUE pairs, repeatable
token string
mode string // "poll" (default) | "push"
intervalMs int // poll cadence, milliseconds
sinceSecs int // initial activity-cursor lookback
dryRun bool // build backend + print summary, do not start loops
}
var connectCmd = &cobra.Command{
Use: "connect <workspace-id>",
Short: "Bridge an external workspace to a local backend (Claude Code, exec, mock, ...)",
Long: `connect attaches the calling process to an external-runtime workspace.
Inbound A2A messages routed to the workspace are dispatched to the
selected --backend. The default backend is "claude-code" (bridges into
a running Claude Code session via the channel plugin); use "mock" for
CI smoke and "exec" for arbitrary shell handlers.
Authentication: the per-workspace token from the create response is
read from --token or the MOLECULE_WORKSPACE_TOKEN env var. The
platform URL is read from --api-url or MOLECULE_API_URL.
Mode: poll (default) is the no-public-URL path the CLI long-polls
the platform for inbound activity. push requires the local box to be
reachable from the platform (public HTTPS); use only when running on
a server with an inbound URL.
Examples:
# Default: bridge into a running Claude Code session
molecule connect ws_01HF2K... --token $WS_TOKEN
# CI smoke / demo replies are deterministic
molecule connect ws_01HF2K... --backend mock \
--backend-opt reply="echo: %s"
# Arbitrary shell handler
molecule connect ws_01HF2K... --backend exec \
--backend-opt cmd="python myhandler.py"
See full design: https://github.com/Molecule-AI/molecule-cli/issues/10`,
Args: cobra.ExactArgs(1),
RunE: runConnect,
}
func init() {
connectCmd.Flags().StringVar(&connectFlags.backend, "backend", "claude-code",
fmt.Sprintf("Backend that handles inbound A2A messages (registered: %s)",
strings.Join(backends.Names(), ", ")))
connectCmd.Flags().StringArrayVar(&connectFlags.backendOpts, "backend-opt", nil,
"Backend-specific option, KEY=VALUE (repeatable)")
connectCmd.Flags().StringVar(&connectFlags.token, "token",
envOr("MOLECULE_WORKSPACE_TOKEN", ""),
"Workspace auth token (env: MOLECULE_WORKSPACE_TOKEN)")
connectCmd.Flags().StringVar(&connectFlags.mode, "mode", "poll",
"Delivery mode: poll (no public URL needed) | push")
connectCmd.Flags().IntVar(&connectFlags.intervalMs, "interval-ms", 1000,
"Poll-mode interval between activity fetches, in milliseconds")
connectCmd.Flags().IntVar(&connectFlags.sinceSecs, "since-secs", 30,
"Poll-mode initial cursor lookback, in seconds")
connectCmd.Flags().BoolVar(&connectFlags.dryRun, "dry-run", false,
"Build the backend and print the connection summary, but do not start loops")
}
func runConnect(_ *cobra.Command, args []string) error {
workspaceID := strings.TrimSpace(args[0])
if workspaceID == "" {
return &exitError{code: 2, msg: "connect: workspace-id is required"}
}
if connectFlags.token == "" {
return &exitError{code: 2, msg: "connect: --token (or MOLECULE_WORKSPACE_TOKEN) is required"}
}
if connectFlags.mode != "poll" && connectFlags.mode != "push" {
return &exitError{code: 2, msg: "connect: --mode must be poll or push"}
}
cfg, err := parseBackendOpts(connectFlags.backendOpts)
if err != nil {
return &exitError{code: 2, msg: err.Error()}
}
backend, err := backends.Build(connectFlags.backend, cfg)
if err != nil {
return &exitError{code: 2, msg: err.Error()}
}
fmt.Fprintf(os.Stderr, "molecule connect: workspace=%s backend=%s mode=%s api=%s\n",
workspaceID, connectFlags.backend, connectFlags.mode, apiURL)
if connectFlags.dryRun {
fmt.Fprintln(os.Stderr, "molecule connect: --dry-run; backend built ok, not starting loops")
return backend.Close()
}
// Loops (heartbeat + activity poll + dispatch) land in internal/connect
// in PR M1.2. For M1.1 we wire signal handling so the command exits
// cleanly when invoked in --dry-run by tests, and so future loops
// inherit context cancellation.
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()
<-ctx.Done()
fmt.Fprintln(os.Stderr, "molecule connect: shutting down")
return backend.Close()
}
// parseBackendOpts converts repeated KEY=VALUE flags into a Config map.
func parseBackendOpts(opts []string) (backends.Config, error) {
cfg := backends.Config{}
for _, opt := range opts {
k, v, ok := strings.Cut(opt, "=")
if !ok || k == "" {
return nil, fmt.Errorf("--backend-opt: %q is not KEY=VALUE", opt)
}
cfg[k] = v
}
return cfg, nil
}

View File

@ -0,0 +1,122 @@
package cmd
import (
"strings"
"testing"
)
func TestParseBackendOpts(t *testing.T) {
cases := []struct {
name string
input []string
want map[string]string
wantErr bool
}{
{"empty", nil, map[string]string{}, false},
{"single", []string{"reply=pong"}, map[string]string{"reply": "pong"}, false},
{
"multiple",
[]string{"reply=pong", "cmd=python x.py"},
map[string]string{"reply": "pong", "cmd": "python x.py"},
false,
},
{
"value contains equals",
[]string{"url=https://x.com?a=b"},
map[string]string{"url": "https://x.com?a=b"},
false,
},
{"empty value allowed", []string{"k="}, map[string]string{"k": ""}, false},
{"missing equals", []string{"justakey"}, nil, true},
{"empty key", []string{"=v"}, nil, true},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got, err := parseBackendOpts(tc.input)
if (err != nil) != tc.wantErr {
t.Fatalf("err = %v, wantErr = %v", err, tc.wantErr)
}
if tc.wantErr {
return
}
if len(got) != len(tc.want) {
t.Errorf("len: got %d, want %d (%+v)", len(got), len(tc.want), got)
}
for k, v := range tc.want {
if got[k] != v {
t.Errorf("key %q: got %q, want %q", k, got[k], v)
}
}
})
}
}
// TestConnect_FlagValidation walks the invalid argument paths the runner
// guards against. We don't actually open a socket — the flags are
// rejected before any I/O.
func TestConnect_FlagValidation(t *testing.T) {
cases := []struct {
name string
args []string
wantSub string
}{
// Cobra catches missing args before runConnect runs.
{"no token", []string{"connect", "ws-1", "--backend", "mock"}, "token"},
{
"bad mode",
[]string{"connect", "ws-1", "--backend", "mock", "--token", "t", "--mode", "ftp"},
"mode",
},
{
"bad backend-opt",
[]string{"connect", "ws-1", "--backend", "mock", "--token", "t", "--backend-opt", "noequals"},
"KEY=VALUE",
},
{
"unknown backend",
[]string{"connect", "ws-1", "--backend", "nonesuch", "--token", "t"},
"unknown backend",
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// Reset package-level connectFlags between runs so a prior test's
// settings don't leak. cobra rebinds on Execute.
resetConnectFlags()
rootCmd.SetArgs(tc.args)
err := rootCmd.Execute()
if err == nil {
t.Fatal("expected error")
}
if !strings.Contains(err.Error(), tc.wantSub) {
t.Errorf("error %q missing %q", err.Error(), tc.wantSub)
}
})
}
}
// TestConnect_DryRun covers the happy path: valid flags, --dry-run set,
// runner builds the backend and exits without entering loops.
func TestConnect_DryRun(t *testing.T) {
resetConnectFlags()
rootCmd.SetArgs([]string{
"connect", "ws-test",
"--backend", "mock",
"--token", "tok",
"--backend-opt", "reply=ok",
"--dry-run",
})
if err := rootCmd.Execute(); err != nil {
t.Fatalf("dry-run should succeed: %v", err)
}
}
func resetConnectFlags() {
connectFlags.backend = "claude-code"
connectFlags.backendOpts = nil
connectFlags.token = ""
connectFlags.mode = "poll"
connectFlags.intervalMs = 1000
connectFlags.sinceSecs = 30
connectFlags.dryRun = false
}

View File

@ -89,6 +89,7 @@ func init() {
rootCmd.AddCommand(platformCmd)
rootCmd.AddCommand(configCmd)
rootCmd.AddCommand(initCmd)
rootCmd.AddCommand(connectCmd)
}
// exitError wraps a user-facing error with a specific exit code.