SPIKE-006: Knowledge Base with pgvector for RAG System

Status: Completed | Date: 2025-12-29 | Author: Architecture Team | Related Issue: #6


Executive Summary

This spike researches the optimal approach for implementing a knowledge base system to enable RAG (Retrieval-Augmented Generation) for Syndarix AI agents. After evaluating options, we recommend pgvector with hybrid search as the primary solution.

Key Recommendation

Use pgvector for the following reasons:

  • Already using PostgreSQL in the stack (operational simplicity)
  • Handles 10-100M vectors effectively (sufficient for project-scoped knowledge)
  • Transactional consistency with application data
  • Native hybrid search with PostgreSQL full-text search
  • Row-level security for multi-tenant isolation
  • Integrates seamlessly with existing migrations and tooling

For projects that scale beyond 100M vectors per tenant, consider migration to Qdrant (open-source, high-performance) or Pinecone (fully managed).


Research Questions & Findings

1. pgvector vs Dedicated Vector Databases

| Feature | pgvector | Pinecone | Qdrant | Weaviate |
| --- | --- | --- | --- | --- |
| Max Scale | 10-100M vectors | Billions | Billions | Billions |
| Self-Hosted | Yes | No | Yes | Yes |
| Managed Option | RDS, Neon, Supabase | Yes (only) | Yes | Yes |
| Query Latency | Good (<100ms) | Excellent | Excellent | Good |
| Hybrid Search | Native + pg_search | Sparse vectors | Native | Native |
| Cost (1M vectors) | ~$0 (existing DB) | $20-30/mo | ~$27/mo | Variable |
| Operational Overhead | Zero (existing) | None | Medium | Medium |
| ACID Transactions | Yes | No | No | No |

Why pgvector for Syndarix:

  • Per-project knowledge isolation means smaller vector sets (thousands to millions, not billions)
  • Transactional ingest: embed and index in the same INSERT as application data
  • Single database backup/restore story
  • Migration path exists if scale requires dedicated solution

When to Consider Alternatives:

  • Pinecone: Zero-ops requirement, budget available, billions of vectors
  • Qdrant: Need advanced filtering, high QPS, open-source preference
  • Weaviate: Multi-modal (images, video), knowledge graph features

2. Embedding Model Recommendations

Based on research from 2024-2025 and Modal's code embedding comparison:

| Model | Best For | Dimensions | Cost/1M tokens | Notes |
| --- | --- | --- | --- | --- |
| text-embedding-3-small | General text, docs | 512-1536 | $0.02 | Good balance |
| text-embedding-3-large | High accuracy needs | 256-3072 | $0.13 | Dimension reduction |
| voyage-code-3 | Code retrieval | 1024 | $0.06 | State-of-the-art for code |
| voyage-3-large | General + code | 1024 | $0.12 | Top leaderboard |
| nomic-embed-text | Open-source, local | 768 | Free | Ollama compatible |

Recommendation for Syndarix:

# Content-type based model selection
EMBEDDING_MODELS = {
    "code": "voyage/voyage-code-3",           # Code files (.py, .js, etc.)
    "documentation": "text-embedding-3-small", # Markdown, docs
    "general": "text-embedding-3-small",       # Default
    "high_accuracy": "voyage/voyage-3-large",  # Critical queries
    "local": "ollama/nomic-embed-text",        # Fallback / dev
}
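
A small sketch of how this mapping might drive model selection during ingestion; the helper name and extension lists are illustrative assumptions, not existing Syndarix code:

# Hypothetical helper: pick an embedding model from a file path.
from pathlib import Path

_CODE_EXTENSIONS = {".py", ".js", ".ts", ".go", ".rs", ".java"}
_DOC_EXTENSIONS = {".md", ".rst", ".txt"}

def select_embedding_model(file_path: str, high_accuracy: bool = False) -> str:
    if high_accuracy:
        return EMBEDDING_MODELS["high_accuracy"]
    suffix = Path(file_path).suffix.lower()
    if suffix in _CODE_EXTENSIONS:
        return EMBEDDING_MODELS["code"]
    if suffix in _DOC_EXTENSIONS:
        return EMBEDDING_MODELS["documentation"]
    return EMBEDDING_MODELS["general"]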

LiteLLM Integration:

from litellm import aembedding

# Via LiteLLM (unified interface); aembedding is the async entry point
response = await aembedding(
    model="voyage/voyage-code-3",
    input=["def hello(): return 'world'"],
)
vector = response.data[0]["embedding"]

3. Chunking Strategies

Based on Weaviate's research and Stack Overflow's analysis:

Strategy by Content Type:

| Content Type | Strategy | Chunk Size | Overlap | Notes |
| --- | --- | --- | --- | --- |
| Code Files | AST-based / Function | Per function/class | None | Preserve semantic units |
| Markdown Docs | Heading-based | Per section | 10% | Respect document structure |
| PDF Specs | Page-level + semantic | 1000 tokens | 15% | NVIDIA recommends page-level |
| Conversations | Turn-based | Per exchange | Context window | Preserve dialogue flow |
| API Docs | Endpoint-based | Per endpoint | None | Group by resource |

Implementation:

# app/services/knowledge/chunkers.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List
import tree_sitter_python as tspython
from tree_sitter import Language, Parser

@dataclass
class Chunk:
    content: str
    metadata: dict
    start_line: int | None = None
    end_line: int | None = None

class BaseChunker(ABC):
    @abstractmethod
    def chunk(self, content: str, metadata: dict) -> List[Chunk]:
        pass

class CodeChunker(BaseChunker):
    """AST-based chunking for source code."""

    def __init__(self, language: str = "python"):
        if language != "python":
            raise ValueError(f"Unsupported language: {language}")
        # Wrap the raw language pointer from tree_sitter_python in a Language object
        self.parser = Parser(Language(tspython.language()))

    def chunk(self, content: str, metadata: dict) -> List[Chunk]:
        tree = self.parser.parse(bytes(content, "utf8"))
        chunks = []

        for node in tree.root_node.children:
            if node.type in ("function_definition", "class_definition"):
                chunk_content = content[node.start_byte:node.end_byte]
                chunks.append(Chunk(
                    content=chunk_content,
                    metadata={
                        **metadata,
                        "type": node.type,
                        "name": self._get_name(node),
                    },
                    start_line=node.start_point[0],
                    end_line=node.end_point[0],
                ))

        # Handle module-level code
        if not chunks:
            chunks.append(Chunk(content=content, metadata=metadata))

        return chunks

    def _get_name(self, node) -> str:
        for child in node.children:
            if child.type == "identifier":
                return child.text.decode("utf8")
        return "unknown"

class MarkdownChunker(BaseChunker):
    """Heading-based chunking for markdown."""

    def __init__(self, max_tokens: int = 1000, overlap_ratio: float = 0.1):
        self.max_tokens = max_tokens
        self.overlap_ratio = overlap_ratio

    def chunk(self, content: str, metadata: dict) -> List[Chunk]:
        import re

        # Split by headings
        sections = re.split(r'^(#{1,6}\s+.+)$', content, flags=re.MULTILINE)
        chunks = []
        current_heading = ""

        for i, section in enumerate(sections):
            if section.startswith('#'):
                current_heading = section.strip()
            elif section.strip():
                chunks.append(Chunk(
                    content=f"{current_heading}\n\n{section.strip()}",
                    metadata={
                        **metadata,
                        "heading": current_heading,
                        "section_index": i,
                    }
                ))

        return self._apply_overlap(chunks)

    def _apply_overlap(self, chunks: List[Chunk]) -> List[Chunk]:
        # Add overlap between chunks for context
        for i in range(1, len(chunks)):
            overlap_size = int(len(chunks[i-1].content) * self.overlap_ratio)
            overlap_text = chunks[i-1].content[-overlap_size:]
            chunks[i].content = f"[Context: ...{overlap_text}]\n\n{chunks[i].content}"
        return chunks

class SemanticChunker(BaseChunker):
    """Semantic chunking based on embedding similarity."""

    def __init__(self, embedding_model: str = "text-embedding-3-small"):
        from litellm import aembedding  # async embedding entry point
        self.embed = aembedding
        self.model = embedding_model
        self.similarity_threshold = 0.7

    async def chunk(self, content: str, metadata: dict) -> List[Chunk]:
        import nltk  # requires the "punkt" tokenizer data (nltk.download("punkt"))
        sentences = nltk.sent_tokenize(content)
        if not sentences:
            return []

        # Get embeddings for each sentence
        response = await self.embed(model=self.model, input=sentences)
        embeddings = [d["embedding"] for d in response.data]

        # Group consecutive sentences while they stay semantically similar
        chunks = []
        current_chunk = [sentences[0]]
        current_embedding = embeddings[0]

        for i in range(1, len(sentences)):
            similarity = self._cosine_similarity(current_embedding, embeddings[i])
            if similarity > self.similarity_threshold:
                current_chunk.append(sentences[i])
            else:
                chunks.append(Chunk(
                    content=" ".join(current_chunk),
                    metadata=metadata
                ))
                current_chunk = [sentences[i]]
                current_embedding = embeddings[i]

        if current_chunk:
            chunks.append(Chunk(content=" ".join(current_chunk), metadata=metadata))

        return chunks

    def _cosine_similarity(self, a, b):
        import numpy as np
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
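
Later services call a get_chunker(content_type) factory that this spike does not define; a minimal sketch, assuming the content-type names used throughout this document:

# Minimal factory sketch for get_chunker(); the mapping below is an assumption.
# SemanticChunker.chunk is async and would be wired in separately where needed.
_CHUNKERS: dict[str, type[BaseChunker]] = {
    "code": CodeChunker,
    "markdown": MarkdownChunker,
    "documentation": MarkdownChunker,
}

def get_chunker(content_type: str) -> BaseChunker:
    # Fall back to heading-based chunking for unknown content types
    chunker_cls = _CHUNKERS.get(content_type, MarkdownChunker)
    return chunker_cls()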

4. Hybrid Search (Semantic + Keyword)

Hybrid search combines precise keyword matching (BM25 in dedicated engines, PostgreSQL full-text ranking here) with the recall of semantic vector similarity. Based on ParadeDB's research:

Approach: Reciprocal Rank Fusion (RRF)

-- Hybrid search with RRF scoring
WITH semantic_results AS (
    SELECT id, content,
           1 - (embedding <=> $1::vector) as semantic_score,
           ROW_NUMBER() OVER (ORDER BY embedding <=> $1::vector) as semantic_rank
    FROM knowledge_chunks
    WHERE project_id = $2
    ORDER BY embedding <=> $1::vector
    LIMIT 20
),
keyword_results AS (
    SELECT id, content,
           ts_rank(search_vector, plainto_tsquery('english', $3)) as keyword_score,
           ROW_NUMBER() OVER (ORDER BY ts_rank(search_vector, plainto_tsquery('english', $3)) DESC) as keyword_rank
    FROM knowledge_chunks
    WHERE project_id = $2
      AND search_vector @@ plainto_tsquery('english', $3)
    ORDER BY keyword_score DESC
    LIMIT 20
)
SELECT
    COALESCE(s.id, k.id) as id,
    COALESCE(s.content, k.content) as content,
    -- RRF formula: 1/(k + rank) where k=60 is standard
    (1.0 / (60 + COALESCE(s.semantic_rank, 1000))) +
    (1.0 / (60 + COALESCE(k.keyword_rank, 1000))) as rrf_score
FROM semantic_results s
FULL OUTER JOIN keyword_results k ON s.id = k.id
ORDER BY rrf_score DESC
LIMIT 10;
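
The same RRF formula can also be applied application-side when the semantic and keyword result lists come from separate queries; a minimal Python sketch:

# Reciprocal Rank Fusion in Python, mirroring the SQL above (k=60 is the usual constant).
def rrf_fuse(semantic_ids: list[str], keyword_ids: list[str], k: int = 60) -> list[str]:
    scores: dict[str, float] = {}
    for ranked_ids in (semantic_ids, keyword_ids):
        for rank, doc_id in enumerate(ranked_ids, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)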

Implementation with SQLAlchemy:

# app/services/knowledge/search.py
from sqlalchemy import text, func
from sqlalchemy.ext.asyncio import AsyncSession
from pgvector.sqlalchemy import Vector

class HybridSearchService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def search(
        self,
        query: str,
        query_embedding: list[float],
        project_id: str,
        agent_id: str | None = None,
        limit: int = 10,
        semantic_weight: float = 0.5,
    ) -> list[dict]:
        """
        Hybrid search combining semantic and keyword matching.

        Args:
            query: Natural language query
            query_embedding: Pre-computed query embedding
            project_id: Project scope
            agent_id: Optional agent-specific scope
            limit: Max results
            semantic_weight: 0-1, weight for semantic vs keyword
        """
        keyword_weight = 1 - semantic_weight

        sql = text("""
        WITH semantic AS (
            SELECT id, content, metadata,
                   1 - (embedding <=> :embedding::vector) as score,
                   ROW_NUMBER() OVER (ORDER BY embedding <=> :embedding::vector) as rank
            FROM knowledge_chunks
            WHERE project_id = :project_id
              AND (:agent_id IS NULL OR agent_id = :agent_id OR agent_id IS NULL)
            ORDER BY embedding <=> :embedding::vector
            LIMIT 30
        ),
        keyword AS (
            SELECT id, content, metadata,
                   ts_rank_cd(search_vector, websearch_to_tsquery('english', :query)) as score,
                   ROW_NUMBER() OVER (
                       ORDER BY ts_rank_cd(search_vector, websearch_to_tsquery('english', :query)) DESC
                   ) as rank
            FROM knowledge_chunks
            WHERE project_id = :project_id
              AND (:agent_id IS NULL OR agent_id = :agent_id OR agent_id IS NULL)
              AND search_vector @@ websearch_to_tsquery('english', :query)
            ORDER BY score DESC
            LIMIT 30
        )
        SELECT
            COALESCE(s.id, k.id) as id,
            COALESCE(s.content, k.content) as content,
            COALESCE(s.metadata, k.metadata) as metadata,
            (
                :semantic_weight * (1.0 / (60 + COALESCE(s.rank, 1000))) +
                :keyword_weight * (1.0 / (60 + COALESCE(k.rank, 1000)))
            ) as combined_score,
            s.score as semantic_score,
            k.score as keyword_score
        FROM semantic s
        FULL OUTER JOIN keyword k ON s.id = k.id
        ORDER BY combined_score DESC
        LIMIT :limit
        """)

        result = await self.db.execute(sql, {
            "embedding": query_embedding,
            "query": query,
            "project_id": project_id,
            "agent_id": agent_id,
            "semantic_weight": semantic_weight,
            "keyword_weight": keyword_weight,
            "limit": limit,
        })

        return [dict(row._mapping) for row in result.fetchall()]

5. Multi-Tenant Vector Collections

Based on Timescale's research on multi-tenant RAG:

Recommended Pattern: Shared Table with Tenant ID

For Syndarix, use a shared table with project_id and agent_id columns:

# app/models/knowledge.py
import uuid

from sqlalchemy import Column, DateTime, ForeignKey, Index, String, Text, func
from sqlalchemy.dialects.postgresql import JSONB, TSVECTOR, UUID
from pgvector.sqlalchemy import Vector
from app.db.base import Base

class KnowledgeChunk(Base):
    __tablename__ = "knowledge_chunks"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)

    # Multi-tenant isolation
    project_id = Column(UUID, ForeignKey("projects.id"), nullable=False, index=True)
    agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=True, index=True)

    # Content
    content = Column(Text, nullable=False)
    content_type = Column(String(50), nullable=False)  # code, markdown, pdf, etc.

    # Source tracking
    source_uri = Column(String(512))  # file path, URL, etc.
    source_type = Column(String(50))  # file, url, conversation, etc.

    # Vector embedding
    embedding = Column(Vector(1536))  # Dimension depends on model
    embedding_model = Column(String(100))

    # Full-text search
    search_vector = Column(TSVECTOR)

    # Arbitrary metadata; "metadata" is reserved on SQLAlchemy declarative classes,
    # so map the attribute as metadata_ while keeping the column name "metadata"
    metadata_ = Column("metadata", JSONB, default=dict)

    # Timestamps
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, onupdate=func.now())

    __table_args__ = (
        # HNSW index for vector similarity (per-project partitioning)
        Index(
            'ix_knowledge_chunks_embedding_hnsw',
            'embedding',
            postgresql_using='hnsw',
            postgresql_with={'m': 16, 'ef_construction': 64},
            postgresql_ops={'embedding': 'vector_cosine_ops'}
        ),
        # GIN index for full-text search
        Index(
            'ix_knowledge_chunks_search_vector',
            'search_vector',
            postgresql_using='gin'
        ),
        # Composite index for tenant isolation
        Index('ix_knowledge_chunks_project_agent', 'project_id', 'agent_id'),
    )

class KnowledgeCollection(Base):
    """Groups of chunks for organizing knowledge."""
    __tablename__ = "knowledge_collections"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    project_id = Column(UUID, ForeignKey("projects.id"), nullable=False)
    name = Column(String(100), nullable=False)
    description = Column(Text)
    collection_type = Column(String(50))  # codebase, documentation, specs, etc.

    # Configuration
    chunking_strategy = Column(String(50), default="auto")
    embedding_model = Column(String(100), default="text-embedding-3-small")

    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, onupdate=func.now())

class ChunkCollectionAssociation(Base):
    """Many-to-many: chunks can belong to multiple collections."""
    __tablename__ = "chunk_collection_associations"

    chunk_id = Column(UUID, ForeignKey("knowledge_chunks.id"), primary_key=True)
    collection_id = Column(UUID, ForeignKey("knowledge_collections.id"), primary_key=True)

Row-Level Security (Optional but Recommended):

-- Enable RLS on knowledge_chunks
ALTER TABLE knowledge_chunks ENABLE ROW LEVEL SECURITY;

-- Policy: Users can only access chunks from their projects
CREATE POLICY knowledge_chunk_project_isolation ON knowledge_chunks
    USING (project_id IN (
        SELECT project_id FROM project_members
        WHERE user_id = current_setting('app.current_user_id')::uuid
    ));
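
For this policy to take effect, the application has to set app.current_user_id on each transaction (and query through a role that is subject to RLS); a minimal sketch with SQLAlchemy:

# Sketch: set the RLS context variable for the current transaction.
# set_config(..., true) is transaction-scoped, equivalent to SET LOCAL.
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

async def set_rls_user(db: AsyncSession, user_id: str) -> None:
    await db.execute(
        text("SELECT set_config('app.current_user_id', :user_id, true)"),
        {"user_id": user_id},
    )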

6. Indexing Strategies for Large Codebases

HNSW vs IVFFlat Selection:

| Factor | HNSW | IVFFlat |
| --- | --- | --- |
| Query speed | Faster | Slower |
| Build time | Slower | Faster |
| Memory | Higher | Lower |
| Accuracy | Higher | Lower |
| Use when | <10M vectors, high recall needed | >10M vectors, memory constrained |

HNSW Parameter Guidelines:

-- Small collections (<100K vectors)
CREATE INDEX ON knowledge_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- Medium collections (100K-1M vectors)
CREATE INDEX ON knowledge_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 24, ef_construction = 100);

-- Large collections (1M-10M vectors)
CREATE INDEX ON knowledge_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 32, ef_construction = 128);

-- Query-time tuning
SET hnsw.ef_search = 100;  -- Higher = better recall, slower

Partial Indexes for Multi-Tenant:

-- Create partial indexes per high-traffic project
CREATE INDEX ON knowledge_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64)
WHERE project_id = 'frequently-queried-project-id';

Build Performance:

-- Speed up index builds
SET maintenance_work_mem = '2GB';  -- Ensure graph fits in memory
SET max_parallel_maintenance_workers = 7;  -- Parallel building
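
When recall needs tuning per request rather than globally, hnsw.ef_search can also be raised for a single transaction from the application; an illustrative sketch:

# Sketch: transaction-scoped recall tuning (set_config(..., true) behaves like SET LOCAL).
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

async def set_ef_search(db: AsyncSession, value: int = 200) -> None:
    await db.execute(
        text("SELECT set_config('hnsw.ef_search', :value, true)"),
        {"value": str(value)},
    )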

7. Real-Time vs Batch Embedding Updates

Recommendation: Hybrid Approach

| Scenario | Strategy | Why |
| --- | --- | --- |
| New file added | Real-time | Immediate availability |
| Bulk import | Batch (Celery) | Avoid blocking |
| File modified | Debounced real-time | Avoid churning |
| Conversation | Real-time | Context needed now |
| Codebase sync | Scheduled batch | Efficient |

Implementation:

# app/services/knowledge/ingestion.py
import logging
import uuid

from sqlalchemy.ext.asyncio import AsyncSession

from app.core.celery import celery_app
from app.models.knowledge import KnowledgeChunk
from app.services.knowledge.embedder import EmbeddingService
from app.services.knowledge.chunkers import get_chunker

logger = logging.getLogger(__name__)

class KnowledgeIngestionService:
    def __init__(self, db: AsyncSession):
        self.db = db
        self.embedder = EmbeddingService()

    async def ingest_realtime(
        self,
        project_id: str,
        content: str,
        content_type: str,
        source_uri: str,
        agent_id: str | None = None,
    ) -> list[str]:
        """Real-time ingestion for immediate availability."""
        chunker = get_chunker(content_type)
        chunks = chunker.chunk(content, {"source_uri": source_uri})

        # Embed and store
        chunk_ids = []
        for chunk in chunks:
            embedding = await self.embedder.embed(chunk.content, content_type)

            db_chunk = KnowledgeChunk(
                id=uuid.uuid4(),  # assign eagerly so the ID is known before flush
                project_id=project_id,
                agent_id=agent_id,
                content=chunk.content,
                content_type=content_type,
                source_uri=source_uri,
                embedding=embedding,
                embedding_model=self.embedder.get_model(content_type),
                metadata_=chunk.metadata,
            )
            self.db.add(db_chunk)
            chunk_ids.append(str(db_chunk.id))

        await self.db.commit()
        return chunk_ids

    def schedule_batch_ingestion(
        self,
        project_id: str,
        files: list[dict],  # [{path, content_type}]
    ) -> str:
        """Schedule batch ingestion via Celery."""
        task = batch_ingest_files.delay(project_id, files)
        return task.id

@celery_app.task(bind=True, max_retries=3)
def batch_ingest_files(self, project_id: str, files: list[dict]):
    """Celery task for batch file ingestion."""
    from app.core.database import get_sync_session

    with get_sync_session() as db:
        ingestion = KnowledgeIngestionService(db)

        for file in files:
            try:
                # Read file content
                with open(file["path"], "r") as f:
                    content = f.read()

                # Process (sync version)
                ingestion.ingest_sync(
                    project_id=project_id,
                    content=content,
                    content_type=file["content_type"],
                    source_uri=file["path"],
                )
            except Exception as e:
                # Log and continue, don't fail entire batch
                logger.error(f"Failed to ingest {file['path']}: {e}")

        db.commit()

Debounced Updates:

# app/services/knowledge/watcher.py
import asyncio
from collections import defaultdict

class KnowledgeUpdateDebouncer:
    """Debounce rapid file changes to avoid excessive re-embedding."""

    def __init__(self, delay_seconds: float = 2.0):
        self.delay = delay_seconds
        self.pending: dict[str, asyncio.Task] = {}

    async def schedule_update(
        self,
        file_path: str,
        update_callback: callable,
    ):
        """Schedule an update, canceling any pending update for the same file."""
        # Cancel existing pending update
        if file_path in self.pending:
            self.pending[file_path].cancel()

        # Schedule new update
        self.pending[file_path] = asyncio.create_task(
            self._delayed_update(file_path, update_callback)
        )

    async def _delayed_update(self, file_path: str, callback: callable):
        await asyncio.sleep(self.delay)
        await callback(file_path)
        del self.pending[file_path]
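
A usage sketch wiring the debouncer to re-ingestion; the event source and the hard-coded content type are assumptions:

# Sketch: re-ingest a file a couple of seconds after its last observed change.
debouncer = KnowledgeUpdateDebouncer(delay_seconds=2.0)

async def on_file_changed(
    project_id: str,
    ingestion: KnowledgeIngestionService,
    path: str,
):
    async def reingest(file_path: str):
        with open(file_path, "r") as f:
            content = f.read()
        await ingestion.ingest_realtime(
            project_id=project_id,
            content=content,
            content_type="code",  # assumed; derive from the file in practice
            source_uri=file_path,
        )

    await debouncer.schedule_update(path, reingest)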

Schema Design

Database Schema

-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm;  -- For fuzzy text matching

-- Knowledge chunks table
CREATE TABLE knowledge_chunks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Multi-tenant isolation
    project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
    agent_id UUID REFERENCES agent_instances(id) ON DELETE SET NULL,
    collection_id UUID REFERENCES knowledge_collections(id) ON DELETE SET NULL,

    -- Content
    content TEXT NOT NULL,
    content_type VARCHAR(50) NOT NULL,

    -- Source tracking
    source_uri VARCHAR(512),
    source_type VARCHAR(50),
    source_hash VARCHAR(64),  -- For detecting changes

    -- Vector embedding (1536 for text-embedding-3-small)
    embedding vector(1536),
    embedding_model VARCHAR(100),

    -- Full-text search
    search_vector tsvector GENERATED ALWAYS AS (
        setweight(to_tsvector('english', coalesce(content, '')), 'A')
    ) STORED,

    -- Metadata
    metadata JSONB DEFAULT '{}',

    -- Timestamps
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Indexes
CREATE INDEX ix_knowledge_chunks_project ON knowledge_chunks(project_id);
CREATE INDEX ix_knowledge_chunks_agent ON knowledge_chunks(agent_id);
CREATE INDEX ix_knowledge_chunks_collection ON knowledge_chunks(collection_id);
CREATE INDEX ix_knowledge_chunks_source_hash ON knowledge_chunks(source_hash);

-- HNSW vector index
CREATE INDEX ix_knowledge_chunks_embedding ON knowledge_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- GIN index for full-text search
CREATE INDEX ix_knowledge_chunks_fts ON knowledge_chunks USING gin(search_vector);

-- Knowledge collections
CREATE TABLE knowledge_collections (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    collection_type VARCHAR(50),
    chunking_strategy VARCHAR(50) DEFAULT 'auto',
    embedding_model VARCHAR(100) DEFAULT 'text-embedding-3-small',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),

    UNIQUE(project_id, name)
);

-- Trigger for updated_at
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER knowledge_chunks_updated_at
    BEFORE UPDATE ON knowledge_chunks
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at();

Alembic Migration

# alembic/versions/xxxx_add_knowledge_base.py
"""Add knowledge base tables for RAG

Revision ID: xxxx
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql  # makes sa.dialects.postgresql types importable
from pgvector.sqlalchemy import Vector

def upgrade():
    # Enable extensions
    op.execute("CREATE EXTENSION IF NOT EXISTS vector")
    op.execute("CREATE EXTENSION IF NOT EXISTS pg_trgm")

    # Create knowledge_collections table
    op.create_table(
        'knowledge_collections',
        sa.Column('id', sa.dialects.postgresql.UUID(), primary_key=True),
        sa.Column('project_id', sa.dialects.postgresql.UUID(), sa.ForeignKey('projects.id'), nullable=False),
        sa.Column('name', sa.String(100), nullable=False),
        sa.Column('description', sa.Text()),
        sa.Column('collection_type', sa.String(50)),
        sa.Column('chunking_strategy', sa.String(50), default='auto'),
        sa.Column('embedding_model', sa.String(100), default='text-embedding-3-small'),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime(timezone=True)),
        sa.UniqueConstraint('project_id', 'name', name='uq_knowledge_collections_project_name'),
    )

    # Create knowledge_chunks table
    op.create_table(
        'knowledge_chunks',
        sa.Column('id', sa.dialects.postgresql.UUID(), primary_key=True),
        sa.Column('project_id', sa.dialects.postgresql.UUID(), sa.ForeignKey('projects.id', ondelete='CASCADE'), nullable=False),
        sa.Column('agent_id', sa.dialects.postgresql.UUID(), sa.ForeignKey('agent_instances.id', ondelete='SET NULL')),
        sa.Column('collection_id', sa.dialects.postgresql.UUID(), sa.ForeignKey('knowledge_collections.id', ondelete='SET NULL')),
        sa.Column('content', sa.Text(), nullable=False),
        sa.Column('content_type', sa.String(50), nullable=False),
        sa.Column('source_uri', sa.String(512)),
        sa.Column('source_type', sa.String(50)),
        sa.Column('source_hash', sa.String(64)),
        sa.Column('embedding', Vector(1536)),
        sa.Column('embedding_model', sa.String(100)),
        sa.Column('metadata', sa.dialects.postgresql.JSONB(), default={}),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime(timezone=True)),
    )

    # Create indexes
    op.create_index('ix_knowledge_chunks_project', 'knowledge_chunks', ['project_id'])
    op.create_index('ix_knowledge_chunks_agent', 'knowledge_chunks', ['agent_id'])
    op.create_index('ix_knowledge_chunks_collection', 'knowledge_chunks', ['collection_id'])
    op.create_index('ix_knowledge_chunks_source_hash', 'knowledge_chunks', ['source_hash'])

    # Create HNSW vector index
    op.execute("""
        CREATE INDEX ix_knowledge_chunks_embedding ON knowledge_chunks
        USING hnsw (embedding vector_cosine_ops)
        WITH (m = 16, ef_construction = 64)
    """)

    # Add full-text search column and index
    op.execute("""
        ALTER TABLE knowledge_chunks
        ADD COLUMN search_vector tsvector
        GENERATED ALWAYS AS (
            setweight(to_tsvector('english', coalesce(content, '')), 'A')
        ) STORED
    """)
    op.execute("CREATE INDEX ix_knowledge_chunks_fts ON knowledge_chunks USING gin(search_vector)")

def downgrade():
    op.drop_table('knowledge_chunks')
    op.drop_table('knowledge_collections')

Complete Service Implementation

# app/services/knowledge/service.py
import uuid

from dataclasses import dataclass
from typing import Optional

from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from litellm import aembedding as litellm_embedding

from app.models.knowledge import KnowledgeChunk, KnowledgeCollection
from app.services.knowledge.chunkers import get_chunker
from app.services.knowledge.search import HybridSearchService

@dataclass
class SearchResult:
    id: str
    content: str
    metadata: dict
    score: float
    source_uri: Optional[str] = None

class KnowledgeBaseService:
    """
    Main service for knowledge base operations.
    Integrates with LiteLLM for embeddings.
    """

    # Model selection by content type
    EMBEDDING_MODELS = {
        "code": "voyage/voyage-code-3",
        "markdown": "text-embedding-3-small",
        "pdf": "text-embedding-3-small",
        "conversation": "text-embedding-3-small",
        "default": "text-embedding-3-small",
    }

    def __init__(self, db: AsyncSession):
        self.db = db
        self.search_service = HybridSearchService(db)

    async def create_collection(
        self,
        project_id: str,
        name: str,
        collection_type: str,
        description: str = "",
        chunking_strategy: str = "auto",
    ) -> KnowledgeCollection:
        """Create a new knowledge collection for a project."""
        collection = KnowledgeCollection(
            project_id=project_id,
            name=name,
            description=description,
            collection_type=collection_type,
            chunking_strategy=chunking_strategy,
            embedding_model=self.EMBEDDING_MODELS.get(collection_type, "text-embedding-3-small"),
        )
        self.db.add(collection)
        await self.db.commit()
        await self.db.refresh(collection)
        return collection

    async def ingest(
        self,
        project_id: str,
        content: str,
        content_type: str,
        source_uri: str,
        agent_id: Optional[str] = None,
        collection_id: Optional[str] = None,
        metadata: Optional[dict] = None,
    ) -> list[str]:
        """
        Ingest content into the knowledge base.
        Automatically chunks and embeds the content.
        """
        import hashlib

        # Check for existing content by hash
        source_hash = hashlib.sha256(content.encode()).hexdigest()
        existing = await self.db.execute(
            select(KnowledgeChunk).where(
                KnowledgeChunk.project_id == project_id,
                KnowledgeChunk.source_hash == source_hash,
            )
        )
        if existing.scalar_one_or_none():
            # Content unchanged, skip
            return []

        # Get appropriate chunker
        chunker = get_chunker(content_type)
        chunks = chunker.chunk(content, metadata or {})

        # Get embedding model
        model = self.EMBEDDING_MODELS.get(content_type, "text-embedding-3-small")

        # Embed all chunks in batch
        chunk_texts = [c.content for c in chunks]
        embeddings = await self._embed_batch(chunk_texts, model)

        # Store chunks
        chunk_ids = []
        for chunk, emb in zip(chunks, embeddings):
            db_chunk = KnowledgeChunk(
                id=uuid.uuid4(),  # assign eagerly so the ID is known before flush
                project_id=project_id,
                agent_id=agent_id,
                collection_id=collection_id,
                content=chunk.content,
                content_type=content_type,
                source_uri=source_uri,
                source_type=self._infer_source_type(source_uri),
                source_hash=source_hash,
                embedding=emb,
                embedding_model=model,
                metadata_={
                    **chunk.metadata,
                    **(metadata or {}),
                },
            )
            self.db.add(db_chunk)
            chunk_ids.append(str(db_chunk.id))

        await self.db.commit()
        return chunk_ids

    async def search(
        self,
        project_id: str,
        query: str,
        agent_id: Optional[str] = None,
        collection_id: Optional[str] = None,
        limit: int = 10,
        content_types: Optional[list[str]] = None,
        semantic_weight: float = 0.6,
    ) -> list[SearchResult]:
        """
        Search the knowledge base using hybrid search.

        Args:
            project_id: Project scope
            query: Natural language query
            agent_id: Optional agent-specific scope
            collection_id: Optional collection scope
            limit: Max results
            content_types: Filter by content types
            semantic_weight: 0-1, weight for semantic vs keyword
        """
        # Get query embedding
        query_embedding = await self._embed_query(query)

        # Perform hybrid search
        results = await self.search_service.search(
            query=query,
            query_embedding=query_embedding,
            project_id=project_id,
            agent_id=agent_id,
            limit=limit,
            semantic_weight=semantic_weight,
        )

        return [
            SearchResult(
                id=r["id"],
                content=r["content"],
                metadata=r.get("metadata", {}),
                score=r["combined_score"],
                source_uri=r.get("source_uri"),
            )
            for r in results
        ]

    async def delete_by_source(
        self,
        project_id: str,
        source_uri: str,
    ) -> int:
        """Delete all chunks from a specific source."""
        result = await self.db.execute(
            delete(KnowledgeChunk).where(
                KnowledgeChunk.project_id == project_id,
                KnowledgeChunk.source_uri == source_uri,
            )
        )
        await self.db.commit()
        return result.rowcount

    async def _embed_batch(
        self,
        texts: list[str],
        model: str,
    ) -> list[list[float]]:
        """Embed multiple texts in a single API call."""
        response = await litellm_embedding(
            model=model,
            input=texts,
        )
        return [d["embedding"] for d in response.data]

    async def _embed_query(self, query: str) -> list[float]:
        """Embed a query string."""
        response = await litellm_embedding(
            model="text-embedding-3-small",
            input=[query],
        )
        return response.data[0]["embedding"]

    def _infer_source_type(self, source_uri: str) -> str:
        """Infer source type from URI."""
        if source_uri.startswith("http"):
            return "url"
        if source_uri.startswith("conversation:"):
            return "conversation"
        return "file"

Performance Considerations

Query Latency Targets

| Vector Count | Target Latency | Recommended Config |
| --- | --- | --- |
| <100K | <20ms | Default HNSW |
| 100K-1M | <50ms | m=24, ef_construction=100 |
| 1M-10M | <100ms | m=32, ef_construction=128, ef_search=100 |

Memory Requirements

HNSW memory ≈ vectors × dimensions × 4 bytes × (1 + m/8)

Example: 1M vectors × 1536 dims × 4 bytes × (1 + 16/8) ≈ 18.4 GB
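
A small helper for plugging other sizes into the same approximation:

# Rough HNSW memory estimate (decimal GB), following the approximation above.
def estimate_hnsw_memory_gb(num_vectors: int, dimensions: int, m: int = 16) -> float:
    return num_vectors * dimensions * 4 * (1 + m / 8) / 1e9

# estimate_hnsw_memory_gb(1_000_000, 1536)  -> ~18.4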

Batch Embedding Costs

| Model | 1K chunks | 10K chunks | 100K chunks |
| --- | --- | --- | --- |
| text-embedding-3-small | $0.002 | $0.02 | $0.20 |
| voyage-code-3 | $0.006 | $0.06 | $0.60 |
| Local (nomic-embed) | $0 | $0 | $0 |

Optimization Tips

  1. Use batch embedding - Single API call for multiple chunks
  2. Cache query embeddings - Same queries return same vectors (see the sketch after this list)
  3. Partial indexes - Create per-project indexes for high-traffic projects
  4. Dimension reduction - Use 512-dim with text-embedding-3-small for cost savings
  5. Connection pooling - Use pgBouncer for high-concurrency scenarios
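
A minimal in-process sketch for tip 2; a production deployment would more likely cache in Redis, and the helper below is illustrative rather than existing code:

# Sketch: cache query embeddings in-process so repeated queries skip the API call.
import hashlib

from litellm import aembedding

_query_embedding_cache: dict[str, list[float]] = {}

async def cached_query_embedding(query: str, model: str = "text-embedding-3-small") -> list[float]:
    key = hashlib.sha256(f"{model}:{query}".encode()).hexdigest()
    if key not in _query_embedding_cache:
        response = await aembedding(model=model, input=[query])
        _query_embedding_cache[key] = response.data[0]["embedding"]
    return _query_embedding_cache[key]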

Integration with Syndarix Agents

Agent Context Retrieval

# app/services/agent/context.py
class AgentContextBuilder:
    """Builds context for agent prompts using RAG."""

    def __init__(self, kb_service: KnowledgeBaseService):
        self.kb = kb_service

    async def build_context(
        self,
        agent_id: str,
        project_id: str,
        task_description: str,
        max_context_tokens: int = 4000,
    ) -> str:
        """
        Build relevant context for an agent task.

        Returns formatted context string for inclusion in prompt.
        """
        # Search for relevant knowledge
        results = await self.kb.search(
            project_id=project_id,
            query=task_description,
            agent_id=agent_id,  # Prefer agent-specific knowledge
            limit=10,
            semantic_weight=0.7,
        )

        # Format context
        context_parts = []
        current_tokens = 0

        for result in results:
            chunk_tokens = self._count_tokens(result.content)
            if current_tokens + chunk_tokens > max_context_tokens:
                break

            context_parts.append(f"""
### Source: {result.source_uri or 'Unknown'}
{result.content}
""")
            current_tokens += chunk_tokens

        if not context_parts:
            return ""

        return f"""
## Relevant Context

The following information was retrieved from the project knowledge base:

{"".join(context_parts)}

---
"""

    def _count_tokens(self, text: str) -> int:
        """Approximate token count."""
        return len(text) // 4  # Rough estimate
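
    def _count_tokens_exact(self, text: str) -> int:
        # Optional sketch: a closer count via tiktoken; cl100k_base is an assumed
        # approximation of the tokenizer behind the OpenAI embedding models.
        import tiktoken

        encoding = tiktoken.get_encoding("cl100k_base")
        return len(encoding.encode(text))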

MCP Tool for Knowledge Access

# app/mcp/tools/knowledge.py
from mcp import Tool, ToolResult

class KnowledgeSearchTool(Tool):
    """MCP tool for agents to search project knowledge."""

    name = "search_knowledge"
    description = "Search the project knowledge base for relevant information"

    parameters = {
        "type": "object",
        "properties": {
            "project_id": {
                "type": "string",
                "description": "The project ID to search within"
            },
            "query": {
                "type": "string",
                "description": "Natural language search query"
            },
            "content_types": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Filter by content types (code, markdown, pdf)"
            },
            "limit": {
                "type": "integer",
                "default": 5,
                "description": "Maximum results to return"
            }
        },
        "required": ["project_id", "query"]
    }

    async def execute(self, **params) -> ToolResult:
        results = await self.kb_service.search(
            project_id=params["project_id"],
            query=params["query"],
            content_types=params.get("content_types"),
            limit=params.get("limit", 5),
        )

        return ToolResult(
            content=[
                {
                    "source": r.source_uri,
                    "content": r.content[:500] + "..." if len(r.content) > 500 else r.content,
                    "relevance_score": r.score,
                }
                for r in results
            ]
        )

References

Vector Databases

Embedding Models

Chunking & RAG

Multi-Tenant RAG

pgvector


Decision

Adopt pgvector with hybrid search as the knowledge base solution for Syndarix RAG:

  1. pgvector for vector storage and similarity search
  2. PostgreSQL full-text search (tsvector) for keyword matching
  3. Reciprocal Rank Fusion (RRF) for combining results
  4. LiteLLM for unified embedding API
  5. Content-type-aware chunking with AST parsing for code
  6. Shared table with tenant isolation via project_id/agent_id

Migration Path: If any project exceeds 10M vectors or requires sub-10ms latency, evaluate Qdrant as a dedicated vector store while keeping metadata in PostgreSQL.


Spike completed. Findings will inform ADR-006: Knowledge Base Architecture.