SPIKE-007: Agent Communication Protocol
- Status: Completed
- Date: 2025-12-29
- Author: Architecture Team
- Related Issue: #7
Executive Summary
This spike researches inter-agent communication protocols for Syndarix, where 10+ specialized AI agents need to collaborate on software projects. After analyzing industry standards (A2A, ACP, MCP) and multi-agent system patterns, we recommend a structured message-based protocol built on the existing Redis Pub/Sub event bus.
Recommendation
Adopt a hybrid communication model combining:
- Structured JSON messages (a JSON-RPC-style request/response envelope) for request-response patterns
- Redis Pub/Sub channels for broadcasts and topic-based routing
- Database-backed message persistence for auditability and context recovery
- Priority queues via Celery for async task delegation
This approach aligns with Google's A2A protocol principles while leveraging Syndarix's existing infrastructure (Redis, Celery, SSE events).
Research Questions
1. What communication patterns work for AI multi-agent systems?
Industry research identifies four primary patterns:
| Pattern | Use Case | Syndarix Application |
|---|---|---|
| Request-Response | Direct task delegation | Engineer asks Architect for guidance |
| Publish-Subscribe | Broadcasts, notifications | PO announces sprint goals to all agents |
| Task Queue | Async work delegation | PO assigns issues to Engineers |
| Streaming | Long-running updates | Agent streaming progress to observers |
Key Insight: Syndarix needs all four patterns. A2A protocol supports request-response and streaming; Celery handles task queues; Redis Pub/Sub provides pub-sub.
2. Structured message formats vs natural language between agents?
Recommendation: Structured messages with natural language payload.
# Structured envelope with natural language content
{
  "id": "msg-uuid-123",
  "type": "request",
  "from": {"agent_id": "eng-001", "role": "Engineer", "name": "Dave"},
  "to": {"agent_id": "arch-001", "role": "Architect", "name": "Alex"},
  "action": "request_guidance",
  "priority": "normal",
  "context": {
    "project_id": "proj-123",
    "issue_id": "issue-456",
    "conversation_id": "conv-789"
  },
  "content": "I'm implementing the auth module. Should I use JWT with refresh tokens or session-based auth?",
  "metadata": {
    "created_at": "2025-12-29T10:00:00Z",
    "expires_at": "2025-12-29T11:00:00Z",
    "requires_response": true
  }
}
Rationale:
- Structured envelope enables routing, filtering, and auditing
- Natural language content preserves LLM reasoning capabilities
- Matches A2A/ACP hybrid approach (structured headers, flexible payload)
- Enables agent-to-agent understanding without rigid schemas
3. How to handle async vs sync communication?
Pattern Mapping:
| Scenario | Timing | Implementation |
|---|---|---|
| Quick clarification | Sync-like (< 30s) | Request-response with timeout |
| Code review request | Async (minutes-hours) | Task queue + callback |
| Broadcast announcement | Fire-and-forget | Pub/Sub |
| Long-running analysis | Streaming | SSE with progress events |
Implementation Strategy:
from enum import Enum

class MessageMode(str, Enum):
    SYNC = "sync"                        # Await response (with timeout)
    ASYNC = "async"                      # Queue for later, callback on completion
    FIRE_AND_FORGET = "fire_and_forget"  # No response expected
    STREAM = "stream"                    # Continuous updates
4. Message routing strategies?
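Recommended: Hierarchical routing with three core strategies (direct, role-based, broadcast), plus topic-based routing for agents that subscribe to a topic.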
┌─────────────────────────────────────────────────────────────────┐
│ Message Router │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ DIRECT │ │ ROLE-BASED │ │ BROADCAST │ │
│ │ Routing │ │ Routing │ │ Routing │ │
│ │ │ │ │ │ │ │
│ │ to: agent │ │ to: @role │ │ to: @all │ │
│ │ "arch-001" │ │ "@engineers"│ │ "@project" │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
| Strategy | Syntax | Use Case |
|---|---|---|
| Direct | `to: "agent-123"` | Specific agent communication |
| Role-based | `to: "@engineers"` | All agents of a role |
| Broadcast | `to: "@all"` | Project-wide announcements |
| Topic-based | `to: "#auth-module"` | Agents subscribed to the topic |
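The routing strategies in the table can be resolved from the `to` string with a small parser. This is a sketch only. The `ROLE_ALIASES` table, which maps plural handles like `@engineers` to role names like `Engineer`, is a hypothetical convention. `@project` is treated as a synonym for `@all` because the diagram shows both.

```python
from dataclasses import dataclass
from typing import Literal

# Hypothetical alias table: plural @handles -> role names used on agents.
ROLE_ALIASES = {
    "@engineers": "Engineer",
    "@architects": "Architect",
    "@qa": "QA",
}
BROADCAST_TARGETS = {"@all", "@project"}


@dataclass(frozen=True)
class Route:
    strategy: Literal["direct", "role", "broadcast", "topic"]
    target: str


def parse_target(to: str) -> Route:
    """Map a `to` value onto one of the four routing strategies."""
    to = to.strip()
    if not to:
        raise ValueError("empty recipient")
    if to.lower() in BROADCAST_TARGETS:
        return Route("broadcast", "all")
    if to.startswith("@"):
        role = ROLE_ALIASES.get(to.lower())
        if role is None:
            raise ValueError(f"unknown role alias: {to}")
        return Route("role", role)
    if to.startswith("#"):
        return Route("topic", to[1:])
    # Anything else is treated as a concrete agent id.
    return Route("direct", to)
```

The broadcast check must come before the generic `@` prefix. Otherwise `@all` would be looked up as a role alias.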
5. How to maintain conversation context across agent interactions?
Three-tier context management:
┌─────────────────────────────────────────────────────────────────┐
│ Context Hierarchy │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ CONVERSATION CONTEXT (Short-term) │ │
│ │ - Current message thread │ │
│ │ - Last N exchanges between agents │ │
│ │ - Active topic/issue context │ │
│ │ Storage: In-memory + Redis (TTL: 1 hour) │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ SESSION CONTEXT (Medium-term) │ │
│ │ - Sprint goals and decisions │ │
│ │ - Shared agreements between agents │ │
│ │ - Recent artifacts and references │ │
│ │ Storage: PostgreSQL AgentMessage table │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ PROJECT CONTEXT (Long-term) │ │
│ │ - Architecture decisions (ADRs) │ │
│ │ - Requirements and constraints │ │
│ │ - Knowledge base documents │ │
│ │ Storage: PostgreSQL + pgvector (RAG) │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Context injection pattern:
async def prepare_agent_context(
    agent_id: str,
    conversation_id: str,
    project_id: str
) -> list[dict]:
    """Build the message list for an LLM call.

    get_conversation_history, format_as_messages, get_session_context,
    extract_query_from_conversation and search_knowledge_base are assumed
    service helpers defined elsewhere.
    """
    system_blocks: list[dict] = []

    # 1. Conversation context (recent exchanges)
    recent_messages = await get_conversation_history(
        conversation_id, limit=10
    )

    # 2. Session context (relevant decisions)
    session_context = await get_session_context(agent_id, project_id)
    if session_context:
        system_blocks.append({
            "role": "system",
            "content": f"Session context:\n{session_context}"
        })

    # 3. Project context (RAG retrieval)
    query = extract_query_from_conversation(recent_messages)
    rag_results = await search_knowledge_base(project_id, query)
    if rag_results:
        system_blocks.append({
            "role": "system",
            "content": f"Relevant project context:\n{rag_results}"
        })

    # Background context first, then the conversation in chronological order,
    # so the most recent exchange is the last thing the model sees.
    return system_blocks + format_as_messages(recent_messages)
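The short-term tier (last N exchanges, 1-hour TTL) can be prototyped without Redis. The class below is an in-memory sketch that makes two assumptions. First, the TTL slides: each write refreshes it, as re-issuing `EXPIRE` on a Redis key would. Second, the clock is injectable so expiry can be tested.

```python
import time
from collections import deque
from typing import Callable


class ShortTermContext:
    """In-memory stand-in for the short-term conversation tier."""

    def __init__(
        self,
        max_messages: int = 10,
        ttl_seconds: float = 3600.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max = max_messages
        self._ttl = ttl_seconds
        self._clock = clock
        self._threads: dict[str, deque] = {}
        self._last_write: dict[str, float] = {}

    def append(self, conversation_id: str, role: str, content: str) -> None:
        # deque(maxlen=N) silently drops the oldest entry once N is reached.
        thread = self._threads.setdefault(conversation_id, deque(maxlen=self._max))
        thread.append({"role": role, "content": content})
        self._last_write[conversation_id] = self._clock()  # sliding TTL

    def get(self, conversation_id: str) -> list[dict]:
        written = self._last_write.get(conversation_id)
        if written is None or self._clock() - written > self._ttl:
            # Unknown or expired: drop it, like a Redis key whose TTL elapsed.
            self._threads.pop(conversation_id, None)
            self._last_write.pop(conversation_id, None)
            return []
        return list(self._threads[conversation_id])
```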
6. Conflict resolution when agents disagree?
Hierarchical escalation model:
┌─────────────────────────────────────────────────────────────────┐
│ Conflict Resolution Hierarchy │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Level 1: Direct Negotiation │
│ ├── Agents exchange arguments (max 3 rounds) │
│ ├── If consensus → Resolution recorded │
│ └── If no consensus → Escalate to Level 2 │
│ │
│ Level 2: Expert Arbitration │
│ ├── Relevant expert agent weighs in (Architect for tech) │
│ ├── Expert makes binding recommendation │
│ └── If expert unavailable → Escalate to Level 3 │
│ │
│ Level 3: Human Decision │
│ ├── Conflict summarized and sent to human client │
│ ├── Approval event triggers resolution │
│ └── Decision recorded for future reference │
│ │
└─────────────────────────────────────────────────────────────────┘
Implementation:
from enum import Enum

class ConflictResolution(str, Enum):
    NEGOTIATION = "negotiation"
    EXPERT_ARBITRATION = "expert_arbitration"
    HUMAN_DECISION = "human_decision"

class ConflictTracker:
    # Conflict, ConflictStatus, self.db, self.get and the
    # find_expert_for_topic / create_approval_request helpers are assumed
    # to be defined elsewhere in the service.
    async def record_conflict(
        self,
        participants: list[str],
        topic: str,
        positions: dict[str, str]
    ) -> Conflict:
        """Record a conflict between agents."""
        return await self.db.create(Conflict(
            participants=participants,
            topic=topic,
            positions=positions,
            status=ConflictStatus.OPEN,
            resolution_level=ConflictResolution.NEGOTIATION,
            negotiation_rounds=0
        ))

    async def escalate(self, conflict_id: str) -> Conflict:
        """Escalate conflict to the next resolution level."""
        conflict = await self.get(conflict_id)
        if conflict.resolution_level == ConflictResolution.NEGOTIATION:
            # Find a relevant expert (e.g. the Architect for technical topics)
            expert = await self.find_expert_for_topic(conflict.topic)
            if expert is not None:
                conflict.arbitrator_id = expert.id
                conflict.resolution_level = ConflictResolution.EXPERT_ARBITRATION
                return await self.db.update(conflict)
            # No expert available: fall through to a human decision
        if conflict.resolution_level in (
            ConflictResolution.NEGOTIATION,
            ConflictResolution.EXPERT_ARBITRATION,
        ):
            # Create a human approval request
            await self.create_approval_request(conflict)
            conflict.resolution_level = ConflictResolution.HUMAN_DECISION
        return await self.db.update(conflict)
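The escalation rules can be checked in isolation as a pure state machine. This sketch makes two assumptions: the 3-round negotiation limit from the diagram, and that an unavailable expert (passed here as `expert_id=None`) sends the conflict straight to a human. `ConflictState`, `record_round` and `escalate` are illustrative names, not part of the tracker above.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Level(str, Enum):
    NEGOTIATION = "negotiation"
    EXPERT_ARBITRATION = "expert_arbitration"
    HUMAN_DECISION = "human_decision"


MAX_NEGOTIATION_ROUNDS = 3


@dataclass
class ConflictState:
    level: Level = Level.NEGOTIATION
    rounds: int = 0
    arbitrator_id: Optional[str] = None
    resolved: bool = False


def escalate(state: ConflictState, expert_id: Optional[str]) -> ConflictState:
    """Move to the next level; skip arbitration when no expert is available."""
    if state.level is Level.NEGOTIATION and expert_id is not None:
        state.level = Level.EXPERT_ARBITRATION
        state.arbitrator_id = expert_id
    elif state.level is not Level.HUMAN_DECISION:
        state.level = Level.HUMAN_DECISION
    return state


def record_round(state: ConflictState, consensus: bool,
                 expert_id: Optional[str]) -> ConflictState:
    """Record one negotiation round; escalate once the round limit is hit."""
    if state.resolved or state.level is not Level.NEGOTIATION:
        return state  # negotiation is already over
    state.rounds += 1
    if consensus:
        state.resolved = True
    elif state.rounds >= MAX_NEGOTIATION_ROUNDS:
        escalate(state, expert_id)
    return state
```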
7. How to audit/log inter-agent communication?
Comprehensive audit trail:
from datetime import datetime

from sqlalchemy import Column, DateTime, Enum, ForeignKey, String, Text
from sqlalchemy.dialects.postgresql import UUID

from app.db.base import Base

class MessageAuditLog(Base):
    """All agent messages are persisted for audit."""
    __tablename__ = "message_audit_log"  # table name assumed

    id = Column(UUID, primary_key=True)
    message_id = Column(UUID, index=True)
    # Routing info
    from_agent_id = Column(UUID, ForeignKey("agent_instances.id"))
    to_agent_id = Column(UUID, nullable=True)  # NULL for broadcasts
    to_role = Column(String, nullable=True)
    # Message details (MessageType: enum assumed to be defined elsewhere)
    message_type = Column(Enum(MessageType))
    action = Column(String(50))
    content_hash = Column(String(64))  # SHA-256 of content
    content = Column(Text)  # Full content (encrypted at rest)
    # Context
    project_id = Column(UUID, ForeignKey("projects.id"), index=True)
    conversation_id = Column(UUID, index=True)
    parent_message_id = Column(UUID, nullable=True)
    # Response tracking
    response_to_id = Column(UUID, nullable=True)
    response_received_at = Column(DateTime, nullable=True)
    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    delivered_at = Column(DateTime, nullable=True)
    read_at = Column(DateTime, nullable=True)
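The `content_hash` column stores a SHA-256 hex digest. This is always 64 characters long and therefore fits `String(64)`. A minimal sketch of computing and checking it with the standard library follows. The function names are illustrative.

```python
import hashlib
import hmac


def content_hash(content: str) -> str:
    """SHA-256 hex digest of the UTF-8 encoded message content."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def verify_content(content: str, stored_hash: str) -> bool:
    """True if `content` still matches the hash recorded at write time."""
    # compare_digest avoids leaking the mismatch position through timing.
    return hmac.compare_digest(content_hash(content), stored_hash)
```

A plain hash detects accidental or casual modification, but anyone who can rewrite `content` can also rewrite `content_hash`. If the audit log must resist a privileged writer, it would need a keyed HMAC (with the key held outside the database) or per-row hash chaining.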
Message Format Specification
Core Message Schema
from datetime import datetime
from typing import Literal, Optional
from uuid import UUID, uuid4

from pydantic import BaseModel, Field

class AgentIdentifier(BaseModel):
    """Identifies an agent in communication."""
    agent_id: UUID
    role: str
    name: str

class MessageContext(BaseModel):
    """Contextual information for message routing."""
    project_id: UUID
    conversation_id: UUID
    issue_id: Optional[UUID] = None
    sprint_id: Optional[UUID] = None
    parent_message_id: Optional[UUID] = None

class MessageMetadata(BaseModel):
    """Message metadata for processing."""
    created_at: datetime = Field(default_factory=datetime.utcnow)
    expires_at: Optional[datetime] = None
    priority: Literal["urgent", "high", "normal", "low"] = "normal"
    requires_response: bool = False
    response_timeout_seconds: Optional[int] = None
    retry_count: int = 0
    max_retries: int = 3

# Attachment must be declared before AgentMessage, which uses it in a type annotation
class Attachment(BaseModel):
    """Attachments to messages."""
    type: Literal["code", "file", "image", "document", "reference"]
    name: str
    content: Optional[str] = None  # For inline content
    url: Optional[str] = None  # For file references
    mime_type: Optional[str] = None

class AgentMessage(BaseModel):
    """Primary message format for inter-agent communication."""
    # Identity
    id: UUID = Field(default_factory=uuid4)
    type: Literal["request", "response", "notification", "broadcast", "stream"]
    # Routing
    sender: AgentIdentifier
    recipient: Optional[AgentIdentifier] = None  # None for broadcasts
    recipient_role: Optional[str] = None  # For role-based routing
    recipient_all: bool = False  # For project-wide broadcast
    # Action
    action: str  # e.g., "request_review", "report_bug", "share_finding"
    # Content
    content: str  # Natural language message
    attachments: list[Attachment] = Field(default_factory=list)  # Files, code snippets, etc.
    # Context
    context: MessageContext
    # Metadata
    metadata: MessageMetadata = Field(default_factory=MessageMetadata)
    # Response linking
    in_response_to: Optional[UUID] = None
Action Types
from enum import Enum

class MessageAction(str, Enum):
    # Request actions
    REQUEST_REVIEW = "request_review"
    REQUEST_GUIDANCE = "request_guidance"
    REQUEST_CLARIFICATION = "request_clarification"
    REQUEST_APPROVAL = "request_approval"
    DELEGATE_TASK = "delegate_task"
    # Response actions
    PROVIDE_REVIEW = "provide_review"
    PROVIDE_GUIDANCE = "provide_guidance"
    PROVIDE_CLARIFICATION = "provide_clarification"
    APPROVE = "approve"
    REJECT = "reject"
    TASK_COMPLETED = "task_completed"  # Reply to DELEGATE_TASK (used by the agent runner)
    # Notification actions
    REPORT_BUG = "report_bug"
    REPORT_PROGRESS = "report_progress"
    REPORT_BLOCKER = "report_blocker"
    SHARE_FINDING = "share_finding"
    ANNOUNCE_DECISION = "announce_decision"
    # Collaboration actions
    PROPOSE_APPROACH = "propose_approach"
    CHALLENGE_APPROACH = "challenge_approach"
    AGREE_WITH = "agree_with"
    DISAGREE_WITH = "disagree_with"
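Request and response actions come in pairs, so the router could reject a reply whose action does not match the request it answers. The mapping below is an assumed pairing based on the action names above. `delegate_task` is paired with `task_completed`, the action the agent runner example uses.

```python
# Assumed pairing of request actions to the response actions that may answer them.
EXPECTED_RESPONSES: dict[str, frozenset[str]] = {
    "request_review": frozenset({"provide_review"}),
    "request_guidance": frozenset({"provide_guidance"}),
    "request_clarification": frozenset({"provide_clarification"}),
    "request_approval": frozenset({"approve", "reject"}),
    "delegate_task": frozenset({"task_completed"}),
}


def is_valid_response(request_action: str, response_action: str) -> bool:
    """True if `response_action` is an acceptable reply to `request_action`."""
    allowed = EXPECTED_RESPONSES.get(request_action)
    return allowed is not None and response_action in allowed
```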
Communication Patterns
Pattern 1: Request-Response
Use Case: Engineer asks Architect for design guidance.
# Engineer sends request
request = AgentMessage(
    type="request",
    sender=AgentIdentifier(agent_id=eng_id, role="Engineer", name="Dave"),
    recipient=AgentIdentifier(agent_id=arch_id, role="Architect", name="Alex"),
    action="request_guidance",
    content="I need to implement caching for the user service. Should I use Redis with write-through or write-behind strategy?",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        issue_id=issue_id
    ),
    metadata=MessageMetadata(
        priority="normal",
        requires_response=True,
        response_timeout_seconds=300
    )
)

# Send and await the response (None if no reply arrives within 300 s)
response = await message_router.send_and_wait(request, timeout=300)

# Meanwhile, on the Architect's side: build the reply and send it back
reply = AgentMessage(
    type="response",
    sender=AgentIdentifier(agent_id=arch_id, role="Architect", name="Alex"),
    recipient=request.sender,
    action="provide_guidance",
    content="Given our consistency requirements, use write-through caching with Redis. Here's the pattern...",
    in_response_to=request.id,
    context=request.context,
    attachments=[
        Attachment(type="code", name="cache_pattern.py", content="...")
    ]
)
await message_router.respond(request.id, reply)
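`send_and_wait` relies on matching a reply to its request by message id. The in-memory sketch below shows the same idea with `asyncio` futures. `ResponseCorrelator` is a hypothetical stand-in for the `response:{id}` Redis channel. It registers interest before the request is sent, so a fast reply cannot slip past. This matters with Redis Pub/Sub because a message published before a subscription exists is never delivered to that subscriber.

```python
import asyncio
from typing import Optional


class ResponseCorrelator:
    """Matches replies to pending requests by message id (in-memory sketch)."""

    def __init__(self) -> None:
        self._pending: dict[str, asyncio.Future] = {}

    def expect(self, message_id: str) -> asyncio.Future:
        # Call this BEFORE sending the request so an early reply is not lost.
        future = asyncio.get_running_loop().create_future()
        self._pending[message_id] = future
        return future

    def deliver(self, in_response_to: str, response: dict) -> bool:
        """Resolve the waiting request; False for unknown or late replies."""
        future = self._pending.pop(in_response_to, None)
        if future is None or future.done():
            return False
        future.set_result(response)
        return True

    async def wait(self, message_id: str, future: asyncio.Future,
                   timeout: float) -> Optional[dict]:
        try:
            return await asyncio.wait_for(future, timeout)
        except asyncio.TimeoutError:
            return None  # the caller would mark the request as expired
        finally:
            self._pending.pop(message_id, None)
```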
Pattern 2: Broadcast
Use Case: Product Owner announces sprint goals.
broadcast = AgentMessage(
    type="broadcast",
    sender=AgentIdentifier(agent_id=po_id, role="ProductOwner", name="Sarah"),
    recipient_all=True,
    action="announce_decision",
    content="Sprint 3 goals: 1) Complete auth module 2) Add user settings page 3) Fix critical bugs from QA",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        sprint_id=sprint_id
    ),
    metadata=MessageMetadata(
        priority="high",
        requires_response=False
    )
)
await message_router.broadcast(broadcast)
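Pub/Sub fan-out can be illustrated with one `asyncio.Queue` per subscriber. `InMemoryBus` is hypothetical and only mimics the shape of the Redis-backed `EventBus`. Like the router's `broadcast`, it skips the sender.

```python
import asyncio
from collections import defaultdict
from typing import Optional


class InMemoryBus:
    """Toy pub/sub: one asyncio.Queue per (channel, subscriber)."""

    def __init__(self) -> None:
        self._subscribers: dict[str, dict[str, asyncio.Queue]] = defaultdict(dict)

    def subscribe(self, channel: str, agent_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers[channel][agent_id] = queue
        return queue

    def publish(self, channel: str, event: dict,
                exclude: Optional[str] = None) -> list[str]:
        """Push `event` to every subscriber except `exclude`; return who got it."""
        delivered = []
        for agent_id, queue in self._subscribers.get(channel, {}).items():
            if agent_id == exclude:
                continue
            queue.put_nowait(event)
            delivered.append(agent_id)
        return delivered
```

Redis Pub/Sub is fire-and-forget: an agent that is not subscribed when the broadcast is published never sees it. This is why `broadcast` also writes `MessageDelivery` rows, which let an agent that comes back online catch up from the database.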
Pattern 3: Role-Based Routing
Use Case: QA reports a bug to all Engineers.
bug_report = AgentMessage(
    type="notification",
    sender=AgentIdentifier(agent_id=qa_id, role="QA", name="Quinn"),
    recipient_role="Engineer",  # All engineers receive this
    action="report_bug",
    content="Found a critical bug in the auth flow: users can bypass 2FA by...",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        issue_id=bug_issue_id
    ),
    metadata=MessageMetadata(
        priority="urgent",
        requires_response=True
    )
)
await message_router.send_to_role(bug_report)
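Role-based routing expands one logical message into one copy per matching agent. Each copy needs its own message id, because every copy becomes its own row in `agent_messages`. Below is a minimal sketch using plain dicts. The `AgentInstance` shape and the field names are assumptions.

```python
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class AgentInstance:
    id: str
    role: str
    status: str


def fan_out_to_role(message: dict, agents: list[AgentInstance], role: str) -> list[dict]:
    """One copy per active agent in `role`, each with a fresh id; sender excluded."""
    copies = []
    for agent in agents:
        if agent.role != role or agent.status != "active":
            continue
        if agent.id == message["sender_id"]:
            continue
        copies.append({**message, "id": str(uuid.uuid4()), "recipient_id": agent.id})
    return copies
```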
Pattern 4: Task Delegation
Use Case: Product Owner assigns work to Engineer.
delegation = AgentMessage(
    type="request",
    sender=AgentIdentifier(agent_id=po_id, role="ProductOwner", name="Sarah"),
    recipient=AgentIdentifier(agent_id=eng_id, role="Engineer", name="Dave"),
    action="delegate_task",
    content="Please implement issue #45: Add password reset functionality",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        issue_id=issue_45_id
    ),
    metadata=MessageMetadata(
        priority="normal",
        requires_response=True
    )
)
# This creates a Celery task for async execution
await message_router.delegate_task(delegation)
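`delegate_task` maps the message priority onto a Celery priority via `_priority_to_int`: urgent 0, high 3, normal 6, low 9, where a lower number means more urgent. The heap below is a local sketch of that intended ordering only, with FIFO order among equal priorities. Real ordering depends on the broker. Celery's documentation describes the Redis transport as emulating priorities with a small number of buckets, while RabbitMQ treats higher numbers as higher priority. The mapping should therefore be checked against the broker configuration.

```python
import heapq
import itertools

# Same mapping as MessageRouter._priority_to_int: lower number = more urgent.
PRIORITY = {"urgent": 0, "high": 3, "normal": 6, "low": 9}


class DelegationQueue:
    """Local priority queue: most urgent first, FIFO among equal priorities."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str]] = []
        self._sequence = itertools.count()  # tie-breaker preserving insertion order

    def push(self, task_id: str, priority: str = "normal") -> None:
        rank = PRIORITY.get(priority, PRIORITY["normal"])
        heapq.heappush(self._heap, (rank, next(self._sequence), task_id))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)
```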
Pattern 5: Streaming Updates
Use Case: Engineer shares progress during long task.
async def stream_progress(
    sender: AgentIdentifier,
    context: MessageContext,
    task_steps: list,
) -> None:
    """Stream progress updates during task execution.

    execute_step and message_router are assumed to be provided by the
    agent runtime; each step is assumed to expose a `name`.
    """
    total = len(task_steps)
    for index, step in enumerate(task_steps, start=1):
        # Execute step
        await execute_step(step)
        # Stream update (progress derived from the step index)
        percent = round(100 * index / total)
        update = AgentMessage(
            type="stream",
            sender=sender,
            recipient_all=True,
            action="report_progress",
            content=f"Completed: {step.name}. Progress: {percent}%",
            context=context,
        )
        await message_router.stream(update)
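The progress loop can also be written as an async generator. This separates producing progress events from transporting them (via the router, SSE, or a test harness). Below is a sketch. `run_step` is a hypothetical callable standing in for the agent's work, and percentages are derived from the step index rather than stored on each step.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Sequence


async def progress_events(
    steps: Sequence[str],
    run_step: Callable[[str], Awaitable[None]],
) -> AsyncIterator[dict]:
    """Run each step in order and yield a progress event after it finishes."""
    total = len(steps)
    for index, name in enumerate(steps, start=1):
        await run_step(name)
        percent = round(100 * index / total)
        yield {
            "action": "report_progress",
            "content": f"Completed: {name}. Progress: {percent}%",
            "percent": percent,
        }
```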
Database Schema
Message Storage
# app/models/agent_message.py
from datetime import datetime
from uuid import uuid4

from sqlalchemy import (
    Boolean, Column, DateTime, Enum, ForeignKey, Index, Integer, String, Text
)
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.db.base import Base

class AgentMessage(Base):
    """Persistent storage for all agent messages."""
    __tablename__ = "agent_messages"

    # Primary key
    id = Column(UUID, primary_key=True, default=uuid4)
    # Message type
    type = Column(
        Enum("request", "response", "notification", "broadcast", "stream",
             name="message_type"),
        nullable=False
    )
    # Sender
    sender_agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=False)
    sender_role = Column(String(50), nullable=False)
    sender_name = Column(String(100), nullable=False)
    # Recipient (nullable for broadcasts)
    recipient_agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=True)
    recipient_role = Column(String(50), nullable=True)  # For role-based routing
    is_broadcast = Column(Boolean, default=False)
    # Action
    action = Column(String(50), nullable=False)
    # Content
    content = Column(Text, nullable=False)
    content_hash = Column(String(64), nullable=False)  # SHA-256
    attachments = Column(JSONB, default=list)
    # Context
    project_id = Column(UUID, ForeignKey("projects.id"), nullable=False)
    conversation_id = Column(UUID, nullable=False)
    issue_id = Column(UUID, ForeignKey("issues.id"), nullable=True)
    sprint_id = Column(UUID, ForeignKey("sprints.id"), nullable=True)
    parent_message_id = Column(UUID, ForeignKey("agent_messages.id"), nullable=True)
    # Response linking
    in_response_to_id = Column(UUID, ForeignKey("agent_messages.id"), nullable=True)
    # Metadata
    priority = Column(
        Enum("urgent", "high", "normal", "low", name="priority_level"),
        default="normal"
    )
    requires_response = Column(Boolean, default=False)
    response_timeout_seconds = Column(Integer, nullable=True)
    # Status tracking
    status = Column(
        Enum("pending", "delivered", "read", "responded", "expired", "failed",
             name="message_status"),
        default="pending"
    )
    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    delivered_at = Column(DateTime, nullable=True)
    read_at = Column(DateTime, nullable=True)
    responded_at = Column(DateTime, nullable=True)
    expires_at = Column(DateTime, nullable=True)
    # Indexes
    __table_args__ = (
        Index("ix_agent_messages_project_conversation", "project_id", "conversation_id"),
        Index("ix_agent_messages_sender", "sender_agent_id", "created_at"),
        Index("ix_agent_messages_recipient", "recipient_agent_id", "status"),
        Index("ix_agent_messages_conversation", "conversation_id", "created_at"),
    )
class Conversation(Base):
    """Groups related messages into conversations."""
    __tablename__ = "conversations"

    id = Column(UUID, primary_key=True, default=uuid4)
    # Context
    project_id = Column(UUID, ForeignKey("projects.id"), nullable=False)
    issue_id = Column(UUID, ForeignKey("issues.id"), nullable=True)
    sprint_id = Column(UUID, ForeignKey("sprints.id"), nullable=True)
    # Participants
    participant_ids = Column(JSONB, default=list)  # List of agent IDs
    # Topic
    topic = Column(String(200), nullable=True)
    # Status
    status = Column(
        Enum("active", "resolved", "archived", name="conversation_status"),
        default="active"
    )
    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, onupdate=datetime.utcnow)
    resolved_at = Column(DateTime, nullable=True)
    # Summary (generated)
    summary = Column(Text, nullable=True)
class MessageDelivery(Base):
    """Tracks delivery status for broadcast messages."""
    __tablename__ = "message_deliveries"

    id = Column(UUID, primary_key=True, default=uuid4)
    message_id = Column(UUID, ForeignKey("agent_messages.id"), nullable=False)
    recipient_agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=False)
    # Status
    delivered_at = Column(DateTime, nullable=True)
    read_at = Column(DateTime, nullable=True)
    responded_at = Column(DateTime, nullable=True)
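The `message_status` enum implies a lifecycle, but the schema does not spell out which transitions are legal. The table below is one plausible reading and is an assumption throughout. This includes the `failed` back to `pending` retry edge suggested by `retry_count`/`max_retries`. It comes with a guard that rejects illegal updates.

```python
# Assumed lifecycle for message_status; not defined by the schema itself.
ALLOWED_TRANSITIONS: dict[str, frozenset[str]] = {
    "pending": frozenset({"delivered", "expired", "failed"}),
    "delivered": frozenset({"read", "expired", "failed"}),
    "read": frozenset({"responded", "expired"}),
    "responded": frozenset(),
    "expired": frozenset(),
    "failed": frozenset({"pending"}),  # retry, while retry_count < max_retries
}


def next_status(current: str, new: str,
                retry_count: int = 0, max_retries: int = 3) -> str:
    """Validate a status change and return the new status."""
    if new not in ALLOWED_TRANSITIONS.get(current, frozenset()):
        raise ValueError(f"illegal transition {current} -> {new}")
    if current == "failed" and retry_count >= max_retries:
        raise ValueError("retry budget exhausted")
    return new
```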
Alembic Migration
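# migrations/versions/xxx_add_agent_messages.py
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.postgresql import ENUM, JSONB, UUID

def upgrade():
    # Create enum types once, explicitly
    op.execute("""
        CREATE TYPE message_type AS ENUM (
            'request', 'response', 'notification', 'broadcast', 'stream'
        )
    """)
    op.execute("""
        CREATE TYPE priority_level AS ENUM ('urgent', 'high', 'normal', 'low')
    """)
    op.execute("""
        CREATE TYPE message_status AS ENUM (
            'pending', 'delivered', 'read', 'responded', 'expired', 'failed'
        )
    """)
    op.execute("""
        CREATE TYPE conversation_status AS ENUM ('active', 'resolved', 'archived')
    """)
    # Create tables. Python-side default=/onupdate= are not emitted in DDL,
    # so defaults are expressed as server_default here.
    op.create_table(
        'conversations',
        sa.Column('id', UUID, primary_key=True),
        sa.Column('project_id', UUID, sa.ForeignKey('projects.id'), nullable=False),
        sa.Column('issue_id', UUID, sa.ForeignKey('issues.id'), nullable=True),
        sa.Column('sprint_id', UUID, sa.ForeignKey('sprints.id'), nullable=True),
        sa.Column('participant_ids', JSONB, server_default=sa.text("'[]'::jsonb")),
        sa.Column('topic', sa.String(200), nullable=True),
        # create_type=False: the type was created above, so create_table
        # must not issue CREATE TYPE a second time
        sa.Column('status', ENUM('active', 'resolved', 'archived',
                                 name='conversation_status', create_type=False),
                  server_default='active'),
        sa.Column('created_at', sa.DateTime, server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime, nullable=True),
        sa.Column('resolved_at', sa.DateTime, nullable=True),
        sa.Column('summary', sa.Text, nullable=True),
    )
    op.create_table(
        'agent_messages',
        # ... columns as defined above (enum columns use ENUM(..., create_type=False))
    )
    op.create_table(
        'message_deliveries',
        # ... columns as defined above
    )

def downgrade():
    # Drop tables in reverse dependency order, then the enum types
    op.drop_table('message_deliveries')
    op.drop_table('agent_messages')
    op.drop_table('conversations')
    for type_name in ('conversation_status', 'message_status',
                      'priority_level', 'message_type'):
        op.execute(f"DROP TYPE IF EXISTS {type_name}")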
Code Examples
Message Router Service
# app/services/message_router.py
import asyncio
import hashlib
from datetime import datetime
from typing import Optional
from uuid import UUID

from sqlalchemy import select

from app.models.agent_message import AgentMessage as AgentMessageModel
from app.schemas.messages import AgentMessage, AgentIdentifier
from app.services.events import EventBus
from app.db.session import AsyncSession
from app.core.celery_app import celery_app


class AgentNotAvailableError(Exception):
    """Raised when a recipient agent does not exist or is not active."""
class MessageRouter:
    """Routes messages between agents."""

    def __init__(
        self,
        db: AsyncSession,
        event_bus: EventBus,
        orchestrator: "AgentOrchestrator"
    ):
        self.db = db
        self.event_bus = event_bus
        self.orchestrator = orchestrator

    async def send(self, message: AgentMessage) -> AgentMessageModel:
        """Send a message to a specific agent."""
        # Validate recipient exists and is active
        recipient = await self.orchestrator.get_instance(message.recipient.agent_id)
        if not recipient or recipient.status != "active":
            raise AgentNotAvailableError(f"Agent {message.recipient.agent_id} not available")
        # Persist message
        db_message = await self._persist_message(message)
        # Publish to recipient's channel
        await self.event_bus.publish(
            f"agent:{message.recipient.agent_id}",
            {
                "type": "message_received",
                "message_id": str(db_message.id),
                "sender": message.sender.dict(),
                "action": message.action,
                "priority": message.metadata.priority,
                "preview": message.content[:100]
            }
        )
        # Also publish to project channel for monitoring
        await self.event_bus.publish(
            f"project:{message.context.project_id}",
            {
                "type": "agent_message",
                "from": message.sender.name,
                "to": message.recipient.name,
                "action": message.action
            }
        )
        return db_message
    async def send_and_wait(
        self,
        message: AgentMessage,
        timeout: int = 300
    ) -> Optional[AgentMessage]:
        """Send a message and wait for its response (None on timeout)."""
        # Subscribe BEFORE sending: Redis Pub/Sub does not buffer, so a reply
        # published before the subscription exists would be lost. The channel
        # can be derived up front because message.id is also the row id.
        response_channel = f"response:{message.id}"
        subscriber = await self.event_bus.subscribe(response_channel)
        try:
            db_message = await self.send(message)
            try:
                response_event = await asyncio.wait_for(
                    subscriber.get_event(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                # Update message status to expired
                await self._mark_expired(db_message.id)
                return None
            response_id = response_event.data["response_id"]
            return await self.get_message(response_id)
        finally:
            await subscriber.unsubscribe()
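    async def send_to_role(self, message: AgentMessage) -> list[AgentMessageModel]:
        """Send message to all agents of a specific role."""
        # Get all active agents with the target role
        recipients = await self.orchestrator.get_instances_by_role(
            project_id=message.context.project_id,
            role=message.recipient_role
        )
        messages = []
        for recipient in recipients:
            if recipient.id == message.sender.agent_id:
                continue  # don't send the message back to its author
            # Build a new message per recipient. Omitting "id" lets the
            # schema's default_factory assign a fresh UUID, so the copies
            # don't collide on the agent_messages primary key.
            msg = AgentMessage(
                **message.dict(exclude={"id", "recipient"}),
                recipient=AgentIdentifier(
                    agent_id=recipient.id,
                    role=recipient.agent_type.role,
                    name=recipient.name
                )
            )
            db_message = await self.send(msg)
            messages.append(db_message)
        return messages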
    async def broadcast(self, message: AgentMessage) -> AgentMessageModel:
        """Broadcast message to all project agents."""
        # Persist the broadcast message
        db_message = await self._persist_message(message)
        # Get all active agents in project
        agents = await self.orchestrator.get_project_agents(
            message.context.project_id
        )
        # Create delivery records
        for agent in agents:
            if agent.id != message.sender.agent_id:
                await self._create_delivery_record(db_message.id, agent.id)
        # Publish to project channel
        await self.event_bus.publish(
            f"project:{message.context.project_id}",
            {
                "type": "broadcast",
                "message_id": str(db_message.id),
                "sender": message.sender.dict(),
                "action": message.action,
                "content": message.content
            }
        )
        return db_message

    async def delegate_task(self, message: AgentMessage) -> str:
        """Delegate a task for async execution via Celery."""
        # Persist message
        db_message = await self._persist_message(message)
        # Create Celery task
        task = celery_app.send_task(
            "app.tasks.agent.execute_delegated_task",
            args=[str(db_message.id), str(message.recipient.agent_id)],
            queue="agent_tasks",
            priority=self._priority_to_int(message.metadata.priority)
        )
        return task.id
    async def respond(
        self,
        original_message_id: UUID,
        response: AgentMessage
    ) -> AgentMessageModel:
        """Send a response to a previous message."""
        original = await self.get_message(original_message_id)
        response.in_response_to = original_message_id
        response.context = original.context
        # Default the recipient to the original sender if the handler left it unset
        if response.recipient is None:
            response.recipient = original.sender
        db_response = await self.send(response)
        # Update original message status
        await self._mark_responded(original_message_id)
        # Publish response notification
        await self.event_bus.publish(
            f"response:{original_message_id}",
            {
                "type": "response_received",
                "response_id": str(db_response.id)
            }
        )
        return db_response
    async def get_conversation_history(
        self,
        conversation_id: UUID,
        limit: int = 50,
        before: Optional[datetime] = None
    ) -> list[AgentMessageModel]:
        """Get messages in a conversation."""
        query = select(AgentMessageModel).where(
            AgentMessageModel.conversation_id == conversation_id
        )
        if before:
            query = query.where(AgentMessageModel.created_at < before)
        query = query.order_by(AgentMessageModel.created_at.desc()).limit(limit)
        result = await self.db.execute(query)
        # Newest-first query, returned oldest-first for display and LLM context
        return list(reversed(result.scalars().all()))
    async def _persist_message(self, message: AgentMessage) -> AgentMessageModel:
        """Persist message to database."""
        content_hash = hashlib.sha256(message.content.encode()).hexdigest()
        db_message = AgentMessageModel(
            id=message.id,
            type=message.type,
            sender_agent_id=message.sender.agent_id,
            sender_role=message.sender.role,
            sender_name=message.sender.name,
            recipient_agent_id=message.recipient.agent_id if message.recipient else None,
            recipient_role=message.recipient_role,
            is_broadcast=message.recipient_all,
            action=message.action,
            content=message.content,
            content_hash=content_hash,
            attachments=[a.dict() for a in message.attachments],
            project_id=message.context.project_id,
            conversation_id=message.context.conversation_id,
            issue_id=message.context.issue_id,
            sprint_id=message.context.sprint_id,
            parent_message_id=message.context.parent_message_id,
            in_response_to_id=message.in_response_to,
            priority=message.metadata.priority,
            requires_response=message.metadata.requires_response,
            response_timeout_seconds=message.metadata.response_timeout_seconds,
            expires_at=message.metadata.expires_at,
        )
        self.db.add(db_message)
        await self.db.commit()
        await self.db.refresh(db_message)
        return db_message

    def _priority_to_int(self, priority: str) -> int:
        """Convert priority to Celery priority (0=highest)."""
        return {"urgent": 0, "high": 3, "normal": 6, "low": 9}.get(priority, 6)
Receiving and Processing Messages
# app/services/agent_inbox.py
from typing import Awaitable, Callable, Optional
from uuid import UUID

from app.schemas.messages import AgentMessage
from app.services.events import EventBus
from app.services.message_router import MessageRouter

Handler = Callable[[AgentMessage], Awaitable[Optional[AgentMessage]]]

class AgentInbox:
    """Manages incoming messages for an agent."""

    def __init__(
        self,
        agent_id: UUID,
        event_bus: EventBus,
        message_router: MessageRouter
    ):
        self.agent_id = agent_id
        self.event_bus = event_bus
        self.message_router = message_router
        self._handlers: dict[str, Handler] = {}

    def on_action(self, action: str, handler: Optional[Handler] = None):
        """Register a handler for a specific action type.

        Works as a plain call (inbox.on_action("x", fn)) or as a
        decorator (@inbox.on_action("x")).
        """
        def register(fn: Handler) -> Handler:
            self._handlers[action] = fn
            return fn
        if handler is not None:
            return register(handler)
        return register

    async def start_listening(self):
        """Start listening for incoming messages."""
        channel = f"agent:{self.agent_id}"
        subscriber = await self.event_bus.subscribe(channel)
        try:
            while True:
                event = await subscriber.get_event()
                if event.type == "message_received":
                    message = await self.message_router.get_message(
                        event.data["message_id"]
                    )
                    await self._process_message(message)
        finally:
            await subscriber.unsubscribe()

    async def _process_message(self, message: AgentMessage):
        """Process an incoming message."""
        # Mark as read
        await self.message_router.mark_read(message.id)
        # Find handler, falling back to a "default" handler if one is registered
        handler = self._handlers.get(message.action) or self._handlers.get("default")
        if handler:
            response = await handler(message)
            # requires_response lives on MessageMetadata, not on the message itself
            if response and message.metadata.requires_response:
                await self.message_router.respond(message.id, response)
# Usage in agent runner
class AgentRunner:
    # self.inbox, self.identifier, self.execute and self.update_status are
    # assumed to be set up elsewhere in the runner.
    async def setup_message_handlers(self):
        """Register handlers for different message actions."""

        async def handle_review_request(message: AgentMessage) -> AgentMessage:
            # Execute code review
            review_result = await self.execute("review_code", {
                "content": message.content,
                "attachments": message.attachments
            })
            return AgentMessage(
                type="response",
                sender=self.identifier,
                recipient=message.sender,
                action="provide_review",
                content=review_result["content"],
                attachments=review_result.get("attachments", []),
                context=message.context,  # context is a required field
                in_response_to=message.id,
            )

        async def handle_task_delegation(message: AgentMessage) -> AgentMessage:
            # Accept and start working on the task
            await self.update_status("working")
            # Execute task
            result = await self.execute("implement", {
                "task_description": message.content,
                "issue_id": message.context.issue_id
            })
            return AgentMessage(
                type="response",
                sender=self.identifier,
                recipient=message.sender,
                action="task_completed",
                content=f"Task completed. {result['summary']}",
                context=message.context,
                in_response_to=message.id,
            )

        self.inbox.on_action("request_review", handle_review_request)
        self.inbox.on_action("delegate_task", handle_task_delegation)
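Without Redis and the database, the inbox reduces to an action-to-handler table with a `default` fallback. Below is a self-contained sketch that also shows decorator-style registration. `HandlerRegistry` is a hypothetical name, and messages are plain dicts for brevity.

```python
import asyncio
from typing import Awaitable, Callable, Optional

Handler = Callable[[dict], Awaitable[Optional[dict]]]


class HandlerRegistry:
    """Maps message actions to async handlers, with an optional 'default'."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def on_action(self, action: str) -> Callable[[Handler], Handler]:
        def register(fn: Handler) -> Handler:
            self._handlers[action] = fn
            return fn  # returning fn keeps the decorated name usable
        return register

    async def dispatch(self, message: dict) -> Optional[dict]:
        handler = self._handlers.get(message["action"]) or self._handlers.get("default")
        return await handler(message) if handler else None
```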
API Endpoints
```python
# app/api/v1/messages.py
from typing import Optional
from uuid import UUID

from fastapi import APIRouter, Depends

from app.api.deps import get_current_user, get_message_router
from app.models.user import User  # assumed location of the User model
from app.schemas.messages import AgentMessage, MessageCreate, MessageResponse
from app.services.message_router import MessageRouter

router = APIRouter(prefix="/messages", tags=["messages"])

# The dependency is named message_router so it does not shadow the module-level APIRouter.
# .dict()/.from_orm() are the Pydantic v1 names; Pydantic v2 uses model_dump()/model_validate().


@router.post("/send", response_model=MessageResponse)
async def send_message(
    message_in: MessageCreate,
    message_router: MessageRouter = Depends(get_message_router),
    current_user: User = Depends(get_current_user),
):
    """Send a message between agents (admin/system use)."""
    message = AgentMessage(**message_in.dict())
    result = await message_router.send(message)
    return MessageResponse.from_orm(result)


@router.post("/broadcast", response_model=MessageResponse)
async def broadcast_message(
    message_in: MessageCreate,
    message_router: MessageRouter = Depends(get_message_router),
    current_user: User = Depends(get_current_user),
):
    """Broadcast a message to all project agents."""
    # Merge instead of passing recipient_all twice if MessageCreate also defines it
    message = AgentMessage(**{**message_in.dict(), "recipient_all": True})
    result = await message_router.broadcast(message)
    return MessageResponse.from_orm(result)


@router.get("/conversation/{conversation_id}", response_model=list[MessageResponse])
async def get_conversation(
    conversation_id: UUID,
    limit: int = 50,
    message_router: MessageRouter = Depends(get_message_router),
    current_user: User = Depends(get_current_user),
):
    """Get messages in a conversation."""
    messages = await message_router.get_conversation_history(conversation_id, limit=limit)
    return [MessageResponse.from_orm(m) for m in messages]


@router.get("/agent/{agent_id}/inbox", response_model=list[MessageResponse])
async def get_agent_inbox(
    agent_id: UUID,
    status: Optional[str] = None,
    limit: int = 50,
    message_router: MessageRouter = Depends(get_message_router),
    current_user: User = Depends(get_current_user),
):
    """Get messages in an agent's inbox."""
    messages = await message_router.get_agent_messages(agent_id, status=status, limit=limit)
    return [MessageResponse.from_orm(m) for m in messages]
```
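Both write endpoints pass the message to `MessageRouter`, but this section does not show the routing logic. The spike names three routing strategies: direct, role-based, and broadcast. The pure-function sketch below shows one way a router could choose a strategy and expand it into recipient IDs. It assumes a message is addressed to exactly one of the following: a single agent, a role, or every agent. `Envelope`, `Agent`, `strategy_for`, and `resolve_recipients` are hypothetical names.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class RoutingStrategy(str, Enum):
    DIRECT = "direct"
    ROLE = "role"
    BROADCAST = "broadcast"


@dataclass(frozen=True)
class Agent:
    id: str
    role: str


@dataclass
class Envelope:
    """Hypothetical addressing fields of a message."""
    sender_id: str
    recipient_id: Optional[str] = None
    recipient_role: Optional[str] = None
    recipient_all: bool = False


def strategy_for(env: Envelope) -> RoutingStrategy:
    # Broadcast takes precedence, then role, then a single recipient
    if env.recipient_all:
        return RoutingStrategy.BROADCAST
    if env.recipient_role:
        return RoutingStrategy.ROLE
    if env.recipient_id:
        return RoutingStrategy.DIRECT
    raise ValueError("message has no recipient")


def resolve_recipients(env: Envelope, agents: list[Agent]) -> list[str]:
    strategy = strategy_for(env)
    if strategy is RoutingStrategy.DIRECT:
        targets = [a.id for a in agents if a.id == env.recipient_id]
    elif strategy is RoutingStrategy.ROLE:
        targets = [a.id for a in agents if a.role == env.recipient_role]
    else:
        targets = [a.id for a in agents]
    # Do not deliver a message back to the agent that sent it
    return [t for t in targets if t != env.sender_id]


agents = [Agent("a1", "engineer"), Agent("a2", "engineer"), Agent("a3", "architect")]
print(resolve_recipients(Envelope("a3", recipient_role="engineer"), agents))  # ['a1', 'a2']
```

Excluding the sender is an assumption of this sketch. It prevents a broadcast or role-addressed message from echoing back to the agent that wrote it.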
SSE Integration
Real-time Message Events
Messages integrate with the existing SSE event system (ADR-002):
```python
# Extended event types for messages
from enum import Enum


class EventType(str, Enum):
    # Existing events...

    # Message events
    MESSAGE_SENT = "message_sent"
    MESSAGE_RECEIVED = "message_received"
    MESSAGE_READ = "message_read"
    MESSAGE_RESPONDED = "message_responded"

    # Conversation events
    CONVERSATION_STARTED = "conversation_started"
    CONVERSATION_RESOLVED = "conversation_resolved"

    # Conflict events
    CONFLICT_DETECTED = "conflict_detected"
    CONFLICT_ESCALATED = "conflict_escalated"
    CONFLICT_RESOLVED = "conflict_resolved"
```
Message Event Stream
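```python
# app/api/v1/events.py (extended)
import asyncio
import json
from uuid import UUID

from fastapi import Depends, Request
from fastapi.responses import StreamingResponse

# router, event_bus, get_current_user and User are assumed to be the objects
# already defined or imported in this module (ADR-002).

# Event types forwarded to clients on the message stream
MESSAGE_STREAM_EVENTS = {
    "message_sent",
    "message_received",
    "conversation_started",
    "conflict_detected",
}


@router.get("/projects/{project_id}/messages/events")
async def message_events(
    project_id: UUID,
    request: Request,
    current_user: User = Depends(get_current_user),
):
    """Stream message events for a project."""

    async def event_generator():
        subscriber = await event_bus.subscribe(f"project:{project_id}:messages")
        try:
            while not await request.is_disconnected():
                try:
                    event = await asyncio.wait_for(subscriber.get_event(), timeout=30.0)
                except asyncio.TimeoutError:
                    # SSE comment line: ignored by clients, keeps idle connections open
                    yield ": keepalive\n\n"
                    continue
                if event.type in MESSAGE_STREAM_EVENTS:
                    yield f"event: {event.type}\ndata: {json.dumps(event.data)}\n\n"
        finally:
            await subscriber.unsubscribe()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"},
    )
```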
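Each string the generator yields is one SSE frame: an `event:` line, a single `data:` line of JSON, and a terminating blank line. The sketch below shows that wire format and includes a minimal parser for testing it. `format_sse` and `parse_sse` are hypothetical helper names. The parser handles only the fields this endpoint emits; it is not a full implementation of the Server-Sent Events spec, so fields such as `id:` and `retry:` are ignored.

```python
import json

KEEPALIVE = ": keepalive\n\n"  # comment frame; clients discard it


def format_sse(event_type: str, data: dict) -> str:
    # json.dumps with default settings escapes newlines inside strings and adds
    # none of its own, so the whole payload fits on one data: line.
    return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"


def parse_sse(stream: str) -> list[tuple[str, dict]]:
    """Minimal parser for event/data fields and comment lines."""
    events: list[tuple[str, dict]] = []
    event_type, data_lines = "message", []  # "message" is the SSE default type
    for line in stream.split("\n"):
        if line == "":
            # A blank line dispatches the pending event, if it has data
            if data_lines:
                events.append((event_type, json.loads("\n".join(data_lines))))
            event_type, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment, e.g. keepalive
        else:
            field, _, value = line.partition(":")
            value = value[1:] if value.startswith(" ") else value
            if field == "event":
                event_type = value
            elif field == "data":
                data_lines.append(value)
    return events


stream = (format_sse("message_sent", {"message_id": "m1", "preview": "line1\nline2"})
          + KEEPALIVE
          + format_sse("conflict_detected", {"conflict_id": "c1"}))
print([etype for etype, _ in parse_sse(stream)])  # ['message_sent', 'conflict_detected']
```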
Frontend Integration
```typescript
// frontend/lib/messageEvents.ts
import { useEffect, useRef } from 'react';
import type { AgentIdentifier } from './types'; // assumed location of the shared type

// Named AgentMessageEvent so it does not shadow the DOM's built-in MessageEvent type
export interface AgentMessageEvent {
  type: 'message_sent' | 'message_received' | 'conversation_started';
  data: {
    message_id?: string;
    from?: AgentIdentifier;
    to?: AgentIdentifier;
    action?: string;
    preview?: string;
  };
}

const EVENT_TYPES: AgentMessageEvent['type'][] = [
  'message_sent',
  'message_received',
  'conversation_started',
];

export function useMessageEvents(
  projectId: string,
  onMessage: (event: AgentMessageEvent) => void
) {
  // Keep the latest callback in a ref so that passing an inline handler
  // does not close and reopen the SSE connection on every render.
  const onMessageRef = useRef(onMessage);
  useEffect(() => {
    onMessageRef.current = onMessage;
  }, [onMessage]);

  useEffect(() => {
    const eventSource = new EventSource(
      `/api/v1/projects/${projectId}/messages/events`,
      { withCredentials: true }
    );

    for (const type of EVENT_TYPES) {
      eventSource.addEventListener(type, (e) => {
        onMessageRef.current({ type, data: JSON.parse(e.data) });
      });
    }

    eventSource.onerror = () => {
      // EventSource retries automatically; it stops only if the server rejects the request
      console.log('Message SSE reconnecting...');
    };

    return () => eventSource.close();
  }, [projectId]);
}
```
Syndarix-Specific Requirements
@Mentions Support
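```python
import re
from dataclasses import dataclass
from typing import Optional
from uuid import UUID


@dataclass
class Mention:
    """Minimal definition assumed for this snippet."""
    type: str                   # "agent" or "role"
    position: int               # offset of the "@" in the message content
    name: Optional[str] = None  # set for agent mentions
    role: Optional[str] = None  # set for role mentions


class MentionParser:
    """Parse @mentions in message content."""

    # (?<!\w) skips the "@" inside e-mail addresses such as dev@example.com
    MENTION_PATTERN = re.compile(r"(?<!\w)@(\w+)")
    ROLE_PATTERN = re.compile(r"engineers?|architects?|qa|po|pm|devops", re.IGNORECASE)

    def parse(self, content: str) -> list[Mention]:
        mentions = []
        for match in self.MENTION_PATTERN.finditer(content):
            token = match.group(1)
            # Each token is classified once. "@engineers" is a role mention,
            # not also an agent named "engineers".
            if self.ROLE_PATTERN.fullmatch(token):
                mentions.append(Mention(type="role", role=token.lower(), position=match.start()))
            else:
                mentions.append(Mention(type="agent", name=token, position=match.start()))
        return mentions

    async def resolve_mentions(
        self,
        mentions: list[Mention],
        project_id: UUID,
        orchestrator: "AgentOrchestrator",  # from the Agent Orchestration layer
    ) -> list[UUID]:
        """Resolve mentions to agent IDs."""
        agent_ids: set[UUID] = set()
        for mention in mentions:
            if mention.type == "agent":
                agent = await orchestrator.get_by_name(project_id, mention.name)
                if agent:
                    agent_ids.add(agent.id)
            elif mention.type == "role":
                agents = await orchestrator.get_instances_by_role(project_id, mention.role)
                agent_ids.update(a.id for a in agents)
        return list(agent_ids)
```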
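`resolve_mentions` collects results in a set. As a result, it loses the order in which agents were mentioned and silently drops names that match no agent. The variant below preserves first-mention order and reports unresolved mentions so the sender can be notified. `Roster` is a hypothetical in-memory stand-in for the `AgentOrchestrator` lookups, and `Mention` is simplified to a type/value pair for this sketch.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Mention:
    type: str   # "agent" or "role"
    value: str  # agent name or role name


@dataclass
class Roster:
    """Hypothetical in-memory stand-in for the orchestrator's lookups."""
    by_name: dict[str, str] = field(default_factory=dict)        # agent name -> agent id
    by_role: dict[str, list[str]] = field(default_factory=dict)  # role -> agent ids


def resolve(mentions: list[Mention], roster: Roster) -> tuple[list[str], list[Mention]]:
    """Return (agent ids in first-mention order, mentions that matched nobody)."""
    resolved: dict[str, None] = {}  # dicts keep insertion order, so this is an ordered set
    unresolved: list[Mention] = []
    for m in mentions:
        if m.type == "agent":
            ids = [roster.by_name[m.value]] if m.value in roster.by_name else []
        else:
            ids = roster.by_role.get(m.value.lower(), [])
        if not ids:
            unresolved.append(m)
        for agent_id in ids:
            resolved.setdefault(agent_id, None)
    return list(resolved), unresolved


roster = Roster(by_name={"alice": "a1", "bob": "a2"}, by_role={"engineers": ["a1", "a3"]})
ids, missing = resolve(
    [Mention("agent", "bob"), Mention("role", "engineers"),
     Mention("agent", "alice"), Mention("agent", "carol")],
    roster,
)
print(ids, missing)  # ['a2', 'a1', 'a3'] [Mention(type='agent', value='carol')]
```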
Priority Message Handling
```python
from uuid import UUID

from app.schemas.messages import AgentMessage


class PriorityMessageHandler:
    """Handle urgent messages with priority.

    process_immediately() and queue_for_processing() are handler methods not shown here.
    """

    PRIORITY_INTERRUPTS = {"urgent", "high"}

    def __init__(self, orchestrator, event_bus):
        self.orchestrator = orchestrator  # AgentOrchestrator instance
        self.event_bus = event_bus        # existing Redis-backed event bus

    async def process_with_priority(self, agent_id: UUID, message: AgentMessage) -> None:
        """Process message based on priority."""
        if message.metadata.priority in self.PRIORITY_INTERRUPTS:
            # Interrupt the agent's current task
            await self.orchestrator.interrupt_agent(agent_id)
            # Publish an urgent notification on the project channel
            await self.event_bus.publish(
                f"project:{message.context.project_id}",
                {
                    "type": "urgent_message",
                    "agent_id": str(agent_id),
                    "message_id": str(message.id),
                    "from": message.sender.name,
                    "preview": message.content[:100],
                },
            )
            # Process immediately
            await self.process_immediately(agent_id, message)
        else:
            # Queue for normal processing
            await self.queue_for_processing(agent_id, message)
```
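The normal path calls `queue_for_processing`, whose queue this spike does not define. A per-agent priority queue is one simple option. The sketch below uses `heapq`, with a monotonic counter as a tie-breaker so messages of equal priority are processed in arrival order. The levels `urgent` and `high` come from `PRIORITY_INTERRUPTS`; `normal` and `low`, the fallback to `normal`, and the `AgentMessageQueue` name are assumptions.

```python
import heapq
import itertools
from dataclasses import dataclass, field

# Lower rank is served first. "normal" and "low" are assumed levels.
PRIORITY_RANK = {"urgent": 0, "high": 1, "normal": 2, "low": 3}


@dataclass(order=True)
class _Entry:
    rank: int
    seq: int                              # arrival order; breaks ties within a rank
    message: dict = field(compare=False)  # never compared, so dicts need no ordering


class AgentMessageQueue:
    """Hypothetical per-agent queue: highest priority first, FIFO within a priority."""

    def __init__(self) -> None:
        self._heap: list[_Entry] = []
        self._counter = itertools.count()

    def push(self, message: dict) -> None:
        # Missing or unknown priorities fall back to "normal"
        rank = PRIORITY_RANK.get(message.get("priority", "normal"), PRIORITY_RANK["normal"])
        heapq.heappush(self._heap, _Entry(rank, next(self._counter), message))

    def pop(self) -> dict:
        return heapq.heappop(self._heap).message

    def __len__(self) -> int:
        return len(self._heap)


q = AgentMessageQueue()
for msg_id, prio in [("m1", "low"), ("m2", "normal"), ("m3", "urgent"), ("m4", "normal")]:
    q.push({"id": msg_id, "priority": prio})
print([q.pop()["id"] for _ in range(len(q))])  # ['m3', 'm2', 'm4', 'm1']
```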
Communication Traceability
```python
from uuid import UUID


class CommunicationTracer:
    """Trace all inter-agent communication."""

    async def trace_message_flow(self, conversation_id: UUID) -> MessageFlowTrace:
        """Generate a trace of message flow in a conversation."""
        messages = await self.get_conversation_messages(conversation_id)
        nodes = []
        edges = []
        for msg in messages:
            nodes.append({
                "id": str(msg.id),
                "agent": msg.sender_name,
                "role": msg.sender_role,
                "action": msg.action,
                "timestamp": msg.created_at.isoformat(),
            })
            # Direct reply: edge from the original message to its answer
            if msg.in_response_to_id:
                edges.append({
                    "from": str(msg.in_response_to_id),
                    "to": str(msg.id),
                    "type": "response",
                })
            # Thread membership: edge from the thread's parent message
            if msg.parent_message_id:
                edges.append({
                    "from": str(msg.parent_message_id),
                    "to": str(msg.id),
                    "type": "thread",
                })
        return MessageFlowTrace(
            conversation_id=conversation_id,
            nodes=nodes,
            edges=edges,
            summary=await self.generate_summary(messages),
        )
```
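The same link fields also support simple diagnostics. The sketch below uses a hypothetical `TracedMessage` record in place of the ORM rows. It builds the response and thread edges the same way `trace_message_flow` does, and it computes each message's reply depth. Treating a long reply chain as a signal of an unresolved back-and-forth worth escalating is an assumption of this sketch, not something the spike specifies.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TracedMessage:
    """Hypothetical minimal record with the link fields the tracer reads."""
    id: str
    sender: str
    in_response_to_id: Optional[str] = None
    parent_message_id: Optional[str] = None


def build_edges(messages: list[TracedMessage]) -> list[tuple[str, str, str]]:
    """Return (from_id, to_id, kind) edges for response and thread links."""
    edges = []
    for m in messages:
        if m.in_response_to_id:
            edges.append((m.in_response_to_id, m.id, "response"))
        if m.parent_message_id:
            edges.append((m.parent_message_id, m.id, "thread"))
    return edges


def reply_depth(messages: list[TracedMessage]) -> dict[str, int]:
    """Length of the in_response_to chain ending at each message (0 = not a reply).

    Assumes replies always point at earlier messages, so chains contain no cycles.
    """
    by_id = {m.id: m for m in messages}
    depth: dict[str, int] = {}

    def walk(msg_id: str) -> int:
        if msg_id not in depth:
            parent = by_id[msg_id].in_response_to_id
            if parent is None:
                depth[msg_id] = 0
            elif parent in by_id:
                depth[msg_id] = walk(parent) + 1
            else:
                depth[msg_id] = 1  # reply to a message outside this conversation
        return depth[msg_id]

    for m in messages:
        walk(m.id)
    return depth


msgs = [
    TracedMessage("m1", "po"),
    TracedMessage("m2", "engineer", in_response_to_id="m1"),
    TracedMessage("m3", "po", in_response_to_id="m2"),
    TracedMessage("m4", "qa", parent_message_id="m1"),
]
print(reply_depth(msgs))  # {'m1': 0, 'm2': 1, 'm3': 2, 'm4': 0}
```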
Performance Considerations
Message Throughput
| Scenario | Target | Implementation |
|---|---|---|
| Direct messages | 100/sec | Redis Pub/Sub |
| Broadcasts | 10/sec | Batched delivery |
| Task delegation | 50/sec | Celery queue |
| Message persistence | 200/sec | Batch inserts |
Scalability
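```python
# Message batching for high-throughput scenarios
import asyncio
from typing import Optional

from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession

from app.schemas.messages import AgentMessage

# AgentMessageModel (ORM model) and the _to_dict helper (schema -> row dict) are not shown here.
class MessageBatcher:
    def __init__(self, db: AsyncSession, batch_size: int = 100, flush_interval: float = 0.1):
        self.db = db
        self.batch_size = batch_size
        self.flush_interval = flush_interval  # upper bound on how long a message waits in the buffer
        self._buffer: list[AgentMessage] = []
        self._lock = asyncio.Lock()
        self._timer: Optional[asyncio.Task] = None

    def start(self) -> None:
        # Without a timer, a partial batch is never written until batch_size is reached
        self._timer = asyncio.create_task(self._flush_periodically())

    async def stop(self) -> None:
        async with self._lock:
            if self._timer:
                self._timer.cancel()  # the timer only flushes while holding the lock
            await self._flush()  # persist whatever is still buffered

    async def _flush_periodically(self) -> None:
        while True:
            await asyncio.sleep(self.flush_interval)
            async with self._lock:
                await self._flush()

    async def add(self, message: AgentMessage) -> None:
        async with self._lock:
            self._buffer.append(message)
            if len(self._buffer) >= self.batch_size:
                await self._flush()

    async def _flush(self) -> None:
        # Caller must hold self._lock
        if not self._buffer:
            return
        messages, self._buffer = self._buffer, []
        # Batch insert: one INSERT statement executed with many parameter sets
        await self.db.execute(insert(AgentMessageModel), [self._to_dict(m) for m in messages])
        await self.db.commit()
```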
Memory Management
- Conversation history limited to last 50 messages in-memory
- Older messages retrieved on-demand from database
- Redis TTL of 1 hour for active conversation cache
- Periodic archival of resolved conversations
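The in-memory part of the bullets above can be sketched as a bounded window that falls back to storage for older messages. This sketch assumes messages are addressed by their 0-based position in the conversation. `load_older(start, end)` is a hypothetical callback that queries the database for that half-open range, and `ConversationWindow` is a hypothetical name. The Redis TTL cache and archival are not modeled.

```python
from collections import deque
from typing import Callable


class ConversationWindow:
    """Hypothetical window that keeps the newest messages of one conversation in memory."""

    def __init__(self, load_older: Callable[[int, int], list[str]], max_in_memory: int = 50) -> None:
        self._recent: deque[str] = deque(maxlen=max_in_memory)  # oldest entries fall off automatically
        self._total = 0  # messages ever appended, i.e. the conversation length
        self._load_older = load_older

    def append(self, message: str) -> None:
        self._recent.append(message)
        self._total += 1

    def last(self, n: int) -> list[str]:
        """Return the newest n messages, oldest first, querying storage only when needed."""
        n = min(n, self._total)
        if n <= len(self._recent):
            return list(self._recent)[-n:] if n else []
        first_cached = self._total - len(self._recent)  # position of the oldest cached message
        missing = n - len(self._recent)
        return self._load_older(first_cached - missing, first_cached) + list(self._recent)


store = [f"m{i}" for i in range(120)]  # stands in for the database table
window = ConversationWindow(lambda start, end: store[start:end])
for m in store:
    window.append(m)
print(window.last(3))  # ['m117', 'm118', 'm119']
```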
References
Industry Protocols Researched
- Google A2A Protocol - Agent-to-Agent Protocol
- IBM ACP - Agent Communication Protocol
- Anthropic MCP - Model Context Protocol
- Linux Foundation A2A Project
Multi-Agent System Research
- Multi-Agent Collaboration Mechanisms Survey
- Conflict Resolution in Multi-Agent Systems
- Agent Memory Management
- Context Engineering for LLM Agents
Syndarix Architecture
Decision
Adopt a hybrid message-based communication protocol with:
- Structured JSON messages with natural language content
- Three routing strategies: Direct, Role-based, Broadcast
- Redis Pub/Sub for real-time delivery
- PostgreSQL for message persistence and audit
- Celery for async task delegation
- SSE for client notifications
- Three-tier context management for conversation continuity
- Hierarchical conflict resolution with human escalation
This protocol builds on existing Syndarix infrastructure (Redis, PostgreSQL, Celery, and SSE) instead of introducing new services. It also follows design principles from the A2A and ACP protocols.
Spike completed. Findings will inform implementation of inter-agent communication in the Agent Orchestration layer.