fix(handlers): add missing return after json.Marshal error in delegate_task_async #1933

Closed
agent-pm wants to merge 15 commits from fix/mcp-tools-marshal-error-return into main
14 changed files with 68 additions and 10 deletions
@@ -532,6 +532,7 @@ func (m *Manager) FetchWorkspaceChannelContext(ctx context.Context, workspaceID
var config map[string]interface{}
if err := json.Unmarshal(configJSON, &config); err != nil {
log.Printf("ChannelManager: unmarshal config: %v", err)
config = map[string]interface{}{}
}
if err := DecryptSensitiveFields(config); err != nil {
return ""
@@ -126,6 +126,12 @@ const maxProxyResponseBody = 10 << 20
// gets `{"error":"workspace agent unreachable","restarting":true}` instead
// of Cloudflare's opaque 502 error page. Without these, dead workspaces hang
// long enough that CF gives up first and shows its own page.
//
// No Client.Timeout here — per-request context deadlines govern the full
// request lifetime (canvas = 5 min, agent-to-agent = 30 min). A fixed
// Client.Timeout would pre-empt legitimate slow cold-start flows (e.g.
// Claude Code first-token over OAuth can take 30-60s on boot). Transport-
// level timeouts (Dial, TLS, ResponseHeader) are sufficient safety nets.
var a2aClient = &http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
@@ -123,6 +123,10 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
})
if marshalErr != nil {
log.Printf("ProxyA2A %s: json.Marshal respBody failed: %v", workspaceID, marshalErr)
return 0, nil, &proxyA2AError{
Status: http.StatusInternalServerError,
Response: gin.H{"error": "marshal queue response: " + marshalErr.Error()},
}
}
return http.StatusAccepted, respBody, nil
} else {
@@ -153,7 +153,15 @@ func queueRowAuthFields(ctx context.Context, queueID string) (callerID, workspac
if err != nil {
return "", "", err
}
return callerNS.String, workspaceNS.String, nil
callerID = ""
if callerNS.Valid {
callerID = callerNS.String
}
workspaceID = ""
if workspaceNS.Valid {
workspaceID = workspaceNS.String
}
return callerID, workspaceID, nil
}
// GetA2AQueueStatus handles GET /workspaces/:id/a2a/queue/:queue_id.
+10 -2
View File
@@ -73,6 +73,7 @@ func (h *ChannelHandler) List(c *gin.Context) {
var config map[string]interface{}
if err := json.Unmarshal(configJSON, &config); err != nil {
log.Printf("Channels: unmarshal config for channel %s: %v", id, err)
config = map[string]interface{}{}
}
// #319: decrypt sensitive fields first so the mask operates on
// plaintext (first-4 / last-4 of the real token, not the ciphertext
@@ -94,6 +95,7 @@ func (h *ChannelHandler) List(c *gin.Context) {
var allowed []string
if err := json.Unmarshal(allowedJSON, &allowed); err != nil {
log.Printf("Channels: unmarshal allowed_users for channel %s: %v", id, err)
allowed = []string{}
}
entry := map[string]interface{}{
@@ -104,8 +106,12 @@ func (h *ChannelHandler) List(c *gin.Context) {
"enabled": enabled,
"allowed_users": allowed,
"message_count": msgCount,
"created_at": createdAt.Time,
"updated_at": updatedAt.Time,
}
if createdAt.Valid {
entry["created_at"] = createdAt.Time
}
if updatedAt.Valid {
entry["updated_at"] = updatedAt.Time
}
if lastMsg.Valid {
entry["last_message_at"] = lastMsg.Time
@@ -540,9 +546,11 @@ func (h *ChannelHandler) Webhook(c *gin.Context) {
}
if err := json.Unmarshal(configJSON, &row.Config); err != nil {
log.Printf("Channels: unmarshal config for webhook row %s: %v", row.ID, err)
row.Config = map[string]interface{}{}
}
if err := json.Unmarshal(allowedJSON, &row.AllowedUsers); err != nil {
log.Printf("Channels: unmarshal allowed_users for webhook row %s: %v", row.ID, err)
row.AllowedUsers = []string{}
}
if err := channels.DecryptSensitiveFields(row.Config); err != nil {
log.Printf("Channels: decrypt webhook row %s: %v", row.ID, err)
@@ -186,6 +186,8 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
})
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal a2aBody failed: %v", delegationID, marshalErr)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to build A2A request"})
return
}
// Fire-and-forget: send A2A in a background goroutine.
@@ -167,6 +167,9 @@ func generateAppInstallationToken() (string, time.Time, error) {
return "", time.Time{}, err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusCreated {
return "", time.Time{}, fmt.Errorf("github token endpoint returned status %d", resp.StatusCode)
}
var result struct {
Token string `json:"token"`
ExpiresAt time.Time `json:"expires_at"`
@@ -30,6 +30,7 @@ func insertMCPDelegationRow(ctx context.Context, db *sql.DB, workspaceID, target
})
if marshalErr != nil {
log.Printf("insertMCPDelegationRow %s: json.Marshal taskJSON failed: %v", delegationID, marshalErr)
return fmt.Errorf("marshal task JSON: %w", marshalErr)
}
_, err := db.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status)
@@ -144,6 +145,7 @@ func (h *MCPHandler) toolListPeers(ctx context.Context, workspaceID string) (str
b, marshalErr := json.MarshalIndent(peers, "", " ")
if marshalErr != nil {
log.Printf("toolListPeers: json.MarshalIndent peers failed: %v", marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(b), nil
}
@@ -177,6 +179,7 @@ func (h *MCPHandler) toolGetWorkspaceInfo(ctx context.Context, workspaceID strin
b, marshalErr := json.MarshalIndent(info, "", " ")
if marshalErr != nil {
log.Printf("toolGetWorkspaceInfo %s: json.MarshalIndent info failed: %v", workspaceID, marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(b), nil
}
@@ -283,6 +286,7 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
})
if marshalErr != nil {
log.Printf("toolDelegateTask %s: json.Marshal a2aBody failed: %v", delegationID, marshalErr)
return
}
status, _, err := h.proxyA2ARequest(bgCtx, targetID, a2aBody, callerID, true)
@@ -330,9 +334,13 @@ func (h *MCPHandler) toolCheckTaskStatus(ctx context.Context, callerID string, a
result := map[string]interface{}{
"task_id": taskID,
"status": status.String,
"target_id": targetID,
}
if status.Valid {
result["status"] = status.String
} else {
result["status"] = "unknown"
}
if errorDetail.Valid && errorDetail.String != "" {
result["error"] = errorDetail.String
}
@@ -342,6 +350,7 @@ func (h *MCPHandler) toolCheckTaskStatus(ctx context.Context, callerID string, a
b, marshalErr := json.MarshalIndent(result, "", " ")
if marshalErr != nil {
log.Printf("toolCheckTaskStatus: json.MarshalIndent result failed: %v", marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(b), nil
}
@@ -194,6 +194,7 @@ func (h *MCPHandler) recallMemoryLegacyShim(ctx context.Context, workspaceID str
b, marshalErr := json.MarshalIndent(out, "", " ")
if marshalErr != nil {
log.Printf("toolRecallMemory: json.MarshalIndent out failed: %v", marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(b), nil
}
@@ -166,6 +166,7 @@ func (h *MCPHandler) toolCommitMemoryV2(ctx context.Context, workspaceID string,
out, marshalErr := json.Marshal(resp)
if marshalErr != nil {
log.Printf("toolCommitMemoryV2 %s: json.Marshal resp failed: %v", workspaceID, marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(out), nil
}
@@ -223,6 +224,7 @@ func (h *MCPHandler) toolSearchMemory(ctx context.Context, workspaceID string, a
out, marshalErr := json.Marshal(resp)
if marshalErr != nil {
log.Printf("toolSearchMemory %s: json.Marshal resp failed: %v", workspaceID, marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(out), nil
}
@@ -281,6 +283,7 @@ func (h *MCPHandler) toolCommitSummary(ctx context.Context, workspaceID string,
out, marshalErr := json.Marshal(resp)
if marshalErr != nil {
log.Printf("toolCommitSummary %s: json.Marshal resp failed: %v", workspaceID, marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(out), nil
}
@@ -300,6 +303,7 @@ func (h *MCPHandler) toolListWritableNamespaces(ctx context.Context, workspaceID
b, marshalErr := json.MarshalIndent(ns, "", " ")
if marshalErr != nil {
log.Printf("toolListWritableNamespaces %s: json.MarshalIndent ns failed: %v", workspaceID, marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(b), nil
}
@@ -315,6 +319,7 @@ func (h *MCPHandler) toolListReadableNamespaces(ctx context.Context, workspaceID
b, marshalErr := json.MarshalIndent(ns, "", " ")
if marshalErr != nil {
log.Printf("toolListReadableNamespaces %s: json.MarshalIndent ns failed: %v", workspaceID, marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(b), nil
}
@@ -345,8 +345,16 @@ func (h *RegistryHandler) Register(c *gin.Context) {
if qErr := db.DB.QueryRowContext(ctx,
`SELECT name, role FROM workspaces WHERE id = $1`, payload.ID,
).Scan(&dbName, &dbRole); qErr == nil {
name := ""
if dbName.Valid {
name = dbName.String
}
role := ""
if dbRole.Valid {
role = dbRole.String
}
if rc, did := reconcileAgentCardIdentity(
payload.AgentCard, payload.ID, dbName.String, dbRole.String,
payload.AgentCard, payload.ID, name, role,
); did {
reconciledCard = rc
log.Printf("Registry register: reconciled agent_card identity for %s from workspaces row", payload.ID)
@@ -83,6 +83,7 @@ func (h *WorkspaceHandler) gracefulPreRestart(ctx context.Context, workspaceID s
body, marshalErr := json.Marshal(payload)
if marshalErr != nil {
log.Printf("A2AGracefulRestart %s: json.Marshal payload failed: %v", workspaceID, marshalErr)
return
}
req, reqErr := http.NewRequestWithContext(signalCtx, http.MethodPost, url, bytes.NewReader(body))
@@ -160,13 +160,14 @@ func (h *ScheduleHandler) Create(c *gin.Context) {
}
// Validate timezone
if _, err := time.LoadLocation(body.Timezone); err != nil {
loc, err := time.LoadLocation(body.Timezone)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid timezone: " + body.Timezone})
return
}
// Validate and compute next run
nextRun, err := scheduler.ComputeNextRun(body.CronExpr, body.Timezone, time.Now())
nextRun, err := scheduler.ComputeNextRun(body.CronExpr, body.Timezone, time.Now().In(loc))
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
@@ -260,11 +261,12 @@ func (h *ScheduleHandler) Update(c *gin.Context) {
if body.Timezone != nil {
tz = *body.Timezone
}
if _, err := time.LoadLocation(tz); err != nil {
loc, err := time.LoadLocation(tz)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid timezone: " + tz})
return
}
nextRun, err := scheduler.ComputeNextRun(cronExpr, tz, time.Now())
nextRun, err := scheduler.ComputeNextRun(cronExpr, tz, time.Now().In(loc))
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
@@ -1004,7 +1004,7 @@ func stripPlatformManagedLLMBypassEnv(envVars map[string]string) {
}
func runtimeUsesAnthropicNativeProxy(runtime string) bool {
return strings.TrimSpace(strings.ToLower(runtime)) == "claude-code"
return strings.EqualFold(strings.TrimSpace(runtime), "claude-code")
}
func firstNonEmptyEnv(names ...string) string {