fix(handlers,channels,scheduler): log ignored json.Marshal errors #1918

Merged
agent-dev-a merged 1 commits from fix/json-marshal-ignored-errors into main 2026-05-26 16:37:20 +00:00
15 changed files with 158 additions and 39 deletions
@@ -313,7 +313,7 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
history := m.loadHistory(ctx, historyKey)
// Build A2A JSON-RPC payload
a2aBody, _ := json.Marshal(map[string]interface{}{
a2aBody, marshalErr := json.Marshal(map[string]interface{}{
"method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
@@ -333,6 +333,9 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
},
},
})
if marshalErr != nil {
log.Printf("Channels %s: json.Marshal a2aBody failed: %v", ch.ChannelType, marshalErr)
}
callerID := "channel:" + ch.ChannelType
@@ -665,12 +668,15 @@ func (m *Manager) appendHistory(ctx context.Context, key string, username, userM
if db.RDB == nil {
return
}
entry, _ := json.Marshal(map[string]string{
entry, marshalErr := json.Marshal(map[string]string{
"user": username,
"message": userMsg,
"reply": agentReply,
"time": time.Now().UTC().Format(time.RFC3339),
})
if marshalErr != nil {
log.Printf("appendHistory %s: json.Marshal entry failed: %v", key, marshalErr)
}
db.RDB.LPush(ctx, key, string(entry))
db.RDB.LTrim(ctx, key, 0, int64(maxHistoryEntries-1))
db.RDB.Expire(ctx, key, historyTTL)
+5 -1
View File
@@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"strings"
"time"
@@ -159,7 +160,10 @@ func (s *SlackAdapter) sendBotMessage(ctx context.Context, config map[string]int
payload["icon_emoji"] = iconEmoji
}
body, _ := json.Marshal(payload)
body, marshalErr := json.Marshal(payload)
if marshalErr != nil {
log.Printf("slack SendMessage: json.Marshal payload failed: %v", marshalErr)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://slack.com/api/chat.postMessage", bytes.NewReader(body))
if err != nil {
return fmt.Errorf("slack: build request: %w", err)
@@ -115,12 +115,15 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
if logActivity {
h.logA2ABusyQueued(ctx, workspaceID, callerID, body, a2aMethod, durationMs)
}
respBody, _ := json.Marshal(gin.H{
respBody, marshalErr := json.Marshal(gin.H{
"queued": true,
"queue_id": qid,
"queue_depth": depth,
"message": "workspace agent busy — request queued, will dispatch when capacity available",
})
if marshalErr != nil {
log.Printf("ProxyA2A %s: json.Marshal respBody failed: %v", workspaceID, marshalErr)
}
return http.StatusAccepted, respBody, nil
} else {
// Queue insert failed — fall through to legacy 503 behavior
@@ -419,10 +419,13 @@ func (h *WorkspaceHandler) stitchDrainResponseToDelegation(ctx context.Context,
return
}
responseText := extractResponseText(respBody)
respJSON, _ := json.Marshal(map[string]interface{}{
respJSON, marshalErr := json.Marshal(map[string]interface{}{
"text": responseText,
"delegation_id": delegationID,
})
if marshalErr != nil {
log.Printf("a2aQueue stitch %s: json.Marshal respJSON failed: %v", delegationID, marshalErr)
}
res, err := db.DB.ExecContext(ctx, `
UPDATE activity_logs
SET status = 'completed',
@@ -164,7 +164,10 @@ func (w *AgentMessageWriter) Send(
}
respPayload["parts"] = fileParts
}
respJSON, _ := json.Marshal(respPayload)
respJSON, marshalErr := json.Marshal(respPayload)
if marshalErr != nil {
log.Printf("AgentMessageWriter %s: json.Marshal respPayload failed: %v", workspaceID, marshalErr)
}
preview := textutil.TruncateRunes(message, 80)
if _, err := w.db.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status)
@@ -34,7 +34,10 @@ func (h *ApprovalsHandler) Create(c *gin.Context) {
return
}
ctxJSON, _ := json.Marshal(body.Context)
ctxJSON, marshalErr := json.Marshal(body.Context)
if marshalErr != nil {
log.Printf("Approvals create %s: json.Marshal context failed: %v", workspaceID, marshalErr)
}
if ctxJSON == nil {
ctxJSON = []byte("{}")
}
+4 -1
View File
@@ -344,7 +344,10 @@ func computeAuditHMAC(key []byte, ev *auditEventRow) string {
"timestamp": ev.Timestamp.UTC().Format("2006-01-02T15:04:05Z"),
}
payload, _ := json.Marshal(canonical) // compact, sorted keys
payload, marshalErr := json.Marshal(canonical) // compact, sorted keys
if marshalErr != nil {
log.Printf("auditChainHash: json.Marshal canonical failed: %v", marshalErr)
}
mac := hmac.New(sha256.New, key)
mac.Write(payload)
return hex.EncodeToString(mac.Sum(nil))
+16 -4
View File
@@ -169,8 +169,14 @@ func (h *ChannelHandler) Create(c *gin.Context) {
return
}
configJSON, _ := json.Marshal(body.Config)
allowedJSON, _ := json.Marshal(body.AllowedUsers)
configJSON, marshalErr := json.Marshal(body.Config)
if marshalErr != nil {
log.Printf("Channels create %s: json.Marshal config failed: %v", workspaceID, marshalErr)
}
allowedJSON, marshalErr := json.Marshal(body.AllowedUsers)
if marshalErr != nil {
log.Printf("Channels create %s: json.Marshal allowed_users failed: %v", workspaceID, marshalErr)
}
enabled := true
if body.Enabled != nil {
enabled = *body.Enabled
@@ -225,11 +231,17 @@ func (h *ChannelHandler) Update(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "encrypt failed"})
return
}
j, _ := json.Marshal(body.Config)
j, marshalErr := json.Marshal(body.Config)
if marshalErr != nil {
log.Printf("Channels update %s: json.Marshal config failed: %v", workspaceID, marshalErr)
}
configArg = string(j)
}
if body.AllowedUsers != nil {
j, _ := json.Marshal(body.AllowedUsers)
j, marshalErr := json.Marshal(body.AllowedUsers)
if marshalErr != nil {
log.Printf("Channels update %s: json.Marshal allowed_users failed: %v", workspaceID, marshalErr)
}
allowedArg = string(j)
}
@@ -57,10 +57,16 @@ func pushDelegationResultToInbox(ctx context.Context, sourceID, delegationID, st
"text": responsePreview,
"delegation_id": delegationID,
}
respJSON, _ := json.Marshal(respPayload)
reqJSON, _ := json.Marshal(map[string]interface{}{
respJSON, marshalErr := json.Marshal(respPayload)
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal respPayload failed: %v", delegationID, marshalErr)
}
reqJSON, marshalErr := json.Marshal(map[string]interface{}{
"delegation_id": delegationID,
})
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal reqPayload failed: %v", delegationID, marshalErr)
}
logStatus := "ok"
if status == "failed" {
logStatus = "error"
@@ -165,7 +171,7 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
// check_task_status returned status='queued' forever even after a
// real reply landed). messageId mirrors delegation_id so the
// platform's idempotency-key extraction also keys off the same id.
a2aBody, _ := json.Marshal(map[string]interface{}{
a2aBody, marshalErr := json.Marshal(map[string]interface{}{
"method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
@@ -176,6 +182,9 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
},
},
})
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal a2aBody failed: %v", delegationID, marshalErr)
}
// Fire-and-forget: send A2A in a background goroutine.
//
@@ -304,16 +313,22 @@ const (
// insertDelegationRow stores the pending delegation row. See
// insertDelegationOutcome for the three possible return values.
func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, body delegateRequest, delegationID string) insertDelegationOutcome {
taskJSON, _ := json.Marshal(map[string]interface{}{
taskJSON, marshalErr := json.Marshal(map[string]interface{}{
"task": body.Task,
"delegation_id": delegationID,
})
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal taskJSON failed: %v", delegationID, marshalErr)
}
// Store delegation_id in response_body so agent check_delegation_status
// (which reads response_body->>delegation_id) can locate this row even
// when request_body hasn't propagated yet. Fixes mc#984.
respJSON, _ := json.Marshal(map[string]interface{}{
respJSON, marshalErr := json.Marshal(map[string]interface{}{
"delegation_id": delegationID,
})
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal respJSON failed: %v", delegationID, marshalErr)
}
var idemArg interface{}
if body.IdempotencyKey != "" {
idemArg = body.IdempotencyKey
@@ -484,10 +499,13 @@ handleSuccess:
// dispatch eventually succeeds. Without the key, the drain finds
// the row by (workspace_id, target_id, method) but can't tell
// multiple-queued-delegations-to-same-target apart.
queuedJSON, _ := json.Marshal(map[string]interface{}{
queuedJSON, marshalErr := json.Marshal(map[string]interface{}{
"delegation_id": delegationID,
"queued": true,
})
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal queuedJSON failed: %v", delegationID, marshalErr)
}
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status)
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'queued')
@@ -507,10 +525,13 @@ handleSuccess:
log.Printf("Delegation %s: step=inserting_success_log", delegationID)
// Store success (response_body must be JSONB, include delegation_id)
respJSON, _ := json.Marshal(map[string]interface{}{
respJSON, marshalErr := json.Marshal(map[string]interface{}{
"text": responseText,
"delegation_id": delegationID,
})
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal respJSON failed: %v", delegationID, marshalErr)
}
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status)
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'completed')
@@ -592,15 +613,21 @@ func (h *DelegationHandler) Record(c *gin.Context) {
return
}
taskJSON, _ := json.Marshal(map[string]interface{}{
taskJSON, marshalErr := json.Marshal(map[string]interface{}{
"task": body.Task,
"delegation_id": body.DelegationID,
})
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal taskJSON failed: %v", body.DelegationID, marshalErr)
}
// Store delegation_id in response_body so agent check_delegation_status
// can locate this row. Fixes mc#984.
respJSON, _ := json.Marshal(map[string]interface{}{
respJSON, marshalErr := json.Marshal(map[string]interface{}{
"delegation_id": body.DelegationID,
})
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal respJSON failed: %v", body.DelegationID, marshalErr)
}
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status)
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, $6::jsonb, 'dispatched')
@@ -664,10 +691,13 @@ func (h *DelegationHandler) UpdateStatus(c *gin.Context) {
h.updateDelegationStatus(ctx, sourceID, delegationID, body.Status, body.Error)
if body.Status == "completed" {
respJSON, _ := json.Marshal(map[string]interface{}{
respJSON, marshalErr := json.Marshal(map[string]interface{}{
"text": body.ResponsePreview,
"delegation_id": delegationID,
})
if marshalErr != nil {
log.Printf("Delegation UpdateStatus %s: json.Marshal respJSON failed: %v", delegationID, marshalErr)
}
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, response_body, status)
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4::jsonb, 'completed')
@@ -24,10 +24,13 @@ import (
// Agent Comms tab can show the task text for MCP-initiated delegations.
// Mirrors insertDelegationRow (delegation.go) for the MCP tool path.
func insertMCPDelegationRow(ctx context.Context, db *sql.DB, workspaceID, targetID, delegationID, task string) error {
taskJSON, _ := json.Marshal(map[string]interface{}{
taskJSON, marshalErr := json.Marshal(map[string]interface{}{
"task": task,
"delegation_id": delegationID,
})
if marshalErr != nil {
log.Printf("insertMCPDelegationRow %s: json.Marshal taskJSON failed: %v", delegationID, marshalErr)
}
_, err := db.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status)
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'pending')
@@ -138,7 +141,10 @@ func (h *MCPHandler) toolListPeers(ctx context.Context, workspaceID string) (str
return "No peers found.", nil
}
b, _ := json.MarshalIndent(peers, "", " ")
b, marshalErr := json.MarshalIndent(peers, "", " ")
if marshalErr != nil {
log.Printf("toolListPeers: json.MarshalIndent peers failed: %v", marshalErr)
}
return string(b), nil
}
@@ -168,7 +174,10 @@ func (h *MCPHandler) toolGetWorkspaceInfo(ctx context.Context, workspaceID strin
if parentID.Valid {
info["parent_id"] = parentID.String
}
b, _ := json.MarshalIndent(info, "", " ")
b, marshalErr := json.MarshalIndent(info, "", " ")
if marshalErr != nil {
log.Printf("toolGetWorkspaceInfo %s: json.MarshalIndent info failed: %v", workspaceID, marshalErr)
}
return string(b), nil
}
@@ -260,7 +269,7 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
bgCtx, cancel := context.WithTimeout(context.Background(), mcpAsyncCallTimeout)
defer cancel()
a2aBody, _ := json.Marshal(map[string]interface{}{
a2aBody, marshalErr := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0",
"id": delegationID,
"method": "message/send",
@@ -272,6 +281,9 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
},
},
})
if marshalErr != nil {
log.Printf("toolDelegateTask %s: json.Marshal a2aBody failed: %v", delegationID, marshalErr)
}
status, _, err := h.proxyA2ARequest(bgCtx, targetID, a2aBody, callerID, true)
if err != nil || status < 200 || status >= 300 {
@@ -327,7 +339,10 @@ func (h *MCPHandler) toolCheckTaskStatus(ctx context.Context, callerID string, a
if len(responseBody) > 0 {
result["result"] = extractA2AText(responseBody)
}
b, _ := json.MarshalIndent(result, "", " ")
b, marshalErr := json.MarshalIndent(result, "", " ")
if marshalErr != nil {
log.Printf("toolCheckTaskStatus: json.MarshalIndent result failed: %v", marshalErr)
}
return string(b), nil
}
@@ -482,6 +497,9 @@ func extractA2AText(body []byte) string {
}
// Fallback: marshal result as JSON.
b, _ := json.Marshal(result)
b, marshalErr := json.Marshal(result)
if marshalErr != nil {
log.Printf("extractA2AText: json.Marshal result failed: %v", marshalErr)
}
return string(b)
}
@@ -25,6 +25,7 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/memory/contract"
@@ -190,7 +191,10 @@ func (h *MCPHandler) recallMemoryLegacyShim(ctx context.Context, workspaceID str
if len(out) == 0 {
return "No memories found.", nil
}
b, _ := json.MarshalIndent(out, "", " ")
b, marshalErr := json.MarshalIndent(out, "", " ")
if marshalErr != nil {
log.Printf("toolRecallMemory: json.MarshalIndent out failed: %v", marshalErr)
}
return string(b), nil
}
@@ -163,7 +163,10 @@ func (h *MCPHandler) toolCommitMemoryV2(ctx context.Context, workspaceID string,
summary := "commit_memory to " + ns
logMemoryMCPActivity(ctx, h.broadcaster, workspaceID, "memory_write", resp.ID, ns, &summary)
out, _ := json.Marshal(resp)
out, marshalErr := json.Marshal(resp)
if marshalErr != nil {
log.Printf("toolCommitMemoryV2 %s: json.Marshal resp failed: %v", workspaceID, marshalErr)
}
return string(out), nil
}
@@ -217,7 +220,10 @@ func (h *MCPHandler) toolSearchMemory(ctx context.Context, workspaceID string, a
}
}
out, _ := json.Marshal(resp)
out, marshalErr := json.Marshal(resp)
if marshalErr != nil {
log.Printf("toolSearchMemory %s: json.Marshal resp failed: %v", workspaceID, marshalErr)
}
return string(out), nil
}
@@ -272,7 +278,10 @@ func (h *MCPHandler) toolCommitSummary(ctx context.Context, workspaceID string,
summary := "commit_summary to " + ns
logMemoryMCPActivity(ctx, h.broadcaster, workspaceID, "memory_summary_write", resp.ID, ns, &summary)
out, _ := json.Marshal(resp)
out, marshalErr := json.Marshal(resp)
if marshalErr != nil {
log.Printf("toolCommitSummary %s: json.Marshal resp failed: %v", workspaceID, marshalErr)
}
return string(out), nil
}
@@ -288,7 +297,10 @@ func (h *MCPHandler) toolListWritableNamespaces(ctx context.Context, workspaceID
if err != nil {
return "", fmt.Errorf("resolve writable: %w", err)
}
b, _ := json.MarshalIndent(ns, "", " ")
b, marshalErr := json.MarshalIndent(ns, "", " ")
if marshalErr != nil {
log.Printf("toolListWritableNamespaces %s: json.MarshalIndent ns failed: %v", workspaceID, marshalErr)
}
return string(b), nil
}
@@ -300,7 +312,10 @@ func (h *MCPHandler) toolListReadableNamespaces(ctx context.Context, workspaceID
if err != nil {
return "", fmt.Errorf("resolve readable: %w", err)
}
b, _ := json.MarshalIndent(ns, "", " ")
b, marshalErr := json.MarshalIndent(ns, "", " ")
if marshalErr != nil {
log.Printf("toolListReadableNamespaces %s: json.MarshalIndent ns failed: %v", workspaceID, marshalErr)
}
return string(b), nil
}
@@ -240,11 +240,14 @@ func (h *MemoriesHandler) Commit(c *gin.Context) {
// Hash the sanitised content so the audit trail reflects what was
// actually persisted (not the raw, potentially secret-bearing input).
sum := sha256.Sum256([]byte(content))
auditBody, _ := json.Marshal(map[string]string{
auditBody, marshalErr := json.Marshal(map[string]string{
"memory_id": memoryID,
"namespace": nsName,
"content_sha256": hex.EncodeToString(sum[:]),
})
if marshalErr != nil {
log.Printf("Commit %s: json.Marshal auditBody failed: %v", workspaceID, marshalErr)
}
summary := "GLOBAL memory written: id=" + memoryID + " namespace=" + nsName
if _, auditErr := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, source_id, summary, request_body, status)
@@ -80,7 +80,10 @@ func (h *WorkspaceHandler) gracefulPreRestart(ctx context.Context, workspaceID s
},
"id": nil,
}
body, _ := json.Marshal(payload)
body, marshalErr := json.Marshal(payload)
if marshalErr != nil {
log.Printf("A2AGracefulRestart %s: json.Marshal payload failed: %v", workspaceID, marshalErr)
}
req, reqErr := http.NewRequestWithContext(signalCtx, http.MethodPost, url, bytes.NewReader(body))
if reqErr != nil {
@@ -406,7 +406,7 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
msgID := fmt.Sprintf("cron-%s-%s", short(sched.ID, 8), uuid.New().String()[:8])
a2aBody, _ := json.Marshal(map[string]interface{}{
a2aBody, marshalErr := json.Marshal(map[string]interface{}{
"method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
@@ -416,6 +416,9 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
},
},
})
if marshalErr != nil {
log.Printf("Scheduler '%s': json.Marshal a2aBody failed: %v", sched.Name, marshalErr)
}
log.Printf("Scheduler: firing '%s' → workspace %s", sched.Name, short(sched.WorkspaceID, 12))
@@ -592,12 +595,15 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
// #2026: sanitize the truncated prompt — even UTF-8-safe truncate() can
// carry pre-existing invalid bytes from an agent-edited template. jsonb
// columns reject invalid UTF-8 and hold the transaction open.
cronMeta, _ := json.Marshal(map[string]interface{}{
cronMeta, marshalErr := json.Marshal(map[string]interface{}{
"schedule_id": sched.ID,
"schedule_name": sched.Name,
"cron_expr": sched.CronExpr,
"prompt": sanitizeUTF8(textutil.TruncateBytes(sched.Prompt, 200)),
})
if marshalErr != nil {
log.Printf("Scheduler '%s': json.Marshal cronMeta failed: %v", sched.Name, marshalErr)
}
// #152: persist lastError into error_detail on the activity_logs row
// so GET /workspaces/:id/schedules/:id/history can surface why a run
// failed (previously dropped — history returned status without any
@@ -678,13 +684,16 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
}
skipUpdCancel()
cronMeta, _ := json.Marshal(map[string]interface{}{
cronMeta, marshalErr := json.Marshal(map[string]interface{}{
"schedule_id": sched.ID,
"schedule_name": sched.Name,
"cron_expr": sched.CronExpr,
"skipped": true,
"active_tasks": activeTasks,
})
if marshalErr != nil {
log.Printf("Scheduler '%s': json.Marshal cronMeta failed: %v", sched.Name, marshalErr)
}
// #2026: bounded Background() context on the skipped activity log INSERT
// for the same reason as the fireSchedule activity_logs INSERT above.
skipInsCtx, skipInsCancel := context.WithTimeout(context.Background(), dbQueryTimeout)