feat(connect): M1.3 — exec + claude-code backends

After this PR `molecule connect <id>` actually does something useful:
the default `--backend claude-code` runs `claude -p` as a subprocess
per inbound message, capturing stdout as the reply. The general
`--backend exec --backend-opt cmd="..."` lets operators bridge to any
program that reads stdin and writes stdout (Ollama, custom Python,
shell pipelines).

What's in:

- `internal/backends/exec/exec.go` — generic exec backend.
  - cmd  (required): shell command, run via /bin/sh -c (or cmd /c).
  - timeout (default 60s): per-message wall clock; subprocess is
    killed on timeout and the dispatcher keeps the message queued.
  - pass_meta (default false): when true, populates env with
    MOLECULE_WORKSPACE_ID / CALLER_ID / MESSAGE_ID / TASK_ID / METHOD.
  - Stdin = joined text parts; stdout = reply text.
  - Stderr is captured + surfaced in error messages so operators can
    see what their command printed.
  - Honours ctx cancellation — SIGTERM kills the subprocess immediately.

- `internal/backends/claudecode/claudecode.go` — thin shorthand.
  Translates {bin, args, timeout, pass_meta} into the equivalent exec
  config. Defaults: bin=claude, timeout=5m, pass_meta=true.
  Reusing exec means timeout/stdin/env handling stays in one place.

- `internal/cmd/connect.go` — registers both backends so `--help`
  shows them and the default `--backend claude-code` works without
  flag tuning.

Test plan:

- exec: cmd-required, bad-timeout, zero-timeout, echo-stdin-to-stdout,
  text-only-part-concat, non-zero-exit-surfaces-stderr, timeout-kills-
  runaway, pass_meta-injects-env, no-pass_meta-leaves-env-untouched,
  parent-env-inherited-when-pass_meta-off, ctx-cancel-kills-command.
- claude-code: registered, bin/args translation produces "echo -p hello",
  bad-timeout-surfaces, default-config-builds.
- All Unix tests gated on `requireUnix(t)` — Windows-shell semantics
  diverge; cross-platform coverage is M3 work (brew/scoop/winget).
- Full suite green under -race.

UX after this PR:

  # default — Claude Code on PATH
  molecule connect ws_01HF... --token $T

  # custom handler
  molecule connect ws_01HF... --backend exec \
      --backend-opt cmd="python myhandler.py"

  # different model via Claude Code
  molecule connect ws_01HF... \
      --backend-opt args="--model claude-sonnet-4-6"

  # CI smoke
  molecule connect ws_01HF... --backend mock --backend-opt reply=ok

Out of scope (M1.4 + later):

- M1.4: GoReleaser tag-triggered release.yml workflow
- stdio backend (one persistent subprocess + line-delimited JSON-RPC)
  — moved to M2 since exec covers the common case
- openai / mcp backends — M4 per RFC

RFC: https://github.com/Molecule-AI/molecule-cli/issues/10

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hongming Wang 2026-04-30 04:23:36 -07:00
parent e9e234d750
commit b7a3c887c0
5 changed files with 505 additions and 1 deletions

View File

@ -0,0 +1,61 @@
// Package claudecode implements the `claude-code` backend: a thin
// shorthand for `exec` with `cmd="claude -p"`. Each inbound A2A
// message is dispatched to a fresh `claude --print` invocation; the
// model's stdout becomes the reply.
//
// Why a separate backend instead of telling users `--backend exec
// --backend-opt cmd="claude -p"`?
//
// - The default backend is "claude-code" — copy-paste-from-canvas
// should Just Work without the operator memorising flag spelling.
// - Future versions can add Claude-Code-specific config: model
// selection, system prompt, MCP forwarding. The exec backend
// stays generic.
//
// Config keys (all optional):
//
// - bin — claude binary path. Default "claude".
// - args — extra args appended after `-p`. Default "".
// - timeout — per-message timeout. Default "5m" (Claude Code
// responses can take a while; longer than exec's 60s default).
// - pass_meta — see exec backend. Default "true" — Claude Code
// sessions benefit from knowing who sent the message.
//
// Implementation: builds the equivalent exec config under the hood.
// Reusing exec means timeout/stdin/stderr/env handling stays in one
// place; bug fixes flow to both.
package claudecode
import (
"strings"
"github.com/Molecule-AI/molecule-cli/internal/backends"
exec "github.com/Molecule-AI/molecule-cli/internal/backends/exec"
)
func init() {
backends.Register("claude-code", New)
}
// New builds a claude-code backend from cfg. Translates the
// claude-code keys to an exec backend config and delegates.
func New(cfg backends.Config) (backends.Backend, error) {
bin := cfg.Get("bin", "claude")
extra := cfg.Get("args", "")
timeout := cfg.Get("timeout", "5m")
passMeta := cfg.Get("pass_meta", "true")
// Build the underlying shell command. -p (print mode) is the
// non-interactive Claude Code mode that reads stdin and writes
// stdout once.
cmd := bin + " -p"
if strings.TrimSpace(extra) != "" {
cmd += " " + extra
}
return exec.New(backends.Config{
"cmd": cmd,
"timeout": timeout,
"pass_meta": passMeta,
})
}

