Caching Core
Core caching infrastructure providing multi-layer caching, GPTCache integration, semantic caching, and performance optimization for enterprise RAG systems.
Core Classes
CacheManager
Description: Main cache manager for the multi-layer caching system
Parameters:
- cache_layers (List[CacheLayer]): List of cache layers to enable
- default_ttl (int): Default time-to-live in seconds (default: 3600)
- max_size (int): Maximum cache size (default: 10000)
- eviction_policy (str): Eviction policy ("lru", "lfu", "ttl")
Returns: CacheManager instance
Example:
from recoagent.caching.core import CacheManager, CacheLayer
# Create cache manager
cache_manager = CacheManager(
cache_layers=[CacheLayer.EMBEDDING, CacheLayer.SEARCH_RESULT, CacheLayer.LLM_RESPONSE],
default_ttl=7200, # 2 hours
max_size=50000,
eviction_policy="lru"
)
# Cache embedding
embedding_key = cache_manager.generate_key("machine learning", CacheLayer.EMBEDDING)
cache_manager.set(embedding_key, embedding_vector, ttl=3600)
# Retrieve embedding
cached_embedding = cache_manager.get(embedding_key)
SemanticCache
Description: Semantic caching using GPTCache for similarity-based retrieval
Parameters:
- similarity_threshold (float): Similarity threshold for cache hits (default: 0.8)
- embedding_model (str): Embedding model for semantic similarity (default: "text-embedding-ada-002")
- cache_backend (str): Cache backend ("memory", "redis", "sqlite")
Returns: SemanticCache instance
Example:
from recoagent.caching.core import SemanticCache
# Create semantic cache
semantic_cache = SemanticCache(
similarity_threshold=0.85,
embedding_model="text-embedding-ada-002",
cache_backend="redis"
)
# Cache semantic result
semantic_cache.set(
query="What is artificial intelligence?",
result="AI is the simulation of human intelligence...",
metadata={"domain": "AI", "confidence": 0.9}
)
# Retrieve similar query
similar_result = semantic_cache.get("What is AI?")
print(f"Cache hit: {similar_result is not None}")
DistributedCache
Description: Distributed caching for multi-node deployments
Parameters:
- redis_url (str): Redis connection URL
- cluster_mode (bool): Enable Redis cluster mode (default: False)
- replication_factor (int): Replication factor (default: 1)
- consistency_level (str): Consistency level ("strong", "eventual")
Returns: DistributedCache instance
Example:
from recoagent.caching.core import DistributedCache
# Create distributed cache
distributed_cache = DistributedCache(
redis_url="redis://localhost:6379",
cluster_mode=False,
replication_factor=2,
consistency_level="eventual"
)
# Set with replication
distributed_cache.set(
key="global_config",
value={"model": "gpt-4", "temperature": 0.1},
replicate=True
)
# Get from any node
config = distributed_cache.get("global_config")
CacheEntry
Description: Cache entry with metadata and lifecycle management
Fields:
- key (str): Cache key
- value (Any): Cached value
- created_at (datetime): Creation timestamp
- expires_at (datetime, optional): Expiration timestamp
- access_count (int): Number of accesses
- last_accessed (datetime): Last access timestamp
- metadata (Dict): Additional metadata
- layer (CacheLayer): Cache layer type
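For orientation, here is a minimal sketch of how such an entry could be modeled. The field names come from the list above; the dataclass defaults, the enum string values, and the is_expired helper are illustrative assumptions, not the library's actual definitions.
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional

class CacheLayer(Enum):
    EMBEDDING = "embedding"          # member names as used in the examples below;
    SEARCH_RESULT = "search_result"  # the string values are assumed
    LLM_RESPONSE = "llm_response"
    QUERY_PATTERN = "query_pattern"

@dataclass
class CacheEntry:
    key: str
    value: Any
    layer: CacheLayer
    created_at: datetime = field(default_factory=datetime.utcnow)
    expires_at: Optional[datetime] = None
    access_count: int = 0
    last_accessed: datetime = field(default_factory=datetime.utcnow)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def is_expired(self) -> bool:
        # Entries without an expires_at timestamp never expire.
        return self.expires_at is not None and datetime.utcnow() >= self.expires_at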
Usage Examples
Basic Multi-Layer Caching
from recoagent.caching.core import CacheManager, CacheLayer
# Create cache manager with multiple layers
cache_manager = CacheManager(
cache_layers=[
CacheLayer.EMBEDDING,
CacheLayer.SEARCH_RESULT,
CacheLayer.LLM_RESPONSE,
CacheLayer.QUERY_PATTERN
],
default_ttl=3600,
max_size=10000,
eviction_policy="lru"
)
# Cache different types of data
# 1. Embeddings
embedding_key = cache_manager.generate_key("machine learning", CacheLayer.EMBEDDING)
cache_manager.set(embedding_key, [0.1, 0.2, 0.3, ...], ttl=7200)  # truncated example embedding vector
# 2. Search results
search_key = cache_manager.generate_key("AI applications", CacheLayer.SEARCH_RESULT)
search_results = [
{"title": "AI in Healthcare", "content": "..."},
{"title": "AI in Finance", "content": "..."}
]
cache_manager.set(search_key, search_results, ttl=1800)
# 3. LLM responses
llm_key = cache_manager.generate_key("What is AI?", CacheLayer.LLM_RESPONSE)
llm_response = "Artificial Intelligence is..."
cache_manager.set(llm_key, llm_response, ttl=3600)
# Retrieve cached data
cached_embedding = cache_manager.get(embedding_key)
cached_search = cache_manager.get(search_key)
cached_llm = cache_manager.get(llm_key)
Advanced Semantic Caching
from datetime import datetime
from recoagent.caching.core import SemanticCache
# Create semantic cache with high similarity threshold
semantic_cache = SemanticCache(
similarity_threshold=0.9,
embedding_model="text-embedding-ada-002",
cache_backend="redis"
)
# Cache semantic results
queries_and_results = [
("What is machine learning?", "ML is a subset of AI..."),
("How does deep learning work?", "Deep learning uses neural networks..."),
("Explain artificial intelligence", "AI is the simulation of human intelligence...")
]
for query, result in queries_and_results:
semantic_cache.set(
query=query,
result=result,
metadata={
"domain": "AI",
"confidence": 0.95,
"timestamp": datetime.utcnow()
}
)
# Test semantic similarity
similar_queries = [
"What is ML?", # Should match "What is machine learning?"
"How do neural networks work?", # Should match "How does deep learning work?"
"What is AI?", # Should match "Explain artificial intelligence"
]
for query in similar_queries:
result = semantic_cache.get(query)
if result:
print(f"Cache hit for: {query}")
print(f"Original query: {result.metadata.get('original_query')}")
print(f"Similarity: {result.similarity_score:.3f}")
else:
print(f"Cache miss for: {query}")
print("---")
Distributed Caching with Replication
from recoagent.caching.core import DistributedCache
# Create distributed cache with replication
distributed_cache = DistributedCache(
redis_url="redis://cluster.example.com:6379",
cluster_mode=True,
replication_factor=3,
consistency_level="eventual"
)
# Cache with automatic replication
cache_data = {
"user_preferences": {"language": "en", "theme": "dark"},
"model_config": {"temperature": 0.1, "max_tokens": 1000},
"api_keys": {"openai": "sk-...", "anthropic": "sk-..."}
}
for key, value in cache_data.items():
distributed_cache.set(
key=key,
value=value,
ttl=86400, # 24 hours
replicate=True
)
# Retrieve from any node
user_prefs = distributed_cache.get("user_preferences")
model_config = distributed_cache.get("model_config")
# Check replication status
replication_status = distributed_cache.get_replication_status()
print(f"Replication status: {replication_status}")
Cache Warming and Preloading
from recoagent.caching.core import CacheManager, CacheLayer, CacheWarmer
# Create cache manager
cache_manager = CacheManager(
cache_layers=[CacheLayer.EMBEDDING, CacheLayer.SEARCH_RESULT],
default_ttl=3600
)
# Create cache warmer
cache_warmer = CacheWarmer(cache_manager=cache_manager)
# Define warming strategies
warming_strategies = [
{
"layer": CacheLayer.EMBEDDING,
"queries": ["machine learning", "artificial intelligence", "deep learning"],
"priority": "high"
},
{
"layer": CacheLayer.SEARCH_RESULT,
"queries": ["AI applications", "ML algorithms", "neural networks"],
"priority": "medium"
}
]
# Warm cache
async def warm_cache():
for strategy in warming_strategies:
await cache_warmer.warm_layer(
layer=strategy["layer"],
queries=strategy["queries"],
priority=strategy["priority"]
)
# Run cache warming
import asyncio
asyncio.run(warm_cache())
# Check cache statistics
stats = cache_manager.get_statistics()
print(f"Cache hit rate: {stats['hit_rate']:.2%}")
print(f"Total entries: {stats['total_entries']}")
print(f"Memory usage: {stats['memory_usage']:.2f} MB")
Cache Performance Monitoring
from recoagent.caching.core import CacheManager, CacheLayer, CacheMonitor
# Create cache manager with monitoring
cache_manager = CacheManager(
cache_layers=[CacheLayer.EMBEDDING, CacheLayer.SEARCH_RESULT, CacheLayer.LLM_RESPONSE],
enable_monitoring=True
)
# Create cache monitor
cache_monitor = CacheMonitor(cache_manager=cache_manager)
# Start monitoring
cache_monitor.start_monitoring(interval=30) # Monitor every 30 seconds
# Simulate cache operations
import time
import random
for i in range(100):
# Random cache operations
key = f"test_key_{i}"
value = f"test_value_{i}"
if random.random() < 0.7: # 70% cache sets
cache_manager.set(key, value, ttl=3600)
else: # 30% cache gets
cache_manager.get(key)
time.sleep(0.1)
# Get performance metrics
metrics = cache_monitor.get_metrics()
print("=== Cache Performance Metrics ===")
print(f"Hit rate: {metrics['hit_rate']:.2%}")
print(f"Miss rate: {metrics['miss_rate']:.2%}")
print(f"Average access time: {metrics['avg_access_time']:.3f}ms")
print(f"Memory usage: {metrics['memory_usage']:.2f} MB")
print(f"Eviction count: {metrics['eviction_count']}")
# Get layer-specific metrics
for layer in CacheLayer:
layer_metrics = cache_monitor.get_layer_metrics(layer)
print(f"\n{layer.value} Layer:")
print(f" Entries: {layer_metrics['entry_count']}")
print(f" Hit rate: {layer_metrics['hit_rate']:.2%}")
print(f" Memory usage: {layer_metrics['memory_usage']:.2f} MB")
Cache Invalidation Strategies
from recoagent.caching.core import CacheManager, CacheLayer, CacheInvalidator
# Create cache manager
cache_manager = CacheManager(
cache_layers=[CacheLayer.EMBEDDING, CacheLayer.SEARCH_RESULT],
default_ttl=3600
)
# Create cache invalidator
cache_invalidator = CacheInvalidator(cache_manager=cache_manager)
# Define invalidation rules
invalidation_rules = [
{
"pattern": "user_*",
"trigger": "user_update",
"action": "invalidate"
},
{
"pattern": "model_*",
"trigger": "model_update",
"action": "refresh"
},
{
"pattern": "search_*",
"trigger": "data_update",
"action": "invalidate"
}
]
# Register invalidation rules
for rule in invalidation_rules:
cache_invalidator.add_rule(rule)
# Simulate cache operations
cache_manager.set("user_123", {"name": "John", "preferences": {}})
cache_manager.set("model_gpt4", {"temperature": 0.1, "max_tokens": 1000})
cache_manager.set("search_ai", ["result1", "result2"])
# Trigger invalidation events
cache_invalidator.trigger_event("user_update", {"user_id": "123"})
cache_invalidator.trigger_event("model_update", {"model": "gpt4"})
cache_invalidator.trigger_event("data_update", {"domain": "ai"})
# Check cache status
print(f"User cache exists: {cache_manager.exists('user_123')}")
print(f"Model cache exists: {cache_manager.exists('model_gpt4')}")
print(f"Search cache exists: {cache_manager.exists('search_ai')}")
API Reference
CacheManager Methods
set(key: str, value: Any, ttl: int = None, layer: CacheLayer = None) -> None
Set cache entry
Parameters:
- key (str): Cache key
- value (Any): Value to cache
- ttl (int, optional): Time-to-live in seconds
- layer (CacheLayer, optional): Cache layer
get(key: str, layer: CacheLayer = None) -> Any
Get cache entry
Parameters:
- key (str): Cache key
- layer (CacheLayer, optional): Cache layer
Returns: Cached value or None
exists(key: str, layer: CacheLayer = None) -> bool
Check if cache entry exists
Parameters:
- key (str): Cache key
- layer (CacheLayer, optional): Cache layer
Returns: True if entry exists
delete(key: str, layer: CacheLayer = None) -> bool
Delete cache entry
Parameters:
- key (str): Cache key
- layer (CacheLayer, optional): Cache layer
Returns: True if deleted
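Taken together, a typical call sequence looks like the following sketch; it assumes the constructor shown in the examples above, and the printed values reflect the documented return behavior.
from recoagent.caching.core import CacheManager, CacheLayer

cache_manager = CacheManager(
    cache_layers=[CacheLayer.LLM_RESPONSE],
    default_ttl=600
)
# Write, then verify presence and read back
cache_manager.set("faq:what-is-rag", "RAG combines retrieval with generation.", ttl=300, layer=CacheLayer.LLM_RESPONSE)
print(cache_manager.exists("faq:what-is-rag", layer=CacheLayer.LLM_RESPONSE))  # True
print(cache_manager.get("faq:what-is-rag", layer=CacheLayer.LLM_RESPONSE))     # cached answer
# delete() returns True when an entry was actually removed
print(cache_manager.delete("faq:what-is-rag", layer=CacheLayer.LLM_RESPONSE))  # True
print(cache_manager.get("faq:what-is-rag"))  # None after deletion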
SemanticCache Methods
set(query: str, result: Any, metadata: Dict = None) -> None
Set semantic cache entry
Parameters:
- query (str): Query text
- result (Any): Result to cache
- metadata (Dict, optional): Additional metadata
get(query: str, threshold: float = None) -> Optional[SemanticCacheResult]
Get semantically similar result
Parameters:
- query (str): Query text
- threshold (float, optional): Similarity threshold
Returns: SemanticCacheResult with similarity score, or None if no cached query meets the threshold
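A short illustration of these two methods, assuming get() returns None on a miss as the earlier examples show; the per-call threshold overrides the instance default.
from recoagent.caching.core import SemanticCache

semantic_cache = SemanticCache(similarity_threshold=0.85)
semantic_cache.set(
    query="What is retrieval-augmented generation?",
    result="RAG augments an LLM with retrieved context...",
    metadata={"domain": "AI"}
)
# A paraphrased query can still hit; threshold=0.8 relaxes the default 0.85
hit = semantic_cache.get("Explain retrieval-augmented generation", threshold=0.8)
if hit is not None:
    print(f"Similarity: {hit.similarity_score:.3f}")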
DistributedCache Methods
set(key: str, value: Any, ttl: int = None, replicate: bool = False) -> None
Set distributed cache entry
Parameters:
- key (str): Cache key
- value (Any): Value to cache
- ttl (int, optional): Time-to-live
- replicate (bool): Enable replication
get(key: str, consistent: bool = False) -> Any
Get distributed cache entry
Parameters:
- key (str): Cache key
- consistent (bool): Use consistent read
Returns: Cached value or None
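And a corresponding sketch for the distributed variant; consistent=True requests a consistent read, which is useful right after a replicated write under the "eventual" consistency level.
from recoagent.caching.core import DistributedCache

distributed_cache = DistributedCache(redis_url="redis://localhost:6379")
# Replicated write, then a consistent read to avoid a stale value
distributed_cache.set("feature_flags", {"semantic_cache": True}, ttl=300, replicate=True)
flags = distributed_cache.get("feature_flags", consistent=True)
print(flags)  # {'semantic_cache': True} once replication completes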
See Also
- RAG Retrievers - Document retrieval with caching
- LLM Providers - LLM response caching
- Analytics Core - Cache performance metrics
- Observability Metrics - Cache monitoring