syndarix/mcp-servers/knowledge-base/chunking/code.py
Felipe Cardoso d0fc7f37ff feat(knowledge-base): implement Knowledge Base MCP Server (#57)
Implements RAG capabilities with pgvector for semantic search:

- Intelligent chunking strategies (code-aware, markdown-aware, text)
- Semantic search with vector similarity (HNSW index)
- Keyword search with PostgreSQL full-text search
- Hybrid search using Reciprocal Rank Fusion (RRF); a sketch of the fusion step follows this list
- Redis caching for embeddings
- Collection management (ingest, search, delete, stats)
- FastMCP tools: search_knowledge, ingest_content, delete_content,
  list_collections, get_collection_stats, update_document
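
As a rough illustration of the fusion step named above, here is a minimal RRF sketch; the function name and the `k` constant are assumptions for illustration, not this server's actual API:

```python
def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Fuse several ranked lists of document ids via Reciprocal Rank Fusion."""
    scores: dict[str, float] = {}
    for ranked in rankings:
        for rank, doc_id in enumerate(ranked, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first
    return sorted(scores, key=scores.__getitem__, reverse=True)
```

Documents ranked highly by either the semantic or the keyword ranking rise to the top without any score normalization between the two systems.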

Testing:
- 128 comprehensive tests covering all components
- 58% code coverage (database integration tests use mocks)
- Passes ruff linting and mypy type checking

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-03 21:33:26 +01:00


"""
Code-aware chunking implementation.
Provides intelligent chunking for source code that respects
function/class boundaries and preserves context.
"""
import logging
import re
from typing import Any

from chunking.base import BaseChunker
from config import Settings
from models import Chunk, ChunkType, FileType

logger = logging.getLogger(__name__)

# Language-specific patterns for detecting function/class definitions
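# These are lightweight heuristics (regexes, not parsers). For example, the
# Python "function" pattern matches both "def parse(data):" and an indented
# "    async def fetch(url):", capturing any leading whitespace.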
LANGUAGE_PATTERNS: dict[FileType, dict[str, re.Pattern[str]]] = {
    FileType.PYTHON: {
        "function": re.compile(r"^(\s*)(async\s+)?def\s+\w+", re.MULTILINE),
        "class": re.compile(r"^(\s*)class\s+\w+", re.MULTILINE),
        "decorator": re.compile(r"^(\s*)@\w+", re.MULTILINE),
    },
    FileType.JAVASCRIPT: {
        "function": re.compile(
            r"^(\s*)(export\s+)?(async\s+)?function\s+\w+|"
            r"^(\s*)(export\s+)?(const|let|var)\s+\w+\s*=\s*(async\s+)?\(",
            re.MULTILINE,
        ),
        "class": re.compile(r"^(\s*)(export\s+)?class\s+\w+", re.MULTILINE),
        "arrow": re.compile(
            r"^(\s*)(export\s+)?(const|let|var)\s+\w+\s*=\s*(async\s+)?(\([^)]*\)|[^=])\s*=>",
            re.MULTILINE,
        ),
    },
    FileType.TYPESCRIPT: {
        "function": re.compile(
            r"^(\s*)(export\s+)?(async\s+)?function\s+\w+|"
            r"^(\s*)(export\s+)?(const|let|var)\s+\w+\s*[:<]",
            re.MULTILINE,
        ),
        "class": re.compile(r"^(\s*)(export\s+)?class\s+\w+", re.MULTILINE),
        "interface": re.compile(r"^(\s*)(export\s+)?interface\s+\w+", re.MULTILINE),
        "type": re.compile(r"^(\s*)(export\s+)?type\s+\w+", re.MULTILINE),
    },
    FileType.GO: {
        "function": re.compile(r"^func\s+(\([^)]+\)\s+)?\w+", re.MULTILINE),
        "struct": re.compile(r"^type\s+\w+\s+struct", re.MULTILINE),
        "interface": re.compile(r"^type\s+\w+\s+interface", re.MULTILINE),
    },
    FileType.RUST: {
        "function": re.compile(r"^(\s*)(pub\s+)?(async\s+)?fn\s+\w+", re.MULTILINE),
        "struct": re.compile(r"^(\s*)(pub\s+)?struct\s+\w+", re.MULTILINE),
        "impl": re.compile(r"^(\s*)impl\s+", re.MULTILINE),
        "trait": re.compile(r"^(\s*)(pub\s+)?trait\s+\w+", re.MULTILINE),
    },
    FileType.JAVA: {
        "method": re.compile(
            r"^(\s*)(public|private|protected)?\s*(static)?\s*\w+\s+\w+\s*\(",
            re.MULTILINE,
        ),
        "class": re.compile(
            r"^(\s*)(public|private|protected)?\s*(abstract)?\s*class\s+\w+",
            re.MULTILINE,
        ),
        "interface": re.compile(
            r"^(\s*)(public|private|protected)?\s*interface\s+\w+",
            re.MULTILINE,
        ),
    },
}


class CodeChunker(BaseChunker):
    """
    Code-aware chunker that respects logical boundaries.

    Features:
    - Detects function/class boundaries
    - Preserves decorator/annotation context
    - Handles nested structures
    - Falls back to line-based chunking when needed
    """

    def __init__(
        self,
        chunk_size: int,
        chunk_overlap: int,
        settings: Settings | None = None,
    ) -> None:
        """Initialize code chunker."""
        super().__init__(chunk_size, chunk_overlap, settings)

    @property
    def chunk_type(self) -> ChunkType:
        """Get chunk type."""
        return ChunkType.CODE

    def chunk(
        self,
        content: str,
        source_path: str | None = None,
        file_type: FileType | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> list[Chunk]:
        """
        Chunk code content.

        Tries to respect function/class boundaries, falling back
        to line-based chunking if needed.
        """
        if not content.strip():
            return []
        metadata = metadata or {}
        lines = content.splitlines(keepends=True)

        # Try language-aware chunking if we have patterns
        if file_type and file_type in LANGUAGE_PATTERNS:
            chunks = self._chunk_by_structure(
                content, lines, file_type, source_path, metadata
            )
            if chunks:
                return chunks

        # Fall back to line-based chunking
        return self._chunk_by_lines(lines, source_path, file_type, metadata)

    def _chunk_by_structure(
        self,
        content: str,
        lines: list[str],
        file_type: FileType,
        source_path: str | None,
        metadata: dict[str, Any],
    ) -> list[Chunk]:
        """
        Chunk by detecting code structure (functions, classes).

        Returns empty list if structure detection isn't useful.
        """
        patterns = LANGUAGE_PATTERNS.get(file_type, {})
        if not patterns:
            return []

        # Find all structure boundaries
        boundaries: list[tuple[int, str]] = []  # (line_number, type)
        for struct_type, pattern in patterns.items():
            for match in pattern.finditer(content):
                # Convert character position to line number
                line_num = content[: match.start()].count("\n")
                boundaries.append((line_num, struct_type))

        if not boundaries:
            return []

        # Sort boundaries by line number
        boundaries.sort(key=lambda x: x[0])

        # If we have very few boundaries, line-based might be better
        if len(boundaries) < 3 and len(lines) > 50:
            return []

        # Create chunks based on boundaries
        chunks: list[Chunk] = []
        current_start = 0

        for line_num, struct_type in boundaries:
            # Pull any preceding comments/decorators into the upcoming
            # structure's chunk rather than the previous one
            actual_start = self._find_context_start(lines, line_num)
            if actual_start < current_start:
                actual_start = current_start

            # Check if we need to create a chunk before this boundary
            if actual_start > current_start:
                chunk_lines = lines[current_start:actual_start]
                chunk_content = "".join(chunk_lines)
                if chunk_content.strip():
                    token_count = self.count_tokens(chunk_content)
                    # If chunk is too large, split it
                    if token_count > self.chunk_size * 1.5:
                        sub_chunks = self._split_large_chunk(
                            chunk_lines, current_start, source_path, file_type, metadata
                        )
                        chunks.extend(sub_chunks)
                    elif token_count > 0:
                        chunks.append(
                            self._create_chunk(
                                content=chunk_content.rstrip(),
                                source_path=source_path,
                                start_line=current_start + 1,
                                end_line=actual_start,
                                file_type=file_type,
                                metadata={**metadata, "structure_type": struct_type},
                            )
                        )
                current_start = actual_start

        # Handle remaining content
        if current_start < len(lines):
            chunk_lines = lines[current_start:]
            chunk_content = "".join(chunk_lines)
            if chunk_content.strip():
                token_count = self.count_tokens(chunk_content)
                if token_count > self.chunk_size * 1.5:
                    sub_chunks = self._split_large_chunk(
                        chunk_lines, current_start, source_path, file_type, metadata
                    )
                    chunks.extend(sub_chunks)
                else:
                    chunks.append(
                        self._create_chunk(
                            content=chunk_content.rstrip(),
                            source_path=source_path,
                            start_line=current_start + 1,
                            end_line=len(lines),
                            file_type=file_type,
                            metadata=metadata,
                        )
                    )

        return chunks

    def _find_context_start(self, lines: list[str], line_num: int) -> int:
        """Find the start of context (decorators, comments) before a line."""
        start = line_num
        # Look backwards for decorators/comments
        for i in range(line_num - 1, max(0, line_num - 10), -1):
            line = lines[i].strip()
            if not line:
                continue
            if line.startswith(("#", "//", "/*", "*", "@", "'")):
                start = i
            else:
                break
        return start

    def _split_large_chunk(
        self,
        chunk_lines: list[str],
        base_line: int,
        source_path: str | None,
        file_type: FileType | None,
        metadata: dict[str, Any],
    ) -> list[Chunk]:
        """Split a large chunk into smaller pieces with overlap."""
        chunks: list[Chunk] = []
        current_lines: list[str] = []
        current_tokens = 0
        chunk_start = 0

        for i, line in enumerate(chunk_lines):
            line_tokens = self.count_tokens(line)
            if current_tokens + line_tokens > self.chunk_size and current_lines:
                # Create chunk
                chunk_content = "".join(current_lines).rstrip()
                chunks.append(
                    self._create_chunk(
                        content=chunk_content,
                        source_path=source_path,
                        start_line=base_line + chunk_start + 1,
                        end_line=base_line + i,
                        file_type=file_type,
                        metadata=metadata,
                    )
                )
                # Calculate overlap
                overlap_tokens = 0
                overlap_lines: list[str] = []
                for j in range(len(current_lines) - 1, -1, -1):
                    overlap_tokens += self.count_tokens(current_lines[j])
                    if overlap_tokens >= self.chunk_overlap:
                        overlap_lines = current_lines[j:]
                        break
                current_lines = overlap_lines
                current_tokens = sum(self.count_tokens(line) for line in current_lines)
                chunk_start = i - len(overlap_lines)
            current_lines.append(line)
            current_tokens += line_tokens

        # Final chunk
        if current_lines:
            chunk_content = "".join(current_lines).rstrip()
            if chunk_content.strip():
                chunks.append(
                    self._create_chunk(
                        content=chunk_content,
                        source_path=source_path,
                        start_line=base_line + chunk_start + 1,
                        end_line=base_line + len(chunk_lines),
                        file_type=file_type,
                        metadata=metadata,
                    )
                )

        return chunks

    def _chunk_by_lines(
        self,
        lines: list[str],
        source_path: str | None,
        file_type: FileType | None,
        metadata: dict[str, Any],
    ) -> list[Chunk]:
        """Chunk by lines with overlap."""
        chunks: list[Chunk] = []
        current_lines: list[str] = []
        current_tokens = 0
        chunk_start = 0

        for i, line in enumerate(lines):
            line_tokens = self.count_tokens(line)

            # If this line alone exceeds chunk size, handle specially
            if line_tokens > self.chunk_size:
                # Flush current chunk
                if current_lines:
                    chunk_content = "".join(current_lines).rstrip()
                    if chunk_content.strip():
                        chunks.append(
                            self._create_chunk(
                                content=chunk_content,
                                source_path=source_path,
                                start_line=chunk_start + 1,
                                end_line=i,
                                file_type=file_type,
                                metadata=metadata,
                            )
                        )
                    current_lines = []
                    current_tokens = 0
                    chunk_start = i
                # Truncate and add long line
                truncated = self.truncate_to_tokens(line, self.chunk_size)
                chunks.append(
                    self._create_chunk(
                        content=truncated.rstrip(),
                        source_path=source_path,
                        start_line=i + 1,
                        end_line=i + 1,
                        file_type=file_type,
                        metadata={**metadata, "truncated": True},
                    )
                )
                chunk_start = i + 1
                continue

            if current_tokens + line_tokens > self.chunk_size and current_lines:
                # Create chunk
                chunk_content = "".join(current_lines).rstrip()
                if chunk_content.strip():
                    chunks.append(
                        self._create_chunk(
                            content=chunk_content,
                            source_path=source_path,
                            start_line=chunk_start + 1,
                            end_line=i,
                            file_type=file_type,
                            metadata=metadata,
                        )
                    )
                # Calculate overlap
                overlap_tokens = 0
                overlap_lines: list[str] = []
                for j in range(len(current_lines) - 1, -1, -1):
                    line_tok = self.count_tokens(current_lines[j])
                    if overlap_tokens + line_tok > self.chunk_overlap:
                        break
                    overlap_lines.insert(0, current_lines[j])
                    overlap_tokens += line_tok
                current_lines = overlap_lines
                current_tokens = overlap_tokens
                chunk_start = i - len(overlap_lines)

            current_lines.append(line)
            current_tokens += line_tokens

        # Final chunk
        if current_lines:
            chunk_content = "".join(current_lines).rstrip()
            if chunk_content.strip():
                chunks.append(
                    self._create_chunk(
                        content=chunk_content,
                        source_path=source_path,
                        start_line=chunk_start + 1,
                        end_line=len(lines),
                        file_type=file_type,
                        metadata=metadata,
                    )
                )

        return chunks