Skip to main content

Building Recommendation Systems

This guide walks you through building a complete recommendation system with RecoAgent.

Overview

We'll build a product recommendation system for an e-commerce platform that includes:

  • Collaborative filtering for personalized recommendations
  • A/B testing for optimization
  • Cold start handling for new users
  • Real-time personalization
  • Business rules integration

Prerequisites

  • Python 3.9+
  • RecoAgent installed with enterprise features: pip install "recoagent[enterprise]"
  • Basic understanding of recommendation systems

Step 1: Setup and Configuration

Install Dependencies

pip install "recoagent[enterprise]"
pip install pandas numpy scikit-learn

Create Configuration

# config.py
from recoagent.packages.recommendations.service import ServiceConfig

# Algorithm definitions, keyed by the name used to refer to each one.
_ALGORITHMS = {
    "collaborative_filtering": {
        "type": "ALSRecommender",
        "params": {"factors": 50, "regularization": 0.01},
    },
    "content_based": {
        "type": "ContentBasedRecommender",
        "params": {"embedding_model": "sentence-transformers/all-MiniLM-L6-v2"},
    },
}

# Business-rule switches handed to the service; all three are enabled.
_BUSINESS_RULES = {
    "inventory_aware": True,
    "price_sensitivity": True,
    "category_preferences": True,
}

# Service configuration imported by the other modules as `config`.
# primary_algorithm names one of the keys of _ALGORITHMS.
config = ServiceConfig(
    service_id="ecommerce_recommendations",
    client_id="my_ecommerce_app",
    algorithms=_ALGORITHMS,
    primary_algorithm="collaborative_filtering",
    enable_cold_start=True,
    enable_real_time=True,
    business_rules=_BUSINESS_RULES,
)

Step 2: Data Preparation

Load and Prepare Data

# data_preparation.py
import pandas as pd
from recoagent.packages.recommendations.data import DataPreprocessor

# Read the three raw CSV files, in this order, into DataFrames.
interactions, products, users = (
    pd.read_csv(csv_path)
    for csv_path in ("user_interactions.csv", "products.csv", "users.csv")
)

# Run each frame through the matching preprocessing step.
preprocessor = DataPreprocessor()
processed_interactions = preprocessor.preprocess_interactions(interactions)
processed_products = preprocessor.preprocess_items(products)
processed_users = preprocessor.preprocess_users(users)

# Report how many rows each preprocessing step produced.
print(f"Processed {len(processed_interactions)} interactions")
print(f"Processed {len(processed_products)} products")
print(f"Processed {len(processed_users)} users")

Data Quality Validation

# data_validation.py
from recoagent.packages.recommendations.data import DataValidator

# Run the validator over the preprocessed interactions, users and items.
validator = DataValidator()
quality_report = validator.validate_data(
    interactions=processed_interactions,
    users=processed_users,
    items=processed_products,
)

# Print the three report scores as percentages with two decimals.
# NOTE(review): the `%` format assumes each score is a fraction in [0, 1] — confirm.
print("Data Quality Report:")
print(f"Completeness: {quality_report.completeness:.2%}")
print(f"Consistency: {quality_report.consistency:.2%}")
print(f"Validity: {quality_report.validity:.2%}")

Step 3: Build the Recommendation Service

Initialize Service

# recommendation_service.py
from recoagent.packages.recommendations.service import RecommendationService
from recoagent.packages.recommendations.agents import (
RecommendationAgent,
PersonalizationAgent,
BanditOptimizationAgent,
ColdStartAgent
)

