# SPIKE-007: Agent Communication Protocol
**Status:** Completed

**Date:** 2025-12-29

**Author:** Architecture Team

**Related Issue:** #7

---
## Executive Summary
This spike researches inter-agent communication protocols for Syndarix, where 10+ specialized AI agents need to collaborate on software projects. After analyzing industry standards (A2A, ACP, MCP) and multi-agent system patterns, we recommend a **structured message-based protocol** built on the existing Redis Pub/Sub event bus.
### Recommendation
Adopt a **hybrid communication model** combining:
1. **Structured JSON-RPC messages** for request-response patterns
2. **Redis Pub/Sub channels** for broadcasts and topic-based routing
3. **Database-backed message persistence** for auditability and context recovery
4. **Priority queues via Celery** for async task delegation

This approach aligns with Google's A2A protocol principles while leveraging Syndarix's existing infrastructure (Redis, Celery, SSE events).

---
## Research Questions
### 1. What communication patterns work for AI multi-agent systems?
Industry research identifies four primary patterns:
| Pattern | Use Case | Syndarix Application |
|---------|----------|---------------------|
| **Request-Response** | Direct task delegation | Engineer asks Architect for guidance |
| **Publish-Subscribe** | Broadcasts, notifications | PO announces sprint goals to all agents |
| **Task Queue** | Async work delegation | PO assigns issues to Engineers |
| **Streaming** | Long-running updates | Agent streaming progress to observers |

**Key Insight:** Syndarix needs all four patterns. The A2A protocol covers request-response and streaming, Celery handles task queues, and Redis Pub/Sub provides publish-subscribe.
### 2. Structured message formats vs natural language between agents?
**Recommendation: Structured messages with natural language payload.**
```python
# Structured envelope with natural language content
{
    "id": "msg-uuid-123",
    "type": "request",
    "from": {"agent_id": "eng-001", "role": "Engineer", "name": "Dave"},
    "to": {"agent_id": "arch-001", "role": "Architect", "name": "Alex"},
    "action": "request_guidance",
    "priority": "normal",
    "context": {
        "project_id": "proj-123",
        "issue_id": "issue-456",
        "conversation_id": "conv-789"
    },
    "content": "I'm implementing the auth module. Should I use JWT with refresh tokens or session-based auth?",
    "metadata": {
        "created_at": "2025-12-29T10:00:00Z",
        "expires_at": "2025-12-29T11:00:00Z",
        "requires_response": True
    }
}
```
**Rationale:**
- Structured envelope enables routing, filtering, and auditing
- Natural language content preserves LLM reasoning capabilities
- Matches A2A/ACP hybrid approach (structured headers, flexible payload)
- Enables agent-to-agent understanding without rigid schemas
### 3. How to handle async vs sync communication?
**Pattern Mapping:**
| Scenario | Timing | Implementation |
|----------|--------|----------------|
| Quick clarification | Sync-like (< 30s) | Request-response with timeout |
| Code review request | Async (minutes-hours) | Task queue + callback |
| Broadcast announcement | Fire-and-forget | Pub/Sub |
| Long-running analysis | Streaming | SSE with progress events |

**Implementation Strategy:**
```python
from enum import Enum


class MessageMode(str, Enum):
    SYNC = "sync"                        # Await response (with timeout)
    ASYNC = "async"                      # Queue for later, callback on completion
    FIRE_AND_FORGET = "fire_and_forget"  # No response expected
    STREAM = "stream"                    # Continuous updates
```
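The SYNC mode amounts to request-response with a timeout on top of an asynchronous transport. The sketch below shows only the correlation step, using plain asyncio: a future is registered under the request id *before* sending and is resolved when a reply carrying a matching `in_response_to` arrives. `PendingResponses`, `send_sync` and the stubbed senders are hypothetical names, not part of the spike; a timeout returns `None` so a caller could fall back to ASYNC handling.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class PendingResponses:
    """Correlate replies with outstanding SYNC requests by message id."""

    def __init__(self) -> None:
        self._futures: dict[str, asyncio.Future] = {}

    def register(self, request_id: str) -> asyncio.Future:
        # Register before sending so a fast reply cannot slip past us.
        fut = asyncio.get_running_loop().create_future()
        self._futures[request_id] = fut
        return fut

    def resolve(self, in_response_to: str, response: dict) -> bool:
        # Returns False for unknown or already timed-out requests.
        fut = self._futures.pop(in_response_to, None)
        if fut is None or fut.done():
            return False
        fut.set_result(response)
        return True

    def discard(self, request_id: str) -> None:
        self._futures.pop(request_id, None)


async def send_sync(
    pending: PendingResponses,
    request_id: str,
    send: Callable[[], Awaitable[None]],
    timeout: float,
) -> Optional[dict]:
    """Send, then wait up to `timeout` seconds; None means no reply in time."""
    fut = pending.register(request_id)
    try:
        await send()
        return await asyncio.wait_for(fut, timeout)
    except asyncio.TimeoutError:
        return None
    finally:
        pending.discard(request_id)


async def demo() -> tuple[Optional[dict], Optional[dict]]:
    pending = PendingResponses()

    async def fast_send() -> None:
        # Simulate the Architect replying 10 ms after delivery.
        asyncio.get_running_loop().call_later(
            0.01, pending.resolve, "msg-1", {"content": "Use write-through"}
        )

    async def silent_send() -> None:
        pass  # nobody answers this one

    reply = await send_sync(pending, "msg-1", fast_send, timeout=1.0)
    missed = await send_sync(pending, "msg-2", silent_send, timeout=0.05)
    return reply, missed
```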
### 4. Message routing strategies?
**Recommended: Hierarchical routing with three core strategies (direct, role-based, broadcast), plus topic-based routing for agents subscribed to a topic.**
```
┌─────────────────────────────────────────────────────────────────┐
│ Message Router │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ DIRECT │ │ ROLE-BASED │ │ BROADCAST │ │
│ │ Routing │ │ Routing │ │ Routing │ │
│ │ │ │ │ │ │ │
│ │ to: agent │ │ to: @role │ │ to: @all │ │
│ │ "arch-001" │ │ "@engineers"│ │ "@project" │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
```
| Strategy | Syntax | Use Case |
|----------|--------|----------|
| **Direct** | `to: "agent-123"` | Specific agent communication |
| **Role-based** | `to: "@engineers"` | All agents of a role |
| **Broadcast** | `to: "@all"` | Project-wide announcements |
| **Topic-based** | `to: "#auth-module"` | Agents subscribed to topic |
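
The `to:` notation in the table can be classified purely by its prefix. A minimal sketch, assuming the conventions shown (both the table's `@all` and the diagram's `@project` are treated as broadcast); `Route` and `parse_recipient` are illustrative names, and mapping the result onto the schema's `recipient`, `recipient_role` and `recipient_all` fields is left out.

```python
from dataclasses import dataclass
from typing import Literal

RouteKind = Literal["direct", "role", "broadcast", "topic"]

# The table writes broadcast as "@all"; the diagram shows "@project".
BROADCAST_ALIASES = frozenset({"@all", "@project"})


@dataclass(frozen=True)
class Route:
    kind: RouteKind
    target: str


def parse_recipient(to: str) -> Route:
    """Classify a `to:` address by its prefix."""
    to = to.strip()
    if not to or to in ("@", "#"):
        raise ValueError(f"invalid recipient: {to!r}")
    if to in BROADCAST_ALIASES:
        return Route("broadcast", to[1:])
    if to.startswith("@"):
        return Route("role", to[1:])   # e.g. "@engineers"
    if to.startswith("#"):
        return Route("topic", to[1:])  # e.g. "#auth-module"
    return Route("direct", to)         # e.g. "agent-123"
```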
### 5. How to maintain conversation context across agent interactions?
**Three-tier context management:**
```
┌─────────────────────────────────────────────────────────────────┐
│ Context Hierarchy │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ CONVERSATION CONTEXT (Short-term) │ │
│ │ - Current message thread │ │
│ │ - Last N exchanges between agents │ │
│ │ - Active topic/issue context │ │
│ │ Storage: In-memory + Redis (TTL: 1 hour) │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ SESSION CONTEXT (Medium-term) │ │
│ │ - Sprint goals and decisions │ │
│ │ - Shared agreements between agents │ │
│ │ - Recent artifacts and references │ │
│ │ Storage: PostgreSQL AgentMessage table │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ PROJECT CONTEXT (Long-term) │ │
│ │ - Architecture decisions (ADRs) │ │
│ │ - Requirements and constraints │ │
│ │ - Knowledge base documents │ │
│ │ Storage: PostgreSQL + pgvector (RAG) │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
```
**Context injection pattern:**
```python
async def prepare_agent_context(
    agent_id: str,
    conversation_id: str,
    project_id: str
) -> list[dict]:
    """Build context for LLM call."""
    context = []

    # 1. Conversation context (recent exchanges)
    recent_messages = await get_conversation_history(
        conversation_id, limit=10
    )
    context.extend(format_as_messages(recent_messages))

    # 2. Session context (relevant decisions)
    session_context = await get_session_context(agent_id, project_id)
    if session_context:
        context.append({
            "role": "system",
            "content": f"Session context:\n{session_context}"
        })

    # 3. Project context (RAG retrieval)
    query = extract_query_from_conversation(recent_messages)
    rag_results = await search_knowledge_base(project_id, query)
    if rag_results:
        context.append({
            "role": "system",
            "content": f"Relevant project context:\n{rag_results}"
        })

    return context
```
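For the short-term tier, the diagram specifies the last N exchanges with a one-hour TTL. The class below is an in-memory stand-in for that Redis-backed store, written as a sketch: `ConversationWindow` is a hypothetical name, the TTL is refreshed on every append (similar to resetting a key's expiry on each write), and an expired or unknown conversation returns an empty list so the caller can fall back to the PostgreSQL tier.

```python
import time
from collections import deque
from typing import Callable


class ConversationWindow:
    """Last N messages per conversation, dropped after `ttl_seconds` of inactivity."""

    def __init__(
        self,
        max_messages: int = 10,
        ttl_seconds: float = 3600.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_messages = max_messages
        self.ttl_seconds = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._windows: dict[str, deque] = {}
        self._touched: dict[str, float] = {}

    def append(self, conversation_id: str, message: dict) -> None:
        if self._expired(conversation_id):
            self._drop(conversation_id)
        window = self._windows.setdefault(
            conversation_id, deque(maxlen=self.max_messages)
        )
        window.append(message)  # deque discards the oldest entry beyond maxlen
        self._touched[conversation_id] = self._clock()  # refresh the TTL on write

    def recent(self, conversation_id: str) -> list[dict]:
        if conversation_id not in self._windows or self._expired(conversation_id):
            self._drop(conversation_id)
            return []  # caller falls back to the PostgreSQL tier
        return list(self._windows[conversation_id])

    def _expired(self, conversation_id: str) -> bool:
        touched = self._touched.get(conversation_id)
        return touched is not None and self._clock() - touched > self.ttl_seconds

    def _drop(self, conversation_id: str) -> None:
        self._windows.pop(conversation_id, None)
        self._touched.pop(conversation_id, None)
```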
### 6. Conflict resolution when agents disagree?
**Hierarchical escalation model:**
```
┌─────────────────────────────────────────────────────────────────┐
│ Conflict Resolution Hierarchy │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Level 1: Direct Negotiation │
│ ├── Agents exchange arguments (max 3 rounds) │
│ ├── If consensus → Resolution recorded │
│ └── If no consensus → Escalate to Level 2 │
│ │
│ Level 2: Expert Arbitration │
│ ├── Relevant expert agent weighs in (Architect for tech) │
│ ├── Expert makes binding recommendation │
│ └── If expert unavailable → Escalate to Level 3 │
│ │
│ Level 3: Human Decision │
│ ├── Conflict summarized and sent to human client │
│ ├── Approval event triggers resolution │
│ └── Decision recorded for future reference │
│ │
└─────────────────────────────────────────────────────────────────┘
```
**Implementation:**
```python
from enum import Enum

# Conflict and ConflictStatus are assumed to be models defined elsewhere.


class ConflictResolution(str, Enum):
    NEGOTIATION = "negotiation"
    EXPERT_ARBITRATION = "expert_arbitration"
    HUMAN_DECISION = "human_decision"


class ConflictTracker:
    async def record_conflict(
        self,
        participants: list[str],
        topic: str,
        positions: dict[str, str]
    ) -> Conflict:
        """Record a conflict between agents."""
        return await self.db.create(Conflict(
            participants=participants,
            topic=topic,
            positions=positions,
            status=ConflictStatus.OPEN,
            resolution_level=ConflictResolution.NEGOTIATION,
            negotiation_rounds=0
        ))

    async def escalate(self, conflict_id: str) -> Conflict:
        """Escalate conflict to next resolution level."""
        conflict = await self.get(conflict_id)

        if conflict.resolution_level == ConflictResolution.NEGOTIATION:
            # Find relevant expert (e.g. Architect for technical disputes)
            expert = await self.find_expert_for_topic(conflict.topic)
            if expert is not None:
                conflict.arbitrator_id = expert.id
                conflict.resolution_level = ConflictResolution.EXPERT_ARBITRATION
                return await self.db.update(conflict)
            # No expert available: fall through to Level 3 (human decision)

        if conflict.resolution_level != ConflictResolution.HUMAN_DECISION:
            # Create human approval request
            await self.create_approval_request(conflict)
            conflict.resolution_level = ConflictResolution.HUMAN_DECISION

        return await self.db.update(conflict)
```
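The escalation rules from the hierarchy diagram (at most 3 negotiation rounds, straight to a human when no expert is available) can also be written as a pure transition function, which is easy to unit-test apart from the database. This is a sketch: `next_step` is a hypothetical helper, and it returns `None` to mean the conflict is resolved.

```python
from enum import Enum
from typing import Optional


class ConflictResolution(str, Enum):
    NEGOTIATION = "negotiation"
    EXPERT_ARBITRATION = "expert_arbitration"
    HUMAN_DECISION = "human_decision"


MAX_NEGOTIATION_ROUNDS = 3  # "max 3 rounds" from the hierarchy diagram


def next_step(
    level: ConflictResolution,
    rounds: int,
    resolved: bool,
    expert_available: bool,
) -> Optional[ConflictResolution]:
    """Return the level at which the conflict continues, or None once resolved."""
    if resolved:
        return None  # consensus, binding recommendation, or human decision recorded
    if level is ConflictResolution.NEGOTIATION:
        if rounds < MAX_NEGOTIATION_ROUNDS:
            return ConflictResolution.NEGOTIATION  # another round of arguments
        # Level 2 needs an expert; without one, go straight to Level 3.
        if expert_available:
            return ConflictResolution.EXPERT_ARBITRATION
        return ConflictResolution.HUMAN_DECISION
    # An unresolved arbitration, or a pending human decision, waits on the human.
    return ConflictResolution.HUMAN_DECISION
```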
### 7. How to audit/log inter-agent communication?
**Comprehensive audit trail:**
```python
from datetime import datetime

from sqlalchemy import Column, DateTime, Enum, ForeignKey, String, Text
from sqlalchemy.dialects.postgresql import UUID

from app.db.base import Base
# MessageType is assumed to be the message-type Enum shared with the schemas.


class MessageAuditLog(Base):
    """All agent messages are persisted for audit."""
    __tablename__ = "message_audit_log"  # table name assumed

    id = Column(UUID, primary_key=True)
    message_id = Column(UUID, index=True)

    # Routing info
    from_agent_id = Column(UUID, ForeignKey("agent_instances.id"))
    to_agent_id = Column(UUID, nullable=True)  # NULL for broadcasts
    to_role = Column(String, nullable=True)

    # Message details
    message_type = Column(Enum(MessageType))
    action = Column(String(50))
    content_hash = Column(String(64))  # SHA-256 of content
    content = Column(Text)  # Full content (encrypted at rest)

    # Context
    project_id = Column(UUID, ForeignKey("projects.id"), index=True)
    conversation_id = Column(UUID, index=True)
    parent_message_id = Column(UUID, nullable=True)

    # Response tracking
    response_to_id = Column(UUID, nullable=True)
    response_received_at = Column(DateTime, nullable=True)

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    delivered_at = Column(DateTime, nullable=True)
    read_at = Column(DateTime, nullable=True)
```
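The `content_hash` column holds a SHA-256 hex digest, which is exactly 64 characters and fits `String(64)`. A minimal sketch of computing and re-checking it, assuming UTF-8 encoding of the content as in `_persist_message`; the helper names are illustrative. A bare hash only reveals edits that did not also update the digest, so anyone able to rewrite the row can rewrite both; stronger tamper evidence would need something like a keyed HMAC or hash chaining.

```python
import hashlib
import hmac


def content_hash(content: str) -> str:
    """SHA-256 hex digest of the UTF-8 message body (64 characters)."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def verify_content(content: str, stored_hash: str) -> bool:
    """True if the stored content still matches the digest recorded at write time."""
    # compare_digest compares in constant time rather than short-circuiting.
    return hmac.compare_digest(content_hash(content), stored_hash)
```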
---
## Message Format Specification
### Core Message Schema
```python
from datetime import datetime
from typing import Literal, Optional
from uuid import UUID, uuid4

from pydantic import BaseModel, Field


class AgentIdentifier(BaseModel):
    """Identifies an agent in communication."""
    agent_id: UUID
    role: str
    name: str


class MessageContext(BaseModel):
    """Contextual information for message routing."""
    project_id: UUID
    conversation_id: UUID
    issue_id: Optional[UUID] = None
    sprint_id: Optional[UUID] = None
    parent_message_id: Optional[UUID] = None


class MessageMetadata(BaseModel):
    """Message metadata for processing."""
    created_at: datetime = Field(default_factory=datetime.utcnow)
    expires_at: Optional[datetime] = None
    priority: Literal["urgent", "high", "normal", "low"] = "normal"
    requires_response: bool = False
    response_timeout_seconds: Optional[int] = None
    retry_count: int = 0
    max_retries: int = 3


class Attachment(BaseModel):
    """Attachments to messages (defined before AgentMessage, which references it)."""
    type: Literal["code", "file", "image", "document", "reference"]
    name: str
    content: Optional[str] = None  # For inline content
    url: Optional[str] = None  # For file references
    mime_type: Optional[str] = None


class AgentMessage(BaseModel):
    """Primary message format for inter-agent communication."""
    # Identity
    id: UUID = Field(default_factory=uuid4)
    type: Literal["request", "response", "notification", "broadcast", "stream"]

    # Routing
    sender: AgentIdentifier
    recipient: Optional[AgentIdentifier] = None  # None for broadcasts
    recipient_role: Optional[str] = None  # For role-based routing
    recipient_all: bool = False  # For project-wide broadcast

    # Action
    action: str  # e.g., "request_review", "report_bug", "share_finding"

    # Content
    content: str  # Natural language message
    attachments: list[Attachment] = Field(default_factory=list)  # Files, code snippets, etc.

    # Context
    context: MessageContext

    # Metadata
    metadata: MessageMetadata = Field(default_factory=MessageMetadata)

    # Response linking
    in_response_to: Optional[UUID] = None
```
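The schema lets the three routing fields be set independently, while the field comments imply a message targets exactly one of them. A sketch of that check as a plain function (in Pydantic it would typically live in a model validator); `routing_mode` is a hypothetical name. It applies to messages as submitted by the sender, before `send_to_role` fans a role message out into per-recipient copies that carry both `recipient` and `recipient_role`.

```python
from typing import Optional
from uuid import UUID


def routing_mode(
    recipient_id: Optional[UUID],
    recipient_role: Optional[str],
    recipient_all: bool,
) -> str:
    """Return 'direct', 'role' or 'broadcast'; reject zero or multiple targets."""
    targets_set = sum([recipient_id is not None, recipient_role is not None, recipient_all])
    if targets_set != 1:
        raise ValueError(
            "set exactly one of recipient, recipient_role or recipient_all"
        )
    if recipient_id is not None:
        return "direct"
    return "role" if recipient_role is not None else "broadcast"
```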
### Action Types
```python
from enum import Enum


class MessageAction(str, Enum):
    # Request actions
    REQUEST_REVIEW = "request_review"
    REQUEST_GUIDANCE = "request_guidance"
    REQUEST_CLARIFICATION = "request_clarification"
    REQUEST_APPROVAL = "request_approval"
    DELEGATE_TASK = "delegate_task"

    # Response actions
    PROVIDE_REVIEW = "provide_review"
    PROVIDE_GUIDANCE = "provide_guidance"
    PROVIDE_CLARIFICATION = "provide_clarification"
    APPROVE = "approve"
    REJECT = "reject"

    # Notification actions
    REPORT_BUG = "report_bug"
    REPORT_PROGRESS = "report_progress"
    REPORT_BLOCKER = "report_blocker"
    SHARE_FINDING = "share_finding"
    ANNOUNCE_DECISION = "announce_decision"

    # Collaboration actions
    PROPOSE_APPROACH = "propose_approach"
    CHALLENGE_APPROACH = "challenge_approach"
    AGREE_WITH = "agree_with"
    DISAGREE_WITH = "disagree_with"
```
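The enum names request actions and their response counterparts but does not say which replies are valid for which requests. A sketch of one plausible pairing, stated as an assumption rather than part of the spec; since `MessageAction` subclasses `str`, its members compare equal to these plain strings. Requests without an entry (such as `delegate_task`) accept any reply action.

```python
# Assumed pairing; the spike lists the actions but not which replies are valid.
EXPECTED_RESPONSES: dict[str, frozenset[str]] = {
    "request_review": frozenset({"provide_review"}),
    "request_guidance": frozenset({"provide_guidance"}),
    "request_clarification": frozenset({"provide_clarification"}),
    "request_approval": frozenset({"approve", "reject"}),
}


def is_valid_response(request_action: str, response_action: str) -> bool:
    """Check a reply's action against the request it answers."""
    allowed = EXPECTED_RESPONSES.get(request_action)
    # Unlisted request actions place no constraint on the reply.
    return allowed is None or response_action in allowed
```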
---
## Communication Patterns
### Pattern 1: Request-Response
**Use Case:** Engineer asks Architect for design guidance.
```python
# Engineer sends request
request = AgentMessage(
    type="request",
    sender=AgentIdentifier(agent_id=eng_id, role="Engineer", name="Dave"),
    recipient=AgentIdentifier(agent_id=arch_id, role="Architect", name="Alex"),
    action="request_guidance",
    content="I need to implement caching for the user service. Should I use Redis with write-through or write-behind strategy?",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        issue_id=issue_id
    ),
    metadata=MessageMetadata(
        priority="normal",
        requires_response=True,
        response_timeout_seconds=300
    )
)

# Send and await response (returns None on timeout)
response = await message_router.send_and_wait(request, timeout=300)

# On the Architect's side: build the reply and send it with respond()
guidance = AgentMessage(
    type="response",
    sender=AgentIdentifier(agent_id=arch_id, role="Architect", name="Alex"),
    recipient=request.sender,
    action="provide_guidance",
    content="Given our consistency requirements, use write-through caching with Redis. Here's the pattern...",
    in_response_to=request.id,
    context=request.context,
    attachments=[
        Attachment(type="code", name="cache_pattern.py", content="...")
    ]
)
await message_router.respond(request.id, guidance)
```
### Pattern 2: Broadcast
**Use Case:** Product Owner announces sprint goals.
```python
broadcast = AgentMessage(
    type="broadcast",
    sender=AgentIdentifier(agent_id=po_id, role="ProductOwner", name="Sarah"),
    recipient_all=True,
    action="announce_decision",
    content="Sprint 3 goals: 1) Complete auth module 2) Add user settings page 3) Fix critical bugs from QA",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        sprint_id=sprint_id
    ),
    metadata=MessageMetadata(
        priority="high",
        requires_response=False
    )
)
await message_router.broadcast(broadcast)
```
### Pattern 3: Role-Based Routing
**Use Case:** QA reports a bug to all Engineers.
```python
bug_report = AgentMessage(
    type="notification",
    sender=AgentIdentifier(agent_id=qa_id, role="QA", name="Quinn"),
    recipient_role="Engineer",  # All engineers receive this
    action="report_bug",
    content="Found a critical bug in the auth flow: users can bypass 2FA by...",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        issue_id=bug_issue_id
    ),
    metadata=MessageMetadata(
        priority="urgent",
        requires_response=True
    )
)
await message_router.send_to_role(bug_report)
```
### Pattern 4: Task Delegation
**Use Case:** Product Owner assigns work to Engineer.
```python
delegation = AgentMessage(
    type="request",
    sender=AgentIdentifier(agent_id=po_id, role="ProductOwner", name="Sarah"),
    recipient=AgentIdentifier(agent_id=eng_id, role="Engineer", name="Dave"),
    action="delegate_task",
    content="Please implement issue #45: Add password reset functionality",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        issue_id=issue_45_id
    ),
    metadata=MessageMetadata(
        priority="normal",
        requires_response=True
    )
)

# This creates a Celery task for async execution
await message_router.delegate_task(delegation)
```
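Delegated tasks are enqueued with the 0/3/6/9 mapping from `MessageRouter._priority_to_int`, where a lower number is served first. A small in-process sketch of that ordering with `heapq`, using a sequence counter so equal priorities stay first-in-first-out; `DelegationQueue` is a hypothetical name and illustrates ordering only, it is not a Celery replacement. Whether 0 is the highest priority in Celery depends on the broker: the Celery documentation describes the Redis transport as sorting priorities in reverse (0 highest), while RabbitMQ treats larger numbers as higher priority.

```python
import heapq
import itertools

# Same mapping as MessageRouter._priority_to_int (lower number = served first).
PRIORITY = {"urgent": 0, "high": 3, "normal": 6, "low": 9}


class DelegationQueue:
    """Priority ordering for delegated tasks, FIFO within a priority level."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str]] = []
        self._seq = itertools.count()  # tie-breaker preserving insertion order

    def put(self, task_id: str, priority: str = "normal") -> None:
        rank = PRIORITY.get(priority, PRIORITY["normal"])
        heapq.heappush(self._heap, (rank, next(self._seq), task_id))

    def get(self) -> str:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)
```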
### Pattern 5: Streaming Updates
**Use Case:** Engineer shares progress during long task.
```python
async def stream_progress(agent_id: str, task_id: str):
    """Stream progress updates during task execution."""
    for step in task_steps:
        # Execute step
        result = await execute_step(step)

        # Stream update
        update = AgentMessage(
            type="stream",
            sender=AgentIdentifier(agent_id=agent_id, ...),
            recipient_all=True,
            action="report_progress",
            content=f"Completed: {step.name}. Progress: {step.percent}%",
            context=MessageContext(project_id=project_id, ...),
        )
        await message_router.stream(update)
```
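The streaming pattern is essentially an async producer emitting one `report_progress` event per completed step. A self-contained sketch using an async generator; it derives the percentage from the step index instead of a `step.percent` attribute, and `run_steps` and `collect` are hypothetical stand-ins for `execute_step` and `message_router.stream`.

```python
import asyncio
from collections.abc import AsyncIterator


async def run_steps(steps: list[str]) -> AsyncIterator[dict]:
    """Run each step and yield one progress event per completed step."""
    total = len(steps)
    for index, name in enumerate(steps, start=1):
        await asyncio.sleep(0)  # placeholder for `await execute_step(step)`
        percent = round(100 * index / total)
        yield {
            "type": "stream",
            "action": "report_progress",
            "content": f"Completed: {name}. Progress: {percent}%",
            "percent": percent,
        }


async def collect(steps: list[str]) -> list[str]:
    # An observer (e.g. the SSE bridge) would forward each event instead of collecting it.
    return [event["content"] async for event in run_steps(steps)]
```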
---
## Database Schema
### Message Storage
```python
# app/models/agent_message.py
from datetime import datetime
from uuid import uuid4

from sqlalchemy import (
    Boolean, Column, DateTime, Enum, ForeignKey, Index, Integer, String, Text
)
from sqlalchemy.dialects.postgresql import UUID, JSONB

from app.db.base import Base


class AgentMessage(Base):
    """Persistent storage for all agent messages."""
    __tablename__ = "agent_messages"

    # Primary key
    id = Column(UUID, primary_key=True, default=uuid4)

    # Message type
    type = Column(
        Enum("request", "response", "notification", "broadcast", "stream",
             name="message_type"),
        nullable=False
    )

    # Sender
    sender_agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=False)
    sender_role = Column(String(50), nullable=False)
    sender_name = Column(String(100), nullable=False)

    # Recipient (nullable for broadcasts)
    recipient_agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=True)
    recipient_role = Column(String(50), nullable=True)  # For role-based routing
    is_broadcast = Column(Boolean, default=False)

    # Action
    action = Column(String(50), nullable=False)

    # Content
    content = Column(Text, nullable=False)
    content_hash = Column(String(64), nullable=False)  # SHA-256
    attachments = Column(JSONB, default=list)

    # Context
    project_id = Column(UUID, ForeignKey("projects.id"), nullable=False)
    conversation_id = Column(UUID, nullable=False)
    issue_id = Column(UUID, ForeignKey("issues.id"), nullable=True)
    sprint_id = Column(UUID, ForeignKey("sprints.id"), nullable=True)
    parent_message_id = Column(UUID, ForeignKey("agent_messages.id"), nullable=True)

    # Response linking
    in_response_to_id = Column(UUID, ForeignKey("agent_messages.id"), nullable=True)

    # Metadata
    priority = Column(
        Enum("urgent", "high", "normal", "low", name="priority_level"),
        default="normal"
    )
    requires_response = Column(Boolean, default=False)
    response_timeout_seconds = Column(Integer, nullable=True)

    # Status tracking
    status = Column(
        Enum("pending", "delivered", "read", "responded", "expired", "failed",
             name="message_status"),
        default="pending"
    )

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    delivered_at = Column(DateTime, nullable=True)
    read_at = Column(DateTime, nullable=True)
    responded_at = Column(DateTime, nullable=True)
    expires_at = Column(DateTime, nullable=True)

    # Indexes
    __table_args__ = (
        Index("ix_agent_messages_project_conversation", "project_id", "conversation_id"),
        Index("ix_agent_messages_sender", "sender_agent_id", "created_at"),
        Index("ix_agent_messages_recipient", "recipient_agent_id", "status"),
        Index("ix_agent_messages_conversation", "conversation_id", "created_at"),
    )


class Conversation(Base):
    """Groups related messages into conversations."""
    __tablename__ = "conversations"

    id = Column(UUID, primary_key=True, default=uuid4)

    # Context
    project_id = Column(UUID, ForeignKey("projects.id"), nullable=False)
    issue_id = Column(UUID, ForeignKey("issues.id"), nullable=True)
    sprint_id = Column(UUID, ForeignKey("sprints.id"), nullable=True)

    # Participants
    participant_ids = Column(JSONB, default=list)  # List of agent IDs

    # Topic
    topic = Column(String(200), nullable=True)

    # Status
    status = Column(
        Enum("active", "resolved", "archived", name="conversation_status"),
        default="active"
    )

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, onupdate=datetime.utcnow)
    resolved_at = Column(DateTime, nullable=True)

    # Summary (generated)
    summary = Column(Text, nullable=True)


class MessageDelivery(Base):
    """Tracks delivery status for broadcast messages."""
    __tablename__ = "message_deliveries"

    id = Column(UUID, primary_key=True, default=uuid4)
    message_id = Column(UUID, ForeignKey("agent_messages.id"), nullable=False)
    recipient_agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=False)

    # Status
    delivered_at = Column(DateTime, nullable=True)
    read_at = Column(DateTime, nullable=True)
    responded_at = Column(DateTime, nullable=True)
```
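The `ix_agent_messages_conversation` index on `(conversation_id, created_at)` is what makes the history query in `get_conversation_history` cheap: fetch the newest page in descending order, reverse it for display, and pass the oldest timestamp back as `before` to page further. A runnable sketch using the standard-library `sqlite3` module as a stand-in for PostgreSQL, with the table trimmed to three columns; `history_page` and `demo` are illustrative names. Because the cursor compares only `created_at`, messages sharing a timestamp could be skipped at a page boundary; adding `id` as a tie-breaker is the usual remedy.

```python
import sqlite3
from typing import Optional


def history_page(
    conn: sqlite3.Connection,
    conversation_id: str,
    limit: int = 50,
    before: Optional[str] = None,
) -> list[str]:
    """Newest `limit` messages older than `before`, returned oldest-first."""
    sql = "SELECT content FROM agent_messages WHERE conversation_id = ?"
    params: list = [conversation_id]
    if before is not None:
        sql += " AND created_at < ?"
        params.append(before)
    sql += " ORDER BY created_at DESC LIMIT ?"
    params.append(limit)
    rows = conn.execute(sql, params).fetchall()
    return [content for (content,) in reversed(rows)]


def demo() -> tuple[list[str], list[str]]:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE agent_messages (conversation_id TEXT, created_at TEXT, content TEXT)"
    )
    # Same column order as ix_agent_messages_conversation.
    conn.execute(
        "CREATE INDEX ix_agent_messages_conversation "
        "ON agent_messages (conversation_id, created_at)"
    )
    rows = [("conv-1", f"2025-12-29T10:00:0{i}Z", f"m{i}") for i in range(5)]
    conn.executemany("INSERT INTO agent_messages VALUES (?, ?, ?)", rows)
    latest = history_page(conn, "conv-1", limit=2)
    older = history_page(conn, "conv-1", limit=2, before="2025-12-29T10:00:03Z")
    conn.close()
    return latest, older
```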
### Alembic Migration
```python
# migrations/versions/xxx_add_agent_messages.py
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.postgresql import ENUM, JSONB, UUID


def upgrade():
    # Create enum types
    op.execute("""
        CREATE TYPE message_type AS ENUM (
            'request', 'response', 'notification', 'broadcast', 'stream'
        )
    """)
    op.execute("""
        CREATE TYPE priority_level AS ENUM ('urgent', 'high', 'normal', 'low')
    """)
    op.execute("""
        CREATE TYPE message_status AS ENUM (
            'pending', 'delivered', 'read', 'responded', 'expired', 'failed'
        )
    """)
    op.execute("""
        CREATE TYPE conversation_status AS ENUM ('active', 'resolved', 'archived')
    """)

    # Create tables. Python-side defaults (default=...) have no effect in DDL,
    # so column defaults are expressed as server defaults here.
    op.create_table(
        'conversations',
        sa.Column('id', UUID, primary_key=True),
        sa.Column('project_id', UUID, sa.ForeignKey('projects.id'), nullable=False),
        sa.Column('issue_id', UUID, sa.ForeignKey('issues.id'), nullable=True),
        sa.Column('sprint_id', UUID, sa.ForeignKey('sprints.id'), nullable=True),
        sa.Column('participant_ids', JSONB, server_default=sa.text("'[]'::jsonb")),
        sa.Column('topic', sa.String(200), nullable=True),
        # create_type=False: the type already exists from CREATE TYPE above
        sa.Column('status', ENUM('active', 'resolved', 'archived',
                                 name='conversation_status', create_type=False),
                  server_default='active'),
        sa.Column('created_at', sa.DateTime, server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime, nullable=True),
        sa.Column('resolved_at', sa.DateTime, nullable=True),
        sa.Column('summary', sa.Text, nullable=True),
    )
    op.create_table(
        'agent_messages',
        # ... columns as defined above
    )
    op.create_table(
        'message_deliveries',
        # ... columns as defined above
    )


def downgrade():
    op.drop_table('message_deliveries')
    op.drop_table('agent_messages')
    op.drop_table('conversations')
    for type_name in ('conversation_status', 'message_status',
                      'priority_level', 'message_type'):
        op.execute(f"DROP TYPE IF EXISTS {type_name}")
```
---
## Code Examples
### Message Router Service
```python
# app/services/message_router.py
import asyncio
import hashlib
from datetime import datetime
from typing import Optional
from uuid import UUID, uuid4

from sqlalchemy import select

from app.models.agent_message import AgentMessage as AgentMessageModel
from app.schemas.messages import AgentMessage, AgentIdentifier
from app.services.events import EventBus
from app.db.session import AsyncSession
from app.core.celery_app import celery_app


class AgentNotAvailableError(Exception):
    """Raised when the recipient agent is missing or not active."""


class MessageRouter:
    """Routes messages between agents.

    Helpers such as get_message, get_agent_messages, mark_read, stream,
    _mark_expired, _mark_responded and _create_delivery_record are not
    shown in this excerpt.
    """

    def __init__(
        self,
        db: AsyncSession,
        event_bus: EventBus,
        orchestrator: "AgentOrchestrator"
    ):
        self.db = db
        self.event_bus = event_bus
        self.orchestrator = orchestrator

    async def send(self, message: AgentMessage) -> AgentMessageModel:
        """Send a message to a specific agent."""
        if message.recipient is None:
            raise ValueError("send() requires a recipient; use send_to_role() or broadcast()")

        # Validate recipient exists and is active
        recipient = await self.orchestrator.get_instance(message.recipient.agent_id)
        if not recipient or recipient.status != "active":
            raise AgentNotAvailableError(f"Agent {message.recipient.agent_id} not available")

        # Persist message
        db_message = await self._persist_message(message)

        # Publish to recipient's channel
        await self.event_bus.publish(
            f"agent:{message.recipient.agent_id}",
            {
                "type": "message_received",
                "message_id": str(db_message.id),
                "sender": message.sender.dict(),
                "action": message.action,
                "priority": message.metadata.priority,
                "preview": message.content[:100]
            }
        )

        # Also publish to project channel for monitoring
        await self.event_bus.publish(
            f"project:{message.context.project_id}",
            {
                "type": "agent_message",
                "from": message.sender.name,
                "to": message.recipient.name,
                "action": message.action
            }
        )

        return db_message

    async def send_and_wait(
        self,
        message: AgentMessage,
        timeout: int = 300
    ) -> Optional[AgentMessage]:
        """Send a message and wait for response."""
        # Subscribe *before* sending: Pub/Sub does not buffer events, so a reply
        # published before the subscription exists would be lost. The persisted
        # row reuses message.id, so the channel name is known up front.
        response_channel = f"response:{message.id}"
        subscriber = await self.event_bus.subscribe(response_channel)

        try:
            await self.send(message)
            response_event = await asyncio.wait_for(
                subscriber.get_event(),
                timeout=timeout
            )
            response_id = response_event.data["response_id"]
            return await self.get_message(response_id)
        except asyncio.TimeoutError:
            # Update message status to expired
            await self._mark_expired(message.id)
            return None
        finally:
            await subscriber.unsubscribe()

    async def send_to_role(self, message: AgentMessage) -> list[AgentMessageModel]:
        """Send message to all agents of a specific role."""
        # Get all active agents with the target role
        recipients = await self.orchestrator.get_instances_by_role(
            project_id=message.context.project_id,
            role=message.recipient_role
        )

        messages = []
        for recipient in recipients:
            # Each copy needs its own id: _persist_message uses message.id as the primary key
            msg = message.copy(update={
                "id": uuid4(),
                "recipient": AgentIdentifier(
                    agent_id=recipient.id,
                    role=recipient.agent_type.role,
                    name=recipient.name
                ),
            })
            db_message = await self.send(msg)
            messages.append(db_message)

        return messages

    async def broadcast(self, message: AgentMessage) -> AgentMessageModel:
        """Broadcast message to all project agents."""
        # Persist the broadcast message
        db_message = await self._persist_message(message)

        # Get all active agents in project
        agents = await self.orchestrator.get_project_agents(
            message.context.project_id
        )

        # Create delivery records (one per recipient, excluding the sender)
        for agent in agents:
            if agent.id != message.sender.agent_id:
                await self._create_delivery_record(db_message.id, agent.id)

        # Publish to project channel
        await self.event_bus.publish(
            f"project:{message.context.project_id}",
            {
                "type": "broadcast",
                "message_id": str(db_message.id),
                "sender": message.sender.dict(),
                "action": message.action,
                "content": message.content
            }
        )

        return db_message

    async def delegate_task(self, message: AgentMessage) -> str:
        """Delegate a task for async execution via Celery."""
        # Persist message
        db_message = await self._persist_message(message)

        # Create Celery task (send_task enqueues by name without importing the task)
        task = celery_app.send_task(
            "app.tasks.agent.execute_delegated_task",
            args=[str(db_message.id), str(message.recipient.agent_id)],
            queue="agent_tasks",
            priority=self._priority_to_int(message.metadata.priority)
        )

        return task.id

    async def respond(
        self,
        original_message_id: UUID,
        response: AgentMessage
    ) -> AgentMessageModel:
        """Send a response to a previous message."""
        original = await self.get_message(original_message_id)

        response.in_response_to = original_message_id
        response.context = original.context
        if response.recipient is None:
            # Default to replying to the original sender
            response.recipient = original.sender

        db_response = await self.send(response)

        # Update original message status
        await self._mark_responded(original_message_id)

        # Publish response notification
        await self.event_bus.publish(
            f"response:{original_message_id}",
            {
                "type": "response_received",
                "response_id": str(db_response.id)
            }
        )

        return db_response

    async def get_conversation_history(
        self,
        conversation_id: UUID,
        limit: int = 50,
        before: Optional[datetime] = None
    ) -> list[AgentMessageModel]:
        """Get messages in a conversation, oldest first."""
        query = select(AgentMessageModel).where(
            AgentMessageModel.conversation_id == conversation_id
        )

        if before:
            query = query.where(AgentMessageModel.created_at < before)

        # Newest page first (uses the conversation/created_at index), then reverse
        query = query.order_by(AgentMessageModel.created_at.desc()).limit(limit)

        result = await self.db.execute(query)
        return list(reversed(result.scalars().all()))

    async def _persist_message(self, message: AgentMessage) -> AgentMessageModel:
        """Persist message to database."""
        content_hash = hashlib.sha256(message.content.encode()).hexdigest()

        db_message = AgentMessageModel(
            id=message.id,
            type=message.type,
            sender_agent_id=message.sender.agent_id,
            sender_role=message.sender.role,
            sender_name=message.sender.name,
            recipient_agent_id=message.recipient.agent_id if message.recipient else None,
            recipient_role=message.recipient_role,
            is_broadcast=message.recipient_all,
            action=message.action,
            content=message.content,
            content_hash=content_hash,
            attachments=[a.dict() for a in message.attachments],
            project_id=message.context.project_id,
            conversation_id=message.context.conversation_id,
            issue_id=message.context.issue_id,
            sprint_id=message.context.sprint_id,
            parent_message_id=message.context.parent_message_id,
            in_response_to_id=message.in_response_to,
            priority=message.metadata.priority,
            requires_response=message.metadata.requires_response,
            response_timeout_seconds=message.metadata.response_timeout_seconds,
            expires_at=message.metadata.expires_at,
        )

        self.db.add(db_message)
        await self.db.commit()
        await self.db.refresh(db_message)

        return db_message

    def _priority_to_int(self, priority: str) -> int:
        """Convert priority to a Celery priority (0 = highest with the Redis broker)."""
        return {"urgent": 0, "high": 3, "normal": 6, "low": 9}.get(priority, 6)
```
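`send_and_wait` relies on Redis Pub/Sub semantics: a `PUBLISH` reaches only clients subscribed at that moment, and nothing is buffered for late subscribers. That is why the durable record lives in PostgreSQL and why the response channel has to be subscribed before the request goes out. The toy bus below mimics those delivery semantics with `asyncio.Queue`; `InMemoryBus` is a hypothetical stand-in for illustration, not the interface of the project's `EventBus`.

```python
import asyncio
from collections import defaultdict


class InMemoryBus:
    """At-most-once fan-out with no replay, like Redis Pub/Sub."""

    def __init__(self) -> None:
        self._subs: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, channel: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subs[channel].append(queue)
        return queue

    def unsubscribe(self, channel: str, queue: asyncio.Queue) -> None:
        self._subs[channel].remove(queue)

    def publish(self, channel: str, event: dict) -> int:
        # Only current subscribers receive the event; like PUBLISH, return their count.
        receivers = self._subs.get(channel, [])
        for queue in receivers:
            queue.put_nowait(event)
        return len(receivers)


async def demo() -> tuple[int, dict]:
    bus = InMemoryBus()
    channel = "response:msg-1"

    # A reply published before anyone subscribes is simply lost.
    lost = bus.publish(channel, {"response_id": "r-0"})

    # Subscribe first, then "send" (here: schedule the reply), then wait.
    queue = bus.subscribe(channel)
    try:
        asyncio.get_running_loop().call_soon(
            bus.publish, channel, {"response_id": "r-1"}
        )
        event = await asyncio.wait_for(queue.get(), timeout=1.0)
    finally:
        bus.unsubscribe(channel, queue)
    return lost, event
```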
### Receiving and Processing Messages
```python
# app/services/agent_inbox.py
from typing import Awaitable, Callable, Optional
from uuid import UUID

from app.schemas.messages import AgentMessage
from app.services.events import EventBus
from app.services.message_router import MessageRouter

Handler = Callable[[AgentMessage], Awaitable[Optional[AgentMessage]]]


class AgentInbox:
    """Manages incoming messages for an agent."""

    def __init__(
        self,
        agent_id: UUID,
        event_bus: EventBus,
        message_router: MessageRouter
    ):
        self.agent_id = agent_id
        self.event_bus = event_bus
        self.message_router = message_router
        self._handlers: dict[str, Handler] = {}

    def on_action(self, action: str) -> Callable[[Handler], Handler]:
        """Decorator that registers a handler for a specific action type."""
        def decorator(handler: Handler) -> Handler:
            self._handlers[action] = handler
            return handler
        return decorator

    async def start_listening(self):
        """Start listening for incoming messages."""
        channel = f"agent:{self.agent_id}"
        subscriber = await self.event_bus.subscribe(channel)

        while True:
            event = await subscriber.get_event()
            if event.type == "message_received":
                message = await self.message_router.get_message(
                    event.data["message_id"]
                )
                await self._process_message(message)

    async def _process_message(self, message: AgentMessage):
        """Process an incoming message."""
        # Mark as read
        await self.message_router.mark_read(message.id)

        # Find handler, falling back to a "default" handler if one is registered
        handler = self._handlers.get(message.action) or self._handlers.get("default")

        if handler:
            response = await handler(message)
            if response and message.metadata.requires_response:
                await self.message_router.respond(message.id, response)


# Usage in agent runner
class AgentRunner:
    async def setup_message_handlers(self):
        """Setup handlers for different message types."""

        @self.inbox.on_action("request_review")
        async def handle_review_request(message: AgentMessage) -> AgentMessage:
            # Execute code review
            review_result = await self.execute("review_code", {
                "content": message.content,
                "attachments": message.attachments
            })

            return AgentMessage(
                type="response",
                sender=self.identifier,
                recipient=message.sender,
                action="provide_review",
                content=review_result["content"],
                attachments=review_result.get("attachments", []),
                context=message.context,  # required field; reuse the request's context
                in_response_to=message.id
            )

        @self.inbox.on_action("delegate_task")
        async def handle_task_delegation(message: AgentMessage) -> AgentMessage:
            # Accept and start working on task
            await self.update_status("working")

            # Execute task
            result = await self.execute("implement", {
                "task_description": message.content,
                "issue_id": message.context.issue_id
            })

            return AgentMessage(
                type="response",
                sender=self.identifier,
                recipient=message.sender,
                action="task_completed",
                content=f"Task completed. {result['summary']}",
                context=message.context,
                in_response_to=message.id
            )
```
### API Endpoints
```python
# app/api/v1/messages.py
from typing import Optional
from uuid import UUID

from fastapi import APIRouter, Depends

from app.schemas.messages import AgentMessage, MessageCreate, MessageResponse
from app.services.message_router import MessageRouter
from app.api.deps import get_message_router, get_current_user
from app.models.user import User  # assumed location of the User model

router = APIRouter(prefix="/messages", tags=["messages"])


@router.post("/send", response_model=MessageResponse)
async def send_message(
    message_in: MessageCreate,
    message_router: MessageRouter = Depends(get_message_router),
    current_user: User = Depends(get_current_user)
):
    """Send a message between agents (admin/system use)."""
    message = AgentMessage(**message_in.dict())
    result = await message_router.send(message)
    return MessageResponse.from_orm(result)


@router.post("/broadcast", response_model=MessageResponse)
async def broadcast_message(
    message_in: MessageCreate,
    message_router: MessageRouter = Depends(get_message_router),
    current_user: User = Depends(get_current_user)
):
    """Broadcast a message to all project agents."""
    # Merge instead of passing recipient_all twice if MessageCreate already has it
    message = AgentMessage(**{**message_in.dict(), "recipient_all": True})
    result = await message_router.broadcast(message)
    return MessageResponse.from_orm(result)


@router.get("/conversation/{conversation_id}", response_model=list[MessageResponse])
async def get_conversation(
    conversation_id: UUID,
    limit: int = 50,
    message_router: MessageRouter = Depends(get_message_router),
    current_user: User = Depends(get_current_user)
):
    """Get messages in a conversation."""
    messages = await message_router.get_conversation_history(conversation_id, limit=limit)
    return [MessageResponse.from_orm(m) for m in messages]


@router.get("/agent/{agent_id}/inbox", response_model=list[MessageResponse])
async def get_agent_inbox(
    agent_id: UUID,
    status: Optional[str] = None,
    limit: int = 50,
    message_router: MessageRouter = Depends(get_message_router),
    current_user: User = Depends(get_current_user)
):
    """Get messages in an agent's inbox."""
    messages = await message_router.get_agent_messages(agent_id, status=status, limit=limit)
    return [MessageResponse.from_orm(m) for m in messages]
```
---
## SSE Integration
### Real-time Message Events
Messages integrate with the existing SSE event system (ADR-002):
```python
# Extended event types for messages
from enum import Enum


class EventType(str, Enum):
    # Existing events...

    # Message Events
    MESSAGE_SENT = "message_sent"
    MESSAGE_RECEIVED = "message_received"
    MESSAGE_READ = "message_read"
    MESSAGE_RESPONDED = "message_responded"

    # Conversation Events
    CONVERSATION_STARTED = "conversation_started"
    CONVERSATION_RESOLVED = "conversation_resolved"

    # Conflict Events
    CONFLICT_DETECTED = "conflict_detected"
    CONFLICT_ESCALATED = "conflict_escalated"
    CONFLICT_RESOLVED = "conflict_resolved"
```
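
Because `EventType` mixes in `str`, its members compare equal to their raw string values, so a filter can accept either plain strings decoded from a Redis payload or enum members. A small sketch, assuming a trimmed copy of the enum and a hypothetical `FORWARDED` tuple that mirrors the filter used by the stream endpoint:

```python
from enum import Enum


class EventType(str, Enum):
    # Trimmed copy of the message-related members above
    MESSAGE_SENT = "message_sent"
    MESSAGE_RECEIVED = "message_received"
    CONVERSATION_STARTED = "conversation_started"
    CONFLICT_DETECTED = "conflict_detected"
    CONFLICT_RESOLVED = "conflict_resolved"


# Hypothetical constant: the event types forwarded to browser clients
FORWARDED = (
    EventType.MESSAGE_SENT,
    EventType.MESSAGE_RECEIVED,
    EventType.CONVERSATION_STARTED,
    EventType.CONFLICT_DETECTED,
)


def should_forward(raw_type: str) -> bool:
    # Tuple membership compares with ==, and str-mixin members equal their values
    return raw_type in FORWARDED


def parse_event_type(raw_type: str) -> EventType:
    # Raises ValueError for unknown strings, which surfaces typos early
    return EventType(raw_type)
```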
### Message Event Stream
```python
# app/api/v1/events.py (extended)
import asyncio
import json
from uuid import UUID

from fastapi import Depends, Request
from fastapi.responses import StreamingResponse

# Assumed to exist already in this module: router, event_bus, User, get_current_user


@router.get("/projects/{project_id}/messages/events")
async def message_events(
    project_id: UUID,
    request: Request,
    current_user: User = Depends(get_current_user)
):
    """Stream message events for a project."""

    async def event_generator():
        subscriber = await event_bus.subscribe(f"project:{project_id}:messages")
        try:
            while not await request.is_disconnected():
                try:
                    event = await asyncio.wait_for(
                        subscriber.get_event(),
                        timeout=30.0
                    )
                    # Forward only the event types the UI renders
                    if event.type in [
                        "message_sent", "message_received",
                        "conversation_started", "conflict_detected"
                    ]:
                        yield f"event: {event.type}\ndata: {json.dumps(event.data)}\n\n"
                except asyncio.TimeoutError:
                    # SSE comment frame keeps idle connections (and proxies) alive
                    yield ": keepalive\n\n"
        finally:
            await subscriber.unsubscribe()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )
```
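
The generator writes each event as an SSE frame: an `event:` line naming the type, a `data:` line, and a blank line that ends the frame. `json.dumps` emits a single line by default, but the SSE format requires every line of a multi-line payload to carry its own `data:` prefix. A hypothetical `format_sse` helper that handles both cases, plus comment frames:

```python
import json
from typing import Any, Optional


def format_sse(data: Any, event: Optional[str] = None, comment: bool = False) -> str:
    """Serialize one Server-Sent Events frame (hypothetical helper)."""
    if comment:
        # Lines starting with ":" are comments; clients ignore them (keepalives)
        return f": {data}\n\n"
    payload = data if isinstance(data, str) else json.dumps(data)
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # Each payload line gets its own "data:" field; the client rejoins them with "\n"
    lines.extend(f"data: {line}" for line in payload.split("\n"))
    # A blank line terminates the frame
    return "\n".join(lines) + "\n\n"
```

With it, the generator's yields become `yield format_sse(event.data, event=event.type)` and `yield format_sse("keepalive", comment=True)`.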
### Frontend Integration
```typescript
// frontend/lib/messageEvents.ts
import { useEffect } from 'react';

// AgentIdentifier: the shared agent identity type (defined elsewhere in the frontend)

type AgentMessageEventType = 'message_sent' | 'message_received' | 'conversation_started';

// Named AgentMessageEvent so it does not shadow the DOM's built-in MessageEvent
interface AgentMessageEvent {
  type: AgentMessageEventType;
  data: {
    message_id?: string;
    from?: AgentIdentifier;
    to?: AgentIdentifier;
    action?: string;
    preview?: string;
  };
}

const EVENT_TYPES: AgentMessageEventType[] = [
  'message_sent',
  'message_received',
  'conversation_started',
];

// Pass a stable onMessage (e.g. wrapped in useCallback); a new function on
// every render would close and reopen the EventSource.
export function useMessageEvents(
  projectId: string,
  onMessage: (event: AgentMessageEvent) => void
) {
  useEffect(() => {
    const eventSource = new EventSource(
      `/api/v1/projects/${projectId}/messages/events`,
      { withCredentials: true }
    );

    // One listener per named SSE event emitted by the backend
    for (const type of EVENT_TYPES) {
      eventSource.addEventListener(type, (e) => {
        onMessage({ type, data: JSON.parse(e.data) });
      });
    }

    eventSource.onerror = () => {
      // EventSource retries automatically unless the server rejects the stream
      console.log('Message SSE reconnecting...');
    };

    return () => eventSource.close();
  }, [projectId, onMessage]);
}
```
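
Outside the browser (integration tests, CLI tooling) there is no `EventSource` to decode the stream. A minimal sketch of the client-side rules the hook relies on, assuming the response has already been decoded into text lines: fields accumulate until a blank line, `data:` lines are joined with newlines, comment lines are skipped, and a frame without an `event:` field defaults to type `message`. `parse_sse` is a hypothetical helper:

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (event_type, data) pairs from SSE text lines (hypothetical helper)."""
    event_type = "message"
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the frame, if it carried any data
            if data_lines:
                yield event_type, "\n".join(data_lines)
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment, e.g. ": keepalive"
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)
        # id and retry fields are ignored in this sketch


stream = 'event: message_sent\ndata: {"message_id": "m1"}\n\n: keepalive\n\n'
events = [(t, json.loads(d)) for t, d in parse_sse(stream.splitlines(keepends=True))]
print(events)
```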
---
## Syndarix-Specific Requirements
### @Mentions Support
```python
import re
from uuid import UUID


class MentionParser:
    """Parse @mentions in message content."""

    # Trailing \b keeps "@qa" from matching inside "@qateam"
    ROLE_PATTERN = re.compile(r'@(engineers?|architects?|qa|po|pm|devops)\b', re.IGNORECASE)
    MENTION_PATTERN = re.compile(r'@(\w+)')

    def parse(self, content: str) -> list[Mention]:
        mentions = []

        # Parse role mentions first so they are not also reported as agent names
        for match in self.ROLE_PATTERN.finditer(content):
            role = match.group(1).lower()
            mentions.append(Mention(type="role", role=role, position=match.start()))
        role_positions = {m.position for m in mentions}

        # Parse agent mentions, skipping @tokens already matched as roles
        for match in self.MENTION_PATTERN.finditer(content):
            if match.start() in role_positions:
                continue
            name = match.group(1)
            mentions.append(Mention(type="agent", name=name, position=match.start()))

        return mentions

    async def resolve_mentions(
        self,
        mentions: list[Mention],
        project_id: UUID,
        orchestrator: AgentOrchestrator
    ) -> list[UUID]:
        """Resolve mentions to agent IDs."""
        # A set, so an agent mentioned by name and matched by role is counted once
        agent_ids = set()

        for mention in mentions:
            if mention.type == "agent":
                agent = await orchestrator.get_by_name(project_id, mention.name)
                if agent:
                    agent_ids.add(agent.id)
            elif mention.type == "role":
                agents = await orchestrator.get_instances_by_role(
                    project_id, mention.role
                )
                agent_ids.update(a.id for a in agents)

        return list(agent_ids)
```
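
A self-contained sketch of the parse-then-resolve flow against an in-memory roster, assuming a simple `Mention` dataclass and hypothetical `ROSTER` and `ROLE_ALIASES` tables in place of `AgentOrchestrator` lookups. It shows two details worth getting right: a trailing `\b` on the role pattern keeps `@qa` from firing inside `@qateam`, and collecting IDs in a set means an agent mentioned both by name and by role is notified once:

```python
import re
from dataclasses import dataclass
from typing import Optional

# Trailing \b keeps "@qa" from matching inside "@qateam"
ROLE_PATTERN = re.compile(r"@(engineers?|architects?|qa|po|pm|devops)\b", re.IGNORECASE)
MENTION_PATTERN = re.compile(r"@(\w+)")
# Hypothetical plural -> role normalization
ROLE_ALIASES = {"engineers": "engineer", "architects": "architect"}


@dataclass(frozen=True)
class Mention:
    type: str  # "agent" or "role"
    position: int
    name: Optional[str] = None
    role: Optional[str] = None


def parse_mentions(content: str) -> list[Mention]:
    roles = [Mention("role", m.start(), role=m.group(1).lower())
             for m in ROLE_PATTERN.finditer(content)]
    taken = {m.position for m in roles}
    # An @token already matched as a role is not reported again as an agent name
    agents = [Mention("agent", m.start(), name=m.group(1))
              for m in MENTION_PATTERN.finditer(content) if m.start() not in taken]
    return sorted(roles + agents, key=lambda m: m.position)


# Hypothetical in-memory roster: agent name -> (agent id, role)
ROSTER = {"Dave": ("a1", "engineer"), "Erin": ("a2", "engineer"), "Quinn": ("a3", "qa")}


def resolve(mentions: list[Mention]) -> set[str]:
    ids: set[str] = set()
    for mention in mentions:
        if mention.type == "agent" and mention.name in ROSTER:
            ids.add(ROSTER[mention.name][0])
        elif mention.type == "role":
            role = ROLE_ALIASES.get(mention.role, mention.role)
            ids.update(agent_id for agent_id, r in ROSTER.values() if r == role)
    return ids


mentions = parse_mentions("@Dave and @engineers, please loop in @qateam")
print([(m.type, m.name or m.role) for m in mentions], resolve(mentions))
```

Here `@qateam` is treated as an agent name that is not on the roster, so it resolves to nothing rather than pulling in every QA agent.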
### Priority Message Handling
```python
from uuid import UUID


class PriorityMessageHandler:
    """Handle urgent messages with priority."""

    PRIORITY_INTERRUPTS = {"urgent", "high"}

    def __init__(self, orchestrator: AgentOrchestrator, event_bus):
        self.orchestrator = orchestrator
        self.event_bus = event_bus

    async def process_with_priority(
        self,
        agent_id: UUID,
        message: AgentMessage
    ):
        """Process message based on priority."""
        if message.metadata.priority in self.PRIORITY_INTERRUPTS:
            # Interrupt current task
            await self.orchestrator.interrupt_agent(agent_id)

            # Publish urgent notification
            await self.event_bus.publish(
                f"project:{message.context.project_id}",
                {
                    "type": "urgent_message",
                    "agent_id": str(agent_id),
                    "message_id": str(message.id),
                    "from": message.sender.name,
                    "preview": message.content[:100]
                }
            )

            # Process immediately
            await self.process_immediately(agent_id, message)
        else:
            # Queue for normal processing
            await self.queue_for_processing(agent_id, message)

    # process_immediately() and queue_for_processing() are not shown here
```
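
`queue_for_processing` is not shown. One possible shape for it is a per-agent heap keyed by (priority rank, arrival sequence), so higher-priority messages come out first and equal-priority messages stay in FIFO order. A sketch using the standard `heapq` module, assuming four levels `urgent`, `high`, `normal` and `low` (only `urgent` and `high` appear above) and a hypothetical `PriorityInbox` class:

```python
import heapq
import itertools
from dataclasses import dataclass, field

# Assumed priority levels; a lower rank is served first
PRIORITY_RANK = {"urgent": 0, "high": 1, "normal": 2, "low": 3}


@dataclass(order=True)
class QueuedMessage:
    rank: int
    seq: int  # arrival order breaks ties, keeping equal priorities FIFO
    message_id: str = field(compare=False)


class PriorityInbox:
    """Hypothetical per-agent queue for messages that do not interrupt."""

    def __init__(self) -> None:
        self._heap: list[QueuedMessage] = []
        self._counter = itertools.count()

    def push(self, message_id: str, priority: str = "normal") -> None:
        # Unknown priority strings are treated as "normal"
        rank = PRIORITY_RANK.get(priority, PRIORITY_RANK["normal"])
        heapq.heappush(self._heap, QueuedMessage(rank, next(self._counter), message_id))

    def pop(self) -> str:
        return heapq.heappop(self._heap).message_id

    def __len__(self) -> int:
        return len(self._heap)


inbox = PriorityInbox()
for msg_id, prio in [("m1", "low"), ("m2", "normal"), ("m3", "high"), ("m4", "normal")]:
    inbox.push(msg_id, prio)
print([inbox.pop() for _ in range(len(inbox))])
```

The sequence counter matters: `heapq` is not stable, so without it equal-priority messages could come out in arbitrary order.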
### Communication Traceability
```python
from uuid import UUID


class CommunicationTracer:
    """Trace all inter-agent communication."""

    async def trace_message_flow(
        self,
        conversation_id: UUID
    ) -> MessageFlowTrace:
        """Generate a trace of message flow in a conversation."""
        # get_conversation_messages() and generate_summary() are helpers not shown here
        messages = await self.get_conversation_messages(conversation_id)

        nodes = []
        edges = []

        for msg in messages:
            nodes.append({
                "id": str(msg.id),
                "agent": msg.sender_name,
                "role": msg.sender_role,
                "action": msg.action,
                "timestamp": msg.created_at.isoformat()
            })

            # Reply edge: this message answers an earlier one
            if msg.in_response_to_id:
                edges.append({
                    "from": str(msg.in_response_to_id),
                    "to": str(msg.id),
                    "type": "response"
                })

            # Thread edge: this message belongs to a parent message's thread
            if msg.parent_message_id:
                edges.append({
                    "from": str(msg.parent_message_id),
                    "to": str(msg.id),
                    "type": "thread"
                })

        return MessageFlowTrace(
            conversation_id=conversation_id,
            nodes=nodes,
            edges=edges,
            summary=await self.generate_summary(messages)
        )
```
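
The trace is a small directed graph: one node per message, plus a `response` edge for each `in_response_to_id` and a `thread` edge for each `parent_message_id`. A self-contained sketch, assuming a trimmed, hypothetical `TracedMessage` record in place of the database model; it also lists root messages (those no edge points to), one way to find where each branch of a conversation starts:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class TracedMessage:
    # Trimmed, hypothetical stand-in for the persisted message model
    id: str
    sender_name: str
    action: str
    created_at: datetime
    in_response_to_id: Optional[str] = None
    parent_message_id: Optional[str] = None


def build_trace(messages: list[TracedMessage]) -> dict:
    # Nodes in chronological order
    nodes = [
        {"id": m.id, "agent": m.sender_name, "action": m.action,
         "timestamp": m.created_at.isoformat()}
        for m in sorted(messages, key=lambda m: m.created_at)
    ]
    edges = []
    for m in messages:
        if m.in_response_to_id:
            edges.append({"from": m.in_response_to_id, "to": m.id, "type": "response"})
        if m.parent_message_id:
            edges.append({"from": m.parent_message_id, "to": m.id, "type": "thread"})
    # Roots: messages no edge points to
    targets = {e["to"] for e in edges}
    roots = [n["id"] for n in nodes if n["id"] not in targets]
    return {"nodes": nodes, "edges": edges, "roots": roots}


t0 = datetime(2025, 1, 1, 9, 0)
msgs = [
    TracedMessage("m1", "PO", "delegate_task", t0),
    TracedMessage("m2", "Engineer", "task_completed", t0.replace(minute=5), in_response_to_id="m1"),
    TracedMessage("m3", "QA", "request_review", t0.replace(minute=7), parent_message_id="m1"),
]
trace = build_trace(msgs)
print(trace["roots"], [(e["from"], e["to"], e["type"]) for e in trace["edges"]])
```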
---
## Performance Considerations
### Message Throughput
| Scenario | Target | Implementation |
|----------|--------|----------------|
| Direct messages | 100/sec | Redis Pub/Sub |
| Broadcasts | 10/sec | Batched delivery |
| Task delegation | 50/sec | Celery queue |
| Message persistence | 200/sec | Batch inserts |
### Scalability
```python
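# Message batching for high-throughput scenarios
import asyncio

from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession


class MessageBatcher:
    """Buffer messages; flush on batch_size or every flush_interval seconds."""

    def __init__(self, db: AsyncSession, batch_size: int = 100, flush_interval: float = 0.1):
        self.db = db
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._buffer: list[AgentMessage] = []
        self._lock = asyncio.Lock()

    async def add(self, message: AgentMessage):
        async with self._lock:
            self._buffer.append(message)
            if len(self._buffer) >= self.batch_size:
                await self._flush()

    async def run_periodic_flush(self):
        # Run as a background task so partially filled batches are not held indefinitely
        while True:
            await asyncio.sleep(self.flush_interval)
            async with self._lock:
                await self._flush()

    async def _flush(self):
        # Caller must hold self._lock
        if not self._buffer:
            return
        messages = self._buffer
        self._buffer = []
        # Batch insert: a list of parameter dicts runs one executemany-style INSERT;
        # AgentMessageModel and _to_dict() (message -> column values) are not shown
        await self.db.execute(
            insert(AgentMessageModel),
            [self._to_dict(m) for m in messages]
        )
        await self.db.commit()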
```
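
A buffered batcher has to flush on two triggers: when the buffer reaches `batch_size`, and on a `flush_interval` timer so a partially filled batch never waits indefinitely. A self-contained sketch that exercises both triggers against an in-memory sink, assuming a hypothetical `write_batch` coroutine in place of the SQLAlchemy insert (`InMemoryBatcher` is an illustrative name):

```python
import asyncio
from typing import Awaitable, Callable


class InMemoryBatcher:
    """Size- and time-triggered batcher; write_batch replaces the DB insert."""

    def __init__(self, write_batch: Callable[[list[str]], Awaitable[None]],
                 batch_size: int = 100, flush_interval: float = 0.1):
        self.write_batch = write_batch
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._buffer: list[str] = []
        self._lock = asyncio.Lock()

    async def add(self, item: str) -> None:
        async with self._lock:
            self._buffer.append(item)
            if len(self._buffer) >= self.batch_size:
                await self._flush_locked()

    async def run_periodic_flush(self) -> None:
        while True:
            await asyncio.sleep(self.flush_interval)
            async with self._lock:
                await self._flush_locked()

    async def _flush_locked(self) -> None:
        # Caller must hold self._lock
        if self._buffer:
            batch, self._buffer = self._buffer, []
            await self.write_batch(batch)


async def demo() -> list[list[str]]:
    batches: list[list[str]] = []

    async def write_batch(batch: list[str]) -> None:
        batches.append(batch)

    batcher = InMemoryBatcher(write_batch, batch_size=3, flush_interval=0.05)
    flusher = asyncio.create_task(batcher.run_periodic_flush())
    for i in range(4):
        await batcher.add(f"m{i}")  # m0-m2 fill a batch; m3 waits for the timer
    await asyncio.sleep(0.12)
    flusher.cancel()
    try:
        await flusher
    except asyncio.CancelledError:
        pass
    return batches


print(asyncio.run(demo()))
```

With uniform arrivals at the 200/sec persistence target and a 0.1 s interval, roughly 20 messages accumulate per tick, so most flushes would be timer-triggered and carry about 20 rows rather than 100.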
### Memory Management
- Conversation history limited to last 50 messages in-memory
- Older messages retrieved on-demand from database
- Redis TTL of 1 hour for active conversation cache
- Periodic archival of resolved conversations
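
The first two memory rules, a 50-message in-memory window with older history fetched from the database on demand, map naturally onto a bounded `deque`. A sketch, assuming a hypothetical `load_older(offset, limit)` accessor in place of the real query (an in-process list stands in for the messages table); the Redis TTL and archival rules are not modeled:

```python
from collections import deque
from typing import Callable


class ConversationWindow:
    """Keep the newest max_in_memory messages; fetch older ones on demand."""

    def __init__(self, load_older: Callable[[int, int], list[str]], max_in_memory: int = 50):
        # load_older(offset, limit) is hypothetical: it returns up to `limit` messages
        # that precede the newest `offset` messages, oldest first
        self._recent: deque[str] = deque(maxlen=max_in_memory)
        self._load_older = load_older
        self._total = 0

    def append(self, message: str) -> None:
        self._recent.append(message)  # the deque drops its oldest entry when full
        self._total += 1

    def history(self, limit: int) -> list[str]:
        """Return the newest `limit` messages, oldest first."""
        in_memory = list(self._recent)[-limit:] if limit else []
        missing = min(limit, self._total) - len(in_memory)
        if missing <= 0:
            return in_memory
        # Only the part of the request that fell out of memory hits the database
        return self._load_older(len(self._recent), missing) + in_memory


# In-process list standing in for the messages table
store = [f"m{i}" for i in range(60)]


def load_older(offset: int, limit: int) -> list[str]:
    end = len(store) - offset
    return store[max(0, end - limit):end]


window = ConversationWindow(load_older, max_in_memory=50)
for m in store:
    window.append(m)
print(window.history(3), len(window.history(55)))
```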
---
## References
### Industry Protocols Researched
- [Google A2A Protocol](https://developers.googleblog.com/en/a2a-a-new-era-of-agent-interoperability/) - Agent-to-Agent Protocol
- [IBM ACP](https://arxiv.org/html/2505.02279v1) - Agent Communication Protocol
- [Anthropic MCP](https://spec.modelcontextprotocol.io/) - Model Context Protocol
- [Linux Foundation A2A Project](https://www.linuxfoundation.org/press/linux-foundation-launches-the-agent2agent-protocol-project-to-enable-secure-intelligent-communication-between-ai-agents)
### Multi-Agent System Research
- [Multi-Agent Collaboration Mechanisms Survey](https://arxiv.org/html/2501.06322v1)
- [Conflict Resolution in Multi-Agent Systems](https://zilliz.com/ai-faq/how-do-multiagent-systems-manage-conflict-resolution)
- [Agent Memory Management](https://www.letta.com/blog/agent-memory)
- [Context Engineering for LLM Agents](https://developers.googleblog.com/architecting-efficient-context-aware-multi-agent-framework-for-production/)
### Syndarix Architecture
- [ADR-002: Real-time Communication](../adrs/ADR-002-realtime-communication.md)
- [ADR-006: Agent Orchestration](../adrs/ADR-006-agent-orchestration.md)
- [SPIKE-003: Real-time Updates](./SPIKE-003-realtime-updates.md)
---
## Decision
**Adopt a hybrid message-based communication protocol** with:
1. **Structured JSON messages** with natural language content
2. **Three routing strategies**: Direct, Role-based, Broadcast
3. **Redis Pub/Sub** for real-time delivery
4. **PostgreSQL** for message persistence and audit
5. **Celery** for async task delegation
6. **SSE** for client notifications
7. **Three-tier context management** for conversation continuity
8. **Hierarchical conflict resolution** with human escalation

This protocol reuses the infrastructure Syndarix already runs (Redis, Celery, PostgreSQL, SSE) and follows the design principles of the A2A and ACP protocols.

---
*Spike completed. Findings will inform implementation of inter-agent communication in the Agent Orchestration layer.*