Enhanced State Persistence
Multi-backend state persistence with Redis, compression, versioning, and advanced features for enterprise-grade state management.
Features
- Multi-Backend Support: Redis, SQLite, PostgreSQL backends
- Compression: LZ4 and zlib compression for efficient storage
- Versioning: Automatic version management with rollback
- TTL Support: Time-to-live for automatic cleanup
- Checksums: Data integrity verification
- Tagging: Organize and search states by tags
- Statistics: Comprehensive performance metrics
Quick Start
from recoagent.state import RedisStatePersistence, CompressionType, StateMetadata

# Configure Redis persistence: LZ4 compression, at most 10 retained versions,
# and a default TTL of 3600 seconds (one hour).
persistence = RedisStatePersistence(
    redis_url="redis://localhost:6379/0",
    compression_type=CompressionType.LZ4,
    enable_versioning=True,
    max_versions=10,
    default_ttl=3600,
)

# Save state with metadata; ttl_seconds sets a per-entry TTL (7200 s here).
persistence.save_with_metadata(
    state={"user_id": "123", "data": "important_data"},
    state_id="user_session_123",
    version=1,
    tags=["user", "session"],
    ttl_seconds=7200,
)

# Load state with metadata. The result unpacks into (state, metadata);
# a falsy result means nothing was found.
result = persistence.load_with_metadata("user_session_123")
if result:
    state, metadata = result
    print(f"State: {state}")
    print(f"Version: {metadata.version}")
    print(f"Created: {metadata.created_at}")
    print(f"Size: {metadata.size_bytes} bytes")
    print(f"Compression: {metadata.compression_type}")
Backend Configuration
Redis Backend
from recoagent.state import RedisStatePersistence, CompressionType

# Minimal setup: LZ4 compression, versioning capped at 10 entries,
# one-hour default TTL.
persistence = RedisStatePersistence(
    redis_url="redis://localhost:6379/0",
    compression_type=CompressionType.LZ4,
    enable_versioning=True,
    max_versions=10,
    default_ttl=3600,
)

# Tuned setup: more retained versions, a two-hour default TTL, and extra
# Redis connection settings supplied through redis_options.
persistence = RedisStatePersistence(
    redis_url="redis://localhost:6379/0",
    compression_type=CompressionType.LZ4,
    enable_versioning=True,
    max_versions=20,
    default_ttl=7200,
    redis_options={
        "socket_timeout": 5,
        "socket_connect_timeout": 5,
        "retry_on_timeout": True,
        # Keep raw bytes; compressed payloads are presumably not valid text.
        "decode_responses": False,
    },
)
Compression Types
# RedisStatePersistence is imported here as well, so this example runs on its own.
from recoagent.state import CompressionType, RedisStatePersistence

# No compression: the fastest option.
persistence = RedisStatePersistence(
    redis_url="redis://localhost:6379/0",
    compression_type=CompressionType.NONE,
)

# LZ4 compression: a balance of speed and size.
persistence = RedisStatePersistence(
    redis_url="redis://localhost:6379/0",
    compression_type=CompressionType.LZ4,
)

# zlib compression: the highest compression of the three.
persistence = RedisStatePersistence(
    redis_url="redis://localhost:6379/0",
    compression_type=CompressionType.ZLIB,
)
State Operations
Saving States
# Minimal save: state, ID, and version only.
persistence.save_with_metadata(
    state={"key": "value"},
    state_id="basic_state",
    version=1,
)

# Attach tags so the state can later be found with search_by_tags().
persistence.save_with_metadata(
    state={"user_id": "123", "session_data": "data"},
    state_id="user_session_123",
    version=1,
    tags=["user", "session", "active"],
)

# Short-lived state: expires after 300 seconds (5 minutes).
persistence.save_with_metadata(
    state={"temporary_data": "data"},
    state_id="temp_state",
    version=1,
    ttl_seconds=300,
)

# Tags, TTL, and free-form custom metadata combined.
persistence.save_with_metadata(
    state={"complex_data": "data"},
    state_id="complex_state",
    version=1,
    tags=["complex", "important"],
    ttl_seconds=3600,
    metadata={"source": "workflow", "priority": "high"},
)
Loading States
# Load the latest version (no version argument).
result = persistence.load_with_metadata("user_session_123")
if result:
    state, metadata = result
    print(f"State: {state}")
    print(f"Metadata: {metadata}")

# Load a specific version.
result = persistence.load_with_metadata("user_session_123", version=2)
if result:
    state, metadata = result
    print(f"State: {state}")

# Check the loaded version number before relying on the state.
result = persistence.load_with_metadata("user_session_123")
if result:
    state, metadata = result
    if metadata.version < 5:
        print("Using older version")
Version Management
# List metadata for every stored version.
versions = persistence.list_versions("user_session_123")
for version_metadata in versions:
    print(f"Version {version_metadata.version}: {version_metadata.created_at}")

# Delete a specific version.
persistence.delete_version("user_session_123", version=1)

# Count versions. Re-list so the count reflects the deletion above.
versions = persistence.list_versions("user_session_123")
print(f"Total versions: {len(versions)}")
Advanced Features
Tag-Based Search
# Find states by tag; each result is a metadata object.
# NOTE(review): whether several tags are matched with AND or OR is not
# shown in this guide — confirm against search_by_tags().
results = persistence.search_by_tags(["user", "session"])
for metadata in results:
    print(f"State ID: {metadata.state_id}")
    print(f"Version: {metadata.version}")
    print(f"Tags: {metadata.tags}")

# The same call with a different tag combination.
results = persistence.search_by_tags(["user", "active"])
for metadata in results:
    print(f"Found: {metadata.state_id}")
State Statistics
# Get persistence statistics (a dict).
stats = persistence.get_stats()
print(f"Backend Type: {stats['backend_type']}")
print(f"Compression Type: {stats['compression_type']}")
print(f"Versioning Enabled: {stats['versioning_enabled']}")
print(f"Max Versions: {stats['max_versions']}")

# Redis-specific stats; guarded because the key may be absent.
if 'redis_info' in stats:
    redis_info = stats['redis_info']
    print(f"Used Memory: {redis_info['used_memory']}")
    print(f"Connected Clients: {redis_info['connected_clients']}")
    print(f"Total Commands: {redis_info['total_commands_processed']}")
Data Integrity
# Inspect integrity-related metadata of a stored state.
result = persistence.load_with_metadata("user_session_123")
if result:
    state, metadata = result
    print(f"Checksum: {metadata.checksum}")
    print(f"Size: {metadata.size_bytes} bytes")
    print(f"Compression: {metadata.compression_type}")

    # This only checks that a checksum was recorded; it does not recompute it.
    # NOTE(review): confirm whether load_with_metadata() verifies the
    # checksum itself and how it reports a mismatch.
    if metadata.checksum:
        print("Checksum recorded")
    else:
        print("No checksum recorded; integrity cannot be verified")
Integration Examples
With Workflows
from packages.observability import trace_workflow


@trace_workflow(name="stateful_workflow")
async def stateful_workflow(user_id: str):
    """Advance a per-user workflow by one step and persist the result.

    Resumes from the latest saved state under ``workflow_<user_id>`` when one
    exists; otherwise starts from step 0. The updated state is then saved as
    the next version with a one-hour TTL.

    Args:
        user_id: Identifier used to build the state ID.

    Returns:
        The updated state dict.
    """
    state_id = f"workflow_{user_id}"

    # NOTE(review): these persistence calls are synchronous (not awaited), so
    # they block the event loop while they run; consider asyncio.to_thread()
    # for latency-sensitive services.
    result = persistence.load_with_metadata(state_id)
    if result:
        state, metadata = result
        next_version = metadata.version + 1
        print(f"Resuming workflow from version {metadata.version}")
    else:
        state = {"step": 0, "data": {}}
        next_version = 1
        print("Starting new workflow")

    # Update state
    state["step"] += 1
    state["data"]["processed"] = True

    # Save the updated state as the next version.
    persistence.save_with_metadata(
        state=state,
        state_id=state_id,
        version=next_version,
        tags=["workflow", "user"],
        ttl_seconds=3600,
    )
    return state
With Checkpoints
# Create a checkpoint; the returned ID is used to restore it later.
checkpoint_id = persistence.create_checkpoint(
    state_id="workflow_123",
    checkpoint_data={"step": 5, "data": "checkpoint_data"},
    metadata={"workflow_name": "data_processing"},
)

# Restore the checkpoint by ID; this example treats a falsy result as "not found".
checkpoint = persistence.restore_checkpoint(checkpoint_id)
if checkpoint:
    print(f"Restored checkpoint: {checkpoint['checkpoint_data']}")
    print(f"Metadata: {checkpoint['metadata']}")
With Compression
# Save a large, highly repetitive state so compression has an effect.
large_state = {"data": "x" * 10000}  # ~10 KB of data
persistence.save_with_metadata(
    state=large_state,
    state_id="large_state",
    version=1,
    tags=["large", "compressed"],
)

# Load and inspect the compression result.
result = persistence.load_with_metadata("large_state")
if result:
    state, metadata = result
    # The ratio below treats size_bytes as the stored (compressed) size.
    # NOTE(review): confirm size_bytes is measured after compression.
    print(f"Stored size: {metadata.size_bytes} bytes")
    print(f"Compression type: {metadata.compression_type}")
    # The uncompressed size is approximated by the length of the dict's repr.
    # Guard against a zero size to avoid ZeroDivisionError.
    if metadata.size_bytes:
        print(f"Compression ratio: {len(str(large_state)) / metadata.size_bytes:.2f}")
Performance Optimization
Connection Pooling
# Tune the connection pool through redis_options.
persistence = RedisStatePersistence(
    redis_url="redis://localhost:6379/0",
    compression_type=CompressionType.LZ4,
    redis_options={
        "max_connections": 20,  # upper bound on pooled connections
        "retry_on_timeout": True,
        "socket_keepalive": True,
        "socket_keepalive_options": {},
    },
)
Batch Operations
# Save several states. The loop makes one save_with_metadata() call per
# state, so the saves run one after another rather than as a single batch.
states_to_save = [
    {"state_id": "state_1", "state": {"data": "1"}, "version": 1},
    {"state_id": "state_2", "state": {"data": "2"}, "version": 1},
    {"state_id": "state_3", "state": {"data": "3"}, "version": 1},
]
for state_data in states_to_save:
    persistence.save_with_metadata(
        state=state_data["state"],
        state_id=state_data["state_id"],
        version=state_data["version"],
    )
Memory Management
# Monitor Redis memory usage.
stats = persistence.get_stats()
if 'redis_info' in stats:
    used_memory = stats['redis_info']['used_memory']
    print(f"Redis memory usage: {used_memory}")

    # Prune old versions when usage exceeds 100 MB. The limit is a byte
    # count: comparing a number with a string such as "100MB" raises
    # TypeError in Python 3.
    # NOTE(review): assumes used_memory is an integer byte count, as in the
    # Redis INFO command — confirm against get_stats().
    memory_limit_bytes = 100 * 1024 * 1024
    keep_versions = 5
    if used_memory > memory_limit_bytes:
        # NOTE(review): list_states() is not listed in the API reference —
        # confirm it exists on your backend.
        for state_id in persistence.list_states():
            # Sort oldest-first so the newest `keep_versions` survive, whatever
            # order list_versions() returns. Slicing an already-short list
            # yields nothing to delete, so no length check is needed.
            versions = sorted(
                persistence.list_versions(state_id),
                key=lambda version_metadata: version_metadata.version,
            )
            for old in versions[:-keep_versions]:
                persistence.delete_version(state_id, old.version)
Error Handling
Connection Errors
# Fall back to in-memory storage when Redis is unavailable.
# NOTE(review): redis-py raises redis.exceptions.ConnectionError, which does
# not subclass the builtin ConnectionError caught here. redis-py also
# connects lazily, so a dead server may only surface on the first operation.
# Confirm how RedisStatePersistence reports connection failures.
try:
    persistence = RedisStatePersistence(redis_url="redis://localhost:6379/0")
except ConnectionError as e:
    print(f"Redis connection failed: {e}")
    # Fallback to in-memory storage
    from recoagent.state import InMemoryStatePersistence
    persistence = InMemoryStatePersistence()
Data Corruption
# Handle a state whose integrity cannot be confirmed.
result = persistence.load_with_metadata("corrupted_state")
if result:
    state, metadata = result
    # A missing checksum means integrity cannot be verified. This check does
    # not detect a checksum that is present but wrong.
    # NOTE(review): confirm how load_with_metadata() reports checksum mismatches.
    if not metadata.checksum:
        print("Warning: no checksum recorded; data integrity cannot be verified")
        # Attempt to recover or fall back to a backup here.
Version Conflicts
# Handle version conflicts.
# NOTE(review): VersionConflictError is not imported in this example —
# confirm the module that defines it (presumably recoagent.state).
# resolve_conflict() is application code you supply to merge two states.
proposed_state = {"data": "new_data"}
try:
    persistence.save_with_metadata(
        state=proposed_state,
        state_id="conflicted_state",
        version=5,  # Assuming version 5 already exists
    )
except VersionConflictError as e:
    print(f"Version conflict: {e}")
    # Reload the latest version, merge, and save as the next version.
    # Under heavy concurrency this retry can itself conflict again.
    latest_result = persistence.load_with_metadata("conflicted_state")
    if latest_result:
        latest_state, latest_metadata = latest_result
        resolved_state = resolve_conflict(latest_state, proposed_state)
        persistence.save_with_metadata(
            state=resolved_state,
            state_id="conflicted_state",
            version=latest_metadata.version + 1,
        )
Best Practices
- Choose the Right Backend: Redis for production, SQLite for development.
- Use Compression: Enable compression for large states.
- Set an Appropriate TTL: Match each TTL to how long the state is actually needed.
- Manage Versions: Keep a reasonable number of versions (`max_versions`).
- Organize with Tags: Use tags to group states and find them with `search_by_tags`.
- Monitor Performance: Track `get_stats()` output and tune accordingly.
- Handle Errors: Catch connection failures and version conflicts explicitly.
- Protect Data Integrity: Record checksums and handle corrupted states.
Troubleshooting
Common Issues
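- Redis Connection: Check that the Redis server is running and that `redis_url` is correct.
- Memory Issues: Monitor Redis memory usage (`get_stats()["redis_info"]["used_memory"]`) and prune old versions.
- Compression Errors: When using `CompressionType.LZ4`, make sure the LZ4 library is installed; zlib ships with the Python standard library.
- Version Conflicts: When several writers update the same state, catch `VersionConflictError`, reload the latest version, and save it as `version + 1`.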
Debug Mode
# Enable debug logging for the persistence module.
import logging

# Raising the logger's level only lets DEBUG records through the logger; a
# handler is still needed to emit them. Without one, Python's last-resort
# handler prints WARNING and above only. basicConfig() adds a stderr handler
# only when the root logger has none, so it does not override an existing
# setup. The root level stays at WARNING, so other libraries stay quiet.
logging.basicConfig()
logging.getLogger('recoagent.state.persistence').setLevel(logging.DEBUG)
Health Check
# Check persistence health with a save / load / delete round trip.
def check_persistence_health():
    """Run a save/load/delete round trip against ``persistence``.

    Saves a small probe state, reads it back, and deletes it again. The probe
    is deleted even if the load raises. It is also saved with a short TTL,
    so it expires on its own if the delete fails.

    NOTE(review): the probe uses a fixed ID and version, so concurrent health
    checks against the same store may interfere with each other.

    Returns:
        True if the loaded state matches what was saved; False on a mismatch
        or if any operation raises.
    """
    test_state = {"test": "value"}
    test_id = "health_check"
    try:
        # Save
        persistence.save_with_metadata(
            state=test_state,
            state_id=test_id,
            version=1,
            ttl_seconds=60,  # safety net: the probe expires if cleanup fails
        )
        try:
            # Load
            result = persistence.load_with_metadata(test_id)
        finally:
            # Clean up even when the load raises.
            persistence.delete_version(test_id, version=1)

        if result and result[0] == test_state:
            print("✅ Persistence is healthy")
            return True
        print("❌ Persistence data corruption")
        return False
    except Exception as e:
        # A health check reports failure instead of propagating the error.
        print(f"❌ Persistence health check failed: {e}")
        return False
API Reference
RedisStatePersistence
| Parameter | Type | Description |
|---|---|---|
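| `redis_url` | `str` | Redis connection URL |
| `compression_type` | `CompressionType` | Compression type |
| `enable_versioning` | `bool` | Enable versioning |
| `max_versions` | `int` | Maximum number of versions to keep |
| `default_ttl` | `int` | Default TTL in seconds |
| `redis_options` | `dict` | Additional Redis connection options (e.g. `socket_timeout`, `max_connections`) |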
CompressionType
| Value | Description |
|---|---|
| `NONE` | No compression (fastest) |
| `LZ4` | LZ4 compression (balanced speed and size) |
| `ZLIB` | zlib compression (highest compression) |
Methods
| Method | Description |
|---|---|
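| `save_with_metadata(state, state_id, version, tags, ttl_seconds, metadata)` | Save a state with metadata; `tags`, `ttl_seconds`, and `metadata` are optional |
| `load_with_metadata(state_id, version)` | Load a state and its metadata as `(state, metadata)`; omit `version` to get the latest; the result is falsy when nothing is found |
| `list_versions(state_id)` | List metadata for all stored versions |
| `delete_version(state_id, version)` | Delete a specific version |
| `search_by_tags(tags)` | Find states by tags; returns metadata objects |
| `get_stats()` | Get statistics as a dict |
| `create_checkpoint(state_id, checkpoint_data, metadata)` | Create a checkpoint and return its ID |
| `restore_checkpoint(checkpoint_id)` | Restore a checkpoint as a dict with `checkpoint_data` and `metadata` keys |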