
Performance Optimization Guide

This guide provides comprehensive strategies and techniques for optimizing the document ingestion pipeline to handle high-volume document processing efficiently.

Performance Overview

The document ingestion pipeline is designed to scale horizontally and vertically to handle enterprise-level document volumes. This guide covers optimization strategies for different components and scenarios.

Performance Targets

| Metric                | Target       | High Volume    |
|-----------------------|--------------|----------------|
| Throughput            | 100 docs/min | 1000+ docs/min |
| Processing Time (P95) | < 5 seconds  | < 10 seconds   |
| Error Rate            | < 1%         | < 2%           |
| Memory Usage          | < 2 GB       | < 8 GB         |
| CPU Usage             | < 50%        | < 80%          |
| Disk I/O              | < 100 MB/s   | < 500 MB/s     |
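
These targets can be enforced in code as well as watched on dashboards. As a rough illustration, the base targets could be expressed as a small configuration object and compared against live metrics; the names below (PerformanceTargets, meets_targets) are hypothetical and not part of the pipeline.

from dataclasses import dataclass

@dataclass
class PerformanceTargets:
    """Hypothetical container for the base targets in the table above."""
    min_throughput_docs_per_min: float = 100
    max_p95_seconds: float = 5.0
    max_error_rate: float = 0.01
    max_memory_gb: float = 2.0
    max_cpu_percent: float = 50.0

def meets_targets(observed: dict, targets: PerformanceTargets) -> bool:
    """Return True if the observed metrics satisfy every configured target."""
    return (
        observed["throughput_docs_per_min"] >= targets.min_throughput_docs_per_min
        and observed["p95_seconds"] <= targets.max_p95_seconds
        and observed["error_rate"] <= targets.max_error_rate
        and observed["memory_gb"] <= targets.max_memory_gb
        and observed["cpu_percent"] <= targets.max_cpu_percent
    )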

System Architecture Optimization

Horizontal Scaling

Load Balancing

Implement load balancing for multiple ingestion instances:

from datetime import datetime

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

app = FastAPI()

# Configure for load balancing
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Health check endpoint for load balancer
@app.get("/health")
async def health_check():
    return {"status": "healthy", "timestamp": datetime.utcnow()}

# Readiness check endpoint
@app.get("/ready")
async def ready_check():
    # Check database connectivity
    # Check file system access
    # Check resource availability
    return {"status": "ready"}

Microservices Architecture

Split the pipeline into specialized services:

# docker-compose.yml
version: '3.8'
services:
  document-processor:
    image: ingestion-processor:latest
    deploy:
      replicas: 3
    environment:
      - WORKER_TYPE=processor
      - MAX_CONCURRENT=5

  dead-letter-queue:
    image: ingestion-dlq:latest
    deploy:
      replicas: 2
    environment:
      - WORKER_TYPE=dlq
      - MAX_CONCURRENT=10

  monitoring:
    image: ingestion-monitoring:latest
    deploy:
      replicas: 1
    environment:
      - WORKER_TYPE=monitoring

  api-gateway:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf

Vertical Scaling

Resource Optimization

Optimize system resources:

import os
import psutil
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

class ResourceOptimizer:
    def __init__(self):
        self.cpu_count = os.cpu_count()
        self.memory_gb = psutil.virtual_memory().total // (1024**3)

    def get_optimal_concurrency(self):
        """Calculate optimal concurrency based on system resources."""
        # CPU-bound: 2x CPU cores
        # I/O-bound: 4x CPU cores
        # Memory-constrained: based on available memory

        if self.memory_gb < 4:
            return min(4, self.cpu_count)
        elif self.memory_gb < 8:
            return min(8, self.cpu_count * 2)
        else:
            return min(16, self.cpu_count * 4)

    def get_optimal_chunk_size(self):
        """Calculate optimal chunk size based on available memory."""
        available_memory_mb = psutil.virtual_memory().available // (1024**2)

        if available_memory_mb < 1000:
            return 500   # 500 words per chunk
        elif available_memory_mb < 4000:
            return 1000  # 1000 words per chunk
        else:
            return 2000  # 2000 words per chunk
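
The executor imports above become useful once the calculated concurrency is actually applied. A short usage sketch, where extract_text is a placeholder for an I/O-bound step:

optimizer = ResourceOptimizer()
max_workers = optimizer.get_optimal_concurrency()

def extract_text(path: str) -> str:
    """Placeholder for an I/O-bound extraction step."""
    with open(path, "r", encoding="utf-8") as f:
        return f.read()

# Size the thread pool from measured resources instead of a hard-coded value.
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    texts = list(executor.map(extract_text, ["doc1.txt", "doc2.txt"]))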

Database Optimization

Connection Pooling

Optimize database connections:

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Optimized connection pool
engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=20,        # Base number of connections
    max_overflow=30,     # Additional connections when needed
    pool_pre_ping=True,  # Validate connections before use
    pool_recycle=3600,   # Recycle connections after 1 hour
    pool_timeout=30,     # Timeout for getting a connection
    echo=False           # Disable SQL logging in production
)

# Async connection pool
from sqlalchemy.ext.asyncio import create_async_engine

async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    pool_size=20,
    max_overflow=30,
    pool_pre_ping=True,
    pool_recycle=3600,
    pool_timeout=30,
)
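
A brief sketch of how the pooled async engine might be consumed; async_sessionmaker is the standard SQLAlchemy 2.x session factory, and the query itself is illustrative.

from sqlalchemy import text
from sqlalchemy.ext.asyncio import async_sessionmaker

AsyncSessionLocal = async_sessionmaker(async_engine, expire_on_commit=False)

async def count_pending_documents() -> int:
    # Each session borrows a pooled connection and returns it on exit.
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            text("SELECT count(*) FROM documents WHERE status = 'pending'")
        )
        return result.scalar_one()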

Query Optimization

Optimize database queries:

-- Create optimized indexes
CREATE INDEX CONCURRENTLY idx_documents_status_created
ON documents(status, created_at);

CREATE INDEX CONCURRENTLY idx_documents_source_processing
ON documents(source, processing_status)
WHERE processing_status = 'processing';

CREATE INDEX CONCURRENTLY idx_dlq_priority_status
ON dlq_items(priority, status, created_at);

-- Partial indexes for common queries
CREATE INDEX CONCURRENTLY idx_metrics_recent
ON metrics(timestamp)
WHERE timestamp > NOW() - INTERVAL '7 days';

-- Optimize frequently used queries
EXPLAIN ANALYZE
SELECT d.*, v.version_number
FROM documents d
JOIN document_versions v ON d.id = v.document_id
WHERE d.status = 'completed'
AND d.created_at > NOW() - INTERVAL '24 hours'
ORDER BY d.created_at DESC
LIMIT 100;

Database Partitioning

Implement table partitioning:

-- Partition metrics table by date
CREATE TABLE metrics (
    id SERIAL,
    name VARCHAR(255),
    value DECIMAL,
    timestamp TIMESTAMP,
    tags JSONB
) PARTITION BY RANGE (timestamp);

-- Create monthly partitions
CREATE TABLE metrics_2023_12 PARTITION OF metrics
    FOR VALUES FROM ('2023-12-01') TO ('2024-01-01');

CREATE TABLE metrics_2024_01 PARTITION OF metrics
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

-- Automatic partition creation function
CREATE OR REPLACE FUNCTION create_monthly_partition(table_name text, start_date date)
RETURNS void AS $$
DECLARE
    partition_name text;
    end_date date;
BEGIN
    partition_name := table_name || '_' || to_char(start_date, 'YYYY_MM');
    end_date := start_date + interval '1 month';

    EXECUTE format('CREATE TABLE %I PARTITION OF %I FOR VALUES FROM (%L) TO (%L)',
                   partition_name, table_name, start_date, end_date);
END;
$$ LANGUAGE plpgsql;
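
Partitions must exist before rows arrive, so the helper is typically invoked from a scheduled job. A hedged sketch reusing the async engine from the connection-pooling example; the table name and schedule are illustrative, and note that the SQL function raises if the partition already exists.

from datetime import date
from sqlalchemy import text

async def create_partition_for(month_start: date) -> None:
    """Invoke create_monthly_partition for the given month."""
    async with async_engine.begin() as conn:
        await conn.execute(
            text("SELECT create_monthly_partition('metrics', :start)"),
            {"start": month_start},
        )

# e.g. from a monthly scheduled job:
# await create_partition_for(date(2024, 2, 1))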

Processing Optimization

Async Processing

Implement efficient async processing:

import asyncio
from asyncio import Semaphore
from typing import List, Dict, Any

class AsyncDocumentProcessor:
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = Semaphore(max_concurrent)
        self.processing_queue = asyncio.Queue(maxsize=1000)

    async def process_documents_batch(self, documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process multiple documents concurrently."""
        tasks = []

        for doc in documents:
            task = asyncio.create_task(self._process_single_document(doc))
            tasks.append(task)

        # Concurrency is bounded by the semaphore inside each task
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Handle exceptions
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append({
                    'document_id': documents[i].get('id'),
                    'status': 'failed',
                    'error': str(result)
                })
            else:
                processed_results.append(result)

        return processed_results

    async def _process_single_document(self, document: Dict[str, Any]) -> Dict[str, Any]:
        """Process a single document with semaphore control."""
        async with self.semaphore:
            try:
                # Simulate document processing
                await asyncio.sleep(0.1)  # Replace with actual processing

                return {
                    'document_id': document['id'],
                    'status': 'success',
                    'chunks_created': 5,
                    'processing_time_ms': 100
                }
            except Exception as e:
                return {
                    'document_id': document['id'],
                    'status': 'failed',
                    'error': str(e)
                }
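
A short usage example for the batch processor above; the document list is illustrative.

async def main():
    processor = AsyncDocumentProcessor(max_concurrent=10)
    documents = [{"id": f"doc-{i}"} for i in range(100)]
    results = await processor.process_documents_batch(documents)
    failed = [r for r in results if r["status"] == "failed"]
    print(f"Processed {len(results)} documents, {len(failed)} failed")

asyncio.run(main())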

Memory Management

Optimize memory usage:

import gc
import logging
import tracemalloc
import psutil
from typing import Dict, Generator, List

logger = logging.getLogger(__name__)

class MemoryOptimizedProcessor:
    def __init__(self):
        self.memory_threshold = 0.8  # 80% memory usage threshold
        self.chunk_size = 1000

    def process_large_dataset(self, documents: List[Dict]) -> Generator[Dict, None, None]:
        """Process large datasets with memory management."""
        tracemalloc.start()

        try:
            for i in range(0, len(documents), self.chunk_size):
                chunk = documents[i:i + self.chunk_size]

                # Process chunk
                for doc in chunk:
                    result = self._process_document(doc)
                    yield result

                # Memory management
                if self._memory_usage_high():
                    gc.collect()  # Force garbage collection

                # Log memory usage
                current, peak = tracemalloc.get_traced_memory()
                logger.info(f"Memory usage: {current / 1024**2:.1f} MB (peak: {peak / 1024**2:.1f} MB)")

        finally:
            tracemalloc.stop()

    def _memory_usage_high(self) -> bool:
        """Check if memory usage is high."""
        memory = psutil.virtual_memory()
        return memory.percent > (self.memory_threshold * 100)

    def _process_document(self, document: Dict) -> Dict:
        """Process a single document."""
        # Implement document processing logic
        pass

Caching Strategy

Implement intelligent caching:

import hashlib
import json
import redis
from functools import wraps
from typing import Dict

class CacheManager:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = 3600  # 1 hour

    def cache_result(self, ttl: int = None):
        """Decorator to cache function results."""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # Generate cache key
                cache_key = self._generate_cache_key(func.__name__, args, kwargs)

                # Try to get from cache
                cached_result = self.redis_client.get(cache_key)
                if cached_result:
                    return json.loads(cached_result)

                # Execute function
                result = await func(*args, **kwargs)

                # Cache result
                self.redis_client.setex(
                    cache_key,
                    ttl or self.default_ttl,
                    json.dumps(result, default=str)
                )

                return result
            return wrapper
        return decorator

    def _generate_cache_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        """Generate a unique cache key."""
        key_data = {
            'function': func_name,
            'args': args,
            'kwargs': sorted(kwargs.items())
        }
        key_string = json.dumps(key_data, default=str, sort_keys=True)
        return hashlib.md5(key_string.encode()).hexdigest()

# Usage
cache_manager = CacheManager()

@cache_manager.cache_result(ttl=1800)  # 30 minutes
async def get_document_metadata(document_id: str) -> Dict:
    """Get document metadata with caching."""
    # Expensive database query
    pass

File System Optimization

I/O Optimization

Optimize file I/O operations:

import aiofiles
from typing import AsyncGenerator

class OptimizedFileHandler:
    def __init__(self, buffer_size: int = 8192):
        self.buffer_size = buffer_size

    async def read_file_chunks(self, file_path: str) -> AsyncGenerator[bytes, None]:
        """Read a file in optimized chunks."""
        async with aiofiles.open(file_path, 'rb') as f:
            while chunk := await f.read(self.buffer_size):
                yield chunk

    async def write_file_optimized(self, file_path: str, data: bytes):
        """Write a file with optimized buffering."""
        async with aiofiles.open(file_path, 'wb', buffering=self.buffer_size) as f:
            await f.write(data)
            await f.flush()  # Ensure data is written

    async def copy_file_optimized(self, src_path: str, dst_path: str):
        """Copy a file with optimized buffering."""
        async with aiofiles.open(src_path, 'rb') as src:
            async with aiofiles.open(dst_path, 'wb', buffering=self.buffer_size) as dst:
                while chunk := await src.read(self.buffer_size):
                    await dst.write(chunk)

Storage Optimization

Optimize storage usage:

import gzip
import shutil
from pathlib import Path

class StorageOptimizer:
    def __init__(self, compression_level: int = 6):
        self.compression_level = compression_level

    async def compress_large_files(self, file_path: str, threshold_mb: int = 10) -> str:
        """Compress files larger than the threshold."""
        file_size = Path(file_path).stat().st_size

        if file_size < threshold_mb * 1024 * 1024:
            return file_path

        compressed_path = f"{file_path}.gz"

        with open(file_path, 'rb') as f_in:
            with gzip.open(compressed_path, 'wb', compresslevel=self.compression_level) as f_out:
                shutil.copyfileobj(f_in, f_out)

        # Remove original file
        Path(file_path).unlink()

        return compressed_path

    async def decompress_file(self, compressed_path: str) -> str:
        """Decompress a gzipped file."""
        if not compressed_path.endswith('.gz'):
            return compressed_path

        original_path = compressed_path[:-3]  # Remove .gz extension

        with gzip.open(compressed_path, 'rb') as f_in:
            with open(original_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)

        return original_path

    def cleanup_old_files(self, directory: str, days_old: int = 30):
        """Clean up old files to save storage."""
        from datetime import datetime, timedelta

        cutoff_date = datetime.now() - timedelta(days=days_old)
        directory_path = Path(directory)

        for file_path in directory_path.rglob('*'):
            if file_path.is_file() and datetime.fromtimestamp(file_path.stat().st_mtime) < cutoff_date:
                file_path.unlink()

Network Optimization

Connection Pooling

Optimize network connections:

import aiohttp
import asyncio
from typing import Dict, Any

class OptimizedHTTPClient:
    def __init__(self, max_connections: int = 100):
        self.connector = aiohttp.TCPConnector(
            limit=max_connections,
            limit_per_host=30,
            keepalive_timeout=30,
            enable_cleanup_closed=True
        )
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            connector=self.connector,
            timeout=aiohttp.ClientTimeout(total=30)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def get(self, url: str, **kwargs) -> Dict[str, Any]:
        """Make an optimized GET request."""
        async with self.session.get(url, **kwargs) as response:
            return await response.json()

    async def post(self, url: str, data: Dict[str, Any], **kwargs) -> Dict[str, Any]:
        """Make an optimized POST request."""
        async with self.session.post(url, json=data, **kwargs) as response:
            return await response.json()

# Usage
async def fetch_documents_from_api():
    async with OptimizedHTTPClient() as client:
        documents = await client.get("https://api.company.com/documents")
        return documents

Batch Processing

Implement efficient batch processing:

import asyncio
from typing import List, Dict, Any

class BatchProcessor:
    def __init__(self, batch_size: int = 100, max_concurrent_batches: int = 5):
        self.batch_size = batch_size
        self.semaphore = asyncio.Semaphore(max_concurrent_batches)

    async def process_in_batches(self, items: List[Dict[str, Any]],
                                 process_func) -> List[Dict[str, Any]]:
        """Process items in optimized batches."""
        results = []

        # Split into batches
        batches = [items[i:i + self.batch_size]
                   for i in range(0, len(items), self.batch_size)]

        # Process batches concurrently
        batch_tasks = []
        for batch in batches:
            task = asyncio.create_task(self._process_batch(batch, process_func))
            batch_tasks.append(task)

        # Wait for all batches to complete
        batch_results = await asyncio.gather(*batch_tasks)

        # Flatten results
        for batch_result in batch_results:
            results.extend(batch_result)

        return results

    async def _process_batch(self, batch: List[Dict[str, Any]],
                             process_func) -> List[Dict[str, Any]]:
        """Process a single batch with concurrency control."""
        async with self.semaphore:
            return await process_func(batch)
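
process_func is expected to accept a whole batch and return one result per item. A minimal usage sketch, where index_batch is a placeholder for a real bulk operation:

async def index_batch(batch: list[dict]) -> list[dict]:
    """Placeholder batch worker: pretend to index every document in the batch."""
    await asyncio.sleep(0.05)  # Replace with a real bulk indexing call
    return [{"id": doc["id"], "status": "indexed"} for doc in batch]

async def main():
    processor = BatchProcessor(batch_size=100, max_concurrent_batches=5)
    items = [{"id": f"doc-{i}"} for i in range(1000)]
    results = await processor.process_in_batches(items, index_batch)
    print(f"Indexed {len(results)} documents")

asyncio.run(main())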

Monitoring and Profiling

Performance Monitoring

Implement comprehensive performance monitoring:

import time
import psutil
from functools import wraps
from typing import Dict, Any

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {}

    def monitor_function(self, func_name: str = None):
        """Decorator to monitor function performance."""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                start_time = time.time()
                start_memory = psutil.Process().memory_info().rss

                try:
                    result = await func(*args, **kwargs)
                    status = "success"
                    return result
                except Exception as e:
                    status = "error"
                    raise
                finally:
                    end_time = time.time()
                    end_memory = psutil.Process().memory_info().rss

                    # Record metrics
                    self._record_metrics(
                        func_name or func.__name__,
                        end_time - start_time,
                        end_memory - start_memory,
                        status
                    )

            return wrapper
        return decorator

    def _record_metrics(self, func_name: str, duration: float,
                        memory_delta: int, status: str):
        """Record performance metrics."""
        if func_name not in self.metrics:
            self.metrics[func_name] = {
                'count': 0,
                'total_duration': 0,
                'total_memory': 0,
                'errors': 0
            }

        metrics = self.metrics[func_name]
        metrics['count'] += 1
        metrics['total_duration'] += duration
        metrics['total_memory'] += memory_delta

        if status == "error":
            metrics['errors'] += 1

    def get_performance_report(self) -> Dict[str, Any]:
        """Generate a performance report."""
        report = {}

        for func_name, metrics in self.metrics.items():
            avg_duration = metrics['total_duration'] / metrics['count']
            avg_memory = metrics['total_memory'] / metrics['count']
            error_rate = metrics['errors'] / metrics['count']

            report[func_name] = {
                'calls': metrics['count'],
                'avg_duration_ms': avg_duration * 1000,
                'avg_memory_mb': avg_memory / 1024 / 1024,
                'error_rate': error_rate
            }

        return report

# Usage
monitor = PerformanceMonitor()

@monitor.monitor_function("process_document")
async def process_document(document: Dict[str, Any]) -> Dict[str, Any]:
    # Document processing logic
    pass

Profiling Tools

Use profiling tools for optimization:

import cProfile
import io
import logging
import pstats
from functools import wraps

logger = logging.getLogger(__name__)

def profile_function(func):
    """Decorator to profile function performance."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()

        try:
            result = await func(*args, **kwargs)
            return result
        finally:
            profiler.disable()

            # Generate profile report
            s = io.StringIO()
            ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
            ps.print_stats(10)  # Top 10 functions

            # Log profile results
            logger.info(f"Profile for {func.__name__}:\n{s.getvalue()}")

    return wrapper

# Memory profiling
import tracemalloc

def profile_memory(func):
    """Decorator to profile memory usage."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        tracemalloc.start()

        try:
            result = await func(*args, **kwargs)
            return result
        finally:
            current, peak = tracemalloc.get_traced_memory()
            logger.info(f"Memory usage for {func.__name__}: "
                        f"current={current/1024/1024:.1f}MB, "
                        f"peak={peak/1024/1024:.1f}MB")
            tracemalloc.stop()

    return wrapper
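
Both decorators wrap coroutines, so they can be stacked on the same async function. A short usage example; the processing body is a placeholder.

import asyncio

@profile_function
@profile_memory
async def ingest_document(path: str) -> dict:
    # Placeholder for real parsing and chunking work
    await asyncio.sleep(0.2)
    return {"path": path, "chunks_created": 5}

asyncio.run(ingest_document("reports/example.pdf"))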

Scaling Strategies

Auto-scaling

Implement auto-scaling based on metrics:

import asyncio
import logging
import psutil
from typing import Dict, Any

logger = logging.getLogger(__name__)

class AutoScaler:
    def __init__(self, min_instances: int = 1, max_instances: int = 10):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.current_instances = min_instances
        self.scaling_metrics = {
            'cpu_threshold': 70,
            'memory_threshold': 80,
            'queue_length_threshold': 100
        }

    async def monitor_and_scale(self):
        """Monitor metrics and scale accordingly."""
        while True:
            try:
                metrics = await self._collect_metrics()
                scaling_action = self._determine_scaling_action(metrics)

                if scaling_action != 'none':
                    await self._execute_scaling_action(scaling_action)

                await asyncio.sleep(60)  # Check every minute

            except Exception as e:
                logger.error(f"Auto-scaling error: {e}")
                await asyncio.sleep(60)

    async def _collect_metrics(self) -> Dict[str, float]:
        """Collect system metrics."""
        return {
            'cpu_percent': psutil.cpu_percent(interval=1),
            'memory_percent': psutil.virtual_memory().percent,
            'queue_length': await self._get_queue_length(),
            'processing_time': await self._get_avg_processing_time()
        }

    async def _get_queue_length(self) -> float:
        """Return the current ingestion queue depth."""
        # Implement queue length lookup (e.g. from the broker or database)
        return 0

    async def _get_avg_processing_time(self) -> float:
        """Return the recent average processing time."""
        # Implement processing-time lookup from recorded metrics
        return 0.0

    def _determine_scaling_action(self, metrics: Dict[str, float]) -> str:
        """Determine scaling action based on metrics."""
        cpu_high = metrics['cpu_percent'] > self.scaling_metrics['cpu_threshold']
        memory_high = metrics['memory_percent'] > self.scaling_metrics['memory_threshold']
        queue_long = metrics['queue_length'] > self.scaling_metrics['queue_length_threshold']

        # Scale up conditions
        if (cpu_high or memory_high or queue_long) and self.current_instances < self.max_instances:
            return 'scale_up'

        # Scale down conditions
        cpu_low = metrics['cpu_percent'] < 30
        memory_low = metrics['memory_percent'] < 40
        queue_short = metrics['queue_length'] < 10

        if (cpu_low and memory_low and queue_short) and self.current_instances > self.min_instances:
            return 'scale_down'

        return 'none'

    async def _execute_scaling_action(self, action: str):
        """Execute the scaling action."""
        if action == 'scale_up':
            self.current_instances += 1
            await self._add_instance()
            logger.info(f"Scaled up to {self.current_instances} instances")

        elif action == 'scale_down':
            self.current_instances -= 1
            await self._remove_instance()
            logger.info(f"Scaled down to {self.current_instances} instances")

    async def _add_instance(self):
        """Add a new processing instance."""
        # Implement instance addition logic
        pass

    async def _remove_instance(self):
        """Remove a processing instance."""
        # Implement instance removal logic
        pass
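
The scaling loop is meant to run as a background task for the lifetime of the service; a minimal sketch, where run_service is a placeholder for the long-running entry point:

async def main():
    scaler = AutoScaler(min_instances=2, max_instances=10)
    scaling_task = asyncio.create_task(scaler.monitor_and_scale())
    try:
        await run_service()  # Placeholder for the long-running service entry point
    finally:
        scaling_task.cancel()

asyncio.run(main())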

Load Testing

Implement load testing for performance validation:

import asyncio
import aiohttp
import time
from typing import List, Dict, Any

class LoadTester:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.results = []

    async def run_load_test(self, concurrent_users: int, duration_seconds: int):
        """Run a load test with the specified parameters."""
        print(f"Starting load test: {concurrent_users} users for {duration_seconds} seconds")

        start_time = time.time()
        tasks = []

        # Create tasks for concurrent users
        for i in range(concurrent_users):
            task = asyncio.create_task(self._simulate_user(i, start_time + duration_seconds))
            tasks.append(task)

        # Wait for all tasks to complete
        await asyncio.gather(*tasks)

        # Generate report
        self._generate_report()

    async def _simulate_user(self, user_id: int, end_time: float):
        """Simulate a single user's behavior."""
        async with aiohttp.ClientSession() as session:
            while time.time() < end_time:
                try:
                    start_time = time.time()

                    # Simulate a document processing request
                    async with session.post(
                        f"{self.base_url}/api/v1/documents",
                        json={"document_id": f"test_doc_{user_id}_{int(time.time())}"}
                    ) as response:
                        await response.json()

                        duration = time.time() - start_time

                        self.results.append({
                            'user_id': user_id,
                            'duration': duration,
                            'status': response.status,
                            'timestamp': time.time()
                        })

                except Exception as e:
                    self.results.append({
                        'user_id': user_id,
                        'duration': 0,
                        'status': 500,
                        'error': str(e),
                        'timestamp': time.time()
                    })

                # Short delay between requests
                await asyncio.sleep(0.1)

    def _generate_report(self):
        """Generate the load test report."""
        if not self.results:
            print("No results to report")
            return

        successful_requests = [r for r in self.results if r['status'] == 200]
        failed_requests = [r for r in self.results if r['status'] != 200]

        total_requests = len(self.results)
        success_rate = len(successful_requests) / total_requests * 100

        durations = [r['duration'] for r in successful_requests]
        avg_duration = sum(durations) / len(durations) if durations else 0
        max_duration = max(durations) if durations else 0

        elapsed = max(r['timestamp'] for r in self.results) - min(r['timestamp'] for r in self.results)
        requests_per_second = total_requests / elapsed if elapsed > 0 else 0

        print("\nLoad Test Results:")
        print(f"Total Requests: {total_requests}")
        print(f"Successful: {len(successful_requests)}")
        print(f"Failed: {len(failed_requests)}")
        print(f"Success Rate: {success_rate:.2f}%")
        print(f"Average Response Time: {avg_duration:.3f}s")
        print(f"Max Response Time: {max_duration:.3f}s")
        print(f"Requests per Second: {requests_per_second:.2f}")

# Usage
load_tester = LoadTester("http://localhost:8000")
asyncio.run(load_tester.run_load_test(concurrent_users=50, duration_seconds=300))

Best Practices

Code Optimization

  1. Use async/await: Leverage asynchronous programming for I/O operations
  2. Implement connection pooling: Reuse database and HTTP connections
  3. Batch operations: Group similar operations together
  4. Cache frequently accessed data: Reduce redundant computations
  5. Profile before optimizing: Measure first, optimize second

System Optimization

  1. Monitor resource usage: Track CPU, memory, and I/O metrics
  2. Implement auto-scaling: Scale based on demand
  3. Optimize database queries: Use indexes and query optimization
  4. Use efficient data structures: Choose appropriate data types
  5. Implement proper error handling: Fail fast and recover gracefully

Operational Optimization

  1. Set up monitoring: Implement comprehensive monitoring
  2. Create performance baselines: Establish performance expectations
  3. Regular performance testing: Validate performance regularly
  4. Capacity planning: Plan for growth and peak loads
  5. Document performance characteristics: Maintain performance documentation

Performance Troubleshooting

Common Performance Issues

  1. High CPU Usage

    • Check for infinite loops
    • Profile CPU-intensive operations
    • Optimize algorithms
    • Consider horizontal scaling
  2. High Memory Usage

    • Check for memory leaks
    • Optimize data structures
    • Implement garbage collection
    • Use memory profiling tools
  3. Slow Database Queries

    • Analyze query execution plans
    • Add missing indexes
    • Optimize query logic
    • Consider database scaling
  4. Network Bottlenecks

    • Implement connection pooling
    • Use compression
    • Optimize payload sizes
    • Consider CDN usage

Performance Debugging

import logging
import time
import psutil
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def performance_context(operation_name: str):
    """Context manager for performance debugging."""
    start_time = time.time()
    start_memory = psutil.Process().memory_info().rss

    logger.info(f"Starting {operation_name}")

    try:
        yield
    finally:
        end_time = time.time()
        end_memory = psutil.Process().memory_info().rss

        duration = end_time - start_time
        memory_delta = end_memory - start_memory

        logger.info(f"Completed {operation_name}: "
                    f"{duration:.3f}s, "
                    f"{memory_delta/1024/1024:.1f}MB")

# Usage
with performance_context("document_processing"):
    # Document processing code
    pass

Next Steps