fix(bundle,scheduler,channels): log ignored DB errors #1913

Merged
agent-dev-a merged 1 commits from fix/more-ignored-db-errors-batch-2 into main 2026-05-26 15:15:00 +00:00
3 changed files with 39 additions and 18 deletions
+11 -4
View File
@@ -3,6 +3,7 @@ package bundle
import (
"context"
"fmt"
"log"
"strings"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
@@ -72,7 +73,9 @@ func Import(
}
}
// Store runtime in DB
_, _ = db.DB.ExecContext(ctx, `UPDATE workspaces SET runtime = $1 WHERE id = $2`, bundleRuntime, wsID)
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET runtime = $1 WHERE id = $2`, bundleRuntime, wsID); err != nil {
log.Printf("bundle import: failed to store runtime for workspace %s: %v", wsID, err)
}
// Provision the container if provisioner is available
if prov != nil {
@@ -92,7 +95,9 @@ func Import(
if err != nil {
markFailed(provCtx, wsID, broadcaster, err)
} else if url != "" {
db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID)
if _, err := db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID); err != nil {
log.Printf("bundle import: failed to store URL for workspace %s: %v", wsID, err)
}
}
}()
}
@@ -139,9 +144,11 @@ func markFailed(ctx context.Context, wsID string, broadcaster *events.Broadcaste
// markProvisionFailed in workspace-server/internal/handlers/
// workspace_provision_shared.go.
msg := err.Error()
db.DB.ExecContext(ctx,
if _, dbErr := db.DB.ExecContext(ctx,
`UPDATE workspaces SET status = $1, last_sample_error = $2, updated_at = now() WHERE id = $3`,
models.StatusFailed, msg, wsID)
models.StatusFailed, msg, wsID); dbErr != nil {
log.Printf("bundle import: failed to mark workspace %s as failed: %v", wsID, dbErr)
}
broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
"error": msg,
})
@@ -392,11 +392,13 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
// Update stats in DB
if db.DB != nil {
db.DB.ExecContext(ctx, `
if _, err := db.DB.ExecContext(ctx, `
UPDATE workspace_channels
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
WHERE id = $1
`, ch.ID)
`, ch.ID); err != nil {
log.Printf("Channels: inbound stats update failed for channel %s: %v", ch.ID, err)
}
}
// Broadcast event
@@ -437,11 +439,13 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin
}
if db.DB != nil {
db.DB.ExecContext(ctx, `
if _, err := db.DB.ExecContext(ctx, `
UPDATE workspace_channels
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
WHERE id = $1
`, channelID)
`, channelID); err != nil {
log.Printf("Channels: outbound stats update failed for channel %s: %v", channelID, err)
}
}
if m.broadcaster != nil {
@@ -490,11 +490,13 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
} else if lastStatus == "ok" {
// Non-empty success — reset the counter
resetCtx, resetCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
_, _ = db.DB.ExecContext(resetCtx, `
if _, err := db.DB.ExecContext(resetCtx, `
UPDATE workspace_schedules
SET consecutive_empty_runs = 0,
updated_at = now()
WHERE id = $1`, sched.ID)
WHERE id = $1`, sched.ID); err != nil {
log.Printf("Scheduler: '%s' empty-run reset failed: %v", sched.Name, err)
}
resetCancel()
}
@@ -525,9 +527,11 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
log.Printf("Scheduler: '%s' AUTO-DISABLING after %d consecutive SDK errors (workspace %s)",
sched.Name, consecSDK, short(sched.WorkspaceID, 12))
autoDisableCtx, autoDisableCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
_, _ = db.DB.ExecContext(autoDisableCtx, `
if _, err := db.DB.ExecContext(autoDisableCtx, `
UPDATE workspace_schedules SET enabled = false, updated_at = now() WHERE id = $1 AND enabled = true`,
sched.ID)
sched.ID); err != nil {
log.Printf("Scheduler: '%s' auto-disable failed: %v", sched.Name, err)
}
autoDisableCancel()
}
} else {
@@ -537,11 +541,13 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
// and we should clear the streak.
if lastStatus == "ok" {
resetCtx, resetCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
_, _ = db.DB.ExecContext(resetCtx, `
if _, err := db.DB.ExecContext(resetCtx, `
UPDATE workspace_schedules
SET consecutive_sdk_errors = 0,
updated_at = now()
WHERE id = $1`, sched.ID)
WHERE id = $1`, sched.ID); err != nil {
log.Printf("Scheduler: '%s' SDK-error reset failed: %v", sched.Name, err)
}
resetCancel()
}
}
@@ -658,7 +664,7 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
// #2026: bounded Background() context so the bookkeeping can't block
// on a stuck DB and stall the scheduler.
skipUpdCtx, skipUpdCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
_, _ = db.DB.ExecContext(skipUpdCtx, `
if _, err := db.DB.ExecContext(skipUpdCtx, `
UPDATE workspace_schedules
SET last_run_at = now(),
next_run_at = COALESCE($2, next_run_at),
@@ -667,7 +673,9 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
last_error = $3,
updated_at = now()
WHERE id = $1
`, sched.ID, nextRunPtr, sanitizeUTF8(reason))
`, sched.ID, nextRunPtr, sanitizeUTF8(reason)); err != nil {
log.Printf("Scheduler: '%s' skip update failed: %v", sched.Name, err)
}
skipUpdCancel()
cronMeta, _ := json.Marshal(map[string]interface{}{
@@ -680,10 +688,12 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
// #2026: bounded Background() context on the skipped activity log INSERT
// for the same reason as the fireSchedule activity_logs INSERT above.
skipInsCtx, skipInsCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
_, _ = db.DB.ExecContext(skipInsCtx, `
if _, err := db.DB.ExecContext(skipInsCtx, `
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now())
`, sched.WorkspaceID, sanitizeUTF8("Cron skipped: "+sched.Name), string(cronMeta), sanitizeUTF8(reason))
`, sched.WorkspaceID, sanitizeUTF8("Cron skipped: "+sched.Name), string(cronMeta), sanitizeUTF8(reason)); err != nil {
log.Printf("Scheduler: '%s' skip activity log failed: %v", sched.Name, err)
}
skipInsCancel()
if s.broadcaster != nil {