# SPIKE-006: Knowledge Base with pgvector for RAG System
**Status:** Completed

**Date:** 2025-12-29

**Author:** Architecture Team

**Related Issue:** #6

---
## Executive Summary
This spike researches the optimal approach for implementing a knowledge base system to enable RAG (Retrieval-Augmented Generation) for Syndarix AI agents. After evaluating options, we recommend **pgvector with hybrid search** as the primary solution.
### Key Recommendation
**Use pgvector** for the following reasons:
- Already using PostgreSQL in the stack (operational simplicity)
- Handles 10-100M vectors effectively (sufficient for project-scoped knowledge)
- Transactional consistency with application data
- Native hybrid search with PostgreSQL full-text search
- Row-level security for multi-tenant isolation
- Integrates seamlessly with existing migrations and tooling
For projects that scale beyond 100M vectors per tenant, consider migration to Qdrant (open-source, high-performance) or Pinecone (fully managed).

---
## Research Questions & Findings
### 1. pgvector vs Dedicated Vector Databases
| Feature | pgvector | Pinecone | Qdrant | Weaviate |
|---------|----------|----------|--------|----------|
| **Max Scale** | 10-100M vectors | Billions | Billions | Billions |
| **Self-Hosted** | Yes | No | Yes | Yes |
| **Managed Option** | RDS, Neon, Supabase | Yes (only) | Yes | Yes |
| **Query Latency** | Good (<100ms) | Excellent | Excellent | Good |
| **Hybrid Search** | Native + pg_search | Sparse vectors | Native | Native |
| **Cost (1M vectors)** | ~$0 (existing DB) | $20-30/mo | ~$27/mo | Variable |
| **Operational Overhead** | Zero (existing) | None | Medium | Medium |
| **ACID Transactions** | Yes | No | No | No |
**Why pgvector for Syndarix:**
- Per-project knowledge isolation means smaller vector sets (thousands to millions, not billions)
- Transactional ingest: embed and store in the same transaction as application data (see the sketch after this list)
- Single database backup/restore story
- Migration path exists if scale requires dedicated solution
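A minimal sketch of what transactional ingest looks like, assuming the async SQLAlchemy session and the `KnowledgeChunk` model defined later in this spike (the `indexed_at` field is hypothetical):
```python
# Sketch: the chunk row and the application-data update commit (or roll
# back) together, so the vector index can never drift from app state.
from sqlalchemy import func

async def ingest_with_app_data(db, project, content, embedding):
    async with db.begin():
        project.indexed_at = func.now()   # hypothetical application-data field
        db.add(KnowledgeChunk(            # vector row in the same transaction
            project_id=project.id,
            content=content,
            content_type="markdown",
            embedding=embedding,
        ))
```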
**When to Consider Alternatives:**
- Pinecone: Zero-ops requirement, budget available, billions of vectors
- Qdrant: Need advanced filtering, high QPS, open-source preference
- Weaviate: Multi-modal (images, video), knowledge graph features
### 2. Embedding Model Recommendations
Based on [research from 2024-2025](https://elephas.app/blog/best-embedding-models) and [Modal's code embedding comparison](https://modal.com/blog/6-best-code-embedding-models-compared):
| Model | Best For | Dimensions | Cost/1M tokens | Notes |
|-------|----------|------------|----------------|-------|
| **text-embedding-3-small** | General text, docs | 512-1536 | $0.02 | Good balance |
| **text-embedding-3-large** | High accuracy needs | 256-3072 | $0.13 | Dimension reduction |
| **voyage-code-3** | Code retrieval | 1024 | $0.06 | State-of-art for code |
| **voyage-3-large** | General + code | 1024 | $0.12 | Top leaderboard |
| **nomic-embed-text** | Open-source, local | 768 | Free | Ollama compatible |
**Recommendation for Syndarix:**
```python
# Content-type based model selection
EMBEDDING_MODELS = {
    "code": "voyage/voyage-code-3",            # Code files (.py, .js, etc.)
    "documentation": "text-embedding-3-small", # Markdown, docs
    "general": "text-embedding-3-small",       # Default
    "high_accuracy": "voyage/voyage-3-large",  # Critical queries
    "local": "ollama/nomic-embed-text",        # Fallback / dev
}
```
**LiteLLM Integration:**
```python
from litellm import aembedding  # async variant; litellm.embedding is sync

# Via LiteLLM (unified interface)
response = await aembedding(
    model="voyage/voyage-code-3",
    input=["def hello(): return 'world'"],
)
vector = response.data[0]["embedding"]
```
### 3. Chunking Strategies
Based on [Weaviate's research](https://weaviate.io/blog/chunking-strategies-for-rag) and [Stack Overflow's analysis](https://stackoverflow.blog/2024/12/27/breaking-up-is-hard-to-do-chunking-in-rag-applications/):
**Strategy by Content Type:**
| Content Type | Strategy | Chunk Size | Overlap | Notes |
|--------------|----------|------------|---------|-------|
| **Code Files** | AST-based / Function | Per function/class | None | Preserve semantic units |
| **Markdown Docs** | Heading-based | Per section | 10% | Respect document structure |
| **PDF Specs** | Page-level + semantic | 1000 tokens | 15% | NVIDIA recommends page-level |
| **Conversations** | Turn-based | Per exchange | Context window | Preserve dialogue flow |
| **API Docs** | Endpoint-based | Per endpoint | None | Group by resource |
**Implementation:**
```python
# app/services/knowledge/chunkers.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List

import tree_sitter_python as tspython
from tree_sitter import Language, Parser


@dataclass
class Chunk:
    content: str
    metadata: dict
    start_line: int | None = None
    end_line: int | None = None


class BaseChunker(ABC):
    @abstractmethod
    def chunk(self, content: str, metadata: dict) -> List[Chunk]:
        pass


class CodeChunker(BaseChunker):
    """AST-based chunking for source code."""

    def __init__(self, language: str = "python"):
        if language == "python":
            # py-tree-sitter >= 0.23 API
            self.parser = Parser(Language(tspython.language()))

    def chunk(self, content: str, metadata: dict) -> List[Chunk]:
        source = bytes(content, "utf8")
        tree = self.parser.parse(source)
        chunks = []
        for node in tree.root_node.children:
            if node.type in ("function_definition", "class_definition"):
                # tree-sitter offsets are byte offsets, so slice the byte buffer
                chunk_content = source[node.start_byte:node.end_byte].decode("utf8")
                chunks.append(Chunk(
                    content=chunk_content,
                    metadata={
                        **metadata,
                        "type": node.type,
                        "name": self._get_name(node),
                    },
                    start_line=node.start_point[0],
                    end_line=node.end_point[0],
                ))
        # Handle module-level code
        if not chunks:
            chunks.append(Chunk(content=content, metadata=metadata))
        return chunks

    def _get_name(self, node) -> str:
        for child in node.children:
            if child.type == "identifier":
                return child.text.decode("utf8")
        return "unknown"
class MarkdownChunker(BaseChunker):
    """Heading-based chunking for markdown."""

    def __init__(self, max_tokens: int = 1000, overlap_ratio: float = 0.1):
        self.max_tokens = max_tokens
        self.overlap_ratio = overlap_ratio

    def chunk(self, content: str, metadata: dict) -> List[Chunk]:
        import re

        # Split by ATX headings, keeping the headings as separate items
        sections = re.split(r'^(#{1,6}\s+.+)$', content, flags=re.MULTILINE)
        chunks = []
        current_heading = ""
        for i, section in enumerate(sections):
            if section.startswith('#'):
                current_heading = section.strip()
            elif section.strip():
                chunks.append(Chunk(
                    content=f"{current_heading}\n\n{section.strip()}",
                    metadata={
                        **metadata,
                        "heading": current_heading,
                        "section_index": i,
                    },
                ))
        return self._apply_overlap(chunks)

    def _apply_overlap(self, chunks: List[Chunk]) -> List[Chunk]:
        # Prepend the tail of the previous chunk for cross-chunk context
        for i in range(1, len(chunks)):
            overlap_size = int(len(chunks[i - 1].content) * self.overlap_ratio)
            if overlap_size == 0:
                continue  # avoid content[-0:], which would copy the whole chunk
            overlap_text = chunks[i - 1].content[-overlap_size:]
            chunks[i].content = f"[Context: ...{overlap_text}]\n\n{chunks[i].content}"
        return chunks
class SemanticChunker(BaseChunker):
    """Semantic chunking based on embedding similarity."""

    def __init__(self, embedding_model: str = "text-embedding-3-small"):
        from litellm import aembedding
        self.embed = aembedding  # async embedding call
        self.model = embedding_model
        self.similarity_threshold = 0.7

    async def chunk(self, content: str, metadata: dict) -> List[Chunk]:
        import nltk
        sentences = nltk.sent_tokenize(content)  # requires nltk's punkt data
        if not sentences:
            return []

        # Get embeddings for each sentence
        response = await self.embed(model=self.model, input=sentences)
        embeddings = [d["embedding"] for d in response.data]

        # Group consecutive sentences by semantic similarity
        chunks = []
        current_chunk = [sentences[0]]
        current_embedding = embeddings[0]
        for i in range(1, len(sentences)):
            similarity = self._cosine_similarity(current_embedding, embeddings[i])
            if similarity > self.similarity_threshold:
                current_chunk.append(sentences[i])
            else:
                chunks.append(Chunk(
                    content=" ".join(current_chunk),
                    metadata=metadata,
                ))
                current_chunk = [sentences[i]]
                current_embedding = embeddings[i]
        if current_chunk:
            chunks.append(Chunk(content=" ".join(current_chunk), metadata=metadata))
        return chunks

    def _cosine_similarity(self, a, b):
        import numpy as np
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
```
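Later snippets call a `get_chunker` factory that this file doesn't yet define; a minimal dispatch sketch over the chunkers above:
```python
# Minimal get_chunker factory (a sketch; the real mapping would cover
# more content types and more tree-sitter languages).
_CHUNKERS = {
    "code": CodeChunker,
    "markdown": MarkdownChunker,
}

def get_chunker(content_type: str) -> BaseChunker:
    chunker_cls = _CHUNKERS.get(content_type)
    if chunker_cls is None:
        # Fall back to heading/size-based chunking for unknown types
        return MarkdownChunker()
    return chunker_cls()
```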
### 4. Hybrid Search (Semantic + Keyword)
Hybrid search combines precise lexical matching (BM25 in dedicated engines; PostgreSQL's `ts_rank` here) with semantic vector similarity. Based on [ParadeDB's research](https://www.paradedb.com/blog/hybrid-search-in-postgresql-the-missing-manual):
**Approach: Reciprocal Rank Fusion (RRF)**
```sql
-- Hybrid search with RRF scoring
WITH semantic_results AS (
    SELECT id, content,
           1 - (embedding <=> $1::vector) AS semantic_score,
           ROW_NUMBER() OVER (ORDER BY embedding <=> $1::vector) AS semantic_rank
    FROM knowledge_chunks
    WHERE project_id = $2
    ORDER BY embedding <=> $1::vector
    LIMIT 20
),
keyword_results AS (
    SELECT id, content,
           ts_rank(search_vector, plainto_tsquery('english', $3)) AS keyword_score,
           ROW_NUMBER() OVER (
               ORDER BY ts_rank(search_vector, plainto_tsquery('english', $3)) DESC
           ) AS keyword_rank
    FROM knowledge_chunks
    WHERE project_id = $2
      AND search_vector @@ plainto_tsquery('english', $3)
    ORDER BY keyword_score DESC
    LIMIT 20
)
SELECT
    COALESCE(s.id, k.id) AS id,
    COALESCE(s.content, k.content) AS content,
    -- RRF formula: 1/(k + rank), where k = 60 is the standard constant
    (1.0 / (60 + COALESCE(s.semantic_rank, 1000))) +
    (1.0 / (60 + COALESCE(k.keyword_rank, 1000))) AS rrf_score
FROM semantic_results s
FULL OUTER JOIN keyword_results k ON s.id = k.id
ORDER BY rrf_score DESC
LIMIT 10;
```
**Implementation with SQLAlchemy:**
```python
# app/services/knowledge/search.py
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


class HybridSearchService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def search(
        self,
        query: str,
        query_embedding: list[float],
        project_id: str,
        agent_id: str | None = None,
        limit: int = 10,
        semantic_weight: float = 0.5,
    ) -> list[dict]:
        """
        Hybrid search combining semantic and keyword matching.

        Args:
            query: Natural language query
            query_embedding: Pre-computed query embedding
            project_id: Project scope
            agent_id: Optional agent-specific scope
            limit: Max results
            semantic_weight: 0-1, weight for semantic vs keyword
        """
        keyword_weight = 1 - semantic_weight
        sql = text("""
            WITH semantic AS (
                SELECT id, content, metadata, source_uri,
                       1 - (embedding <=> :embedding::vector) AS score,
                       ROW_NUMBER() OVER (ORDER BY embedding <=> :embedding::vector) AS rank
                FROM knowledge_chunks
                WHERE project_id = :project_id
                  AND (:agent_id IS NULL OR agent_id = :agent_id OR agent_id IS NULL)
                ORDER BY embedding <=> :embedding::vector
                LIMIT 30
            ),
            keyword AS (
                SELECT id, content, metadata, source_uri,
                       ts_rank_cd(search_vector, websearch_to_tsquery('english', :query)) AS score,
                       ROW_NUMBER() OVER (
                           ORDER BY ts_rank_cd(search_vector, websearch_to_tsquery('english', :query)) DESC
                       ) AS rank
                FROM knowledge_chunks
                WHERE project_id = :project_id
                  AND (:agent_id IS NULL OR agent_id = :agent_id OR agent_id IS NULL)
                  AND search_vector @@ websearch_to_tsquery('english', :query)
                ORDER BY score DESC
                LIMIT 30
            )
            SELECT
                COALESCE(s.id, k.id) AS id,
                COALESCE(s.content, k.content) AS content,
                COALESCE(s.metadata, k.metadata) AS metadata,
                COALESCE(s.source_uri, k.source_uri) AS source_uri,
                (
                    :semantic_weight * (1.0 / (60 + COALESCE(s.rank, 1000))) +
                    :keyword_weight * (1.0 / (60 + COALESCE(k.rank, 1000)))
                ) AS combined_score,
                s.score AS semantic_score,
                k.score AS keyword_score
            FROM semantic s
            FULL OUTER JOIN keyword k ON s.id = k.id
            ORDER BY combined_score DESC
            LIMIT :limit
        """)
        result = await self.db.execute(sql, {
            # pgvector's text format matches Python's list repr; depending on
            # the driver, the embedding may need this serialization
            "embedding": str(query_embedding),
            "query": query,
            "project_id": project_id,
            "agent_id": agent_id,
            "semantic_weight": semantic_weight,
            "keyword_weight": keyword_weight,
            "limit": limit,
        })
        return [dict(row._mapping) for row in result.fetchall()]
```
### 5. Multi-Tenant Vector Collections
Based on [Timescale's research on multi-tenant RAG](https://www.tigerdata.com/blog/building-multi-tenant-rag-applications-with-postgresql-choosing-the-right-approach):
**Recommended Pattern: Shared Table with Tenant ID**
For Syndarix, use a shared table with `project_id` and `agent_id` columns:
```python
# app/models/knowledge.py
import uuid

from sqlalchemy import Column, DateTime, ForeignKey, Index, String, Text, func
from sqlalchemy.dialects.postgresql import JSONB, TSVECTOR, UUID
from pgvector.sqlalchemy import Vector

from app.db.base import Base


class KnowledgeChunk(Base):
    __tablename__ = "knowledge_chunks"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)

    # Multi-tenant isolation
    project_id = Column(UUID, ForeignKey("projects.id"), nullable=False, index=True)
    agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=True, index=True)

    # Content
    content = Column(Text, nullable=False)
    content_type = Column(String(50), nullable=False)  # code, markdown, pdf, etc.

    # Source tracking
    source_uri = Column(String(512))  # file path, URL, etc.
    source_type = Column(String(50))  # file, url, conversation, etc.

    # Vector embedding
    embedding = Column(Vector(1536))  # Dimension depends on model
    embedding_model = Column(String(100))

    # Full-text search
    search_vector = Column(TSVECTOR)

    # Metadata ("metadata" is reserved on Declarative models, so map it explicitly)
    chunk_metadata = Column("metadata", JSONB, default=dict)

    # Timestamps
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, onupdate=func.now())

    __table_args__ = (
        # HNSW index for vector similarity
        Index(
            'ix_knowledge_chunks_embedding_hnsw',
            'embedding',
            postgresql_using='hnsw',
            postgresql_with={'m': 16, 'ef_construction': 64},
            postgresql_ops={'embedding': 'vector_cosine_ops'},
        ),
        # GIN index for full-text search
        Index(
            'ix_knowledge_chunks_search_vector',
            'search_vector',
            postgresql_using='gin',
        ),
        # Composite index for tenant isolation
        Index('ix_knowledge_chunks_project_agent', 'project_id', 'agent_id'),
    )


class KnowledgeCollection(Base):
    """Groups of chunks for organizing knowledge."""
    __tablename__ = "knowledge_collections"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    project_id = Column(UUID, ForeignKey("projects.id"), nullable=False)
    name = Column(String(100), nullable=False)
    description = Column(Text)
    collection_type = Column(String(50))  # codebase, documentation, specs, etc.

    # Configuration
    chunking_strategy = Column(String(50), default="auto")
    embedding_model = Column(String(100), default="text-embedding-3-small")

    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, onupdate=func.now())


class ChunkCollectionAssociation(Base):
    """Many-to-many: chunks can belong to multiple collections."""
    __tablename__ = "chunk_collection_associations"

    chunk_id = Column(UUID, ForeignKey("knowledge_chunks.id"), primary_key=True)
    collection_id = Column(UUID, ForeignKey("knowledge_collections.id"), primary_key=True)
```
**Row-Level Security (Optional but Recommended):**
```sql
-- Enable RLS on knowledge_chunks
ALTER TABLE knowledge_chunks ENABLE ROW LEVEL SECURITY;

-- Policy: users can only access chunks from their projects
CREATE POLICY knowledge_chunk_project_isolation ON knowledge_chunks
    USING (project_id IN (
        SELECT project_id FROM project_members
        WHERE user_id = current_setting('app.current_user_id')::uuid
    ));
```
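The policy reads the user from a session GUC, so the application has to set `app.current_user_id` on each request. A sketch with SQLAlchemy (the middleware wiring is an assumption, not part of this spike):
```python
# Sketch: scope the DB session to the authenticated user before querying;
# set_config(..., true) makes the setting local to the current transaction.
from sqlalchemy import text

async def scope_session_to_user(db, user_id: str) -> None:
    await db.execute(
        text("SELECT set_config('app.current_user_id', :uid, true)"),
        {"uid": str(user_id)},
    )
```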
### 6. Indexing Strategies for Large Codebases
**HNSW vs IVFFlat Selection:**
| Factor | HNSW | IVFFlat |
|--------|------|---------|
| Query speed | Faster | Slower |
| Build time | Slower | Faster |
| Memory | Higher | Lower |
| Accuracy | Higher | Lower |
| Use when | <10M vectors, high recall needed | >10M vectors, memory constrained |
**HNSW Parameter Guidelines:**
```sql
-- Small collections (<100K vectors)
CREATE INDEX ON knowledge_chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

-- Medium collections (100K-1M vectors)
CREATE INDEX ON knowledge_chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 24, ef_construction = 100);

-- Large collections (1M-10M vectors)
CREATE INDEX ON knowledge_chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 32, ef_construction = 128);

-- Query-time tuning
SET hnsw.ef_search = 100; -- Higher = better recall, slower
```
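Because `hnsw.ef_search` is a session-level setting, it can also be raised for a single query. A sketch with SQLAlchemy, assuming the async session used elsewhere in this spike:
```python
# Sketch: SET LOCAL is scoped to the enclosing transaction, so the
# higher-recall setting does not leak to other queries on the pooled
# connection.
from sqlalchemy import text

async def query_with_high_recall(db, sql, params):
    async with db.begin():
        await db.execute(text("SET LOCAL hnsw.ef_search = 100"))
        result = await db.execute(sql, params)
        return result.fetchall()
```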
**Partial Indexes for Multi-Tenant:**
```sql
-- Create partial indexes per high-traffic project
CREATE INDEX ON knowledge_chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64)
    WHERE project_id = 'frequently-queried-project-id';
```
**Build Performance:**
```sql
-- Speed up index builds
SET maintenance_work_mem = '2GB'; -- Ensure graph fits in memory
SET max_parallel_maintenance_workers = 7; -- Parallel building
```
### 7. Real-Time vs Batch Embedding Updates
**Recommendation: Hybrid Approach**
| Scenario | Strategy | Why |
|----------|----------|-----|
| New file added | Real-time | Immediate availability |
| Bulk import | Batch (Celery) | Avoid blocking |
| File modified | Debounced real-time | Avoid re-embedding churn |
| Conversation | Real-time | Context needed now |
| Codebase sync | Scheduled batch | Efficient |
**Implementation:**
```python
# app/services/knowledge/ingestion.py
import logging

from sqlalchemy.ext.asyncio import AsyncSession

from app.core.celery import celery_app
from app.models.knowledge import KnowledgeChunk
from app.services.knowledge.embedder import EmbeddingService
from app.services.knowledge.chunkers import get_chunker

logger = logging.getLogger(__name__)


class KnowledgeIngestionService:
    def __init__(self, db: AsyncSession):
        self.db = db
        self.embedder = EmbeddingService()

    async def ingest_realtime(
        self,
        project_id: str,
        content: str,
        content_type: str,
        source_uri: str,
        agent_id: str | None = None,
    ) -> list[str]:
        """Real-time ingestion for immediate availability."""
        chunker = get_chunker(content_type)
        chunks = chunker.chunk(content, {"source_uri": source_uri})

        # Embed and store
        db_chunks = []
        for chunk in chunks:
            embedding = await self.embedder.embed(chunk.content, content_type)
            db_chunk = KnowledgeChunk(
                project_id=project_id,
                agent_id=agent_id,
                content=chunk.content,
                content_type=content_type,
                source_uri=source_uri,
                embedding=embedding,
                embedding_model=self.embedder.get_model(content_type),
                chunk_metadata=chunk.metadata,
            )
            self.db.add(db_chunk)
            db_chunks.append(db_chunk)
        await self.db.flush()  # populate ids before reading them
        chunk_ids = [str(c.id) for c in db_chunks]
        await self.db.commit()
        return chunk_ids

    def schedule_batch_ingestion(
        self,
        project_id: str,
        files: list[dict],  # [{path, content_type}]
    ) -> str:
        """Schedule batch ingestion via Celery."""
        task = batch_ingest_files.delay(project_id, files)
        return task.id


@celery_app.task(bind=True, max_retries=3)
def batch_ingest_files(self, project_id: str, files: list[dict]):
    """Celery task for batch file ingestion."""
    from app.core.database import get_sync_session

    with get_sync_session() as db:
        ingestion = KnowledgeIngestionService(db)
        for file in files:
            try:
                # Read file content
                with open(file["path"], "r") as f:
                    content = f.read()
                # Process (sync version)
                ingestion.ingest_sync(
                    project_id=project_id,
                    content=content,
                    content_type=file["content_type"],
                    source_uri=file["path"],
                )
            except Exception as e:
                # Log and continue; don't fail the entire batch
                logger.error(f"Failed to ingest {file['path']}: {e}")
        db.commit()
```
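The service above references an `EmbeddingService` that this spike doesn't define; a minimal sketch over LiteLLM, reusing the content-type mapping from earlier:
```python
# Sketch of the EmbeddingService referenced above (not a final API):
# wraps LiteLLM and the content-type -> model mapping.
from litellm import aembedding

class EmbeddingService:
    MODELS = {
        "code": "voyage/voyage-code-3",
        "default": "text-embedding-3-small",
    }

    def get_model(self, content_type: str) -> str:
        return self.MODELS.get(content_type, self.MODELS["default"])

    async def embed(self, text: str, content_type: str) -> list[float]:
        response = await aembedding(model=self.get_model(content_type), input=[text])
        return response.data[0]["embedding"]
```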
**Debounced Updates:**
```python
# app/services/knowledge/watcher.py
import asyncio
from typing import Callable


class KnowledgeUpdateDebouncer:
    """Debounce rapid file changes to avoid excessive re-embedding."""

    def __init__(self, delay_seconds: float = 2.0):
        self.delay = delay_seconds
        self.pending: dict[str, asyncio.Task] = {}

    async def schedule_update(
        self,
        file_path: str,
        update_callback: Callable,
    ):
        """Schedule an update, canceling any pending update for the same file."""
        # Cancel existing pending update
        if file_path in self.pending:
            self.pending[file_path].cancel()
        # Schedule new update
        self.pending[file_path] = asyncio.create_task(
            self._delayed_update(file_path, update_callback)
        )

    async def _delayed_update(self, file_path: str, callback: Callable):
        await asyncio.sleep(self.delay)
        await callback(file_path)
        del self.pending[file_path]
```
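Wiring the debouncer to the ingestion service might look like the following (the file-watcher hook `on_file_changed` is hypothetical):
```python
# Hypothetical wiring: on a file-change event, debounce and re-ingest.
debouncer = KnowledgeUpdateDebouncer(delay_seconds=2.0)

async def on_file_changed(path: str, project_id: str, ingestion: KnowledgeIngestionService):
    async def reingest(file_path: str):
        with open(file_path, "r") as f:
            content = f.read()
        await ingestion.ingest_realtime(
            project_id=project_id,
            content=content,
            content_type="code",
            source_uri=file_path,
        )
    await debouncer.schedule_update(path, reingest)
```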
---
## Schema Design
### Database Schema
```sql
-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm; -- For fuzzy text matching

-- Knowledge collections (created first: knowledge_chunks references it)
CREATE TABLE knowledge_collections (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    collection_type VARCHAR(50),
    chunking_strategy VARCHAR(50) DEFAULT 'auto',
    embedding_model VARCHAR(100) DEFAULT 'text-embedding-3-small',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (project_id, name)
);

-- Knowledge chunks table
CREATE TABLE knowledge_chunks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Multi-tenant isolation
    project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
    agent_id UUID REFERENCES agent_instances(id) ON DELETE SET NULL,
    collection_id UUID REFERENCES knowledge_collections(id) ON DELETE SET NULL,

    -- Content
    content TEXT NOT NULL,
    content_type VARCHAR(50) NOT NULL,

    -- Source tracking
    source_uri VARCHAR(512),
    source_type VARCHAR(50),
    source_hash VARCHAR(64), -- For detecting changes

    -- Vector embedding (1536 for text-embedding-3-small)
    embedding vector(1536),
    embedding_model VARCHAR(100),

    -- Full-text search
    search_vector tsvector GENERATED ALWAYS AS (
        setweight(to_tsvector('english', coalesce(content, '')), 'A')
    ) STORED,

    -- Metadata
    metadata JSONB DEFAULT '{}',

    -- Timestamps
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Indexes
CREATE INDEX ix_knowledge_chunks_project ON knowledge_chunks(project_id);
CREATE INDEX ix_knowledge_chunks_agent ON knowledge_chunks(agent_id);
CREATE INDEX ix_knowledge_chunks_collection ON knowledge_chunks(collection_id);
CREATE INDEX ix_knowledge_chunks_source_hash ON knowledge_chunks(source_hash);

-- HNSW vector index
CREATE INDEX ix_knowledge_chunks_embedding ON knowledge_chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

-- GIN index for full-text search
CREATE INDEX ix_knowledge_chunks_fts ON knowledge_chunks USING gin(search_vector);

-- Trigger for updated_at
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER knowledge_chunks_updated_at
    BEFORE UPDATE ON knowledge_chunks
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at();
```
### Alembic Migration
```python
# alembic/versions/xxxx_add_knowledge_base.py
"""Add knowledge base tables for RAG

Revision ID: xxxx
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from pgvector.sqlalchemy import Vector

revision = 'xxxx'
down_revision = None  # set to the previous revision id


def upgrade():
    # Enable extensions
    op.execute("CREATE EXTENSION IF NOT EXISTS vector")
    op.execute("CREATE EXTENSION IF NOT EXISTS pg_trgm")

    # Create knowledge_collections table
    op.create_table(
        'knowledge_collections',
        sa.Column('id', postgresql.UUID(), primary_key=True),
        sa.Column('project_id', postgresql.UUID(), sa.ForeignKey('projects.id'), nullable=False),
        sa.Column('name', sa.String(100), nullable=False),
        sa.Column('description', sa.Text()),
        sa.Column('collection_type', sa.String(50)),
        sa.Column('chunking_strategy', sa.String(50), default='auto'),
        sa.Column('embedding_model', sa.String(100), default='text-embedding-3-small'),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime(timezone=True)),
        sa.UniqueConstraint('project_id', 'name', name='uq_knowledge_collections_project_name'),
    )

    # Create knowledge_chunks table
    op.create_table(
        'knowledge_chunks',
        sa.Column('id', postgresql.UUID(), primary_key=True),
        sa.Column('project_id', postgresql.UUID(), sa.ForeignKey('projects.id', ondelete='CASCADE'), nullable=False),
        sa.Column('agent_id', postgresql.UUID(), sa.ForeignKey('agent_instances.id', ondelete='SET NULL')),
        sa.Column('collection_id', postgresql.UUID(), sa.ForeignKey('knowledge_collections.id', ondelete='SET NULL')),
        sa.Column('content', sa.Text(), nullable=False),
        sa.Column('content_type', sa.String(50), nullable=False),
        sa.Column('source_uri', sa.String(512)),
        sa.Column('source_type', sa.String(50)),
        sa.Column('source_hash', sa.String(64)),
        sa.Column('embedding', Vector(1536)),
        sa.Column('embedding_model', sa.String(100)),
        sa.Column('metadata', postgresql.JSONB(), server_default=sa.text("'{}'::jsonb")),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime(timezone=True)),
    )

    # Create indexes
    op.create_index('ix_knowledge_chunks_project', 'knowledge_chunks', ['project_id'])
    op.create_index('ix_knowledge_chunks_agent', 'knowledge_chunks', ['agent_id'])
    op.create_index('ix_knowledge_chunks_collection', 'knowledge_chunks', ['collection_id'])
    op.create_index('ix_knowledge_chunks_source_hash', 'knowledge_chunks', ['source_hash'])

    # Create HNSW vector index
    op.execute("""
        CREATE INDEX ix_knowledge_chunks_embedding ON knowledge_chunks
        USING hnsw (embedding vector_cosine_ops)
        WITH (m = 16, ef_construction = 64)
    """)

    # Add full-text search column and index
    op.execute("""
        ALTER TABLE knowledge_chunks
        ADD COLUMN search_vector tsvector
        GENERATED ALWAYS AS (
            setweight(to_tsvector('english', coalesce(content, '')), 'A')
        ) STORED
    """)
    op.execute("CREATE INDEX ix_knowledge_chunks_fts ON knowledge_chunks USING gin(search_vector)")


def downgrade():
    op.drop_table('knowledge_chunks')
    op.drop_table('knowledge_collections')
```
---
## Complete Service Implementation
```python
# app/services/knowledge/service.py
import hashlib
from dataclasses import dataclass
from typing import Optional

from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from litellm import aembedding

from app.models.knowledge import KnowledgeChunk, KnowledgeCollection
from app.services.knowledge.chunkers import get_chunker
from app.services.knowledge.search import HybridSearchService


@dataclass
class SearchResult:
    id: str
    content: str
    metadata: dict
    score: float
    source_uri: Optional[str] = None


class KnowledgeBaseService:
    """
    Main service for knowledge base operations.
    Integrates with LiteLLM for embeddings.
    """

    # Model selection by content type
    EMBEDDING_MODELS = {
        "code": "voyage/voyage-code-3",
        "markdown": "text-embedding-3-small",
        "pdf": "text-embedding-3-small",
        "conversation": "text-embedding-3-small",
        "default": "text-embedding-3-small",
    }

    def __init__(self, db: AsyncSession):
        self.db = db
        self.search_service = HybridSearchService(db)

    async def create_collection(
        self,
        project_id: str,
        name: str,
        collection_type: str,
        description: str = "",
        chunking_strategy: str = "auto",
    ) -> KnowledgeCollection:
        """Create a new knowledge collection for a project."""
        collection = KnowledgeCollection(
            project_id=project_id,
            name=name,
            description=description,
            collection_type=collection_type,
            chunking_strategy=chunking_strategy,
            embedding_model=self.EMBEDDING_MODELS.get(collection_type, "text-embedding-3-small"),
        )
        self.db.add(collection)
        await self.db.commit()
        await self.db.refresh(collection)
        return collection

    async def ingest(
        self,
        project_id: str,
        content: str,
        content_type: str,
        source_uri: str,
        agent_id: Optional[str] = None,
        collection_id: Optional[str] = None,
        metadata: Optional[dict] = None,
    ) -> list[str]:
        """
        Ingest content into the knowledge base.
        Automatically chunks and embeds the content.
        """
        # Check for existing content by hash (every chunk of a document
        # shares the document's hash, so multiple rows may match)
        source_hash = hashlib.sha256(content.encode()).hexdigest()
        existing = await self.db.execute(
            select(KnowledgeChunk).where(
                KnowledgeChunk.project_id == project_id,
                KnowledgeChunk.source_hash == source_hash,
            )
        )
        if existing.scalars().first():
            # Content unchanged, skip
            return []

        # Get appropriate chunker
        chunker = get_chunker(content_type)
        chunks = chunker.chunk(content, metadata or {})

        # Get embedding model
        model = self.EMBEDDING_MODELS.get(content_type, "text-embedding-3-small")

        # Embed all chunks in batch
        chunk_texts = [c.content for c in chunks]
        embeddings = await self._embed_batch(chunk_texts, model)

        # Store chunks
        db_chunks = []
        for chunk, emb in zip(chunks, embeddings):
            db_chunk = KnowledgeChunk(
                project_id=project_id,
                agent_id=agent_id,
                collection_id=collection_id,
                content=chunk.content,
                content_type=content_type,
                source_uri=source_uri,
                source_type=self._infer_source_type(source_uri),
                source_hash=source_hash,
                embedding=emb,
                embedding_model=model,
                chunk_metadata={
                    **chunk.metadata,
                    **(metadata or {}),
                },
            )
            self.db.add(db_chunk)
            db_chunks.append(db_chunk)
        await self.db.flush()  # populate ids before reading them
        chunk_ids = [str(c.id) for c in db_chunks]
        await self.db.commit()
        return chunk_ids

    async def search(
        self,
        project_id: str,
        query: str,
        agent_id: Optional[str] = None,
        collection_id: Optional[str] = None,
        limit: int = 10,
        content_types: Optional[list[str]] = None,
        semantic_weight: float = 0.6,
    ) -> list[SearchResult]:
        """
        Search the knowledge base using hybrid search.

        Args:
            project_id: Project scope
            query: Natural language query
            agent_id: Optional agent-specific scope
            collection_id: Optional collection scope (not yet wired into HybridSearchService)
            limit: Max results
            content_types: Filter by content types (not yet wired into HybridSearchService)
            semantic_weight: 0-1, weight for semantic vs keyword
        """
        # Get query embedding
        query_embedding = await self._embed_query(query)

        # Perform hybrid search
        results = await self.search_service.search(
            query=query,
            query_embedding=query_embedding,
            project_id=project_id,
            agent_id=agent_id,
            limit=limit,
            semantic_weight=semantic_weight,
        )
        return [
            SearchResult(
                id=r["id"],
                content=r["content"],
                metadata=r.get("metadata") or {},
                score=r["combined_score"],
                source_uri=r.get("source_uri"),
            )
            for r in results
        ]

    async def delete_by_source(
        self,
        project_id: str,
        source_uri: str,
    ) -> int:
        """Delete all chunks from a specific source."""
        result = await self.db.execute(
            delete(KnowledgeChunk).where(
                KnowledgeChunk.project_id == project_id,
                KnowledgeChunk.source_uri == source_uri,
            )
        )
        await self.db.commit()
        return result.rowcount

    async def _embed_batch(
        self,
        texts: list[str],
        model: str,
    ) -> list[list[float]]:
        """Embed multiple texts in a single API call."""
        response = await aembedding(
            model=model,
            input=texts,
        )
        return [d["embedding"] for d in response.data]

    async def _embed_query(self, query: str) -> list[float]:
        """Embed a query string."""
        response = await aembedding(
            model="text-embedding-3-small",
            input=[query],
        )
        return response.data[0]["embedding"]

    def _infer_source_type(self, source_uri: str) -> str:
        """Infer source type from URI."""
        if source_uri.startswith("http"):
            return "url"
        if source_uri.startswith("conversation:"):
            return "conversation"
        return "file"
```
---
## Performance Considerations
### Query Latency Targets
| Vector Count | Target Latency | Recommended Config |
|--------------|----------------|-------------------|
| <100K | <20ms | Default HNSW |
| 100K-1M | <50ms | m=24, ef_construction=100 |
| 1M-10M | <100ms | m=32, ef_construction=128, ef_search=100 |
### Memory Requirements
```
HNSW memory ≈ vectors × dimensions × 4 bytes × (1 + m/8)

Example: 1M vectors × 1536 dims × 4 bytes × (1 + 16/8) ≈ 18.4 GB
```
### Batch Embedding Costs
| Model | 1K chunks | 10K chunks | 100K chunks |
|-------|-----------|------------|-------------|
| text-embedding-3-small | $0.002 | $0.02 | $0.20 |
| voyage-code-3 | $0.006 | $0.06 | $0.60 |
| Local (nomic-embed) | $0 | $0 | $0 |

*Figures assume ~100 tokens per chunk; costs scale linearly with chunk size.*
### Optimization Tips
1. **Use batch embedding** - Single API call for multiple chunks
2. **Cache query embeddings** - Same queries return the same vectors (see the sketch below)
3. **Partial indexes** - Create per-project indexes for high-traffic projects
4. **Dimension reduction** - Use 512-dim with text-embedding-3-small for cost savings
5. **Connection pooling** - Use pgBouncer for high-concurrency scenarios
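For tip 2, a minimal query-embedding cache sketch, assuming the app's `redis.asyncio` client:
```python
# Sketch: cache query embeddings keyed by hash of (model, query); values
# are JSON-encoded vectors with a 1-hour TTL.
import hashlib
import json

from litellm import aembedding

async def cached_embed(redis, query: str, model: str = "text-embedding-3-small") -> list[float]:
    key = "emb:" + hashlib.sha256(f"{model}:{query}".encode()).hexdigest()
    cached = await redis.get(key)
    if cached:
        return json.loads(cached)
    response = await aembedding(model=model, input=[query])
    vector = response.data[0]["embedding"]
    await redis.set(key, json.dumps(vector), ex=3600)
    return vector
```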
---
## Integration with Syndarix Agents
### Agent Context Retrieval
```python
# app/services/agent/context.py
from app.services.knowledge.service import KnowledgeBaseService


class AgentContextBuilder:
    """Builds context for agent prompts using RAG."""

    def __init__(self, kb_service: KnowledgeBaseService):
        self.kb = kb_service

    async def build_context(
        self,
        agent_id: str,
        project_id: str,
        task_description: str,
        max_context_tokens: int = 4000,
    ) -> str:
        """
        Build relevant context for an agent task.
        Returns a formatted context string for inclusion in the prompt.
        """
        # Search for relevant knowledge
        results = await self.kb.search(
            project_id=project_id,
            query=task_description,
            agent_id=agent_id,  # Prefer agent-specific knowledge
            limit=10,
            semantic_weight=0.7,
        )

        # Format context, stopping before the token budget is exceeded
        context_parts = []
        current_tokens = 0
        for result in results:
            chunk_tokens = self._count_tokens(result.content)
            if current_tokens + chunk_tokens > max_context_tokens:
                break
            context_parts.append(
                f"\n### Source: {result.source_uri or 'Unknown'}\n{result.content}\n"
            )
            current_tokens += chunk_tokens

        if not context_parts:
            return ""

        return (
            "\n## Relevant Context\n"
            "The following information was retrieved from the project knowledge base:\n"
            f"{''.join(context_parts)}\n---\n"
        )

    def _count_tokens(self, text: str) -> int:
        """Approximate token count (~4 characters per token)."""
        return len(text) // 4
```
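If the rough 4-chars-per-token estimate proves too coarse, a more accurate counter is possible with `tiktoken` (an assumption; it is not otherwise a dependency of this spike):
```python
# Optional: exact token counts via tiktoken; cl100k_base matches the
# OpenAI embedding tokenizer.
import tiktoken

_ENC = tiktoken.get_encoding("cl100k_base")

def count_tokens(text: str) -> int:
    return len(_ENC.encode(text))
```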
### MCP Tool for Knowledge Access
```python
# app/mcp/tools/knowledge.py
from mcp import Tool, ToolResult


class KnowledgeSearchTool(Tool):
    """MCP tool for agents to search project knowledge."""

    name = "search_knowledge"
    description = "Search the project knowledge base for relevant information"
    parameters = {
        "type": "object",
        "properties": {
            "project_id": {
                "type": "string",
                "description": "The project ID to search within"
            },
            "query": {
                "type": "string",
                "description": "Natural language search query"
            },
            "content_types": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Filter by content types (code, markdown, pdf)"
            },
            "limit": {
                "type": "integer",
                "default": 5,
                "description": "Maximum results to return"
            }
        },
        "required": ["project_id", "query"]
    }

    def __init__(self, kb_service):
        # Injected KnowledgeBaseService instance
        self.kb_service = kb_service

    async def execute(self, **params) -> ToolResult:
        results = await self.kb_service.search(
            project_id=params["project_id"],
            query=params["query"],
            content_types=params.get("content_types"),
            limit=params.get("limit", 5),
        )
        return ToolResult(
            content=[
                {
                    "source": r.source_uri,
                    "content": r.content[:500] + "..." if len(r.content) > 500 else r.content,
                    "relevance_score": r.score,
                }
                for r in results
            ]
        )
```
---
## References
### Vector Databases
- [Best Vector Databases 2025 - Firecrawl](https://www.firecrawl.dev/blog/best-vector-databases-2025)
- [pgvector vs Qdrant Comparison - MyScale](https://www.myscale.com/blog/comprehensive-comparison-pgvector-vs-qdrant-performance-vector-database-benchmarks/)
- [Multi-Tenancy in Vector Databases - Pinecone](https://www.pinecone.io/learn/series/vector-databases-in-production-for-busy-engineers/vector-database-multi-tenancy/)
### Embedding Models
- [Best Embedding Models 2025 - Elephas](https://elephas.app/blog/best-embedding-models)
- [6 Best Code Embedding Models - Modal](https://modal.com/blog/6-best-code-embedding-models-compared)
- [LiteLLM Embedding Documentation](https://docs.litellm.ai/docs/embedding/supported_embedding)
### Chunking & RAG
- [Chunking Strategies for RAG - Weaviate](https://weaviate.io/blog/chunking-strategies-for-rag)
- [Breaking Up is Hard to Do - Stack Overflow](https://stackoverflow.blog/2024/12/27/breaking-up-is-hard-to-do-chunking-in-rag-applications/)
- [Best Chunking Strategies 2025 - Firecrawl](https://www.firecrawl.dev/blog/best-chunking-strategies-rag-2025)
### Hybrid Search
- [Hybrid Search in PostgreSQL - ParadeDB](https://www.paradedb.com/blog/hybrid-search-in-postgresql-the-missing-manual)
- [Hybrid Search with pgvector - Jonathan Katz](https://jkatz05.com/post/postgres/hybrid-search-postgres-pgvector/)
- [Stop the Hallucinations - Cloudurable](https://cloudurable.com/blog/stop-the-hallucinations-hybrid-retrieval-with-bm25-pgvector-embedding-rerank-llm-rubric-rerank-hyde/)
### Multi-Tenant RAG
- [Multi-Tenant RAG with PostgreSQL - Timescale](https://www.tigerdata.com/blog/building-multi-tenant-rag-applications-with-postgresql-choosing-the-right-approach)
- [Building Multi-Tenancy RAG with Milvus](https://milvus.io/blog/build-multi-tenancy-rag-with-milvus-best-practices-part-one.md)
### pgvector
- [pgvector GitHub](https://github.com/pgvector/pgvector)
- [HNSW Indexes with pgvector - Crunchy Data](https://www.crunchydata.com/blog/hnsw-indexes-with-postgres-and-pgvector)
- [Optimize pgvector - Neon](https://neon.com/docs/ai/ai-vector-search-optimization)
---
## Decision
**Adopt pgvector with hybrid search** as the knowledge base solution for Syndarix RAG:
1. **pgvector** for vector storage and similarity search
2. **PostgreSQL full-text search** (tsvector) for keyword matching
3. **Reciprocal Rank Fusion (RRF)** for combining results
4. **LiteLLM** for unified embedding API
5. **Content-type-aware chunking** with AST parsing for code
6. **Shared table with tenant isolation** via project_id/agent_id
**Migration Path:** If any project exceeds 10M vectors or requires sub-10ms latency, evaluate Qdrant as a dedicated vector store while keeping metadata in PostgreSQL.

---
*Spike completed. Findings will inform ADR-006: Knowledge Base Architecture.*