feat(computer-use): background focus-safe backend — set_value, structured windows, MIME detection
Extends the cua-driver computer-use backend to drive backgrounded macOS windows without stealing keyboard or mouse focus from the foreground app. All changes target the cua-driver MCP backend and the shared dispatcher.

## cua_backend.py

**Window-aware capture**: capture() now calls list_windows + get_window_state instead of the removed capture tool. Prefers structuredContent.windows (MCP 2024-11-05+ cua-driver) for zero-parse window enumeration; falls back to regex-parsed text for older builds. Stores the selected (pid, window_id) as sticky context so subsequent action calls do not need a redundant round-trip.

**Action routing**: click/scroll/type_text/key all carry the sticky pid (and window_id for element-indexed clicks). type_text routes through type_text_chars (individual key events) rather than an AX attribute write -- WebKit AXTextFields reject attribute writes from backgrounded processes.

**Key parsing**: _parse_key_combo splits cmd+s-style strings into (key, [modifiers]); key() then routes to hotkey (modifier present) or press_key (bare key), which are cua-driver's actual tool names.

**set_value method**: new set_value(value, element) calls the cua-driver set_value MCP tool. For AXPopUpButton / HTML select in a backgrounded Safari, AXPress opens the native macOS popup, which closes immediately when the app is non-frontmost; set_value AX-presses the matching child option directly (no menu required, no focus steal).

**focus_app**: reimplemented as a pure window-selector (enumerates list_windows, sets sticky pid/window_id) without ever raising the window or stealing focus.

**list_apps**: fixed tool name from listApps to list_apps; handles plain-text responses via regex when structured data is absent.

**Structured-content extraction**: _extract_tool_result now surfaces structuredContent from MCP results, enabling the list_windows window array without text parsing.

**Helpers**: _parse_windows_from_text, _parse_elements_from_tree, _split_tree_text, _parse_key_combo extracted as module-level functions.

## schema.py

Added set_value to the action enum with a description explaining when to prefer it over click (select/popup elements, sliders, no focus steal). Added value field for set_value payloads.

## tool.py

Routed the set_value action through _dispatch to backend.set_value. Added set_value to _DESTRUCTIVE_ACTIONS (approval-gated). Fixed MIME-type detection in _capture_response: cua-driver may return JPEG; detect from base64 magic bytes (/9j/ -> image/jpeg, else image/png) rather than hardcoding image/png.

## agent/display.py + run_agent.py

