""" Code-aware chunking implementation. Provides intelligent chunking for source code that respects function/class boundaries and preserves context. """ import logging import re from typing import Any from chunking.base import BaseChunker from config import Settings from models import Chunk, ChunkType, FileType logger = logging.getLogger(__name__) # Language-specific patterns for detecting function/class definitions LANGUAGE_PATTERNS: dict[FileType, dict[str, re.Pattern[str]]] = { FileType.PYTHON: { "function": re.compile(r"^(\s*)(async\s+)?def\s+\w+", re.MULTILINE), "class": re.compile(r"^(\s*)class\s+\w+", re.MULTILINE), "decorator": re.compile(r"^(\s*)@\w+", re.MULTILINE), }, FileType.JAVASCRIPT: { "function": re.compile( r"^(\s*)(export\s+)?(async\s+)?function\s+\w+|" r"^(\s*)(export\s+)?(const|let|var)\s+\w+\s*=\s*(async\s+)?\(", re.MULTILINE, ), "class": re.compile(r"^(\s*)(export\s+)?class\s+\w+", re.MULTILINE), "arrow": re.compile( r"^(\s*)(export\s+)?(const|let|var)\s+\w+\s*=\s*(async\s+)?(\([^)]*\)|[^=])\s*=>", re.MULTILINE, ), }, FileType.TYPESCRIPT: { "function": re.compile( r"^(\s*)(export\s+)?(async\s+)?function\s+\w+|" r"^(\s*)(export\s+)?(const|let|var)\s+\w+\s*[:<]", re.MULTILINE, ), "class": re.compile(r"^(\s*)(export\s+)?class\s+\w+", re.MULTILINE), "interface": re.compile(r"^(\s*)(export\s+)?interface\s+\w+", re.MULTILINE), "type": re.compile(r"^(\s*)(export\s+)?type\s+\w+", re.MULTILINE), }, FileType.GO: { "function": re.compile(r"^func\s+(\([^)]+\)\s+)?\w+", re.MULTILINE), "struct": re.compile(r"^type\s+\w+\s+struct", re.MULTILINE), "interface": re.compile(r"^type\s+\w+\s+interface", re.MULTILINE), }, FileType.RUST: { "function": re.compile(r"^(\s*)(pub\s+)?(async\s+)?fn\s+\w+", re.MULTILINE), "struct": re.compile(r"^(\s*)(pub\s+)?struct\s+\w+", re.MULTILINE), "impl": re.compile(r"^(\s*)impl\s+", re.MULTILINE), "trait": re.compile(r"^(\s*)(pub\s+)?trait\s+\w+", re.MULTILINE), }, FileType.JAVA: { "method": re.compile( r"^(\s*)(public|private|protected)?\s*(static)?\s*\w+\s+\w+\s*\(", re.MULTILINE, ), "class": re.compile( r"^(\s*)(public|private|protected)?\s*(abstract)?\s*class\s+\w+", re.MULTILINE, ), "interface": re.compile( r"^(\s*)(public|private|protected)?\s*interface\s+\w+", re.MULTILINE, ), }, } class CodeChunker(BaseChunker): """ Code-aware chunker that respects logical boundaries. Features: - Detects function/class boundaries - Preserves decorator/annotation context - Handles nested structures - Falls back to line-based chunking when needed """ def __init__( self, chunk_size: int, chunk_overlap: int, settings: Settings | None = None, ) -> None: """Initialize code chunker.""" super().__init__(chunk_size, chunk_overlap, settings) @property def chunk_type(self) -> ChunkType: """Get chunk type.""" return ChunkType.CODE def chunk( self, content: str, source_path: str | None = None, file_type: FileType | None = None, metadata: dict[str, Any] | None = None, ) -> list[Chunk]: """ Chunk code content. Tries to respect function/class boundaries, falling back to line-based chunking if needed. 
""" if not content.strip(): return [] metadata = metadata or {} lines = content.splitlines(keepends=True) # Try language-aware chunking if we have patterns if file_type and file_type in LANGUAGE_PATTERNS: chunks = self._chunk_by_structure( content, lines, file_type, source_path, metadata ) if chunks: return chunks # Fall back to line-based chunking return self._chunk_by_lines(lines, source_path, file_type, metadata) def _chunk_by_structure( self, content: str, lines: list[str], file_type: FileType, source_path: str | None, metadata: dict[str, Any], ) -> list[Chunk]: """ Chunk by detecting code structure (functions, classes). Returns empty list if structure detection isn't useful. """ patterns = LANGUAGE_PATTERNS.get(file_type, {}) if not patterns: return [] # Find all structure boundaries boundaries: list[tuple[int, str]] = [] # (line_number, type) for struct_type, pattern in patterns.items(): for match in pattern.finditer(content): # Convert character position to line number line_num = content[: match.start()].count("\n") boundaries.append((line_num, struct_type)) if not boundaries: return [] # Sort boundaries by line number boundaries.sort(key=lambda x: x[0]) # If we have very few boundaries, line-based might be better if len(boundaries) < 3 and len(lines) > 50: return [] # Create chunks based on boundaries chunks: list[Chunk] = [] current_start = 0 for _i, (line_num, struct_type) in enumerate(boundaries): # Check if we need to create a chunk before this boundary if line_num > current_start: # Include any preceding comments/decorators actual_start = self._find_context_start(lines, line_num) if actual_start < current_start: actual_start = current_start chunk_lines = lines[current_start:line_num] chunk_content = "".join(chunk_lines) if chunk_content.strip(): token_count = self.count_tokens(chunk_content) # If chunk is too large, split it if token_count > self.chunk_size * 1.5: sub_chunks = self._split_large_chunk( chunk_lines, current_start, source_path, file_type, metadata ) chunks.extend(sub_chunks) elif token_count > 0: chunks.append( self._create_chunk( content=chunk_content.rstrip(), source_path=source_path, start_line=current_start + 1, end_line=line_num, file_type=file_type, metadata={**metadata, "structure_type": struct_type}, ) ) current_start = line_num # Handle remaining content if current_start < len(lines): chunk_lines = lines[current_start:] chunk_content = "".join(chunk_lines) if chunk_content.strip(): token_count = self.count_tokens(chunk_content) if token_count > self.chunk_size * 1.5: sub_chunks = self._split_large_chunk( chunk_lines, current_start, source_path, file_type, metadata ) chunks.extend(sub_chunks) else: chunks.append( self._create_chunk( content=chunk_content.rstrip(), source_path=source_path, start_line=current_start + 1, end_line=len(lines), file_type=file_type, metadata=metadata, ) ) return chunks def _find_context_start(self, lines: list[str], line_num: int) -> int: """Find the start of context (decorators, comments) before a line.""" start = line_num # Look backwards for decorators/comments for i in range(line_num - 1, max(0, line_num - 10), -1): line = lines[i].strip() if not line: continue if line.startswith(("#", "//", "/*", "*", "@", "'")): start = i else: break return start def _split_large_chunk( self, chunk_lines: list[str], base_line: int, source_path: str | None, file_type: FileType | None, metadata: dict[str, Any], ) -> list[Chunk]: """Split a large chunk into smaller pieces with overlap.""" chunks: list[Chunk] = [] current_lines: list[str] = [] 
        current_tokens = 0
        chunk_start = 0

        for i, line in enumerate(chunk_lines):
            line_tokens = self.count_tokens(line)

            if current_tokens + line_tokens > self.chunk_size and current_lines:
                # Create chunk
                chunk_content = "".join(current_lines).rstrip()
                chunks.append(
                    self._create_chunk(
                        content=chunk_content,
                        source_path=source_path,
                        start_line=base_line + chunk_start + 1,
                        end_line=base_line + i,
                        file_type=file_type,
                        metadata=metadata,
                    )
                )

                # Calculate overlap: keep trailing lines until at least
                # chunk_overlap tokens are covered
                overlap_tokens = 0
                overlap_lines: list[str] = []
                for j in range(len(current_lines) - 1, -1, -1):
                    overlap_tokens += self.count_tokens(current_lines[j])
                    if overlap_tokens >= self.chunk_overlap:
                        overlap_lines = current_lines[j:]
                        break

                current_lines = overlap_lines
                current_tokens = sum(self.count_tokens(line) for line in current_lines)
                chunk_start = i - len(overlap_lines)

            current_lines.append(line)
            current_tokens += line_tokens

        # Final chunk
        if current_lines:
            chunk_content = "".join(current_lines).rstrip()
            if chunk_content.strip():
                chunks.append(
                    self._create_chunk(
                        content=chunk_content,
                        source_path=source_path,
                        start_line=base_line + chunk_start + 1,
                        end_line=base_line + len(chunk_lines),
                        file_type=file_type,
                        metadata=metadata,
                    )
                )

        return chunks

    def _chunk_by_lines(
        self,
        lines: list[str],
        source_path: str | None,
        file_type: FileType | None,
        metadata: dict[str, Any],
    ) -> list[Chunk]:
        """Chunk by lines with overlap."""
        chunks: list[Chunk] = []
        current_lines: list[str] = []
        current_tokens = 0
        chunk_start = 0

        for i, line in enumerate(lines):
            line_tokens = self.count_tokens(line)

            # If this line alone exceeds chunk size, handle specially
            if line_tokens > self.chunk_size:
                # Flush current chunk
                if current_lines:
                    chunk_content = "".join(current_lines).rstrip()
                    if chunk_content.strip():
                        chunks.append(
                            self._create_chunk(
                                content=chunk_content,
                                source_path=source_path,
                                start_line=chunk_start + 1,
                                end_line=i,
                                file_type=file_type,
                                metadata=metadata,
                            )
                        )
                    current_lines = []
                    current_tokens = 0
                    chunk_start = i

                # Truncate and add long line
                truncated = self.truncate_to_tokens(line, self.chunk_size)
                chunks.append(
                    self._create_chunk(
                        content=truncated.rstrip(),
                        source_path=source_path,
                        start_line=i + 1,
                        end_line=i + 1,
                        file_type=file_type,
                        metadata={**metadata, "truncated": True},
                    )
                )
                chunk_start = i + 1
                continue

            if current_tokens + line_tokens > self.chunk_size and current_lines:
                # Create chunk
                chunk_content = "".join(current_lines).rstrip()
                if chunk_content.strip():
                    chunks.append(
                        self._create_chunk(
                            content=chunk_content,
                            source_path=source_path,
                            start_line=chunk_start + 1,
                            end_line=i,
                            file_type=file_type,
                            metadata=metadata,
                        )
                    )

                # Calculate overlap
                overlap_tokens = 0
                overlap_lines: list[str] = []
                for j in range(len(current_lines) - 1, -1, -1):
                    line_tok = self.count_tokens(current_lines[j])
                    if overlap_tokens + line_tok > self.chunk_overlap:
                        break
                    overlap_lines.insert(0, current_lines[j])
                    overlap_tokens += line_tok

                current_lines = overlap_lines
                current_tokens = overlap_tokens
                chunk_start = i - len(overlap_lines)

            current_lines.append(line)
            current_tokens += line_tokens

        # Final chunk
        if current_lines:
            chunk_content = "".join(current_lines).rstrip()
            if chunk_content.strip():
                chunks.append(
                    self._create_chunk(
                        content=chunk_content,
                        source_path=source_path,
                        start_line=chunk_start + 1,
                        end_line=len(lines),
                        file_type=file_type,
                        metadata=metadata,
                    )
                )

        return chunks
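

# ---------------------------------------------------------------------------
# Minimal usage sketch, for illustration only. It assumes BaseChunker can be
# constructed without an explicit Settings instance, and that Chunk exposes
# the start_line/end_line/metadata values passed to _create_chunk; neither is
# verified here. Adjust chunk_size/chunk_overlap to suit your tokenizer.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    sample = (
        "import os\n"
        "\n"
        "\n"
        "# Say hello.\n"
        "def greet(name: str) -> str:\n"
        '    return f"hello {name}"\n'
        "\n"
        "\n"
        "class Greeter:\n"
        "    def run(self) -> None:\n"
        '        print(greet("world"))\n'
    )

    chunker = CodeChunker(chunk_size=128, chunk_overlap=16)
    # With FileType.PYTHON, structure-aware chunking should keep the comment
    # attached to greet() and tag each chunk with its structure type.
    for c in chunker.chunk(sample, source_path="sample.py", file_type=FileType.PYTHON):
        print(c.start_line, c.end_line, c.metadata.get("structure_type"))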