MongoDB Performance Benchmark

This example demonstrates how to benchmark MongoDB Atlas Vector Search performance and compare it with other vector databases like OpenSearch, Qdrant, and Azure AI Search.

Prerequisites

  • MongoDB Atlas cluster with Vector Search enabled (see the connection sketch after this list)
  • Optional: Other vector databases for comparison
  • Python 3.8+
  • RecoAgent installed
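
The snippets below hard-code a placeholder connection string. In practice you would keep the Atlas URI out of source code, for example by reading it from an environment variable. This is a minimal sketch; the variable name MONGODB_ATLAS_URI is an arbitrary choice, and the later examples keep the inline placeholder for brevity.

import os

# Read the Atlas connection string from the environment instead of hard-coding it
# (hypothetical variable name; the fallback is the same placeholder used below).
MONGODB_URI = os.environ.get(
    "MONGODB_ATLAS_URI",
    "mongodb+srv://username:password@cluster.mongodb.net/"
)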

Basic Performance Testing

import time
import statistics

from packages.rag.stores import MongoDBAtlasVectorStore, VectorDocument
from packages.rag.mongodb_retrievers import MongoDBAdvancedRetriever


def basic_performance_test():
    """Basic performance test for MongoDB vector search."""

    # Initialize vector store
    vector_store = MongoDBAtlasVectorStore(
        uri="mongodb+srv://username:password@cluster.mongodb.net/",
        database="recoagent",
        collection="documents",
        vector_search_index="vector_index"
    )

    retriever = MongoDBAdvancedRetriever(vector_store)

    # Generate test data
    test_documents = []
    for i in range(1000):
        doc = VectorDocument(
            id=f"perf_test_{i}",
            content=f"Test document {i} about machine learning, artificial intelligence, and data science topics.",
            embedding=[0.1 + (i % 100) * 0.01] * 384,  # Simple embedding for demo
            metadata={
                "category": f"category_{i % 10}",
                "difficulty": ["beginner", "intermediate", "advanced"][i % 3],
                "year": 2023 + (i % 2),
                "index": i
            }
        )
        test_documents.append(doc)

    # Add documents
    print("Adding test documents...")
    start_time = time.time()
    success = vector_store.add_documents(test_documents)
    add_time = time.time() - start_time
    if not success:
        raise RuntimeError("Failed to add test documents")
    print(f"āœ… Added {len(test_documents)} documents in {add_time:.2f} seconds")

    # Test search performance
    test_queries = [
        "machine learning algorithms",
        "artificial intelligence applications",
        "neural network architectures",
        "deep learning techniques",
        "data science methods"
    ]

    search_times = []

    print("\nTesting search performance...")
    for query in test_queries:
        start_time = time.time()
        results = retriever.retrieve(query, k=10, search_type="vector")
        search_time = time.time() - start_time
        search_times.append(search_time)

        print(f"Query: '{query}' - {search_time:.3f}s ({len(results)} results)")

    # Calculate statistics
    avg_search_time = statistics.mean(search_times)
    min_search_time = min(search_times)
    max_search_time = max(search_times)

    print("\nSearch Performance Summary:")
    print(f"  Average time: {avg_search_time:.3f} seconds")
    print(f"  Min time: {min_search_time:.3f} seconds")
    print(f"  Max time: {max_search_time:.3f} seconds")
    print(f"  Queries per second: {1 / avg_search_time:.1f}")

    # Cleanup
    doc_ids = [doc.id for doc in test_documents]
    vector_store.delete_documents(doc_ids)
    vector_store.close()

    return {
        "avg_search_time": avg_search_time,
        "min_search_time": min_search_time,
        "max_search_time": max_search_time,
        "queries_per_second": 1 / avg_search_time
    }

# Run basic performance test
performance_results = basic_performance_test()
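
If you rerun this basic test over time (for example after changing index settings or cluster tiers), it helps to persist each run so results can be compared later. This is a minimal sketch using only the standard library; the output filename is an arbitrary choice.

import json
from datetime import datetime, timezone

# Append this run's summary to a JSON Lines file for later comparison (hypothetical filename).
record = {"timestamp": datetime.now(timezone.utc).isoformat(), **performance_results}
with open("basic_performance_runs.jsonl", "a") as f:
    f.write(json.dumps(record) + "\n")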

Comprehensive Benchmark Suite

import asyncio
import numpy as np
from typing import List, Dict, Any
from dataclasses import dataclass


@dataclass
class BenchmarkResult:
    """Results from a benchmark run."""
    operation: str
    dataset_size: int
    query_count: int
    avg_latency_ms: float
    min_latency_ms: float
    max_latency_ms: float
    p95_latency_ms: float
    p99_latency_ms: float
    throughput_per_second: float
    success_rate: float


class MongoDBBenchmark:
    """Comprehensive MongoDB Atlas Vector Search benchmark."""

    def __init__(self, uri: str, database: str = "benchmark_db"):
        self.uri = uri
        self.database = database
        self.vector_store = None
        self.retriever = None

    def initialize(self):
        """Initialize MongoDB components."""
        self.vector_store = MongoDBAtlasVectorStore(
            uri=self.uri,
            database=self.database,
            collection="benchmark_collection",
            vector_search_index="benchmark_index",
            embedding_dim=384
        )

        self.retriever = MongoDBAdvancedRetriever(self.vector_store)
        print("āœ… MongoDB benchmark initialized")

    def generate_test_data(self, dataset_size: int) -> List[VectorDocument]:
        """Generate test data for benchmarking."""
        print(f"Generating {dataset_size} test documents...")

        documents = []
        for i in range(dataset_size):
            # Generate realistic content
            content = f"Document {i} about machine learning, artificial intelligence, neural networks, deep learning, and data science. This document contains information about algorithms, models, and applications in the field of AI and ML."

            # Generate simple embedding
            embedding = [0.1 + (i % 100) * 0.01] * 384

            # Generate metadata
            metadata = {
                "id": i,
                "category": f"category_{i % 20}",
                "subcategory": f"subcategory_{i % 50}",
                "difficulty": ["beginner", "intermediate", "advanced"][i % 3],
                "year": 2020 + (i % 5),
                "rating": round(3.0 + (i % 20) * 0.1, 1),
                "language": "English",
                "tags": [f"tag_{j}" for j in range(i % 5)],
                "read_time": 5 + (i % 30)
            }

            doc = VectorDocument(
                id=f"benchmark_doc_{i}",
                content=content,
                embedding=embedding,
                metadata=metadata
            )
            documents.append(doc)

        return documents

    def generate_query_embeddings(self, query_count: int) -> List[List[float]]:
        """Generate query embeddings for testing."""
        queries = []
        for i in range(query_count):
            embedding = [0.1 + (i % 50) * 0.02] * 384
            queries.append(embedding)
        return queries

    def benchmark_vector_search(self, dataset_size: int, query_count: int, k: int = 10) -> BenchmarkResult:
        """Benchmark vector search performance."""
        print(f"Benchmarking vector search: {dataset_size} docs, {query_count} queries, k={k}")

        # Generate test data
        documents = self.generate_test_data(dataset_size)
        query_embeddings = self.generate_query_embeddings(query_count)

        # Add documents
        start_time = time.time()
        success = self.vector_store.add_documents(documents)
        add_time = time.time() - start_time

        if not success:
            return BenchmarkResult("vector_search", dataset_size, query_count, 0, 0, 0, 0, 0, 0, 0)

        print(f"  Added {len(documents)} documents in {add_time:.2f}s")

        # Warmup queries
        for _ in range(5):
            try:
                self.vector_store.search(query_embeddings[0], k=k)
            except Exception:
                pass

        # Benchmark queries
        latencies = []
        errors = 0

        for query_embedding in query_embeddings:
            start_time = time.time()
            try:
                self.vector_store.search(query_embedding, k=k)
                latency = (time.time() - start_time) * 1000  # Convert to ms
                latencies.append(latency)
            except Exception as e:
                errors += 1
                print(f"  Query failed: {e}")

        # Calculate statistics
        if latencies:
            avg_latency = statistics.mean(latencies)
            min_latency = min(latencies)
            max_latency = max(latencies)
            p95_latency = np.percentile(latencies, 95)
            p99_latency = np.percentile(latencies, 99)
            # Throughput over successful queries only
            throughput = len(latencies) / (sum(latencies) / 1000)
            success_rate = (query_count - errors) / query_count
        else:
            avg_latency = min_latency = max_latency = p95_latency = p99_latency = 0
            throughput = 0
            success_rate = 0

        # Cleanup
        doc_ids = [doc.id for doc in documents]
        self.vector_store.delete_documents(doc_ids)

        return BenchmarkResult(
            operation="vector_search",
            dataset_size=dataset_size,
            query_count=query_count,
            avg_latency_ms=avg_latency,
            min_latency_ms=min_latency,
            max_latency_ms=max_latency,
            p95_latency_ms=p95_latency,
            p99_latency_ms=p99_latency,
            throughput_per_second=throughput,
            success_rate=success_rate
        )

    def benchmark_hybrid_search(self, dataset_size: int, query_count: int, k: int = 10) -> BenchmarkResult:
        """Benchmark hybrid search performance."""
        print(f"Benchmarking hybrid search: {dataset_size} docs, {query_count} queries, k={k}")

        # Generate test data
        documents = self.generate_test_data(dataset_size)
        query_embeddings = self.generate_query_embeddings(query_count)
        query_texts = [f"test query {i}" for i in range(query_count)]

        # Add documents
        success = self.vector_store.add_documents(documents)
        if not success:
            return BenchmarkResult("hybrid_search", dataset_size, query_count, 0, 0, 0, 0, 0, 0, 0)

        # Create text index (ignore the error if it already exists)
        try:
            self.vector_store.create_text_index(['content'])
        except Exception:
            pass

        # Warmup queries
        for _ in range(5):
            try:
                self.vector_store.hybrid_search(query_texts[0], query_embeddings[0], k=k)
            except Exception:
                pass

        # Benchmark queries
        latencies = []
        errors = 0

        for query_text, query_embedding in zip(query_texts, query_embeddings):
            start_time = time.time()
            try:
                self.vector_store.hybrid_search(query_text, query_embedding, k=k)
                latency = (time.time() - start_time) * 1000
                latencies.append(latency)
            except Exception as e:
                errors += 1
                print(f"  Hybrid query failed: {e}")

        # Calculate statistics
        if latencies:
            avg_latency = statistics.mean(latencies)
            min_latency = min(latencies)
            max_latency = max(latencies)
            p95_latency = np.percentile(latencies, 95)
            p99_latency = np.percentile(latencies, 99)
            # Throughput over successful queries only
            throughput = len(latencies) / (sum(latencies) / 1000)
            success_rate = (query_count - errors) / query_count
        else:
            avg_latency = min_latency = max_latency = p95_latency = p99_latency = 0
            throughput = 0
            success_rate = 0

        # Cleanup
        doc_ids = [doc.id for doc in documents]
        self.vector_store.delete_documents(doc_ids)

        return BenchmarkResult(
            operation="hybrid_search",
            dataset_size=dataset_size,
            query_count=query_count,
            avg_latency_ms=avg_latency,
            min_latency_ms=min_latency,
            max_latency_ms=max_latency,
            p95_latency_ms=p95_latency,
            p99_latency_ms=p99_latency,
            throughput_per_second=throughput,
            success_rate=success_rate
        )

    def benchmark_faceted_search(self, dataset_size: int, query_count: int, k: int = 10) -> BenchmarkResult:
        """Benchmark faceted search performance."""
        print(f"Benchmarking faceted search: {dataset_size} docs, {query_count} queries, k={k}")

        # Generate test data
        documents = self.generate_test_data(dataset_size)
        query_embeddings = self.generate_query_embeddings(query_count)
        facets = ["category", "difficulty", "year", "rating"]

        # Add documents
        success = self.vector_store.add_documents(documents)
        if not success:
            return BenchmarkResult("faceted_search", dataset_size, query_count, 0, 0, 0, 0, 0, 0, 0)

        # Warmup queries
        for _ in range(5):
            try:
                self.vector_store.faceted_search(query_embeddings[0], facets, k=k)
            except Exception:
                pass

        # Benchmark queries
        latencies = []
        errors = 0

        for query_embedding in query_embeddings:
            start_time = time.time()
            try:
                self.vector_store.faceted_search(query_embedding, facets, k=k)
                latency = (time.time() - start_time) * 1000
                latencies.append(latency)
            except Exception as e:
                errors += 1
                print(f"  Faceted query failed: {e}")

        # Calculate statistics
        if latencies:
            avg_latency = statistics.mean(latencies)
            min_latency = min(latencies)
            max_latency = max(latencies)
            p95_latency = np.percentile(latencies, 95)
            p99_latency = np.percentile(latencies, 99)
            # Throughput over successful queries only
            throughput = len(latencies) / (sum(latencies) / 1000)
            success_rate = (query_count - errors) / query_count
        else:
            avg_latency = min_latency = max_latency = p95_latency = p99_latency = 0
            throughput = 0
            success_rate = 0

        # Cleanup
        doc_ids = [doc.id for doc in documents]
        self.vector_store.delete_documents(doc_ids)

        return BenchmarkResult(
            operation="faceted_search",
            dataset_size=dataset_size,
            query_count=query_count,
            avg_latency_ms=avg_latency,
            min_latency_ms=min_latency,
            max_latency_ms=max_latency,
            p95_latency_ms=p95_latency,
            p99_latency_ms=p99_latency,
            throughput_per_second=throughput,
            success_rate=success_rate
        )

    def run_comprehensive_benchmark(self):
        """Run the comprehensive benchmark suite."""
        print("šŸš€ Starting Comprehensive MongoDB Atlas Vector Search Benchmark")
        print("=" * 70)

        self.initialize()

        # Benchmark configurations
        dataset_sizes = [100, 1000, 5000]
        query_counts = [10, 50, 100]
        k_values = [5, 10, 20]

        results = []

        for dataset_size in dataset_sizes:
            for query_count in query_counts:
                for k in k_values:
                    print(f"\nšŸ“Š Testing: {dataset_size} docs, {query_count} queries, k={k}")

                    # Vector search
                    vector_result = self.benchmark_vector_search(dataset_size, query_count, k)
                    results.append(vector_result)

                    # Hybrid search
                    hybrid_result = self.benchmark_hybrid_search(dataset_size, query_count, k)
                    results.append(hybrid_result)

                    # Faceted search
                    faceted_result = self.benchmark_faceted_search(dataset_size, query_count, k)
                    results.append(faceted_result)

        # Generate report
        self.generate_benchmark_report(results)

        return results

    def generate_benchmark_report(self, results: List[BenchmarkResult]):
        """Generate a benchmark report."""
        print("\nšŸ“‹ BENCHMARK REPORT")
        print("=" * 50)

        # Group results by operation
        by_operation = {}
        for result in results:
            if result.operation not in by_operation:
                by_operation[result.operation] = []
            by_operation[result.operation].append(result)

        # Generate a summary for each operation
        for operation, op_results in by_operation.items():
            print(f"\n{operation.upper()} PERFORMANCE:")
            print("-" * 30)

            if op_results:
                avg_latencies = [r.avg_latency_ms for r in op_results]
                avg_throughputs = [r.throughput_per_second for r in op_results]
                success_rates = [r.success_rate for r in op_results]

                print(f"  Average Latency: {statistics.mean(avg_latencies):.2f} ms")
                print(f"  Min Latency: {min(avg_latencies):.2f} ms")
                print(f"  Max Latency: {max(avg_latencies):.2f} ms")
                print(f"  Average Throughput: {statistics.mean(avg_throughputs):.1f} qps")
                print(f"  Average Success Rate: {statistics.mean(success_rates):.2%}")

                # Show best and worst performance
                best_result = min(op_results, key=lambda x: x.avg_latency_ms)
                worst_result = max(op_results, key=lambda x: x.avg_latency_ms)

                print(f"  Best Performance: {best_result.avg_latency_ms:.2f}ms ({best_result.dataset_size} docs, {best_result.query_count} queries)")
                print(f"  Worst Performance: {worst_result.avg_latency_ms:.2f}ms ({worst_result.dataset_size} docs, {worst_result.query_count} queries)")

    def close(self):
        """Close connections."""
        if self.vector_store:
            self.vector_store.close()

# Run the comprehensive benchmark
def run_mongodb_benchmark():
    """Run the MongoDB benchmark."""
    uri = "mongodb+srv://username:password@cluster.mongodb.net/"

    benchmark = MongoDBBenchmark(uri)

    try:
        results = benchmark.run_comprehensive_benchmark()
        return results
    except Exception as e:
        print(f"āŒ Benchmark failed: {e}")
        return []
    finally:
        benchmark.close()

# Run benchmark
benchmark_results = run_mongodb_benchmark()
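
To make the collected BenchmarkResult objects easier to analyze or plot later, you can flatten them into a CSV file. This is a minimal sketch using only the standard library; the output filename is an arbitrary choice.

import csv
from dataclasses import asdict, fields

def save_results_to_csv(results, path="benchmark_results.csv"):
    """Write a list of BenchmarkResult dataclasses to a CSV file."""
    if not results:
        return
    with open(path, "w", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=[field.name for field in fields(results[0])])
        writer.writeheader()
        for result in results:
            writer.writerow(asdict(result))

save_results_to_csv(benchmark_results)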

Performance Comparison with Other Vector Databases

def compare_vector_databases():
    """Compare MongoDB Atlas with other vector databases."""

    print("šŸ” Vector Database Performance Comparison")
    print("=" * 50)

    # This is a simplified comparison - in practice, you would run the same
    # benchmark suite against each database.

    # Mock results for demonstration
    comparison_results = {
        "MongoDB Atlas": {
            "avg_latency_ms": 45.2,
            "throughput_qps": 22.1,
            "memory_usage_mb": 128,
            "setup_complexity": "Medium",
            "features": ["Vector Search", "Text Search", "Faceted Search", "Unified Storage"]
        },
        "OpenSearch": {
            "avg_latency_ms": 38.7,
            "throughput_qps": 25.8,
            "memory_usage_mb": 256,
            "setup_complexity": "High",
            "features": ["Vector Search", "Text Search", "Faceted Search", "Separate Storage"]
        },
        "Qdrant": {
            "avg_latency_ms": 32.1,
            "throughput_qps": 31.2,
            "memory_usage_mb": 192,
            "setup_complexity": "Low",
            "features": ["Vector Search", "Filtering", "Separate Storage"]
        },
        "Azure AI Search": {
            "avg_latency_ms": 67.3,
            "throughput_qps": 14.9,
            "memory_usage_mb": 0,  # Managed service
            "setup_complexity": "Low",
            "features": ["Vector Search", "Text Search", "Faceted Search", "Managed Service"]
        }
    }

    print(f"{'Database':<15} {'Latency (ms)':<12} {'Throughput (qps)':<15} {'Memory (MB)':<12} {'Complexity':<12}")
    print("-" * 80)

    for db_name, metrics in comparison_results.items():
        print(f"{db_name:<15} {metrics['avg_latency_ms']:<12.1f} {metrics['throughput_qps']:<15.1f} {metrics['memory_usage_mb']:<12} {metrics['setup_complexity']:<12}")

    print("\nšŸ“Š Feature Comparison:")
    print("-" * 30)

    all_features = set()
    for metrics in comparison_results.values():
        all_features.update(metrics['features'])

    for feature in sorted(all_features):
        print(f"\n{feature}:")
        for db_name, metrics in comparison_results.items():
            status = "āœ…" if feature in metrics['features'] else "āŒ"
            print(f"  {db_name}: {status}")

    return comparison_results

# Run comparison
comparison = compare_vector_databases()
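
To replace the mock numbers above with measured ones, a small helper can run the same timing loop against any backend that exposes the same add_documents/search/delete_documents methods as MongoDBAtlasVectorStore. This is a minimal sketch under that assumption; the adapter objects named in the usage comment are hypothetical, and you would wrap each database's client behind this interface yourself.

import time
import statistics

def benchmark_store(store, documents, query_embeddings, k=10):
    """Time vector search against any store exposing add_documents/search/delete_documents."""
    store.add_documents(documents)
    latencies = []
    for query_embedding in query_embeddings:
        start = time.time()
        store.search(query_embedding, k=k)
        latencies.append((time.time() - start) * 1000)  # ms
    store.delete_documents([doc.id for doc in documents])
    return {
        "avg_latency_ms": statistics.mean(latencies),
        "throughput_qps": len(latencies) / (sum(latencies) / 1000),
    }

# Hypothetical usage: reuse the same documents and queries across backends.
# stores = {"MongoDB Atlas": mongodb_store, "Qdrant": qdrant_store, "OpenSearch": opensearch_store}
# measured = {name: benchmark_store(store, documents, query_embeddings) for name, store in stores.items()}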

Load Testing

import asyncio

async def load_test_mongodb():
    """Load test MongoDB Atlas Vector Search."""

    print("šŸ”„ MongoDB Atlas Load Testing")
    print("=" * 40)

    # Initialize
    vector_store = MongoDBAtlasVectorStore(
        uri="mongodb+srv://username:password@cluster.mongodb.net/",
        database="load_test_db",
        collection="load_test_collection",
        vector_search_index="load_test_index",
        embedding_dim=384
    )

    retriever = MongoDBAdvancedRetriever(vector_store)

    try:
        # Add test data
        documents = []
        for i in range(10000):
            doc = VectorDocument(
                id=f"load_test_{i}",
                content=f"Load test document {i} about machine learning and artificial intelligence.",
                embedding=[0.1 + (i % 100) * 0.01] * 384,
                metadata={"index": i, "category": f"cat_{i % 10}"}
            )
            documents.append(doc)

        print("Adding 10,000 test documents...")
        vector_store.add_documents(documents)
        print("āœ… Test data added")

        # Load test configurations
        concurrent_users = [1, 5, 10, 20, 50]
        queries_per_user = 100

        for users in concurrent_users:
            print(f"\n🧪 Testing with {users} concurrent users...")

            async def user_simulation(user_id: int):
                """Simulate a single user making queries."""
                query_times = []

                for i in range(queries_per_user):
                    start_time = time.time()
                    try:
                        await retriever.retrieve_async(
                            f"user_{user_id}_query_{i}", k=10, search_type="vector"
                        )
                        query_times.append(time.time() - start_time)
                    except Exception as e:
                        print(f"User {user_id} query {i} failed: {e}")
                        query_times.append(1.0)  # Penalty for failed queries

                return query_times

            # Run concurrent users
            start_time = time.time()

            tasks = [user_simulation(user_id) for user_id in range(users)]
            all_query_times = await asyncio.gather(*tasks)
            total_time = time.time() - start_time

            # Calculate metrics
            all_times = [t for user_times in all_query_times for t in user_times]
            avg_query_time = statistics.mean(all_times)
            total_queries = users * queries_per_user
            queries_per_second = total_queries / total_time

            print(f"  Total time: {total_time:.2f}s")
            print(f"  Total queries: {total_queries}")
            print(f"  Queries per second: {queries_per_second:.1f}")
            print(f"  Average query time: {avg_query_time:.3f}s")
            print(f"  P95 query time: {np.percentile(all_times, 95):.3f}s")

            # Performance thresholds
            if avg_query_time > 1.0:
                print("  āš ļø High latency detected!")
            if queries_per_second < 10:
                print("  āš ļø Low throughput detected!")

    finally:
        # Cleanup
        doc_ids = [f"load_test_{i}" for i in range(10000)]
        vector_store.delete_documents(doc_ids)
        vector_store.close()

# Run load test
asyncio.run(load_test_mongodb())

Memory and Resource Monitoring

import psutil
import gc

def monitor_resources_during_benchmark():
    """Monitor memory and CPU usage during a benchmark."""

    print("šŸ“Š Resource Monitoring During Benchmark")
    print("=" * 45)

    process = psutil.Process()

    # Initial state (the first cpu_percent() call primes the counter and returns 0.0)
    initial_memory = process.memory_info().rss / 1024 / 1024  # MB
    initial_cpu = process.cpu_percent()

    print(f"Initial Memory: {initial_memory:.1f} MB")
    print(f"Initial CPU: {initial_cpu:.1f}%")

    # Initialize vector store
    vector_store = MongoDBAtlasVectorStore(
        uri="mongodb+srv://username:password@cluster.mongodb.net/",
        database="resource_test_db",
        collection="resource_test_collection",
        vector_search_index="resource_test_index",
        embedding_dim=384
    )

    retriever = MongoDBAdvancedRetriever(vector_store)

    try:
        # Add documents and monitor
        documents = []
        for i in range(5000):
            doc = VectorDocument(
                id=f"resource_test_{i}",
                content=f"Resource monitoring document {i} for testing memory and CPU usage.",
                embedding=[0.1 + (i % 100) * 0.01] * 384,
                metadata={"index": i}
            )
            documents.append(doc)

        print("\nAdding documents...")
        start_time = time.time()
        vector_store.add_documents(documents)
        add_time = time.time() - start_time

        memory_after_add = process.memory_info().rss / 1024 / 1024
        cpu_after_add = process.cpu_percent()

        print("After adding documents:")
        print(f"  Memory: {memory_after_add:.1f} MB (+{memory_after_add - initial_memory:.1f} MB)")
        print(f"  CPU: {cpu_after_add:.1f}%")
        print(f"  Add time: {add_time:.2f}s")

        # Search and monitor
        print("\nPerforming searches...")
        search_times = []
        memory_samples = []

        for i in range(100):
            start_time = time.time()
            retriever.retrieve(f"query_{i}", k=10, search_type="vector")
            search_times.append(time.time() - start_time)

            # Sample memory every 10 queries
            if i % 10 == 0:
                memory_samples.append(process.memory_info().rss / 1024 / 1024)

        # Final state
        final_memory = process.memory_info().rss / 1024 / 1024
        final_cpu = process.cpu_percent()

        print("\nAfter searches:")
        print(f"  Memory: {final_memory:.1f} MB (+{final_memory - initial_memory:.1f} MB)")
        print(f"  CPU: {final_cpu:.1f}%")
        print(f"  Average search time: {statistics.mean(search_times):.3f}s")
        print(f"  Memory samples: {memory_samples}")

        # Memory growth analysis
        if memory_samples:
            memory_growth = max(memory_samples) - min(memory_samples)
            print(f"  Memory growth during search: {memory_growth:.1f} MB")

        # Garbage collection
        print("\nRunning garbage collection...")
        gc.collect()

        gc_memory = process.memory_info().rss / 1024 / 1024
        print(f"Memory after GC: {gc_memory:.1f} MB")

    finally:
        # Cleanup
        doc_ids = [f"resource_test_{i}" for i in range(5000)]
        vector_store.delete_documents(doc_ids)
        vector_store.close()

        final_memory = process.memory_info().rss / 1024 / 1024
        print(f"\nFinal memory after cleanup: {final_memory:.1f} MB")

# Run resource monitoring
monitor_resources_during_benchmark()
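
Sampling memory only every ten queries can miss short spikes. If you want continuous samples while a benchmark runs, a small background thread can poll psutil at a fixed interval. This is a minimal sketch using the standard library plus psutil; the class name and the 0.5-second interval are arbitrary choices.

import threading
import psutil

class MemorySampler:
    """Poll process RSS in a background thread while a benchmark runs."""

    def __init__(self, interval_s: float = 0.5):
        self.interval_s = interval_s
        self.samples_mb = []
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        process = psutil.Process()
        while not self._stop.is_set():
            self.samples_mb.append(process.memory_info().rss / 1024 / 1024)
            self._stop.wait(self.interval_s)

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc):
        self._stop.set()
        self._thread.join()

# Usage: wrap any benchmark call to collect samples.
# with MemorySampler() as sampler:
#     monitor_resources_during_benchmark()
# print(f"Peak memory: {max(sampler.samples_mb):.1f} MB")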

Complete Performance Benchmark Example

def complete_performance_benchmark():
    """Complete performance benchmark with all features."""

    print("šŸŽÆ Complete MongoDB Atlas Vector Search Performance Benchmark")
    print("=" * 70)

    # Initialize benchmark
    uri = "mongodb+srv://username:password@cluster.mongodb.net/"
    benchmark = MongoDBBenchmark(uri)

    try:
        # Run the comprehensive benchmark
        results = benchmark.run_comprehensive_benchmark()

        # Run the load test
        print("\n" + "=" * 70)
        asyncio.run(load_test_mongodb())

        # Run resource monitoring
        print("\n" + "=" * 70)
        monitor_resources_during_benchmark()

        # Compare with other databases
        print("\n" + "=" * 70)
        comparison = compare_vector_databases()

        print("\nāœ… Complete benchmark finished!")
        return results, comparison

    except Exception as e:
        print(f"āŒ Benchmark failed: {e}")
        return [], {}

    finally:
        benchmark.close()

# Run complete benchmark
benchmark_results, comparison_results = complete_performance_benchmark()

This example covers comprehensive performance testing of MongoDB Atlas Vector Search, including load testing, resource monitoring, and comparison with other vector databases.

Next Steps