class EcommerceRecommendationService:
    """Route users to cold-start or personalized recommendations.

    Wraps a ``RecommendationService`` plus the recommendation, personalization,
    bandit and cold-start agents, and filters every result through the
    inventory and price business rules before returning it.
    """

    # Multiplier applied to the caller's budget in the price rule;
    # 1.2 admits items priced up to 20% over budget.
    BUDGET_TOLERANCE = 1.2

    def __init__(self, config):
        """Build the underlying service from ``config`` and create the agents."""
        self.config = config
        self.service = RecommendationService(config)
        self.recommendation_agent = RecommendationAgent()
        self.personalization_agent = PersonalizationAgent()
        self.bandit_agent = BanditOptimizationAgent()
        self.cold_start_agent = ColdStartAgent()

    async def initialize(self):
        """Start the underlying recommendation service."""
        await self.service.start()
        print("✅ Recommendation service initialized")

    async def get_recommendations(self, user_id, n_recommendations=10, context=None):
        """Get recommendations for a user.

        Args:
            user_id: Identifier of the user.
            n_recommendations: Maximum number of items to return.
            context: Optional dict of request attributes; a "budget" entry
                enables the price rule. Passed to the cold-start agent as
                ``user_attributes`` for new users.

        Returns:
            ``{"success": True, "recommendations": [...], "is_new_user": bool}``
            on success, or ``{"success": False, "error": str}`` if anything
            raised.
        """
        import logging

        try:
            is_new_user = await self._is_new_user(user_id)

            if is_new_user:
                # Cold start: the agent only receives the user's attributes.
                recommendations = await self.cold_start_agent.handle_new_user(
                    user_id=user_id,
                    user_attributes=context or {},
                )
            else:
                recommendations = await self.recommendation_agent.get_recommendations(
                    user_id=user_id,
                    n_recommendations=n_recommendations,
                    context=context,
                )

            filtered_recommendations = await self._apply_business_rules(
                recommendations, context
            )

            # The cold-start call is not given a size limit, so cap the
            # result here to honour n_recommendations on both paths.
            limit = max(n_recommendations, 0)
            return {
                "success": True,
                "recommendations": filtered_recommendations[:limit],
                "is_new_user": is_new_user,
            }

        except Exception as e:
            # Failures are reported in the result dict rather than raised, so
            # log the traceback here or it is lost.
            logging.getLogger(__name__).exception(
                "Failed to get recommendations for user %s", user_id
            )
            return {
                "success": False,
                "error": str(e),
            }

    async def _is_new_user(self, user_id):
        """Check if user is new (has no interaction history)."""
        # Demo heuristic only: IDs shorter than 5 characters count as new.
        # In a real implementation, check the database.
        return len(user_id) < 5

    async def _apply_business_rules(self, recommendations, context):
        """Return the recommendations that pass the inventory and price rules.

        A ``None`` input is treated as an empty list.
        """
        filtered = []
        for rec in recommendations or []:
            if not await self._check_inventory(rec):
                continue
            if await self._check_price_sensitivity(rec, context):
                filtered.append(rec)
        return filtered

    async def _check_inventory(self, recommendation):
        """Check if product is in stock."""
        # Placeholder: always in stock. In a real implementation, check the
        # inventory system.
        return True

    async def _check_price_sensitivity(self, recommendation, context):
        """Check if the product price fits the user's budget.

        Returns True when no budget is supplied. A missing or ``None`` price
        is treated as 0.
        """
        if not context or "budget" not in context:
            return True

        product_price = recommendation.get("price") or 0
        user_budget = context["budget"]

        return product_price <= user_budget * self.BUDGET_TOLERANCE

Step 4: Implement A/B Testing

A/B Testing for Recommendation Strategies

# ab_testing.py
from recoagent.packages.recommendations.bandits import ThompsonSamplingBandit

