feat(requests): deliver respond/More-Info outcomes to the requester agent as real A2A turns #2614

Merged
devops-engineer merged 4 commits from feat/2606-respond-notifies-requester into main 2026-06-12 09:54:53 +00:00
3 changed files with 351 additions and 1 deletions
@@ -21,11 +21,13 @@ package handlers
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
"github.com/google/uuid"
)
// ErrRequestNotFound is returned by Get/Respond/RequestInfo/Cancel/AddMessage
@@ -146,6 +148,39 @@ func broadcastTarget(requesterType, requesterID, recipientType, recipientID stri
return ""
}
// requestNotifyEnqueue is the a2a-queue enqueue used to deliver
// request-outcome notifications to a REQUESTER agent as a real inbound turn.
// Package-level var (default: the real EnqueueA2A) so tests can intercept —
// mirrors RequestNudgeSweeper's enqueueFunc injection.
var requestNotifyEnqueue enqueueFunc = EnqueueA2A
// notifyRequesterAgent enqueues a message/send A2A turn to the requester
// agent. Used on terminal responses and recipient-authored More-Info
// messages (CTO 2026-06-11): a human clicking Done/Reject/Approve — or
// asking for clarification — must reach the agent that raised the request
// as an actual turn, not only as a structure event the agent never reads.
// Best-effort: an enqueue failure is logged, never surfaced — the durable
// truth is the requests row, and check_requests remains the pull path.
func (s *RequestStore) notifyRequesterAgent(ctx context.Context, req RequestRow, idemKey, text string) {
body, err := json.Marshal(map[string]interface{}{
"method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"messageId": idemKey + "-" + uuid.New().String(),
"parts": []map[string]interface{}{{"kind": "text", "text": text}},
},
},
})
if err != nil {
log.Printf("request: build requester notification for %s failed: %v", req.ID, err)
return
}
if _, _, err := requestNotifyEnqueue(ctx, req.RequesterID, "", PriorityInfo, body, "message/send", idemKey, nil); err != nil {
log.Printf("request: enqueue requester notification for %s -> %s failed: %v", req.ID, req.RequesterID, err)
}
}
// Create inserts a new pending request and broadcasts REQUEST_CREATED (anchored
// on the recipient agent if any, so an agent recipient's inbox is signalled).
// Returns the new request id. Validates kind + party enums up front.
@@ -435,6 +470,24 @@ func (s *RequestStore) Respond(ctx context.Context, id, action, responderType, r
}
}
// Deliver the outcome to the requester AGENT as a real inbound turn
// (core#2606 follow-up, CTO 2026-06-11). The REQUEST_RESPONDED event
// above only feeds the canvas/event stream; an agent waiting on an
// approval otherwise learns the decision only if something prompts it
// to call check_requests. Skip self-notification (agent responded to
// its own... impossible per the self-response guard, but cheap belt).
if req.RequesterType == "agent" && req.RequesterID != "" &&
(responderType != "agent" || responderID != req.RequesterID) {
by := "the user"
if responderType == "agent" {
by = "agent " + responderID
}
s.notifyRequesterAgent(ctx, req,
"request-responded:"+req.ID,
fmt.Sprintf("Your %s request %q (id %s) was %s by %s. Use get_request for the thread or check_requests for all your outcomes.",
req.Kind, req.Title, req.ID, status, by))
}
req.Status = status
req.ResponderType = &responderType
req.ResponderID = &responderID
@@ -513,6 +566,18 @@ func (s *RequestStore) AddMessage(ctx context.Context, id, authorType, authorID,
}
}
// More-Info from the recipient must reach a requester AGENT as a real
// turn (same rationale as the Respond notification — CTO 2026-06-11).
// Keyed per message so a multi-round clarification thread delivers each
// ask; the requester replies with add_request_message.
if authorType == req.RecipientType && authorID == req.RecipientID &&
req.RequesterType == "agent" && req.RequesterID != "" {
s.notifyRequesterAgent(ctx, req,
"request-message:"+messageID,
fmt.Sprintf("More info requested on your %s request %q (id %s): %s\nReply with add_request_message.",
req.Kind, req.Title, req.ID, body))
}
return messageID, nil
}
+36 -1
View File
@@ -322,6 +322,13 @@ func (h *RequestsHandler) Respond(c *gin.Context) {
// More-Info thread. When the author is the recipient, the request flips to
// info_requested.
//
// Workspace-token auth path (/workspaces/:id/requests/:requestId/messages):
// the caller must be a participant (requester or recipient), and the author
// identity is BOUND to the authenticated workspace — body author_type/author_id
// are ignored. This prevents a workspace-token holder from spoofing another
// party or flipping an unrelated request to info_requested (core#2542 /
// core#2606).
//
// @Summary Add a message to a request's More-Info thread
// @Tags requests
// @Accept json
@@ -330,6 +337,7 @@ func (h *RequestsHandler) Respond(c *gin.Context) {
// @Param body body AddRequestMessageBody true "Message"
// @Success 201 {object} RequestMutationResponse
// @Failure 400 {object} ErrorResponse
// @Failure 403 {object} ErrorResponse
// @Failure 404 {object} ErrorResponse
// @Failure 500 {object} ErrorResponse
// @Router /requests/{requestId}/messages [post]
@@ -344,7 +352,34 @@ func (h *RequestsHandler) AddMessage(c *gin.Context) {
return
}
messageID, err := h.store().AddMessage(ctx, requestID, body.AuthorType, body.AuthorID, body.Body)
authorType := body.AuthorType
authorID := body.AuthorID
// Workspace-token auth path: bind author to the authenticated workspace and
// verify the caller is a participant.
if workspaceID := c.Param("id"); workspaceID != "" {
authorType = "agent"
authorID = workspaceID
reqRow, err := h.store().Get(ctx, requestID)
if err != nil {
if errors.Is(err, ErrRequestNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "request not found"})
return
}
log.Printf("AddMessage authz error request=%s: %v", requestID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to add message"})
return
}
isParty := (reqRow.RequesterType == "agent" && reqRow.RequesterID == workspaceID) ||
(reqRow.RecipientType == "agent" && reqRow.RecipientID == workspaceID)
if !isParty {
c.JSON(http.StatusForbidden, gin.H{"error": "not a participant"})
return
}
}
messageID, err := h.store().AddMessage(ctx, requestID, authorType, authorID, body.Body)
if err != nil {
if errors.Is(err, ErrRequestNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "request not found"})
@@ -2,10 +2,13 @@ package handlers
import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
@@ -648,6 +651,108 @@ func TestRequests_AddMessage_RequesterDoesNotFlip(t *testing.T) {
}
}
// ---------- AddMessage workspace-token authz (core#2542 / core#2606) ----------
func TestRequests_AddMessage_AgentPath_Recipient_BindsToCaller(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
// URL workspace ws-2 is the recipient. The body tries to spoof ws-EVIL.
// The handler's authz Get AND the store's own Get both fetch the row.
mock.ExpectQuery("FROM requests WHERE id").
WithArgs("req-1").
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending"))
mock.ExpectQuery("FROM requests WHERE id").
WithArgs("req-1").
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending"))
// Handler must bind author to ws-2, not the spoofed body value.
mock.ExpectQuery("INSERT INTO request_messages").
WithArgs("req-1", "agent", "ws-2", "which file?").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("msg-1"))
mock.ExpectExec("UPDATE requests SET status = 'info_requested'").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{
{Key: "requestId", Value: "req-1"},
{Key: "id", Value: "ws-2"},
}
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"body":"which file?","author_type":"agent","author_id":"ws-EVIL"}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.AddMessage(c)
if w.Code != http.StatusCreated {
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
}
}
func TestRequests_AddMessage_AgentPath_Requester_BindsToCaller(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
// URL workspace ws-1 is the requester. Body author_id is ignored.
// Handler authz Get + store Get → two fetches.
mock.ExpectQuery("FROM requests WHERE id").
WithArgs("req-1").
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "info_requested"))
mock.ExpectQuery("FROM requests WHERE id").
WithArgs("req-1").
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "info_requested"))
mock.ExpectQuery("INSERT INTO request_messages").
WithArgs("req-1", "agent", "ws-1", "here is the file").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("msg-2"))
// No status flip — requester is not the recipient.
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{
{Key: "requestId", Value: "req-1"},
{Key: "id", Value: "ws-1"},
}
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"body":"here is the file","author_type":"agent","author_id":"ws-EVIL"}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.AddMessage(c)
if w.Code != http.StatusCreated {
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
}
}
func TestRequests_AddMessage_AgentPath_NonParticipant_403(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
// URL workspace ws-3 is neither requester nor recipient.
mock.ExpectQuery("FROM requests WHERE id").
WithArgs("req-1").
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{
{Key: "requestId", Value: "req-1"},
{Key: "id", Value: "ws-3"},
}
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"body":"pwned","author_type":"agent","author_id":"ws-2"}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.AddMessage(c)
if w.Code != http.StatusForbidden {
t.Fatalf("expected 403 for non-participant, got %d: %s", w.Code, w.Body.String())
}
}
// ---------- Cancel ----------
func TestRequests_Cancel_Success(t *testing.T) {
@@ -799,3 +904,148 @@ func TestRequests_ListPending_InvalidKind(t *testing.T) {
t.Errorf("expected 400 for invalid kind, got %d", w.Code)
}
}
// ---------- Requester notification on respond / more-info (CTO 2026-06-11) ----------
// interceptRequestNotify swaps the package-level enqueue for the test and
// returns a capture slice + restore func.
func interceptRequestNotify(t *testing.T) *[]map[string]string {
t.Helper()
captured := &[]map[string]string{}
prev := requestNotifyEnqueue
requestNotifyEnqueue = func(ctx context.Context, workspaceID, callerID string, priority int, body []byte, method, idemKey string, expiresAt *time.Time) (string, int, error) {
*captured = append(*captured, map[string]string{
"workspace_id": workspaceID,
"method": method,
"idem": idemKey,
"body": string(body),
})
return "q-1", 1, nil
}
t.Cleanup(func() { requestNotifyEnqueue = prev })
return captured
}
// TestRequests_Respond_NotifiesRequesterAgent: a terminal response on a
// request raised by an AGENT must enqueue a message/send turn to that agent
// — the user clicking Done/Approve must actually reach the requester.
func TestRequests_Respond_NotifiesRequesterAgent(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
captured := interceptRequestNotify(t)
mock.ExpectQuery("FROM requests WHERE id").
WithArgs("req-1").
WillReturnRows(oneRequestRow("req-1", "approval", "ws-agent-1", "user", "", "pending"))
mock.ExpectExec("UPDATE requests").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "requestId", Value: "req-1"}}
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"action":"approved","responder_id":"u-1"}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.Respond(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if len(*captured) != 1 {
t.Fatalf("expected 1 requester notification, got %d", len(*captured))
}
n := (*captured)[0]
if n["workspace_id"] != "ws-agent-1" || n["method"] != "message/send" {
t.Errorf("notification misrouted: %+v", n)
}
if n["idem"] != "request-responded:req-1" {
t.Errorf("idempotency key = %q", n["idem"])
}
if !strings.Contains(n["body"], "approved") || !strings.Contains(n["body"], "Some title") {
t.Errorf("notification body missing outcome/title: %s", n["body"])
}
}
// TestRequests_AddMessage_MoreInfo_NotifiesRequesterAgent: a recipient-authored
// More-Info message must reach the requester agent as a turn carrying the ask.
func TestRequests_AddMessage_MoreInfo_NotifiesRequesterAgent(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
captured := interceptRequestNotify(t)
mock.ExpectQuery("FROM requests WHERE id").
WithArgs("req-2").
WillReturnRows(oneRequestRow("req-2", "task", "ws-agent-1", "user", "u-1", "pending"))
mock.ExpectQuery("INSERT INTO request_messages").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("msg-9"))
mock.ExpectExec("UPDATE requests SET status = 'info_requested'").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "requestId", Value: "req-2"}}
c.Request = httptest.NewRequest("POST", "/", bytesNewBufferStringHelper(`{"author_type":"user","author_id":"u-1","body":"which environment do you mean?"}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.AddMessage(c)
if w.Code != http.StatusCreated {
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
}
if len(*captured) != 1 {
t.Fatalf("expected 1 requester notification, got %d", len(*captured))
}
n := (*captured)[0]
if n["workspace_id"] != "ws-agent-1" || n["idem"] != "request-message:msg-9" {
t.Errorf("notification misrouted: %+v", n)
}
if !strings.Contains(n["body"], "which environment do you mean?") {
t.Errorf("notification body missing ask: %s", n["body"])
}
}
// TestRequests_Respond_NoNotifyForUserRequester: a request raised by a USER
// must not enqueue an agent notification on respond.
func TestRequests_Respond_NoNotifyForUserRequester(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
captured := interceptRequestNotify(t)
mock.ExpectQuery("FROM requests WHERE id").
WithArgs("req-3").
WillReturnRows(sqlmock.NewRows(requestColumnNames).AddRow(
"req-3", "task", "user", "u-1", nil,
"agent", "ws-agent-2", "Some title", nil, "pending",
nil, nil, nil, "2026-06-10T00:00:00Z", "2026-06-10T00:00:00Z", nil,
))
mock.ExpectExec("UPDATE requests").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "requestId", Value: "req-3"}}
c.Request = httptest.NewRequest("POST", "/", bytesNewBufferStringHelper(`{"action":"done","responder_type":"agent","responder_id":"ws-agent-2"}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.Respond(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if len(*captured) != 0 {
t.Fatalf("expected no notification for user requester, got %d", len(*captured))
}
}
// bytesNewBufferStringHelper keeps the new tests free of an extra import
// alias; identical to bytes.NewBufferString.
func bytesNewBufferStringHelper(s string) *bytes.Buffer { return bytes.NewBufferString(s) }