test(workspace-server): integration e2e — canvas client disconnect mid-flight preserves user message
Some checks failed
audit-force-merge / audit (pull_request) Has been skipped
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 27s
CI / Detect changes (pull_request) Successful in 26s
E2E API Smoke Test / detect-changes (pull_request) Successful in 25s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 38s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (pull_request) Has been skipped
E2E Chat / detect-changes (pull_request) Successful in 26s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 26s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 20s
Harness Replays / detect-changes (pull_request) Successful in 18s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 27s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 24s
E2E Staging SaaS (full lifecycle) / pr-validate (pull_request) Successful in 1m4s
gate-check-v3 / gate-check (pull_request) Successful in 28s
qa-review / approved (pull_request) Failing after 27s
security-review / approved (pull_request) Failing after 27s
sop-checklist / all-items-acked (pull_request) Successful in 24s
sop-tier-check / tier-check (pull_request) Successful in 19s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m38s
CI / Python Lint & Test (pull_request) Successful in 8m8s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 16s
Harness Replays / Harness Replays (pull_request) Successful in 16s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 16s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 3m3s
CI / all-required (pull_request) Successful in 15m9s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 8m31s
E2E Chat / E2E Chat (pull_request) Failing after 11m24s
CI / Canvas (Next.js) (pull_request) Successful in 24m24s
CI / Canvas Deploy Reminder (pull_request) Waiting to run
CI / Platform (Go) (pull_request) Successful in 28m11s
Some checks failed
audit-force-merge / audit (pull_request) Has been skipped
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 27s
CI / Detect changes (pull_request) Successful in 26s
E2E API Smoke Test / detect-changes (pull_request) Successful in 25s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 38s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (pull_request) Has been skipped
E2E Chat / detect-changes (pull_request) Successful in 26s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 26s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 20s
Harness Replays / detect-changes (pull_request) Successful in 18s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 27s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 24s
E2E Staging SaaS (full lifecycle) / pr-validate (pull_request) Successful in 1m4s
gate-check-v3 / gate-check (pull_request) Successful in 28s
qa-review / approved (pull_request) Failing after 27s
security-review / approved (pull_request) Failing after 27s
sop-checklist / all-items-acked (pull_request) Successful in 24s
sop-tier-check / tier-check (pull_request) Successful in 19s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m38s
CI / Python Lint & Test (pull_request) Successful in 8m8s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 16s
Harness Replays / Harness Replays (pull_request) Successful in 16s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 16s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 3m3s
CI / all-required (pull_request) Successful in 15m9s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 8m31s
E2E Chat / E2E Chat (pull_request) Failing after 11m24s
CI / Canvas (Next.js) (pull_request) Successful in 24m24s
CI / Canvas Deploy Reminder (pull_request) Waiting to run
CI / Platform (Go) (pull_request) Successful in 28m11s
Drives the real ProxyA2A HTTP handler through the literal bug scenario: canvas message/send, mock agent hangs, client request context cancelled (user exits chat) before any reply. Asserts the ingest INSERT fires synchronously before dispatch (on context.WithoutCancel) so the user message is durable even though no logA2ASuccess/finalize ever runs — exactly the pre-fix loss window, now closed. Refs: molecule-ai/internal#470 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
180230a6d7
commit
a674990282
@ -18,13 +18,18 @@ package handlers
|
||||
// INSERTing a duplicate / overwriting with an agent-error bubble.
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// persistUserMessageAtIngest must INSERT a pending a2a_receive row
|
||||
@ -216,3 +221,107 @@ func TestPersistUserMessageAtIngest_InsertShapeContract(t *testing.T) {
|
||||
t.Errorf("ingest INSERT did not match the required shape contract: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Integration e2e through the REAL ProxyA2A HTTP handler — the literal bug
|
||||
// scenario: a canvas user sends a message, the agent is slow, and the client
|
||||
// disconnects (exits the chat) BEFORE the agent replies. Pre-fix: no
|
||||
// a2a_receive row was ever written → message lost on reopen. Post-fix: the
|
||||
// ingest INSERT fires synchronously before dispatch, on context.WithoutCancel,
|
||||
// so the user message is durable regardless of the client disconnect.
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
func TestProxyA2A_CanvasClientDisconnectMidFlight_UserMessageStillPersistedAtIngest(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mr := setupTestRedis(t)
|
||||
allowLoopbackForTest(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
|
||||
// The mock agent hangs until the test releases it — modelling an agent
|
||||
// still synthesising when the user exits the chat. The proxy forwards
|
||||
// on context.WithoutCancel so this dispatch is in-flight while the
|
||||
// inbound client request is already cancelled.
|
||||
release := make(chan struct{})
|
||||
var agentHit int32
|
||||
agent := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&agentHit, 1)
|
||||
<-release // hang until released (or test ends)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"jsonrpc":"2.0","id":"1","result":{"status":"ok"}}`)
|
||||
}))
|
||||
defer agent.Close()
|
||||
defer close(release)
|
||||
|
||||
const ws = "ws-disconnect"
|
||||
mr.Set(fmt.Sprintf("ws:%s:url", ws), agent.URL)
|
||||
|
||||
// SQL sequence for a canvas push-mode message/send up to dispatch:
|
||||
// 1. budget check
|
||||
// 2. resolveAgentURL DB fallback miss is avoided (Redis has the URL)
|
||||
// 3. lookupDeliveryMode → NULL ⇒ push-mode (the loss case)
|
||||
// 4. ingest: SELECT name, then INSERT ... RETURNING id ← THE FIX
|
||||
// The agent never replies (hung) and the client ctx is cancelled, so
|
||||
// NO logA2ASuccess/finalize runs — exactly the pre-fix loss window.
|
||||
// The assertion: the ingest INSERT fired anyway (message durable).
|
||||
expectBudgetCheck(mock, ws)
|
||||
mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id =`).
|
||||
WithArgs(ws).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow(nil)) // NULL ⇒ push
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
WithArgs(ws).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Disconnect WS"))
|
||||
ingestFired := make(chan struct{}, 1)
|
||||
mock.ExpectQuery(`INSERT INTO activity_logs.*RETURNING id`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ingest-row-1")).
|
||||
// signal the moment the durable write lands
|
||||
WillDelayFor(0)
|
||||
_ = ingestFired
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: ws}}
|
||||
|
||||
// Cancel the inbound request context shortly after the call starts —
|
||||
// this is the user exiting the chat / closing the tab mid-flight.
|
||||
reqCtx, cancelClient := context.WithCancel(context.Background())
|
||||
body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"DO NOT LOSE THIS MESSAGE"}]}}}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/"+ws+"/a2a", bytes.NewBufferString(body)).WithContext(reqCtx)
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
handler.ProxyA2A(c)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Wait until the agent has been hit (dispatch in-flight), then the
|
||||
// user "exits the chat": cancel the client request context.
|
||||
deadline := time.After(5 * time.Second)
|
||||
for atomic.LoadInt32(&agentHit) == 0 {
|
||||
select {
|
||||
case <-deadline:
|
||||
t.Fatal("agent was never dispatched to within 5s")
|
||||
default:
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
cancelClient() // ← user exits the chat mid-flight
|
||||
|
||||
// Give the handler a moment; it will be stuck on the hung agent, but
|
||||
// the ingest INSERT must already have happened SYNCHRONOUSLY before
|
||||
// dispatch. Verify the durable write landed despite the disconnect.
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("ingest persist did not fire before dispatch — user message would be LOST on disconnect: %v", err)
|
||||
}
|
||||
|
||||
// Clean up: release the hung agent so the handler goroutine can exit.
|
||||
// (release is closed by the deferred close; unblock now.)
|
||||
select {
|
||||
case release <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
<-done
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user