Memory Persistence API Reference

This document provides a comprehensive API reference for RecoAgent's LangGraph memory persistence system.

Core Classes

MemoryManager

High-level manager that coordinates all memory persistence components.

class MemoryManager:
    def __init__(
        self,
        db_path: str = "memory.db",
        max_connections: int = 10,
        session_timeout_minutes: int = 30,
        max_threads_per_session: int = 10,
        cleanup_interval_minutes: int = 60
    )

Parameters:

  • db_path (str): Path to SQLite database file
  • max_connections (int): Maximum number of concurrent database connections
  • session_timeout_minutes (int): Minutes before a session expires
  • max_threads_per_session (int): Maximum threads per session
  • cleanup_interval_minutes (int): Minutes between cleanup operations

Methods:

async initialize() -> None

Initialize all memory management components.

async close() -> None

Close all memory management components and clean up resources.
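
Since initialize() and close() are meant to be paired, one way to guarantee the cleanup step is an async context manager. The sketch below is not part of RecoAgent: `managed` is a hypothetical helper, and `StubManager` is a stand-in for MemoryManager so that the example runs on its own.

```python
import asyncio
from contextlib import asynccontextmanager


class StubManager:
    """Stand-in for MemoryManager; it only mimics initialize() and close()."""

    def __init__(self) -> None:
        self.initialized = False
        self.closed = False

    async def initialize(self) -> None:
        self.initialized = True

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def managed(manager):
    """Hypothetical helper: initialize on entry, close on exit even on error."""
    await manager.initialize()
    try:
        yield manager
    finally:
        await manager.close()


async def demo() -> StubManager:
    async with managed(StubManager()) as manager:
        # Inside the block the manager is ready and still open.
        assert manager.initialized and not manager.closed
    return manager
```

With the real class, the same helper would presumably be used as `async with managed(MemoryManager(db_path="memory.db")) as memory_manager:`.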


ConversationState

Rich conversation state with LangGraph compatibility.

@dataclass
class ConversationState:
    messages: Annotated[List[Message], "Chat messages"]
    context: Optional[ConversationContext]
    query: str
    retrieved_docs: List[Dict[str, Any]]
    reranked_docs: List[Dict[str, Any]]
    plan: Optional[str]
    action: Optional[str]
    answer: Optional[str]
    error: Optional[str]
    metadata: Dict[str, Any]
    step_count: int
    max_steps: int
    cost_tracker: Dict[str, float]
    latency_tracker: Dict[str, float]
    memory_summary: Optional[str]
    relevant_history: List[str]
    conversation_embedding: Optional[List[float]]

Methods:

add_message(message_type: MessageType, content: str, **kwargs) -> Message

Add a message to the conversation.

Parameters:

  • message_type (MessageType): Type of message to add
  • content (str): Message content
  • **kwargs: Additional message metadata

Returns: Message object

get_recent_messages(count: int = 10) -> List[Message]

Get the most recent messages.

Parameters:

  • count (int): Number of messages to retrieve

Returns: List of recent messages

get_messages_by_type(message_type: MessageType) -> List[Message]

Get all messages of a specific type.

Parameters:

  • message_type (MessageType): Type of messages to filter

Returns: List of messages of specified type

update_context(**kwargs) -> None

Update conversation context.

Parameters:

  • **kwargs: Context fields to update

to_dict() -> Dict[str, Any]

Convert state to dictionary for serialization.

Returns: Dictionary representation of state

to_langgraph_state() -> Dict[str, Any]

Convert to LangGraph-compatible state format.

Returns: LangGraph-compatible state dictionary
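
To make the message helpers concrete, here is a minimal sketch of how add_message, get_recent_messages, and get_messages_by_type could behave. `MiniState` and the trimmed `Message` are simplified stand-ins, not the library's classes, and the ordering of get_recent_messages (oldest first) is an assumption.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List


class MessageType(str, Enum):
    USER = "user"
    ASSISTANT = "assistant"


@dataclass
class Message:
    type: MessageType
    content: str


@dataclass
class MiniState:
    """Simplified stand-in for ConversationState (messages only)."""

    messages: List[Message] = field(default_factory=list)

    def add_message(self, message_type: MessageType, content: str) -> Message:
        message = Message(message_type, content)
        self.messages.append(message)
        return message

    def get_recent_messages(self, count: int = 10) -> List[Message]:
        # Assumed semantics: the last `count` messages, oldest first.
        # The guard matters because messages[-0:] would return everything.
        return self.messages[-count:] if count > 0 else []

    def get_messages_by_type(self, message_type: MessageType) -> List[Message]:
        return [m for m in self.messages if m.type == message_type]


state = MiniState()
state.add_message(MessageType.USER, "Hello!")
state.add_message(MessageType.ASSISTANT, "Hi, how can I help?")
state.add_message(MessageType.USER, "Summarize my last order.")
```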


ThreadManager

Manages conversation threads and sessions.

class ThreadManager:
    def __init__(
        self,
        saver: AsyncSqliteSaver,
        session_timeout_minutes: int = 30,
        max_threads_per_session: int = 10,
        cleanup_interval_minutes: int = 60
    )

Methods:

async create_session(user_id: str, session_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> str

Create a new session for a user.

Parameters:

  • user_id (str): User identifier
  • session_id (Optional[str]): Custom session ID (auto-generated if None)
  • metadata (Optional[Dict[str, Any]]): Session metadata

Returns: Session ID

async create_thread(user_id: str, session_id: str, thread_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> str

Create a new conversation thread.

Parameters:

  • user_id (str): User identifier
  • session_id (str): Session identifier
  • thread_id (Optional[str]): Custom thread ID (auto-generated if None)
  • metadata (Optional[Dict[str, Any]]): Thread metadata

Returns: Thread ID

async get_thread_state(thread_id: str, session_id: Optional[str] = None) -> Optional[ConversationState]

Get the current state of a conversation thread.

Parameters:

  • thread_id (str): Thread identifier
  • session_id (Optional[str]): Session identifier for validation

Returns: Conversation state or None if not found

async update_thread_state(thread_id: str, state: ConversationState) -> None

Update the state of a conversation thread.

Parameters:

  • thread_id (str): Thread identifier
  • state (ConversationState): Updated conversation state

async delete_thread(thread_id: str, session_id: Optional[str] = None) -> bool

Delete a conversation thread.

Parameters:

  • thread_id (str): Thread identifier
  • session_id (Optional[str]): Session identifier for validation

Returns: True if deleted, False if not found

async list_user_threads(user_id: Optional[str] = None, session_id: Optional[str] = None, status: Optional[ConversationStatus] = None, limit: int = 100, offset: int = 0) -> List[Dict[str, Any]]

List conversation threads for a user.

Parameters:

  • user_id (Optional[str]): Filter by user ID
  • session_id (Optional[str]): Filter by session ID
  • status (Optional[ConversationStatus]): Filter by status
  • limit (int): Maximum number of results
  • offset (int): Number of results to skip

Returns: List of thread metadata
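
The limit and offset parameters imply page-by-page retrieval. The following sketch shows one way to walk through all pages; `iter_all` is a hypothetical helper, and `fake_list_threads` stands in for list_user_threads so the example is self-contained.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List

Page = List[Dict[str, Any]]


async def iter_all(
    fetch: Callable[[int, int], Awaitable[Page]], page_size: int = 100
) -> AsyncIterator[Dict[str, Any]]:
    """Yield every row by advancing `offset` until a short page is returned."""
    offset = 0
    while True:
        page = await fetch(page_size, offset)
        for row in page:
            yield row
        if len(page) < page_size:
            break
        offset += page_size


# Fake data source standing in for thread_manager.list_user_threads.
_ROWS = [{"thread_id": f"t{i}"} for i in range(7)]


async def fake_list_threads(limit: int, offset: int) -> Page:
    return _ROWS[offset:offset + limit]


async def collect(page_size: int) -> List[str]:
    return [row["thread_id"] async for row in iter_all(fake_list_threads, page_size)]
```

Against the real API, `fetch` could be a small wrapper such as `lambda limit, offset: thread_manager.list_user_threads(user_id="user123", limit=limit, offset=offset)`.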

async get_conversation_history(thread_id: str, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]

Get conversation history for a thread.

Parameters:

  • thread_id (str): Thread identifier
  • limit (int): Maximum number of messages
  • offset (int): Number of messages to skip

Returns: List of message dictionaries

async archive_session(session_id: str) -> bool

Archive a session and all its threads.

Parameters:

  • session_id (str): Session identifier

Returns: True if archived, False if not found

async get_session_info(session_id: str) -> Optional[SessionInfo]

Get information about a session.

Parameters:

  • session_id (str): Session identifier

Returns: Session info or None if not found

async get_statistics() -> Dict[str, Any]

Get statistics about active sessions and threads.

Returns: Dictionary with statistics


AsyncSqliteSaver

High-performance SQLite-based persistence layer.

class AsyncSqliteSaver:
    def __init__(
        self,
        db_path: Union[str, Path],
        max_connections: int = 10,
        connection_timeout: float = 30.0,
        enable_wal_mode: bool = True,
        vacuum_interval_hours: int = 24
    )

Methods:

async save_conversation_state(thread_id: str, state: ConversationState, checkpoint_id: Optional[str] = None) -> str

Save a conversation state to the database.

Parameters:

  • thread_id (str): Unique thread identifier
  • state (ConversationState): Conversation state to save
  • checkpoint_id (Optional[str]): Optional checkpoint identifier

Returns: Checkpoint ID

async load_conversation_state(thread_id: str, checkpoint_id: Optional[str] = None) -> Optional[ConversationState]

Load a conversation state from the database.

Parameters:

  • thread_id (str): Unique thread identifier
  • checkpoint_id (Optional[str]): Optional checkpoint identifier

Returns: Conversation state or None if not found

async list_threads(user_id: Optional[str] = None, session_id: Optional[str] = None, status: Optional[ConversationStatus] = None, limit: int = 100, offset: int = 0) -> List[Dict[str, Any]]

List conversation threads with optional filtering.

Parameters:

  • user_id (Optional[str]): Filter by user ID
  • session_id (Optional[str]): Filter by session ID
  • status (Optional[ConversationStatus]): Filter by status
  • limit (int): Maximum number of results
  • offset (int): Number of results to skip

Returns: List of thread metadata

async delete_thread(thread_id: str) -> bool

Delete a conversation thread and all associated data.

Parameters:

  • thread_id (str): Thread identifier to delete

Returns: True if deleted, False if not found

async cleanup_old_conversations(days_old: int = 30) -> int

Clean up old conversation threads.

Parameters:

  • days_old (int): Delete conversations older than this many days

Returns: Number of threads deleted

async vacuum_database() -> None

Perform database vacuum operation.
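
For readers unfamiliar with the SQLite features behind enable_wal_mode and vacuum_database, the sketch below shows the underlying statements using only the standard library sqlite3 module. Whether AsyncSqliteSaver issues exactly these statements is an assumption; the table and file names are illustrative.

```python
import os
import sqlite3
import tempfile


def wal_and_vacuum_demo() -> str:
    """Enable WAL on a throwaway database, delete rows, then VACUUM."""
    with tempfile.TemporaryDirectory() as tmp:
        conn = sqlite3.connect(os.path.join(tmp, "demo.db"))
        try:
            # WAL lets readers proceed while a writer is active.
            mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
            conn.execute("CREATE TABLE t (payload TEXT)")
            conn.executemany("INSERT INTO t VALUES (?)", [("x" * 1000,)] * 500)
            conn.commit()
            conn.execute("DELETE FROM t")
            conn.commit()
            # VACUUM rebuilds the database file to reclaim freed pages.
            # It must run outside a transaction, hence the commit above.
            conn.execute("VACUUM")
        finally:
            conn.close()
        return mode
```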

async get_conversation_history(thread_id: str, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]

Get conversation history for a thread.

Parameters:

  • thread_id (str): Thread identifier
  • limit (int): Maximum number of messages
  • offset (int): Number of messages to skip

Returns: List of message dictionaries


ConversationHistoryAPI

High-level API for conversation history retrieval and management.

class ConversationHistoryAPI:
    def __init__(self, thread_manager: ThreadManager)

Methods:

async get_conversation_summary(thread_id: str) -> Optional[ConversationSummary]

Get a summary of a conversation thread.

Parameters:

  • thread_id (str): Thread identifier

Returns: Conversation summary or None if not found

async search_conversations(query: str, search_type: SearchType = SearchType.SEMANTIC, filters: Optional[ConversationFilter] = None, limit: int = 50, offset: int = 0, sort_by: str = "last_accessed", sort_order: SortOrder = SortOrder.DESC) -> Tuple[List[ConversationSummary], int]

Search conversations with advanced filtering.

Parameters:

  • query (str): Search query string
  • search_type (SearchType): Type of search to perform
  • filters (Optional[ConversationFilter]): Optional filter criteria
  • limit (int): Maximum number of results
  • offset (int): Number of results to skip
  • sort_by (str): Field to sort by
  • sort_order (SortOrder): Sort order

Returns: Tuple of (results, total_count)
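
The total_count element of the returned tuple is what makes page navigation possible. As a small illustration, the hypothetical helper below derives the page position from total_count, limit, and offset; it is not part of the documented API.

```python
import math
from typing import NamedTuple


class PageInfo(NamedTuple):
    current_page: int  # 1-based
    total_pages: int
    has_next: bool


def page_info(total_count: int, limit: int, offset: int) -> PageInfo:
    """Derive paging details from a (results, total_count) style response."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    total_pages = math.ceil(total_count / limit)
    current_page = offset // limit + 1
    has_next = offset + limit < total_count
    return PageInfo(current_page, total_pages, has_next)
```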

async get_user_conversation_history(user_id: str, days_back: int = 30, limit: int = 100, offset: int = 0) -> Tuple[List[ConversationSummary], int]

Get conversation history for a user.

Parameters:

  • user_id (str): User identifier
  • days_back (int): Number of days to look back
  • limit (int): Maximum number of results
  • offset (int): Number of results to skip

Returns: Tuple of (summaries, total_count)

async get_conversation_messages(thread_id: str, message_types: Optional[List[MessageType]] = None, date_from: Optional[datetime] = None, date_to: Optional[datetime] = None, limit: int = 100, offset: int = 0) -> List[Dict[str, Any]]

Get messages from a conversation with filtering.

Parameters:

  • thread_id (str): Thread identifier
  • message_types (Optional[List[MessageType]]): Optional list of message types to filter
  • date_from (Optional[datetime]): Optional start date filter
  • date_to (Optional[datetime]): Optional end date filter
  • limit (int): Maximum number of messages
  • offset (int): Number of messages to skip

Returns: List of message dictionaries

async get_conversation_analytics(user_id: Optional[str] = None, session_id: Optional[str] = None, days_back: int = 30) -> Dict[str, Any]

Get analytics for conversations.

Parameters:

  • user_id (Optional[str]): Optional user filter
  • session_id (Optional[str]): Optional session filter
  • days_back (int): Number of days to analyze

Returns: Analytics dictionary

async export_conversation_data(thread_id: str, format: str = "json") -> Union[Dict[str, Any], str]

Export conversation data in various formats.

Parameters:

  • thread_id (str): Thread identifier
  • format (str): Export format ("json", "csv", "markdown")

Returns: Exported data
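
The three formats differ mainly in how each message is rendered. The sketch below is a hypothetical exporter over plain message dictionaries; the field names ("type", "content") and the Markdown layout are assumptions, and unlike the documented method it always returns a string.

```python
import csv
import io
import json
from typing import Any, Dict, List


def export_messages(messages: List[Dict[str, Any]], format: str = "json") -> str:
    """Hypothetical exporter for a list of {"type", "content"} dicts."""
    if format == "json":
        return json.dumps(messages, indent=2)
    if format == "csv":
        buffer = io.StringIO()
        writer = csv.DictWriter(
            buffer, fieldnames=["type", "content"], lineterminator="\n"
        )
        writer.writeheader()
        writer.writerows(messages)
        return buffer.getvalue()
    if format == "markdown":
        return "\n".join(f"**{m['type']}**: {m['content']}" for m in messages)
    raise ValueError(f"unsupported format: {format!r}")


sample = [
    {"type": "user", "content": "Hello!"},
    {"type": "assistant", "content": "Hi, how can I help?"},
]
```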


MemoryOptimizer

Memory optimization and cleanup utilities.

class MemoryOptimizer:
    def __init__(self, thread_manager: ThreadManager)

Methods:

async cleanup_memory(policy: CleanupPolicy, progress_callback: Optional[Callable[[int, int], None]] = None) -> OptimizationResult

Perform memory cleanup based on the given policy.

Parameters:

  • policy (CleanupPolicy): Cleanup policy to apply
  • progress_callback (Optional[Callable]): Optional callback for progress updates

Returns: Optimization result
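
The progress_callback takes two integers. The reference does not say what they mean; the sketch below assumes (processed, total) and uses a stand-in loop, `process_with_progress`, in place of the real cleanup.

```python
import asyncio
from typing import Callable, List, Optional, Tuple

ProgressCallback = Callable[[int, int], None]


async def process_with_progress(
    thread_ids: List[str], progress_callback: Optional[ProgressCallback] = None
) -> int:
    """Stand-in for a cleanup loop that reports (processed, total)."""
    total = len(thread_ids)
    for processed, _thread_id in enumerate(thread_ids, start=1):
        await asyncio.sleep(0)  # placeholder for real per-thread work
        if progress_callback is not None:
            progress_callback(processed, total)
    return total


def run_demo() -> List[Tuple[int, int]]:
    """Collect every progress report emitted for three fake threads."""
    seen: List[Tuple[int, int]] = []
    asyncio.run(
        process_with_progress(
            ["a", "b", "c"], lambda done, total: seen.append((done, total))
        )
    )
    return seen
```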

async optimize_database(level: OptimizationLevel = OptimizationLevel.MODERATE) -> OptimizationResult

Perform database optimization.

Parameters:

  • level (OptimizationLevel): Optimization level

Returns: Optimization result

async get_memory_statistics() -> Dict[str, Any]

Get comprehensive memory usage statistics.

Returns: Dictionary with statistics


Data Types

Message

Individual message in a conversation.

@dataclass
class Message:
    id: str
    type: MessageType
    content: str
    timestamp: datetime
    metadata: Dict[str, Any]
    tool_calls: Optional[List[Dict[str, Any]]]
    tool_results: Optional[List[Dict[str, Any]]]

ConversationContext

Context information for a conversation.

@dataclass
class ConversationContext:
    user_id: str
    session_id: str
    thread_id: str
    created_at: datetime
    last_accessed: datetime
    status: ConversationStatus
    metadata: Dict[str, Any]

ConversationSummary

Summary of a conversation thread.

@dataclass
class ConversationSummary:
    thread_id: str
    user_id: str
    session_id: str
    status: ConversationStatus
    message_count: int
    created_at: datetime
    last_accessed: datetime
    last_message_preview: str
    topics: List[str]
    has_tool_calls: bool
    has_errors: bool
    metadata: Dict[str, Any]

ConversationFilter

Filter criteria for conversation queries.

@dataclass
class ConversationFilter:
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    thread_id: Optional[str] = None
    status: Optional[ConversationStatus] = None
    message_type: Optional[MessageType] = None
    date_from: Optional[datetime] = None
    date_to: Optional[datetime] = None
    has_tool_calls: Optional[bool] = None
    has_errors: Optional[bool] = None
    content_contains: Optional[str] = None
    metadata_filter: Optional[Dict[str, Any]] = None
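
Every field of ConversationFilter defaults to None, which suggests that unset fields impose no constraint. The sketch below illustrates that reading with reduced stand-in classes; the AND semantics and the case-insensitive substring match are assumptions, not documented behavior.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class MiniFilter:
    """Subset of ConversationFilter used for illustration."""

    user_id: Optional[str] = None
    has_errors: Optional[bool] = None
    content_contains: Optional[str] = None


@dataclass
class MiniSummary:
    user_id: str
    has_errors: bool
    last_message_preview: str


def matches(summary: MiniSummary, flt: MiniFilter) -> bool:
    """Assumed semantics: None means 'no constraint'; set fields are ANDed."""
    if flt.user_id is not None and summary.user_id != flt.user_id:
        return False
    if flt.has_errors is not None and summary.has_errors != flt.has_errors:
        return False
    if flt.content_contains is not None:
        needle = flt.content_contains.lower()
        if needle not in summary.last_message_preview.lower():
            return False
    return True


summaries = [
    MiniSummary("user123", False, "Intro to machine learning"),
    MiniSummary("user123", True, "Timeout while fetching docs"),
    MiniSummary("user456", False, "Machine learning pipelines"),
]
```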

CleanupPolicy

Policy for memory cleanup operations.

@dataclass
class CleanupPolicy:
    strategy: CleanupStrategy
    max_age_days: int = 30
    max_size_mb: int = 1000
    min_access_frequency: int = 1
    compression_threshold: float = 0.5
    preserve_important: bool = True
    dry_run: bool = False
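
As an illustration of how max_age_days and dry_run might interact under an age-based strategy, the sketch below selects threads whose last access is older than the cutoff and only removes them when dry_run is False. `apply_age_policy` is hypothetical; the real optimizer may weigh other fields such as preserve_important.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple


def apply_age_policy(
    last_accessed: Dict[str, datetime],
    max_age_days: int = 30,
    dry_run: bool = False,
    now: Optional[datetime] = None,
) -> Tuple[List[str], Dict[str, datetime]]:
    """Return (expired thread IDs, threads remaining after the run)."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=max_age_days)
    expired = sorted(tid for tid, ts in last_accessed.items() if ts < cutoff)
    if dry_run:
        # Report what would be deleted, but leave everything in place.
        return expired, dict(last_accessed)
    remaining = {tid: ts for tid, ts in last_accessed.items() if ts >= cutoff}
    return expired, remaining
```

Running once with dry_run=True before the real cleanup gives a preview of what would be removed.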

OptimizationResult

Result of an optimization operation.

@dataclass
class OptimizationResult:
    threads_processed: int
    threads_deleted: int
    space_freed_mb: float
    compression_ratio: Optional[float] = None
    execution_time_seconds: float = 0.0
    errors: Optional[List[str]] = None
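
Because errors defaults to None, callers have to guard before iterating over it. A common alternative, shown here as a sketch and not as the library's definition, is dataclasses.field(default_factory=list), which gives every instance its own empty list.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ResultSketch:
    """Illustrative variant of OptimizationResult, not the library's class."""

    threads_processed: int
    threads_deleted: int
    space_freed_mb: float
    compression_ratio: Optional[float] = None
    execution_time_seconds: float = 0.0
    # default_factory creates a fresh list per instance.
    errors: List[str] = field(default_factory=list)


first = ResultSketch(10, 2, 1.5)
second = ResultSketch(5, 0, 0.0)
first.errors.append("thread t7: locked")
```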

Enums

MessageType

Types of messages in conversation.

class MessageType(str, Enum):
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"
    TOOL = "tool"
    ERROR = "error"
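
Because MessageType mixes in str, its members compare equal to their string values and serialize as plain strings. The short sketch below redefines the enum locally to demonstrate this standard Python behavior.

```python
import json
from enum import Enum


class MessageType(str, Enum):
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"
    TOOL = "tool"
    ERROR = "error"


# Look up a member by its stored value, e.g. when reading from the database.
parsed = MessageType("tool")

# str-based members are written by json.dumps as ordinary JSON strings.
payload = json.dumps({"type": MessageType.USER})
```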

ConversationStatus

Status of a conversation thread.

class ConversationStatus(str, Enum):
    ACTIVE = "active"
    PAUSED = "paused"
    COMPLETED = "completed"
    ARCHIVED = "archived"
    ERROR = "error"

SearchType

Types of search operations.

class SearchType(str, Enum):
    EXACT = "exact"
    SEMANTIC = "semantic"
    FUZZY = "fuzzy"

SortOrder

Sort order for results.

class SortOrder(str, Enum):
    ASC = "asc"
    DESC = "desc"

CleanupStrategy

Strategies for memory cleanup.

class CleanupStrategy(str, Enum):
    AGE_BASED = "age_based"
    SIZE_BASED = "size_based"
    FREQUENCY_BASED = "frequency_based"
    IMPORTANCE_BASED = "importance_based"
    COMPRESSION_BASED = "compression_based"

OptimizationLevel

Levels of optimization.

class OptimizationLevel(str, Enum):
    LIGHT = "light"
    MODERATE = "moderate"
    AGGRESSIVE = "aggressive"

Utility Functions

create_conversation_state(user_id: str, session_id: str, thread_id: str, **kwargs) -> ConversationState

Create a new conversation state with proper initialization.

Parameters:

  • user_id (str): User identifier
  • session_id (str): Session identifier
  • thread_id (str): Thread identifier
  • **kwargs: Additional initialization parameters

Returns: Initialized conversation state

validate_conversation_state(state: ConversationState) -> bool

Validate a conversation state.

Parameters:

  • state (ConversationState): State to validate

Returns: True if valid, False otherwise

Error Handling

The memory persistence system raises the following exceptions:

MemoryError

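Raised when memory operations fail due to resource constraints. The name is the same as Python's built-in MemoryError, so check which one is in scope when catching it.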

DatabaseError

Raised when database operations fail.

ValidationError

Raised when conversation state validation fails.

SessionNotFoundError

Raised when attempting to access a non-existent session.

ThreadNotFoundError

Raised when attempting to access a non-existent thread.
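
A typical caller distinguishes "not found" from database failures. The sketch below defines local stand-ins for two of the exceptions so it can run on its own; `load_or_none`, the retry count, and the idea that a DatabaseError may be transient are assumptions made for illustration.

```python
import asyncio
from typing import Optional


class ThreadNotFoundError(Exception):
    """Local stand-in; import the real class from RecoAgent in practice."""


class DatabaseError(Exception):
    """Local stand-in for the documented DatabaseError."""


async def load_or_none(loader, thread_id: str, retries: int = 2) -> Optional[dict]:
    """Return None for a missing thread; retry database failures a few times."""
    for attempt in range(retries + 1):
        try:
            return await loader(thread_id)
        except ThreadNotFoundError:
            return None
        except DatabaseError:
            if attempt == retries:
                raise
            await asyncio.sleep(0)  # a real caller would back off here
    return None  # not reached; keeps type checkers satisfied


_calls = {"count": 0}


async def flaky_loader(thread_id: str) -> dict:
    """Fails once with DatabaseError, then succeeds or reports a missing thread."""
    _calls["count"] += 1
    if _calls["count"] == 1:
        raise DatabaseError("database is locked")
    if thread_id == "missing":
        raise ThreadNotFoundError(thread_id)
    return {"thread_id": thread_id}
```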

Usage Examples

Basic Usage

from recoagent.memory import MemoryManager, MessageType  # MessageType import path assumed

async def basic_example():
    # Initialize memory manager
    memory_manager = MemoryManager(db_path="memory.db")
    await memory_manager.initialize()

    try:
        # Create session and thread
        session_id = await memory_manager.thread_manager.create_session("user123")
        thread_id = await memory_manager.thread_manager.create_thread("user123", session_id)

        # Get and update state (get_thread_state returns None if the thread is missing)
        state = await memory_manager.thread_manager.get_thread_state(thread_id)
        if state is not None:
            state.add_message(MessageType.USER, "Hello!")
            await memory_manager.thread_manager.update_thread_state(thread_id, state)

    finally:
        # Always release database connections
        await memory_manager.close()

Advanced Usage

from recoagent.memory import ConversationFilter, MemoryManager  # ConversationFilter import path assumed

async def advanced_example():
    memory_manager = MemoryManager(db_path="memory.db")
    await memory_manager.initialize()

    try:
        # Search conversations
        summaries, total = await memory_manager.history_api.search_conversations(
            query="machine learning",
            filters=ConversationFilter(user_id="user123"),
            limit=10
        )

        # Get analytics
        analytics = await memory_manager.history_api.get_conversation_analytics(
            user_id="user123"
        )

    finally:
        await memory_manager.close()

Memory Optimization

# Import paths other than MemoryManager are assumed
from recoagent.memory import CleanupPolicy, CleanupStrategy, MemoryManager, OptimizationLevel

async def optimization_example():
    memory_manager = MemoryManager(db_path="memory.db")
    await memory_manager.initialize()

    try:
        # Cleanup old conversations
        policy = CleanupPolicy(
            strategy=CleanupStrategy.AGE_BASED,
            max_age_days=30
        )

        result = await memory_manager.optimizer.cleanup_memory(policy)
        print(f"Cleaned up {result.threads_deleted} threads")

        # Optimize database
        await memory_manager.optimizer.optimize_database(
            OptimizationLevel.AGGRESSIVE
        )

    finally:
        await memory_manager.close()

This API reference provides comprehensive documentation for all public interfaces in the memory persistence system. For more detailed examples and usage patterns, see the example implementations and troubleshooting guide.