# Feature Specification: Unify Single-Node (All-in-One) and Distributed A2A Binding Layer
Feature Branch: 098-unify-single-distributed-binding
Created: 2026-04-08
Status: Implemented (per-agent distribution in progress)
Input: Unify the single-node (all-in-one) and distributed supervisor deployment modes into a single codebase, eliminating parallel file implementations. Add per-agent distribution control via DISTRIBUTED_AGENTS.
## User Scenarios & Testing (mandatory)
### User Story 1 - Single Codebase for All Deployments (Priority: P1)
As a platform engineer deploying the AI Platform Engineering supervisor, I want a single set of source files that works for both single-node (all-in-one) and distributed (remote A2A agents) modes, so that I don't have to maintain or reason about two parallel implementations.
Why this priority: Eliminates ~4,700 lines of duplicated code across 4 parallel file pairs. Reduces the risk of behavioral drift where fixes applied to one mode are missed in the other (as was the case with "task→task" notifications and missing execution plans).
Independent Test: Deploy the supervisor with DISTRIBUTED_MODE=false (single-node / all-in-one) and verify subagents load MCP tools in-process. Then deploy with DISTRIBUTED_MODE=true and verify subagents connect to remote A2A containers. Both should stream execution plans, correct tool notifications, and HITL forms.
Acceptance Scenarios:

- Given a supervisor deployed with `DISTRIBUTED_MODE=false`, When a user asks "list my Jira tickets", Then the Jira subagent loads MCP tools in-process and returns results with proper "jira" tool notifications (not "task→task").
- Given a supervisor deployed with `DISTRIBUTED_MODE=true`, When a user asks "list my Jira tickets", Then the Jira subagent connects to the remote A2A Jira container and returns results with proper "jira" tool notifications.
- Given either deployment mode, When the supervisor starts a multi-step task, Then the `write_todos` execution plan is displayed to the user with agent-tagged steps (e.g., `[Jira] Search for tickets`).
### User Story 5 - Per-Agent Distribution Control (Priority: P2)
As a platform engineer, I want to choose which specific agents run as remote A2A containers and which run in-process, so that I can progressively migrate agents to distributed mode without an all-or-nothing switch.
Why this priority: The binary DISTRIBUTED_MODE toggle is too coarse. In practice, some agents (e.g., ArgoCD managing 800+ applications) benefit from running in their own container with dedicated memory, while lightweight agents (e.g., Jira, GitHub) can run in-process to reduce infrastructure cost and latency.
Design: A single comma-separated env var `DISTRIBUTED_AGENTS` lists agents that should run as remote A2A subagents. All other enabled agents load MCP tools in-process. The special value `all` distributes every agent (equivalent to the legacy `DISTRIBUTED_MODE=true`).

```bash
# Only ArgoCD and AWS run remotely; Jira, GitHub, etc. run in-process
DISTRIBUTED_AGENTS=argocd,aws

# All agents distributed (same as legacy DISTRIBUTED_MODE=true)
DISTRIBUTED_AGENTS=all

# All agents in-process (default when unset)
# DISTRIBUTED_AGENTS=
```
Backward compatibility: DISTRIBUTED_MODE=true is treated as DISTRIBUTED_AGENTS=all. If both are set, DISTRIBUTED_AGENTS takes precedence (more specific wins).
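The resolution order described above can be sketched in a few lines. This is an illustrative sketch, not the shipped implementation: the function name `resolve_distributed_agents` and the exact normalization are assumptions; only the env var names and the precedence rules come from the spec.

```python
import os

def resolve_distributed_agents(enabled_agents):
    """Illustrative: decide which enabled agents run as remote A2A subagents.

    `enabled_agents` is assumed to already be filtered by the ENABLE_* flags,
    so enable/disable takes precedence over distribution mode.
    """
    raw = os.getenv("DISTRIBUTED_AGENTS", "").strip()
    if not raw:
        # Legacy fallback: DISTRIBUTED_MODE=true means "distribute everything".
        if os.getenv("DISTRIBUTED_MODE", "").lower() == "true":
            return set(enabled_agents)
        return set()  # default: all agents in-process
    # DISTRIBUTED_AGENTS is set, so it wins over DISTRIBUTED_MODE.
    if raw.lower() == "all":
        return set(enabled_agents)
    requested = {name.strip().lower() for name in raw.split(",") if name.strip()}
    # Only enabled agents can be distributed; unknown names are ignored.
    return requested & {a.lower() for a in enabled_agents}
```

Note how `DISTRIBUTED_MODE` is consulted only when `DISTRIBUTED_AGENTS` is empty, which encodes the "more specific wins" rule.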
Independent Test: Set DISTRIBUTED_AGENTS=argocd and verify ArgoCD connects via remote A2A while Jira loads MCP tools in-process, within the same supervisor instance.
Acceptance Scenarios:

- Given `DISTRIBUTED_AGENTS=argocd`, When the supervisor initializes, Then ArgoCD is created as a remote A2A subagent and Jira/GitHub/etc. load MCP tools in-process.
- Given `DISTRIBUTED_AGENTS=all`, When the supervisor initializes, Then all agents are created as remote A2A subagents (same behavior as `DISTRIBUTED_MODE=true`).
- Given `DISTRIBUTED_AGENTS` is unset and `DISTRIBUTED_MODE` is unset, When the supervisor initializes, Then all agents load MCP tools in-process (fully single-node / all-in-one).
- Given `DISTRIBUTED_AGENTS=argocd` and `ENABLE_ARGOCD=false`, When the supervisor initializes, Then ArgoCD is skipped entirely (enable/disable takes precedence over distribution mode).
### User Story 2 - Correct Tool Notifications in All Modes (Priority: P1)
As a user interacting with the platform via Slack or the web UI, I want to see which specific agent is handling my request (e.g., "Calling Agent Jira...") instead of generic "task→task" labels, so that I understand what's happening.
Why this priority: This was the original bug that motivated the unification. The distributed binding (`agent.py`) displayed "task→task" while the single-node (all-in-one) binding (`agent_single.py`) correctly extracted `subagent_type` from the `task()` tool call arguments.
Independent Test: Send a multi-agent query and verify tool notification text contains the actual subagent name, not "task".
Acceptance Scenarios:
- Given a user asks to compare Jira and GitHub issues, When the supervisor delegates to subagents, Then streaming notifications show "Calling Agent Jira..." and "Calling Agent Github..." (not "Calling Agent Task...").
- Given a `task()` tool call with `subagent_type: "github"` in its arguments, When the binding processes the tool call, Then it extracts "github" from the arguments and uses it as the `source_agent` in the notification artifact.
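The extraction rule in the scenario above amounts to a small helper. This sketch is illustrative: the function name `resolve_source_agent` is hypothetical; the `task` tool name and `subagent_type` argument are from the spec.

```python
def resolve_source_agent(tool_name, tool_args):
    """Illustrative: prefer the subagent_type argument of a task() call over
    the generic tool name, so notifications read "jira", not "task"."""
    if tool_name == "task":
        subagent = (tool_args or {}).get("subagent_type")
        if subagent:
            return subagent
    # Non-delegating tools (and malformed task calls) keep their own name.
    return tool_name
```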
### User Story 3 - HITL Form Support Across All Modes (Priority: P2)
As a user invoking a self-service workflow (e.g., "Create GitHub Repo"), I want the system to present a Human-in-the-Loop input form so I can provide required parameters, regardless of whether the supervisor is running in single-node (all-in-one) or distributed mode.
Why this priority: HITL/interrupt support was only implemented in the single-node (all-in-one) binding. Distributed deployments lacked GraphInterrupt handling, Command resume, and form-based user input.
Independent Test: Invoke a self-service workflow and verify the HITL form is presented, user can submit values, and the workflow resumes with submitted data.
Acceptance Scenarios:
- Given a user invokes the "Create GitHub Repo" workflow, When the CAIPE subagent needs input, Then a structured form (`CAIPEAgentResponse`) is presented to the user via the A2A `input_required` state.
- Given the user submits form values via A2A DataPart resume, When the executor receives the resume command, Then it constructs a `Command` and the graph resumes from the interrupt point.
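The resume path can be sketched as follows. This is a hedged illustration: `Command` below is a local stand-in for LangGraph's `Command` type (whose `resume` payload drives continuation from an interrupt), and `build_resume_command` is a hypothetical helper showing how A2A DataPart values might be merged into that payload.

```python
from dataclasses import dataclass, field

@dataclass
class Command:
    # Stand-in for LangGraph's Command; `resume` carries the user's form values
    # back into the interrupted graph.
    resume: dict = field(default_factory=dict)

def build_resume_command(data_parts):
    """Illustrative: merge A2A DataPart payloads into a single resume dict.

    Each part is assumed to be a dict with a "data" mapping, per the A2A
    DataPart shape; later parts overwrite earlier keys.
    """
    values = {}
    for part in data_parts:
        values.update(part.get("data", {}))
    return Command(resume=values)
```

In the real executor the resulting command would be passed back into the graph run so execution continues from the interrupt point rather than restarting.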
### User Story 4 - Skills Middleware Available in Unified Deployment (Priority: P3)
As a platform administrator, I want the skills middleware REST API to be available alongside the A2A routes in both deployment modes, so that skill catalog management works regardless of how the supervisor is deployed.
Why this priority: The skills router mount was only in the distributed `main.py`; the single-node (all-in-one) `main_single.py` didn't mount it.
Independent Test: After starting the supervisor, call the skills middleware endpoints and verify they respond.
Acceptance Scenarios:
- Given the unified `main.py`, When the server starts, Then the skills middleware router is mounted at `/` after A2A routes.
- Given a request to a skills endpoint, When it doesn't match any A2A route, Then it falls through to the skills middleware FastAPI sub-app.
### User Story 6 - RAG Tool Reliability: Call Caps, Output Truncation, and Context Protection (Priority: P1)
As a user querying the knowledge base via the platform, I want RAG tool calls to be capped and their output truncated so that queries complete successfully instead of hitting LangGraph recursion limits or context window overflow.
Why this priority: Without caps, the model enters runaway loops, calling `fetch_document` or `search` indefinitely until the recursion limit (previously 100) is hit, producing the generic error "I ran into an issue while processing your request." Output truncation prevents a single RAG result from consuming the entire context window (~128K tokens).
Design: Two wrapper classes (`FetchDocumentCapWrapper`, `SearchCapWrapper`) enforce per-query call limits. A shared `_CapCounterMixin` provides thread-safe counting keyed by LangGraph `thread_id`. When the cap is hit, the wrappers return a normal-looking success string (not an exception) instructing the model to synthesize from retrieved content. Per-call output is truncated to `RAG_MAX_OUTPUT_CHARS` (default 10K chars). Search results per call are capped to `RAG_MAX_SEARCH_RESULTS` (default 3).
Key design decision: the wrappers return a success string instead of raising `ToolInvocationError`. Raising exceptions created `is_error=True` ToolMessages, which the model interpreted as "this document failed, try the next one", defeating the cap. A normal-looking response signals completion, not failure.
Environment variables:

- `FETCH_DOCUMENT_MAX_CALLS` (default 10): max fetch_document calls per query
- `SEARCH_MAX_CALLS` (default 5): max search calls per query
- `RAG_MAX_OUTPUT_CHARS` (default 10000): per-call output character limit
- `RAG_MAX_SEARCH_RESULTS` (default 3): max results returned per search call
- `LANGGRAPH_RECURSION_LIMIT` (default 500): configurable LangGraph recursion limit
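The cap-and-truncate behavior can be sketched in plain Python. This is a minimal illustration of the design, not the shipped `FetchDocumentCapWrapper`: `CapCounter` and `capped_fetch` are hypothetical names; the env vars, the per-`thread_id` counting, the success-string-at-cap behavior, and the truncation suffix are from the spec.

```python
import os
import threading
from collections import defaultdict

class CapCounter:
    """Illustrative thread-safe per-thread_id call counter, in the spirit of
    the _CapCounterMixin described above."""
    def __init__(self):
        self._lock = threading.Lock()
        self._counts = defaultdict(int)

    def increment(self, thread_id):
        with self._lock:
            self._counts[thread_id] += 1
            return self._counts[thread_id]

MAX_CALLS = int(os.getenv("FETCH_DOCUMENT_MAX_CALLS", "10"))
MAX_CHARS = int(os.getenv("RAG_MAX_OUTPUT_CHARS", "10000"))

def capped_fetch(counter, thread_id, fetch_fn, *args):
    """Return a normal-looking success string at the cap (never an exception,
    so the model sees completion rather than failure), and truncate output."""
    if counter.increment(thread_id) > MAX_CALLS:
        return ("Document retrieval complete. Sufficient content has been "
                "gathered; synthesize the answer from what was retrieved.")
    output = fetch_fn(*args)
    if len(output) > MAX_CHARS:
        output = output[:MAX_CHARS] + "\n[Output truncated]"
    return output
```

Because the cap message looks like a successful tool result, the model treats it as a signal to stop fetching, rather than retrying the "failed" document.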
Independent Test: Ask a broad RAG query (e.g., "what is agntcy slim?") and verify the response completes within 60 seconds without hitting the recursion limit, and the answer is synthesized from retrieved documents.
Acceptance Scenarios:
- Given `FETCH_DOCUMENT_MAX_CALLS=10`, When a RAG query triggers more than 10 fetch_document calls, Then the 11th call returns a success message instructing synthesis, and the model stops calling fetch_document.
- Given `SEARCH_MAX_CALLS=5`, When a RAG query triggers more than 5 search calls, Then the 6th call returns a success message and the model synthesizes from existing results.
- Given `RAG_MAX_OUTPUT_CHARS=10000`, When a single fetch_document result exceeds 10K characters, Then the output is truncated with a "[Output truncated]" suffix.
- Given `RAG_MAX_SEARCH_RESULTS=3`, When the model requests `limit=10` in a search call, Then the wrapper overrides it to 3 before calling the underlying tool.
- Given `LANGGRAPH_RECURSION_LIMIT=500`, When a runaway loop occurs despite caps, Then the error is caught via a `GraphRecursionError` isinstance check and handled gracefully.
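The isinstance-with-string-fallback detection from the last scenario might look like the following sketch. The helper name `is_recursion_error` is an assumption; `langgraph.errors.GraphRecursionError` is the class the spec refers to, and the import is guarded so the fallback also covers environments where it cannot be imported.

```python
def is_recursion_error(exc):
    """Illustrative: prefer an isinstance check against GraphRecursionError,
    falling back to string matching for older LangGraph versions."""
    try:
        from langgraph.errors import GraphRecursionError
        if isinstance(exc, GraphRecursionError):
            return True
    except ImportError:
        pass
    # Fallback for versions where the class is absent or the error is wrapped.
    return "recursion limit" in str(exc).lower()
```

Catching this specifically lets the binding return an actionable message instead of the generic "I ran into an issue while processing your request."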
### User Story 7 - Slack Narrative Text Streaming (Priority: P1)
As a user interacting via Slack, I want to see the model's intermediate narration (e.g., "I'll search the knowledge base for relevant information") in real-time, so that I understand what the system is doing between tool calls.
Why this priority: Previously, ALL text during intermediate plan steps was silently accumulated in `step_thinking` and never displayed. Users only saw "Thinking..." followed by the final answer, with no visibility into the multi-step process.
Design: Removed the `continue` statement that suppressed intermediate step STREAMING_RESULT events. Narrative text now falls through to the Slack streaming code. Post-tool echo suppression is handled separately by the `any_subagent_completed` flag, which only activates after non-RAG tools complete.
Key distinction: RAG tools (`search`, `fetch_document`, `list_datasources`, `fetch_url`) are excluded from the sub-agent echo suppression because their post-tool STREAMING_RESULT is itself the synthesized answer.
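The suppression rule can be condensed into a single predicate. This is a simplified sketch of the decision, not the Slack bot's actual control flow: `should_stream_text` and its parameters are hypothetical; the RAG tool set, the `any_subagent_completed` flag, and the event type names are from the spec.

```python
RAG_TOOLS = {"search", "fetch_document", "list_datasources", "fetch_url"}

def should_stream_text(event_type, last_completed_tool, any_subagent_completed):
    """Illustrative gate for Slack streaming: suppress post-tool echo only
    after non-RAG (sub-agent) tools; RAG post-tool text IS the answer."""
    if event_type == "FINAL_RESULT":
        return True  # the final answer always renders
    if event_type != "STREAMING_RESULT":
        return False
    if any_subagent_completed and last_completed_tool not in RAG_TOOLS:
        return False  # sub-agent echo: suppress to avoid duplicated answers
    return True  # narrative and RAG synthesis stream in real-time
```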
Acceptance Scenarios:
- Given a RAG query with a plan, When the model generates narrative text before calling search/fetch_document, Then the narrative appears in the Slack stream in real-time.
- Given a sub-agent query (e.g., GitHub), When a non-RAG tool completes, Then post-tool echo text is suppressed (not shown in Slack), but the final FINAL_RESULT answer is displayed.
- Given a mixed query (RAG + sub-agent), When the RAG step produces narrative, Then the narrative streams to the user even after a sub-agent tool has completed in a prior step.
### Edge Cases
- What happens when `DISTRIBUTED_MODE=true` but a remote agent container is unreachable? The system creates the remote A2A subagent definition anyway; the `A2ARemoteAgentConnectTool` fetches the agent card on first use and surfaces the connection error to the LLM.
- What happens when an agent's MCP server fails to load tools in single-node (all-in-one) mode? The subagent is created with an empty tool list and a warning is logged. The LLM will be informed the subagent has no domain tools.
- What happens when the `LANGGRAPH_DEV` environment variable is set? The checkpointer attachment is skipped, allowing LangGraph Studio to manage its own checkpointer.
- What happens when `DISTRIBUTED_AGENTS=argocd` but `ENABLE_ARGOCD=false`? ArgoCD is skipped entirely. The `ENABLE_*` flags are evaluated first; only enabled agents are considered for distribution mode selection.
- What happens when both `DISTRIBUTED_MODE=true` and `DISTRIBUTED_AGENTS=argocd` are set? `DISTRIBUTED_AGENTS` takes precedence (more specific wins), so only ArgoCD runs remotely.
## Clarifications

### Session 2026-04-08
- Q: Should RAG reliability (call caps, output truncation, SearchCapWrapper) be a separate user story, edge case, or separate spec? → A: New User Story within this spec (Option A)
- Q: What test level should the harness target for all-in-one, distributed, Slack, and streaming? → A: Both: unit/component tests (mocked deps, fast CI) + lightweight integration suite (Docker Compose test profile)
- Q: How should streaming event assertions work in unit tests? → A: Collect yielded events from the binding's async generator into a list; assert on artifact names/content/order
- Q: How should Slack message rendering be tested? → A: Both: StreamBuffer output for content/ordering logic + mock Slack WebClient for message lifecycle (create/update/finalize)
- Q: How should distributed mode be tested without real containers? → A: Both: mock HTTP responses (httpx patch for canned agent card + SSE streams) for fast unit tests + fake in-process A2A server (FastAPI with canned responses) for richer path testing
- Q: What RAG test fixture strategy should the harness use? → A: Both: mock BaseTool for wrapper cap/truncation/thread-scoping logic + JSON fixtures in `tests/fixtures/` for integration tests exercising the full RAG → binding → streaming pipeline
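The "collect yielded events into a list" pattern from the clarifications can be sketched like this. The `fake_binding_stream` generator and its event dicts are hypothetical stand-ins for the binding's async generator; only the collect-then-assert pattern itself is what the spec prescribes.

```python
import asyncio

async def fake_binding_stream():
    # Hypothetical stand-in for the binding's async generator of A2A events.
    yield {"artifact": "execution_plan", "content": "[Jira] Search for tickets"}
    yield {"artifact": "tool_notification", "source_agent": "jira"}
    yield {"artifact": "final_result", "content": "Found 3 tickets."}

async def collect(agen):
    """Drain an async generator into a list so ordering, artifact names, and
    content can be asserted synchronously in a unit test."""
    return [event async for event in agen]

events = asyncio.run(collect(fake_binding_stream()))
```

A test then asserts on `events` directly, e.g. that the notification's `source_agent` is the real subagent name rather than the generic "task".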
## Requirements (mandatory)

### Functional Requirements
- FR-001: The system MUST support both single-node (all-in-one, in-process MCP tools) and distributed (remote A2A agents) modes using the same source files, toggled by the `DISTRIBUTED_MODE` or `DISTRIBUTED_AGENTS` environment variables.
- FR-013: The system MUST support per-agent distribution via `DISTRIBUTED_AGENTS` (comma-separated list of agent names). Agents in the list run as remote A2A subagents; all others load MCP tools in-process. The value `all` distributes every agent.
- FR-014: `DISTRIBUTED_MODE=true` MUST be treated as `DISTRIBUTED_AGENTS=all` for backward compatibility. If both are set, `DISTRIBUTED_AGENTS` takes precedence.
- FR-002: The system MUST eliminate all `_single` variant files (`deep_agent_single.py`, `agent_single.py`, `agent_executor_single.py`, `main_single.py`) by merging their logic into the original filenames (`deep_agent.py`, `agent.py`, `agent_executor.py`, `main.py`).
- FR-003: The system MUST correctly extract the actual subagent name from `task()` tool calls (via `args.subagent_type`) for tool notification display, instead of showing the generic "task" tool name.
- FR-004: The system MUST support `GraphInterrupt` handling and `Command`-based resume for HITL workflows in both deployment modes.
- FR-005: The system MUST use Bedrock-compatible tool call ID extraction (`_extract_tool_call_ids`) in orphaned tool call repair, checking `tool_calls`, `additional_kwargs`, and content blocks.
- FR-006: The system MUST register the MAS instance with the skills middleware registry (`set_mas_instance`) during binding initialization.
- FR-007: The system MUST attempt persistent checkpointer/store backends (MongoDB, Redis) in both modes, falling back to `InMemorySaver` when unavailable.
- FR-008: The system MUST include `trace_id` in completion status metadata and final result artifacts for client-side feedback/scoring.
- FR-009: The system MUST mount the skills middleware REST API alongside A2A routes in the unified entry point.
- FR-010: The system MUST use lazy imports for agent classes in `deep_agent.py` to avoid requiring agent-specific PYTHONPATH at module import time (test compatibility).
- FR-011: Both the `platform-engineer` and `platform-engineer-single` CLI commands MUST point to the same unified `main:app` entry point.
- FR-012: The system MUST update all internal imports and external references from `_single` module paths to the original module paths.
- FR-015: The system MUST enforce per-query call caps on the `fetch_document` and `search` RAG tools via `FetchDocumentCapWrapper` and `SearchCapWrapper`, returning a normal success string (not an exception) when the cap is hit.
- FR-016: The system MUST truncate per-call RAG tool output to `RAG_MAX_OUTPUT_CHARS` to prevent context window overflow.
- FR-017: The system MUST cap per-call search results at `RAG_MAX_SEARCH_RESULTS` regardless of the model's requested `limit` parameter.
- FR-018: The system MUST make the LangGraph recursion limit configurable via the `LANGGRAPH_RECURSION_LIMIT` environment variable.
- FR-019: The system MUST detect `GraphRecursionError` via a proper `isinstance` check (with a string-match fallback for older LangGraph versions).
- FR-020: The Slack bot MUST stream intermediate plan step narrative text (e.g., "I'll search the knowledge base...") instead of silently accumulating it.
- FR-021: The Slack bot MUST distinguish RAG tools from sub-agent tools for post-tool echo suppression: RAG tool post-tool text MUST stream through; sub-agent post-tool text MUST be suppressed.
- FR-022: The system MUST auto-enable agent connectivity checks when `DISTRIBUTED_AGENTS` is set (non-empty), without requiring explicit `SKIP_AGENT_CONNECTIVITY_CHECK=false`.
- FR-023: Docker Compose MUST define a single `caipe-supervisor` service (not separate all-in-one and distributed services).
### Key Entities
- `PlatformEngineerDeepAgent` (alias `AIPlatformEngineerMAS`): The multi-agent system definition. Uses `DISTRIBUTED_AGENTS` (or legacy `DISTRIBUTED_MODE`) to determine per-agent distribution mode. Each agent is independently routed to in-process MCP or remote A2A based on the list.
- `AIPlatformEngineerA2ABinding`: The A2A protocol binding that translates LangGraph events to A2A streaming artifacts. Handles `task()` subagent resolution, execution plan tracking, and HITL interrupts.
- `AIPlatformEngineerA2AExecutor`: The A2A executor that manages task lifecycle (streaming, completion, error, user-input-required). Wraps `StreamState` for artifact dedup and trace propagation.
- `StreamState`: Dataclass tracking streaming state per request, including content accumulation, artifact IDs, sub-agent completion count, and trace ID.
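As a rough illustration, the `StreamState` dataclass might look like the following sketch. Field names beyond those the spec lists (content accumulation, artifact IDs, sub-agent completion count, trace ID) and the `record_artifact` helper are assumptions, not the actual definition.

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class StreamState:
    """Illustrative per-request streaming state (field names hypothetical)."""
    content: str = ""                               # accumulated streamed text
    artifact_ids: set = field(default_factory=set)  # seen IDs, for dedup
    subagents_completed: int = 0                    # sub-agent completion count
    trace_id: Optional[str] = None                  # propagated for feedback/scoring

    def record_artifact(self, artifact_id):
        """Return True if the artifact is new; False if it is a duplicate."""
        if artifact_id in self.artifact_ids:
            return False
        self.artifact_ids.add(artifact_id)
        return True
```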
## Success Criteria (mandatory)

### Measurable Outcomes
- SC-001: Only one file per layer exists after unification (4 files total instead of 8), with zero `_single` variant files remaining in the codebase.
- SC-002: All existing synchronous tests pass without modification (async test failures due to the missing `pytest-asyncio` dependency are pre-existing and not caused by unification).
- SC-003: Tool notifications in the Slack and web UI display the correct subagent name (e.g., "jira", "github") instead of "task" for 100% of delegated tool calls.
- SC-004: HITL forms are correctly presented and resume works for self-service workflows in both single-node (all-in-one) and distributed modes.
- SC-005: The supervisor starts successfully in both `DISTRIBUTED_MODE=true` and `DISTRIBUTED_MODE=false` configurations using the same Docker image and entry point.
- SC-006: All import paths that previously referenced `_single` modules continue to work (either via the unified file or because no external consumers existed).
- SC-007: Setting `DISTRIBUTED_AGENTS=argocd` results in ArgoCD running as a remote A2A subagent while all other enabled agents load MCP tools in-process within the same supervisor process.
- SC-008: Setting `DISTRIBUTED_AGENTS=all` produces identical behavior to the legacy `DISTRIBUTED_MODE=true`.
- SC-009: A broad RAG query (e.g., "what is agntcy slim?") completes within 60 seconds without hitting the LangGraph recursion limit, and returns a synthesized answer.
- SC-010: When `FETCH_DOCUMENT_MAX_CALLS=10`, the (N+1)th call returns a success message; the model stops calling `fetch_document` and synthesizes.
- SC-011: Narrative text like "I'll search the knowledge base for relevant information" appears in Slack in real-time during intermediate plan steps.
- SC-012: The final synthesized answer from the supervisor arrives in Slack as a non-empty `FINAL_RESULT` artifact.
## Implementation Progress

### Completed
| File | Change | Stories |
|---|---|---|
| `rag_tools.py` | `FetchDocumentCapWrapper` and `SearchCapWrapper` with success-string caps, output truncation, per-call search limit capping | US-6 |
| `deep_agent.py` | Wired cap wrappers into tool loading; env-configurable limits | US-6 |
| `agent.py` (a2a binding) | Configurable `LANGGRAPH_RECURSION_LIMIT`; `GraphRecursionError` isinstance detection | US-6 |
| `agent.py` (a2a binding) | Fix `final_response['content']` to include `response_format_result`; `final_model_content` fallback from ResponseFormat | US-7, SC-012 |
| `base_langgraph_agent.py` | Configurable recursion limit | US-6 |
| `prompt_config.deep_agent.yaml` | Updated RAG instructions: "up to 3-5 documents"; stop on cap messages | US-6 |
| `ai.py` (slack_bot) | Removed `continue` that suppressed intermediate narrative; RAG tool echo pass-through | US-7 |
| `docker-compose.dev.yaml` | Merged supervisor services; `DISTRIBUTED_AGENTS`, RAG env vars, `LANGGRAPH_RECURSION_LIMIT=500` | US-1, US-6 |
| `agent_registry.py` | Auto-enable connectivity check when `DISTRIBUTED_AGENTS` is set | US-1 |
| `test_final_result_content.py` | 26 tests: FINAL_RESULT content propagation, dedup survival, regression guards | US-7, SC-012 |
| `test_binding_streaming_events.py` | 11 tests: tool notification `source_agent`, execution plan, narrative streaming, event ordering | US-2, US-7 |
| `test_slack_narrative_streaming.py` | 11 tests: event type parsing, RAG tool exclusion, `_get_final_text` priority | US-7, SC-011 |
| `test_distributed_mode_binding.py` | 9 tests: `DISTRIBUTED_AGENTS` parsing edge cases, per-agent routing, `ENABLE_*` integration | US-5, SC-007 |
| `tests/fixtures/` | 4 JSON fixtures: rag_search_response, rag_fetch_document_response, a2a_agent_card, a2a_task_sse_stream | Test infra |
| `integration/test_streaming_harness.py` | Integration test: health check, tool notifications, event ordering, final result content | SC-003, SC-012 |