# SPIKE-007: Agent Communication Protocol

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #7

---
## Executive Summary

This spike researches inter-agent communication protocols for Syndarix, where 10+ specialized AI agents need to collaborate on software projects. After analyzing industry standards (A2A, ACP, MCP) and multi-agent system patterns, we recommend a **structured message-based protocol** built on the existing Redis Pub/Sub event bus.

### Recommendation

Adopt a **hybrid communication model** combining:

1. **Structured JSON-RPC messages** for request-response patterns
2. **Redis Pub/Sub channels** for broadcasts and topic-based routing
3. **Database-backed message persistence** for auditability and context recovery
4. **Priority queues via Celery** for async task delegation

This approach aligns with Google's A2A protocol principles while leveraging Syndarix's existing infrastructure (Redis, Celery, SSE events).

---
## Research Questions

### 1. What communication patterns work for AI multi-agent systems?

Industry research identifies four primary patterns:

| Pattern | Use Case | Syndarix Application |
|---------|----------|---------------------|
| **Request-Response** | Direct task delegation | Engineer asks Architect for guidance |
| **Publish-Subscribe** | Broadcasts, notifications | PO announces sprint goals to all agents |
| **Task Queue** | Async work delegation | PO assigns issues to Engineers |
| **Streaming** | Long-running updates | Agent streaming progress to observers |

**Key Insight:** Syndarix needs all four patterns. A2A protocol supports request-response and streaming; Celery handles task queues; Redis Pub/Sub provides pub-sub.

### 2. Structured message formats vs natural language between agents?

**Recommendation: Structured messages with natural language payload.**

```python
# Structured envelope with natural language content
{
    "id": "msg-uuid-123",
    "type": "request",
    "from": {"agent_id": "eng-001", "role": "Engineer", "name": "Dave"},
    "to": {"agent_id": "arch-001", "role": "Architect", "name": "Alex"},
    "action": "request_guidance",
    "priority": "normal",
    "context": {
        "project_id": "proj-123",
        "issue_id": "issue-456",
        "conversation_id": "conv-789"
    },
    "content": "I'm implementing the auth module. Should I use JWT with refresh tokens or session-based auth?",
    "metadata": {
        "created_at": "2025-12-29T10:00:00Z",
        "expires_at": "2025-12-29T11:00:00Z",
        "requires_response": True
    }
}
```

**Rationale:**
- Structured envelope enables routing, filtering, and auditing
- Natural language content preserves LLM reasoning capabilities
- Matches A2A/ACP hybrid approach (structured headers, flexible payload)
- Enables agent-to-agent understanding without rigid schemas

### 3. How to handle async vs sync communication?

**Pattern Mapping:**

| Scenario | Timing | Implementation |
|----------|--------|----------------|
| Quick clarification | Sync-like (< 30s) | Request-response with timeout |
| Code review request | Async (minutes-hours) | Task queue + callback |
| Broadcast announcement | Fire-and-forget | Pub/Sub |
| Long-running analysis | Streaming | SSE with progress events |

**Implementation Strategy:**

```python
from enum import Enum


class MessageMode(str, Enum):
    SYNC = "sync"                        # Await response (with timeout)
    ASYNC = "async"                      # Queue for later, callback on completion
    FIRE_AND_FORGET = "fire_and_forget"  # No response expected
    STREAM = "stream"                    # Continuous updates
```

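
One way to read the pattern mapping is as a dispatch from `MessageMode` to a delivery call. The sketch below is illustrative only: `FakeRouter` is a hypothetical stand-in that records calls, and the mode-to-method mapping is an assumption inferred from the table, not a confirmed Syndarix API.

```python
import asyncio
from enum import Enum


class MessageMode(str, Enum):
    SYNC = "sync"
    ASYNC = "async"
    FIRE_AND_FORGET = "fire_and_forget"
    STREAM = "stream"


class FakeRouter:
    """Hypothetical stand-in that records which delivery path was used."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def send_and_wait(self, message: dict, timeout: float = 30) -> str:
        self.calls.append("send_and_wait")
        return "reply"

    async def delegate_task(self, message: dict) -> None:
        self.calls.append("delegate_task")

    async def broadcast(self, message: dict) -> None:
        self.calls.append("broadcast")

    async def stream(self, message: dict) -> None:
        self.calls.append("stream")


async def dispatch(router: FakeRouter, mode: MessageMode, message: dict):
    """Pick a delivery path for a message based on its mode (assumed mapping)."""
    if mode is MessageMode.SYNC:
        # Sync-like: block the caller, but never longer than the timeout
        return await router.send_and_wait(message, timeout=30)
    if mode is MessageMode.ASYNC:
        return await router.delegate_task(message)
    if mode is MessageMode.FIRE_AND_FORGET:
        return await router.broadcast(message)
    return await router.stream(message)
```

Keeping the choice in one function means callers state intent (the mode) and never pick a transport directly.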
### 4. Message routing strategies?

**Recommended: Hierarchical routing with four addressing strategies** (the diagram shows the three core ones; the table adds topic-based routing).

```
┌─────────────────────────────────────────────────────────────────┐
│                         Message Router                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             │
│   │ DIRECT      │  │ ROLE-BASED  │  │ BROADCAST   │             │
│   │ Routing     │  │ Routing     │  │ Routing     │             │
│   │             │  │             │  │             │             │
│   │ to: agent   │  │ to: @role   │  │ to: @all    │             │
│   │ "arch-001"  │  │ "@engineers"│  │ "@project"  │             │
│   └─────────────┘  └─────────────┘  └─────────────┘             │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```

| Strategy | Syntax | Use Case |
|----------|--------|----------|
| **Direct** | `to: "agent-123"` | Specific agent communication |
| **Role-based** | `to: "@engineers"` | All agents of a role |
| **Broadcast** | `to: "@all"` | Project-wide announcements |
| **Topic-based** | `to: "#auth-module"` | Agents subscribed to topic |

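
The address syntax in the table can be classified with a few prefix checks. This is a minimal sketch, not the project's router: `Route`, `RouteKind`, and `parse_recipient` are hypothetical names, and treating `@project` as a broadcast alias alongside `@all` is an assumption taken from the diagram.

```python
from dataclasses import dataclass
from enum import Enum


class RouteKind(str, Enum):
    DIRECT = "direct"
    ROLE = "role"
    BROADCAST = "broadcast"
    TOPIC = "topic"


@dataclass(frozen=True)
class Route:
    kind: RouteKind
    target: str  # agent ID, role name, or topic; empty for broadcasts


# Assumption: both spellings seen in the diagram mean "everyone in the project"
BROADCAST_ALIASES = {"@all", "@project"}


def parse_recipient(to: str) -> Route:
    """Classify a `to` address using the prefixes from the routing table."""
    to = to.strip()
    if not to:
        raise ValueError("recipient address must not be empty")
    if to in BROADCAST_ALIASES:
        return Route(RouteKind.BROADCAST, "")
    if to.startswith("@"):
        return Route(RouteKind.ROLE, to[1:])
    if to.startswith("#"):
        return Route(RouteKind.TOPIC, to[1:])
    # Anything without a prefix is treated as a concrete agent ID
    return Route(RouteKind.DIRECT, to)
```

The broadcast check has to run before the generic `@` check, otherwise `@all` would be parsed as a role named `all`.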
### 5. How to maintain conversation context across agent interactions?

**Three-tier context management:**

```
┌─────────────────────────────────────────────────────────────────┐
│                        Context Hierarchy                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ CONVERSATION CONTEXT (Short-term)                         │  │
│  │ - Current message thread                                  │  │
│  │ - Last N exchanges between agents                         │  │
│  │ - Active topic/issue context                              │  │
│  │ Storage: In-memory + Redis (TTL: 1 hour)                  │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ SESSION CONTEXT (Medium-term)                             │  │
│  │ - Sprint goals and decisions                              │  │
│  │ - Shared agreements between agents                        │  │
│  │ - Recent artifacts and references                         │  │
│  │ Storage: PostgreSQL AgentMessage table                    │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ PROJECT CONTEXT (Long-term)                               │  │
│  │ - Architecture decisions (ADRs)                           │  │
│  │ - Requirements and constraints                            │  │
│  │ - Knowledge base documents                                │  │
│  │ Storage: PostgreSQL + pgvector (RAG)                      │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```

**Context injection pattern:**

```python
async def prepare_agent_context(
    agent_id: str,
    conversation_id: str,
    project_id: str
) -> list[dict]:
    """Build context for LLM call."""
    context = []

    # 1. Conversation context (recent exchanges)
    recent_messages = await get_conversation_history(
        conversation_id, limit=10
    )
    context.extend(format_as_messages(recent_messages))

    # 2. Session context (relevant decisions)
    session_context = await get_session_context(agent_id, project_id)
    if session_context:
        context.append({
            "role": "system",
            "content": f"Session context:\n{session_context}"
        })

    # 3. Project context (RAG retrieval)
    query = extract_query_from_conversation(recent_messages)
    rag_results = await search_knowledge_base(project_id, query)
    if rag_results:
        context.append({
            "role": "system",
            "content": f"Relevant project context:\n{rag_results}"
        })

    return context
```

### 6. Conflict resolution when agents disagree?

**Hierarchical escalation model:**

```
┌─────────────────────────────────────────────────────────────────┐
│                  Conflict Resolution Hierarchy                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Level 1: Direct Negotiation                                    │
│  ├── Agents exchange arguments (max 3 rounds)                   │
│  ├── If consensus → Resolution recorded                         │
│  └── If no consensus → Escalate to Level 2                      │
│                                                                 │
│  Level 2: Expert Arbitration                                    │
│  ├── Relevant expert agent weighs in (Architect for tech)       │
│  ├── Expert makes binding recommendation                        │
│  └── If expert unavailable → Escalate to Level 3                │
│                                                                 │
│  Level 3: Human Decision                                        │
│  ├── Conflict summarized and sent to human client               │
│  ├── Approval event triggers resolution                         │
│  └── Decision recorded for future reference                     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```

**Implementation:**

```python
from enum import Enum

# Conflict and ConflictStatus are the ORM model and its status enum,
# defined elsewhere in the application.


class ConflictResolution(str, Enum):
    NEGOTIATION = "negotiation"
    EXPERT_ARBITRATION = "expert_arbitration"
    HUMAN_DECISION = "human_decision"


class ConflictTracker:
    async def record_conflict(
        self,
        participants: list[str],
        topic: str,
        positions: dict[str, str]
    ) -> Conflict:
        """Record a conflict between agents."""
        return await self.db.create(Conflict(
            participants=participants,
            topic=topic,
            positions=positions,
            status=ConflictStatus.OPEN,
            resolution_level=ConflictResolution.NEGOTIATION,
            negotiation_rounds=0
        ))

    async def escalate(self, conflict_id: str) -> Conflict:
        """Escalate conflict to next resolution level."""
        conflict = await self.get(conflict_id)

        if conflict.resolution_level == ConflictResolution.NEGOTIATION:
            # Find relevant expert
            expert = await self.find_expert_for_topic(conflict.topic)
            if expert is not None:
                conflict.arbitrator_id = expert.id
                conflict.resolution_level = ConflictResolution.EXPERT_ARBITRATION
            else:
                # Expert unavailable: skip Level 2 and go straight to a human
                await self.create_approval_request(conflict)
                conflict.resolution_level = ConflictResolution.HUMAN_DECISION

        elif conflict.resolution_level == ConflictResolution.EXPERT_ARBITRATION:
            # Create human approval request
            await self.create_approval_request(conflict)
            conflict.resolution_level = ConflictResolution.HUMAN_DECISION

        return await self.db.update(conflict)
```

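
The escalation rules in the hierarchy can also be written as a pure function, which makes them easy to unit-test without a database. This is a sketch under assumptions: `next_level` and `MAX_NEGOTIATION_ROUNDS` are hypothetical names, and the function only decides the next level for a conflict that is still unresolved.

```python
from enum import Enum

MAX_NEGOTIATION_ROUNDS = 3  # "max 3 rounds" from the hierarchy diagram


class ConflictResolution(str, Enum):
    NEGOTIATION = "negotiation"
    EXPERT_ARBITRATION = "expert_arbitration"
    HUMAN_DECISION = "human_decision"


def next_level(
    level: ConflictResolution,
    negotiation_rounds: int,
    expert_available: bool,
) -> ConflictResolution:
    """Return the level an unresolved conflict should be handled at next."""
    if level is ConflictResolution.NEGOTIATION:
        if negotiation_rounds < MAX_NEGOTIATION_ROUNDS:
            return ConflictResolution.NEGOTIATION  # keep negotiating
        if expert_available:
            return ConflictResolution.EXPERT_ARBITRATION
        return ConflictResolution.HUMAN_DECISION  # no expert: skip Level 2
    # Failed arbitration, and anything already at Level 3, goes to a human
    return ConflictResolution.HUMAN_DECISION
```

For example, `next_level(ConflictResolution.NEGOTIATION, 3, expert_available=False)` goes directly to `HUMAN_DECISION`, matching the "expert unavailable" branch of Level 2.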
### 7. How to audit/log inter-agent communication?

**Comprehensive audit trail:**

```python
class MessageAuditLog:
    """All agent messages are persisted for audit."""

    id = Column(UUID, primary_key=True)
    message_id = Column(UUID, index=True)

    # Routing info
    from_agent_id = Column(UUID, ForeignKey("agent_instances.id"))
    to_agent_id = Column(UUID, nullable=True)  # NULL for broadcasts
    to_role = Column(String, nullable=True)

    # Message details
    message_type = Column(Enum(MessageType))
    action = Column(String(50))
    content_hash = Column(String(64))  # SHA-256 of content
    content = Column(Text)  # Full content (encrypted at rest)

    # Context
    project_id = Column(UUID, ForeignKey("projects.id"), index=True)
    conversation_id = Column(UUID, index=True)
    parent_message_id = Column(UUID, nullable=True)

    # Response tracking
    response_to_id = Column(UUID, nullable=True)
    response_received_at = Column(DateTime, nullable=True)

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    delivered_at = Column(DateTime, nullable=True)
    read_at = Column(DateTime, nullable=True)
```
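
The `content_hash` column lets an auditor detect whether stored content was altered after the fact. A minimal sketch of computing and checking it follows; `content_hash()` and `verify_content()` are hypothetical helper names, and UTF-8 is assumed as the encoding.

```python
import hashlib
import hmac


def content_hash(content: str) -> str:
    """Return the SHA-256 hex digest (64 characters) of a message body."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def verify_content(content: str, stored_hash: str) -> bool:
    """Check stored content against its recorded hash in constant time."""
    return hmac.compare_digest(content_hash(content), stored_hash)
```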

---

## Message Format Specification

### Core Message Schema

```python
from pydantic import BaseModel, Field
from typing import Optional, Literal
from datetime import datetime
from uuid import UUID, uuid4


class AgentIdentifier(BaseModel):
    """Identifies an agent in communication."""
    agent_id: UUID
    role: str
    name: str


class MessageContext(BaseModel):
    """Contextual information for message routing."""
    project_id: UUID
    conversation_id: UUID
    issue_id: Optional[UUID] = None
    sprint_id: Optional[UUID] = None
    parent_message_id: Optional[UUID] = None


class MessageMetadata(BaseModel):
    """Message metadata for processing."""
    created_at: datetime = Field(default_factory=datetime.utcnow)
    expires_at: Optional[datetime] = None
    priority: Literal["urgent", "high", "normal", "low"] = "normal"
    requires_response: bool = False
    response_timeout_seconds: Optional[int] = None
    retry_count: int = 0
    max_retries: int = 3


# Defined before AgentMessage so the `attachments` annotation resolves
class Attachment(BaseModel):
    """Attachments to messages."""
    type: Literal["code", "file", "image", "document", "reference"]
    name: str
    content: Optional[str] = None  # For inline content
    url: Optional[str] = None  # For file references
    mime_type: Optional[str] = None


class AgentMessage(BaseModel):
    """Primary message format for inter-agent communication."""

    # Identity
    id: UUID = Field(default_factory=uuid4)
    type: Literal["request", "response", "notification", "broadcast", "stream"]

    # Routing
    sender: AgentIdentifier
    recipient: Optional[AgentIdentifier] = None  # None for broadcasts
    recipient_role: Optional[str] = None  # For role-based routing
    recipient_all: bool = False  # For project-wide broadcast

    # Action
    action: str  # e.g., "request_review", "report_bug", "share_finding"

    # Content
    content: str  # Natural language message
    attachments: list[Attachment] = Field(default_factory=list)  # Files, code snippets, etc.

    # Context
    context: MessageContext

    # Metadata
    metadata: MessageMetadata = Field(default_factory=MessageMetadata)

    # Response linking
    in_response_to: Optional[UUID] = None
```

### Action Types

```python
from enum import Enum


class MessageAction(str, Enum):
    # Request actions
    REQUEST_REVIEW = "request_review"
    REQUEST_GUIDANCE = "request_guidance"
    REQUEST_CLARIFICATION = "request_clarification"
    REQUEST_APPROVAL = "request_approval"
    DELEGATE_TASK = "delegate_task"

    # Response actions
    PROVIDE_REVIEW = "provide_review"
    PROVIDE_GUIDANCE = "provide_guidance"
    PROVIDE_CLARIFICATION = "provide_clarification"
    APPROVE = "approve"
    REJECT = "reject"

    # Notification actions
    REPORT_BUG = "report_bug"
    REPORT_PROGRESS = "report_progress"
    REPORT_BLOCKER = "report_blocker"
    SHARE_FINDING = "share_finding"
    ANNOUNCE_DECISION = "announce_decision"

    # Collaboration actions
    PROPOSE_APPROACH = "propose_approach"
    CHALLENGE_APPROACH = "challenge_approach"
    AGREE_WITH = "agree_with"
    DISAGREE_WITH = "disagree_with"
```
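
Because request and response actions come in pairs, a receiver could check that a reply's action fits the request it answers. The sketch below assumes a pairing the document does not spell out: the `EXPECTED_REPLIES` table and `is_valid_reply()` are hypothetical, and only a subset of the actions is repeated so the example runs on its own.

```python
from enum import Enum


class MessageAction(str, Enum):
    # Subset of the action enum, repeated to keep the sketch self-contained
    REQUEST_REVIEW = "request_review"
    REQUEST_GUIDANCE = "request_guidance"
    REQUEST_CLARIFICATION = "request_clarification"
    REQUEST_APPROVAL = "request_approval"
    PROVIDE_REVIEW = "provide_review"
    PROVIDE_GUIDANCE = "provide_guidance"
    PROVIDE_CLARIFICATION = "provide_clarification"
    APPROVE = "approve"
    REJECT = "reject"


A = MessageAction

# Assumed pairing of request actions to acceptable reply actions
EXPECTED_REPLIES: dict[MessageAction, frozenset[MessageAction]] = {
    A.REQUEST_REVIEW: frozenset({A.PROVIDE_REVIEW}),
    A.REQUEST_GUIDANCE: frozenset({A.PROVIDE_GUIDANCE}),
    A.REQUEST_CLARIFICATION: frozenset({A.PROVIDE_CLARIFICATION}),
    A.REQUEST_APPROVAL: frozenset({A.APPROVE, A.REJECT}),
}


def is_valid_reply(request_action: str, reply_action: str) -> bool:
    """Check a reply action against the pairing table.

    Raises ValueError if either string is not a known action.
    """
    request = MessageAction(request_action)
    reply = MessageAction(reply_action)
    # Actions that are not requests have no acceptable replies
    return reply in EXPECTED_REPLIES.get(request, frozenset())
```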

---

## Communication Patterns

### Pattern 1: Request-Response

**Use Case:** Engineer asks Architect for design guidance.

```python
# Engineer sends request
request = AgentMessage(
    type="request",
    sender=AgentIdentifier(agent_id=eng_id, role="Engineer", name="Dave"),
    recipient=AgentIdentifier(agent_id=arch_id, role="Architect", name="Alex"),
    action="request_guidance",
    content="I need to implement caching for the user service. Should I use Redis with write-through or write-behind strategy?",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        issue_id=issue_id
    ),
    metadata=MessageMetadata(
        priority="normal",
        requires_response=True,
        response_timeout_seconds=300
    )
)

# Send and await response (returns None if the timeout elapses)
response = await message_router.send_and_wait(request, timeout=300)

# On the Architect's side: the reply that answers the request
reply = AgentMessage(
    type="response",
    sender=AgentIdentifier(agent_id=arch_id, role="Architect", name="Alex"),
    recipient=request.sender,
    action="provide_guidance",
    content="Given our consistency requirements, use write-through caching with Redis. Here's the pattern...",
    in_response_to=request.id,
    context=request.context,
    attachments=[
        Attachment(type="code", name="cache_pattern.py", content="...")
    ]
)
```

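
Request-response over an asynchronous transport comes down to correlating a reply with the request it answers. The sketch below shows that correlation in memory with `asyncio` futures keyed by request ID; `PendingReplies` is a hypothetical helper, not part of the Syndarix code, and replies are plain strings to keep it short.

```python
import asyncio
from typing import Optional
from uuid import UUID, uuid4


class PendingReplies:
    """Tracks in-flight requests so replies can be matched by request ID."""

    def __init__(self) -> None:
        self._waiting: dict[UUID, asyncio.Future] = {}

    def expect(self, request_id: UUID) -> asyncio.Future:
        """Register interest in a reply before the request is sent."""
        future = asyncio.get_running_loop().create_future()
        self._waiting[request_id] = future
        return future

    def resolve(self, in_response_to: UUID, reply: str) -> bool:
        """Hand a reply to its waiter; returns False if nobody is waiting."""
        future = self._waiting.pop(in_response_to, None)
        if future is None or future.done():
            return False
        future.set_result(reply)
        return True

    async def wait(
        self, request_id: UUID, future: asyncio.Future, timeout: float
    ) -> Optional[str]:
        """Await the reply; return None if the timeout elapses first."""
        try:
            return await asyncio.wait_for(future, timeout)
        except asyncio.TimeoutError:
            return None
        finally:
            self._waiting.pop(request_id, None)


async def demo() -> tuple:
    pending = PendingReplies()

    # A request that is answered shortly after it goes out
    request_id = uuid4()
    future = pending.expect(request_id)
    asyncio.get_running_loop().call_later(
        0.01, pending.resolve, request_id, "use write-through"
    )
    answered = await pending.wait(request_id, future, timeout=1.0)

    # A request that nobody answers in time
    other_id = uuid4()
    timed_out = await pending.wait(other_id, pending.expect(other_id), timeout=0.05)
    return answered, timed_out
```

Registering the waiter before sending avoids losing a reply that arrives very quickly; the same ordering concern applies when the waiter is a Pub/Sub subscription.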
### Pattern 2: Broadcast

**Use Case:** Product Owner announces sprint goals.

```python
broadcast = AgentMessage(
    type="broadcast",
    sender=AgentIdentifier(agent_id=po_id, role="ProductOwner", name="Sarah"),
    recipient_all=True,
    action="announce_decision",
    content="Sprint 3 goals: 1) Complete auth module 2) Add user settings page 3) Fix critical bugs from QA",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        sprint_id=sprint_id
    ),
    metadata=MessageMetadata(
        priority="high",
        requires_response=False
    )
)

await message_router.broadcast(broadcast)
```

### Pattern 3: Role-Based Routing

**Use Case:** QA reports a bug to all Engineers.

```python
bug_report = AgentMessage(
    type="notification",
    sender=AgentIdentifier(agent_id=qa_id, role="QA", name="Quinn"),
    recipient_role="Engineer",  # All engineers receive this
    action="report_bug",
    content="Found a critical bug in the auth flow: users can bypass 2FA by...",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        issue_id=bug_issue_id
    ),
    metadata=MessageMetadata(
        priority="urgent",
        requires_response=True
    )
)

await message_router.send_to_role(bug_report)
```

### Pattern 4: Task Delegation

**Use Case:** Product Owner assigns work to Engineer.

```python
delegation = AgentMessage(
    type="request",
    sender=AgentIdentifier(agent_id=po_id, role="ProductOwner", name="Sarah"),
    recipient=AgentIdentifier(agent_id=eng_id, role="Engineer", name="Dave"),
    action="delegate_task",
    content="Please implement issue #45: Add password reset functionality",
    context=MessageContext(
        project_id=project_id,
        conversation_id=conv_id,
        issue_id=issue_45_id
    ),
    metadata=MessageMetadata(
        priority="normal",
        requires_response=True
    )
)

# This creates a Celery task for async execution
await message_router.delegate_task(delegation)
```

### Pattern 5: Streaming Updates

**Use Case:** Engineer shares progress during long task.

```python
async def stream_progress(agent_id: str, task_id: str):
    """Stream progress updates during task execution."""

    for step in task_steps:
        # Execute step
        result = await execute_step(step)

        # Stream update
        update = AgentMessage(
            type="stream",
            sender=AgentIdentifier(agent_id=agent_id, ...),
            recipient_all=True,
            action="report_progress",
            content=f"Completed: {step.name}. Progress: {step.percent}%",
            context=MessageContext(project_id=project_id, ...),
        )

        await message_router.stream(update)
```

---

## Database Schema

### Message Storage

```python
# app/models/agent_message.py
from datetime import datetime
from uuid import uuid4

from sqlalchemy import (
    Boolean, Column, DateTime, Enum, ForeignKey, Index, Integer, String, Text
)
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.db.base import Base


class AgentMessage(Base):
    """Persistent storage for all agent messages."""

    __tablename__ = "agent_messages"

    # Primary key
    id = Column(UUID, primary_key=True, default=uuid4)

    # Message type
    type = Column(
        Enum("request", "response", "notification", "broadcast", "stream",
             name="message_type"),
        nullable=False
    )

    # Sender
    sender_agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=False)
    sender_role = Column(String(50), nullable=False)
    sender_name = Column(String(100), nullable=False)

    # Recipient (nullable for broadcasts)
    recipient_agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=True)
    recipient_role = Column(String(50), nullable=True)  # For role-based routing
    is_broadcast = Column(Boolean, default=False)

    # Action
    action = Column(String(50), nullable=False)

    # Content
    content = Column(Text, nullable=False)
    content_hash = Column(String(64), nullable=False)  # SHA-256
    attachments = Column(JSONB, default=list)

    # Context
    project_id = Column(UUID, ForeignKey("projects.id"), nullable=False)
    conversation_id = Column(UUID, nullable=False)
    issue_id = Column(UUID, ForeignKey("issues.id"), nullable=True)
    sprint_id = Column(UUID, ForeignKey("sprints.id"), nullable=True)
    parent_message_id = Column(UUID, ForeignKey("agent_messages.id"), nullable=True)

    # Response linking
    in_response_to_id = Column(UUID, ForeignKey("agent_messages.id"), nullable=True)

    # Metadata
    priority = Column(
        Enum("urgent", "high", "normal", "low", name="priority_level"),
        default="normal"
    )
    requires_response = Column(Boolean, default=False)
    response_timeout_seconds = Column(Integer, nullable=True)

    # Status tracking
    status = Column(
        Enum("pending", "delivered", "read", "responded", "expired", "failed",
             name="message_status"),
        default="pending"
    )

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    delivered_at = Column(DateTime, nullable=True)
    read_at = Column(DateTime, nullable=True)
    responded_at = Column(DateTime, nullable=True)
    expires_at = Column(DateTime, nullable=True)

    # Indexes
    __table_args__ = (
        Index("ix_agent_messages_project_conversation", "project_id", "conversation_id"),
        Index("ix_agent_messages_sender", "sender_agent_id", "created_at"),
        Index("ix_agent_messages_recipient", "recipient_agent_id", "status"),
        Index("ix_agent_messages_conversation", "conversation_id", "created_at"),
    )


class Conversation(Base):
    """Groups related messages into conversations."""

    __tablename__ = "conversations"

    id = Column(UUID, primary_key=True, default=uuid4)

    # Context
    project_id = Column(UUID, ForeignKey("projects.id"), nullable=False)
    issue_id = Column(UUID, ForeignKey("issues.id"), nullable=True)
    sprint_id = Column(UUID, ForeignKey("sprints.id"), nullable=True)

    # Participants
    participant_ids = Column(JSONB, default=list)  # List of agent IDs

    # Topic
    topic = Column(String(200), nullable=True)

    # Status
    status = Column(
        Enum("active", "resolved", "archived", name="conversation_status"),
        default="active"
    )

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, onupdate=datetime.utcnow)
    resolved_at = Column(DateTime, nullable=True)

    # Summary (generated)
    summary = Column(Text, nullable=True)


class MessageDelivery(Base):
    """Tracks delivery status for broadcast messages."""

    __tablename__ = "message_deliveries"

    id = Column(UUID, primary_key=True, default=uuid4)
    message_id = Column(UUID, ForeignKey("agent_messages.id"), nullable=False)
    recipient_agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=False)

    # Status
    delivered_at = Column(DateTime, nullable=True)
    read_at = Column(DateTime, nullable=True)
    responded_at = Column(DateTime, nullable=True)
```

### Alembic Migration

```python
# migrations/versions/xxx_add_agent_messages.py
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import ENUM, JSONB, UUID


def upgrade():
    # Create enum types
    op.execute("""
        CREATE TYPE message_type AS ENUM (
            'request', 'response', 'notification', 'broadcast', 'stream'
        )
    """)
    op.execute("""
        CREATE TYPE priority_level AS ENUM ('urgent', 'high', 'normal', 'low')
    """)
    op.execute("""
        CREATE TYPE message_status AS ENUM (
            'pending', 'delivered', 'read', 'responded', 'expired', 'failed'
        )
    """)
    op.execute("""
        CREATE TYPE conversation_status AS ENUM ('active', 'resolved', 'archived')
    """)

    # Create tables
    op.create_table(
        'conversations',
        sa.Column('id', UUID, primary_key=True),
        sa.Column('project_id', UUID, sa.ForeignKey('projects.id'), nullable=False),
        sa.Column('issue_id', UUID, sa.ForeignKey('issues.id'), nullable=True),
        sa.Column('sprint_id', UUID, sa.ForeignKey('sprints.id'), nullable=True),
        # Python-side defaults are ignored in DDL, so use server defaults here
        sa.Column('participant_ids', JSONB, server_default=sa.text("'[]'::jsonb")),
        sa.Column('topic', sa.String(200), nullable=True),
        # create_type=False: the enum type was already created above
        sa.Column('status', ENUM('active', 'resolved', 'archived',
                                 name='conversation_status', create_type=False),
                  server_default='active'),
        sa.Column('created_at', sa.DateTime, server_default=sa.func.now()),
        # onupdate is applied by the ORM model, not by the schema
        sa.Column('updated_at', sa.DateTime, nullable=True),
        sa.Column('resolved_at', sa.DateTime, nullable=True),
        sa.Column('summary', sa.Text, nullable=True),
    )

    op.create_table(
        'agent_messages',
        # ... columns as defined above
    )

    op.create_table(
        'message_deliveries',
        # ... columns as defined above
    )
```
---
|
|
|
|
## Code Examples
|
|
|
|
### Message Router Service
|
|
|
|
```python
|
|
# app/services/message_router.py
|
|
from typing import Optional, AsyncIterator
|
|
from uuid import UUID
|
|
import asyncio
|
|
import hashlib
|
|
import json
|
|
|
|
from app.models.agent_message import AgentMessage as AgentMessageModel
|
|
from app.schemas.messages import AgentMessage, AgentIdentifier
|
|
from app.services.events import EventBus
|
|
from app.db.session import AsyncSession
|
|
from app.core.celery_app import celery_app
|
|
|
|
class MessageRouter:
|
|
"""Routes messages between agents."""
|
|
|
|
def __init__(
|
|
self,
|
|
db: AsyncSession,
|
|
event_bus: EventBus,
|
|
orchestrator: "AgentOrchestrator"
|
|
):
|
|
self.db = db
|
|
self.event_bus = event_bus
|
|
self.orchestrator = orchestrator
|
|
|
|
async def send(self, message: AgentMessage) -> AgentMessageModel:
|
|
"""Send a message to a specific agent."""
|
|
# Validate recipient exists and is active
|
|
recipient = await self.orchestrator.get_instance(message.recipient.agent_id)
|
|
if not recipient or recipient.status != "active":
|
|
raise AgentNotAvailableError(f"Agent {message.recipient.agent_id} not available")
|
|
|
|
# Persist message
|
|
db_message = await self._persist_message(message)
|
|
|
|
# Publish to recipient's channel
|
|
await self.event_bus.publish(
|
|
f"agent:{message.recipient.agent_id}",
|
|
{
|
|
"type": "message_received",
|
|
"message_id": str(db_message.id),
|
|
"sender": message.sender.dict(),
|
|
"action": message.action,
|
|
"priority": message.metadata.priority,
|
|
"preview": message.content[:100]
|
|
}
|
|
)
|
|
|
|
# Also publish to project channel for monitoring
|
|
await self.event_bus.publish(
|
|
f"project:{message.context.project_id}",
|
|
{
|
|
"type": "agent_message",
|
|
"from": message.sender.name,
|
|
"to": message.recipient.name,
|
|
"action": message.action
|
|
}
|
|
)
|
|
|
|
return db_message
|
|
|
|
async def send_and_wait(
|
|
self,
|
|
message: AgentMessage,
|
|
timeout: int = 300
|
|
) -> Optional[AgentMessage]:
|
|
"""Send a message and wait for response."""
|
|
db_message = await self.send(message)
|
|
|
|
# Subscribe to response channel
|
|
response_channel = f"response:{db_message.id}"
|
|
subscriber = await self.event_bus.subscribe(response_channel)
|
|
|
|
try:
|
|
response_event = await asyncio.wait_for(
|
|
subscriber.get_event(),
|
|
timeout=timeout
|
|
)
|
|
response_id = response_event.data["response_id"]
|
|
return await self.get_message(response_id)
|
|
except asyncio.TimeoutError:
|
|
# Update message status to expired
|
|
await self._mark_expired(db_message.id)
|
|
return None
|
|
finally:
|
|
await subscriber.unsubscribe()
|
|
|
|
async def send_to_role(self, message: AgentMessage) -> list[AgentMessageModel]:
|
|
"""Send message to all agents of a specific role."""
|
|
# Get all active agents with the target role
|
|
recipients = await self.orchestrator.get_instances_by_role(
|
|
project_id=message.context.project_id,
|
|
role=message.recipient_role
|
|
)
|
|
|
|
messages = []
|
|
for recipient in recipients:
|
|
msg = message.copy()
|
|
msg.recipient = AgentIdentifier(
|
|
agent_id=recipient.id,
|
|
role=recipient.agent_type.role,
|
|
name=recipient.name
|
|
)
|
|
db_message = await self.send(msg)
|
|
messages.append(db_message)
|
|
|
|
return messages
|
|
|
|
async def broadcast(self, message: AgentMessage) -> AgentMessageModel:
|
|
"""Broadcast message to all project agents."""
|
|
# Persist the broadcast message
|
|
db_message = await self._persist_message(message)
|
|
|
|
# Get all active agents in project
|
|
agents = await self.orchestrator.get_project_agents(
|
|
message.context.project_id
|
|
)
|
|
|
|
# Create delivery records
|
|
for agent in agents:
|
|
if agent.id != message.sender.agent_id:
|
|
await self._create_delivery_record(db_message.id, agent.id)
|
|
|
|
# Publish to project channel
|
|
await self.event_bus.publish(
|
|
f"project:{message.context.project_id}",
|
|
{
|
|
"type": "broadcast",
|
|
"message_id": str(db_message.id),
|
|
"sender": message.sender.dict(),
|
|
"action": message.action,
|
|
"content": message.content
|
|
}
|
|
)
|
|
|
|
return db_message
|
|
|
|
async def delegate_task(self, message: AgentMessage) -> str:
|
|
"""Delegate a task for async execution via Celery."""
|
|
# Persist message
|
|
db_message = await self._persist_message(message)
|
|
|
|
# Create Celery task
|
|
task = celery_app.send_task(
|
|
"app.tasks.agent.execute_delegated_task",
|
|
            args=[str(db_message.id), str(message.recipient.agent_id)],
            queue="agent_tasks",
            priority=self._priority_to_int(message.metadata.priority)
        )

        return task.id

    async def respond(
        self,
        original_message_id: UUID,
        response: AgentMessage
    ) -> AgentMessageModel:
        """Send a response to a previous message."""
        original = await self.get_message(original_message_id)

        # Link the response to the original message and inherit its context
        response.in_response_to = original_message_id
        response.context = original.context

        db_response = await self.send(response)

        # Update original message status
        await self._mark_responded(original_message_id)

        # Publish response notification
        await self.event_bus.publish(
            f"response:{original_message_id}",
            {
                "type": "response_received",
                "response_id": str(db_response.id)
            }
        )

        return db_response

    async def get_conversation_history(
        self,
        conversation_id: UUID,
        limit: int = 50,
        before: Optional[datetime] = None
    ) -> list[AgentMessageModel]:
        """Get the most recent messages in a conversation, oldest first."""
        query = select(AgentMessageModel).where(
            AgentMessageModel.conversation_id == conversation_id
        )

        if before:
            query = query.where(AgentMessageModel.created_at < before)

        # Fetch the newest `limit` messages, then reverse into chronological order
        query = query.order_by(AgentMessageModel.created_at.desc()).limit(limit)

        result = await self.db.execute(query)
        return list(reversed(result.scalars().all()))

    async def _persist_message(self, message: AgentMessage) -> AgentMessageModel:
        """Persist message to database."""
        content_hash = hashlib.sha256(message.content.encode()).hexdigest()

        db_message = AgentMessageModel(
            id=message.id,
            type=message.type,
            sender_agent_id=message.sender.agent_id,
            sender_role=message.sender.role,
            sender_name=message.sender.name,
            recipient_agent_id=message.recipient.agent_id if message.recipient else None,
            recipient_role=message.recipient_role,
            is_broadcast=message.recipient_all,
            action=message.action,
            content=message.content,
            content_hash=content_hash,
            attachments=[a.dict() for a in message.attachments],
            project_id=message.context.project_id,
            conversation_id=message.context.conversation_id,
            issue_id=message.context.issue_id,
            sprint_id=message.context.sprint_id,
            parent_message_id=message.context.parent_message_id,
            in_response_to_id=message.in_response_to,
            priority=message.metadata.priority,
            requires_response=message.metadata.requires_response,
            response_timeout_seconds=message.metadata.response_timeout_seconds,
            expires_at=message.metadata.expires_at,
        )

        self.db.add(db_message)
        await self.db.commit()
        await self.db.refresh(db_message)

        return db_message

    def _priority_to_int(self, priority: str) -> int:
        """Convert priority to a Celery priority (0 = highest on the Redis broker)."""
        return {"urgent": 0, "high": 3, "normal": 6, "low": 9}.get(priority, 6)
```
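
The `content_hash` column written by `_persist_message` can support later integrity checks. The following is a minimal sketch, assuming the stored value is always the SHA-256 hex digest of the UTF-8 encoded content as in the code above. The helper names `compute_content_hash` and `verify_content` are hypothetical and not part of the router.

```python
import hashlib
import hmac


def compute_content_hash(content: str) -> str:
    """Return the SHA-256 hex digest of the UTF-8 encoded content."""
    return hashlib.sha256(content.encode()).hexdigest()


def verify_content(content: str, stored_hash: str) -> bool:
    """Compare content against a previously stored hash in constant time."""
    return hmac.compare_digest(compute_content_hash(content), stored_hash)
```

An audit job could call `verify_content(row.content, row.content_hash)` for each persisted message and flag any mismatch.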

### Receiving and Processing Messages

```python
# app/services/agent_inbox.py
from typing import Optional, Callable, Awaitable
from uuid import UUID

from app.schemas.messages import AgentMessage
from app.services.message_router import MessageRouter


class AgentInbox:
    """Manages incoming messages for an agent."""

    def __init__(
        self,
        agent_id: UUID,
        event_bus: EventBus,
        message_router: MessageRouter
    ):
        self.agent_id = agent_id
        self.event_bus = event_bus
        self.message_router = message_router
        self._handlers: dict[str, Callable] = {}

    def on_action(
        self,
        action: str,
        handler: Callable[[AgentMessage], Awaitable[Optional[AgentMessage]]]
    ):
        """Register a handler for a specific action type."""
        self._handlers[action] = handler

    async def start_listening(self):
        """Start listening for incoming messages."""
        channel = f"agent:{self.agent_id}"
        subscriber = await self.event_bus.subscribe(channel)

        while True:
            event = await subscriber.get_event()

            if event.type == "message_received":
                message = await self.message_router.get_message(
                    event.data["message_id"]
                )
                await self._process_message(message)

    async def _process_message(self, message: AgentMessage):
        """Process an incoming message."""
        # Mark as read
        await self.message_router.mark_read(message.id)

        # Find handler, falling back to the "default" handler if registered
        handler = self._handlers.get(message.action)
        if not handler:
            handler = self._handlers.get("default")

        if handler:
            response = await handler(message)

            # requires_response lives on the message metadata
            if response and message.metadata.requires_response:
                await self.message_router.respond(message.id, response)


# Usage in agent runner
class AgentRunner:
    async def setup_message_handlers(self):
        """Setup handlers for different message types."""

        async def handle_review_request(message: AgentMessage) -> AgentMessage:
            # Execute code review
            review_result = await self.execute("review_code", {
                "content": message.content,
                "attachments": message.attachments
            })

            return AgentMessage(
                type="response",
                sender=self.identifier,
                recipient=message.sender,
                action="provide_review",
                content=review_result["content"],
                attachments=review_result.get("attachments", [])
            )

        async def handle_task_delegation(message: AgentMessage) -> AgentMessage:
            # Accept and start working on task
            await self.update_status("working")

            # Execute task
            result = await self.execute("implement", {
                "task_description": message.content,
                "issue_id": message.context.issue_id
            })

            return AgentMessage(
                type="response",
                sender=self.identifier,
                recipient=message.sender,
                action="task_completed",
                content=f"Task completed. {result['summary']}"
            )

        # on_action takes (action, handler) and returns None, so handlers are
        # registered with explicit calls rather than used as decorators
        self.inbox.on_action("request_review", handle_review_request)
        self.inbox.on_action("delegate_task", handle_task_delegation)
```
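
The dispatch rule used by `_process_message` (exact action match first, then the `"default"` handler, otherwise no response) can be exercised in isolation. This is a minimal sketch: `Msg` and `HandlerRegistry` are hypothetical stand-ins for `AgentMessage` and `AgentInbox`, with no event bus or database involved.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class Msg:
    """Hypothetical stand-in for AgentMessage."""
    action: str
    content: str


Handler = Callable[[Msg], Awaitable[Optional[str]]]


class HandlerRegistry:
    """Hypothetical stand-in for the handler table inside AgentInbox."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def on_action(self, action: str, handler: Handler) -> None:
        self._handlers[action] = handler

    async def dispatch(self, message: Msg) -> Optional[str]:
        # Exact match first, then the "default" handler, else no response
        handler = self._handlers.get(message.action) or self._handlers.get("default")
        if handler is None:
            return None
        return await handler(message)


async def review(message: Msg) -> str:
    return f"reviewed: {message.content}"


async def fallback(message: Msg) -> str:
    return f"unhandled action: {message.action}"


registry = HandlerRegistry()
registry.on_action("request_review", review)
registry.on_action("default", fallback)
```

Calling `asyncio.run(registry.dispatch(Msg("request_review", "diff")))` routes to `review`, while any unregistered action reaches `fallback`.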

### API Endpoints

```python
# app/api/v1/messages.py
from typing import Optional
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException

from app.schemas.messages import AgentMessage, MessageCreate, MessageResponse
from app.services.message_router import MessageRouter
from app.api.deps import get_message_router, get_current_user
from app.models.user import User  # assumed location of the User model

router = APIRouter(prefix="/messages", tags=["messages"])


# The injected MessageRouter is named `message_router` so that it does not
# shadow the module-level APIRouter instance `router`.

@router.post("/send", response_model=MessageResponse)
async def send_message(
    message_in: MessageCreate,
    message_router: MessageRouter = Depends(get_message_router),
    current_user: User = Depends(get_current_user)
):
    """Send a message between agents (admin/system use)."""
    message = AgentMessage(**message_in.dict())
    result = await message_router.send(message)
    return MessageResponse.from_orm(result)


@router.post("/broadcast", response_model=MessageResponse)
async def broadcast_message(
    message_in: MessageCreate,
    message_router: MessageRouter = Depends(get_message_router),
    current_user: User = Depends(get_current_user)
):
    """Broadcast a message to all project agents."""
    message = AgentMessage(**message_in.dict(), recipient_all=True)
    result = await message_router.broadcast(message)
    return MessageResponse.from_orm(result)


@router.get("/conversation/{conversation_id}", response_model=list[MessageResponse])
async def get_conversation(
    conversation_id: UUID,
    limit: int = 50,
    message_router: MessageRouter = Depends(get_message_router),
    current_user: User = Depends(get_current_user)
):
    """Get messages in a conversation."""
    messages = await message_router.get_conversation_history(conversation_id, limit=limit)
    return [MessageResponse.from_orm(m) for m in messages]


@router.get("/agent/{agent_id}/inbox", response_model=list[MessageResponse])
async def get_agent_inbox(
    agent_id: UUID,
    status: Optional[str] = None,
    limit: int = 50,
    message_router: MessageRouter = Depends(get_message_router),
    current_user: User = Depends(get_current_user)
):
    """Get messages in an agent's inbox."""
    messages = await message_router.get_agent_messages(agent_id, status=status, limit=limit)
    return [MessageResponse.from_orm(m) for m in messages]
```

---

## SSE Integration

### Real-time Message Events

Messages integrate with the existing SSE event system (ADR-002):

```python
# Extended event types for messages
class EventType(str, Enum):
    # Existing events...

    # Message Events
    MESSAGE_SENT = "message_sent"
    MESSAGE_RECEIVED = "message_received"
    MESSAGE_READ = "message_read"
    MESSAGE_RESPONDED = "message_responded"

    # Conversation Events
    CONVERSATION_STARTED = "conversation_started"
    CONVERSATION_RESOLVED = "conversation_resolved"

    # Conflict Events
    CONFLICT_DETECTED = "conflict_detected"
    CONFLICT_ESCALATED = "conflict_escalated"
    CONFLICT_RESOLVED = "conflict_resolved"
```

### Message Event Stream

```python
# app/api/v1/events.py (extended)
import asyncio
import json

from fastapi import Request
from fastapi.responses import StreamingResponse


@router.get("/projects/{project_id}/messages/events")
async def message_events(
    project_id: UUID,
    request: Request,
    current_user: User = Depends(get_current_user)
):
    """Stream message events for a project."""

    async def event_generator():
        subscriber = await event_bus.subscribe(f"project:{project_id}:messages")

        try:
            while not await request.is_disconnected():
                try:
                    event = await asyncio.wait_for(
                        subscriber.get_event(),
                        timeout=30.0
                    )

                    # Filter by event type
                    if event.type in [
                        "message_sent", "message_received",
                        "conversation_started", "conflict_detected"
                    ]:
                        yield f"event: {event.type}\ndata: {json.dumps(event.data)}\n\n"

                except asyncio.TimeoutError:
                    # No event within 30 seconds: send an SSE comment as keepalive
                    yield ": keepalive\n\n"
        finally:
            await subscriber.unsubscribe()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )
```
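
The frames yielded by `event_generator` follow the SSE wire format: an `event:` line, a `data:` line, and a blank line as terminator, with comment lines (starting with `:`) serving as keepalives. The sketch below isolates that formatting so it can be unit-tested without a running endpoint. The function names `format_sse` and `keepalive_frame` are hypothetical helpers, not part of the existing event system.

```python
import json
from typing import Any


def format_sse(event_type: str, data: Any) -> str:
    """Serialize one event as an SSE frame.

    json.dumps (without indent) never emits raw newlines, so the payload
    always fits on a single `data:` line.
    """
    return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"


def keepalive_frame() -> str:
    """Return an SSE comment frame; clients ignore it but the connection stays open."""
    return ": keepalive\n\n"


frame = format_sse("message_sent", {"message_id": "42"})
```

The browser's `EventSource` dispatches such a frame to the listener registered for `message_sent` and exposes the JSON text as `e.data`.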

### Frontend Integration

```typescript
// frontend/lib/messageEvents.ts
import { useEffect } from 'react';

interface MessageEvent {
  type: 'message_sent' | 'message_received' | 'conversation_started';
  data: {
    message_id?: string;
    from?: AgentIdentifier;
    to?: AgentIdentifier;
    action?: string;
    preview?: string;
  };
}

// Pass a stable onMessage (for example, wrapped in useCallback by the caller):
// a new function identity on each render re-runs the effect and reconnects.
export function useMessageEvents(
  projectId: string,
  onMessage: (event: MessageEvent) => void
) {
  useEffect(() => {
    const eventSource = new EventSource(
      `/api/v1/projects/${projectId}/messages/events`,
      { withCredentials: true }
    );

    eventSource.addEventListener('message_sent', (e) => {
      onMessage({ type: 'message_sent', data: JSON.parse(e.data) });
    });

    eventSource.addEventListener('message_received', (e) => {
      onMessage({ type: 'message_received', data: JSON.parse(e.data) });
    });

    eventSource.addEventListener('conversation_started', (e) => {
      onMessage({ type: 'conversation_started', data: JSON.parse(e.data) });
    });

    eventSource.onerror = () => {
      // Reconnection handled by EventSource
      console.log('Message SSE reconnecting...');
    };

    // Close the connection on unmount or when the dependencies change
    return () => eventSource.close();
  }, [projectId, onMessage]);
}
```

---

## Syndarix-Specific Requirements

### @Mentions Support

```python
import re
from uuid import UUID


class MentionParser:
    """Parse @mentions in message content."""

    MENTION_PATTERN = r'@(\w+)'
    # Role names are matched against the whole mention token, so that
    # e.g. "@pomegranate" is not mistaken for the "po" role
    ROLE_PATTERN = r'(engineers?|architects?|qa|po|pm|devops)'

    def parse(self, content: str) -> list[Mention]:
        mentions = []

        # Classify each @token once: role if it is a known role name,
        # agent otherwise (avoids reporting "@engineers" as both)
        for match in re.finditer(self.MENTION_PATTERN, content):
            token = match.group(1)

            if re.fullmatch(self.ROLE_PATTERN, token, re.IGNORECASE):
                mentions.append(Mention(type="role", role=token, position=match.start()))
            else:
                mentions.append(Mention(type="agent", name=token, position=match.start()))

        return mentions

    async def resolve_mentions(
        self,
        mentions: list[Mention],
        project_id: UUID,
        orchestrator: AgentOrchestrator
    ) -> list[UUID]:
        """Resolve mentions to agent IDs."""
        agent_ids = set()

        for mention in mentions:
            if mention.type == "agent":
                agent = await orchestrator.get_by_name(project_id, mention.name)
                if agent:
                    agent_ids.add(agent.id)
            elif mention.type == "role":
                agents = await orchestrator.get_instances_by_role(
                    project_id, mention.role
                )
                agent_ids.update(a.id for a in agents)

        return list(agent_ids)
```
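
Because `resolve_mentions` collects IDs in a set, an agent mentioned both by name and by role is notified only once. The sketch below illustrates that de-duplication with an in-memory lookup. `InMemoryDirectory` and `resolve` are hypothetical stand-ins for `AgentOrchestrator` and `resolve_mentions`, and mentions are simplified to `(kind, value)` tuples.

```python
import asyncio
from typing import Optional
from uuid import UUID, uuid4


class InMemoryDirectory:
    """Hypothetical stand-in for the orchestrator's agent lookups."""

    def __init__(self, agents: dict[str, tuple[UUID, str]]) -> None:
        self._agents = agents  # name -> (agent_id, role)

    async def get_by_name(self, name: str) -> Optional[UUID]:
        entry = self._agents.get(name)
        return entry[0] if entry else None

    async def get_ids_by_role(self, role: str) -> list[UUID]:
        return [agent_id for agent_id, r in self._agents.values() if r == role]


async def resolve(
    mentions: list[tuple[str, str]], directory: InMemoryDirectory
) -> set[UUID]:
    """Resolve (kind, value) mentions to a de-duplicated set of agent IDs."""
    agent_ids: set[UUID] = set()
    for kind, value in mentions:
        if kind == "agent":
            agent_id = await directory.get_by_name(value)
            if agent_id is not None:
                agent_ids.add(agent_id)
        elif kind == "role":
            agent_ids.update(await directory.get_ids_by_role(value))
    return agent_ids


alice, bob = uuid4(), uuid4()
directory = InMemoryDirectory({"alice": (alice, "engineer"), "bob": (bob, "engineer")})
# alice is mentioned by name and again through her role
resolved = asyncio.run(resolve([("agent", "alice"), ("role", "engineer")], directory))
```

Here `resolved` contains exactly two IDs even though `alice` matches twice; unknown names are silently skipped, mirroring the `if agent:` check above.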

### Priority Message Handling

```python
class PriorityMessageHandler:
    """Handle urgent messages with priority."""

    PRIORITY_INTERRUPTS = {"urgent", "high"}

    async def process_with_priority(
        self,
        agent_id: UUID,
        message: AgentMessage
    ):
        """Process message based on priority."""
        if message.metadata.priority in self.PRIORITY_INTERRUPTS:
            # Interrupt current task
            await self.orchestrator.interrupt_agent(agent_id)

            # Publish urgent notification
            await self.event_bus.publish(
                f"project:{message.context.project_id}",
                {
                    "type": "urgent_message",
                    "agent_id": str(agent_id),
                    "message_id": str(message.id),
                    "from": message.sender.name,
                    "preview": message.content[:100]
                }
            )

            # Process immediately
            await self.process_immediately(agent_id, message)
        else:
            # Queue for normal processing
            await self.queue_for_processing(agent_id, message)
```
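
The handler above leaves `queue_for_processing` unspecified. One possible shape for it, shown here only as a sketch, is an in-memory priority queue that reuses the numeric ranks from `_priority_to_int` and keeps first-in, first-out order within a priority level. The class name `PriorityInbox` and the use of plain string message IDs are assumptions for illustration.

```python
import heapq
import itertools

# Same ranks as MessageRouter._priority_to_int (lower number = more urgent)
PRIORITY_ORDER = {"urgent": 0, "high": 3, "normal": 6, "low": 9}


class PriorityInbox:
    """Hypothetical in-memory queue that pops the most urgent message first."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str]] = []
        # Monotonic counter breaks ties so equal priorities stay FIFO
        self._counter = itertools.count()

    def push(self, priority: str, message_id: str) -> None:
        rank = PRIORITY_ORDER.get(priority, PRIORITY_ORDER["normal"])
        heapq.heappush(self._heap, (rank, next(self._counter), message_id))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


inbox = PriorityInbox()
for priority, message_id in [
    ("normal", "m1"), ("urgent", "m2"), ("normal", "m3"), ("low", "m4"), ("high", "m5"),
]:
    inbox.push(priority, message_id)

drained = [inbox.pop() for _ in range(len(inbox))]
```

In this run `drained` is ordered urgent, high, the two normal messages in arrival order, then low. A production queue would more likely live in Celery or Redis, as the rest of this spike proposes.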

### Communication Traceability

```python
class CommunicationTracer:
    """Trace all inter-agent communication."""

    async def trace_message_flow(
        self,
        conversation_id: UUID
    ) -> MessageFlowTrace:
        """Generate a trace of message flow in a conversation."""
        messages = await self.get_conversation_messages(conversation_id)

        nodes = []
        edges = []

        for msg in messages:
            nodes.append({
                "id": str(msg.id),
                "agent": msg.sender_name,
                "role": msg.sender_role,
                "action": msg.action,
                "timestamp": msg.created_at.isoformat()
            })

            if msg.in_response_to_id:
                edges.append({
                    "from": str(msg.in_response_to_id),
                    "to": str(msg.id),
                    "type": "response"
                })

            if msg.parent_message_id:
                edges.append({
                    "from": str(msg.parent_message_id),
                    "to": str(msg.id),
                    "type": "thread"
                })

        return MessageFlowTrace(
            conversation_id=conversation_id,
            nodes=nodes,
            edges=edges,
            summary=await self.generate_summary(messages)
        )
```
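
The `nodes` and `edges` lists produced by `trace_message_flow` form a directed graph. As a small sketch of how a consumer might use it, the function below finds the conversation's root messages (those with no incoming edge) and builds a parent-to-children map. The function name `reply_chains` is hypothetical; the dictionary keys match those emitted above.

```python
from collections import defaultdict


def reply_chains(
    nodes: list[dict], edges: list[dict]
) -> tuple[list[str], dict[str, list[str]]]:
    """Return root message IDs and a parent -> children adjacency map."""
    children: dict[str, list[str]] = defaultdict(list)
    has_parent: set[str] = set()

    for edge in edges:
        children[edge["from"]].append(edge["to"])
        has_parent.add(edge["to"])

    # Roots are messages that neither respond to nor thread under another message
    roots = [node["id"] for node in nodes if node["id"] not in has_parent]
    return roots, dict(children)


nodes = [{"id": "a"}, {"id": "b"}, {"id": "c"}]
edges = [
    {"from": "a", "to": "b", "type": "response"},
    {"from": "b", "to": "c", "type": "thread"},
]
roots, children = reply_chains(nodes, edges)
```

A UI could render each root as the start of a thread and walk `children` to draw the reply tree.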

---

## Performance Considerations

### Message Throughput

| Scenario | Target | Implementation |
|----------|--------|----------------|
| Direct messages | 100/sec | Redis Pub/Sub |
| Broadcasts | 10/sec | Batched delivery |
| Task delegation | 50/sec | Celery queue |
| Message persistence | 200/sec | Batch inserts |

### Scalability

```python
import asyncio

from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession


# Message batching for high-throughput scenarios
class MessageBatcher:
    def __init__(self, db: AsyncSession, batch_size: int = 100, flush_interval: float = 0.1):
        self.db = db
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._buffer: list[AgentMessage] = []
        self._lock = asyncio.Lock()

    async def add(self, message: AgentMessage):
        async with self._lock:
            self._buffer.append(message)

            # Flush as soon as a full batch has accumulated
            if len(self._buffer) >= self.batch_size:
                await self._flush()

    async def run_periodic_flush(self):
        """Flush partial batches every flush_interval seconds (run as a background task)."""
        while True:
            await asyncio.sleep(self.flush_interval)
            async with self._lock:
                await self._flush()

    async def _flush(self):
        """Write buffered messages in one batch. The caller must hold self._lock."""
        if not self._buffer:
            return

        messages = self._buffer
        self._buffer = []

        # Batch insert
        await self.db.execute(
            insert(AgentMessageModel),
            [self._to_dict(m) for m in messages]
        )
        await self.db.commit()
```

### Memory Management

- Conversation history is limited to the last 50 messages in memory
- Older messages are retrieved on demand from the database
- The active conversation cache in Redis uses a TTL of 1 hour
- Resolved conversations are archived periodically
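
The 50-message in-memory limit can be enforced with a bounded buffer that discards the oldest entry automatically. The following is a minimal sketch using `collections.deque`; the class name `ConversationWindow` is hypothetical, and message IDs stand in for full message objects. Redis caching and archival are out of scope here.

```python
from collections import deque


class ConversationWindow:
    """Hypothetical bounded in-memory view of a conversation's latest messages."""

    def __init__(self, max_messages: int = 50) -> None:
        # deque with maxlen drops the oldest item when a new one is appended
        self._messages: deque[int] = deque(maxlen=max_messages)

    def append(self, message_id: int) -> None:
        self._messages.append(message_id)

    def recent(self) -> list[int]:
        """Return the retained messages, oldest first."""
        return list(self._messages)


window = ConversationWindow()
for message_id in range(60):
    window.append(message_id)

retained = window.recent()
```

After 60 appends only IDs 10 through 59 remain; anything older would be fetched from the database, for example via `get_conversation_history` with its `before` parameter.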

---

## References

### Industry Protocols Researched

- [Google A2A Protocol](https://developers.googleblog.com/en/a2a-a-new-era-of-agent-interoperability/) - Agent-to-Agent Protocol
- [IBM ACP](https://arxiv.org/html/2505.02279v1) - Agent Communication Protocol
- [Anthropic MCP](https://spec.modelcontextprotocol.io/) - Model Context Protocol
- [Linux Foundation A2A Project](https://www.linuxfoundation.org/press/linux-foundation-launches-the-agent2agent-protocol-project-to-enable-secure-intelligent-communication-between-ai-agents)

### Multi-Agent System Research

- [Multi-Agent Collaboration Mechanisms Survey](https://arxiv.org/html/2501.06322v1)
- [Conflict Resolution in Multi-Agent Systems](https://zilliz.com/ai-faq/how-do-multiagent-systems-manage-conflict-resolution)
- [Agent Memory Management](https://www.letta.com/blog/agent-memory)
- [Context Engineering for LLM Agents](https://developers.googleblog.com/architecting-efficient-context-aware-multi-agent-framework-for-production/)

### Syndarix Architecture

- [ADR-002: Real-time Communication](../adrs/ADR-002-realtime-communication.md)
- [ADR-006: Agent Orchestration](../adrs/ADR-006-agent-orchestration.md)
- [SPIKE-003: Real-time Updates](./SPIKE-003-realtime-updates.md)

---

## Decision

**Adopt a hybrid message-based communication protocol** with:

1. **Structured JSON messages** with natural language content
2. **Three routing strategies**: Direct, Role-based, Broadcast
3. **Redis Pub/Sub** for real-time delivery
4. **PostgreSQL** for message persistence and audit
5. **Celery** for async task delegation
6. **SSE** for client notifications
7. **Three-tier context management** for conversation continuity
8. **Hierarchical conflict resolution** with human escalation

This protocol builds on existing Syndarix infrastructure (Redis, Celery, SSE events) while following the design principles of A2A and ACP.

---

*Spike completed. Findings will inform implementation of inter-agent communication in the Agent Orchestration layer.*