diff --git a/workspace-server/internal/channels/manager.go b/workspace-server/internal/channels/manager.go index 0a9ae8ead..b4c90c0fd 100644 --- a/workspace-server/internal/channels/manager.go +++ b/workspace-server/internal/channels/manager.go @@ -335,6 +335,7 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound }) if marshalErr != nil { log.Printf("Channels %s: json.Marshal a2aBody failed: %v", ch.ChannelType, marshalErr) + return fmt.Errorf("marshal a2a body: %w", marshalErr) } callerID := "channel:" + ch.ChannelType @@ -676,6 +677,7 @@ func (m *Manager) appendHistory(ctx context.Context, key string, username, userM }) if marshalErr != nil { log.Printf("appendHistory %s: json.Marshal entry failed: %v", key, marshalErr) + return } db.RDB.LPush(ctx, key, string(entry)) db.RDB.LTrim(ctx, key, 0, int64(maxHistoryEntries-1)) diff --git a/workspace-server/internal/channels/slack.go b/workspace-server/internal/channels/slack.go index ddf1300eb..2b78c77cf 100644 --- a/workspace-server/internal/channels/slack.go +++ b/workspace-server/internal/channels/slack.go @@ -163,6 +163,7 @@ func (s *SlackAdapter) sendBotMessage(ctx context.Context, config map[string]int body, marshalErr := json.Marshal(payload) if marshalErr != nil { log.Printf("slack SendMessage: json.Marshal payload failed: %v", marshalErr) + return fmt.Errorf("slack: marshal payload: %w", marshalErr) } req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://slack.com/api/chat.postMessage", bytes.NewReader(body)) if err != nil { diff --git a/workspace-server/internal/handlers/a2a_queue.go b/workspace-server/internal/handlers/a2a_queue.go index 5c617e53d..7997dda3e 100644 --- a/workspace-server/internal/handlers/a2a_queue.go +++ b/workspace-server/internal/handlers/a2a_queue.go @@ -425,6 +425,7 @@ func (h *WorkspaceHandler) stitchDrainResponseToDelegation(ctx context.Context, }) if marshalErr != nil { log.Printf("a2aQueue stitch %s: json.Marshal respJSON failed: %v", delegationID, marshalErr) + return } res, err := db.DB.ExecContext(ctx, ` UPDATE activity_logs diff --git a/workspace-server/internal/handlers/agent_message_writer.go b/workspace-server/internal/handlers/agent_message_writer.go index 8afd48206..10e4da077 100644 --- a/workspace-server/internal/handlers/agent_message_writer.go +++ b/workspace-server/internal/handlers/agent_message_writer.go @@ -167,6 +167,7 @@ func (w *AgentMessageWriter) Send( respJSON, marshalErr := json.Marshal(respPayload) if marshalErr != nil { log.Printf("AgentMessageWriter %s: json.Marshal respPayload failed: %v", workspaceID, marshalErr) + return nil } preview := textutil.TruncateRunes(message, 80) if _, err := w.db.ExecContext(ctx, ` diff --git a/workspace-server/internal/handlers/audit.go b/workspace-server/internal/handlers/audit.go index c5ac0c26e..f2cc5a39c 100644 --- a/workspace-server/internal/handlers/audit.go +++ b/workspace-server/internal/handlers/audit.go @@ -347,6 +347,7 @@ func computeAuditHMAC(key []byte, ev *auditEventRow) string { payload, marshalErr := json.Marshal(canonical) // compact, sorted keys if marshalErr != nil { log.Printf("auditChainHash: json.Marshal canonical failed: %v", marshalErr) + return "" } mac := hmac.New(sha256.New, key) mac.Write(payload) diff --git a/workspace-server/internal/handlers/channels.go b/workspace-server/internal/handlers/channels.go index ddf0bc6c8..e776a1d86 100644 --- a/workspace-server/internal/handlers/channels.go +++ b/workspace-server/internal/handlers/channels.go @@ -172,10 +172,14 @@ func (h *ChannelHandler) Create(c *gin.Context) { configJSON, marshalErr := json.Marshal(body.Config) if marshalErr != nil { log.Printf("Channels create %s: json.Marshal config failed: %v", workspaceID, marshalErr) + c.JSON(http.StatusInternalServerError, gin.H{"error": "marshal config failed"}) + return } allowedJSON, marshalErr := json.Marshal(body.AllowedUsers) if marshalErr != nil { log.Printf("Channels create %s: json.Marshal allowed_users failed: %v", workspaceID, marshalErr) + c.JSON(http.StatusInternalServerError, gin.H{"error": "marshal allowed_users failed"}) + return } enabled := true if body.Enabled != nil { @@ -234,6 +238,8 @@ func (h *ChannelHandler) Update(c *gin.Context) { j, marshalErr := json.Marshal(body.Config) if marshalErr != nil { log.Printf("Channels update %s: json.Marshal config failed: %v", workspaceID, marshalErr) + c.JSON(http.StatusInternalServerError, gin.H{"error": "marshal config failed"}) + return } configArg = string(j) } @@ -241,6 +247,8 @@ func (h *ChannelHandler) Update(c *gin.Context) { j, marshalErr := json.Marshal(body.AllowedUsers) if marshalErr != nil { log.Printf("Channels update %s: json.Marshal allowed_users failed: %v", workspaceID, marshalErr) + c.JSON(http.StatusInternalServerError, gin.H{"error": "marshal allowed_users failed"}) + return } allowedArg = string(j) } diff --git a/workspace-server/internal/handlers/delegation.go b/workspace-server/internal/handlers/delegation.go index 7bc4b8b05..f3353a541 100644 --- a/workspace-server/internal/handlers/delegation.go +++ b/workspace-server/internal/handlers/delegation.go @@ -60,12 +60,14 @@ func pushDelegationResultToInbox(ctx context.Context, sourceID, delegationID, st respJSON, marshalErr := json.Marshal(respPayload) if marshalErr != nil { log.Printf("Delegation %s: json.Marshal respPayload failed: %v", delegationID, marshalErr) + return } reqJSON, marshalErr := json.Marshal(map[string]interface{}{ "delegation_id": delegationID, }) if marshalErr != nil { log.Printf("Delegation %s: json.Marshal reqPayload failed: %v", delegationID, marshalErr) + return } logStatus := "ok" if status == "failed" { @@ -319,6 +321,7 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b }) if marshalErr != nil { log.Printf("Delegation %s: json.Marshal taskJSON failed: %v", delegationID, marshalErr) + return insertTrackingUnavailable } // Store delegation_id in response_body so agent check_delegation_status // (which reads response_body->>delegation_id) can locate this row even @@ -328,6 +331,7 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b }) if marshalErr != nil { log.Printf("Delegation %s: json.Marshal respJSON failed: %v", delegationID, marshalErr) + return insertTrackingUnavailable } var idemArg interface{} if body.IdempotencyKey != "" { @@ -505,12 +509,13 @@ handleSuccess: }) if marshalErr != nil { log.Printf("Delegation %s: json.Marshal queuedJSON failed: %v", delegationID, marshalErr) - } - if _, err := db.DB.ExecContext(ctx, ` - INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status) - VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'queued') - `, sourceID, sourceID, targetID, "Delegation queued — target at capacity", string(queuedJSON)); err != nil { - log.Printf("Delegation %s: failed to insert queued log: %v", delegationID, err) + } else { + if _, err := db.DB.ExecContext(ctx, ` + INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status) + VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'queued') + `, sourceID, sourceID, targetID, "Delegation queued — target at capacity", string(queuedJSON)); err != nil { + log.Printf("Delegation %s: failed to insert queued log: %v", delegationID, err) + } } h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationStatus), sourceID, map[string]interface{}{ "delegation_id": delegationID, "target_id": targetID, "status": "queued", @@ -531,12 +536,13 @@ handleSuccess: }) if marshalErr != nil { log.Printf("Delegation %s: json.Marshal respJSON failed: %v", delegationID, marshalErr) - } - if _, err := db.DB.ExecContext(ctx, ` - INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status) - VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'completed') - `, sourceID, sourceID, targetID, "Delegation completed ("+textutil.TruncateBytes(responseText, 80)+")", string(respJSON)); err != nil { - log.Printf("Delegation %s: failed to insert success log: %v", delegationID, err) + } else { + if _, err := db.DB.ExecContext(ctx, ` + INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status) + VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'completed') + `, sourceID, sourceID, targetID, "Delegation completed ("+textutil.TruncateBytes(responseText, 80)+")", string(respJSON)); err != nil { + log.Printf("Delegation %s: failed to insert success log: %v", delegationID, err) + } } log.Printf("Delegation %s: step=recording_ledger_completed", delegationID) @@ -619,6 +625,8 @@ func (h *DelegationHandler) Record(c *gin.Context) { }) if marshalErr != nil { log.Printf("Delegation %s: json.Marshal taskJSON failed: %v", body.DelegationID, marshalErr) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to marshal task"}) + return } // Store delegation_id in response_body so agent check_delegation_status // can locate this row. Fixes mc#984. @@ -627,6 +635,8 @@ func (h *DelegationHandler) Record(c *gin.Context) { }) if marshalErr != nil { log.Printf("Delegation %s: json.Marshal respJSON failed: %v", body.DelegationID, marshalErr) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to marshal response"}) + return } if _, err := db.DB.ExecContext(ctx, ` INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status) @@ -697,12 +707,13 @@ func (h *DelegationHandler) UpdateStatus(c *gin.Context) { }) if marshalErr != nil { log.Printf("Delegation UpdateStatus %s: json.Marshal respJSON failed: %v", delegationID, marshalErr) - } - if _, err := db.DB.ExecContext(ctx, ` - INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, response_body, status) - VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4::jsonb, 'completed') - `, sourceID, sourceID, "Delegation completed ("+textutil.TruncateBytes(body.ResponsePreview, 80)+")", string(respJSON)); err != nil { - log.Printf("Delegation UpdateStatus: result insert failed for %s: %v", delegationID, err) + } else { + if _, err := db.DB.ExecContext(ctx, ` + INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, response_body, status) + VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4::jsonb, 'completed') + `, sourceID, sourceID, "Delegation completed ("+textutil.TruncateBytes(body.ResponsePreview, 80)+")", string(respJSON)); err != nil { + log.Printf("Delegation UpdateStatus: result insert failed for %s: %v", delegationID, err) + } } h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationComplete), sourceID, map[string]interface{}{ "delegation_id": delegationID, diff --git a/workspace-server/internal/handlers/memories.go b/workspace-server/internal/handlers/memories.go index 37eb31d04..578186bd8 100644 --- a/workspace-server/internal/handlers/memories.go +++ b/workspace-server/internal/handlers/memories.go @@ -247,13 +247,14 @@ func (h *MemoriesHandler) Commit(c *gin.Context) { }) if marshalErr != nil { log.Printf("Commit %s: json.Marshal auditBody failed: %v", workspaceID, marshalErr) - } - summary := "GLOBAL memory written: id=" + memoryID + " namespace=" + nsName - if _, auditErr := db.DB.ExecContext(ctx, ` - INSERT INTO activity_logs (workspace_id, activity_type, source_id, summary, request_body, status) - VALUES ($1, $2, $3, $4, $5::jsonb, $6) - `, workspaceID, "memory_write_global", workspaceID, summary, string(auditBody), "ok"); auditErr != nil { - log.Printf("Commit: GLOBAL memory audit log failed for %s/%s: %v", workspaceID, memoryID, auditErr) + } else { + summary := "GLOBAL memory written: id=" + memoryID + " namespace=" + nsName + if _, auditErr := db.DB.ExecContext(ctx, ` + INSERT INTO activity_logs (workspace_id, activity_type, source_id, summary, request_body, status) + VALUES ($1, $2, $3, $4, $5::jsonb, $6) + `, workspaceID, "memory_write_global", workspaceID, summary, string(auditBody), "ok"); auditErr != nil { + log.Printf("Commit: GLOBAL memory audit log failed for %s/%s: %v", workspaceID, memoryID, auditErr) + } } } diff --git a/workspace-server/internal/scheduler/scheduler.go b/workspace-server/internal/scheduler/scheduler.go index b364f1292..3136a86b9 100644 --- a/workspace-server/internal/scheduler/scheduler.go +++ b/workspace-server/internal/scheduler/scheduler.go @@ -418,6 +418,7 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { }) if marshalErr != nil { log.Printf("Scheduler '%s': json.Marshal a2aBody failed: %v", sched.Name, marshalErr) + return } log.Printf("Scheduler: firing '%s' → workspace %s", sched.Name, short(sched.WorkspaceID, 12)) @@ -603,23 +604,24 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { }) if marshalErr != nil { log.Printf("Scheduler '%s': json.Marshal cronMeta failed: %v", sched.Name, marshalErr) + } else { + // #152: persist lastError into error_detail on the activity_logs row + // so GET /workspaces/:id/schedules/:id/history can surface why a run + // failed (previously dropped — history returned status without any + // error context, making root-cause debugging impossible). + // #2026: bounded Background() context — this INSERT was observed wedging + // indefinitely on invalid-UTF-8 jsonb payloads, blocking wg.Wait() in + // tick() and stalling the whole scheduler. Now: 10s deadline, survives + // outer ctx cancellation, and every string is UTF-8 sanitized. + insertCtx, insertCancel := context.WithTimeout(context.Background(), dbQueryTimeout) + if _, insErr := db.DB.ExecContext(insertCtx, ` + INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at) + VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, $4, $5, now()) + `, sched.WorkspaceID, sanitizeUTF8("Cron: "+sched.Name), string(cronMeta), lastStatus, sanitizeUTF8(lastError)); insErr != nil { + log.Printf("Scheduler: activity_logs insert failed for '%s' (%s): %v", sched.Name, sched.ID, insErr) + } + insertCancel() } - // #152: persist lastError into error_detail on the activity_logs row - // so GET /workspaces/:id/schedules/:id/history can surface why a run - // failed (previously dropped — history returned status without any - // error context, making root-cause debugging impossible). - // #2026: bounded Background() context — this INSERT was observed wedging - // indefinitely on invalid-UTF-8 jsonb payloads, blocking wg.Wait() in - // tick() and stalling the whole scheduler. Now: 10s deadline, survives - // outer ctx cancellation, and every string is UTF-8 sanitized. - insertCtx, insertCancel := context.WithTimeout(context.Background(), dbQueryTimeout) - if _, insErr := db.DB.ExecContext(insertCtx, ` - INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at) - VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, $4, $5, now()) - `, sched.WorkspaceID, sanitizeUTF8("Cron: "+sched.Name), string(cronMeta), lastStatus, sanitizeUTF8(lastError)); insErr != nil { - log.Printf("Scheduler: activity_logs insert failed for '%s' (%s): %v", sched.Name, sched.ID, insErr) - } - insertCancel() if s.broadcaster != nil { s.broadcaster.RecordAndBroadcast(ctx, string(events.EventCronExecuted), sched.WorkspaceID, map[string]interface{}{ @@ -693,17 +695,18 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active }) if marshalErr != nil { log.Printf("Scheduler '%s': json.Marshal cronMeta failed: %v", sched.Name, marshalErr) + } else { + // #2026: bounded Background() context on the skipped activity log INSERT + // for the same reason as the fireSchedule activity_logs INSERT above. + skipInsCtx, skipInsCancel := context.WithTimeout(context.Background(), dbQueryTimeout) + if _, err := db.DB.ExecContext(skipInsCtx, ` + INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at) + VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now()) + `, sched.WorkspaceID, sanitizeUTF8("Cron skipped: "+sched.Name), string(cronMeta), sanitizeUTF8(reason)); err != nil { + log.Printf("Scheduler: '%s' skip activity log failed: %v", sched.Name, err) + } + skipInsCancel() } - // #2026: bounded Background() context on the skipped activity log INSERT - // for the same reason as the fireSchedule activity_logs INSERT above. - skipInsCtx, skipInsCancel := context.WithTimeout(context.Background(), dbQueryTimeout) - if _, err := db.DB.ExecContext(skipInsCtx, ` - INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at) - VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now()) - `, sched.WorkspaceID, sanitizeUTF8("Cron skipped: "+sched.Name), string(cronMeta), sanitizeUTF8(reason)); err != nil { - log.Printf("Scheduler: '%s' skip activity log failed: %v", sched.Name, err) - } - skipInsCancel() if s.broadcaster != nil { _ = s.broadcaster.RecordAndBroadcast(ctx, string(events.EventCronSkipped), sched.WorkspaceID, map[string]interface{}{