Implement Caching for Performance & Cost Savings
Difficulty: ⭐⭐ Intermediate | Time: 1 hour
🎯 The Problem
You're making expensive LLM calls for repeated questions, wasting money and time. Same query from different users = same API call = unnecessary costs. Response times are slow (2-3s) when they could be instant for cached queries.
This guide solves: Implementing multi-layer caching (response cache, embedding cache, semantic cache) to reduce costs by 30-50% and improve response times by 10x for repeated queries.
⚡ TL;DR - Quick Caching
from packages.caching import ResponseCache, EmbeddingCache

# 1. Enable response caching (exact-match, Redis-backed)
response_cache = ResponseCache(
    backend="redis",
    redis_url="redis://localhost:6379",
    ttl=3600,  # Cache for 1 hour
)

# 2. Enable embedding caching (embeddings stay valid much longer than answers)
embedding_cache = EmbeddingCache(ttl=86400)  # Cache for 24 hours

# 3. Use with agent
@app.post("/api/query")
async def query(request: QueryRequest):
    # Check cache first
    cached = await response_cache.get(request.query)
    if cached:
        return cached  # Instant response! (10ms vs 2000ms)

    # Cache miss: generate a fresh response...
    result = await agent.run(request.query)

    # ...and cache it for next time
    await response_cache.set(request.query, result)
    return result

# Expected savings: 30-50% cost reduction, 10x faster for cache hits
Impact: $7,500/month → $4,500/month in LLM costs (see the cost analysis below)!
Full Caching Guide
Caching Strategy Overview
Cache Layer 1: Response Cache
Purpose: Cache exact query-response pairs
Hit Rate: 20-30% for common queries
Savings: $0.025 → $0 per cached query
Speed: 2000ms → 10ms (200x faster)
Implementation
from packages.caching import ResponseCache
import hashlib
class ResponseCache:
def __init__(self, redis_client, ttl=3600):
self.redis = redis_client
self.ttl = ttl
def _make_key(self, query: str, user_context: dict = None) -> str:
"""Create cache key"""
# Include user context for personalized caching
cache_input = f"{query}_{user_context.get('domain', 'general')}"
return f"response:{hashlib.md5(cache_input.encode()).hexdigest()}"
async def get(self, query: str, user_context: dict = None):
"""Get cached response"""
key = self._make_key(query, user_context)
cached = await self.redis.get(key)
if cached:
return json.loads(cached)
return None
async def set(self, query: str, response: dict, user_context: dict = None):
"""Cache response"""
key = self._make_key(query, user_context)
await self.redis.setex(
key,
self.ttl,
json.dumps(response)
)
Cache Layer 2: Semantic Cache
Purpose: Return cached responses for similar queries
Hit Rate: 40-50% when combined with exact cache
Savings: $0.025 per query served from cache (only a cheap embedding lookup is paid, no LLM call)
Speed: ~50ms (40x faster)
Implementation
from packages.caching import SemanticCache
class SemanticCache:
    """Serve cached responses for semantically similar (not just identical) queries.

    Cached query texts are indexed in a vector store; on lookup, the nearest
    cached query is reused when its score clears `similarity_threshold`.
    """

    def __init__(self, vector_store, similarity_threshold: float = 0.95, storage=None):
        # vector_store: async store exposing search()/add_document().
        # storage: async key/value store for the response payloads.
        #   FIX: the original referenced self.storage in set() but never
        #   assigned it, so every set() raised AttributeError.
        self.vector_store = vector_store
        self.similarity_threshold = similarity_threshold
        self.storage = storage

    async def get(self, query: str):
        """Return the response of the closest cached query, or None."""
        # Search for the single most similar previously-cached query.
        similar = await self.vector_store.search(
            query=query,
            index="cached_queries",
            k=1,
        )
        if similar and similar[0].score >= self.similarity_threshold:
            # Found a similar-enough query; fetch its stored response.
            cached_query_id = similar[0].metadata["query_id"]
            return await self.get_response_by_id(cached_query_id)
        return None

    async def get_response_by_id(self, query_id: str):
        """Fetch the stored response for a previously cached query id."""
        # FIX: this helper was called by get() but never defined.
        return await self.storage.get(f"response:{query_id}")

    async def set(self, query: str, response: dict):
        """Store the response and index the query text for similarity search."""
        # FIX: generate_id() was undefined; a uuid4 hex string is unique enough.
        query_id = uuid.uuid4().hex
        # Store response payload
        await self.storage.set(f"response:{query_id}", response)
        # Index query text for semantic search
        await self.vector_store.add_document(
            id=query_id,
            content=query,
            metadata={"query_id": query_id, "type": "cached_query"},
        )
