Production Serving Guide for Recommendation Systems
This guide demonstrates how to use RecoAgent's production serving capabilities for recommendation systems, including ONNX optimization, batch prediction, drift detection, and advanced evaluation metrics.
Overview
RecoAgent provides comprehensive production serving capabilities for recommendation systems:
- ONNX Model Serving: Optimized inference using ONNX Runtime
- Batch Prediction: High-throughput batch processing
- Model Optimization: Quantization, pruning, and compilation
- Drift Detection: Real-time monitoring of model and feature drift
- Advanced Evaluation: Beyond-accuracy and causal evaluation metrics
Quick Start
1. Initialize Production System
```python
from recoagent.packages.recommendations.serving import (
    ONNXModelServer, ONNXConfig, BatchPredictor, BatchConfig, BatchMode,
    ModelOptimizer, OptimizationConfig, OptimizationMethod
)
from recoagent.packages.recommendations.monitoring import (
    DriftDetector, DriftConfig, DriftType
)
from recoagent.packages.recommendations.evaluation import (
    BeyondAccuracyMetrics, BeyondAccuracyConfig, MetricType,
    CausalEvaluationMetrics, CausalEvaluationConfig, CausalMetricType
)

# Initialize ONNX server
onnx_config = ONNXConfig(
    model_path="models/recommendation_model.pth",
    onnx_path="models/recommendation_model.onnx",
    batch_size=32,
    optimize=True
)
onnx_server = ONNXModelServer(onnx_config)

# Initialize batch predictor
# (prediction_function is your model's predict callable)
batch_config = BatchConfig(
    batch_size=1000,
    max_workers=4,
    enable_caching=True
)
batch_predictor = BatchPredictor(batch_config, prediction_function)

# Initialize drift detector
drift_config = DriftConfig(
    drift_type=DriftType.MODEL_DRIFT,
    threshold=0.05,
    enable_realtime=True
)
drift_detector = DriftDetector(drift_config)
```
2. Train and Optimize Model
```python
import torch

# Train model (train_recommendation_model is your own training routine)
model = train_recommendation_model(training_data)

# Convert to ONNX using a sample input matching the model's expected shape
sample_input = torch.randn(1, 10)
conversion_result = onnx_server.convert_pytorch_to_onnx(model, sample_input)

# Optimize model
optimization_config = OptimizationConfig(
    method=OptimizationMethod.QUANTIZATION,
    target_device="cpu",
    optimize_for_inference=True
)
optimizer = ModelOptimizer(optimization_config)
optimization_result = optimizer.optimize_model(model, sample_input)
```
3. Serve Recommendations
```python
import numpy as np

# Batch prediction
predictions = batch_predictor.predict_batch(user_data)

# ONNX serving expects a numeric array
input_data = user_data.select_dtypes(include=[np.number]).values
predictions = onnx_server.predict(input_data)
```
ONNX Model Serving
Configuration
```python
onnx_config = ONNXConfig(
    model_path="models/recommendation_model.pth",
    onnx_path="models/recommendation_model.onnx",
    input_names=["input"],
    output_names=["output"],
    opset_version=11,
    optimize=True,
    batch_size=32,
    enable_profiling=True
)
```
Model Conversion
```python
# Convert PyTorch model to ONNX
conversion_result = onnx_server.convert_pytorch_to_onnx(
    pytorch_model, sample_input
)

if conversion_result["success"]:
    print(f"Model converted to ONNX: {conversion_result['onnx_path']}")
else:
    print(f"Conversion failed: {conversion_result['error']}")
```
Inference
```python
# Single prediction
prediction = onnx_server.predict(input_data)

# Batch prediction
batch_predictions = onnx_server.predict_batch(batch_inputs)

# Benchmark performance over 100 iterations
benchmark_results = onnx_server.benchmark_model(sample_inputs, 100)
```
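Benchmark numbers are only meaningful relative to a baseline. A framework-agnostic latency benchmark can be sketched with the standard library alone (`predict_fn` here is a stand-in for any predict callable, not a RecoAgent API):

```python
import time
import statistics

def benchmark(predict_fn, sample_input, num_iterations=100):
    """Time repeated predictions and report latency and throughput."""
    latencies = []
    for _ in range(num_iterations):
        start = time.perf_counter()
        predict_fn(sample_input)
        latencies.append(time.perf_counter() - start)
    return {
        "mean_latency_s": statistics.mean(latencies),
        "p95_latency_s": sorted(latencies)[int(0.95 * len(latencies))],
        "throughput_per_s": num_iterations / sum(latencies),
    }

# Trivial stand-in predictor, just to exercise the harness
results = benchmark(lambda x: [v * 2 for v in x], [1.0, 2.0, 3.0])
```

Running the same harness before and after ONNX conversion gives a like-for-like comparison.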
Batch Prediction
Configuration
```python
batch_config = BatchConfig(
    batch_size=1000,
    max_workers=4,
    batch_mode=BatchMode.PARALLEL,
    enable_caching=True,
    cache_ttl=3600,
    max_retries=3,
    fail_fast=False
)
```
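The `enable_caching`/`cache_ttl` options amount to a time-to-live cache keyed on input. As an illustration of the semantics (not RecoAgent's internal implementation), a minimal TTL cache looks like:

```python
import time

class TTLCache:
    """Minimal TTL cache mirroring enable_caching/cache_ttl semantics."""

    def __init__(self, ttl_seconds=3600):
        self.ttl = ttl_seconds
        self._store = {}  # key -> (value, expiry timestamp)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._store[key]  # expired: evict and report a miss
            return None
        return value

    def put(self, key, value):
        self._store[key] = (value, time.monotonic() + self.ttl)

cache = TTLCache(ttl_seconds=3600)
cache.put("user_42", [101, 205, 307])
cached = cache.get("user_42")  # hit, still within TTL
```

A TTL this long only makes sense when recommendations for a user are stable over that window; shorten it for fast-moving catalogs.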
Batch Processing
```python
# Initialize batch predictor
batch_predictor = BatchPredictor(batch_config, prediction_function)

# Process batch
predictions = batch_predictor.predict_batch(data)

# Get batch statistics
stats = batch_predictor.get_batch_stats()
```
Performance Benchmarking
```python
# Benchmark batch prediction
benchmark_results = batch_predictor.benchmark_batch_prediction(
    sample_data, num_iterations=100
)
print(f"Throughput: {benchmark_results['throughput']:.2f} predictions/second")
```
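Independent of `BatchPredictor`, the chunk-and-parallelize pattern behind `BatchMode.PARALLEL` can be sketched with the standard library; `predict_fn` is a placeholder for any per-batch predict callable:

```python
from concurrent.futures import ThreadPoolExecutor

def predict_in_batches(predict_fn, items, batch_size=1000, max_workers=4):
    """Split items into fixed-size batches and score each batch in parallel."""
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = pool.map(predict_fn, batches)  # map preserves batch order
    # Flatten per-batch results back into one prediction list
    return [pred for batch_result in results for pred in batch_result]

# Stand-in predictor: score is just the item id scaled
preds = predict_in_batches(lambda batch: [x * 0.5 for x in batch],
                           list(range(10)), batch_size=3, max_workers=2)
```

Threads suit I/O-bound predictors (e.g. remote model servers); for CPU-bound Python scoring, a `ProcessPoolExecutor` avoids the GIL.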
Model Optimization
Quantization
```python
optimization_config = OptimizationConfig(
    method=OptimizationMethod.QUANTIZATION,
    quantization_type="int8",
    target_device="cpu"
)
optimizer = ModelOptimizer(optimization_config)

# Optimize model
optimization_result = optimizer.optimize_model(model, sample_input)
```
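To make the trade-off concrete, here is what symmetric per-tensor int8 quantization does to a weight tensor, sketched in NumPy (illustrative only; the actual `ModelOptimizer` internals may differ):

```python
import numpy as np

def quantize_int8(weights):
    """Symmetric per-tensor int8 quantization: w ~= scale * q."""
    scale = np.abs(weights).max() / 127.0  # map the largest magnitude to 127
    q = np.clip(np.round(weights / scale), -127, 127).astype(np.int8)
    return q, scale

def dequantize(q, scale):
    return q.astype(np.float32) * scale

w = np.array([0.5, -1.27, 0.02, 1.0], dtype=np.float32)
q, scale = quantize_int8(w)
w_hat = dequantize(q, scale)  # close to w, but stored in 1 byte per weight
```

The 4x storage reduction (float32 to int8) is exact; the accuracy cost depends on the weight distribution, which is why benchmarking after quantization matters.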
Pruning
```python
optimization_config = OptimizationConfig(
    method=OptimizationMethod.PRUNING,
    pruning_ratio=0.1,
    pruning_criteria="magnitude"
)
optimizer = ModelOptimizer(optimization_config)

# Prune model
optimization_result = optimizer.optimize_model(model, sample_input)
```
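Magnitude pruning keeps the largest weights and zeroes the rest. A NumPy sketch of the `pruning_criteria="magnitude"` idea (not the library's implementation):

```python
import numpy as np

def magnitude_prune(weights, pruning_ratio=0.1):
    """Zero out the smallest-magnitude fraction of weights."""
    k = int(weights.size * pruning_ratio)  # number of weights to drop
    if k == 0:
        return weights.copy()
    threshold = np.sort(np.abs(weights).ravel())[k - 1]
    pruned = weights.copy()
    pruned[np.abs(pruned) <= threshold] = 0.0
    return pruned

w = np.array([0.9, -0.01, 0.5, 0.02, -0.8, 0.03, 0.7, -0.04, 0.6, 0.05])
pruned = magnitude_prune(w, pruning_ratio=0.3)  # three smallest weights zeroed
```

The resulting sparsity only translates into speedups when the runtime exploits sparse tensors, so pair pruning with a benchmark as advised above.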
Compilation
```python
optimization_config = OptimizationConfig(
    method=OptimizationMethod.COMPILATION,
    compile_backend="torchscript",
    optimize_for_inference=True
)
optimizer = ModelOptimizer(optimization_config)

# Compile model
optimization_result = optimizer.optimize_model(model, sample_input)
```
Drift Detection
Configuration
```python
drift_config = DriftConfig(
    drift_type=DriftType.MODEL_DRIFT,
    threshold=0.05,
    window_size=1000,
    statistical_test="ks",
    enable_realtime=True,
    enable_alerts=True,
    alert_threshold=0.1
)
```
Detecting Drift
```python
# Initialize drift detector
drift_detector = DriftDetector(drift_config)

# Set baseline data
drift_detector.set_baseline_data(reference_data)

# Detect drift
drift_result = drift_detector.detect_drift(current_data)

if drift_result["drift_detected"]:
    print(f"Drift detected with score: {drift_result['overall_score']:.3f}")
```
Monitoring
```python
# Get drift history
drift_history = drift_detector.get_drift_history()

# Get alert history
alert_history = drift_detector.get_alert_history()

# Get drift summary
drift_summary = drift_detector.get_drift_summary()
```
Advanced Evaluation
Beyond-Accuracy Metrics
```python
# Configure beyond-accuracy metrics
beyond_config = BeyondAccuracyConfig(
    metric_types=[MetricType.DIVERSITY, MetricType.NOVELTY, MetricType.COVERAGE],
    diversity_method="intra_list",
    novelty_method="popularity",
    coverage_method="catalog"
)

# Initialize metrics
beyond_metrics = BeyondAccuracyMetrics(beyond_config)

# Evaluate recommendations
evaluation_result = beyond_metrics.evaluate(
    recommendations, user_data, item_data
)
```
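Two of the configured metrics are easy to state directly. A self-contained sketch of catalog coverage and popularity-based novelty (illustrative formulas, not `BeyondAccuracyMetrics` internals):

```python
import math

def catalog_coverage(recommendation_lists, catalog_size):
    """Fraction of the catalog that appears in at least one user's list."""
    recommended = {item for recs in recommendation_lists for item in recs}
    return len(recommended) / catalog_size

def popularity_novelty(recommendation_lists, item_popularity, num_users):
    """Mean self-information -log2(p(item)); rarer items score as more novel."""
    scores = []
    for recs in recommendation_lists:
        for item in recs:
            p = item_popularity[item] / num_users  # fraction of users who saw it
            scores.append(-math.log2(p))
    return sum(scores) / len(scores)

recs = [[1, 2], [2, 3]]
coverage = catalog_coverage(recs, catalog_size=10)  # 3 distinct items / 10
novelty = popularity_novelty(recs, {1: 50, 2: 100, 3: 2}, num_users=100)
```

A system that only recommends blockbusters will show high accuracy but low coverage and novelty, which is exactly what these metrics are meant to surface.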
Causal Evaluation Metrics
```python
# Configure causal metrics
causal_config = CausalEvaluationConfig(
    metric_types=[CausalMetricType.CAUSAL_ACCURACY, CausalMetricType.CAUSAL_F1],
    propensity_method="logistic_regression",
    ips_method="naive",
    ips_clipping=True
)

# Initialize metrics
causal_metrics = CausalEvaluationMetrics(causal_config)

# Evaluate recommendations
evaluation_result = causal_metrics.evaluate(
    recommendations, interactions, user_features, item_features
)
```
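The `ips_method`/`ips_clipping` options refer to inverse propensity scoring. A minimal clipped IPS sketch, under the simplifying assumption that the evaluated policy would always pick the logged item, so each importance weight reduces to `1/propensity`:

```python
import numpy as np

def ips_estimate(rewards, propensities, clip_max=10.0):
    """Clipped inverse-propensity-scored estimate of average reward.

    rewards: observed outcomes (e.g. clicks) for logged recommendations
    propensities: probability the logging policy showed each item
    """
    weights = np.minimum(1.0 / np.asarray(propensities), clip_max)  # cap large weights
    return float(np.mean(np.asarray(rewards) * weights))

rewards = np.array([1.0, 0.0, 1.0, 1.0])
propensities = np.array([0.5, 0.2, 0.05, 0.25])  # 1/0.05 = 20 is clipped to 10
value = ips_estimate(rewards, propensities, clip_max=10.0)
```

Clipping trades a small bias for much lower variance, which is why it is usually enabled when logged propensities can be tiny.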
Production Example
Complete Production System
```python
import numpy as np

class ProductionRecommendationSystem:
    def __init__(self):
        # Initialize ONNX server
        onnx_config = ONNXConfig(
            model_path="models/recommendation_model.pth",
            onnx_path="models/recommendation_model.onnx",
            batch_size=32,
            optimize=True
        )
        self.onnx_server = ONNXModelServer(onnx_config)

        # Initialize batch predictor
        # (_predict_function is the model's predict callable; definition omitted)
        batch_config = BatchConfig(
            batch_size=1000,
            max_workers=4,
            enable_caching=True
        )
        self.batch_predictor = BatchPredictor(batch_config, self._predict_function)

        # Initialize drift detector
        drift_config = DriftConfig(
            drift_type=DriftType.MODEL_DRIFT,
            threshold=0.05,
            enable_realtime=True
        )
        self.drift_detector = DriftDetector(drift_config)

    def serve_recommendations(self, user_data, batch_mode=True):
        if batch_mode:
            return self.batch_predictor.predict_batch(user_data)
        else:
            input_data = user_data.select_dtypes(include=[np.number]).values
            return self.onnx_server.predict(input_data)

    def monitor_drift(self, current_data, reference_data=None):
        if reference_data is not None:
            self.drift_detector.set_baseline_data(reference_data)
        return self.drift_detector.detect_drift(current_data)
```
Best Practices
1. Model Optimization
- Use quantization for CPU inference
- Apply pruning for model compression
- Use TorchScript for production deployment
- Benchmark performance before deployment
2. Batch Processing
- Use appropriate batch sizes for your hardware
- Enable caching for repeated predictions
- Use parallel processing for large batches
- Monitor memory usage during batch processing
3. Drift Detection
- Set appropriate drift thresholds
- Use multiple statistical tests
- Monitor both model and feature drift
- Set up alerting for drift detection
4. Evaluation
- Use beyond-accuracy metrics for comprehensive evaluation
- Apply causal evaluation for unbiased metrics
- Monitor evaluation metrics over time
- Use multiple evaluation methods
5. Production Deployment
- Use ONNX for optimized inference
- Implement proper error handling
- Set up monitoring and alerting
- Use batch processing for high throughput
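"Implement proper error handling" in practice usually means retries plus a graceful fallback. A minimal sketch of that pattern (all names here are illustrative placeholders, not RecoAgent APIs):

```python
import logging

logger = logging.getLogger("recsys.serving")

def serve_with_fallback(predict_fn, user_data, fallback_items, max_retries=2):
    """Wrap a predictor with retries and a precomputed fallback list.

    predict_fn and fallback_items stand in for your own model call and
    default recommendations (e.g. current top sellers).
    """
    for attempt in range(max_retries + 1):
        try:
            return predict_fn(user_data)
        except Exception as exc:  # in production, catch narrower exception types
            logger.warning("prediction attempt %d failed: %s", attempt + 1, exc)
    return fallback_items  # degrade gracefully instead of erroring out

def flaky_model(_):
    raise RuntimeError("model server unavailable")

recs = serve_with_fallback(flaky_model, {"user_id": 42}, fallback_items=[1, 2, 3])
```

Serving generic-but-reasonable items on failure keeps the user experience intact while alerts (from the logged warnings) surface the outage.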
Performance Optimization
ONNX Optimization
```python
# Optimize ONNX model
optimization_result = onnx_server.optimize_model()

# Benchmark performance
benchmark_results = onnx_server.benchmark_model(sample_inputs, 100)

# Get model info
model_info = onnx_server.get_model_info()
```
Batch Processing Optimization
```python
# Configure for high throughput
batch_config = BatchConfig(
    batch_size=5000,
    max_workers=8,
    batch_mode=BatchMode.PARALLEL,
    enable_caching=True
)

# Benchmark batch performance
benchmark_results = batch_predictor.benchmark_batch_prediction(
    sample_data, num_iterations=100
)
```
Monitoring and Alerting
Drift Monitoring
```python
# Real-time drift monitoring
drift_config = DriftConfig(
    drift_type=DriftType.MODEL_DRIFT,
    threshold=0.05,
    enable_realtime=True,
    enable_alerts=True,
    alert_threshold=0.1,
    alert_cooldown=3600
)
```
Performance Monitoring
```python
# Monitor ONNX server performance
serving_info = onnx_server.get_serving_info()

# Monitor batch predictor performance
batch_stats = batch_predictor.get_batch_stats()

# Monitor drift detection
drift_summary = drift_detector.get_drift_summary()
```
Troubleshooting
Common Issues
- ONNX Conversion Failures
  - Check model compatibility
  - Verify input/output shapes
  - Use an appropriate opset version
- Batch Processing Errors
  - Check memory usage
  - Verify batch size
  - Monitor worker processes
- Drift Detection Issues
  - Verify data quality
  - Check statistical test parameters
  - Monitor baseline data
- Evaluation Errors
  - Validate input data
  - Check metric configurations
  - Verify feature availability
Debugging
```python
import logging

# Enable detailed logging
logging.basicConfig(level=logging.DEBUG)

# Check component status (system is a ProductionRecommendationSystem instance)
status = system.get_system_status()

# Validate model
validation_result = onnx_server.validate_model(sample_inputs)

# Check drift detection
drift_summary = drift_detector.get_drift_summary()
```
Conclusion
RecoAgent's production serving capabilities provide comprehensive tools for deploying recommendation systems in production environments. By following this guide, you can:
- Optimize models for production inference
- Implement high-throughput batch processing
- Monitor model and feature drift
- Evaluate recommendations using advanced metrics
- Deploy production-ready recommendation systems
For more examples and advanced usage, see the examples directory and the API documentation.