Cache Performance Tuning Guide
This guide provides comprehensive strategies for optimizing cache performance, including memory management, eviction policies, compression settings, and monitoring techniques.
Table of Contents
- Performance Metrics
- Memory Optimization
- Eviction Policy Tuning
- Compression Optimization
- Semantic Matching Tuning
- Cache Warming Optimization
- Distributed Cache Tuning
- Monitoring and Alerting
- Performance Testing
- Troubleshooting Performance Issues
Performance Metrics
Key Performance Indicators (KPIs)
Monitor these critical metrics to assess cache performance; a minimal sketch for deriving them from raw counters follows the lists below:
Hit Rate Metrics
- Overall Hit Rate: Percentage of requests served from cache
- Layer Hit Rates: Hit rates for each cache layer (embedding, search, LLM)
- Semantic Hit Rate: Share of cache hits served by semantic (similarity) matches rather than exact key matches
- User-Specific Hit Rate: Hit rates per user or user segment
Response Time Metrics
- Average Response Time: Mean time to serve cached content
- P95 Response Time: 95th percentile response time
- P99 Response Time: 99th percentile response time
- Cache Miss Penalty: Time difference between hits and misses
Memory Metrics
- Memory Usage: Current memory consumption
- Memory Efficiency: Ratio of useful data to total memory
- Fragmentation Ratio: Memory fragmentation level
- Eviction Rate: Frequency of cache evictions
Throughput Metrics
- Requests Per Second: Cache request throughput
- Operations Per Second: Cache operations throughput
- Warming Throughput: Cache warming operation rate
- Compression Ratio: Data compression effectiveness
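The sketch below shows one way to derive these metrics from raw hit/miss samples. It is illustrative only: KpiTracker and its methods are hypothetical helpers, not part of the caching package.
import statistics

class KpiTracker:
    """Hypothetical helper for deriving KPIs from raw hit/miss samples."""
    def __init__(self):
        self.hit_latencies_ms = []
        self.miss_latencies_ms = []
    def record(self, hit: bool, latency_ms: float):
        (self.hit_latencies_ms if hit else self.miss_latencies_ms).append(latency_ms)
    def hit_rate(self) -> float:
        total = len(self.hit_latencies_ms) + len(self.miss_latencies_ms)
        return len(self.hit_latencies_ms) / total if total else 0.0
    def percentile_ms(self, p: float) -> float:
        latencies = sorted(self.hit_latencies_ms + self.miss_latencies_ms)
        if not latencies:
            return 0.0
        return latencies[min(int(len(latencies) * p), len(latencies) - 1)]
    def miss_penalty_ms(self) -> float:
        # Average extra latency paid on a miss versus a hit
        if not self.hit_latencies_ms or not self.miss_latencies_ms:
            return 0.0
        return statistics.mean(self.miss_latencies_ms) - statistics.mean(self.hit_latencies_ms)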
Performance Targets
| Metric | Target | Warning | Critical |
|---|---|---|---|
| Overall Hit Rate | >85% | <80% | <70% |
| Average Response Time | <50ms | >100ms | >200ms |
| P95 Response Time | <100ms | >200ms | >500ms |
| Memory Usage | <80% | >90% | >95% |
| Eviction Rate | <5/min | >10/min | >20/min |
| Semantic Hit Rate | >60% | <50% | <40% |
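One way to act on this table is to encode the warning and critical thresholds directly; the classify_metric helper below is hypothetical and simply mirrors the rows above.
# Hypothetical helper encoding the targets table above
TARGETS = {
    'hit_rate':          {'warning': 0.80, 'critical': 0.70, 'higher_is_better': True},
    'avg_response_ms':   {'warning': 100,  'critical': 200,  'higher_is_better': False},
    'p95_response_ms':   {'warning': 200,  'critical': 500,  'higher_is_better': False},
    'memory_usage':      {'warning': 0.90, 'critical': 0.95, 'higher_is_better': False},
    'evictions_per_min': {'warning': 10,   'critical': 20,   'higher_is_better': False},
    'semantic_hit_rate': {'warning': 0.50, 'critical': 0.40, 'higher_is_better': True},
}

def classify_metric(name: str, value: float) -> str:
    t = TARGETS[name]
    if t['higher_is_better']:
        breached_critical, breached_warning = value < t['critical'], value < t['warning']
    else:
        breached_critical, breached_warning = value > t['critical'], value > t['warning']
    if breached_critical:
        return 'critical'
    return 'warning' if breached_warning else 'ok'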
Memory Optimization
Memory Allocation Strategy
from packages.caching.optimization import MemoryOptimizer, OptimizationConfig
# Configure memory optimization
memory_config = OptimizationConfig(
memory_limit_bytes=2 * 1024 * 1024 * 1024, # 2GB
memory_cleanup_interval=300, # 5 minutes
memory_pressure_threshold=0.8 # 80%
)
memory_optimizer = MemoryOptimizer(memory_config)
# Optimize cache entries
optimized_entries = memory_optimizer.optimize_memory_usage(cache_entries)
Memory Pressure Detection
# Check memory pressure
stats = await cache_manager.get_stats(CacheLayer.EMBEDDING)
current_usage = stats.total_size_bytes
is_pressure = memory_optimizer.check_memory_pressure(current_usage)
if is_pressure:
# Trigger aggressive eviction
await cache_manager.clear_old_entries()
# Enable compression for new entries
config.compression_enabled = True
Memory Profiling
# Get detailed memory statistics
memory_stats = memory_optimizer.get_memory_stats()
print(f"Total allocated: {memory_stats['total_allocated'] / (1024*1024):.2f} MB")
print(f"Total freed: {memory_stats['total_freed'] / (1024*1024):.2f} MB")
print(f"Current usage: {memory_stats['current_usage'] / (1024*1024):.2f} MB")
print(f"Peak usage: {memory_stats['peak_usage'] / (1024*1024):.2f} MB")
print(f"Fragmentation: {memory_stats['fragmentation_ratio']:.2%}")
Eviction Policy Tuning
Policy Selection Guide
| Policy | Best For | Pros | Cons |
|---|---|---|---|
| LRU | General purpose, predictable access patterns | Simple, effective for temporal locality | Doesn't consider frequency or cost |
| LFU | Frequently accessed content, stable workloads | Good for popular content | Can get stuck with old popular content |
| TTL | Time-sensitive data, compliance requirements | Automatic expiration, predictable | May evict useful content |
| Hybrid | Complex workloads, balanced performance | Combines multiple factors | More complex to tune |
| Size-based | Memory-constrained environments | Maximizes memory efficiency | May evict important large items |
| Cost-based | Compute-intensive applications | Considers computation cost | Requires cost estimation |
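To make the cost-based row concrete, here is a minimal sketch of a cost-aware eviction score; the entry fields it reads are assumptions for illustration, not the package's actual data model.
def cost_based_eviction_score(entry) -> float:
    # Cheap-to-recompute, large, rarely accessed entries score highest
    # (highest score is evicted first). Entry fields are illustrative.
    recompute_benefit = 1.0 / (1.0 + entry.compute_cost_ms)
    size_pressure = entry.size_bytes / (1024 * 1024)  # size in MB
    unpopularity = 1.0 / (1.0 + entry.access_count)
    return recompute_benefit * size_pressure * unpopularity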
Hybrid Policy Tuning
from packages.caching.optimization import EvictionPolicy, EvictionPolicyManager
# Configure hybrid eviction with custom weights
eviction_config = OptimizationConfig(
eviction_policy=EvictionPolicy.HYBRID,
eviction_threshold=0.85, # Start evicting at 85%
eviction_batch_size=50 # Evict 50 entries at a time
)
eviction_manager = EvictionPolicyManager(eviction_config)
# Custom hybrid scoring (in the implementation)
# LRU factor: 30% weight
# LFU factor: 25% weight
# TTL factor: 20% weight
# Size factor: 15% weight
# Age factor: 10% weight
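A minimal sketch of how these weights might combine into a single score; the entry fields and normalizations are assumptions, and a higher score means evict sooner.
import time

def hybrid_eviction_score(entry, max_size_bytes, now=None) -> float:
    # Each factor is normalized to roughly [0, 1]; fields are illustrative
    now = now or time.time()
    lru = min((now - entry.last_accessed) / 3600, 1.0)   # staleness, capped at 1h
    lfu = 1.0 / (1.0 + entry.access_count)               # inverse access frequency
    ttl = min((now - entry.created_at) / max(entry.ttl_seconds, 1), 1.0)
    size = min(entry.size_bytes / max_size_bytes, 1.0)
    age = min((now - entry.created_at) / 86400, 1.0)     # capped at 1 day
    return 0.30 * lru + 0.25 * lfu + 0.20 * ttl + 0.15 * size + 0.10 * age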
Dynamic Eviction Tuning
# Adjust eviction threshold based on performance
def adjust_eviction_threshold(cache_stats):
hit_rate = cache_stats.hit_rate
memory_usage = cache_stats.total_size_bytes / config.max_size_bytes
if hit_rate < 0.8 and memory_usage > 0.7:
# Low hit rate, high memory usage - be more aggressive
return 0.7
elif hit_rate > 0.9 and memory_usage < 0.6:
# High hit rate, low memory usage - be less aggressive
return 0.9
else:
# Default threshold
return 0.8
# Apply dynamic threshold
current_threshold = adjust_eviction_threshold(stats)
config.eviction_threshold = current_threshold
Compression Optimization
Compression Algorithm Selection
| Algorithm | Compression Ratio | Speed | CPU Usage | Best For |
|---|---|---|---|---|
| GZIP | High | Medium | Medium | General purpose |
| ZLIB | High | Medium | Medium | Similar to GZIP |
| LZ4 | Medium | Very Fast | Low | High-throughput applications |
| None | N/A | Fastest | None | Small data, fast access |
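A quick way to validate these trade-offs on your own payloads is a micro-benchmark. The sketch below uses only the standard library (LZ4 needs the third-party lz4 package, so it is omitted here):
import gzip
import time
import zlib

def benchmark_compression(payload: bytes):
    # Compare ratio and speed on a representative cache payload
    for name, compress in [('gzip', gzip.compress), ('zlib', zlib.compress)]:
        start = time.perf_counter()
        compressed = compress(payload)
        elapsed_ms = (time.perf_counter() - start) * 1000
        print(f"{name}: ratio={len(compressed) / len(payload):.2%}, time={elapsed_ms:.2f}ms")

benchmark_compression(b"example cache payload " * 1000)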
Compression Configuration
from packages.caching.optimization import CompressionEngine, CompressionAlgorithm
# High compression for storage efficiency
high_compression_config = OptimizationConfig(
compression_enabled=True,
compression_threshold_bytes=512, # Compress smaller items
compression_algorithm=CompressionAlgorithm.GZIP,
compression_level=9 # Maximum compression
)
# Fast compression for performance
fast_compression_config = OptimizationConfig(
compression_enabled=True,
compression_threshold_bytes=2048, # Only compress larger items
compression_algorithm=CompressionAlgorithm.LZ4,
compression_level=1 # Fast compression
)
compression_engine = CompressionEngine(fast_compression_config)
Compression Performance Monitoring
# Monitor compression effectiveness
compression_stats = compression_engine.get_stats()
print(f"Total compressed: {compression_stats['total_compressed']}")
print(f"Total decompressed: {compression_stats['total_decompressed']}")
print(f"Average compression ratio: {compression_stats['compression_ratio']:.2%}")
print(f"Time saved: {compression_stats['time_saved_ms']:.2f} ms")
Semantic Matching Tuning
Similarity Threshold Optimization
from packages.caching.semantic import SemanticMatcher, SimilarityMetric
# Test different thresholds
thresholds = [0.7, 0.75, 0.8, 0.85, 0.9, 0.95]
results = []
for threshold in thresholds:
matcher = SemanticMatcher(
similarity_threshold=threshold,
max_candidates=10
)
# Test with your data
hit_rate = test_semantic_matching(matcher, test_queries)
precision = calculate_precision(matcher, test_queries)
    results.append({
        'threshold': threshold,
        'hit_rate': hit_rate,
        'precision': precision,
        # Guard against division by zero when both scores are zero
        'f1_score': (2 * hit_rate * precision / (hit_rate + precision)) if (hit_rate + precision) > 0 else 0.0
    })
# Find optimal threshold
best_result = max(results, key=lambda x: x['f1_score'])
optimal_threshold = best_result['threshold']
Similarity Metric Selection
# Test different similarity metrics
metrics = [
SimilarityMetric.COSINE,
SimilarityMetric.EUCLIDEAN,
SimilarityMetric.DOT_PRODUCT,
SimilarityMetric.MANHATTAN
]
for metric in metrics:
matcher = SemanticMatcher(
similarity_threshold=0.85,
max_candidates=10,
metric=metric
)
# Test performance with your data
performance = test_similarity_metric(matcher, test_embeddings)
print(f"{metric.value}: {performance}")
Candidate Selection Tuning
# Optimize number of candidates
candidate_counts = [3, 5, 10, 15, 20]
results = []
for max_candidates in candidate_counts:
matcher = SemanticMatcher(
similarity_threshold=0.85,
max_candidates=max_candidates
)
# Test with your data
hit_rate = test_candidate_selection(matcher, test_queries)
response_time = measure_response_time(matcher, test_queries)
results.append({
'max_candidates': max_candidates,
'hit_rate': hit_rate,
'response_time': response_time,
'efficiency': hit_rate / response_time # Hit rate per ms
})
# Find optimal candidate count
best_result = max(results, key=lambda x: x['efficiency'])
optimal_candidates = best_result['max_candidates']
Cache Warming Optimization
Warming Strategy Tuning
from packages.caching.warming import WarmingStrategy, PredictiveWarmer
# Test different warming strategies
strategies = [
WarmingStrategy(
enabled=True,
batch_size=50,
interval_seconds=1800, # 30 minutes
priority_threshold=0.8
),
WarmingStrategy(
enabled=True,
batch_size=100,
interval_seconds=900, # 15 minutes
priority_threshold=0.7
),
WarmingStrategy(
enabled=True,
batch_size=200,
interval_seconds=3600, # 1 hour
priority_threshold=0.6
)
]
for strategy in strategies:
warmer = PredictiveWarmer(config)
warmer.add_warming_strategy("test", strategy)
# Test warming effectiveness
effectiveness = test_warming_strategy(warmer, test_queries)
print(f"Strategy {strategy.batch_size}/{strategy.interval_seconds}: {effectiveness}")
Pattern Analysis Optimization
from packages.caching.warming import QueryPatternAnalyzer
# Optimize pattern analysis window
windows = [6, 12, 24, 48, 72] # hours
results = []
for window in windows:
analyzer = QueryPatternAnalyzer(config)
# Analyze patterns with different windows
patterns = analyzer.analyze_patterns(
time_window_hours=window,
min_frequency=2
)
# Test pattern quality
quality = evaluate_pattern_quality(patterns)
results.append({
'window': window,
'quality': quality,
'pattern_count': len(patterns.get('frequent_queries', []))
})
# Find optimal window
best_result = max(results, key=lambda x: x['quality'])
optimal_window = best_result['window']
User Behavior Analysis Tuning
from datetime import datetime, timezone
from packages.caching.warming import UserBehaviorAnalyzer
# Configure behavior analysis
behavior_analyzer = UserBehaviorAnalyzer(config)
# Track user actions with different contexts
def track_user_action(user_id, action, context):
# Add relevant context for better analysis
enriched_context = {
**context,
        'timestamp': datetime.now(timezone.utc),
'session_id': context.get('session_id'),
'user_segment': get_user_segment(user_id),
'query_type': classify_query(context.get('query', ''))
}
behavior_analyzer.track_user_action(user_id, action, enriched_context)
# Analyze behavior with different parameters
behavior = behavior_analyzer.analyze_user_behavior(
user_id="user123",
time_window_hours=24
)
# Use behavior insights for warming
if behavior.get('recurring_queries'):
for query in behavior['recurring_queries'][:5]:
await warm_query(query, user_id="user123")
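The warm_query call above is not defined elsewhere in this guide; here is a minimal sketch of what such a helper might look like. Note that compute_response and cache_manager.set are assumptions about your pipeline, not confirmed APIs.
async def warm_query(query: str, user_id: str):
    # Hypothetical helper: populate the LLM layer ahead of demand
    key = f"{user_id}:{query}"
    result = await cache_manager.get(key, CacheLayer.LLM)
    if not isinstance(result, CacheHit):
        response = await compute_response(query)  # your LLM/search pipeline
        await cache_manager.set(key, response, CacheLayer.LLM)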
Distributed Cache Tuning
Cluster Configuration
from packages.caching.distributed import CacheCluster, CacheNode
# Optimize cluster size
import math

def calculate_optimal_cluster_size(working_set_bytes, memory_per_node=1024*1024*1024):
    # Size for the expected working set, with 2x headroom for redundancy
    total_memory_needed = working_set_bytes * 2
    nodes_needed = math.ceil(total_memory_needed / memory_per_node)
    # Use an odd node count so quorum votes cannot tie
    if nodes_needed % 2 == 0:
        nodes_needed += 1
    return max(3, nodes_needed)  # Minimum 3 nodes

optimal_size = calculate_optimal_cluster_size(
    working_set_bytes=8 * 1024 * 1024 * 1024,  # 8GB expected working set
    memory_per_node=2 * 1024 * 1024 * 1024     # 2GB per node
)
Replication Factor Tuning
# Configure replication based on requirements
def calculate_replication_factor(consistency_requirement, availability_requirement):
if consistency_requirement == "strong":
return 3 # Need majority for strong consistency
elif availability_requirement == "high":
return 2 # Balance between consistency and availability
else:
return 1 # No replication for simple setups
replication_factor = calculate_replication_factor(
consistency_requirement="eventual",
availability_requirement="medium"
)
config.replication_factor = replication_factor
Consistency Level Optimization
# Choose consistency level based on use case
consistency_levels = {
"eventual": {
"description": "Best performance, eventual consistency",
"use_cases": ["Analytics", "Recommendations", "Non-critical data"],
"latency": "Low",
"availability": "High"
},
"strong": {
"description": "Immediate consistency, higher latency",
"use_cases": ["Financial data", "User sessions", "Critical operations"],
"latency": "Higher",
"availability": "Lower"
}
}
# Select based on your requirements (use_case is application-defined)
if use_case == "financial_data":
config.consistency_level = "strong"
else:
config.consistency_level = "eventual"
Monitoring and Alerting
Performance Monitoring Setup
from packages.caching.monitoring import CacheMonitor, CacheAnalytics, CacheDashboard
# Set up comprehensive monitoring
monitor = CacheMonitor(config)
analytics = CacheAnalytics(monitor)
dashboard = CacheDashboard(monitor, analytics)
await monitor.start()
await analytics.start()
await dashboard.start()
# Configure alerts (intended to be re-run periodically; see the loop sketch after this function)
def setup_performance_alerts(monitor):
# Low hit rate alert
if monitor.get_metrics(CacheLayer.EMBEDDING).hit_rate < 0.8:
send_alert("Low hit rate detected", severity="warning")
# High response time alert
if monitor.get_metrics(CacheLayer.EMBEDDING).avg_response_time_ms > 100:
send_alert("High response time detected", severity="warning")
# Memory pressure alert
memory_usage = monitor.get_metrics(CacheLayer.EMBEDDING).total_size_bytes
if memory_usage > config.max_size_bytes * 0.9:
send_alert("High memory usage detected", severity="critical")
Custom Metrics
# Define custom metrics for your use case
class CustomCacheMetrics:
def __init__(self):
self.user_satisfaction_score = 0.0
self.cost_savings = 0.0
self.business_impact = 0.0
def calculate_user_satisfaction(self, hit_rate, response_time):
# Custom formula for user satisfaction
satisfaction = hit_rate * (1.0 - min(response_time / 1000, 1.0))
return max(0.0, min(1.0, satisfaction))
def calculate_cost_savings(self, api_calls_saved, cost_per_call):
return api_calls_saved * cost_per_call
def calculate_business_impact(self, satisfaction, cost_savings):
return satisfaction * 0.7 + (cost_savings / 1000) * 0.3
custom_metrics = CustomCacheMetrics()
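For example, with illustrative numbers (an 88% hit rate at 45 ms average latency, and 10,000 avoided API calls at $0.002 each):
satisfaction = custom_metrics.calculate_user_satisfaction(hit_rate=0.88, response_time=45)
savings = custom_metrics.calculate_cost_savings(api_calls_saved=10000, cost_per_call=0.002)
impact = custom_metrics.calculate_business_impact(satisfaction, savings)
print(f"Satisfaction: {satisfaction:.2f}, Savings: ${savings:.2f}, Impact: {impact:.2f}")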
Performance Testing
Load Testing
import asyncio
import time
async def load_test_cache(cache_manager, num_requests=1000, concurrency=10):
    """Load test the cache system."""
    semaphore = asyncio.Semaphore(concurrency)  # Cap in-flight requests
    async def make_request(request_id):
        async with semaphore:
            start_time = time.time()
            # Simulate cache operations
            result = await cache_manager.get(f"key_{request_id}", CacheLayer.EMBEDDING)
            end_time = time.time()
            return {
                'request_id': request_id,
                'response_time': (end_time - start_time) * 1000,
                'hit': isinstance(result, CacheHit)
            }
    # Run requests with bounded concurrency and measure wall-clock time
    test_start = time.time()
    tasks = [make_request(i) for i in range(num_requests)]
    results = await asyncio.gather(*tasks)
    elapsed = time.time() - test_start
    # Calculate statistics
    response_times = sorted(r['response_time'] for r in results)
    hits = [r['hit'] for r in results]
    return {
        'total_requests': num_requests,
        'avg_response_time': sum(response_times) / len(response_times),
        'p95_response_time': response_times[int(len(response_times) * 0.95)],
        'hit_rate': sum(hits) / len(hits),
        'throughput': num_requests / elapsed  # requests per second
    }
# Run load test
performance_results = await load_test_cache(cache_manager, 1000, 10)
print(f"Performance: {performance_results}")
Stress Testing
async def stress_test_cache(cache_manager, duration_seconds=300):
    """Stress test the cache system."""
    start_time = time.time()
    request_count = 0
    error_count = 0
    while time.time() - start_time < duration_seconds:
        # Simulate high load: batches of 100 concurrent requests
        tasks = []
        for _ in range(100):
            task = cache_manager.get(f"stress_key_{request_count}", CacheLayer.EMBEDDING)
            tasks.append(task)
            request_count += 1
        # return_exceptions=True surfaces per-request failures as results
        results = await asyncio.gather(*tasks, return_exceptions=True)
        error_count += sum(1 for r in results if isinstance(r, Exception))
    return {
        'duration': duration_seconds,
        'total_requests': request_count,
        'error_count': error_count,
        'error_rate': error_count / request_count if request_count > 0 else 0,
        'requests_per_second': request_count / duration_seconds
    }
# Run stress test
stress_results = await stress_test_cache(cache_manager, 300)
print(f"Stress test results: {stress_results}")
Troubleshooting Performance Issues
Common Performance Problems
1. Low Hit Rate
Symptoms: Hit rate below 80%
Causes:
- Semantic threshold too high
- Insufficient cache warming
- Poor query patterns
- TTL too short
Solutions:
# Lower semantic threshold
config.semantic_threshold = 0.75
# Increase warming frequency
config.warming_interval_seconds = 900 # 15 minutes
# Increase TTL
config.default_ttl_seconds = 7200 # 2 hours
# Improve warming strategy
strategy = WarmingStrategy(
batch_size=200,
priority_threshold=0.6
)
2. High Response Time
Symptoms: Response time above 100ms
Causes:
- Memory pressure
- Inefficient eviction
- Poor compression
- Network latency (distributed)
Solutions:
# Increase memory limit
config.max_size_bytes = 2 * 1024 * 1024 * 1024 # 2GB
# Optimize eviction
config.eviction_policy = "hybrid"
config.eviction_threshold = 0.9
# Use faster compression
config.compression_algorithm = "lz4"
# Optimize network (distributed)
config.consistency_level = "eventual"
3. High Memory Usage
Symptoms: Memory usage above 90%
Causes:
- Cache too large
- Inefficient compression
- Memory leaks
- Poor eviction
Solutions:
# Reduce cache size
config.max_size_bytes = 1024 * 1024 * 1024 # 1GB
# Enable aggressive compression
config.compression_enabled = True
config.compression_threshold_bytes = 512
# More aggressive eviction
config.eviction_threshold = 0.7
config.cleanup_interval_seconds = 60
# Check for memory leaks
memory_stats = memory_optimizer.get_memory_stats()
if memory_stats['fragmentation_ratio'] > 0.3:
# Trigger defragmentation
await cache_manager.defragment()
4. High Eviction Rate
Symptoms: Frequent evictions
Causes:
- Cache too small
- Poor eviction policy
- TTL too short
- Memory pressure
Solutions:
# Increase cache size
config.max_size_bytes = 2 * 1024 * 1024 * 1024 # 2GB
# Optimize eviction policy
config.eviction_policy = "cost_based"
# Increase TTL
config.default_ttl_seconds = 14400 # 4 hours
# Reduce memory pressure
config.eviction_threshold = 0.95
Performance Debugging
# Comprehensive performance debugging (assumes the semantic_matcher, warmer,
# and optimizer instances configured earlier in this guide are in scope)
async def debug_cache_performance(cache_manager):
"""Debug cache performance issues."""
# Get all layer statistics
for layer in CacheLayer:
stats = await cache_manager.get_stats(layer)
print(f"\n{layer.value} Layer:")
print(f" Hit Rate: {stats.hit_rate:.2%}")
print(f" Avg Response Time: {stats.avg_retrieval_time_ms:.2f}ms")
print(f" Memory Usage: {stats.total_size_bytes / (1024*1024):.2f}MB")
print(f" Entry Count: {stats.entry_count}")
print(f" Evictions: {stats.evictions}")
# Check semantic matching effectiveness
semantic_stats = semantic_matcher.get_similarity_stats()
print(f"\nSemantic Matching:")
print(f" Cached Embeddings: {semantic_stats['cached_embeddings']}")
print(f" Threshold: {semantic_stats['similarity_threshold']}")
print(f" Max Candidates: {semantic_stats['max_candidates']}")
# Check warming effectiveness
warming_stats = warmer.get_warming_stats()
print(f"\nCache Warming:")
print(f" Total Cycles: {warming_stats['total_warming_cycles']}")
print(f" Items Warmed: {warming_stats['total_items_warmed']}")
print(f" Errors: {warming_stats['total_errors']}")
# Check optimization effectiveness
opt_stats = optimizer.get_optimization_stats()
print(f"\nOptimization:")
print(f" Memory Saved: {opt_stats['memory_saved_bytes'] / (1024*1024):.2f}MB")
print(f" Compression Ratio: {opt_stats['compression_ratio']:.2%}")
print(f" Evictions: {opt_stats['evictions_performed']}")
# Run debugging
await debug_cache_performance(cache_manager)
This performance tuning guide provides comprehensive strategies for optimizing your cache system. Monitor your metrics regularly and adjust settings based on your specific workload patterns and performance requirements.