fix(QA-audit #307 #308): asyncio lifecycle fix + push-mode queue support #317

Closed
core-be wants to merge 3 commits from fix/qa-audit-307-308 into main
7 changed files with 124 additions and 58 deletions

View File

@ -4,7 +4,6 @@ go 1.25.0
require (
github.com/DATA-DOG/go-sqlmock v1.5.2
go.moleculesai.app/plugin/gh-identity v0.0.0-20260509010445-788988195fce
github.com/alicebob/miniredis/v2 v2.37.0
github.com/creack/pty v1.1.24
github.com/docker/docker v28.5.2+incompatible
@ -19,6 +18,7 @@ require (
github.com/opencontainers/image-spec v1.1.1
github.com/redis/go-redis/v9 v9.19.0
github.com/robfig/cron/v3 v3.0.1
go.moleculesai.app/plugin/gh-identity v0.0.0-20260509010445-788988195fce
golang.org/x/crypto v0.50.0
gopkg.in/yaml.v3 v3.0.1
)

View File

@ -4,8 +4,6 @@ github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7Oputl
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/Molecule-AI/molecule-ai-plugin-gh-identity v0.0.0-20260424033845-4fd5ac7be30f h1:YkLRhUg+9qr9OV9N8dG1Hj0Ml7TThHlRwh5F//oUJVs=
github.com/Molecule-AI/molecule-ai-plugin-gh-identity v0.0.0-20260424033845-4fd5ac7be30f/go.mod h1:NqdtlWZDJvpXNJRHnMkPhTKHdA1LZTNH+63TB66JSOU=
github.com/alicebob/miniredis/v2 v2.37.0 h1:RheObYW32G1aiJIj81XVt78ZHJpHonHLHW7OLIshq68=
github.com/alicebob/miniredis/v2 v2.37.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
@ -154,6 +152,8 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
go.moleculesai.app/plugin/gh-identity v0.0.0-20260509010445-788988195fce h1:ftm0ba0ukLlfqeFes+/jWnXH8XULXmRpMy3fOCZ83/U=
go.moleculesai.app/plugin/gh-identity v0.0.0-20260509010445-788988195fce/go.mod h1:0aAqoDle2V7Cywso94MXdv1DH/HEe/0oZmcbqWYMK7g=
go.mongodb.org/mongo-driver/v2 v2.5.0 h1:yXUhImUjjAInNcpTcAlPHiT7bIXhshCTL3jVBkF3xaE=
go.mongodb.org/mongo-driver/v2 v2.5.0/go.mod h1:yOI9kBsufol30iFsl1slpdq1I0eHPzybRWdyYUs8K/0=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=

View File

@ -61,15 +61,26 @@ const DriftSweepInterval = 1 * time.Hour
// that handles Gitea instances on high-latency links.
const ResolveRefDeadline = 60 * time.Second
// PluginResolver resolves plugin sources to installable directories.
// Satisfied by *Registry (which wraps GithubResolver + LocalResolver).
// PluginResolver is the registry-level abstraction the sweeper consumes:
// pick a per-scheme SourceResolver for a parsed Source, and enumerate the
// registered schemes so we can strip the prefix from a stored source_raw.
//
// Resolve returns the production SourceResolver from source.go (NOT another
// PluginResolver) — that's the actual shape of *Registry.Resolve, and the
// sweeper only needs the per-scheme resolver's identity, not its Fetch.
//
// Named PluginResolver (not SourceResolver) to avoid redeclaring the
// SourceResolver interface defined in source.go (core#228 fix).
// per-scheme SourceResolver interface defined in source.go (core#228 fix).
// Satisfied by *Registry from source.go via Resolve + Schemes.
type PluginResolver interface {
Resolve(source Source) (PluginResolver, error)
Resolve(source Source) (SourceResolver, error)
Schemes() []string
}
// Compile-time assertion: *Registry satisfies PluginResolver. Catches any
// future drift in Registry.Resolve / Schemes signatures at build time.
var _ PluginResolver = (*Registry)(nil)
// StartPluginDriftSweeper runs the drift-detection loop until ctx is cancelled.
// Pass a nil resolver to disable the sweeper (useful for harnesses or CP/SaaS
// mode where git operations are unavailable).

View File

@ -499,15 +499,6 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
r.POST("/admin/workspace-images/refresh", middleware.AdminAuth(db.DB), imgH.Refresh)
}
// Admin — plugin version-subscription drift queue (core#123).
// List pending drift entries and apply approved updates.
{
driftH := handlers.NewAdminPluginDriftHandler(plgh)
adminAuth := r.Group("", middleware.AdminAuth(db.DB))
adminAuth.GET("/admin/plugin-updates-pending", driftH.ListPending)
adminAuth.POST("/admin/plugin-updates/:id/apply", driftH.Apply)
}
// Admin — test token minting (issue #6). Hidden in production via TestTokensEnabled().
// NOT behind AdminAuth — this is the bootstrap endpoint E2E tests and
// fresh installs use to obtain their first admin bearer. Adding AdminAuth
@ -635,6 +626,15 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
if pluginResolver != nil {
plgh = plgh.WithSourceResolver(pluginResolver)
}
// Admin — plugin version-subscription drift queue (core#123).
// List pending drift entries and apply approved updates.
{
driftH := handlers.NewAdminPluginDriftHandler(plgh)
adminAuth := r.Group("", middleware.AdminAuth(db.DB))
adminAuth.GET("/admin/plugin-updates-pending", driftH.ListPending)
adminAuth.POST("/admin/plugin-updates/:id/apply", driftH.Apply)
}
r.GET("/plugins", plgh.ListRegistry)
r.GET("/plugins/sources", plgh.ListSources)
wsAuth.GET("/plugins", plgh.ListInstalled)

View File

@ -153,6 +153,7 @@ _KEY_RETRY_AFTER = "retry_after"
_STATUS_QUEUED = "queued"
_DELIVERY_MODE_POLL = "poll"
_DELIVERY_MODE_PUSH = "push"
def parse(data: Any) -> Variant:
@ -165,8 +166,8 @@ def parse(data: Any) -> Variant:
The order of checks matters:
1. Non-dict input Malformed (server contract is dict-shaped).
2. Poll-queued envelope is checked BEFORE result/error because a
server bug that sets both ``status=queued`` and ``result``
2. Poll-queued or push-queued envelope is checked BEFORE result/error
because a server bug that sets both ``status=queued`` and ``result``
should be loud, not silently treated as Result.
3. ``result`` Result (the JSON-RPC success path).
4. ``error`` Error (JSON-RPC error or platform error).
@ -179,20 +180,23 @@ def parse(data: Any) -> Variant:
)
return Malformed(raw=data)
# Poll-queued envelope. Both keys must be present — the workspace
# server sets them together; if only one is present the body is
# ambiguous and we route to Malformed for visibility.
# Poll-queued or push-queued envelope. Both status and delivery_mode
# must be present — the workspace server sets them together; if only
# one is present the body is ambiguous and we route to Malformed for
# visibility.
if (
data.get(_KEY_STATUS) == _STATUS_QUEUED
and data.get(_KEY_DELIVERY_MODE) == _DELIVERY_MODE_POLL
and data.get(_KEY_DELIVERY_MODE) is not None
):
method_raw = data.get(_KEY_METHOD)
method = str(method_raw) if method_raw is not None else "unknown"
delivery_mode = str(data.get(_KEY_DELIVERY_MODE))
logger.info(
"a2a_response.parse: queued for poll-mode peer (method=%s)",
"a2a_response.parse: queued for %s-mode peer (method=%s)",
delivery_mode,
method,
)
return Queued(method=method)
return Queued(method=method, delivery_mode=delivery_mode)
# JSON-RPC success.
if _KEY_RESULT in data:

