style(memory): apply ruff formatting and linting fixes

Auto-fixed linting errors and formatting issues:
- Removed unused imports (F401): pytest, Any, AnalysisType, MemoryType, OutcomeType
- Removed unused variable (F841): hooks variable in test
- Applied consistent formatting across memory service and test files

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 14:07:48 +01:00
parent e3fe0439fd
commit cf6291ac8e
17 changed files with 236 additions and 185 deletions
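
For readers unfamiliar with the rule codes in the message, a minimal sketch of what the two fix classes look like; the function names below are illustrative, not the project's actual code:

    import pytest  # F401: `pytest` imported but unused -- `ruff check --fix` deletes the import

    def test_memory_service():
        hooks = build_hooks()  # F841: local variable assigned but never read -- removed in this commit
        assert run_service() is not None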


@@ -149,8 +149,7 @@ class MemoryReflection:
         # Filter to time range
         episodes = [
-            e for e in episodes
-            if time_range.start <= e.occurred_at <= time_range.end
+            e for e in episodes if time_range.start <= e.occurred_at <= time_range.end
         ]
         if not episodes:
@@ -313,7 +312,9 @@ class MemoryReflection:
                         f"Task type '{task_type}': {success_rate:.0%} success rate, "
                         f"avg {avg_duration:.1f}s duration, {avg_tokens:.0f} tokens"
                     ),
-                    confidence=min(1.0, stats["total"] / 10),  # Higher sample = higher confidence
+                    confidence=min(
+                        1.0, stats["total"] / 10
+                    ),  # Higher sample = higher confidence
                     occurrence_count=stats["total"],
                     episode_ids=[e.id for e in stats["episodes"]],
                     first_seen=min(e.occurred_at for e in stats["episodes"]),
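
The confidence heuristic in this hunk scales linearly with sample size and saturates at ten episodes. A worked example with an assumed count:

    stats = {"total": 4}
    confidence = min(1.0, stats["total"] / 10)  # -> 0.4; 10 or more episodes yield the cap of 1.0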
@@ -397,7 +398,9 @@ class MemoryReflection:
         failed = [e for e in episodes if e.outcome == Outcome.FAILURE]
         if len(successful) >= 3 and len(failed) >= 3:
-            avg_success_duration = statistics.mean(e.duration_seconds for e in successful)
+            avg_success_duration = statistics.mean(
+                e.duration_seconds for e in successful
+            )
             avg_failure_duration = statistics.mean(e.duration_seconds for e in failed)
             if avg_failure_duration > avg_success_duration * 1.5:
@@ -409,7 +412,7 @@ class MemoryReflection:
                         description=(
                             f"Failed tasks average {avg_failure_duration:.1f}s vs "
                             f"{avg_success_duration:.1f}s for successful tasks "
-                            f"({avg_failure_duration/avg_success_duration:.1f}x longer)"
+                            f"({avg_failure_duration / avg_success_duration:.1f}x longer)"
                         ),
                         confidence=0.8,
                         occurrence_count=len(successful) + len(failed),
@@ -427,9 +430,15 @@ class MemoryReflection:
         # Analyze token efficiency
         if len(successful) >= 3:
             avg_tokens = statistics.mean(e.tokens_used for e in successful)
-            std_tokens = statistics.stdev(e.tokens_used for e in successful) if len(successful) > 1 else 0
-            efficient = [e for e in successful if e.tokens_used < avg_tokens - std_tokens]
+            std_tokens = (
+                statistics.stdev(e.tokens_used for e in successful)
+                if len(successful) > 1
+                else 0
+            )
+            efficient = [
+                e for e in successful if e.tokens_used < avg_tokens - std_tokens
+            ]
             if len(efficient) >= self._config.min_pattern_occurrences:
                 patterns.append(
                     Pattern(
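
The `efficient` filter above keeps episodes whose token usage sits at least one standard deviation below the mean. A quick sketch with assumed numbers:

    avg_tokens, std_tokens = 1000.0, 200.0
    tokens_used = [650, 900, 1200, 750]
    efficient = [t for t in tokens_used if t < avg_tokens - std_tokens]  # [650, 750]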
@@ -508,8 +517,7 @@ class MemoryReflection:
         # Filter to time range
         episodes = [
-            e for e in episodes
-            if time_range.start <= e.occurred_at <= time_range.end
+            e for e in episodes if time_range.start <= e.occurred_at <= time_range.end
         ]
         if len(episodes) < self._config.min_sample_size_for_factor:
@@ -652,9 +660,7 @@ class MemoryReflection:
             avg_success_duration = statistics.mean(
                 e.duration_seconds for e in successful
             )
-            avg_failure_duration = statistics.mean(
-                e.duration_seconds for e in failed
-            )
+            avg_failure_duration = statistics.mean(e.duration_seconds for e in failed)
             if avg_success_duration > 0:
                 duration_ratio = avg_failure_duration / avg_success_duration
@@ -837,7 +843,9 @@ class MemoryReflection:
         baseline_durations = [e.duration_seconds for e in baseline]
         baseline_mean = statistics.mean(baseline_durations)
-        baseline_std = statistics.stdev(baseline_durations) if len(baseline_durations) > 1 else 0
+        baseline_std = (
+            statistics.stdev(baseline_durations) if len(baseline_durations) > 1 else 0
+        )
         if baseline_std == 0:
             return anomalies
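
The zero-std early return guards the division in what is presumably a z-score comparison further down; a minimal sketch of that pattern, with `duration` standing in for one episode's duration (names assumed, not taken from the file):

    z_score = abs(duration - baseline_mean) / baseline_std  # undefined when baseline_std == 0
    is_anomaly = z_score > 3.0  # e.g. flag durations more than three standard deviations out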
@@ -997,7 +1005,10 @@ class MemoryReflection:
         ) / len(recent)
         # Detect significant failure rate increase
-        if recent_failure_rate > baseline_failure_rate * 1.5 and recent_failure_rate > 0.3:
+        if (
+            recent_failure_rate > baseline_failure_rate * 1.5
+            and recent_failure_rate > 0.3
+        ):
             rate_increase = recent_failure_rate / max(baseline_failure_rate, 0.01)
             anomalies.append(
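
The reformatted condition combines a relative threshold (1.5x the baseline) with an absolute floor (0.3), so a jump over a tiny baseline alone cannot trigger an anomaly. A worked example with assumed rates:

    baseline_failure_rate = 0.2
    recent_failure_rate = 0.35
    # 0.35 > 0.2 * 1.5 (relative jump) and 0.35 > 0.3 (absolute floor) -> flagged
    rate_increase = recent_failure_rate / max(baseline_failure_rate, 0.01)  # 1.75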
@@ -1074,14 +1085,11 @@ class MemoryReflection:
         insights.extend(self._insights_from_anomalies(anomalies))
         # Generate cross-cutting insights
-        insights.extend(
-            self._generate_cross_insights(patterns, factors, anomalies)
-        )
+        insights.extend(self._generate_cross_insights(patterns, factors, anomalies))
         # Filter by confidence and sort by priority
         insights = [
-            i for i in insights
-            if i.confidence >= self._config.min_insight_confidence
+            i for i in insights if i.confidence >= self._config.min_insight_confidence
         ]
         insights.sort(key=lambda i: -i.priority)
@@ -1182,9 +1190,7 @@ class MemoryReflection:
                     source_patterns=[],
                     source_factors=[f.id for f in top_positive],
                     source_anomalies=[],
-                    recommended_actions=[
-                        f"Reinforce: {f.name}" for f in top_positive
-                    ],
+                    recommended_actions=[f"Reinforce: {f.name}" for f in top_positive],
                     generated_at=_utcnow(),
                     metadata={
                         "factors": [f.to_dict() for f in top_positive],
@@ -1200,17 +1206,16 @@ class MemoryReflection:
                     insight_type=InsightType.WARNING,
                     title="Factors correlating with failure",
                     description=(
-                        "Risky factors: "
-                        + ", ".join(f.name for f in top_negative)
+                        "Risky factors: " + ", ".join(f.name for f in top_negative)
                     ),
                     priority=0.75,
-                    confidence=statistics.mean(abs(f.correlation) for f in top_negative),
+                    confidence=statistics.mean(
+                        abs(f.correlation) for f in top_negative
+                    ),
                     source_patterns=[],
                     source_factors=[f.id for f in top_negative],
                     source_anomalies=[],
-                    recommended_actions=[
-                        f"Mitigate: {f.name}" for f in top_negative
-                    ],
+                    recommended_actions=[f"Mitigate: {f.name}" for f in top_negative],
                     generated_at=_utcnow(),
                     metadata={
                         "factors": [f.to_dict() for f in top_negative],
@@ -1254,8 +1259,7 @@ class MemoryReflection:
         )
         failure_rate_anomalies = [
-            a for a in anomalies
-            if a.anomaly_type == AnomalyType.UNUSUAL_FAILURE_RATE
+            a for a in anomalies if a.anomaly_type == AnomalyType.UNUSUAL_FAILURE_RATE
         ]
         if failure_rate_anomalies:
             for anomaly in failure_rate_anomalies:
@@ -1295,7 +1299,13 @@ class MemoryReflection:
         total_items = len(patterns) + len(factors) + len(anomalies)
         if total_items > 0:
             warning_count = (
-                len([p for p in patterns if p.pattern_type == PatternType.RECURRING_FAILURE])
+                len(
+                    [
+                        p
+                        for p in patterns
+                        if p.pattern_type == PatternType.RECURRING_FAILURE
+                    ]
+                )
                 + len([a for a in anomalies if a.is_critical])
                 + len([f for f in factors if f.correlation < -0.3])
             )
@@ -1312,13 +1322,19 @@ class MemoryReflection:
                         f"Found {warning_count} warning indicators."
                     ),
                     priority=0.6,
-                    confidence=min(1.0, total_items / 20),  # Higher sample = higher confidence
+                    confidence=min(
+                        1.0, total_items / 20
+                    ),  # Higher sample = higher confidence
                     source_patterns=[p.id for p in patterns[:5]],
                     source_factors=[f.id for f in factors[:5]],
                     source_anomalies=[a.id for a in anomalies[:5]],
                     recommended_actions=(
-                        ["Continue current practices"] if health_score > 0.7
-                        else ["Review warnings and address issues", "Focus on improvement areas"]
+                        ["Continue current practices"]
+                        if health_score > 0.7
+                        else [
+                            "Review warnings and address issues",
+                            "Focus on improvement areas",
+                        ]
                     ),
                     generated_at=_utcnow(),
                     metadata={
@@ -1374,8 +1390,7 @@ class MemoryReflection:
             agent_instance_id=agent_instance_id,
         )
         episodes_in_range = [
-            e for e in episodes
-            if time_range.start <= e.occurred_at <= time_range.end
+            e for e in episodes if time_range.start <= e.occurred_at <= time_range.end
         ]
         # Run all analyses


@@ -70,8 +70,7 @@ class TimeRange:
         """Create time range for last N hours."""
         end = _utcnow()
         start = datetime(
-            end.year, end.month, end.day, end.hour, end.minute, end.second,
-            tzinfo=UTC
+            end.year, end.month, end.day, end.hour, end.minute, end.second, tzinfo=UTC
         ) - __import__("datetime").timedelta(hours=hours)
         return cls(start=start, end=end)
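
Beyond the formatting fix, the `__import__("datetime")` indirection in this method is worth a note: a top-level import does the same job, and the datetime(...) round-trip only drops microseconds. A possible equivalent, assuming the method is a classmethod whose name the hunk does not show and that `_utcnow()` returns an aware UTC datetime:

    from datetime import timedelta

    @classmethod
    def last_n_hours(cls, hours: int) -> "TimeRange":  # hypothetical name
        """Create time range for last N hours."""
        end = _utcnow()
        start = end.replace(microsecond=0) - timedelta(hours=hours)
        return cls(start=start, end=end)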