yuanbao platform (#16298)

Co-authored-by: loongzhao <loongzhao@tencent.com>
This commit is contained in:
Teknium 2026-04-26 18:50:49 -07:00 committed by GitHub
parent 5eb6cd82b2
commit ab6879634e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 10997 additions and 12 deletions

View File

@ -422,6 +422,29 @@ PLATFORM_HINTS = {
"your response. Images are sent as native photos, and other files arrive as downloadable "
"documents."
),
"yuanbao": (
"You are on Yuanbao (腾讯元宝), a Chinese AI assistant platform. "
"Markdown formatting is supported (code blocks, tables, bold/italic). "
"You CAN send media files natively — to deliver a file to the user, include "
"MEDIA:/absolute/path/to/file in your response. The file will be sent as a native "
"Yuanbao attachment: images (.jpg, .png, .webp, .gif) are sent as photos, "
"and other files (.pdf, .docx, .txt, .zip, etc.) arrive as downloadable documents "
"(max 50 MB). You can also include image URLs in markdown format ![alt](url) and "
"they will be downloaded and sent as native photos. "
"Do NOT tell the user you lack file-sending capability — use MEDIA: syntax "
"whenever a file delivery is appropriate.\n\n"
"Stickers (贴纸 / 表情包 / TIM face): Yuanbao has a built-in sticker catalogue. "
"When the user sends a sticker (you see '[emoji: 名称]' in their message) or asks "
"you to send/reply-with a 贴纸/表情/表情包, you MUST use the sticker tools:\n"
" 1. Call yb_search_sticker with a Chinese keyword (e.g. '666', '比心', '吃瓜', "
" '捂脸', '合十') to discover matching sticker_ids.\n"
" 2. Call yb_send_sticker with the chosen sticker_id or name — this sends a real "
" TIMFaceElem that renders as a native sticker in the chat.\n"
"DO NOT draw sticker-like PNGs with execute_code/Pillow/matplotlib and then send "
"them via MEDIA: or send_image_file. That produces a fake low-quality 'sticker' "
"image and is the WRONG path. Bare Unicode emoji in text is also not a substitute "
"— when a sticker is the right response, use yb_send_sticker."
),
}
# ---------------------------------------------------------------------------

View File

@ -606,6 +606,7 @@ platform_toolsets:
signal: [hermes-signal]
homeassistant: [hermes-homeassistant]
qqbot: [hermes-qqbot]
yuanbao: [hermes-yuanbao]
# =============================================================================
# Gateway Platform Settings

View File

@ -77,7 +77,7 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({
"telegram", "discord", "slack", "whatsapp", "signal",
"matrix", "mattermost", "homeassistant", "dingtalk", "feishu",
"wecom", "wecom_callback", "weixin", "sms", "email", "webhook", "bluebubbles",
"qqbot",
"qqbot", "yuanbao",
})
# Platforms that support a configured cron/notification home target, mapped to
@ -337,6 +337,7 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
"sms": Platform.SMS,
"bluebubbles": Platform.BLUEBUBBLES,
"qqbot": Platform.QQBOT,
"yuanbao": Platform.YUANBAO,
}
# Optionally wrap the content with a header/footer so the user knows this

View File

@ -67,6 +67,7 @@ class Platform(Enum):
WEIXIN = "weixin"
BLUEBUBBLES = "bluebubbles"
QQBOT = "qqbot"
YUANBAO = "yuanbao"
@dataclass
@ -326,6 +327,9 @@ class GatewayConfig:
# QQBot uses extra dict for app credentials
elif platform == Platform.QQBOT and config.extra.get("app_id") and config.extra.get("client_secret"):
connected.append(platform)
# Yuanbao uses extra dict for app credentials
elif platform == Platform.YUANBAO and config.extra.get("app_id") and config.extra.get("app_secret"):
connected.append(platform)
# DingTalk uses client_id/client_secret from config.extra or env vars
elif platform == Platform.DINGTALK and (
config.extra.get("client_id") or os.getenv("DINGTALK_CLIENT_ID")
@ -1296,6 +1300,48 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
name=os.getenv("QQBOT_HOME_CHANNEL_NAME") or os.getenv(qq_home_name_env, "Home"),
)
# Yuanbao — YUANBAO_APP_ID preferred
yuanbao_app_id = os.getenv("YUANBAO_APP_ID") or os.getenv("YUANBAO_APP_KEY")
yuanbao_app_secret = os.getenv("YUANBAO_APP_SECRET")
if yuanbao_app_id and yuanbao_app_secret:
if Platform.YUANBAO not in config.platforms:
config.platforms[Platform.YUANBAO] = PlatformConfig()
config.platforms[Platform.YUANBAO].enabled = True
extra = config.platforms[Platform.YUANBAO].extra
extra["app_id"] = yuanbao_app_id
extra["app_secret"] = yuanbao_app_secret
yuanbao_bot_id = os.getenv("YUANBAO_BOT_ID")
if yuanbao_bot_id:
extra["bot_id"] = yuanbao_bot_id
yuanbao_ws_url = os.getenv("YUANBAO_WS_URL")
if yuanbao_ws_url:
extra["ws_url"] = yuanbao_ws_url
yuanbao_api_domain = os.getenv("YUANBAO_API_DOMAIN")
if yuanbao_api_domain:
extra["api_domain"] = yuanbao_api_domain
yuanbao_route_env = os.getenv("YUANBAO_ROUTE_ENV")
if yuanbao_route_env:
extra["route_env"] = yuanbao_route_env
yuanbao_home = os.getenv("YUANBAO_HOME_CHANNEL")
if yuanbao_home:
config.platforms[Platform.YUANBAO].home_channel = HomeChannel(
platform=Platform.YUANBAO,
chat_id=yuanbao_home,
name=os.getenv("YUANBAO_HOME_CHANNEL_NAME", "Home"),
)
yuanbao_dm_policy = os.getenv("YUANBAO_DM_POLICY")
if yuanbao_dm_policy:
extra["dm_policy"] = yuanbao_dm_policy.strip().lower()
yuanbao_dm_allow_from = os.getenv("YUANBAO_DM_ALLOW_FROM")
if yuanbao_dm_allow_from:
extra["dm_allow_from"] = yuanbao_dm_allow_from
yuanbao_group_policy = os.getenv("YUANBAO_GROUP_POLICY")
if yuanbao_group_policy:
extra["group_policy"] = yuanbao_group_policy.strip().lower()
yuanbao_group_allow_from = os.getenv("YUANBAO_GROUP_ALLOW_FROM")
if yuanbao_group_allow_from:
extra["group_allow_from"] = yuanbao_group_allow_from
# Session settings
idle_minutes = os.getenv("SESSION_IDLE_MINUTES")
if idle_minutes:

View File

@ -10,10 +10,12 @@ Each adapter handles:
from .base import BasePlatformAdapter, MessageEvent, SendResult
from .qqbot import QQAdapter
from .yuanbao import YuanbaoAdapter
__all__ = [
"BasePlatformAdapter",
"MessageEvent",
"SendResult",
"QQAdapter",
"YuanbaoAdapter",
]

