forked from molecule-ai/molecule-core
Merge pull request #205 from Molecule-AI/feat/workspace-idle-loop
feat(workspace): add idle-loop reflection pattern (Hermes/Letta shape, opt-in, ~90 LOC)
This commit is contained in:
commit
7f11328e22
@ -198,6 +198,17 @@ class WorkspaceConfig:
|
|||||||
initial_prompt: str = ""
|
initial_prompt: str = ""
|
||||||
"""Auto-sent as the first A2A message after startup. Default empty = no auto-message.
|
"""Auto-sent as the first A2A message after startup. Default empty = no auto-message.
|
||||||
Can be an inline string or a file reference (initial_prompt_file in yaml)."""
|
Can be an inline string or a file reference (initial_prompt_file in yaml)."""
|
||||||
|
idle_prompt: str = ""
|
||||||
|
"""Auto-sent every `idle_interval_seconds` while the workspace has no active
|
||||||
|
task (heartbeat.active_tasks == 0). Default empty = no idle loop. This is
|
||||||
|
the reflection-on-completion / backlog-pull pattern from the Hermes/Letta
|
||||||
|
playbook: the workspace self-wakes when idle, runs a lightweight reflection
|
||||||
|
prompt, and either picks up queued work or stops. Cost scales with useful
|
||||||
|
activity (the prompt returns quickly if there's nothing to do). Can be
|
||||||
|
inline or a file reference via `idle_prompt_file`."""
|
||||||
|
idle_interval_seconds: int = 600
|
||||||
|
"""How often the idle loop checks in (seconds). Default 600 (10 min).
|
||||||
|
Ignored when idle_prompt is empty."""
|
||||||
skills: list[str] = field(default_factory=list)
|
skills: list[str] = field(default_factory=list)
|
||||||
plugins: list[str] = field(default_factory=list) # installed plugin names
|
plugins: list[str] = field(default_factory=list) # installed plugin names
|
||||||
tools: list[str] = field(default_factory=list)
|
tools: list[str] = field(default_factory=list)
|
||||||
@ -251,6 +262,15 @@ def load_config(config_path: Optional[str] = None) -> WorkspaceConfig:
|
|||||||
if prompt_path.exists():
|
if prompt_path.exists():
|
||||||
initial_prompt = prompt_path.read_text().strip()
|
initial_prompt = prompt_path.read_text().strip()
|
||||||
|
|
||||||
|
# Resolve idle_prompt: same pattern as initial_prompt
|
||||||
|
idle_prompt = raw.get("idle_prompt", "")
|
||||||
|
idle_prompt_file = raw.get("idle_prompt_file", "")
|
||||||
|
if not idle_prompt and idle_prompt_file:
|
||||||
|
idle_path = Path(config_path) / idle_prompt_file
|
||||||
|
if idle_path.exists():
|
||||||
|
idle_prompt = idle_path.read_text().strip()
|
||||||
|
idle_interval_seconds = int(raw.get("idle_interval_seconds", 600))
|
||||||
|
|
||||||
return WorkspaceConfig(
|
return WorkspaceConfig(
|
||||||
name=raw.get("name", "Workspace"),
|
name=raw.get("name", "Workspace"),
|
||||||
description=raw.get("description", ""),
|
description=raw.get("description", ""),
|
||||||
@ -259,6 +279,8 @@ def load_config(config_path: Optional[str] = None) -> WorkspaceConfig:
|
|||||||
model=model,
|
model=model,
|
||||||
runtime=runtime,
|
runtime=runtime,
|
||||||
initial_prompt=initial_prompt,
|
initial_prompt=initial_prompt,
|
||||||
|
idle_prompt=idle_prompt,
|
||||||
|
idle_interval_seconds=idle_interval_seconds,
|
||||||
runtime_config=RuntimeConfig(
|
runtime_config=RuntimeConfig(
|
||||||
command=runtime_raw.get("command", ""),
|
command=runtime_raw.get("command", ""),
|
||||||
args=runtime_raw.get("args", []),
|
args=runtime_raw.get("args", []),
|
||||||
|
|||||||
@ -376,12 +376,80 @@ async def main(): # pragma: no cover
|
|||||||
|
|
||||||
initial_prompt_task = asyncio.create_task(_send_initial_prompt())
|
initial_prompt_task = asyncio.create_task(_send_initial_prompt())
|
||||||
|
|
||||||
|
# 10c. Idle loop — reflection-on-completion / backlog-pull pattern.
|
||||||
|
# Fires config.idle_prompt every config.idle_interval_seconds while the
|
||||||
|
# workspace has no active task. This turns every role from "waits for cron"
|
||||||
|
# into "self-wakes when idle" — the Hermes/Letta shape from today's
|
||||||
|
# multi-framework survey (see docs/ecosystem-watch.md). Cost collapses to
|
||||||
|
# event-driven in practice: the idle check is local (no LLM call, just
|
||||||
|
# heartbeat.active_tasks==0), and the prompt only fires when there's
|
||||||
|
# actually nothing to do. Gated on idle_prompt being non-empty so existing
|
||||||
|
# workspaces upgrade opt-in — set idle_prompt in org.yaml defaults or
|
||||||
|
# per-workspace to enable.
|
||||||
|
idle_loop_task = None
|
||||||
|
if config.idle_prompt:
|
||||||
|
async def _run_idle_loop():
|
||||||
|
"""Self-sends config.idle_prompt periodically when the workspace is idle."""
|
||||||
|
# Wait for server + initial prompt to settle before the first idle check.
|
||||||
|
# Short wait (min of 60s or interval) so cold-start races don't fire instantly.
|
||||||
|
await asyncio.sleep(min(config.idle_interval_seconds, 60))
|
||||||
|
|
||||||
|
import json as _json
|
||||||
|
import urllib.request
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
await asyncio.sleep(config.idle_interval_seconds)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Local idle check — no platform API call, no LLM call.
|
||||||
|
# heartbeat.active_tasks == 0 means no in-flight work.
|
||||||
|
if heartbeat.active_tasks > 0:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Self-post the idle prompt via the platform A2A proxy (same
|
||||||
|
# path as initial_prompt). The agent's own concurrency control
|
||||||
|
# rejects if the workspace becomes busy between this check and
|
||||||
|
# the post — that's the expected safety valve.
|
||||||
|
payload = _json.dumps({
|
||||||
|
"method": "message/send",
|
||||||
|
"params": {
|
||||||
|
"message": {
|
||||||
|
"role": "user",
|
||||||
|
"messageId": f"idle-{_uuid.uuid4().hex[:8]}",
|
||||||
|
"parts": [{"kind": "text", "text": config.idle_prompt}],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}).encode()
|
||||||
|
|
||||||
|
def _post_sync():
|
||||||
|
try:
|
||||||
|
req = urllib.request.Request(
|
||||||
|
f"{platform_url}/workspaces/{workspace_id}/a2a",
|
||||||
|
data=payload,
|
||||||
|
headers={"Content-Type": "application/json"},
|
||||||
|
)
|
||||||
|
with urllib.request.urlopen(req, timeout=600) as resp:
|
||||||
|
resp.read()
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Idle loop: post failed — {e}", flush=True)
|
||||||
|
|
||||||
|
print(f"Idle loop: firing (active_tasks=0, interval={config.idle_interval_seconds}s)", flush=True)
|
||||||
|
loop_ref = asyncio.get_event_loop()
|
||||||
|
loop_ref.run_in_executor(None, _post_sync)
|
||||||
|
|
||||||
|
idle_loop_task = asyncio.create_task(_run_idle_loop())
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await server.serve()
|
await server.serve()
|
||||||
finally:
|
finally:
|
||||||
# Cancel initial prompt if still running
|
# Cancel initial prompt if still running
|
||||||
if initial_prompt_task and not initial_prompt_task.done():
|
if initial_prompt_task and not initial_prompt_task.done():
|
||||||
initial_prompt_task.cancel()
|
initial_prompt_task.cancel()
|
||||||
|
# Cancel idle loop if running
|
||||||
|
if idle_loop_task and not idle_loop_task.done():
|
||||||
|
idle_loop_task.cancel()
|
||||||
# Gracefully stop the Temporal worker background task on shutdown
|
# Gracefully stop the Temporal worker background task on shutdown
|
||||||
await temporal_wrapper.stop()
|
await temporal_wrapper.stop()
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user