Cache Layer 3: Embedding Cache
Purpose: Cache expensive embedding computations
Hit Rate: 60-70% for repeated documents
Savings: $0.005 per cached embedding
Speed: 200ms → 5ms (40x faster)
Implementation
from packages.caching import EmbeddingCache
class EmbeddingCache:
    """Cache embedding vectors in Redis, keyed by (model, MD5 of text)."""

    def __init__(self, redis_client, ttl: int = 86400, embed_fn=None):  # 24 hours
        # ttl defaults to 24h: embeddings stay valid until the model changes.
        # embed_fn: optional async callable (text, model) -> embedding.
        #   Injectable for testing; falls back to the module-level
        #   generate_embedding() when not supplied (backward compatible).
        self.redis = redis_client
        self.ttl = ttl
        self._embed_fn = embed_fn

    async def get_embedding(self, text: str, model: str):
        """Return the embedding for text, computing and caching it on a miss."""
        # Key includes the model name so different models never collide.
        key = f"emb:{model}:{hashlib.md5(text.encode()).hexdigest()}"
        cached = await self.redis.get(key)
        if cached:
            return json.loads(cached)
        # Cache miss: compute a fresh embedding (the expensive call)...
        embed = self._embed_fn if self._embed_fn is not None else generate_embedding
        embedding = await embed(text, model)
        # ...and store it for next time.
        await self.redis.setex(key, self.ttl, json.dumps(embedding))
        return embedding
Complete Caching Setup
from packages.caching import MultiLayerCache
import redis.asyncio as redis

# FIX: `await` is illegal at module top level -- do async setup inside a
# coroutine run by your framework's startup hook instead.
cache = None  # populated at startup


@app.on_event("startup")
async def init_cache():
    """Connect to Redis and build the multi-layer cache at app startup."""
    global cache
    redis_client = await redis.from_url("redis://localhost:6379")
    cache = MultiLayerCache(
        redis_client=redis_client,
        response_ttl=3600,        # 1 hour for responses
        embedding_ttl=86400,      # 24 hours for embeddings
        semantic_threshold=0.95,  # 95% similarity for semantic cache
    )


# Use in your API
@app.post("/api/query")
async def query(request: QueryRequest):
    # Try all cache layers (exact match first, then semantic)
    cached_response = await cache.get(request.query)
    if cached_response:
        return {
            **cached_response,
            "cached": True,
            "latency_ms": cached_response.get("cache_latency", 10),
        }
    # Cache miss: generate a fresh response
    result = await agent.run(request.query)
    # Cache for future requests
    await cache.set(request.query, result)
    return result
Cache Invalidation
Time-Based Invalidation
# Automatic TTL expiration
cache.set(query, response, ttl=3600) # Expires in 1 hour
Manual Invalidation
# Invalidate when content updates
async def update_knowledge_base(new_docs):
    """Index new documents and clear the response cache, which is now stale."""
    await vector_store.add_documents(new_docs)
    # Responses were generated from the old knowledge base -> invalidate them.
    await response_cache.clear()
    # Embeddings of unchanged documents are still valid, so keep that cache.
    print("✅ Knowledge base updated, response cache cleared")
