From 7be380bc78a6837a96fb0a65c56666eaff547790 Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Tue, 26 May 2026 15:06:56 +0000 Subject: [PATCH] fix(bundle,scheduler,channels): log ignored DB errors - bundle/importer.go: log runtime store, URL store, and markFailed errors - scheduler/scheduler.go: log empty-run reset, auto-disable, SDK-error reset, skip update, and skip insert errors - channels/manager.go: log inbound and outbound stats update errors Co-Authored-By: Claude Opus 4.7 --- workspace-server/internal/bundle/importer.go | 15 +++++++--- workspace-server/internal/channels/manager.go | 12 +++++--- .../internal/scheduler/scheduler.go | 30 ++++++++++++------- 3 files changed, 39 insertions(+), 18 deletions(-) diff --git a/workspace-server/internal/bundle/importer.go b/workspace-server/internal/bundle/importer.go index ef452f10d..d8ce175be 100644 --- a/workspace-server/internal/bundle/importer.go +++ b/workspace-server/internal/bundle/importer.go @@ -3,6 +3,7 @@ package bundle import ( "context" "fmt" + "log" "strings" "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" @@ -72,7 +73,9 @@ func Import( } } // Store runtime in DB - _, _ = db.DB.ExecContext(ctx, `UPDATE workspaces SET runtime = $1 WHERE id = $2`, bundleRuntime, wsID) + if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET runtime = $1 WHERE id = $2`, bundleRuntime, wsID); err != nil { + log.Printf("bundle import: failed to store runtime for workspace %s: %v", wsID, err) + } // Provision the container if provisioner is available if prov != nil { @@ -92,7 +95,9 @@ func Import( if err != nil { markFailed(provCtx, wsID, broadcaster, err) } else if url != "" { - db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID) + if _, err := db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID); err != nil { + log.Printf("bundle import: failed to store URL for workspace %s: %v", wsID, err) + } } }() } @@ -139,9 +144,11 @@ func markFailed(ctx context.Context, wsID string, broadcaster *events.Broadcaste // markProvisionFailed in workspace-server/internal/handlers/ // workspace_provision_shared.go. msg := err.Error() - db.DB.ExecContext(ctx, + if _, dbErr := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, last_sample_error = $2, updated_at = now() WHERE id = $3`, - models.StatusFailed, msg, wsID) + models.StatusFailed, msg, wsID); dbErr != nil { + log.Printf("bundle import: failed to mark workspace %s as failed: %v", wsID, dbErr) + } broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{ "error": msg, }) diff --git a/workspace-server/internal/channels/manager.go b/workspace-server/internal/channels/manager.go index b64336a5e..644e33c74 100644 --- a/workspace-server/internal/channels/manager.go +++ b/workspace-server/internal/channels/manager.go @@ -392,11 +392,13 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound // Update stats in DB if db.DB != nil { - db.DB.ExecContext(ctx, ` + if _, err := db.DB.ExecContext(ctx, ` UPDATE workspace_channels SET last_message_at = now(), message_count = message_count + 1, updated_at = now() WHERE id = $1 - `, ch.ID) + `, ch.ID); err != nil { + log.Printf("Channels: inbound stats update failed for channel %s: %v", ch.ID, err) + } } // Broadcast event @@ -437,11 +439,13 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin } if db.DB != nil { - db.DB.ExecContext(ctx, ` + if _, err := db.DB.ExecContext(ctx, ` UPDATE workspace_channels SET last_message_at = now(), message_count = message_count + 1, updated_at = now() WHERE id = $1 - `, channelID) + `, channelID); err != nil { + log.Printf("Channels: outbound stats update failed for channel %s: %v", channelID, err) + } } if m.broadcaster != nil { diff --git a/workspace-server/internal/scheduler/scheduler.go b/workspace-server/internal/scheduler/scheduler.go index 867b38373..bdac3cb9e 100644 --- a/workspace-server/internal/scheduler/scheduler.go +++ b/workspace-server/internal/scheduler/scheduler.go @@ -490,11 +490,13 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { } else if lastStatus == "ok" { // Non-empty success — reset the counter resetCtx, resetCancel := context.WithTimeout(context.Background(), dbQueryTimeout) - _, _ = db.DB.ExecContext(resetCtx, ` + if _, err := db.DB.ExecContext(resetCtx, ` UPDATE workspace_schedules SET consecutive_empty_runs = 0, updated_at = now() - WHERE id = $1`, sched.ID) + WHERE id = $1`, sched.ID); err != nil { + log.Printf("Scheduler: '%s' empty-run reset failed: %v", sched.Name, err) + } resetCancel() } @@ -525,9 +527,11 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { log.Printf("Scheduler: '%s' AUTO-DISABLING after %d consecutive SDK errors (workspace %s)", sched.Name, consecSDK, short(sched.WorkspaceID, 12)) autoDisableCtx, autoDisableCancel := context.WithTimeout(context.Background(), dbQueryTimeout) - _, _ = db.DB.ExecContext(autoDisableCtx, ` + if _, err := db.DB.ExecContext(autoDisableCtx, ` UPDATE workspace_schedules SET enabled = false, updated_at = now() WHERE id = $1 AND enabled = true`, - sched.ID) + sched.ID); err != nil { + log.Printf("Scheduler: '%s' auto-disable failed: %v", sched.Name, err) + } autoDisableCancel() } } else { @@ -537,11 +541,13 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { // and we should clear the streak. if lastStatus == "ok" { resetCtx, resetCancel := context.WithTimeout(context.Background(), dbQueryTimeout) - _, _ = db.DB.ExecContext(resetCtx, ` + if _, err := db.DB.ExecContext(resetCtx, ` UPDATE workspace_schedules SET consecutive_sdk_errors = 0, updated_at = now() - WHERE id = $1`, sched.ID) + WHERE id = $1`, sched.ID); err != nil { + log.Printf("Scheduler: '%s' SDK-error reset failed: %v", sched.Name, err) + } resetCancel() } } @@ -658,7 +664,7 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active // #2026: bounded Background() context so the bookkeeping can't block // on a stuck DB and stall the scheduler. skipUpdCtx, skipUpdCancel := context.WithTimeout(context.Background(), dbQueryTimeout) - _, _ = db.DB.ExecContext(skipUpdCtx, ` + if _, err := db.DB.ExecContext(skipUpdCtx, ` UPDATE workspace_schedules SET last_run_at = now(), next_run_at = COALESCE($2, next_run_at), @@ -667,7 +673,9 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active last_error = $3, updated_at = now() WHERE id = $1 - `, sched.ID, nextRunPtr, sanitizeUTF8(reason)) + `, sched.ID, nextRunPtr, sanitizeUTF8(reason)); err != nil { + log.Printf("Scheduler: '%s' skip update failed: %v", sched.Name, err) + } skipUpdCancel() cronMeta, _ := json.Marshal(map[string]interface{}{ @@ -680,10 +688,12 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active // #2026: bounded Background() context on the skipped activity log INSERT // for the same reason as the fireSchedule activity_logs INSERT above. skipInsCtx, skipInsCancel := context.WithTimeout(context.Background(), dbQueryTimeout) - _, _ = db.DB.ExecContext(skipInsCtx, ` + if _, err := db.DB.ExecContext(skipInsCtx, ` INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at) VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now()) - `, sched.WorkspaceID, sanitizeUTF8("Cron skipped: "+sched.Name), string(cronMeta), sanitizeUTF8(reason)) + `, sched.WorkspaceID, sanitizeUTF8("Cron skipped: "+sched.Name), string(cronMeta), sanitizeUTF8(reason)); err != nil { + log.Printf("Scheduler: '%s' skip activity log failed: %v", sched.Name, err) + } skipInsCancel() if s.broadcaster != nil { -- 2.52.0