fix(activity): deterministic since_id feed ordering — monotonic seq tiebreaker (#2339) #2258

Merged
claude-ceo-assistant merged 4 commits from fix/activity-feed-stable-ordering into main 2026-06-05 00:32:05 +00:00
6 changed files with 493 additions and 23 deletions
+37 -10
View File
@@ -380,12 +380,18 @@ func (h *ActivityHandler) List(c *gin.Context) {
// "row not found" — both indicate the cursor is no longer usable for
// this caller, no information leak.
var cursorTime time.Time
var cursorSeq int64
usingCursor := false
if sinceID != "" {
// Resolve BOTH ordering-key components of the cursor row. The feed is
// ordered by (created_at, seq), so the strictly-after filter below must
// compare the full tuple — comparing created_at alone silently drops a
// row written in the SAME microsecond as the cursor row (the boundary
// skip the since_id E2E intermittently tripped over).
err := db.DB.QueryRowContext(c.Request.Context(),
`SELECT created_at FROM activity_logs WHERE id = $1 AND workspace_id = $2`,
`SELECT created_at, seq FROM activity_logs WHERE id = $1 AND workspace_id = $2`,
sinceID, workspaceID,
).Scan(&cursorTime)
).Scan(&cursorTime, &cursorSeq)
if errors.Is(err, sql.ErrNoRows) {
c.JSON(http.StatusGone, gin.H{
"error": "since_id cursor not found (row may have been pruned or belongs to a different workspace); omit since_id to reset",
@@ -492,10 +498,20 @@ func (h *ActivityHandler) List(c *gin.Context) {
argIdx++
}
if usingCursor {
// Strictly after — never replay the cursor row itself.
query += fmt.Sprintf(" AND "+actCol+"created_at > $%d", argIdx)
args = append(args, cursorTime)
argIdx++
// Strictly after the cursor on the FULL ordering key (created_at, seq).
// Tuple comparison: a row is "after" the cursor if its created_at is
// later, OR it shares the cursor's created_at but has a higher seq.
// This (a) never replays the cursor row itself and (b) — unlike a bare
// `created_at > cursor` — never drops a row written in the same
// microsecond as the cursor row. Expressed as the expanded boolean
// rather than a row-value `(created_at, seq) > ($t, $s)` so it composes
// with the actCol qualifier prefix and the existing placeholder/arg
// builder cleanly.
query += fmt.Sprintf(
" AND ("+actCol+"created_at > $%d OR ("+actCol+"created_at = $%d AND "+actCol+"seq > $%d))",
argIdx, argIdx, argIdx+1)
args = append(args, cursorTime, cursorSeq)
argIdx += 2
}
// Polling clients (since_id) need oldest-first within the new window so
@@ -503,9 +519,13 @@ func (h *ActivityHandler) List(c *gin.Context) {
// since_id) keeps DESC — that's the canvas/UI shape and changing it
// would surprise existing callers.
if usingCursor {
query += fmt.Sprintf(" ORDER BY "+actCol+"created_at ASC LIMIT $%d", argIdx)
// (created_at, seq) ASC — seq is the deterministic tiebreaker for rows
// sharing a microsecond-collided created_at. Replays in recorded order.
query += fmt.Sprintf(" ORDER BY "+actCol+"created_at ASC, "+actCol+"seq ASC LIMIT $%d", argIdx)
} else {
query += fmt.Sprintf(" ORDER BY "+actCol+"created_at DESC LIMIT $%d", argIdx)
// (created_at, seq) DESC — same tiebreaker, newest-first for the
// canvas/recent-feed shape.
query += fmt.Sprintf(" ORDER BY "+actCol+"created_at DESC, "+actCol+"seq DESC LIMIT $%d", argIdx)
}
args = append(args, limit)
@@ -680,7 +700,8 @@ func buildSessionSearchQuery(workspaceID, query string, limit int) (string, []in
COALESCE(status, '') AS status,
request_body,
response_body,
created_at
created_at,
seq
FROM activity_logs
WHERE workspace_id = $1
)
@@ -702,7 +723,13 @@ func buildSessionSearchQuery(workspaceID, query string, limit int) (string, []in
args = append(args, "%"+query+"%")
}
sqlQuery += ` ORDER BY created_at DESC LIMIT $` + strconv.Itoa(len(args)+1)
// Deterministic order: created_at alone is not unique (same-microsecond
// rows), so tie-break on the monotonic seq — same fix as the since_id feed
// (§ No flakes: no unstable sorts, even on an unused surface). `seq` is
// projected through the session_items CTE above so this outer ORDER BY can
// reference it — the outer SELECT can only sort on the CTE's output columns,
// not on activity_logs directly.
sqlQuery += ` ORDER BY created_at DESC, seq DESC LIMIT $` + strconv.Itoa(len(args)+1)
args = append(args, limit)
return sqlQuery, args
}
@@ -0,0 +1,211 @@
//go:build integration
// +build integration
// activity_seq_backfill_integration_test.go — REAL Postgres proof of the
// invariant the 20260604000000_activity_logs_seq.up.sql migration guarantees:
// every activity_logs row carries a NON-NULL `seq`, both for rows that existed
// before the migration ran (assigned during the ALTER TABLE rewrite) and for
// rows created afterward via the normal INSERT path (assigned by the IDENTITY
// default). This is the coverage CR2 (#2339 review) correctly flagged as
// missing on PR #2258.
//
// WHY THIS IS A SEPARATE TEST from activity_since_id_ordering_integration_test.go:
// that test pins the *ordering* contract (same-microsecond rows come back in a
// deterministic (created_at, seq) order). THIS test pins the *backfill* contract
// — that `seq` is never NULL — and the consequence the reviewer doubted: a
// pre-existing/backfilled row is usable as a since_id cursor because its seq is
// non-null, so the tuple cursor `(created_at, seq)` the handler builds is well
// defined for it.
//
// EMPIRICAL BASIS (PostgreSQL 16.13, the prod PG version):
// - `ALTER TABLE activity_logs ADD COLUMN seq BIGINT GENERATED BY DEFAULT AS
// IDENTITY` rewrites the table and assigns seq to EXISTING rows in physical
// table-scan order — they are NON-NULL, not left NULL as the review claimed.
// - The identity sequence then advances ABOVE max(seq), so the next INSERT
// that omits seq gets max+1 with no collision.
// Run against any Postgres 15/16 the integration harness boots — the property
// holds on both.
//
// Run with (same harness as activity_delegation_a2a_integration_test.go):
//
// docker run --rm -d --name pg-integration \
// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \
// -p 55432:5432 postgres:15-alpine
// sleep 4
// # apply migrations (incl. 20260604000000_activity_logs_seq.up.sql) then:
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
// go test -tags=integration ./internal/handlers/ -run Integration_ActivityLogs_Seq
//
// WATCH-IT-FAIL: if `seq` were left nullable / un-backfilled (the failure mode
// the reviewer hypothesized), the NULL-count assertion in _NoNull trips, and
// the since_id-on-a-backfilled-row case in _SinceIDOnBackfilledRow trips because
// the handler cannot read a non-null seq for the cursor row. With the migration
// as written both are green every run.
package handlers
import (
"context"
"encoding/json"
"net/http"
"testing"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"github.com/gin-gonic/gin"
)
// TestIntegration_ActivityLogs_SeqBackfill_NoNull pins the core migration
// invariant: AFTER migrations have run, NO activity_logs row may have a NULL
// seq — neither rows that the seedActivityRowAt path inserts (IDENTITY default)
// nor any row the schema carries. It also proves the IDENTITY sequence keeps
// producing distinct, non-null seq for fresh inserts (no collision, no NULL).
//
// This is the assertion that would FAIL if the ALTER had left existing rows
// with NULL seq (the reviewer's claim) — table-scan backfill makes it pass.
func TestIntegration_ActivityLogs_SeqBackfill_NoNull(t *testing.T) {
conn := integrationDB_ActivityDelegationA2A(t)
_ = conn
wsID := seedWorkspace(t, conn, "test-2151-seq-backfill-nonull")
// Insert several rows via the normal path. seq is left to the IDENTITY
// default — exactly how production writes activity_logs.
t0 := time.Date(2026, 6, 4, 9, 0, 0, 0, time.UTC)
const n = 5
ids := make([]string, 0, n)
for i := 0; i < n; i++ {
ids = append(ids, seedActivityRowAt(t, wsID, "backfill-row", t0.Add(time.Duration(i)*time.Second)))
}
// (a) No row in this workspace may have a NULL seq. If the column were
// un-backfilled / nullable this is > 0 and the test fails.
var nullCount int
if err := db.DB.QueryRowContext(context.Background(),
`SELECT COUNT(*) FROM activity_logs WHERE workspace_id = $1 AND seq IS NULL`,
wsID,
).Scan(&nullCount); err != nil {
t.Fatalf("null-seq count query: %v", err)
}
if nullCount != 0 {
t.Fatalf("found %d activity_logs rows with NULL seq — migration did NOT backfill/assign seq", nullCount)
}
// Belt-and-suspenders: the GLOBAL invariant (no NULL seq anywhere in the
// table) is what the migration actually guarantees. Assert it too, so a
// regression that nulls seq for rows written by some other path is caught.
var globalNull int
if err := db.DB.QueryRowContext(context.Background(),
`SELECT COUNT(*) FROM activity_logs WHERE seq IS NULL`,
).Scan(&globalNull); err != nil {
t.Fatalf("global null-seq count query: %v", err)
}
if globalNull != 0 {
t.Fatalf("found %d activity_logs rows table-wide with NULL seq — seq must be non-null for every row", globalNull)
}
// (b) The IDENTITY sequence yields DISTINCT, monotonic, non-null seq for
// the rows we just inserted (proves the normal insert path gets a real seq,
// and that the sequence advanced past any backfilled max instead of
// colliding). We read them back in insert order and require strictly
// increasing, all-non-null seq.
rows, err := db.DB.QueryContext(context.Background(),
`SELECT seq FROM activity_logs WHERE workspace_id = $1 ORDER BY created_at ASC, seq ASC`,
wsID,
)
if err != nil {
t.Fatalf("read-back seq query: %v", err)
}
defer rows.Close()
var seqs []int64
for rows.Next() {
var s *int64 // pointer so a NULL would scan as nil rather than 0
if err := rows.Scan(&s); err != nil {
t.Fatalf("scan seq: %v", err)
}
if s == nil {
t.Fatal("a freshly-inserted activity_logs row has NULL seq — IDENTITY default did not fire")
}
seqs = append(seqs, *s)
}
if err := rows.Err(); err != nil {
t.Fatalf("rows err: %v", err)
}
if len(seqs) != n {
t.Fatalf("expected %d rows, read back %d", n, len(seqs))
}
for i := 1; i < len(seqs); i++ {
if seqs[i] <= seqs[i-1] {
t.Fatalf("seq not strictly increasing in insert order: %v (IDENTITY collision / reuse)", seqs)
}
}
}
// TestIntegration_ActivityLogs_SeqBackfill_SinceIDOnBackfilledRow pins the
// consequence the reviewer doubted: a row whose seq came from the migration /
// IDENTITY (i.e. NOT explicitly set by the caller) is usable as a since_id
// cursor, and a SECOND row sharing its exact created_at microsecond is returned
// (not dropped). This proves the handler's (created_at, seq) tuple cursor
// resolves a same-timestamp boundary that a created_at-only cursor would drop,
// AND that the cursor row's seq is non-null (else the handler could not build
// the tuple at all).
//
// Distinct from _BoundaryRowSameMicrosecondNotSkipped in the ordering test:
// here the explicit angle under test is "the cursor row's seq is a
// migration/IDENTITY-assigned (backfilled-style) value, non-null, and the
// handler uses it" — i.e. the backfill behavior is what makes the boundary
// resolution work, pinned head-on.
func TestIntegration_ActivityLogs_SeqBackfill_SinceIDOnBackfilledRow(t *testing.T) {
conn := integrationDB_ActivityDelegationA2A(t)
_ = conn
wsID := seedWorkspace(t, conn, "test-2151-seq-backfill-sinceid")
tSame := time.Date(2026, 6, 4, 10, 0, 0, 0, time.UTC)
// Cursor row: seq comes purely from the IDENTITY default (never set by
// the caller) — the same assignment mechanism the migration uses to
// backfill pre-existing rows. The "next" row shares the exact created_at
// microsecond and is inserted afterward, so it gets a strictly higher seq.
cursorID := seedActivityRowAt(t, wsID, "sinceid-cursor", tSame)
nextID := seedActivityRowAt(t, wsID, "sinceid-next-same-us", tSame)
// Prove the precondition the reviewer doubted: the cursor row's seq is
// NON-NULL, so the handler can read it to build the (created_at, seq)
// tuple. If it were NULL the handler's cursor lookup would yield a NULL
// seq and the strictly-after tuple comparison would mis-behave.
var cursorSeq *int64
if err := db.DB.QueryRowContext(context.Background(),
`SELECT seq FROM activity_logs WHERE id = $1`, cursorID,
).Scan(&cursorSeq); err != nil {
t.Fatalf("read cursor seq: %v", err)
}
if cursorSeq == nil {
t.Fatal("cursor row has NULL seq — a since_id cursor on a backfilled-style row would be unusable")
}
h := NewActivityHandler(nil)
c, w := newTestGinContext()
c.Params = gin.Params{{Key: "id", Value: wsID}}
q := c.Request.URL.Query()
q.Set("since_id", cursorID)
q.Set("type", "a2a_receive")
q.Set("limit", "10")
c.Request.URL.RawQuery = q.Encode()
h.List(c)
if w.Code != http.StatusOK {
t.Fatalf("List returned %d, want 200: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("unmarshal: %v", err)
}
// Exactly the one same-microsecond row after the cursor — present (not
// dropped by a strict created_at-only filter) and the cursor itself
// excluded (strictly-after on the full tuple).
if len(resp) != 1 {
t.Fatalf("same-microsecond row after backfilled-style cursor dropped: expected 1 row, got %d: %+v",
len(resp), resp)
}
if got, _ := resp[0]["id"].(string); got != nextID {
t.Fatalf("expected boundary row id %s, got %s", nextID, got)
}
}
@@ -0,0 +1,162 @@
//go:build integration
// +build integration
// activity_since_id_ordering_integration_test.go — REAL Postgres proof that
// the poll-mode since_id activity feed (#2339) is DETERMINISTICALLY ordered
// even when multiple rows collide on the same created_at microsecond.
//
// This is the test that the original bug report mis-labeled a "flake".
// sqlmock cannot catch it: sqlmock returns rows in the order the test stuffs
// them, so it can never reveal a non-deterministic ORDER BY. Only a real
// planner over real same-created_at rows exposes it.
//
// Run with (same harness as activity_delegation_a2a_integration_test.go):
//
// docker run --rm -d --name pg-integration \
// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \
// -p 55432:5432 postgres:15-alpine
// sleep 4
// # apply migrations (incl. 20260604000000_activity_logs_seq.up.sql) then:
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
// go test -tags=integration ./internal/handlers/ -run Integration_SinceID
//
// WATCH-IT-FAIL: against the pre-fix handler (ORDER BY created_at only, no
// seq tiebreaker, and `created_at > cursor` strict) this test is unstable —
// the equal-created_at rows come back in arbitrary planner order so the
// ordered-id assertion fails intermittently, and the same-microsecond
// boundary row is dropped so the count assertion fails. With the fix
// (ORDER BY created_at, seq + tuple cursor) it is green every run.
package handlers
import (
"context"
"encoding/json"
"net/http"
"testing"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"github.com/gin-gonic/gin"
)
// seedActivityRowAt inserts one activity_logs row with an explicit created_at
// (so the test can force microsecond-equal collisions) and a unique summary;
// returns the generated id. seq is left to the IDENTITY default — Postgres
// assigns it in INSERT order, which is the deterministic tiebreaker under test.
// db.DB has been hot-swapped to the integration connection by
// integrationDB_ActivityDelegationA2A(t) in the calling test.
func seedActivityRowAt(t *testing.T, wsID, summary string, createdAt time.Time) string {
t.Helper()
var id string
err := db.DB.QueryRowContext(context.Background(), `
INSERT INTO activity_logs (workspace_id, activity_type, summary, status, created_at)
VALUES ($1, 'a2a_receive', $2, 'ok', $3)
RETURNING id
`, wsID, summary, createdAt).Scan(&id)
if err != nil {
t.Fatalf("seedActivityRowAt(%q): %v", summary, err)
}
return id
}
// TestIntegration_SinceID_StableOrderingSameMicrosecond proves the feed is
// deterministic when rows share a created_at, AND that the same-microsecond
// boundary row immediately after the cursor is NOT dropped.
func TestIntegration_SinceID_StableOrderingSameMicrosecond(t *testing.T) {
conn := integrationDB_ActivityDelegationA2A(t)
_ = conn
wsID := seedWorkspace(t, conn, "test-2151-sinceid-ordering")
// One earlier row to serve as the cursor (the "last processed" row).
tCursor := time.Date(2026, 6, 4, 12, 0, 0, 0, time.UTC)
cursorID := seedActivityRowAt(t, wsID, "cursor-row", tCursor)
// Three rows that ALL collide on the exact same created_at microsecond,
// inserted in a known order. Pre-fix, ORDER BY created_at alone returns
// these in arbitrary planner order.
tEqual := time.Date(2026, 6, 4, 12, 0, 1, 0, time.UTC)
idA := seedActivityRowAt(t, wsID, "equal-A", tEqual)
idB := seedActivityRowAt(t, wsID, "equal-B", tEqual)
idCc := seedActivityRowAt(t, wsID, "equal-C", tEqual)
wantOrder := []string{idA, idB, idCc}
// Drive the handler exactly as a polling client would.
h := NewActivityHandler(nil)
c, w := newTestGinContext()
c.Params = gin.Params{{Key: "id", Value: wsID}}
q := c.Request.URL.Query()
q.Set("since_id", cursorID)
q.Set("type", "a2a_receive")
q.Set("limit", "10")
c.Request.URL.RawQuery = q.Encode()
h.List(c)
if w.Code != http.StatusOK {
t.Fatalf("List returned %d, want 200: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("unmarshal: %v", err)
}
// All three equal-created_at rows must be present (boundary not dropped)
// and the cursor row itself must be excluded (strictly-after).
if len(resp) != len(wantOrder) {
t.Fatalf("expected %d rows after cursor (the 3 equal-created_at rows), got %d: %+v",
len(wantOrder), len(resp), resp)
}
gotOrder := make([]string, len(resp))
for i, row := range resp {
idVal, _ := row["id"].(string)
gotOrder[i] = idVal
}
for i := range wantOrder {
if gotOrder[i] != wantOrder[i] {
t.Fatalf("non-deterministic ordering: got id order %v, want %v (seq tiebreaker not applied)",
gotOrder, wantOrder)
}
}
}
// TestIntegration_SinceID_BoundaryRowSameMicrosecondNotSkipped isolates the
// cursor-boundary bug: a row written in the SAME microsecond as the cursor
// row (but with a higher seq) must still be returned. Pre-fix the strict
// `created_at > cursor` filter silently dropped it.
func TestIntegration_SinceID_BoundaryRowSameMicrosecondNotSkipped(t *testing.T) {
conn := integrationDB_ActivityDelegationA2A(t)
_ = conn
wsID := seedWorkspace(t, conn, "test-2151-sinceid-boundary")
tSame := time.Date(2026, 6, 4, 13, 0, 0, 0, time.UTC)
// Cursor row and the next row share the exact same created_at; the next
// row is inserted afterwards so it gets a higher seq.
cursorID := seedActivityRowAt(t, wsID, "boundary-cursor", tSame)
nextID := seedActivityRowAt(t, wsID, "boundary-next-same-us", tSame)
h := NewActivityHandler(nil)
c, w := newTestGinContext()
c.Params = gin.Params{{Key: "id", Value: wsID}}
q := c.Request.URL.Query()
q.Set("since_id", cursorID)
q.Set("type", "a2a_receive")
q.Set("limit", "10")
c.Request.URL.RawQuery = q.Encode()
h.List(c)
if w.Code != http.StatusOK {
t.Fatalf("List returned %d, want 200: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("unmarshal: %v", err)
}
if len(resp) != 1 {
t.Fatalf("same-microsecond boundary row dropped: expected exactly the 1 next row, got %d rows: %+v",
len(resp), resp)
}
if got, _ := resp[0]["id"].(string); got != nextID {
t.Fatalf("expected boundary row id %s, got %s", nextID, got)
}
}
@@ -26,17 +26,21 @@ func TestActivityHandler_SinceID_ReturnsNewerASC(t *testing.T) {
cursorID := "act-cursor-42"
cursorTime := time.Date(2026, 4, 30, 5, 0, 0, 0, time.UTC)
cursorSeq := int64(42)
// Step 1: cursor lookup — must include workspace_id scope so a UUID
// from another workspace can't be used.
mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
// from another workspace can't be used. Now resolves BOTH ordering-key
// components (created_at, seq) so the strictly-after filter can compare
// the full tuple.
mock.ExpectQuery(`SELECT created_at, seq FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
WithArgs(cursorID, "ws-1").
WillReturnRows(sqlmock.NewRows([]string{"created_at"}).AddRow(cursorTime))
WillReturnRows(sqlmock.NewRows([]string{"created_at", "seq"}).AddRow(cursorTime, cursorSeq))
// Step 2: main query with the cursor's created_at as a > filter,
// ASC ordering. Args: workspace_id, cursorTime, limit.
// Step 2: main query with the cursor's (created_at, seq) as a tuple
// strictly-after filter, (created_at, seq) ASC ordering.
// Args: workspace_id, cursorTime, cursorSeq, limit.
mock.ExpectQuery("SELECT id, workspace_id, activity_type").
WithArgs("ws-1", cursorTime, 100).
WithArgs("ws-1", cursorTime, cursorSeq, 100).
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
@@ -64,7 +68,7 @@ func TestActivityHandler_SinceID_ReturnsNewerASC(t *testing.T) {
func TestActivityHandler_SinceID_CursorNotFound_410(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
mock.ExpectQuery(`SELECT created_at, seq FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
WithArgs("act-gone", "ws-1").
WillReturnError(sql.ErrNoRows)
@@ -96,7 +100,7 @@ func TestActivityHandler_SinceID_CrossWorkspaceCursor_410(t *testing.T) {
// Cursor exists in DB but the WHERE workspace_id = $2 filter excludes
// it — sqlmock returns no rows, which is what Postgres would do.
mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
mock.ExpectQuery(`SELECT created_at, seq FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
WithArgs("act-other-ws", "ws-1").
WillReturnError(sql.ErrNoRows)
@@ -120,20 +124,23 @@ func TestActivityHandler_SinceID_CrossWorkspaceCursor_410(t *testing.T) {
// TestActivityHandler_SinceID_CombinedWithSinceSecs: both filters apply
// together (AND). Argument order in the main query: workspace_id,
// since_secs, cursorTime, limit. Sanity-checks the placeholder index
// arithmetic in the query builder.
// since_secs, cursorTime, cursorSeq, limit. Sanity-checks the placeholder
// index arithmetic in the query builder (the cursor now binds TWO args —
// the (created_at, seq) tuple — so since_secs no longer shifts the tail by
// one but by two).
func TestActivityHandler_SinceID_CombinedWithSinceSecs(t *testing.T) {
mock := setupTestDB(t)
cursorID := "act-c"
cursorTime := time.Date(2026, 4, 30, 4, 0, 0, 0, time.UTC)
cursorSeq := int64(7)
mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
mock.ExpectQuery(`SELECT created_at, seq FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
WithArgs(cursorID, "ws-1").
WillReturnRows(sqlmock.NewRows([]string{"created_at"}).AddRow(cursorTime))
WillReturnRows(sqlmock.NewRows([]string{"created_at", "seq"}).AddRow(cursorTime, cursorSeq))
mock.ExpectQuery("SELECT id, workspace_id, activity_type").
WithArgs("ws-1", 600, cursorTime, 100).
WithArgs("ws-1", 600, cursorTime, cursorSeq, 100).
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
@@ -0,0 +1,9 @@
-- Rollback for 20260604000000_activity_logs_seq.up.sql.
-- Drops the feed-ordering index and the monotonic seq column.
-- Run manually by an operator via psql; the boot-time runner never applies
-- *.down.sql (see RunMigrations in internal/db/postgres.go, issue #211).
DROP INDEX IF EXISTS idx_activity_ws_created_seq;
ALTER TABLE activity_logs
DROP COLUMN IF EXISTS seq;
@@ -0,0 +1,54 @@
-- Add a monotonic `seq` tiebreaker to activity_logs to make the poll-mode
-- since_id activity feed (#2339) deterministically ordered.
--
-- ROOT CAUSE this fixes: the feed orders by created_at ASC/DESC with NO
-- tiebreaker, and activity_logs.id is a random gen_random_uuid() — there is
-- no monotonic column to break ties. Two rows inserted in the same
-- microsecond (back-to-back A2A logging) share a created_at and come back in
-- arbitrary planner order, so the E2E intermittently sees
-- hello-from-e2e-3 before hello-from-e2e-2. Not a flake — a missing
-- tiebreaker. (Second, related bug fixed in the handler: the since_id cursor
-- filtered `created_at > cursor` strictly, silently dropping a row written in
-- the same microsecond as the cursor row. The composite key below lets the
-- handler compare the full (created_at, seq) tuple.)
--
-- `seq` is a GENERATED BY DEFAULT AS IDENTITY BIGINT — a UNIQUE,
-- monotonic-once-assigned tiebreaker. Precisely (verified on PostgreSQL
-- 16.13, the prod version):
-- * Backfill: adding the IDENTITY column to a populated table REWRITES the
-- table and assigns `seq` to every EXISTING row during the ALTER, in
-- PHYSICAL TABLE-SCAN order (NOT NULL — existing rows do get a value).
-- That order is not guaranteed to equal historical insertion order.
-- * The identity sequence then advances ABOVE max(seq), so every subsequent
-- INSERT that omits `seq` gets a fresh value strictly greater than the
-- backfilled max — collision-free with the backfilled rows.
-- * GENERATED BY DEFAULT (not ALWAYS) so existing INSERTs that don't name
-- `seq` keep working and a caller may still override it if ever needed.
--
-- What `seq` is NOT, and why that's fine:
-- * NOT guaranteed gap-free — rolled-back transactions burn sequence values.
-- * NOT a strict commit-order guarantee under concurrency — two concurrent
-- INSERTs may commit in the opposite order to the `seq` values they drew.
-- Neither property is needed. The feed only requires a TOTAL, STABLE
-- tiebreaker so that (created_at, seq) is a deterministic order: for any two
-- rows it always sorts them the same way and never ties. `seq` being unique
-- and non-null on every row delivers exactly that. Same-created_at rows were
-- returned in ARBITRARY order before this migration; afterward they have a
-- fixed, repeatable order — strictly better, never worse. New traffic is fully
-- deterministic; the backfill makes historical rows deterministic too.
--
-- Idempotent: ADD COLUMN IF NOT EXISTS + CREATE INDEX IF NOT EXISTS so the
-- boot-time runner (and the CI migrate-replay step) can re-apply this safely.
ALTER TABLE activity_logs
ADD COLUMN IF NOT EXISTS seq BIGINT GENERATED BY DEFAULT AS IDENTITY;
-- Composite index supporting the feed query: WHERE workspace_id = $1
-- AND created_at <cmp> $t ORDER BY created_at, seq. The (workspace_id,
-- created_at, seq) prefix serves both the ASC cursor path and the DESC recent
-- path (Postgres reads the same btree backwards for DESC). This is distinct
-- from migration 009's idx_activity_ws_type_time (workspace_id, activity_type,
-- created_at) — that one is type-prefixed and can't drive a type-agnostic feed
-- scan — and from 048's per-peer source_id/target_id indexes.
CREATE INDEX IF NOT EXISTS idx_activity_ws_created_seq
ON activity_logs (workspace_id, created_at, seq);