syndarix/mcp-servers/knowledge-base/chunking/code.py

"""
Code-aware chunking implementation.
Provides intelligent chunking for source code that respects
function/class boundaries and preserves context.
"""
import logging
import re
from typing import Any
from chunking.base import BaseChunker
from config import Settings
from models import Chunk, ChunkType, FileType
logger = logging.getLogger(__name__)
# Language-specific patterns for detecting function/class definitions
LANGUAGE_PATTERNS: dict[FileType, dict[str, re.Pattern[str]]] = {
    FileType.PYTHON: {
        "function": re.compile(r"^(\s*)(async\s+)?def\s+\w+", re.MULTILINE),
        "class": re.compile(r"^(\s*)class\s+\w+", re.MULTILINE),
        "decorator": re.compile(r"^(\s*)@\w+", re.MULTILINE),
    },
    FileType.JAVASCRIPT: {
        "function": re.compile(
            r"^(\s*)(export\s+)?(async\s+)?function\s+\w+|"
            r"^(\s*)(export\s+)?(const|let|var)\s+\w+\s*=\s*(async\s+)?\(",
            re.MULTILINE,
        ),
        "class": re.compile(r"^(\s*)(export\s+)?class\s+\w+", re.MULTILINE),
        "arrow": re.compile(
            r"^(\s*)(export\s+)?(const|let|var)\s+\w+\s*=\s*(async\s+)?(\([^)]*\)|[^=])\s*=>",
            re.MULTILINE,
        ),
    },
    FileType.TYPESCRIPT: {
        "function": re.compile(
            r"^(\s*)(export\s+)?(async\s+)?function\s+\w+|"
            r"^(\s*)(export\s+)?(const|let|var)\s+\w+\s*[:<]",
            re.MULTILINE,
        ),
        "class": re.compile(r"^(\s*)(export\s+)?class\s+\w+", re.MULTILINE),
        "interface": re.compile(r"^(\s*)(export\s+)?interface\s+\w+", re.MULTILINE),
        "type": re.compile(r"^(\s*)(export\s+)?type\s+\w+", re.MULTILINE),
    },
    FileType.GO: {
        "function": re.compile(r"^func\s+(\([^)]+\)\s+)?\w+", re.MULTILINE),
        "struct": re.compile(r"^type\s+\w+\s+struct", re.MULTILINE),
        "interface": re.compile(r"^type\s+\w+\s+interface", re.MULTILINE),
    },
    FileType.RUST: {
        "function": re.compile(r"^(\s*)(pub\s+)?(async\s+)?fn\s+\w+", re.MULTILINE),
        "struct": re.compile(r"^(\s*)(pub\s+)?struct\s+\w+", re.MULTILINE),
        "impl": re.compile(r"^(\s*)impl\s+", re.MULTILINE),
        "trait": re.compile(r"^(\s*)(pub\s+)?trait\s+\w+", re.MULTILINE),
    },
    FileType.JAVA: {
        "method": re.compile(
            r"^(\s*)(public|private|protected)?\s*(static)?\s*\w+\s+\w+\s*\(",
            re.MULTILINE,
        ),
        "class": re.compile(
            r"^(\s*)(public|private|protected)?\s*(abstract)?\s*class\s+\w+",
            re.MULTILINE,
        ),
        "interface": re.compile(
            r"^(\s*)(public|private|protected)?\s*interface\s+\w+",
            re.MULTILINE,
        ),
    },
}
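
# Illustrative matches, for orientation (hypothetical source lines, not taken
# from any real file):
#   PYTHON      "async def handler(event):"  -> "function"
#   TYPESCRIPT  "export interface Props {"   -> "interface"
#   GO          "func (s *Server) Run() {"   -> "function" (method receiver)
#   RUST        "pub async fn fetch() {"     -> "function"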


class CodeChunker(BaseChunker):
    """
    Code-aware chunker that respects logical boundaries.

    Features:
    - Detects function/class boundaries
    - Preserves decorator/annotation context
    - Handles nested structures
    - Falls back to line-based chunking when needed
    """

    def __init__(
        self,
        chunk_size: int,
        chunk_overlap: int,
        settings: Settings | None = None,
    ) -> None:
        """Initialize code chunker."""
        super().__init__(chunk_size, chunk_overlap, settings)

    @property
    def chunk_type(self) -> ChunkType:
        """Get chunk type."""
        return ChunkType.CODE

    def chunk(
        self,
        content: str,
        source_path: str | None = None,
        file_type: FileType | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> list[Chunk]:
        """
        Chunk code content.

        Tries to respect function/class boundaries, falling back
        to line-based chunking if needed.
        """
        if not content.strip():
            return []

        metadata = metadata or {}
        lines = content.splitlines(keepends=True)

        # Try language-aware chunking if we have patterns
        if file_type and file_type in LANGUAGE_PATTERNS:
            chunks = self._chunk_by_structure(
                content, lines, file_type, source_path, metadata
            )
            if chunks:
                return chunks

        # Fall back to line-based chunking
        return self._chunk_by_lines(lines, source_path, file_type, metadata)

    def _chunk_by_structure(
        self,
        content: str,
        lines: list[str],
        file_type: FileType,
        source_path: str | None,
        metadata: dict[str, Any],
    ) -> list[Chunk]:
        """
        Chunk by detecting code structure (functions, classes).

        Returns empty list if structure detection isn't useful.
        """
        patterns = LANGUAGE_PATTERNS.get(file_type, {})
        if not patterns:
            return []

        # Find all structure boundaries
        boundaries: list[tuple[int, str]] = []  # (line_number, type)
        for struct_type, pattern in patterns.items():
            for match in pattern.finditer(content):
                # Convert character position to line number
                line_num = content[: match.start()].count("\n")
                boundaries.append((line_num, struct_type))

        if not boundaries:
            return []

        # Sort boundaries by line number
        boundaries.sort(key=lambda x: x[0])

        # If we have very few boundaries, line-based might be better
        if len(boundaries) < 3 and len(lines) > 50:
            return []

        # Create chunks based on boundaries
        chunks: list[Chunk] = []
        current_start = 0
        for line_num, struct_type in boundaries:
            # Check if we need to create a chunk before this boundary
            if line_num > current_start:
                # End the preceding chunk before any comments/decorators
                # attached to this boundary, so that context travels with
                # the structure it annotates
                actual_start = self._find_context_start(lines, line_num)
                if actual_start < current_start:
                    actual_start = current_start
                chunk_lines = lines[current_start:actual_start]
                chunk_content = "".join(chunk_lines)
                if chunk_content.strip():
                    token_count = self.count_tokens(chunk_content)
                    # If chunk is too large, split it
                    if token_count > self.chunk_size * 1.5:
                        sub_chunks = self._split_large_chunk(
                            chunk_lines, current_start, source_path, file_type, metadata
                        )
                        chunks.extend(sub_chunks)
                    elif token_count > 0:
                        chunks.append(
                            self._create_chunk(
                                content=chunk_content.rstrip(),
                                source_path=source_path,
                                start_line=current_start + 1,
                                end_line=actual_start,
                                file_type=file_type,
                                metadata={**metadata, "structure_type": struct_type},
                            )
                        )
                current_start = actual_start

        # Handle remaining content
        if current_start < len(lines):
            chunk_lines = lines[current_start:]
            chunk_content = "".join(chunk_lines)
            if chunk_content.strip():
                token_count = self.count_tokens(chunk_content)
                if token_count > self.chunk_size * 1.5:
                    sub_chunks = self._split_large_chunk(
                        chunk_lines, current_start, source_path, file_type, metadata
                    )
                    chunks.extend(sub_chunks)
                else:
                    chunks.append(
                        self._create_chunk(
                            content=chunk_content.rstrip(),
                            source_path=source_path,
                            start_line=current_start + 1,
                            end_line=len(lines),
                            file_type=file_type,
                            metadata=metadata,
                        )
                    )

        return chunks

    def _find_context_start(self, lines: list[str], line_num: int) -> int:
        """Find the start of context (decorators, comments) before a line."""
        start = line_num
        # Look backwards up to 10 lines for decorators/comments; a stop of -1
        # lets the scan reach line 0 when the boundary is near the top
        for i in range(line_num - 1, max(-1, line_num - 11), -1):
            line = lines[i].strip()
            if not line:
                continue
            if line.startswith(("#", "//", "/*", "*", "@", "'")):
                start = i
            else:
                break
        return start
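
    # Worked example for _find_context_start (hypothetical snippet): with
    #     lines[10] = "@app.route('/items')\n"
    #     lines[11] = "# fetch all items\n"
    #     lines[12] = "def list_items():\n"
    # a boundary at line_num=12 walks back past the comment and the decorator
    # and returns 10, so both context lines travel with the function chunk.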

    def _split_large_chunk(
        self,
        chunk_lines: list[str],
        base_line: int,
        source_path: str | None,
        file_type: FileType | None,
        metadata: dict[str, Any],
    ) -> list[Chunk]:
        """Split a large chunk into smaller pieces with overlap."""
        chunks: list[Chunk] = []
        current_lines: list[str] = []
        current_tokens = 0
        chunk_start = 0

        for i, line in enumerate(chunk_lines):
            line_tokens = self.count_tokens(line)

            if current_tokens + line_tokens > self.chunk_size and current_lines:
                # Create chunk
                chunk_content = "".join(current_lines).rstrip()
                chunks.append(
                    self._create_chunk(
                        content=chunk_content,
                        source_path=source_path,
                        start_line=base_line + chunk_start + 1,
                        end_line=base_line + i,
                        file_type=file_type,
                        metadata=metadata,
                    )
                )

                # Calculate overlap
                overlap_tokens = 0
                overlap_lines: list[str] = []
                for j in range(len(current_lines) - 1, -1, -1):
                    overlap_tokens += self.count_tokens(current_lines[j])
                    if overlap_tokens >= self.chunk_overlap:
                        overlap_lines = current_lines[j:]
                        break

                current_lines = overlap_lines
                current_tokens = sum(self.count_tokens(line) for line in current_lines)
                chunk_start = i - len(overlap_lines)

            current_lines.append(line)
            current_tokens += line_tokens

        # Final chunk
        if current_lines:
            chunk_content = "".join(current_lines).rstrip()
            if chunk_content.strip():
                chunks.append(
                    self._create_chunk(
                        content=chunk_content,
                        source_path=source_path,
                        start_line=base_line + chunk_start + 1,
                        end_line=base_line + len(chunk_lines),
                        file_type=file_type,
                        metadata=metadata,
                    )
                )

        return chunks
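
    # Overlap arithmetic in _split_large_chunk, worked through with
    # hypothetical token counts: with chunk_overlap=20 and a just-emitted
    # chunk whose trailing lines cost 8, 7, and 9 tokens, the backward walk
    # accumulates 9, then 16, then 24 >= 20, so those three lines seed the
    # next chunk and chunk_start moves back by three.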

    def _chunk_by_lines(
        self,
        lines: list[str],
        source_path: str | None,
        file_type: FileType | None,
        metadata: dict[str, Any],
    ) -> list[Chunk]:
        """Chunk by lines with overlap."""
        chunks: list[Chunk] = []
        current_lines: list[str] = []
        current_tokens = 0
        chunk_start = 0

        for i, line in enumerate(lines):
            line_tokens = self.count_tokens(line)

            # If this line alone exceeds chunk size, handle specially
            if line_tokens > self.chunk_size:
                # Flush current chunk
                if current_lines:
                    chunk_content = "".join(current_lines).rstrip()
                    if chunk_content.strip():
                        chunks.append(
                            self._create_chunk(
                                content=chunk_content,
                                source_path=source_path,
                                start_line=chunk_start + 1,
                                end_line=i,
                                file_type=file_type,
                                metadata=metadata,
                            )
                        )
                    current_lines = []
                    current_tokens = 0
                    chunk_start = i

                # Truncate and add long line
                truncated = self.truncate_to_tokens(line, self.chunk_size)
                chunks.append(
                    self._create_chunk(
                        content=truncated.rstrip(),
                        source_path=source_path,
                        start_line=i + 1,
                        end_line=i + 1,
                        file_type=file_type,
                        metadata={**metadata, "truncated": True},
                    )
                )
                chunk_start = i + 1
                continue

            if current_tokens + line_tokens > self.chunk_size and current_lines:
                # Create chunk
                chunk_content = "".join(current_lines).rstrip()
                if chunk_content.strip():
                    chunks.append(
                        self._create_chunk(
                            content=chunk_content,
                            source_path=source_path,
                            start_line=chunk_start + 1,
                            end_line=i,
                            file_type=file_type,
                            metadata=metadata,
                        )
                    )

                # Calculate overlap
                overlap_tokens = 0
                overlap_lines: list[str] = []
                for j in range(len(current_lines) - 1, -1, -1):
                    line_tok = self.count_tokens(current_lines[j])
                    if overlap_tokens + line_tok > self.chunk_overlap:
                        break
                    overlap_lines.insert(0, current_lines[j])
                    overlap_tokens += line_tok

                current_lines = overlap_lines
                current_tokens = overlap_tokens
                chunk_start = i - len(overlap_lines)

            current_lines.append(line)
            current_tokens += line_tokens

        # Final chunk
        if current_lines:
            chunk_content = "".join(current_lines).rstrip()
            if chunk_content.strip():
                chunks.append(
                    self._create_chunk(
                        content=chunk_content,
                        source_path=source_path,
                        start_line=chunk_start + 1,
                        end_line=len(lines),
                        file_type=file_type,
                        metadata=metadata,
                    )
                )

        return chunks
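

# A minimal usage sketch, not part of the original module. It assumes that
# BaseChunker's chunk_size/chunk_overlap are token budgets and that Chunk
# exposes content, start_line, and end_line, as the _create_chunk calls above
# suggest; adjust to the real models before running.
if __name__ == "__main__":
    sample = (
        "import os\n"
        "\n"
        "\n"
        "# read a file\n"
        "def read(path):\n"
        "    return open(path).read()\n"
        "\n"
        "\n"
        "class Loader:\n"
        "    def load(self, path):\n"
        "        return read(path)\n"
    )
    chunker = CodeChunker(chunk_size=256, chunk_overlap=32)
    for chunk in chunker.chunk(
        sample, source_path="sample.py", file_type=FileType.PYTHON
    ):
        print(f"lines {chunk.start_line}-{chunk.end_line}: {chunk.content[:40]!r}")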