Document Ingestion Pipeline Deployment

This guide provides comprehensive instructions for deploying the enterprise document ingestion pipeline in production environments.

Overview

The document ingestion pipeline is designed to handle thousands of documents daily from various sources with automatic error recovery and comprehensive monitoring. It consists of multiple components that work together to provide a robust, scalable solution.

Architecture Components

Core Components

  1. Document Processor - Handles multiple formats (PDF, DOCX, HTML, CSV, JSON)
  2. Retry Manager - Implements intelligent retry logic with exponential backoff
  3. Error Classifier - Distinguishes recoverable vs permanent failures
  4. Dead Letter Queue - Manages failed documents with manual review workflows
  5. Partial Processor - Handles large documents in chunks
  6. Version Manager - Tracks document versions and changes
  7. Monitoring System - Provides metrics, health checks, and alerting

Data Flow

Documents enter through the Document Processor, which parses each supported format. Failures are classified as recoverable or permanent: recoverable failures are retried with exponential backoff, while permanent failures are routed to the Dead Letter Queue for manual review. Large documents are processed in chunks by the Partial Processor, the Version Manager records versions and changes, and the Monitoring System observes every stage.

Prerequisites

System Requirements

  • Python: 3.8 or higher
  • Memory: Minimum 4GB RAM (8GB+ recommended for high-volume processing)
  • Storage: SSD recommended for database operations
  • CPU: Multi-core processor recommended for concurrent processing

Dependencies

# Core dependencies
pip install asyncio aiosqlite aiofiles

# Document processing
pip install pypdf python-docx beautifulsoup4 pandas

# Monitoring (optional)
pip install psutil

# Development dependencies
pip install pytest pytest-asyncio

Installation

1. Clone the Repository

git clone <repository-url>
cd recoagent

2. Install Dependencies

pip install -r requirements.txt

3. Configuration

Create a configuration file config.yaml:

# Database Configuration
databases:
  main: "ingestion_pipeline.db"
  dlq: "dlq.db"
  monitoring: "monitoring.db"
  versioning: "versions.db"

# Processing Configuration
processing:
  max_concurrent_documents: 10
  chunk_size: 1000
  chunk_overlap: 200
  max_memory_size: 104857600  # 100 MB

# Retry Configuration
retry:
  max_retries: 3
  base_delay: 1.0
  max_delay: 60.0
  exponential_base: 2.0
  jitter: true

# Monitoring Configuration
monitoring:
  health_check_interval: 60
  metrics_collection_interval: 60
  alert_check_interval: 300

# Dead Letter Queue Configuration
dlq:
  cleanup_days: 30
  max_review_age_days: 7
  auto_retry_failed: true

# Versioning Configuration
versioning:
  archive_days: 30
  keep_latest_versions: 5
  change_detection: true
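
The retry settings above translate into per-attempt delays; a minimal sketch of exponential backoff with jitter, assuming this is how the Retry Manager interprets the parameters (the function itself is illustrative):

import random

def backoff_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0,
                  exponential_base: float = 2.0, jitter: bool = True) -> float:
    """Delay in seconds before retry `attempt` (1-based), capped at max_delay."""
    delay = base_delay * (exponential_base ** (attempt - 1))
    if jitter:
        # Randomize delays to avoid synchronized retry storms.
        delay *= random.uniform(0.5, 1.5)
    return min(delay, max_delay)

# With the defaults above: roughly 1s, 2s, 4s (plus jitter) for max_retries = 3.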

Deployment Options

Option 1: Standalone Deployment

For small to medium-scale deployments:

import asyncio

from recoagent.ingestion import EnterpriseIngestionPipeline

async def main():
    # Initialize pipeline
    pipeline = EnterpriseIngestionPipeline(
        db_path="production.db",
        max_concurrent_documents=5,
    )

    # Start processing
    await pipeline.initialize()

asyncio.run(main())

Option 2: Docker Deployment

Create a Dockerfile:

FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY recoagent/ ./recoagent/
COPY config.yaml .

EXPOSE 8000

CMD ["python", "-m", "recoagent.ingestion.main"]

Build and run:

docker build -t ingestion-pipeline .
docker run -d --name ingestion-pipeline \
  -v $(pwd)/data:/app/data \
  -p 8000:8000 \
  ingestion-pipeline

Option 3: Kubernetes Deployment

Create k8s-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: ingestion-pipeline
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ingestion-pipeline
  template:
    metadata:
      labels:
        app: ingestion-pipeline
    spec:
      containers:
        - name: ingestion-pipeline
          image: ingestion-pipeline:latest
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_PATH
              value: "/data/ingestion.db"
            - name: MAX_CONCURRENT_DOCUMENTS
              value: "10"
          volumeMounts:
            - name: data-volume
              mountPath: /data
      volumes:
        - name: data-volume
          persistentVolumeClaim:
            claimName: ingestion-data-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: ingestion-pipeline-service
spec:
  selector:
    app: ingestion-pipeline
  ports:
    - port: 8000
      targetPort: 8000
  type: LoadBalancer
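
The Deployment above mounts a PersistentVolumeClaim named ingestion-data-pvc, which is not defined in this manifest. A minimal sketch (the access mode and storage size are assumptions; adjust them to your cluster):

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: ingestion-data-pvc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi

Note that ReadWriteOnce allows the volume to be mounted by a single node only; with replicas: 3 you may need a ReadWriteMany-capable storage class or an external database such as PostgreSQL (see Database Configuration below).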

Deploy:

kubectl apply -f k8s-deployment.yaml

Configuration

Environment Variables

Variable | Description | Default
DATABASE_PATH | Main database path | ingestion_pipeline.db
DLQ_DB_PATH | Dead letter queue database path | dlq.db
MONITORING_DB_PATH | Monitoring database path | monitoring.db
VERSIONING_DB_PATH | Versioning database path | versions.db
MAX_CONCURRENT_DOCUMENTS | Maximum concurrent documents | 10
LOG_LEVEL | Logging level | INFO
RETRY_MAX_ATTEMPTS | Maximum retry attempts | 3
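
For example, these variables can be passed to the Docker container at run time (the values shown are illustrative):

docker run -d --name ingestion-pipeline \
  -e DATABASE_PATH=/app/data/ingestion.db \
  -e MAX_CONCURRENT_DOCUMENTS=20 \
  -e LOG_LEVEL=DEBUG \
  -v $(pwd)/data:/app/data \
  -p 8000:8000 \
  ingestion-pipeline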

Database Configuration

The pipeline uses SQLite databases by default. For production environments, consider:

  1. PostgreSQL: For better concurrency and performance
  2. MySQL: For compatibility with existing infrastructure
  3. SQLite: For simple deployments (default)

To use PostgreSQL:

# Update database configuration
pipeline = EnterpriseIngestionPipeline(
    db_path="postgresql://user:pass@localhost/ingestion",
    # ... other config
)

Monitoring and Alerting

Health Checks

The pipeline provides several health checks:

  • Database Connectivity: Verifies database connections
  • File System Access: Checks file system permissions
  • Memory Usage: Monitors memory consumption
  • Disk Space: Checks available disk space
  • Processing Queue: Monitors queue health
  • Error Rate: Tracks processing error rates

Metrics

Key metrics collected:

  • ingestion.documents.started - Documents started processing
  • ingestion.documents.successful - Successfully processed documents
  • ingestion.documents.failed - Failed documents
  • ingestion.processing_time_ms - Processing time histogram
  • ingestion.error_rate_percentage - Error rate gauge
  • system.memory_usage_percentage - System memory usage
  • system.disk_usage_percentage - Disk usage

Alerting Rules

Default alert rules:

  1. High Error Rate: Error rate > 10%
  2. Low Throughput: Throughput < 5 docs/min
  3. High DLQ Items: DLQ items > 100
  4. High Processing Time: P95 processing time > 30s
  5. Low Disk Space: Disk space < 10%
  6. High Memory Usage: Memory usage > 90%

Alert Handlers

Configure alert handlers:

async def email_alert_handler(alert):
    """Send email alerts for critical issues."""
    if alert.severity == AlertSeverity.CRITICAL:
        # Send email
        pass

async def slack_alert_handler(alert):
    """Send Slack notifications."""
    if alert.severity in [AlertSeverity.HIGH, AlertSeverity.CRITICAL]:
        # Send to Slack
        pass

# Register handlers
pipeline.monitor.add_alert_handler(email_alert_handler)
pipeline.monitor.add_alert_handler(slack_alert_handler)

Performance Tuning

Concurrency Settings

Adjust based on your system:

import os
import psutil  # optional dependency, installed for monitoring

cpu_count = os.cpu_count() or 1
available_memory_gb = psutil.virtual_memory().available // (1024 ** 3)

# CPU-bound workloads
max_concurrent_documents = cpu_count * 2

# I/O-bound workloads
max_concurrent_documents = cpu_count * 4

# Memory-constrained systems
max_concurrent_documents = min(4, available_memory_gb // 2)

Database Optimization

For SQLite:

-- Enable WAL mode for better concurrency
PRAGMA journal_mode=WAL;

-- Increase cache size
PRAGMA cache_size=10000;

-- Enable foreign keys
PRAGMA foreign_keys=ON;
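
These PRAGMAs can also be applied from application code; a minimal sketch using aiosqlite (a core dependency), assuming the pipeline does not already set them:

import asyncio
import aiosqlite

async def tune_sqlite(db_path: str) -> None:
    """Apply the recommended PRAGMAs to a SQLite database."""
    async with aiosqlite.connect(db_path) as db:
        await db.execute("PRAGMA journal_mode=WAL;")   # persists in the database file
        await db.execute("PRAGMA cache_size=10000;")   # per-connection setting
        await db.execute("PRAGMA foreign_keys=ON;")    # per-connection setting
        await db.commit()

asyncio.run(tune_sqlite("ingestion_pipeline.db"))

Because cache_size and foreign_keys are per-connection settings, they only take effect on connections that set them; WAL mode, by contrast, persists in the database file.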

Memory Management

import psutil

available_memory_mb = psutil.virtual_memory().available // (1024 ** 2)

# Adjust chunk sizes based on available memory
chunk_size = min(1000, available_memory_mb // 10)

# Limit large document processing (max_memory_size is configured in bytes)
max_memory_size = int(available_memory_mb * 0.8) * 1024 * 1024

Security Considerations

File Access

  • Restrict file system access to necessary directories
  • Use non-root user for running the pipeline
  • Implement file size limits to prevent DoS attacks

Database Security

  • Use connection strings with credentials
  • Enable SSL/TLS for database connections
  • Regularly rotate database credentials

Network Security

  • Use HTTPS for API endpoints
  • Implement rate limiting
  • Use API keys for authentication

Backup and Recovery

Database Backup

# SQLite backup
sqlite3 ingestion.db ".backup backup_$(date +%Y%m%d).db"

# PostgreSQL backup
pg_dump -h localhost -U user ingestion > backup_$(date +%Y%m%d).sql

Automated Backups

Create a backup script:

#!/bin/bash
BACKUP_DIR="/backups/ingestion"
DATE=$(date +%Y%m%d_%H%M%S)

# Create backup directory
mkdir -p "$BACKUP_DIR"

# Backup databases
for db in ingestion.db dlq.db monitoring.db versions.db; do
  if [ -f "$db" ]; then
    sqlite3 "$db" ".backup $BACKUP_DIR/${db}_${DATE}.backup"
  fi
done

# Clean up old backups (keep 30 days)
find "$BACKUP_DIR" -name "*.backup" -mtime +30 -delete

Schedule with cron:

# Daily backup at 2 AM
0 2 * * * /path/to/backup_script.sh

Disaster Recovery

  1. RTO (Recovery Time Objective): 1 hour
  2. RPO (Recovery Point Objective): 15 minutes

Recovery procedure:

  1. Stop the pipeline
  2. Restore database backups
  3. Verify data integrity
  4. Restart the pipeline
  5. Monitor for issues
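
For the restore step, the backups created above can be copied back into place; an illustrative example (the timestamps in the file names are placeholders):

# SQLite: the .backup command produces a regular database file
cp /backups/ingestion/ingestion.db_20240101_020000.backup ingestion.db

# PostgreSQL: restore from the SQL dump
psql -h localhost -U user ingestion < backup_20240101.sql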

Troubleshooting

Common Issues

  1. Database Locked Errors

    • Check for long-running transactions
    • Verify WAL mode is enabled (see the check after this list)
    • Consider connection pooling
  2. Memory Issues

    • Reduce max_concurrent_documents
    • Increase chunk sizes
    • Monitor memory usage
  3. High Error Rates

    • Check file permissions
    • Verify document formats
    • Review error logs
  4. Performance Issues

    • Enable database indexes
    • Optimize chunk sizes
    • Consider horizontal scaling
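
To confirm WAL mode is enabled on a database (issue 1 above):

sqlite3 ingestion.db "PRAGMA journal_mode;"
# Prints "wal" when WAL mode is active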

Log Analysis

Key log patterns to monitor:

# Error patterns
grep "ERROR" ingestion.log | tail -100

# Performance issues
grep "processing_time" ingestion.log | awk '{print $NF}' | sort -n

# High error rates
grep "failed" ingestion.log | wc -l

Health Check Endpoints

The pipeline exposes health check endpoints:

  • GET /health - Overall health status
  • GET /health/database - Database connectivity
  • GET /health/filesystem - File system access
  • GET /metrics - Prometheus metrics
  • GET /stats - Processing statistics
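
For example, the endpoints can be queried with curl (port 8000, as exposed in the Docker and Kubernetes examples above):

curl http://localhost:8000/health
curl http://localhost:8000/metrics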

Scaling

Vertical Scaling

Increase system resources:

  • CPU: Add more cores for concurrent processing
  • Memory: Increase RAM for larger documents
  • Storage: Use faster SSDs for better I/O

Horizontal Scaling

Deploy multiple instances:

  1. Load Balancer: Distribute documents across instances
  2. Shared Storage: Use network-attached storage
  3. Database Clustering: Use database clustering for high availability

Microservices Architecture

Split into separate services:

  • Document Processor Service: Handle document processing
  • Dead Letter Queue Service: Manage failed documents
  • Monitoring Service: Collect metrics and alerts
  • Versioning Service: Track document versions

Maintenance

Regular Tasks

  1. Daily: Monitor error rates and performance
  2. Weekly: Review dead letter queue items
  3. Monthly: Clean up old data and backups
  4. Quarterly: Review and update alert thresholds

Updates

  1. Backup current deployment
  2. Test new version in staging
  3. Deploy during maintenance window
  4. Monitor for issues post-deployment
  5. Rollback if necessary

Capacity Planning

Monitor these metrics for capacity planning:

  • Document processing rate
  • Error rates
  • Resource utilization
  • Queue depths
  • Storage growth

Plan for 20% growth in document volume and 50% buffer for peak loads.
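
For example, a deployment currently peaking at 10,000 documents per day (an illustrative figure) would be sized for roughly 10,000 × 1.2 × 1.5 = 18,000 documents per day.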

Support

For deployment issues:

  1. Check logs for error messages
  2. Review monitoring dashboards
  3. Verify configuration settings
  4. Test connectivity to external systems
  5. Contact support at support@recohut.com

Next Steps