Compare commits

..

1 Commits

Author SHA1 Message Date
fullstack-engineer 0bf7eb92e5 test(handlers): add HTTP handler tests for GetA2AQueueStatus and PatchAbilities
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 42s
CI / Detect changes (pull_request) Successful in 1m25s
E2E API Smoke Test / detect-changes (pull_request) Successful in 1m6s
Harness Replays / detect-changes (pull_request) Successful in 40s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 1m14s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 36s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 1m15s
qa-review / approved (pull_request) Successful in 38s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 2m16s
security-review / approved (pull_request) Successful in 44s
gate-check-v3 / gate-check (pull_request) Successful in 47s
sop-checklist / all-items-acked (pull_request) Successful in 51s
sop-tier-check / tier-check (pull_request) Successful in 46s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 25s
CI / Python Lint & Test (pull_request) Successful in 23s
Harness Replays / Harness Replays (pull_request) Successful in 21s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 36s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Failing after 3m33s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 7m15s
CI / Canvas (Next.js) (pull_request) Successful in 20m11s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / Platform (Go) (pull_request) Failing after 21m51s
CI / all-required (pull_request) Successful in 6s
GetA2AQueueStatus (9 cases):
- queue_id empty → 400 (via gin.CreateTestContext to bypass router)
- no identity, no org token → 404 (existence-non-inference policy)
- org token set → skips caller workspace check (authorized)
- caller workspace matches caller_id → 200
- caller workspace matches workspace_id → 200
- queue not found (sql.ErrNoRows) → 404
- queue auth-fields DB error → 500
- wrong caller workspace → 404 (IDOR collapsed to 404)
- status fetch DB error → 500
- full happy path → 200 with JSON body

