# Chat Session Context - Sub-Agent Tool Message Streaming Fix
- **Status:** 🟢 In-use
- **Category:** Session & Context
- **Date:** October 25, 2024
- **Session Goal:** Enable sub-agent tool messages to stream to end users for better transparency and debugging
## 🎯 Mission Accomplished

Successfully implemented streaming of sub-agent tool messages from the sub-agents (port 8001) through the supervisor (port 8000) to end users. Sub-agent tool details such as `🔧 Calling tool: **version_service__version**` and `✅ Tool **version_service__version** completed` are now visible in real time.
## 🔧 Changes Made
### 1. Supervisor Agent - Switch to `astream` with Custom Mode

**File:** `ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent.py`

**Key Changes:**
- **Line 4:** Added `import asyncio` for `CancelledError` handling
- **Lines 74-79:** Changed from `astream_events` to `astream` with `stream_mode=['messages', 'custom']`:

  ```python
  # OLD (doesn't capture custom events):
  async for event in self.graph.astream_events(inputs, config, version="v2"):
      event_type = event.get("event")

  # NEW (captures both messages and custom events):
  async for item_type, item in self.graph.astream(inputs, config, stream_mode=['messages', 'custom']):
  ```
- **Lines 81-91:** Added custom event handler:

  ```python
  # Handle custom A2A event payloads from sub-agents
  if item_type == 'custom' and isinstance(item, dict) and item.get("type") == "a2a_event":
      custom_text = item.get("data", "")
      if custom_text:
          logging.info(f"Processing custom a2a_event from sub-agent: {len(custom_text)} chars")
          yield {
              "is_task_complete": False,
              "require_user_input": False,
              "content": custom_text,
          }
      continue
  ```
- **Lines 93-99:** Added message stream filtering
- **Lines 101-145:** Changed from event-based to message-based processing (see the sketch after this list):
  - `on_chat_model_stream` → `isinstance(message, AIMessageChunk)`
  - `on_tool_start` → `isinstance(message, AIMessage)` with `tool_calls`
  - `on_tool_end` → `isinstance(message, ToolMessage)`
- **Lines 195-197:** Added `asyncio.CancelledError` handling:

  ```python
  except asyncio.CancelledError:
      logging.info("Primary stream cancelled by client disconnection")
      return
  ```
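The message-based dispatch that replaced the `astream_events` handlers looks roughly like the sketch below. It is a minimal illustration that reuses the yielded dict shape shown above; the exact filtering from lines 93-99 and the announcement wording are simplified assumptions, not copied from `agent.py`.

```python
from langchain_core.messages import AIMessage, AIMessageChunk, ToolMessage

async def _process_stream(self, inputs, config):
    """Minimal sketch: message-based processing with astream in 'messages' + 'custom' mode."""
    async for item_type, item in self.graph.astream(inputs, config, stream_mode=['messages', 'custom']):
        if item_type == 'custom':
            # Custom a2a_event payloads are handled as shown in the snippet above.
            continue

        # In 'messages' mode each item is a (message, metadata) tuple.
        message, _metadata = item

        if isinstance(message, AIMessageChunk) and message.content:
            # Token-level streaming (replaces on_chat_model_stream).
            yield {"is_task_complete": False, "require_user_input": False, "content": message.content}
        elif isinstance(message, AIMessage) and getattr(message, "tool_calls", None):
            # Tool-start notifications (replaces on_tool_start).
            for tool_call in message.tool_calls:
                yield {"is_task_complete": False, "require_user_input": False,
                       "content": f"🔧 Calling {tool_call['name']}...\n"}
        elif isinstance(message, ToolMessage):
            # Tool-completion notifications (replaces on_tool_end).
            yield {"is_task_complete": False, "require_user_input": False,
                   "content": f"✅ {message.name} completed\n"}
```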
### 2. A2A Client - Remove Raw JSON Streaming

**File:** `ai_platform_engineering/utils/a2a_common/a2a_remote_agent_connect.py`

**Key Change:**

- **Line 206:** Removed raw JSON streaming that was causing duplicate output:

  ```python
  # OLD (caused raw JSON to appear):
  writer({"type": "a2a_event", "data": chunk_dump})

  # NEW (only stream extracted text at line 251):
  # Don't stream raw chunk_dump - we'll stream extracted text only at line 251
  ```

- **Line 251:** This existing line now does the clean streaming:

  ```python
  writer({"type": "a2a_event", "data": text})  # Only clean text, not raw JSON
  ```
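For context, the client-side path that produces those `writer(...)` calls is sketched below. The streaming call name (`send_message_streaming`) and the chunk-to-dict conversion are assumptions for illustration; only the `status.message.parts[0].text` extraction and the `{"type": "a2a_event", "data": text}` payload come from this session.

```python
from langgraph.config import get_stream_writer

async def _relay_sub_agent_events(client, request):
    """Hypothetical sketch of the A2A client relaying sub-agent text as custom events."""
    writer = get_stream_writer()

    async for chunk in client.send_message_streaming(request):  # assumed client API
        chunk_dump = chunk.model_dump()                         # assumed chunk shape
        result = chunk_dump.get("result", {})

        # Pull the human-readable text out of status.message.parts[0].text, if present.
        message = (result.get("status") or {}).get("message") or {}
        parts = message.get("parts") or []
        text = parts[0].get("text", "") if parts else ""

        if text:
            # The supervisor surfaces this via astream(..., stream_mode=['messages', 'custom']).
            writer({"type": "a2a_event", "data": text})
```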
## 🧪 Testing Results

### Test Command

```bash
curl -X POST http://10.99.255.178:8000 \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  -d '{"id":"test-clean-output","method":"message/stream","params":{"message":{"role":"user","parts":[{"kind":"text","text":"show argocd version"}],"messageId":"msg-clean-test"}}}'
```
### Output - What Users Now See

✅ **Sub-agent tool messages (NEW):**

```
"text":"🔧 Calling tool: **version_service__version**\n"
"text":"✅ Tool **version_service__version** completed\n"
"text":"The current version of ArgoCD is **v3.1.8+becb020**..."
```

✅ **Token-level streaming (still working):**

- Individual tokens: `"###"`, `" Ar"`, `"go"`, `"CD"`, `" Version"`, etc.

✅ **Supervisor notifications (still working):**

- `🔧 Calling argocd...`
- `✅ argocd completed`

❌ **Raw JSON (REMOVED):**

- No more `{'id': '...', 'jsonrpc': '2.0', 'result': {...}}`
### Supervisor Logs Confirm Success

```
2025-10-25 18:30:55 [root] [INFO] [stream:85] Processing custom a2a_event from sub-agent: 45 chars
2025-10-25 18:30:56 [root] [INFO] [stream:85] Processing custom a2a_event from sub-agent: 46 chars
2025-10-25 18:30:57 [root] [INFO] [stream:85] Processing custom a2a_event from sub-agent: 403 chars
```

- 45 chars = `🔧 Calling tool: **version_service__version**\n`
- 46 chars = `✅ Tool **version_service__version** completed\n`
- 403 chars = Full version response
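As a quick sanity check (assuming the messages use the single-code-point 🔧 and ✅ emoji), counting Unicode code points in Python reproduces the 45- and 46-character log entries:

```python
# Each emoji is a single code point, so len() matches the logged character counts.
calling = "🔧 Calling tool: **version_service__version**\n"
completed = "✅ Tool **version_service__version** completed\n"

assert len(calling) == 45
assert len(completed) == 46
```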
## Architecture Understanding

### The Problem (Before Fix)

- **Primary Streaming Mode:** `astream_events` with `version="v2"`
  - ✅ Captures: `on_chat_model_stream`, `on_tool_start`, `on_tool_end`
  - ❌ Ignores: Custom events from `get_stream_writer()`
- **Fallback Mode:** `astream` with `stream_mode=['messages', 'custom', 'updates']`
  - ✅ Captures: Custom events
  - ⚠️ Only triggered on exceptions (never used in normal flow)
### The Solution (After Fix)

- **Primary Streaming Mode:** `astream` with `stream_mode=['messages', 'custom']`
  - ✅ Captures: `AIMessageChunk` for token streaming
  - ✅ Captures: Custom events with `item_type == 'custom'`
  - ✅ Captures: `AIMessage` with `tool_calls` for tool start
  - ✅ Captures: `ToolMessage` for tool completion
- **Event Flow:**

  ```
  Sub-Agent (8001)
    ↓ Generates status-update events with tool messages
    ↓ A2A Client (a2a_remote_agent_connect.py line 251)
    ↓ Extracts text from status.message.parts[0].text
    ↓ Calls writer({"type": "a2a_event", "data": text})
    ↓ get_stream_writer() emits custom event
    ↓ Supervisor astream with 'custom' mode (agent.py line 82)
    ↓ Yields content to end user
    ↓ Clean text appears in SSE stream ✅
  ```
## Files Modified (Not Yet Committed)

### Modified Files

- `ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent.py`
  - Added `asyncio` import
  - Switched from `astream_events` to `astream`
  - Added custom event handler
  - Converted event handlers to message-based processing
- `ai_platform_engineering/utils/a2a_common/a2a_remote_agent_connect.py`
  - Removed line 206, which was streaming raw JSON
- `docs/docs/changes/2024-10-25-sub-agent-tool-message-streaming.md`
  - Updated Mermaid diagram to show the working flow
  - Changed broken paths to working paths
  - Updated "What User Sees" section to show all ✅

### Previously Committed

```bash
git commit -m "Add querying announcement detection and _get_tool_purpose to supervisor agent"
# Committed: 10 files changed, 887 insertions(+), 72 deletions(-)
```
## Next Steps (When You Resume)

### Immediate

1. **Commit the fix:**

   ```bash
   git add ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent.py
   git add ai_platform_engineering/utils/a2a_common/a2a_remote_agent_connect.py
   git add docs/docs/changes/2024-10-25-sub-agent-tool-message-streaming.md
   git commit -m "Fix sub-agent tool message streaming to end users

   - Switched supervisor from astream_events to astream with custom mode
   - Added custom event handler to process a2a_event types from sub-agents
   - Removed raw JSON streaming from a2a_remote_agent_connect.py line 206
   - Sub-agent tool messages now visible to end users for better transparency
   - Token-level streaming still intact via AIMessageChunk
   - Updated documentation with working architecture diagram"
   ```

2. **Test edge cases (optional):**
   - Multiple sub-agent calls in parallel
   - Sub-agent errors and how they stream
   - Long-running tool calls

3. **Update documentation (optional):**
   - Add a "Solution Implemented" section to the markdown doc
   - Document the before/after behavior
   - Add a troubleshooting guide
### Future Work (from TODO list)

- Add `on_tool_start` logic to `base_langgraph_agent.py` (pending)
- Generate "Querying" announcements programmatically
  - Currently using LLM-generated announcements
## Key Technical Discoveries

### 1. LangGraph Streaming Modes

- `astream_events`: does NOT process custom events from `get_stream_writer()`
- `astream` with `stream_mode=['messages', 'custom']`: DOES process custom events
- Custom events must be checked with `item_type == 'custom'`
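A minimal, self-contained sketch of this difference is shown below. The graph, node, and state names are invented for illustration; only the `writer({"type": "a2a_event", ...})` payload and the `stream_mode=['messages', 'custom']` pattern come from this session.

```python
import asyncio
from typing import TypedDict

from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    answer: str


def call_sub_agent(state: State) -> State:
    # Inside a node, get_stream_writer() emits items on the 'custom' stream.
    writer = get_stream_writer()
    writer({"type": "a2a_event", "data": "Calling tool: version_service__version\n"})
    return {"answer": "done"}


builder = StateGraph(State)
builder.add_node("call_sub_agent", call_sub_agent)
builder.add_edge(START, "call_sub_agent")
builder.add_edge("call_sub_agent", END)
graph = builder.compile()


async def main():
    # astream with 'custom' in stream_mode surfaces the writer payloads;
    # astream_events(..., version="v2") would not deliver them.
    async for item_type, item in graph.astream({"answer": ""}, stream_mode=["messages", "custom"]):
        if item_type == "custom":
            print("custom event:", item)


asyncio.run(main())
```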
### 2. A2A Event Types

- `task`: Initial request (state: `submitted`)
- `status-update`: Progress notifications (`final: false/true`, contains `message.parts[].text`)
- `artifact-update`: Content streaming (`append: true/false`, contains `parts[].text`)
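For orientation, a `status-update` chunk has roughly the shape sketched below. The field values are illustrative, not captured from a live session; the only parts grounded in this document are the `status.message.parts[0].text` nesting and the `final` flag.

```python
# Illustrative status-update payload shape (values invented for this sketch).
status_update_example = {
    "jsonrpc": "2.0",
    "id": "test-clean-output",
    "result": {
        "kind": "status-update",
        "final": False,
        "status": {
            "state": "working",
            "message": {
                "role": "agent",
                "parts": [
                    {"kind": "text", "text": "Calling tool: version_service__version\n"},
                ],
            },
        },
    },
}

# The A2A client forwards only the extracted text, not the raw JSON:
text = status_update_example["result"]["status"]["message"]["parts"][0]["text"]
```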
### 3. Event Flow Timeline (from live capture)

| # | Time | Event Type | Purpose | Text Content |
|---|------|------------|---------|--------------|
| 1 | T+0ms | task | Initialize | state: "submitted" |
| 2 | T+500ms | status-update | Tool start | "🔧 Calling tool: version_service__version" |
| 3 | T+800ms | status-update | Tool complete | "✅ Tool version_service__version completed" |
| 4 | T+1000ms | status-update | Response | Full version details (500+ chars) |
| 5 | T+1200ms | artifact-update | Result marker | Empty string, lastChunk: true |
| 6 | T+1250ms | status-update | Completion | final: true, state: "completed" |
### 4. Two Separate Processes

- **Supervisor (port 8000):** `platform-engineer-p2p` service
  - Files: `agent.py`, `a2a_remote_agent_connect.py`
  - Role: Orchestrates sub-agents, processes end-user requests
- **Sub-Agent (port 8001):** `agent-argocd-p2p` service (example)
  - Files: `base_strands_agent.py`
  - Role: Executes domain-specific tools, generates detailed status updates
## Debugging Commands

### Restart Services

```bash
docker restart platform-engineer-p2p
docker logs platform-engineer-p2p --tail 50
```

### Test Supervisor

```bash
curl -X POST http://10.99.255.178:8000 \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  -d '{"id":"test","method":"message/stream","params":{"message":{"role":"user","parts":[{"kind":"text","text":"show argocd version"}],"messageId":"msg-test"}}}' \
  | head -40
```

### Test Sub-Agent Directly

```bash
curl -X POST http://10.99.255.178:8001 \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  -d '{"id":"test","method":"message/stream","params":{"message":{"role":"user","parts":[{"kind":"text","text":"show version"}],"messageId":"msg-test"}}}' \
  | head -40
```

### Check Logs for Custom Events

```bash
docker logs platform-engineer-p2p 2>&1 | tail -100 | grep -E "custom|a2a_event"
```
## Related Documentation

### Files to Reference

- **Architecture Diagram:** `docs/docs/changes/2024-10-25-sub-agent-tool-message-streaming.md`
  - Comprehensive Mermaid diagram showing the event flow
  - A2A event type specifications
  - Protocol communication details
- **Previous Work:** `docs/docs/changes/2024-10-22-a2a-intermediate-states.md`
  - Background on the A2A protocol
- **Prompt Config:** `charts/ai-platform-engineering/data/prompt_config.deep_agent.yaml`
  - System prompt for the Deep Agent ("Querying" instructions removed)

### Docker Configuration

`docker-compose.dev.yaml` line 11: volume mount for the prompt config

```yaml
platform-engineer-p2p:
  volumes:
    - ./charts/ai-platform-engineering/data/prompt_config.deep_agent.yaml:/app/prompt_config.yaml
```
## 💡 Important Context

### Why This Fix Was Needed

Users could only see:

- ✅ Supervisor-level tool calls: `🔧 Calling argocd...`
- ❌ Not visible: sub-agent-level tool calls such as `🔧 Calling tool: **version_service__version**`

This lack of visibility made debugging difficult when sub-agents had issues.

### What This Fix Enables

- ✅ Complete transparency into sub-agent operations
- ✅ Better debugging when tools fail
- ✅ Real-time progress updates from sub-agents
- ✅ No performance degradation (token-level streaming preserved)
### Alternative Approaches Considered

- ❌ Add an `on_custom` handler to `astream_events` - not possible; `astream_events` ignores custom events
- ❌ Use the fallback mode as the primary path - too risky; the fallback is for errors
- ✅ Switch to `astream` with `stream_mode=['messages', 'custom']` - clean solution that works
## Lessons Learned

- **LangGraph Streaming Architecture:** Two fundamentally different modes with different capabilities
- **Custom Events:** Must use `astream` with `'custom'` mode, not `astream_events`
- **Double Streaming:** Be careful not to stream both raw and processed data
- **Message-Based vs Event-Based:** When using `astream`, process messages, not events
- **Testing Is Critical:** The raw JSON in the output was only caught through end-to-end testing
## Quick Links

- **Supervisor Container:** `docker exec -it platform-engineer-p2p bash`
- **Sub-Agent Container:** `docker exec -it agent-argocd-p2p bash`
- **Logs:** `docker logs -f platform-engineer-p2p`
- **Documentation:** `docs/docs/changes/2024-10-25-sub-agent-tool-message-streaming.md`
## ✅ TODO Status

**Completed:**

- Switch supervisor from `astream_events` to `astream` with custom mode
- Remove raw JSON streaming from `a2a_remote_agent_connect.py`
- Update Mermaid diagram to show the working flow
- Test and verify that sub-agent tool messages stream to users

**Pending:**

- Commit all changes
- Add `on_tool_start` logic to `base_langgraph_agent.py` for "Querying" announcements

End of Session Context