# Recommendations API Reference
This document provides comprehensive API documentation for RecoAgent's Intelligent Recommendations system.
## Overview
The Intelligent Recommendations system provides state-of-the-art recommendation capabilities, including:

- **Neural Collaborative Filtering**: NCF, GMF, and NeuMF for deep learning recommendations
- **Two-Tower Architecture**: Large-scale retrieval with dual-encoder models
- **Library Adapter Framework**: Integration with Microsoft Recommenders, RecBole, Implicit, DLRM, and Surprise
- **LLM Integration**: AI-powered ranking, feature extraction, and RAG-based recommendations
- **Causal Inference**: Unbiased optimization using IPS and doubly robust methods
- **Production Serving**: ONNX optimization, batch processing, and drift detection
- **Advanced Evaluation**: Beyond-accuracy metrics and causal evaluation
## Core Components

### 1. Library Adapter Framework

#### BaseRecommenderAdapter
```python
from typing import Any, List

import pandas as pd

from recoagent.packages.recommendations.adapters import BaseRecommenderAdapter

class BaseRecommenderAdapter:
    """Base class for recommendation library adapters."""

    def get_available_models(self) -> List[str]:
        """Get the list of available model names."""
        pass

    def train_model(self, model_name: str, data: pd.DataFrame) -> Any:
        """Train the named model on the given data."""
        pass

    def predict(self, model: Any, data: pd.DataFrame) -> pd.DataFrame:
        """Make predictions with a trained model."""
        pass
```
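Custom adapters implement the same three methods. A minimal, dependency-free sketch of the contract, using a toy popularity model and plain dicts in place of the `pd.DataFrame` arguments (the `PopularityAdapter` class is hypothetical, not part of the package, and the base class is a local stand-in):

```python
from typing import Any, List

class BaseRecommenderAdapter:
    """Local stand-in for the base class, reproduced for illustration."""
    def get_available_models(self) -> List[str]: ...
    def train_model(self, model_name: str, data: Any) -> Any: ...
    def predict(self, model: Any, data: Any) -> Any: ...

class PopularityAdapter(BaseRecommenderAdapter):
    """Toy adapter: ranks items by global interaction count."""

    def get_available_models(self) -> List[str]:
        return ["Popularity"]

    def train_model(self, model_name: str, data: List[dict]) -> Any:
        # The "model" is simply a dict of item interaction counts.
        counts: dict = {}
        for row in data:  # each row: {"user_id": ..., "item_id": ...}
            counts[row["item_id"]] = counts.get(row["item_id"], 0) + 1
        return counts

    def predict(self, model: Any, data: List[dict]) -> List[str]:
        # Recommend globally popular items, most frequent first.
        return sorted(model, key=model.get, reverse=True)

adapter = PopularityAdapter()
interactions = [
    {"user_id": "u1", "item_id": "a"},
    {"user_id": "u1", "item_id": "b"},
    {"user_id": "u2", "item_id": "b"},
]
model = adapter.train_model("Popularity", interactions)
print(adapter.predict(model, []))  # ['b', 'a']
```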
#### Microsoft Recommenders Adapter

```python
from recoagent.packages.recommendations.adapters import MicrosoftRecommendersAdapter

adapter = MicrosoftRecommendersAdapter()

# Available models: SAR, NCF, DeepFM, xDeepFM, FastAI
models = adapter.get_available_models()

# Train an NCF model
ncf_model = adapter.train_model("NCF", training_data)

# Make predictions
predictions = adapter.predict(ncf_model, user_data)
```
#### RecBole Adapter

```python
from recoagent.packages.recommendations.adapters import RecBoleAdapter

adapter = RecBoleAdapter()

# Available models: BPR, NGCF, LightGCN, SASRec, GRU4Rec, etc.
models = adapter.get_available_models()

# Train a BPR model
bpr_model = adapter.train_model("BPR", training_data)
```
#### Implicit Adapter

```python
from recoagent.packages.recommendations.adapters import ImplicitAdapter

adapter = ImplicitAdapter()

# Available models: ALS, BPR, LMF
models = adapter.get_available_models()

# Train an ALS model
als_model = adapter.train_model("ALS", training_data)
```
### 2. Two-Tower Retrieval System

#### TwoTowerRetriever

```python
from recoagent.packages.recommendations.retrieval import TwoTowerRetriever

retriever = TwoTowerRetriever(
    user_tower_config={
        "embedding_dim": 128,
        "hidden_dims": [256, 128],
    },
    item_tower_config={
        "embedding_dim": 128,
        "hidden_dims": [256, 128],
    },
)

# Train the retriever
retriever.train(training_data)

# Retrieve candidates
candidates = retriever.retrieve(user_embeddings, top_k=1000)
```
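Conceptually, retrieval scores one user-tower embedding against every item-tower embedding with a dot product and keeps the top-k. A self-contained sketch of that step (the toy embeddings below are made up, not tower outputs):

```python
import heapq

def retrieve_top_k(user_emb, item_embs, top_k):
    """Return the top_k item ids ranked by dot-product similarity."""
    def dot(a, b):
        return sum(x * y for x, y in zip(a, b))

    scored = ((dot(user_emb, emb), item_id) for item_id, emb in item_embs.items())
    return [item_id for _, item_id in heapq.nlargest(top_k, scored)]

user_emb = [0.5, 1.0]
item_embs = {
    "laptop": [0.9, 0.8],  # score 1.25
    "mouse":  [0.2, 0.3],  # score 0.40
    "phone":  [0.8, 0.9],  # score 1.30
}
print(retrieve_top_k(user_emb, item_embs, top_k=2))  # ['phone', 'laptop']
```

In production this brute-force scan is replaced by an approximate nearest-neighbor index, which is what the vector store integrations provide.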
#### Vector Store Integration

```python
from recoagent.packages.recommendations.retrieval import VectorStoreAdapter

# Chroma integration
chroma_adapter = VectorStoreAdapter("chroma")

# Qdrant integration
qdrant_adapter = VectorStoreAdapter("qdrant")

# Weaviate integration
weaviate_adapter = VectorStoreAdapter("weaviate")
```
### 3. Feature Store Integration

#### Feast Adapter

```python
from recoagent.packages.recommendations.feature_store import FeastAdapter

adapter = FeastAdapter(
    feature_store_url="feast://localhost:6566",
    entity_ids=["user_id", "item_id"],
)

# Get real-time features
features = adapter.get_features(
    entity_ids=["user_123"],
    feature_names=["user_age", "user_income", "item_category"],
)
```
#### Tecton Adapter

```python
from recoagent.packages.recommendations.feature_store import TectonAdapter

adapter = TectonAdapter(
    workspace="production",
    environment="prod",
)

# Get features
features = adapter.get_features(entity_ids, feature_names)
```
### 4. LLM Integration

#### LLM Ranker

```python
from recoagent.packages.recommendations.llm_enhanced import LLMRanker

ranker = LLMRanker(
    model_name="gpt-4",
    ranking_method="listwise",
    max_candidates=100,
)

# Rank recommendations
ranked_items = ranker.rank(
    user_context="Looking for electronics",
    candidate_items=candidates,
)
```
#### LLM Feature Extractor

```python
from recoagent.packages.recommendations.llm_enhanced import LLMFeatureExtractor

extractor = LLMFeatureExtractor(
    model_name="gpt-4",
    feature_types=["semantic", "contextual", "preference"],
)

# Extract features
features = extractor.extract_features(
    text_data=item_descriptions,
    context=user_context,
)
```
#### RAG Recommender

```python
from recoagent.packages.recommendations.rag import RAGRecommender

rag_recommender = RAGRecommender(
    llm_model="gpt-4",
    retriever="vector_store",
    knowledge_base="product_database",
)

# Generate recommendations
recommendations = rag_recommender.recommend(
    user_query="Looking for sustainable fashion",
    context={"budget": 200, "style": "casual"},
)
```
### 5. Causal Inference

#### Causal Estimator

```python
from recoagent.packages.recommendations.causal import CausalEstimator

estimator = CausalEstimator(
    method="doubly_robust",
    propensity_model="logistic_regression",
)

# Estimate causal effects
causal_effects = estimator.estimate_effects(
    treatment_data=treatment_data,
    outcome_data=outcome_data,
)
```
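The IPS component of these estimators weights each observed outcome by the inverse of the probability with which its item was shown, which corrects for exposure bias in the logging policy. A toy, self-contained sketch (the numbers are illustrative, and the real CausalEstimator additionally fits the doubly robust outcome model):

```python
def ips_estimate(outcomes, propensities):
    """Inverse propensity scoring estimate of the mean outcome.

    Each outcome is up-weighted when its item had a low probability
    of being shown, so rarely exposed items are not undercounted.
    """
    return sum(r / p for r, p in zip(outcomes, propensities)) / len(outcomes)

# Clicks (1/0) and the display probability each shown item had.
outcomes = [1, 0, 1, 0]
propensities = [0.5, 0.5, 0.25, 0.8]
print(ips_estimate(outcomes, propensities))  # 1.5
```

Note that raw IPS estimates have high variance on small samples (here the estimate exceeds 1), which is exactly what the doubly robust correction is meant to dampen.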
#### Debiasing Utils

```python
from recoagent.packages.recommendations.causal import DebiasingUtils

debiasing_utils = DebiasingUtils(
    bias_types=["selection", "position", "popularity"],
)

# Detect bias
bias_detection = debiasing_utils.detect_bias(recommendations)

# Apply debiasing
debiased_recommendations = debiasing_utils.apply_debiasing(
    recommendations, bias_detection
)
```
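Position bias, one of the bias types listed above, illustrates how such a correction works: a click at a poorly examined rank is worth more than a click at rank 1. A toy sketch assuming an examination probability of 1 / log2(rank + 1) (this model and the helper below are illustrative, not DebiasingUtils internals):

```python
import math

def debiased_click_counts(click_log):
    """Weight each click by the inverse of its rank's assumed
    examination probability, 1 / log2(rank + 1), with ranks from 1."""
    counts = {}
    for item_id, rank, clicked in click_log:
        if clicked:
            examination = 1.0 / math.log2(rank + 1)
            counts[item_id] = counts.get(item_id, 0.0) + 1.0 / examination
    return counts

# Two clicks at rank 1 and one click at the less-examined rank 3.
log = [("a", 1, True), ("a", 1, True), ("b", 3, True)]
print(debiased_click_counts(log))  # {'a': 2.0, 'b': 2.0}
```

After correction, the single rank-3 click on item `b` carries the same weight as the two rank-1 clicks on item `a`.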
### 6. Production Serving

#### ONNX Model Server

```python
from recoagent.packages.recommendations.serving import ONNXModelServer, ONNXConfig

config = ONNXConfig(
    model_path="models/recommendation_model.pth",
    onnx_path="models/recommendation_model.onnx",
    batch_size=32,
    optimize=True,
)
server = ONNXModelServer(config)

# Convert PyTorch to ONNX
conversion_result = server.convert_pytorch_to_onnx(model, sample_input)

# Make predictions
predictions = server.predict(input_data)

# Benchmark performance
benchmark_results = server.benchmark_model(sample_inputs, 100)
```
#### Batch Predictor

```python
from recoagent.packages.recommendations.serving import BatchPredictor, BatchConfig

config = BatchConfig(
    batch_size=1000,
    max_workers=4,
    enable_caching=True,
)
predictor = BatchPredictor(config, prediction_function)

# Process a batch
predictions = predictor.predict_batch(user_data)

# Get statistics
stats = predictor.get_batch_stats()
```
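The `prediction_function` argument is user-supplied. BatchPredictor's internals are not shown in this document, but the underlying pattern, splitting the workload into fixed-size batches and scoring them on a worker pool, can be sketched with the standard library:

```python
from concurrent.futures import ThreadPoolExecutor

def predict_in_batches(items, prediction_function, batch_size=2, max_workers=2):
    """Split items into fixed-size batches and score them in parallel.

    prediction_function takes one batch (a list) and returns one score
    per element; results come back in the original input order.
    """
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = pool.map(prediction_function, batches)
    return [pred for batch in results for pred in batch]

# Toy prediction function: the "score" is just the user id's length.
scores = predict_in_batches(["u1", "u22", "u333", "u4"], lambda b: [len(u) for u in b])
print(scores)  # [2, 3, 4, 2]
```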
#### Model Optimizer

```python
from recoagent.packages.recommendations.serving import ModelOptimizer, OptimizationConfig

config = OptimizationConfig(
    method="quantization",
    target_device="cpu",
    optimize_for_inference=True,
)
optimizer = ModelOptimizer(config)

# Optimize the model
optimization_result = optimizer.optimize_model(model, sample_input)
```
### 7. Drift Detection

#### Drift Detector

```python
from recoagent.packages.recommendations.monitoring import DriftDetector, DriftConfig, DriftType

config = DriftConfig(
    drift_type=DriftType.MODEL_DRIFT,
    threshold=0.05,
    enable_realtime=True,
)
detector = DriftDetector(config)

# Set baseline data
detector.set_baseline_data(reference_data)

# Detect drift
drift_result = detector.detect_drift(current_data)

# Get a drift summary
drift_summary = detector.get_drift_summary()
```
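A drift check ultimately compares a current feature distribution against the baseline. A minimal sketch using the population stability index (PSI) over pre-binned count histograms (the bins and the flagging threshold below are illustrative, not DriftDetector internals):

```python
import math

def psi(baseline_counts, current_counts):
    """Population stability index between two histograms over the same bins.

    0 means identical distributions; larger values mean more drift.
    """
    b_total, c_total = sum(baseline_counts), sum(current_counts)
    value = 0.0
    for b, c in zip(baseline_counts, current_counts):
        b_frac = max(b / b_total, 1e-6)  # avoid log(0) on empty bins
        c_frac = max(c / c_total, 1e-6)
        value += (c_frac - b_frac) * math.log(c_frac / b_frac)
    return value

baseline = [50, 30, 20]
identical = [500, 300, 200]          # same shape, just more traffic
shifted = [20, 30, 50]               # mass moved toward the last bin
print(psi(baseline, identical))      # 0.0 -> no drift
print(psi(baseline, shifted) > 0.1)  # True -> flag drift
```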
### 8. Advanced Evaluation

#### Beyond-Accuracy Metrics

```python
from recoagent.packages.recommendations.evaluation import BeyondAccuracyMetrics, BeyondAccuracyConfig, MetricType

config = BeyondAccuracyConfig(
    metric_types=[MetricType.DIVERSITY, MetricType.NOVELTY, MetricType.COVERAGE],
    diversity_method="intra_list",
    novelty_method="popularity",
)
metrics = BeyondAccuracyMetrics(config)

# Evaluate recommendations
evaluation_result = metrics.evaluate(
    recommendations=recommendations,
    user_data=user_data,
    item_data=item_data,
)
```
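Two of these metrics are simple enough to compute directly. A sketch of intra-list diversity (share of item pairs in one list that come from different categories) and catalog coverage (the real BeyondAccuracyMetrics definitions may differ in detail):

```python
from itertools import combinations

def intra_list_diversity(items, categories):
    """Fraction of item pairs in the list from different categories."""
    pairs = list(combinations(items, 2))
    if not pairs:
        return 0.0
    different = sum(1 for a, b in pairs if categories[a] != categories[b])
    return different / len(pairs)

def catalog_coverage(recommendation_lists, catalog_size):
    """Share of the catalog appearing in at least one recommendation list."""
    recommended = {item for rec in recommendation_lists for item in rec}
    return len(recommended) / catalog_size

categories = {"a": "audio", "b": "audio", "c": "video"}
ild = intra_list_diversity(["a", "b", "c"], categories)
cov = catalog_coverage([["a", "b"], ["b", "c"]], catalog_size=10)
print(round(ild, 3), cov)  # 0.667 0.3
```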
#### Causal Evaluation Metrics

```python
from recoagent.packages.recommendations.evaluation import CausalEvaluationMetrics, CausalEvaluationConfig, CausalMetricType

config = CausalEvaluationConfig(
    metric_types=[CausalMetricType.CAUSAL_ACCURACY, CausalMetricType.CAUSAL_F1],
    propensity_method="logistic_regression",
    ips_method="naive",
)
metrics = CausalEvaluationMetrics(config)

# Evaluate with causal inference
evaluation_result = metrics.evaluate(
    recommendations=recommendations,
    interactions=interactions,
    user_features=user_features,
    item_features=item_features,
)
```
### 9. Transfer Learning

#### Domain Adapter

```python
from recoagent.packages.recommendations.transfer import DomainAdapter

adapter = DomainAdapter(
    source_domain="electronics",
    target_domain="fashion",
    adaptation_method="fine_tuning",
)

# Adapt the model to a new domain
adapted_model = adapter.adapt_model(source_model, target_data)
```
#### Meta Learner

```python
from recoagent.packages.recommendations.transfer import MetaLearner

learner = MetaLearner(
    model_architecture="neural_cf",
    meta_lr=0.001,
    inner_lr=0.01,
)

# Learn from multiple tasks
meta_model = learner.meta_learn(training_tasks)
```
#### Cross-Domain Bridge

```python
from recoagent.packages.recommendations.transfer import CrossDomainBridge

bridge = CrossDomainBridge(
    embedding_mapping=True,
    attention_bridge=True,
)

# Transfer knowledge between domains
transferred_model = bridge.transfer_knowledge(source_model, target_domain_data)
```
## Configuration

### Environment Variables

```bash
# Feature Store Configuration
FEAST_URL=feast://localhost:6566
TECTON_WORKSPACE=production

# LLM Configuration
OPENAI_API_KEY=your_openai_key
ANTHROPIC_API_KEY=your_anthropic_key

# Vector Store Configuration
CHROMA_URL=http://localhost:8000
QDRANT_URL=http://localhost:6333
WEAVIATE_URL=http://localhost:8080

# Production Serving
ONNX_MODEL_PATH=/models/recommendation_model.onnx
BATCH_SIZE=1000
MAX_WORKERS=4
```
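These variables can be read at startup with fallbacks to the documented defaults. A minimal sketch (the `load_serving_config` helper is illustrative, not part of the package):

```python
import os

def load_serving_config(env=None):
    """Read production-serving settings from the environment,
    falling back to the documented default values."""
    env = os.environ if env is None else env
    return {
        "onnx_model_path": env.get("ONNX_MODEL_PATH", "/models/recommendation_model.onnx"),
        "batch_size": int(env.get("BATCH_SIZE", "1000")),
        "max_workers": int(env.get("MAX_WORKERS", "4")),
    }

# Override one setting, fall back to defaults for the rest.
config = load_serving_config({"BATCH_SIZE": "256"})
print(config["batch_size"], config["max_workers"])  # 256 4
```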
### Configuration Files

```yaml
# config/recommendations.yaml
recommendations:
  adapters:
    microsoft_recommenders:
      enabled: true
      models: ["SAR", "NCF", "DeepFM"]
    recbole:
      enabled: true
      models: ["BPR", "NGCF", "LightGCN"]
  serving:
    onnx:
      enabled: true
      optimization: true
    batch:
      enabled: true
      batch_size: 1000
  evaluation:
    beyond_accuracy:
      enabled: true
      metrics: ["diversity", "novelty", "serendipity"]
    causal:
      enabled: true
      methods: ["ips", "doubly_robust"]
```
## Examples

### Complete Recommendation Pipeline

```python
from recoagent.packages.recommendations.adapters import MicrosoftRecommendersAdapter
from recoagent.packages.recommendations.serving import (
    BatchConfig,
    BatchPredictor,
    ONNXConfig,
    ONNXModelServer,
)
from recoagent.packages.recommendations.evaluation import (
    BeyondAccuracyConfig,
    BeyondAccuracyMetrics,
    MetricType,
)
from recoagent.packages.recommendations.monitoring import DriftConfig, DriftDetector, DriftType

# 1. Train a model with Microsoft Recommenders
adapter = MicrosoftRecommendersAdapter()
model = adapter.train_model("NCF", training_data)

# 2. Convert to ONNX for production serving
onnx_server = ONNXModelServer(ONNXConfig(
    model_path="models/ncf_model.pth",
    onnx_path="models/ncf_model.onnx",
))
onnx_server.convert_pytorch_to_onnx(model, sample_input)

# 3. Set up batch prediction
batch_predictor = BatchPredictor(
    BatchConfig(batch_size=1000, max_workers=4),
    prediction_function,
)

# 4. Configure drift detection
drift_detector = DriftDetector(DriftConfig(
    drift_type=DriftType.MODEL_DRIFT,
    threshold=0.05,
))

# 5. Set up evaluation
beyond_metrics = BeyondAccuracyMetrics(BeyondAccuracyConfig(
    metric_types=[MetricType.DIVERSITY, MetricType.NOVELTY],
))

# 6. Production pipeline
def production_pipeline(user_data):
    # Make predictions
    predictions = batch_predictor.predict_batch(user_data)

    # Monitor drift
    drift_result = drift_detector.detect_drift(user_data)

    # Evaluate quality
    evaluation = beyond_metrics.evaluate(predictions, user_data)

    return predictions, drift_result, evaluation
```
## Performance Benchmarks

### ONNX Optimization

- **2-5x faster inference** compared to PyTorch
- **50% reduction in model size** with quantization
- **Sub-100ms latency** for real-time serving
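Numbers like these depend on hardware, so they are worth re-measuring locally. A small harness that times any predict callable (the percentile math is a simple approximation, and the toy predict function stands in for a real model; no expected timings are shown because they are machine-dependent):

```python
import time

def benchmark(predict_fn, sample_inputs, runs=100):
    """Return mean and p95 latency in milliseconds over repeated runs."""
    latencies = []
    for _ in range(runs):
        start = time.perf_counter()
        for sample in sample_inputs:
            predict_fn(sample)
        latencies.append((time.perf_counter() - start) * 1000)
    latencies.sort()
    p95 = latencies[min(int(len(latencies) * 0.95), len(latencies) - 1)]
    return {"mean_ms": sum(latencies) / len(latencies), "p95_ms": p95}

stats = benchmark(lambda x: x * 2, sample_inputs=[1, 2, 3], runs=50)
print(sorted(stats))  # ['mean_ms', 'p95_ms']
```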
### Batch Processing

- **10x higher throughput** with parallel processing
- **1000+ recommendations/second** processing capability
- **Scalable to millions of users**

### Advanced Evaluation

- **Beyond-accuracy metrics**: Diversity, novelty, serendipity, and coverage
- **Causal evaluation**: Unbiased performance measurement
- **Real-time monitoring**: Drift detection and alerting
## Troubleshooting

### Common Issues

1. **Import Errors**: Ensure all dependencies are installed:

   ```bash
   pip install recoagent[recommendations]
   ```

2. **ONNX Conversion Failures**: Check model compatibility and input shapes.
3. **Batch Processing Errors**: Monitor memory usage and worker processes.
4. **Drift Detection Issues**: Verify data quality and statistical parameters.
### Debug Mode

```python
import logging

# Enable detailed logging for troubleshooting
logging.basicConfig(level=logging.DEBUG)
```
## Support
For additional support and examples, see: