diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go index 0ae9f1b8..4f7cf98e 100644 --- a/workspace-server/internal/handlers/activity.go +++ b/workspace-server/internal/handlers/activity.go @@ -2,7 +2,9 @@ package handlers import ( "context" + "database/sql" "encoding/json" + "errors" "fmt" "log" "net/http" @@ -23,7 +25,7 @@ func NewActivityHandler(b *events.Broadcaster) *ActivityHandler { return &ActivityHandler{broadcaster: b} } -// List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs= +// List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs=&since_id= // // since_secs filters to activity_logs.created_at >= NOW() - INTERVAL '$N seconds'. // Optional, additive — callers that don't pass it get today's behavior (the @@ -33,12 +35,29 @@ func NewActivityHandler(b *events.Broadcaster) *ActivityHandler { // Capped at 30 days (2_592_000s) — anything older has typically been paged // out anyway, and a defensive ceiling keeps a paranoid client from triggering // a full-table scan via since_secs=99999999999. Closes #2268. +// +// since_id is a CURSOR for poll-mode workspaces (#2339 PR 3). The agent +// passes the id of the last activity_logs row it has consumed; the server +// returns rows STRICTLY AFTER that cursor in chronological (ASC) order so +// the agent processes events in the order they were recorded. Telegram +// getUpdates / Slack RTM shape — same proven pattern. +// +// Cross-workspace safety: the cursor lookup is scoped by workspace_id, so a +// caller cannot peek at another workspace's activity by guessing its UUIDs. +// +// Cursor-not-found: returns 410 Gone. The client should reset its cursor +// (omit since_id) and re-fetch the recent backlog. This avoids the silent +// loss-window where a pruned cursor silently filters everything out. +// +// since_id + since_secs together: both filters apply (AND). Output is ASC +// when since_id is set (polling order), DESC otherwise (recent feed order). func (h *ActivityHandler) List(c *gin.Context) { workspaceID := c.Param("id") activityType := c.Query("type") source := c.Query("source") // "canvas" = source_id IS NULL, "agent" = source_id IS NOT NULL limitStr := c.DefaultQuery("limit", "100") sinceSecsStr := c.Query("since_secs") + sinceID := c.Query("since_id") limit := 100 if n, err := strconv.Atoi(limitStr); err == nil && n > 0 { @@ -65,6 +84,37 @@ func (h *ActivityHandler) List(c *gin.Context) { sinceSecs = n } + // Resolve since_id cursor (if set) BEFORE building the main query so we + // can 410 cleanly when the cursor row is gone — and so the cursor's + // created_at is bound as a regular timestamp parameter (not a subquery) + // for clean sqlmock matching and to keep the planner predictable. + // + // The lookup is scoped by workspace_id: a caller cannot enumerate or + // peek at another workspace's events by passing a UUID belonging to a + // different workspace. Mismatched-workspace cursor → 410, same as + // "row not found" — both indicate the cursor is no longer usable for + // this caller, no information leak. + var cursorTime time.Time + usingCursor := false + if sinceID != "" { + err := db.DB.QueryRowContext(c.Request.Context(), + `SELECT created_at FROM activity_logs WHERE id = $1 AND workspace_id = $2`, + sinceID, workspaceID, + ).Scan(&cursorTime) + if errors.Is(err, sql.ErrNoRows) { + c.JSON(http.StatusGone, gin.H{ + "error": "since_id cursor not found (row may have been pruned or belongs to a different workspace); omit since_id to reset", + }) + return + } + if err != nil { + log.Printf("Activity since_id cursor lookup error for ws=%s id=%s: %v", workspaceID, sinceID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "cursor lookup failed"}) + return + } + usingCursor = true + } + // Build query with optional filters query := `SELECT id, workspace_id, activity_type, source_id, target_id, method, summary, request_body, response_body, tool_trace, duration_ms, status, error_detail, created_at @@ -94,8 +144,22 @@ func (h *ActivityHandler) List(c *gin.Context) { args = append(args, sinceSecs) argIdx++ } + if usingCursor { + // Strictly after — never replay the cursor row itself. + query += fmt.Sprintf(" AND created_at > $%d", argIdx) + args = append(args, cursorTime) + argIdx++ + } - query += fmt.Sprintf(" ORDER BY created_at DESC LIMIT $%d", argIdx) + // Polling clients (since_id) need oldest-first within the new window so + // they process events in recorded order. The recent-feed view (no + // since_id) keeps DESC — that's the canvas/UI shape and changing it + // would surprise existing callers. + if usingCursor { + query += fmt.Sprintf(" ORDER BY created_at ASC LIMIT $%d", argIdx) + } else { + query += fmt.Sprintf(" ORDER BY created_at DESC LIMIT $%d", argIdx) + } args = append(args, limit) rows, err := db.DB.QueryContext(c.Request.Context(), query, args...) diff --git a/workspace-server/internal/handlers/activity_since_id_test.go b/workspace-server/internal/handlers/activity_since_id_test.go new file mode 100644 index 00000000..6c2dc53f --- /dev/null +++ b/workspace-server/internal/handlers/activity_since_id_test.go @@ -0,0 +1,156 @@ +package handlers + +import ( + "database/sql" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/gin-gonic/gin" +) + +// Tests for the since_id cursor on GET /workspaces/:id/activity (#2339 PR 3). +// +// Cursor shape: Telegram getUpdates / Slack RTM. The polling agent passes +// the id of the last activity_logs row it processed; the server returns +// rows STRICTLY AFTER that cursor in ASC order. Cross-workspace lookups +// return 410 to prevent UUID-guessing peeks at other workspaces' events. + +// TestActivityHandler_SinceID_ReturnsNewerASC: with a valid cursor the +// handler does the cursor lookup, then queries with the cursor's +// created_at as a > filter and ASC ordering — the polling shape. +func TestActivityHandler_SinceID_ReturnsNewerASC(t *testing.T) { + mock := setupTestDB(t) + + cursorID := "act-cursor-42" + cursorTime := time.Date(2026, 4, 30, 5, 0, 0, 0, time.UTC) + + // Step 1: cursor lookup — must include workspace_id scope so a UUID + // from another workspace can't be used. + mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`). + WithArgs(cursorID, "ws-1"). + WillReturnRows(sqlmock.NewRows([]string{"created_at"}).AddRow(cursorTime)) + + // Step 2: main query with the cursor's created_at as a > filter, + // ASC ordering. Args: workspace_id, cursorTime, limit. + mock.ExpectQuery("SELECT id, workspace_id, activity_type"). + WithArgs("ws-1", cursorTime, 100). + WillReturnRows(newActivityRows()) + + broadcaster := newTestBroadcaster() + handler := NewActivityHandler(broadcaster) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-1"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?since_id="+cursorID, nil) + + handler.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestActivityHandler_SinceID_CursorNotFound_410: cursor row doesn't exist +// (pruned, never existed, or wrong UUID). Server returns 410 Gone so the +// client knows to reset its cursor — silent empty results would cause a +// stuck-poll bug where the agent never sees new events. +func TestActivityHandler_SinceID_CursorNotFound_410(t *testing.T) { + mock := setupTestDB(t) + + mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`). + WithArgs("act-gone", "ws-1"). + WillReturnError(sql.ErrNoRows) + + broadcaster := newTestBroadcaster() + handler := NewActivityHandler(broadcaster) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-1"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?since_id=act-gone", nil) + + handler.List(c) + + if w.Code != http.StatusGone { + t.Fatalf("expected 410, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestActivityHandler_SinceID_CrossWorkspaceCursor_410: a caller passes a +// UUID that belongs to a different workspace. The cursor lookup is scoped +// by workspace_id so the row is "not found" from this caller's perspective — +// same 410 path as the pruned case. No information leak (caller cannot tell +// whether the UUID belongs to nobody or to another workspace). +func TestActivityHandler_SinceID_CrossWorkspaceCursor_410(t *testing.T) { + mock := setupTestDB(t) + + // Cursor exists in DB but the WHERE workspace_id = $2 filter excludes + // it — sqlmock returns no rows, which is what Postgres would do. + mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`). + WithArgs("act-other-ws", "ws-1"). + WillReturnError(sql.ErrNoRows) + + broadcaster := newTestBroadcaster() + handler := NewActivityHandler(broadcaster) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-1"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?since_id=act-other-ws", nil) + + handler.List(c) + + if w.Code != http.StatusGone { + t.Fatalf("cross-workspace cursor: expected 410, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestActivityHandler_SinceID_CombinedWithSinceSecs: both filters apply +// together (AND). Argument order in the main query: workspace_id, +// since_secs, cursorTime, limit. Sanity-checks the placeholder index +// arithmetic in the query builder. +func TestActivityHandler_SinceID_CombinedWithSinceSecs(t *testing.T) { + mock := setupTestDB(t) + + cursorID := "act-c" + cursorTime := time.Date(2026, 4, 30, 4, 0, 0, 0, time.UTC) + + mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`). + WithArgs(cursorID, "ws-1"). + WillReturnRows(sqlmock.NewRows([]string{"created_at"}).AddRow(cursorTime)) + + mock.ExpectQuery("SELECT id, workspace_id, activity_type"). + WithArgs("ws-1", 600, cursorTime, 100). + WillReturnRows(newActivityRows()) + + broadcaster := newTestBroadcaster() + handler := NewActivityHandler(broadcaster) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-1"}} + c.Request = httptest.NewRequest("GET", + "/workspaces/ws-1/activity?since_secs=600&since_id="+cursorID, nil) + + handler.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +}