SPIKE-006: Knowledge Base with pgvector for RAG System
Status: Completed Date: 2025-12-29 Author: Architecture Team Related Issue: #6
Executive Summary
This spike researches the optimal approach for implementing a knowledge base system to enable RAG (Retrieval-Augmented Generation) for Syndarix AI agents. After evaluating options, we recommend pgvector with hybrid search as the primary solution.
Key Recommendation
Use pgvector for the following reasons:
- Already using PostgreSQL in the stack (operational simplicity)
- Handles 10-100M vectors effectively (sufficient for project-scoped knowledge)
- Transactional consistency with application data
- Native hybrid search with PostgreSQL full-text search
- Row-level security for multi-tenant isolation
- Integrates seamlessly with existing migrations and tooling
For projects that scale beyond 100M vectors per tenant, consider migration to Qdrant (open-source, high-performance) or Pinecone (fully managed).
Research Questions & Findings
1. pgvector vs Dedicated Vector Databases
| Feature | pgvector | Pinecone | Qdrant | Weaviate |
|---|---|---|---|---|
| Max Scale | 10-100M vectors | Billions | Billions | Billions |
| Self-Hosted | Yes | No | Yes | Yes |
| Managed Option | RDS, Neon, Supabase | Yes (only) | Yes | Yes |
| Query Latency | Good (<100ms) | Excellent | Excellent | Good |
| Hybrid Search | Native + pg_search | Sparse vectors | Native | Native |
| Cost (1M vectors) | ~$0 (existing DB) | $20-30/mo | ~$27/mo | Variable |
| Operational Overhead | Zero (existing) | None | Medium | Medium |
| ACID Transactions | Yes | No | No | No |
Why pgvector for Syndarix:
- Per-project knowledge isolation means smaller vector sets (thousands to millions, not billions)
- Transactional ingest: embed and index in the same INSERT as application data
- Single database backup/restore story
- Migration path exists if scale requires dedicated solution
When to Consider Alternatives:
- Pinecone: Zero-ops requirement, budget available, billions of vectors
- Qdrant: Need advanced filtering, high QPS, open-source preference
- Weaviate: Multi-modal (images, video), knowledge graph features
2. Embedding Model Recommendations
Based on research from 2024-2025 and Modal's code embedding comparison:
| Model | Best For | Dimensions | Cost/1M tokens | Notes |
|---|---|---|---|---|
| text-embedding-3-small | General text, docs | 512-1536 | $0.02 | Good balance |
| text-embedding-3-large | High accuracy needs | 256-3072 | $0.13 | Dimension reduction |
| voyage-code-3 | Code retrieval | 1024 | $0.06 | State-of-the-art for code |
| voyage-3-large | General + code | 1024 | $0.12 | Top leaderboard |
| nomic-embed-text | Open-source, local | 768 | Free | Ollama compatible |
Recommendation for Syndarix:
# Content-type based model selection
EMBEDDING_MODELS = {
"code": "voyage/voyage-code-3", # Code files (.py, .js, etc.)
"documentation": "text-embedding-3-small", # Markdown, docs
"general": "text-embedding-3-small", # Default
"high_accuracy": "voyage/voyage-3-large", # Critical queries
"local": "ollama/nomic-embed-text", # Fallback / dev
}
LiteLLM Integration:
from litellm import aembedding
# Via LiteLLM (unified async interface)
response = await aembedding(
model="voyage/voyage-code-3",
input=["def hello(): return 'world'"],
)
vector = response.data[0]["embedding"]
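The ingestion service later in this spike references an EmbeddingService that is not defined here. A minimal sketch of what it could look like, assuming LiteLLM's async aembedding call and the content-type mapping above (the module path, class, and method names are placeholders):
# app/services/knowledge/embedder.py (hypothetical module)
from litellm import aembedding

class EmbeddingService:
    """Thin wrapper around LiteLLM that picks an embedding model per content type."""

    MODELS = {
        "code": "voyage/voyage-code-3",
        "markdown": "text-embedding-3-small",
        "default": "text-embedding-3-small",
    }

    def get_model(self, content_type: str) -> str:
        return self.MODELS.get(content_type, self.MODELS["default"])

    async def embed(self, text: str, content_type: str = "default") -> list[float]:
        response = await aembedding(model=self.get_model(content_type), input=[text])
        return response.data[0]["embedding"]

    async def embed_batch(self, texts: list[str], content_type: str = "default") -> list[list[float]]:
        # One API call for many chunks: cheaper and faster than per-chunk requests
        response = await aembedding(model=self.get_model(content_type), input=texts)
        return [d["embedding"] for d in response.data]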
3. Chunking Strategies
Based on Weaviate's research and Stack Overflow's analysis:
Strategy by Content Type:
| Content Type | Strategy | Chunk Size | Overlap | Notes |
|---|---|---|---|---|
| Code Files | AST-based / Function | Per function/class | None | Preserve semantic units |
| Markdown Docs | Heading-based | Per section | 10% | Respect document structure |
| PDF Specs | Page-level + semantic | 1000 tokens | 15% | NVIDIA recommends page-level |
| Conversations | Turn-based | Per exchange | Context window | Preserve dialogue flow |
| API Docs | Endpoint-based | Per endpoint | None | Group by resource |
Implementation:
# app/services/knowledge/chunkers.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List
import tree_sitter_python as tspython
from tree_sitter import Language, Parser
@dataclass
class Chunk:
content: str
metadata: dict
start_line: int | None = None
end_line: int | None = None
class BaseChunker(ABC):
@abstractmethod
def chunk(self, content: str, metadata: dict) -> List[Chunk]:
pass
class CodeChunker(BaseChunker):
"""AST-based chunking for source code."""
def __init__(self, language: str = "python"):
self.parser = Parser()
if language == "python":
# wrap the raw language pointer from tree_sitter_python in a Language object
self.parser.set_language(Language(tspython.language()))
def chunk(self, content: str, metadata: dict) -> List[Chunk]:
tree = self.parser.parse(bytes(content, "utf8"))
chunks = []
for node in tree.root_node.children:
if node.type in ("function_definition", "class_definition"):
chunk_content = content[node.start_byte:node.end_byte]
chunks.append(Chunk(
content=chunk_content,
metadata={
**metadata,
"type": node.type,
"name": self._get_name(node),
},
start_line=node.start_point[0],
end_line=node.end_point[0],
))
# Handle module-level code
if not chunks:
chunks.append(Chunk(content=content, metadata=metadata))
return chunks
def _get_name(self, node) -> str:
for child in node.children:
if child.type == "identifier":
return child.text.decode("utf8")
return "unknown"
class MarkdownChunker(BaseChunker):
"""Heading-based chunking for markdown."""
def __init__(self, max_tokens: int = 1000, overlap_ratio: float = 0.1):
self.max_tokens = max_tokens
self.overlap_ratio = overlap_ratio
def chunk(self, content: str, metadata: dict) -> List[Chunk]:
import re
# Split by headings
sections = re.split(r'^(#{1,6}\s+.+)$', content, flags=re.MULTILINE)
chunks = []
current_heading = ""
for i, section in enumerate(sections):
if section.startswith('#'):
current_heading = section.strip()
elif section.strip():
chunks.append(Chunk(
content=f"{current_heading}\n\n{section.strip()}",
metadata={
**metadata,
"heading": current_heading,
"section_index": i,
}
))
return self._apply_overlap(chunks)
def _apply_overlap(self, chunks: List[Chunk]) -> List[Chunk]:
# Add overlap between chunks for context
for i in range(1, len(chunks)):
overlap_size = int(len(chunks[i-1].content) * self.overlap_ratio)
overlap_text = chunks[i-1].content[-overlap_size:]
chunks[i].content = f"[Context: ...{overlap_text}]\n\n{chunks[i].content}"
return chunks
class SemanticChunker(BaseChunker):
"""Semantic chunking based on embedding similarity."""
def __init__(self, embedding_model: str = "text-embedding-3-small"):
from litellm import aembedding  # async variant, awaited below
self.embed = aembedding
self.model = embedding_model
self.similarity_threshold = 0.7
async def chunk(self, content: str, metadata: dict) -> List[Chunk]:
import nltk
sentences = nltk.sent_tokenize(content)
# Get embeddings for each sentence
response = await self.embed(model=self.model, input=sentences)
embeddings = [d["embedding"] for d in response.data]
# Group sentences by semantic similarity
chunks = []
current_chunk = [sentences[0]]
current_embedding = embeddings[0]
for i in range(1, len(sentences)):
similarity = self._cosine_similarity(current_embedding, embeddings[i])
if similarity > self.similarity_threshold:
current_chunk.append(sentences[i])
else:
chunks.append(Chunk(
content=" ".join(current_chunk),
metadata=metadata
))
current_chunk = [sentences[i]]
current_embedding = embeddings[i]
if current_chunk:
chunks.append(Chunk(content=" ".join(current_chunk), metadata=metadata))
return chunks
def _cosine_similarity(self, a, b):
import numpy as np
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
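Later sections call a get_chunker() factory that is not shown above. A minimal sketch, under the assumption that it simply maps content types onto the chunker classes defined in this module:
# app/services/knowledge/chunkers.py (continued) - hypothetical factory
def get_chunker(content_type: str) -> BaseChunker:
    """Pick a chunking strategy based on content type."""
    if content_type == "code":
        return CodeChunker(language="python")
    if content_type in ("markdown", "documentation"):
        return MarkdownChunker()
    # Default: heading-based chunking is a reasonable fallback for plain text
    return MarkdownChunker()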
4. Hybrid Search (Semantic + Keyword)
Hybrid search combines lexical keyword matching (BM25 in dedicated engines; ts_rank full-text ranking in stock PostgreSQL) with semantic vector similarity. Based on ParadeDB's research:
Approach: Reciprocal Rank Fusion (RRF)
-- Hybrid search with RRF scoring
WITH semantic_results AS (
SELECT id, content,
1 - (embedding <=> $1::vector) as semantic_score,
ROW_NUMBER() OVER (ORDER BY embedding <=> $1::vector) as semantic_rank
FROM knowledge_chunks
WHERE project_id = $2
ORDER BY embedding <=> $1::vector
LIMIT 20
),
keyword_results AS (
SELECT id, content,
ts_rank(search_vector, plainto_tsquery('english', $3)) as keyword_score,
ROW_NUMBER() OVER (ORDER BY ts_rank(search_vector, plainto_tsquery('english', $3)) DESC) as keyword_rank
FROM knowledge_chunks
WHERE project_id = $2
AND search_vector @@ plainto_tsquery('english', $3)
ORDER BY keyword_score DESC
LIMIT 20
)
SELECT
COALESCE(s.id, k.id) as id,
COALESCE(s.content, k.content) as content,
-- RRF formula: 1/(k + rank) where k=60 is standard
(1.0 / (60 + COALESCE(s.semantic_rank, 1000))) +
(1.0 / (60 + COALESCE(k.keyword_rank, 1000))) as rrf_score
FROM semantic_results s
FULL OUTER JOIN keyword_results k ON s.id = k.id
ORDER BY rrf_score DESC
LIMIT 10;
Implementation with SQLAlchemy:
# app/services/knowledge/search.py
from sqlalchemy import text, func
from sqlalchemy.ext.asyncio import AsyncSession
from pgvector.sqlalchemy import Vector
class HybridSearchService:
def __init__(self, db: AsyncSession):
self.db = db
async def search(
self,
query: str,
query_embedding: list[float],
project_id: str,
agent_id: str | None = None,
limit: int = 10,
semantic_weight: float = 0.5,
) -> list[dict]:
"""
Hybrid search combining semantic and keyword matching.
Args:
query: Natural language query
query_embedding: Pre-computed query embedding
project_id: Project scope
agent_id: Optional agent-specific scope
limit: Max results
semantic_weight: 0-1, weight for semantic vs keyword
"""
keyword_weight = 1 - semantic_weight
sql = text("""
WITH semantic AS (
SELECT id, content, metadata,
1 - (embedding <=> :embedding::vector) as score,
ROW_NUMBER() OVER (ORDER BY embedding <=> :embedding::vector) as rank
FROM knowledge_chunks
WHERE project_id = :project_id
AND (:agent_id IS NULL OR agent_id = :agent_id OR agent_id IS NULL)
ORDER BY embedding <=> :embedding::vector
LIMIT 30
),
keyword AS (
SELECT id, content, metadata,
ts_rank_cd(search_vector, websearch_to_tsquery('english', :query)) as score,
ROW_NUMBER() OVER (
ORDER BY ts_rank_cd(search_vector, websearch_to_tsquery('english', :query)) DESC
) as rank
FROM knowledge_chunks
WHERE project_id = :project_id
AND (:agent_id IS NULL OR agent_id = :agent_id OR agent_id IS NULL)
AND search_vector @@ websearch_to_tsquery('english', :query)
ORDER BY score DESC
LIMIT 30
)
SELECT
COALESCE(s.id, k.id) as id,
COALESCE(s.content, k.content) as content,
COALESCE(s.metadata, k.metadata) as metadata,
(
:semantic_weight * (1.0 / (60 + COALESCE(s.rank, 1000))) +
:keyword_weight * (1.0 / (60 + COALESCE(k.rank, 1000)))
) as combined_score,
s.score as semantic_score,
k.score as keyword_score
FROM semantic s
FULL OUTER JOIN keyword k ON s.id = k.id
ORDER BY combined_score DESC
LIMIT :limit
""")
result = await self.db.execute(sql, {
"embedding": query_embedding,
"query": query,
"project_id": project_id,
"agent_id": agent_id,
"semantic_weight": semantic_weight,
"keyword_weight": keyword_weight,
"limit": limit,
})
return [dict(row._mapping) for row in result.fetchall()]
5. Multi-Tenant Vector Collections
Based on Timescale's research on multi-tenant RAG:
Recommended Pattern: Shared Table with Tenant ID
For Syndarix, use a shared table with project_id and agent_id columns:
# app/models/knowledge.py
import uuid
from sqlalchemy import Column, String, Text, DateTime, ForeignKey, Index, func
from sqlalchemy.dialects.postgresql import UUID, JSONB, TSVECTOR
from pgvector.sqlalchemy import Vector
from app.db.base import Base
class KnowledgeChunk(Base):
__tablename__ = "knowledge_chunks"
id = Column(UUID, primary_key=True, default=uuid.uuid4)
# Multi-tenant isolation
project_id = Column(UUID, ForeignKey("projects.id"), nullable=False, index=True)
agent_id = Column(UUID, ForeignKey("agent_instances.id"), nullable=True, index=True)
# Content
content = Column(Text, nullable=False)
content_type = Column(String(50), nullable=False) # code, markdown, pdf, etc.
# Source tracking
source_uri = Column(String(512)) # file path, URL, etc.
source_type = Column(String(50)) # file, url, conversation, etc.
# Vector embedding
embedding = Column(Vector(1536)) # Dimension depends on model
embedding_model = Column(String(100))
# Full-text search
search_vector = Column(TSVECTOR)
# Metadata
# "metadata" is reserved on SQLAlchemy declarative classes, so map the attribute to the column name
metadata_ = Column("metadata", JSONB, default=dict)
# Timestamps
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, onupdate=func.now())
__table_args__ = (
# HNSW index for vector similarity (per-project partitioning)
Index(
'ix_knowledge_chunks_embedding_hnsw',
'embedding',
postgresql_using='hnsw',
postgresql_with={'m': 16, 'ef_construction': 64},
postgresql_ops={'embedding': 'vector_cosine_ops'}
),
# GIN index for full-text search
Index(
'ix_knowledge_chunks_search_vector',
'search_vector',
postgresql_using='gin'
),
# Composite index for tenant isolation
Index('ix_knowledge_chunks_project_agent', 'project_id', 'agent_id'),
)
class KnowledgeCollection(Base):
"""Groups of chunks for organizing knowledge."""
__tablename__ = "knowledge_collections"
id = Column(UUID, primary_key=True, default=uuid.uuid4)
project_id = Column(UUID, ForeignKey("projects.id"), nullable=False)
name = Column(String(100), nullable=False)
description = Column(Text)
collection_type = Column(String(50)) # codebase, documentation, specs, etc.
# Configuration
chunking_strategy = Column(String(50), default="auto")
embedding_model = Column(String(100), default="text-embedding-3-small")
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, onupdate=func.now())
class ChunkCollectionAssociation(Base):
"""Many-to-many: chunks can belong to multiple collections."""
__tablename__ = "chunk_collection_associations"
chunk_id = Column(UUID, ForeignKey("knowledge_chunks.id"), primary_key=True)
collection_id = Column(UUID, ForeignKey("knowledge_collections.id"), primary_key=True)
Row-Level Security (Optional but Recommended):
-- Enable RLS on knowledge_chunks
ALTER TABLE knowledge_chunks ENABLE ROW LEVEL SECURITY;
-- Policy: Users can only access chunks from their projects
CREATE POLICY knowledge_chunk_project_isolation ON knowledge_chunks
USING (project_id IN (
SELECT project_id FROM project_members
WHERE user_id = current_setting('app.current_user_id')::uuid
));
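For the policy above to apply, the application must set app.current_user_id on each transaction. A minimal sketch with SQLAlchemy, assuming this runs in a per-request dependency before any knowledge queries (the function name is illustrative):
# Hypothetical per-request setup so the RLS policy sees the current user
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

async def set_rls_user(db: AsyncSession, user_id: str) -> None:
    # set_config(..., true) scopes the setting to the current transaction only
    await db.execute(
        text("SELECT set_config('app.current_user_id', :uid, true)"),
        {"uid": user_id},
    )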
6. Indexing Strategies for Large Codebases
HNSW vs IVFFlat Selection:
| Factor | HNSW | IVFFlat |
|---|---|---|
| Query speed | Faster | Slower |
| Build time | Slower | Faster |
| Memory | Higher | Lower |
| Accuracy | Higher | Lower |
| Use when | <10M vectors, high recall needed | >10M vectors, memory constrained |
HNSW Parameter Guidelines:
-- Small collections (<100K vectors)
CREATE INDEX ON knowledge_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Medium collections (100K-1M vectors)
CREATE INDEX ON knowledge_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 24, ef_construction = 100);
-- Large collections (1M-10M vectors)
CREATE INDEX ON knowledge_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 32, ef_construction = 128);
-- Query-time tuning
SET hnsw.ef_search = 100; -- Higher = better recall, slower
Partial Indexes for Multi-Tenant:
-- Create partial indexes per high-traffic project
CREATE INDEX ON knowledge_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64)
WHERE project_id = 'frequently-queried-project-id';
Build Performance:
-- Speed up index builds
SET maintenance_work_mem = '2GB'; -- Ensure graph fits in memory
SET max_parallel_maintenance_workers = 7; -- Parallel building
7. Real-Time vs Batch Embedding Updates
Recommendation: Hybrid Approach
| Scenario | Strategy | Why |
|---|---|---|
| New file added | Real-time | Immediate availability |
| Bulk import | Batch (Celery) | Avoid blocking |
| File modified | Debounced real-time | Avoid churning |
| Conversation | Real-time | Context needed now |
| Codebase sync | Scheduled batch | Efficient |
Implementation:
# app/services/knowledge/ingestion.py
import logging
import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.celery import celery_app
from app.models.knowledge import KnowledgeChunk
from app.services.knowledge.embedder import EmbeddingService
from app.services.knowledge.chunkers import get_chunker

logger = logging.getLogger(__name__)
class KnowledgeIngestionService:
def __init__(self, db: AsyncSession):
self.db = db
self.embedder = EmbeddingService()
async def ingest_realtime(
self,
project_id: str,
content: str,
content_type: str,
source_uri: str,
agent_id: str | None = None,
) -> list[str]:
"""Real-time ingestion for immediate availability."""
chunker = get_chunker(content_type)
chunks = chunker.chunk(content, {"source_uri": source_uri})
# Embed and store
chunk_ids = []
for chunk in chunks:
embedding = await self.embedder.embed(chunk.content, content_type)
db_chunk = KnowledgeChunk(
id=uuid.uuid4(),  # assign client-side so the id is known before flush
project_id=project_id,
agent_id=agent_id,
content=chunk.content,
content_type=content_type,
source_uri=source_uri,
embedding=embedding,
embedding_model=self.embedder.get_model(content_type),
metadata_=chunk.metadata,
)
self.db.add(db_chunk)
chunk_ids.append(str(db_chunk.id))
await self.db.commit()
return chunk_ids
def schedule_batch_ingestion(
self,
project_id: str,
files: list[dict], # [{path, content_type}]
) -> str:
"""Schedule batch ingestion via Celery."""
task = batch_ingest_files.delay(project_id, files)
return task.id
@celery_app.task(bind=True, max_retries=3)
def batch_ingest_files(self, project_id: str, files: list[dict]):
"""Celery task for batch file ingestion."""
from app.core.database import get_sync_session
with get_sync_session() as db:
ingestion = KnowledgeIngestionService(db)
for file in files:
try:
# Read file content
with open(file["path"], "r") as f:
content = f.read()
# Process (sync version)
ingestion.ingest_sync(
project_id=project_id,
content=content,
content_type=file["content_type"],
source_uri=file["path"],
)
except Exception as e:
# Log and continue, don't fail entire batch
logger.error(f"Failed to ingest {file['path']}: {e}")
db.commit()
Debounced Updates:
# app/services/knowledge/watcher.py
import asyncio
from collections import defaultdict
class KnowledgeUpdateDebouncer:
"""Debounce rapid file changes to avoid excessive re-embedding."""
def __init__(self, delay_seconds: float = 2.0):
self.delay = delay_seconds
self.pending: dict[str, asyncio.Task] = {}
async def schedule_update(
self,
file_path: str,
update_callback: callable,
):
"""Schedule an update, canceling any pending update for the same file."""
# Cancel existing pending update
if file_path in self.pending:
self.pending[file_path].cancel()
# Schedule new update
self.pending[file_path] = asyncio.create_task(
self._delayed_update(file_path, update_callback)
)
async def _delayed_update(self, file_path: str, callback: callable):
await asyncio.sleep(self.delay)
await callback(file_path)
del self.pending[file_path]
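Wiring the debouncer to a file-change event might look like the following sketch (the watcher callback and project plumbing are illustrative):
# Hypothetical wiring: re-embed a file a couple of seconds after it stops changing
debouncer = KnowledgeUpdateDebouncer(delay_seconds=2.0)

async def on_file_modified(path: str, ingestion: KnowledgeIngestionService, project_id: str):
    async def reingest(file_path: str):
        with open(file_path) as f:
            content = f.read()
        await ingestion.ingest_realtime(
            project_id=project_id,
            content=content,
            content_type="code",
            source_uri=file_path,
        )
    # Cancels any pending update for the same path, then runs after the delay
    await debouncer.schedule_update(path, reingest)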
Schema Design
Database Schema
-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm; -- For fuzzy text matching
-- Knowledge chunks table
CREATE TABLE knowledge_chunks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-- Multi-tenant isolation
project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
agent_id UUID REFERENCES agent_instances(id) ON DELETE SET NULL,
collection_id UUID REFERENCES knowledge_collections(id) ON DELETE SET NULL,
-- Content
content TEXT NOT NULL,
content_type VARCHAR(50) NOT NULL,
-- Source tracking
source_uri VARCHAR(512),
source_type VARCHAR(50),
source_hash VARCHAR(64), -- For detecting changes
-- Vector embedding (1536 for text-embedding-3-small)
embedding vector(1536),
embedding_model VARCHAR(100),
-- Full-text search
search_vector tsvector GENERATED ALWAYS AS (
setweight(to_tsvector('english', coalesce(content, '')), 'A')
) STORED,
-- Metadata
metadata JSONB DEFAULT '{}',
-- Timestamps
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Indexes
CREATE INDEX ix_knowledge_chunks_project ON knowledge_chunks(project_id);
CREATE INDEX ix_knowledge_chunks_agent ON knowledge_chunks(agent_id);
CREATE INDEX ix_knowledge_chunks_collection ON knowledge_chunks(collection_id);
CREATE INDEX ix_knowledge_chunks_source_hash ON knowledge_chunks(source_hash);
-- HNSW vector index
CREATE INDEX ix_knowledge_chunks_embedding ON knowledge_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- GIN index for full-text search
CREATE INDEX ix_knowledge_chunks_fts ON knowledge_chunks USING gin(search_vector);
-- Knowledge collections
CREATE TABLE knowledge_collections (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
name VARCHAR(100) NOT NULL,
description TEXT,
collection_type VARCHAR(50),
chunking_strategy VARCHAR(50) DEFAULT 'auto',
embedding_model VARCHAR(100) DEFAULT 'text-embedding-3-small',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(project_id, name)
);
-- Trigger for updated_at
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER knowledge_chunks_updated_at
BEFORE UPDATE ON knowledge_chunks
FOR EACH ROW
EXECUTE FUNCTION update_updated_at();
Alembic Migration
# alembic/versions/xxxx_add_knowledge_base.py
"""Add knowledge base tables for RAG
Revision ID: xxxx
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql  # binds sa.dialects.postgresql used below
from pgvector.sqlalchemy import Vector
def upgrade():
# Enable extensions
op.execute("CREATE EXTENSION IF NOT EXISTS vector")
op.execute("CREATE EXTENSION IF NOT EXISTS pg_trgm")
# Create knowledge_collections table
op.create_table(
'knowledge_collections',
sa.Column('id', sa.dialects.postgresql.UUID(), primary_key=True),
sa.Column('project_id', sa.dialects.postgresql.UUID(), sa.ForeignKey('projects.id'), nullable=False),
sa.Column('name', sa.String(100), nullable=False),
sa.Column('description', sa.Text()),
sa.Column('collection_type', sa.String(50)),
sa.Column('chunking_strategy', sa.String(50), default='auto'),
sa.Column('embedding_model', sa.String(100), default='text-embedding-3-small'),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
sa.Column('updated_at', sa.DateTime(timezone=True)),
sa.UniqueConstraint('project_id', 'name', name='uq_knowledge_collections_project_name'),
)
# Create knowledge_chunks table
op.create_table(
'knowledge_chunks',
sa.Column('id', sa.dialects.postgresql.UUID(), primary_key=True),
sa.Column('project_id', sa.dialects.postgresql.UUID(), sa.ForeignKey('projects.id', ondelete='CASCADE'), nullable=False),
sa.Column('agent_id', sa.dialects.postgresql.UUID(), sa.ForeignKey('agent_instances.id', ondelete='SET NULL')),
sa.Column('collection_id', sa.dialects.postgresql.UUID(), sa.ForeignKey('knowledge_collections.id', ondelete='SET NULL')),
sa.Column('content', sa.Text(), nullable=False),
sa.Column('content_type', sa.String(50), nullable=False),
sa.Column('source_uri', sa.String(512)),
sa.Column('source_type', sa.String(50)),
sa.Column('source_hash', sa.String(64)),
sa.Column('embedding', Vector(1536)),
sa.Column('embedding_model', sa.String(100)),
sa.Column('metadata', sa.dialects.postgresql.JSONB(), default={}),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
sa.Column('updated_at', sa.DateTime(timezone=True)),
)
# Create indexes
op.create_index('ix_knowledge_chunks_project', 'knowledge_chunks', ['project_id'])
op.create_index('ix_knowledge_chunks_agent', 'knowledge_chunks', ['agent_id'])
op.create_index('ix_knowledge_chunks_collection', 'knowledge_chunks', ['collection_id'])
op.create_index('ix_knowledge_chunks_source_hash', 'knowledge_chunks', ['source_hash'])
# Create HNSW vector index
op.execute("""
CREATE INDEX ix_knowledge_chunks_embedding ON knowledge_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64)
""")
# Add full-text search column and index
op.execute("""
ALTER TABLE knowledge_chunks
ADD COLUMN search_vector tsvector
GENERATED ALWAYS AS (
setweight(to_tsvector('english', coalesce(content, '')), 'A')
) STORED
""")
op.execute("CREATE INDEX ix_knowledge_chunks_fts ON knowledge_chunks USING gin(search_vector)")
def downgrade():
op.drop_table('knowledge_chunks')
op.drop_table('knowledge_collections')
Complete Service Implementation
# app/services/knowledge/service.py
import uuid
from dataclasses import dataclass
from typing import Optional
from sqlalchemy import select, delete
from sqlalchemy.ext.asyncio import AsyncSession
from litellm import aembedding as litellm_embedding
from app.models.knowledge import KnowledgeChunk, KnowledgeCollection
from app.services.knowledge.chunkers import get_chunker
from app.services.knowledge.search import HybridSearchService
@dataclass
class SearchResult:
id: str
content: str
metadata: dict
score: float
source_uri: Optional[str] = None
class KnowledgeBaseService:
"""
Main service for knowledge base operations.
Integrates with LiteLLM for embeddings.
"""
# Model selection by content type
EMBEDDING_MODELS = {
"code": "voyage/voyage-code-3",
"markdown": "text-embedding-3-small",
"pdf": "text-embedding-3-small",
"conversation": "text-embedding-3-small",
"default": "text-embedding-3-small",
}
def __init__(self, db: AsyncSession):
self.db = db
self.search_service = HybridSearchService(db)
async def create_collection(
self,
project_id: str,
name: str,
collection_type: str,
description: str = "",
chunking_strategy: str = "auto",
) -> KnowledgeCollection:
"""Create a new knowledge collection for a project."""
collection = KnowledgeCollection(
project_id=project_id,
name=name,
description=description,
collection_type=collection_type,
chunking_strategy=chunking_strategy,
embedding_model=self.EMBEDDING_MODELS.get(collection_type, "text-embedding-3-small"),
)
self.db.add(collection)
await self.db.commit()
await self.db.refresh(collection)
return collection
async def ingest(
self,
project_id: str,
content: str,
content_type: str,
source_uri: str,
agent_id: Optional[str] = None,
collection_id: Optional[str] = None,
metadata: Optional[dict] = None,
) -> list[str]:
"""
Ingest content into the knowledge base.
Automatically chunks and embeds the content.
"""
import hashlib
# Check for existing content by hash
source_hash = hashlib.sha256(content.encode()).hexdigest()
existing = await self.db.execute(
select(KnowledgeChunk.id).where(
KnowledgeChunk.project_id == project_id,
KnowledgeChunk.source_hash == source_hash,
).limit(1)  # many chunks share one source_hash; a single match is enough
)
if existing.scalar_one_or_none():
# Content unchanged, skip
return []
# Get appropriate chunker
chunker = get_chunker(content_type)
chunks = chunker.chunk(content, metadata or {})
# Get embedding model
model = self.EMBEDDING_MODELS.get(content_type, "text-embedding-3-small")
# Embed all chunks in batch
chunk_texts = [c.content for c in chunks]
embeddings = await self._embed_batch(chunk_texts, model)
# Store chunks
chunk_ids = []
for chunk, emb in zip(chunks, embeddings):
db_chunk = KnowledgeChunk(
id=uuid.uuid4(),  # assign client-side so the id is known before flush/commit
project_id=project_id,
agent_id=agent_id,
collection_id=collection_id,
content=chunk.content,
content_type=content_type,
source_uri=source_uri,
source_type=self._infer_source_type(source_uri),
source_hash=source_hash,
embedding=emb,
embedding_model=model,
metadata_={
**chunk.metadata,
**(metadata or {}),
},
)
self.db.add(db_chunk)
chunk_ids.append(str(db_chunk.id))
await self.db.commit()
return chunk_ids
async def search(
self,
project_id: str,
query: str,
agent_id: Optional[str] = None,
collection_id: Optional[str] = None,
limit: int = 10,
content_types: Optional[list[str]] = None,
semantic_weight: float = 0.6,
) -> list[SearchResult]:
"""
Search the knowledge base using hybrid search.
Args:
project_id: Project scope
query: Natural language query
agent_id: Optional agent-specific scope
collection_id: Optional collection scope
limit: Max results
content_types: Filter by content types
semantic_weight: 0-1, weight for semantic vs keyword
"""
# Get query embedding
query_embedding = await self._embed_query(query)
# Perform hybrid search
results = await self.search_service.search(
query=query,
query_embedding=query_embedding,
project_id=project_id,
agent_id=agent_id,
limit=limit,
semantic_weight=semantic_weight,
)
return [
SearchResult(
id=r["id"],
content=r["content"],
metadata=r.get("metadata", {}),
score=r["combined_score"],
source_uri=r.get("source_uri"),
)
for r in results
]
async def delete_by_source(
self,
project_id: str,
source_uri: str,
) -> int:
"""Delete all chunks from a specific source."""
result = await self.db.execute(
delete(KnowledgeChunk).where(
KnowledgeChunk.project_id == project_id,
KnowledgeChunk.source_uri == source_uri,
)
)
await self.db.commit()
return result.rowcount
async def _embed_batch(
self,
texts: list[str],
model: str,
) -> list[list[float]]:
"""Embed multiple texts in a single API call."""
response = await litellm_embedding(
model=model,
input=texts,
)
return [d["embedding"] for d in response.data]
async def _embed_query(self, query: str) -> list[float]:
"""Embed a query string."""
response = await litellm_embedding(
model="text-embedding-3-small",
input=[query],
)
return response.data[0]["embedding"]
def _infer_source_type(self, source_uri: str) -> str:
"""Infer source type from URI."""
if source_uri.startswith("http"):
return "url"
if source_uri.startswith("conversation:"):
return "conversation"
return "file"
Performance Considerations
Query Latency Targets
| Vector Count | Target Latency | Recommended Config |
|---|---|---|
| <100K | <20ms | Default HNSW |
| 100K-1M | <50ms | m=24, ef_construction=100 |
| 1M-10M | <100ms | m=32, ef_construction=128, ef_search=100 |
Memory Requirements
HNSW memory ≈ vectors × dimensions × 4 bytes × (1 + m/8)
Example: 1M vectors × 1536 dims × 4 bytes × (1 + 16/8) ≈ 18.4 GB
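A quick way to sanity-check the rule of thumb above (the formula is an approximation, not an exact pgvector accounting):
def hnsw_memory_gib(vectors: int, dims: int, m: int = 16) -> float:
    """Rough HNSW memory estimate in GiB using the approximation above."""
    bytes_total = vectors * dims * 4 * (1 + m / 8)
    return bytes_total / (1024 ** 3)

# 1M vectors x 1536 dims with m=16 -> roughly 17 GiB (about 18.4 GB)
print(round(hnsw_memory_gib(1_000_000, 1536), 1))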
Batch Embedding Costs (assuming ~100 tokens per chunk)
| Model | 1K chunks | 10K chunks | 100K chunks |
|---|---|---|---|
| text-embedding-3-small | $0.002 | $0.02 | $0.20 |
| voyage-code-3 | $0.006 | $0.06 | $0.60 |
| Local (nomic-embed) | $0 | $0 | $0 |
Optimization Tips
- Use batch embedding - Single API call for multiple chunks
- Cache query embeddings - Same queries return same vectors (see the sketch after this list)
- Partial indexes - Create per-project indexes for high-traffic projects
- Dimension reduction - Use 512-dim with text-embedding-3-small for cost savings
- Connection pooling - Use pgBouncer for high-concurrency scenarios
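A minimal sketch of query-embedding caching, assuming an in-process dict keyed by model and query hash; a shared cache such as Redis would follow the same pattern (class and parameter names are illustrative):
import hashlib

class CachedQueryEmbedder:
    """Caches query embeddings so repeated searches skip the embedding API call."""

    def __init__(self, embed_fn, model: str = "text-embedding-3-small", max_entries: int = 10_000):
        self._embed_fn = embed_fn          # async callable: (model=..., input=[...]) -> response
        self._model = model
        self._max_entries = max_entries
        self._cache: dict[str, list[float]] = {}

    async def embed_query(self, query: str) -> list[float]:
        key = hashlib.sha256(f"{self._model}:{query}".encode()).hexdigest()
        if key in self._cache:
            return self._cache[key]
        response = await self._embed_fn(model=self._model, input=[query])
        vector = response.data[0]["embedding"]
        if len(self._cache) >= self._max_entries:
            self._cache.pop(next(iter(self._cache)))  # naive eviction: drop oldest entry
        self._cache[key] = vector
        return vector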
Integration with Syndarix Agents
Agent Context Retrieval
# app/services/agent/context.py
class AgentContextBuilder:
"""Builds context for agent prompts using RAG."""
def __init__(self, kb_service: KnowledgeBaseService):
self.kb = kb_service
async def build_context(
self,
agent_id: str,
project_id: str,
task_description: str,
max_context_tokens: int = 4000,
) -> str:
"""
Build relevant context for an agent task.
Returns formatted context string for inclusion in prompt.
"""
# Search for relevant knowledge
results = await self.kb.search(
project_id=project_id,
query=task_description,
agent_id=agent_id, # Prefer agent-specific knowledge
limit=10,
semantic_weight=0.7,
)
# Format context
context_parts = []
current_tokens = 0
for result in results:
chunk_tokens = self._count_tokens(result.content)
if current_tokens + chunk_tokens > max_context_tokens:
break
context_parts.append(f"""
### Source: {result.source_uri or 'Unknown'}
{result.content}
""")
current_tokens += chunk_tokens
if not context_parts:
return ""
return f"""
## Relevant Context
The following information was retrieved from the project knowledge base:
{"".join(context_parts)}
---
"""
def _count_tokens(self, text: str) -> int:
"""Approximate token count."""
return len(text) // 4 # Rough estimate
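The four-characters-per-token heuristic is coarse; where tighter context budgeting matters, a tokenizer-based count could be swapped in (tiktoken shown as one option):
# Optional: more accurate token counting with tiktoken (OpenAI tokenizer)
import tiktoken

_encoding = tiktoken.get_encoding("cl100k_base")

def count_tokens(text: str) -> int:
    return len(_encoding.encode(text))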
MCP Tool for Knowledge Access
# app/mcp/tools/knowledge.py
from mcp import Tool, ToolResult
class KnowledgeSearchTool(Tool):
"""MCP tool for agents to search project knowledge."""
name = "search_knowledge"
description = "Search the project knowledge base for relevant information"
parameters = {
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "The project ID to search within"
},
"query": {
"type": "string",
"description": "Natural language search query"
},
"content_types": {
"type": "array",
"items": {"type": "string"},
"description": "Filter by content types (code, markdown, pdf)"
},
"limit": {
"type": "integer",
"default": 5,
"description": "Maximum results to return"
}
},
"required": ["project_id", "query"]
}
async def execute(self, **params) -> ToolResult:
results = await self.kb_service.search(
project_id=params["project_id"],
query=params["query"],
content_types=params.get("content_types"),
limit=params.get("limit", 5),
)
return ToolResult(
content=[
{
"source": r.source_uri,
"content": r.content[:500] + "..." if len(r.content) > 500 else r.content,
"relevance_score": r.score,
}
for r in results
]
)
References
Vector Databases
- Best Vector Databases 2025 - Firecrawl
- pgvector vs Qdrant Comparison - MyScale
- Multi-Tenancy in Vector Databases - Pinecone
Embedding Models
- Best Embedding Models 2025 - Elephas
- 6 Best Code Embedding Models - Modal
- LiteLLM Embedding Documentation
Chunking & RAG
- Chunking Strategies for RAG - Weaviate
- Breaking Up is Hard to Do - Stack Overflow
- Best Chunking Strategies 2025 - Firecrawl
Hybrid Search
- Hybrid Search in PostgreSQL - ParadeDB
- Hybrid Search with pgvector - Jonathan Katz
- Stop the Hallucinations - Cloudurable
Multi-Tenant RAG
- Multi-Tenant RAG with PostgreSQL - Timescale (cited in Section 5)
pgvector
- pgvector - GitHub repository (HNSW / IVFFlat index documentation)
Decision
Adopt pgvector with hybrid search as the knowledge base solution for Syndarix RAG:
- pgvector for vector storage and similarity search
- PostgreSQL full-text search (tsvector) for keyword matching
- Reciprocal Rank Fusion (RRF) for combining results
- LiteLLM for unified embedding API
- Content-type-aware chunking with AST parsing for code
- Shared table with tenant isolation via project_id/agent_id
Migration Path: If any project exceeds 10M vectors or requires sub-10ms latency, evaluate Qdrant as a dedicated vector store while keeping metadata in PostgreSQL.
Spike completed. Findings will inform ADR-006: Knowledge Base Architecture.