class RecommendationABTest:
    """Pick a recommendation strategy per request with a Thompson-sampling bandit."""

    def __init__(self):
        """Create the strategy table and a bandit with one arm per strategy."""
        # Arm index -> strategy name.
        self.strategies = {
            0: "collaborative_filtering",
            1: "content_based",
            2: "hybrid",
        }
        self.bandit = ThompsonSamplingBandit(
            # Derived from the table so the two cannot drift apart.
            n_arms=len(self.strategies),
            alpha_prior=1.0,
            beta_prior=1.0,
        )

    async def select_strategy(self, user_id, context=None):
        """Select a recommendation strategy using the bandit.

        Args:
            user_id: Identifier of the user the strategy is chosen for.
            context: Accepted for interface compatibility; the bandit is only
                given the user's segment.

        Returns:
            A ``(strategy_name, strategy_index)`` tuple.
        """
        strategy_index = self.bandit.select_arm(
            context=await self._bandit_context(user_id)
        )
        return self.strategies[strategy_index], strategy_index

    async def update_feedback(self, user_id, strategy_index, reward, context=None):
        """Update the bandit with the reward observed for ``strategy_index``.

        The bandit is given the same segment context that ``select_strategy``
        uses, so the update refers to the context the arm was chosen under.
        ``context`` is accepted for interface compatibility.
        """
        self.bandit.update(
            strategy_index, reward, context=await self._bandit_context(user_id)
        )

    async def _bandit_context(self, user_id):
        """Build the context dict passed to the bandit for this user."""
        return {"user_segment": await self._get_user_segment(user_id)}

    async def _get_user_segment(self, user_id):
        """Get user segment for contextual bandits."""
        # Placeholder: every user is in the same segment. In a real
        # implementation, determine the user's segment.
        return "regular_user"

Step 5: Create the Main Application

FastAPI Application

# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict, Any, Optional, List
import asyncio

app = FastAPI(title="E-commerce Recommendation API")

# Populated by startup_event(); both stay None until the app has started.
recommendation_service = None
ab_test = None


# NOTE(review): newer FastAPI releases recommend a lifespan handler over
# on_event — consider migrating.
@app.on_event("startup")
async def startup_event():
    """Create and initialize the recommendation service and the A/B test.

    Stores both in the module-level ``recommendation_service`` and
    ``ab_test`` globals that the endpoints read.
    """
    global recommendation_service, ab_test

    # Imported here, next to the config import: this module uses both
    # classes but never imported them, which raised NameError at startup.
    from ab_testing import RecommendationABTest
    from config import config
    from recommendation_service import EcommerceRecommendationService

    recommendation_service = EcommerceRecommendationService(config)
    await recommendation_service.initialize()

    ab_test = RecommendationABTest()

    print("🚀 Recommendation API ready!")

# Request/Response Models
class RecommendationRequest(BaseModel):
    """Request body for POST /recommendations."""

    # Identifier of the user to recommend for.
    user_id: str
    # Number of items requested; 10 when omitted.
    n_recommendations: int = 10
    # Optional free-form request attributes; may be omitted.
    context: Optional[Dict[str, Any]] = None

class RecommendationResponse(BaseModel):
    """Response body for POST /recommendations."""

    # Whether the request was served successfully.
    success: bool
    # The recommended items, one dict per item.
    recommendations: List[Dict[str, Any]]
    # True when the user was handled through the cold-start path.
    is_new_user: bool
    # Name of the strategy chosen by the A/B test, if any.
    strategy_used: Optional[str] = None

class FeedbackRequest(BaseModel):
    """Request body for POST /feedback."""

    # Identifier of the user who interacted.
    user_id: str
    # Identifier of the item the user interacted with.
    item_id: str
    # Kind of interaction; the feedback endpoint maps it to a reward.
    interaction_type: str
    # Optional free-form request attributes; may be omitted.
    context: Optional[Dict[str, Any]] = None

# API Endpoints
@app.post("/recommendations", response_model=RecommendationResponse)
async def get_recommendations(request: RecommendationRequest):
    """Return recommendations for the requested user.

    Asks the A/B test for a strategy, fetches recommendations from the
    recommendation service, and reports the chosen strategy name in the
    response.

    Raises:
        HTTPException: 500 carrying the service's error text when the service
            reports failure, or the exception text when anything else fails.
    """
    try:
        # Only the strategy name is reported back; the arm index is not
        # needed in this handler.
        strategy, _ = await ab_test.select_strategy(
            request.user_id, request.context
        )

        # NOTE(review): the selected strategy is not forwarded to the service
        # call, so it does not influence which recommendations are produced.
        result = await recommendation_service.get_recommendations(
            user_id=request.user_id,
            n_recommendations=request.n_recommendations,
            context=request.context,
        )

        if not result["success"]:
            raise HTTPException(status_code=500, detail=result["error"])

        return RecommendationResponse(
            success=True,
            recommendations=result["recommendations"],
            is_new_user=result["is_new_user"],
            strategy_used=strategy,
        )

    except HTTPException:
        # Already carries the intended status and detail. Letting the generic
        # handler below catch it would replace detail with str(exception).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