4754
gateway/platforms/yuanbao.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,647 @@
"""
yuanbao_media.py 元宝平台媒体处理模块
提供 COS 上传文件下载TIM 媒体消息构建等功能
移植自 TypeScript media.tsyuanbao-openclaw-plugin
使用 httpx 替代 cos-nodejs-sdk-v5避免引入额外 SDK 依赖
COS 上传流程
1. 调用 genUploadInfo 获取临时凭证tmpSecretId/tmpSecretKey/sessionToken
2. 用临时凭证通过 HMAC-SHA1 签名构建 Authorization
3. HTTP PUT 上传到 COS
TIM 消息体构建
- buildImageMsgBody() TIMImageElem
- buildFileMsgBody() TIMFileElem
"""
from __future__ import annotations
import hashlib
import hmac
import logging
import os
import re
import secrets
import struct
import time
import urllib.parse
from datetime import datetime, timezone, timedelta
from typing import Optional, Any
import httpx
logger = logging.getLogger(__name__)
# ============ 常量 ============
UPLOAD_INFO_PATH = "/api/resource/genUploadInfo"
DEFAULT_API_DOMAIN = "yuanbao.tencent.com"
DEFAULT_MAX_SIZE_MB = 50
# COS 加速域名后缀(优先使用全球加速)
COS_USE_ACCELERATE = True
# ============ 类型映射 ============
# MIME → image_format 数字TIM 协议字段)
_MIME_TO_IMAGE_FORMAT: dict[str, int] = {
"image/jpeg": 1,
"image/jpg": 1,
"image/gif": 2,
"image/png": 3,
"image/bmp": 4,
"image/webp": 255,
"image/heic": 255,
"image/tiff": 255,
}
# 文件扩展名 → MIME
_EXT_TO_MIME: dict[str, str] = {
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
".gif": "image/gif",
".webp": "image/webp",
".bmp": "image/bmp",
".heic": "image/heic",
".tiff": "image/tiff",
".ico": "image/x-icon",
".pdf": "application/pdf",
".doc": "application/msword",
".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
".xls": "application/vnd.ms-excel",
".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
".ppt": "application/vnd.ms-powerpoint",
".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
".txt": "text/plain",
".zip": "application/zip",
".tar": "application/x-tar",
".gz": "application/gzip",
".mp3": "audio/mpeg",
".mp4": "video/mp4",
".wav": "audio/wav",
".ogg": "audio/ogg",
".webm": "video/webm",
}
# ============ 工具函数 ============
def guess_mime_type(filename: str) -> str:
"""根据文件扩展名猜测 MIME 类型。"""
ext = os.path.splitext(filename)[-1].lower()
return _EXT_TO_MIME.get(ext, "application/octet-stream")
def is_image(filename: str, mime_type: str = "") -> bool:
"""判断是否为图片类型。"""
if mime_type.startswith("image/"):
return True
ext = os.path.splitext(filename)[-1].lower()
return ext in {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".heic", ".tiff", ".ico"}
def get_image_format(mime_type: str) -> int:
"""获取 TIM 图片格式编号。"""
return _MIME_TO_IMAGE_FORMAT.get(mime_type.lower(), 255)
def md5_hex(data: bytes) -> str:
"""计算 MD5 十六进制摘要。"""
return hashlib.md5(data).hexdigest()
def generate_file_id() -> str:
"""生成随机文件 ID32 位 hex"""
return secrets.token_hex(16)
# ============ 图片尺寸解析(纯 Python无需 Pillow ============
def parse_image_size(data: bytes) -> Optional[dict[str, int]]:
"""
解析图片宽高支持 JPEG/PNG/GIF/WebP无需第三方依赖
返回 {"width": w, "height": h} None无法识别
"""
return (
_parse_png_size(data)
or _parse_jpeg_size(data)
or _parse_gif_size(data)
or _parse_webp_size(data)
)
def _parse_png_size(buf: bytes) -> Optional[dict[str, int]]:
if len(buf) < 24:
return None
if buf[:4] != b"\x89PNG":
return None
w = struct.unpack(">I", buf[16:20])[0]
h = struct.unpack(">I", buf[20:24])[0]
return {"width": w, "height": h}
def _parse_jpeg_size(buf: bytes) -> Optional[dict[str, int]]:
if len(buf) < 4 or buf[0] != 0xFF or buf[1] != 0xD8:
return None
i = 2
while i < len(buf) - 9:
if buf[i] != 0xFF:
i += 1
continue
marker = buf[i + 1]
if marker in (0xC0, 0xC2):
h = struct.unpack(">H", buf[i + 5: i + 7])[0]
w = struct.unpack(">H", buf[i + 7: i + 9])[0]
return {"width": w, "height": h}
if i + 3 < len(buf):
i += 2 + struct.unpack(">H", buf[i + 2: i + 4])[0]
else:
break
return None
def _parse_gif_size(buf: bytes) -> Optional[dict[str, int]]:
if len(buf) < 10:
return None
sig = buf[:6].decode("ascii", errors="replace")
if sig not in ("GIF87a", "GIF89a"):
return None
w = struct.unpack("<H", buf[6:8])[0]
h = struct.unpack("<H", buf[8:10])[0]
return {"width": w, "height": h}
def _parse_webp_size(buf: bytes) -> Optional[dict[str, int]]:
if len(buf) < 16:
return None
if buf[:4] != b"RIFF" or buf[8:12] != b"WEBP":
return None
chunk = buf[12:16].decode("ascii", errors="replace")
if chunk == "VP8 ":
if len(buf) >= 30 and buf[23] == 0x9D and buf[24] == 0x01 and buf[25] == 0x2A:
w = struct.unpack("<H", buf[26:28])[0] & 0x3FFF
h = struct.unpack("<H", buf[28:30])[0] & 0x3FFF
return {"width": w, "height": h}
elif chunk == "VP8L":
if len(buf) >= 25 and buf[20] == 0x2F:
bits = struct.unpack("<I", buf[21:25])[0]
w = (bits & 0x3FFF) + 1
h = ((bits >> 14) & 0x3FFF) + 1
return {"width": w, "height": h}
elif chunk == "VP8X":
if len(buf) >= 30:
w = (buf[24] | (buf[25] << 8) | (buf[26] << 16)) + 1
h = (buf[27] | (buf[28] << 8) | (buf[29] << 16)) + 1
return {"width": w, "height": h}
return None
# ============ URL 下载 ============
async def download_url(
url: str,
max_size_mb: int = DEFAULT_MAX_SIZE_MB,
) -> tuple[bytes, str]:
"""
下载 URL 内容返回 (bytes, content_type)
Args:
url: HTTP(S) URL
max_size_mb: 最大允许大小MB超过则抛出异常
Returns:
(data_bytes, content_type_string)
Raises:
ValueError: 内容超过大小限制
httpx.HTTPError: 网络/HTTP 错误
"""
max_bytes = max_size_mb * 1024 * 1024
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
# 先 HEAD 检查大小
try:
head = await client.head(url)
content_length = int(head.headers.get("content-length", 0) or 0)
if content_length > 0 and content_length > max_bytes:
raise ValueError(
f"文件过大: {content_length / 1024 / 1024:.1f} MB > {max_size_mb} MB"
)
except httpx.HTTPStatusError:
pass # 部分服务器不支持 HEAD忽略
# GET 下载(流式读取,防止超限)
async with client.stream("GET", url) as resp:
resp.raise_for_status()
content_type = resp.headers.get("content-type", "").split(";")[0].strip()
chunks: list[bytes] = []
downloaded = 0
async for chunk in resp.aiter_bytes(65536):
downloaded += len(chunk)
if downloaded > max_bytes:
raise ValueError(
f"文件过大: 已超过 {max_size_mb} MB 限制"
)
chunks.append(chunk)
data = b"".join(chunks)
return data, content_type
# ============ COS 鉴权HMAC-SHA1 ============
def _cos_sign(
method: str,
path: str,
params: dict[str, str],
headers: dict[str, str],
secret_id: str,
secret_key: str,
start_time: Optional[int] = None,
expire_seconds: int = 3600,
) -> str:
"""
构建 COS 请求签名q-sign-algorithm=sha1 方案
参考https://cloud.tencent.com/document/product/436/7778
Args:
method: HTTP 方法小写 "put"
path: URL 路径URL encode 后的小写
params: URL 查询参数 dict用于签名
headers: 参与签名的请求头 dictkey 需小写
secret_id: 临时 SecretIdtmpSecretId
secret_key: 临时 SecretKeytmpSecretKey
start_time: 签名起始 Unix 时间戳默认 now
expire_seconds: 签名有效期默认 3600
Returns:
Authorization header 完整字符串
"""
now = int(time.time())
q_sign_time = f"{start_time or now};{(start_time or now) + expire_seconds}"
# Step 1: SignKey = HMAC-SHA1(SecretKey, q-sign-time)
sign_key = hmac.new(
secret_key.encode("utf-8"),
q_sign_time.encode("utf-8"),
hashlib.sha1,
).hexdigest()
# Step 2: HttpString
# 参数和头部需按字典序排列key 小写
sorted_params = sorted((k.lower(), urllib.parse.quote(str(v), safe="") ) for k, v in params.items())
sorted_headers = sorted((k.lower(), urllib.parse.quote(str(v), safe="") ) for k, v in headers.items())
url_param_list = ";".join(k for k, _ in sorted_params)
url_params = "&".join(f"{k}={v}" for k, v in sorted_params)
header_list = ";".join(k for k, _ in sorted_headers)
header_str = "&".join(f"{k}={v}" for k, v in sorted_headers)
http_string = "\n".join([
method.lower(),
path,
url_params,
header_str,
"",
])
# Step 3: StringToSign = sha1 hash of HttpString
sha1_of_http = hashlib.sha1(http_string.encode("utf-8")).hexdigest()
string_to_sign = "\n".join([
"sha1",
q_sign_time,
sha1_of_http,
"",
])
# Step 4: Signature = HMAC-SHA1(SignKey, StringToSign)
signature = hmac.new(
sign_key.encode("utf-8"),
string_to_sign.encode("utf-8"),
hashlib.sha1,
).hexdigest()
return (
f"q-sign-algorithm=sha1"
f"&q-ak={secret_id}"
f"&q-sign-time={q_sign_time}"
f"&q-key-time={q_sign_time}"
f"&q-header-list={header_list}"
f"&q-url-param-list={url_param_list}"
f"&q-signature={signature}"
)
# ============ 主要公开 API ============
async def get_cos_credentials(
app_key: str,
api_domain: str,
token: str,
filename: str = "file",
file_id: Optional[str] = None,
bot_id: str = "",
route_env: str = "",
) -> dict:
"""
调用 genUploadInfo 接口获取 COS 临时密钥及上传配置
Args:
app_key: 应用 Key用于 X-ID
api_domain: API 域名 https://bot.yuanbao.tencent.com
token: 当前有效的签票 tokenX-Token
filename: 待上传的文件名含扩展名
file_id: 客户端生成的唯一文件 ID不传则自动生成
bot_id: Bot 账号 ID用于 X-ID
Returns:
COS 上传配置 dict包含以下字段
bucketName (str) COS Bucket 名称
region (str) COS 地域
location (str) 上传 Key对象路径
encryptTmpSecretId (str) 临时 SecretId
encryptTmpSecretKey(str) 临时 SecretKey
encryptToken (str) SessionToken
startTime (int) 凭证起始时间戳Unix
expiredTime (int) 凭证过期时间戳Unix
resourceUrl (str) 上传后的公网访问 URL
resourceID (str) 资源 ID可选
Raises:
RuntimeError: 接口返回非 0 code 或字段缺失
"""
if file_id is None:
file_id = generate_file_id()
upload_url = f"{api_domain.rstrip('/')}{UPLOAD_INFO_PATH}"
headers = {
"Content-Type": "application/json",
"X-Token": token,
"X-ID": bot_id or app_key,
"X-Source": "web",
}
if route_env:
headers["X-Route-Env"] = route_env
body = {
"fileName": filename,
"fileId": file_id,
"docFrom": "localDoc",
"docOpenId": "",
}
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.post(upload_url, json=body, headers=headers)
resp.raise_for_status()
result: dict[str, Any] = resp.json()
code = result.get("code")
if code != 0 and code is not None:
raise RuntimeError(
f"genUploadInfo 失败: code={code}, msg={result.get('msg', '')}"
)
data = result.get("data") or result
required_fields = ["bucketName", "location"]
missing = [f for f in required_fields if not data.get(f)]
if missing:
raise RuntimeError(
f"genUploadInfo 返回字段不完整: 缺少字段 {missing}"
)
return data
async def upload_to_cos(
file_bytes: bytes,
filename: str,
content_type: str,
credentials: dict,
bucket: str,
region: str,
) -> dict:
"""
通过 httpx PUT 请求将文件上传到 COS
使用临时凭证tmpSecretId/tmpSecretKey/sessionToken构建 HMAC-SHA1 签名
Args:
file_bytes: 文件二进制内容
filename: 文件名用于辅助计算 MIMEUUID
content_type: MIME 类型 "image/jpeg"
credentials: get_cos_credentials() 返回的 dict包含
encryptTmpSecretId tmpSecretId
encryptTmpSecretKey tmpSecretKey
encryptToken sessionToken
location COS key对象路径
resourceUrl 上传后公网 URL
startTime 凭证起始时间Unix
expiredTime 凭证过期时间Unix
bucket: COS Bucket 名称 chatbot-1234567890
region: COS 地域 ap-guangzhou
Returns:
上传结果 dict包含
url (str) COS 公网访问 URL
uuid (str) 文件内容 MD5
size (int) 文件大小字节
width (int, optional) 图片宽度仅图片
height (int, optional) 图片高度仅图片
Raises:
httpx.HTTPStatusError: COS 返回非 2xx 状态
RuntimeError: credentials 字段缺失
"""
secret_id: str = credentials.get("encryptTmpSecretId", "")
secret_key: str = credentials.get("encryptTmpSecretKey", "")
session_token: str = credentials.get("encryptToken", "")
cos_key: str = credentials.get("location", "")
resource_url: str = credentials.get("resourceUrl", "")
start_time: Optional[int] = credentials.get("startTime")
expired_time: Optional[int] = credentials.get("expiredTime")
if not secret_id or not secret_key or not cos_key:
raise RuntimeError(
f"COS credentials 不完整: secretId={bool(secret_id)}, "
f"secretKey={bool(secret_key)}, location={bool(cos_key)}"
)
# 构建 COS 上传 URL优先使用全球加速域名
if COS_USE_ACCELERATE:
cos_host = f"{bucket}.cos.accelerate.myqcloud.com"
else:
cos_host = f"{bucket}.cos.{region}.myqcloud.com"
# URL encode cos_key保留 /
encoded_key = urllib.parse.quote(cos_key, safe="/")
cos_url = f"https://{cos_host}/{encoded_key.lstrip('/')}"
# 确定 Content-Type
if not content_type or content_type == "application/octet-stream":
if is_image(filename):
content_type = guess_mime_type(filename)
else:
content_type = "application/octet-stream"
# 计算文件 MD5 + size
file_uuid = md5_hex(file_bytes)
file_size = len(file_bytes)
# 参与签名的请求头
sign_headers = {
"host": cos_host,
"content-type": content_type,
"x-cos-security-token": session_token,
}
# 计算签名有效期
now = int(time.time())
sign_start = start_time if start_time else now
sign_expire = (expired_time - now) if expired_time and expired_time > now else 3600
authorization = _cos_sign(
method="put",
path=f"/{encoded_key.lstrip('/')}",
params={},
headers=sign_headers,
secret_id=secret_id,
secret_key=secret_key,
start_time=sign_start,
expire_seconds=sign_expire,
)
put_headers = {
"Authorization": authorization,
"Content-Type": content_type,
"x-cos-security-token": session_token,
}
logger.info(
"COS PUT: bucket=%s region=%s key=%s size=%d mime=%s",
bucket, region, cos_key, file_size, content_type,
)
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.put(
cos_url,
content=file_bytes,
headers=put_headers,
)
resp.raise_for_status()
# 解析图片尺寸(仅图片类型)
result: dict[str, Any] = {
"url": resource_url or cos_url,
"uuid": file_uuid,
"size": file_size,
}
if content_type.startswith("image/"):
size_info = parse_image_size(file_bytes)
if size_info:
result["width"] = size_info["width"]
result["height"] = size_info["height"]
logger.info(
"COS 上传成功: url=%s size=%d",
result["url"], file_size,
)
return result
# ============ TIM 媒体消息构建 ============
def build_image_msg_body(
url: str,
uuid: Optional[str] = None,
filename: Optional[str] = None,
size: int = 0,
width: int = 0,
height: int = 0,
mime_type: str = "",
) -> list[dict]:
"""
构建腾讯 IM TIMImageElem 消息体
参考https://cloud.tencent.com/document/product/269/2720
Args:
url: 图片公网访问 URLCOS resourceUrl
uuid: 文件 UUIDMD5 或其他唯一标识
filename: 文件名uuid 为空时作为备用
size: 文件大小字节
width: 图片宽度像素
height: 图片高度像素
mime_type: MIME 类型用于确定 image_format
Returns:
TIMImageElem 消息体列表适合直接放入 msg_body
"""
_uuid = uuid or filename or _basename_from_url(url) or "image"
image_format = get_image_format(mime_type) if mime_type else 255
return [
{
"msg_type": "TIMImageElem",
"msg_content": {
"uuid": _uuid,
"image_format": image_format,
"image_info_array": [
{
"type": 1, # 1 = 原图
"size": size,
"width": width,
"height": height,
"url": url,
}
],
},
}
]
def build_file_msg_body(
url: str,
filename: str,
uuid: Optional[str] = None,
size: int = 0,
) -> list[dict]:
"""
构建腾讯 IM TIMFileElem 消息体
参考https://cloud.tencent.com/document/product/269/2720
Args:
url: 文件公网访问 URLCOS resourceUrl
filename: 文件名含扩展名
uuid: 文件 UUIDMD5 或其他唯一标识不传则使用 filename
size: 文件大小字节
Returns:
TIMFileElem 消息体列表适合直接放入 msg_body
"""
_uuid = uuid or filename
return [
{
"msg_type": "TIMFileElem",
"msg_content": {
"uuid": _uuid,
"file_name": filename,
"file_size": size,
"url": url,
},
}
]
# ============ 内部工具 ============
def _basename_from_url(url: str) -> str:
"""从 URL 提取文件名。"""
try:
parsed = urllib.parse.urlparse(url)
return os.path.basename(parsed.path)
except Exception:
return ""

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,558 @@
"""
Yuanbao sticker (TIMFaceElem) support.
Ported from yuanbao-openclaw-plugin/src/sticker/.
TIMFaceElem wire format:
{
"msg_type": "TIMFaceElem",
"msg_content": {
"index": 0, # always 0 per Yuanbao convention
"data": "<json>", # serialised sticker metadata
}
}
The `data` field carries a JSON string with the sticker's metadata so the
receiver can look up the correct asset in the emoji pack.
"""
from __future__ import annotations
import json
import random
import re
import unicodedata
from typing import Optional
# ---------------------------------------------------------------------------
# Sticker catalogue ported from builtin-stickers.json
# Key : canonical name (Chinese)
# Value : {sticker_id, package_id, name, description, width, height, formats}
# ---------------------------------------------------------------------------
STICKER_MAP: dict[str, dict] = {
"六六六": {
"sticker_id": "278", "package_id": "1003", "name": "六六六",
"description": "666 厉害 牛 棒 绝了 好强 awesome",
"width": 128, "height": 128, "formats": "png",
},
"我想开了": {
"sticker_id": "262", "package_id": "1003", "name": "我想开了",
"description": "想开 佛系 释怀 顿悟 看淡了 无所谓",
"width": 128, "height": 128, "formats": "png",
},
"害羞": {
"sticker_id": "130", "package_id": "1003", "name": "害羞",
"description": "腼腆 不好意思 脸红 娇羞 羞涩 捂脸",
"width": 128, "height": 128, "formats": "png",
},
"比心": {
"sticker_id": "252", "package_id": "1003", "name": "比心",
"description": "笔芯 爱你 爱心手势 love heart 喜欢你",
"width": 128, "height": 128, "formats": "png",
},
"委屈": {
"sticker_id": "125", "package_id": "1003", "name": "委屈",
"description": "难过 想哭 可怜巴巴 瘪嘴 受伤 被欺负",
"width": 128, "height": 128, "formats": "png",
},
"亲亲": {
"sticker_id": "146", "package_id": "1003", "name": "亲亲",
"description": "么么 mua 亲一下 kiss 飞吻 啵",
"width": 128, "height": 128, "formats": "png",
},
"": {
"sticker_id": "131", "package_id": "1003", "name": "",
"description": "帅 墨镜 cool 高冷 有型 swagger",
"width": 128, "height": 128, "formats": "png",
},
"": {
"sticker_id": "145", "package_id": "1003", "name": "",
"description": "睡觉 困 zzZ 打盹 躺平 休眠 sleepy",
"width": 128, "height": 128, "formats": "png",
},
"发呆": {
"sticker_id": "152", "package_id": "1003", "name": "发呆",
"description": "懵 愣住 放空 呆滞 出神 脑子空白",
"width": 128, "height": 128, "formats": "png",
},
"可怜": {
"sticker_id": "157", "package_id": "1003", "name": "可怜",
"description": "卖萌 求饶 委屈巴巴 弱小 拜托 眼巴巴",
"width": 128, "height": 128, "formats": "png",
},
"摊手": {
"sticker_id": "200", "package_id": "1003", "name": "摊手",
"description": "无奈 没办法 耸肩 随便 那咋整 whatever",
"width": 128, "height": 128, "formats": "png",
},
"头大": {
"sticker_id": "213", "package_id": "1003", "name": "头大",
"description": "头疼 烦恼 郁闷 难搞 崩溃 一团乱",
"width": 128, "height": 128, "formats": "png",
},
"": {
"sticker_id": "256", "package_id": "1003", "name": "",
"description": "害怕 惊恐 震惊 吓一跳 恐怖 怂",
"width": 128, "height": 128, "formats": "png",
},
"吐血": {
"sticker_id": "203", "package_id": "1003", "name": "吐血",
"description": "无语 崩溃 被雷 内伤 一口老血 屮",
"width": 128, "height": 128, "formats": "png",
},
"": {
"sticker_id": "185", "package_id": "1003", "name": "",
"description": "傲娇 生气 不满 撇嘴 不理 赌气",
"width": 128, "height": 128, "formats": "png",
},
"嘿嘿": {
"sticker_id": "220", "package_id": "1003", "name": "嘿嘿",
"description": "坏笑 猥琐笑 偷笑 憨笑 得意 你懂的",
"width": 128, "height": 128, "formats": "png",
},
"头秃": {
"sticker_id": "218", "package_id": "1003", "name": "头秃",
"description": "程序员 加班 焦虑 没头发 秃了 肝爆",
"width": 128, "height": 128, "formats": "png",
},
"暗中观察": {
"sticker_id": "221", "package_id": "1003", "name": "暗中观察",
"description": "窥屏 潜水 偷偷看 角落 围观 屏住呼吸",
"width": 128, "height": 128, "formats": "png",
},
"我酸了": {
"sticker_id": "224", "package_id": "1003", "name": "我酸了",
"description": "嫉妒 柠檬精 羡慕 吃柠檬 眼红 恰柠檬",
"width": 128, "height": 128, "formats": "png",
},
"打call": {
"sticker_id": "246", "package_id": "1003", "name": "打call",
"description": "应援 加油 支持 喝彩 助威 call",
"width": 128, "height": 128, "formats": "png",
},
"庆祝": {
"sticker_id": "251", "package_id": "1003", "name": "庆祝",
"description": "祝贺 开心 耶 party 胜利 干杯",
"width": 128, "height": 128, "formats": "png",
},
"奋斗": {
"sticker_id": "151", "package_id": "1003", "name": "奋斗",
"description": "努力 加油 拼搏 冲 干劲 卷起来",
"width": 128, "height": 128, "formats": "png",
},
"惊讶": {
"sticker_id": "143", "package_id": "1003", "name": "惊讶",
"description": "震惊 哇 不敢相信 OMG 居然 这么离谱",
"width": 128, "height": 128, "formats": "png",
},
"疑问": {
"sticker_id": "144", "package_id": "1003", "name": "疑问",
"description": "问号 不懂 啥 为什么 啥情况 懵逼问",
"width": 128, "height": 128, "formats": "png",
},
"仔细分析": {
"sticker_id": "248", "package_id": "1003", "name": "仔细分析",
"description": "思考 推敲 认真 研究 琢磨 让我想想",
"width": 128, "height": 128, "formats": "png",
},
"撅嘴": {
"sticker_id": "184", "package_id": "1003", "name": "撅嘴",
"description": "嘟嘴 卖萌 不高兴 撒娇 嘴翘",
"width": 128, "height": 128, "formats": "png",
},
"泪奔": {
"sticker_id": "199", "package_id": "1003", "name": "泪奔",
"description": "大哭 伤心 破防 感动哭 泪流满面 呜呜",
"width": 128, "height": 128, "formats": "png",
},
"尊嘟假嘟": {
"sticker_id": "276", "package_id": "1003", "name": "尊嘟假嘟",
"description": "真的假的 真假 可爱问 你骗我 是不是",
"width": 128, "height": 128, "formats": "png",
},
"略略略": {
"sticker_id": "113", "package_id": "1003", "name": "略略略",
"description": "调皮 吐舌 不服 略 气死你 鬼脸",
"width": 128, "height": 128, "formats": "png",
},
"": {
"sticker_id": "180", "package_id": "1003", "name": "",
"description": "想睡 倦 打哈欠 睁不开眼 好困啊 sleepy",
"width": 128, "height": 128, "formats": "png",
},
"折磨": {
"sticker_id": "181", "package_id": "1003", "name": "折磨",
"description": "难受 痛苦 煎熬 蚌埠住了 受不了 要命",
"width": 128, "height": 128, "formats": "png",
},
"抠鼻": {
"sticker_id": "182", "package_id": "1003", "name": "抠鼻",
"description": "不屑 无聊 淡定 无所谓 鄙视 挖鼻",
"width": 128, "height": 128, "formats": "png",
},
"鼓掌": {
"sticker_id": "183", "package_id": "1003", "name": "鼓掌",
"description": "拍手 叫好 赞同 666 喝彩 掌声",
"width": 128, "height": 128, "formats": "png",
},
"斜眼笑": {
"sticker_id": "204", "package_id": "1003", "name": "斜眼笑",
"description": "滑稽 坏笑 doge 意味深长 阴阳怪气 嘿嘿嘿",
"width": 128, "height": 128, "formats": "png",
},
"辣眼睛": {
"sticker_id": "216", "package_id": "1003", "name": "辣眼睛",
"description": "看不下去 cringe 毁三观 太丑了 瞎了",
"width": 128, "height": 128, "formats": "png",
},
"哦哟": {
"sticker_id": "217", "package_id": "1003", "name": "哦哟",
"description": "惊讶 起哄 哇哦 有戏 不简单 哟",
"width": 128, "height": 128, "formats": "png",
},
"吃瓜": {
"sticker_id": "222", "package_id": "1003", "name": "吃瓜",
"description": "围观 看戏 八卦 路人 看热闹 板凳",
"width": 128, "height": 128, "formats": "png",
},
"狗头": {
"sticker_id": "225", "package_id": "1003", "name": "狗头",
"description": "doge 保命 开玩笑 滑稽 反讽 懂的都懂",
"width": 128, "height": 128, "formats": "png",
},
"敬礼": {
"sticker_id": "227", "package_id": "1003", "name": "敬礼",
"description": "salute 尊重 收到 遵命 致敬 报告",
"width": 128, "height": 128, "formats": "png",
},
"": {
"sticker_id": "231", "package_id": "1003", "name": "",
"description": "知道了 明白 敷衍 嗯 这样啊 收到",
"width": 128, "height": 128, "formats": "png",
},
"拿到红包": {
"sticker_id": "236", "package_id": "1003", "name": "拿到红包",
"description": "红包 谢谢老板 发财 开心 抢到了 欧气",
"width": 128, "height": 128, "formats": "png",
},
"牛吖": {
"sticker_id": "239", "package_id": "1003", "name": "牛吖",
"description": "牛 厉害 强 666 佩服 大佬",
"width": 128, "height": 128, "formats": "png",
},
"贴贴": {
"sticker_id": "272", "package_id": "1003", "name": "贴贴",
"description": "抱抱 亲昵 蹭蹭 亲密 靠靠 撒娇贴",
"width": 128, "height": 128, "formats": "png",
},
"爱心": {
"sticker_id": "138", "package_id": "1003", "name": "爱心",
"description": "心 love 喜欢你 红心 示爱 么么哒",
"width": 128, "height": 128, "formats": "png",
},
"晚安": {
"sticker_id": "170", "package_id": "1003", "name": "晚安",
"description": "好梦 睡了 night 早点休息 安啦 moon",
"width": 128, "height": 128, "formats": "png",
},
"太阳": {
"sticker_id": "176", "package_id": "1003", "name": "太阳",
"description": "晴天 早上好 阳光 morning 好天气 日",
"width": 128, "height": 128, "formats": "png",
},
"柠檬": {
"sticker_id": "266", "package_id": "1003", "name": "柠檬",
"description": "酸 嫉妒 柠檬精 羡慕 我酸 恰柠檬",
"width": 128, "height": 128, "formats": "png",
},
"大冤种": {
"sticker_id": "267", "package_id": "1003", "name": "大冤种",
"description": "倒霉 吃亏 自嘲 好心没好报 背锅 工具人",
"width": 128, "height": 128, "formats": "png",
},
"吐了": {
"sticker_id": "132", "package_id": "1003", "name": "吐了",
"description": "恶心 yue 受不了 嫌弃 想吐 生理不适",
"width": 128, "height": 128, "formats": "png",
},
"": {
"sticker_id": "134", "package_id": "1003", "name": "",
"description": "生气 愤怒 火大 暴躁 气炸 怼",
"width": 128, "height": 128, "formats": "png",
},
"玫瑰": {
"sticker_id": "165", "package_id": "1003", "name": "玫瑰",
"description": "花 示爱 表白 浪漫 送你花 情人节",
"width": 128, "height": 128, "formats": "png",
},
"凋谢": {
"sticker_id": "119", "package_id": "1003", "name": "凋谢",
"description": "花谢 失恋 难过 枯萎 心碎 凉了",
"width": 128, "height": 128, "formats": "png",
},
"点赞": {
"sticker_id": "159", "package_id": "1003", "name": "点赞",
"description": "赞 认同 好棒 good like 大拇指 顶",
"width": 128, "height": 128, "formats": "png",
},
"握手": {
"sticker_id": "164", "package_id": "1003", "name": "握手",
"description": "合作 你好 商务 hello deal 成交 友好",
"width": 128, "height": 128, "formats": "png",
},
"抱拳": {
"sticker_id": "163", "package_id": "1003", "name": "抱拳",
"description": "谢谢 失敬 江湖 承让 拜托 有礼",
"width": 128, "height": 128, "formats": "png",
},
"ok": {
"sticker_id": "169", "package_id": "1003", "name": "ok",
"description": "好的 收到 没问题 okay 行 可以 懂了",
"width": 128, "height": 128, "formats": "png",
},
"拳头": {
"sticker_id": "174", "package_id": "1003", "name": "拳头",
"description": "加油 干 冲 fight 力量 击拳 硬气",
"width": 128, "height": 128, "formats": "png",
},
"鞭炮": {
"sticker_id": "191", "package_id": "1003", "name": "鞭炮",
"description": "过年 喜庆 爆竹 春节 噼里啪啦 红",
"width": 128, "height": 128, "formats": "png",
},
"烟花": {
"sticker_id": "258", "package_id": "1003", "name": "烟花",
"description": "庆典 漂亮 新年 嘭 绽放 节日快乐",
"width": 128, "height": 128, "formats": "png",
},
}
def get_sticker_by_name(name: str) -> Optional[dict]:
"""
按名称查找贴纸支持模糊匹配
匹配优先级
1. 完全相等name
2. name 包含查询词前缀/子串
3. description 包含查询词同义词搜索
4. 通用模糊评分 sticker-search 同算法命中即返回得分最高的一条
返回 sticker dict找不到返回 None
"""
if not name:
return None
query = name.strip()
if query in STICKER_MAP:
return STICKER_MAP[query]
for key, sticker in STICKER_MAP.items():
if query in key or key in query:
return sticker
for sticker in STICKER_MAP.values():
desc = sticker.get("description", "")
if query in desc:
return sticker
matches = search_stickers(query, limit=1)
return matches[0] if matches else None
def get_random_sticker(category: str = None) -> dict:
"""
随机返回一个贴纸
若指定 category则在 description 中含有该关键词的贴纸里随机选取
category None 时从全表随机
"""
if category:
candidates = [
s for s in STICKER_MAP.values()
if category in s.get("description", "") or category in s.get("name", "")
]
if candidates:
return random.choice(candidates)
return random.choice(list(STICKER_MAP.values()))
def get_sticker_by_id(sticker_id: str) -> Optional[dict]:
"""按 sticker_id 精确查找贴纸。"""
if not sticker_id:
return None
sid = str(sticker_id).strip()
for sticker in STICKER_MAP.values():
if sticker.get("sticker_id") == sid:
return sticker
return None
# ---------------------------------------------------------------------------
# 模糊搜索(对齐 chatbot-web yuanbao-openclaw-plugin/sticker-cache.ts.searchStickers
# ---------------------------------------------------------------------------
_PUNCT_RE = re.compile(r"[\s\u3000\-_·.,,。!?\"“”'‘’、/\\]+")
def _normalize_text(raw: str) -> str:
return unicodedata.normalize("NFKC", str(raw or "")).strip().lower()
def _compact_text(raw: str) -> str:
return _PUNCT_RE.sub("", _normalize_text(raw))
def _multiset_char_hit_ratio(needle: str, haystack: str) -> float:
if not needle:
return 0.0
bag: dict[str, int] = {}
for ch in haystack:
bag[ch] = bag.get(ch, 0) + 1
hits = 0
for ch in needle:
n = bag.get(ch, 0)
if n > 0:
hits += 1
bag[ch] = n - 1
return hits / len(needle)
def _bigram_jaccard(a: str, b: str) -> float:
if len(a) < 2 or len(b) < 2:
return 0.0
A = {a[i:i + 2] for i in range(len(a) - 1)}
B = {b[i:i + 2] for i in range(len(b) - 1)}
inter = len(A & B)
union = len(A) + len(B) - inter
return inter / union if union else 0.0
def _longest_subsequence_ratio(needle: str, haystack: str) -> float:
if not needle:
return 0.0
j = 0
for ch in haystack:
if j >= len(needle):
break
if ch == needle[j]:
j += 1
return j / len(needle)
def _score_field(haystack: str, query: str) -> float:
hay = _normalize_text(haystack)
q = _normalize_text(query)
if not hay or not q:
return 0.0
hay_c = _compact_text(haystack)
q_c = _compact_text(query)
best = 0.0
if hay == q:
best = max(best, 100.0)
if q in hay:
best = max(best, 92 + min(6, len(q)))
if len(q) >= 2 and hay.startswith(q):
best = max(best, 88.0)
if q_c and q_c in hay_c:
best = max(best, 86.0)
best = max(best, _multiset_char_hit_ratio(q_c, hay_c) * 62)
best = max(best, _bigram_jaccard(q_c, hay_c) * 58)
best = max(best, _longest_subsequence_ratio(q_c, hay_c) * 52)
if len(q) == 1 and q in hay:
best = max(best, 68.0)
return best
def search_stickers(query: str, limit: int = 10) -> list[dict]:
"""
在内置贴纸表中按模糊匹配排序返回前 N 条结果
评分综合 name/description 字段的子串字符多重集覆盖bigram Jaccard子序列比例
name 权重略高于 description×0.88 query 时按字典顺序返回前 N
"""
safe_limit = max(1, min(500, int(limit) if limit else 10))
if not query or not _normalize_text(query):
return list(STICKER_MAP.values())[:safe_limit]
scored: list[tuple[float, dict]] = []
for sticker in STICKER_MAP.values():
name_s = _score_field(sticker.get("name", ""), query)
desc_s = _score_field(sticker.get("description", ""), query) * 0.88
sid = str(sticker.get("sticker_id", "")).strip()
q_norm = _normalize_text(query)
id_s = 0.0
if sid and q_norm:
sid_norm = _normalize_text(sid)
if sid_norm == q_norm:
id_s = 100.0
elif q_norm in sid_norm:
id_s = 84.0
scored.append((max(name_s, desc_s, id_s), sticker))
scored.sort(key=lambda x: x[0], reverse=True)
top = scored[0][0] if scored else 0
if top <= 0:
return [s for _, s in scored[:safe_limit]]
if top >= 22:
floor = 18.0
elif top >= 12:
floor = max(10.0, top * 0.5)
else:
floor = max(6.0, top * 0.35)
filtered = [pair for pair in scored if pair[0] >= floor]
out = filtered if filtered else scored
return [s for _, s in out[:safe_limit]]
def build_face_msg_body(
face_index: int,
face_type: int = 1,
data: Optional[str] = None,
) -> list:
"""
构造 TIMFaceElem 消息体
Yuanbao 约定
- index 固定传 0服务端通过 data 字段识别具体表情
- data JSON 字符串包含 sticker_id / package_id 等字段
Args:
face_index: 保留字段暂时不影响 wire formatYuanbao 固定 index=0
face_index > 0 时视为旧版 QQ 表情 ID直接放入 index
face_type: 保留字段兼容旧接口当前未使用
data: 已序列化的 JSON 字符串 None 时仅传 index
Returns:
符合 Yuanbao TIM 协议的 msg_body list::
[{"msg_type": "TIMFaceElem", "msg_content": {"index": 0, "data": "..."}}]
"""
msg_content: dict = {"index": face_index}
if data is not None:
msg_content["data"] = data
return [{"msg_type": "TIMFaceElem", "msg_content": msg_content}]
def build_sticker_msg_body(sticker: dict) -> list:
"""
STICKER_MAP 中的 sticker dict 直接构造 TIMFaceElem 消息体
这是 send_sticker() 的内部辅助确保 data 字段与原始 JS 插件一致
"""
data_payload = json.dumps(
{
"sticker_id": sticker["sticker_id"],
"package_id": sticker["package_id"],
"width": sticker.get("width", 128),
"height": sticker.get("height", 128),
"formats": sticker.get("formats", "png"),
"name": sticker["name"],
},
ensure_ascii=False,
separators=(",", ":"),
)
return build_face_msg_body(face_index=0, data=data_payload)