View File

@ -105,6 +105,20 @@ _FIXTURES = {
"status": "queued",
"delivery_mode": "poll",
},
"push_queued_full": {
"status": "queued",
"delivery_mode": "push",
"method": "message/send",
},
"push_queued_notify": {
"status": "queued",
"delivery_mode": "push",
"method": "notify",
},
"push_queued_no_method": {
"status": "queued",
"delivery_mode": "push",
},
"malformed_empty_dict": {},
"malformed_unexpected_keys": {"foo": "bar", "baz": 42},
"malformed_status_queued_no_delivery_mode": {
@ -159,6 +173,30 @@ class TestQueuedVariant:
a2a_response.parse(_FIXTURES["poll_queued_full"])
assert any("queued for poll-mode peer" in r.message for r in caplog.records)
# Push-mode tests
def test_push_queued_full_envelope(self):
v = a2a_response.parse(_FIXTURES["push_queued_full"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "message/send"
assert v.delivery_mode == "push"
def test_push_queued_notify(self):
v = a2a_response.parse(_FIXTURES["push_queued_notify"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "notify"
assert v.delivery_mode == "push"
def test_push_queued_missing_method_uses_unknown(self):
v = a2a_response.parse(_FIXTURES["push_queued_no_method"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "unknown"
assert v.delivery_mode == "push"
def test_logs_info_on_push_queued(self, caplog):
with caplog.at_level(logging.INFO, logger="a2a_response"):
a2a_response.parse(_FIXTURES["push_queued_full"])
assert any("queued for push-mode peer" in r.message for r in caplog.records)
class TestResultVariant:
"""``parse()`` extracts the JSON-RPC ``result`` envelope into
@ -361,7 +399,7 @@ _ADVERSARIAL_INPUTS: list[Any] = [
{"error": {"message": None, "code": None}},
{"error": {"message": ["nested", "list"]}},
{"status": None, "delivery_mode": None, "method": None},
{"status": "queued", "delivery_mode": "push", "method": "x"}, # wrong delivery_mode
{"status": "queued", "delivery_mode": "other-mode", "method": "x"}, # unknown-but-present delivery_mode → Queued (matches on delivery_mode is not None)
{"status": "running", "delivery_mode": "poll"}, # wrong status
{"status": 42, "delivery_mode": "poll"}, # non-string status
# Deeply-nested junk
@ -436,6 +474,9 @@ class TestRegressionGate:
"poll_queued_full": a2a_response.Queued,
"poll_queued_notify": a2a_response.Queued,
"poll_queued_no_method": a2a_response.Queued,
"push_queued_full": a2a_response.Queued,
"push_queued_notify": a2a_response.Queued,
"push_queued_no_method": a2a_response.Queued,
"malformed_empty_dict": a2a_response.Malformed,
"malformed_unexpected_keys": a2a_response.Malformed,
"malformed_status_queued_no_delivery_mode": a2a_response.Malformed,

View File

@ -15,11 +15,11 @@ The wrappers are ~40 LOC of glue. The full delivery behavior
"""
from __future__ import annotations
import asyncio
import json
from unittest.mock import MagicMock, patch
import pytest
import pytest_asyncio
@pytest.fixture(autouse=True)
@ -29,24 +29,22 @@ def _require_workspace_id(monkeypatch):
yield
def _run(coro):
return asyncio.get_event_loop().run_until_complete(coro)
# ---------------------------------------------------------------------------
# tool_inbox_peek
# ---------------------------------------------------------------------------
class TestToolInboxPeek:
def test_returns_not_enabled_when_state_none(self):
@pytest.mark.asyncio
async def test_returns_not_enabled_when_state_none(self):
import a2a_tools
with patch("inbox.get_state", return_value=None):
out = _run(a2a_tools.tool_inbox_peek())
out = await a2a_tools.tool_inbox_peek()
assert "not enabled" in out
def test_returns_json_array_of_messages(self):
@pytest.mark.asyncio
async def test_returns_json_array_of_messages(self):
import a2a_tools
msg1 = MagicMock()
@ -58,20 +56,21 @@ class TestToolInboxPeek:
fake_state.peek.return_value = [msg1, msg2]
with patch("inbox.get_state", return_value=fake_state):
out = _run(a2a_tools.tool_inbox_peek(limit=5))
out = await a2a_tools.tool_inbox_peek(limit=5)
# peek limit is forwarded
fake_state.peek.assert_called_once_with(limit=5)
parsed = json.loads(out)
assert len(parsed) == 2
assert parsed[0]["activity_id"] == "a1"
def test_non_int_limit_falls_back_to_10(self):
@pytest.mark.asyncio
async def test_non_int_limit_falls_back_to_10(self):
import a2a_tools
fake_state = MagicMock()
fake_state.peek.return_value = []
with patch("inbox.get_state", return_value=fake_state):
_run(a2a_tools.tool_inbox_peek(limit="garbage")) # type: ignore[arg-type]
await a2a_tools.tool_inbox_peek(limit="garbage") # type: ignore[arg-type]
fake_state.peek.assert_called_once_with(limit=10)
@ -81,49 +80,54 @@ class TestToolInboxPeek:
class TestToolInboxPop:
def test_returns_not_enabled_when_state_none(self):
@pytest.mark.asyncio
async def test_returns_not_enabled_when_state_none(self):
import a2a_tools
with patch("inbox.get_state", return_value=None):
out = _run(a2a_tools.tool_inbox_pop("act-1"))
out = await a2a_tools.tool_inbox_pop("act-1")
assert "not enabled" in out
def test_rejects_empty_activity_id(self):
@pytest.mark.asyncio
async def test_rejects_empty_activity_id(self):
import a2a_tools
fake_state = MagicMock()
with patch("inbox.get_state", return_value=fake_state):
out = _run(a2a_tools.tool_inbox_pop(""))
out = await a2a_tools.tool_inbox_pop("")
assert "activity_id is required" in out
fake_state.pop.assert_not_called()
def test_rejects_non_str_activity_id(self):
@pytest.mark.asyncio
async def test_rejects_non_str_activity_id(self):
import a2a_tools
fake_state = MagicMock()
with patch("inbox.get_state", return_value=fake_state):
out = _run(a2a_tools.tool_inbox_pop(123)) # type: ignore[arg-type]
out = await a2a_tools.tool_inbox_pop(123) # type: ignore[arg-type]
assert "activity_id is required" in out
fake_state.pop.assert_not_called()
def test_returns_removed_true_when_popped(self):
@pytest.mark.asyncio
async def test_returns_removed_true_when_popped(self):
import a2a_tools
fake_state = MagicMock()
fake_state.pop.return_value = MagicMock() # truthy = something was removed
with patch("inbox.get_state", return_value=fake_state):
out = _run(a2a_tools.tool_inbox_pop("act-7"))
out = await a2a_tools.tool_inbox_pop("act-7")
parsed = json.loads(out)
assert parsed == {"removed": True, "activity_id": "act-7"}
fake_state.pop.assert_called_once_with("act-7")
def test_returns_removed_false_when_unknown(self):
@pytest.mark.asyncio
async def test_returns_removed_false_when_unknown(self):
import a2a_tools
fake_state = MagicMock()
fake_state.pop.return_value = None
with patch("inbox.get_state", return_value=fake_state):
out = _run(a2a_tools.tool_inbox_pop("act-missing"))
out = await a2a_tools.tool_inbox_pop("act-missing")
parsed = json.loads(out)
assert parsed == {"removed": False, "activity_id": "act-missing"}
@ -134,25 +138,28 @@ class TestToolInboxPop:
class TestToolWaitForMessage:
def test_returns_not_enabled_when_state_none(self):
@pytest.mark.asyncio
async def test_returns_not_enabled_when_state_none(self):
import a2a_tools
with patch("inbox.get_state", return_value=None):
out = _run(a2a_tools.tool_wait_for_message(timeout_secs=1.0))
out = await a2a_tools.tool_wait_for_message(timeout_secs=1.0)
assert "not enabled" in out
def test_timeout_payload_when_no_message(self):
@pytest.mark.asyncio
async def test_timeout_payload_when_no_message(self):
import a2a_tools
fake_state = MagicMock()
fake_state.wait.return_value = None
with patch("inbox.get_state", return_value=fake_state):
out = _run(a2a_tools.tool_wait_for_message(timeout_secs=0.1))
out = await a2a_tools.tool_wait_for_message(timeout_secs=0.1)
parsed = json.loads(out)
assert parsed["timeout"] is True
assert parsed["timeout_secs"] == 0.1
def test_returns_message_when_delivered(self):
@pytest.mark.asyncio
async def test_returns_message_when_delivered(self):
import a2a_tools
msg = MagicMock()
@ -160,37 +167,40 @@ class TestToolWaitForMessage:
fake_state = MagicMock()
fake_state.wait.return_value = msg
with patch("inbox.get_state", return_value=fake_state):
out = _run(a2a_tools.tool_wait_for_message(timeout_secs=2.0))
out = await a2a_tools.tool_wait_for_message(timeout_secs=2.0)
parsed = json.loads(out)
assert parsed["activity_id"] == "a-9"
def test_timeout_clamped_to_300(self):
@pytest.mark.asyncio
async def test_timeout_clamped_to_300(self):
import a2a_tools
fake_state = MagicMock()
fake_state.wait.return_value = None
with patch("inbox.get_state", return_value=fake_state):
_run(a2a_tools.tool_wait_for_message(timeout_secs=99999))
await a2a_tools.tool_wait_for_message(timeout_secs=99999)
# Whatever wait was called with, it must not exceed 300
passed = fake_state.wait.call_args.args[0]
assert passed == 300.0
def test_timeout_clamped_to_zero_floor(self):
@pytest.mark.asyncio
async def test_timeout_clamped_to_zero_floor(self):
import a2a_tools
fake_state = MagicMock()
fake_state.wait.return_value = None
with patch("inbox.get_state", return_value=fake_state):
_run(a2a_tools.tool_wait_for_message(timeout_secs=-5))
await a2a_tools.tool_wait_for_message(timeout_secs=-5)
passed = fake_state.wait.call_args.args[0]
assert passed == 0.0
def test_non_numeric_timeout_falls_back_to_60(self):
@pytest.mark.asyncio
async def test_non_numeric_timeout_falls_back_to_60(self):
import a2a_tools
fake_state = MagicMock()
fake_state.wait.return_value = None
with patch("inbox.get_state", return_value=fake_state):
_run(a2a_tools.tool_wait_for_message(timeout_secs="garbage")) # type: ignore[arg-type]
await a2a_tools.tool_wait_for_message(timeout_secs="garbage") # type: ignore[arg-type]
passed = fake_state.wait.call_args.args[0]
assert passed == 60.0