feat(requests): deliver respond/More-Info outcomes to the requester agent as real A2A turns #2614
@@ -21,11 +21,13 @@ package handlers
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// ErrRequestNotFound is returned by Get/Respond/RequestInfo/Cancel/AddMessage
|
||||
@@ -146,6 +148,39 @@ func broadcastTarget(requesterType, requesterID, recipientType, recipientID stri
|
||||
return ""
|
||||
}
|
||||
|
||||
// requestNotifyEnqueue is the a2a-queue enqueue used to deliver
|
||||
// request-outcome notifications to a REQUESTER agent as a real inbound turn.
|
||||
// Package-level var (default: the real EnqueueA2A) so tests can intercept —
|
||||
// mirrors RequestNudgeSweeper's enqueueFunc injection.
|
||||
var requestNotifyEnqueue enqueueFunc = EnqueueA2A
|
||||
|
||||
// notifyRequesterAgent enqueues a message/send A2A turn to the requester
|
||||
// agent. Used on terminal responses and recipient-authored More-Info
|
||||
// messages (CTO 2026-06-11): a human clicking Done/Reject/Approve — or
|
||||
// asking for clarification — must reach the agent that raised the request
|
||||
// as an actual turn, not only as a structure event the agent never reads.
|
||||
// Best-effort: an enqueue failure is logged, never surfaced — the durable
|
||||
// truth is the requests row, and check_requests remains the pull path.
|
||||
func (s *RequestStore) notifyRequesterAgent(ctx context.Context, req RequestRow, idemKey, text string) {
|
||||
body, err := json.Marshal(map[string]interface{}{
|
||||
"method": "message/send",
|
||||
"params": map[string]interface{}{
|
||||
"message": map[string]interface{}{
|
||||
"role": "user",
|
||||
"messageId": idemKey + "-" + uuid.New().String(),
|
||||
"parts": []map[string]interface{}{{"kind": "text", "text": text}},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("request: build requester notification for %s failed: %v", req.ID, err)
|
||||
return
|
||||
}
|
||||
if _, _, err := requestNotifyEnqueue(ctx, req.RequesterID, "", PriorityInfo, body, "message/send", idemKey, nil); err != nil {
|
||||
log.Printf("request: enqueue requester notification for %s -> %s failed: %v", req.ID, req.RequesterID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Create inserts a new pending request and broadcasts REQUEST_CREATED (anchored
|
||||
// on the recipient agent if any, so an agent recipient's inbox is signalled).
|
||||
// Returns the new request id. Validates kind + party enums up front.
|
||||
@@ -435,6 +470,24 @@ func (s *RequestStore) Respond(ctx context.Context, id, action, responderType, r
|
||||
}
|
||||
}
|
||||
|
||||
// Deliver the outcome to the requester AGENT as a real inbound turn
|
||||
// (core#2606 follow-up, CTO 2026-06-11). The REQUEST_RESPONDED event
|
||||
// above only feeds the canvas/event stream; an agent waiting on an
|
||||
// approval otherwise learns the decision only if something prompts it
|
||||
// to call check_requests. Skip self-notification (agent responded to
|
||||
// its own... impossible per the self-response guard, but cheap belt).
|
||||
if req.RequesterType == "agent" && req.RequesterID != "" &&
|
||||
(responderType != "agent" || responderID != req.RequesterID) {
|
||||
by := "the user"
|
||||
if responderType == "agent" {
|
||||
by = "agent " + responderID
|
||||
}
|
||||
s.notifyRequesterAgent(ctx, req,
|
||||
"request-responded:"+req.ID,
|
||||
fmt.Sprintf("Your %s request %q (id %s) was %s by %s. Use get_request for the thread or check_requests for all your outcomes.",
|
||||
req.Kind, req.Title, req.ID, status, by))
|
||||
}
|
||||
|
||||
req.Status = status
|
||||
req.ResponderType = &responderType
|
||||
req.ResponderID = &responderID
|
||||
@@ -513,6 +566,18 @@ func (s *RequestStore) AddMessage(ctx context.Context, id, authorType, authorID,
|
||||
}
|
||||
}
|
||||
|
||||
// More-Info from the recipient must reach a requester AGENT as a real
|
||||
// turn (same rationale as the Respond notification — CTO 2026-06-11).
|
||||
// Keyed per message so a multi-round clarification thread delivers each
|
||||
// ask; the requester replies with add_request_message.
|
||||
if authorType == req.RecipientType && authorID == req.RecipientID &&
|
||||
req.RequesterType == "agent" && req.RequesterID != "" {
|
||||
s.notifyRequesterAgent(ctx, req,
|
||||
"request-message:"+messageID,
|
||||
fmt.Sprintf("More info requested on your %s request %q (id %s): %s\nReply with add_request_message.",
|
||||
req.Kind, req.Title, req.ID, body))
|
||||
}
|
||||
|
||||
return messageID, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -322,6 +322,13 @@ func (h *RequestsHandler) Respond(c *gin.Context) {
|
||||
// More-Info thread. When the author is the recipient, the request flips to
|
||||
// info_requested.
|
||||
//
|
||||
// Workspace-token auth path (/workspaces/:id/requests/:requestId/messages):
|
||||
// the caller must be a participant (requester or recipient), and the author
|
||||
// identity is BOUND to the authenticated workspace — body author_type/author_id
|
||||
// are ignored. This prevents a workspace-token holder from spoofing another
|
||||
// party or flipping an unrelated request to info_requested (core#2542 /
|
||||
// core#2606).
|
||||
//
|
||||
// @Summary Add a message to a request's More-Info thread
|
||||
// @Tags requests
|
||||
// @Accept json
|
||||
@@ -330,6 +337,7 @@ func (h *RequestsHandler) Respond(c *gin.Context) {
|
||||
// @Param body body AddRequestMessageBody true "Message"
|
||||
// @Success 201 {object} RequestMutationResponse
|
||||
// @Failure 400 {object} ErrorResponse
|
||||
// @Failure 403 {object} ErrorResponse
|
||||
// @Failure 404 {object} ErrorResponse
|
||||
// @Failure 500 {object} ErrorResponse
|
||||
// @Router /requests/{requestId}/messages [post]
|
||||
@@ -344,7 +352,34 @@ func (h *RequestsHandler) AddMessage(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
messageID, err := h.store().AddMessage(ctx, requestID, body.AuthorType, body.AuthorID, body.Body)
|
||||
authorType := body.AuthorType
|
||||
authorID := body.AuthorID
|
||||
|
||||
// Workspace-token auth path: bind author to the authenticated workspace and
|
||||
// verify the caller is a participant.
|
||||
if workspaceID := c.Param("id"); workspaceID != "" {
|
||||
authorType = "agent"
|
||||
authorID = workspaceID
|
||||
|
||||
reqRow, err := h.store().Get(ctx, requestID)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrRequestNotFound) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "request not found"})
|
||||
return
|
||||
}
|
||||
log.Printf("AddMessage authz error request=%s: %v", requestID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to add message"})
|
||||
return
|
||||
}
|
||||
isParty := (reqRow.RequesterType == "agent" && reqRow.RequesterID == workspaceID) ||
|
||||
(reqRow.RecipientType == "agent" && reqRow.RecipientID == workspaceID)
|
||||
if !isParty {
|
||||
c.JSON(http.StatusForbidden, gin.H{"error": "not a participant"})
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
messageID, err := h.store().AddMessage(ctx, requestID, authorType, authorID, body.Body)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrRequestNotFound) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "request not found"})
|
||||
|
||||
@@ -2,10 +2,13 @@ package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
@@ -648,6 +651,108 @@ func TestRequests_AddMessage_RequesterDoesNotFlip(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- AddMessage workspace-token authz (core#2542 / core#2606) ----------
|
||||
|
||||
func TestRequests_AddMessage_AgentPath_Recipient_BindsToCaller(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
// URL workspace ws-2 is the recipient. The body tries to spoof ws-EVIL.
|
||||
// The handler's authz Get AND the store's own Get both fetch the row.
|
||||
mock.ExpectQuery("FROM requests WHERE id").
|
||||
WithArgs("req-1").
|
||||
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending"))
|
||||
mock.ExpectQuery("FROM requests WHERE id").
|
||||
WithArgs("req-1").
|
||||
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending"))
|
||||
// Handler must bind author to ws-2, not the spoofed body value.
|
||||
mock.ExpectQuery("INSERT INTO request_messages").
|
||||
WithArgs("req-1", "agent", "ws-2", "which file?").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("msg-1"))
|
||||
mock.ExpectExec("UPDATE requests SET status = 'info_requested'").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{
|
||||
{Key: "requestId", Value: "req-1"},
|
||||
{Key: "id", Value: "ws-2"},
|
||||
}
|
||||
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"body":"which file?","author_type":"agent","author_id":"ws-EVIL"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.AddMessage(c)
|
||||
|
||||
if w.Code != http.StatusCreated {
|
||||
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequests_AddMessage_AgentPath_Requester_BindsToCaller(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
// URL workspace ws-1 is the requester. Body author_id is ignored.
|
||||
// Handler authz Get + store Get → two fetches.
|
||||
mock.ExpectQuery("FROM requests WHERE id").
|
||||
WithArgs("req-1").
|
||||
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "info_requested"))
|
||||
mock.ExpectQuery("FROM requests WHERE id").
|
||||
WithArgs("req-1").
|
||||
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "info_requested"))
|
||||
mock.ExpectQuery("INSERT INTO request_messages").
|
||||
WithArgs("req-1", "agent", "ws-1", "here is the file").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("msg-2"))
|
||||
// No status flip — requester is not the recipient.
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{
|
||||
{Key: "requestId", Value: "req-1"},
|
||||
{Key: "id", Value: "ws-1"},
|
||||
}
|
||||
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"body":"here is the file","author_type":"agent","author_id":"ws-EVIL"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.AddMessage(c)
|
||||
|
||||
if w.Code != http.StatusCreated {
|
||||
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequests_AddMessage_AgentPath_NonParticipant_403(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
// URL workspace ws-3 is neither requester nor recipient.
|
||||
mock.ExpectQuery("FROM requests WHERE id").
|
||||
WithArgs("req-1").
|
||||
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{
|
||||
{Key: "requestId", Value: "req-1"},
|
||||
{Key: "id", Value: "ws-3"},
|
||||
}
|
||||
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"body":"pwned","author_type":"agent","author_id":"ws-2"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.AddMessage(c)
|
||||
|
||||
if w.Code != http.StatusForbidden {
|
||||
t.Fatalf("expected 403 for non-participant, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Cancel ----------
|
||||
|
||||
func TestRequests_Cancel_Success(t *testing.T) {
|
||||
@@ -799,3 +904,148 @@ func TestRequests_ListPending_InvalidKind(t *testing.T) {
|
||||
t.Errorf("expected 400 for invalid kind, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Requester notification on respond / more-info (CTO 2026-06-11) ----------
|
||||
|
||||
// interceptRequestNotify swaps the package-level enqueue for the test and
|
||||
// returns a capture slice + restore func.
|
||||
func interceptRequestNotify(t *testing.T) *[]map[string]string {
|
||||
t.Helper()
|
||||
captured := &[]map[string]string{}
|
||||
prev := requestNotifyEnqueue
|
||||
requestNotifyEnqueue = func(ctx context.Context, workspaceID, callerID string, priority int, body []byte, method, idemKey string, expiresAt *time.Time) (string, int, error) {
|
||||
*captured = append(*captured, map[string]string{
|
||||
"workspace_id": workspaceID,
|
||||
"method": method,
|
||||
"idem": idemKey,
|
||||
"body": string(body),
|
||||
})
|
||||
return "q-1", 1, nil
|
||||
}
|
||||
t.Cleanup(func() { requestNotifyEnqueue = prev })
|
||||
return captured
|
||||
}
|
||||
|
||||
// TestRequests_Respond_NotifiesRequesterAgent: a terminal response on a
|
||||
// request raised by an AGENT must enqueue a message/send turn to that agent
|
||||
// — the user clicking Done/Approve must actually reach the requester.
|
||||
func TestRequests_Respond_NotifiesRequesterAgent(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
captured := interceptRequestNotify(t)
|
||||
|
||||
mock.ExpectQuery("FROM requests WHERE id").
|
||||
WithArgs("req-1").
|
||||
WillReturnRows(oneRequestRow("req-1", "approval", "ws-agent-1", "user", "", "pending"))
|
||||
mock.ExpectExec("UPDATE requests").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "requestId", Value: "req-1"}}
|
||||
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"action":"approved","responder_id":"u-1"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Respond(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if len(*captured) != 1 {
|
||||
t.Fatalf("expected 1 requester notification, got %d", len(*captured))
|
||||
}
|
||||
n := (*captured)[0]
|
||||
if n["workspace_id"] != "ws-agent-1" || n["method"] != "message/send" {
|
||||
t.Errorf("notification misrouted: %+v", n)
|
||||
}
|
||||
if n["idem"] != "request-responded:req-1" {
|
||||
t.Errorf("idempotency key = %q", n["idem"])
|
||||
}
|
||||
if !strings.Contains(n["body"], "approved") || !strings.Contains(n["body"], "Some title") {
|
||||
t.Errorf("notification body missing outcome/title: %s", n["body"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestRequests_AddMessage_MoreInfo_NotifiesRequesterAgent: a recipient-authored
|
||||
// More-Info message must reach the requester agent as a turn carrying the ask.
|
||||
func TestRequests_AddMessage_MoreInfo_NotifiesRequesterAgent(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
captured := interceptRequestNotify(t)
|
||||
|
||||
mock.ExpectQuery("FROM requests WHERE id").
|
||||
WithArgs("req-2").
|
||||
WillReturnRows(oneRequestRow("req-2", "task", "ws-agent-1", "user", "u-1", "pending"))
|
||||
mock.ExpectQuery("INSERT INTO request_messages").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("msg-9"))
|
||||
mock.ExpectExec("UPDATE requests SET status = 'info_requested'").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "requestId", Value: "req-2"}}
|
||||
c.Request = httptest.NewRequest("POST", "/", bytesNewBufferStringHelper(`{"author_type":"user","author_id":"u-1","body":"which environment do you mean?"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.AddMessage(c)
|
||||
|
||||
if w.Code != http.StatusCreated {
|
||||
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if len(*captured) != 1 {
|
||||
t.Fatalf("expected 1 requester notification, got %d", len(*captured))
|
||||
}
|
||||
n := (*captured)[0]
|
||||
if n["workspace_id"] != "ws-agent-1" || n["idem"] != "request-message:msg-9" {
|
||||
t.Errorf("notification misrouted: %+v", n)
|
||||
}
|
||||
if !strings.Contains(n["body"], "which environment do you mean?") {
|
||||
t.Errorf("notification body missing ask: %s", n["body"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestRequests_Respond_NoNotifyForUserRequester: a request raised by a USER
|
||||
// must not enqueue an agent notification on respond.
|
||||
func TestRequests_Respond_NoNotifyForUserRequester(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
captured := interceptRequestNotify(t)
|
||||
|
||||
mock.ExpectQuery("FROM requests WHERE id").
|
||||
WithArgs("req-3").
|
||||
WillReturnRows(sqlmock.NewRows(requestColumnNames).AddRow(
|
||||
"req-3", "task", "user", "u-1", nil,
|
||||
"agent", "ws-agent-2", "Some title", nil, "pending",
|
||||
nil, nil, nil, "2026-06-10T00:00:00Z", "2026-06-10T00:00:00Z", nil,
|
||||
))
|
||||
mock.ExpectExec("UPDATE requests").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "requestId", Value: "req-3"}}
|
||||
c.Request = httptest.NewRequest("POST", "/", bytesNewBufferStringHelper(`{"action":"done","responder_type":"agent","responder_id":"ws-agent-2"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Respond(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if len(*captured) != 0 {
|
||||
t.Fatalf("expected no notification for user requester, got %d", len(*captured))
|
||||
}
|
||||
}
|
||||
|
||||
// bytesNewBufferStringHelper keeps the new tests free of an extra import
|
||||
// alias; identical to bytes.NewBufferString.
|
||||
func bytesNewBufferStringHelper(s string) *bytes.Buffer { return bytes.NewBufferString(s) }
|
||||
|
||||
Reference in New Issue
Block a user