Files
Molecule AI Dev Engineer A (Kimi) 7f93aaea21
ci-arm64-advisory / fast-checks (pull_request) Waiting to run
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 4s
CI / Python Lint & Test (pull_request) Successful in 5s
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Successful in 9s
CI / Detect changes (pull_request) Successful in 11s
E2E API Smoke Test / detect-changes (pull_request) Successful in 10s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 10s
E2E Chat / detect-changes (pull_request) Successful in 12s
Harness Replays / detect-changes (pull_request) Successful in 5s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 7s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 10s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 10s
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 12s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m22s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m12s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (pull_request) Successful in 7s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 14s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m30s
gate-check-v3 / gate-check (pull_request) Successful in 15s
qa-review / approved (pull_request) Successful in 9s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m15s
security-review / approved (pull_request) Failing after 4s
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-checklist / all-items-acked (pull_request) Successful in 4s
sop-checklist / review-refire (pull_request) Has been skipped
sop-tier-check / tier-check (pull_request) Successful in 4s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 1m8s
CI / Canvas (Next.js) (pull_request) Successful in 3s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m37s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 8s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 3s
Harness Replays / Harness Replays (pull_request) Successful in 9s
E2E Chat / E2E Chat (pull_request) Successful in 10s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m46s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 2m9s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / Platform (Go) (pull_request) Successful in 5m29s
CI / all-required (pull_request) Successful in 8m24s
audit-force-merge / audit (pull_request) Successful in 6s
fix(checkpoints): handle RowsAffected error in Delete handler
Previously the error from result.RowsAffected() was discarded, so a
driver error after DELETE would incorrectly return 404 instead of 500.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-26 13:03:03 +00:00

245 lines
8.0 KiB
Go

