forked from molecule-ai/molecule-core
Forked clean from public hackathon repo (Starfire-AgentTeam, BSL 1.1) with full rebrand to Molecule AI under github.com/Molecule-AI/molecule-monorepo. Brand: Starfire → Molecule AI. Slug: starfire / agent-molecule → molecule. Env vars: STARFIRE_* → MOLECULE_*. Go module: github.com/agent-molecule/platform → github.com/Molecule-AI/molecule-monorepo/platform. Python packages: starfire_plugin → molecule_plugin, starfire_agent → molecule_agent. DB: agentmolecule → molecule. History truncated; see public repo for prior commits and contributor attribution. Verified green: go test -race ./... (platform), pytest (workspace-template 1129 + sdk 132), vitest (canvas 352), build (mcp). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
228 lines
5.3 KiB
Go
228 lines
5.3 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// A2A JSON-RPC types (Google A2A protocol).
|
|
|
|
// a2aRequest is the JSON-RPC 2.0 envelope posted to an A2A agent.
type a2aRequest struct {
	JSONRPC string `json:"jsonrpc"` // protocol version; senders in this file use "2.0"
	ID      string `json:"id"`      // request identifier
	Method  string `json:"method"`  // RPC method name, e.g. "tasks/send"
	Params  any    `json:"params"`  // method-specific payload, marshalled as-is
}
|
|
|
|
type taskSendParams struct {
|
|
ID string `json:"id"`
|
|
Message a2aMessage `json:"message"`
|
|
}
|
|
|
|
type a2aMessage struct {
|
|
Role string `json:"role"`
|
|
Parts []a2aPart `json:"parts"`
|
|
}
|
|
|
|
// a2aPart is one content part of a message or artifact.
type a2aPart struct {
	Type string `json:"type"`           // part kind; only "text" is produced or read in this file
	Text string `json:"text,omitempty"` // text payload, left out of the JSON when empty
}
|
|
|
|
type a2aTask struct {
|
|
ID string `json:"id"`
|
|
Status taskStatus `json:"status"`
|
|
Artifacts []artifact `json:"artifacts,omitempty"`
|
|
}
|
|
|
|
type taskStatus struct {
|
|
State string `json:"state"` // submitted, working, completed, failed, canceled
|
|
Message *a2aMessage `json:"message,omitempty"`
|
|
}
|
|
|
|
type artifact struct {
|
|
Parts []a2aPart `json:"parts"`
|
|
}
|
|
|
|
type a2aResponse struct {
|
|
JSONRPC string `json:"jsonrpc"`
|
|
ID string `json:"id"`
|
|
Result *a2aTask `json:"result,omitempty"`
|
|
Error *a2aError `json:"error,omitempty"`
|
|
}
|
|
|
|
// a2aError is the error object of a JSON-RPC response.
type a2aError struct {
	Code    int    `json:"code"`    // numeric error code
	Message string `json:"message"` // human-readable description
}
|
|
|
|
// a2aClient is a small client bound to one A2A agent endpoint.
type a2aClient struct {
	httpClient *http.Client // performs the POST requests
	agentURL   string       // endpoint every request is sent to
}
|
|
|
|
func newA2AClient(agentURL string) *a2aClient {
|
|
return &a2aClient{
|
|
agentURL: agentURL,
|
|
httpClient: &http.Client{Timeout: 60 * time.Second},
|
|
}
|
|
}
|
|
|
|
// SendTask sends a user message and returns the agent's text reply.
|
|
// Supports both blocking (tasks/send) and streaming (tasks/sendSubscribe via SSE).
|
|
func (c *a2aClient) SendTask(text string) (string, error) {
|
|
taskID := uuid.New().String()
|
|
reqBody := a2aRequest{
|
|
JSONRPC: "2.0",
|
|
ID: uuid.New().String(),
|
|
Method: "tasks/send",
|
|
Params: taskSendParams{
|
|
ID: taskID,
|
|
Message: a2aMessage{
|
|
Role: "user",
|
|
Parts: []a2aPart{{Type: "text", Text: text}},
|
|
},
|
|
},
|
|
}
|
|
|
|
body, err := json.Marshal(reqBody)
|
|
if err != nil {
|
|
return "", fmt.Errorf("marshal request: %w", err)
|
|
}
|
|
|
|
resp, err := c.httpClient.Post(c.agentURL, "application/json", bytes.NewReader(body))
|
|
if err != nil {
|
|
return "", fmt.Errorf("send task: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
b, _ := io.ReadAll(resp.Body)
|
|
return "", fmt.Errorf("agent returned status %d: %s", resp.StatusCode, b)
|
|
}
|
|
|
|
var a2aResp a2aResponse
|
|
if err := json.NewDecoder(resp.Body).Decode(&a2aResp); err != nil {
|
|
return "", fmt.Errorf("decode response: %w", err)
|
|
}
|
|
if a2aResp.Error != nil {
|
|
return "", fmt.Errorf("agent error %d: %s", a2aResp.Error.Code, a2aResp.Error.Message)
|
|
}
|
|
if a2aResp.Result == nil {
|
|
return "", fmt.Errorf("empty result from agent")
|
|
}
|
|
|
|
return extractText(a2aResp.Result), nil
|
|
}
|
|
|
|
// SendTaskStreaming calls tasks/sendSubscribe and streams chunks to the provided
|
|
// writer. Returns the full concatenated text when done.
|
|
func (c *a2aClient) SendTaskStreaming(text string, chunk func(string)) (string, error) {
|
|
taskID := uuid.New().String()
|
|
reqBody := a2aRequest{
|
|
JSONRPC: "2.0",
|
|
ID: uuid.New().String(),
|
|
Method: "tasks/sendSubscribe",
|
|
Params: taskSendParams{
|
|
ID: taskID,
|
|
Message: a2aMessage{
|
|
Role: "user",
|
|
Parts: []a2aPart{{Type: "text", Text: text}},
|
|
},
|
|
},
|
|
}
|
|
|
|
body, err := json.Marshal(reqBody)
|
|
if err != nil {
|
|
return "", fmt.Errorf("marshal request: %w", err)
|
|
}
|
|
|
|
resp, err := c.httpClient.Post(c.agentURL, "application/json", bytes.NewReader(body))
|
|
if err != nil {
|
|
return "", fmt.Errorf("send subscribe: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
// Fall back to blocking if agent doesn't support streaming
|
|
ct := resp.Header.Get("Content-Type")
|
|
if !strings.Contains(ct, "text/event-stream") {
|
|
var a2aResp a2aResponse
|
|
if err := json.NewDecoder(resp.Body).Decode(&a2aResp); err != nil {
|
|
return "", fmt.Errorf("decode fallback response: %w", err)
|
|
}
|
|
if a2aResp.Error != nil {
|
|
return "", fmt.Errorf("agent error %d: %s", a2aResp.Error.Code, a2aResp.Error.Message)
|
|
}
|
|
if a2aResp.Result == nil {
|
|
return "", fmt.Errorf("empty result from agent")
|
|
}
|
|
text := extractText(a2aResp.Result)
|
|
if chunk != nil {
|
|
chunk(text)
|
|
}
|
|
return text, nil
|
|
}
|
|
|
|
// Parse SSE stream
|
|
var full strings.Builder
|
|
scanner := bufio.NewScanner(resp.Body)
|
|
for scanner.Scan() {
|
|
line := scanner.Text()
|
|
if !strings.HasPrefix(line, "data: ") {
|
|
continue
|
|
}
|
|
data := strings.TrimPrefix(line, "data: ")
|
|
if data == "[DONE]" {
|
|
break
|
|
}
|
|
var event a2aResponse
|
|
if err := json.Unmarshal([]byte(data), &event); err != nil {
|
|
continue
|
|
}
|
|
if event.Result == nil {
|
|
continue
|
|
}
|
|
t := extractText(event.Result)
|
|
if t != "" {
|
|
full.WriteString(t)
|
|
if chunk != nil {
|
|
chunk(t)
|
|
}
|
|
}
|
|
if event.Result.Status.State == "completed" || event.Result.Status.State == "failed" {
|
|
break
|
|
}
|
|
}
|
|
return full.String(), scanner.Err()
|
|
}
|
|
|
|
// extractText pulls the first text part from all artifacts in a task.
|
|
func extractText(task *a2aTask) string {
|
|
var sb strings.Builder
|
|
for _, art := range task.Artifacts {
|
|
for _, p := range art.Parts {
|
|
if p.Type == "text" {
|
|
sb.WriteString(p.Text)
|
|
}
|
|
}
|
|
}
|
|
// Also check status message (some agents put the reply there)
|
|
if sb.Len() == 0 && task.Status.Message != nil {
|
|
for _, p := range task.Status.Message.Parts {
|
|
if p.Type == "text" {
|
|
sb.WriteString(p.Text)
|
|
}
|
|
}
|
|
}
|
|
return sb.String()
|
|
}
|