# SPIKE-009: Issue Synchronization with External Trackers
**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #9
---
## Executive Summary
This spike researches bi-directional issue synchronization between Syndarix and external issue trackers (Gitea, GitHub, GitLab). After analyzing sync patterns, conflict resolution strategies, and API capabilities of each platform, we recommend:
**Primary Recommendation:** Implement a **webhook-first, polling-fallback** architecture with **Last-Writer-Wins (LWW)** conflict resolution using vector clocks for causality tracking. External trackers serve as the **source of truth** with Syndarix maintaining local mirrors for unified agent access.
**Key Decisions:**
1. Use webhooks for real-time sync; polling for reconciliation and initial import
2. Implement version vectors for conflict detection with LWW resolution
3. Store sync metadata in dedicated `issue_sync_log` table for audit and recovery
4. Abstract provider differences behind a unified `IssueProvider` interface
5. Use Redis for webhook event queuing and deduplication
---
## Table of Contents
1. [Research Questions & Answers](#research-questions--answers)
2. [Sync Architecture](#sync-architecture)
3. [Conflict Resolution Strategy](#conflict-resolution-strategy)
4. [Webhook Handling Design](#webhook-handling-design)
5. [Provider API Comparison](#provider-api-comparison)
6. [Database Schema](#database-schema)
7. [Field Mapping Specification](#field-mapping-specification)
8. [Code Examples](#code-examples)
9. [Error Handling & Recovery](#error-handling--recovery)
10. [Implementation Roadmap](#implementation-roadmap)
11. [References](#references)
---
## Research Questions & Answers
### 1. Best patterns for bi-directional data sync?
**Answer:** The **Hub-and-Spoke** pattern with Syndarix as the hub is recommended. This pattern:
- Designates external trackers as authoritative sources
- Maintains local mirrors for unified access
- Synchronizes changes bidirectionally through a central sync engine
- Uses explicit ownership rules per field to prevent conflicts
Modern implementations leverage **Conflict-Free Replicated Data Types (CRDTs)** for automatic conflict resolution, but for issue tracking, where human review may be desired, **version vectors with LWW** provide better control.
### 2. Handling sync conflicts (edited in both places)?
**Answer:** Implement a **tiered conflict resolution strategy**:
| Scenario | Resolution |
|----------|------------|
| Same field, different times | Last-Writer-Wins (LWW) |
| Same field, concurrent edits | Mark as conflict, notify user |
| Different fields | Merge both changes |
| Delete vs Update | Delete wins (configurable) |
Use **version vectors** to detect concurrent modifications. Each system maintains a version counter, and conflicts are identified when neither version dominates the other.
### 3. Webhook vs polling strategies?
**Answer:** **Hybrid approach** - webhooks primary, polling secondary.
| Strategy | Use Case | Latency | Resource Cost |
|----------|----------|---------|---------------|
| Webhooks | Real-time sync | <1s | Low |
| Polling | Initial import, reconciliation, fallback | Minutes | Medium |
| On-demand | User-triggered refresh | Immediate | Minimal |
**Rationale:** Webhooks provide real-time updates but may miss events during outages. Periodic polling (every 15-30 minutes) ensures eventual consistency.
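As a sketch of how the polling fallback could be scheduled via Celery Beat (the `reconcile_all_connections` task name is an assumption, not an existing module member):
```python
# Hypothetical Celery Beat schedule for the reconciliation sweep
from app.workers.sync_worker import celery_app

celery_app.conf.beat_schedule = {
    "issue-sync-reconciliation": {
        # Re-lists recently updated issues for every active connection (task name is assumed)
        "task": "app.workers.sync_worker.reconcile_all_connections",
        "schedule": 15 * 60,  # every 15 minutes, in seconds
    },
}
```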
### 4. Rate limiting and API quota management?
**Answer:** Implement a **token bucket with adaptive throttling**:
| Provider | Auth Rate Limit | Unauthenticated |
|----------|-----------------|-----------------|
| GitHub | 5,000/hour | 60/hour |
| GitLab | 600/minute | 10/minute |
| Gitea | Configurable (no fixed default limit) | N/A |
**Strategies** (a request-level sketch follows this list):
- Use conditional requests (`If-None-Match`, `If-Modified-Since`) to avoid counting unchanged responses
- Implement exponential backoff on 429/403 responses
- Cache responses with ETags
- Batch operations where possible
- Monitor `X-RateLimit-Remaining` headers
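A minimal sketch of the conditional-request and header-monitoring ideas above, assuming an `httpx` client and an in-memory ETag cache (the helper and cache names are illustrative, not existing project code):
```python
# Sketch: conditional GET with ETag caching and rate-limit header checks
import asyncio
import httpx

_etag_cache: dict[str, tuple[str, dict]] = {}  # url -> (etag, cached json)


async def conditional_get(client: httpx.AsyncClient, url: str) -> dict:
    headers = {}
    cached = _etag_cache.get(url)
    if cached:
        headers["If-None-Match"] = cached[0]

    response = await client.get(url, headers=headers)

    # Back off when the provider signals exhaustion
    if response.status_code in (403, 429):
        if response.headers.get("X-RateLimit-Remaining") == "0":
            await asyncio.sleep(60)  # simplistic wait; real code would read the reset header
        return cached[1] if cached else {}

    # A 304 served from the ETag cache does not count against GitHub's rate limit
    if response.status_code == 304 and cached:
        return cached[1]

    response.raise_for_status()
    data = response.json()
    etag = response.headers.get("ETag")
    if etag:
        _etag_cache[url] = (etag, data)
    return data
```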
### 5. Eventual consistency vs strong consistency tradeoffs?
**Answer:** **Eventual consistency** is acceptable and recommended for issue sync.
| Consistency | Pros | Cons |
|-------------|------|------|
| **Strong** | Always accurate | Higher latency, complex implementation |
| **Eventual** | Better performance, simpler | Temporary inconsistency |
**Rationale:** Issue tracking tolerates brief inconsistency windows (seconds to minutes). Users can manually refresh if needed. The simplicity and performance gains outweigh the drawbacks.
### 6. How to map different field schemas?
**Answer:** Use a **canonical field model** with provider-specific adapters.
```
External Field → Provider Adapter → Canonical Model → Local Storage
Local Field → Canonical Model → Provider Adapter → External Field
```
See [Field Mapping Specification](#field-mapping-specification) for detailed mappings.
### 7. Handling offline/disconnected scenarios?
**Answer:** Implement an **outbox pattern** with retry queue:
1. Queue all outgoing changes in local `sync_outbox` table
2. Background worker processes queue with exponential backoff
3. Mark items as `pending`, `in_progress`, `completed`, or `failed`
4. Dead letter queue for items exceeding max retries
5. Manual reconciliation UI for failed items
---
## Sync Architecture
### High-Level Architecture Diagram
```
              External Issue Trackers
┌─────────────┐   ┌─────────────┐   ┌─────────────┐
│    Gitea    │   │   GitHub    │   │   GitLab    │
│  (Primary)  │   │             │   │             │
└──────┬──────┘   └──────┬──────┘   └──────┬──────┘
       │                 │                 │
       └─────────────────┴────────────┬────┘
                                      │  Webhooks
┌─────────────────────────────────────┴────────────────────────────────────┐
│                             Syndarix Backend                             │
│ ┌────────────────────┐   ┌──────────┴───────────┐   ┌──────────────────┐ │
│ │  Webhook Handler   │◀──│     Redis Queue      │──▶│  Polling Worker  │ │
│ │  (FastAPI Route)   │   │ (Event Dedup/Buffer) │   │  (Celery Beat)   │ │
│ └─────────┬──────────┘   └──────────────────────┘   └────────┬─────────┘ │
│           │                                                  │           │
│           └──────────────────────────┬───────────────────────┘           │
│                                      │                                   │
│                        ┌─────────────┴────────────┐                      │
│                        │       Sync Engine        │                      │
│                        │   Provider Factory       │                      │
│                        │    · GiteaProvider       │                      │
│                        │    · GitHubProvider      │                      │
│                        │    · GitLabProvider      │                      │
│                        │   Conflict Resolver      │                      │
│                        │   Field Mapper           │                      │
│                        └─────────────┬────────────┘                      │
│                                      │                                   │
│                        ┌─────────────┴────────────┐                      │
│                        │       PostgreSQL         │                      │
│                        │   issues                 │                      │
│                        │   issue_sync_log         │                      │
│                        │   sync_outbox            │                      │
│                        │   external_links         │                      │
│                        └──────────────────────────┘                      │
└───────────────────────────────────────────────────────────────────────────┘
```
### Component Responsibilities
| Component | Responsibility |
|-----------|----------------|
| **Webhook Handler** | Receive, validate, and queue incoming webhooks |
| **Redis Queue** | Buffer events, deduplicate, handle backpressure |
| **Polling Worker** | Periodic reconciliation, initial import, fallback |
| **Sync Engine** | Orchestrate sync operations, apply business logic |
| **Provider Factory** | Create provider-specific clients |
| **Conflict Resolver** | Detect and resolve sync conflicts |
| **Field Mapper** | Transform between canonical and provider schemas |
### Data Flow
**Inbound (External to Syndarix):**
```
1. Webhook received → Validate signature → Queue in Redis
2. Worker dequeues → Parse payload → Transform to canonical model
3. Check for conflicts → Apply resolution strategy
4. Update local database → Log sync operation
5. Notify connected clients via SSE (per SPIKE-003)
```
**Outbound (Syndarix to External):**
```
1. Local change detected → Transform to provider format
2. Queue in sync_outbox → Worker picks up
3. Call external API → Handle response
4. Update sync metadata → Mark as synced
5. Handle failures → Retry with backoff
```
---
## Conflict Resolution Strategy
### Version Vector Implementation
Each issue maintains a version vector tracking modifications across systems:
```python
# Version vector structure
{
    "syndarix": 5,   # Local modification count
    "gitea": 3,      # Gitea modification count
    "github": 0,     # Not synced with GitHub
    "gitlab": 2,     # GitLab modification count
}
```
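Two small helpers illustrate how these counters would be maintained; the function names are illustrative rather than existing project code:
```python
def bump_version(vector: dict, system: str) -> dict:
    """Increment a system's counter after it modifies the issue."""
    updated = dict(vector)
    updated[system] = updated.get(system, 0) + 1
    return updated


def merge_vectors(a: dict, b: dict) -> dict:
    """Element-wise maximum; applied after a successful sync so both sides converge."""
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in set(a) | set(b)}
```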
### Conflict Detection Algorithm
```python
def detect_conflict(local_version: dict, remote_version: dict) -> str:
    """
    Compare version vectors to detect conflicts.

    Returns:
        - "local_wins": Local version dominates
        - "remote_wins": Remote version dominates
        - "conflict": Concurrent modification (neither dominates)
        - "equal": No changes
    """
    local_dominates = all(
        local_version.get(k, 0) >= v
        for k, v in remote_version.items()
    )
    remote_dominates = all(
        remote_version.get(k, 0) >= v
        for k, v in local_version.items()
    )

    if local_version == remote_version:
        return "equal"
    elif local_dominates and not remote_dominates:
        return "local_wins"
    elif remote_dominates and not local_dominates:
        return "remote_wins"
    else:
        return "conflict"
```
### Resolution Strategies
```python
from enum import Enum


class ConflictStrategy(str, Enum):
    REMOTE_WINS = "remote_wins"      # External tracker is source of truth
    LOCAL_WINS = "local_wins"        # Syndarix changes take precedence
    LAST_WRITE_WINS = "lww"          # Most recent timestamp wins
    MANUAL = "manual"                # Flag for human review
    MERGE = "merge"                  # Attempt field-level merge


# Default strategy per field
FIELD_STRATEGIES = {
    "title": ConflictStrategy.LAST_WRITE_WINS,
    "description": ConflictStrategy.MERGE,
    "status": ConflictStrategy.REMOTE_WINS,
    "assignees": ConflictStrategy.MERGE,
    "labels": ConflictStrategy.MERGE,
    "comments": ConflictStrategy.MERGE,  # Append both
    "priority": ConflictStrategy.REMOTE_WINS,
}
```
### Merge Algorithm for Complex Fields
```python
def merge_labels(local: list[str], remote: list[str], base: list[str]) -> list[str]:
    """Three-way merge for labels."""
    local_added = set(local) - set(base)
    local_removed = set(base) - set(local)
    remote_added = set(remote) - set(base)
    remote_removed = set(base) - set(remote)

    result = set(base)
    result |= local_added | remote_added
    result -= local_removed | remote_removed
    return sorted(result)
```
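The `ConflictResolver` used by the Sync Engine later in this document is not specified here. A rough sketch of its shape, built on `detect_conflict`, `FIELD_STRATEGIES`, and `merge_labels` above, might look like the following; the `ConflictResult` interface and the field-level diffing are assumptions, not settled design:
```python
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class ConflictResult:
    outcome: str                      # "equal", "local_wins", "remote_wins", "conflict"
    conflicting_fields: list[str] = field(default_factory=list)

    @property
    def has_conflict(self) -> bool:
        return self.outcome == "conflict"

    def to_dict(self) -> dict:
        return {"outcome": self.outcome, "fields": self.conflicting_fields}


class ConflictResolver:
    def check(
        self,
        local_vector: dict,
        remote_vector: dict,
        local_updated_at: datetime,
        remote_updated_at: datetime,
    ) -> ConflictResult:
        # Timestamps would drive LWW tie-breaking and field-level diffing (both elided here)
        return ConflictResult(outcome=detect_conflict(local_vector, remote_vector))

    def resolve(self, local_issue, remote_issue, result: ConflictResult) -> dict:
        """Pick the winning value per field according to FIELD_STRATEGIES (only a few cases shown)."""
        winners: dict = {}
        for field_name in result.conflicting_fields:
            strategy = FIELD_STRATEGIES.get(field_name, ConflictStrategy.REMOTE_WINS)
            if strategy == ConflictStrategy.LOCAL_WINS:
                winners[field_name] = getattr(local_issue, field_name)
            elif strategy == ConflictStrategy.MERGE and field_name == "labels":
                winners[field_name] = merge_labels(local_issue.labels, remote_issue.labels, base=[])
            else:  # REMOTE_WINS / LAST_WRITE_WINS default to the external tracker's value
                winners[field_name] = getattr(remote_issue, field_name)
        return winners
```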
---
## Webhook Handling Design
### Webhook Endpoint Architecture
```python
# app/api/v1/webhooks/issues.py
import hashlib
import hmac
from datetime import datetime

from fastapi import APIRouter, Request, HTTPException, BackgroundTasks

from app.services.sync.webhook_handler import WebhookHandler
from app.core.redis import redis_client

router = APIRouter()


@router.post("/webhooks/{provider}/{project_id}")
async def receive_webhook(
    provider: str,
    project_id: str,
    request: Request,
    background_tasks: BackgroundTasks
):
    """
    Unified webhook endpoint for all providers.

    Path: /api/v1/webhooks/{provider}/{project_id}
    Providers: gitea, github, gitlab
    """
    body = await request.body()
    headers = dict(request.headers)

    # Validate webhook signature
    handler = WebhookHandler.get_handler(provider)
    if not handler.verify_signature(body, headers):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # Parse event type
    event_type = handler.get_event_type(headers)
    if event_type not in handler.supported_events:
        return {"status": "ignored", "reason": "unsupported_event"}

    # Deduplicate using event ID
    event_id = handler.get_event_id(headers, body)
    if await is_duplicate(event_id):
        return {"status": "duplicate"}

    # Queue for processing
    await redis_client.xadd(
        f"webhooks:{project_id}",
        {
            "provider": provider,
            "event_type": event_type,
            "payload": body,
            "received_at": datetime.utcnow().isoformat()
        }
    )
    return {"status": "queued", "event_id": event_id}


async def is_duplicate(event_id: str, ttl: int = 3600) -> bool:
    """Check if event was already processed (Redis-based dedup)."""
    key = f"webhook:processed:{event_id}"
    result = await redis_client.set(key, "1", ex=ttl, nx=True)
    return result is None  # None means the key already existed (SET NX failed)
```
### Provider-Specific Signature Validation
```python
# app/services/sync/webhook_validators.py
import hashlib
import hmac


class GiteaWebhookValidator:
    @staticmethod
    def verify_signature(body: bytes, headers: dict, secret: str) -> bool:
        """Gitea uses X-Gitea-Signature with HMAC-SHA256."""
        signature = headers.get("x-gitea-signature", "")
        expected = hmac.new(
            secret.encode(),
            body,
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(signature, expected)

    @staticmethod
    def get_event_type(headers: dict) -> str:
        return headers.get("x-gitea-event", "")


class GitHubWebhookValidator:
    @staticmethod
    def verify_signature(body: bytes, headers: dict, secret: str) -> bool:
        """GitHub uses X-Hub-Signature-256 with sha256=HMAC."""
        signature = headers.get("x-hub-signature-256", "")
        if not signature.startswith("sha256="):
            return False
        expected = "sha256=" + hmac.new(
            secret.encode(),
            body,
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(signature, expected)

    @staticmethod
    def get_event_type(headers: dict) -> str:
        return headers.get("x-github-event", "")


class GitLabWebhookValidator:
    @staticmethod
    def verify_signature(body: bytes, headers: dict, secret: str) -> bool:
        """GitLab uses X-Gitlab-Token for simple token matching."""
        token = headers.get("x-gitlab-token", "")
        return hmac.compare_digest(token, secret)

    @staticmethod
    def get_event_type(headers: dict) -> str:
        return headers.get("x-gitlab-event", "")
```
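The `WebhookHandler.get_handler()` call used by the endpoint above is not defined in this spike. A registry-style sketch is shown below; `load_webhook_secret()` is a hypothetical lookup of the decrypted per-connection secret, and the delivery-ID headers are only used where the provider sends them:
```python
import hashlib


class WebhookHandler:
    """Thin wrapper binding a validator to its provider-specific event metadata (sketch)."""

    _registry = {
        "gitea": (GiteaWebhookValidator, {"issues", "issue_comment"}),
        "github": (GitHubWebhookValidator, {"issues", "issue_comment"}),
        "gitlab": (GitLabWebhookValidator, {"Issue Hook", "Note Hook"}),
    }

    def __init__(self, validator, supported_events: set[str], secret: str):
        self._validator = validator
        self.supported_events = supported_events
        self._secret = secret

    @classmethod
    def get_handler(cls, provider: str) -> "WebhookHandler":
        validator, events = cls._registry[provider]
        secret = load_webhook_secret(provider)  # hypothetical helper, resolved per connection
        return cls(validator, events, secret)

    def verify_signature(self, body: bytes, headers: dict) -> bool:
        return self._validator.verify_signature(body, headers, self._secret)

    def get_event_type(self, headers: dict) -> str:
        return self._validator.get_event_type(headers)

    def get_event_id(self, headers: dict, body: bytes) -> str:
        # Prefer the provider's delivery ID where available; fall back to a payload hash
        delivery = headers.get("x-github-delivery") or headers.get("x-gitea-delivery")
        return delivery or hashlib.sha256(body).hexdigest()
```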
### Webhook Event Processing (Celery Worker)
```python
# app/workers/sync_worker.py
import logging

from celery import Celery

from app.services.sync.sync_engine import SyncEngine
from app.core.redis import redis_client

log = logging.getLogger(__name__)
celery_app = Celery("syndarix")


@celery_app.task(bind=True, max_retries=3)
def process_webhook_event(self, project_id: str):
    """Process queued webhook events for a project."""
    try:
        # Read from Redis stream
        events = redis_client.xread(
            {f"webhooks:{project_id}": "0"},
            count=10,
            block=5000
        )
        sync_engine = SyncEngine()
        for stream_name, messages in events:
            for message_id, data in messages:
                try:
                    sync_engine.process_inbound_event(
                        provider=data["provider"],
                        event_type=data["event_type"],
                        payload=data["payload"]
                    )
                    # Acknowledge processed
                    redis_client.xdel(stream_name, message_id)
                except Exception as e:
                    log.error(f"Failed to process {message_id}: {e}")
                    # Will retry on next run
    except Exception as exc:
        self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
```
---
## Provider API Comparison
### Issue Field Support Matrix
| Field | Gitea | GitHub | GitLab | Syndarix Canonical |
|-------|-------|--------|--------|-------------------|
| ID | `id` (int) | `id` (int) | `id` (int) | `external_id` (str) |
| Number | `number` | `number` | `iid` | `external_number` |
| Title | `title` | `title` | `title` | `title` |
| Body | `body` | `body` | `description` | `description` |
| State | `state` (open/closed) | `state` (open/closed) | `state` (opened/closed) | `status` (enum) |
| Assignees | `assignees[]` | `assignees[]` | `assignees[]` | `assignee_ids[]` |
| Labels | `labels[].name` | `labels[].name` | `labels[]` (strings) | `labels[]` |
| Milestone | `milestone.title` | `milestone.title` | `milestone.title` | `milestone` |
| Created | `created_at` | `created_at` | `created_at` | `created_at` |
| Updated | `updated_at` | `updated_at` | `updated_at` | `updated_at` |
| Due Date | `due_date` | N/A | `due_date` | `due_date` |
| Priority | N/A (via labels) | N/A (via labels) | N/A (via labels) | `priority` (see label mapping sketch below) |
| URL | `html_url` | `html_url` | `web_url` | `remote_url` |
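Since none of the providers expose a native priority field, priority is carried via labels. A hedged sketch of that mapping, assuming a `priority/<level>` label convention (the convention itself is an assumption, not a provider feature):
```python
# Deriving canonical priority from labels (the "priority/<level>" convention is assumed)
PRIORITY_LABEL_PREFIX = "priority/"
PRIORITY_LEVELS = {"critical", "high", "medium", "low"}


def priority_from_labels(labels: list[str]) -> str | None:
    """Return the first recognized priority label, e.g. 'priority/high' -> 'high'."""
    for label in labels:
        if label.startswith(PRIORITY_LABEL_PREFIX):
            level = label[len(PRIORITY_LABEL_PREFIX):].lower()
            if level in PRIORITY_LEVELS:
                return level
    return None


def priority_to_label(priority: str) -> str:
    """Inverse mapping used when pushing a Syndarix priority back out as a label."""
    return f"{PRIORITY_LABEL_PREFIX}{priority}"
```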
### Webhook Event Mapping
| Action | Gitea Event | GitHub Event | GitLab Event |
|--------|-------------|--------------|--------------|
| Create | `issues:opened` | `issues:opened` | `Issue Hook:open` |
| Update | `issues:edited` | `issues:edited` | `Issue Hook:update` |
| Close | `issues:closed` | `issues:closed` | `Issue Hook:close` |
| Reopen | `issues:reopened` | `issues:reopened` | `Issue Hook:reopen` |
| Assign | `issues:assigned` | `issues:assigned` | `Issue Hook:update` |
| Label | `issues:label_updated` | `issues:labeled` | `Issue Hook:update` |
| Comment | `issue_comment:created` | `issue_comment:created` | `Note Hook` |
### Rate Limits Comparison
| Provider | Authenticated | Pagination | Conditional Requests |
|----------|---------------|------------|---------------------|
| GitHub | 5,000/hour | 100/page max | ETag, If-Modified-Since |
| GitLab | 600/minute | 100/page max | ETag |
| Gitea | Configurable | 50/page default | Link header |
---
## Database Schema
### Core Tables
```sql
-- Extended issues table with sync metadata
ALTER TABLE issues ADD COLUMN IF NOT EXISTS external_id VARCHAR(255);
ALTER TABLE issues ADD COLUMN IF NOT EXISTS external_number INTEGER;
ALTER TABLE issues ADD COLUMN IF NOT EXISTS remote_url TEXT;
ALTER TABLE issues ADD COLUMN IF NOT EXISTS provider VARCHAR(50);
ALTER TABLE issues ADD COLUMN IF NOT EXISTS provider_repo_id VARCHAR(255);
ALTER TABLE issues ADD COLUMN IF NOT EXISTS sync_status VARCHAR(20) DEFAULT 'pending';
ALTER TABLE issues ADD COLUMN IF NOT EXISTS version_vector JSONB DEFAULT '{}';
ALTER TABLE issues ADD COLUMN IF NOT EXISTS last_synced_at TIMESTAMP WITH TIME ZONE;
ALTER TABLE issues ADD COLUMN IF NOT EXISTS external_updated_at TIMESTAMP WITH TIME ZONE;
-- Sync status enum values: synced, pending, conflict, error
CREATE INDEX idx_issues_external_id ON issues(provider, external_id);
CREATE INDEX idx_issues_sync_status ON issues(sync_status);
```
```sql
-- Issue sync log for audit trail
CREATE TABLE issue_sync_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    issue_id UUID REFERENCES issues(id) ON DELETE CASCADE,
    project_id UUID REFERENCES projects(id) ON DELETE CASCADE,

    -- Sync details
    direction VARCHAR(10) NOT NULL, -- 'inbound' or 'outbound'
    provider VARCHAR(50) NOT NULL,
    event_type VARCHAR(100) NOT NULL,

    -- Change tracking
    previous_state JSONB,
    new_state JSONB,
    diff JSONB,

    -- Conflict info
    had_conflict BOOLEAN DEFAULT FALSE,
    conflict_resolution VARCHAR(50),
    conflict_details JSONB,

    -- Status
    status VARCHAR(20) NOT NULL, -- success, failed, skipped
    error_message TEXT,

    -- Timestamps
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    processed_at TIMESTAMP WITH TIME ZONE,

    -- Webhook metadata
    webhook_event_id VARCHAR(255),
    webhook_delivery_id VARCHAR(255)
);
CREATE INDEX idx_sync_log_issue ON issue_sync_log(issue_id);
CREATE INDEX idx_sync_log_project ON issue_sync_log(project_id);
CREATE INDEX idx_sync_log_status ON issue_sync_log(status);
CREATE INDEX idx_sync_log_created ON issue_sync_log(created_at);
```
```sql
-- Outbox for pending outbound syncs
CREATE TABLE sync_outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    issue_id UUID REFERENCES issues(id) ON DELETE CASCADE,
    project_id UUID REFERENCES projects(id) ON DELETE CASCADE,

    -- Sync target
    provider VARCHAR(50) NOT NULL,
    operation VARCHAR(20) NOT NULL, -- create, update, delete
    payload JSONB NOT NULL,

    -- Processing status
    status VARCHAR(20) DEFAULT 'pending', -- pending, in_progress, completed, failed, dead_letter
    attempts INTEGER DEFAULT 0,
    max_attempts INTEGER DEFAULT 5,
    last_error TEXT,

    -- Scheduling
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    scheduled_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    processed_at TIMESTAMP WITH TIME ZONE,

    -- Idempotency
    idempotency_key VARCHAR(255) UNIQUE
);
CREATE INDEX idx_outbox_status ON sync_outbox(status, scheduled_at);
CREATE INDEX idx_outbox_issue ON sync_outbox(issue_id);
```
```sql
-- External provider connections
CREATE TABLE external_connections (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    project_id UUID REFERENCES projects(id) ON DELETE CASCADE,
    organization_id UUID REFERENCES organizations(id) ON DELETE CASCADE,

    -- Provider details
    provider VARCHAR(50) NOT NULL,
    provider_url TEXT NOT NULL, -- Base URL (e.g., https://gitea.example.com)
    repo_owner VARCHAR(255) NOT NULL,
    repo_name VARCHAR(255) NOT NULL,

    -- Authentication
    auth_type VARCHAR(20) NOT NULL, -- token, oauth, app
    credentials_encrypted TEXT, -- Encrypted token/credentials

    -- Webhook configuration
    webhook_secret_encrypted TEXT,
    webhook_id VARCHAR(255), -- ID from provider
    webhook_active BOOLEAN DEFAULT TRUE,

    -- Sync settings
    sync_enabled BOOLEAN DEFAULT TRUE,
    sync_direction VARCHAR(20) DEFAULT 'bidirectional', -- inbound, outbound, bidirectional
    sync_labels_filter JSONB, -- Only sync issues with these labels
    sync_milestones_filter JSONB,

    -- Status tracking
    last_sync_at TIMESTAMP WITH TIME ZONE,
    last_error TEXT,
    status VARCHAR(20) DEFAULT 'active', -- active, paused, error

    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    UNIQUE(project_id, provider, repo_owner, repo_name)
);
CREATE INDEX idx_connections_project ON external_connections(project_id);
CREATE INDEX idx_connections_status ON external_connections(status);
```
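The `credentials_encrypted` and `webhook_secret_encrypted` columns imply symmetric encryption helpers such as the `decrypt()` call used by the providers below. A minimal sketch using `cryptography.Fernet`, assuming the key comes from application settings (the module path and setting name are assumptions):
```python
# app/core/security.py (sketch); key handling is an assumption
from cryptography.fernet import Fernet

from app.core.config import settings  # assumes a SYNC_CREDENTIALS_KEY setting exists

_fernet = Fernet(settings.SYNC_CREDENTIALS_KEY)


def encrypt(plaintext: str) -> str:
    """Encrypt a provider token before persisting it in external_connections."""
    return _fernet.encrypt(plaintext.encode()).decode()


def decrypt(ciphertext: str) -> str:
    """Decrypt credentials_encrypted / webhook_secret_encrypted at use time."""
    return _fernet.decrypt(ciphertext.encode()).decode()
```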
### SQLAlchemy Models
```python
# app/models/issue_sync.py
import enum

from sqlalchemy import Column, String, Integer, Boolean, ForeignKey, DateTime, func
from sqlalchemy.dialects.postgresql import UUID, JSONB

from app.models.base import Base, TimestampMixin, UUIDMixin


class SyncStatus(str, enum.Enum):
    SYNCED = "synced"
    PENDING = "pending"
    CONFLICT = "conflict"
    ERROR = "error"


class SyncDirection(str, enum.Enum):
    INBOUND = "inbound"
    OUTBOUND = "outbound"
    BIDIRECTIONAL = "bidirectional"


class Provider(str, enum.Enum):
    GITEA = "gitea"
    GITHUB = "github"
    GITLAB = "gitlab"


class ExternalConnection(Base, UUIDMixin, TimestampMixin):
    __tablename__ = "external_connections"

    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False)
    organization_id = Column(UUID(as_uuid=True), ForeignKey("organizations.id"))
    provider = Column(String(50), nullable=False)
    provider_url = Column(String, nullable=False)
    repo_owner = Column(String(255), nullable=False)
    repo_name = Column(String(255), nullable=False)
    auth_type = Column(String(20), nullable=False)
    credentials_encrypted = Column(String)
    webhook_secret_encrypted = Column(String)
    webhook_id = Column(String(255))
    webhook_active = Column(Boolean, default=True)
    sync_enabled = Column(Boolean, default=True)
    sync_direction = Column(String(20), default="bidirectional")
    sync_labels_filter = Column(JSONB)
    sync_milestones_filter = Column(JSONB)
    last_sync_at = Column(DateTime(timezone=True))
    last_error = Column(String)
    status = Column(String(20), default="active")


class IssueSyncLog(Base, UUIDMixin):
    __tablename__ = "issue_sync_log"

    issue_id = Column(UUID(as_uuid=True), ForeignKey("issues.id", ondelete="CASCADE"))
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id", ondelete="CASCADE"))
    direction = Column(String(10), nullable=False)
    provider = Column(String(50), nullable=False)
    event_type = Column(String(100), nullable=False)
    previous_state = Column(JSONB)
    new_state = Column(JSONB)
    diff = Column(JSONB)
    had_conflict = Column(Boolean, default=False)
    conflict_resolution = Column(String(50))
    conflict_details = Column(JSONB)
    status = Column(String(20), nullable=False)
    error_message = Column(String)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    processed_at = Column(DateTime(timezone=True))
    webhook_event_id = Column(String(255))
    webhook_delivery_id = Column(String(255))


class SyncOutbox(Base, UUIDMixin):
    __tablename__ = "sync_outbox"

    issue_id = Column(UUID(as_uuid=True), ForeignKey("issues.id", ondelete="CASCADE"))
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id", ondelete="CASCADE"))
    provider = Column(String(50), nullable=False)
    operation = Column(String(20), nullable=False)
    payload = Column(JSONB, nullable=False)
    status = Column(String(20), default="pending")
    attempts = Column(Integer, default=0)
    max_attempts = Column(Integer, default=5)
    last_error = Column(String)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    scheduled_at = Column(DateTime(timezone=True), server_default=func.now())
    processed_at = Column(DateTime(timezone=True))
    idempotency_key = Column(String(255), unique=True)
```
---
## Field Mapping Specification
### Canonical Issue Model
```python
# app/schemas/sync/canonical.py
from datetime import datetime
from enum import Enum
from typing import Optional

from pydantic import BaseModel, Field


class IssueStatus(str, Enum):
    OPEN = "open"
    CLOSED = "closed"
    IN_PROGRESS = "in_progress"  # Syndarix-only


class CanonicalIssue(BaseModel):
    """Canonical issue representation for sync operations."""

    # Identity
    external_id: str
    external_number: int
    remote_url: str

    # Core fields
    title: str
    description: Optional[str] = None
    status: IssueStatus

    # Relationships
    assignee_ids: list[str] = Field(default_factory=list)
    assignee_usernames: list[str] = Field(default_factory=list)
    labels: list[str] = Field(default_factory=list)
    milestone: Optional[str] = None

    # Metadata
    created_at: datetime
    updated_at: datetime
    closed_at: Optional[datetime] = None
    due_date: Optional[datetime] = None

    # Provider info
    provider: str
    raw_data: dict = Field(default_factory=dict)  # Original payload
```
### Provider Adapters
```python
# app/services/sync/adapters/gitea.py
from app.schemas.sync.canonical import CanonicalIssue, IssueStatus


class GiteaAdapter:
    """Transform between Gitea API format and canonical model."""

    @staticmethod
    def to_canonical(gitea_issue: dict, base_url: str) -> CanonicalIssue:
        return CanonicalIssue(
            external_id=str(gitea_issue["id"]),
            external_number=gitea_issue["number"],
            remote_url=gitea_issue["html_url"],
            title=gitea_issue["title"],
            description=gitea_issue.get("body"),
            status=IssueStatus.OPEN if gitea_issue["state"] == "open" else IssueStatus.CLOSED,
            assignee_ids=[str(a["id"]) for a in gitea_issue.get("assignees") or []],
            assignee_usernames=[a["login"] for a in gitea_issue.get("assignees") or []],
            labels=[label["name"] for label in gitea_issue.get("labels") or []],
            milestone=(gitea_issue.get("milestone") or {}).get("title"),
            created_at=gitea_issue["created_at"],
            updated_at=gitea_issue["updated_at"],
            closed_at=gitea_issue.get("closed_at"),
            due_date=gitea_issue.get("due_date"),
            provider="gitea",
            raw_data=gitea_issue
        )

    @staticmethod
    def from_canonical(issue: CanonicalIssue) -> dict:
        """Convert canonical to Gitea API format for updates."""
        return {
            "title": issue.title,
            "body": issue.description,
            "state": "open" if issue.status == IssueStatus.OPEN else "closed",
            "assignees": issue.assignee_usernames,
            "labels": issue.labels,
            "milestone": issue.milestone,
            "due_date": issue.due_date.isoformat() if issue.due_date else None,
        }


# app/services/sync/adapters/github.py
class GitHubAdapter:
    """Transform between GitHub API format and canonical model."""

    @staticmethod
    def to_canonical(github_issue: dict, base_url: str) -> CanonicalIssue:
        return CanonicalIssue(
            external_id=str(github_issue["id"]),
            external_number=github_issue["number"],
            remote_url=github_issue["html_url"],
            title=github_issue["title"],
            description=github_issue.get("body"),
            status=IssueStatus.OPEN if github_issue["state"] == "open" else IssueStatus.CLOSED,
            assignee_ids=[str(a["id"]) for a in github_issue.get("assignees") or []],
            assignee_usernames=[a["login"] for a in github_issue.get("assignees") or []],
            labels=[label["name"] for label in github_issue.get("labels") or []],
            milestone=(github_issue.get("milestone") or {}).get("title"),
            created_at=github_issue["created_at"],
            updated_at=github_issue["updated_at"],
            closed_at=github_issue.get("closed_at"),
            due_date=None,  # GitHub has no native due dates on issues
            provider="github",
            raw_data=github_issue
        )

    @staticmethod
    def from_canonical(issue: CanonicalIssue) -> dict:
        return {
            "title": issue.title,
            "body": issue.description,
            "state": "open" if issue.status == IssueStatus.OPEN else "closed",
            "assignees": issue.assignee_usernames,
            "labels": issue.labels,
            "milestone": issue.milestone,
        }


# app/services/sync/adapters/gitlab.py
class GitLabAdapter:
    """Transform between GitLab API format and canonical model."""

    @staticmethod
    def to_canonical(gitlab_issue: dict, base_url: str) -> CanonicalIssue:
        # GitLab uses 'opened' instead of 'open'
        state = gitlab_issue["state"]
        status = IssueStatus.OPEN if state == "opened" else IssueStatus.CLOSED
        return CanonicalIssue(
            external_id=str(gitlab_issue["id"]),
            external_number=gitlab_issue["iid"],  # GitLab uses iid for the project-scoped number
            remote_url=gitlab_issue["web_url"],
            title=gitlab_issue["title"],
            description=gitlab_issue.get("description"),
            status=status,
            assignee_ids=[str(a["id"]) for a in gitlab_issue.get("assignees") or []],
            assignee_usernames=[a["username"] for a in gitlab_issue.get("assignees") or []],
            labels=gitlab_issue.get("labels") or [],  # GitLab returns labels as strings
            milestone=(gitlab_issue.get("milestone") or {}).get("title"),
            created_at=gitlab_issue["created_at"],
            updated_at=gitlab_issue["updated_at"],
            closed_at=gitlab_issue.get("closed_at"),
            due_date=gitlab_issue.get("due_date"),
            provider="gitlab",
            raw_data=gitlab_issue
        )

    @staticmethod
    def from_canonical(issue: CanonicalIssue) -> dict:
        return {
            "title": issue.title,
            "description": issue.description,
            "state_event": "reopen" if issue.status == IssueStatus.OPEN else "close",
            "assignee_ids": issue.assignee_ids,
            "labels": ",".join(issue.labels),
            "milestone_id": issue.milestone,  # Requires a milestone title -> ID lookup
            "due_date": issue.due_date.isoformat() if issue.due_date else None,
        }
```
---
## Code Examples
### Provider Interface
```python
# app/services/sync/providers/base.py
from abc import ABC, abstractmethod
from datetime import datetime
from typing import AsyncIterator, Optional

from app.models.issue_sync import ExternalConnection
from app.schemas.sync.canonical import CanonicalIssue


class IssueProvider(ABC):
    """Abstract base class for issue tracker providers."""

    def __init__(self, connection: ExternalConnection):
        self.connection = connection
        self.base_url = connection.provider_url
        self.repo_owner = connection.repo_owner
        self.repo_name = connection.repo_name

    @abstractmethod
    async def get_issue(self, issue_number: int) -> CanonicalIssue:
        """Fetch a single issue by number."""
        pass

    @abstractmethod
    async def list_issues(
        self,
        state: str = "all",
        since: Optional[datetime] = None,
        labels: Optional[list[str]] = None
    ) -> AsyncIterator[CanonicalIssue]:
        """List issues with optional filters."""
        pass

    @abstractmethod
    async def create_issue(self, issue: CanonicalIssue) -> CanonicalIssue:
        """Create a new issue."""
        pass

    @abstractmethod
    async def update_issue(self, issue_number: int, issue: CanonicalIssue) -> CanonicalIssue:
        """Update an existing issue."""
        pass

    @abstractmethod
    async def add_comment(self, issue_number: int, body: str) -> dict:
        """Add a comment to an issue."""
        pass

    @abstractmethod
    async def setup_webhook(self, callback_url: str, secret: str) -> str:
        """Configure webhook for issue events. Returns webhook ID."""
        pass

    @abstractmethod
    async def verify_webhook_signature(self, payload: bytes, headers: dict) -> bool:
        """Verify webhook signature."""
        pass
```
### Gitea Provider Implementation
```python
# app/services/sync/providers/gitea.py
from datetime import datetime
from typing import AsyncIterator, Optional

import httpx

from app.models.issue_sync import ExternalConnection
from app.schemas.sync.canonical import CanonicalIssue
from app.services.sync.providers.base import IssueProvider
from app.services.sync.adapters.gitea import GiteaAdapter
from app.services.sync.webhook_validators import GiteaWebhookValidator

# Note: decrypt() is the project's credential decryption helper (not shown in this spike).


class GiteaProvider(IssueProvider):
    """Gitea issue tracker provider."""

    def __init__(self, connection: ExternalConnection):
        super().__init__(connection)
        self.adapter = GiteaAdapter()
        self._client = None

    @property
    def api_url(self) -> str:
        return f"{self.base_url}/api/v1"

    async def _get_client(self) -> httpx.AsyncClient:
        if self._client is None:
            token = decrypt(self.connection.credentials_encrypted)
            self._client = httpx.AsyncClient(
                base_url=self.api_url,
                headers={
                    "Authorization": f"token {token}",
                    "Accept": "application/json",
                },
                timeout=30.0
            )
        return self._client

    async def get_issue(self, issue_number: int) -> CanonicalIssue:
        client = await self._get_client()
        response = await client.get(
            f"/repos/{self.repo_owner}/{self.repo_name}/issues/{issue_number}"
        )
        response.raise_for_status()
        return self.adapter.to_canonical(response.json(), self.base_url)

    async def list_issues(
        self,
        state: str = "all",
        since: Optional[datetime] = None,
        labels: Optional[list[str]] = None
    ) -> AsyncIterator[CanonicalIssue]:
        client = await self._get_client()
        page = 1
        while True:
            params = {
                "state": state,
                "page": page,
                "limit": 50,  # Gitea default max page size
            }
            if since:
                params["since"] = since.isoformat()
            if labels:
                params["labels"] = ",".join(labels)

            response = await client.get(
                f"/repos/{self.repo_owner}/{self.repo_name}/issues",
                params=params
            )
            response.raise_for_status()
            issues = response.json()
            if not issues:
                break

            for issue in issues:
                yield self.adapter.to_canonical(issue, self.base_url)

            # Check pagination via the Link header
            link_header = response.headers.get("link", "")
            if 'rel="next"' not in link_header:
                break
            page += 1

    async def create_issue(self, issue: CanonicalIssue) -> CanonicalIssue:
        client = await self._get_client()
        payload = self.adapter.from_canonical(issue)
        response = await client.post(
            f"/repos/{self.repo_owner}/{self.repo_name}/issues",
            json=payload
        )
        response.raise_for_status()
        return self.adapter.to_canonical(response.json(), self.base_url)

    async def update_issue(self, issue_number: int, issue: CanonicalIssue) -> CanonicalIssue:
        client = await self._get_client()
        payload = self.adapter.from_canonical(issue)
        response = await client.patch(
            f"/repos/{self.repo_owner}/{self.repo_name}/issues/{issue_number}",
            json=payload
        )
        response.raise_for_status()
        return self.adapter.to_canonical(response.json(), self.base_url)

    async def add_comment(self, issue_number: int, body: str) -> dict:
        client = await self._get_client()
        response = await client.post(
            f"/repos/{self.repo_owner}/{self.repo_name}/issues/{issue_number}/comments",
            json={"body": body}
        )
        response.raise_for_status()
        return response.json()

    async def setup_webhook(self, callback_url: str, secret: str) -> str:
        client = await self._get_client()
        response = await client.post(
            f"/repos/{self.repo_owner}/{self.repo_name}/hooks",
            json={
                "type": "gitea",
                "active": True,
                "events": ["issues", "issue_comment"],
                "config": {
                    "url": callback_url,
                    "content_type": "json",
                    "secret": secret,
                }
            }
        )
        response.raise_for_status()
        return str(response.json()["id"])

    async def verify_webhook_signature(self, payload: bytes, headers: dict) -> bool:
        secret = decrypt(self.connection.webhook_secret_encrypted)
        return GiteaWebhookValidator.verify_signature(payload, headers, secret)
```
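The `ProviderFactory` referenced by the Sync Engine below is not shown above; a minimal sketch consistent with the `IssueProvider` interface might be:
```python
# app/services/sync/providers/factory.py (sketch)
from app.models.issue_sync import ExternalConnection
from app.services.sync.providers.base import IssueProvider
from app.services.sync.providers.gitea import GiteaProvider


class ProviderFactory:
    _providers: dict[str, type[IssueProvider]] = {
        "gitea": GiteaProvider,
        # "github": GitHubProvider,   # implemented in Phase 4
        # "gitlab": GitLabProvider,   # implemented in Phase 4
    }

    @classmethod
    def get_provider(cls, connection: ExternalConnection) -> IssueProvider:
        try:
            provider_cls = cls._providers[connection.provider]
        except KeyError:
            raise ValueError(f"Unsupported provider: {connection.provider}")
        return provider_cls(connection)

    @classmethod
    def register(cls, name: str, provider_cls: type[IssueProvider]) -> None:
        """Allow new providers to be plugged in without touching the factory."""
        cls._providers[name] = provider_cls
```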
### Sync Engine
```python
# app/services/sync/sync_engine.py
from __future__ import annotations

from datetime import datetime, timedelta
from typing import Optional

from sqlalchemy.ext.asyncio import AsyncSession

from app.models.issue_sync import ExternalConnection, IssueSyncLog, SyncOutbox
from app.schemas.sync.canonical import CanonicalIssue
from app.services.sync.providers.factory import ProviderFactory
from app.services.sync.conflict_resolver import ConflictResolver

# Issue is the local Syndarix issue model (defined outside this spike).


class SyncEngine:
    """Orchestrates synchronization between Syndarix and external trackers."""

    def __init__(self, db: AsyncSession):
        self.db = db
        self.conflict_resolver = ConflictResolver()

    async def sync_inbound(
        self,
        connection: ExternalConnection,
        external_issue: CanonicalIssue
    ) -> Issue:
        """Sync an issue from external tracker to Syndarix."""
        # Find existing local issue
        local_issue = await self._find_by_external_id(
            connection.project_id,
            external_issue.provider,
            external_issue.external_id
        )

        if local_issue is None:
            # Create new local issue
            local_issue = await self._create_local_issue(
                connection.project_id,
                external_issue
            )
            await self._log_sync(
                local_issue,
                direction="inbound",
                event_type="created",
                status="success"
            )
        else:
            # Check for conflicts
            conflict_result = self.conflict_resolver.check(
                local_issue.version_vector,
                external_issue.raw_data.get("_version_vector", {}),
                local_issue.updated_at,
                external_issue.updated_at
            )
            if conflict_result.has_conflict:
                # Apply resolution strategy
                resolved = self.conflict_resolver.resolve(
                    local_issue,
                    external_issue,
                    conflict_result
                )
                await self._update_local_issue(local_issue, resolved)
                await self._log_sync(
                    local_issue,
                    direction="inbound",
                    event_type="conflict_resolved",
                    status="success",
                    conflict_details=conflict_result.to_dict()
                )
            else:
                # Normal update
                await self._update_local_issue(local_issue, external_issue)
                await self._log_sync(
                    local_issue,
                    direction="inbound",
                    event_type="updated",
                    status="success"
                )
        return local_issue

    async def sync_outbound(
        self,
        connection: ExternalConnection,
        local_issue: Issue
    ):
        """Queue a local issue for sync to external tracker."""
        provider = ProviderFactory.get_provider(connection)
        canonical = self._to_canonical(local_issue)

        outbox_entry = SyncOutbox(
            issue_id=local_issue.id,
            project_id=connection.project_id,
            provider=connection.provider,
            operation="update" if local_issue.external_id else "create",
            payload=canonical.dict(),
            idempotency_key=f"{local_issue.id}:{local_issue.updated_at.isoformat()}"
        )
        self.db.add(outbox_entry)
        await self.db.commit()

    async def initial_import(
        self,
        connection: ExternalConnection,
        since: Optional[datetime] = None,
        labels: Optional[list[str]] = None
    ) -> int:
        """Import all issues from external tracker."""
        provider = ProviderFactory.get_provider(connection)
        imported = 0
        async for external_issue in provider.list_issues(
            state="all",
            since=since,
            labels=labels
        ):
            await self.sync_inbound(connection, external_issue)
            imported += 1

        connection.last_sync_at = datetime.utcnow()
        await self.db.commit()
        return imported

    async def reconcile(self, connection: ExternalConnection):
        """
        Periodic reconciliation to catch missed webhooks.
        Runs via Celery Beat every 15-30 minutes.
        """
        since = connection.last_sync_at or (datetime.utcnow() - timedelta(hours=24))
        provider = ProviderFactory.get_provider(connection)

        async for external_issue in provider.list_issues(
            state="all",
            since=since
        ):
            local_issue = await self._find_by_external_id(
                connection.project_id,
                external_issue.provider,
                external_issue.external_id
            )
            if local_issue:
                # Re-sync only if the external copy is newer than what was last seen
                if (
                    local_issue.external_updated_at is None
                    or external_issue.updated_at > local_issue.external_updated_at
                ):
                    await self.sync_inbound(connection, external_issue)
            else:
                await self.sync_inbound(connection, external_issue)

        connection.last_sync_at = datetime.utcnow()
        await self.db.commit()
```
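The worker that drains `sync_outbox` (Phase 3 of the roadmap) is not shown above. A hedged sketch follows; `fetch_due_outbox_rows`, `get_connection`, and `persist_outbox_rows` are hypothetical helpers, not existing project functions:
```python
# app/workers/outbox_worker.py (sketch)
import asyncio
from datetime import datetime, timedelta

from app.schemas.sync.canonical import CanonicalIssue
from app.services.sync.providers.factory import ProviderFactory
from app.workers.sync_worker import celery_app


async def drain_sync_outbox(batch_size: int = 20) -> None:
    """Pick up due outbox rows, call the external API, and record the outcome."""
    rows = await fetch_due_outbox_rows(batch_size)  # hypothetical: status='pending' AND scheduled_at <= now
    for row in rows:
        row.status = "in_progress"
        row.attempts += 1
        try:
            connection = await get_connection(row.project_id, row.provider)  # hypothetical lookup
            provider = ProviderFactory.get_provider(connection)
            issue = CanonicalIssue(**row.payload)
            if row.operation == "create":
                await provider.create_issue(issue)
            else:
                await provider.update_issue(issue.external_number, issue)
            row.status = "completed"
            row.processed_at = datetime.utcnow()
        except Exception as exc:
            row.last_error = str(exc)
            if row.attempts >= row.max_attempts:
                row.status = "dead_letter"
            else:
                row.status = "pending"
                row.scheduled_at = datetime.utcnow() + timedelta(minutes=2 ** row.attempts)  # backoff
    await persist_outbox_rows(rows)  # hypothetical commit


@celery_app.task
def process_sync_outbox():
    """Celery Beat entry point; Celery tasks are synchronous, so the async drain is run explicitly."""
    asyncio.run(drain_sync_outbox())
```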
---
## Error Handling & Recovery
### Error Categories and Handling
| Error Type | Handling Strategy | Retry | Alert |
|------------|-------------------|-------|-------|
| Network timeout | Exponential backoff | Yes (3x) | After 3 failures |
| Rate limit (429) | Wait for reset (see sketch below) | Yes | No |
| Auth error (401/403) | Mark connection as error | No | Yes |
| Not found (404) | Mark issue as deleted | No | No |
| Conflict (409) | Apply resolution strategy | No | If unresolved |
| Server error (5xx) | Exponential backoff | Yes (5x) | After 5 failures |
| Validation error | Log and skip | No | Yes |
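For the rate-limit row above, a small sketch of waiting out the provider's advertised reset window; header names differ per provider, with `Retry-After` and `X-RateLimit-Reset` being the common cases:
```python
import asyncio
import time

import httpx


async def wait_for_rate_limit_reset(response: httpx.Response) -> None:
    """Sleep until the provider's advertised reset time, or a conservative default."""
    retry_after = response.headers.get("Retry-After")
    if retry_after and retry_after.isdigit():
        await asyncio.sleep(int(retry_after))
        return

    # GitHub-style absolute reset timestamp (epoch seconds)
    reset_at = response.headers.get("X-RateLimit-Reset")
    if reset_at and reset_at.isdigit():
        await asyncio.sleep(max(0, int(reset_at) - int(time.time())))
        return

    await asyncio.sleep(60)  # fallback when no hint is provided
```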
### Retry Strategy
```python
# app/services/sync/retry.py
import asyncio
from functools import wraps

import httpx


def with_retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    retryable_exceptions: tuple = (httpx.TimeoutException, httpx.NetworkError)
):
    """Decorator for retry with exponential backoff."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except retryable_exceptions as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        delay = min(
                            base_delay * (exponential_base ** attempt),
                            max_delay
                        )
                        await asyncio.sleep(delay)
            raise last_exception
        return wrapper
    return decorator
```
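Example usage, wrapping a provider call (the wrapper function itself is illustrative):
```python
@with_retry(max_attempts=5, base_delay=2.0)
async def fetch_issue_with_retry(provider: IssueProvider, number: int) -> CanonicalIssue:
    # Transient network errors and timeouts are retried with exponential backoff
    return await provider.get_issue(number)
```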
### Dead Letter Queue Handling
```python
# app/workers/dead_letter_worker.py
from datetime import datetime, timedelta

from app.models.issue_sync import SyncOutbox

# celery_app, db, notify_admin, and should_retry come from the application context (not shown here).


@celery_app.task
def process_dead_letter_queue():
    """Process failed sync items for manual review or retry."""
    dead_items = db.query(SyncOutbox).filter(
        SyncOutbox.status == "dead_letter",
        SyncOutbox.created_at > datetime.utcnow() - timedelta(days=7)
    ).all()

    for item in dead_items:
        # Create notification for admin review
        notify_admin(
            f"Sync failed for issue {item.issue_id}",
            details={
                "provider": item.provider,
                "operation": item.operation,
                "attempts": item.attempts,
                "last_error": item.last_error,
            }
        )
        # Optionally attempt one more retry with manual intervention
        if should_retry(item):
            item.status = "pending"
            item.attempts = 0
            item.scheduled_at = datetime.utcnow()
```
### Health Monitoring
```python
# app/services/sync/health.py
from datetime import datetime, timedelta


async def check_sync_health(project_id: str) -> dict:
    """Check sync health for a project."""
    connections = await get_connections(project_id)
    health = {
        "status": "healthy",
        "connections": [],
        "pending_syncs": 0,
        "failed_syncs_24h": 0,
        "conflicts_24h": 0,
    }

    for conn in connections:
        conn_health = {
            "provider": conn.provider,
            "status": conn.status,
            "last_sync": conn.last_sync_at,
            "webhook_active": conn.webhook_active,
        }
        # Flag connections whose last sync is older than an hour
        if conn.last_sync_at:
            stale_threshold = datetime.utcnow() - timedelta(hours=1)
            if conn.last_sync_at < stale_threshold:
                conn_health["status"] = "stale"
                health["status"] = "degraded"
        health["connections"].append(conn_health)

    # Count pending and failed operations (query helpers defined elsewhere in the service)
    health["pending_syncs"] = await count_pending_syncs(project_id)
    health["failed_syncs_24h"] = await count_failed_syncs(project_id, hours=24)
    health["conflicts_24h"] = await count_conflicts(project_id, hours=24)

    if health["failed_syncs_24h"] > 10:
        health["status"] = "unhealthy"

    return health
```
---
## Implementation Roadmap
### Phase 1: Foundation (Week 1-2)
- [ ] Database schema and migrations
- [ ] Core models (ExternalConnection, IssueSyncLog, SyncOutbox)
- [ ] Provider interface and Gitea implementation
- [ ] Canonical issue model and field mapping
### Phase 2: Inbound Sync (Week 2-3)
- [ ] Webhook endpoint and signature validation
- [ ] Redis queue for webhook events
- [ ] Celery worker for event processing
- [ ] Initial import functionality
- [ ] Basic conflict detection
### Phase 3: Outbound Sync (Week 3-4)
- [ ] Outbox pattern implementation
- [ ] Outbound sync worker
- [ ] Retry and dead letter queue
- [ ] Bidirectional sync testing
### Phase 4: GitHub & GitLab (Week 4-5)
- [ ] GitHub provider implementation
- [ ] GitLab provider implementation
- [ ] Provider factory and dynamic selection
- [ ] Cross-provider field mapping
### Phase 5: Conflict Resolution (Week 5-6)
- [ ] Version vector implementation
- [ ] Conflict resolution strategies
- [ ] Merge algorithms for complex fields
- [ ] Conflict notification UI
### Phase 6: Production Readiness (Week 6-7)
- [ ] Health monitoring and alerting
- [ ] Admin UI for connection management
- [ ] Comprehensive test coverage
- [ ] Performance optimization
- [ ] Documentation
---
## References
### Research Sources
- [Two-Way Sync Tools 2025: Best Platforms for Real-Time Data Integration](https://www.stacksync.com/blog/2025-best-two-way-sync-tools-a-comprehensive-guide-for-data-integration) - StackSync
- [The Architect's Guide to Data Integration Patterns](https://medium.com/@prayagvakharia/the-architects-guide-to-data-integration-patterns-migration-broadcast-bi-directional-a4c92b5f908d) - Medium
- [System Design Pattern: Conflict Resolution in Distributed Systems](https://medium.com/@priyasrivastava18official/system-design-pattern-from-chaos-to-consistency-the-art-of-conflict-resolution-in-distributed-9d631028bdb4) - Medium
- [Eventual Consistency in Distributed Systems](https://www.geeksforgeeks.org/system-design/eventual-consistency-in-distributive-systems-learn-system-design/) - GeeksforGeeks
- [Bidirectional Synchronization: What It Is and Examples](https://www.workato.com/the-connector/bidirectional-synchronization/) - Workato
- [Data Integration Patterns: Bi-Directional Sync](https://blogs.mulesoft.com/api-integration/patterns/data-integration-patterns-bi-directional-sync/) - MuleSoft
### API Documentation
- [GitHub REST API - Issues](https://docs.github.com/en/rest/issues/issues)
- [GitHub Webhook Events](https://docs.github.com/en/webhooks/webhook-events-and-payloads)
- [GitLab Issues API](https://docs.gitlab.com/api/issues.html)
- [GitLab Webhooks](https://docs.gitlab.com/ee/user/project/integrations/webhooks.html)
- [Gitea API Usage](https://docs.gitea.com/development/api-usage)
### Related Syndarix Spikes
- [SPIKE-001: MCP Integration Pattern](./SPIKE-001-mcp-integration-pattern.md) - MCP architecture and FastMCP usage
- [SPIKE-003: Real-time Updates](./SPIKE-003-realtime-updates.md) - SSE for event streaming
- [SPIKE-004: Celery Redis Integration](./SPIKE-004-celery-redis-integration.md) - Background job infrastructure
---
## Decision
**Adopt a webhook-first, polling-fallback synchronization architecture** with:
1. **Last-Writer-Wins (LWW)** conflict resolution using version vectors
2. **External tracker as source of truth** with local mirrors
3. **Unified provider interface** abstracting Gitea, GitHub, GitLab differences
4. **Outbox pattern** for reliable outbound sync
5. **Redis Streams** for webhook event queuing and deduplication
6. **Celery Beat** for periodic reconciliation
This approach balances real-time responsiveness with eventual consistency, providing a robust foundation for bidirectional issue synchronization.
---
*Spike completed. Findings will inform ADR-009: Issue Synchronization Architecture.*