fix(a2a): delegate_task returns str(result) for empty-parts responses #283

Closed
infra-runtime-be wants to merge 2 commits from runtime/fix-delegate-empty-parts-regression into main
7 changed files with 32 additions and 11 deletions

View File

@ -28,7 +28,7 @@ WORKSPACE_ID = _WORKSPACE_ID_raw
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
async def discover(target_id: str) -> dict | None:

View File

@ -29,7 +29,7 @@ WORKSPACE_ID = _WORKSPACE_ID_raw
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
# Cache workspace ID → name mappings (populated by list_peers calls)
_peer_names: dict[str, str] = {}

View File

@ -66,8 +66,19 @@ async def delegate_task(workspace_id: str, task: str) -> str:
)
data = a2a_resp.json()
if "result" in data:
parts = data["result"].get("parts", [])
return parts[0].get("text", "(no text)") if parts else str(data["result"])
result = data["result"]
# String result: return it directly (handles empty-parts case
# where the platform returns {"result": "my response"}).
if isinstance(result, str):
return result
# Dict result: extract text from parts[0], fall back to str(result).
if isinstance(result, dict):
parts = result.get("parts", [])
if parts and isinstance(parts[0], dict):
return parts[0].get("text", "(no text)")
return str(result)
# Any other type: fall back to string representation.
return str(result)
elif "error" in data:
return f"Error: {data['error'].get('message', str(data['error']))}"
return str(data)

View File

@ -54,6 +54,16 @@ import httpx
logger = logging.getLogger(__name__)
def _platform_url() -> str:
"""Return the platform URL, defaulting to host.docker.internal when running
inside a Docker container (where localhost refers to the container, not the
host). External callers can always override via the PLATFORM_URL env var.
"""
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
return os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
return os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
# ─────────────────────────────────────────────────────────────────────────────
# Constants
# ─────────────────────────────────────────────────────────────────────────────
@ -79,12 +89,12 @@ async def _fetch_latest_checkpoint(workspace_id: str) -> Optional[dict]:
workspace_id: The workspace to query.
Reads:
PLATFORM_URL Platform base URL (default ``http://localhost:8080``).
PLATFORM_URL Platform base URL (default ``http://host.docker.internal:8080``).
"""
try:
from platform_auth import auth_headers as _auth_headers # type: ignore[import]
platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080")
platform_url = _platform_url()
url = f"{platform_url}/workspaces/{workspace_id}/checkpoints/latest"
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(url, headers=_auth_headers())
@ -125,12 +135,12 @@ async def _save_checkpoint(
payload: Optional JSON-serialisable dict stored as JSONB.
Reads:
PLATFORM_URL Platform base URL (default ``http://localhost:8080``).
PLATFORM_URL Platform base URL (default ``http://host.docker.internal:8080``).
"""
try:
from platform_auth import auth_headers as _auth_headers # type: ignore[import]
platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080")
platform_url = _platform_url()
url = f"{platform_url}/workspaces/{workspace_id}/checkpoints"
body: dict = {
"workflow_id": workflow_id,

View File

@ -21,7 +21,7 @@ logger = logging.getLogger(__name__)
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
_WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
if not _WORKSPACE_ID_raw:
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")

View File

@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
_WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
if not _WORKSPACE_ID_raw:
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")

View File

@ -63,7 +63,7 @@ async def main(): # pragma: no cover
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
platform_url = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080")
platform_url = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
awareness_config = get_awareness_config()
# 0. Initialise OpenTelemetry (no-op if packages not installed)