# Memory Persistence Troubleshooting Guide
This guide helps you diagnose and resolve common issues with RecoAgent's LangGraph memory persistence system.
## Common Issues and Solutions
### Database Connection Issues
#### Issue: "Database is locked" or "Database connection timeout"

**Symptoms:**
- Operations fail with database lock errors
- Slow response times
- Connection timeout exceptions
**Causes:**
- Too many concurrent connections
- Long-running transactions
- Database file corruption
- Insufficient connection pool size
**Solutions:**

- Increase the connection pool size:

  ```python
  memory_manager = MemoryManager(
      db_path="memory.db",
      max_connections=20,       # increase from the default of 10
      connection_timeout=60.0,  # increase the timeout
  )
  ```

- Check for long-running operations:

  ```python
  # Use connection context managers so connections are always released
  async with memory_manager.saver.get_connection() as conn:
      # Your database operations here
      pass
  ```

- Verify database file integrity:

  ```bash
  # Check database file permissions and disk space
  ls -la memory.db
  df -h .
  ```

- Enable WAL mode for better concurrency:

  ```python
  saver = AsyncSqliteSaver(
      db_path="memory.db",
      enable_wal_mode=True  # default is True
  )
  ```
#### Issue: "Database file not found" or "Permission denied"

**Symptoms:**

- `FileNotFoundError` when accessing the database
- Permission denied errors

**Solutions:**
- Check file permissions:

  ```bash
  chmod 644 memory.db
  chown $USER:$USER memory.db
  ```

- Ensure the directory exists:

  ```python
  from pathlib import Path

  db_path = Path("data/memory.db")
  db_path.parent.mkdir(parents=True, exist_ok=True)
  ```

- Use absolute paths:

  ```python
  import os

  db_path = os.path.abspath("memory.db")
  ```
### Memory and Performance Issues
#### Issue: High memory usage or slow performance

**Symptoms:**
- Memory usage keeps growing
- Operations become slower over time
- System becomes unresponsive
**Solutions:**

- Run memory cleanup:

  ```python
  from recoagent.memory import CleanupPolicy, CleanupStrategy

  policy = CleanupPolicy(
      strategy=CleanupStrategy.AGE_BASED,
      max_age_days=30,
      dry_run=False,
  )
  result = await memory_manager.optimizer.cleanup_memory(policy)
  print(f"Cleaned up {result.threads_deleted} threads")
  ```

- Optimize the database:

  ```python
  from recoagent.memory import OptimizationLevel

  await memory_manager.optimizer.optimize_database(
      OptimizationLevel.AGGRESSIVE
  )
  ```

- Monitor memory usage:

  ```python
  stats = await memory_manager.optimizer.get_memory_statistics()
  print(f"Database size: {stats['database_size_mb']:.2f} MB")
  print(f"Total threads: {stats['total_threads']}")
  ```

- Set up automatic cleanup:

  ```python
  # Configure automatic cleanup in ThreadManager
  thread_manager = ThreadManager(
      saver=saver,
      cleanup_interval_minutes=60  # run cleanup every hour
  )
  ```
#### Issue: "Out of memory" errors

**Symptoms:**
- MemoryError exceptions
- System running out of RAM
- Process killed by OOM killer
**Solutions:**

- Reduce batch sizes:

  ```python
  # Process data in smaller batches
  batch_size = 100
  for i in range(0, len(data), batch_size):
      batch = data[i:i + batch_size]
      await process_batch(batch)
  ```

- Use streaming for large datasets:

  ```python
  # Process one item at a time instead of loading everything into memory
  async def process_large_dataset(data_stream):
      async for item in data_stream:
          await process_item(item)
  ```

- Implement pagination:

  ```python
  limit = 100
  offset = 0
  while True:
      threads = await memory_manager.thread_manager.list_user_threads(
          user_id="user123",
          limit=limit,
          offset=offset,
      )
      if not threads:
          break
      for thread in threads:
          await process_thread(thread)
      offset += limit
  ```
### Session and Thread Management Issues
#### Issue: "Session not found" or "Thread does not belong to session"

**Symptoms:**
- SessionNotFoundError
- Thread validation failures
- Inconsistent state between sessions and threads
**Solutions:**

- Verify the session exists before creating threads:

  ```python
  # Always check that the session exists
  session_info = await thread_manager.get_session_info(session_id)
  if not session_info:
      session_id = await thread_manager.create_session(user_id)
  thread_id = await thread_manager.create_thread(user_id, session_id)
  ```

- Use session locks for concurrent access:

  ```python
  async with thread_manager.session_lock(session_id):
      # All operations on this session are now atomic
      await thread_manager.create_thread(user_id, session_id)
      await thread_manager.update_thread_state(thread_id, state)
  ```

- Handle session expiration:

  ```python
  # Check whether the session has expired
  session_info = await thread_manager.get_session_info(session_id)
  if session_info and session_info.is_expired():
      # Create a new session
      session_id = await thread_manager.create_session(user_id)
  ```
#### Issue: Thread limit exceeded

**Symptoms:**
- "Maximum threads per session exceeded" error
- Cannot create new conversation threads
**Solutions:**

- Increase the thread limit:

  ```python
  thread_manager = ThreadManager(
      saver=saver,
      max_threads_per_session=50  # increase from the default of 10
  )
  ```

- Archive old threads:

  ```python
  # Archive completed threads
  threads = await thread_manager.list_user_threads(user_id)
  for thread in threads:
      if thread['status'] == 'completed':
          await thread_manager.archive_session(thread['session_id'])
  ```

- Clean up unused threads:

  ```python
  # Delete old inactive threads
  await thread_manager.cleanup_old_conversations(days_old=7)
  ```
### Data Integrity Issues
#### Issue: Corrupted conversation state or missing messages

**Symptoms:**
- Incomplete conversation history
- Missing messages in threads
- Inconsistent state data
**Solutions:**

- Validate conversation state:

  ```python
  from recoagent.memory.state.conversation_state import validate_conversation_state

  state = await thread_manager.get_thread_state(thread_id)
  if not validate_conversation_state(state):
      print("Invalid conversation state detected")
      # Handle the corrupted state
  ```

- Implement data recovery:

  ```python
  async def recover_conversation(thread_id):
      try:
          state = await thread_manager.get_thread_state(thread_id)
          if not state:
              # Try to recover from a backup or recreate the conversation
              return await recreate_conversation(thread_id)
          return state
      except Exception as e:
          print(f"Recovery failed: {e}")
          return None
  ```

- Use database transactions:

  ```python
  # All operations are automatically wrapped in transactions, but you can
  # also use explicit transactions for complex multi-step operations
  async with memory_manager.saver.get_connection() as conn:
      await conn.execute("BEGIN TRANSACTION")
      try:
          # Multiple operations
          await conn.execute("INSERT ...")
          await conn.execute("UPDATE ...")
          await conn.commit()
      except Exception:
          await conn.rollback()
          raise
  ```
### Search and Query Issues
#### Issue: Slow search operations or empty results

**Symptoms:**
- Search queries take too long
- No results returned for valid queries
- Inconsistent search results
**Solutions:**

- Optimize search queries:

  ```python
  # Use appropriate filters to limit the result set
  filters = ConversationFilter(
      user_id="specific_user",
      date_from=datetime.now() - timedelta(days=30),
  )
  results, total = await history_api.search_conversations(
      query="search term",
      filters=filters,
      limit=100,  # cap the number of results
  )
  ```

- Check database indexes:

  ```python
  # Indexes are created automatically, but you can verify they exist
  async with saver.get_connection() as conn:
      cursor = await conn.execute(
          "SELECT name FROM sqlite_master WHERE type='index'"
      )
      indexes = await cursor.fetchall()
      print(f"Available indexes: {indexes}")
  ```

- Try different search types:

  ```python
  from recoagent.memory.session.history_api import SearchType

  # Try different search strategies
  for search_type in [SearchType.EXACT, SearchType.FUZZY, SearchType.SEMANTIC]:
      results, total = await history_api.search_conversations(
          query="search term",
          search_type=search_type,
      )
  ```
### Initialization and Configuration Issues
#### Issue: "MemoryManager not initialized" or initialization failures

**Symptoms:**
- Initialization errors on startup
- Components not properly initialized
- Connection pool issues
**Solutions:**

- Use the proper initialization sequence:

  ```python
  async def setup_memory_system():
      memory_manager = MemoryManager(db_path="memory.db")
      try:
          await memory_manager.initialize()
          print("Memory system initialized successfully")
      except Exception as e:
          print(f"Initialization failed: {e}")
          raise
      return memory_manager
  ```

- Use context managers:

  ```python
  async with MemoryManager(db_path="memory.db") as memory_manager:
      # The system is automatically initialized and cleaned up
      await memory_manager.thread_manager.create_session(user_id)
  ```

- Check system requirements:

  ```python
  import sys
  import sqlite3

  # Check the Python version
  if sys.version_info < (3, 8):
      raise RuntimeError("Python 3.8+ required")

  # Check the SQLite version
  print(f"SQLite version: {sqlite3.sqlite_version}")
  ```
## Debugging and Monitoring
### Enable Debug Logging

```python
import logging

# Enable debug logging for the memory system
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('recoagent.memory')
logger.setLevel(logging.DEBUG)
```
### Monitor System Health

```python
async def health_check(memory_manager):
    """Comprehensive health check for the memory system."""
    health_status = {
        "database_accessible": False,
        "connection_pool_healthy": False,
        "threads_accessible": False,
        "optimization_working": False,
    }
    try:
        # Test database access
        stats = await memory_manager.optimizer.get_memory_statistics()
        health_status["database_accessible"] = True

        # Test the connection pool
        async with memory_manager.saver.get_connection() as conn:
            cursor = await conn.execute("SELECT 1")
            result = await cursor.fetchone()
            health_status["connection_pool_healthy"] = result[0] == 1

        # Test thread operations
        threads = await memory_manager.thread_manager.list_user_threads(
            "test_user", limit=1
        )
        health_status["threads_accessible"] = True

        # Test optimization
        await memory_manager.optimizer.optimize_database()
        health_status["optimization_working"] = True
    except Exception as e:
        print(f"Health check failed: {e}")
    return health_status
```
### Performance Monitoring

```python
import time
from functools import wraps

def monitor_performance(func):
    """Decorator that reports how long an async function takes."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            return await func(*args, **kwargs)
        finally:
            duration = time.time() - start_time
            print(f"{func.__name__} took {duration:.3f} seconds")
    return wrapper

# Use the decorator
@monitor_performance
async def slow_operation():
    # Your operation here
    pass
```
## Best Practices
### 1. Resource Management
- Always use context managers or proper cleanup
- Monitor memory usage and database size
- Implement automatic cleanup policies
### 2. Error Handling

- Use try/except blocks around database operations
- Implement retry logic for transient failures
- Log errors with sufficient context
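Retry logic for transient failures (such as "database is locked") can live in a small helper. A sketch under the assumption that the operation is a zero-argument async callable; the helper name and retry parameters are illustrative, not RecoAgent APIs:

```python
import asyncio
import random

async def with_retries(operation, attempts=3, base_delay=0.1):
    """Retry an async operation with exponential backoff and jitter.

    For simplicity this retries on any Exception; in production, narrow
    it to genuinely transient errors (e.g. sqlite3.OperationalError).
    """
    for attempt in range(attempts):
        try:
            return await operation()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of retries; surface the original error
            # Exponential backoff with jitter to avoid retry storms
            await asyncio.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.05))

# Demonstrate with an operation that fails twice, then succeeds.
calls = {"count": 0}

async def flaky_operation():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("database is locked")
    return "ok"

result = asyncio.run(with_retries(flaky_operation))
print(result, calls["count"])  # -> ok 3
```

Log each failed attempt with the thread and session IDs involved so transient errors remain diagnosable after the fact.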
### 3. Performance Optimization
- Use appropriate batch sizes for bulk operations
- Implement pagination for large datasets
- Monitor and optimize database regularly
### 4. Data Integrity
- Validate conversation states before saving
- Use transactions for multi-step operations
- Implement backup and recovery procedures
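For the backup procedure, SQLite's online backup API (exposed since Python 3.7 as `sqlite3.Connection.backup`) is safer than copying the file directly, because it is consistent even while other connections are writing. A minimal sketch; the table name and paths are illustrative:

```python
import os
import sqlite3
import tempfile

def backup_database(src_path: str, dest_path: str) -> None:
    """Copy a live SQLite database using the online backup API."""
    src = sqlite3.connect(src_path)
    dest = sqlite3.connect(dest_path)
    try:
        src.backup(dest)  # consistent snapshot, safe during writes
    finally:
        dest.close()
        src.close()

# Demonstrate with a throwaway database containing one row.
workdir = tempfile.mkdtemp()
src_path = os.path.join(workdir, "memory.db")
dst_path = os.path.join(workdir, "memory.backup.db")

conn = sqlite3.connect(src_path)
conn.execute("CREATE TABLE threads (thread_id TEXT)")
conn.execute("INSERT INTO threads VALUES ('t-1')")
conn.commit()
conn.close()

backup_database(src_path, dst_path)
rows = sqlite3.connect(dst_path).execute("SELECT thread_id FROM threads").fetchall()
print(rows)  # -> [('t-1',)]
```

Schedule this on a timer and verify each backup with `PRAGMA integrity_check` before rotating out older copies.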
### 5. Monitoring and Alerting
- Set up health checks and monitoring
- Monitor key metrics (response times, error rates, memory usage)
- Implement alerting for critical issues
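Alerting can start as simply as a periodic check of one metric against a threshold. A hedged sketch (the 500 MB limit and helper names are example values, not RecoAgent defaults):

```python
import os
import tempfile

def database_size_mb(db_path):
    """Current size of the database file in megabytes."""
    return os.path.getsize(db_path) / (1024 * 1024)

def check_alerts(db_path, max_size_mb=500.0):
    """Return alert messages; an empty list means all metrics are in range."""
    alerts = []
    size = database_size_mb(db_path)
    if size > max_size_mb:
        alerts.append(f"database size {size:.1f} MB exceeds {max_size_mb:.1f} MB limit")
    return alerts

# Demonstrate with a small stand-in file.
path = os.path.join(tempfile.mkdtemp(), "memory.db")
with open(path, "wb") as f:
    f.write(b"\x00" * 1024)  # 1 KiB placeholder for a real database

print(check_alerts(path))  # -> [] (1 KiB is well under the limit)
```

Hook the returned messages into whatever notification channel your deployment already uses, and extend the same pattern to error rates and response times.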
## Getting Help
If you're still experiencing issues after following this guide:
- Check the logs for detailed error messages
- Run the health check to identify specific problems
- Use the performance benchmarks to identify bottlenecks
- Contact support with:
  - Error messages and stack traces
  - System configuration details
  - Steps to reproduce the issue
  - Log files (if applicable)