fix(bundle,channels,workspace,crud): check ExecContext errors instead of discarding #2040

Closed
core-be wants to merge 2 commits from fix/execcontext-err-check-high-impact into staging
5 changed files with 30 additions and 22 deletions
+10 -4
View File
@@ -72,7 +72,9 @@ func Import(
}
}
// Store runtime in DB
_, _ = db.DB.ExecContext(ctx, `UPDATE workspaces SET runtime = $1 WHERE id = $2`, bundleRuntime, wsID)
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET runtime = $1 WHERE id = $2`, bundleRuntime, wsID); err != nil {
log.Printf("Bundle import: failed to store runtime for %s: %v", wsID, err)
}
// Provision the container if provisioner is available
if prov != nil {
@@ -92,7 +94,9 @@ func Import(
if err != nil {
markFailed(provCtx, wsID, broadcaster, err)
} else if url != "" {
db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID)
if _, dbErr := db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID); dbErr != nil {
log.Printf("Bundle import: failed to store URL for %s: %v", wsID, dbErr)
}
}
}()
}
@@ -139,9 +143,11 @@ func markFailed(ctx context.Context, wsID string, broadcaster *events.Broadcaste
// markProvisionFailed in workspace-server/internal/handlers/
// workspace_provision_shared.go.
msg := err.Error()
db.DB.ExecContext(ctx,
if _, dbErr := db.DB.ExecContext(ctx,
`UPDATE workspaces SET status = $1, last_sample_error = $2, updated_at = now() WHERE id = $3`,
models.StatusFailed, msg, wsID)
models.StatusFailed, msg, wsID); dbErr != nil {
log.Printf("Bundle import: failed to mark %s failed: %v", wsID, dbErr)
}
broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
"error": msg,
})
@@ -375,11 +375,13 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
// Update stats in DB
if db.DB != nil {
db.DB.ExecContext(ctx, `
if _, err := db.DB.ExecContext(ctx, `
UPDATE workspace_channels
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
WHERE id = $1
`, ch.ID)
`, ch.ID); err != nil {
log.Printf("Channels: inbound stats update error for %s: %v", truncID(ch.ID), err)
}
}
// Broadcast event
@@ -420,11 +422,13 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin
}
if db.DB != nil {
db.DB.ExecContext(ctx, `
if _, err := db.DB.ExecContext(ctx, `
UPDATE workspace_channels
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
WHERE id = $1
`, channelID)
`, channelID); err != nil {
log.Printf("Channels: outbound stats update error for %s: %v", truncID(channelID), err)
}
}
if m.broadcaster != nil {
@@ -407,15 +407,6 @@ func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) e
// matching (the wsauth errors are typed for the invalid case).
var errInvalidCallerToken = errors.New("missing caller auth token")
// canvasUserMessage holds the extracted user message extracted from an
// A2A canvas request body for broadcasting to other sessions.
type canvasUserMessage struct {
Message string `json:"message,omitempty"`
Parts []map[string]interface{} `json:"parts,omitempty"`
MessageID string `json:"messageId,omitempty"`
Attachments []map[string]interface{} `json:"attachments,omitempty"`
}
// extractCanvasUserMessage parses an A2A JSON-RPC request body and extracts
// the user-authored text and attachments from a canvas-initiated message/send.
// Returns nil when the body is not a canvas user message (empty, malformed,
@@ -465,7 +465,9 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
// Preserve BYO-compute runtime label (kimi, kimi-cli, external) —
// don't coerce to generic "external" so the canvas can show the
// correct runtime name in the node card.
db.DB.ExecContext(ctx, `UPDATE workspaces SET url = $1, status = $2, runtime = $3, updated_at = now() WHERE id = $4`, payload.URL, models.StatusOnline, normalizeExternalRuntime(payload.Runtime), id)
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET url = $1, status = $2, runtime = $3, updated_at = now() WHERE id = $4`, payload.URL, models.StatusOnline, normalizeExternalRuntime(payload.Runtime), id); err != nil {
log.Printf("External workspace: failed to update URL for %s: %v", id, err)
}
if err := db.CacheURL(ctx, id, payload.URL); err != nil {
log.Printf("External workspace: failed to cache URL for %s: %v", id, err)
}
@@ -476,9 +478,10 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
// Pre-register flow: mint a token and park the workspace
// in awaiting_agent. First POST /registry/register call
// from the external agent (with this token + its URL)
// flips the row to online.
// Preserve BYO-compute runtime label (kimi, kimi-cli, external).
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, runtime = $2, updated_at = now() WHERE id = $3`, models.StatusAwaitingAgent, normalizeExternalRuntime(payload.Runtime), id)
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, runtime = $2, updated_at = now() WHERE id = $3`, models.StatusAwaitingAgent, normalizeExternalRuntime(payload.Runtime), id); err != nil {
log.Printf("External workspace: failed to set awaiting_agent for %s: %v", id, err)
}
tok, tokErr := wsauth.IssueToken(ctx, db.DB, id)
if tokErr != nil {
log.Printf("External workspace %s: token issuance failed: %v", id, tokErr)
@@ -373,8 +373,12 @@ func (h *WorkspaceHandler) Delete(c *gin.Context) {
}
}
// Null out parent_id / forwarded_to references
db.DB.ExecContext(ctx, "UPDATE workspaces SET parent_id = NULL WHERE parent_id = ANY($1::uuid[])", purgeIDs)
db.DB.ExecContext(ctx, "UPDATE workspaces SET forwarded_to = NULL WHERE forwarded_to = ANY($1::uuid[])", purgeIDs)
if _, err := db.DB.ExecContext(ctx, "UPDATE workspaces SET parent_id = NULL WHERE parent_id = ANY($1::uuid[])", purgeIDs); err != nil {
log.Printf("Purge parent_id null error for %v: %v", allIDs, err)
}
if _, err := db.DB.ExecContext(ctx, "UPDATE workspaces SET forwarded_to = NULL WHERE forwarded_to = ANY($1::uuid[])", purgeIDs); err != nil {
log.Printf("Purge forwarded_to null error for %v: %v", allIDs, err)
}
// Hard delete the workspace row
if _, err := db.DB.ExecContext(ctx, "DELETE FROM workspaces WHERE id = ANY($1::uuid[])", purgeIDs); err != nil {
log.Printf("Purge workspace row error for %v: %v", allIDs, err)