Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ffd1bb7fc7 | |||
| d9e6ce5792 | |||
| 007be71d7f | |||
| 14c6c8761a | |||
| ce2980ab67 |
@@ -1,130 +0,0 @@
|
||||
# 5-Axis Review: PR #3029 (fix #2989) + PR #3033 (docs refresh)
|
||||
|
||||
**Reviewer:** Kimi / Engineer-A
|
||||
**Date:** 2026-05-31
|
||||
**Scope:** Local review (CR2 auth-down, filling review gap per PM dispatch)
|
||||
|
||||
---
|
||||
|
||||
## PR #3029 — CP orphan sweeper + registry prefix abstraction
|
||||
|
||||
### Correctness ✅ (with 1 semantic conflict to resolve)
|
||||
|
||||
**cp_orphan_sweeper.go** — The deprovision split-write race fix is sound:
|
||||
- SELECT `status='removed' AND instance_id IS NOT NULL AND instance_id != ''` correctly targets leaked EC2s.
|
||||
- Stop → clear instance_id is idempotent; on Stop failure the row stays targeted for retry.
|
||||
- `ORDER BY updated_at DESC` + `LIMIT $1` + `UPDATE updated_at = now()` creates fair round-robin drain across cycles.
|
||||
- `supervised.RunWithRecover` wiring in `cmd/server/main.go` mirrors the Docker sweeper pattern.
|
||||
|
||||
**provisioner/registry.go** — Clean env-driven prefix abstraction:
|
||||
- `RegistryPrefix()` respects `MOLECULE_IMAGE_REGISTRY` override; falls back to GHCR OSS default.
|
||||
- `RuntimeImage()` returns `""` for unknown runtimes, forcing explicit fallback at call sites.
|
||||
- `computeRuntimeImages()` runs at init; captures prefix active at boot.
|
||||
|
||||
** provisioner.go migration** — Hardcoded map → `computeRuntimeImages()` is a safe refactor; no behavioral change for OSS default.
|
||||
|
||||
**admin_workspace_images.go** — `TemplateImageRef()` now uses `provisioner.RegistryPrefix()`; keeps admin ops and provisioner pulls consistent.
|
||||
|
||||
### Security ✅
|
||||
|
||||
- Sweeper SQL has no user-input surface; parameters are internal LIMIT constant and DB-generated IDs.
|
||||
- `RegistryPrefix()` reads env only; comment correctly notes it is deploy-time trusted (operator-set, not user-supplied).
|
||||
- No new secrets, auth tokens, or credential exposure.
|
||||
|
||||
### Performance ✅
|
||||
|
||||
- 60s tick / 30s deadline / LIMIT 100 is conservative and safe.
|
||||
- Sequential Stop calls share the 30s parent context; with typical CP DELETE latency (<1s), 100 orphans finish well within budget.
|
||||
- If CP is degraded, deadline expires, UPDATEs don't fire, and next cycle retries — no stampede.
|
||||
|
||||
### Style / Readability ✅
|
||||
|
||||
- Excellent docstrings; the `#2989` race narrative is clearly documented for future maintainers.
|
||||
- `CPOrphanReaper` interface is minimal and testable.
|
||||
- Nil-reaper and nil-DB guards follow existing patterns.
|
||||
- One minor nit: `cpSweepOnce` could return `[]string` of processed IDs to make post-hoc assertions easier, but the fake-reaper test pattern works fine as-is.
|
||||
|
||||
### Tests ✅ (excellent coverage)
|
||||
|
||||
| Scenario | Covered |
|
||||
|---|---|
|
||||
| Happy path: Stop succeeds, instance_id cleared | ✅ |
|
||||
| Stop fails, instance_id retained for retry | ✅ |
|
||||
| Empty result set (steady state) | ✅ |
|
||||
| Multiple orphans, partial failure, others proceed | ✅ |
|
||||
| DB query error (transient) | ✅ |
|
||||
| UPDATE error after Stop success (logs, continues) | ✅ |
|
||||
| Nil db.DB (defensive boot safety) | ✅ |
|
||||
| Nil reaper (disabled, no goroutine leak) | ✅ |
|
||||
| Boot sweep + tick cadence + ctx cancel | ✅ |
|
||||
| Registry prefix default / env override / empty env | ✅ |
|
||||
| Runtime image format for all known runtimes | ✅ |
|
||||
| Unknown runtime returns `""` | ✅ |
|
||||
| Registry override applies to ALL runtimes | ✅ |
|
||||
| Alphabetical order pin | ✅ |
|
||||
|
||||
**All tests pass:**
|
||||
```
|
||||
ok github.com/.../internal/registry 0.107s (9/9 CP sweeper tests)
|
||||
ok github.com/.../internal/provisioner 0.009s (7/7 registry tests)
|
||||
```
|
||||
|
||||
### ⚠️ BLOCKER: Semantic conflict with PR #3033
|
||||
|
||||
`registry.go` adds `"codex"` to `knownRuntimes`, making **9** production runtimes:
|
||||
```go
|
||||
knownRuntimes = []string{
|
||||
"autogen", "claude-code", "codex", "crewai", "deepagents",
|
||||
"gemini-cli", "hermes", "langgraph", "openclaw",
|
||||
}
|
||||
```
|
||||
|
||||
PR #3033 updates the README to claim **eight** production runtimes and explicitly lists:
|
||||
> Claude Code, Hermes, Gemini CLI, LangGraph, DeepAgents, CrewAI, AutoGen, OpenClaw
|
||||
|
||||
`codex` is absent from the README compatibility table, the "What Ships In main" section, and the architecture diagram list. After both PRs merge, the code will support 9 runtimes but the docs will claim 8 — a public-facing drift.
|
||||
|
||||
**Fix path:** Add `codex` to the README runtime list in PR #3033 (or a fast-follow) so the count and table stay accurate. `codex` already exists in `manifest.json` and has a template repo, so it is legitimate to list as "shipping on main."
|
||||
|
||||
---
|
||||
|
||||
## PR #3033 — Docs refresh (README + branding assets)
|
||||
|
||||
### Correctness ✅ (with 1 semantic drift pending)
|
||||
|
||||
- Terminology standardization ("adapters" → "runtimes") is correct and consistent with platform usage.
|
||||
- Deploy buttons updated from `molecule-monorepo` → `molecule-core`.
|
||||
- Canvas v4, Memory v2, SaaS surface, RFC #2967 mentions are all factually accurate.
|
||||
- **Missing:** `codex` runtime (see blocker above).
|
||||
|
||||
### Security ✅
|
||||
|
||||
- SVG assets are static branding; no scripts, no external references beyond the existing `<style>` media query.
|
||||
- No auth or credential surface touched.
|
||||
|
||||
### Performance N/A
|
||||
|
||||
- Docs-only; no runtime impact.
|
||||
|
||||
### Style / Readability ✅
|
||||
|
||||
- warm-paper theme description is concise and helpful.
|
||||
- Architecture diagram update (Docker → EC2 + SSM, KMS, SaaS CP) is accurate.
|
||||
- Quick Start clone URL fixed.
|
||||
|
||||
### Tests N/A
|
||||
|
||||
- No code changes; no test delta.
|
||||
|
||||
---
|
||||
|
||||
## Summary
|
||||
|
||||
| PR | Verdict | Action needed |
|
||||
|---|---|---|
|
||||
| #3029 | **Approve with nit** | Merge-ready after confirming #3033 (or follow-up) adds `codex` to README runtime list. |
|
||||
| #3033 | **Approve with blocker** | Add `codex` to the 8-runtimes list (making 9) and to the compatibility table before merge. |
|
||||
|
||||
**Risk if both merge as-is:** Public docs understate runtime count by 1; operators reading README may think `codex` is not supported when the provisioner already knows about it.
|
||||
|
||||
**Recommended merge order:** #3029 first (adds runtime support), then #3033 with `codex` line added (docs catch up).
|
||||
@@ -60,8 +60,7 @@ func refreshEnvFromCP() error {
|
||||
req.Header.Set("Authorization", "Bearer "+adminToken)
|
||||
req.Header.Set("X-Molecule-Org-Id", orgID)
|
||||
|
||||
client := &http.Client{Timeout: 10 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("do request: %w", err)
|
||||
}
|
||||
@@ -106,3 +105,53 @@ func refreshEnvFromCP() error {
|
||||
log.Printf("CP env refresh: applied %d values from %s/cp/tenants/config", applied, base)
|
||||
return nil
|
||||
}
|
||||
|
||||
// requiredLLMEnvVars is the set of LLM proxy env vars a managed SaaS
|
||||
// tenant must have populated after refreshEnvFromCP. cp#469 (tenant
|
||||
// proxy-env delivery) — guaranteed CP-delivered creds reach the
|
||||
// tenant process env on boot. Per Researcher Task #37 / Spec 2 and
|
||||
// Task #46 (watch-fail-first test).
|
||||
//
|
||||
// Key set byte-matched against Researcher's verified emission in
|
||||
// controlplane tenant_config.go:140-144 (Researcher REQUEST_CHANGES
|
||||
// iterate body, 3987f59c). The four keys below ARE the LLM-proxy
|
||||
// subset of the 8 CP-emitted keys; OPENAI_BASE_URL / OPENAI_API_KEY /
|
||||
// ANTHROPIC_BASE_URL / ANTHROPIC_API_KEY are out of scope for cp#469
|
||||
// (different feature surfaces — direct-to-provider fallbacks, not
|
||||
// the proxy). v2 fix: MOLECULE_LLM_USAGE_TOKEN, MOLECULE_LLM_USAGE_URL,
|
||||
// MOLECULE_LLM_BASE_URL, MOLECULE_LLM_ANTHROPIC_BASE_URL — note the
|
||||
// 4th key is namespaced MOLECULE_LLM_ANTHROPIC_BASE_URL, NOT bare
|
||||
// ANTHROPIC_BASE_URL. Bare ANTHROPIC_BASE_URL is a separate CP-emitted
|
||||
// key for direct-provider use, not the LLM proxy.
|
||||
var requiredLLMEnvVars = []string{
|
||||
"MOLECULE_LLM_USAGE_TOKEN",
|
||||
"MOLECULE_LLM_USAGE_URL", // CRITICAL fix v2: was MOLECULE_LLM_URL in v1
|
||||
"MOLECULE_LLM_BASE_URL",
|
||||
"MOLECULE_LLM_ANTHROPIC_BASE_URL", // CRITICAL fix v3: was ANTHROPIC_BASE_URL in v2 (different key!)
|
||||
}
|
||||
|
||||
// assertManagedTenantHasLLMEnv verifies that, when running as a
|
||||
// managed SaaS tenant (MOLECULE_ORG_ID + ADMIN_TOKEN both set), all
|
||||
// required LLM proxy env vars are populated after refreshEnvFromCP.
|
||||
//
|
||||
// Self-hosted (no orgID/adminToken) is exempt — dev must not be
|
||||
// blocked here. Managed tenants with missing LLM keys fail with
|
||||
// MISSING_CP_LLM_ENV so they do not silently boot with broken proxy
|
||||
// creds. Caller in main.go decides whether to log and continue or
|
||||
// log.Fatalf depending on deployment context.
|
||||
func assertManagedTenantHasLLMEnv() error {
|
||||
if os.Getenv("MOLECULE_ORG_ID") == "" || os.Getenv("ADMIN_TOKEN") == "" {
|
||||
// Self-hosted dev / not yet provisioned — not a managed tenant.
|
||||
return nil
|
||||
}
|
||||
var missing []string
|
||||
for _, k := range requiredLLMEnvVars {
|
||||
if os.Getenv(k) == "" {
|
||||
missing = append(missing, k)
|
||||
}
|
||||
}
|
||||
if len(missing) > 0 {
|
||||
return fmt.Errorf("MISSING_CP_LLM_ENV: required LLM proxy keys not set after refreshEnvFromCP: %v", missing)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
@@ -47,6 +48,138 @@ func TestRefreshEnvFromCP_AppliesCPResponse(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestRefreshEnvFromCP_ManagedTenantRequiresLLMKeys: watch-fail-first
|
||||
// per Researcher Task #46. When running as a managed tenant
|
||||
// (MOLECULE_ORG_ID + ADMIN_TOKEN set), missing LLM proxy env vars
|
||||
// after refreshEnvFromCP MUST surface as MISSING_CP_LLM_ENV, not be
|
||||
// silently accepted. Without this guard, a CP that loses its LLM
|
||||
// creds (e.g. during an incident) would let a tenant boot and then
|
||||
// fail later at first LLM call — worse than a loud refusal here.
|
||||
func TestRefreshEnvFromCP_ManagedTenantRequiresLLMKeys(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// Stub CP returns a CP response WITHOUT any of the required
|
||||
// LLM keys — simulates the failure mode where the CP side
|
||||
// dropped or never had the LLM creds for this org.
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"MOLECULE_CP_SHARED_SECRET":"x","MOLECULE_CP_URL":"https://api.moleculesai.app"}`)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
t.Setenv("MOLECULE_ORG_ID", "org-managed-1")
|
||||
t.Setenv("ADMIN_TOKEN", "admin-tok")
|
||||
t.Setenv("MOLECULE_CP_URL", srv.URL)
|
||||
// Clear all LLM keys to simulate the boot-without-LLM-env failure mode.
|
||||
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "")
|
||||
t.Setenv("MOLECULE_LLM_USAGE_URL", "")
|
||||
t.Setenv("MOLECULE_LLM_BASE_URL", "")
|
||||
t.Setenv("MOLECULE_LLM_ANTHROPIC_BASE_URL", "")
|
||||
|
||||
// refreshEnvFromCP itself should succeed — CP is reachable, returned 200.
|
||||
if err := refreshEnvFromCP(); err != nil {
|
||||
t.Fatalf("refreshEnvFromCP: %v", err)
|
||||
}
|
||||
// The boot assertion must catch the missing LLM keys.
|
||||
err := assertManagedTenantHasLLMEnv()
|
||||
if err == nil {
|
||||
t.Fatal("expected MISSING_CP_LLM_ENV error for managed tenant without LLM keys, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "MISSING_CP_LLM_ENV") {
|
||||
t.Errorf("expected error to contain MISSING_CP_LLM_ENV, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRefreshEnvFromCP_ManagedTenantHappyPath: when the CP returns
|
||||
// all 4 LLM-proxy keys, the gate must PASS — no MISSING_CP_LLM_ENV
|
||||
// for a properly-configured managed tenant. Watch-fail counterpart
|
||||
// to TestRefreshEnvFromCP_ManagedTenantRequiresLLMKeys: if THIS test
|
||||
// ever fires MISSING_CP_LLM_ENV on the byte-correct key set, the
|
||||
// requiredLLMEnvVars list has drifted from the CP emission again.
|
||||
// Per Researcher REQUEST_CHANGES TEST ADEQUACY note.
|
||||
func TestRefreshEnvFromCP_ManagedTenantHappyPath(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
// Return ALL 4 LLM-proxy keys — names byte-matched to
|
||||
// tenant_config.go:140-144 CP emission.
|
||||
fmt.Fprint(w, `{"MOLECULE_LLM_USAGE_TOKEN":"tok-1","MOLECULE_LLM_USAGE_URL":"https://llm.example.com/usage","MOLECULE_LLM_BASE_URL":"https://llm.example.com","MOLECULE_LLM_ANTHROPIC_BASE_URL":"https://llm.example.com/anthropic"}`)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
t.Setenv("MOLECULE_ORG_ID", "org-managed-happy")
|
||||
t.Setenv("ADMIN_TOKEN", "admin-tok")
|
||||
t.Setenv("MOLECULE_CP_URL", srv.URL)
|
||||
// Pre-clear so we can verify the refresh actually populated them.
|
||||
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "")
|
||||
t.Setenv("MOLECULE_LLM_USAGE_URL", "")
|
||||
t.Setenv("MOLECULE_LLM_BASE_URL", "")
|
||||
t.Setenv("MOLECULE_LLM_ANTHROPIC_BASE_URL", "")
|
||||
|
||||
if err := refreshEnvFromCP(); err != nil {
|
||||
t.Fatalf("refreshEnvFromCP: %v", err)
|
||||
}
|
||||
// Sanity: refresh actually applied the keys.
|
||||
if got := os.Getenv("MOLECULE_LLM_USAGE_TOKEN"); got != "tok-1" {
|
||||
t.Errorf("refresh did not apply USAGE_TOKEN: got %q", got)
|
||||
}
|
||||
// The boot assertion must pass — no MISSING_CP_LLM_ENV.
|
||||
if err := assertManagedTenantHasLLMEnv(); err != nil {
|
||||
t.Errorf("managed happy path must not MISSING_CP_LLM_ENV, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRefreshEnvFromCP_ManagedTenantPartialEnv: when the CP returns
|
||||
// 3 of 4 LLM-proxy keys (one missing), the gate must STILL catch it
|
||||
// and the error must name the missing key. Per Researcher
|
||||
// REQUEST_CHANGES TEST ADEQUACY note — partial-env coverage is
|
||||
// critical because the production failure mode is usually "one
|
||||
// key dropped" not "all keys dropped".
|
||||
func TestRefreshEnvFromCP_ManagedTenantPartialEnv(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
// 3 of 4 — MOLECULE_LLM_ANTHROPIC_BASE_URL is missing.
|
||||
fmt.Fprint(w, `{"MOLECULE_LLM_USAGE_TOKEN":"tok-1","MOLECULE_LLM_USAGE_URL":"https://llm.example.com/usage","MOLECULE_LLM_BASE_URL":"https://llm.example.com"}`)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
t.Setenv("MOLECULE_ORG_ID", "org-managed-partial")
|
||||
t.Setenv("ADMIN_TOKEN", "admin-tok")
|
||||
t.Setenv("MOLECULE_CP_URL", srv.URL)
|
||||
// Pre-clear all 4 so the 3 that come back from CP are the only
|
||||
// ones set; the 4th (MOLECULE_LLM_ANTHROPIC_BASE_URL) stays empty.
|
||||
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "")
|
||||
t.Setenv("MOLECULE_LLM_USAGE_URL", "")
|
||||
t.Setenv("MOLECULE_LLM_BASE_URL", "")
|
||||
t.Setenv("MOLECULE_LLM_ANTHROPIC_BASE_URL", "")
|
||||
|
||||
if err := refreshEnvFromCP(); err != nil {
|
||||
t.Fatalf("refreshEnvFromCP: %v", err)
|
||||
}
|
||||
err := assertManagedTenantHasLLMEnv()
|
||||
if err == nil {
|
||||
t.Fatal("expected MISSING_CP_LLM_ENV for partial env (3 of 4 keys), got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "MISSING_CP_LLM_ENV") {
|
||||
t.Errorf("expected error to contain MISSING_CP_LLM_ENV, got: %v", err)
|
||||
}
|
||||
if !strings.Contains(err.Error(), "MOLECULE_LLM_ANTHROPIC_BASE_URL") {
|
||||
t.Errorf("expected error to name the missing key MOLECULE_LLM_ANTHROPIC_BASE_URL, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestAssertManagedTenantHasLLMEnv_NotManagedIsNoop: self-hosted
|
||||
// (no orgID/adminToken) must NOT block on missing LLM keys — dev
|
||||
// ergonomics matter and the assertion's contract is "managed only".
|
||||
func TestAssertManagedTenantHasLLMEnv_NotManagedIsNoop(t *testing.T) {
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
t.Setenv("ADMIN_TOKEN", "")
|
||||
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "")
|
||||
t.Setenv("MOLECULE_LLM_USAGE_URL", "")
|
||||
t.Setenv("MOLECULE_LLM_BASE_URL", "")
|
||||
t.Setenv("MOLECULE_LLM_ANTHROPIC_BASE_URL", "")
|
||||
if err := assertManagedTenantHasLLMEnv(); err != nil {
|
||||
t.Errorf("self-hosted (not managed) must not block, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRefreshEnvFromCP_CPUnreachableDoesNotFailBoot: network errors must
|
||||
// return non-nil BUT main.go treats that as warn-and-continue. We assert
|
||||
// the function returns an error (not a panic) so the caller can log.
|
||||
|
||||
@@ -56,6 +56,16 @@ func main() {
|
||||
log.Printf("CP env refresh: %v (continuing with baked-in env)", err)
|
||||
}
|
||||
|
||||
// Managed-tenant boot assertion (cp#469 — tenant proxy-env delivery).
|
||||
// If we're a managed SaaS tenant (orgID + adminToken set), all required
|
||||
// LLM proxy env vars must be present after refresh. Missing keys block
|
||||
// the tenant from booting with broken LLM creds — silent-fail is worse
|
||||
// than a loud refusal. Self-hosted (no orgID/adminToken) short-circuits
|
||||
// inside the assertion, so this never fires for dev.
|
||||
if err := assertManagedTenantHasLLMEnv(); err != nil {
|
||||
log.Fatalf("Managed tenant boot assertion: %v", err)
|
||||
}
|
||||
|
||||
// Secrets encryption. In MOLECULE_ENV=prod, boot refuses to start
|
||||
// without a valid SECRETS_ENCRYPTION_KEY (fail-secure — Top-5 #5).
|
||||
// In any other environment, missing keys just log a warning and
|
||||
|
||||
@@ -3,7 +3,6 @@ package bundle
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
@@ -87,20 +86,13 @@ func Import(
|
||||
// PluginsPath set by caller if available
|
||||
}
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("bundle/importer: PANIC during provision start for %s: %v", wsID, r)
|
||||
}
|
||||
}()
|
||||
provCtx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
|
||||
defer cancel()
|
||||
url, err := prov.Start(provCtx, cfg)
|
||||
if err != nil {
|
||||
markFailed(provCtx, wsID, broadcaster, err)
|
||||
} else if url != "" {
|
||||
if _, dbErr := db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID); dbErr != nil {
|
||||
log.Printf("bundle import: failed to update workspace URL for %s: %v", wsID, dbErr)
|
||||
}
|
||||
db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID)
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -147,16 +139,12 @@ func markFailed(ctx context.Context, wsID string, broadcaster *events.Broadcaste
|
||||
// markProvisionFailed in workspace-server/internal/handlers/
|
||||
// workspace_provision_shared.go.
|
||||
msg := err.Error()
|
||||
if _, dbErr := db.DB.ExecContext(ctx,
|
||||
db.DB.ExecContext(ctx,
|
||||
`UPDATE workspaces SET status = $1, last_sample_error = $2, updated_at = now() WHERE id = $3`,
|
||||
models.StatusFailed, msg, wsID); dbErr != nil {
|
||||
log.Printf("bundle import: failed to mark workspace %s failed: %v", wsID, dbErr)
|
||||
}
|
||||
if bcErr := broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
|
||||
models.StatusFailed, msg, wsID)
|
||||
broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
|
||||
"error": msg,
|
||||
}); bcErr != nil {
|
||||
log.Printf("bundle import: failed to broadcast provision failed for %s: %v", wsID, bcErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func nilIfEmpty(s string) interface{} {
|
||||
|
||||
@@ -375,25 +375,21 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
|
||||
|
||||
// Update stats in DB
|
||||
if db.DB != nil {
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
db.DB.ExecContext(ctx, `
|
||||
UPDATE workspace_channels
|
||||
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
|
||||
WHERE id = $1
|
||||
`, ch.ID); err != nil {
|
||||
log.Printf("Channels: failed to update inbound stats for channel %s: %v", ch.ID, err)
|
||||
}
|
||||
`, ch.ID)
|
||||
}
|
||||
|
||||
// Broadcast event
|
||||
if m.broadcaster != nil {
|
||||
if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
|
||||
m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
|
||||
"channel_id": ch.ID,
|
||||
"channel_type": ch.ChannelType,
|
||||
"username": msg.Username,
|
||||
"direction": "inbound",
|
||||
}); err != nil {
|
||||
log.Printf("Channels: failed to broadcast inbound event: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -424,23 +420,19 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin
|
||||
}
|
||||
|
||||
if db.DB != nil {
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
db.DB.ExecContext(ctx, `
|
||||
UPDATE workspace_channels
|
||||
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
|
||||
WHERE id = $1
|
||||
`, channelID); err != nil {
|
||||
log.Printf("Channels: failed to update outbound stats for channel %s: %v", channelID, err)
|
||||
}
|
||||
`, channelID)
|
||||
}
|
||||
|
||||
if m.broadcaster != nil {
|
||||
if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
|
||||
m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
|
||||
"channel_id": ch.ID,
|
||||
"channel_type": ch.ChannelType,
|
||||
"direction": "outbound",
|
||||
}); err != nil {
|
||||
log.Printf("Channels: failed to broadcast outbound event: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -506,10 +498,7 @@ func (m *Manager) FetchWorkspaceChannelContext(ctx context.Context, workspaceID
|
||||
return ""
|
||||
}
|
||||
var config map[string]interface{}
|
||||
if err := json.Unmarshal(configJSON, &config); err != nil {
|
||||
log.Printf("Channels: failed to unmarshal channel config: %v", err)
|
||||
return ""
|
||||
}
|
||||
json.Unmarshal(configJSON, &config)
|
||||
if err := DecryptSensitiveFields(config); err != nil {
|
||||
return ""
|
||||
}
|
||||
@@ -566,12 +555,8 @@ func (m *Manager) loadChannel(ctx context.Context, channelID string) (ChannelRow
|
||||
if err != nil {
|
||||
return ch, fmt.Errorf("channel %s not found: %w", channelID, err)
|
||||
}
|
||||
if err := json.Unmarshal(configJSON, &ch.Config); err != nil {
|
||||
return ch, fmt.Errorf("unmarshal channel %s config: %w", channelID, err)
|
||||
}
|
||||
if err := json.Unmarshal(allowedJSON, &ch.AllowedUsers); err != nil {
|
||||
return ch, fmt.Errorf("unmarshal channel %s allowed_users: %w", channelID, err)
|
||||
}
|
||||
json.Unmarshal(configJSON, &ch.Config)
|
||||
json.Unmarshal(allowedJSON, &ch.AllowedUsers)
|
||||
// #319: decrypt bot_token / webhook_secret — SendOutbound and adapter
|
||||
// methods downstream read them as plaintext strings.
|
||||
if err := DecryptSensitiveFields(ch.Config); err != nil {
|
||||
|
||||
@@ -482,12 +482,10 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
|
||||
if apiErr.Code == 429 {
|
||||
retryAfter := time.Duration(apiErr.RetryAfter) * time.Second
|
||||
log.Printf("Channels: Telegram poll rate-limited, sleeping %s", retryAfter)
|
||||
timer := time.NewTimer(retryAfter)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return nil
|
||||
case <-timer.C:
|
||||
case <-time.After(retryAfter):
|
||||
continue
|
||||
}
|
||||
}
|
||||
@@ -497,12 +495,10 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
|
||||
}
|
||||
}
|
||||
log.Printf("Channels: Telegram poll error: %v", err)
|
||||
timer := time.NewTimer(telegramPollInterval)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return nil
|
||||
case <-timer.C:
|
||||
case <-time.After(telegramPollInterval):
|
||||
continue
|
||||
}
|
||||
}
|
||||
@@ -517,9 +513,7 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
|
||||
|
||||
// Acknowledge the button press (removes loading spinner)
|
||||
ackCfg := tgbotapi.NewCallback(cb.ID, "Received")
|
||||
if _, err := bot.Send(ackCfg); err != nil {
|
||||
log.Printf("telegram: failed to send callback ack: %v", err)
|
||||
}
|
||||
bot.Send(ackCfg)
|
||||
|
||||
// Update the message to show what was clicked
|
||||
decision := "approved"
|
||||
@@ -531,9 +525,7 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
|
||||
cb.Message.MessageID,
|
||||
cb.Message.Text+"\n\n✅ CEO "+decision,
|
||||
)
|
||||
if _, err := bot.Send(editMsg); err != nil {
|
||||
log.Printf("telegram: failed to send edit message: %v", err)
|
||||
}
|
||||
bot.Send(editMsg)
|
||||
|
||||
// Route the decision as an inbound message to the agent
|
||||
inbound := &InboundMessage{
|
||||
|
||||
@@ -932,12 +932,7 @@ func applyIdleTimeout(parent context.Context, b *events.Broadcaster, workspaceID
|
||||
ctx, cancel := context.WithCancel(parent)
|
||||
sub, unsub := b.SubscribeSSE(workspaceID)
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("a2a_proxy: PANIC in SSE idle watcher for %s: %v", workspaceID, r)
|
||||
}
|
||||
unsub()
|
||||
}()
|
||||
defer unsub()
|
||||
timer := time.NewTimer(idle)
|
||||
defer timer.Stop()
|
||||
for {
|
||||
|
||||
@@ -51,29 +51,23 @@ func (h *ApprovalsHandler) Create(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
|
||||
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"action": body.Action,
|
||||
"reason": body.Reason,
|
||||
"task_id": body.TaskID,
|
||||
}); err != nil {
|
||||
log.Printf("approvals: failed to broadcast approval requested: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
// Auto-escalate to parent
|
||||
var parentID *string
|
||||
if err := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); err != nil {
|
||||
log.Printf("approvals: failed to lookup parent for escalation: %v", err)
|
||||
}
|
||||
db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID)
|
||||
if parentID != nil {
|
||||
if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
|
||||
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"from_workspace_id": workspaceID,
|
||||
"action": body.Action,
|
||||
"reason": body.Reason,
|
||||
}); err != nil {
|
||||
log.Printf("approvals: failed to broadcast approval escalated: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
c.JSON(http.StatusCreated, gin.H{"approval_id": approvalID, "status": "pending"})
|
||||
@@ -86,12 +80,10 @@ func (h *ApprovalsHandler) ListAll(c *gin.Context) {
|
||||
ctx := c.Request.Context()
|
||||
|
||||
// Auto-expire stale approvals (older than 10 min)
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
db.DB.ExecContext(ctx, `
|
||||
UPDATE approval_requests SET status = 'denied', decided_by = 'auto-expired', decided_at = now()
|
||||
WHERE status = 'pending' AND created_at < now() - interval '10 minutes'
|
||||
`); err != nil {
|
||||
log.Printf("approvals: failed to auto-expire stale approvals: %v", err)
|
||||
}
|
||||
`)
|
||||
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT a.id, a.workspace_id, w.name, a.action, a.reason, a.status, a.created_at
|
||||
@@ -219,13 +211,11 @@ func (h *ApprovalsHandler) Decide(c *gin.Context) {
|
||||
eventType = "APPROVAL_DENIED"
|
||||
}
|
||||
|
||||
if err := h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{
|
||||
h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"decision": body.Decision,
|
||||
"decided_by": decidedBy,
|
||||
}); err != nil {
|
||||
log.Printf("approvals: failed to broadcast approval decision: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"status": body.Decision, "approval_id": approvalID})
|
||||
}
|
||||
|
||||
@@ -558,11 +558,6 @@ func (h *ChannelHandler) Webhook(c *gin.Context) {
|
||||
|
||||
// Process asynchronously — don't block the webhook response
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("Channels: PANIC in async HandleInbound for workspace %s: %v", ch.WorkspaceID[:12], r)
|
||||
}
|
||||
}()
|
||||
bgCtx := context.Background()
|
||||
if err := h.manager.HandleInbound(bgCtx, ch, msg); err != nil {
|
||||
log.Printf("Channels: async HandleInbound error for workspace %s: %v", ch.WorkspaceID[:12], err)
|
||||
|
||||
@@ -239,7 +239,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
|
||||
|
||||
// Siblings
|
||||
if parentID.Valid {
|
||||
siblings, _ := queryPeerMaps(ctx, `
|
||||
siblings, _ := queryPeerMaps(`
|
||||
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
|
||||
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
|
||||
w.parent_id, w.active_tasks
|
||||
@@ -247,7 +247,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
|
||||
parentID.String, workspaceID)
|
||||
peers = append(peers, siblings...)
|
||||
} else {
|
||||
siblings, _ := queryPeerMaps(ctx, `
|
||||
siblings, _ := queryPeerMaps(`
|
||||
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
|
||||
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
|
||||
w.parent_id, w.active_tasks
|
||||
@@ -257,7 +257,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
|
||||
}
|
||||
|
||||
// Children
|
||||
children, _ := queryPeerMaps(ctx, `
|
||||
children, _ := queryPeerMaps(`
|
||||
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
|
||||
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
|
||||
w.parent_id, w.active_tasks
|
||||
@@ -266,7 +266,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
|
||||
|
||||
// Parent
|
||||
if parentID.Valid {
|
||||
parent, _ := queryPeerMaps(ctx, `
|
||||
parent, _ := queryPeerMaps(`
|
||||
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
|
||||
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
|
||||
w.parent_id, w.active_tasks
|
||||
@@ -303,8 +303,8 @@ func filterPeersByQuery(peers []map[string]interface{}, q string) []map[string]i
|
||||
}
|
||||
|
||||
// queryPeerMaps returns clean JSON-serializable maps instead of Workspace structs.
|
||||
func queryPeerMaps(ctx context.Context, query string, args ...interface{}) ([]map[string]interface{}, error) {
|
||||
rows, err := db.DB.QueryContext(ctx, query, args...)
|
||||
func queryPeerMaps(query string, args ...interface{}) ([]map[string]interface{}, error) {
|
||||
rows, err := db.DB.Query(query, args...)
|
||||
if err != nil {
|
||||
log.Printf("queryPeerMaps error: %v", err)
|
||||
return nil, err
|
||||
|
||||
@@ -15,7 +15,6 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
@@ -55,22 +54,6 @@ func updateMCPDelegationStatus(ctx context.Context, db *sql.DB, workspaceID, del
|
||||
}
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// mcpHTTPClient is a dedicated client for MCP bridge A2A calls.
|
||||
// Per-request deadlines are enforced via context (30 s sync, 8 s async).
|
||||
// Transport-level timeouts ensure dead workspaces fail fast instead of
|
||||
// hanging on OS default TCP timeouts (~75 s Linux).
|
||||
var mcpHTTPClient = &http.Client{
|
||||
Transport: &http.Transport{
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 5 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).DialContext,
|
||||
ResponseHeaderTimeout: 30 * time.Second,
|
||||
TLSHandshakeTimeout: 5 * time.Second,
|
||||
},
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Tool implementations
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
@@ -248,7 +231,7 @@ func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args
|
||||
// so this header reflects a verified caller identity, not a spoofable value.
|
||||
httpReq.Header.Set("X-Workspace-ID", callerID)
|
||||
|
||||
resp, err := mcpHTTPClient.Do(httpReq)
|
||||
resp, err := http.DefaultClient.Do(httpReq)
|
||||
if err != nil {
|
||||
updateMCPDelegationStatus(ctx, h.database, callerID, delegationID, "failed", err.Error())
|
||||
return "", fmt.Errorf("A2A call failed: %w", err)
|
||||
@@ -296,11 +279,6 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
|
||||
// Fire and forget in a detached goroutine. Use a background context so
|
||||
// the call is not cancelled when the HTTP request completes.
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("MCPHandler.delegate_task_async: PANIC for %s → %s: %v", callerID, targetID, r)
|
||||
}
|
||||
}()
|
||||
bgCtx, cancel := context.WithTimeout(context.Background(), mcpAsyncCallTimeout)
|
||||
defer cancel()
|
||||
|
||||
@@ -336,7 +314,7 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
httpReq.Header.Set("X-Workspace-ID", callerID)
|
||||
|
||||
resp, err := mcpHTTPClient.Do(httpReq)
|
||||
resp, err := http.DefaultClient.Do(httpReq)
|
||||
if err != nil {
|
||||
log.Printf("MCPHandler.delegate_task_async: A2A call to %s: %v", targetID, err)
|
||||
return
|
||||
|
||||
@@ -201,11 +201,6 @@ func (h *PluginsHandler) uninstallViaDocker(ctx context.Context, c *gin.Context,
|
||||
// Auto-restart (small delay to ensure fs writes are flushed)
|
||||
if h.restartFunc != nil {
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("plugins_install: PANIC in delayed restart for %s: %v", workspaceID, r)
|
||||
}
|
||||
}()
|
||||
time.Sleep(2 * time.Second)
|
||||
h.restartFunc(workspaceID)
|
||||
}()
|
||||
|
||||
@@ -133,24 +133,24 @@ func loadRestartContextData(ctx context.Context, workspaceID string) restartCont
|
||||
// message bus.
|
||||
keySet := map[string]struct{}{}
|
||||
if rows, err := db.DB.QueryContext(ctx, `SELECT key FROM global_secrets`); err == nil {
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var k string
|
||||
if rows.Scan(&k) == nil {
|
||||
keySet[k] = struct{}{}
|
||||
}
|
||||
}
|
||||
rows.Close()
|
||||
}
|
||||
if rows, err := db.DB.QueryContext(ctx,
|
||||
`SELECT key FROM workspace_secrets WHERE workspace_id = $1`, workspaceID,
|
||||
); err == nil {
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var k string
|
||||
if rows.Scan(&k) == nil {
|
||||
keySet[k] = struct{}{}
|
||||
}
|
||||
}
|
||||
rows.Close()
|
||||
}
|
||||
for k := range keySet {
|
||||
d.EnvKeys = append(d.EnvKeys, k)
|
||||
@@ -163,8 +163,6 @@ func loadRestartContextData(ctx context.Context, workspaceID string) restartCont
|
||||
// workspace's status flips to 'online' or the deadline expires.
|
||||
// Returns true on success; callers log+drop on false.
|
||||
func waitForWorkspaceOnline(ctx context.Context, workspaceID string, timeout time.Duration) bool {
|
||||
ticker := time.NewTicker(restartContextOnlinePollInterval)
|
||||
defer ticker.Stop()
|
||||
deadline := time.Now().Add(timeout)
|
||||
for time.Now().Before(deadline) {
|
||||
var status string
|
||||
@@ -176,7 +174,7 @@ func waitForWorkspaceOnline(ctx context.Context, workspaceID string, timeout tim
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case <-ticker.C:
|
||||
case <-time.After(restartContextOnlinePollInterval):
|
||||
}
|
||||
}
|
||||
return false
|
||||
|
||||
@@ -213,12 +213,7 @@ func (h *TerminalHandler) handleLocalConnect(c *gin.Context, workspaceID string)
|
||||
// Bridge: container stdout → WebSocket
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("Terminal: PANIC in stdout bridge: %v", r)
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
defer close(done)
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
n, err := resp.Reader.Read(buf)
|
||||
@@ -439,12 +434,7 @@ func (h *TerminalHandler) handleRemoteConnect(c *gin.Context, workspaceID, insta
|
||||
|
||||
// PTY → WebSocket
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("Terminal: PANIC in PTY bridge: %v", r)
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
defer close(done)
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
n, err := ptmx.Read(buf)
|
||||
@@ -466,11 +456,6 @@ func (h *TerminalHandler) handleRemoteConnect(c *gin.Context, workspaceID, insta
|
||||
|
||||
// WebSocket → PTY (stdin)
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("Terminal: PANIC in stdin loop: %v", r)
|
||||
}
|
||||
}()
|
||||
for {
|
||||
_, msg, rErr := conn.ReadMessage()
|
||||
if rErr != nil {
|
||||
|
||||
@@ -296,7 +296,6 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create workspace"})
|
||||
return
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
|
||||
maxConcurrent := payload.MaxConcurrentTasks
|
||||
if maxConcurrent <= 0 {
|
||||
@@ -479,9 +478,7 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
|
||||
// from the external agent (with this token + its URL)
|
||||
// flips the row to online.
|
||||
// Preserve BYO-compute runtime label (kimi, kimi-cli, external).
|
||||
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, runtime = $2, updated_at = now() WHERE id = $3`, models.StatusAwaitingAgent, normalizeExternalRuntime(payload.Runtime), id); err != nil {
|
||||
log.Printf("External workspace %s: status update failed: %v", id, err)
|
||||
}
|
||||
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, runtime = $2, updated_at = now() WHERE id = $3`, models.StatusAwaitingAgent, normalizeExternalRuntime(payload.Runtime), id)
|
||||
tok, tokErr := wsauth.IssueToken(ctx, db.DB, id)
|
||||
if tokErr != nil {
|
||||
log.Printf("External workspace %s: token issuance failed: %v", id, tokErr)
|
||||
|
||||
@@ -261,7 +261,7 @@ func (h *WorkspaceHandler) buildProvisionerConfig(
|
||||
workspaceAccess := payload.WorkspaceAccess
|
||||
if (workspacePath == "" || workspaceAccess == "") && db.DB != nil {
|
||||
var dbDir, dbAccess string
|
||||
if err := db.DB.QueryRowContext(ctx,
|
||||
if err := db.DB.QueryRow(
|
||||
`SELECT COALESCE(workspace_dir, ''), COALESCE(workspace_access, 'none') FROM workspaces WHERE id = $1`,
|
||||
workspaceID,
|
||||
).Scan(&dbDir, &dbAccess); err == nil {
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -42,11 +41,6 @@ func NewMCPRateLimiter(rate int, interval time.Duration, ctx context.Context) *M
|
||||
interval: interval,
|
||||
}
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("mcp_ratelimit: PANIC in bucket cleanup: %v", r)
|
||||
}
|
||||
}()
|
||||
ticker := time.NewTicker(5 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
|
||||
@@ -3,7 +3,6 @@ package middleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -36,11 +35,6 @@ func NewRateLimiter(rate int, interval time.Duration, ctx context.Context) *Rate
|
||||
interval: interval,
|
||||
}
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("ratelimit: PANIC in bucket cleanup: %v", r)
|
||||
}
|
||||
}()
|
||||
ticker := time.NewTicker(5 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
|
||||
@@ -116,11 +116,6 @@ func sessionCachePut(key string, ok bool) {
|
||||
|
||||
func init() {
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("session_auth: PANIC in cache sweeper: %v", r)
|
||||
}
|
||||
}()
|
||||
// Jitter startup so restarts don't align sweeps.
|
||||
time.Sleep(time.Duration(rand.Int64N(int64(sessionCacheSweepEvery))))
|
||||
t := time.NewTicker(sessionCacheSweepEvery)
|
||||
|
||||
@@ -60,12 +60,10 @@ func RunWithRecover(ctx context.Context, name string, fn func(context.Context))
|
||||
}
|
||||
|
||||
// Panic → back off and restart.
|
||||
timer := time.NewTimer(backoff)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return
|
||||
case <-timer.C:
|
||||
case <-time.After(backoff):
|
||||
}
|
||||
if backoff < maxBackoff {
|
||||
backoff *= 2
|
||||
|
||||
Reference in New Issue
Block a user