fix(agent): preserve Codex message items for replay
This commit is contained in:
parent
2536a36f6f
commit
81e01f6ee9
@ -227,6 +227,23 @@ def _responses_tools(tools: Optional[List[Dict[str, Any]]] = None) -> Optional[L
|
||||
# Message format conversion
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_RESPONSE_MESSAGE_STATUSES = {"completed", "incomplete", "in_progress"}
|
||||
|
||||
|
||||
def _normalize_responses_message_status(value: Any, *, default: str = "completed") -> str:
|
||||
"""Normalize a Responses assistant message status for replay.
|
||||
|
||||
The API accepts completed/incomplete/in_progress on replayed assistant
|
||||
output messages. Preserve those exactly (modulo case/hyphen spelling) so
|
||||
incomplete Codex continuation turns don't get falsely marked completed.
|
||||
"""
|
||||
if isinstance(value, str):
|
||||
status = value.strip().lower().replace("-", "_").replace(" ", "_")
|
||||
if status in _RESPONSE_MESSAGE_STATUSES:
|
||||
return status
|
||||
return default
|
||||
|
||||
|
||||
def _chat_messages_to_responses_input(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||
"""Convert internal chat-style messages to Responses input items."""
|
||||
items: List[Dict[str, Any]] = []
|
||||
@ -272,7 +289,57 @@ def _chat_messages_to_responses_input(messages: List[Dict[str, Any]]) -> List[Di
|
||||
seen_item_ids.add(item_id)
|
||||
has_codex_reasoning = True
|
||||
|
||||
if content_parts:
|
||||
# Replay exact assistant message items (with id/phase) from
|
||||
# previous turns so the API can maintain prefix-cache hits.
|
||||
# OpenAI docs: "preserve and resend phase on all assistant
|
||||
# messages — dropping it can degrade performance."
|
||||
codex_message_items = msg.get("codex_message_items")
|
||||
replayed_message_items = 0
|
||||
if isinstance(codex_message_items, list):
|
||||
for raw_item in codex_message_items:
|
||||
if not isinstance(raw_item, dict):
|
||||
continue
|
||||
if raw_item.get("type") != "message" or raw_item.get("role") != "assistant":
|
||||
continue
|
||||
raw_content_parts = raw_item.get("content")
|
||||
if not isinstance(raw_content_parts, list):
|
||||
continue
|
||||
|
||||
normalized_content_parts = []
|
||||
for part in raw_content_parts:
|
||||
if not isinstance(part, dict):
|
||||
continue
|
||||
part_type = str(part.get("type") or "").strip()
|
||||
if part_type not in {"output_text", "text"}:
|
||||
continue
|
||||
text = part.get("text", "")
|
||||
if text is None:
|
||||
text = ""
|
||||
if not isinstance(text, str):
|
||||
text = str(text)
|
||||
normalized_content_parts.append({"type": "output_text", "text": text})
|
||||
|
||||
if not normalized_content_parts:
|
||||
continue
|
||||
|
||||
replay_item = {
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"status": _normalize_responses_message_status(raw_item.get("status")),
|
||||
"content": normalized_content_parts,
|
||||
}
|
||||
item_id = raw_item.get("id")
|
||||
if isinstance(item_id, str) and item_id.strip():
|
||||
replay_item["id"] = item_id.strip()
|
||||
phase = raw_item.get("phase")
|
||||
if isinstance(phase, str) and phase.strip():
|
||||
replay_item["phase"] = phase.strip()
|
||||
items.append(replay_item)
|
||||
replayed_message_items += 1
|
||||
|
||||
if replayed_message_items > 0:
|
||||
pass
|
||||
elif content_parts:
|
||||
items.append({"role": "assistant", "content": content_parts})
|
||||
elif content_text.strip():
|
||||
items.append({"role": "assistant", "content": content_text})
|
||||
@ -432,6 +499,47 @@ def _preflight_codex_input_items(raw_items: Any) -> List[Dict[str, Any]]:
|
||||
normalized.append(reasoning_item)
|
||||
continue
|
||||
|
||||
if item_type == "message":
|
||||
role = item.get("role")
|
||||
if role != "assistant":
|
||||
raise ValueError(f"Codex Responses input[{idx}] message items must have role='assistant'.")
|
||||
content = item.get("content")
|
||||
if not isinstance(content, list):
|
||||
raise ValueError(f"Codex Responses input[{idx}] message item must have content list.")
|
||||
normalized_content = []
|
||||
for part_idx, part in enumerate(content):
|
||||
if not isinstance(part, dict):
|
||||
raise ValueError(
|
||||
f"Codex Responses input[{idx}] message content[{part_idx}] must be an object."
|
||||
)
|
||||
part_type = part.get("type")
|
||||
if part_type not in {"output_text", "text"}:
|
||||
raise ValueError(
|
||||
f"Codex Responses input[{idx}] message content[{part_idx}] has unsupported type {part_type!r}."
|
||||
)
|
||||
text = part.get("text", "")
|
||||
if text is None:
|
||||
text = ""
|
||||
if not isinstance(text, str):
|
||||
text = str(text)
|
||||
normalized_content.append({"type": "output_text", "text": text})
|
||||
if not normalized_content:
|
||||
raise ValueError(f"Codex Responses input[{idx}] message item must contain at least one text part.")
|
||||
normalized_item: Dict[str, Any] = {
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"status": _normalize_responses_message_status(item.get("status")),
|
||||
"content": normalized_content,
|
||||
}
|
||||
item_id = item.get("id")
|
||||
if isinstance(item_id, str) and item_id.strip():
|
||||
normalized_item["id"] = item_id.strip()
|
||||
phase = item.get("phase")
|
||||
if isinstance(phase, str) and phase.strip():
|
||||
normalized_item["phase"] = phase.strip()
|
||||
normalized.append(normalized_item)
|
||||
continue
|
||||
|
||||
role = item.get("role")
|
||||
if role in {"user", "assistant"}:
|
||||
content = item.get("content", "")
|
||||
@ -716,6 +824,7 @@ def _normalize_codex_response(response: Any) -> tuple[Any, str]:
|
||||
content_parts: List[str] = []
|
||||
reasoning_parts: List[str] = []
|
||||
reasoning_items_raw: List[Dict[str, Any]] = []
|
||||
message_items_raw: List[Dict[str, Any]] = []
|
||||
tool_calls: List[Any] = []
|
||||
has_incomplete_items = response_status in {"queued", "in_progress", "incomplete"}
|
||||
saw_commentary_phase = False
|
||||
@ -734,6 +843,7 @@ def _normalize_codex_response(response: Any) -> tuple[Any, str]:
|
||||
|
||||
if item_type == "message":
|
||||
item_phase = getattr(item, "phase", None)
|
||||
normalized_phase = None
|
||||
if isinstance(item_phase, str):
|
||||
normalized_phase = item_phase.strip().lower()
|
||||
if normalized_phase in {"commentary", "analysis"}:
|
||||
@ -743,6 +853,18 @@ def _normalize_codex_response(response: Any) -> tuple[Any, str]:
|
||||
message_text = _extract_responses_message_text(item)
|
||||
if message_text:
|
||||
content_parts.append(message_text)
|
||||
raw_message_item: Dict[str, Any] = {
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"status": _normalize_responses_message_status(item_status),
|
||||
"content": [{"type": "output_text", "text": message_text}],
|
||||
}
|
||||
item_id = getattr(item, "id", None)
|
||||
if isinstance(item_id, str) and item_id:
|
||||
raw_message_item["id"] = item_id
|
||||
if normalized_phase:
|
||||
raw_message_item["phase"] = normalized_phase
|
||||
message_items_raw.append(raw_message_item)
|
||||
elif item_type == "reasoning":
|
||||
reasoning_text = _extract_responses_reasoning_text(item)
|
||||
if reasoning_text:
|
||||
@ -855,6 +977,7 @@ def _normalize_codex_response(response: Any) -> tuple[Any, str]:
|
||||
reasoning_content=None,
|
||||
reasoning_details=None,
|
||||
codex_reasoning_items=reasoning_items_raw or None,
|
||||
codex_message_items=message_items_raw or None,
|
||||
)
|
||||
|
||||
if tool_calls:
|
||||
|
||||
@ -23,9 +23,14 @@ def get_transport(api_mode: str):
|
||||
This allows gradual migration — call sites can check for None
|
||||
and fall back to the legacy code path.
|
||||
"""
|
||||
if not _REGISTRY:
|
||||
_discover_transports()
|
||||
cls = _REGISTRY.get(api_mode)
|
||||
if cls is None:
|
||||
# The registry can be partially populated when a specific transport
|
||||
# module was imported directly (for example chat_completions before
|
||||
# codex). Discover on misses, not only when the registry is empty, so
|
||||
# test/order-dependent imports do not make valid api_modes unavailable.
|
||||
_discover_transports()
|
||||
cls = _REGISTRY.get(api_mode)
|
||||
if cls is None:
|
||||
return None
|
||||
return cls()
|
||||
|
||||
@ -31,15 +31,15 @@ class ChatCompletionsTransport(ProviderTransport):
|
||||
def convert_messages(self, messages: List[Dict[str, Any]], **kwargs) -> List[Dict[str, Any]]:
|
||||
"""Messages are already in OpenAI format — sanitize Codex leaks only.
|
||||
|
||||
Strips Codex Responses API fields (``codex_reasoning_items`` on the
|
||||
message, ``call_id``/``response_item_id`` on tool_calls) that strict
|
||||
chat-completions providers reject with 400/422.
|
||||
Strips Codex Responses API fields (``codex_reasoning_items`` /
|
||||
``codex_message_items`` on the message, ``call_id``/``response_item_id``
|
||||
on tool_calls) that strict chat-completions providers reject with 400/422.
|
||||
"""
|
||||
needs_sanitize = False
|
||||
for msg in messages:
|
||||
if not isinstance(msg, dict):
|
||||
continue
|
||||
if "codex_reasoning_items" in msg:
|
||||
if "codex_reasoning_items" in msg or "codex_message_items" in msg:
|
||||
needs_sanitize = True
|
||||
break
|
||||
tool_calls = msg.get("tool_calls")
|
||||
@ -59,6 +59,7 @@ class ChatCompletionsTransport(ProviderTransport):
|
||||
if not isinstance(msg, dict):
|
||||
continue
|
||||
msg.pop("codex_reasoning_items", None)
|
||||
msg.pop("codex_message_items", None)
|
||||
tool_calls = msg.get("tool_calls")
|
||||
if isinstance(tool_calls, list):
|
||||
for tc in tool_calls:
|
||||
|
||||
@ -120,6 +120,24 @@ class ResponsesApiTransport(ProviderTransport):
|
||||
if request_overrides:
|
||||
kwargs.update(request_overrides)
|
||||
|
||||
if is_codex_backend:
|
||||
prompt_cache_key = kwargs.get("prompt_cache_key")
|
||||
cache_scope_id = str(prompt_cache_key or session_id or "").strip()
|
||||
if cache_scope_id:
|
||||
existing_extra_headers = kwargs.get("extra_headers")
|
||||
merged_extra_headers: Dict[str, str] = {}
|
||||
if isinstance(existing_extra_headers, dict):
|
||||
merged_extra_headers.update(
|
||||
{
|
||||
str(key): str(value)
|
||||
for key, value in existing_extra_headers.items()
|
||||
if key and value is not None
|
||||
}
|
||||
)
|
||||
merged_extra_headers["session_id"] = cache_scope_id
|
||||
merged_extra_headers["x-client-request-id"] = cache_scope_id
|
||||
kwargs["extra_headers"] = merged_extra_headers
|
||||
|
||||
max_tokens = params.get("max_tokens")
|
||||
if max_tokens is not None and not is_codex_backend:
|
||||
kwargs["max_output_tokens"] = max_tokens
|
||||
@ -160,6 +178,8 @@ class ResponsesApiTransport(ProviderTransport):
|
||||
provider_data = {}
|
||||
if msg and hasattr(msg, "codex_reasoning_items") and msg.codex_reasoning_items:
|
||||
provider_data["codex_reasoning_items"] = msg.codex_reasoning_items
|
||||
if msg and hasattr(msg, "codex_message_items") and msg.codex_message_items:
|
||||
provider_data["codex_message_items"] = msg.codex_message_items
|
||||
if msg and hasattr(msg, "reasoning_details") and msg.reasoning_details:
|
||||
provider_data["reasoning_details"] = msg.reasoning_details
|
||||
|
||||
|
||||
@ -97,7 +97,7 @@ class NormalizedResponse:
|
||||
Response-level ``provider_data`` examples:
|
||||
|
||||
* Anthropic: ``{"reasoning_details": [...]}``
|
||||
* Codex: ``{"codex_reasoning_items": [...]}``
|
||||
* Codex: ``{"codex_reasoning_items": [...], "codex_message_items": [...]}``
|
||||
* Others: ``None``
|
||||
"""
|
||||
|
||||
@ -126,6 +126,11 @@ class NormalizedResponse:
|
||||
pd = self.provider_data or {}
|
||||
return pd.get("codex_reasoning_items")
|
||||
|
||||
@property
|
||||
def codex_message_items(self):
|
||||
pd = self.provider_data or {}
|
||||
return pd.get("codex_message_items")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Factory helpers
|
||||
|
||||
@ -1232,6 +1232,7 @@ class SessionStore:
|
||||
reasoning_content=message.get("reasoning_content") if message.get("role") == "assistant" else None,
|
||||
reasoning_details=message.get("reasoning_details") if message.get("role") == "assistant" else None,
|
||||
codex_reasoning_items=message.get("codex_reasoning_items") if message.get("role") == "assistant" else None,
|
||||
codex_message_items=message.get("codex_message_items") if message.get("role") == "assistant" else None,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("Session DB operation failed: %s", e)
|
||||
@ -1264,6 +1265,7 @@ class SessionStore:
|
||||
reasoning_content=msg.get("reasoning_content") if role == "assistant" else None,
|
||||
reasoning_details=msg.get("reasoning_details") if role == "assistant" else None,
|
||||
codex_reasoning_items=msg.get("codex_reasoning_items") if role == "assistant" else None,
|
||||
codex_message_items=msg.get("codex_message_items") if role == "assistant" else None,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("Failed to rewrite transcript in DB: %s", e)
|
||||
|
||||
@ -31,7 +31,7 @@ T = TypeVar("T")
|
||||
|
||||
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
|
||||
|
||||
SCHEMA_VERSION = 8
|
||||
SCHEMA_VERSION = 9
|
||||
|
||||
SCHEMA_SQL = """
|
||||
CREATE TABLE IF NOT EXISTS schema_version (
|
||||
@ -83,7 +83,8 @@ CREATE TABLE IF NOT EXISTS messages (
|
||||
reasoning TEXT,
|
||||
reasoning_content TEXT,
|
||||
reasoning_details TEXT,
|
||||
codex_reasoning_items TEXT
|
||||
codex_reasoning_items TEXT,
|
||||
codex_message_items TEXT
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS state_meta (
|
||||
@ -356,6 +357,15 @@ class SessionDB:
|
||||
except sqlite3.OperationalError:
|
||||
pass # Column already exists
|
||||
cursor.execute("UPDATE schema_version SET version = 8")
|
||||
if current_version < 9:
|
||||
# v9: preserve replayable Codex assistant message ids/phases so
|
||||
# follow-up turns can rebuild Responses API message items instead
|
||||
# of flattening everything to plain assistant text.
|
||||
try:
|
||||
cursor.execute('ALTER TABLE messages ADD COLUMN "codex_message_items" TEXT')
|
||||
except sqlite3.OperationalError:
|
||||
pass # Column already exists
|
||||
cursor.execute("UPDATE schema_version SET version = 9")
|
||||
|
||||
# Unique title index — always ensure it exists (safe to run after migrations
|
||||
# since the title column is guaranteed to exist at this point)
|
||||
@ -956,6 +966,7 @@ class SessionDB:
|
||||
reasoning_content: str = None,
|
||||
reasoning_details: Any = None,
|
||||
codex_reasoning_items: Any = None,
|
||||
codex_message_items: Any = None,
|
||||
) -> int:
|
||||
"""
|
||||
Append a message to a session. Returns the message row ID.
|
||||
@ -972,6 +983,10 @@ class SessionDB:
|
||||
json.dumps(codex_reasoning_items)
|
||||
if codex_reasoning_items else None
|
||||
)
|
||||
codex_message_items_json = (
|
||||
json.dumps(codex_message_items)
|
||||
if codex_message_items else None
|
||||
)
|
||||
tool_calls_json = json.dumps(tool_calls) if tool_calls else None
|
||||
|
||||
# Pre-compute tool call count
|
||||
@ -983,8 +998,9 @@ class SessionDB:
|
||||
cursor = conn.execute(
|
||||
"""INSERT INTO messages (session_id, role, content, tool_call_id,
|
||||
tool_calls, tool_name, timestamp, token_count, finish_reason,
|
||||
reasoning, reasoning_content, reasoning_details, codex_reasoning_items)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
reasoning, reasoning_content, reasoning_details, codex_reasoning_items,
|
||||
codex_message_items)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
(
|
||||
session_id,
|
||||
role,
|
||||
@ -999,6 +1015,7 @@ class SessionDB:
|
||||
reasoning_content,
|
||||
reasoning_details_json,
|
||||
codex_items_json,
|
||||
codex_message_items_json,
|
||||
),
|
||||
)
|
||||
msg_id = cursor.lastrowid
|
||||
@ -1112,7 +1129,8 @@ class SessionDB:
|
||||
with self._lock:
|
||||
cursor = self._conn.execute(
|
||||
"SELECT role, content, tool_call_id, tool_calls, tool_name, "
|
||||
"reasoning, reasoning_content, reasoning_details, codex_reasoning_items "
|
||||
"reasoning, reasoning_content, reasoning_details, codex_reasoning_items, "
|
||||
"codex_message_items "
|
||||
"FROM messages WHERE session_id = ? ORDER BY timestamp, id",
|
||||
(session_id,),
|
||||
)
|
||||
@ -1150,6 +1168,12 @@ class SessionDB:
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
logger.warning("Failed to deserialize codex_reasoning_items, falling back to None")
|
||||
msg["codex_reasoning_items"] = None
|
||||
if row["codex_message_items"]:
|
||||
try:
|
||||
msg["codex_message_items"] = json.loads(row["codex_message_items"])
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
logger.warning("Failed to deserialize codex_message_items, falling back to None")
|
||||
msg["codex_message_items"] = None
|
||||
messages.append(msg)
|
||||
return messages
|
||||
|
||||
|
||||
27
run_agent.py
27
run_agent.py
@ -3313,6 +3313,7 @@ class AIAgent:
|
||||
reasoning_content=msg.get("reasoning_content") if role == "assistant" else None,
|
||||
reasoning_details=msg.get("reasoning_details") if role == "assistant" else None,
|
||||
codex_reasoning_items=msg.get("codex_reasoning_items") if role == "assistant" else None,
|
||||
codex_message_items=msg.get("codex_message_items") if role == "assistant" else None,
|
||||
)
|
||||
self._last_flushed_db_idx = len(messages)
|
||||
except Exception as e:
|
||||
@ -7669,6 +7670,13 @@ class AIAgent:
|
||||
if codex_items:
|
||||
msg["codex_reasoning_items"] = codex_items
|
||||
|
||||
# Codex Responses API: preserve exact assistant message items (with
|
||||
# id/phase) so follow-up turns can replay structured items instead of
|
||||
# flattening to plain text. This is required for prefix cache hits.
|
||||
codex_message_items = getattr(assistant_message, "codex_message_items", None)
|
||||
if codex_message_items:
|
||||
msg["codex_message_items"] = codex_message_items
|
||||
|
||||
if assistant_message.tool_calls:
|
||||
tool_calls = []
|
||||
for tool_call in assistant_message.tool_calls:
|
||||
@ -11549,16 +11557,26 @@ class AIAgent:
|
||||
interim_has_content = bool((interim_msg.get("content") or "").strip())
|
||||
interim_has_reasoning = bool(interim_msg.get("reasoning", "").strip()) if isinstance(interim_msg.get("reasoning"), str) else False
|
||||
interim_has_codex_reasoning = bool(interim_msg.get("codex_reasoning_items"))
|
||||
interim_has_codex_message_items = bool(interim_msg.get("codex_message_items"))
|
||||
|
||||
if interim_has_content or interim_has_reasoning or interim_has_codex_reasoning:
|
||||
if (
|
||||
interim_has_content
|
||||
or interim_has_reasoning
|
||||
or interim_has_codex_reasoning
|
||||
or interim_has_codex_message_items
|
||||
):
|
||||
last_msg = messages[-1] if messages else None
|
||||
# Duplicate detection: two consecutive incomplete assistant
|
||||
# messages with identical content AND reasoning are collapsed.
|
||||
# For reasoning-only messages (codex_reasoning_items differ but
|
||||
# visible content/reasoning are both empty), we also compare
|
||||
# the encrypted items to avoid silently dropping new state.
|
||||
# For provider-state-only changes (encrypted reasoning
|
||||
# items or replayable message ids/phases/statuses differ
|
||||
# while visible content/reasoning are unchanged), compare
|
||||
# those opaque payloads too so we don't silently drop the
|
||||
# newer continuation state.
|
||||
last_codex_items = last_msg.get("codex_reasoning_items") if isinstance(last_msg, dict) else None
|
||||
interim_codex_items = interim_msg.get("codex_reasoning_items")
|
||||
last_codex_message_items = last_msg.get("codex_message_items") if isinstance(last_msg, dict) else None
|
||||
interim_codex_message_items = interim_msg.get("codex_message_items")
|
||||
duplicate_interim = (
|
||||
isinstance(last_msg, dict)
|
||||
and last_msg.get("role") == "assistant"
|
||||
@ -11566,6 +11584,7 @@ class AIAgent:
|
||||
and (last_msg.get("content") or "") == (interim_msg.get("content") or "")
|
||||
and (last_msg.get("reasoning") or "") == (interim_msg.get("reasoning") or "")
|
||||
and last_codex_items == interim_codex_items
|
||||
and last_codex_message_items == interim_codex_message_items
|
||||
)
|
||||
if not duplicate_interim:
|
||||
messages.append(interim_msg)
|
||||
|
||||
@ -33,15 +33,18 @@ class TestChatCompletionsBasic:
|
||||
def test_convert_messages_strips_codex_fields(self, transport):
|
||||
msgs = [
|
||||
{"role": "assistant", "content": "ok", "codex_reasoning_items": [{"id": "rs_1"}],
|
||||
"codex_message_items": [{"id": "msg_1", "type": "message"}],
|
||||
"tool_calls": [{"id": "call_1", "call_id": "call_1", "response_item_id": "fc_1",
|
||||
"type": "function", "function": {"name": "t", "arguments": "{}"}}]},
|
||||
]
|
||||
result = transport.convert_messages(msgs)
|
||||
assert "codex_reasoning_items" not in result[0]
|
||||
assert "codex_message_items" not in result[0]
|
||||
assert "call_id" not in result[0]["tool_calls"][0]
|
||||
assert "response_item_id" not in result[0]["tool_calls"][0]
|
||||
# Original list untouched (deepcopy-on-demand)
|
||||
assert "codex_reasoning_items" in msgs[0]
|
||||
assert "codex_message_items" in msgs[0]
|
||||
|
||||
|
||||
class TestChatCompletionsBuildKwargs:
|
||||
|
||||
@ -194,6 +194,36 @@ class TestCodexNormalizeResponse:
|
||||
assert nr.content == "Hello world"
|
||||
assert nr.finish_reason == "stop"
|
||||
|
||||
def test_message_items_preserved_in_provider_data(self, transport):
|
||||
"""Codex assistant message item ids/phases must survive transport normalization."""
|
||||
r = SimpleNamespace(
|
||||
output=[
|
||||
SimpleNamespace(
|
||||
type="message",
|
||||
role="assistant",
|
||||
id="msg_abc",
|
||||
phase="final_answer",
|
||||
content=[SimpleNamespace(type="output_text", text="Hello world")],
|
||||
status="completed",
|
||||
),
|
||||
],
|
||||
status="completed",
|
||||
incomplete_details=None,
|
||||
usage=SimpleNamespace(input_tokens=10, output_tokens=5,
|
||||
input_tokens_details=None, output_tokens_details=None),
|
||||
)
|
||||
nr = transport.normalize_response(r)
|
||||
assert nr.codex_message_items == [
|
||||
{
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"status": "completed",
|
||||
"content": [{"type": "output_text", "text": "Hello world"}],
|
||||
"id": "msg_abc",
|
||||
"phase": "final_answer",
|
||||
}
|
||||
]
|
||||
|
||||
def test_tool_call_response(self, transport):
|
||||
"""Normalize a Codex response with tool calls."""
|
||||
r = SimpleNamespace(
|
||||
|
||||
@ -60,6 +60,13 @@ class TestTransportRegistry:
|
||||
assert t is not None
|
||||
assert t.api_mode == "anthropic_messages"
|
||||
|
||||
def test_discovers_missing_transport_when_registry_partially_populated(self):
|
||||
"""Importing one transport directly must not hide other valid api_modes."""
|
||||
import agent.transports.chat_completions # noqa: F401
|
||||
t = get_transport("codex_responses")
|
||||
assert t is not None
|
||||
assert t.api_mode == "codex_responses"
|
||||
|
||||
def test_register_and_get(self):
|
||||
class DummyTransport(ProviderTransport):
|
||||
@property
|
||||
|
||||
@ -270,3 +270,15 @@ class TestNormalizedResponseBackwardCompat:
|
||||
def test_codex_reasoning_items_none_when_absent(self):
|
||||
nr = NormalizedResponse(content="hi", tool_calls=None, finish_reason="stop")
|
||||
assert nr.codex_reasoning_items is None
|
||||
|
||||
def test_codex_message_items_from_provider_data(self):
|
||||
items = [{"id": "msg_1", "type": "message"}]
|
||||
nr = NormalizedResponse(
|
||||
content="hi", tool_calls=None, finish_reason="stop",
|
||||
provider_data={"codex_message_items": items},
|
||||
)
|
||||
assert nr.codex_message_items == items
|
||||
|
||||
def test_codex_message_items_none_when_absent(self):
|
||||
nr = NormalizedResponse(content="hi", tool_calls=None, finish_reason="stop")
|
||||
assert nr.codex_message_items is None
|
||||
|
||||
@ -716,6 +716,103 @@ class TestNormalizeCodexResponse:
|
||||
assert len(msg.tool_calls) == 1
|
||||
assert msg.tool_calls[0].function.name == "web_search"
|
||||
|
||||
def test_message_items_captured_with_id_and_phase(self, monkeypatch):
|
||||
"""Exact message items (with id/phase) must be captured for cache replay."""
|
||||
agent = self._make_codex_agent(monkeypatch)
|
||||
response = SimpleNamespace(
|
||||
output=[
|
||||
SimpleNamespace(
|
||||
type="message", status="completed", id="msg_abc",
|
||||
phase="commentary",
|
||||
content=[SimpleNamespace(type="output_text", text="Thinking...")],
|
||||
),
|
||||
SimpleNamespace(
|
||||
type="message", status="completed", id="msg_def",
|
||||
phase="final_answer",
|
||||
content=[SimpleNamespace(type="output_text", text="Done!")],
|
||||
),
|
||||
],
|
||||
status="completed",
|
||||
)
|
||||
msg, reason = _normalize_codex_response(response)
|
||||
assert msg.codex_message_items is not None
|
||||
assert len(msg.codex_message_items) == 2
|
||||
assert msg.codex_message_items[0]["id"] == "msg_abc"
|
||||
assert msg.codex_message_items[0]["phase"] == "commentary"
|
||||
assert msg.codex_message_items[0]["content"][0]["text"] == "Thinking..."
|
||||
assert msg.codex_message_items[1]["id"] == "msg_def"
|
||||
assert msg.codex_message_items[1]["phase"] == "final_answer"
|
||||
assert msg.codex_message_items[1]["content"][0]["text"] == "Done!"
|
||||
|
||||
def test_message_items_none_when_no_messages(self, monkeypatch):
|
||||
"""Only reasoning + tool calls should yield None codex_message_items."""
|
||||
agent = self._make_codex_agent(monkeypatch)
|
||||
response = SimpleNamespace(
|
||||
output=[
|
||||
SimpleNamespace(type="function_call", status="completed",
|
||||
call_id="call_1", name="web_search", arguments='{}', id="fc_1"),
|
||||
],
|
||||
status="completed",
|
||||
)
|
||||
msg, reason = _normalize_codex_response(response)
|
||||
assert msg.codex_message_items is None
|
||||
|
||||
|
||||
class TestChatMessagesToResponsesInputMessageItems:
|
||||
"""Verify codex_message_items are replayed verbatim instead of reconstructed."""
|
||||
|
||||
def test_replays_exact_message_items(self, monkeypatch):
|
||||
agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
|
||||
base_url="https://chatgpt.com/backend-api/codex")
|
||||
messages = [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "Hello world",
|
||||
"codex_message_items": [
|
||||
{
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"status": "completed",
|
||||
"id": "msg_123",
|
||||
"phase": "final_answer",
|
||||
"content": [{"type": "output_text", "text": "Hello world"}],
|
||||
},
|
||||
],
|
||||
},
|
||||
{"role": "user", "content": "follow up"},
|
||||
]
|
||||
items = _chat_messages_to_responses_input(messages)
|
||||
msg_items = [i for i in items if i.get("type") == "message"]
|
||||
assert len(msg_items) == 1
|
||||
assert msg_items[0]["id"] == "msg_123"
|
||||
assert msg_items[0]["phase"] == "final_answer"
|
||||
assert msg_items[0]["content"][0]["text"] == "Hello world"
|
||||
|
||||
def test_fallback_to_plain_when_no_message_items(self, monkeypatch):
|
||||
agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
|
||||
base_url="https://chatgpt.com/backend-api/codex")
|
||||
messages = [{"role": "assistant", "content": "Hello world"}]
|
||||
items = _chat_messages_to_responses_input(messages)
|
||||
assert items == [{"role": "assistant", "content": "Hello world"}]
|
||||
|
||||
def test_skips_invalid_message_items(self, monkeypatch):
|
||||
agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
|
||||
base_url="https://chatgpt.com/backend-api/codex")
|
||||
messages = [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "fallback text",
|
||||
"codex_message_items": [
|
||||
{"type": "function_call", "role": "assistant"}, # wrong type
|
||||
{"type": "message", "role": "user"}, # wrong role
|
||||
{"type": "message", "role": "assistant", "content": "not a list"},
|
||||
],
|
||||
},
|
||||
]
|
||||
items = _chat_messages_to_responses_input(messages)
|
||||
# All invalid — falls back to plain text reconstruction
|
||||
assert items == [{"role": "assistant", "content": "fallback text"}]
|
||||
|
||||
|
||||
# ── Chat completions response handling (OpenRouter/Nous) ─────────────────────
|
||||
|
||||
|
||||
@ -943,6 +943,33 @@ def test_normalize_codex_response_marks_commentary_only_message_as_incomplete(mo
|
||||
assert "inspect the repository" in (assistant_message.content or "")
|
||||
|
||||
|
||||
def test_normalize_codex_response_preserves_message_status_for_replay(monkeypatch):
|
||||
"""Incomplete Codex output messages must not be replayed as completed."""
|
||||
agent = _build_agent(monkeypatch)
|
||||
from agent.codex_responses_adapter import _normalize_codex_response
|
||||
|
||||
response = SimpleNamespace(
|
||||
output=[
|
||||
SimpleNamespace(
|
||||
type="message",
|
||||
id="msg_partial",
|
||||
phase="commentary",
|
||||
status="in_progress",
|
||||
content=[SimpleNamespace(type="output_text", text="Still working...")],
|
||||
)
|
||||
],
|
||||
usage=SimpleNamespace(input_tokens=4, output_tokens=2, total_tokens=6),
|
||||
status="in_progress",
|
||||
model="gpt-5-codex",
|
||||
)
|
||||
|
||||
assistant_message, finish_reason = _normalize_codex_response(response)
|
||||
|
||||
assert finish_reason == "incomplete"
|
||||
assert assistant_message.codex_message_items[0]["id"] == "msg_partial"
|
||||
assert assistant_message.codex_message_items[0]["status"] == "in_progress"
|
||||
|
||||
|
||||
def test_normalize_codex_response_detects_leaked_tool_call_text(monkeypatch):
|
||||
"""Harmony-style `to=functions.foo` leaked into assistant content with no
|
||||
structured function_call items must be treated as incomplete so the
|
||||
@ -1403,6 +1430,44 @@ def test_chat_messages_to_responses_input_reasoning_only_has_following_item(monk
|
||||
assert following.get("role") == "assistant"
|
||||
|
||||
|
||||
def test_codex_message_item_status_survives_conversion_and_preflight(monkeypatch):
|
||||
"""Stored Codex assistant message statuses must survive replay normalization."""
|
||||
agent = _build_agent(monkeypatch)
|
||||
from agent.codex_responses_adapter import (
|
||||
_chat_messages_to_responses_input,
|
||||
_preflight_codex_input_items,
|
||||
)
|
||||
|
||||
items = _chat_messages_to_responses_input([
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "partial",
|
||||
"codex_message_items": [
|
||||
{
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"status": "incomplete",
|
||||
"id": "msg_incomplete",
|
||||
"phase": "commentary",
|
||||
"content": [{"type": "output_text", "text": "partial"}],
|
||||
}
|
||||
],
|
||||
}
|
||||
])
|
||||
replay_item = next(item for item in items if item.get("type") == "message")
|
||||
assert replay_item["status"] == "incomplete"
|
||||
|
||||
normalized = _preflight_codex_input_items([
|
||||
{
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"status": "in_progress",
|
||||
"content": [{"type": "output_text", "text": "working"}],
|
||||
}
|
||||
])
|
||||
assert normalized[0]["status"] == "in_progress"
|
||||
|
||||
|
||||
def test_duplicate_detection_distinguishes_different_codex_reasoning(monkeypatch):
|
||||
"""Two consecutive reasoning-only responses with different encrypted content
|
||||
must NOT be treated as duplicates."""
|
||||
@ -1453,6 +1518,58 @@ def test_duplicate_detection_distinguishes_different_codex_reasoning(monkeypatch
|
||||
assert "enc_second" in encrypted_contents
|
||||
|
||||
|
||||
def test_duplicate_detection_distinguishes_different_codex_message_items(monkeypatch):
    """Incomplete turns with new message ids/phases/statuses must not be collapsed."""
    agent = _build_agent(monkeypatch)

    def _in_progress_response(message_id):
        # One interim Responses payload: a single in_progress commentary message.
        return SimpleNamespace(
            output=[
                SimpleNamespace(
                    type="message",
                    id=message_id,
                    phase="commentary",
                    status="in_progress",
                    content=[SimpleNamespace(type="output_text", text="Still working...")],
                )
            ],
            usage=SimpleNamespace(input_tokens=50, output_tokens=10, total_tokens=60),
            status="in_progress",
            model="gpt-5-codex",
        )

    # Two interim turns with identical text but distinct message ids, then a
    # final completed message — duplicate detection must look at the ids.
    scripted = [
        _in_progress_response("msg_first"),
        _in_progress_response("msg_second"),
        _codex_message_response("Final answer after progress updates."),
    ]
    monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: scripted.pop(0))

    result = agent.run_conversation("keep going")

    assert result["completed"] is True
    interim = [
        msg
        for msg in result["messages"]
        if msg.get("role") == "assistant" and msg.get("finish_reason") == "incomplete"
    ]
    assert len(interim) == 2
    assert [msg["codex_message_items"][0]["id"] for msg in interim] == [
        "msg_first",
        "msg_second",
    ]
    for msg in interim:
        assert msg["codex_message_items"][0]["status"] == "in_progress"
|
||||
|
||||
|
||||
def test_chat_messages_to_responses_input_deduplicates_reasoning_ids(monkeypatch):
|
||||
"""Duplicate reasoning item IDs across multi-turn incomplete responses
|
||||
must be deduplicated so the Responses API doesn't reject with HTTP 400."""
|
||||
|
||||
@ -308,6 +308,33 @@ class TestMessageStorage:
|
||||
assert "reasoning_content" in conv[0]
|
||||
assert conv[0]["reasoning_content"] == ""
|
||||
|
||||
def test_codex_message_items_persisted_and_restored(self, db):
|
||||
"""codex_message_items must round-trip through JSON serialization."""
|
||||
db.create_session(session_id="s1", source="cli")
|
||||
items = [
|
||||
{
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"status": "completed",
|
||||
"id": "msg_123",
|
||||
"phase": "commentary",
|
||||
"content": [{"type": "output_text", "text": "Thinking..."}],
|
||||
},
|
||||
{
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"status": "completed",
|
||||
"id": "msg_456",
|
||||
"phase": "final_answer",
|
||||
"content": [{"type": "output_text", "text": "Done!"}],
|
||||
},
|
||||
]
|
||||
db.append_message("s1", role="assistant", content="Done!", codex_message_items=items)
|
||||
|
||||
conv = db.get_messages_as_conversation("s1")
|
||||
assert len(conv) == 1
|
||||
assert conv[0].get("codex_message_items") == items
|
||||
|
||||
def test_reasoning_not_set_for_non_assistant(self, db):
|
||||
"""reasoning is never leaked onto user or tool messages."""
|
||||
db.create_session(session_id="s1", source="telegram")
|
||||
@ -1173,7 +1200,7 @@ class TestSchemaInit:
|
||||
def test_schema_version(self, db):
|
||||
cursor = db._conn.execute("SELECT version FROM schema_version")
|
||||
version = cursor.fetchone()[0]
|
||||
assert version == 8
|
||||
assert version == 9
|
||||
|
||||
def test_title_column_exists(self, db):
|
||||
"""Verify the title column was created in the sessions table."""
|
||||
@ -1229,12 +1256,12 @@ class TestSchemaInit:
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
# Open with SessionDB — should migrate to v8
|
||||
# Open with SessionDB — should migrate to v9
|
||||
migrated_db = SessionDB(db_path=db_path)
|
||||
|
||||
# Verify migration
|
||||
cursor = migrated_db._conn.execute("SELECT version FROM schema_version")
|
||||
assert cursor.fetchone()[0] == 8
|
||||
assert cursor.fetchone()[0] == 9
|
||||
|
||||
# Verify title column exists and is NULL for existing sessions
|
||||
session = migrated_db.get_session("existing")
|
||||
|
||||
@ -82,8 +82,10 @@ CREATE TABLE IF NOT EXISTS messages (
|
||||
token_count INTEGER,
|
||||
finish_reason TEXT,
|
||||
reasoning TEXT,
|
||||
reasoning_content TEXT,
|
||||
reasoning_details TEXT,
|
||||
codex_reasoning_items TEXT,
codex_message_items TEXT
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
|
||||
@ -91,7 +93,7 @@ CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestam
|
||||
|
||||
Notes:
|
||||
- `tool_calls` is stored as a JSON string (serialized list of tool call objects)
|
||||
- `reasoning_details`, `codex_reasoning_items`, and `codex_message_items` are stored as JSON strings
|
||||
- `reasoning` stores the raw reasoning text for providers that expose it
|
||||
- Timestamps are Unix epoch floats (`time.time()`)
|
||||
|
||||
@ -128,7 +130,7 @@ END;
|
||||
|
||||
## Schema Version and Migrations
|
||||
|
||||
Current schema version: **9**
|
||||
|
||||
The `schema_version` table stores a single integer. On initialization,
|
||||
`_init_schema()` checks the current version and applies migrations sequentially:
|
||||
@ -141,6 +143,9 @@ The `schema_version` table stores a single integer. On initialization,
|
||||
| 4 | Add unique index on `title` (NULLs allowed, non-NULL must be unique) |
|
||||
| 5 | Add billing columns: `cache_read_tokens`, `cache_write_tokens`, `reasoning_tokens`, `billing_provider`, `billing_base_url`, `billing_mode`, `estimated_cost_usd`, `actual_cost_usd`, `cost_status`, `cost_source`, `pricing_version` |
|
||||
| 6 | Add reasoning columns to messages: `reasoning`, `reasoning_details`, `codex_reasoning_items` |
|
||||
| 7 | Add `reasoning_content` column to messages |
|
||||
| 8 | Add `api_call_count` column to sessions |
|
||||
| 9 | Add `codex_message_items` column to messages for Codex Responses message id/phase replay |
|
||||
|
||||
Each migration uses `ALTER TABLE ADD COLUMN` wrapped in try/except to handle
|
||||
the column-already-exists case (idempotent). The version number is bumped after
|
||||
|
||||
Loading…
Reference in New Issue
Block a user