fix(handlers): prevent invalid JSONB inserts on json.Marshal failure (2nd pass) #1938
@@ -335,6 +335,7 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Channels %s: json.Marshal a2aBody failed: %v", ch.ChannelType, marshalErr)
|
||||
return fmt.Errorf("marshal a2a body: %w", marshalErr)
|
||||
}
|
||||
|
||||
callerID := "channel:" + ch.ChannelType
|
||||
@@ -676,6 +677,7 @@ func (m *Manager) appendHistory(ctx context.Context, key string, username, userM
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("appendHistory %s: json.Marshal entry failed: %v", key, marshalErr)
|
||||
return
|
||||
}
|
||||
db.RDB.LPush(ctx, key, string(entry))
|
||||
db.RDB.LTrim(ctx, key, 0, int64(maxHistoryEntries-1))
|
||||
|
||||
@@ -163,6 +163,7 @@ func (s *SlackAdapter) sendBotMessage(ctx context.Context, config map[string]int
|
||||
body, marshalErr := json.Marshal(payload)
|
||||
if marshalErr != nil {
|
||||
log.Printf("slack SendMessage: json.Marshal payload failed: %v", marshalErr)
|
||||
return fmt.Errorf("slack: marshal payload: %w", marshalErr)
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://slack.com/api/chat.postMessage", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
|
||||
@@ -425,6 +425,7 @@ func (h *WorkspaceHandler) stitchDrainResponseToDelegation(ctx context.Context,
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("a2aQueue stitch %s: json.Marshal respJSON failed: %v", delegationID, marshalErr)
|
||||
return
|
||||
}
|
||||
res, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE activity_logs
|
||||
|
||||
@@ -167,6 +167,7 @@ func (w *AgentMessageWriter) Send(
|
||||
respJSON, marshalErr := json.Marshal(respPayload)
|
||||
if marshalErr != nil {
|
||||
log.Printf("AgentMessageWriter %s: json.Marshal respPayload failed: %v", workspaceID, marshalErr)
|
||||
return nil
|
||||
}
|
||||
preview := textutil.TruncateRunes(message, 80)
|
||||
if _, err := w.db.ExecContext(ctx, `
|
||||
|
||||
@@ -347,6 +347,7 @@ func computeAuditHMAC(key []byte, ev *auditEventRow) string {
|
||||
payload, marshalErr := json.Marshal(canonical) // compact, sorted keys
|
||||
if marshalErr != nil {
|
||||
log.Printf("auditChainHash: json.Marshal canonical failed: %v", marshalErr)
|
||||
return ""
|
||||
}
|
||||
mac := hmac.New(sha256.New, key)
|
||||
mac.Write(payload)
|
||||
|
||||
@@ -172,10 +172,14 @@ func (h *ChannelHandler) Create(c *gin.Context) {
|
||||
configJSON, marshalErr := json.Marshal(body.Config)
|
||||
if marshalErr != nil {
|
||||
log.Printf("Channels create %s: json.Marshal config failed: %v", workspaceID, marshalErr)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "marshal config failed"})
|
||||
return
|
||||
}
|
||||
allowedJSON, marshalErr := json.Marshal(body.AllowedUsers)
|
||||
if marshalErr != nil {
|
||||
log.Printf("Channels create %s: json.Marshal allowed_users failed: %v", workspaceID, marshalErr)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "marshal allowed_users failed"})
|
||||
return
|
||||
}
|
||||
enabled := true
|
||||
if body.Enabled != nil {
|
||||
@@ -234,6 +238,8 @@ func (h *ChannelHandler) Update(c *gin.Context) {
|
||||
j, marshalErr := json.Marshal(body.Config)
|
||||
if marshalErr != nil {
|
||||
log.Printf("Channels update %s: json.Marshal config failed: %v", workspaceID, marshalErr)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "marshal config failed"})
|
||||
return
|
||||
}
|
||||
configArg = string(j)
|
||||
}
|
||||
@@ -241,6 +247,8 @@ func (h *ChannelHandler) Update(c *gin.Context) {
|
||||
j, marshalErr := json.Marshal(body.AllowedUsers)
|
||||
if marshalErr != nil {
|
||||
log.Printf("Channels update %s: json.Marshal allowed_users failed: %v", workspaceID, marshalErr)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "marshal allowed_users failed"})
|
||||
return
|
||||
}
|
||||
allowedArg = string(j)
|
||||
}
|
||||
|
||||
@@ -60,12 +60,14 @@ func pushDelegationResultToInbox(ctx context.Context, sourceID, delegationID, st
|
||||
respJSON, marshalErr := json.Marshal(respPayload)
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal respPayload failed: %v", delegationID, marshalErr)
|
||||
return
|
||||
}
|
||||
reqJSON, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"delegation_id": delegationID,
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal reqPayload failed: %v", delegationID, marshalErr)
|
||||
return
|
||||
}
|
||||
logStatus := "ok"
|
||||
if status == "failed" {
|
||||
@@ -319,6 +321,7 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal taskJSON failed: %v", delegationID, marshalErr)
|
||||
return insertTrackingUnavailable
|
||||
}
|
||||
// Store delegation_id in response_body so agent check_delegation_status
|
||||
// (which reads response_body->>delegation_id) can locate this row even
|
||||
@@ -328,6 +331,7 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal respJSON failed: %v", delegationID, marshalErr)
|
||||
return insertTrackingUnavailable
|
||||
}
|
||||
var idemArg interface{}
|
||||
if body.IdempotencyKey != "" {
|
||||
@@ -505,12 +509,13 @@ handleSuccess:
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal queuedJSON failed: %v", delegationID, marshalErr)
|
||||
}
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status)
|
||||
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'queued')
|
||||
`, sourceID, sourceID, targetID, "Delegation queued — target at capacity", string(queuedJSON)); err != nil {
|
||||
log.Printf("Delegation %s: failed to insert queued log: %v", delegationID, err)
|
||||
} else {
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status)
|
||||
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'queued')
|
||||
`, sourceID, sourceID, targetID, "Delegation queued — target at capacity", string(queuedJSON)); err != nil {
|
||||
log.Printf("Delegation %s: failed to insert queued log: %v", delegationID, err)
|
||||
}
|
||||
}
|
||||
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationStatus), sourceID, map[string]interface{}{
|
||||
"delegation_id": delegationID, "target_id": targetID, "status": "queued",
|
||||
@@ -531,12 +536,13 @@ handleSuccess:
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal respJSON failed: %v", delegationID, marshalErr)
|
||||
}
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status)
|
||||
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'completed')
|
||||
`, sourceID, sourceID, targetID, "Delegation completed ("+textutil.TruncateBytes(responseText, 80)+")", string(respJSON)); err != nil {
|
||||
log.Printf("Delegation %s: failed to insert success log: %v", delegationID, err)
|
||||
} else {
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status)
|
||||
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'completed')
|
||||
`, sourceID, sourceID, targetID, "Delegation completed ("+textutil.TruncateBytes(responseText, 80)+")", string(respJSON)); err != nil {
|
||||
log.Printf("Delegation %s: failed to insert success log: %v", delegationID, err)
|
||||
}
|
||||
}
|
||||
log.Printf("Delegation %s: step=recording_ledger_completed", delegationID)
|
||||
|
||||
@@ -619,6 +625,8 @@ func (h *DelegationHandler) Record(c *gin.Context) {
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal taskJSON failed: %v", body.DelegationID, marshalErr)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to marshal task"})
|
||||
return
|
||||
}
|
||||
// Store delegation_id in response_body so agent check_delegation_status
|
||||
// can locate this row. Fixes mc#984.
|
||||
@@ -627,6 +635,8 @@ func (h *DelegationHandler) Record(c *gin.Context) {
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal respJSON failed: %v", body.DelegationID, marshalErr)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to marshal response"})
|
||||
return
|
||||
}
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status)
|
||||
@@ -697,12 +707,13 @@ func (h *DelegationHandler) UpdateStatus(c *gin.Context) {
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation UpdateStatus %s: json.Marshal respJSON failed: %v", delegationID, marshalErr)
|
||||
}
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, response_body, status)
|
||||
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4::jsonb, 'completed')
|
||||
`, sourceID, sourceID, "Delegation completed ("+textutil.TruncateBytes(body.ResponsePreview, 80)+")", string(respJSON)); err != nil {
|
||||
log.Printf("Delegation UpdateStatus: result insert failed for %s: %v", delegationID, err)
|
||||
} else {
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, response_body, status)
|
||||
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4::jsonb, 'completed')
|
||||
`, sourceID, sourceID, "Delegation completed ("+textutil.TruncateBytes(body.ResponsePreview, 80)+")", string(respJSON)); err != nil {
|
||||
log.Printf("Delegation UpdateStatus: result insert failed for %s: %v", delegationID, err)
|
||||
}
|
||||
}
|
||||
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationComplete), sourceID, map[string]interface{}{
|
||||
"delegation_id": delegationID,
|
||||
|
||||
@@ -247,13 +247,14 @@ func (h *MemoriesHandler) Commit(c *gin.Context) {
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Commit %s: json.Marshal auditBody failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
summary := "GLOBAL memory written: id=" + memoryID + " namespace=" + nsName
|
||||
if _, auditErr := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, summary, request_body, status)
|
||||
VALUES ($1, $2, $3, $4, $5::jsonb, $6)
|
||||
`, workspaceID, "memory_write_global", workspaceID, summary, string(auditBody), "ok"); auditErr != nil {
|
||||
log.Printf("Commit: GLOBAL memory audit log failed for %s/%s: %v", workspaceID, memoryID, auditErr)
|
||||
} else {
|
||||
summary := "GLOBAL memory written: id=" + memoryID + " namespace=" + nsName
|
||||
if _, auditErr := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, summary, request_body, status)
|
||||
VALUES ($1, $2, $3, $4, $5::jsonb, $6)
|
||||
`, workspaceID, "memory_write_global", workspaceID, summary, string(auditBody), "ok"); auditErr != nil {
|
||||
log.Printf("Commit: GLOBAL memory audit log failed for %s/%s: %v", workspaceID, memoryID, auditErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -418,6 +418,7 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Scheduler '%s': json.Marshal a2aBody failed: %v", sched.Name, marshalErr)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("Scheduler: firing '%s' → workspace %s", sched.Name, short(sched.WorkspaceID, 12))
|
||||
@@ -603,23 +604,24 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Scheduler '%s': json.Marshal cronMeta failed: %v", sched.Name, marshalErr)
|
||||
} else {
|
||||
// #152: persist lastError into error_detail on the activity_logs row
|
||||
// so GET /workspaces/:id/schedules/:id/history can surface why a run
|
||||
// failed (previously dropped — history returned status without any
|
||||
// error context, making root-cause debugging impossible).
|
||||
// #2026: bounded Background() context — this INSERT was observed wedging
|
||||
// indefinitely on invalid-UTF-8 jsonb payloads, blocking wg.Wait() in
|
||||
// tick() and stalling the whole scheduler. Now: 10s deadline, survives
|
||||
// outer ctx cancellation, and every string is UTF-8 sanitized.
|
||||
insertCtx, insertCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
|
||||
if _, insErr := db.DB.ExecContext(insertCtx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
|
||||
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, $4, $5, now())
|
||||
`, sched.WorkspaceID, sanitizeUTF8("Cron: "+sched.Name), string(cronMeta), lastStatus, sanitizeUTF8(lastError)); insErr != nil {
|
||||
log.Printf("Scheduler: activity_logs insert failed for '%s' (%s): %v", sched.Name, sched.ID, insErr)
|
||||
}
|
||||
insertCancel()
|
||||
}
|
||||
// #152: persist lastError into error_detail on the activity_logs row
|
||||
// so GET /workspaces/:id/schedules/:id/history can surface why a run
|
||||
// failed (previously dropped — history returned status without any
|
||||
// error context, making root-cause debugging impossible).
|
||||
// #2026: bounded Background() context — this INSERT was observed wedging
|
||||
// indefinitely on invalid-UTF-8 jsonb payloads, blocking wg.Wait() in
|
||||
// tick() and stalling the whole scheduler. Now: 10s deadline, survives
|
||||
// outer ctx cancellation, and every string is UTF-8 sanitized.
|
||||
insertCtx, insertCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
|
||||
if _, insErr := db.DB.ExecContext(insertCtx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
|
||||
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, $4, $5, now())
|
||||
`, sched.WorkspaceID, sanitizeUTF8("Cron: "+sched.Name), string(cronMeta), lastStatus, sanitizeUTF8(lastError)); insErr != nil {
|
||||
log.Printf("Scheduler: activity_logs insert failed for '%s' (%s): %v", sched.Name, sched.ID, insErr)
|
||||
}
|
||||
insertCancel()
|
||||
|
||||
if s.broadcaster != nil {
|
||||
s.broadcaster.RecordAndBroadcast(ctx, string(events.EventCronExecuted), sched.WorkspaceID, map[string]interface{}{
|
||||
@@ -693,17 +695,18 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Scheduler '%s': json.Marshal cronMeta failed: %v", sched.Name, marshalErr)
|
||||
} else {
|
||||
// #2026: bounded Background() context on the skipped activity log INSERT
|
||||
// for the same reason as the fireSchedule activity_logs INSERT above.
|
||||
skipInsCtx, skipInsCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
|
||||
if _, err := db.DB.ExecContext(skipInsCtx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
|
||||
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now())
|
||||
`, sched.WorkspaceID, sanitizeUTF8("Cron skipped: "+sched.Name), string(cronMeta), sanitizeUTF8(reason)); err != nil {
|
||||
log.Printf("Scheduler: '%s' skip activity log failed: %v", sched.Name, err)
|
||||
}
|
||||
skipInsCancel()
|
||||
}
|
||||
// #2026: bounded Background() context on the skipped activity log INSERT
|
||||
// for the same reason as the fireSchedule activity_logs INSERT above.
|
||||
skipInsCtx, skipInsCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
|
||||
if _, err := db.DB.ExecContext(skipInsCtx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
|
||||
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now())
|
||||
`, sched.WorkspaceID, sanitizeUTF8("Cron skipped: "+sched.Name), string(cronMeta), sanitizeUTF8(reason)); err != nil {
|
||||
log.Printf("Scheduler: '%s' skip activity log failed: %v", sched.Name, err)
|
||||
}
|
||||
skipInsCancel()
|
||||
|
||||
if s.broadcaster != nil {
|
||||
_ = s.broadcaster.RecordAndBroadcast(ctx, string(events.EventCronSkipped), sched.WorkspaceID, map[string]interface{}{
|
||||
|
||||
Reference in New Issue
Block a user