diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go index e07f442f9..d01f5e843 100644 --- a/workspace-server/internal/handlers/activity.go +++ b/workspace-server/internal/handlers/activity.go @@ -380,12 +380,18 @@ func (h *ActivityHandler) List(c *gin.Context) { // "row not found" — both indicate the cursor is no longer usable for // this caller, no information leak. var cursorTime time.Time + var cursorSeq int64 usingCursor := false if sinceID != "" { + // Resolve BOTH ordering-key components of the cursor row. The feed is + // ordered by (created_at, seq), so the strictly-after filter below must + // compare the full tuple — comparing created_at alone silently drops a + // row written in the SAME microsecond as the cursor row (the boundary + // skip the since_id E2E intermittently tripped over). err := db.DB.QueryRowContext(c.Request.Context(), - `SELECT created_at FROM activity_logs WHERE id = $1 AND workspace_id = $2`, + `SELECT created_at, seq FROM activity_logs WHERE id = $1 AND workspace_id = $2`, sinceID, workspaceID, - ).Scan(&cursorTime) + ).Scan(&cursorTime, &cursorSeq) if errors.Is(err, sql.ErrNoRows) { c.JSON(http.StatusGone, gin.H{ "error": "since_id cursor not found (row may have been pruned or belongs to a different workspace); omit since_id to reset", @@ -492,10 +498,20 @@ func (h *ActivityHandler) List(c *gin.Context) { argIdx++ } if usingCursor { - // Strictly after — never replay the cursor row itself. - query += fmt.Sprintf(" AND "+actCol+"created_at > $%d", argIdx) - args = append(args, cursorTime) - argIdx++ + // Strictly after the cursor on the FULL ordering key (created_at, seq). + // Tuple comparison: a row is "after" the cursor if its created_at is + // later, OR it shares the cursor's created_at but has a higher seq. + // This (a) never replays the cursor row itself and (b) — unlike a bare + // `created_at > cursor` — never drops a row written in the same + // microsecond as the cursor row. Expressed as the expanded boolean + // rather than a row-value `(created_at, seq) > ($t, $s)` so it composes + // with the actCol qualifier prefix and the existing placeholder/arg + // builder cleanly. + query += fmt.Sprintf( + " AND ("+actCol+"created_at > $%d OR ("+actCol+"created_at = $%d AND "+actCol+"seq > $%d))", + argIdx, argIdx, argIdx+1) + args = append(args, cursorTime, cursorSeq) + argIdx += 2 } // Polling clients (since_id) need oldest-first within the new window so @@ -503,9 +519,13 @@ func (h *ActivityHandler) List(c *gin.Context) { // since_id) keeps DESC — that's the canvas/UI shape and changing it // would surprise existing callers. if usingCursor { - query += fmt.Sprintf(" ORDER BY "+actCol+"created_at ASC LIMIT $%d", argIdx) + // (created_at, seq) ASC — seq is the deterministic tiebreaker for rows + // sharing a microsecond-collided created_at. Replays in recorded order. + query += fmt.Sprintf(" ORDER BY "+actCol+"created_at ASC, "+actCol+"seq ASC LIMIT $%d", argIdx) } else { - query += fmt.Sprintf(" ORDER BY "+actCol+"created_at DESC LIMIT $%d", argIdx) + // (created_at, seq) DESC — same tiebreaker, newest-first for the + // canvas/recent-feed shape. + query += fmt.Sprintf(" ORDER BY "+actCol+"created_at DESC, "+actCol+"seq DESC LIMIT $%d", argIdx) } args = append(args, limit) @@ -680,7 +700,8 @@ func buildSessionSearchQuery(workspaceID, query string, limit int) (string, []in COALESCE(status, '') AS status, request_body, response_body, - created_at + created_at, + seq FROM activity_logs WHERE workspace_id = $1 ) @@ -702,7 +723,13 @@ func buildSessionSearchQuery(workspaceID, query string, limit int) (string, []in args = append(args, "%"+query+"%") } - sqlQuery += ` ORDER BY created_at DESC LIMIT $` + strconv.Itoa(len(args)+1) + // Deterministic order: created_at alone is not unique (same-microsecond + // rows), so tie-break on the monotonic seq — same fix as the since_id feed + // (§ No flakes: no unstable sorts, even on an unused surface). `seq` is + // projected through the session_items CTE above so this outer ORDER BY can + // reference it — the outer SELECT can only sort on the CTE's output columns, + // not on activity_logs directly. + sqlQuery += ` ORDER BY created_at DESC, seq DESC LIMIT $` + strconv.Itoa(len(args)+1) args = append(args, limit) return sqlQuery, args } diff --git a/workspace-server/internal/handlers/activity_seq_backfill_integration_test.go b/workspace-server/internal/handlers/activity_seq_backfill_integration_test.go new file mode 100644 index 000000000..a75026530 --- /dev/null +++ b/workspace-server/internal/handlers/activity_seq_backfill_integration_test.go @@ -0,0 +1,211 @@ +//go:build integration +// +build integration + +// activity_seq_backfill_integration_test.go — REAL Postgres proof of the +// invariant the 20260604000000_activity_logs_seq.up.sql migration guarantees: +// every activity_logs row carries a NON-NULL `seq`, both for rows that existed +// before the migration ran (assigned during the ALTER TABLE rewrite) and for +// rows created afterward via the normal INSERT path (assigned by the IDENTITY +// default). This is the coverage CR2 (#2339 review) correctly flagged as +// missing on PR #2258. +// +// WHY THIS IS A SEPARATE TEST from activity_since_id_ordering_integration_test.go: +// that test pins the *ordering* contract (same-microsecond rows come back in a +// deterministic (created_at, seq) order). THIS test pins the *backfill* contract +// — that `seq` is never NULL — and the consequence the reviewer doubted: a +// pre-existing/backfilled row is usable as a since_id cursor because its seq is +// non-null, so the tuple cursor `(created_at, seq)` the handler builds is well +// defined for it. +// +// EMPIRICAL BASIS (PostgreSQL 16.13, the prod PG version): +// - `ALTER TABLE activity_logs ADD COLUMN seq BIGINT GENERATED BY DEFAULT AS +// IDENTITY` rewrites the table and assigns seq to EXISTING rows in physical +// table-scan order — they are NON-NULL, not left NULL as the review claimed. +// - The identity sequence then advances ABOVE max(seq), so the next INSERT +// that omits seq gets max+1 with no collision. +// Run against any Postgres 15/16 the integration harness boots — the property +// holds on both. +// +// Run with (same harness as activity_delegation_a2a_integration_test.go): +// +// docker run --rm -d --name pg-integration \ +// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \ +// -p 55432:5432 postgres:15-alpine +// sleep 4 +// # apply migrations (incl. 20260604000000_activity_logs_seq.up.sql) then: +// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \ +// go test -tags=integration ./internal/handlers/ -run Integration_ActivityLogs_Seq +// +// WATCH-IT-FAIL: if `seq` were left nullable / un-backfilled (the failure mode +// the reviewer hypothesized), the NULL-count assertion in _NoNull trips, and +// the since_id-on-a-backfilled-row case in _SinceIDOnBackfilledRow trips because +// the handler cannot read a non-null seq for the cursor row. With the migration +// as written both are green every run. + +package handlers + +import ( + "context" + "encoding/json" + "net/http" + "testing" + "time" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" + "github.com/gin-gonic/gin" +) + +// TestIntegration_ActivityLogs_SeqBackfill_NoNull pins the core migration +// invariant: AFTER migrations have run, NO activity_logs row may have a NULL +// seq — neither rows that the seedActivityRowAt path inserts (IDENTITY default) +// nor any row the schema carries. It also proves the IDENTITY sequence keeps +// producing distinct, non-null seq for fresh inserts (no collision, no NULL). +// +// This is the assertion that would FAIL if the ALTER had left existing rows +// with NULL seq (the reviewer's claim) — table-scan backfill makes it pass. +func TestIntegration_ActivityLogs_SeqBackfill_NoNull(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + _ = conn + wsID := seedWorkspace(t, conn, "test-2151-seq-backfill-nonull") + + // Insert several rows via the normal path. seq is left to the IDENTITY + // default — exactly how production writes activity_logs. + t0 := time.Date(2026, 6, 4, 9, 0, 0, 0, time.UTC) + const n = 5 + ids := make([]string, 0, n) + for i := 0; i < n; i++ { + ids = append(ids, seedActivityRowAt(t, wsID, "backfill-row", t0.Add(time.Duration(i)*time.Second))) + } + + // (a) No row in this workspace may have a NULL seq. If the column were + // un-backfilled / nullable this is > 0 and the test fails. + var nullCount int + if err := db.DB.QueryRowContext(context.Background(), + `SELECT COUNT(*) FROM activity_logs WHERE workspace_id = $1 AND seq IS NULL`, + wsID, + ).Scan(&nullCount); err != nil { + t.Fatalf("null-seq count query: %v", err) + } + if nullCount != 0 { + t.Fatalf("found %d activity_logs rows with NULL seq — migration did NOT backfill/assign seq", nullCount) + } + + // Belt-and-suspenders: the GLOBAL invariant (no NULL seq anywhere in the + // table) is what the migration actually guarantees. Assert it too, so a + // regression that nulls seq for rows written by some other path is caught. + var globalNull int + if err := db.DB.QueryRowContext(context.Background(), + `SELECT COUNT(*) FROM activity_logs WHERE seq IS NULL`, + ).Scan(&globalNull); err != nil { + t.Fatalf("global null-seq count query: %v", err) + } + if globalNull != 0 { + t.Fatalf("found %d activity_logs rows table-wide with NULL seq — seq must be non-null for every row", globalNull) + } + + // (b) The IDENTITY sequence yields DISTINCT, monotonic, non-null seq for + // the rows we just inserted (proves the normal insert path gets a real seq, + // and that the sequence advanced past any backfilled max instead of + // colliding). We read them back in insert order and require strictly + // increasing, all-non-null seq. + rows, err := db.DB.QueryContext(context.Background(), + `SELECT seq FROM activity_logs WHERE workspace_id = $1 ORDER BY created_at ASC, seq ASC`, + wsID, + ) + if err != nil { + t.Fatalf("read-back seq query: %v", err) + } + defer rows.Close() + var seqs []int64 + for rows.Next() { + var s *int64 // pointer so a NULL would scan as nil rather than 0 + if err := rows.Scan(&s); err != nil { + t.Fatalf("scan seq: %v", err) + } + if s == nil { + t.Fatal("a freshly-inserted activity_logs row has NULL seq — IDENTITY default did not fire") + } + seqs = append(seqs, *s) + } + if err := rows.Err(); err != nil { + t.Fatalf("rows err: %v", err) + } + if len(seqs) != n { + t.Fatalf("expected %d rows, read back %d", n, len(seqs)) + } + for i := 1; i < len(seqs); i++ { + if seqs[i] <= seqs[i-1] { + t.Fatalf("seq not strictly increasing in insert order: %v (IDENTITY collision / reuse)", seqs) + } + } +} + +// TestIntegration_ActivityLogs_SeqBackfill_SinceIDOnBackfilledRow pins the +// consequence the reviewer doubted: a row whose seq came from the migration / +// IDENTITY (i.e. NOT explicitly set by the caller) is usable as a since_id +// cursor, and a SECOND row sharing its exact created_at microsecond is returned +// (not dropped). This proves the handler's (created_at, seq) tuple cursor +// resolves a same-timestamp boundary that a created_at-only cursor would drop, +// AND that the cursor row's seq is non-null (else the handler could not build +// the tuple at all). +// +// Distinct from _BoundaryRowSameMicrosecondNotSkipped in the ordering test: +// here the explicit angle under test is "the cursor row's seq is a +// migration/IDENTITY-assigned (backfilled-style) value, non-null, and the +// handler uses it" — i.e. the backfill behavior is what makes the boundary +// resolution work, pinned head-on. +func TestIntegration_ActivityLogs_SeqBackfill_SinceIDOnBackfilledRow(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + _ = conn + wsID := seedWorkspace(t, conn, "test-2151-seq-backfill-sinceid") + + tSame := time.Date(2026, 6, 4, 10, 0, 0, 0, time.UTC) + // Cursor row: seq comes purely from the IDENTITY default (never set by + // the caller) — the same assignment mechanism the migration uses to + // backfill pre-existing rows. The "next" row shares the exact created_at + // microsecond and is inserted afterward, so it gets a strictly higher seq. + cursorID := seedActivityRowAt(t, wsID, "sinceid-cursor", tSame) + nextID := seedActivityRowAt(t, wsID, "sinceid-next-same-us", tSame) + + // Prove the precondition the reviewer doubted: the cursor row's seq is + // NON-NULL, so the handler can read it to build the (created_at, seq) + // tuple. If it were NULL the handler's cursor lookup would yield a NULL + // seq and the strictly-after tuple comparison would mis-behave. + var cursorSeq *int64 + if err := db.DB.QueryRowContext(context.Background(), + `SELECT seq FROM activity_logs WHERE id = $1`, cursorID, + ).Scan(&cursorSeq); err != nil { + t.Fatalf("read cursor seq: %v", err) + } + if cursorSeq == nil { + t.Fatal("cursor row has NULL seq — a since_id cursor on a backfilled-style row would be unusable") + } + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + q := c.Request.URL.Query() + q.Set("since_id", cursorID) + q.Set("type", "a2a_receive") + q.Set("limit", "10") + c.Request.URL.RawQuery = q.Encode() + + h.List(c) + if w.Code != http.StatusOK { + t.Fatalf("List returned %d, want 200: %s", w.Code, w.Body.String()) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + // Exactly the one same-microsecond row after the cursor — present (not + // dropped by a strict created_at-only filter) and the cursor itself + // excluded (strictly-after on the full tuple). + if len(resp) != 1 { + t.Fatalf("same-microsecond row after backfilled-style cursor dropped: expected 1 row, got %d: %+v", + len(resp), resp) + } + if got, _ := resp[0]["id"].(string); got != nextID { + t.Fatalf("expected boundary row id %s, got %s", nextID, got) + } +} diff --git a/workspace-server/internal/handlers/activity_since_id_ordering_integration_test.go b/workspace-server/internal/handlers/activity_since_id_ordering_integration_test.go new file mode 100644 index 000000000..91bf8c88b --- /dev/null +++ b/workspace-server/internal/handlers/activity_since_id_ordering_integration_test.go @@ -0,0 +1,162 @@ +//go:build integration +// +build integration + +// activity_since_id_ordering_integration_test.go — REAL Postgres proof that +// the poll-mode since_id activity feed (#2339) is DETERMINISTICALLY ordered +// even when multiple rows collide on the same created_at microsecond. +// +// This is the test that the original bug report mis-labeled a "flake". +// sqlmock cannot catch it: sqlmock returns rows in the order the test stuffs +// them, so it can never reveal a non-deterministic ORDER BY. Only a real +// planner over real same-created_at rows exposes it. +// +// Run with (same harness as activity_delegation_a2a_integration_test.go): +// +// docker run --rm -d --name pg-integration \ +// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \ +// -p 55432:5432 postgres:15-alpine +// sleep 4 +// # apply migrations (incl. 20260604000000_activity_logs_seq.up.sql) then: +// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \ +// go test -tags=integration ./internal/handlers/ -run Integration_SinceID +// +// WATCH-IT-FAIL: against the pre-fix handler (ORDER BY created_at only, no +// seq tiebreaker, and `created_at > cursor` strict) this test is unstable — +// the equal-created_at rows come back in arbitrary planner order so the +// ordered-id assertion fails intermittently, and the same-microsecond +// boundary row is dropped so the count assertion fails. With the fix +// (ORDER BY created_at, seq + tuple cursor) it is green every run. + +package handlers + +import ( + "context" + "encoding/json" + "net/http" + "testing" + "time" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" + "github.com/gin-gonic/gin" +) + +// seedActivityRowAt inserts one activity_logs row with an explicit created_at +// (so the test can force microsecond-equal collisions) and a unique summary; +// returns the generated id. seq is left to the IDENTITY default — Postgres +// assigns it in INSERT order, which is the deterministic tiebreaker under test. +// db.DB has been hot-swapped to the integration connection by +// integrationDB_ActivityDelegationA2A(t) in the calling test. +func seedActivityRowAt(t *testing.T, wsID, summary string, createdAt time.Time) string { + t.Helper() + var id string + err := db.DB.QueryRowContext(context.Background(), ` + INSERT INTO activity_logs (workspace_id, activity_type, summary, status, created_at) + VALUES ($1, 'a2a_receive', $2, 'ok', $3) + RETURNING id + `, wsID, summary, createdAt).Scan(&id) + if err != nil { + t.Fatalf("seedActivityRowAt(%q): %v", summary, err) + } + return id +} + +// TestIntegration_SinceID_StableOrderingSameMicrosecond proves the feed is +// deterministic when rows share a created_at, AND that the same-microsecond +// boundary row immediately after the cursor is NOT dropped. +func TestIntegration_SinceID_StableOrderingSameMicrosecond(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + _ = conn + wsID := seedWorkspace(t, conn, "test-2151-sinceid-ordering") + + // One earlier row to serve as the cursor (the "last processed" row). + tCursor := time.Date(2026, 6, 4, 12, 0, 0, 0, time.UTC) + cursorID := seedActivityRowAt(t, wsID, "cursor-row", tCursor) + + // Three rows that ALL collide on the exact same created_at microsecond, + // inserted in a known order. Pre-fix, ORDER BY created_at alone returns + // these in arbitrary planner order. + tEqual := time.Date(2026, 6, 4, 12, 0, 1, 0, time.UTC) + idA := seedActivityRowAt(t, wsID, "equal-A", tEqual) + idB := seedActivityRowAt(t, wsID, "equal-B", tEqual) + idCc := seedActivityRowAt(t, wsID, "equal-C", tEqual) + wantOrder := []string{idA, idB, idCc} + + // Drive the handler exactly as a polling client would. + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + q := c.Request.URL.Query() + q.Set("since_id", cursorID) + q.Set("type", "a2a_receive") + q.Set("limit", "10") + c.Request.URL.RawQuery = q.Encode() + + h.List(c) + if w.Code != http.StatusOK { + t.Fatalf("List returned %d, want 200: %s", w.Code, w.Body.String()) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + + // All three equal-created_at rows must be present (boundary not dropped) + // and the cursor row itself must be excluded (strictly-after). + if len(resp) != len(wantOrder) { + t.Fatalf("expected %d rows after cursor (the 3 equal-created_at rows), got %d: %+v", + len(wantOrder), len(resp), resp) + } + + gotOrder := make([]string, len(resp)) + for i, row := range resp { + idVal, _ := row["id"].(string) + gotOrder[i] = idVal + } + for i := range wantOrder { + if gotOrder[i] != wantOrder[i] { + t.Fatalf("non-deterministic ordering: got id order %v, want %v (seq tiebreaker not applied)", + gotOrder, wantOrder) + } + } +} + +// TestIntegration_SinceID_BoundaryRowSameMicrosecondNotSkipped isolates the +// cursor-boundary bug: a row written in the SAME microsecond as the cursor +// row (but with a higher seq) must still be returned. Pre-fix the strict +// `created_at > cursor` filter silently dropped it. +func TestIntegration_SinceID_BoundaryRowSameMicrosecondNotSkipped(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + _ = conn + wsID := seedWorkspace(t, conn, "test-2151-sinceid-boundary") + + tSame := time.Date(2026, 6, 4, 13, 0, 0, 0, time.UTC) + // Cursor row and the next row share the exact same created_at; the next + // row is inserted afterwards so it gets a higher seq. + cursorID := seedActivityRowAt(t, wsID, "boundary-cursor", tSame) + nextID := seedActivityRowAt(t, wsID, "boundary-next-same-us", tSame) + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + q := c.Request.URL.Query() + q.Set("since_id", cursorID) + q.Set("type", "a2a_receive") + q.Set("limit", "10") + c.Request.URL.RawQuery = q.Encode() + + h.List(c) + if w.Code != http.StatusOK { + t.Fatalf("List returned %d, want 200: %s", w.Code, w.Body.String()) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(resp) != 1 { + t.Fatalf("same-microsecond boundary row dropped: expected exactly the 1 next row, got %d rows: %+v", + len(resp), resp) + } + if got, _ := resp[0]["id"].(string); got != nextID { + t.Fatalf("expected boundary row id %s, got %s", nextID, got) + } +} diff --git a/workspace-server/internal/handlers/activity_since_id_test.go b/workspace-server/internal/handlers/activity_since_id_test.go index 6c2dc53f2..58e7d344e 100644 --- a/workspace-server/internal/handlers/activity_since_id_test.go +++ b/workspace-server/internal/handlers/activity_since_id_test.go @@ -26,17 +26,21 @@ func TestActivityHandler_SinceID_ReturnsNewerASC(t *testing.T) { cursorID := "act-cursor-42" cursorTime := time.Date(2026, 4, 30, 5, 0, 0, 0, time.UTC) + cursorSeq := int64(42) // Step 1: cursor lookup — must include workspace_id scope so a UUID - // from another workspace can't be used. - mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`). + // from another workspace can't be used. Now resolves BOTH ordering-key + // components (created_at, seq) so the strictly-after filter can compare + // the full tuple. + mock.ExpectQuery(`SELECT created_at, seq FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`). WithArgs(cursorID, "ws-1"). - WillReturnRows(sqlmock.NewRows([]string{"created_at"}).AddRow(cursorTime)) + WillReturnRows(sqlmock.NewRows([]string{"created_at", "seq"}).AddRow(cursorTime, cursorSeq)) - // Step 2: main query with the cursor's created_at as a > filter, - // ASC ordering. Args: workspace_id, cursorTime, limit. + // Step 2: main query with the cursor's (created_at, seq) as a tuple + // strictly-after filter, (created_at, seq) ASC ordering. + // Args: workspace_id, cursorTime, cursorSeq, limit. mock.ExpectQuery("SELECT id, workspace_id, activity_type"). - WithArgs("ws-1", cursorTime, 100). + WithArgs("ws-1", cursorTime, cursorSeq, 100). WillReturnRows(newActivityRows()) broadcaster := newTestBroadcaster() @@ -64,7 +68,7 @@ func TestActivityHandler_SinceID_ReturnsNewerASC(t *testing.T) { func TestActivityHandler_SinceID_CursorNotFound_410(t *testing.T) { mock := setupTestDB(t) - mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`). + mock.ExpectQuery(`SELECT created_at, seq FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`). WithArgs("act-gone", "ws-1"). WillReturnError(sql.ErrNoRows) @@ -96,7 +100,7 @@ func TestActivityHandler_SinceID_CrossWorkspaceCursor_410(t *testing.T) { // Cursor exists in DB but the WHERE workspace_id = $2 filter excludes // it — sqlmock returns no rows, which is what Postgres would do. - mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`). + mock.ExpectQuery(`SELECT created_at, seq FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`). WithArgs("act-other-ws", "ws-1"). WillReturnError(sql.ErrNoRows) @@ -120,20 +124,23 @@ func TestActivityHandler_SinceID_CrossWorkspaceCursor_410(t *testing.T) { // TestActivityHandler_SinceID_CombinedWithSinceSecs: both filters apply // together (AND). Argument order in the main query: workspace_id, -// since_secs, cursorTime, limit. Sanity-checks the placeholder index -// arithmetic in the query builder. +// since_secs, cursorTime, cursorSeq, limit. Sanity-checks the placeholder +// index arithmetic in the query builder (the cursor now binds TWO args — +// the (created_at, seq) tuple — so since_secs no longer shifts the tail by +// one but by two). func TestActivityHandler_SinceID_CombinedWithSinceSecs(t *testing.T) { mock := setupTestDB(t) cursorID := "act-c" cursorTime := time.Date(2026, 4, 30, 4, 0, 0, 0, time.UTC) + cursorSeq := int64(7) - mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`). + mock.ExpectQuery(`SELECT created_at, seq FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`). WithArgs(cursorID, "ws-1"). - WillReturnRows(sqlmock.NewRows([]string{"created_at"}).AddRow(cursorTime)) + WillReturnRows(sqlmock.NewRows([]string{"created_at", "seq"}).AddRow(cursorTime, cursorSeq)) mock.ExpectQuery("SELECT id, workspace_id, activity_type"). - WithArgs("ws-1", 600, cursorTime, 100). + WithArgs("ws-1", 600, cursorTime, cursorSeq, 100). WillReturnRows(newActivityRows()) broadcaster := newTestBroadcaster() diff --git a/workspace-server/migrations/20260604000000_activity_logs_seq.down.sql b/workspace-server/migrations/20260604000000_activity_logs_seq.down.sql new file mode 100644 index 000000000..3078bb8e6 --- /dev/null +++ b/workspace-server/migrations/20260604000000_activity_logs_seq.down.sql @@ -0,0 +1,9 @@ +-- Rollback for 20260604000000_activity_logs_seq.up.sql. +-- Drops the feed-ordering index and the monotonic seq column. +-- Run manually by an operator via psql; the boot-time runner never applies +-- *.down.sql (see RunMigrations in internal/db/postgres.go, issue #211). + +DROP INDEX IF EXISTS idx_activity_ws_created_seq; + +ALTER TABLE activity_logs + DROP COLUMN IF EXISTS seq; diff --git a/workspace-server/migrations/20260604000000_activity_logs_seq.up.sql b/workspace-server/migrations/20260604000000_activity_logs_seq.up.sql new file mode 100644 index 000000000..35bfbab7c --- /dev/null +++ b/workspace-server/migrations/20260604000000_activity_logs_seq.up.sql @@ -0,0 +1,54 @@ +-- Add a monotonic `seq` tiebreaker to activity_logs to make the poll-mode +-- since_id activity feed (#2339) deterministically ordered. +-- +-- ROOT CAUSE this fixes: the feed orders by created_at ASC/DESC with NO +-- tiebreaker, and activity_logs.id is a random gen_random_uuid() — there is +-- no monotonic column to break ties. Two rows inserted in the same +-- microsecond (back-to-back A2A logging) share a created_at and come back in +-- arbitrary planner order, so the E2E intermittently sees +-- hello-from-e2e-3 before hello-from-e2e-2. Not a flake — a missing +-- tiebreaker. (Second, related bug fixed in the handler: the since_id cursor +-- filtered `created_at > cursor` strictly, silently dropping a row written in +-- the same microsecond as the cursor row. The composite key below lets the +-- handler compare the full (created_at, seq) tuple.) +-- +-- `seq` is a GENERATED BY DEFAULT AS IDENTITY BIGINT — a UNIQUE, +-- monotonic-once-assigned tiebreaker. Precisely (verified on PostgreSQL +-- 16.13, the prod version): +-- * Backfill: adding the IDENTITY column to a populated table REWRITES the +-- table and assigns `seq` to every EXISTING row during the ALTER, in +-- PHYSICAL TABLE-SCAN order (NOT NULL — existing rows do get a value). +-- That order is not guaranteed to equal historical insertion order. +-- * The identity sequence then advances ABOVE max(seq), so every subsequent +-- INSERT that omits `seq` gets a fresh value strictly greater than the +-- backfilled max — collision-free with the backfilled rows. +-- * GENERATED BY DEFAULT (not ALWAYS) so existing INSERTs that don't name +-- `seq` keep working and a caller may still override it if ever needed. +-- +-- What `seq` is NOT, and why that's fine: +-- * NOT guaranteed gap-free — rolled-back transactions burn sequence values. +-- * NOT a strict commit-order guarantee under concurrency — two concurrent +-- INSERTs may commit in the opposite order to the `seq` values they drew. +-- Neither property is needed. The feed only requires a TOTAL, STABLE +-- tiebreaker so that (created_at, seq) is a deterministic order: for any two +-- rows it always sorts them the same way and never ties. `seq` being unique +-- and non-null on every row delivers exactly that. Same-created_at rows were +-- returned in ARBITRARY order before this migration; afterward they have a +-- fixed, repeatable order — strictly better, never worse. New traffic is fully +-- deterministic; the backfill makes historical rows deterministic too. +-- +-- Idempotent: ADD COLUMN IF NOT EXISTS + CREATE INDEX IF NOT EXISTS so the +-- boot-time runner (and the CI migrate-replay step) can re-apply this safely. + +ALTER TABLE activity_logs + ADD COLUMN IF NOT EXISTS seq BIGINT GENERATED BY DEFAULT AS IDENTITY; + +-- Composite index supporting the feed query: WHERE workspace_id = $1 +-- AND created_at $t ORDER BY created_at, seq. The (workspace_id, +-- created_at, seq) prefix serves both the ASC cursor path and the DESC recent +-- path (Postgres reads the same btree backwards for DESC). This is distinct +-- from migration 009's idx_activity_ws_type_time (workspace_id, activity_type, +-- created_at) — that one is type-prefixed and can't drive a type-agnostic feed +-- scan — and from 048's per-peer source_id/target_id indexes. +CREATE INDEX IF NOT EXISTS idx_activity_ws_created_seq + ON activity_logs (workspace_id, created_at, seq);