Product Recommendations Example
This example demonstrates how to build a complete product recommendation system using RecoAgent's recommendation capabilities.
Overview
We'll build an e-commerce product recommendation system that includes:
- Collaborative filtering for personalized recommendations
- A/B testing for optimization
- Cold start handling for new users
- Business rules for inventory and pricing
- Real-time personalization
Setup
Install Dependencies
pip install "recoagent[enterprise]"
pip install pandas numpy scikit-learn
Sample Data
# sample_data.py
import pandas as pd
import numpy as np
# Generate sample user interactions.
# Seeding NumPy's global RNG makes every run produce the same synthetic data.
np.random.seed(42)
n_users = 1000
n_products = 500
n_interactions = 10000

# Create sample interactions: one random (user, product, rating) event per hour.
interactions = pd.DataFrame({
    'user_id': np.random.randint(0, n_users, n_interactions),
    'product_id': np.random.randint(0, n_products, n_interactions),
    # randint's upper bound is exclusive, so ratings fall in 1..5.
    'rating': np.random.randint(1, 6, n_interactions),
    # Lowercase 'h' is the hourly alias; the uppercase 'H' spelling is
    # deprecated in recent pandas releases and emits a FutureWarning.
    'timestamp': pd.date_range('2023-01-01', periods=n_interactions, freq='h')
})

# Create sample products with a random category, price and stock flag.
products = pd.DataFrame({
    'product_id': range(n_products),
    'name': [f'Product {i}' for i in range(n_products)],
    'category': np.random.choice(['Electronics', 'Clothing', 'Books', 'Home'], n_products),
    'price': np.random.uniform(10, 500, n_products),
    # Roughly 80% of products are marked as in stock.
    'in_stock': np.random.choice([True, False], n_products, p=[0.8, 0.2])
})

# Create sample users with a random age, location and preferred category.
users = pd.DataFrame({
    'user_id': range(n_users),
    'age': np.random.randint(18, 65, n_users),
    'location': np.random.choice(['US', 'EU', 'Asia'], n_users),
    'preferred_category': np.random.choice(['Electronics', 'Clothing', 'Books', 'Home'], n_users)
})

print(f"Generated {len(interactions)} interactions")
print(f"Generated {len(products)} products")
print(f"Generated {len(users)} users")
Basic Recommendation System
1. Initialize Recommendation Service
# recommendation_service.py
from recoagent.packages.recommendations.service import RecommendationService, ServiceConfig
from recoagent.packages.recommendations.agents import RecommendationAgent
import asyncio
class ProductRecommendationService:
    """Thin facade used by the example application.

    Recommendations are fetched through a ``RecommendationAgent``; startup and
    feedback recording go through a ``RecommendationService`` built from
    ``self.config``. Both public coroutines report failures as a result dict
    with ``"success": False`` instead of raising.
    """

    def __init__(self):
        # Two algorithms are registered; collaborative filtering is primary.
        algorithm_specs = {
            "collaborative_filtering": {
                "type": "ALSRecommender",
                "params": {"factors": 50, "regularization": 0.01},
            },
            "content_based": {
                "type": "ContentBasedRecommender",
                "params": {"embedding_model": "sentence-transformers/all-MiniLM-L6-v2"},
            },
        }
        self.config = ServiceConfig(
            service_id="product_recommendations",
            client_id="ecommerce_app",
            algorithms=algorithm_specs,
            primary_algorithm="collaborative_filtering",
            enable_cold_start=True,
            enable_real_time=True,
        )

        # The service is built from the config; the agent takes no arguments.
        self.service = RecommendationService(self.config)
        self.agent = RecommendationAgent()

    async def initialize(self):
        """Start the underlying recommendation service and announce readiness."""
        await self.service.start()
        print("✅ Product recommendation service initialized")

    async def get_recommendations(self, user_id, n_recommendations=10, context=None):
        """Ask the agent for recommendations for ``user_id``.

        Returns a dict that always carries ``"success"`` and ``"user_id"``,
        plus ``"recommendations"`` on success or ``"error"`` (the exception
        text) on failure.
        """
        try:
            items = await self.agent.get_recommendations(
                user_id=user_id,
                n_recommendations=n_recommendations,
                context=context,
            )
        except Exception as exc:
            return {"success": False, "error": str(exc), "user_id": user_id}
        return {"success": True, "recommendations": items, "user_id": user_id}

    async def record_interaction(self, user_id, product_id, interaction_type, context=None):
        """Translate an interaction type into a feedback score and record it.

        Interaction types missing from the table are scored like a ``"view"``
        (0.1). Returns ``{"success": True, "message": ...}`` on success or
        ``{"success": False, "error": ...}`` if anything raises.
        """
        try:
            # Feedback weight for each known interaction type.
            score_by_type = {
                "view": 0.1,
                "click": 0.3,
                "add_to_cart": 0.7,
                "purchase": 1.0,
                "dismiss": -0.2,
            }
            score = score_by_type.get(interaction_type, 0.1)

            await self.service.record_feedback(
                user_id=user_id,
                item_id=product_id,
                feedback=score,
                context=context,
            )
        except Exception as exc:
            return {"success": False, "error": str(exc)}
        return {"success": True, "message": "Interaction recorded"}
2. A/B Testing Integration
# ab_testing.py
from recoagent.packages.recommendations.bandits import ThompsonSamplingBandit
class RecommendationABTest:
    """Chooses between recommendation strategies with a Thompson-sampling bandit.

    Each bandit arm index maps to one strategy name in ``self.strategies``.
    """

    def __init__(self):
        self.bandit = ThompsonSamplingBandit(
            n_arms=3,  # Three recommendation strategies
            alpha_prior=1.0,
            beta_prior=1.0
        )
        # Arm index -> strategy name.
        self.strategies = {
            0: "collaborative_filtering",
            1: "content_based",
            2: "hybrid"
        }

    def select_strategy(self, user_id, context=None):
        """Select a recommendation strategy for ``user_id``.

        Only the derived user segment is handed to the bandit; the ``context``
        argument is accepted but not forwarded.

        Returns:
            A ``(strategy_name, strategy_index)`` tuple.
        """
        user_segment = self._get_user_segment(user_id)
        strategy_index = self.bandit.select_arm(
            context={"user_segment": user_segment}
        )
        return self.strategies[strategy_index], strategy_index

    def update_feedback(self, user_id, strategy_index, reward, context=None):
        """Report ``reward`` for the arm ``strategy_index`` to the bandit."""
        self.bandit.update(strategy_index, reward, context=context)

    def _get_user_segment(self, user_id):
        """Map a user id onto one of three demo segments.

        Numeric ids are bucketed by ``user_id % 3``. Any other id, such as the
        string ids the API accepts, is bucketed by a CRC32 of its text.
        Applying ``%`` to a ``str`` would be string formatting, not
        arithmetic, and would raise ``TypeError`` for ids like ``"user_123"``.
        """
        import numbers
        import zlib

        if isinstance(user_id, numbers.Real):
            bucket = user_id % 3
        else:
            # CRC32 is stable across processes, unlike the built-in hash() of
            # a str, which is salted per interpreter run.
            bucket = zlib.crc32(str(user_id).encode("utf-8")) % 3

        if bucket == 0:
            return "new_user"
        elif bucket == 1:
            return "regular_user"
        else:
            return "power_user"

    def get_stats(self):
        """Return per-strategy pull count, total reward and average reward.

        The average is computed with a minimum divisor of 1, so an arm that
        was never pulled reports its total reward instead of dividing by zero.
        """
        arm_counts = self.bandit.get_arm_counts()
        arm_rewards = self.bandit.get_arm_rewards()
        return {
            strategy: {
                "count": arm_counts[i],
                "total_reward": arm_rewards[i],
                "average_reward": arm_rewards[i] / max(1, arm_counts[i]),
            }
            for i, strategy in self.strategies.items()
        }
3. Business Rules Engine
# business_rules.py
from recoagent.packages.recommendations.business import BusinessRulesEngine
class ProductBusinessRules:
    """Filters and re-scores recommendations according to simple business rules.

    A recommendation is kept only if it passes the inventory, price and
    category checks, in that order. Surviving items in a preferred category
    get their score boosted.
    """

    def __init__(self):
        self.rules_engine = BusinessRulesEngine()

    async def apply_rules(self, recommendations, user_context=None):
        """Return the recommendations that pass every rule, with boosts applied.

        Args:
            recommendations: Iterable of dict-like recommendations.
            user_context: Optional dict; the keys ``"budget"`` and
                ``"preferred_categories"`` are consulted when present.

        Returns:
            A new list in the original order. Input items are never modified;
            boosted items are returned as copies.
        """
        filtered_recommendations = []
        for rec in recommendations:
            # Checks run in a fixed order and stop at the first failure.
            if not await self._check_inventory(rec):
                continue
            if not await self._check_price_sensitivity(rec, user_context):
                continue
            if not await self._check_category_preferences(rec, user_context):
                continue
            filtered_recommendations.append(
                await self._apply_category_boost(rec, user_context)
            )
        return filtered_recommendations

    async def _check_inventory(self, recommendation):
        """Report whether the product is in stock (always True in this demo)."""
        # In a real implementation, look up recommendation["product_id"] in
        # the inventory system.
        return True  # For demo purposes

    async def _check_price_sensitivity(self, recommendation, user_context):
        """Return True if the price is at most 20% over the user's budget.

        With no context or no ``"budget"`` key every product passes. A missing
        ``"price"`` is treated as 0.
        """
        if not user_context or "budget" not in user_context:
            return True
        product_price = recommendation.get("price", 0)
        user_budget = user_context["budget"]
        # Allow 20% over budget
        return product_price <= user_budget * 1.2

    async def _check_category_preferences(self, recommendation, user_context):
        """Return True if the category is preferred, or no preferences are given."""
        if not user_context or "preferred_categories" not in user_context:
            return True
        return recommendation.get("category") in user_context["preferred_categories"]

    async def _apply_category_boost(self, recommendation, user_context):
        """Return the recommendation with a 1.2x score for preferred categories.

        The boost is applied to a shallow copy so the caller's object is left
        untouched; boosting in place would compound the multiplier whenever
        the same recommendation is processed more than once. Items that are
        not boosted are returned as-is.
        """
        import copy

        if not user_context or "preferred_categories" not in user_context:
            return recommendation
        if recommendation.get("category") not in user_context["preferred_categories"]:
            return recommendation

        boosted = copy.copy(recommendation)
        boosted["score"] = recommendation.get("score", 0) * 1.2
        return boosted
Complete Implementation
1. Main Application
# main_app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict, Any, Optional, List
import asyncio
app = FastAPI(title="Product Recommendation API")

# Service singletons; populated by startup_event() once the app starts.
recommendation_service = None
ab_test = None
business_rules = None


@app.on_event("startup")
async def startup_event():
    """Create the recommendation, A/B-test and business-rule services on startup."""
    # The service classes live in the sibling example modules
    # (recommendation_service.py, ab_testing.py, business_rules.py). Without
    # these imports the names below are undefined in main_app.py.
    from recommendation_service import ProductRecommendationService
    from ab_testing import RecommendationABTest
    from business_rules import ProductBusinessRules

    global recommendation_service, ab_test, business_rules

    recommendation_service = ProductRecommendationService()
    await recommendation_service.initialize()

    ab_test = RecommendationABTest()
    business_rules = ProductBusinessRules()

    print("🚀 Product Recommendation API ready!")
# Request/Response Models
class RecommendationRequest(BaseModel):
    # Request body for POST /recommendations.
    # Identifier of the user to recommend for.
    user_id: str
    # How many recommendations to ask the service for.
    n_recommendations: int = 10
    # Optional free-form context, forwarded to strategy selection, the
    # recommendation service and the business rules.
    context: Optional[Dict[str, Any]] = None
class InteractionRequest(BaseModel):
    # Request body for POST /interactions.
    # Identifier of the user who interacted.
    user_id: str
    # Identifier of the product that was interacted with.
    product_id: str
    # Kind of interaction, e.g. "purchase"; forwarded to the recommendation
    # service as-is.
    interaction_type: str
    # Optional free-form context forwarded with the interaction.
    context: Optional[Dict[str, Any]] = None
# API Endpoints
@app.post("/recommendations")
async def get_recommendations(request: RecommendationRequest):
    """Get product recommendations for a user.

    Selects a strategy through the A/B test, fetches recommendations, filters
    them through the business rules and returns the result.

    Raises:
        HTTPException: status 500 when the recommendation service reports a
            failure or any step raises.
    """
    try:
        # Select strategy using A/B test.
        # NOTE(review): the chosen strategy is only reported back in the
        # response; it is not passed to the recommendation service here.
        strategy, _strategy_index = ab_test.select_strategy(
            request.user_id, request.context
        )

        # Get recommendations
        result = await recommendation_service.get_recommendations(
            user_id=request.user_id,
            n_recommendations=request.n_recommendations,
            context=request.context
        )
        if not result["success"]:
            raise HTTPException(status_code=500, detail=result["error"])

        # Apply business rules
        filtered_recommendations = await business_rules.apply_rules(
            result["recommendations"], request.context
        )
        return {
            "success": True,
            "recommendations": filtered_recommendations,
            "strategy_used": strategy,
            "user_id": request.user_id
        }
    except HTTPException:
        # Already a well-formed HTTP error. Letting the generic handler below
        # catch it would wrap it in a second HTTPException and mangle its
        # detail.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/interactions")
async def record_interaction(request: InteractionRequest):
    """Record a user interaction and feed a reward back to the A/B test.

    Raises:
        HTTPException: status 500 when recording fails or any step raises.
    """
    try:
        # Record interaction
        result = await recommendation_service.record_interaction(
            user_id=request.user_id,
            product_id=request.product_id,
            interaction_type=request.interaction_type,
            context=request.context
        )
        if not result["success"]:
            raise HTTPException(status_code=500, detail=result["error"])

        # Update A/B test with feedback.
        # In a real implementation, you'd need to track which strategy was used
        strategy_index = 0  # This should be retrieved from user's recommendation history
        # Purchases earn a high reward; every other interaction a small one.
        reward = 0.8 if request.interaction_type == "purchase" else 0.1
        ab_test.update_feedback(
            user_id=request.user_id,
            strategy_index=strategy_index,
            reward=reward,
            context=request.context
        )
        return {"success": True, "message": "Interaction recorded"}
    except HTTPException:
        # Already a well-formed HTTP error. Letting the generic handler below
        # catch it would wrap it in a second HTTPException and mangle its
        # detail.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/ab-test/stats")
async def get_ab_test_stats():
    """Return the current per-strategy A/B test statistics.

    Any failure while collecting the statistics is reported as an HTTP 500
    whose detail is the exception text.
    """
    try:
        current_stats = ab_test.get_stats()
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"success": True, "stats": current_stats}
@app.get("/health")
async def health_check():
    """Health check endpoint.

    Returns a static "healthy" status together with the current UTC time, so
    callers can tell a live response from a stale or cached one.
    """
    from datetime import datetime, timezone

    return {
        "status": "healthy",
        "service": "product_recommendations",
        # ISO-8601 UTC with a trailing "Z", e.g. "2024-01-01T00:00:00Z".
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    }
# Script entry point: serve the API with uvicorn when run directly.
if __name__ == "__main__":
    import uvicorn

    # 0.0.0.0 listens on every network interface, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)
2. Demo Script
# demo.py
import asyncio
import httpx
import json
async def demo_recommendations():
    """Call each endpoint of the recommendation API once and print the outcome.

    Talks to the API at http://localhost:8000. Every step prints either a
    success summary or the raw response text of the failed call.
    """
    base_url = "http://localhost:8000"

    # Request bodies used by the two POST calls below.
    recommendation_request = {
        "user_id": "user_123",
        "n_recommendations": 5,
        "context": {
            "category": "Electronics",
            "budget": 500,
            "preferred_categories": ["Electronics", "Books"],
        },
    }
    interaction_request = {
        "user_id": "user_123",
        "product_id": "product_456",
        "interaction_type": "purchase",
        "context": {"session_id": "session_789"},
    }

    async with httpx.AsyncClient() as client:
        # Test 1: Get recommendations for a user
        print("🎯 Test 1: Getting product recommendations")
        reply = await client.post(f"{base_url}/recommendations", json=recommendation_request)
        if reply.status_code != 200:
            print(f"❌ Failed to get recommendations: {reply.text}")
        else:
            payload = reply.json()
            print("✅ Recommendations for user_123:")
            print(f" Strategy used: {payload['strategy_used']}")
            print(f" Number of recommendations: {len(payload['recommendations'])}")
            for position, item in enumerate(payload['recommendations'], 1):
                print(f" {position}. {item.get('name', 'Unknown Product')} - Score: {item.get('score', 0):.3f}")

        # Test 2: Record user interaction
        print("\n📊 Test 2: Recording user interaction")
        reply = await client.post(f"{base_url}/interactions", json=interaction_request)
        if reply.status_code != 200:
            print(f"❌ Failed to record interaction: {reply.text}")
        else:
            payload = reply.json()
            print(f"✅ Interaction recorded: {payload['message']}")

        # Test 3: Get A/B test statistics
        print("\n📈 Test 3: A/B test statistics")
        reply = await client.get(f"{base_url}/ab-test/stats")
        if reply.status_code != 200:
            print(f"❌ Failed to get A/B test stats: {reply.text}")
        else:
            payload = reply.json()
            print("✅ A/B test statistics:")
            for strategy_name, figures in payload['stats'].items():
                print(f" {strategy_name}:")
                print(f" Count: {figures['count']}")
                print(f" Total Reward: {figures['total_reward']:.2f}")
                print(f" Average Reward: {figures['average_reward']:.3f}")

        # Test 4: Health check
        print("\n🏥 Test 4: Health check")
        reply = await client.get(f"{base_url}/health")
        if reply.status_code != 200:
            print(f"❌ Health check failed: {reply.text}")
        else:
            payload = reply.json()
            print(f"✅ Service status: {payload['status']}")
# Script entry point: run the demo coroutine to completion.
if __name__ == "__main__":
    asyncio.run(demo_recommendations())
Running the Example
1. Start the API Server
python main_app.py
2. Run the Demo
python demo.py
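3. Expected Output (illustrative: the bandit samples the strategy, so the strategy, products, and scores vary between runs)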
🎯 Test 1: Getting product recommendations
✅ Recommendations for user_123:
Strategy used: collaborative_filtering
Number of recommendations: 5
1. Product 123 - Score: 0.856
2. Product 456 - Score: 0.789
3. Product 789 - Score: 0.734
4. Product 321 - Score: 0.698
5. Product 654 - Score: 0.645
📊 Test 2: Recording user interaction
✅ Interaction recorded: Interaction recorded
📈 Test 3: A/B test statistics
✅ A/B test statistics:
collaborative_filtering:
Count: 1
Total Reward: 0.80
Average Reward: 0.800
content_based:
Count: 0
Total Reward: 0.00
Average Reward: 0.000
hybrid:
Count: 0
Total Reward: 0.00
Average Reward: 0.000
🏥 Test 4: Health check
✅ Service status: healthy
Advanced Features
1. Real-time Personalization
# real_time_personalization.py
class RealTimePersonalization:
    """In-memory store of per-user category preferences and interaction history.

    ``user_profiles`` maps a user id to a dict with three keys:
    ``"preferences"`` (category -> interaction count), ``"behavior_patterns"``
    (every interaction seen, oldest first) and ``"last_updated"``.
    """

    def __init__(self):
        self.user_profiles = {}
        self.session_data = {}

    async def update_user_profile(self, user_id, interaction):
        """Fold one interaction into the user's profile.

        Creates the profile on first use, bumps the counter for the
        interaction's ``"category"`` (if it has a truthy one), appends the
        interaction to the history and stamps ``"last_updated"`` with the
        current UTC time.
        """
        # Imported here because this module has no top-level datetime import;
        # without it the timestamp assignment below raises NameError.
        from datetime import datetime, timezone

        profile = self.user_profiles.setdefault(
            user_id,
            {
                "preferences": {},
                "behavior_patterns": [],
                "last_updated": None,
            },
        )

        # Update preferences based on interaction
        product_category = interaction.get("category")
        if product_category:
            preferences = profile["preferences"]
            preferences[product_category] = preferences.get(product_category, 0) + 1

        # Update behavior patterns
        profile["behavior_patterns"].append(interaction)
        profile["last_updated"] = datetime.now(timezone.utc)

    async def get_personalized_context(self, user_id):
        """Build a recommendation context from the user's profile.

        Returns an empty dict for unknown users. Otherwise returns the top
        three categories by interaction count, the user segment and the five
        most recent interactions.
        """
        if user_id not in self.user_profiles:
            return {}
        profile = self.user_profiles[user_id]

        # Get top preferences. The sort is stable, so ties keep the order in
        # which the categories were first seen.
        top_categories = sorted(
            profile["preferences"].items(),
            key=lambda x: x[1],
            reverse=True
        )[:3]
        return {
            "preferred_categories": [cat for cat, _ in top_categories],
            "user_segment": self._determine_user_segment(profile),
            "recent_behavior": profile["behavior_patterns"][-5:]  # Last 5 interactions
        }

    def _determine_user_segment(self, profile):
        """Classify the user by number of recorded interactions.

        Fewer than 5 is ``"new_user"``, fewer than 20 is ``"regular_user"``,
        anything else is ``"power_user"``.
        """
        interaction_count = len(profile["behavior_patterns"])
        if interaction_count < 5:
            return "new_user"
        elif interaction_count < 20:
            return "regular_user"
        else:
            return "power_user"
2. Cold Start Handling
# cold_start.py
class ColdStartHandler:
    """Produces fallback recommendations for users without interaction history."""

    def __init__(self):
        self.popular_items = {}
        self.category_trends = {}

    async def handle_new_user(self, user_id, user_attributes=None):
        """Return up to three popular items followed by up to two trending items.

        Each entry carries ``product_id``, ``name``, ``category``, a ``score``
        (the item's popularity or trend score) and a ``reason`` label.
        ``user_id`` and ``user_attributes`` are accepted but not used.
        """
        popular = await self._get_popular_items()
        trending = await self._get_trending_items()

        # Popular picks come first, then trending picks; no re-sorting is done.
        picks = [
            {
                "product_id": entry["product_id"],
                "name": entry["name"],
                "category": entry["category"],
                "score": entry["popularity_score"],
                "reason": "Popular item",
            }
            for entry in popular[:3]
        ]
        picks.extend(
            {
                "product_id": entry["product_id"],
                "name": entry["name"],
                "category": entry["category"],
                "score": entry["trend_score"],
                "reason": "Trending item",
            }
            for entry in trending[:2]
        )
        return picks

    async def _get_popular_items(self):
        """Return a fixed list of popular items (placeholder for a database query)."""
        catalog = (
            ("1", "Popular Product 1", "Electronics", 0.9),
            ("2", "Popular Product 2", "Clothing", 0.8),
            ("3", "Popular Product 3", "Books", 0.7),
        )
        return [
            {"product_id": pid, "name": name, "category": category, "popularity_score": score}
            for pid, name, category, score in catalog
        ]

    async def _get_trending_items(self):
        """Return a fixed list of trending items (placeholder for a database query)."""
        catalog = (
            ("4", "Trending Product 1", "Electronics", 0.85),
            ("5", "Trending Product 2", "Home", 0.75),
        )
        return [
            {"product_id": pid, "name": name, "category": category, "trend_score": score}
            for pid, name, category, score in catalog
        ]
Best Practices
- Start Simple: Begin with collaborative filtering, add complexity gradually
- Monitor Performance: Track recommendation quality and user engagement
- A/B Test Everything: Use bandit algorithms to optimize strategies
- Handle Cold Start: Implement proper cold start strategies for new users
- Apply Business Rules: Use business constraints to ensure recommendations are actionable
- Real-time Updates: Keep user profiles and models up to date
- Scalability: Design for horizontal scaling and high throughput
Next Steps
- Advanced Algorithms: Implement sequential and graph-based models
- Multimodal Recommendations: Add image and text features
- Real-time Streaming: Implement real-time recommendation updates
- Advanced Analytics: Add comprehensive analytics and reporting
- MLOps Integration: Implement model versioning and deployment pipelines
This example provides a solid foundation for building production-ready product recommendation systems using RecoAgent's recommendation capabilities.