Compare commits

..

14 Commits

Author SHA1 Message Date
Molecule AI Dev Engineer A (Kimi) 85f8d78ac0 ci: remove unused canvasUserMessage type to fix lint on staging
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 10s
CI / Detect changes (pull_request) Successful in 8s
CI / Canvas (Next.js) (pull_request) Failing after 49s
CI / all-required (pull_request) Failing after 6s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 39s
E2E API Smoke Test / detect-changes (pull_request) Successful in 10s
E2E Chat / detect-changes (pull_request) Successful in 13s
CI / Platform (Go) (pull_request) Successful in 6m9s
CI / Canvas Deploy Reminder (pull_request) Has been cancelled
Handlers Postgres Integration / detect-changes (pull_request) Successful in 6s
Harness Replays / detect-changes (pull_request) Successful in 6s
CI / Python Lint & Test (pull_request) Successful in 7m15s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 10s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m26s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 8s
qa-review / approved (pull_request_target) Successful in 10s
security-review / approved (pull_request_target) Successful in 8s
Harness Replays / Harness Replays (pull_request) Successful in 7s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m55s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 31s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 2m42s
E2E Chat / E2E Chat (pull_request) Failing after 7m58s
sop-checklist / review-refire (pull_request_target) Has been skipped
gate-check-v3 / gate-check (pull_request_target) Successful in 3s
sop-checklist / all-items-acked (pull_request) acked: 0/7 — missing: comprehensive-testing, local-postgres-e2e, staging-smoke, +4 — body-unfilled: comprehensive-testing, local-postgres-e2
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-checklist / all-items-acked (pull_request_target) Successful in 3s
sop-tier-check / tier-check (pull_request_target) Successful in 3s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Failing after 1m10s
audit-force-merge / audit (pull_request_target) Has been skipped
internal/handlers/a2a_proxy_helpers.go:412 had an unused struct that
causes golangci-lint `unused` failure on every PR targeting staging.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 12:40:13 +00:00
Molecule AI Dev Engineer A (Kimi) a7c3f6b7ed fix(workspace): check error on external workspace status update
cascade-list-drift-gate / check (pull_request) Waiting to run
Check migration collisions / Migration version collision check (pull_request) Waiting to run
MCP Stdio Transport Regression / MCP stdio with regular-file stdout (pull_request) Waiting to run
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (pull_request) Waiting to run
E2E Staging External Runtime / E2E Staging External Runtime (pull_request) Waiting to run
E2E Staging SaaS (full lifecycle) / pr-validate (pull_request) Waiting to run
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (pull_request) Waiting to run
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Waiting to run
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Waiting to run
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Waiting to run
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Waiting to run
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Waiting to run
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Waiting to run
publish-runtime-autobump / pr-validate (pull_request) Waiting to run
publish-runtime-autobump / bump-and-tag (pull_request) Waiting to run
review-check-tests / review-check.sh regression tests (pull_request) Waiting to run
Runtime Pin Compatibility / PyPI-latest install + import smoke (pull_request) Waiting to run
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Waiting to run
sop-checklist / review-refire (pull_request) Waiting to run
audit-force-merge / audit (pull_request) Waiting to run
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 13s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 2m12s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 16s
gate-check-v3 / gate-check (pull_request) Successful in 18s
qa-review / approved (pull_request) Successful in 11s
sop-checklist / na-declarations (pull_request) N/A: (none)
security-review / approved (pull_request) Successful in 18s
sop-checklist / all-items-acked (pull_request) Successful in 12s
sop-tier-check / tier-check (pull_request) Successful in 15s
CI / Canvas Deploy Reminder (pull_request) Has been cancelled
E2E API Smoke Test / E2E API Smoke Test (pull_request) Has been cancelled
E2E Chat / E2E Chat (pull_request) Has been cancelled
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Has been cancelled
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Has been cancelled
Harness Replays / Harness Replays (pull_request) Has been cancelled
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Has been cancelled
E2E Chat / detect-changes (pull_request) Has been cancelled
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Has been cancelled
Handlers Postgres Integration / detect-changes (pull_request) Has been cancelled
Harness Replays / detect-changes (pull_request) Has been cancelled
Runtime PR-Built Compatibility / detect-changes (pull_request) Has been cancelled
CI / all-required (pull_request) Has been cancelled
CI / Detect changes (pull_request) Has been cancelled
CI / Platform (Go) (pull_request) Has been cancelled
CI / Canvas (Next.js) (pull_request) Has been cancelled
CI / Shellcheck (E2E scripts) (pull_request) Has been cancelled
CI / Python Lint & Test (pull_request) Has been cancelled
E2E API Smoke Test / detect-changes (pull_request) Has been cancelled
2026-06-01 04:21:00 +00:00
Molecule AI Dev Engineer A (Kimi) d0c8dd8be8 fix(errcheck): log ignored errors in channels, telegram, approvals
Apply unchecked-error fixes from errcheck audit (#1062):
- channels/manager.go: log db.ExecContext and broadcaster errors in HandleInbound/SendOutbound; return json.Unmarshal errors in loadChannel
- channels/telegram.go: log bot.Send errors for callback ack and edit message
- handlers/approvals.go: log broadcaster, db.QueryRowContext, and db.ExecContext errors

Improves observability when secondary operations fail silently.
2026-06-01 04:20:02 +00:00
Molecule AI Dev Engineer A (Kimi) 1d37c0c44b fix(bundle): check errors in markFailed and provision URL update
Add error checking + logging to previously ignored db.ExecContext and
broadcaster.RecordAndBroadcast calls in bundle importer. Improves
observability when provision failures or URL updates fail silently.
2026-06-01 04:20:02 +00:00
Molecule AI Dev Engineer A (Kimi) aebaef07dd fix(time.After): replace time.After with NewTimer/NewTicker in loops
Prevents timer accumulation in long-running loops:
- restart_context.go: poll loop uses NewTicker instead of time.After
- supervised.go: backoff sleep uses NewTimer with proper Stop
- telegram.go: poll loop uses NewTimer for both retryAfter and poll interval

Each time.After created a timer that couldn't be GC'd until it fired.
In long-running goroutines this caused bounded but unnecessary memory
growth.
2026-06-01 04:20:02 +00:00
Molecule AI Dev Engineer A (Kimi) 3d295309b0 fix(restart_context): use NewTicker instead of time.After in poll loop
Prevents bounded timer accumulation during the 30s workspace-online wait.
Each time.After created a timer that couldn't be GC'd until it fired;
with ~60 iterations this was minor but unnecessary.
2026-06-01 04:20:02 +00:00
Molecule AI Dev Engineer A (Kimi) 3018fdee2f fix(goroutines): recover panics in background and fire-and-forget goroutines
Add panic recovery to goroutines that are NOT wrapped by supervised.RunWithRecover:
- session_auth.go: init() cache sweeper (process-level background task)
- ratelimit.go + mcp_ratelimit.go: bucket cleanup tickers
- bundle/importer.go: fire-and-forget provision start during import
- plugins_install.go: delayed restart after plugin uninstall
- terminal.go: WebSocket bridge goroutines (stdout, PTY, stdin)
- a2a_proxy.go: SSE idle watcher

A panic in any of these would previously crash the process or terminate
a subsystem silently. Now they log and swallow the panic, preserving
process stability.
2026-06-01 04:20:02 +00:00
Molecule AI Dev Engineer A (Kimi) a5f8a6a4e0 fix(mcp_tools): recover panics in async delegate_task goroutine 2026-06-01 04:20:02 +00:00
Molecule AI Dev Engineer A (Kimi) 88acde1197 fix(channels): recover panics in async webhook HandleInbound goroutine
The async goroutine that processes inbound webhooks had no recover().
A panic inside HandleInbound (e.g., from the A2A proxy or adapter layer)
would crash the entire platform process. Add defer recover() with
workspace-scoped logging so the goroutine exits cleanly and the process
stays alive.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:19:58 +00:00
Molecule AI Dev Engineer A (Kimi) 1a7402a8aa fix(context): defer rows.Close in restart_context secret key queries
Two QueryContext calls in restart_context used explicit rows.Close()
after the loop. If a panic or early return occurred during iteration,
the underlying connection would leak back to the pool with an active
result set. Switch to defer rows.Close() inside each if-block for
 RAII-style cleanup.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:19:58 +00:00
Molecule AI Dev Engineer A (Kimi) 9526ca537e fix(tx): add deferred rollback guard to workspace creation transaction
workspace.go BeginTx had explicit Rollback() on every error path but
no deferred safety net. A panic (or future refactor adding an early
return) would leak the tx hold. Add defer tx.Rollback() immediately
after BeginTx — harmless no-op after Commit, critical safety net on
unexpected exits.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:19:58 +00:00
Molecule AI Dev Engineer A (Kimi) 7a7eafa991 fix(context): propagate ctx to DB queries in provision + discovery
Two production code paths used context-less sql.DB methods:

1. buildProvisionerConfig (workspace_provision.go:263) called db.DB.QueryRow
   instead of QueryRowContext(ctx, ...) — losing request cancellation.

2. queryPeerMaps (discovery.go:307) called db.DB.Query instead of
   QueryContext(ctx, ...) and did not accept a context parameter. Updated
   signature and all 4 call sites in Peers handler to pass ctx.

No behavioral change for happy path; fixes context-timeout hygiene so
slow queries can be cancelled when the HTTP client disconnects.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:19:58 +00:00
Molecule AI Dev Engineer A (Kimi) beb65b6c5c fix(http): replace http.DefaultClient in auth-adjacent paths
Replaces http.DefaultClient in MCP bridge (mcp_tools.go) and CP config
fetch (cp_config.go) with dedicated clients that have transport-level
timeouts. Eliminates a fragility class where global mutable client +
missing context timeout could hang forever on dead workspaces or slow CP.

- mcpHTTPClient: 5s dial, 30s response header, 5s TLS handshake
- cp_config fetch: 10s client timeout matching context deadline

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:19:53 +00:00
Molecule AI Dev Engineer A (Kimi) dbbfb52cbd review: 5-axis review of PR #3029 and PR #3033
- PR #3029: approve with nit (CP orphan sweeper + registry prefix).
- PR #3033: approve with blocker — README undercounts runtimes by 1
  because #3029 adds codex but #3033 claims 8 runtimes and omits it.
- All tests pass locally.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:19:53 +00:00
20 changed files with 305 additions and 53 deletions
+6 -6
View File
@@ -53,7 +53,7 @@ Molecule AI is the most powerful way to govern an AI agent organization in produ
It combines the parts that are usually scattered across demos, internal glue code, and framework-specific tooling into one product:
- one org-native control plane for teams, roles, hierarchy, and lifecycle
- one runtime layer that lets **nine** agent runtimes — LangGraph, DeepAgents, Claude Code, CrewAI, AutoGen, **Hermes**, **Gemini CLI**, OpenClaw, and **Codex** — run side by side behind one workspace contract
- one runtime layer that lets **eight** agent runtimes — LangGraph, DeepAgents, Claude Code, CrewAI, AutoGen, **Hermes**, **Gemini CLI**, and OpenClaw — run side by side behind one workspace contract
- one memory model that keeps recall, sharing, and skill evolution aligned with organizational boundaries (Memory v2 backed by pgvector for semantic recall)
- one operational surface for observing, pausing, restarting, inspecting, and improving live workspaces
@@ -75,7 +75,7 @@ You do not wire collaboration paths by hand. Hierarchy defines the default commu
### 3. Runtime choice stops being a dead-end decision
LangGraph, DeepAgents, Claude Code, CrewAI, AutoGen, Hermes, Gemini CLI, OpenClaw, and Codex can all plug into the same workspace abstraction. Teams can standardize governance without forcing every group onto one runtime.
LangGraph, DeepAgents, Claude Code, CrewAI, AutoGen, Hermes, Gemini CLI, and OpenClaw can all plug into the same workspace abstraction. Teams can standardize governance without forcing every group onto one runtime.
### 4. Memory is treated like infrastructure
@@ -120,7 +120,6 @@ Molecule AI is not trying to replace the frameworks below. It is the system that
| **Hermes 4** | Shipping on `main` | Hybrid reasoning, native tools, json_schema (NousResearch/hermes-agent) | Option B upstream hook, A2A bridge to OpenAI-compat API, multi-provider provider derivation |
| **Gemini CLI** | Shipping on `main` | Google Gemini CLI continuity | Workspace lifecycle, A2A, hierarchy-aware collaboration, shared ops plane |
| **OpenClaw** | Shipping on `main` | CLI-native runtime with its own session model | Workspace lifecycle, templates, activity logs, topology-aware collaboration |
| **Codex** | Shipping on `main` | OpenAI Codex CLI continuity | Workspace lifecycle, A2A, hierarchy-aware collaboration, shared ops plane |
| **NemoClaw** | WIP on `feat/nemoclaw-t4-docker` | NVIDIA-oriented runtime path | Planned to join the same abstraction once merged; not yet part of `main` |
This is the key idea: **many agent runtimes, one organizational operating system**.
@@ -210,7 +209,7 @@ The result is not just “an agent that learns.” It is **an organization that
### Runtime
- unified `workspace/` image; thin AMI in production (us-east-2)
- adapter-driven execution across **9 runtimes** (Claude Code, Hermes, Gemini CLI, LangGraph, DeepAgents, CrewAI, AutoGen, OpenClaw, Codex)
- adapter-driven execution across **8 runtimes** (Claude Code, Hermes, Gemini CLI, LangGraph, DeepAgents, CrewAI, AutoGen, OpenClaw)
- Agent Card registration
- awareness-backed memory integration; **Memory v2 backed by pgvector** for semantic recall
- plugin-mounted shared rules/skills
@@ -240,6 +239,7 @@ The result is not just “an agent that learns.” It is **an organization that
- no tunnel, no public endpoint — the plugin self-registers each watched workspace as `delivery_mode=poll` and long-polls `/activity?since_id=…`
- multi-tenant friendly: one plugin install can watch workspaces across multiple Molecule tenants (`MOLECULE_PLATFORM_URLS` per-workspace)
- install via the standard marketplace flow: `/plugin marketplace add Molecule-AI/molecule-mcp-claude-channel``/plugin install molecule-channel@molecule-mcp-claude-channel`
## Built For Teams That Need More Than A Demo
Molecule AI is especially strong when you need to run:
@@ -260,7 +260,7 @@ Canvas (Next.js 15, warm-paper :3000) <--HTTP / WS--> Platform (Go 1.25 :8080)
+------------------------- shows ------------------------> workspaces, teams, tasks, traces, events
Workspace Runtime (Python ≥3.11, image with adapters)
- 9 runtimes: LangGraph / DeepAgents / Claude Code / CrewAI / AutoGen / Hermes / Gemini CLI / OpenClaw / Codex
- 8 adapters: LangGraph / DeepAgents / Claude Code / CrewAI / AutoGen / Hermes / Gemini CLI / OpenClaw
- Agent Card + A2A server (typed-SSOT response path, RFC #2967)
- heartbeat + activity + awareness-backed memory (Memory v2 — pgvector semantic recall)
- skills + plugins + hot reload
@@ -328,7 +328,7 @@ Then open `http://localhost:3000`:
## Current Scope
The current `main` branch ships the core platform, Canvas v4 (warm-paper themed), Memory v2 (pgvector semantic recall), the typed-SSOT A2A response path (RFC #2967), **nine production runtimes** (Claude Code, Hermes, Gemini CLI, LangGraph, DeepAgents, CrewAI, AutoGen, OpenClaw, Codex), skill lifecycle, and operational surfaces.
The current `main` branch ships the core platform, Canvas v4 (warm-paper themed), Memory v2 (pgvector semantic recall), the typed-SSOT A2A response path (RFC #2967), **eight production adapters** (Claude Code, Hermes, Gemini CLI, LangGraph, DeepAgents, CrewAI, AutoGen, OpenClaw), skill lifecycle, and operational surfaces.
The companion private repo [`molecule-controlplane`](https://git.moleculesai.app/molecule-ai/molecule-controlplane) provides the SaaS surface — multi-tenant orchestration on EC2 + Neon + Cloudflare Tunnels, KMS envelope encryption, WorkOS auth, Stripe billing, and a `tenant_resources` audit table with a 30-min reconciler.
+130
View File
@@ -0,0 +1,130 @@
# 5-Axis Review: PR #3029 (fix #2989) + PR #3033 (docs refresh)
**Reviewer:** Kimi / Engineer-A
**Date:** 2026-05-31
**Scope:** Local review (CR2 auth-down, filling review gap per PM dispatch)
---
## PR #3029 — CP orphan sweeper + registry prefix abstraction
### Correctness ✅ (with 1 semantic conflict to resolve)
**cp_orphan_sweeper.go** — The deprovision split-write race fix is sound:
- SELECT `status='removed' AND instance_id IS NOT NULL AND instance_id != ''` correctly targets leaked EC2s.
- Stop → clear instance_id is idempotent; on Stop failure the row stays targeted for retry.
- `ORDER BY updated_at DESC` + `LIMIT $1` + `UPDATE updated_at = now()` creates fair round-robin drain across cycles.
- `supervised.RunWithRecover` wiring in `cmd/server/main.go` mirrors the Docker sweeper pattern.
**provisioner/registry.go** — Clean env-driven prefix abstraction:
- `RegistryPrefix()` respects `MOLECULE_IMAGE_REGISTRY` override; falls back to GHCR OSS default.
- `RuntimeImage()` returns `""` for unknown runtimes, forcing explicit fallback at call sites.
- `computeRuntimeImages()` runs at init; captures prefix active at boot.
** provisioner.go migration** — Hardcoded map → `computeRuntimeImages()` is a safe refactor; no behavioral change for OSS default.
**admin_workspace_images.go**`TemplateImageRef()` now uses `provisioner.RegistryPrefix()`; keeps admin ops and provisioner pulls consistent.
### Security ✅
- Sweeper SQL has no user-input surface; parameters are internal LIMIT constant and DB-generated IDs.
- `RegistryPrefix()` reads env only; comment correctly notes it is deploy-time trusted (operator-set, not user-supplied).
- No new secrets, auth tokens, or credential exposure.
### Performance ✅
- 60s tick / 30s deadline / LIMIT 100 is conservative and safe.
- Sequential Stop calls share the 30s parent context; with typical CP DELETE latency (<1s), 100 orphans finish well within budget.
- If CP is degraded, deadline expires, UPDATEs don't fire, and next cycle retries — no stampede.
### Style / Readability ✅
- Excellent docstrings; the `#2989` race narrative is clearly documented for future maintainers.
- `CPOrphanReaper` interface is minimal and testable.
- Nil-reaper and nil-DB guards follow existing patterns.
- One minor nit: `cpSweepOnce` could return `[]string` of processed IDs to make post-hoc assertions easier, but the fake-reaper test pattern works fine as-is.
### Tests ✅ (excellent coverage)
| Scenario | Covered |
|---|---|
| Happy path: Stop succeeds, instance_id cleared | ✅ |
| Stop fails, instance_id retained for retry | ✅ |
| Empty result set (steady state) | ✅ |
| Multiple orphans, partial failure, others proceed | ✅ |
| DB query error (transient) | ✅ |
| UPDATE error after Stop success (logs, continues) | ✅ |
| Nil db.DB (defensive boot safety) | ✅ |
| Nil reaper (disabled, no goroutine leak) | ✅ |
| Boot sweep + tick cadence + ctx cancel | ✅ |
| Registry prefix default / env override / empty env | ✅ |
| Runtime image format for all known runtimes | ✅ |
| Unknown runtime returns `""` | ✅ |
| Registry override applies to ALL runtimes | ✅ |
| Alphabetical order pin | ✅ |
**All tests pass:**
```
ok github.com/.../internal/registry 0.107s (9/9 CP sweeper tests)
ok github.com/.../internal/provisioner 0.009s (7/7 registry tests)
```
### ⚠️ BLOCKER: Semantic conflict with PR #3033
`registry.go` adds `"codex"` to `knownRuntimes`, making **9** production runtimes:
```go
knownRuntimes = []string{
"autogen", "claude-code", "codex", "crewai", "deepagents",
"gemini-cli", "hermes", "langgraph", "openclaw",
}
```
PR #3033 updates the README to claim **eight** production runtimes and explicitly lists:
> Claude Code, Hermes, Gemini CLI, LangGraph, DeepAgents, CrewAI, AutoGen, OpenClaw
`codex` is absent from the README compatibility table, the "What Ships In main" section, and the architecture diagram list. After both PRs merge, the code will support 9 runtimes but the docs will claim 8 — a public-facing drift.
**Fix path:** Add `codex` to the README runtime list in PR #3033 (or a fast-follow) so the count and table stay accurate. `codex` already exists in `manifest.json` and has a template repo, so it is legitimate to list as "shipping on main."
---
## PR #3033 — Docs refresh (README + branding assets)
### Correctness ✅ (with 1 semantic drift pending)
- Terminology standardization ("adapters" → "runtimes") is correct and consistent with platform usage.
- Deploy buttons updated from `molecule-monorepo``molecule-core`.
- Canvas v4, Memory v2, SaaS surface, RFC #2967 mentions are all factually accurate.
- **Missing:** `codex` runtime (see blocker above).
### Security ✅
- SVG assets are static branding; no scripts, no external references beyond the existing `<style>` media query.
- No auth or credential surface touched.
### Performance N/A
- Docs-only; no runtime impact.
### Style / Readability ✅
- warm-paper theme description is concise and helpful.
- Architecture diagram update (Docker → EC2 + SSM, KMS, SaaS CP) is accurate.
- Quick Start clone URL fixed.
### Tests N/A
- No code changes; no test delta.
---
## Summary
| PR | Verdict | Action needed |
|---|---|---|
| #3029 | **Approve with nit** | Merge-ready after confirming #3033 (or follow-up) adds `codex` to README runtime list. |
| #3033 | **Approve with blocker** | Add `codex` to the 8-runtimes list (making 9) and to the compatibility table before merge. |
**Risk if both merge as-is:** Public docs understate runtime count by 1; operators reading README may think `codex` is not supported when the provisioner already knows about it.
**Recommended merge order:** #3029 first (adds runtime support), then #3033 with `codex` line added (docs catch up).
+2 -1
View File
@@ -60,7 +60,8 @@ func refreshEnvFromCP() error {
req.Header.Set("Authorization", "Bearer "+adminToken)
req.Header.Set("X-Molecule-Org-Id", orgID)
resp, err := http.DefaultClient.Do(req)
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("do request: %w", err)
}
+17 -5
View File
@@ -3,6 +3,7 @@ package bundle
import (
"context"
"fmt"
"log"
"strings"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
@@ -86,13 +87,20 @@ func Import(
// PluginsPath set by caller if available
}
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("bundle/importer: PANIC during provision start for %s: %v", wsID, r)
}
}()
provCtx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
defer cancel()
url, err := prov.Start(provCtx, cfg)
if err != nil {
markFailed(provCtx, wsID, broadcaster, err)
} else if url != "" {
db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID)
if _, dbErr := db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID); dbErr != nil {
log.Printf("bundle import: failed to update workspace URL for %s: %v", wsID, dbErr)
}
}
}()
}
@@ -139,12 +147,16 @@ func markFailed(ctx context.Context, wsID string, broadcaster *events.Broadcaste
// markProvisionFailed in workspace-server/internal/handlers/
// workspace_provision_shared.go.
msg := err.Error()
db.DB.ExecContext(ctx,
if _, dbErr := db.DB.ExecContext(ctx,
`UPDATE workspaces SET status = $1, last_sample_error = $2, updated_at = now() WHERE id = $3`,
models.StatusFailed, msg, wsID)
broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
models.StatusFailed, msg, wsID); dbErr != nil {
log.Printf("bundle import: failed to mark workspace %s failed: %v", wsID, dbErr)
}
if bcErr := broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
"error": msg,
})
}); bcErr != nil {
log.Printf("bundle import: failed to broadcast provision failed for %s: %v", wsID, bcErr)
}
}
func nilIfEmpty(s string) interface{} {
+26 -11
View File
@@ -375,21 +375,25 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
// Update stats in DB
if db.DB != nil {
db.DB.ExecContext(ctx, `
if _, err := db.DB.ExecContext(ctx, `
UPDATE workspace_channels
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
WHERE id = $1
`, ch.ID)
`, ch.ID); err != nil {
log.Printf("Channels: failed to update inbound stats for channel %s: %v", ch.ID, err)
}
}
// Broadcast event
if m.broadcaster != nil {
m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
"channel_id": ch.ID,
"channel_type": ch.ChannelType,
"username": msg.Username,
"direction": "inbound",
})
}); err != nil {
log.Printf("Channels: failed to broadcast inbound event: %v", err)
}
}
return nil
@@ -420,19 +424,23 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin
}
if db.DB != nil {
db.DB.ExecContext(ctx, `
if _, err := db.DB.ExecContext(ctx, `
UPDATE workspace_channels
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
WHERE id = $1
`, channelID)
`, channelID); err != nil {
log.Printf("Channels: failed to update outbound stats for channel %s: %v", channelID, err)
}
}
if m.broadcaster != nil {
m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
"channel_id": ch.ID,
"channel_type": ch.ChannelType,
"direction": "outbound",
})
}); err != nil {
log.Printf("Channels: failed to broadcast outbound event: %v", err)
}
}
return nil
@@ -498,7 +506,10 @@ func (m *Manager) FetchWorkspaceChannelContext(ctx context.Context, workspaceID
return ""
}
var config map[string]interface{}
json.Unmarshal(configJSON, &config)
if err := json.Unmarshal(configJSON, &config); err != nil {
log.Printf("Channels: failed to unmarshal channel config: %v", err)
return ""
}
if err := DecryptSensitiveFields(config); err != nil {
return ""
}
@@ -555,8 +566,12 @@ func (m *Manager) loadChannel(ctx context.Context, channelID string) (ChannelRow
if err != nil {
return ch, fmt.Errorf("channel %s not found: %w", channelID, err)
}
json.Unmarshal(configJSON, &ch.Config)
json.Unmarshal(allowedJSON, &ch.AllowedUsers)
if err := json.Unmarshal(configJSON, &ch.Config); err != nil {
return ch, fmt.Errorf("unmarshal channel %s config: %w", channelID, err)
}
if err := json.Unmarshal(allowedJSON, &ch.AllowedUsers); err != nil {
return ch, fmt.Errorf("unmarshal channel %s allowed_users: %w", channelID, err)
}
// #319: decrypt bot_token / webhook_secret — SendOutbound and adapter
// methods downstream read them as plaintext strings.
if err := DecryptSensitiveFields(ch.Config); err != nil {
+12 -4
View File
@@ -482,10 +482,12 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
if apiErr.Code == 429 {
retryAfter := time.Duration(apiErr.RetryAfter) * time.Second
log.Printf("Channels: Telegram poll rate-limited, sleeping %s", retryAfter)
timer := time.NewTimer(retryAfter)
select {
case <-ctx.Done():
timer.Stop()
return nil
case <-time.After(retryAfter):
case <-timer.C:
continue
}
}
@@ -495,10 +497,12 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
}
}
log.Printf("Channels: Telegram poll error: %v", err)
timer := time.NewTimer(telegramPollInterval)
select {
case <-ctx.Done():
timer.Stop()
return nil
case <-time.After(telegramPollInterval):
case <-timer.C:
continue
}
}
@@ -513,7 +517,9 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
// Acknowledge the button press (removes loading spinner)
ackCfg := tgbotapi.NewCallback(cb.ID, "Received")
bot.Send(ackCfg)
if _, err := bot.Send(ackCfg); err != nil {
log.Printf("telegram: failed to send callback ack: %v", err)
}
// Update the message to show what was clicked
decision := "approved"
@@ -525,7 +531,9 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
cb.Message.MessageID,
cb.Message.Text+"\n\n✅ CEO "+decision,
)
bot.Send(editMsg)
if _, err := bot.Send(editMsg); err != nil {
log.Printf("telegram: failed to send edit message: %v", err)
}
// Route the decision as an inbound message to the agent
inbound := &InboundMessage{
@@ -932,7 +932,12 @@ func applyIdleTimeout(parent context.Context, b *events.Broadcaster, workspaceID
ctx, cancel := context.WithCancel(parent)
sub, unsub := b.SubscribeSSE(workspaceID)
go func() {
defer unsub()
defer func() {
if r := recover(); r != nil {
log.Printf("a2a_proxy: PANIC in SSE idle watcher for %s: %v", workspaceID, r)
}
unsub()
}()
timer := time.NewTimer(idle)
defer timer.Stop()
for {
@@ -51,23 +51,29 @@ func (h *ApprovalsHandler) Create(c *gin.Context) {
return
}
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
"approval_id": approvalID,
"action": body.Action,
"reason": body.Reason,
"task_id": body.TaskID,
})
}); err != nil {
log.Printf("approvals: failed to broadcast approval requested: %v", err)
}
// Auto-escalate to parent
var parentID *string
db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID)
if err := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); err != nil {
log.Printf("approvals: failed to lookup parent for escalation: %v", err)
}
if parentID != nil {
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
"approval_id": approvalID,
"from_workspace_id": workspaceID,
"action": body.Action,
"reason": body.Reason,
})
}); err != nil {
log.Printf("approvals: failed to broadcast approval escalated: %v", err)
}
}
c.JSON(http.StatusCreated, gin.H{"approval_id": approvalID, "status": "pending"})
@@ -80,10 +86,12 @@ func (h *ApprovalsHandler) ListAll(c *gin.Context) {
ctx := c.Request.Context()
// Auto-expire stale approvals (older than 10 min)
db.DB.ExecContext(ctx, `
if _, err := db.DB.ExecContext(ctx, `
UPDATE approval_requests SET status = 'denied', decided_by = 'auto-expired', decided_at = now()
WHERE status = 'pending' AND created_at < now() - interval '10 minutes'
`)
`); err != nil {
log.Printf("approvals: failed to auto-expire stale approvals: %v", err)
}
rows, err := db.DB.QueryContext(ctx, `
SELECT a.id, a.workspace_id, w.name, a.action, a.reason, a.status, a.created_at
@@ -211,11 +219,13 @@ func (h *ApprovalsHandler) Decide(c *gin.Context) {
eventType = "APPROVAL_DENIED"
}
h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{
if err := h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{
"approval_id": approvalID,
"decision": body.Decision,
"decided_by": decidedBy,
})
}); err != nil {
log.Printf("approvals: failed to broadcast approval decision: %v", err)
}
c.JSON(http.StatusOK, gin.H{"status": body.Decision, "approval_id": approvalID})
}
@@ -558,6 +558,11 @@ func (h *ChannelHandler) Webhook(c *gin.Context) {
// Process asynchronously — don't block the webhook response
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("Channels: PANIC in async HandleInbound for workspace %s: %v", ch.WorkspaceID[:12], r)
}
}()
bgCtx := context.Background()
if err := h.manager.HandleInbound(bgCtx, ch, msg); err != nil {
log.Printf("Channels: async HandleInbound error for workspace %s: %v", ch.WorkspaceID[:12], err)
@@ -239,7 +239,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
// Siblings
if parentID.Valid {
siblings, _ := queryPeerMaps(`
siblings, _ := queryPeerMaps(ctx, `
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
w.parent_id, w.active_tasks
@@ -247,7 +247,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
parentID.String, workspaceID)
peers = append(peers, siblings...)
} else {
siblings, _ := queryPeerMaps(`
siblings, _ := queryPeerMaps(ctx, `
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
w.parent_id, w.active_tasks
@@ -257,7 +257,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
}
// Children
children, _ := queryPeerMaps(`
children, _ := queryPeerMaps(ctx, `
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
w.parent_id, w.active_tasks
@@ -266,7 +266,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
// Parent
if parentID.Valid {
parent, _ := queryPeerMaps(`
parent, _ := queryPeerMaps(ctx, `
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
w.parent_id, w.active_tasks
@@ -303,8 +303,8 @@ func filterPeersByQuery(peers []map[string]interface{}, q string) []map[string]i
}
// queryPeerMaps returns clean JSON-serializable maps instead of Workspace structs.
func queryPeerMaps(query string, args ...interface{}) ([]map[string]interface{}, error) {
rows, err := db.DB.Query(query, args...)
func queryPeerMaps(ctx context.Context, query string, args ...interface{}) ([]map[string]interface{}, error) {
rows, err := db.DB.QueryContext(ctx, query, args...)
if err != nil {
log.Printf("queryPeerMaps error: %v", err)
return nil, err
@@ -15,6 +15,7 @@ import (
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"strings"
@@ -54,6 +55,22 @@ func updateMCPDelegationStatus(ctx context.Context, db *sql.DB, workspaceID, del
}
}
// ─────────────────────────────────────────────────────────────────────────────
// mcpHTTPClient is a dedicated client for MCP bridge A2A calls.
// Per-request deadlines are enforced via context (30 s sync, 8 s async).
// Transport-level timeouts ensure dead workspaces fail fast instead of
// hanging on OS default TCP timeouts (~75 s Linux).
var mcpHTTPClient = &http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ResponseHeaderTimeout: 30 * time.Second,
TLSHandshakeTimeout: 5 * time.Second,
},
}
// ─────────────────────────────────────────────────────────────────────────────
// Tool implementations
// ─────────────────────────────────────────────────────────────────────────────
@@ -231,7 +248,7 @@ func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args
// so this header reflects a verified caller identity, not a spoofable value.
httpReq.Header.Set("X-Workspace-ID", callerID)
resp, err := http.DefaultClient.Do(httpReq)
resp, err := mcpHTTPClient.Do(httpReq)
if err != nil {
updateMCPDelegationStatus(ctx, h.database, callerID, delegationID, "failed", err.Error())
return "", fmt.Errorf("A2A call failed: %w", err)
@@ -279,6 +296,11 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
// Fire and forget in a detached goroutine. Use a background context so
// the call is not cancelled when the HTTP request completes.
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("MCPHandler.delegate_task_async: PANIC for %s → %s: %v", callerID, targetID, r)
}
}()
bgCtx, cancel := context.WithTimeout(context.Background(), mcpAsyncCallTimeout)
defer cancel()
@@ -314,7 +336,7 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("X-Workspace-ID", callerID)
resp, err := http.DefaultClient.Do(httpReq)
resp, err := mcpHTTPClient.Do(httpReq)
if err != nil {
log.Printf("MCPHandler.delegate_task_async: A2A call to %s: %v", targetID, err)
return
@@ -201,6 +201,11 @@ func (h *PluginsHandler) uninstallViaDocker(ctx context.Context, c *gin.Context,
// Auto-restart (small delay to ensure fs writes are flushed)
if h.restartFunc != nil {
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("plugins_install: PANIC in delayed restart for %s: %v", workspaceID, r)
}
}()
time.Sleep(2 * time.Second)
h.restartFunc(workspaceID)
}()
@@ -133,24 +133,24 @@ func loadRestartContextData(ctx context.Context, workspaceID string) restartCont
// message bus.
keySet := map[string]struct{}{}
if rows, err := db.DB.QueryContext(ctx, `SELECT key FROM global_secrets`); err == nil {
defer rows.Close()
for rows.Next() {
var k string
if rows.Scan(&k) == nil {
keySet[k] = struct{}{}
}
}
rows.Close()
}
if rows, err := db.DB.QueryContext(ctx,
`SELECT key FROM workspace_secrets WHERE workspace_id = $1`, workspaceID,
); err == nil {
defer rows.Close()
for rows.Next() {
var k string
if rows.Scan(&k) == nil {
keySet[k] = struct{}{}
}
}
rows.Close()
}
for k := range keySet {
d.EnvKeys = append(d.EnvKeys, k)
@@ -163,6 +163,8 @@ func loadRestartContextData(ctx context.Context, workspaceID string) restartCont
// workspace's status flips to 'online' or the deadline expires.
// Returns true on success; callers log+drop on false.
func waitForWorkspaceOnline(ctx context.Context, workspaceID string, timeout time.Duration) bool {
ticker := time.NewTicker(restartContextOnlinePollInterval)
defer ticker.Stop()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
var status string
@@ -174,7 +176,7 @@ func waitForWorkspaceOnline(ctx context.Context, workspaceID string, timeout tim
select {
case <-ctx.Done():
return false
case <-time.After(restartContextOnlinePollInterval):
case <-ticker.C:
}
}
return false
+17 -2
View File
@@ -213,7 +213,12 @@ func (h *TerminalHandler) handleLocalConnect(c *gin.Context, workspaceID string)
// Bridge: container stdout → WebSocket
done := make(chan struct{})
go func() {
defer close(done)
defer func() {
if r := recover(); r != nil {
log.Printf("Terminal: PANIC in stdout bridge: %v", r)
}
close(done)
}()
buf := make([]byte, 4096)
for {
n, err := resp.Reader.Read(buf)
@@ -434,7 +439,12 @@ func (h *TerminalHandler) handleRemoteConnect(c *gin.Context, workspaceID, insta
// PTY → WebSocket
go func() {
defer close(done)
defer func() {
if r := recover(); r != nil {
log.Printf("Terminal: PANIC in PTY bridge: %v", r)
}
close(done)
}()
buf := make([]byte, 4096)
for {
n, err := ptmx.Read(buf)
@@ -456,6 +466,11 @@ func (h *TerminalHandler) handleRemoteConnect(c *gin.Context, workspaceID, insta
// WebSocket → PTY (stdin)
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("Terminal: PANIC in stdin loop: %v", r)
}
}()
for {
_, msg, rErr := conn.ReadMessage()
if rErr != nil {
@@ -296,6 +296,7 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create workspace"})
return
}
defer func() { _ = tx.Rollback() }()
maxConcurrent := payload.MaxConcurrentTasks
if maxConcurrent <= 0 {
@@ -478,7 +479,9 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
// from the external agent (with this token + its URL)
// flips the row to online.
// Preserve BYO-compute runtime label (kimi, kimi-cli, external).
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, runtime = $2, updated_at = now() WHERE id = $3`, models.StatusAwaitingAgent, normalizeExternalRuntime(payload.Runtime), id)
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, runtime = $2, updated_at = now() WHERE id = $3`, models.StatusAwaitingAgent, normalizeExternalRuntime(payload.Runtime), id); err != nil {
log.Printf("External workspace %s: status update failed: %v", id, err)
}
tok, tokErr := wsauth.IssueToken(ctx, db.DB, id)
if tokErr != nil {
log.Printf("External workspace %s: token issuance failed: %v", id, tokErr)
@@ -261,7 +261,7 @@ func (h *WorkspaceHandler) buildProvisionerConfig(
workspaceAccess := payload.WorkspaceAccess
if (workspacePath == "" || workspaceAccess == "") && db.DB != nil {
var dbDir, dbAccess string
if err := db.DB.QueryRow(
if err := db.DB.QueryRowContext(ctx,
`SELECT COALESCE(workspace_dir, ''), COALESCE(workspace_access, 'none') FROM workspaces WHERE id = $1`,
workspaceID,
).Scan(&dbDir, &dbAccess); err == nil {
@@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"fmt"
"log"
"net/http"
"strconv"
"strings"
@@ -41,6 +42,11 @@ func NewMCPRateLimiter(rate int, interval time.Duration, ctx context.Context) *M
interval: interval,
}
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("mcp_ratelimit: PANIC in bucket cleanup: %v", r)
}
}()
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
@@ -3,6 +3,7 @@ package middleware
import (
"context"
"log"
"net/http"
"strconv"
"strings"
@@ -35,6 +36,11 @@ func NewRateLimiter(rate int, interval time.Duration, ctx context.Context) *Rate
interval: interval,
}
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("ratelimit: PANIC in bucket cleanup: %v", r)
}
}()
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
@@ -116,6 +116,11 @@ func sessionCachePut(key string, ok bool) {
func init() {
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("session_auth: PANIC in cache sweeper: %v", r)
}
}()
// Jitter startup so restarts don't align sweeps.
time.Sleep(time.Duration(rand.Int64N(int64(sessionCacheSweepEvery))))
t := time.NewTicker(sessionCacheSweepEvery)
@@ -60,10 +60,12 @@ func RunWithRecover(ctx context.Context, name string, fn func(context.Context))
}
// Panic → back off and restart.
timer := time.NewTimer(backoff)
select {
case <-ctx.Done():
timer.Stop()
return
case <-time.After(backoff):
case <-timer.C:
}
if backoff < maxBackoff {
backoff *= 2