Compare commits

...

14 Commits

Author SHA1 Message Date
Molecule AI Dev Engineer A (Kimi) 85f8d78ac0 ci: remove unused canvasUserMessage type to fix lint on staging
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 10s
CI / Detect changes (pull_request) Successful in 8s
CI / Canvas (Next.js) (pull_request) Failing after 49s
CI / all-required (pull_request) Failing after 6s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 39s
E2E API Smoke Test / detect-changes (pull_request) Successful in 10s
E2E Chat / detect-changes (pull_request) Successful in 13s
CI / Platform (Go) (pull_request) Successful in 6m9s
CI / Canvas Deploy Reminder (pull_request) Has been cancelled
Handlers Postgres Integration / detect-changes (pull_request) Successful in 6s
Harness Replays / detect-changes (pull_request) Successful in 6s
CI / Python Lint & Test (pull_request) Successful in 7m15s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 10s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m26s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 8s
qa-review / approved (pull_request_target) Successful in 10s
security-review / approved (pull_request_target) Successful in 8s
Harness Replays / Harness Replays (pull_request) Successful in 7s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m55s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 31s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 2m42s
E2E Chat / E2E Chat (pull_request) Failing after 7m58s
sop-checklist / review-refire (pull_request_target) Has been skipped
gate-check-v3 / gate-check (pull_request_target) Successful in 3s
sop-checklist / all-items-acked (pull_request) acked: 0/7 — missing: comprehensive-testing, local-postgres-e2e, staging-smoke, +4 — body-unfilled: comprehensive-testing, local-postgres-e2
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-checklist / all-items-acked (pull_request_target) Successful in 3s
sop-tier-check / tier-check (pull_request_target) Successful in 3s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Failing after 1m10s
audit-force-merge / audit (pull_request_target) Has been skipped
internal/handlers/a2a_proxy_helpers.go:412 had an unused struct that
causes golangci-lint `unused` failure on every PR targeting staging.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 12:40:13 +00:00
Molecule AI Dev Engineer A (Kimi) a7c3f6b7ed fix(workspace): check error on external workspace status update
cascade-list-drift-gate / check (pull_request) Waiting to run
Check migration collisions / Migration version collision check (pull_request) Waiting to run
MCP Stdio Transport Regression / MCP stdio with regular-file stdout (pull_request) Waiting to run
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (pull_request) Waiting to run
E2E Staging External Runtime / E2E Staging External Runtime (pull_request) Waiting to run
E2E Staging SaaS (full lifecycle) / pr-validate (pull_request) Waiting to run
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (pull_request) Waiting to run
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Waiting to run
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Waiting to run
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Waiting to run
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Waiting to run
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Waiting to run
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Waiting to run
publish-runtime-autobump / pr-validate (pull_request) Waiting to run
publish-runtime-autobump / bump-and-tag (pull_request) Waiting to run
review-check-tests / review-check.sh regression tests (pull_request) Waiting to run
Runtime Pin Compatibility / PyPI-latest install + import smoke (pull_request) Waiting to run
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Waiting to run
sop-checklist / review-refire (pull_request) Waiting to run
audit-force-merge / audit (pull_request) Waiting to run
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 13s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 2m12s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 16s
gate-check-v3 / gate-check (pull_request) Successful in 18s
qa-review / approved (pull_request) Successful in 11s
sop-checklist / na-declarations (pull_request) N/A: (none)
security-review / approved (pull_request) Successful in 18s
sop-checklist / all-items-acked (pull_request) Successful in 12s
sop-tier-check / tier-check (pull_request) Successful in 15s
CI / Canvas Deploy Reminder (pull_request) Has been cancelled
E2E API Smoke Test / E2E API Smoke Test (pull_request) Has been cancelled
E2E Chat / E2E Chat (pull_request) Has been cancelled
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Has been cancelled
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Has been cancelled
Harness Replays / Harness Replays (pull_request) Has been cancelled
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Has been cancelled
E2E Chat / detect-changes (pull_request) Has been cancelled
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Has been cancelled
Handlers Postgres Integration / detect-changes (pull_request) Has been cancelled
Harness Replays / detect-changes (pull_request) Has been cancelled
Runtime PR-Built Compatibility / detect-changes (pull_request) Has been cancelled
CI / all-required (pull_request) Has been cancelled
CI / Detect changes (pull_request) Has been cancelled
CI / Platform (Go) (pull_request) Has been cancelled
CI / Canvas (Next.js) (pull_request) Has been cancelled
CI / Shellcheck (E2E scripts) (pull_request) Has been cancelled
CI / Python Lint & Test (pull_request) Has been cancelled
E2E API Smoke Test / detect-changes (pull_request) Has been cancelled
2026-06-01 04:21:00 +00:00
Molecule AI Dev Engineer A (Kimi) d0c8dd8be8 fix(errcheck): log ignored errors in channels, telegram, approvals
Apply unchecked-error fixes from errcheck audit (#1062):
- channels/manager.go: log db.ExecContext and broadcaster errors in HandleInbound/SendOutbound; return json.Unmarshal errors in loadChannel
- channels/telegram.go: log bot.Send errors for callback ack and edit message
- handlers/approvals.go: log broadcaster, db.QueryRowContext, and db.ExecContext errors

Improves observability when secondary operations fail silently.
2026-06-01 04:20:02 +00:00
Molecule AI Dev Engineer A (Kimi) 1d37c0c44b fix(bundle): check errors in markFailed and provision URL update
Add error checking + logging to previously ignored db.ExecContext and
broadcaster.RecordAndBroadcast calls in bundle importer. Improves
observability when provision failures or URL updates fail silently.
2026-06-01 04:20:02 +00:00
Molecule AI Dev Engineer A (Kimi) aebaef07dd fix(time.After): replace time.After with NewTimer/NewTicker in loops
Prevents timer accumulation in long-running loops:
- restart_context.go: poll loop uses NewTicker instead of time.After
- supervised.go: backoff sleep uses NewTimer with proper Stop
- telegram.go: poll loop uses NewTimer for both retryAfter and poll interval

Each time.After created a timer that couldn't be GC'd until it fired.
In long-running goroutines this caused bounded but unnecessary memory
growth.
2026-06-01 04:20:02 +00:00
Molecule AI Dev Engineer A (Kimi) 3d295309b0 fix(restart_context): use NewTicker instead of time.After in poll loop
Prevents bounded timer accumulation during the 30s workspace-online wait.
Each time.After created a timer that couldn't be GC'd until it fired;
with ~60 iterations this was minor but unnecessary.
2026-06-01 04:20:02 +00:00
Molecule AI Dev Engineer A (Kimi) 3018fdee2f fix(goroutines): recover panics in background and fire-and-forget goroutines
Add panic recovery to goroutines that are NOT wrapped by supervised.RunWithRecover:
- session_auth.go: init() cache sweeper (process-level background task)
- ratelimit.go + mcp_ratelimit.go: bucket cleanup tickers
- bundle/importer.go: fire-and-forget provision start during import
- plugins_install.go: delayed restart after plugin uninstall
- terminal.go: WebSocket bridge goroutines (stdout, PTY, stdin)
- a2a_proxy.go: SSE idle watcher

A panic in any of these would previously crash the process or terminate
a subsystem silently. Now they log and swallow the panic, preserving
process stability.
2026-06-01 04:20:02 +00:00
Molecule AI Dev Engineer A (Kimi) a5f8a6a4e0 fix(mcp_tools): recover panics in async delegate_task goroutine 2026-06-01 04:20:02 +00:00
Molecule AI Dev Engineer A (Kimi) 88acde1197 fix(channels): recover panics in async webhook HandleInbound goroutine
The async goroutine that processes inbound webhooks had no recover().
A panic inside HandleInbound (e.g., from the A2A proxy or adapter layer)
would crash the entire platform process. Add defer recover() with
workspace-scoped logging so the goroutine exits cleanly and the process
stays alive.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:19:58 +00:00
Molecule AI Dev Engineer A (Kimi) 1a7402a8aa fix(context): defer rows.Close in restart_context secret key queries
Two QueryContext calls in restart_context used explicit rows.Close()
after the loop. If a panic or early return occurred during iteration,
the underlying connection would leak back to the pool with an active
result set. Switch to defer rows.Close() inside each if-block for
 RAII-style cleanup.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:19:58 +00:00
Molecule AI Dev Engineer A (Kimi) 9526ca537e fix(tx): add deferred rollback guard to workspace creation transaction
workspace.go BeginTx had explicit Rollback() on every error path but
no deferred safety net. A panic (or future refactor adding an early
return) would leak the tx hold. Add defer tx.Rollback() immediately
after BeginTx — harmless no-op after Commit, critical safety net on
unexpected exits.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:19:58 +00:00
Molecule AI Dev Engineer A (Kimi) 7a7eafa991 fix(context): propagate ctx to DB queries in provision + discovery
Two production code paths used context-less sql.DB methods:

1. buildProvisionerConfig (workspace_provision.go:263) called db.DB.QueryRow
   instead of QueryRowContext(ctx, ...) — losing request cancellation.

2. queryPeerMaps (discovery.go:307) called db.DB.Query instead of
   QueryContext(ctx, ...) and did not accept a context parameter. Updated
   signature and all 4 call sites in Peers handler to pass ctx.

No behavioral change for happy path; fixes context-timeout hygiene so
slow queries can be cancelled when the HTTP client disconnects.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:19:58 +00:00
Molecule AI Dev Engineer A (Kimi) beb65b6c5c fix(http): replace http.DefaultClient in auth-adjacent paths
Replaces http.DefaultClient in MCP bridge (mcp_tools.go) and CP config
fetch (cp_config.go) with dedicated clients that have transport-level
timeouts. Eliminates a fragility class where global mutable client +
missing context timeout could hang forever on dead workspaces or slow CP.

- mcpHTTPClient: 5s dial, 30s response header, 5s TLS handshake
- cp_config fetch: 10s client timeout matching context deadline

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:19:53 +00:00
Molecule AI Dev Engineer A (Kimi) dbbfb52cbd review: 5-axis review of PR #3029 and PR #3033
- PR #3029: approve with nit (CP orphan sweeper + registry prefix).
- PR #3033: approve with blocker — README undercounts runtimes by 1
  because #3029 adds codex but #3033 claims 8 runtimes and omits it.
- All tests pass locally.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:19:53 +00:00
20 changed files with 299 additions and 56 deletions
+130
View File
@@ -0,0 +1,130 @@
# 5-Axis Review: PR #3029 (fix #2989) + PR #3033 (docs refresh)
**Reviewer:** Kimi / Engineer-A
**Date:** 2026-05-31
**Scope:** Local review (CR2 auth-down, filling review gap per PM dispatch)
---
## PR #3029 — CP orphan sweeper + registry prefix abstraction
### Correctness ✅ (with 1 semantic conflict to resolve)
**cp_orphan_sweeper.go** — The deprovision split-write race fix is sound:
- SELECT `status='removed' AND instance_id IS NOT NULL AND instance_id != ''` correctly targets leaked EC2s.
- Stop → clear instance_id is idempotent; on Stop failure the row stays targeted for retry.
- `ORDER BY updated_at DESC` + `LIMIT $1` + `UPDATE updated_at = now()` creates fair round-robin drain across cycles.
- `supervised.RunWithRecover` wiring in `cmd/server/main.go` mirrors the Docker sweeper pattern.
**provisioner/registry.go** — Clean env-driven prefix abstraction:
- `RegistryPrefix()` respects `MOLECULE_IMAGE_REGISTRY` override; falls back to GHCR OSS default.
- `RuntimeImage()` returns `""` for unknown runtimes, forcing explicit fallback at call sites.
- `computeRuntimeImages()` runs at init; captures prefix active at boot.
** provisioner.go migration** — Hardcoded map → `computeRuntimeImages()` is a safe refactor; no behavioral change for OSS default.
**admin_workspace_images.go**`TemplateImageRef()` now uses `provisioner.RegistryPrefix()`; keeps admin ops and provisioner pulls consistent.
### Security ✅
- Sweeper SQL has no user-input surface; parameters are internal LIMIT constant and DB-generated IDs.
- `RegistryPrefix()` reads env only; comment correctly notes it is deploy-time trusted (operator-set, not user-supplied).
- No new secrets, auth tokens, or credential exposure.
### Performance ✅
- 60s tick / 30s deadline / LIMIT 100 is conservative and safe.
- Sequential Stop calls share the 30s parent context; with typical CP DELETE latency (<1s), 100 orphans finish well within budget.
- If CP is degraded, deadline expires, UPDATEs don't fire, and next cycle retries — no stampede.
### Style / Readability ✅
- Excellent docstrings; the `#2989` race narrative is clearly documented for future maintainers.
- `CPOrphanReaper` interface is minimal and testable.
- Nil-reaper and nil-DB guards follow existing patterns.
- One minor nit: `cpSweepOnce` could return `[]string` of processed IDs to make post-hoc assertions easier, but the fake-reaper test pattern works fine as-is.
### Tests ✅ (excellent coverage)
| Scenario | Covered |
|---|---|
| Happy path: Stop succeeds, instance_id cleared | ✅ |
| Stop fails, instance_id retained for retry | ✅ |
| Empty result set (steady state) | ✅ |
| Multiple orphans, partial failure, others proceed | ✅ |
| DB query error (transient) | ✅ |
| UPDATE error after Stop success (logs, continues) | ✅ |
| Nil db.DB (defensive boot safety) | ✅ |
| Nil reaper (disabled, no goroutine leak) | ✅ |
| Boot sweep + tick cadence + ctx cancel | ✅ |
| Registry prefix default / env override / empty env | ✅ |
| Runtime image format for all known runtimes | ✅ |
| Unknown runtime returns `""` | ✅ |
| Registry override applies to ALL runtimes | ✅ |
| Alphabetical order pin | ✅ |
**All tests pass:**
```
ok github.com/.../internal/registry 0.107s (9/9 CP sweeper tests)
ok github.com/.../internal/provisioner 0.009s (7/7 registry tests)
```
### ⚠️ BLOCKER: Semantic conflict with PR #3033
`registry.go` adds `"codex"` to `knownRuntimes`, making **9** production runtimes:
```go
knownRuntimes = []string{
"autogen", "claude-code", "codex", "crewai", "deepagents",
"gemini-cli", "hermes", "langgraph", "openclaw",
}
```
PR #3033 updates the README to claim **eight** production runtimes and explicitly lists:
> Claude Code, Hermes, Gemini CLI, LangGraph, DeepAgents, CrewAI, AutoGen, OpenClaw
`codex` is absent from the README compatibility table, the "What Ships In main" section, and the architecture diagram list. After both PRs merge, the code will support 9 runtimes but the docs will claim 8 — a public-facing drift.
**Fix path:** Add `codex` to the README runtime list in PR #3033 (or a fast-follow) so the count and table stay accurate. `codex` already exists in `manifest.json` and has a template repo, so it is legitimate to list as "shipping on main."
---
## PR #3033 — Docs refresh (README + branding assets)
### Correctness ✅ (with 1 semantic drift pending)
- Terminology standardization ("adapters" → "runtimes") is correct and consistent with platform usage.
- Deploy buttons updated from `molecule-monorepo``molecule-core`.
- Canvas v4, Memory v2, SaaS surface, RFC #2967 mentions are all factually accurate.
- **Missing:** `codex` runtime (see blocker above).
### Security ✅
- SVG assets are static branding; no scripts, no external references beyond the existing `<style>` media query.
- No auth or credential surface touched.
### Performance N/A
- Docs-only; no runtime impact.
### Style / Readability ✅
- warm-paper theme description is concise and helpful.
- Architecture diagram update (Docker → EC2 + SSM, KMS, SaaS CP) is accurate.
- Quick Start clone URL fixed.
### Tests N/A
- No code changes; no test delta.
---
## Summary
| PR | Verdict | Action needed |
|---|---|---|
| #3029 | **Approve with nit** | Merge-ready after confirming #3033 (or follow-up) adds `codex` to README runtime list. |
| #3033 | **Approve with blocker** | Add `codex` to the 8-runtimes list (making 9) and to the compatibility table before merge. |
**Risk if both merge as-is:** Public docs understate runtime count by 1; operators reading README may think `codex` is not supported when the provisioner already knows about it.
**Recommended merge order:** #3029 first (adds runtime support), then #3033 with `codex` line added (docs catch up).
+2 -1
View File
@@ -60,7 +60,8 @@ func refreshEnvFromCP() error {
req.Header.Set("Authorization", "Bearer "+adminToken)
req.Header.Set("X-Molecule-Org-Id", orgID)
resp, err := http.DefaultClient.Do(req)
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("do request: %w", err)
}
+17 -5
View File
@@ -3,6 +3,7 @@ package bundle
import (
"context"
"fmt"
"log"
"strings"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
@@ -86,13 +87,20 @@ func Import(
// PluginsPath set by caller if available
}
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("bundle/importer: PANIC during provision start for %s: %v", wsID, r)
}
}()
provCtx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
defer cancel()
url, err := prov.Start(provCtx, cfg)
if err != nil {
markFailed(provCtx, wsID, broadcaster, err)
} else if url != "" {
db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID)
if _, dbErr := db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID); dbErr != nil {
log.Printf("bundle import: failed to update workspace URL for %s: %v", wsID, dbErr)
}
}
}()
}
@@ -139,12 +147,16 @@ func markFailed(ctx context.Context, wsID string, broadcaster *events.Broadcaste
// markProvisionFailed in workspace-server/internal/handlers/
// workspace_provision_shared.go.
msg := err.Error()
db.DB.ExecContext(ctx,
if _, dbErr := db.DB.ExecContext(ctx,
`UPDATE workspaces SET status = $1, last_sample_error = $2, updated_at = now() WHERE id = $3`,
models.StatusFailed, msg, wsID)
broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
models.StatusFailed, msg, wsID); dbErr != nil {
log.Printf("bundle import: failed to mark workspace %s failed: %v", wsID, dbErr)
}
if bcErr := broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
"error": msg,
})
}); bcErr != nil {
log.Printf("bundle import: failed to broadcast provision failed for %s: %v", wsID, bcErr)
}
}
func nilIfEmpty(s string) interface{} {
+26 -11
View File
@@ -375,21 +375,25 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
// Update stats in DB
if db.DB != nil {
db.DB.ExecContext(ctx, `
if _, err := db.DB.ExecContext(ctx, `
UPDATE workspace_channels
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
WHERE id = $1
`, ch.ID)
`, ch.ID); err != nil {
log.Printf("Channels: failed to update inbound stats for channel %s: %v", ch.ID, err)
}
}
// Broadcast event
if m.broadcaster != nil {
m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
"channel_id": ch.ID,
"channel_type": ch.ChannelType,
"username": msg.Username,
"direction": "inbound",
})
}); err != nil {
log.Printf("Channels: failed to broadcast inbound event: %v", err)
}
}
return nil
@@ -420,19 +424,23 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin
}
if db.DB != nil {
db.DB.ExecContext(ctx, `
if _, err := db.DB.ExecContext(ctx, `
UPDATE workspace_channels
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
WHERE id = $1
`, channelID)
`, channelID); err != nil {
log.Printf("Channels: failed to update outbound stats for channel %s: %v", channelID, err)
}
}
if m.broadcaster != nil {
m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
"channel_id": ch.ID,
"channel_type": ch.ChannelType,
"direction": "outbound",
})
}); err != nil {
log.Printf("Channels: failed to broadcast outbound event: %v", err)
}
}
return nil
@@ -498,7 +506,10 @@ func (m *Manager) FetchWorkspaceChannelContext(ctx context.Context, workspaceID
return ""
}
var config map[string]interface{}
json.Unmarshal(configJSON, &config)
if err := json.Unmarshal(configJSON, &config); err != nil {
log.Printf("Channels: failed to unmarshal channel config: %v", err)
return ""
}
if err := DecryptSensitiveFields(config); err != nil {
return ""
}
@@ -555,8 +566,12 @@ func (m *Manager) loadChannel(ctx context.Context, channelID string) (ChannelRow
if err != nil {
return ch, fmt.Errorf("channel %s not found: %w", channelID, err)
}
json.Unmarshal(configJSON, &ch.Config)
json.Unmarshal(allowedJSON, &ch.AllowedUsers)
if err := json.Unmarshal(configJSON, &ch.Config); err != nil {
return ch, fmt.Errorf("unmarshal channel %s config: %w", channelID, err)
}
if err := json.Unmarshal(allowedJSON, &ch.AllowedUsers); err != nil {
return ch, fmt.Errorf("unmarshal channel %s allowed_users: %w", channelID, err)
}
// #319: decrypt bot_token / webhook_secret — SendOutbound and adapter
// methods downstream read them as plaintext strings.
if err := DecryptSensitiveFields(ch.Config); err != nil {
+12 -4
View File
@@ -482,10 +482,12 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
if apiErr.Code == 429 {
retryAfter := time.Duration(apiErr.RetryAfter) * time.Second
log.Printf("Channels: Telegram poll rate-limited, sleeping %s", retryAfter)
timer := time.NewTimer(retryAfter)
select {
case <-ctx.Done():
timer.Stop()
return nil
case <-time.After(retryAfter):
case <-timer.C:
continue
}
}
@@ -495,10 +497,12 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
}
}
log.Printf("Channels: Telegram poll error: %v", err)
timer := time.NewTimer(telegramPollInterval)
select {
case <-ctx.Done():
timer.Stop()
return nil
case <-time.After(telegramPollInterval):
case <-timer.C:
continue
}
}
@@ -513,7 +517,9 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
// Acknowledge the button press (removes loading spinner)
ackCfg := tgbotapi.NewCallback(cb.ID, "Received")
bot.Send(ackCfg)
if _, err := bot.Send(ackCfg); err != nil {
log.Printf("telegram: failed to send callback ack: %v", err)
}
// Update the message to show what was clicked
decision := "approved"
@@ -525,7 +531,9 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
cb.Message.MessageID,
cb.Message.Text+"\n\n✅ CEO "+decision,
)
bot.Send(editMsg)
if _, err := bot.Send(editMsg); err != nil {
log.Printf("telegram: failed to send edit message: %v", err)
}
// Route the decision as an inbound message to the agent
inbound := &InboundMessage{
@@ -932,7 +932,12 @@ func applyIdleTimeout(parent context.Context, b *events.Broadcaster, workspaceID
ctx, cancel := context.WithCancel(parent)
sub, unsub := b.SubscribeSSE(workspaceID)
go func() {
defer unsub()
defer func() {
if r := recover(); r != nil {
log.Printf("a2a_proxy: PANIC in SSE idle watcher for %s: %v", workspaceID, r)
}
unsub()
}()
timer := time.NewTimer(idle)
defer timer.Stop()
for {
@@ -407,15 +407,6 @@ func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) e
// matching (the wsauth errors are typed for the invalid case).
var errInvalidCallerToken = errors.New("missing caller auth token")
// canvasUserMessage holds the extracted user message extracted from an
// A2A canvas request body for broadcasting to other sessions.
type canvasUserMessage struct {
Message string `json:"message,omitempty"`
Parts []map[string]interface{} `json:"parts,omitempty"`
MessageID string `json:"messageId,omitempty"`
Attachments []map[string]interface{} `json:"attachments,omitempty"`
}
// extractCanvasUserMessage parses an A2A JSON-RPC request body and extracts
// the user-authored text and attachments from a canvas-initiated message/send.
// Returns nil when the body is not a canvas user message (empty, malformed,
@@ -51,23 +51,29 @@ func (h *ApprovalsHandler) Create(c *gin.Context) {
return
}
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
"approval_id": approvalID,
"action": body.Action,
"reason": body.Reason,
"task_id": body.TaskID,
})
}); err != nil {
log.Printf("approvals: failed to broadcast approval requested: %v", err)
}
// Auto-escalate to parent
var parentID *string
db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID)
if err := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); err != nil {
log.Printf("approvals: failed to lookup parent for escalation: %v", err)
}
if parentID != nil {
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
"approval_id": approvalID,
"from_workspace_id": workspaceID,
"action": body.Action,
"reason": body.Reason,
})
}); err != nil {
log.Printf("approvals: failed to broadcast approval escalated: %v", err)
}
}
c.JSON(http.StatusCreated, gin.H{"approval_id": approvalID, "status": "pending"})
@@ -80,10 +86,12 @@ func (h *ApprovalsHandler) ListAll(c *gin.Context) {
ctx := c.Request.Context()
// Auto-expire stale approvals (older than 10 min)
db.DB.ExecContext(ctx, `
if _, err := db.DB.ExecContext(ctx, `
UPDATE approval_requests SET status = 'denied', decided_by = 'auto-expired', decided_at = now()
WHERE status = 'pending' AND created_at < now() - interval '10 minutes'
`)
`); err != nil {
log.Printf("approvals: failed to auto-expire stale approvals: %v", err)
}
rows, err := db.DB.QueryContext(ctx, `
SELECT a.id, a.workspace_id, w.name, a.action, a.reason, a.status, a.created_at
@@ -211,11 +219,13 @@ func (h *ApprovalsHandler) Decide(c *gin.Context) {
eventType = "APPROVAL_DENIED"
}
h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{
if err := h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{
"approval_id": approvalID,
"decision": body.Decision,
"decided_by": decidedBy,
})
}); err != nil {
log.Printf("approvals: failed to broadcast approval decision: %v", err)
}
c.JSON(http.StatusOK, gin.H{"status": body.Decision, "approval_id": approvalID})
}
@@ -558,6 +558,11 @@ func (h *ChannelHandler) Webhook(c *gin.Context) {
// Process asynchronously — don't block the webhook response
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("Channels: PANIC in async HandleInbound for workspace %s: %v", ch.WorkspaceID[:12], r)
}
}()
bgCtx := context.Background()
if err := h.manager.HandleInbound(bgCtx, ch, msg); err != nil {
log.Printf("Channels: async HandleInbound error for workspace %s: %v", ch.WorkspaceID[:12], err)
@@ -239,7 +239,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
// Siblings
if parentID.Valid {
siblings, _ := queryPeerMaps(`
siblings, _ := queryPeerMaps(ctx, `
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
w.parent_id, w.active_tasks
@@ -247,7 +247,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
parentID.String, workspaceID)
peers = append(peers, siblings...)
} else {
siblings, _ := queryPeerMaps(`
siblings, _ := queryPeerMaps(ctx, `
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
w.parent_id, w.active_tasks
@@ -257,7 +257,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
}
// Children
children, _ := queryPeerMaps(`
children, _ := queryPeerMaps(ctx, `
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
w.parent_id, w.active_tasks
@@ -266,7 +266,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
// Parent
if parentID.Valid {
parent, _ := queryPeerMaps(`
parent, _ := queryPeerMaps(ctx, `
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
w.parent_id, w.active_tasks
@@ -303,8 +303,8 @@ func filterPeersByQuery(peers []map[string]interface{}, q string) []map[string]i
}
// queryPeerMaps returns clean JSON-serializable maps instead of Workspace structs.
func queryPeerMaps(query string, args ...interface{}) ([]map[string]interface{}, error) {
rows, err := db.DB.Query(query, args...)
func queryPeerMaps(ctx context.Context, query string, args ...interface{}) ([]map[string]interface{}, error) {
rows, err := db.DB.QueryContext(ctx, query, args...)
if err != nil {
log.Printf("queryPeerMaps error: %v", err)
return nil, err
@@ -15,6 +15,7 @@ import (
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"strings"
@@ -54,6 +55,22 @@ func updateMCPDelegationStatus(ctx context.Context, db *sql.DB, workspaceID, del
}
}
// ─────────────────────────────────────────────────────────────────────────────
// mcpHTTPClient is a dedicated client for MCP bridge A2A calls.
// Per-request deadlines are enforced via context (30 s sync, 8 s async).
// Transport-level timeouts ensure dead workspaces fail fast instead of
// hanging on OS default TCP timeouts (~75 s Linux).
var mcpHTTPClient = &http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ResponseHeaderTimeout: 30 * time.Second,
TLSHandshakeTimeout: 5 * time.Second,
},
}
// ─────────────────────────────────────────────────────────────────────────────
// Tool implementations
// ─────────────────────────────────────────────────────────────────────────────
@@ -231,7 +248,7 @@ func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args
// so this header reflects a verified caller identity, not a spoofable value.
httpReq.Header.Set("X-Workspace-ID", callerID)
resp, err := http.DefaultClient.Do(httpReq)
resp, err := mcpHTTPClient.Do(httpReq)
if err != nil {
updateMCPDelegationStatus(ctx, h.database, callerID, delegationID, "failed", err.Error())
return "", fmt.Errorf("A2A call failed: %w", err)
@@ -279,6 +296,11 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
// Fire and forget in a detached goroutine. Use a background context so
// the call is not cancelled when the HTTP request completes.
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("MCPHandler.delegate_task_async: PANIC for %s → %s: %v", callerID, targetID, r)
}
}()
bgCtx, cancel := context.WithTimeout(context.Background(), mcpAsyncCallTimeout)
defer cancel()
@@ -314,7 +336,7 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("X-Workspace-ID", callerID)
resp, err := http.DefaultClient.Do(httpReq)
resp, err := mcpHTTPClient.Do(httpReq)
if err != nil {
log.Printf("MCPHandler.delegate_task_async: A2A call to %s: %v", targetID, err)
return
@@ -201,6 +201,11 @@ func (h *PluginsHandler) uninstallViaDocker(ctx context.Context, c *gin.Context,
// Auto-restart (small delay to ensure fs writes are flushed)
if h.restartFunc != nil {
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("plugins_install: PANIC in delayed restart for %s: %v", workspaceID, r)
}
}()
time.Sleep(2 * time.Second)
h.restartFunc(workspaceID)
}()
@@ -133,24 +133,24 @@ func loadRestartContextData(ctx context.Context, workspaceID string) restartCont
// message bus.
keySet := map[string]struct{}{}
if rows, err := db.DB.QueryContext(ctx, `SELECT key FROM global_secrets`); err == nil {
defer rows.Close()
for rows.Next() {
var k string
if rows.Scan(&k) == nil {
keySet[k] = struct{}{}
}
}
rows.Close()
}
if rows, err := db.DB.QueryContext(ctx,
`SELECT key FROM workspace_secrets WHERE workspace_id = $1`, workspaceID,
); err == nil {
defer rows.Close()
for rows.Next() {
var k string
if rows.Scan(&k) == nil {
keySet[k] = struct{}{}
}
}
rows.Close()
}
for k := range keySet {
d.EnvKeys = append(d.EnvKeys, k)
@@ -163,6 +163,8 @@ func loadRestartContextData(ctx context.Context, workspaceID string) restartCont
// workspace's status flips to 'online' or the deadline expires.
// Returns true on success; callers log+drop on false.
func waitForWorkspaceOnline(ctx context.Context, workspaceID string, timeout time.Duration) bool {
ticker := time.NewTicker(restartContextOnlinePollInterval)
defer ticker.Stop()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
var status string
@@ -174,7 +176,7 @@ func waitForWorkspaceOnline(ctx context.Context, workspaceID string, timeout tim
select {
case <-ctx.Done():
return false
case <-time.After(restartContextOnlinePollInterval):
case <-ticker.C:
}
}
return false
+17 -2
View File
@@ -213,7 +213,12 @@ func (h *TerminalHandler) handleLocalConnect(c *gin.Context, workspaceID string)
// Bridge: container stdout → WebSocket
done := make(chan struct{})
go func() {
defer close(done)
defer func() {
if r := recover(); r != nil {
log.Printf("Terminal: PANIC in stdout bridge: %v", r)
}
close(done)
}()
buf := make([]byte, 4096)
for {
n, err := resp.Reader.Read(buf)
@@ -434,7 +439,12 @@ func (h *TerminalHandler) handleRemoteConnect(c *gin.Context, workspaceID, insta
// PTY → WebSocket
go func() {
defer close(done)
defer func() {
if r := recover(); r != nil {
log.Printf("Terminal: PANIC in PTY bridge: %v", r)
}
close(done)
}()
buf := make([]byte, 4096)
for {
n, err := ptmx.Read(buf)
@@ -456,6 +466,11 @@ func (h *TerminalHandler) handleRemoteConnect(c *gin.Context, workspaceID, insta
// WebSocket → PTY (stdin)
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("Terminal: PANIC in stdin loop: %v", r)
}
}()
for {
_, msg, rErr := conn.ReadMessage()
if rErr != nil {
@@ -296,6 +296,7 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create workspace"})
return
}
defer func() { _ = tx.Rollback() }()
maxConcurrent := payload.MaxConcurrentTasks
if maxConcurrent <= 0 {
@@ -478,7 +479,9 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
// from the external agent (with this token + its URL)
// flips the row to online.
// Preserve BYO-compute runtime label (kimi, kimi-cli, external).
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, runtime = $2, updated_at = now() WHERE id = $3`, models.StatusAwaitingAgent, normalizeExternalRuntime(payload.Runtime), id)
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, runtime = $2, updated_at = now() WHERE id = $3`, models.StatusAwaitingAgent, normalizeExternalRuntime(payload.Runtime), id); err != nil {
log.Printf("External workspace %s: status update failed: %v", id, err)
}
tok, tokErr := wsauth.IssueToken(ctx, db.DB, id)
if tokErr != nil {
log.Printf("External workspace %s: token issuance failed: %v", id, tokErr)
@@ -261,7 +261,7 @@ func (h *WorkspaceHandler) buildProvisionerConfig(
workspaceAccess := payload.WorkspaceAccess
if (workspacePath == "" || workspaceAccess == "") && db.DB != nil {
var dbDir, dbAccess string
if err := db.DB.QueryRow(
if err := db.DB.QueryRowContext(ctx,
`SELECT COALESCE(workspace_dir, ''), COALESCE(workspace_access, 'none') FROM workspaces WHERE id = $1`,
workspaceID,
).Scan(&dbDir, &dbAccess); err == nil {
@@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"fmt"
"log"
"net/http"
"strconv"
"strings"
@@ -41,6 +42,11 @@ func NewMCPRateLimiter(rate int, interval time.Duration, ctx context.Context) *M
interval: interval,
}
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("mcp_ratelimit: PANIC in bucket cleanup: %v", r)
}
}()
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
@@ -3,6 +3,7 @@ package middleware
import (
"context"
"log"
"net/http"
"strconv"
"strings"
@@ -35,6 +36,11 @@ func NewRateLimiter(rate int, interval time.Duration, ctx context.Context) *Rate
interval: interval,
}
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("ratelimit: PANIC in bucket cleanup: %v", r)
}
}()
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
@@ -116,6 +116,11 @@ func sessionCachePut(key string, ok bool) {
func init() {
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("session_auth: PANIC in cache sweeper: %v", r)
}
}()
// Jitter startup so restarts don't align sweeps.
time.Sleep(time.Duration(rand.Int64N(int64(sessionCacheSweepEvery))))
t := time.NewTicker(sessionCacheSweepEvery)
@@ -60,10 +60,12 @@ func RunWithRecover(ctx context.Context, name string, fn func(context.Context))
}
// Panic → back off and restart.
timer := time.NewTimer(backoff)
select {
case <-ctx.Done():
timer.Stop()
return
case <-time.After(backoff):
case <-timer.C:
}
if backoff < maxBackoff {
backoff *= 2