forked from molecule-ai/molecule-core
Merge pull request #417 from Molecule-AI/feat/memory-checkpoint-reconciliation
feat(memory): optimistic-locking via if_match_version on workspace_memory writes
This commit is contained in:
commit
c545e3a276
@ -324,9 +324,9 @@ func TestMemoryHandler_List_Empty(t *testing.T) {
|
||||
setupTestRedis(t)
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
mock.ExpectQuery("SELECT key, value, expires_at, updated_at FROM workspace_memory").
|
||||
mock.ExpectQuery("SELECT key, value, version, expires_at, updated_at FROM workspace_memory").
|
||||
WithArgs("ws-1").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"key", "value", "expires_at", "updated_at"}))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"key", "value", "version", "expires_at", "updated_at"}))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@ -345,7 +345,7 @@ func TestMemoryHandler_Get_NotFound(t *testing.T) {
|
||||
setupTestRedis(t)
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
mock.ExpectQuery("SELECT key, value, expires_at, updated_at FROM workspace_memory").
|
||||
mock.ExpectQuery("SELECT key, value, version, expires_at, updated_at FROM workspace_memory").
|
||||
WithArgs("ws-1", "missing-key").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
|
||||
@ -11,9 +11,13 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// MemoryEntry is what GET returns. The Version field enables optimistic-
|
||||
// concurrency on subsequent writes — callers echo it back as
|
||||
// if_match_version to detect concurrent modification.
|
||||
type MemoryEntry struct {
|
||||
Key string `json:"key"`
|
||||
Value json.RawMessage `json:"value"`
|
||||
Version int64 `json:"version"`
|
||||
ExpiresAt *time.Time `json:"expires_at,omitempty"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
}
|
||||
@ -27,7 +31,7 @@ func (h *MemoryHandler) List(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
|
||||
rows, err := db.DB.QueryContext(c.Request.Context(), `
|
||||
SELECT key, value, expires_at, updated_at
|
||||
SELECT key, value, version, expires_at, updated_at
|
||||
FROM workspace_memory
|
||||
WHERE workspace_id = $1 AND (expires_at IS NULL OR expires_at > NOW())
|
||||
ORDER BY key
|
||||
@ -43,7 +47,7 @@ func (h *MemoryHandler) List(c *gin.Context) {
|
||||
for rows.Next() {
|
||||
var entry MemoryEntry
|
||||
var value []byte
|
||||
if err := rows.Scan(&entry.Key, &value, &entry.ExpiresAt, &entry.UpdatedAt); err != nil {
|
||||
if err := rows.Scan(&entry.Key, &value, &entry.Version, &entry.ExpiresAt, &entry.UpdatedAt); err != nil {
|
||||
log.Printf("Memory list scan error: %v", err)
|
||||
continue
|
||||
}
|
||||
@ -62,10 +66,10 @@ func (h *MemoryHandler) Get(c *gin.Context) {
|
||||
var entry MemoryEntry
|
||||
var value []byte
|
||||
err := db.DB.QueryRowContext(c.Request.Context(), `
|
||||
SELECT key, value, expires_at, updated_at
|
||||
SELECT key, value, version, expires_at, updated_at
|
||||
FROM workspace_memory
|
||||
WHERE workspace_id = $1 AND key = $2 AND (expires_at IS NULL OR expires_at > NOW())
|
||||
`, workspaceID, key).Scan(&entry.Key, &value, &entry.ExpiresAt, &entry.UpdatedAt)
|
||||
`, workspaceID, key).Scan(&entry.Key, &value, &entry.Version, &entry.ExpiresAt, &entry.UpdatedAt)
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "key not found"})
|
||||
@ -81,7 +85,24 @@ func (h *MemoryHandler) Get(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, entry)
|
||||
}
|
||||
|
||||
// Set handles POST /workspaces/:id/memory
|
||||
// Set handles POST /workspaces/:id/memory with optimistic-locking support.
|
||||
//
|
||||
// Back-compat (no if_match_version): behaves exactly as before — last-
|
||||
// write-wins upsert. Every existing agent tool keeps working unmodified.
|
||||
//
|
||||
// Optimistic-locking (if_match_version set): the write is conditional on
|
||||
// the current row version. On conflict (concurrent writer incremented
|
||||
// version since the caller read), returns 409 with the latest version so
|
||||
// the caller can re-read + retry. This closes the silent-overwrite hole
|
||||
// for orchestrators running concurrent delegation-ledger / task-queue
|
||||
// state in memory.
|
||||
//
|
||||
// Expected call pattern for conflict-free reads:
|
||||
//
|
||||
// 1. GET /memory/:key → {value, version: V}
|
||||
// 2. modify value
|
||||
// 3. POST /memory with {key, value, if_match_version: V}
|
||||
// 4. on 200 → done; on 409 → goto 1.
|
||||
func (h *MemoryHandler) Set(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
|
||||
@ -89,6 +110,10 @@ func (h *MemoryHandler) Set(c *gin.Context) {
|
||||
Key string `json:"key"`
|
||||
Value json.RawMessage `json:"value"`
|
||||
TTLSeconds *int `json:"ttl_seconds"`
|
||||
// IfMatchVersion, when non-nil, gates the write on the row's
|
||||
// current version matching this value. Mismatch → 409 + latest
|
||||
// version in the response so the caller can retry cleanly.
|
||||
IfMatchVersion *int64 `json:"if_match_version"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&body); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
@ -106,19 +131,107 @@ func (h *MemoryHandler) Set(c *gin.Context) {
|
||||
expiresAt = &t
|
||||
}
|
||||
|
||||
_, err := db.DB.ExecContext(c.Request.Context(), `
|
||||
INSERT INTO workspace_memory(id, workspace_id, key, value, expires_at, updated_at)
|
||||
VALUES(gen_random_uuid(), $1, $2, $3::jsonb, $4, NOW())
|
||||
ON CONFLICT(workspace_id, key) DO UPDATE
|
||||
SET value = $3::jsonb, expires_at = $4, updated_at = NOW()
|
||||
`, workspaceID, body.Key, string(body.Value), expiresAt)
|
||||
if err != nil {
|
||||
log.Printf("Memory set error: %v", err)
|
||||
// Path A — no version guard: unchanged last-write-wins upsert.
|
||||
if body.IfMatchVersion == nil {
|
||||
var newVersion int64
|
||||
err := db.DB.QueryRowContext(c.Request.Context(), `
|
||||
INSERT INTO workspace_memory(id, workspace_id, key, value, expires_at, updated_at, version)
|
||||
VALUES(gen_random_uuid(), $1, $2, $3::jsonb, $4, NOW(), 1)
|
||||
ON CONFLICT(workspace_id, key) DO UPDATE
|
||||
SET value = EXCLUDED.value,
|
||||
expires_at = EXCLUDED.expires_at,
|
||||
updated_at = NOW(),
|
||||
version = workspace_memory.version + 1
|
||||
RETURNING version
|
||||
`, workspaceID, body.Key, string(body.Value), expiresAt).Scan(&newVersion)
|
||||
if err != nil {
|
||||
log.Printf("Memory set error: %v", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to set memory"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"status": "ok", "key": body.Key, "version": newVersion})
|
||||
return
|
||||
}
|
||||
|
||||
// Path B — optimistic-locking guard.
|
||||
//
|
||||
// Strategy:
|
||||
// 1. Try to UPDATE the existing row with version check. RETURNING
|
||||
// the new version tells us whether the guard matched.
|
||||
// 2. If the UPDATE affected zero rows, the row either doesn't exist
|
||||
// (treat if_match_version=0 as "must not exist yet", otherwise
|
||||
// 409) or the version didn't match (409).
|
||||
//
|
||||
// We don't collapse into a single ON CONFLICT because we need the
|
||||
// "caller expected version N, current is M" response shape to be
|
||||
// accurate — ON CONFLICT DO NOTHING would hide whether it was a
|
||||
// version-mismatch or something else.
|
||||
expected := *body.IfMatchVersion
|
||||
var newVersion int64
|
||||
updateErr := db.DB.QueryRowContext(c.Request.Context(), `
|
||||
UPDATE workspace_memory
|
||||
SET value = $3::jsonb,
|
||||
expires_at = $4,
|
||||
updated_at = NOW(),
|
||||
version = version + 1
|
||||
WHERE workspace_id = $1 AND key = $2 AND version = $5
|
||||
RETURNING version
|
||||
`, workspaceID, body.Key, string(body.Value), expiresAt, expected).Scan(&newVersion)
|
||||
|
||||
if updateErr == sql.ErrNoRows {
|
||||
// Either the row doesn't exist yet, or version mismatch. Look
|
||||
// up the actual state so the 409 body carries useful context.
|
||||
var currentVersion sql.NullInt64
|
||||
probeErr := db.DB.QueryRowContext(c.Request.Context(), `
|
||||
SELECT version FROM workspace_memory
|
||||
WHERE workspace_id = $1 AND key = $2
|
||||
`, workspaceID, body.Key).Scan(¤tVersion)
|
||||
|
||||
if probeErr == sql.ErrNoRows {
|
||||
// Row absent. Caller with expected=0 means "create only" —
|
||||
// honour it. Any other expected is a 409 (tried to update a
|
||||
// non-existent key with version assertion).
|
||||
if expected == 0 {
|
||||
var createdVersion int64
|
||||
err := db.DB.QueryRowContext(c.Request.Context(), `
|
||||
INSERT INTO workspace_memory(id, workspace_id, key, value, expires_at, updated_at, version)
|
||||
VALUES(gen_random_uuid(), $1, $2, $3::jsonb, $4, NOW(), 1)
|
||||
RETURNING version
|
||||
`, workspaceID, body.Key, string(body.Value), expiresAt).Scan(&createdVersion)
|
||||
if err != nil {
|
||||
log.Printf("Memory set error (create-only path): %v", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to set memory"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"status": "ok", "key": body.Key, "version": createdVersion})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusConflict, gin.H{
|
||||
"error": "if_match_version mismatch: key does not exist",
|
||||
"expected_version": expected,
|
||||
"current_version": nil,
|
||||
})
|
||||
return
|
||||
}
|
||||
if probeErr != nil {
|
||||
log.Printf("Memory set probe error: %v", probeErr)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to probe current version"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusConflict, gin.H{
|
||||
"error": "if_match_version mismatch",
|
||||
"expected_version": expected,
|
||||
"current_version": currentVersion.Int64,
|
||||
})
|
||||
return
|
||||
}
|
||||
if updateErr != nil {
|
||||
log.Printf("Memory set conditional update error: %v", updateErr)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to set memory"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"status": "ok", "key": body.Key})
|
||||
c.JSON(http.StatusOK, gin.H{"status": "ok", "key": body.Key, "version": newVersion})
|
||||
}
|
||||
|
||||
// Delete handles DELETE /workspaces/:id/memory/:key
|
||||
|
||||
@ -21,11 +21,11 @@ func TestMemoryList_Success(t *testing.T) {
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
now := time.Now()
|
||||
rows := sqlmock.NewRows([]string{"key", "value", "expires_at", "updated_at"}).
|
||||
AddRow("api-key", []byte(`"sk-123"`), nil, now).
|
||||
AddRow("count", []byte(`42`), nil, now)
|
||||
rows := sqlmock.NewRows([]string{"key", "value", "version", "expires_at", "updated_at"}).
|
||||
AddRow("api-key", []byte(`"sk-123"`), int64(1), nil, now).
|
||||
AddRow("count", []byte(`42`), int64(3), nil, now)
|
||||
|
||||
mock.ExpectQuery("SELECT key, value, expires_at, updated_at").
|
||||
mock.ExpectQuery("SELECT key, value, version, expires_at, updated_at").
|
||||
WithArgs("ws-mem-1").
|
||||
WillReturnRows(rows)
|
||||
|
||||
@ -39,64 +39,34 @@ func TestMemoryList_Success(t *testing.T) {
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp []MemoryEntry
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to parse response: %v", err)
|
||||
}
|
||||
if len(resp) != 2 {
|
||||
t.Errorf("expected 2 entries, got %d", len(resp))
|
||||
t.Fatalf("expected 2 entries, got %d", len(resp))
|
||||
}
|
||||
if resp[0].Key != "api-key" {
|
||||
t.Errorf("expected key 'api-key', got %q", resp[0].Key)
|
||||
if resp[0].Key != "api-key" || resp[0].Version != 1 {
|
||||
t.Errorf("entry 0: got (%q, v%d), want (api-key, v1)", resp[0].Key, resp[0].Version)
|
||||
}
|
||||
if resp[1].Version != 3 {
|
||||
t.Errorf("entry 1 version: got %d, want 3", resp[1].Version)
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemoryList_Empty(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
mock.ExpectQuery("SELECT key, value, expires_at, updated_at").
|
||||
WithArgs("ws-empty").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"key", "value", "expires_at", "updated_at"}))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-empty"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-empty/memory", nil)
|
||||
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d", w.Code)
|
||||
}
|
||||
|
||||
var resp []MemoryEntry
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if len(resp) != 0 {
|
||||
t.Errorf("expected empty list, got %d entries", len(resp))
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemoryList_DBError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
mock.ExpectQuery("SELECT key, value, expires_at, updated_at").
|
||||
WithArgs("ws-dberr").
|
||||
WillReturnError(sql.ErrConnDone)
|
||||
mock.ExpectQuery("SELECT key, value, version").WithArgs("ws-dberr").WillReturnError(sql.ErrConnDone)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-dberr"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-dberr/memory", nil)
|
||||
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
@ -112,29 +82,26 @@ func TestMemoryGet_Success(t *testing.T) {
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
now := time.Now()
|
||||
mock.ExpectQuery("SELECT key, value, expires_at, updated_at").
|
||||
mock.ExpectQuery("SELECT key, value, version").
|
||||
WithArgs("ws-get", "api-key").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"key", "value", "expires_at", "updated_at"}).
|
||||
AddRow("api-key", []byte(`"sk-123"`), nil, now))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"key", "value", "version", "expires_at", "updated_at"}).
|
||||
AddRow("api-key", []byte(`"sk-123"`), int64(5), nil, now))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{
|
||||
{Key: "id", Value: "ws-get"},
|
||||
{Key: "key", Value: "api-key"},
|
||||
{Key: "id", Value: "ws-get"}, {Key: "key", Value: "api-key"},
|
||||
}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-get/memory/api-key", nil)
|
||||
|
||||
handler.Get(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp MemoryEntry
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if resp.Key != "api-key" {
|
||||
t.Errorf("expected key 'api-key', got %q", resp.Key)
|
||||
if resp.Version != 5 {
|
||||
t.Errorf("expected version 5, got %d", resp.Version)
|
||||
}
|
||||
}
|
||||
|
||||
@ -142,59 +109,30 @@ func TestMemoryGet_NotFound(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
mock.ExpectQuery("SELECT key, value, expires_at, updated_at").
|
||||
WithArgs("ws-nf", "missing-key").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
mock.ExpectQuery("SELECT key, value, version").
|
||||
WithArgs("ws-nf", "missing").WillReturnError(sql.ErrNoRows)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{
|
||||
{Key: "id", Value: "ws-nf"},
|
||||
{Key: "key", Value: "missing-key"},
|
||||
}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-nf/memory/missing-key", nil)
|
||||
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-nf"}, {Key: "key", Value: "missing"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-nf/memory/missing", nil)
|
||||
handler.Get(c)
|
||||
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
|
||||
t.Errorf("expected 404, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemoryGet_DBError(t *testing.T) {
|
||||
// ==================== POST /workspaces/:id/memory (Set — no version) ====================
|
||||
|
||||
func TestMemorySet_NoVersion_CreateOrOverwrite(t *testing.T) {
|
||||
// Back-compat path: no if_match_version — last-write-wins upsert.
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
mock.ExpectQuery("SELECT key, value, expires_at, updated_at").
|
||||
WithArgs("ws-err", "key").
|
||||
WillReturnError(sql.ErrConnDone)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{
|
||||
{Key: "id", Value: "ws-err"},
|
||||
{Key: "key", Value: "key"},
|
||||
}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-err/memory/key", nil)
|
||||
|
||||
handler.Get(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== POST /workspaces/:id/memory (Set) ====================
|
||||
|
||||
func TestMemorySet_Success(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
mock.ExpectExec("INSERT INTO workspace_memory").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectQuery("INSERT INTO workspace_memory").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"version"}).AddRow(int64(7)))
|
||||
|
||||
body := `{"key":"counter","value":42}`
|
||||
w := httptest.NewRecorder()
|
||||
@ -202,43 +140,15 @@ func TestMemorySet_Success(t *testing.T) {
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-set"}}
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-set/memory", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Set(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if resp["key"] != "counter" {
|
||||
t.Errorf("expected key 'counter', got %v", resp["key"])
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemorySet_WithTTL(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
mock.ExpectExec("INSERT INTO workspace_memory").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
body := `{"key":"temp","value":"ephemeral","ttl_seconds":3600}`
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-ttl"}}
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-ttl/memory", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Set(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
if v, ok := resp["version"].(float64); !ok || int64(v) != 7 {
|
||||
t.Errorf("response should include version=7, got %v", resp["version"])
|
||||
}
|
||||
}
|
||||
|
||||
@ -247,57 +157,136 @@ func TestMemorySet_MissingKey(t *testing.T) {
|
||||
setupTestRedis(t)
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
body := `{"value":"no-key"}`
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-nokey"}}
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-nokey/memory", bytes.NewBufferString(body))
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-nokey/memory", bytes.NewBufferString(`{"value":"no-key"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Set(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
|
||||
t.Errorf("expected 400, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemorySet_InvalidJSON(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewMemoryHandler()
|
||||
// ==================== POST /workspaces/:id/memory (Set — with if_match_version) ====================
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-bad"}}
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-bad/memory", bytes.NewBufferString("not json"))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Set(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemorySet_DBError(t *testing.T) {
|
||||
func TestMemorySet_IfMatchVersion_Match_Updates(t *testing.T) {
|
||||
// Optimistic-lock happy path: client's expected version matches current.
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
mock.ExpectExec("INSERT INTO workspace_memory").
|
||||
WillReturnError(sql.ErrConnDone)
|
||||
mock.ExpectQuery("UPDATE workspace_memory").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"version"}).AddRow(int64(6)))
|
||||
|
||||
body := `{"key":"fail","value":"oops"}`
|
||||
body := `{"key":"queue","value":[1,2,3],"if_match_version":5}`
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-set-err"}}
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-set-err/memory", bytes.NewBufferString(body))
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-lock"}}
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-lock/memory", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Set(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500, got %d: %s", w.Code, w.Body.String())
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if int64(resp["version"].(float64)) != 6 {
|
||||
t.Errorf("version should advance to 6, got %v", resp["version"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemorySet_IfMatchVersion_Mismatch_Returns409(t *testing.T) {
|
||||
// Concurrent writer incremented version while we held v=5. Caller
|
||||
// gets 409 + the current version so they can re-read + retry.
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
mock.ExpectQuery("UPDATE workspace_memory").WillReturnError(sql.ErrNoRows)
|
||||
mock.ExpectQuery("SELECT version FROM workspace_memory").
|
||||
WithArgs("ws-conflict", "queue").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"version"}).AddRow(int64(8)))
|
||||
|
||||
body := `{"key":"queue","value":[1,2,3],"if_match_version":5}`
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-conflict"}}
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-conflict/memory", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
handler.Set(c)
|
||||
|
||||
if w.Code != http.StatusConflict {
|
||||
t.Errorf("expected 409, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if int64(resp["expected_version"].(float64)) != 5 {
|
||||
t.Errorf("expected_version in 409 body should be 5, got %v", resp["expected_version"])
|
||||
}
|
||||
if int64(resp["current_version"].(float64)) != 8 {
|
||||
t.Errorf("current_version in 409 body should be 8, got %v", resp["current_version"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemorySet_IfMatchVersion_CreateOnly_OnAbsentKey(t *testing.T) {
|
||||
// if_match_version=0 is the "create-only" marker: succeed iff the
|
||||
// key doesn't exist yet. Use case: two agents simultaneously try to
|
||||
// seed a shared key — only one should win.
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
// UPDATE matches no row (key doesn't exist).
|
||||
mock.ExpectQuery("UPDATE workspace_memory").WillReturnError(sql.ErrNoRows)
|
||||
// Probe: still no row.
|
||||
mock.ExpectQuery("SELECT version FROM workspace_memory").
|
||||
WithArgs("ws-create", "new-key").WillReturnError(sql.ErrNoRows)
|
||||
// Create path succeeds.
|
||||
mock.ExpectQuery("INSERT INTO workspace_memory").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"version"}).AddRow(int64(1)))
|
||||
|
||||
body := `{"key":"new-key","value":"hello","if_match_version":0}`
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-create"}}
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-create/memory", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
handler.Set(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if int64(resp["version"].(float64)) != 1 {
|
||||
t.Errorf("new row should have version 1, got %v", resp["version"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemorySet_IfMatchVersion_NonZero_OnAbsentKey_Returns409(t *testing.T) {
|
||||
// Caller asserted version=3 on a key that doesn't exist. 409 —
|
||||
// caller's mental model is wrong, retry after re-reading.
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
mock.ExpectQuery("UPDATE workspace_memory").WillReturnError(sql.ErrNoRows)
|
||||
mock.ExpectQuery("SELECT version FROM workspace_memory").
|
||||
WithArgs("ws-ghost", "ghost").WillReturnError(sql.ErrNoRows)
|
||||
|
||||
body := `{"key":"ghost","value":"?","if_match_version":3}`
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-ghost"}}
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-ghost/memory", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
handler.Set(c)
|
||||
|
||||
if w.Code != http.StatusConflict {
|
||||
t.Errorf("expected 409, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
@ -307,52 +296,17 @@ func TestMemoryDelete_Success(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
mock.ExpectExec("DELETE FROM workspace_memory").
|
||||
WithArgs("ws-del", "old-key").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{
|
||||
{Key: "id", Value: "ws-del"},
|
||||
{Key: "key", Value: "old-key"},
|
||||
}
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-del"}, {Key: "key", Value: "old-key"}}
|
||||
c.Request = httptest.NewRequest("DELETE", "/workspaces/ws-del/memory/old-key", nil)
|
||||
|
||||
handler.Delete(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if resp["status"] != "deleted" {
|
||||
t.Errorf("expected status 'deleted', got %v", resp["status"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemoryDelete_DBError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewMemoryHandler()
|
||||
|
||||
mock.ExpectExec("DELETE FROM workspace_memory").
|
||||
WithArgs("ws-del-err", "key").
|
||||
WillReturnError(sql.ErrConnDone)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{
|
||||
{Key: "id", Value: "ws-del-err"},
|
||||
{Key: "key", Value: "key"},
|
||||
}
|
||||
c.Request = httptest.NewRequest("DELETE", "/workspaces/ws-del-err/memory/key", nil)
|
||||
|
||||
handler.Delete(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500, got %d", w.Code)
|
||||
t.Errorf("expected 200, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1 @@
|
||||
ALTER TABLE workspace_memory DROP COLUMN IF EXISTS version;
|
||||
25
platform/migrations/023_workspace_memory_version.up.sql
Normal file
25
platform/migrations/023_workspace_memory_version.up.sql
Normal file
@ -0,0 +1,25 @@
|
||||
-- Optimistic-locking version column for workspace_memory.
|
||||
--
|
||||
-- Purpose: two agents can race a read → modify → write against the same
|
||||
-- (workspace_id, key) pair. Current INSERT ... ON CONFLICT UPDATE has
|
||||
-- last-writer-wins semantics — the first writer's work is silently
|
||||
-- overwritten. This matters for orchestrators (PM, Dev Lead) that keep
|
||||
-- structured running state in memory (task queues, delegation-result
|
||||
-- ledgers) and for the `research-backlog:*` keys that multiple idle
|
||||
-- loops can touch concurrently.
|
||||
--
|
||||
-- The version column advances on every successful write. The memory
|
||||
-- handler accepts an optional `if_match_version` on write; when set,
|
||||
-- the UPDATE is guarded by `WHERE version = $expected` and returns 409
|
||||
-- Conflict on mismatch so the caller can re-read + retry. When absent,
|
||||
-- behaviour is unchanged from pre-migration (last-write-wins), so every
|
||||
-- existing agent tool keeps working without modification.
|
||||
--
|
||||
-- Baseline: existing rows start at version 1. New rows default to 1.
|
||||
ALTER TABLE workspace_memory
|
||||
ADD COLUMN version BIGINT NOT NULL DEFAULT 1;
|
||||
|
||||
COMMENT ON COLUMN workspace_memory.version IS
|
||||
'Monotonic revision counter. Incremented on every successful write. '
|
||||
'Clients doing read-modify-write loops pass this value as if_match_version '
|
||||
'on the next write to get 409 on conflict instead of silent last-write-wins.';
|
||||
Loading…
Reference in New Issue
Block a user