diff --git a/workspace-server/internal/bundle/importer.go b/workspace-server/internal/bundle/importer.go index f61c7a98c..29c4da7d3 100644 --- a/workspace-server/internal/bundle/importer.go +++ b/workspace-server/internal/bundle/importer.go @@ -3,6 +3,7 @@ package bundle import ( "context" "fmt" + "log" "strings" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" @@ -92,7 +93,9 @@ func Import( if err != nil { markFailed(provCtx, wsID, broadcaster, err) } else if url != "" { - db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID) + if _, dbErr := db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID); dbErr != nil { + log.Printf("bundle import: failed to update workspace URL for %s: %v", wsID, dbErr) + } } }() } @@ -139,12 +142,16 @@ func markFailed(ctx context.Context, wsID string, broadcaster *events.Broadcaste // markProvisionFailed in workspace-server/internal/handlers/ // workspace_provision_shared.go. msg := err.Error() - db.DB.ExecContext(ctx, + if _, dbErr := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, last_sample_error = $2, updated_at = now() WHERE id = $3`, - models.StatusFailed, msg, wsID) - broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{ + models.StatusFailed, msg, wsID); dbErr != nil { + log.Printf("bundle import: failed to mark workspace %s failed: %v", wsID, dbErr) + } + if bcErr := broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{ "error": msg, - }) + }); bcErr != nil { + log.Printf("bundle import: failed to broadcast provision failed for %s: %v", wsID, bcErr) + } } func nilIfEmpty(s string) interface{} { diff --git a/workspace-server/internal/channels/manager.go b/workspace-server/internal/channels/manager.go index 63cfe9503..1e4c54aa5 100644 --- a/workspace-server/internal/channels/manager.go +++ b/workspace-server/internal/channels/manager.go @@ -375,21 +375,25 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound // Update stats in DB if db.DB != nil { - db.DB.ExecContext(ctx, ` + if _, err := db.DB.ExecContext(ctx, ` UPDATE workspace_channels SET last_message_at = now(), message_count = message_count + 1, updated_at = now() WHERE id = $1 - `, ch.ID) + `, ch.ID); err != nil { + log.Printf("Channels: failed to update inbound stats for channel %s: %v", ch.ID, err) + } } // Broadcast event if m.broadcaster != nil { - m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{ + if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{ "channel_id": ch.ID, "channel_type": ch.ChannelType, "username": msg.Username, "direction": "inbound", - }) + }); err != nil { + log.Printf("Channels: failed to broadcast inbound event: %v", err) + } } return nil @@ -420,19 +424,23 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin } if db.DB != nil { - db.DB.ExecContext(ctx, ` + if _, err := db.DB.ExecContext(ctx, ` UPDATE workspace_channels SET last_message_at = now(), message_count = message_count + 1, updated_at = now() WHERE id = $1 - `, channelID) + `, channelID); err != nil { + log.Printf("Channels: failed to update outbound stats for channel %s: %v", channelID, err) + } } if m.broadcaster != nil { - m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{ + if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{ "channel_id": ch.ID, "channel_type": ch.ChannelType, "direction": "outbound", - }) + }); err != nil { + log.Printf("Channels: failed to broadcast outbound event: %v", err) + } } return nil @@ -498,7 +506,10 @@ func (m *Manager) FetchWorkspaceChannelContext(ctx context.Context, workspaceID return "" } var config map[string]interface{} - json.Unmarshal(configJSON, &config) + if err := json.Unmarshal(configJSON, &config); err != nil { + log.Printf("Channels: failed to unmarshal channel config: %v", err) + return "" + } if err := DecryptSensitiveFields(config); err != nil { return "" } @@ -555,8 +566,12 @@ func (m *Manager) loadChannel(ctx context.Context, channelID string) (ChannelRow if err != nil { return ch, fmt.Errorf("channel %s not found: %w", channelID, err) } - json.Unmarshal(configJSON, &ch.Config) - json.Unmarshal(allowedJSON, &ch.AllowedUsers) + if err := json.Unmarshal(configJSON, &ch.Config); err != nil { + return ch, fmt.Errorf("unmarshal channel %s config: %w", channelID, err) + } + if err := json.Unmarshal(allowedJSON, &ch.AllowedUsers); err != nil { + return ch, fmt.Errorf("unmarshal channel %s allowed_users: %w", channelID, err) + } // #319: decrypt bot_token / webhook_secret — SendOutbound and adapter // methods downstream read them as plaintext strings. if err := DecryptSensitiveFields(ch.Config); err != nil { diff --git a/workspace-server/internal/channels/telegram.go b/workspace-server/internal/channels/telegram.go index 778afa5c0..fc7717cb0 100644 --- a/workspace-server/internal/channels/telegram.go +++ b/workspace-server/internal/channels/telegram.go @@ -513,7 +513,9 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in // Acknowledge the button press (removes loading spinner) ackCfg := tgbotapi.NewCallback(cb.ID, "Received") - bot.Send(ackCfg) + if _, err := bot.Send(ackCfg); err != nil { + log.Printf("telegram: failed to send callback ack: %v", err) + } // Update the message to show what was clicked decision := "approved" @@ -525,7 +527,9 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in cb.Message.MessageID, cb.Message.Text+"\n\nāœ… CEO "+decision, ) - bot.Send(editMsg) + if _, err := bot.Send(editMsg); err != nil { + log.Printf("telegram: failed to send edit message: %v", err) + } // Route the decision as an inbound message to the agent inbound := &InboundMessage{ diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 98c51bb7d..11916e6b1 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -407,15 +407,6 @@ func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) e // matching (the wsauth errors are typed for the invalid case). var errInvalidCallerToken = errors.New("missing caller auth token") -// canvasUserMessage holds the extracted user message extracted from an -// A2A canvas request body for broadcasting to other sessions. -type canvasUserMessage struct { - Message string `json:"message,omitempty"` - Parts []map[string]interface{} `json:"parts,omitempty"` - MessageID string `json:"messageId,omitempty"` - Attachments []map[string]interface{} `json:"attachments,omitempty"` -} - // extractCanvasUserMessage parses an A2A JSON-RPC request body and extracts // the user-authored text and attachments from a canvas-initiated message/send. // Returns nil when the body is not a canvas user message (empty, malformed, diff --git a/workspace-server/internal/handlers/approvals.go b/workspace-server/internal/handlers/approvals.go index dcce896d6..36e4c5e9c 100644 --- a/workspace-server/internal/handlers/approvals.go +++ b/workspace-server/internal/handlers/approvals.go @@ -51,23 +51,29 @@ func (h *ApprovalsHandler) Create(c *gin.Context) { return } - h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{ + if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{ "approval_id": approvalID, "action": body.Action, "reason": body.Reason, "task_id": body.TaskID, - }) + }); err != nil { + log.Printf("approvals: failed to broadcast approval requested: %v", err) + } // Auto-escalate to parent var parentID *string - db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID) + if err := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); err != nil { + log.Printf("approvals: failed to lookup parent for escalation: %v", err) + } if parentID != nil { - h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{ + if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{ "approval_id": approvalID, "from_workspace_id": workspaceID, "action": body.Action, "reason": body.Reason, - }) + }); err != nil { + log.Printf("approvals: failed to broadcast approval escalated: %v", err) + } } c.JSON(http.StatusCreated, gin.H{"approval_id": approvalID, "status": "pending"}) @@ -80,10 +86,12 @@ func (h *ApprovalsHandler) ListAll(c *gin.Context) { ctx := c.Request.Context() // Auto-expire stale approvals (older than 10 min) - db.DB.ExecContext(ctx, ` + if _, err := db.DB.ExecContext(ctx, ` UPDATE approval_requests SET status = 'denied', decided_by = 'auto-expired', decided_at = now() WHERE status = 'pending' AND created_at < now() - interval '10 minutes' - `) + `); err != nil { + log.Printf("approvals: failed to auto-expire stale approvals: %v", err) + } rows, err := db.DB.QueryContext(ctx, ` SELECT a.id, a.workspace_id, w.name, a.action, a.reason, a.status, a.created_at @@ -211,11 +219,13 @@ func (h *ApprovalsHandler) Decide(c *gin.Context) { eventType = "APPROVAL_DENIED" } - h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{ + if err := h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{ "approval_id": approvalID, "decision": body.Decision, "decided_by": decidedBy, - }) + }); err != nil { + log.Printf("approvals: failed to broadcast approval decision: %v", err) + } c.JSON(http.StatusOK, gin.H{"status": body.Decision, "approval_id": approvalID}) } diff --git a/workspace-server/internal/handlers/channels.go b/workspace-server/internal/handlers/channels.go index 6d9008bf5..409774f49 100644 --- a/workspace-server/internal/handlers/channels.go +++ b/workspace-server/internal/handlers/channels.go @@ -67,7 +67,10 @@ func (h *ChannelHandler) List(c *gin.Context) { } var config map[string]interface{} - json.Unmarshal(configJSON, &config) + if err := json.Unmarshal(configJSON, &config); err != nil { + log.Printf("Channels: unmarshal config on list for channel %s: %v", id, err) + config = map[string]interface{}{} + } // #319: decrypt sensitive fields first so the mask operates on // plaintext (first-4 / last-4 of the real token, not the ciphertext // prefix). Decrypt errors are logged but non-fatal — List must keep @@ -86,7 +89,10 @@ func (h *ChannelHandler) List(c *gin.Context) { } var allowed []string - json.Unmarshal(allowedJSON, &allowed) + if err := json.Unmarshal(allowedJSON, &allowed); err != nil { + log.Printf("Channels: unmarshal allowed_users on list for channel %s: %v", id, err) + allowed = []string{} + } entry := map[string]interface{}{ "id": id,