Cache Performance Tuning Guide

This guide provides comprehensive strategies for optimizing cache performance, including memory management, eviction policies, compression settings, and monitoring techniques.

Performance Metrics

Key Performance Indicators (KPIs)

Monitor these critical metrics to assess cache performance:

Hit Rate Metrics

  • Overall Hit Rate: Percentage of requests served from cache
  • Layer Hit Rates: Hit rates for each cache layer (embedding, search, LLM)
  • Semantic Hit Rate: Share of cache hits served by semantic (similarity) matching rather than exact key matches
  • User-Specific Hit Rate: Hit rates per user or user segment
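
As a rough illustration of how these hit rates relate to each other, the sketch below derives per-layer, semantic, and overall hit rates from raw counters. The `LayerCounters` class, its field names, and the sample numbers are hypothetical and are not part of the caching package.

```python
from dataclasses import dataclass


@dataclass
class LayerCounters:
    """Hypothetical per-layer counters (not part of the caching package)."""
    exact_hits: int = 0
    semantic_hits: int = 0
    misses: int = 0

    @property
    def hits(self) -> int:
        return self.exact_hits + self.semantic_hits

    @property
    def requests(self) -> int:
        return self.hits + self.misses

    @property
    def hit_rate(self) -> float:
        # Share of requests served from cache (exact or semantic).
        return self.hits / self.requests if self.requests else 0.0

    @property
    def semantic_hit_rate(self) -> float:
        # Share of hits that came from semantic matching.
        return self.semantic_hits / self.hits if self.hits else 0.0


def overall_hit_rate(layers: dict) -> float:
    """Hit rate across all layers, weighted by request volume."""
    total = sum(c.requests for c in layers.values())
    hits = sum(c.hits for c in layers.values())
    return hits / total if total else 0.0


# Illustrative numbers only.
layers = {
    "embedding": LayerCounters(exact_hits=700, semantic_hits=200, misses=100),
    "llm": LayerCounters(exact_hits=300, semantic_hits=300, misses=400),
}
overall = overall_hit_rate(layers)
```

Weighting by request volume matters here: averaging the two layer hit rates would give the same answer only because both layers saw the same number of requests.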

Response Time Metrics

  • Average Response Time: Mean time to serve cached content
  • P95 Response Time: 95th percentile response time
  • P99 Response Time: 99th percentile response time
  • Cache Miss Penalty: Additional latency of a cache miss compared with a cache hit
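
The response time metrics above can be computed from raw latency samples. The following is a minimal sketch using the nearest-rank percentile definition; the helper names and the sample latencies are illustrative assumptions, and a monitoring system may use a different percentile method.

```python
import math


def percentile(samples, pct):
    """Nearest-rank percentile: the smallest sample such that at least
    pct percent of all samples are less than or equal to it."""
    if not samples:
        raise ValueError("samples must not be empty")
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def miss_penalty(hit_ms, miss_ms):
    """Mean miss latency minus mean hit latency, in milliseconds."""
    return sum(miss_ms) / len(miss_ms) - sum(hit_ms) / len(hit_ms)


# Illustrative samples: hits take 1..100 ms, misses take a few hundred ms.
hit_latencies = [float(ms) for ms in range(1, 101)]
miss_latencies = [250.0, 300.0, 350.0]

p95 = percentile(hit_latencies, 95)
p99 = percentile(hit_latencies, 99)
penalty = miss_penalty(hit_latencies, miss_latencies)
```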

Memory Metrics

  • Memory Usage: Current memory consumption
  • Memory Efficiency: Ratio of useful data to total memory
  • Fragmentation Ratio: Memory fragmentation level
  • Eviction Rate: Frequency of cache evictions

Throughput Metrics

  • Requests Per Second: Cache request throughput
  • Operations Per Second: Cache operations throughput
  • Warming Throughput: Cache warming operation rate
  • Compression Ratio: Data compression effectiveness

Performance Targets

| Metric | Target | Warning | Critical |
|---|---|---|---|
| Overall Hit Rate | >85% | <80% | <70% |
| Average Response Time | <50ms | >100ms | >200ms |
| P95 Response Time | <100ms | >200ms | >500ms |
| Memory Usage | <80% | >90% | >95% |
| Eviction Rate | <5/min | >10/min | >20/min |
| Semantic Hit Rate | >60% | <50% | <40% |
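
One way to apply the targets table in an alerting check is to classify each measured value against its warning and critical thresholds. The sketch below assumes hypothetical metric names and a hypothetical `classify` helper; only the threshold values come from the table.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Threshold:
    warning: float
    critical: float
    higher_is_better: bool


# Threshold values copied from the targets table; the keys are illustrative names.
THRESHOLDS = {
    "overall_hit_rate": Threshold(0.80, 0.70, higher_is_better=True),
    "avg_response_ms": Threshold(100, 200, higher_is_better=False),
    "p95_response_ms": Threshold(200, 500, higher_is_better=False),
    "memory_usage": Threshold(0.90, 0.95, higher_is_better=False),
    "evictions_per_min": Threshold(10, 20, higher_is_better=False),
    "semantic_hit_rate": Threshold(0.50, 0.40, higher_is_better=True),
}


def classify(metric: str, value: float) -> str:
    """Return 'critical', 'warning', or 'ok' for a measured value."""
    t = THRESHOLDS[metric]
    if t.higher_is_better:
        if value < t.critical:
            return "critical"
        if value < t.warning:
            return "warning"
    else:
        if value > t.critical:
            return "critical"
        if value > t.warning:
            return "warning"
    return "ok"


status = classify("overall_hit_rate", 0.75)
```

Values between the target and the warning threshold (for example, a hit rate of 82%) are reported as "ok" here, since the table does not define a separate state for them.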

Memory Optimization

Memory Allocation Strategy

from packages.caching.optimization import MemoryOptimizer, OptimizationConfig

# Configure memory optimization
memory_config = OptimizationConfig(
    memory_limit_bytes=2 * 1024 * 1024 * 1024,  # 2GB
    memory_cleanup_interval=300,  # 5 minutes
    memory_pressure_threshold=0.8  # 80%
)

memory_optimizer = MemoryOptimizer(memory_config)

# Optimize cache entries
optimized_entries = memory_optimizer.optimize_memory_usage(cache_entries)

Memory Pressure Detection

# Check memory pressure
# Await the stats call first, then read the attribute from its result
embedding_stats = await cache_manager.get_stats(CacheLayer.EMBEDDING)
current_usage = embedding_stats.total_size_bytes
is_pressure = memory_optimizer.check_memory_pressure(current_usage)

if is_pressure:
    # Trigger aggressive eviction
    await cache_manager.clear_old_entries()
    # Enable compression for new entries
    config.compression_enabled = True

Memory Profiling

# Get detailed memory statistics
memory_stats = memory_optimizer.get_memory_stats()

print(f"Total allocated: {memory_stats['total_allocated'] / (1024*1024):.2f} MB")
print(f"Total freed: {memory_stats['total_freed'] / (1024*1024):.2f} MB")
print(f"Current usage: {memory_stats['current_usage'] / (1024*1024):.2f} MB")
print(f"Peak usage: {memory_stats['peak_usage'] / (1024*1024):.2f} MB")
print(f"Fragmentation: {memory_stats['fragmentation_ratio']:.2%}")

Eviction Policy Tuning

Policy Selection Guide

| Policy | Best For | Pros | Cons |
|---|---|---|---|
| LRU | General purpose, predictable access patterns | Simple, effective for temporal locality | Doesn't consider frequency or cost |
| LFU | Frequently accessed content, stable workloads | Good for popular content | Can get stuck with old popular content |
| TTL | Time-sensitive data, compliance requirements | Automatic expiration, predictable | May evict useful content |
| Hybrid | Complex workloads, balanced performance | Combines multiple factors | More complex to tune |
| Size-based | Memory-constrained environments | Maximizes memory efficiency | May evict important large items |
| Cost-based | Compute-intensive applications | Considers computation cost | Requires cost estimation |
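
To make the LRU row concrete, here is a minimal LRU cache built on the standard library's `OrderedDict`. It is an illustration of the policy only, not the package's implementation, and the `LRUCache` class is a hypothetical name.

```python
from collections import OrderedDict


class LRUCache:
    """Minimal LRU cache for illustration (not the package's implementation)."""

    def __init__(self, capacity):
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self.capacity = capacity
        self._data = OrderedDict()

    def get(self, key):
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict the least recently used entry

    def keys(self):
        """Keys ordered from least to most recently used."""
        return list(self._data)


cache = LRUCache(capacity=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")     # "a" becomes the most recently used entry
cache.put("c", 3)  # capacity exceeded, so "b" is evicted
```

Note that access frequency plays no role: an entry read a thousand times is evicted as soon as it becomes the least recently touched one, which is the drawback the table lists.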

Hybrid Policy Tuning

from packages.caching.optimization import EvictionPolicy, EvictionPolicyManager

# Configure hybrid eviction with custom weights
eviction_config = OptimizationConfig(
    eviction_policy=EvictionPolicy.HYBRID,
    eviction_threshold=0.85,  # Start evicting at 85%
    eviction_batch_size=50  # Evict 50 entries at a time
)

eviction_manager = EvictionPolicyManager(eviction_config)

# Custom hybrid scoring (in the implementation)
# LRU factor: 30% weight
# LFU factor: 25% weight
# TTL factor: 20% weight
# Size factor: 15% weight
# Age factor: 10% weight
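
The guide lists only the weights, not how each factor is computed. The sketch below shows one plausible way to combine them into a single eviction score. The `EntryStats` fields, the normalization caps, and the direction of each factor are all assumptions made for illustration; only the five weights come from the comments above.

```python
from dataclasses import dataclass

# Weights from the hybrid policy comments; everything else is an assumption.
WEIGHTS = {"lru": 0.30, "lfu": 0.25, "ttl": 0.20, "size": 0.15, "age": 0.10}


@dataclass
class EntryStats:
    seconds_since_access: float
    access_count: int
    ttl_remaining_seconds: float
    size_bytes: int
    age_seconds: float


def _clamp(x):
    return max(0.0, min(1.0, x))


def eviction_score(e, max_idle=3600.0, max_count=100, max_ttl=3600.0,
                   max_size=1_000_000, max_age=86_400.0):
    """Return a score in [0, 1]; higher means a better eviction candidate."""
    factors = {
        "lru": _clamp(e.seconds_since_access / max_idle),      # idle longer -> evict
        "lfu": 1.0 - _clamp(e.access_count / max_count),       # rarely used -> evict
        "ttl": 1.0 - _clamp(e.ttl_remaining_seconds / max_ttl),  # near expiry -> evict
        "size": _clamp(e.size_bytes / max_size),               # larger -> evict
        "age": _clamp(e.age_seconds / max_age),                # older -> evict
    }
    return sum(WEIGHTS[name] * value for name, value in factors.items())


hot = EntryStats(seconds_since_access=0, access_count=100,
                 ttl_remaining_seconds=3600, size_bytes=0, age_seconds=0)
cold = EntryStats(seconds_since_access=3600, access_count=0,
                  ttl_remaining_seconds=0, size_bytes=1_000_000, age_seconds=86_400)
hot_score = eviction_score(hot)
cold_score = eviction_score(cold)
```

Because the weights sum to 1 and each factor is clamped to [0, 1], scores are directly comparable across entries, and a batch eviction can simply take the highest-scoring entries.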

Dynamic Eviction Tuning

# Adjust eviction threshold based on performance
def adjust_eviction_threshold(cache_stats):
    hit_rate = cache_stats.hit_rate
    memory_usage = cache_stats.total_size_bytes / config.max_size_bytes

    if hit_rate < 0.8 and memory_usage > 0.7:
        # Low hit rate, high memory usage - be more aggressive
        return 0.7
    elif hit_rate > 0.9 and memory_usage < 0.6:
        # High hit rate, low memory usage - be less aggressive
        return 0.9
    else:
        # Default threshold
        return 0.8

# Apply dynamic threshold
current_threshold = adjust_eviction_threshold(stats)
config.eviction_threshold = current_threshold

Compression Optimization

Compression Algorithm Selection

| Algorithm | Compression Ratio | Speed | CPU Usage | Best For |
|---|---|---|---|---|
| GZIP | High | Medium | Medium | General purpose |
| ZLIB | High | Medium | Medium | Similar to GZIP |
| LZ4 | Medium | Very Fast | Low | High-throughput applications |
| None | N/A | Fastest | None | Small data, fast access |
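
Before choosing an algorithm and level, it can help to measure them on a payload that resembles your cached values. The sketch below uses only Python's standard `zlib` and `gzip` modules (LZ4 needs a third-party package, so it is left out) and a synthetic, highly repetitive payload, so the numbers it produces are not representative of real data.

```python
import gzip
import json
import zlib

# Synthetic payload for illustration; substitute a sample of real cached values.
payload = json.dumps(
    [{"id": i, "text": "cached response " * 8} for i in range(200)]
).encode("utf-8")


def size_fraction(compressed):
    """Compressed size as a fraction of the original size (lower is better)."""
    return len(compressed) / len(payload)


fast = zlib.compress(payload, level=1)        # favors speed
best = zlib.compress(payload, level=9)        # favors size
gz = gzip.compress(payload, compresslevel=9)  # same DEFLATE family, gzip framing

for name, blob in [("zlib level 1", fast), ("zlib level 9", best), ("gzip level 9", gz)]:
    print(f"{name}: {len(blob)} bytes ({size_fraction(blob):.1%} of original)")
```

GZIP and ZLIB share the DEFLATE algorithm and differ mainly in framing, which is consistent with the table describing them as similar.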

Compression Configuration

from packages.caching.optimization import CompressionEngine, CompressionAlgorithm

# High compression for storage efficiency
high_compression_config = OptimizationConfig(
    compression_enabled=True,
    compression_threshold_bytes=512,  # Compress smaller items
    compression_algorithm=CompressionAlgorithm.GZIP,
    compression_level=9  # Maximum compression
)

# Fast compression for performance
fast_compression_config = OptimizationConfig(
    compression_enabled=True,
    compression_threshold_bytes=2048,  # Only compress larger items
    compression_algorithm=CompressionAlgorithm.LZ4,
    compression_level=1  # Fast compression
)

compression_engine = CompressionEngine(fast_compression_config)

Compression Performance Monitoring

# Monitor compression effectiveness
compression_stats = compression_engine.get_stats()

print(f"Total compressed: {compression_stats['total_compressed']}")
print(f"Total decompressed: {compression_stats['total_decompressed']}")
print(f"Average compression ratio: {compression_stats['compression_ratio']:.2%}")
print(f"Time saved: {compression_stats['time_saved_ms']:.2f} ms")

Semantic Matching Tuning

Similarity Threshold Optimization

from packages.caching.semantic import SemanticMatcher, SimilarityMetric

# Test different thresholds
thresholds = [0.7, 0.75, 0.8, 0.85, 0.9, 0.95]
results = []

for threshold in thresholds:
    matcher = SemanticMatcher(
        similarity_threshold=threshold,
        max_candidates=10
    )

    # Test with your data
    hit_rate = test_semantic_matching(matcher, test_queries)
    precision = calculate_precision(matcher, test_queries)

    # Harmonic mean of hit rate and precision; guard against 0/0
    total = hit_rate + precision
    results.append({
        'threshold': threshold,
        'hit_rate': hit_rate,
        'precision': precision,
        'f1_score': 2 * (hit_rate * precision) / total if total else 0.0
    })

# Find optimal threshold
best_result = max(results, key=lambda x: x['f1_score'])
optimal_threshold = best_result['threshold']

Similarity Metric Selection

# Test different similarity metrics
metrics = [
    SimilarityMetric.COSINE,
    SimilarityMetric.EUCLIDEAN,
    SimilarityMetric.DOT_PRODUCT,
    SimilarityMetric.MANHATTAN
]

for metric in metrics:
    matcher = SemanticMatcher(
        similarity_threshold=0.85,
        max_candidates=10,
        metric=metric
    )

    # Test performance with your data
    performance = test_similarity_metric(matcher, test_embeddings)
    print(f"{metric.value}: {performance}")
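
For reference, the four metrics can be written out in a few lines of plain Python. This is a sketch of the standard mathematical definitions, not of how `SemanticMatcher` computes them; the function names and the toy vectors are illustrative.

```python
import math


def dot_product(a, b):
    return sum(x * y for x, y in zip(a, b))


def cosine_similarity(a, b):
    """Cosine of the angle between a and b; ignores vector magnitude."""
    norm = math.sqrt(dot_product(a, a)) * math.sqrt(dot_product(b, b))
    return dot_product(a, b) / norm if norm else 0.0


def euclidean_distance(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def manhattan_distance(a, b):
    return sum(abs(x - y) for x, y in zip(a, b))


a = [1.0, 0.0]
b = [2.0, 0.0]  # same direction as a, twice the length
c = [0.0, 1.0]  # orthogonal to a

same_direction = cosine_similarity(a, b)
orthogonal = cosine_similarity(a, c)
```

Cosine and dot product are similarities (higher means closer), while Euclidean and Manhattan are distances (lower means closer), and only cosine is bounded to [-1, 1]. A threshold such as 0.85 therefore does not carry the same meaning across metrics unless the matcher normalizes them to a common scale, which is worth checking before comparing results.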

Candidate Selection Tuning

# Optimize number of candidates
candidate_counts = [3, 5, 10, 15, 20]
results = []

for max_candidates in candidate_counts:
    matcher = SemanticMatcher(
        similarity_threshold=0.85,
        max_candidates=max_candidates
    )

    # Test with your data
    hit_rate = test_candidate_selection(matcher, test_queries)
    response_time = measure_response_time(matcher, test_queries)

    results.append({
        'max_candidates': max_candidates,
        'hit_rate': hit_rate,
        'response_time': response_time,
        'efficiency': hit_rate / response_time  # Hit rate per ms
    })

# Find optimal candidate count
best_result = max(results, key=lambda x: x['efficiency'])
optimal_candidates = best_result['max_candidates']

Cache Warming Optimization

Warming Strategy Tuning

from packages.caching.warming import WarmingStrategy, PredictiveWarmer

# Test different warming strategies
strategies = [
    WarmingStrategy(
        enabled=True,
        batch_size=50,
        interval_seconds=1800,  # 30 minutes
        priority_threshold=0.8
    ),
    WarmingStrategy(
        enabled=True,
        batch_size=100,
        interval_seconds=900,  # 15 minutes
        priority_threshold=0.7
    ),
    WarmingStrategy(
        enabled=True,
        batch_size=200,
        interval_seconds=3600,  # 1 hour
        priority_threshold=0.6
    )
]

for strategy in strategies:
    warmer = PredictiveWarmer(config)
    warmer.add_warming_strategy("test", strategy)

    # Test warming effectiveness
    effectiveness = test_warming_strategy(warmer, test_queries)
    print(f"Strategy {strategy.batch_size}/{strategy.interval_seconds}: {effectiveness}")

Pattern Analysis Optimization

from packages.caching.warming import QueryPatternAnalyzer

# Optimize pattern analysis window
windows = [6, 12, 24, 48, 72] # hours
results = []

for window in windows:
    analyzer = QueryPatternAnalyzer(config)

    # Analyze patterns with different windows
    patterns = analyzer.analyze_patterns(
        time_window_hours=window,
        min_frequency=2
    )

    # Test pattern quality
    quality = evaluate_pattern_quality(patterns)
    results.append({
        'window': window,
        'quality': quality,
        'pattern_count': len(patterns.get('frequent_queries', []))
    })

# Find optimal window
best_result = max(results, key=lambda x: x['quality'])
optimal_window = best_result['window']
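
To show why the window length matters, the sketch below counts frequent queries in a timestamped log for two different windows. It is a simplified stand-in for this kind of analysis, not the logic inside `QueryPatternAnalyzer`; the `frequent_queries` function and the sample log are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone


def frequent_queries(log, now, time_window_hours, min_frequency=2):
    """Return (query, count) pairs seen at least min_frequency times
    within the last time_window_hours, most frequent first."""
    cutoff = now - timedelta(hours=time_window_hours)
    counts = Counter(query for timestamp, query in log if timestamp >= cutoff)
    return [(q, n) for q, n in counts.most_common() if n >= min_frequency]


# Fixed reference time and sample log, for illustration only.
now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
log = [
    (now - timedelta(hours=1), "reset password"),
    (now - timedelta(hours=2), "reset password"),
    (now - timedelta(hours=3), "pricing"),
    (now - timedelta(hours=30), "pricing"),
    (now - timedelta(hours=30), "reset password"),
]

short_window = frequent_queries(log, now, time_window_hours=6)
long_window = frequent_queries(log, now, time_window_hours=48)
```

With the 6-hour window only one query clears the frequency threshold, while the 48-hour window surfaces both. Longer windows find more patterns but also include older traffic that may no longer be relevant for warming.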

User Behavior Analysis Tuning

from datetime import datetime

from packages.caching.warming import UserBehaviorAnalyzer

# Configure behavior analysis
behavior_analyzer = UserBehaviorAnalyzer(config)

# Track user actions with different contexts
def track_user_action(user_id, action, context):
    # Add relevant context for better analysis
    enriched_context = {
        **context,
        'timestamp': datetime.utcnow(),
        'session_id': context.get('session_id'),
        'user_segment': get_user_segment(user_id),
        'query_type': classify_query(context.get('query', ''))
    }

    behavior_analyzer.track_user_action(user_id, action, enriched_context)

# Analyze behavior with different parameters
behavior = behavior_analyzer.analyze_user_behavior(
    user_id="user123",
    time_window_hours=24
)

# Use behavior insights for warming
if behavior.get('recurring_queries'):
    for query in behavior['recurring_queries'][:5]:
        await warm_query(query, user_id="user123")

Distributed Cache Tuning

Cluster Configuration

import math

from packages.caching.distributed import CacheCluster, CacheNode

# Optimize cluster size
def calculate_optimal_cluster_size(expected_load, memory_per_node=1024*1024*1024):
    # expected_load is the expected volume of cached data in bytes,
    # so that it can be compared against memory_per_node
    total_memory_needed = expected_load * 2  # 2x for redundancy
    nodes_needed = math.ceil(total_memory_needed / memory_per_node)

    # Ensure odd number for quorum
    if nodes_needed % 2 == 0:
        nodes_needed += 1

    return max(3, nodes_needed)  # Minimum 3 nodes

optimal_size = calculate_optimal_cluster_size(
    expected_load=10*1024*1024*1024,  # 10GB of cached data (illustrative value; pass bytes, not requests per second)
    memory_per_node=2*1024*1024*1024  # 2GB per node
)

Replication Factor Tuning

# Configure replication based on requirements
def calculate_replication_factor(consistency_requirement, availability_requirement):
    if consistency_requirement == "strong":
        return 3  # Need majority for strong consistency
    elif availability_requirement == "high":
        return 2  # Balance between consistency and availability
    else:
        return 1  # No replication for simple setups

replication_factor = calculate_replication_factor(
    consistency_requirement="eventual",
    availability_requirement="medium"
)

config.replication_factor = replication_factor
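
The "majority" comment in the replication code can be made concrete with a little quorum arithmetic. The sketch below assumes a simple majority-quorum scheme, which the guide implies but does not spell out; the helper names are hypothetical.

```python
def majority_quorum(replication_factor):
    """Smallest number of replicas that forms a strict majority."""
    if replication_factor < 1:
        raise ValueError("replication_factor must be at least 1")
    return replication_factor // 2 + 1


def tolerated_failures(replication_factor):
    """Replicas that can be lost while a majority is still reachable."""
    return replication_factor - majority_quorum(replication_factor)


for rf in (1, 2, 3, 5):
    print(f"replicas={rf} quorum={majority_quorum(rf)} "
          f"tolerated failures={tolerated_failures(rf)}")
```

Under this scheme, two replicas tolerate no more failures than one, because both must respond to form a majority. Three is the smallest replication factor that keeps a majority available after a single failure, which is also why odd replica counts are usually preferred.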

Consistency Level Optimization

# Choose consistency level based on use case
consistency_levels = {
    "eventual": {
        "description": "Best performance, eventual consistency",
        "use_cases": ["Analytics", "Recommendations", "Non-critical data"],
        "latency": "Low",
        "availability": "High"
    },
    "strong": {
        "description": "Immediate consistency, higher latency",
        "use_cases": ["Financial data", "User sessions", "Critical operations"],
        "latency": "Higher",
        "availability": "Lower"
    }
}

# Select based on your requirements
if use_case == "financial_data":
    config.consistency_level = "strong"
else:
    config.consistency_level = "eventual"

Monitoring and Alerting

Performance Monitoring Setup

from packages.caching.monitoring import CacheMonitor, CacheAnalytics, CacheDashboard

# Set up comprehensive monitoring
monitor = CacheMonitor(config)
analytics = CacheAnalytics(monitor)
dashboard = CacheDashboard(monitor, analytics)

await monitor.start()
await analytics.start()
await dashboard.start()

# Configure alerts
def setup_performance_alerts(monitor):
    # Low hit rate alert
    if monitor.get_metrics(CacheLayer.EMBEDDING).hit_rate < 0.8:
        send_alert("Low hit rate detected", severity="warning")

    # High response time alert
    if monitor.get_metrics(CacheLayer.EMBEDDING).avg_response_time_ms > 100:
        send_alert("High response time detected", severity="warning")

    # Memory pressure alert
    memory_usage = monitor.get_metrics(CacheLayer.EMBEDDING).total_size_bytes
    if memory_usage > config.max_size_bytes * 0.9:
        send_alert("High memory usage detected", severity="critical")

Custom Metrics

# Define custom metrics for your use case
class CustomCacheMetrics:
    def __init__(self):
        self.user_satisfaction_score = 0.0
        self.cost_savings = 0.0
        self.business_impact = 0.0

    def calculate_user_satisfaction(self, hit_rate, response_time):
        # Custom formula for user satisfaction
        satisfaction = hit_rate * (1.0 - min(response_time / 1000, 1.0))
        return max(0.0, min(1.0, satisfaction))

    def calculate_cost_savings(self, api_calls_saved, cost_per_call):
        return api_calls_saved * cost_per_call

    def calculate_business_impact(self, satisfaction, cost_savings):
        return satisfaction * 0.7 + (cost_savings / 1000) * 0.3

custom_metrics = CustomCacheMetrics()

Performance Testing

Load Testing

import asyncio
import time

async def load_test_cache(cache_manager, num_requests=1000, concurrency=10):
    """Load test the cache system."""
    # Cap the number of in-flight requests at `concurrency`
    semaphore = asyncio.Semaphore(concurrency)

    async def make_request(request_id):
        async with semaphore:
            start_time = time.perf_counter()

            # Simulate cache operations
            result = await cache_manager.get(f"key_{request_id}", CacheLayer.EMBEDDING)

            end_time = time.perf_counter()
            return {
                'request_id': request_id,
                'response_time': (end_time - start_time) * 1000,
                'hit': isinstance(result, CacheHit)
            }

    # Run concurrent requests and time the whole run
    test_start = time.perf_counter()
    tasks = [make_request(i) for i in range(num_requests)]
    results = await asyncio.gather(*tasks)
    elapsed_seconds = time.perf_counter() - test_start

    # Calculate statistics
    response_times = sorted(r['response_time'] for r in results)
    hits = [r['hit'] for r in results]

    return {
        'total_requests': num_requests,
        'avg_response_time': sum(response_times) / len(response_times),
        'p95_response_time': response_times[int(len(response_times) * 0.95)],
        'hit_rate': sum(hits) / len(hits),
        # Requests per second over the wall-clock duration of the test
        'throughput': num_requests / elapsed_seconds
    }

# Run load test
performance_results = await load_test_cache(cache_manager, 1000, 10)
print(f"Performance: {performance_results}")

Stress Testing

async def stress_test_cache(cache_manager, duration_seconds=300):
    """Stress test the cache system."""

    start_time = time.time()
    request_count = 0
    error_count = 0

    while time.time() - start_time < duration_seconds:
        # Simulate high load
        tasks = []
        for _ in range(100):  # 100 concurrent requests
            task = cache_manager.get(f"stress_key_{request_count}", CacheLayer.EMBEDDING)
            tasks.append(task)
            request_count += 1

        # return_exceptions=True returns failures as results instead of raising,
        # so errors are counted from the results rather than in an except block
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                error_count += 1
                print(f"Error during stress test: {result}")

    return {
        'duration': duration_seconds,
        'total_requests': request_count,
        'error_count': error_count,
        'error_rate': error_count / request_count if request_count > 0 else 0,
        'requests_per_second': request_count / duration_seconds
    }

# Run stress test
stress_results = await stress_test_cache(cache_manager, 300)
print(f"Stress test results: {stress_results}")

Troubleshooting Performance Issues

Common Performance Problems

1. Low Hit Rate

Symptoms: Hit rate below 80%

Causes:

  • Semantic threshold too high
  • Insufficient cache warming
  • Poor query patterns
  • TTL too short

Solutions:

# Lower semantic threshold
config.semantic_threshold = 0.75

# Increase warming frequency
config.warming_interval_seconds = 900 # 15 minutes

# Increase TTL
config.default_ttl_seconds = 7200 # 2 hours

# Improve warming strategy
strategy = WarmingStrategy(
    batch_size=200,
    priority_threshold=0.6
)

2. High Response Time

Symptoms: Response time above 100ms

Causes:

  • Memory pressure
  • Inefficient eviction
  • Poor compression
  • Network latency (distributed)

Solutions:

# Increase memory limit
config.max_size_bytes = 2 * 1024 * 1024 * 1024 # 2GB

# Optimize eviction
config.eviction_policy = "hybrid"
config.eviction_threshold = 0.9

# Use faster compression
config.compression_algorithm = "lz4"

# Optimize network (distributed)
config.consistency_level = "eventual"

3. High Memory Usage

Symptoms: Memory usage above 90%

Causes:

  • Cache too large
  • Inefficient compression
  • Memory leaks
  • Poor eviction

Solutions:

# Reduce cache size
config.max_size_bytes = 1024 * 1024 * 1024 # 1GB

# Enable aggressive compression
config.compression_enabled = True
config.compression_threshold_bytes = 512

# More aggressive eviction
config.eviction_threshold = 0.7
config.cleanup_interval_seconds = 60

# Check for memory fragmentation
memory_stats = memory_optimizer.get_memory_stats()
if memory_stats['fragmentation_ratio'] > 0.3:
    # Trigger defragmentation
    await cache_manager.defragment()

4. High Eviction Rate

Symptoms: Frequent evictions

Causes:

  • Cache too small
  • Poor eviction policy
  • TTL too short
  • Memory pressure

Solutions:

# Increase cache size
config.max_size_bytes = 2 * 1024 * 1024 * 1024 # 2GB

# Optimize eviction policy
config.eviction_policy = "cost_based"

# Increase TTL
config.default_ttl_seconds = 14400 # 4 hours

# Delay eviction until the cache is nearly full
config.eviction_threshold = 0.95

Performance Debugging

# Comprehensive performance debugging
async def debug_cache_performance(cache_manager):
    """Debug cache performance issues."""

    # Get all layer statistics
    for layer in CacheLayer:
        stats = await cache_manager.get_stats(layer)
        print(f"\n{layer.value} Layer:")
        print(f" Hit Rate: {stats.hit_rate:.2%}")
        print(f" Avg Response Time: {stats.avg_retrieval_time_ms:.2f}ms")
        print(f" Memory Usage: {stats.total_size_bytes / (1024*1024):.2f}MB")
        print(f" Entry Count: {stats.entry_count}")
        print(f" Evictions: {stats.evictions}")

    # Check semantic matching effectiveness
    semantic_stats = semantic_matcher.get_similarity_stats()
    print("\nSemantic Matching:")
    print(f" Cached Embeddings: {semantic_stats['cached_embeddings']}")
    print(f" Threshold: {semantic_stats['similarity_threshold']}")
    print(f" Max Candidates: {semantic_stats['max_candidates']}")

    # Check warming effectiveness
    warming_stats = warmer.get_warming_stats()
    print("\nCache Warming:")
    print(f" Total Cycles: {warming_stats['total_warming_cycles']}")
    print(f" Items Warmed: {warming_stats['total_items_warmed']}")
    print(f" Errors: {warming_stats['total_errors']}")

    # Check optimization effectiveness
    opt_stats = optimizer.get_optimization_stats()
    print("\nOptimization:")
    print(f" Memory Saved: {opt_stats['memory_saved_bytes'] / (1024*1024):.2f}MB")
    print(f" Compression Ratio: {opt_stats['compression_ratio']:.2%}")
    print(f" Evictions: {opt_stats['evictions_performed']}")

# Run debugging
await debug_cache_performance(cache_manager)

Treat the values in this guide as starting points. Monitor your metrics regularly and adjust settings based on your specific workload patterns and performance requirements.