package handlers
import (
"database/sql"
"encoding/json"
"log"
"net/http"
"github.com/gin-gonic/gin"
)
// CheckpointsHandler persists Temporal workflow step checkpoints so workflows
// can resume from the last completed step after a crash or restart (#788).
type CheckpointsHandler struct {
db *sql.DB
}
// NewCheckpointsHandler wires the handler to the given database. Pass db.DB
// at router-setup time; pass a sqlmock DB in tests.
func NewCheckpointsHandler(database *sql.DB) *CheckpointsHandler {
return &CheckpointsHandler{db: database}
}
// checkpointEntry is the canonical shape returned by List.
type checkpointEntry struct {
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
WorkflowID string `json:"workflow_id"`
StepName string `json:"step_name"`
StepIndex int `json:"step_index"`
CompletedAt string `json:"completed_at"`
Payload json.RawMessage `json:"payload,omitempty"`
}
// callerMismatch guards against cross-workspace access in unit-test and
// middleware-injected scenarios. When the Gin context carries a
// "caller_workspace_id" key (set by middleware or a test), the value must
// match the URL :id param; otherwise the handler aborts with 403.
//
// In production the WorkspaceAuth middleware already validates that the
// bearer token belongs to :id (401 on mismatch), so this key is typically
// absent and the check is a no-op. The key exists so that future
// middleware layers and unit tests can exercise workspace-isolation logic
// at the handler level without modifying WorkspaceAuth.
func callerMismatch(c *gin.Context, workspaceID string) bool {
if caller := c.GetString("caller_workspace_id"); caller != "" && caller != workspaceID {
c.JSON(http.StatusForbidden, gin.H{"error": "workspace access denied"})
return true
}
return false
}
// Upsert handles POST /workspaces/:id/checkpoints
//
// Body: { "workflow_id", "step_name", "step_index", "payload"? }
//
// On first call for a (workspace_id, workflow_id, step_name) triple: INSERT.
// On repeat call: UPDATE step_index + completed_at + payload in-place.
// Returns 201 with the checkpoint id on success.
func (h *CheckpointsHandler) Upsert(c *gin.Context) {
workspaceID := c.Param("id")
if callerMismatch(c, workspaceID) {
return
}
ctx := c.Request.Context()
var body struct {
WorkflowID string `json:"workflow_id" binding:"required"`
StepName string `json:"step_name" binding:"required"`
StepIndex int `json:"step_index"`
Payload json.RawMessage `json:"payload"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
}
// Normalise payload: a missing or zero-length field is stored as JSON null.
payloadStr := "null"
if len(body.Payload) > 0 {
payloadStr = string(body.Payload)
}
var id string
err := h.db.QueryRowContext(ctx, `
INSERT INTO workflow_checkpoints
(workspace_id, workflow_id, step_name, step_index, payload)
VALUES ($1, $2, $3, $4, $5::jsonb)
ON CONFLICT (workspace_id, workflow_id, step_name) DO UPDATE
SET step_index = EXCLUDED.step_index,
completed_at = now(),
payload = EXCLUDED.payload
RETURNING id
`, workspaceID, body.WorkflowID, body.StepName, body.StepIndex, payloadStr).Scan(&id)
if err != nil {
log.Printf("Upsert checkpoint error workspace=%s wf=%s step=%s: %v",
workspaceID, body.WorkflowID, body.StepName, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to upsert checkpoint"})
return
}
c.JSON(http.StatusCreated, gin.H{
"id": id,
"workspace_id": workspaceID,
"workflow_id": body.WorkflowID,
"step_name": body.StepName,
})
}
// List handles GET /workspaces/:id/checkpoints/:wfid
//
// Returns all checkpoints for the given workflow ordered by step_index DESC
// so the most recently completed step is first.
// Returns 404 when no checkpoints exist for that workflow.
func (h *CheckpointsHandler) List(c *gin.Context) {
workspaceID := c.Param("id")
if callerMismatch(c, workspaceID) {
return
}
workflowID := c.Param("wfid")
ctx := c.Request.Context()
rows, err := h.db.QueryContext(ctx, `
SELECT id, workspace_id, workflow_id, step_name, step_index, completed_at, payload
FROM workflow_checkpoints
WHERE workspace_id = $1 AND workflow_id = $2
ORDER BY step_index DESC
`, workspaceID, workflowID)
if err != nil {
log.Printf("List checkpoints error workspace=%s wf=%s: %v", workspaceID, workflowID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list checkpoints"})
return
}
defer rows.Close()
checkpoints := make([]checkpointEntry, 0)
for rows.Next() {
var e checkpointEntry
var payload []byte
if err := rows.Scan(
&e.ID, &e.WorkspaceID, &e.WorkflowID,
&e.StepName, &e.StepIndex, &e.CompletedAt, &payload,
); err != nil {
log.Printf("List checkpoints scan error workspace=%s wf=%s: %v", workspaceID, workflowID, err)
continue
}
if len(payload) > 0 {
e.Payload = json.RawMessage(payload)
}
checkpoints = append(checkpoints, e)
}
if err := rows.Err(); err != nil {
log.Printf("List checkpoints rows.Err workspace=%s wf=%s: %v", workspaceID, workflowID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "checkpoint read failed"})
return
}
if len(checkpoints) == 0 {
c.JSON(http.StatusNotFound, gin.H{"error": "no checkpoints found for workflow"})
return
}
c.JSON(http.StatusOK, checkpoints)
}
// Latest handles GET /workspaces/:id/checkpoints/latest
//
// Returns the single most recently completed checkpoint across all workflows
// for this workspace — ordered by completed_at DESC. The workspace-template
// Temporal resume path calls this on startup to inject the last known step
// into the agent context (issue #837 step 3/3, closes #583).
//
// 200 — checkpoint found; body is a single checkpointEntry JSON object.
// 404 — no checkpoints exist yet for this workspace.
func (h *CheckpointsHandler) Latest(c *gin.Context) {
workspaceID := c.Param("id")
if callerMismatch(c, workspaceID) {
return
}
ctx := c.Request.Context()
var e checkpointEntry
var payload []byte
err := h.db.QueryRowContext(ctx, `
SELECT id, workspace_id, workflow_id, step_name, step_index, completed_at, payload
FROM workflow_checkpoints
WHERE workspace_id = $1
ORDER BY completed_at DESC
LIMIT 1
`, workspaceID).Scan(
&e.ID, &e.WorkspaceID, &e.WorkflowID,
&e.StepName, &e.StepIndex, &e.CompletedAt, &payload,
)
if err == sql.ErrNoRows {
c.JSON(http.StatusNotFound, gin.H{"error": "no checkpoints found for workspace"})
return
}
if err != nil {
log.Printf("Latest checkpoint error workspace=%s: %v", workspaceID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to fetch latest checkpoint"})
return
}
if len(payload) > 0 {
e.Payload = json.RawMessage(payload)
}
c.JSON(http.StatusOK, e)
}
// Delete handles DELETE /workspaces/:id/checkpoints/:wfid
//
// Removes all checkpoints for a workflow (clean shutdown path).
// Returns 404 if no checkpoints existed.
func (h *CheckpointsHandler) Delete(c *gin.Context) {
workspaceID := c.Param("id")
if callerMismatch(c, workspaceID) {
return
}
workflowID := c.Param("wfid")
ctx := c.Request.Context()
result, err := h.db.ExecContext(ctx, `
DELETE FROM workflow_checkpoints
WHERE workspace_id = $1 AND workflow_id = $2
`, workspaceID, workflowID)
if err != nil {
log.Printf("Delete checkpoints error workspace=%s wf=%s: %v", workspaceID, workflowID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete checkpoints"})
return
}
n, err := result.RowsAffected()
if err != nil {
log.Printf("Delete checkpoints RowsAffected error workspace=%s wf=%s: %v", workspaceID, workflowID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete checkpoints"})
return
}
if n == 0 {
c.JSON(http.StatusNotFound, gin.H{"error": "no checkpoints found for workflow"})
return
}
c.JSON(http.StatusOK, gin.H{"deleted": n, "workflow_id": workflowID})
}