# SPIKE-009: Issue Synchronization with External Trackers **Status:** Completed **Date:** 2025-12-29 **Author:** Architecture Team **Related Issue:** #9 --- ## Executive Summary This spike researches bi-directional issue synchronization between Syndarix and external issue trackers (Gitea, GitHub, GitLab). After analyzing sync patterns, conflict resolution strategies, and API capabilities of each platform, we recommend: **Primary Recommendation:** Implement a **webhook-first, polling-fallback** architecture with **Last-Writer-Wins (LWW)** conflict resolution using vector clocks for causality tracking. External trackers serve as the **source of truth** with Syndarix maintaining local mirrors for unified agent access. **Key Decisions:** 1. Use webhooks for real-time sync; polling for reconciliation and initial import 2. Implement version vectors for conflict detection with LWW resolution 3. Store sync metadata in dedicated `issue_sync_log` table for audit and recovery 4. Abstract provider differences behind a unified `IssueProvider` interface 5. Use Redis for webhook event queuing and deduplication --- ## Table of Contents 1. [Research Questions & Answers](#research-questions--answers) 2. [Sync Architecture](#sync-architecture) 3. [Conflict Resolution Strategy](#conflict-resolution-strategy) 4. [Webhook Handling Design](#webhook-handling-design) 5. [Provider API Comparison](#provider-api-comparison) 6. [Database Schema](#database-schema) 7. [Field Mapping Specification](#field-mapping-specification) 8. [Code Examples](#code-examples) 9. [Error Handling & Recovery](#error-handling--recovery) 10. [Implementation Roadmap](#implementation-roadmap) 11. [References](#references) --- ## Research Questions & Answers ### 1. Best patterns for bi-directional data sync? **Answer:** The **Hub-and-Spoke** pattern with Syndarix as the hub is recommended. This pattern: - Designates external trackers as authoritative sources - Maintains local mirrors for unified access - Synchronizes changes bidirectionally through a central sync engine - Uses explicit ownership rules per field to prevent conflicts Modern implementations leverage **Conflict-Free Replicated Data Types (CRDTs)** for automatic conflict resolution, but for issue tracking where human review may be desired, **version vectors with LWW** provides better control. ### 2. Handling sync conflicts (edited in both places)? **Answer:** Implement a **tiered conflict resolution strategy**: | Scenario | Resolution | |----------|------------| | Same field, different times | Last-Writer-Wins (LWW) | | Same field, concurrent edits | Mark as conflict, notify user | | Different fields | Merge both changes | | Delete vs Update | Delete wins (configurable) | Use **version vectors** to detect concurrent modifications. Each system maintains a version counter, and conflicts are identified when neither version dominates the other. ### 3. Webhook vs polling strategies? **Answer:** **Hybrid approach** - webhooks primary, polling secondary. | Strategy | Use Case | Latency | Resource Cost | |----------|----------|---------|---------------| | Webhooks | Real-time sync | <1s | Low | | Polling | Initial import, reconciliation, fallback | Minutes | Medium | | On-demand | User-triggered refresh | Immediate | Minimal | **Rationale:** Webhooks provide real-time updates but may miss events during outages. Periodic polling (every 15-30 minutes) ensures eventual consistency. ### 4. Rate limiting and API quota management? 
**Answer:** Implement a **token bucket with adaptive throttling**:

| Provider | Authenticated | Unauthenticated |
|----------|---------------|-----------------|
| GitHub | 5,000/hour | 60/hour |
| GitLab | 600/minute | 10/minute |
| Gitea | Configurable per instance | N/A |

**Strategies:**

- Use conditional requests (`If-None-Match`, `If-Modified-Since`) to avoid counting unchanged responses
- Implement exponential backoff on 429/403 responses
- Cache responses with ETags
- Batch operations where possible
- Monitor `X-RateLimit-Remaining` headers

A minimal token-bucket sketch appears in the appendix at the end of this document.

### 5. Eventual consistency vs strong consistency tradeoffs?

**Answer:** **Eventual consistency** is acceptable and recommended for issue sync.

| Consistency | Pros | Cons |
|-------------|------|------|
| **Strong** | Always accurate | Higher latency, complex implementation |
| **Eventual** | Better performance, simpler | Temporary inconsistency |

**Rationale:** Issue tracking tolerates brief inconsistency windows (seconds to minutes). Users can manually refresh if needed. The simplicity and performance gains outweigh the drawbacks.

### 6. How to map different field schemas?

**Answer:** Use a **canonical field model** with provider-specific adapters.

```
External Field → Provider Adapter → Canonical Model → Local Storage
Local Field → Canonical Model → Provider Adapter → External Field
```

See [Field Mapping Specification](#field-mapping-specification) for detailed mappings.

### 7. Handling offline/disconnected scenarios?

**Answer:** Implement an **outbox pattern** with retry queue:

1. Queue all outgoing changes in local `sync_outbox` table
2. Background worker processes queue with exponential backoff
3. Mark items as `pending`, `in_progress`, `completed`, or `failed`
4. Dead letter queue for items exceeding max retries
5.
Manual reconciliation UI for failed items --- ## Sync Architecture ### High-Level Architecture Diagram ``` External Issue Trackers ┌─────────────────┬─────────────────┬─────────────────┐ │ Gitea │ GitHub │ GitLab │ │ (Primary) │ │ │ └────────┬────────┴────────┬────────┴────────┬────────┘ │ │ │ ┌────────┴─────────────────┴─────────────────┴────────┐ │ Webhooks │ └─────────────────────────┬───────────────────────────┘ │ ┌─────────────────────────────────────────────────┼─────────────────────────────────────────────┐ │ Syndarix Backend │ │ ┌────────────────────┐ ┌────────────┴───────────┐ ┌────────────────────┐ │ │ │ Webhook Handler │◀──────────│ Redis Queue │──────────▶│ Polling Worker │ │ │ │ (FastAPI Route) │ │ (Event Dedup/Buffer) │ │ (Celery Beat) │ │ │ └─────────┬──────────┘ └────────────────────────┘ └─────────┬──────────┘ │ │ │ │ │ │ └──────────────────────────┬──────────────────────────────────────────┘ │ │ │ │ │ ┌────────────┴────────────┐ │ │ │ Sync Engine │ │ │ │ ┌──────────────────┐ │ │ │ │ │ Provider Factory │ │ │ │ │ │ ┌─────────────┐ │ │ │ │ │ │ │GiteaProvider│ │ │ │ │ │ │ │GitHubProvider│ │ │ │ │ │ │ │GitLabProvider│ │ │ │ │ │ │ └─────────────┘ │ │ │ │ │ └──────────────────┘ │ │ │ │ ┌──────────────────┐ │ │ │ │ │Conflict Resolver │ │ │ │ │ └──────────────────┘ │ │ │ │ ┌──────────────────┐ │ │ │ │ │ Field Mapper │ │ │ │ │ └──────────────────┘ │ │ │ └────────────┬────────────┘ │ │ │ │ │ ┌────────────┴────────────┐ │ │ │ PostgreSQL │ │ │ │ ┌──────────────────┐ │ │ │ │ │ issues │ │ │ │ │ │ issue_sync_log │ │ │ │ │ │ sync_outbox │ │ │ │ │ │ external_links │ │ │ │ │ └──────────────────┘ │ │ │ └─────────────────────────┘ │ └───────────────────────────────────────────────────────────────────────────────────────────────┘ ``` ### Component Responsibilities | Component | Responsibility | |-----------|----------------| | **Webhook Handler** | Receive, validate, and queue incoming webhooks | | **Redis Queue** | Buffer events, deduplicate, handle backpressure | | **Polling Worker** | Periodic reconciliation, initial import, fallback | | **Sync Engine** | Orchestrate sync operations, apply business logic | | **Provider Factory** | Create provider-specific clients | | **Conflict Resolver** | Detect and resolve sync conflicts | | **Field Mapper** | Transform between canonical and provider schemas | ### Data Flow **Inbound (External to Syndarix):** ``` 1. Webhook received → Validate signature → Queue in Redis 2. Worker dequeues → Parse payload → Transform to canonical model 3. Check for conflicts → Apply resolution strategy 4. Update local database → Log sync operation 5. Notify connected clients via SSE (per SPIKE-003) ``` **Outbound (Syndarix to External):** ``` 1. Local change detected → Transform to provider format 2. Queue in sync_outbox → Worker picks up 3. Call external API → Handle response 4. Update sync metadata → Mark as synced 5. Handle failures → Retry with backoff ``` --- ## Conflict Resolution Strategy ### Version Vector Implementation Each issue maintains a version vector tracking modifications across systems: ```python # Version vector structure { "syndarix": 5, # Local modification count "gitea": 3, # Gitea modification count "github": 0, # Not synced with GitHub "gitlab": 2 # GitLab modification count } ``` ### Conflict Detection Algorithm ```python def detect_conflict(local_version: dict, remote_version: dict) -> str: """ Compare version vectors to detect conflicts. 
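    A vector dominates when every per-system counter is greater than or
    equal to its counterpart; concurrent edits leave neither side dominant.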
Returns:
    - "local_wins": Local version dominates
    - "remote_wins": Remote version dominates
    - "conflict": Concurrent modification (neither dominates)
    - "equal": No changes
    """
    local_dominates = all(
        local_version.get(k, 0) >= v
        for k, v in remote_version.items()
    )
    remote_dominates = all(
        remote_version.get(k, 0) >= v
        for k, v in local_version.items()
    )

    # Mutual domination means the effective vectors are identical, even if
    # one side carries explicit zero entries the other omits.
    if local_dominates and remote_dominates:
        return "equal"
    elif local_dominates:
        return "local_wins"
    elif remote_dominates:
        return "remote_wins"
    else:
        return "conflict"
```

### Resolution Strategies

```python
from enum import Enum

class ConflictStrategy(str, Enum):
    REMOTE_WINS = "remote_wins"      # External tracker is source of truth
    LOCAL_WINS = "local_wins"        # Syndarix changes take precedence
    LAST_WRITE_WINS = "lww"          # Most recent timestamp wins
    MANUAL = "manual"                # Flag for human review
    MERGE = "merge"                  # Attempt field-level merge

# Default strategy per field
FIELD_STRATEGIES = {
    "title": ConflictStrategy.LAST_WRITE_WINS,
    "description": ConflictStrategy.MERGE,
    "status": ConflictStrategy.REMOTE_WINS,
    "assignees": ConflictStrategy.MERGE,
    "labels": ConflictStrategy.MERGE,
    "comments": ConflictStrategy.MERGE,  # Append both
    "priority": ConflictStrategy.REMOTE_WINS,
}
```

### Merge Algorithm for Complex Fields

```python
def merge_labels(local: list[str], remote: list[str], base: list[str]) -> list[str]:
    """Three-way merge for labels."""
    local_added = set(local) - set(base)
    local_removed = set(base) - set(local)
    remote_added = set(remote) - set(base)
    remote_removed = set(base) - set(remote)

    result = set(base)
    result |= local_added | remote_added
    result -= local_removed | remote_removed
    return sorted(result)
```

---

## Webhook Handling Design

### Webhook Endpoint Architecture

```python
# app/api/v1/webhooks/issues.py
from datetime import datetime

from fastapi import APIRouter, Request, HTTPException
from app.services.sync.webhook_handler import WebhookHandler
from app.core.redis import redis_client
import hashlib
import hmac

router = APIRouter()

@router.post("/webhooks/{provider}/{project_id}")
async def receive_webhook(
    provider: str,
    project_id: str,
    request: Request
):
    """
    Unified webhook endpoint for all providers.
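    Verifies the provider signature, deduplicates by event ID, then queues
    the payload on a Redis stream for asynchronous processing.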
Path: /api/v1/webhooks/{provider}/{project_id}
    Providers: gitea, github, gitlab
    """
    body = await request.body()
    headers = dict(request.headers)

    # Validate webhook signature
    handler = WebhookHandler.get_handler(provider)
    if not handler.verify_signature(body, headers):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # Parse event type
    event_type = handler.get_event_type(headers)
    if event_type not in handler.supported_events:
        return {"status": "ignored", "reason": "unsupported_event"}

    # Deduplicate using event ID
    event_id = handler.get_event_id(headers, body)
    if await is_duplicate(event_id):
        return {"status": "duplicate"}

    # Queue for processing
    await redis_client.xadd(
        f"webhooks:{project_id}",
        {
            "provider": provider,
            "event_type": event_type,
            "payload": body,
            "received_at": datetime.utcnow().isoformat()
        }
    )

    return {"status": "queued", "event_id": event_id}


async def is_duplicate(event_id: str, ttl: int = 3600) -> bool:
    """Check if event was already processed (Redis-based dedup)."""
    key = f"webhook:processed:{event_id}"
    result = await redis_client.set(key, "1", ex=ttl, nx=True)
    return result is None  # None means key existed
```

### Provider-Specific Signature Validation

```python
# app/services/sync/webhook_validators.py
import hashlib
import hmac


class GiteaWebhookValidator:
    @staticmethod
    def verify_signature(body: bytes, headers: dict, secret: str) -> bool:
        """Gitea uses X-Gitea-Signature with HMAC-SHA256."""
        signature = headers.get("x-gitea-signature", "")
        expected = hmac.new(
            secret.encode(),
            body,
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(signature, expected)

    @staticmethod
    def get_event_type(headers: dict) -> str:
        return headers.get("x-gitea-event", "")


class GitHubWebhookValidator:
    @staticmethod
    def verify_signature(body: bytes, headers: dict, secret: str) -> bool:
        """GitHub uses X-Hub-Signature-256 with sha256=HMAC."""
        signature = headers.get("x-hub-signature-256", "")
        if not signature.startswith("sha256="):
            return False
        expected = "sha256=" + hmac.new(
            secret.encode(),
            body,
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(signature, expected)

    @staticmethod
    def get_event_type(headers: dict) -> str:
        return headers.get("x-github-event", "")


class GitLabWebhookValidator:
    @staticmethod
    def verify_signature(body: bytes, headers: dict, secret: str) -> bool:
        """GitLab uses X-Gitlab-Token for simple token matching."""
        token = headers.get("x-gitlab-token", "")
        return hmac.compare_digest(token, secret)

    @staticmethod
    def get_event_type(headers: dict) -> str:
        return headers.get("x-gitlab-event", "")
```

### Webhook Event Processing (Celery Worker)

```python
# app/workers/sync_worker.py
import logging

from celery import Celery
from app.services.sync.sync_engine import SyncEngine
from app.core.redis import redis_client  # Celery tasks are synchronous; use the blocking client here

log = logging.getLogger(__name__)

celery_app = Celery("syndarix")

@celery_app.task(bind=True, max_retries=3)
def process_webhook_event(self, project_id: str):
    """Process queued webhook events for a project."""
    try:
        # Read from Redis stream
        events = redis_client.xread(
            {f"webhooks:{project_id}": "0"},
            count=10,
            block=5000
        )

        sync_engine = SyncEngine()

        for stream_name, messages in events:
            for message_id, data in messages:
                try:
                    sync_engine.process_inbound_event(
                        provider=data["provider"],
                        event_type=data["event_type"],
                        payload=data["payload"]
                    )
                    # Remove the processed entry from the stream
                    redis_client.xdel(stream_name, message_id)
                except Exception as e:
                    log.error(f"Failed to process {message_id}: {e}")
                    # Will retry on next run

    except Exception as exc:
        self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
```

---

## Provider API Comparison

### Issue Field Support Matrix

| Field | Gitea | GitHub | GitLab | Syndarix Canonical |
|-------|-------|--------|--------|-------------------|
| ID | `id` (int) | `id` (int) | `id` (int) | `external_id` (str) |
| Number | `number` | `number` | `iid` | `external_number` |
| Title | `title` | `title` | `title` | `title` |
| Body | `body` | `body` | `description` | `description` |
| State | `state` (open/closed) | `state` (open/closed) | `state` (opened/closed) | `status` (enum) |
| Assignees | `assignees[]` | `assignees[]` | `assignees[]` | `assignee_ids[]` |
| Labels | `labels[].name` | `labels[].name` | `labels[]` (strings) | `labels[]` |
| Milestone | `milestone.title` | `milestone.title` | `milestone.title` | `milestone` |
| Created | `created_at` | `created_at` | `created_at` | `created_at` |
| Updated | `updated_at` | `updated_at` | `updated_at` | `updated_at` |
| Due Date | `due_date` | N/A | `due_date` | `due_date` |
| Priority | N/A (via labels) | N/A (via labels) | N/A (via labels) | `priority` |
| URL | `html_url` | `html_url` | `web_url` | `remote_url` |

### Webhook Event Mapping

| Action | Gitea Event | GitHub Event | GitLab Event |
|--------|-------------|--------------|--------------|
| Create | `issues:opened` | `issues:opened` | `Issue Hook:open` |
| Update | `issues:edited` | `issues:edited` | `Issue Hook:update` |
| Close | `issues:closed` | `issues:closed` | `Issue Hook:close` |
| Reopen | `issues:reopened` | `issues:reopened` | `Issue Hook:reopen` |
| Assign | `issues:assigned` | `issues:assigned` | `Issue Hook:update` |
| Label | `issues:label_updated` | `issues:labeled` | `Issue Hook:update` |
| Comment | `issue_comment:created` | `issue_comment:created` | `Note Hook` |

### Rate Limits Comparison

| Provider | Authenticated | Pagination | Conditional Requests |
|----------|---------------|------------|---------------------|
| GitHub | 5,000/hour | 100/page max | ETag, If-Modified-Since |
| GitLab | 600/minute | 100/page max | ETag |
| Gitea | Configurable | 50/page default | N/A (`Link` header is pagination metadata) |

---

## Database Schema

### Core Tables

```sql
-- Extended issues table with sync metadata
ALTER TABLE issues ADD COLUMN IF NOT EXISTS external_id VARCHAR(255);
ALTER TABLE issues ADD COLUMN IF NOT EXISTS external_number INTEGER;
ALTER TABLE issues ADD COLUMN IF NOT EXISTS remote_url TEXT;
ALTER TABLE issues ADD COLUMN IF NOT EXISTS provider VARCHAR(50);
ALTER TABLE issues ADD COLUMN IF NOT EXISTS provider_repo_id VARCHAR(255);
ALTER TABLE issues ADD COLUMN IF NOT EXISTS sync_status VARCHAR(20) DEFAULT 'pending';
ALTER TABLE issues ADD COLUMN IF NOT EXISTS version_vector JSONB DEFAULT '{}';
ALTER TABLE issues ADD COLUMN IF NOT EXISTS last_synced_at TIMESTAMP WITH TIME ZONE;
ALTER TABLE issues ADD COLUMN IF NOT EXISTS external_updated_at TIMESTAMP WITH TIME ZONE;

-- Sync status enum values: synced, pending, conflict, error
CREATE INDEX idx_issues_external_id ON issues(provider, external_id);
CREATE INDEX idx_issues_sync_status ON issues(sync_status);
```

```sql
-- Issue sync log for audit trail
CREATE TABLE issue_sync_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    issue_id UUID REFERENCES issues(id) ON DELETE CASCADE,
    project_id UUID REFERENCES projects(id) ON DELETE CASCADE,

    -- Sync details
    direction VARCHAR(10) NOT NULL,  -- 'inbound' or 'outbound'
    provider VARCHAR(50) NOT NULL,
    event_type VARCHAR(100) NOT NULL,

    -- Change tracking
    previous_state JSONB,
    new_state JSONB,
    diff JSONB,

    -- Conflict info
    had_conflict BOOLEAN DEFAULT FALSE,
    conflict_resolution VARCHAR(50),
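    -- Competing field values captured for audit and manual review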
conflict_details JSONB, -- Status status VARCHAR(20) NOT NULL, -- success, failed, skipped error_message TEXT, -- Timestamps created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), processed_at TIMESTAMP WITH TIME ZONE, -- Webhook metadata webhook_event_id VARCHAR(255), webhook_delivery_id VARCHAR(255) ); CREATE INDEX idx_sync_log_issue ON issue_sync_log(issue_id); CREATE INDEX idx_sync_log_project ON issue_sync_log(project_id); CREATE INDEX idx_sync_log_status ON issue_sync_log(status); CREATE INDEX idx_sync_log_created ON issue_sync_log(created_at); ``` ```sql -- Outbox for pending outbound syncs CREATE TABLE sync_outbox ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), issue_id UUID REFERENCES issues(id) ON DELETE CASCADE, project_id UUID REFERENCES projects(id) ON DELETE CASCADE, -- Sync target provider VARCHAR(50) NOT NULL, operation VARCHAR(20) NOT NULL, -- create, update, delete payload JSONB NOT NULL, -- Processing status status VARCHAR(20) DEFAULT 'pending', -- pending, in_progress, completed, failed, dead_letter attempts INTEGER DEFAULT 0, max_attempts INTEGER DEFAULT 5, last_error TEXT, -- Scheduling created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), scheduled_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), processed_at TIMESTAMP WITH TIME ZONE, -- Idempotency idempotency_key VARCHAR(255) UNIQUE ); CREATE INDEX idx_outbox_status ON sync_outbox(status, scheduled_at); CREATE INDEX idx_outbox_issue ON sync_outbox(issue_id); ``` ```sql -- External provider connections CREATE TABLE external_connections ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), project_id UUID REFERENCES projects(id) ON DELETE CASCADE, organization_id UUID REFERENCES organizations(id) ON DELETE CASCADE, -- Provider details provider VARCHAR(50) NOT NULL, provider_url TEXT NOT NULL, -- Base URL (e.g., https://gitea.example.com) repo_owner VARCHAR(255) NOT NULL, repo_name VARCHAR(255) NOT NULL, -- Authentication auth_type VARCHAR(20) NOT NULL, -- token, oauth, app credentials_encrypted TEXT, -- Encrypted token/credentials -- Webhook configuration webhook_secret_encrypted TEXT, webhook_id VARCHAR(255), -- ID from provider webhook_active BOOLEAN DEFAULT TRUE, -- Sync settings sync_enabled BOOLEAN DEFAULT TRUE, sync_direction VARCHAR(20) DEFAULT 'bidirectional', -- inbound, outbound, bidirectional sync_labels_filter JSONB, -- Only sync issues with these labels sync_milestones_filter JSONB, -- Status tracking last_sync_at TIMESTAMP WITH TIME ZONE, last_error TEXT, status VARCHAR(20) DEFAULT 'active', -- active, paused, error created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), UNIQUE(project_id, provider, repo_owner, repo_name) ); CREATE INDEX idx_connections_project ON external_connections(project_id); CREATE INDEX idx_connections_status ON external_connections(status); ``` ### SQLAlchemy Models ```python # app/models/issue_sync.py from sqlalchemy import Column, String, Integer, Boolean, ForeignKey, Enum, JSON from sqlalchemy.dialects.postgresql import UUID, JSONB from sqlalchemy.orm import relationship from app.models.base import Base, TimestampMixin, UUIDMixin import enum class SyncStatus(str, enum.Enum): SYNCED = "synced" PENDING = "pending" CONFLICT = "conflict" ERROR = "error" class SyncDirection(str, enum.Enum): INBOUND = "inbound" OUTBOUND = "outbound" BIDIRECTIONAL = "bidirectional" class Provider(str, enum.Enum): GITEA = "gitea" GITHUB = "github" GITLAB = "gitlab" class ExternalConnection(Base, UUIDMixin, TimestampMixin): __tablename__ = "external_connections" project_id 
= Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False) organization_id = Column(UUID(as_uuid=True), ForeignKey("organizations.id")) provider = Column(String(50), nullable=False) provider_url = Column(String, nullable=False) repo_owner = Column(String(255), nullable=False) repo_name = Column(String(255), nullable=False) auth_type = Column(String(20), nullable=False) credentials_encrypted = Column(String) webhook_secret_encrypted = Column(String) webhook_id = Column(String(255)) webhook_active = Column(Boolean, default=True) sync_enabled = Column(Boolean, default=True) sync_direction = Column(String(20), default="bidirectional") sync_labels_filter = Column(JSONB) sync_milestones_filter = Column(JSONB) last_sync_at = Column(DateTime(timezone=True)) last_error = Column(String) status = Column(String(20), default="active") class IssueSyncLog(Base, UUIDMixin): __tablename__ = "issue_sync_log" issue_id = Column(UUID(as_uuid=True), ForeignKey("issues.id", ondelete="CASCADE")) project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id", ondelete="CASCADE")) direction = Column(String(10), nullable=False) provider = Column(String(50), nullable=False) event_type = Column(String(100), nullable=False) previous_state = Column(JSONB) new_state = Column(JSONB) diff = Column(JSONB) had_conflict = Column(Boolean, default=False) conflict_resolution = Column(String(50)) conflict_details = Column(JSONB) status = Column(String(20), nullable=False) error_message = Column(String) created_at = Column(DateTime(timezone=True), server_default=func.now()) processed_at = Column(DateTime(timezone=True)) webhook_event_id = Column(String(255)) webhook_delivery_id = Column(String(255)) class SyncOutbox(Base, UUIDMixin): __tablename__ = "sync_outbox" issue_id = Column(UUID(as_uuid=True), ForeignKey("issues.id", ondelete="CASCADE")) project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id", ondelete="CASCADE")) provider = Column(String(50), nullable=False) operation = Column(String(20), nullable=False) payload = Column(JSONB, nullable=False) status = Column(String(20), default="pending") attempts = Column(Integer, default=0) max_attempts = Column(Integer, default=5) last_error = Column(String) created_at = Column(DateTime(timezone=True), server_default=func.now()) scheduled_at = Column(DateTime(timezone=True), server_default=func.now()) processed_at = Column(DateTime(timezone=True)) idempotency_key = Column(String(255), unique=True) ``` --- ## Field Mapping Specification ### Canonical Issue Model ```python # app/schemas/sync/canonical.py from pydantic import BaseModel, Field from datetime import datetime from typing import Optional from enum import Enum class IssueStatus(str, Enum): OPEN = "open" CLOSED = "closed" IN_PROGRESS = "in_progress" # Syndarix-only class CanonicalIssue(BaseModel): """Canonical issue representation for sync operations.""" # Identity external_id: str external_number: int remote_url: str # Core fields title: str description: Optional[str] = None status: IssueStatus # Relationships assignee_ids: list[str] = Field(default_factory=list) assignee_usernames: list[str] = Field(default_factory=list) labels: list[str] = Field(default_factory=list) milestone: Optional[str] = None # Metadata created_at: datetime updated_at: datetime closed_at: Optional[datetime] = None due_date: Optional[datetime] = None # Provider info provider: str raw_data: dict = Field(default_factory=dict) # Original payload ``` ### Provider Adapters ```python # app/services/sync/adapters/gitea.py from 
app.schemas.sync.canonical import CanonicalIssue, IssueStatus class GiteaAdapter: """Transform between Gitea API format and canonical model.""" @staticmethod def to_canonical(gitea_issue: dict, base_url: str) -> CanonicalIssue: return CanonicalIssue( external_id=str(gitea_issue["id"]), external_number=gitea_issue["number"], remote_url=gitea_issue["html_url"], title=gitea_issue["title"], description=gitea_issue.get("body"), status=IssueStatus.OPEN if gitea_issue["state"] == "open" else IssueStatus.CLOSED, assignee_ids=[str(a["id"]) for a in gitea_issue.get("assignees", [])], assignee_usernames=[a["login"] for a in gitea_issue.get("assignees", [])], labels=[label["name"] for label in gitea_issue.get("labels", [])], milestone=gitea_issue.get("milestone", {}).get("title"), created_at=gitea_issue["created_at"], updated_at=gitea_issue["updated_at"], closed_at=gitea_issue.get("closed_at"), due_date=gitea_issue.get("due_date"), provider="gitea", raw_data=gitea_issue ) @staticmethod def from_canonical(issue: CanonicalIssue) -> dict: """Convert canonical to Gitea API format for updates.""" return { "title": issue.title, "body": issue.description, "state": "open" if issue.status == IssueStatus.OPEN else "closed", "assignees": issue.assignee_usernames, "labels": issue.labels, "milestone": issue.milestone, "due_date": issue.due_date.isoformat() if issue.due_date else None, } # app/services/sync/adapters/github.py class GitHubAdapter: """Transform between GitHub API format and canonical model.""" @staticmethod def to_canonical(github_issue: dict, base_url: str) -> CanonicalIssue: return CanonicalIssue( external_id=str(github_issue["id"]), external_number=github_issue["number"], remote_url=github_issue["html_url"], title=github_issue["title"], description=github_issue.get("body"), status=IssueStatus.OPEN if github_issue["state"] == "open" else IssueStatus.CLOSED, assignee_ids=[str(a["id"]) for a in github_issue.get("assignees", [])], assignee_usernames=[a["login"] for a in github_issue.get("assignees", [])], labels=[label["name"] for label in github_issue.get("labels", [])], milestone=github_issue.get("milestone", {}).get("title") if github_issue.get("milestone") else None, created_at=github_issue["created_at"], updated_at=github_issue["updated_at"], closed_at=github_issue.get("closed_at"), due_date=None, # GitHub doesn't have native due dates provider="github", raw_data=github_issue ) @staticmethod def from_canonical(issue: CanonicalIssue) -> dict: return { "title": issue.title, "body": issue.description, "state": "open" if issue.status == IssueStatus.OPEN else "closed", "assignees": issue.assignee_usernames, "labels": issue.labels, "milestone": issue.milestone, } # app/services/sync/adapters/gitlab.py class GitLabAdapter: """Transform between GitLab API format and canonical model.""" @staticmethod def to_canonical(gitlab_issue: dict, base_url: str) -> CanonicalIssue: # GitLab uses 'opened' instead of 'open' state = gitlab_issue["state"] status = IssueStatus.OPEN if state == "opened" else IssueStatus.CLOSED return CanonicalIssue( external_id=str(gitlab_issue["id"]), external_number=gitlab_issue["iid"], # GitLab uses iid for project-scoped number remote_url=gitlab_issue["web_url"], title=gitlab_issue["title"], description=gitlab_issue.get("description"), status=status, assignee_ids=[str(a["id"]) for a in gitlab_issue.get("assignees", [])], assignee_usernames=[a["username"] for a in gitlab_issue.get("assignees", [])], labels=gitlab_issue.get("labels", []), # GitLab returns labels as strings 
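            # A missing or null milestone is common; the guard below avoids AttributeError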
milestone=gitlab_issue.get("milestone", {}).get("title") if gitlab_issue.get("milestone") else None, created_at=gitlab_issue["created_at"], updated_at=gitlab_issue["updated_at"], closed_at=gitlab_issue.get("closed_at"), due_date=gitlab_issue.get("due_date"), provider="gitlab", raw_data=gitlab_issue ) @staticmethod def from_canonical(issue: CanonicalIssue) -> dict: return { "title": issue.title, "description": issue.description, "state_event": "reopen" if issue.status == IssueStatus.OPEN else "close", "assignee_ids": issue.assignee_ids, "labels": ",".join(issue.labels), "milestone_id": issue.milestone, # Requires ID lookup "due_date": issue.due_date.isoformat() if issue.due_date else None, } ``` --- ## Code Examples ### Provider Interface ```python # app/services/sync/providers/base.py from abc import ABC, abstractmethod from typing import AsyncIterator from app.schemas.sync.canonical import CanonicalIssue class IssueProvider(ABC): """Abstract base class for issue tracker providers.""" def __init__(self, connection: ExternalConnection): self.connection = connection self.base_url = connection.provider_url self.repo_owner = connection.repo_owner self.repo_name = connection.repo_name @abstractmethod async def get_issue(self, issue_number: int) -> CanonicalIssue: """Fetch a single issue by number.""" pass @abstractmethod async def list_issues( self, state: str = "all", since: datetime = None, labels: list[str] = None ) -> AsyncIterator[CanonicalIssue]: """List issues with optional filters.""" pass @abstractmethod async def create_issue(self, issue: CanonicalIssue) -> CanonicalIssue: """Create a new issue.""" pass @abstractmethod async def update_issue(self, issue_number: int, issue: CanonicalIssue) -> CanonicalIssue: """Update an existing issue.""" pass @abstractmethod async def add_comment(self, issue_number: int, body: str) -> dict: """Add a comment to an issue.""" pass @abstractmethod async def setup_webhook(self, callback_url: str, secret: str) -> str: """Configure webhook for issue events. 
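        Implementations should subscribe to both issue and issue-comment
        events so state changes and discussion stay in sync.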
Returns webhook ID.""" pass @abstractmethod async def verify_webhook_signature(self, payload: bytes, headers: dict) -> bool: """Verify webhook signature.""" pass ``` ### Gitea Provider Implementation ```python # app/services/sync/providers/gitea.py import httpx from app.services.sync.providers.base import IssueProvider from app.services.sync.adapters.gitea import GiteaAdapter class GiteaProvider(IssueProvider): """Gitea issue tracker provider.""" def __init__(self, connection: ExternalConnection): super().__init__(connection) self.adapter = GiteaAdapter() self._client = None @property def api_url(self) -> str: return f"{self.base_url}/api/v1" async def _get_client(self) -> httpx.AsyncClient: if self._client is None: token = decrypt(self.connection.credentials_encrypted) self._client = httpx.AsyncClient( base_url=self.api_url, headers={ "Authorization": f"token {token}", "Accept": "application/json", }, timeout=30.0 ) return self._client async def get_issue(self, issue_number: int) -> CanonicalIssue: client = await self._get_client() response = await client.get( f"/repos/{self.repo_owner}/{self.repo_name}/issues/{issue_number}" ) response.raise_for_status() return self.adapter.to_canonical(response.json(), self.base_url) async def list_issues( self, state: str = "all", since: datetime = None, labels: list[str] = None ) -> AsyncIterator[CanonicalIssue]: client = await self._get_client() page = 1 while True: params = { "state": state, "page": page, "limit": 50, # Gitea default max } if since: params["since"] = since.isoformat() if labels: params["labels"] = ",".join(labels) response = await client.get( f"/repos/{self.repo_owner}/{self.repo_name}/issues", params=params ) response.raise_for_status() issues = response.json() if not issues: break for issue in issues: yield self.adapter.to_canonical(issue, self.base_url) # Check pagination link_header = response.headers.get("link", "") if 'rel="next"' not in link_header: break page += 1 async def create_issue(self, issue: CanonicalIssue) -> CanonicalIssue: client = await self._get_client() payload = self.adapter.from_canonical(issue) response = await client.post( f"/repos/{self.repo_owner}/{self.repo_name}/issues", json=payload ) response.raise_for_status() return self.adapter.to_canonical(response.json(), self.base_url) async def update_issue(self, issue_number: int, issue: CanonicalIssue) -> CanonicalIssue: client = await self._get_client() payload = self.adapter.from_canonical(issue) response = await client.patch( f"/repos/{self.repo_owner}/{self.repo_name}/issues/{issue_number}", json=payload ) response.raise_for_status() return self.adapter.to_canonical(response.json(), self.base_url) async def add_comment(self, issue_number: int, body: str) -> dict: client = await self._get_client() response = await client.post( f"/repos/{self.repo_owner}/{self.repo_name}/issues/{issue_number}/comments", json={"body": body} ) response.raise_for_status() return response.json() async def setup_webhook(self, callback_url: str, secret: str) -> str: client = await self._get_client() response = await client.post( f"/repos/{self.repo_owner}/{self.repo_name}/hooks", json={ "type": "gitea", "active": True, "events": ["issues", "issue_comment"], "config": { "url": callback_url, "content_type": "json", "secret": secret, } } ) response.raise_for_status() return str(response.json()["id"]) async def verify_webhook_signature(self, payload: bytes, headers: dict) -> bool: secret = decrypt(self.connection.webhook_secret_encrypted) return 
GiteaWebhookValidator.verify_signature(payload, headers, secret) ``` ### Sync Engine ```python # app/services/sync/sync_engine.py from app.services.sync.providers.factory import ProviderFactory from app.services.sync.conflict_resolver import ConflictResolver from app.models.issue_sync import IssueSyncLog, SyncOutbox class SyncEngine: """Orchestrates synchronization between Syndarix and external trackers.""" def __init__(self, db: AsyncSession): self.db = db self.conflict_resolver = ConflictResolver() async def sync_inbound( self, connection: ExternalConnection, external_issue: CanonicalIssue ) -> Issue: """Sync an issue from external tracker to Syndarix.""" # Find existing local issue local_issue = await self._find_by_external_id( connection.project_id, external_issue.provider, external_issue.external_id ) if local_issue is None: # Create new local issue local_issue = await self._create_local_issue( connection.project_id, external_issue ) await self._log_sync( local_issue, direction="inbound", event_type="created", status="success" ) else: # Check for conflicts conflict_result = self.conflict_resolver.check( local_issue.version_vector, external_issue.raw_data.get("_version_vector", {}), local_issue.updated_at, external_issue.updated_at ) if conflict_result.has_conflict: # Apply resolution strategy resolved = self.conflict_resolver.resolve( local_issue, external_issue, conflict_result ) await self._update_local_issue(local_issue, resolved) await self._log_sync( local_issue, direction="inbound", event_type="conflict_resolved", status="success", conflict_details=conflict_result.to_dict() ) else: # Normal update await self._update_local_issue(local_issue, external_issue) await self._log_sync( local_issue, direction="inbound", event_type="updated", status="success" ) return local_issue async def sync_outbound( self, connection: ExternalConnection, local_issue: Issue ): """Queue a local issue for sync to external tracker.""" provider = ProviderFactory.get_provider(connection) canonical = self._to_canonical(local_issue) outbox_entry = SyncOutbox( issue_id=local_issue.id, project_id=connection.project_id, provider=connection.provider, operation="update" if local_issue.external_id else "create", payload=canonical.dict(), idempotency_key=f"{local_issue.id}:{local_issue.updated_at.isoformat()}" ) self.db.add(outbox_entry) await self.db.commit() async def initial_import( self, connection: ExternalConnection, since: datetime = None, labels: list[str] = None ) -> int: """Import all issues from external tracker.""" provider = ProviderFactory.get_provider(connection) imported = 0 async for external_issue in provider.list_issues( state="all", since=since, labels=labels ): await self.sync_inbound(connection, external_issue) imported += 1 connection.last_sync_at = datetime.utcnow() await self.db.commit() return imported async def reconcile(self, connection: ExternalConnection): """ Periodic reconciliation to catch missed webhooks. Runs via Celery Beat every 15-30 minutes. 
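        Uses the provider's `since` filter so only issues updated after the
        last successful sync (or the past 24 hours) are scanned.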
""" since = connection.last_sync_at or (datetime.utcnow() - timedelta(hours=24)) provider = ProviderFactory.get_provider(connection) async for external_issue in provider.list_issues( state="all", since=since ): local_issue = await self._find_by_external_id( connection.project_id, external_issue.provider, external_issue.external_id ) if local_issue: # Check if external is newer if external_issue.updated_at > local_issue.external_updated_at: await self.sync_inbound(connection, external_issue) else: await self.sync_inbound(connection, external_issue) connection.last_sync_at = datetime.utcnow() await self.db.commit() ``` --- ## Error Handling & Recovery ### Error Categories and Handling | Error Type | Handling Strategy | Retry | Alert | |------------|-------------------|-------|-------| | Network timeout | Exponential backoff | Yes (3x) | After 3 failures | | Rate limit (429) | Wait for reset | Yes | No | | Auth error (401/403) | Mark connection as error | No | Yes | | Not found (404) | Mark issue as deleted | No | No | | Conflict (409) | Apply resolution strategy | No | If unresolved | | Server error (5xx) | Exponential backoff | Yes (5x) | After 5 failures | | Validation error | Log and skip | No | Yes | ### Retry Strategy ```python # app/services/sync/retry.py import asyncio from functools import wraps def with_retry( max_attempts: int = 3, base_delay: float = 1.0, max_delay: float = 60.0, exponential_base: float = 2.0, retryable_exceptions: tuple = (httpx.TimeoutException, httpx.NetworkError) ): """Decorator for retry with exponential backoff.""" def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): last_exception = None for attempt in range(max_attempts): try: return await func(*args, **kwargs) except retryable_exceptions as e: last_exception = e if attempt < max_attempts - 1: delay = min( base_delay * (exponential_base ** attempt), max_delay ) await asyncio.sleep(delay) raise last_exception return wrapper return decorator ``` ### Dead Letter Queue Handling ```python # app/workers/dead_letter_worker.py @celery_app.task def process_dead_letter_queue(): """Process failed sync items for manual review or retry.""" dead_items = db.query(SyncOutbox).filter( SyncOutbox.status == "dead_letter", SyncOutbox.created_at > datetime.utcnow() - timedelta(days=7) ).all() for item in dead_items: # Create notification for admin review notify_admin( f"Sync failed for issue {item.issue_id}", details={ "provider": item.provider, "operation": item.operation, "attempts": item.attempts, "last_error": item.last_error, } ) # Optionally attempt one more retry with manual intervention if should_retry(item): item.status = "pending" item.attempts = 0 item.scheduled_at = datetime.utcnow() ``` ### Health Monitoring ```python # app/services/sync/health.py async def check_sync_health(project_id: str) -> dict: """Check sync health for a project.""" connections = await get_connections(project_id) health = { "status": "healthy", "connections": [], "pending_syncs": 0, "failed_syncs_24h": 0, "conflicts_24h": 0, } for conn in connections: conn_health = { "provider": conn.provider, "status": conn.status, "last_sync": conn.last_sync_at, "webhook_active": conn.webhook_active, } # Check if sync is stale if conn.last_sync_at: stale_threshold = datetime.utcnow() - timedelta(hours=1) if conn.last_sync_at < stale_threshold: conn_health["status"] = "stale" health["status"] = "degraded" health["connections"].append(conn_health) # Count pending and failed health["pending_syncs"] = await count_pending_syncs(project_id) 
health["failed_syncs_24h"] = await count_failed_syncs(project_id, hours=24) health["conflicts_24h"] = await count_conflicts(project_id, hours=24) if health["failed_syncs_24h"] > 10: health["status"] = "unhealthy" return health ``` --- ## Implementation Roadmap ### Phase 1: Foundation (Week 1-2) - [ ] Database schema and migrations - [ ] Core models (ExternalConnection, IssueSyncLog, SyncOutbox) - [ ] Provider interface and Gitea implementation - [ ] Canonical issue model and field mapping ### Phase 2: Inbound Sync (Week 2-3) - [ ] Webhook endpoint and signature validation - [ ] Redis queue for webhook events - [ ] Celery worker for event processing - [ ] Initial import functionality - [ ] Basic conflict detection ### Phase 3: Outbound Sync (Week 3-4) - [ ] Outbox pattern implementation - [ ] Outbound sync worker - [ ] Retry and dead letter queue - [ ] Bidirectional sync testing ### Phase 4: GitHub & GitLab (Week 4-5) - [ ] GitHub provider implementation - [ ] GitLab provider implementation - [ ] Provider factory and dynamic selection - [ ] Cross-provider field mapping ### Phase 5: Conflict Resolution (Week 5-6) - [ ] Version vector implementation - [ ] Conflict resolution strategies - [ ] Merge algorithms for complex fields - [ ] Conflict notification UI ### Phase 6: Production Readiness (Week 6-7) - [ ] Health monitoring and alerting - [ ] Admin UI for connection management - [ ] Comprehensive test coverage - [ ] Performance optimization - [ ] Documentation --- ## References ### Research Sources - [Two-Way Sync Tools 2025: Best Platforms for Real-Time Data Integration](https://www.stacksync.com/blog/2025-best-two-way-sync-tools-a-comprehensive-guide-for-data-integration) - StackSync - [The Architect's Guide to Data Integration Patterns](https://medium.com/@prayagvakharia/the-architects-guide-to-data-integration-patterns-migration-broadcast-bi-directional-a4c92b5f908d) - Medium - [System Design Pattern: Conflict Resolution in Distributed Systems](https://medium.com/@priyasrivastava18official/system-design-pattern-from-chaos-to-consistency-the-art-of-conflict-resolution-in-distributed-9d631028bdb4) - Medium - [Eventual Consistency in Distributed Systems](https://www.geeksforgeeks.org/system-design/eventual-consistency-in-distributive-systems-learn-system-design/) - GeeksforGeeks - [Bidirectional Synchronization: What It Is and Examples](https://www.workato.com/the-connector/bidirectional-synchronization/) - Workato - [Data Integration Patterns: Bi-Directional Sync](https://blogs.mulesoft.com/api-integration/patterns/data-integration-patterns-bi-directional-sync/) - MuleSoft ### API Documentation - [GitHub REST API - Issues](https://docs.github.com/en/rest/issues/issues) - [GitHub Webhook Events](https://docs.github.com/en/webhooks/webhook-events-and-payloads) - [GitLab Issues API](https://docs.gitlab.com/api/issues.html) - [GitLab Webhooks](https://docs.gitlab.com/ee/user/project/integrations/webhooks.html) - [Gitea API Usage](https://docs.gitea.com/development/api-usage) ### Related Syndarix Spikes - [SPIKE-001: MCP Integration Pattern](./SPIKE-001-mcp-integration-pattern.md) - MCP architecture and FastMCP usage - [SPIKE-003: Real-time Updates](./SPIKE-003-realtime-updates.md) - SSE for event streaming - [SPIKE-004: Celery Redis Integration](./SPIKE-004-celery-redis-integration.md) - Background job infrastructure --- ## Decision **Adopt a webhook-first, polling-fallback synchronization architecture** with: 1. **Last-Writer-Wins (LWW)** conflict resolution using version vectors 2. 
**External tracker as source of truth** with local mirrors 3. **Unified provider interface** abstracting Gitea, GitHub, GitLab differences 4. **Outbox pattern** for reliable outbound sync 5. **Redis Streams** for webhook event queuing and deduplication 6. **Celery Beat** for periodic reconciliation This approach balances real-time responsiveness with eventual consistency, providing a robust foundation for bidirectional issue synchronization. --- *Spike completed. Findings will inform ADR-009: Issue Synchronization Architecture.*
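---

## Appendix: Token Bucket Throttle (Sketch)

Research question 4 recommends a token bucket with adaptive throttling, but the spike body never sketches one. Below is a minimal, illustrative in-process limiter; the `TokenBucket` class, its `acquire`/`throttle_down` methods, and the numbers in `main` are assumptions made for this appendix, not existing Syndarix code. A production limiter would more likely keep its state in Redis so all workers share one budget per provider connection.

```python
# token_bucket_sketch.py -- illustrative only; names are assumptions,
# not part of the Syndarix codebase.
import asyncio
import time


class TokenBucket:
    """Async token bucket: `rate` tokens refill per second up to `capacity`."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.updated_at = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: float = 1.0) -> None:
        """Wait until `tokens` are available (assumes tokens <= capacity)."""
        async with self._lock:
            while True:
                now = time.monotonic()
                # Refill based on elapsed time, clamped to capacity.
                self.tokens = min(
                    self.capacity,
                    self.tokens + (now - self.updated_at) * self.rate,
                )
                self.updated_at = now
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                # Sleep just long enough for the deficit to refill.
                await asyncio.sleep((tokens - self.tokens) / self.rate)

    def throttle_down(self, factor: float = 0.5) -> None:
        """Adaptive throttling: shrink the refill rate after a 429/403."""
        self.rate = max(self.rate * factor, 0.1)


async def main() -> None:
    # GitHub's authenticated budget is 5,000 requests/hour ~= 1.39/sec.
    bucket = TokenBucket(rate=5000 / 3600, capacity=100)
    for i in range(3):
        await bucket.acquire()
        print(f"request {i} allowed at {time.monotonic():.2f}")


if __name__ == "__main__":
    asyncio.run(main())
```

Callers would `await bucket.acquire()` before each provider API request and invoke `throttle_down()` on a 429/403 response, matching the backoff strategies listed in the rate-limiting answer.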