fix(workspace-server): handle unchecked errors in channels, bundle importer, telegram, approvals #2039
@@ -3,6 +3,7 @@ package bundle
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
@@ -92,7 +93,9 @@ func Import(
|
||||
if err != nil {
|
||||
markFailed(provCtx, wsID, broadcaster, err)
|
||||
} else if url != "" {
|
||||
db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID)
|
||||
if _, dbErr := db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID); dbErr != nil {
|
||||
log.Printf("bundle import: failed to update workspace URL for %s: %v", wsID, dbErr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -139,12 +142,16 @@ func markFailed(ctx context.Context, wsID string, broadcaster *events.Broadcaste
|
||||
// markProvisionFailed in workspace-server/internal/handlers/
|
||||
// workspace_provision_shared.go.
|
||||
msg := err.Error()
|
||||
db.DB.ExecContext(ctx,
|
||||
if _, dbErr := db.DB.ExecContext(ctx,
|
||||
`UPDATE workspaces SET status = $1, last_sample_error = $2, updated_at = now() WHERE id = $3`,
|
||||
models.StatusFailed, msg, wsID)
|
||||
broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
|
||||
models.StatusFailed, msg, wsID); dbErr != nil {
|
||||
log.Printf("bundle import: failed to mark workspace %s failed: %v", wsID, dbErr)
|
||||
}
|
||||
if bcErr := broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
|
||||
"error": msg,
|
||||
})
|
||||
}); bcErr != nil {
|
||||
log.Printf("bundle import: failed to broadcast provision failed for %s: %v", wsID, bcErr)
|
||||
}
|
||||
}
|
||||
|
||||
func nilIfEmpty(s string) interface{} {
|
||||
|
||||
@@ -375,21 +375,25 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
|
||||
|
||||
// Update stats in DB
|
||||
if db.DB != nil {
|
||||
db.DB.ExecContext(ctx, `
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE workspace_channels
|
||||
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
|
||||
WHERE id = $1
|
||||
`, ch.ID)
|
||||
`, ch.ID); err != nil {
|
||||
log.Printf("Channels: failed to update inbound stats for channel %s: %v", ch.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast event
|
||||
if m.broadcaster != nil {
|
||||
m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
|
||||
if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
|
||||
"channel_id": ch.ID,
|
||||
"channel_type": ch.ChannelType,
|
||||
"username": msg.Username,
|
||||
"direction": "inbound",
|
||||
})
|
||||
}); err != nil {
|
||||
log.Printf("Channels: failed to broadcast inbound event: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -420,19 +424,23 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin
|
||||
}
|
||||
|
||||
if db.DB != nil {
|
||||
db.DB.ExecContext(ctx, `
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE workspace_channels
|
||||
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
|
||||
WHERE id = $1
|
||||
`, channelID)
|
||||
`, channelID); err != nil {
|
||||
log.Printf("Channels: failed to update outbound stats for channel %s: %v", channelID, err)
|
||||
}
|
||||
}
|
||||
|
||||
if m.broadcaster != nil {
|
||||
m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
|
||||
if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
|
||||
"channel_id": ch.ID,
|
||||
"channel_type": ch.ChannelType,
|
||||
"direction": "outbound",
|
||||
})
|
||||
}); err != nil {
|
||||
log.Printf("Channels: failed to broadcast outbound event: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -498,7 +506,10 @@ func (m *Manager) FetchWorkspaceChannelContext(ctx context.Context, workspaceID
|
||||
return ""
|
||||
}
|
||||
var config map[string]interface{}
|
||||
json.Unmarshal(configJSON, &config)
|
||||
if err := json.Unmarshal(configJSON, &config); err != nil {
|
||||
log.Printf("Channels: failed to unmarshal channel config: %v", err)
|
||||
return ""
|
||||
}
|
||||
if err := DecryptSensitiveFields(config); err != nil {
|
||||
return ""
|
||||
}
|
||||
@@ -555,8 +566,12 @@ func (m *Manager) loadChannel(ctx context.Context, channelID string) (ChannelRow
|
||||
if err != nil {
|
||||
return ch, fmt.Errorf("channel %s not found: %w", channelID, err)
|
||||
}
|
||||
json.Unmarshal(configJSON, &ch.Config)
|
||||
json.Unmarshal(allowedJSON, &ch.AllowedUsers)
|
||||
if err := json.Unmarshal(configJSON, &ch.Config); err != nil {
|
||||
return ch, fmt.Errorf("unmarshal channel %s config: %w", channelID, err)
|
||||
}
|
||||
if err := json.Unmarshal(allowedJSON, &ch.AllowedUsers); err != nil {
|
||||
return ch, fmt.Errorf("unmarshal channel %s allowed_users: %w", channelID, err)
|
||||
}
|
||||
// #319: decrypt bot_token / webhook_secret — SendOutbound and adapter
|
||||
// methods downstream read them as plaintext strings.
|
||||
if err := DecryptSensitiveFields(ch.Config); err != nil {
|
||||
|
||||
@@ -513,7 +513,9 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
|
||||
|
||||
// Acknowledge the button press (removes loading spinner)
|
||||
ackCfg := tgbotapi.NewCallback(cb.ID, "Received")
|
||||
bot.Send(ackCfg)
|
||||
if _, err := bot.Send(ackCfg); err != nil {
|
||||
log.Printf("telegram: failed to send callback ack: %v", err)
|
||||
}
|
||||
|
||||
// Update the message to show what was clicked
|
||||
decision := "approved"
|
||||
@@ -525,7 +527,9 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
|
||||
cb.Message.MessageID,
|
||||
cb.Message.Text+"\n\n✅ CEO "+decision,
|
||||
)
|
||||
bot.Send(editMsg)
|
||||
if _, err := bot.Send(editMsg); err != nil {
|
||||
log.Printf("telegram: failed to send edit message: %v", err)
|
||||
}
|
||||
|
||||
// Route the decision as an inbound message to the agent
|
||||
inbound := &InboundMessage{
|
||||
|
||||
@@ -407,15 +407,6 @@ func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) e
|
||||
// matching (the wsauth errors are typed for the invalid case).
|
||||
var errInvalidCallerToken = errors.New("missing caller auth token")
|
||||
|
||||
// canvasUserMessage holds the extracted user message extracted from an
|
||||
// A2A canvas request body for broadcasting to other sessions.
|
||||
type canvasUserMessage struct {
|
||||
Message string `json:"message,omitempty"`
|
||||
Parts []map[string]interface{} `json:"parts,omitempty"`
|
||||
MessageID string `json:"messageId,omitempty"`
|
||||
Attachments []map[string]interface{} `json:"attachments,omitempty"`
|
||||
}
|
||||
|
||||
// extractCanvasUserMessage parses an A2A JSON-RPC request body and extracts
|
||||
// the user-authored text and attachments from a canvas-initiated message/send.
|
||||
// Returns nil when the body is not a canvas user message (empty, malformed,
|
||||
|
||||
@@ -51,23 +51,29 @@ func (h *ApprovalsHandler) Create(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
|
||||
if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"action": body.Action,
|
||||
"reason": body.Reason,
|
||||
"task_id": body.TaskID,
|
||||
})
|
||||
}); err != nil {
|
||||
log.Printf("approvals: failed to broadcast approval requested: %v", err)
|
||||
}
|
||||
|
||||
// Auto-escalate to parent
|
||||
var parentID *string
|
||||
db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID)
|
||||
if err := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); err != nil {
|
||||
log.Printf("approvals: failed to lookup parent for escalation: %v", err)
|
||||
}
|
||||
if parentID != nil {
|
||||
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
|
||||
if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"from_workspace_id": workspaceID,
|
||||
"action": body.Action,
|
||||
"reason": body.Reason,
|
||||
})
|
||||
}); err != nil {
|
||||
log.Printf("approvals: failed to broadcast approval escalated: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
c.JSON(http.StatusCreated, gin.H{"approval_id": approvalID, "status": "pending"})
|
||||
@@ -80,10 +86,12 @@ func (h *ApprovalsHandler) ListAll(c *gin.Context) {
|
||||
ctx := c.Request.Context()
|
||||
|
||||
// Auto-expire stale approvals (older than 10 min)
|
||||
db.DB.ExecContext(ctx, `
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE approval_requests SET status = 'denied', decided_by = 'auto-expired', decided_at = now()
|
||||
WHERE status = 'pending' AND created_at < now() - interval '10 minutes'
|
||||
`)
|
||||
`); err != nil {
|
||||
log.Printf("approvals: failed to auto-expire stale approvals: %v", err)
|
||||
}
|
||||
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT a.id, a.workspace_id, w.name, a.action, a.reason, a.status, a.created_at
|
||||
@@ -211,11 +219,13 @@ func (h *ApprovalsHandler) Decide(c *gin.Context) {
|
||||
eventType = "APPROVAL_DENIED"
|
||||
}
|
||||
|
||||
h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{
|
||||
if err := h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"decision": body.Decision,
|
||||
"decided_by": decidedBy,
|
||||
})
|
||||
}); err != nil {
|
||||
log.Printf("approvals: failed to broadcast approval decision: %v", err)
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"status": body.Decision, "approval_id": approvalID})
|
||||
}
|
||||
|
||||
@@ -67,7 +67,10 @@ func (h *ChannelHandler) List(c *gin.Context) {
|
||||
}
|
||||
|
||||
var config map[string]interface{}
|
||||
json.Unmarshal(configJSON, &config)
|
||||
if err := json.Unmarshal(configJSON, &config); err != nil {
|
||||
log.Printf("Channels: unmarshal config on list for channel %s: %v", id, err)
|
||||
config = map[string]interface{}{}
|
||||
}
|
||||
// #319: decrypt sensitive fields first so the mask operates on
|
||||
// plaintext (first-4 / last-4 of the real token, not the ciphertext
|
||||
// prefix). Decrypt errors are logged but non-fatal — List must keep
|
||||
@@ -86,7 +89,10 @@ func (h *ChannelHandler) List(c *gin.Context) {
|
||||
}
|
||||
|
||||
var allowed []string
|
||||
json.Unmarshal(allowedJSON, &allowed)
|
||||
if err := json.Unmarshal(allowedJSON, &allowed); err != nil {
|
||||
log.Printf("Channels: unmarshal allowed_users on list for channel %s: %v", id, err)
|
||||
allowed = []string{}
|
||||
}
|
||||
|
||||
entry := map[string]interface{}{
|
||||
"id": id,
|
||||
|
||||
Reference in New Issue
Block a user