Implementing A/B Testing with Bandit Algorithms
This guide shows you how to implement intelligent A/B testing using RecoAgent's bandit algorithms for optimal exploration-exploitation balance.
Overview
A/B testing with bandit algorithms provides several advantages over traditional A/B testing:
- Adaptive Learning - Continuously learns and adapts to user preferences
- Reduced Regret - Minimizes the cost of showing suboptimal variants (see the toy simulation after this list)
- Real-time Optimization - Adjusts traffic allocation in real-time
- Contextual Decisions - Makes decisions based on user context
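To make the regret advantage concrete, here is a minimal, self-contained simulation (standard library only, not part of RecoAgent) comparing Thompson Sampling against a fixed 50/50 split on two variants whose true click-through rates are assumed to be 5% and 7%:

```python
# Toy regret comparison: fixed 50/50 split vs. Thompson Sampling.
# The CTRs and traffic volume are illustrative only.
import random

true_ctr = [0.05, 0.07]
best = max(true_ctr)
n_users = 20_000

# Fixed 50/50 split: half the traffic always goes to the weaker variant.
regret_fixed = sum(best - true_ctr[i % 2] for i in range(n_users))

# Thompson Sampling with Beta(1, 1) priors on each arm.
successes, failures = [1, 1], [1, 1]
regret_bandit = 0.0
for _ in range(n_users):
    samples = [random.betavariate(successes[i], failures[i]) for i in range(2)]
    arm = samples.index(max(samples))
    reward = 1 if random.random() < true_ctr[arm] else 0
    successes[arm] += reward
    failures[arm] += 1 - reward
    regret_bandit += best - true_ctr[arm]

print(f"Expected regret, fixed split: {regret_fixed:.0f} lost clicks")
print(f"Expected regret, bandit:      {regret_bandit:.0f} lost clicks")
```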
Prerequisites
- RecoAgent with enterprise features: `pip install recoagent[enterprise]`
- Basic understanding of A/B testing concepts
- Python 3.9+
Step 1: Choose the Right Bandit Algorithm
Thompson Sampling (Recommended)
Best for most use cases - balances exploration and exploitation well.
```python
from recoagent.packages.recommendations.bandits import ThompsonSamplingBandit

bandit = ThompsonSamplingBandit(
    n_arms=3,         # Number of variants
    alpha_prior=1.0,  # Prior alpha parameter
    beta_prior=1.0    # Prior beta parameter
)
```
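Under the hood, Thompson Sampling keeps a Beta posterior over each arm's conversion rate and plays the arm with the highest sampled value. The from-scratch sketch below illustrates that selection rule; RecoAgent's own implementation may differ in its details:

```python
import random

class TinyThompsonSampling:
    """Minimal Thompson Sampling for binary (click / no-click) rewards."""

    def __init__(self, n_arms, alpha_prior=1.0, beta_prior=1.0):
        self.alpha = [alpha_prior] * n_arms  # prior + observed successes
        self.beta = [beta_prior] * n_arms    # prior + observed failures

    def select_arm(self):
        # Sample a plausible conversion rate for each arm, play the best sample.
        samples = [random.betavariate(a, b) for a, b in zip(self.alpha, self.beta)]
        return samples.index(max(samples))

    def update(self, arm, reward):
        # reward is expected to be 0 or 1
        self.alpha[arm] += reward
        self.beta[arm] += 1 - reward
```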
Upper Confidence Bound (UCB)
A good choice when you want more explicit control over exploration.
```python
from recoagent.packages.recommendations.bandits import UCBBandit

bandit = UCBBandit(
    n_arms=3,
    confidence_level=0.95,  # Confidence level
    exploration_factor=2.0  # Exploration factor
)
```
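The idea behind UCB is to add an explicit exploration bonus to each arm's observed average reward, so rarely played arms keep getting tried. A from-scratch sketch of the classic UCB1 rule (RecoAgent's `UCBBandit` may parameterize the bonus differently):

```python
import math

class TinyUCB1:
    """Minimal UCB1: average reward plus an exploration bonus that shrinks
    as an arm is played more often."""

    def __init__(self, n_arms, exploration_factor=2.0):
        self.counts = [0] * n_arms
        self.totals = [0.0] * n_arms
        self.c = exploration_factor

    def select_arm(self):
        # Play each arm once before applying the confidence bound.
        for arm, count in enumerate(self.counts):
            if count == 0:
                return arm
        t = sum(self.counts)
        scores = [
            self.totals[i] / self.counts[i]
            + math.sqrt(self.c * math.log(t) / self.counts[i])
            for i in range(len(self.counts))
        ]
        return scores.index(max(scores))

    def update(self, arm, reward):
        self.counts[arm] += 1
        self.totals[arm] += reward
```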
Epsilon-Greedy
Simple and interpretable, good for getting started.
```python
from recoagent.packages.recommendations.bandits import EpsilonGreedyBandit

bandit = EpsilonGreedyBandit(
    n_arms=3,
    epsilon=0.1,     # Exploration probability
    decay_rate=0.99  # Epsilon decay over time
)
```
Step 2: Basic A/B Testing Implementation
Simple A/B Test
```python
# simple_ab_test.py
from recoagent.packages.recommendations.bandits import ThompsonSamplingBandit


class SimpleABTest:
    def __init__(self, test_name, variants):
        self.test_name = test_name
        self.variants = variants
        self.bandit = ThompsonSamplingBandit(n_arms=len(variants))

    def select_variant(self, user_id=None):
        """Select a variant for the user."""
        arm_index = self.bandit.select_arm()
        return self.variants[arm_index], arm_index

    def update_feedback(self, arm_index, reward):
        """Update the bandit with user feedback."""
        self.bandit.update(arm_index, reward)

    def get_stats(self):
        """Get current test statistics."""
        return {
            "test_name": self.test_name,
            "variants": self.variants,
            "arm_counts": self.bandit.get_arm_counts(),
            "arm_rewards": self.bandit.get_arm_rewards(),
        }


# Example usage
ab_test = SimpleABTest(
    test_name="homepage_layout",
    variants=["layout_a", "layout_b", "layout_c"],
)

# Select variant for user
variant, arm_index = ab_test.select_variant("user_123")
print(f"Selected variant: {variant}")

# Simulate user interaction (click = 1, no click = 0)
user_clicked = True  # In a real implementation, this comes from user behavior
reward = 1.0 if user_clicked else 0.0

# Update bandit with feedback
ab_test.update_feedback(arm_index, reward)

# Get test statistics
stats = ab_test.get_stats()
print(f"Test stats: {stats}")
```
Step 3: Contextual A/B Testing
Using LinUCB for Contextual Decisions
```python
# contextual_ab_test.py
from recoagent.packages.recommendations.bandits import LinUCBBandit
import numpy as np


class ContextualABTest:
    def __init__(self, test_name, variants, n_features):
        self.test_name = test_name
        self.variants = variants
        self.bandit = LinUCBBandit(
            n_features=n_features,
            alpha=1.0
        )

    def select_variant(self, user_context):
        """Select variant based on user context."""
        # Convert context to feature vector
        context_vector = self._context_to_features(user_context)
        arm_index = self.bandit.select_arm(context=context_vector)
        return self.variants[arm_index], arm_index

    def update_feedback(self, arm_index, reward, user_context):
        """Update bandit with feedback and context."""
        context_vector = self._context_to_features(user_context)
        self.bandit.update(arm_index, reward, context=context_vector)

    def _context_to_features(self, context):
        """Convert user context to feature vector."""
        # Example feature engineering
        features = np.zeros(5)  # 5 features

        # User segment (one-hot encoded)
        if context.get("user_segment") == "premium":
            features[0] = 1
        elif context.get("user_segment") == "regular":
            features[1] = 1

        # Time of day
        if context.get("time_of_day") == "morning":
            features[2] = 1
        elif context.get("time_of_day") == "evening":
            features[3] = 1

        # Device type
        if context.get("device") == "mobile":
            features[4] = 1

        return features


# Example usage
contextual_test = ContextualABTest(
    test_name="recommendation_algorithm",
    variants=["collaborative_filtering", "content_based", "hybrid"],
    n_features=5,
)

# User context
user_context = {
    "user_segment": "premium",
    "time_of_day": "evening",
    "device": "mobile",
}

# Select variant based on context
variant, arm_index = contextual_test.select_variant(user_context)
print(f"Selected variant for premium mobile user: {variant}")

# Update with feedback
reward = 0.8  # User engagement score
contextual_test.update_feedback(arm_index, reward, user_context)
```
Step 4: Advanced A/B Testing Framework
Comprehensive A/B Testing System
```python
# advanced_ab_test.py
from recoagent.packages.recommendations.bandits import ThompsonSamplingBandit
from recoagent.packages.recommendations.evaluation import ABTestMetrics
import logging
from datetime import datetime, timedelta


class AdvancedABTest:
    def __init__(self, test_name, variants, algorithm="thompson_sampling",
                 min_samples=100, max_duration_days=30):
        self.test_name = test_name
        self.variants = variants
        self.algorithm = algorithm
        self.min_samples = min_samples
        self.max_duration_days = max_duration_days

        # Initialize bandit
        if algorithm == "thompson_sampling":
            self.bandit = ThompsonSamplingBandit(n_arms=len(variants))
        else:
            raise ValueError(f"Unsupported algorithm: {algorithm}")

        # Initialize metrics
        self.metrics = ABTestMetrics()

        # Test metadata
        self.start_time = datetime.now()
        self.is_active = True
        self.logger = logging.getLogger(__name__)

    async def select_variant(self, user_id, context=None):
        """Select variant for user with context."""
        if not self.is_active:
            return self._get_winner(), None

        # Check if test should be stopped
        if await self._should_stop_test():
            await self._stop_test()
            return self._get_winner(), None

        # Select variant
        arm_index = self.bandit.select_arm(context=context)
        variant = self.variants[arm_index]

        # Track selection
        await self.metrics.record_variant_selection(
            test_name=self.test_name,
            user_id=user_id,
            variant=variant,
            arm_index=arm_index,
            context=context
        )

        return variant, arm_index

    async def update_feedback(self, user_id, arm_index, reward, context=None):
        """Update bandit with user feedback."""
        if not self.is_active:
            return

        # Update bandit
        self.bandit.update(arm_index, reward, context=context)

        # Track feedback
        await self.metrics.record_feedback(
            test_name=self.test_name,
            user_id=user_id,
            arm_index=arm_index,
            reward=reward,
            context=context
        )

        # Check for statistical significance
        if await self._check_significance():
            await self._stop_test()

    async def _should_stop_test(self):
        """Check if the test should be stopped."""
        # Check duration
        if datetime.now() - self.start_time > timedelta(days=self.max_duration_days):
            return True

        # Check minimum samples
        arm_counts = self.bandit.get_arm_counts()
        if all(count >= self.min_samples for count in arm_counts):
            return True

        return False

    async def _check_significance(self):
        """Check if results are statistically significant."""
        # In a real implementation, use proper statistical tests.
        # For now, use a simple heuristic: treat the test as conclusive
        # once every arm has reached the minimum sample size.
        arm_counts = self.bandit.get_arm_counts()
        if all(count >= self.min_samples for count in arm_counts):
            return True
        return False

    async def _stop_test(self):
        """Stop the A/B test and determine the winner."""
        self.is_active = False
        winner = self._get_winner()

        self.logger.info(f"A/B test '{self.test_name}' stopped. Winner: {winner}")

        # Generate final report
        await self._generate_final_report()

    def _get_winner(self):
        """Get the variant with the highest average reward."""
        arm_counts = self.bandit.get_arm_counts()
        arm_rewards = self.bandit.get_arm_rewards()
        avg_rewards = [
            arm_rewards[i] / max(1, arm_counts[i]) for i in range(len(self.variants))
        ]
        winner_index = max(range(len(avg_rewards)), key=lambda i: avg_rewards[i])
        return self.variants[winner_index]

    def _arm_stats(self):
        """Per-variant counts and rewards."""
        arm_counts = self.bandit.get_arm_counts()
        arm_rewards = self.bandit.get_arm_rewards()
        return {
            variant: {
                "count": arm_counts[i],
                "total_reward": arm_rewards[i],
                "average_reward": arm_rewards[i] / max(1, arm_counts[i]),
            }
            for i, variant in enumerate(self.variants)
        }

    async def _generate_final_report(self):
        """Generate the final A/B test report."""
        report = {
            "test_name": self.test_name,
            "start_time": self.start_time.isoformat(),
            "end_time": datetime.now().isoformat(),
            "duration_days": (datetime.now() - self.start_time).days,
            "winner": self._get_winner(),
            "arm_stats": self._arm_stats(),
        }

        await self.metrics.save_final_report(report)
        return report

    async def get_test_status(self):
        """Get current test status."""
        return {
            "test_name": self.test_name,
            "is_active": self.is_active,
            "start_time": self.start_time.isoformat(),
            "duration_days": (datetime.now() - self.start_time).days,
            "arm_stats": self._arm_stats(),
        }
```
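A short sketch of driving `AdvancedABTest` from an async entry point; the test configuration, user ID, and reward value are illustrative:

```python
import asyncio

async def main():
    ab_test = AdvancedABTest(
        test_name="homepage_layout",
        variants=["layout_a", "layout_b", "layout_c"],
        min_samples=500,
        max_duration_days=14,
    )

    # Serve one request: pick a variant, then report the observed reward.
    variant, arm_index = await ab_test.select_variant(user_id="user_123")
    if arm_index is not None:  # None means the test has already concluded
        await ab_test.update_feedback(user_id="user_123", arm_index=arm_index, reward=1.0)

    print(await ab_test.get_test_status())

asyncio.run(main())
```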
Step 5: Integration with Recommendation Systems
A/B Testing for Recommendation Algorithms
```python
# recommendation_ab_test.py
from advanced_ab_test import AdvancedABTest
from recoagent.packages.recommendations.agents import RecommendationAgent


class RecommendationABTest:
    def __init__(self):
        self.ab_test = AdvancedABTest(
            test_name="recommendation_algorithm",
            variants=["collaborative_filtering", "content_based", "hybrid"],
            algorithm="thompson_sampling",
            min_samples=1000,
            max_duration_days=14
        )
        self.recommendation_agent = RecommendationAgent()

    async def get_recommendations(self, user_id, n_recommendations=10, context=None):
        """Get recommendations using the A/B tested algorithm."""
        # Select algorithm variant
        algorithm, arm_index = await self.ab_test.select_variant(
            user_id=user_id,
            context=context
        )

        # Get recommendations using the selected algorithm
        recommendations = await self.recommendation_agent.get_recommendations(
            user_id=user_id,
            n_recommendations=n_recommendations,
            algorithm=algorithm,
            context=context
        )

        # Store the algorithm used for feedback attribution
        recommendations["_algorithm_used"] = algorithm
        recommendations["_arm_index"] = arm_index

        return recommendations

    async def record_feedback(self, user_id, item_id, interaction_type, context=None):
        """Record user feedback and update the A/B test."""
        # Map interaction types to rewards
        reward_mapping = {
            "view": 0.1,
            "click": 0.3,
            "add_to_cart": 0.7,
            "purchase": 1.0,
            "dismiss": -0.2
        }
        reward = reward_mapping.get(interaction_type, 0.1)

        # In a real implementation, you'd need to track which algorithm was used
        # for this user's recommendations. This is a simplified version.
        arm_index = 0  # This should be retrieved from the user's recommendation history

        await self.ab_test.update_feedback(
            user_id=user_id,
            arm_index=arm_index,
            reward=reward,
            context=context
        )

    async def get_test_status(self):
        """Get A/B test status."""
        return await self.ab_test.get_test_status()
```
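The `arm_index = 0` placeholder above is the piece to replace in production: feedback must be attributed to the variant the user actually saw. One way to close that loop is to persist the assignment when recommendations are served and look it up when feedback arrives. The in-memory store below is an illustrative sketch only; a real system would use Redis or a database keyed by user or request ID:

```python
class AssignmentStore:
    """Illustrative in-memory store mapping user_id -> (algorithm, arm_index).
    Swap for Redis or a database in production so assignments survive restarts."""

    def __init__(self):
        self._assignments = {}

    def record(self, user_id, algorithm, arm_index):
        self._assignments[user_id] = (algorithm, arm_index)

    def lookup(self, user_id):
        # Returns (algorithm, arm_index), or None if this user was never served
        return self._assignments.get(user_id)


# In get_recommendations(): store.record(user_id, algorithm, arm_index)
# In record_feedback():     assignment = store.lookup(user_id)
#                           arm_index = assignment[1] if assignment else 0
```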
Step 6: FastAPI Integration
API Endpoints for A/B Testing
```python
# ab_test_api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict, Any, Optional, List

from recommendation_ab_test import RecommendationABTest

app = FastAPI(title="A/B Testing API")

# Initialize A/B test
recommendation_ab_test = RecommendationABTest()


# Request/Response Models
class RecommendationRequest(BaseModel):
    user_id: str
    n_recommendations: int = 10
    context: Optional[Dict[str, Any]] = None


class FeedbackRequest(BaseModel):
    user_id: str
    item_id: str
    interaction_type: str
    context: Optional[Dict[str, Any]] = None


class ABTestRequest(BaseModel):
    test_name: str
    variants: List[str]
    algorithm: str = "thompson_sampling"
    min_samples: int = 100
    max_duration_days: int = 30


# API Endpoints
@app.post("/recommendations")
async def get_recommendations(request: RecommendationRequest):
    """Get recommendations using the A/B tested algorithm."""
    try:
        recommendations = await recommendation_ab_test.get_recommendations(
            user_id=request.user_id,
            n_recommendations=request.n_recommendations,
            context=request.context
        )
        return {
            "success": True,
            "recommendations": recommendations["recommendations"],
            "algorithm_used": recommendations["_algorithm_used"],
            "ab_test_active": True
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/feedback")
async def record_feedback(request: FeedbackRequest):
    """Record user feedback for the A/B test."""
    try:
        await recommendation_ab_test.record_feedback(
            user_id=request.user_id,
            item_id=request.item_id,
            interaction_type=request.interaction_type,
            context=request.context
        )
        return {"success": True, "message": "Feedback recorded"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/ab-test/status")
async def get_ab_test_status():
    """Get A/B test status."""
    try:
        status = await recommendation_ab_test.get_test_status()
        return status
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/ab-test/create")
async def create_ab_test(request: ABTestRequest):
    """Create a new A/B test."""
    try:
        # In a real implementation, you'd create and register a new A/B test here.
        # For now, we just echo the configuration back.
        return {
            "success": True,
            "message": f"A/B test '{request.test_name}' created",
            "test_config": request.dict()
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
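With the server running locally, the endpoints can be exercised with any HTTP client. A minimal `requests` script (all field values are examples):

```python
# api_client_example.py
import requests

BASE_URL = "http://localhost:8000"

# Request recommendations; the server picks the algorithm via the bandit.
resp = requests.post(f"{BASE_URL}/recommendations", json={
    "user_id": "user_123",
    "n_recommendations": 5,
    "context": {"device": "mobile", "user_segment": "premium"},
})
print(resp.json())

# Report what the user did with the recommendations.
requests.post(f"{BASE_URL}/feedback", json={
    "user_id": "user_123",
    "item_id": "item_42",
    "interaction_type": "click",
})

# Check how the test is doing.
print(requests.get(f"{BASE_URL}/ab-test/status").json())
```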
Step 7: Monitoring and Analytics
A/B Test Analytics
```python
# ab_test_analytics.py
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

from recoagent.packages.recommendations.evaluation import ABTestMetrics


class ABTestAnalytics:
    def __init__(self):
        self.metrics = ABTestMetrics()

    async def generate_report(self, test_name, start_date=None, end_date=None):
        """Generate a comprehensive A/B test report."""
        if not start_date:
            start_date = datetime.now() - timedelta(days=30)
        if not end_date:
            end_date = datetime.now()

        # Get test data
        test_data = await self.metrics.get_test_data(
            test_name=test_name,
            start_date=start_date,
            end_date=end_date
        )

        # Calculate metrics
        report = {
            "test_name": test_name,
            "period": f"{start_date.date()} to {end_date.date()}",
            "total_users": len(test_data),
            "conversion_rates": self._calculate_conversion_rates(test_data),
            "statistical_significance": self._calculate_significance(test_data),
            "confidence_intervals": self._calculate_confidence_intervals(test_data),
            "recommendation": self._generate_recommendation(test_data)
        }

        return report

    def _calculate_conversion_rates(self, test_data):
        """Calculate conversion rates for each variant."""
        conversion_rates = {}

        for variant in test_data["variant"].unique():
            variant_data = test_data[test_data["variant"] == variant]
            conversions = variant_data[variant_data["reward"] > 0.5].shape[0]
            total = variant_data.shape[0]

            conversion_rates[variant] = {
                "rate": conversions / total if total > 0 else 0,
                "conversions": conversions,
                "total": total
            }

        return conversion_rates

    def _calculate_significance(self, test_data):
        """Calculate statistical significance."""
        # In a real implementation, use proper statistical tests
        # (a scipy-based sketch follows this example). For now, return a placeholder.
        return {
            "p_value": 0.05,
            "significant": True,
            "confidence_level": 0.95
        }

    def _calculate_confidence_intervals(self, test_data):
        """Calculate confidence intervals for conversion rates."""
        # In a real implementation, calculate proper confidence intervals
        return {
            "variant_a": {"lower": 0.15, "upper": 0.25},
            "variant_b": {"lower": 0.18, "upper": 0.28},
            "variant_c": {"lower": 0.12, "upper": 0.22}
        }

    def _generate_recommendation(self, test_data):
        """Generate a recommendation based on test results."""
        conversion_rates = self._calculate_conversion_rates(test_data)

        # Find the best performing variant
        best_variant = max(conversion_rates.keys(),
                           key=lambda v: conversion_rates[v]["rate"])

        return {
            "winner": best_variant,
            "improvement": f"{conversion_rates[best_variant]['rate']:.1%} conversion rate",
            "action": ("Implement winning variant"
                       if self._calculate_significance(test_data)["significant"]
                       else "Continue testing")
        }

    async def plot_test_results(self, test_name, save_path=None):
        """Plot A/B test results."""
        test_data = await self.metrics.get_test_data(test_name=test_name)

        # Create plots
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # Conversion rates over time
        daily_conversions = test_data.groupby(['date', 'variant'])['reward'].mean().unstack()
        daily_conversions.plot(ax=axes[0, 0], title='Conversion Rates Over Time')

        # Sample sizes
        daily_samples = test_data.groupby(['date', 'variant']).size().unstack()
        daily_samples.plot(ax=axes[0, 1], title='Sample Sizes Over Time')

        # Cumulative rewards per variant
        for variant, group in test_data.sort_values('date').groupby('variant'):
            group['reward'].cumsum().reset_index(drop=True).plot(
                ax=axes[1, 0], label=variant)
        axes[1, 0].set_title('Cumulative Rewards')
        axes[1, 0].legend()

        # Final conversion rates
        final_rates = self._calculate_conversion_rates(test_data)
        variants = list(final_rates.keys())
        rates = [final_rates[v]['rate'] for v in variants]
        axes[1, 1].bar(variants, rates)
        axes[1, 1].set_title('Final Conversion Rates')
        axes[1, 1].set_ylabel('Conversion Rate')

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path)
        else:
            plt.show()
```
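The `_calculate_significance` and `_calculate_confidence_intervals` methods above are placeholders. One way to back them with real statistics is a two-proportion z-test plus a normal-approximation confidence interval; the sketch below uses `scipy`, and the counts and thresholds are illustrative:

```python
import math
from scipy import stats

def two_proportion_z_test(conv_a, n_a, conv_b, n_b, alpha=0.05):
    """Two-sided z-test for the difference between two conversion rates."""
    p_a, p_b = conv_a / n_a, conv_b / n_b
    p_pool = (conv_a + conv_b) / (n_a + n_b)
    se = math.sqrt(p_pool * (1 - p_pool) * (1 / n_a + 1 / n_b))
    if se == 0:
        return {"p_value": 1.0, "significant": False}
    z = (p_a - p_b) / se
    p_value = 2 * (1 - stats.norm.cdf(abs(z)))
    return {"p_value": p_value, "significant": p_value < alpha, "z": z}

def proportion_confidence_interval(conversions, total, confidence=0.95):
    """Normal-approximation (Wald) confidence interval for a conversion rate."""
    if total == 0:
        return {"lower": 0.0, "upper": 0.0}
    p = conversions / total
    z = stats.norm.ppf(1 - (1 - confidence) / 2)
    margin = z * math.sqrt(p * (1 - p) / total)
    return {"lower": max(0.0, p - margin), "upper": min(1.0, p + margin)}

# Example: variant A converted 120/1000 users, variant B converted 150/1000
print(two_proportion_z_test(120, 1000, 150, 1000))
print(proportion_confidence_interval(150, 1000))
```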
Best Practices
- Start Simple - Begin with epsilon-greedy, move to more sophisticated algorithms
- Set Clear Goals - Define success metrics before starting the test
- Monitor Continuously - Track performance and stop tests when appropriate
- Use Context - Leverage user context for better decisions
- Handle Non-stationarity - Adapt to changing user behavior
- Statistical Rigor - Use proper statistical tests for significance
- Document Everything - Keep detailed records of test configurations and results
Common Pitfalls
- Peeking - Avoid making ship/stop decisions from frequent interim looks; repeated uncorrected checks inflate false-positive rates
- Insufficient Sample Size - Ensure each variant reaches an adequate sample size before drawing conclusions
- Multiple Testing - With more than two variants or several concurrent tests, account for multiple comparisons (a correction sketch follows this list)
- Seasonality - Account for day-of-week and seasonal effects when scheduling and interpreting tests
- User Segments - An overall winner can hide losses in specific segments; examine results per segment
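When a test compares more than two variants, or several tests run at once, per-comparison p-values should be corrected before declaring a winner. A Bonferroni adjustment is the simplest option (Holm or Benjamini-Hochberg are less conservative); a minimal sketch with illustrative p-values:

```python
def bonferroni_correct(p_values, alpha=0.05):
    """Return (adjusted alpha, which comparisons remain significant)."""
    m = len(p_values)
    adjusted_alpha = alpha / m
    return adjusted_alpha, [p < adjusted_alpha for p in p_values]

# Example: pairwise p-values for three variants (A vs B, A vs C, B vs C)
p_values = [0.03, 0.20, 0.04]
alpha, significant = bonferroni_correct(p_values)
print(alpha, significant)  # ~0.0167, [False, False, False]
```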
Examples
See the examples section for complete working examples of A/B testing implementations.