# SPIKE-009: Issue Synchronization with External Trackers

**Status:** Completed

**Date:** 2025-12-29

**Author:** Architecture Team

**Related Issue:** #9

---

## Executive Summary

This spike researches bi-directional issue synchronization between Syndarix and external issue trackers (Gitea, GitHub, GitLab). After analyzing sync patterns, conflict resolution strategies, and the API capabilities of each platform, we recommend:

**Primary Recommendation:** Implement a **webhook-first, polling-fallback** architecture with **Last-Writer-Wins (LWW)** conflict resolution using version vectors for causality tracking. External trackers serve as the **source of truth**, with Syndarix maintaining local mirrors for unified agent access.

**Key Decisions:**

1. Use webhooks for real-time sync; polling for reconciliation and initial import
2. Implement version vectors for conflict detection with LWW resolution
3. Store sync metadata in a dedicated `issue_sync_log` table for audit and recovery
4. Abstract provider differences behind a unified `IssueProvider` interface
5. Use Redis for webhook event queuing and deduplication

---
## Table of Contents

1. [Research Questions & Answers](#research-questions--answers)
2. [Sync Architecture](#sync-architecture)
3. [Conflict Resolution Strategy](#conflict-resolution-strategy)
4. [Webhook Handling Design](#webhook-handling-design)
5. [Provider API Comparison](#provider-api-comparison)
6. [Database Schema](#database-schema)
7. [Field Mapping Specification](#field-mapping-specification)
8. [Code Examples](#code-examples)
9. [Error Handling & Recovery](#error-handling--recovery)
10. [Implementation Roadmap](#implementation-roadmap)
11. [References](#references)

---

## Research Questions & Answers
### 1. Best patterns for bi-directional data sync?

**Answer:** The **Hub-and-Spoke** pattern with Syndarix as the hub is recommended. This pattern:

- Designates external trackers as authoritative sources
- Maintains local mirrors for unified access
- Synchronizes changes bidirectionally through a central sync engine
- Uses explicit per-field ownership rules to prevent conflicts

Modern implementations leverage **Conflict-Free Replicated Data Types (CRDTs)** for automatic conflict resolution, but for issue tracking, where human review may be desired, **version vectors with LWW** provide better control.
### 2. Handling sync conflicts (edited in both places)?

**Answer:** Implement a **tiered conflict resolution strategy**:

| Scenario | Resolution |
|----------|------------|
| Same field, different times | Last-Writer-Wins (LWW) |
| Same field, concurrent edits | Mark as conflict, notify user |
| Different fields | Merge both changes |
| Delete vs. update | Delete wins (configurable) |

Use **version vectors** to detect concurrent modifications. Each system maintains a version counter, and a conflict is identified when neither version dominates the other.
### 3. Webhook vs polling strategies?

**Answer:** A **hybrid approach**: webhooks as the primary mechanism, polling as the secondary.

| Strategy | Use Case | Latency | Resource Cost |
|----------|----------|---------|---------------|
| Webhooks | Real-time sync | <1s | Low |
| Polling | Initial import, reconciliation, fallback | Minutes | Medium |
| On-demand | User-triggered refresh | Immediate | Minimal |

**Rationale:** Webhooks provide real-time updates but may miss events during outages. Periodic polling (every 15-30 minutes) ensures eventual consistency.
### 4. Rate limiting and API quota management?

**Answer:** Implement a **token bucket with adaptive throttling** (a minimal sketch follows the strategies list below):

| Provider | Authenticated Rate Limit | Unauthenticated |
|----------|--------------------------|-----------------|
| GitHub | 5,000/hour | 60/hour |
| GitLab | 600/minute | 10/minute |
| Gitea | Configurable (default: 50 items/response) | N/A |

**Strategies:**

- Use conditional requests (`If-None-Match`, `If-Modified-Since`) so unchanged responses don't count against the quota
- Implement exponential backoff on 429/403 responses
- Cache responses with ETags
- Batch operations where possible
- Monitor `X-RateLimit-Remaining` headers
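A minimal sketch of the token-bucket idea with throttling adapted from rate-limit headers. The class name, parameters, and `adapt()` hook are illustrative assumptions, not any provider SDK's API:

```python
import asyncio
import time


class AdaptiveTokenBucket:
    """Token bucket that slows down as the provider's quota depletes."""

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity        # maximum burst size
        self.refill_rate = refill_rate  # tokens added per second
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()

    async def acquire(self) -> None:
        """Block until one token is available, then consume it."""
        while True:
            now = time.monotonic()
            self.tokens = min(
                self.capacity,
                self.tokens + (now - self.last_refill) * self.refill_rate,
            )
            self.last_refill = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            await asyncio.sleep((1 - self.tokens) / self.refill_rate)

    def adapt(self, remaining: int, reset_epoch: float) -> None:
        """Call after each response with X-RateLimit-Remaining/-Reset values."""
        window = max(reset_epoch - time.time(), 1.0)
        # Never spend tokens faster than the remaining quota allows
        # (the floor keeps the bucket from stalling entirely).
        self.refill_rate = max(min(self.refill_rate, remaining / window), 0.01)
```

Each provider client would hold one bucket sized to that provider's limit (e.g., roughly 1.4 requests/second for GitHub's 5,000/hour).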
### 5. Eventual consistency vs strong consistency tradeoffs?

**Answer:** **Eventual consistency** is acceptable and recommended for issue sync.

| Consistency | Pros | Cons |
|-------------|------|------|
| **Strong** | Always accurate | Higher latency, complex implementation |
| **Eventual** | Better performance, simpler | Temporary inconsistency |

**Rationale:** Issue tracking tolerates brief inconsistency windows (seconds to minutes), and users can manually refresh if needed. The simplicity and performance gains outweigh the drawbacks.
### 6. How to map different field schemas?

**Answer:** Use a **canonical field model** with provider-specific adapters.

```
External Field → Provider Adapter → Canonical Model → Local Storage
Local Field    → Canonical Model → Provider Adapter → External Field
```

See [Field Mapping Specification](#field-mapping-specification) for detailed mappings.
### 7. Handling offline/disconnected scenarios?

**Answer:** Implement an **outbox pattern** with a retry queue:

1. Queue all outgoing changes in a local `sync_outbox` table
2. Process the queue with a background worker using exponential backoff (see the worker-loop sketch after this list)
3. Mark items as `pending`, `in_progress`, `completed`, or `failed`
4. Move items exceeding max retries to a dead letter queue
5. Provide a manual reconciliation UI for failed items
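As referenced in step 2, a minimal sketch of the outbox worker loop, assuming the `sync_outbox` schema defined later in this document and an asyncpg-style connection; `dispatch_to_provider` is a hypothetical stand-in for the provider API call:

```python
from datetime import datetime, timedelta, timezone


async def drain_outbox(db) -> None:
    """Claim a batch of pending outbox rows and push them to the provider."""
    rows = await db.fetch(
        """
        UPDATE sync_outbox SET status = 'in_progress'
         WHERE id IN (SELECT id FROM sync_outbox
                       WHERE status = 'pending' AND scheduled_at <= NOW()
                       ORDER BY scheduled_at LIMIT 10
                       FOR UPDATE SKIP LOCKED)
        RETURNING *
        """
    )
    for row in rows:
        try:
            await dispatch_to_provider(row)  # hypothetical provider call
            await db.execute(
                "UPDATE sync_outbox SET status='completed', processed_at=NOW()"
                " WHERE id=$1",
                row["id"],
            )
        except Exception as exc:
            attempts = row["attempts"] + 1
            # Exponential backoff; dead-letter once max_attempts is exceeded
            status = "dead_letter" if attempts >= row["max_attempts"] else "pending"
            next_run = datetime.now(timezone.utc) + timedelta(
                seconds=min(30 * 2 ** attempts, 3600)
            )
            await db.execute(
                "UPDATE sync_outbox SET status=$1, attempts=$2, last_error=$3,"
                " scheduled_at=$4 WHERE id=$5",
                status, attempts, str(exc), next_run, row["id"],
            )
```

`FOR UPDATE SKIP LOCKED` lets multiple workers drain the queue concurrently without double-claiming rows.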
---

## Sync Architecture

### High-Level Architecture Diagram
```
                         External Issue Trackers
         ┌─────────────────┬─────────────────┬─────────────────┐
         │      Gitea      │     GitHub      │     GitLab      │
         │    (Primary)    │                 │                 │
         └────────┬────────┴────────┬────────┴────────┬────────┘
                  │                 │                 │
         ┌────────┴─────────────────┴─────────────────┴────────┐
         │                      Webhooks                       │
         └──────────────────────────┬──────────────────────────┘
                                    │
┌───────────────────────────────────┼────────────────────────────────────────────────────┐
│                                   │              Syndarix Backend                       │
│  ┌────────────────────┐     ┌─────┴──────────────────┐        ┌────────────────────┐   │
│  │  Webhook Handler   │◀────│      Redis Queue       │───────▶│   Polling Worker   │   │
│  │  (FastAPI Route)   │     │  (Event Dedup/Buffer)  │        │   (Celery Beat)    │   │
│  └─────────┬──────────┘     └────────────────────────┘        └─────────┬──────────┘   │
│            │                                                            │              │
│            └──────────────────────────┬─────────────────────────────────┘              │
│                                       │                                                │
│                          ┌────────────┴────────────┐                                   │
│                          │       Sync Engine       │                                   │
│                          │  ┌───────────────────┐  │                                   │
│                          │  │ Provider Factory  │  │                                   │
│                          │  │ ┌───────────────┐ │  │                                   │
│                          │  │ │ GiteaProvider │ │  │                                   │
│                          │  │ │ GitHubProvider│ │  │                                   │
│                          │  │ │ GitLabProvider│ │  │                                   │
│                          │  │ └───────────────┘ │  │                                   │
│                          │  └───────────────────┘  │                                   │
│                          │  ┌───────────────────┐  │                                   │
│                          │  │ Conflict Resolver │  │                                   │
│                          │  └───────────────────┘  │                                   │
│                          │  ┌───────────────────┐  │                                   │
│                          │  │   Field Mapper    │  │                                   │
│                          │  └───────────────────┘  │                                   │
│                          └────────────┬────────────┘                                   │
│                                       │                                                │
│                          ┌────────────┴────────────┐                                   │
│                          │       PostgreSQL        │                                   │
│                          │  ┌───────────────────┐  │                                   │
│                          │  │ issues            │  │                                   │
│                          │  │ issue_sync_log    │  │                                   │
│                          │  │ sync_outbox       │  │                                   │
│                          │  │ external_links    │  │                                   │
│                          │  └───────────────────┘  │                                   │
│                          └─────────────────────────┘                                   │
└────────────────────────────────────────────────────────────────────────────────────────┘
```
### Component Responsibilities

| Component | Responsibility |
|-----------|----------------|
| **Webhook Handler** | Receive, validate, and queue incoming webhooks |
| **Redis Queue** | Buffer events, deduplicate, handle backpressure |
| **Polling Worker** | Periodic reconciliation, initial import, fallback |
| **Sync Engine** | Orchestrate sync operations, apply business logic |
| **Provider Factory** | Create provider-specific clients |
| **Conflict Resolver** | Detect and resolve sync conflicts |
| **Field Mapper** | Transform between canonical and provider schemas |
### Data Flow

**Inbound (External to Syndarix):**

```
1. Webhook received → Validate signature → Queue in Redis
2. Worker dequeues → Parse payload → Transform to canonical model
3. Check for conflicts → Apply resolution strategy
4. Update local database → Log sync operation
5. Notify connected clients via SSE (per SPIKE-003)
```

**Outbound (Syndarix to External):**

```
1. Local change detected → Transform to provider format
2. Queue in sync_outbox → Worker picks up
3. Call external API → Handle response
4. Update sync metadata → Mark as synced
5. Handle failures → Retry with backoff
```
---

## Conflict Resolution Strategy

### Version Vector Implementation

Each issue maintains a version vector tracking modifications across systems:
```python
# Version vector structure
{
    "syndarix": 5,  # Local modification count
    "gitea": 3,     # Gitea modification count
    "github": 0,    # Not synced with GitHub
    "gitlab": 2     # GitLab modification count
}
```
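Maintaining the vector is just a counter bump on each local write; a minimal helper (the function name is ours, for illustration):

```python
def bump_version(vector: dict, node: str = "syndarix") -> dict:
    """Increment this node's counter before persisting a local change."""
    updated = dict(vector)
    updated[node] = updated.get(node, 0) + 1
    return updated
```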
### Conflict Detection Algorithm

```python
def detect_conflict(local_version: dict, remote_version: dict) -> str:
    """
    Compare version vectors to detect conflicts.

    Returns:
        - "local_wins": Local version dominates
        - "remote_wins": Remote version dominates
        - "conflict": Concurrent modification (neither dominates)
        - "equal": No changes
    """
    local_dominates = all(
        local_version.get(k, 0) >= v
        for k, v in remote_version.items()
    )
    remote_dominates = all(
        remote_version.get(k, 0) >= v
        for k, v in local_version.items()
    )

    if local_dominates and remote_dominates:
        return "equal"  # mutual domination: the vectors describe the same state
    elif local_dominates:
        return "local_wins"
    elif remote_dominates:
        return "remote_wins"
    else:
        return "conflict"
```
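Two quick examples of the dominance check, using hypothetical vectors:

```python
# Concurrent edits: each side has a change the other has not seen.
assert detect_conflict({"syndarix": 5, "gitea": 3},
                       {"syndarix": 4, "gitea": 4}) == "conflict"

# Remote strictly dominates: it has seen every local change, plus more.
assert detect_conflict({"syndarix": 4, "gitea": 3},
                       {"syndarix": 4, "gitea": 5}) == "remote_wins"
```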
### Resolution Strategies

```python
from enum import Enum


class ConflictStrategy(str, Enum):
    REMOTE_WINS = "remote_wins"      # External tracker is source of truth
    LOCAL_WINS = "local_wins"        # Syndarix changes take precedence
    LAST_WRITE_WINS = "lww"          # Most recent timestamp wins
    MANUAL = "manual"                # Flag for human review
    MERGE = "merge"                  # Attempt field-level merge


# Default strategy per field
FIELD_STRATEGIES = {
    "title": ConflictStrategy.LAST_WRITE_WINS,
    "description": ConflictStrategy.MERGE,
    "status": ConflictStrategy.REMOTE_WINS,
    "assignees": ConflictStrategy.MERGE,
    "labels": ConflictStrategy.MERGE,
    "comments": ConflictStrategy.MERGE,  # Append both
    "priority": ConflictStrategy.REMOTE_WINS,
}
```
### Merge Algorithm for Complex Fields

```python
def merge_labels(local: list[str], remote: list[str], base: list[str]) -> list[str]:
    """Three-way merge for labels."""
    local_added = set(local) - set(base)
    local_removed = set(base) - set(local)
    remote_added = set(remote) - set(base)
    remote_removed = set(base) - set(remote)

    result = set(base)
    result |= local_added | remote_added
    result -= local_removed | remote_removed

    return sorted(result)
```
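Two worked examples of the three-way merge:

```python
# Both sides added a label relative to the common base; both additions are kept.
assert merge_labels(["bug", "ui"], ["bug", "backend"], ["bug"]) == [
    "backend", "bug", "ui"
]

# A removal on one side wins over no change on the other.
assert merge_labels([], ["bug"], ["bug"]) == []
```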
---

## Webhook Handling Design

### Webhook Endpoint Architecture
```python
# app/api/v1/webhooks/issues.py
from datetime import datetime

from fastapi import APIRouter, HTTPException, Request

from app.core.redis import redis_client
from app.services.sync.webhook_handler import WebhookHandler

router = APIRouter()


@router.post("/webhooks/{provider}/{project_id}")
async def receive_webhook(
    provider: str,
    project_id: str,
    request: Request,
):
    """
    Unified webhook endpoint for all providers.

    Path: /api/v1/webhooks/{provider}/{project_id}
    Providers: gitea, github, gitlab
    """
    body = await request.body()
    headers = dict(request.headers)

    # Validate webhook signature
    handler = WebhookHandler.get_handler(provider)
    if not handler.verify_signature(body, headers):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # Parse event type
    event_type = handler.get_event_type(headers)
    if event_type not in handler.supported_events:
        return {"status": "ignored", "reason": "unsupported_event"}

    # Deduplicate using event ID
    event_id = handler.get_event_id(headers, body)
    if await is_duplicate(event_id):
        return {"status": "duplicate"}

    # Queue for processing
    await redis_client.xadd(
        f"webhooks:{project_id}",
        {
            "provider": provider,
            "event_type": event_type,
            "payload": body,
            "received_at": datetime.utcnow().isoformat(),
        },
    )

    return {"status": "queued", "event_id": event_id}


async def is_duplicate(event_id: str, ttl: int = 3600) -> bool:
    """Check if event was already processed (Redis-based dedup)."""
    key = f"webhook:processed:{event_id}"
    result = await redis_client.set(key, "1", ex=ttl, nx=True)
    return result is None  # None means the key already existed
```
### Provider-Specific Signature Validation

```python
# app/services/sync/webhook_validators.py
import hashlib
import hmac


class GiteaWebhookValidator:
    @staticmethod
    def verify_signature(body: bytes, headers: dict, secret: str) -> bool:
        """Gitea uses X-Gitea-Signature with HMAC-SHA256."""
        signature = headers.get("x-gitea-signature", "")
        expected = hmac.new(
            secret.encode(),
            body,
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(signature, expected)

    @staticmethod
    def get_event_type(headers: dict) -> str:
        return headers.get("x-gitea-event", "")


class GitHubWebhookValidator:
    @staticmethod
    def verify_signature(body: bytes, headers: dict, secret: str) -> bool:
        """GitHub uses X-Hub-Signature-256 with a sha256=<HMAC hex digest> value."""
        signature = headers.get("x-hub-signature-256", "")
        if not signature.startswith("sha256="):
            return False
        expected = "sha256=" + hmac.new(
            secret.encode(),
            body,
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(signature, expected)

    @staticmethod
    def get_event_type(headers: dict) -> str:
        return headers.get("x-github-event", "")


class GitLabWebhookValidator:
    @staticmethod
    def verify_signature(body: bytes, headers: dict, secret: str) -> bool:
        """GitLab uses X-Gitlab-Token for simple token matching."""
        token = headers.get("x-gitlab-token", "")
        return hmac.compare_digest(token, secret)

    @staticmethod
    def get_event_type(headers: dict) -> str:
        return headers.get("x-gitlab-event", "")
```
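The `WebhookHandler.get_handler()` facade used by the endpoint above is not shown elsewhere in this spike. A minimal sketch wiring the validators together; the `supported_events` sets, the delivery-ID header fallbacks, and the `load_webhook_secret` lookup are assumptions:

```python
# app/services/sync/webhook_handler.py
import hashlib

from app.services.sync.webhook_validators import (
    GiteaWebhookValidator,
    GitHubWebhookValidator,
    GitLabWebhookValidator,
)


def load_webhook_secret(headers: dict) -> str:
    """Hypothetical lookup of the per-connection webhook secret."""
    raise NotImplementedError


class WebhookHandler:
    _handlers: dict = {}

    def __init__(self, validator, supported_events: set[str]):
        self.validator = validator
        self.supported_events = supported_events

    @classmethod
    def get_handler(cls, provider: str) -> "WebhookHandler":
        return cls._handlers[provider]

    def verify_signature(self, body: bytes, headers: dict) -> bool:
        secret = load_webhook_secret(headers)
        return self.validator.verify_signature(body, headers, secret)

    def get_event_type(self, headers: dict) -> str:
        return self.validator.get_event_type(headers)

    def get_event_id(self, headers: dict, body: bytes) -> str:
        # Providers send a delivery-ID header; fall back to a body hash.
        for key in ("x-github-delivery", "x-gitea-delivery", "x-gitlab-event-uuid"):
            if key in headers:
                return headers[key]
        return hashlib.sha256(body).hexdigest()


WebhookHandler._handlers = {
    "gitea": WebhookHandler(GiteaWebhookValidator, {"issues", "issue_comment"}),
    "github": WebhookHandler(GitHubWebhookValidator, {"issues", "issue_comment"}),
    "gitlab": WebhookHandler(GitLabWebhookValidator, {"Issue Hook", "Note Hook"}),
}
```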
### Webhook Event Processing (Celery Worker)

```python
# app/workers/sync_worker.py
import logging

from celery import Celery

from app.core.redis import redis_client  # sync client; Celery tasks are not async
from app.services.sync.sync_engine import SyncEngine

log = logging.getLogger(__name__)

celery_app = Celery("syndarix")


@celery_app.task(bind=True, max_retries=3)
def process_webhook_event(self, project_id: str):
    """Process queued webhook events for a project."""
    try:
        # Read a batch from the Redis stream
        events = redis_client.xread(
            {f"webhooks:{project_id}": "0"},
            count=10,
            block=5000,
        )

        sync_engine = SyncEngine()  # session wiring elided in this sketch

        for stream_name, messages in events:
            for message_id, data in messages:
                try:
                    sync_engine.process_inbound_event(
                        provider=data["provider"],
                        event_type=data["event_type"],
                        payload=data["payload"],
                    )
                    # Acknowledge by deleting the processed entry
                    redis_client.xdel(stream_name, message_id)
                except Exception as e:
                    log.error(f"Failed to process {message_id}: {e}")
                    # Entry stays in the stream and is retried on the next run

    except Exception as exc:
        self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
```
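The reconciliation fallback itself can be driven from Celery Beat configuration; the task path is an assumption (see SPIKE-004 for the Celery setup):

```python
# Celery Beat schedule sketch for the polling/reconciliation fallback.
celery_app.conf.beat_schedule = {
    "reconcile-issue-sync": {
        # Assumed task that calls SyncEngine.reconcile() for each active connection
        "task": "app.workers.sync_worker.reconcile_all_connections",
        "schedule": 15 * 60,  # seconds; every 15 minutes
    },
}
```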
---

## Provider API Comparison

### Issue Field Support Matrix

| Field | Gitea | GitHub | GitLab | Syndarix Canonical |
|-------|-------|--------|--------|--------------------|
| ID | `id` (int) | `id` (int) | `id` (int) | `external_id` (str) |
| Number | `number` | `number` | `iid` | `external_number` |
| Title | `title` | `title` | `title` | `title` |
| Body | `body` | `body` | `description` | `description` |
| State | `state` (open/closed) | `state` (open/closed) | `state` (opened/closed) | `status` (enum) |
| Assignees | `assignees[]` | `assignees[]` | `assignees[]` | `assignee_ids[]` |
| Labels | `labels[].name` | `labels[].name` | `labels[]` (strings) | `labels[]` |
| Milestone | `milestone.title` | `milestone.title` | `milestone.title` | `milestone` |
| Created | `created_at` | `created_at` | `created_at` | `created_at` |
| Updated | `updated_at` | `updated_at` | `updated_at` | `updated_at` |
| Due Date | `due_date` | N/A | `due_date` | `due_date` |
| Priority | N/A (via labels) | N/A (via labels) | N/A (via labels) | `priority` |
| URL | `html_url` | `html_url` | `web_url` | `remote_url` |
### Webhook Event Mapping

| Action | Gitea Event | GitHub Event | GitLab Event |
|--------|-------------|--------------|--------------|
| Create | `issues:opened` | `issues:opened` | `Issue Hook:open` |
| Update | `issues:edited` | `issues:edited` | `Issue Hook:update` |
| Close | `issues:closed` | `issues:closed` | `Issue Hook:close` |
| Reopen | `issues:reopened` | `issues:reopened` | `Issue Hook:reopen` |
| Assign | `issues:assigned` | `issues:assigned` | `Issue Hook:update` |
| Label | `issues:label_updated` | `issues:labeled` | `Issue Hook:update` |
| Comment | `issue_comment:created` | `issue_comment:created` | `Note Hook` |
### Rate Limits Comparison

| Provider | Authenticated | Pagination | Conditional Requests |
|----------|---------------|------------|----------------------|
| GitHub | 5,000/hour | 100/page max | ETag, If-Modified-Since |
| GitLab | 600/minute | 100/page max | ETag |
| Gitea | Configurable | 50/page default | Link header |
---

## Database Schema

### Core Tables
```sql
-- Extend the issues table with sync metadata
ALTER TABLE issues ADD COLUMN IF NOT EXISTS external_id VARCHAR(255);
ALTER TABLE issues ADD COLUMN IF NOT EXISTS external_number INTEGER;
ALTER TABLE issues ADD COLUMN IF NOT EXISTS remote_url TEXT;
ALTER TABLE issues ADD COLUMN IF NOT EXISTS provider VARCHAR(50);
ALTER TABLE issues ADD COLUMN IF NOT EXISTS provider_repo_id VARCHAR(255);
ALTER TABLE issues ADD COLUMN IF NOT EXISTS sync_status VARCHAR(20) DEFAULT 'pending';
ALTER TABLE issues ADD COLUMN IF NOT EXISTS version_vector JSONB DEFAULT '{}';
ALTER TABLE issues ADD COLUMN IF NOT EXISTS last_synced_at TIMESTAMP WITH TIME ZONE;
ALTER TABLE issues ADD COLUMN IF NOT EXISTS external_updated_at TIMESTAMP WITH TIME ZONE;

-- sync_status values: synced, pending, conflict, error

CREATE INDEX idx_issues_external_id ON issues(provider, external_id);
CREATE INDEX idx_issues_sync_status ON issues(sync_status);
```
```sql
-- Issue sync log for audit trail
CREATE TABLE issue_sync_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    issue_id UUID REFERENCES issues(id) ON DELETE CASCADE,
    project_id UUID REFERENCES projects(id) ON DELETE CASCADE,

    -- Sync details
    direction VARCHAR(10) NOT NULL,  -- 'inbound' or 'outbound'
    provider VARCHAR(50) NOT NULL,
    event_type VARCHAR(100) NOT NULL,

    -- Change tracking
    previous_state JSONB,
    new_state JSONB,
    diff JSONB,

    -- Conflict info
    had_conflict BOOLEAN DEFAULT FALSE,
    conflict_resolution VARCHAR(50),
    conflict_details JSONB,

    -- Status
    status VARCHAR(20) NOT NULL,  -- success, failed, skipped
    error_message TEXT,

    -- Timestamps
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    processed_at TIMESTAMP WITH TIME ZONE,

    -- Webhook metadata
    webhook_event_id VARCHAR(255),
    webhook_delivery_id VARCHAR(255)
);

CREATE INDEX idx_sync_log_issue ON issue_sync_log(issue_id);
CREATE INDEX idx_sync_log_project ON issue_sync_log(project_id);
CREATE INDEX idx_sync_log_status ON issue_sync_log(status);
CREATE INDEX idx_sync_log_created ON issue_sync_log(created_at);
```
```sql
-- Outbox for pending outbound syncs
CREATE TABLE sync_outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    issue_id UUID REFERENCES issues(id) ON DELETE CASCADE,
    project_id UUID REFERENCES projects(id) ON DELETE CASCADE,

    -- Sync target
    provider VARCHAR(50) NOT NULL,
    operation VARCHAR(20) NOT NULL,  -- create, update, delete
    payload JSONB NOT NULL,

    -- Processing status
    status VARCHAR(20) DEFAULT 'pending',  -- pending, in_progress, completed, failed, dead_letter
    attempts INTEGER DEFAULT 0,
    max_attempts INTEGER DEFAULT 5,
    last_error TEXT,

    -- Scheduling
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    scheduled_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    processed_at TIMESTAMP WITH TIME ZONE,

    -- Idempotency
    idempotency_key VARCHAR(255) UNIQUE
);

CREATE INDEX idx_outbox_status ON sync_outbox(status, scheduled_at);
CREATE INDEX idx_outbox_issue ON sync_outbox(issue_id);
```
```sql
-- External provider connections
CREATE TABLE external_connections (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    project_id UUID REFERENCES projects(id) ON DELETE CASCADE,
    organization_id UUID REFERENCES organizations(id) ON DELETE CASCADE,

    -- Provider details
    provider VARCHAR(50) NOT NULL,
    provider_url TEXT NOT NULL,  -- Base URL (e.g., https://gitea.example.com)
    repo_owner VARCHAR(255) NOT NULL,
    repo_name VARCHAR(255) NOT NULL,

    -- Authentication
    auth_type VARCHAR(20) NOT NULL,  -- token, oauth, app
    credentials_encrypted TEXT,      -- Encrypted token/credentials

    -- Webhook configuration
    webhook_secret_encrypted TEXT,
    webhook_id VARCHAR(255),  -- ID from provider
    webhook_active BOOLEAN DEFAULT TRUE,

    -- Sync settings
    sync_enabled BOOLEAN DEFAULT TRUE,
    sync_direction VARCHAR(20) DEFAULT 'bidirectional',  -- inbound, outbound, bidirectional
    sync_labels_filter JSONB,  -- Only sync issues with these labels
    sync_milestones_filter JSONB,

    -- Status tracking
    last_sync_at TIMESTAMP WITH TIME ZONE,
    last_error TEXT,
    status VARCHAR(20) DEFAULT 'active',  -- active, paused, error

    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    UNIQUE(project_id, provider, repo_owner, repo_name)
);

CREATE INDEX idx_connections_project ON external_connections(project_id);
CREATE INDEX idx_connections_status ON external_connections(status);
```
### SQLAlchemy Models

```python
# app/models/issue_sync.py
import enum

from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.sql import func

from app.models.base import Base, TimestampMixin, UUIDMixin


class SyncStatus(str, enum.Enum):
    SYNCED = "synced"
    PENDING = "pending"
    CONFLICT = "conflict"
    ERROR = "error"


class SyncDirection(str, enum.Enum):
    INBOUND = "inbound"
    OUTBOUND = "outbound"
    BIDIRECTIONAL = "bidirectional"


class Provider(str, enum.Enum):
    GITEA = "gitea"
    GITHUB = "github"
    GITLAB = "gitlab"


class ExternalConnection(Base, UUIDMixin, TimestampMixin):
    __tablename__ = "external_connections"

    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False)
    organization_id = Column(UUID(as_uuid=True), ForeignKey("organizations.id"))

    provider = Column(String(50), nullable=False)
    provider_url = Column(String, nullable=False)
    repo_owner = Column(String(255), nullable=False)
    repo_name = Column(String(255), nullable=False)

    auth_type = Column(String(20), nullable=False)
    credentials_encrypted = Column(String)

    webhook_secret_encrypted = Column(String)
    webhook_id = Column(String(255))
    webhook_active = Column(Boolean, default=True)

    sync_enabled = Column(Boolean, default=True)
    sync_direction = Column(String(20), default="bidirectional")
    sync_labels_filter = Column(JSONB)
    sync_milestones_filter = Column(JSONB)

    last_sync_at = Column(DateTime(timezone=True))
    last_error = Column(String)
    status = Column(String(20), default="active")


class IssueSyncLog(Base, UUIDMixin):
    __tablename__ = "issue_sync_log"

    issue_id = Column(UUID(as_uuid=True), ForeignKey("issues.id", ondelete="CASCADE"))
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id", ondelete="CASCADE"))

    direction = Column(String(10), nullable=False)
    provider = Column(String(50), nullable=False)
    event_type = Column(String(100), nullable=False)

    previous_state = Column(JSONB)
    new_state = Column(JSONB)
    diff = Column(JSONB)

    had_conflict = Column(Boolean, default=False)
    conflict_resolution = Column(String(50))
    conflict_details = Column(JSONB)

    status = Column(String(20), nullable=False)
    error_message = Column(String)

    created_at = Column(DateTime(timezone=True), server_default=func.now())
    processed_at = Column(DateTime(timezone=True))

    webhook_event_id = Column(String(255))
    webhook_delivery_id = Column(String(255))


class SyncOutbox(Base, UUIDMixin):
    __tablename__ = "sync_outbox"

    issue_id = Column(UUID(as_uuid=True), ForeignKey("issues.id", ondelete="CASCADE"))
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id", ondelete="CASCADE"))

    provider = Column(String(50), nullable=False)
    operation = Column(String(20), nullable=False)
    payload = Column(JSONB, nullable=False)

    status = Column(String(20), default="pending")
    attempts = Column(Integer, default=0)
    max_attempts = Column(Integer, default=5)
    last_error = Column(String)

    created_at = Column(DateTime(timezone=True), server_default=func.now())
    scheduled_at = Column(DateTime(timezone=True), server_default=func.now())
    processed_at = Column(DateTime(timezone=True))

    idempotency_key = Column(String(255), unique=True)
```
---

## Field Mapping Specification

### Canonical Issue Model
```python
# app/schemas/sync/canonical.py
from datetime import datetime
from enum import Enum
from typing import Optional

from pydantic import BaseModel, Field


class IssueStatus(str, Enum):
    OPEN = "open"
    CLOSED = "closed"
    IN_PROGRESS = "in_progress"  # Syndarix-only


class CanonicalIssue(BaseModel):
    """Canonical issue representation for sync operations."""

    # Identity
    external_id: str
    external_number: int
    remote_url: str

    # Core fields
    title: str
    description: Optional[str] = None
    status: IssueStatus

    # Relationships
    assignee_ids: list[str] = Field(default_factory=list)
    assignee_usernames: list[str] = Field(default_factory=list)
    labels: list[str] = Field(default_factory=list)
    milestone: Optional[str] = None

    # Metadata
    created_at: datetime
    updated_at: datetime
    closed_at: Optional[datetime] = None
    due_date: Optional[datetime] = None

    # Provider info
    provider: str
    raw_data: dict = Field(default_factory=dict)  # Original payload
```
### Provider Adapters

```python
# app/services/sync/adapters/gitea.py
from app.schemas.sync.canonical import CanonicalIssue, IssueStatus


class GiteaAdapter:
    """Transform between Gitea API format and the canonical model."""

    @staticmethod
    def to_canonical(gitea_issue: dict, base_url: str) -> CanonicalIssue:
        return CanonicalIssue(
            external_id=str(gitea_issue["id"]),
            external_number=gitea_issue["number"],
            remote_url=gitea_issue["html_url"],
            title=gitea_issue["title"],
            description=gitea_issue.get("body"),
            status=IssueStatus.OPEN if gitea_issue["state"] == "open" else IssueStatus.CLOSED,
            # Gitea may send null (not a missing key) for empty collections
            assignee_ids=[str(a["id"]) for a in gitea_issue.get("assignees") or []],
            assignee_usernames=[a["login"] for a in gitea_issue.get("assignees") or []],
            labels=[label["name"] for label in gitea_issue.get("labels") or []],
            milestone=gitea_issue["milestone"]["title"] if gitea_issue.get("milestone") else None,
            created_at=gitea_issue["created_at"],
            updated_at=gitea_issue["updated_at"],
            closed_at=gitea_issue.get("closed_at"),
            due_date=gitea_issue.get("due_date"),
            provider="gitea",
            raw_data=gitea_issue,
        )

    @staticmethod
    def from_canonical(issue: CanonicalIssue) -> dict:
        """Convert canonical to Gitea API format for updates."""
        return {
            "title": issue.title,
            "body": issue.description,
            "state": "open" if issue.status == IssueStatus.OPEN else "closed",
            "assignees": issue.assignee_usernames,
            "labels": issue.labels,
            "milestone": issue.milestone,
            "due_date": issue.due_date.isoformat() if issue.due_date else None,
        }


# app/services/sync/adapters/github.py
from app.schemas.sync.canonical import CanonicalIssue, IssueStatus


class GitHubAdapter:
    """Transform between GitHub API format and the canonical model."""

    @staticmethod
    def to_canonical(github_issue: dict, base_url: str) -> CanonicalIssue:
        return CanonicalIssue(
            external_id=str(github_issue["id"]),
            external_number=github_issue["number"],
            remote_url=github_issue["html_url"],
            title=github_issue["title"],
            description=github_issue.get("body"),
            status=IssueStatus.OPEN if github_issue["state"] == "open" else IssueStatus.CLOSED,
            assignee_ids=[str(a["id"]) for a in github_issue.get("assignees", [])],
            assignee_usernames=[a["login"] for a in github_issue.get("assignees", [])],
            labels=[label["name"] for label in github_issue.get("labels", [])],
            milestone=github_issue["milestone"]["title"] if github_issue.get("milestone") else None,
            created_at=github_issue["created_at"],
            updated_at=github_issue["updated_at"],
            closed_at=github_issue.get("closed_at"),
            due_date=None,  # GitHub doesn't have native due dates
            provider="github",
            raw_data=github_issue,
        )

    @staticmethod
    def from_canonical(issue: CanonicalIssue) -> dict:
        return {
            "title": issue.title,
            "body": issue.description,
            "state": "open" if issue.status == IssueStatus.OPEN else "closed",
            "assignees": issue.assignee_usernames,
            "labels": issue.labels,
            "milestone": issue.milestone,
        }


# app/services/sync/adapters/gitlab.py
from app.schemas.sync.canonical import CanonicalIssue, IssueStatus


class GitLabAdapter:
    """Transform between GitLab API format and the canonical model."""

    @staticmethod
    def to_canonical(gitlab_issue: dict, base_url: str) -> CanonicalIssue:
        # GitLab uses 'opened' instead of 'open'
        state = gitlab_issue["state"]
        status = IssueStatus.OPEN if state == "opened" else IssueStatus.CLOSED

        return CanonicalIssue(
            external_id=str(gitlab_issue["id"]),
            external_number=gitlab_issue["iid"],  # GitLab uses iid for the project-scoped number
            remote_url=gitlab_issue["web_url"],
            title=gitlab_issue["title"],
            description=gitlab_issue.get("description"),
            status=status,
            assignee_ids=[str(a["id"]) for a in gitlab_issue.get("assignees", [])],
            assignee_usernames=[a["username"] for a in gitlab_issue.get("assignees", [])],
            labels=gitlab_issue.get("labels", []),  # GitLab returns labels as strings
            milestone=gitlab_issue["milestone"]["title"] if gitlab_issue.get("milestone") else None,
            created_at=gitlab_issue["created_at"],
            updated_at=gitlab_issue["updated_at"],
            closed_at=gitlab_issue.get("closed_at"),
            due_date=gitlab_issue.get("due_date"),
            provider="gitlab",
            raw_data=gitlab_issue,
        )

    @staticmethod
    def from_canonical(issue: CanonicalIssue) -> dict:
        return {
            "title": issue.title,
            "description": issue.description,
            "state_event": "reopen" if issue.status == IssueStatus.OPEN else "close",
            "assignee_ids": issue.assignee_ids,
            "labels": ",".join(issue.labels),  # GitLab accepts comma-separated labels
            "milestone_id": issue.milestone,   # Requires a milestone ID lookup
            "due_date": issue.due_date.isoformat() if issue.due_date else None,
        }
```
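A quick round-trip sketch using the Gitea adapter on a truncated, hypothetical payload:

```python
gitea_payload = {
    "id": 42,
    "number": 7,
    "html_url": "https://gitea.example.com/org/repo/issues/7",
    "title": "Sync breaks on labels",
    "body": "Steps to reproduce ...",
    "state": "open",
    "assignees": [],
    "labels": [{"name": "bug"}],
    "created_at": "2025-12-01T10:00:00+00:00",
    "updated_at": "2025-12-02T09:30:00+00:00",
}

canonical = GiteaAdapter.to_canonical(gitea_payload, "https://gitea.example.com")
assert canonical.labels == ["bug"] and canonical.milestone is None

update_body = GiteaAdapter.from_canonical(canonical)  # ready for a PATCH back
```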
---

## Code Examples

### Provider Interface
```python
# app/services/sync/providers/base.py
from abc import ABC, abstractmethod
from datetime import datetime
from typing import AsyncIterator

from app.models.issue_sync import ExternalConnection
from app.schemas.sync.canonical import CanonicalIssue


class IssueProvider(ABC):
    """Abstract base class for issue tracker providers."""

    def __init__(self, connection: ExternalConnection):
        self.connection = connection
        self.base_url = connection.provider_url
        self.repo_owner = connection.repo_owner
        self.repo_name = connection.repo_name

    @abstractmethod
    async def get_issue(self, issue_number: int) -> CanonicalIssue:
        """Fetch a single issue by number."""
        pass

    @abstractmethod
    async def list_issues(
        self,
        state: str = "all",
        since: datetime | None = None,
        labels: list[str] | None = None,
    ) -> AsyncIterator[CanonicalIssue]:
        """List issues with optional filters."""
        pass

    @abstractmethod
    async def create_issue(self, issue: CanonicalIssue) -> CanonicalIssue:
        """Create a new issue."""
        pass

    @abstractmethod
    async def update_issue(self, issue_number: int, issue: CanonicalIssue) -> CanonicalIssue:
        """Update an existing issue."""
        pass

    @abstractmethod
    async def add_comment(self, issue_number: int, body: str) -> dict:
        """Add a comment to an issue."""
        pass

    @abstractmethod
    async def setup_webhook(self, callback_url: str, secret: str) -> str:
        """Configure webhook for issue events. Returns webhook ID."""
        pass

    @abstractmethod
    async def verify_webhook_signature(self, payload: bytes, headers: dict) -> bool:
        """Verify webhook signature."""
        pass
```
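The `ProviderFactory` referenced by the Sync Engine below is not specified elsewhere in this spike; a minimal sketch follows. The GitHub and GitLab provider classes are Phase 4 work and assumed here:

```python
# app/services/sync/providers/factory.py
from app.models.issue_sync import ExternalConnection
from app.services.sync.providers.base import IssueProvider
from app.services.sync.providers.gitea import GiteaProvider
from app.services.sync.providers.github import GitHubProvider  # planned (Phase 4)
from app.services.sync.providers.gitlab import GitLabProvider  # planned (Phase 4)


class ProviderFactory:
    _registry: dict[str, type[IssueProvider]] = {
        "gitea": GiteaProvider,
        "github": GitHubProvider,
        "gitlab": GitLabProvider,
    }

    @classmethod
    def get_provider(cls, connection: ExternalConnection) -> IssueProvider:
        """Instantiate the provider client matching connection.provider."""
        try:
            provider_cls = cls._registry[connection.provider]
        except KeyError:
            raise ValueError(f"Unsupported provider: {connection.provider}")
        return provider_cls(connection)
```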
### Gitea Provider Implementation

```python
# app/services/sync/providers/gitea.py
from datetime import datetime
from typing import AsyncIterator

import httpx

from app.core.security import decrypt  # assumed location of the decryption helper
from app.models.issue_sync import ExternalConnection
from app.schemas.sync.canonical import CanonicalIssue
from app.services.sync.adapters.gitea import GiteaAdapter
from app.services.sync.providers.base import IssueProvider
from app.services.sync.webhook_validators import GiteaWebhookValidator


class GiteaProvider(IssueProvider):
    """Gitea issue tracker provider."""

    def __init__(self, connection: ExternalConnection):
        super().__init__(connection)
        self.adapter = GiteaAdapter()
        self._client = None

    @property
    def api_url(self) -> str:
        return f"{self.base_url}/api/v1"

    async def _get_client(self) -> httpx.AsyncClient:
        if self._client is None:
            token = decrypt(self.connection.credentials_encrypted)
            self._client = httpx.AsyncClient(
                base_url=self.api_url,
                headers={
                    "Authorization": f"token {token}",
                    "Accept": "application/json",
                },
                timeout=30.0,
            )
        return self._client

    async def get_issue(self, issue_number: int) -> CanonicalIssue:
        client = await self._get_client()
        response = await client.get(
            f"/repos/{self.repo_owner}/{self.repo_name}/issues/{issue_number}"
        )
        response.raise_for_status()
        return self.adapter.to_canonical(response.json(), self.base_url)

    async def list_issues(
        self,
        state: str = "all",
        since: datetime | None = None,
        labels: list[str] | None = None,
    ) -> AsyncIterator[CanonicalIssue]:
        client = await self._get_client()
        page = 1

        while True:
            params = {
                "state": state,
                "page": page,
                "limit": 50,  # Gitea default max
            }
            if since:
                params["since"] = since.isoformat()
            if labels:
                params["labels"] = ",".join(labels)

            response = await client.get(
                f"/repos/{self.repo_owner}/{self.repo_name}/issues",
                params=params,
            )
            response.raise_for_status()

            issues = response.json()
            if not issues:
                break

            for issue in issues:
                yield self.adapter.to_canonical(issue, self.base_url)

            # Check pagination via the Link header
            link_header = response.headers.get("link", "")
            if 'rel="next"' not in link_header:
                break
            page += 1

    async def create_issue(self, issue: CanonicalIssue) -> CanonicalIssue:
        client = await self._get_client()
        payload = self.adapter.from_canonical(issue)

        response = await client.post(
            f"/repos/{self.repo_owner}/{self.repo_name}/issues",
            json=payload,
        )
        response.raise_for_status()
        return self.adapter.to_canonical(response.json(), self.base_url)

    async def update_issue(self, issue_number: int, issue: CanonicalIssue) -> CanonicalIssue:
        client = await self._get_client()
        payload = self.adapter.from_canonical(issue)

        response = await client.patch(
            f"/repos/{self.repo_owner}/{self.repo_name}/issues/{issue_number}",
            json=payload,
        )
        response.raise_for_status()
        return self.adapter.to_canonical(response.json(), self.base_url)

    async def add_comment(self, issue_number: int, body: str) -> dict:
        client = await self._get_client()
        response = await client.post(
            f"/repos/{self.repo_owner}/{self.repo_name}/issues/{issue_number}/comments",
            json={"body": body},
        )
        response.raise_for_status()
        return response.json()

    async def setup_webhook(self, callback_url: str, secret: str) -> str:
        client = await self._get_client()
        response = await client.post(
            f"/repos/{self.repo_owner}/{self.repo_name}/hooks",
            json={
                "type": "gitea",
                "active": True,
                "events": ["issues", "issue_comment"],
                "config": {
                    "url": callback_url,
                    "content_type": "json",
                    "secret": secret,
                },
            },
        )
        response.raise_for_status()
        return str(response.json()["id"])

    async def verify_webhook_signature(self, payload: bytes, headers: dict) -> bool:
        secret = decrypt(self.connection.webhook_secret_encrypted)
        return GiteaWebhookValidator.verify_signature(payload, headers, secret)
```
### Sync Engine

```python
# app/services/sync/sync_engine.py
from datetime import datetime, timedelta

from sqlalchemy.ext.asyncio import AsyncSession

from app.models.issue import Issue  # assumed location of the Issue model
from app.models.issue_sync import ExternalConnection, IssueSyncLog, SyncOutbox
from app.schemas.sync.canonical import CanonicalIssue
from app.services.sync.conflict_resolver import ConflictResolver
from app.services.sync.providers.factory import ProviderFactory


class SyncEngine:
    """Orchestrates synchronization between Syndarix and external trackers."""

    def __init__(self, db: AsyncSession):
        self.db = db
        self.conflict_resolver = ConflictResolver()

    async def sync_inbound(
        self,
        connection: ExternalConnection,
        external_issue: CanonicalIssue,
    ) -> Issue:
        """Sync an issue from an external tracker into Syndarix."""

        # Find the existing local mirror, if any
        local_issue = await self._find_by_external_id(
            connection.project_id,
            external_issue.provider,
            external_issue.external_id,
        )

        if local_issue is None:
            # Create a new local issue
            local_issue = await self._create_local_issue(
                connection.project_id,
                external_issue,
            )
            await self._log_sync(
                local_issue,
                direction="inbound",
                event_type="created",
                status="success",
            )
        else:
            # Check for conflicts
            conflict_result = self.conflict_resolver.check(
                local_issue.version_vector,
                external_issue.raw_data.get("_version_vector", {}),
                local_issue.updated_at,
                external_issue.updated_at,
            )

            if conflict_result.has_conflict:
                # Apply the resolution strategy
                resolved = self.conflict_resolver.resolve(
                    local_issue,
                    external_issue,
                    conflict_result,
                )
                await self._update_local_issue(local_issue, resolved)
                await self._log_sync(
                    local_issue,
                    direction="inbound",
                    event_type="conflict_resolved",
                    status="success",
                    conflict_details=conflict_result.to_dict(),
                )
            else:
                # Normal update
                await self._update_local_issue(local_issue, external_issue)
                await self._log_sync(
                    local_issue,
                    direction="inbound",
                    event_type="updated",
                    status="success",
                )

        return local_issue

    async def sync_outbound(
        self,
        connection: ExternalConnection,
        local_issue: Issue,
    ):
        """Queue a local issue for sync to an external tracker."""

        canonical = self._to_canonical(local_issue)

        outbox_entry = SyncOutbox(
            issue_id=local_issue.id,
            project_id=connection.project_id,
            provider=connection.provider,
            operation="update" if local_issue.external_id else "create",
            payload=canonical.dict(),
            idempotency_key=f"{local_issue.id}:{local_issue.updated_at.isoformat()}",
        )

        self.db.add(outbox_entry)
        await self.db.commit()

    async def initial_import(
        self,
        connection: ExternalConnection,
        since: datetime | None = None,
        labels: list[str] | None = None,
    ) -> int:
        """Import all issues from an external tracker."""

        provider = ProviderFactory.get_provider(connection)
        imported = 0

        async for external_issue in provider.list_issues(
            state="all",
            since=since,
            labels=labels,
        ):
            await self.sync_inbound(connection, external_issue)
            imported += 1

        connection.last_sync_at = datetime.utcnow()
        await self.db.commit()

        return imported

    async def reconcile(self, connection: ExternalConnection):
        """
        Periodic reconciliation to catch missed webhooks.
        Runs via Celery Beat every 15-30 minutes.
        """
        since = connection.last_sync_at or (datetime.utcnow() - timedelta(hours=24))

        provider = ProviderFactory.get_provider(connection)

        async for external_issue in provider.list_issues(
            state="all",
            since=since,
        ):
            local_issue = await self._find_by_external_id(
                connection.project_id,
                external_issue.provider,
                external_issue.external_id,
            )

            if local_issue:
                # Re-sync only if the external copy is newer than our mirror
                if external_issue.updated_at > local_issue.external_updated_at:
                    await self.sync_inbound(connection, external_issue)
            else:
                await self.sync_inbound(connection, external_issue)

        connection.last_sync_at = datetime.utcnow()
        await self.db.commit()
```
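The `ConflictResolver` used by the Sync Engine above is not defined elsewhere in this spike. A minimal sketch tying together `detect_conflict`, `FIELD_STRATEGIES`, and `merge_labels` from the Conflict Resolution Strategy section; the `ConflictResult` shape and the `app.services.sync.conflicts` module path are hypothetical:

```python
# app/services/sync/conflict_resolver.py
from dataclasses import dataclass

# Hypothetical module collecting the functions defined earlier in this spike
from app.services.sync.conflicts import (
    ConflictStrategy, FIELD_STRATEGIES, detect_conflict, merge_labels,
)


@dataclass
class ConflictResult:
    outcome: str  # equal | local_wins | remote_wins | conflict

    @property
    def has_conflict(self) -> bool:
        return self.outcome == "conflict"

    def to_dict(self) -> dict:
        return {"outcome": self.outcome}


class ConflictResolver:
    def check(self, local_vv, remote_vv, local_updated, remote_updated) -> ConflictResult:
        return ConflictResult(outcome=detect_conflict(local_vv or {}, remote_vv or {}))

    def resolve(self, local_issue, external_issue, result: ConflictResult):
        """Field-level resolution following FIELD_STRATEGIES (remote copy is the base)."""
        resolved = external_issue.copy()
        local_is_newer = local_issue.updated_at > external_issue.updated_at
        for name, strategy in FIELD_STRATEGIES.items():
            if not hasattr(resolved, name):
                continue  # e.g. comments/priority live outside CanonicalIssue
            if strategy == ConflictStrategy.LAST_WRITE_WINS and local_is_newer:
                setattr(resolved, name, getattr(local_issue, name))
            elif strategy == ConflictStrategy.MERGE and name == "labels":
                # Without a stored base version, fall back to a union-style merge
                setattr(resolved, name, merge_labels(
                    list(local_issue.labels), list(external_issue.labels), []))
        return resolved
```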
---

## Error Handling & Recovery

### Error Categories and Handling

| Error Type | Handling Strategy | Retry | Alert |
|------------|-------------------|-------|-------|
| Network timeout | Exponential backoff | Yes (3x) | After 3 failures |
| Rate limit (429) | Wait for reset | Yes | No |
| Auth error (401/403) | Mark connection as error | No | Yes |
| Not found (404) | Mark issue as deleted | No | No |
| Conflict (409) | Apply resolution strategy | No | If unresolved |
| Server error (5xx) | Exponential backoff | Yes (5x) | After 5 failures |
| Validation error | Log and skip | No | Yes |
### Retry Strategy

```python
# app/services/sync/retry.py
import asyncio
from functools import wraps

import httpx


def with_retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    retryable_exceptions: tuple = (httpx.TimeoutException, httpx.NetworkError),
):
    """Decorator for async retry with exponential backoff."""

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except retryable_exceptions as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        delay = min(
                            base_delay * (exponential_base ** attempt),
                            max_delay,
                        )
                        await asyncio.sleep(delay)

            raise last_exception

        return wrapper

    return decorator
```
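Usage sketch (the endpoint path is illustrative):

```python
@with_retry(max_attempts=5, base_delay=2.0)
async def fetch_issue(client: httpx.AsyncClient, number: int) -> dict:
    """Transient network failures are retried with exponential backoff."""
    response = await client.get(f"/issues/{number}")
    response.raise_for_status()
    return response.json()
```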
### Dead Letter Queue Handling

```python
# app/workers/dead_letter_worker.py
# `db`, `notify_admin`, and `should_retry` are application-level helpers
# assumed by this sketch.
from datetime import datetime, timedelta

from app.models.issue_sync import SyncOutbox


@celery_app.task
def process_dead_letter_queue():
    """Process failed sync items for manual review or retry."""

    dead_items = db.query(SyncOutbox).filter(
        SyncOutbox.status == "dead_letter",
        SyncOutbox.created_at > datetime.utcnow() - timedelta(days=7),
    ).all()

    for item in dead_items:
        # Create a notification for admin review
        notify_admin(
            f"Sync failed for issue {item.issue_id}",
            details={
                "provider": item.provider,
                "operation": item.operation,
                "attempts": item.attempts,
                "last_error": item.last_error,
            },
        )

        # Optionally re-queue for one more attempt after manual intervention
        if should_retry(item):
            item.status = "pending"
            item.attempts = 0
            item.scheduled_at = datetime.utcnow()

    db.commit()
```
### Health Monitoring

```python
# app/services/sync/health.py
# get_connections, count_pending_syncs, count_failed_syncs, and
# count_conflicts are application-level query helpers assumed by this sketch.
from datetime import datetime, timedelta


async def check_sync_health(project_id: str) -> dict:
    """Check sync health for a project."""

    connections = await get_connections(project_id)

    health = {
        "status": "healthy",
        "connections": [],
        "pending_syncs": 0,
        "failed_syncs_24h": 0,
        "conflicts_24h": 0,
    }

    for conn in connections:
        conn_health = {
            "provider": conn.provider,
            "status": conn.status,
            "last_sync": conn.last_sync_at,
            "webhook_active": conn.webhook_active,
        }

        # Flag connections whose last sync is older than one hour
        if conn.last_sync_at:
            stale_threshold = datetime.utcnow() - timedelta(hours=1)
            if conn.last_sync_at < stale_threshold:
                conn_health["status"] = "stale"
                health["status"] = "degraded"

        health["connections"].append(conn_health)

    # Count pending and failed operations
    health["pending_syncs"] = await count_pending_syncs(project_id)
    health["failed_syncs_24h"] = await count_failed_syncs(project_id, hours=24)
    health["conflicts_24h"] = await count_conflicts(project_id, hours=24)

    if health["failed_syncs_24h"] > 10:
        health["status"] = "unhealthy"

    return health
```
---

## Implementation Roadmap

### Phase 1: Foundation (Week 1-2)

- [ ] Database schema and migrations
- [ ] Core models (ExternalConnection, IssueSyncLog, SyncOutbox)
- [ ] Provider interface and Gitea implementation
- [ ] Canonical issue model and field mapping

### Phase 2: Inbound Sync (Week 2-3)

- [ ] Webhook endpoint and signature validation
- [ ] Redis queue for webhook events
- [ ] Celery worker for event processing
- [ ] Initial import functionality
- [ ] Basic conflict detection

### Phase 3: Outbound Sync (Week 3-4)

- [ ] Outbox pattern implementation
- [ ] Outbound sync worker
- [ ] Retry and dead letter queue
- [ ] Bidirectional sync testing

### Phase 4: GitHub & GitLab (Week 4-5)

- [ ] GitHub provider implementation
- [ ] GitLab provider implementation
- [ ] Provider factory and dynamic selection
- [ ] Cross-provider field mapping

### Phase 5: Conflict Resolution (Week 5-6)

- [ ] Version vector implementation
- [ ] Conflict resolution strategies
- [ ] Merge algorithms for complex fields
- [ ] Conflict notification UI

### Phase 6: Production Readiness (Week 6-7)

- [ ] Health monitoring and alerting
- [ ] Admin UI for connection management
- [ ] Comprehensive test coverage
- [ ] Performance optimization
- [ ] Documentation

---
## References

### Research Sources

- [Two-Way Sync Tools 2025: Best Platforms for Real-Time Data Integration](https://www.stacksync.com/blog/2025-best-two-way-sync-tools-a-comprehensive-guide-for-data-integration) - StackSync
- [The Architect's Guide to Data Integration Patterns](https://medium.com/@prayagvakharia/the-architects-guide-to-data-integration-patterns-migration-broadcast-bi-directional-a4c92b5f908d) - Medium
- [System Design Pattern: Conflict Resolution in Distributed Systems](https://medium.com/@priyasrivastava18official/system-design-pattern-from-chaos-to-consistency-the-art-of-conflict-resolution-in-distributed-9d631028bdb4) - Medium
- [Eventual Consistency in Distributed Systems](https://www.geeksforgeeks.org/system-design/eventual-consistency-in-distributive-systems-learn-system-design/) - GeeksforGeeks
- [Bidirectional Synchronization: What It Is and Examples](https://www.workato.com/the-connector/bidirectional-synchronization/) - Workato
- [Data Integration Patterns: Bi-Directional Sync](https://blogs.mulesoft.com/api-integration/patterns/data-integration-patterns-bi-directional-sync/) - MuleSoft

### API Documentation

- [GitHub REST API - Issues](https://docs.github.com/en/rest/issues/issues)
- [GitHub Webhook Events](https://docs.github.com/en/webhooks/webhook-events-and-payloads)
- [GitLab Issues API](https://docs.gitlab.com/api/issues.html)
- [GitLab Webhooks](https://docs.gitlab.com/ee/user/project/integrations/webhooks.html)
- [Gitea API Usage](https://docs.gitea.com/development/api-usage)

### Related Syndarix Spikes

- [SPIKE-001: MCP Integration Pattern](./SPIKE-001-mcp-integration-pattern.md) - MCP architecture and FastMCP usage
- [SPIKE-003: Real-time Updates](./SPIKE-003-realtime-updates.md) - SSE for event streaming
- [SPIKE-004: Celery Redis Integration](./SPIKE-004-celery-redis-integration.md) - Background job infrastructure
---

## Decision

**Adopt a webhook-first, polling-fallback synchronization architecture** with:

1. **Last-Writer-Wins (LWW)** conflict resolution using version vectors
2. **External tracker as source of truth** with local mirrors
3. **Unified provider interface** abstracting Gitea, GitHub, and GitLab differences
4. **Outbox pattern** for reliable outbound sync
5. **Redis Streams** for webhook event queuing and deduplication
6. **Celery Beat** for periodic reconciliation

This approach balances real-time responsiveness with eventual consistency, providing a robust foundation for bidirectional issue synchronization.

---

*Spike completed. Findings will inform ADR-009: Issue Synchronization Architecture.*