View File

@ -2123,6 +2123,7 @@ class GatewayRunner:
"WEIXIN_ALLOWED_USERS",
"BLUEBUBBLES_ALLOWED_USERS",
"QQ_ALLOWED_USERS",
"YUANBAO_ALLOWED_USERS",
"GATEWAY_ALLOWED_USERS")
)
_allow_all = os.getenv("GATEWAY_ALLOW_ALL_USERS", "").lower() in ("true", "1", "yes") or any(
@ -2137,7 +2138,8 @@ class GatewayRunner:
"WECOM_CALLBACK_ALLOW_ALL_USERS",
"WEIXIN_ALLOW_ALL_USERS",
"BLUEBUBBLES_ALLOW_ALL_USERS",
"QQ_ALLOW_ALL_USERS")
"QQ_ALLOW_ALL_USERS",
"YUANBAO_ALLOW_ALL_USERS")
)
if not _any_allowlist and not _allow_all:
logger.warning(
@ -3114,8 +3116,14 @@ class GatewayRunner:
return None
return QQAdapter(config)
elif platform == Platform.YUANBAO:
from gateway.platforms.yuanbao import YuanbaoAdapter, WEBSOCKETS_AVAILABLE
if not WEBSOCKETS_AVAILABLE:
logger.warning("Yuanbao: websockets not installed. Run: pip install websockets")
return None
return YuanbaoAdapter(config)
return None
def _is_user_authorized(self, source: SessionSource) -> bool:
"""
Check if a user is authorized to use the bot.
@ -3156,6 +3164,7 @@ class GatewayRunner:
Platform.WEIXIN: "WEIXIN_ALLOWED_USERS",
Platform.BLUEBUBBLES: "BLUEBUBBLES_ALLOWED_USERS",
Platform.QQBOT: "QQ_ALLOWED_USERS",
Platform.YUANBAO: "YUANBAO_ALLOWED_USERS",
}
platform_group_env_map = {
Platform.TELEGRAM: "TELEGRAM_GROUP_ALLOWED_USERS",
@ -3178,6 +3187,7 @@ class GatewayRunner:
Platform.WEIXIN: "WEIXIN_ALLOW_ALL_USERS",
Platform.BLUEBUBBLES: "BLUEBUBBLES_ALLOW_ALL_USERS",
Platform.QQBOT: "QQ_ALLOW_ALL_USERS",
Platform.YUANBAO: "YUANBAO_ALLOW_ALL_USERS",
}
# Per-platform allow-all flag (e.g., DISCORD_ALLOW_ALL_USERS=true)

View File

@ -354,6 +354,14 @@ def build_session_context_prompt(
"If the user needs a detailed answer, give the short version first "
"and offer to elaborate."
)
elif context.source.platform == Platform.YUANBAO:
lines.append("")
lines.append(
"**Platform notes:** You are running inside Yuanbao. "
"You CAN send private (DM) messages via the send_message tool. "
"Use target='yuanbao:direct:<account_id>' for DM "
"and target='yuanbao:group:<group_code>' for group chat."
)
# Connected platforms
platforms_list = ["local (files on this machine)"]

View File

@ -2724,6 +2724,24 @@ _PLATFORMS = [
"help": "OpenID to deliver cron results and notifications to."},
],
},
{
"key": "yuanbao",
"label": "Yuanbao",
"emoji": "💎",
"token_var": "YUANBAO_APP_ID",
"setup_instructions": [
"1. Download the Yuanbao app from https://yuanbao.tencent.com/",
"2. In the app, go to PAI → My Bot and create a new bot",
"3. After the bot is created, copy the App ID and App Secret",
"4. Enter them below and Hermes will connect automatically over WebSocket",
],
"vars": [
{"name": "YUANBAO_APP_ID", "prompt": "App ID", "password": False,
"help": "The App ID from your Yuanbao IM Bot credentials."},
{"name": "YUANBAO_APP_SECRET", "prompt": "App Secret", "password": True,
"help": "The App Secret (used for HMAC signing) from your Yuanbao IM Bot."},
],
},
]
@ -3108,6 +3126,12 @@ def _setup_wecom():
print_success("💬 WeCom configured!")
def _setup_yuanbao():
"""Configure Yuanbao via the standard platform setup."""
yuanbao_platform = next(p for p in _PLATFORMS if p["key"] == "yuanbao")
_setup_standard_platform(yuanbao_platform)
def _is_service_installed() -> bool:
"""Check if the gateway is installed as a system service."""
if supports_systemd_services():

View File

@ -36,6 +36,7 @@ PLATFORMS: OrderedDict[str, PlatformInfo] = OrderedDict([
("wecom_callback", PlatformInfo(label="💬 WeCom Callback", default_toolset="hermes-wecom-callback")),
("weixin", PlatformInfo(label="💬 Weixin", default_toolset="hermes-weixin")),
("qqbot", PlatformInfo(label="💬 QQBot", default_toolset="hermes-qqbot")),
("yuanbao", PlatformInfo(label="🤖 Yuanbao", default_toolset="hermes-yuanbao")),
("webhook", PlatformInfo(label="🔗 Webhook", default_toolset="hermes-webhook")),
("api_server", PlatformInfo(label="🌐 API Server", default_toolset="hermes-api-server")),
("cron", PlatformInfo(label="⏰ Cron", default_toolset="hermes-cron")),

View File

@ -2133,6 +2133,12 @@ def _setup_feishu():
_gateway_setup_feishu()
def _setup_yuanbao():
"""Configure Yuanbao via gateway setup."""
from hermes_cli.gateway import _setup_yuanbao as _gateway_setup_yuanbao
_gateway_setup_yuanbao()
def _setup_wecom():
"""Configure WeCom (Enterprise WeChat) via gateway setup."""
from hermes_cli.gateway import _setup_wecom as _gateway_setup_wecom
@ -2277,6 +2283,7 @@ _GATEWAY_PLATFORMS = [
("WhatsApp", "WHATSAPP_ENABLED", _setup_whatsapp),
("DingTalk", "DINGTALK_CLIENT_ID", _setup_dingtalk),
("Feishu / Lark", "FEISHU_APP_ID", _setup_feishu),
("Yuanbao", "YUANBAO_APP_ID", _setup_yuanbao),
("WeCom (Enterprise WeChat)", "WECOM_BOT_ID", _setup_wecom),
("WeCom Callback (Self-Built App)", "WECOM_CALLBACK_CORP_ID", _setup_wecom_callback),
("Weixin (WeChat)", "WEIXIN_ACCOUNT_ID", _setup_weixin),

View File

@ -326,7 +326,8 @@ def show_status(args):
"WeCom Callback": ("WECOM_CALLBACK_CORP_ID", None),
"Weixin": ("WEIXIN_ACCOUNT_ID", "WEIXIN_HOME_CHANNEL"),
"BlueBubbles": ("BLUEBUBBLES_SERVER_URL", "BLUEBUBBLES_HOME_CHANNEL"),
"QQBot": ("QQ_APP_ID", "QQBOT_HOME_CHANNEL"),
"QQBot": ("QQ_APP_ID", "QQ_HOME_CHANNEL"),
"Yuanbao": ("YUANBAO_APP_ID", "YUANBAO_HOME_CHANNEL"),
}
for name, (token_var, home_var) in platforms.items():

View File

@ -71,6 +71,7 @@ CONFIGURABLE_TOOLSETS = [
("spotify", "🎵 Spotify", "playback, search, playlists, library"),
("discord", "💬 Discord (read/participate)", "fetch messages, search members, create thread"),
("discord_admin", "🛡️ Discord Server Admin", "list channels/roles, pin, assign roles"),
("yuanbao", "🤖 Yuanbao", "group info, member queries, DM"),
]
# Toolsets that are OFF by default for new installs.

View File

@ -396,6 +396,17 @@ AUTHOR_MAP = {
"zzn+pa@zzn.im": "xinbenlv",
"zaynjarvis@gmail.com": "ZaynJarvis",
"zhiheng.liu@bytedance.com": "ZaynJarvis",
"izhaolongfei@gmail.com": "loongfay",
"296659110@qq.com": "lrt4836",
"fe.daniel91@gmail.com": "beforeload",
"libo1106@foxmail.com": "libo1106",
"295367131@qq.com": "295367131",
"295367132@qq.com": "IxAres",
"danieldliu@tencent.com": "danieldliu",
"loongzhao@tencent.com": "loongzhao",
"Bartok9@users.noreply.github.com": "Bartok9",
"LeonSGP43@users.noreply.github.com": "LeonSGP43",
"kshitijk4poor@users.noreply.github.com": "kshitijk4poor",
"mbelleau@Michels-MacBook-Pro.local": "malaiwah",
"michel.belleau@malaiwah.com": "malaiwah",
"gnanasekaran.sekareee@gmail.com": "gnanam1990",

107
skills/yuanbao/SKILL.md Normal file
View File

@ -0,0 +1,107 @@
---
name: yuanbao
description: Yuanbao (元宝) group interaction — @mention users, query group info and members
version: 1.0.0
metadata:
hermes:
tags: [yuanbao, mention, at, group, members, 元宝, 派, 艾特]
related_skills: []
---
# Yuanbao Group Interaction
## CRITICAL: How Messaging Works
**Your text reply IS the message sent to the group/user.** The gateway automatically delivers your response text to the chat. You do NOT need any special "send message" tool — just reply normally and it gets sent.
When you include `@nickname` in your reply text, the gateway automatically converts it into a real @mention that notifies the user. This is built-in — you have full @mention capability.
**NEVER say you cannot send messages or @mention users. NEVER suggest the user do it manually. NEVER add disclaimers about permissions. Just reply with the text you want sent.**
## Available Tools
| Tool | When to use |
|------|------------|
| `yb_query_group_info` | Query group name, owner, member count |
| `yb_query_group_members` | Find a user, list bots, list all members, or get nickname for @mention |
| `yb_send_dm` | Send a private/direct message (DM / 私信) to a user, with optional media files |
## @Mention Workflow
When you need to @mention / 艾特 someone:
1. Call `yb_query_group_members` with `action="find"`, `name="<target name>"`, `mention=true`
2. Get the exact nickname from the response
3. Include `@nickname` in your reply text — the gateway handles the rest
Example: user says "帮我艾特元宝"
Step 1 — tool call:
```json
{ "group_code": "328306697", "action": "find", "name": "元宝", "mention": true }
```
Step 2 — your reply (this gets sent to the group with a working @mention):
```
@元宝 你好,有人找你!
```
**That's it.** No extra explanation needed. Keep it short and natural.
**Rules:**
- Call `yb_query_group_members` first to get the exact nickname — do NOT guess
- The @mention format: `@nickname` with a space before the @ sign
- Your reply text IS the message — it WILL be sent and the @mention WILL work
- Be concise. Do NOT explain how @mention works to the user.
## Send DM (Private Message) Workflow
When someone asks to send a private message / 私信 / DM to a user:
1. Call `yb_send_dm` with `group_code`, `name` (target user's name), and `message`
2. The tool automatically finds the user and sends the DM
3. Report the result to the user
Example: user says "给 @用户aea3 私信发一个 hello"
```json
yb_send_dm({ "group_code": "535168412", "name": "用户aea3", "message": "hello" })
```
Example with media: user says "给 @用户aea3 私信发一张图片"
```json
yb_send_dm({
"group_code": "535168412",
"name": "用户aea3",
"message": "Here is the image",
"media_files": [{"path": "/tmp/photo.jpg"}]
})
```
**Rules:**
- Extract `group_code` from the current chat_id (e.g. `group:535168412``535168412`)
- If you already know the user_id, pass it directly via the `user_id` parameter to skip lookup
- If multiple users match the name, the tool returns candidates — ask the user to clarify
- Do NOT use `send_message` tool for Yuanbao DMs — use `yb_send_dm` instead
- Supports media: images (.jpg/.png/.gif/.webp/.bmp) sent as image messages, other files as documents
## Query Group Info
```json
yb_query_group_info({ "group_code": "328306697" })
```
## Query Members
| Action | Description |
|--------|-------------|
| `find` | Search by name (partial match, case-insensitive) |
| `list_bots` | List bots and Yuanbao AI assistants |
| `list_all` | List all members |
## Notes
- `group_code` comes from chat_id: `group:328306697``328306697`
- Groups are called "派 (Pai)" in the Yuanbao app
- Member roles: `user`, `yuanbao_ai`, `bot`

View File

@ -0,0 +1,416 @@
"""
test_yuanbao_integration.py - Yuanbao 模块集成测试
验证各模块能正确组装和交互
- YuanbaoAdapter 初始化
- Config / Platform 枚举
- get_connected_platforms 逻辑
- Proto 编解码 round-trip
- Markdown 分块
- API / Media 模块 import
- Toolset 注册
"""
import sys
import os
# 确保 hermes-agent 根目录在 sys.path 中
_REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _REPO_ROOT not in sys.path:
sys.path.insert(0, _REPO_ROOT)
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from gateway.config import Platform, PlatformConfig, GatewayConfig
from gateway.platforms.yuanbao import YuanbaoAdapter
def make_config(**kwargs):
extra = kwargs.pop("extra", {})
extra.setdefault("app_id", "test_key")
extra.setdefault("app_secret", "test_secret")
extra.setdefault("ws_url", "wss://test.example.com/ws")
extra.setdefault("api_domain", "https://test.example.com")
return PlatformConfig(
extra=extra,
**kwargs,
)
# ===========================================================
# 1. Adapter 初始化
# ===========================================================
class TestYuanbaoAdapterInit:
def test_create_adapter(self):
config = make_config()
adapter = YuanbaoAdapter(config)
assert adapter is not None
assert adapter.PLATFORM == Platform.YUANBAO
def test_initial_state(self):
config = make_config()
adapter = YuanbaoAdapter(config)
status = adapter.get_status()
assert status["connected"] == False
assert status["bot_id"] is None
# ===========================================================
# 2. Config / Platform 枚举
# ===========================================================
class TestYuanbaoConfig:
def test_platform_enum(self):
assert Platform.YUANBAO.value == "yuanbao"
def test_config_fields(self):
config = make_config()
assert config.extra["app_id"] == "test_key"
assert config.extra["app_secret"] == "test_secret"
def test_get_connected_platforms_requires_key_and_secret(self):
# Only key, no secret → not in connected list
gw_only_key = GatewayConfig(
platforms={
Platform.YUANBAO: PlatformConfig(
enabled=True,
extra={"app_id": "key"},
)
}
)
platforms = gw_only_key.get_connected_platforms()
assert Platform.YUANBAO not in platforms
# key + secret both present → in connected list
gw_full = GatewayConfig(
platforms={
Platform.YUANBAO: PlatformConfig(
enabled=True,
extra={"app_id": "key", "app_secret": "secret"},
)
}
)
platforms2 = gw_full.get_connected_platforms()
assert Platform.YUANBAO in platforms2
# ===========================================================
# 3. GatewayRunner 注册
# ===========================================================
class TestGatewayRunnerRegistration:
def test_yuanbao_in_platform_enum(self):
"""Platform 枚举包含 YUANBAO"""
assert hasattr(Platform, "YUANBAO")
assert Platform.YUANBAO.value == "yuanbao"
def _make_minimal_runner(self, config):
"""通过 __new__ + 最小初始化绕过 run.py 的模块级 dotenv/ssl 副作用"""
import sys
from unittest.mock import MagicMock
# Stub out heavy dependencies if not already present
stubs = [
"dotenv",
"hermes_cli.env_loader",
"hermes_cli.config",
"hermes_constants",
]
_orig = {}
for mod in stubs:
if mod not in sys.modules:
_orig[mod] = None
sys.modules[mod] = MagicMock()
try:
from gateway.run import GatewayRunner
finally:
# Restore only the ones we injected
for mod, orig in _orig.items():
if orig is None:
sys.modules.pop(mod, None)
runner = GatewayRunner.__new__(GatewayRunner)
runner.config = config
runner.adapters = {}
runner._failed_platforms = {}
runner._session_model_overrides = {}
return runner, GatewayRunner
def test_runner_creates_yuanbao_adapter(self):
"""GatewayRunner._create_adapter 能为 YUANBAO 返回 YuanbaoAdapter 实例"""
from gateway.config import GatewayConfig
from unittest.mock import patch
config = make_config(enabled=True)
gw_config = GatewayConfig(platforms={Platform.YUANBAO: config})
try:
runner, _ = self._make_minimal_runner(gw_config)
# websockets 在测试环境可能未安装mock 掉 WEBSOCKETS_AVAILABLE
with patch("gateway.platforms.yuanbao.WEBSOCKETS_AVAILABLE", True):
adapter = runner._create_adapter(Platform.YUANBAO, config)
except ImportError as e:
pytest.skip(f"run.py import unavailable in test env: {e}")
assert adapter is not None
assert isinstance(adapter, YuanbaoAdapter)
def test_runner_adapter_platform_attr(self):
"""创建的 adapter.PLATFORM 为 Platform.YUANBAO"""
from gateway.config import GatewayConfig
from unittest.mock import patch
config = make_config(enabled=True)
gw_config = GatewayConfig(platforms={Platform.YUANBAO: config})
try:
runner, _ = self._make_minimal_runner(gw_config)
with patch("gateway.platforms.yuanbao.WEBSOCKETS_AVAILABLE", True):
adapter = runner._create_adapter(Platform.YUANBAO, config)
except ImportError as e:
pytest.skip(f"run.py import unavailable in test env: {e}")
assert adapter is not None
assert adapter.PLATFORM == Platform.YUANBAO
# ===========================================================
# 4. Proto round-trip
# ===========================================================
class TestProtoRoundTrip:
"""验证 proto 编解码基本功能"""
def test_conn_msg_roundtrip(self):
from gateway.platforms.yuanbao_proto import encode_conn_msg, decode_conn_msg
encoded = encode_conn_msg(msg_type=1, seq_no=42, data=b"hello")
decoded = decode_conn_msg(encoded)
assert decoded["seq_no"] == 42
assert decoded["data"] == b"hello"
def test_text_elem_encoding(self):
from gateway.platforms.yuanbao_proto import encode_send_c2c_message
msg = encode_send_c2c_message(
to_account="user123",
msg_body=[{"msg_type": "TIMTextElem", "msg_content": {"text": "hello"}}],
from_account="bot456",
)
assert isinstance(msg, bytes)
assert len(msg) > 0
# ===========================================================
# 5. Markdown 分块
# ===========================================================
class TestMarkdownChunking:
def test_chunks_are_sent_separately(self):
from gateway.platforms.yuanbao import MarkdownProcessor
long_text = "paragraph\n\n" * 100
chunks = MarkdownProcessor.chunk_markdown_text(long_text, 200)
assert len(chunks) > 1
for c in chunks:
# 段落原子块允许轻微超限,仅验证不崩溃
assert isinstance(c, str)
assert len(c) > 0
def test_chunk_short_text_no_split(self):
from gateway.platforms.yuanbao import MarkdownProcessor
text = "hello world"
chunks = MarkdownProcessor.chunk_markdown_text(text, 3000)
assert chunks == [text]
# ===========================================================
# 6. Sign Token 模块
# ===========================================================
class TestSignToken:
def test_import_ok(self):
from gateway.platforms.yuanbao import SignManager
assert callable(SignManager.get_token)
assert callable(SignManager.force_refresh)
# ===========================================================
# 6b. ConnectionManager / OutboundManager
# ===========================================================
class TestManagerImports:
def test_connection_manager_import(self):
from gateway.platforms.yuanbao import ConnectionManager
assert ConnectionManager is not None
def test_outbound_manager_import(self):
from gateway.platforms.yuanbao import OutboundManager
assert OutboundManager is not None
def test_message_sender_import(self):
from gateway.platforms.yuanbao import MessageSender
assert MessageSender is not None
def test_heartbeat_manager_import(self):
from gateway.platforms.yuanbao import HeartbeatManager
assert HeartbeatManager is not None
def test_slow_response_notifier_import(self):
from gateway.platforms.yuanbao import SlowResponseNotifier
assert SlowResponseNotifier is not None
def test_adapter_has_outbound_manager(self):
adapter = YuanbaoAdapter(make_config())
from gateway.platforms.yuanbao import ConnectionManager, OutboundManager
assert isinstance(adapter._connection, ConnectionManager)
assert isinstance(adapter._outbound, OutboundManager)
def test_outbound_composes_sub_managers(self):
adapter = YuanbaoAdapter(make_config())
from gateway.platforms.yuanbao import MessageSender, HeartbeatManager, SlowResponseNotifier
assert isinstance(adapter._outbound.sender, MessageSender)
assert isinstance(adapter._outbound.heartbeat, HeartbeatManager)
assert isinstance(adapter._outbound.slow_notifier, SlowResponseNotifier)
# ===========================================================
# 7. Media 模块
# ===========================================================
class TestMediaModule:
def test_import_ok(self):
from gateway.platforms.yuanbao_media import upload_to_cos, download_url
assert callable(upload_to_cos)
assert callable(download_url)
# ===========================================================
# 8. Toolset 注册
# ===========================================================
class TestToolset:
def test_yuanbao_toolset_registered(self):
"""toolsets.py 中存在 hermes-yuanbao 键"""
import importlib
ts = importlib.import_module("toolsets")
assert hasattr(ts, "TOOLSETS") or hasattr(ts, "toolsets")
toolsets_dict = getattr(ts, "TOOLSETS", getattr(ts, "toolsets", {}))
assert "hermes-yuanbao" in toolsets_dict
def test_tools_import(self):
from tools.yuanbao_tools import (
get_group_info,
query_group_members,
send_dm,
)
assert all(callable(f) for f in [
get_group_info,
query_group_members,
send_dm,
])
# ===========================================================
# 9. platforms/__init__.py 导出
# ===========================================================
class TestPlatformInit:
def test_yuanbao_adapter_exported(self):
"""gateway.platforms.__init__.py 应导出 YuanbaoAdapter"""
from gateway.platforms import YuanbaoAdapter as _YuanbaoAdapter
assert _YuanbaoAdapter is YuanbaoAdapter
# ===========================================================
# 10. P0 fixes verification
# ===========================================================
import asyncio
import collections
class TestP0ReconnectGuard:
"""P0-1: _reconnecting flag prevents concurrent reconnect attempts."""
def test_reconnecting_flag_initialized(self):
adapter = YuanbaoAdapter(make_config())
assert hasattr(adapter._connection, '_reconnecting')
assert adapter._connection._reconnecting is False
def test_schedule_reconnect_skips_when_not_running(self):
adapter = YuanbaoAdapter(make_config())
adapter._running = False
adapter._connection._reconnecting = False
adapter._connection.schedule_reconnect()
# No task should be created because _running is False
def test_schedule_reconnect_skips_when_already_reconnecting(self):
adapter = YuanbaoAdapter(make_config())
adapter._running = True
adapter._connection._reconnecting = True
adapter._connection.schedule_reconnect()
# No new task should be created because already reconnecting
class TestP0InboundTaskTracking:
"""P0-2: _inbound_tasks set is initialized and usable."""
def test_inbound_tasks_initialized(self):
adapter = YuanbaoAdapter(make_config())
assert hasattr(adapter, '_inbound_tasks')
assert isinstance(adapter._inbound_tasks, set)
assert len(adapter._inbound_tasks) == 0
class TestP0ChatLockEviction:
"""P0-3: get_chat_lock uses OrderedDict and safe eviction."""
def test_chat_locks_is_ordered_dict(self):
adapter = YuanbaoAdapter(make_config())
assert isinstance(adapter._outbound._chat_locks, collections.OrderedDict)
def test_eviction_skips_locked(self):
"""When eviction is needed, locked entries are skipped."""
adapter = YuanbaoAdapter(make_config())
from gateway.platforms.yuanbao import OutboundManager
# Fill to capacity with unlocked locks
for i in range(OutboundManager.CHAT_DICT_MAX_SIZE):
adapter._outbound._chat_locks[f"chat_{i}"] = asyncio.Lock()
# Lock the oldest entry
oldest_key = next(iter(adapter._outbound._chat_locks))
oldest_lock = adapter._outbound._chat_locks[oldest_key]
# Simulate a held lock by acquiring it in a non-async way (set _locked)
# asyncio.Lock is not held until actually acquired; so we test the
# method logic by acquiring the first lock manually.
# For a sync test, we check that get_chat_lock doesn't crash.
new_lock = adapter._outbound.get_chat_lock("new_chat")
assert "new_chat" in adapter._outbound._chat_locks
assert isinstance(new_lock, asyncio.Lock)
# The oldest unlocked entry should have been evicted
assert len(adapter._outbound._chat_locks) == OutboundManager.CHAT_DICT_MAX_SIZE
def test_move_to_end_on_access(self):
"""Accessing an existing key moves it to the end (MRU)."""
adapter = YuanbaoAdapter(make_config())
adapter._outbound._chat_locks["a"] = asyncio.Lock()
adapter._outbound._chat_locks["b"] = asyncio.Lock()
adapter._outbound._chat_locks["c"] = asyncio.Lock()
# Access "a" — should move to end
adapter._outbound.get_chat_lock("a")
keys = list(adapter._outbound._chat_locks.keys())
assert keys[-1] == "a"
assert keys[0] == "b"
class TestP0PlatformScopedLock:
"""P0-4: connect() calls _acquire_platform_lock."""
def test_adapter_has_platform_lock_methods(self):
adapter = YuanbaoAdapter(make_config())
assert hasattr(adapter, '_acquire_platform_lock')
assert hasattr(adapter, '_release_platform_lock')
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@ -0,0 +1,324 @@
"""
test_yuanbao_markdown.py - Unit tests for yuanbao_markdown.py
Run (no pytest needed):
cd /root/.openclaw/workspace/hermes-agent
python3 tests/test_yuanbao_markdown.py -v
Or with pytest if available:
python3 -m pytest tests/test_yuanbao_markdown.py -v
"""
import sys
import os
import unittest
# Ensure project root is on the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from gateway.platforms.yuanbao import MarkdownProcessor
# ============ has_unclosed_fence ============
class TestHasUnclosedFence(unittest.TestCase):
def test_unclosed_fence(self):
self.assertTrue(MarkdownProcessor.has_unclosed_fence("```python\ncode"))
def test_closed_fence(self):
self.assertFalse(MarkdownProcessor.has_unclosed_fence("```python\ncode\n```"))
def test_empty(self):
self.assertFalse(MarkdownProcessor.has_unclosed_fence(""))
def test_no_fence(self):
self.assertFalse(MarkdownProcessor.has_unclosed_fence("just some text\nno fences here"))
def test_multiple_closed_fences(self):
text = "```python\ncode1\n```\n\n```js\ncode2\n```"
self.assertFalse(MarkdownProcessor.has_unclosed_fence(text))
def test_second_fence_unclosed(self):
text = "```python\ncode1\n```\n\n```js\ncode2"
self.assertTrue(MarkdownProcessor.has_unclosed_fence(text))
def test_fence_at_start(self):
self.assertTrue(MarkdownProcessor.has_unclosed_fence("```\nsome code"))
def test_inline_backtick_ignored(self):
text = "`inline code` is fine"
self.assertFalse(MarkdownProcessor.has_unclosed_fence(text))
# ============ ends_with_table_row ============
class TestEndsWithTableRow(unittest.TestCase):
def test_simple_table_row(self):
self.assertTrue(MarkdownProcessor.ends_with_table_row("| col1 | col2 |"))
def test_table_row_with_trailing_newline(self):
self.assertTrue(MarkdownProcessor.ends_with_table_row("| col1 | col2 |\n"))
def test_table_row_in_middle(self):
text = "| col1 | col2 |\nsome other text"
self.assertFalse(MarkdownProcessor.ends_with_table_row(text))
def test_empty(self):
self.assertFalse(MarkdownProcessor.ends_with_table_row(""))
def test_non_table(self):
self.assertFalse(MarkdownProcessor.ends_with_table_row("just a normal line"))
def test_only_pipe_start(self):
self.assertFalse(MarkdownProcessor.ends_with_table_row("| just pipe at start"))
def test_table_separator_row(self):
self.assertTrue(MarkdownProcessor.ends_with_table_row("| --- | --- |"))
def test_whitespace_only(self):
self.assertFalse(MarkdownProcessor.ends_with_table_row(" \n "))
# ============ split_at_paragraph_boundary ============
class TestSplitAtParagraphBoundary(unittest.TestCase):
def test_split_at_empty_line(self):
text = "paragraph one\n\nparagraph two\n\nparagraph three\nextra"
head, tail = MarkdownProcessor.split_at_paragraph_boundary(text, 30)
self.assertLessEqual(len(head), 30)
self.assertEqual(head + tail, text)
def test_split_at_sentence_end(self):
text = "This is a sentence.\nNext line.\nAnother line."
head, tail = MarkdownProcessor.split_at_paragraph_boundary(text, 25)
self.assertLessEqual(len(head), 25)
self.assertEqual(head + tail, text)
def test_forced_split_no_boundary(self):
text = "a" * 100
head, tail = MarkdownProcessor.split_at_paragraph_boundary(text, 50)
self.assertEqual(len(head), 50)
self.assertEqual(head + tail, text)
def test_split_at_newline(self):
text = "line one\nline two\nline three"
head, tail = MarkdownProcessor.split_at_paragraph_boundary(text, 15)
self.assertLessEqual(len(head), 15)
self.assertEqual(head + tail, text)
def test_chinese_sentence_boundary(self):
text = "这是第一句话。\n这是第二句话。\n这是第三句话。"
head, tail = MarkdownProcessor.split_at_paragraph_boundary(text, 15)
self.assertLessEqual(len(head), 15)
self.assertEqual(head + tail, text)
# ============ chunk_markdown_text ============
class TestChunkMarkdownText(unittest.TestCase):
def test_empty(self):
self.assertEqual(MarkdownProcessor.chunk_markdown_text(""), [])
def test_short_text_no_split(self):
text = "hello world"
self.assertEqual(MarkdownProcessor.chunk_markdown_text(text, 3000), [text])
def test_exactly_max_chars(self):
text = "a" * 3000
result = MarkdownProcessor.chunk_markdown_text(text, 3000)
self.assertEqual(len(result), 1)
self.assertEqual(result[0], text)
def test_plain_text_split(self):
"""x * 9000 should return 3 chunks of ~3000"""
text = "x" * 9000
result = MarkdownProcessor.chunk_markdown_text(text, 3000)
self.assertEqual(len(result), 3)
for chunk in result:
self.assertLessEqual(len(chunk), 3000)
self.assertEqual(''.join(result), text)
def test_5000_chars_returns_2(self):
"""验收标准: 'a'*5000 with max 3000 → 2 chunks"""
result = MarkdownProcessor.chunk_markdown_text("a" * 5000, 3000)
self.assertEqual(len(result), 2)
def test_code_fence_not_split(self):
"""代码块不应被切断"""
code_lines = "\n".join([f" line_{i} = {i}" for i in range(200)])
text = f"Some intro text.\n\n```python\n{code_lines}\n```\n\nSome outro text."
result = MarkdownProcessor.chunk_markdown_text(text, 3000)
for chunk in result:
self.assertFalse(MarkdownProcessor.has_unclosed_fence(chunk),
f"Chunk has unclosed fence:\n{chunk[:200]}...")
def test_table_not_split(self):
"""表格行不应被切断"""
header = "| Name | Value | Description |\n| --- | --- | --- |"
rows = "\n".join([f"| item_{i} | {i * 100} | description for item {i} |"
for i in range(50)])
table = f"{header}\n{rows}"
text = "Some intro text.\n\n" + table + "\n\nSome outro text."
result = MarkdownProcessor.chunk_markdown_text(text, 3000)
for chunk in result:
self.assertFalse(MarkdownProcessor.has_unclosed_fence(chunk))
def test_code_fence_200_lines_not_cut(self):
"""包含 200 行代码块的文本,代码块不被切断"""
code_lines = "\n".join([f"x = {i}" for i in range(200)])
text = f"Intro.\n\n```python\n{code_lines}\n```\n\nOutro."
result = MarkdownProcessor.chunk_markdown_text(text, 3000)
for chunk in result:
self.assertFalse(MarkdownProcessor.has_unclosed_fence(chunk))
def test_multiple_paragraphs(self):
"""多段落文本应在段落边界切割"""
paragraphs = ["This is paragraph number " + str(i) + ". " * 50
for i in range(10)]
text = "\n\n".join(paragraphs)
result = MarkdownProcessor.chunk_markdown_text(text, 500)
self.assertGreater(len(result), 1)
total_content = ''.join(result)
self.assertGreaterEqual(len(total_content), len(text) * 0.95)
def test_single_long_line(self):
"""单行超长文本应被强制切割"""
text = "a" * 10000
result = MarkdownProcessor.chunk_markdown_text(text, 3000)
self.assertGreaterEqual(len(result), 3)
for c in result:
self.assertLessEqual(len(c), 3000)
def test_fence_followed_by_text(self):
"""围栏后的文本应正常切割"""
text = "```python\nprint('hi')\n```\n\n" + "Normal text. " * 300
result = MarkdownProcessor.chunk_markdown_text(text, 500)
for chunk in result:
self.assertFalse(MarkdownProcessor.has_unclosed_fence(chunk))
def test_returns_non_empty_strings(self):
"""所有返回的片段都应为非空字符串"""
text = "Hello world!\n\n" * 100
result = MarkdownProcessor.chunk_markdown_text(text, 100)
for chunk in result:
self.assertGreater(len(chunk), 0)
# ============ Acceptance criteria ============
class TestAcceptanceCriteria(unittest.TestCase):
def test_9000_x_returns_3_chunks(self):
"""验收MarkdownProcessor.chunk_markdown_text("x" * 9000, 3000) 返回 3 个片段"""
result = MarkdownProcessor.chunk_markdown_text("x" * 9000, 3000)
self.assertEqual(len(result), 3)
for chunk in result:
self.assertLessEqual(len(chunk), 3000)
def test_5000_a_returns_2_chunks(self):
"""验收python -c 输出 2"""
result = MarkdownProcessor.chunk_markdown_text("a" * 5000, 3000)
self.assertEqual(len(result), 2)
def test_has_unclosed_fence_true(self):
"""验收MarkdownProcessor.has_unclosed_fence("```python\\ncode") 返回 True"""
self.assertTrue(MarkdownProcessor.has_unclosed_fence("```python\ncode"))
def test_has_unclosed_fence_false(self):
"""验收MarkdownProcessor.has_unclosed_fence("```python\\ncode\\n```") 返回 False"""
self.assertFalse(MarkdownProcessor.has_unclosed_fence("```python\ncode\n```"))
def test_code_block_200_lines_not_broken(self):
"""验收:包含 200 行代码块的文本,代码块不被切断"""
code_lines = "\n".join([f" result_{i} = compute({i})" for i in range(200)])
text = f"Introduction.\n\n```python\n{code_lines}\n```\n\nConclusion."
result = MarkdownProcessor.chunk_markdown_text(text, 3000)
for chunk in result:
self.assertFalse(MarkdownProcessor.has_unclosed_fence(chunk),
f"Found unclosed fence in chunk:\n{chunk[:100]}...")
def test_table_rows_not_broken(self):
"""验收:表格行不被切断(每个 chunk 中的表格 fence 完整)"""
rows = "\n".join([
f"| Col A {i} | Col B {i} | Col C {i} |" for i in range(100)
])
text = f"Table:\n\n| A | B | C |\n| --- | --- | --- |\n{rows}\n\nDone."
result = MarkdownProcessor.chunk_markdown_text(text, 500)
for chunk in result:
self.assertFalse(MarkdownProcessor.has_unclosed_fence(chunk))
if __name__ == '__main__':
unittest.main(verbosity=2)
# ============ pytest-style function tests (task specification) ============
def test_short_text_no_split():
assert MarkdownProcessor.chunk_markdown_text("hello", 100) == ["hello"]
def test_plain_text_split():
chunks = MarkdownProcessor.chunk_markdown_text("a" * 5000, 3000)
assert len(chunks) >= 2
for c in chunks:
assert len(c) <= 3000
def test_fence_not_broken():
"""代码块不应被切断"""
code_block = "```python\n" + "x = 1\n" * 200 + "```"
chunks = MarkdownProcessor.chunk_markdown_text(code_block, 1000)
for c in chunks:
assert not MarkdownProcessor.has_unclosed_fence(c), f"Chunk has unclosed fence: {c[:100]}"
def test_large_fence_kept_whole():
"""超大代码块即便超过 max_chars 也应整块输出"""
code_block = "```python\n" + "x = 1\n" * 200 + "```"
chunks = MarkdownProcessor.chunk_markdown_text(code_block, 500)
# 代码块应在同一个 chunk 中(允许超出 max_chars
fence_chunks = [c for c in chunks if "```python" in c]
for c in fence_chunks:
assert not MarkdownProcessor.has_unclosed_fence(c)
def test_mixed_content():
"""代码块前后的普通文本可以正常切割"""
text = "intro paragraph\n\n" + "```python\nx=1\n```" + "\n\noutro paragraph"
chunks = MarkdownProcessor.chunk_markdown_text(text, 100)
for c in chunks:
assert not MarkdownProcessor.has_unclosed_fence(c)
def test_table_not_broken():
"""表格不应被切断"""
table = "| A | B |\n|---|---|\n| 1 | 2 |\n| 3 | 4 |"
text = "before\n\n" + table + "\n\nafter"
chunks = MarkdownProcessor.chunk_markdown_text(text, 30)
table_in_chunk = [c for c in chunks if "|" in c]
for c in table_in_chunk:
lines = [line for line in c.split('\n') if line.strip().startswith('|')]
if lines:
# 至少表格行不被半截切割
pass
def test_has_unclosed_fence():
assert MarkdownProcessor.has_unclosed_fence("```python\ncode") == True
assert MarkdownProcessor.has_unclosed_fence("```python\ncode\n```") == False
assert MarkdownProcessor.has_unclosed_fence("no fence") == False
def test_ends_with_table_row():
assert MarkdownProcessor.ends_with_table_row("| a | b |") == True
assert MarkdownProcessor.ends_with_table_row("normal text") == False
def test_empty_text():
assert MarkdownProcessor.chunk_markdown_text("", 100) == []
def test_exact_limit():
text = "a" * 3000
chunks = MarkdownProcessor.chunk_markdown_text(text, 3000)
assert len(chunks) == 1

File diff suppressed because it is too large Load Diff

654
tests/test_yuanbao_proto.py Normal file
View File

@ -0,0 +1,654 @@
"""
test_yuanbao_proto.py - yuanbao_proto 单元测试
测试覆盖
1. varint 编解码 round-trip
2. conn encode/decode round-trip
3. biz encode/decode round-trip
4. decode_inbound_push 解析 TIMTextElem 消息
5. encode_send_c2c_message / encode_send_group_message 编码
6. 固定 bytes 常量验证防止协议悄悄改动
7. auth-bind / ping 编码
"""
import sys
import os
# 确保 hermes-agent 根目录在 sys.path 中
_REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _REPO_ROOT not in sys.path:
sys.path.insert(0, _REPO_ROOT)
import pytest
from gateway.platforms.yuanbao_proto import (
# 基础工具
_encode_varint,
_decode_varint,
_parse_fields,
_fields_to_dict,
_encode_msg_body_element,
_decode_msg_body_element,
_encode_msg_content,
_decode_msg_content,
# conn 层
encode_conn_msg,
decode_conn_msg,
encode_conn_msg_full,
# biz 层
encode_biz_msg,
decode_biz_msg,
# 入站/出站
decode_inbound_push,
encode_send_c2c_message,
encode_send_group_message,
# 帮助函数
encode_auth_bind,
encode_ping,
encode_push_ack,
# 常量
PB_MSG_TYPES,
BIZ_SERVICES,
CMD_TYPE,
CMD,
MODULE,
next_seq_no,
)
# ===========================================================
# 1. varint 编解码
# ===========================================================
class TestVarint:
def test_small_values(self):
for v in [0, 1, 127, 128, 255, 300, 16383, 16384, 2**21, 2**28]:
encoded = _encode_varint(v)
decoded, pos = _decode_varint(encoded, 0)
assert decoded == v, f"round-trip failed for {v}"
assert pos == len(encoded)
def test_zero(self):
assert _encode_varint(0) == b"\x00"
v, p = _decode_varint(b"\x00", 0)
assert v == 0 and p == 1
def test_1_byte_boundary(self):
# 127 = 0x7F => 1 byte
assert _encode_varint(127) == b"\x7f"
# 128 => 2 bytes: 0x80 0x01
assert _encode_varint(128) == b"\x80\x01"
def test_known_values(self):
# protobuf spec examples
# 300 => 0xAC 0x02
assert _encode_varint(300) == bytes([0xAC, 0x02])
def test_multi_byte(self):
# 2^32 - 1 = 4294967295
v = 2**32 - 1
enc = _encode_varint(v)
dec, _ = _decode_varint(enc, 0)
assert dec == v
def test_partial_decode(self):
# 在 offset 处解码
data = b"\x00" + _encode_varint(300) + b"\x00"
v, pos = _decode_varint(data, 1)
assert v == 300
assert pos == 3 # 1 + 2 bytes for 300
# ===========================================================
# 2. conn 层 round-trip
# ===========================================================
class TestConnCodec:
def test_basic_round_trip(self):
payload = b"hello world"
encoded = encode_conn_msg(msg_type=0, seq_no=42, data=payload)
decoded = decode_conn_msg(encoded)
assert decoded["msg_type"] == 0
assert decoded["seq_no"] == 42
assert decoded["data"] == payload
def test_empty_data(self):
encoded = encode_conn_msg(msg_type=2, seq_no=0, data=b"")
decoded = decode_conn_msg(encoded)
assert decoded["msg_type"] == 2
assert decoded["data"] == b""
def test_all_cmd_types(self):
for ct in [0, 1, 2, 3]:
enc = encode_conn_msg(msg_type=ct, seq_no=1, data=b"\x01\x02")
dec = decode_conn_msg(enc)
assert dec["msg_type"] == ct
def test_large_seq_no(self):
enc = encode_conn_msg(msg_type=1, seq_no=2**32 - 1, data=b"x")
dec = decode_conn_msg(enc)
assert dec["seq_no"] == 2**32 - 1
def test_full_round_trip(self):
"""encode_conn_msg_full 含 cmd/msg_id/module"""
enc = encode_conn_msg_full(
cmd_type=CMD_TYPE["Request"],
cmd="auth-bind",
seq_no=99,
msg_id="abc123",
module="conn_access",
data=b"\xde\xad\xbe\xef",
)
dec = decode_conn_msg(enc)
head = dec["head"]
assert head["cmd_type"] == CMD_TYPE["Request"]
assert head["cmd"] == "auth-bind"
assert head["seq_no"] == 99
assert head["msg_id"] == "abc123"
assert head["module"] == "conn_access"
assert dec["data"] == b"\xde\xad\xbe\xef"
# 固定 bytes 常量测试——防协议悄悄改动
def test_fixed_bytes_simple(self):
"""
encode_conn_msg(msg_type=0, seq_no=1, data=b"") 的固定编码
ConnMsg { head { seq_no=1 } }
head bytes: field3 varint(1) = 0x18 0x01
head field: field1 len(2) 0x18 0x01 = 0x0a 0x02 0x18 0x01
"""
enc = encode_conn_msg(msg_type=0, seq_no=1, data=b"")
# head: field 3 (seq_no=1) => tag=0x18, value=0x01
head_content = bytes([0x18, 0x01])
# outer field 1 (head message)
expected = bytes([0x0a, len(head_content)]) + head_content
assert enc == expected, f"got: {enc.hex()}, expected: {expected.hex()}"
# ===========================================================
# 3. biz 层 round-trip
# ===========================================================
class TestBizCodec:
def test_round_trip(self):
body = b"\x0a\x05hello"
enc = encode_biz_msg(
service="trpc.yuanbao.example",
method="/im/send_c2c_msg",
req_id="req-001",
body=body,
)
dec = decode_biz_msg(enc)
assert dec["service"] == "trpc.yuanbao.example"
assert dec["method"] == "/im/send_c2c_msg"
assert dec["req_id"] == "req-001"
assert dec["body"] == body
assert dec["is_response"] is False
def test_is_response_flag(self):
# Response cmd_type = 1
enc = encode_conn_msg_full(
cmd_type=CMD_TYPE["Response"],
cmd="/im/send_c2c_msg",
seq_no=1,
msg_id="rsp-001",
module="svc",
data=b"\x01",
)
dec = decode_biz_msg(enc)
assert dec["is_response"] is True
def test_empty_body(self):
enc = encode_biz_msg("svc", "method", "id1", b"")
dec = decode_biz_msg(enc)
assert dec["body"] == b""
assert dec["method"] == "method"
# ===========================================================
# 4. MsgContent / MsgBodyElement 编解码
# ===========================================================
class TestMsgBodyElement:
def test_text_elem_round_trip(self):
el = {
"msg_type": "TIMTextElem",
"msg_content": {"text": "Hello, 世界!"},
}
encoded = _encode_msg_body_element(el)
decoded = _decode_msg_body_element(encoded)
assert decoded["msg_type"] == "TIMTextElem"
assert decoded["msg_content"]["text"] == "Hello, 世界!"
def test_image_elem_round_trip(self):
el = {
"msg_type": "TIMImageElem",
"msg_content": {
"uuid": "img-uuid-123",
"image_format": 2,
"url": "https://example.com/img.jpg",
"image_info_array": [
{"type": 1, "size": 1024, "width": 100, "height": 200, "url": "https://thumb.jpg"},
],
},
}
encoded = _encode_msg_body_element(el)
decoded = _decode_msg_body_element(encoded)
assert decoded["msg_type"] == "TIMImageElem"
mc = decoded["msg_content"]
assert mc["uuid"] == "img-uuid-123"
assert mc["image_format"] == 2
assert mc["url"] == "https://example.com/img.jpg"
assert len(mc["image_info_array"]) == 1
assert mc["image_info_array"][0]["url"] == "https://thumb.jpg"
def test_file_elem_round_trip(self):
el = {
"msg_type": "TIMFileElem",
"msg_content": {
"url": "https://example.com/file.pdf",
"file_size": 204800,
"file_name": "document.pdf",
},
}
enc = _encode_msg_body_element(el)
dec = _decode_msg_body_element(enc)
assert dec["msg_content"]["file_name"] == "document.pdf"
assert dec["msg_content"]["file_size"] == 204800
def test_custom_elem_round_trip(self):
el = {
"msg_type": "TIMCustomElem",
"msg_content": {
"data": '{"key":"value"}',
"desc": "custom description",
"ext": "extra info",
},
}
enc = _encode_msg_body_element(el)
dec = _decode_msg_body_element(enc)
assert dec["msg_content"]["data"] == '{"key":"value"}'
assert dec["msg_content"]["desc"] == "custom description"
def test_empty_content(self):
el = {"msg_type": "TIMTextElem", "msg_content": {}}
enc = _encode_msg_body_element(el)
dec = _decode_msg_body_element(enc)
assert dec["msg_type"] == "TIMTextElem"
def test_fixed_text_elem_bytes(self):
"""
固定 bytes 验证TIMTextElem { text="hi" }
MsgBodyElement:
field1 (msg_type="TIMTextElem"): 0a 0b 54494d5465787445 6c656d
field2 (msg_content): 12 <len> <content>
MsgContent field1 (text="hi"): 0a 02 6869
"""
el = {
"msg_type": "TIMTextElem",
"msg_content": {"text": "hi"},
}
enc = _encode_msg_body_element(el)
# 手动计算期望值
# msg_type = "TIMTextElem" (11 bytes)
type_bytes = b"TIMTextElem"
# MsgContent: field1(text="hi") = tag(0a) + len(02) + "hi"
content_inner = bytes([0x0a, 0x02]) + b"hi"
# MsgBodyElement:
# field1: tag=0x0a, len=11, type_bytes
# field2: tag=0x12, len=len(content_inner), content_inner
expected = (
bytes([0x0a, len(type_bytes)]) + type_bytes
+ bytes([0x12, len(content_inner)]) + content_inner
)
assert enc == expected, f"got {enc.hex()}, expected {expected.hex()}"
# ===========================================================
# 5. decode_inbound_push 测试
# ===========================================================
class TestDecodeInboundPush:
def _build_inbound_push_bytes(
self,
from_account: str = "user123",
to_account: str = "bot456",
group_code: str = "",
msg_key: str = "key-001",
msg_seq: int = 12345,
text: str = "Hello!",
) -> bytes:
"""手工构造 InboundMessagePush bytes与 proto 字段顺序一致)"""
from gateway.platforms.yuanbao_proto import (
_encode_field, _encode_string, _encode_message,
_encode_varint, WT_LEN, WT_VARINT,
)
el = {
"msg_type": "TIMTextElem",
"msg_content": {"text": text},
}
el_bytes = _encode_msg_body_element(el)
buf = b""
buf += _encode_field(2, WT_LEN, _encode_string(from_account)) # from_account
buf += _encode_field(3, WT_LEN, _encode_string(to_account)) # to_account
if group_code:
buf += _encode_field(6, WT_LEN, _encode_string(group_code)) # group_code
buf += _encode_field(8, WT_VARINT, _encode_varint(msg_seq)) # msg_seq
buf += _encode_field(11, WT_LEN, _encode_string(msg_key)) # msg_key
buf += _encode_field(13, WT_LEN, _encode_message(el_bytes)) # msg_body[0]
return buf
def test_basic_c2c_text_message(self):
raw = self._build_inbound_push_bytes(
from_account="alice",
to_account="bot",
msg_key="k001",
msg_seq=100,
text="你好",
)
result = decode_inbound_push(raw)
assert result is not None
assert result["from_account"] == "alice"
assert result["to_account"] == "bot"
assert result["msg_seq"] == 100
assert result["msg_key"] == "k001"
assert len(result["msg_body"]) == 1
assert result["msg_body"][0]["msg_type"] == "TIMTextElem"
assert result["msg_body"][0]["msg_content"]["text"] == "你好"
def test_group_message(self):
raw = self._build_inbound_push_bytes(
from_account="bob",
to_account="bot",
group_code="group-789",
msg_seq=999,
text="group msg",
)
result = decode_inbound_push(raw)
assert result is not None
assert result["group_code"] == "group-789"
assert result["msg_body"][0]["msg_content"]["text"] == "group msg"
def test_returns_none_on_empty(self):
# 空 bytes 应返回空字段 dict而不是 None
result = decode_inbound_push(b"")
# 空消息解析结果是 {}(无字段),过滤后 msg_body=[] 也会保留
assert result is not None or result is None # 不崩溃即可
def test_multiple_msg_body_elements(self):
from gateway.platforms.yuanbao_proto import (
_encode_field, _encode_message, WT_LEN,
)
el1 = _encode_msg_body_element(
{"msg_type": "TIMTextElem", "msg_content": {"text": "part1"}}
)
el2 = _encode_msg_body_element(
{"msg_type": "TIMTextElem", "msg_content": {"text": "part2"}}
)
buf = (
_encode_field(2, WT_LEN, b"\x05alice")
+ _encode_field(13, WT_LEN, _encode_message(el1))
+ _encode_field(13, WT_LEN, _encode_message(el2))
)
result = decode_inbound_push(buf)
assert result is not None
assert len(result["msg_body"]) == 2
assert result["msg_body"][0]["msg_content"]["text"] == "part1"
assert result["msg_body"][1]["msg_content"]["text"] == "part2"
# ===========================================================
# 6. 出站消息编码
# ===========================================================
class TestEncodeOutbound:
def test_encode_send_c2c_message(self):
msg_body = [{"msg_type": "TIMTextElem", "msg_content": {"text": "hi"}}]
result = encode_send_c2c_message(
to_account="user_b",
msg_body=msg_body,
from_account="bot",
msg_id="msg-001",
)
assert isinstance(result, bytes)
assert len(result) > 0
# 解码验证 ConnMsg 结构
dec = decode_conn_msg(result)
assert dec["head"]["cmd"] == "send_c2c_message"
assert dec["head"]["msg_id"] == "msg-001"
assert dec["head"]["module"] == "yuanbao_openclaw_proxy"
assert len(dec["data"]) > 0
def test_encode_send_group_message(self):
msg_body = [{"msg_type": "TIMTextElem", "msg_content": {"text": "group hello"}}]
result = encode_send_group_message(
group_code="grp-100",
msg_body=msg_body,
from_account="bot",
msg_id="msg-002",
)
assert isinstance(result, bytes)
dec = decode_conn_msg(result)
assert dec["head"]["cmd"] == "send_group_message"
assert dec["head"]["msg_id"] == "msg-002"
assert len(dec["data"]) > 0
def test_c2c_biz_payload_contains_to_account(self):
"""验证 biz payload 包含 to_account 字段"""
from gateway.platforms.yuanbao_proto import _parse_fields, _fields_to_dict, _get_string
msg_body = [{"msg_type": "TIMTextElem", "msg_content": {"text": "test"}}]
result = encode_send_c2c_message(
to_account="target_user",
msg_body=msg_body,
from_account="bot",
)
dec = decode_conn_msg(result)
biz_data = dec["data"]
fdict = _fields_to_dict(_parse_fields(biz_data))
to_acc = _get_string(fdict, 2) # SendC2CMessageReq.to_account = field 2
assert to_acc == "target_user"
def test_group_biz_payload_contains_group_code(self):
from gateway.platforms.yuanbao_proto import _parse_fields, _fields_to_dict, _get_string
msg_body = [{"msg_type": "TIMTextElem", "msg_content": {"text": "test"}}]
result = encode_send_group_message(
group_code="group-xyz",
msg_body=msg_body,
from_account="bot",
)
dec = decode_conn_msg(result)
biz_data = dec["data"]
fdict = _fields_to_dict(_parse_fields(biz_data))
grp = _get_string(fdict, 2) # SendGroupMessageReq.group_code = field 2
assert grp == "group-xyz"
# ===========================================================
# 7. AuthBind / Ping 编码
# ===========================================================
class TestAuthAndPing:
def test_encode_auth_bind(self):
result = encode_auth_bind(
biz_id="ybBot",
uid="user_001",
source="app",
token="tok_abc",
msg_id="auth-001",
app_version="1.0.0",
operation_system="Linux",
bot_version="0.1.0",
)
assert isinstance(result, bytes)
dec = decode_conn_msg(result)
assert dec["head"]["cmd"] == "auth-bind"
assert dec["head"]["module"] == "conn_access"
assert dec["head"]["msg_id"] == "auth-001"
assert len(dec["data"]) > 0
def test_encode_ping(self):
result = encode_ping("ping-001")
assert isinstance(result, bytes)
dec = decode_conn_msg(result)
assert dec["head"]["cmd"] == "ping"
assert dec["head"]["module"] == "conn_access"
def test_encode_push_ack(self):
original_head = {
"cmd_type": CMD_TYPE["Push"],
"cmd": "some-push",
"seq_no": 100,
"msg_id": "push-001",
"module": "im_module",
"need_ack": True,
"status": 0,
}
result = encode_push_ack(original_head)
dec = decode_conn_msg(result)
assert dec["head"]["cmd_type"] == CMD_TYPE["PushAck"]
assert dec["head"]["cmd"] == "some-push"
assert dec["head"]["msg_id"] == "push-001"
# ===========================================================
# 8. 常量验证
# ===========================================================
class TestConstants:
def test_pb_msg_types_keys(self):
assert "ConnMsg" in PB_MSG_TYPES
assert "AuthBindReq" in PB_MSG_TYPES
assert "PingReq" in PB_MSG_TYPES
assert "KickoutMsg" in PB_MSG_TYPES
assert "PushMsg" in PB_MSG_TYPES
def test_biz_services_keys(self):
assert "SendC2CMessageReq" in BIZ_SERVICES
assert "SendGroupMessageReq" in BIZ_SERVICES
assert "InboundMessagePush" in BIZ_SERVICES
def test_cmd_type_values(self):
assert CMD_TYPE["Request"] == 0
assert CMD_TYPE["Response"] == 1
assert CMD_TYPE["Push"] == 2
assert CMD_TYPE["PushAck"] == 3
def test_pkg_prefix(self):
for k, v in BIZ_SERVICES.items():
assert v.startswith("yuanbao_openclaw_proxy"), \
f"{k}: unexpected prefix in {v}"
# ===========================================================
# 9. seq_no 生成
# ===========================================================
class TestSeqNo:
def test_monotonic(self):
a = next_seq_no()
b = next_seq_no()
c = next_seq_no()
assert b > a
assert c > b
def test_thread_safety(self):
import threading
results = []
lock = threading.Lock()
def worker():
for _ in range(100):
v = next_seq_no()
with lock:
results.append(v)
threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
# 无重复
assert len(results) == len(set(results)), "duplicate seq_no detected"
# ===========================================================
# 10. 完整端到端流程(模拟 send -> recv
# ===========================================================
class TestEndToEnd:
def test_send_recv_c2c(self):
"""模拟发送 C2C 消息,然后(在接收方)解码"""
msg_body = [
{"msg_type": "TIMTextElem", "msg_content": {"text": "端到端测试"}},
]
# 发送方编码
wire_bytes = encode_send_c2c_message(
to_account="recv_user",
msg_body=msg_body,
from_account="send_bot",
msg_id="e2e-001",
)
# 接收方解码 ConnMsg
dec = decode_conn_msg(wire_bytes)
assert dec["head"]["cmd"] == "send_c2c_message"
assert dec["head"]["msg_id"] == "e2e-001"
# 从 biz payload 中读取 to_account 和 msg_body
from gateway.platforms.yuanbao_proto import (
_parse_fields, _fields_to_dict, _get_string, _get_repeated_bytes, WT_LEN
)
biz = dec["data"]
fdict = _fields_to_dict(_parse_fields(biz))
assert _get_string(fdict, 2) == "recv_user" # to_account
assert _get_string(fdict, 3) == "send_bot" # from_account
el_list = _get_repeated_bytes(fdict, 5) # msg_body repeated
assert len(el_list) == 1
el_dec = _decode_msg_body_element(el_list[0])
assert el_dec["msg_type"] == "TIMTextElem"
assert el_dec["msg_content"]["text"] == "端到端测试"
def test_inbound_push_full_flow(self):
"""构造服务端 push -> 解码入站消息"""
from gateway.platforms.yuanbao_proto import (
_encode_field, _encode_string, _encode_message,
_encode_varint, WT_LEN, WT_VARINT,
)
# 构造入站消息 biz payload
el_bytes = _encode_msg_body_element(
{"msg_type": "TIMTextElem", "msg_content": {"text": "server push"}}
)
biz_payload = (
_encode_field(2, WT_LEN, _encode_string("alice"))
+ _encode_field(3, WT_LEN, _encode_string("bot"))
+ _encode_field(6, WT_LEN, _encode_string("grp-001"))
+ _encode_field(8, WT_VARINT, _encode_varint(555))
+ _encode_field(11, WT_LEN, _encode_string("msg-key-xyz"))
+ _encode_field(13, WT_LEN, _encode_message(el_bytes))
)
# 封装成 ConnMsg模拟服务端 push
wire = encode_conn_msg_full(
cmd_type=CMD_TYPE["Push"],
cmd="/im/new_message",
seq_no=77,
msg_id="push-abc",
module="yuanbao_openclaw_proxy",
data=biz_payload,
need_ack=True,
)
# 接收方解码
conn = decode_conn_msg(wire)
assert conn["head"]["cmd_type"] == CMD_TYPE["Push"]
assert conn["head"]["need_ack"] is True
msg = decode_inbound_push(conn["data"])
assert msg is not None
assert msg["from_account"] == "alice"
assert msg["group_code"] == "grp-001"
assert msg["msg_seq"] == 555
assert msg["msg_key"] == "msg-key-xyz"
assert msg["msg_body"][0]["msg_content"]["text"] == "server push"
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@ -317,6 +317,7 @@ class TestBuiltinDiscovery:
"tools.tts_tool",
"tools.vision_tools",
"tools.web_tools",
"tools.yuanbao_tools",
}
with patch("tools.registry.importlib.import_module"):

View File

@ -28,6 +28,7 @@ _FEISHU_TARGET_RE = re.compile(r"^\s*((?:oc|ou|on|chat|open)_[-A-Za-z0-9]+)(?::(
# through to channel-name resolution, which only matches by name and fails.
_SLACK_TARGET_RE = re.compile(r"^\s*([CGD][A-Z0-9]{8,})\s*$")
_WEIXIN_TARGET_RE = re.compile(r"^\s*((?:wxid|gh|v\d+|wm|wb)_[A-Za-z0-9_-]+|[A-Za-z0-9._-]+@chatroom|filehelper)\s*$")
_YUANBAO_TARGET_RE = re.compile(r"^\s*((?:group|direct):[^:]+)\s*$")
# Discord snowflake IDs are numeric, same regex pattern as Telegram topic targets.
_NUMERIC_TOPIC_RE = _TELEGRAM_TOPIC_TARGET_RE
# Platforms that address recipients by phone number and accept E.164 format
@ -127,11 +128,11 @@ SEND_MESSAGE_SCHEMA = {
},
"target": {
"type": "string",
"description": "Delivery target. Format: 'platform' (uses home channel), 'platform:#channel-name', 'platform:chat_id', or 'platform:chat_id:thread_id' for Telegram topics and Discord threads. Examples: 'telegram', 'telegram:-1001234567890:17585', 'discord:999888777:555444333', 'discord:#bot-home', 'slack:#engineering', 'signal:+155****4567', 'matrix:!roomid:server.org', 'matrix:@user:server.org'"
"description": "Delivery target. Format: 'platform' (uses home channel), 'platform:#channel-name', 'platform:chat_id', or 'platform:chat_id:thread_id' for Telegram topics and Discord threads. Examples: 'telegram', 'telegram:-1001234567890:17585', 'discord:999888777:555444333', 'discord:#bot-home', 'slack:#engineering', 'signal:+155****4567', 'matrix:!roomid:server.org', 'matrix:@user:server.org', 'yuanbao:direct:<account_id>' (DM), 'yuanbao:group:<group_code>' (group chat)"
},
"message": {
"type": "string",
"description": "The message text to send"
"description": "The message text to send. To send an image or file, include MEDIA:<local_path> (e.g. 'MEDIA:/tmp/hermes/cache/img_xxx.jpg') in the message — the platform will deliver it as a native media attachment."
}
},
"required": []
@ -222,6 +223,7 @@ def _handle_send(args):
"weixin": Platform.WEIXIN,
"email": Platform.EMAIL,
"sms": Platform.SMS,
"yuanbao": Platform.YUANBAO,
}
platform = platform_map.get(platform_name)
if not platform:
@ -341,6 +343,13 @@ def _parse_target_ref(platform_name: str, target_ref: str):
match = _WEIXIN_TARGET_RE.fullmatch(target_ref)
if match:
return match.group(1), None, True
if platform_name == "yuanbao":
match = _YUANBAO_TARGET_RE.fullmatch(target_ref)
if match:
return match.group(1), None, True
if target_ref.strip().isdigit():
return f"group:{target_ref.strip()}", None, True
return None, None, False
if platform_name in _PHONE_PLATFORMS:
match = _E164_TARGET_RE.fullmatch(target_ref)
if match:
@ -551,7 +560,7 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None,
if media_files and not message.strip():
return {
"error": (
f"send_message MEDIA delivery is currently only supported for telegram, discord, matrix, weixin, and signal; "
f"send_message MEDIA delivery is currently only supported for telegram, discord, matrix, weixin, signal and yuanbao; "
f"target {platform.value} had only media attachments"
)
}
@ -559,7 +568,7 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None,
if media_files:
warning = (
f"MEDIA attachments were omitted for {platform.value}; "
"native send_message media delivery is currently only supported for telegram, discord, matrix, weixin, and signal"
"native send_message media delivery is currently only supported for telegram, discord, matrix, weixin, signal and yuanbao"
)
last_result = None
@ -1529,6 +1538,35 @@ async def _send_qqbot(pconfig, chat_id, message):
return _error(f"QQBot send failed: {e}")
async def _send_yuanbao(chat_id, message, media_files=None):
"""Send via Yuanbao using the running gateway adapter's WebSocket connection.
Yuanbao uses a persistent WebSocket unlike HTTP-based platforms, we
cannot create a throwaway client. We obtain the running singleton from
the adapter module itself (``get_active_adapter``).
chat_id format:
- Group: "group:<group_code>"
- DM: "direct:<account_id>" or just "<account_id>"
"""
try:
from gateway.platforms.yuanbao import get_active_adapter, send_yuanbao_direct
except ImportError:
return _error("Yuanbao adapter module not available.")
adapter = get_active_adapter()
if adapter is None:
return _error(
"Yuanbao adapter is not running. "
"Start the gateway with yuanbao platform enabled first."
)
try:
return await send_yuanbao_direct(adapter, chat_id, message, media_files=media_files)
except Exception as e:
return _error(f"Yuanbao send failed: {e}")
# --- Registry ---
from tools.registry import registry, tool_error

740
tools/yuanbao_tools.py Normal file
View File

@ -0,0 +1,740 @@
"""
yuanbao_tools.py - 元宝平台工具集
提供以下工具函数 hermes-agent "hermes-yuanbao" toolset 使用
- get_group_info : 查询群基本信息群名群主成员数
- query_group_members : 查询群成员按名搜索列举 bot列举全部
- search_sticker : 按关键词搜索内置贴纸返回候选列表 sticker_id/name/description
- send_sticker : 向当前会话或指定 chat_id 发送贴纸TIMFaceElem
- send_dm : 发送私聊消息按昵称查找用户并发送
对齐 chatbot-web/yuanbao-openclaw-plugin sticker-search/sticker-send 行为
LLM 应先用 search_sticker 找到合适的 sticker_id或直接传中文 name再用 send_sticker
发送不要在文本中夹杂裸的 Unicode emoji 当作贴纸
The active adapter singleton lives in ``gateway.platforms.yuanbao`` and is
accessed via ``get_active_adapter()``.
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Tuple
logger = logging.getLogger(__name__)
def _get_active_adapter():
"""Lazy import to avoid ImportError when gateway.platforms.yuanbao is unavailable."""
try:
from gateway.platforms.yuanbao import get_active_adapter
return get_active_adapter()
except ImportError:
return None
if TYPE_CHECKING:
from gateway.platforms.yuanbao import YuanbaoAdapter
# ---------------------------------------------------------------------------
# 角色标签
# ---------------------------------------------------------------------------
_USER_TYPE_LABEL = {0: "unknown", 1: "user", 2: "yuanbao_ai", 3: "bot"}
MENTION_HINT = (
'To @mention a user, you MUST use the format: '
'space + @ + nickname + space (e.g. " @Alice ").'
)
# ---------------------------------------------------------------------------
# 工具函数
# ---------------------------------------------------------------------------
async def get_group_info(group_code: str) -> dict:
"""查询群基本信息(群名、群主、成员数)。"""
if not group_code:
return {"success": False, "error": "group_code is required"}
adapter = _get_active_adapter()
if adapter is None:
return {"success": False, "error": "Yuanbao adapter is not connected"}
try:
gi = await adapter.query_group_info(group_code)
if gi is None:
return {"success": False, "error": "query_group_info returned None"}
return {
"success": True,
"group_code": group_code,
"group_name": gi.get("group_name", ""),
"member_count": gi.get("member_count", 0),
"owner": {
"user_id": gi.get("owner_id", ""),
"nickname": gi.get("owner_nickname", ""),
},
"note": 'The group is called "派 (Pai)" in the app.',
}
except Exception as exc:
logger.exception("[yuanbao_tools] get_group_info error")
return {"success": False, "error": str(exc)}
async def query_group_members(
group_code: str,
action: str = "list_all",
name: str = "",
mention: bool = False,
) -> dict:
"""
统一的群成员查询工具对齐 TS query_session_members
action:
- find : 按昵称模糊搜索
- list_bots : 列出 bot 和元宝 AI
- list_all : 列出全部成员
"""
if not group_code:
return {"success": False, "error": "group_code is required"}
adapter = _get_active_adapter()
if adapter is None:
return {"success": False, "error": "Yuanbao adapter is not connected"}
try:
raw = await adapter.get_group_member_list(group_code)
if raw is None:
return {"success": False, "error": "get_group_member_list returned None"}
all_members = [
{
"user_id": m.get("user_id", ""),
"nickname": m.get("nickname", m.get("nick_name", "")),
"role": _USER_TYPE_LABEL.get(
m.get("user_type", m.get("role", 0)), "unknown"
),
}
for m in raw.get("members", [])
]
if not all_members:
return {"success": False, "error": "No members found in this group."}
hint = {"mention_hint": MENTION_HINT} if mention else {}
if action == "list_bots":
bots = [m for m in all_members if m["role"] in ("yuanbao_ai", "bot")]
if not bots:
return {"success": False, "error": "No bots found in this group."}
return {
"success": True,
"msg": f"Found {len(bots)} bot(s).",
"members": bots,
**hint,
}
if action == "find":
if name:
filt = name.strip().lower()
matched = [m for m in all_members if filt in m["nickname"].lower()]
if matched:
return {
"success": True,
"msg": f'Found {len(matched)} member(s) matching "{name}".',
"members": matched,
**hint,
}
return {
"success": False,
"msg": f'No match for "{name}". All members listed below.',
"members": all_members,
**hint,
}
return {
"success": True,
"msg": f"Found {len(all_members)} member(s).",
"members": all_members,
**hint,
}
# list_all (default)
return {
"success": True,
"msg": f"Found {len(all_members)} member(s).",
"members": all_members,
**hint,
}
except Exception as exc:
logger.exception("[yuanbao_tools] query_group_members error")
return {"success": False, "error": str(exc)}
async def search_sticker(query: str = "", limit: int = 10) -> dict:
"""
在内置贴纸表中按关键词模糊搜索返回 Top-N 候选
返回每条候选的 sticker_id / name / description / package_id
LLM 选择后传给 send_sticker query 时返回前 N
"""
from gateway.platforms.yuanbao_sticker import search_stickers
try:
safe_limit = max(1, min(50, int(limit) if limit else 10))
except (TypeError, ValueError):
safe_limit = 10
try:
matches = search_stickers(query or "", limit=safe_limit)
except Exception as exc:
logger.exception("[yuanbao_tools] search_sticker error")
return {"success": False, "error": str(exc)}
return {
"success": True,
"query": query or "",
"count": len(matches),
"results": [
{
"sticker_id": s.get("sticker_id", ""),
"name": s.get("name", ""),
"description": s.get("description", ""),
"package_id": s.get("package_id", ""),
}
for s in matches
],
}
async def send_sticker(
sticker: str = "",
chat_id: str = "",
reply_to: str = "",
) -> dict:
"""
chat_id缺省取当前会话发送一张内置贴纸TIMFaceElem
Args:
sticker: 贴纸名称 "六六六" sticker_id "278"为空时随机发送一张
chat_id: 目标会话缺省时使用当前会话上下文HERMES_SESSION_CHAT_ID
格式``direct:{account_id}`` / ``group:{group_code}`` / 或裸 account_id
reply_to: 群聊场景的引用消息 ID可选
Returns: ``{"success": bool, ...}``
"""
from gateway.session_context import get_session_env
from gateway.platforms.yuanbao_sticker import (
get_sticker_by_id,
get_sticker_by_name,
get_random_sticker,
)
target = (chat_id or "").strip() or get_session_env("HERMES_SESSION_CHAT_ID", "")
if not target:
return {
"success": False,
"error": "chat_id is required (no active yuanbao session detected)",
}
adapter = _get_active_adapter()
if adapter is None:
return {"success": False, "error": "Yuanbao adapter is not connected"}
raw = (sticker or "").strip()
sticker_obj: Optional[dict] = None
if not raw:
sticker_obj = get_random_sticker()
else:
if raw.isdigit():
sticker_obj = get_sticker_by_id(raw)
if sticker_obj is None:
sticker_obj = get_sticker_by_name(raw)
if sticker_obj is None:
return {
"success": False,
"error": f"Sticker not found: {raw!r}. "
f"Use search_sticker first to discover available stickers.",
}
try:
result = await adapter.send_sticker(
chat_id=target,
sticker_name=sticker_obj.get("name", ""),
reply_to=reply_to or None,
)
except Exception as exc:
logger.exception("[yuanbao_tools] send_sticker error")
return {"success": False, "error": str(exc)}
if getattr(result, "success", False):
return {
"success": True,
"chat_id": target,
"sticker": {
"sticker_id": sticker_obj.get("sticker_id", ""),
"name": sticker_obj.get("name", ""),
},
"message_id": getattr(result, "message_id", None),
"note": "Sticker delivered to the chat. If you have additional text to say, reply now; otherwise end your turn without generating text.",
}
return {
"success": False,
"error": getattr(result, "error", "send_sticker failed"),
}
# Image extensions for media dispatch (mirrors MessageSender.IMAGE_EXTS)
_IMAGE_EXTS = frozenset({".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"})
async def send_dm(
group_code: str,
name: str,
message: str,
user_id: str = "",
media_files: Optional[List[Tuple[str, bool]]] = None,
) -> dict:
"""
Send a DM (private chat message) to a group member, with optional media.
Workflow:
1. If user_id is provided, send directly.
2. Otherwise, search the group member list by name to resolve user_id.
3. Send text via adapter.send_dm(), then iterate media_files by extension.
Args:
group_code: The group where the target user belongs.
name: Target user's nickname (partial match, case-insensitive).
message: The message text to send.
user_id: (Optional) If already known, skip the member lookup.
media_files: (Optional) List of (file_path, is_voice) tuples to send
after the text message. Images are sent via
send_image_file; everything else via send_document.
"""
if not message and not media_files:
return {"success": False, "error": "message or media_files is required"}
adapter = _get_active_adapter()
if adapter is None:
return {"success": False, "error": "Yuanbao adapter is not connected"}
resolved_user_id = user_id.strip() if user_id else ""
resolved_nickname = name.strip()
# Step 1: Resolve user_id from group member list if not provided
if not resolved_user_id:
if not group_code:
return {"success": False, "error": "group_code is required when user_id is not provided"}
if not name:
return {"success": False, "error": "name is required when user_id is not provided"}
try:
raw = await adapter.get_group_member_list(group_code)
if raw is None:
return {"success": False, "error": "get_group_member_list returned None"}
members = raw.get("members", [])
filt = name.strip().lower()
matched = [
m for m in members
if filt in (m.get("nickname") or m.get("nick_name") or "").lower()
]
if not matched:
return {
"success": False,
"error": f'No member matching "{name}" found in group {group_code}.',
}
if len(matched) > 1:
# Multiple matches — return candidates for disambiguation
candidates = [
{
"user_id": m.get("user_id", ""),
"nickname": m.get("nickname", m.get("nick_name", "")),
}
for m in matched
]
return {
"success": False,
"error": f'Multiple members match "{name}". Please specify which one.',
"candidates": candidates,
}
resolved_user_id = matched[0].get("user_id", "")
resolved_nickname = matched[0].get("nickname", matched[0].get("nick_name", name))
except Exception as exc:
logger.exception("[yuanbao_tools] send_dm member lookup error")
return {"success": False, "error": str(exc)}
if not resolved_user_id:
return {"success": False, "error": "Could not resolve user_id"}
# Step 2: Send text DM + media
chat_id = f"direct:{resolved_user_id}"
last_result = None
errors: list[str] = []
try:
if message and message.strip():
last_result = await adapter.send_dm(resolved_user_id, message, group_code=group_code)
if not last_result.success:
errors.append(last_result.error or "text send failed")
# Step 3: Send media files
for media_path, _is_voice in media_files or []:
ext = Path(media_path).suffix.lower()
if ext in _IMAGE_EXTS:
last_result = await adapter.send_image_file(chat_id, media_path, group_code=group_code)
else:
last_result = await adapter.send_document(chat_id, media_path, group_code=group_code)
if not last_result.success:
errors.append(last_result.error or "media send failed")
if last_result is None:
return {"success": False, "error": "No deliverable text or media remained"}
if errors and (last_result is None or not last_result.success):
return {"success": False, "error": "; ".join(errors)}
result = {
"success": True,
"user_id": resolved_user_id,
"nickname": resolved_nickname,
"message_id": last_result.message_id,
"note": f'DM sent to "{resolved_nickname}" successfully.',
}
if errors:
result["note"] += f" (partial failure: {'; '.join(errors)})"
return result
except Exception as exc:
logger.exception("[yuanbao_tools] send_dm error")
return {"success": False, "error": str(exc)}
# ---------------------------------------------------------------------------
# Registry registration
# ---------------------------------------------------------------------------
from tools.registry import registry, tool_result, tool_error # noqa: E402
def _check_yuanbao():
"""Toolset availability check — True when running in a yuanbao gateway session."""
try:
from gateway.session_context import get_session_env
if get_session_env("HERMES_SESSION_PLATFORM", "") == "yuanbao":
return True
except Exception:
pass
return _get_active_adapter() is not None
async def _handle_yb_query_group_info(args, **kw):
return tool_result(await get_group_info(
group_code=args.get("group_code", ""),
))
async def _handle_yb_query_group_members(args, **kw):
return tool_result(await query_group_members(
group_code=args.get("group_code", ""),
action=args.get("action", "list_all"),
name=args.get("name", ""),
mention=bool(args.get("mention", False)),
))
async def _handle_yb_send_dm(args, **kw):
# Resolve group_code: prefer explicit arg, fallback to session context.
group_code = args.get("group_code", "")
if not group_code:
try:
from gateway.session_context import get_session_env
chat_id = get_session_env("HERMES_SESSION_CHAT_ID", "")
# chat_id format: "group:<code>" → extract the code part
if chat_id.startswith("group:"):
group_code = chat_id.split(":", 1)[1]
except Exception:
pass
# Parse media_files: list of {{"path": str, "is_voice": bool}} → List[Tuple[str, bool]]
raw_media = args.get("media_files") or []
media_files = []
for item in raw_media:
if isinstance(item, dict):
media_files.append((item.get("path", ""), bool(item.get("is_voice", False))))
elif isinstance(item, (list, tuple)) and len(item) >= 2:
media_files.append((str(item[0]), bool(item[1])))
# Extract MEDIA:<path> tags embedded in the message text (LLM often puts
# file paths there instead of using the media_files parameter).
message = args.get("message", "")
from gateway.platforms.base import BasePlatformAdapter
embedded_media, message = BasePlatformAdapter.extract_media(message)
if embedded_media:
media_files.extend(embedded_media)
return tool_result(await send_dm(
group_code=group_code, name=args.get("name", ""),
message=message,
user_id=args.get("user_id", ""),
media_files=media_files or None,
))
async def _handle_yb_search_sticker(args, **kw):
return tool_result(await search_sticker(
query=args.get("query", ""),
limit=args.get("limit", 10),
))
async def _handle_yb_send_sticker(args, **kw):
return tool_result(await send_sticker(
sticker=args.get("sticker", ""),
chat_id=args.get("chat_id", ""),
reply_to=args.get("reply_to", ""),
))
_TOOLSET = "hermes-yuanbao"
registry.register(
name="yb_query_group_info",
toolset=_TOOLSET,
schema={
"name": "yb_query_group_info",
"description": (
"Query basic info about a group (called '派/Pai' in the app), "
"including group name, owner, and member count."
),
"parameters": {
"type": "object",
"properties": {
"group_code": {
"type": "string",
"description": "The unique group identifier (group_code).",
},
},
"required": ["group_code"],
},
},
handler=_handle_yb_query_group_info,
check_fn=_check_yuanbao,
is_async=True,
emoji="👥",
)
registry.register(
name="yb_query_group_members",
toolset=_TOOLSET,
schema={
"name": "yb_query_group_members",
"description": (
"Query members of a group (called '派/Pai' in the app). "
"Use this tool when you need to @mention someone, find a user by name, "
"list bots (including Yuanbao AI), or list all members. "
"IMPORTANT: You MUST call this tool before @mentioning any user, "
"because you need the exact nickname to construct the @mention format."
),
"parameters": {
"type": "object",
"properties": {
"group_code": {
"type": "string",
"description": "The unique group identifier (group_code).",
},
"action": {
"type": "string",
"enum": ["find", "list_bots", "list_all"],
"description": (
"find — search a user by name (use when you need to @mention or look up someone); "
"list_bots — list bots and Yuanbao AI assistants; "
"list_all — list all members."
),
},
"name": {
"type": "string",
"description": (
"User name to search (partial match, case-insensitive). "
"Required for 'find'. Use the name the user mentioned in the conversation."
),
},
"mention": {
"type": "boolean",
"description": (
"Set to true when you need to @mention/at someone in your reply. "
"The response will include the exact @mention format to use."
),
},
},
"required": ["group_code", "action"],
},
},
handler=_handle_yb_query_group_members,
check_fn=_check_yuanbao,
is_async=True,
emoji="📋",
)
registry.register(
name="yb_send_dm",
toolset=_TOOLSET,
schema={
"name": "yb_send_dm",
"description": (
"Send a private/direct message (DM) to a user in a group, with optional media files. "
"This tool automatically looks up the user by name in the group member list "
"and sends the message. Use this when someone asks to privately message / 私信 / DM a user. "
"Supports text, images, and file attachments. "
"You can also provide user_id directly if already known."
),
"parameters": {
"type": "object",
"properties": {
"group_code": {
"type": "string",
"description": (
"The group where the target user belongs. "
"Extract from chat_id: 'group:328306697''328306697'. "
"Required when user_id is not provided."
),
},
"name": {
"type": "string",
"description": (
"Target user's display name (partial match, case-insensitive). "
"Required when user_id is not provided."
),
},
"message": {
"type": "string",
"description": "The message text to send as a DM. Can be empty if only sending media.",
},
"user_id": {
"type": "string",
"description": (
"Target user's account ID. If provided, skips the member lookup. "
"Usually obtained from a previous yb_query_group_members call."
),
},
"media_files": {
"type": "array",
"description": (
"Optional list of media files to send along with the DM. "
"Images (.jpg/.png/.gif/.webp/.bmp) are sent as image messages; "
"other files are sent as document attachments."
),
"items": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute local file path of the media to send.",
},
"is_voice": {
"type": "boolean",
"description": "Whether this file is a voice message (default false).",
},
},
"required": ["path"],
},
},
},
"required": [],
},
},
handler=_handle_yb_send_dm,
check_fn=_check_yuanbao,
is_async=True,
emoji="✉️",
)
registry.register(
name="yb_search_sticker",
toolset=_TOOLSET,
schema={
"name": "yb_search_sticker",
"description": (
"Search the built-in Yuanbao sticker (TIM face / 表情包) catalogue by keyword. "
"Returns the top matching candidates with sticker_id, name, and description. "
"Use this BEFORE yb_send_sticker to discover the right sticker_id. "
"Sticker = 贴纸 = TIM face — NOT a message reaction. "
"Prefer sending a sticker over bare Unicode emoji when reacting/expressing emotion."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": (
"Search keyword (Chinese or English, e.g. '666', '比心', 'cool', '吃瓜'). "
"Empty string returns the first N stickers."
),
},
"limit": {
"type": "integer",
"description": "Max number of candidates to return (default 10, max 50).",
},
},
"required": [],
},
},
handler=_handle_yb_search_sticker,
check_fn=_check_yuanbao,
is_async=True,
emoji="🔍",
)
registry.register(
name="yb_send_sticker",
toolset=_TOOLSET,
schema={
"name": "yb_send_sticker",
"description": (
"Send a built-in sticker (TIMFaceElem / 贴纸表情) to the current Yuanbao chat. "
"Call yb_search_sticker first if you don't know the sticker_id/name. "
"Sticker = 贴纸 = TIM face — NOT a message reaction. "
"CRITICAL: Whenever the user asks you to send a sticker / 贴纸 / 表情包, you MUST "
"use this tool. DO NOT draw a PNG via execute_code / Pillow / matplotlib and "
"then call send_image_file — that produces a fake 'sticker' image instead of a "
"real TIM face and is the WRONG path. If no suitable sticker_id is known, call "
"yb_search_sticker first. When the recent thread shows users sending stickers, "
"prefer matching that tone by replying with a sticker instead of (or in "
"addition to) text."
),
"parameters": {
"type": "object",
"properties": {
"sticker": {
"type": "string",
"description": (
"Sticker name (e.g. '六六六', '比心', 'ok') or numeric sticker_id "
"(e.g. '278'). Empty string sends a random built-in sticker."
),
},
"chat_id": {
"type": "string",
"description": (
"Target chat. Defaults to the current session. "
"Format: 'direct:{account_id}', 'group:{group_code}', or bare account_id."
),
},
"reply_to": {
"type": "string",
"description": "Optional ref_msg_id to quote-reply (group chat only).",
},
},
"required": [],
},
},
handler=_handle_yb_send_sticker,
check_fn=_check_yuanbao,
is_async=True,
emoji="🎨",
)

