molecule-core/platform/cmd/cli/a2a.go
Hongming Wang 24fec62d7f initial commit — Molecule AI platform
Forked clean from public hackathon repo (Starfire-AgentTeam, BSL 1.1)
with full rebrand to Molecule AI under github.com/Molecule-AI/molecule-monorepo.

Brand: Starfire → Molecule AI.
Slug: starfire / agent-molecule → molecule.
Env vars: STARFIRE_* → MOLECULE_*.
Go module: github.com/agent-molecule/platform → github.com/Molecule-AI/molecule-monorepo/platform.
Python packages: starfire_plugin → molecule_plugin, starfire_agent → molecule_agent.
DB: agentmolecule → molecule.

History truncated; see public repo for prior commits and contributor
attribution. Verified green: go test -race ./... (platform), pytest
(workspace-template 1129 + sdk 132), vitest (canvas 352), build (mcp).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 11:55:37 -07:00

228 lines
5.3 KiB
Go

package main
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/google/uuid"
)
// A2A JSON-RPC types (Google A2A protocol).

// a2aRequest is the JSON-RPC request envelope that is marshalled and POSTed
// to an agent.
type a2aRequest struct {
// JSONRPC is the protocol version; the requests built in this file send "2.0".
JSONRPC string `json:"jsonrpc"`
// ID identifies the request; the requests built in this file use a fresh UUID.
ID string `json:"id"`
// Method is the RPC method name, e.g. "tasks/send" or "tasks/sendSubscribe".
Method string `json:"method"`
// Params carries the method arguments; this file always passes a taskSendParams.
Params any `json:"params"`
}
// taskSendParams is the params payload of the tasks/send and
// tasks/sendSubscribe requests built in this file.
type taskSendParams struct {
// ID is the task identifier chosen by the caller (a fresh UUID in this file).
ID string `json:"id"`
// Message is the message delivered to the agent.
Message a2aMessage `json:"message"`
}
// a2aMessage is a message made of ordered parts, tagged with its author's role.
type a2aMessage struct {
// Role names the author; the requests built in this file use "user".
Role string `json:"role"`
// Parts holds the message content in order.
Parts []a2aPart `json:"parts"`
}
// a2aPart is one piece of content inside a message or an artifact.
type a2aPart struct {
// Type is the part kind; only "text" parts are produced or read in this file.
Type string `json:"type"`
// Text is the content of a "text" part; it is omitted from the JSON when empty.
Text string `json:"text,omitempty"`
}
// a2aTask is the task object carried in the result field of an a2aResponse.
type a2aTask struct {
// ID is the task identifier.
ID string `json:"id"`
// Status is the task's state plus an optional status message.
Status taskStatus `json:"status"`
// Artifacts are the outputs attached to the task; extractText looks here
// first for the reply text. Omitted from the JSON when empty.
Artifacts []artifact `json:"artifacts,omitempty"`
}
// taskStatus describes where a task is in its lifecycle.
type taskStatus struct {
State string `json:"state"` // submitted, working, completed, failed, canceled
// Message is an optional message accompanying the state; extractText falls
// back to its text parts when the artifacts carry no text.
Message *a2aMessage `json:"message,omitempty"`
}
// artifact is one output of a task, expressed as an ordered list of parts.
type artifact struct {
// Parts holds the artifact content in order.
Parts []a2aPart `json:"parts"`
}
// a2aResponse is the JSON-RPC response envelope decoded from an agent's reply.
// The code in this file checks Error first and then requires Result to be set.
type a2aResponse struct {
// JSONRPC is the protocol version reported by the agent.
JSONRPC string `json:"jsonrpc"`
// ID is the request ID echoed by the agent.
// NOTE(review): JSON-RPC also permits numeric ids, and a numeric id would
// fail to decode into this string field — confirm agents echo the string
// id they were sent.
ID string `json:"id"`
// Result is the task returned on success; nil when absent.
Result *a2aTask `json:"result,omitempty"`
// Error is the JSON-RPC error object; nil when absent.
Error *a2aError `json:"error,omitempty"`
}
// a2aError is the error object of a JSON-RPC response.
type a2aError struct {
// Code is the numeric error code reported by the agent.
Code int `json:"code"`
// Message is the accompanying error text.
Message string `json:"message"`
}
// a2aClient sends tasks to an A2A agent.
type a2aClient struct {
// httpClient performs the HTTP POSTs; newA2AClient gives it a 60-second timeout.
httpClient *http.Client
// agentURL is the endpoint every request is POSTed to.
agentURL string
}
// newA2AClient builds a client that POSTs to agentURL through a dedicated
// http.Client whose Timeout is 60 seconds.
func newA2AClient(agentURL string) *a2aClient {
	const requestTimeout = 60 * time.Second

	client := new(a2aClient)
	client.agentURL = agentURL
	client.httpClient = &http.Client{Timeout: requestTimeout}
	return client
}
// SendTask sends text as a single user message with the blocking tasks/send
// method and returns the agent's text reply.
//
// An error is returned when the request cannot be marshalled or sent, when the
// HTTP status is not 200, when the body is not a decodable JSON-RPC response,
// when the response carries a JSON-RPC error, or when it has no result.
// For the streaming variant (tasks/sendSubscribe over SSE) use
// SendTaskStreaming.
func (c *a2aClient) SendTask(text string) (string, error) {
	// maxErrBody caps how much of a non-200 body is echoed into the error, so
	// a misbehaving endpoint cannot make the error message arbitrarily large.
	const maxErrBody = 4 << 10

	taskID := uuid.New().String()
	reqBody := a2aRequest{
		JSONRPC: "2.0",
		ID:      uuid.New().String(),
		Method:  "tasks/send",
		Params: taskSendParams{
			ID: taskID,
			Message: a2aMessage{
				Role:  "user",
				Parts: []a2aPart{{Type: "text", Text: text}},
			},
		},
	}
	body, err := json.Marshal(reqBody)
	if err != nil {
		return "", fmt.Errorf("marshal request: %w", err)
	}

	resp, err := c.httpClient.Post(c.agentURL, "application/json", bytes.NewReader(body))
	if err != nil {
		return "", fmt.Errorf("send task: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the status code is the primary signal, so a failed or
		// truncated read of the body is deliberately not reported separately.
		b, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrBody))
		return "", fmt.Errorf("agent returned status %d: %s", resp.StatusCode, b)
	}

	var a2aResp a2aResponse
	if err := json.NewDecoder(resp.Body).Decode(&a2aResp); err != nil {
		return "", fmt.Errorf("decode response: %w", err)
	}
	if a2aResp.Error != nil {
		return "", fmt.Errorf("agent error %d: %s", a2aResp.Error.Code, a2aResp.Error.Message)
	}
	if a2aResp.Result == nil {
		return "", fmt.Errorf("empty result from agent")
	}
	return extractText(a2aResp.Result), nil
}
// SendTaskStreaming calls tasks/sendSubscribe and hands each piece of reply
// text to chunk as it arrives; chunk may be nil. It returns the full
// concatenated text once the stream ends.
//
// If the agent does not answer with text/event-stream, the body is decoded as
// a single blocking JSON-RPC response and its text is delivered as one chunk.
//
// A non-200 HTTP status or a JSON-RPC error (in the blocking response or in
// any streamed event) is returned as an error. When the stream fails part-way,
// the text received so far is returned together with the error.
//
// NOTE: a Timeout set on c.httpClient (newA2AClient uses 60s) also covers
// reading the response body, so it bounds how long a stream can last.
func (c *a2aClient) SendTaskStreaming(text string, chunk func(string)) (string, error) {
	const (
		// maxErrBody caps how much of a non-200 body is echoed into the error.
		maxErrBody = 4 << 10
		// maxEventLine is the longest single SSE line accepted. The default
		// bufio.Scanner limit (64 KiB) is easily exceeded by a large event.
		maxEventLine = 1 << 20
	)

	taskID := uuid.New().String()
	reqBody := a2aRequest{
		JSONRPC: "2.0",
		ID:      uuid.New().String(),
		Method:  "tasks/sendSubscribe",
		Params: taskSendParams{
			ID: taskID,
			Message: a2aMessage{
				Role:  "user",
				Parts: []a2aPart{{Type: "text", Text: text}},
			},
		},
	}
	body, err := json.Marshal(reqBody)
	if err != nil {
		return "", fmt.Errorf("marshal request: %w", err)
	}

	resp, err := c.httpClient.Post(c.agentURL, "application/json", bytes.NewReader(body))
	if err != nil {
		return "", fmt.Errorf("send subscribe: %w", err)
	}
	defer resp.Body.Close()

	// Same status handling as SendTask: without it an HTTP failure with a
	// non-JSON body would surface as a confusing decode error.
	if resp.StatusCode != http.StatusOK {
		// Best effort: the status code is the primary signal, so a failed
		// read of the body is deliberately not reported separately.
		b, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrBody))
		return "", fmt.Errorf("agent returned status %d: %s", resp.StatusCode, b)
	}

	// Fall back to blocking if the agent doesn't support streaming.
	if !strings.Contains(resp.Header.Get("Content-Type"), "text/event-stream") {
		var a2aResp a2aResponse
		if err := json.NewDecoder(resp.Body).Decode(&a2aResp); err != nil {
			return "", fmt.Errorf("decode fallback response: %w", err)
		}
		if a2aResp.Error != nil {
			return "", fmt.Errorf("agent error %d: %s", a2aResp.Error.Code, a2aResp.Error.Message)
		}
		if a2aResp.Result == nil {
			return "", fmt.Errorf("empty result from agent")
		}
		reply := extractText(a2aResp.Result)
		if chunk != nil {
			chunk(reply)
		}
		return reply, nil
	}

	// Parse the SSE stream line by line.
	var full strings.Builder
	scanner := bufio.NewScanner(resp.Body)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxEventLine)
	for scanner.Scan() {
		data, ok := sseDataField(scanner.Text())
		if !ok {
			continue
		}
		if data == "[DONE]" {
			break
		}
		var event a2aResponse
		if err := json.Unmarshal([]byte(data), &event); err != nil {
			// Deliberately lenient: payloads that are not JSON-RPC
			// envelopes are skipped rather than aborting the stream.
			continue
		}
		if event.Error != nil {
			// Surface agent errors instead of silently ending up with an
			// empty reply and a nil error.
			return full.String(), fmt.Errorf("agent error %d: %s", event.Error.Code, event.Error.Message)
		}
		if event.Result == nil {
			continue
		}
		if t := extractText(event.Result); t != "" {
			full.WriteString(t)
			if chunk != nil {
				chunk(t)
			}
		}
		// Stop on any terminal state; nothing more will follow for this task.
		if st := event.Result.Status.State; st == "completed" || st == "failed" || st == "canceled" {
			break
		}
	}
	if err := scanner.Err(); err != nil {
		return full.String(), fmt.Errorf("read stream: %w", err)
	}
	return full.String(), nil
}

// sseDataField returns the value of an SSE "data" field line. In the
// event-stream format the colon may be followed by one optional space, which
// is not part of the value. ok is false for every other kind of line.
func sseDataField(line string) (value string, ok bool) {
	const field = "data:"
	if !strings.HasPrefix(line, field) {
		return "", false
	}
	return strings.TrimPrefix(line[len(field):], " "), true
}
// extractText returns the concatenation, in order, of every "text" part found
// in the task's artifacts. If the artifacts yield no text at all, it falls back
// to the "text" parts of the status message, because some agents put the reply
// there. A nil task yields "".
func extractText(task *a2aTask) string {
	if task == nil {
		return ""
	}

	var sb strings.Builder
	for _, art := range task.Artifacts {
		for _, p := range art.Parts {
			if p.Type == "text" {
				sb.WriteString(p.Text)
			}
		}
	}

	// Only consult the status message when the artifacts produced nothing, so
	// a reply is never duplicated.
	if sb.Len() == 0 && task.Status.Message != nil {
		for _, p := range task.Status.Message.Parts {
			if p.Type == "text" {
				sb.WriteString(p.Text)
			}
		}
	}
	return sb.String()
}