View File

@ -0,0 +1,80 @@
package claudecode_test
import (
"context"
"runtime"
"strings"
"testing"
"github.com/Molecule-AI/molecule-cli/internal/backends"
_ "github.com/Molecule-AI/molecule-cli/internal/backends/claudecode" // register
)
func requireUnix(t *testing.T) {
t.Helper()
if runtime.GOOS == "windows" {
t.Skip("Unix-only: shell command semantics differ on Windows")
}
}
func TestClaudeCode_Registered(t *testing.T) {
names := backends.Names()
found := false
for _, n := range names {
if n == "claude-code" {
found = true
break
}
}
if !found {
t.Errorf("claude-code missing from registry: %v", names)
}
}
// TestClaudeCode_BinArgsTranslation: the wrapper should compose bin +
// "-p" + extra-args into the underlying exec command. Use bin=echo
// args=hello so the command becomes "echo -p hello" and stdout is
// deterministic.
func TestClaudeCode_BinArgsTranslation(t *testing.T) {
requireUnix(t)
be, err := backends.Build("claude-code", backends.Config{
"bin": "echo",
"args": "hello",
"timeout": "5s",
})
if err != nil {
t.Fatal(err)
}
defer be.Close()
resp, err := be.HandleA2A(context.Background(), backends.Request{
Parts: []backends.Part{{Type: "text", Text: "ignored-stdin"}},
})
if err != nil {
t.Fatal(err)
}
got := strings.TrimSpace(resp.Parts[0].Text)
if got != "-p hello" {
t.Errorf("got %q, want %q", got, "-p hello")
}
}
func TestClaudeCode_BadTimeoutSurfaces(t *testing.T) {
_, err := backends.Build("claude-code", backends.Config{"timeout": "nope"})
if err == nil {
t.Fatal("expected error on bad timeout")
}
}
// TestClaudeCode_LongTimeoutDefault: the default timeout (5m) should
// be longer than the bare exec backend default (60s). Verify by
// building with no timeout override and confirming a 90s sleep
// doesn't error from the wrapper's side. We don't actually wait —
// we just confirm building succeeds with no override (no error means
// the default parsed cleanly).
func TestClaudeCode_DefaultTimeoutAccepted(t *testing.T) {
be, err := backends.Build("claude-code", backends.Config{"bin": "true"})
if err != nil {
t.Fatalf("default config should build: %v", err)
}
be.Close()
}

View File