@app.post("/feedback")
async def record_feedback(request: FeedbackRequest):
    """Record user feedback on recommendations.

    Maps the interaction type to a reward and updates the A/B test's bandit.
    The credited arm is looked up from an optional ``"strategy_used"`` entry
    in ``request.context`` (the name returned by POST /recommendations); when
    it is absent or unknown, arm 0 is credited as before.

    Raises:
        HTTPException: 500 with the exception text if the update fails.
    """
    # Reward assigned to each known interaction type.
    # NOTE(review): "dismiss" is negative — confirm the bandit accepts
    # rewards outside [0, 1].
    reward_mapping = {
        "view": 0.1,
        "click": 0.3,
        "add_to_cart": 0.7,
        "purchase": 1.0,
        "dismiss": -0.2,
    }

    try:
        # Unknown interaction types fall back to the "view" reward.
        reward = reward_mapping.get(request.interaction_type, 0.1)

        # Resolve which arm to credit instead of always crediting arm 0.
        strategy_name = (request.context or {}).get("strategy_used")
        strategy_index = next(
            (
                index
                for index, name in ab_test.strategies.items()
                if name == strategy_name
            ),
            0,
        )

        await ab_test.update_feedback(
            user_id=request.user_id,
            strategy_index=strategy_index,
            reward=reward,
            context=request.context,
        )

        return {"success": True, "message": "Feedback recorded"}

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

@app.get("/health")
async def health_check():
    """Health check endpoint.

    Returns the service name, a fixed "healthy" status, and the current UTC
    time formatted as ``YYYY-MM-DDTHH:MM:SSZ``.
    """
    from datetime import datetime, timezone

    # Report the time of this check; a constant timestamp tells a monitor
    # nothing about whether the process is still responding.
    now_utc = datetime.now(timezone.utc)
    return {
        "status": "healthy",
        "service": "ecommerce_recommendations",
        "timestamp": now_utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }

if __name__ == "__main__":
    # Serve the app with uvicorn on all interfaces, port 8000, when this
    # file is run as a script.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

Step 6: Testing and Evaluation

Unit Tests

# test_recommendations.py
import pytest
import asyncio
from recommendation_service import EcommerceRecommendationService
from config import config

# NOTE(review): with pytest-asyncio in strict mode an async fixture may need
# its own fixture decorator instead of pytest.fixture — confirm against the
# plugin mode this project runs with.
@pytest.fixture
async def recommendation_service():
    """Provide an initialized EcommerceRecommendationService built from config."""
    svc = EcommerceRecommendationService(config)
    await svc.initialize()
    return svc

@pytest.mark.asyncio
async def test_get_recommendations(recommendation_service):
    """An existing user gets a successful result with at most 5 items."""
    request_context = {"category": "electronics", "budget": 500}

    result = await recommendation_service.get_recommendations(
        user_id="test_user_123",
        n_recommendations=5,
        context=request_context,
    )

    assert result["success"] is True
    assert len(result["recommendations"]) <= 5
    assert "is_new_user" in result

@pytest.mark.asyncio
async def test_cold_start_handling(recommendation_service):
    """Test cold start handling for new users."""
    result = await recommendation_service.get_recommendations(
        # The demo heuristic treats IDs shorter than 5 characters as new
        # users; "new_user" has 8 characters and never took this path.
        user_id="new",
        n_recommendations=3,
    )

    assert result["success"] is True
    assert result["is_new_user"] is True

Performance Testing

# performance_test.py
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

