Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 85f8d78ac0 | |||
| a7c3f6b7ed | |||
| d0c8dd8be8 | |||
| 1d37c0c44b | |||
| aebaef07dd | |||
| 3d295309b0 | |||
| 3018fdee2f | |||
| a5f8a6a4e0 | |||
| 88acde1197 | |||
| 1a7402a8aa | |||
| 9526ca537e | |||
| 7a7eafa991 | |||
| beb65b6c5c | |||
| dbbfb52cbd |
@@ -53,7 +53,7 @@ Molecule AI is the most powerful way to govern an AI agent organization in produ
|
||||
It combines the parts that are usually scattered across demos, internal glue code, and framework-specific tooling into one product:
|
||||
|
||||
- one org-native control plane for teams, roles, hierarchy, and lifecycle
|
||||
- one runtime layer that lets **nine** agent runtimes — LangGraph, DeepAgents, Claude Code, CrewAI, AutoGen, **Hermes**, **Gemini CLI**, OpenClaw, and **Codex** — run side by side behind one workspace contract
|
||||
- one runtime layer that lets **eight** agent runtimes — LangGraph, DeepAgents, Claude Code, CrewAI, AutoGen, **Hermes**, **Gemini CLI**, and OpenClaw — run side by side behind one workspace contract
|
||||
- one memory model that keeps recall, sharing, and skill evolution aligned with organizational boundaries (Memory v2 backed by pgvector for semantic recall)
|
||||
- one operational surface for observing, pausing, restarting, inspecting, and improving live workspaces
|
||||
|
||||
@@ -75,7 +75,7 @@ You do not wire collaboration paths by hand. Hierarchy defines the default commu
|
||||
|
||||
### 3. Runtime choice stops being a dead-end decision
|
||||
|
||||
LangGraph, DeepAgents, Claude Code, CrewAI, AutoGen, Hermes, Gemini CLI, OpenClaw, and Codex can all plug into the same workspace abstraction. Teams can standardize governance without forcing every group onto one runtime.
|
||||
LangGraph, DeepAgents, Claude Code, CrewAI, AutoGen, Hermes, Gemini CLI, and OpenClaw can all plug into the same workspace abstraction. Teams can standardize governance without forcing every group onto one runtime.
|
||||
|
||||
### 4. Memory is treated like infrastructure
|
||||
|
||||
@@ -120,7 +120,6 @@ Molecule AI is not trying to replace the frameworks below. It is the system that
|
||||
| **Hermes 4** | Shipping on `main` | Hybrid reasoning, native tools, json_schema (NousResearch/hermes-agent) | Option B upstream hook, A2A bridge to OpenAI-compat API, multi-provider provider derivation |
|
||||
| **Gemini CLI** | Shipping on `main` | Google Gemini CLI continuity | Workspace lifecycle, A2A, hierarchy-aware collaboration, shared ops plane |
|
||||
| **OpenClaw** | Shipping on `main` | CLI-native runtime with its own session model | Workspace lifecycle, templates, activity logs, topology-aware collaboration |
|
||||
| **Codex** | Shipping on `main` | OpenAI Codex CLI continuity | Workspace lifecycle, A2A, hierarchy-aware collaboration, shared ops plane |
|
||||
| **NemoClaw** | WIP on `feat/nemoclaw-t4-docker` | NVIDIA-oriented runtime path | Planned to join the same abstraction once merged; not yet part of `main` |
|
||||
|
||||
This is the key idea: **many agent runtimes, one organizational operating system**.
|
||||
@@ -210,7 +209,7 @@ The result is not just “an agent that learns.” It is **an organization that
|
||||
### Runtime
|
||||
|
||||
- unified `workspace/` image; thin AMI in production (us-east-2)
|
||||
- adapter-driven execution across **9 runtimes** (Claude Code, Hermes, Gemini CLI, LangGraph, DeepAgents, CrewAI, AutoGen, OpenClaw, Codex)
|
||||
- adapter-driven execution across **8 runtimes** (Claude Code, Hermes, Gemini CLI, LangGraph, DeepAgents, CrewAI, AutoGen, OpenClaw)
|
||||
- Agent Card registration
|
||||
- awareness-backed memory integration; **Memory v2 backed by pgvector** for semantic recall
|
||||
- plugin-mounted shared rules/skills
|
||||
@@ -240,6 +239,7 @@ The result is not just “an agent that learns.” It is **an organization that
|
||||
- no tunnel, no public endpoint — the plugin self-registers each watched workspace as `delivery_mode=poll` and long-polls `/activity?since_id=…`
|
||||
- multi-tenant friendly: one plugin install can watch workspaces across multiple Molecule tenants (`MOLECULE_PLATFORM_URLS` per-workspace)
|
||||
- install via the standard marketplace flow: `/plugin marketplace add Molecule-AI/molecule-mcp-claude-channel` → `/plugin install molecule-channel@molecule-mcp-claude-channel`
|
||||
|
||||
## Built For Teams That Need More Than A Demo
|
||||
|
||||
Molecule AI is especially strong when you need to run:
|
||||
@@ -260,7 +260,7 @@ Canvas (Next.js 15, warm-paper :3000) <--HTTP / WS--> Platform (Go 1.25 :8080)
|
||||
+------------------------- shows ------------------------> workspaces, teams, tasks, traces, events
|
||||
|
||||
Workspace Runtime (Python ≥3.11, image with adapters)
|
||||
- 9 runtimes: LangGraph / DeepAgents / Claude Code / CrewAI / AutoGen / Hermes / Gemini CLI / OpenClaw / Codex
|
||||
- 8 adapters: LangGraph / DeepAgents / Claude Code / CrewAI / AutoGen / Hermes / Gemini CLI / OpenClaw
|
||||
- Agent Card + A2A server (typed-SSOT response path, RFC #2967)
|
||||
- heartbeat + activity + awareness-backed memory (Memory v2 — pgvector semantic recall)
|
||||
- skills + plugins + hot reload
|
||||
@@ -328,7 +328,7 @@ Then open `http://localhost:3000`:
|
||||
|
||||
## Current Scope
|
||||
|
||||
The current `main` branch ships the core platform, Canvas v4 (warm-paper themed), Memory v2 (pgvector semantic recall), the typed-SSOT A2A response path (RFC #2967), **nine production runtimes** (Claude Code, Hermes, Gemini CLI, LangGraph, DeepAgents, CrewAI, AutoGen, OpenClaw, Codex), skill lifecycle, and operational surfaces.
|
||||
The current `main` branch ships the core platform, Canvas v4 (warm-paper themed), Memory v2 (pgvector semantic recall), the typed-SSOT A2A response path (RFC #2967), **eight production adapters** (Claude Code, Hermes, Gemini CLI, LangGraph, DeepAgents, CrewAI, AutoGen, OpenClaw), skill lifecycle, and operational surfaces.
|
||||
|
||||
The companion private repo [`molecule-controlplane`](https://git.moleculesai.app/molecule-ai/molecule-controlplane) provides the SaaS surface — multi-tenant orchestration on EC2 + Neon + Cloudflare Tunnels, KMS envelope encryption, WorkOS auth, Stripe billing, and a `tenant_resources` audit table with a 30-min reconciler.
|
||||
|
||||
|
||||
@@ -0,0 +1,130 @@
|
||||
# 5-Axis Review: PR #3029 (fix #2989) + PR #3033 (docs refresh)
|
||||
|
||||
**Reviewer:** Kimi / Engineer-A
|
||||
**Date:** 2026-05-31
|
||||
**Scope:** Local review (CR2 auth-down, filling review gap per PM dispatch)
|
||||
|
||||
---
|
||||
|
||||
## PR #3029 — CP orphan sweeper + registry prefix abstraction
|
||||
|
||||
### Correctness ✅ (with 1 semantic conflict to resolve)
|
||||
|
||||
**cp_orphan_sweeper.go** — The deprovision split-write race fix is sound:
|
||||
- SELECT `status='removed' AND instance_id IS NOT NULL AND instance_id != ''` correctly targets leaked EC2s.
|
||||
- Stop → clear instance_id is idempotent; on Stop failure the row stays targeted for retry.
|
||||
- `ORDER BY updated_at DESC` + `LIMIT $1` + `UPDATE updated_at = now()` creates fair round-robin drain across cycles.
|
||||
- `supervised.RunWithRecover` wiring in `cmd/server/main.go` mirrors the Docker sweeper pattern.
|
||||
|
||||
**provisioner/registry.go** — Clean env-driven prefix abstraction:
|
||||
- `RegistryPrefix()` respects `MOLECULE_IMAGE_REGISTRY` override; falls back to GHCR OSS default.
|
||||
- `RuntimeImage()` returns `""` for unknown runtimes, forcing explicit fallback at call sites.
|
||||
- `computeRuntimeImages()` runs at init; captures prefix active at boot.
|
||||
|
||||
** provisioner.go migration** — Hardcoded map → `computeRuntimeImages()` is a safe refactor; no behavioral change for OSS default.
|
||||
|
||||
**admin_workspace_images.go** — `TemplateImageRef()` now uses `provisioner.RegistryPrefix()`; keeps admin ops and provisioner pulls consistent.
|
||||
|
||||
### Security ✅
|
||||
|
||||
- Sweeper SQL has no user-input surface; parameters are internal LIMIT constant and DB-generated IDs.
|
||||
- `RegistryPrefix()` reads env only; comment correctly notes it is deploy-time trusted (operator-set, not user-supplied).
|
||||
- No new secrets, auth tokens, or credential exposure.
|
||||
|
||||
### Performance ✅
|
||||
|
||||
- 60s tick / 30s deadline / LIMIT 100 is conservative and safe.
|
||||
- Sequential Stop calls share the 30s parent context; with typical CP DELETE latency (<1s), 100 orphans finish well within budget.
|
||||
- If CP is degraded, deadline expires, UPDATEs don't fire, and next cycle retries — no stampede.
|
||||
|
||||
### Style / Readability ✅
|
||||
|
||||
- Excellent docstrings; the `#2989` race narrative is clearly documented for future maintainers.
|
||||
- `CPOrphanReaper` interface is minimal and testable.
|
||||
- Nil-reaper and nil-DB guards follow existing patterns.
|
||||
- One minor nit: `cpSweepOnce` could return `[]string` of processed IDs to make post-hoc assertions easier, but the fake-reaper test pattern works fine as-is.
|
||||
|
||||
### Tests ✅ (excellent coverage)
|
||||
|
||||
| Scenario | Covered |
|
||||
|---|---|
|
||||
| Happy path: Stop succeeds, instance_id cleared | ✅ |
|
||||
| Stop fails, instance_id retained for retry | ✅ |
|
||||
| Empty result set (steady state) | ✅ |
|
||||
| Multiple orphans, partial failure, others proceed | ✅ |
|
||||
| DB query error (transient) | ✅ |
|
||||
| UPDATE error after Stop success (logs, continues) | ✅ |
|
||||
| Nil db.DB (defensive boot safety) | ✅ |
|
||||
| Nil reaper (disabled, no goroutine leak) | ✅ |
|
||||
| Boot sweep + tick cadence + ctx cancel | ✅ |
|
||||
| Registry prefix default / env override / empty env | ✅ |
|
||||
| Runtime image format for all known runtimes | ✅ |
|
||||
| Unknown runtime returns `""` | ✅ |
|
||||
| Registry override applies to ALL runtimes | ✅ |
|
||||
| Alphabetical order pin | ✅ |
|
||||
|
||||
**All tests pass:**
|
||||
```
|
||||
ok github.com/.../internal/registry 0.107s (9/9 CP sweeper tests)
|
||||
ok github.com/.../internal/provisioner 0.009s (7/7 registry tests)
|
||||
```
|
||||
|
||||
### ⚠️ BLOCKER: Semantic conflict with PR #3033
|
||||
|
||||
`registry.go` adds `"codex"` to `knownRuntimes`, making **9** production runtimes:
|
||||
```go
|
||||
knownRuntimes = []string{
|
||||
"autogen", "claude-code", "codex", "crewai", "deepagents",
|
||||
"gemini-cli", "hermes", "langgraph", "openclaw",
|
||||
}
|
||||
```
|
||||
|
||||
PR #3033 updates the README to claim **eight** production runtimes and explicitly lists:
|
||||
> Claude Code, Hermes, Gemini CLI, LangGraph, DeepAgents, CrewAI, AutoGen, OpenClaw
|
||||
|
||||
`codex` is absent from the README compatibility table, the "What Ships In main" section, and the architecture diagram list. After both PRs merge, the code will support 9 runtimes but the docs will claim 8 — a public-facing drift.
|
||||
|
||||
**Fix path:** Add `codex` to the README runtime list in PR #3033 (or a fast-follow) so the count and table stay accurate. `codex` already exists in `manifest.json` and has a template repo, so it is legitimate to list as "shipping on main."
|
||||
|
||||
---
|
||||
|
||||
## PR #3033 — Docs refresh (README + branding assets)
|
||||
|
||||
### Correctness ✅ (with 1 semantic drift pending)
|
||||
|
||||
- Terminology standardization ("adapters" → "runtimes") is correct and consistent with platform usage.
|
||||
- Deploy buttons updated from `molecule-monorepo` → `molecule-core`.
|
||||
- Canvas v4, Memory v2, SaaS surface, RFC #2967 mentions are all factually accurate.
|
||||
- **Missing:** `codex` runtime (see blocker above).
|
||||
|
||||
### Security ✅
|
||||
|
||||
- SVG assets are static branding; no scripts, no external references beyond the existing `<style>` media query.
|
||||
- No auth or credential surface touched.
|
||||
|
||||
### Performance N/A
|
||||
|
||||
- Docs-only; no runtime impact.
|
||||
|
||||
### Style / Readability ✅
|
||||
|
||||
- warm-paper theme description is concise and helpful.
|
||||
- Architecture diagram update (Docker → EC2 + SSM, KMS, SaaS CP) is accurate.
|
||||
- Quick Start clone URL fixed.
|
||||
|
||||
### Tests N/A
|
||||
|
||||
- No code changes; no test delta.
|
||||
|
||||
---
|
||||
|
||||
## Summary
|
||||
|
||||
| PR | Verdict | Action needed |
|
||||
|---|---|---|
|
||||
| #3029 | **Approve with nit** | Merge-ready after confirming #3033 (or follow-up) adds `codex` to README runtime list. |
|
||||
| #3033 | **Approve with blocker** | Add `codex` to the 8-runtimes list (making 9) and to the compatibility table before merge. |
|
||||
|
||||
**Risk if both merge as-is:** Public docs understate runtime count by 1; operators reading README may think `codex` is not supported when the provisioner already knows about it.
|
||||
|
||||
**Recommended merge order:** #3029 first (adds runtime support), then #3033 with `codex` line added (docs catch up).
|
||||
@@ -60,7 +60,8 @@ func refreshEnvFromCP() error {
|
||||
req.Header.Set("Authorization", "Bearer "+adminToken)
|
||||
req.Header.Set("X-Molecule-Org-Id", orgID)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
client := &http.Client{Timeout: 10 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("do request: %w", err)
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package bundle
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
@@ -86,13 +87,20 @@ func Import(
|
||||
// PluginsPath set by caller if available
|
||||
}
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("bundle/importer: PANIC during provision start for %s: %v", wsID, r)
|
||||
}
|
||||
}()
|
||||
provCtx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
|
||||
defer cancel()
|
||||
url, err := prov.Start(provCtx, cfg)
|
||||
if err != nil {
|
||||
markFailed(provCtx, wsID, broadcaster, err)
|
||||
} else if url != "" {
|
||||
db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID)
|
||||
if _, dbErr := db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID); dbErr != nil {
|
||||
log.Printf("bundle import: failed to update workspace URL for %s: %v", wsID, dbErr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -139,12 +147,16 @@ func markFailed(ctx context.Context, wsID string, broadcaster *events.Broadcaste
|
||||
// markProvisionFailed in workspace-server/internal/handlers/
|
||||
// workspace_provision_shared.go.
|
||||
msg := err.Error()
|
||||
db.DB.ExecContext(ctx,
|
||||
if _, dbErr := db.DB.ExecContext(ctx,
|
||||
`UPDATE workspaces SET status = $1, last_sample_error = $2, updated_at = now() WHERE id = $3`,
|
||||
models.StatusFailed, msg, wsID)
|
||||
broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
|
||||
models.StatusFailed, msg, wsID); dbErr != nil {
|
||||
log.Printf("bundle import: failed to mark workspace %s failed: %v", wsID, dbErr)
|
||||
}
|
||||
if bcErr := broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
|
||||
"error": msg,
|
||||
})
|
||||
}); bcErr != nil {
|
||||
log.Printf("bundle import: failed to broadcast provision failed for %s: %v", wsID, bcErr)
|
||||
}
|
||||
}
|
||||
|
||||
func nilIfEmpty(s string) interface{} {
|
||||
|
||||
@@ -375,21 +375,25 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
|
||||
|
||||
// Update stats in DB
|
||||
if db.DB != nil {
|
||||
db.DB.ExecContext(ctx, `
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE workspace_channels
|
||||
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
|
||||
WHERE id = $1
|
||||
`, ch.ID)
|
||||
`, ch.ID); err != nil {
|
||||
log.Printf("Channels: failed to update inbound stats for channel %s: %v", ch.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast event
|
||||
if m.broadcaster != nil {
|
||||
m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
|
||||
if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
|
||||
"channel_id": ch.ID,
|
||||
"channel_type": ch.ChannelType,
|
||||
"username": msg.Username,
|
||||
"direction": "inbound",
|
||||
})
|
||||
}); err != nil {
|
||||
log.Printf("Channels: failed to broadcast inbound event: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -420,19 +424,23 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin
|
||||
}
|
||||
|
||||
if db.DB != nil {
|
||||
db.DB.ExecContext(ctx, `
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE workspace_channels
|
||||
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
|
||||
WHERE id = $1
|
||||
`, channelID)
|
||||
`, channelID); err != nil {
|
||||
log.Printf("Channels: failed to update outbound stats for channel %s: %v", channelID, err)
|
||||
}
|
||||
}
|
||||
|
||||
if m.broadcaster != nil {
|
||||
m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
|
||||
if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
|
||||
"channel_id": ch.ID,
|
||||
"channel_type": ch.ChannelType,
|
||||
"direction": "outbound",
|
||||
})
|
||||
}); err != nil {
|
||||
log.Printf("Channels: failed to broadcast outbound event: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -498,7 +506,10 @@ func (m *Manager) FetchWorkspaceChannelContext(ctx context.Context, workspaceID
|
||||
return ""
|
||||
}
|
||||
var config map[string]interface{}
|
||||
json.Unmarshal(configJSON, &config)
|
||||
if err := json.Unmarshal(configJSON, &config); err != nil {
|
||||
log.Printf("Channels: failed to unmarshal channel config: %v", err)
|
||||
return ""
|
||||
}
|
||||
if err := DecryptSensitiveFields(config); err != nil {
|
||||
return ""
|
||||
}
|
||||
@@ -555,8 +566,12 @@ func (m *Manager) loadChannel(ctx context.Context, channelID string) (ChannelRow
|
||||
if err != nil {
|
||||
return ch, fmt.Errorf("channel %s not found: %w", channelID, err)
|
||||
}
|
||||
json.Unmarshal(configJSON, &ch.Config)
|
||||
json.Unmarshal(allowedJSON, &ch.AllowedUsers)
|
||||
if err := json.Unmarshal(configJSON, &ch.Config); err != nil {
|
||||
return ch, fmt.Errorf("unmarshal channel %s config: %w", channelID, err)
|
||||
}
|
||||
if err := json.Unmarshal(allowedJSON, &ch.AllowedUsers); err != nil {
|
||||
return ch, fmt.Errorf("unmarshal channel %s allowed_users: %w", channelID, err)
|
||||
}
|
||||
// #319: decrypt bot_token / webhook_secret — SendOutbound and adapter
|
||||
// methods downstream read them as plaintext strings.
|
||||
if err := DecryptSensitiveFields(ch.Config); err != nil {
|
||||
|
||||
@@ -482,10 +482,12 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
|
||||
if apiErr.Code == 429 {
|
||||
retryAfter := time.Duration(apiErr.RetryAfter) * time.Second
|
||||
log.Printf("Channels: Telegram poll rate-limited, sleeping %s", retryAfter)
|
||||
timer := time.NewTimer(retryAfter)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return nil
|
||||
case <-time.After(retryAfter):
|
||||
case <-timer.C:
|
||||
continue
|
||||
}
|
||||
}
|
||||
@@ -495,10 +497,12 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
|
||||
}
|
||||
}
|
||||
log.Printf("Channels: Telegram poll error: %v", err)
|
||||
timer := time.NewTimer(telegramPollInterval)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return nil
|
||||
case <-time.After(telegramPollInterval):
|
||||
case <-timer.C:
|
||||
continue
|
||||
}
|
||||
}
|
||||
@@ -513,7 +517,9 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
|
||||
|
||||
// Acknowledge the button press (removes loading spinner)
|
||||
ackCfg := tgbotapi.NewCallback(cb.ID, "Received")
|
||||
bot.Send(ackCfg)
|
||||
if _, err := bot.Send(ackCfg); err != nil {
|
||||
log.Printf("telegram: failed to send callback ack: %v", err)
|
||||
}
|
||||
|
||||
// Update the message to show what was clicked
|
||||
decision := "approved"
|
||||
@@ -525,7 +531,9 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
|
||||
cb.Message.MessageID,
|
||||
cb.Message.Text+"\n\n✅ CEO "+decision,
|
||||
)
|
||||
bot.Send(editMsg)
|
||||
if _, err := bot.Send(editMsg); err != nil {
|
||||
log.Printf("telegram: failed to send edit message: %v", err)
|
||||
}
|
||||
|
||||
// Route the decision as an inbound message to the agent
|
||||
inbound := &InboundMessage{
|
||||
|
||||
@@ -932,7 +932,12 @@ func applyIdleTimeout(parent context.Context, b *events.Broadcaster, workspaceID
|
||||
ctx, cancel := context.WithCancel(parent)
|
||||
sub, unsub := b.SubscribeSSE(workspaceID)
|
||||
go func() {
|
||||
defer unsub()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("a2a_proxy: PANIC in SSE idle watcher for %s: %v", workspaceID, r)
|
||||
}
|
||||
unsub()
|
||||
}()
|
||||
timer := time.NewTimer(idle)
|
||||
defer timer.Stop()
|
||||
for {
|
||||
|
||||
@@ -51,23 +51,29 @@ func (h *ApprovalsHandler) Create(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
|
||||
if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"action": body.Action,
|
||||
"reason": body.Reason,
|
||||
"task_id": body.TaskID,
|
||||
})
|
||||
}); err != nil {
|
||||
log.Printf("approvals: failed to broadcast approval requested: %v", err)
|
||||
}
|
||||
|
||||
// Auto-escalate to parent
|
||||
var parentID *string
|
||||
db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID)
|
||||
if err := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); err != nil {
|
||||
log.Printf("approvals: failed to lookup parent for escalation: %v", err)
|
||||
}
|
||||
if parentID != nil {
|
||||
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
|
||||
if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"from_workspace_id": workspaceID,
|
||||
"action": body.Action,
|
||||
"reason": body.Reason,
|
||||
})
|
||||
}); err != nil {
|
||||
log.Printf("approvals: failed to broadcast approval escalated: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
c.JSON(http.StatusCreated, gin.H{"approval_id": approvalID, "status": "pending"})
|
||||
@@ -80,10 +86,12 @@ func (h *ApprovalsHandler) ListAll(c *gin.Context) {
|
||||
ctx := c.Request.Context()
|
||||
|
||||
// Auto-expire stale approvals (older than 10 min)
|
||||
db.DB.ExecContext(ctx, `
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE approval_requests SET status = 'denied', decided_by = 'auto-expired', decided_at = now()
|
||||
WHERE status = 'pending' AND created_at < now() - interval '10 minutes'
|
||||
`)
|
||||
`); err != nil {
|
||||
log.Printf("approvals: failed to auto-expire stale approvals: %v", err)
|
||||
}
|
||||
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT a.id, a.workspace_id, w.name, a.action, a.reason, a.status, a.created_at
|
||||
@@ -211,11 +219,13 @@ func (h *ApprovalsHandler) Decide(c *gin.Context) {
|
||||
eventType = "APPROVAL_DENIED"
|
||||
}
|
||||
|
||||
h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{
|
||||
if err := h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"decision": body.Decision,
|
||||
"decided_by": decidedBy,
|
||||
})
|
||||
}); err != nil {
|
||||
log.Printf("approvals: failed to broadcast approval decision: %v", err)
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"status": body.Decision, "approval_id": approvalID})
|
||||
}
|
||||
|
||||
@@ -558,6 +558,11 @@ func (h *ChannelHandler) Webhook(c *gin.Context) {
|
||||
|
||||
// Process asynchronously — don't block the webhook response
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("Channels: PANIC in async HandleInbound for workspace %s: %v", ch.WorkspaceID[:12], r)
|
||||
}
|
||||
}()
|
||||
bgCtx := context.Background()
|
||||
if err := h.manager.HandleInbound(bgCtx, ch, msg); err != nil {
|
||||
log.Printf("Channels: async HandleInbound error for workspace %s: %v", ch.WorkspaceID[:12], err)
|
||||
|
||||
@@ -239,7 +239,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
|
||||
|
||||
// Siblings
|
||||
if parentID.Valid {
|
||||
siblings, _ := queryPeerMaps(`
|
||||
siblings, _ := queryPeerMaps(ctx, `
|
||||
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
|
||||
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
|
||||
w.parent_id, w.active_tasks
|
||||
@@ -247,7 +247,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
|
||||
parentID.String, workspaceID)
|
||||
peers = append(peers, siblings...)
|
||||
} else {
|
||||
siblings, _ := queryPeerMaps(`
|
||||
siblings, _ := queryPeerMaps(ctx, `
|
||||
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
|
||||
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
|
||||
w.parent_id, w.active_tasks
|
||||
@@ -257,7 +257,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
|
||||
}
|
||||
|
||||
// Children
|
||||
children, _ := queryPeerMaps(`
|
||||
children, _ := queryPeerMaps(ctx, `
|
||||
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
|
||||
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
|
||||
w.parent_id, w.active_tasks
|
||||
@@ -266,7 +266,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
|
||||
|
||||
// Parent
|
||||
if parentID.Valid {
|
||||
parent, _ := queryPeerMaps(`
|
||||
parent, _ := queryPeerMaps(ctx, `
|
||||
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
|
||||
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
|
||||
w.parent_id, w.active_tasks
|
||||
@@ -303,8 +303,8 @@ func filterPeersByQuery(peers []map[string]interface{}, q string) []map[string]i
|
||||
}
|
||||
|
||||
// queryPeerMaps returns clean JSON-serializable maps instead of Workspace structs.
|
||||
func queryPeerMaps(query string, args ...interface{}) ([]map[string]interface{}, error) {
|
||||
rows, err := db.DB.Query(query, args...)
|
||||
func queryPeerMaps(ctx context.Context, query string, args ...interface{}) ([]map[string]interface{}, error) {
|
||||
rows, err := db.DB.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
log.Printf("queryPeerMaps error: %v", err)
|
||||
return nil, err
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
@@ -54,6 +55,22 @@ func updateMCPDelegationStatus(ctx context.Context, db *sql.DB, workspaceID, del
|
||||
}
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// mcpHTTPClient is a dedicated client for MCP bridge A2A calls.
|
||||
// Per-request deadlines are enforced via context (30 s sync, 8 s async).
|
||||
// Transport-level timeouts ensure dead workspaces fail fast instead of
|
||||
// hanging on OS default TCP timeouts (~75 s Linux).
|
||||
var mcpHTTPClient = &http.Client{
|
||||
Transport: &http.Transport{
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 5 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).DialContext,
|
||||
ResponseHeaderTimeout: 30 * time.Second,
|
||||
TLSHandshakeTimeout: 5 * time.Second,
|
||||
},
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Tool implementations
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
@@ -231,7 +248,7 @@ func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args
|
||||
// so this header reflects a verified caller identity, not a spoofable value.
|
||||
httpReq.Header.Set("X-Workspace-ID", callerID)
|
||||
|
||||
resp, err := http.DefaultClient.Do(httpReq)
|
||||
resp, err := mcpHTTPClient.Do(httpReq)
|
||||
if err != nil {
|
||||
updateMCPDelegationStatus(ctx, h.database, callerID, delegationID, "failed", err.Error())
|
||||
return "", fmt.Errorf("A2A call failed: %w", err)
|
||||
@@ -279,6 +296,11 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
|
||||
// Fire and forget in a detached goroutine. Use a background context so
|
||||
// the call is not cancelled when the HTTP request completes.
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("MCPHandler.delegate_task_async: PANIC for %s → %s: %v", callerID, targetID, r)
|
||||
}
|
||||
}()
|
||||
bgCtx, cancel := context.WithTimeout(context.Background(), mcpAsyncCallTimeout)
|
||||
defer cancel()
|
||||
|
||||
@@ -314,7 +336,7 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
httpReq.Header.Set("X-Workspace-ID", callerID)
|
||||
|
||||
resp, err := http.DefaultClient.Do(httpReq)
|
||||
resp, err := mcpHTTPClient.Do(httpReq)
|
||||
if err != nil {
|
||||
log.Printf("MCPHandler.delegate_task_async: A2A call to %s: %v", targetID, err)
|
||||
return
|
||||
|
||||
@@ -201,6 +201,11 @@ func (h *PluginsHandler) uninstallViaDocker(ctx context.Context, c *gin.Context,
|
||||
// Auto-restart (small delay to ensure fs writes are flushed)
|
||||
if h.restartFunc != nil {
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("plugins_install: PANIC in delayed restart for %s: %v", workspaceID, r)
|
||||
}
|
||||
}()
|
||||
time.Sleep(2 * time.Second)
|
||||
h.restartFunc(workspaceID)
|
||||
}()
|
||||
|
||||
@@ -133,24 +133,24 @@ func loadRestartContextData(ctx context.Context, workspaceID string) restartCont
|
||||
// message bus.
|
||||
keySet := map[string]struct{}{}
|
||||
if rows, err := db.DB.QueryContext(ctx, `SELECT key FROM global_secrets`); err == nil {
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var k string
|
||||
if rows.Scan(&k) == nil {
|
||||
keySet[k] = struct{}{}
|
||||
}
|
||||
}
|
||||
rows.Close()
|
||||
}
|
||||
if rows, err := db.DB.QueryContext(ctx,
|
||||
`SELECT key FROM workspace_secrets WHERE workspace_id = $1`, workspaceID,
|
||||
); err == nil {
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var k string
|
||||
if rows.Scan(&k) == nil {
|
||||
keySet[k] = struct{}{}
|
||||
}
|
||||
}
|
||||
rows.Close()
|
||||
}
|
||||
for k := range keySet {
|
||||
d.EnvKeys = append(d.EnvKeys, k)
|
||||
@@ -163,6 +163,8 @@ func loadRestartContextData(ctx context.Context, workspaceID string) restartCont
|
||||
// workspace's status flips to 'online' or the deadline expires.
|
||||
// Returns true on success; callers log+drop on false.
|
||||
func waitForWorkspaceOnline(ctx context.Context, workspaceID string, timeout time.Duration) bool {
|
||||
ticker := time.NewTicker(restartContextOnlinePollInterval)
|
||||
defer ticker.Stop()
|
||||
deadline := time.Now().Add(timeout)
|
||||
for time.Now().Before(deadline) {
|
||||
var status string
|
||||
@@ -174,7 +176,7 @@ func waitForWorkspaceOnline(ctx context.Context, workspaceID string, timeout tim
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case <-time.After(restartContextOnlinePollInterval):
|
||||
case <-ticker.C:
|
||||
}
|
||||
}
|
||||
return false
|
||||
|
||||
@@ -213,7 +213,12 @@ func (h *TerminalHandler) handleLocalConnect(c *gin.Context, workspaceID string)
|
||||
// Bridge: container stdout → WebSocket
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("Terminal: PANIC in stdout bridge: %v", r)
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
n, err := resp.Reader.Read(buf)
|
||||
@@ -434,7 +439,12 @@ func (h *TerminalHandler) handleRemoteConnect(c *gin.Context, workspaceID, insta
|
||||
|
||||
// PTY → WebSocket
|
||||
go func() {
|
||||
defer close(done)
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("Terminal: PANIC in PTY bridge: %v", r)
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
n, err := ptmx.Read(buf)
|
||||
@@ -456,6 +466,11 @@ func (h *TerminalHandler) handleRemoteConnect(c *gin.Context, workspaceID, insta
|
||||
|
||||
// WebSocket → PTY (stdin)
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("Terminal: PANIC in stdin loop: %v", r)
|
||||
}
|
||||
}()
|
||||
for {
|
||||
_, msg, rErr := conn.ReadMessage()
|
||||
if rErr != nil {
|
||||
|
||||
@@ -296,6 +296,7 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create workspace"})
|
||||
return
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
|
||||
maxConcurrent := payload.MaxConcurrentTasks
|
||||
if maxConcurrent <= 0 {
|
||||
@@ -478,7 +479,9 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
|
||||
// from the external agent (with this token + its URL)
|
||||
// flips the row to online.
|
||||
// Preserve BYO-compute runtime label (kimi, kimi-cli, external).
|
||||
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, runtime = $2, updated_at = now() WHERE id = $3`, models.StatusAwaitingAgent, normalizeExternalRuntime(payload.Runtime), id)
|
||||
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, runtime = $2, updated_at = now() WHERE id = $3`, models.StatusAwaitingAgent, normalizeExternalRuntime(payload.Runtime), id); err != nil {
|
||||
log.Printf("External workspace %s: status update failed: %v", id, err)
|
||||
}
|
||||
tok, tokErr := wsauth.IssueToken(ctx, db.DB, id)
|
||||
if tokErr != nil {
|
||||
log.Printf("External workspace %s: token issuance failed: %v", id, tokErr)
|
||||
|
||||
@@ -261,7 +261,7 @@ func (h *WorkspaceHandler) buildProvisionerConfig(
|
||||
workspaceAccess := payload.WorkspaceAccess
|
||||
if (workspacePath == "" || workspaceAccess == "") && db.DB != nil {
|
||||
var dbDir, dbAccess string
|
||||
if err := db.DB.QueryRow(
|
||||
if err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT COALESCE(workspace_dir, ''), COALESCE(workspace_access, 'none') FROM workspaces WHERE id = $1`,
|
||||
workspaceID,
|
||||
).Scan(&dbDir, &dbAccess); err == nil {
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -41,6 +42,11 @@ func NewMCPRateLimiter(rate int, interval time.Duration, ctx context.Context) *M
|
||||
interval: interval,
|
||||
}
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("mcp_ratelimit: PANIC in bucket cleanup: %v", r)
|
||||
}
|
||||
}()
|
||||
ticker := time.NewTicker(5 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
|
||||
@@ -3,6 +3,7 @@ package middleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -35,6 +36,11 @@ func NewRateLimiter(rate int, interval time.Duration, ctx context.Context) *Rate
|
||||
interval: interval,
|
||||
}
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("ratelimit: PANIC in bucket cleanup: %v", r)
|
||||
}
|
||||
}()
|
||||
ticker := time.NewTicker(5 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
|
||||
@@ -116,6 +116,11 @@ func sessionCachePut(key string, ok bool) {
|
||||
|
||||
func init() {
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("session_auth: PANIC in cache sweeper: %v", r)
|
||||
}
|
||||
}()
|
||||
// Jitter startup so restarts don't align sweeps.
|
||||
time.Sleep(time.Duration(rand.Int64N(int64(sessionCacheSweepEvery))))
|
||||
t := time.NewTicker(sessionCacheSweepEvery)
|
||||
|
||||
@@ -60,10 +60,12 @@ func RunWithRecover(ctx context.Context, name string, fn func(context.Context))
|
||||
}
|
||||
|
||||
// Panic → back off and restart.
|
||||
timer := time.NewTimer(backoff)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return
|
||||
case <-time.After(backoff):
|
||||
case <-timer.C:
|
||||
}
|
||||
if backoff < maxBackoff {
|
||||
backoff *= 2
|
||||
|
||||
Reference in New Issue
Block a user