
SPIKE-009: Issue Synchronization with External Trackers

Status: Completed
Date: 2025-12-29
Author: Architecture Team
Related Issue: #9


Executive Summary

This spike researches bi-directional issue synchronization between Syndarix and external issue trackers (Gitea, GitHub, GitLab). After analyzing sync patterns, conflict resolution strategies, and API capabilities of each platform, we recommend:

Primary Recommendation: Implement a webhook-first, polling-fallback architecture with Last-Writer-Wins (LWW) conflict resolution, using version vectors for causality tracking. External trackers serve as the source of truth, with Syndarix maintaining local mirrors for unified agent access.

Key Decisions:

  1. Use webhooks for real-time sync; polling for reconciliation and initial import
  2. Implement version vectors for conflict detection with LWW resolution
  3. Store sync metadata in dedicated issue_sync_log table for audit and recovery
  4. Abstract provider differences behind a unified IssueProvider interface
  5. Use Redis for webhook event queuing and deduplication

Table of Contents

  1. Research Questions & Answers
  2. Sync Architecture
  3. Conflict Resolution Strategy
  4. Webhook Handling Design
  5. Provider API Comparison
  6. Database Schema
  7. Field Mapping Specification
  8. Code Examples
  9. Error Handling & Recovery
  10. Implementation Roadmap

Research Questions & Answers

1. Best patterns for bi-directional data sync?

Answer: The Hub-and-Spoke pattern with Syndarix as the hub is recommended. This pattern:

  • Designates external trackers as authoritative sources
  • Maintains local mirrors for unified access
  • Synchronizes changes bidirectionally through a central sync engine
  • Uses explicit ownership rules per field to prevent conflicts

Modern implementations leverage Conflict-Free Replicated Data Types (CRDTs) for automatic conflict resolution, but for issue tracking, where human review may be desired, a version-vector scheme with LWW provides better control.

2. Handling sync conflicts (edited in both places)?

Answer: Implement a tiered conflict resolution strategy:

| Scenario | Resolution |
|----------|------------|
| Same field, different times | Last-Writer-Wins (LWW) |
| Same field, concurrent edits | Mark as conflict, notify user |
| Different fields | Merge both changes |
| Delete vs Update | Delete wins (configurable) |

Use version vectors to detect concurrent modifications. Each system maintains a version counter, and conflicts are identified when neither version dominates the other.

3. Webhook vs polling strategies?

Answer: Hybrid approach - webhooks primary, polling secondary.

| Strategy | Use Case | Latency | Resource Cost |
|----------|----------|---------|---------------|
| Webhooks | Real-time sync | <1s | Low |
| Polling | Initial import, reconciliation, fallback | Minutes | Medium |
| On-demand | User-triggered refresh | Immediate | Minimal |

Rationale: Webhooks provide real-time updates but may miss events during outages. Periodic polling (every 15-30 minutes) ensures eventual consistency.
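
For illustration, the periodic reconciliation could be wired up as a Celery Beat entry; the task path here is an assumption for this sketch, not a decided name:

# Celery Beat schedule (sketch): task path is assumed for illustration
celery_app.conf.beat_schedule = {
    "reconcile-issue-sync": {
        "task": "app.workers.sync_worker.reconcile_all_connections",
        "schedule": 15 * 60.0,  # every 15 minutes, in seconds
    },
}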

4. Rate limiting and API quota management?

Answer: Implement a token bucket with adaptive throttling:

| Provider | Authenticated Rate Limit | Unauthenticated |
|----------|--------------------------|-----------------|
| GitHub | 5,000/hour | 60/hour |
| GitLab | 600/minute | 10/minute |
| Gitea | Configurable (default: 50 items/response) | N/A |

Strategies (a token-bucket sketch follows this list):

  • Use conditional requests (If-None-Match, If-Modified-Since) to avoid counting unchanged responses
  • Implement exponential backoff on 429/403 responses
  • Cache responses with ETags
  • Batch operations where possible
  • Monitor X-RateLimit-Remaining headers
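
A minimal async token bucket, as a sketch only; the burst capacity below is an arbitrary assumption, not a provider value:

import asyncio
import time

class TokenBucket:
    """Async token bucket: refills `rate` tokens/second up to `capacity`."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1) -> None:
        while True:
            async with self._lock:
                now = time.monotonic()
                # Refill based on elapsed time, capped at capacity
                self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
                self.updated = now
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                wait = (tokens - self.tokens) / self.rate
            await asyncio.sleep(wait)  # sleep outside the lock so others can refill

# GitHub's authenticated limit (5,000/hour) maps to roughly 1.4 tokens/second
github_bucket = TokenBucket(rate=5000 / 3600, capacity=100)

Adaptive throttling then amounts to shrinking the rate when X-RateLimit-Remaining drops below a threshold.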

5. Eventual consistency vs strong consistency tradeoffs?

Answer: Eventual consistency is acceptable and recommended for issue sync.

| Consistency | Pros | Cons |
|-------------|------|------|
| Strong | Always accurate | Higher latency, complex implementation |
| Eventual | Better performance, simpler | Temporary inconsistency |

Rationale: Issue tracking tolerates brief inconsistency windows (seconds to minutes). Users can manually refresh if needed. The simplicity and performance gains outweigh the drawbacks.

6. How to map different field schemas?

Answer: Use a canonical field model with provider-specific adapters.

External Field → Provider Adapter → Canonical Model → Local Storage
Local Field → Canonical Model → Provider Adapter → External Field

See Field Mapping Specification for detailed mappings.

7. Handling offline/disconnected scenarios?

Answer: Implement an outbox pattern with retry queue (a drain-loop sketch follows this list):

  1. Queue all outgoing changes in local sync_outbox table
  2. Background worker processes queue with exponential backoff
  3. Mark items as pending, in_progress, completed, or failed
  4. Dead letter queue for items exceeding max retries
  5. Manual reconciliation UI for failed items
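
A drain-loop sketch under stated assumptions: db is an asyncpg-style connection and push_to_provider is a hypothetical wrapper around the provider API; the sync_outbox table itself is defined in the Database Schema section below.

from datetime import timedelta

async def drain_outbox(db, push_to_provider, batch_size: int = 20) -> None:
    """Process pending outbox rows oldest-first with exponential backoff."""
    rows = await db.fetch(
        "SELECT * FROM sync_outbox WHERE status = 'pending'"
        " AND scheduled_at <= NOW() ORDER BY scheduled_at LIMIT $1",
        batch_size,
    )
    for row in rows:
        try:
            await push_to_provider(row["provider"], row["operation"], row["payload"])
            await db.execute(
                "UPDATE sync_outbox SET status = 'completed', processed_at = NOW() WHERE id = $1",
                row["id"],
            )
        except Exception:
            attempts = row["attempts"] + 1
            status = "dead_letter" if attempts >= row["max_attempts"] else "pending"
            delay = timedelta(seconds=min(2 ** attempts * 30, 3600))  # exponential backoff, capped
            await db.execute(
                "UPDATE sync_outbox SET attempts = $1, status = $2,"
                " scheduled_at = NOW() + $3 WHERE id = $4",
                attempts, status, delay, row["id"],
            )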

Sync Architecture

High-Level Architecture Diagram

                                   External Issue Trackers
                        ┌─────────────────┬─────────────────┬─────────────────┐
                        │     Gitea       │     GitHub      │     GitLab      │
                        │   (Primary)     │                 │                 │
                        └────────┬────────┴────────┬────────┴────────┬────────┘
                                 │                 │                 │
                        ┌────────┴─────────────────┴─────────────────┴────────┐
                        │                    Webhooks                          │
                        └─────────────────────────┬───────────────────────────┘
                                                  │
┌─────────────────────────────────────────────────┼─────────────────────────────────────────────┐
│                                    Syndarix Backend                                            │
│  ┌────────────────────┐           ┌────────────┴───────────┐           ┌────────────────────┐ │
│  │   Webhook Handler  │◀──────────│      Redis Queue       │──────────▶│   Polling Worker   │ │
│  │  (FastAPI Route)   │           │  (Event Dedup/Buffer)  │           │  (Celery Beat)     │ │
│  └─────────┬──────────┘           └────────────────────────┘           └─────────┬──────────┘ │
│            │                                                                      │            │
│            └──────────────────────────┬──────────────────────────────────────────┘            │
│                                       │                                                        │
│                          ┌────────────┴────────────┐                                          │
│                          │     Sync Engine         │                                          │
│                          │  ┌──────────────────┐   │                                          │
│                          │  │ Provider Factory │   │                                          │
│                          │  │ ┌──────────────┐ │   │                                          │
│                          │  │ │GiteaProvider │ │   │                                          │
│                          │  │ │GitHubProvider│ │   │                                          │
│                          │  │ │GitLabProvider│ │   │                                          │
│                          │  │ └──────────────┘ │   │                                          │
│                          │  └──────────────────┘   │                                          │
│                          │  ┌──────────────────┐   │                                          │
│                          │  │Conflict Resolver │   │                                          │
│                          │  └──────────────────┘   │                                          │
│                          │  ┌──────────────────┐   │                                          │
│                          │  │ Field Mapper     │   │                                          │
│                          │  └──────────────────┘   │                                          │
│                          └────────────┬────────────┘                                          │
│                                       │                                                        │
│                          ┌────────────┴────────────┐                                          │
│                          │      PostgreSQL         │                                          │
│                          │  ┌──────────────────┐   │                                          │
│                          │  │     issues       │   │                                          │
│                          │  │  issue_sync_log  │   │                                          │
│                          │  │  sync_outbox     │   │                                          │
│                          │  │  external_links  │   │                                          │
│                          │  └──────────────────┘   │                                          │
│                          └─────────────────────────┘                                          │
└───────────────────────────────────────────────────────────────────────────────────────────────┘

Component Responsibilities

| Component | Responsibility |
|-----------|----------------|
| Webhook Handler | Receive, validate, and queue incoming webhooks |
| Redis Queue | Buffer events, deduplicate, handle backpressure |
| Polling Worker | Periodic reconciliation, initial import, fallback |
| Sync Engine | Orchestrate sync operations, apply business logic |
| Provider Factory | Create provider-specific clients |
| Conflict Resolver | Detect and resolve sync conflicts |
| Field Mapper | Transform between canonical and provider schemas |

Data Flow

Inbound (External to Syndarix):

1. Webhook received → Validate signature → Queue in Redis
2. Worker dequeues → Parse payload → Transform to canonical model
3. Check for conflicts → Apply resolution strategy
4. Update local database → Log sync operation
5. Notify connected clients via SSE (per SPIKE-003)

Outbound (Syndarix to External):

1. Local change detected → Transform to provider format
2. Queue in sync_outbox → Worker picks up
3. Call external API → Handle response
4. Update sync metadata → Mark as synced
5. Handle failures → Retry with backoff

Conflict Resolution Strategy

Version Vector Implementation

Each issue maintains a version vector tracking modifications across systems:

# Version vector structure
{
    "syndarix": 5,      # Local modification count
    "gitea": 3,         # Gitea modification count
    "github": 0,        # Not synced with GitHub
    "gitlab": 2         # GitLab modification count
}
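
Each local write bumps that system's counter before persisting; an illustrative helper (name is ours, not decided):

def bump_version(vector: dict[str, int], system: str = "syndarix") -> dict[str, int]:
    """Return a copy of the version vector with `system`'s counter incremented."""
    updated = dict(vector)
    updated[system] = updated.get(system, 0) + 1
    return updated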

Conflict Detection Algorithm

def detect_conflict(local_version: dict, remote_version: dict) -> str:
    """
    Compare version vectors to detect conflicts.

    Returns:
        - "local_wins": Local version dominates
        - "remote_wins": Remote version dominates
        - "conflict": Concurrent modification (neither dominates)
        - "equal": No changes
    """
    local_dominates = all(
        local_version.get(k, 0) >= v
        for k, v in remote_version.items()
    )
    remote_dominates = all(
        remote_version.get(k, 0) >= v
        for k, v in local_version.items()
    )

    if local_version == remote_version:
        return "equal"
    elif local_dominates and not remote_dominates:
        return "local_wins"
    elif remote_dominates and not local_dominates:
        return "remote_wins"
    else:
        return "conflict"

Resolution Strategies

from enum import Enum

class ConflictStrategy(str, Enum):
    REMOTE_WINS = "remote_wins"      # External tracker is source of truth
    LOCAL_WINS = "local_wins"        # Syndarix changes take precedence
    LAST_WRITE_WINS = "lww"          # Most recent timestamp wins
    MANUAL = "manual"                # Flag for human review
    MERGE = "merge"                  # Attempt field-level merge

# Default strategy per field
FIELD_STRATEGIES = {
    "title": ConflictStrategy.LAST_WRITE_WINS,
    "description": ConflictStrategy.MERGE,
    "status": ConflictStrategy.REMOTE_WINS,
    "assignees": ConflictStrategy.MERGE,
    "labels": ConflictStrategy.MERGE,
    "comments": ConflictStrategy.MERGE,  # Append both
    "priority": ConflictStrategy.REMOTE_WINS,
}

Merge Algorithm for Complex Fields

def merge_labels(local: list[str], remote: list[str], base: list[str]) -> list[str]:
    """Three-way merge for labels."""
    local_added = set(local) - set(base)
    local_removed = set(base) - set(local)
    remote_added = set(remote) - set(base)
    remote_removed = set(base) - set(remote)

    result = set(base)
    result |= local_added | remote_added
    result -= local_removed | remote_removed

    return sorted(result)
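
Tying these pieces together, one possible shape for the ConflictResolver consumed by the sync engine later in this spike; this is a sketch assuming the detect_conflict(), FIELD_STRATEGIES, and merge helpers above, and matching attribute names on the local and remote objects:

from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class ConflictResult:
    outcome: str                                   # output of detect_conflict()
    conflicting_fields: list[str] = field(default_factory=list)

    @property
    def has_conflict(self) -> bool:
        return self.outcome == "conflict"

    def to_dict(self) -> dict:
        return {"outcome": self.outcome, "fields": self.conflicting_fields}


class ConflictResolver:
    """Glue between detect_conflict() and the per-field strategies above."""

    def check(self, local_vector: dict, remote_vector: dict,
              local_updated: datetime, remote_updated: datetime) -> ConflictResult:
        return ConflictResult(outcome=detect_conflict(local_vector, remote_vector))

    def resolve(self, local, remote, result: ConflictResult):
        # Assumes local and remote expose matching attribute names
        for fname, strategy in FIELD_STRATEGIES.items():
            if not hasattr(remote, fname):
                continue
            if strategy == ConflictStrategy.REMOTE_WINS:
                setattr(local, fname, getattr(remote, fname))
            elif strategy == ConflictStrategy.LAST_WRITE_WINS:
                if remote.updated_at >= local.updated_at:
                    setattr(local, fname, getattr(remote, fname))
            # MERGE fields call type-specific helpers such as merge_labels()
        return local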

Webhook Handling Design

Webhook Endpoint Architecture

# app/api/v1/webhooks/issues.py
from datetime import datetime

from fastapi import APIRouter, Request, HTTPException
from app.services.sync.webhook_handler import WebhookHandler
from app.core.redis import redis_client

router = APIRouter()

@router.post("/webhooks/{provider}/{project_id}")
async def receive_webhook(
    provider: str,
    project_id: str,
    request: Request
):
    """
    Unified webhook endpoint for all providers.

    Path: /api/v1/webhooks/{provider}/{project_id}
    Providers: gitea, github, gitlab
    """
    body = await request.body()
    headers = dict(request.headers)

    # Validate webhook signature
    handler = WebhookHandler.get_handler(provider)
    if not handler.verify_signature(body, headers):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # Parse event type
    event_type = handler.get_event_type(headers)
    if event_type not in handler.supported_events:
        return {"status": "ignored", "reason": "unsupported_event"}

    # Deduplicate using event ID
    event_id = handler.get_event_id(headers, body)
    if await is_duplicate(event_id):
        return {"status": "duplicate"}

    # Queue for processing
    await redis_client.xadd(
        f"webhooks:{project_id}",
        {
            "provider": provider,
            "event_type": event_type,
            "payload": body,
            "received_at": datetime.utcnow().isoformat()
        }
    )

    return {"status": "queued", "event_id": event_id}


async def is_duplicate(event_id: str, ttl: int = 3600) -> bool:
    """Check if event was already processed (Redis-based dedup)."""
    key = f"webhook:processed:{event_id}"
    result = await redis_client.set(key, "1", ex=ttl, nx=True)
    return result is None  # None means key existed

Provider-Specific Signature Validation

# app/services/sync/webhook_validators.py
import hashlib
import hmac

class GiteaWebhookValidator:
    @staticmethod
    def verify_signature(body: bytes, headers: dict, secret: str) -> bool:
        """Gitea uses X-Gitea-Signature with HMAC-SHA256."""
        signature = headers.get("x-gitea-signature", "")
        expected = hmac.new(
            secret.encode(),
            body,
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(signature, expected)

    @staticmethod
    def get_event_type(headers: dict) -> str:
        return headers.get("x-gitea-event", "")


class GitHubWebhookValidator:
    @staticmethod
    def verify_signature(body: bytes, headers: dict, secret: str) -> bool:
        """GitHub uses X-Hub-Signature-256 with sha256=HMAC."""
        signature = headers.get("x-hub-signature-256", "")
        if not signature.startswith("sha256="):
            return False
        expected = "sha256=" + hmac.new(
            secret.encode(),
            body,
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(signature, expected)

    @staticmethod
    def get_event_type(headers: dict) -> str:
        return headers.get("x-github-event", "")


class GitLabWebhookValidator:
    @staticmethod
    def verify_signature(body: bytes, headers: dict, secret: str) -> bool:
        """GitLab uses X-Gitlab-Token for simple token matching."""
        token = headers.get("x-gitlab-token", "")
        return hmac.compare_digest(token, secret)

    @staticmethod
    def get_event_type(headers: dict) -> str:
        return headers.get("x-gitlab-event", "")

Webhook Event Processing (Celery Worker)

# app/workers/sync_worker.py
import logging

from celery import Celery
from app.services.sync.sync_engine import SyncEngine
from app.core.redis import redis_client  # synchronous Redis client in worker context

celery_app = Celery("syndarix")
log = logging.getLogger(__name__)

@celery_app.task(bind=True, max_retries=3)
def process_webhook_event(self, project_id: str):
    """Process queued webhook events for a project."""
    try:
        # Read from Redis stream
        events = redis_client.xread(
            {f"webhooks:{project_id}": "0"},
            count=10,
            block=5000
        )

        sync_engine = SyncEngine(get_db_session())  # get_db_session: assumed session factory

        for stream_name, messages in events:
            for message_id, data in messages:
                try:
                    sync_engine.process_inbound_event(
                        provider=data["provider"],
                        event_type=data["event_type"],
                        payload=data["payload"]
                    )
                    # Acknowledge processed
                    redis_client.xdel(stream_name, message_id)
                except Exception as e:
                    log.error(f"Failed to process {message_id}: {e}")
                    # Will retry on next run

    except Exception as exc:
        self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

Provider API Comparison

Issue Field Support Matrix

| Field | Gitea | GitHub | GitLab | Syndarix Canonical |
|-------|-------|--------|--------|--------------------|
| ID | id (int) | id (int) | id (int) | external_id (str) |
| Number | number | number | iid | external_number |
| Title | title | title | title | title |
| Body | body | body | description | description |
| State | state (open/closed) | state (open/closed) | state (opened/closed) | status (enum) |
| Assignees | assignees[] | assignees[] | assignees[] | assignee_ids[] |
| Labels | labels[].name | labels[].name | labels[] (strings) | labels[] |
| Milestone | milestone.title | milestone.title | milestone.title | milestone |
| Created | created_at | created_at | created_at | created_at |
| Updated | updated_at | updated_at | updated_at | updated_at |
| Due Date | due_date | N/A | due_date | due_date |
| Priority | N/A (via labels) | N/A (via labels) | N/A (via labels) | priority |
| URL | html_url | html_url | web_url | remote_url |

Webhook Event Mapping

| Action | Gitea Event | GitHub Event | GitLab Event |
|--------|-------------|--------------|--------------|
| Create | issues:opened | issues:opened | Issue Hook:open |
| Update | issues:edited | issues:edited | Issue Hook:update |
| Close | issues:closed | issues:closed | Issue Hook:close |
| Reopen | issues:reopened | issues:reopened | Issue Hook:reopen |
| Assign | issues:assigned | issues:assigned | Issue Hook:update |
| Label | issues:label_updated | issues:labeled | Issue Hook:update |
| Comment | issue_comment:created | issue_comment:created | Note Hook |

Rate Limits Comparison

| Provider | Authenticated | Pagination | Conditional Requests |
|----------|---------------|------------|----------------------|
| GitHub | 5,000/hour | 100/page max | ETag, If-Modified-Since |
| GitLab | 600/minute | 100/page max | ETag |
| Gitea | Configurable | 50/page default | Link header |
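
Conditional requests pair naturally with these limits. A minimal httpx sketch; GitHub has historically not counted 304 responses against the authenticated quota, and the behavior should be verified per provider:

from typing import Optional
import httpx

async def fetch_if_changed(client: httpx.AsyncClient, url: str,
                           etag: Optional[str] = None):
    """Issue a conditional GET; a 304 means the cached copy is still valid."""
    headers = {"If-None-Match": etag} if etag else {}
    resp = await client.get(url, headers=headers)
    if resp.status_code == 304:
        return None, etag  # unchanged; serve from local cache
    resp.raise_for_status()
    return resp.json(), resp.headers.get("etag")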

Database Schema

Core Tables

-- Extended issues table with sync metadata
ALTER TABLE issues ADD COLUMN IF NOT EXISTS external_id VARCHAR(255);
ALTER TABLE issues ADD COLUMN IF NOT EXISTS external_number INTEGER;
ALTER TABLE issues ADD COLUMN IF NOT EXISTS remote_url TEXT;
ALTER TABLE issues ADD COLUMN IF NOT EXISTS provider VARCHAR(50);
ALTER TABLE issues ADD COLUMN IF NOT EXISTS provider_repo_id VARCHAR(255);
ALTER TABLE issues ADD COLUMN IF NOT EXISTS sync_status VARCHAR(20) DEFAULT 'pending';
ALTER TABLE issues ADD COLUMN IF NOT EXISTS version_vector JSONB DEFAULT '{}';
ALTER TABLE issues ADD COLUMN IF NOT EXISTS last_synced_at TIMESTAMP WITH TIME ZONE;
ALTER TABLE issues ADD COLUMN IF NOT EXISTS external_updated_at TIMESTAMP WITH TIME ZONE;

-- Sync status enum values: synced, pending, conflict, error

CREATE INDEX idx_issues_external_id ON issues(provider, external_id);
CREATE INDEX idx_issues_sync_status ON issues(sync_status);
-- Issue sync log for audit trail
CREATE TABLE issue_sync_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    issue_id UUID REFERENCES issues(id) ON DELETE CASCADE,
    project_id UUID REFERENCES projects(id) ON DELETE CASCADE,

    -- Sync details
    direction VARCHAR(10) NOT NULL,  -- 'inbound' or 'outbound'
    provider VARCHAR(50) NOT NULL,
    event_type VARCHAR(100) NOT NULL,

    -- Change tracking
    previous_state JSONB,
    new_state JSONB,
    diff JSONB,

    -- Conflict info
    had_conflict BOOLEAN DEFAULT FALSE,
    conflict_resolution VARCHAR(50),
    conflict_details JSONB,

    -- Status
    status VARCHAR(20) NOT NULL,  -- success, failed, skipped
    error_message TEXT,

    -- Timestamps
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    processed_at TIMESTAMP WITH TIME ZONE,

    -- Webhook metadata
    webhook_event_id VARCHAR(255),
    webhook_delivery_id VARCHAR(255)
);

CREATE INDEX idx_sync_log_issue ON issue_sync_log(issue_id);
CREATE INDEX idx_sync_log_project ON issue_sync_log(project_id);
CREATE INDEX idx_sync_log_status ON issue_sync_log(status);
CREATE INDEX idx_sync_log_created ON issue_sync_log(created_at);
-- Outbox for pending outbound syncs
CREATE TABLE sync_outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    issue_id UUID REFERENCES issues(id) ON DELETE CASCADE,
    project_id UUID REFERENCES projects(id) ON DELETE CASCADE,

    -- Sync target
    provider VARCHAR(50) NOT NULL,
    operation VARCHAR(20) NOT NULL,  -- create, update, delete
    payload JSONB NOT NULL,

    -- Processing status
    status VARCHAR(20) DEFAULT 'pending',  -- pending, in_progress, completed, failed, dead_letter
    attempts INTEGER DEFAULT 0,
    max_attempts INTEGER DEFAULT 5,
    last_error TEXT,

    -- Scheduling
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    scheduled_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    processed_at TIMESTAMP WITH TIME ZONE,

    -- Idempotency
    idempotency_key VARCHAR(255) UNIQUE
);

CREATE INDEX idx_outbox_status ON sync_outbox(status, scheduled_at);
CREATE INDEX idx_outbox_issue ON sync_outbox(issue_id);
-- External provider connections
CREATE TABLE external_connections (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    project_id UUID REFERENCES projects(id) ON DELETE CASCADE,
    organization_id UUID REFERENCES organizations(id) ON DELETE CASCADE,

    -- Provider details
    provider VARCHAR(50) NOT NULL,
    provider_url TEXT NOT NULL,  -- Base URL (e.g., https://gitea.example.com)
    repo_owner VARCHAR(255) NOT NULL,
    repo_name VARCHAR(255) NOT NULL,

    -- Authentication
    auth_type VARCHAR(20) NOT NULL,  -- token, oauth, app
    credentials_encrypted TEXT,  -- Encrypted token/credentials

    -- Webhook configuration
    webhook_secret_encrypted TEXT,
    webhook_id VARCHAR(255),  -- ID from provider
    webhook_active BOOLEAN DEFAULT TRUE,

    -- Sync settings
    sync_enabled BOOLEAN DEFAULT TRUE,
    sync_direction VARCHAR(20) DEFAULT 'bidirectional',  -- inbound, outbound, bidirectional
    sync_labels_filter JSONB,  -- Only sync issues with these labels
    sync_milestones_filter JSONB,

    -- Status tracking
    last_sync_at TIMESTAMP WITH TIME ZONE,
    last_error TEXT,
    status VARCHAR(20) DEFAULT 'active',  -- active, paused, error

    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    UNIQUE(project_id, provider, repo_owner, repo_name)
);

CREATE INDEX idx_connections_project ON external_connections(project_id);
CREATE INDEX idx_connections_status ON external_connections(status);

SQLAlchemy Models

# app/models/issue_sync.py
import enum

from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, func
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.models.base import Base, TimestampMixin, UUIDMixin

class SyncStatus(str, enum.Enum):
    SYNCED = "synced"
    PENDING = "pending"
    CONFLICT = "conflict"
    ERROR = "error"

class SyncDirection(str, enum.Enum):
    INBOUND = "inbound"
    OUTBOUND = "outbound"
    BIDIRECTIONAL = "bidirectional"

class Provider(str, enum.Enum):
    GITEA = "gitea"
    GITHUB = "github"
    GITLAB = "gitlab"


class ExternalConnection(Base, UUIDMixin, TimestampMixin):
    __tablename__ = "external_connections"

    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False)
    organization_id = Column(UUID(as_uuid=True), ForeignKey("organizations.id"))

    provider = Column(String(50), nullable=False)
    provider_url = Column(String, nullable=False)
    repo_owner = Column(String(255), nullable=False)
    repo_name = Column(String(255), nullable=False)

    auth_type = Column(String(20), nullable=False)
    credentials_encrypted = Column(String)

    webhook_secret_encrypted = Column(String)
    webhook_id = Column(String(255))
    webhook_active = Column(Boolean, default=True)

    sync_enabled = Column(Boolean, default=True)
    sync_direction = Column(String(20), default="bidirectional")
    sync_labels_filter = Column(JSONB)
    sync_milestones_filter = Column(JSONB)

    last_sync_at = Column(DateTime(timezone=True))
    last_error = Column(String)
    status = Column(String(20), default="active")


class IssueSyncLog(Base, UUIDMixin):
    __tablename__ = "issue_sync_log"

    issue_id = Column(UUID(as_uuid=True), ForeignKey("issues.id", ondelete="CASCADE"))
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id", ondelete="CASCADE"))

    direction = Column(String(10), nullable=False)
    provider = Column(String(50), nullable=False)
    event_type = Column(String(100), nullable=False)

    previous_state = Column(JSONB)
    new_state = Column(JSONB)
    diff = Column(JSONB)

    had_conflict = Column(Boolean, default=False)
    conflict_resolution = Column(String(50))
    conflict_details = Column(JSONB)

    status = Column(String(20), nullable=False)
    error_message = Column(String)

    created_at = Column(DateTime(timezone=True), server_default=func.now())
    processed_at = Column(DateTime(timezone=True))

    webhook_event_id = Column(String(255))
    webhook_delivery_id = Column(String(255))


class SyncOutbox(Base, UUIDMixin):
    __tablename__ = "sync_outbox"

    issue_id = Column(UUID(as_uuid=True), ForeignKey("issues.id", ondelete="CASCADE"))
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id", ondelete="CASCADE"))

    provider = Column(String(50), nullable=False)
    operation = Column(String(20), nullable=False)
    payload = Column(JSONB, nullable=False)

    status = Column(String(20), default="pending")
    attempts = Column(Integer, default=0)
    max_attempts = Column(Integer, default=5)
    last_error = Column(String)

    created_at = Column(DateTime(timezone=True), server_default=func.now())
    scheduled_at = Column(DateTime(timezone=True), server_default=func.now())
    processed_at = Column(DateTime(timezone=True))

    idempotency_key = Column(String(255), unique=True)

Field Mapping Specification

Canonical Issue Model

# app/schemas/sync/canonical.py
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional
from enum import Enum

class IssueStatus(str, Enum):
    OPEN = "open"
    CLOSED = "closed"
    IN_PROGRESS = "in_progress"  # Syndarix-only

class CanonicalIssue(BaseModel):
    """Canonical issue representation for sync operations."""

    # Identity
    external_id: str
    external_number: int
    remote_url: str

    # Core fields
    title: str
    description: Optional[str] = None
    status: IssueStatus

    # Relationships
    assignee_ids: list[str] = Field(default_factory=list)
    assignee_usernames: list[str] = Field(default_factory=list)
    labels: list[str] = Field(default_factory=list)
    milestone: Optional[str] = None

    # Metadata
    created_at: datetime
    updated_at: datetime
    closed_at: Optional[datetime] = None
    due_date: Optional[datetime] = None

    # Provider info
    provider: str
    raw_data: dict = Field(default_factory=dict)  # Original payload

Provider Adapters

# app/services/sync/adapters/gitea.py
from app.schemas.sync.canonical import CanonicalIssue, IssueStatus

class GiteaAdapter:
    """Transform between Gitea API format and canonical model."""

    @staticmethod
    def to_canonical(gitea_issue: dict, base_url: str) -> CanonicalIssue:
        return CanonicalIssue(
            external_id=str(gitea_issue["id"]),
            external_number=gitea_issue["number"],
            remote_url=gitea_issue["html_url"],
            title=gitea_issue["title"],
            description=gitea_issue.get("body"),
            status=IssueStatus.OPEN if gitea_issue["state"] == "open" else IssueStatus.CLOSED,
            assignee_ids=[str(a["id"]) for a in gitea_issue.get("assignees", [])],
            assignee_usernames=[a["login"] for a in gitea_issue.get("assignees", [])],
            labels=[label["name"] for label in gitea_issue.get("labels", [])],
            milestone=(gitea_issue.get("milestone") or {}).get("title"),
            created_at=gitea_issue["created_at"],
            updated_at=gitea_issue["updated_at"],
            closed_at=gitea_issue.get("closed_at"),
            due_date=gitea_issue.get("due_date"),
            provider="gitea",
            raw_data=gitea_issue
        )

    @staticmethod
    def from_canonical(issue: CanonicalIssue) -> dict:
        """Convert canonical to Gitea API format for updates."""
        return {
            "title": issue.title,
            "body": issue.description,
            "state": "open" if issue.status == IssueStatus.OPEN else "closed",
            "assignees": issue.assignee_usernames,
            "labels": issue.labels,
            "milestone": issue.milestone,
            "due_date": issue.due_date.isoformat() if issue.due_date else None,
        }


# app/services/sync/adapters/github.py
class GitHubAdapter:
    """Transform between GitHub API format and canonical model."""

    @staticmethod
    def to_canonical(github_issue: dict, base_url: str) -> CanonicalIssue:
        return CanonicalIssue(
            external_id=str(github_issue["id"]),
            external_number=github_issue["number"],
            remote_url=github_issue["html_url"],
            title=github_issue["title"],
            description=github_issue.get("body"),
            status=IssueStatus.OPEN if github_issue["state"] == "open" else IssueStatus.CLOSED,
            assignee_ids=[str(a["id"]) for a in github_issue.get("assignees", [])],
            assignee_usernames=[a["login"] for a in github_issue.get("assignees", [])],
            labels=[label["name"] for label in github_issue.get("labels", [])],
            milestone=github_issue.get("milestone", {}).get("title") if github_issue.get("milestone") else None,
            created_at=github_issue["created_at"],
            updated_at=github_issue["updated_at"],
            closed_at=github_issue.get("closed_at"),
            due_date=None,  # GitHub doesn't have native due dates
            provider="github",
            raw_data=github_issue
        )

    @staticmethod
    def from_canonical(issue: CanonicalIssue) -> dict:
        return {
            "title": issue.title,
            "body": issue.description,
            "state": "open" if issue.status == IssueStatus.OPEN else "closed",
            "assignees": issue.assignee_usernames,
            "labels": issue.labels,
            "milestone": issue.milestone,
        }


# app/services/sync/adapters/gitlab.py
class GitLabAdapter:
    """Transform between GitLab API format and canonical model."""

    @staticmethod
    def to_canonical(gitlab_issue: dict, base_url: str) -> CanonicalIssue:
        # GitLab uses 'opened' instead of 'open'
        state = gitlab_issue["state"]
        status = IssueStatus.OPEN if state == "opened" else IssueStatus.CLOSED

        return CanonicalIssue(
            external_id=str(gitlab_issue["id"]),
            external_number=gitlab_issue["iid"],  # GitLab uses iid for project-scoped number
            remote_url=gitlab_issue["web_url"],
            title=gitlab_issue["title"],
            description=gitlab_issue.get("description"),
            status=status,
            assignee_ids=[str(a["id"]) for a in gitlab_issue.get("assignees", [])],
            assignee_usernames=[a["username"] for a in gitlab_issue.get("assignees", [])],
            labels=gitlab_issue.get("labels", []),  # GitLab returns labels as strings
            milestone=gitlab_issue.get("milestone", {}).get("title") if gitlab_issue.get("milestone") else None,
            created_at=gitlab_issue["created_at"],
            updated_at=gitlab_issue["updated_at"],
            closed_at=gitlab_issue.get("closed_at"),
            due_date=gitlab_issue.get("due_date"),
            provider="gitlab",
            raw_data=gitlab_issue
        )

    @staticmethod
    def from_canonical(issue: CanonicalIssue) -> dict:
        return {
            "title": issue.title,
            "description": issue.description,
            "state_event": "reopen" if issue.status == IssueStatus.OPEN else "close",
            "assignee_ids": issue.assignee_ids,
            "labels": ",".join(issue.labels),
            "milestone_id": issue.milestone,  # Requires ID lookup
            "due_date": issue.due_date.isoformat() if issue.due_date else None,
        }
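
A round-trip usage example with a hypothetical payload, showing the adapter symmetry (external JSON to canonical, canonical back to a provider update body):

raw = {
    "id": 42, "number": 7,
    "html_url": "https://gitea.example.com/org/repo/issues/7",
    "title": "Sync me", "body": "Example body", "state": "open",
    "assignees": [], "labels": [], "milestone": None,
    "created_at": "2025-12-01T10:00:00Z", "updated_at": "2025-12-02T10:00:00Z",
}
issue = GiteaAdapter.to_canonical(raw, base_url="https://gitea.example.com")
payload = GiteaAdapter.from_canonical(issue)  # body for PATCH .../issues/7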

Code Examples

Provider Interface

# app/services/sync/providers/base.py
from abc import ABC, abstractmethod
from datetime import datetime
from typing import AsyncIterator, Optional

from app.models.issue_sync import ExternalConnection
from app.schemas.sync.canonical import CanonicalIssue

class IssueProvider(ABC):
    """Abstract base class for issue tracker providers."""

    def __init__(self, connection: ExternalConnection):
        self.connection = connection
        self.base_url = connection.provider_url
        self.repo_owner = connection.repo_owner
        self.repo_name = connection.repo_name

    @abstractmethod
    async def get_issue(self, issue_number: int) -> CanonicalIssue:
        """Fetch a single issue by number."""
        pass

    @abstractmethod
    async def list_issues(
        self,
        state: str = "all",
        since: Optional[datetime] = None,
        labels: Optional[list[str]] = None
    ) -> AsyncIterator[CanonicalIssue]:
        """List issues with optional filters."""
        pass

    @abstractmethod
    async def create_issue(self, issue: CanonicalIssue) -> CanonicalIssue:
        """Create a new issue."""
        pass

    @abstractmethod
    async def update_issue(self, issue_number: int, issue: CanonicalIssue) -> CanonicalIssue:
        """Update an existing issue."""
        pass

    @abstractmethod
    async def add_comment(self, issue_number: int, body: str) -> dict:
        """Add a comment to an issue."""
        pass

    @abstractmethod
    async def setup_webhook(self, callback_url: str, secret: str) -> str:
        """Configure webhook for issue events. Returns webhook ID."""
        pass

    @abstractmethod
    async def verify_webhook_signature(self, payload: bytes, headers: dict) -> bool:
        """Verify webhook signature."""
        pass

Gitea Provider Implementation

# app/services/sync/providers/gitea.py
from datetime import datetime
from typing import AsyncIterator, Optional

import httpx

from app.core.security import decrypt  # assumed location of the decryption helper
from app.models.issue_sync import ExternalConnection
from app.schemas.sync.canonical import CanonicalIssue
from app.services.sync.adapters.gitea import GiteaAdapter
from app.services.sync.providers.base import IssueProvider
from app.services.sync.webhook_validators import GiteaWebhookValidator

class GiteaProvider(IssueProvider):
    """Gitea issue tracker provider."""

    def __init__(self, connection: ExternalConnection):
        super().__init__(connection)
        self.adapter = GiteaAdapter()
        self._client = None

    @property
    def api_url(self) -> str:
        return f"{self.base_url}/api/v1"

    async def _get_client(self) -> httpx.AsyncClient:
        if self._client is None:
            token = decrypt(self.connection.credentials_encrypted)
            self._client = httpx.AsyncClient(
                base_url=self.api_url,
                headers={
                    "Authorization": f"token {token}",
                    "Accept": "application/json",
                },
                timeout=30.0
            )
        return self._client

    async def get_issue(self, issue_number: int) -> CanonicalIssue:
        client = await self._get_client()
        response = await client.get(
            f"/repos/{self.repo_owner}/{self.repo_name}/issues/{issue_number}"
        )
        response.raise_for_status()
        return self.adapter.to_canonical(response.json(), self.base_url)

    async def list_issues(
        self,
        state: str = "all",
        since: Optional[datetime] = None,
        labels: Optional[list[str]] = None
    ) -> AsyncIterator[CanonicalIssue]:
        client = await self._get_client()
        page = 1

        while True:
            params = {
                "state": state,
                "page": page,
                "limit": 50,  # Gitea default max
            }
            if since:
                params["since"] = since.isoformat()
            if labels:
                params["labels"] = ",".join(labels)

            response = await client.get(
                f"/repos/{self.repo_owner}/{self.repo_name}/issues",
                params=params
            )
            response.raise_for_status()

            issues = response.json()
            if not issues:
                break

            for issue in issues:
                yield self.adapter.to_canonical(issue, self.base_url)

            # Check pagination
            link_header = response.headers.get("link", "")
            if 'rel="next"' not in link_header:
                break
            page += 1

    async def create_issue(self, issue: CanonicalIssue) -> CanonicalIssue:
        client = await self._get_client()
        payload = self.adapter.from_canonical(issue)

        response = await client.post(
            f"/repos/{self.repo_owner}/{self.repo_name}/issues",
            json=payload
        )
        response.raise_for_status()
        return self.adapter.to_canonical(response.json(), self.base_url)

    async def update_issue(self, issue_number: int, issue: CanonicalIssue) -> CanonicalIssue:
        client = await self._get_client()
        payload = self.adapter.from_canonical(issue)

        response = await client.patch(
            f"/repos/{self.repo_owner}/{self.repo_name}/issues/{issue_number}",
            json=payload
        )
        response.raise_for_status()
        return self.adapter.to_canonical(response.json(), self.base_url)

    async def add_comment(self, issue_number: int, body: str) -> dict:
        client = await self._get_client()
        response = await client.post(
            f"/repos/{self.repo_owner}/{self.repo_name}/issues/{issue_number}/comments",
            json={"body": body}
        )
        response.raise_for_status()
        return response.json()

    async def setup_webhook(self, callback_url: str, secret: str) -> str:
        client = await self._get_client()
        response = await client.post(
            f"/repos/{self.repo_owner}/{self.repo_name}/hooks",
            json={
                "type": "gitea",
                "active": True,
                "events": ["issues", "issue_comment"],
                "config": {
                    "url": callback_url,
                    "content_type": "json",
                    "secret": secret,
                }
            }
        )
        response.raise_for_status()
        return str(response.json()["id"])

    async def verify_webhook_signature(self, payload: bytes, headers: dict) -> bool:
        secret = decrypt(self.connection.webhook_secret_encrypted)
        return GiteaWebhookValidator.verify_signature(payload, headers, secret)
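
The ProviderFactory referenced by the sync engine is not pinned down by this spike; a minimal sketch might look like this (GitHub and GitLab classes would register here in Phase 4):

# app/services/sync/providers/factory.py
from app.models.issue_sync import ExternalConnection
from app.services.sync.providers.base import IssueProvider
from app.services.sync.providers.gitea import GiteaProvider

class ProviderFactory:
    """Maps a connection's provider name to a client class."""

    _providers: dict[str, type[IssueProvider]] = {"gitea": GiteaProvider}

    @classmethod
    def register(cls, name: str, provider_cls: type[IssueProvider]) -> None:
        cls._providers[name] = provider_cls

    @classmethod
    def get_provider(cls, connection: ExternalConnection) -> IssueProvider:
        try:
            return cls._providers[connection.provider](connection)
        except KeyError:
            raise ValueError(f"Unsupported provider: {connection.provider}")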

Sync Engine

# app/services/sync/sync_engine.py
from datetime import datetime, timedelta

from sqlalchemy.ext.asyncio import AsyncSession

from app.models.issue import Issue  # assumed module for the core Issue model
from app.models.issue_sync import ExternalConnection, IssueSyncLog, SyncOutbox
from app.schemas.sync.canonical import CanonicalIssue
from app.services.sync.providers.factory import ProviderFactory
from app.services.sync.conflict_resolver import ConflictResolver

class SyncEngine:
    """Orchestrates synchronization between Syndarix and external trackers."""

    def __init__(self, db: AsyncSession):
        self.db = db
        self.conflict_resolver = ConflictResolver()

    async def sync_inbound(
        self,
        connection: ExternalConnection,
        external_issue: CanonicalIssue
    ) -> Issue:
        """Sync an issue from external tracker to Syndarix."""

        # Find existing local issue
        local_issue = await self._find_by_external_id(
            connection.project_id,
            external_issue.provider,
            external_issue.external_id
        )

        if local_issue is None:
            # Create new local issue
            local_issue = await self._create_local_issue(
                connection.project_id,
                external_issue
            )
            await self._log_sync(
                local_issue,
                direction="inbound",
                event_type="created",
                status="success"
            )
        else:
            # Check for conflicts
            conflict_result = self.conflict_resolver.check(
                local_issue.version_vector,
                external_issue.raw_data.get("_version_vector", {}),
                local_issue.updated_at,
                external_issue.updated_at
            )

            if conflict_result.has_conflict:
                # Apply resolution strategy
                resolved = self.conflict_resolver.resolve(
                    local_issue,
                    external_issue,
                    conflict_result
                )
                await self._update_local_issue(local_issue, resolved)
                await self._log_sync(
                    local_issue,
                    direction="inbound",
                    event_type="conflict_resolved",
                    status="success",
                    conflict_details=conflict_result.to_dict()
                )
            else:
                # Normal update
                await self._update_local_issue(local_issue, external_issue)
                await self._log_sync(
                    local_issue,
                    direction="inbound",
                    event_type="updated",
                    status="success"
                )

        return local_issue

    async def sync_outbound(
        self,
        connection: ExternalConnection,
        local_issue: Issue
    ):
        """Queue a local issue for sync to external tracker."""

        canonical = self._to_canonical(local_issue)

        outbox_entry = SyncOutbox(
            issue_id=local_issue.id,
            project_id=connection.project_id,
            provider=connection.provider,
            operation="update" if local_issue.external_id else "create",
            payload=canonical.dict(),
            idempotency_key=f"{local_issue.id}:{local_issue.updated_at.isoformat()}"
        )

        self.db.add(outbox_entry)
        await self.db.commit()

    async def initial_import(
        self,
        connection: ExternalConnection,
        since: datetime = None,
        labels: list[str] = None
    ) -> int:
        """Import all issues from external tracker."""

        provider = ProviderFactory.get_provider(connection)
        imported = 0

        async for external_issue in provider.list_issues(
            state="all",
            since=since,
            labels=labels
        ):
            await self.sync_inbound(connection, external_issue)
            imported += 1

        connection.last_sync_at = datetime.utcnow()
        await self.db.commit()

        return imported

    async def reconcile(self, connection: ExternalConnection):
        """
        Periodic reconciliation to catch missed webhooks.
        Runs via Celery Beat every 15-30 minutes.
        """
        since = connection.last_sync_at or (datetime.utcnow() - timedelta(hours=24))

        provider = ProviderFactory.get_provider(connection)

        async for external_issue in provider.list_issues(
            state="all",
            since=since
        ):
            local_issue = await self._find_by_external_id(
                connection.project_id,
                external_issue.provider,
                external_issue.external_id
            )

            if local_issue:
                # Sync only if the external copy is newer, or no sync is recorded yet
                if (
                    local_issue.external_updated_at is None
                    or external_issue.updated_at > local_issue.external_updated_at
                ):
                    await self.sync_inbound(connection, external_issue)
            else:
                await self.sync_inbound(connection, external_issue)

        connection.last_sync_at = datetime.utcnow()
        await self.db.commit()

Error Handling & Recovery

Error Categories and Handling

| Error Type | Handling Strategy | Retry | Alert |
|------------|-------------------|-------|-------|
| Network timeout | Exponential backoff | Yes (3x) | After 3 failures |
| Rate limit (429) | Wait for reset | Yes | No |
| Auth error (401/403) | Mark connection as error | No | Yes |
| Not found (404) | Mark issue as deleted | No | No |
| Conflict (409) | Apply resolution strategy | No | If unresolved |
| Server error (5xx) | Exponential backoff | Yes (5x) | After 5 failures |
| Validation error | Log and skip | No | Yes |

Retry Strategy

# app/services/sync/retry.py
import asyncio
from functools import wraps

import httpx

def with_retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    retryable_exceptions: tuple = (httpx.TimeoutException, httpx.NetworkError)
):
    """Decorator for retry with exponential backoff."""

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except retryable_exceptions as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        delay = min(
                            base_delay * (exponential_base ** attempt),
                            max_delay
                        )
                        await asyncio.sleep(delay)

            raise last_exception

        return wrapper
    return decorator
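
Usage sketch: the decorator retries only the transient network exceptions listed in retryable_exceptions; HTTP status handling (429, 5xx) stays with the caller per the table above.

@with_retry(max_attempts=3, base_delay=2.0)
async def fetch_remote_issue(provider: IssueProvider, number: int) -> CanonicalIssue:
    return await provider.get_issue(number)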

Dead Letter Queue Handling

# app/workers/dead_letter_worker.py
from datetime import datetime, timedelta

from app.workers.sync_worker import celery_app
from app.core.db import db  # assumed synchronous session for Celery context
from app.models.issue_sync import SyncOutbox
from app.services.notifications import notify_admin  # assumed notification helper

@celery_app.task
def process_dead_letter_queue():
    """Process failed sync items for manual review or retry."""

    dead_items = db.query(SyncOutbox).filter(
        SyncOutbox.status == "dead_letter",
        SyncOutbox.created_at > datetime.utcnow() - timedelta(days=7),
    ).all()

    for item in dead_items:
        # Create notification for admin review
        notify_admin(
            f"Sync failed for issue {item.issue_id}",
            details={
                "provider": item.provider,
                "operation": item.operation,
                "attempts": item.attempts,
                "last_error": item.last_error,
            }
        )

        # Optionally re-queue for one more attempt; should_retry() is an
        # assumed policy hook (e.g., only retry transient errors)
        if should_retry(item):
            item.status = "pending"
            item.attempts = 0
            item.scheduled_at = datetime.utcnow()

    db.commit()

Health Monitoring

# app/services/sync/health.py
from datetime import datetime, timedelta

# get_connections() and the count_* helpers are assumed repository functions

async def check_sync_health(project_id: str) -> dict:
    """Check sync health for a project."""

    connections = await get_connections(project_id)

    health = {
        "status": "healthy",
        "connections": [],
        "pending_syncs": 0,
        "failed_syncs_24h": 0,
        "conflicts_24h": 0,
    }

    for conn in connections:
        conn_health = {
            "provider": conn.provider,
            "status": conn.status,
            "last_sync": conn.last_sync_at,
            "webhook_active": conn.webhook_active,
        }

        # Check if sync is stale
        if conn.last_sync_at:
            stale_threshold = datetime.utcnow() - timedelta(hours=1)
            if conn.last_sync_at < stale_threshold:
                conn_health["status"] = "stale"
                health["status"] = "degraded"

        health["connections"].append(conn_health)

    # Count pending and failed
    health["pending_syncs"] = await count_pending_syncs(project_id)
    health["failed_syncs_24h"] = await count_failed_syncs(project_id, hours=24)
    health["conflicts_24h"] = await count_conflicts(project_id, hours=24)

    if health["failed_syncs_24h"] > 10:
        health["status"] = "unhealthy"

    return health

Implementation Roadmap

Phase 1: Foundation (Week 1-2)

  • Database schema and migrations
  • Core models (ExternalConnection, IssueSyncLog, SyncOutbox)
  • Provider interface and Gitea implementation
  • Canonical issue model and field mapping

Phase 2: Inbound Sync (Week 2-3)

  • Webhook endpoint and signature validation
  • Redis queue for webhook events
  • Celery worker for event processing
  • Initial import functionality
  • Basic conflict detection

Phase 3: Outbound Sync (Week 3-4)

  • Outbox pattern implementation
  • Outbound sync worker
  • Retry and dead letter queue
  • Bidirectional sync testing

Phase 4: GitHub & GitLab (Week 4-5)

  • GitHub provider implementation
  • GitLab provider implementation
  • Provider factory and dynamic selection
  • Cross-provider field mapping

Phase 5: Conflict Resolution (Week 5-6)

  • Version vector implementation
  • Conflict resolution strategies
  • Merge algorithms for complex fields
  • Conflict notification UI

Phase 6: Production Readiness (Week 6-7)

  • Health monitoring and alerting
  • Admin UI for connection management
  • Comprehensive test coverage
  • Performance optimization
  • Documentation


Decision

Adopt a webhook-first, polling-fallback synchronization architecture with:

  1. Last-Writer-Wins (LWW) conflict resolution using version vectors
  2. External tracker as source of truth with local mirrors
  3. Unified provider interface abstracting Gitea, GitHub, GitLab differences
  4. Outbox pattern for reliable outbound sync
  5. Redis Streams for webhook event queuing and deduplication
  6. Celery Beat for periodic reconciliation

This approach balances real-time responsiveness with eventual consistency, providing a robust foundation for bidirectional issue synchronization.


Spike completed. Findings will inform ADR-009: Issue Synchronization Architecture.