# SPIKE-006: Knowledge Base with pgvector for RAG System

**Status:** Completed
**Date:** 2025-12-29
**Author:** Architecture Team
**Related Issue:** #6

---

## Executive Summary

This spike researches the optimal approach for implementing a knowledge base system to enable RAG (Retrieval-Augmented Generation) for Syndarix AI agents. After evaluating options, we recommend **pgvector with hybrid search** as the primary solution.

### Key Recommendation

**Use pgvector** for the following reasons:

- Already using PostgreSQL in the stack (operational simplicity)
- Handles 10-100M vectors effectively (sufficient for project-scoped knowledge)
- Transactional consistency with application data
- Native hybrid search with PostgreSQL full-text search
- Row-level security for multi-tenant isolation
- Integrates seamlessly with existing migrations and tooling

For projects that scale beyond 100M vectors per tenant, consider migrating to Qdrant (open-source, high-performance) or Pinecone (fully managed).

---

## Research Questions & Findings

### 1. pgvector vs Dedicated Vector Databases

| Feature | pgvector | Pinecone | Qdrant | Weaviate |
|---------|----------|----------|--------|----------|
| **Max Scale** | 10-100M vectors | Billions | Billions | Billions |
| **Self-Hosted** | Yes | No | Yes | Yes |
| **Managed Option** | RDS, Neon, Supabase | Yes (only) | Yes | Yes |
| **Query Latency** | Good (<100ms) | Excellent | Excellent | Good |
| **Hybrid Search** | Native + pg_search | Sparse vectors | Native | Native |
| **Cost (1M vectors)** | ~$0 (existing DB) | $20-30/mo | ~$27/mo | Variable |
| **Operational Overhead** | Zero (existing) | None | Medium | Medium |
| **ACID Transactions** | Yes | No | No | No |

**Why pgvector for Syndarix:**

- Per-project knowledge isolation means smaller vector sets (thousands to millions, not billions)
- Transactional ingest: embed and store chunks in the same transaction as application data
- Single database backup/restore story
- Migration path exists if scale requires a dedicated solution

**When to Consider Alternatives:**

- Pinecone: zero-ops requirement, budget available, billions of vectors
- Qdrant: advanced filtering, high QPS, open-source preference
- Weaviate: multi-modal (images, video), knowledge graph features

### 2. Embedding Model Recommendations

Based on [research from 2024-2025](https://elephas.app/blog/best-embedding-models) and [Modal's code embedding comparison](https://modal.com/blog/6-best-code-embedding-models-compared):

| Model | Best For | Dimensions | Cost/1M tokens | Notes |
|-------|----------|------------|----------------|-------|
| **text-embedding-3-small** | General text, docs | 512-1536 | $0.02 | Good balance |
| **text-embedding-3-large** | High accuracy needs | 256-3072 | $0.13 | Supports dimension reduction |
| **voyage-code-3** | Code retrieval | 1024 | $0.06 | State of the art for code |
| **voyage-3-large** | General + code | 1024 | $0.12 | Top of leaderboards |
| **nomic-embed-text** | Open-source, local | 768 | Free | Ollama compatible |

**Recommendation for Syndarix:**

```python
# Content-type based model selection
EMBEDDING_MODELS = {
    "code": "voyage/voyage-code-3",              # Code files (.py, .js, etc.)
"documentation": "text-embedding-3-small", # Markdown, docs "general": "text-embedding-3-small", # Default "high_accuracy": "voyage/voyage-3-large", # Critical queries "local": "ollama/nomic-embed-text", # Fallback / dev } ``` **LiteLLM Integration:** ```python from litellm import embedding # Via LiteLLM (unified interface) response = await embedding( model="voyage/voyage-code-3", input=["def hello(): return 'world'"], ) vector = response.data[0].embedding ``` ### 3. Chunking Strategies Based on [Weaviate's research](https://weaviate.io/blog/chunking-strategies-for-rag) and [Stack Overflow's analysis](https://stackoverflow.blog/2024/12/27/breaking-up-is-hard-to-do-chunking-in-rag-applications/): **Strategy by Content Type:** | Content Type | Strategy | Chunk Size | Overlap | Notes | |--------------|----------|------------|---------|-------| | **Code Files** | AST-based / Function | Per function/class | None | Preserve semantic units | | **Markdown Docs** | Heading-based | Per section | 10% | Respect document structure | | **PDF Specs** | Page-level + semantic | 1000 tokens | 15% | NVIDIA recommends page-level | | **Conversations** | Turn-based | Per exchange | Context window | Preserve dialogue flow | | **API Docs** | Endpoint-based | Per endpoint | None | Group by resource | **Implementation:** ```python # app/services/knowledge/chunkers.py from abc import ABC, abstractmethod from dataclasses import dataclass from typing import List import tree_sitter_python as tspython from tree_sitter import Parser @dataclass class Chunk: content: str metadata: dict start_line: int | None = None end_line: int | None = None class BaseChunker(ABC): @abstractmethod def chunk(self, content: str, metadata: dict) -> List[Chunk]: pass class CodeChunker(BaseChunker): """AST-based chunking for source code.""" def __init__(self, language: str = "python"): self.parser = Parser() if language == "python": self.parser.set_language(tspython.language()) def chunk(self, content: str, metadata: dict) -> List[Chunk]: tree = self.parser.parse(bytes(content, "utf8")) chunks = [] for node in tree.root_node.children: if node.type in ("function_definition", "class_definition"): chunk_content = content[node.start_byte:node.end_byte] chunks.append(Chunk( content=chunk_content, metadata={ **metadata, "type": node.type, "name": self._get_name(node), }, start_line=node.start_point[0], end_line=node.end_point[0], )) # Handle module-level code if not chunks: chunks.append(Chunk(content=content, metadata=metadata)) return chunks def _get_name(self, node) -> str: for child in node.children: if child.type == "identifier": return child.text.decode("utf8") return "unknown" class MarkdownChunker(BaseChunker): """Heading-based chunking for markdown.""" def __init__(self, max_tokens: int = 1000, overlap_ratio: float = 0.1): self.max_tokens = max_tokens self.overlap_ratio = overlap_ratio def chunk(self, content: str, metadata: dict) -> List[Chunk]: import re # Split by headings sections = re.split(r'^(#{1,6}\s+.+)$', content, flags=re.MULTILINE) chunks = [] current_heading = "" for i, section in enumerate(sections): if section.startswith('#'): current_heading = section.strip() elif section.strip(): chunks.append(Chunk( content=f"{current_heading}\n\n{section.strip()}", metadata={ **metadata, "heading": current_heading, "section_index": i, } )) return self._apply_overlap(chunks) def _apply_overlap(self, chunks: List[Chunk]) -> List[Chunk]: # Add overlap between chunks for context for i in range(1, len(chunks)): overlap_size = 

    def _apply_overlap(self, chunks: List[Chunk]) -> List[Chunk]:
        # Prepend a slice of the previous chunk to each chunk for context
        for i in range(1, len(chunks)):
            overlap_size = int(len(chunks[i-1].content) * self.overlap_ratio)
            if overlap_size == 0:
                continue  # avoid content[-0:], which would copy the entire previous chunk
            overlap_text = chunks[i-1].content[-overlap_size:]
            chunks[i].content = f"[Context: ...{overlap_text}]\n\n{chunks[i].content}"
        return chunks


class SemanticChunker(BaseChunker):
    """Semantic chunking based on embedding similarity.

    Note: chunk() is async here (it calls the embedding API), unlike the sync
    BaseChunker interface, so callers must await it.
    """

    def __init__(self, embedding_model: str = "text-embedding-3-small"):
        from litellm import aembedding
        self.embed = aembedding
        self.model = embedding_model
        self.similarity_threshold = 0.7

    async def chunk(self, content: str, metadata: dict) -> List[Chunk]:
        import nltk
        sentences = nltk.sent_tokenize(content)
        if not sentences:
            return []

        # Get embeddings for each sentence
        response = await self.embed(model=self.model, input=sentences)
        embeddings = [d["embedding"] for d in response.data]

        # Group consecutive sentences while they remain semantically similar
        chunks = []
        current_chunk = [sentences[0]]
        current_embedding = embeddings[0]
        for i in range(1, len(sentences)):
            similarity = self._cosine_similarity(current_embedding, embeddings[i])
            if similarity > self.similarity_threshold:
                current_chunk.append(sentences[i])
            else:
                chunks.append(Chunk(content=" ".join(current_chunk), metadata=metadata))
                current_chunk = [sentences[i]]
                current_embedding = embeddings[i]

        if current_chunk:
            chunks.append(Chunk(content=" ".join(current_chunk), metadata=metadata))
        return chunks

    def _cosine_similarity(self, a, b):
        import numpy as np
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
```
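
The ingestion and service code later in this spike calls a `get_chunker(content_type)` factory that is not defined anywhere in the document. A minimal sketch of what it could look like, reusing the chunker classes above; the content-type mapping is illustrative, not prescriptive:

```python
# app/services/knowledge/chunkers.py (continued) -- hypothetical factory
def get_chunker(content_type: str) -> BaseChunker:
    """Pick a chunker based on content type (mapping is illustrative)."""
    if content_type == "code":
        return CodeChunker()
    if content_type in ("markdown", "documentation"):
        return MarkdownChunker()
    # Default: heading/paragraph-oriented chunking is an acceptable fallback for
    # generic text; swap in SemanticChunker where the extra embedding cost is justified.
    return MarkdownChunker()
```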

### 4. Hybrid Search (Semantic + Keyword)

Hybrid search combines lexical keyword matching (BM25-style ranking) with semantic vector similarity. Based on [ParadeDB's research](https://www.paradedb.com/blog/hybrid-search-in-postgresql-the-missing-manual):

**Approach: Reciprocal Rank Fusion (RRF)**

```sql
-- Hybrid search with RRF scoring
WITH semantic_results AS (
    SELECT id, content,
           1 - (embedding <=> $1::vector) AS semantic_score,
           ROW_NUMBER() OVER (ORDER BY embedding <=> $1::vector) AS semantic_rank
    FROM knowledge_chunks
    WHERE project_id = $2
    ORDER BY embedding <=> $1::vector
    LIMIT 20
),
keyword_results AS (
    SELECT id, content,
           ts_rank(search_vector, plainto_tsquery('english', $3)) AS keyword_score,
           ROW_NUMBER() OVER (
               ORDER BY ts_rank(search_vector, plainto_tsquery('english', $3)) DESC
           ) AS keyword_rank
    FROM knowledge_chunks
    WHERE project_id = $2
      AND search_vector @@ plainto_tsquery('english', $3)
    ORDER BY keyword_score DESC
    LIMIT 20
)
SELECT
    COALESCE(s.id, k.id) AS id,
    COALESCE(s.content, k.content) AS content,
    -- RRF formula: 1/(k + rank) where k=60 is standard
    (1.0 / (60 + COALESCE(s.semantic_rank, 1000))) +
    (1.0 / (60 + COALESCE(k.keyword_rank, 1000))) AS rrf_score
FROM semantic_results s
FULL OUTER JOIN keyword_results k ON s.id = k.id
ORDER BY rrf_score DESC
LIMIT 10;
```

**Implementation with SQLAlchemy:**

```python
# app/services/knowledge/search.py
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


class HybridSearchService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def search(
        self,
        query: str,
        query_embedding: list[float],
        project_id: str,
        agent_id: str | None = None,
        limit: int = 10,
        semantic_weight: float = 0.5,
    ) -> list[dict]:
        """
        Hybrid search combining semantic and keyword matching.

        Args:
            query: Natural language query
            query_embedding: Pre-computed query embedding
            project_id: Project scope
            agent_id: Optional agent-specific scope
            limit: Max results
            semantic_weight: 0-1, weight for semantic vs keyword
        """
        keyword_weight = 1 - semantic_weight

        # Note: the embedding parameter must arrive in a pgvector-compatible form
        # (e.g. register pgvector's asyncpg support, or serialize the list to
        # '[0.1,0.2,...]') for the ::vector cast to succeed.
        sql = text("""
            WITH semantic AS (
                SELECT id, content, metadata,
                       1 - (embedding <=> :embedding::vector) as score,
                       ROW_NUMBER() OVER (ORDER BY embedding <=> :embedding::vector) as rank
                FROM knowledge_chunks
                WHERE project_id = :project_id
                  AND (:agent_id IS NULL OR agent_id = :agent_id OR agent_id IS NULL)
                ORDER BY embedding <=> :embedding::vector
                LIMIT 30
            ),
            keyword AS (
                SELECT id, content, metadata,
                       ts_rank_cd(search_vector, websearch_to_tsquery('english', :query)) as score,
                       ROW_NUMBER() OVER (
                           ORDER BY ts_rank_cd(search_vector, websearch_to_tsquery('english', :query)) DESC
                       ) as rank
                FROM knowledge_chunks
                WHERE project_id = :project_id
                  AND (:agent_id IS NULL OR agent_id = :agent_id OR agent_id IS NULL)
                  AND search_vector @@ websearch_to_tsquery('english', :query)
                ORDER BY score DESC
                LIMIT 30
            )
            SELECT
                COALESCE(s.id, k.id) as id,
                COALESCE(s.content, k.content) as content,
                COALESCE(s.metadata, k.metadata) as metadata,
                (
                    :semantic_weight * (1.0 / (60 + COALESCE(s.rank, 1000))) +
                    :keyword_weight * (1.0 / (60 + COALESCE(k.rank, 1000)))
                ) as combined_score,
                s.score as semantic_score,
                k.score as keyword_score
            FROM semantic s
            FULL OUTER JOIN keyword k ON s.id = k.id
            ORDER BY combined_score DESC
            LIMIT :limit
        """)

        result = await self.db.execute(sql, {
            "embedding": query_embedding,
            "query": query,
            "project_id": project_id,
            "agent_id": agent_id,
            "semantic_weight": semantic_weight,
            "keyword_weight": keyword_weight,
            "limit": limit,
        })
        return [dict(row._mapping) for row in result.fetchall()]
```

### 5. Multi-Tenant Vector Collections

Based on [Timescale's research on multi-tenant RAG](https://www.tigerdata.com/blog/building-multi-tenant-rag-applications-with-postgresql-choosing-the-right-approach):

**Recommended Pattern: Shared Table with Tenant ID**

For Syndarix, use a shared table with `project_id` and `agent_id` columns:

```python
# app/models/knowledge.py
import uuid

from sqlalchemy import Column, DateTime, ForeignKey, Index, String, Text, func
from sqlalchemy.dialects.postgresql import JSONB, TSVECTOR, UUID
from pgvector.sqlalchemy import Vector

from app.db.base import Base


class KnowledgeChunk(Base):
    __tablename__ = "knowledge_chunks"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)

    # Multi-tenant isolation
    project_id = Column(UUID, ForeignKey("projects.id"), nullable=False, index=True)
    agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=True, index=True)

    # Content
    content = Column(Text, nullable=False)
    content_type = Column(String(50), nullable=False)  # code, markdown, pdf, etc.

    # Source tracking
    source_uri = Column(String(512))  # file path, URL, etc.
    source_type = Column(String(50))  # file, url, conversation, etc.

    # Vector embedding
    embedding = Column(Vector(1536))  # Dimension depends on model
    embedding_model = Column(String(100))

    # Full-text search
    search_vector = Column(TSVECTOR)

    # Chunk metadata ("metadata" is a reserved attribute on Declarative models,
    # so map the attribute under a different name while keeping the column name)
    metadata_ = Column("metadata", JSONB, default=dict)

    # Timestamps
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, onupdate=func.now())

    __table_args__ = (
        # HNSW index for vector similarity
        Index(
            'ix_knowledge_chunks_embedding_hnsw',
            'embedding',
            postgresql_using='hnsw',
            postgresql_with={'m': 16, 'ef_construction': 64},
            postgresql_ops={'embedding': 'vector_cosine_ops'}
        ),
        # GIN index for full-text search
        Index(
            'ix_knowledge_chunks_search_vector',
            'search_vector',
            postgresql_using='gin'
        ),
        # Composite index for tenant isolation
        Index('ix_knowledge_chunks_project_agent', 'project_id', 'agent_id'),
    )


class KnowledgeCollection(Base):
    """Groups of chunks for organizing knowledge."""

    __tablename__ = "knowledge_collections"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    project_id = Column(UUID, ForeignKey("projects.id"), nullable=False)
    name = Column(String(100), nullable=False)
    description = Column(Text)
    collection_type = Column(String(50))  # codebase, documentation, specs, etc.

    # Configuration
    chunking_strategy = Column(String(50), default="auto")
    embedding_model = Column(String(100), default="text-embedding-3-small")

    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, onupdate=func.now())


class ChunkCollectionAssociation(Base):
    """Many-to-many: chunks can belong to multiple collections."""

    __tablename__ = "chunk_collection_associations"

    chunk_id = Column(UUID, ForeignKey("knowledge_chunks.id"), primary_key=True)
    collection_id = Column(UUID, ForeignKey("knowledge_collections.id"), primary_key=True)
```

**Row-Level Security (Optional but Recommended):**

```sql
-- Enable RLS on knowledge_chunks
ALTER TABLE knowledge_chunks ENABLE ROW LEVEL SECURITY;

-- Policy: users can only access chunks from their projects
CREATE POLICY knowledge_chunk_project_isolation ON knowledge_chunks
    USING (project_id IN (
        SELECT project_id FROM project_members
        WHERE user_id = current_setting('app.current_user_id')::uuid
    ));
```
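
The policy above reads `app.current_user_id` from a session setting, so the application must set that GUC before querying. A minimal sketch of doing this per transaction with SQLAlchemy; the GUC name matches the policy, everything else is illustrative:

```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def set_rls_user(db: AsyncSession, user_id: str) -> None:
    """Scope the current transaction to a user for RLS policy checks."""
    # set_config(..., true) makes the setting transaction-local, so it cannot
    # leak across pooled connections once the transaction ends.
    await db.execute(
        text("SELECT set_config('app.current_user_id', :user_id, true)"),
        {"user_id": user_id},
    )
```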

### 6. Indexing Strategies for Large Codebases

**HNSW vs IVFFlat Selection:**

| Factor | HNSW | IVFFlat |
|--------|------|---------|
| Query speed | Faster | Slower |
| Build time | Slower | Faster |
| Memory | Higher | Lower |
| Accuracy | Higher | Lower |
| Use when | <10M vectors, high recall needed | >10M vectors, memory constrained |

**HNSW Parameter Guidelines:**

```sql
-- Small collections (<100K vectors)
CREATE INDEX ON knowledge_chunks USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- Medium collections (100K-1M vectors)
CREATE INDEX ON knowledge_chunks USING hnsw (embedding vector_cosine_ops)
WITH (m = 24, ef_construction = 100);

-- Large collections (1M-10M vectors)
CREATE INDEX ON knowledge_chunks USING hnsw (embedding vector_cosine_ops)
WITH (m = 32, ef_construction = 128);

-- Query-time tuning
SET hnsw.ef_search = 100;  -- Higher = better recall, slower queries
```

**Partial Indexes for Multi-Tenant:**

```sql
-- Create partial indexes for high-traffic projects
CREATE INDEX ON knowledge_chunks USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64)
WHERE project_id = 'frequently-queried-project-id';
```

**Build Performance:**

```sql
-- Speed up index builds
SET maintenance_work_mem = '2GB';            -- Ensure the graph fits in memory
SET max_parallel_maintenance_workers = 7;    -- Parallel index building
```

### 7. Real-Time vs Batch Embedding Updates

**Recommendation: Hybrid Approach**

| Scenario | Strategy | Why |
|----------|----------|-----|
| New file added | Real-time | Immediate availability |
| Bulk import | Batch (Celery) | Avoid blocking |
| File modified | Debounced real-time | Avoid churn |
| Conversation | Real-time | Context needed now |
| Codebase sync | Scheduled batch | Efficient |

**Implementation** (the `EmbeddingService` helper is sketched after this block):

```python
# app/services/knowledge/ingestion.py
import logging

from sqlalchemy.ext.asyncio import AsyncSession

from app.core.celery import celery_app
from app.models.knowledge import KnowledgeChunk
from app.services.knowledge.chunkers import get_chunker
from app.services.knowledge.embedder import EmbeddingService

logger = logging.getLogger(__name__)


class KnowledgeIngestionService:
    def __init__(self, db: AsyncSession):
        self.db = db
        self.embedder = EmbeddingService()

    async def ingest_realtime(
        self,
        project_id: str,
        content: str,
        content_type: str,
        source_uri: str,
        agent_id: str | None = None,
    ) -> list[str]:
        """Real-time ingestion for immediate availability."""
        chunker = get_chunker(content_type)
        chunks = chunker.chunk(content, {"source_uri": source_uri})

        # Embed and store
        db_chunks = []
        for chunk in chunks:
            embedding = await self.embedder.embed(chunk.content, content_type)
            db_chunk = KnowledgeChunk(
                project_id=project_id,
                agent_id=agent_id,
                content=chunk.content,
                content_type=content_type,
                source_uri=source_uri,
                embedding=embedding,
                embedding_model=self.embedder.get_model(content_type),
                metadata_=chunk.metadata,
            )
            self.db.add(db_chunk)
            db_chunks.append(db_chunk)

        await self.db.flush()  # assign primary keys before reading chunk ids
        chunk_ids = [str(c.id) for c in db_chunks]
        await self.db.commit()
        return chunk_ids

    def schedule_batch_ingestion(
        self,
        project_id: str,
        files: list[dict],  # [{path, content_type}]
    ) -> str:
        """Schedule batch ingestion via Celery."""
        task = batch_ingest_files.delay(project_id, files)
        return task.id


@celery_app.task(bind=True, max_retries=3)
def batch_ingest_files(self, project_id: str, files: list[dict]):
    """Celery task for batch file ingestion."""
    from app.core.database import get_sync_session

    with get_sync_session() as db:
        ingestion = KnowledgeIngestionService(db)
        for file in files:
            try:
                # Read file content
                with open(file["path"], "r") as f:
                    content = f.read()
                # Process (sync version)
                ingestion.ingest_sync(
                    project_id=project_id,
                    content=content,
                    content_type=file["content_type"],
                    source_uri=file["path"],
                )
            except Exception as e:
                # Log and continue, don't fail the entire batch
                logger.error(f"Failed to ingest {file['path']}: {e}")
        db.commit()
```
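
`EmbeddingService` is referenced above but not defined in this spike. A minimal sketch of what it could look like, reusing the content-type model mapping from section 2 and LiteLLM's async embedding call; the module path and method names are assumptions:

```python
# app/services/knowledge/embedder.py -- hypothetical helper
from litellm import aembedding


class EmbeddingService:
    """Thin wrapper that picks an embedding model per content type."""

    MODELS = {
        "code": "voyage/voyage-code-3",
        "default": "text-embedding-3-small",
    }

    def get_model(self, content_type: str) -> str:
        return self.MODELS.get(content_type, self.MODELS["default"])

    async def embed(self, text: str, content_type: str = "default") -> list[float]:
        response = await aembedding(model=self.get_model(content_type), input=[text])
        return response.data[0]["embedding"]
```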

**Debounced Updates:**

```python
# app/services/knowledge/watcher.py
import asyncio
from typing import Awaitable, Callable


class KnowledgeUpdateDebouncer:
    """Debounce rapid file changes to avoid excessive re-embedding."""

    def __init__(self, delay_seconds: float = 2.0):
        self.delay = delay_seconds
        self.pending: dict[str, asyncio.Task] = {}

    async def schedule_update(
        self,
        file_path: str,
        update_callback: Callable[[str], Awaitable[None]],
    ):
        """Schedule an update, canceling any pending update for the same file."""
        # Cancel the existing pending update
        if file_path in self.pending:
            self.pending[file_path].cancel()

        # Schedule a new update
        self.pending[file_path] = asyncio.create_task(
            self._delayed_update(file_path, update_callback)
        )

    async def _delayed_update(self, file_path: str, callback: Callable[[str], Awaitable[None]]):
        await asyncio.sleep(self.delay)
        await callback(file_path)
        del self.pending[file_path]
```

---

## Schema Design

### Database Schema

```sql
-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm;  -- For fuzzy text matching

-- Knowledge collections (created first; knowledge_chunks references it)
CREATE TABLE knowledge_collections (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    collection_type VARCHAR(50),
    chunking_strategy VARCHAR(50) DEFAULT 'auto',
    embedding_model VARCHAR(100) DEFAULT 'text-embedding-3-small',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE(project_id, name)
);

-- Knowledge chunks table
CREATE TABLE knowledge_chunks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Multi-tenant isolation
    project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
    agent_id UUID REFERENCES agent_instances(id) ON DELETE SET NULL,
    collection_id UUID REFERENCES knowledge_collections(id) ON DELETE SET NULL,

    -- Content
    content TEXT NOT NULL,
    content_type VARCHAR(50) NOT NULL,

    -- Source tracking
    source_uri VARCHAR(512),
    source_type VARCHAR(50),
    source_hash VARCHAR(64),  -- For detecting changes

    -- Vector embedding (1536 for text-embedding-3-small)
    embedding vector(1536),
    embedding_model VARCHAR(100),

    -- Full-text search
    search_vector tsvector GENERATED ALWAYS AS (
        setweight(to_tsvector('english', coalesce(content, '')), 'A')
    ) STORED,

    -- Metadata
    metadata JSONB DEFAULT '{}',

    -- Timestamps
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Indexes
CREATE INDEX ix_knowledge_chunks_project ON knowledge_chunks(project_id);
CREATE INDEX ix_knowledge_chunks_agent ON knowledge_chunks(agent_id);
CREATE INDEX ix_knowledge_chunks_collection ON knowledge_chunks(collection_id);
CREATE INDEX ix_knowledge_chunks_source_hash ON knowledge_chunks(source_hash);

-- HNSW vector index
CREATE INDEX ix_knowledge_chunks_embedding ON knowledge_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- GIN index for full-text search
CREATE INDEX ix_knowledge_chunks_fts ON knowledge_chunks USING gin(search_vector);

-- Trigger for updated_at
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER knowledge_chunks_updated_at
    BEFORE UPDATE ON knowledge_chunks
    FOR EACH ROW EXECUTE FUNCTION update_updated_at();
```

### Alembic Migration

```python
# alembic/versions/xxxx_add_knowledge_base.py
"""Add knowledge base tables for RAG

Revision ID: xxxx
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from pgvector.sqlalchemy import Vector


def upgrade():
    # Enable extensions
    op.execute("CREATE EXTENSION IF NOT EXISTS vector")
    op.execute("CREATE EXTENSION IF NOT EXISTS pg_trgm")

    # Create knowledge_collections table
    op.create_table(
        'knowledge_collections',
        sa.Column('id', postgresql.UUID(), primary_key=True),
        sa.Column('project_id', postgresql.UUID(), sa.ForeignKey('projects.id'), nullable=False),
        sa.Column('name', sa.String(100), nullable=False),
        sa.Column('description', sa.Text()),
        sa.Column('collection_type', sa.String(50)),
        sa.Column('chunking_strategy', sa.String(50), default='auto'),
        sa.Column('embedding_model', sa.String(100), default='text-embedding-3-small'),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime(timezone=True)),
        sa.UniqueConstraint('project_id', 'name', name='uq_knowledge_collections_project_name'),
    )

    # Create knowledge_chunks table
    op.create_table(
        'knowledge_chunks',
        sa.Column('id', postgresql.UUID(), primary_key=True),
        sa.Column('project_id', postgresql.UUID(), sa.ForeignKey('projects.id', ondelete='CASCADE'), nullable=False),
        sa.Column('agent_id', postgresql.UUID(), sa.ForeignKey('agent_instances.id', ondelete='SET NULL')),
        sa.Column('collection_id', postgresql.UUID(), sa.ForeignKey('knowledge_collections.id', ondelete='SET NULL')),
        sa.Column('content', sa.Text(), nullable=False),
        sa.Column('content_type', sa.String(50), nullable=False),
        sa.Column('source_uri', sa.String(512)),
        sa.Column('source_type', sa.String(50)),
        sa.Column('source_hash', sa.String(64)),
        sa.Column('embedding', Vector(1536)),
        sa.Column('embedding_model', sa.String(100)),
        sa.Column('metadata', postgresql.JSONB(), default={}),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime(timezone=True)),
    )

    # Create indexes
    op.create_index('ix_knowledge_chunks_project', 'knowledge_chunks', ['project_id'])
    op.create_index('ix_knowledge_chunks_agent', 'knowledge_chunks', ['agent_id'])
    op.create_index('ix_knowledge_chunks_collection', 'knowledge_chunks', ['collection_id'])
    op.create_index('ix_knowledge_chunks_source_hash', 'knowledge_chunks', ['source_hash'])

    # Create HNSW vector index
    op.execute("""
        CREATE INDEX ix_knowledge_chunks_embedding ON knowledge_chunks
        USING hnsw (embedding vector_cosine_ops)
        WITH (m = 16, ef_construction = 64)
    """)

    # Add full-text search column and index
    op.execute("""
        ALTER TABLE knowledge_chunks
        ADD COLUMN search_vector tsvector GENERATED ALWAYS AS (
            setweight(to_tsvector('english', coalesce(content, '')), 'A')
        ) STORED
    """)
    op.execute("CREATE INDEX ix_knowledge_chunks_fts ON knowledge_chunks USING gin(search_vector)")


def downgrade():
    op.drop_table('knowledge_chunks')
    op.drop_table('knowledge_collections')
```

---

## Complete Service Implementation

```python
# app/services/knowledge/service.py
import hashlib
from dataclasses import dataclass
from typing import Optional

from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from litellm import aembedding as litellm_embedding

from app.models.knowledge import KnowledgeChunk, KnowledgeCollection
from app.services.knowledge.chunkers import get_chunker
from app.services.knowledge.search import HybridSearchService


@dataclass
class SearchResult:
    id: str
    content: str
    metadata: dict
    score: float
    source_uri: Optional[str] = None


class KnowledgeBaseService:
    """
    Main service for knowledge base operations.
    Integrates with LiteLLM for embeddings.
    """

    # Model selection by content type
    EMBEDDING_MODELS = {
        "code": "voyage/voyage-code-3",
        "markdown": "text-embedding-3-small",
        "pdf": "text-embedding-3-small",
        "conversation": "text-embedding-3-small",
        "default": "text-embedding-3-small",
    }

    def __init__(self, db: AsyncSession):
        self.db = db
        self.search_service = HybridSearchService(db)

    async def create_collection(
        self,
        project_id: str,
        name: str,
        collection_type: str,
        description: str = "",
        chunking_strategy: str = "auto",
    ) -> KnowledgeCollection:
        """Create a new knowledge collection for a project."""
        collection = KnowledgeCollection(
            project_id=project_id,
            name=name,
            description=description,
            collection_type=collection_type,
            chunking_strategy=chunking_strategy,
            embedding_model=self.EMBEDDING_MODELS.get(collection_type, "text-embedding-3-small"),
        )
        self.db.add(collection)
        await self.db.commit()
        await self.db.refresh(collection)
        return collection

    async def ingest(
        self,
        project_id: str,
        content: str,
        content_type: str,
        source_uri: str,
        agent_id: Optional[str] = None,
        collection_id: Optional[str] = None,
        metadata: Optional[dict] = None,
    ) -> list[str]:
        """
        Ingest content into the knowledge base.
        Automatically chunks and embeds the content.
        """
        # Check for existing content by hash
        source_hash = hashlib.sha256(content.encode()).hexdigest()
        existing = await self.db.execute(
            select(KnowledgeChunk).where(
                KnowledgeChunk.project_id == project_id,
                KnowledgeChunk.source_hash == source_hash,
            )
        )
        if existing.scalar_one_or_none():
            # Content unchanged, skip
            return []

        # Get the appropriate chunker
        chunker = get_chunker(content_type)
        chunks = chunker.chunk(content, metadata or {})

        # Get the embedding model
        model = self.EMBEDDING_MODELS.get(content_type, "text-embedding-3-small")

        # Embed all chunks in a single batch
        chunk_texts = [c.content for c in chunks]
        embeddings = await self._embed_batch(chunk_texts, model)

        # Store chunks
        db_chunks = []
        for chunk, emb in zip(chunks, embeddings):
            db_chunk = KnowledgeChunk(
                project_id=project_id,
                agent_id=agent_id,
                collection_id=collection_id,
                content=chunk.content,
                content_type=content_type,
                source_uri=source_uri,
                source_type=self._infer_source_type(source_uri),
                source_hash=source_hash,
                embedding=emb,
                embedding_model=model,
                metadata_={
                    **chunk.metadata,
                    **(metadata or {}),
                },
            )
            self.db.add(db_chunk)
            db_chunks.append(db_chunk)

        await self.db.flush()  # assign primary keys before reading chunk ids
        chunk_ids = [str(c.id) for c in db_chunks]
        await self.db.commit()
        return chunk_ids

    async def search(
        self,
        project_id: str,
        query: str,
        agent_id: Optional[str] = None,
        collection_id: Optional[str] = None,
        limit: int = 10,
        content_types: Optional[list[str]] = None,
        semantic_weight: float = 0.6,
    ) -> list[SearchResult]:
        """
        Search the knowledge base using hybrid search.

        Args:
            project_id: Project scope
            query: Natural language query
            agent_id: Optional agent-specific scope
            collection_id: Optional collection scope
            limit: Max results
            content_types: Filter by content types
            semantic_weight: 0-1, weight for semantic vs keyword

        Note: collection_id and content_types filtering still needs to be
        pushed down into HybridSearchService's SQL.
        """
        # Get the query embedding
        query_embedding = await self._embed_query(query)

        # Perform hybrid search
        results = await self.search_service.search(
            query=query,
            query_embedding=query_embedding,
            project_id=project_id,
            agent_id=agent_id,
            limit=limit,
            semantic_weight=semantic_weight,
        )

        return [
            SearchResult(
                id=r["id"],
                content=r["content"],
                metadata=r.get("metadata", {}),
                score=r["combined_score"],
                source_uri=r.get("source_uri"),
            )
            for r in results
        ]

    async def delete_by_source(
        self,
        project_id: str,
        source_uri: str,
    ) -> int:
        """Delete all chunks from a specific source."""
        result = await self.db.execute(
            delete(KnowledgeChunk).where(
                KnowledgeChunk.project_id == project_id,
                KnowledgeChunk.source_uri == source_uri,
            )
        )
        await self.db.commit()
        return result.rowcount

    async def _embed_batch(
        self,
        texts: list[str],
        model: str,
    ) -> list[list[float]]:
        """Embed multiple texts in a single API call."""
        response = await litellm_embedding(
            model=model,
            input=texts,
        )
        return [d["embedding"] for d in response.data]

    async def _embed_query(self, query: str) -> list[float]:
        """Embed a query string."""
        response = await litellm_embedding(
            model="text-embedding-3-small",
            input=[query],
        )
        return response.data[0]["embedding"]

    def _infer_source_type(self, source_uri: str) -> str:
        """Infer source type from URI."""
        if source_uri.startswith("http"):
            return "url"
        if source_uri.startswith("conversation:"):
            return "conversation"
        return "file"
```
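
A short usage sketch of the service, assuming an `AsyncSession` is provided by the application's dependency injection; identifiers and file names below are illustrative:

```python
# Illustrative wiring, e.g. inside an API handler or agent workflow
async def index_and_query(db: AsyncSession) -> None:
    kb = KnowledgeBaseService(db)

    # Ingest a README into the project's knowledge base
    await kb.ingest(
        project_id="11111111-1111-1111-1111-111111111111",
        content=open("README.md").read(),
        content_type="markdown",
        source_uri="README.md",
    )

    # Retrieve the most relevant chunks for a task
    results = await kb.search(
        project_id="11111111-1111-1111-1111-111111111111",
        query="How do I configure the Celery workers?",
        limit=5,
    )
    for r in results:
        print(r.score, r.source_uri, r.content[:80])
```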

---

## Performance Considerations

### Query Latency Targets

| Vector Count | Target Latency | Recommended Config |
|--------------|----------------|--------------------|
| <100K | <20ms | Default HNSW |
| 100K-1M | <50ms | m=24, ef_construction=100 |
| 1M-10M | <100ms | m=32, ef_construction=128, ef_search=100 |

### Memory Requirements

```
HNSW memory ≈ vectors × dimensions × 4 bytes × (1 + m/8)

Example: 1M vectors × 1536 dims × 4 bytes × (1 + 16/8) ≈ 18.4 GB (rough upper bound)
```

### Batch Embedding Costs

Approximate costs, assuming ~100 tokens per chunk:

| Model | 1K chunks | 10K chunks | 100K chunks |
|-------|-----------|------------|-------------|
| text-embedding-3-small | $0.002 | $0.02 | $0.20 |
| voyage-code-3 | $0.006 | $0.06 | $0.60 |
| Local (nomic-embed) | $0 | $0 | $0 |

### Optimization Tips

1. **Use batch embedding** - Single API call for multiple chunks
2. **Cache query embeddings** - Identical queries produce identical vectors
3. **Partial indexes** - Create per-project indexes for high-traffic projects
4. **Dimension reduction** - Use 512 dimensions with text-embedding-3-small for cost savings (see the sketch below)
5. **Connection pooling** - Use PgBouncer for high-concurrency scenarios
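
A minimal sketch of tips 2 and 4 combined. OpenAI's text-embedding-3 models accept a `dimensions` parameter for shortened embeddings, and the assumption here is that LiteLLM passes it through; the in-process dictionary cache is purely illustrative (a shared cache such as Redis would be the production choice):

```python
from litellm import aembedding

_query_cache: dict[str, list[float]] = {}  # illustrative in-process cache


async def embed_query_cached(query: str, dimensions: int = 512) -> list[float]:
    """Embed a query with a reduced dimension count, caching repeat queries."""
    key = f"{dimensions}:{query}"
    if key not in _query_cache:
        response = await aembedding(
            model="text-embedding-3-small",
            input=[query],
            dimensions=dimensions,  # shortened embedding: lower storage and compute cost
        )
        _query_cache[key] = response.data[0]["embedding"]
    return _query_cache[key]
```

Note that the stored `vector(1536)` column would also need to become `vector(512)` if reduced-dimension embeddings were adopted; query and stored vectors must share the same dimensionality.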
""" # Search for relevant knowledge results = await self.kb.search( project_id=project_id, query=task_description, agent_id=agent_id, # Prefer agent-specific knowledge limit=10, semantic_weight=0.7, ) # Format context context_parts = [] current_tokens = 0 for result in results: chunk_tokens = self._count_tokens(result.content) if current_tokens + chunk_tokens > max_context_tokens: break context_parts.append(f""" ### Source: {result.source_uri or 'Unknown'} {result.content} """) current_tokens += chunk_tokens if not context_parts: return "" return f""" ## Relevant Context The following information was retrieved from the project knowledge base: {"".join(context_parts)} --- """ def _count_tokens(self, text: str) -> int: """Approximate token count.""" return len(text) // 4 # Rough estimate ``` ### MCP Tool for Knowledge Access ```python # app/mcp/tools/knowledge.py from mcp import Tool, ToolResult class KnowledgeSearchTool(Tool): """MCP tool for agents to search project knowledge.""" name = "search_knowledge" description = "Search the project knowledge base for relevant information" parameters = { "type": "object", "properties": { "project_id": { "type": "string", "description": "The project ID to search within" }, "query": { "type": "string", "description": "Natural language search query" }, "content_types": { "type": "array", "items": {"type": "string"}, "description": "Filter by content types (code, markdown, pdf)" }, "limit": { "type": "integer", "default": 5, "description": "Maximum results to return" } }, "required": ["project_id", "query"] } async def execute(self, **params) -> ToolResult: results = await self.kb_service.search( project_id=params["project_id"], query=params["query"], content_types=params.get("content_types"), limit=params.get("limit", 5), ) return ToolResult( content=[ { "source": r.source_uri, "content": r.content[:500] + "..." 
        return ToolResult(
            content=[
                {
                    "source": r.source_uri,
                    "content": (r.content[:500] + "...") if len(r.content) > 500 else r.content,
                    "relevance_score": r.score,
                }
                for r in results
            ]
        )
```

---

## References

### Vector Databases

- [Best Vector Databases 2025 - Firecrawl](https://www.firecrawl.dev/blog/best-vector-databases-2025)
- [pgvector vs Qdrant Comparison - MyScale](https://www.myscale.com/blog/comprehensive-comparison-pgvector-vs-qdrant-performance-vector-database-benchmarks/)
- [Multi-Tenancy in Vector Databases - Pinecone](https://www.pinecone.io/learn/series/vector-databases-in-production-for-busy-engineers/vector-database-multi-tenancy/)

### Embedding Models

- [Best Embedding Models 2025 - Elephas](https://elephas.app/blog/best-embedding-models)
- [6 Best Code Embedding Models - Modal](https://modal.com/blog/6-best-code-embedding-models-compared)
- [LiteLLM Embedding Documentation](https://docs.litellm.ai/docs/embedding/supported_embedding)

### Chunking & RAG

- [Chunking Strategies for RAG - Weaviate](https://weaviate.io/blog/chunking-strategies-for-rag)
- [Breaking Up is Hard to Do - Stack Overflow](https://stackoverflow.blog/2024/12/27/breaking-up-is-hard-to-do-chunking-in-rag-applications/)
- [Best Chunking Strategies 2025 - Firecrawl](https://www.firecrawl.dev/blog/best-chunking-strategies-rag-2025)

### Hybrid Search

- [Hybrid Search in PostgreSQL - ParadeDB](https://www.paradedb.com/blog/hybrid-search-in-postgresql-the-missing-manual)
- [Hybrid Search with pgvector - Jonathan Katz](https://jkatz05.com/post/postgres/hybrid-search-postgres-pgvector/)
- [Stop the Hallucinations - Cloudurable](https://cloudurable.com/blog/stop-the-hallucinations-hybrid-retrieval-with-bm25-pgvector-embedding-rerank-llm-rubric-rerank-hyde/)

### Multi-Tenant RAG

- [Multi-Tenant RAG with PostgreSQL - Timescale](https://www.tigerdata.com/blog/building-multi-tenant-rag-applications-with-postgresql-choosing-the-right-approach)
- [Building Multi-Tenancy RAG with Milvus](https://milvus.io/blog/build-multi-tenancy-rag-with-milvus-best-practices-part-one.md)

### pgvector

- [pgvector GitHub](https://github.com/pgvector/pgvector)
- [HNSW Indexes with pgvector - Crunchy Data](https://www.crunchydata.com/blog/hnsw-indexes-with-postgres-and-pgvector)
- [Optimize pgvector - Neon](https://neon.com/docs/ai/ai-vector-search-optimization)

---

## Decision

**Adopt pgvector with hybrid search** as the knowledge base solution for Syndarix RAG:

1. **pgvector** for vector storage and similarity search
2. **PostgreSQL full-text search** (tsvector) for keyword matching
3. **Reciprocal Rank Fusion (RRF)** for combining results
4. **LiteLLM** for a unified embedding API
5. **Content-type-aware chunking** with AST parsing for code
6. **Shared table with tenant isolation** via project_id/agent_id

**Migration Path:** If any project exceeds 10M vectors or requires sub-10ms latency, evaluate Qdrant as a dedicated vector store while keeping metadata in PostgreSQL.

---

*Spike completed. Findings will inform ADR-006: Knowledge Base Architecture.*