Document Ingestion Pipeline Deployment
This guide provides comprehensive instructions for deploying the enterprise document ingestion pipeline in production environments.
Overview
The document ingestion pipeline is designed to handle thousands of documents daily from various sources with automatic error recovery and comprehensive monitoring. It consists of multiple components that work together to provide a robust, scalable solution.
Architecture Components
Core Components
- Document Processor - Handles multiple formats (PDF, DOCX, HTML, CSV, JSON)
- Retry Manager - Implements intelligent retry logic with exponential backoff
- Error Classifier - Distinguishes recoverable vs permanent failures
- Dead Letter Queue - Manages failed documents with manual review workflows
- Partial Processor - Handles large documents in chunks
- Version Manager - Tracks document versions and changes
- Monitoring System - Provides metrics, health checks, and alerting
Data Flow
Documents enter through the Document Processor; recoverable failures are retried by the Retry Manager, permanent failures are routed to the Dead Letter Queue for manual review, large documents are split by the Partial Processor, and the Version Manager and Monitoring System track versions and health throughout.
Prerequisites
System Requirements
- Python: 3.8 or higher
- Memory: Minimum 4GB RAM (8GB+ recommended for high-volume processing)
- Storage: SSD recommended for database operations
- CPU: Multi-core processor recommended for concurrent processing
Dependencies
```bash
# Core dependencies (asyncio is part of the Python standard library)
pip install aiosqlite aiofiles

# Document processing
pip install pypdf python-docx beautifulsoup4 pandas

# Monitoring (optional)
pip install psutil

# Development dependencies
pip install pytest pytest-asyncio
```
Installation
1. Clone the Repository
```bash
git clone <repository-url>
cd recoagent
```
2. Install Dependencies
```bash
pip install -r requirements.txt
```
3. Configuration
Create a configuration file `config.yaml`:
```yaml
# Database Configuration
databases:
  main: "ingestion_pipeline.db"
  dlq: "dlq.db"
  monitoring: "monitoring.db"
  versioning: "versions.db"

# Processing Configuration
processing:
  max_concurrent_documents: 10
  chunk_size: 1000
  chunk_overlap: 200
  max_memory_size: 104857600  # 100 MB

# Retry Configuration
retry:
  max_retries: 3
  base_delay: 1.0
  max_delay: 60.0
  exponential_base: 2.0
  jitter: true

# Monitoring Configuration
monitoring:
  health_check_interval: 60
  metrics_collection_interval: 60
  alert_check_interval: 300

# Dead Letter Queue Configuration
dlq:
  cleanup_days: 30
  max_review_age_days: 7
  auto_retry_failed: true

# Versioning Configuration
versioning:
  archive_days: 30
  keep_latest_versions: 5
  change_detection: true
```
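If you prefer to drive the pipeline from this file rather than hard-coding values, a minimal loader sketch follows. It assumes PyYAML is installed (`pip install pyyaml`) and passes only the two constructor arguments used elsewhere in this guide; map the remaining sections to your own deployment as needed.
```python
# Minimal sketch: load config.yaml and pass the documented constructor
# arguments. Assumes PyYAML; other sections are left for your own wiring.
import yaml

from recoagent.ingestion import EnterpriseIngestionPipeline

with open("config.yaml") as fh:
    config = yaml.safe_load(fh)

pipeline = EnterpriseIngestionPipeline(
    db_path=config["databases"]["main"],
    max_concurrent_documents=config["processing"]["max_concurrent_documents"],
)
```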
Deployment Options
Option 1: Standalone Deployment
For small to medium-scale deployments:
```python
import asyncio

from recoagent.ingestion import EnterpriseIngestionPipeline

async def main():
    # Initialize pipeline
    pipeline = EnterpriseIngestionPipeline(
        db_path="production.db",
        max_concurrent_documents=5
    )

    # Start processing
    await pipeline.initialize()

asyncio.run(main())
```
Option 2: Docker Deployment
Create a `Dockerfile`:
```dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY recoagent/ ./recoagent/
COPY config.yaml .

EXPOSE 8000

CMD ["python", "-m", "recoagent.ingestion.main"]
```
Build and run:
```bash
docker build -t ingestion-pipeline .

docker run -d --name ingestion-pipeline \
  -v $(pwd)/data:/app/data \
  -p 8000:8000 \
  ingestion-pipeline
```
Option 3: Kubernetes Deployment
Create `k8s-deployment.yaml`:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ingestion-pipeline
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ingestion-pipeline
  template:
    metadata:
      labels:
        app: ingestion-pipeline
    spec:
      containers:
        - name: ingestion-pipeline
          image: ingestion-pipeline:latest
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_PATH
              value: "/data/ingestion.db"
            - name: MAX_CONCURRENT_DOCUMENTS
              value: "10"
          volumeMounts:
            - name: data-volume
              mountPath: /data
      volumes:
        - name: data-volume
          persistentVolumeClaim:
            claimName: ingestion-data-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: ingestion-pipeline-service
spec:
  selector:
    app: ingestion-pipeline
  ports:
    - port: 8000
      targetPort: 8000
  type: LoadBalancer
```
Deploy:
```bash
kubectl apply -f k8s-deployment.yaml
```
Configuration
Environment Variables
| Variable | Description | Default |
|---|---|---|
| `DATABASE_PATH` | Main database path | `ingestion_pipeline.db` |
| `DLQ_DB_PATH` | Dead letter queue database path | `dlq.db` |
| `MONITORING_DB_PATH` | Monitoring database path | `monitoring.db` |
| `VERSIONING_DB_PATH` | Versioning database path | `versions.db` |
| `MAX_CONCURRENT_DOCUMENTS` | Maximum concurrent documents | `10` |
| `LOG_LEVEL` | Logging level | `INFO` |
| `RETRY_MAX_ATTEMPTS` | Maximum retry attempts | `3` |
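If you embed the pipeline in your own entry point, these variables can be read with standard-library calls. The sketch below is illustrative; the exact wiring inside `recoagent.ingestion.main` may differ.
```python
# Illustrative sketch: read the documented environment variables and fall
# back to the defaults from the table above.
import os

db_path = os.environ.get("DATABASE_PATH", "ingestion_pipeline.db")
dlq_db_path = os.environ.get("DLQ_DB_PATH", "dlq.db")
max_concurrent = int(os.environ.get("MAX_CONCURRENT_DOCUMENTS", "10"))
log_level = os.environ.get("LOG_LEVEL", "INFO")
retry_max_attempts = int(os.environ.get("RETRY_MAX_ATTEMPTS", "3"))
```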
Database Configuration
The pipeline uses SQLite databases by default. For production environments, consider:
- PostgreSQL: For better concurrency and performance
- MySQL: For compatibility with existing infrastructure
- SQLite: For simple deployments (default)
To use PostgreSQL:
```python
# Update database configuration
pipeline = EnterpriseIngestionPipeline(
    db_path="postgresql://user:pass@localhost/ingestion",
    # ... other config
)
```
Monitoring and Alerting
Health Checks
The pipeline provides several health checks:
- Database Connectivity: Verifies database connections
- File System Access: Checks file system permissions
- Memory Usage: Monitors memory consumption
- Disk Space: Checks available disk space
- Processing Queue: Monitors queue health
- Error Rate: Tracks processing error rates
Metrics
Key metrics collected:
- `ingestion.documents.started` - Documents started processing
- `ingestion.documents.successful` - Successfully processed documents
- `ingestion.documents.failed` - Failed documents
- `ingestion.processing_time_ms` - Processing time histogram
- `ingestion.error_rate_percentage` - Error rate gauge
- `system.memory_usage_percentage` - System memory usage
- `system.disk_usage_percentage` - Disk usage
Alerting Rules
Default alert rules:
- High Error Rate: Error rate > 10%
- Low Throughput: Throughput < 5 docs/min
- High DLQ Items: DLQ items > 100
- High Processing Time: P95 processing time > 30s
- Low Disk Space: Disk space < 10%
- High Memory Usage: Memory usage > 90%
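Keeping these thresholds in one place makes them easier to review each quarter. The structure below is illustrative only; it mirrors the defaults above and is not a documented recoagent API.
```python
# Illustrative only: mirrors the default alert thresholds listed above.
DEFAULT_ALERT_THRESHOLDS = {
    "max_error_rate_percent": 10,
    "min_throughput_docs_per_min": 5,
    "max_dlq_items": 100,
    "max_p95_processing_time_seconds": 30,
    "min_free_disk_percent": 10,
    "max_memory_usage_percent": 90,
}
```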
Alert Handlers
Configure alert handlers:
```python
async def email_alert_handler(alert):
    """Send email alerts for critical issues."""
    if alert.severity == AlertSeverity.CRITICAL:
        # Send email
        pass

async def slack_alert_handler(alert):
    """Send Slack notifications."""
    if alert.severity in [AlertSeverity.HIGH, AlertSeverity.CRITICAL]:
        # Send to Slack
        pass

# Register handlers
pipeline.monitor.add_alert_handler(email_alert_handler)
pipeline.monitor.add_alert_handler(slack_alert_handler)
```
Performance Tuning
Concurrency Settings
Adjust based on your system:
```python
import os
import psutil  # optional dependency listed under Prerequisites

cpu_count = os.cpu_count() or 1
available_memory_gb = psutil.virtual_memory().available // (1024 ** 3)

# CPU-bound workloads
max_concurrent_documents = cpu_count * 2
# I/O-bound workloads
max_concurrent_documents = cpu_count * 4
# Memory-constrained systems
max_concurrent_documents = min(4, available_memory_gb // 2)
```
Database Optimization
For SQLite:
```sql
-- Enable WAL mode for better concurrency
PRAGMA journal_mode=WAL;

-- Increase cache size
PRAGMA cache_size=10000;

-- Enable foreign keys
PRAGMA foreign_keys=ON;
```
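If you prefer to apply these from Python at startup instead of the sqlite3 shell, a minimal sketch follows (it assumes direct access to the database files named in `config.yaml`; note that `journal_mode=WAL` persists in the file, while `cache_size` and `foreign_keys` are per-connection settings):
```python
import sqlite3

def tune_sqlite(path: str) -> None:
    """Apply the PRAGMAs above to one of the pipeline's SQLite databases."""
    conn = sqlite3.connect(path)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")   # persists in the database file
        conn.execute("PRAGMA cache_size=10000;")   # per-connection setting
        conn.execute("PRAGMA foreign_keys=ON;")    # per-connection setting
    finally:
        conn.close()

for db in ("ingestion_pipeline.db", "dlq.db", "monitoring.db", "versions.db"):
    tune_sqlite(db)
```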
Memory Management
```python
import psutil  # optional dependency listed under Prerequisites

available_memory_mb = psutil.virtual_memory().available // (1024 ** 2)

# Adjust chunk sizes based on available memory
chunk_size = min(1000, available_memory_mb // 10)

# Limit large document processing (convert MB to bytes to match max_memory_size)
max_memory_size = int(available_memory_mb * 0.8) * 1024 * 1024
```
Security Considerations
File Access
- Restrict file system access to necessary directories
- Use non-root user for running the pipeline
- Implement file size limits to prevent DoS attacks (see the sketch after this list)
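A minimal pre-ingestion guard, assuming documents arrive as files on disk; the allowed root and size cap below are illustrative values, not pipeline settings:
```python
from pathlib import Path

ALLOWED_ROOT = Path("/data/incoming").resolve()  # illustrative: restrict reads to this tree
MAX_FILE_SIZE = 100 * 1024 * 1024                # illustrative: 100 MB cap

def is_safe_to_ingest(path: str) -> bool:
    """Reject files outside the allowed directory or above the size cap."""
    candidate = Path(path).resolve()
    if ALLOWED_ROOT not in candidate.parents:
        return False  # path escapes the allowed directory
    if not candidate.is_file() or candidate.stat().st_size > MAX_FILE_SIZE:
        return False  # missing file or too large
    return True
```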
Database Security
- Use connection strings with credentials
- Enable SSL/TLS for database connections
- Regularly rotate database credentials
Network Security
- Use HTTPS for API endpoints
- Implement rate limiting (see the sketch below)
- Use API keys for authentication
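Rate limiting can sit in front of whichever HTTP layer exposes the pipeline's endpoints. A minimal, framework-independent token-bucket sketch (the rate and burst values are illustrative):
```python
import time

class TokenBucket:
    """Allow up to `rate` requests per second, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill tokens for the elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

limiter = TokenBucket(rate=10, capacity=20)  # roughly 10 requests/second, bursts of 20
```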
Backup and Recovery
Database Backup
```bash
# SQLite backup
sqlite3 ingestion.db ".backup backup_$(date +%Y%m%d).db"

# PostgreSQL backup
pg_dump -h localhost -U user ingestion > backup_$(date +%Y%m%d).sql
```
Automated Backups
Create a backup script:
```bash
#!/bin/bash
BACKUP_DIR="/backups/ingestion"
DATE=$(date +%Y%m%d_%H%M%S)

# Create backup directory
mkdir -p "$BACKUP_DIR"

# Backup databases
for db in ingestion.db dlq.db monitoring.db versions.db; do
    if [ -f "$db" ]; then
        sqlite3 "$db" ".backup $BACKUP_DIR/${db}_${DATE}.backup"
    fi
done

# Clean up old backups (keep 30 days)
find "$BACKUP_DIR" -name "*.backup" -mtime +30 -delete
```
Schedule with cron:
```bash
# Daily backup at 2 AM
0 2 * * * /path/to/backup_script.sh
```
Disaster Recovery
- RTO (Recovery Time Objective): 1 hour
- RPO (Recovery Point Objective): 15 minutes
Recovery procedure:
- Stop the pipeline
- Restore database backups (see the sketch after this list)
- Verify data integrity
- Restart the pipeline
- Monitor for issues
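For the SQLite layout used in the backup script above, restoring a database and verifying its integrity might look like the sketch below. The backup filename is illustrative; run this only while the pipeline is stopped.
```python
# Illustrative restore sketch for SQLite backups made with ".backup".
# Remove any stale -wal/-shm sidecar files next to the live database first.
import shutil
import sqlite3

def restore(backup_path: str, live_db_path: str) -> None:
    shutil.copyfile(backup_path, live_db_path)  # a .backup file is a complete SQLite database
    conn = sqlite3.connect(live_db_path)
    try:
        result = conn.execute("PRAGMA integrity_check;").fetchone()[0]
    finally:
        conn.close()
    if result != "ok":
        raise RuntimeError(f"Integrity check failed for {live_db_path}: {result}")

restore("/backups/ingestion/ingestion.db_20240101_020000.backup", "ingestion.db")
```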
Troubleshooting
Common Issues
- Database Locked Errors
  - Check for long-running transactions
  - Verify WAL mode is enabled
  - Consider connection pooling
- Memory Issues
  - Reduce `max_concurrent_documents`
  - Increase chunk sizes
  - Monitor memory usage
- High Error Rates
  - Check file permissions
  - Verify document formats
  - Review error logs
- Performance Issues
  - Enable database indexes
  - Optimize chunk sizes
  - Consider horizontal scaling
Log Analysis
Key log patterns to monitor:
```bash
# Error patterns
grep "ERROR" ingestion.log | tail -100

# Performance issues
grep "processing_time" ingestion.log | awk '{print $NF}' | sort -n

# High error rates
grep "failed" ingestion.log | wc -l
```
Health Check Endpoints
The pipeline exposes health check endpoints:
- `GET /health` - Overall health status
- `GET /health/database` - Database connectivity
- `GET /health/filesystem` - File system access
- `GET /metrics` - Prometheus metrics
- `GET /stats` - Processing statistics
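A lightweight external probe can exercise these endpoints. The sketch below assumes the service is reachable on port 8000, as in the Docker and Kubernetes examples, and that the endpoints return JSON.
```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # adjust to your service address

def check(path: str) -> dict:
    """Fetch one endpoint and return its decoded JSON payload."""
    with urllib.request.urlopen(f"{BASE_URL}{path}", timeout=5) as resp:
        return json.loads(resp.read().decode("utf-8"))

for endpoint in ("/health", "/health/database", "/health/filesystem", "/stats"):
    print(endpoint, check(endpoint))
```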
Scaling
Vertical Scaling
Increase system resources:
- CPU: Add more cores for concurrent processing
- Memory: Increase RAM for larger documents
- Storage: Use faster SSDs for better I/O
Horizontal Scaling
Deploy multiple instances:
- Load Balancer: Distribute documents across instances
- Shared Storage: Use network-attached storage
- Database Clustering: Use database clustering for high availability
Microservices Architecture
Split into separate services:
- Document Processor Service: Handle document processing
- Dead Letter Queue Service: Manage failed documents
- Monitoring Service: Collect metrics and alerts
- Versioning Service: Track document versions
Maintenance
Regular Tasks
- Daily: Monitor error rates and performance
- Weekly: Review dead letter queue items
- Monthly: Clean up old data and backups
- Quarterly: Review and update alert thresholds
Updates
- Backup current deployment
- Test new version in staging
- Deploy during maintenance window
- Monitor for issues post-deployment
- Rollback if necessary
Capacity Planning
Monitor these metrics for capacity planning:
- Document processing rate
- Error rates
- Resource utilization
- Queue depths
- Storage growth
Plan for 20% growth in document volume and a 50% buffer for peak loads.
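As a worked example, with an illustrative current load of 10,000 documents per day:
```python
current_daily_volume = 10_000  # illustrative current load (documents/day)
growth_factor = 1.20           # plan for 20% growth
peak_buffer = 1.50             # 50% buffer for peak loads

target_capacity = current_daily_volume * growth_factor * peak_buffer
print(target_capacity)  # 18000.0 documents/day
```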
Support
For deployment issues:
- Check logs for error messages
- Review monitoring dashboards
- Verify configuration settings
- Test connectivity to external systems
- Contact support at support@recohut.com
Next Steps
- Security Configuration - Security best practices
- Operational Runbooks - Day-to-day operations
- Performance Optimization - Tuning for high volume