fix(bundle,scheduler,channels): log ignored DB errors #1913
@@ -3,6 +3,7 @@ package bundle
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
@@ -72,7 +73,9 @@ func Import(
|
||||
}
|
||||
}
|
||||
// Store runtime in DB
|
||||
_, _ = db.DB.ExecContext(ctx, `UPDATE workspaces SET runtime = $1 WHERE id = $2`, bundleRuntime, wsID)
|
||||
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET runtime = $1 WHERE id = $2`, bundleRuntime, wsID); err != nil {
|
||||
log.Printf("bundle import: failed to store runtime for workspace %s: %v", wsID, err)
|
||||
}
|
||||
|
||||
// Provision the container if provisioner is available
|
||||
if prov != nil {
|
||||
@@ -92,7 +95,9 @@ func Import(
|
||||
if err != nil {
|
||||
markFailed(provCtx, wsID, broadcaster, err)
|
||||
} else if url != "" {
|
||||
db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID)
|
||||
if _, err := db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID); err != nil {
|
||||
log.Printf("bundle import: failed to store URL for workspace %s: %v", wsID, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -139,9 +144,11 @@ func markFailed(ctx context.Context, wsID string, broadcaster *events.Broadcaste
|
||||
// markProvisionFailed in workspace-server/internal/handlers/
|
||||
// workspace_provision_shared.go.
|
||||
msg := err.Error()
|
||||
db.DB.ExecContext(ctx,
|
||||
if _, dbErr := db.DB.ExecContext(ctx,
|
||||
`UPDATE workspaces SET status = $1, last_sample_error = $2, updated_at = now() WHERE id = $3`,
|
||||
models.StatusFailed, msg, wsID)
|
||||
models.StatusFailed, msg, wsID); dbErr != nil {
|
||||
log.Printf("bundle import: failed to mark workspace %s as failed: %v", wsID, dbErr)
|
||||
}
|
||||
broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
|
||||
"error": msg,
|
||||
})
|
||||
|
||||
@@ -392,11 +392,13 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
|
||||
|
||||
// Update stats in DB
|
||||
if db.DB != nil {
|
||||
db.DB.ExecContext(ctx, `
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE workspace_channels
|
||||
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
|
||||
WHERE id = $1
|
||||
`, ch.ID)
|
||||
`, ch.ID); err != nil {
|
||||
log.Printf("Channels: inbound stats update failed for channel %s: %v", ch.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast event
|
||||
@@ -437,11 +439,13 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin
|
||||
}
|
||||
|
||||
if db.DB != nil {
|
||||
db.DB.ExecContext(ctx, `
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE workspace_channels
|
||||
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
|
||||
WHERE id = $1
|
||||
`, channelID)
|
||||
`, channelID); err != nil {
|
||||
log.Printf("Channels: outbound stats update failed for channel %s: %v", channelID, err)
|
||||
}
|
||||
}
|
||||
|
||||
if m.broadcaster != nil {
|
||||
|
||||
@@ -490,11 +490,13 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
} else if lastStatus == "ok" {
|
||||
// Non-empty success — reset the counter
|
||||
resetCtx, resetCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
|
||||
_, _ = db.DB.ExecContext(resetCtx, `
|
||||
if _, err := db.DB.ExecContext(resetCtx, `
|
||||
UPDATE workspace_schedules
|
||||
SET consecutive_empty_runs = 0,
|
||||
updated_at = now()
|
||||
WHERE id = $1`, sched.ID)
|
||||
WHERE id = $1`, sched.ID); err != nil {
|
||||
log.Printf("Scheduler: '%s' empty-run reset failed: %v", sched.Name, err)
|
||||
}
|
||||
resetCancel()
|
||||
}
|
||||
|
||||
@@ -525,9 +527,11 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
log.Printf("Scheduler: '%s' AUTO-DISABLING after %d consecutive SDK errors (workspace %s)",
|
||||
sched.Name, consecSDK, short(sched.WorkspaceID, 12))
|
||||
autoDisableCtx, autoDisableCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
|
||||
_, _ = db.DB.ExecContext(autoDisableCtx, `
|
||||
if _, err := db.DB.ExecContext(autoDisableCtx, `
|
||||
UPDATE workspace_schedules SET enabled = false, updated_at = now() WHERE id = $1 AND enabled = true`,
|
||||
sched.ID)
|
||||
sched.ID); err != nil {
|
||||
log.Printf("Scheduler: '%s' auto-disable failed: %v", sched.Name, err)
|
||||
}
|
||||
autoDisableCancel()
|
||||
}
|
||||
} else {
|
||||
@@ -537,11 +541,13 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
// and we should clear the streak.
|
||||
if lastStatus == "ok" {
|
||||
resetCtx, resetCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
|
||||
_, _ = db.DB.ExecContext(resetCtx, `
|
||||
if _, err := db.DB.ExecContext(resetCtx, `
|
||||
UPDATE workspace_schedules
|
||||
SET consecutive_sdk_errors = 0,
|
||||
updated_at = now()
|
||||
WHERE id = $1`, sched.ID)
|
||||
WHERE id = $1`, sched.ID); err != nil {
|
||||
log.Printf("Scheduler: '%s' SDK-error reset failed: %v", sched.Name, err)
|
||||
}
|
||||
resetCancel()
|
||||
}
|
||||
}
|
||||
@@ -658,7 +664,7 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
|
||||
// #2026: bounded Background() context so the bookkeeping can't block
|
||||
// on a stuck DB and stall the scheduler.
|
||||
skipUpdCtx, skipUpdCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
|
||||
_, _ = db.DB.ExecContext(skipUpdCtx, `
|
||||
if _, err := db.DB.ExecContext(skipUpdCtx, `
|
||||
UPDATE workspace_schedules
|
||||
SET last_run_at = now(),
|
||||
next_run_at = COALESCE($2, next_run_at),
|
||||
@@ -667,7 +673,9 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
|
||||
last_error = $3,
|
||||
updated_at = now()
|
||||
WHERE id = $1
|
||||
`, sched.ID, nextRunPtr, sanitizeUTF8(reason))
|
||||
`, sched.ID, nextRunPtr, sanitizeUTF8(reason)); err != nil {
|
||||
log.Printf("Scheduler: '%s' skip update failed: %v", sched.Name, err)
|
||||
}
|
||||
skipUpdCancel()
|
||||
|
||||
cronMeta, _ := json.Marshal(map[string]interface{}{
|
||||
@@ -680,10 +688,12 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
|
||||
// #2026: bounded Background() context on the skipped activity log INSERT
|
||||
// for the same reason as the fireSchedule activity_logs INSERT above.
|
||||
skipInsCtx, skipInsCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
|
||||
_, _ = db.DB.ExecContext(skipInsCtx, `
|
||||
if _, err := db.DB.ExecContext(skipInsCtx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
|
||||
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now())
|
||||
`, sched.WorkspaceID, sanitizeUTF8("Cron skipped: "+sched.Name), string(cronMeta), sanitizeUTF8(reason))
|
||||
`, sched.WorkspaceID, sanitizeUTF8("Cron skipped: "+sched.Name), string(cronMeta), sanitizeUTF8(reason)); err != nil {
|
||||
log.Printf("Scheduler: '%s' skip activity log failed: %v", sched.Name, err)
|
||||
}
|
||||
skipInsCancel()
|
||||
|
||||
if s.broadcaster != nil {
|
||||
|
||||
Reference in New Issue
Block a user