Selective Invalidation
# Invalidate specific patterns
# NOTE(review): the argument looks like a Redis-style glob pattern -- confirm
# against packages.caching. These calls must run inside an async context
# (top-level `await` is illegal in a module).
await cache.delete_pattern("response:*medical*") # Clear medical responses
await cache.delete_pattern("response:*outdated_product*") # Clear specific products
Monitoring Cache Performance
# Cache metrics
# NOTE(review): must run inside an async context -- top-level `await` is
# illegal in a module. The stats dict reports overall hits/misses plus a
# per-layer hit rate for the response, semantic, and embedding caches.
metrics = await cache.get_stats()
print(f"""
Cache Performance:
Hit Rate: {metrics['hit_rate']:.1%} (target: >40%)
Total Hits: {metrics['hits']:,}
Total Misses: {metrics['misses']:,}
Avg Hit Time: {metrics['avg_hit_latency']}ms
Cost Saved: ${metrics['cost_saved']:.2f}
Response Cache: {metrics['response_hit_rate']:.1%} hit rate
Semantic Cache: {metrics['semantic_hit_rate']:.1%} hit rate
Embedding Cache: {metrics['embedding_hit_rate']:.1%} hit rate
""")
Cost Impact Analysis
Before Caching
10,000 queries/day × $0.025/query = $250/day = $7,500/month
Average latency: 2000ms
After Caching (40% hit rate)
6,000 fresh queries × $0.025 = $150/day
4,000 cached queries × $0 = $0/day
Total: $150/day = $4,500/month
Savings: $3,000/month (40%)
Average latency: 1204ms (≈40% faster — 0.4×10ms + 0.6×2000ms)
- 40% at 10ms (cached)
- 60% at 2000ms (fresh)
Best Practices
| Practice | Why | Implementation |
|---|---|---|
| Cache by user tier | Different TTLs for different users | Free: 10min, Pro: 1hour, Enterprise: 24hours |
| Monitor hit rates | Optimize cache strategy | Alert if hit rate < 30% |
| Warm cache | Preload common queries | Background job for FAQ |
| Set max cache size | Prevent memory issues | LRU eviction at 10GB |
| Use compression | Store more in same space | gzip responses |
| Version cache keys | Invalidate on code changes | Include version in key |
Troubleshooting
| Problem | Cause | Solution |
|---|---|---|
| Low hit rate (<20%) | Queries too unique | Implement semantic cache |
| Stale responses | TTL too long | Reduce TTL or manual invalidation |
| High memory usage | Cache too large | Set max size, use LRU eviction |
| Cache thrashing | Eviction too frequent | Increase cache size |
| Slow cache lookups | Network latency | Use local Redis or connection pooling |
Advanced Caching Features
GPTCache Integration
For production-grade semantic caching with sub-50ms hits and 90%+ cost reduction:
from packages.caching import GPTCacheManager

# Initialize GPTCache with Redis backend
gpt_cache = GPTCacheManager(
    redis_url="redis://localhost:6379",
    similarity_threshold=0.85,  # looser than exact cache: semantic matches count
    ttl_seconds=3600,
    embedding_model="text-embedding-ada-002",
    max_cache_size=10000,       # bound memory; entries beyond this are evicted
)


# Use with your RAG system
async def get_response_with_gpt_cache(query: str):
    """Serve from GPTCache when a similar query is cached, else call the LLM."""
    # Check GPTCache first (note: this manager's get/set are synchronous)
    cache_result = gpt_cache.get(query)
    if cache_result.hit:
        logger.info(f"Cache hit! Similarity: {cache_result.similarity_score:.3f}")
        return {
            "response": cache_result.response,
            "cached": True,
            "similarity_score": cache_result.similarity_score,
            "latency_ms": cache_result.latency_ms,
        }
    # Cache miss - generate fresh response
    response = await llm.invoke(query)
    # Store in GPTCache so similar future queries hit
    gpt_cache.set(query, response)
    return {
        "response": response,
        "cached": False,
        "latency_ms": 2000,  # illustrative average for a fresh LLM call
    }


