Rate Limiting Core

A comprehensive rate limiting service providing token bucket rate limiting, user tier management, provider rate limits, cost-based throttling, and intelligent queuing for enterprise RAG systems.
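At the core of the service is the token bucket algorithm: a bucket holds up to `capacity` tokens, regains tokens at a steady refill rate, and each request consumes tokens; a request that finds the bucket empty is rejected or delayed. A minimal illustrative sketch of that mechanism (this `TokenBucket` class is a stand-in for exposition, not the library's implementation, which is distributed via Redis):

```python
import time

class TokenBucket:
    """Minimal token bucket: up to `capacity` tokens, refilled at `refill_rate` tokens/second."""

    def __init__(self, capacity: float, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last_refill = time.monotonic()

    def consume(self, tokens: float = 1.0) -> bool:
        """Lazily refill based on elapsed time, then try to take `tokens`."""
        now = time.monotonic()
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last_refill) * self.refill_rate)
        self.last_refill = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

bucket = TokenBucket(capacity=5, refill_rate=0.1)  # 5 tokens, one regained every 10 s
results = [bucket.consume() for _ in range(6)]
print(results)  # [True, True, True, True, True, False]
```

The first five calls drain the bucket and the sixth is rejected until enough time has passed for a refill; the same idea generalizes to per-minute, per-hour, and cost-denominated buckets below.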

Core Classes

RateLimitingService

Description: Main rate limiting service integrating all rate limiting components

Parameters:

  • redis_client (redis.Redis): Redis client for distributed rate limiting
  • tier_manager (UserTierManager, optional): User tier management
  • pricing_manager (ProviderPricingManager, optional): Provider pricing management
  • budget_config (BudgetConfig, optional): Budget configuration
  • queue_config (QueueConfig, optional): Queue configuration

Returns: RateLimitingService instance

Example:

from recoagent.rate_limiting import RateLimitingService, UserTierManager, ProviderPricingManager
import redis.asyncio as redis

# Create Redis client
redis_client = redis.Redis(host="localhost", port=6379, db=0)

# Create tier manager
tier_manager = UserTierManager(
    tiers={
        "free": {"requests_per_minute": 10, "requests_per_hour": 100},
        "premium": {"requests_per_minute": 100, "requests_per_hour": 1000},
        "enterprise": {"requests_per_minute": 1000, "requests_per_hour": 10000}
    }
)

# Create pricing manager
pricing_manager = ProviderPricingManager(
    providers={
        "openai": {"cost_per_token": 0.0001, "rate_limit": 1000},
        "anthropic": {"cost_per_token": 0.0002, "rate_limit": 500}
    }
)

# Create rate limiting service
rate_limiter = RateLimitingService(
    redis_client=redis_client,
    tier_manager=tier_manager,
    pricing_manager=pricing_manager
)

# Check rate limit
result = await rate_limiter.check_rate_limit(
    user_id="user_123",
    tier="premium",
    provider="openai",
    request_cost=0.01
)

MultiBucketRateLimiter

Description: Token bucket rate limiter with multiple bucket types

Parameters:

  • bucket_configs (Dict[BucketType, BucketConfig]): Bucket configurations
  • redis_client (redis.Redis): Redis client for distributed buckets
  • enable_burst (bool): Enable burst capacity (default: True)

Returns: MultiBucketRateLimiter instance

Example:

from recoagent.rate_limiting import MultiBucketRateLimiter, BucketConfig, BucketType

# Create bucket configurations
bucket_configs = {
    BucketType.REQUESTS_PER_MINUTE: BucketConfig(
        capacity=100,
        refill_rate=100,
        refill_period=60
    ),
    BucketType.REQUESTS_PER_HOUR: BucketConfig(
        capacity=1000,
        refill_rate=1000,
        refill_period=3600
    ),
    BucketType.TOKENS_PER_MINUTE: BucketConfig(
        capacity=10000,
        refill_rate=10000,
        refill_period=60
    )
}

# Create rate limiter
rate_limiter = MultiBucketRateLimiter(
    bucket_configs=bucket_configs,
    redis_client=redis_client,
    enable_burst=True
)

# Check rate limit
allowed = await rate_limiter.consume_tokens(
    user_id="user_123",
    tokens_requested=100,
    bucket_type=BucketType.TOKENS_PER_MINUTE
)

UserTierManager

Description: Manages user tiers and their rate limiting configurations

Parameters:

  • tiers (Dict[str, Dict]): Tier configurations
  • default_tier (str): Default tier for new users (default: "free")
  • enable_tier_upgrades (bool): Enable automatic tier upgrades (default: True)

Returns: UserTierManager instance

Example:

from recoagent.rate_limiting import UserTierManager, UserTier

# Create tier manager
tier_manager = UserTierManager(
    tiers={
        "free": {
            "requests_per_minute": 10,
            "requests_per_hour": 100,
            "cost_limit_per_day": 1.0,
            "features": ["basic_search", "limited_queries"]
        },
        "premium": {
            "requests_per_minute": 100,
            "requests_per_hour": 1000,
            "cost_limit_per_day": 10.0,
            "features": ["advanced_search", "unlimited_queries", "priority_support"]
        },
        "enterprise": {
            "requests_per_minute": 1000,
            "requests_per_hour": 10000,
            "cost_limit_per_day": 100.0,
            "features": ["all_features", "custom_models", "dedicated_support"]
        }
    },
    default_tier="free",
    enable_tier_upgrades=True
)

# Get user tier
user_tier = tier_manager.get_user_tier("user_123")
print(f"User tier: {user_tier.name}")
print(f"Rate limits: {user_tier.rate_limits}")

CostBasedThrottler

Description: Cost-based throttling for budget management

Parameters:

  • budget_config (BudgetConfig): Budget configuration
  • pricing_manager (ProviderPricingManager): Provider pricing manager
  • enable_dynamic_throttling (bool): Enable dynamic throttling (default: True)

Returns: CostBasedThrottler instance

Example:

from recoagent.rate_limiting import CostBasedThrottler, BudgetConfig

# Create budget configuration
budget_config = BudgetConfig(
    daily_budget=50.0,
    monthly_budget=1000.0,
    cost_thresholds={
        "warning": 0.8,   # 80% of budget
        "throttle": 0.9,  # 90% of budget
        "block": 0.95     # 95% of budget
    }
)

# Create cost throttler
cost_throttler = CostBasedThrottler(
    budget_config=budget_config,
    pricing_manager=pricing_manager,
    enable_dynamic_throttling=True
)

# Check cost limit
throttle_result = await cost_throttler.check_cost_limit(
    user_id="user_123",
    request_cost=0.05,
    provider="openai"
)
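The threshold semantics above (warn at 80%, throttle at 90%, block at 95% of budget) reduce to comparing the spent fraction of the budget against each cutoff. A sketch of that decision logic in plain Python (the `budget_action` function and its return values are illustrative, not the library's API):

```python
def budget_action(spent: float, budget: float, thresholds: dict) -> str:
    """Map the fraction of budget consumed to a throttling action."""
    fraction = spent / budget
    if fraction >= thresholds["block"]:
        return "block"      # refuse the request outright
    if fraction >= thresholds["throttle"]:
        return "throttle"   # delay or degrade the request
    if fraction >= thresholds["warning"]:
        return "warning"    # allow, but raise an alert
    return "allow"

thresholds = {"warning": 0.8, "throttle": 0.9, "block": 0.95}
print(budget_action(40.0, 50.0, thresholds))  # 80% spent -> "warning"
print(budget_action(48.0, 50.0, thresholds))  # 96% spent -> "block"
```

Checking the thresholds from highest to lowest ensures the most restrictive matching action wins.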

Usage Examples

Basic Rate Limiting Setup

from recoagent.rate_limiting import RateLimitingService, UserTierManager
import redis.asyncio as redis

# Create Redis client
redis_client = redis.Redis(host="localhost", port=6379, db=0)

# Create tier manager
tier_manager = UserTierManager(
    tiers={
        "free": {"requests_per_minute": 10, "requests_per_hour": 100},
        "premium": {"requests_per_minute": 100, "requests_per_hour": 1000},
        "enterprise": {"requests_per_minute": 1000, "requests_per_hour": 10000}
    }
)

# Create rate limiting service
rate_limiter = RateLimitingService(
    redis_client=redis_client,
    tier_manager=tier_manager
)

# Check rate limit for user
async def check_user_rate_limit(user_id: str, tier: str):
    """Check rate limit for a user request."""
    result = await rate_limiter.check_rate_limit(
        user_id=user_id,
        tier=tier,
        provider="openai",
        request_cost=0.01
    )

    if result.allowed:
        print(f"✅ Request allowed for user {user_id}")
        return True
    else:
        print(f"❌ Request blocked for user {user_id}: {result.reason}")
        if result.wait_time > 0:
            print(f"⏳ Wait time: {result.wait_time:.2f} seconds")
        return False

# Test rate limiting
await check_user_rate_limit("user_123", "free")
await check_user_rate_limit("user_456", "premium")
Advanced Multi-Bucket Rate Limiting

from typing import Dict

from recoagent.rate_limiting import MultiBucketRateLimiter, BucketConfig, BucketType

# Create comprehensive bucket configurations
bucket_configs = {
    BucketType.REQUESTS_PER_MINUTE: BucketConfig(
        capacity=100,
        refill_rate=100,
        refill_period=60,
        burst_capacity=150
    ),
    BucketType.REQUESTS_PER_HOUR: BucketConfig(
        capacity=1000,
        refill_rate=1000,
        refill_period=3600,
        burst_capacity=1200
    ),
    BucketType.TOKENS_PER_MINUTE: BucketConfig(
        capacity=10000,
        refill_rate=10000,
        refill_period=60,
        burst_capacity=15000
    ),
    BucketType.COST_PER_HOUR: BucketConfig(
        capacity=10.0,  # $10 per hour
        refill_rate=10.0,
        refill_period=3600,
        burst_capacity=15.0
    )
}

# Create multi-bucket rate limiter (redis_client from the basic setup above)
multi_limiter = MultiBucketRateLimiter(
    bucket_configs=bucket_configs,
    redis_client=redis_client,
    enable_burst=True
)

# Check multiple rate limits
async def check_comprehensive_rate_limit(user_id: str, request_data: Dict):
    """Check comprehensive rate limits."""
    checks = [
        {
            "bucket_type": BucketType.REQUESTS_PER_MINUTE,
            "tokens": 1,
            "description": "Request count per minute"
        },
        {
            "bucket_type": BucketType.TOKENS_PER_MINUTE,
            "tokens": request_data.get("token_count", 100),
            "description": "Token usage per minute"
        },
        {
            "bucket_type": BucketType.COST_PER_HOUR,
            "tokens": request_data.get("cost", 0.01),
            "description": "Cost per hour"
        }
    ]

    results = []
    for check in checks:
        allowed = await multi_limiter.consume_tokens(
            user_id=user_id,
            tokens_requested=check["tokens"],
            bucket_type=check["bucket_type"]
        )

        results.append({
            "check": check["description"],
            "allowed": allowed,
            "bucket_type": check["bucket_type"]
        })

    # All checks must pass
    all_allowed = all(result["allowed"] for result in results)

    if all_allowed:
        print("✅ All rate limits passed")
    else:
        print("❌ Some rate limits failed:")
        for result in results:
            if not result["allowed"]:
                print(f"  - {result['check']} failed")

    return all_allowed

# Test comprehensive rate limiting
request_data = {
    "token_count": 500,
    "cost": 0.05
}

await check_comprehensive_rate_limit("user_123", request_data)

Cost-Based Throttling with Budget Management

from recoagent.rate_limiting import CostBasedThrottler, BudgetConfig, ProviderPricingManager

# Create provider pricing manager
pricing_manager = ProviderPricingManager(
    providers={
        "openai": {
            "cost_per_token": 0.0001,
            "rate_limit": 1000,
            "models": {
                "gpt-4": {"cost_per_token": 0.0003},
                "gpt-3.5-turbo": {"cost_per_token": 0.0001}
            }
        },
        "anthropic": {
            "cost_per_token": 0.0002,
            "rate_limit": 500,
            "models": {
                "claude-3": {"cost_per_token": 0.0004},
                "claude-2": {"cost_per_token": 0.0002}
            }
        }
    }
)

# Create budget configuration
budget_config = BudgetConfig(
    daily_budget=50.0,
    monthly_budget=1000.0,
    cost_thresholds={
        "warning": 0.7,    # 70% of budget
        "throttle": 0.85,  # 85% of budget
        "block": 0.95      # 95% of budget
    },
    enable_dynamic_throttling=True
)

# Create cost throttler
cost_throttler = CostBasedThrottler(
    budget_config=budget_config,
    pricing_manager=pricing_manager,
    enable_dynamic_throttling=True
)

# Check cost-based throttling
async def check_cost_throttling(user_id: str, provider: str, model: str, token_count: int):
    """Check cost-based throttling."""
    # Calculate request cost
    model_config = pricing_manager.get_model_config(provider, model)
    request_cost = token_count * model_config["cost_per_token"]

    # Check cost limit
    throttle_result = await cost_throttler.check_cost_limit(
        user_id=user_id,
        request_cost=request_cost,
        provider=provider
    )

    print(f"Request cost: ${request_cost:.4f}")
    print(f"Throttle action: {throttle_result.action}")

    if throttle_result.action == "allow":
        print("✅ Request allowed")
    elif throttle_result.action == "throttle":
        print("⚠️ Request throttled")
        print(f"Suggested wait time: {throttle_result.wait_time:.2f} seconds")
    elif throttle_result.action == "block":
        print("❌ Request blocked - budget exceeded")
    elif throttle_result.action == "fallback":
        print(f"🔄 Using fallback model: {throttle_result.fallback_model}")

    return throttle_result

# Test cost throttling
await check_cost_throttling("user_123", "openai", "gpt-4", 1000)
await check_cost_throttling("user_123", "anthropic", "claude-3", 500)

Intelligent Queue Management

from typing import Dict

from recoagent.rate_limiting import IntelligentQueueManager, QueueConfig, QueuePriority

# Create queue configuration
queue_config = QueueConfig(
    max_queue_size=1000,
    priority_levels={
        QueuePriority.HIGH: {"weight": 3, "max_wait_time": 30},
        QueuePriority.MEDIUM: {"weight": 2, "max_wait_time": 60},
        QueuePriority.LOW: {"weight": 1, "max_wait_time": 120}
    },
    enable_priority_boost=True,
    enable_auto_scaling=True
)

# Create queue manager (redis_client from the basic setup above)
queue_manager = IntelligentQueueManager(
    queue_config=queue_config,
    redis_client=redis_client
)

# Queue management
async def manage_request_queue(user_id: str, request_data: Dict, priority: QueuePriority):
    """Manage the request queue with priority."""
    # Add request to queue
    queue_position = await queue_manager.add_request(
        user_id=user_id,
        request_data=request_data,
        priority=priority
    )

    print(f"Request queued at position: {queue_position}")

    # Wait for processing
    result = await queue_manager.wait_for_processing(
        user_id=user_id,
        request_id=request_data.get("request_id"),
        timeout=120  # 2 minutes
    )

    if result.processed:
        print("✅ Request processed successfully")
        return result.response
    else:
        print(f"❌ Request failed: {result.error}")
        return None

# Test queue management
request_data = {
    "request_id": "req_123",
    "query": "What is machine learning?",
    "model": "gpt-4"
}

# High priority request
result = await manage_request_queue("user_123", request_data, QueuePriority.HIGH)

# Medium priority request
result = await manage_request_queue("user_456", request_data, QueuePriority.MEDIUM)
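The priority weights in the QueueConfig above imply an ordering in which HIGH requests are dequeued before MEDIUM and LOW ones. A minimal sketch of weight-based ordering with `heapq` (illustrative only; the library's scheduling is more involved, adding priority boost and auto-scaling on top of the basic ordering):

```python
import heapq
import itertools

counter = itertools.count()  # tie-breaker preserves FIFO order within a priority level

def enqueue(queue: list, request_id: str, weight: int) -> None:
    # heapq is a min-heap, so negate the weight to pop high-weight items first
    heapq.heappush(queue, (-weight, next(counter), request_id))

def dequeue(queue: list) -> str:
    return heapq.heappop(queue)[2]

queue = []
enqueue(queue, "low_req", 1)     # LOW weight
enqueue(queue, "high_req", 3)    # HIGH weight
enqueue(queue, "medium_req", 2)  # MEDIUM weight

order = [dequeue(queue) for _ in range(3)]
print(order)  # ['high_req', 'medium_req', 'low_req']
```

The monotonically increasing counter in each heap entry guarantees that two requests with the same weight are served in arrival order, which is why a plain `(-weight, request_id)` tuple would not suffice.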

Real-time Rate Limit Monitoring

from recoagent.rate_limiting import RateLimitingService
import asyncio

# Create rate limiting service with monitoring
# (redis_client and tier_manager from the basic setup above)
rate_limiter = RateLimitingService(
    redis_client=redis_client,
    tier_manager=tier_manager,
    enable_monitoring=True
)

async def monitor_rate_limits():
    """Monitor rate limits in real time."""
    while True:
        # Get rate limit statistics
        stats = await rate_limiter.get_rate_limit_statistics()

        print("=== Rate Limit Statistics ===")
        print(f"Total requests: {stats['total_requests']}")
        print(f"Blocked requests: {stats['blocked_requests']}")
        print(f"Block rate: {stats['block_rate']:.2%}")

        # Get tier statistics
        tier_stats = stats['tier_statistics']
        for tier, tier_data in tier_stats.items():
            print(f"\n{tier.upper()} Tier:")
            print(f"  Active users: {tier_data['active_users']}")
            print(f"  Requests per minute: {tier_data['requests_per_minute']}")
            print(f"  Average wait time: {tier_data['avg_wait_time']:.2f}s")

        # Get provider statistics
        provider_stats = stats['provider_statistics']
        for provider, provider_data in provider_stats.items():
            print(f"\n{provider.upper()} Provider:")
            print(f"  Requests: {provider_data['requests']}")
            print(f"  Cost: ${provider_data['cost']:.2f}")
            print(f"  Rate limit utilization: {provider_data['rate_limit_utilization']:.2%}")

        # Check for alerts
        alerts = await rate_limiter.get_alerts()
        if alerts:
            print("\n🚨 ALERTS:")
            for alert in alerts:
                print(f"  - {alert['type']}: {alert['message']}")

        await asyncio.sleep(60)  # Monitor every minute

# Run monitoring
asyncio.run(monitor_rate_limits())

API Reference

RateLimitingService Methods

check_rate_limit(user_id: str, tier: str, provider: str, request_cost: float) -> RateLimitResult

Check rate limit for user request

Parameters:

  • user_id (str): User identifier
  • tier (str): User tier
  • provider (str): Provider name
  • request_cost (float): Request cost

Returns: RateLimitResult with decision and metadata

get_rate_limit_statistics() -> Dict

Get comprehensive rate limit statistics

Returns: Dictionary with statistics

get_alerts() -> List[Dict]

Get current rate limit alerts

Returns: List of alert dictionaries

MultiBucketRateLimiter Methods

consume_tokens(user_id: str, tokens_requested: int, bucket_type: BucketType) -> bool

Consume tokens from bucket

Parameters:

  • user_id (str): User identifier
  • tokens_requested (int): Number of tokens requested
  • bucket_type (BucketType): Type of bucket

Returns: True if tokens available

get_bucket_status(user_id: str, bucket_type: BucketType) -> Dict

Get bucket status for user

Parameters:

  • user_id (str): User identifier
  • bucket_type (BucketType): Type of bucket

Returns: Dictionary with bucket status
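A bucket status like this is typically derived lazily: the remaining tokens are computed from the last recorded refill timestamp rather than maintained by a background task. A sketch of that computation (the field names and `bucket_status` function are illustrative, not the library's actual schema):

```python
def bucket_status(tokens_at_last_refill: float, last_refill: float,
                  now: float, capacity: float, refill_rate: float,
                  refill_period: float) -> dict:
    """Compute the current bucket state from the last recorded refill."""
    elapsed = now - last_refill
    refilled = elapsed * (refill_rate / refill_period)  # tokens regained since then
    tokens = min(capacity, tokens_at_last_refill + refilled)
    return {
        "tokens_remaining": tokens,
        "capacity": capacity,
        "utilization": 1.0 - tokens / capacity,
    }

# 100-capacity bucket refilling 100 tokens/minute, 30 s after draining to 20 tokens
status = bucket_status(20.0, 0.0, 30.0, 100.0, 100.0, 60.0)
print(status["tokens_remaining"])  # ~70 tokens regained so far
```

Because the state is only a token count and a timestamp, it serializes naturally into Redis, which is what makes the distributed variant practical.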

UserTierManager Methods

get_user_tier(user_id: str) -> UserTier

Get user tier information

Parameters:

  • user_id (str): User identifier

Returns: UserTier object

upgrade_user_tier(user_id: str, new_tier: str) -> bool

Upgrade user tier

Parameters:

  • user_id (str): User identifier
  • new_tier (str): New tier name

Returns: True if upgrade successful

CostBasedThrottler Methods

check_cost_limit(user_id: str, request_cost: float, provider: str) -> ThrottleResult

Check cost-based throttling

Parameters:

  • user_id (str): User identifier
  • request_cost (float): Request cost
  • provider (str): Provider name

Returns: ThrottleResult with action and metadata

get_budget_status(user_id: str) -> Dict

Get budget status for user

Parameters:

  • user_id (str): User identifier

Returns: Dictionary with budget status

See Also