test(handlers): introduce events.EventEmitter interface (#1814 partial)

The 3 skipped tests in workspace_provision_test.go (#1206 regression
tests) were blocked because captureBroadcaster's struct-embed wouldn't
type-check against WorkspaceHandler.broadcaster's concrete
*events.Broadcaster field. This PR fixes the interface blocker for
the 2 broadcaster-related tests; the 3rd (plugins.Registry resolver)
is a separate blocker tracked elsewhere.

Changes:

- internal/events/broadcaster.go: define `EventEmitter` interface with
  RecordAndBroadcast + BroadcastOnly. *Broadcaster satisfies it via
  its existing methods (compile-time assertion guards future drift).
  SubscribeSSE / Subscribe stay off the interface because only sse.go
  + cmd/server/main.go call them, and both still hold the concrete
  *Broadcaster.

- internal/handlers/workspace.go: WorkspaceHandler.broadcaster type
  changes from *events.Broadcaster to events.EventEmitter.
  NewWorkspaceHandler signature updated to match. Production callers
  unchanged — they pass *events.Broadcaster, which the interface
  accepts.

- internal/handlers/activity.go: LogActivity takes events.EventEmitter
  for the same reason — tests passing a stub no longer need to
  construct the full broadcaster.

- internal/handlers/workspace_provision_test.go: captureBroadcaster
  drops the struct embed (no more zero-value Broadcaster underlying
  the SSE+hub fields), implements RecordAndBroadcast directly, and
  adds a no-op BroadcastOnly to satisfy the interface. Skip messages
  on the 2 empty broadcaster-blocked tests updated to reflect the
  new "interface unblocked, test body still needed" state.

Verified `go build ./...`, `go test ./internal/handlers/`, and
`go vet ./...` all clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hongming Wang 2026-04-26 09:05:52 -07:00
parent d86eabdd58
commit 7d48f24fef
4 changed files with 45 additions and 8 deletions

View File

@ -15,6 +15,29 @@ import (
const broadcastChannel = "events:broadcast"
// EventEmitter is the contract handler code needs from a broadcaster.
// Defining it here lets tests substitute a capture-only stub instead
// of standing up the full Redis + WebSocket hub topology that the
// concrete *Broadcaster builds (and that previously blocked
// TestProvisionWorkspace_* regression tests on issue #1814).
//
// Includes BroadcastOnly because the activity-log + A2A-response paths
// inside the handler package fan out via that method — narrowing
// further would force production callers back to the concrete type.
//
// *Broadcaster satisfies this interface trivially. Production code that
// needs the wider surface (SubscribeSSE, Subscribe) keeps using the
// concrete *Broadcaster type — sse.go + cmd/server/main.go are the
// only such call sites today.
type EventEmitter interface {
RecordAndBroadcast(ctx context.Context, eventType string, workspaceID string, payload interface{}) error
BroadcastOnly(workspaceID string, eventType string, payload interface{})
}
// Compile-time assertion: a renamed/reshaped Broadcaster method that
// silently broke this interface would fail to build here.
var _ EventEmitter = (*Broadcaster)(nil)
// sseSubscription is a single in-process SSE subscriber.
// deliverToSSE writes to ch; StreamEvents reads from it.
type sseSubscription struct {

View File

@ -373,7 +373,9 @@ func (h *ActivityHandler) Report(c *gin.Context) {
}
// LogActivity inserts an activity log and optionally broadcasts via WebSocket.
func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params ActivityParams) {
// Takes events.EventEmitter (#1814) so callers passing a stub broadcaster
// in tests no longer need to construct the full *events.Broadcaster.
func LogActivity(ctx context.Context, broadcaster events.EventEmitter, params ActivityParams) {
reqJSON, reqErr := json.Marshal(params.RequestBody)
if reqErr != nil {
log.Printf("LogActivity: failed to marshal request_body for %s: %v", params.WorkspaceID, reqErr)

View File

@ -26,7 +26,13 @@ import (
)
type WorkspaceHandler struct {
broadcaster *events.Broadcaster
// broadcaster narrowed from `*events.Broadcaster` to the
// events.EventEmitter interface (#1814) so tests can substitute a
// capture-only stub without standing up the real Redis + WS-hub
// topology. Production callers still pass *events.Broadcaster, which
// satisfies the interface — see the compile-time assertion in
// internal/events/broadcaster.go.
broadcaster events.EventEmitter
provisioner *provisioner.Provisioner
cpProv *provisioner.CPProvisioner
platformURL string
@ -46,7 +52,7 @@ type WorkspaceHandler struct {
provisionTimeouts runtimeProvisionTimeoutsCache
}
func NewWorkspaceHandler(b *events.Broadcaster, p *provisioner.Provisioner, platformURL, configsDir string) *WorkspaceHandler {
func NewWorkspaceHandler(b events.EventEmitter, p *provisioner.Provisioner, platformURL, configsDir string) *WorkspaceHandler {
return &WorkspaceHandler{
broadcaster: b,
provisioner: p,

View File

@ -9,7 +9,6 @@ import (
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/plugins"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
@ -1044,13 +1043,20 @@ var errInternalDB = fmt.Errorf("pq: connection refused")
var errInternalOS = fmt.Errorf("operation failed: no such file or directory")
// captureBroadcaster is a test broadcaster that captures the last data
// payload passed to RecordAndBroadcast so tests can inspect it.
// payload passed to RecordAndBroadcast so tests can inspect it. Now
// satisfies events.EventEmitter (#1814) directly — RecordAndBroadcast
// captures, BroadcastOnly is a no-op since none of the
// WorkspaceHandler paths under test call it.
type captureBroadcaster struct {
events.Broadcaster // embed to satisfy the interface — only RecordAndBroadcast is overridden
lastData map[string]interface{}
lastErr error
}
// BroadcastOnly is required to satisfy events.EventEmitter. None of the
// captureBroadcaster's exercising tests should land here — if a future
// test does, it'll need to add capture state for that channel.
func (c *captureBroadcaster) BroadcastOnly(_ string, _ string, _ interface{}) {}
func (c *captureBroadcaster) RecordAndBroadcast(_ context.Context, _, _ string, data interface{}) error {
if m, ok := data.(map[string]interface{}); ok {
// Shallow-copy so the caller can't mutate our capture.
@ -1107,14 +1113,14 @@ func containsUnsafeString(v interface{}) bool {
// never leaks internal error details in WORKSPACE_PROVISION_FAILED broadcasts.
// Regression test for issue #1206.
func TestProvisionWorkspace_NoInternalErrorsInBroadcast(t *testing.T) {
t.Skip("TODO: captureBroadcaster type mismatch with WorkspaceHandler.broadcaster (*events.Broadcaster). Needs broadcaster interface refactor — currently blocking package compile on main (2026-04-21).")
t.Skip("TODO: write the test body. The interface blocker (#1814) is fixed — captureBroadcaster now satisfies events.EventEmitter and can be passed to NewWorkspaceHandler. The remaining work is constructing the provisionWorkspace failure path + asserting captured payload doesn't contain unsafeErrorStrings.")
}
// TestProvisionWorkspaceCP_NoInternalErrorsInBroadcast asserts that
// provisionWorkspaceCP never leaks err.Error() in WORKSPACE_PROVISION_FAILED
// broadcasts. Regression test for issue #1206.
func TestProvisionWorkspaceCP_NoInternalErrorsInBroadcast(t *testing.T) {
t.Skip("TODO: captureBroadcaster type mismatch with WorkspaceHandler.broadcaster (*events.Broadcaster). Needs broadcaster interface refactor — currently blocking package compile on main (2026-04-21).")
t.Skip("TODO: write the test body. Same status as TestProvisionWorkspace_NoInternalErrorsInBroadcast — interface blocker fixed (#1814), need to construct the provisionWorkspaceCP failure path + assert payload sanitization.")
}
// mockEnvMutator is a provisionhook.Registry stub that always returns a fixed error.