Skip to main content

Enhanced State Persistence

Multi-backend state persistence with Redis, compression, versioning, and advanced features for enterprise-grade state management.

Features

  • Multi-Backend Support: Redis, SQLite, PostgreSQL backends
  • Compression: LZ4 and zlib compression for efficient storage
  • Versioning: Automatic version management with rollback
  • TTL Support: Time-to-live for automatic cleanup
  • Checksums: Data integrity verification
  • Tagging: Organize and search states by tags
  • Statistics: Comprehensive performance metrics

Quick Start

from recoagent.state import RedisStatePersistence, CompressionType, StateMetadata

# Configure Redis persistence: LZ4 compression, versioning with at most
# 10 versions kept, and a default TTL of 3600 seconds (one hour).
persistence = RedisStatePersistence(
    redis_url="redis://localhost:6379/0",
    compression_type=CompressionType.LZ4,
    enable_versioning=True,
    max_versions=10,
    default_ttl=3600,
)

# Save state with metadata; ttl_seconds overrides the default TTL for this state.
persistence.save_with_metadata(
    state={"user_id": "123", "data": "important_data"},
    state_id="user_session_123",
    version=1,
    tags=["user", "session"],
    ttl_seconds=7200,
)

# Load state with metadata. The result is a (state, metadata) pair; check it
# before unpacking in case nothing was found.
result = persistence.load_with_metadata("user_session_123")
if result:
    state, metadata = result
    print(f"State: {state}")
    print(f"Version: {metadata.version}")
    print(f"Created: {metadata.created_at}")
    print(f"Size: {metadata.size_bytes} bytes")
    print(f"Compression: {metadata.compression_type}")

Backend Configuration

Redis Backend

from recoagent.state import RedisStatePersistence, CompressionType

# Basic Redis configuration: LZ4 compression, up to 10 stored versions,
# one-hour default TTL.
persistence = RedisStatePersistence(
    redis_url="redis://localhost:6379/0",
    compression_type=CompressionType.LZ4,
    enable_versioning=True,
    max_versions=10,
    default_ttl=3600,
)

# Advanced Redis configuration: extra client settings are passed through
# the redis_options dictionary.
redis_options = {
    "socket_timeout": 5,
    "socket_connect_timeout": 5,
    "retry_on_timeout": True,
    "decode_responses": False,  # presumably keeps raw bytes, since payloads may be compressed
}
persistence = RedisStatePersistence(
    redis_url="redis://localhost:6379/0",
    compression_type=CompressionType.LZ4,
    enable_versioning=True,
    max_versions=20,
    default_ttl=7200,
    redis_options=redis_options,
)

Compression Types

from recoagent.state import CompressionType

# Every variant below points at the same Redis instance; only the
# compression algorithm differs.
redis_url = "redis://localhost:6379/0"

# No compression (fastest)
persistence = RedisStatePersistence(
    redis_url=redis_url,
    compression_type=CompressionType.NONE,
)

# LZ4 compression (balanced)
persistence = RedisStatePersistence(
    redis_url=redis_url,
    compression_type=CompressionType.LZ4,
)

# Zlib compression (highest compression)
persistence = RedisStatePersistence(
    redis_url=redis_url,
    compression_type=CompressionType.ZLIB,
)

State Operations

Saving States

# Minimal save: just the state, its id, and a version number.
persistence.save_with_metadata(
    state={"key": "value"},
    state_id="basic_state",
    version=1,
)

# Attach tags so the state can be found later with search_by_tags().
persistence.save_with_metadata(
    state={"user_id": "123", "session_data": "data"},
    state_id="user_session_123",
    version=1,
    tags=["user", "session", "active"],
)

# Give the state its own time-to-live.
persistence.save_with_metadata(
    state={"temporary_data": "data"},
    state_id="temp_state",
    version=1,
    ttl_seconds=300,  # 5 minutes
)

# Combine tags and a TTL with custom metadata.
persistence.save_with_metadata(
    state={"complex_data": "data"},
    state_id="complex_state",
    version=1,
    tags=["complex", "important"],
    ttl_seconds=3600,
    metadata={"source": "workflow", "priority": "high"},
)

Loading States

# Load the latest version. The result is a (state, metadata) pair;
# check it before unpacking.
result = persistence.load_with_metadata("user_session_123")
if result:
    state, metadata = result
    print(f"State: {state}")
    print(f"Metadata: {metadata}")

# Load a specific version.
result = persistence.load_with_metadata("user_session_123", version=2)
if result:
    state, metadata = result
    print(f"State: {state}")

# Load the latest version and branch on its version number.
result = persistence.load_with_metadata("user_session_123")
if result:
    state, metadata = result
    if metadata.version < 5:
        print("Using older version")

Version Management

# List every stored version of a state.
versions = persistence.list_versions("user_session_123")
for version_metadata in versions:
    print(f"Version {version_metadata.version}: {version_metadata.created_at}")

# Delete a specific version.
persistence.delete_version("user_session_123", version=1)

# Count the remaining versions (re-listed because of the delete above).
versions = persistence.list_versions("user_session_123")
print(f"Total versions: {len(versions)}")

Advanced Features

# Search by tags. search_by_tags() yields metadata objects, not the states.
# NOTE(review): these docs do not show whether a state must carry all of the
# given tags or any one of them. Confirm against the implementation.
results = persistence.search_by_tags(["user", "session"])
for metadata in results:
    print(f"State ID: {metadata.state_id}")
    print(f"Version: {metadata.version}")
    print(f"Tags: {metadata.tags}")

# Search with a different tag combination.
results = persistence.search_by_tags(["user", "active"])
for metadata in results:
    print(f"Found: {metadata.state_id}")

State Statistics

# Get persistence statistics (a dict).
stats = persistence.get_stats()
print(f"Backend Type: {stats['backend_type']}")
print(f"Compression Type: {stats['compression_type']}")
print(f"Versioning Enabled: {stats['versioning_enabled']}")
print(f"Max Versions: {stats['max_versions']}")

# Redis-specific stats are only present when the 'redis_info' key exists.
if 'redis_info' in stats:
    redis_info = stats['redis_info']
    print(f"Used Memory: {redis_info['used_memory']}")
    print(f"Connected Clients: {redis_info['connected_clients']}")
    print(f"Total Commands: {redis_info['total_commands_processed']}")

Data Integrity

# Inspect the integrity-related metadata of a stored state.
result = persistence.load_with_metadata("user_session_123")
if result:
    state, metadata = result
    print(f"Checksum: {metadata.checksum}")
    print(f"Size: {metadata.size_bytes} bytes")
    print(f"Compression: {metadata.compression_type}")

    # This only tests whether a checksum was recorded. It does not recompute
    # the checksum against the loaded state, so it is not proof of integrity.
    if metadata.checksum:
        print("Checksum present")
    else:
        print("No checksum stored")

Integration Examples

With Workflows

from packages.observability import trace_workflow


# `persistence` is the RedisStatePersistence instance configured earlier.
@trace_workflow(name="stateful_workflow")
async def stateful_workflow(user_id: str):
    """Advance a per-user workflow by one step and persist the new state.

    Loads ``workflow_<user_id>`` if it exists, otherwise starts at step 0,
    then saves the updated state as the next version.

    NOTE: the persistence calls are made without ``await``, so they run
    synchronously inside this coroutine.
    """
    state_id = f"workflow_{user_id}"

    # Load existing state and work out which version the save will create.
    result = persistence.load_with_metadata(state_id)
    if result:
        state, metadata = result
        next_version = metadata.version + 1
        print(f"Resuming workflow from version {metadata.version}")
    else:
        state = {"step": 0, "data": {}}
        next_version = 1
        print("Starting new workflow")

    # Update state
    state["step"] += 1
    state["data"]["processed"] = True

    # Save updated state
    persistence.save_with_metadata(
        state=state,
        state_id=state_id,
        version=next_version,
        tags=["workflow", "user"],
        ttl_seconds=3600,
    )

    return state

With Checkpoints

# Create a checkpoint; the returned id is what restore_checkpoint() takes.
checkpoint_id = persistence.create_checkpoint(
    state_id="workflow_123",
    checkpoint_data={"step": 5, "data": "checkpoint_data"},
    metadata={"workflow_name": "data_processing"},
)

# Restore the checkpoint; check the result before reading from it.
checkpoint = persistence.restore_checkpoint(checkpoint_id)
if checkpoint:
    print(f"Restored checkpoint: {checkpoint['checkpoint_data']}")
    print(f"Metadata: {checkpoint['metadata']}")

With Compression

# Save a large, highly repetitive state so compression has something to do.
large_state = {"data": "x" * 10000}  # ~10 KB of data
persistence.save_with_metadata(
    state=large_state,
    state_id="large_state",
    version=1,
    tags=["large", "compressed"],
)

# Load the state back and look at its size and compression metadata.
result = persistence.load_with_metadata("large_state")
if result:
    state, metadata = result
    print(f"Size: {metadata.size_bytes} bytes")
    print(f"Compression type: {metadata.compression_type}")
    # Approximate ratio: the length of the state's Python repr divided by the
    # reported size. Assumes size_bytes is the stored (compressed) size;
    # confirm this. The guard avoids dividing by zero.
    if metadata.size_bytes:
        print(f"Compression ratio: {len(str(large_state)) / metadata.size_bytes:.2f}")

Performance Optimization

Connection Pooling

# Connection-pool settings are passed through redis_options like any other
# Redis client option.
pool_options = {
    "max_connections": 20,
    "retry_on_timeout": True,
    "socket_keepalive": True,
    "socket_keepalive_options": {},
}

persistence = RedisStatePersistence(
    redis_url="redis://localhost:6379/0",
    compression_type=CompressionType.LZ4,
    redis_options=pool_options,
)

Batch Operations

# Save several states in a loop. Each iteration is an independent
# save_with_metadata() call; no bulk-save method is listed in the API reference.
states_to_save = [
    {"state_id": "state_1", "state": {"data": "1"}, "version": 1},
    {"state_id": "state_2", "state": {"data": "2"}, "version": 1},
    {"state_id": "state_3", "state": {"data": "3"}, "version": 1},
]

for state_data in states_to_save:
    persistence.save_with_metadata(
        state=state_data["state"],
        state_id=state_data["state_id"],
        version=state_data["version"],
    )

Memory Management

# Monitor memory usage
stats = persistence.get_stats()
if 'redis_info' in stats:
    used_memory = stats['redis_info']['used_memory']
    print(f"Redis memory usage: {used_memory}")

    # Compare against a byte count, not a string: `int > "100MB"` raises
    # TypeError in Python 3. Assumes used_memory is an integer number of bytes,
    # as in Redis INFO. Confirm this for this library.
    memory_threshold_bytes = 100 * 1024 * 1024  # 100 MB
    keep_versions = 5

    # Clean up old versions if memory is high
    if used_memory > memory_threshold_bytes:
        # NOTE(review): list_states() is not in the API reference below.
        # Confirm that it exists.
        for state_id in persistence.list_states():
            versions = persistence.list_versions(state_id)
            # Keep only the newest `keep_versions`. The slice is empty when there
            # are that many or fewer. Assumes list_versions() returns oldest
            # first; confirm.
            for version in versions[:-keep_versions]:
                persistence.delete_version(state_id, version.version)

Error Handling

Connection Errors

try:
    persistence = RedisStatePersistence(redis_url="redis://localhost:6379/0")
except ConnectionError as e:
    # NOTE(review): this catches Python's built-in ConnectionError. redis-py's
    # redis.exceptions.ConnectionError does not subclass it, and redis-py
    # clients may not connect until the first command. Confirm what, and when,
    # RedisStatePersistence raises on an unreachable server.
    print(f"Redis connection failed: {e}")
    # Fallback to in-memory storage
    from recoagent.state import InMemoryStatePersistence
    persistence = InMemoryStatePersistence()

Data Corruption

# Handle data corruption
result = persistence.load_with_metadata("corrupted_state")
if result:
    state, metadata = result
    # Only the presence of a checksum is checked here; it is not recomputed
    # against the loaded state.
    if not metadata.checksum:
        print("Warning: no checksum stored; data integrity cannot be verified")
        # Attempt to recover or use backup

Version Conflicts

# Handle version conflicts.
# NOTE(review): VersionConflictError must be imported from the library; its
# module is not shown in these docs. resolve_conflict() is your own merge
# function and is not provided by the library.
try:
    persistence.save_with_metadata(
        state={"data": "new_data"},
        state_id="conflicted_state",
        version=5,  # Assuming version 5 already exists
    )
except VersionConflictError as e:
    print(f"Version conflict: {e}")
    # Re-read the latest version and save a merged state on top of it.
    latest_result = persistence.load_with_metadata("conflicted_state")
    if latest_result:
        latest_state, latest_metadata = latest_result
        # Merge or resolve conflict
        resolved_state = resolve_conflict(latest_state, {"data": "new_data"})
        persistence.save_with_metadata(
            state=resolved_state,
            state_id="conflicted_state",
            version=latest_metadata.version + 1,
        )

Best Practices

  1. Choose the Right Backend: Redis for production, SQLite for development
  2. Use Compression: For large states, enable compression
  3. Set Appropriate TTL: Based on state lifetime
  4. Version Management: Keep a reasonable number of versions (see max_versions)
  5. Tag Organization: Use tags for better organization
  6. Monitor Performance: Track statistics and optimize
  7. Handle Errors: Implement proper error handling
  8. Data Integrity: Verify checksums and handle corruption

Troubleshooting

Common Issues

  1. Redis Connection: Check Redis server and connection string
  2. Memory Issues: Monitor Redis memory usage
  3. Compression Errors: Check that an LZ4 library (e.g. the lz4 package) is installed; zlib ships with Python's standard library
  4. Version Conflicts: Handle concurrent access properly

Debug Mode

# Enable debug logging for the persistence module.
import logging

# Raising the logger level alone is not enough. With no handler configured,
# Python falls back to a last-resort handler that only prints WARNING and
# above. basicConfig() attaches a handler to the root logger (if it has none);
# the root level stays at WARNING, so only this package logs at DEBUG.
logging.basicConfig()
logging.getLogger('recoagent.state.persistence').setLevel(logging.DEBUG)

Health Check

# Check persistence health
def check_persistence_health():
    """Round-trip a small probe state through `persistence` and report health.

    Saves a probe state, loads it back, and deletes it again. The delete runs
    in a ``finally`` so the probe is cleaned up even if loading fails.

    Returns:
        True if the loaded state equals what was saved, otherwise False.
        Any exception raised along the way is reported and yields False.
    """
    test_state = {"test": "value"}
    test_id = "health_check"

    try:
        # Save
        persistence.save_with_metadata(
            state=test_state,
            state_id=test_id,
            version=1,
        )

        try:
            # Load
            result = persistence.load_with_metadata(test_id)
        finally:
            # Clean up even if the load raised, so the probe does not linger.
            persistence.delete_version(test_id, version=1)

        if result and result[0] == test_state:
            print("✅ Persistence is healthy")
            return True

        print("❌ Persistence data corruption")
        return False

    except Exception as e:
        print(f"❌ Persistence health check failed: {e}")
        return False

API Reference

RedisStatePersistence

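| Parameter | Type | Description |
|---|---|---|
| `redis_url` | `str` | Redis connection URL |
| `compression_type` | `CompressionType` | Compression type |
| `enable_versioning` | `bool` | Enable versioning |
| `max_versions` | `int` | Maximum versions to keep |
| `default_ttl` | `int` | Default TTL in seconds |
| `redis_options` | `dict` | Additional Redis client options (timeouts, connection pool settings) |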

CompressionType

| Value | Description |
|---|---|
| `NONE` | No compression |
| `LZ4` | LZ4 compression |
| `ZLIB` | Zlib compression |

Methods

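| Method | Description |
|---|---|
| `save_with_metadata(state, state_id, version, tags, ttl_seconds, metadata)` | Save state with metadata (`tags`, `ttl_seconds`, and `metadata` are optional) |
| `load_with_metadata(state_id, version)` | Load state with metadata; returns a `(state, metadata)` pair, or a falsy result when nothing is found (`version` is optional) |
| `list_versions(state_id)` | List all versions |
| `delete_version(state_id, version)` | Delete specific version |
| `search_by_tags(tags)` | Search by tags |
| `get_stats()` | Get statistics |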