Performance Optimization Guide
This guide provides comprehensive strategies and techniques for optimizing the document ingestion pipeline to handle high-volume document processing efficiently.
Performance Overview
The document ingestion pipeline is designed to scale horizontally and vertically to handle enterprise-level document volumes. This guide covers optimization strategies for different components and scenarios.
Performance Targets
| Metric | Target | High Volume |
|---|---|---|
| Throughput | 100 docs/min | 1000+ docs/min |
| Processing Time (P95) | < 5 seconds | < 10 seconds |
| Error Rate | < 1% | < 2% |
| Memory Usage | < 2GB | < 8GB |
| CPU Usage | < 50% | < 80% |
| Disk I/O | < 100 MB/s | < 500 MB/s |
System Architecture Optimization
Horizontal Scaling
Load Balancing
Run multiple ingestion instances behind a load balancer, with health and readiness endpoints the balancer can probe:
```python
from datetime import datetime

import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# Configure CORS for load-balanced deployments;
# replace the wildcard origins with an explicit list in production
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Health check endpoint for the load balancer
@app.get("/health")
async def health_check():
    return {"status": "healthy", "timestamp": datetime.utcnow()}

# Ready check endpoint
@app.get("/ready")
async def ready_check():
    # Check database connectivity
    # Check file system access
    # Check resource availability
    return {"status": "ready"}

if __name__ == "__main__":
    # Bind to all interfaces so the load balancer can reach each instance
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
Microservices Architecture
Split the pipeline into specialized services:
```yaml
# docker-compose.yml
version: '3.8'

services:
  document-processor:
    image: ingestion-processor:latest
    deploy:
      replicas: 3  # replicas must live under `deploy` in the Compose spec
    environment:
      - WORKER_TYPE=processor
      - MAX_CONCURRENT=5

  dead-letter-queue:
    image: ingestion-dlq:latest
    deploy:
      replicas: 2
    environment:
      - WORKER_TYPE=dlq
      - MAX_CONCURRENT=10

  monitoring:
    image: ingestion-monitoring:latest
    deploy:
      replicas: 1
    environment:
      - WORKER_TYPE=monitoring

  api-gateway:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
```
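The compose file mounts an `nginx.conf` into the gateway container. A minimal sketch of that configuration, assuming each processor replica serves HTTP on port 8000 (the service name and port are assumptions):

```nginx
# nginx.conf (sketch) -- round-robin across ingestion instances
events {}

http {
    upstream ingestion_backend {
        # Docker's internal DNS resolves the service name to its replicas
        server document-processor:8000;
    }

    server {
        listen 80;

        location / {
            proxy_pass http://ingestion_backend;
            proxy_set_header Host $host;
        }
    }
}
```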
Vertical Scaling
Resource Optimization
Tune worker concurrency and chunk size to the host's CPU and memory:
```python
import os

import psutil

class ResourceOptimizer:
    def __init__(self):
        self.cpu_count = os.cpu_count()
        self.memory_gb = psutil.virtual_memory().total // (1024**3)

    def get_optimal_concurrency(self) -> int:
        """Calculate optimal concurrency based on system resources."""
        # Rules of thumb: CPU-bound work ~ 2x CPU cores, I/O-bound
        # work ~ 4x CPU cores, memory-constrained hosts stay lower
        if self.memory_gb < 4:
            return min(4, self.cpu_count)
        elif self.memory_gb < 8:
            return min(8, self.cpu_count * 2)
        else:
            return min(16, self.cpu_count * 4)

    def get_optimal_chunk_size(self) -> int:
        """Calculate optimal chunk size (in words) based on available memory."""
        available_memory_mb = psutil.virtual_memory().available // (1024**2)
        if available_memory_mb < 1000:
            return 500   # 500 words per chunk
        elif available_memory_mb < 4000:
            return 1000  # 1000 words per chunk
        else:
            return 2000  # 2000 words per chunk
```
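A short usage sketch, feeding the derived limits into pipeline configuration (the wiring is illustrative):

```python
optimizer = ResourceOptimizer()

max_concurrent = optimizer.get_optimal_concurrency()
chunk_size = optimizer.get_optimal_chunk_size()

# Pass these values to the worker pool and chunker at startup
print(f"Using {max_concurrent} concurrent workers and {chunk_size}-word chunks")
```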
Database Optimization
Connection Pooling
Optimize database connections:
```python
import os

from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import QueuePool

# Connection URLs are expected to come from configuration
DATABASE_URL = os.environ["DATABASE_URL"]
ASYNC_DATABASE_URL = os.environ["ASYNC_DATABASE_URL"]

# Optimized connection pool
engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=20,        # Base number of connections
    max_overflow=30,     # Additional connections when needed
    pool_pre_ping=True,  # Validate connections before use
    pool_recycle=3600,   # Recycle connections after 1 hour
    pool_timeout=30,     # Timeout for getting a connection
    echo=False,          # Disable SQL logging in production
)

# Async connection pool
async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    pool_size=20,
    max_overflow=30,
    pool_pre_ping=True,
    pool_recycle=3600,
    pool_timeout=30,
)
```
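A brief usage sketch of the pooled async engine with SQLAlchemy 2.0's `async_sessionmaker` (the `documents` query is illustrative):

```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import async_sessionmaker

# Session factory bound to the pooled async engine
AsyncSessionLocal = async_sessionmaker(async_engine, expire_on_commit=False)

async def count_pending_documents() -> int:
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            text("SELECT COUNT(*) FROM documents WHERE status = 'pending'")
        )
        return result.scalar_one()
```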
Query Optimization
Optimize database queries:
```sql
-- Create optimized indexes
CREATE INDEX CONCURRENTLY idx_documents_status_created
    ON documents(status, created_at);

CREATE INDEX CONCURRENTLY idx_documents_source_processing
    ON documents(source, processing_status)
    WHERE processing_status = 'processing';

CREATE INDEX CONCURRENTLY idx_dlq_priority_status
    ON dlq_items(priority, status, created_at);

-- Partial index for recent rows. PostgreSQL requires an immutable
-- predicate (NOW() is not allowed here), so use a fixed boundary
-- and rebuild the index periodically.
CREATE INDEX CONCURRENTLY idx_metrics_recent
    ON metrics(timestamp)
    WHERE timestamp > '2024-01-01';

-- Verify frequently used queries with EXPLAIN ANALYZE
EXPLAIN ANALYZE
SELECT d.*, v.version_number
FROM documents d
JOIN document_versions v ON d.id = v.document_id
WHERE d.status = 'completed'
  AND d.created_at > NOW() - INTERVAL '24 hours'
ORDER BY d.created_at DESC
LIMIT 100;
```
Database Partitioning
Implement table partitioning:
```sql
-- Partition metrics table by date
CREATE TABLE metrics (
    id SERIAL,
    name VARCHAR(255),
    value DECIMAL,
    timestamp TIMESTAMP,
    tags JSONB
) PARTITION BY RANGE (timestamp);

-- Create monthly partitions
CREATE TABLE metrics_2023_12 PARTITION OF metrics
    FOR VALUES FROM ('2023-12-01') TO ('2024-01-01');

CREATE TABLE metrics_2024_01 PARTITION OF metrics
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

-- Automatic partition creation function
CREATE OR REPLACE FUNCTION create_monthly_partition(table_name text, start_date date)
RETURNS void AS $$
DECLARE
    partition_name text;
    end_date date;
BEGIN
    partition_name := table_name || '_' || to_char(start_date, 'YYYY_MM');
    end_date := start_date + interval '1 month';
    EXECUTE format('CREATE TABLE %I PARTITION OF %I FOR VALUES FROM (%L) TO (%L)',
                   partition_name, table_name, start_date, end_date);
END;
$$ LANGUAGE plpgsql;
```
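The function can then be invoked ahead of each month from a scheduled job (the date is illustrative):

```sql
-- Create next month's partition (run monthly, e.g. via cron or pg_cron)
SELECT create_monthly_partition('metrics', '2024-02-01');
```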
Processing Optimization
Async Processing
Implement efficient async processing:
```python
import asyncio
from asyncio import Semaphore
from typing import Any, Dict, List

class AsyncDocumentProcessor:
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = Semaphore(max_concurrent)

    async def process_documents_batch(self, documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process multiple documents concurrently."""
        tasks = [
            asyncio.create_task(self._process_single_document(doc))
            for doc in documents
        ]

        # Process with controlled concurrency
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Convert raised exceptions into failure records
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append({
                    'document_id': documents[i].get('id'),
                    'status': 'failed',
                    'error': str(result)
                })
            else:
                processed_results.append(result)
        return processed_results

    async def _process_single_document(self, document: Dict[str, Any]) -> Dict[str, Any]:
        """Process a single document with semaphore control."""
        async with self.semaphore:
            try:
                # Simulate document processing
                await asyncio.sleep(0.1)  # Replace with actual processing
                return {
                    'document_id': document['id'],
                    'status': 'success',
                    'chunks_created': 5,
                    'processing_time_ms': 100
                }
            except Exception as e:
                return {
                    'document_id': document['id'],
                    'status': 'failed',
                    'error': str(e)
                }
```
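A minimal usage sketch (the document payloads are illustrative):

```python
async def main():
    processor = AsyncDocumentProcessor(max_concurrent=10)
    docs = [{'id': f'doc-{i}'} for i in range(100)]

    results = await processor.process_documents_batch(docs)
    failed = [r for r in results if r['status'] == 'failed']
    print(f"{len(results) - len(failed)} succeeded, {len(failed)} failed")

asyncio.run(main())
```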
Memory Management
Optimize memory usage:
```python
import gc
import logging
import tracemalloc
from typing import Dict, Generator, List

import psutil

logger = logging.getLogger(__name__)

class MemoryOptimizedProcessor:
    def __init__(self):
        self.memory_threshold = 0.8  # 80% memory usage threshold
        self.chunk_size = 1000

    def process_large_dataset(self, documents: List[Dict]) -> Generator[Dict, None, None]:
        """Process large datasets with memory management."""
        tracemalloc.start()
        try:
            for i in range(0, len(documents), self.chunk_size):
                chunk = documents[i:i + self.chunk_size]

                # Process chunk
                for doc in chunk:
                    yield self._process_document(doc)

                # Force garbage collection under memory pressure
                if self._memory_usage_high():
                    gc.collect()

                # Log memory usage
                current, peak = tracemalloc.get_traced_memory()
                logger.info(f"Memory usage: {current / 1024**2:.1f} MB (peak: {peak / 1024**2:.1f} MB)")
        finally:
            tracemalloc.stop()

    def _memory_usage_high(self) -> bool:
        """Check if memory usage exceeds the configured threshold."""
        memory = psutil.virtual_memory()
        return memory.percent > (self.memory_threshold * 100)

    def _process_document(self, document: Dict) -> Dict:
        """Process a single document."""
        # Implement document processing logic
        pass
```
Caching Strategy
Implement intelligent caching:
```python
import hashlib
import json
from functools import wraps
from typing import Dict

import redis

class CacheManager:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = 3600  # 1 hour

    def cache_result(self, ttl: int = None):
        """Decorator to cache async function results in Redis."""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # Generate cache key
                cache_key = self._generate_cache_key(func.__name__, args, kwargs)

                # Try to get from cache
                cached_result = self.redis_client.get(cache_key)
                if cached_result:
                    return json.loads(cached_result)

                # Execute function and cache the result
                result = await func(*args, **kwargs)
                self.redis_client.setex(
                    cache_key,
                    ttl or self.default_ttl,
                    json.dumps(result, default=str)
                )
                return result
            return wrapper
        return decorator

    def _generate_cache_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        """Generate a unique cache key."""
        key_data = {
            'function': func_name,
            'args': args,
            'kwargs': sorted(kwargs.items())
        }
        key_string = json.dumps(key_data, default=str, sort_keys=True)
        return hashlib.md5(key_string.encode()).hexdigest()

# Usage
cache_manager = CacheManager()

@cache_manager.cache_result(ttl=1800)  # 30 minutes
async def get_document_metadata(document_id: str) -> Dict:
    """Get document metadata with caching."""
    # Expensive database query
    pass
```
File System Optimization
I/O Optimization
Optimize file I/O operations:
```python
from typing import AsyncGenerator

import aiofiles

class OptimizedFileHandler:
    def __init__(self, buffer_size: int = 8192):
        self.buffer_size = buffer_size

    async def read_file_chunks(self, file_path: str) -> AsyncGenerator[bytes, None]:
        """Read a file in optimized chunks."""
        async with aiofiles.open(file_path, 'rb') as f:
            while chunk := await f.read(self.buffer_size):
                yield chunk

    async def write_file_optimized(self, file_path: str, data: bytes):
        """Write a file with optimized buffering."""
        async with aiofiles.open(file_path, 'wb', buffering=self.buffer_size) as f:
            await f.write(data)
            await f.flush()  # Ensure data is written

    async def copy_file_optimized(self, src_path: str, dst_path: str):
        """Copy a file with optimized buffering."""
        async with aiofiles.open(src_path, 'rb') as src:
            async with aiofiles.open(dst_path, 'wb', buffering=self.buffer_size) as dst:
                while chunk := await src.read(self.buffer_size):
                    await dst.write(chunk)
```
Storage Optimization
Optimize storage usage:
```python
import gzip
import shutil
from datetime import datetime, timedelta
from pathlib import Path

class StorageOptimizer:
    """Blocking file operations; in async code, offload these methods
    with asyncio.to_thread to avoid stalling the event loop."""

    def __init__(self, compression_level: int = 6):
        self.compression_level = compression_level

    def compress_large_files(self, file_path: str, threshold_mb: int = 10) -> str:
        """Compress files larger than the threshold."""
        file_size = Path(file_path).stat().st_size
        if file_size < threshold_mb * 1024 * 1024:
            return file_path

        compressed_path = f"{file_path}.gz"
        with open(file_path, 'rb') as f_in:
            with gzip.open(compressed_path, 'wb', compresslevel=self.compression_level) as f_out:
                shutil.copyfileobj(f_in, f_out)

        # Remove the original file
        Path(file_path).unlink()
        return compressed_path

    def decompress_file(self, compressed_path: str) -> str:
        """Decompress a gzipped file."""
        if not compressed_path.endswith('.gz'):
            return compressed_path

        original_path = compressed_path[:-3]  # Strip the .gz extension
        with gzip.open(compressed_path, 'rb') as f_in:
            with open(original_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        return original_path

    def cleanup_old_files(self, directory: str, days_old: int = 30):
        """Delete files older than the cutoff to reclaim storage."""
        cutoff_date = datetime.now() - timedelta(days=days_old)
        for file_path in Path(directory).rglob('*'):
            if file_path.is_file() and datetime.fromtimestamp(file_path.stat().st_mtime) < cutoff_date:
                file_path.unlink()
```
Network Optimization
Connection Pooling
Optimize network connections:
```python
from typing import Any, Dict

import aiohttp

class OptimizedHTTPClient:
    def __init__(self, max_connections: int = 100):
        self.connector = aiohttp.TCPConnector(
            limit=max_connections,
            limit_per_host=30,
            keepalive_timeout=30,
            enable_cleanup_closed=True
        )
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            connector=self.connector,
            timeout=aiohttp.ClientTimeout(total=30)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def get(self, url: str, **kwargs) -> Dict[str, Any]:
        """Make a GET request over the pooled connection."""
        async with self.session.get(url, **kwargs) as response:
            return await response.json()

    async def post(self, url: str, data: Dict[str, Any], **kwargs) -> Dict[str, Any]:
        """Make a POST request over the pooled connection."""
        async with self.session.post(url, json=data, **kwargs) as response:
            return await response.json()

# Usage
async def fetch_documents_from_api():
    async with OptimizedHTTPClient() as client:
        return await client.get("https://api.company.com/documents")
```
Batch Processing
Implement efficient batch processing:
```python
import asyncio
from typing import Any, Dict, List

class BatchProcessor:
    def __init__(self, batch_size: int = 100, max_concurrent_batches: int = 5):
        self.batch_size = batch_size
        self.semaphore = asyncio.Semaphore(max_concurrent_batches)

    async def process_in_batches(self, items: List[Dict[str, Any]],
                                 process_func) -> List[Dict[str, Any]]:
        """Process items in optimized batches."""
        # Split into batches
        batches = [items[i:i + self.batch_size]
                   for i in range(0, len(items), self.batch_size)]

        # Process batches concurrently
        batch_tasks = [
            asyncio.create_task(self._process_batch(batch, process_func))
            for batch in batches
        ]

        # Wait for all batches to complete, then flatten the results
        results = []
        for batch_result in await asyncio.gather(*batch_tasks):
            results.extend(batch_result)
        return results

    async def _process_batch(self, batch: List[Dict[str, Any]],
                             process_func) -> List[Dict[str, Any]]:
        """Process a single batch with concurrency control."""
        async with self.semaphore:
            return await process_func(batch)
```
Monitoring and Profiling
Performance Monitoring
Implement comprehensive performance monitoring:
```python
import time
from functools import wraps
from typing import Any, Dict

import psutil

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {}

    def monitor_function(self, func_name: str = None):
        """Decorator to monitor function performance."""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                start_time = time.time()
                start_memory = psutil.Process().memory_info().rss
                try:
                    result = await func(*args, **kwargs)
                    status = "success"
                    return result
                except Exception:
                    status = "error"
                    raise
                finally:
                    end_time = time.time()
                    end_memory = psutil.Process().memory_info().rss
                    # Record metrics
                    self._record_metrics(
                        func_name or func.__name__,
                        end_time - start_time,
                        end_memory - start_memory,
                        status
                    )
            return wrapper
        return decorator

    def _record_metrics(self, func_name: str, duration: float,
                        memory_delta: int, status: str):
        """Record performance metrics."""
        if func_name not in self.metrics:
            self.metrics[func_name] = {
                'count': 0,
                'total_duration': 0,
                'total_memory': 0,
                'errors': 0
            }

        metrics = self.metrics[func_name]
        metrics['count'] += 1
        metrics['total_duration'] += duration
        metrics['total_memory'] += memory_delta
        if status == "error":
            metrics['errors'] += 1

    def get_performance_report(self) -> Dict[str, Any]:
        """Generate a performance report."""
        report = {}
        for func_name, metrics in self.metrics.items():
            avg_duration = metrics['total_duration'] / metrics['count']
            avg_memory = metrics['total_memory'] / metrics['count']
            error_rate = metrics['errors'] / metrics['count']
            report[func_name] = {
                'calls': metrics['count'],
                'avg_duration_ms': avg_duration * 1000,
                'avg_memory_mb': avg_memory / 1024 / 1024,
                'error_rate': error_rate
            }
        return report

# Usage
monitor = PerformanceMonitor()

@monitor.monitor_function("process_document")
async def process_document(document: Dict[str, Any]) -> Dict[str, Any]:
    # Document processing logic
    pass
```
Profiling Tools
Use profiling tools for optimization:
```python
import cProfile
import io
import logging
import pstats
import tracemalloc
from functools import wraps

logger = logging.getLogger(__name__)

def profile_function(func):
    """Decorator to profile function performance with cProfile."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()
        try:
            return await func(*args, **kwargs)
        finally:
            profiler.disable()

            # Generate profile report
            s = io.StringIO()
            ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
            ps.print_stats(10)  # Top 10 functions

            # Log profile results
            logger.info(f"Profile for {func.__name__}:\n{s.getvalue()}")
    return wrapper

# Memory profiling
def profile_memory(func):
    """Decorator to profile memory usage with tracemalloc."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        tracemalloc.start()
        try:
            return await func(*args, **kwargs)
        finally:
            current, peak = tracemalloc.get_traced_memory()
            logger.info(f"Memory usage for {func.__name__}: "
                        f"current={current/1024/1024:.1f}MB, "
                        f"peak={peak/1024/1024:.1f}MB")
            tracemalloc.stop()
    return wrapper
```
Scaling Strategies
Auto-scaling
Implement auto-scaling based on metrics:
```python
import asyncio
import logging
from typing import Dict

import psutil

logger = logging.getLogger(__name__)

class AutoScaler:
    def __init__(self, min_instances: int = 1, max_instances: int = 10):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.current_instances = min_instances
        self.scaling_metrics = {
            'cpu_threshold': 70,
            'memory_threshold': 80,
            'queue_length_threshold': 100
        }

    async def monitor_and_scale(self):
        """Monitor metrics and scale accordingly."""
        while True:
            try:
                metrics = await self._collect_metrics()
                scaling_action = self._determine_scaling_action(metrics)
                if scaling_action != 'none':
                    await self._execute_scaling_action(scaling_action)
                await asyncio.sleep(60)  # Check every minute
            except Exception as e:
                logger.error(f"Auto-scaling error: {e}")
                await asyncio.sleep(60)

    async def _collect_metrics(self) -> Dict[str, float]:
        """Collect system metrics."""
        return {
            'cpu_percent': psutil.cpu_percent(interval=1),
            'memory_percent': psutil.virtual_memory().percent,
            'queue_length': await self._get_queue_length(),
            'processing_time': await self._get_avg_processing_time()
        }

    def _determine_scaling_action(self, metrics: Dict[str, float]) -> str:
        """Determine scaling action based on metrics."""
        cpu_high = metrics['cpu_percent'] > self.scaling_metrics['cpu_threshold']
        memory_high = metrics['memory_percent'] > self.scaling_metrics['memory_threshold']
        queue_long = metrics['queue_length'] > self.scaling_metrics['queue_length_threshold']

        # Scale up when any resource is under pressure
        if (cpu_high or memory_high or queue_long) and self.current_instances < self.max_instances:
            return 'scale_up'

        # Scale down only when everything is quiet
        cpu_low = metrics['cpu_percent'] < 30
        memory_low = metrics['memory_percent'] < 40
        queue_short = metrics['queue_length'] < 10
        if (cpu_low and memory_low and queue_short) and self.current_instances > self.min_instances:
            return 'scale_down'

        return 'none'

    async def _execute_scaling_action(self, action: str):
        """Execute scaling action."""
        if action == 'scale_up':
            self.current_instances += 1
            await self._add_instance()
            logger.info(f"Scaled up to {self.current_instances} instances")
        elif action == 'scale_down':
            self.current_instances -= 1
            await self._remove_instance()
            logger.info(f"Scaled down to {self.current_instances} instances")

    async def _get_queue_length(self) -> float:
        """Return the current ingestion queue length."""
        # Implement queue-length lookup (e.g., from Redis or the database)
        return 0.0

    async def _get_avg_processing_time(self) -> float:
        """Return the recent average processing time."""
        # Implement processing-time lookup from metrics storage
        return 0.0

    async def _add_instance(self):
        """Add a new processing instance."""
        # Implement instance addition logic
        pass

    async def _remove_instance(self):
        """Remove a processing instance."""
        # Implement instance removal logic
        pass
```
Load Testing
Implement load testing for performance validation:
```python
import asyncio
import time
from typing import Any, Dict, List

import aiohttp

class LoadTester:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.results: List[Dict[str, Any]] = []

    async def run_load_test(self, concurrent_users: int, duration_seconds: int):
        """Run a load test with the specified parameters."""
        print(f"Starting load test: {concurrent_users} users for {duration_seconds} seconds")
        start_time = time.time()

        # Create tasks for concurrent users
        tasks = [
            asyncio.create_task(self._simulate_user(i, start_time + duration_seconds))
            for i in range(concurrent_users)
        ]

        # Wait for all tasks to complete, then report
        await asyncio.gather(*tasks)
        self._generate_report()

    async def _simulate_user(self, user_id: int, end_time: float):
        """Simulate a single user's behavior."""
        async with aiohttp.ClientSession() as session:
            while time.time() < end_time:
                try:
                    start_time = time.time()
                    # Simulate a document processing request
                    async with session.post(
                        f"{self.base_url}/api/v1/documents",
                        json={"document_id": f"test_doc_{user_id}_{int(time.time())}"}
                    ) as response:
                        await response.json()
                        self.results.append({
                            'user_id': user_id,
                            'duration': time.time() - start_time,
                            'status': response.status,
                            'timestamp': time.time()
                        })
                except Exception as e:
                    self.results.append({
                        'user_id': user_id,
                        'duration': 0,
                        'status': 500,
                        'error': str(e),
                        'timestamp': time.time()
                    })

                # Fixed pacing delay between requests
                await asyncio.sleep(0.1)

    def _generate_report(self):
        """Generate the load test report."""
        if not self.results:
            print("No results to report")
            return

        successful_requests = [r for r in self.results if r['status'] == 200]
        failed_requests = [r for r in self.results if r['status'] != 200]

        total_requests = len(self.results)
        success_rate = len(successful_requests) / total_requests * 100

        durations = [r['duration'] for r in successful_requests]
        avg_duration = sum(durations) / len(durations) if durations else 0
        max_duration = max(durations) if durations else 0
        elapsed = max(r['timestamp'] for r in self.results) - min(r['timestamp'] for r in self.results)

        print("\nLoad Test Results:")
        print(f"Total Requests: {total_requests}")
        print(f"Successful: {len(successful_requests)}")
        print(f"Failed: {len(failed_requests)}")
        print(f"Success Rate: {success_rate:.2f}%")
        print(f"Average Response Time: {avg_duration:.3f}s")
        print(f"Max Response Time: {max_duration:.3f}s")
        if elapsed > 0:
            print(f"Requests per Second: {total_requests / elapsed:.2f}")

# Usage
load_tester = LoadTester("http://localhost:8000")
asyncio.run(load_tester.run_load_test(concurrent_users=50, duration_seconds=300))
```
Best Practices
Code Optimization
- Use async/await: Leverage asynchronous programming for I/O operations
- Implement connection pooling: Reuse database and HTTP connections
- Batch operations: Group similar operations together (see the sketch after this list)
- Cache frequently accessed data: Reduce redundant computations
- Profile before optimizing: Measure first, optimize second
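To illustrate the batching point, a minimal sketch with the standard-library `sqlite3` module; a single `executemany` inside one transaction avoids per-row commit overhead:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE chunks (doc_id TEXT, content TEXT)")

rows = [(f"doc-{i}", f"chunk text {i}") for i in range(10_000)]

# One batched statement in one transaction instead of 10,000 INSERTs
with conn:
    conn.executemany("INSERT INTO chunks VALUES (?, ?)", rows)
```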
System Optimization
- Monitor resource usage: Track CPU, memory, and I/O metrics
- Implement auto-scaling: Scale based on demand
- Optimize database queries: Use indexes and query optimization
- Use efficient data structures: Choose appropriate data types
- Implement proper error handling: Fail fast and recover gracefully
Operational Optimization
- Set up monitoring: Implement comprehensive monitoring
- Create performance baselines: Establish performance expectations
- Regular performance testing: Validate performance regularly
- Capacity planning: Plan for growth and peak loads
- Document performance characteristics: Maintain performance documentation
Performance Troubleshooting
Common Performance Issues
1. High CPU Usage
   - Check for infinite loops
   - Profile CPU-intensive operations
   - Optimize algorithms
   - Consider horizontal scaling
2. High Memory Usage
   - Check for memory leaks (see the tracemalloc sketch after this list)
   - Optimize data structures
   - Trigger garbage collection under memory pressure
   - Use memory profiling tools
3. Slow Database Queries
   - Analyze query execution plans
   - Add missing indexes
   - Optimize query logic
   - Consider database scaling
4. Network Bottlenecks
   - Implement connection pooling
   - Use compression
   - Optimize payload sizes
   - Consider CDN usage
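For the memory-leak case, a minimal sketch using `tracemalloc` snapshots to find which call sites accumulate allocations between two points in time:

```python
import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

# ... run the suspect workload here; this list stands in for a leak ...
leaky = [str(i) * 100 for i in range(50_000)]

after = tracemalloc.take_snapshot()

# Show the ten call sites whose allocations grew the most
for stat in after.compare_to(before, 'lineno')[:10]:
    print(stat)
```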
Performance Debugging
```python
import logging
import time
from contextlib import contextmanager

import psutil

logger = logging.getLogger(__name__)

@contextmanager
def performance_context(operation_name: str):
    """Context manager for performance debugging."""
    start_time = time.time()
    start_memory = psutil.Process().memory_info().rss
    logger.info(f"Starting {operation_name}")
    try:
        yield
    finally:
        duration = time.time() - start_time
        memory_delta = psutil.Process().memory_info().rss - start_memory
        logger.info(f"Completed {operation_name}: "
                    f"{duration:.3f}s, "
                    f"{memory_delta/1024/1024:.1f}MB")

# Usage
with performance_context("document_processing"):
    # Document processing code
    pass
```
Next Steps
- Deployment Guide - Production deployment
- Security Guide - Security best practices
- Operational Runbooks - Day-to-day operations