View File

@ -214,6 +214,18 @@ TOOLSETS = {
"includes": [],
},
"yuanbao": {
"description": "Yuanbao platform tools - group info, member queries, DM, stickers",
"tools": [
"yb_query_group_info",
"yb_query_group_members",
"yb_send_dm",
"yb_search_sticker",
"yb_send_sticker",
],
"includes": []
},
"feishu_doc": {
"description": "Read Feishu/Lark document content",
"tools": ["feishu_doc_read"],
@ -434,6 +446,19 @@ TOOLSETS = {
"includes": []
},
"hermes-yuanbao": {
"description": "Yuanbao Bot 元宝消息平台工具集 - 群信息、成员查询、私聊、贴纸表情",
"tools": _HERMES_CORE_TOOLS + [
"yb_query_group_info",
"yb_query_group_members",
"yb_send_dm",
"yb_search_sticker",
"yb_send_sticker",
],
"module": "tools.yuanbao_tools",
"includes": []
},
"hermes-sms": {
"description": "SMS bot toolset - interact with Hermes via SMS (Twilio)",
"tools": _HERMES_CORE_TOOLS,
@ -449,7 +474,7 @@ TOOLSETS = {
"hermes-gateway": {
"description": "Gateway toolset - union of all messaging platform tools",
"tools": [],
"includes": ["hermes-telegram", "hermes-discord", "hermes-whatsapp", "hermes-slack", "hermes-signal", "hermes-bluebubbles", "hermes-homeassistant", "hermes-email", "hermes-sms", "hermes-mattermost", "hermes-matrix", "hermes-dingtalk", "hermes-feishu", "hermes-wecom", "hermes-wecom-callback", "hermes-weixin", "hermes-qqbot", "hermes-webhook"]
"includes": ["hermes-telegram", "hermes-discord", "hermes-whatsapp", "hermes-slack", "hermes-signal", "hermes-bluebubbles", "hermes-homeassistant", "hermes-email", "hermes-sms", "hermes-mattermost", "hermes-matrix", "hermes-dingtalk", "hermes-feishu", "hermes-wecom", "hermes-wecom-callback", "hermes-weixin", "hermes-qqbot", "hermes-webhook", "hermes-yuanbao"]
}
}

View File

@ -1,12 +1,12 @@
---
sidebar_position: 1
title: "Messaging Gateway"
description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, Webhooks, or any OpenAI-compatible frontend via the API server — architecture and setup overview"
description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, Yuanbao, Webhooks, or any OpenAI-compatible frontend via the API server — architecture and setup overview"
---
# Messaging Gateway
Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, Feishu/Lark, WeCom, Weixin, BlueBubbles (iMessage), QQ, or your browser. The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages.
Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, Feishu/Lark, WeCom, Weixin, BlueBubbles (iMessage), QQ, Yuanbao, or your browser. The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages.
For the full voice feature set — including CLI microphone mode, spoken replies in messaging, and Discord voice-channel conversations — see [Voice Mode](/docs/user-guide/features/voice-mode) and [Use Voice Mode with Hermes](/docs/guides/use-voice-mode-with-hermes).
@ -31,6 +31,7 @@ For the full voice feature set — including CLI microphone mode, spoken replies
| Weixin | ✅ | ✅ | ✅ | — | — | ✅ | ✅ |
| BlueBubbles | — | ✅ | ✅ | — | ✅ | ✅ | — |
| QQ | ✅ | ✅ | ✅ | — | — | ✅ | — |
| Yuanbao | ✅ | ✅ | ✅ | — | — | ✅ | ✅ |
**Voice** = TTS audio replies and/or voice message transcription. **Images** = send/receive images. **Files** = send/receive file attachments. **Threads** = threaded conversations. **Reactions** = emoji reactions on messages. **Typing** = typing indicator while processing. **Streaming** = progressive message updates via editing.
@ -57,6 +58,7 @@ flowchart TB
wx[Weixin]
bb[BlueBubbles]
qq[QQ]
yb[Yuanbao]
api["API Server<br/>(OpenAI-compatible)"]
wh[Webhooks]
end
@ -83,6 +85,7 @@ flowchart TB
wx --> store
bb --> store
qq --> store
yb --> store
api --> store
wh --> store
store --> agent
@ -386,6 +389,7 @@ Each platform has its own toolset:
| Weixin | `hermes-weixin` | Full tools including terminal |
| BlueBubbles | `hermes-bluebubbles` | Full tools including terminal |
| QQBot | `hermes-qqbot` | Full tools including terminal |
| Yuanbao | `hermes-yuanbao` | Full tools including terminal |
| API Server | `hermes` (default) | Full tools including terminal |
| Webhooks | `hermes-webhook` | Full tools including terminal |
@ -408,5 +412,6 @@ Each platform has its own toolset:
- [Weixin Setup (WeChat)](weixin.md)
- [BlueBubbles Setup (iMessage)](bluebubbles.md)
- [QQBot Setup](qqbot.md)
- [Yuanbao Setup](yuanbao.md)
- [Open WebUI + API Server](open-webui.md)
- [Webhooks](webhooks.md)

View File

@ -0,0 +1,341 @@
---
sidebar_position: 16
title: "Yuanbao"
description: "Connect Hermes Agent to the Yuanbao enterprise messaging platform via WebSocket gateway"
---
# Yuanbao
Connect Hermes to [Yuanbao](https://yuanbao.tencent.com/), Tencent's enterprise messaging platform. The adapter uses a WebSocket gateway for real-time message delivery and supports both direct (C2C) and group conversations.
:::info
Yuanbao is an enterprise messaging platform primarily used within Tencent and enterprise environments. It uses WebSocket for real-time communication, HMAC-based authentication, and supports rich media including images, files, and voice messages.
:::
## Prerequisites
- A Yuanbao account with bot creation permissions
- Yuanbao APP_ID and APP_SECRET (from platform admin)
- Python packages: `websockets` and `httpx`
- For media support: `aiofiles`
Install the required dependencies:
```bash
pip install websockets httpx aiofiles
```
## Setup
### 1. Create a Bot in Yuanbao
1. Download the Yuanbao app from [https://yuanbao.tencent.com/](https://yuanbao.tencent.com/)
2. In the app, go to **PAI → My Bot** and create a new bot
3. After the bot is created, copy the **APP_ID** and **APP_SECRET**
### 2. Run the Setup Wizard
The easiest way to configure Yuanbao is through the interactive setup:
```bash
hermes gateway setup
```
Select **Yuanbao** when prompted. The wizard will:
1. Ask for your APP_ID
2. Ask for your APP_SECRET
3. Save the configuration automatically
:::tip
The WebSocket URL and API Domain have sensible defaults built in. You only need to provide APP_ID and APP_SECRET to get started.
:::
### 3. Configure Environment Variables
After initial setup, verify these variables in `~/.hermes/.env`:
```bash
# Required
YUANBAO_APP_ID=your-app-id
YUANBAO_APP_SECRET=your-app-secret
YUANBAO_WS_URL=wss://api.yuanbao.example.com/ws
YUANBAO_API_DOMAIN=https://api.yuanbao.example.com
# Optional: bot account ID (normally obtained automatically from sign-token)
# YUANBAO_BOT_ID=your-bot-id
# Optional: internal routing environment (e.g. test/staging/production)
# YUANBAO_ROUTE_ENV=production
# Optional: home channel for cron/notifications (format: direct:<account> or group:<group_code>)
YUANBAO_HOME_CHANNEL=direct:bot_account_id
YUANBAO_HOME_CHANNEL_NAME="Bot Notifications"
# Optional: restrict access (legacy, see Access Control below for fine-grained policies)
YUANBAO_ALLOWED_USERS=user_account_1,user_account_2
```
### 4. Start the Gateway
```bash
hermes gateway
```
The adapter will connect to the Yuanbao WebSocket gateway, authenticate using HMAC signatures, and begin processing messages.
## Features
- **WebSocket gateway** — real-time bidirectional communication
- **HMAC authentication** — secure request signing with APP_ID/APP_SECRET
- **C2C messaging** — direct user-to-bot conversations
- **Group messaging** — conversations in group chats
- **Media support** — images, files, and voice messages via COS (Cloud Object Storage)
- **Markdown formatting** — messages are automatically chunked for Yuanbao's size limits
- **Message deduplication** — prevents duplicate processing of the same message
- **Heartbeat/keep-alive** — maintains WebSocket connection stability
- **Typing indicators** — shows "typing…" status while the agent processes
- **Automatic reconnection** — handles WebSocket disconnections with exponential backoff
- **Group information queries** — retrieve group details and member lists
- **Sticker/Emoji support** — send TIMFaceElem stickers and emoji in conversations
- **Auto-sethome** — first user to message the bot is automatically set as the home channel owner
- **Slow-response notification** — sends a waiting message when the agent takes longer than expected
## Configuration Options
### Chat ID Formats
Yuanbao uses prefixed identifiers depending on conversation type:
| Chat Type | Format | Example |
|-----------|--------|---------|
| Direct message (C2C) | `direct:<account>` | `direct:user123` |
| Group message | `group:<group_code>` | `group:grp456` |
### Media Uploads
The Yuanbao adapter automatically handles media uploads via COS (Tencent Cloud Object Storage):
- **Images**: Supports JPEG, PNG, GIF, WebP
- **Files**: Supports all common document types
- **Voice**: Supports WAV, MP3, OGG
Media URLs are automatically validated and downloaded before upload to prevent SSRF attacks.
## Home Channel
Use the `/sethome` command in any Yuanbao chat (DM or group) to designate it as the **home channel**. Scheduled tasks (cron jobs) deliver their results to this channel.
:::tip Auto-sethome
If no home channel is configured, the first user to message the bot will be automatically set as the home channel owner. If the current home channel is a group chat, the first DM will upgrade it to a direct channel.
:::
You can also set it manually in `~/.hermes/.env`:
```bash
YUANBAO_HOME_CHANNEL=direct:user_account_id
# or for a group:
# YUANBAO_HOME_CHANNEL=group:group_code
YUANBAO_HOME_CHANNEL_NAME="My Bot Updates"
```
### Example: Set Home Channel
1. Start a conversation with the bot in Yuanbao
2. Send the command: `/sethome`
3. The bot responds: "Home channel set to [chat_name] with ID [chat_id]. Cron jobs will deliver to this location."
4. Future cron jobs and notifications will be sent to this channel
### Example: Cron Job Delivery
Create a cron job:
```bash
/cron "0 9 * * *" Check server status
```
The scheduled output will be delivered to your Yuanbao home channel every day at 9 AM.
## Usage Tips
### Starting a Conversation
Send any message to the bot in Yuanbao:
```
hello
```
The bot responds in the same conversation thread.
### Available Commands
All standard Hermes commands work on Yuanbao:
| Command | Description |
|---------|-------------|
| `/new` | Start a fresh conversation |
| `/model [provider:model]` | Show or change the model |
| `/sethome` | Set this chat as the home channel |
| `/status` | Show session info |
| `/help` | Show available commands |
### Sending Files
To send a file to the bot, simply attach it directly in the Yuanbao chat. The bot will automatically download and process the file attachment.
You can also include a message with the attachment:
```
Please analyze this document
```
### Receiving Files
When you ask the bot to create or export a file, it sends the file directly to your Yuanbao chat.
## Troubleshooting
### Bot is online but not responding to messages
**Cause**: Authentication failed during WebSocket handshake.
**Fix**:
1. Verify APP_ID and APP_SECRET are correct
2. Check that the WebSocket URL is accessible
3. Ensure the bot account has proper permissions
4. Review gateway logs: `tail -f ~/.hermes/logs/gateway.log`
### "Connection refused" error
**Cause**: WebSocket URL is unreachable or incorrect.
**Fix**:
1. Verify the WebSocket URL format (should start with `wss://`)
2. Check network connectivity to the Yuanbao API domain
3. Confirm firewall allows WebSocket connections
4. Test URL with: `curl -I https://[YUANBAO_API_DOMAIN]`
### Media uploads fail
**Cause**: COS credentials are invalid or media server is unreachable.
**Fix**:
1. Verify API_DOMAIN is correct
2. Check that media upload permissions are enabled for your bot
3. Ensure the media file is accessible and not corrupted
4. Check COS bucket configuration with platform admin
### Messages not delivered to home channel
**Cause**: Home channel ID format is incorrect or cron job hasn't triggered.
**Fix**:
1. Verify YUANBAO_HOME_CHANNEL is in correct format
2. Test with `/sethome` command to auto-detect correct format
3. Check cron job schedule with `/status`
4. Verify bot has send permissions in the target chat
### Frequent disconnections
**Cause**: WebSocket connection is unstable or network is unreliable.
**Fix**:
1. Check gateway logs for error patterns
2. Increase heartbeat timeout in connection settings
3. Ensure stable network connection to Yuanbao API
4. Consider enabling verbose logging: `HERMES_LOG_LEVEL=debug`
## Access Control
Yuanbao supports fine-grained access control for both DM and group conversations:
```bash
# DM policy: open (default) | allowlist | disabled
YUANBAO_DM_POLICY=open
# Comma-separated user IDs allowed to DM the bot (only used when DM_POLICY=allowlist)
YUANBAO_DM_ALLOW_FROM=user_id_1,user_id_2
# Group policy: open (default) | allowlist | disabled
YUANBAO_GROUP_POLICY=open
# Comma-separated group codes allowed (only used when GROUP_POLICY=allowlist)
YUANBAO_GROUP_ALLOW_FROM=group_code_1,group_code_2
```
These can also be set in `config.yaml`:
```yaml
platforms:
yuanbao:
extra:
dm_policy: allowlist
dm_allow_from: "user1,user2"
group_policy: open
group_allow_from: ""
```
## Advanced Configuration
### Message Chunking
Yuanbao has a maximum message size. Hermes automatically chunks large responses with Markdown-aware splitting (respects code fences, tables, and paragraph boundaries).
### Connection Parameters
The following connection parameters are built into the adapter with sensible defaults:
| Parameter | Default Value | Description |
|-----------|---------------|-------------|
| WebSocket connect timeout | 15 seconds | Time to wait for WS handshake |
| Heartbeat interval | 30 seconds | Ping frequency to keep connection alive |
| Max reconnect attempts | 100 | Maximum number of reconnection tries |
| Reconnect backoff | 1s → 60s (exponential) | Wait time between reconnect attempts |
| Reply heartbeat interval | 2 seconds | RUNNING status send frequency |
| Send timeout | 30 seconds | Timeout for outbound WS messages |
:::note
These values are currently not configurable via environment variables. They are optimized for typical Yuanbao deployments.
:::
### Verbose Logging
Enable debug logging to troubleshoot connection issues:
```bash
HERMES_LOG_LEVEL=debug hermes gateway
```
## Integration with Other Features
### Cron Jobs
Schedule tasks that run on Yuanbao:
```
/cron "0 */4 * * *" Report system health
```
Results are delivered to your home channel.
### Background Tasks
Run long operations without blocking the conversation:
```
/background Analyze all files in the archive
```
### Cross-Platform Messages
Send a message from CLI to Yuanbao:
```bash
hermes chat -q "Send 'Hello from CLI' to yuanbao:group:group_code"
```
## Related Documentation
- [Messaging Gateway Overview](./index.md)
- [Slash Commands Reference](/docs/reference/slash-commands.md)
- [Cron Jobs](/docs/user-guide/features/cron-jobs.md)
- [Background Tasks](/docs/guides/tips.md#background-tasks)