forked from molecule-ai/molecule-core
Merge pull request #2354 from Molecule-AI/auto/issue-2339-pr3-activity-cursor
feat(activity): since_id cursor on GET /activity (#2339 PR 3)
This commit is contained in:
commit
b5bde0399a
@ -2,7 +2,9 @@ package handlers
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -23,7 +25,7 @@ func NewActivityHandler(b *events.Broadcaster) *ActivityHandler {
|
|||||||
return &ActivityHandler{broadcaster: b}
|
return &ActivityHandler{broadcaster: b}
|
||||||
}
|
}
|
||||||
|
|
||||||
// List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs=
|
// List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs=&since_id=
|
||||||
//
|
//
|
||||||
// since_secs filters to activity_logs.created_at >= NOW() - INTERVAL '$N seconds'.
|
// since_secs filters to activity_logs.created_at >= NOW() - INTERVAL '$N seconds'.
|
||||||
// Optional, additive — callers that don't pass it get today's behavior (the
|
// Optional, additive — callers that don't pass it get today's behavior (the
|
||||||
@ -33,12 +35,29 @@ func NewActivityHandler(b *events.Broadcaster) *ActivityHandler {
|
|||||||
// Capped at 30 days (2_592_000s) — anything older has typically been paged
|
// Capped at 30 days (2_592_000s) — anything older has typically been paged
|
||||||
// out anyway, and a defensive ceiling keeps a paranoid client from triggering
|
// out anyway, and a defensive ceiling keeps a paranoid client from triggering
|
||||||
// a full-table scan via since_secs=99999999999. Closes #2268.
|
// a full-table scan via since_secs=99999999999. Closes #2268.
|
||||||
|
//
|
||||||
|
// since_id is a CURSOR for poll-mode workspaces (#2339 PR 3). The agent
|
||||||
|
// passes the id of the last activity_logs row it has consumed; the server
|
||||||
|
// returns rows STRICTLY AFTER that cursor in chronological (ASC) order so
|
||||||
|
// the agent processes events in the order they were recorded. Telegram
|
||||||
|
// getUpdates / Slack RTM shape — same proven pattern.
|
||||||
|
//
|
||||||
|
// Cross-workspace safety: the cursor lookup is scoped by workspace_id, so a
|
||||||
|
// caller cannot peek at another workspace's activity by guessing its UUIDs.
|
||||||
|
//
|
||||||
|
// Cursor-not-found: returns 410 Gone. The client should reset its cursor
|
||||||
|
// (omit since_id) and re-fetch the recent backlog. This avoids the silent
|
||||||
|
// loss-window where a pruned cursor silently filters everything out.
|
||||||
|
//
|
||||||
|
// since_id + since_secs together: both filters apply (AND). Output is ASC
|
||||||
|
// when since_id is set (polling order), DESC otherwise (recent feed order).
|
||||||
func (h *ActivityHandler) List(c *gin.Context) {
|
func (h *ActivityHandler) List(c *gin.Context) {
|
||||||
workspaceID := c.Param("id")
|
workspaceID := c.Param("id")
|
||||||
activityType := c.Query("type")
|
activityType := c.Query("type")
|
||||||
source := c.Query("source") // "canvas" = source_id IS NULL, "agent" = source_id IS NOT NULL
|
source := c.Query("source") // "canvas" = source_id IS NULL, "agent" = source_id IS NOT NULL
|
||||||
limitStr := c.DefaultQuery("limit", "100")
|
limitStr := c.DefaultQuery("limit", "100")
|
||||||
sinceSecsStr := c.Query("since_secs")
|
sinceSecsStr := c.Query("since_secs")
|
||||||
|
sinceID := c.Query("since_id")
|
||||||
|
|
||||||
limit := 100
|
limit := 100
|
||||||
if n, err := strconv.Atoi(limitStr); err == nil && n > 0 {
|
if n, err := strconv.Atoi(limitStr); err == nil && n > 0 {
|
||||||
@ -65,6 +84,37 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
|||||||
sinceSecs = n
|
sinceSecs = n
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Resolve since_id cursor (if set) BEFORE building the main query so we
|
||||||
|
// can 410 cleanly when the cursor row is gone — and so the cursor's
|
||||||
|
// created_at is bound as a regular timestamp parameter (not a subquery)
|
||||||
|
// for clean sqlmock matching and to keep the planner predictable.
|
||||||
|
//
|
||||||
|
// The lookup is scoped by workspace_id: a caller cannot enumerate or
|
||||||
|
// peek at another workspace's events by passing a UUID belonging to a
|
||||||
|
// different workspace. Mismatched-workspace cursor → 410, same as
|
||||||
|
// "row not found" — both indicate the cursor is no longer usable for
|
||||||
|
// this caller, no information leak.
|
||||||
|
var cursorTime time.Time
|
||||||
|
usingCursor := false
|
||||||
|
if sinceID != "" {
|
||||||
|
err := db.DB.QueryRowContext(c.Request.Context(),
|
||||||
|
`SELECT created_at FROM activity_logs WHERE id = $1 AND workspace_id = $2`,
|
||||||
|
sinceID, workspaceID,
|
||||||
|
).Scan(&cursorTime)
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
c.JSON(http.StatusGone, gin.H{
|
||||||
|
"error": "since_id cursor not found (row may have been pruned or belongs to a different workspace); omit since_id to reset",
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Activity since_id cursor lookup error for ws=%s id=%s: %v", workspaceID, sinceID, err)
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "cursor lookup failed"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
usingCursor = true
|
||||||
|
}
|
||||||
|
|
||||||
// Build query with optional filters
|
// Build query with optional filters
|
||||||
query := `SELECT id, workspace_id, activity_type, source_id, target_id, method,
|
query := `SELECT id, workspace_id, activity_type, source_id, target_id, method,
|
||||||
summary, request_body, response_body, tool_trace, duration_ms, status, error_detail, created_at
|
summary, request_body, response_body, tool_trace, duration_ms, status, error_detail, created_at
|
||||||
@ -94,8 +144,22 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
|||||||
args = append(args, sinceSecs)
|
args = append(args, sinceSecs)
|
||||||
argIdx++
|
argIdx++
|
||||||
}
|
}
|
||||||
|
if usingCursor {
|
||||||
|
// Strictly after — never replay the cursor row itself.
|
||||||
|
query += fmt.Sprintf(" AND created_at > $%d", argIdx)
|
||||||
|
args = append(args, cursorTime)
|
||||||
|
argIdx++
|
||||||
|
}
|
||||||
|
|
||||||
query += fmt.Sprintf(" ORDER BY created_at DESC LIMIT $%d", argIdx)
|
// Polling clients (since_id) need oldest-first within the new window so
|
||||||
|
// they process events in recorded order. The recent-feed view (no
|
||||||
|
// since_id) keeps DESC — that's the canvas/UI shape and changing it
|
||||||
|
// would surprise existing callers.
|
||||||
|
if usingCursor {
|
||||||
|
query += fmt.Sprintf(" ORDER BY created_at ASC LIMIT $%d", argIdx)
|
||||||
|
} else {
|
||||||
|
query += fmt.Sprintf(" ORDER BY created_at DESC LIMIT $%d", argIdx)
|
||||||
|
}
|
||||||
args = append(args, limit)
|
args = append(args, limit)
|
||||||
|
|
||||||
rows, err := db.DB.QueryContext(c.Request.Context(), query, args...)
|
rows, err := db.DB.QueryContext(c.Request.Context(), query, args...)
|
||||||
|
|||||||
156
workspace-server/internal/handlers/activity_since_id_test.go
Normal file
156
workspace-server/internal/handlers/activity_since_id_test.go
Normal file
@ -0,0 +1,156 @@
|
|||||||
|
package handlers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/DATA-DOG/go-sqlmock"
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Tests for the since_id cursor on GET /workspaces/:id/activity (#2339 PR 3).
|
||||||
|
//
|
||||||
|
// Cursor shape: Telegram getUpdates / Slack RTM. The polling agent passes
|
||||||
|
// the id of the last activity_logs row it processed; the server returns
|
||||||
|
// rows STRICTLY AFTER that cursor in ASC order. Cross-workspace lookups
|
||||||
|
// return 410 to prevent UUID-guessing peeks at other workspaces' events.
|
||||||
|
|
||||||
|
// TestActivityHandler_SinceID_ReturnsNewerASC: with a valid cursor the
|
||||||
|
// handler does the cursor lookup, then queries with the cursor's
|
||||||
|
// created_at as a > filter and ASC ordering — the polling shape.
|
||||||
|
func TestActivityHandler_SinceID_ReturnsNewerASC(t *testing.T) {
|
||||||
|
mock := setupTestDB(t)
|
||||||
|
|
||||||
|
cursorID := "act-cursor-42"
|
||||||
|
cursorTime := time.Date(2026, 4, 30, 5, 0, 0, 0, time.UTC)
|
||||||
|
|
||||||
|
// Step 1: cursor lookup — must include workspace_id scope so a UUID
|
||||||
|
// from another workspace can't be used.
|
||||||
|
mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
|
||||||
|
WithArgs(cursorID, "ws-1").
|
||||||
|
WillReturnRows(sqlmock.NewRows([]string{"created_at"}).AddRow(cursorTime))
|
||||||
|
|
||||||
|
// Step 2: main query with the cursor's created_at as a > filter,
|
||||||
|
// ASC ordering. Args: workspace_id, cursorTime, limit.
|
||||||
|
mock.ExpectQuery("SELECT id, workspace_id, activity_type").
|
||||||
|
WithArgs("ws-1", cursorTime, 100).
|
||||||
|
WillReturnRows(newActivityRows())
|
||||||
|
|
||||||
|
broadcaster := newTestBroadcaster()
|
||||||
|
handler := NewActivityHandler(broadcaster)
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
c, _ := gin.CreateTestContext(w)
|
||||||
|
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||||
|
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?since_id="+cursorID, nil)
|
||||||
|
|
||||||
|
handler.List(c)
|
||||||
|
|
||||||
|
if w.Code != http.StatusOK {
|
||||||
|
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||||
|
}
|
||||||
|
if err := mock.ExpectationsWereMet(); err != nil {
|
||||||
|
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestActivityHandler_SinceID_CursorNotFound_410: cursor row doesn't exist
|
||||||
|
// (pruned, never existed, or wrong UUID). Server returns 410 Gone so the
|
||||||
|
// client knows to reset its cursor — silent empty results would cause a
|
||||||
|
// stuck-poll bug where the agent never sees new events.
|
||||||
|
func TestActivityHandler_SinceID_CursorNotFound_410(t *testing.T) {
|
||||||
|
mock := setupTestDB(t)
|
||||||
|
|
||||||
|
mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
|
||||||
|
WithArgs("act-gone", "ws-1").
|
||||||
|
WillReturnError(sql.ErrNoRows)
|
||||||
|
|
||||||
|
broadcaster := newTestBroadcaster()
|
||||||
|
handler := NewActivityHandler(broadcaster)
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
c, _ := gin.CreateTestContext(w)
|
||||||
|
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||||
|
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?since_id=act-gone", nil)
|
||||||
|
|
||||||
|
handler.List(c)
|
||||||
|
|
||||||
|
if w.Code != http.StatusGone {
|
||||||
|
t.Fatalf("expected 410, got %d: %s", w.Code, w.Body.String())
|
||||||
|
}
|
||||||
|
if err := mock.ExpectationsWereMet(); err != nil {
|
||||||
|
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestActivityHandler_SinceID_CrossWorkspaceCursor_410: a caller passes a
|
||||||
|
// UUID that belongs to a different workspace. The cursor lookup is scoped
|
||||||
|
// by workspace_id so the row is "not found" from this caller's perspective —
|
||||||
|
// same 410 path as the pruned case. No information leak (caller cannot tell
|
||||||
|
// whether the UUID belongs to nobody or to another workspace).
|
||||||
|
func TestActivityHandler_SinceID_CrossWorkspaceCursor_410(t *testing.T) {
|
||||||
|
mock := setupTestDB(t)
|
||||||
|
|
||||||
|
// Cursor exists in DB but the WHERE workspace_id = $2 filter excludes
|
||||||
|
// it — sqlmock returns no rows, which is what Postgres would do.
|
||||||
|
mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
|
||||||
|
WithArgs("act-other-ws", "ws-1").
|
||||||
|
WillReturnError(sql.ErrNoRows)
|
||||||
|
|
||||||
|
broadcaster := newTestBroadcaster()
|
||||||
|
handler := NewActivityHandler(broadcaster)
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
c, _ := gin.CreateTestContext(w)
|
||||||
|
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||||
|
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?since_id=act-other-ws", nil)
|
||||||
|
|
||||||
|
handler.List(c)
|
||||||
|
|
||||||
|
if w.Code != http.StatusGone {
|
||||||
|
t.Fatalf("cross-workspace cursor: expected 410, got %d: %s", w.Code, w.Body.String())
|
||||||
|
}
|
||||||
|
if err := mock.ExpectationsWereMet(); err != nil {
|
||||||
|
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestActivityHandler_SinceID_CombinedWithSinceSecs: both filters apply
|
||||||
|
// together (AND). Argument order in the main query: workspace_id,
|
||||||
|
// since_secs, cursorTime, limit. Sanity-checks the placeholder index
|
||||||
|
// arithmetic in the query builder.
|
||||||
|
func TestActivityHandler_SinceID_CombinedWithSinceSecs(t *testing.T) {
|
||||||
|
mock := setupTestDB(t)
|
||||||
|
|
||||||
|
cursorID := "act-c"
|
||||||
|
cursorTime := time.Date(2026, 4, 30, 4, 0, 0, 0, time.UTC)
|
||||||
|
|
||||||
|
mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
|
||||||
|
WithArgs(cursorID, "ws-1").
|
||||||
|
WillReturnRows(sqlmock.NewRows([]string{"created_at"}).AddRow(cursorTime))
|
||||||
|
|
||||||
|
mock.ExpectQuery("SELECT id, workspace_id, activity_type").
|
||||||
|
WithArgs("ws-1", 600, cursorTime, 100).
|
||||||
|
WillReturnRows(newActivityRows())
|
||||||
|
|
||||||
|
broadcaster := newTestBroadcaster()
|
||||||
|
handler := NewActivityHandler(broadcaster)
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
c, _ := gin.CreateTestContext(w)
|
||||||
|
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||||
|
c.Request = httptest.NewRequest("GET",
|
||||||
|
"/workspaces/ws-1/activity?since_secs=600&since_id="+cursorID, nil)
|
||||||
|
|
||||||
|
handler.List(c)
|
||||||
|
|
||||||
|
if w.Code != http.StatusOK {
|
||||||
|
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||||
|
}
|
||||||
|
if err := mock.ExpectationsWereMet(); err != nil {
|
||||||
|
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user