# SPIKE-007: Agent Communication Protocol

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #7

---

## Executive Summary

This spike researches inter-agent communication protocols for Syndarix, where 10+ specialized AI agents need to collaborate on software projects. After analyzing industry standards (A2A, ACP, MCP) and multi-agent system patterns, we recommend a **structured message-based protocol** built on the existing Redis Pub/Sub event bus.

### Recommendation

Adopt a **hybrid communication model** combining:

1. **Structured JSON-RPC messages** for request-response patterns
2. **Redis Pub/Sub channels** for broadcasts and topic-based routing
3. **Database-backed message persistence** for auditability and context recovery
4. **Priority queues via Celery** for async task delegation

This approach aligns with Google's A2A protocol principles while leveraging Syndarix's existing infrastructure (Redis, Celery, SSE events).

---

## Research Questions

### 1. What communication patterns work for AI multi-agent systems?

Industry research identifies four primary patterns:

| Pattern | Use Case | Syndarix Application |
|---------|----------|---------------------|
| **Request-Response** | Direct task delegation | Engineer asks Architect for guidance |
| **Publish-Subscribe** | Broadcasts, notifications | PO announces sprint goals to all agents |
| **Task Queue** | Async work delegation | PO assigns issues to Engineers |
| **Streaming** | Long-running updates | Agent streaming progress to observers |

**Key Insight:** Syndarix needs all four patterns. A2A protocol supports request-response and streaming; Celery handles task queues; Redis Pub/Sub provides pub-sub.

### 2. Structured message formats vs natural language between agents?

**Recommendation: Structured messages with natural language payload.**

```python
# Structured envelope with natural language content
{
    "id": "msg-uuid-123",
    "type": "request",
    "from": {"agent_id": "eng-001", "role": "Engineer", "name": "Dave"},
    "to": {"agent_id": "arch-001", "role": "Architect", "name": "Alex"},
    "action": "request_guidance",
    "priority": "normal",
    "context": {
        "project_id": "proj-123",
        "issue_id": "issue-456",
        "conversation_id": "conv-789"
    },
    "content": "I'm implementing the auth module. Should I use JWT with refresh tokens or session-based auth?",
    "metadata": {
        "created_at": "2025-12-29T10:00:00Z",
        "expires_at": "2025-12-29T11:00:00Z",
        "requires_response": True
    }
}
```

**Rationale:**

- Structured envelope enables routing, filtering, and auditing
- Natural language content preserves LLM reasoning capabilities
- Matches A2A/ACP hybrid approach (structured headers, flexible payload)
- Enables agent-to-agent understanding without rigid schemas

### 3. How to handle async vs sync communication?

**Pattern Mapping:**

| Scenario | Timing | Implementation |
|----------|--------|----------------|
| Quick clarification | Sync-like (< 30s) | Request-response with timeout |
| Code review request | Async (minutes-hours) | Task queue + callback |
| Broadcast announcement | Fire-and-forget | Pub/Sub |
| Long-running analysis | Streaming | SSE with progress events |

**Implementation Strategy:**

```python
from enum import Enum


class MessageMode(str, Enum):
    SYNC = "sync"                        # Await response (with timeout)
    ASYNC = "async"                      # Queue for later, callback on completion
    FIRE_AND_FORGET = "fire_and_forget"  # No response expected
    STREAM = "stream"                    # Continuous updates
```

### 4. Message routing strategies?

**Recommended: Hierarchical routing with three core strategies, plus topic-based routing.**

```
┌─────────────────────────────────────────────────────────────────┐
│                         Message Router                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐          │
│  │   DIRECT    │    │ ROLE-BASED  │    │  BROADCAST  │          │
│  │   Routing   │    │   Routing   │    │   Routing   │          │
│  │             │    │             │    │             │          │
│  │ to: agent   │    │ to: @role   │    │ to: @all    │          │
│  │ "arch-001"  │    │ "@engineers"│    │ "@project"  │          │
│  └─────────────┘    └─────────────┘    └─────────────┘          │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```

| Strategy | Syntax | Use Case |
|----------|--------|----------|
| **Direct** | `to: "agent-123"` | Specific agent communication |
| **Role-based** | `to: "@engineers"` | All agents of a role |
| **Broadcast** | `to: "@all"` | Project-wide announcements |
| **Topic-based** | `to: "#auth-module"` | Agents subscribed to topic |

### 5. How to maintain conversation context across agent interactions?

**Three-tier context management:**

```
┌─────────────────────────────────────────────────────────────────┐
│                        Context Hierarchy                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ CONVERSATION CONTEXT (Short-term)                         │  │
│  │ - Current message thread                                  │  │
│  │ - Last N exchanges between agents                         │  │
│  │ - Active topic/issue context                              │  │
│  │ Storage: In-memory + Redis (TTL: 1 hour)                  │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ SESSION CONTEXT (Medium-term)                             │  │
│  │ - Sprint goals and decisions                              │  │
│  │ - Shared agreements between agents                        │  │
│  │ - Recent artifacts and references                         │  │
│  │ Storage: PostgreSQL AgentMessage table                    │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ PROJECT CONTEXT (Long-term)                               │  │
│  │ - Architecture decisions (ADRs)                           │  │
│  │ - Requirements and constraints                            │  │
│  │ - Knowledge base documents                                │  │
│  │ Storage: PostgreSQL + pgvector (RAG)                      │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```

**Context injection pattern:**

```python
async def prepare_agent_context(
    agent_id: str,
    conversation_id: str,
    project_id: str
) -> list[dict]:
    """Build context for LLM call."""
    context = []

    # 1. Conversation context (recent exchanges)
    recent_messages = await get_conversation_history(
        conversation_id, limit=10
    )
    context.extend(format_as_messages(recent_messages))

    # 2. Session context (relevant decisions)
    session_context = await get_session_context(agent_id, project_id)
    if session_context:
        context.append({
            "role": "system",
            "content": f"Session context:\n{session_context}"
        })

    # 3. Project context (RAG retrieval)
    query = extract_query_from_conversation(recent_messages)
    rag_results = await search_knowledge_base(project_id, query)
    if rag_results:
        context.append({
            "role": "system",
            "content": f"Relevant project context:\n{rag_results}"
        })

    return context
```

### 6. Conflict resolution when agents disagree?

**Hierarchical escalation model:**

```
┌─────────────────────────────────────────────────────────────────┐
│                  Conflict Resolution Hierarchy                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Level 1: Direct Negotiation                                    │
│  ├── Agents exchange arguments (max 3 rounds)                   │
│  ├── If consensus → Resolution recorded                         │
│  └── If no consensus → Escalate to Level 2                      │
│                                                                 │
│  Level 2: Expert Arbitration                                    │
│  ├── Relevant expert agent weighs in (Architect for tech)       │
│  ├── Expert makes binding recommendation                        │
│  └── If expert unavailable → Escalate to Level 3                │
│                                                                 │
│  Level 3: Human Decision                                        │
│  ├── Conflict summarized and sent to human client               │
│  ├── Approval event triggers resolution                         │
│  └── Decision recorded for future reference                     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```

**Implementation:**

```python
from enum import Enum


class ConflictResolution(str, Enum):
    NEGOTIATION = "negotiation"
    EXPERT_ARBITRATION = "expert_arbitration"
    HUMAN_DECISION = "human_decision"


class ConflictTracker:
    async def record_conflict(
        self,
        participants: list[str],
        topic: str,
        positions: dict[str, str]
    ) -> Conflict:
        """Record a conflict between agents."""
        return await self.db.create(Conflict(
            participants=participants,
            topic=topic,
            positions=positions,
            status=ConflictStatus.OPEN,
            resolution_level=ConflictResolution.NEGOTIATION,
            negotiation_rounds=0
        ))

    async def escalate(self, conflict_id: str) -> Conflict:
        """Escalate conflict to next resolution level."""
        conflict = await self.get(conflict_id)

        if conflict.resolution_level == ConflictResolution.NEGOTIATION:
            # Find relevant expert. Assumes find_expert_for_topic returns None
            # when no expert is available, in which case we skip to Level 3.
            expert = await self.find_expert_for_topic(conflict.topic)
            if expert is not None:
                conflict.arbitrator_id = expert.id
                conflict.resolution_level = ConflictResolution.EXPERT_ARBITRATION
            else:
                await self.create_approval_request(conflict)
                conflict.resolution_level = ConflictResolution.HUMAN_DECISION

        elif conflict.resolution_level == ConflictResolution.EXPERT_ARBITRATION:
            # Create human approval request
            await self.create_approval_request(conflict)
            conflict.resolution_level = ConflictResolution.HUMAN_DECISION

        return await self.db.update(conflict)
```

### 7. How to audit/log inter-agent communication?

**Comprehensive audit trail:**

```python
class MessageAuditLog:
    """All agent messages are persisted for audit."""

    id = Column(UUID, primary_key=True)
    message_id = Column(UUID, index=True)

    # Routing info
    from_agent_id = Column(UUID, ForeignKey("agent_instances.id"))
    to_agent_id = Column(UUID, nullable=True)  # NULL for broadcasts
    to_role = Column(String, nullable=True)

    # Message details
    message_type = Column(Enum(MessageType))
    action = Column(String(50))
    content_hash = Column(String(64))  # SHA-256 of content
    content = Column(Text)  # Full content (encrypted at rest)

    # Context
    project_id = Column(UUID, ForeignKey("projects.id"), index=True)
    conversation_id = Column(UUID, index=True)
    parent_message_id = Column(UUID, nullable=True)

    # Response tracking
    response_to_id = Column(UUID, nullable=True)
    response_received_at = Column(DateTime, nullable=True)

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    delivered_at = Column(DateTime, nullable=True)
    read_at = Column(DateTime, nullable=True)
```

---

## Message Format Specification

### Core Message Schema

```python
from pydantic import BaseModel, Field
from typing import Optional, Literal
from datetime import datetime
from uuid import UUID, uuid4


class AgentIdentifier(BaseModel):
    """Identifies an agent in communication."""
    agent_id: UUID
    role: str
    name: str


class MessageContext(BaseModel):
    """Contextual information for message routing."""
    project_id: UUID
    conversation_id: UUID
    issue_id: Optional[UUID] = None
    sprint_id: Optional[UUID] = None
    parent_message_id: Optional[UUID] = None


class MessageMetadata(BaseModel):
    """Message metadata for processing."""
    created_at: datetime = Field(default_factory=datetime.utcnow)
    expires_at: Optional[datetime] = None
    priority: Literal["urgent", "high", "normal", "low"] = "normal"
    requires_response: bool = False
    response_timeout_seconds: Optional[int] = None
    retry_count: int = 0
    max_retries: int = 3


# Defined before AgentMessage because AgentMessage references it in a field type.
class Attachment(BaseModel):
    """Attachments to messages."""
    type: Literal["code", "file", "image", "document", "reference"]
    name: str
    content: Optional[str] = None  # For inline content
    url: Optional[str] = None  # For file references
    mime_type: Optional[str] = None


class AgentMessage(BaseModel):
    """Primary message format for inter-agent communication."""

    # Identity
    id: UUID = Field(default_factory=uuid4)
    type: Literal["request", "response", "notification", "broadcast", "stream"]

    # Routing
    sender: AgentIdentifier
    recipient: Optional[AgentIdentifier] = None  # None for broadcasts
    recipient_role: Optional[str] = None  # For role-based routing
    recipient_all: bool = False  # For project-wide broadcast

    # Action
    action: str  # e.g., "request_review", "report_bug", "share_finding"

    # Content
    content: str  # Natural language message
    attachments: list[Attachment] = Field(default_factory=list)  # Files, code snippets, etc.

    # Context
    context: MessageContext

    # Metadata
    metadata: MessageMetadata = Field(default_factory=MessageMetadata)

    # Response linking
    in_response_to: Optional[UUID] = None
```

### Action Types

```python
from enum import Enum


class MessageAction(str, Enum):
    # Request actions
    REQUEST_REVIEW = "request_review"
    REQUEST_GUIDANCE = "request_guidance"
    REQUEST_CLARIFICATION = "request_clarification"
    REQUEST_APPROVAL = "request_approval"
    DELEGATE_TASK = "delegate_task"

    # Response actions
    PROVIDE_REVIEW = "provide_review"
    PROVIDE_GUIDANCE = "provide_guidance"
    PROVIDE_CLARIFICATION = "provide_clarification"
    APPROVE = "approve"
    REJECT = "reject"

    # Notification actions
    REPORT_BUG = "report_bug"
    REPORT_PROGRESS = "report_progress"
    REPORT_BLOCKER = "report_blocker"
    SHARE_FINDING = "share_finding"
    ANNOUNCE_DECISION = "announce_decision"

    # Collaboration actions
    PROPOSE_APPROACH = "propose_approach"
    CHALLENGE_APPROACH = "challenge_approach"
    AGREE_WITH = "agree_with"
    DISAGREE_WITH = "disagree_with"
```

---

## Communication Patterns

### Pattern 1: Request-Response

**Use Case:** Engineer asks Architect for design guidance.

```python
# Engineer sends request
request = AgentMessage(
    type="request",
    sender=AgentIdentifier(agent_id=eng_id, role="Engineer", name="Dave"),
    recipient=AgentIdentifier(agent_id=arch_id, role="Architect", name="Alex"),
    action="request_guidance",
    content="I need to implement caching for the user service. Should I use Redis with write-through or write-behind strategy?",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        issue_id=issue_id
    ),
    metadata=MessageMetadata(
        priority="normal",
        requires_response=True,
        response_timeout_seconds=300
    )
)

# Send and await response
response = await message_router.send_and_wait(request, timeout=300)

# Architect's response (this is the message built on the Architect's side,
# i.e. what send_and_wait above eventually returns to the Engineer)
response = AgentMessage(
    type="response",
    sender=AgentIdentifier(agent_id=arch_id, role="Architect", name="Alex"),
    recipient=request.sender,
    action="provide_guidance",
    content="Given our consistency requirements, use write-through caching with Redis. Here's the pattern...",
    in_response_to=request.id,
    context=request.context,
    attachments=[
        Attachment(type="code", name="cache_pattern.py", content="...")
    ]
)
```

### Pattern 2: Broadcast

**Use Case:** Product Owner announces sprint goals.

```python
broadcast = AgentMessage(
    type="broadcast",
    sender=AgentIdentifier(agent_id=po_id, role="ProductOwner", name="Sarah"),
    recipient_all=True,
    action="announce_decision",
    content="Sprint 3 goals: 1) Complete auth module 2) Add user settings page 3) Fix critical bugs from QA",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        sprint_id=sprint_id
    ),
    metadata=MessageMetadata(
        priority="high",
        requires_response=False
    )
)

await message_router.broadcast(broadcast)
```

### Pattern 3: Role-Based Routing

**Use Case:** QA reports a bug to all Engineers.

```python bug_report = AgentMessage( type="notification", sender=AgentIdentifier(agent_id=qa_id, role="QA", name="Quinn"), recipient_role="Engineer", # All engineers receive this action="report_bug", content="Found a critical bug in the auth flow: users can bypass 2FA by...", context=MessageContext( project_id=project_id, conversation_id=conv_id, issue_id=bug_issue_id ), metadata=MessageMetadata( priority="urgent", requires_response=True ) ) await message_router.send_to_role(bug_report) ``` ### Pattern 4: Task Delegation **Use Case:** Product Owner assigns work to Engineer. ```python delegation = AgentMessage( type="request", sender=AgentIdentifier(agent_id=po_id, role="ProductOwner", name="Sarah"), recipient=AgentIdentifier(agent_id=eng_id, role="Engineer", name="Dave"), action="delegate_task", content="Please implement issue #45: Add password reset functionality", context=MessageContext( project_id=project_id, conversation_id=conv_id, issue_id=issue_45_id ), metadata=MessageMetadata( priority="normal", requires_response=True ) ) # This creates a Celery task for async execution await message_router.delegate_task(delegation) ``` ### Pattern 5: Streaming Updates **Use Case:** Engineer shares progress during long task. ```python async def stream_progress(agent_id: str, task_id: str): """Stream progress updates during task execution.""" for step in task_steps: # Execute step result = await execute_step(step) # Stream update update = AgentMessage( type="stream", sender=AgentIdentifier(agent_id=agent_id, ...), recipient_all=True, action="report_progress", content=f"Completed: {step.name}. 
Progress: {step.percent}%", context=MessageContext(project_id=project_id, ...), ) await message_router.stream(update) ``` --- ## Database Schema ### Message Storage ```python # app/models/agent_message.py from sqlalchemy import Column, String, Text, DateTime, Enum, ForeignKey, Index from sqlalchemy.dialects.postgresql import UUID, JSONB from app.db.base import Base class AgentMessage(Base): """Persistent storage for all agent messages.""" __tablename__ = "agent_messages" # Primary key id = Column(UUID, primary_key=True, default=uuid4) # Message type type = Column( Enum("request", "response", "notification", "broadcast", "stream", name="message_type"), nullable=False ) # Sender sender_agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=False) sender_role = Column(String(50), nullable=False) sender_name = Column(String(100), nullable=False) # Recipient (nullable for broadcasts) recipient_agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=True) recipient_role = Column(String(50), nullable=True) # For role-based routing is_broadcast = Column(Boolean, default=False) # Action action = Column(String(50), nullable=False) # Content content = Column(Text, nullable=False) content_hash = Column(String(64), nullable=False) # SHA-256 attachments = Column(JSONB, default=list) # Context project_id = Column(UUID, ForeignKey("projects.id"), nullable=False) conversation_id = Column(UUID, nullable=False) issue_id = Column(UUID, ForeignKey("issues.id"), nullable=True) sprint_id = Column(UUID, ForeignKey("sprints.id"), nullable=True) parent_message_id = Column(UUID, ForeignKey("agent_messages.id"), nullable=True) # Response linking in_response_to_id = Column(UUID, ForeignKey("agent_messages.id"), nullable=True) # Metadata priority = Column( Enum("urgent", "high", "normal", "low", name="priority_level"), default="normal" ) requires_response = Column(Boolean, default=False) response_timeout_seconds = Column(Integer, nullable=True) # Status tracking status = 
Column( Enum("pending", "delivered", "read", "responded", "expired", "failed", name="message_status"), default="pending" ) # Timestamps created_at = Column(DateTime, default=datetime.utcnow, nullable=False) delivered_at = Column(DateTime, nullable=True) read_at = Column(DateTime, nullable=True) responded_at = Column(DateTime, nullable=True) expires_at = Column(DateTime, nullable=True) # Indexes __table_args__ = ( Index("ix_agent_messages_project_conversation", "project_id", "conversation_id"), Index("ix_agent_messages_sender", "sender_agent_id", "created_at"), Index("ix_agent_messages_recipient", "recipient_agent_id", "status"), Index("ix_agent_messages_conversation", "conversation_id", "created_at"), ) class Conversation(Base): """Groups related messages into conversations.""" __tablename__ = "conversations" id = Column(UUID, primary_key=True, default=uuid4) # Context project_id = Column(UUID, ForeignKey("projects.id"), nullable=False) issue_id = Column(UUID, ForeignKey("issues.id"), nullable=True) sprint_id = Column(UUID, ForeignKey("sprints.id"), nullable=True) # Participants participant_ids = Column(JSONB, default=list) # List of agent IDs # Topic topic = Column(String(200), nullable=True) # Status status = Column( Enum("active", "resolved", "archived", name="conversation_status"), default="active" ) # Timestamps created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, onupdate=datetime.utcnow) resolved_at = Column(DateTime, nullable=True) # Summary (generated) summary = Column(Text, nullable=True) class MessageDelivery(Base): """Tracks delivery status for broadcast messages.""" __tablename__ = "message_deliveries" id = Column(UUID, primary_key=True, default=uuid4) message_id = Column(UUID, ForeignKey("agent_messages.id"), nullable=False) recipient_agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=False) # Status delivered_at = Column(DateTime, nullable=True) read_at = Column(DateTime, nullable=True) responded_at = 
Column(DateTime, nullable=True) ``` ### Alembic Migration ```python # migrations/versions/xxx_add_agent_messages.py def upgrade(): # Create enum types op.execute(""" CREATE TYPE message_type AS ENUM ( 'request', 'response', 'notification', 'broadcast', 'stream' ) """) op.execute(""" CREATE TYPE priority_level AS ENUM ('urgent', 'high', 'normal', 'low') """) op.execute(""" CREATE TYPE message_status AS ENUM ( 'pending', 'delivered', 'read', 'responded', 'expired', 'failed' ) """) op.execute(""" CREATE TYPE conversation_status AS ENUM ('active', 'resolved', 'archived') """) # Create tables op.create_table( 'conversations', sa.Column('id', UUID, primary_key=True), sa.Column('project_id', UUID, sa.ForeignKey('projects.id'), nullable=False), sa.Column('issue_id', UUID, sa.ForeignKey('issues.id'), nullable=True), sa.Column('sprint_id', UUID, sa.ForeignKey('sprints.id'), nullable=True), sa.Column('participant_ids', JSONB, default=[]), sa.Column('topic', sa.String(200), nullable=True), sa.Column('status', sa.Enum('active', 'resolved', 'archived', name='conversation_status'), default='active'), sa.Column('created_at', sa.DateTime, default=datetime.utcnow), sa.Column('updated_at', sa.DateTime, onupdate=datetime.utcnow), sa.Column('resolved_at', sa.DateTime, nullable=True), sa.Column('summary', sa.Text, nullable=True), ) op.create_table( 'agent_messages', # ... columns as defined above ) op.create_table( 'message_deliveries', # ... 
columns as defined above ) ``` --- ## Code Examples ### Message Router Service ```python # app/services/message_router.py from typing import Optional, AsyncIterator from uuid import UUID import asyncio import hashlib import json from app.models.agent_message import AgentMessage as AgentMessageModel from app.schemas.messages import AgentMessage, AgentIdentifier from app.services.events import EventBus from app.db.session import AsyncSession from app.core.celery_app import celery_app class MessageRouter: """Routes messages between agents.""" def __init__( self, db: AsyncSession, event_bus: EventBus, orchestrator: "AgentOrchestrator" ): self.db = db self.event_bus = event_bus self.orchestrator = orchestrator async def send(self, message: AgentMessage) -> AgentMessageModel: """Send a message to a specific agent.""" # Validate recipient exists and is active recipient = await self.orchestrator.get_instance(message.recipient.agent_id) if not recipient or recipient.status != "active": raise AgentNotAvailableError(f"Agent {message.recipient.agent_id} not available") # Persist message db_message = await self._persist_message(message) # Publish to recipient's channel await self.event_bus.publish( f"agent:{message.recipient.agent_id}", { "type": "message_received", "message_id": str(db_message.id), "sender": message.sender.dict(), "action": message.action, "priority": message.metadata.priority, "preview": message.content[:100] } ) # Also publish to project channel for monitoring await self.event_bus.publish( f"project:{message.context.project_id}", { "type": "agent_message", "from": message.sender.name, "to": message.recipient.name, "action": message.action } ) return db_message async def send_and_wait( self, message: AgentMessage, timeout: int = 300 ) -> Optional[AgentMessage]: """Send a message and wait for response.""" db_message = await self.send(message) # Subscribe to response channel response_channel = f"response:{db_message.id}" subscriber = await 
self.event_bus.subscribe(response_channel) try: response_event = await asyncio.wait_for( subscriber.get_event(), timeout=timeout ) response_id = response_event.data["response_id"] return await self.get_message(response_id) except asyncio.TimeoutError: # Update message status to expired await self._mark_expired(db_message.id) return None finally: await subscriber.unsubscribe() async def send_to_role(self, message: AgentMessage) -> list[AgentMessageModel]: """Send message to all agents of a specific role.""" # Get all active agents with the target role recipients = await self.orchestrator.get_instances_by_role( project_id=message.context.project_id, role=message.recipient_role ) messages = [] for recipient in recipients: msg = message.copy() msg.recipient = AgentIdentifier( agent_id=recipient.id, role=recipient.agent_type.role, name=recipient.name ) db_message = await self.send(msg) messages.append(db_message) return messages async def broadcast(self, message: AgentMessage) -> AgentMessageModel: """Broadcast message to all project agents.""" # Persist the broadcast message db_message = await self._persist_message(message) # Get all active agents in project agents = await self.orchestrator.get_project_agents( message.context.project_id ) # Create delivery records for agent in agents: if agent.id != message.sender.agent_id: await self._create_delivery_record(db_message.id, agent.id) # Publish to project channel await self.event_bus.publish( f"project:{message.context.project_id}", { "type": "broadcast", "message_id": str(db_message.id), "sender": message.sender.dict(), "action": message.action, "content": message.content } ) return db_message async def delegate_task(self, message: AgentMessage) -> str: """Delegate a task for async execution via Celery.""" # Persist message db_message = await self._persist_message(message) # Create Celery task task = celery_app.send_task( "app.tasks.agent.execute_delegated_task", args=[str(db_message.id), 
str(message.recipient.agent_id)], queue="agent_tasks", priority=self._priority_to_int(message.metadata.priority) ) return task.id async def respond( self, original_message_id: UUID, response: AgentMessage ) -> AgentMessageModel: """Send a response to a previous message.""" original = await self.get_message(original_message_id) response.in_response_to = original_message_id response.context = original.context db_response = await self.send(response) # Update original message status await self._mark_responded(original_message_id) # Publish response notification await self.event_bus.publish( f"response:{original_message_id}", { "type": "response_received", "response_id": str(db_response.id) } ) return db_response async def get_conversation_history( self, conversation_id: UUID, limit: int = 50, before: Optional[datetime] = None ) -> list[AgentMessageModel]: """Get messages in a conversation.""" query = select(AgentMessageModel).where( AgentMessageModel.conversation_id == conversation_id ) if before: query = query.where(AgentMessageModel.created_at < before) query = query.order_by(AgentMessageModel.created_at.desc()).limit(limit) result = await self.db.execute(query) return list(reversed(result.scalars().all())) async def _persist_message(self, message: AgentMessage) -> AgentMessageModel: """Persist message to database.""" content_hash = hashlib.sha256(message.content.encode()).hexdigest() db_message = AgentMessageModel( id=message.id, type=message.type, sender_agent_id=message.sender.agent_id, sender_role=message.sender.role, sender_name=message.sender.name, recipient_agent_id=message.recipient.agent_id if message.recipient else None, recipient_role=message.recipient_role, is_broadcast=message.recipient_all, action=message.action, content=message.content, content_hash=content_hash, attachments=[a.dict() for a in message.attachments], project_id=message.context.project_id, conversation_id=message.context.conversation_id, issue_id=message.context.issue_id, 
sprint_id=message.context.sprint_id, parent_message_id=message.context.parent_message_id, in_response_to_id=message.in_response_to, priority=message.metadata.priority, requires_response=message.metadata.requires_response, response_timeout_seconds=message.metadata.response_timeout_seconds, expires_at=message.metadata.expires_at, ) self.db.add(db_message) await self.db.commit() await self.db.refresh(db_message) return db_message def _priority_to_int(self, priority: str) -> int: """Convert priority to Celery priority (0=highest).""" return {"urgent": 0, "high": 3, "normal": 6, "low": 9}.get(priority, 6) ``` ### Receiving and Processing Messages ```python # app/services/agent_inbox.py from typing import Optional, Callable, Awaitable class AgentInbox: """Manages incoming messages for an agent.""" def __init__( self, agent_id: UUID, event_bus: EventBus, message_router: MessageRouter ): self.agent_id = agent_id self.event_bus = event_bus self.message_router = message_router self._handlers: dict[str, Callable] = {} def on_action( self, action: str, handler: Callable[[AgentMessage], Awaitable[Optional[AgentMessage]]] ): """Register a handler for a specific action type.""" self._handlers[action] = handler async def start_listening(self): """Start listening for incoming messages.""" channel = f"agent:{self.agent_id}" subscriber = await self.event_bus.subscribe(channel) while True: event = await subscriber.get_event() if event.type == "message_received": message = await self.message_router.get_message( event.data["message_id"] ) await self._process_message(message) async def _process_message(self, message: AgentMessage): """Process an incoming message.""" # Mark as read await self.message_router.mark_read(message.id) # Find handler handler = self._handlers.get(message.action) if not handler: handler = self._handlers.get("default") if handler: response = await handler(message) if response and message.requires_response: await self.message_router.respond(message.id, response) # 
Usage in agent runner class AgentRunner: async def setup_message_handlers(self): """Setup handlers for different message types.""" @self.inbox.on_action("request_review") async def handle_review_request(message: AgentMessage) -> AgentMessage: # Execute code review review_result = await self.execute("review_code", { "content": message.content, "attachments": message.attachments }) return AgentMessage( type="response", sender=self.identifier, recipient=message.sender, action="provide_review", content=review_result["content"], attachments=review_result.get("attachments", []) ) @self.inbox.on_action("delegate_task") async def handle_task_delegation(message: AgentMessage) -> AgentMessage: # Accept and start working on task await self.update_status("working") # Execute task result = await self.execute("implement", { "task_description": message.content, "issue_id": message.context.issue_id }) return AgentMessage( type="response", sender=self.identifier, action="task_completed", content=f"Task completed. 
{result['summary']}" ) ``` ### API Endpoints ```python # app/api/v1/messages.py from fastapi import APIRouter, Depends, HTTPException from uuid import UUID from app.schemas.messages import AgentMessage, MessageCreate, MessageResponse from app.services.message_router import MessageRouter from app.api.deps import get_message_router, get_current_user router = APIRouter(prefix="/messages", tags=["messages"]) @router.post("/send", response_model=MessageResponse) async def send_message( message_in: MessageCreate, router: MessageRouter = Depends(get_message_router), current_user: User = Depends(get_current_user) ): """Send a message between agents (admin/system use).""" message = AgentMessage(**message_in.dict()) result = await router.send(message) return MessageResponse.from_orm(result) @router.post("/broadcast", response_model=MessageResponse) async def broadcast_message( message_in: MessageCreate, router: MessageRouter = Depends(get_message_router), current_user: User = Depends(get_current_user) ): """Broadcast a message to all project agents.""" message = AgentMessage(**message_in.dict(), recipient_all=True) result = await router.broadcast(message) return MessageResponse.from_orm(result) @router.get("/conversation/{conversation_id}", response_model=list[MessageResponse]) async def get_conversation( conversation_id: UUID, limit: int = 50, router: MessageRouter = Depends(get_message_router), current_user: User = Depends(get_current_user) ): """Get messages in a conversation.""" messages = await router.get_conversation_history(conversation_id, limit=limit) return [MessageResponse.from_orm(m) for m in messages] @router.get("/agent/{agent_id}/inbox", response_model=list[MessageResponse]) async def get_agent_inbox( agent_id: UUID, status: Optional[str] = None, limit: int = 50, router: MessageRouter = Depends(get_message_router), current_user: User = Depends(get_current_user) ): """Get messages in an agent's inbox.""" messages = await router.get_agent_messages(agent_id, 
        status=status, limit=limit)
    return [MessageResponse.from_orm(m) for m in messages]
```

---

## SSE Integration

### Real-time Message Events

Messages integrate with the existing SSE event system (ADR-002):

```python
# Extended event types for messages
class EventType(str, Enum):
    # Existing events...

    # Message Events
    MESSAGE_SENT = "message_sent"
    MESSAGE_RECEIVED = "message_received"
    MESSAGE_READ = "message_read"
    MESSAGE_RESPONDED = "message_responded"

    # Conversation Events
    CONVERSATION_STARTED = "conversation_started"
    CONVERSATION_RESOLVED = "conversation_resolved"

    # Conflict Events
    CONFLICT_DETECTED = "conflict_detected"
    CONFLICT_ESCALATED = "conflict_escalated"
    CONFLICT_RESOLVED = "conflict_resolved"
```

### Message Event Stream

```python
# app/api/v1/events.py (extended)
@router.get("/projects/{project_id}/messages/events")
async def message_events(
    project_id: UUID,
    request: Request,
    current_user: User = Depends(get_current_user)
):
    """Stream message events for a project."""
    async def event_generator():
        subscriber = await event_bus.subscribe(f"project:{project_id}:messages")
        try:
            while not await request.is_disconnected():
                try:
                    event = await asyncio.wait_for(
                        subscriber.get_event(),
                        timeout=30.0
                    )

                    # Filter by event type
                    if event.type in [
                        "message_sent",
                        "message_received",
                        "conversation_started",
                        "conflict_detected"
                    ]:
                        yield f"event: {event.type}\ndata: {json.dumps(event.data)}\n\n"

                except asyncio.TimeoutError:
                    yield ": keepalive\n\n"
        finally:
            await subscriber.unsubscribe()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )
```

### Frontend Integration

```typescript
// frontend/lib/messageEvents.ts
import { useEffect, useCallback } from 'react';

interface MessageEvent {
  type: 'message_sent' | 'message_received' | 'conversation_started';
  data: {
    message_id?: string;
    from?: AgentIdentifier;
    to?: AgentIdentifier;
    action?: string;
    preview?: string;
  };
}

export function useMessageEvents(
  projectId: string,
  onMessage: (event: MessageEvent) =>
    void
) {
  useEffect(() => {
    const eventSource = new EventSource(
      `/api/v1/projects/${projectId}/messages/events`,
      { withCredentials: true }
    );

    eventSource.addEventListener('message_sent', (e) => {
      onMessage({ type: 'message_sent', data: JSON.parse(e.data) });
    });

    eventSource.addEventListener('message_received', (e) => {
      onMessage({ type: 'message_received', data: JSON.parse(e.data) });
    });

    eventSource.addEventListener('conversation_started', (e) => {
      onMessage({ type: 'conversation_started', data: JSON.parse(e.data) });
    });

    eventSource.onerror = () => {
      // Reconnection handled by EventSource
      console.log('Message SSE reconnecting...');
    };

    return () => eventSource.close();
    // onMessage is a dependency: callers should memoize it (e.g. with
    // useCallback) so the connection is not reopened on every render.
  }, [projectId, onMessage]);
}
```

---

## Syndarix-Specific Requirements

### @Mentions Support

```python
import re
from uuid import UUID


class MentionParser:
    """Parse @mentions in message content."""

    MENTION_PATTERN = re.compile(r'@(\w+)')
    # Matched against the whole mention so that "@pooja" is not read as role "po"
    ROLE_PATTERN = re.compile(
        r'(?:engineers?|architects?|qa|po|pm|devops)', re.IGNORECASE
    )

    def parse(self, content: str) -> list[Mention]:
        mentions = []

        for match in self.MENTION_PATTERN.finditer(content):
            name = match.group(1)
            if self.ROLE_PATTERN.fullmatch(name):
                # Role mention: record it once, not also as an agent mention
                mentions.append(Mention(type="role", role=name, position=match.start()))
            else:
                # Agent mention
                mentions.append(Mention(type="agent", name=name, position=match.start()))

        return mentions

    async def resolve_mentions(
        self,
        mentions: list[Mention],
        project_id: UUID,
        orchestrator: AgentOrchestrator
    ) -> list[UUID]:
        """Resolve mentions to agent IDs."""
        agent_ids = set()

        for mention in mentions:
            if mention.type == "agent":
                agent = await orchestrator.get_by_name(project_id, mention.name)
                if agent:
                    agent_ids.add(agent.id)

            elif mention.type == "role":
                agents = await orchestrator.get_instances_by_role(
                    project_id, mention.role
                )
                agent_ids.update(a.id for a in agents)

        return list(agent_ids)
```

### Priority Message Handling

```python
class PriorityMessageHandler:
    """Handle urgent messages with priority."""

    PRIORITY_INTERRUPTS = {"urgent", "high"}

    async def process_with_priority(
        self,
        agent_id: UUID,
        message: AgentMessage
    ):
        """Process message based on priority."""

        if message.metadata.priority in self.PRIORITY_INTERRUPTS:
            # Interrupt current task
            await self.orchestrator.interrupt_agent(agent_id)

            # Publish urgent notification
            await self.event_bus.publish(
                f"project:{message.context.project_id}",
                {
                    "type": "urgent_message",
                    "agent_id": str(agent_id),
                    "message_id": str(message.id),
                    "from": message.sender.name,
                    "preview": message.content[:100]
                }
            )

            # Process immediately
            await self.process_immediately(agent_id, message)
        else:
            # Queue for normal processing
            await self.queue_for_processing(agent_id, message)
```

### Communication Traceability

```python
class CommunicationTracer:
    """Trace all inter-agent communication."""

    async def trace_message_flow(
        self,
        conversation_id: UUID
    ) -> MessageFlowTrace:
        """Generate a trace of message flow in a conversation."""
        messages = await self.get_conversation_messages(conversation_id)

        nodes = []
        edges = []

        for msg in messages:
            # One node per message
            nodes.append({
                "id": str(msg.id),
                "agent": msg.sender_name,
                "role": msg.sender_role,
                "action": msg.action,
                "timestamp": msg.created_at.isoformat()
            })

            # Edge from the message this one responds to
            if msg.in_response_to_id:
                edges.append({
                    "from": str(msg.in_response_to_id),
                    "to": str(msg.id),
                    "type": "response"
                })

            # Edge from the parent message in the same thread
            if msg.parent_message_id:
                edges.append({
                    "from": str(msg.parent_message_id),
                    "to": str(msg.id),
                    "type": "thread"
                })

        return MessageFlowTrace(
            conversation_id=conversation_id,
            nodes=nodes,
            edges=edges,
            summary=await self.generate_summary(messages)
        )
```

---

## Performance Considerations

### Message Throughput

| Scenario | Target | Implementation |
|----------|--------|----------------|
| Direct messages | 100/sec | Redis Pub/Sub |
| Broadcasts | 10/sec | Batched delivery |
| Task delegation | 50/sec | Celery queue |
| Message persistence | 200/sec | Batch inserts |

### Scalability

```python
# Message batching for high-throughput scenarios
import asyncio

from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession


class MessageBatcher:
    def __init__(
        self,
        db: AsyncSession,  # assumed to be an SQLAlchemy async session
        batch_size: int = 100,
        flush_interval: float = 0.1
    ):
        self.db = db
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._buffer: list[AgentMessage] = []
        self._lock = asyncio.Lock()

    async def add(self, message: AgentMessage):
        async with self._lock:
            self._buffer.append(message)
            if len(self._buffer) >= self.batch_size:
                await self._flush()

    async def flush_periodically(self):
        """Sketch: flush partial batches every flush_interval seconds.

        Without a loop like this, flush_interval is unused and a partial
        batch waits until batch_size is reached.
        """
        while True:
            await asyncio.sleep(self.flush_interval)
            async with self._lock:
                await self._flush()

    async def _flush(self):
        # Callers must hold self._lock
        if not self._buffer:
            return

        messages = self._buffer
        self._buffer = []

        # Batch insert; _to_dict (not shown) maps a message to column values
        await self.db.execute(
            insert(AgentMessageModel),
            [self._to_dict(m) for m in messages]
        )
        await self.db.commit()
```

### Memory Management

- In-memory conversation history is limited to the last 50 messages
- Older messages are retrieved on demand from the database
- Redis TTL of 1 hour for the active conversation cache
- Periodic archival of resolved conversations

---

## References

### Industry Protocols Researched

- [Google A2A Protocol](https://developers.googleblog.com/en/a2a-a-new-era-of-agent-interoperability/) - Agent-to-Agent Protocol
- [IBM ACP](https://arxiv.org/html/2505.02279v1) - Agent Communication Protocol
- [Anthropic MCP](https://spec.modelcontextprotocol.io/) - Model Context Protocol
- [Linux Foundation A2A Project](https://www.linuxfoundation.org/press/linux-foundation-launches-the-agent2agent-protocol-project-to-enable-secure-intelligent-communication-between-ai-agents)

### Multi-Agent System Research

- [Multi-Agent Collaboration Mechanisms Survey](https://arxiv.org/html/2501.06322v1)
- [Conflict Resolution in Multi-Agent Systems](https://zilliz.com/ai-faq/how-do-multiagent-systems-manage-conflict-resolution)
- [Agent Memory Management](https://www.letta.com/blog/agent-memory)
- [Context Engineering for LLM Agents](https://developers.googleblog.com/architecting-efficient-context-aware-multi-agent-framework-for-production/)

### Syndarix Architecture

- [ADR-002: Real-time Communication](../adrs/ADR-002-realtime-communication.md)
- [ADR-006: Agent Orchestration](../adrs/ADR-006-agent-orchestration.md)
- [SPIKE-003: Real-time Updates](./SPIKE-003-realtime-updates.md)

---

## Decision

**Adopt a hybrid message-based communication protocol** with:

1. **Structured JSON messages** with natural language content
2. **Three routing strategies**: Direct, Role-based, Broadcast
3. **Redis Pub/Sub** for real-time delivery
4. **PostgreSQL** for message persistence and audit
5. **Celery** for async task delegation
6. **SSE** for client notifications
7. **Three-tier context management** for conversation continuity
8. **Hierarchical conflict resolution** with human escalation

This protocol builds on existing Syndarix infrastructure (Redis, Celery, SSE) and follows the design principles of the A2A and ACP protocols.

---

*Spike completed. Findings will inform implementation of inter-agent communication in the Agent Orchestration layer.*
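
As a rough illustration of decision items 2 and 3, the sketch below maps each routing strategy to a Redis Pub/Sub channel name. The `RoutingStrategy` enum, the `channel_for` helper, and the `:agent:` and `:role:` channel suffixes are hypothetical names introduced here for illustration; only the `project:{project_id}:messages` pattern appears earlier in this document, and the real naming scheme may differ.

```python
from enum import Enum
from typing import Optional


class RoutingStrategy(str, Enum):
    """Hypothetical enum for the three routing strategies in the decision."""
    DIRECT = "direct"
    ROLE = "role"
    BROADCAST = "broadcast"


def channel_for(
    strategy: RoutingStrategy,
    project_id: str,
    target: Optional[str] = None,
) -> str:
    """Return a Pub/Sub channel name for a message (assumed naming scheme)."""
    base = f"project:{project_id}"

    if strategy is RoutingStrategy.BROADCAST:
        # Same channel pattern the SSE endpoint subscribes to
        return f"{base}:messages"

    # Direct and role-based routing both need a target (agent ID or role name)
    if not target:
        raise ValueError(f"{strategy.value} routing requires a target")

    if strategy is RoutingStrategy.DIRECT:
        return f"{base}:agent:{target}"

    # Role-based: normalize the role name so "Engineer" and "engineer" agree
    return f"{base}:role:{target.lower()}"
```

A router built along these lines could serialize the message envelope to JSON and publish it to the returned channel, leaving persistence and Celery delegation as separate steps.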