Chat Session Context - Sub-Agent Tool Message Streaming Fix

Status: 🟢 In-use · Category: Session & Context · Date: October 25, 2024 · Session Goal: Enable sub-agent tool messages to stream to end users for better transparency and debugging


🎯 Mission Accomplished

Successfully implemented streaming of sub-agent tool messages from the sub-agents (port 8001), through the supervisor (port 8000), to end users. Tool details such as 🔧 Calling tool: **version_service__version** and ✅ Tool **version_service__version** completed are now visible in real time.


🔧 Changes Made

1. Supervisor Agent - Switch to astream with Custom Mode

File: ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent.py

Key Changes:

  • Line 4: Added import asyncio for CancelledError handling

  • Lines 74-79: Changed from astream_events to astream with stream_mode=['messages', 'custom']

    # OLD (doesn't capture custom events):
    async for event in self.graph.astream_events(inputs, config, version="v2"):
        event_type = event.get("event")

    # NEW (captures both messages and custom events):
    async for item_type, item in self.graph.astream(inputs, config, stream_mode=['messages', 'custom']):
  • Lines 81-91: Added custom event handler

    # Handle custom A2A event payloads from sub-agents
    if item_type == 'custom' and isinstance(item, dict) and item.get("type") == "a2a_event":
        custom_text = item.get("data", "")
        if custom_text:
            logging.info(f"Processing custom a2a_event from sub-agent: {len(custom_text)} chars")
            yield {
                "is_task_complete": False,
                "require_user_input": False,
                "content": custom_text,
            }
        continue
  • Lines 93-99: Added message stream filtering

  • Lines 101-145: Changed from event-based to message-based processing (see the sketch after this list):

    • on_chat_model_stream → isinstance(message, AIMessageChunk)
    • on_tool_start → isinstance(message, AIMessage) with tool_calls
    • on_tool_end → isinstance(message, ToolMessage)
  • Lines 195-197: Added asyncio.CancelledError handling

    except asyncio.CancelledError:
        logging.info("Primary stream cancelled by client disconnection")
        return
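
Taken together, the changes above boil down to a streaming loop shaped roughly like the sketch below. This is not a verbatim copy of agent.py: the yielded dict shape, the a2a_event handling, and the CancelledError handling come from the snippets above, while the standalone function name and the exact announcement strings are illustrative.

import asyncio
import logging

from langchain_core.messages import AIMessage, AIMessageChunk, ToolMessage

async def stream_supervisor(graph, inputs, config):
    """Condensed sketch of the supervisor's primary streaming loop."""
    try:
        async for item_type, item in graph.astream(inputs, config, stream_mode=["messages", "custom"]):
            # Custom a2a_event payloads forwarded by sub-agents via get_stream_writer()
            if item_type == "custom" and isinstance(item, dict) and item.get("type") == "a2a_event":
                text = item.get("data", "")
                if text:
                    yield {"is_task_complete": False, "require_user_input": False, "content": text}
                continue

            # 'messages' mode yields (message, metadata) tuples
            message, _metadata = item

            if isinstance(message, AIMessageChunk) and message.content:
                # Token-level streaming (replaces on_chat_model_stream)
                yield {"is_task_complete": False, "require_user_input": False, "content": message.content}
            elif isinstance(message, AIMessage) and message.tool_calls:
                # Tool invocation announcement (replaces on_tool_start)
                for call in message.tool_calls:
                    yield {"is_task_complete": False, "require_user_input": False,
                           "content": f"🔧 Calling {call['name']}...\n"}
            elif isinstance(message, ToolMessage):
                # Tool completion (replaces on_tool_end)
                yield {"is_task_complete": False, "require_user_input": False,
                       "content": f"✅ {message.name} completed\n"}
    except asyncio.CancelledError:
        logging.info("Primary stream cancelled by client disconnection")
        return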

2. A2A Client - Remove Raw JSON Streaming

File: ai_platform_engineering/utils/a2a_common/a2a_remote_agent_connect.py

Key Change:

  • Line 206: Removed raw JSON streaming that was causing duplicate output

    # OLD (caused raw JSON to appear):
    writer({"type": "a2a_event", "data": chunk_dump})

    # NEW (only stream extracted text at line 251):
    # Don't stream raw chunk_dump - we'll stream extracted text only at line 251
  • Line 251: This existing line now does the clean streaming:

    writer({"type": "a2a_event", "data": text})  # Only clean text, not raw JSON
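
On the producing side, the pattern is simply: extract clean text from the sub-agent's response and hand it to LangGraph's custom stream writer. A minimal sketch, assuming the relay runs inside a LangGraph node/tool and that the sub-agent stream has already been reduced to plain text chunks (the real extraction from status.message.parts[0].text lives around line 251 of a2a_remote_agent_connect.py):

from langgraph.config import get_stream_writer

async def relay_sub_agent_text(sub_agent_text_chunks) -> str:
    """Forward already-extracted sub-agent text to the supervisor's 'custom' stream."""
    writer = get_stream_writer()  # only valid inside a running LangGraph node/tool
    collected = []
    async for text in sub_agent_text_chunks:  # async iterator of clean text chunks (assumed)
        if text:
            # Stream clean text only; emitting the raw JSON-RPC chunk here is exactly
            # what the removed line 206 used to do, producing duplicate raw output.
            writer({"type": "a2a_event", "data": text})
            collected.append(text)
    return "".join(collected)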

🧪 Testing Results

Test Command:

curl -X POST http://10.99.255.178:8000 \
-H "Content-Type: application/json" \
-H "Accept: text/event-stream" \
-d '{"id":"test-clean-output","method":"message/stream","params":{"message":{"role":"user","parts":[{"kind":"text","text":"show argocd version"}],"messageId":"msg-clean-test"}}}'

Output - What Users Now See:

✅ Sub-agent tool messages (NEW):

  • "text":"🔧 Calling tool: **version_service__version**\n"
  • "text":"✅ Tool **version_service__version** completed\n"
  • "text":"The current version of ArgoCD is **v3.1.8+becb020**..."

✅ Token-level streaming (still working):

  • Individual tokens: "###", " Ar", "go", "CD", " Version", etc.

✅ Supervisor notifications (still working):

  • 🔧 Calling argocd...
  • ✅ argocd completed

❌ Raw JSON (REMOVED):

  • No more {'id': '...', 'jsonrpc': '2.0', 'result': {...}}

Supervisor Logs Confirm Success:

2025-10-25 18:30:55 [root] [INFO] [stream:85] Processing custom a2a_event from sub-agent: 45 chars
2025-10-25 18:30:56 [root] [INFO] [stream:85] Processing custom a2a_event from sub-agent: 46 chars
2025-10-25 18:30:57 [root] [INFO] [stream:85] Processing custom a2a_event from sub-agent: 403 chars
  • 45 chars = 🔧 Calling tool: **version_service__version**\n
  • 46 chars = ✅ Tool **version_service__version** completed\n
  • 403 chars = Full version response
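
For a repeatable version of the curl check above, the same request can be issued programmatically and the SSE lines inspected for the sub-agent tool messages. A minimal sketch; httpx is an assumption (any SSE-capable HTTP client works), and the endpoint and payload are copied from the test command:

import httpx

payload = {
    "id": "test-clean-output",
    "method": "message/stream",
    "params": {
        "message": {
            "role": "user",
            "parts": [{"kind": "text", "text": "show argocd version"}],
            "messageId": "msg-clean-test",
        }
    },
}

with httpx.stream(
    "POST",
    "http://10.99.255.178:8000",
    json=payload,
    headers={"Accept": "text/event-stream"},
    timeout=60,
) as response:
    for line in response.iter_lines():
        # Expect lines containing "🔧 Calling tool: **version_service__version**"
        # and "✅ Tool **version_service__version** completed" among the tokens.
        if line.startswith("data:"):
            print(line[len("data:"):].strip())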

📊 Architecture Understanding

The Problem (Before Fix):

  1. Primary Streaming Mode: astream_events with version="v2"

    • ✅ Captures: on_chat_model_stream, on_tool_start, on_tool_end
    • ❌ Ignores: Custom events from get_stream_writer()
  2. Fallback Mode: astream with stream_mode=['messages', 'custom', 'updates']

    • ✅ Captures: Custom events
    • ⚠️ Only triggered on exceptions (never used in normal flow)

The Solution (After Fix):

  1. Primary Streaming Mode: astream with stream_mode=['messages', 'custom']

    • ✅ Captures: AIMessageChunk for token streaming
    • ✅ Captures: Custom events with item_type == 'custom'
    • ✅ Captures: AIMessage with tool_calls for tool start
    • ✅ Captures: ToolMessage for tool completion
  2. Event Flow:

    Sub-Agent (8001)
    → Generates status-update events with tool messages
    → A2A Client (a2a_remote_agent_connect.py line 251)
    → Extracts text from status.message.parts[0].text
    → Calls writer({"type": "a2a_event", "data": text})
    → get_stream_writer() emits custom event
    → Supervisor astream with 'custom' mode (agent.py line 82)
    → Yields content to end user
    → Clean text appears in SSE stream ✅

๐Ÿ“ Files Modified (Not Yet Committed)โ€‹

Modified Files:

  1. ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent.py

    • Added asyncio import
    • Switched from astream_events to astream
    • Added custom event handler
    • Converted event handlers to message-based
  2. ai_platform_engineering/utils/a2a_common/a2a_remote_agent_connect.py

    • Removed line 206 that was streaming raw JSON
  3. docs/docs/changes/2024-10-25-sub-agent-tool-message-streaming.md

    • Updated Mermaid diagram to show working flow
    • Changed broken paths to working paths
    • Updated "What User Sees" section to show all ✅

Previously Committed:

git commit -m "Add querying announcement detection and _get_tool_purpose to supervisor agent"
# Committed: 10 files changed, 887 insertions(+), 72 deletions(-)

🚀 Next Steps (When You Resume)

Immediate:

  1. Commit the fix:

    git add ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent.py
    git add ai_platform_engineering/utils/a2a_common/a2a_remote_agent_connect.py
    git add docs/docs/changes/2024-10-25-sub-agent-tool-message-streaming.md
    git commit -m "Fix sub-agent tool message streaming to end users

    - Switched supervisor from astream_events to astream with custom mode
    - Added custom event handler to process a2a_event types from sub-agents
    - Removed raw JSON streaming from a2a_remote_agent_connect.py line 206
    - Sub-agent tool messages now visible to end users for better transparency
    - Token-level streaming still intact via AIMessageChunk
    - Updated documentation with working architecture diagram"
  2. Test edge cases (optional):

    • Multiple sub-agent calls in parallel
    • Sub-agent errors and how they stream
    • Long-running tool calls
  3. Update documentation (optional):

    • Add "Solution Implemented" section to the markdown doc
    • Document the before/after behavior
    • Add troubleshooting guide

Future Work (from TODO list):

  1. Add on_tool_start logic to base_langgraph_agent.py (pending)
    • Generate 🔍 Querying announcements programmatically (one possible shape is sketched below)
    • Currently using LLM-generated announcements
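
One possible shape for the pending item above, illustrative only: nothing in base_langgraph_agent.py looks like this yet, and the function name is hypothetical. The idea is to derive the 🔍 Querying line from the tool call itself instead of asking the LLM to phrase it.

from langchain_core.messages import AIMessage

def querying_announcement(message: AIMessage) -> str | None:
    """Build a '🔍 Querying ...' line from the first pending tool call, if any."""
    if not isinstance(message, AIMessage) or not message.tool_calls:
        return None
    tool_name = message.tool_calls[0]["name"]
    # e.g. "version_service__version" -> "🔍 Querying version service version..."
    return f"🔍 Querying {tool_name.replace('__', ' ').replace('_', ' ')}...\n"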

๐Ÿ” Key Technical Discoveriesโ€‹

1. LangGraph Streaming Modes:โ€‹

  • astream_events: Does NOT process custom events from get_stream_writer()
  • astream with stream_mode=['messages', 'custom']: DOES process custom events
  • Custom events must be checked with item_type == 'custom'

2. A2A Event Types:

  1. task: Initial request (state: submitted)
  2. status-update: Progress notifications (final: false/true, contains message.parts[].text)
  3. artifact-update: Content streaming (append: true/false, contains parts[].text)
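
A hedged sketch of pulling display text out of these events on the client side, following the field paths listed above (status.message.parts[].text for status-updates, parts[].text for artifact-updates); the JSON-RPC 'result' wrapper and the exact nesting are assumptions inferred from the raw output that used to leak through:

def extract_a2a_text(chunk: dict) -> str:
    """Return the human-readable text carried by an A2A streaming event, or ''."""
    result = chunk.get("result", chunk)  # tolerate bare or JSON-RPC-wrapped events (assumption)

    # status-update: progress text under status.message.parts[].text
    parts = result.get("status", {}).get("message", {}).get("parts", [])
    if parts and parts[0].get("text"):
        return parts[0]["text"]

    # artifact-update: streamed content under artifact.parts[].text
    parts = result.get("artifact", {}).get("parts", [])
    if parts and parts[0].get("text"):
        return parts[0]["text"]

    return ""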

3. Event Flow Timeline (from live capture):

| # | Time | Event Type | Purpose | Text Content |
|---|------|------------|---------|--------------|
| 1 | T+0ms | task | Initialize | state: "submitted" |
| 2 | T+500ms | status-update | Tool start | "🔧 Calling tool: version_service__version" |
| 3 | T+800ms | status-update | Tool complete | "✅ Tool version_service__version completed" |
| 4 | T+1000ms | status-update | Response | Full version details (500+ chars) |
| 5 | T+1200ms | artifact-update | Result marker | Empty string, lastChunk: true |
| 6 | T+1250ms | status-update | Completion | final: true, state: "completed" |

4. Two Separate Processes:

  • Supervisor (port 8000): platform-engineer-p2p service

    • Files: agent.py, a2a_remote_agent_connect.py
    • Role: Orchestrates sub-agents, processes end-user requests
  • Sub-Agent (port 8001): agent-argocd-p2p service (example)

    • Files: base_strands_agent.py
    • Role: Executes domain-specific tools, generates detailed status updates

๐Ÿ› Debugging Commandsโ€‹

Restart Services:โ€‹

docker restart platform-engineer-p2p
docker logs platform-engineer-p2p --tail 50

Test Supervisor:

curl -X POST http://10.99.255.178:8000 \
-H "Content-Type: application/json" \
-H "Accept: text/event-stream" \
-d '{"id":"test","method":"message/stream","params":{"message":{"role":"user","parts":[{"kind":"text","text":"show argocd version"}],"messageId":"msg-test"}}}' \
| head -40

Test Sub-Agent Directly:

curl -X POST http://10.99.255.178:8001 \
-H "Content-Type: application/json" \
-H "Accept: text/event-stream" \
-d '{"id":"test","method":"message/stream","params":{"message":{"role":"user","parts":[{"kind":"text","text":"show version"}],"messageId":"msg-test"}}}' \
| head -40

Check Logs for Custom Events:

docker logs platform-engineer-p2p 2>&1 | tail -100 | grep -E "custom|a2a_event"

Files to Reference:

  1. Architecture Diagram: docs/docs/changes/2024-10-25-sub-agent-tool-message-streaming.md

    • Comprehensive Mermaid diagram showing event flow
    • A2A event type specifications
    • Protocol communication details
  2. Previous Work: docs/docs/changes/2024-10-22-a2a-intermediate-states.md

    • Background on A2A protocol
  3. Prompt Config: charts/ai-platform-engineering/data/prompt_config.deep_agent.yaml

    • System prompt for Deep Agent (🔍 Querying instructions removed)

Docker Configuration:

  • docker-compose.dev.yaml line 11: Volume mount for prompt config

    platform-engineer-p2p:
      volumes:
        - ./charts/ai-platform-engineering/data/prompt_config.deep_agent.yaml:/app/prompt_config.yaml

💡 Important Context

Why This Fix Was Needed:

Before this fix, users could see the supervisor's tool calls but not the sub-agents' own tool calls:

  • ✅ Visible: supervisor-level tool calls such as 🔧 Calling argocd...
  • ❌ Hidden: sub-agent-level tool calls such as 🔧 Calling tool: **version_service__version**

This lack of visibility made debugging difficult when sub-agents had issues.

What This Fix Enables:

  • ✅ Complete transparency into sub-agent operations
  • ✅ Better debugging when tools fail
  • ✅ Real-time progress updates from sub-agents
  • ✅ No performance degradation (token-level streaming remains intact)

Alternative Approaches Considered:

  1. Add on_custom handler to astream_events - Not possible, astream_events ignores custom events
  2. Use fallback mode as primary - Too risky, fallback is for errors
  3. ✅ Switch to astream with stream_mode=['messages', 'custom'] - Clean solution that works

🎓 Lessons Learned

  1. LangGraph Streaming Architecture: Two fundamentally different modes with different capabilities
  2. Custom Events: Must use astream with 'custom' mode, not astream_events
  3. Double Streaming: Be careful not to stream both raw and processed data
  4. Message-Based vs Event-Based: When using astream, process messages not events
  5. Testing is Critical: Raw JSON in output was only caught through end-to-end testing

Quick Reference:

  • Supervisor Container: docker exec -it platform-engineer-p2p bash
  • Sub-Agent Container: docker exec -it agent-argocd-p2p bash
  • Logs: docker logs -f platform-engineer-p2p
  • Documentation: docs/docs/changes/2024-10-25-sub-agent-tool-message-streaming.md

✅ TODO Status

Completed:

  • Switch supervisor from astream_events to astream with custom mode
  • Remove raw JSON streaming from a2a_remote_agent_connect.py
  • Update Mermaid diagram to show working flow
  • Test and verify sub-agent tool messages stream to users

Pending:

  • Commit all changes
  • Add on_tool_start logic to base_langgraph_agent.py for 🔍 Querying announcements

End of Session Context