@ -0,0 +1,161 @@
// Package exec implements the `exec` backend: each inbound A2A
// message is dispatched to a configured shell command. The text parts
// are written to the subprocess's stdin; stdout becomes the reply.
//
// This is the most general external-bridge: any handler that can read
// stdin and write stdout works. Claude Code (`claude -p`), `ollama
// run <model>`, custom Python scripts, etc.
//
// Config keys:
// - cmd (required): shell command, e.g. "claude -p" or
// "python myhandler.py". Run via /bin/sh -c on Unix or cmd /c on
// Windows so quoting + pipes + env-var expansion work as users
// expect from a terminal.
// - timeout (optional): per-message timeout duration string
// (Go time.ParseDuration), default "60s". The subprocess is killed
// on timeout and the backend returns an error so the dispatcher
// keeps the message in the activity queue for re-delivery on a
// later run.
// - pass_meta (optional): when "true", populate the subprocess env
// with MOLECULE_WORKSPACE_ID, MOLECULE_CALLER_ID, MOLECULE_MESSAGE_ID,
// MOLECULE_TASK_ID, MOLECULE_METHOD. Useful for handlers that
// thread context across messages.
//
// Concurrency: HandleA2A is safe to call concurrently; each call
// spawns its own subprocess. The dispatcher serializes calls within a
// poll batch, so in practice there is at most one subprocess running.
//
// Security note: cmd runs through sh -c, which means the operator's
// command line is the trust boundary. Don't pass user-controlled
// strings into cmd. The inbound message text goes via stdin, not
// argv, so a malicious sender can't inject shell metacharacters.
package exec
import (
"bytes"
"context"
"fmt"
"os"
osexec "os/exec"
"runtime"
"strings"
"time"
"github.com/Molecule-AI/molecule-cli/internal/backends"
)
func init() {
backends.Register("exec", New)
}
// New builds an exec backend from cfg. The cmd key is required; other
// keys default sensibly.
func New(cfg backends.Config) (backends.Backend, error) {
cmd, err := cfg.Require("cmd")
if err != nil {
return nil, fmt.Errorf("exec backend: %w", err)
}
timeoutStr := cfg.Get("timeout", "60s")
timeout, err := time.ParseDuration(timeoutStr)
if err != nil {
return nil, fmt.Errorf("exec backend: parse timeout %q: %w", timeoutStr, err)
}
if timeout <= 0 {
return nil, fmt.Errorf("exec backend: timeout must be positive, got %s", timeoutStr)
}
passMeta := strings.EqualFold(cfg.Get("pass_meta", "false"), "true")
return &Backend{
cmd: cmd,
timeout: timeout,
passMeta: passMeta,
}, nil
}
// Backend is the exec implementation. Stateless across messages — each
// call spawns a fresh subprocess.
type Backend struct {
cmd string
timeout time.Duration
passMeta bool
}
// HandleA2A spawns the configured command, pipes the joined text parts
// to stdin, captures stdout, and returns it as the reply. Stderr is
// captured separately and surfaced in the error message on failure so
// the operator can see what their command printed.
func (b *Backend) HandleA2A(ctx context.Context, req backends.Request) (backends.Response, error) {
input := joinTextParts(req.Parts)
runCtx, cancel := context.WithTimeout(ctx, b.timeout)
defer cancel()
shell, shellArg := platformShell()
cmd := osexec.CommandContext(runCtx, shell, shellArg, b.cmd)
cmd.Stdin = strings.NewReader(input)
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if b.passMeta {
cmd.Env = append(os.Environ(),
"MOLECULE_WORKSPACE_ID="+req.WorkspaceID,
"MOLECULE_CALLER_ID="+req.CallerID,
"MOLECULE_MESSAGE_ID="+req.MessageID,
"MOLECULE_TASK_ID="+req.TaskID,
"MOLECULE_METHOD="+req.Method,
)
}
err := cmd.Run()
// Always surface stderr if the command produced any — operators
// rely on stderr for log lines even on success.
stderrTail := tail(stderr.String(), 1024)
if err != nil {
if runCtx.Err() == context.DeadlineExceeded {
return backends.Response{}, fmt.Errorf("exec backend: command %q timed out after %s (stderr: %s)",
b.cmd, b.timeout, stderrTail)
}
return backends.Response{}, fmt.Errorf("exec backend: command %q failed: %w (stderr: %s)",
b.cmd, err, stderrTail)
}
return backends.TextResponse(stdout.String()), nil
}
// Close is a no-op — exec spawns subprocesses on-demand, nothing to
// release at shutdown beyond what the OS already does when the parent
// exits.
func (b *Backend) Close() error { return nil }
// joinTextParts concatenates the text parts of a request, ignoring
// data/file parts. Text-only is the M1 contract; richer marshalling
// (e.g. JSON-on-stdin for backends that want full structure) is a
// future opt-in via a `format` config key.
func joinTextParts(parts []backends.Part) string {
var sb strings.Builder
for _, p := range parts {
if p.Type == "text" {
sb.WriteString(p.Text)
}
}
return sb.String()
}
// platformShell returns the shell binary + the "run this command
// string" argument for the current OS. On Windows, cmd.exe uses /c;
// everywhere else, /bin/sh -c works.
func platformShell() (string, string) {
if runtime.GOOS == "windows" {
return "cmd.exe", "/c"
}
return "/bin/sh", "-c"
}
// tail returns the last n bytes of s, prefixed with "..." if truncated.
// Used to keep stderr quotes in error messages bounded.
func tail(s string, n int) string {
if len(s) <= n {
return s
}
return "..." + s[len(s)-n:]
}

