Memory Persistence Troubleshooting Guide

This guide helps you diagnose and resolve common issues with RecoAgent's LangGraph memory persistence system.

Common Issues and Solutions

Database Connection Issues

Issue: "Database is locked" or "Database connection timeout"

Symptoms:

  • Operations fail with database lock errors
  • Slow response times
  • Connection timeout exceptions

Causes:

  • Too many concurrent connections
  • Long-running transactions
  • Database file corruption
  • Insufficient connection pool size

Solutions:

  1. Increase connection pool size:

    memory_manager = MemoryManager(
        db_path="memory.db",
        max_connections=20,      # Increase from the default of 10
        connection_timeout=60.0  # Increase the timeout (seconds)
    )
  2. Check for long-running operations:

    # Use connection context managers so connections are released promptly
    async with memory_manager.saver.get_connection() as conn:
        # Your database operations here
        pass
  3. Verify database file integrity:

    # Check database file permissions and disk space
    ls -la memory.db
    df -h .
  4. Enable WAL mode for better concurrency:

    saver = AsyncSqliteSaver(
        db_path="memory.db",
        enable_wal_mode=True  # Default is True
    )
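Independently of RecoAgent's helpers, you can verify file integrity and confirm WAL mode with Python's built-in sqlite3 module. This is a minimal sketch; the function name and the `memory.db` path are illustrative:

```python
import sqlite3

def check_sqlite_health(db_path: str) -> dict:
    """Run SQLite's built-in integrity check and report the journal mode."""
    conn = sqlite3.connect(db_path)
    try:
        # "ok" means the file passed the full integrity check
        integrity = conn.execute("PRAGMA integrity_check").fetchone()[0]
        # "wal" means write-ahead logging is active for better concurrency
        journal_mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
        return {"integrity": integrity, "journal_mode": journal_mode}
    finally:
        conn.close()
```

Run this against a copy of the database if you do not want the check to contend with live connections.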

Issue: "Database file not found" or "Permission denied"

Symptoms:

  • FileNotFoundError when accessing database
  • Permission denied errors

Solutions:

  1. Check file permissions:

    chmod 644 memory.db
    chown $USER:$USER memory.db
  2. Ensure directory exists:

    from pathlib import Path

    db_path = Path("data/memory.db")
    db_path.parent.mkdir(parents=True, exist_ok=True)
  3. Use absolute paths:

    import os
    db_path = os.path.abspath("memory.db")
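The three checks above can be combined into one diagnostic pass using only the standard library; `diagnose_db_path` is a hypothetical helper, not a RecoAgent API:

```python
import os
from pathlib import Path

def diagnose_db_path(db_path: str) -> dict:
    """Report common path problems before opening the database."""
    path = Path(db_path).resolve()  # Work with an absolute path
    return {
        "absolute_path": str(path),
        "parent_exists": path.parent.exists(),
        "file_exists": path.exists(),
        # Access checks only make sense once the file exists
        "readable": os.access(path, os.R_OK) if path.exists() else False,
        "writable": os.access(path, os.W_OK) if path.exists() else False,
    }
```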

Memory and Performance Issues

Issue: High memory usage or slow performance

Symptoms:

  • Memory usage keeps growing
  • Operations become slower over time
  • System becomes unresponsive

Solutions:

  1. Run memory cleanup:

    from recoagent.memory import CleanupPolicy, CleanupStrategy

    policy = CleanupPolicy(
        strategy=CleanupStrategy.AGE_BASED,
        max_age_days=30,
        dry_run=False
    )

    result = await memory_manager.optimizer.cleanup_memory(policy)
    print(f"Cleaned up {result.threads_deleted} threads")
  2. Optimize database:

    from recoagent.memory import OptimizationLevel

    await memory_manager.optimizer.optimize_database(
        OptimizationLevel.AGGRESSIVE
    )
  3. Monitor memory usage:

    stats = await memory_manager.optimizer.get_memory_statistics()
    print(f"Database size: {stats['database_size_mb']:.2f} MB")
    print(f"Total threads: {stats['total_threads']}")
  4. Set up automatic cleanup:

    # Configure automatic cleanup in ThreadManager
    thread_manager = ThreadManager(
        saver=saver,
        cleanup_interval_minutes=60  # Run cleanup every hour
    )

Issue: "Out of memory" errors

Symptoms:

  • MemoryError exceptions
  • System running out of RAM
  • Process killed by OOM killer

Solutions:

  1. Reduce batch sizes:

    # Process data in smaller batches
    batch_size = 100
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        await process_batch(batch)
  2. Use streaming for large datasets:

    async def process_large_dataset(data_stream):
        # Process one item at a time instead of loading everything into memory
        async for item in data_stream:
            await process_item(item)
  3. Implement pagination:

    limit = 100
    offset = 0

    while True:
        threads = await memory_manager.thread_manager.list_user_threads(
            user_id="user123",
            limit=limit,
            offset=offset
        )

        if not threads:
            break

        # Process threads
        for thread in threads:
            await process_thread(thread)

        offset += limit

Session and Thread Management Issues

Issue: "Session not found" or "Thread does not belong to session"

Symptoms:

  • SessionNotFoundError
  • Thread validation failures
  • Inconsistent state between sessions and threads

Solutions:

  1. Verify session exists before creating threads:

    # Always check that the session exists
    session_info = await thread_manager.get_session_info(session_id)
    if not session_info:
        session_id = await thread_manager.create_session(user_id)

    thread_id = await thread_manager.create_thread(user_id, session_id)
  2. Use session locks for concurrent access:

    async with thread_manager.session_lock(session_id):
        # All operations on this session are now atomic
        await thread_manager.create_thread(user_id, session_id)
        await thread_manager.update_thread_state(thread_id, state)
  3. Handle session expiration:

    # Check whether the session has expired
    session_info = await thread_manager.get_session_info(session_id)
    if session_info and session_info.is_expired():
        # Create a new session
        session_id = await thread_manager.create_session(user_id)
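The per-session locking pattern above can also be reproduced with plain asyncio if you need it outside ThreadManager. This is an illustrative sketch; `SessionLocks` and `demo` are hypothetical names, not RecoAgent APIs:

```python
import asyncio
from collections import defaultdict

class SessionLocks:
    """Hand out one asyncio.Lock per session ID (hypothetical helper)."""

    def __init__(self):
        self._locks = defaultdict(asyncio.Lock)

    def lock(self, session_id: str) -> asyncio.Lock:
        # The same session ID always maps to the same lock object
        return self._locks[session_id]

async def demo():
    locks = SessionLocks()
    order = []

    async def worker(name):
        async with locks.lock("session-1"):
            order.append(name)
            await asyncio.sleep(0)  # Yield while holding the lock
            order.append(name)

    # Both workers contend for the same session lock, so their
    # critical sections never interleave
    await asyncio.gather(worker("a"), worker("b"))
    return order
```

Because both workers share one lock, `demo()` returns `["a", "a", "b", "b"]` rather than an interleaving.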

Issue: Thread limit exceeded

Symptoms:

  • "Maximum threads per session exceeded" error
  • Cannot create new conversation threads

Solutions:

  1. Increase thread limit:

    thread_manager = ThreadManager(
        saver=saver,
        max_threads_per_session=50  # Increase from the default of 10
    )
  2. Archive old threads:

    # Archive completed threads
    threads = await thread_manager.list_user_threads(user_id)
    for thread in threads:
        if thread['status'] == 'completed':
            await thread_manager.archive_session(thread['session_id'])
  3. Clean up unused threads:

    # Delete old inactive threads
    await thread_manager.cleanup_old_conversations(days_old=7)

Data Integrity Issues

Issue: Corrupted conversation state or missing messages

Symptoms:

  • Incomplete conversation history
  • Missing messages in threads
  • Inconsistent state data

Solutions:

  1. Validate conversation state:

    from recoagent.memory.state.conversation_state import validate_conversation_state

    state = await thread_manager.get_thread_state(thread_id)
    if not validate_conversation_state(state):
        print("Invalid conversation state detected")
        # Handle the corrupted state
  2. Implement data recovery:

    async def recover_conversation(thread_id):
        try:
            state = await thread_manager.get_thread_state(thread_id)
            if not state:
                # Try to recover from a backup or recreate the thread
                return await recreate_conversation(thread_id)
            return state
        except Exception as e:
            print(f"Recovery failed: {e}")
            return None
  3. Use database transactions:

    # Operations are automatically wrapped in transactions, but you can
    # use explicit transactions for complex, multi-step operations
    async with memory_manager.saver.get_connection() as conn:
        await conn.execute("BEGIN TRANSACTION")
        try:
            # Multiple operations
            await conn.execute("INSERT ...")
            await conn.execute("UPDATE ...")
            await conn.commit()
        except Exception:
            await conn.rollback()
            raise

Search and Query Issues

Issue: Slow search operations or empty results

Symptoms:

  • Search queries take too long
  • No results returned for valid queries
  • Inconsistent search results

Solutions:

  1. Optimize search queries:

    # Use appropriate filters to limit results
    filters = ConversationFilter(
        user_id="specific_user",
        date_from=datetime.now() - timedelta(days=30)
    )

    results, total = await history_api.search_conversations(
        query="search term",
        filters=filters,
        limit=100  # Limit results
    )
  2. Check database indexes:

    # Indexes are created automatically, but you can verify they exist
    async with saver.get_connection() as conn:
        cursor = await conn.execute(
            "SELECT name FROM sqlite_master WHERE type='index'"
        )
        indexes = await cursor.fetchall()
        print(f"Available indexes: {indexes}")
  3. Use different search types:

    from recoagent.memory.session.history_api import SearchType

    # Try different search strategies
    for search_type in [SearchType.EXACT, SearchType.FUZZY, SearchType.SEMANTIC]:
        results, total = await history_api.search_conversations(
            query="search term",
            search_type=search_type
        )
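The index check in step 2 can also be reproduced outside RecoAgent with the synchronous sqlite3 module, which is convenient for one-off inspection scripts. A minimal sketch; `list_indexes` is an illustrative name:

```python
import sqlite3

def list_indexes(db_path: str) -> list:
    """Return the names of all indexes in a SQLite database file."""
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='index'"
        ).fetchall()
        # sqlite_master rows are 1-tuples here; unpack the names
        return [name for (name,) in rows]
    finally:
        conn.close()
```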

Initialization and Configuration Issues

Issue: "MemoryManager not initialized" or initialization failures

Symptoms:

  • Initialization errors on startup
  • Components not properly initialized
  • Connection pool issues

Solutions:

  1. Proper initialization sequence:

    async def setup_memory_system():
        memory_manager = MemoryManager(db_path="memory.db")

        try:
            await memory_manager.initialize()
            print("Memory system initialized successfully")
        except Exception as e:
            print(f"Initialization failed: {e}")
            raise

        return memory_manager
  2. Use context managers:

    async with MemoryManager(db_path="memory.db") as memory_manager:
        # The system is automatically initialized and cleaned up
        await memory_manager.thread_manager.create_session(user_id)
  3. Check system requirements:

    import sys
    import sqlite3

    # Check the Python version
    if sys.version_info < (3, 8):
        raise RuntimeError("Python 3.8+ required")

    # Check the SQLite version
    print(f"SQLite version: {sqlite3.sqlite_version}")

Debugging and Monitoring

Enable Debug Logging

import logging

# Enable debug logging for memory system
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('recoagent.memory')
logger.setLevel(logging.DEBUG)
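If you want the memory system's debug output in its own stream rather than mixed into the root logger, you can attach a dedicated handler. A sketch using only the standard library; capturing to a `StringIO` here stands in for a file or log-shipping handler:

```python
import io
import logging

def capture_memory_logs() -> io.StringIO:
    """Attach a dedicated stream handler to the recoagent.memory logger."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    logger = logging.getLogger("recoagent.memory")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    return stream
```

Swap `StringIO` for `logging.FileHandler("memory.log")` to persist the output.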

Monitor System Health

async def health_check(memory_manager):
    """Comprehensive health check for the memory system."""
    health_status = {
        "database_accessible": False,
        "connection_pool_healthy": False,
        "threads_accessible": False,
        "optimization_working": False,
    }

    try:
        # Test database access
        stats = await memory_manager.optimizer.get_memory_statistics()
        health_status["database_accessible"] = True

        # Test the connection pool
        async with memory_manager.saver.get_connection() as conn:
            cursor = await conn.execute("SELECT 1")
            result = await cursor.fetchone()
            health_status["connection_pool_healthy"] = result[0] == 1

        # Test thread operations
        threads = await memory_manager.thread_manager.list_user_threads("test_user", limit=1)
        health_status["threads_accessible"] = True

        # Test optimization
        await memory_manager.optimizer.optimize_database()
        health_status["optimization_working"] = True

    except Exception as e:
        print(f"Health check failed: {e}")

    return health_status

Performance Monitoring

import time
from functools import wraps

def monitor_performance(func):
    """Decorator to monitor function performance."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            return await func(*args, **kwargs)
        finally:
            duration = time.time() - start_time
            print(f"{func.__name__} took {duration:.3f} seconds")

    return wrapper

# Use the decorator
@monitor_performance
async def slow_operation():
    # Your operation here
    pass

Best Practices

1. Resource Management

  • Always use context managers or proper cleanup
  • Monitor memory usage and database size
  • Implement automatic cleanup policies

2. Error Handling

  • Wrap database operations in try/except blocks
  • Implement retry logic for transient failures
  • Log errors with sufficient context
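Retry logic for transient failures (such as the lock timeouts covered earlier) can be sketched with asyncio and exponential backoff. The helper and `demo` names are illustrative, not RecoAgent APIs:

```python
import asyncio

async def with_retries(coro_factory, attempts=3, base_delay=0.01):
    """Retry a transient async operation with exponential backoff."""
    for attempt in range(attempts):
        try:
            return await coro_factory()
        except Exception:
            if attempt == attempts - 1:
                raise  # Out of attempts: surface the error
            # Back off: base_delay, 2*base_delay, 4*base_delay, ...
            await asyncio.sleep(base_delay * (2 ** attempt))

async def demo():
    calls = {"n": 0}

    async def flaky():
        # Fails twice, then succeeds, simulating a transient lock error
        calls["n"] += 1
        if calls["n"] < 3:
            raise TimeoutError("transient failure")
        return "ok"

    result = await with_retries(flaky)
    return result, calls["n"]
```

In production, catch only the exception types you know to be transient rather than bare `Exception`.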

3. Performance Optimization

  • Use appropriate batch sizes for bulk operations
  • Implement pagination for large datasets
  • Monitor and optimize database regularly
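The batch-size advice above boils down to a small, reusable chunking helper; this stdlib-only sketch works for any iterable:

```python
from itertools import islice

def batched(iterable, batch_size):
    """Yield successive lists of at most batch_size items."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return  # Iterable exhausted
        yield batch
```

For example, `batched(range(7), 3)` yields `[0, 1, 2]`, `[3, 4, 5]`, then `[6]`.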

4. Data Integrity

  • Validate conversation states before saving
  • Use transactions for multi-step operations
  • Implement backup and recovery procedures
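For the backup procedures mentioned above, Python's sqlite3 module ships an online backup API (`Connection.backup`, Python 3.7+) that copies the database page by page, so it is safe even while other connections hold the file open. A minimal sketch:

```python
import sqlite3

def backup_database(src_path: str, dest_path: str) -> None:
    """Copy a live SQLite database to a backup file safely."""
    src = sqlite3.connect(src_path)
    dest = sqlite3.connect(dest_path)
    try:
        # The online backup API copies page by page and coordinates
        # with concurrent readers and writers
        src.backup(dest)
    finally:
        dest.close()
        src.close()
```

Avoid plain file copies of a WAL-mode database; they can miss pages still sitting in the `-wal` file.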

5. Monitoring and Alerting

  • Set up health checks and monitoring
  • Monitor key metrics (response times, error rates, memory usage)
  • Implement alerting for critical issues

Getting Help

If you're still experiencing issues after following this guide:

  1. Check the logs for detailed error messages
  2. Run the health check to identify specific problems
  3. Use the performance benchmarks to identify bottlenecks
  4. Contact support with:
    • Error messages and stack traces
    • System configuration details
    • Steps to reproduce the issue
    • Log files (if applicable)

Additional Resources