feat(api_server): expose run status for external UIs (#17085)

Adds two API server endpoints for external UIs and orchestrators:

- GET /v1/capabilities — machine-readable feature discovery so clients
  can detect which Runs API / SSE / auth features this Hermes version
  supports before depending on them.
- GET /v1/runs/{run_id} — pollable run status so dashboards can check
  queued/running/completed/failed/cancelled/stopping state without
  holding an SSE connection open.

Also moves request validation ahead of run allocation so invalid
payloads no longer leave orphaned entries in _run_streams waiting for
the TTL sweep.

task_id is intentionally kept as "default" for the Runs API to
preserve the shared-sandbox model used by CLI, gateway, and the
existing _run_agent_with_callbacks path. session_id is surfaced in
run status for external-UI correlation only.

Salvage of PR #17085 by @Magaav.
This commit is contained in:
Magaav 2026-04-29 06:36:56 -07:00 committed by Teknium
parent 83c288da01
commit 810d98e892
4 changed files with 362 additions and 23 deletions

View File

@ -7,7 +7,9 @@ Exposes an HTTP server with endpoints:
- GET /v1/responses/{response_id} Retrieve a stored response
- DELETE /v1/responses/{response_id} Delete a stored response
- GET /v1/models lists hermes-agent as an available model
- GET /v1/capabilities machine-readable API capabilities for external UIs
- POST /v1/runs start a run, returns run_id immediately (202)
- GET /v1/runs/{run_id} retrieve current run status
- GET /v1/runs/{run_id}/events SSE stream of structured lifecycle events
- POST /v1/runs/{run_id}/stop interrupt a running agent
- GET /health health check
@ -590,6 +592,8 @@ class APIServerAdapter(BasePlatformAdapter):
# Active run agent/task references for stop support
self._active_run_agents: Dict[str, Any] = {}
self._active_run_tasks: Dict[str, "asyncio.Task"] = {}
# Pollable run status for dashboards and external control-plane UIs.
self._run_statuses: Dict[str, Dict[str, Any]] = {}
self._session_db: Optional[Any] = None # Lazy-init SessionDB for session continuity
@staticmethod
@ -808,6 +812,51 @@ class APIServerAdapter(BasePlatformAdapter):
],
})
async def _handle_capabilities(self, request: "web.Request") -> "web.Response":
    """GET /v1/capabilities — describe the API surface this server exposes.

    Returns a JSON document with the advertised model name, whether bearer
    auth is required, a map of feature flags, and the method/path of each
    public endpoint, so external UIs and orchestrators can probe for
    support at runtime instead of assuming every Hermes version is alike.

    The auth check runs first; when it yields an error response, that
    response is returned unchanged.
    """
    denied = self._check_auth(request)
    if denied:
        return denied

    model_name = self._model_name
    auth_info = {
        "type": "bearer",
        # Auth is only enforced when an API key has been configured.
        "required": bool(self._api_key),
    }
    feature_flags = {
        "chat_completions": True,
        "chat_completions_streaming": True,
        "responses_api": True,
        "responses_streaming": True,
        "run_submission": True,
        "run_status": True,
        "run_events_sse": True,
        "run_stop": True,
        "tool_progress_events": True,
        "session_continuity_header": "X-Hermes-Session-Id",
        "cors": bool(self._cors_origins),
    }
    # (name, method, path) rows; expanded below into the response mapping.
    route_table = (
        ("health", "GET", "/health"),
        ("health_detailed", "GET", "/health/detailed"),
        ("models", "GET", "/v1/models"),
        ("chat_completions", "POST", "/v1/chat/completions"),
        ("responses", "POST", "/v1/responses"),
        ("runs", "POST", "/v1/runs"),
        ("run_status", "GET", "/v1/runs/{run_id}"),
        ("run_events", "GET", "/v1/runs/{run_id}/events"),
        ("run_stop", "POST", "/v1/runs/{run_id}/stop"),
    )
    endpoint_map = {
        name: {"method": method, "path": path}
        for name, method, path in route_table
    }

    return web.json_response({
        "object": "hermes.api_server.capabilities",
        "platform": "hermes-agent",
        "model": model_name,
        "auth": auth_info,
        "features": feature_flags,
        "endpoints": endpoint_map,
    })
async def _handle_chat_completions(self, request: "web.Request") -> "web.Response":
"""POST /v1/chat/completions — OpenAI Chat Completions format."""
auth_err = self._check_auth(request)
@ -2297,10 +2346,31 @@ class APIServerAdapter(BasePlatformAdapter):
_MAX_CONCURRENT_RUNS = 10 # Prevent unbounded resource allocation
_RUN_STREAM_TTL = 300 # seconds before orphaned runs are swept
_RUN_STATUS_TTL = 3600 # seconds to retain terminal run status for polling
def _set_run_status(self, run_id: str, status: str, **fields: Any) -> Dict[str, Any]:
"""Update pollable run status without exposing private agent objects."""
now = time.time()
current = self._run_statuses.get(run_id, {})
current.update({
"object": "hermes.run",
"run_id": run_id,
"status": status,
"updated_at": now,
})
current.setdefault("created_at", fields.pop("created_at", now))
current.update(fields)
self._run_statuses[run_id] = current
return current
def _make_run_event_callback(self, run_id: str, loop: "asyncio.AbstractEventLoop"):
"""Return a tool_progress_callback that pushes structured events to the run's SSE queue."""
def _push(event: Dict[str, Any]) -> None:
self._set_run_status(
run_id,
self._run_statuses.get(run_id, {}).get("status", "running"),
last_event=event.get("event"),
)
q = self._run_streams.get(run_id)
if q is None:
return
@ -2365,28 +2435,6 @@ class APIServerAdapter(BasePlatformAdapter):
if not user_message:
return web.json_response(_openai_error("No user message found in input"), status=400)
run_id = f"run_{uuid.uuid4().hex}"
loop = asyncio.get_running_loop()
q: "asyncio.Queue[Optional[Dict]]" = asyncio.Queue()
self._run_streams[run_id] = q
self._run_streams_created[run_id] = time.time()
event_cb = self._make_run_event_callback(run_id, loop)
# Also wire stream_delta_callback so message.delta events flow through
def _text_cb(delta: Optional[str]) -> None:
if delta is None:
return
try:
loop.call_soon_threadsafe(q.put_nowait, {
"event": "message.delta",
"run_id": run_id,
"timestamp": time.time(),
"delta": delta,
})
except Exception:
pass
instructions = body.get("instructions")
previous_response_id = body.get("previous_response_id")
@ -2434,11 +2482,42 @@ class APIServerAdapter(BasePlatformAdapter):
)
conversation_history.append({"role": msg["role"], "content": str(content)})
run_id = f"run_{uuid.uuid4().hex}"
session_id = body.get("session_id") or stored_session_id or run_id
ephemeral_system_prompt = instructions
loop = asyncio.get_running_loop()
q: "asyncio.Queue[Optional[Dict]]" = asyncio.Queue()
created_at = time.time()
self._run_streams[run_id] = q
self._run_streams_created[run_id] = created_at
event_cb = self._make_run_event_callback(run_id, loop)
# Also wire stream_delta_callback so message.delta events flow through.
def _text_cb(delta: Optional[str]) -> None:
if delta is None:
return
try:
loop.call_soon_threadsafe(q.put_nowait, {
"event": "message.delta",
"run_id": run_id,
"timestamp": time.time(),
"delta": delta,
})
except Exception:
pass
self._set_run_status(
run_id,
"queued",
created_at=created_at,
session_id=session_id,
model=body.get("model", self._model_name),
)
async def _run_and_close():
try:
self._set_run_status(run_id, "running")
agent = self._create_agent(
ephemeral_system_prompt=ephemeral_system_prompt,
session_id=session_id,
@ -2468,8 +2547,36 @@ class APIServerAdapter(BasePlatformAdapter):
"output": final_response,
"usage": usage,
})
self._set_run_status(
run_id,
"completed",
output=final_response,
usage=usage,
last_event="run.completed",
)
except asyncio.CancelledError:
self._set_run_status(
run_id,
"cancelled",
last_event="run.cancelled",
)
try:
q.put_nowait({
"event": "run.cancelled",
"run_id": run_id,
"timestamp": time.time(),
})
except Exception:
pass
raise
except Exception as exc:
logger.exception("[api_server] run %s failed", run_id)
self._set_run_status(
run_id,
"failed",
error=str(exc),
last_event="run.failed",
)
try:
q.put_nowait({
"event": "run.failed",
@ -2499,6 +2606,21 @@ class APIServerAdapter(BasePlatformAdapter):
return web.json_response({"run_id": run_id, "status": "started"}, status=202)
async def _handle_get_run(self, request: "web.Request") -> "web.Response":
    """GET /v1/runs/{run_id} — report the stored status record of a run.

    Responds with the record kept in ``self._run_statuses`` for the
    requested run, or with a 404 ``run_not_found`` error when no record
    exists. The auth check runs first and its error response, if any, is
    returned unchanged.
    """
    denied = self._check_auth(request)
    if denied:
        return denied

    run_id = request.match_info["run_id"]
    record = self._run_statuses.get(run_id)
    if record is not None:
        return web.json_response(record)

    not_found = _openai_error(f"Run not found: {run_id}", code="run_not_found")
    return web.json_response(not_found, status=404)
async def _handle_run_events(self, request: "web.Request") -> "web.StreamResponse":
"""GET /v1/runs/{run_id}/events — SSE stream of structured agent lifecycle events."""
auth_err = self._check_auth(request)
@ -2561,6 +2683,8 @@ class APIServerAdapter(BasePlatformAdapter):
if agent is None and task is None:
return web.json_response(_openai_error(f"Run not found: {run_id}", code="run_not_found"), status=404)
self._set_run_status(run_id, "stopping", last_event="run.stopping")
if agent is not None:
try:
agent.interrupt("Stop requested via API")
@ -2603,6 +2727,15 @@ class APIServerAdapter(BasePlatformAdapter):
self._active_run_agents.pop(run_id, None)
self._active_run_tasks.pop(run_id, None)
stale_statuses = [
run_id
for run_id, status in list(self._run_statuses.items())
if status.get("status") in {"completed", "failed", "cancelled"}
and now - float(status.get("updated_at", 0) or 0) > self._RUN_STATUS_TTL
]
for run_id in stale_statuses:
self._run_statuses.pop(run_id, None)
# ------------------------------------------------------------------
# BasePlatformAdapter interface
# ------------------------------------------------------------------
@ -2621,6 +2754,7 @@ class APIServerAdapter(BasePlatformAdapter):
self._app.router.add_get("/health/detailed", self._handle_health_detailed)
self._app.router.add_get("/v1/health", self._handle_health)
self._app.router.add_get("/v1/models", self._handle_models)
self._app.router.add_get("/v1/capabilities", self._handle_capabilities)
self._app.router.add_post("/v1/chat/completions", self._handle_chat_completions)
self._app.router.add_post("/v1/responses", self._handle_responses)
self._app.router.add_get("/v1/responses/{response_id}", self._handle_get_response)
@ -2636,6 +2770,7 @@ class APIServerAdapter(BasePlatformAdapter):
self._app.router.add_post("/api/jobs/{job_id}/run", self._handle_run_job)
# Structured event streaming
self._app.router.add_post("/v1/runs", self._handle_runs)
self._app.router.add_get("/v1/runs/{run_id}", self._handle_get_run)
self._app.router.add_get("/v1/runs/{run_id}/events", self._handle_run_events)
self._app.router.add_post("/v1/runs/{run_id}/stop", self._handle_stop_run)
# Start background sweep to clean up orphaned (unconsumed) run streams

View File

@ -314,6 +314,7 @@ def _create_app(adapter: APIServerAdapter) -> web.Application:
app.router.add_get("/health/detailed", adapter._handle_health_detailed)
app.router.add_get("/v1/health", adapter._handle_health)
app.router.add_get("/v1/models", adapter._handle_models)
app.router.add_get("/v1/capabilities", adapter._handle_capabilities)
app.router.add_post("/v1/chat/completions", adapter._handle_chat_completions)
app.router.add_post("/v1/responses", adapter._handle_responses)
app.router.add_get("/v1/responses/{response_id}", adapter._handle_get_response)
@ -491,6 +492,46 @@ class TestModelsEndpoint:
assert resp.status == 200
# ---------------------------------------------------------------------------
# /v1/capabilities endpoint
# ---------------------------------------------------------------------------
class TestCapabilitiesEndpoint:
    """Checks for GET /v1/capabilities: payload contents and auth gating."""

    @pytest.mark.asyncio
    async def test_capabilities_advertises_plugin_safe_contract(self, adapter):
        """Without an API key the endpoint answers 200 with the expected fields."""
        async with TestClient(TestServer(_create_app(adapter))) as client:
            response = await client.get("/v1/capabilities")
            assert response.status == 200
            payload = await response.json()

            assert payload["object"] == "hermes.api_server.capabilities"
            assert payload["platform"] == "hermes-agent"
            assert payload["model"] == "hermes-agent"

            auth_info = payload["auth"]
            assert auth_info["type"] == "bearer"
            assert auth_info["required"] is False

            features = payload["features"]
            assert features["chat_completions"] is True
            assert features["run_status"] is True
            assert features["run_events_sse"] is True
            assert features["session_continuity_header"] == "X-Hermes-Session-Id"

            assert payload["endpoints"]["run_status"]["path"] == "/v1/runs/{run_id}"

    @pytest.mark.asyncio
    async def test_capabilities_requires_auth_when_key_configured(self, auth_adapter):
        """With an API key, anonymous calls get 401 and bearer calls get 200."""
        async with TestClient(TestServer(_create_app(auth_adapter))) as client:
            anonymous = await client.get("/v1/capabilities")
            assert anonymous.status == 401

            bearer_headers = {"Authorization": "Bearer sk-secret"}
            authed = await client.get("/v1/capabilities", headers=bearer_headers)
            assert authed.status == 200
            payload = await authed.json()
            assert payload["auth"]["required"] is True
# ---------------------------------------------------------------------------
# /v1/chat/completions endpoint
# ---------------------------------------------------------------------------

View File

@ -1,7 +1,8 @@
"""Tests for /v1/runs endpoints: start, events, and stop.
"""Tests for /v1/runs endpoints: start, status, events, and stop.
Covers:
- POST /v1/runs start a run (202)
- GET /v1/runs/{run_id} poll run status
- GET /v1/runs/{run_id}/events SSE event stream
- POST /v1/runs/{run_id}/stop interrupt a running agent
- Auth, error handling, and cleanup
@ -46,6 +47,7 @@ def _create_runs_app(adapter: APIServerAdapter) -> web.Application:
app = web.Application(middlewares=mws)
app["api_server_adapter"] = adapter
app.router.add_post("/v1/runs", adapter._handle_runs)
app.router.add_get("/v1/runs/{run_id}", adapter._handle_get_run)
app.router.add_get("/v1/runs/{run_id}/events", adapter._handle_run_events)
app.router.add_post("/v1/runs/{run_id}/stop", adapter._handle_stop_run)
return app
@ -116,6 +118,13 @@ class TestStartRun:
assert data["status"] == "started"
assert data["run_id"].startswith("run_")
status_resp = await cli.get(f"/v1/runs/{data['run_id']}")
assert status_resp.status == 200
status = await status_resp.json()
assert status["run_id"] == data["run_id"]
assert status["status"] in {"queued", "running", "completed"}
assert status["object"] == "hermes.run"
@pytest.mark.asyncio
async def test_start_invalid_json_returns_400(self, adapter):
app = _create_runs_app(adapter)
@ -143,6 +152,18 @@ class TestStartRun:
resp = await cli.post("/v1/runs", json={"input": ""})
assert resp.status == 400
@pytest.mark.asyncio
async def test_start_invalid_history_does_not_allocate_run(self, adapter):
app = _create_runs_app(adapter)
async with TestClient(TestServer(app)) as cli:
resp = await cli.post(
"/v1/runs",
json={"input": "hello", "conversation_history": {"role": "user"}},
)
assert resp.status == 400
assert adapter._run_streams == {}
assert adapter._run_statuses == {}
@pytest.mark.asyncio
async def test_start_requires_auth(self, auth_adapter):
app = _create_runs_app(auth_adapter)
@ -170,6 +191,89 @@ class TestStartRun:
assert resp.status == 202
# ---------------------------------------------------------------------------
# GET /v1/runs/{run_id} — poll run status
# ---------------------------------------------------------------------------
class TestRunStatus:
    """Checks for GET /v1/runs/{run_id}: polling, session echo, 404 and auth."""

    @staticmethod
    def _stub_agent(prompt_tokens, completion_tokens, total_tokens):
        """Build a mock agent that answers "done" and reports the given token counts."""
        agent = MagicMock(
            session_prompt_tokens=prompt_tokens,
            session_completion_tokens=completion_tokens,
            session_total_tokens=total_tokens,
        )
        agent.run_conversation.return_value = {"final_response": "done"}
        return agent

    @pytest.mark.asyncio
    async def test_status_completed_run_includes_output_and_usage(self, adapter):
        """A finished run exposes its output, usage and last lifecycle event."""
        async with TestClient(TestServer(_create_runs_app(adapter))) as client:
            agent = self._stub_agent(4, 2, 6)
            with patch.object(adapter, "_create_agent", return_value=agent):
                started = await client.post("/v1/runs", json={"input": "hello"})
                run_id = (await started.json())["run_id"]

                # Poll until the background run reports completion (max ~1s).
                for _attempt in range(20):
                    polled = await client.get(f"/v1/runs/{run_id}")
                    assert polled.status == 200
                    status = await polled.json()
                    if status["status"] == "completed":
                        break
                    await asyncio.sleep(0.05)

            assert status["status"] == "completed"
            assert status["output"] == "done"
            assert status["usage"]["total_tokens"] == 6
            assert status["last_event"] == "run.completed"

    @pytest.mark.asyncio
    async def test_status_reflects_explicit_session_id(self, adapter):
        """A caller-supplied session_id is echoed back in the run status."""
        async with TestClient(TestServer(_create_runs_app(adapter))) as client:
            agent = self._stub_agent(0, 0, 0)
            with patch.object(adapter, "_create_agent", return_value=agent):
                request_body = {"input": "hello", "session_id": "space-session"}
                started = await client.post("/v1/runs", json=request_body)
                run_id = (await started.json())["run_id"]

                for _attempt in range(20):
                    polled = await client.get(f"/v1/runs/{run_id}")
                    status = await polled.json()
                    if status["status"] == "completed":
                        break
                    await asyncio.sleep(0.05)

            agent.run_conversation.assert_called_once()
            # task_id stays "default" so the Runs API shares one sandbox
            # container with CLI/gateway; session_id is surfaced in status
            # for external UIs to correlate runs with their own session IDs.
            call_kwargs = agent.run_conversation.call_args.kwargs
            assert call_kwargs["task_id"] == "default"
            assert status["session_id"] == "space-session"

    @pytest.mark.asyncio
    async def test_status_not_found_returns_404(self, adapter):
        """Polling an unknown run id yields 404."""
        async with TestClient(TestServer(_create_runs_app(adapter))) as client:
            response = await client.get("/v1/runs/run_nonexistent")
            assert response.status == 404

    @pytest.mark.asyncio
    async def test_status_requires_auth(self, auth_adapter):
        """With an API key configured, unauthenticated polling yields 401."""
        async with TestClient(TestServer(_create_runs_app(auth_adapter))) as client:
            response = await client.get("/v1/runs/run_any")
            assert response.status == 401
# ---------------------------------------------------------------------------
# GET /v1/runs/{run_id}/events — SSE event stream
# ---------------------------------------------------------------------------
@ -257,6 +361,11 @@ class TestStopRun:
# Agent interrupt should have been called
mock_agent.interrupt.assert_called_once_with("Stop requested via API")
status_resp = await cli.get(f"/v1/runs/{run_id}")
assert status_resp.status == 200
status_data = await status_resp.json()
assert status_data["status"] in {"stopping", "cancelled"}
# Refs should be cleaned up
await asyncio.sleep(0.5)
assert run_id not in adapter._active_run_agents

View File

@ -194,6 +194,29 @@ Delete a stored response.
Lists the agent as an available model. The advertised model name defaults to the [profile](/docs/user-guide/profiles) name (or `hermes-agent` for the default profile). Required by most frontends for model discovery.
### GET /v1/capabilities
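Returns a machine-readable description of the API server's stable surface for external UIs, orchestrators, and plugin bridges. The example below is abbreviated: the full response carries additional `features` flags (such as the streaming variants, `tool_progress_events`, `session_continuity_header`, and `cors`) and an `endpoints` map giving the HTTP method and path of each public endpoint.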
```json
{
"object": "hermes.api_server.capabilities",
"platform": "hermes-agent",
"model": "hermes-agent",
"auth": {"type": "bearer", "required": true},
"features": {
"chat_completions": true,
"responses_api": true,
"run_submission": true,
"run_status": true,
"run_events_sse": true,
"run_stop": true
}
}
```
Use this endpoint when integrating dashboards, browser UIs, or control planes so they can discover whether the running Hermes version supports runs, streaming, cancellation, and session continuity without depending on private Python internals.
### GET /health
Health check. Returns `{"status": "ok"}`. Also available at **GET /v1/health** for OpenAI-compatible clients that expect the `/v1/` prefix.
@ -210,10 +233,41 @@ In addition to `/v1/chat/completions` and `/v1/responses`, the server exposes a
Create a new agent run. Responds immediately with `202 Accepted` and a `run_id` that can be used to poll the run's status or subscribe to its progress events.
```json
{
"run_id": "run_abc123",
"status": "started"
}
```
Runs accept a simple `input` string and optional `session_id`, `instructions`, `conversation_history`, or `previous_response_id`. When `session_id` is provided, Hermes surfaces it in the run status so external UIs can correlate runs with their own conversation IDs.
### GET /v1/runs/\{run_id\}
Poll the current run state. This is useful for dashboards that need status without holding an SSE connection open, or for UIs that reconnect after navigation.
```json
{
"object": "hermes.run",
"run_id": "run_abc123",
"status": "completed",
"session_id": "space-session",
"model": "hermes-agent",
"output": "Done.",
"usage": {"input_tokens": 50, "output_tokens": 200, "total_tokens": 250}
}
```
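Possible `status` values are `queued`, `running`, `stopping`, `completed`, `failed`, and `cancelled`. A run's status stays available for polling and UI reconciliation for roughly one hour after it reaches a terminal state (`completed`, `failed`, or `cancelled`); once the record has been swept, this endpoint returns `404` with the `run_not_found` error code.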
### GET /v1/runs/\{run_id\}/events
Server-Sent Events stream of the run's tool-call progress, token deltas, and lifecycle events. Designed for dashboards and thick clients that want to attach/detach without losing state.
### POST /v1/runs/\{run_id\}/stop
Interrupt a running agent turn. The endpoint returns immediately with `{"status": "stopping"}` while Hermes asks the active agent to stop at the next safe interruption point.
## Jobs API (background scheduled work)
The server exposes a lightweight jobs CRUD surface for managing scheduled / background agent runs from a remote client. All endpoints are gated behind the same bearer auth.