Caching Core

Core caching infrastructure providing multi-layer caching, GPTCache integration, semantic caching, and performance optimization for enterprise RAG systems.

Core Classes

CacheManager

Description: Main cache manager for multi-layer caching system

Parameters:

  • cache_layers (List[CacheLayer]): List of cache layers to enable
  • default_ttl (int): Default time-to-live in seconds (default: 3600)
  • max_size (int): Maximum cache size (default: 10000)
  • eviction_policy (str): Eviction policy ("lru", "lfu", "ttl")

Returns: CacheManager instance

Example:

from recoagent.caching.core import CacheManager, CacheLayer

# Create cache manager
cache_manager = CacheManager(
    cache_layers=[CacheLayer.EMBEDDING, CacheLayer.SEARCH_RESULT, CacheLayer.LLM_RESPONSE],
    default_ttl=7200,  # 2 hours
    max_size=50000,
    eviction_policy="lru"
)

# Cache embedding
embedding_key = cache_manager.generate_key("machine learning", CacheLayer.EMBEDDING)
cache_manager.set(embedding_key, embedding_vector, ttl=3600)

# Retrieve embedding
cached_embedding = cache_manager.get(embedding_key)
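The `eviction_policy` options above ("lru", "lfu", "ttl") determine which entry is dropped when the cache reaches `max_size`. As a rough illustration of the LRU case only, here is a minimal self-contained sketch built on `collections.OrderedDict`; the `LRUCache` class is hypothetical and not part of `recoagent`:

```python
from collections import OrderedDict

class LRUCache:
    """Minimal LRU cache: evicts the least-recently-used key when full."""

    def __init__(self, max_size=2):
        self.max_size = max_size
        self._store = OrderedDict()

    def get(self, key):
        if key not in self._store:
            return None
        # Accessing a key makes it most-recently-used.
        self._store.move_to_end(key)
        return self._store[key]

    def set(self, key, value):
        if key in self._store:
            self._store.move_to_end(key)
        self._store[key] = value
        if len(self._store) > self.max_size:
            # Evict the least-recently-used entry (front of the dict).
            self._store.popitem(last=False)

cache = LRUCache(max_size=2)
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")         # "a" is now most-recently-used
cache.set("c", 3)      # evicts "b", the least-recently-used key
print(cache.get("b"))  # None
print(cache.get("a"))  # 1
```

An LFU policy would track `access_count` instead of recency; a TTL policy evicts whichever entry expires soonest.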

SemanticCache

Description: Semantic caching using GPTCache for similarity-based retrieval

Parameters:

  • similarity_threshold (float): Similarity threshold for cache hits (default: 0.8)
  • embedding_model (str): Embedding model for semantic similarity (default: "text-embedding-ada-002")
  • cache_backend (str): Cache backend ("memory", "redis", "sqlite")

Returns: SemanticCache instance

Example:

from recoagent.caching.core import SemanticCache

# Create semantic cache
semantic_cache = SemanticCache(
    similarity_threshold=0.85,
    embedding_model="text-embedding-ada-002",
    cache_backend="redis"
)

# Cache semantic result
semantic_cache.set(
    query="What is artificial intelligence?",
    result="AI is the simulation of human intelligence...",
    metadata={"domain": "AI", "confidence": 0.9}
)

# Retrieve similar query
similar_result = semantic_cache.get("What is AI?")
print(f"Cache hit: {similar_result is not None}")
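Under the hood, a semantic cache typically embeds the incoming query, compares it against cached query embeddings, and returns the best match when its similarity clears the threshold. The sketch below shows that decision with cosine similarity and toy vectors standing in for a real embedding model; `cosine_similarity` and the hard-coded vectors are illustrative, not part of `recoagent`:

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Toy embeddings standing in for a real embedding model.
cached = {"What is artificial intelligence?": [0.9, 0.1, 0.2]}
query_vec = [0.88, 0.12, 0.25]  # pretend embedding of "What is AI?"

threshold = 0.85
best_query, best_score = None, 0.0
for text, vec in cached.items():
    score = cosine_similarity(query_vec, vec)
    if score > best_score:
        best_query, best_score = text, score

hit = best_score >= threshold
print(f"hit={hit}, score={best_score:.3f}")
```

Raising the threshold trades hit rate for precision: near 1.0 only near-duplicates hit, while lower values return looser paraphrase matches.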

DistributedCache

Description: Distributed caching for multi-node deployments

Parameters:

  • redis_url (str): Redis connection URL
  • cluster_mode (bool): Enable Redis cluster mode (default: False)
  • replication_factor (int): Replication factor (default: 1)
  • consistency_level (str): Consistency level ("strong", "eventual")

Returns: DistributedCache instance

Example:

from recoagent.caching.core import DistributedCache

# Create distributed cache
distributed_cache = DistributedCache(
    redis_url="redis://localhost:6379",
    cluster_mode=False,
    replication_factor=2,
    consistency_level="eventual"
)

# Set with replication
distributed_cache.set(
    key="global_config",
    value={"model": "gpt-4", "temperature": 0.1},
    replicate=True
)

# Get from any node
config = distributed_cache.get("global_config")
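For "get from any node" to work, every node must agree on which node owns a key. A common approach is deterministic hash partitioning, sketched below with a simple hash-modulo scheme; `node_for_key` and the node names are illustrative, not part of `recoagent` (production systems use more robust schemes, e.g. Redis Cluster assigns keys to 16384 hash slots via CRC16, which avoids mass re-mapping when nodes change):

```python
import hashlib

def node_for_key(key, nodes):
    """Deterministically map a key to one node by hashing the key."""
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()
    return nodes[int(digest, 16) % len(nodes)]

nodes = ["redis-node-0", "redis-node-1", "redis-node-2"]

# The same key always maps to the same node, so any client
# can locate "global_config" without coordination.
owner = node_for_key("global_config", nodes)
print(owner)
```

With `consistency_level="eventual"`, replicas may briefly serve stale values after a write; `"strong"` waits for replica acknowledgement at the cost of latency.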

CacheEntry

Description: Cache entry with metadata and lifecycle management

Fields:

  • key (str): Cache key
  • value (Any): Cached value
  • created_at (datetime): Creation timestamp
  • expires_at (datetime, optional): Expiration timestamp
  • access_count (int): Number of accesses
  • last_accessed (datetime): Last access timestamp
  • metadata (Dict): Additional metadata
  • layer (CacheLayer): Cache layer type

Usage Examples

Basic Multi-Layer Caching

from recoagent.caching.core import CacheManager, CacheLayer

# Create cache manager with multiple layers
cache_manager = CacheManager(
    cache_layers=[
        CacheLayer.EMBEDDING,
        CacheLayer.SEARCH_RESULT,
        CacheLayer.LLM_RESPONSE,
        CacheLayer.QUERY_PATTERN
    ],
    default_ttl=3600,
    max_size=10000,
    eviction_policy="lru"
)

# Cache different types of data
# 1. Embeddings
embedding_key = cache_manager.generate_key("machine learning", CacheLayer.EMBEDDING)
cache_manager.set(embedding_key, [0.1, 0.2, 0.3, ...], ttl=7200)

# 2. Search results
search_key = cache_manager.generate_key("AI applications", CacheLayer.SEARCH_RESULT)
search_results = [
    {"title": "AI in Healthcare", "content": "..."},
    {"title": "AI in Finance", "content": "..."}
]
cache_manager.set(search_key, search_results, ttl=1800)

# 3. LLM responses
llm_key = cache_manager.generate_key("What is AI?", CacheLayer.LLM_RESPONSE)
llm_response = "Artificial Intelligence is..."
cache_manager.set(llm_key, llm_response, ttl=3600)

# Retrieve cached data
cached_embedding = cache_manager.get(embedding_key)
cached_search = cache_manager.get(search_key)
cached_llm = cache_manager.get(llm_key)
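The documentation does not specify how `generate_key` derives keys, but a typical scheme hashes the input text and namespaces it by layer so identical text cached in different layers never collides. A minimal sketch under that assumption (`generate_key` here is a stand-in, not the real implementation):

```python
import hashlib

def generate_key(text: str, layer_name: str) -> str:
    """Derive a deterministic, layer-namespaced cache key from input text."""
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
    return f"{layer_name}:{digest}"

key1 = generate_key("machine learning", "embedding")
key2 = generate_key("machine learning", "embedding")
key3 = generate_key("machine learning", "llm_response")

print(key1)
# Deterministic: the same text + layer always yields the same key,
# while different layers yield distinct keys for the same text.
```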

Advanced Semantic Caching

from datetime import datetime

from recoagent.caching.core import SemanticCache

# Create semantic cache with high similarity threshold
semantic_cache = SemanticCache(
    similarity_threshold=0.9,
    embedding_model="text-embedding-ada-002",
    cache_backend="redis"
)

# Cache semantic results
queries_and_results = [
    ("What is machine learning?", "ML is a subset of AI..."),
    ("How does deep learning work?", "Deep learning uses neural networks..."),
    ("Explain artificial intelligence", "AI is the simulation of human intelligence...")
]

for query, result in queries_and_results:
    semantic_cache.set(
        query=query,
        result=result,
        metadata={
            "domain": "AI",
            "confidence": 0.95,
            "timestamp": datetime.utcnow()
        }
    )

# Test semantic similarity
similar_queries = [
    "What is ML?",                   # should match "What is machine learning?"
    "How do neural networks work?",  # should match "How does deep learning work?"
    "What is AI?",                   # should match "Explain artificial intelligence"
]

for query in similar_queries:
    result = semantic_cache.get(query)
    if result:
        print(f"Cache hit for: {query}")
        print(f"Original query: {result.metadata.get('original_query')}")
        print(f"Similarity: {result.similarity_score:.3f}")
    else:
        print(f"Cache miss for: {query}")
    print("---")

Distributed Caching with Replication

from recoagent.caching.core import DistributedCache

# Create distributed cache with replication
distributed_cache = DistributedCache(
    redis_url="redis://cluster.example.com:6379",
    cluster_mode=True,
    replication_factor=3,
    consistency_level="eventual"
)

# Cache with automatic replication
cache_data = {
    "user_preferences": {"language": "en", "theme": "dark"},
    "model_config": {"temperature": 0.1, "max_tokens": 1000},
    "api_keys": {"openai": "sk-...", "anthropic": "sk-..."}
}

for key, value in cache_data.items():
    distributed_cache.set(
        key=key,
        value=value,
        ttl=86400,  # 24 hours
        replicate=True
    )

# Retrieve from any node
user_prefs = distributed_cache.get("user_preferences")
model_config = distributed_cache.get("model_config")

# Check replication status
replication_status = distributed_cache.get_replication_status()
print(f"Replication status: {replication_status}")

Cache Warming and Preloading

from recoagent.caching.core import CacheLayer, CacheManager, CacheWarmer

# Create cache manager
cache_manager = CacheManager(
    cache_layers=[CacheLayer.EMBEDDING, CacheLayer.SEARCH_RESULT],
    default_ttl=3600
)

# Create cache warmer
cache_warmer = CacheWarmer(cache_manager=cache_manager)

# Define warming strategies
warming_strategies = [
    {
        "layer": CacheLayer.EMBEDDING,
        "queries": ["machine learning", "artificial intelligence", "deep learning"],
        "priority": "high"
    },
    {
        "layer": CacheLayer.SEARCH_RESULT,
        "queries": ["AI applications", "ML algorithms", "neural networks"],
        "priority": "medium"
    }
]

# Warm cache
async def warm_cache():
    for strategy in warming_strategies:
        await cache_warmer.warm_layer(
            layer=strategy["layer"],
            queries=strategy["queries"],
            priority=strategy["priority"]
        )

# Run cache warming
import asyncio
asyncio.run(warm_cache())

# Check cache statistics
stats = cache_manager.get_statistics()
print(f"Cache hit rate: {stats['hit_rate']:.2%}")
print(f"Total entries: {stats['total_entries']}")
print(f"Memory usage: {stats['memory_usage']:.2f} MB")
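One pitfall when warming many entries at once: if they all receive the same TTL, they also all expire at the same moment, producing a burst of simultaneous recomputation. A common mitigation is to add random jitter to each TTL so expirations spread out over time. A minimal sketch (`jittered_ttl` is an illustrative helper, not a `recoagent` API):

```python
import random

def jittered_ttl(base_ttl: int, jitter_fraction: float = 0.1) -> int:
    """Randomize a TTL within +/- jitter_fraction to spread expirations."""
    jitter = base_ttl * jitter_fraction
    return int(base_ttl + random.uniform(-jitter, jitter))

# Warming 5 entries with a 1-hour base TTL yields TTLs in [3240, 3960] s,
# so the warmed entries no longer expire in a single burst.
ttls = [jittered_ttl(3600) for _ in range(5)]
print(ttls)
```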

Cache Performance Monitoring

from recoagent.caching.core import CacheLayer, CacheManager, CacheMonitor

# Create cache manager with monitoring
cache_manager = CacheManager(
    cache_layers=[CacheLayer.EMBEDDING, CacheLayer.SEARCH_RESULT, CacheLayer.LLM_RESPONSE],
    enable_monitoring=True
)

# Create cache monitor
cache_monitor = CacheMonitor(cache_manager=cache_manager)

# Start monitoring
cache_monitor.start_monitoring(interval=30) # Monitor every 30 seconds

# Simulate cache operations
import time
import random

for i in range(100):
    # Random cache operations
    key = f"test_key_{i}"
    value = f"test_value_{i}"

    if random.random() < 0.7:  # 70% cache sets
        cache_manager.set(key, value, ttl=3600)
    else:  # 30% cache gets
        cache_manager.get(key)

    time.sleep(0.1)

# Get performance metrics
metrics = cache_monitor.get_metrics()
print("=== Cache Performance Metrics ===")
print(f"Hit rate: {metrics['hit_rate']:.2%}")
print(f"Miss rate: {metrics['miss_rate']:.2%}")
print(f"Average access time: {metrics['avg_access_time']:.3f}ms")
print(f"Memory usage: {metrics['memory_usage']:.2f} MB")
print(f"Eviction count: {metrics['eviction_count']}")

# Get layer-specific metrics
for layer in CacheLayer:
    layer_metrics = cache_monitor.get_layer_metrics(layer)
    print(f"\n{layer.value} Layer:")
    print(f"  Entries: {layer_metrics['entry_count']}")
    print(f"  Hit rate: {layer_metrics['hit_rate']:.2%}")
    print(f"  Memory usage: {layer_metrics['memory_usage']:.2f} MB")
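The hit rate a monitor reports is just hits divided by total lookups. The counter sketch below shows that bookkeeping in isolation; `CacheStats` is an illustrative class, not the actual `CacheMonitor` internals:

```python
class CacheStats:
    """Track hits and misses to compute the hit rate a monitor reports."""

    def __init__(self):
        self.hits = 0
        self.misses = 0

    def record(self, hit: bool) -> None:
        if hit:
            self.hits += 1
        else:
            self.misses += 1

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0

stats = CacheStats()
for hit in [True, True, False, True]:
    stats.record(hit)

print(f"{stats.hit_rate:.2%}")  # 75.00%
```

The miss rate is simply `1 - hit_rate`; per-layer metrics come from keeping one such counter per `CacheLayer`.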

Cache Invalidation Strategies

from recoagent.caching.core import CacheInvalidator, CacheLayer, CacheManager

# Create cache manager
cache_manager = CacheManager(
    cache_layers=[CacheLayer.EMBEDDING, CacheLayer.SEARCH_RESULT],
    default_ttl=3600
)

# Create cache invalidator
cache_invalidator = CacheInvalidator(cache_manager=cache_manager)

# Define invalidation rules
invalidation_rules = [
    {
        "pattern": "user_*",
        "trigger": "user_update",
        "action": "invalidate"
    },
    {
        "pattern": "model_*",
        "trigger": "model_update",
        "action": "refresh"
    },
    {
        "pattern": "search_*",
        "trigger": "data_update",
        "action": "invalidate"
    }
]

# Register invalidation rules
for rule in invalidation_rules:
    cache_invalidator.add_rule(rule)

# Simulate cache operations
cache_manager.set("user_123", {"name": "John", "preferences": {}})
cache_manager.set("model_gpt4", {"temperature": 0.1, "max_tokens": 1000})
cache_manager.set("search_ai", ["result1", "result2"])

# Trigger invalidation events
cache_invalidator.trigger_event("user_update", {"user_id": "123"})
cache_invalidator.trigger_event("model_update", {"model": "gpt4"})
cache_invalidator.trigger_event("data_update", {"domain": "ai"})

# Check cache status
print(f"User cache exists: {cache_manager.exists('user_123')}")
print(f"Model cache exists: {cache_manager.exists('model_gpt4')}")
print(f"Search cache exists: {cache_manager.exists('search_ai')}")
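Pattern-triggered invalidation like the rules above can be sketched with the standard library's `fnmatch` glob matching: each trigger event maps to a key pattern, and matching keys are dropped. The `rules` dict, `trigger_event` function, and plain-dict cache below are illustrative stand-ins for the `CacheInvalidator` machinery:

```python
import fnmatch

# Map each trigger event to the key pattern it invalidates,
# mirroring the rule definitions above.
rules = {
    "user_update": "user_*",
    "model_update": "model_*",
    "data_update": "search_*",
}

cache = {
    "user_123": {"name": "John"},
    "model_gpt4": {"temperature": 0.1},
    "search_ai": ["result1", "result2"],
}

def trigger_event(event: str) -> None:
    """Delete every cached key matching the pattern bound to this event."""
    pattern = rules.get(event)
    if pattern is None:
        return
    for key in [k for k in cache if fnmatch.fnmatch(k, pattern)]:
        del cache[key]

trigger_event("user_update")
print(sorted(cache))  # ['model_gpt4', 'search_ai']
```

A "refresh" action, as in the `model_*` rule, would recompute matching entries instead of deleting them.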

API Reference

CacheManager Methods

set(key: str, value: Any, ttl: int = None, layer: CacheLayer = None) -> None

Set cache entry

Parameters:

  • key (str): Cache key
  • value (Any): Value to cache
  • ttl (int, optional): Time-to-live in seconds
  • layer (CacheLayer, optional): Cache layer

get(key: str, layer: CacheLayer = None) -> Any

Get cache entry

Parameters:

  • key (str): Cache key
  • layer (CacheLayer, optional): Cache layer

Returns: Cached value or None

exists(key: str, layer: CacheLayer = None) -> bool

Check if cache entry exists

Parameters:

  • key (str): Cache key
  • layer (CacheLayer, optional): Cache layer

Returns: True if entry exists

delete(key: str, layer: CacheLayer = None) -> bool

Delete cache entry

Parameters:

  • key (str): Cache key
  • layer (CacheLayer, optional): Cache layer

Returns: True if deleted

SemanticCache Methods

set(query: str, result: Any, metadata: Dict = None) -> None

Set semantic cache entry

Parameters:

  • query (str): Query text
  • result (Any): Result to cache
  • metadata (Dict, optional): Additional metadata

get(query: str, threshold: float = None) -> SemanticCacheResult

Get semantically similar result

Parameters:

  • query (str): Query text
  • threshold (float, optional): Similarity threshold

Returns: SemanticCacheResult with similarity score

DistributedCache Methods

set(key: str, value: Any, ttl: int = None, replicate: bool = False) -> None

Set distributed cache entry

Parameters:

  • key (str): Cache key
  • value (Any): Value to cache
  • ttl (int, optional): Time-to-live
  • replicate (bool): Enable replication

get(key: str, consistent: bool = False) -> Any

Get distributed cache entry

Parameters:

  • key (str): Cache key
  • consistent (bool): Use consistent read

Returns: Cached value or None

See Also