Merge pull request #76 from Molecule-AI/fix/issue-24-schedules-db-authoritative
fix(org): DB-authoritative schedules; org/import is additive on template rows (#24)
This commit is contained in:
commit
3ddd0cffbf
@ -30,6 +30,28 @@ import (
|
||||
// during org import. Prevents overwhelming Docker when creating many containers.
|
||||
const workspaceCreatePacingMs = 50
|
||||
|
||||
// orgImportScheduleSQL is the upsert executed for every schedule during
|
||||
// org/import. Extracted to a const so TestImport_OrgScheduleSQLShape can
|
||||
// assert its shape without regex-scanning org.go (issue #24 follow-up).
|
||||
//
|
||||
// Guarantees, in one statement:
|
||||
// - INSERT new rows with source='template'
|
||||
// - On (workspace_id, name) collision, only refresh template-source rows
|
||||
// (runtime-added schedules are preserved across re-imports)
|
||||
// - No DELETE — removal is out of scope (additive semantics)
|
||||
const orgImportScheduleSQL = `
|
||||
INSERT INTO workspace_schedules (workspace_id, name, cron_expr, timezone, prompt, enabled, next_run_at, source)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, 'template')
|
||||
ON CONFLICT (workspace_id, name) DO UPDATE
|
||||
SET cron_expr = EXCLUDED.cron_expr,
|
||||
timezone = EXCLUDED.timezone,
|
||||
prompt = EXCLUDED.prompt,
|
||||
enabled = EXCLUDED.enabled,
|
||||
next_run_at = EXCLUDED.next_run_at,
|
||||
updated_at = now()
|
||||
WHERE workspace_schedules.source = 'template'
|
||||
`
|
||||
|
||||
type OrgHandler struct {
|
||||
workspace *WorkspaceHandler
|
||||
broadcaster *events.Broadcaster
|
||||
@ -474,13 +496,11 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, defa
|
||||
enabled = *sched.Enabled
|
||||
}
|
||||
nextRun, _ := scheduler.ComputeNextRun(sched.CronExpr, tz, time.Now())
|
||||
if _, err := db.DB.ExecContext(context.Background(), `
|
||||
INSERT INTO workspace_schedules (workspace_id, name, cron_expr, timezone, prompt, enabled, next_run_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
`, id, sched.Name, sched.CronExpr, tz, sched.Prompt, enabled, nextRun); err != nil {
|
||||
log.Printf("Org import: failed to create schedule '%s' for %s: %v", sched.Name, ws.Name, err)
|
||||
if _, err := db.DB.ExecContext(context.Background(), orgImportScheduleSQL,
|
||||
id, sched.Name, sched.CronExpr, tz, sched.Prompt, enabled, nextRun); err != nil {
|
||||
log.Printf("Org import: failed to upsert schedule '%s' for %s: %v", sched.Name, ws.Name, err)
|
||||
} else {
|
||||
log.Printf("Org import: schedule '%s' (%s) created for %s", sched.Name, sched.CronExpr, ws.Name)
|
||||
log.Printf("Org import: schedule '%s' (%s) upserted for %s (source=template)", sched.Name, sched.CronExpr, ws.Name)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -32,6 +32,7 @@ type scheduleResponse struct {
|
||||
RunCount int `json:"run_count"`
|
||||
LastStatus string `json:"last_status"`
|
||||
LastError string `json:"last_error"`
|
||||
Source string `json:"source,omitempty"` // 'template' (seeded by org/import) | 'runtime' (created via Canvas/API). Issue #24.
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
}
|
||||
@ -44,7 +45,7 @@ func (h *ScheduleHandler) List(c *gin.Context) {
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT id, workspace_id, name, cron_expr, timezone, prompt, enabled,
|
||||
last_run_at, next_run_at, run_count, last_status, last_error,
|
||||
created_at, updated_at
|
||||
source, created_at, updated_at
|
||||
FROM workspace_schedules
|
||||
WHERE workspace_id = $1
|
||||
ORDER BY created_at ASC
|
||||
@ -61,7 +62,7 @@ func (h *ScheduleHandler) List(c *gin.Context) {
|
||||
if err := rows.Scan(
|
||||
&s.ID, &s.WorkspaceID, &s.Name, &s.CronExpr, &s.Timezone,
|
||||
&s.Prompt, &s.Enabled, &s.LastRunAt, &s.NextRunAt, &s.RunCount,
|
||||
&s.LastStatus, &s.LastError, &s.CreatedAt, &s.UpdatedAt,
|
||||
&s.LastStatus, &s.LastError, &s.Source, &s.CreatedAt, &s.UpdatedAt,
|
||||
); err != nil {
|
||||
log.Printf("Schedules.List: scan error: %v", err)
|
||||
continue
|
||||
@ -117,9 +118,12 @@ func (h *ScheduleHandler) Create(c *gin.Context) {
|
||||
}
|
||||
|
||||
var id string
|
||||
// source='runtime' marks this row as user-created (Canvas/API). The
|
||||
// org/import path inserts with source='template' and only refreshes
|
||||
// template-source rows on re-import (issue #24), so runtime rows survive.
|
||||
err = db.DB.QueryRowContext(ctx, `
|
||||
INSERT INTO workspace_schedules (workspace_id, name, cron_expr, timezone, prompt, enabled, next_run_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
INSERT INTO workspace_schedules (workspace_id, name, cron_expr, timezone, prompt, enabled, next_run_at, source)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, 'runtime')
|
||||
RETURNING id
|
||||
`, workspaceID, body.Name, body.CronExpr, body.Timezone, body.Prompt, enabled, nextRun).Scan(&id)
|
||||
if err != nil {
|
||||
|
||||
126
platform/internal/handlers/schedules_test.go
Normal file
126
platform/internal/handlers/schedules_test.go
Normal file
@ -0,0 +1,126 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"regexp"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// Issue #24 — DB is the source of truth; org/import is additive on
|
||||
// template-source rows only. Runtime-added schedules survive re-imports.
|
||||
|
||||
// TestRuntimeSchedule_HasSourceRuntime asserts that POST /workspaces/:id/schedules
|
||||
// writes source='runtime' so that re-imports of the org template never touch
|
||||
// these user-created rows (preserved across re-imports).
|
||||
func TestRuntimeSchedule_HasSourceRuntime(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewScheduleHandler()
|
||||
|
||||
// Match the literal 'runtime' source baked into the INSERT and capture
|
||||
// the workspace id arg. The inserted row id is returned via RETURNING.
|
||||
mock.ExpectQuery("INSERT INTO workspace_schedules .* VALUES .* 'runtime'").
|
||||
WithArgs("550e8400-e29b-41d4-a716-446655440000", "test", "*/5 * * * *", "UTC", "do thing", true, sqlmock.AnyArg()).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("11111111-1111-1111-1111-111111111111"))
|
||||
|
||||
body := []byte(`{"name":"test","cron_expr":"*/5 * * * *","prompt":"do thing"}`)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "550e8400-e29b-41d4-a716-446655440000"}}
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/550e8400-e29b-41d4-a716-446655440000/schedules", bytes.NewReader(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Create(c)
|
||||
|
||||
if w.Code != http.StatusCreated {
|
||||
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestImport_OrgScheduleSQLShape verifies the SQL emitted by the org/import
|
||||
// path for schedules. It MUST be an INSERT ... ON CONFLICT (workspace_id, name)
|
||||
// DO UPDATE ... WHERE source='template' with VALUES ... 'template'. Together
|
||||
// these guarantee that re-import is:
|
||||
// - additive (new template rows are inserted),
|
||||
// - idempotent (existing template rows are refreshed),
|
||||
// - non-destructive of runtime rows (the WHERE filter skips them),
|
||||
// - never DELETE-based (additive only).
|
||||
//
|
||||
// This is a structural assertion against the source — cheap and catches a
|
||||
// regression that would silently break user-created schedules across
|
||||
// re-imports without needing a full provisioner harness.
|
||||
func TestImport_OrgScheduleSQLShape(t *testing.T) {
|
||||
got := orgImportScheduleSQL
|
||||
|
||||
// Single test covers four CEO requirements at once: additive seed
|
||||
// (template marker), idempotent refresh (ON CONFLICT DO UPDATE),
|
||||
// runtime-row preservation (WHERE source='template'), and never-DELETE.
|
||||
mustContain := []string{
|
||||
"INSERT INTO workspace_schedules",
|
||||
"source",
|
||||
"'template'",
|
||||
"ON CONFLICT (workspace_id, name) DO UPDATE",
|
||||
"WHERE workspace_schedules.source = 'template'",
|
||||
}
|
||||
for _, s := range mustContain {
|
||||
if !strings.Contains(got, s) {
|
||||
t.Errorf("org/import schedule SQL missing fragment %q\n--- SQL ---\n%s", s, got)
|
||||
}
|
||||
}
|
||||
if regexp.MustCompile(`(?i)\bDELETE\b\s+FROM\s+workspace_schedules`).MatchString(got) {
|
||||
t.Error("org/import schedule SQL must never DELETE — additive only")
|
||||
}
|
||||
}
|
||||
|
||||
// TestList_IncludesSourceColumn asserts GET /workspaces/:id/schedules
|
||||
// returns the source field so Canvas can render template/runtime badges.
|
||||
func TestList_IncludesSourceColumn(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewScheduleHandler()
|
||||
|
||||
cols := []string{
|
||||
"id", "workspace_id", "name", "cron_expr", "timezone", "prompt", "enabled",
|
||||
"last_run_at", "next_run_at", "run_count", "last_status", "last_error",
|
||||
"source", "created_at", "updated_at",
|
||||
}
|
||||
now := time.Now()
|
||||
mock.ExpectQuery("SELECT .* source, created_at, updated_at\\s+FROM workspace_schedules").
|
||||
WithArgs("550e8400-e29b-41d4-a716-446655440000").
|
||||
WillReturnRows(sqlmock.NewRows(cols).
|
||||
AddRow("id1", "550e8400-e29b-41d4-a716-446655440000", "tmpl-sched", "0 * * * *", "UTC", "p", true,
|
||||
nil, nil, 0, "", "", "template", now, now).
|
||||
AddRow("id2", "550e8400-e29b-41d4-a716-446655440000", "user-sched", "*/5 * * * *", "UTC", "p2", true,
|
||||
nil, nil, 0, "", "", "runtime", now, now))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "550e8400-e29b-41d4-a716-446655440000"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/550e8400-e29b-41d4-a716-446655440000/schedules", nil)
|
||||
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
body := w.Body.String()
|
||||
if !strings.Contains(body, `"source":"template"`) {
|
||||
t.Errorf(`response missing "source":"template": %s`, body)
|
||||
}
|
||||
if !strings.Contains(body, `"source":"runtime"`) {
|
||||
t.Errorf(`response missing "source":"runtime": %s`, body)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,3 @@
|
||||
DROP INDEX IF EXISTS idx_schedules_workspace_name;
|
||||
ALTER TABLE workspace_schedules DROP CONSTRAINT IF EXISTS workspace_schedules_source_check;
|
||||
ALTER TABLE workspace_schedules DROP COLUMN IF EXISTS source;
|
||||
40
platform/migrations/022_workspace_schedules_source.up.sql
Normal file
40
platform/migrations/022_workspace_schedules_source.up.sql
Normal file
@ -0,0 +1,40 @@
|
||||
-- Add `source` column to workspace_schedules so the org-import path can
|
||||
-- distinguish rows it owns ('template') from rows created via the runtime
|
||||
-- API/Canvas ('runtime'). DB is the source of truth; org/import is now
|
||||
-- additive — it only INSERTs missing template rows and only UPDATEs rows
|
||||
-- where source = 'template'. Runtime-added schedules survive re-imports.
|
||||
--
|
||||
-- Legacy-row policy: every row predating this migration is backfilled to
|
||||
-- 'template'. Rationale — before this migration the only way to get a row
|
||||
-- into workspace_schedules at scale was org/import (Canvas UI for schedules
|
||||
-- was minimal); defaulting legacy rows to 'template' preserves the
|
||||
-- idempotent-refresh path on re-import. Users who had runtime-created
|
||||
-- schedules can reclassify them via UPDATE post-deployment.
|
||||
|
||||
ALTER TABLE workspace_schedules
|
||||
ADD COLUMN IF NOT EXISTS source TEXT;
|
||||
|
||||
UPDATE workspace_schedules SET source = 'template' WHERE source IS NULL;
|
||||
|
||||
ALTER TABLE workspace_schedules ALTER COLUMN source SET NOT NULL;
|
||||
ALTER TABLE workspace_schedules ALTER COLUMN source SET DEFAULT 'runtime';
|
||||
|
||||
-- idempotent constraint add (Postgres lacks IF NOT EXISTS on ADD CONSTRAINT pre-15)
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (
|
||||
SELECT 1 FROM pg_constraint WHERE conname = 'workspace_schedules_source_check'
|
||||
) THEN
|
||||
ALTER TABLE workspace_schedules
|
||||
ADD CONSTRAINT workspace_schedules_source_check
|
||||
CHECK (source IN ('template', 'runtime'));
|
||||
END IF;
|
||||
END$$;
|
||||
|
||||
COMMENT ON COLUMN workspace_schedules.source IS
|
||||
'template = seeded by org/import (refreshable); runtime = created via Canvas/API (preserved across re-imports)';
|
||||
|
||||
-- Required so org-import can use ON CONFLICT (workspace_id, name) DO UPDATE.
|
||||
-- Schedules within a single workspace are uniquely identified by name.
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_schedules_workspace_name
|
||||
ON workspace_schedules(workspace_id, name);
|
||||
Loading…
Reference in New Issue
Block a user