feat(knowledge-base): implement Knowledge Base MCP Server (#57)

Implements RAG capabilities with pgvector for semantic search:

- Intelligent chunking strategies (code-aware, markdown-aware, text)
- Semantic search with vector similarity (HNSW index)
- Keyword search with PostgreSQL full-text search
- Hybrid search using Reciprocal Rank Fusion (RRF; sketched below)
- Redis caching for embeddings
- Collection management (ingest, search, delete, stats)
- FastMCP tools: search_knowledge, ingest_content, delete_content,
  list_collections, get_collection_stats, update_document
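
Hybrid search merges the semantic and keyword rankings with Reciprocal Rank Fusion. A minimal sketch of RRF scoring in Python (k=60 is the conventional constant; the constant and any per-source weighting actually used by the server are not shown in this excerpt):

def rrf_merge(semantic_ids: list[str], keyword_ids: list[str], k: int = 60) -> list[str]:
    """Score each document as the sum of 1 / (k + rank) over the rankings it appears in."""
    scores: dict[str, float] = {}
    for ranking in (semantic_ids, keyword_ids):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=lambda doc_id: scores[doc_id], reverse=True)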

Testing:
- 128 comprehensive tests covering all components
- 58% code coverage (database integration tests use mocks)
- Passes ruff linting and mypy type checking

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Commit d0fc7f37ff (parent 18d717e996), 2026-01-03 21:33:26 +01:00
26 changed files with 9530 additions and 120 deletions


@@ -0,0 +1,19 @@
"""
Chunking module for Knowledge Base MCP Server.
Provides intelligent content chunking for different file types
with overlap and context preservation.
"""
from chunking.base import BaseChunker, ChunkerFactory
from chunking.code import CodeChunker
from chunking.markdown import MarkdownChunker
from chunking.text import TextChunker
__all__ = [
"BaseChunker",
"ChunkerFactory",
"CodeChunker",
"MarkdownChunker",
"TextChunker",
]
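
A minimal usage sketch of the API exported above (illustrative only: the factory falls back to get_settings() for configuration, so this assumes default settings can be constructed in your environment, and "example.py" is a made-up path):

from chunking import ChunkerFactory

factory = ChunkerFactory()
# The factory infers the file type from the extension and picks a chunker.
chunks = factory.chunk_content(
    content="def add(a, b):\n    return a + b\n",
    source_path="example.py",
)
for chunk in chunks:
    print(chunk.chunk_type, chunk.token_count, chunk.source_path)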


@@ -0,0 +1,281 @@
"""
Base chunker implementation.
Provides abstract interface and common utilities for content chunking.
"""
import logging
from abc import ABC, abstractmethod
from typing import Any
import tiktoken
from config import Settings, get_settings
from exceptions import ChunkingError
from models import FILE_EXTENSION_MAP, Chunk, ChunkType, FileType
logger = logging.getLogger(__name__)
class BaseChunker(ABC):
"""
Abstract base class for content chunkers.
Subclasses implement specific chunking strategies for
different content types (code, markdown, text).
"""
def __init__(
self,
chunk_size: int,
chunk_overlap: int,
settings: Settings | None = None,
) -> None:
"""
Initialize chunker.
Args:
chunk_size: Target tokens per chunk
chunk_overlap: Token overlap between chunks
settings: Application settings
"""
self._settings = settings or get_settings()
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# Use cl100k_base encoding (GPT-4/text-embedding-3)
self._tokenizer = tiktoken.get_encoding("cl100k_base")
def count_tokens(self, text: str) -> int:
"""Count tokens in text."""
return len(self._tokenizer.encode(text))
def truncate_to_tokens(self, text: str, max_tokens: int) -> str:
"""Truncate text to max tokens."""
tokens = self._tokenizer.encode(text)
if len(tokens) <= max_tokens:
return text
return self._tokenizer.decode(tokens[:max_tokens])
@abstractmethod
def chunk(
self,
content: str,
source_path: str | None = None,
file_type: FileType | None = None,
metadata: dict[str, Any] | None = None,
) -> list[Chunk]:
"""
Split content into chunks.
Args:
content: Content to chunk
source_path: Source file path for reference
file_type: File type for specialized handling
metadata: Additional metadata to include
Returns:
List of Chunk objects
"""
pass
@property
@abstractmethod
def chunk_type(self) -> ChunkType:
"""Get the chunk type this chunker produces."""
pass
def _create_chunk(
self,
content: str,
source_path: str | None = None,
start_line: int | None = None,
end_line: int | None = None,
file_type: FileType | None = None,
metadata: dict[str, Any] | None = None,
) -> Chunk:
"""Create a chunk with token count."""
token_count = self.count_tokens(content)
return Chunk(
content=content,
chunk_type=self.chunk_type,
file_type=file_type,
source_path=source_path,
start_line=start_line,
end_line=end_line,
metadata=metadata or {},
token_count=token_count,
)
class ChunkerFactory:
"""
Factory for creating appropriate chunkers.
Selects the best chunker based on file type or content.
"""
def __init__(self, settings: Settings | None = None) -> None:
"""Initialize factory."""
self._settings = settings or get_settings()
self._chunkers: dict[str, BaseChunker] = {}
def _get_code_chunker(self) -> "BaseChunker":
"""Get or create code chunker."""
from chunking.code import CodeChunker
if "code" not in self._chunkers:
self._chunkers["code"] = CodeChunker(
chunk_size=self._settings.code_chunk_size,
chunk_overlap=self._settings.code_chunk_overlap,
settings=self._settings,
)
return self._chunkers["code"]
def _get_markdown_chunker(self) -> "BaseChunker":
"""Get or create markdown chunker."""
from chunking.markdown import MarkdownChunker
if "markdown" not in self._chunkers:
self._chunkers["markdown"] = MarkdownChunker(
chunk_size=self._settings.markdown_chunk_size,
chunk_overlap=self._settings.markdown_chunk_overlap,
settings=self._settings,
)
return self._chunkers["markdown"]
def _get_text_chunker(self) -> "BaseChunker":
"""Get or create text chunker."""
from chunking.text import TextChunker
if "text" not in self._chunkers:
self._chunkers["text"] = TextChunker(
chunk_size=self._settings.text_chunk_size,
chunk_overlap=self._settings.text_chunk_overlap,
settings=self._settings,
)
return self._chunkers["text"]
def get_chunker(
self,
file_type: FileType | None = None,
chunk_type: ChunkType | None = None,
) -> BaseChunker:
"""
Get appropriate chunker for content type.
Args:
file_type: File type to chunk
chunk_type: Explicit chunk type to use
Returns:
Appropriate chunker instance
"""
# If explicit chunk type specified, use it
if chunk_type:
if chunk_type == ChunkType.CODE:
return self._get_code_chunker()
elif chunk_type == ChunkType.MARKDOWN:
return self._get_markdown_chunker()
else:
return self._get_text_chunker()
# Otherwise, infer from file type
if file_type:
if file_type == FileType.MARKDOWN:
return self._get_markdown_chunker()
elif file_type in (FileType.TEXT, FileType.JSON, FileType.YAML, FileType.TOML):
return self._get_text_chunker()
else:
# Code files
return self._get_code_chunker()
# Default to text chunker
return self._get_text_chunker()
def get_chunker_for_path(self, source_path: str) -> tuple[BaseChunker, FileType | None]:
"""
Get chunker based on file path extension.
Args:
source_path: File path to chunk
Returns:
Tuple of (chunker, file_type)
"""
# Extract extension
ext = ""
if "." in source_path:
ext = "." + source_path.rsplit(".", 1)[-1].lower()
file_type = FILE_EXTENSION_MAP.get(ext)
chunker = self.get_chunker(file_type=file_type)
return chunker, file_type
def chunk_content(
self,
content: str,
source_path: str | None = None,
file_type: FileType | None = None,
chunk_type: ChunkType | None = None,
metadata: dict[str, Any] | None = None,
) -> list[Chunk]:
"""
Chunk content using appropriate strategy.
Args:
content: Content to chunk
source_path: Source file path
file_type: File type
chunk_type: Explicit chunk type
metadata: Additional metadata
Returns:
List of chunks
"""
# If we have a source path but no file type, infer it
if source_path and not file_type:
chunker, file_type = self.get_chunker_for_path(source_path)
else:
chunker = self.get_chunker(file_type=file_type, chunk_type=chunk_type)
try:
chunks = chunker.chunk(
content=content,
source_path=source_path,
file_type=file_type,
metadata=metadata,
)
logger.debug(
f"Chunked content into {len(chunks)} chunks "
f"(type={chunker.chunk_type.value})"
)
return chunks
except Exception as e:
logger.error(f"Chunking error: {e}")
raise ChunkingError(
message=f"Failed to chunk content: {e}",
cause=e,
)
# Global chunker factory instance
_chunker_factory: ChunkerFactory | None = None
def get_chunker_factory() -> ChunkerFactory:
"""Get the global chunker factory instance."""
global _chunker_factory
if _chunker_factory is None:
_chunker_factory = ChunkerFactory()
return _chunker_factory
def reset_chunker_factory() -> None:
"""Reset the global chunker factory (for testing)."""
global _chunker_factory
_chunker_factory = None

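To implement a new strategy on top of this base, a subclass only needs chunk() and chunk_type; token counting, truncation, and Chunk construction come from BaseChunker. A minimal, hypothetical sketch (not part of this commit; the class name is made up):

from typing import Any

from chunking.base import BaseChunker
from models import Chunk, ChunkType, FileType


class WholeDocumentChunker(BaseChunker):
    """Hypothetical chunker that emits a single, budget-limited chunk per document."""

    @property
    def chunk_type(self) -> ChunkType:
        return ChunkType.TEXT

    def chunk(
        self,
        content: str,
        source_path: str | None = None,
        file_type: FileType | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> list[Chunk]:
        if not content.strip():
            return []
        # Reuse the base-class token utilities to stay within the chunk budget.
        text = self.truncate_to_tokens(content, self.chunk_size)
        return [
            self._create_chunk(
                content=text.strip(),
                source_path=source_path,
                file_type=file_type,
                metadata=metadata or {},
            )
        ]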

@@ -0,0 +1,410 @@
"""
Code-aware chunking implementation.
Provides intelligent chunking for source code that respects
function/class boundaries and preserves context.
"""
import logging
import re
from typing import Any
from chunking.base import BaseChunker
from config import Settings
from models import Chunk, ChunkType, FileType
logger = logging.getLogger(__name__)
# Language-specific patterns for detecting function/class definitions
LANGUAGE_PATTERNS: dict[FileType, dict[str, re.Pattern[str]]] = {
FileType.PYTHON: {
"function": re.compile(r"^(\s*)(async\s+)?def\s+\w+", re.MULTILINE),
"class": re.compile(r"^(\s*)class\s+\w+", re.MULTILINE),
"decorator": re.compile(r"^(\s*)@\w+", re.MULTILINE),
},
FileType.JAVASCRIPT: {
"function": re.compile(
r"^(\s*)(export\s+)?(async\s+)?function\s+\w+|"
r"^(\s*)(export\s+)?(const|let|var)\s+\w+\s*=\s*(async\s+)?\(",
re.MULTILINE,
),
"class": re.compile(r"^(\s*)(export\s+)?class\s+\w+", re.MULTILINE),
"arrow": re.compile(
r"^(\s*)(export\s+)?(const|let|var)\s+\w+\s*=\s*(async\s+)?(\([^)]*\)|[^=])\s*=>",
re.MULTILINE,
),
},
FileType.TYPESCRIPT: {
"function": re.compile(
r"^(\s*)(export\s+)?(async\s+)?function\s+\w+|"
r"^(\s*)(export\s+)?(const|let|var)\s+\w+\s*[:<]",
re.MULTILINE,
),
"class": re.compile(r"^(\s*)(export\s+)?class\s+\w+", re.MULTILINE),
"interface": re.compile(r"^(\s*)(export\s+)?interface\s+\w+", re.MULTILINE),
"type": re.compile(r"^(\s*)(export\s+)?type\s+\w+", re.MULTILINE),
},
FileType.GO: {
"function": re.compile(r"^func\s+(\([^)]+\)\s+)?\w+", re.MULTILINE),
"struct": re.compile(r"^type\s+\w+\s+struct", re.MULTILINE),
"interface": re.compile(r"^type\s+\w+\s+interface", re.MULTILINE),
},
FileType.RUST: {
"function": re.compile(r"^(\s*)(pub\s+)?(async\s+)?fn\s+\w+", re.MULTILINE),
"struct": re.compile(r"^(\s*)(pub\s+)?struct\s+\w+", re.MULTILINE),
"impl": re.compile(r"^(\s*)impl\s+", re.MULTILINE),
"trait": re.compile(r"^(\s*)(pub\s+)?trait\s+\w+", re.MULTILINE),
},
FileType.JAVA: {
"method": re.compile(
r"^(\s*)(public|private|protected)?\s*(static)?\s*\w+\s+\w+\s*\(",
re.MULTILINE,
),
"class": re.compile(
r"^(\s*)(public|private|protected)?\s*(abstract)?\s*class\s+\w+",
re.MULTILINE,
),
"interface": re.compile(
r"^(\s*)(public|private|protected)?\s*interface\s+\w+",
re.MULTILINE,
),
},
}
class CodeChunker(BaseChunker):
"""
Code-aware chunker that respects logical boundaries.
Features:
- Detects function/class boundaries
- Preserves decorator/annotation context
- Handles nested structures
- Falls back to line-based chunking when needed
"""
def __init__(
self,
chunk_size: int,
chunk_overlap: int,
settings: Settings | None = None,
) -> None:
"""Initialize code chunker."""
super().__init__(chunk_size, chunk_overlap, settings)
@property
def chunk_type(self) -> ChunkType:
"""Get chunk type."""
return ChunkType.CODE
def chunk(
self,
content: str,
source_path: str | None = None,
file_type: FileType | None = None,
metadata: dict[str, Any] | None = None,
) -> list[Chunk]:
"""
Chunk code content.
Tries to respect function/class boundaries, falling back
to line-based chunking if needed.
"""
if not content.strip():
return []
metadata = metadata or {}
lines = content.splitlines(keepends=True)
# Try language-aware chunking if we have patterns
if file_type and file_type in LANGUAGE_PATTERNS:
chunks = self._chunk_by_structure(
content, lines, file_type, source_path, metadata
)
if chunks:
return chunks
# Fall back to line-based chunking
return self._chunk_by_lines(lines, source_path, file_type, metadata)
def _chunk_by_structure(
self,
content: str,
lines: list[str],
file_type: FileType,
source_path: str | None,
metadata: dict[str, Any],
) -> list[Chunk]:
"""
Chunk by detecting code structure (functions, classes).
Returns empty list if structure detection isn't useful.
"""
patterns = LANGUAGE_PATTERNS.get(file_type, {})
if not patterns:
return []
# Find all structure boundaries
boundaries: list[tuple[int, str]] = [] # (line_number, type)
for struct_type, pattern in patterns.items():
for match in pattern.finditer(content):
# Convert character position to line number
line_num = content[:match.start()].count("\n")
boundaries.append((line_num, struct_type))
if not boundaries:
return []
# Sort boundaries by line number
boundaries.sort(key=lambda x: x[0])
# If we have very few boundaries, line-based might be better
if len(boundaries) < 3 and len(lines) > 50:
return []
# Create chunks based on boundaries
chunks: list[Chunk] = []
current_start = 0
for _i, (line_num, struct_type) in enumerate(boundaries):
# Attach preceding comments/decorators to the chunk that starts at this boundary
actual_start = max(self._find_context_start(lines, line_num), current_start)
# Check if we need to create a chunk before this boundary
if actual_start > current_start:
chunk_lines = lines[current_start:actual_start]
chunk_content = "".join(chunk_lines)
if chunk_content.strip():
token_count = self.count_tokens(chunk_content)
# If chunk is too large, split it
if token_count > self.chunk_size * 1.5:
sub_chunks = self._split_large_chunk(
chunk_lines, current_start, source_path, file_type, metadata
)
chunks.extend(sub_chunks)
elif token_count > 0:
chunks.append(
self._create_chunk(
content=chunk_content.rstrip(),
source_path=source_path,
start_line=current_start + 1,
end_line=actual_start,
file_type=file_type,
metadata={**metadata, "structure_type": struct_type},
)
)
current_start = actual_start
# Handle remaining content
if current_start < len(lines):
chunk_lines = lines[current_start:]
chunk_content = "".join(chunk_lines)
if chunk_content.strip():
token_count = self.count_tokens(chunk_content)
if token_count > self.chunk_size * 1.5:
sub_chunks = self._split_large_chunk(
chunk_lines, current_start, source_path, file_type, metadata
)
chunks.extend(sub_chunks)
else:
chunks.append(
self._create_chunk(
content=chunk_content.rstrip(),
source_path=source_path,
start_line=current_start + 1,
end_line=len(lines),
file_type=file_type,
metadata=metadata,
)
)
return chunks
def _find_context_start(self, lines: list[str], line_num: int) -> int:
"""Find the start of context (decorators, comments) before a line."""
start = line_num
# Look backwards for decorators/comments
for i in range(line_num - 1, max(0, line_num - 10), -1):
line = lines[i].strip()
if not line:
continue
if line.startswith(("#", "//", "/*", "*", "@", "'")):
start = i
else:
break
return start
def _split_large_chunk(
self,
chunk_lines: list[str],
base_line: int,
source_path: str | None,
file_type: FileType | None,
metadata: dict[str, Any],
) -> list[Chunk]:
"""Split a large chunk into smaller pieces with overlap."""
chunks: list[Chunk] = []
current_lines: list[str] = []
current_tokens = 0
chunk_start = 0
for i, line in enumerate(chunk_lines):
line_tokens = self.count_tokens(line)
if current_tokens + line_tokens > self.chunk_size and current_lines:
# Create chunk
chunk_content = "".join(current_lines).rstrip()
chunks.append(
self._create_chunk(
content=chunk_content,
source_path=source_path,
start_line=base_line + chunk_start + 1,
end_line=base_line + i,
file_type=file_type,
metadata=metadata,
)
)
# Calculate overlap
overlap_tokens = 0
overlap_lines: list[str] = []
for j in range(len(current_lines) - 1, -1, -1):
overlap_tokens += self.count_tokens(current_lines[j])
if overlap_tokens >= self.chunk_overlap:
overlap_lines = current_lines[j:]
break
current_lines = overlap_lines
current_tokens = sum(self.count_tokens(line) for line in current_lines)
chunk_start = i - len(overlap_lines)
current_lines.append(line)
current_tokens += line_tokens
# Final chunk
if current_lines:
chunk_content = "".join(current_lines).rstrip()
if chunk_content.strip():
chunks.append(
self._create_chunk(
content=chunk_content,
source_path=source_path,
start_line=base_line + chunk_start + 1,
end_line=base_line + len(chunk_lines),
file_type=file_type,
metadata=metadata,
)
)
return chunks
def _chunk_by_lines(
self,
lines: list[str],
source_path: str | None,
file_type: FileType | None,
metadata: dict[str, Any],
) -> list[Chunk]:
"""Chunk by lines with overlap."""
chunks: list[Chunk] = []
current_lines: list[str] = []
current_tokens = 0
chunk_start = 0
for i, line in enumerate(lines):
line_tokens = self.count_tokens(line)
# If this line alone exceeds chunk size, handle specially
if line_tokens > self.chunk_size:
# Flush current chunk
if current_lines:
chunk_content = "".join(current_lines).rstrip()
if chunk_content.strip():
chunks.append(
self._create_chunk(
content=chunk_content,
source_path=source_path,
start_line=chunk_start + 1,
end_line=i,
file_type=file_type,
metadata=metadata,
)
)
current_lines = []
current_tokens = 0
chunk_start = i
# Truncate and add long line
truncated = self.truncate_to_tokens(line, self.chunk_size)
chunks.append(
self._create_chunk(
content=truncated.rstrip(),
source_path=source_path,
start_line=i + 1,
end_line=i + 1,
file_type=file_type,
metadata={**metadata, "truncated": True},
)
)
chunk_start = i + 1
continue
if current_tokens + line_tokens > self.chunk_size and current_lines:
# Create chunk
chunk_content = "".join(current_lines).rstrip()
if chunk_content.strip():
chunks.append(
self._create_chunk(
content=chunk_content,
source_path=source_path,
start_line=chunk_start + 1,
end_line=i,
file_type=file_type,
metadata=metadata,
)
)
# Calculate overlap
overlap_tokens = 0
overlap_lines: list[str] = []
for j in range(len(current_lines) - 1, -1, -1):
line_tok = self.count_tokens(current_lines[j])
if overlap_tokens + line_tok > self.chunk_overlap:
break
overlap_lines.insert(0, current_lines[j])
overlap_tokens += line_tok
current_lines = overlap_lines
current_tokens = overlap_tokens
chunk_start = i - len(overlap_lines)
current_lines.append(line)
current_tokens += line_tokens
# Final chunk
if current_lines:
chunk_content = "".join(current_lines).rstrip()
if chunk_content.strip():
chunks.append(
self._create_chunk(
content=chunk_content,
source_path=source_path,
start_line=chunk_start + 1,
end_line=len(lines),
file_type=file_type,
metadata=metadata,
)
)
return chunks

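A quick sketch of how the structure-aware path behaves on a small Python file (chunk sizes and the file name are illustrative, not the configured defaults; BaseChunker falls back to get_settings() for anything not passed):

from chunking.code import CodeChunker
from models import FileType

chunker = CodeChunker(chunk_size=400, chunk_overlap=50)
source = (
    "import math\n"
    "\n"
    "def area(r):\n"
    "    return math.pi * r ** 2\n"
    "\n"
    "class Circle:\n"
    "    def __init__(self, r):\n"
    "        self.r = r\n"
)
# Function/class boundaries from LANGUAGE_PATTERNS drive the chunk splits.
for chunk in chunker.chunk(source, source_path="circle.py", file_type=FileType.PYTHON):
    print(chunk.start_line, chunk.end_line, chunk.metadata.get("structure_type"))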

@@ -0,0 +1,483 @@
"""
Markdown-aware chunking implementation.
Provides intelligent chunking for markdown content that respects
heading hierarchy and preserves document structure.
"""
import logging
import re
from typing import Any
from chunking.base import BaseChunker
from config import Settings
from models import Chunk, ChunkType, FileType
logger = logging.getLogger(__name__)
# Patterns for markdown elements
HEADING_PATTERN = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
CODE_BLOCK_PATTERN = re.compile(r"^```", re.MULTILINE)
HR_PATTERN = re.compile(r"^(-{3,}|_{3,}|\*{3,})$", re.MULTILINE)
class MarkdownChunker(BaseChunker):
"""
Markdown-aware chunker that respects document structure.
Features:
- Respects heading hierarchy
- Preserves heading context in chunks
- Handles code blocks as units
- Maintains list continuity where possible
"""
def __init__(
self,
chunk_size: int,
chunk_overlap: int,
settings: Settings | None = None,
) -> None:
"""Initialize markdown chunker."""
super().__init__(chunk_size, chunk_overlap, settings)
@property
def chunk_type(self) -> ChunkType:
"""Get chunk type."""
return ChunkType.MARKDOWN
def chunk(
self,
content: str,
source_path: str | None = None,
file_type: FileType | None = None,
metadata: dict[str, Any] | None = None,
) -> list[Chunk]:
"""
Chunk markdown content.
Splits on heading boundaries and preserves heading context.
"""
if not content.strip():
return []
metadata = metadata or {}
file_type = file_type or FileType.MARKDOWN
# Split content into sections by headings
sections = self._split_by_headings(content)
if not sections:
# No headings, chunk as plain text
return self._chunk_text_block(
content, source_path, file_type, metadata, []
)
chunks: list[Chunk] = []
heading_stack: list[tuple[int, str]] = [] # (level, text)
for section in sections:
heading_level = section.get("level", 0)
heading_text = section.get("heading", "")
section_content = section.get("content", "")
start_line = section.get("start_line", 1)
end_line = section.get("end_line", 1)
# Update heading stack
if heading_level > 0:
# Pop headings of equal or higher level
while heading_stack and heading_stack[-1][0] >= heading_level:
heading_stack.pop()
heading_stack.append((heading_level, heading_text))
# Build heading context prefix
heading_context = " > ".join(h[1] for h in heading_stack)
section_chunks = self._chunk_section(
content=section_content,
heading_context=heading_context,
heading_level=heading_level,
heading_text=heading_text,
start_line=start_line,
end_line=end_line,
source_path=source_path,
file_type=file_type,
metadata=metadata,
)
chunks.extend(section_chunks)
return chunks
def _split_by_headings(self, content: str) -> list[dict[str, Any]]:
"""Split content into sections by headings."""
sections: list[dict[str, Any]] = []
lines = content.split("\n")
current_section: dict[str, Any] = {
"level": 0,
"heading": "",
"content": "",
"start_line": 1,
"end_line": 1,
}
current_lines: list[str] = []
in_code_block = False
for i, line in enumerate(lines):
# Track code blocks
if line.strip().startswith("```"):
in_code_block = not in_code_block
current_lines.append(line)
continue
# Skip heading detection in code blocks
if in_code_block:
current_lines.append(line)
continue
# Check for heading
heading_match = HEADING_PATTERN.match(line)
if heading_match:
# Save previous section
if current_lines:
current_section["content"] = "\n".join(current_lines)
current_section["end_line"] = i
if current_section["content"].strip():
sections.append(current_section)
# Start new section
level = len(heading_match.group(1))
heading_text = heading_match.group(2).strip()
current_section = {
"level": level,
"heading": heading_text,
"content": "",
"start_line": i + 1,
"end_line": i + 1,
}
current_lines = [line]
else:
current_lines.append(line)
# Save final section
if current_lines:
current_section["content"] = "\n".join(current_lines)
current_section["end_line"] = len(lines)
if current_section["content"].strip():
sections.append(current_section)
return sections
def _chunk_section(
self,
content: str,
heading_context: str,
heading_level: int,
heading_text: str,
start_line: int,
end_line: int,
source_path: str | None,
file_type: FileType,
metadata: dict[str, Any],
) -> list[Chunk]:
"""Chunk a single section of markdown."""
if not content.strip():
return []
token_count = self.count_tokens(content)
# If section fits in one chunk, return as-is
if token_count <= self.chunk_size:
section_metadata = {
**metadata,
"heading_context": heading_context,
"heading_level": heading_level,
"heading_text": heading_text,
}
return [
self._create_chunk(
content=content.strip(),
source_path=source_path,
start_line=start_line,
end_line=end_line,
file_type=file_type,
metadata=section_metadata,
)
]
# Need to split - try to split on paragraphs first
return self._chunk_text_block(
content,
source_path,
file_type,
{
**metadata,
"heading_context": heading_context,
"heading_level": heading_level,
"heading_text": heading_text,
},
_heading_stack=[(heading_level, heading_text)] if heading_text else [],
base_line=start_line,
)
def _chunk_text_block(
self,
content: str,
source_path: str | None,
file_type: FileType,
metadata: dict[str, Any],
_heading_stack: list[tuple[int, str]],
base_line: int = 1,
) -> list[Chunk]:
"""Chunk a block of text by paragraphs."""
# Split into paragraphs (separated by blank lines)
paragraphs = self._split_into_paragraphs(content)
if not paragraphs:
return []
chunks: list[Chunk] = []
current_content: list[str] = []
current_tokens = 0
chunk_start_line = base_line
for para_info in paragraphs:
para_content = para_info["content"]
para_tokens = para_info["tokens"]
para_start = para_info["start_line"]
# Handle very large paragraphs
if para_tokens > self.chunk_size:
# Flush current content
if current_content:
chunk_text = "\n\n".join(current_content)
chunks.append(
self._create_chunk(
content=chunk_text.strip(),
source_path=source_path,
start_line=chunk_start_line,
end_line=base_line + para_start - 1,
file_type=file_type,
metadata=metadata,
)
)
current_content = []
current_tokens = 0
# Split large paragraph by sentences/lines
sub_chunks = self._split_large_paragraph(
para_content,
source_path,
file_type,
metadata,
base_line + para_start,
)
chunks.extend(sub_chunks)
chunk_start_line = base_line + para_info["end_line"] + 1
continue
# Check if adding this paragraph exceeds limit
if current_tokens + para_tokens > self.chunk_size and current_content:
# Create chunk
chunk_text = "\n\n".join(current_content)
chunks.append(
self._create_chunk(
content=chunk_text.strip(),
source_path=source_path,
start_line=chunk_start_line,
end_line=base_line + para_start - 1,
file_type=file_type,
metadata=metadata,
)
)
# Overlap: include last paragraph if it fits
if current_content and self.count_tokens(current_content[-1]) <= self.chunk_overlap:
current_content = [current_content[-1]]
current_tokens = self.count_tokens(current_content[-1])
else:
current_content = []
current_tokens = 0
chunk_start_line = base_line + para_start
current_content.append(para_content)
current_tokens += para_tokens
# Final chunk
if current_content:
chunk_text = "\n\n".join(current_content)
end_line_num = base_line + (paragraphs[-1]["end_line"] if paragraphs else 0)
chunks.append(
self._create_chunk(
content=chunk_text.strip(),
source_path=source_path,
start_line=chunk_start_line,
end_line=end_line_num,
file_type=file_type,
metadata=metadata,
)
)
return chunks
def _split_into_paragraphs(self, content: str) -> list[dict[str, Any]]:
"""Split content into paragraphs with metadata."""
paragraphs: list[dict[str, Any]] = []
lines = content.split("\n")
current_para: list[str] = []
para_start = 0
in_code_block = False
for i, line in enumerate(lines):
# Track code blocks (keep them as single units)
if line.strip().startswith("```"):
if in_code_block:
# End of code block
current_para.append(line)
in_code_block = False
else:
# Start of code block - save previous paragraph
if current_para and any(p.strip() for p in current_para):
para_content = "\n".join(current_para)
paragraphs.append({
"content": para_content,
"tokens": self.count_tokens(para_content),
"start_line": para_start,
"end_line": i - 1,
})
current_para = [line]
para_start = i
in_code_block = True
continue
if in_code_block:
current_para.append(line)
continue
# Empty line indicates paragraph break
if not line.strip():
if current_para and any(p.strip() for p in current_para):
para_content = "\n".join(current_para)
paragraphs.append({
"content": para_content,
"tokens": self.count_tokens(para_content),
"start_line": para_start,
"end_line": i - 1,
})
current_para = []
para_start = i + 1
else:
if not current_para:
para_start = i
current_para.append(line)
# Final paragraph
if current_para and any(p.strip() for p in current_para):
para_content = "\n".join(current_para)
paragraphs.append({
"content": para_content,
"tokens": self.count_tokens(para_content),
"start_line": para_start,
"end_line": len(lines) - 1,
})
return paragraphs
def _split_large_paragraph(
self,
content: str,
source_path: str | None,
file_type: FileType,
metadata: dict[str, Any],
base_line: int,
) -> list[Chunk]:
"""Split a large paragraph into smaller chunks."""
# Try splitting by sentences
sentences = self._split_into_sentences(content)
chunks: list[Chunk] = []
current_content: list[str] = []
current_tokens = 0
for sentence in sentences:
sentence_tokens = self.count_tokens(sentence)
# If single sentence is too large, truncate
if sentence_tokens > self.chunk_size:
if current_content:
chunk_text = " ".join(current_content)
chunks.append(
self._create_chunk(
content=chunk_text.strip(),
source_path=source_path,
start_line=base_line,
end_line=base_line,
file_type=file_type,
metadata=metadata,
)
)
current_content = []
current_tokens = 0
truncated = self.truncate_to_tokens(sentence, self.chunk_size)
chunks.append(
self._create_chunk(
content=truncated.strip(),
source_path=source_path,
start_line=base_line,
end_line=base_line,
file_type=file_type,
metadata={**metadata, "truncated": True},
)
)
continue
if current_tokens + sentence_tokens > self.chunk_size and current_content:
chunk_text = " ".join(current_content)
chunks.append(
self._create_chunk(
content=chunk_text.strip(),
source_path=source_path,
start_line=base_line,
end_line=base_line,
file_type=file_type,
metadata=metadata,
)
)
# Overlap with last sentence
if current_content and self.count_tokens(current_content[-1]) <= self.chunk_overlap:
current_content = [current_content[-1]]
current_tokens = self.count_tokens(current_content[-1])
else:
current_content = []
current_tokens = 0
current_content.append(sentence)
current_tokens += sentence_tokens
# Final chunk
if current_content:
chunk_text = " ".join(current_content)
chunks.append(
self._create_chunk(
content=chunk_text.strip(),
source_path=source_path,
start_line=base_line,
end_line=base_line,
file_type=file_type,
metadata=metadata,
)
)
return chunks
def _split_into_sentences(self, text: str) -> list[str]:
"""Split text into sentences."""
# Simple sentence splitting on common terminators
# More sophisticated splitting could use nltk or spacy
sentence_endings = re.compile(r"(?<=[.!?])\s+")
sentences = sentence_endings.split(text)
return [s.strip() for s in sentences if s.strip()]

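A short sketch of the heading-context behavior described above (sizes illustrative; assumes default settings can be constructed in your environment):

from chunking.markdown import MarkdownChunker

chunker = MarkdownChunker(chunk_size=256, chunk_overlap=32)
doc = "# Guide\n\nIntro paragraph.\n\n## Install\n\nRun the installer.\n"
for chunk in chunker.chunk(doc):
    # Each chunk records its heading trail, e.g. "Guide" or "Guide > Install".
    print(chunk.metadata["heading_context"], "->", chunk.content[:30])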

@@ -0,0 +1,389 @@
"""
Plain text chunking implementation.
Provides simple text chunking with paragraph and sentence
boundary detection.
"""
import logging
import re
from typing import Any
from chunking.base import BaseChunker
from config import Settings
from models import Chunk, ChunkType, FileType
logger = logging.getLogger(__name__)
class TextChunker(BaseChunker):
"""
Plain text chunker with paragraph awareness.
Features:
- Splits on paragraph boundaries
- Falls back to sentence/word boundaries
- Configurable overlap for context preservation
"""
def __init__(
self,
chunk_size: int,
chunk_overlap: int,
settings: Settings | None = None,
) -> None:
"""Initialize text chunker."""
super().__init__(chunk_size, chunk_overlap, settings)
@property
def chunk_type(self) -> ChunkType:
"""Get chunk type."""
return ChunkType.TEXT
def chunk(
self,
content: str,
source_path: str | None = None,
file_type: FileType | None = None,
metadata: dict[str, Any] | None = None,
) -> list[Chunk]:
"""
Chunk plain text content.
Tries paragraph boundaries first, then sentences.
"""
if not content.strip():
return []
metadata = metadata or {}
# Check if content fits in a single chunk
total_tokens = self.count_tokens(content)
if total_tokens <= self.chunk_size:
return [
self._create_chunk(
content=content.strip(),
source_path=source_path,
start_line=1,
end_line=content.count("\n") + 1,
file_type=file_type,
metadata=metadata,
)
]
# Try paragraph-based chunking
paragraphs = self._split_paragraphs(content)
if len(paragraphs) > 1:
return self._chunk_by_paragraphs(
paragraphs, source_path, file_type, metadata
)
# Fall back to sentence-based chunking
return self._chunk_by_sentences(
content, source_path, file_type, metadata
)
def _split_paragraphs(self, content: str) -> list[dict[str, Any]]:
"""Split content into paragraphs."""
paragraphs: list[dict[str, Any]] = []
# Split on double newlines (paragraph boundaries)
raw_paras = re.split(r"\n\s*\n", content)
line_num = 1
for para in raw_paras:
para = para.strip()
if not para:
continue
para_lines = para.count("\n") + 1
paragraphs.append({
"content": para,
"tokens": self.count_tokens(para),
"start_line": line_num,
"end_line": line_num + para_lines - 1,
})
line_num += para_lines + 1 # +1 for blank line between paragraphs
return paragraphs
def _chunk_by_paragraphs(
self,
paragraphs: list[dict[str, Any]],
source_path: str | None,
file_type: FileType | None,
metadata: dict[str, Any],
) -> list[Chunk]:
"""Chunk by combining paragraphs up to size limit."""
chunks: list[Chunk] = []
current_paras: list[str] = []
current_tokens = 0
chunk_start = paragraphs[0]["start_line"] if paragraphs else 1
chunk_end = chunk_start
for para in paragraphs:
para_content = para["content"]
para_tokens = para["tokens"]
# Handle paragraphs larger than chunk size
if para_tokens > self.chunk_size:
# Flush current content
if current_paras:
chunk_text = "\n\n".join(current_paras)
chunks.append(
self._create_chunk(
content=chunk_text,
source_path=source_path,
start_line=chunk_start,
end_line=chunk_end,
file_type=file_type,
metadata=metadata,
)
)
current_paras = []
current_tokens = 0
# Split large paragraph
sub_chunks = self._split_large_text(
para_content,
source_path,
file_type,
metadata,
para["start_line"],
)
chunks.extend(sub_chunks)
chunk_start = para["end_line"] + 1
chunk_end = chunk_start
continue
# Check if adding paragraph exceeds limit
if current_tokens + para_tokens > self.chunk_size and current_paras:
chunk_text = "\n\n".join(current_paras)
chunks.append(
self._create_chunk(
content=chunk_text,
source_path=source_path,
start_line=chunk_start,
end_line=chunk_end,
file_type=file_type,
metadata=metadata,
)
)
# Overlap: keep last paragraph if small enough
overlap_para = None
if current_paras and self.count_tokens(current_paras[-1]) <= self.chunk_overlap:
overlap_para = current_paras[-1]
current_paras = [overlap_para] if overlap_para else []
current_tokens = self.count_tokens(overlap_para) if overlap_para else 0
chunk_start = para["start_line"]
current_paras.append(para_content)
current_tokens += para_tokens
chunk_end = para["end_line"]
# Final chunk
if current_paras:
chunk_text = "\n\n".join(current_paras)
chunks.append(
self._create_chunk(
content=chunk_text,
source_path=source_path,
start_line=chunk_start,
end_line=chunk_end,
file_type=file_type,
metadata=metadata,
)
)
return chunks
def _chunk_by_sentences(
self,
content: str,
source_path: str | None,
file_type: FileType | None,
metadata: dict[str, Any],
) -> list[Chunk]:
"""Chunk by sentences."""
sentences = self._split_sentences(content)
if not sentences:
return []
chunks: list[Chunk] = []
current_sentences: list[str] = []
current_tokens = 0
for sentence in sentences:
sentence_tokens = self.count_tokens(sentence)
# Handle sentences larger than chunk size
if sentence_tokens > self.chunk_size:
if current_sentences:
chunk_text = " ".join(current_sentences)
chunks.append(
self._create_chunk(
content=chunk_text,
source_path=source_path,
start_line=1,
end_line=1,
file_type=file_type,
metadata=metadata,
)
)
current_sentences = []
current_tokens = 0
# Truncate large sentence
truncated = self.truncate_to_tokens(sentence, self.chunk_size)
chunks.append(
self._create_chunk(
content=truncated,
source_path=source_path,
start_line=1,
end_line=1,
file_type=file_type,
metadata={**metadata, "truncated": True},
)
)
continue
# Check if adding sentence exceeds limit
if current_tokens + sentence_tokens > self.chunk_size and current_sentences:
chunk_text = " ".join(current_sentences)
chunks.append(
self._create_chunk(
content=chunk_text,
source_path=source_path,
start_line=1,
end_line=1,
file_type=file_type,
metadata=metadata,
)
)
# Overlap: keep last sentence if small enough
overlap = None
if current_sentences and self.count_tokens(current_sentences[-1]) <= self.chunk_overlap:
overlap = current_sentences[-1]
current_sentences = [overlap] if overlap else []
current_tokens = self.count_tokens(overlap) if overlap else 0
current_sentences.append(sentence)
current_tokens += sentence_tokens
# Final chunk
if current_sentences:
chunk_text = " ".join(current_sentences)
chunks.append(
self._create_chunk(
content=chunk_text,
source_path=source_path,
start_line=1,
end_line=content.count("\n") + 1,
file_type=file_type,
metadata=metadata,
)
)
return chunks
def _split_sentences(self, text: str) -> list[str]:
"""Split text into sentences."""
# Handle common sentence endings
# This is a simple approach - production might use nltk or spacy
sentence_pattern = re.compile(
r"(?<=[.!?])\s+(?=[A-Z])|" # Standard sentence ending
r"(?<=[.!?])\s*$|" # End of text
r"(?<=\n)\s*(?=\S)" # Newlines as boundaries
)
sentences = sentence_pattern.split(text)
return [s.strip() for s in sentences if s.strip()]
def _split_large_text(
self,
text: str,
source_path: str | None,
file_type: FileType | None,
metadata: dict[str, Any],
base_line: int,
) -> list[Chunk]:
"""Split text that exceeds chunk size."""
# First try sentences
sentences = self._split_sentences(text)
if len(sentences) > 1:
return self._chunk_by_sentences(
text, source_path, file_type, metadata
)
# Fall back to word-based splitting
return self._chunk_by_words(
text, source_path, file_type, metadata, base_line
)
def _chunk_by_words(
self,
text: str,
source_path: str | None,
file_type: FileType | None,
metadata: dict[str, Any],
base_line: int,
) -> list[Chunk]:
"""Last resort: chunk by words."""
words = text.split()
chunks: list[Chunk] = []
current_words: list[str] = []
current_tokens = 0
for word in words:
word_tokens = self.count_tokens(word + " ")
if current_tokens + word_tokens > self.chunk_size and current_words:
chunk_text = " ".join(current_words)
chunks.append(
self._create_chunk(
content=chunk_text,
source_path=source_path,
start_line=base_line,
end_line=base_line,
file_type=file_type,
metadata=metadata,
)
)
# Word overlap
overlap_count = 0
overlap_words: list[str] = []
for w in reversed(current_words):
w_tokens = self.count_tokens(w + " ")
if overlap_count + w_tokens > self.chunk_overlap:
break
overlap_words.insert(0, w)
overlap_count += w_tokens
current_words = overlap_words
current_tokens = overlap_count
current_words.append(word)
current_tokens += word_tokens
# Final chunk
if current_words:
chunk_text = " ".join(current_words)
chunks.append(
self._create_chunk(
content=chunk_text,
source_path=source_path,
start_line=base_line,
end_line=base_line,
file_type=file_type,
metadata=metadata,
)
)
return chunks
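
A short sketch of the paragraph-based path with overlap (sizes illustrative; assumes default settings can be constructed in your environment):

from chunking.text import TextChunker

chunker = TextChunker(chunk_size=64, chunk_overlap=40)
text = "\n\n".join(f"Paragraph {i}. " + "filler word " * 15 for i in range(6))
# When the previous paragraph fits within chunk_overlap, it is repeated at the
# start of the next chunk to preserve context across chunk boundaries.
for chunk in chunker.chunk(text):
    print(chunk.start_line, chunk.end_line, chunk.token_count)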