Merge pull request #2833 from Molecule-AI/feat/rfc2829-pr2-result-push-and-sync-cutover

feat(delegations): result-push to caller inbox behind feature flag (RFC #2829 PR-2)
This commit is contained in:
Hongming Wang 2026-05-05 04:30:44 +00:00 committed by GitHub
commit 1e12ed7e9f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 321 additions and 0 deletions

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"log"
"net/http"
"os"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
@ -13,6 +14,68 @@ import (
"github.com/google/uuid"
)
// delegationResultInboxPushEnabled gates the RFC #2829 PR-2 result-push
// behavior: when callee POSTs `status=completed` (or `failed`) via
// /workspaces/:id/delegations/:delegation_id/update, ALSO write an
// `activity_type='a2a_receive'` row to the caller's activity_logs.
//
// Why a flag: the caller's inbox poller (workspace/inbox.py) queries
// `?type=a2a_receive` to surface inbound messages to the agent. Adding
// a2a_receive rows for delegation results is the universal-sized fix for
// the 600s message/send timeout class — long-running delegations no
// longer rely on the proxy holding the HTTP connection open. But it is
// observable behavior change (existing agents start seeing delegation
// results in their inbox where they didn't before), so we flag it for
// staging burn-in before flipping default.
//
// Default: off. Staging-canary first; flip to on after RFC #2829 PR-3
// (agent-side cutover) lands and proves the round-trip end-to-end.
func delegationResultInboxPushEnabled() bool {
return os.Getenv("DELEGATION_RESULT_INBOX_PUSH") == "1"
}
// pushDelegationResultToInbox writes the inbox-visible row for a
// completed/failed delegation. Best-effort: a failure logs but does NOT
// fail the parent UpdateStatus — the existing delegate_result row in
// activity_logs is still authoritative for the dashboard.
//
// Caller (sourceID) is the workspace that initiated the delegation; the
// inbox row lands in their activity_logs so wait_for_message picks it up.
//
// Body shape mirrors a2a_receive rows produced by the proxy on a
// successful synchronous reply: response_body.text carries the agent's
// answer, request_body.delegation_id correlates back to the originating
// row.
func pushDelegationResultToInbox(ctx context.Context, sourceID, delegationID, status, responsePreview, errorDetail string) {
if !delegationResultInboxPushEnabled() {
return
}
respPayload := map[string]interface{}{
"text": responsePreview,
"delegation_id": delegationID,
}
respJSON, _ := json.Marshal(respPayload)
reqJSON, _ := json.Marshal(map[string]interface{}{
"delegation_id": delegationID,
})
logStatus := "ok"
if status == "failed" {
logStatus = "error"
}
summary := "Delegation result delivered"
if status == "failed" {
summary = "Delegation failed"
}
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (
workspace_id, activity_type, method, source_id,
summary, request_body, response_body, status, error_detail
) VALUES ($1, 'a2a_receive', 'delegate_result', $2, $3, $4::jsonb, $5::jsonb, $6, NULLIF($7, ''))
`, sourceID, sourceID, summary, string(reqJSON), string(respJSON), logStatus, errorDetail); err != nil {
log.Printf("Delegation %s: inbox-push insert failed: %v", delegationID, err)
}
}
// Delegation status lifecycle:
// pending → dispatched → received → in_progress → completed | failed
//
@ -289,6 +352,8 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s
h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_FAILED", sourceID, map[string]interface{}{
"delegation_id": delegationID, "target_id": targetID, "error": proxyErr.Error(),
})
// RFC #2829 PR-2 result-push (see UpdateStatus for rationale).
pushDelegationResultToInbox(ctx, sourceID, delegationID, "failed", "", proxyErr.Error())
return
}
@ -349,6 +414,8 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s
"target_id": targetID,
"response_preview": truncate(responseText, 200),
})
// RFC #2829 PR-2 result-push (see UpdateStatus for rationale).
pushDelegationResultToInbox(ctx, sourceID, delegationID, "completed", responseText, "")
}
// updateDelegationStatus updates the status of a delegation record in activity_logs.
@ -459,11 +526,19 @@ func (h *DelegationHandler) UpdateStatus(c *gin.Context) {
"delegation_id": delegationID,
"response_preview": truncate(body.ResponsePreview, 200),
})
// RFC #2829 PR-2 result-push: when the gate is on, also write an
// a2a_receive row so the caller's inbox poller surfaces this to
// the agent. Foundational for getting rid of the proxy-blocked
// sync path that hits the 600s message/send timeout — once the
// agent-side cutover lands, the caller polls its own inbox for
// the result instead of holding open an HTTP connection.
pushDelegationResultToInbox(ctx, sourceID, delegationID, "completed", body.ResponsePreview, "")
} else {
h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_FAILED", sourceID, map[string]interface{}{
"delegation_id": delegationID,
"error": body.Error,
})
pushDelegationResultToInbox(ctx, sourceID, delegationID, "failed", "", body.Error)
}
c.JSON(http.StatusOK, gin.H{"status": body.Status, "delegation_id": delegationID})

View File

@ -0,0 +1,246 @@
package handlers
import (
"bytes"
"context"
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// delegation_inbox_push_test.go — coverage for the RFC #2829 PR-2
// result-push behavior. The push is feature-flagged via
// DELEGATION_RESULT_INBOX_PUSH=1; default off keeps the existing
// strict-sqlmock test surface unchanged.
//
// What we pin:
// 1. Flag off (default) → no a2a_receive INSERT fires.
// 2. Flag on, status=completed → a2a_receive row written with the
// response_preview and no error_detail.
// 3. Flag on, status=failed → a2a_receive row written with status=error
// and the error_detail set.
// 4. INSERT failure on inbox-push does NOT bubble up — UpdateStatus
// still returns 200.
// ---------- pushDelegationResultToInbox in isolation ----------
func TestPushDelegationResultToInbox_FlagOff_NoSQL(t *testing.T) {
mock := setupTestDB(t)
t.Setenv("DELEGATION_RESULT_INBOX_PUSH", "")
pushDelegationResultToInbox(
context.Background(),
"caller", "deleg-1", "completed", "answer body", "",
)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("flag off must not fire SQL: %v", err)
}
}
func TestPushDelegationResultToInbox_FlagOn_CompletedInsertsA2AReceiveRow(t *testing.T) {
mock := setupTestDB(t)
t.Setenv("DELEGATION_RESULT_INBOX_PUSH", "1")
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(
"caller-ws",
"caller-ws", // source_id mirrors workspace_id
"Delegation result delivered",
sqlmock.AnyArg(), // request_body json
sqlmock.AnyArg(), // response_body json
"ok",
"", // error_detail empty for completed
).
WillReturnResult(sqlmock.NewResult(0, 1))
pushDelegationResultToInbox(
context.Background(),
"caller-ws", "deleg-1", "completed", "answer body", "",
)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestPushDelegationResultToInbox_FlagOn_FailedInsertsErrorRow(t *testing.T) {
mock := setupTestDB(t)
t.Setenv("DELEGATION_RESULT_INBOX_PUSH", "1")
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(
"caller-ws",
"caller-ws",
"Delegation failed",
sqlmock.AnyArg(),
sqlmock.AnyArg(),
"error",
"target unreachable",
).
WillReturnResult(sqlmock.NewResult(0, 1))
pushDelegationResultToInbox(
context.Background(),
"caller-ws", "deleg-2", "failed", "", "target unreachable",
)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// ---------- UpdateStatus end-to-end ----------
func TestUpdateStatus_FlagOn_PushesA2AReceiveOnCompleted(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
t.Setenv("DELEGATION_RESULT_INBOX_PUSH", "1")
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
// 1. updateDelegationStatus — UPDATE activity_logs SET status='completed'
mock.ExpectExec(`UPDATE activity_logs`).
WithArgs("completed", "", "ws-source", "deleg-9").
WillReturnResult(sqlmock.NewResult(0, 1))
// 2. existing delegate_result INSERT (caller-side dashboard view)
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(
"ws-source", "ws-source",
sqlmock.AnyArg(), // summary
sqlmock.AnyArg(), // response_body
).
WillReturnResult(sqlmock.NewResult(0, 1))
// 3. NEW: PR-2 a2a_receive row for inbox-poller
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(
"ws-source", "ws-source",
"Delegation result delivered",
sqlmock.AnyArg(),
sqlmock.AnyArg(),
"ok",
"",
).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{
{Key: "id", Value: "ws-source"},
{Key: "delegation_id", Value: "deleg-9"},
}
body := `{"status":"completed","response_preview":"all done"}`
c.Request = httptest.NewRequest("POST",
"/workspaces/ws-source/delegations/deleg-9/update",
bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
dh.UpdateStatus(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestUpdateStatus_FlagOn_PushesA2AReceiveOnFailed(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
t.Setenv("DELEGATION_RESULT_INBOX_PUSH", "1")
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
// 1. updateDelegationStatus — UPDATE activity_logs
mock.ExpectExec(`UPDATE activity_logs`).
WithArgs("failed", "boom", "ws-source", "deleg-10").
WillReturnResult(sqlmock.NewResult(0, 1))
// 2. NEW: PR-2 a2a_receive row for inbox-poller (failure path doesn't
// have the existing delegate_result INSERT — only the new push).
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(
"ws-source", "ws-source",
"Delegation failed",
sqlmock.AnyArg(),
sqlmock.AnyArg(),
"error",
"boom",
).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{
{Key: "id", Value: "ws-source"},
{Key: "delegation_id", Value: "deleg-10"},
}
body := `{"status":"failed","error":"boom"}`
c.Request = httptest.NewRequest("POST",
"/workspaces/ws-source/delegations/deleg-10/update",
bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
dh.UpdateStatus(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d", w.Code)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// TestUpdateStatus_FlagOff_NoNewSQL — sanity check that the existing
// behavior is preserved when the flag is off. Critical for safe rollout.
func TestUpdateStatus_FlagOff_NoNewSQL(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
// explicitly empty — flag off
t.Setenv("DELEGATION_RESULT_INBOX_PUSH", "")
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
// Only the two pre-existing queries — no third (a2a_receive) INSERT.
mock.ExpectExec(`UPDATE activity_logs`).
WithArgs("completed", "", "ws-source", "deleg-11").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(
"ws-source", "ws-source",
sqlmock.AnyArg(),
sqlmock.AnyArg(),
).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{
{Key: "id", Value: "ws-source"},
{Key: "delegation_id", Value: "deleg-11"},
}
c.Request = httptest.NewRequest("POST",
"/workspaces/ws-source/delegations/deleg-11/update",
bytes.NewBufferString(`{"status":"completed","response_preview":"ok"}`))
c.Request.Header.Set("Content-Type", "application/json")
dh.UpdateStatus(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d", w.Code)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("flag-off must not fire extra SQL: %v", err)
}
}