Guard _detect_tool_failure and result-preview logic against non-string function_result values: multimodal tool results (dicts with _multimodal=True) are not string-sliceable; treat them as successes and fall back to str() for length/preview.
This commit is contained in:
parent
dad10a78d0
commit
413ee1a286
@ -827,6 +827,10 @@ def _detect_tool_failure(tool_name: str, result: str | None) -> tuple[bool, str]
|
|||||||
return True, " [full]"
|
return True, " [full]"
|
||||||
|
|
||||||
# Generic heuristic for non-terminal tools
|
# Generic heuristic for non-terminal tools
|
||||||
|
# Multimodal tool results (dicts with _multimodal=True) are not strings —
|
||||||
|
# treat them as successes since failures would be JSON-encoded strings.
|
||||||
|
if not isinstance(result, str):
|
||||||
|
return False, ""
|
||||||
lower = result[:500].lower()
|
lower = result[:500].lower()
|
||||||
if '"error"' in lower or '"failed"' in lower or result.startswith("Error"):
|
if '"error"' in lower or '"failed"' in lower or result.startswith("Error"):
|
||||||
return True, " [error]"
|
return True, " [error]"
|
||||||
|
|||||||
17
run_agent.py
17
run_agent.py
@ -9479,9 +9479,15 @@ class AIAgent:
|
|||||||
logger.error("handle_function_call raised for %s: %s", function_name, tool_error, exc_info=True)
|
logger.error("handle_function_call raised for %s: %s", function_name, tool_error, exc_info=True)
|
||||||
tool_duration = time.time() - tool_start_time
|
tool_duration = time.time() - tool_start_time
|
||||||
|
|
||||||
result_preview = function_result if self.verbose_logging else (
|
if isinstance(function_result, str):
|
||||||
function_result[:200] if len(function_result) > 200 else function_result
|
result_preview = function_result if self.verbose_logging else (
|
||||||
)
|
function_result[:200] if len(function_result) > 200 else function_result
|
||||||
|
)
|
||||||
|
_result_len = len(function_result)
|
||||||
|
else:
|
||||||
|
# Multimodal dict result (_multimodal=True) — not sliceable as string
|
||||||
|
result_preview = function_result
|
||||||
|
_result_len = len(str(function_result))
|
||||||
|
|
||||||
# Log tool errors to the persistent error log so [error] tags
|
# Log tool errors to the persistent error log so [error] tags
|
||||||
# in the UI always have a corresponding detailed entry on disk.
|
# in the UI always have a corresponding detailed entry on disk.
|
||||||
@ -9489,7 +9495,7 @@ class AIAgent:
|
|||||||
if _is_error_result:
|
if _is_error_result:
|
||||||
logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview)
|
logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview)
|
||||||
else:
|
else:
|
||||||
logger.info("tool %s completed (%.2fs, %d chars)", function_name, tool_duration, len(function_result))
|
logger.info("tool %s completed (%.2fs, %d chars)", function_name, tool_duration, _result_len)
|
||||||
|
|
||||||
if self.tool_progress_callback:
|
if self.tool_progress_callback:
|
||||||
try:
|
try:
|
||||||
@ -9547,7 +9553,8 @@ class AIAgent:
|
|||||||
print(f" ✅ Tool {i} completed in {tool_duration:.2f}s")
|
print(f" ✅ Tool {i} completed in {tool_duration:.2f}s")
|
||||||
print(self._wrap_verbose("Result: ", function_result))
|
print(self._wrap_verbose("Result: ", function_result))
|
||||||
else:
|
else:
|
||||||
response_preview = function_result[:self.log_prefix_chars] + "..." if len(function_result) > self.log_prefix_chars else function_result
|
_fr_str = function_result if isinstance(function_result, str) else str(function_result)
|
||||||
|
response_preview = _fr_str[:self.log_prefix_chars] + "..." if len(_fr_str) > self.log_prefix_chars else _fr_str
|
||||||
print(f" ✅ Tool {i} completed in {tool_duration:.2f}s - {response_preview}")
|
print(f" ✅ Tool {i} completed in {tool_duration:.2f}s - {response_preview}")
|
||||||
|
|
||||||
if self._interrupt_requested and i < len(assistant_message.tool_calls):
|
if self._interrupt_requested and i < len(assistant_message.tool_calls):
|
||||||
|
|||||||
@ -23,6 +23,7 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
@ -44,16 +45,25 @@ logger = logging.getLogger(__name__)
|
|||||||
# Version pinning
|
# Version pinning
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
# The SkyLight SPIs cua-driver calls are private. We pin a known-good version
|
|
||||||
# so OS updates don't silently change the surface area our agent depends on.
|
|
||||||
# Users on newer macOS releases may need to bump this and re-run
|
|
||||||
# `hermes tools` to take the updated binary.
|
|
||||||
PINNED_CUA_DRIVER_VERSION = os.environ.get("HERMES_CUA_DRIVER_VERSION", "0.5.0")
|
PINNED_CUA_DRIVER_VERSION = os.environ.get("HERMES_CUA_DRIVER_VERSION", "0.5.0")
|
||||||
|
|
||||||
# Env var override for the cua-driver binary path (mostly for tests / CI).
|
|
||||||
_CUA_DRIVER_CMD = os.environ.get("HERMES_CUA_DRIVER_CMD", "cua-driver")
|
_CUA_DRIVER_CMD = os.environ.get("HERMES_CUA_DRIVER_CMD", "cua-driver")
|
||||||
_CUA_DRIVER_ARGS = ["mcp"] # stdio MCP transport
|
_CUA_DRIVER_ARGS = ["mcp"] # stdio MCP transport
|
||||||
|
|
||||||
|
# Regex to parse list_windows text output lines:
|
||||||
|
# "- AppName (pid 12345) "Title" [window_id: 67890]"
|
||||||
|
_WINDOW_LINE_RE = re.compile(
|
||||||
|
r'^-\s+(.+?)\s+\(pid\s+(\d+)\)\s+.*\[window_id:\s+(\d+)\]',
|
||||||
|
re.MULTILINE,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Regex to parse element lines from get_window_state AX tree markdown:
|
||||||
|
# " - [N] AXRole "label""
|
||||||
|
_ELEMENT_LINE_RE = re.compile(
|
||||||
|
r'^\s*-\s+\[(\d+)\]\s+(\w+)(?:\s+"([^"]*)")?',
|
||||||
|
re.MULTILINE,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Helpers
|
# Helpers
|
||||||
@ -81,6 +91,61 @@ def cua_driver_install_hint() -> str:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_windows_from_text(text: str) -> List[Dict[str, Any]]:
    """Build window records from the plain-text output of ``list_windows``.

    Each match of ``_WINDOW_LINE_RE`` in *text* produces one dict with the
    keys ``app_name`` (whitespace-trimmed), ``pid`` and ``window_id`` (both
    converted to ``int``) and ``off_screen``.  ``off_screen`` is true when the
    literal ``[off-screen]`` occurs inside the matched span.  Text without
    any matching line yields an empty list.
    """
    return [
        {
            "app_name": match.group(1).strip(),
            "pid": int(match.group(2)),
            "window_id": int(match.group(3)),
            # Only the matched span is inspected, not the whole line.
            "off_screen": "[off-screen]" in match.group(0),
        }
        for match in _WINDOW_LINE_RE.finditer(text)
    ]
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_elements_from_tree(markdown: str) -> List[UIElement]:
    """Turn the AX-tree markdown from ``get_window_state`` into UIElements.

    One ``UIElement`` is created per ``_ELEMENT_LINE_RE`` match: group 1 is
    the integer element index, group 2 the role, and the optional group 3 the
    quoted label (an empty string when the line carries no label).  The tree
    text parsed here provides no geometry, so ``bounds`` is always
    ``(0, 0, 0, 0)``.
    """
    return [
        UIElement(
            index=int(match.group(1)),
            role=match.group(2),
            label=match.group(3) or "",
            bounds=(0, 0, 0, 0),
        )
        for match in _ELEMENT_LINE_RE.finditer(markdown)
    ]
|
||||||
|
|
||||||
|
|
||||||
|
def _split_tree_text(full_text: str) -> Tuple[str, str]:
|
||||||
|
"""Split get_window_state text into (summary_line, tree_markdown)."""
|
||||||
|
lines = full_text.split("\n", 1)
|
||||||
|
summary = lines[0]
|
||||||
|
tree = lines[1] if len(lines) > 1 else ""
|
||||||
|
return summary, tree
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_key_combo(keys: str) -> Tuple[Optional[str], List[str]]:
|
||||||
|
"""Parse a key string like 'cmd+s' into (key, modifiers).
|
||||||
|
|
||||||
|
Returns (key, modifiers) where key is the non-modifier key and modifiers
|
||||||
|
is a list of modifier names (cmd, shift, option, ctrl).
|
||||||
|
"""
|
||||||
|
MODIFIER_NAMES = {"cmd", "command", "shift", "option", "alt", "ctrl", "control", "fn"}
|
||||||
|
KEY_ALIASES = {"command": "cmd", "alt": "option", "control": "ctrl"}
|
||||||
|
|
||||||
|
parts = [p.strip().lower() for p in re.split(r'[+\-]', keys) if p.strip()]
|
||||||
|
modifiers = []
|
||||||
|
key = None
|
||||||
|
for part in parts:
|
||||||
|
normalized = KEY_ALIASES.get(part, part)
|
||||||
|
if normalized in MODIFIER_NAMES:
|
||||||
|
modifiers.append(normalized)
|
||||||
|
else:
|
||||||
|
key = part # last non-modifier wins
|
||||||
|
return key, modifiers
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Asyncio bridge — one long-lived loop on a background thread
|
# Asyncio bridge — one long-lived loop on a background thread
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@ -139,8 +204,8 @@ class _CuaDriverSession:
|
|||||||
|
|
||||||
def __init__(self, bridge: _AsyncBridge) -> None:
|
def __init__(self, bridge: _AsyncBridge) -> None:
|
||||||
self._bridge = bridge
|
self._bridge = bridge
|
||||||
self._session = None # mcp.ClientSession
|
self._session = None
|
||||||
self._exit_stack = None # AsyncExitStack for stdio_client + ClientSession
|
self._exit_stack = None
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
self._started = False
|
self._started = False
|
||||||
|
|
||||||
@ -159,7 +224,7 @@ class _CuaDriverSession:
|
|||||||
params = StdioServerParameters(
|
params = StdioServerParameters(
|
||||||
command=_CUA_DRIVER_CMD,
|
command=_CUA_DRIVER_CMD,
|
||||||
args=_CUA_DRIVER_ARGS,
|
args=_CUA_DRIVER_ARGS,
|
||||||
env={**os.environ}, # cua-driver needs HOME / TMPDIR
|
env={**os.environ},
|
||||||
)
|
)
|
||||||
stack = AsyncExitStack()
|
stack = AsyncExitStack()
|
||||||
read, write = await stack.enter_async_context(stdio_client(params))
|
read, write = await stack.enter_async_context(stdio_client(params))
|
||||||
@ -172,7 +237,7 @@ class _CuaDriverSession:
|
|||||||
if self._exit_stack is not None:
|
if self._exit_stack is not None:
|
||||||
try:
|
try:
|
||||||
await self._exit_stack.aclose()
|
await self._exit_stack.aclose()
|
||||||
except Exception as e: # pragma: no cover
|
except Exception as e:
|
||||||
logger.warning("cua-driver shutdown error: %s", e)
|
logger.warning("cua-driver shutdown error: %s", e)
|
||||||
self._exit_stack = None
|
self._exit_stack = None
|
||||||
self._session = None
|
self._session = None
|
||||||
@ -194,10 +259,8 @@ class _CuaDriverSession:
|
|||||||
finally:
|
finally:
|
||||||
self._started = False
|
self._started = False
|
||||||
|
|
||||||
# ── Tool invocation ──────────────────────────────────────────────
|
|
||||||
async def _call_tool_async(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
|
async def _call_tool_async(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
result = await self._session.call_tool(name, args)
|
result = await self._session.call_tool(name, args)
|
||||||
# Normalize: mcp returns content parts. We want a dict.
|
|
||||||
return _extract_tool_result(result)
|
return _extract_tool_result(result)
|
||||||
|
|
||||||
def call_tool(self, name: str, args: Dict[str, Any], timeout: float = 30.0) -> Dict[str, Any]:
|
def call_tool(self, name: str, args: Dict[str, Any], timeout: float = 30.0) -> Dict[str, Any]:
|
||||||
@ -208,12 +271,22 @@ class _CuaDriverSession:
|
|||||||
def _extract_tool_result(mcp_result: Any) -> Dict[str, Any]:
|
def _extract_tool_result(mcp_result: Any) -> Dict[str, Any]:
|
||||||
"""Convert an mcp CallToolResult into a plain dict.
|
"""Convert an mcp CallToolResult into a plain dict.
|
||||||
|
|
||||||
cua-driver returns a mix of json-text parts and image parts. We flatten:
|
cua-driver returns a mix of text parts, image parts, and structuredContent.
|
||||||
{"data": <parsed json from text parts>, "images": [b64, ...], "isError": bool}
|
We flatten into:
|
||||||
|
{
|
||||||
|
"data": <text or parsed json>,
|
||||||
|
"images": [b64, ...],
|
||||||
|
"structuredContent": <dict|None>,
|
||||||
|
"isError": bool,
|
||||||
|
}
|
||||||
|
structuredContent is populated from the MCP result's structuredContent field
|
||||||
|
(MCP spec §2024-11-05+) and takes precedence for structured data like
|
||||||
|
list_windows window arrays.
|
||||||
"""
|
"""
|
||||||
data: Any = None
|
data: Any = None
|
||||||
images: List[str] = []
|
images: List[str] = []
|
||||||
is_error = bool(getattr(mcp_result, "isError", False))
|
is_error = bool(getattr(mcp_result, "isError", False))
|
||||||
|
structured: Optional[Dict] = getattr(mcp_result, "structuredContent", None) or None
|
||||||
text_chunks: List[str] = []
|
text_chunks: List[str] = []
|
||||||
for part in getattr(mcp_result, "content", []) or []:
|
for part in getattr(mcp_result, "content", []) or []:
|
||||||
ptype = getattr(part, "type", None)
|
ptype = getattr(part, "type", None)
|
||||||
@ -229,7 +302,7 @@ def _extract_tool_result(mcp_result: Any) -> Dict[str, Any]:
|
|||||||
data = json.loads(joined) if joined.strip().startswith(("{", "[")) else joined
|
data = json.loads(joined) if joined.strip().startswith(("{", "[")) else joined
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
data = joined
|
data = joined
|
||||||
return {"data": data, "images": images, "isError": is_error}
|
return {"data": data, "images": images, "structuredContent": structured, "isError": is_error}
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@ -242,6 +315,9 @@ class CuaDriverBackend(ComputerUseBackend):
|
|||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self._bridge = _AsyncBridge()
|
self._bridge = _AsyncBridge()
|
||||||
self._session = _CuaDriverSession(self._bridge)
|
self._session = _CuaDriverSession(self._bridge)
|
||||||
|
# Sticky context — updated by capture(), used by action tools.
|
||||||
|
self._active_pid: Optional[int] = None
|
||||||
|
self._active_window_id: Optional[int] = None
|
||||||
|
|
||||||
# ── Lifecycle ──────────────────────────────────────────────────
|
# ── Lifecycle ──────────────────────────────────────────────────
|
||||||
def start(self) -> None:
|
def start(self) -> None:
|
||||||
@ -260,20 +336,92 @@ class CuaDriverBackend(ComputerUseBackend):
|
|||||||
|
|
||||||
# ── Capture ────────────────────────────────────────────────────
|
# ── Capture ────────────────────────────────────────────────────
|
||||||
def capture(self, mode: str = "som", app: Optional[str] = None) -> CaptureResult:
|
def capture(self, mode: str = "som", app: Optional[str] = None) -> CaptureResult:
|
||||||
args: Dict[str, Any] = {"mode": mode}
|
"""Capture the frontmost on-screen window (optionally filtered by app name).
|
||||||
if app:
|
|
||||||
args["app"] = app
|
|
||||||
out = self._session.call_tool("capture", args)
|
|
||||||
data = out["data"] if isinstance(out["data"], dict) else {}
|
|
||||||
width = int(data.get("width", 0))
|
|
||||||
height = int(data.get("height", 0))
|
|
||||||
elements_raw = data.get("elements", []) or []
|
|
||||||
elements = [_parse_element(e) for e in elements_raw if isinstance(e, dict)]
|
|
||||||
|
|
||||||
|
Maps hermes `capture(mode, app)` → cua-driver `list_windows` +
|
||||||
|
`get_window_state` (ax/som) or `screenshot` (vision).
|
||||||
|
"""
|
||||||
|
# Step 1: enumerate on-screen windows to find target pid/window_id.
|
||||||
|
lw_out = self._session.call_tool("list_windows", {"on_screen_only": True})
|
||||||
|
|
||||||
|
# Prefer structuredContent.windows (MCP 2024-11-05+); fall back to
|
||||||
|
# text-line parsing for older cua-driver builds.
|
||||||
|
sc = lw_out.get("structuredContent") or {}
|
||||||
|
raw_windows = sc.get("windows") if sc else None
|
||||||
|
if raw_windows:
|
||||||
|
windows = [
|
||||||
|
{
|
||||||
|
"app_name": w.get("app_name", ""),
|
||||||
|
"pid": int(w["pid"]),
|
||||||
|
"window_id": int(w["window_id"]),
|
||||||
|
"off_screen": not w.get("is_on_screen", True),
|
||||||
|
"title": w.get("title", ""),
|
||||||
|
"z_index": w.get("z_index", 0),
|
||||||
|
}
|
||||||
|
for w in raw_windows
|
||||||
|
]
|
||||||
|
# Sort by z_index ascending so the frontmost window (lowest z_index) comes first.
|
||||||
|
windows.sort(key=lambda w: w["z_index"])
|
||||||
|
else:
|
||||||
|
raw_text = lw_out["data"] if isinstance(lw_out["data"], str) else ""
|
||||||
|
windows = _parse_windows_from_text(raw_text)
|
||||||
|
|
||||||
|
if not windows:
|
||||||
|
return CaptureResult(mode=mode, width=0, height=0, png_b64=None,
|
||||||
|
elements=[], app="", window_title="", png_bytes_len=0)
|
||||||
|
|
||||||
|
# Filter by app name (case-insensitive substring) if requested.
|
||||||
|
if app:
|
||||||
|
app_lower = app.lower()
|
||||||
|
filtered = [w for w in windows if app_lower in w["app_name"].lower()]
|
||||||
|
if filtered:
|
||||||
|
windows = filtered
|
||||||
|
|
||||||
|
# Pick first on-screen window (sorted by z_index / z-order above).
|
||||||
|
target = next((w for w in windows if not w["off_screen"]), windows[0])
|
||||||
|
self._active_pid = target["pid"]
|
||||||
|
self._active_window_id = target["window_id"]
|
||||||
|
app_name = target["app_name"]
|
||||||
|
|
||||||
|
# Step 2: capture.
|
||||||
png_b64: Optional[str] = None
|
png_b64: Optional[str] = None
|
||||||
|
elements: List[UIElement] = []
|
||||||
|
width = height = 0
|
||||||
|
window_title = ""
|
||||||
|
|
||||||
|
if mode == "vision":
|
||||||
|
# screenshot tool: image only (requested as JPEG), no AX walk.
|
||||||
|
sc_out = self._session.call_tool(
|
||||||
|
"screenshot",
|
||||||
|
{"window_id": self._active_window_id, "format": "jpeg", "quality": 85},
|
||||||
|
)
|
||||||
|
if sc_out["images"]:
|
||||||
|
png_b64 = sc_out["images"][0]
|
||||||
|
else:
|
||||||
|
# get_window_state: AX tree + optional screenshot.
|
||||||
|
gws_out = self._session.call_tool(
|
||||||
|
"get_window_state",
|
||||||
|
{"pid": self._active_pid, "window_id": self._active_window_id},
|
||||||
|
)
|
||||||
|
text = gws_out["data"] if isinstance(gws_out["data"], str) else ""
|
||||||
|
summary, tree = _split_tree_text(text)
|
||||||
|
|
||||||
|
# Parse element count from summary e.g. "✅ AppName — 42 elements, turn 3..."
|
||||||
|
m = re.search(r'(\d+)\s+elements?', summary)
|
||||||
|
if tree and not gws_out["images"]:
|
||||||
|
# ax mode — no screenshot
|
||||||
|
elements = _parse_elements_from_tree(tree)
|
||||||
|
elif gws_out["images"]:
|
||||||
|
png_b64 = gws_out["images"][0]
|
||||||
|
elements = _parse_elements_from_tree(tree)
|
||||||
|
|
||||||
|
# Extract window title from the AX tree first AXWindow line.
|
||||||
|
wt = re.search(r'AXWindow\s+"([^"]+)"', tree)
|
||||||
|
if wt:
|
||||||
|
window_title = wt.group(1)
|
||||||
|
|
||||||
png_bytes_len = 0
|
png_bytes_len = 0
|
||||||
if out["images"]:
|
if png_b64:
|
||||||
png_b64 = out["images"][0]
|
|
||||||
try:
|
try:
|
||||||
png_bytes_len = len(base64.b64decode(png_b64, validate=False))
|
png_bytes_len = len(base64.b64decode(png_b64, validate=False))
|
||||||
except Exception:
|
except Exception:
|
||||||
@ -285,8 +433,8 @@ class CuaDriverBackend(ComputerUseBackend):
|
|||||||
height=height,
|
height=height,
|
||||||
png_b64=png_b64,
|
png_b64=png_b64,
|
||||||
elements=elements,
|
elements=elements,
|
||||||
app=str(data.get("app", "") or ""),
|
app=app_name,
|
||||||
window_title=str(data.get("window_title", "") or ""),
|
window_title=window_title,
|
||||||
png_bytes_len=png_bytes_len,
|
png_bytes_len=png_bytes_len,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -301,18 +449,36 @@ class CuaDriverBackend(ComputerUseBackend):
|
|||||||
click_count: int = 1,
|
click_count: int = 1,
|
||||||
modifiers: Optional[List[str]] = None,
|
modifiers: Optional[List[str]] = None,
|
||||||
) -> ActionResult:
|
) -> ActionResult:
|
||||||
args: Dict[str, Any] = {"button": button, "clickCount": click_count}
|
pid = self._active_pid
|
||||||
if element is not None:
|
if pid is None:
|
||||||
args["element"] = int(element)
|
|
||||||
elif x is not None and y is not None:
|
|
||||||
args["x"] = int(x)
|
|
||||||
args["y"] = int(y)
|
|
||||||
else:
|
|
||||||
return ActionResult(ok=False, action="click",
|
return ActionResult(ok=False, action="click",
|
||||||
message="click requires element= or x/y")
|
message="No active window — call capture() first.")
|
||||||
|
|
||||||
|
# Choose tool based on button and click_count.
|
||||||
|
if button == "right":
|
||||||
|
tool = "right_click"
|
||||||
|
elif click_count == 2:
|
||||||
|
tool = "double_click"
|
||||||
|
else:
|
||||||
|
tool = "click"
|
||||||
|
|
||||||
|
args: Dict[str, Any] = {"pid": pid}
|
||||||
|
if element is not None:
|
||||||
|
if self._active_window_id is None:
|
||||||
|
return ActionResult(ok=False, action=tool,
|
||||||
|
message="No active window_id for element_index click.")
|
||||||
|
args["element_index"] = element
|
||||||
|
args["window_id"] = self._active_window_id
|
||||||
|
elif x is not None and y is not None:
|
||||||
|
args["x"] = x
|
||||||
|
args["y"] = y
|
||||||
|
else:
|
||||||
|
return ActionResult(ok=False, action=tool,
|
||||||
|
message="click requires element= or x/y.")
|
||||||
if modifiers:
|
if modifiers:
|
||||||
args["modifiers"] = modifiers
|
args["modifier"] = modifiers
|
||||||
return self._action("click", args)
|
|
||||||
|
return self._action(tool, args)
|
||||||
|
|
||||||
def drag(
|
def drag(
|
||||||
self,
|
self,
|
||||||
@ -324,22 +490,9 @@ class CuaDriverBackend(ComputerUseBackend):
|
|||||||
button: str = "left",
|
button: str = "left",
|
||||||
modifiers: Optional[List[str]] = None,
|
modifiers: Optional[List[str]] = None,
|
||||||
) -> ActionResult:
|
) -> ActionResult:
|
||||||
args: Dict[str, Any] = {"button": button}
|
# cua-driver does not expose a drag tool.
|
||||||
if from_element is not None:
|
return ActionResult(ok=False, action="drag",
|
||||||
args["fromElement"] = int(from_element)
|
message="drag is not supported by the cua-driver backend.")
|
||||||
elif from_xy is not None:
|
|
||||||
args["fromX"], args["fromY"] = int(from_xy[0]), int(from_xy[1])
|
|
||||||
else:
|
|
||||||
return ActionResult(ok=False, action="drag", message="drag requires a source")
|
|
||||||
if to_element is not None:
|
|
||||||
args["toElement"] = int(to_element)
|
|
||||||
elif to_xy is not None:
|
|
||||||
args["toX"], args["toY"] = int(to_xy[0]), int(to_xy[1])
|
|
||||||
else:
|
|
||||||
return ActionResult(ok=False, action="drag", message="drag requires a destination")
|
|
||||||
if modifiers:
|
|
||||||
args["modifiers"] = modifiers
|
|
||||||
return self._action("drag", args)
|
|
||||||
|
|
||||||
def scroll(
|
def scroll(
|
||||||
self,
|
self,
|
||||||
@ -351,33 +504,132 @@ class CuaDriverBackend(ComputerUseBackend):
|
|||||||
y: Optional[int] = None,
|
y: Optional[int] = None,
|
||||||
modifiers: Optional[List[str]] = None,
|
modifiers: Optional[List[str]] = None,
|
||||||
) -> ActionResult:
|
) -> ActionResult:
|
||||||
args: Dict[str, Any] = {"direction": direction, "amount": int(amount)}
|
pid = self._active_pid
|
||||||
if element is not None:
|
if pid is None:
|
||||||
args["element"] = int(element)
|
return ActionResult(ok=False, action="scroll",
|
||||||
|
message="No active window — call capture() first.")
|
||||||
|
args: Dict[str, Any] = {
|
||||||
|
"pid": pid,
|
||||||
|
"direction": direction,
|
||||||
|
"amount": max(1, min(50, amount)),
|
||||||
|
}
|
||||||
|
if element is not None and self._active_window_id is not None:
|
||||||
|
args["element_index"] = element
|
||||||
|
args["window_id"] = self._active_window_id
|
||||||
elif x is not None and y is not None:
|
elif x is not None and y is not None:
|
||||||
args["x"] = int(x)
|
args["x"] = x
|
||||||
args["y"] = int(y)
|
args["y"] = y
|
||||||
if modifiers:
|
|
||||||
args["modifiers"] = modifiers
|
|
||||||
return self._action("scroll", args)
|
return self._action("scroll", args)
|
||||||
|
|
||||||
# ── Keyboard ───────────────────────────────────────────────────
|
# ── Keyboard ───────────────────────────────────────────────────
|
||||||
def type_text(self, text: str) -> ActionResult:
|
def type_text(self, text: str) -> ActionResult:
|
||||||
return self._action("type", {"text": text})
|
pid = self._active_pid
|
||||||
|
if pid is None:
|
||||||
|
return ActionResult(ok=False, action="type_text",
|
||||||
|
message="No active window — call capture() first.")
|
||||||
|
# Safari WebKit AXTextField does not accept AX attribute writes (type_text),
|
||||||
|
# so use type_text_chars which synthesises individual key events instead.
|
||||||
|
# This works universally across all macOS apps in background mode.
|
||||||
|
return self._action("type_text_chars", {"pid": pid, "text": text})
|
||||||
|
|
||||||
def key(self, keys: str) -> ActionResult:
|
def key(self, keys: str) -> ActionResult:
|
||||||
return self._action("key", {"keys": keys})
|
pid = self._active_pid
|
||||||
|
if pid is None:
|
||||||
|
return ActionResult(ok=False, action="key",
|
||||||
|
message="No active window — call capture() first.")
|
||||||
|
|
||||||
|
key_name, modifiers = _parse_key_combo(keys)
|
||||||
|
if not key_name:
|
||||||
|
return ActionResult(ok=False, action="key",
|
||||||
|
message=f"Could not parse key from '{keys}'.")
|
||||||
|
|
||||||
|
if modifiers:
|
||||||
|
# hotkey requires at least one modifier + one key.
|
||||||
|
return self._action("hotkey", {"pid": pid, "keys": modifiers + [key_name]})
|
||||||
|
else:
|
||||||
|
return self._action("press_key", {"pid": pid, "key": key_name})
|
||||||
|
|
||||||
|
# ── Value setter ────────────────────────────────────────────────
|
||||||
|
def set_value(self, value: str, element: Optional[int] = None) -> ActionResult:
    """Forward a ``set_value`` request for one element of the active window.

    The sticky ``_active_pid`` / ``_active_window_id`` identify the target
    window.  If either is unset, or if *element* is ``None``, a failed
    ``ActionResult`` with an explanatory message is returned and nothing is
    sent.  Otherwise the call is delegated to ``self._action("set_value", ...)``
    with ``pid``, ``window_id``, ``element_index`` and ``value``.
    """
    active_pid, active_window = self._active_pid, self._active_window_id

    # Guard 1: both halves of the sticky window context are required.
    if active_pid is None or active_window is None:
        return ActionResult(ok=False, action="set_value",
                            message="No active window — call capture() first.")

    # Guard 2: the target is addressed by element index only.
    if element is None:
        return ActionResult(ok=False, action="set_value",
                            message="set_value requires element= (element index).")

    return self._action("set_value", {
        "pid": active_pid,
        "window_id": active_window,
        "element_index": element,
        "value": value,
    })
|
||||||
|
|
||||||
# ── Introspection ──────────────────────────────────────────────
|
# ── Introspection ──────────────────────────────────────────────
|
||||||
def list_apps(self) -> List[Dict[str, Any]]:
|
def list_apps(self) -> List[Dict[str, Any]]:
|
||||||
out = self._session.call_tool("listApps", {})
|
out = self._session.call_tool("list_apps", {})
|
||||||
data = out["data"] if isinstance(out["data"], (list, dict)) else []
|
data = out["data"]
|
||||||
|
if isinstance(data, list):
|
||||||
|
return data
|
||||||
if isinstance(data, dict):
|
if isinstance(data, dict):
|
||||||
data = data.get("apps", [])
|
return data.get("apps", [])
|
||||||
return list(data or [])
|
# list_apps returns plain text — parse app lines.
|
||||||
|
if isinstance(data, str):
|
||||||
|
apps = []
|
||||||
|
for line in data.splitlines():
|
||||||
|
m = re.search(r'(.+?)\s+\(pid\s+(\d+)\)', line)
|
||||||
|
if m:
|
||||||
|
apps.append({"name": m.group(1).strip(), "pid": int(m.group(2))})
|
||||||
|
return apps
|
||||||
|
return []
|
||||||
|
|
||||||
def focus_app(self, app: str, raise_window: bool = False) -> ActionResult:
|
def focus_app(self, app: str, raise_window: bool = False) -> ActionResult:
|
||||||
return self._action("focusApp", {"app": app, "raise": bool(raise_window)})
|
"""Target an app for subsequent actions without stealing system focus.
|
||||||
|
|
||||||
|
cua-driver background-automation never needs to bring a window to the
|
||||||
|
front: capture(app=...) already selects the right window via
|
||||||
|
list_windows. We implement focus_app as a pure window-selector —
|
||||||
|
enumerate on-screen windows, find the best match for *app*, and store
|
||||||
|
its pid/window_id so that subsequent click/type calls hit the right
|
||||||
|
process.
|
||||||
|
|
||||||
|
raise_window=True is intentionally ignored: stealing the user's focus
|
||||||
|
is exactly what this backend is designed to avoid.
|
||||||
|
"""
|
||||||
|
lw_out = self._session.call_tool("list_windows", {"on_screen_only": True})
|
||||||
|
sc = lw_out.get("structuredContent") or {}
|
||||||
|
raw_windows = sc.get("windows") if sc else None
|
||||||
|
if raw_windows:
|
||||||
|
windows = [
|
||||||
|
{
|
||||||
|
"app_name": w.get("app_name", ""),
|
||||||
|
"pid": int(w["pid"]),
|
||||||
|
"window_id": int(w["window_id"]),
|
||||||
|
"z_index": w.get("z_index", 0),
|
||||||
|
}
|
||||||
|
for w in raw_windows
|
||||||
|
]
|
||||||
|
windows.sort(key=lambda w: w["z_index"])
|
||||||
|
else:
|
||||||
|
raw_text = lw_out["data"] if isinstance(lw_out["data"], str) else ""
|
||||||
|
windows = _parse_windows_from_text(raw_text)
|
||||||
|
|
||||||
|
app_lower = app.lower()
|
||||||
|
matched = [w for w in windows if app_lower in w["app_name"].lower()]
|
||||||
|
target = matched[0] if matched else (windows[0] if windows else None)
|
||||||
|
if target:
|
||||||
|
self._active_pid = target["pid"]
|
||||||
|
self._active_window_id = target["window_id"]
|
||||||
|
return ActionResult(
|
||||||
|
ok=True, action="focus_app",
|
||||||
|
message=f"Targeted {target['app_name']} (pid {self._active_pid}, "
|
||||||
|
f"window {self._active_window_id}) without raising window.",
|
||||||
|
)
|
||||||
|
return ActionResult(ok=False, action="focus_app",
|
||||||
|
message=f"No on-screen window found for app '{app}'.")
|
||||||
|
|
||||||
# ── Internal ───────────────────────────────────────────────────
|
# ── Internal ───────────────────────────────────────────────────
|
||||||
def _action(self, name: str, args: Dict[str, Any]) -> ActionResult:
|
def _action(self, name: str, args: Dict[str, Any]) -> ActionResult:
|
||||||
|
|||||||
@ -40,6 +40,7 @@ COMPUTER_USE_SCHEMA: Dict[str, Any] = {
|
|||||||
"scroll",
|
"scroll",
|
||||||
"type",
|
"type",
|
||||||
"key",
|
"key",
|
||||||
|
"set_value",
|
||||||
"wait",
|
"wait",
|
||||||
"list_apps",
|
"list_apps",
|
||||||
"focus_app",
|
"focus_app",
|
||||||
@ -47,7 +48,9 @@ COMPUTER_USE_SCHEMA: Dict[str, Any] = {
|
|||||||
"description": (
|
"description": (
|
||||||
"Which action to perform. `capture` is free (no side "
|
"Which action to perform. `capture` is free (no side "
|
||||||
"effects). All other actions require approval unless "
|
"effects). All other actions require approval unless "
|
||||||
"auto-approved."
|
"auto-approved. Use `set_value` for select/popup elements "
|
||||||
|
"and sliders — it selects the matching option directly "
|
||||||
|
"without opening the native menu (no focus steal)."
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
# ── capture ────────────────────────────────────────────
|
# ── capture ────────────────────────────────────────────
|
||||||
@ -132,6 +135,16 @@ COMPUTER_USE_SCHEMA: Dict[str, Any] = {
|
|||||||
"type": "integer",
|
"type": "integer",
|
||||||
"description": "Scroll wheel ticks. Default 3.",
|
"description": "Scroll wheel ticks. Default 3.",
|
||||||
},
|
},
|
||||||
|
# ── set_value ──────────────────────────────────────────
|
||||||
|
"value": {
|
||||||
|
"type": "string",
|
||||||
|
"description": (
|
||||||
|
"For action='set_value': the value to set on the element. "
|
||||||
|
"For AXPopUpButton / select dropdowns, pass the option's "
|
||||||
|
"display label (e.g. 'Blue'). For sliders and other "
|
||||||
|
"AXValue-settable elements, pass the numeric or string value."
|
||||||
|
),
|
||||||
|
},
|
||||||
# ── type / key / wait ──────────────────────────────────
|
# ── type / key / wait ──────────────────────────────────
|
||||||
"text": {
|
"text": {
|
||||||
"type": "string",
|
"type": "string",
|
||||||
|
|||||||
@ -74,7 +74,7 @@ _SAFE_ACTIONS = frozenset({"capture", "wait", "list_apps"})
|
|||||||
# Actions that mutate user-visible state. Go through approval.
|
# Actions that mutate user-visible state. Go through approval.
|
||||||
_DESTRUCTIVE_ACTIONS = frozenset({
|
_DESTRUCTIVE_ACTIONS = frozenset({
|
||||||
"click", "double_click", "right_click", "middle_click",
|
"click", "double_click", "right_click", "middle_click",
|
||||||
"drag", "scroll", "type", "key", "focus_app",
|
"drag", "scroll", "type", "key", "set_value", "focus_app",
|
||||||
})
|
})
|
||||||
|
|
||||||
# Hard-blocked key combinations. Mirrored from #4562 — these are destructive
|
# Hard-blocked key combinations. Mirrored from #4562 — these are destructive
|
||||||
@ -387,6 +387,13 @@ def _dispatch(backend: ComputerUseBackend, action: str, args: Dict[str, Any]) ->
|
|||||||
res = backend.key(args.get("keys", ""))
|
res = backend.key(args.get("keys", ""))
|
||||||
return _maybe_follow_capture(backend, res, capture_after)
|
return _maybe_follow_capture(backend, res, capture_after)
|
||||||
|
|
||||||
|
if action == "set_value":
|
||||||
|
value = args.get("value")
|
||||||
|
if value is None:
|
||||||
|
return json.dumps({"error": "set_value requires `value`"})
|
||||||
|
res = backend.set_value(value=str(value), element=args.get("element"))
|
||||||
|
return _maybe_follow_capture(backend, res, capture_after)
|
||||||
|
|
||||||
return json.dumps({"error": f"unknown action {action!r}"})
|
return json.dumps({"error": f"unknown action {action!r}"})
|
||||||
|
|
||||||
|
|
||||||
@ -416,12 +423,17 @@ def _capture_response(cap: CaptureResult) -> Any:
|
|||||||
summary = "\n".join(summary_lines)
|
summary = "\n".join(summary_lines)
|
||||||
|
|
||||||
if cap.png_b64 and cap.mode != "ax":
|
if cap.png_b64 and cap.mode != "ax":
|
||||||
|
# Detect actual image format from base64 magic bytes so the MIME type
|
||||||
|
# matches what the data contains (cua-driver may return JPEG or PNG).
|
||||||
|
# JPEG: base64 starts with /9j/; PNG: base64 starts with iVBOR
|
||||||
|
_b64_prefix = cap.png_b64[:8]
|
||||||
|
_mime = "image/jpeg" if _b64_prefix.startswith("/9j/") else "image/png"
|
||||||
return {
|
return {
|
||||||
"_multimodal": True,
|
"_multimodal": True,
|
||||||
"content": [
|
"content": [
|
||||||
{"type": "text", "text": summary},
|
{"type": "text", "text": summary},
|
||||||
{"type": "image_url",
|
{"type": "image_url",
|
||||||
"image_url": {"url": f"data:image/png;base64,{cap.png_b64}"}},
|
"image_url": {"url": f"data:{_mime};base64,{cap.png_b64}"}},
|
||||||
],
|
],
|
||||||
"text_summary": summary,
|
"text_summary": summary,
|
||||||
"meta": {"mode": cap.mode, "width": cap.width, "height": cap.height,
|
"meta": {"mode": cap.mode, "width": cap.width, "height": cap.height,
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user