syndarix/docs/spikes/SPIKE-007-agent-communication-protocol.md
Felipe Cardoso 5594655fba docs: add architecture spikes and deep analysis documentation
Add comprehensive spike research documents:
- SPIKE-002: Agent Orchestration Pattern (LangGraph + Temporal hybrid)
- SPIKE-006: Knowledge Base pgvector (RAG with hybrid search)
- SPIKE-007: Agent Communication Protocol (JSON-RPC + Redis Streams)
- SPIKE-008: Workflow State Machine (transitions lib + event sourcing)
- SPIKE-009: Issue Synchronization (bi-directional sync with conflict resolution)
- SPIKE-010: Cost Tracking (LiteLLM callbacks + budget enforcement)
- SPIKE-011: Audit Logging (structured event sourcing)
- SPIKE-012: Client Approval Flow (checkpoint-based approvals)

Add architecture documentation:
- ARCHITECTURE_DEEP_ANALYSIS.md: Memory management, security, testing strategy
- IMPLEMENTATION_ROADMAP.md: 6-phase, 24-week implementation plan

Closes #2, #6, #7, #8, #9, #10, #11, #12

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 13:31:02 +01:00


SPIKE-007: Agent Communication Protocol

Status: Completed
Date: 2025-12-29
Author: Architecture Team
Related Issue: #7


Executive Summary

This spike researches inter-agent communication protocols for Syndarix, where 10+ specialized AI agents need to collaborate on software projects. After analyzing industry standards (A2A, ACP, MCP) and multi-agent system patterns, we recommend a structured message-based protocol built on the existing Redis Pub/Sub event bus.

Recommendation

Adopt a hybrid communication model combining:

  1. Structured JSON-RPC messages for request-response patterns
  2. Redis Pub/Sub channels for broadcasts and topic-based routing
  3. Database-backed message persistence for auditability and context recovery
  4. Priority queues via Celery for async task delegation

This approach aligns with Google's A2A protocol principles while leveraging Syndarix's existing infrastructure (Redis, Celery, SSE events).


Research Questions

1. What communication patterns work for AI multi-agent systems?

Industry research identifies four primary patterns:

| Pattern | Use Case | Syndarix Application |
|---|---|---|
| Request-Response | Direct task delegation | Engineer asks Architect for guidance |
| Publish-Subscribe | Broadcasts, notifications | PO announces sprint goals to all agents |
| Task Queue | Async work delegation | PO assigns issues to Engineers |
| Streaming | Long-running updates | Agent streams progress to observers |

Key Insight: Syndarix needs all four patterns. A2A protocol supports request-response and streaming; Celery handles task queues; Redis Pub/Sub provides pub-sub.

2. Structured message formats vs natural language between agents?

Recommendation: Structured messages with natural language payload.

Structured envelope with natural language content (JSON):
{
    "id": "msg-uuid-123",
    "type": "request",
    "from": {"agent_id": "eng-001", "role": "Engineer", "name": "Dave"},
    "to": {"agent_id": "arch-001", "role": "Architect", "name": "Alex"},
    "action": "request_guidance",
    "priority": "normal",
    "context": {
        "project_id": "proj-123",
        "issue_id": "issue-456",
        "conversation_id": "conv-789"
    },
    "content": "I'm implementing the auth module. Should I use JWT with refresh tokens or session-based auth?",
    "metadata": {
        "created_at": "2025-12-29T10:00:00Z",
        "expires_at": "2025-12-29T11:00:00Z",
        "requires_response": true
    }
}

Rationale:

  • Structured envelope enables routing, filtering, and auditing
  • Natural language content preserves LLM reasoning capabilities
  • Matches A2A/ACP hybrid approach (structured headers, flexible payload)
  • Enables agent-to-agent understanding without rigid schemas
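The envelope matters because routing and filtering code can read `to`, `action`, and `priority` straight from the JSON. It never has to interpret the natural-language `content`.

The sketch below is minimal. Its field names follow the example envelope above. The `matches` and `filter_envelopes` helpers are hypothetical.

```python
import json
from typing import Optional

# Example envelope, trimmed to the fields the filter reads.
RAW = """
{
  "id": "msg-uuid-123",
  "type": "request",
  "from": {"agent_id": "eng-001", "role": "Engineer"},
  "to": {"agent_id": "arch-001", "role": "Architect"},
  "action": "request_guidance",
  "priority": "normal",
  "content": "Should I use JWT with refresh tokens or session-based auth?"
}
"""


def matches(envelope: dict, *, to_role: Optional[str] = None,
            action: Optional[str] = None, priority: Optional[str] = None) -> bool:
    """Return True if the envelope satisfies every filter that is set.

    Only envelope fields are inspected; `content` is never parsed.
    """
    recipient = envelope.get("to") or {}  # "to" may be null for broadcasts
    if to_role is not None and recipient.get("role") != to_role:
        return False
    if action is not None and envelope.get("action") != action:
        return False
    if priority is not None and envelope.get("priority") != priority:
        return False
    return True


def filter_envelopes(raw_messages: list[str], **filters) -> list[dict]:
    """Decode JSON envelopes and keep those matching the filters."""
    decoded = [json.loads(raw) for raw in raw_messages]
    return [env for env in decoded if matches(env, **filters)]
```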

3. How to handle async vs sync communication?

Pattern Mapping:

| Scenario | Timing | Implementation |
|---|---|---|
| Quick clarification | Sync-like (< 30 s) | Request-response with timeout |
| Code review request | Async (minutes to hours) | Task queue + callback |
| Broadcast announcement | Fire-and-forget | Pub/Sub |
| Long-running analysis | Streaming | SSE with progress events |

Implementation Strategy:

from enum import Enum

class MessageMode(str, Enum):
    SYNC = "sync"        # Await response (with timeout)
    ASYNC = "async"      # Queue for later, callback on completion
    FIRE_AND_FORGET = "fire_and_forget"  # No response expected
    STREAM = "stream"    # Continuous updates
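Below is a minimal asyncio sketch of how a sender might honor each mode. It assumes an in-process handler coroutine stands in for the remote agent.

`dispatch`, `Handler`, and the 30-second default (taken from the sync-like row of the table) are hypothetical. In Syndarix, the ASYNC branch would map to a Celery task and STREAM would go over SSE.

```python
import asyncio
from enum import Enum
from typing import Awaitable, Callable


class MessageMode(str, Enum):
    SYNC = "sync"
    ASYNC = "async"
    FIRE_AND_FORGET = "fire_and_forget"
    STREAM = "stream"


# Hypothetical handler type: takes message text, returns the reply text.
Handler = Callable[[str], Awaitable[str]]

# Keep references to fire-and-forget tasks so they are not garbage-collected
# before they finish.
_background_tasks: set = set()


async def dispatch(mode: MessageMode, handler: Handler, content: str,
                   timeout: float = 30.0):
    """Deliver `content` to `handler` according to the communication mode.

    SYNC:            await the reply; return None if `timeout` seconds pass.
    ASYNC:           schedule the work and return the Task to await later.
    FIRE_AND_FORGET: schedule the work and return None immediately.
    STREAM:          not covered; it needs an async iterator, not one reply.
    """
    if mode is MessageMode.SYNC:
        try:
            return await asyncio.wait_for(handler(content), timeout=timeout)
        except asyncio.TimeoutError:
            return None
    if mode is MessageMode.ASYNC:
        return asyncio.create_task(handler(content))
    if mode is MessageMode.FIRE_AND_FORGET:
        task = asyncio.create_task(handler(content))
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
        return None
    raise NotImplementedError(f"{mode.value} needs a streaming transport")
```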

4. Message routing strategies?

Recommended: hierarchical routing with three core strategies (direct, role-based, broadcast), plus topic-based routing for agents subscribed to a topic.

┌─────────────────────────────────────────────────────────────────┐
│                      Message Router                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐         │
│  │   DIRECT    │    │  ROLE-BASED │    │  BROADCAST  │         │
│  │   Routing   │    │   Routing   │    │   Routing   │         │
│  │             │    │             │    │             │         │
│  │  to: agent  │    │ to: @role   │    │  to: @all   │         │
│  │  "arch-001" │    │ "@engineers"│    │  "@project" │         │
│  └─────────────┘    └─────────────┘    └─────────────┘         │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
| Strategy | Syntax | Use Case |
|---|---|---|
| Direct | to: "agent-123" | Specific agent communication |
| Role-based | to: "@engineers" | All agents of a role |
| Broadcast | to: "@all" | Project-wide announcements |
| Topic-based | to: "#auth-module" | Agents subscribed to the topic |
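Here is a sketch of how a router could classify the `to:` syntax in the table. It treats both `@all` and `@project` as broadcast, because the diagram and the table use one each.

`Route`, `RouteKind`, and `parse_address` are hypothetical names. Mapping a plural like `@engineers` onto the `Engineer` role is left to the caller.

```python
from dataclasses import dataclass
from enum import Enum


class RouteKind(str, Enum):
    DIRECT = "direct"
    ROLE = "role"
    BROADCAST = "broadcast"
    TOPIC = "topic"


@dataclass(frozen=True)
class Route:
    kind: RouteKind
    target: str  # agent id, role name, or topic name ("" for broadcast)


# Tokens treated as project-wide broadcast (assumption: "@all" and "@project").
BROADCAST_TOKENS = {"@all", "@project"}


def parse_address(to: str) -> Route:
    """Classify a `to:` address: @role, #topic, broadcast token, or agent id."""
    to = to.strip()
    if to in BROADCAST_TOKENS:
        return Route(RouteKind.BROADCAST, "")
    if to.startswith("@"):
        kind, target = RouteKind.ROLE, to[1:]
    elif to.startswith("#"):
        kind, target = RouteKind.TOPIC, to[1:]
    else:
        kind, target = RouteKind.DIRECT, to
    if not target:
        raise ValueError(f"address {to!r} has no target")
    return Route(kind, target)
```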

5. How to maintain conversation context across agent interactions?

Three-tier context management:

┌─────────────────────────────────────────────────────────────────┐
│                    Context Hierarchy                             │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  CONVERSATION CONTEXT (Short-term)                        │  │
│  │  - Current message thread                                 │  │
│  │  - Last N exchanges between agents                        │  │
│  │  - Active topic/issue context                             │  │
│  │  Storage: In-memory + Redis (TTL: 1 hour)                 │  │
│  └───────────────────────────────────────────────────────────┘  │
│                           │                                      │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  SESSION CONTEXT (Medium-term)                            │  │
│  │  - Sprint goals and decisions                             │  │
│  │  - Shared agreements between agents                       │  │
│  │  - Recent artifacts and references                        │  │
│  │  Storage: PostgreSQL AgentMessage table                   │  │
│  └───────────────────────────────────────────────────────────┘  │
│                           │                                      │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  PROJECT CONTEXT (Long-term)                              │  │
│  │  - Architecture decisions (ADRs)                          │  │
│  │  - Requirements and constraints                           │  │
│  │  - Knowledge base documents                               │  │
│  │  Storage: PostgreSQL + pgvector (RAG)                     │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
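The following is a minimal in-memory sketch of the short-term tier. It keeps the last N exchanges and assumes a sliding TTL that restarts on every write, comparable to resetting a Redis key's expiry on each update.

`ConversationWindow` is a hypothetical name. Its defaults mirror the 1-hour TTL in the diagram and the `limit=10` used in the context injection pattern.

```python
import time
from collections import deque
from typing import Callable, Optional


class ConversationWindow:
    """Last N messages of a conversation, cleared once the TTL elapses.

    In-memory stand-in for the Redis-backed conversation tier.
    """

    def __init__(self, max_messages: int = 10, ttl_seconds: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic):
        self._messages: deque = deque(maxlen=max_messages)
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests can control time
        self._last_write: Optional[float] = None

    def append(self, message: dict) -> None:
        """Add a message; the deque drops the oldest once it is full."""
        self._expire_if_stale()
        self._messages.append(message)
        self._last_write = self._clock()  # writes restart the TTL

    def recent(self) -> list[dict]:
        """Return the window oldest-first, or [] if the TTL has elapsed."""
        self._expire_if_stale()
        return list(self._messages)

    def _expire_if_stale(self) -> None:
        if (self._last_write is not None
                and self._clock() - self._last_write > self._ttl):
            self._messages.clear()
            self._last_write = None
```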

Context injection pattern:

async def prepare_agent_context(
    agent_id: str,
    conversation_id: str,
    project_id: str
) -> list[dict]:
    """Build the message list for an LLM call.

    System messages (session decisions, RAG results) come first and the
    recent conversation last, so the newest exchange sits next to the
    model's reply. The helpers called here are assumed to be project
    services defined elsewhere.
    """
    system_context: list[dict] = []

    # 1. Conversation context (recent exchanges)
    recent_messages = await get_conversation_history(
        conversation_id, limit=10
    )

    # 2. Session context (relevant decisions)
    session_context = await get_session_context(agent_id, project_id)
    if session_context:
        system_context.append({
            "role": "system",
            "content": f"Session context:\n{session_context}"
        })

    # 3. Project context (RAG retrieval, queried from the recent exchanges)
    query = extract_query_from_conversation(recent_messages)
    rag_results = await search_knowledge_base(project_id, query)
    if rag_results:
        system_context.append({
            "role": "system",
            "content": f"Relevant project context:\n{rag_results}"
        })

    return system_context + format_as_messages(recent_messages)

6. Conflict resolution when agents disagree?

Hierarchical escalation model:

┌─────────────────────────────────────────────────────────────────┐
│                  Conflict Resolution Hierarchy                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Level 1: Direct Negotiation                                    │
│  ├── Agents exchange arguments (max 3 rounds)                   │
│  ├── If consensus → Resolution recorded                         │
│  └── If no consensus → Escalate to Level 2                      │
│                                                                  │
│  Level 2: Expert Arbitration                                    │
│  ├── Relevant expert agent weighs in (Architect for tech)      │
│  ├── Expert makes binding recommendation                        │
│  └── If expert unavailable → Escalate to Level 3                │
│                                                                  │
│  Level 3: Human Decision                                        │
│  ├── Conflict summarized and sent to human client              │
│  ├── Approval event triggers resolution                         │
│  └── Decision recorded for future reference                     │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Implementation:

from enum import Enum

class ConflictResolution(str, Enum):
    NEGOTIATION = "negotiation"
    EXPERT_ARBITRATION = "expert_arbitration"
    HUMAN_DECISION = "human_decision"

class ConflictTracker:
    # Conflict, ConflictStatus, self.db and the lookup helpers are project
    # models and services assumed to exist elsewhere.

    async def record_conflict(
        self,
        participants: list[str],
        topic: str,
        positions: dict[str, str]
    ) -> Conflict:
        """Record a conflict between agents."""
        return await self.db.create(Conflict(
            participants=participants,
            topic=topic,
            positions=positions,
            status=ConflictStatus.OPEN,
            resolution_level=ConflictResolution.NEGOTIATION,
            negotiation_rounds=0
        ))

    async def escalate(self, conflict_id: str) -> Conflict:
        """Escalate conflict to next resolution level."""
        conflict = await self.get(conflict_id)

        if conflict.resolution_level == ConflictResolution.NEGOTIATION:
            # Level 2: find the relevant expert (e.g. Architect for technical topics)
            expert = await self.find_expert_for_topic(conflict.topic)
            if expert is not None:
                conflict.arbitrator_id = expert.id
                conflict.resolution_level = ConflictResolution.EXPERT_ARBITRATION
                return await self.db.update(conflict)
            # No expert available: skip straight to Level 3

        if conflict.resolution_level in (
            ConflictResolution.NEGOTIATION,
            ConflictResolution.EXPERT_ARBITRATION,
        ):
            # Level 3: create a human approval request
            await self.create_approval_request(conflict)
            conflict.resolution_level = ConflictResolution.HUMAN_DECISION

        return await self.db.update(conflict)
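The escalation rules can be tested in isolation as a pure function, separate from the database-backed tracker.

This sketch assumes a conflict leaves negotiation only after the three-round limit in the hierarchy. It also assumes an expert's recommendation stays binding unless the expert becomes unavailable. `next_level` and `MAX_NEGOTIATION_ROUNDS` are hypothetical names.

```python
from enum import Enum
from typing import Optional

MAX_NEGOTIATION_ROUNDS = 3  # "max 3 rounds" from the hierarchy above


class ConflictResolution(str, Enum):
    NEGOTIATION = "negotiation"
    EXPERT_ARBITRATION = "expert_arbitration"
    HUMAN_DECISION = "human_decision"


def next_level(level: ConflictResolution, negotiation_rounds: int,
               expert_available: bool) -> Optional[ConflictResolution]:
    """Return the level a conflict should move to, or None to stay put."""
    if level is ConflictResolution.NEGOTIATION:
        if negotiation_rounds < MAX_NEGOTIATION_ROUNDS:
            return None  # agents keep negotiating
        return (ConflictResolution.EXPERT_ARBITRATION if expert_available
                else ConflictResolution.HUMAN_DECISION)
    if level is ConflictResolution.EXPERT_ARBITRATION:
        # The expert's recommendation is binding; escalate only if they drop out.
        return None if expert_available else ConflictResolution.HUMAN_DECISION
    return None  # human decision is terminal
```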

7. How to audit/log inter-agent communication?

Comprehensive audit trail:

from datetime import datetime

from sqlalchemy import Column, DateTime, Enum, ForeignKey, String, Text
from sqlalchemy.dialects.postgresql import UUID

from app.db.base import Base

class MessageAuditLog(Base):
    """All agent messages are persisted for audit."""

    __tablename__ = "message_audit_log"  # illustrative table name

    id = Column(UUID, primary_key=True)
    message_id = Column(UUID, index=True)

    # Routing info
    from_agent_id = Column(UUID, ForeignKey("agent_instances.id"))
    to_agent_id = Column(UUID, nullable=True)  # NULL for broadcasts
    to_role = Column(String, nullable=True)

    # Message details
    message_type = Column(Enum(MessageType))  # MessageType: Python enum of message kinds, defined elsewhere
    action = Column(String(50))
    content_hash = Column(String(64))  # SHA-256 hex digest of content
    content = Column(Text)  # Full content (encrypted at rest)

    # Context
    project_id = Column(UUID, ForeignKey("projects.id"), index=True)
    conversation_id = Column(UUID, index=True)
    parent_message_id = Column(UUID, nullable=True)

    # Response tracking
    response_to_id = Column(UUID, nullable=True)
    response_received_at = Column(DateTime, nullable=True)

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    delivered_at = Column(DateTime, nullable=True)
    read_at = Column(DateTime, nullable=True)
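Below is a short sketch of how `content_hash` can be checked when the audit log is read back. It assumes the hash covers the UTF-8 content exactly as stored. The router's `_persist_message` does the same, because `str.encode()` defaults to UTF-8. The helper names are hypothetical.

```python
import hashlib
import hmac


def content_hash(content: str) -> str:
    """SHA-256 hex digest of the UTF-8 message content (64 characters)."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def verify_content(content: str, stored_hash: str) -> bool:
    """Check that stored content still matches the hash recorded at write time."""
    # compare_digest does a constant-time comparison of the two strings.
    return hmac.compare_digest(content_hash(content), stored_hash)
```

A bare hash only detects changes made without updating `content_hash`. Tamper evidence against someone who can rewrite both columns would need a keyed HMAC or hash chaining across rows.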

Message Format Specification

Core Message Schema

from datetime import datetime
from typing import Literal, Optional
from uuid import UUID, uuid4

from pydantic import BaseModel, Field

class AgentIdentifier(BaseModel):
    """Identifies an agent in communication."""
    agent_id: UUID
    role: str
    name: str

class MessageContext(BaseModel):
    """Contextual information for message routing."""
    project_id: UUID
    conversation_id: UUID
    issue_id: Optional[UUID] = None
    sprint_id: Optional[UUID] = None
    parent_message_id: Optional[UUID] = None

class MessageMetadata(BaseModel):
    """Message metadata for processing."""
    created_at: datetime = Field(default_factory=datetime.utcnow)
    expires_at: Optional[datetime] = None
    priority: Literal["urgent", "high", "normal", "low"] = "normal"
    requires_response: bool = False
    response_timeout_seconds: Optional[int] = None
    retry_count: int = 0
    max_retries: int = 3

class Attachment(BaseModel):
    """Attachments to messages (declared before AgentMessage, which uses it)."""
    type: Literal["code", "file", "image", "document", "reference"]
    name: str
    content: Optional[str] = None  # For inline content
    url: Optional[str] = None  # For file references
    mime_type: Optional[str] = None

class AgentMessage(BaseModel):
    """Primary message format for inter-agent communication."""

    # Identity
    id: UUID = Field(default_factory=uuid4)
    type: Literal["request", "response", "notification", "broadcast", "stream"]

    # Routing
    sender: AgentIdentifier
    recipient: Optional[AgentIdentifier] = None  # None for broadcasts
    recipient_role: Optional[str] = None  # For role-based routing
    recipient_all: bool = False  # For project-wide broadcast

    # Action
    action: str  # e.g., "request_review", "report_bug", "share_finding"

    # Content
    content: str  # Natural language message
    attachments: list[Attachment] = Field(default_factory=list)  # Files, code snippets, etc.

    # Context
    context: MessageContext

    # Metadata
    metadata: MessageMetadata = Field(default_factory=MessageMetadata)

    # Response linking
    in_response_to: Optional[UUID] = None
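`AgentMessage` carries three mutually exclusive routing fields: `recipient`, `recipient_role`, and `recipient_all`. Nothing in the schema stops a caller from setting two of them, or none.

The framework-neutral sketch below assumes "exactly one" is the intended rule; it holds for every pattern example in this document. In the Pydantic model, this logic would sit in a validator. `routing_mode` is a hypothetical name.

```python
from typing import Optional


def routing_mode(recipient_id: Optional[str], recipient_role: Optional[str],
                 recipient_all: bool) -> str:
    """Return "direct", "role" or "broadcast"; reject ambiguous addressing."""
    chosen = [
        name for name, is_set in (
            ("direct", recipient_id is not None),
            ("role", recipient_role is not None),
            ("broadcast", recipient_all),
        ) if is_set
    ]
    if len(chosen) != 1:
        # Zero or several targets: the router cannot know where to deliver.
        raise ValueError(f"expected exactly one routing target, got {chosen or 'none'}")
    return chosen[0]
```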

Action Types

from enum import Enum

class MessageAction(str, Enum):
    # Request actions
    REQUEST_REVIEW = "request_review"
    REQUEST_GUIDANCE = "request_guidance"
    REQUEST_CLARIFICATION = "request_clarification"
    REQUEST_APPROVAL = "request_approval"
    DELEGATE_TASK = "delegate_task"

    # Response actions
    PROVIDE_REVIEW = "provide_review"
    PROVIDE_GUIDANCE = "provide_guidance"
    PROVIDE_CLARIFICATION = "provide_clarification"
    APPROVE = "approve"
    REJECT = "reject"

    # Notification actions
    REPORT_BUG = "report_bug"
    REPORT_PROGRESS = "report_progress"
    REPORT_BLOCKER = "report_blocker"
    SHARE_FINDING = "share_finding"
    ANNOUNCE_DECISION = "announce_decision"

    # Collaboration actions
    PROPOSE_APPROACH = "propose_approach"
    CHALLENGE_APPROACH = "challenge_approach"
    AGREE_WITH = "agree_with"
    DISAGREE_WITH = "disagree_with"
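The request and response groups line up one-to-one. That lets a router reject mismatched replies, for example a `provide_review` sent in answer to `request_guidance`.

This sketch assumes the pairing is intended. `DELEGATE_TASK` has no matching response action in the enum, so it is left out. `EXPECTED_RESPONSES` and `is_valid_reply` are hypothetical names.

```python
from enum import Enum


class MessageAction(str, Enum):
    # Subset of the enum above: only the request/response pairs.
    REQUEST_REVIEW = "request_review"
    REQUEST_GUIDANCE = "request_guidance"
    REQUEST_CLARIFICATION = "request_clarification"
    REQUEST_APPROVAL = "request_approval"
    PROVIDE_REVIEW = "provide_review"
    PROVIDE_GUIDANCE = "provide_guidance"
    PROVIDE_CLARIFICATION = "provide_clarification"
    APPROVE = "approve"
    REJECT = "reject"


# Assumed pairing: which response actions may answer each request action.
EXPECTED_RESPONSES: dict[MessageAction, frozenset[MessageAction]] = {
    MessageAction.REQUEST_REVIEW: frozenset({MessageAction.PROVIDE_REVIEW}),
    MessageAction.REQUEST_GUIDANCE: frozenset({MessageAction.PROVIDE_GUIDANCE}),
    MessageAction.REQUEST_CLARIFICATION: frozenset({MessageAction.PROVIDE_CLARIFICATION}),
    MessageAction.REQUEST_APPROVAL: frozenset({MessageAction.APPROVE, MessageAction.REJECT}),
}


def is_valid_reply(request_action: str, response_action: str) -> bool:
    """True if `response_action` is an accepted reply to `request_action`."""
    try:
        request = MessageAction(request_action)
        response = MessageAction(response_action)
    except ValueError:
        return False  # unknown action strings never form a valid pair
    return response in EXPECTED_RESPONSES.get(request, frozenset())
```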

Communication Patterns

Pattern 1: Request-Response

Use Case: Engineer asks Architect for design guidance.

# Engineer sends request
request = AgentMessage(
    type="request",
    sender=AgentIdentifier(agent_id=eng_id, role="Engineer", name="Dave"),
    recipient=AgentIdentifier(agent_id=arch_id, role="Architect", name="Alex"),
    action="request_guidance",
    content="I need to implement caching for the user service. Should I use Redis with write-through or write-behind strategy?",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        issue_id=issue_id
    ),
    metadata=MessageMetadata(
        priority="normal",
        requires_response=True,
        response_timeout_seconds=300
    )
)

# Send and await the response (the Engineer waits up to 300 s)
response = await message_router.send_and_wait(request, timeout=300)

# On the Architect's side: build the reply...
reply = AgentMessage(
    type="response",
    sender=AgentIdentifier(agent_id=arch_id, role="Architect", name="Alex"),
    recipient=request.sender,
    action="provide_guidance",
    content="Given our consistency requirements, use write-through caching with Redis. Here's the pattern...",
    in_response_to=request.id,
    context=request.context,
    attachments=[
        Attachment(type="code", name="cache_pattern.py", content="...")
    ]
)

# ...and send it with respond(), which also notifies the waiting send_and_wait()
await message_router.respond(request.id, reply)

Pattern 2: Broadcast

Use Case: Product Owner announces sprint goals.

broadcast = AgentMessage(
    type="broadcast",
    sender=AgentIdentifier(agent_id=po_id, role="ProductOwner", name="Sarah"),
    recipient_all=True,
    action="announce_decision",
    content="Sprint 3 goals: 1) Complete auth module 2) Add user settings page 3) Fix critical bugs from QA",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        sprint_id=sprint_id
    ),
    metadata=MessageMetadata(
        priority="high",
        requires_response=False
    )
)

await message_router.broadcast(broadcast)

Pattern 3: Role-Based Routing

Use Case: QA reports a bug to all Engineers.

bug_report = AgentMessage(
    type="notification",
    sender=AgentIdentifier(agent_id=qa_id, role="QA", name="Quinn"),
    recipient_role="Engineer",  # All engineers receive this
    action="report_bug",
    content="Found a critical bug in the auth flow: users can bypass 2FA by...",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        issue_id=bug_issue_id
    ),
    metadata=MessageMetadata(
        priority="urgent",
        requires_response=True
    )
)

await message_router.send_to_role(bug_report)

Pattern 4: Task Delegation

Use Case: Product Owner assigns work to Engineer.

delegation = AgentMessage(
    type="request",
    sender=AgentIdentifier(agent_id=po_id, role="ProductOwner", name="Sarah"),
    recipient=AgentIdentifier(agent_id=eng_id, role="Engineer", name="Dave"),
    action="delegate_task",
    content="Please implement issue #45: Add password reset functionality",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        issue_id=issue_45_id
    ),
    metadata=MessageMetadata(
        priority="normal",
        requires_response=True
    )
)

# This creates a Celery task for async execution
await message_router.delegate_task(delegation)

Pattern 5: Streaming Updates

Use Case: Engineer shares progress during long task.

async def stream_progress(agent_id: str, task_id: str):
    """Stream progress updates during task execution."""

    for step in task_steps:
        # Execute step
        result = await execute_step(step)

        # Stream update
        update = AgentMessage(
            type="stream",
            sender=AgentIdentifier(agent_id=agent_id, ...),
            recipient_all=True,
            action="report_progress",
            content=f"Completed: {step.name}. Progress: {step.percent}%",
            context=MessageContext(project_id=project_id, ...),
        )

        await message_router.stream(update)
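Pattern 5 publishes to observers rather than to a single recipient. The in-process sketch below shows that fan-out, assuming each observer holds its own `asyncio.Queue`. In Syndarix the transport would be the Redis/SSE event bus instead.

`ProgressHub`, `run_steps`, `observe`, and the `done` sentinel are hypothetical.

```python
import asyncio
from typing import AsyncIterator


class ProgressHub:
    """In-process fan-out: every subscriber gets its own copy of each update."""

    def __init__(self) -> None:
        self._queues: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.append(queue)
        return queue

    async def publish(self, update: dict) -> None:
        for queue in self._queues:
            await queue.put(update)


async def run_steps(hub: ProgressHub, steps: list[str]) -> None:
    """Execute steps in order, publishing a progress update after each one."""
    for index, name in enumerate(steps, start=1):
        await asyncio.sleep(0)  # stand-in for the real work of this step
        percent = round(100 * index / len(steps))
        await hub.publish({"action": "report_progress",
                           "content": f"Completed: {name}. Progress: {percent}%"})
    await hub.publish({"action": "done"})  # sentinel so observers can stop


async def observe(queue: asyncio.Queue) -> AsyncIterator[dict]:
    """Yield updates from one subscription until the sentinel arrives."""
    while True:
        update = await queue.get()
        if update["action"] == "done":
            return
        yield update
```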

Database Schema

Message Storage

# app/models/agent_message.py
from datetime import datetime
from uuid import uuid4

from sqlalchemy import (
    Boolean, Column, DateTime, Enum, ForeignKey, Index, Integer, String, Text,
)
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.db.base import Base

class AgentMessage(Base):
    """Persistent storage for all agent messages."""

    __tablename__ = "agent_messages"

    # Primary key
    id = Column(UUID, primary_key=True, default=uuid4)

    # Message type
    type = Column(
        Enum("request", "response", "notification", "broadcast", "stream",
             name="message_type"),
        nullable=False
    )

    # Sender
    sender_agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=False)
    sender_role = Column(String(50), nullable=False)
    sender_name = Column(String(100), nullable=False)

    # Recipient (nullable for broadcasts)
    recipient_agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=True)
    recipient_role = Column(String(50), nullable=True)  # For role-based routing
    is_broadcast = Column(Boolean, default=False)

    # Action
    action = Column(String(50), nullable=False)

    # Content
    content = Column(Text, nullable=False)
    content_hash = Column(String(64), nullable=False)  # SHA-256
    attachments = Column(JSONB, default=list)

    # Context
    project_id = Column(UUID, ForeignKey("projects.id"), nullable=False)
    conversation_id = Column(UUID, nullable=False)
    issue_id = Column(UUID, ForeignKey("issues.id"), nullable=True)
    sprint_id = Column(UUID, ForeignKey("sprints.id"), nullable=True)
    parent_message_id = Column(UUID, ForeignKey("agent_messages.id"), nullable=True)

    # Response linking
    in_response_to_id = Column(UUID, ForeignKey("agent_messages.id"), nullable=True)

    # Metadata
    priority = Column(
        Enum("urgent", "high", "normal", "low", name="priority_level"),
        default="normal"
    )
    requires_response = Column(Boolean, default=False)
    response_timeout_seconds = Column(Integer, nullable=True)

    # Status tracking
    status = Column(
        Enum("pending", "delivered", "read", "responded", "expired", "failed",
             name="message_status"),
        default="pending"
    )

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    delivered_at = Column(DateTime, nullable=True)
    read_at = Column(DateTime, nullable=True)
    responded_at = Column(DateTime, nullable=True)
    expires_at = Column(DateTime, nullable=True)

    # Indexes
    __table_args__ = (
        Index("ix_agent_messages_project_conversation", "project_id", "conversation_id"),
        Index("ix_agent_messages_sender", "sender_agent_id", "created_at"),
        Index("ix_agent_messages_recipient", "recipient_agent_id", "status"),
        Index("ix_agent_messages_conversation", "conversation_id", "created_at"),
    )


class Conversation(Base):
    """Groups related messages into conversations."""

    __tablename__ = "conversations"

    id = Column(UUID, primary_key=True, default=uuid4)

    # Context
    project_id = Column(UUID, ForeignKey("projects.id"), nullable=False)
    issue_id = Column(UUID, ForeignKey("issues.id"), nullable=True)
    sprint_id = Column(UUID, ForeignKey("sprints.id"), nullable=True)

    # Participants
    participant_ids = Column(JSONB, default=list)  # List of agent IDs

    # Topic
    topic = Column(String(200), nullable=True)

    # Status
    status = Column(
        Enum("active", "resolved", "archived", name="conversation_status"),
        default="active"
    )

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, onupdate=datetime.utcnow)
    resolved_at = Column(DateTime, nullable=True)

    # Summary (generated)
    summary = Column(Text, nullable=True)


class MessageDelivery(Base):
    """Tracks delivery status for broadcast messages."""

    __tablename__ = "message_deliveries"

    id = Column(UUID, primary_key=True, default=uuid4)
    message_id = Column(UUID, ForeignKey("agent_messages.id"), nullable=False)
    recipient_agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=False)

    # Status
    delivered_at = Column(DateTime, nullable=True)
    read_at = Column(DateTime, nullable=True)
    responded_at = Column(DateTime, nullable=True)

Alembic Migration

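# migrations/versions/xxx_add_agent_messages.py
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.postgresql import ENUM, JSONB, UUID


def upgrade():
    # Create enum types
    op.execute("""
        CREATE TYPE message_type AS ENUM (
            'request', 'response', 'notification', 'broadcast', 'stream'
        )
    """)
    op.execute("""
        CREATE TYPE priority_level AS ENUM ('urgent', 'high', 'normal', 'low')
    """)
    op.execute("""
        CREATE TYPE message_status AS ENUM (
            'pending', 'delivered', 'read', 'responded', 'expired', 'failed'
        )
    """)
    op.execute("""
        CREATE TYPE conversation_status AS ENUM ('active', 'resolved', 'archived')
    """)

    # Create tables. The enum types already exist, so reference them with
    # create_type=False to stop create_table from issuing CREATE TYPE again.
    # Defaults use server_default: Python-side default=/onupdate= have no
    # effect in a migration (the ORM model still maintains updated_at).
    op.create_table(
        'conversations',
        sa.Column('id', UUID, primary_key=True),
        sa.Column('project_id', UUID, sa.ForeignKey('projects.id'), nullable=False),
        sa.Column('issue_id', UUID, sa.ForeignKey('issues.id'), nullable=True),
        sa.Column('sprint_id', UUID, sa.ForeignKey('sprints.id'), nullable=True),
        sa.Column('participant_ids', JSONB, server_default=sa.text("'[]'::jsonb")),
        sa.Column('topic', sa.String(200), nullable=True),
        sa.Column('status', ENUM('active', 'resolved', 'archived',
                  name='conversation_status', create_type=False),
                  server_default='active'),
        sa.Column('created_at', sa.DateTime, server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime, nullable=True),
        sa.Column('resolved_at', sa.DateTime, nullable=True),
        sa.Column('summary', sa.Text, nullable=True),
    )

    op.create_table(
        'agent_messages',
        # ... columns as defined above
    )

    op.create_table(
        'message_deliveries',
        # ... columns as defined above
    )


def downgrade():
    # Drop tables in reverse dependency order, then the enum types
    op.drop_table('message_deliveries')
    op.drop_table('agent_messages')
    op.drop_table('conversations')
    for type_name in ('conversation_status', 'message_status',
                      'priority_level', 'message_type'):
        op.execute(f"DROP TYPE IF EXISTS {type_name}")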

Code Examples

Message Router Service

# app/services/message_router.py
from datetime import datetime
from typing import Optional
from uuid import UUID
import asyncio
import hashlib

from sqlalchemy import select

from app.models.agent_message import AgentMessage as AgentMessageModel
from app.schemas.messages import AgentMessage, AgentIdentifier
from app.services.events import EventBus
from app.db.session import AsyncSession
from app.core.celery_app import celery_app


class AgentNotAvailableError(Exception):
    """Raised when the target agent is missing or not active."""

class MessageRouter:
    """Routes messages between agents."""

    def __init__(
        self,
        db: AsyncSession,
        event_bus: EventBus,
        orchestrator: "AgentOrchestrator"
    ):
        self.db = db
        self.event_bus = event_bus
        self.orchestrator = orchestrator

    async def send(self, message: AgentMessage) -> AgentMessageModel:
        """Send a message to a specific agent."""
        if message.recipient is None:
            raise ValueError(
                "send() needs a direct recipient; use send_to_role() or broadcast()"
            )

        # Validate recipient exists and is active
        recipient = await self.orchestrator.get_instance(message.recipient.agent_id)
        if not recipient or recipient.status != "active":
            raise AgentNotAvailableError(f"Agent {message.recipient.agent_id} not available")

        # Persist message
        db_message = await self._persist_message(message)

        # Publish to recipient's channel
        await self.event_bus.publish(
            f"agent:{message.recipient.agent_id}",
            {
                "type": "message_received",
                "message_id": str(db_message.id),
                "sender": message.sender.dict(),
                "action": message.action,
                "priority": message.metadata.priority,
                "preview": message.content[:100]
            }
        )

        # Also publish to project channel for monitoring
        await self.event_bus.publish(
            f"project:{message.context.project_id}",
            {
                "type": "agent_message",
                "from": message.sender.name,
                "to": message.recipient.name,
                "action": message.action
            }
        )

        return db_message

    async def send_and_wait(
        self,
        message: AgentMessage,
        timeout: int = 300
    ) -> Optional[AgentMessage]:
        """Send a message and wait for response."""
        # Subscribe BEFORE sending: Redis Pub/Sub does not buffer, so a reply
        # published before the subscription exists would be lost. The channel
        # can be named up front because the persisted row reuses message.id.
        response_channel = f"response:{message.id}"
        subscriber = await self.event_bus.subscribe(response_channel)

        try:
            await self.send(message)
            response_event = await asyncio.wait_for(
                subscriber.get_event(),
                timeout=timeout
            )
            response_id = response_event.data["response_id"]
            return await self.get_message(response_id)
        except asyncio.TimeoutError:
            # Update message status to expired
            await self._mark_expired(message.id)
            return None
        finally:
            await subscriber.unsubscribe()

    async def send_to_role(self, message: AgentMessage) -> list[AgentMessageModel]:
        """Send message to all agents of a specific role."""
        # Get all active agents with the target role
        recipients = await self.orchestrator.get_instances_by_role(
            project_id=message.context.project_id,
            role=message.recipient_role
        )

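        messages = []
        for recipient in recipients:
            # Don't deliver the message back to its sender
            if recipient.id == message.sender.agent_id:
                continue
            # Build a fresh message so each copy gets its own id: the id is
            # persisted as the primary key, so copies must not share it
            msg = AgentMessage(
                **message.dict(exclude={"id", "recipient"}),
                recipient=AgentIdentifier(
                    agent_id=recipient.id,
                    role=recipient.agent_type.role,
                    name=recipient.name
                ),
            )
            db_message = await self.send(msg)
            messages.append(db_message)

        return messages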

    async def broadcast(self, message: AgentMessage) -> AgentMessageModel:
        """Broadcast message to all project agents."""
        # Persist the broadcast message
        db_message = await self._persist_message(message)

        # Get all active agents in project
        agents = await self.orchestrator.get_project_agents(
            message.context.project_id
        )

        # Create delivery records
        for agent in agents:
            if agent.id != message.sender.agent_id:
                await self._create_delivery_record(db_message.id, agent.id)

        # Publish to project channel
        await self.event_bus.publish(
            f"project:{message.context.project_id}",
            {
                "type": "broadcast",
                "message_id": str(db_message.id),
                "sender": message.sender.dict(),
                "action": message.action,
                "content": message.content
            }
        )

        return db_message

    async def delegate_task(self, message: AgentMessage) -> str:
        """Delegate a task for async execution via Celery."""
        # Persist message
        db_message = await self._persist_message(message)

        # Create Celery task
        task = celery_app.send_task(
            "app.tasks.agent.execute_delegated_task",
            args=[str(db_message.id), str(message.recipient.agent_id)],
            queue="agent_tasks",
            priority=self._priority_to_int(message.metadata.priority)
        )

        return task.id

    async def respond(
        self,
        original_message_id: UUID,
        response: AgentMessage
    ) -> AgentMessageModel:
        """Send a response to a previous message."""
        original = await self.get_message(original_message_id)

        response.in_response_to = original_message_id
        response.context = original.context

        db_response = await self.send(response)

        # Update original message status
        await self._mark_responded(original_message_id)

        # Publish response notification
        await self.event_bus.publish(
            f"response:{original_message_id}",
            {
                "type": "response_received",
                "response_id": str(db_response.id)
            }
        )

        return db_response

    async def get_conversation_history(
        self,
        conversation_id: UUID,
        limit: int = 50,
        before: Optional[datetime] = None
    ) -> list[AgentMessageModel]:
        """Get messages in a conversation."""
        query = select(AgentMessageModel).where(
            AgentMessageModel.conversation_id == conversation_id
        )

        if before:
            query = query.where(AgentMessageModel.created_at < before)

        query = query.order_by(AgentMessageModel.created_at.desc()).limit(limit)

        result = await self.db.execute(query)
        return list(reversed(result.scalars().all()))

    async def _persist_message(self, message: AgentMessage) -> AgentMessageModel:
        """Persist message to database."""
        content_hash = hashlib.sha256(message.content.encode()).hexdigest()

        db_message = AgentMessageModel(
            id=message.id,
            type=message.type,
            sender_agent_id=message.sender.agent_id,
            sender_role=message.sender.role,
            sender_name=message.sender.name,
            recipient_agent_id=message.recipient.agent_id if message.recipient else None,
            recipient_role=message.recipient_role,
            is_broadcast=message.recipient_all,
            action=message.action,
            content=message.content,
            content_hash=content_hash,
            attachments=[a.dict() for a in message.attachments],
            project_id=message.context.project_id,
            conversation_id=message.context.conversation_id,
            issue_id=message.context.issue_id,
            sprint_id=message.context.sprint_id,
            parent_message_id=message.context.parent_message_id,
            in_response_to_id=message.in_response_to,
            priority=message.metadata.priority,
            requires_response=message.metadata.requires_response,
            response_timeout_seconds=message.metadata.response_timeout_seconds,
            expires_at=message.metadata.expires_at,
        )

        self.db.add(db_message)
        await self.db.commit()
        await self.db.refresh(db_message)

        return db_message

    def _priority_to_int(self, priority: str) -> int:
        """Convert priority to Celery priority (0=highest)."""
        return {"urgent": 0, "high": 3, "normal": 6, "low": 9}.get(priority, 6)
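`get_conversation_history` fetches the newest `limit` rows in descending order and then reverses them. Callers therefore get a page in chronological order, and they can request older history by passing the page's oldest timestamp as `before`. The sketch below reproduces that keyset-pagination behaviour over an in-memory list, so the semantics can be checked without a database. `StoredMessage` and `conversation_page` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class StoredMessage:
    # Hypothetical stand-in for AgentMessageModel with only the fields used here
    conversation_id: str
    created_at: datetime
    content: str


def conversation_page(
    messages: list[StoredMessage],
    conversation_id: str,
    limit: int = 50,
    before: Optional[datetime] = None,
) -> list[StoredMessage]:
    """Mimic get_conversation_history: the newest `limit` messages, returned oldest-first."""
    rows = [m for m in messages if m.conversation_id == conversation_id]
    if before is not None:
        rows = [m for m in rows if m.created_at < before]
    # Equivalent of ORDER BY created_at DESC LIMIT :limit
    newest_first = sorted(rows, key=lambda m: m.created_at, reverse=True)[:limit]
    # Reverse so the page reads chronologically
    return list(reversed(newest_first))
```

There is one caveat. If two messages share the same `created_at`, a strict `< before` cursor can skip one of them at a page boundary. A composite cursor on `(created_at, id)` avoids that.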

Receiving and Processing Messages

# app/services/agent_inbox.py
from typing import Awaitable, Callable, Optional
from uuid import UUID

from app.schemas.messages import AgentMessage
from app.services.message_router import MessageRouter
# EventBus is the existing Redis Pub/Sub event bus (import path not shown)

MessageHandler = Callable[[AgentMessage], Awaitable[Optional[AgentMessage]]]


class AgentInbox:
    """Manages incoming messages for an agent."""

    def __init__(
        self,
        agent_id: UUID,
        event_bus: EventBus,
        message_router: MessageRouter
    ):
        self.agent_id = agent_id
        self.event_bus = event_bus
        self.message_router = message_router
        self._handlers: dict[str, MessageHandler] = {}

    def on_action(self, action: str) -> Callable[[MessageHandler], MessageHandler]:
        """Return a decorator that registers a handler for a specific action type."""
        def register(handler: MessageHandler) -> MessageHandler:
            self._handlers[action] = handler
            return handler
        return register

    async def start_listening(self):
        """Start listening for incoming messages."""
        channel = f"agent:{self.agent_id}"
        subscriber = await self.event_bus.subscribe(channel)

        while True:
            event = await subscriber.get_event()

            if event.type == "message_received":
                message = await self.message_router.get_message(
                    event.data["message_id"]
                )
                await self._process_message(message)

    async def _process_message(self, message: AgentMessage):
        """Process an incoming message."""
        # Mark as read
        await self.message_router.mark_read(message.id)

        # Find the handler for this action, falling back to "default"
        handler = self._handlers.get(message.action) or self._handlers.get("default")

        if handler:
            response = await handler(message)

            # requires_response lives on the message metadata
            if response and message.metadata.requires_response:
                await self.message_router.respond(message.id, response)


# Usage in agent runner
class AgentRunner:
    async def setup_message_handlers(self):
        """Setup handlers for different message types."""

        @self.inbox.on_action("request_review")
        async def handle_review_request(message: AgentMessage) -> AgentMessage:
            # Execute code review
            review_result = await self.execute("review_code", {
                "content": message.content,
                "attachments": message.attachments
            })

            return AgentMessage(
                type="response",
                sender=self.identifier,
                recipient=message.sender,
                action="provide_review",
                content=review_result["content"],
                attachments=review_result.get("attachments", [])
            )

        @self.inbox.on_action("delegate_task")
        async def handle_task_delegation(message: AgentMessage) -> AgentMessage:
            # Accept and start working on task
            await self.update_status("working")

            # Execute task
            result = await self.execute("implement", {
                "task_description": message.content,
                "issue_id": message.context.issue_id
            })

            return AgentMessage(
                type="response",
                sender=self.identifier,
                recipient=message.sender,
                action="task_completed",
                content=f"Task completed. {result['summary']}"
            )
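The inbox pattern reduces to a lookup table that maps each action name to a coroutine, with a `"default"` fallback and an optional reply. The in-memory sketch below isolates that dispatch logic, including the decorator-style registration used in the `AgentRunner` example, so it can run without Redis or the database. `Msg`, `InMemoryInbox` and the `sent_responses` list are hypothetical. The list stands in for `MessageRouter.respond`.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class Msg:
    # Hypothetical minimal message: only the fields the dispatcher reads
    action: str
    content: str
    requires_response: bool = False


Handler = Callable[[Msg], Awaitable[Optional[Msg]]]


class InMemoryInbox:
    """Action-based dispatch with a "default" fallback; no Redis or DB involved."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}
        self.sent_responses: list[Msg] = []

    def on_action(self, action: str) -> Callable[[Handler], Handler]:
        # Decorator form: @inbox.on_action("x") registers and returns the handler
        def register(handler: Handler) -> Handler:
            self._handlers[action] = handler
            return handler
        return register

    async def process(self, message: Msg) -> Optional[Msg]:
        handler = self._handlers.get(message.action) or self._handlers.get("default")
        if handler is None:
            return None  # unhandled actions are dropped in this sketch
        response = await handler(message)
        if response is not None and message.requires_response:
            self.sent_responses.append(response)  # stands in for router.respond()
        return response


inbox = InMemoryInbox()


@inbox.on_action("request_review")
async def review(msg: Msg) -> Msg:
    return Msg(action="provide_review", content=f"Reviewed: {msg.content}")


@inbox.on_action("default")
async def fallback(msg: Msg) -> Optional[Msg]:
    return None
```

The decorator returns the handler unchanged, so decorated functions stay directly callable. That also makes each handler easy to unit test on its own.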

API Endpoints

# app/api/v1/messages.py
from typing import Optional
from uuid import UUID

from fastapi import APIRouter, Depends

from app.schemas.messages import AgentMessage, MessageCreate, MessageResponse
from app.services.message_router import MessageRouter
from app.api.deps import get_message_router, get_current_user
# User is the authenticated user model returned by get_current_user (import not shown)

router = APIRouter(prefix="/messages", tags=["messages"])

# The dependency is named message_router so it does not shadow the APIRouter above

@router.post("/send", response_model=MessageResponse)
async def send_message(
    message_in: MessageCreate,
    message_router: MessageRouter = Depends(get_message_router),
    current_user: User = Depends(get_current_user)
):
    """Send a message between agents (admin/system use)."""
    message = AgentMessage(**message_in.dict())
    result = await message_router.send(message)
    return MessageResponse.from_orm(result)

@router.post("/broadcast", response_model=MessageResponse)
async def broadcast_message(
    message_in: MessageCreate,
    message_router: MessageRouter = Depends(get_message_router),
    current_user: User = Depends(get_current_user)
):
    """Broadcast a message to all project agents."""
    # Merge into one dict so recipient_all cannot be passed twice
    message = AgentMessage(**{**message_in.dict(), "recipient_all": True})
    result = await message_router.broadcast(message)
    return MessageResponse.from_orm(result)

@router.get("/conversation/{conversation_id}", response_model=list[MessageResponse])
async def get_conversation(
    conversation_id: UUID,
    limit: int = 50,
    message_router: MessageRouter = Depends(get_message_router),
    current_user: User = Depends(get_current_user)
):
    """Get messages in a conversation."""
    messages = await message_router.get_conversation_history(conversation_id, limit=limit)
    return [MessageResponse.from_orm(m) for m in messages]

@router.get("/agent/{agent_id}/inbox", response_model=list[MessageResponse])
async def get_agent_inbox(
    agent_id: UUID,
    status: Optional[str] = None,
    limit: int = 50,
    message_router: MessageRouter = Depends(get_message_router),
    current_user: User = Depends(get_current_user)
):
    """Get messages in an agent's inbox, optionally filtered by status."""
    messages = await message_router.get_agent_messages(agent_id, status=status, limit=limit)
    return [MessageResponse.from_orm(m) for m in messages]

SSE Integration

Real-time Message Events

Messages integrate with the existing SSE event system (ADR-002):

# Extended event types for messages
from enum import Enum


class EventType(str, Enum):
    # Existing events...

    # Message Events
    MESSAGE_SENT = "message_sent"
    MESSAGE_RECEIVED = "message_received"
    MESSAGE_READ = "message_read"
    MESSAGE_RESPONDED = "message_responded"

    # Conversation Events
    CONVERSATION_STARTED = "conversation_started"
    CONVERSATION_RESOLVED = "conversation_resolved"

    # Conflict Events
    CONFLICT_DETECTED = "conflict_detected"
    CONFLICT_ESCALATED = "conflict_escalated"
    CONFLICT_RESOLVED = "conflict_resolved"

Message Event Stream

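# app/api/v1/events.py (extended)
import asyncio
import json

from fastapi import Depends, Request
from fastapi.responses import StreamingResponse

# router, event_bus, get_current_user and User are assumed to come from the existing module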

@router.get("/projects/{project_id}/messages/events")
async def message_events(
    project_id: UUID,
    request: Request,
    current_user: User = Depends(get_current_user)
):
    """Stream message events for a project."""

    async def event_generator():
        subscriber = await event_bus.subscribe(f"project:{project_id}:messages")

        try:
            while not await request.is_disconnected():
                try:
                    event = await asyncio.wait_for(
                        subscriber.get_event(),
                        timeout=30.0
                    )

                    # Filter by event type
                    if event.type in [
                        "message_sent", "message_received",
                        "conversation_started", "conflict_detected"
                    ]:
                        yield f"event: {event.type}\ndata: {json.dumps(event.data)}\n\n"

                except asyncio.TimeoutError:
                    yield ": keepalive\n\n"
        finally:
            await subscriber.unsubscribe()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )
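The stream builds each frame with an inline f-string. That is valid because `json.dumps` emits a single line by default. A small helper makes the SSE framing rules explicit:

- a blank line ends each event;
- each line of the payload needs its own `data:` field;
- lines beginning with `:` are comments, which is why the keepalive works.

`format_sse` and `keepalive` are hypothetical helper names, not part of FastAPI or Starlette.

```python
import json
from typing import Any


def format_sse(event: str, data: Any) -> str:
    """Serialize one Server-Sent Events frame.

    Non-string payloads are JSON-encoded. Every payload line gets its own
    "data:" field, and a blank line terminates the event.
    """
    payload = data if isinstance(data, str) else json.dumps(data)
    lines = [f"event: {event}"]
    # splitlines() returns [] for an empty string, so emit one empty data field
    lines.extend(f"data: {line}" for line in payload.splitlines() or [""])
    return "\n".join(lines) + "\n\n"


def keepalive() -> str:
    # Comment line: clients ignore it, but proxies see traffic and keep the connection open
    return ": keepalive\n\n"
```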

Frontend Integration

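// frontend/lib/messageEvents.ts
import { useEffect } from 'react';

// AgentIdentifier mirrors the backend sender schema and is assumed to be defined elsewhere.
// Named AgentMessageEvent so it does not shadow the DOM's built-in MessageEvent type.
interface AgentMessageEvent {
  type: 'message_sent' | 'message_received' | 'conversation_started';
  data: {
    message_id?: string;
    from?: AgentIdentifier;
    to?: AgentIdentifier;
    action?: string;
    preview?: string;
  };
}

export function useMessageEvents(
  projectId: string,
  // Pass a memoized callback (e.g. via useCallback); a new function on every
  // render changes the effect dependencies and reopens the EventSource
  onMessage: (event: AgentMessageEvent) => void
) {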
  useEffect(() => {
    const eventSource = new EventSource(
      `/api/v1/projects/${projectId}/messages/events`,
      { withCredentials: true }
    );

    eventSource.addEventListener('message_sent', (e) => {
      onMessage({ type: 'message_sent', data: JSON.parse(e.data) });
    });

    eventSource.addEventListener('message_received', (e) => {
      onMessage({ type: 'message_received', data: JSON.parse(e.data) });
    });

    eventSource.addEventListener('conversation_started', (e) => {
      onMessage({ type: 'conversation_started', data: JSON.parse(e.data) });
    });

    eventSource.onerror = () => {
      // Reconnection handled by EventSource
      console.log('Message SSE reconnecting...');
    };

    return () => eventSource.close();
  }, [projectId, onMessage]);
}

Syndarix-Specific Requirements

@Mentions Support

import re


class MentionParser:
    """Parse @mentions in message content."""

    # The lookbehind skips e-mail addresses such as bob@example.com
    MENTION_PATTERN = re.compile(r'(?<![\w.])@(\w+)')
    ROLE_PATTERN = re.compile(r'engineers?|architects?|qa|po|pm|devops', re.IGNORECASE)

    def parse(self, content: str) -> list[Mention]:
        mentions = []

        for match in self.MENTION_PATTERN.finditer(content):
            token = match.group(1)

            if self.ROLE_PATTERN.fullmatch(token):
                # Role mention (@qa, @engineers): recorded once, never also as an agent
                mentions.append(Mention(type="role", role=token.lower(), position=match.start()))
            else:
                # Agent mention by name (@alice)
                mentions.append(Mention(type="agent", name=token, position=match.start()))

        return mentions

    async def resolve_mentions(
        self,
        mentions: list[Mention],
        project_id: UUID,
        orchestrator: AgentOrchestrator
    ) -> list[UUID]:
        """Resolve mentions to agent IDs."""
        agent_ids = set()

        for mention in mentions:
            if mention.type == "agent":
                agent = await orchestrator.get_by_name(project_id, mention.name)
                if agent:
                    agent_ids.add(agent.id)
            elif mention.type == "role":
                agents = await orchestrator.get_instances_by_role(
                    project_id, mention.role
                )
                agent_ids.update(a.id for a in agents)

        return list(agent_ids)
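Because `@(\w+)` also matches role names, a parser has to decide whether a token like `@qa` names an agent or a role. It should also skip e-mail addresses such as `bob@example.com`. The self-contained sketch below classifies each `@token` exactly once. The lookbehind and the lower-casing of role names are assumptions of this sketch, not established Syndarix conventions, and `parse_mentions` is a hypothetical name.

```python
import re
from dataclasses import dataclass
from typing import Optional

# Same alias list as MentionParser.ROLE_PATTERN, matched against the whole token
ROLE_RE = re.compile(r"engineers?|architects?|qa|po|pm|devops", re.IGNORECASE)
# Negative lookbehind: an "@" preceded by a word character or "." is part of an e-mail address
MENTION_RE = re.compile(r"(?<![\w.])@(\w+)")


@dataclass
class Mention:
    type: str  # "agent" or "role"
    position: int
    name: Optional[str] = None
    role: Optional[str] = None


def parse_mentions(content: str) -> list[Mention]:
    """Classify every @token exactly once, as either a role or an agent name."""
    mentions: list[Mention] = []
    for match in MENTION_RE.finditer(content):
        token = match.group(1)
        if ROLE_RE.fullmatch(token):
            mentions.append(Mention(type="role", position=match.start(), role=token.lower()))
        else:
            mentions.append(Mention(type="agent", position=match.start(), name=token))
    return mentions
```

Each token produces a single `Mention`, so a resolver never looks up a role alias as an agent name, and a name like `@qadir` is not mistaken for the `qa` role.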

Priority Message Handling

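class PriorityMessageHandler:
    """Handle urgent messages with priority."""

    PRIORITY_INTERRUPTS = {"urgent", "high"}

    def __init__(self, orchestrator: AgentOrchestrator, event_bus: EventBus):
        self.orchestrator = orchestrator
        self.event_bus = event_bus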

    async def process_with_priority(
        self,
        agent_id: UUID,
        message: AgentMessage
    ):
        """Process message based on priority."""
        if message.metadata.priority in self.PRIORITY_INTERRUPTS:
            # Interrupt current task
            await self.orchestrator.interrupt_agent(agent_id)

            # Publish urgent notification
            await self.event_bus.publish(
                f"project:{message.context.project_id}",
                {
                    "type": "urgent_message",
                    "agent_id": str(agent_id),
                    "message_id": str(message.id),
                    "from": message.sender.name,
                    "preview": message.content[:100]
                }
            )

            # Process immediately
            await self.process_immediately(agent_id, message)
        else:
            # Queue for normal processing
            await self.queue_for_processing(agent_id, message)
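The spike does not define `queue_for_processing`. One plausible in-process shape is an `asyncio.PriorityQueue` keyed on the same ranks that `MessageRouter._priority_to_int` uses. An insertion counter keeps messages of equal priority first-in, first-out. `AgentMessageQueue` and `QueuedMessage` are hypothetical names. Across processes, the Celery priority queue already plays this role.

```python
import asyncio
import itertools
from dataclasses import dataclass, field

# Same ranks as MessageRouter._priority_to_int: a lower number is served first
PRIORITY_RANK = {"urgent": 0, "high": 3, "normal": 6, "low": 9}


@dataclass(order=True)
class QueuedMessage:
    rank: int
    seq: int  # insertion counter keeps equal-priority messages FIFO
    message_id: str = field(compare=False)


class AgentMessageQueue:
    """Hypothetical per-agent queue that queue_for_processing() could feed."""

    def __init__(self) -> None:
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self._counter = itertools.count()

    async def put(self, message_id: str, priority: str) -> None:
        # Unknown priorities fall back to "normal", mirroring _priority_to_int's default
        rank = PRIORITY_RANK.get(priority, PRIORITY_RANK["normal"])
        await self._queue.put(QueuedMessage(rank, next(self._counter), message_id))

    async def get(self) -> str:
        item = await self._queue.get()
        return item.message_id
```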

Communication Traceability

class CommunicationTracer:
    """Trace all inter-agent communication."""

    async def trace_message_flow(
        self,
        conversation_id: UUID
    ) -> MessageFlowTrace:
        """Generate a trace of message flow in a conversation."""
        messages = await self.get_conversation_messages(conversation_id)

        nodes = []
        edges = []

        for msg in messages:
            nodes.append({
                "id": str(msg.id),
                "agent": msg.sender_name,
                "role": msg.sender_role,
                "action": msg.action,
                "timestamp": msg.created_at.isoformat()
            })

            if msg.in_response_to_id:
                edges.append({
                    "from": str(msg.in_response_to_id),
                    "to": str(msg.id),
                    "type": "response"
                })

            if msg.parent_message_id:
                edges.append({
                    "from": str(msg.parent_message_id),
                    "to": str(msg.id),
                    "type": "thread"
                })

        return MessageFlowTrace(
            conversation_id=conversation_id,
            nodes=nodes,
            edges=edges,
            summary=await self.generate_summary(messages)
        )

Performance Considerations

Message Throughput

| Scenario | Target throughput | Implementation |
|---|---|---|
| Direct messages | 100/sec | Redis Pub/Sub |
| Broadcasts | 10/sec | Batched delivery |
| Task delegation | 50/sec | Celery queue |
| Message persistence | 200/sec | Batch inserts |

Scalability

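# Message batching for high-throughput scenarios
import asyncio

from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession


class MessageBatcher:
    def __init__(
        self,
        db: AsyncSession,
        batch_size: int = 100,
        flush_interval: float = 0.1,
    ):
        self.db = db
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._buffer: list[AgentMessage] = []
        self._lock = asyncio.Lock()

    async def add(self, message: AgentMessage):
        async with self._lock:
            self._buffer.append(message)

            # Size trigger: flush as soon as a full batch is buffered
            if len(self._buffer) >= self.batch_size:
                await self._flush()

    async def run_periodic_flush(self):
        """Time trigger: run as a background task so partial batches wait at most flush_interval."""
        while True:
            await asyncio.sleep(self.flush_interval)
            async with self._lock:
                await self._flush()

    async def _flush(self):
        # Caller must hold self._lock
        if not self._buffer:
            return

        messages = self._buffer
        self._buffer = []

        # Batch insert (SQLAlchemy 2.0 executemany-style ORM insert); _to_dict maps
        # an AgentMessage to column values, as in MessageRouter._persist_message
        await self.db.execute(
            insert(AgentMessageModel),
            [self._to_dict(m) for m in messages]
        )
        await self.db.commit()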
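A batcher configured with both `batch_size` and `flush_interval` has two flush triggers: a full batch, or elapsed time. The sketch below shows both triggers against a pluggable async `sink`, so the behaviour can be exercised without a database. The `sink` is a hypothetical stand-in for the batch `INSERT`, and `SizeOrTimeBatcher` is an illustrative name.

```python
import asyncio
from typing import Awaitable, Callable


class SizeOrTimeBatcher:
    """Flush when `batch_size` items accumulate or every `flush_interval` seconds."""

    def __init__(
        self,
        sink: Callable[[list], Awaitable[None]],
        batch_size: int = 100,
        flush_interval: float = 0.1,
    ) -> None:
        self.sink = sink
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._buffer: list = []
        self._lock = asyncio.Lock()

    async def add(self, item) -> None:
        async with self._lock:
            self._buffer.append(item)
            if len(self._buffer) >= self.batch_size:
                await self._flush_locked()  # size trigger

    async def flush(self) -> None:
        async with self._lock:
            await self._flush_locked()

    async def _flush_locked(self) -> None:
        if not self._buffer:
            return
        batch, self._buffer = self._buffer, []
        await self.sink(batch)

    async def run_timer(self) -> None:
        # Time trigger: bounds how long a partial batch can wait
        while True:
            await asyncio.sleep(self.flush_interval)
            await self.flush()
```

Take the defaults (100 messages, 0.1 s) and the 200/sec persistence target from the throughput table. About 20 messages arrive per 0.1 s window, so the timer, not the size limit, triggers most flushes. The size trigger only dominates above roughly 1,000 messages/sec.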

Memory Management

  • Conversation history limited to last 50 messages in-memory
  • Older messages retrieved on-demand from database
  • Redis TTL of 1 hour for active conversation cache
  • Periodic archival of resolved conversations
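The first and third bullets describe a bounded, expiring conversation cache. The sketch below models that policy in process. A `collections.deque` with `maxlen` keeps only the most recent messages, and each write pushes the expiry forward, roughly what re-issuing `EXPIRE` on a Redis key would do. `ConversationCache` and its injectable `clock` are hypothetical. In the deployed system, Redis would enforce the TTL, and an expired entry means reloading from PostgreSQL.

```python
import time
from collections import deque
from typing import Callable, Optional


class ConversationCache:
    """Keep the last `max_messages` per conversation, expiring `ttl_seconds` after the last write."""

    def __init__(
        self,
        max_messages: int = 50,
        ttl_seconds: float = 3600.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_messages = max_messages
        self.ttl_seconds = ttl_seconds
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._entries: dict[str, tuple[deque, float]] = {}

    def append(self, conversation_id: str, message: str) -> None:
        entry = self._entries.get(conversation_id)
        if entry is None or self.clock() >= entry[1]:
            history: deque = deque(maxlen=self.max_messages)
        else:
            history = entry[0]
        history.append(message)  # with maxlen set, the oldest message is dropped
        # Every write pushes the expiry forward, like re-issuing EXPIRE on a Redis key
        self._entries[conversation_id] = (history, self.clock() + self.ttl_seconds)

    def get(self, conversation_id: str) -> Optional[list[str]]:
        entry = self._entries.get(conversation_id)
        if entry is None:
            return None
        history, expires_at = entry
        if self.clock() >= expires_at:
            del self._entries[conversation_id]  # caller should reload from the database
            return None
        return list(history)
```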

References

Industry Protocols Researched

Multi-Agent System Research

Syndarix Architecture


Decision

Adopt a hybrid message-based communication protocol with:

  1. Structured JSON messages with natural language content
  2. Three routing strategies: Direct, Role-based, Broadcast
  3. Redis Pub/Sub for real-time delivery
  4. PostgreSQL for message persistence and audit
  5. Celery for async task delegation
  6. SSE for client notifications
  7. Three-tier context management for conversation continuity
  8. Hierarchical conflict resolution with human escalation

This protocol builds on infrastructure Syndarix already runs (Redis, Celery, PostgreSQL, and the SSE event system) while following best practices from the A2A and ACP protocols.


Spike completed. Findings will inform implementation of inter-agent communication in the Agent Orchestration layer.