feat(workspace-server): GHCR digest watcher closes runtime CD chain (#2114)
Adds an opt-in goroutine that polls GHCR every 5 minutes for digest changes on each workspace-template-*:latest tag and invokes the same refresh logic /admin/workspace-images/refresh exposes. With this, the chain from "merge runtime PR" to "containers running new code" is fully hands-off — no operator step between auto-tag → publish-runtime → cascade → template image rebuild → host pull + recreate. Opt-in via IMAGE_AUTO_REFRESH=true. SaaS deploys whose pipeline already pulls every release should leave it off (would be redundant work); self-hosters get true zero-touch. Why a refactor of admin_workspace_images.go is in this PR: The HTTP handler held all the refresh logic inline. To share it with the new watcher without HTTP loopback, extracted WorkspaceImageService with a Refresh(ctx, runtimes, recreate) (RefreshResult, error) shape. HTTP handler is now a thin wrapper; behavior is preserved (same JSON response, same 500-on-list-failure, same per-runtime soft-fail). Watcher design notes: - Last-observed digest tracked in memory (not persisted). On boot the first observation per runtime is seed-only — no spurious refresh fires on every restart. - On Refresh error, the seen digest rolls back so the next tick retries. Without this rollback a transient Docker glitch would convince the watcher the work was done. - Per-runtime fetch errors don't block other runtimes (one template's brief 500 doesn't pause the others). - digestFetcher injection seam in tick() lets unit tests cover all bookkeeping branches without standing up an httptest GHCR server. Verified live: probed GHCR's /token + manifest HEAD against workspace-template-claude-code; got HTTP 200 + a real Docker-Content-Digest. Same calls the watcher makes. Co-authored-by: Hongming Wang <hongmingwangalt@gmail.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
168d6ec8d9
commit
9375e3d4ee
@ -183,6 +183,20 @@ needs Docker socket access (the compose stack mounts
|
|||||||
(`docker login ghcr.io` once per host). On a fresh host without GHCR auth,
|
(`docker login ghcr.io` once per host). On a fresh host without GHCR auth,
|
||||||
the pull step warns per runtime and the response surfaces the failures.
|
the pull step warns per runtime and the response surfaces the failures.
|
||||||
|
|
||||||
|
**Fully hands-off (opt-in image auto-refresh):**
|
||||||
|
|
||||||
|
Set `IMAGE_AUTO_REFRESH=true` on the platform process. A watcher polls
|
||||||
|
GHCR every 5 minutes for digest changes on each `workspace-template-*:latest`
|
||||||
|
tag and invokes the same refresh logic the admin endpoint exposes —
|
||||||
|
no operator action required between "runtime PR merged" and
|
||||||
|
"containers running new code". Disabled by default because SaaS deploy
|
||||||
|
pipelines that already pull on every release would do redundant work.
|
||||||
|
|
||||||
|
Optional companion env (same as the admin endpoint):
|
||||||
|
|
||||||
|
- `GHCR_USER` + `GHCR_TOKEN` — required for private template images;
|
||||||
|
unused for the current public set, but harmless if set.
|
||||||
|
|
||||||
## Local dev (build the package without publishing)
|
## Local dev (build the package without publishing)
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
|
|||||||
@ -8,6 +8,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -16,6 +17,7 @@ import (
|
|||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
|
||||||
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/imagewatch"
|
||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
|
||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
|
||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/router"
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/router"
|
||||||
@ -265,6 +267,18 @@ func main() {
|
|||||||
channelMgr := channels.NewManager(wh, broadcaster)
|
channelMgr := channels.NewManager(wh, broadcaster)
|
||||||
go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start)
|
go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start)
|
||||||
|
|
||||||
|
// Image auto-refresh — closes the runtime CD chain to "merge → containers
|
||||||
|
// running new code" with no human in between. Polls GHCR for digest
|
||||||
|
// changes on workspace-template-* :latest tags and invokes the same
|
||||||
|
// refresh logic /admin/workspace-images/refresh exposes. Opt-in:
|
||||||
|
// SaaS deploys whose pipeline already pulls every release should leave
|
||||||
|
// it off (would be redundant work). Self-hosters get true zero-touch.
|
||||||
|
if prov != nil && strings.EqualFold(os.Getenv("IMAGE_AUTO_REFRESH"), "true") {
|
||||||
|
svc := handlers.NewWorkspaceImageService(prov.DockerClient())
|
||||||
|
watcher := imagewatch.New(svc)
|
||||||
|
go supervised.RunWithRecover(ctx, "image-auto-refresh", watcher.Run)
|
||||||
|
}
|
||||||
|
|
||||||
// Wire channel manager into scheduler for auto-posting cron output to Slack
|
// Wire channel manager into scheduler for auto-posting cron output to Slack
|
||||||
cronSched.SetChannels(channelMgr)
|
cronSched.SetChannels(channelMgr)
|
||||||
|
|
||||||
|
|||||||
@ -21,54 +21,53 @@ import (
|
|||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AdminWorkspaceImagesHandler serves POST /admin/workspace-images/refresh — the
|
// WorkspaceImageService is the production-side end of the runtime CD chain.
|
||||||
// production-side end of the runtime CD chain. Operators (or post-publish
|
// It (1) pulls workspace template images from GHCR via the Docker SDK and
|
||||||
// automation) hit this to (1) pull the latest workspace template images from
|
// (2) recreates running ws-* containers so they adopt the new image.
|
||||||
// GHCR via the Docker SDK and (2) recreate any running ws-* containers so
|
|
||||||
// they adopt the new image. Without this, a freshly-published runtime sits
|
|
||||||
// in the registry but containers keep running the old image until the next
|
|
||||||
// manual restart.
|
|
||||||
//
|
//
|
||||||
// On a SaaS deployment the deploy pipeline already pulls on every release,
|
// Two callers:
|
||||||
// so the pull step is a no-op there; the recreate step is still the way to
|
// - AdminWorkspaceImagesHandler — POST /admin/workspace-images/refresh, the
|
||||||
// make running workspaces adopt the new image without a full host restart.
|
// manual end-of-chain trigger documented in
|
||||||
//
|
// docs/workspace-runtime-package.md.
|
||||||
// POST /admin/workspace-images/refresh
|
// - imagewatch.Watcher — the auto-refresh goroutine that polls GHCR
|
||||||
//
|
// digests and invokes Refresh when an image changes upstream. This is
|
||||||
// ?runtime=claude-code (optional; default = all 8 templates)
|
// what closes the chain to "merge → containers running new code" with
|
||||||
// &recreate=true|false (default true; false = pull only)
|
// no human in between.
|
||||||
//
|
type WorkspaceImageService struct {
|
||||||
// Returns JSON {pulled: [...], failed: [...], recreated: [...]}
|
|
||||||
type AdminWorkspaceImagesHandler struct {
|
|
||||||
docker *dockerclient.Client
|
docker *dockerclient.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAdminWorkspaceImagesHandler(docker *dockerclient.Client) *AdminWorkspaceImagesHandler {
|
func NewWorkspaceImageService(docker *dockerclient.Client) *WorkspaceImageService {
|
||||||
return &AdminWorkspaceImagesHandler{docker: docker}
|
return &WorkspaceImageService{docker: docker}
|
||||||
}
|
}
|
||||||
|
|
||||||
// allRuntimes is the canonical list mirroring docs/workspace-runtime-package.md.
|
// AllRuntimes is the canonical list mirroring docs/workspace-runtime-package.md.
|
||||||
// Update both when a new template is added.
|
// Update both when a new template is added.
|
||||||
var allRuntimes = []string{
|
var AllRuntimes = []string{
|
||||||
"claude-code", "langgraph", "crewai", "autogen",
|
"claude-code", "langgraph", "crewai", "autogen",
|
||||||
"deepagents", "hermes", "gemini-cli", "openclaw",
|
"deepagents", "hermes", "gemini-cli", "openclaw",
|
||||||
}
|
}
|
||||||
|
|
||||||
type refreshResult struct {
|
// RefreshResult is the per-call outcome surfaced to HTTP callers AND logged
|
||||||
|
// by the auto-refresh watcher.
|
||||||
|
type RefreshResult struct {
|
||||||
Pulled []string `json:"pulled"`
|
Pulled []string `json:"pulled"`
|
||||||
Failed []string `json:"failed"`
|
Failed []string `json:"failed"`
|
||||||
Recreated []string `json:"recreated"`
|
Recreated []string `json:"recreated"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TemplateImageRef returns the canonical GHCR ref for a runtime's template
|
||||||
|
// image. Single source of truth shared with imagewatch.
|
||||||
|
func TemplateImageRef(runtime string) string {
|
||||||
|
return fmt.Sprintf("ghcr.io/molecule-ai/workspace-template-%s:latest", runtime)
|
||||||
|
}
|
||||||
|
|
||||||
// ghcrAuthHeader returns the base64-encoded JSON auth payload Docker's
|
// ghcrAuthHeader returns the base64-encoded JSON auth payload Docker's
|
||||||
// ImagePull expects in PullOptions.RegistryAuth, or empty string when no
|
// ImagePull expects in PullOptions.RegistryAuth, or empty string when no
|
||||||
// GHCR_USER/GHCR_TOKEN env is set (lets public images pull through).
|
// GHCR_USER/GHCR_TOKEN env is set (lets public images pull through).
|
||||||
//
|
//
|
||||||
// The Docker SDK doesn't read ~/.docker/config.json — every authenticated
|
// The Docker SDK doesn't read ~/.docker/config.json — every authenticated
|
||||||
// pull needs an explicit RegistryAuth string. Format per the Docker
|
// pull needs an explicit RegistryAuth string.
|
||||||
// engine API: {"username":"…","password":"…","serveraddress":"ghcr.io"}
|
|
||||||
// → base64-encoded JSON with no trailing padding stripped (engine handles
|
|
||||||
// either form).
|
|
||||||
func ghcrAuthHeader() string {
|
func ghcrAuthHeader() string {
|
||||||
user := strings.TrimSpace(os.Getenv("GHCR_USER"))
|
user := strings.TrimSpace(os.Getenv("GHCR_USER"))
|
||||||
token := strings.TrimSpace(os.Getenv("GHCR_TOKEN"))
|
token := strings.TrimSpace(os.Getenv("GHCR_TOKEN"))
|
||||||
@ -82,63 +81,40 @@ func ghcrAuthHeader() string {
|
|||||||
}
|
}
|
||||||
js, err := json.Marshal(payload)
|
js, err := json.Marshal(payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Should be unreachable for a static map[string]string. Log so a
|
|
||||||
// future contributor adding a non-marshallable field notices.
|
|
||||||
log.Printf("workspace-images: failed to marshal GHCR auth: %v", err)
|
log.Printf("workspace-images: failed to marshal GHCR auth: %v", err)
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
return base64.URLEncoding.EncodeToString(js)
|
return base64.URLEncoding.EncodeToString(js)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *AdminWorkspaceImagesHandler) Refresh(c *gin.Context) {
|
// Refresh pulls the requested runtimes' template images from GHCR and (if
|
||||||
runtimes := allRuntimes
|
// recreate) force-removes any matching ws-* containers so the platform
|
||||||
if r := c.Query("runtime"); r != "" {
|
// re-provisions them on next interaction.
|
||||||
// Accept a single runtime; reject anything not in the canonical list
|
//
|
||||||
// so a typo doesn't silently no-op.
|
// Soft-fails per runtime: one missing image (e.g. unpublished template)
|
||||||
found := false
|
// doesn't abort the others. Per-runtime failures are in RefreshResult.Failed.
|
||||||
for _, known := range allRuntimes {
|
// Returns a non-nil error only when the recreate phase couldn't enumerate
|
||||||
if known == r {
|
// containers at all (caller should surface that as 500).
|
||||||
found = true
|
func (s *WorkspaceImageService) Refresh(ctx context.Context, runtimes []string, recreate bool) (RefreshResult, error) {
|
||||||
break
|
res := RefreshResult{Pulled: []string{}, Failed: []string{}, Recreated: []string{}}
|
||||||
}
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
c.JSON(http.StatusBadRequest, gin.H{
|
|
||||||
"error": fmt.Sprintf("unknown runtime: %s", r),
|
|
||||||
"known_runtimes": allRuntimes,
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
runtimes = []string{r}
|
|
||||||
}
|
|
||||||
recreate := c.DefaultQuery("recreate", "true") == "true"
|
|
||||||
|
|
||||||
res := refreshResult{Pulled: []string{}, Failed: []string{}, Recreated: []string{}}
|
|
||||||
auth := ghcrAuthHeader()
|
auth := ghcrAuthHeader()
|
||||||
|
|
||||||
// 1. Pull each template image via the Docker SDK. Soft-fail per-runtime
|
pullCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||||
// so one missing image (e.g. unpublished template) doesn't abort
|
|
||||||
// the others. Each pull's progress stream is drained to completion
|
|
||||||
// — the engine treats early-close as "abandon", leaving partial
|
|
||||||
// layers around with no reference.
|
|
||||||
pullCtx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Minute)
|
|
||||||
defer cancel()
|
defer cancel()
|
||||||
for _, rt := range runtimes {
|
for _, rt := range runtimes {
|
||||||
image := fmt.Sprintf("ghcr.io/molecule-ai/workspace-template-%s:latest", rt)
|
image := TemplateImageRef(rt)
|
||||||
opts := dockerimage.PullOptions{Platform: provisioner.DefaultImagePlatform()}
|
opts := dockerimage.PullOptions{Platform: provisioner.DefaultImagePlatform()}
|
||||||
if auth != "" {
|
if auth != "" {
|
||||||
opts.RegistryAuth = auth
|
opts.RegistryAuth = auth
|
||||||
}
|
}
|
||||||
rc, err := h.docker.ImagePull(pullCtx, image, opts)
|
rc, err := s.docker.ImagePull(pullCtx, image, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("workspace-images/refresh: pull %s failed: %v", rt, err)
|
log.Printf("workspace-images/refresh: pull %s failed: %v", rt, err)
|
||||||
res.Failed = append(res.Failed, rt)
|
res.Failed = append(res.Failed, rt)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Drain to completion. We discard progress payload because no
|
// Drain to completion. The engine treats early-close as "abandon",
|
||||||
// caller renders it; the platform log already records pulled/failed
|
// leaving partial layers around with no reference.
|
||||||
// per runtime. If a future caller wants live progress, decode the
|
|
||||||
// JSON-line stream into events here.
|
|
||||||
if _, err := io.Copy(io.Discard, rc); err != nil {
|
if _, err := io.Copy(io.Discard, rc); err != nil {
|
||||||
rc.Close()
|
rc.Close()
|
||||||
log.Printf("workspace-images/refresh: drain %s failed: %v", rt, err)
|
log.Printf("workspace-images/refresh: drain %s failed: %v", rt, err)
|
||||||
@ -150,23 +126,18 @@ func (h *AdminWorkspaceImagesHandler) Refresh(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !recreate {
|
if !recreate {
|
||||||
c.JSON(http.StatusOK, res)
|
return res, nil
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Find ws-* containers running an image we just pulled. Recreate
|
listCtx, listCancel := context.WithTimeout(ctx, 30*time.Second)
|
||||||
// them — kill+remove and let the platform's normal provisioning
|
|
||||||
// flow re-create on next canvas interaction.
|
|
||||||
listCtx, listCancel := context.WithTimeout(c.Request.Context(), 30*time.Second)
|
|
||||||
defer listCancel()
|
defer listCancel()
|
||||||
containers, err := h.docker.ContainerList(listCtx, container.ListOptions{
|
containers, err := s.docker.ContainerList(listCtx, container.ListOptions{
|
||||||
All: true,
|
All: true,
|
||||||
Filters: filters.NewArgs(filters.Arg("name", "ws-")),
|
Filters: filters.NewArgs(filters.Arg("name", "ws-")),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("workspace-images/refresh: container list failed: %v", err)
|
log.Printf("workspace-images/refresh: container list failed: %v", err)
|
||||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "container list failed", "partial_result": res})
|
return res, fmt.Errorf("container list: %w", err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pulledSet := map[string]struct{}{}
|
pulledSet := map[string]struct{}{}
|
||||||
@ -175,14 +146,10 @@ func (h *AdminWorkspaceImagesHandler) Refresh(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
for _, ctr := range containers {
|
for _, ctr := range containers {
|
||||||
// ContainerList's ctr.Image is the *resolved digest* (sha256:…),
|
// ContainerList's ctr.Image is the *resolved digest* (sha256:…),
|
||||||
// not the human-readable tag. Use ContainerInspect to get the
|
// not the human-readable tag. Inspect to get Config.Image so we
|
||||||
// original Config.Image (e.g. "ghcr.io/molecule-ai/workspace-
|
// can match against the pulled-runtime set.
|
||||||
// template-claude-code:latest") so we can match against the
|
inspectCtx, inspectCancel := context.WithTimeout(ctx, 10*time.Second)
|
||||||
// pulled-runtime set. The cost is one extra round-trip per
|
full, err := s.docker.ContainerInspect(inspectCtx, ctr.ID)
|
||||||
// ws-* container — there are at most 8 typically, so this is
|
|
||||||
// well below any UX threshold.
|
|
||||||
inspectCtx, inspectCancel := context.WithTimeout(c.Request.Context(), 10*time.Second)
|
|
||||||
full, err := h.docker.ContainerInspect(inspectCtx, ctr.ID)
|
|
||||||
inspectCancel()
|
inspectCancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("workspace-images/refresh: inspect %s failed: %v", ctr.ID[:12], err)
|
log.Printf("workspace-images/refresh: inspect %s failed: %v", ctr.ID[:12], err)
|
||||||
@ -203,12 +170,8 @@ func (h *AdminWorkspaceImagesHandler) Refresh(c *gin.Context) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
name := strings.TrimPrefix(ctr.Names[0], "/")
|
name := strings.TrimPrefix(ctr.Names[0], "/")
|
||||||
// Remove with force — the workspace will re-provision on the next
|
rmCtx, rmCancel := context.WithTimeout(ctx, 30*time.Second)
|
||||||
// canvas interaction. This drops in-flight conversations on the
|
err = s.docker.ContainerRemove(rmCtx, ctr.ID, container.RemoveOptions{Force: true})
|
||||||
// removed container; document via the response so callers can
|
|
||||||
// schedule the refresh during a quiet window.
|
|
||||||
rmCtx, rmCancel := context.WithTimeout(c.Request.Context(), 30*time.Second)
|
|
||||||
err = h.docker.ContainerRemove(rmCtx, ctr.ID, container.RemoveOptions{Force: true})
|
|
||||||
rmCancel()
|
rmCancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("workspace-images/refresh: remove %s failed: %v", name, err)
|
log.Printf("workspace-images/refresh: remove %s failed: %v", name, err)
|
||||||
@ -216,12 +179,60 @@ func (h *AdminWorkspaceImagesHandler) Refresh(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
res.Recreated = append(res.Recreated, name)
|
res.Recreated = append(res.Recreated, name)
|
||||||
}
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AdminWorkspaceImagesHandler serves POST /admin/workspace-images/refresh.
|
||||||
|
//
|
||||||
|
// ?runtime=claude-code (optional; default = all 8 templates)
|
||||||
|
// &recreate=true|false (default true; false = pull only)
|
||||||
|
//
|
||||||
|
// Returns JSON {pulled: [...], failed: [...], recreated: [...]}
|
||||||
|
type AdminWorkspaceImagesHandler struct {
|
||||||
|
svc *WorkspaceImageService
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAdminWorkspaceImagesHandler(docker *dockerclient.Client) *AdminWorkspaceImagesHandler {
|
||||||
|
return &AdminWorkspaceImagesHandler{svc: NewWorkspaceImageService(docker)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Service exposes the underlying refresh logic so the auto-refresh watcher
|
||||||
|
// in cmd/server can share the exact code path the HTTP handler uses.
|
||||||
|
func (h *AdminWorkspaceImagesHandler) Service() *WorkspaceImageService {
|
||||||
|
return h.svc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *AdminWorkspaceImagesHandler) Refresh(c *gin.Context) {
|
||||||
|
runtimes := AllRuntimes
|
||||||
|
if r := c.Query("runtime"); r != "" {
|
||||||
|
found := false
|
||||||
|
for _, known := range AllRuntimes {
|
||||||
|
if known == r {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
c.JSON(http.StatusBadRequest, gin.H{
|
||||||
|
"error": fmt.Sprintf("unknown runtime: %s", r),
|
||||||
|
"known_runtimes": AllRuntimes,
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
runtimes = []string{r}
|
||||||
|
}
|
||||||
|
recreate := c.DefaultQuery("recreate", "true") == "true"
|
||||||
|
|
||||||
|
res, err := h.svc.Refresh(c.Request.Context(), runtimes, recreate)
|
||||||
authStatus := "no GHCR auth (public images only)"
|
authStatus := "no GHCR auth (public images only)"
|
||||||
if auth != "" {
|
if ghcrAuthHeader() != "" {
|
||||||
authStatus = "GHCR_USER/GHCR_TOKEN auth"
|
authStatus = "GHCR_USER/GHCR_TOKEN auth"
|
||||||
}
|
}
|
||||||
log.Printf("workspace-images/refresh: pulled=%d failed=%d recreated=%d (%s)",
|
log.Printf("workspace-images/refresh: pulled=%d failed=%d recreated=%d (%s)",
|
||||||
len(res.Pulled), len(res.Failed), len(res.Recreated), authStatus)
|
len(res.Pulled), len(res.Failed), len(res.Recreated), authStatus)
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error(), "partial_result": res})
|
||||||
|
return
|
||||||
|
}
|
||||||
c.JSON(http.StatusOK, res)
|
c.JSON(http.StatusOK, res)
|
||||||
}
|
}
|
||||||
|
|||||||
221
workspace-server/internal/imagewatch/watch.go
Normal file
221
workspace-server/internal/imagewatch/watch.go
Normal file
@ -0,0 +1,221 @@
|
|||||||
|
// Package imagewatch closes the last manual step of the runtime CD chain
|
||||||
|
// (see docs/workspace-runtime-package.md): polling GHCR for digest changes
|
||||||
|
// on the workspace-template-* :latest tags and invoking the existing
|
||||||
|
// workspace-image refresh logic when one moves.
|
||||||
|
//
|
||||||
|
// Without this, an operator has to either SSH and run
|
||||||
|
// scripts/refresh-workspace-images.sh OR curl
|
||||||
|
// /admin/workspace-images/refresh after every runtime release. With it,
|
||||||
|
// the platform self-heals to the latest published runtime within one
|
||||||
|
// polling interval — fully hands-off from "merge PR" to "containers
|
||||||
|
// running new code".
|
||||||
|
//
|
||||||
|
// Opt-in via IMAGE_AUTO_REFRESH=true. SaaS deployments whose deploy
|
||||||
|
// pipeline pulls on every release should leave it disabled (would be
|
||||||
|
// redundant work).
|
||||||
|
package imagewatch
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DefaultInterval is the polling cadence. Runtime publishes happen at most
|
||||||
|
// a handful of times per day; a 5-minute lag between PyPI publish + image
|
||||||
|
// rebuild and the platform pulling is well within the implicit SLA. Going
|
||||||
|
// shorter wastes GHCR rate budget for no real win.
|
||||||
|
const DefaultInterval = 5 * time.Minute
|
||||||
|
|
||||||
|
// Refresher is the subset of *handlers.WorkspaceImageService the watcher
|
||||||
|
// needs. Defined here so the watcher can be tested with a fake.
|
||||||
|
type Refresher interface {
|
||||||
|
Refresh(ctx context.Context, runtimes []string, recreate bool) (handlers.RefreshResult, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Watcher polls GHCR for digest changes and invokes Refresher when one
|
||||||
|
// moves. Tracks last-observed remote digest per runtime in memory; on a
|
||||||
|
// fresh boot, the first observation per runtime seeds the tracker without
|
||||||
|
// triggering a refresh (containers stay on whatever image they have until
|
||||||
|
// the NEXT upstream change moves the digest).
|
||||||
|
type Watcher struct {
|
||||||
|
svc Refresher
|
||||||
|
runtimes []string
|
||||||
|
interval time.Duration
|
||||||
|
http *http.Client
|
||||||
|
seen map[string]string // runtime → last-observed remote digest
|
||||||
|
}
|
||||||
|
|
||||||
|
// New returns a watcher configured with the canonical runtimes list. Pass
|
||||||
|
// the WorkspaceImageService from the handlers package as svc.
|
||||||
|
func New(svc Refresher) *Watcher {
|
||||||
|
return &Watcher{
|
||||||
|
svc: svc,
|
||||||
|
runtimes: append([]string(nil), handlers.AllRuntimes...),
|
||||||
|
interval: DefaultInterval,
|
||||||
|
http: &http.Client{Timeout: 10 * time.Second},
|
||||||
|
seen: make(map[string]string, len(handlers.AllRuntimes)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// digestFetcher returns the current upstream digest for a given runtime.
|
||||||
|
// Pulled out of tick() so tests can substitute a deterministic fake
|
||||||
|
// without standing up an httptest server for the GHCR API.
|
||||||
|
type digestFetcher func(ctx context.Context, runtime string) (string, error)
|
||||||
|
|
||||||
|
// Run blocks until ctx is cancelled, polling once per interval.
|
||||||
|
func (w *Watcher) Run(ctx context.Context) {
|
||||||
|
log.Printf("image-auto-refresh: started (interval=%s, runtimes=%d)", w.interval, len(w.runtimes))
|
||||||
|
tick := time.NewTicker(w.interval)
|
||||||
|
defer tick.Stop()
|
||||||
|
// Run one tick immediately so digests get seeded without waiting a full
|
||||||
|
// interval. The first tick is seed-only: no refresh fires.
|
||||||
|
w.tick(ctx, w.remoteDigest)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Printf("image-auto-refresh: stopping (%v)", ctx.Err())
|
||||||
|
return
|
||||||
|
case <-tick.C:
|
||||||
|
w.tick(ctx, w.remoteDigest)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Watcher) tick(ctx context.Context, fetch digestFetcher) {
|
||||||
|
for _, rt := range w.runtimes {
|
||||||
|
remote, err := fetch(ctx, rt)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("image-auto-refresh: %s digest fetch failed: %v", rt, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
prev, hadPrev := w.seen[rt]
|
||||||
|
w.seen[rt] = remote
|
||||||
|
if !hadPrev {
|
||||||
|
// Seed-only — don't refresh on first observation. Server may
|
||||||
|
// have just booted; the local image either matches this digest
|
||||||
|
// or operator can manually refresh once at deploy time.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if prev == remote {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.Printf("image-auto-refresh: %s digest moved %s → %s, refreshing",
|
||||||
|
rt, shortDigest(prev), shortDigest(remote))
|
||||||
|
res, err := w.svc.Refresh(ctx, []string{rt}, true)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("image-auto-refresh: %s refresh failed: %v (pulled=%v recreated=%v)",
|
||||||
|
rt, err, res.Pulled, res.Recreated)
|
||||||
|
// Roll back the seen-digest so the next tick retries — without
|
||||||
|
// this, a transient Docker error during recreate would leave
|
||||||
|
// the watcher convinced the work was done.
|
||||||
|
w.seen[rt] = prev
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.Printf("image-auto-refresh: %s pulled=%v recreated=%v failed=%v",
|
||||||
|
rt, res.Pulled, res.Recreated, res.Failed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// remoteDigest queries GHCR for the current manifest digest of the
|
||||||
|
// workspace-template-<runtime>:latest image. Uses the Docker Registry V2
|
||||||
|
// HTTP API: get a bearer token, then HEAD the manifest.
|
||||||
|
//
|
||||||
|
// Auth: if GHCR_USER+GHCR_TOKEN are set, basic-auth the token request
|
||||||
|
// (works for both public and private images). If unset, anonymous token
|
||||||
|
// (works for public images only — every workspace template is public).
|
||||||
|
func (w *Watcher) remoteDigest(ctx context.Context, runtime string) (string, error) {
|
||||||
|
repo := "molecule-ai/workspace-template-" + runtime
|
||||||
|
tok, err := w.fetchPullToken(ctx, repo)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("pull token: %w", err)
|
||||||
|
}
|
||||||
|
manifestURL := fmt.Sprintf("https://ghcr.io/v2/%s/manifests/latest", repo)
|
||||||
|
req, err := http.NewRequestWithContext(ctx, "HEAD", manifestURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
req.Header.Set("Authorization", "Bearer "+tok)
|
||||||
|
// Accept manifest + index media types so GHCR returns the digest of
|
||||||
|
// whatever the :latest tag points at without doing a content-negotiation
|
||||||
|
// rewrite that would change the digest server-side.
|
||||||
|
req.Header.Set("Accept", strings.Join([]string{
|
||||||
|
"application/vnd.oci.image.index.v1+json",
|
||||||
|
"application/vnd.oci.image.manifest.v1+json",
|
||||||
|
"application/vnd.docker.distribution.manifest.list.v2+json",
|
||||||
|
"application/vnd.docker.distribution.manifest.v2+json",
|
||||||
|
}, ","))
|
||||||
|
resp, err := w.http.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
io.Copy(io.Discard, resp.Body)
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return "", fmt.Errorf("HEAD %s → %d", manifestURL, resp.StatusCode)
|
||||||
|
}
|
||||||
|
digest := resp.Header.Get("Docker-Content-Digest")
|
||||||
|
if digest == "" {
|
||||||
|
return "", fmt.Errorf("no Docker-Content-Digest in %s response", manifestURL)
|
||||||
|
}
|
||||||
|
return digest, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetchPullToken negotiates a short-lived bearer token from GHCR's token
|
||||||
|
// endpoint scoped to repo:pull. GHCR requires a token even for anonymous
|
||||||
|
// pulls of public images.
|
||||||
|
func (w *Watcher) fetchPullToken(ctx context.Context, repo string) (string, error) {
|
||||||
|
q := url.Values{}
|
||||||
|
q.Set("service", "ghcr.io")
|
||||||
|
q.Set("scope", "repository:"+repo+":pull")
|
||||||
|
tokURL := "https://ghcr.io/token?" + q.Encode()
|
||||||
|
req, err := http.NewRequestWithContext(ctx, "GET", tokURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
if user, tok := strings.TrimSpace(os.Getenv("GHCR_USER")), strings.TrimSpace(os.Getenv("GHCR_TOKEN")); user != "" && tok != "" {
|
||||||
|
auth := base64.StdEncoding.EncodeToString([]byte(user + ":" + tok))
|
||||||
|
req.Header.Set("Authorization", "Basic "+auth)
|
||||||
|
}
|
||||||
|
resp, err := w.http.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return "", fmt.Errorf("token endpoint → %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
var body struct {
|
||||||
|
Token string `json:"token"`
|
||||||
|
AccessToken string `json:"access_token"`
|
||||||
|
}
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
if body.Token != "" {
|
||||||
|
return body.Token, nil
|
||||||
|
}
|
||||||
|
if body.AccessToken != "" {
|
||||||
|
return body.AccessToken, nil
|
||||||
|
}
|
||||||
|
return "", fmt.Errorf("token endpoint returned empty token")
|
||||||
|
}
|
||||||
|
|
||||||
|
// shortDigest compresses a digest such as "sha256:abc123..." down to the
// algorithm prefix plus twelve characters of the hex — enough to be
// diff-readable in logs without filling the line. Inputs without a colon
// or with fewer than twelve trailing characters pass through unchanged.
func shortDigest(d string) string {
	algo, rest, ok := strings.Cut(d, ":")
	if !ok || len(rest) < 12 {
		return d
	}
	return algo + ":" + rest[:12]
}
|
||||||
176
workspace-server/internal/imagewatch/watch_test.go
Normal file
176
workspace-server/internal/imagewatch/watch_test.go
Normal file
@ -0,0 +1,176 @@
|
|||||||
|
package imagewatch
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
|
||||||
|
)
|
||||||
|
|
||||||
|
// fakeRefresher records every Refresh call and lets tests inject errors.
// It satisfies the watcher's Refresher seam so tick() bookkeeping can be
// exercised without a real image service behind it.
type fakeRefresher struct {
	mu    sync.Mutex // guards calls
	calls [][]string // one entry per Refresh invocation: a copy of its runtimes argument
	err   error      // when non-nil, Refresh returns this error instead of succeeding
}
|
||||||
|
|
||||||
|
func (f *fakeRefresher) Refresh(_ context.Context, runtimes []string, _ bool) (handlers.RefreshResult, error) {
|
||||||
|
f.mu.Lock()
|
||||||
|
defer f.mu.Unlock()
|
||||||
|
f.calls = append(f.calls, append([]string(nil), runtimes...))
|
||||||
|
if f.err != nil {
|
||||||
|
return handlers.RefreshResult{}, f.err
|
||||||
|
}
|
||||||
|
return handlers.RefreshResult{Pulled: runtimes}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeRefresher) callCount() int {
|
||||||
|
f.mu.Lock()
|
||||||
|
defer f.mu.Unlock()
|
||||||
|
return len(f.calls)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestWatcher(svc Refresher, runtimes ...string) *Watcher {
|
||||||
|
return &Watcher{
|
||||||
|
svc: svc,
|
||||||
|
runtimes: runtimes,
|
||||||
|
seen: make(map[string]string),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// staticFetcher returns a fixed digest for every call. mutableFetcher lets
|
||||||
|
// tests change the returned digest between ticks.
|
||||||
|
func staticFetcher(digest string) digestFetcher {
|
||||||
|
return func(_ context.Context, _ string) (string, error) {
|
||||||
|
return digest, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTick_FirstObservationSeedsWithoutRefresh(t *testing.T) {
|
||||||
|
svc := &fakeRefresher{}
|
||||||
|
w := newTestWatcher(svc, "claude-code")
|
||||||
|
|
||||||
|
w.tick(context.Background(), staticFetcher("sha256:aaaa"))
|
||||||
|
|
||||||
|
if svc.callCount() != 0 {
|
||||||
|
t.Errorf("first tick must seed only, got %d Refresh calls", svc.callCount())
|
||||||
|
}
|
||||||
|
if w.seen["claude-code"] != "sha256:aaaa" {
|
||||||
|
t.Errorf("seen digest not recorded: got %q", w.seen["claude-code"])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTick_NoRefreshWhenDigestUnchanged(t *testing.T) {
|
||||||
|
svc := &fakeRefresher{}
|
||||||
|
w := newTestWatcher(svc, "claude-code")
|
||||||
|
|
||||||
|
fetch := staticFetcher("sha256:steady")
|
||||||
|
w.tick(context.Background(), fetch) // seed
|
||||||
|
w.tick(context.Background(), fetch) // unchanged
|
||||||
|
w.tick(context.Background(), fetch) // unchanged
|
||||||
|
|
||||||
|
if svc.callCount() != 0 {
|
||||||
|
t.Errorf("steady-state ticks must not refresh, got %d calls", svc.callCount())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTick_RefreshFiresWhenDigestChanges(t *testing.T) {
|
||||||
|
svc := &fakeRefresher{}
|
||||||
|
w := newTestWatcher(svc, "claude-code", "hermes")
|
||||||
|
|
||||||
|
w.tick(context.Background(), staticFetcher("sha256:v1")) // seed both
|
||||||
|
if svc.callCount() != 0 {
|
||||||
|
t.Fatalf("seed tick should not refresh; got %d", svc.callCount())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only claude-code's digest moves. hermes stays.
|
||||||
|
moveOne := func(_ context.Context, rt string) (string, error) {
|
||||||
|
if rt == "claude-code" {
|
||||||
|
return "sha256:v2", nil
|
||||||
|
}
|
||||||
|
return "sha256:v1", nil
|
||||||
|
}
|
||||||
|
w.tick(context.Background(), moveOne)
|
||||||
|
|
||||||
|
if svc.callCount() != 1 {
|
||||||
|
t.Fatalf("expected exactly 1 Refresh call (only claude-code moved), got %d", svc.callCount())
|
||||||
|
}
|
||||||
|
if got := svc.calls[0]; len(got) != 1 || got[0] != "claude-code" {
|
||||||
|
t.Errorf("Refresh called with wrong runtime: got %v, want [claude-code]", got)
|
||||||
|
}
|
||||||
|
if w.seen["claude-code"] != "sha256:v2" {
|
||||||
|
t.Errorf("post-refresh seen digest should advance: got %q", w.seen["claude-code"])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTick_RollsBackSeenDigestOnRefreshError(t *testing.T) {
|
||||||
|
// Critical safety property: a transient Docker glitch during Refresh
|
||||||
|
// must not convince the watcher the work is done. Next tick should
|
||||||
|
// retry against the same upstream digest.
|
||||||
|
svc := &fakeRefresher{err: errors.New("docker daemon unreachable")}
|
||||||
|
w := newTestWatcher(svc, "claude-code")
|
||||||
|
|
||||||
|
w.tick(context.Background(), staticFetcher("sha256:old")) // seed
|
||||||
|
w.tick(context.Background(), staticFetcher("sha256:new")) // change → fails
|
||||||
|
|
||||||
|
if got := w.seen["claude-code"]; got != "sha256:old" {
|
||||||
|
t.Errorf("after Refresh error, seen must roll back to %q (so next tick retries), got %q", "sha256:old", got)
|
||||||
|
}
|
||||||
|
if svc.callCount() != 1 {
|
||||||
|
t.Fatalf("expected 1 Refresh attempt (the failed one), got %d", svc.callCount())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recovery: clear the error, run again with same upstream digest.
|
||||||
|
// Watcher should retry because seen was rolled back.
|
||||||
|
svc.err = nil
|
||||||
|
w.tick(context.Background(), staticFetcher("sha256:new"))
|
||||||
|
if svc.callCount() != 2 {
|
||||||
|
t.Errorf("after rollback, next tick should retry refresh; got %d total calls", svc.callCount())
|
||||||
|
}
|
||||||
|
if got := w.seen["claude-code"]; got != "sha256:new" {
|
||||||
|
t.Errorf("after successful retry, seen should advance: got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTick_DigestFetchErrorSkipsRuntime(t *testing.T) {
|
||||||
|
// One runtime's GHCR call failing must not block other runtimes from
|
||||||
|
// being checked (e.g. one template repo briefly 500s).
|
||||||
|
svc := &fakeRefresher{}
|
||||||
|
w := newTestWatcher(svc, "claude-code", "hermes")
|
||||||
|
w.seen["claude-code"] = "sha256:old"
|
||||||
|
w.seen["hermes"] = "sha256:old"
|
||||||
|
|
||||||
|
flaky := func(_ context.Context, rt string) (string, error) {
|
||||||
|
if rt == "claude-code" {
|
||||||
|
return "", errors.New("registry hiccup")
|
||||||
|
}
|
||||||
|
return "sha256:new", nil
|
||||||
|
}
|
||||||
|
w.tick(context.Background(), flaky)
|
||||||
|
|
||||||
|
// hermes moved → 1 refresh fired.
|
||||||
|
if svc.callCount() != 1 || svc.calls[0][0] != "hermes" {
|
||||||
|
t.Errorf("expected hermes-only refresh after claude-code fetch error, got calls=%v", svc.calls)
|
||||||
|
}
|
||||||
|
// claude-code's seen digest must not be touched (no remote observed).
|
||||||
|
if got := w.seen["claude-code"]; got != "sha256:old" {
|
||||||
|
t.Errorf("fetch error must leave seen digest untouched, got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestShortDigest(t *testing.T) {
|
||||||
|
cases := map[string]string{
|
||||||
|
"sha256:abcdef0123456789": "sha256:abcdef012345",
|
||||||
|
"sha256:short": "sha256:short",
|
||||||
|
"": "",
|
||||||
|
"no-colon-format": "no-colon-format",
|
||||||
|
"sha256:0000000000000000abcd": "sha256:000000000000",
|
||||||
|
}
|
||||||
|
for in, want := range cases {
|
||||||
|
if got := shortDigest(in); got != want {
|
||||||
|
t.Errorf("shortDigest(%q): got %q, want %q", in, got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user