feat(irc): add interactive setup

feat(gateway): refine Platform._missing_ and platform-connected dispatch

Restricts plugin-name acceptance to bundled plugin scan + registry
(no arbitrary string -> enum-pollution), pulls per-platform connectivity
checks into a _PLATFORM_CONNECTED_CHECKERS lambda map with a clean
_is_platform_connected method, and adds tests covering the checker map,
plugin platform interface, and IRC setup wizard.
This commit is contained in:
Ari Lotter 2026-04-20 18:52:15 -04:00 committed by Teknium
parent 6e42daf7dd
commit 868bc1c242
38 changed files with 2191 additions and 189 deletions

View File

@ -13,7 +13,7 @@ import os
import json
from pathlib import Path
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from typing import Dict, List, Optional, Any, Callable
from enum import Enum
from hermes_cli.config import get_hermes_home
@ -45,6 +45,11 @@ def _normalize_unauthorized_dm_behavior(value: Any, default: str = "pair") -> st
return default
# Module-level cache for bundled platform plugin names (lives outside the
# enum so it doesn't become an accidental enum member).
_Platform__bundled_plugin_names: Optional[set] = None
class Platform(Enum):
"""Supported messaging platforms.
@ -76,10 +81,11 @@ class Platform(Enum):
YUANBAO = "yuanbao"
@classmethod
def _missing_(cls, value):
"""Accept unknown platform names for plugin-registered adapters.
"""Accept unknown platform names only for known plugin adapters.
Creates a pseudo-member cached in ``_value2member_map_`` so that
``Platform("irc") is Platform("irc")`` holds True (identity-stable).
Arbitrary strings are rejected to prevent enum pollution.
"""
if not isinstance(value, str) or not value.strip():
return None
@ -88,14 +94,62 @@ class Platform(Enum):
# Check cache first (another call may have created it already)
if value in cls._value2member_map_:
return cls._value2member_map_[value]
# Only create pseudo-members for bundled plugin platforms (discovered
# via filesystem scan) or runtime-registered plugin platforms.
global _Platform__bundled_plugin_names
if _Platform__bundled_plugin_names is None:
_Platform__bundled_plugin_names = cls._scan_bundled_plugin_platforms()
if value in _Platform__bundled_plugin_names:
pseudo = object.__new__(cls)
pseudo._value_ = value
pseudo._name_ = value.upper().replace("-", "_").replace(" ", "_")
# Cache so future lookups return the same object
cls._value2member_map_[value] = pseudo
cls._member_map_[pseudo._name_] = pseudo
return pseudo
# Runtime-registered plugins (e.g. user-installed, discovered after
# the enum was defined).
try:
from gateway.platform_registry import platform_registry
if platform_registry.is_registered(value):
pseudo = object.__new__(cls)
pseudo._value_ = value
pseudo._name_ = value.upper().replace("-", "_").replace(" ", "_")
cls._value2member_map_[value] = pseudo
cls._member_map_[pseudo._name_] = pseudo
return pseudo
except Exception:
pass
return None
@classmethod
def _scan_bundled_plugin_platforms(cls) -> set:
"""Return names of bundled platform plugins under ``plugins/platforms/``."""
names: set = set()
try:
platforms_dir = Path(__file__).parent.parent / "plugins" / "platforms"
if platforms_dir.is_dir():
for child in platforms_dir.iterdir():
if (
child.is_dir()
and (child / "__init__.py").exists()
and (
(child / "plugin.yaml").exists()
or (child / "plugin.yml").exists()
)
):
names.add(child.name.lower())
except Exception:
pass
return names
# Snapshot of built-in platform values before any dynamic _missing_ lookups.
# Used to distinguish real platforms from arbitrary strings.
_BUILTIN_PLATFORM_VALUES = frozenset(m.value for m in Platform.__members__.values())
@dataclass
class HomeChannel:
@ -258,6 +312,44 @@ class StreamingConfig:
)
# -----------------------------------------------------------------------------
# Built-in platform connection checkers
# -----------------------------------------------------------------------------
# Each callable receives a ``PlatformConfig`` and returns ``True`` when the
# platform is sufficiently configured to be considered "connected". Platforms
# that rely on the generic ``token or api_key`` check (Telegram, Discord,
# Slack, Matrix, Mattermost, HomeAssistant) do not need an entry here.
_PLATFORM_CONNECTED_CHECKERS: dict[Platform, Callable[[PlatformConfig], bool]] = {
Platform.WEIXIN: lambda cfg: bool(
cfg.extra.get("account_id") and (cfg.token or cfg.extra.get("token"))
),
Platform.WHATSAPP: lambda cfg: True, # bridge handles auth
Platform.SIGNAL: lambda cfg: bool(cfg.extra.get("http_url")),
Platform.EMAIL: lambda cfg: bool(cfg.extra.get("address")),
Platform.SMS: lambda cfg: bool(os.getenv("TWILIO_ACCOUNT_SID")),
Platform.API_SERVER: lambda cfg: True,
Platform.WEBHOOK: lambda cfg: True,
Platform.FEISHU: lambda cfg: bool(cfg.extra.get("app_id")),
Platform.WECOM: lambda cfg: bool(cfg.extra.get("bot_id")),
Platform.WECOM_CALLBACK: lambda cfg: bool(
cfg.extra.get("corp_id") or cfg.extra.get("apps")
),
Platform.BLUEBUBBLES: lambda cfg: bool(
cfg.extra.get("server_url") and cfg.extra.get("password")
),
Platform.QQBOT: lambda cfg: bool(
cfg.extra.get("app_id") and cfg.extra.get("client_secret")
),
Platform.YUANBAO: lambda cfg: bool(
cfg.extra.get("app_id") and cfg.extra.get("app_secret")
),
Platform.DINGTALK: lambda cfg: bool(
(cfg.extra.get("client_id") or os.getenv("DINGTALK_CLIENT_ID"))
and (cfg.extra.get("client_secret") or os.getenv("DINGTALK_CLIENT_SECRET"))
),
}
@dataclass
class GatewayConfig:
"""
@ -311,72 +403,43 @@ class GatewayConfig:
for platform, config in self.platforms.items():
if not config.enabled:
continue
# Weixin requires both a token and an account_id
if self._is_platform_connected(platform, config):
connected.append(platform)
return connected
def _is_platform_connected(self, platform: Platform, config: PlatformConfig) -> bool:
"""Check whether a single platform is sufficiently configured."""
# Weixin requires both a token and an account_id (checked first so
# the generic token branch doesn't let it through without account_id).
if platform == Platform.WEIXIN:
if config.extra.get("account_id") and (config.token or config.extra.get("token")):
connected.append(platform)
continue
# Platforms that use token/api_key auth
return bool(
config.extra.get("account_id")
and (config.token or config.extra.get("token"))
)
# Generic token/api_key auth covers Telegram, Discord, Slack, etc.
if config.token or config.api_key:
connected.append(platform)
# WhatsApp uses enabled flag only (bridge handles auth)
elif platform == Platform.WHATSAPP:
connected.append(platform)
# Signal uses extra dict for config (http_url + account)
elif platform == Platform.SIGNAL and config.extra.get("http_url"):
connected.append(platform)
# Email uses extra dict for config (address + imap_host + smtp_host)
elif platform == Platform.EMAIL and config.extra.get("address"):
connected.append(platform)
# SMS uses api_key (Twilio auth token) — SID checked via env
elif platform == Platform.SMS and os.getenv("TWILIO_ACCOUNT_SID"):
connected.append(platform)
# API Server uses enabled flag only (no token needed)
elif platform == Platform.API_SERVER:
connected.append(platform)
# Webhook uses enabled flag only (secrets are per-route)
elif platform == Platform.WEBHOOK:
connected.append(platform)
# Feishu uses extra dict for app credentials
elif platform == Platform.FEISHU and config.extra.get("app_id"):
connected.append(platform)
# WeCom bot mode uses extra dict for bot credentials
elif platform == Platform.WECOM and config.extra.get("bot_id"):
connected.append(platform)
# WeCom callback mode uses corp_id or apps list
elif platform == Platform.WECOM_CALLBACK and (
config.extra.get("corp_id") or config.extra.get("apps")
):
connected.append(platform)
# BlueBubbles uses extra dict for local server config
elif platform == Platform.BLUEBUBBLES and config.extra.get("server_url") and config.extra.get("password"):
connected.append(platform)
# QQBot uses extra dict for app credentials
elif platform == Platform.QQBOT and config.extra.get("app_id") and config.extra.get("client_secret"):
connected.append(platform)
# Yuanbao uses extra dict for app credentials
elif platform == Platform.YUANBAO and config.extra.get("app_id") and config.extra.get("app_secret"):
connected.append(platform)
# DingTalk uses client_id/client_secret from config.extra or env vars
elif platform == Platform.DINGTALK and (
config.extra.get("client_id") or os.getenv("DINGTALK_CLIENT_ID")
) and (
config.extra.get("client_secret") or os.getenv("DINGTALK_CLIENT_SECRET")
):
connected.append(platform)
else:
# Plugin-registered platform — delegate validation to the
# registry entry's validate_config if available.
return True
# Platform-specific check
checker = _PLATFORM_CONNECTED_CHECKERS.get(platform)
if checker is not None:
return checker(config)
# Plugin-registered platforms
try:
from gateway.platform_registry import platform_registry
entry = platform_registry.get(platform.value)
if entry:
if entry.validate_config is None or entry.validate_config(config):
connected.append(platform)
if entry.is_connected is not None:
return entry.is_connected(config)
if entry.validate_config is not None:
return entry.validate_config(config)
return True
except Exception:
pass # Registry not yet initialised during early import
return connected
return False
def get_home_channel(self, platform: Platform) -> Optional[HomeChannel]:
"""Get the home channel for a platform."""
@ -1419,3 +1482,25 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
config.default_reset_policy.at_hour = int(reset_hour)
except ValueError:
pass
# Registry-driven enable for plugin platforms. Built-ins have explicit
# blocks above; plugins expose check_fn() which is the single source of
# truth for "are my env vars set?". When it returns True, ensure the
# platform is enabled so start() will create its adapter.
try:
from hermes_cli.plugins import discover_plugins
discover_plugins() # idempotent
from gateway.platform_registry import platform_registry
for entry in platform_registry.plugin_entries():
try:
if not entry.check_fn():
continue
except Exception as e:
logger.debug("check_fn for %s raised: %s", entry.name, e)
continue
platform = Platform(entry.name)
if platform not in config.platforms:
config.platforms[platform] = PlatformConfig()
config.platforms[platform].enabled = True
except Exception as e:
logger.debug("Plugin platform enable pass failed: %s", e)

View File

@ -58,6 +58,11 @@ class PlatformEntry:
# fail at connect() time with a descriptive error.
validate_config: Optional[Callable[[Any], bool]] = None
# Optional: given a PlatformConfig, is the platform connected/enabled?
# Used by ``GatewayConfig.get_connected_platforms()`` and setup UI status.
# If None, falls back to ``validate_config`` or ``check_fn``.
is_connected: Optional[Callable[[Any], bool]] = None
# Env vars this platform needs (for ``hermes setup`` display).
required_env: list = field(default_factory=list)

View File

@ -374,6 +374,39 @@ def is_host_excluded_by_no_proxy(hostname: str, no_proxy_value: str | None = Non
return False
def is_host_excluded_by_no_proxy(hostname: str, no_proxy_value: str | None = None) -> bool:
"""Return True when ``hostname`` matches a ``NO_PROXY`` entry.
Supports comma- or whitespace-separated entries with optional leading dots
and ``*.`` wildcards, which match both the apex domain and subdomains.
"""
raw = no_proxy_value
if raw is None:
raw = os.environ.get("NO_PROXY") or os.environ.get("no_proxy") or ""
raw = raw.strip()
if not raw:
return False
lower_hostname = hostname.lower()
for entry in re.split(r"[\s,]+", raw):
normalized = entry.strip().lower()
if not normalized:
continue
if normalized == "*":
return True
if normalized.startswith("*."):
normalized = normalized[2:]
elif normalized.startswith("."):
normalized = normalized[1:]
if lower_hostname == normalized or lower_hostname.endswith(f".{normalized}"):
return True
return False
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path

View File

@ -1318,6 +1318,31 @@ class TelegramAdapter(BasePlatformAdapter):
)
return False
async def delete_message(self, chat_id: str, message_id: str) -> bool:
"""Delete a previously sent Telegram message.
Used by the stream consumer's fresh-final cleanup path (ported
from openclaw/openclaw#72038) to remove long-lived preview
messages after sending the completed reply as a fresh message.
Telegram's Bot API ``deleteMessage`` works for bot-posted
messages in the last 48 hours. Failures are non-fatal the
caller leaves the preview in place and logs at debug level.
"""
if not self._bot:
return False
try:
await self._bot.delete_message(
chat_id=int(chat_id),
message_id=int(message_id),
)
return True
except Exception as e:
logger.debug(
"[%s] Failed to delete Telegram message %s: %s",
self.name, message_id, e,
)
return False
async def send_update_prompt(
self, chat_id: str, prompt: str, default: str = "",
session_key: str = "",

View File

@ -421,6 +421,7 @@ if not _configured_cwd or _configured_cwd in (".", "auto", "cwd"):
from gateway.config import (
Platform,
_BUILTIN_PLATFORM_VALUES,
GatewayConfig,
load_gateway_config,
)
@ -1687,6 +1688,66 @@ class GatewayRunner:
else:
self._session_reasoning_overrides[session_key] = dict(reasoning_config)
@staticmethod
def _parse_reasoning_command_args(raw_args: str) -> tuple[str, bool]:
"""Parse `/reasoning` args into `(value, persist_global)`.
`/reasoning <level>` is session-scoped by default. `--global` may be
supplied in any position to persist the change to config.yaml.
"""
import shlex
text = str(raw_args or "").strip().replace("", "--")
if not text:
return "", False
try:
tokens = shlex.split(text)
except ValueError:
tokens = text.split()
persist_global = False
value_tokens = []
for token in tokens:
if token == "--global":
persist_global = True
else:
value_tokens.append(token)
return " ".join(value_tokens).strip().lower(), persist_global
def _resolve_session_reasoning_config(
self,
*,
source: Optional[SessionSource] = None,
session_key: Optional[str] = None,
) -> dict | None:
"""Resolve reasoning effort for a session, honoring session overrides."""
resolved_session_key = session_key
if not resolved_session_key and source is not None:
try:
resolved_session_key = self._session_key_for_source(source)
except Exception:
resolved_session_key = None
overrides = getattr(self, "_session_reasoning_overrides", {}) or {}
if resolved_session_key and resolved_session_key in overrides:
return overrides[resolved_session_key]
return self._load_reasoning_config()
def _set_session_reasoning_override(
self,
session_key: str,
reasoning_config: Optional[dict],
) -> None:
"""Set or clear the session-scoped reasoning override."""
if not session_key:
return
if not hasattr(self, "_session_reasoning_overrides"):
self._session_reasoning_overrides = {}
if reasoning_config is None:
self._session_reasoning_overrides.pop(session_key, None)
else:
self._session_reasoning_overrides[session_key] = dict(reasoning_config)
@staticmethod
def _load_service_tier() -> str | None:
"""Load Priority Processing setting from config.yaml.
@ -2357,9 +2418,8 @@ class GatewayRunner:
pass
# Warn if no user allowlists are configured and open access is not opted in
_any_allowlist = any(
os.getenv(v)
for v in ("TELEGRAM_ALLOWED_USERS", "DISCORD_ALLOWED_USERS",
_builtin_allowed_vars = (
"TELEGRAM_ALLOWED_USERS", "DISCORD_ALLOWED_USERS",
"WHATSAPP_ALLOWED_USERS", "SLACK_ALLOWED_USERS",
"SIGNAL_ALLOWED_USERS", "SIGNAL_GROUP_ALLOWED_USERS",
"TELEGRAM_GROUP_ALLOWED_USERS",
@ -2374,11 +2434,10 @@ class GatewayRunner:
"BLUEBUBBLES_ALLOWED_USERS",
"QQ_ALLOWED_USERS",
"YUANBAO_ALLOWED_USERS",
"GATEWAY_ALLOWED_USERS")
"GATEWAY_ALLOWED_USERS",
)
_allow_all = os.getenv("GATEWAY_ALLOW_ALL_USERS", "").lower() in ("true", "1", "yes") or any(
os.getenv(v, "").lower() in ("true", "1", "yes")
for v in ("TELEGRAM_ALLOW_ALL_USERS", "DISCORD_ALLOW_ALL_USERS",
_builtin_allow_all_vars = (
"TELEGRAM_ALLOW_ALL_USERS", "DISCORD_ALLOW_ALL_USERS",
"WHATSAPP_ALLOW_ALL_USERS", "SLACK_ALLOW_ALL_USERS",
"SIGNAL_ALLOW_ALL_USERS", "EMAIL_ALLOW_ALL_USERS",
"SMS_ALLOW_ALL_USERS", "MATTERMOST_ALLOW_ALL_USERS",
@ -2389,7 +2448,31 @@ class GatewayRunner:
"WEIXIN_ALLOW_ALL_USERS",
"BLUEBUBBLES_ALLOW_ALL_USERS",
"QQ_ALLOW_ALL_USERS",
"YUANBAO_ALLOW_ALL_USERS")
"YUANBAO_ALLOW_ALL_USERS",
)
# Also pick up plugin-registered platforms — each entry can declare
# its own allowed_users_env / allow_all_env, so the warning stays
# accurate as plugins like IRC come online.
_plugin_allowed_vars: tuple = ()
_plugin_allow_all_vars: tuple = ()
try:
from gateway.platform_registry import platform_registry
_plugin_allowed_vars = tuple(
e.allowed_users_env for e in platform_registry.plugin_entries()
if e.allowed_users_env
)
_plugin_allow_all_vars = tuple(
e.allow_all_env for e in platform_registry.plugin_entries()
if e.allow_all_env
)
except Exception:
pass
_any_allowlist = any(
os.getenv(v) for v in _builtin_allowed_vars + _plugin_allowed_vars
)
_allow_all = os.getenv("GATEWAY_ALLOW_ALL_USERS", "").lower() in ("true", "1", "yes") or any(
os.getenv(v, "").lower() in ("true", "1", "yes")
for v in _builtin_allow_all_vars + _plugin_allow_all_vars
)
if not _any_allowlist and not _allow_all:
logger.warning(
@ -3256,12 +3339,21 @@ class GatewayRunner:
getattr(self.config, "thread_sessions_per_user", False),
)
# ── Plugin-registered platforms (checked first) ──────────────
# ── Plugin-registered platforms (checked first) ───────────────────
try:
from gateway.platform_registry import platform_registry
if platform_registry.is_registered(platform.value):
adapter = platform_registry.create_adapter(platform.value, config)
if adapter is not None:
return adapter
# Registered but failed to instantiate — don't silently fall
# through to built-ins (there are none for plugin platforms).
logger.error(
"Platform '%s' is registered but adapter creation failed "
"(check dependencies and config)",
platform.value,
)
return None
except Exception as e:
logger.debug("Platform registry lookup for '%s' failed: %s", platform.value, e)
# Fall through to built-in adapters below
@ -9462,6 +9554,16 @@ class GatewayRunner:
try:
platform = Platform(platform_name)
# Reject arbitrary strings that create dynamic pseudo-members.
# Built-in platforms are always valid; plugin platforms must be
# registered in the platform registry.
if platform.value not in _BUILTIN_PLATFORM_VALUES:
try:
from gateway.platform_registry import platform_registry
if not platform_registry.is_registered(platform.value):
raise ValueError(platform_name)
except Exception:
raise ValueError(platform_name)
except Exception:
logger.warning(
"Synthetic process event has invalid platform metadata: %r",
@ -10549,6 +10651,7 @@ class GatewayRunner:
logger.debug("tool-progress onboarding hint failed: %s", _hint_err)
return
# Only act on tool.started events (ignore tool.completed, reasoning.available, etc.)
if event_type not in ("tool.started",):
return
@ -10675,6 +10778,22 @@ class GatewayRunner:
raw = progress_queue.get_nowait()
# Drain silently when interrupted: events queued in the
# window between tool parse and interrupt processing
# should not render as bubbles. The "⚡ Interrupting
# current task" message is sent separately and is the
# last progress-flavored bubble the user should see.
try:
_agent_for_interrupt = agent_holder[0] if agent_holder else None
if _agent_for_interrupt is not None and getattr(
_agent_for_interrupt, "is_interrupted", False
):
# Drop this event and continue draining.
await asyncio.sleep(0)
continue
except Exception:
pass
# Drain silently when interrupted: events queued in the
# window between tool parse and interrupt processing
# should not render as bubbles. The "⚡ Interrupting

View File

@ -854,6 +854,81 @@ class GatewayStreamConsumer:
self._final_response_sent = True
return True
def _should_send_fresh_final(self) -> bool:
"""Return True when a long-lived preview should be replaced with a
fresh final message instead of an edit.
Conditions:
- Fresh-final is enabled (``fresh_final_after_seconds > 0``).
- We have a real preview message id (not the ``__no_edit__`` sentinel
and not ``None``).
- The preview has been visible for at least the configured threshold.
Ported from openclaw/openclaw#72038.
"""
threshold = getattr(self.cfg, "fresh_final_after_seconds", 0.0) or 0.0
if threshold <= 0:
return False
if not self._message_id or self._message_id == "__no_edit__":
return False
if self._message_created_ts is None:
return False
age = time.monotonic() - self._message_created_ts
return age >= threshold
async def _try_fresh_final(self, text: str) -> bool:
"""Send ``text`` as a brand-new message (best-effort delete the old
preview) so the platform's visible timestamp reflects completion
time. Returns True on successful delivery, False on any failure so
the caller falls back to the normal edit path.
Ported from openclaw/openclaw#72038.
"""
old_message_id = self._message_id
try:
result = await self.adapter.send(
chat_id=self.chat_id,
content=text,
metadata=self.metadata,
)
except Exception as e:
logger.debug("Fresh-final send failed, falling back to edit: %s", e)
return False
if not getattr(result, "success", False):
return False
# Successful fresh send — try to delete the stale preview so the
# user doesn't see the old edit-stuck message underneath. Cleanup
# is best-effort; platforms that don't implement ``delete_message``
# just leave the preview behind (still an acceptable outcome —
# the visible final timestamp is the important part).
if old_message_id and old_message_id != "__no_edit__":
delete_fn = getattr(self.adapter, "delete_message", None)
if delete_fn is not None:
try:
await delete_fn(self.chat_id, old_message_id)
except Exception as e:
logger.debug(
"Fresh-final preview cleanup failed (%s): %s",
old_message_id, e,
)
# Adopt the new message id as the current message so subsequent
# callers (e.g. overflow split loops, finalize retries) see a
# consistent state.
new_message_id = getattr(result, "message_id", None)
if new_message_id:
self._message_id = new_message_id
self._message_created_ts = time.monotonic()
else:
# Send succeeded but platform didn't return an id — treat the
# delivery as final-only and fall back to "__no_edit__" so we
# don't try to edit something we can't address.
self._message_id = "__no_edit__"
self._message_created_ts = None
self._already_sent = True
self._last_sent_text = text
self._final_response_sent = True
return True
async def _send_or_edit(self, text: str, *, finalize: bool = False) -> bool:
"""Send or edit the streaming message.

View File

@ -255,6 +255,14 @@ PROVIDER_REGISTRY: Dict[str, ProviderConfig] = {
api_key_env_vars=("GMI_API_KEY",),
base_url_env_var="GMI_BASE_URL",
),
"gmi": ProviderConfig(
id="gmi",
name="GMI Cloud",
auth_type="api_key",
inference_base_url="https://api.gmi-serving.com/v1",
api_key_env_vars=("GMI_API_KEY",),
base_url_env_var="GMI_BASE_URL",
),
"minimax": ProviderConfig(
id="minimax",
name="MiniMax",
@ -414,6 +422,14 @@ PROVIDER_REGISTRY: Dict[str, ProviderConfig] = {
api_key_env_vars=("AZURE_FOUNDRY_API_KEY",),
base_url_env_var="AZURE_FOUNDRY_BASE_URL",
),
"azure-foundry": ProviderConfig(
id="azure-foundry",
name="Azure Foundry",
auth_type="api_key",
inference_base_url="", # User-provided endpoint
api_key_env_vars=("AZURE_FOUNDRY_API_KEY",),
base_url_env_var="AZURE_FOUNDRY_BASE_URL",
),
}

View File

@ -831,6 +831,114 @@ _SLACK_NAME_LIMIT = 32
_SLACK_INVALID_CHARS = re.compile(r"[^a-z0-9_\-]")
def _sanitize_slack_name(raw: str) -> str:
"""Convert a command name to a valid Slack slash command name.
Slack allows lowercase a-z, digits, hyphens, and underscores. Max 32
chars. Uppercase is lowercased; invalid chars are stripped.
"""
name = raw.lower()
name = _SLACK_INVALID_CHARS.sub("", name)
name = name.strip("-_")
return name[:_SLACK_NAME_LIMIT]
def slack_native_slashes() -> list[tuple[str, str, str]]:
"""Return (slash_name, description, usage_hint) triples for Slack.
Every gateway-available command in ``COMMAND_REGISTRY`` is surfaced as
a standalone Slack slash command (e.g. ``/btw``, ``/stop``, ``/model``),
matching Discord's and Telegram's model where every command is a
first-class slash and not a ``/hermes <verb>`` subcommand.
Both canonical names and aliases are included so users can type any
documented form (e.g. ``/background``, ``/bg``, and ``/btw`` all work).
Plugin-registered slash commands are included too.
Results are clamped to Slack's 50-command limit with duplicate-name
avoidance. ``/hermes`` is always reserved as the first entry so the
legacy ``/hermes <subcommand>`` form keeps working for anything that
gets dropped by the clamp or for free-form questions.
"""
overrides = _resolve_config_gates()
entries: list[tuple[str, str, str]] = []
seen: set[str] = set()
# Reserve /hermes as the catch-all top-level command.
entries.append(("hermes", "Talk to Hermes or run a subcommand", "[subcommand] [args]"))
seen.add("hermes")
def _add(name: str, desc: str, hint: str) -> None:
slack_name = _sanitize_slack_name(name)
if not slack_name or slack_name in seen:
return
if len(entries) >= _SLACK_MAX_SLASH_COMMANDS:
return
# Slack description cap is 2000 chars; keep it short.
entries.append((slack_name, desc[:140], hint[:100]))
seen.add(slack_name)
# First pass: canonical names (so they win slots if we hit the cap).
for cmd in COMMAND_REGISTRY:
if not _is_gateway_available(cmd, overrides):
continue
_add(cmd.name, cmd.description, cmd.args_hint or "")
# Second pass: aliases.
for cmd in COMMAND_REGISTRY:
if not _is_gateway_available(cmd, overrides):
continue
for alias in cmd.aliases:
# Skip aliases that only differ from canonical by case/punctuation
# normalization (already covered by _add dedup).
_add(alias, f"Alias for /{cmd.name}{cmd.description}", cmd.args_hint or "")
# Third pass: plugin commands.
for name, description, args_hint in _iter_plugin_command_entries():
_add(name, description, args_hint or "")
return entries
def slack_app_manifest(request_url: str = "https://hermes-agent.local/slack/commands") -> dict[str, Any]:
"""Generate a Slack app manifest with all gateway commands as slashes.
``request_url`` is required by Slack's manifest schema for every slash
command, but in Socket Mode (which we use) Slack ignores it and routes
the command event through the WebSocket. A placeholder URL is fine.
The returned dict is the ``features.slash_commands`` portion only
callers compose it into a full manifest (or merge into an existing
one). Keeping it narrow avoids coupling us to the rest of the manifest
schema (display_information, oauth_config, settings, etc.) which users
set up once in the Slack UI and rarely change.
"""
slashes = []
for name, desc, usage in slack_native_slashes():
entry = {
"command": f"/{name}",
"description": desc or f"Run /{name}",
"should_escape": False,
"url": request_url,
}
if usage:
entry["usage_hint"] = usage
slashes.append(entry)
return {"features": {"slash_commands": slashes}}
# ---------------------------------------------------------------------------
# Slack native slash commands
# ---------------------------------------------------------------------------
# Slack slash command name constraints: lowercase a-z, 0-9, hyphens,
# underscores. Max 32 chars. Slack app manifest accepts up to 50 slash
# commands per app.
_SLACK_MAX_SLASH_COMMANDS = 50
_SLACK_NAME_LIMIT = 32
_SLACK_INVALID_CHARS = re.compile(r"[^a-z0-9_\-]")
def _sanitize_slack_name(raw: str) -> str:
"""Convert a command name to a valid Slack slash command name.

View File

@ -1808,6 +1808,44 @@ OPTIONAL_ENV_VARS = {
"advanced": True,
},
# ── Bundled skills (opt-in: only needed if the user uses that skill) ──
# These use category="skill" (distinct from "tool") so the sandbox
# env blocklist in tools/environments/local.py does NOT rewrite them —
# skills legitimately need these passed through to curl via
# tools/env_passthrough.py when the user's skill calls out.
"NOTION_API_KEY": {
"description": "Notion integration token (used by the `notion` skill)",
"prompt": "Notion API key",
"url": "https://www.notion.so/my-integrations",
"password": True,
"category": "skill",
"advanced": True,
},
"LINEAR_API_KEY": {
"description": "Linear personal API key (used by the `linear` skill)",
"prompt": "Linear API key",
"url": "https://linear.app/settings/api",
"password": True,
"category": "skill",
"advanced": True,
},
"AIRTABLE_API_KEY": {
"description": "Airtable personal access token (used by the `airtable` skill)",
"prompt": "Airtable API key",
"url": "https://airtable.com/create/tokens",
"password": True,
"category": "skill",
"advanced": True,
},
"TENOR_API_KEY": {
"description": "Tenor API key for GIF search (used by the `gif-search` skill)",
"prompt": "Tenor API key",
"url": "https://developers.google.com/tenor/guides/quickstart",
"password": True,
"category": "skill",
"advanced": True,
},
# ── Honcho ──
"HONCHO_API_KEY": {
"description": "Honcho API key for AI-native persistent memory",
@ -2617,6 +2655,71 @@ def get_custom_provider_context_length(
return None
def get_custom_provider_context_length(
model: str,
base_url: str,
custom_providers: Optional[List[Dict[str, Any]]] = None,
config: Optional[Dict[str, Any]] = None,
) -> Optional[int]:
"""Look up a per-model ``context_length`` override from ``custom_providers``.
Matches any entry whose ``base_url`` equals ``base_url`` (trailing-slash
insensitive) and returns ``custom_providers[i].models.<model>.context_length``
if present and valid. Returns ``None`` when no override applies.
This is the single source of truth for custom-provider context overrides,
used by:
* ``AIAgent.__init__`` (startup resolution)
* ``AIAgent.switch_model`` (mid-session ``/model`` switch)
* ``hermes_cli.model_switch.resolve_display_context_length`` (``/model`` confirmation display)
* ``gateway.run._format_session_info`` (``/info`` display)
* ``agent.model_metadata.get_model_context_length`` (when custom_providers is threaded through)
Before this helper existed, the lookup was duplicated in ``run_agent.py``'s
startup path only; every other path (notably ``/model`` switch) fell back
to the 128K default. See #15779.
"""
if not model or not base_url:
return None
if custom_providers is None:
try:
custom_providers = get_compatible_custom_providers(config)
except Exception:
if config is None:
return None
raw = config.get("custom_providers")
custom_providers = raw if isinstance(raw, list) else []
if not isinstance(custom_providers, list):
return None
target_url = (base_url or "").rstrip("/")
if not target_url:
return None
for entry in custom_providers:
if not isinstance(entry, dict):
continue
entry_url = (entry.get("base_url") or "").rstrip("/")
if not entry_url or entry_url != target_url:
continue
models = entry.get("models")
if not isinstance(models, dict):
continue
model_cfg = models.get(model)
if not isinstance(model_cfg, dict):
continue
raw_ctx = model_cfg.get("context_length")
if raw_ctx is None:
continue
try:
ctx = int(raw_ctx)
except (TypeError, ValueError):
continue
if ctx > 0:
return ctx
return None
def check_config_version() -> Tuple[int, int]:
"""
Check config version.

View File

@ -2773,6 +2773,12 @@ def _load_bundled_platform_plugins_for_enumeration() -> set[str]:
the registry no adapters run, no network I/O so loading it here is
side-effect-free for the short-lived setup process.
**Contract:** Platform plugin ``register()`` functions MUST NOT register
tools, hooks, or start background threads. They should only call
``ctx.register_platform()`` to populate the platform registry. Violating
this contract will cause side effects (tool registration, hook firing)
during setup menu rendering even when the plugin is disabled.
Returns the set of plugin names that were force-loaded (i.e. plugins
not in ``plugins.enabled``), so the caller can display a hint and
auto-enable them on selection.
@ -2887,6 +2893,17 @@ def _platform_status(platform: dict) -> str:
"""
entry = platform.get("_registry_entry")
if entry is not None:
configured = False
# Prefer is_connected (checks both env and config.yaml) over
# check_fn (typically just dependency / env presence).
if entry.is_connected is not None:
try:
from gateway.config import PlatformConfig
synthetic = PlatformConfig(enabled=True)
configured = bool(entry.is_connected(synthetic))
except Exception:
configured = False
if not configured:
try:
configured = bool(entry.check_fn())
except Exception:
@ -3278,6 +3295,12 @@ def _setup_yuanbao():
_setup_standard_platform(yuanbao_platform)
def _setup_yuanbao():
"""Configure Yuanbao via the standard platform setup."""
yuanbao_platform = next(p for p in _PLATFORMS if p["key"] == "yuanbao")
_setup_standard_platform(yuanbao_platform)
def _is_service_installed() -> bool:
"""Check if the gateway is installed as a system service."""
if supports_systemd_services():
@ -4009,11 +4032,21 @@ def gateway_setup():
_configure_platform(platforms[choice])
# ── Post-setup: offer to install/restart gateway ──
# Consider any platform (built-in or plugin) where the user has made
# meaningful progress. ``_platform_status`` already handles plugin
# entries via their check_fn and per-platform dual-states like
# WhatsApp's "enabled, not paired".
def _is_progress(status: str) -> bool:
s = status.lower()
return not (
s == "not configured"
or s.startswith("partially")
or s.startswith("plugin disabled")
)
any_configured = any(
bool(get_env_value(p["token_var"]))
for p in _PLATFORMS
if p["key"] != "whatsapp"
) or (get_env_value("WHATSAPP_ENABLED") or "").lower() == "true"
_is_progress(_platform_status(p)) for p in _all_platforms()
)
if any_configured:
print()

View File

@ -337,6 +337,14 @@ _PROVIDER_MODELS: dict[str, list[str]] = {
"anthropic/claude-sonnet-4.6",
"openai/gpt-5.4",
],
"gmi": [
"zai-org/GLM-5.1-FP8",
"deepseek-ai/DeepSeek-V3.2",
"moonshotai/Kimi-K2.5",
"google/gemini-3.1-flash-lite-preview",
"anthropic/claude-sonnet-4.6",
"openai/gpt-5.4",
],
"opencode-zen": [
"kimi-k2.5",
"gpt-5.4-pro",

View File

@ -2204,6 +2204,7 @@ def _setup_mattermost():
home_channel = prompt("Home channel ID (leave empty to set later with /set-home)")
if home_channel:
save_env_value("MATTERMOST_HOME_CHANNEL", home_channel)
print_info(" Open config in your editor: hermes config edit")
def _setup_bluebubbles():
@ -2321,8 +2322,7 @@ def _setup_webhooks():
print_info(" https://hermes-agent.nousresearch.com/docs/user-guide/messaging/webhooks/#configuring-routes")
print()
print_info(" Open config in your editor: hermes config edit")
print_info(" Open config in your editor: hermes config edit")
def setup_gateway(config: dict):
@ -2355,24 +2355,19 @@ def setup_gateway(config: dict):
_configure_platform(platforms[idx])
# ── Gateway Service Setup ──
any_messaging = (
get_env_value("TELEGRAM_BOT_TOKEN")
or get_env_value("DISCORD_BOT_TOKEN")
or get_env_value("SLACK_BOT_TOKEN")
or get_env_value("SIGNAL_HTTP_URL")
or get_env_value("EMAIL_ADDRESS")
or get_env_value("TWILIO_ACCOUNT_SID")
or get_env_value("MATTERMOST_TOKEN")
or get_env_value("MATRIX_ACCESS_TOKEN")
or get_env_value("MATRIX_PASSWORD")
or get_env_value("WHATSAPP_ENABLED")
or get_env_value("DINGTALK_CLIENT_ID")
or get_env_value("FEISHU_APP_ID")
or get_env_value("WECOM_BOT_ID")
or get_env_value("WEIXIN_ACCOUNT_ID")
or get_env_value("BLUEBUBBLES_SERVER_URL")
or get_env_value("QQ_APP_ID")
or get_env_value("WEBHOOK_ENABLED")
# Count any platform (built-in or plugin) the user configured during this
# setup pass — reuses ``_platform_status`` so plugin platforms like IRC
# are picked up without another hard-coded env-var list.
def _is_progress(status: str) -> bool:
s = status.lower()
return not (
s == "not configured"
or s.startswith("partially")
or s.startswith("plugin disabled")
)
any_messaging = any(
_is_progress(_platform_status(p)) for p in _all_platforms()
)
if any_messaging:
print()

View File

@ -647,6 +647,16 @@
}];
}
# ── Assertions ─────────────────────────────────────────────────────
{
assertions = let
names = map lib.getName cfg.extraPlugins;
in [{
assertion = (lib.length names) == (lib.length (lib.unique names));
message = "services.hermes-agent.extraPlugins: duplicate plugin names detected: ${toString names}. If using fetchFromGitHub, set name = \"plugin-name\" to disambiguate.";
}];
}
# ── Warnings ──────────────────────────────────────────────────────
# ── Per-user profile for extraPackages ───────────────────────────
# Wire extraPackages into the hermes user's per-user profile so the
@ -730,12 +740,12 @@
# is disabled so the host CLI falls back to native execution.
${if cfg.container.enable then ''
cat > ${cfg.stateDir}/.hermes/.container-mode <<'HERMES_CONTAINER_MODE_EOF'
# Written by NixOS activation script. Do not edit manually.
backend=${cfg.container.backend}
container_name=${containerName}
exec_user=${cfg.user}
hermes_bin=${containerDataDir}/current-package/bin/hermes
HERMES_CONTAINER_MODE_EOF
# Written by NixOS activation script. Do not edit manually.
backend=${cfg.container.backend}
container_name=${containerName}
exec_user=${cfg.user}
hermes_bin=${containerDataDir}/current-package/bin/hermes
HERMES_CONTAINER_MODE_EOF
chown ${cfg.user}:${cfg.group} ${cfg.stateDir}/.hermes/.container-mode
chmod 0644 ${cfg.stateDir}/.hermes/.container-mode
'' else ''
@ -796,8 +806,8 @@ HERMES_CONTAINER_MODE_EOF
ENV_FILE="${cfg.stateDir}/.hermes/.env"
install -o ${cfg.user} -g ${cfg.group} -m 0640 /dev/null "$ENV_FILE"
cat > "$ENV_FILE" <<'HERMES_NIX_ENV_EOF'
${envFileContent}
HERMES_NIX_ENV_EOF
${envFileContent}
HERMES_NIX_ENV_EOF
${lib.concatStringsSep "\n" (map (f: ''
if [ -f "${f}" ]; then
echo "" >> "$ENV_FILE"

View File

@ -71,7 +71,11 @@ def _parse_irc_message(raw: str) -> dict:
trailing = ""
if raw.startswith(":"):
try:
prefix, raw = raw[1:].split(" ", 1)
except ValueError:
prefix = raw[1:]
raw = ""
if " :" in raw:
raw, trailing = raw.split(" :", 1)
@ -122,9 +126,20 @@ class IRCAdapter(BasePlatformAdapter):
# Auth
self.allowed_users: list = extra.get("allowed_users", [])
# IRC nicks are case-insensitive — normalise for lookups
self._allowed_users_lower: set = {u.lower() for u in self.allowed_users if isinstance(u, str)}
# IRC limits
self.max_message_length = int(extra.get("max_message_length", 450))
max_msg = extra.get("max_message_length")
if max_msg is None:
try:
from gateway.platform_registry import platform_registry
entry = platform_registry.get("irc")
if entry and entry.max_message_length:
max_msg = entry.max_message_length
except Exception:
pass
self.max_message_length = int(max_msg or 450)
# Runtime state
self._reader: Optional[asyncio.StreamReader] = None
@ -290,21 +305,36 @@ class IRCAdapter(BasePlatformAdapter):
overhead = len(f"PRIVMSG {target} :".encode("utf-8")) + 2 # +2 for \r\n
max_bytes = 510 - overhead
max_chars = min(self.max_message_length, max_bytes)
user_limit = self.max_message_length
lines: List[str] = []
for paragraph in content.split("\n"):
if not paragraph.strip():
continue
while len(paragraph) > max_chars:
# Find a space to break at
split_at = paragraph.rfind(" ", 0, max_chars)
if split_at < max_chars // 3:
split_at = max_chars
lines.append(paragraph[:split_at])
paragraph = paragraph[split_at:].lstrip()
while True:
para_bytes = paragraph.encode("utf-8")
limit = min(user_limit, max_bytes)
if len(para_bytes) <= limit:
if paragraph.strip():
lines.append(paragraph)
break
# Binary search for a safe character boundary <= limit
low, high = 1, len(paragraph)
best = 0
while low <= high:
mid = (low + high) // 2
if len(paragraph[:mid].encode("utf-8")) <= limit:
best = mid
low = mid + 1
else:
high = mid - 1
split_at = best
# Prefer a space boundary
space = paragraph.rfind(" ", 0, split_at)
if space > split_at // 3:
split_at = space
lines.append(paragraph[:split_at].rstrip())
paragraph = paragraph[split_at:].lstrip()
return lines if lines else [""]
@ -386,7 +416,16 @@ class IRCAdapter(BasePlatformAdapter):
# ERR_NICKNAMEINUSE (433) — nick collision during registration
if command == "433":
# Retry with incrementing suffix: hermes_, hermes_1, hermes_2...
base = self.nickname.rstrip("_0123456789")
suffix_match = re.search(r"_(\d+)$", self._current_nick)
if suffix_match:
next_num = int(suffix_match.group(1)) + 1
self._current_nick = f"{base}_{next_num}"
elif self._current_nick == self.nickname:
self._current_nick = self.nickname + "_"
else:
self._current_nick = self.nickname + "_1"
await self._send_raw(f"NICK {self._current_nick}")
return
@ -425,8 +464,8 @@ class IRCAdapter(BasePlatformAdapter):
if not addressed:
return # Ignore unaddressed channel messages
# Auth check
if self.allowed_users and sender_nick not in self.allowed_users:
# Auth check (case-insensitive)
if self._allowed_users_lower and sender_nick.lower() not in self._allowed_users_lower:
logger.debug("IRC: ignoring message from unauthorized user %s", sender_nick)
return
@ -499,6 +538,121 @@ def validate_config(config) -> bool:
return bool(server and channel)
def interactive_setup() -> None:
"""Interactive `hermes gateway setup` flow for the IRC platform.
Lazy-imports ``hermes_cli.setup`` helpers so the plugin stays importable
in non-CLI contexts (gateway runtime, tests).
"""
from hermes_cli.setup import (
prompt,
prompt_yes_no,
save_env_value,
get_env_value,
print_header,
print_info,
print_warning,
print_success,
)
print_header("IRC")
existing_server = get_env_value("IRC_SERVER")
if existing_server:
print_info(f"IRC: already configured (server: {existing_server})")
if not prompt_yes_no("Reconfigure IRC?", False):
return
print_info("Connect Hermes to an IRC network. Uses Python stdlib — no extra packages needed.")
print_info(" Works with Libera.Chat, OFTC, your own ZNC/InspIRCd, etc.")
print()
server = prompt("IRC server hostname (e.g. irc.libera.chat)", default=existing_server or "")
if not server:
print_warning("Server is required — skipping IRC setup")
return
save_env_value("IRC_SERVER", server.strip())
use_tls = prompt_yes_no("Use TLS (recommended)?", True)
save_env_value("IRC_USE_TLS", "true" if use_tls else "false")
default_port = "6697" if use_tls else "6667"
port = prompt(f"Port (default {default_port})", default=get_env_value("IRC_PORT") or "")
if port:
try:
save_env_value("IRC_PORT", str(int(port)))
except ValueError:
print_warning(f"Invalid port — using default {default_port}")
elif get_env_value("IRC_PORT"):
# User cleared the prompt; drop the override so the default applies.
save_env_value("IRC_PORT", "")
nickname = prompt(
"Bot nickname (e.g. hermes-bot)",
default=get_env_value("IRC_NICKNAME") or "",
)
if not nickname:
print_warning("Nickname is required — skipping IRC setup")
return
save_env_value("IRC_NICKNAME", nickname.strip())
channel = prompt(
"Channel to join (e.g. #hermes — comma-separate for multiple)",
default=get_env_value("IRC_CHANNEL") or "",
)
if not channel:
print_warning("Channel is required — skipping IRC setup")
return
save_env_value("IRC_CHANNEL", channel.strip())
print()
print_info("🔑 Optional authentication")
print_info(" Leave blank to skip.")
if prompt_yes_no("Configure a server password (PASS command)?", False):
server_password = prompt("Server password", password=True)
if server_password:
save_env_value("IRC_SERVER_PASSWORD", server_password)
if prompt_yes_no("Identify with NickServ on connect?", False):
nickserv = prompt("NickServ password", password=True)
if nickserv:
save_env_value("IRC_NICKSERV_PASSWORD", nickserv)
print()
print_info("🔒 Access control: restrict who can message the bot")
print_info(" IRC nicks are not authenticated — anyone can claim any nick.")
print_info(" For public channels, pair with NickServ-only mode on your network")
print_info(" if you want stronger identity guarantees.")
allow_all = prompt_yes_no("Allow all users in the channel to talk to the bot?", False)
if allow_all:
save_env_value("IRC_ALLOW_ALL_USERS", "true")
save_env_value("IRC_ALLOWED_USERS", "")
print_warning("⚠️ Open access — any nick in the channel can command the bot.")
else:
save_env_value("IRC_ALLOW_ALL_USERS", "false")
allowed = prompt(
"Allowed nicks (comma-separated, leave empty to deny everyone)",
default=get_env_value("IRC_ALLOWED_USERS") or "",
)
if allowed:
save_env_value("IRC_ALLOWED_USERS", allowed.replace(" ", ""))
print_success("Allowlist configured")
else:
save_env_value("IRC_ALLOWED_USERS", "")
print_info("No nicks allowed — the bot will ignore all messages until you add nicks.")
print()
print_success("IRC configuration saved to ~/.hermes/.env")
print_info("Restart the gateway for changes to take effect: hermes gateway restart")
def is_connected(config) -> bool:
"""Check whether IRC is configured (env or config.yaml)."""
extra = getattr(config, "extra", {}) or {}
server = os.getenv("IRC_SERVER") or extra.get("server", "")
channel = os.getenv("IRC_CHANNEL") or extra.get("channel", "")
return bool(server and channel)
def register(ctx):
"""Plugin entry point — called by the Hermes plugin system."""
ctx.register_platform(
@ -507,8 +661,10 @@ def register(ctx):
adapter_factory=lambda cfg: IRCAdapter(cfg),
check_fn=check_requirements,
validate_config=validate_config,
is_connected=is_connected,
required_env=["IRC_SERVER", "IRC_CHANNEL", "IRC_NICKNAME"],
install_hint="No extra packages needed (stdlib only)",
setup_fn=interactive_setup,
# Auth env vars for _is_user_authorized() integration
allowed_users_env="IRC_ALLOWED_USERS",
allow_all_env="IRC_ALLOW_ALL_USERS",

View File

@ -1855,6 +1855,16 @@ class AIAgent:
if not isinstance(_custom_providers, list):
_custom_providers = []
# Resolve custom_providers list once for reuse below (startup
# context-length override and plugin context-engine init).
try:
from hermes_cli.config import get_compatible_custom_providers
_custom_providers = get_compatible_custom_providers(_agent_cfg)
except Exception:
_custom_providers = _agent_cfg.get("custom_providers")
if not isinstance(_custom_providers, list):
_custom_providers = []
# Check custom_providers per-model context_length
if _config_context_length is None and _custom_providers:
try:
@ -1911,6 +1921,7 @@ class AIAgent:
self._ensure_lmstudio_runtime_loaded(_config_context_length)
# Select context engine: config-driven (like memory providers).
# 1. Check config.yaml context.engine setting
# 2. Check plugins/context_engine/<name>/ directory (repo-shipped)
@ -4777,6 +4788,9 @@ class AIAgent:
# Pointer to the hermes-agent skill + docs for user questions about Hermes itself.
prompt_parts.append(HERMES_AGENT_HELP_GUIDANCE)
# Pointer to the hermes-agent skill + docs for user questions about Hermes itself.
prompt_parts.append(HERMES_AGENT_HELP_GUIDANCE)
# Tool-aware behavioral guidance: only inject when the tools are loaded
tool_guidance = []
if "memory" in self.valid_tool_names:
@ -8572,6 +8586,13 @@ class AIAgent:
if codex_message_items:
msg["codex_message_items"] = codex_message_items
# Codex Responses API: preserve exact assistant message items (with
# id/phase) so follow-up turns can replay structured items instead of
# flattening to plain text. This is required for prefix cache hits.
codex_message_items = getattr(assistant_message, "codex_message_items", None)
if codex_message_items:
msg["codex_message_items"] = codex_message_items
if assistant_message.tool_calls:
tool_calls = []
for tool_call in assistant_message.tool_calls:

View File

@ -300,6 +300,129 @@ class TestIRCAdapterMessageParsing:
assert len(dispatched) == 1
assert dispatched[0]["text"] == "* user waves"
@pytest.mark.asyncio
async def test_allowed_users_case_insensitive(self, monkeypatch):
"""Allowlist should match nicks case-insensitively."""
for key in ("IRC_SERVER", "IRC_PORT", "IRC_NICKNAME", "IRC_CHANNEL", "IRC_USE_TLS"):
monkeypatch.delenv(key, raising=False)
from gateway.config import PlatformConfig
cfg = PlatformConfig(
enabled=True,
extra={
"server": "localhost",
"port": 6667,
"nickname": "hermes",
"channel": "#test",
"use_tls": False,
"allowed_users": ["Admin", "BOB"],
},
)
adapter = IRCAdapter(cfg)
adapter._current_nick = "hermes"
adapter._registered = True
dispatched = []
async def capture_dispatch(**kwargs):
dispatched.append(kwargs)
adapter._dispatch_message = capture_dispatch
adapter._message_handler = AsyncMock()
# "admin" matches "Admin" in allowlist
await adapter._handle_line(":admin!u@host PRIVMSG #test :hermes: hello")
assert len(dispatched) == 1
assert dispatched[0]["text"] == "hello"
@pytest.mark.asyncio
async def test_unauthorized_user_blocked(self, monkeypatch):
"""Nicks not in allowlist should be ignored."""
for key in ("IRC_SERVER", "IRC_PORT", "IRC_NICKNAME", "IRC_CHANNEL", "IRC_USE_TLS"):
monkeypatch.delenv(key, raising=False)
from gateway.config import PlatformConfig
cfg = PlatformConfig(
enabled=True,
extra={
"server": "localhost",
"port": 6667,
"nickname": "hermes",
"channel": "#test",
"use_tls": False,
"allowed_users": ["Admin", "BOB"],
},
)
adapter = IRCAdapter(cfg)
adapter._current_nick = "hermes"
adapter._registered = True
dispatched = []
async def capture_dispatch(**kwargs):
dispatched.append(kwargs)
adapter._dispatch_message = capture_dispatch
adapter._message_handler = AsyncMock()
await adapter._handle_line(":eve!u@host PRIVMSG #test :hermes: hello")
assert len(dispatched) == 0
@pytest.mark.asyncio
async def test_nick_collision_retry(self, adapter):
"""Multiple 433 responses should keep incrementing the suffix."""
writer = MagicMock()
writer.is_closing = MagicMock(return_value=False)
writer.write = MagicMock()
writer.drain = AsyncMock()
adapter._writer = writer
await adapter._handle_line(":server 433 * hermes :Nickname in use")
assert adapter._current_nick == "hermes_"
await adapter._handle_line(":server 433 * hermes_ :Nickname in use")
assert adapter._current_nick == "hermes_1"
await adapter._handle_line(":server 433 * hermes_1 :Nickname in use")
assert adapter._current_nick == "hermes_2"
class TestIRCAdapterSplitting:
def test_split_respects_byte_limit(self):
"""Multi-byte characters should not exceed IRC byte limit."""
# 100 japanese chars = 300 bytes in utf-8
text = "" * 100
from gateway.config import PlatformConfig
cfg = PlatformConfig(enabled=True, extra={"server": "x", "channel": "#x"})
adapter = IRCAdapter(cfg)
adapter._current_nick = "bot"
lines = adapter._split_message(text, "#test")
for line in lines:
overhead = len(f"PRIVMSG #test :{line}\r\n".encode("utf-8"))
assert overhead <= 512, f"line over 512 bytes: {overhead}"
def test_split_prefers_word_boundary(self):
text = "hello world foo bar baz qux"
from gateway.config import PlatformConfig
cfg = PlatformConfig(enabled=True, extra={"server": "x", "channel": "#x"})
adapter = IRCAdapter(cfg)
adapter._current_nick = "bot"
lines = adapter._split_message(text, "#test")
# Should not split in the middle of "world"
assert any("hello" in ln for ln in lines)
assert any("world" in ln for ln in lines)
class TestIRCProtocolHelpersExtra:
def test_parse_malformed_no_space(self):
"""A line starting with : but no space should not crash."""
msg = _parse_irc_message(":justaprefix")
assert msg["prefix"] == "justaprefix"
assert msg["command"] == ""
assert msg["params"] == []
def test_parse_empty(self):
msg = _parse_irc_message("")
assert msg["prefix"] == ""
assert msg["command"] == ""
assert msg["params"] == []
class TestIRCAdapterMarkdown:

View File

@ -0,0 +1,99 @@
"""
Verify that every gateway platform built-in and plugin has a connection
checker so ``GatewayConfig.get_connected_platforms()`` doesn't silently drop
platforms with bespoke auth requirements.
"""
from unittest.mock import MagicMock
import pytest
from gateway.config import Platform, _PLATFORM_CONNECTED_CHECKERS, _BUILTIN_PLATFORM_VALUES
def test_all_builtins_have_checker_or_generic_token_path():
"""Every built-in Platform member must be reachable by either:
1. The generic ``config.token or config.api_key`` check, OR
2. A platform-specific entry in ``_PLATFORM_CONNECTED_CHECKERS``.
This guarantees ``get_connected_platforms()`` doesn't silently ignore
a built-in just because nobody added it to the checker dict.
"""
# Platforms covered by the generic token/api_key branch
generic_token_values = {p.value for p in {
Platform.TELEGRAM,
Platform.DISCORD,
Platform.SLACK,
Platform.MATRIX,
Platform.MATTERMOST,
Platform.HOMEASSISTANT,
}}
# Platforms with a bespoke checker
checker_values = {p.value for p in set(_PLATFORM_CONNECTED_CHECKERS.keys())}
# Every built-in should be in one of the two sets
all_builtins = set(_BUILTIN_PLATFORM_VALUES)
missing = all_builtins - generic_token_values - checker_values - {"local"}
assert not missing, (
f"Built-in platforms missing a connection checker: "
f"{sorted(missing)}. "
f"Add them to _PLATFORM_CONNECTED_CHECKERS or generic_token_platforms."
)
@pytest.mark.parametrize("platform, checker", list(_PLATFORM_CONNECTED_CHECKERS.items()))
def test_checker_handles_minimal_config(platform, checker):
"""Each bespoke checker must not crash on a minimal PlatformConfig."""
mock_config = MagicMock()
mock_config.extra = {}
mock_config.token = None
mock_config.api_key = None
mock_config.enabled = True
# Should return a bool without raising
result = checker(mock_config)
assert isinstance(result, bool)
@pytest.mark.parametrize("platform, checker", list(_PLATFORM_CONNECTED_CHECKERS.items()))
def test_checker_returns_true_when_configured(platform, checker, monkeypatch):
"""Each bespoke checker must return True when the config looks valid."""
mock_config = MagicMock()
mock_config.token = None
mock_config.api_key = None
mock_config.enabled = True
# Set up platform-specific mock extra fields so the checker succeeds
if platform == Platform.WEIXIN:
mock_config.extra = {"account_id": "123", "token": "***"}
elif platform == Platform.SIGNAL:
mock_config.extra = {"http_url": "http://signal:8080"}
elif platform == Platform.EMAIL:
mock_config.extra = {"address": "hermes@example.com"}
elif platform == Platform.SMS:
monkeypatch.setenv("TWILIO_ACCOUNT_SID", "ACtest")
mock_config.extra = {}
elif platform in (Platform.API_SERVER, Platform.WEBHOOK, Platform.WHATSAPP):
mock_config.extra = {}
elif platform == Platform.FEISHU:
mock_config.extra = {"app_id": "app"}
elif platform == Platform.WECOM:
mock_config.extra = {"bot_id": "bot"}
elif platform == Platform.WECOM_CALLBACK:
mock_config.extra = {"corp_id": "corp"}
elif platform == Platform.BLUEBUBBLES:
mock_config.extra = {"server_url": "http://bb:1234", "password": "pw"}
elif platform == Platform.QQBOT:
mock_config.extra = {"app_id": "app", "client_secret": "sec"}
elif platform == Platform.YUANBAO:
mock_config.extra = {"app_id": "app", "app_secret": "sec"}
elif platform == Platform.DINGTALK:
mock_config.extra = {"client_id": "id", "client_secret": "sec"}
else:
pytest.skip(f"No synthetic config defined for {platform.value}")
result = checker(mock_config)
assert result is True, f"{platform.value} checker should return True with valid-looking config"

View File

@ -38,9 +38,28 @@ class TestPlatformEnumDynamic:
assert a.value == "irc"
def test_dynamic_member_with_hyphens(self):
"""Registered plugin platforms with hyphens work once registered."""
from gateway.platform_registry import platform_registry as _reg
entry = PlatformEntry(
name="my-platform",
label="My Platform",
adapter_factory=lambda cfg: MagicMock(),
check_fn=lambda: True,
source="plugin",
)
_reg.register(entry)
try:
p = Platform("my-platform")
assert p.value == "my-platform"
assert p.name == "MY_PLATFORM"
finally:
_reg.unregister("my-platform")
def test_dynamic_member_rejects_unregistered(self):
"""Arbitrary strings are rejected to prevent enum pollution."""
with pytest.raises(ValueError):
Platform("totally-fake-platform")
def test_dynamic_member_rejects_non_string(self):
with pytest.raises(ValueError):

View File

@ -0,0 +1,230 @@
"""
Interface compliance tests for all plugin-based gateway platforms.
Discovers platforms dynamically under ``plugins/platforms/`` no manual
enumeration and verifies each one implements the required contract.
"""
import importlib
import sys
from pathlib import Path
from types import ModuleType
from typing import Any
from unittest.mock import MagicMock
import pytest
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
PLATFORMS_DIR = PROJECT_ROOT / "plugins" / "platforms"
def _discover_platform_plugins() -> list[str]:
"""Return names of all bundled platform plugins."""
if not PLATFORMS_DIR.is_dir():
return []
names = []
for child in sorted(PLATFORMS_DIR.iterdir()):
if child.is_dir() and (child / "__init__.py").exists():
names.append(child.name)
return names
# Dynamically parametrise over discovered platforms
_PLATFORM_NAMES = _discover_platform_plugins()
@pytest.fixture
def clean_registry():
"""Yield with a clean platform registry, restoring state afterwards."""
from gateway.platform_registry import platform_registry
original = dict(platform_registry._entries)
platform_registry._entries.clear()
yield platform_registry
platform_registry._entries.clear()
platform_registry._entries.update(original)
class _MockPluginContext:
"""Minimal mock of hermes_cli.plugins.PluginContext.
Only implements register_platform so we can exercise the plugin's
register() entrypoint without importing the real plugin system.
"""
def __init__(self):
self.registered_names: list[str] = []
def register_platform(
self,
*,
name: str,
label: str,
adapter_factory: Any,
check_fn: Any,
**kwargs: Any,
) -> None:
from gateway.platform_registry import platform_registry, PlatformEntry
entry = PlatformEntry(
name=name,
label=label,
adapter_factory=adapter_factory,
check_fn=check_fn,
**kwargs,
)
platform_registry.register(entry)
self.registered_names.append(name)
def _import_platform_module(name: str) -> ModuleType:
"""Import plugins.platforms.<name> in a test-safe way."""
# Make sure the project root is on sys.path so relative imports work
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
module = importlib.import_module(f"plugins.platforms.{name}")
return module
@pytest.mark.parametrize("platform_name", _PLATFORM_NAMES)
def test_plugin_exposes_register_function(platform_name: str):
"""Every platform plugin must expose a callable register function."""
module = _import_platform_module(platform_name)
assert hasattr(module, "register"), f"{platform_name} missing register()"
assert callable(module.register), f"{platform_name}.register not callable"
@pytest.mark.parametrize("platform_name", _PLATFORM_NAMES)
def test_plugin_registers_valid_platform_entry(platform_name: str, clean_registry):
"""Calling register() must create a valid PlatformEntry."""
module = _import_platform_module(platform_name)
ctx = _MockPluginContext()
module.register(ctx)
assert platform_name in ctx.registered_names
from gateway.platform_registry import platform_registry
entry = platform_registry.get(platform_name)
assert entry is not None, f"{platform_name} did not register an entry"
assert entry.name == platform_name
assert entry.label
assert callable(entry.adapter_factory)
assert callable(entry.check_fn)
@pytest.mark.parametrize("platform_name", _PLATFORM_NAMES)
def test_platform_entry_has_required_fields(platform_name: str, clean_registry):
"""PlatformEntry must have the mandatory metadata fields."""
module = _import_platform_module(platform_name)
ctx = _MockPluginContext()
module.register(ctx)
from gateway.platform_registry import platform_registry
entry = platform_registry.get(platform_name)
assert entry is not None
# Mandatory fields
assert isinstance(entry.name, str) and entry.name
assert isinstance(entry.label, str) and entry.label
assert callable(entry.adapter_factory)
assert callable(entry.check_fn)
# Optional but recommended fields
if entry.validate_config is not None:
assert callable(entry.validate_config)
if entry.is_connected is not None:
assert callable(entry.is_connected)
if entry.setup_fn is not None:
assert callable(entry.setup_fn)
@pytest.mark.parametrize("platform_name", _PLATFORM_NAMES)
def test_adapter_factory_produces_valid_adapter(platform_name: str, clean_registry):
"""The adapter factory must return an object with the base interface."""
module = _import_platform_module(platform_name)
ctx = _MockPluginContext()
module.register(ctx)
from gateway.platform_registry import platform_registry
entry = platform_registry.get(platform_name)
assert entry is not None
# Build a minimal synthetic config that shouldn't crash __init__
mock_config = MagicMock()
mock_config.extra = {}
mock_config.enabled = True
mock_config.token = None
mock_config.api_key = None
mock_config.home_channel = None
mock_config.reply_to_mode = "first"
adapter = entry.adapter_factory(mock_config)
assert adapter is not None, f"{platform_name} adapter_factory returned None"
# Required adapter interface
assert hasattr(adapter, "connect") and callable(adapter.connect)
assert hasattr(adapter, "disconnect") and callable(adapter.disconnect)
assert hasattr(adapter, "send") and callable(adapter.send)
assert hasattr(adapter, "name")
# Should be a BasePlatformAdapter subclass if importable
try:
from gateway.platforms.base import BasePlatformAdapter
assert isinstance(adapter, BasePlatformAdapter)
except Exception:
pytest.skip("BasePlatformAdapter not available for isinstance check")
@pytest.mark.parametrize("platform_name", _PLATFORM_NAMES)
def test_check_fn_returns_bool(platform_name: str, clean_registry):
"""check_fn() must return a boolean."""
module = _import_platform_module(platform_name)
ctx = _MockPluginContext()
module.register(ctx)
from gateway.platform_registry import platform_registry
entry = platform_registry.get(platform_name)
assert entry is not None
result = entry.check_fn()
assert isinstance(result, bool), f"{platform_name}.check_fn() returned {type(result)}, expected bool"
@pytest.mark.parametrize("platform_name", _PLATFORM_NAMES)
def test_validate_config_if_present(platform_name: str, clean_registry):
"""If validate_config is provided, it must accept a config object."""
module = _import_platform_module(platform_name)
ctx = _MockPluginContext()
module.register(ctx)
from gateway.platform_registry import platform_registry
entry = platform_registry.get(platform_name)
assert entry is not None
if entry.validate_config is None:
pytest.skip("No validate_config provided")
mock_config = MagicMock()
mock_config.extra = {}
result = entry.validate_config(mock_config)
assert isinstance(result, bool)
@pytest.mark.parametrize("platform_name", _PLATFORM_NAMES)
def test_is_connected_if_present(platform_name: str, clean_registry):
"""If is_connected is provided, it must accept a config object."""
module = _import_platform_module(platform_name)
ctx = _MockPluginContext()
module.register(ctx)
from gateway.platform_registry import platform_registry
entry = platform_registry.get(platform_name)
assert entry is not None
if entry.is_connected is None:
pytest.skip("No is_connected provided")
mock_config = MagicMock()
mock_config.extra = {}
result = entry.is_connected(mock_config)
assert isinstance(result, bool)

View File

@ -89,12 +89,14 @@ class TestSessionSourceRoundtrip:
assert restored.chat_topic is None
assert restored.chat_type == "dm"
def test_unknown_platform_accepted_for_plugins(self):
"""Unknown platform names are now accepted (dynamic enum members for
plugin platforms), so from_dict should succeed rather than raise."""
source = SessionSource.from_dict({"platform": "nonexistent", "chat_id": "1"})
assert source.platform.value == "nonexistent"
assert source.chat_id == "1"
def test_unknown_platform_rejected_for_bad_names(self):
"""Arbitrary platform names are rejected (no accidental enum pollution).
Only bundled platform plugins (discovered under ``plugins/platforms/``)
and runtime-registered plugins get dynamic enum members.
"""
with pytest.raises(ValueError):
SessionSource.from_dict({"platform": "nonexistent", "chat_id": "1"})
class TestSessionSourceDescription:

View File

@ -552,6 +552,19 @@ class TestResolveApiKeyProviderCredentials:
creds = resolve_api_key_provider_credentials("gmi")
assert creds["base_url"] == "https://custom.gmi.example/v1"
def test_resolve_gmi_with_key(self, monkeypatch):
monkeypatch.setenv("GMI_API_KEY", "gmi-secret-key")
creds = resolve_api_key_provider_credentials("gmi")
assert creds["provider"] == "gmi"
assert creds["api_key"] == "gmi-secret-key"
assert creds["base_url"] == "https://api.gmi-serving.com/v1"
def test_resolve_gmi_custom_base_url(self, monkeypatch):
monkeypatch.setenv("GMI_API_KEY", "gmi-key")
monkeypatch.setenv("GMI_BASE_URL", "https://custom.gmi.example/v1")
creds = resolve_api_key_provider_credentials("gmi")
assert creds["base_url"] == "https://custom.gmi.example/v1"
def test_resolve_kilocode_custom_base_url(self, monkeypatch):
monkeypatch.setenv("KILOCODE_API_KEY", "kilo-key")
monkeypatch.setenv("KILOCODE_BASE_URL", "https://custom.kilo.example/v1")

View File

@ -430,6 +430,43 @@ def test_run_doctor_accepts_hermes_provider_ids_that_catalog_aliases(
)
def test_run_doctor_accepts_bare_custom_provider(monkeypatch, tmp_path):
home = tmp_path / ".hermes"
home.mkdir(parents=True, exist_ok=True)
(home / "config.yaml").write_text(
"model:\n"
" provider: custom\n"
" default: local-model\n"
" base_url: http://localhost:8000/v1\n",
encoding="utf-8",
)
monkeypatch.setattr(doctor_mod, "HERMES_HOME", home)
monkeypatch.setattr(doctor_mod, "PROJECT_ROOT", tmp_path / "project")
monkeypatch.setattr(doctor_mod, "_DHH", str(home))
(tmp_path / "project").mkdir(exist_ok=True)
fake_model_tools = types.SimpleNamespace(
check_tool_availability=lambda *a, **kw: ([], []),
TOOLSET_REQUIREMENTS={},
)
monkeypatch.setitem(sys.modules, "model_tools", fake_model_tools)
try:
from hermes_cli import auth as _auth_mod
monkeypatch.setattr(_auth_mod, "get_nous_auth_status", lambda: {})
monkeypatch.setattr(_auth_mod, "get_codex_auth_status", lambda: {})
except Exception:
pass
buf = io.StringIO()
with contextlib.redirect_stdout(buf):
doctor_mod.run_doctor(Namespace(fix=False))
out = buf.getvalue()
assert "model.provider 'custom' is not a recognised provider" not in out
def test_run_doctor_termux_does_not_mark_browser_available_without_agent_browser(monkeypatch, tmp_path):
home = tmp_path / ".hermes"
home.mkdir(parents=True, exist_ok=True)

View File

@ -479,6 +479,69 @@ class TestAzureFoundryModelApiMode:
assert azure_foundry_model_api_mode("Codex-Mini") == "codex_responses"
class TestAzureFoundryModelApiMode:
"""Azure Foundry deploys GPT-5.x / codex / o-series as Responses-API-only.
Azure returns ``400 "The requested operation is unsupported."`` when
/chat/completions is called against these deployments. Verified in the
wild by a user debug bundle on 2026-04-26: gpt-5.3-codex failed with
that exact payload while gpt-4o-pure worked on the same endpoint.
"""
def test_gpt5_family_uses_responses(self):
assert azure_foundry_model_api_mode("gpt-5") == "codex_responses"
assert azure_foundry_model_api_mode("gpt-5.3") == "codex_responses"
assert azure_foundry_model_api_mode("gpt-5.4") == "codex_responses"
assert azure_foundry_model_api_mode("gpt-5-codex") == "codex_responses"
assert azure_foundry_model_api_mode("gpt-5.3-codex") == "codex_responses"
# gpt-5-mini exceptions are Copilot-specific; Azure deploys the whole
# gpt-5 family on Responses API uniformly.
assert azure_foundry_model_api_mode("gpt-5-mini") == "codex_responses"
def test_codex_family_uses_responses(self):
assert azure_foundry_model_api_mode("codex") == "codex_responses"
assert azure_foundry_model_api_mode("codex-mini") == "codex_responses"
def test_o_series_reasoning_uses_responses(self):
assert azure_foundry_model_api_mode("o1") == "codex_responses"
assert azure_foundry_model_api_mode("o1-preview") == "codex_responses"
assert azure_foundry_model_api_mode("o1-mini") == "codex_responses"
assert azure_foundry_model_api_mode("o3") == "codex_responses"
assert azure_foundry_model_api_mode("o3-mini") == "codex_responses"
assert azure_foundry_model_api_mode("o4-mini") == "codex_responses"
def test_gpt4_family_returns_none(self):
"""GPT-4, GPT-4o, etc. speak chat completions on Azure."""
assert azure_foundry_model_api_mode("gpt-4") is None
assert azure_foundry_model_api_mode("gpt-4o") is None
assert azure_foundry_model_api_mode("gpt-4o-pure") is None
assert azure_foundry_model_api_mode("gpt-4o-mini") is None
assert azure_foundry_model_api_mode("gpt-4-turbo") is None
assert azure_foundry_model_api_mode("gpt-4.1") is None
assert azure_foundry_model_api_mode("gpt-3.5-turbo") is None
def test_non_openai_deployments_return_none(self):
"""Llama, Mistral, Grok, etc. keep the default chat completions."""
assert azure_foundry_model_api_mode("llama-3.1-70b") is None
assert azure_foundry_model_api_mode("mistral-large") is None
assert azure_foundry_model_api_mode("grok-4") is None
assert azure_foundry_model_api_mode("phi-3-medium") is None
def test_vendor_prefix_stripped(self):
"""Users who copy-paste ``openai/gpt-5.3-codex`` should still match."""
assert azure_foundry_model_api_mode("openai/gpt-5.3-codex") == "codex_responses"
assert azure_foundry_model_api_mode("openai/gpt-4o") is None
def test_empty_and_none_return_none(self):
assert azure_foundry_model_api_mode(None) is None
assert azure_foundry_model_api_mode("") is None
assert azure_foundry_model_api_mode(" ") is None
def test_case_insensitive(self):
assert azure_foundry_model_api_mode("GPT-5.3-Codex") == "codex_responses"
assert azure_foundry_model_api_mode("Codex-Mini") == "codex_responses"
# -- validate — format checks -----------------------------------------------
class TestValidateFormatChecks:

View File

@ -0,0 +1,309 @@
"""Tests for IRC gateway configuration via `hermes setup gateway` UI.
Covers the full plugin-platform discovery status configure flow so that
a fresh Hermes install (no state, no env vars) can set up IRC through the
interactive setup menus.
"""
import os
import pytest
from gateway.platform_registry import PlatformEntry, platform_registry
def _register_irc_platform(**overrides):
"""Manually register the IRC platform entry as if discover_plugins() found it.
Tests run outside the normal plugin-discovery path, so we inject the entry
directly into the singleton registry and yield its dict shape.
"""
needs_enable = overrides.pop("needs_enable", False)
defaults = dict(
name="irc",
label="IRC",
adapter_factory=lambda cfg: None,
check_fn=lambda: bool(os.getenv("IRC_SERVER", "") and os.getenv("IRC_CHANNEL", "")),
validate_config=None,
required_env=["IRC_SERVER", "IRC_CHANNEL", "IRC_NICKNAME"],
install_hint="No extra packages needed (stdlib only)",
setup_fn=lambda: None,
source="plugin",
plugin_name="irc_platform",
allowed_users_env="IRC_ALLOWED_USERS",
allow_all_env="IRC_ALLOW_ALL_USERS",
max_message_length=450,
pii_safe=False,
emoji="💬",
allow_update_command=True,
platform_hint="You are chatting via IRC.",
)
defaults.update(overrides)
entry = PlatformEntry(**defaults)
platform_registry.register(entry)
return {
"key": entry.name,
"label": entry.label,
"emoji": entry.emoji,
"token_var": entry.required_env[0] if entry.required_env else "",
"install_hint": entry.install_hint,
"_registry_entry": entry,
"needs_enable": needs_enable,
}
def _unregister_irc_platform():
platform_registry.unregister("irc")
# ── Fresh-install discovery ─────────────────────────────────────────────────
class TestIRCFreshInstallDiscovery:
"""IRC appears in the setup menu on a brand-new Hermes install."""
def test_irc_appears_in_all_platforms(self, monkeypatch):
"""When the IRC plugin is registered, _all_platforms() surfaces it."""
import hermes_cli.gateway as gateway_mod
_register_irc_platform()
try:
# Ensure no stale env vars leak in
for key in ("IRC_SERVER", "IRC_CHANNEL", "IRC_NICKNAME"):
monkeypatch.delenv(key, raising=False)
platforms = gateway_mod._all_platforms()
keys = {p["key"] for p in platforms}
assert "irc" in keys
irc_plat = next(p for p in platforms if p["key"] == "irc")
assert irc_plat["label"] == "IRC"
assert irc_plat["emoji"] == "💬"
finally:
_unregister_irc_platform()
def test_irc_status_not_configured_when_fresh(self, monkeypatch):
"""On a fresh install with no env vars, IRC shows 'not configured'."""
import hermes_cli.gateway as gateway_mod
plat = _register_irc_platform()
try:
for key in ("IRC_SERVER", "IRC_CHANNEL", "IRC_NICKNAME"):
monkeypatch.delenv(key, raising=False)
status = gateway_mod._platform_status(plat)
assert status == "not configured"
finally:
_unregister_irc_platform()
def test_irc_status_configured_when_env_set(self, monkeypatch):
"""After the user sets IRC_SERVER and IRC_CHANNEL, status is 'configured'."""
import hermes_cli.gateway as gateway_mod
plat = _register_irc_platform()
try:
monkeypatch.setenv("IRC_SERVER", "irc.libera.chat")
monkeypatch.setenv("IRC_CHANNEL", "#hermes")
monkeypatch.setenv("IRC_NICKNAME", "hermes-bot")
status = gateway_mod._platform_status(plat)
assert status == "configured"
finally:
_unregister_irc_platform()
def test_irc_status_partial_when_only_server_set(self, monkeypatch):
"""If only IRC_SERVER is set, the platform is still not configured."""
import hermes_cli.gateway as gateway_mod
plat = _register_irc_platform()
try:
monkeypatch.delenv("IRC_CHANNEL", raising=False)
monkeypatch.delenv("IRC_NICKNAME", raising=False)
monkeypatch.setenv("IRC_SERVER", "irc.libera.chat")
status = gateway_mod._platform_status(plat)
assert status == "not configured"
finally:
_unregister_irc_platform()
# ── Plugin-disabled flow ────────────────────────────────────────────────────
class TestIRCPluginDisabledFlow:
"""When the IRC plugin is disabled, setup offers to enable it."""
def test_disabled_plugin_shows_enable_prompt(self, monkeypatch):
"""A disabled plugin platform surfaces 'plugin disabled — select to enable'."""
import hermes_cli.gateway as gateway_mod
plat = _register_irc_platform(needs_enable=True)
try:
for key in ("IRC_SERVER", "IRC_CHANNEL", "IRC_NICKNAME"):
monkeypatch.delenv(key, raising=False)
status = gateway_mod._platform_status(plat)
assert "plugin disabled" in status.lower()
assert "select to enable" in status.lower()
finally:
_unregister_irc_platform()
def test_disabled_but_already_configured_shows_configured(self, monkeypatch):
"""If the plugin is disabled but env vars are already present, show 'configured'."""
import hermes_cli.gateway as gateway_mod
plat = _register_irc_platform(needs_enable=True)
try:
monkeypatch.setenv("IRC_SERVER", "irc.libera.chat")
monkeypatch.setenv("IRC_CHANNEL", "#hermes")
status = gateway_mod._platform_status(plat)
assert status == "configured"
finally:
_unregister_irc_platform()
# ── Interactive setup dispatch ──────────────────────────────────────────────
class TestIRCInteractiveSetup:
"""The setup UI dispatches to IRC's interactive_setup() correctly."""
def test_configure_platform_dispatches_to_irc_setup_fn(self, monkeypatch, capsys):
"""_configure_platform() calls the IRC plugin's setup_fn when selected."""
import hermes_cli.gateway as gateway_mod
calls = []
def fake_setup():
calls.append("setup_called")
print("IRC setup complete!")
plat = _register_irc_platform(setup_fn=fake_setup)
try:
gateway_mod._configure_platform(plat)
finally:
_unregister_irc_platform()
assert "setup_called" in calls
out = capsys.readouterr().out
assert "IRC setup complete!" in out
def test_configure_platform_enables_disabled_plugin_first(self, monkeypatch, capsys, tmp_path):
"""If the plugin is disabled, _configure_platform enables it before running setup."""
import hermes_cli.gateway as gateway_mod
from hermes_cli.config import save_config, load_config
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
# Ensure plugins.enabled exists but does NOT include irc_platform
cfg = load_config()
cfg.setdefault("plugins", {})["enabled"] = ["some_other_plugin"]
save_config(cfg)
calls = []
def fake_setup():
calls.append("setup_called")
plat = _register_irc_platform(setup_fn=fake_setup, needs_enable=True)
try:
gateway_mod._configure_platform(plat)
finally:
_unregister_irc_platform()
assert "setup_called" in calls
# Plugin should now be enabled
reloaded = load_config()
assert "irc_platform" in reloaded.get("plugins", {}).get("enabled", [])
def test_configure_platform_fallback_when_no_setup_fn(self, monkeypatch, capsys):
"""A plugin with no setup_fn falls back to env-var instructions."""
import hermes_cli.gateway as gateway_mod
plat = _register_irc_platform(setup_fn=None)
try:
gateway_mod._configure_platform(plat)
finally:
_unregister_irc_platform()
out = capsys.readouterr().out
assert "IRC" in out
assert "IRC_SERVER" in out
# ── End-to-end fresh-install gateway setup ──────────────────────────────────
class TestIRCGatewaySetupFreshInstall:
"""Simulate the full `hermes setup gateway` experience with IRC present."""
def test_setup_gateway_shows_irc_in_platform_menu(self, monkeypatch, capsys, tmp_path):
"""The gateway setup menu lists IRC among the available platforms."""
import hermes_cli.gateway as gateway_mod
from hermes_cli import setup as setup_mod
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
_register_irc_platform()
try:
for key in ("IRC_SERVER", "IRC_CHANNEL", "IRC_NICKNAME"):
monkeypatch.delenv(key, raising=False)
# Sanity-check: IRC must be visible to _all_platforms()
platforms = gateway_mod._all_platforms()
assert any(p["key"] == "irc" for p in platforms), \
f"IRC not in platforms: {[p['key'] for p in platforms]}"
# Capture what prompt_checklist is asked to display
checklist_calls = []
def capture_prompt_checklist(question, choices, pre_selected=None):
checklist_calls.append({"question": question, "choices": choices})
return [] # nothing selected → clean exit
monkeypatch.setattr(setup_mod, "prompt_yes_no", lambda *a, **kw: False)
monkeypatch.setattr(setup_mod, "prompt_checklist", capture_prompt_checklist)
monkeypatch.setattr(gateway_mod, "supports_systemd_services", lambda: False)
monkeypatch.setattr(gateway_mod, "is_macos", lambda: False)
monkeypatch.setattr(gateway_mod, "_is_service_installed", lambda: False)
monkeypatch.setattr(gateway_mod, "_is_service_running", lambda: False)
setup_mod.setup_gateway({})
# Find the platform-selection prompt
platform_prompt = next(
(c for c in checklist_calls if "platform" in c["question"].lower()),
None,
)
assert platform_prompt is not None, \
f"No platform prompt found in {checklist_calls}"
choices_text = "\n".join(platform_prompt["choices"])
assert "IRC" in choices_text
assert "💬" in choices_text
assert "not configured" in choices_text.lower()
finally:
_unregister_irc_platform()
def test_setup_gateway_irc_counts_as_messaging_platform(self, monkeypatch, capsys, tmp_path):
"""When IRC is configured, setup_gateway counts it as a messaging platform."""
import hermes_cli.gateway as gateway_mod
from hermes_cli import setup as setup_mod
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
_register_irc_platform()
try:
monkeypatch.setenv("IRC_SERVER", "irc.libera.chat")
monkeypatch.setenv("IRC_CHANNEL", "#hermes")
monkeypatch.setenv("IRC_NICKNAME", "hermes-bot")
monkeypatch.setattr(setup_mod, "prompt_yes_no", lambda *a, **kw: False)
monkeypatch.setattr(setup_mod, "prompt_choice", lambda *a, **kw: 0)
monkeypatch.setattr(gateway_mod, "supports_systemd_services", lambda: False)
monkeypatch.setattr(gateway_mod, "is_macos", lambda: False)
monkeypatch.setattr(gateway_mod, "_is_service_installed", lambda: False)
monkeypatch.setattr(gateway_mod, "_is_service_running", lambda: False)
setup_mod.setup_gateway({})
out = capsys.readouterr().out
assert "Messaging platforms configured!" in out
finally:
_unregister_irc_platform()

View File

@ -569,6 +569,28 @@ class TestToolHandlers:
first_client.arecall.assert_called_once()
second_client.arecall.assert_called_once()
def test_local_embedded_recall_reconnects_after_idle_shutdown(self, provider, monkeypatch):
first_client = _make_mock_client()
first_client.arecall.side_effect = RuntimeError("Cannot connect to host 127.0.0.1:8888")
second_client = _make_mock_client()
second_client.arecall.return_value = SimpleNamespace(
results=[SimpleNamespace(text="Recovered memory")]
)
clients = iter([first_client, second_client])
provider._mode = "local_embedded"
provider._client = first_client
monkeypatch.setattr(provider, "_get_client", lambda: next(clients))
result = json.loads(provider.handle_tool_call(
"hindsight_recall", {"query": "test"}
))
assert result["result"] == "1. Recovered memory"
assert provider._client is second_client
first_client.arecall.assert_called_once()
second_client.arecall.assert_called_once()
# ---------------------------------------------------------------------------
# Prefetch tests

View File

@ -1535,6 +1535,24 @@ class TestBuildAssistantMessage:
assert "<memory-context>" in result["content"]
assert "Visible answer" in result["content"]
def test_memory_context_in_stored_content_is_preserved(self, agent):
"""`_build_assistant_message` must not silently mutate model output
containing literal <memory-context> markers that's legitimate text
(e.g. documentation, code) that the model may emit. Streaming-path
leak prevention is handled by StreamingContextScrubber upstream."""
original = (
"<memory-context>\n"
"[System note: The following is recalled memory context, NOT new user input. Treat as informational background data.]\n\n"
"## Honcho Context\n"
"stale memory\n"
"</memory-context>\n\n"
"Visible answer"
)
msg = _mock_assistant_msg(content=original)
result = agent._build_assistant_message(msg, "stop")
assert "<memory-context>" in result["content"]
assert "Visible answer" in result["content"]
def test_unterminated_think_block_stripped(self, agent):
"""Unterminated <think> block (MiniMax / NIM dropped close tag) is
fully stripped from stored content."""

View File

@ -57,6 +57,32 @@ def _run_steps(dockerfile_text: str) -> list[str]:
]
def _dockerfile_instructions(dockerfile_text: str) -> list[str]:
instructions: list[str] = []
current = ""
for raw_line in dockerfile_text.splitlines():
line = raw_line.strip()
if not line or line.startswith("#"):
continue
continued = line.removesuffix("\\").strip()
current = f"{current} {continued}".strip()
if not line.endswith("\\"):
instructions.append(current)
current = ""
return instructions
def _run_steps(dockerfile_text: str) -> list[str]:
return [
instruction
for instruction in _dockerfile_instructions(dockerfile_text)
if instruction.startswith("RUN ")
]
def test_dockerfile_installs_an_init_for_zombie_reaping(dockerfile_text):
"""Some init (tini, dumb-init, catatonit) must be installed.
@ -105,6 +131,26 @@ def test_dockerfile_entrypoint_routes_through_the_init(dockerfile_text):
)
def test_dockerfile_installs_tui_dependencies(dockerfile_text):
assert "ui-tui/package.json" in dockerfile_text
assert "ui-tui/packages/hermes-ink/package-lock.json" in dockerfile_text
assert any(
"ui-tui" in step
and "npm" in step
and (" install" in step or " ci" in step)
for step in _run_steps(dockerfile_text)
)
def test_dockerfile_builds_tui_assets(dockerfile_text):
assert any(
"ui-tui" in step
and "npm" in step
and "run build" in step
for step in _run_steps(dockerfile_text)
)
def test_dockerfile_installs_tui_dependencies(dockerfile_text):
assert "ui-tui/package.json" in dockerfile_text
assert "ui-tui/packages/hermes-ink/package-lock.json" in dockerfile_text

View File

@ -207,6 +207,101 @@ def _hardline_block_result(description: str) -> dict:
}
# =========================================================================
# Hardline (unconditional) blocklist
# =========================================================================
#
# Commands so catastrophic they should NEVER run via the agent, regardless
# of --yolo, /yolo, approvals.mode=off, or cron approve mode. This is a
# floor below yolo: opting into yolo is the user trusting the agent with
# their files and services, not trusting it to wipe the disk or power the
# box off.
#
# Hardline only applies to environments that can actually damage the host
# (local, ssh, container-host cron). Containerized backends (docker,
# singularity, modal, daytona) already bypass the dangerous-command layer
# because nothing they do can touch the host, so we leave that behavior
# alone.
#
# The list is deliberately tiny — only things with no recovery path:
# filesystem destruction rooted at /, raw block device overwrites, kernel
# shutdown/reboot, and denial-of-service commands that take the host down.
# Recoverable-but-costly operations (git reset --hard, rm -rf /tmp/x,
# chmod -R 777, curl|sh) stay in DANGEROUS_PATTERNS where yolo can pass
# them through — that's what yolo is for.
#
# Inspired by Mercury Agent's permission-hardened blocklist
# (https://github.com/cosmicstack-labs/mercury-agent).
# Regex fragment matching the *start* of a command (i.e. positions where
# a shell would begin parsing a new command). Used by shutdown/reboot
# patterns so they don't fire on "echo reboot" or "grep 'shutdown' log".
# Matches: start of string, after command separators (; && || | newline),
# after subshell openers ( `$(` or backtick ), optionally consuming
# leading wrapper commands (sudo, env VAR=VAL, exec, nohup, setsid).
_CMDPOS = (
r'(?:^|[;&|\n`]|\$\()' # start position
r'\s*' # optional whitespace
r'(?:sudo\s+(?:-[^\s]+\s+)*)?' # optional sudo with flags
r'(?:env\s+(?:\w+=\S*\s+)*)?' # optional env with VAR=VAL pairs
r'(?:(?:exec|nohup|setsid|time)\s+)*' # optional wrapper commands
r'\s*'
)
HARDLINE_PATTERNS = [
# rm recursive targeting the root filesystem or protected roots
(r'\brm\s+(-[^\s]*\s+)*(/|/\*|/ \*)(\s|$)', "recursive delete of root filesystem"),
(r'\brm\s+(-[^\s]*\s+)*(/home|/home/\*|/root|/root/\*|/etc|/etc/\*|/usr|/usr/\*|/var|/var/\*|/bin|/bin/\*|/sbin|/sbin/\*|/boot|/boot/\*|/lib|/lib/\*)(\s|$)', "recursive delete of system directory"),
(r'\brm\s+(-[^\s]*\s+)*(~|\$HOME)(/?|/\*)?(\s|$)', "recursive delete of home directory"),
# Filesystem format
(r'\bmkfs(\.[a-z0-9]+)?\b', "format filesystem (mkfs)"),
# Raw block device overwrites (dd + redirection)
(r'\bdd\b[^\n]*\bof=/dev/(sd|nvme|hd|mmcblk|vd|xvd)[a-z0-9]*', "dd to raw block device"),
(r'>\s*/dev/(sd|nvme|hd|mmcblk|vd|xvd)[a-z0-9]*\b', "redirect to raw block device"),
# Fork bomb (classic shell form)
(r':\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\}\s*;\s*:', "fork bomb"),
# Kill every process on the system
(r'\bkill\s+(-[^\s]+\s+)*-1\b', "kill all processes"),
# System shutdown / reboot — anchor to command position (start of line,
# after a command separator, or after sudo/env wrappers) so we don't
# false-positive on "echo reboot" or "grep 'shutdown' logs".
# _CMDPOS matches start-of-command positions.
(_CMDPOS + r'(shutdown|reboot|halt|poweroff)\b', "system shutdown/reboot"),
(_CMDPOS + r'init\s+[06]\b', "init 0/6 (shutdown/reboot)"),
(_CMDPOS + r'systemctl\s+(poweroff|reboot|halt|kexec)\b', "systemctl poweroff/reboot"),
(_CMDPOS + r'telinit\s+[06]\b', "telinit 0/6 (shutdown/reboot)"),
]
def detect_hardline_command(command: str) -> tuple:
"""Check if a command matches the unconditional hardline blocklist.
Returns:
(is_hardline, description) or (False, None)
"""
normalized = _normalize_command_for_detection(command).lower()
for pattern, description in HARDLINE_PATTERNS:
if re.search(pattern, normalized, re.IGNORECASE | re.DOTALL):
return (True, description)
return (False, None)
def _hardline_block_result(description: str) -> dict:
"""Build the standard block result for a hardline match."""
return {
"approved": False,
"hardline": True,
"message": (
f"BLOCKED (hardline): {description}. "
"This command is on the unconditional blocklist and cannot "
"be executed via the agent — not even with --yolo, /yolo, "
"approvals.mode=off, or cron approve mode. If you genuinely "
"need to run it, run it yourself in a terminal outside the "
"agent."
),
}
# =========================================================================
# Dangerous command patterns
# =========================================================================
@ -795,6 +890,16 @@ def check_dangerous_command(command: str, env_type: str,
logger.warning("Hardline block: %s (command: %s)", hardline_desc, command[:200])
return _hardline_block_result(hardline_desc)
# Hardline floor: commands with no recovery path (rm -rf /, mkfs, dd
# to raw device, shutdown/reboot, fork bomb, kill -1) are blocked
# unconditionally, BEFORE the yolo bypass. Opting into yolo is
# trusting the agent with your files and services, not trusting it
# to wipe the disk or power the box off.
is_hardline, hardline_desc = detect_hardline_command(command)
if is_hardline:
logger.warning("Hardline block: %s (command: %s)", hardline_desc, command[:200])
return _hardline_block_result(hardline_desc)
# --yolo: bypass all approval prompts. Gateway /yolo is session-scoped;
# CLI --yolo remains process-scoped via the env var for local use.
if os.getenv("HERMES_YOLO_MODE") or is_current_session_yolo_enabled():

View File

@ -1794,6 +1794,13 @@ _stdio_pids: Dict[int, str] = {} # pid -> server_name
# sessions (e.g. concurrent cron jobs or live user chats).
_orphan_stdio_pids: set = set()
# PIDs that survived their session context exit (SDK teardown failed to
# terminate them). These are detected in _run_stdio's finally block and
# can be cleaned up asynchronously by _kill_orphaned_mcp_children().
# Separate from _stdio_pids so cleanup sweeps never race with active
# sessions (e.g. concurrent cron jobs or live user chats).
_orphan_stdio_pids: set = set()
def _snapshot_child_pids() -> set:
"""Return a set of current child process PIDs.

View File

@ -30,7 +30,7 @@ export { useTerminalFocus } from './src/ink/hooks/use-terminal-focus.ts'
export { useTerminalTitle } from './src/ink/hooks/use-terminal-title.ts'
export { useTerminalViewport } from './src/ink/hooks/use-terminal-viewport.ts'
export { default as measureElement } from './src/ink/measure-element.ts'
export { createRoot, forceRedraw, default as render, renderSync } from './src/ink/root.ts'
export { createRoot, forceRedraw, default as render, forceRedraw, renderSync } from './src/ink/root.ts'
export type { Instance, RenderOptions, Root } from './src/ink/root.ts'
export { stringWidth } from './src/ink/stringWidth.ts'
export { default as TextInput, UncontrolledTextInput } from 'ink-text-input'

View File

@ -23,7 +23,7 @@ export { useTerminalTitle } from './ink/hooks/use-terminal-title.js'
export { useTerminalViewport } from './ink/hooks/use-terminal-viewport.js'
export { default as measureElement } from './ink/measure-element.js'
export { scrollFastPathStats, type ScrollFastPathStats } from './ink/render-node-to-output.js'
export { createRoot, forceRedraw, default as render, renderSync } from './ink/root.js'
export { createRoot, forceRedraw, default as render, forceRedraw, renderSync } from './ink/root.js'
export { stringWidth } from './ink/stringWidth.js'
export { isXtermJs } from './ink/terminal.js'
export { default as TextInput, UncontrolledTextInput } from 'ink-text-input'

View File

@ -893,6 +893,43 @@ function selectionContentBounds(
return { first, last }
}
function selectableCell(screen: Screen, row: number, col: number): boolean {
const cell = cellAt(screen, col, row)
return (
screen.noSelect[row * screen.width + col] !== 1 &&
isWrittenCellAt(screen, col, row) &&
!!cell &&
cell.width !== CellWidth.SpacerTail &&
cell.width !== CellWidth.SpacerHead
)
}
function selectionContentBounds(
screen: Screen,
row: number,
start: number,
end: number
): { first: number; last: number } | null {
let first = start
while (first <= end && !selectableCell(screen, row, first)) {
first++
}
if (first > end) {
return null
}
let last = end
while (last >= first && !selectableCell(screen, row, last)) {
last--
}
return { first, last }
}
/** Extract text from one screen row. When the next row is a soft-wrap
* continuation (screen.softWrap[row+1]>0), clamp to that content-end
* column and skip the trailing trim so the word-separator space survives

View File

@ -41,10 +41,10 @@ export interface SelectionApi {
captureScrolledRows: (firstRow: number, lastRow: number, side: 'above' | 'below') => void
clearSelection: () => void
copySelection: () => Promise<string>
copySelectionNoClear: () => Promise<string>
getState: () => unknown
version: () => number
shiftAnchor: (dRow: number, minRow: number, maxRow: number) => void
copySelectionNoClear: () => Promise<string>;
getState: () => unknown;
version: () => number;
shiftAnchor: (dRow: number, minRow: number, maxRow: number) => void;
shiftSelection: (dRow: number, minRow: number, maxRow: number) => void
}

View File

@ -437,6 +437,39 @@ export const coreCommands: SlashCommand[] = [
}
},
{
help: 'save the current transcript to JSON',
name: 'save',
run: (_arg, ctx) => {
const hasConversation = ctx.local
.getHistoryItems()
.some(m => m.role === 'user' || m.role === 'assistant' || m.role === 'tool')
if (!hasConversation) {
return ctx.transcript.sys('no conversation yet')
}
if (!ctx.sid) {
return ctx.transcript.sys('no active session — nothing to save')
}
ctx.gateway
.rpc<SessionSaveResponse>('session.save', { session_id: ctx.sid })
.then(
ctx.guarded<SessionSaveResponse>(r => {
const file = r?.file
if (file) {
ctx.transcript.sys(`conversation saved to: ${file}`)
} else {
ctx.transcript.sys('failed to save')
}
})
)
.catch(ctx.guardedErr)
}
},
{
aliases: ['sb'],
help: 'status bar position (on|off|top|bottom)',

View File

@ -904,6 +904,21 @@ export const ToolTrail = memo(function ToolTrail({
)
}
const toolLabel = (group: Group) => {
const { duration, label } = splitToolDuration(String(group.content))
return duration ? (
<>
{label}
<Text color={t.color.statusFg} dim>
{duration}
</Text>
</>
) : (
group.content
)
}
// ── Backstop: floating alerts when every panel is hidden ─────────
//
// Per-section overrides win over the global details_mode (they're computed

View File

@ -1,3 +1,5 @@
import { evictInkCaches } from '@hermes/ink'
import { type HeapDumpResult, performHeapDump } from './memory.js'
export type MemoryLevel = 'critical' | 'high' | 'normal'
@ -71,6 +73,10 @@ export function startMemoryMonitor({
return
}
// Prune Ink content caches before dump/exit — half on 'high' (recoverable),
// full on 'critical' (post-dump RSS reduction, keeps user running).
evictInkCaches(level === 'critical' ? 'all' : 'half')
if (dumped.has(level) || inFlight.has(level)) {
return
}

View File

@ -142,12 +142,12 @@ export interface McpServerStatus {
export interface SessionInfo {
cwd?: string
fast?: boolean
lazy?: boolean
mcp_servers?: McpServerStatus[]
model: string
reasoning_effort?: string
release_date?: string
fast?: boolean;
lazy?: boolean;
mcp_servers?: McpServerStatus[];
model: string;
reasoning_effort?: string;
release_date?: string;
service_tier?: string
skills: Record<string, string[]>
tools: Record<string, string[]>

View File

@ -59,6 +59,32 @@ declare module '@hermes/ink' {
}>
}
export type FrameEvent = {
readonly durationMs: number
readonly phases?: {
readonly renderer: number
readonly diff: number
readonly optimize: number
readonly write: number
readonly patches: number
readonly optimizedPatches: number
readonly writeBytes: number
readonly backpressure: boolean
readonly prevFrameDrainMs: number
readonly yoga: number
readonly commit: number
readonly yogaVisited: number
readonly yogaMeasured: number
readonly yogaCacheHits: number
readonly yogaLive: number
}
readonly flickers: ReadonlyArray<{
readonly desiredHeight: number
readonly availableHeight: number
readonly reason: 'resize' | 'offscreen' | 'clear'
}>
}
export type RenderOptions = {
readonly stdin?: NodeJS.ReadStream
readonly stdout?: NodeJS.WriteStream