feat(context): implement assembly pipeline and compression (#82)

Phase 4 of Context Management Engine - Assembly Pipeline:

- Add TruncationStrategy with end/middle/sentence-aware truncation
- Add TruncationResult dataclass for tracking compression metrics
- Add ContextCompressor for type-specific compression
- Add ContextPipeline orchestrating full assembly workflow:
  - Token counting for all contexts
  - Scoring and ranking via ContextRanker
  - Optional compression when budget threshold exceeded
  - Model-specific formatting (XML for Claude, markdown for OpenAI)
- Add PipelineMetrics for performance tracking
- Update AssembledContext with new fields (model, contexts, metadata)
- Add backward compatibility aliases for renamed fields

Tests: 34 new tests, 223 total context tests passing

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-04 02:32:25 +01:00
parent 0d2005ddcb
commit 6b07e62f00
9 changed files with 1631 additions and 23 deletions

View File

@@ -63,6 +63,19 @@ from .exceptions import (
TokenCountError,
)
# Assembly
from .assembly import (
ContextPipeline,
PipelineMetrics,
)
# Compression
from .compression import (
ContextCompressor,
TruncationResult,
TruncationStrategy,
)
# Prioritization
from .prioritization import (
ContextRanker,
@@ -97,10 +110,17 @@ from .types import (
)
__all__ = [
# Assembly
"ContextPipeline",
"PipelineMetrics",
# Budget Management
"BudgetAllocator",
"TokenBudget",
"TokenCalculator",
# Compression
"ContextCompressor",
"TruncationResult",
"TruncationStrategy",
# Configuration
"ContextSettings",
"get_context_settings",

View File

@@ -3,3 +3,10 @@ Context Assembly Module.
Provides the assembly pipeline and formatting.
"""
from .pipeline import ContextPipeline, PipelineMetrics
__all__ = [
"ContextPipeline",
"PipelineMetrics",
]

View File

@@ -0,0 +1,432 @@
"""
Context Assembly Pipeline.
Orchestrates the full context assembly workflow:
Gather → Count → Score → Rank → Compress → Format
"""
import asyncio
import logging
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any
from ..budget import BudgetAllocator, TokenBudget, TokenCalculator
from ..compression.truncation import ContextCompressor
from ..config import ContextSettings, get_context_settings
from ..exceptions import AssemblyTimeoutError
from ..prioritization import ContextRanker
from ..scoring import CompositeScorer
from ..types import AssembledContext, BaseContext, ContextType
if TYPE_CHECKING:
from app.services.mcp.client_manager import MCPClientManager
logger = logging.getLogger(__name__)
@dataclass
class PipelineMetrics:
"""Metrics from pipeline execution."""
start_time: datetime = field(default_factory=lambda: datetime.now(UTC))
end_time: datetime | None = None
total_contexts: int = 0
selected_contexts: int = 0
excluded_contexts: int = 0
compressed_contexts: int = 0
total_tokens: int = 0
assembly_time_ms: float = 0.0
scoring_time_ms: float = 0.0
compression_time_ms: float = 0.0
formatting_time_ms: float = 0.0
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary."""
return {
"start_time": self.start_time.isoformat(),
"end_time": self.end_time.isoformat() if self.end_time else None,
"total_contexts": self.total_contexts,
"selected_contexts": self.selected_contexts,
"excluded_contexts": self.excluded_contexts,
"compressed_contexts": self.compressed_contexts,
"total_tokens": self.total_tokens,
"assembly_time_ms": round(self.assembly_time_ms, 2),
"scoring_time_ms": round(self.scoring_time_ms, 2),
"compression_time_ms": round(self.compression_time_ms, 2),
"formatting_time_ms": round(self.formatting_time_ms, 2),
}
class ContextPipeline:
    """
    Context assembly pipeline.

    Drives the whole assembly workflow:
        1. Make sure every context has a token count
        2. Score and rank the contexts and select those that fit the budget
        3. Optionally compress the selection
        4. Format the result for the target model
    """

    def __init__(
        self,
        mcp_manager: "MCPClientManager | None" = None,
        settings: ContextSettings | None = None,
        calculator: TokenCalculator | None = None,
        scorer: CompositeScorer | None = None,
        ranker: ContextRanker | None = None,
        compressor: ContextCompressor | None = None,
    ) -> None:
        """
        Wire up the pipeline and its collaborators.

        Any collaborator that is not supplied (or is falsy) is created here
        with defaults that share the same MCP manager, settings, calculator
        and scorer.

        Args:
            mcp_manager: MCP client manager handed to the default calculator
                and scorer
            settings: Context settings; falls back to get_context_settings()
            calculator: Token calculator
            scorer: Context scorer
            ranker: Context ranker
            compressor: Context compressor
        """
        if not settings:
            settings = get_context_settings()
        self._settings = settings
        self._mcp = mcp_manager

        # Collaborators are built in dependency order: the ranker needs the
        # scorer and the calculator, the compressor needs the calculator.
        if not calculator:
            calculator = TokenCalculator(mcp_manager=mcp_manager)
        self._calculator = calculator

        if not scorer:
            scorer = CompositeScorer(
                mcp_manager=mcp_manager, settings=self._settings
            )
        self._scorer = scorer

        if not ranker:
            ranker = ContextRanker(
                scorer=self._scorer, calculator=self._calculator
            )
        self._ranker = ranker

        if not compressor:
            compressor = ContextCompressor(calculator=self._calculator)
        self._compressor = compressor

        self._allocator = BudgetAllocator(self._settings)
def set_mcp_manager(self, mcp_manager: "MCPClientManager") -> None:
"""Set MCP manager for all components."""
self._mcp = mcp_manager
self._calculator.set_mcp_manager(mcp_manager)
self._scorer.set_mcp_manager(mcp_manager)
async def assemble(
self,
contexts: list[BaseContext],
query: str,
model: str,
max_tokens: int | None = None,
custom_budget: TokenBudget | None = None,
compress: bool = True,
format_output: bool = True,
timeout_ms: int | None = None,
) -> AssembledContext:
"""
Assemble context for an LLM request.
This is the main entry point for context assembly. It runs the phases
count -> rank -> (optional) compress -> format and records timings in a
PipelineMetrics instance that is returned inside the result metadata.
Args:
contexts: List of contexts to assemble
query: Query to optimize for
model: Target model name
max_tokens: Maximum total tokens (a falsy value, including 0, selects
the model default budget)
custom_budget: Optional pre-configured budget; takes precedence over
max_tokens when truthy
compress: Whether to run the compressor when _needs_compression says so
format_output: Whether to apply model-specific formatting; when False
the raw contents are joined with blank lines
timeout_ms: Maximum assembly time in milliseconds (a falsy value,
including 0, selects settings.max_assembly_time_ms)
Returns:
AssembledContext whose metadata holds the keys "metrics", "query"
and "budget"
Raises:
AssemblyTimeoutError: If the elapsed time exceeds the timeout at one
of the checks made between phases
Exception: Any other error is logged and re-raised unchanged
"""
# `or` treats 0 the same as None here, so 0 cannot be used as a timeout.
timeout = timeout_ms or self._settings.max_assembly_time_ms
start = time.perf_counter()
metrics = PipelineMetrics(total_contexts=len(contexts))
try:
# Create or use budget (precedence: custom budget, max_tokens, model default)
if custom_budget:
budget = custom_budget
elif max_tokens:
budget = self._allocator.create_budget(max_tokens)
else:
budget = self._allocator.create_budget_for_model(model)
# 1. Count tokens for all contexts (fills token_count on the objects passed in)
await self._ensure_token_counts(contexts, model)
# Check timeout
# NOTE: the timeout is only evaluated at these checkpoints; a phase that
# is still running is not interrupted.
self._check_timeout(start, timeout, "token counting")
# 2. Score and rank contexts
scoring_start = time.perf_counter()
ranking_result = await self._ranker.rank(
contexts=contexts,
query=query,
budget=budget,
model=model,
)
metrics.scoring_time_ms = (time.perf_counter() - scoring_start) * 1000
selected_contexts = ranking_result.selected_contexts
metrics.selected_contexts = len(selected_contexts)
metrics.excluded_contexts = len(ranking_result.excluded)
# Check timeout
self._check_timeout(start, timeout, "scoring")
# 3. Compress if needed and enabled
if compress and self._needs_compression(selected_contexts, budget):
compression_start = time.perf_counter()
selected_contexts = await self._compressor.compress_contexts(
selected_contexts, budget, model
)
metrics.compression_time_ms = (
time.perf_counter() - compression_start
) * 1000
# Counts every selected context whose metadata carries a truthy "truncated" flag.
metrics.compressed_contexts = sum(
1 for c in selected_contexts if c.metadata.get("truncated", False)
)
# Check timeout
self._check_timeout(start, timeout, "compression")
# 4. Format output
formatting_start = time.perf_counter()
if format_output:
formatted_content = self._format_contexts(selected_contexts, model)
else:
formatted_content = "\n\n".join(c.content for c in selected_contexts)
metrics.formatting_time_ms = (time.perf_counter() - formatting_start) * 1000
# Calculate final metrics (a missing token_count contributes 0)
total_tokens = sum(c.token_count or 0 for c in selected_contexts)
metrics.total_tokens = total_tokens
metrics.assembly_time_ms = (time.perf_counter() - start) * 1000
metrics.end_time = datetime.now(UTC)
return AssembledContext(
content=formatted_content,
total_tokens=total_tokens,
context_count=len(selected_contexts),
assembly_time_ms=metrics.assembly_time_ms,
model=model,
contexts=selected_contexts,
excluded_count=metrics.excluded_contexts,
metadata={
"metrics": metrics.to_dict(),
"query": query,
"budget": budget.to_dict(),
},
)
except AssemblyTimeoutError:
# Timeouts are propagated without the error log emitted below.
raise
except Exception as e:
logger.error(f"Context assembly failed: {e}", exc_info=True)
raise
async def _ensure_token_counts(
self,
contexts: list[BaseContext],
model: str | None = None,
) -> None:
"""Ensure all contexts have token counts."""
tasks = []
for context in contexts:
if context.token_count is None:
tasks.append(self._count_and_set(context, model))
if tasks:
await asyncio.gather(*tasks)
async def _count_and_set(
self,
context: BaseContext,
model: str | None = None,
) -> None:
"""Count tokens and set on context."""
count = await self._calculator.count_tokens(context.content, model)
context.token_count = count
def _needs_compression(
self,
contexts: list[BaseContext],
budget: TokenBudget,
) -> bool:
"""Check if any contexts exceed their type budget."""
# Group by type and check totals
by_type: dict[ContextType, int] = {}
for context in contexts:
ct = context.get_type()
by_type[ct] = by_type.get(ct, 0) + (context.token_count or 0)
for ct, total in by_type.items():
if total > budget.get_allocation(ct):
return True
# Also check if utilization exceeds threshold
return budget.utilization() > self._settings.compression_threshold
def _format_contexts(
    self,
    contexts: list[BaseContext],
    model: str,
) -> str:
    """
    Render the contexts as one string for the target model.

    Contexts are grouped by type and the groups are emitted in the fixed
    order System -> Task -> Knowledge -> Conversation -> Tool, separated by
    blank lines. Groups that render to an empty string are omitted.
    """
    grouped: dict[ContextType, list[BaseContext]] = {}
    for item in contexts:
        grouped.setdefault(item.get_type(), []).append(item)

    sections: list[str] = []
    for kind in (
        ContextType.SYSTEM,
        ContextType.TASK,
        ContextType.KNOWLEDGE,
        ContextType.CONVERSATION,
        ContextType.TOOL,
    ):
        if kind not in grouped:
            continue
        rendered = self._format_type(grouped[kind], kind, model)
        if rendered:
            sections.append(rendered)

    return "\n\n".join(sections)
def _format_type(
    self,
    contexts: list[BaseContext],
    context_type: ContextType,
    model: str,
) -> str:
    """
    Render all contexts of one type.

    XML-style tags are used when the model name contains "claude"
    (case-insensitive); otherwise the markdown variants are used. A type
    without a dedicated formatter is rendered as its contents joined by
    newlines.
    """
    if not contexts:
        return ""

    # Check if model prefers XML tags (Claude)
    use_xml = "claude" in model.lower()

    formatters = {
        ContextType.SYSTEM: self._format_system,
        ContextType.TASK: self._format_task,
        ContextType.KNOWLEDGE: self._format_knowledge,
        ContextType.CONVERSATION: self._format_conversation,
        ContextType.TOOL: self._format_tool,
    }
    formatter = formatters.get(context_type)
    if formatter is None:
        return "\n".join(item.content for item in contexts)
    return formatter(contexts, use_xml)
def _format_system(
self, contexts: list[BaseContext], use_xml: bool
) -> str:
"""Format system contexts."""
content = "\n\n".join(c.content for c in contexts)
if use_xml:
return f"<system_instructions>\n{content}\n</system_instructions>"
return content
def _format_task(
self, contexts: list[BaseContext], use_xml: bool
) -> str:
"""Format task contexts."""
content = "\n\n".join(c.content for c in contexts)
if use_xml:
return f"<current_task>\n{content}\n</current_task>"
return f"## Current Task\n\n{content}"
def _format_knowledge(
self, contexts: list[BaseContext], use_xml: bool
) -> str:
"""Format knowledge contexts."""
if use_xml:
parts = ["<reference_documents>"]
for ctx in contexts:
parts.append(f'<document source="{ctx.source}">')
parts.append(ctx.content)
parts.append("</document>")
parts.append("</reference_documents>")
return "\n".join(parts)
else:
parts = ["## Reference Documents\n"]
for ctx in contexts:
parts.append(f"### Source: {ctx.source}\n")
parts.append(ctx.content)
parts.append("")
return "\n".join(parts)
def _format_conversation(
self, contexts: list[BaseContext], use_xml: bool
) -> str:
"""Format conversation contexts."""
if use_xml:
parts = ["<conversation_history>"]
for ctx in contexts:
role = ctx.metadata.get("role", "user")
parts.append(f'<message role="{role}">')
parts.append(ctx.content)
parts.append("</message>")
parts.append("</conversation_history>")
return "\n".join(parts)
else:
parts = []
for ctx in contexts:
role = ctx.metadata.get("role", "user")
parts.append(f"**{role.upper()}**: {ctx.content}")
return "\n\n".join(parts)
def _format_tool(
self, contexts: list[BaseContext], use_xml: bool
) -> str:
"""Format tool contexts."""
if use_xml:
parts = ["<tool_results>"]
for ctx in contexts:
tool_name = ctx.metadata.get("tool_name", "unknown")
parts.append(f'<tool_result name="{tool_name}">')
parts.append(ctx.content)
parts.append("</tool_result>")
parts.append("</tool_results>")
return "\n".join(parts)
else:
parts = ["## Recent Tool Results\n"]
for ctx in contexts:
tool_name = ctx.metadata.get("tool_name", "unknown")
parts.append(f"### Tool: {tool_name}\n")
parts.append(f"```\n{ctx.content}\n```")
parts.append("")
return "\n".join(parts)
def _check_timeout(
self,
start: float,
timeout_ms: int,
phase: str,
) -> None:
"""Check if timeout exceeded and raise if so."""
elapsed_ms = (time.perf_counter() - start) * 1000
if elapsed_ms > timeout_ms:
raise AssemblyTimeoutError(
message=f"Context assembly timed out during {phase}",
elapsed_ms=elapsed_ms,
timeout_ms=timeout_ms,
)

View File

@@ -3,3 +3,11 @@ Context Compression Module.
Provides truncation and compression strategies.
"""
from .truncation import ContextCompressor, TruncationResult, TruncationStrategy
__all__ = [
"ContextCompressor",
"TruncationResult",
"TruncationStrategy",
]

View File

@@ -0,0 +1,391 @@
"""
Smart Truncation for Context Compression.
Provides intelligent truncation strategies to reduce context size
while preserving the most important information.
"""
import logging
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING
from ..types import BaseContext, ContextType
if TYPE_CHECKING:
from ..budget import TokenBudget, TokenCalculator
logger = logging.getLogger(__name__)
@dataclass
class TruncationResult:
"""
Result of a truncation operation.
Carries the (possibly shortened) text together with the token counts
measured before and after truncation.
"""
# Token count of the input text.
original_tokens: int
# Token count of `content` as returned.
truncated_tokens: int
# The resulting text (the input itself when nothing was cut).
content: str
# True when a truncation strategy was applied.
truncated: bool
truncation_ratio: float # 0.0 = no truncation, 1.0 = completely removed
@property
def tokens_saved(self) -> int:
"""Return original_tokens minus truncated_tokens."""
return self.original_tokens - self.truncated_tokens
class TruncationStrategy:
"""
Smart truncation strategies for context compression.
Strategies:
1. End truncation: Cut from end (for knowledge/docs)
2. Middle truncation: Keep start and end (for code)
3. Sentence-aware: Truncate at sentence boundaries
4. Semantic chunking: Keep most relevant chunks
"""
# Default truncation marker
TRUNCATION_MARKER = "\n\n[...content truncated...]\n\n"
def __init__(
self,
calculator: "TokenCalculator | None" = None,
preserve_ratio_start: float = 0.7, # Keep 70% from start by default
min_content_length: int = 100, # Minimum characters to keep
) -> None:
"""
Initialize truncation strategy.
Args:
calculator: Token calculator for accurate counting
preserve_ratio_start: Ratio of content to keep from start
min_content_length: Minimum characters to preserve
"""
self._calculator = calculator
self._preserve_ratio_start = preserve_ratio_start
self._min_content_length = min_content_length
def set_calculator(self, calculator: "TokenCalculator") -> None:
"""Set token calculator."""
self._calculator = calculator
async def truncate_to_tokens(
self,
content: str,
max_tokens: int,
strategy: str = "end",
model: str | None = None,
) -> TruncationResult:
"""
Truncate content to fit within token limit.
Args:
content: Content to truncate
max_tokens: Maximum tokens allowed
strategy: Truncation strategy ('end', 'middle', 'sentence')
model: Model for token counting
Returns:
TruncationResult with truncated content
"""
if not content:
return TruncationResult(
original_tokens=0,
truncated_tokens=0,
content="",
truncated=False,
truncation_ratio=0.0,
)
# Get original token count
original_tokens = await self._count_tokens(content, model)
if original_tokens <= max_tokens:
return TruncationResult(
original_tokens=original_tokens,
truncated_tokens=original_tokens,
content=content,
truncated=False,
truncation_ratio=0.0,
)
# Apply truncation strategy
if strategy == "middle":
truncated = await self._truncate_middle(content, max_tokens, model)
elif strategy == "sentence":
truncated = await self._truncate_sentence(content, max_tokens, model)
else: # "end"
truncated = await self._truncate_end(content, max_tokens, model)
truncated_tokens = await self._count_tokens(truncated, model)
return TruncationResult(
original_tokens=original_tokens,
truncated_tokens=truncated_tokens,
content=truncated,
truncated=True,
truncation_ratio=1 - (truncated_tokens / original_tokens),
)
async def _truncate_end(
self,
content: str,
max_tokens: int,
model: str | None = None,
) -> str:
"""
Truncate from end of content.
Simple but effective for most content types.
"""
# Binary search for optimal truncation point
marker_tokens = await self._count_tokens(self.TRUNCATION_MARKER, model)
available_tokens = max_tokens - marker_tokens
# Estimate characters per token
chars_per_token = len(content) / await self._count_tokens(content, model)
# Start with estimated position
estimated_chars = int(available_tokens * chars_per_token)
truncated = content[:estimated_chars]
# Refine with binary search
low, high = len(truncated) // 2, len(truncated)
best = truncated
for _ in range(5): # Max 5 iterations
mid = (low + high) // 2
candidate = content[:mid]
tokens = await self._count_tokens(candidate, model)
if tokens <= available_tokens:
best = candidate
low = mid + 1
else:
high = mid - 1
return best + self.TRUNCATION_MARKER
async def _truncate_middle(
self,
content: str,
max_tokens: int,
model: str | None = None,
) -> str:
"""
Truncate from middle, keeping start and end.
Good for code or content where context at boundaries matters.
"""
marker_tokens = await self._count_tokens(self.TRUNCATION_MARKER, model)
available_tokens = max_tokens - marker_tokens
# Split between start and end
start_tokens = int(available_tokens * self._preserve_ratio_start)
end_tokens = available_tokens - start_tokens
# Get start portion
start_content = await self._get_content_for_tokens(
content, start_tokens, from_start=True, model=model
)
# Get end portion
end_content = await self._get_content_for_tokens(
content, end_tokens, from_start=False, model=model
)
return start_content + self.TRUNCATION_MARKER + end_content
async def _truncate_sentence(
self,
content: str,
max_tokens: int,
model: str | None = None,
) -> str:
"""
Truncate at sentence boundaries.
Produces cleaner output by not cutting mid-sentence.
"""
# Split into sentences
sentences = re.split(r"(?<=[.!?])\s+", content)
result: list[str] = []
total_tokens = 0
marker_tokens = await self._count_tokens(self.TRUNCATION_MARKER, model)
available = max_tokens - marker_tokens
for sentence in sentences:
sentence_tokens = await self._count_tokens(sentence, model)
if total_tokens + sentence_tokens <= available:
result.append(sentence)
total_tokens += sentence_tokens
else:
break
if len(result) < len(sentences):
return " ".join(result) + self.TRUNCATION_MARKER
return " ".join(result)
async def _get_content_for_tokens(
self,
content: str,
target_tokens: int,
from_start: bool = True,
model: str | None = None,
) -> str:
"""Get portion of content fitting within token limit."""
if target_tokens <= 0:
return ""
current_tokens = await self._count_tokens(content, model)
if current_tokens <= target_tokens:
return content
# Estimate characters
chars_per_token = len(content) / current_tokens
estimated_chars = int(target_tokens * chars_per_token)
if from_start:
return content[:estimated_chars]
else:
return content[-estimated_chars:]
async def _count_tokens(self, text: str, model: str | None = None) -> int:
"""Count tokens using calculator or estimation."""
if self._calculator is not None:
return await self._calculator.count_tokens(text, model)
# Fallback estimation
return max(1, len(text) // 4)
class ContextCompressor:
    """
    Shrinks contexts so that they fit a token budget.

    The text shortening itself is delegated to a TruncationStrategy; this
    class chooses the strategy name from the context type and records the
    outcome on the context.
    """

    def __init__(
        self,
        truncation: TruncationStrategy | None = None,
        calculator: "TokenCalculator | None" = None,
    ) -> None:
        """
        Initialize context compressor.

        Args:
            truncation: Truncation strategy to use; a default one is created
                when omitted
            calculator: Token calculator for counting; when given it is also
                installed on the truncation strategy
        """
        if not truncation:
            truncation = TruncationStrategy(calculator)
        self._truncation = truncation
        self._calculator = calculator

        if calculator:
            self._truncation.set_calculator(calculator)

    def set_calculator(self, calculator: "TokenCalculator") -> None:
        """Install the token calculator here and on the truncation strategy."""
        self._calculator = calculator
        self._truncation.set_calculator(calculator)

    async def compress_context(
        self,
        context: BaseContext,
        max_tokens: int,
        model: str | None = None,
    ) -> BaseContext:
        """
        Compress a single context to fit token limit.

        The context is modified in place when it is too large: its content
        and token_count are replaced and ``metadata["truncated"]`` and
        ``metadata["original_tokens"]`` are set.

        Args:
            context: Context to compress
            max_tokens: Maximum tokens allowed
            model: Model for token counting

        Returns:
            The same context object, compressed if that was necessary
        """
        token_total = context.token_count
        if not token_total:
            # No (or zero) stored count: measure the content now.
            token_total = await self._count_tokens(context.content, model)

        if token_total <= max_tokens:
            return context

        # Choose strategy based on context type
        strategy_name = self._get_strategy_for_type(context.get_type())
        outcome = await self._truncation.truncate_to_tokens(
            content=context.content,
            max_tokens=max_tokens,
            strategy=strategy_name,
            model=model,
        )

        # Record the truncated text and how it came about on the context.
        context.content = outcome.content
        context.token_count = outcome.truncated_tokens
        context.metadata.update(
            {"truncated": True, "original_tokens": outcome.original_tokens}
        )
        return context

    async def compress_contexts(
        self,
        contexts: list[BaseContext],
        budget: "TokenBudget",
        model: str | None = None,
    ) -> list[BaseContext]:
        """
        Compress multiple contexts to fit within budget.

        Each context is compared against ``budget.remaining()`` for its own
        type and compressed down to that value when it is larger.

        Args:
            contexts: Contexts to potentially compress
            budget: Token budget constraints
            model: Model for token counting

        Returns:
            List of contexts in input order (compressed as needed)
        """
        processed: list[BaseContext] = []

        for item in contexts:
            kind = item.get_type()
            headroom = budget.remaining(kind)
            tokens_now = item.token_count or await self._count_tokens(
                item.content, model
            )

            if tokens_now <= headroom:
                processed.append(item)
                continue

            # Need to compress
            squeezed = await self.compress_context(item, headroom, model)
            processed.append(squeezed)
            logger.debug(
                f"Compressed {kind.value} context from "
                f"{tokens_now} to {squeezed.token_count} tokens"
            )

        return processed

    def _get_strategy_for_type(self, context_type: ContextType) -> str:
        """
        Return the truncation strategy name for a context type.

        Knowledge is cut at sentence boundaries and tool output keeps its
        start and end; every other type (system, task, conversation and
        anything unknown) is cut from the end.
        """
        special_cases = {
            ContextType.KNOWLEDGE: "sentence",  # Clean sentence boundaries
            ContextType.TOOL: "middle",  # Keep command and result summary
        }
        return special_cases.get(context_type, "end")

    async def _count_tokens(self, text: str, model: str | None = None) -> int:
        """Count tokens with the calculator, or estimate four characters per token without one."""
        if self._calculator is None:
            return max(1, len(text) // 4)
        return await self._calculator.count_tokens(text, model)

View File

@@ -253,12 +253,19 @@ class AssembledContext:
# Main content
content: str
token_count: int
total_tokens: int
# Assembly metadata
contexts_included: int
contexts_excluded: int = 0
context_count: int
excluded_count: int = 0
assembly_time_ms: float = 0.0
model: str = ""
# Included contexts (optional - for inspection)
contexts: list["BaseContext"] = field(default_factory=list)
# Additional metadata from assembly
metadata: dict[str, Any] = field(default_factory=dict)
# Budget tracking
budget_total: int = 0
@@ -271,6 +278,22 @@ class AssembledContext:
cache_hit: bool = False
cache_key: str | None = None
# Aliases for backward compatibility
@property
def token_count(self) -> int:
"""Return total_tokens; read-only alias kept for backward compatibility."""
return self.total_tokens
@property
def contexts_included(self) -> int:
"""Return context_count; read-only alias kept for backward compatibility."""
return self.context_count
@property
def contexts_excluded(self) -> int:
"""Return excluded_count; read-only alias kept for backward compatibility."""
return self.excluded_count
@property
def budget_utilization(self) -> float:
"""Get budget utilization percentage."""
@@ -282,10 +305,12 @@ class AssembledContext:
"""Convert to dictionary."""
return {
"content": self.content,
"token_count": self.token_count,
"contexts_included": self.contexts_included,
"contexts_excluded": self.contexts_excluded,
"total_tokens": self.total_tokens,
"context_count": self.context_count,
"excluded_count": self.excluded_count,
"assembly_time_ms": round(self.assembly_time_ms, 2),
"model": self.model,
"metadata": self.metadata,
"budget_total": self.budget_total,
"budget_used": self.budget_used,
"budget_utilization": round(self.budget_utilization, 3),
@@ -308,10 +333,12 @@ class AssembledContext:
data = json.loads(json_str)
return cls(
content=data["content"],
token_count=data["token_count"],
contexts_included=data["contexts_included"],
contexts_excluded=data.get("contexts_excluded", 0),
total_tokens=data["total_tokens"],
context_count=data["context_count"],
excluded_count=data.get("excluded_count", 0),
assembly_time_ms=data.get("assembly_time_ms", 0.0),
model=data.get("model", ""),
metadata=data.get("metadata", {}),
budget_total=data.get("budget_total", 0),
budget_used=data.get("budget_used", 0),
by_type=data.get("by_type", {}),