View File

@ -0,0 +1,200 @@
package exec_test
import (
"context"
"os"
"runtime"
"strings"
"testing"
"github.com/Molecule-AI/molecule-cli/internal/backends"
_ "github.com/Molecule-AI/molecule-cli/internal/backends/exec" // register
)
// requireUnix skips Windows tests that depend on /bin/sh shell semantics.
// Windows-shell coverage is in TestPlatformShell_Windows below.
func requireUnix(t *testing.T) {
t.Helper()
if runtime.GOOS == "windows" {
t.Skip("Unix-only: shell command semantics differ on Windows")
}
}
func TestExec_RequiresCmd(t *testing.T) {
_, err := backends.Build("exec", backends.Config{})
if err == nil {
t.Fatal("expected error when cmd is unset")
}
if !strings.Contains(err.Error(), "cmd") {
t.Errorf("error should mention cmd: %v", err)
}
}
func TestExec_RejectsBadTimeout(t *testing.T) {
_, err := backends.Build("exec", backends.Config{"cmd": "echo hi", "timeout": "not-a-duration"})
if err == nil {
t.Fatal("expected error on bad timeout")
}
}
func TestExec_RejectsZeroTimeout(t *testing.T) {
_, err := backends.Build("exec", backends.Config{"cmd": "echo hi", "timeout": "0s"})
if err == nil {
t.Fatal("expected error on zero timeout")
}
}
func TestExec_EchoStdinToStdout(t *testing.T) {
requireUnix(t)
be, err := backends.Build("exec", backends.Config{"cmd": "cat"})
if err != nil {
t.Fatal(err)
}
defer be.Close()
resp, err := be.HandleA2A(context.Background(), backends.Request{
Parts: []backends.Part{{Type: "text", Text: "hello world"}},
})
if err != nil {
t.Fatal(err)
}
if got := resp.Parts[0].Text; got != "hello world" {
t.Errorf("got %q, want %q", got, "hello world")
}
if !resp.Final {
t.Error("expected Final=true")
}
}
func TestExec_ConcatenatesTextPartsOnly(t *testing.T) {
requireUnix(t)
be, _ := backends.Build("exec", backends.Config{"cmd": "cat"})
defer be.Close()
resp, _ := be.HandleA2A(context.Background(), backends.Request{
Parts: []backends.Part{
{Type: "text", Text: "a"},
{Type: "data", Data: map[string]interface{}{"k": "v"}}, // ignored
{Type: "text", Text: "b"},
},
})
if got := resp.Parts[0].Text; got != "ab" {
t.Errorf("got %q, want ab", got)
}
}
func TestExec_NonZeroExitSurfacesStderr(t *testing.T) {
requireUnix(t)
be, _ := backends.Build("exec", backends.Config{
"cmd": "echo my-stderr-text >&2; exit 17",
})
defer be.Close()
_, err := be.HandleA2A(context.Background(), backends.Request{
Parts: []backends.Part{{Type: "text", Text: "x"}},
})
if err == nil {
t.Fatal("expected error on exit 17")
}
if !strings.Contains(err.Error(), "my-stderr-text") {
t.Errorf("error should include stderr: %v", err)
}
}
func TestExec_TimeoutKillsRunaway(t *testing.T) {
requireUnix(t)
be, _ := backends.Build("exec", backends.Config{
"cmd": "sleep 5",
"timeout": "100ms",
})
defer be.Close()
_, err := be.HandleA2A(context.Background(), backends.Request{
Parts: []backends.Part{{Type: "text", Text: "x"}},
})
if err == nil {
t.Fatal("expected timeout error")
}
if !strings.Contains(err.Error(), "timed out") {
t.Errorf("error should mention timeout: %v", err)
}
}
func TestExec_PassMetaInjectsEnv(t *testing.T) {
requireUnix(t)
be, _ := backends.Build("exec", backends.Config{
"cmd": `printf "ws=%s caller=%s msg=%s task=%s method=%s" "$MOLECULE_WORKSPACE_ID" "$MOLECULE_CALLER_ID" "$MOLECULE_MESSAGE_ID" "$MOLECULE_TASK_ID" "$MOLECULE_METHOD"`,
"pass_meta": "true",
})
defer be.Close()
resp, err := be.HandleA2A(context.Background(), backends.Request{
WorkspaceID: "ws-1",
CallerID: "ws-2",
MessageID: "act-3",
TaskID: "task-4",
Method: "message/send",
Parts: []backends.Part{{Type: "text", Text: "x"}},
})
if err != nil {
t.Fatal(err)
}
want := "ws=ws-1 caller=ws-2 msg=act-3 task=task-4 method=message/send"
if got := resp.Parts[0].Text; got != want {
t.Errorf("env injection: got %q, want %q", got, want)
}
}
func TestExec_NoPassMetaLeavesEnvUntouched(t *testing.T) {
requireUnix(t)
be, _ := backends.Build("exec", backends.Config{
"cmd": `printf "%s" "$MOLECULE_WORKSPACE_ID"`,
// pass_meta unset → defaults to false
})
defer be.Close()
resp, _ := be.HandleA2A(context.Background(), backends.Request{
WorkspaceID: "ws-secret",
Parts: []backends.Part{{Type: "text", Text: "x"}},
})
if got := resp.Parts[0].Text; got != "" {
t.Errorf("expected empty env (pass_meta off); got %q", got)
}
}
func TestExec_ParentEnvAvailableEvenWhenPassMetaOff(t *testing.T) {
requireUnix(t)
// When pass_meta is off, we still inherit the parent process env
// (cmd.Env nil → os.Environ). Verify by reading PATH.
be, _ := backends.Build("exec", backends.Config{"cmd": `printf "%s" "$PATH"`})
defer be.Close()
resp, _ := be.HandleA2A(context.Background(), backends.Request{
Parts: []backends.Part{{Type: "text", Text: "x"}},
})
if !strings.Contains(resp.Parts[0].Text, os.Getenv("PATH")[:min(len(os.Getenv("PATH")), 8)]) {
t.Errorf("expected PATH inheritance; got %q", resp.Parts[0].Text)
}
}
// TestExec_ContextCancelKillsCommand: when the caller's ctx is
// cancelled mid-run, the subprocess is killed immediately (vs waiting
// for our internal timeout to fire).
func TestExec_ContextCancelKillsCommand(t *testing.T) {
requireUnix(t)
be, _ := backends.Build("exec", backends.Config{
"cmd": "sleep 5",
"timeout": "30s",
})
defer be.Close()
ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel immediately
_, err := be.HandleA2A(ctx, backends.Request{
Parts: []backends.Part{{Type: "text", Text: "x"}},
})
if err == nil {
t.Fatal("expected error on cancelled ctx")
}
}
// Tiny helper since math/min is generic since Go 1.21.
func min(a, b int) int {
if a < b {
return a
}
return b
}

View File

@ -10,7 +10,9 @@ import (
"time"
"github.com/Molecule-AI/molecule-cli/internal/backends"
_ "github.com/Molecule-AI/molecule-cli/internal/backends/mock" // register backend
_ "github.com/Molecule-AI/molecule-cli/internal/backends/claudecode" // register backend
_ "github.com/Molecule-AI/molecule-cli/internal/backends/exec" // register backend
_ "github.com/Molecule-AI/molecule-cli/internal/backends/mock" // register backend
"github.com/Molecule-AI/molecule-cli/internal/connect"
"github.com/spf13/cobra"
)