
Security Best Practices for Document Ingestion Pipeline

This guide outlines comprehensive security measures and best practices for securing the document ingestion pipeline in enterprise environments.

Security Overview

The document ingestion pipeline handles sensitive enterprise documents and must implement robust security measures to protect data confidentiality, integrity, and availability.

Security Principles

  1. Defense in Depth: Multiple layers of security controls
  2. Least Privilege: Minimum necessary access rights
  3. Zero Trust: Verify everything, trust nothing
  4. Security by Design: Built-in security from the ground up
  5. Continuous Monitoring: Ongoing security assessment

Authentication and Authorization

User Authentication

Implement strong authentication mechanisms:

```python
import os

from recoagent.ingestion.security.auth import APIKeyAuth, AuthenticationManager

# JWT-based authentication; load the signing secret from the environment
# rather than hardcoding it in source
auth_manager = AuthenticationManager(
    secret_key=os.environ["JWT_SECRET_KEY"],
    algorithm="HS256",
    token_expiry_hours=8
)

# API key authentication
api_key_auth = APIKeyAuth(
    key_header="X-API-Key",
    allowed_keys=["key1", "key2"]
)
```

Role-Based Access Control (RBAC)

Define roles and permissions:

```yaml
# roles.yaml
roles:
  admin:
    permissions:
      - "ingestion:read"
      - "ingestion:write"
      - "ingestion:delete"
      - "monitoring:read"
      - "dlq:manage"

  operator:
    permissions:
      - "ingestion:read"
      - "ingestion:write"
      - "monitoring:read"

  viewer:
    permissions:
      - "ingestion:read"
      - "monitoring:read"
```
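
The role definitions above can be enforced with a simple lookup at request time. The sketch below mirrors roles.yaml as an in-process mapping for illustration; in practice the mapping would be loaded from the YAML file (e.g. with `yaml.safe_load`), and the helper name is hypothetical:

```python
# In-process mirror of roles.yaml; in production, load this from the file
ROLE_PERMISSIONS = {
    "admin": {"ingestion:read", "ingestion:write", "ingestion:delete",
              "monitoring:read", "dlq:manage"},
    "operator": {"ingestion:read", "ingestion:write", "monitoring:read"},
    "viewer": {"ingestion:read", "monitoring:read"},
}

def has_permission(role: str, permission: str) -> bool:
    """Return True if the given role grants the permission; unknown roles get nothing."""
    return permission in ROLE_PERMISSIONS.get(role, set())
```

Defaulting unknown roles to an empty permission set keeps the check fail-closed, consistent with the least-privilege principle above.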

API Security

Secure API endpoints:

```python
from fastapi import Depends, FastAPI, HTTPException, UploadFile, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

app = FastAPI()
security = HTTPBearer()

@app.post("/api/v1/documents")
async def process_document(
    document: UploadFile,
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    # Verify token
    user = await auth_manager.verify_token(credentials.credentials)

    # Check permissions
    if not user.has_permission("ingestion:write"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Insufficient permissions"
        )

    # Process document
    return await pipeline.process_document(document)
```

Data Protection

Encryption at Rest

Encrypt sensitive data in databases:

```python
import json

from cryptography.fernet import Fernet

class EncryptedStorage:
    def __init__(self, encryption_key: bytes):
        self.cipher = Fernet(encryption_key)

    def encrypt_data(self, data: str) -> str:
        return self.cipher.encrypt(data.encode()).decode()

    def decrypt_data(self, encrypted_data: str) -> str:
        return self.cipher.decrypt(encrypted_data.encode()).decode()

# Usage in pipeline: in production, load the key from a secrets manager
# rather than generating a new one per process
encryption_key = Fernet.generate_key()
storage = EncryptedStorage(encryption_key)

# Encrypt document metadata
encrypted_metadata = storage.encrypt_data(json.dumps(metadata))
```

Encryption in Transit

Use HTTPS/TLS for all communications:

```python
import uvicorn
from fastapi import FastAPI

app = FastAPI()

# uvicorn accepts certificate paths directly (it builds the SSL context
# internally); an ssl.SSLContext object is not a run() parameter
uvicorn.run(
    app,
    host="0.0.0.0",
    port=8443,
    ssl_certfile="server.crt",
    ssl_keyfile="server.key",
    ssl_ca_certs="ca.crt"
)
```

Data Classification

Classify documents by sensitivity:

```python
from enum import Enum

class DataClassification(str, Enum):
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"

class DocumentClassifier:
    def classify_document(self, content: str, metadata: dict) -> DataClassification:
        # Implement classification logic
        if "confidential" in content.lower():
            return DataClassification.CONFIDENTIAL
        elif "internal" in content.lower():
            return DataClassification.INTERNAL
        else:
            return DataClassification.PUBLIC
```

Data Masking

Mask sensitive data in logs and monitoring:

```python
import re

class DataMasker:
    def __init__(self):
        self.patterns = [
            (r'\b\d{4}-\d{4}-\d{4}-\d{4}\b', '****-****-****-****'),  # Credit cards
            (r'\b\d{3}-\d{2}-\d{4}\b', '***-**-****'),  # SSNs
            (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '***@***.***'),  # Email addresses
        ]

    def mask_data(self, data: str) -> str:
        masked_data = data
        for pattern, replacement in self.patterns:
            masked_data = re.sub(pattern, replacement, masked_data)
        return masked_data

    def mask_log_entry(self, log_entry: dict) -> dict:
        masked_entry = log_entry.copy()
        for key, value in masked_entry.items():
            if isinstance(value, str):
                masked_entry[key] = self.mask_data(value)
        return masked_entry
```

Input Validation and Sanitization

File Validation

Validate uploaded files:

```python
import magic  # python-magic
from pathlib import Path

class FileValidator:
    def __init__(self):
        self.allowed_mime_types = {
            'application/pdf',
            'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            'text/html',
            'text/csv',
            'application/json',
            'text/plain'
        }
        self.max_file_size = 100 * 1024 * 1024  # 100 MB

    async def validate_file(self, file_path: str) -> bool:
        # Check file size
        file_size = Path(file_path).stat().st_size
        if file_size > self.max_file_size:
            raise ValueError(f"File too large: {file_size} bytes")

        # Check the MIME type from file content, not the extension
        mime_type = magic.from_file(file_path, mime=True)
        if mime_type not in self.allowed_mime_types:
            raise ValueError(f"Unsupported file type: {mime_type}")

        # Check for malicious content
        await self._scan_for_malware(file_path)

        return True

    async def _scan_for_malware(self, file_path: str):
        # Implement malware scanning
        # This could integrate with ClamAV or similar
        pass
```

Content Sanitization

Sanitize document content:

```python
import html
import bleach

class ContentSanitizer:
    def __init__(self):
        # Note: bleach only honors attributes for tags that are also allowed,
        # so 'a' and 'img' must appear in allowed_tags
        self.allowed_tags = [
            'p', 'br', 'strong', 'em', 'ul', 'ol', 'li',
            'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'a', 'img'
        ]
        self.allowed_attributes = {
            'a': ['href', 'title'],
            'img': ['src', 'alt', 'title']
        }

    def sanitize_html(self, html_content: str) -> str:
        return bleach.clean(
            html_content,
            tags=self.allowed_tags,
            attributes=self.allowed_attributes,
            strip=True
        )

    def sanitize_text(self, text_content: str) -> str:
        # Escape HTML metacharacters
        sanitized = html.escape(text_content)
        # Remove control characters, but preserve newlines and tabs
        sanitized = ''.join(
            char for char in sanitized if ord(char) >= 32 or char in '\n\t'
        )
        return sanitized
```

Rate Limiting

Implement rate limiting:

```python
from collections import defaultdict, deque
import time

from fastapi import Request

class RateLimiter:
    def __init__(self, max_requests: int, time_window: int):
        self.max_requests = max_requests
        self.time_window = time_window  # seconds
        self.requests = defaultdict(deque)

    async def is_allowed(self, client_id: str) -> bool:
        now = time.time()
        client_requests = self.requests[client_id]

        # Drop requests that have aged out of the window
        while client_requests and client_requests[0] <= now - self.time_window:
            client_requests.popleft()

        # Check if under limit
        if len(client_requests) >= self.max_requests:
            return False

        # Record the current request
        client_requests.append(now)
        return True

# Usage
rate_limiter = RateLimiter(max_requests=100, time_window=3600)  # 100 requests per hour

@app.post("/api/v1/documents")
async def process_document(
    request: Request,
    document: UploadFile,
    current_user: User = Depends(get_current_user)
):
    client_id = request.client.host

    if not await rate_limiter.is_allowed(client_id):
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Rate limit exceeded"
        )

    # Process document
    return await pipeline.process_document(document)
```

Network Security

Firewall Configuration

Configure firewalls to restrict access:

```bash
# iptables rules for the document ingestion server
# Allow established and related connections (return traffic)
iptables -A INPUT -m conntrack --ctstate ESTABLISHED,RELATED -j ACCEPT

# Allow SSH (port 22)
iptables -A INPUT -p tcp --dport 22 -j ACCEPT

# Allow HTTPS (port 443)
iptables -A INPUT -p tcp --dport 443 -j ACCEPT

# Allow the internal API (port 8000) only from the internal network
iptables -A INPUT -p tcp --dport 8000 -s 10.0.0.0/8 -j ACCEPT

# Drop all other traffic
iptables -A INPUT -j DROP
```

VPN Access

Require VPN access for administrative functions:

```python
import ipaddress

from fastapi import Request

class VPNValidator:
    def __init__(self, allowed_networks: list):
        self.allowed_networks = allowed_networks

    def validate_vpn_access(self, request: Request) -> bool:
        client_ip = request.client.host

        for network in self.allowed_networks:
            if self._ip_in_network(client_ip, network):
                return True

        return False

    def _ip_in_network(self, ip: str, network: str) -> bool:
        return ipaddress.ip_address(ip) in ipaddress.ip_network(network)

# Usage
vpn_validator = VPNValidator(["10.0.0.0/8", "192.168.0.0/16"])

@app.get("/admin/health")
async def admin_health(request: Request):
    if not vpn_validator.validate_vpn_access(request):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="VPN access required"
        )

    return {"status": "healthy"}
```

TLS Configuration

Secure TLS configuration:

```python
import ssl

from fastapi import Request

# Strong TLS configuration
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
ssl_context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')

# Security headers, including HSTS
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    response = await call_next(request)
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    # Legacy header; retained for older browsers
    response.headers["X-XSS-Protection"] = "1; mode=block"
    return response
```

Database Security

Connection Security

Secure database connections:

```python
from sqlalchemy import create_engine

# PostgreSQL with TLS required and server certificate verification.
# Never disable certificate verification (CERT_NONE) in production; it
# defeats the purpose of TLS.
DATABASE_URL = "postgresql://user:pass@localhost/ingestion?sslmode=verify-full"

engine = create_engine(
    DATABASE_URL,
    connect_args={
        "sslrootcert": "ca.crt",
        "sslcert": "client.crt",
        "sslkey": "client.key"
    }
)
```

Database Encryption

Encrypt sensitive database fields:

```python
from cryptography.fernet import Fernet
from sqlalchemy import Column, String, LargeBinary
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class EncryptedDocument(Base):
    __tablename__ = 'encrypted_documents'

    id = Column(String, primary_key=True)
    encrypted_content = Column(LargeBinary)  # Encrypted content
    encryption_key_id = Column(String)  # Key identifier

    def encrypt_content(self, content: str, key: bytes):
        cipher = Fernet(key)
        self.encrypted_content = cipher.encrypt(content.encode())
```

Access Control

Implement database access controls:

```sql
-- Create role for the ingestion pipeline
CREATE ROLE ingestion_pipeline;

-- Grant necessary permissions
GRANT SELECT, INSERT, UPDATE ON documents TO ingestion_pipeline;
GRANT SELECT, INSERT, UPDATE ON document_versions TO ingestion_pipeline;
GRANT SELECT, INSERT, UPDATE ON dead_letter_queue TO ingestion_pipeline;

-- Create user with limited access
CREATE USER pipeline_user WITH PASSWORD 'secure_password';
GRANT ingestion_pipeline TO pipeline_user;

-- Revoke unnecessary database-level privileges (in PostgreSQL, DROP and
-- ALTER are owner rights and cannot be revoked separately)
REVOKE CREATE, TEMPORARY ON DATABASE ingestion FROM pipeline_user;
```

Monitoring and Auditing

Security Event Logging

Log security-relevant events:

```python
import logging
from datetime import datetime

class SecurityLogger:
    def __init__(self):
        self.logger = logging.getLogger('security')
        handler = logging.FileHandler('security.log')
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_authentication_attempt(self, user: str, success: bool, ip: str):
        event = {
            'event_type': 'authentication',
            'user': user,
            'success': success,
            'ip_address': ip,
            'timestamp': datetime.utcnow().isoformat()
        }
        self.logger.info(f"Auth attempt: {event}")

    def log_file_upload(self, user: str, filename: str, size: int, classification: str):
        event = {
            'event_type': 'file_upload',
            'user': user,
            'filename': filename,
            'file_size': size,
            'classification': classification,
            'timestamp': datetime.utcnow().isoformat()
        }
        self.logger.info(f"File upload: {event}")

    def log_security_violation(self, violation_type: str, details: dict):
        event = {
            'event_type': 'security_violation',
            'violation_type': violation_type,
            'details': details,
            'timestamp': datetime.utcnow().isoformat()
        }
        self.logger.warning(f"Security violation: {event}")

# Usage
security_logger = SecurityLogger()

@app.post("/api/v1/documents")
async def process_document(
    document: UploadFile,
    current_user: User = Depends(get_current_user)
):
    # Log file upload
    security_logger.log_file_upload(
        current_user.username,
        document.filename,
        document.size,
        "internal"
    )

    # Process document
    return await pipeline.process_document(document)
```

Intrusion Detection

Implement intrusion detection:

```python
import re
from collections import defaultdict

from fastapi.responses import JSONResponse

class IntrusionDetector:
    def __init__(self):
        self.suspicious_patterns = [
            r'\.\./',           # Directory traversal
            r'<script',         # XSS attempts
            r'union.*select',   # SQL injection
            r'eval\s*\(',       # Code injection
        ]
        self.failed_attempts = defaultdict(int)
        self.blocked_ips = set()

    def detect_suspicious_activity(self, content: str, ip: str) -> bool:
        # Check for suspicious patterns
        for pattern in self.suspicious_patterns:
            if re.search(pattern, content, re.IGNORECASE):
                self.failed_attempts[ip] += 1
                return True

        return False

    def should_block_ip(self, ip: str) -> bool:
        return self.failed_attempts[ip] > 5 or ip in self.blocked_ips

# Usage
intrusion_detector = IntrusionDetector()

@app.middleware("http")
async def security_middleware(request: Request, call_next):
    client_ip = request.client.host

    # HTTPException raised inside middleware bypasses FastAPI's exception
    # handlers, so return a response directly instead
    if intrusion_detector.should_block_ip(client_ip):
        return JSONResponse(
            status_code=status.HTTP_403_FORBIDDEN,
            content={"detail": "Access denied"}
        )

    # Check request content; request.body() caches the payload so the
    # downstream handler can still read it
    if request.method == "POST":
        body = await request.body()
        if intrusion_detector.detect_suspicious_activity(
            body.decode(errors="ignore"), client_ip
        ):
            security_logger.log_security_violation(
                "suspicious_content",
                {"ip": client_ip, "content_length": len(body)}
            )
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"detail": "Suspicious content detected"}
            )

    response = await call_next(request)
    return response
```

Compliance and Governance

Data Retention Policies

Implement data retention policies:

```python
from datetime import datetime, timedelta

class DataRetentionManager:
    def __init__(self, retention_policies: dict):
        self.retention_policies = retention_policies

    async def apply_retention_policies(self):
        for classification, days in self.retention_policies.items():
            cutoff_date = datetime.utcnow() - timedelta(days=days)
            await self._delete_old_documents(classification, cutoff_date)

    async def _delete_old_documents(self, classification: str, cutoff_date: datetime):
        # Implement document deletion logic
        pass

# Configuration (days retained per classification)
retention_policies = {
    "public": 365,         # 1 year
    "internal": 1095,      # 3 years
    "confidential": 2555,  # 7 years
    "restricted": 3650     # 10 years
}

retention_manager = DataRetentionManager(retention_policies)
```

Audit Trail

Maintain comprehensive audit trails:

```python
from datetime import datetime

class AuditTrail:
    def __init__(self):
        self.db = None  # Database connection (e.g. a Motor/MongoDB client)

    async def log_action(self, user: str, action: str, resource: str, details: dict):
        audit_entry = {
            'timestamp': datetime.utcnow(),
            'user': user,
            'action': action,
            'resource': resource,
            'details': details,
            'ip_address': self._get_client_ip(),
            'user_agent': self._get_user_agent()
        }

        await self.db.audit_logs.insert_one(audit_entry)

    async def get_audit_trail(self, user: str = None, action: str = None,
                              start_date: datetime = None, end_date: datetime = None):
        query = {}
        if user:
            query['user'] = user
        if action:
            query['action'] = action
        if start_date:
            query['timestamp'] = {'$gte': start_date}
        if end_date:
            query.setdefault('timestamp', {})['$lte'] = end_date

        return await self.db.audit_logs.find(query).to_list(None)

    def _get_client_ip(self) -> str:
        # Resolve from the current request context
        raise NotImplementedError

    def _get_user_agent(self) -> str:
        # Resolve from the current request context
        raise NotImplementedError
```

Compliance Reporting

Generate compliance reports:

```python
class ComplianceReporter:
    def __init__(self, audit_trail: AuditTrail):
        self.audit_trail = audit_trail

    async def generate_sox_report(self, start_date: datetime, end_date: datetime):
        """Generate a SOX compliance report."""
        report = {
            'period': {'start': start_date, 'end': end_date},
            'total_documents_processed': 0,
            'failed_documents': 0,
            'security_violations': 0,
            'data_access_logs': [],
            'system_changes': []
        }

        # Query audit logs for the period
        logs = await self.audit_trail.get_audit_trail(
            start_date=start_date,
            end_date=end_date
        )

        for log in logs:
            if log['action'] == 'document_processed':
                report['total_documents_processed'] += 1
            elif log['action'] == 'document_failed':
                report['failed_documents'] += 1
            elif log['action'] == 'security_violation':
                report['security_violations'] += 1

        return report
```

Incident Response

Security Incident Response Plan

  1. Detection: Automated monitoring and alerting
  2. Assessment: Determine scope and impact
  3. Containment: Isolate affected systems
  4. Eradication: Remove threats
  5. Recovery: Restore normal operations
  6. Lessons Learned: Improve security posture

Incident Response Automation

```python
class IncidentResponse:
    def __init__(self):
        self.response_procedures = {}

    async def handle_security_incident(self, incident_type: str, severity: str):
        """Automated incident response."""
        if severity == "critical":
            # Immediate containment
            await self._isolate_systems()
            await self._notify_security_team()
            await self._preserve_evidence()

        # Execute the registered response procedure, if any
        if incident_type in self.response_procedures:
            await self.response_procedures[incident_type]()

    async def _isolate_systems(self):
        """Isolate affected systems."""
        # Implement system isolation logic
        pass

    async def _notify_security_team(self):
        """Notify the security team of the incident."""
        # Send alerts to the security team
        pass

    async def _preserve_evidence(self):
        """Preserve forensic evidence (logs, snapshots)."""
        pass
```

Security Testing

Penetration Testing

Regular security testing checklist:

  • Authentication bypass attempts
  • Authorization escalation
  • Input validation testing
  • SQL injection testing
  • XSS vulnerability testing
  • File upload security testing
  • API security testing
  • Network security testing
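
Several of these checklist items can be automated as unit tests that run in CI. The sketch below exercises the log-masking patterns shown in the Data Masking section; the standalone `mask` helper is illustrative:

```python
import re

# PII patterns mirroring the DataMasker example earlier in this guide
PATTERNS = [
    (r'\b\d{4}-\d{4}-\d{4}-\d{4}\b', '****-****-****-****'),  # credit cards
    (r'\b\d{3}-\d{2}-\d{4}\b', '***-**-****'),                # SSNs
]

def mask(data: str) -> str:
    for pattern, replacement in PATTERNS:
        data = re.sub(pattern, replacement, data)
    return data

def test_credit_card_is_masked():
    assert mask("card 4111-1111-1111-1111 on file") == "card ****-****-****-**** on file"

def test_ssn_is_masked():
    assert mask("ssn 123-45-6789") == "ssn ***-**-****"
```

Similar tests can cover the file validator (oversized files, disallowed MIME types) and the content sanitizer (script tags stripped), so regressions in these controls are caught before deployment.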

Security Scanning

Automated security scanning:

```bash
# OWASP ZAP baseline scan
docker run -t owasp/zap2docker-stable zap-baseline.py \
    -t http://your-ingestion-api.com \
    -r security-report.html

# Bandit security linter for Python
bandit -r recoagent/ -f json -o security-report.json

# Safety check for known-vulnerable dependencies
safety check --json --output safety-report.json
```

Security Monitoring

SIEM Integration

Integrate with SIEM systems:

```python
import aiohttp

class SIEMConnector:
    def __init__(self, siem_endpoint: str, api_key: str):
        self.endpoint = siem_endpoint
        self.api_key = api_key

    async def send_security_event(self, event: dict):
        """Send a security event to the SIEM."""
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.endpoint,
                json=event,
                headers=headers
            ) as response:
                return await response.json()

# Usage (from within an async context)
siem = SIEMConnector("https://siem.company.com/api", "api-key")

await siem.send_security_event({
    'event_type': 'authentication_failure',
    'user': 'attacker@evil.com',
    'ip_address': '192.168.1.100',
    'timestamp': datetime.utcnow().isoformat(),
    'severity': 'high'
})
```

Security Checklist

Pre-Deployment Security Checklist

  • All dependencies updated and scanned
  • Secrets management implemented
  • Database encryption enabled
  • TLS/SSL configured properly
  • Authentication and authorization implemented
  • Input validation and sanitization in place
  • Rate limiting configured
  • Security headers implemented
  • Logging and monitoring configured
  • Backup and recovery procedures tested
  • Incident response plan documented
  • Security testing completed
  • Compliance requirements met
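
Some of these items can be verified automatically at deploy time. As one sketch, a post-deploy smoke check can confirm the security headers configured earlier are actually present in responses (the helper name is illustrative; at deploy time the header list would come from a real response, e.g. `urllib.request.urlopen(url).getheaders()`):

```python
# Headers the middleware in this guide is expected to set
REQUIRED_HEADERS = {
    "Strict-Transport-Security",
    "X-Content-Type-Options",
    "X-Frame-Options",
}

def missing_security_headers(headers):
    """Given (name, value) response headers, return required headers that are absent."""
    return REQUIRED_HEADERS - {name for name, _ in headers}
```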

Ongoing Security Maintenance

  • Regular security updates
  • Vulnerability scanning
  • Penetration testing
  • Security training for team
  • Audit trail review
  • Access control review
  • Incident response testing
  • Backup and recovery testing
