fix(approvals,delegation,restart,scheduler): check ExecContext errors sweep #2 #2041

Closed
core-be wants to merge 2 commits from fix/execcontext-err-check-sweep2 into staging
4 changed files with 20 additions and 19 deletions
@@ -407,15 +407,6 @@ func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) e
// matching (the wsauth errors are typed for the invalid case).
var errInvalidCallerToken = errors.New("missing caller auth token")
// canvasUserMessage holds the extracted user message extracted from an
// A2A canvas request body for broadcasting to other sessions.
type canvasUserMessage struct {
Message string `json:"message,omitempty"`
Parts []map[string]interface{} `json:"parts,omitempty"`
MessageID string `json:"messageId,omitempty"`
Attachments []map[string]interface{} `json:"attachments,omitempty"`
}
// extractCanvasUserMessage parses an A2A JSON-RPC request body and extracts
// the user-authored text and attachments from a canvas-initiated message/send.
// Returns nil when the body is not a canvas user message (empty, malformed,
@@ -80,10 +80,12 @@ func (h *ApprovalsHandler) ListAll(c *gin.Context) {
ctx := c.Request.Context()
// Auto-expire stale approvals (older than 10 min)
db.DB.ExecContext(ctx, `
if _, err := db.DB.ExecContext(ctx, `
UPDATE approval_requests SET status = 'denied', decided_by = 'auto-expired', decided_at = now()
WHERE status = 'pending' AND created_at < now() - interval '10 minutes'
`)
`); err != nil {
log.Printf("approvals auto-expire error: %v", err)
}
rows, err := db.DB.QueryContext(ctx, `
SELECT a.id, a.workspace_id, w.name, a.action, a.reason, a.status, a.created_at
@@ -578,8 +578,10 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) {
h.stopForRestart(ctx, workspaceID)
db.DB.ExecContext(ctx,
`UPDATE workspaces SET status = $1, url = '', updated_at = now() WHERE id = $2`, models.StatusProvisioning, workspaceID)
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET status = $1, url = '', updated_at = now() WHERE id = $2`, models.StatusProvisioning, workspaceID); err != nil {
log.Printf("Auto-restart: failed to set provisioning status for %s: %v", workspaceID, err)
}
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisioning), workspaceID, map[string]interface{}{
"name": wsName, "tier": tier, "runtime": dbRuntime,
})
@@ -471,11 +471,13 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
} else if lastStatus == "ok" {
// Non-empty success — reset the counter
resetCtx, resetCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
_, _ = db.DB.ExecContext(resetCtx, `
if _, err := db.DB.ExecContext(resetCtx, `
UPDATE workspace_schedules
SET consecutive_empty_runs = 0,
updated_at = now()
WHERE id = $1`, sched.ID)
WHERE id = $1`, sched.ID); err != nil {
log.Printf("Scheduler: failed to reset empty-run counter for %s: %v", sched.ID, err)
}
resetCancel()
}
@@ -591,7 +593,7 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
// #2026: bounded Background() context so the bookkeeping can't block
// on a stuck DB and stall the scheduler.
skipUpdCtx, skipUpdCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
_, _ = db.DB.ExecContext(skipUpdCtx, `
if _, err := db.DB.ExecContext(skipUpdCtx, `
UPDATE workspace_schedules
SET last_run_at = now(),
next_run_at = COALESCE($2, next_run_at),
@@ -600,7 +602,9 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
last_error = $3,
updated_at = now()
WHERE id = $1
`, sched.ID, nextRunPtr, sanitizeUTF8(reason))
`, sched.ID, nextRunPtr, sanitizeUTF8(reason)); err != nil {
log.Printf("Scheduler: failed to record skipped run for %s: %v", sched.ID, err)
}
skipUpdCancel()
cronMeta, _ := json.Marshal(map[string]interface{}{
@@ -613,10 +617,12 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
// #2026: bounded Background() context on the skipped activity log INSERT
// for the same reason as the fireSchedule activity_logs INSERT above.
skipInsCtx, skipInsCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
_, _ = db.DB.ExecContext(skipInsCtx, `
if _, err := db.DB.ExecContext(skipInsCtx, `
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now())
`, sched.WorkspaceID, sanitizeUTF8("Cron skipped: "+sched.Name), string(cronMeta), sanitizeUTF8(reason))
`, sched.WorkspaceID, sanitizeUTF8("Cron skipped: "+sched.Name), string(cronMeta), sanitizeUTF8(reason)); err != nil {
log.Printf("Scheduler: failed to log skipped run for %s: %v", sched.ID, err)
}
skipInsCancel()
if s.broadcaster != nil {