SPIKE-009: Issue Synchronization with External Trackers
Status: Completed
Date: 2025-12-29
Author: Architecture Team
Related Issue: #9
Executive Summary
This spike researches bi-directional issue synchronization between Syndarix and external issue trackers (Gitea, GitHub, GitLab). After analyzing sync patterns, conflict resolution strategies, and API capabilities of each platform, we recommend:
Primary Recommendation: Implement a webhook-first, polling-fallback architecture with Last-Writer-Wins (LWW) conflict resolution using vector clocks for causality tracking. External trackers serve as the source of truth with Syndarix maintaining local mirrors for unified agent access.
Key Decisions:
- Use webhooks for real-time sync; polling for reconciliation and initial import
- Implement version vectors for conflict detection with LWW resolution
- Store sync metadata in a dedicated issue_sync_log table for audit and recovery
- Abstract provider differences behind a unified IssueProvider interface
- Use Redis for webhook event queuing and deduplication
Table of Contents
- Research Questions & Answers
- Sync Architecture
- Conflict Resolution Strategy
- Webhook Handling Design
- Provider API Comparison
- Database Schema
- Field Mapping Specification
- Code Examples
- Error Handling & Recovery
- Implementation Roadmap
- References
Research Questions & Answers
1. Best patterns for bi-directional data sync?
Answer: The Hub-and-Spoke pattern with Syndarix as the hub is recommended. This pattern:
- Designates external trackers as authoritative sources
- Maintains local mirrors for unified access
- Synchronizes changes bidirectionally through a central sync engine
- Uses explicit ownership rules per field to prevent conflicts
Modern implementations often leverage Conflict-Free Replicated Data Types (CRDTs) for automatic conflict resolution, but for issue tracking, where human review may be desired, version vectors with LWW provide better control.
2. Handling sync conflicts (edited in both places)?
Answer: Implement a tiered conflict resolution strategy:
| Scenario | Resolution |
|---|---|
| Same field, different times | Last-Writer-Wins (LWW) |
| Same field, concurrent edits | Mark as conflict, notify user |
| Different fields | Merge both changes |
| Delete vs Update | Delete wins (configurable) |
Use version vectors to detect concurrent modifications. Each system maintains a version counter, and conflicts are identified when neither version dominates the other.
3. Webhook vs polling strategies?
Answer: Hybrid approach - webhooks primary, polling secondary.
| Strategy | Use Case | Latency | Resource Cost |
|---|---|---|---|
| Webhooks | Real-time sync | <1s | Low |
| Polling | Initial import, reconciliation, fallback | Minutes | Medium |
| On-demand | User-triggered refresh | Immediate | Minimal |
Rationale: Webhooks provide real-time updates but may miss events during outages. Periodic polling (every 15-30 minutes) ensures eventual consistency.
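For illustration, the polling fallback could be wired up as a Celery Beat schedule roughly as follows; the task name and the 20-minute interval are assumptions for the sketch, not decisions made in this spike.

```python
# Illustrative Celery Beat schedule for the polling fallback. The task name
# and the 20-minute interval are assumptions; real values belong in settings.
from celery import Celery

celery_app = Celery("syndarix")

celery_app.conf.beat_schedule = {
    "reconcile-external-issues": {
        "task": "app.workers.sync_worker.reconcile_all_connections",
        "schedule": 20 * 60,  # seconds; catches webhooks missed during outages
    },
}

@celery_app.task(name="app.workers.sync_worker.reconcile_all_connections")
def reconcile_all_connections() -> None:
    """Enqueue SyncEngine.reconcile() for every connection with sync_enabled=True."""
    ...  # query external_connections and dispatch one reconcile task per row
```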
4. Rate limiting and API quota management?
Answer: Implement a token bucket with adaptive throttling:
| Provider | Auth Rate Limit | Unauthenticated |
|---|---|---|
| GitHub | 5,000/hour | 60/hour |
| GitLab | 600/minute | 10/minute |
| Gitea | Configurable (self-hosted; no fixed default) | N/A |
Strategies:
- Use conditional requests (If-None-Match, If-Modified-Since) to avoid counting unchanged responses against the quota
- Implement exponential backoff on 429/403 responses
- Cache responses with ETags
- Batch operations where possible
- Monitor X-RateLimit-Remaining headers (see the sketch below)
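As a sketch of the conditional-request strategy, a thin httpx wrapper can cache ETags and watch the remaining-quota header; the module-level in-memory cache and the threshold of 10 are illustrative only.

```python
# Sketch: conditional GET with ETag caching and rate-limit header monitoring.
# The module-level dict cache and the threshold of 10 are illustrative only.
import httpx

_etag_cache: dict[str, tuple[str, dict]] = {}  # url -> (etag, cached json)

async def get_with_etag(client: httpx.AsyncClient, url: str) -> dict:
    headers = {}
    cached = _etag_cache.get(url)
    if cached:
        headers["If-None-Match"] = cached[0]

    response = await client.get(url, headers=headers)

    # A 304 means the cached copy is still current (and, on GitHub, the
    # request does not count against the primary rate limit).
    if response.status_code == 304 and cached:
        return cached[1]

    response.raise_for_status()
    data = response.json()
    if etag := response.headers.get("etag"):
        _etag_cache[url] = (etag, data)

    remaining = response.headers.get("x-ratelimit-remaining")
    if remaining is not None and int(remaining) < 10:
        ...  # back off: defer further polling until x-ratelimit-reset
    return data
```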
5. Eventual consistency vs strong consistency tradeoffs?
Answer: Eventual consistency is acceptable and recommended for issue sync.
| Consistency | Pros | Cons |
|---|---|---|
| Strong | Always accurate | Higher latency, complex implementation |
| Eventual | Better performance, simpler | Temporary inconsistency |
Rationale: Issue tracking tolerates brief inconsistency windows (seconds to minutes). Users can manually refresh if needed. The simplicity and performance gains outweigh the drawbacks.
6. How to map different field schemas?
Answer: Use a canonical field model with provider-specific adapters.
External Field → Provider Adapter → Canonical Model → Local Storage
Local Field → Canonical Model → Provider Adapter → External Field
See Field Mapping Specification for detailed mappings.
7. Handling offline/disconnected scenarios?
Answer: Implement an outbox pattern with retry queue:
- Queue all outgoing changes in a local sync_outbox table
- Background worker processes the queue with exponential backoff
- Mark items as pending, in_progress, completed, or failed
- Dead letter queue for items exceeding max retries
- Manual reconciliation UI for failed items
Sync Architecture
High-Level Architecture Diagram
External Issue Trackers
┌─────────────────┬─────────────────┬─────────────────┐
│ Gitea │ GitHub │ GitLab │
│ (Primary) │ │ │
└────────┬────────┴────────┬────────┴────────┬────────┘
│ │ │
┌────────┴─────────────────┴─────────────────┴────────┐
│ Webhooks │
└─────────────────────────┬───────────────────────────┘
│
┌─────────────────────────────────────────────────┼─────────────────────────────────────────────┐
│ Syndarix Backend │
│ ┌────────────────────┐ ┌────────────┴───────────┐ ┌────────────────────┐ │
│ │ Webhook Handler │◀──────────│ Redis Queue │──────────▶│ Polling Worker │ │
│ │ (FastAPI Route) │ │ (Event Dedup/Buffer) │ │ (Celery Beat) │ │
│ └─────────┬──────────┘ └────────────────────────┘ └─────────┬──────────┘ │
│ │ │ │
│ └──────────────────────────┬──────────────────────────────────────────┘ │
│ │ │
│ ┌────────────┴────────────┐ │
│ │ Sync Engine │ │
│ │ ┌──────────────────┐ │ │
│ │ │ Provider Factory │ │ │
│ │ │ ┌─────────────┐ │ │ │
│ │ │ │GiteaProvider│ │ │ │
│ │ │ │GitHubProvider│ │ │ │
│ │ │ │GitLabProvider│ │ │ │
│ │ │ └─────────────┘ │ │ │
│ │ └──────────────────┘ │ │
│ │ ┌──────────────────┐ │ │
│ │ │Conflict Resolver │ │ │
│ │ └──────────────────┘ │ │
│ │ ┌──────────────────┐ │ │
│ │ │ Field Mapper │ │ │
│ │ └──────────────────┘ │ │
│ └────────────┬────────────┘ │
│ │ │
│ ┌────────────┴────────────┐ │
│ │ PostgreSQL │ │
│ │ ┌──────────────────┐ │ │
│ │ │ issues │ │ │
│ │ │ issue_sync_log │ │ │
│ │ │ sync_outbox │ │ │
│ │ │ external_links │ │ │
│ │ └──────────────────┘ │ │
│ └─────────────────────────┘ │
└───────────────────────────────────────────────────────────────────────────────────────────────┘
Component Responsibilities
| Component | Responsibility |
|---|---|
| Webhook Handler | Receive, validate, and queue incoming webhooks |
| Redis Queue | Buffer events, deduplicate, handle backpressure |
| Polling Worker | Periodic reconciliation, initial import, fallback |
| Sync Engine | Orchestrate sync operations, apply business logic |
| Provider Factory | Create provider-specific clients |
| Conflict Resolver | Detect and resolve sync conflicts |
| Field Mapper | Transform between canonical and provider schemas |
Data Flow
Inbound (External to Syndarix):
1. Webhook received → Validate signature → Queue in Redis
2. Worker dequeues → Parse payload → Transform to canonical model
3. Check for conflicts → Apply resolution strategy
4. Update local database → Log sync operation
5. Notify connected clients via SSE (per SPIKE-003)
Outbound (Syndarix to External):
1. Local change detected → Transform to provider format
2. Queue in sync_outbox → Worker picks up
3. Call external API → Handle response
4. Update sync metadata → Mark as synced
5. Handle failures → Retry with backoff
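The outbound steps above imply a worker that drains sync_outbox. A simplified sketch follows: SyncOutbox, CanonicalIssue, and ProviderFactory are the components described later in this document, while get_connection() is a hypothetical lookup helper and the error handling is intentionally coarse.

```python
# Simplified outbox drain loop for the outbound flow above. SyncOutbox,
# CanonicalIssue and ProviderFactory are defined elsewhere in this spike;
# get_connection() is a hypothetical lookup helper.
from datetime import datetime, timedelta
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def drain_outbox(db: AsyncSession, batch_size: int = 20) -> None:
    result = await db.execute(
        select(SyncOutbox)
        .where(SyncOutbox.status == "pending",
               SyncOutbox.scheduled_at <= datetime.utcnow())
        .order_by(SyncOutbox.scheduled_at)
        .limit(batch_size)
    )
    for entry in result.scalars():
        entry.status = "in_progress"
        try:
            connection = await get_connection(db, entry.project_id, entry.provider)  # hypothetical
            provider = ProviderFactory.get_provider(connection)
            issue = CanonicalIssue(**entry.payload)
            if entry.operation == "create":
                await provider.create_issue(issue)
            else:
                await provider.update_issue(issue.external_number, issue)
            entry.status = "completed"
            entry.processed_at = datetime.utcnow()
        except Exception as exc:  # simplified; real code classifies errors per the table below
            entry.attempts += 1
            entry.last_error = str(exc)
            if entry.attempts >= entry.max_attempts:
                entry.status = "dead_letter"
            else:
                entry.status = "pending"
                # exponential backoff before the next attempt
                entry.scheduled_at = datetime.utcnow() + timedelta(seconds=60 * 2 ** entry.attempts)
    await db.commit()
```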
Conflict Resolution Strategy
Version Vector Implementation
Each issue maintains a version vector tracking modifications across systems:
# Version vector structure
{
"syndarix": 5, # Local modification count
"gitea": 3, # Gitea modification count
"github": 0, # Not synced with GitHub
"gitlab": 2 # GitLab modification count
}
Conflict Detection Algorithm
def detect_conflict(local_version: dict, remote_version: dict) -> str:
"""
Compare version vectors to detect conflicts.
Returns:
- "local_wins": Local version dominates
- "remote_wins": Remote version dominates
- "conflict": Concurrent modification (neither dominates)
- "equal": No changes
"""
local_dominates = all(
local_version.get(k, 0) >= v
for k, v in remote_version.items()
)
remote_dominates = all(
remote_version.get(k, 0) >= v
for k, v in local_version.items()
)
if local_version == remote_version:
return "equal"
elif local_dominates and not remote_dominates:
return "local_wins"
elif remote_dominates and not local_dominates:
return "remote_wins"
else:
return "conflict"
Resolution Strategies
from enum import Enum
class ConflictStrategy(str, Enum):
REMOTE_WINS = "remote_wins" # External tracker is source of truth
LOCAL_WINS = "local_wins" # Syndarix changes take precedence
LAST_WRITE_WINS = "lww" # Most recent timestamp wins
MANUAL = "manual" # Flag for human review
MERGE = "merge" # Attempt field-level merge
# Default strategy per field
FIELD_STRATEGIES = {
"title": ConflictStrategy.LAST_WRITE_WINS,
"description": ConflictStrategy.MERGE,
"status": ConflictStrategy.REMOTE_WINS,
"assignees": ConflictStrategy.MERGE,
"labels": ConflictStrategy.MERGE,
"comments": ConflictStrategy.MERGE, # Append both
"priority": ConflictStrategy.REMOTE_WINS,
}
Merge Algorithm for Complex Fields
def merge_labels(local: list[str], remote: list[str], base: list[str]) -> list[str]:
"""Three-way merge for labels."""
local_added = set(local) - set(base)
local_removed = set(base) - set(local)
remote_added = set(remote) - set(base)
remote_removed = set(base) - set(remote)
result = set(base)
result |= local_added | remote_added
result -= local_removed | remote_removed
return sorted(result)
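The ConflictResolver consumed by the sync engine later in this document could look roughly like this. The check()/resolve() signatures mirror how SyncEngine calls them; ConflictResult and the per-field dispatch are a sketch built on detect_conflict(), FIELD_STRATEGIES, and merge_labels() above.

```python
# Sketch of the ConflictResolver used by SyncEngine below. The check()/resolve()
# signatures mirror the sync engine's calls; ConflictResult and the per-field
# dispatch are illustrative, not a final design.
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class ConflictResult:
    outcome: str                                   # equal / local_wins / remote_wins / conflict
    conflicting_fields: list[str] = field(default_factory=list)

    @property
    def has_conflict(self) -> bool:
        return self.outcome == "conflict"

    def to_dict(self) -> dict:
        return {"outcome": self.outcome, "fields": self.conflicting_fields}

class ConflictResolver:
    def check(self, local_vector: dict, remote_vector: dict,
              local_updated: datetime, remote_updated: datetime) -> ConflictResult:
        return ConflictResult(outcome=detect_conflict(local_vector, remote_vector))

    def resolve(self, local_issue, remote_issue, result: ConflictResult):
        """Apply FIELD_STRATEGIES field by field; unresolved fields are flagged for review."""
        for field_name, strategy in FIELD_STRATEGIES.items():
            if strategy == ConflictStrategy.REMOTE_WINS:
                setattr(local_issue, field_name, getattr(remote_issue, field_name, None))
            elif strategy == ConflictStrategy.LAST_WRITE_WINS:
                if remote_issue.updated_at >= local_issue.updated_at:
                    setattr(local_issue, field_name, getattr(remote_issue, field_name, None))
            elif strategy == ConflictStrategy.MERGE and field_name == "labels":
                # base=[] degrades to a plain union when no common ancestor is tracked
                local_issue.labels = merge_labels(local_issue.labels, remote_issue.labels, base=[])
            else:
                result.conflicting_fields.append(field_name)  # flag for manual review
        return local_issue
```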
Webhook Handling Design
Webhook Endpoint Architecture
# app/api/v1/webhooks/issues.py
from fastapi import APIRouter, Request, HTTPException, BackgroundTasks
from app.services.sync.webhook_handler import WebhookHandler
from app.core.redis import redis_client
import hashlib
import hmac
from datetime import datetime
router = APIRouter()
@router.post("/webhooks/{provider}/{project_id}")
async def receive_webhook(
provider: str,
project_id: str,
request: Request,
background_tasks: BackgroundTasks
):
"""
Unified webhook endpoint for all providers.
Path: /api/v1/webhooks/{provider}/{project_id}
Providers: gitea, github, gitlab
"""
body = await request.body()
headers = dict(request.headers)
# Validate webhook signature
handler = WebhookHandler.get_handler(provider)
if not handler.verify_signature(body, headers):
raise HTTPException(status_code=401, detail="Invalid signature")
# Parse event type
event_type = handler.get_event_type(headers)
if event_type not in handler.supported_events:
return {"status": "ignored", "reason": "unsupported_event"}
# Deduplicate using event ID
event_id = handler.get_event_id(headers, body)
if await is_duplicate(event_id):
return {"status": "duplicate"}
# Queue for processing
await redis_client.xadd(
f"webhooks:{project_id}",
{
"provider": provider,
"event_type": event_type,
"payload": body,
"received_at": datetime.utcnow().isoformat()
}
)
return {"status": "queued", "event_id": event_id}
async def is_duplicate(event_id: str, ttl: int = 3600) -> bool:
"""Check if event was already processed (Redis-based dedup)."""
key = f"webhook:processed:{event_id}"
result = await redis_client.set(key, "1", ex=ttl, nx=True)
return result is None # None means key existed
Provider-Specific Signature Validation
# app/services/sync/webhook_validators.py
import hashlib
import hmac

class GiteaWebhookValidator:
@staticmethod
def verify_signature(body: bytes, headers: dict, secret: str) -> bool:
"""Gitea uses X-Gitea-Signature with HMAC-SHA256."""
signature = headers.get("x-gitea-signature", "")
expected = hmac.new(
secret.encode(),
body,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected)
@staticmethod
def get_event_type(headers: dict) -> str:
return headers.get("x-gitea-event", "")
class GitHubWebhookValidator:
@staticmethod
def verify_signature(body: bytes, headers: dict, secret: str) -> bool:
"""GitHub uses X-Hub-Signature-256 with sha256=HMAC."""
signature = headers.get("x-hub-signature-256", "")
if not signature.startswith("sha256="):
return False
expected = "sha256=" + hmac.new(
secret.encode(),
body,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected)
@staticmethod
def get_event_type(headers: dict) -> str:
return headers.get("x-github-event", "")
class GitLabWebhookValidator:
@staticmethod
def verify_signature(body: bytes, headers: dict, secret: str) -> bool:
"""GitLab uses X-Gitlab-Token for simple token matching."""
token = headers.get("x-gitlab-token", "")
return hmac.compare_digest(token, secret)
@staticmethod
def get_event_type(headers: dict) -> str:
return headers.get("x-gitlab-event", "")
Webhook Event Processing (Celery Worker)
# app/workers/sync_worker.py
from celery import Celery
from app.services.sync.sync_engine import SyncEngine
from app.core.redis import redis_client
import logging

log = logging.getLogger(__name__)
celery_app = Celery("syndarix")
@celery_app.task(bind=True, max_retries=3)
def process_webhook_event(self, project_id: str):
"""Process queued webhook events for a project."""
try:
# Read from Redis stream
events = redis_client.xread(
{f"webhooks:{project_id}": "0"},
count=10,
block=5000
)
sync_engine = SyncEngine()
for stream_name, messages in events:
for message_id, data in messages:
try:
sync_engine.process_inbound_event(
provider=data["provider"],
event_type=data["event_type"],
payload=data["payload"]
)
# Acknowledge processed
redis_client.xdel(stream_name, message_id)
except Exception as e:
log.error(f"Failed to process {message_id}: {e}")
# Will retry on next run
except Exception as exc:
self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
Provider API Comparison
Issue Field Support Matrix
| Field | Gitea | GitHub | GitLab | Syndarix Canonical |
|---|---|---|---|---|
| ID | id (int) | id (int) | id (int) | external_id (str) |
| Number | number | number | iid | external_number |
| Title | title | title | title | title |
| Body | body | body | description | description |
| State | state (open/closed) | state (open/closed) | state (opened/closed) | status (enum) |
| Assignees | assignees[] | assignees[] | assignees[] | assignee_ids[] |
| Labels | labels[].name | labels[].name | labels[] (strings) | labels[] |
| Milestone | milestone.title | milestone.title | milestone.title | milestone |
| Created | created_at | created_at | created_at | created_at |
| Updated | updated_at | updated_at | updated_at | updated_at |
| Due Date | due_date | N/A | due_date | due_date |
| Priority | N/A (via labels) | N/A (via labels) | N/A (via labels) | priority |
| URL | html_url | html_url | web_url | remote_url |
Webhook Event Mapping
| Action | Gitea Event | GitHub Event | GitLab Event |
|---|---|---|---|
| Create | issues:opened | issues:opened | Issue Hook:open |
| Update | issues:edited | issues:edited | Issue Hook:update |
| Close | issues:closed | issues:closed | Issue Hook:close |
| Reopen | issues:reopened | issues:reopened | Issue Hook:reopen |
| Assign | issues:assigned | issues:assigned | Issue Hook:update |
| Label | issues:label_updated | issues:labeled | Issue Hook:update |
| Comment | issue_comment:created | issue_comment:created | Note Hook |
Rate Limits Comparison
| Provider | Authenticated | Pagination | Conditional Requests |
|---|---|---|---|
| GitHub | 5,000/hour | 100/page max | ETag, If-Modified-Since |
| GitLab | 600/minute | 100/page max | ETag |
| Gitea | Configurable | 50/page default | Link header |
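The token-bucket throttle mentioned earlier could be kept per connection along these lines; a minimal asyncio sketch in which the rates are derived from the table above and the burst capacity is an assumption.

```python
# Minimal asyncio token-bucket throttle, one instance per connection.
# Rates would be derived from the table above; the burst capacity of 50
# is an illustrative assumption.
import asyncio
import time

class TokenBucket:
    def __init__(self, rate: float, capacity: float):
        self.rate = rate            # tokens replenished per second
        self.capacity = capacity    # maximum burst size
        self.tokens = capacity
        self.updated = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, cost: float = 1.0) -> None:
        async with self._lock:
            while True:
                now = time.monotonic()
                self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
                self.updated = now
                if self.tokens >= cost:
                    self.tokens -= cost
                    return
                # Sleep just long enough for the missing tokens to accumulate.
                await asyncio.sleep((cost - self.tokens) / self.rate)

# Usage sketch: await github_bucket.acquire() before each GitHub API call.
github_bucket = TokenBucket(rate=5000 / 3600, capacity=50)
```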
Database Schema
Core Tables
-- Extended issues table with sync metadata
ALTER TABLE issues ADD COLUMN IF NOT EXISTS external_id VARCHAR(255);
ALTER TABLE issues ADD COLUMN IF NOT EXISTS external_number INTEGER;
ALTER TABLE issues ADD COLUMN IF NOT EXISTS remote_url TEXT;
ALTER TABLE issues ADD COLUMN IF NOT EXISTS provider VARCHAR(50);
ALTER TABLE issues ADD COLUMN IF NOT EXISTS provider_repo_id VARCHAR(255);
ALTER TABLE issues ADD COLUMN IF NOT EXISTS sync_status VARCHAR(20) DEFAULT 'pending';
ALTER TABLE issues ADD COLUMN IF NOT EXISTS version_vector JSONB DEFAULT '{}';
ALTER TABLE issues ADD COLUMN IF NOT EXISTS last_synced_at TIMESTAMP WITH TIME ZONE;
ALTER TABLE issues ADD COLUMN IF NOT EXISTS external_updated_at TIMESTAMP WITH TIME ZONE;
-- Sync status enum values: synced, pending, conflict, error
CREATE INDEX idx_issues_external_id ON issues(provider, external_id);
CREATE INDEX idx_issues_sync_status ON issues(sync_status);
-- Issue sync log for audit trail
CREATE TABLE issue_sync_log (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
issue_id UUID REFERENCES issues(id) ON DELETE CASCADE,
project_id UUID REFERENCES projects(id) ON DELETE CASCADE,
-- Sync details
direction VARCHAR(10) NOT NULL, -- 'inbound' or 'outbound'
provider VARCHAR(50) NOT NULL,
event_type VARCHAR(100) NOT NULL,
-- Change tracking
previous_state JSONB,
new_state JSONB,
diff JSONB,
-- Conflict info
had_conflict BOOLEAN DEFAULT FALSE,
conflict_resolution VARCHAR(50),
conflict_details JSONB,
-- Status
status VARCHAR(20) NOT NULL, -- success, failed, skipped
error_message TEXT,
-- Timestamps
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
processed_at TIMESTAMP WITH TIME ZONE,
-- Webhook metadata
webhook_event_id VARCHAR(255),
webhook_delivery_id VARCHAR(255)
);
CREATE INDEX idx_sync_log_issue ON issue_sync_log(issue_id);
CREATE INDEX idx_sync_log_project ON issue_sync_log(project_id);
CREATE INDEX idx_sync_log_status ON issue_sync_log(status);
CREATE INDEX idx_sync_log_created ON issue_sync_log(created_at);
-- Outbox for pending outbound syncs
CREATE TABLE sync_outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
issue_id UUID REFERENCES issues(id) ON DELETE CASCADE,
project_id UUID REFERENCES projects(id) ON DELETE CASCADE,
-- Sync target
provider VARCHAR(50) NOT NULL,
operation VARCHAR(20) NOT NULL, -- create, update, delete
payload JSONB NOT NULL,
-- Processing status
status VARCHAR(20) DEFAULT 'pending', -- pending, in_progress, completed, failed, dead_letter
attempts INTEGER DEFAULT 0,
max_attempts INTEGER DEFAULT 5,
last_error TEXT,
-- Scheduling
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
scheduled_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
processed_at TIMESTAMP WITH TIME ZONE,
-- Idempotency
idempotency_key VARCHAR(255) UNIQUE
);
CREATE INDEX idx_outbox_status ON sync_outbox(status, scheduled_at);
CREATE INDEX idx_outbox_issue ON sync_outbox(issue_id);
-- External provider connections
CREATE TABLE external_connections (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
project_id UUID REFERENCES projects(id) ON DELETE CASCADE,
organization_id UUID REFERENCES organizations(id) ON DELETE CASCADE,
-- Provider details
provider VARCHAR(50) NOT NULL,
provider_url TEXT NOT NULL, -- Base URL (e.g., https://gitea.example.com)
repo_owner VARCHAR(255) NOT NULL,
repo_name VARCHAR(255) NOT NULL,
-- Authentication
auth_type VARCHAR(20) NOT NULL, -- token, oauth, app
credentials_encrypted TEXT, -- Encrypted token/credentials
-- Webhook configuration
webhook_secret_encrypted TEXT,
webhook_id VARCHAR(255), -- ID from provider
webhook_active BOOLEAN DEFAULT TRUE,
-- Sync settings
sync_enabled BOOLEAN DEFAULT TRUE,
sync_direction VARCHAR(20) DEFAULT 'bidirectional', -- inbound, outbound, bidirectional
sync_labels_filter JSONB, -- Only sync issues with these labels
sync_milestones_filter JSONB,
-- Status tracking
last_sync_at TIMESTAMP WITH TIME ZONE,
last_error TEXT,
status VARCHAR(20) DEFAULT 'active', -- active, paused, error
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
UNIQUE(project_id, provider, repo_owner, repo_name)
);
CREATE INDEX idx_connections_project ON external_connections(project_id);
CREATE INDEX idx_connections_status ON external_connections(status);
SQLAlchemy Models
# app/models/issue_sync.py
from sqlalchemy import Column, String, Integer, Boolean, ForeignKey, DateTime, func
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship
from app.models.base import Base, TimestampMixin, UUIDMixin
import enum
class SyncStatus(str, enum.Enum):
SYNCED = "synced"
PENDING = "pending"
CONFLICT = "conflict"
ERROR = "error"
class SyncDirection(str, enum.Enum):
INBOUND = "inbound"
OUTBOUND = "outbound"
BIDIRECTIONAL = "bidirectional"
class Provider(str, enum.Enum):
GITEA = "gitea"
GITHUB = "github"
GITLAB = "gitlab"
class ExternalConnection(Base, UUIDMixin, TimestampMixin):
__tablename__ = "external_connections"
project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False)
organization_id = Column(UUID(as_uuid=True), ForeignKey("organizations.id"))
provider = Column(String(50), nullable=False)
provider_url = Column(String, nullable=False)
repo_owner = Column(String(255), nullable=False)
repo_name = Column(String(255), nullable=False)
auth_type = Column(String(20), nullable=False)
credentials_encrypted = Column(String)
webhook_secret_encrypted = Column(String)
webhook_id = Column(String(255))
webhook_active = Column(Boolean, default=True)
sync_enabled = Column(Boolean, default=True)
sync_direction = Column(String(20), default="bidirectional")
sync_labels_filter = Column(JSONB)
sync_milestones_filter = Column(JSONB)
last_sync_at = Column(DateTime(timezone=True))
last_error = Column(String)
status = Column(String(20), default="active")
class IssueSyncLog(Base, UUIDMixin):
__tablename__ = "issue_sync_log"
issue_id = Column(UUID(as_uuid=True), ForeignKey("issues.id", ondelete="CASCADE"))
project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id", ondelete="CASCADE"))
direction = Column(String(10), nullable=False)
provider = Column(String(50), nullable=False)
event_type = Column(String(100), nullable=False)
previous_state = Column(JSONB)
new_state = Column(JSONB)
diff = Column(JSONB)
had_conflict = Column(Boolean, default=False)
conflict_resolution = Column(String(50))
conflict_details = Column(JSONB)
status = Column(String(20), nullable=False)
error_message = Column(String)
created_at = Column(DateTime(timezone=True), server_default=func.now())
processed_at = Column(DateTime(timezone=True))
webhook_event_id = Column(String(255))
webhook_delivery_id = Column(String(255))
class SyncOutbox(Base, UUIDMixin):
__tablename__ = "sync_outbox"
issue_id = Column(UUID(as_uuid=True), ForeignKey("issues.id", ondelete="CASCADE"))
project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id", ondelete="CASCADE"))
provider = Column(String(50), nullable=False)
operation = Column(String(20), nullable=False)
payload = Column(JSONB, nullable=False)
status = Column(String(20), default="pending")
attempts = Column(Integer, default=0)
max_attempts = Column(Integer, default=5)
last_error = Column(String)
created_at = Column(DateTime(timezone=True), server_default=func.now())
scheduled_at = Column(DateTime(timezone=True), server_default=func.now())
processed_at = Column(DateTime(timezone=True))
idempotency_key = Column(String(255), unique=True)
Field Mapping Specification
Canonical Issue Model
# app/schemas/sync/canonical.py
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional
from enum import Enum
class IssueStatus(str, Enum):
OPEN = "open"
CLOSED = "closed"
IN_PROGRESS = "in_progress" # Syndarix-only
class CanonicalIssue(BaseModel):
"""Canonical issue representation for sync operations."""
# Identity
external_id: str
external_number: int
remote_url: str
# Core fields
title: str
description: Optional[str] = None
status: IssueStatus
# Relationships
assignee_ids: list[str] = Field(default_factory=list)
assignee_usernames: list[str] = Field(default_factory=list)
labels: list[str] = Field(default_factory=list)
milestone: Optional[str] = None
# Metadata
created_at: datetime
updated_at: datetime
closed_at: Optional[datetime] = None
due_date: Optional[datetime] = None
# Provider info
provider: str
raw_data: dict = Field(default_factory=dict) # Original payload
Provider Adapters
# app/services/sync/adapters/gitea.py
from app.schemas.sync.canonical import CanonicalIssue, IssueStatus
class GiteaAdapter:
"""Transform between Gitea API format and canonical model."""
@staticmethod
def to_canonical(gitea_issue: dict, base_url: str) -> CanonicalIssue:
return CanonicalIssue(
external_id=str(gitea_issue["id"]),
external_number=gitea_issue["number"],
remote_url=gitea_issue["html_url"],
title=gitea_issue["title"],
description=gitea_issue.get("body"),
status=IssueStatus.OPEN if gitea_issue["state"] == "open" else IssueStatus.CLOSED,
assignee_ids=[str(a["id"]) for a in gitea_issue.get("assignees", [])],
assignee_usernames=[a["login"] for a in gitea_issue.get("assignees", [])],
labels=[label["name"] for label in gitea_issue.get("labels", [])],
milestone=(gitea_issue.get("milestone") or {}).get("title"),
created_at=gitea_issue["created_at"],
updated_at=gitea_issue["updated_at"],
closed_at=gitea_issue.get("closed_at"),
due_date=gitea_issue.get("due_date"),
provider="gitea",
raw_data=gitea_issue
)
@staticmethod
def from_canonical(issue: CanonicalIssue) -> dict:
"""Convert canonical to Gitea API format for updates."""
return {
"title": issue.title,
"body": issue.description,
"state": "open" if issue.status == IssueStatus.OPEN else "closed",
"assignees": issue.assignee_usernames,
"labels": issue.labels,
"milestone": issue.milestone,
"due_date": issue.due_date.isoformat() if issue.due_date else None,
}
# app/services/sync/adapters/github.py
class GitHubAdapter:
"""Transform between GitHub API format and canonical model."""
@staticmethod
def to_canonical(github_issue: dict, base_url: str) -> CanonicalIssue:
return CanonicalIssue(
external_id=str(github_issue["id"]),
external_number=github_issue["number"],
remote_url=github_issue["html_url"],
title=github_issue["title"],
description=github_issue.get("body"),
status=IssueStatus.OPEN if github_issue["state"] == "open" else IssueStatus.CLOSED,
assignee_ids=[str(a["id"]) for a in github_issue.get("assignees", [])],
assignee_usernames=[a["login"] for a in github_issue.get("assignees", [])],
labels=[label["name"] for label in github_issue.get("labels", [])],
milestone=github_issue.get("milestone", {}).get("title") if github_issue.get("milestone") else None,
created_at=github_issue["created_at"],
updated_at=github_issue["updated_at"],
closed_at=github_issue.get("closed_at"),
due_date=None, # GitHub doesn't have native due dates
provider="github",
raw_data=github_issue
)
@staticmethod
def from_canonical(issue: CanonicalIssue) -> dict:
return {
"title": issue.title,
"body": issue.description,
"state": "open" if issue.status == IssueStatus.OPEN else "closed",
"assignees": issue.assignee_usernames,
"labels": issue.labels,
"milestone": issue.milestone,
}
# app/services/sync/adapters/gitlab.py
class GitLabAdapter:
"""Transform between GitLab API format and canonical model."""
@staticmethod
def to_canonical(gitlab_issue: dict, base_url: str) -> CanonicalIssue:
# GitLab uses 'opened' instead of 'open'
state = gitlab_issue["state"]
status = IssueStatus.OPEN if state == "opened" else IssueStatus.CLOSED
return CanonicalIssue(
external_id=str(gitlab_issue["id"]),
external_number=gitlab_issue["iid"], # GitLab uses iid for project-scoped number
remote_url=gitlab_issue["web_url"],
title=gitlab_issue["title"],
description=gitlab_issue.get("description"),
status=status,
assignee_ids=[str(a["id"]) for a in gitlab_issue.get("assignees", [])],
assignee_usernames=[a["username"] for a in gitlab_issue.get("assignees", [])],
labels=gitlab_issue.get("labels", []), # GitLab returns labels as strings
milestone=gitlab_issue.get("milestone", {}).get("title") if gitlab_issue.get("milestone") else None,
created_at=gitlab_issue["created_at"],
updated_at=gitlab_issue["updated_at"],
closed_at=gitlab_issue.get("closed_at"),
due_date=gitlab_issue.get("due_date"),
provider="gitlab",
raw_data=gitlab_issue
)
@staticmethod
def from_canonical(issue: CanonicalIssue) -> dict:
return {
"title": issue.title,
"description": issue.description,
"state_event": "reopen" if issue.status == IssueStatus.OPEN else "close",
"assignee_ids": issue.assignee_ids,
"labels": ",".join(issue.labels),
"milestone_id": issue.milestone, # Requires ID lookup
"due_date": issue.due_date.isoformat() if issue.due_date else None,
}
Code Examples
Provider Interface
# app/services/sync/providers/base.py
from abc import ABC, abstractmethod
from datetime import datetime
from typing import AsyncIterator
from app.models.issue_sync import ExternalConnection
from app.schemas.sync.canonical import CanonicalIssue
class IssueProvider(ABC):
"""Abstract base class for issue tracker providers."""
def __init__(self, connection: ExternalConnection):
self.connection = connection
self.base_url = connection.provider_url
self.repo_owner = connection.repo_owner
self.repo_name = connection.repo_name
@abstractmethod
async def get_issue(self, issue_number: int) -> CanonicalIssue:
"""Fetch a single issue by number."""
pass
@abstractmethod
async def list_issues(
self,
state: str = "all",
since: datetime = None,
labels: list[str] = None
) -> AsyncIterator[CanonicalIssue]:
"""List issues with optional filters."""
pass
@abstractmethod
async def create_issue(self, issue: CanonicalIssue) -> CanonicalIssue:
"""Create a new issue."""
pass
@abstractmethod
async def update_issue(self, issue_number: int, issue: CanonicalIssue) -> CanonicalIssue:
"""Update an existing issue."""
pass
@abstractmethod
async def add_comment(self, issue_number: int, body: str) -> dict:
"""Add a comment to an issue."""
pass
@abstractmethod
async def setup_webhook(self, callback_url: str, secret: str) -> str:
"""Configure webhook for issue events. Returns webhook ID."""
pass
@abstractmethod
async def verify_webhook_signature(self, payload: bytes, headers: dict) -> bool:
"""Verify webhook signature."""
pass
Gitea Provider Implementation
# app/services/sync/providers/gitea.py
import httpx
from datetime import datetime
from typing import AsyncIterator
from app.core.security import decrypt  # assumed location of the credential decryption helper
from app.models.issue_sync import ExternalConnection
from app.schemas.sync.canonical import CanonicalIssue
from app.services.sync.providers.base import IssueProvider
from app.services.sync.adapters.gitea import GiteaAdapter
from app.services.sync.webhook_validators import GiteaWebhookValidator
class GiteaProvider(IssueProvider):
"""Gitea issue tracker provider."""
def __init__(self, connection: ExternalConnection):
super().__init__(connection)
self.adapter = GiteaAdapter()
self._client = None
@property
def api_url(self) -> str:
return f"{self.base_url}/api/v1"
async def _get_client(self) -> httpx.AsyncClient:
if self._client is None:
token = decrypt(self.connection.credentials_encrypted)
self._client = httpx.AsyncClient(
base_url=self.api_url,
headers={
"Authorization": f"token {token}",
"Accept": "application/json",
},
timeout=30.0
)
return self._client
async def get_issue(self, issue_number: int) -> CanonicalIssue:
client = await self._get_client()
response = await client.get(
f"/repos/{self.repo_owner}/{self.repo_name}/issues/{issue_number}"
)
response.raise_for_status()
return self.adapter.to_canonical(response.json(), self.base_url)
async def list_issues(
self,
state: str = "all",
since: datetime = None,
labels: list[str] = None
) -> AsyncIterator[CanonicalIssue]:
client = await self._get_client()
page = 1
while True:
params = {
"state": state,
"page": page,
"limit": 50, # Gitea default max
}
if since:
params["since"] = since.isoformat()
if labels:
params["labels"] = ",".join(labels)
response = await client.get(
f"/repos/{self.repo_owner}/{self.repo_name}/issues",
params=params
)
response.raise_for_status()
issues = response.json()
if not issues:
break
for issue in issues:
yield self.adapter.to_canonical(issue, self.base_url)
# Check pagination
link_header = response.headers.get("link", "")
if 'rel="next"' not in link_header:
break
page += 1
async def create_issue(self, issue: CanonicalIssue) -> CanonicalIssue:
client = await self._get_client()
payload = self.adapter.from_canonical(issue)
response = await client.post(
f"/repos/{self.repo_owner}/{self.repo_name}/issues",
json=payload
)
response.raise_for_status()
return self.adapter.to_canonical(response.json(), self.base_url)
async def update_issue(self, issue_number: int, issue: CanonicalIssue) -> CanonicalIssue:
client = await self._get_client()
payload = self.adapter.from_canonical(issue)
response = await client.patch(
f"/repos/{self.repo_owner}/{self.repo_name}/issues/{issue_number}",
json=payload
)
response.raise_for_status()
return self.adapter.to_canonical(response.json(), self.base_url)
async def add_comment(self, issue_number: int, body: str) -> dict:
client = await self._get_client()
response = await client.post(
f"/repos/{self.repo_owner}/{self.repo_name}/issues/{issue_number}/comments",
json={"body": body}
)
response.raise_for_status()
return response.json()
async def setup_webhook(self, callback_url: str, secret: str) -> str:
client = await self._get_client()
response = await client.post(
f"/repos/{self.repo_owner}/{self.repo_name}/hooks",
json={
"type": "gitea",
"active": True,
"events": ["issues", "issue_comment"],
"config": {
"url": callback_url,
"content_type": "json",
"secret": secret,
}
}
)
response.raise_for_status()
return str(response.json()["id"])
async def verify_webhook_signature(self, payload: bytes, headers: dict) -> bool:
secret = decrypt(self.connection.webhook_secret_encrypted)
return GiteaWebhookValidator.verify_signature(payload, headers, secret)
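The ProviderFactory referenced by the sync engine below can be a simple registry keyed on the connection's provider string. A minimal sketch follows; GitHubProvider and GitLabProvider are assumed to exist with the same IssueProvider interface (they are scheduled for Phase 4 of the roadmap).

```python
# app/services/sync/providers/factory.py — minimal sketch of the factory used
# by SyncEngine. GitHubProvider and GitLabProvider are assumed future
# implementations of the same IssueProvider interface.
from app.models.issue_sync import ExternalConnection, Provider
from app.services.sync.providers.base import IssueProvider
from app.services.sync.providers.gitea import GiteaProvider
from app.services.sync.providers.github import GitHubProvider
from app.services.sync.providers.gitlab import GitLabProvider

class ProviderFactory:
    _registry: dict[str, type[IssueProvider]] = {
        Provider.GITEA.value: GiteaProvider,
        Provider.GITHUB.value: GitHubProvider,
        Provider.GITLAB.value: GitLabProvider,
    }

    @classmethod
    def get_provider(cls, connection: ExternalConnection) -> IssueProvider:
        try:
            provider_cls = cls._registry[connection.provider]
        except KeyError:
            raise ValueError(f"Unsupported provider: {connection.provider}") from None
        return provider_cls(connection)
```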
Sync Engine
# app/services/sync/sync_engine.py
from datetime import datetime, timedelta
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.issue import Issue  # local issue model (path assumed)
from app.models.issue_sync import ExternalConnection, IssueSyncLog, SyncOutbox
from app.schemas.sync.canonical import CanonicalIssue
from app.services.sync.providers.factory import ProviderFactory
from app.services.sync.conflict_resolver import ConflictResolver
class SyncEngine:
"""Orchestrates synchronization between Syndarix and external trackers."""
def __init__(self, db: AsyncSession):
self.db = db
self.conflict_resolver = ConflictResolver()
async def sync_inbound(
self,
connection: ExternalConnection,
external_issue: CanonicalIssue
) -> Issue:
"""Sync an issue from external tracker to Syndarix."""
# Find existing local issue
local_issue = await self._find_by_external_id(
connection.project_id,
external_issue.provider,
external_issue.external_id
)
if local_issue is None:
# Create new local issue
local_issue = await self._create_local_issue(
connection.project_id,
external_issue
)
await self._log_sync(
local_issue,
direction="inbound",
event_type="created",
status="success"
)
else:
# Check for conflicts
conflict_result = self.conflict_resolver.check(
local_issue.version_vector,
external_issue.raw_data.get("_version_vector", {}),
local_issue.updated_at,
external_issue.updated_at
)
if conflict_result.has_conflict:
# Apply resolution strategy
resolved = self.conflict_resolver.resolve(
local_issue,
external_issue,
conflict_result
)
await self._update_local_issue(local_issue, resolved)
await self._log_sync(
local_issue,
direction="inbound",
event_type="conflict_resolved",
status="success",
conflict_details=conflict_result.to_dict()
)
else:
# Normal update
await self._update_local_issue(local_issue, external_issue)
await self._log_sync(
local_issue,
direction="inbound",
event_type="updated",
status="success"
)
return local_issue
async def sync_outbound(
self,
connection: ExternalConnection,
local_issue: Issue
):
"""Queue a local issue for sync to external tracker."""
provider = ProviderFactory.get_provider(connection)
canonical = self._to_canonical(local_issue)
outbox_entry = SyncOutbox(
issue_id=local_issue.id,
project_id=connection.project_id,
provider=connection.provider,
operation="update" if local_issue.external_id else "create",
payload=canonical.dict(),
idempotency_key=f"{local_issue.id}:{local_issue.updated_at.isoformat()}"
)
self.db.add(outbox_entry)
await self.db.commit()
async def initial_import(
self,
connection: ExternalConnection,
since: datetime = None,
labels: list[str] = None
) -> int:
"""Import all issues from external tracker."""
provider = ProviderFactory.get_provider(connection)
imported = 0
async for external_issue in provider.list_issues(
state="all",
since=since,
labels=labels
):
await self.sync_inbound(connection, external_issue)
imported += 1
connection.last_sync_at = datetime.utcnow()
await self.db.commit()
return imported
async def reconcile(self, connection: ExternalConnection):
"""
Periodic reconciliation to catch missed webhooks.
Runs via Celery Beat every 15-30 minutes.
"""
since = connection.last_sync_at or (datetime.utcnow() - timedelta(hours=24))
provider = ProviderFactory.get_provider(connection)
async for external_issue in provider.list_issues(
state="all",
since=since
):
local_issue = await self._find_by_external_id(
connection.project_id,
external_issue.provider,
external_issue.external_id
)
if local_issue:
# Check if external is newer
if external_issue.updated_at > local_issue.external_updated_at:
await self.sync_inbound(connection, external_issue)
else:
await self.sync_inbound(connection, external_issue)
connection.last_sync_at = datetime.utcnow()
await self.db.commit()
Error Handling & Recovery
Error Categories and Handling
| Error Type | Handling Strategy | Retry | Alert |
|---|---|---|---|
| Network timeout | Exponential backoff | Yes (3x) | After 3 failures |
| Rate limit (429) | Wait for reset | Yes | No |
| Auth error (401/403) | Mark connection as error | No | Yes |
| Not found (404) | Mark issue as deleted | No | No |
| Conflict (409) | Apply resolution strategy | No | If unresolved |
| Server error (5xx) | Exponential backoff | Yes (5x) | After 5 failures |
| Validation error | Log and skip | No | Yes |
Retry Strategy
# app/services/sync/retry.py
import asyncio
import httpx
from functools import wraps
def with_retry(
max_attempts: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
retryable_exceptions: tuple = (httpx.TimeoutException, httpx.NetworkError)
):
"""Decorator for retry with exponential backoff."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except retryable_exceptions as e:
last_exception = e
if attempt < max_attempts - 1:
delay = min(
base_delay * (exponential_base ** attempt),
max_delay
)
await asyncio.sleep(delay)
raise last_exception
return wrapper
return decorator
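Usage might look like this; fetch_issue_with_retry is a hypothetical caller shown only to illustrate the decorator.

```python
# Hypothetical caller illustrating the with_retry decorator above.
@with_retry(max_attempts=5, base_delay=2.0)
async def fetch_issue_with_retry(client: httpx.AsyncClient, url: str) -> dict:
    response = await client.get(url)
    response.raise_for_status()
    return response.json()
```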
Dead Letter Queue Handling
# app/workers/dead_letter_worker.py
from datetime import datetime, timedelta

@celery_app.task
def process_dead_letter_queue():
"""Process failed sync items for manual review or retry."""
dead_items = db.query(SyncOutbox).filter(
SyncOutbox.status == "dead_letter",
SyncOutbox.created_at > datetime.utcnow() - timedelta(days=7)
).all()
for item in dead_items:
# Create notification for admin review
notify_admin(
f"Sync failed for issue {item.issue_id}",
details={
"provider": item.provider,
"operation": item.operation,
"attempts": item.attempts,
"last_error": item.last_error,
}
)
# Optionally attempt one more retry with manual intervention
if should_retry(item):
item.status = "pending"
item.attempts = 0
item.scheduled_at = datetime.utcnow()
Health Monitoring
# app/services/sync/health.py
from datetime import datetime, timedelta

async def check_sync_health(project_id: str) -> dict:
"""Check sync health for a project."""
connections = await get_connections(project_id)
health = {
"status": "healthy",
"connections": [],
"pending_syncs": 0,
"failed_syncs_24h": 0,
"conflicts_24h": 0,
}
for conn in connections:
conn_health = {
"provider": conn.provider,
"status": conn.status,
"last_sync": conn.last_sync_at,
"webhook_active": conn.webhook_active,
}
# Check if sync is stale
if conn.last_sync_at:
stale_threshold = datetime.utcnow() - timedelta(hours=1)
if conn.last_sync_at < stale_threshold:
conn_health["status"] = "stale"
health["status"] = "degraded"
health["connections"].append(conn_health)
# Count pending and failed
health["pending_syncs"] = await count_pending_syncs(project_id)
health["failed_syncs_24h"] = await count_failed_syncs(project_id, hours=24)
health["conflicts_24h"] = await count_conflicts(project_id, hours=24)
if health["failed_syncs_24h"] > 10:
health["status"] = "unhealthy"
return health
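This health check could be exposed through a small API route for dashboards and alerting; the path and router wiring below are assumptions for illustration.

```python
# Hypothetical route exposing check_sync_health() above; the path and router
# wiring are assumptions, not part of the spike's API design.
from fastapi import APIRouter

router = APIRouter()

@router.get("/projects/{project_id}/sync/health")
async def sync_health(project_id: str) -> dict:
    return await check_sync_health(project_id)
```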
Implementation Roadmap
Phase 1: Foundation (Week 1-2)
- Database schema and migrations
- Core models (ExternalConnection, IssueSyncLog, SyncOutbox)
- Provider interface and Gitea implementation
- Canonical issue model and field mapping
Phase 2: Inbound Sync (Week 2-3)
- Webhook endpoint and signature validation
- Redis queue for webhook events
- Celery worker for event processing
- Initial import functionality
- Basic conflict detection
Phase 3: Outbound Sync (Week 3-4)
- Outbox pattern implementation
- Outbound sync worker
- Retry and dead letter queue
- Bidirectional sync testing
Phase 4: GitHub & GitLab (Week 4-5)
- GitHub provider implementation
- GitLab provider implementation
- Provider factory and dynamic selection
- Cross-provider field mapping
Phase 5: Conflict Resolution (Week 5-6)
- Version vector implementation
- Conflict resolution strategies
- Merge algorithms for complex fields
- Conflict notification UI
Phase 6: Production Readiness (Week 6-7)
- Health monitoring and alerting
- Admin UI for connection management
- Comprehensive test coverage
- Performance optimization
- Documentation
References
Research Sources
- Two-Way Sync Tools 2025: Best Platforms for Real-Time Data Integration - StackSync
- The Architect's Guide to Data Integration Patterns - Medium
- System Design Pattern: Conflict Resolution in Distributed Systems - Medium
- Eventual Consistency in Distributed Systems - GeeksforGeeks
- Bidirectional Synchronization: What It Is and Examples - Workato
- Data Integration Patterns: Bi-Directional Sync - MuleSoft
API Documentation
Related Syndarix Spikes
- SPIKE-001: MCP Integration Pattern - MCP architecture and FastMCP usage
- SPIKE-003: Real-time Updates - SSE for event streaming
- SPIKE-004: Celery Redis Integration - Background job infrastructure
Decision
Adopt a webhook-first, polling-fallback synchronization architecture with:
- Last-Writer-Wins (LWW) conflict resolution using version vectors
- External tracker as source of truth with local mirrors
- Unified provider interface abstracting Gitea, GitHub, GitLab differences
- Outbox pattern for reliable outbound sync
- Redis Streams for webhook event queuing and deduplication
- Celery Beat for periodic reconciliation
This approach balances real-time responsiveness with eventual consistency, providing a robust foundation for bidirectional issue synchronization.
Spike completed. Findings will inform ADR-009: Issue Synchronization Architecture.