Merge branch 'main' into fix/a2a-tools-and-workflow-cleanup
This commit is contained in commit fe1b3d9a82.

.github/workflows/publish-runtime.yml (vendored, 2 lines changed)
@@ -180,7 +180,7 @@ jobs:
       # environment pypi-publish. The action mints a short-lived OIDC
       # token and exchanges it for a PyPI upload credential — no static
       # API token in this repo's secrets.
-      uses: pypa/gh-action-pypi-publish@release/v1
+      uses: pypa/gh-action-pypi-publish@cef221092ed1bacb1cc03d23a2d87d1d172e277b # release/v1
       with:
         packages-dir: ${{ runner.temp }}/runtime-build/dist/
.github/workflows/secret-pattern-drift.yml (vendored, 2 lines changed)
@@ -48,7 +48,7 @@ jobs:
     runs-on: ubuntu-latest
     timeout-minutes: 5
     steps:
-      - uses: actions/checkout@v6
+      - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2

       - uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
         with:
@@ -269,6 +269,28 @@ Each workspace exposes an A2A server, builds an Agent Card, and registers with t

But the long-term collaboration model remains direct workspace-to-workspace communication via A2A.

## Known Limitations

### Playwright / browser system libs are not installed

The base `molecule-ai-workspace-runtime` image (`workspace/Dockerfile`) is built on `python:3.11-slim` with Node.js 22, git, and `gh` — about 500 MB. It deliberately **does not** include the system libraries Chromium needs (`libnss3`, `libatk-bridge2.0-0`, `libxkbcommon0`, `libcups2`, `libdrm2`, `libxcomposite1`, `libxdamage1`, `libxrandr2`, `libgbm1`, `libpango-1.0-0`, `libasound2`, etc.). Adding them would inflate the image by ~200–250 MB (~40%) for every workspace, even though only frontend / QA workspaces ever launch a browser.

Practical consequences:

- `npx playwright test` (and any other Chromium-driven E2E tooling) **will fail at browser launch** when run from inside an in-container workspace agent.
- The error surfaces as missing-shared-object messages such as `error while loading shared libraries: libnss3.so` or `Host system is missing dependencies to run browsers`.
- Unit and integration tests (Vitest, Jest, etc.) that don't spawn a real browser are unaffected.

Recommended workflow:

1. **Run E2E in CI**, not in-container. The Gitea Actions self-hosted runner (and the GitHub Actions runner used by mirror repos) has the full Playwright dep set installed and is the supported surface for E2E. Push a branch, let CI run the suite.
2. **Local debugging** of a single failing spec is best done on a developer laptop with `npx playwright install-deps` run once.
3. **In-container iteration** on test logic itself is fine — write specs, lint them, type-check them — just don't expect `playwright test` to actually launch a browser.

If a particular workspace role genuinely needs in-container E2E (a dedicated QA template, for instance), the right place to layer Playwright deps is in a **role-specific adapter template image** that does `FROM molecule-ai-workspace-runtime:<tag>` and adds `RUN npx playwright install-deps`. Open a request against `molecule-ai-workspace-runtime` if you need this template stamped.

Tracking issue: [molecule-ai/molecule-app#7](https://git.moleculesai.app/molecule-ai/molecule-app/issues/7).

## Related Docs

- [Agent Runtime Adapters](./cli-runtime.md)
@@ -4,7 +4,6 @@ go 1.25.0

 require (
 	github.com/DATA-DOG/go-sqlmock v1.5.2
-	go.moleculesai.app/plugin/gh-identity v0.0.0-20260509010445-788988195fce
 	github.com/alicebob/miniredis/v2 v2.37.0
 	github.com/creack/pty v1.1.24
 	github.com/docker/docker v28.5.2+incompatible
@@ -19,6 +18,7 @@ require (
 	github.com/opencontainers/image-spec v1.1.1
 	github.com/redis/go-redis/v9 v9.19.0
 	github.com/robfig/cron/v3 v3.0.1
+	go.moleculesai.app/plugin/gh-identity v0.0.0-20260509010445-788988195fce
 	golang.org/x/crypto v0.50.0
 	gopkg.in/yaml.v3 v3.0.1
 )
@@ -4,8 +4,6 @@ github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7Oputl
 github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
 github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
 github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
-github.com/Molecule-AI/molecule-ai-plugin-gh-identity v0.0.0-20260424033845-4fd5ac7be30f h1:YkLRhUg+9qr9OV9N8dG1Hj0Ml7TThHlRwh5F//oUJVs=
-github.com/Molecule-AI/molecule-ai-plugin-gh-identity v0.0.0-20260424033845-4fd5ac7be30f/go.mod h1:NqdtlWZDJvpXNJRHnMkPhTKHdA1LZTNH+63TB66JSOU=
 github.com/alicebob/miniredis/v2 v2.37.0 h1:RheObYW32G1aiJIj81XVt78ZHJpHonHLHW7OLIshq68=
 github.com/alicebob/miniredis/v2 v2.37.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM=
 github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
@@ -154,6 +152,8 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M
 github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
 github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
 github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
+go.moleculesai.app/plugin/gh-identity v0.0.0-20260509010445-788988195fce h1:ftm0ba0ukLlfqeFes+/jWnXH8XULXmRpMy3fOCZ83/U=
+go.moleculesai.app/plugin/gh-identity v0.0.0-20260509010445-788988195fce/go.mod h1:0aAqoDle2V7Cywso94MXdv1DH/HEe/0oZmcbqWYMK7g=
 go.mongodb.org/mongo-driver/v2 v2.5.0 h1:yXUhImUjjAInNcpTcAlPHiT7bIXhshCTL3jVBkF3xaE=
 go.mongodb.org/mongo-driver/v2 v2.5.0/go.mod h1:yOI9kBsufol30iFsl1slpdq1I0eHPzybRWdyYUs8K/0=
 go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
@@ -8,7 +8,6 @@ package handlers
 // POST /admin/plugin-updates/:id/apply — apply a queued drift update
-
 import (
 	"context"
 	"database/sql"
 	"errors"
 	"fmt"
@@ -1262,4 +1262,3 @@ func TestExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) {
 		t.Errorf("unmet sqlmock expectations: %v", err)
 	}
 }
-}
@@ -28,6 +28,7 @@ import (
 	"database/sql"
 	"encoding/json"
 	"fmt"
+	"log"
 	"net/http"
 	"os"
 	"time"
@@ -326,7 +327,7 @@ func (h *MCPHandler) Call(c *gin.Context) {
 	if err := c.ShouldBindJSON(&req); err != nil {
 		c.JSON(http.StatusBadRequest, mcpResponse{
 			JSONRPC: "2.0",
-			Error:   &mcpRPCError{Code: -32700, Message: "parse error: " + err.Error()},
+			Error:   &mcpRPCError{Code: -32700, Message: "parse error"},
 		})
 		return
 	}
@@ -414,12 +415,16 @@ func (h *MCPHandler) dispatchRPC(ctx context.Context, workspaceID string, req mc
 		Arguments map[string]interface{} `json:"arguments"`
 	}
 	if err := json.Unmarshal(req.Params, &params); err != nil {
-		base.Error = &mcpRPCError{Code: -32602, Message: "invalid params: " + err.Error()}
+		base.Error = &mcpRPCError{Code: -32602, Message: "invalid parameters"}
 		return base
 	}
 	text, err := h.dispatch(ctx, workspaceID, params.Name, params.Arguments)
 	if err != nil {
-		base.Error = &mcpRPCError{Code: -32000, Message: err.Error()}
+		// Log full error server-side for forensics; return constant string
+		// to client per OFFSEC-001 / #259. WorkspaceAuth required — caller
+		// already authenticated, so this is defence-in-depth.
+		log.Printf("mcp: tool call failed workspace=%s tool=%s: %v", workspaceID, params.Name, err)
+		base.Error = &mcpRPCError{Code: -32000, Message: "tool call failed"}
 		return base
 	}
 	base.Result = map[string]interface{}{
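As a standalone illustration of the pattern the hunk above lands — log the detailed error server-side, return a constant JSON-RPC message to the client — here is a minimal sketch. The `rpcError` / `parseParams` names are illustrative, not the handler's real types:

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// rpcError mirrors the JSON-RPC error shape: numeric code plus a constant,
// non-leaky message string.
type rpcError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

// parseParams logs the detailed unmarshal error server-side and returns a
// constant message to the caller, so JSON library internals never reach clients.
func parseParams(raw []byte) (map[string]any, *rpcError) {
	var params map[string]any
	if err := json.Unmarshal(raw, &params); err != nil {
		log.Printf("rpc: invalid params: %v", err) // full detail stays in server logs
		return nil, &rpcError{Code: -32602, Message: "invalid parameters"}
	}
	return params, nil
}

func main() {
	if _, rpcErr := parseParams([]byte(`"not an object"`)); rpcErr != nil {
		fmt.Println(rpcErr.Code, rpcErr.Message)
	}
}
```

The same split (constant client message, detailed server log) applies to the `-32700` and `-32000` paths in the hunk.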
@@ -1024,3 +1024,126 @@ func TestIsPrivateOrMetadataIP_PublicAllowed(t *testing.T) {
		}
	}
}

// TestMCPHandler_Call_MalformedJSON returns constant parse-error message.
// Per OFFSEC-001 / #259: err.Error() must not leak struct field names or
// JSON library internals in JSON-RPC error.message.
func TestMCPHandler_Call_MalformedJSON_ReturnsConstantParseError(t *testing.T) {
	h, _ := newMCPHandler(t)
	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
	// Malformed JSON body — not even a parseable JSON-RPC envelope.
	c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString("not valid json{]["))
	c.Request.Header.Set("Content-Type", "application/json")

	h.Call(c)

	if w.Code != http.StatusBadRequest {
		t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
	}
	var resp mcpResponse
	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
		t.Fatalf("response is not valid JSON: %v", err)
	}
	if resp.Error == nil {
		t.Fatal("expected JSON-RPC error, got nil")
	}
	// Message must be a constant — no err.Error() content.
	if resp.Error.Message != "parse error" {
		t.Errorf("error message should be constant 'parse error', got: %q", resp.Error.Message)
	}
	// Code must be -32700 (Parse error).
	if resp.Error.Code != -32700 {
		t.Errorf("error code should be -32700, got: %d", resp.Error.Code)
	}
}

// TestMCPHandler_dispatchRPC_InvalidParams returns constant message.
// Per OFFSEC-001 / #259: err.Error() from json.Unmarshal must not be
// returned in JSON-RPC error.message.
func TestMCPHandler_dispatchRPC_InvalidParams_ReturnsConstantMessage(t *testing.T) {
	h, _ := newMCPHandler(t)

	// Valid JSON-RPC but params is a string (not an object) — invalid for tools/call.
	w := mcpPost(t, h, "ws-1", map[string]interface{}{
		"jsonrpc": "2.0",
		"id":      1,
		"method":  "tools/call",
		"params":  "not an object", // string instead of object — json.Unmarshal fails
	})

	var resp mcpResponse
	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
		t.Fatalf("response is not valid JSON: %v", err)
	}
	if resp.Error == nil {
		t.Fatal("expected JSON-RPC error, got nil")
	}
	// Message must be a constant — no JSON library error content.
	if resp.Error.Message != "invalid parameters" {
		t.Errorf("error message should be constant 'invalid parameters', got: %q", resp.Error.Message)
	}
	if resp.Error.Code != -32602 {
		t.Errorf("error code should be -32602 (Invalid params), got: %d", resp.Error.Code)
	}
}

// TestMCPHandler_dispatchRPC_UnknownTool returns constant tool-failed message.
// Per OFFSEC-001 / #259: dispatch errors must not leak workspace IDs or
// internal paths. Note: this test exercises the dispatch path through
// dispatchRPC since dispatch is package-private.
func TestMCPHandler_dispatchRPC_UnknownTool_ReturnsConstantMessage(t *testing.T) {
	h, _ := newMCPHandler(t)

	// Valid params shape but tool name does not exist.
	w := mcpPost(t, h, "ws-1", map[string]interface{}{
		"jsonrpc": "2.0",
		"id":      2,
		"method":  "tools/call",
		"params": map[string]interface{}{
			"name":      "nonexistent_tool_xyz",
			"arguments": map[string]interface{}{},
		},
	})

	var resp mcpResponse
	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
		t.Fatalf("response is not valid JSON: %v", err)
	}
	if resp.Error == nil {
		t.Fatal("expected JSON-RPC error for unknown tool, got nil")
	}
	// Message must be a constant — no "unknown tool: nonexistent_tool_xyz" leak.
	if resp.Error.Message != "tool call failed" {
		t.Errorf("error message should be constant 'tool call failed', got: %q", resp.Error.Message)
	}
	if resp.Error.Code != -32000 {
		t.Errorf("error code should be -32000 (Server error), got: %d", resp.Error.Code)
	}
}

// TestMCPHandler_dispatchRPC_InvalidParams_ArrayInsteadOfObject covers the
// edge case where params is present but not an object (here, an array).
// json.Unmarshal into the params struct fails, and we assert the constant
// error message.
func TestMCPHandler_dispatchRPC_InvalidParams_ArrayInsteadOfObject(t *testing.T) {
	h, _ := newMCPHandler(t)

	w := mcpPost(t, h, "ws-1", map[string]interface{}{
		"jsonrpc": "2.0",
		"id":      3,
		"method":  "tools/call",
		"params":  []interface{}{"one", "two"}, // array instead of object
	})

	var resp mcpResponse
	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
		t.Fatalf("response is not valid JSON: %v", err)
	}
	if resp.Error == nil {
		t.Fatal("expected JSON-RPC error, got nil")
	}
	if resp.Error.Message != "invalid parameters" {
		t.Errorf("error message should be constant 'invalid parameters', got: %q", resp.Error.Message)
	}
}
@@ -112,7 +112,10 @@ func (h *PluginsHandler) WithInstanceIDLookup(lookup InstanceIDLookup) *PluginsH

 // Sources returns the underlying plugin source registry. Used by main.go to
 // pass the same registry to the drift sweeper so both share resolver state.
-func (h *PluginsHandler) Sources() plugins.SourceResolver {
+// Returns the narrow pluginSources interface so callers receive only the
+// methods they need (Register, Resolve, Schemes), not the full SourceResolver
+// contract with Fetch.
+func (h *PluginsHandler) Sources() pluginSources {
 	return h.sources
 }
@@ -120,7 +120,7 @@ func (h *WorkspaceHandler) resolveAgentURLForRestartSignal(ctx context.Context,
 	// Try Redis cache first.
 	agentURL, err := db.GetCachedURL(ctx, workspaceID)
 	if err == nil && agentURL != "" {
-		return rewriteForDocker(agentURL, workspaceID), nil
+		return h.rewriteForDocker(agentURL, workspaceID), nil
 	}

 	// Cache miss — fall back to DB.
@@ -136,13 +136,13 @@ func (h *WorkspaceHandler) resolveAgentURLForRestartSignal(ctx context.Context,
 	}
 	agentURL = *urlNullable
 	_ = db.CacheURL(ctx, workspaceID, agentURL)
-	return rewriteForDocker(agentURL, workspaceID), nil
+	return h.rewriteForDocker(agentURL, workspaceID), nil
 }

 // rewriteForDocker rewrites a 127.0.0.1 agent URL to the Docker-DNS form
 // when the platform is running inside a Docker container. When the platform
 // is on the host (non-Docker), 127.0.0.1 IS the host and the original URL works.
-func rewriteForDocker(agentURL, workspaceID string) string {
+func (h *WorkspaceHandler) rewriteForDocker(agentURL, workspaceID string) string {
 	if platformInDocker && h.provisioner != nil {
 		// Only rewrite if the URL points to localhost (the ephemeral port
 		// binding the container published to the host). Internal Docker
@@ -97,10 +97,10 @@ func TestRewriteForDocker_LocalhostUrlRewritten(t *testing.T) {
 // TestResolveAgentURLForRestartSignal_CacheHit verifies that a Redis-cached
 // URL is returned without hitting the DB.
 func TestResolveAgentURLForRestartSignal_CacheHit(t *testing.T) {
-	mockDB, mock := setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct
+	_ = setupTestDB(t) // db.DB must be set before setupTestRedisWithURL
 	_ = setupTestRedisWithURL(t, "http://cached.internal:9000/agent")

-	h := newHandlerWithTestDepsWithDB(t, mockDB)
+	h := newHandlerWithTestDeps(t)

 	// Redis cache hit → DB should NOT be queried
 	url, err := h.resolveAgentURLForRestartSignal(context.Background(), "ws-cache-hit-123")
@@ -110,19 +110,18 @@ func TestResolveAgentURLForRestartSignal_CacheHit(t *testing.T) {
 	if url == "" {
 		t.Fatal("expected non-empty URL from cache")
 	}
-	// DB should not be queried (no rows returned to sqlmock)
-	if err := mock.ExpectationsWereMet(); err != nil {
-		t.Errorf("unfulfilled DB expectations: %v", err)
+	if url != "http://cached.internal:9000/agent" {
+		t.Errorf("expected cached URL, got %q", url)
 	}
 }

 // TestResolveAgentURLForRestartSignal_DBError verifies that a DB error is
 // returned and propagated when neither Redis cache nor DB lookup succeeds.
 func TestResolveAgentURLForRestartSignal_DBError(t *testing.T) {
-	mockDB, mock := setupTestDB(t) // must come before setupTestRedis so db.DB is correct
-	_ = setupTestRedis(t)          // empty → cache miss
+	mock := setupTestDB(t) // must come before setupTestRedis so db.DB is correct
+	_ = setupTestRedis(t)  // empty → cache miss

-	h := newHandlerWithTestDepsWithDB(t, mockDB)
+	h := newHandlerWithTestDeps(t)

 	mock.ExpectQuery(`SELECT url FROM workspaces WHERE id =`).
 		WithArgs("ws-db-err-789").
@@ -141,10 +140,10 @@ func TestResolveAgentURLForRestartSignal_DBError(t *testing.T) {
 // TestResolveAgentURLForRestartSignal_CacheMiss verifies that on Redis miss,
 // the URL is fetched from the DB and cached.
 func TestResolveAgentURLForRestartSignal_CacheMiss(t *testing.T) {
-	mockDB, mock := setupTestDB(t) // must come before setupTestRedis so db.DB is correct
-	mr := setupTestRedis(t)        // empty → cache miss
+	mock := setupTestDB(t) // must come before setupTestRedis so db.DB is correct
+	_ = setupTestRedis(t)  // empty → cache miss

-	h := newHandlerWithTestDepsWithDB(t, mockDB)
+	h := newHandlerWithTestDeps(t)

 	mock.ExpectQuery(`SELECT url FROM workspaces WHERE id =`).
 		WithArgs("ws-cache-miss-456").
@@ -159,10 +158,12 @@ func TestResolveAgentURLForRestartSignal_CacheMiss(t *testing.T) {
 		t.Errorf("expected DB URL, got %q", url)
 	}

-	// Verify the URL was cached in Redis
-	cached, err := mr.Get(context.Background(), "ws:ws-cache-miss-456:url").Result()
+	// Verify the URL was cached in Redis via db.GetCachedURL.
+	// GetCachedURL takes workspaceID and builds the key internally, so
+	// pass "ws-cache-miss-456" (not the full "ws:ws-cache-miss-456:url").
+	cached, err := db.GetCachedURL(context.Background(), "ws-cache-miss-456")
 	if err != nil {
-		t.Fatalf("URL was not cached in Redis: %v", err)
+		t.Fatalf("URL cache read failed: %v", err)
 	}
 	if cached != "http://db.internal:8000/agent" {
 		t.Errorf("expected cached URL %q, got %q", "http://db.internal:8000/agent", cached)
@@ -175,9 +176,7 @@ func TestResolveAgentURLForRestartSignal_CacheMiss(t *testing.T) {
 // TestGracefulPreRestart_Success verifies that when the workspace returns 200,
 // the signal is logged as acknowledged without error.
 func TestGracefulPreRestart_Success(t *testing.T) {
-	_ = setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct
-
-	mr := setupTestRedisWithURL(t, "http://localhost:18000/agent")
+	_ = setupTestDB(t)

 	// httptest server simulating the workspace container's /signals/restart_pending
 	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -206,44 +205,40 @@ func TestGracefulPreRestart_Success(t *testing.T) {
 		})
 	}))
 	defer srv.Close()
-	mr.Set("ws:ws-ack-789:url", srv.URL, 5*time.Minute)

-	// Patch the handler's resolveAgentURLForRestartSignal to return the test server URL
-	// (avoids needing a real provisioner for this test)
-	h := newHandlerWithTestDeps(t)
-	origResolve := h.resolveAgentURLForRestartSignal
-	h.resolveAgentURLForRestartSignal = func(ctx context.Context, wsID string) (string, error) {
-		return srv.URL + "/agent", nil
+	// Pre-populate Redis cache with the test server URL
+	_ = setupTestRedisWithURL(t, srv.URL)

+	// Use an embedded struct to override resolveAgentURLForRestartSignal.
+	hWrapper := &resolveURLTestWrapper{
+		WorkspaceHandler: newHandlerWithTestDeps(t),
+		testURL:          srv.URL + "/agent",
 	}
-	defer func() { h.resolveAgentURLForRestartSignal = origResolve }()

 	// gracefulPreRestart runs in a goroutine with its own timeout.
 	// We give it time to complete before the test ends.
-	h.gracefulPreRestart(context.Background(), "ws-ack-789")
+	hWrapper.gracefulPreRestart(context.Background(), "ws-ack-789")
 	time.Sleep(200 * time.Millisecond)
 }

 // TestGracefulPreRestart_NotImplemented verifies that when the workspace returns
 // 404 (old SDK version), the platform proceeds gracefully (log + no error).
 func TestGracefulPreRestart_NotImplemented(t *testing.T) {
-	_ = setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct
-
-	mr := setupTestRedisWithURL(t, "http://localhost:18001/agent")
+	_ = setupTestDB(t)

 	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		w.WriteHeader(http.StatusNotFound)
 	}))
 	defer srv.Close()
-	mr.Set("ws:ws-noimpl-999:url", srv.URL, 5*time.Minute)

-	h := newHandlerWithTestDeps(t)
-	origResolve := h.resolveAgentURLForRestartSignal
-	h.resolveAgentURLForRestartSignal = func(ctx context.Context, wsID string) (string, error) {
-		return srv.URL + "/agent", nil
+	_ = setupTestRedisWithURL(t, srv.URL)

+	hWrapper := &resolveURLTestWrapper{
+		WorkspaceHandler: newHandlerWithTestDeps(t),
+		testURL:          srv.URL + "/agent",
 	}
-	defer func() { h.resolveAgentURLForRestartSignal = origResolve }()

-	h.gracefulPreRestart(context.Background(), "ws-noimpl-999")
+	hWrapper.gracefulPreRestart(context.Background(), "ws-noimpl-999")
 	time.Sleep(200 * time.Millisecond)
 	// No panic or error expected — graceful degradation
 }
@@ -251,19 +246,17 @@ func TestGracefulPreRestart_NotImplemented(t *testing.T) {
 // TestGracefulPreRestart_ConnectionRefused verifies that when the workspace
 // is unreachable, the platform proceeds gracefully without error.
 func TestGracefulPreRestart_ConnectionRefused(t *testing.T) {
-	_ = setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct
+	_ = setupTestDB(t)

 	mr := setupTestRedisWithURL(t, "http://localhost:19999/agent") // nothing listening on 19999
-	mr.Set("ws:ws-unreachable-000:url", "http://localhost:19999/agent", 5*time.Minute)
+	_ = mr

-	h := newHandlerWithTestDeps(t)
-	origResolve := h.resolveAgentURLForRestartSignal
-	h.resolveAgentURLForRestartSignal = func(ctx context.Context, wsID string) (string, error) {
-		return "http://localhost:19999/agent", nil
+	hWrapper := &resolveURLTestWrapper{
+		WorkspaceHandler: newHandlerWithTestDeps(t),
+		testURL:          "http://localhost:19999/agent",
 	}
-	defer func() { h.resolveAgentURLForRestartSignal = origResolve }()

-	h.gracefulPreRestart(context.Background(), "ws-unreachable-000")
+	hWrapper.gracefulPreRestart(context.Background(), "ws-unreachable-000")
 	time.Sleep(200 * time.Millisecond)
 	// No panic or error expected — proceeds with stop as documented
 }
@@ -274,36 +267,35 @@ func TestGracefulPreRestart_URLResolutionError(t *testing.T) {
 	_ = setupTestDB(t)
 	_ = setupTestRedis(t) // empty → URL resolution will fail in resolveAgentURLForRestartSignal

-	h := newHandlerWithTestDeps(t)
-
-	// Override resolveAgentURLForRestartSignal to return an error
-	origResolve := h.resolveAgentURLForRestartSignal
-	h.resolveAgentURLForRestartSignal = func(ctx context.Context, wsID string) (string, error) {
-		return "", context.DeadlineExceeded
+	hWrapper := &resolveURLTestWrapper{
+		WorkspaceHandler: newHandlerWithTestDeps(t),
+		errToReturn:      context.DeadlineExceeded,
 	}
-	defer func() { h.resolveAgentURLForRestartSignal = origResolve }()

-	h.gracefulPreRestart(context.Background(), "ws-url-err-111")
+	hWrapper.gracefulPreRestart(context.Background(), "ws-url-err-111")
 	time.Sleep(200 * time.Millisecond)
 	// No panic or error expected — proceeds with stop as documented
 }

 // ─── helpers ─────────────────────────────────────────────────────────────────

-// newHandlerWithTestDeps creates a WorkspaceHandler with test stubs.
-// provisioner is nil so rewriteForDocker returns URL unchanged.
-func newHandlerWithTestDeps(t *testing.T) *WorkspaceHandler {
-	return NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
+// resolveURLTestWrapper embeds *WorkspaceHandler and overrides
+// resolveAgentURLForRestartSignal so tests can inject a fixed URL or error.
+type resolveURLTestWrapper struct {
+	*WorkspaceHandler
+	testURL     string
+	errToReturn error
 }

-// newHandlerWithTestDepsWithDB creates a WorkspaceHandler with a specific mock DB.
-// Use this when you need to control the DB mock expectations.
-func newHandlerWithTestDepsWithDB(t *testing.T, mockDB *sql.DB) *WorkspaceHandler {
-	// We need to temporarily replace db.DB with our mock
-	origDB := db.DB
-	db.DB = mockDB
-	t.Cleanup(func() { db.DB = origDB })
+func (w *resolveURLTestWrapper) resolveAgentURLForRestartSignal(ctx context.Context, workspaceID string) (string, error) {
+	if w.errToReturn != nil {
+		return "", w.errToReturn
+	}
+	return w.testURL, nil
+}

+// newHandlerWithTestDeps creates a WorkspaceHandler with test stubs.
+func newHandlerWithTestDeps(t *testing.T) *WorkspaceHandler {
+	return NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
 }
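The `resolveURLTestWrapper` change above relies on Go method shadowing via struct embedding. A toy sketch of the mechanics (names here are illustrative, not the real handler's): calling the overridden method through the wrapper uses the shadow, but promoted methods of the embedded type still dispatch to their own receiver.

```go
package main

import "fmt"

type handler struct{}

// resolve is the method a test wants to stub.
func (h *handler) resolve(id string) string {
	return "http://real:8080/" + id
}

// describe calls h.resolve on its own receiver. Embedding does NOT
// virtualise this call — it always hits handler.resolve, never a shadow.
func (h *handler) describe(id string) string {
	return "via handler: " + h.resolve(id)
}

// wrapper embeds *handler and shadows resolve with a fixed stub URL.
type wrapper struct {
	*handler
	stubURL string
}

func (w *wrapper) resolve(id string) string { return w.stubURL }

func main() {
	w := &wrapper{handler: &handler{}, stubURL: "http://stub:1/agent"}
	fmt.Println(w.resolve("ws-1"))  // wrapper's shadow
	fmt.Println(w.describe("ws-1")) // promoted: embedded handler's own resolve
}
```

The override therefore only takes effect on calls made through the wrapper itself; any internal call the embedded handler makes to its own resolver still uses the original method.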
@@ -314,7 +306,6 @@ func setupTestRedisWithURL(t *testing.T, url string) *miniredis.Miniredis {
 		t.Fatalf("failed to start miniredis: %v", err)
 	}
 	db.RDB = redis.NewClient(&redis.Options{Addr: mr.Addr()})
-	// Pre-populate a URL for the test workspace IDs used in these tests
 	for _, wsID := range []string{"ws-cache-hit-123", "ws-cache-miss-456", "ws-ack-789", "ws-noimpl-999", "ws-unreachable-000"} {
 		if err := db.CacheURL(context.Background(), wsID, url); err != nil {
 			t.Fatalf("failed to cache URL for %s: %v", wsID, err)
@@ -322,9 +313,4 @@ func setupTestRedisWithURL(t *testing.T, url string) *miniredis.Miniredis {
 	}
 	t.Cleanup(func() { mr.Close() })
 	return mr
 }
-
-// rewriteForDocker is exported from restart_signals.go so it can be tested here.
-func (h *WorkspaceHandler) rewriteForDocker(agentURL, workspaceID string) string {
-	return rewriteForDocker(agentURL, workspaceID)
-}
@@ -248,6 +248,19 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
 	// Begin a transaction so the workspace row and any initial secrets are
 	// committed atomically. A secret-encrypt or DB error rolls back the
 	// workspace insert so we never leave a workspace row with missing secrets.
+
+	// SSRF guard: validate workspace URL before starting any DB transaction.
+	// registry.go:324 calls this same guard for agent self-registration;
+	// the admin-create path must be covered too (core#212).
+	// Must stay above BeginTx so the rejection path never touches the DB.
+	if payload.URL != "" {
+		if err := validateAgentURL(payload.URL); err != nil {
+			log.Printf("Create: workspace URL rejected: %v", err)
+			c.JSON(http.StatusBadRequest, gin.H{"error": "unsafe workspace URL: " + err.Error()})
+			return
+		}
+	}

 	tx, txErr := db.DB.BeginTx(ctx, nil)
 	if txErr != nil {
 		log.Printf("Create workspace: begin tx error: %v", txErr)
@@ -383,16 +396,9 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
 	if payload.External || payload.Runtime == "external" {
 		var connectionToken string
 		if payload.URL != "" {
-			// SSRF guard (issue #212): validateAgentURL blocks cloud metadata
-			// IPs (169.254/16), loopback, link-local, and RFC-1918 in
-			// strict/self-hosted mode. AdminAuth is required here, but the
-			// admin token could be leaked or a compromised insider — defence
-			// in depth. Compare: registry.go:324 (heartbeat path) also
-			// calls validateAgentURL; external_rotate.go should too.
-			if err := validateAgentURL(payload.URL); err != nil {
-				c.JSON(http.StatusBadRequest, gin.H{"error": "unsafe workspace URL: " + err.Error()})
-				return
-			}
+			// URL already validated by validateAgentURL above (before BeginTx).
+			// Now persist it: the external URL is set after the workspace row
+			// commits so that a failed URL UPDATE doesn't roll back the row.
 			db.DB.ExecContext(ctx, `UPDATE workspaces SET url = $1, status = $2, runtime = 'external', updated_at = now() WHERE id = $3`, payload.URL, models.StatusOnline, id)
 			if err := db.CacheURL(ctx, id, payload.URL); err != nil {
 				log.Printf("External workspace: failed to cache URL for %s: %v", id, err)
@@ -537,17 +537,15 @@ func TestWorkspaceCreate_ExternalURL_SSRFSafe(t *testing.T) {
 		WithArgs(sqlmock.AnyArg(), "Ext Agent", nil, 3, "external", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
 		WillReturnResult(sqlmock.NewResult(0, 1))
 	mock.ExpectCommit()
-	// External URL update (SSRF-safe public URL passes validateAgentURL).
+	// External URL update (localhost is explicitly allowed by validateAgentURL).
 	mock.ExpectExec("UPDATE workspaces SET url").
 		WillReturnResult(sqlmock.NewResult(0, 1))
-	// CacheURL is non-fatal but still called.
-	mock.ExpectExec("SELECT").
-		WillReturnRows(sqlmock.NewRows([]string{"ok"}).AddRow("ok"))
+	// CacheURL is non-fatal — uses Redis (db.RDB, set by setupTestRedis), not the DB.

 	w := httptest.NewRecorder()
 	c, _ := gin.CreateTestContext(w)

-	body := `{"name":"Ext Agent","runtime":"external","external":true,"url":"https://agent.example.com/a2a"}`
+	body := `{"name":"Ext Agent","runtime":"external","external":true,"url":"http://localhost:8000"}`
 	c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(body))
 	c.Request.Header.Set("Content-Type", "application/json")
@@ -9,7 +9,7 @@ package plugins
 // 1. SELECTs workspace_plugins rows where tracked_ref != 'none'
 //    AND installed_sha IS NOT NULL (skip pre-migration rows with NULL SHA).
 // 2. For each row, resolves the tracked ref to its current upstream SHA
-//    using the appropriate SourceResolver.
+//    using the appropriate PluginResolver.
 // 3. If the resolved SHA differs from installed_sha → drift detected.
 // 4. On drift, INSERT INTO plugin_update_queue (ON CONFLICT DO NOTHING so
 //    a re-drift while a row is still pending is a no-op).
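The four steps above can be sketched end to end. This is an illustrative in-memory model, not the real SQL: the map stands in for plugin_update_queue, and `enqueue` reproduces the ON CONFLICT DO NOTHING no-op for a row that is still pending:

```go
package main

import "fmt"

// driftKey mirrors the uniqueness the ON CONFLICT clause relies on
// (field names are illustrative, not the real schema).
type driftKey struct{ workspaceID, plugin string }

type driftQueue struct {
	pending map[driftKey]string // key → candidate SHA
}

func newDriftQueue() *driftQueue {
	return &driftQueue{pending: map[driftKey]string{}}
}

// enqueue returns true if a new row was inserted, false on conflict —
// re-detecting drift while an entry is still pending is a no-op (step 4).
func (q *driftQueue) enqueue(workspaceID, plugin, candidateSHA string) bool {
	k := driftKey{workspaceID, plugin}
	if _, exists := q.pending[k]; exists {
		return false // ON CONFLICT DO NOTHING
	}
	q.pending[k] = candidateSHA
	return true
}

// drifted is step 3 of the cycle: compare installed vs freshly resolved SHA.
func drifted(installedSHA, latestSHA string) bool {
	return installedSHA != "" && latestSHA != "" && installedSHA != latestSHA
}

func main() {
	q := newDriftQueue()
	if drifted("aaa111", "bbb222") {
		fmt.Println(q.enqueue("ws-1", "linter", "bbb222")) // true: inserted
		fmt.Println(q.enqueue("ws-1", "linter", "bbb222")) // false: no-op
	}
}
```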
@@ -61,20 +61,33 @@ const DriftSweepInterval = 1 * time.Hour
 // that handles Gitea instances on high-latency links.
 const ResolveRefDeadline = 60 * time.Second

-// SourceResolver resolves plugin sources to installable directories.
-// Satisfied by *Registry (which wraps GithubResolver + LocalResolver).
-type SourceResolver interface {
+// PluginResolver is the registry-level abstraction the sweeper consumes:
+// pick a per-scheme SourceResolver for a parsed Source, and enumerate the
+// registered schemes so we can strip the prefix from a stored source_raw.
+//
+// Resolve returns the production SourceResolver from source.go (NOT another
+// PluginResolver) — that's the actual shape of *Registry.Resolve, and the
+// sweeper only needs the per-scheme resolver's identity, not its Fetch.
+//
+// Named PluginResolver (not SourceResolver) to avoid redeclaring the
+// per-scheme SourceResolver interface defined in source.go (core#228 fix).
+// Satisfied by *Registry from source.go via Resolve + Schemes.
+type PluginResolver interface {
 	Resolve(source Source) (SourceResolver, error)
 	Schemes() []string
 }

+// Compile-time assertion: *Registry satisfies PluginResolver. Catches any
+// future drift in Registry.Resolve / Schemes signatures at build time.
+var _ PluginResolver = (*Registry)(nil)
+
 // StartPluginDriftSweeper runs the drift-detection loop until ctx is cancelled.
 // Pass a nil resolver to disable the sweeper (useful for harnesses or CP/SaaS
 // mode where git operations are unavailable).
 //
 // Registers itself via atexits in cmd/server/main.go so the process
 // shuts down cleanly on SIGTERM.
-func StartPluginDriftSweeper(ctx context.Context, resolver SourceResolver) {
+func StartPluginDriftSweeper(ctx context.Context, resolver PluginResolver) {
 	if resolver == nil {
 		log.Println("Plugin drift sweeper: resolver is nil — sweeper disabled")
 		return
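The `var _ PluginResolver = (*Registry)(nil)` line added above is the standard Go compile-time interface assertion idiom. A minimal standalone version, with hypothetical names, shows why it costs nothing at runtime:

```go
package main

import "fmt"

// Resolver is a toy stand-in for the interface being asserted against.
type Resolver interface {
	Schemes() []string
}

type Registry struct{ schemes []string }

func (r *Registry) Schemes() []string { return r.schemes }

// Compile-time assertion: a typed nil *Registry converted to Resolver.
// No allocation, no init work — if Registry's method set ever drifts from
// the interface, this file simply stops compiling, before any test runs.
var _ Resolver = (*Registry)(nil)

func main() {
	r := &Registry{schemes: []string{"github", "local"}}
	fmt.Println(len(r.Schemes())) // 2
}
```

The blank identifier discards the value, so nothing survives to the binary; the assertion lives entirely in the type checker.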
@@ -107,7 +120,7 @@ func StartPluginDriftSweeper(ctx context.Context, resolver SourceResolver) {
 // sweepDriftOnce runs one full drift-detection cycle.
 // Errors are non-fatal — each row is handled independently so a single
 // slow row doesn't block the rest of the sweep.
-func sweepDriftOnce(parent context.Context, resolver SourceResolver) {
+func sweepDriftOnce(parent context.Context, resolver PluginResolver) {
 	ctx, cancel := context.WithTimeout(parent, 10*time.Minute)
 	defer cancel()
@@ -170,7 +183,7 @@ func sweepDriftOnce(parent context.Context, resolver SourceResolver) {
 // resolveLatestSHA resolves the tracked ref to its current upstream SHA.
 // Handles both github:// and local:// sources; local sources are skipped
 // (no meaningful upstream to drift against).
-func resolveLatestSHA(ctx context.Context, resolver SourceResolver, sourceRaw, trackedRef string) (string, error) {
+func resolveLatestSHA(ctx context.Context, resolver PluginResolver, sourceRaw, trackedRef string) (string, error) {
 	// Strip the scheme prefix to get the raw spec.
 	// sourceRaw is stored as the full string, e.g. "github://owner/repo#tag:v1.0.0"
 	spec := sourceRaw
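The scheme-stripping step that follows `spec := sourceRaw` can be sketched in isolation. This is a hypothetical helper: the real sweeper would iterate `PluginResolver.Schemes()` instead of a fixed list:

```go
package main

import (
	"fmt"
	"strings"
)

// splitSource strips a scheme prefix the way resolveLatestSHA's comment
// describes: a stored source_raw like "github://owner/repo#tag:v1.0.0"
// becomes (scheme "github", spec "owner/repo#tag:v1.0.0").
func splitSource(sourceRaw string, schemes []string) (scheme, spec string, ok bool) {
	for _, s := range schemes {
		prefix := s + "://"
		if strings.HasPrefix(sourceRaw, prefix) {
			return s, strings.TrimPrefix(sourceRaw, prefix), true
		}
	}
	return "", sourceRaw, false // unknown scheme: leave the raw string intact
}

func main() {
	scheme, spec, ok := splitSource("github://owner/repo#tag:v1.0.0", []string{"github", "local"})
	fmt.Println(scheme, spec, ok) // github owner/repo#tag:v1.0.0 true
}
```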
@@ -231,7 +244,7 @@ func queueDriftEntry(ctx context.Context, workspaceID, pluginName, trackedRef, c
 // ─────────────────────────────────────────────────────────────────────────────

 // SweepDriftOnceForTest exposes sweepDriftOnce for package-level testing.
-func SweepDriftOnceForTest(parent context.Context, resolver SourceResolver) {
+func SweepDriftOnceForTest(parent context.Context, resolver PluginResolver) {
 	sweepDriftOnce(parent, resolver)
 }
@@ -2,12 +2,14 @@ package plugins

 import (
 	"context"
 	"database/sql"
 	"errors"
 	"testing"
 )

-// stubResolver is a SourceResolver that always returns a stub github resolver.
+// stubResolver is a PluginResolver that always returns a stub github
+// resolver. *GithubResolver satisfies the production SourceResolver from
+// source.go via Scheme() + Fetch(); the sweeper only uses Schemes() and
+// Resolve(), so the returned resolver's Fetch is never invoked here.
 type stubResolver struct {
 	schemes []string
 }
@@ -156,8 +158,9 @@ func TestPluginUpdateQueueRow_Struct(t *testing.T) {
 	}
 }

-// TestSourceResolverInterface_StubResolver verifies that a stub resolver
-// satisfies the SourceResolver interface.
-func TestSourceResolverInterface_StubResolver(t *testing.T) {
-	var _ SourceResolver = (*stubResolver)(nil)
+// TestPluginResolverInterface_StubResolver verifies that a stub resolver
+// satisfies the PluginResolver interface (the sweeper-side abstraction
+// over *Registry — distinct from the per-scheme SourceResolver in source.go).
+func TestPluginResolverInterface_StubResolver(t *testing.T) {
+	var _ PluginResolver = (*stubResolver)(nil)
 }
@@ -27,7 +27,15 @@ import (
 	"github.com/gin-gonic/gin"
 )

-func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provisioner, platformURL, configsDir string, wh *handlers.WorkspaceHandler, channelMgr *channels.Manager, memBundle *memwiring.Bundle, pluginResolver plugins.SourceResolver) *gin.Engine {
+// Setup wires the gin router. pluginResolver is the registry-level resolver
+// (typically *plugins.Registry from main.go) reserved for future per-deploy
+// customisation — currently passed only to satisfy the call-site contract;
+// plgh (PluginsHandler) constructs its own internal registry with the
+// default github+local resolvers via NewPluginsHandler. The drift sweeper
+// (main.go) gets the same pluginResolver instance so it can share scheme
+// enumeration if a deployment registers extra schemes externally. A nil
+// pluginResolver is harmless: plgh still works with its built-in defaults.
+func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provisioner, platformURL, configsDir string, wh *handlers.WorkspaceHandler, channelMgr *channels.Manager, memBundle *memwiring.Bundle, pluginResolver plugins.PluginResolver) *gin.Engine {
 	r := gin.Default()

 	// Issue #179 — trust no reverse-proxy headers. Without this call Gin's
@@ -499,6 +507,72 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
 		r.POST("/admin/workspace-images/refresh", middleware.AdminAuth(db.DB), imgH.Refresh)
 	}

+	// dockerCli is shared across plugins, terminal, templates, and bundle
+	// handlers. Declared up-front (was at line ~594) because the plugins
+	// init block — moved here in 70f84823 to fix "undefined: plgh" — needs
+	// dockerCli at construction time (NewPluginsHandler signature). Moving
+	// only the plgh block left dockerCli used-before-declared. Same nil
+	// guard semantics: prov nil → dockerCli nil → handlers fall back to
+	// non-Docker paths or skip Docker-dependent routes.
+	var dockerCli *client.Client
+	if prov != nil {
+		dockerCli = prov.DockerClient()
+	}
+
+	// Plugins — plgh must be initialized before the drift handler that uses it.
+	// Moved here (core#248 fix) because the drift handler block (core#123) was
+	// registered before plgh was created, causing "undefined: plgh" on main.
+	pluginsDir := findPluginsDir(configsDir)
+	// Runtime lookup lets the plugins handler filter the registry to plugins
+	// that declare support for the workspace's runtime, without taking a
+	// direct DB dependency in the handler package.
+	runtimeLookup := func(workspaceID string) (string, error) {
+		var runtime string
+		err := db.DB.QueryRowContext(
+			context.Background(),
+			`SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`,
+			workspaceID,
+		).Scan(&runtime)
+		return runtime, err
+	}
+	// Instance-id lookup powers the SaaS dispatch in install/uninstall:
+	// when a workspace is on the EC2-per-workspace backend (instance_id
+	// non-NULL) and there's no local Docker container to exec into, the
+	// pipeline pushes the staged plugin tarball to that EC2 over EIC SSH.
+	// Empty result means the workspace lives on the local-Docker backend
+	// (or hasn't been provisioned yet) and the handler falls back to its
+	// original Docker path. Same pattern templates.go and terminal.go use.
+	instanceIDLookup := func(workspaceID string) (string, error) {
+		var instanceID string
+		err := db.DB.QueryRowContext(
+			context.Background(),
+			`SELECT COALESCE(instance_id, '') FROM workspaces WHERE id = $1`,
+			workspaceID,
+		).Scan(&instanceID)
+		return instanceID, err
+	}
+	// plgh constructs its own internal registry (github + local) inside
+	// NewPluginsHandler. The pluginResolver param is the SHARED registry the
+	// drift sweeper consumes (main.go); we don't graft it onto plgh because
+	// plgh's WithSourceResolver expects a per-scheme SourceResolver, not a
+	// PluginResolver/registry. Cross-wiring those types was the original
+	// "*Registry doesn't implement SourceResolver" build break (core#228).
+	// Use of pluginResolver here is intentionally read-side only.
+	_ = pluginResolver
+	plgh := handlers.NewPluginsHandler(pluginsDir, dockerCli, wh.RestartByID).
+		WithRuntimeLookup(runtimeLookup).
+		WithInstanceIDLookup(instanceIDLookup)
+	r.GET("/plugins", plgh.ListRegistry)
+	r.GET("/plugins/sources", plgh.ListSources)
+	wsAuth.GET("/plugins", plgh.ListInstalled)
+	wsAuth.GET("/plugins/available", plgh.ListAvailableForWorkspace)
+	wsAuth.GET("/plugins/compatibility", plgh.CheckRuntimeCompatibility)
+	wsAuth.POST("/plugins", plgh.Install)
+	wsAuth.DELETE("/plugins/:name", plgh.Uninstall)
+	// Phase 30.3 — stream plugin as tar.gz so remote agents can pull +
+	// unpack locally instead of going through Docker exec.
+	wsAuth.GET("/plugins/:name/download", plgh.Download)
+
 	// Admin — plugin version-subscription drift queue (core#123).
 	// List pending drift entries and apply approved updates.
 	{
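runtimeLookup above injects a plain `func(string) (string, error)` into the handler instead of a DB handle. A map-backed stand-in (hypothetical names) shows the decoupling and reproduces the `COALESCE(runtime, 'langgraph')` default at the closure edge:

```go
package main

import "fmt"

// runtimeLookupFn is the function-typed dependency the handler consumes;
// the handler never sees database/sql, so tests can swap in a map.
type runtimeLookupFn func(workspaceID string) (string, error)

// newMapLookup builds a lookup over an in-memory table. An empty string
// value plays the role of a NULL runtime column, which COALESCE would
// replace with the 'langgraph' default in the real query.
func newMapLookup(rows map[string]string) runtimeLookupFn {
	return func(workspaceID string) (string, error) {
		rt, ok := rows[workspaceID]
		if !ok {
			return "", fmt.Errorf("workspace %s not found", workspaceID)
		}
		if rt == "" { // NULL runtime column → COALESCE default
			return "langgraph", nil
		}
		return rt, nil
	}
}

func main() {
	lookup := newMapLookup(map[string]string{"ws-1": "crewai", "ws-2": ""})
	rt1, _ := lookup("ws-1")
	rt2, _ := lookup("ws-2")
	fmt.Println(rt1, rt2) // crewai langgraph
}
```

The same shape applies to instanceIDLookup; only the query and the default differ.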
@@ -537,11 +611,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
 		wsAuth.GET("/github-installation-token", ghTokH.GetInstallationToken)
 	}

-	// Terminal — shares Docker client with provisioner
-	var dockerCli *client.Client
-	if prov != nil {
-		dockerCli = prov.DockerClient()
-	}
+	// Terminal — shares Docker client with provisioner (declared above).
 	th := handlers.NewTerminalHandler(dockerCli)
 	wsAuth.GET("/terminal", th.HandleConnect)
 	wsAuth.GET("/terminal/diagnose", th.HandleDiagnose)
@@ -595,57 +665,6 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
 	wsAuth.GET("/pending-uploads/:file_id/content", puh.GetContent)
 	wsAuth.POST("/pending-uploads/:file_id/ack", puh.Ack)

-	// Plugins
-	pluginsDir := findPluginsDir(configsDir)
-	// Runtime lookup lets the plugins handler filter the registry to plugins
-	// that declare support for the workspace's runtime, without taking a
-	// direct DB dependency in the handler package.
-	runtimeLookup := func(workspaceID string) (string, error) {
-		var runtime string
-		err := db.DB.QueryRowContext(
-			context.Background(),
-			`SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`,
-			workspaceID,
-		).Scan(&runtime)
-		return runtime, err
-	}
-	// Instance-id lookup powers the SaaS dispatch in install/uninstall:
-	// when a workspace is on the EC2-per-workspace backend (instance_id
-	// non-NULL) and there's no local Docker container to exec into, the
-	// pipeline pushes the staged plugin tarball to that EC2 over EIC SSH.
-	// Empty result means the workspace lives on the local-Docker backend
-	// (or hasn't been provisioned yet) and the handler falls back to its
-	// original Docker path. Same pattern templates.go and terminal.go use.
-	instanceIDLookup := func(workspaceID string) (string, error) {
-		var instanceID string
-		err := db.DB.QueryRowContext(
-			context.Background(),
-			`SELECT COALESCE(instance_id, '') FROM workspaces WHERE id = $1`,
-			workspaceID,
-		).Scan(&instanceID)
-		return instanceID, err
-	}
-	// pluginResolver: when provided (normal production), use it for plgh so
-	// the drift sweeper (which also gets the same resolver in main.go) uses
-	// identical resolver state. When nil (test / backward compat), let
-	// NewPluginsHandler create its own default registry.
-	plgh := handlers.NewPluginsHandler(pluginsDir, dockerCli, wh.RestartByID).
-		WithRuntimeLookup(runtimeLookup).
-		WithInstanceIDLookup(instanceIDLookup)
-	if pluginResolver != nil {
-		plgh = plgh.WithSourceResolver(pluginResolver)
-	}
-	r.GET("/plugins", plgh.ListRegistry)
-	r.GET("/plugins/sources", plgh.ListSources)
-	wsAuth.GET("/plugins", plgh.ListInstalled)
-	wsAuth.GET("/plugins/available", plgh.ListAvailableForWorkspace)
-	wsAuth.GET("/plugins/compatibility", plgh.CheckRuntimeCompatibility)
-	wsAuth.POST("/plugins", plgh.Install)
-	wsAuth.DELETE("/plugins/:name", plgh.Uninstall)
-	// Phase 30.3 — stream plugin as tar.gz so remote agents can pull +
-	// unpack locally instead of going through Docker exec.
-	wsAuth.GET("/plugins/:name/download", plgh.Download)
-
 	// Bundles — #164 + #165: both gated behind AdminAuth.
 	// POST /bundles/import — CRITICAL: anon creation of arbitrary workspaces
 	//                        with user-supplied config (system prompts,
@@ -179,6 +179,23 @@ def parse(data: Any) -> Variant:
         )
         return Malformed(raw=data)

+    # Push-mode queue envelope — returned when a push-mode workspace
+    # (one with a public URL) is at capacity. The platform queues the
+    # request and returns {"queued": true, "message": "...", "queue_id": "..."}.
+    # Unlike the poll-mode envelope (status=queued + delivery_mode=poll),
+    # this shape has no delivery_mode key — it's distinguishable by
+    # data.get("queued") is True alone. Checked before poll-mode so the
+    # two cases are mutually exclusive even if a buggy server sends both.
+    if data.get("queued") is True:
+        method_raw = data.get(_KEY_METHOD)
+        method = str(method_raw) if method_raw is not None else "message/send"
+        logger.info(
+            "a2a_response.parse: queued for busy push-mode peer (method=%s, queue_id=%s)",
+            method,
+            data.get("queue_id", "?"),
+        )
+        return Queued(method=method)
+
     # Poll-queued envelope. Both keys must be present — the workspace
     # server sets them together; if only one is present the body is
     # ambiguous and we route to Malformed for visibility.
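The branch order the Python comments describe (push-mode `queued` checked first; poll-mode requiring both keys, with a lone key routed to Malformed) can be restated as a small Go classifier. The poll-mode key names ("status", "delivery_mode") are taken from the comments, not from code visible in this diff:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// classify mirrors the branch order in parse() above: the push-mode queue
// envelope is detected by "queued": true alone and is checked first, so a
// body carrying both shapes still lands in exactly one branch.
func classify(body []byte) string {
	var data map[string]any
	if err := json.Unmarshal(body, &data); err != nil {
		return "malformed"
	}
	if q, ok := data["queued"].(bool); ok && q {
		return "push-queued" // no delivery_mode key needed
	}
	_, hasStatus := data["status"]
	_, hasMode := data["delivery_mode"]
	if hasStatus && hasMode {
		if data["status"] == "queued" && data["delivery_mode"] == "poll" {
			return "poll-queued"
		}
	} else if hasStatus || hasMode {
		return "malformed" // only one of the two keys: ambiguous body
	}
	return "other"
}

func main() {
	fmt.Println(classify([]byte(`{"queued":true,"queue_id":"q-7"}`)))           // push-queued
	fmt.Println(classify([]byte(`{"status":"queued","delivery_mode":"poll"}`))) // poll-queued
	fmt.Println(classify([]byte(`{"status":"queued"}`)))                        // malformed
}
```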