fix(handlers,channels,scheduler): log ignored json.Marshal errors #1918
@@ -313,7 +313,7 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
|
||||
history := m.loadHistory(ctx, historyKey)
|
||||
|
||||
// Build A2A JSON-RPC payload
|
||||
a2aBody, _ := json.Marshal(map[string]interface{}{
|
||||
a2aBody, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"method": "message/send",
|
||||
"params": map[string]interface{}{
|
||||
"message": map[string]interface{}{
|
||||
@@ -333,6 +333,9 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
|
||||
},
|
||||
},
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Channels %s: json.Marshal a2aBody failed: %v", ch.ChannelType, marshalErr)
|
||||
}
|
||||
|
||||
callerID := "channel:" + ch.ChannelType
|
||||
|
||||
@@ -665,12 +668,15 @@ func (m *Manager) appendHistory(ctx context.Context, key string, username, userM
|
||||
if db.RDB == nil {
|
||||
return
|
||||
}
|
||||
entry, _ := json.Marshal(map[string]string{
|
||||
entry, marshalErr := json.Marshal(map[string]string{
|
||||
"user": username,
|
||||
"message": userMsg,
|
||||
"reply": agentReply,
|
||||
"time": time.Now().UTC().Format(time.RFC3339),
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("appendHistory %s: json.Marshal entry failed: %v", key, marshalErr)
|
||||
}
|
||||
db.RDB.LPush(ctx, key, string(entry))
|
||||
db.RDB.LTrim(ctx, key, 0, int64(maxHistoryEntries-1))
|
||||
db.RDB.Expire(ctx, key, historyTTL)
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -159,7 +160,10 @@ func (s *SlackAdapter) sendBotMessage(ctx context.Context, config map[string]int
|
||||
payload["icon_emoji"] = iconEmoji
|
||||
}
|
||||
|
||||
body, _ := json.Marshal(payload)
|
||||
body, marshalErr := json.Marshal(payload)
|
||||
if marshalErr != nil {
|
||||
log.Printf("slack SendMessage: json.Marshal payload failed: %v", marshalErr)
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://slack.com/api/chat.postMessage", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return fmt.Errorf("slack: build request: %w", err)
|
||||
|
||||
@@ -115,12 +115,15 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
|
||||
if logActivity {
|
||||
h.logA2ABusyQueued(ctx, workspaceID, callerID, body, a2aMethod, durationMs)
|
||||
}
|
||||
respBody, _ := json.Marshal(gin.H{
|
||||
respBody, marshalErr := json.Marshal(gin.H{
|
||||
"queued": true,
|
||||
"queue_id": qid,
|
||||
"queue_depth": depth,
|
||||
"message": "workspace agent busy — request queued, will dispatch when capacity available",
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("ProxyA2A %s: json.Marshal respBody failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
return http.StatusAccepted, respBody, nil
|
||||
} else {
|
||||
// Queue insert failed — fall through to legacy 503 behavior
|
||||
|
||||
@@ -419,10 +419,13 @@ func (h *WorkspaceHandler) stitchDrainResponseToDelegation(ctx context.Context,
|
||||
return
|
||||
}
|
||||
responseText := extractResponseText(respBody)
|
||||
respJSON, _ := json.Marshal(map[string]interface{}{
|
||||
respJSON, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"text": responseText,
|
||||
"delegation_id": delegationID,
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("a2aQueue stitch %s: json.Marshal respJSON failed: %v", delegationID, marshalErr)
|
||||
}
|
||||
res, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE activity_logs
|
||||
SET status = 'completed',
|
||||
|
||||
@@ -164,7 +164,10 @@ func (w *AgentMessageWriter) Send(
|
||||
}
|
||||
respPayload["parts"] = fileParts
|
||||
}
|
||||
respJSON, _ := json.Marshal(respPayload)
|
||||
respJSON, marshalErr := json.Marshal(respPayload)
|
||||
if marshalErr != nil {
|
||||
log.Printf("AgentMessageWriter %s: json.Marshal respPayload failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
preview := textutil.TruncateRunes(message, 80)
|
||||
if _, err := w.db.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status)
|
||||
|
||||
@@ -34,7 +34,10 @@ func (h *ApprovalsHandler) Create(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
ctxJSON, _ := json.Marshal(body.Context)
|
||||
ctxJSON, marshalErr := json.Marshal(body.Context)
|
||||
if marshalErr != nil {
|
||||
log.Printf("Approvals create %s: json.Marshal context failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
if ctxJSON == nil {
|
||||
ctxJSON = []byte("{}")
|
||||
}
|
||||
|
||||
@@ -344,7 +344,10 @@ func computeAuditHMAC(key []byte, ev *auditEventRow) string {
|
||||
"timestamp": ev.Timestamp.UTC().Format("2006-01-02T15:04:05Z"),
|
||||
}
|
||||
|
||||
payload, _ := json.Marshal(canonical) // compact, sorted keys
|
||||
payload, marshalErr := json.Marshal(canonical) // compact, sorted keys
|
||||
if marshalErr != nil {
|
||||
log.Printf("auditChainHash: json.Marshal canonical failed: %v", marshalErr)
|
||||
}
|
||||
mac := hmac.New(sha256.New, key)
|
||||
mac.Write(payload)
|
||||
return hex.EncodeToString(mac.Sum(nil))
|
||||
|
||||
@@ -169,8 +169,14 @@ func (h *ChannelHandler) Create(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
configJSON, _ := json.Marshal(body.Config)
|
||||
allowedJSON, _ := json.Marshal(body.AllowedUsers)
|
||||
configJSON, marshalErr := json.Marshal(body.Config)
|
||||
if marshalErr != nil {
|
||||
log.Printf("Channels create %s: json.Marshal config failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
allowedJSON, marshalErr := json.Marshal(body.AllowedUsers)
|
||||
if marshalErr != nil {
|
||||
log.Printf("Channels create %s: json.Marshal allowed_users failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
enabled := true
|
||||
if body.Enabled != nil {
|
||||
enabled = *body.Enabled
|
||||
@@ -225,11 +231,17 @@ func (h *ChannelHandler) Update(c *gin.Context) {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "encrypt failed"})
|
||||
return
|
||||
}
|
||||
j, _ := json.Marshal(body.Config)
|
||||
j, marshalErr := json.Marshal(body.Config)
|
||||
if marshalErr != nil {
|
||||
log.Printf("Channels update %s: json.Marshal config failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
configArg = string(j)
|
||||
}
|
||||
if body.AllowedUsers != nil {
|
||||
j, _ := json.Marshal(body.AllowedUsers)
|
||||
j, marshalErr := json.Marshal(body.AllowedUsers)
|
||||
if marshalErr != nil {
|
||||
log.Printf("Channels update %s: json.Marshal allowed_users failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
allowedArg = string(j)
|
||||
}
|
||||
|
||||
|
||||
@@ -57,10 +57,16 @@ func pushDelegationResultToInbox(ctx context.Context, sourceID, delegationID, st
|
||||
"text": responsePreview,
|
||||
"delegation_id": delegationID,
|
||||
}
|
||||
respJSON, _ := json.Marshal(respPayload)
|
||||
reqJSON, _ := json.Marshal(map[string]interface{}{
|
||||
respJSON, marshalErr := json.Marshal(respPayload)
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal respPayload failed: %v", delegationID, marshalErr)
|
||||
}
|
||||
reqJSON, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"delegation_id": delegationID,
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal reqPayload failed: %v", delegationID, marshalErr)
|
||||
}
|
||||
logStatus := "ok"
|
||||
if status == "failed" {
|
||||
logStatus = "error"
|
||||
@@ -165,7 +171,7 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
|
||||
// check_task_status returned status='queued' forever even after a
|
||||
// real reply landed). messageId mirrors delegation_id so the
|
||||
// platform's idempotency-key extraction also keys off the same id.
|
||||
a2aBody, _ := json.Marshal(map[string]interface{}{
|
||||
a2aBody, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"method": "message/send",
|
||||
"params": map[string]interface{}{
|
||||
"message": map[string]interface{}{
|
||||
@@ -176,6 +182,9 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
|
||||
},
|
||||
},
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal a2aBody failed: %v", delegationID, marshalErr)
|
||||
}
|
||||
|
||||
// Fire-and-forget: send A2A in a background goroutine.
|
||||
//
|
||||
@@ -304,16 +313,22 @@ const (
|
||||
// insertDelegationRow stores the pending delegation row. See
|
||||
// insertDelegationOutcome for the three possible return values.
|
||||
func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, body delegateRequest, delegationID string) insertDelegationOutcome {
|
||||
taskJSON, _ := json.Marshal(map[string]interface{}{
|
||||
taskJSON, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"task": body.Task,
|
||||
"delegation_id": delegationID,
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal taskJSON failed: %v", delegationID, marshalErr)
|
||||
}
|
||||
// Store delegation_id in response_body so agent check_delegation_status
|
||||
// (which reads response_body->>delegation_id) can locate this row even
|
||||
// when request_body hasn't propagated yet. Fixes mc#984.
|
||||
respJSON, _ := json.Marshal(map[string]interface{}{
|
||||
respJSON, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"delegation_id": delegationID,
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal respJSON failed: %v", delegationID, marshalErr)
|
||||
}
|
||||
var idemArg interface{}
|
||||
if body.IdempotencyKey != "" {
|
||||
idemArg = body.IdempotencyKey
|
||||
@@ -484,10 +499,13 @@ handleSuccess:
|
||||
// dispatch eventually succeeds. Without the key, the drain finds
|
||||
// the row by (workspace_id, target_id, method) but can't tell
|
||||
// multiple-queued-delegations-to-same-target apart.
|
||||
queuedJSON, _ := json.Marshal(map[string]interface{}{
|
||||
queuedJSON, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"delegation_id": delegationID,
|
||||
"queued": true,
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal queuedJSON failed: %v", delegationID, marshalErr)
|
||||
}
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status)
|
||||
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'queued')
|
||||
@@ -507,10 +525,13 @@ handleSuccess:
|
||||
|
||||
log.Printf("Delegation %s: step=inserting_success_log", delegationID)
|
||||
// Store success (response_body must be JSONB, include delegation_id)
|
||||
respJSON, _ := json.Marshal(map[string]interface{}{
|
||||
respJSON, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"text": responseText,
|
||||
"delegation_id": delegationID,
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal respJSON failed: %v", delegationID, marshalErr)
|
||||
}
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status)
|
||||
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'completed')
|
||||
@@ -592,15 +613,21 @@ func (h *DelegationHandler) Record(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
taskJSON, _ := json.Marshal(map[string]interface{}{
|
||||
taskJSON, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"task": body.Task,
|
||||
"delegation_id": body.DelegationID,
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal taskJSON failed: %v", body.DelegationID, marshalErr)
|
||||
}
|
||||
// Store delegation_id in response_body so agent check_delegation_status
|
||||
// can locate this row. Fixes mc#984.
|
||||
respJSON, _ := json.Marshal(map[string]interface{}{
|
||||
respJSON, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"delegation_id": body.DelegationID,
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal respJSON failed: %v", body.DelegationID, marshalErr)
|
||||
}
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status)
|
||||
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, $6::jsonb, 'dispatched')
|
||||
@@ -664,10 +691,13 @@ func (h *DelegationHandler) UpdateStatus(c *gin.Context) {
|
||||
h.updateDelegationStatus(ctx, sourceID, delegationID, body.Status, body.Error)
|
||||
|
||||
if body.Status == "completed" {
|
||||
respJSON, _ := json.Marshal(map[string]interface{}{
|
||||
respJSON, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"text": body.ResponsePreview,
|
||||
"delegation_id": delegationID,
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation UpdateStatus %s: json.Marshal respJSON failed: %v", delegationID, marshalErr)
|
||||
}
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, response_body, status)
|
||||
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4::jsonb, 'completed')
|
||||
|
||||
@@ -24,10 +24,13 @@ import (
|
||||
// Agent Comms tab can show the task text for MCP-initiated delegations.
|
||||
// Mirrors insertDelegationRow (delegation.go) for the MCP tool path.
|
||||
func insertMCPDelegationRow(ctx context.Context, db *sql.DB, workspaceID, targetID, delegationID, task string) error {
|
||||
taskJSON, _ := json.Marshal(map[string]interface{}{
|
||||
taskJSON, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"task": task,
|
||||
"delegation_id": delegationID,
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("insertMCPDelegationRow %s: json.Marshal taskJSON failed: %v", delegationID, marshalErr)
|
||||
}
|
||||
_, err := db.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status)
|
||||
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'pending')
|
||||
@@ -138,7 +141,10 @@ func (h *MCPHandler) toolListPeers(ctx context.Context, workspaceID string) (str
|
||||
return "No peers found.", nil
|
||||
}
|
||||
|
||||
b, _ := json.MarshalIndent(peers, "", " ")
|
||||
b, marshalErr := json.MarshalIndent(peers, "", " ")
|
||||
if marshalErr != nil {
|
||||
log.Printf("toolListPeers: json.MarshalIndent peers failed: %v", marshalErr)
|
||||
}
|
||||
return string(b), nil
|
||||
}
|
||||
|
||||
@@ -168,7 +174,10 @@ func (h *MCPHandler) toolGetWorkspaceInfo(ctx context.Context, workspaceID strin
|
||||
if parentID.Valid {
|
||||
info["parent_id"] = parentID.String
|
||||
}
|
||||
b, _ := json.MarshalIndent(info, "", " ")
|
||||
b, marshalErr := json.MarshalIndent(info, "", " ")
|
||||
if marshalErr != nil {
|
||||
log.Printf("toolGetWorkspaceInfo %s: json.MarshalIndent info failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
return string(b), nil
|
||||
}
|
||||
|
||||
@@ -260,7 +269,7 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
|
||||
bgCtx, cancel := context.WithTimeout(context.Background(), mcpAsyncCallTimeout)
|
||||
defer cancel()
|
||||
|
||||
a2aBody, _ := json.Marshal(map[string]interface{}{
|
||||
a2aBody, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": delegationID,
|
||||
"method": "message/send",
|
||||
@@ -272,6 +281,9 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
|
||||
},
|
||||
},
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("toolDelegateTask %s: json.Marshal a2aBody failed: %v", delegationID, marshalErr)
|
||||
}
|
||||
|
||||
status, _, err := h.proxyA2ARequest(bgCtx, targetID, a2aBody, callerID, true)
|
||||
if err != nil || status < 200 || status >= 300 {
|
||||
@@ -327,7 +339,10 @@ func (h *MCPHandler) toolCheckTaskStatus(ctx context.Context, callerID string, a
|
||||
if len(responseBody) > 0 {
|
||||
result["result"] = extractA2AText(responseBody)
|
||||
}
|
||||
b, _ := json.MarshalIndent(result, "", " ")
|
||||
b, marshalErr := json.MarshalIndent(result, "", " ")
|
||||
if marshalErr != nil {
|
||||
log.Printf("toolCheckTaskStatus: json.MarshalIndent result failed: %v", marshalErr)
|
||||
}
|
||||
return string(b), nil
|
||||
}
|
||||
|
||||
@@ -482,6 +497,9 @@ func extractA2AText(body []byte) string {
|
||||
}
|
||||
|
||||
// Fallback: marshal result as JSON.
|
||||
b, _ := json.Marshal(result)
|
||||
b, marshalErr := json.Marshal(result)
|
||||
if marshalErr != nil {
|
||||
log.Printf("extractA2AText: json.Marshal result failed: %v", marshalErr)
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/memory/contract"
|
||||
@@ -190,7 +191,10 @@ func (h *MCPHandler) recallMemoryLegacyShim(ctx context.Context, workspaceID str
|
||||
if len(out) == 0 {
|
||||
return "No memories found.", nil
|
||||
}
|
||||
b, _ := json.MarshalIndent(out, "", " ")
|
||||
b, marshalErr := json.MarshalIndent(out, "", " ")
|
||||
if marshalErr != nil {
|
||||
log.Printf("toolRecallMemory: json.MarshalIndent out failed: %v", marshalErr)
|
||||
}
|
||||
return string(b), nil
|
||||
}
|
||||
|
||||
|
||||
@@ -163,7 +163,10 @@ func (h *MCPHandler) toolCommitMemoryV2(ctx context.Context, workspaceID string,
|
||||
summary := "commit_memory to " + ns
|
||||
logMemoryMCPActivity(ctx, h.broadcaster, workspaceID, "memory_write", resp.ID, ns, &summary)
|
||||
|
||||
out, _ := json.Marshal(resp)
|
||||
out, marshalErr := json.Marshal(resp)
|
||||
if marshalErr != nil {
|
||||
log.Printf("toolCommitMemoryV2 %s: json.Marshal resp failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
return string(out), nil
|
||||
}
|
||||
|
||||
@@ -217,7 +220,10 @@ func (h *MCPHandler) toolSearchMemory(ctx context.Context, workspaceID string, a
|
||||
}
|
||||
}
|
||||
|
||||
out, _ := json.Marshal(resp)
|
||||
out, marshalErr := json.Marshal(resp)
|
||||
if marshalErr != nil {
|
||||
log.Printf("toolSearchMemory %s: json.Marshal resp failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
return string(out), nil
|
||||
}
|
||||
|
||||
@@ -272,7 +278,10 @@ func (h *MCPHandler) toolCommitSummary(ctx context.Context, workspaceID string,
|
||||
summary := "commit_summary to " + ns
|
||||
logMemoryMCPActivity(ctx, h.broadcaster, workspaceID, "memory_summary_write", resp.ID, ns, &summary)
|
||||
|
||||
out, _ := json.Marshal(resp)
|
||||
out, marshalErr := json.Marshal(resp)
|
||||
if marshalErr != nil {
|
||||
log.Printf("toolCommitSummary %s: json.Marshal resp failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
return string(out), nil
|
||||
}
|
||||
|
||||
@@ -288,7 +297,10 @@ func (h *MCPHandler) toolListWritableNamespaces(ctx context.Context, workspaceID
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("resolve writable: %w", err)
|
||||
}
|
||||
b, _ := json.MarshalIndent(ns, "", " ")
|
||||
b, marshalErr := json.MarshalIndent(ns, "", " ")
|
||||
if marshalErr != nil {
|
||||
log.Printf("toolListWritableNamespaces %s: json.MarshalIndent ns failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
return string(b), nil
|
||||
}
|
||||
|
||||
@@ -300,7 +312,10 @@ func (h *MCPHandler) toolListReadableNamespaces(ctx context.Context, workspaceID
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("resolve readable: %w", err)
|
||||
}
|
||||
b, _ := json.MarshalIndent(ns, "", " ")
|
||||
b, marshalErr := json.MarshalIndent(ns, "", " ")
|
||||
if marshalErr != nil {
|
||||
log.Printf("toolListReadableNamespaces %s: json.MarshalIndent ns failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
return string(b), nil
|
||||
}
|
||||
|
||||
|
||||
@@ -240,11 +240,14 @@ func (h *MemoriesHandler) Commit(c *gin.Context) {
|
||||
// Hash the sanitised content so the audit trail reflects what was
|
||||
// actually persisted (not the raw, potentially secret-bearing input).
|
||||
sum := sha256.Sum256([]byte(content))
|
||||
auditBody, _ := json.Marshal(map[string]string{
|
||||
auditBody, marshalErr := json.Marshal(map[string]string{
|
||||
"memory_id": memoryID,
|
||||
"namespace": nsName,
|
||||
"content_sha256": hex.EncodeToString(sum[:]),
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Commit %s: json.Marshal auditBody failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
summary := "GLOBAL memory written: id=" + memoryID + " namespace=" + nsName
|
||||
if _, auditErr := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, summary, request_body, status)
|
||||
|
||||
@@ -80,7 +80,10 @@ func (h *WorkspaceHandler) gracefulPreRestart(ctx context.Context, workspaceID s
|
||||
},
|
||||
"id": nil,
|
||||
}
|
||||
body, _ := json.Marshal(payload)
|
||||
body, marshalErr := json.Marshal(payload)
|
||||
if marshalErr != nil {
|
||||
log.Printf("A2AGracefulRestart %s: json.Marshal payload failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
|
||||
req, reqErr := http.NewRequestWithContext(signalCtx, http.MethodPost, url, bytes.NewReader(body))
|
||||
if reqErr != nil {
|
||||
|
||||
@@ -406,7 +406,7 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
|
||||
msgID := fmt.Sprintf("cron-%s-%s", short(sched.ID, 8), uuid.New().String()[:8])
|
||||
|
||||
a2aBody, _ := json.Marshal(map[string]interface{}{
|
||||
a2aBody, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"method": "message/send",
|
||||
"params": map[string]interface{}{
|
||||
"message": map[string]interface{}{
|
||||
@@ -416,6 +416,9 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
},
|
||||
},
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Scheduler '%s': json.Marshal a2aBody failed: %v", sched.Name, marshalErr)
|
||||
}
|
||||
|
||||
log.Printf("Scheduler: firing '%s' → workspace %s", sched.Name, short(sched.WorkspaceID, 12))
|
||||
|
||||
@@ -592,12 +595,15 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
// #2026: sanitize the truncated prompt — even UTF-8-safe truncate() can
|
||||
// carry pre-existing invalid bytes from an agent-edited template. jsonb
|
||||
// columns reject invalid UTF-8 and hold the transaction open.
|
||||
cronMeta, _ := json.Marshal(map[string]interface{}{
|
||||
cronMeta, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"schedule_id": sched.ID,
|
||||
"schedule_name": sched.Name,
|
||||
"cron_expr": sched.CronExpr,
|
||||
"prompt": sanitizeUTF8(textutil.TruncateBytes(sched.Prompt, 200)),
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Scheduler '%s': json.Marshal cronMeta failed: %v", sched.Name, marshalErr)
|
||||
}
|
||||
// #152: persist lastError into error_detail on the activity_logs row
|
||||
// so GET /workspaces/:id/schedules/:id/history can surface why a run
|
||||
// failed (previously dropped — history returned status without any
|
||||
@@ -678,13 +684,16 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
|
||||
}
|
||||
skipUpdCancel()
|
||||
|
||||
cronMeta, _ := json.Marshal(map[string]interface{}{
|
||||
cronMeta, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"schedule_id": sched.ID,
|
||||
"schedule_name": sched.Name,
|
||||
"cron_expr": sched.CronExpr,
|
||||
"skipped": true,
|
||||
"active_tasks": activeTasks,
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Scheduler '%s': json.Marshal cronMeta failed: %v", sched.Name, marshalErr)
|
||||
}
|
||||
// #2026: bounded Background() context on the skipped activity log INSERT
|
||||
// for the same reason as the fireSchedule activity_logs INSERT above.
|
||||
skipInsCtx, skipInsCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
|
||||
|
||||
Reference in New Issue
Block a user