Merge pull request #2947 from Molecule-AI/staging
staging → main: auto-promote c4807a9
This commit is contained in:
commit
f5ea812e9d
1
.github/workflows/ci.yml
vendored
1
.github/workflows/ci.yml
vendored
@ -387,6 +387,7 @@ jobs:
|
|||||||
"a2a_mcp_server.py"
|
"a2a_mcp_server.py"
|
||||||
"mcp_cli.py"
|
"mcp_cli.py"
|
||||||
"a2a_tools.py"
|
"a2a_tools.py"
|
||||||
|
"a2a_tools_inbox.py"
|
||||||
"inbox.py"
|
"inbox.py"
|
||||||
"platform_auth.py"
|
"platform_auth.py"
|
||||||
)
|
)
|
||||||
|
|||||||
@ -56,6 +56,7 @@ TOP_LEVEL_MODULES = {
|
|||||||
"a2a_mcp_server",
|
"a2a_mcp_server",
|
||||||
"a2a_tools",
|
"a2a_tools",
|
||||||
"a2a_tools_delegation",
|
"a2a_tools_delegation",
|
||||||
|
"a2a_tools_inbox",
|
||||||
"a2a_tools_memory",
|
"a2a_tools_memory",
|
||||||
"a2a_tools_messaging",
|
"a2a_tools_messaging",
|
||||||
"a2a_tools_rbac",
|
"a2a_tools_rbac",
|
||||||
|
|||||||
@ -0,0 +1,177 @@
|
|||||||
|
package handlers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"go/ast"
|
||||||
|
"go/parser"
|
||||||
|
"go/token"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestAgentMessageBroadcastsArePersisted is a forward-looking AST
|
||||||
|
// gate: every function in this package that broadcasts an
|
||||||
|
// `AGENT_MESSAGE` WebSocket event MUST also call
|
||||||
|
// `INSERT INTO activity_logs` somewhere in its body.
|
||||||
|
//
|
||||||
|
// The reno-stars production data-loss bug (CEO Ryan PC's long-form
|
||||||
|
// onboarding-friction message visible live but missing on reload)
|
||||||
|
// happened because mcp_tools.go:toolSendMessageToUser broadcast WS
|
||||||
|
// without a paired INSERT — while the HTTP /notify sibling DID
|
||||||
|
// persist. The fix added the INSERT; this gate prevents the regression
|
||||||
|
// class from re-emerging in any future chat-bearing tool.
|
||||||
|
//
|
||||||
|
// Why an AST gate vs a code-review checklist (per memory
|
||||||
|
// feedback_behavior_based_ast_gates.md): "pin invariants by what a
|
||||||
|
// function calls, not what it's named". The shape that loses data is:
|
||||||
|
//
|
||||||
|
// BroadcastOnly(_, "AGENT_MESSAGE", _) without an INSERT companion
|
||||||
|
//
|
||||||
|
// Any new tool that emits AGENT_MESSAGE must persist or the next
|
||||||
|
// canvas refresh drops the message — same shape as reno-stars. A
|
||||||
|
// reviewer can miss this; the AST walk can't.
|
||||||
|
//
|
||||||
|
// Allowlist: empty by intent. If a future use case genuinely needs
|
||||||
|
// fire-and-forget broadcast (e.g., transient typing indicators that
|
||||||
|
// should NOT survive reload), add an entry here AND document why.
|
||||||
|
// "Doesn't need to persist" is rarely the right answer for chat —
|
||||||
|
// the canvas history is the source of truth.
|
||||||
|
func TestAgentMessageBroadcastsArePersisted(t *testing.T) {
|
||||||
|
wd, err := os.Getwd()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("getwd: %v", err)
|
||||||
|
}
|
||||||
|
entries, err := os.ReadDir(wd)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("readdir %s: %v", wd, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
type violation struct {
|
||||||
|
file string
|
||||||
|
fn string
|
||||||
|
}
|
||||||
|
var violations []violation
|
||||||
|
|
||||||
|
for _, ent := range entries {
|
||||||
|
name := ent.Name()
|
||||||
|
if ent.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
path := filepath.Join(wd, name)
|
||||||
|
fset := token.NewFileSet()
|
||||||
|
file, err := parser.ParseFile(fset, path, nil, parser.ParseComments)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("parse %s: %v", path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, decl := range file.Decls {
|
||||||
|
fn, ok := decl.(*ast.FuncDecl)
|
||||||
|
if !ok || fn.Body == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !funcEmitsAgentMessageBroadcast(fn) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !funcInsertsIntoActivityLogs(fn) {
|
||||||
|
violations = append(violations, violation{file: name, fn: fn.Name.Name})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(violations) > 0 {
|
||||||
|
sort.Slice(violations, func(i, j int) bool {
|
||||||
|
if violations[i].file != violations[j].file {
|
||||||
|
return violations[i].file < violations[j].file
|
||||||
|
}
|
||||||
|
return violations[i].fn < violations[j].fn
|
||||||
|
})
|
||||||
|
var buf strings.Builder
|
||||||
|
for _, v := range violations {
|
||||||
|
buf.WriteString(" - ")
|
||||||
|
buf.WriteString(v.file)
|
||||||
|
buf.WriteString(":")
|
||||||
|
buf.WriteString(v.fn)
|
||||||
|
buf.WriteString("\n")
|
||||||
|
}
|
||||||
|
t.Errorf(`function(s) broadcast `+"`AGENT_MESSAGE`"+` without persisting to activity_logs:
|
||||||
|
|
||||||
|
%s
|
||||||
|
This is the reno-stars data-loss regression class: live message
|
||||||
|
visible to the user, but missing on reload because activity_log was
|
||||||
|
never written. Every chat-bearing broadcast MUST be paired with:
|
||||||
|
|
||||||
|
INSERT INTO activity_logs (workspace_id, activity_type, method,
|
||||||
|
summary, response_body, status)
|
||||||
|
VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
|
||||||
|
|
||||||
|
See activity.go:Notify and mcp_tools.go:toolSendMessageToUser for
|
||||||
|
the canonical shapes. Don't add an allowlist entry without a
|
||||||
|
documented reason — the canvas chat history is the source of truth
|
||||||
|
and silently dropping messages is a P0 user trust break.`,
|
||||||
|
buf.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// funcEmitsAgentMessageBroadcast walks fn.Body for any CallExpr that
|
||||||
|
// looks like `*.BroadcastOnly(_, "AGENT_MESSAGE", _)`.
|
||||||
|
func funcEmitsAgentMessageBroadcast(fn *ast.FuncDecl) bool {
|
||||||
|
var found bool
|
||||||
|
ast.Inspect(fn.Body, func(n ast.Node) bool {
|
||||||
|
call, ok := n.(*ast.CallExpr)
|
||||||
|
if !ok {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
sel, ok := call.Fun.(*ast.SelectorExpr)
|
||||||
|
if !ok || sel.Sel.Name != "BroadcastOnly" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
// BroadcastOnly(workspaceID, eventType, payload) — the second
|
||||||
|
// arg is the event name. Match by string-literal value.
|
||||||
|
if len(call.Args) < 2 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
lit, ok := call.Args[1].(*ast.BasicLit)
|
||||||
|
if !ok || lit.Kind != token.STRING {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
raw := lit.Value
|
||||||
|
if unq, err := strconv.Unquote(raw); err == nil {
|
||||||
|
raw = unq
|
||||||
|
}
|
||||||
|
if raw == "AGENT_MESSAGE" {
|
||||||
|
found = true
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return found
|
||||||
|
}
|
||||||
|
|
||||||
|
// funcInsertsIntoActivityLogs walks fn.Body for any STRING BasicLit
|
||||||
|
// whose body contains `INSERT INTO activity_logs` (the SQL literal
|
||||||
|
// passed to ExecContext). Matches the substring rather than a strict
|
||||||
|
// regex because we don't care about the exact INSERT shape here —
|
||||||
|
// only that the function persists. Specific shape pinning lives in
|
||||||
|
// the per-handler test (see TestMCPHandler_SendMessageToUser_*).
|
||||||
|
func funcInsertsIntoActivityLogs(fn *ast.FuncDecl) bool {
|
||||||
|
var found bool
|
||||||
|
ast.Inspect(fn.Body, func(n ast.Node) bool {
|
||||||
|
lit, ok := n.(*ast.BasicLit)
|
||||||
|
if !ok || lit.Kind != token.STRING {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
raw := lit.Value
|
||||||
|
if unq, err := strconv.Unquote(raw); err == nil {
|
||||||
|
raw = unq
|
||||||
|
}
|
||||||
|
if strings.Contains(raw, "INSERT INTO activity_logs") {
|
||||||
|
found = true
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return found
|
||||||
|
}
|
||||||
@ -11,18 +11,21 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"errors"
|
||||||
|
|
||||||
"github.com/DATA-DOG/go-sqlmock"
|
"github.com/DATA-DOG/go-sqlmock"
|
||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
)
|
)
|
||||||
|
|
||||||
// newMCPHandler is a test helper that constructs an MCPHandler backed by the
|
// newMCPHandler is a test helper that constructs an MCPHandler backed by the
|
||||||
// sqlmock DB set up by setupTestDB.
|
// sqlmock DB set up by setupTestDB. Uses newTestBroadcaster so handlers
|
||||||
|
// that BroadcastOnly (send_message_to_user, etc.) don't nil-panic on the
|
||||||
|
// hub — events.NewBroadcaster(nil) crashes inside hub.Broadcast.
|
||||||
func newMCPHandler(t *testing.T) (*MCPHandler, sqlmock.Sqlmock) {
|
func newMCPHandler(t *testing.T) (*MCPHandler, sqlmock.Sqlmock) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
mock := setupTestDB(t)
|
mock := setupTestDB(t)
|
||||||
h := NewMCPHandler(db.DB, events.NewBroadcaster(nil))
|
h := NewMCPHandler(db.DB, newTestBroadcaster())
|
||||||
return h, mock
|
return h, mock
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -628,6 +631,170 @@ func TestMCPHandler_SendMessageToUser_Blocked_WhenEnvNotSet(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s pins the
|
||||||
|
// "best-effort persistence" contract: when the activity_log INSERT
|
||||||
|
// fails (DB hiccup, constraint violation, transient connection drop),
|
||||||
|
// the tool MUST still return success to the agent because the WS
|
||||||
|
// broadcast already succeeded — the user has seen the message.
|
||||||
|
//
|
||||||
|
// This matches /notify (activity.go) behavior. Returning an error
|
||||||
|
// here would cause the agent to retry and re-broadcast, double-
|
||||||
|
// rendering the message in the user's live chat panel for every
|
||||||
|
// retry until the DB recovers.
|
||||||
|
func TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s(t *testing.T) {
|
||||||
|
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
|
||||||
|
h, mock := newMCPHandler(t)
|
||||||
|
|
||||||
|
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||||
|
WithArgs("ws-err").
|
||||||
|
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||||
|
|
||||||
|
// INSERT fails — must NOT abort the tool response.
|
||||||
|
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
|
||||||
|
WillReturnError(errors.New("transient db error"))
|
||||||
|
|
||||||
|
w := mcpPost(t, h, "ws-err", map[string]interface{}{
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"id": 100,
|
||||||
|
"method": "tools/call",
|
||||||
|
"params": map[string]interface{}{
|
||||||
|
"name": "send_message_to_user",
|
||||||
|
"arguments": map[string]interface{}{
|
||||||
|
"message": "should not be lost from the live chat",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
var resp mcpResponse
|
||||||
|
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||||
|
t.Fatalf("response was not valid JSON-RPC: %v", err)
|
||||||
|
}
|
||||||
|
// Tool response is success — INSERT failure logged, broadcast
|
||||||
|
// already succeeded.
|
||||||
|
if resp.Error != nil {
|
||||||
|
t.Errorf("tool response should be success on DB error (broadcast won), got JSON-RPC error: %+v", resp.Error)
|
||||||
|
}
|
||||||
|
if err := mock.ExpectationsWereMet(); err != nil {
|
||||||
|
t.Errorf("expected DB calls in order: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestMCPHandler_SendMessageToUser_ResponseBodyShape pins the
|
||||||
|
// response_body JSON shape stored in activity_logs. This shape MUST
|
||||||
|
// match what the canvas hydrater (extractResponseText in
|
||||||
|
// historyHydration.ts) reads — specifically `{"result": "<text>"}`.
|
||||||
|
// Any drift in the JSON shape silently breaks chat history without
|
||||||
|
// failing the INSERT.
|
||||||
|
//
|
||||||
|
// Caught the same drift class flagged in
|
||||||
|
// feedback_assert_exact_not_substring.md: a substring match on
|
||||||
|
// "result" would pass even if the field were renamed; we assert the
|
||||||
|
// exact JSON shape.
|
||||||
|
func TestMCPHandler_SendMessageToUser_ResponseBodyShape(t *testing.T) {
|
||||||
|
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
|
||||||
|
h, mock := newMCPHandler(t)
|
||||||
|
|
||||||
|
const userMessage = "Hi there from the agent"
|
||||||
|
|
||||||
|
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||||
|
WithArgs("ws-shape").
|
||||||
|
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||||
|
|
||||||
|
// Capture the response_body argument and assert its exact shape.
|
||||||
|
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
|
||||||
|
WithArgs(
|
||||||
|
"ws-shape",
|
||||||
|
sqlmock.AnyArg(), // summary
|
||||||
|
// The response_body MUST be JSON `{"result": "<message>"}`.
|
||||||
|
// Any other shape (e.g., wrapping in a Task object) breaks
|
||||||
|
// the canvas hydrater's `body.result` extractor.
|
||||||
|
`{"result":"`+userMessage+`"}`,
|
||||||
|
).
|
||||||
|
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||||
|
|
||||||
|
w := mcpPost(t, h, "ws-shape", map[string]interface{}{
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"id": 101,
|
||||||
|
"method": "tools/call",
|
||||||
|
"params": map[string]interface{}{
|
||||||
|
"name": "send_message_to_user",
|
||||||
|
"arguments": map[string]interface{}{
|
||||||
|
"message": userMessage,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
if w.Code != 200 {
|
||||||
|
t.Fatalf("expected 200, got %d body=%s", w.Code, w.Body.String())
|
||||||
|
}
|
||||||
|
if err := mock.ExpectationsWereMet(); err != nil {
|
||||||
|
t.Errorf("response_body shape drift — would silently break canvas chat history: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestMCPHandler_SendMessageToUser_PersistsToActivityLog pins the fix
|
||||||
|
// for the reno-stars / CEO Ryan PC chat-history data-loss bug:
|
||||||
|
// external claude-code agents using molecule-mcp's send_message_to_user
|
||||||
|
// tool route through THIS handler (not the HTTP /notify endpoint),
|
||||||
|
// and the handler used to broadcast WS only — visible live, gone on
|
||||||
|
// reload because nothing wrote to activity_logs.
|
||||||
|
//
|
||||||
|
// Pins:
|
||||||
|
// - INSERT happens on the success path (broadcast + DB write).
|
||||||
|
// - INSERT shape mirrors the HTTP /notify handler exactly:
|
||||||
|
// activity_type='a2a_receive', method='notify', request_body NULL,
|
||||||
|
// response_body={"result": message}, status='ok'. The canvas
|
||||||
|
// hydration query (`type=a2a_receive&source=canvas`) treats
|
||||||
|
// both writers as the same shape — drift here means the bug
|
||||||
|
// re-surfaces silently.
|
||||||
|
func TestMCPHandler_SendMessageToUser_PersistsToActivityLog(t *testing.T) {
|
||||||
|
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
|
||||||
|
h, mock := newMCPHandler(t)
|
||||||
|
|
||||||
|
// Workspace lookup — the handler verifies the workspace exists
|
||||||
|
// before it does anything else. Returning a name lets the
|
||||||
|
// broadcast payload populate; the test doesn't assert on the
|
||||||
|
// broadcast (no observable WS in this fake), only on the DB.
|
||||||
|
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||||
|
WithArgs("ws-msg").
|
||||||
|
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||||
|
|
||||||
|
// The persistence INSERT — pin the exact shape so a future
|
||||||
|
// refactor that switches columns or drops `method='notify'`
|
||||||
|
// breaks the test loud, not silently. Match by regex on the
|
||||||
|
// table + activity_type + method literals.
|
||||||
|
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
|
||||||
|
WithArgs(
|
||||||
|
"ws-msg",
|
||||||
|
sqlmock.AnyArg(), // summary "Agent message: ..."
|
||||||
|
sqlmock.AnyArg(), // response_body JSON
|
||||||
|
).
|
||||||
|
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||||
|
|
||||||
|
w := mcpPost(t, h, "ws-msg", map[string]interface{}{
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"id": 99,
|
||||||
|
"method": "tools/call",
|
||||||
|
"params": map[string]interface{}{
|
||||||
|
"name": "send_message_to_user",
|
||||||
|
"arguments": map[string]interface{}{
|
||||||
|
"message": "Hello, this should persist!",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
var resp mcpResponse
|
||||||
|
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||||
|
t.Fatalf("response was not valid JSON-RPC: %v\nbody=%s", err, w.Body.String())
|
||||||
|
}
|
||||||
|
if resp.Error != nil {
|
||||||
|
t.Errorf("unexpected JSON-RPC error: %+v", resp.Error)
|
||||||
|
}
|
||||||
|
if err := mock.ExpectationsWereMet(); err != nil {
|
||||||
|
t.Errorf("DB expectations not met (INSERT missing → reno-stars data-loss regression): %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ─────────────────────────────────────────────────────────────────────────────
|
// ─────────────────────────────────────────────────────────────────────────────
|
||||||
// Parse error
|
// Parse error
|
||||||
// ─────────────────────────────────────────────────────────────────────────────
|
// ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|||||||
@ -344,6 +344,43 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri
|
|||||||
"name": wsName,
|
"name": wsName,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Persist to activity_logs so chat history loaders surface this
|
||||||
|
// message after a page reload. Pre-fix (reno-stars 2026-05-05),
|
||||||
|
// the MCP-bridge variant of send_message_to_user broadcast WS
|
||||||
|
// only — visible live, gone on reload — while the HTTP /notify
|
||||||
|
// sibling already had this fix (activity.go:535). External
|
||||||
|
// claude-code agents using molecule-mcp's send_message_to_user
|
||||||
|
// tool route through THIS handler, not /notify, so they were
|
||||||
|
// hitting the unfixed path.
|
||||||
|
//
|
||||||
|
// Shape mirrors activity.go's Notify handler exactly so the
|
||||||
|
// canvas chat-history hydration treats both the same:
|
||||||
|
// - activity_type='a2a_receive' joins the source=canvas filter
|
||||||
|
// - source_id is omitted → defaults to NULL ("canvas-source")
|
||||||
|
// - method='notify' tags it as a push (vs a real A2A receive)
|
||||||
|
// - request_body=NULL so the loader doesn't draw a duplicate
|
||||||
|
// "user" bubble
|
||||||
|
// - response_body={"result": "<text>"} feeds extractResponseText
|
||||||
|
// directly
|
||||||
|
//
|
||||||
|
// Errors are log-only — the broadcast already returned ok to the
|
||||||
|
// caller, the user has seen the message, and the persistence
|
||||||
|
// failure mode (DB hiccup) shouldn't block the tool call. The
|
||||||
|
// downside is the same as pre-fix: message vanishes on reload
|
||||||
|
// after a transient DB error. Log it so operators have a signal.
|
||||||
|
respPayload := map[string]interface{}{"result": message}
|
||||||
|
respJSON, _ := json.Marshal(respPayload)
|
||||||
|
preview := message
|
||||||
|
if len(preview) > 80 {
|
||||||
|
preview = preview[:80] + "…"
|
||||||
|
}
|
||||||
|
if _, err := h.database.ExecContext(ctx, `
|
||||||
|
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status)
|
||||||
|
VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
|
||||||
|
`, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil {
|
||||||
|
log.Printf("MCP send_message_to_user: failed to persist for %s: %v", workspaceID, err)
|
||||||
|
}
|
||||||
|
|
||||||
return "Message sent.", nil
|
return "Message sent.", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -154,127 +154,15 @@ from a2a_tools_memory import ( # noqa: E402 (import after the top-of-module im
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# Inbox tool handlers — extracted to a2a_tools_inbox (RFC #2873 iter 4e).
|
||||||
# Inbox tools — inbound delivery for the standalone molecule-mcp path.
|
# Re-imported here so call sites + tests that reference
|
||||||
# ---------------------------------------------------------------------------
|
# ``a2a_tools.tool_inbox_peek`` / ``tool_inbox_pop`` / ``tool_wait_for_message``
|
||||||
#
|
# / ``_enrich_inbound_for_agent`` / ``_INBOX_NOT_ENABLED_MSG`` keep
|
||||||
# The InboxState singleton is set by mcp_cli before the MCP server starts
|
# resolving identically.
|
||||||
# (see workspace/inbox.py for the rationale). In-container runtimes never
|
from a2a_tools_inbox import ( # noqa: E402 (import after the top-of-module imports)
|
||||||
# call ``inbox.activate(...)``, so ``inbox.get_state()`` returns None and
|
_INBOX_NOT_ENABLED_MSG,
|
||||||
# these tools surface an informational error rather than raising.
|
_enrich_inbound_for_agent,
|
||||||
#
|
tool_inbox_peek,
|
||||||
# When-to-use guidance (mirrored in platform_tools/registry.py): agents
|
tool_inbox_pop,
|
||||||
# in standalone-runtime mode should call ``wait_for_message`` to block
|
tool_wait_for_message,
|
||||||
# on the next inbound message after they've emitted a reply, forming
|
|
||||||
# the loop ``wait → respond → wait``. ``inbox_peek`` is for inspecting
|
|
||||||
# the queue without consuming; ``inbox_pop`` removes a handled message.
|
|
||||||
|
|
||||||
_INBOX_NOT_ENABLED_MSG = (
|
|
||||||
"Error: inbox polling is not enabled in this runtime. The standalone "
|
|
||||||
"molecule-mcp wrapper activates it; in-container runtimes receive "
|
|
||||||
"messages via push delivery and do not need these tools."
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _enrich_inbound_for_agent(d: dict) -> dict:
|
|
||||||
"""Add peer_name / peer_role / agent_card_url to a poll-path message.
|
|
||||||
|
|
||||||
The PUSH path (a2a_mcp_server._build_channel_notification) already
|
|
||||||
enriches the meta dict with these fields, so a Claude Code host
|
|
||||||
with channel-push sees them. The POLL path goes through
|
|
||||||
InboxMessage.to_dict, which is intentionally identity-free (the
|
|
||||||
storage layer doesn't know about the registry cache). Without this
|
|
||||||
helper, every non-Claude-Code MCP client that uses inbox_peek /
|
|
||||||
wait_for_message gets a plain message and the receiving agent
|
|
||||||
can't tell who's writing — breaking the contract documented in
|
|
||||||
a2a_mcp_server.py:303-345 ("In both paths the same fields apply").
|
|
||||||
|
|
||||||
Cache-first non-blocking enrichment (same shape as push): on cache
|
|
||||||
miss the helper returns the bare message; the next call within the
|
|
||||||
5-min TTL hits the warm cache. Failure to enrich is non-fatal —
|
|
||||||
the agent still gets text + peer_id + kind + activity_id, just
|
|
||||||
without the friendly identity.
|
|
||||||
"""
|
|
||||||
peer_id = d.get("peer_id") or ""
|
|
||||||
if not peer_id:
|
|
||||||
# canvas_user — no peer to enrich; helper returns the plain
|
|
||||||
# message unchanged so the canvas reply path still works.
|
|
||||||
return d
|
|
||||||
try:
|
|
||||||
from a2a_client import ( # local import — avoid module-load cycle
|
|
||||||
_agent_card_url_for,
|
|
||||||
enrich_peer_metadata_nonblocking,
|
|
||||||
)
|
|
||||||
except Exception: # noqa: BLE001
|
|
||||||
# If a2a_client is unavailable (test harness, partial install),
|
|
||||||
# degrade gracefully — agent still gets the bare envelope.
|
|
||||||
return d
|
|
||||||
record = enrich_peer_metadata_nonblocking(peer_id)
|
|
||||||
if record is not None:
|
|
||||||
if name := record.get("name"):
|
|
||||||
d["peer_name"] = name
|
|
||||||
if role := record.get("role"):
|
|
||||||
d["peer_role"] = role
|
|
||||||
# agent_card_url is constructable from peer_id alone — surface it
|
|
||||||
# even when registry enrichment misses, so the receiving agent has
|
|
||||||
# a single endpoint to hit for the peer's full capability list.
|
|
||||||
d["agent_card_url"] = _agent_card_url_for(peer_id)
|
|
||||||
return d
|
|
||||||
|
|
||||||
async def tool_inbox_peek(limit: int = 10) -> str:
|
|
||||||
"""Return up to ``limit`` pending inbound messages without removing them."""
|
|
||||||
import inbox # local import — avoids a circular dep at module load
|
|
||||||
|
|
||||||
state = inbox.get_state()
|
|
||||||
if state is None:
|
|
||||||
return _INBOX_NOT_ENABLED_MSG
|
|
||||||
messages = state.peek(limit=limit if isinstance(limit, int) else 10)
|
|
||||||
return json.dumps([_enrich_inbound_for_agent(m.to_dict()) for m in messages])
|
|
||||||
|
|
||||||
|
|
||||||
async def tool_inbox_pop(activity_id: str) -> str:
|
|
||||||
"""Remove a message from the inbox queue by activity_id."""
|
|
||||||
import inbox
|
|
||||||
|
|
||||||
state = inbox.get_state()
|
|
||||||
if state is None:
|
|
||||||
return _INBOX_NOT_ENABLED_MSG
|
|
||||||
if not isinstance(activity_id, str) or not activity_id:
|
|
||||||
return "Error: activity_id is required."
|
|
||||||
removed = state.pop(activity_id)
|
|
||||||
if removed is None:
|
|
||||||
return json.dumps({"removed": False, "activity_id": activity_id})
|
|
||||||
return json.dumps({"removed": True, "activity_id": activity_id})
|
|
||||||
|
|
||||||
|
|
||||||
async def tool_wait_for_message(timeout_secs: float = 60.0) -> str:
|
|
||||||
"""Block until a new message arrives or ``timeout_secs`` elapses.
|
|
||||||
|
|
||||||
Returns the head message non-destructively; the agent decides
|
|
||||||
whether to ``inbox_pop`` it after acting.
|
|
||||||
"""
|
|
||||||
import asyncio
|
|
||||||
|
|
||||||
import inbox
|
|
||||||
|
|
||||||
state = inbox.get_state()
|
|
||||||
if state is None:
|
|
||||||
return _INBOX_NOT_ENABLED_MSG
|
|
||||||
|
|
||||||
try:
|
|
||||||
timeout = float(timeout_secs)
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
timeout = 60.0
|
|
||||||
# Cap at 300s — Claude Code's default tool timeout is ~10min, and
|
|
||||||
# blocking longer than 5min wastes the prompt cache window for
|
|
||||||
# nothing useful. Operators who want longer can call repeatedly.
|
|
||||||
timeout = max(0.0, min(timeout, 300.0))
|
|
||||||
|
|
||||||
# The threading.Event-based wait would block the asyncio loop.
|
|
||||||
# Run it on the default executor so the MCP server can keep
|
|
||||||
# processing other JSON-RPC requests while we sleep.
|
|
||||||
loop = asyncio.get_running_loop()
|
|
||||||
message = await loop.run_in_executor(None, state.wait, timeout)
|
|
||||||
if message is None:
|
|
||||||
return json.dumps({"timeout": True, "timeout_secs": timeout})
|
|
||||||
return json.dumps(_enrich_inbound_for_agent(message.to_dict()))
|
|
||||||
|
|||||||
140
workspace/a2a_tools_inbox.py
Normal file
140
workspace/a2a_tools_inbox.py
Normal file
@ -0,0 +1,140 @@
|
|||||||
|
"""Inbox tool handlers — single-concern slice of the a2a_tools surface.
|
||||||
|
|
||||||
|
Standalone-runtime path for inbound-message delivery (push-mode runtimes
|
||||||
|
get messages via the channel-tag synthesis in a2a_mcp_server). The
|
||||||
|
``InboxState`` singleton is set by ``mcp_cli`` before the MCP server
|
||||||
|
starts; in-container runtimes never call ``inbox.activate(...)`` so
|
||||||
|
``inbox.get_state()`` returns None and these tools surface an
|
||||||
|
informational error instead of raising.
|
||||||
|
|
||||||
|
When-to-use guidance for agents (mirrored in
|
||||||
|
``platform_tools/registry.py``):
|
||||||
|
- ``wait_for_message``: block until a new inbound message arrives, then
|
||||||
|
decide what to do with it; forms the loop ``wait → respond → wait``.
|
||||||
|
- ``inbox_peek``: inspect the queue non-destructively.
|
||||||
|
- ``inbox_pop``: remove a handled message by activity_id.
|
||||||
|
|
||||||
|
Extracted from ``a2a_tools.py`` in RFC #2873 iter 4e so the kitchen-sink
|
||||||
|
module shrinks to a back-compat shim. The extraction also makes the
|
||||||
|
``_enrich_inbound_for_agent`` helper unit-testable in isolation —
|
||||||
|
previously it was buried in ``a2a_tools`` and only exercised through
|
||||||
|
the inbox wrappers, leaving its peer-id-empty / cache-miss / registry-
|
||||||
|
unavailable branches under-covered.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
# Surfaced when the inbox subsystem is not initialised. Returned by the
|
||||||
|
# three inbox tool wrappers below so the agent gets a clear "this
|
||||||
|
# runtime delivers via push" message instead of a NameError.
|
||||||
|
_INBOX_NOT_ENABLED_MSG = (
|
||||||
|
"Error: inbox polling is not enabled in this runtime. The standalone "
|
||||||
|
"molecule-mcp wrapper activates it; in-container runtimes receive "
|
||||||
|
"messages via push delivery and do not need these tools."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _enrich_inbound_for_agent(d: dict) -> dict:
|
||||||
|
"""Add peer_name / peer_role / agent_card_url to a poll-path message.
|
||||||
|
|
||||||
|
The PUSH path (a2a_mcp_server._build_channel_notification) already
|
||||||
|
enriches the meta dict with these fields, so a Claude Code host
|
||||||
|
with channel-push sees them. The POLL path goes through
|
||||||
|
InboxMessage.to_dict, which is intentionally identity-free (the
|
||||||
|
storage layer doesn't know about the registry cache). Without this
|
||||||
|
helper, every non-Claude-Code MCP client that uses inbox_peek /
|
||||||
|
wait_for_message gets a plain message and the receiving agent
|
||||||
|
can't tell who's writing — breaking the contract documented in
|
||||||
|
a2a_mcp_server.py:303-345 ("In both paths the same fields apply").
|
||||||
|
|
||||||
|
Cache-first non-blocking enrichment (same shape as push): on cache
|
||||||
|
miss the helper returns the bare message; the next call within the
|
||||||
|
5-min TTL hits the warm cache. Failure to enrich is non-fatal —
|
||||||
|
the agent still gets text + peer_id + kind + activity_id, just
|
||||||
|
without the friendly identity.
|
||||||
|
"""
|
||||||
|
peer_id = d.get("peer_id") or ""
|
||||||
|
if not peer_id:
|
||||||
|
# canvas_user — no peer to enrich; helper returns the plain
|
||||||
|
# message unchanged so the canvas reply path still works.
|
||||||
|
return d
|
||||||
|
try:
|
||||||
|
from a2a_client import ( # local import — avoid module-load cycle
|
||||||
|
_agent_card_url_for,
|
||||||
|
enrich_peer_metadata_nonblocking,
|
||||||
|
)
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
# If a2a_client is unavailable (test harness, partial install),
|
||||||
|
# degrade gracefully — agent still gets the bare envelope.
|
||||||
|
return d
|
||||||
|
record = enrich_peer_metadata_nonblocking(peer_id)
|
||||||
|
if record is not None:
|
||||||
|
if name := record.get("name"):
|
||||||
|
d["peer_name"] = name
|
||||||
|
if role := record.get("role"):
|
||||||
|
d["peer_role"] = role
|
||||||
|
# agent_card_url is constructable from peer_id alone — surface it
|
||||||
|
# even when registry enrichment misses, so the receiving agent has
|
||||||
|
# a single endpoint to hit for the peer's full capability list.
|
||||||
|
d["agent_card_url"] = _agent_card_url_for(peer_id)
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
async def tool_inbox_peek(limit: int = 10) -> str:
    """Return up to ``limit`` pending inbound messages without removing them.

    Each message is serialized via ``to_dict()`` and passed through
    ``_enrich_inbound_for_agent`` so the receiving agent sees peer identity
    fields when the registry cache is warm.

    Returns a JSON array string, or ``_INBOX_NOT_ENABLED_MSG`` when the
    inbox subsystem is not enabled.
    """
    import inbox  # local import — avoids a circular dep at module load

    state = inbox.get_state()
    if state is None:
        return _INBOX_NOT_ENABLED_MSG

    # MCP clients send loosely-typed JSON arguments. Coerce numeric inputs
    # (e.g. 5.0 arriving as a JSON float) to int instead of silently
    # falling back to 10, and handle bools explicitly: bool is an int
    # subclass, so a plain `isinstance(limit, int)` check would let
    # `limit=True` peek exactly one message.
    if isinstance(limit, bool):
        limit = 10
    elif not isinstance(limit, int):
        try:
            limit = int(limit)
        except (TypeError, ValueError):
            limit = 10

    messages = state.peek(limit=limit)
    return json.dumps([_enrich_inbound_for_agent(m.to_dict()) for m in messages])
|
||||||
|
|
||||||
|
|
||||||
|
async def tool_inbox_pop(activity_id: str) -> str:
    """Remove a message from the inbox queue by activity_id."""
    import inbox  # lazy import keeps module load free of a circular dep

    state = inbox.get_state()
    if state is None:
        return _INBOX_NOT_ENABLED_MSG
    if not (isinstance(activity_id, str) and activity_id):
        return "Error: activity_id is required."

    # state.pop() returns the removed message, or None when the id is
    # unknown — collapse that into a boolean for the caller.
    outcome = {"removed": state.pop(activity_id) is not None, "activity_id": activity_id}
    return json.dumps(outcome)
|
||||||
|
|
||||||
|
|
||||||
|
async def tool_wait_for_message(timeout_secs: float = 60.0) -> str:
    """Block until a new message arrives or ``timeout_secs`` elapses.

    The head message is returned non-destructively; the agent decides
    whether to ``inbox_pop`` it after acting.
    """
    import inbox  # lazy import — avoids a circular dep at module load

    state = inbox.get_state()
    if state is None:
        return _INBOX_NOT_ENABLED_MSG

    # Normalize the timeout: non-numeric input falls back to the 60s
    # default, then clamp into [0, 300]. The 300s ceiling exists because
    # Claude Code's default tool timeout is ~10min and blocking past 5min
    # wastes the prompt cache window for nothing useful; operators who
    # want longer simply call again.
    try:
        wait_secs = float(timeout_secs)
    except (TypeError, ValueError):
        wait_secs = 60.0
    wait_secs = max(0.0, min(wait_secs, 300.0))

    # state.wait is a blocking threading.Event-based wait; run it on the
    # default executor so the asyncio loop keeps serving other JSON-RPC
    # requests while this call sleeps.
    head = await asyncio.get_running_loop().run_in_executor(
        None, state.wait, wait_secs
    )
    if head is None:
        return json.dumps({"timeout": True, "timeout_secs": wait_secs})
    return json.dumps(_enrich_inbound_for_agent(head.to_dict()))
|
||||||
181
workspace/tests/test_a2a_tools_inbox_split.py
Normal file
181
workspace/tests/test_a2a_tools_inbox_split.py
Normal file
@ -0,0 +1,181 @@
|
|||||||
|
"""Drift gate + import-contract tests for ``a2a_tools_inbox`` (RFC #2873 iter 4e).
|
||||||
|
|
||||||
|
The full behavior matrix for the three inbox tool wrappers lives in
|
||||||
|
``test_a2a_tools_inbox_wrappers.py`` (kept on the public ``a2a_tools``
|
||||||
|
module so the same tests pin both the alias and the underlying impl).
|
||||||
|
|
||||||
|
This file pins:
|
||||||
|
|
||||||
|
1. **Drift gate** — every previously-public symbol on ``a2a_tools``
|
||||||
|
(``tool_inbox_peek``, ``tool_inbox_pop``, ``tool_wait_for_message``,
|
||||||
|
``_enrich_inbound_for_agent``, ``_INBOX_NOT_ENABLED_MSG``) is the
|
||||||
|
EXACT same object as ``a2a_tools_inbox.foo``. Refactor wrapping
|
||||||
|
silently loses existing test coverage; this gate makes that drift
|
||||||
|
fail fast.
|
||||||
|
2. **Import contract** — ``a2a_tools_inbox`` does NOT pull in
|
||||||
|
``a2a_tools`` at module-load time (the layered architecture: it
|
||||||
|
depends only on stdlib + a lazy import of ``inbox`` + a lazy
|
||||||
|
import of ``a2a_client``, never the kitchen-sink module that
|
||||||
|
re-exports it).
|
||||||
|
3. **_enrich_inbound_for_agent** branches that the wrapper tests
|
||||||
|
can't easily reach: peer_id-empty (canvas_user) returns the
|
||||||
|
dict unchanged; a2a_client unavailable degrades gracefully.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
def _require_workspace_id(monkeypatch):
    """Give every test in this module the env vars the modules under test read."""
    monkeypatch.setenv("PLATFORM_URL", "http://test.invalid")
    monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
    yield
|
||||||
|
|
||||||
|
|
||||||
|
# ============== Drift gate ==============
|
||||||
|
|
||||||
|
class TestBackCompatAliases:
    """Drift gate: each symbol still published on ``a2a_tools`` must be the
    EXACT same object as its ``a2a_tools_inbox`` counterpart, so the
    pre-split test coverage keeps pinning the extracted implementation."""

    def test_tool_inbox_peek_alias(self):
        import a2a_tools, a2a_tools_inbox
        assert a2a_tools_inbox.tool_inbox_peek is a2a_tools.tool_inbox_peek

    def test_tool_inbox_pop_alias(self):
        import a2a_tools, a2a_tools_inbox
        assert a2a_tools_inbox.tool_inbox_pop is a2a_tools.tool_inbox_pop

    def test_tool_wait_for_message_alias(self):
        import a2a_tools, a2a_tools_inbox
        assert a2a_tools_inbox.tool_wait_for_message is a2a_tools.tool_wait_for_message

    def test_enrich_helper_alias(self):
        import a2a_tools, a2a_tools_inbox
        assert (
            a2a_tools_inbox._enrich_inbound_for_agent
            is a2a_tools._enrich_inbound_for_agent
        )

    def test_inbox_not_enabled_msg_alias(self):
        import a2a_tools, a2a_tools_inbox
        assert (
            a2a_tools_inbox._INBOX_NOT_ENABLED_MSG
            is a2a_tools._INBOX_NOT_ENABLED_MSG
        )
|
||||||
|
|
||||||
|
|
||||||
|
# ============== Import contract ==============
|
||||||
|
|
||||||
|
class TestImportContract:
    """Layering contract: the extracted slice never loads the kitchen-sink."""

    def test_inbox_module_does_not_import_a2a_tools_eagerly(self):
        # Start from a clean slate so this test observes the fresh load,
        # not a cached module left behind by an earlier test.
        sys.modules.pop("a2a_tools_inbox", None)
        sys.modules.pop("a2a_tools", None)

        import a2a_tools_inbox  # noqa: F401 — load only

        # Loading the slice must not drag a2a_tools in with it: the
        # extracted module sits BELOW the kitchen-sink in the layering,
        # so the dependency arrow points the other direction.
        assert "a2a_tools" not in sys.modules, (
            "a2a_tools_inbox eagerly imported a2a_tools — the kitchen-sink "
            "module must not be a load-time dependency of its slices."
        )
|
||||||
|
|
||||||
|
|
||||||
|
# ============== _enrich_inbound_for_agent branches ==============
|
||||||
|
|
||||||
|
class TestEnrichInboundForAgent:
    """Branches of ``_enrich_inbound_for_agent`` the wrapper tests can't reach."""

    def test_canvas_user_returns_dict_unchanged(self):
        from a2a_tools_inbox import _enrich_inbound_for_agent

        # An empty peer_id means canvas_user: there is no peer to enrich
        # and a2a_client must not even be touched.
        envelope = {"activity_id": "a-1", "kind": "canvas_user", "peer_id": ""}
        enriched = _enrich_inbound_for_agent(envelope)
        assert enriched is envelope  # same dict, mutated in place if at all
        for key in ("peer_name", "peer_role", "agent_card_url"):
            assert key not in enriched

    def test_missing_peer_id_key_returns_unchanged(self):
        from a2a_tools_inbox import _enrich_inbound_for_agent

        envelope = {"activity_id": "a-2", "kind": "canvas_user"}  # no peer_id key
        enriched = _enrich_inbound_for_agent(envelope)
        assert enriched is envelope
        assert "agent_card_url" not in enriched

    def test_a2a_client_unavailable_degrades_gracefully(self, monkeypatch):
        # When importing a2a_client fails (test harness, partial install)
        # the helper must hand back the bare envelope rather than raise.
        import builtins

        from a2a_tools_inbox import _enrich_inbound_for_agent

        original_import = builtins.__import__

        def poisoned_import(name, *args, **kwargs):
            if name == "a2a_client":
                raise ImportError("simulated a2a_client unavailable")
            return original_import(name, *args, **kwargs)

        monkeypatch.setattr(builtins, "__import__", poisoned_import)

        envelope = {"activity_id": "a-3", "kind": "peer_agent", "peer_id": "ws-x"}
        enriched = _enrich_inbound_for_agent(envelope)
        # Bare envelope back — no peer_name, no agent_card_url — and,
        # crucially, no exception: the inbox tool still surfaces the
        # message to the agent even when the registry is unreachable.
        assert enriched is envelope
        assert "peer_name" not in enriched
        assert "agent_card_url" not in enriched

    def test_registry_record_populates_peer_name_and_role(self, monkeypatch):
        import sys
        import types

        from a2a_tools_inbox import _enrich_inbound_for_agent

        # Stand in for the lazily-imported a2a_client functions.
        stub_client = types.SimpleNamespace(
            _agent_card_url_for=lambda pid: f"http://test/agent/{pid}",
            enrich_peer_metadata_nonblocking=lambda pid: {
                "name": "PeerOne",
                "role": "worker",
            },
        )
        monkeypatch.setitem(sys.modules, "a2a_client", stub_client)

        envelope = {"activity_id": "a-4", "kind": "peer_agent", "peer_id": "ws-1"}
        enriched = _enrich_inbound_for_agent(envelope)
        assert enriched["peer_name"] == "PeerOne"
        assert enriched["peer_role"] == "worker"
        assert enriched["agent_card_url"] == "http://test/agent/ws-1"

    def test_registry_miss_keeps_agent_card_url(self, monkeypatch):
        # A registry cache miss must still surface agent_card_url — it is
        # constructable from peer_id alone, which preserves the contract
        # that the receiving agent always has somewhere to fetch the
        # peer's full capability list.
        import sys
        import types

        from a2a_tools_inbox import _enrich_inbound_for_agent

        stub_client = types.SimpleNamespace(
            _agent_card_url_for=lambda pid: f"http://test/agent/{pid}",
            enrich_peer_metadata_nonblocking=lambda pid: None,  # cache miss
        )
        monkeypatch.setitem(sys.modules, "a2a_client", stub_client)

        envelope = {"activity_id": "a-5", "kind": "peer_agent", "peer_id": "ws-2"}
        enriched = _enrich_inbound_for_agent(envelope)
        assert "peer_name" not in enriched
        assert "peer_role" not in enriched
        assert enriched["agent_card_url"] == "http://test/agent/ws-2"
|
||||||
Loading…
Reference in New Issue
Block a user