diff --git a/workspace-server/internal/handlers/request_store.go b/workspace-server/internal/handlers/request_store.go index 3712470d..586326ec 100644 --- a/workspace-server/internal/handlers/request_store.go +++ b/workspace-server/internal/handlers/request_store.go @@ -21,11 +21,13 @@ package handlers import ( "context" "database/sql" + "encoding/json" "errors" "fmt" "log" "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events" + "github.com/google/uuid" ) // ErrRequestNotFound is returned by Get/Respond/RequestInfo/Cancel/AddMessage @@ -146,6 +148,39 @@ func broadcastTarget(requesterType, requesterID, recipientType, recipientID stri return "" } +// requestNotifyEnqueue is the a2a-queue enqueue used to deliver +// request-outcome notifications to a REQUESTER agent as a real inbound turn. +// Package-level var (default: the real EnqueueA2A) so tests can intercept — +// mirrors RequestNudgeSweeper's enqueueFunc injection. +var requestNotifyEnqueue enqueueFunc = EnqueueA2A + +// notifyRequesterAgent enqueues a message/send A2A turn to the requester +// agent. Used on terminal responses and recipient-authored More-Info +// messages (CTO 2026-06-11): a human clicking Done/Reject/Approve — or +// asking for clarification — must reach the agent that raised the request +// as an actual turn, not only as a structure event the agent never reads. +// Best-effort: an enqueue failure is logged, never surfaced — the durable +// truth is the requests row, and check_requests remains the pull path. +func (s *RequestStore) notifyRequesterAgent(ctx context.Context, req RequestRow, idemKey, text string) { + body, err := json.Marshal(map[string]interface{}{ + "method": "message/send", + "params": map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "messageId": idemKey + "-" + uuid.New().String(), + "parts": []map[string]interface{}{{"kind": "text", "text": text}}, + }, + }, + }) + if err != nil { + log.Printf("request: build requester notification for %s failed: %v", req.ID, err) + return + } + if _, _, err := requestNotifyEnqueue(ctx, req.RequesterID, "", PriorityInfo, body, "message/send", idemKey, nil); err != nil { + log.Printf("request: enqueue requester notification for %s -> %s failed: %v", req.ID, req.RequesterID, err) + } +} + // Create inserts a new pending request and broadcasts REQUEST_CREATED (anchored // on the recipient agent if any, so an agent recipient's inbox is signalled). // Returns the new request id. Validates kind + party enums up front. @@ -435,6 +470,24 @@ func (s *RequestStore) Respond(ctx context.Context, id, action, responderType, r } } + // Deliver the outcome to the requester AGENT as a real inbound turn + // (core#2606 follow-up, CTO 2026-06-11). The REQUEST_RESPONDED event + // above only feeds the canvas/event stream; an agent waiting on an + // approval otherwise learns the decision only if something prompts it + // to call check_requests. Skip self-notification (agent responded to + // its own... impossible per the self-response guard, but cheap belt). + if req.RequesterType == "agent" && req.RequesterID != "" && + (responderType != "agent" || responderID != req.RequesterID) { + by := "the user" + if responderType == "agent" { + by = "agent " + responderID + } + s.notifyRequesterAgent(ctx, req, + "request-responded:"+req.ID, + fmt.Sprintf("Your %s request %q (id %s) was %s by %s. Use get_request for the thread or check_requests for all your outcomes.", + req.Kind, req.Title, req.ID, status, by)) + } + req.Status = status req.ResponderType = &responderType req.ResponderID = &responderID @@ -513,6 +566,18 @@ func (s *RequestStore) AddMessage(ctx context.Context, id, authorType, authorID, } } + // More-Info from the recipient must reach a requester AGENT as a real + // turn (same rationale as the Respond notification — CTO 2026-06-11). + // Keyed per message so a multi-round clarification thread delivers each + // ask; the requester replies with add_request_message. + if authorType == req.RecipientType && authorID == req.RecipientID && + req.RequesterType == "agent" && req.RequesterID != "" { + s.notifyRequesterAgent(ctx, req, + "request-message:"+messageID, + fmt.Sprintf("More info requested on your %s request %q (id %s): %s\nReply with add_request_message.", + req.Kind, req.Title, req.ID, body)) + } + return messageID, nil } diff --git a/workspace-server/internal/handlers/requests.go b/workspace-server/internal/handlers/requests.go index 992932dc..7ecf74fb 100644 --- a/workspace-server/internal/handlers/requests.go +++ b/workspace-server/internal/handlers/requests.go @@ -322,6 +322,13 @@ func (h *RequestsHandler) Respond(c *gin.Context) { // More-Info thread. When the author is the recipient, the request flips to // info_requested. // +// Workspace-token auth path (/workspaces/:id/requests/:requestId/messages): +// the caller must be a participant (requester or recipient), and the author +// identity is BOUND to the authenticated workspace — body author_type/author_id +// are ignored. This prevents a workspace-token holder from spoofing another +// party or flipping an unrelated request to info_requested (core#2542 / +// core#2606). +// // @Summary Add a message to a request's More-Info thread // @Tags requests // @Accept json @@ -330,6 +337,7 @@ func (h *RequestsHandler) Respond(c *gin.Context) { // @Param body body AddRequestMessageBody true "Message" // @Success 201 {object} RequestMutationResponse // @Failure 400 {object} ErrorResponse +// @Failure 403 {object} ErrorResponse // @Failure 404 {object} ErrorResponse // @Failure 500 {object} ErrorResponse // @Router /requests/{requestId}/messages [post] @@ -344,7 +352,34 @@ func (h *RequestsHandler) AddMessage(c *gin.Context) { return } - messageID, err := h.store().AddMessage(ctx, requestID, body.AuthorType, body.AuthorID, body.Body) + authorType := body.AuthorType + authorID := body.AuthorID + + // Workspace-token auth path: bind author to the authenticated workspace and + // verify the caller is a participant. + if workspaceID := c.Param("id"); workspaceID != "" { + authorType = "agent" + authorID = workspaceID + + reqRow, err := h.store().Get(ctx, requestID) + if err != nil { + if errors.Is(err, ErrRequestNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "request not found"}) + return + } + log.Printf("AddMessage authz error request=%s: %v", requestID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to add message"}) + return + } + isParty := (reqRow.RequesterType == "agent" && reqRow.RequesterID == workspaceID) || + (reqRow.RecipientType == "agent" && reqRow.RecipientID == workspaceID) + if !isParty { + c.JSON(http.StatusForbidden, gin.H{"error": "not a participant"}) + return + } + } + + messageID, err := h.store().AddMessage(ctx, requestID, authorType, authorID, body.Body) if err != nil { if errors.Is(err, ErrRequestNotFound) { c.JSON(http.StatusNotFound, gin.H{"error": "request not found"}) diff --git a/workspace-server/internal/handlers/requests_test.go b/workspace-server/internal/handlers/requests_test.go index 830e2f1e..75156b73 100644 --- a/workspace-server/internal/handlers/requests_test.go +++ b/workspace-server/internal/handlers/requests_test.go @@ -2,10 +2,13 @@ package handlers import ( "bytes" + "context" "encoding/json" "net/http" "net/http/httptest" + "strings" "testing" + "time" "github.com/DATA-DOG/go-sqlmock" "github.com/gin-gonic/gin" @@ -648,6 +651,108 @@ func TestRequests_AddMessage_RequesterDoesNotFlip(t *testing.T) { } } +// ---------- AddMessage workspace-token authz (core#2542 / core#2606) ---------- + +func TestRequests_AddMessage_AgentPath_Recipient_BindsToCaller(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + // URL workspace ws-2 is the recipient. The body tries to spoof ws-EVIL. + // The handler's authz Get AND the store's own Get both fetch the row. + mock.ExpectQuery("FROM requests WHERE id"). + WithArgs("req-1"). + WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending")) + mock.ExpectQuery("FROM requests WHERE id"). + WithArgs("req-1"). + WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending")) + // Handler must bind author to ws-2, not the spoofed body value. + mock.ExpectQuery("INSERT INTO request_messages"). + WithArgs("req-1", "agent", "ws-2", "which file?"). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("msg-1")) + mock.ExpectExec("UPDATE requests SET status = 'info_requested'"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{ + {Key: "requestId", Value: "req-1"}, + {Key: "id", Value: "ws-2"}, + } + c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"body":"which file?","author_type":"agent","author_id":"ws-EVIL"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.AddMessage(c) + + if w.Code != http.StatusCreated { + t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestRequests_AddMessage_AgentPath_Requester_BindsToCaller(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + // URL workspace ws-1 is the requester. Body author_id is ignored. + // Handler authz Get + store Get → two fetches. + mock.ExpectQuery("FROM requests WHERE id"). + WithArgs("req-1"). + WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "info_requested")) + mock.ExpectQuery("FROM requests WHERE id"). + WithArgs("req-1"). + WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "info_requested")) + mock.ExpectQuery("INSERT INTO request_messages"). + WithArgs("req-1", "agent", "ws-1", "here is the file"). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("msg-2")) + // No status flip — requester is not the recipient. + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{ + {Key: "requestId", Value: "req-1"}, + {Key: "id", Value: "ws-1"}, + } + c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"body":"here is the file","author_type":"agent","author_id":"ws-EVIL"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.AddMessage(c) + + if w.Code != http.StatusCreated { + t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestRequests_AddMessage_AgentPath_NonParticipant_403(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + // URL workspace ws-3 is neither requester nor recipient. + mock.ExpectQuery("FROM requests WHERE id"). + WithArgs("req-1"). + WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending")) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{ + {Key: "requestId", Value: "req-1"}, + {Key: "id", Value: "ws-3"}, + } + c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"body":"pwned","author_type":"agent","author_id":"ws-2"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.AddMessage(c) + + if w.Code != http.StatusForbidden { + t.Fatalf("expected 403 for non-participant, got %d: %s", w.Code, w.Body.String()) + } +} + // ---------- Cancel ---------- func TestRequests_Cancel_Success(t *testing.T) { @@ -799,3 +904,148 @@ func TestRequests_ListPending_InvalidKind(t *testing.T) { t.Errorf("expected 400 for invalid kind, got %d", w.Code) } } + +// ---------- Requester notification on respond / more-info (CTO 2026-06-11) ---------- + +// interceptRequestNotify swaps the package-level enqueue for the test and +// returns a capture slice + restore func. +func interceptRequestNotify(t *testing.T) *[]map[string]string { + t.Helper() + captured := &[]map[string]string{} + prev := requestNotifyEnqueue + requestNotifyEnqueue = func(ctx context.Context, workspaceID, callerID string, priority int, body []byte, method, idemKey string, expiresAt *time.Time) (string, int, error) { + *captured = append(*captured, map[string]string{ + "workspace_id": workspaceID, + "method": method, + "idem": idemKey, + "body": string(body), + }) + return "q-1", 1, nil + } + t.Cleanup(func() { requestNotifyEnqueue = prev }) + return captured +} + +// TestRequests_Respond_NotifiesRequesterAgent: a terminal response on a +// request raised by an AGENT must enqueue a message/send turn to that agent +// — the user clicking Done/Approve must actually reach the requester. +func TestRequests_Respond_NotifiesRequesterAgent(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + captured := interceptRequestNotify(t) + + mock.ExpectQuery("FROM requests WHERE id"). + WithArgs("req-1"). + WillReturnRows(oneRequestRow("req-1", "approval", "ws-agent-1", "user", "", "pending")) + mock.ExpectExec("UPDATE requests"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "requestId", Value: "req-1"}} + c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"action":"approved","responder_id":"u-1"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Respond(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if len(*captured) != 1 { + t.Fatalf("expected 1 requester notification, got %d", len(*captured)) + } + n := (*captured)[0] + if n["workspace_id"] != "ws-agent-1" || n["method"] != "message/send" { + t.Errorf("notification misrouted: %+v", n) + } + if n["idem"] != "request-responded:req-1" { + t.Errorf("idempotency key = %q", n["idem"]) + } + if !strings.Contains(n["body"], "approved") || !strings.Contains(n["body"], "Some title") { + t.Errorf("notification body missing outcome/title: %s", n["body"]) + } +} + +// TestRequests_AddMessage_MoreInfo_NotifiesRequesterAgent: a recipient-authored +// More-Info message must reach the requester agent as a turn carrying the ask. +func TestRequests_AddMessage_MoreInfo_NotifiesRequesterAgent(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + captured := interceptRequestNotify(t) + + mock.ExpectQuery("FROM requests WHERE id"). + WithArgs("req-2"). + WillReturnRows(oneRequestRow("req-2", "task", "ws-agent-1", "user", "u-1", "pending")) + mock.ExpectQuery("INSERT INTO request_messages"). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("msg-9")) + mock.ExpectExec("UPDATE requests SET status = 'info_requested'"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "requestId", Value: "req-2"}} + c.Request = httptest.NewRequest("POST", "/", bytesNewBufferStringHelper(`{"author_type":"user","author_id":"u-1","body":"which environment do you mean?"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.AddMessage(c) + + if w.Code != http.StatusCreated { + t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String()) + } + if len(*captured) != 1 { + t.Fatalf("expected 1 requester notification, got %d", len(*captured)) + } + n := (*captured)[0] + if n["workspace_id"] != "ws-agent-1" || n["idem"] != "request-message:msg-9" { + t.Errorf("notification misrouted: %+v", n) + } + if !strings.Contains(n["body"], "which environment do you mean?") { + t.Errorf("notification body missing ask: %s", n["body"]) + } +} + +// TestRequests_Respond_NoNotifyForUserRequester: a request raised by a USER +// must not enqueue an agent notification on respond. +func TestRequests_Respond_NoNotifyForUserRequester(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + captured := interceptRequestNotify(t) + + mock.ExpectQuery("FROM requests WHERE id"). + WithArgs("req-3"). + WillReturnRows(sqlmock.NewRows(requestColumnNames).AddRow( + "req-3", "task", "user", "u-1", nil, + "agent", "ws-agent-2", "Some title", nil, "pending", + nil, nil, nil, "2026-06-10T00:00:00Z", "2026-06-10T00:00:00Z", nil, + )) + mock.ExpectExec("UPDATE requests"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "requestId", Value: "req-3"}} + c.Request = httptest.NewRequest("POST", "/", bytesNewBufferStringHelper(`{"action":"done","responder_type":"agent","responder_id":"ws-agent-2"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Respond(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if len(*captured) != 0 { + t.Fatalf("expected no notification for user requester, got %d", len(*captured)) + } +} + +// bytesNewBufferStringHelper keeps the new tests free of an extra import +// alias; identical to bytes.NewBufferString. +func bytesNewBufferStringHelper(s string) *bytes.Buffer { return bytes.NewBufferString(s) }