diff --git a/workspace/tests/test_shared_runtime_helpers.py b/workspace/tests/test_shared_runtime_helpers.py new file mode 100644 index 00000000..8d5f3c04 --- /dev/null +++ b/workspace/tests/test_shared_runtime_helpers.py @@ -0,0 +1,300 @@ +"""Test coverage for shared_runtime helpers (issue #366). + +Six helper functions previously had zero test coverage: + _extract_part_text, extract_message_text, format_conversation_history, + build_task_text, append_peer_guidance, brief_task +""" +from __future__ import annotations + + +from shared_runtime import ( + _extract_part_text, + append_peer_guidance, + brief_task, + build_task_text, + extract_message_text, + format_conversation_history, +) + + +# ============================================================================= +# _extract_part_text +# ============================================================================= + +class TestExtractPartText: + """Coverage for shared_runtime._extract_part_text().""" + + def test_dict_with_text_field(self): + assert _extract_part_text({"text": "hello"}) == "hello" + + def test_dict_without_text_field(self): + assert _extract_part_text({"type": "image"}) == "" + + def test_dict_with_empty_text_field(self): + assert _extract_part_text({"text": ""}) == "" + + def test_dict_with_root_nesting(self): + """Text buried in part['root']['text'] is extracted.""" + assert _extract_part_text({"root": {"text": "nested"}}) == "nested" + + def test_dict_with_root_non_dict(self): + """part['root'] that is not a dict is safely skipped.""" + assert _extract_part_text({"root": "string", "text": "top"}) == "top" + + def test_object_with_text_attribute(self): + class FakePart: + text = "attr-text" + + assert _extract_part_text(FakePart()) == "attr-text" + + def test_object_with_root_object_with_text(self): + """Object with root.attr.text is extracted (A2A v1 object style).""" + + class FakeRoot: + text = "root-attr-text" + + class FakePart: + root = FakeRoot() + + assert _extract_part_text(FakePart()) == "root-attr-text" + + def test_object_with_empty_text_attribute(self): + class FakePart: + text = "" + + assert _extract_part_text(FakePart()) == "" + + def test_none_input(self): + assert _extract_part_text(None) == "" + + def test_unexpected_type(self): + """Plain int/float/bool falls through to empty string.""" + assert _extract_part_text(42) == "" + + +# ============================================================================= +# extract_message_text +# ============================================================================= + +class TestExtractMessageText: + """Coverage for shared_runtime.extract_message_text().""" + + def test_list_of_dict_parts(self): + parts = [{"text": "hello"}, {"text": "world"}] + assert extract_message_text(parts) == "hello world" + + def test_single_part(self): + assert extract_message_text([{"text": "single"}]) == "single" + + def test_context_object_with_message_parts(self): + """RequestContext-like: .message.parts is the parts list.""" + + class FakeContext: + class _Msg: + parts = [{"text": "from context"}] + + message = _Msg() + + assert extract_message_text(FakeContext()) == "from context" + + def test_context_object_without_message(self): + """No .message attr → falls back to treating input as a parts list.""" + + class FakeContext: + pass # no .message + + # Pass a list directly as the context-like object + assert extract_message_text([{"text": "fallback"}]) == "fallback" + + def test_whitespace_normalized(self): + """Leading/trailing whitespace is stripped; internal newlines are preserved.""" + parts = [{"text": " hello "}, {"text": "\nworld\n"}] + result = extract_message_text(parts) + # Leading/trailing stripped, but internal \n stays (join uses single space) + assert result == "hello \nworld" + assert not result.startswith(" ") + assert not result.endswith(" ") + + def test_empty_parts_list(self): + assert extract_message_text([]) == "" + + +# ============================================================================= +# format_conversation_history +# ============================================================================= + +class TestFormatConversationHistory: + """Coverage for shared_runtime.format_conversation_history().""" + + def test_single_user_message(self): + hist = [("human", "hello")] + out = format_conversation_history(hist) + assert out == "User: hello" + + def test_single_agent_message(self): + hist = [("ai", "response")] + out = format_conversation_history(hist) + assert out == "Agent: response" + + def test_interleaved_history(self): + hist = [ + ("human", "hello"), + ("ai", "hi there"), + ("human", "what is 2+2?"), + ("ai", "four"), + ] + out = format_conversation_history(hist) + lines = out.split("\n") + assert lines[0] == "User: hello" + assert lines[1] == "Agent: hi there" + assert lines[2] == "User: what is 2+2?" + assert lines[3] == "Agent: four" + + def test_empty_history(self): + assert format_conversation_history([]) == "" + + +# ============================================================================= +# build_task_text +# ============================================================================= + +class TestBuildTaskText: + """Coverage for shared_runtime.build_task_text().""" + + def test_no_history_returns_user_message_unchanged(self): + assert build_task_text("do the thing", []) == "do the thing" + + def test_history_prepends_transcript(self): + hist = [("human", "hello"), ("ai", "hi")] + result = build_task_text("follow-up", hist) + assert "Conversation so far:" in result + assert "User: hello" in result + assert "Agent: hi" in result + assert "follow-up" in result + + def test_user_message_after_conversation_header(self): + hist = [("human", "hello")] + result = build_task_text("do it", hist) + assert result.startswith("Conversation so far:") + assert result.endswith("Current request: do it") + + def test_empty_user_message_with_history(self): + """Empty user_message is still rendered with history.""" + hist = [("human", "hello")] + result = build_task_text("", hist) + assert "Conversation so far:" in result + assert "Current request:" in result + + +# ============================================================================= +# append_peer_guidance +# ============================================================================= + +class TestAppendPeerGuidance: + """Coverage for shared_runtime.append_peer_guidance().""" + + def test_base_text_appended(self): + result = append_peer_guidance( + "base text", + peers_info="alpha: ws-1", + default_text="default", + tool_name="delegate_task", + ) + assert result.startswith("base text") + assert "## Peers" in result + assert "alpha: ws-1" in result + assert "Use delegate_task" in result + + def test_null_base_text_uses_default(self): + result = append_peer_guidance( + None, + peers_info="peer info", + default_text="DEFAULT_TEXT", + tool_name="tool", + ) + assert result.startswith("DEFAULT_TEXT") + + def test_whitespace_base_text_strips_to_empty_peers_still_added(self): + """Whitespace-only base_text is stripped but default_text is NOT used + (only None triggers the fallback). The peers section is still appended.""" + result = append_peer_guidance( + " ", + peers_info="peer", + default_text="DEF", + tool_name="t", + ) + # " ".strip() == ""; default_text is NOT substituted for whitespace + assert "## Peers" in result + assert "peer" in result + assert "DEF" not in result # default_text only on None, not whitespace + + def test_none_base_text_uses_default(self): + """None base_text triggers fallback to default_text.""" + result = append_peer_guidance( + None, + peers_info="peer", + default_text="DEFAULT", + tool_name="tool", + ) + assert result.startswith("DEFAULT") + assert "## Peers" in result + + def test_empty_peers_info_skips_section(self): + result = append_peer_guidance( + "base", + peers_info="", + default_text="def", + tool_name="tool", + ) + # No "## Peers" section when peers_info is empty + assert result == "base" + + def test_whitespace_in_base_and_peers_normalized(self): + result = append_peer_guidance( + " base \n", + peers_info=" peer-1 \n", + default_text="def", + tool_name="tool", + ) + # Base should be stripped of leading/trailing whitespace + assert result.startswith("base") + # Peer info should be appended + assert "peer-1" in result + + +# ============================================================================= +# brief_task +# ============================================================================= + +class TestBriefTask: + """Coverage for shared_runtime.brief_task().""" + + def test_short_text_returned_unchanged(self): + assert brief_task("hello", limit=60) == "hello" + + def test_exact_limit_no_ellipsis(self): + text = "A" * 60 + assert brief_task(text, limit=60) == text + assert "..." not in text + + def test_truncated_with_ellipsis(self): + text = "A" * 80 + result = brief_task(text, limit=60) + assert len(result) == 63 # 60 chars + "..." + assert result.endswith("...") + + def test_limit_10_shortens(self): + result = brief_task("hello world", limit=10) + assert len(result) == 13 # 10 chars + "..." + assert result.endswith("...") + + def test_limit_0_returns_ellipsis(self): + """limit=0 → 0-char slice + "..." since len("hello") > 0.""" + result = brief_task("hello", limit=0) + assert result == "..." + + def test_limit_1_single_char_plus_ellipsis(self): + result = brief_task("hello", limit=1) + assert len(result) == 4 # 1 char + "..." + assert result.startswith("h") + assert result.endswith("...")