MongoDB Atlas Vector Search Examples
Comprehensive examples demonstrating MongoDB Atlas Vector Search capabilities with RecoAgent.
Basic Examples
Simple Vector Search
from packages.rag.stores import MongoDBAtlasVectorStore, VectorDocument
from packages.rag.mongodb_retrievers import MongoDBVectorRetriever
# Initialize vector store
vector_store = MongoDBAtlasVectorStore(
uri="mongodb+srv://username:password@cluster.mongodb.net/",
database="recoagent",
collection="documents",
vector_search_index="vector_index"
)
# Initialize retriever
retriever = MongoDBVectorRetriever(vector_store)
# Create sample documents
documents = [
VectorDocument(
id="doc1",
content="Machine learning is a subset of artificial intelligence",
embedding=[0.1, 0.2, 0.3, ...], # Your embedding
metadata={"category": "AI", "year": 2023}
),
VectorDocument(
id="doc2",
content="Neural networks are computing systems inspired by biological neural networks",
embedding=[0.2, 0.3, 0.4, ...], # Your embedding
metadata={"category": "AI", "year": 2023}
)
]
# Add documents
vector_store.add_documents(documents)
# Search documents
results = retriever.retrieve("machine learning", k=5)
for result in results:
    print(f"Score: {result.score}")
    print(f"Content: {result.chunk.content}")
    print(f"Metadata: {result.chunk.metadata}")
    print()
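The embeddings above are placeholders. A minimal sketch of generating real vectors, assuming a sentence-transformers model (any embedding model whose output dimension matches your Atlas vector index works the same way):
# Assumed embedding model (384-dimensional); swap in whatever matches your index.
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")

texts = [
    "Machine learning is a subset of artificial intelligence",
    "Neural networks are computing systems inspired by biological neural networks",
]
embeddings = model.encode(texts)  # one vector per text

documents = [
    VectorDocument(
        id=f"doc{i + 1}",
        content=text,
        embedding=vector.tolist(),
        metadata={"category": "AI", "year": 2023},
    )
    for i, (text, vector) in enumerate(zip(texts, embeddings))
]
vector_store.add_documents(documents)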
Async Vector Search
import asyncio
from packages.rag.stores import MongoDBAtlasVectorStore
from packages.rag.mongodb_retrievers import MongoDBVectorRetriever
async def async_search_example():
    vector_store = MongoDBAtlasVectorStore(uri="...", database="recoagent")
    retriever = MongoDBVectorRetriever(vector_store)

    # Async search
    results = await retriever.retrieve_async("artificial intelligence", k=10)

    for result in results:
        print(f"Score: {result.score}")
        print(f"Content: {result.chunk.content[:100]}...")

    return results
# Run async example
results = asyncio.run(async_search_example())
Hybrid Search Examples
Basic Hybrid Search
from packages.rag.mongodb_retrievers import MongoDBHybridRetriever, MongoDBHybridConfig
# Configure hybrid search
config = MongoDBHybridConfig(
text_weight=0.3,
vector_weight=0.7,
vector_k=20,
text_k=20,
final_k=10
)
retriever = MongoDBHybridRetriever(vector_store, config)
# Create text index for hybrid search
retriever.create_text_index(['content', 'title'])
# Perform hybrid search
results = retriever.retrieve("machine learning algorithms", k=10)
for result in results:
    print(f"Score: {result.score} (Hybrid)")
    print(f"Content: {result.chunk.content}")
    print(f"Method: {result.retrieval_method}")
    print()
Advanced Hybrid Search with Filtering
# Hybrid search with metadata filtering
filter_metadata = {
"category": "AI",
"year": {"operator": "$gte", "value": 2023},
"difficulty": {"operator": "$in", "value": ["beginner", "intermediate"]}
}
results = retriever.retrieve(
query="deep learning techniques",
k=10,
filter_metadata=filter_metadata
)
print(f"Found {len(results)} results with filtering")
Faceted Search Examples
Basic Faceted Search
from packages.rag.mongodb_retrievers import MongoDBFacetedRetriever
retriever = MongoDBFacetedRetriever(vector_store)
# Perform faceted search
results = retriever.retrieve(
query="artificial intelligence",
k=10,
facets=['category', 'year', 'difficulty']
)
print(f"Found {len(results)} results")
# Get facet information
facets = retriever.get_facets("artificial intelligence", ['category', 'year'])
print(f"Facets: {facets}")
Advanced Faceted Search
# Faceted search with complex filtering
filter_metadata = {
"category": "AI",
"year": {"operator": "$gte", "value": 2020}
}
results = retriever.retrieve(
query="machine learning",
k=15,
facets=['category', 'subcategory', 'difficulty', 'language'],
filter_metadata=filter_metadata
)
# Display results with facets
for result in results:
    print(f"Score: {result.score}")
    print(f"Content: {result.chunk.content[:100]}...")
    print(f"Metadata: {result.chunk.metadata}")
    print()

# Display facet information
facets = retriever.get_facets(
    "machine learning",
    ['category', 'subcategory', 'difficulty'],
    filter_metadata
)

for facet_name, facet_values in facets.items():
    print(f"\n{facet_name}:")
    for value in facet_values:
        print(f"  {value['_id']}: {value['count']} documents")
Advanced Examples
Multi-Strategy Search
from packages.rag.mongodb_retrievers import MongoDBAdvancedRetriever
retriever = MongoDBAdvancedRetriever(vector_store)
# Test different search strategies
query = "machine learning applications"
# Vector search
vector_results = retriever.retrieve(query, k=5, search_type="vector")
print(f"Vector search: {len(vector_results)} results")
# Hybrid search
hybrid_results = retriever.retrieve(query, k=5, search_type="hybrid")
print(f"Hybrid search: {len(hybrid_results)} results")
# Faceted search
faceted_results = retriever.retrieve(
query, k=5, search_type="faceted",
facets=['category', 'year']
)
print(f"Faceted search: {len(faceted_results)} results")
Batch Processing
import asyncio
from typing import List
async def batch_search_example():
    vector_store = MongoDBAtlasVectorStore(uri="...", database="recoagent")
    retriever = MongoDBVectorRetriever(vector_store)

    queries = [
        "machine learning algorithms",
        "neural network architectures",
        "deep learning applications",
        "artificial intelligence ethics",
        "computer vision techniques"
    ]

    # Batch async searches
    tasks = []
    for query in queries:
        task = retriever.retrieve_async(query, k=5)
        tasks.append(task)

    # Execute all searches concurrently
    results_list = await asyncio.gather(*tasks)

    # Process results
    for i, (query, results) in enumerate(zip(queries, results_list)):
        print(f"Query {i+1}: '{query}'")
        print(f"Found {len(results)} results")
        for result in results[:2]:  # Show top 2 results
            print(f"  Score: {result.score:.3f}")
            print(f"  Content: {result.chunk.content[:80]}...")
        print()

    return results_list
# Run batch example
results = asyncio.run(batch_search_example())
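For large batches you may want to cap how many searches run at once so the connection pool is not exhausted. A small sketch using asyncio.Semaphore (the limit of 5 is arbitrary):
async def bounded_batch_search(retriever, queries, k=5, max_concurrency=5):
    # Limit the number of searches in flight at any one time.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(query):
        async with semaphore:
            return await retriever.retrieve_async(query, k=k)

    return await asyncio.gather(*(bounded(q) for q in queries))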
Document Management
def document_management_example():
    vector_store = MongoDBAtlasVectorStore(uri="...", database="recoagent")

    # Add documents
    documents = [
        VectorDocument(
            id=f"doc_{i}",
            content=f"Document {i} about machine learning",
            embedding=[0.1 + i * 0.01] * 384,
            metadata={"category": "AI", "index": i}
        )
        for i in range(100)
    ]

    # Batch add documents
    batch_size = 20
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        success = vector_store.add_documents(batch)
        print(f"Added batch {i//batch_size + 1}: {success}")

    # Get statistics
    stats = vector_store.get_stats()
    print(f"Total documents: {stats['total_documents']}")
    print(f"Storage size: {stats['storage_size']} bytes")

    # Delete some documents
    doc_ids_to_delete = [f"doc_{i}" for i in range(0, 100, 10)]
    success = vector_store.delete_documents(doc_ids_to_delete)
    print(f"Deleted {len(doc_ids_to_delete)} documents: {success}")

    # Updated statistics
    stats = vector_store.get_stats()
    print(f"Total documents after deletion: {stats['total_documents']}")
document_management_example()
Production Examples
Production Setup
from config.settings import get_config
from packages.rag.stores import get_vector_store
from packages.rag.mongodb_retrievers import MongoDBAdvancedRetriever
def setup_production_search():
    """Setup production-ready MongoDB vector search."""
    # Get configuration
    config = get_config()

    # Initialize vector store with production settings
    vector_store = get_vector_store(
        "mongodb_atlas",
        uri=config.vector_store.mongodb_uri,
        database=config.vector_store.mongodb_database,
        collection=config.vector_store.mongodb_collection,
        vector_search_index=config.vector_store.mongodb_vector_search_index,
        embedding_dim=config.llm.embedding_dimension,
        max_pool_size=config.vector_store.mongodb_max_pool_size,
        min_pool_size=config.vector_store.mongodb_min_pool_size
    )

    # Initialize advanced retriever
    retriever = MongoDBAdvancedRetriever(vector_store)

    # Create necessary indexes
    retriever.create_text_index(['content', 'title', 'description'])

    # Verify setup
    stats = retriever.get_stats()
    print(f"Production setup complete: {stats['total_documents']} documents")

    return retriever
# Use in production
search_engine = setup_production_search()
results = search_engine.retrieve("user query", k=10, search_type="hybrid")
Error Handling and Resilience
import time
from typing import Callable, List, Optional

class ResilientMongoDBSearch:
    """Resilient MongoDB search with retry logic and fallbacks."""

    def __init__(self, vector_store, embed_fn: Callable[[str], List[float]],
                 max_retries=3, retry_delay=1.0):
        self.vector_store = vector_store
        self.embed_fn = embed_fn  # produces the query embedding for the store
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    def search_with_retry(self, query: str, k: int = 5) -> Optional[List]:
        """Search with retry logic."""
        # The store's search is assumed to take a query embedding, as in the other examples.
        query_embedding = self.embed_fn(query)
        for attempt in range(self.max_retries):
            try:
                results = self.vector_store.search(query_embedding, k=k)
                return results
            except Exception as e:
                print(f"Attempt {attempt + 1} failed: {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay * (2 ** attempt))  # Exponential backoff
                else:
                    print("All retry attempts failed")
                    return None

    def search_with_fallback(self, query: str, k: int = 5) -> List:
        """Search with fallback strategies."""
        query_embedding = self.embed_fn(query)

        # Try hybrid search first
        try:
            results = self.vector_store.hybrid_search(query, query_embedding, k=k)
            return results
        except Exception as e:
            print(f"Hybrid search failed: {e}")

        # Fallback to vector search
        try:
            results = self.vector_store.search(query_embedding, k=k)
            return results
        except Exception as e:
            print(f"Vector search failed: {e}")

        # Final fallback: return empty results
        return []

# Usage (embed_query stands for whatever function returns an embedding for a text query)
resilient_search = ResilientMongoDBSearch(vector_store, embed_fn=embed_query)
results = resilient_search.search_with_fallback("machine learning", k=10)
Performance Monitoring
import time
import psutil
from typing import Dict, Any

class PerformanceMonitor:
    """Monitor MongoDB search performance."""

    def __init__(self, retriever):
        # Wrap a retriever so plain-text queries are embedded and searched for us.
        self.retriever = retriever
        self.metrics = []

    def search_with_monitoring(self, query: str, k: int = 5) -> Dict[str, Any]:
        """Search with performance monitoring."""
        start_time = time.time()
        start_memory = psutil.Process().memory_info().rss

        try:
            results = self.retriever.retrieve(query, k=k)
            success = True
            error = None
        except Exception as e:
            results = []
            success = False
            error = str(e)

        end_time = time.time()
        end_memory = psutil.Process().memory_info().rss

        # Record metrics
        metrics = {
            "query": query,
            "k": k,
            "latency_ms": (end_time - start_time) * 1000,
            "memory_delta_mb": (end_memory - start_memory) / 1024 / 1024,
            "success": success,
            "error": error,
            "result_count": len(results),
            "timestamp": time.time()
        }
        self.metrics.append(metrics)

        return {"results": results, "metrics": metrics}

    def get_performance_summary(self) -> Dict[str, Any]:
        """Get performance summary."""
        if not self.metrics:
            return {}

        latencies = [m["latency_ms"] for m in self.metrics if m["success"]]
        success_rate = sum(m["success"] for m in self.metrics) / len(self.metrics)

        return {
            "total_queries": len(self.metrics),
            "success_rate": success_rate,
            "avg_latency_ms": sum(latencies) / len(latencies) if latencies else 0,
            "min_latency_ms": min(latencies) if latencies else 0,
            "max_latency_ms": max(latencies) if latencies else 0
        }
# Usage
monitor = PerformanceMonitor(retriever)

# Run some searches
for query in ["AI", "machine learning", "neural networks"]:
    result = monitor.search_with_monitoring(query, k=5)
    print(f"Query: {query}, Latency: {result['metrics']['latency_ms']:.2f}ms")
# Get summary
summary = monitor.get_performance_summary()
print(f"Performance Summary: {summary}")
Integration Examples
FastAPI Integration
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
app = FastAPI()
# Initialize search engine
search_engine = setup_production_search()
class SearchRequest(BaseModel):
    query: str
    k: int = 5
    search_type: str = "hybrid"
    facets: Optional[List[str]] = None
    filter_metadata: Optional[dict] = None

class SearchResponse(BaseModel):
    results: List[dict]
    total_results: int
    search_type: str
@app.post("/search", response_model=SearchResponse)
async def search_documents(request: SearchRequest):
    """Search documents endpoint."""
    try:
        results = search_engine.retrieve(
            query=request.query,
            k=request.k,
            search_type=request.search_type,
            facets=request.facets,
            filter_metadata=request.filter_metadata
        )
        return SearchResponse(
            results=[
                {
                    "score": result.score,
                    "content": result.chunk.content,
                    "metadata": result.chunk.metadata
                }
                for result in results
            ],
            total_results=len(results),
            search_type=request.search_type
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/stats")
async def get_stats():
    """Get collection statistics."""
    stats = search_engine.get_stats()
    return stats
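Assuming the app is served with uvicorn on localhost:8000 (both assumptions), a client call against the search endpoint looks like this:
# Hypothetical client call; adjust the URL to wherever the app is actually served.
import requests

response = requests.post(
    "http://localhost:8000/search",
    json={"query": "machine learning", "k": 5, "search_type": "hybrid"},
)
response.raise_for_status()
for hit in response.json()["results"]:
    print(hit["score"], hit["content"][:80])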
LangChain Integration
from langchain.vectorstores import VectorStore
from langchain.schema import Document
from typing import List

from packages.rag.stores import VectorDocument

class MongoDBAtlasLangChainWrapper(VectorStore):
    """LangChain wrapper for MongoDB Atlas Vector Search."""

    def __init__(self, vector_store):
        self.vector_store = vector_store

    @classmethod
    def from_texts(cls, texts, embedding, metadatas=None, **kwargs):
        # LangChain declares from_texts as abstract; this minimal stub keeps the
        # wrapper instantiable. Implement it if you need that entry point.
        raise NotImplementedError("Use add_documents() with an existing store instead")

    def add_documents(self, documents: List[Document]) -> List[str]:
        """Add documents to the vector store."""
        vector_docs = []
        for doc in documents:
            vector_doc = VectorDocument(
                id=doc.metadata.get("id", str(hash(doc.page_content))),
                content=doc.page_content,
                embedding=doc.metadata.get("embedding", []),
                metadata=doc.metadata
            )
            vector_docs.append(vector_doc)

        success = self.vector_store.add_documents(vector_docs)
        return [doc.id for doc in vector_docs] if success else []

    def similarity_search(self, query: str, k: int = 4, **kwargs) -> List[Document]:
        """Perform similarity search."""
        # Get query embedding (you'll need to implement this)
        query_embedding = self._get_query_embedding(query)
        results = self.vector_store.search(query_embedding, k=k)

        # Assumes the store returns dict-like hits with "content" and "metadata" keys.
        documents = []
        for result in results:
            doc = Document(
                page_content=result["content"],
                metadata=result["metadata"]
            )
            documents.append(doc)

        return documents

    def _get_query_embedding(self, query: str) -> List[float]:
        """Get embedding for query (implement with your embedding model)."""
        # This is a placeholder - implement with your embedding model
        return [0.0] * 384
# Usage with LangChain
vector_store_wrapper = MongoDBAtlasLangChainWrapper(vector_store)
documents = vector_store_wrapper.similarity_search("machine learning", k=5)
These examples demonstrate the full range of MongoDB Atlas Vector Search capabilities with RecoAgent, from basic usage to production-ready implementations.