Memory Persistence API Reference
This document provides a comprehensive API reference for RecoAgent's LangGraph memory persistence system.
Core Classes
MemoryManager
High-level manager that coordinates all memory persistence components.
class MemoryManager:
    def __init__(
        self,
        db_path: str = "memory.db",
        max_connections: int = 10,
        session_timeout_minutes: int = 30,
        max_threads_per_session: int = 10,
        cleanup_interval_minutes: int = 60
    )
Parameters:
db_path (str): Path to SQLite database file
max_connections (int): Maximum number of concurrent database connections
session_timeout_minutes (int): Minutes before a session expires
max_threads_per_session (int): Maximum threads per session
cleanup_interval_minutes (int): Minutes between cleanup operations
Methods:
async initialize() -> None
Initialize all memory management components.
async close() -> None
Close all memory management components and cleanup resources.
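Because initialize() and close() must always be paired, one option is to wrap them in an async context manager. The following is a minimal sketch, not part of the documented API: `managed` and `_FakeManager` are hypothetical names, and the only assumption about MemoryManager is that it exposes the two coroutines listed above.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def managed(manager):
    """Initialize `manager` on entry and always close it on exit."""
    await manager.initialize()
    try:
        yield manager
    finally:
        await manager.close()


class _FakeManager:
    """Hypothetical stand-in exposing the same two coroutines as MemoryManager."""

    def __init__(self):
        self.events = []

    async def initialize(self):
        self.events.append("initialize")

    async def close(self):
        self.events.append("close")


async def demo():
    fake = _FakeManager()
    async with managed(fake) as m:
        m.events.append("work")  # application code would run here
    return fake.events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a real MemoryManager, the same wrapper would replace the explicit try/finally used in the usage examples at the end of this reference.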
ConversationState
Rich conversation state with LangGraph compatibility.
@dataclass
class ConversationState:
    messages: Annotated[List[Message], "Chat messages"]
    context: Optional[ConversationContext]
    query: str
    retrieved_docs: List[Dict[str, Any]]
    reranked_docs: List[Dict[str, Any]]
    plan: Optional[str]
    action: Optional[str]
    answer: Optional[str]
    error: Optional[str]
    metadata: Dict[str, Any]
    step_count: int
    max_steps: int
    cost_tracker: Dict[str, float]
    latency_tracker: Dict[str, float]
    memory_summary: Optional[str]
    relevant_history: List[str]
    conversation_embedding: Optional[List[float]]
Methods:
add_message(message_type: MessageType, content: str, **kwargs) -> Message
Add a message to the conversation.
Parameters:
message_type (MessageType): Type of message to add
content (str): Message content
**kwargs: Additional message metadata
Returns: Message object
get_recent_messages(count: int = 10) -> List[Message]
Get the most recent messages.
Parameters:
count (int): Number of messages to retrieve
Returns: List of recent messages
get_messages_by_type(message_type: MessageType) -> List[Message]
Get all messages of a specific type.
Parameters:
message_type (MessageType): Type of messages to filter
Returns: List of messages of specified type
update_context(**kwargs) -> None
Update conversation context.
Parameters:
**kwargs: Context fields to update
to_dict() -> Dict[str, Any]
Convert state to dictionary for serialization.
Returns: Dictionary representation of state
to_langgraph_state() -> Dict[str, Any]
Convert to LangGraph-compatible state format.
Returns: LangGraph-compatible state dictionary
ThreadManager
Manages conversation threads and sessions.
class ThreadManager:
    def __init__(
        self,
        saver: AsyncSqliteSaver,
        session_timeout_minutes: int = 30,
        max_threads_per_session: int = 10,
        cleanup_interval_minutes: int = 60
    )
Methods:
async create_session(user_id: str, session_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> str
Create a new session for a user.
Parameters:
user_id (str): User identifier
session_id (Optional[str]): Custom session ID (auto-generated if None)
metadata (Optional[Dict[str, Any]]): Session metadata
Returns: Session ID
async create_thread(user_id: str, session_id: str, thread_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> str
Create a new conversation thread.
Parameters:
user_id (str): User identifier
session_id (str): Session identifier
thread_id (Optional[str]): Custom thread ID (auto-generated if None)
metadata (Optional[Dict[str, Any]]): Thread metadata
Returns: Thread ID
async get_thread_state(thread_id: str, session_id: Optional[str] = None) -> Optional[ConversationState]
Get the current state of a conversation thread.
Parameters:
thread_id (str): Thread identifier
session_id (Optional[str]): Session identifier for validation
Returns: Conversation state or None if not found
async update_thread_state(thread_id: str, state: ConversationState) -> None
Update the state of a conversation thread.
Parameters:
thread_id (str): Thread identifier
state (ConversationState): Updated conversation state
async delete_thread(thread_id: str, session_id: Optional[str] = None) -> bool
Delete a conversation thread.
Parameters:
thread_id (str): Thread identifier
session_id (Optional[str]): Session identifier for validation
Returns: True if deleted, False if not found
async list_user_threads(user_id: Optional[str] = None, session_id: Optional[str] = None, status: Optional[ConversationStatus] = None, limit: int = 100, offset: int = 0) -> List[Dict[str, Any]]
List conversation threads for a user.
Parameters:
user_id (Optional[str]): Filter by user ID
session_id (Optional[str]): Filter by session ID
status (Optional[ConversationStatus]): Filter by status
limit (int): Maximum number of results
offset (int): Number of results to skip
Returns: List of thread metadata
async get_conversation_history(thread_id: str, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]
Get conversation history for a thread.
Parameters:
thread_id (str): Thread identifier
limit (int): Maximum number of messages
offset (int): Number of messages to skip
Returns: List of message dictionaries
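Methods that take limit and offset, such as list_user_threads and get_conversation_history, can be drained page by page. The sketch below is illustrative only: `iter_pages` and `_fake_history` are hypothetical helpers, and it assumes that a page shorter than the requested limit signals the end of the results, which this reference does not state explicitly.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List

Page = List[Dict[str, Any]]


async def iter_pages(
    fetch: Callable[..., Awaitable[Page]],
    page_size: int = 50,
    **kwargs: Any,
) -> AsyncIterator[Dict[str, Any]]:
    """Yield every item from a coroutine paginated with limit/offset."""
    offset = 0
    while True:
        page = await fetch(limit=page_size, offset=offset, **kwargs)
        for item in page:
            yield item
        if len(page) < page_size:  # assumption: a short page means no more results
            break
        offset += page_size


async def _fake_history(thread_id: str, limit: int = 50, offset: int = 0) -> Page:
    """Hypothetical stand-in with the same keyword arguments as get_conversation_history."""
    data = [{"thread_id": thread_id, "index": i} for i in range(7)]
    return data[offset:offset + limit]


async def collect() -> List[int]:
    return [
        message["index"]
        async for message in iter_pages(_fake_history, page_size=3, thread_id="t1")
    ]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

In real use, `fetch` would be a bound method such as `thread_manager.get_conversation_history`, with `thread_id` passed as a keyword argument.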
async archive_session(session_id: str) -> bool
Archive a session and all its threads.
Parameters:
session_id (str): Session identifier
Returns: True if archived, False if not found
async get_session_info(session_id: str) -> Optional[SessionInfo]
Get information about a session.
Parameters:
session_id (str): Session identifier
Returns: Session info or None if not found
async get_statistics() -> Dict[str, Any]
Get statistics about active sessions and threads.
Returns: Dictionary with statistics
AsyncSqliteSaver
High-performance SQLite-based persistence layer.
class AsyncSqliteSaver:
    def __init__(
        self,
        db_path: Union[str, Path],
        max_connections: int = 10,
        connection_timeout: float = 30.0,
        enable_wal_mode: bool = True,
        vacuum_interval_hours: int = 24
    )
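The enable_wal_mode flag presumably refers to SQLite's write-ahead logging (WAL) journal mode, which lets readers proceed while a writer is active. How AsyncSqliteSaver applies it internally is not documented here, so the sketch below only shows what the setting means at the SQLite level, using the standard library `sqlite3` module. `enable_wal` and `demo_wal` are hypothetical helper names.

```python
import os
import sqlite3
import tempfile


def enable_wal(db_path: str) -> str:
    """Switch a SQLite database file to WAL journaling and return the active mode."""
    conn = sqlite3.connect(db_path)
    try:
        # The pragma returns the journal mode that is in effect after the call.
        (mode,) = conn.execute("PRAGMA journal_mode=WAL").fetchone()
        return mode
    finally:
        conn.close()


def demo_wal() -> str:
    # Use a throwaway file: WAL is not available for in-memory databases.
    with tempfile.TemporaryDirectory() as tmp:
        return enable_wal(os.path.join(tmp, "memory.db"))


if __name__ == "__main__":
    print(demo_wal())
```

WAL mode is persistent: once set on a database file, it stays in effect for later connections until changed.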
Methods:
async save_conversation_state(thread_id: str, state: ConversationState, checkpoint_id: Optional[str] = None) -> str
Save a conversation state to the database.
Parameters:
thread_id (str): Unique thread identifier
state (ConversationState): Conversation state to save
checkpoint_id (Optional[str]): Optional checkpoint identifier
Returns: Checkpoint ID
async load_conversation_state(thread_id: str, checkpoint_id: Optional[str] = None) -> Optional[ConversationState]
Load a conversation state from the database.
Parameters:
thread_id (str): Unique thread identifier
checkpoint_id (Optional[str]): Optional checkpoint identifier
Returns: Conversation state or None if not found
async list_threads(user_id: Optional[str] = None, session_id: Optional[str] = None, status: Optional[ConversationStatus] = None, limit: int = 100, offset: int = 0) -> List[Dict[str, Any]]
List conversation threads with optional filtering.
Parameters:
user_id (Optional[str]): Filter by user ID
session_id (Optional[str]): Filter by session ID
status (Optional[ConversationStatus]): Filter by status
limit (int): Maximum number of results
offset (int): Number of results to skip
Returns: List of thread metadata
async delete_thread(thread_id: str) -> bool
Delete a conversation thread and all associated data.
Parameters:
thread_id (str): Thread identifier to delete
Returns: True if deleted, False if not found
async cleanup_old_conversations(days_old: int = 30) -> int
Clean up old conversation threads.
Parameters:
days_old (int): Delete conversations older than this many days
Returns: Number of threads deleted
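To make the days_old semantics concrete, the sketch below computes the cutoff timestamp such a cleanup would compare against. It is an illustration under assumptions, not the library's implementation: this reference does not say which timestamp is used (creation or last access) or which time zone applies, and `cleanup_cutoff` and `is_older_than` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def cleanup_cutoff(days_old: int = 30, now: Optional[datetime] = None) -> datetime:
    """Return the instant before which a conversation counts as old."""
    if days_old < 0:
        raise ValueError("days_old must be non-negative")
    now = now or datetime.now(timezone.utc)  # assumption: timestamps are in UTC
    return now - timedelta(days=days_old)


def is_older_than(timestamp: datetime, days_old: int = 30,
                  now: Optional[datetime] = None) -> bool:
    """True if `timestamp` falls strictly before the cleanup cutoff."""
    return timestamp < cleanup_cutoff(days_old, now)


if __name__ == "__main__":
    fixed_now = datetime(2024, 3, 31, tzinfo=timezone.utc)
    print(cleanup_cutoff(30, fixed_now).isoformat())
```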
async vacuum_database() -> None
Perform database vacuum operation.
async get_conversation_history(thread_id: str, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]
Get conversation history for a thread.
Parameters:
thread_id (str): Thread identifier
limit (int): Maximum number of messages
offset (int): Number of messages to skip
Returns: List of message dictionaries
ConversationHistoryAPI
High-level API for conversation history retrieval and management.
class ConversationHistoryAPI:
    def __init__(self, thread_manager: ThreadManager)
Methods:
async get_conversation_summary(thread_id: str) -> Optional[ConversationSummary]
Get a summary of a conversation thread.
Parameters:
thread_id (str): Thread identifier
Returns: Conversation summary or None if not found
async search_conversations(query: str, search_type: SearchType = SearchType.SEMANTIC, filters: Optional[ConversationFilter] = None, limit: int = 50, offset: int = 0, sort_by: str = "last_accessed", sort_order: SortOrder = SortOrder.DESC) -> Tuple[List[ConversationSummary], int]
Search conversations with advanced filtering.
Parameters:
query (str): Search query string
search_type (SearchType): Type of search to perform
filters (Optional[ConversationFilter]): Optional filter criteria
limit (int): Maximum number of results
offset (int): Number of results to skip
sort_by (str): Field to sort by
sort_order (SortOrder): Sort order
Returns: Tuple of (results, total_count)
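The total_count in the returned tuple is what makes paging controls possible. A small sketch of the arithmetic follows, assuming total_count is the number of matches before limit and offset are applied (the reference does not spell this out); `page_count` and `has_more` are hypothetical helper names.

```python
import math


def page_count(total_count: int, limit: int) -> int:
    """Number of pages needed to show `total_count` results, `limit` per page."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    return math.ceil(total_count / limit)


def has_more(total_count: int, limit: int, offset: int) -> bool:
    """True if results remain beyond the page that starts at `offset`."""
    return offset + limit < total_count


if __name__ == "__main__":
    print(page_count(101, 50), has_more(101, 50, 50), has_more(101, 50, 100))
```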
async get_user_conversation_history(user_id: str, days_back: int = 30, limit: int = 100, offset: int = 0) -> Tuple[List[ConversationSummary], int]
Get conversation history for a user.
Parameters:
user_id (str): User identifier
days_back (int): Number of days to look back
limit (int): Maximum number of results
offset (int): Number of results to skip
Returns: Tuple of (summaries, total_count)
async get_conversation_messages(thread_id: str, message_types: Optional[List[MessageType]] = None, date_from: Optional[datetime] = None, date_to: Optional[datetime] = None, limit: int = 100, offset: int = 0) -> List[Dict[str, Any]]
Get messages from a conversation with filtering.
Parameters:
thread_id (str): Thread identifier
message_types (Optional[List[MessageType]]): Optional list of message types to filter
date_from (Optional[datetime]): Optional start date filter
date_to (Optional[datetime]): Optional end date filter
limit (int): Maximum number of messages
offset (int): Number of messages to skip
Returns: List of message dictionaries
async get_conversation_analytics(user_id: Optional[str] = None, session_id: Optional[str] = None, days_back: int = 30) -> Dict[str, Any]
Get analytics for conversations.
Parameters:
user_id (Optional[str]): Optional user filter
session_id (Optional[str]): Optional session filter
days_back (int): Number of days to analyze
Returns: Analytics dictionary
async export_conversation_data(thread_id: str, format: str = "json") -> Union[Dict[str, Any], str]
Export conversation data in various formats.
Parameters:
thread_id (str): Thread identifier
format (str): Export format ("json", "csv", "markdown")
Returns: Exported data
MemoryOptimizer
Memory optimization and cleanup utilities.
class MemoryOptimizer:
    def __init__(self, thread_manager: ThreadManager)
Methods:
async cleanup_memory(policy: CleanupPolicy, progress_callback: Optional[Callable[[int, int], None]] = None) -> OptimizationResult
Perform memory cleanup based on the given policy.
Parameters:
policy (CleanupPolicy): Cleanup policy to apply
progress_callback (Optional[Callable[[int, int], None]]): Optional callback for progress updates
Returns: Optimization result
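The progress_callback takes two integers and returns nothing. The signature does not name the arguments, so the sketch below assumes they are (processed, total); treat that ordering as an assumption to verify against the implementation. `make_progress_logger` is a hypothetical helper that builds a compatible callback.

```python
from typing import Callable, List


def make_progress_logger(sink: List[str]) -> Callable[[int, int], None]:
    """Build a callback matching Callable[[int, int], None] that records progress lines."""

    def on_progress(processed: int, total: int) -> None:
        # Guard against total == 0 so an empty cleanup run does not divide by zero.
        percent = 100.0 * processed / total if total else 100.0
        sink.append(f"{processed}/{total} ({percent:.0f}%)")

    return on_progress


if __name__ == "__main__":
    lines: List[str] = []
    callback = make_progress_logger(lines)
    callback(5, 20)
    callback(20, 20)
    print(lines)
```

A callback built this way could then be passed as `progress_callback=callback` to cleanup_memory.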
async optimize_database(level: OptimizationLevel = OptimizationLevel.MODERATE) -> OptimizationResult
Perform database optimization.
Parameters:
level (OptimizationLevel): Optimization level
Returns: Optimization result
async get_memory_statistics() -> Dict[str, Any]
Get comprehensive memory usage statistics.
Returns: Dictionary with statistics
Data Types
Message
Individual message in a conversation.
@dataclass
class Message:
    id: str
    type: MessageType
    content: str
    timestamp: datetime
    metadata: Dict[str, Any]
    tool_calls: Optional[List[Dict[str, Any]]]
    tool_results: Optional[List[Dict[str, Any]]]
ConversationContext
Context information for a conversation.
@dataclass
class ConversationContext:
    user_id: str
    session_id: str
    thread_id: str
    created_at: datetime
    last_accessed: datetime
    status: ConversationStatus
    metadata: Dict[str, Any]
ConversationSummary
Summary of a conversation thread.
@dataclass
class ConversationSummary:
    thread_id: str
    user_id: str
    session_id: str
    status: ConversationStatus
    message_count: int
    created_at: datetime
    last_accessed: datetime
    last_message_preview: str
    topics: List[str]
    has_tool_calls: bool
    has_errors: bool
    metadata: Dict[str, Any]
ConversationFilter
Filter criteria for conversation queries.
@dataclass
class ConversationFilter:
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    thread_id: Optional[str] = None
    status: Optional[ConversationStatus] = None
    message_type: Optional[MessageType] = None
    date_from: Optional[datetime] = None
    date_to: Optional[datetime] = None
    has_tool_calls: Optional[bool] = None
    has_errors: Optional[bool] = None
    content_contains: Optional[str] = None
    metadata_filter: Optional[Dict[str, Any]] = None
CleanupPolicy
Policy for memory cleanup operations.
@dataclass
class CleanupPolicy:
    strategy: CleanupStrategy
    max_age_days: int = 30
    max_size_mb: int = 1000
    min_access_frequency: int = 1
    compression_threshold: float = 0.5
    preserve_important: bool = True
    dry_run: bool = False
OptimizationResult
Result of an optimization operation.
@dataclass
class OptimizationResult:
    threads_processed: int
    threads_deleted: int
    space_freed_mb: float
    compression_ratio: Optional[float] = None
    execution_time_seconds: float = 0.0
    errors: Optional[List[str]] = None
Enums
MessageType
Types of messages in conversation.
class MessageType(str, Enum):
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"
    TOOL = "tool"
    ERROR = "error"
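Because MessageType derives from both str and Enum, its members compare equal to their string values and serialize directly with the standard `json` module. The sketch below repeats the enum definition so that it runs on its own; `parse_message_type` is a hypothetical helper, not part of the documented API.

```python
import json
from enum import Enum


class MessageType(str, Enum):
    # Copy of the enum documented above, repeated so this block is self-contained.
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"
    TOOL = "tool"
    ERROR = "error"


def parse_message_type(raw: str) -> MessageType:
    """Convert a stored string back into a MessageType, ignoring case and padding."""
    return MessageType(raw.strip().lower())  # raises ValueError for unknown values


if __name__ == "__main__":
    member = parse_message_type(" User ")
    print(member is MessageType.USER)            # lookup by value returns the member
    print(member == "user")                      # str mixin: equal to the plain string
    print(json.dumps({"type": member}))          # serialized as the underlying value
```

The same behavior applies to the other str-based enums in this section, which is convenient when persisting values in SQLite or JSON.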
ConversationStatus
Status of a conversation thread.
class ConversationStatus(str, Enum):
    ACTIVE = "active"
    PAUSED = "paused"
    COMPLETED = "completed"
    ARCHIVED = "archived"
    ERROR = "error"
SearchType
Types of search operations.
class SearchType(str, Enum):
    EXACT = "exact"
    SEMANTIC = "semantic"
    FUZZY = "fuzzy"
SortOrder
Sort order for results.
class SortOrder(str, Enum):
    ASC = "asc"
    DESC = "desc"
CleanupStrategy
Strategies for memory cleanup.
class CleanupStrategy(str, Enum):
    AGE_BASED = "age_based"
    SIZE_BASED = "size_based"
    FREQUENCY_BASED = "frequency_based"
    IMPORTANCE_BASED = "importance_based"
    COMPRESSION_BASED = "compression_based"
OptimizationLevel
Levels of optimization.
class OptimizationLevel(str, Enum):
    LIGHT = "light"
    MODERATE = "moderate"
    AGGRESSIVE = "aggressive"
Utility Functions
create_conversation_state(user_id: str, session_id: str, thread_id: str, **kwargs) -> ConversationState
Create a new conversation state with proper initialization.
Parameters:
user_id (str): User identifier
session_id (str): Session identifier
thread_id (str): Thread identifier
**kwargs: Additional initialization parameters
Returns: Initialized conversation state
validate_conversation_state(state: ConversationState) -> bool
Validate a conversation state.
Parameters:
state (ConversationState): State to validate
Returns: True if valid, False otherwise
Error Handling
The memory persistence system raises the following exceptions:
MemoryError
Raised when memory operations fail due to resource constraints.
DatabaseError
Raised when database operations fail.
ValidationError
Raised when conversation state validation fails.
SessionNotFoundError
Raised when attempting to access a non-existent session.
ThreadNotFoundError
Raised when attempting to access a non-existent thread.
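One possible way to handle transient DatabaseError failures (for example, a briefly locked SQLite file) is a small retry wrapper. This is a sketch of a general pattern, not behavior the library is documented to provide. The import path and class hierarchy of the exceptions are not given in this reference, so `DatabaseError` below is a local stand-in, and `with_retries` and `make_flaky` are hypothetical helpers.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


class DatabaseError(Exception):
    """Local stand-in for the library's DatabaseError (real import path not documented)."""


async def with_retries(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    delay_seconds: float = 0.0,
    retry_on: Tuple[Type[BaseException], ...] = (DatabaseError,),
) -> T:
    """Await `operation` up to `attempts` times, re-raising the last error on failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the original exception
            await asyncio.sleep(delay_seconds * attempt)  # linear backoff
    raise AssertionError("unreachable")


def make_flaky(failures: int) -> Callable[[], Awaitable[int]]:
    """Build a coroutine function that fails `failures` times, then returns its call count."""
    state = {"calls": 0}

    async def operation() -> int:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise DatabaseError("database is locked")
        return state["calls"]

    return operation


if __name__ == "__main__":
    print(asyncio.run(with_retries(make_flaky(2), attempts=3)))
```

Not-found errors such as SessionNotFoundError and ThreadNotFoundError would normally not be retried, since repeating the call cannot make the missing record appear.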
Usage Examples
Basic Usage
from recoagent.memory import MemoryManager
# Assumption: MessageType is exported from the same package as MemoryManager.
from recoagent.memory import MessageType

async def basic_example():
    # Initialize memory manager
    memory_manager = MemoryManager(db_path="memory.db")
    await memory_manager.initialize()
    try:
        # Create session and thread
        session_id = await memory_manager.thread_manager.create_session("user123")
        thread_id = await memory_manager.thread_manager.create_thread("user123", session_id)
        # Get and update state (get_thread_state returns None if the thread is not found)
        state = await memory_manager.thread_manager.get_thread_state(thread_id)
        if state is not None:
            state.add_message(MessageType.USER, "Hello!")
            await memory_manager.thread_manager.update_thread_state(thread_id, state)
    finally:
        # Always release database connections, even if an operation fails
        await memory_manager.close()
Advanced Usage with Search
# Assumption: ConversationFilter is exported from the same package as MemoryManager.
from recoagent.memory import MemoryManager, ConversationFilter

async def advanced_example():
    memory_manager = MemoryManager(db_path="memory.db")
    await memory_manager.initialize()
    try:
        # Search conversations; returns (results, total_count)
        summaries, total = await memory_manager.history_api.search_conversations(
            query="machine learning",
            filters=ConversationFilter(user_id="user123"),
            limit=10
        )
        # Get analytics
        analytics = await memory_manager.history_api.get_conversation_analytics(
            user_id="user123"
        )
    finally:
        await memory_manager.close()
Memory Optimization
# Assumption: these names are exported from the same package as MemoryManager.
from recoagent.memory import (
    MemoryManager, CleanupPolicy, CleanupStrategy, OptimizationLevel
)

async def optimization_example():
    memory_manager = MemoryManager(db_path="memory.db")
    await memory_manager.initialize()
    try:
        # Cleanup old conversations
        policy = CleanupPolicy(
            strategy=CleanupStrategy.AGE_BASED,
            max_age_days=30
        )
        result = await memory_manager.optimizer.cleanup_memory(policy)
        print(f"Cleaned up {result.threads_deleted} threads")
        # Optimize database
        await memory_manager.optimizer.optimize_database(
            OptimizationLevel.AGGRESSIVE
        )
    finally:
        await memory_manager.close()
This API reference provides comprehensive documentation for all public interfaces in the memory persistence system. For more detailed examples and usage patterns, see the example implementations and troubleshooting guide.