# Get cache statistics
stats = gpt_cache.get_stats()
print(f"Hit rate: {stats.hit_rate:.2%}")
print(f"Total cost saved: ${stats.total_cost_saved:.2f}")
print(f"Average hit latency: {stats.avg_hit_latency_ms:.1f}ms")
Distributed Caching
For horizontal scaling across multiple nodes:
from packages.caching import DistributedCache, CacheNode, ConsistencyLevel
# Define cache cluster nodes
cluster_nodes = [
CacheNode(node_id="node1", host="cache1.example.com", port=6379),
CacheNode(node_id="node2", host="cache2.example.com", port=6379),
CacheNode(node_id="node3", host="cache3.example.com", port=6379)
]
# Initialize distributed cache
distributed_cache = DistributedCache(
cluster_nodes=cluster_nodes,
replication_factor=2,
consistency_level=ConsistencyLevel.EVENTUAL
)
# Use distributed cache
async def get_with_distributed_cache(key: str):
# Get from distributed cache
result = await distributed_cache.get(key)
if result:
return result.value
# Generate and store
value = await generate_value(key)
await distributed_cache.set(key, value)
return value
# Monitor cluster health
cluster_status = await distributed_cache.get_cluster_status()
for node_id, status in cluster_status.items():
print(f"Node {node_id}: {status.status} (last heartbeat: {status.last_heartbeat})")
Cache Warming Strategies
Pre-populate cache with frequently accessed data:
from packages.caching import CacheWarmer
# Initialize cache warmer
warmer = CacheWarmer(
cache_manager=gpt_cache,
warming_strategies=["popular_queries", "trending_topics", "user_preferences"]
)
# Warm cache with popular queries
popular_queries = await get_popular_queries(time_range="7d", limit=1000)
await warmer.warm_cache(popular_queries)
# Warm cache with trending topics
trending_topics = await get_trending_topics()
await warmer.warm_cache(trending_topics)
# Scheduled cache warming
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
@scheduler.scheduled_job('cron', hour=2) # Run at 2 AM daily
async def daily_cache_warming():
await warmer.warm_cache(await get_popular_queries())
scheduler.start()
Advanced Cache Analytics
from packages.caching import CacheAnalytics
# Initialize cache analytics
analytics = CacheAnalytics(cache_manager=gpt_cache)
# Get detailed analytics
analytics_report = await analytics.generate_report(
time_range="7d",
include_trends=True,
include_cost_analysis=True
)
print(f"""
Cache Analytics Report:
Hit Rate Trend: {analytics_report.hit_rate_trend}
Cost Savings: ${analytics_report.total_cost_saved:.2f}
Top Cache Hits: {analytics_report.top_cache_hits}
Cache Miss Patterns: {analytics_report.miss_patterns}
Optimization Recommendations: {analytics_report.recommendations}
""")
# Cache performance optimization
optimization = await analytics.get_optimization_recommendations()
for recommendation in optimization:
print(f"Recommendation: {recommendation.description}")
print(f"Expected Impact: {recommendation.expected_impact}")
print(f"Implementation Effort: {recommendation.effort}")
Cache Security and Privacy
from packages.caching import SecureCache
# Initialize secure cache with encryption
secure_cache = SecureCache(
base_cache=gpt_cache,
encryption_key="your-encryption-key",
enable_audit_logging=True,
privacy_level="high"
)
# Store sensitive data securely
await secure_cache.set(
key="user_query",
value=response,
user_id="user_123",
privacy_flags=["pii", "sensitive"]
)
# Get with privacy checks
result = await secure_cache.get(
key="user_query",
user_id="user_123",
access_level="user"
)
# Audit trail
audit_log = await secure_cache.get_audit_log(
user_id="user_123",
time_range="24h"
)
What You've Accomplished
✅ Implemented multi-layer caching (response, semantic, embedding)
✅ Reduced costs by 30-50% through caching
✅ Improved latency 10x for cache hits
✅ Set up cache monitoring and metrics
✅ Configured intelligent cache invalidation
✅ Integrated GPTCache for advanced semantic caching
✅ Implemented distributed caching for horizontal scaling
✅ Added cache warming and optimization strategies
✅ Enhanced security and privacy features
Next Steps
- 💰 Cost Optimization - More cost-saving strategies
- 🚀 Deploy to Production - Deploy with caching
- 📊 Monitor Performance - Track cache effectiveness
- 🌐 Distributed Caching - Scale across multiple nodes
- 🔒 Cache Security - Secure caching implementation