Rate Limiting Core
A comprehensive rate limiting service for enterprise RAG systems, combining token-bucket rate limiting, user tier management, per-provider limits, cost-based throttling, and intelligent request queuing.
Core Classes
RateLimitingService
Description: Main rate limiting service integrating all rate limiting components
Parameters:
- redis_client (redis.Redis): Redis client for distributed rate limiting
- tier_manager (UserTierManager, optional): User tier management
- pricing_manager (ProviderPricingManager, optional): Provider pricing management
- budget_config (BudgetConfig, optional): Budget configuration
- queue_config (QueueConfig, optional): Queue configuration
Returns: RateLimitingService instance
Example:
from recoagent.rate_limiting import RateLimitingService, UserTierManager, ProviderPricingManager
import redis.asyncio as redis
# Create Redis client
redis_client = redis.Redis(host="localhost", port=6379, db=0)
# Create tier manager
tier_manager = UserTierManager(
tiers={
"free": {"requests_per_minute": 10, "requests_per_hour": 100},
"premium": {"requests_per_minute": 100, "requests_per_hour": 1000},
"enterprise": {"requests_per_minute": 1000, "requests_per_hour": 10000}
}
)
# Create pricing manager
pricing_manager = ProviderPricingManager(
providers={
"openai": {"cost_per_token": 0.0001, "rate_limit": 1000},
"anthropic": {"cost_per_token": 0.0002, "rate_limit": 500}
}
)
# Create rate limiting service
rate_limiter = RateLimitingService(
redis_client=redis_client,
tier_manager=tier_manager,
pricing_manager=pricing_manager
)
# Check rate limit
result = await rate_limiter.check_rate_limit(
user_id="user_123",
tier="premium",
provider="openai",
request_cost=0.01
)
MultiBucketRateLimiter
Description: Token bucket rate limiter with multiple bucket types
Parameters:
- bucket_configs (Dict[BucketType, BucketConfig]): Bucket configurations
- redis_client (redis.Redis): Redis client for distributed buckets
- enable_burst (bool): Enable burst capacity (default: True)
Returns: MultiBucketRateLimiter instance
Example:
from recoagent.rate_limiting import MultiBucketRateLimiter, BucketConfig, BucketType
# Create bucket configurations
bucket_configs = {
BucketType.REQUESTS_PER_MINUTE: BucketConfig(
capacity=100,
refill_rate=100,
refill_period=60
),
BucketType.REQUESTS_PER_HOUR: BucketConfig(
capacity=1000,
refill_rate=1000,
refill_period=3600
),
BucketType.TOKENS_PER_MINUTE: BucketConfig(
capacity=10000,
refill_rate=10000,
refill_period=60
)
}
# Create rate limiter
rate_limiter = MultiBucketRateLimiter(
bucket_configs=bucket_configs,
redis_client=redis_client,
enable_burst=True
)
# Check rate limit
allowed = await rate_limiter.consume_tokens(
user_id="user_123",
tokens_requested=100,
bucket_type=BucketType.TOKENS_PER_MINUTE
)
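The token bucket algorithm behind these buckets can be sketched in a few lines: a bucket holds up to `capacity` tokens, refills continuously at `refill_rate` tokens per `refill_period` seconds, and a request succeeds only if enough tokens remain. The following is a minimal single-process sketch of that idea; the `TokenBucket` class and its names are illustrative, not the library's internals, and the real `MultiBucketRateLimiter` coordinates its buckets through Redis.

```python
import time

class TokenBucket:
    """Minimal in-memory token bucket (illustrative sketch)."""

    def __init__(self, capacity: float, refill_rate: float, refill_period: float):
        self.capacity = capacity
        self.refill_per_second = refill_rate / refill_period
        self.tokens = float(capacity)        # buckets start full
        self.last_refill = time.monotonic()

    def _refill(self) -> None:
        # Add tokens proportional to elapsed time, capped at capacity.
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_second)
        self.last_refill = now

    def consume(self, tokens_requested: float) -> bool:
        """Debit tokens if available; otherwise take nothing and refuse."""
        self._refill()
        if tokens_requested <= self.tokens:
            self.tokens -= tokens_requested
            return True
        return False

bucket = TokenBucket(capacity=100, refill_rate=100, refill_period=60)
print(bucket.consume(100))  # True: the bucket starts full
print(bucket.consume(1))    # False: empty until it refills
```

Because refill is computed lazily from elapsed time, no background timer is needed; a burst capacity (as in `enable_burst`) would simply raise the cap above the steady-state capacity.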
UserTierManager
Description: Manages user tiers and their rate limiting configurations
Parameters:
- tiers (Dict[str, Dict]): Tier configurations
- default_tier (str): Default tier for new users (default: "free")
- enable_tier_upgrades (bool): Enable automatic tier upgrades (default: True)
Returns: UserTierManager instance
Example:
from recoagent.rate_limiting import UserTierManager, UserTier
# Create tier manager
tier_manager = UserTierManager(
tiers={
"free": {
"requests_per_minute": 10,
"requests_per_hour": 100,
"cost_limit_per_day": 1.0,
"features": ["basic_search", "limited_queries"]
},
"premium": {
"requests_per_minute": 100,
"requests_per_hour": 1000,
"cost_limit_per_day": 10.0,
"features": ["advanced_search", "unlimited_queries", "priority_support"]
},
"enterprise": {
"requests_per_minute": 1000,
"requests_per_hour": 10000,
"cost_limit_per_day": 100.0,
"features": ["all_features", "custom_models", "dedicated_support"]
}
},
default_tier="free",
enable_tier_upgrades=True
)
# Get user tier
user_tier = tier_manager.get_user_tier("user_123")
print(f"User tier: {user_tier.name}")
print(f"Rate limits: {user_tier.rate_limits}")
CostBasedThrottler
Description: Cost-based throttling for budget management
Parameters:
- budget_config (BudgetConfig): Budget configuration
- pricing_manager (ProviderPricingManager): Provider pricing manager
- enable_dynamic_throttling (bool): Enable dynamic throttling (default: True)
Returns: CostBasedThrottler instance
Example:
from recoagent.rate_limiting import CostBasedThrottler, BudgetConfig
# Create budget configuration
budget_config = BudgetConfig(
daily_budget=50.0,
monthly_budget=1000.0,
cost_thresholds={
"warning": 0.8, # 80% of budget
"throttle": 0.9, # 90% of budget
"block": 0.95 # 95% of budget
}
)
# Create cost throttler
cost_throttler = CostBasedThrottler(
budget_config=budget_config,
pricing_manager=pricing_manager,
enable_dynamic_throttling=True
)
# Check cost limit
throttle_result = await cost_throttler.check_cost_limit(
user_id="user_123",
request_cost=0.05,
provider="openai"
)
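The threshold ladder in `BudgetConfig` reduces to comparing the spend ratio (current spend plus the new request, divided by the budget) against each threshold in descending order. A sketch of that decision, under the assumption that `check_cost_limit` works roughly this way (the `cost_action` helper is illustrative, not the library's API):

```python
def cost_action(spent_today: float, request_cost: float, daily_budget: float,
                thresholds: dict) -> str:
    """Map the projected spend ratio to an action (illustrative sketch)."""
    ratio = (spent_today + request_cost) / daily_budget
    if ratio >= thresholds["block"]:      # check the highest rung first
        return "block"
    if ratio >= thresholds["throttle"]:
        return "throttle"
    if ratio >= thresholds["warning"]:
        return "warn"
    return "allow"

thresholds = {"warning": 0.8, "throttle": 0.9, "block": 0.95}
print(cost_action(30.0, 0.05, 50.0, thresholds))  # "allow" (~60% of budget)
print(cost_action(44.0, 1.50, 50.0, thresholds))  # "throttle" (91%)
print(cost_action(47.5, 0.50, 50.0, thresholds))  # "block" (96%)
```

Checking the projected spend (including the incoming request) rather than the current spend prevents a single large request from blowing past the block threshold.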
Usage Examples
Basic Rate Limiting Setup
from recoagent.rate_limiting import RateLimitingService, UserTierManager
import redis.asyncio as redis
# Create Redis client
redis_client = redis.Redis(host="localhost", port=6379, db=0)
# Create tier manager
tier_manager = UserTierManager(
tiers={
"free": {"requests_per_minute": 10, "requests_per_hour": 100},
"premium": {"requests_per_minute": 100, "requests_per_hour": 1000},
"enterprise": {"requests_per_minute": 1000, "requests_per_hour": 10000}
}
)
# Create rate limiting service
rate_limiter = RateLimitingService(
redis_client=redis_client,
tier_manager=tier_manager
)
# Check rate limit for user
async def check_user_rate_limit(user_id: str, tier: str):
"""Check rate limit for user request."""
result = await rate_limiter.check_rate_limit(
user_id=user_id,
tier=tier,
provider="openai",
request_cost=0.01
)
if result.allowed:
print(f"✅ Request allowed for user {user_id}")
return True
else:
print(f"❌ Request blocked for user {user_id}: {result.reason}")
if result.wait_time > 0:
print(f"⏳ Wait time: {result.wait_time:.2f} seconds")
return False
# Test rate limiting
await check_user_rate_limit("user_123", "free")
await check_user_rate_limit("user_456", "premium")
Advanced Multi-Bucket Rate Limiting
from typing import Dict
from recoagent.rate_limiting import MultiBucketRateLimiter, BucketConfig, BucketType
# Create comprehensive bucket configurations
bucket_configs = {
BucketType.REQUESTS_PER_MINUTE: BucketConfig(
capacity=100,
refill_rate=100,
refill_period=60,
burst_capacity=150
),
BucketType.REQUESTS_PER_HOUR: BucketConfig(
capacity=1000,
refill_rate=1000,
refill_period=3600,
burst_capacity=1200
),
BucketType.TOKENS_PER_MINUTE: BucketConfig(
capacity=10000,
refill_rate=10000,
refill_period=60,
burst_capacity=15000
),
BucketType.COST_PER_HOUR: BucketConfig(
capacity=10.0, # $10 per hour
refill_rate=10.0,
refill_period=3600,
burst_capacity=15.0
)
}
# Create multi-bucket rate limiter
multi_limiter = MultiBucketRateLimiter(
bucket_configs=bucket_configs,
redis_client=redis_client,
enable_burst=True
)
# Check multiple rate limits
async def check_comprehensive_rate_limit(user_id: str, request_data: Dict):
"""Check comprehensive rate limits."""
checks = [
{
"bucket_type": BucketType.REQUESTS_PER_MINUTE,
"tokens": 1,
"description": "Request count per minute"
},
{
"bucket_type": BucketType.TOKENS_PER_MINUTE,
"tokens": request_data.get("token_count", 100),
"description": "Token usage per minute"
},
{
"bucket_type": BucketType.COST_PER_HOUR,
"tokens": request_data.get("cost", 0.01),
"description": "Cost per hour"
}
]
results = []
for check in checks:
allowed = await multi_limiter.consume_tokens(
user_id=user_id,
tokens_requested=check["tokens"],
bucket_type=check["bucket_type"]
)
results.append({
"check": check["description"],
"allowed": allowed,
"bucket_type": check["bucket_type"]
})
# All checks must pass. Note that consume_tokens debits each bucket as it
# goes, so a failure on a later check leaves earlier buckets already charged;
# production code should check and consume atomically.
all_allowed = all(result["allowed"] for result in results)
if all_allowed:
print("✅ All rate limits passed")
else:
print("❌ Some rate limits failed:")
for result in results:
if not result["allowed"]:
print(f" - {result['check']} failed")
return all_allowed
# Test comprehensive rate limiting
request_data = {
"token_count": 500,
"cost": 0.05
}
await check_comprehensive_rate_limit("user_123", request_data)
Cost-Based Throttling with Budget Management
from recoagent.rate_limiting import CostBasedThrottler, BudgetConfig, ProviderPricingManager
# Create provider pricing manager
pricing_manager = ProviderPricingManager(
providers={
"openai": {
"cost_per_token": 0.0001,
"rate_limit": 1000,
"models": {
"gpt-4": {"cost_per_token": 0.0003},
"gpt-3.5-turbo": {"cost_per_token": 0.0001}
}
},
"anthropic": {
"cost_per_token": 0.0002,
"rate_limit": 500,
"models": {
"claude-3": {"cost_per_token": 0.0004},
"claude-2": {"cost_per_token": 0.0002}
}
}
}
)
# Create budget configuration
budget_config = BudgetConfig(
daily_budget=50.0,
monthly_budget=1000.0,
cost_thresholds={
"warning": 0.7, # 70% of budget
"throttle": 0.85, # 85% of budget
"block": 0.95 # 95% of budget
},
enable_dynamic_throttling=True
)
# Create cost throttler
cost_throttler = CostBasedThrottler(
budget_config=budget_config,
pricing_manager=pricing_manager,
enable_dynamic_throttling=True
)
# Check cost-based throttling
async def check_cost_throttling(user_id: str, provider: str, model: str, token_count: int):
"""Check cost-based throttling."""
# Calculate request cost
model_config = pricing_manager.get_model_config(provider, model)
request_cost = token_count * model_config["cost_per_token"]
# Check cost limit
throttle_result = await cost_throttler.check_cost_limit(
user_id=user_id,
request_cost=request_cost,
provider=provider
)
print(f"Request cost: ${request_cost:.4f}")
print(f"Throttle action: {throttle_result.action}")
if throttle_result.action == "allow":
print("✅ Request allowed")
elif throttle_result.action == "throttle":
print("⚠️ Request throttled")
print(f"Suggested wait time: {throttle_result.wait_time:.2f} seconds")
elif throttle_result.action == "block":
print("❌ Request blocked - budget exceeded")
elif throttle_result.action == "fallback":
print(f"🔄 Using fallback model: {throttle_result.fallback_model}")
return throttle_result
# Test cost throttling
await check_cost_throttling("user_123", "openai", "gpt-4", 1000)
await check_cost_throttling("user_123", "anthropic", "claude-3", 500)
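The cost arithmetic in `check_cost_throttling` is simply token count times the per-token price, with a per-model override falling back to the provider-level rate. A self-contained sketch of that lookup, using the pricing table from the example above (the `request_cost` helper is illustrative, not the library's API):

```python
providers = {
    "openai": {
        "cost_per_token": 0.0001,  # provider-level default
        "models": {"gpt-4": {"cost_per_token": 0.0003}},
    },
}

def request_cost(provider: str, model: str, token_count: int) -> float:
    """tokens x per-token price, preferring a per-model override."""
    cfg = providers[provider]
    per_token = cfg["models"].get(model, {}).get(
        "cost_per_token", cfg["cost_per_token"])
    return token_count * per_token

print(f"${request_cost('openai', 'gpt-4', 1000):.4f}")          # $0.3000
print(f"${request_cost('openai', 'gpt-3.5-turbo', 1000):.4f}")  # $0.1000 (provider default)
```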
Intelligent Queue Management
from typing import Dict
from recoagent.rate_limiting import IntelligentQueueManager, QueueConfig, QueuePriority
# Create queue configuration
queue_config = QueueConfig(
max_queue_size=1000,
priority_levels={
QueuePriority.HIGH: {"weight": 3, "max_wait_time": 30},
QueuePriority.MEDIUM: {"weight": 2, "max_wait_time": 60},
QueuePriority.LOW: {"weight": 1, "max_wait_time": 120}
},
enable_priority_boost=True,
enable_auto_scaling=True
)
# Create queue manager
queue_manager = IntelligentQueueManager(
queue_config=queue_config,
redis_client=redis_client
)
# Queue management
async def manage_request_queue(user_id: str, request_data: Dict, priority: QueuePriority):
"""Manage request queue with priority."""
# Add request to queue
queue_position = await queue_manager.add_request(
user_id=user_id,
request_data=request_data,
priority=priority
)
print(f"Request queued at position: {queue_position}")
# Wait for processing
result = await queue_manager.wait_for_processing(
user_id=user_id,
request_id=request_data.get("request_id"),
timeout=120 # 2 minutes
)
if result.processed:
print("✅ Request processed successfully")
return result.response
else:
print(f"❌ Request failed: {result.error}")
return None
# Test queue management
request_data = {
"request_id": "req_123",
"query": "What is machine learning?",
"model": "gpt-4"
}
# High priority request
result = await manage_request_queue("user_123", request_data, QueuePriority.HIGH)
# Medium priority request
result = await manage_request_queue("user_456", request_data, QueuePriority.MEDIUM)
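Priority dequeuing of the kind configured above can be sketched with a heap: higher-weight priorities map to smaller heap keys so they pop first, and a monotonic counter breaks ties to preserve arrival order within a priority. This is a single-process illustration only; the actual `IntelligentQueueManager` is Redis-backed, and the `enqueue`/`dequeue` helpers are not part of the library.

```python
import heapq
from itertools import count

WEIGHTS = {"high": 3, "medium": 2, "low": 1}  # mirrors the weights above
_seq = count()
queue: list = []

def enqueue(request_id: str, priority: str) -> None:
    # Negate the weight so higher priority yields a smaller heap key;
    # the sequence number keeps FIFO order among equal priorities.
    heapq.heappush(queue, (-WEIGHTS[priority], next(_seq), request_id))

def dequeue() -> str:
    return heapq.heappop(queue)[2]

enqueue("req_low", "low")
enqueue("req_high", "high")
enqueue("req_med", "medium")
print(dequeue())  # req_high
print(dequeue())  # req_med
print(dequeue())  # req_low
```

The `max_wait_time` and priority-boost settings would layer on top of this core ordering, e.g. by promoting entries whose wait exceeds their limit.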
Real-time Rate Limit Monitoring
from recoagent.rate_limiting import RateLimitingService
import asyncio
# Create rate limiting service with monitoring
rate_limiter = RateLimitingService(
redis_client=redis_client,
tier_manager=tier_manager,
enable_monitoring=True
)
async def monitor_rate_limits():
"""Monitor rate limits in real-time."""
while True:
# Get rate limit statistics
stats = await rate_limiter.get_rate_limit_statistics()
print("=== Rate Limit Statistics ===")
print(f"Total requests: {stats['total_requests']}")
print(f"Blocked requests: {stats['blocked_requests']}")
print(f"Block rate: {stats['block_rate']:.2%}")
# Get tier statistics
tier_stats = stats['tier_statistics']
for tier, tier_data in tier_stats.items():
print(f"\n{tier.upper()} Tier:")
print(f" Active users: {tier_data['active_users']}")
print(f" Requests per minute: {tier_data['requests_per_minute']}")
print(f" Average wait time: {tier_data['avg_wait_time']:.2f}s")
# Get provider statistics
provider_stats = stats['provider_statistics']
for provider, provider_data in provider_stats.items():
print(f"\n{provider.upper()} Provider:")
print(f" Requests: {provider_data['requests']}")
print(f" Cost: ${provider_data['cost']:.2f}")
print(f" Rate limit utilization: {provider_data['rate_limit_utilization']:.2%}")
# Check for alerts
alerts = await rate_limiter.get_alerts()
if alerts:
print("\n🚨 ALERTS:")
for alert in alerts:
print(f" - {alert['type']}: {alert['message']}")
await asyncio.sleep(60) # Monitor every minute
# Run monitoring
asyncio.run(monitor_rate_limits())
API Reference
RateLimitingService Methods
check_rate_limit(user_id: str, tier: str, provider: str, request_cost: float) -> RateLimitResult
Check rate limit for user request
Parameters:
- user_id (str): User identifier
- tier (str): User tier
- provider (str): Provider name
- request_cost (float): Request cost
Returns: RateLimitResult with decision and metadata
get_rate_limit_statistics() -> Dict
Get comprehensive rate limit statistics
Returns: Dictionary with statistics
get_alerts() -> List[Dict]
Get current rate limit alerts
Returns: List of alert dictionaries
MultiBucketRateLimiter Methods
consume_tokens(user_id: str, tokens_requested: int, bucket_type: BucketType) -> bool
Consume tokens from bucket
Parameters:
- user_id (str): User identifier
- tokens_requested (int): Number of tokens requested
- bucket_type (BucketType): Type of bucket
Returns: True if the requested tokens were available and consumed, False otherwise
get_bucket_status(user_id: str, bucket_type: BucketType) -> Dict
Get bucket status for user
Parameters:
- user_id (str): User identifier
- bucket_type (BucketType): Type of bucket
Returns: Dictionary with bucket status
UserTierManager Methods
get_user_tier(user_id: str) -> UserTier
Get user tier information
Parameters:
- user_id (str): User identifier
Returns: UserTier object
upgrade_user_tier(user_id: str, new_tier: str) -> bool
Upgrade user tier
Parameters:
- user_id (str): User identifier
- new_tier (str): New tier name
Returns: True if upgrade successful
CostBasedThrottler Methods
check_cost_limit(user_id: str, request_cost: float, provider: str) -> ThrottleResult
Check cost-based throttling
Parameters:
- user_id (str): User identifier
- request_cost (float): Request cost
- provider (str): Provider name
Returns: ThrottleResult with action and metadata
get_budget_status(user_id: str) -> Dict
Get budget status for user
Parameters:
- user_id (str): User identifier
Returns: Dictionary with budget status
See Also
- Rate Limiting Tiers - Tier management system
- Analytics Core - Rate limiting analytics
- LLM Providers - Provider rate limits
- Security Core - Rate limiting security