fix(activity): deterministic since_id feed ordering — monotonic seq tiebreaker (#2339) #2258
@@ -380,12 +380,18 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
||||
// "row not found" — both indicate the cursor is no longer usable for
|
||||
// this caller, no information leak.
|
||||
var cursorTime time.Time
|
||||
var cursorSeq int64
|
||||
usingCursor := false
|
||||
if sinceID != "" {
|
||||
// Resolve BOTH ordering-key components of the cursor row. The feed is
|
||||
// ordered by (created_at, seq), so the strictly-after filter below must
|
||||
// compare the full tuple — comparing created_at alone silently drops a
|
||||
// row written in the SAME microsecond as the cursor row (the boundary
|
||||
// skip the since_id E2E intermittently tripped over).
|
||||
err := db.DB.QueryRowContext(c.Request.Context(),
|
||||
`SELECT created_at FROM activity_logs WHERE id = $1 AND workspace_id = $2`,
|
||||
`SELECT created_at, seq FROM activity_logs WHERE id = $1 AND workspace_id = $2`,
|
||||
sinceID, workspaceID,
|
||||
).Scan(&cursorTime)
|
||||
).Scan(&cursorTime, &cursorSeq)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
c.JSON(http.StatusGone, gin.H{
|
||||
"error": "since_id cursor not found (row may have been pruned or belongs to a different workspace); omit since_id to reset",
|
||||
@@ -492,10 +498,20 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
||||
argIdx++
|
||||
}
|
||||
if usingCursor {
|
||||
// Strictly after — never replay the cursor row itself.
|
||||
query += fmt.Sprintf(" AND "+actCol+"created_at > $%d", argIdx)
|
||||
args = append(args, cursorTime)
|
||||
argIdx++
|
||||
// Strictly after the cursor on the FULL ordering key (created_at, seq).
|
||||
// Tuple comparison: a row is "after" the cursor if its created_at is
|
||||
// later, OR it shares the cursor's created_at but has a higher seq.
|
||||
// This (a) never replays the cursor row itself and (b) — unlike a bare
|
||||
// `created_at > cursor` — never drops a row written in the same
|
||||
// microsecond as the cursor row. Expressed as the expanded boolean
|
||||
// rather than a row-value `(created_at, seq) > ($t, $s)` so it composes
|
||||
// with the actCol qualifier prefix and the existing placeholder/arg
|
||||
// builder cleanly.
|
||||
query += fmt.Sprintf(
|
||||
" AND ("+actCol+"created_at > $%d OR ("+actCol+"created_at = $%d AND "+actCol+"seq > $%d))",
|
||||
argIdx, argIdx, argIdx+1)
|
||||
args = append(args, cursorTime, cursorSeq)
|
||||
argIdx += 2
|
||||
}
|
||||
|
||||
// Polling clients (since_id) need oldest-first within the new window so
|
||||
@@ -503,9 +519,13 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
||||
// since_id) keeps DESC — that's the canvas/UI shape and changing it
|
||||
// would surprise existing callers.
|
||||
if usingCursor {
|
||||
query += fmt.Sprintf(" ORDER BY "+actCol+"created_at ASC LIMIT $%d", argIdx)
|
||||
// (created_at, seq) ASC — seq is the deterministic tiebreaker for rows
|
||||
// sharing a microsecond-collided created_at. Replays in recorded order.
|
||||
query += fmt.Sprintf(" ORDER BY "+actCol+"created_at ASC, "+actCol+"seq ASC LIMIT $%d", argIdx)
|
||||
} else {
|
||||
query += fmt.Sprintf(" ORDER BY "+actCol+"created_at DESC LIMIT $%d", argIdx)
|
||||
// (created_at, seq) DESC — same tiebreaker, newest-first for the
|
||||
// canvas/recent-feed shape.
|
||||
query += fmt.Sprintf(" ORDER BY "+actCol+"created_at DESC, "+actCol+"seq DESC LIMIT $%d", argIdx)
|
||||
}
|
||||
args = append(args, limit)
|
||||
|
||||
@@ -680,7 +700,8 @@ func buildSessionSearchQuery(workspaceID, query string, limit int) (string, []in
|
||||
COALESCE(status, '') AS status,
|
||||
request_body,
|
||||
response_body,
|
||||
created_at
|
||||
created_at,
|
||||
seq
|
||||
FROM activity_logs
|
||||
WHERE workspace_id = $1
|
||||
)
|
||||
@@ -702,7 +723,13 @@ func buildSessionSearchQuery(workspaceID, query string, limit int) (string, []in
|
||||
args = append(args, "%"+query+"%")
|
||||
}
|
||||
|
||||
sqlQuery += ` ORDER BY created_at DESC LIMIT $` + strconv.Itoa(len(args)+1)
|
||||
// Deterministic order: created_at alone is not unique (same-microsecond
|
||||
// rows), so tie-break on the monotonic seq — same fix as the since_id feed
|
||||
// (§ No flakes: no unstable sorts, even on an unused surface). `seq` is
|
||||
// projected through the session_items CTE above so this outer ORDER BY can
|
||||
// reference it — the outer SELECT can only sort on the CTE's output columns,
|
||||
// not on activity_logs directly.
|
||||
sqlQuery += ` ORDER BY created_at DESC, seq DESC LIMIT $` + strconv.Itoa(len(args)+1)
|
||||
args = append(args, limit)
|
||||
return sqlQuery, args
|
||||
}
|
||||
|
||||
@@ -0,0 +1,211 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
// activity_seq_backfill_integration_test.go — REAL Postgres proof of the
|
||||
// invariant the 20260604000000_activity_logs_seq.up.sql migration guarantees:
|
||||
// every activity_logs row carries a NON-NULL `seq`, both for rows that existed
|
||||
// before the migration ran (assigned during the ALTER TABLE rewrite) and for
|
||||
// rows created afterward via the normal INSERT path (assigned by the IDENTITY
|
||||
// default). This is the coverage CR2 (#2339 review) correctly flagged as
|
||||
// missing on PR #2258.
|
||||
//
|
||||
// WHY THIS IS A SEPARATE TEST from activity_since_id_ordering_integration_test.go:
|
||||
// that test pins the *ordering* contract (same-microsecond rows come back in a
|
||||
// deterministic (created_at, seq) order). THIS test pins the *backfill* contract
|
||||
// — that `seq` is never NULL — and the consequence the reviewer doubted: a
|
||||
// pre-existing/backfilled row is usable as a since_id cursor because its seq is
|
||||
// non-null, so the tuple cursor `(created_at, seq)` the handler builds is well
|
||||
// defined for it.
|
||||
//
|
||||
// EMPIRICAL BASIS (PostgreSQL 16.13, the prod PG version):
|
||||
// - `ALTER TABLE activity_logs ADD COLUMN seq BIGINT GENERATED BY DEFAULT AS
|
||||
// IDENTITY` rewrites the table and assigns seq to EXISTING rows in physical
|
||||
// table-scan order — they are NON-NULL, not left NULL as the review claimed.
|
||||
// - The identity sequence then advances ABOVE max(seq), so the next INSERT
|
||||
// that omits seq gets max+1 with no collision.
|
||||
// Run against any Postgres 15/16 the integration harness boots — the property
|
||||
// holds on both.
|
||||
//
|
||||
// Run with (same harness as activity_delegation_a2a_integration_test.go):
|
||||
//
|
||||
// docker run --rm -d --name pg-integration \
|
||||
// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \
|
||||
// -p 55432:5432 postgres:15-alpine
|
||||
// sleep 4
|
||||
// # apply migrations (incl. 20260604000000_activity_logs_seq.up.sql) then:
|
||||
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
|
||||
// go test -tags=integration ./internal/handlers/ -run Integration_ActivityLogs_Seq
|
||||
//
|
||||
// WATCH-IT-FAIL: if `seq` were left nullable / un-backfilled (the failure mode
|
||||
// the reviewer hypothesized), the NULL-count assertion in _NoNull trips, and
|
||||
// the since_id-on-a-backfilled-row case in _SinceIDOnBackfilledRow trips because
|
||||
// the handler cannot read a non-null seq for the cursor row. With the migration
|
||||
// as written both are green every run.
|
||||
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// TestIntegration_ActivityLogs_SeqBackfill_NoNull pins the core migration
|
||||
// invariant: AFTER migrations have run, NO activity_logs row may have a NULL
|
||||
// seq — neither rows that the seedActivityRowAt path inserts (IDENTITY default)
|
||||
// nor any row the schema carries. It also proves the IDENTITY sequence keeps
|
||||
// producing distinct, non-null seq for fresh inserts (no collision, no NULL).
|
||||
//
|
||||
// This is the assertion that would FAIL if the ALTER had left existing rows
|
||||
// with NULL seq (the reviewer's claim) — table-scan backfill makes it pass.
|
||||
func TestIntegration_ActivityLogs_SeqBackfill_NoNull(t *testing.T) {
|
||||
conn := integrationDB_ActivityDelegationA2A(t)
|
||||
_ = conn
|
||||
wsID := seedWorkspace(t, conn, "test-2151-seq-backfill-nonull")
|
||||
|
||||
// Insert several rows via the normal path. seq is left to the IDENTITY
|
||||
// default — exactly how production writes activity_logs.
|
||||
t0 := time.Date(2026, 6, 4, 9, 0, 0, 0, time.UTC)
|
||||
const n = 5
|
||||
ids := make([]string, 0, n)
|
||||
for i := 0; i < n; i++ {
|
||||
ids = append(ids, seedActivityRowAt(t, wsID, "backfill-row", t0.Add(time.Duration(i)*time.Second)))
|
||||
}
|
||||
|
||||
// (a) No row in this workspace may have a NULL seq. If the column were
|
||||
// un-backfilled / nullable this is > 0 and the test fails.
|
||||
var nullCount int
|
||||
if err := db.DB.QueryRowContext(context.Background(),
|
||||
`SELECT COUNT(*) FROM activity_logs WHERE workspace_id = $1 AND seq IS NULL`,
|
||||
wsID,
|
||||
).Scan(&nullCount); err != nil {
|
||||
t.Fatalf("null-seq count query: %v", err)
|
||||
}
|
||||
if nullCount != 0 {
|
||||
t.Fatalf("found %d activity_logs rows with NULL seq — migration did NOT backfill/assign seq", nullCount)
|
||||
}
|
||||
|
||||
// Belt-and-suspenders: the GLOBAL invariant (no NULL seq anywhere in the
|
||||
// table) is what the migration actually guarantees. Assert it too, so a
|
||||
// regression that nulls seq for rows written by some other path is caught.
|
||||
var globalNull int
|
||||
if err := db.DB.QueryRowContext(context.Background(),
|
||||
`SELECT COUNT(*) FROM activity_logs WHERE seq IS NULL`,
|
||||
).Scan(&globalNull); err != nil {
|
||||
t.Fatalf("global null-seq count query: %v", err)
|
||||
}
|
||||
if globalNull != 0 {
|
||||
t.Fatalf("found %d activity_logs rows table-wide with NULL seq — seq must be non-null for every row", globalNull)
|
||||
}
|
||||
|
||||
// (b) The IDENTITY sequence yields DISTINCT, monotonic, non-null seq for
|
||||
// the rows we just inserted (proves the normal insert path gets a real seq,
|
||||
// and that the sequence advanced past any backfilled max instead of
|
||||
// colliding). We read them back in insert order and require strictly
|
||||
// increasing, all-non-null seq.
|
||||
rows, err := db.DB.QueryContext(context.Background(),
|
||||
`SELECT seq FROM activity_logs WHERE workspace_id = $1 ORDER BY created_at ASC, seq ASC`,
|
||||
wsID,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("read-back seq query: %v", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
var seqs []int64
|
||||
for rows.Next() {
|
||||
var s *int64 // pointer so a NULL would scan as nil rather than 0
|
||||
if err := rows.Scan(&s); err != nil {
|
||||
t.Fatalf("scan seq: %v", err)
|
||||
}
|
||||
if s == nil {
|
||||
t.Fatal("a freshly-inserted activity_logs row has NULL seq — IDENTITY default did not fire")
|
||||
}
|
||||
seqs = append(seqs, *s)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
t.Fatalf("rows err: %v", err)
|
||||
}
|
||||
if len(seqs) != n {
|
||||
t.Fatalf("expected %d rows, read back %d", n, len(seqs))
|
||||
}
|
||||
for i := 1; i < len(seqs); i++ {
|
||||
if seqs[i] <= seqs[i-1] {
|
||||
t.Fatalf("seq not strictly increasing in insert order: %v (IDENTITY collision / reuse)", seqs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestIntegration_ActivityLogs_SeqBackfill_SinceIDOnBackfilledRow pins the
|
||||
// consequence the reviewer doubted: a row whose seq came from the migration /
|
||||
// IDENTITY (i.e. NOT explicitly set by the caller) is usable as a since_id
|
||||
// cursor, and a SECOND row sharing its exact created_at microsecond is returned
|
||||
// (not dropped). This proves the handler's (created_at, seq) tuple cursor
|
||||
// resolves a same-timestamp boundary that a created_at-only cursor would drop,
|
||||
// AND that the cursor row's seq is non-null (else the handler could not build
|
||||
// the tuple at all).
|
||||
//
|
||||
// Distinct from _BoundaryRowSameMicrosecondNotSkipped in the ordering test:
|
||||
// here the explicit angle under test is "the cursor row's seq is a
|
||||
// migration/IDENTITY-assigned (backfilled-style) value, non-null, and the
|
||||
// handler uses it" — i.e. the backfill behavior is what makes the boundary
|
||||
// resolution work, pinned head-on.
|
||||
func TestIntegration_ActivityLogs_SeqBackfill_SinceIDOnBackfilledRow(t *testing.T) {
|
||||
conn := integrationDB_ActivityDelegationA2A(t)
|
||||
_ = conn
|
||||
wsID := seedWorkspace(t, conn, "test-2151-seq-backfill-sinceid")
|
||||
|
||||
tSame := time.Date(2026, 6, 4, 10, 0, 0, 0, time.UTC)
|
||||
// Cursor row: seq comes purely from the IDENTITY default (never set by
|
||||
// the caller) — the same assignment mechanism the migration uses to
|
||||
// backfill pre-existing rows. The "next" row shares the exact created_at
|
||||
// microsecond and is inserted afterward, so it gets a strictly higher seq.
|
||||
cursorID := seedActivityRowAt(t, wsID, "sinceid-cursor", tSame)
|
||||
nextID := seedActivityRowAt(t, wsID, "sinceid-next-same-us", tSame)
|
||||
|
||||
// Prove the precondition the reviewer doubted: the cursor row's seq is
|
||||
// NON-NULL, so the handler can read it to build the (created_at, seq)
|
||||
// tuple. If it were NULL the handler's cursor lookup would yield a NULL
|
||||
// seq and the strictly-after tuple comparison would mis-behave.
|
||||
var cursorSeq *int64
|
||||
if err := db.DB.QueryRowContext(context.Background(),
|
||||
`SELECT seq FROM activity_logs WHERE id = $1`, cursorID,
|
||||
).Scan(&cursorSeq); err != nil {
|
||||
t.Fatalf("read cursor seq: %v", err)
|
||||
}
|
||||
if cursorSeq == nil {
|
||||
t.Fatal("cursor row has NULL seq — a since_id cursor on a backfilled-style row would be unusable")
|
||||
}
|
||||
|
||||
h := NewActivityHandler(nil)
|
||||
c, w := newTestGinContext()
|
||||
c.Params = gin.Params{{Key: "id", Value: wsID}}
|
||||
q := c.Request.URL.Query()
|
||||
q.Set("since_id", cursorID)
|
||||
q.Set("type", "a2a_receive")
|
||||
q.Set("limit", "10")
|
||||
c.Request.URL.RawQuery = q.Encode()
|
||||
|
||||
h.List(c)
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("List returned %d, want 200: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp []map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("unmarshal: %v", err)
|
||||
}
|
||||
// Exactly the one same-microsecond row after the cursor — present (not
|
||||
// dropped by a strict created_at-only filter) and the cursor itself
|
||||
// excluded (strictly-after on the full tuple).
|
||||
if len(resp) != 1 {
|
||||
t.Fatalf("same-microsecond row after backfilled-style cursor dropped: expected 1 row, got %d: %+v",
|
||||
len(resp), resp)
|
||||
}
|
||||
if got, _ := resp[0]["id"].(string); got != nextID {
|
||||
t.Fatalf("expected boundary row id %s, got %s", nextID, got)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,162 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
// activity_since_id_ordering_integration_test.go — REAL Postgres proof that
|
||||
// the poll-mode since_id activity feed (#2339) is DETERMINISTICALLY ordered
|
||||
// even when multiple rows collide on the same created_at microsecond.
|
||||
//
|
||||
// This is the test that the original bug report mis-labeled a "flake".
|
||||
// sqlmock cannot catch it: sqlmock returns rows in the order the test stuffs
|
||||
// them, so it can never reveal a non-deterministic ORDER BY. Only a real
|
||||
// planner over real same-created_at rows exposes it.
|
||||
//
|
||||
// Run with (same harness as activity_delegation_a2a_integration_test.go):
|
||||
//
|
||||
// docker run --rm -d --name pg-integration \
|
||||
// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \
|
||||
// -p 55432:5432 postgres:15-alpine
|
||||
// sleep 4
|
||||
// # apply migrations (incl. 20260604000000_activity_logs_seq.up.sql) then:
|
||||
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
|
||||
// go test -tags=integration ./internal/handlers/ -run Integration_SinceID
|
||||
//
|
||||
// WATCH-IT-FAIL: against the pre-fix handler (ORDER BY created_at only, no
|
||||
// seq tiebreaker, and `created_at > cursor` strict) this test is unstable —
|
||||
// the equal-created_at rows come back in arbitrary planner order so the
|
||||
// ordered-id assertion fails intermittently, and the same-microsecond
|
||||
// boundary row is dropped so the count assertion fails. With the fix
|
||||
// (ORDER BY created_at, seq + tuple cursor) it is green every run.
|
||||
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// seedActivityRowAt inserts one activity_logs row with an explicit created_at
|
||||
// (so the test can force microsecond-equal collisions) and a unique summary;
|
||||
// returns the generated id. seq is left to the IDENTITY default — Postgres
|
||||
// assigns it in INSERT order, which is the deterministic tiebreaker under test.
|
||||
// db.DB has been hot-swapped to the integration connection by
|
||||
// integrationDB_ActivityDelegationA2A(t) in the calling test.
|
||||
func seedActivityRowAt(t *testing.T, wsID, summary string, createdAt time.Time) string {
|
||||
t.Helper()
|
||||
var id string
|
||||
err := db.DB.QueryRowContext(context.Background(), `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, summary, status, created_at)
|
||||
VALUES ($1, 'a2a_receive', $2, 'ok', $3)
|
||||
RETURNING id
|
||||
`, wsID, summary, createdAt).Scan(&id)
|
||||
if err != nil {
|
||||
t.Fatalf("seedActivityRowAt(%q): %v", summary, err)
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
// TestIntegration_SinceID_StableOrderingSameMicrosecond proves the feed is
|
||||
// deterministic when rows share a created_at, AND that the same-microsecond
|
||||
// boundary row immediately after the cursor is NOT dropped.
|
||||
func TestIntegration_SinceID_StableOrderingSameMicrosecond(t *testing.T) {
|
||||
conn := integrationDB_ActivityDelegationA2A(t)
|
||||
_ = conn
|
||||
wsID := seedWorkspace(t, conn, "test-2151-sinceid-ordering")
|
||||
|
||||
// One earlier row to serve as the cursor (the "last processed" row).
|
||||
tCursor := time.Date(2026, 6, 4, 12, 0, 0, 0, time.UTC)
|
||||
cursorID := seedActivityRowAt(t, wsID, "cursor-row", tCursor)
|
||||
|
||||
// Three rows that ALL collide on the exact same created_at microsecond,
|
||||
// inserted in a known order. Pre-fix, ORDER BY created_at alone returns
|
||||
// these in arbitrary planner order.
|
||||
tEqual := time.Date(2026, 6, 4, 12, 0, 1, 0, time.UTC)
|
||||
idA := seedActivityRowAt(t, wsID, "equal-A", tEqual)
|
||||
idB := seedActivityRowAt(t, wsID, "equal-B", tEqual)
|
||||
idCc := seedActivityRowAt(t, wsID, "equal-C", tEqual)
|
||||
wantOrder := []string{idA, idB, idCc}
|
||||
|
||||
// Drive the handler exactly as a polling client would.
|
||||
h := NewActivityHandler(nil)
|
||||
c, w := newTestGinContext()
|
||||
c.Params = gin.Params{{Key: "id", Value: wsID}}
|
||||
q := c.Request.URL.Query()
|
||||
q.Set("since_id", cursorID)
|
||||
q.Set("type", "a2a_receive")
|
||||
q.Set("limit", "10")
|
||||
c.Request.URL.RawQuery = q.Encode()
|
||||
|
||||
h.List(c)
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("List returned %d, want 200: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp []map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("unmarshal: %v", err)
|
||||
}
|
||||
|
||||
// All three equal-created_at rows must be present (boundary not dropped)
|
||||
// and the cursor row itself must be excluded (strictly-after).
|
||||
if len(resp) != len(wantOrder) {
|
||||
t.Fatalf("expected %d rows after cursor (the 3 equal-created_at rows), got %d: %+v",
|
||||
len(wantOrder), len(resp), resp)
|
||||
}
|
||||
|
||||
gotOrder := make([]string, len(resp))
|
||||
for i, row := range resp {
|
||||
idVal, _ := row["id"].(string)
|
||||
gotOrder[i] = idVal
|
||||
}
|
||||
for i := range wantOrder {
|
||||
if gotOrder[i] != wantOrder[i] {
|
||||
t.Fatalf("non-deterministic ordering: got id order %v, want %v (seq tiebreaker not applied)",
|
||||
gotOrder, wantOrder)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestIntegration_SinceID_BoundaryRowSameMicrosecondNotSkipped isolates the
|
||||
// cursor-boundary bug: a row written in the SAME microsecond as the cursor
|
||||
// row (but with a higher seq) must still be returned. Pre-fix the strict
|
||||
// `created_at > cursor` filter silently dropped it.
|
||||
func TestIntegration_SinceID_BoundaryRowSameMicrosecondNotSkipped(t *testing.T) {
|
||||
conn := integrationDB_ActivityDelegationA2A(t)
|
||||
_ = conn
|
||||
wsID := seedWorkspace(t, conn, "test-2151-sinceid-boundary")
|
||||
|
||||
tSame := time.Date(2026, 6, 4, 13, 0, 0, 0, time.UTC)
|
||||
// Cursor row and the next row share the exact same created_at; the next
|
||||
// row is inserted afterwards so it gets a higher seq.
|
||||
cursorID := seedActivityRowAt(t, wsID, "boundary-cursor", tSame)
|
||||
nextID := seedActivityRowAt(t, wsID, "boundary-next-same-us", tSame)
|
||||
|
||||
h := NewActivityHandler(nil)
|
||||
c, w := newTestGinContext()
|
||||
c.Params = gin.Params{{Key: "id", Value: wsID}}
|
||||
q := c.Request.URL.Query()
|
||||
q.Set("since_id", cursorID)
|
||||
q.Set("type", "a2a_receive")
|
||||
q.Set("limit", "10")
|
||||
c.Request.URL.RawQuery = q.Encode()
|
||||
|
||||
h.List(c)
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("List returned %d, want 200: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp []map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("unmarshal: %v", err)
|
||||
}
|
||||
if len(resp) != 1 {
|
||||
t.Fatalf("same-microsecond boundary row dropped: expected exactly the 1 next row, got %d rows: %+v",
|
||||
len(resp), resp)
|
||||
}
|
||||
if got, _ := resp[0]["id"].(string); got != nextID {
|
||||
t.Fatalf("expected boundary row id %s, got %s", nextID, got)
|
||||
}
|
||||
}
|
||||
@@ -26,17 +26,21 @@ func TestActivityHandler_SinceID_ReturnsNewerASC(t *testing.T) {
|
||||
|
||||
cursorID := "act-cursor-42"
|
||||
cursorTime := time.Date(2026, 4, 30, 5, 0, 0, 0, time.UTC)
|
||||
cursorSeq := int64(42)
|
||||
|
||||
// Step 1: cursor lookup — must include workspace_id scope so a UUID
|
||||
// from another workspace can't be used.
|
||||
mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
|
||||
// from another workspace can't be used. Now resolves BOTH ordering-key
|
||||
// components (created_at, seq) so the strictly-after filter can compare
|
||||
// the full tuple.
|
||||
mock.ExpectQuery(`SELECT created_at, seq FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
|
||||
WithArgs(cursorID, "ws-1").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"created_at"}).AddRow(cursorTime))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"created_at", "seq"}).AddRow(cursorTime, cursorSeq))
|
||||
|
||||
// Step 2: main query with the cursor's created_at as a > filter,
|
||||
// ASC ordering. Args: workspace_id, cursorTime, limit.
|
||||
// Step 2: main query with the cursor's (created_at, seq) as a tuple
|
||||
// strictly-after filter, (created_at, seq) ASC ordering.
|
||||
// Args: workspace_id, cursorTime, cursorSeq, limit.
|
||||
mock.ExpectQuery("SELECT id, workspace_id, activity_type").
|
||||
WithArgs("ws-1", cursorTime, 100).
|
||||
WithArgs("ws-1", cursorTime, cursorSeq, 100).
|
||||
WillReturnRows(newActivityRows())
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
@@ -64,7 +68,7 @@ func TestActivityHandler_SinceID_ReturnsNewerASC(t *testing.T) {
|
||||
func TestActivityHandler_SinceID_CursorNotFound_410(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
|
||||
mock.ExpectQuery(`SELECT created_at, seq FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
|
||||
WithArgs("act-gone", "ws-1").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
@@ -96,7 +100,7 @@ func TestActivityHandler_SinceID_CrossWorkspaceCursor_410(t *testing.T) {
|
||||
|
||||
// Cursor exists in DB but the WHERE workspace_id = $2 filter excludes
|
||||
// it — sqlmock returns no rows, which is what Postgres would do.
|
||||
mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
|
||||
mock.ExpectQuery(`SELECT created_at, seq FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
|
||||
WithArgs("act-other-ws", "ws-1").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
@@ -120,20 +124,23 @@ func TestActivityHandler_SinceID_CrossWorkspaceCursor_410(t *testing.T) {
|
||||
|
||||
// TestActivityHandler_SinceID_CombinedWithSinceSecs: both filters apply
|
||||
// together (AND). Argument order in the main query: workspace_id,
|
||||
// since_secs, cursorTime, limit. Sanity-checks the placeholder index
|
||||
// arithmetic in the query builder.
|
||||
// since_secs, cursorTime, cursorSeq, limit. Sanity-checks the placeholder
|
||||
// index arithmetic in the query builder (the cursor now binds TWO args —
|
||||
// the (created_at, seq) tuple — so the cursor shifts the trailing limit
|
||||
// placeholder by two, not one).
|
||||
func TestActivityHandler_SinceID_CombinedWithSinceSecs(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
cursorID := "act-c"
|
||||
cursorTime := time.Date(2026, 4, 30, 4, 0, 0, 0, time.UTC)
|
||||
cursorSeq := int64(7)
|
||||
|
||||
mock.ExpectQuery(`SELECT created_at FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
|
||||
mock.ExpectQuery(`SELECT created_at, seq FROM activity_logs WHERE id = \$1 AND workspace_id = \$2`).
|
||||
WithArgs(cursorID, "ws-1").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"created_at"}).AddRow(cursorTime))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"created_at", "seq"}).AddRow(cursorTime, cursorSeq))
|
||||
|
||||
mock.ExpectQuery("SELECT id, workspace_id, activity_type").
|
||||
WithArgs("ws-1", 600, cursorTime, 100).
|
||||
WithArgs("ws-1", 600, cursorTime, cursorSeq, 100).
|
||||
WillReturnRows(newActivityRows())
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
-- Rollback for 20260604000000_activity_logs_seq.up.sql.
|
||||
-- Drops the feed-ordering index and the monotonic seq column.
|
||||
-- Run manually by an operator via psql; the boot-time runner never applies
|
||||
-- *.down.sql (see RunMigrations in internal/db/postgres.go, issue #211).
|
||||
|
||||
DROP INDEX IF EXISTS idx_activity_ws_created_seq;
|
||||
|
||||
ALTER TABLE activity_logs
|
||||
DROP COLUMN IF EXISTS seq;
|
||||
@@ -0,0 +1,54 @@
|
||||
-- Add a monotonic `seq` tiebreaker to activity_logs to make the poll-mode
|
||||
-- since_id activity feed (#2339) deterministically ordered.
|
||||
--
|
||||
-- ROOT CAUSE this fixes: the feed orders by created_at ASC/DESC with NO
|
||||
-- tiebreaker, and activity_logs.id is a random gen_random_uuid() — there is
|
||||
-- no monotonic column to break ties. Two rows inserted in the same
|
||||
-- microsecond (back-to-back A2A logging) share a created_at and come back in
|
||||
-- arbitrary planner order, so the E2E intermittently sees
|
||||
-- hello-from-e2e-3 before hello-from-e2e-2. Not a flake — a missing
|
||||
-- tiebreaker. (Second, related bug fixed in the handler: the since_id cursor
|
||||
-- filtered `created_at > cursor` strictly, silently dropping a row written in
|
||||
-- the same microsecond as the cursor row. The composite key below lets the
|
||||
-- handler compare the full (created_at, seq) tuple.)
|
||||
--
|
||||
-- `seq` is a GENERATED BY DEFAULT AS IDENTITY BIGINT — a UNIQUE,
|
||||
-- monotonic-once-assigned tiebreaker. Precisely (verified on PostgreSQL
|
||||
-- 16.13, the prod version):
|
||||
-- * Backfill: adding the IDENTITY column to a populated table REWRITES the
|
||||
-- table and assigns `seq` to every EXISTING row during the ALTER, in
|
||||
-- PHYSICAL TABLE-SCAN order (NOT NULL — existing rows do get a value).
|
||||
-- That order is not guaranteed to equal historical insertion order.
|
||||
-- * The identity sequence then advances ABOVE max(seq), so every subsequent
|
||||
-- INSERT that omits `seq` gets a fresh value strictly greater than the
|
||||
-- backfilled max — collision-free with the backfilled rows.
|
||||
-- * GENERATED BY DEFAULT (not ALWAYS) so existing INSERTs that don't name
|
||||
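-- `seq` keep working and a caller may still override it if ever needed.
-- Caveat: an identity column is implicitly NOT NULL but carries no UNIQUE
-- constraint, so `seq` stays unique only while every writer omits it.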
|
||||
--
|
||||
-- What `seq` is NOT, and why that's fine:
|
||||
-- * NOT guaranteed gap-free — rolled-back transactions burn sequence values.
|
||||
-- * NOT a strict commit-order guarantee under concurrency — two concurrent
|
||||
-- INSERTs may commit in the opposite order to the `seq` values they drew.
|
||||
-- Neither property is needed. The feed only requires a TOTAL, STABLE
|
||||
-- tiebreaker so that (created_at, seq) is a deterministic order: for any two
|
||||
-- rows it always sorts them the same way and never ties. `seq` being unique
|
||||
-- and non-null on every row delivers exactly that. Same-created_at rows were
|
||||
-- returned in ARBITRARY order before this migration; afterward they have a
|
||||
-- fixed, repeatable order — strictly better, never worse. New traffic is fully
|
||||
-- deterministic; the backfill makes historical rows deterministic too.
|
||||
--
|
||||
-- Idempotent: ADD COLUMN IF NOT EXISTS + CREATE INDEX IF NOT EXISTS so the
|
||||
-- boot-time runner (and the CI migrate-replay step) can re-apply this safely.
|
||||
|
||||
ALTER TABLE activity_logs
|
||||
ADD COLUMN IF NOT EXISTS seq BIGINT GENERATED BY DEFAULT AS IDENTITY;
|
||||
|
||||
-- Composite index supporting the feed query: WHERE workspace_id = $1
|
||||
-- AND created_at <cmp> $t ORDER BY created_at, seq. The (workspace_id,
|
||||
-- created_at, seq) prefix serves both the ASC cursor path and the DESC recent
|
||||
-- path (Postgres reads the same btree backwards for DESC). This is distinct
|
||||
-- from migration 009's idx_activity_ws_type_time (workspace_id, activity_type,
|
||||
-- created_at) — that one is type-prefixed and can't drive a type-agnostic feed
|
||||
-- scan — and from 048's per-peer source_id/target_id indexes.
|
||||
CREATE INDEX IF NOT EXISTS idx_activity_ws_created_seq
|
||||
ON activity_logs (workspace_id, created_at, seq);
|
||||
Reference in New Issue
Block a user