# Architecture: Chat Session Context - Sub-Agent Tool Message Streaming Fix

Date: 2024-10-25

## 🎯 Mission Accomplished
Successfully implemented streaming of sub-agent tool messages from the sub-agents (port 8001) through the supervisor (port 8000) to end users. Sub-agent tool details such as `🔧 Calling tool: **version_service__version**` and `✅ Tool **version_service__version** completed` are now visible in real time.
## 🔧 Changes Made

### 1. Supervisor Agent: Switch to `astream` with Custom Mode

File: `ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent.py`
Key Changes:

- Line 4: Added `import asyncio` for `CancelledError` handling
- Lines 74-79: Changed from `astream_events` to `astream` with `stream_mode=['messages', 'custom']`:

  ```python
  # OLD (doesn't capture custom events):
  async for event in self.graph.astream_events(inputs, config, version="v2"):
      event_type = event.get("event")

  # NEW (captures both messages and custom events):
  async for item_type, item in self.graph.astream(inputs, config, stream_mode=['messages', 'custom']):
  ```
- Lines 81-91: Added custom event handler:

  ```python
  # Handle custom A2A event payloads from sub-agents
  if item_type == 'custom' and isinstance(item, dict) and item.get("type") == "a2a_event":
      custom_text = item.get("data", "")
      if custom_text:
          logging.info(f"Processing custom a2a_event from sub-agent: {len(custom_text)} chars")
          yield {
              "is_task_complete": False,
              "require_user_input": False,
              "content": custom_text,
          }
      continue
  ```
- Lines 93-99: Added message stream filtering
- Lines 101-145: Changed from event-based to message-based processing:
  - `on_chat_model_stream` → `isinstance(message, AIMessageChunk)`
  - `on_tool_start` → `isinstance(message, AIMessage)` with `tool_calls`
  - `on_tool_end` → `isinstance(message, ToolMessage)`
- Lines 195-197: Added `asyncio.CancelledError` handling:

  ```python
  except asyncio.CancelledError:
      logging.info("Primary stream cancelled by client disconnection")
      return
  ```
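The message-based dispatch described above can be sketched as a self-contained simulation. The stand-in dataclasses below mimic `langchain_core`'s `AIMessageChunk`/`AIMessage`/`ToolMessage`, and `fake_astream` mimics the `(item_type, item)` tuples that `astream` yields when multiple stream modes are requested; none of this imports the real libraries, so treat it as an illustration of the control flow, not the actual implementation.

```python
import asyncio
from dataclasses import dataclass, field

# Stand-ins for langchain_core message types (assumption: names only).
@dataclass
class AIMessageChunk:
    content: str

@dataclass
class AIMessage:
    content: str
    tool_calls: list = field(default_factory=list)

@dataclass
class ToolMessage:
    content: str
    name: str = ""

async def fake_astream():
    """Simulates graph.astream(..., stream_mode=['messages', 'custom']),
    which yields (mode, payload) tuples."""
    yield ("messages", (AIMessageChunk("Checking "), {}))
    yield ("custom", {"type": "a2a_event", "data": "🔧 Calling tool: version_service__version"})
    yield ("messages", (ToolMessage("v2.9.3", name="version_service__version"), {}))

async def stream_to_user():
    events = []
    async for item_type, item in fake_astream():
        # Custom A2A event payloads forwarded from sub-agents
        if item_type == "custom" and isinstance(item, dict) and item.get("type") == "a2a_event":
            if item.get("data"):
                events.append({"is_task_complete": False, "content": item["data"]})
            continue
        # 'messages' mode yields (message, metadata) tuples
        message, _meta = item
        if isinstance(message, AIMessageChunk):                      # was: on_chat_model_stream
            events.append({"is_task_complete": False, "content": message.content})
        elif isinstance(message, AIMessage) and message.tool_calls:  # was: on_tool_start
            events.append({"is_task_complete": False, "content": "🔧 Calling tool"})
        elif isinstance(message, ToolMessage):                       # was: on_tool_end
            events.append({"is_task_complete": False,
                           "content": f"✅ Tool {message.name} completed"})
    return events

events = asyncio.run(stream_to_user())
```

The key point is the early `continue` on the custom branch: custom payloads are yielded verbatim and never fall through to the message-type checks.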
### 2. A2A Client: Remove Raw JSON Streaming

File: `ai_platform_engineering/utils/a2a_common/a2a_remote_agent_connect.py`
Key Change:

- Line 206: Removed the raw JSON streaming that was causing duplicate output:

  ```python
  # OLD (caused raw JSON to appear):
  writer({"type": "a2a_event", "data": chunk_dump})

  # NEW: don't stream raw chunk_dump - extracted text is streamed at line 251
  ```
- Line 251: This existing line now does the clean streaming:

  ```python
  writer({"type": "a2a_event", "data": text})  # Only clean text, not raw JSON
  ```
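A minimal sketch of the extraction step that feeds line 251, assuming the `status.message.parts[0].text` payload shape described in the event flow below; the helper name and field layout are illustrative, not the actual code in `a2a_remote_agent_connect.py`.

```python
from typing import Optional

def extract_status_text(chunk: dict) -> Optional[str]:
    """Pull the human-readable text out of an A2A status-update payload.
    Field layout (status.message.parts[0].text) follows these notes;
    treat it as illustrative, not normative."""
    try:
        parts = chunk["status"]["message"]["parts"]
        return parts[0].get("text") or None
    except (KeyError, IndexError, TypeError):
        return None

emitted = []
def writer(payload: dict) -> None:
    # Stand-in for the langgraph get_stream_writer() callable
    emitted.append(payload)

chunk = {
    "kind": "status-update",
    "status": {"message": {"parts": [
        {"kind": "text", "text": "🔧 Calling tool: version_service__version"}
    ]}},
}

text = extract_status_text(chunk)
if text:
    writer({"type": "a2a_event", "data": text})  # clean text only, never raw JSON
```

Guarding on `if text:` is what keeps empty or malformed chunks from producing blank custom events downstream.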
## 📊 Architecture Understanding

### The Problem (Before Fix)

- Primary Streaming Mode: `astream_events` with `version="v2"`
  - ✅ Captures: `on_chat_model_stream`, `on_tool_start`, `on_tool_end`
  - ❌ Ignores: custom events from `get_stream_writer()`
- Fallback Mode: `astream` with `stream_mode=['messages', 'custom', 'updates']`
  - ✅ Captures: custom events
  - ⚠️ Only triggered on exceptions (never used in normal flow)
### The Solution (After Fix)

- Primary Streaming Mode: `astream` with `stream_mode=['messages', 'custom']`
  - ✅ Captures: `AIMessageChunk` for token streaming
  - ✅ Captures: custom events with `item_type == 'custom'`
  - ✅ Captures: `AIMessage` with `tool_calls` for tool start
  - ✅ Captures: `ToolMessage` for tool completion
- Event Flow:

  ```
  Sub-Agent (8001)
  → Generates status-update events with tool messages
  → A2A Client (a2a_remote_agent_connect.py line 251)
  → Extracts text from status.message.parts[0].text
  → Calls writer({"type": "a2a_event", "data": text})
  → get_stream_writer() emits custom event
  → Supervisor astream with 'custom' mode (agent.py line 82)
  → Yields content to end user
  → Clean text appears in SSE stream ✅
  ```
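The whole pipeline can be condensed into a short end-to-end simulation: a list of hypothetical sub-agent events (modeled on the live-capture timeline later in these notes), a client stage that forwards only non-final status-update text, and a supervisor stage that wraps each forwarded payload as an SSE data line. Event shapes and stage names are assumptions for illustration.

```python
# Hypothetical sub-agent event sequence (shapes modeled on these notes).
sub_agent_events = [
    {"kind": "task", "status": {"state": "submitted"}},
    {"kind": "status-update", "final": False,
     "status": {"message": {"parts": [{"text": "🔧 Calling tool: version_service__version"}]}}},
    {"kind": "status-update", "final": False,
     "status": {"message": {"parts": [{"text": "✅ Tool version_service__version completed"}]}}},
    {"kind": "status-update", "final": True,
     "status": {"state": "completed", "message": {"parts": []}}},
]

def client_to_supervisor(events):
    """A2A client side: forward only non-final status-update text as custom events."""
    for ev in events:
        if ev.get("kind") != "status-update" or ev.get("final"):
            continue
        parts = ev.get("status", {}).get("message", {}).get("parts", [])
        if parts and parts[0].get("text"):
            yield {"type": "a2a_event", "data": parts[0]["text"]}

def supervisor_to_sse(custom_events):
    """Supervisor side: wrap each custom payload as an SSE data line."""
    return [f"data: {ce['data']}" for ce in custom_events if ce.get("type") == "a2a_event"]

sse_lines = supervisor_to_sse(client_to_supervisor(sub_agent_events))
```

Note how the `task` event and the `final: true` completion event never reach the user: only the intermediate tool messages are surfaced.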
## 📝 Files Modified (Not Yet Committed)

Modified Files:

- `ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent.py`
  - Added `asyncio` import
  - Switched from `astream_events` to `astream`
  - Added custom event handler
  - Converted event handlers to message-based processing
- `ai_platform_engineering/utils/a2a_common/a2a_remote_agent_connect.py`
  - Removed line 206, which was streaming raw JSON
- `docs/docs/changes/2024-10-25-sub-agent-tool-message-streaming.md`
  - Updated Mermaid diagram to show the working flow
  - Changed broken paths to working paths
  - Updated "What User Sees" section to show all ✅
Previously Committed:

```shell
git commit -m "Add querying announcement detection and _get_tool_purpose to supervisor agent"
# Committed: 10 files changed, 887 insertions(+), 72 deletions(-)
```
## 🚀 Next Steps (When You Resume)

Immediate:

- Commit the fix:

  ```shell
  git add ai_platform_engineering/multi_agents/platform_engineer/protocol_bindings/a2a/agent.py
  git add ai_platform_engineering/utils/a2a_common/a2a_remote_agent_connect.py
  git add docs/docs/changes/2024-10-25-sub-agent-tool-message-streaming.md
  git commit -m "Fix sub-agent tool message streaming to end users

  - Switched supervisor from astream_events to astream with custom mode
  - Added custom event handler to process a2a_event types from sub-agents
  - Removed raw JSON streaming from a2a_remote_agent_connect.py line 206
  - Sub-agent tool messages now visible to end users for better transparency
  - Token-level streaming still intact via AIMessageChunk
  - Updated documentation with working architecture diagram"
  ```
- Test edge cases (optional):
  - Multiple sub-agent calls in parallel
  - Sub-agent errors and how they stream
  - Long-running tool calls
- Update documentation (optional):
  - Add a "Solution Implemented" section to the markdown doc
  - Document the before/after behavior
  - Add a troubleshooting guide
Future Work (from TODO list):
- Add on_tool_start logic to base_langgraph_agent.py (pending)
- Generate 🔍 Querying announcements programmatically
  - Currently using LLM-generated announcements
## 🔍 Key Technical Discoveries

### 1. LangGraph Streaming Modes

- `astream_events`: does NOT process custom events from `get_stream_writer()`
- `astream` with `stream_mode=['messages', 'custom']`: DOES process custom events
- Custom events must be checked with `item_type == 'custom'`
### 2. A2A Event Types

- `task`: initial request (state: submitted)
- `status-update`: progress notifications (final: false/true; contains `message.parts[].text`)
- `artifact-update`: content streaming (append: true/false; contains `parts[].text`)
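The three event kinds above can be folded into client-side state with a small reducer. The exact field layout used here (`status.message.parts`, `artifact.parts`, the `append` flag) is an assumption based on these notes, not a reading of the A2A spec.

```python
def apply_event(state: dict, event: dict) -> dict:
    """Fold one A2A event into accumulated client state.
    Shapes follow the three kinds listed above; field names are illustrative."""
    kind = event.get("kind")
    if kind == "task":
        state["state"] = event.get("status", {}).get("state", "submitted")
    elif kind == "status-update":
        parts = event.get("status", {}).get("message", {}).get("parts", [])
        if parts and parts[0].get("text"):
            state["progress"].append(parts[0]["text"])
        if event.get("final"):
            state["state"] = event.get("status", {}).get("state", "completed")
    elif kind == "artifact-update":
        text = "".join(p.get("text", "") for p in event.get("artifact", {}).get("parts", []))
        # append=False starts a fresh artifact; append=True extends it
        state["artifact"] = state["artifact"] + text if event.get("append") else text
    return state

state = {"state": None, "progress": [], "artifact": ""}
for ev in [
    {"kind": "task", "status": {"state": "submitted"}},
    {"kind": "status-update", "final": False,
     "status": {"message": {"parts": [{"text": "🔧 Calling tool"}]}}},
    {"kind": "artifact-update", "append": False, "artifact": {"parts": [{"text": "v2."}]}},
    {"kind": "artifact-update", "append": True, "artifact": {"parts": [{"text": "9.3"}]}},
    {"kind": "status-update", "final": True,
     "status": {"state": "completed", "message": {"parts": []}}},
]:
    state = apply_event(state, ev)
```

Distinguishing `append: true` from `append: false` is what lets a client rebuild a multi-chunk artifact without duplicating earlier chunks.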
### 3. Event Flow Timeline (from live capture)
| # | Time | Event Type | Purpose | Text Content |
|---|---|---|---|---|
| 1 | T+0ms | task | Initialize | state: "submitted" |
| 2 | T+500ms | status-update | Tool start | "🔧 Calling tool: version_service__version" |
| 3 | T+800ms | status-update | Tool complete | "✅ Tool version_service__version completed" |
| 4 | T+1000ms | status-update | Response | Full version details (500+ chars) |
| 5 | T+1200ms | artifact-update | Result marker | Empty string, lastChunk: true |
| 6 | T+1250ms | status-update | Completion | final: true, state: "completed" |
### 4. Two Separate Processes

- Supervisor (port 8000): the `platform-engineer-p2p` service
  - Files: `agent.py`, `a2a_remote_agent_connect.py`
  - Role: orchestrates sub-agents, processes end-user requests
- Sub-Agent (port 8001): the `agent-argocd-p2p` service (example)
  - Files: `base_strands_agent.py`
  - Role: executes domain-specific tools, generates detailed status updates
## 🐛 Debugging Commands

Restart Services:

```shell
docker restart platform-engineer-p2p
docker logs platform-engineer-p2p --tail 50
```
Test Supervisor:

```shell
curl -X POST http://10.99.255.178:8000 \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  -d '{"id":"test","method":"message/stream","params":{"message":{"role":"user","parts":[{"kind":"text","text":"show argocd version"}],"messageId":"msg-test"}}}' \
  | head -40
```
Test Sub-Agent Directly:

```shell
curl -X POST http://10.99.255.178:8001 \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  -d '{"id":"test","method":"message/stream","params":{"message":{"role":"user","parts":[{"kind":"text","text":"show version"}],"messageId":"msg-test"}}}' \
  | head -40
```
Check Logs for Custom Events:

```shell
docker logs platform-engineer-p2p 2>&1 | tail -100 | grep -E "custom|a2a_event"
```
## 💡 Important Context

Why This Fix Was Needed:

Users could see only the supervisor-level tool calls (`🔧 Calling argocd...`), not the sub-agent-level tool calls (`🔧 Calling tool: **version_service__version**`). This lack of visibility made debugging difficult when sub-agents had issues.
What This Fix Enables:
- ✅ Complete transparency into sub-agent operations
- ✅ Better debugging when tools fail
- ✅ Real-time progress updates from sub-agents
- ✅ No performance degradation (still token-level streaming)
Alternative Approaches Considered:

- ❌ Add an `on_custom` handler to `astream_events`: not possible, `astream_events` ignores custom events
- ❌ Use the fallback mode as primary: too risky, the fallback path is reserved for errors
- ✅ Switch to `astream` with `stream_mode=['messages', 'custom']`: clean solution that works
## 🎓 Lessons Learned

- LangGraph Streaming Architecture: two fundamentally different modes with different capabilities
- Custom Events: must use `astream` with 'custom' mode, not `astream_events`
- Double Streaming: be careful not to stream both raw and processed data
- Message-Based vs Event-Based: when using `astream`, process messages, not events
- Testing is Critical: the raw JSON in the output was only caught through end-to-end testing
## 🔗 Quick Links

- Supervisor Container: `docker exec -it platform-engineer-p2p bash`
- Sub-Agent Container: `docker exec -it agent-argocd-p2p bash`
- Logs: `docker logs -f platform-engineer-p2p`
- Documentation: 2024-10-25-sub-agent-tool-message-streaming
## Related

- Spec: spec.md