MongoDB Atlas Vector Search Integration Guide
This comprehensive guide covers the MongoDB Atlas Vector Search integration in RecoAgent, providing detailed configuration, optimization, and troubleshooting information.
Table of Contents
- Overview
- Quick Start
- Configuration
- Vector Search Operations
- Hybrid Search
- Faceted Search
- Index Management
- Performance Optimization
- Troubleshooting
- API Reference
- Examples
Overview
MongoDB Atlas Vector Search provides a unified storage solution for documents and their vector representations, enabling high-performance semantic search with advanced filtering capabilities. This integration offers:
- Unified Storage: Documents and vectors stored together in MongoDB
- Hybrid Search: Combines text search with vector similarity
- Advanced Filtering: Metadata-based filtering and faceted search
- Connection Pooling: Optimized connection management
- Async Operations: Full async/await support
- Performance Monitoring: Built-in statistics and monitoring
Quick Start
1. Prerequisites
- MongoDB Atlas cluster with Vector Search enabled
- Python 3.8+
- Required dependencies installed (pymongo and motor; see Installation below)
2. Installation
pip install pymongo motor
3. Basic Usage
from packages.rag.stores import MongoDBAtlasVectorStore
from packages.rag.mongodb_retrievers import MongoDBAdvancedRetriever
# Initialize vector store
vector_store = MongoDBAtlasVectorStore(
uri="mongodb+srv://username:password@cluster.mongodb.net/",
database="recoagent",
collection="documents",
vector_search_index="vector_index"
)
# Initialize retriever
retriever = MongoDBAdvancedRetriever(vector_store)
# Search documents
results = retriever.retrieve("machine learning", k=5)
Configuration
Environment Variables
Set the following environment variables:
# MongoDB Atlas Configuration
MONGODB_URI=mongodb+srv://username:password@cluster.mongodb.net/
MONGODB_DATABASE=recoagent
MONGODB_COLLECTION=documents
MONGODB_VECTOR_SEARCH_INDEX=vector_index
# Connection Pool Settings
MONGODB_MAX_POOL_SIZE=100
MONGODB_MIN_POOL_SIZE=10
MONGODB_MAX_IDLE_TIME_MS=30000
MONGODB_CONNECT_TIMEOUT_MS=10000
MONGODB_SERVER_SELECTION_TIMEOUT_MS=10000
# Vector Store Type
VECTOR_STORE_TYPE=mongodb_atlas
EMBEDDING_DIMENSION=3072
Configuration in Code
from config.settings import get_config
config = get_config()
# Access MongoDB configuration
mongodb_uri = config.vector_store.mongodb_uri
database = config.vector_store.mongodb_database
collection = config.vector_store.mongodb_collection
Connection Pool Configuration
vector_store = MongoDBAtlasVectorStore(
uri=uri,
database=database,
collection=collection,
vector_search_index=index_name,
max_pool_size=100, # Maximum connections
min_pool_size=10, # Minimum connections
max_idle_time_ms=30000, # Connection idle timeout
connect_timeout_ms=10000, # Connection timeout
server_selection_timeout_ms=10000 # Server selection timeout
)
Vector Search Operations
Basic Vector Search
# Search with vector similarity
query_embedding = [0.1, 0.2, 0.3, ...] # Your embedding
results = vector_store.search(
query_embedding=query_embedding,
k=10,
include_metadata=True
)
# With metadata filtering
filter_metadata = {
"category": "AI",
"year": {"operator": "$gte", "value": 2023}
}
results = vector_store.search(
query_embedding=query_embedding,
k=10,
filter_metadata=filter_metadata
)
Async Vector Search
import asyncio
async def search_async():
results = await vector_store.search_async(
query_embedding=query_embedding,
k=10,
include_metadata=True
)
return results
# Run async search
results = asyncio.run(search_async())
Adding Documents
from packages.rag.stores import VectorDocument
# Create documents
documents = [
VectorDocument(
id="doc1",
content="Machine learning is a subset of AI",
embedding=[0.1, 0.2, 0.3, ...],
metadata={"category": "AI", "year": 2023}
),
# ... more documents
]
# Add to MongoDB
success = vector_store.add_documents(documents)
# Async version (await is only valid inside an async function, e.g. one started with asyncio.run)
success = await vector_store.add_documents_async(documents)
Hybrid Search
Hybrid search combines MongoDB's text search with vector similarity for improved results.
Basic Hybrid Search
# Perform hybrid search
results = vector_store.hybrid_search(
query_text="machine learning algorithms",
query_embedding=query_embedding,
k=10,
text_weight=0.3, # Weight for text search
vector_weight=0.7 # Weight for vector search
)
Using Hybrid Retriever
from packages.rag.mongodb_retrievers import MongoDBHybridRetriever
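# Configure hybrid search (MongoDBHybridConfig must be imported as well; the import above only brings in MongoDBHybridRetriever)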
config = MongoDBHybridConfig(
text_weight=0.3,
vector_weight=0.7,
vector_k=20,
text_k=20,
final_k=10
)
retriever = MongoDBHybridRetriever(vector_store, config)
# Search
results = retriever.retrieve("machine learning", k=10)
Creating Text Index
# Create text index for hybrid search
vector_store.create_text_index(['content', 'title', 'description'])
# Or using the retriever
retriever.create_text_index(['content'])
Faceted Search
Faceted search provides metadata-based filtering and aggregation capabilities.
Basic Faceted Search
# Perform faceted search
results = vector_store.faceted_search(
query_embedding=query_embedding,
facets=['category', 'year', 'difficulty'],
k=10
)
print(f"Found {results['total_results']} results")
print(f"Facets: {results['facets']}")
# Access results
for result in results['results']:
print(f"Score: {result['score']}")
print(f"Content: {result['content']}")
print(f"Metadata: {result['metadata']}")
Using Faceted Retriever
from packages.rag.mongodb_retrievers import MongoDBFacetedRetriever
retriever = MongoDBFacetedRetriever(vector_store)
# Search with facets
results = retriever.retrieve(
query="machine learning",
k=10,
facets=['category', 'year']
)
# Get facet information only
facets = retriever.get_facets("machine learning", ['category', 'year'])
Index Management
Vector Search Index
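The vector search index is automatically created when adding documents. You can also create it manually; note that the method shown below is underscore-prefixed (private by Python convention), so treat it as an internal helper rather than a stable public API: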
# Check if index exists and create if needed
vector_store._create_vector_search_index()
Index Configuration
# Custom index configuration
index_definition = {
"name": "custom_vector_index",
"type": "vectorSearch",
"definition": {
"fields": [
{
"type": "vector",
"path": "embedding",
"numDimensions": 3072,
"similarity": "cosine"
},
{
"type": "filter",
"path": "metadata"
}
]
}
}
Text Index for Hybrid Search
# Create text index on multiple fields
vector_store.create_text_index(['content', 'title', 'description'])
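# Check existing indexes (note: in PyMongo, list_indexes() is a Collection method; if sync_client is a MongoClient, select the collection too, i.e. sync_client[database][collection])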
indexes = vector_store.sync_client[database].list_indexes()
for index in indexes:
print(f"Index: {index['name']}, Type: {index.get('type', 'regular')}")
Performance Optimization
Connection Pooling
# Optimize connection pool settings
vector_store = MongoDBAtlasVectorStore(
uri=uri,
max_pool_size=100, # Increase for high concurrency
min_pool_size=20, # Keep minimum connections alive
max_idle_time_ms=60000, # Longer idle timeout
connect_timeout_ms=5000, # Faster connection timeout
server_selection_timeout_ms=5000
)
Query Optimization
# Use appropriate k values
# Smaller k for faster queries
results = vector_store.search(query_embedding, k=5)
# Larger k for better recall
results = vector_store.search(query_embedding, k=50)
# Use metadata filtering to reduce search space
filter_metadata = {"category": "AI"}
results = vector_store.search(
query_embedding,
k=10,
filter_metadata=filter_metadata
)
Batch Operations
# Batch document insertion
batch_size = 1000
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
vector_store.add_documents(batch)
# Batch search operations
async def batch_search(queries):
tasks = []
for query in queries:
task = vector_store.search_async(query['embedding'], k=query['k'])
tasks.append(task)
results = await asyncio.gather(*tasks)
return results
Monitoring Performance
# Get collection statistics
stats = vector_store.get_stats()
print(f"Total documents: {stats['total_documents']}")
print(f"Storage size: {stats['storage_size']} bytes")
print(f"Index size: {stats['index_size']} bytes")
# Monitor query performance
import time
start_time = time.time()
results = vector_store.search(query_embedding, k=10)
search_time = time.time() - start_time
print(f"Search completed in {search_time:.3f} seconds")
Troubleshooting
Common Issues
1. Connection Errors
Problem: ServerSelectionTimeoutError or connection failures
Solutions:
# Check URI format
uri = "mongodb+srv://username:password@cluster.mongodb.net/"
# Verify network access
# Ensure the Atlas IP access list (formerly "IP whitelist") includes your IP address
# Check connection timeout settings
vector_store = MongoDBAtlasVectorStore(
uri=uri,
connect_timeout_ms=30000, # Increase timeout
server_selection_timeout_ms=30000
)
2. Vector Search Index Issues
Problem: Vector search not working or index not found
Solutions:
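# Check if index exists (note: in PyMongo, list_search_indexes() is a Collection method; if sync_client is a MongoClient, select the collection too, i.e. sync_client[database][collection])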
indexes = vector_store.sync_client[database].list_search_indexes()
print(f"Available indexes: {[idx['name'] for idx in indexes]}")
# Create index manually
vector_store._create_vector_search_index()
# Verify index configuration
index_info = vector_store.sync_client[database].get_search_index(collection, index_name)
print(f"Index info: {index_info}")
3. Performance Issues
Problem: Slow search performance
Solutions:
# Optimize k value
# Use smaller k for faster queries
results = vector_store.search(query_embedding, k=5)
# Use metadata filtering
filter_metadata = {"category": "AI"}
results = vector_store.search(query_embedding, k=10, filter_metadata=filter_metadata)
# Optimize connection pool
vector_store = MongoDBAtlasVectorStore(
uri=uri,
max_pool_size=200, # Increase pool size
min_pool_size=50 # Increase minimum connections
)
4. Memory Issues
Problem: High memory usage
Solutions:
# Reduce batch sizes
batch_size = 100 # Smaller batches
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
vector_store.add_documents(batch)
# Close connections when done
vector_store.close()
# Monitor memory usage
import psutil
process = psutil.Process()
memory_usage = process.memory_info().rss / 1024 / 1024 # MB
print(f"Memory usage: {memory_usage:.2f} MB")
Debug Mode
Enable debug logging for troubleshooting:
import logging
# Enable MongoDB driver logging
logging.getLogger('pymongo').setLevel(logging.DEBUG)
logging.getLogger('motor').setLevel(logging.DEBUG)
# Enable RecoAgent logging
logging.getLogger('packages.rag.stores').setLevel(logging.DEBUG)
Error Handling
try:
results = vector_store.search(query_embedding, k=10)
except Exception as e:
print(f"Search failed: {e}")
# Check specific error types
if "ServerSelectionTimeoutError" in str(e):
print("Connection timeout - check network and credentials")
elif "IndexNotFound" in str(e):
print("Vector search index not found - create index first")
elif "AuthenticationFailed" in str(e):
print("Authentication failed - check credentials")
API Reference
MongoDBAtlasVectorStore
Constructor
MongoDBAtlasVectorStore(
uri: str,
database: str = "recoagent",
collection: str = "documents",
vector_search_index: str = "vector_index",
embedding_dim: int = 3072,
max_pool_size: int = 100,
min_pool_size: int = 10,
max_idle_time_ms: int = 30000,
connect_timeout_ms: int = 10000,
server_selection_timeout_ms: int = 10000
)
Methods
add_documents(documents: List[VectorDocument]) -> bool
Add documents to the vector store.
search(query_embedding: List[float], k: int = 5, include_metadata: bool = True, filter_metadata: Optional[Dict] = None) -> List[Dict]
Search documents using vector similarity.
hybrid_search(query_text: str, query_embedding: List[float], k: int = 5, text_weight: float = 0.3, vector_weight: float = 0.7, filter_metadata: Optional[Dict] = None) -> List[Dict]
Perform hybrid search combining text and vector search.
faceted_search(query_embedding: List[float], facets: List[str], k: int = 5, filter_metadata: Optional[Dict] = None) -> Dict
Perform faceted search with metadata aggregation.
delete_documents(document_ids: List[str]) -> bool
Delete documents by IDs.
get_stats() -> Dict
Get collection statistics.
create_text_index(text_fields: List[str]) -> None
Create text index for hybrid search.
close() -> None
Close database connections.
MongoDB Retrievers
MongoDBVectorRetriever
Basic vector search retriever.
MongoDBHybridRetriever
Hybrid search retriever combining text and vector search.
MongoDBFacetedRetriever
Faceted search retriever with metadata filtering.
MongoDBAdvancedRetriever
Advanced retriever supporting multiple search strategies.
Examples
Complete Example
import asyncio
from packages.rag.stores import MongoDBAtlasVectorStore, VectorDocument
from packages.rag.mongodb_retrievers import MongoDBAdvancedRetriever
async def main():
# Initialize vector store
vector_store = MongoDBAtlasVectorStore(
uri="mongodb+srv://username:password@cluster.mongodb.net/",
database="recoagent",
collection="documents",
vector_search_index="vector_index"
)
# Initialize retriever
retriever = MongoDBAdvancedRetriever(vector_store)
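# Create sample documents. Each embedding's length must match the store's embedding_dim (default 3072 per the API Reference); pass embedding_dim=384 to the constructor above to use these 384-dimensional sample vectors.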
documents = [
VectorDocument(
id="doc1",
content="Machine learning is a subset of artificial intelligence",
embedding=[0.1] * 384,
metadata={"category": "AI", "year": 2023}
),
VectorDocument(
id="doc2",
content="Neural networks are computing systems inspired by biological neural networks",
embedding=[0.2] * 384,
metadata={"category": "AI", "year": 2023}
)
]
# Add documents
await vector_store.add_documents_async(documents)
# Search documents
results = retriever.retrieve("machine learning", k=5, search_type="hybrid")
for result in results:
print(f"Score: {result.score}")
print(f"Content: {result.chunk.content}")
print(f"Metadata: {result.chunk.metadata}")
print()
# Cleanup
vector_store.close()
if __name__ == "__main__":
asyncio.run(main())
Production Example
from config.settings import get_config
from packages.rag.stores import get_vector_store
from packages.rag.mongodb_retrievers import MongoDBAdvancedRetriever
def setup_production_vector_search():
"""Setup vector search for production environment."""
# Get configuration
config = get_config()
# Initialize vector store
vector_store = get_vector_store(
"mongodb_atlas",
uri=config.vector_store.mongodb_uri,
database=config.vector_store.mongodb_database,
collection=config.vector_store.mongodb_collection,
vector_search_index=config.vector_store.mongodb_vector_search_index,
embedding_dim=config.llm.embedding_dimension,
max_pool_size=config.vector_store.mongodb_max_pool_size,
min_pool_size=config.vector_store.mongodb_min_pool_size
)
# Initialize retriever
retriever = MongoDBAdvancedRetriever(vector_store)
# Create text index for hybrid search
retriever.create_text_index(['content', 'title'])
return retriever
# Use in your application
retriever = setup_production_vector_search()
results = retriever.retrieve("your query", k=10, search_type="hybrid")
This comprehensive guide provides everything you need to effectively use MongoDB Atlas Vector Search with RecoAgent. For additional support, refer to the troubleshooting section or check the example implementations.