Merge pull request #119 from Molecule-AI/fix/111-112-clean

fix(security+scheduler): IPv6 SSRF gap + scheduler unit tests [supersedes #111, #112]
This commit is contained in:
Hongming Wang 2026-04-15 09:36:59 -07:00 committed by GitHub
commit 06fed1776a
4 changed files with 242 additions and 21 deletions

View File

@ -37,6 +37,13 @@ func NewRegistryHandler(b *events.Broadcaster) *RegistryHandler {
// - 10.0.0.0/8 RFC-1918 — lateral movement within private networks
// - 172.16.0.0/12 RFC-1918 — includes Docker bridge/overlay ranges
// - 192.168.0.0/16 RFC-1918 — home/office LAN ranges
// - fe80::/10 IPv6 link-local — same threat class as 169.254.x.x
// - ::1/128 IPv6 loopback
// - fc00::/7 IPv6 ULA (RFC-4193 private ranges)
//
// IPv4-mapped IPv6 (e.g. ::ffff:169.254.169.254) is normalised to IPv4 by
// Go's net.ParseIP.To4() before Contains() runs, so the IPv4 rules above
// catch those without a separate entry.
//
// Returns a non-nil error suitable for including in a 400 Bad Request response.
func validateAgentURL(rawURL string) error {
@ -59,16 +66,19 @@ func validateAgentURL(rawURL string) error {
cidr string
label string
}{
{"169.254.0.0/16", "link-local (cloud metadata endpoint)"},
{"127.0.0.0/8", "loopback"},
{"10.0.0.0/8", "RFC-1918 private"},
{"172.16.0.0/12", "RFC-1918 private"},
{"192.168.0.0/16", "RFC-1918 private"},
{"169.254.0.0/16", "link-local address (cloud metadata endpoint)"},
{"127.0.0.0/8", "loopback address"},
{"10.0.0.0/8", "RFC-1918 private address"},
{"172.16.0.0/12", "RFC-1918 private address"},
{"192.168.0.0/16", "RFC-1918 private address"},
{"fe80::/10", "IPv6 link-local address (cloud metadata analogue)"},
{"::1/128", "IPv6 loopback address"},
{"fc00::/7", "IPv6 ULA address (RFC-4193 private)"},
}
for _, r := range blockedRanges {
_, network, _ := net.ParseCIDR(r.cidr)
if network.Contains(ip) {
return errors.New("private/reserved IP ranges are not permitted")
return fmt.Errorf("url targets a blocked address: %s", r.label)
}
}
}

View File

@ -480,6 +480,17 @@ func TestValidateAgentURL(t *testing.T) {
{"blocked RFC1918 192.168.0.1", "http://192.168.0.1:8080", true},
{"blocked RFC1918 192.168.1.100", "http://192.168.1.100:8080", true},
{"blocked RFC1918 192.168.255.254", "http://192.168.255.254:8080", true},
// ── Must be rejected: IPv6 SSRF vectors (C6 gap) ─────────────────────
// Go's IPv4 CIDRs do not match pure IPv6 addresses via Contains(), so
// each IPv6 range needs an explicit blocklist entry.
{"blocked IPv6 loopback [::1]", "http://[::1]:8080", true},
{"blocked IPv6 link-local [fe80::1]", "http://[fe80::1]:8080", true},
{"blocked IPv6 ULA [fd00::1]", "http://[fd00::1]:8080", true},
// IPv4-mapped IPv6 for a blocked range must also be rejected.
// Go normalises ::ffff:169.254.x.x to IPv4 via To4(), so the existing
// 169.254.0.0/16 entry catches it without a dedicated rule.
{"blocked IPv4-mapped IPv6 link-local", "http://[::ffff:169.254.169.254]:80", true},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {

View File

@ -53,32 +53,38 @@ type Scheduler struct {
// (whether it fired schedules or not). Read by Healthy() and the
// /admin/scheduler/health endpoint to detect stuck-tick conditions.
// Atomic-ish via the mutex; tick rate is 30s so contention is trivial.
mu sync.RWMutex
lastTickAt time.Time
mu sync.RWMutex
lastTickAt time.Time
tickInterval time.Duration // defaults to pollInterval; overridable in tests
}
func New(proxy A2AProxy, broadcaster Broadcaster) *Scheduler {
return &Scheduler{proxy: proxy, broadcaster: broadcaster}
return &Scheduler{
proxy: proxy,
broadcaster: broadcaster,
tickInterval: pollInterval,
}
}
// LastTickAt returns the wall-clock time of the most recent successful tick.
// Returns the zero Time if Start() has never been called or no tick has
// completed since process start.
// LastTickAt returns the wall-clock time of the most recently completed tick.
// Returns a zero time.Time if the scheduler has never completed a tick.
func (s *Scheduler) LastTickAt() time.Time {
s.mu.RLock()
defer s.mu.RUnlock()
return s.lastTickAt
}
// Healthy returns true if a tick completed within the last 2× pollInterval
// (i.e. at most 1 missed tick is tolerated). Use from /health and from
// /admin/scheduler/health to surface scheduler liveness.
// Healthy returns true if the scheduler has completed a tick within the last
// 2×pollInterval window. Returns false before the first tick or if the
// scheduler is stalled.
func (s *Scheduler) Healthy() bool {
last := s.LastTickAt()
if last.IsZero() {
s.mu.RLock()
t := s.lastTickAt
s.mu.RUnlock()
if t.IsZero() {
return false
}
return time.Since(last) < 2*pollInterval
return time.Since(t) < 2*pollInterval
}
// Start runs the scheduler poll loop. Blocks until ctx is cancelled.
@ -89,10 +95,10 @@ func (s *Scheduler) Healthy() bool {
// is "no crons firing" — which we observed as a 12+ hour silent outage
// on 2026-04-14 (issue #85).
func (s *Scheduler) Start(ctx context.Context) {
ticker := time.NewTicker(pollInterval)
ticker := time.NewTicker(s.tickInterval)
defer ticker.Stop()
log.Printf("Scheduler: started (poll interval=%s)", pollInterval)
log.Printf("Scheduler: started (poll interval=%s)", s.tickInterval)
tickWithRecover := func() {
defer func() {
@ -198,10 +204,24 @@ func (s *Scheduler) tick(ctx context.Context) {
log.Printf("Scheduler: rows error: %v", err)
}
wg.Wait()
// Record tick completion time for health monitoring.
s.mu.Lock()
s.lastTickAt = time.Now()
s.mu.Unlock()
}
// fireSchedule sends the A2A message and updates the schedule row.
// A deferred recover guards against panics in the A2A proxy so that a single
// misbehaving workspace cannot crash the scheduler goroutine pool.
func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
defer func() {
if r := recover(); r != nil {
log.Printf("Scheduler: panic recovered in fireSchedule for '%s' (%s): %v",
sched.Name, sched.ID, r)
}
}()
fireCtx, cancel := context.WithTimeout(ctx, fireTimeout)
defer cancel()
@ -296,7 +316,7 @@ func truncate(s string, maxLen int) string {
func ComputeNextRun(cronExpr, tz string, after time.Time) (time.Time, error) {
loc, err := time.LoadLocation(tz)
if err != nil {
loc = time.UTC
return time.Time{}, fmt.Errorf("invalid timezone %q: %w", tz, err)
}
parser := cronlib.NewParser(cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.Dow)

View File

@ -0,0 +1,180 @@
package scheduler
import (
"context"
"testing"
"time"
sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
// setupTestDB replaces the global db.DB with a sqlmock and returns the mock
// handle. The real DB is restored (by closing the mock conn) via t.Cleanup.
func setupTestDB(t *testing.T) sqlmock.Sqlmock {
t.Helper()
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
db.DB = mockDB
t.Cleanup(func() { mockDB.Close() })
return mock
}
// panicProxy is a test double whose ProxyA2ARequest always panics.
// Used by TestPanicRecovery to verify the scheduler's goroutine-level
// panic recovery in fireSchedule.
type panicProxy struct{}
func (p *panicProxy) ProxyA2ARequest(
_ context.Context, _ string, _ []byte, _ string, _ bool,
) (int, []byte, error) {
panic("simulated A2A proxy panic")
}
// ── TestLastTickAt_zero ───────────────────────────────────────────────────────
// TestLastTickAt_zero confirms that LastTickAt returns a zero time.Time on a
// freshly-created scheduler that has never been started or ticked.
func TestLastTickAt_zero(t *testing.T) {
s := New(nil, nil)
if got := s.LastTickAt(); !got.IsZero() {
t.Errorf("LastTickAt() before any tick = %v, want zero time.Time", got)
}
}
// ── TestHealthy_beforeStart ───────────────────────────────────────────────────
// TestHealthy_beforeStart confirms that Healthy returns false when lastTickAt
// is zero (scheduler created but never ticked).
func TestHealthy_beforeStart(t *testing.T) {
s := New(nil, nil)
if s.Healthy() {
t.Error("Healthy() = true on a scheduler that has never ticked, want false")
}
}
// ── TestHealthy_freshTick ─────────────────────────────────────────────────────
// TestHealthy_freshTick sets lastTickAt to the current time — mirroring what
// tick() does after a completed poll cycle — and confirms Healthy returns true.
func TestHealthy_freshTick(t *testing.T) {
s := New(nil, nil)
// Simulate what tick() writes after wg.Wait() returns.
s.mu.Lock()
s.lastTickAt = time.Now()
s.mu.Unlock()
if !s.Healthy() {
t.Error("Healthy() = false immediately after a fresh tick timestamp, want true")
}
}
// ── TestHealthy_stale ─────────────────────────────────────────────────────────
// TestHealthy_stale backdates lastTickAt by 3×pollInterval (well beyond the
// 2×pollInterval liveness window) and confirms Healthy returns false.
func TestHealthy_stale(t *testing.T) {
s := New(nil, nil)
s.mu.Lock()
s.lastTickAt = time.Now().Add(-3 * pollInterval) // 90 s ago; threshold is 60 s
s.mu.Unlock()
if s.Healthy() {
t.Errorf("Healthy() = true when lastTickAt is 3×pollInterval (%s) ago, want false",
3*pollInterval)
}
}
// ── TestComputeNextRun_valid ──────────────────────────────────────────────────
// TestComputeNextRun_valid checks that "0 * * * *" (top-of-hour) returns a
// future time whose Minute() == 0 when the reference is mid-hour.
func TestComputeNextRun_valid(t *testing.T) {
// 2025-01-01 12:30 UTC — clearly mid-hour so "next" top-of-hour is 13:00.
ref := time.Date(2025, 1, 1, 12, 30, 0, 0, time.UTC)
next, err := ComputeNextRun("0 * * * *", "UTC", ref)
if err != nil {
t.Fatalf("ComputeNextRun(valid expr) returned unexpected error: %v", err)
}
if !next.After(ref) {
t.Errorf("ComputeNextRun() = %v, want a time strictly after ref %v", next, ref)
}
if next.Minute() != 0 {
t.Errorf("ComputeNextRun() minute = %d, want 0 (top of hour)", next.Minute())
}
}
// ── TestComputeNextRun_invalid ────────────────────────────────────────────────
// TestComputeNextRun_invalid confirms that an unparseable cron expression
// returns a non-nil error.
func TestComputeNextRun_invalid(t *testing.T) {
_, err := ComputeNextRun("not-a-cron", "UTC", time.Now())
if err == nil {
t.Error("ComputeNextRun(invalid cron expr) = nil, want non-nil error")
}
}
// ── TestComputeNextRun_invalidTimezone ────────────────────────────────────────
// TestComputeNextRun_invalidTimezone confirms that an unrecognised IANA
// timezone name returns a non-nil error (rather than silently falling back
// to UTC, which could mask misconfigured schedules).
func TestComputeNextRun_invalidTimezone(t *testing.T) {
_, err := ComputeNextRun("0 * * * *", "Not/AZone", time.Now())
if err == nil {
t.Error("ComputeNextRun(invalid tz) = nil, want non-nil error")
}
}
// ── TestPanicRecovery ─────────────────────────────────────────────────────────
// TestPanicRecovery verifies that a panic inside a fireSchedule goroutine does
// NOT crash the scheduler.
//
// The test calls tick() directly with a sqlmock that surfaces one due schedule.
// panicProxy causes ProxyA2ARequest to panic; the deferred recover() in
// fireSchedule catches it. After tick() returns, lastTickAt must be set and
// Healthy() must return true — proving the scheduler survived.
//
// Without panic recovery an unrecovered goroutine panic terminates the entire
// test binary, so the test completing is itself evidence that recovery worked.
func TestPanicRecovery(t *testing.T) {
mock := setupTestDB(t)
// WorkspaceID must be ≥12 chars: fireSchedule slices it with [:12] for logging.
schedRows := sqlmock.NewRows(
[]string{"id", "workspace_id", "name", "cron_expr", "timezone", "prompt"},
).AddRow(
"sched-panic-01", // id
"ws-panic-workspace-1", // workspace_id (21 chars > 12)
"panic-job", // name
"* * * * *", // cron_expr
"UTC", // timezone
"fire and panic", // prompt
)
mock.ExpectQuery(`SELECT id, workspace_id`).WillReturnRows(schedRows)
s := New(&panicProxy{}, nil)
// tick() launches fireSchedule in a goroutine that will panic.
// If there is no recovery, the goroutine crash terminates the process here.
s.tick(context.Background())
// tick() returned normally → the panic was caught, wg.Wait() completed,
// and lastTickAt was updated.
if !s.Healthy() {
t.Error("Healthy() = false after panic-recovery tick, want true " +
"— scheduler must survive a panicking A2A proxy")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet DB expectations: %v", err)
}
}