async def performance_test(n_requests=100):
    """Fire ``n_requests`` concurrent recommendation requests and print timings.

    Args:
        n_requests: Number of requests to issue; must be at least 1.

    Raises:
        ValueError: If ``n_requests`` is less than 1.
    """
    if n_requests < 1:
        raise ValueError("n_requests must be at least 1")

    # Imported here because this script never imported the service class or
    # its config, which raised NameError on the first line of the test.
    from config import config
    from recommendation_service import EcommerceRecommendationService

    service = EcommerceRecommendationService(config)
    await service.initialize()

    # perf_counter is monotonic, so the interval cannot be skewed by
    # wall-clock adjustments.
    start_time = time.perf_counter()

    results = await asyncio.gather(
        *(
            service.get_recommendations(
                user_id=f"user_{i}",
                n_recommendations=10,
            )
            for i in range(n_requests)
        )
    )

    total_time = time.perf_counter() - start_time
    succeeded = sum(1 for r in results if r["success"])

    # The "average" is total wall-clock time divided by the request count,
    # not a per-request latency measurement.
    print(f"Processed {n_requests} requests in {total_time:.2f} seconds")
    print(f"Average response time: {total_time/n_requests:.3f} seconds")
    print(f"Success rate: {succeeded/len(results):.2%}")


if __name__ == "__main__":
    asyncio.run(performance_test())

Step 7: Monitoring and Analytics

Metrics Collection

# metrics.py
from recoagent.packages.recommendations.evaluation import RecommendationMetrics
import logging

class RecommendationAnalytics:
    """Log recommendation events and forward them to RecommendationMetrics."""

    def __init__(self):
        """Create the module logger and the metrics recorder."""
        self.logger = logging.getLogger(__name__)
        self.metrics = RecommendationMetrics()

    async def track_recommendation(self, user_id, recommendations, strategy):
        """Log a delivery and record how many items were delivered."""
        self.logger.info(f"Recommendations delivered to {user_id} using {strategy}")

        delivered_count = len(recommendations)
        await self.metrics.record_recommendation_delivery(
            user_id=user_id,
            n_recommendations=delivered_count,
            strategy=strategy,
        )

    async def track_interaction(self, user_id, item_id, interaction_type):
        """Log a user interaction and record it in the metrics."""
        self.logger.info(f"User {user_id} {interaction_type} item {item_id}")

        await self.metrics.record_interaction(
            user_id=user_id,
            item_id=item_id,
            interaction_type=interaction_type,
        )

    async def get_performance_report(self):
        """Return the report generated by the metrics recorder."""
        report = await self.metrics.generate_report()
        return report

Step 8: Deployment

Docker Configuration

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install dependencies before copying the source so this layer is reused
# when only application code changes.
COPY requirements.txt .
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

# Serve the FastAPI app object `app` from app.py on all interfaces.
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose

# docker-compose.yml
version: '3.8'

services:
  # The recommendation API, built from the Dockerfile in this directory.
  recommendation-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://user:pass@postgres:5432/recommendations
    depends_on:
      - redis
      - postgres

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  postgres:
    image: postgres:15-alpine
    environment:
      # Placeholder credentials; they must match DATABASE_URL above.
      - POSTGRES_DB=recommendations
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:

Best Practices

  1. Start Simple - Begin with collaborative filtering, add complexity gradually
  2. Monitor Performance - Track recommendation quality and system performance
  3. A/B Test Everything - Use bandit algorithms to optimize strategies
  4. Handle Cold Start - Implement proper cold start strategies for new users
  5. Real-time Updates - Enable real-time personalization for better UX
  6. Business Rules - Apply business constraints and inventory awareness
  7. Scalability - Design for horizontal scaling and high throughput

Next Steps

  • Advanced Algorithms - Implement sequential and graph-based models
  • Multimodal Recommendations - Add image and text features
  • Real-time Streaming - Implement real-time recommendation updates
  • Advanced Analytics - Add comprehensive analytics and reporting
  • MLOps Integration - Implement model versioning and deployment pipelines

Examples

See the examples section for complete working examples and additional use cases.