diff --git a/go.mod b/go.mod index 6c03571..f97dfe1 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25.0 require ( github.com/spf13/cobra v1.10.2 github.com/spf13/viper v1.21.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( diff --git a/internal/backends/backend.go b/internal/backends/backend.go new file mode 100644 index 0000000..3fe17b1 --- /dev/null +++ b/internal/backends/backend.go @@ -0,0 +1,171 @@ +// Package backends defines the pluggable handler interface that +// `molecule connect` dispatches inbound A2A messages to. +// +// Each backend impl is a sub-package (claude_code/, exec/, mock/, etc.) +// that registers itself via `Register()` from an `init()` block. +// Runtime selection is done via the --backend flag. +// +// See RFC: https://github.com/Molecule-AI/molecule-cli/issues/10 +package backends + +import ( + "context" + "fmt" + "sort" + "sync" +) + +// Request is the inbound A2A message handed to a Backend. Mirrors the +// JSON-RPC `params` shape that workspace-server's /workspaces/:id/a2a +// endpoint consumes — kept lossless so backends can re-issue the request +// to a downstream system without re-parsing. +// +// The fields here are the stable contract; new optional fields can be +// added but must be additive. +type Request struct { + // WorkspaceID is the ID of the receiving workspace (this side). + WorkspaceID string `json:"workspace_id"` + // CallerID is the workspace ID of the sender, when known. Empty for + // canvas-originated messages. + CallerID string `json:"caller_id,omitempty"` + // MessageID is the per-message UUID. Unique per send; backends use + // this for idempotency dedupe. + MessageID string `json:"message_id,omitempty"` + // IdempotencyKey is the caller-supplied dedupe key. If set, prefer + // it over MessageID for de-dup. + IdempotencyKey string `json:"idempotency_key,omitempty"` + // TaskID is the long-running task this message belongs to (when the + // caller is in a delegation flow). + TaskID string `json:"task_id,omitempty"` + // Parts carries the message content (text/file/data parts per A2A + // v0.3). Backends that only handle text concatenate the text parts. + Parts []Part `json:"parts"` + // Method is the JSON-RPC method ("message/send", "message/stream", + // etc.) — backends that can stream may branch on this. + Method string `json:"method,omitempty"` + // Raw is the unparsed JSON-RPC envelope, kept for backends that need + // to forward the full request shape (mcp, openai-passthrough). + Raw []byte `json:"-"` +} + +// Part is one A2A message part. Type is "text", "file", "data". +type Part struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + MimeType string `json:"mime_type,omitempty"` + URI string `json:"uri,omitempty"` + Data map[string]interface{} `json:"data,omitempty"` +} + +// Response is the backend's reply to an A2A request. +type Response struct { + // Parts is the response content. At least one part is required; + // most backends produce a single text part. + Parts []Part `json:"parts"` + // Final indicates this is the terminal response for the request + // (vs an intermediate streaming chunk). Single-shot backends + // always set true. + Final bool `json:"final"` +} + +// TextResponse is a convenience constructor for the common case: a +// single text part, terminal response. +func TextResponse(text string) Response { + return Response{ + Parts: []Part{{Type: "text", Text: text}}, + Final: true, + } +} + +// Backend is the seam every concrete handler implements. Two methods, +// no inheritance, no surprise side effects: HandleA2A is called once +// per inbound message, Close once at shutdown. +// +// Backends MUST be safe for concurrent HandleA2A calls — `molecule +// connect` may dispatch poll-batch messages in parallel. +type Backend interface { + // HandleA2A processes one inbound message and returns the reply. + // Implementations should respect ctx cancellation; the caller may + // cancel on shutdown. + HandleA2A(ctx context.Context, req Request) (Response, error) + // Close releases backend resources (subprocess, network conn, etc.). + // Called exactly once during graceful shutdown. Must be idempotent. + Close() error +} + +// Factory builds a Backend from per-backend config. Returned by +// each backend impl's `init()`-time registration. +type Factory func(cfg Config) (Backend, error) + +// Config is the loosely-typed bag of per-backend options. Each backend +// documents the keys it consumes in its package-level doc. Unknown +// keys are ignored so adding a key doesn't break existing setups. +type Config map[string]string + +// Get returns cfg[key], or fallback if unset. +func (c Config) Get(key, fallback string) string { + if v, ok := c[key]; ok && v != "" { + return v + } + return fallback +} + +// Require returns cfg[key], or an error if unset/empty. Use for keys +// the backend cannot start without. +func (c Config) Require(key string) (string, error) { + v := c[key] + if v == "" { + return "", fmt.Errorf("backend config: %q is required", key) + } + return v, nil +} + +var ( + registryMu sync.RWMutex + registry = map[string]Factory{} +) + +// Register adds a backend Factory under name. Called from each backend +// impl's init() block. Panics on duplicate name — registration drift +// is a programming error and should fail loudly at startup. +func Register(name string, factory Factory) { + if name == "" { + panic("backends.Register: name must be non-empty") + } + if factory == nil { + panic("backends.Register: factory must be non-nil") + } + registryMu.Lock() + defer registryMu.Unlock() + if _, dup := registry[name]; dup { + panic("backends.Register: duplicate backend name " + name) + } + registry[name] = factory +} + +// Build instantiates the named backend with cfg. Returns an error if +// no backend is registered under that name (typo, missing build tag, +// etc.) — callers should surface the error with a clear message that +// includes the list from `Names()`. +func Build(name string, cfg Config) (Backend, error) { + registryMu.RLock() + factory, ok := registry[name] + registryMu.RUnlock() + if !ok { + return nil, fmt.Errorf("backends.Build: unknown backend %q (registered: %v)", name, Names()) + } + return factory(cfg) +} + +// Names returns the sorted list of registered backend names. Used in +// `--help` rendering and error messages. +func Names() []string { + registryMu.RLock() + out := make([]string, 0, len(registry)) + for k := range registry { + out = append(out, k) + } + registryMu.RUnlock() + sort.Strings(out) + return out +} diff --git a/internal/backends/backend_test.go b/internal/backends/backend_test.go new file mode 100644 index 0000000..5346d17 --- /dev/null +++ b/internal/backends/backend_test.go @@ -0,0 +1,184 @@ +package backends_test + +import ( + "context" + "strings" + "testing" + + "github.com/Molecule-AI/molecule-cli/internal/backends" + _ "github.com/Molecule-AI/molecule-cli/internal/backends/mock" // register +) + +func TestRegister_DuplicatePanics(t *testing.T) { + defer func() { + r := recover() + if r == nil { + t.Fatal("expected panic on duplicate registration") + } + msg, ok := r.(string) + if !ok || !strings.Contains(msg, "duplicate backend name") { + t.Fatalf("unexpected panic value: %v", r) + } + }() + // "mock" is already registered via the package-level import above. + backends.Register("mock", func(backends.Config) (backends.Backend, error) { + return nil, nil + }) +} + +func TestRegister_EmptyNamePanics(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic on empty name") + } + }() + backends.Register("", func(backends.Config) (backends.Backend, error) { + return nil, nil + }) +} + +func TestRegister_NilFactoryPanics(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic on nil factory") + } + }() + backends.Register("nilfactory", nil) +} + +func TestBuild_UnknownBackend(t *testing.T) { + _, err := backends.Build("does-not-exist", nil) + if err == nil { + t.Fatal("expected error on unknown backend") + } + // Error must include the known list so users can see what's available. + if !strings.Contains(err.Error(), "registered:") { + t.Errorf("error missing registered-list hint: %v", err) + } +} + +func TestBuild_MockBackend(t *testing.T) { + b, err := backends.Build("mock", backends.Config{"reply": "pong: %s"}) + if err != nil { + t.Fatalf("Build(mock): %v", err) + } + defer b.Close() + + resp, err := b.HandleA2A(context.Background(), backends.Request{ + WorkspaceID: "ws-test", + Parts: []backends.Part{{Type: "text", Text: "ping"}}, + }) + if err != nil { + t.Fatalf("HandleA2A: %v", err) + } + if !resp.Final { + t.Error("expected Final=true on terminal response") + } + if len(resp.Parts) != 1 { + t.Fatalf("expected 1 part, got %d", len(resp.Parts)) + } + if got, want := resp.Parts[0].Text, "pong: ping"; got != want { + t.Errorf("reply: got %q, want %q", got, want) + } +} + +func TestBuild_MockBackend_DefaultTemplate(t *testing.T) { + b, err := backends.Build("mock", nil) + if err != nil { + t.Fatalf("Build(mock, nil cfg): %v", err) + } + defer b.Close() + + resp, err := b.HandleA2A(context.Background(), backends.Request{ + Parts: []backends.Part{{Type: "text", Text: "hi"}}, + }) + if err != nil { + t.Fatal(err) + } + if got, want := resp.Parts[0].Text, "echo: hi"; got != want { + t.Errorf("default template: got %q, want %q", got, want) + } +} + +func TestBuild_MockBackend_ConcatenatesTextParts(t *testing.T) { + b, err := backends.Build("mock", backends.Config{"reply": "%s"}) + if err != nil { + t.Fatal(err) + } + defer b.Close() + + resp, _ := b.HandleA2A(context.Background(), backends.Request{ + Parts: []backends.Part{ + {Type: "text", Text: "hello "}, + {Type: "data", Data: map[string]interface{}{"k": "v"}}, // ignored + {Type: "text", Text: "world"}, + }, + }) + if got, want := resp.Parts[0].Text, "hello world"; got != want { + t.Errorf("concatenation: got %q, want %q", got, want) + } +} + +func TestNames_IncludesMock(t *testing.T) { + names := backends.Names() + found := false + for _, n := range names { + if n == "mock" { + found = true + break + } + } + if !found { + t.Errorf("Names() missing 'mock': %v", names) + } +} + +func TestConfig_GetWithFallback(t *testing.T) { + cfg := backends.Config{"present": "yes"} + if got := cfg.Get("present", "fb"); got != "yes" { + t.Errorf("Get present: got %q, want yes", got) + } + if got := cfg.Get("absent", "fb"); got != "fb" { + t.Errorf("Get absent: got %q, want fb", got) + } + if got := cfg.Get("present", "fb"); got != "yes" { + t.Errorf("Get present consistent") + } + // Empty value triggers fallback (treated as unset). + cfg["empty"] = "" + if got := cfg.Get("empty", "fb"); got != "fb" { + t.Errorf("Get empty: got %q, want fb", got) + } +} + +func TestConfig_RequireMissing(t *testing.T) { + cfg := backends.Config{} + _, err := cfg.Require("nope") + if err == nil { + t.Fatal("expected error on missing required key") + } + if !strings.Contains(err.Error(), "nope") { + t.Errorf("error should name missing key: %v", err) + } +} + +func TestConfig_RequirePresent(t *testing.T) { + cfg := backends.Config{"k": "v"} + got, err := cfg.Require("k") + if err != nil { + t.Fatal(err) + } + if got != "v" { + t.Errorf("got %q, want v", got) + } +} + +func TestTextResponse_Shape(t *testing.T) { + resp := backends.TextResponse("hello") + if !resp.Final { + t.Error("TextResponse should be Final") + } + if len(resp.Parts) != 1 || resp.Parts[0].Type != "text" || resp.Parts[0].Text != "hello" { + t.Errorf("bad shape: %+v", resp) + } +} diff --git a/internal/backends/mock/mock.go b/internal/backends/mock/mock.go new file mode 100644 index 0000000..02eddf3 --- /dev/null +++ b/internal/backends/mock/mock.go @@ -0,0 +1,51 @@ +// Package mock implements a deterministic Backend for tests, demos, +// and CI smoke checks. +// +// Config keys: +// - reply: response template. `%s` is replaced with the concatenated +// inbound text parts. Default: "echo: %s". +// +// Registers itself as "mock". Activate via: +// +// molecule connect --backend mock --backend-opt reply="pong" +package mock + +import ( + "context" + "strings" + + "github.com/Molecule-AI/molecule-cli/internal/backends" +) + +func init() { + backends.Register("mock", New) +} + +// New builds a mock backend from cfg. +func New(cfg backends.Config) (backends.Backend, error) { + return &Backend{ + template: cfg.Get("reply", "echo: %s"), + }, nil +} + +// Backend is the mock implementation. Pure function: no state, no I/O, +// safe for concurrent use without locks. +type Backend struct { + template string +} + +// HandleA2A renders the reply template against the request's text +// parts and returns it as a single-part terminal response. +func (b *Backend) HandleA2A(_ context.Context, req backends.Request) (backends.Response, error) { + var sb strings.Builder + for _, p := range req.Parts { + if p.Type == "text" { + sb.WriteString(p.Text) + } + } + reply := strings.ReplaceAll(b.template, "%s", sb.String()) + return backends.TextResponse(reply), nil +} + +// Close is a no-op — nothing to release. +func (b *Backend) Close() error { return nil } diff --git a/internal/cmd/connect.go b/internal/cmd/connect.go new file mode 100644 index 0000000..0287e73 --- /dev/null +++ b/internal/cmd/connect.go @@ -0,0 +1,144 @@ +package cmd + +import ( + "context" + "fmt" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/Molecule-AI/molecule-cli/internal/backends" + _ "github.com/Molecule-AI/molecule-cli/internal/backends/mock" // register backend + "github.com/spf13/cobra" +) + +// --------------------------------------------------------------------------- +// molecule connect — bridge an external-runtime workspace to a local backend. +// +// The full M1+ design lives in the RFC at +// https://github.com/Molecule-AI/molecule-cli/issues/10. This file owns the +// command surface; the wiring (heartbeat, activity poll, dispatch) lands in +// internal/connect/ in subsequent PRs. +// --------------------------------------------------------------------------- + +var connectFlags struct { + backend string + backendOpts []string // KEY=VALUE pairs, repeatable + token string + mode string // "poll" (default) | "push" + intervalMs int // poll cadence, milliseconds + sinceSecs int // initial activity-cursor lookback + dryRun bool // build backend + print summary, do not start loops +} + +var connectCmd = &cobra.Command{ + Use: "connect ", + Short: "Bridge an external workspace to a local backend (Claude Code, exec, mock, ...)", + Long: `connect attaches the calling process to an external-runtime workspace. + +Inbound A2A messages routed to the workspace are dispatched to the +selected --backend. The default backend is "claude-code" (bridges into +a running Claude Code session via the channel plugin); use "mock" for +CI smoke and "exec" for arbitrary shell handlers. + +Authentication: the per-workspace token from the create response is +read from --token or the MOLECULE_WORKSPACE_TOKEN env var. The +platform URL is read from --api-url or MOLECULE_API_URL. + +Mode: poll (default) is the no-public-URL path — the CLI long-polls +the platform for inbound activity. push requires the local box to be +reachable from the platform (public HTTPS); use only when running on +a server with an inbound URL. + +Examples: + + # Default: bridge into a running Claude Code session + molecule connect ws_01HF2K... --token $WS_TOKEN + + # CI smoke / demo — replies are deterministic + molecule connect ws_01HF2K... --backend mock \ + --backend-opt reply="echo: %s" + + # Arbitrary shell handler + molecule connect ws_01HF2K... --backend exec \ + --backend-opt cmd="python myhandler.py" + +See full design: https://github.com/Molecule-AI/molecule-cli/issues/10`, + Args: cobra.ExactArgs(1), + RunE: runConnect, +} + +func init() { + connectCmd.Flags().StringVar(&connectFlags.backend, "backend", "claude-code", + fmt.Sprintf("Backend that handles inbound A2A messages (registered: %s)", + strings.Join(backends.Names(), ", "))) + connectCmd.Flags().StringArrayVar(&connectFlags.backendOpts, "backend-opt", nil, + "Backend-specific option, KEY=VALUE (repeatable)") + connectCmd.Flags().StringVar(&connectFlags.token, "token", + envOr("MOLECULE_WORKSPACE_TOKEN", ""), + "Workspace auth token (env: MOLECULE_WORKSPACE_TOKEN)") + connectCmd.Flags().StringVar(&connectFlags.mode, "mode", "poll", + "Delivery mode: poll (no public URL needed) | push") + connectCmd.Flags().IntVar(&connectFlags.intervalMs, "interval-ms", 1000, + "Poll-mode interval between activity fetches, in milliseconds") + connectCmd.Flags().IntVar(&connectFlags.sinceSecs, "since-secs", 30, + "Poll-mode initial cursor lookback, in seconds") + connectCmd.Flags().BoolVar(&connectFlags.dryRun, "dry-run", false, + "Build the backend and print the connection summary, but do not start loops") +} + +func runConnect(_ *cobra.Command, args []string) error { + workspaceID := strings.TrimSpace(args[0]) + if workspaceID == "" { + return &exitError{code: 2, msg: "connect: workspace-id is required"} + } + if connectFlags.token == "" { + return &exitError{code: 2, msg: "connect: --token (or MOLECULE_WORKSPACE_TOKEN) is required"} + } + if connectFlags.mode != "poll" && connectFlags.mode != "push" { + return &exitError{code: 2, msg: "connect: --mode must be poll or push"} + } + + cfg, err := parseBackendOpts(connectFlags.backendOpts) + if err != nil { + return &exitError{code: 2, msg: err.Error()} + } + + backend, err := backends.Build(connectFlags.backend, cfg) + if err != nil { + return &exitError{code: 2, msg: err.Error()} + } + + fmt.Fprintf(os.Stderr, "molecule connect: workspace=%s backend=%s mode=%s api=%s\n", + workspaceID, connectFlags.backend, connectFlags.mode, apiURL) + + if connectFlags.dryRun { + fmt.Fprintln(os.Stderr, "molecule connect: --dry-run; backend built ok, not starting loops") + return backend.Close() + } + + // Loops (heartbeat + activity poll + dispatch) land in internal/connect + // in PR M1.2. For M1.1 we wire signal handling so the command exits + // cleanly when invoked in --dry-run by tests, and so future loops + // inherit context cancellation. + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() + + <-ctx.Done() + fmt.Fprintln(os.Stderr, "molecule connect: shutting down") + return backend.Close() +} + +// parseBackendOpts converts repeated KEY=VALUE flags into a Config map. +func parseBackendOpts(opts []string) (backends.Config, error) { + cfg := backends.Config{} + for _, opt := range opts { + k, v, ok := strings.Cut(opt, "=") + if !ok || k == "" { + return nil, fmt.Errorf("--backend-opt: %q is not KEY=VALUE", opt) + } + cfg[k] = v + } + return cfg, nil +} diff --git a/internal/cmd/connect_test.go b/internal/cmd/connect_test.go new file mode 100644 index 0000000..ac744ec --- /dev/null +++ b/internal/cmd/connect_test.go @@ -0,0 +1,122 @@ +package cmd + +import ( + "strings" + "testing" +) + +func TestParseBackendOpts(t *testing.T) { + cases := []struct { + name string + input []string + want map[string]string + wantErr bool + }{ + {"empty", nil, map[string]string{}, false}, + {"single", []string{"reply=pong"}, map[string]string{"reply": "pong"}, false}, + { + "multiple", + []string{"reply=pong", "cmd=python x.py"}, + map[string]string{"reply": "pong", "cmd": "python x.py"}, + false, + }, + { + "value contains equals", + []string{"url=https://x.com?a=b"}, + map[string]string{"url": "https://x.com?a=b"}, + false, + }, + {"empty value allowed", []string{"k="}, map[string]string{"k": ""}, false}, + {"missing equals", []string{"justakey"}, nil, true}, + {"empty key", []string{"=v"}, nil, true}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got, err := parseBackendOpts(tc.input) + if (err != nil) != tc.wantErr { + t.Fatalf("err = %v, wantErr = %v", err, tc.wantErr) + } + if tc.wantErr { + return + } + if len(got) != len(tc.want) { + t.Errorf("len: got %d, want %d (%+v)", len(got), len(tc.want), got) + } + for k, v := range tc.want { + if got[k] != v { + t.Errorf("key %q: got %q, want %q", k, got[k], v) + } + } + }) + } +} + +// TestConnect_FlagValidation walks the invalid argument paths the runner +// guards against. We don't actually open a socket — the flags are +// rejected before any I/O. +func TestConnect_FlagValidation(t *testing.T) { + cases := []struct { + name string + args []string + wantSub string + }{ + // Cobra catches missing args before runConnect runs. + {"no token", []string{"connect", "ws-1", "--backend", "mock"}, "token"}, + { + "bad mode", + []string{"connect", "ws-1", "--backend", "mock", "--token", "t", "--mode", "ftp"}, + "mode", + }, + { + "bad backend-opt", + []string{"connect", "ws-1", "--backend", "mock", "--token", "t", "--backend-opt", "noequals"}, + "KEY=VALUE", + }, + { + "unknown backend", + []string{"connect", "ws-1", "--backend", "nonesuch", "--token", "t"}, + "unknown backend", + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + // Reset package-level connectFlags between runs so a prior test's + // settings don't leak. cobra rebinds on Execute. + resetConnectFlags() + rootCmd.SetArgs(tc.args) + err := rootCmd.Execute() + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(err.Error(), tc.wantSub) { + t.Errorf("error %q missing %q", err.Error(), tc.wantSub) + } + }) + } +} + +// TestConnect_DryRun covers the happy path: valid flags, --dry-run set, +// runner builds the backend and exits without entering loops. +func TestConnect_DryRun(t *testing.T) { + resetConnectFlags() + rootCmd.SetArgs([]string{ + "connect", "ws-test", + "--backend", "mock", + "--token", "tok", + "--backend-opt", "reply=ok", + "--dry-run", + }) + if err := rootCmd.Execute(); err != nil { + t.Fatalf("dry-run should succeed: %v", err) + } +} + +func resetConnectFlags() { + connectFlags.backend = "claude-code" + connectFlags.backendOpts = nil + connectFlags.token = "" + connectFlags.mode = "poll" + connectFlags.intervalMs = 1000 + connectFlags.sinceSecs = 30 + connectFlags.dryRun = false +} diff --git a/internal/cmd/root.go b/internal/cmd/root.go index 3aebe9a..4703165 100644 --- a/internal/cmd/root.go +++ b/internal/cmd/root.go @@ -89,6 +89,7 @@ func init() { rootCmd.AddCommand(platformCmd) rootCmd.AddCommand(configCmd) rootCmd.AddCommand(initCmd) + rootCmd.AddCommand(connectCmd) } // exitError wraps a user-facing error with a specific exit code.