MongoDB Performance Benchmark
This example demonstrates how to benchmark MongoDB Atlas Vector Search performance and compare it with other vector databases like OpenSearch, Qdrant, and Azure AI Search.
Prerequisites
- MongoDB Atlas cluster with Vector Search enabled (see the index definition sketch after this list)
- Optional: Other vector databases for comparison
- Python 3.8+
- RecoAgent installed
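The benchmarks below also assume an Atlas Vector Search index already exists on the target collection. A minimal index definition for the 384-dimensional embeddings used in these examples might look like the sketch below; the field path "embedding" and cosine similarity are assumptions, so adjust them to your own schema (the index can be created from the Atlas UI or CLI).
# Assumed Atlas Vector Search index definition (named "vector_index" in the examples).
# The "embedding" path and cosine similarity are assumptions -- match your own schema.
vector_index_definition = {
    "fields": [
        {
            "type": "vector",
            "path": "embedding",
            "numDimensions": 384,
            "similarity": "cosine",
        }
    ]
}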
Basic Performance Testing
import time
import statistics
from packages.rag.stores import MongoDBAtlasVectorStore, VectorDocument
from packages.rag.mongodb_retrievers import MongoDBAdvancedRetriever
def basic_performance_test():
"""Basic performance test for MongoDB vector search."""
# Initialize vector store
vector_store = MongoDBAtlasVectorStore(
uri="mongodb+srv://username:password@cluster.mongodb.net/",
database="recoagent",
collection="documents",
vector_search_index="vector_index"
)
retriever = MongoDBAdvancedRetriever(vector_store)
# Generate test data
test_documents = []
for i in range(1000):
doc = VectorDocument(
id=f"perf_test_{i}",
content=f"Test document {i} about machine learning, artificial intelligence, and data science topics.",
embedding=[0.1 + (i % 100) * 0.01] * 384, # Simple embedding for demo
metadata={
"category": f"category_{i % 10}",
"difficulty": ["beginner", "intermediate", "advanced"][i % 3],
"year": 2023 + (i % 2),
"index": i
}
)
test_documents.append(doc)
# Add documents
print("Adding test documents...")
start_time = time.time()
success = vector_store.add_documents(test_documents)
add_time = time.time() - start_time
print(f"ā
Added {len(test_documents)} documents in {add_time:.2f} seconds")
# Test search performance
test_queries = [
"machine learning algorithms",
"artificial intelligence applications",
"neural network architectures",
"deep learning techniques",
"data science methods"
]
search_times = []
print("\nTesting search performance...")
for query in test_queries:
start_time = time.time()
results = retriever.retrieve(query, k=10, search_type="vector")
search_time = time.time() - start_time
search_times.append(search_time)
print(f"Query: '{query}' - {search_time:.3f}s ({len(results)} results)")
# Calculate statistics
avg_search_time = statistics.mean(search_times)
min_search_time = min(search_times)
max_search_time = max(search_times)
print(f"\nSearch Performance Summary:")
print(f" Average time: {avg_search_time:.3f} seconds")
print(f" Min time: {min_search_time:.3f} seconds")
print(f" Max time: {max_search_time:.3f} seconds")
print(f" Queries per second: {1/avg_search_time:.1f}")
# Cleanup
doc_ids = [doc.id for doc in test_documents]
vector_store.delete_documents(doc_ids)
vector_store.close()
return {
"avg_search_time": avg_search_time,
"min_search_time": min_search_time,
"max_search_time": max_search_time,
"queries_per_second": 1/avg_search_time
}
# Run basic performance test
performance_results = basic_performance_test()
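Note that the embeddings above are synthetic constant vectors, which keeps the demo self-contained but says nothing about retrieval quality. For realistic latency and relevance numbers you would embed documents and queries with the model you use in production; a minimal sketch, assuming the sentence-transformers package and the all-MiniLM-L6-v2 model (which produces 384-dimensional vectors):
# Sketch only: replace the synthetic embeddings with real ones.
# Assumes `pip install sentence-transformers`; the model choice is an assumption.
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")  # 384-dimensional output

def embed_texts(texts):
    """Encode a batch of texts into plain lists for VectorDocument.embedding."""
    return [vec.tolist() for vec in model.encode(texts, show_progress_bar=False)]

# Example: embeddings = embed_texts([doc.content for doc in test_documents])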
Comprehensive Benchmark Suite
import asyncio
import numpy as np
from typing import List, Dict, Any
from dataclasses import dataclass
@dataclass
class BenchmarkResult:
"""Results from a benchmark run."""
operation: str
dataset_size: int
query_count: int
avg_latency_ms: float
min_latency_ms: float
max_latency_ms: float
p95_latency_ms: float
p99_latency_ms: float
throughput_per_second: float
success_rate: float
class MongoDBBenchmark:
"""Comprehensive MongoDB Atlas Vector Search benchmark."""
def __init__(self, uri: str, database: str = "benchmark_db"):
self.uri = uri
self.database = database
self.vector_store = None
self.retriever = None
def initialize(self):
"""Initialize MongoDB components."""
self.vector_store = MongoDBAtlasVectorStore(
uri=self.uri,
database=self.database,
collection="benchmark_collection",
vector_search_index="benchmark_index",
embedding_dim=384
)
self.retriever = MongoDBAdvancedRetriever(self.vector_store)
print("ā
MongoDB benchmark initialized")
def generate_test_data(self, dataset_size: int) -> List[VectorDocument]:
"""Generate test data for benchmarking."""
print(f"Generating {dataset_size} test documents...")
documents = []
for i in range(dataset_size):
# Generate realistic content
content = f"Document {i} about machine learning, artificial intelligence, neural networks, deep learning, and data science. This document contains information about algorithms, models, and applications in the field of AI and ML."
# Generate simple embedding
embedding = [0.1 + (i % 100) * 0.01] * 384
# Generate metadata
metadata = {
"id": i,
"category": f"category_{i % 20}",
"subcategory": f"subcategory_{i % 50}",
"difficulty": ["beginner", "intermediate", "advanced"][i % 3],
"year": 2020 + (i % 5),
"rating": round(3.0 + (i % 20) * 0.1, 1),
"language": "English",
"tags": [f"tag_{j}" for j in range(i % 5)],
"read_time": 5 + (i % 30)
}
doc = VectorDocument(
id=f"benchmark_doc_{i}",
content=content,
embedding=embedding,
metadata=metadata
)
documents.append(doc)
return documents
def generate_query_embeddings(self, query_count: int) -> List[List[float]]:
"""Generate query embeddings for testing."""
queries = []
for i in range(query_count):
embedding = [0.1 + (i % 50) * 0.02] * 384
queries.append(embedding)
return queries
def benchmark_vector_search(self, dataset_size: int, query_count: int, k: int = 10) -> BenchmarkResult:
"""Benchmark vector search performance."""
print(f"Benchmarking vector search: {dataset_size} docs, {query_count} queries, k={k}")
# Generate test data
documents = self.generate_test_data(dataset_size)
query_embeddings = self.generate_query_embeddings(query_count)
# Add documents
start_time = time.time()
success = self.vector_store.add_documents(documents)
add_time = time.time() - start_time
if not success:
return BenchmarkResult("vector_search", dataset_size, query_count, 0, 0, 0, 0, 0, 0, 0)
print(f" Added {len(documents)} documents in {add_time:.2f}s")
# Warmup queries
for _ in range(5):
try:
self.vector_store.search(query_embeddings[0], k=k)
except Exception:
pass
# Benchmark queries
latencies = []
errors = 0
for query_embedding in query_embeddings:
start_time = time.time()
try:
results = self.vector_store.search(query_embedding, k=k)
latency = (time.time() - start_time) * 1000 # Convert to ms
latencies.append(latency)
except Exception as e:
errors += 1
print(f" Query failed: {e}")
# Calculate statistics
if latencies:
avg_latency = statistics.mean(latencies)
min_latency = min(latencies)
max_latency = max(latencies)
p95_latency = np.percentile(latencies, 95)
p99_latency = np.percentile(latencies, 99)
throughput = len(latencies) / (sum(latencies) / 1000)
success_rate = (query_count - errors) / query_count
else:
avg_latency = min_latency = max_latency = p95_latency = p99_latency = 0
throughput = 0
success_rate = 0
# Cleanup
doc_ids = [doc.id for doc in documents]
self.vector_store.delete_documents(doc_ids)
return BenchmarkResult(
operation="vector_search",
dataset_size=dataset_size,
query_count=query_count,
avg_latency_ms=avg_latency,
min_latency_ms=min_latency,
max_latency_ms=max_latency,
p95_latency_ms=p95_latency,
p99_latency_ms=p99_latency,
throughput_per_second=throughput,
success_rate=success_rate
)
def benchmark_hybrid_search(self, dataset_size: int, query_count: int, k: int = 10) -> BenchmarkResult:
"""Benchmark hybrid search performance."""
print(f"Benchmarking hybrid search: {dataset_size} docs, {query_count} queries, k={k}")
# Generate test data
documents = self.generate_test_data(dataset_size)
query_embeddings = self.generate_query_embeddings(query_count)
query_texts = [f"test query {i}" for i in range(query_count)]
# Add documents
success = self.vector_store.add_documents(documents)
if not success:
return BenchmarkResult("hybrid_search", dataset_size, query_count, 0, 0, 0, 0, 0, 0, 0)
# Create text index
try:
self.vector_store.create_text_index(['content'])
except Exception:
pass
# Warmup queries
for _ in range(5):
try:
self.vector_store.hybrid_search(query_texts[0], query_embeddings[0], k=k)
except Exception:
pass
# Benchmark queries
latencies = []
errors = 0
for query_text, query_embedding in zip(query_texts, query_embeddings):
start_time = time.time()
try:
results = self.vector_store.hybrid_search(query_text, query_embedding, k=k)
latency = (time.time() - start_time) * 1000
latencies.append(latency)
except Exception as e:
errors += 1
print(f" Hybrid query failed: {e}")
# Calculate statistics
if latencies:
avg_latency = statistics.mean(latencies)
min_latency = min(latencies)
max_latency = max(latencies)
p95_latency = np.percentile(latencies, 95)
p99_latency = np.percentile(latencies, 99)
throughput = len(latencies) / (sum(latencies) / 1000)
success_rate = (query_count - errors) / query_count
else:
avg_latency = min_latency = max_latency = p95_latency = p99_latency = 0
throughput = 0
success_rate = 0
# Cleanup
doc_ids = [doc.id for doc in documents]
self.vector_store.delete_documents(doc_ids)
return BenchmarkResult(
operation="hybrid_search",
dataset_size=dataset_size,
query_count=query_count,
avg_latency_ms=avg_latency,
min_latency_ms=min_latency,
max_latency_ms=max_latency,
p95_latency_ms=p95_latency,
p99_latency_ms=p99_latency,
throughput_per_second=throughput,
success_rate=success_rate
)
def benchmark_faceted_search(self, dataset_size: int, query_count: int, k: int = 10) -> BenchmarkResult:
"""Benchmark faceted search performance."""
print(f"Benchmarking faceted search: {dataset_size} docs, {query_count} queries, k={k}")
# Generate test data
documents = self.generate_test_data(dataset_size)
query_embeddings = self.generate_query_embeddings(query_count)
facets = ["category", "difficulty", "year", "rating"]
# Add documents
success = self.vector_store.add_documents(documents)
if not success:
return BenchmarkResult("faceted_search", dataset_size, query_count, 0, 0, 0, 0, 0, 0, 0)
# Warmup queries
for _ in range(5):
try:
self.vector_store.faceted_search(query_embeddings[0], facets, k=k)
except Exception:
pass
# Benchmark queries
latencies = []
errors = 0
for query_embedding in query_embeddings:
start_time = time.time()
try:
results = self.vector_store.faceted_search(query_embedding, facets, k=k)
latency = (time.time() - start_time) * 1000
latencies.append(latency)
except Exception as e:
errors += 1
print(f" Faceted query failed: {e}")
# Calculate statistics
if latencies:
avg_latency = statistics.mean(latencies)
min_latency = min(latencies)
max_latency = max(latencies)
p95_latency = np.percentile(latencies, 95)
p99_latency = np.percentile(latencies, 99)
throughput = len(latencies) / (sum(latencies) / 1000)
success_rate = (query_count - errors) / query_count
else:
avg_latency = min_latency = max_latency = p95_latency = p99_latency = 0
throughput = 0
success_rate = 0
# Cleanup
doc_ids = [doc.id for doc in documents]
self.vector_store.delete_documents(doc_ids)
return BenchmarkResult(
operation="faceted_search",
dataset_size=dataset_size,
query_count=query_count,
avg_latency_ms=avg_latency,
min_latency_ms=min_latency,
max_latency_ms=max_latency,
p95_latency_ms=p95_latency,
p99_latency_ms=p99_latency,
throughput_per_second=throughput,
success_rate=success_rate
)
def run_comprehensive_benchmark(self):
"""Run comprehensive benchmark suite."""
print("š Starting Comprehensive MongoDB Atlas Vector Search Benchmark")
print("=" * 70)
self.initialize()
# Benchmark configurations
dataset_sizes = [100, 1000, 5000]
query_counts = [10, 50, 100]
k_values = [5, 10, 20]
results = []
for dataset_size in dataset_sizes:
for query_count in query_counts:
for k in k_values:
print(f"\nš Testing: {dataset_size} docs, {query_count} queries, k={k}")
# Vector search
vector_result = self.benchmark_vector_search(dataset_size, query_count, k)
results.append(vector_result)
# Hybrid search
hybrid_result = self.benchmark_hybrid_search(dataset_size, query_count, k)
results.append(hybrid_result)
# Faceted search
faceted_result = self.benchmark_faceted_search(dataset_size, query_count, k)
results.append(faceted_result)
# Generate report
self.generate_benchmark_report(results)
return results
def generate_benchmark_report(self, results: List[BenchmarkResult]):
"""Generate benchmark report."""
print("\nš BENCHMARK REPORT")
print("=" * 50)
# Group results by operation
by_operation = {}
for result in results:
if result.operation not in by_operation:
by_operation[result.operation] = []
by_operation[result.operation].append(result)
# Generate summary for each operation
for operation, op_results in by_operation.items():
print(f"\n{operation.upper()} PERFORMANCE:")
print("-" * 30)
if op_results:
avg_latencies = [r.avg_latency_ms for r in op_results]
avg_throughputs = [r.throughput_per_second for r in op_results]
success_rates = [r.success_rate for r in op_results]
print(f" Average Latency: {statistics.mean(avg_latencies):.2f} ms")
print(f" Min Latency: {min(avg_latencies):.2f} ms")
print(f" Max Latency: {max(avg_latencies):.2f} ms")
print(f" Average Throughput: {statistics.mean(avg_throughputs):.1f} qps")
print(f" Average Success Rate: {statistics.mean(success_rates):.2%}")
# Show best and worst performance
best_result = min(op_results, key=lambda x: x.avg_latency_ms)
worst_result = max(op_results, key=lambda x: x.avg_latency_ms)
print(f" Best Performance: {best_result.avg_latency_ms:.2f}ms ({best_result.dataset_size} docs, {best_result.query_count} queries)")
print(f" Worst Performance: {worst_result.avg_latency_ms:.2f}ms ({worst_result.dataset_size} docs, {worst_result.query_count} queries)")
def close(self):
"""Close connections."""
if self.vector_store:
self.vector_store.close()
# Run comprehensive benchmark
def run_mongodb_benchmark():
"""Run MongoDB benchmark."""
uri = "mongodb+srv://username:password@cluster.mongodb.net/"
benchmark = MongoDBBenchmark(uri)
try:
results = benchmark.run_comprehensive_benchmark()
return results
except Exception as e:
print(f"ā Benchmark failed: {e}")
return []
finally:
benchmark.close()
# Run benchmark
benchmark_results = run_mongodb_benchmark()
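Since run_mongodb_benchmark() returns a list of BenchmarkResult dataclasses, it is straightforward to persist a run and compare it against later ones. A minimal sketch that writes the results to JSON (the file name is arbitrary):
import json
from dataclasses import asdict

def save_benchmark_results(results, path="mongodb_benchmark_results.json"):
    """Serialize BenchmarkResult dataclasses to JSON for later comparison."""
    with open(path, "w") as f:
        json.dump([asdict(r) for r in results], f, indent=2)

# Example: save_benchmark_results(benchmark_results)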
Performance Comparison with Other Vector Databases
def compare_vector_databases():
"""Compare MongoDB Atlas with other vector databases."""
print("š Vector Database Performance Comparison")
print("=" * 50)
# This is a simplified comparison - in practice, you'd run the same
# benchmark against each database
# Mock results for demonstration
comparison_results = {
"MongoDB Atlas": {
"avg_latency_ms": 45.2,
"throughput_qps": 22.1,
"memory_usage_mb": 128,
"setup_complexity": "Medium",
"features": ["Vector Search", "Text Search", "Faceted Search", "Unified Storage"]
},
"OpenSearch": {
"avg_latency_ms": 38.7,
"throughput_qps": 25.8,
"memory_usage_mb": 256,
"setup_complexity": "High",
"features": ["Vector Search", "Text Search", "Faceted Search", "Separate Storage"]
},
"Qdrant": {
"avg_latency_ms": 32.1,
"throughput_qps": 31.2,
"memory_usage_mb": 192,
"setup_complexity": "Low",
"features": ["Vector Search", "Filtering", "Separate Storage"]
},
"Azure AI Search": {
"avg_latency_ms": 67.3,
"throughput_qps": 14.9,
"memory_usage_mb": 0, # Managed service
"setup_complexity": "Low",
"features": ["Vector Search", "Text Search", "Faceted Search", "Managed Service"]
}
}
print(f"{'Database':<15} {'Latency (ms)':<12} {'Throughput (qps)':<15} {'Memory (MB)':<12} {'Complexity':<12}")
print("-" * 80)
for db_name, metrics in comparison_results.items():
print(f"{db_name:<15} {metrics['avg_latency_ms']:<12.1f} {metrics['throughput_qps']:<15.1f} {metrics['memory_usage_mb']:<12} {metrics['setup_complexity']:<12}")
print(f"\nš Feature Comparison:")
print("-" * 30)
all_features = set()
for metrics in comparison_results.values():
all_features.update(metrics['features'])
for feature in sorted(all_features):
print(f"\n{feature}:")
for db_name, metrics in comparison_results.items():
status = "ā
" if feature in metrics['features'] else "ā"
print(f" {db_name}: {status}")
return comparison_results
# Run comparison
comparison = compare_vector_databases()
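The numbers above are illustrative. For a fair comparison you would run an identical workload against each backend through a common interface; a minimal sketch of that pattern, assuming every store you pass in exposes the same add_documents/search/delete_documents methods as MongoDBAtlasVectorStore (the other clients are placeholders, not part of this example):
import time
import statistics
import numpy as np

def benchmark_store(name, store, documents, query_embeddings, k=10):
    """Run one identical workload against any store with the RecoAgent-style interface."""
    store.add_documents(documents)
    latencies = []
    for emb in query_embeddings:
        start = time.time()
        store.search(emb, k=k)
        latencies.append((time.time() - start) * 1000)
    store.delete_documents([doc.id for doc in documents])
    return {
        "database": name,
        "avg_latency_ms": statistics.mean(latencies),
        "p95_latency_ms": float(np.percentile(latencies, 95)),
    }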
Load Testing
import asyncio
import concurrent.futures
from typing import List
async def load_test_mongodb():
"""Load test MongoDB Atlas Vector Search."""
print("š„ MongoDB Atlas Load Testing")
print("=" * 40)
# Initialize
vector_store = MongoDBAtlasVectorStore(
uri="mongodb+srv://username:password@cluster.mongodb.net/",
database="load_test_db",
collection="load_test_collection",
vector_search_index="load_test_index",
embedding_dim=384
)
retriever = MongoDBAdvancedRetriever(vector_store)
try:
# Add test data
documents = []
for i in range(10000):
doc = VectorDocument(
id=f"load_test_{i}",
content=f"Load test document {i} about machine learning and artificial intelligence.",
embedding=[0.1 + (i % 100) * 0.01] * 384,
metadata={"index": i, "category": f"cat_{i % 10}"}
)
documents.append(doc)
print("Adding 10,000 test documents...")
vector_store.add_documents(documents)
print("ā
Test data added")
# Load test configurations
concurrent_users = [1, 5, 10, 20, 50]
queries_per_user = 100
for users in concurrent_users:
print(f"\nš§Ŗ Testing with {users} concurrent users...")
async def user_simulation(user_id: int):
"""Simulate a single user making queries."""
query_times = []
for i in range(queries_per_user):
query_embedding = [0.1 + (i % 50) * 0.02] * 384
start_time = time.time()
try:
results = await retriever.retrieve_async(
f"user_{user_id}_query_{i}", k=10, search_type="vector"
)
query_time = time.time() - start_time
query_times.append(query_time)
except Exception as e:
print(f"User {user_id} query {i} failed: {e}")
query_times.append(1.0) # Penalty for failed queries
return query_times
# Run concurrent users
start_time = time.time()
tasks = []
for user_id in range(users):
task = user_simulation(user_id)
tasks.append(task)
all_query_times = await asyncio.gather(*tasks)
total_time = time.time() - start_time
# Calculate metrics
all_times = [t for user_times in all_query_times for t in user_times]
avg_query_time = statistics.mean(all_times)
total_queries = users * queries_per_user
queries_per_second = total_queries / total_time
print(f" Total time: {total_time:.2f}s")
print(f" Total queries: {total_queries}")
print(f" Queries per second: {queries_per_second:.1f}")
print(f" Average query time: {avg_query_time:.3f}s")
print(f" P95 query time: {np.percentile(all_times, 95):.3f}s")
# Performance thresholds
if avg_query_time > 1.0:
print(" ⚠️ High latency detected!")
if queries_per_second < 10:
print(" ⚠️ Low throughput detected!")
finally:
# Cleanup
doc_ids = [f"load_test_{i}" for i in range(10000)]
vector_store.delete_documents(doc_ids)
vector_store.close()
# Run load test
asyncio.run(load_test_mongodb())
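Each concurrency level prints its own metrics as it runs. If you want a single end-of-run summary, collect one record per level inside the loop and print a compact table afterwards; a minimal sketch, assuming you append a dict of the metrics computed above for each value of users:
def print_load_test_summary(load_test_results):
    """load_test_results: list of dicts like
    {"users": 10, "qps": 25.3, "avg_s": 0.35, "p95_s": 0.61}, one per concurrency level."""
    print(f"{'Users':<8}{'QPS':<10}{'Avg (s)':<10}{'P95 (s)':<10}")
    for row in load_test_results:
        print(f"{row['users']:<8}{row['qps']:<10.1f}{row['avg_s']:<10.3f}{row['p95_s']:<10.3f}")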
Memory and Resource Monitoring
import psutil
import gc
def monitor_resources_during_benchmark():
"""Monitor memory and CPU usage during benchmark."""
print("š Resource Monitoring During Benchmark")
print("=" * 45)
process = psutil.Process()
# Initial state
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
initial_cpu = process.cpu_percent()
print(f"Initial Memory: {initial_memory:.1f} MB")
print(f"Initial CPU: {initial_cpu:.1f}%")
# Initialize vector store
vector_store = MongoDBAtlasVectorStore(
uri="mongodb+srv://username:password@cluster.mongodb.net/",
database="resource_test_db",
collection="resource_test_collection",
vector_search_index="resource_test_index",
embedding_dim=384
)
retriever = MongoDBAdvancedRetriever(vector_store)
try:
# Add documents and monitor
documents = []
for i in range(5000):
doc = VectorDocument(
id=f"resource_test_{i}",
content=f"Resource monitoring document {i} for testing memory and CPU usage.",
embedding=[0.1 + (i % 100) * 0.01] * 384,
metadata={"index": i}
)
documents.append(doc)
print("\nAdding documents...")
start_time = time.time()
vector_store.add_documents(documents)
add_time = time.time() - start_time
memory_after_add = process.memory_info().rss / 1024 / 1024
cpu_after_add = process.cpu_percent()
print(f"After adding documents:")
print(f" Memory: {memory_after_add:.1f} MB (+{memory_after_add - initial_memory:.1f} MB)")
print(f" CPU: {cpu_after_add:.1f}%")
print(f" Add time: {add_time:.2f}s")
# Search and monitor
print("\nPerforming searches...")
search_times = []
memory_samples = []
for i in range(100):
query_embedding = [0.1 + (i % 50) * 0.02] * 384
start_time = time.time()
results = retriever.retrieve(f"query_{i}", k=10, search_type="vector")
search_time = time.time() - start_time
search_times.append(search_time)
# Sample memory every 10 queries
if i % 10 == 0:
memory_samples.append(process.memory_info().rss / 1024 / 1024)
# Final state
final_memory = process.memory_info().rss / 1024 / 1024
final_cpu = process.cpu_percent()
print(f"\nAfter searches:")
print(f" Memory: {final_memory:.1f} MB (+{final_memory - initial_memory:.1f} MB)")
print(f" CPU: {final_cpu:.1f}%")
print(f" Average search time: {statistics.mean(search_times):.3f}s")
print(f" Memory samples: {memory_samples}")
# Memory growth analysis
if memory_samples:
memory_growth = max(memory_samples) - min(memory_samples)
print(f" Memory growth during search: {memory_growth:.1f} MB")
# Garbage collection
print("\nRunning garbage collection...")
gc.collect()
gc_memory = process.memory_info().rss / 1024 / 1024
print(f"Memory after GC: {gc_memory:.1f} MB")
finally:
# Cleanup
doc_ids = [f"resource_test_{i}" for i in range(5000)]
vector_store.delete_documents(doc_ids)
vector_store.close()
final_memory = process.memory_info().rss / 1024 / 1024
print(f"\nFinal memory after cleanup: {final_memory:.1f} MB")
# Run resource monitoring
monitor_resources_during_benchmark()
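psutil reports process-wide RSS, which includes driver buffers and anything else the interpreter holds, so the numbers above are an upper bound on what the benchmark itself allocates. To attribute memory to Python objects specifically, the standard-library tracemalloc module is one option; a minimal sketch:
import tracemalloc

tracemalloc.start()
# ... run the section you want to profile, e.g. monitor_resources_during_benchmark() ...
current, peak = tracemalloc.get_traced_memory()  # bytes allocated by Python objects
print(f"Python allocations: current={current / 1024 / 1024:.1f} MB, peak={peak / 1024 / 1024:.1f} MB")
tracemalloc.stop()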
Complete Performance Benchmark Example
def complete_performance_benchmark():
"""Complete performance benchmark with all features."""
print("šÆ Complete MongoDB Atlas Vector Search Performance Benchmark")
print("=" * 70)
# Initialize benchmark
uri = "mongodb+srv://username:password@cluster.mongodb.net/"
benchmark = MongoDBBenchmark(uri)
try:
# Run comprehensive benchmark
results = benchmark.run_comprehensive_benchmark()
# Run load test
print("\n" + "="*70)
asyncio.run(load_test_mongodb())
# Run resource monitoring
print("\n" + "="*70)
monitor_resources_during_benchmark()
# Compare with other databases
print("\n" + "="*70)
comparison = compare_vector_databases()
print("\nā
Complete benchmark finished!")
return results, comparison
except Exception as e:
print(f"ā Benchmark failed: {e}")
return [], {}
finally:
benchmark.close()
# Run complete benchmark
benchmark_results, comparison_results = complete_performance_benchmark()
This example provides comprehensive performance testing of MongoDB Atlas Vector Search, including load testing, resource monitoring, and comparison with other vector databases.