fix(workspace_restart): check QueryContext/ExecContext errors and rows.Err() in Pause/Resume #2052

Closed
core-be wants to merge 3 commits from fix/workspace-restart-rows-err into staging
3 changed files with 32 additions and 21 deletions
@@ -407,15 +407,6 @@ func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) e
// matching (the wsauth errors are typed for the invalid case).
var errInvalidCallerToken = errors.New("missing caller auth token")
// canvasUserMessage holds the user message extracted from an
// A2A canvas request body for broadcasting to other sessions.
// Every field is tagged `omitempty`, so empty fields are left out of the
// encoded JSON.
type canvasUserMessage struct {
// Message is encoded under the "message" key.
Message string `json:"message,omitempty"`
// Parts is a list of free-form JSON objects encoded under "parts".
Parts []map[string]interface{} `json:"parts,omitempty"`
// MessageID is encoded under the camelCase "messageId" key.
MessageID string `json:"messageId,omitempty"`
// Attachments is a list of free-form JSON objects encoded under "attachments".
Attachments []map[string]interface{} `json:"attachments,omitempty"`
}
// extractCanvasUserMessage parses an A2A JSON-RPC request body and extracts
// the user-authored text and attachments from a canvas-initiated message/send.
// Returns nil when the body is not a canvas user message (empty, malformed,
@@ -133,24 +133,30 @@ func loadRestartContextData(ctx context.Context, workspaceID string) restartCont
// message bus.
keySet := map[string]struct{}{}
if rows, err := db.DB.QueryContext(ctx, `SELECT key FROM global_secrets`); err == nil {
defer rows.Close()
for rows.Next() {
var k string
if rows.Scan(&k) == nil {
keySet[k] = struct{}{}
}
}
rows.Close()
if err := rows.Err(); err != nil {
log.Printf("restart-context: global_secrets rows error: %v", err)
}
}
if rows, err := db.DB.QueryContext(ctx,
`SELECT key FROM workspace_secrets WHERE workspace_id = $1`, workspaceID,
); err == nil {
defer rows.Close()
for rows.Next() {
var k string
if rows.Scan(&k) == nil {
keySet[k] = struct{}{}
}
}
rows.Close()
if err := rows.Err(); err != nil {
log.Printf("restart-context: workspace_secrets rows error: %v", err)
}
}
for k := range keySet {
d.EnvKeys = append(d.EnvKeys, k)
@@ -638,20 +638,25 @@ func (h *WorkspaceHandler) Pause(c *gin.Context) {
// Collect this workspace + all descendants to pause
toPause := []struct{ id, name string }{{id, wsName}}
rows, _ := db.DB.QueryContext(ctx,
rows, err := db.DB.QueryContext(ctx,
`WITH RECURSIVE descendants AS (
SELECT id, name FROM workspaces WHERE parent_id = $1 AND status NOT IN ('removed', 'paused')
UNION ALL
SELECT w.id, w.name FROM workspaces w JOIN descendants d ON w.parent_id = d.id WHERE w.status NOT IN ('removed', 'paused')
) SELECT id, name FROM descendants`, id)
if rows != nil {
if err != nil {
log.Printf("Pause: descendant query failed: %v", err)
} else {
defer rows.Close()
for rows.Next() {
var cid, cname string
if rows.Scan(&cid, &cname) == nil {
if err := rows.Scan(&cid, &cname); err == nil {
toPause = append(toPause, struct{ id, name string }{cid, cname})
}
}
if err := rows.Err(); err != nil {
log.Printf("Pause: descendant rows error: %v", err)
}
}
// Stop containers and mark all as paused. StopWorkspaceAuto routes
@@ -667,8 +672,10 @@ func (h *WorkspaceHandler) Pause(c *gin.Context) {
if err := h.StopWorkspaceAuto(ctx, ws.id); err != nil {
log.Printf("Pause: stop %s failed: %v — orphan sweeper will reconcile", ws.id, err)
}
db.DB.ExecContext(ctx,
`UPDATE workspaces SET status = $1, url = '', updated_at = now() WHERE id = $2`, models.StatusPaused, ws.id)
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET status = $1, url = '', updated_at = now() WHERE id = $2`, models.StatusPaused, ws.id); err != nil {
log.Printf("Pause: status update failed for %s: %v", ws.id, err)
}
db.ClearWorkspaceKeys(ctx, ws.id)
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspacePaused), ws.id, map[string]interface{}{
"name": ws.name,
@@ -719,26 +726,33 @@ func (h *WorkspaceHandler) Resume(c *gin.Context) {
tier int
}
toResume := []wsInfo{{id, wsName, dbRuntime, tier}}
rows, _ := db.DB.QueryContext(ctx,
rows, err := db.DB.QueryContext(ctx,
`WITH RECURSIVE descendants AS (
SELECT id, name, tier, COALESCE(runtime, 'langgraph') AS runtime FROM workspaces WHERE parent_id = $1 AND status = 'paused'
UNION ALL
SELECT w.id, w.name, w.tier, COALESCE(w.runtime, 'langgraph') FROM workspaces w JOIN descendants d ON w.parent_id = d.id WHERE w.status = 'paused'
) SELECT id, name, tier, runtime FROM descendants`, id)
if rows != nil {
if err != nil {
log.Printf("Resume: descendant query failed: %v", err)
} else {
defer rows.Close()
for rows.Next() {
var ws wsInfo
if rows.Scan(&ws.id, &ws.name, &ws.tier, &ws.runtime) == nil {
if err := rows.Scan(&ws.id, &ws.name, &ws.tier, &ws.runtime); err == nil {
toResume = append(toResume, ws)
}
}
if err := rows.Err(); err != nil {
log.Printf("Resume: descendant rows error: %v", err)
}
}
// Re-provision all
for _, ws := range toResume {
db.DB.ExecContext(ctx,
`UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2`, models.StatusProvisioning, ws.id)
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2`, models.StatusProvisioning, ws.id); err != nil {
log.Printf("Resume: status update failed for %s: %v", ws.id, err)
}
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisioning), ws.id, map[string]interface{}{
"name": ws.name, "tier": ws.tier, "runtime": ws.runtime,
})