PatchAbilities (11 cases):
- invalid workspace ID → 400
- invalid request body → 400
- no ability fields → 400
- workspace not found (ErrNoRows) → 404
- workspace not found (exists=false) → 404
- update broadcast_enabled=true → 200
- update talk_to_user_enabled=false → 200
- update both abilities → 200
- broadcast_enabled DB error → 500
- talk_to_user_enabled DB error → 500

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 19:36:21 +00:00
22 changed files with 1189 additions and 892 deletions
@@ -0,0 +1,408 @@
package handlers
import (
"database/sql"
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/gin-gonic/gin"
)
// setupQueueStatusHandlerDB creates a sqlmock DB with QueryMatcherEqual for exact SQL string matching.
func setupQueueStatusHandlerDB(t *testing.T) sqlmock.Sqlmock {
t.Helper()
mockDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
return mock
}
// Exact SQL strings used by the production code.
const (
sqlQueueRowAuthFields = `SELECT caller_id, workspace_id FROM a2a_queue WHERE id = $1`
sqlQueueStatusByID = `
SELECT
q.id,
q.workspace_id,
q.status,
q.priority,
q.attempts,
q.last_error,
q.enqueued_at::text,
q.dispatched_at::text,
q.completed_at::text,
q.expires_at::text,
al.response_body::text
FROM a2a_queue q
LEFT JOIN activity_logs al
ON al.method = 'delegate_result'
AND al.target_id = q.workspace_id
AND al.workspace_id = q.caller_id
AND al.response_body->>'delegation_id' = (q.body->'params'->'message'->'metadata'->>'delegation_id')
WHERE q.id = $1`
)
// ── GetA2AQueueStatus HTTP handler tests ──────────────────────────────────────
// TestGetA2AQueueStatus_QueueIDEmpty_Returns400 exercises the handler directly
// (not via router) so we can verify the empty-value branch without relying on
// Gin route-matching behaviour.
func TestGetA2AQueueStatus_QueueIDEmpty_Returns400(t *testing.T) {
gin.SetMode(gin.TestMode)
setupQueueStatusHandlerDB(t)
h := &WorkspaceHandler{}
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"}}
// queue_id param is empty string
c.Params = gin.Params{
{Key: "id", Value: "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"},
{Key: "queue_id", Value: ""},
}
c.Request = httptest.NewRequest(http.MethodGet, "/", nil)
h.GetA2AQueueStatus(c)
if w.Code != http.StatusBadRequest {
t.Errorf("got %d, want 400", w.Code)
}
}
func TestGetA2AQueueStatus_NoIdentity_NoOrgToken_Returns404(t *testing.T) {
gin.SetMode(gin.TestMode)
setupQueueStatusHandlerDB(t)
h := &WorkspaceHandler{}
r := gin.New()
r.GET("/workspaces/:id/a2a/queue/:queue_id", h.GetA2AQueueStatus)
req := httptest.NewRequest(http.MethodGet,
"/workspaces/wsid/a2a/queue/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", nil)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
// No identity derivable → 404 (not 401) per existence-non-inference policy.
if w.Code != http.StatusNotFound {
t.Errorf("got %d, want 404", w.Code)
}
}
func TestGetA2AQueueStatus_OrgToken_SkipsCallerCheck(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupQueueStatusHandlerDB(t)
h := &WorkspaceHandler{}
queueID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
wsID := "cccccccc-cccc-cccc-cccc-cccccccccccc"
authRows := sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
AddRow("other-ws", wsID)
mock.ExpectQuery(sqlQueueRowAuthFields).
WithArgs(queueID).
WillReturnRows(authRows)
statusRows := sqlmock.NewRows([]string{
"id", "workspace_id", "status", "priority", "attempts",
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
"response_body",
}).AddRow(
queueID, wsID, "queued", 50, 0,
nil, "2026-01-01T00:00:00Z", nil, nil, nil, nil,
)
mock.ExpectQuery(sqlQueueStatusByID).
WithArgs(queueID).
WillReturnRows(statusRows)
r := gin.New()
// Simulate org-token middleware setting org_token_id.
r.GET("/workspaces/:id/a2a/queue/:queue_id", func(c *gin.Context) {
c.Set("org_token_id", "org-admin")
h.GetA2AQueueStatus(c)
})
req := httptest.NewRequest(http.MethodGet, "/workspaces/wsid/a2a/queue/"+queueID, nil)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Errorf("got %d, want 200: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestGetA2AQueueStatus_CallerWorkspaceMatchesCallerID_Success(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupQueueStatusHandlerDB(t)
h := &WorkspaceHandler{}
queueID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
callerID := "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
wsID := "cccccccc-cccc-cccc-cccc-cccccccccccc"
authRows := sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
AddRow(callerID, wsID)
mock.ExpectQuery(sqlQueueRowAuthFields).
WithArgs(queueID).
WillReturnRows(authRows)
statusRows := sqlmock.NewRows([]string{
"id", "workspace_id", "status", "priority", "attempts",
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
"response_body",
}).AddRow(
queueID, wsID, "completed", 50, 1,
nil, "2026-01-01T00:00:00Z", "2026-01-01T00:01:00Z", "2026-01-01T00:02:00Z",
nil, []byte(`{"text":"result"}`),
)
mock.ExpectQuery(sqlQueueStatusByID).
WithArgs(queueID).
WillReturnRows(statusRows)
r := gin.New()
r.GET("/workspaces/:id/a2a/queue/:queue_id", h.GetA2AQueueStatus)
req := httptest.NewRequest(http.MethodGet,
"/workspaces/"+wsID+"/a2a/queue/"+queueID, nil)
req.Header.Set("X-Workspace-ID", callerID)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Errorf("got %d, want 200: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestGetA2AQueueStatus_CallerWorkspaceMatchesWorkspaceID_Success(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupQueueStatusHandlerDB(t)
h := &WorkspaceHandler{}
queueID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
callerID := "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
wsID := "cccccccc-cccc-cccc-cccc-cccccccccccc"
authRows := sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
AddRow(callerID, wsID)
mock.ExpectQuery(sqlQueueRowAuthFields).
WithArgs(queueID).
WillReturnRows(authRows)
statusRows := sqlmock.NewRows([]string{
"id", "workspace_id", "status", "priority", "attempts",
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
"response_body",
}).AddRow(
queueID, wsID, "queued", 50, 0,
nil, "2026-01-01T00:00:00Z", nil, nil, nil, nil,
)
mock.ExpectQuery(sqlQueueStatusByID).
WithArgs(queueID).
WillReturnRows(statusRows)
r := gin.New()
r.GET("/workspaces/:id/a2a/queue/:queue_id", h.GetA2AQueueStatus)
req := httptest.NewRequest(http.MethodGet,
"/workspaces/"+wsID+"/a2a/queue/"+queueID, nil)
req.Header.Set("X-Workspace-ID", wsID)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Errorf("got %d, want 200: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestGetA2AQueueStatus_QueueNotFound_Returns404(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupQueueStatusHandlerDB(t)
h := &WorkspaceHandler{}
queueID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
callerID := "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
wsID := "cccccccc-cccc-cccc-cccc-cccccccccccc"
mock.ExpectQuery(sqlQueueRowAuthFields).
WithArgs(queueID).
WillReturnError(sql.ErrNoRows)
r := gin.New()
r.GET("/workspaces/:id/a2a/queue/:queue_id", h.GetA2AQueueStatus)
req := httptest.NewRequest(http.MethodGet,
"/workspaces/"+wsID+"/a2a/queue/"+queueID, nil)
req.Header.Set("X-Workspace-ID", callerID)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusNotFound {
t.Errorf("got %d, want 404: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestGetA2AQueueStatus_QueueAuthFieldsDBError_Returns500(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupQueueStatusHandlerDB(t)
h := &WorkspaceHandler{}
queueID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
callerID := "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
wsID := "cccccccc-cccc-cccc-cccc-cccccccccccc"
mock.ExpectQuery(sqlQueueRowAuthFields).
WithArgs(queueID).
WillReturnError(sql.ErrConnDone)
r := gin.New()
r.GET("/workspaces/:id/a2a/queue/:queue_id", h.GetA2AQueueStatus)
req := httptest.NewRequest(http.MethodGet,
"/workspaces/"+wsID+"/a2a/queue/"+queueID, nil)
req.Header.Set("X-Workspace-ID", callerID)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusInternalServerError {
t.Errorf("got %d, want 500: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestGetA2AQueueStatus_WrongCallerWorkspace_Returns404(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupQueueStatusHandlerDB(t)
h := &WorkspaceHandler{}
queueID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
callerID := "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
wsID := "cccccccc-cccc-cccc-cccc-cccccccccccc"
wrongCaller := "dddddddd-dddd-dddd-dddd-dddddddddddd"
authRows := sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
AddRow(callerID, wsID)
mock.ExpectQuery(sqlQueueRowAuthFields).
WithArgs(queueID).
WillReturnRows(authRows)
r := gin.New()
r.GET("/workspaces/:id/a2a/queue/:queue_id", h.GetA2AQueueStatus)
req := httptest.NewRequest(http.MethodGet,
"/workspaces/"+wsID+"/a2a/queue/"+queueID, nil)
req.Header.Set("X-Workspace-ID", wrongCaller)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusNotFound {
t.Errorf("got %d, want 404: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestGetA2AQueueStatus_StatusFetchDBError_Returns500(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupQueueStatusHandlerDB(t)
h := &WorkspaceHandler{}
queueID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
callerID := "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
wsID := "cccccccc-cccc-cccc-cccc-cccccccccccc"
authRows := sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
AddRow(callerID, wsID)
mock.ExpectQuery(sqlQueueRowAuthFields).
WithArgs(queueID).
WillReturnRows(authRows)
mock.ExpectQuery(sqlQueueStatusByID).
WithArgs(queueID).
WillReturnError(sql.ErrConnDone)
r := gin.New()
r.GET("/workspaces/:id/a2a/queue/:queue_id", h.GetA2AQueueStatus)
req := httptest.NewRequest(http.MethodGet,
"/workspaces/"+wsID+"/a2a/queue/"+queueID, nil)
req.Header.Set("X-Workspace-ID", callerID)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusInternalServerError {
t.Errorf("got %d, want 500: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestGetA2AQueueStatus_FullHappyPath_ReturnsJSON(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupQueueStatusHandlerDB(t)
h := &WorkspaceHandler{}
queueID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
callerID := "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
wsID := "cccccccc-cccc-cccc-cccc-cccccccccccc"
authRows := sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
AddRow(callerID, wsID)
mock.ExpectQuery(sqlQueueRowAuthFields).
WithArgs(queueID).
WillReturnRows(authRows)
respBody := []byte(`{"text":"delegation result"}`)
statusRows := sqlmock.NewRows([]string{
"id", "workspace_id", "status", "priority", "attempts",
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
"response_body",
}).AddRow(
queueID, wsID, "completed", 50, 1,
nil, "2026-01-01T00:00:00Z", "2026-01-01T00:01:00Z", "2026-01-01T00:02:00Z",
nil, respBody,
)
mock.ExpectQuery(sqlQueueStatusByID).
WithArgs(queueID).
WillReturnRows(statusRows)
r := gin.New()
r.GET("/workspaces/:id/a2a/queue/:queue_id", h.GetA2AQueueStatus)
req := httptest.NewRequest(http.MethodGet,
"/workspaces/"+wsID+"/a2a/queue/"+queueID, nil)
req.Header.Set("X-Workspace-ID", wsID)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Errorf("got %d, want 200: %s", w.Code, w.Body.String())
}
if w.Body.Len() == 0 {
t.Error("response body is empty")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
@@ -14,18 +14,16 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/push"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
type ActivityHandler struct {
broadcaster *events.Broadcaster
notifier *push.Notifier
}
func NewActivityHandler(b *events.Broadcaster, notifier *push.Notifier) *ActivityHandler {
return &ActivityHandler{broadcaster: b, notifier: notifier}
func NewActivityHandler(b *events.Broadcaster) *ActivityHandler {
return &ActivityHandler{broadcaster: b}
}
// List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs=&since_id=
@@ -478,7 +476,7 @@ func (h *ActivityHandler) Notify(c *gin.Context) {
for _, a := range body.Attachments {
attachments = append(attachments, AgentMessageAttachment(a))
}
writer := NewAgentMessageWriter(db.DB, h.broadcaster, h.notifier)
writer := NewAgentMessageWriter(db.DB, h.broadcaster)
if err := writer.Send(c.Request.Context(), workspaceID, body.Message, attachments); err != nil {
if errors.Is(err, ErrWorkspaceNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
@@ -40,7 +40,7 @@ func TestActivityHandler_SinceID_ReturnsNewerASC(t *testing.T) {
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -69,7 +69,7 @@ func TestActivityHandler_SinceID_CursorNotFound_410(t *testing.T) {
WillReturnError(sql.ErrNoRows)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -101,7 +101,7 @@ func TestActivityHandler_SinceID_CrossWorkspaceCursor_410(t *testing.T) {
WillReturnError(sql.ErrNoRows)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -137,7 +137,7 @@ func TestActivityHandler_SinceID_CombinedWithSinceSecs(t *testing.T) {
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -41,7 +41,7 @@ func TestActivityHandler_SinceSecs_Accepted(t *testing.T) {
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -70,7 +70,7 @@ func TestActivityHandler_SinceSecs_ClampedAt30Days(t *testing.T) {
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -106,7 +106,7 @@ func TestActivityHandler_SinceSecs_InvalidRejected(t *testing.T) {
// No DB call expected; bad input must be caught before the query.
setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -142,7 +142,7 @@ func TestActivityHandler_SinceSecs_Omitted(t *testing.T) {
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -22,7 +22,7 @@ func TestSessionSearchReturnsActivityAndMemory(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
rows := sqlmock.NewRows([]string{
"kind", "id", "workspace_id", "label", "content", "method", "status", "request_body", "response_body", "created_at",
@@ -68,7 +68,7 @@ func TestSessionSearchReturnsActivityAndMemory(t *testing.T) {
func TestActivityList_SourceCanvas(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
// Expect query with "source_id IS NULL"
mock.ExpectQuery(`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND source_id IS NULL`).
@@ -97,7 +97,7 @@ func TestActivityList_SourceCanvas(t *testing.T) {
func TestActivityList_SourceAgent(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
// Expect query with "source_id IS NOT NULL"
mock.ExpectQuery(`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND source_id IS NOT NULL`).
@@ -126,7 +126,7 @@ func TestActivityList_SourceAgent(t *testing.T) {
func TestActivityList_SourceInvalid(t *testing.T) {
gin.SetMode(gin.TestMode)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -142,7 +142,7 @@ func TestActivityList_SourceInvalid(t *testing.T) {
func TestActivityList_SourceWithType(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
// Both type and source filters
mock.ExpectQuery(`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND activity_type = .+ AND source_id IS NULL`).
@@ -181,7 +181,7 @@ const testPeerUUID = "11111111-2222-3333-4444-555555555555"
func TestActivityList_PeerIDFilter(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
// peer_id binds twice in the query (source_id OR target_id) but is
// added to args once — sqlmock matches positional args, so the
@@ -220,7 +220,7 @@ func TestActivityList_PeerIDComposesWithType(t *testing.T) {
// of the builder can't silently rearrange placeholders.
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
mock.ExpectQuery(
`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND activity_type = .+ AND source_id IS NOT NULL AND \(source_id = .+ OR target_id = .+\)`,
@@ -258,7 +258,7 @@ func TestActivityList_PeerIDRejectsNonUUID(t *testing.T) {
// otherwise interpolate the value into the URL or another query.
gin.SetMode(gin.TestMode)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
for _, bad := range []string{
"not-a-uuid",
@@ -292,7 +292,7 @@ func TestActivityList_PeerIDRejectsNonUUID(t *testing.T) {
func TestActivityList_BeforeTSFilter(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
cutoff, _ := time.Parse(time.RFC3339, "2026-05-01T00:00:00Z")
mock.ExpectQuery(
@@ -328,7 +328,7 @@ func TestActivityList_BeforeTSComposesWithPeerID(t *testing.T) {
// can't silently drop one filter or reorder placeholders.
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
cutoff, _ := time.Parse(time.RFC3339, "2026-05-01T00:00:00Z")
mock.ExpectQuery(
@@ -363,7 +363,7 @@ func TestActivityList_BeforeTSComposesWithPeerID(t *testing.T) {
func TestActivityList_BeforeTSRejectsInvalidFormat(t *testing.T) {
gin.SetMode(gin.TestMode)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
for _, bad := range []string{
"yesterday",
@@ -400,7 +400,7 @@ func TestActivityReport_AcceptsMemoryWriteType(t *testing.T) {
WillReturnResult(sqlmock.NewResult(1, 1))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@@ -426,7 +426,7 @@ func TestActivityReport_RejectsUnknownType(t *testing.T) {
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@@ -478,7 +478,7 @@ func TestNotify_PersistsToActivityLogsForReloadRecovery(t *testing.T) {
WillReturnResult(sqlmock.NewResult(1, 1))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@@ -527,7 +527,7 @@ func TestNotify_WithAttachments_PersistsFilePartsForReload(t *testing.T) {
WillReturnResult(sqlmock.NewResult(1, 1))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@@ -593,7 +593,7 @@ func TestNotify_RejectsAttachmentWithEmptyURIOrName(t *testing.T) {
// only if the handler unexpectedly queries.
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -647,7 +647,7 @@ func TestNotify_DBFailure_StillBroadcastsAnd200(t *testing.T) {
WillReturnError(fmt.Errorf("simulated db hiccup"))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@@ -44,7 +44,6 @@ import (
"log"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/push"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/textutil"
)
@@ -82,14 +81,12 @@ type AgentMessageAttachment struct {
type AgentMessageWriter struct {
db *sql.DB
broadcaster events.EventEmitter
notifier *push.Notifier
}
// NewAgentMessageWriter binds the writer to the platform's DB pool +
// WebSocket broadcaster. notifier may be nil if push notifications are
// not configured.
func NewAgentMessageWriter(db *sql.DB, broadcaster events.EventEmitter, notifier *push.Notifier) *AgentMessageWriter {
return &AgentMessageWriter{db: db, broadcaster: broadcaster, notifier: notifier}
// WebSocket broadcaster.
func NewAgentMessageWriter(db *sql.DB, broadcaster events.EventEmitter) *AgentMessageWriter {
return &AgentMessageWriter{db: db, broadcaster: broadcaster}
}
// Send delivers a single agent → user message. Look up + broadcast +
@@ -144,12 +141,7 @@ func (w *AgentMessageWriter) Send(
}
w.broadcaster.BroadcastOnly(workspaceID, string(events.EventAgentMessage), broadcastPayload)
// 3. Send push notifications to mobile devices.
if w.notifier != nil {
w.notifier.NotifyAgentMessage(ctx, workspaceID, wsName, message)
}
// 4. Persist for chat-history hydration. response_body shape MUST stay
// 3. Persist for chat-history hydration. response_body shape MUST stay
// in sync with extractResponseText + extractFilesFromTask in
// canvas/src/components/tabs/chat/historyHydration.ts:
// - extractResponseText reads body.result (string) → renders text
@@ -86,7 +86,7 @@ func (c *capturingEmitter) RecordAndBroadcast(_ context.Context, eventType strin
// path: workspace lookup, broadcast, INSERT, return nil.
func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-1").
@@ -114,7 +114,7 @@ func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) {
// Drift here = chips disappear on chat reload.
func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-att").
@@ -171,7 +171,7 @@ func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) {
func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) {
mock := setupTestDB(t)
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter, nil)
w := NewAgentMessageWriter(db.DB, emitter)
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-missing").
@@ -200,7 +200,7 @@ func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) {
// broadcast.
func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-dbfail").
@@ -221,7 +221,7 @@ func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) {
// table doesn't carry multi-KB summaries that bloat list queries.
func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-trunc").
@@ -261,7 +261,7 @@ func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) {
func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) {
mock := setupTestDB(t)
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter, nil)
w := NewAgentMessageWriter(db.DB, emitter)
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-bc").
@@ -312,7 +312,7 @@ func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) {
// real incidents in alerting.
func TestAgentMessageWriter_Send_DBErrorOnLookupReturnsWrapped(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
transientErr := errors.New("connection refused")
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
@@ -344,7 +344,7 @@ func TestAgentMessageWriter_Send_DBErrorOnLookupReturnsWrapped(t *testing.T) {
// coverage. Now it does.
func TestAgentMessageWriter_Send_NonASCIIMessagePersists(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
// 200-rune CJK message — exceeds the 80-rune cap, would have hit
// the byte-slice bug.
@@ -393,7 +393,7 @@ func TestAgentMessageWriter_Send_NonASCIIMessagePersists(t *testing.T) {
func TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty(t *testing.T) {
mock := setupTestDB(t)
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter, nil)
w := NewAgentMessageWriter(db.DB, emitter)
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-noatt").
@@ -631,7 +631,7 @@ func TestActivityHandler_List(t *testing.T) {
WillReturnRows(rows)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -680,7 +680,7 @@ func TestActivityHandler_ListByType(t *testing.T) {
WillReturnRows(rows)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -708,7 +708,7 @@ func TestActivityHandler_Report(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
// Expect the INSERT into activity_logs
mock.ExpectExec("INSERT INTO activity_logs").
@@ -737,7 +737,7 @@ func TestActivityHandler_Report_InvalidType(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -965,7 +965,7 @@ func TestActivityHandler_ListEmpty(t *testing.T) {
WillReturnRows(sqlmock.NewRows(columns))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -999,7 +999,7 @@ func TestActivityHandler_ListCustomLimit(t *testing.T) {
WillReturnRows(sqlmock.NewRows(columns))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1032,7 +1032,7 @@ func TestActivityHandler_ListMaxLimit(t *testing.T) {
WillReturnRows(sqlmock.NewRows(columns))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1060,7 +1060,7 @@ func TestActivityHandler_ReportAllValidTypes(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -1091,7 +1091,7 @@ func TestActivityHandler_ReportAllValidTypes(t *testing.T) {
func TestActivityHandler_ReportMissingBody(t *testing.T) {
setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1165,7 +1165,7 @@ func TestActivityHandler_Report_SourceIDSpoofRejected(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1188,7 +1188,7 @@ func TestActivityHandler_Report_MatchingSourceIDAccepted(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -1218,7 +1218,7 @@ func TestActivityHandler_Report_SourceIDLogInjection(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
+2 -5
View File
@@ -34,7 +34,6 @@ import (
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/push"
"github.com/gin-gonic/gin"
)
@@ -85,7 +84,6 @@ type mcpTool struct {
type MCPHandler struct {
database *sql.DB
broadcaster *events.Broadcaster
notifier *push.Notifier
// memv2 is the v2 memory plugin wiring (RFC #2728). nil-safe:
// every v2 tool calls memoryV2Available() first and returns a
@@ -96,9 +94,8 @@ type MCPHandler struct {
// NewMCPHandler wires the handler to db and broadcaster.
// Pass db.DB and the platform broadcaster at router-setup time.
// notifier may be nil if push notifications are not configured.
func NewMCPHandler(database *sql.DB, broadcaster *events.Broadcaster, notifier *push.Notifier) *MCPHandler {
return &MCPHandler{database: database, broadcaster: broadcaster, notifier: notifier}
func NewMCPHandler(database *sql.DB, broadcaster *events.Broadcaster) *MCPHandler {
return &MCPHandler{database: database, broadcaster: broadcaster}
}
// ─────────────────────────────────────────────────────────────────────────────
@@ -26,7 +26,7 @@ import (
func newMCPHandler(t *testing.T) (*MCPHandler, sqlmock.Sqlmock) {
t.Helper()
mock := setupTestDB(t)
h := NewMCPHandler(db.DB, newTestBroadcaster(), nil)
h := NewMCPHandler(db.DB, newTestBroadcaster())
return h, mock
}
@@ -392,7 +392,7 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri
// (the tool args don't accept them); pass nil. If a future tool
// schema adds an attachments arg, build []AgentMessageAttachment
// and pass through.
writer := NewAgentMessageWriter(h.database, h.broadcaster, h.notifier)
writer := NewAgentMessageWriter(h.database, h.broadcaster)
if err := writer.Send(ctx, workspaceID, message, nil); err != nil {
if errors.Is(err, ErrWorkspaceNotFound) {
return "", fmt.Errorf("workspace not found")
@@ -0,0 +1,317 @@
package handlers
import (
"database/sql"
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/gin-gonic/gin"
)
// setupAbilitiesDB creates a sqlmock DB with QueryMatcherEqual for exact SQL matching.
func setupAbilitiesDB(t *testing.T) sqlmock.Sqlmock {
t.Helper()
mockDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
return mock
}
// Exact SQL strings used by the production handler.
const (
sqlPatchAbilitiesExists = `SELECT EXISTS(SELECT 1 FROM workspaces WHERE id = $1 AND status != 'removed')`
sqlPatchBroadcastEnabled = `UPDATE workspaces SET broadcast_enabled = $2, updated_at = now() WHERE id = $1`
sqlPatchTalkToUserEnabled = `UPDATE workspaces SET talk_to_user_enabled = $2, updated_at = now() WHERE id = $1`
)
// ── PatchAbilities HTTP handler tests ──────────────────────────────────────────
func TestPatchAbilities_InvalidWorkspaceID_Returns400(t *testing.T) {
gin.SetMode(gin.TestMode)
setupAbilitiesDB(t)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "not-a-uuid"}}
c.Request = httptest.NewRequest(http.MethodPatch, "/workspaces/not-a-uuid/abilities", nil)
PatchAbilities(c)
if w.Code != http.StatusBadRequest {
t.Errorf("got %d, want 400", w.Code)
}
}
func TestPatchAbilities_InvalidBody_Returns400(t *testing.T) {
gin.SetMode(gin.TestMode)
setupAbilitiesDB(t)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"}}
c.Request = httptest.NewRequest(http.MethodPatch,
"/workspaces/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa/abilities",
newFakeCloser([]byte("not json")))
c.Request.Header.Set("Content-Type", "application/json")
PatchAbilities(c)
if w.Code != http.StatusBadRequest {
t.Errorf("got %d, want 400", w.Code)
}
}
func TestPatchAbilities_NoAbilityFields_Returns400(t *testing.T) {
gin.SetMode(gin.TestMode)
setupAbilitiesDB(t)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"}}
c.Request = httptest.NewRequest(http.MethodPatch,
"/workspaces/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa/abilities",
newFakeCloser([]byte(`{}`)))
c.Request.Header.Set("Content-Type", "application/json")
PatchAbilities(c)
if w.Code != http.StatusBadRequest {
t.Errorf("got %d, want 400", w.Code)
}
}
func TestPatchAbilities_WorkspaceNotFound_Returns404(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupAbilitiesDB(t)
wsID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(sqlPatchAbilitiesExists).
WithArgs(wsID).
WillReturnError(sql.ErrNoRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
c.Request = httptest.NewRequest(http.MethodPatch, "/workspaces/"+wsID+"/abilities",
newFakeCloser([]byte(`{"broadcast_enabled":true}`)))
c.Request.Header.Set("Content-Type", "application/json")
PatchAbilities(c)
if w.Code != http.StatusNotFound {
t.Errorf("got %d, want 404: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestPatchAbilities_WorkspaceNotFound_ExistsFalse_Returns404(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupAbilitiesDB(t)
wsID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(sqlPatchAbilitiesExists).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(false))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
c.Request = httptest.NewRequest(http.MethodPatch, "/workspaces/"+wsID+"/abilities",
newFakeCloser([]byte(`{"talk_to_user_enabled":false}`)))
c.Request.Header.Set("Content-Type", "application/json")
PatchAbilities(c)
if w.Code != http.StatusNotFound {
t.Errorf("got %d, want 404: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestPatchAbilities_UpdateBroadcastEnabled_Success(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupAbilitiesDB(t)
wsID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(sqlPatchAbilitiesExists).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
mock.ExpectExec(sqlPatchBroadcastEnabled).
WithArgs(wsID, true).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
c.Request = httptest.NewRequest(http.MethodPatch, "/workspaces/"+wsID+"/abilities",
newFakeCloser([]byte(`{"broadcast_enabled":true}`)))
c.Request.Header.Set("Content-Type", "application/json")
PatchAbilities(c)
if w.Code != http.StatusOK {
t.Errorf("got %d, want 200: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestPatchAbilities_UpdateTalkToUserEnabled_Success(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupAbilitiesDB(t)
wsID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(sqlPatchAbilitiesExists).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
mock.ExpectExec(sqlPatchTalkToUserEnabled).
WithArgs(wsID, false).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
c.Request = httptest.NewRequest(http.MethodPatch, "/workspaces/"+wsID+"/abilities",
newFakeCloser([]byte(`{"talk_to_user_enabled":false}`)))
c.Request.Header.Set("Content-Type", "application/json")
PatchAbilities(c)
if w.Code != http.StatusOK {
t.Errorf("got %d, want 200: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestPatchAbilities_UpdateBothAbilities_Success(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupAbilitiesDB(t)
wsID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(sqlPatchAbilitiesExists).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
mock.ExpectExec(sqlPatchBroadcastEnabled).
WithArgs(wsID, true).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(sqlPatchTalkToUserEnabled).
WithArgs(wsID, false).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
c.Request = httptest.NewRequest(http.MethodPatch, "/workspaces/"+wsID+"/abilities",
newFakeCloser([]byte(`{"broadcast_enabled":true,"talk_to_user_enabled":false}`)))
c.Request.Header.Set("Content-Type", "application/json")
PatchAbilities(c)
if w.Code != http.StatusOK {
t.Errorf("got %d, want 200: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestPatchAbilities_BroadcastEnabledDBError_Returns500(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupAbilitiesDB(t)
wsID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(sqlPatchAbilitiesExists).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
mock.ExpectExec(sqlPatchBroadcastEnabled).
WithArgs(wsID, true).
WillReturnError(sql.ErrConnDone)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
c.Request = httptest.NewRequest(http.MethodPatch, "/workspaces/"+wsID+"/abilities",
newFakeCloser([]byte(`{"broadcast_enabled":true}`)))
c.Request.Header.Set("Content-Type", "application/json")
PatchAbilities(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("got %d, want 500: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestPatchAbilities_TalkToUserEnabledDBError_Returns500(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupAbilitiesDB(t)
wsID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(sqlPatchAbilitiesExists).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
mock.ExpectExec(sqlPatchTalkToUserEnabled).
WithArgs(wsID, true).
WillReturnError(sql.ErrConnDone)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
c.Request = httptest.NewRequest(http.MethodPatch, "/workspaces/"+wsID+"/abilities",
newFakeCloser([]byte(`{"talk_to_user_enabled":true}`)))
c.Request.Header.Set("Content-Type", "application/json")
PatchAbilities(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("got %d, want 500: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ── Helpers ────────────────────────────────────────────────────────────────────
// newFakeCloser wraps a byte slice as an io.ReadCloser for request body injection.
func newFakeCloser(data []byte) *fakeReadCloser {
return &fakeReadCloser{data: data}
}
type fakeReadCloser struct {
data []byte
pos int
}
func (f *fakeReadCloser) Read(p []byte) (n int, err error) {
if f.pos >= len(f.data) {
return 0, nil
}
n = copy(p, f.data[f.pos:])
f.pos += n
return n, nil
}
func (*fakeReadCloser) Close() error { return nil }
@@ -0,0 +1,403 @@
package handlers
import (
"database/sql"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/gin-gonic/gin"
)
// broadcastBody is a convenience that returns an io.ReadCloser wrapping JSON body.
func broadcastBody(body string) io.ReadCloser {
return &broadcastFakeCloser{data: []byte(body)}
}
type broadcastFakeCloser struct {
data []byte
pos int
}
func (f *broadcastFakeCloser) Read(p []byte) (n int, err error) {
if f.pos >= len(f.data) {
return 0, io.EOF
}
n = copy(p, f.data[f.pos:])
f.pos += n
return n, nil
}
func (*broadcastFakeCloser) Close() error { return nil }
// setupBroadcastDB creates a sqlmock DB with QueryMatcherEqual.
func setupBroadcastDB(t *testing.T) sqlmock.Sqlmock {
t.Helper()
mockDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
return mock
}
// Exact SQL strings from the production handler (whitespace must match verbatim).
const (
sqlBroadcastWorkspaceLookup = `SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'`
sqlBroadcastRecipients = `SELECT id FROM workspaces WHERE status != 'removed' AND id != $1`
sqlBroadcastReceiveInsert = `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status)
VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')`
sqlBroadcastSentInsert = `
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status)
VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')`
)
// ── Broadcast HTTP handler tests ───────────────────────────────────────────────
func TestBroadcast_InvalidWorkspaceID_Returns400(t *testing.T) {
gin.SetMode(gin.TestMode)
setupBroadcastDB(t)
h := NewBroadcastHandler(newTestBroadcaster())
r := gin.New()
r.POST("/workspaces/:id/broadcast", h.Broadcast)
req := httptest.NewRequest(http.MethodPost, "/workspaces/not-a-uuid/broadcast",
broadcastBody(`{"message":"hello"}`))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusBadRequest {
t.Errorf("got %d, want 400: %s", w.Code, w.Body.String())
}
}
func TestBroadcast_MissingMessage_Returns400(t *testing.T) {
gin.SetMode(gin.TestMode)
setupBroadcastDB(t)
h := NewBroadcastHandler(newTestBroadcaster())
r := gin.New()
r.POST("/workspaces/:id/broadcast", h.Broadcast)
req := httptest.NewRequest(http.MethodPost,
"/workspaces/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa/broadcast",
broadcastBody(`{}`))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusBadRequest {
t.Errorf("got %d, want 400: %s", w.Code, w.Body.String())
}
}
func TestBroadcast_EmptyMessage_Returns400(t *testing.T) {
gin.SetMode(gin.TestMode)
setupBroadcastDB(t)
h := NewBroadcastHandler(newTestBroadcaster())
r := gin.New()
r.POST("/workspaces/:id/broadcast", h.Broadcast)
req := httptest.NewRequest(http.MethodPost,
"/workspaces/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa/broadcast",
broadcastBody(`{"message":""}`))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusBadRequest {
t.Errorf("got %d, want 400: %s", w.Code, w.Body.String())
}
}
func TestBroadcast_WorkspaceNotFound_Returns404(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupBroadcastDB(t)
h := NewBroadcastHandler(newTestBroadcaster())
wsID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(sqlBroadcastWorkspaceLookup).
WithArgs(wsID).
WillReturnError(sql.ErrNoRows)
r := gin.New()
r.POST("/workspaces/:id/broadcast", h.Broadcast)
req := httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/broadcast",
broadcastBody(`{"message":"hello"}`))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusNotFound {
t.Errorf("got %d, want 404: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestBroadcast_BroadcastDisabled_Returns403(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupBroadcastDB(t)
h := NewBroadcastHandler(newTestBroadcaster())
wsID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(sqlBroadcastWorkspaceLookup).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).
AddRow("test-workspace", false))
r := gin.New()
r.POST("/workspaces/:id/broadcast", h.Broadcast)
req := httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/broadcast",
broadcastBody(`{"message":"hello"}`))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusForbidden {
t.Errorf("got %d, want 403: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestBroadcast_NoRecipients_Success(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupBroadcastDB(t)
h := NewBroadcastHandler(newTestBroadcaster())
wsID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(sqlBroadcastWorkspaceLookup).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).
AddRow("test-workspace", true))
// No recipients (sender is the only non-removed workspace)
mock.ExpectQuery(sqlBroadcastRecipients).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"id"}))
// Sender's own activity log: 2 args (workspaceID, summary)
mock.ExpectExec(sqlBroadcastSentInsert).
WithArgs(wsID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
r := gin.New()
r.POST("/workspaces/:id/broadcast", h.Broadcast)
req := httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/broadcast",
broadcastBody(`{"message":"hello everyone"}`))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Errorf("got %d, want 200: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestBroadcast_WithRecipients_Success_DeliversToAll(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupBroadcastDB(t)
h := NewBroadcastHandler(newTestBroadcaster())
wsID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
recipient1 := "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
recipient2 := "cccccccc-cccc-cccc-cccc-cccccccccccc"
mock.ExpectQuery(sqlBroadcastWorkspaceLookup).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).
AddRow("broadcaster-ws", true))
mock.ExpectQuery(sqlBroadcastRecipients).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).
AddRow(recipient1).
AddRow(recipient2))
// broadcast_receive: 3 args (recipientID, senderID, summary)
mock.ExpectExec(sqlBroadcastReceiveInsert).
WithArgs(recipient1, sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(sqlBroadcastReceiveInsert).
WithArgs(recipient2, sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
// broadcast_sent: 2 args (workspaceID, summary)
mock.ExpectExec(sqlBroadcastSentInsert).
WithArgs(wsID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
r := gin.New()
r.POST("/workspaces/:id/broadcast", h.Broadcast)
req := httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/broadcast",
broadcastBody(`{"message":"hello team"}`))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Errorf("got %d, want 200: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestBroadcast_RecipientInsertError_ContinuesAndSucceeds(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupBroadcastDB(t)
h := NewBroadcastHandler(newTestBroadcaster())
wsID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
recipient1 := "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
recipient2 := "cccccccc-cccc-cccc-cccc-cccccccccccc"
mock.ExpectQuery(sqlBroadcastWorkspaceLookup).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).
AddRow("broadcaster-ws", true))
mock.ExpectQuery(sqlBroadcastRecipients).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).
AddRow(recipient1).
AddRow(recipient2))
// First recipient insert fails — handler logs and continues
mock.ExpectExec(sqlBroadcastReceiveInsert).
WithArgs(recipient1, sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnError(sql.ErrConnDone)
// Second recipient succeeds
mock.ExpectExec(sqlBroadcastReceiveInsert).
WithArgs(recipient2, sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(sqlBroadcastSentInsert).
WithArgs(wsID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
r := gin.New()
r.POST("/workspaces/:id/broadcast", h.Broadcast)
req := httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/broadcast",
broadcastBody(`{"message":"partial delivery"}`))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Errorf("got %d, want 200: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestBroadcast_SenderActivityLogError_StillReturns200(t *testing.T) {
gin.SetMode(gin.TestMode)
mock := setupBroadcastDB(t)
h := NewBroadcastHandler(newTestBroadcaster())
wsID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(sqlBroadcastWorkspaceLookup).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).
AddRow("broadcaster-ws", true))
mock.ExpectQuery(sqlBroadcastRecipients).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"id"}))
mock.ExpectExec(sqlBroadcastSentInsert).
WithArgs(wsID, sqlmock.AnyArg()).
WillReturnError(sql.ErrConnDone)
r := gin.New()
r.POST("/workspaces/:id/broadcast", h.Broadcast)
req := httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/broadcast",
broadcastBody(`{"message":"hello"}`))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
// Handler logs error but still returns 200
if w.Code != http.StatusOK {
t.Errorf("got %d, want 200: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ── broadcastTruncate pure function tests ─────────────────────────────────────
func TestBroadcastTruncate_UnderLimit(t *testing.T) {
input := "short message"
got := broadcastTruncate(input, 50)
if got != input {
t.Errorf("broadcastTruncate(%q, 50) = %q, want %q", input, got, input)
}
}
func TestBroadcastTruncate_ExactlyAtLimit(t *testing.T) {
input := "exactly fifty char"
got := broadcastTruncate(input, 18)
if got != input {
t.Errorf("broadcastTruncate(%q, 18) = %q, want %q", input, got, input)
}
}
func TestBroadcastTruncate_OverLimit_TruncatesAndAddsEllipsis(t *testing.T) {
// 150 ASCII chars → over 120 rune limit → truncate to 120 + ellipsis
input := strings.Repeat("x", 150)
got := broadcastTruncate(input, 120)
if len([]rune(got)) != 121 { // 120 + 1 ellipsis rune
t.Errorf("len(broadcastTruncate) = %d, want 121 (120 + ellipsis)", len([]rune(got)))
}
if got[:len(got)-len("…")] != strings.Repeat("x", 120) {
t.Errorf("broadcastTruncate did not truncate correctly")
}
}
func TestBroadcastTruncate_UnicodeChars_TreatsAsRunes(t *testing.T) {
// Each emoji is 1 rune but multiple bytes. 50 emojis > 30 limit.
input := strings.Repeat("🎉", 50)
got := broadcastTruncate(input, 30)
if len([]rune(got)) != 31 { // 30 + ellipsis
t.Errorf("len(broadcastTruncate with emoji) = %d, want 31", len([]rune(got)))
}
}
func TestBroadcastTruncate_ZeroLimit_ReturnsEllipsis(t *testing.T) {
got := broadcastTruncate("hello", 0)
if got != "…" {
t.Errorf("broadcastTruncate with max=0 = %q, want …", got)
}
}
@@ -207,7 +207,7 @@ func setupSwapEnv(t *testing.T) (*handlers.MCPHandler, *flatPlugin, sqlmock.Sqlm
resolver := namespace.New(db)
// MCPHandler needs a real *sql.DB; pass the sqlmock-backed one.
h := handlers.NewMCPHandler(db, nil, nil).WithMemoryV2(cl, resolver)
h := handlers.NewMCPHandler(db, nil).WithMemoryV2(cl, resolver)
return h, plugin, mock
}
@@ -430,7 +430,7 @@ func TestE2E_PluginUnreachable_AgentSeesClearError(t *testing.T) {
db, _, _ := sqlmock.New()
defer db.Close()
resolver := namespace.New(db)
h := handlers.NewMCPHandler(db, nil, nil).WithMemoryV2(cl, resolver)
h := handlers.NewMCPHandler(db, nil).WithMemoryV2(cl, resolver)
_, err := h.Dispatch(context.Background(), "root-1", "commit_memory_v2", map[string]interface{}{
"content": "x",
-75
View File
@@ -1,75 +0,0 @@
package push
import (
"net/http"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
// Handler exposes HTTP endpoints for push-token management.
type Handler struct {
repo *Repo
}
// NewHandler creates a push-token HTTP handler.
func NewHandler(repo *Repo) *Handler {
return &Handler{repo: repo}
}
// RegisterRoutes mounts push-token routes on the given router group.
func (h *Handler) RegisterRoutes(rg *gin.RouterGroup) {
rg.POST("/push-tokens", h.Create)
rg.DELETE("/push-tokens", h.Delete)
}
// Create handles POST /push-tokens.
// Body: { "token": "ExponentPushToken[xxx]", "platform": "ios" | "android" }
func (h *Handler) Create(c *gin.Context) {
workspaceID := c.Param("id")
if _, err := uuid.Parse(workspaceID); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace id"})
return
}
var body struct {
Token string `json:"token" binding:"required"`
Platform string `json:"platform" binding:"required,oneof=ios android"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if err := h.repo.SaveToken(c.Request.Context(), workspaceID, body.Token, body.Platform); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to save token"})
return
}
c.Status(http.StatusNoContent)
}
// Delete handles DELETE /push-tokens.
// Body: { "token": "ExponentPushToken[xxx]" }
func (h *Handler) Delete(c *gin.Context) {
workspaceID := c.Param("id")
if _, err := uuid.Parse(workspaceID); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace id"})
return
}
var body struct {
Token string `json:"token" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if err := h.repo.DeleteToken(c.Request.Context(), workspaceID, body.Token); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete token"})
return
}
c.Status(http.StatusNoContent)
}
-101
View File
@@ -1,101 +0,0 @@
package push
import (
"context"
"database/sql"
"log"
"time"
)
// Notifier sends push notifications for agent messages.
type Notifier struct {
repo *Repo
sender *Sender
}
// NewNotifier creates a Notifier.
func NewNotifier(db *sql.DB, sender *Sender) *Notifier {
return &Notifier{
repo: NewRepo(db),
sender: sender,
}
}
// NotifyAgentMessage sends a push notification to all registered devices for a
// workspace when an agent sends a message. It runs asynchronously (fire-and-
// forget) so the caller's WebSocket broadcast is never blocked.
func (n *Notifier) NotifyAgentMessage(ctx context.Context, workspaceID, workspaceName, message string) {
if n == nil || n.sender == nil {
return
}
// Capture values for the goroutine.
wsID := workspaceID
wsName := workspaceName
msg := message
go func() {
// Use a fresh context with timeout so a slow Expo API doesn't
// leak the caller's context deadline.
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
tokens, err := n.repo.GetTokens(ctx, wsID)
if err != nil {
log.Printf("push: failed to get tokens for workspace %s: %v", wsID, err)
return
}
if len(tokens) == 0 {
return
}
// Expo accepts batches of up to ~100 messages; we cap lower to stay
// well under the limit.
const batchSize = 50
for i := 0; i < len(tokens); i += batchSize {
end := i + batchSize
if end > len(tokens) {
end = len(tokens)
}
batch := tokens[i:end]
messages := make([]Message, 0, len(batch))
for _, t := range batch {
messages = append(messages, Message{
To: t.Token,
Title: wsName,
Body: truncate(msg, 100),
Data: map[string]string{
"type": "agent_message",
"workspaceId": wsID,
"workspaceSlug": "", // populated by caller if available
},
Sound: "default",
Priority: "high",
})
}
results, err := n.sender.Send(ctx, messages)
if err != nil {
log.Printf("push: send failed for workspace %s: %v", wsID, err)
continue
}
// Remove invalid tokens.
for j, r := range results {
if ShouldRemoveToken(r) {
if delErr := n.repo.DeleteToken(ctx, wsID, batch[j].Token); delErr != nil {
log.Printf("push: failed to delete invalid token for workspace %s: %v", wsID, delErr)
}
}
}
}
}()
}
func truncate(s string, max int) string {
if len(s) <= max {
return s
}
return s[:max] + "…"
}
-437
View File
@@ -1,437 +0,0 @@
package push
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSenderSend(t *testing.T) {
gin.SetMode(gin.TestMode)
expoResponse := map[string]interface{}{
"data": []map[string]interface{}{
{"status": "ok", "id": "abc123"},
{"status": "error", "message": "Invalid token", "details": map[string]string{"error": "DeviceNotRegistered"}},
},
}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "POST", r.Method)
assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
var msgs []Message
require.NoError(t, json.NewDecoder(r.Body).Decode(&msgs))
assert.Len(t, msgs, 2)
assert.Equal(t, "ExponentPushToken[test1]", msgs[0].To)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(expoResponse)
}))
defer server.Close()
sender := NewSender("")
sender.apiURL = server.URL
results, err := sender.Send(context.Background(), []Message{
{To: "ExponentPushToken[test1]", Title: "Test", Body: "Hello"},
{To: "ExponentPushToken[test2]", Title: "Test", Body: "World"},
})
require.NoError(t, err)
require.Len(t, results, 2)
assert.Equal(t, "ok", results[0].Status)
assert.Equal(t, "error", results[1].Status)
assert.True(t, ShouldRemoveToken(results[1]))
}
func TestSenderSendEmpty(t *testing.T) {
sender := NewSender("")
results, err := sender.Send(context.Background(), nil)
require.NoError(t, err)
assert.Nil(t, results)
}
func TestHandlerCreate_InvalidWorkspaceID(t *testing.T) {
gin.SetMode(gin.TestMode)
handler := NewHandler(NewRepo(nil))
router := gin.New()
group := router.Group("/workspaces/:id")
handler.RegisterRoutes(group)
w := httptest.NewRecorder()
body := `{"token":"ExponentPushToken[abc]","platform":"ios"}`
req, _ := http.NewRequest("POST", "/workspaces/not-a-uuid/push-tokens", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusBadRequest, w.Code)
}
func TestHandlerCreate(t *testing.T) {
gin.SetMode(gin.TestMode)
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
mock.ExpectExec("INSERT INTO push_tokens").
WithArgs("11111111-1111-1111-1111-111111111111", "ExponentPushToken[abc]", "ios").
WillReturnResult(sqlmock.NewResult(1, 1))
repo := NewRepo(db)
handler := NewHandler(repo)
router := gin.New()
group := router.Group("/workspaces/:id")
handler.RegisterRoutes(group)
w := httptest.NewRecorder()
body := `{"token":"ExponentPushToken[abc]","platform":"ios"}`
req, _ := http.NewRequest("POST", "/workspaces/11111111-1111-1111-1111-111111111111/push-tokens", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusNoContent, w.Code)
require.NoError(t, mock.ExpectationsWereMet())
}
func TestHandlerCreateInvalidPlatform(t *testing.T) {
gin.SetMode(gin.TestMode)
db, _, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
handler := NewHandler(NewRepo(db))
router := gin.New()
group := router.Group("/workspaces/:id")
handler.RegisterRoutes(group)
w := httptest.NewRecorder()
body := `{"token":"ExponentPushToken[abc]","platform":"windows"}`
req, _ := http.NewRequest("POST", "/workspaces/11111111-1111-1111-1111-111111111111/push-tokens", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusBadRequest, w.Code)
}
func TestHandlerDelete_BindingError(t *testing.T) {
gin.SetMode(gin.TestMode)
handler := NewHandler(NewRepo(nil))
router := gin.New()
group := router.Group("/workspaces/:id")
handler.RegisterRoutes(group)
w := httptest.NewRecorder()
body := `{}` // missing required "token" field
req, _ := http.NewRequest("DELETE", "/workspaces/11111111-1111-1111-1111-111111111111/push-tokens", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusBadRequest, w.Code)
}
func TestHandlerDelete_InvalidWorkspaceID(t *testing.T) {
gin.SetMode(gin.TestMode)
handler := NewHandler(NewRepo(nil))
router := gin.New()
group := router.Group("/workspaces/:id")
handler.RegisterRoutes(group)
w := httptest.NewRecorder()
body := `{"token":"ExponentPushToken[del]"}`
req, _ := http.NewRequest("DELETE", "/workspaces/not-a-uuid/push-tokens", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusBadRequest, w.Code)
}
func TestHandlerDelete(t *testing.T) {
gin.SetMode(gin.TestMode)
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
mock.ExpectExec("DELETE FROM push_tokens").
WithArgs("22222222-2222-2222-2222-222222222222", "ExponentPushToken[del]").
WillReturnResult(sqlmock.NewResult(0, 1))
repo := NewRepo(db)
handler := NewHandler(repo)
router := gin.New()
group := router.Group("/workspaces/:id")
handler.RegisterRoutes(group)
w := httptest.NewRecorder()
body := `{"token":"ExponentPushToken[del]"}`
req, _ := http.NewRequest("DELETE", "/workspaces/22222222-2222-2222-2222-222222222222/push-tokens", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusNoContent, w.Code)
require.NoError(t, mock.ExpectationsWereMet())
}
func TestHandlerCreate_DBSaveError(t *testing.T) {
gin.SetMode(gin.TestMode)
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
mock.ExpectExec("INSERT INTO push_tokens").
WithArgs("11111111-1111-1111-1111-111111111111", "ExponentPushToken[abc]", "ios").
WillReturnError(sql.ErrConnDone)
handler := NewHandler(NewRepo(db))
router := gin.New()
group := router.Group("/workspaces/:id")
handler.RegisterRoutes(group)
w := httptest.NewRecorder()
body := `{"token":"ExponentPushToken[abc]","platform":"ios"}`
req, _ := http.NewRequest("POST", "/workspaces/11111111-1111-1111-1111-111111111111/push-tokens", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusInternalServerError, w.Code)
require.NoError(t, mock.ExpectationsWereMet())
}
func TestHandlerDelete_DBError(t *testing.T) {
gin.SetMode(gin.TestMode)
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
mock.ExpectExec("DELETE FROM push_tokens").
WithArgs("22222222-2222-2222-2222-222222222222", "ExponentPushToken[del]").
WillReturnError(sql.ErrConnDone)
handler := NewHandler(NewRepo(db))
router := gin.New()
group := router.Group("/workspaces/:id")
handler.RegisterRoutes(group)
w := httptest.NewRecorder()
body := `{"token":"ExponentPushToken[del]"}`
req, _ := http.NewRequest("DELETE", "/workspaces/22222222-2222-2222-2222-222222222222/push-tokens", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusInternalServerError, w.Code)
require.NoError(t, mock.ExpectationsWereMet())
}
func TestSenderSend_HTTPError(t *testing.T) {
gin.SetMode(gin.TestMode)
// Server that hijacks the connection and closes it before sending a response,
// causing the HTTP client to receive a connection-closed error.
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Drain request body so client send completes.
io.Copy(io.Discard, r.Body)
// Hijack and immediately close — no response written.
conn, _, _ := w.(http.Hijacker).Hijack()
conn.Close()
}))
defer server.Close()
sender := NewSender("")
sender.apiURL = server.URL
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
_, err := sender.Send(ctx, []Message{
{To: "ExponentPushToken[test]", Title: "T", Body: "H"},
})
require.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "post:") || strings.Contains(err.Error(), "context"))
}
func TestSenderSend_Non200Response(t *testing.T) {
gin.SetMode(gin.TestMode)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte(`{"error":"rate limited"}`))
}))
defer server.Close()
sender := NewSender("")
sender.apiURL = server.URL
_, err := sender.Send(context.Background(), []Message{
{To: "ExponentPushToken[test]", Title: "T", Body: "H"},
})
require.Error(t, err)
assert.Contains(t, err.Error(), "expo returned 503")
}
func TestNotifierNotifyAgentMessage_NilGuard(t *testing.T) {
// Must not panic when sender is nil.
n := NewNotifier(nil, nil)
// Should return immediately (nil check passes without panic).
n.NotifyAgentMessage(context.Background(), "ws-1", "Test", "Hello world")
}
func TestNotifierNotifyAgentMessage_ZeroTokens(t *testing.T) {
// Verify that NotifyAgentMessage does NOT panic when there are zero registered
// tokens — it should return early without calling sender.Send().
// Note: the fire-and-forget goroutine inside NotifyAgentMessage is not
// directly verifiable here without modifying production code; the key assertion
// is that no panic occurs and the method returns cleanly.
db, mock, err := sqlmock.New()
require.NoError(t, err)
mock.ExpectQuery("SELECT id, workspace_id, token, platform, created_at FROM push_tokens").
WithArgs("ws-1").
WillReturnRows(sqlmock.NewRows([]string{"id", "workspace_id", "token", "platform", "created_at"}))
sender := NewSender("")
sender.apiURL = "http://127.0.0.1:1" // unreachable — would error if Send is called
n := NewNotifier(db, sender)
n.NotifyAgentMessage(context.Background(), "ws-1", "Test", "Hello")
// Give goroutine time to run GetTokens and exit early before closing DB.
time.Sleep(200 * time.Millisecond)
require.NoError(t, mock.ExpectationsWereMet())
db.Close()
}
func TestRepoGetTokens_DBError(t *testing.T) {
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
mock.ExpectQuery("SELECT id, workspace_id, token, platform, created_at FROM push_tokens").
WithArgs("ws-1").
WillReturnError(sql.ErrConnDone)
repo := NewRepo(db)
_, err = repo.GetTokens(context.Background(), "ws-1")
require.Error(t, err)
require.NoError(t, mock.ExpectationsWereMet())
}
func TestRepoGetTokens_ScanError(t *testing.T) {
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
// Return fewer columns than struct has — causes scan error.
mock.ExpectQuery("SELECT id, workspace_id, token, platform, created_at FROM push_tokens").
WithArgs("ws-1").
WillReturnRows(sqlmock.NewRows([]string{"id", "workspace_id", "token"}). // missing platform, created_at
AddRow("1", "ws-1", "ExponentPushToken[a]"))
repo := NewRepo(db)
_, err = repo.GetTokens(context.Background(), "ws-1")
require.Error(t, err) // scan error
require.NoError(t, mock.ExpectationsWereMet())
}
func TestRepoSaveToken_Error(t *testing.T) {
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
mock.ExpectExec("INSERT INTO push_tokens").
WithArgs("ws-1", "ExponentPushToken[xyz]", "android").
WillReturnError(sql.ErrConnDone)
repo := NewRepo(db)
err = repo.SaveToken(context.Background(), "ws-1", "ExponentPushToken[xyz]", "android")
require.Error(t, err)
require.NoError(t, mock.ExpectationsWereMet())
}
func TestRepoDeleteToken_Error(t *testing.T) {
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
mock.ExpectExec("DELETE FROM push_tokens").
WithArgs("ws-1", "ExponentPushToken[xyz]").
WillReturnError(sql.ErrConnDone)
repo := NewRepo(db)
err = repo.DeleteToken(context.Background(), "ws-1", "ExponentPushToken[xyz]")
require.Error(t, err)
require.NoError(t, mock.ExpectationsWereMet())
}
func TestTruncate(t *testing.T) {
tests := []struct {
name string
s string
max int
want string
}{
{"short string unchanged", "hello", 10, "hello"},
{"exact length unchanged", "hello", 5, "hello"},
{"long string truncated", "hello world", 5, "hello…"},
{"empty string", "", 5, ""},
{"single char at max", "a", 1, "a"},
{"multi-byte truncation adds ellipsis", "こんにちは世界", 5, ""},
{"truncate with ellipsis ends with ellipsis", "hello world", 5, "hello…"},
{"truncate at 1 char", "hello", 1, "h…"},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := truncate(tc.s, tc.max)
if tc.want == "" {
// Multi-byte / edge cases: verify no expansion beyond max+3.
assert.True(t, len(got) <= tc.max+3)
} else {
assert.Equal(t, tc.want, got)
}
})
}
}
func TestRepoGetTokens(t *testing.T) {
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
mock.ExpectQuery("SELECT id, workspace_id, token, platform, created_at FROM push_tokens").
WithArgs("ws-1").
WillReturnRows(sqlmock.NewRows([]string{"id", "workspace_id", "token", "platform", "created_at"}).
AddRow("1", "ws-1", "ExponentPushToken[a]", "ios", "2026-01-01T00:00:00Z").
AddRow("2", "ws-1", "ExponentPushToken[b]", "android", "2026-01-01T00:00:00Z"))
repo := NewRepo(db)
tokens, err := repo.GetTokens(context.Background(), "ws-1")
require.NoError(t, err)
require.Len(t, tokens, 2)
assert.Equal(t, "ExponentPushToken[a]", tokens[0].Token)
assert.Equal(t, "ios", tokens[0].Platform)
assert.Equal(t, "ExponentPushToken[b]", tokens[1].Token)
require.NoError(t, mock.ExpectationsWereMet())
}
-76
View File
@@ -1,76 +0,0 @@
package push
import (
"context"
"database/sql"
"fmt"
)
// Token is one registered push token for a workspace.
type Token struct {
ID string
WorkspaceID string
Token string
Platform string
CreatedAt string
}
// Repo reads and writes push tokens in Postgres.
type Repo struct {
db *sql.DB
}
// NewRepo creates a token repository backed by db.
func NewRepo(db *sql.DB) *Repo {
return &Repo{db: db}
}
// SaveToken registers a push token for a workspace. If the same token already
// exists for the workspace, it updates the timestamp.
func (r *Repo) SaveToken(ctx context.Context, workspaceID, token, platform string) error {
_, err := r.db.ExecContext(ctx, `
INSERT INTO push_tokens (workspace_id, token, platform)
VALUES ($1, $2, $3)
ON CONFLICT (workspace_id, token) DO UPDATE
SET updated_at = now()
`, workspaceID, token, platform)
if err != nil {
return fmt.Errorf("push_tokens: save: %w", err)
}
return nil
}
// DeleteToken removes a push token. Returns nil even if the token did not exist.
func (r *Repo) DeleteToken(ctx context.Context, workspaceID, token string) error {
_, err := r.db.ExecContext(ctx, `
DELETE FROM push_tokens
WHERE workspace_id = $1 AND token = $2
`, workspaceID, token)
if err != nil {
return fmt.Errorf("push_tokens: delete: %w", err)
}
return nil
}
// GetTokens returns all active push tokens for a workspace.
func (r *Repo) GetTokens(ctx context.Context, workspaceID string) ([]Token, error) {
rows, err := r.db.QueryContext(ctx, `
SELECT id, workspace_id, token, platform, created_at
FROM push_tokens
WHERE workspace_id = $1
`, workspaceID)
if err != nil {
return nil, fmt.Errorf("push_tokens: list: %w", err)
}
defer rows.Close()
var tokens []Token
for rows.Next() {
var t Token
if err := rows.Scan(&t.ID, &t.WorkspaceID, &t.Token, &t.Platform, &t.CreatedAt); err != nil {
return nil, fmt.Errorf("push_tokens: scan: %w", err)
}
tokens = append(tokens, t)
}
return tokens, rows.Err()
}
-104
View File
@@ -1,104 +0,0 @@
package push
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"time"
)
const expoPushAPI = "https://exp.host/--/api/v2/push/send"
// Message is one Expo push notification.
type Message struct {
To string `json:"to"`
Title string `json:"title,omitempty"`
Body string `json:"body,omitempty"`
Data map[string]string `json:"data,omitempty"`
Sound string `json:"sound,omitempty"`
Priority string `json:"priority,omitempty"`
}
// Sender delivers push notifications via the Expo Push Service.
type Sender struct {
apiURL string
httpClient *http.Client
expoToken string // optional Expo access token for authenticated requests
}
// NewSender creates a Sender. expoToken may be empty for unauthenticated
// requests (sufficient for most use cases).
func NewSender(expoToken string) *Sender {
return &Sender{
apiURL: expoPushAPI,
httpClient: &http.Client{
Timeout: 10 * time.Second,
},
expoToken: expoToken,
}
}
// SendResult is the per-recipient status from Expo.
type SendResult struct {
Status string `json:"status"`
ID string `json:"id"`
Message string `json:"message,omitempty"`
Details struct {
Error string `json:"error,omitempty"`
} `json:"details,omitempty"`
}
// expoResponse is the wrapper shape returned by the Expo API.
type expoResponse struct {
Data []SendResult `json:"data"`
}
// Send fires a batch of push messages. It returns a slice of results in the
// same order as the input, plus an error only when the HTTP call itself fails.
// Callers should inspect each result's Status field for per-message errors
// (e.g. "DeviceNotRegistered" → token should be deleted).
func (s *Sender) Send(ctx context.Context, messages []Message) ([]SendResult, error) {
if len(messages) == 0 {
return nil, nil
}
body, err := json.Marshal(messages)
if err != nil {
return nil, fmt.Errorf("push: marshal: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.apiURL, bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("push: new request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
req.Header.Set("Accept-Encoding", "gzip, deflate")
if s.expoToken != "" {
req.Header.Set("Authorization", "Bearer "+s.expoToken)
}
res, err := s.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("push: post: %w", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("push: expo returned %d", res.StatusCode)
}
var resp expoResponse
if err := json.NewDecoder(res.Body).Decode(&resp); err != nil {
return nil, fmt.Errorf("push: decode: %w", err)
}
return resp.Data, nil
}
// ShouldRemoveToken reports whether a SendResult indicates the token is no
// longer valid and should be deleted from the database.
func ShouldRemoveToken(r SendResult) bool {
return r.Status == "error" && r.Details.Error == "DeviceNotRegistered"
}
+2 -15
View File
@@ -20,7 +20,6 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/plugins"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/push"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
"github.com/docker/docker/client"
@@ -328,25 +327,13 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// Remaining auth-gated workspace sub-routes — appended to wsAuth group declared above.
{
// Push notifications (mobile)
var pushNotifier *push.Notifier
if expoToken := os.Getenv("EXPO_ACCESS_TOKEN"); expoToken != "" {
pushNotifier = push.NewNotifier(db.DB, push.NewSender(expoToken))
}
// Activity Logs
acth := handlers.NewActivityHandler(broadcaster, pushNotifier)
acth := handlers.NewActivityHandler(broadcaster)
wsAuth.GET("/activity", acth.List)
wsAuth.GET("/session-search", acth.SessionSearch)
wsAuth.POST("/activity", acth.Report)
wsAuth.POST("/notify", acth.Notify)
// Push token registration (mobile)
if pushNotifier != nil {
pushH := push.NewHandler(push.NewRepo(db.DB))
pushH.RegisterRoutes(wsAuth)
}
// Chat history — RFC #2945 PR-C (issue #3017) + PR-D (issue
// #3026). Server-side rendering of activity_logs rows into
// the canonical ChatMessage shape; storage is plugin-shaped
@@ -450,7 +437,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// opencode session cannot saturate the platform.
// C3: commit_memory/recall_memory with scope=GLOBAL → permission error;
// send_message_to_user excluded unless MOLECULE_MCP_ALLOW_SEND_MESSAGE=true.
mcpH := handlers.NewMCPHandler(db.DB, broadcaster, pushNotifier)
mcpH := handlers.NewMCPHandler(db.DB, broadcaster)
if memBundle != nil {
mcpH.WithMemoryV2(memBundle.Plugin, memBundle.Resolver)
}
@@ -1 +0,0 @@
DROP TABLE IF EXISTS push_tokens;
@@ -1,11 +0,0 @@
CREATE TABLE push_tokens (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
token TEXT NOT NULL,
platform TEXT NOT NULL CHECK (platform IN ('ios', 'android')),
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
UNIQUE(workspace_id, token)
);
CREATE INDEX idx_push_tokens_workspace ON push_tokens(workspace_id);