Mechanical migration of bare event-name strings in BroadcastOnly / RecordAndBroadcast call sites to the typed constants from internal/events/types.go (RFC #2945 PR-B). Wire format unchanged (both shapes serialize to identical WSMessage.Event literals); pinned by TestAllEventTypes_IsSnapshot in #2965. Migrated (18 files, scope: handlers/, scheduler/, registry/, bundle/, channels/): - handlers/{approvals,a2a_proxy_helpers,a2a_queue,activity,agent, delegation,external_rotate,org_import,registry,workspace, workspace_bootstrap,workspace_crud,workspace_provision_shared, workspace_restart}.go - channels/manager.go (caught by hostile-reviewer pass — initial scope missed channels/, found via grep on the post-migration tree) - scheduler/scheduler.go - registry/provisiontimeout.go - bundle/importer.go Hostile self-review (3 weakest spots, addressed) ------------------------------------------------ 1. Missed call sites — initial scope omitted channels/. Post-migration `grep -rEn 'BroadcastOnly\([^,]+,[^,]*"[A-Z_]+"|RecordAndBroadcast\([^,]+,[^,]*"[A-Z_]+"' internal/` found 2 stragglers in channels/manager.go. Migrated. Final grep on the same pattern returns only the docstring example in types.go (intentional). 2. gofmt drift — auto-import injection produced non-canonical import ordering. `gofmt -w` applied ONLY to the 18 modified files (NOT the whole tree, to avoid sweeping unrelated pre-existing drift into this PR's diff). Three pre-existing un-gofmt'd files in handlers/ (a2a_proxy.go, a2a_proxy_test.go, a2a_queue_test.go) left as-is — they're unchanged by this PR and their drift predates it. 3. Wire format — paranoia check: do the constants serialize to the exact strings consumers (canvas TS, hermes plugin, anything parsing WSMessage.Event) expect? Yes. Pinned by the snapshot test. The migration is name-only; not a single character of wire output changes. Verified - go build ./... clean - go vet ./internal/... clean - gofmt -l on the 5 migrated package dirs: only pre-existing files - Full tests: handlers/, channels/, scheduler/, registry/, events/, bundle/ all green (5 ok, 0 fail) PR-B-2 (canvas TS mirror + cross-language parity gate) remains as the final piece of RFC #2945 PR-B. Tracked separately so this PR stays mechanical + reviewable. Refs RFC #2945, PR #2965 (PR-B types). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
156 lines
4.8 KiB
Go
156 lines
4.8 KiB
Go
package bundle
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
|
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// ImportResult tracks the outcome of importing a bundle tree.
|
|
type ImportResult struct {
|
|
WorkspaceID string `json:"workspace_id"`
|
|
Name string `json:"name"`
|
|
Status string `json:"status"` // "provisioning" or "failed"
|
|
Error string `json:"error,omitempty"`
|
|
Children []ImportResult `json:"children,omitempty"`
|
|
}
|
|
|
|
// Import provisions a workspace tree from a Bundle.
|
|
// It creates workspace records, writes config files to a temp dir, and triggers the provisioner.
|
|
func Import(
|
|
ctx context.Context,
|
|
b *Bundle,
|
|
parentID *string,
|
|
broadcaster *events.Broadcaster,
|
|
prov *provisioner.Provisioner,
|
|
platformURL string,
|
|
) ImportResult {
|
|
// Generate fresh workspace ID
|
|
wsID := uuid.New().String()
|
|
|
|
result := ImportResult{
|
|
WorkspaceID: wsID,
|
|
Name: b.Name,
|
|
Status: "provisioning",
|
|
}
|
|
|
|
// Create workspace record
|
|
_, err := db.DB.ExecContext(ctx, `
|
|
INSERT INTO workspaces (id, name, role, tier, status, parent_id, source_bundle_id)
|
|
VALUES ($1, $2, $3, $4, 'provisioning', $5, $6)
|
|
`, wsID, b.Name, nilIfEmpty(b.Description), b.Tier, parentID, b.ID)
|
|
if err != nil {
|
|
result.Status = "failed"
|
|
result.Error = fmt.Sprintf("failed to create workspace record: %v", err)
|
|
return result
|
|
}
|
|
|
|
_ = broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisioning), wsID, map[string]interface{}{
|
|
"name": b.Name,
|
|
"tier": b.Tier,
|
|
"source_bundle_id": b.ID,
|
|
})
|
|
|
|
// Build config files in memory for the provisioner
|
|
configFiles := buildBundleConfigFiles(b)
|
|
|
|
// Extract runtime from config.yaml in the bundle
|
|
bundleRuntime := "langgraph"
|
|
if configYaml, ok := b.Prompts["config.yaml"]; ok {
|
|
for _, line := range strings.Split(configYaml, "\n") {
|
|
line = strings.TrimSpace(line)
|
|
if strings.HasPrefix(line, "runtime:") {
|
|
bundleRuntime = strings.TrimSpace(strings.TrimPrefix(line, "runtime:"))
|
|
break
|
|
}
|
|
}
|
|
}
|
|
// Store runtime in DB
|
|
_, _ = db.DB.ExecContext(ctx, `UPDATE workspaces SET runtime = $1 WHERE id = $2`, bundleRuntime, wsID)
|
|
|
|
// Provision the container if provisioner is available
|
|
if prov != nil {
|
|
cfg := provisioner.WorkspaceConfig{
|
|
WorkspaceID: wsID,
|
|
ConfigFiles: configFiles,
|
|
Tier: b.Tier,
|
|
Runtime: bundleRuntime,
|
|
EnvVars: map[string]string{},
|
|
PlatformURL: platformURL,
|
|
// PluginsPath set by caller if available
|
|
}
|
|
go func() {
|
|
provCtx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
|
|
defer cancel()
|
|
url, err := prov.Start(provCtx, cfg)
|
|
if err != nil {
|
|
markFailed(provCtx, wsID, broadcaster, err)
|
|
} else if url != "" {
|
|
db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Recursively import sub-workspaces
|
|
for _, sub := range b.SubWorkspaces {
|
|
childResult := Import(ctx, &sub, &wsID, broadcaster, prov, platformURL)
|
|
result.Children = append(result.Children, childResult)
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// buildBundleConfigFiles builds a map of config files from a bundle for writing into a container volume.
|
|
func buildBundleConfigFiles(b *Bundle) map[string][]byte {
|
|
files := make(map[string][]byte)
|
|
|
|
// Write system-prompt.md
|
|
if b.SystemPrompt != "" {
|
|
files["system-prompt.md"] = []byte(b.SystemPrompt)
|
|
}
|
|
|
|
// Write config.yaml from prompts if present
|
|
if configYaml, ok := b.Prompts["config.yaml"]; ok {
|
|
files["config.yaml"] = []byte(configYaml)
|
|
}
|
|
|
|
// Write skills
|
|
for _, skill := range b.Skills {
|
|
for relPath, content := range skill.Files {
|
|
files[fmt.Sprintf("skills/%s/%s", skill.ID, relPath)] = []byte(content)
|
|
}
|
|
}
|
|
|
|
return files
|
|
}
|
|
|
|
func markFailed(ctx context.Context, wsID string, broadcaster *events.Broadcaster, err error) {
|
|
// Set last_sample_error along with status so operators (and the
|
|
// Canvas E2E + GET /workspaces/:id callers) get a non-null reason
|
|
// in the row. Pre-2026-05-05 this UPDATE only set status, leaving
|
|
// last_sample_error NULL — Canvas E2E #2632 surfaced the gap with
|
|
// `Workspace failed: (no last_sample_error)`. Same UPDATE shape as
|
|
// markProvisionFailed in workspace-server/internal/handlers/
|
|
// workspace_provision_shared.go.
|
|
msg := err.Error()
|
|
db.DB.ExecContext(ctx,
|
|
`UPDATE workspaces SET status = $1, last_sample_error = $2, updated_at = now() WHERE id = $3`,
|
|
models.StatusFailed, msg, wsID)
|
|
broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
|
|
"error": msg,
|
|
})
|
|
}
|
|
|
|
func nilIfEmpty(s string) interface{} {
|
|
if s == "" {
|
|
return nil
|
|
}
|
|
return s
|
|
}
|