Skip to main content

Handle Authentication & Authorization

Difficulty: ⭐⭐ Intermediate | Time: 1 hour

🎯 The Problem

Your RecoAgent API is exposed to the internet without authentication. Anyone can call it and run up your LLM costs, and malicious users can abuse it. You need to secure it with proper authentication and authorization without making access cumbersome for legitimate users.

This guide solves: Implementing API authentication (API keys, JWT, OAuth) and role-based access control to secure your RecoAgent deployment.

⚡ TL;DR - Quick API Key Auth

import os
import secrets

from fastapi import Security, HTTPException
from fastapi.security import APIKeyHeader

API_KEY_HEADER = APIKeyHeader(name="X-API-Key")


def verify_api_key(api_key: str = Security(API_KEY_HEADER)):
    """Validate the ``X-API-Key`` header against the configured keys.

    Accepted keys are read on every call from the comma-separated
    ``VALID_API_KEYS`` environment variable.

    Args:
        api_key: Value of the ``X-API-Key`` header, injected by FastAPI.

    Returns:
        The API key, unchanged, when it matches a configured key.

    Raises:
        HTTPException: 403 when the key matches no configured key.
    """
    # Drop blank entries: "".split(",") yields [""], which would otherwise
    # make the empty string a valid key whenever the variable is unset.
    valid_keys = [
        entry.strip()
        for entry in os.getenv("VALID_API_KEYS", "").split(",")
        if entry.strip()
    ]

    # Compare as bytes in constant time and check every configured key, so
    # response timing does not reveal how much of a guess was correct.
    supplied = (api_key or "").encode()
    matched = False
    for candidate in valid_keys:
        matched |= secrets.compare_digest(supplied, candidate.encode())

    if not matched:
        raise HTTPException(status_code=403, detail="Invalid API key")
    return api_key

# Protect your endpoints
@app.post("/api/query")
async def query(request: QueryRequest, api_key: str = Security(verify_api_key)):
    # verify_api_key raises HTTPException for an unknown key, so this body
    # only runs for requests that presented an accepted X-API-Key header.
    return await agent.run(request.query)

# Test: curl -X POST http://localhost:8000/api/query -H "X-API-Key: your-key" -H "Content-Type: application/json" -d '{"query": "hello"}'

Result: API secured! Only requests with valid keys work.


Full Authentication Guide

Authentication Methods Comparison

| Method | Security | Complexity | Use Case | Setup Time |
|---|---|---|---|---|
| API Keys | ⭐⭐ Medium | ⭐ Simple | Internal/trusted clients | 15 min |
| JWT Tokens | ⭐⭐⭐ High | ⭐⭐ Medium | Web/mobile apps | 30 min |
| OAuth 2.0 | ⭐⭐⭐ High | ⭐⭐⭐ Complex | Third-party integrations | 2 hours |
| mTLS | ⭐⭐⭐⭐ Very High | ⭐⭐⭐ Complex | Service-to-service | 3 hours |

Option 1: API Key Authentication

Generate API Keys

import secrets
import hashlib

def generate_api_key(prefix: str = "sk", nbytes: int = 32) -> str:
    """Generate a random, URL-safe API key.

    Args:
        prefix: Label placed in front of the random part, e.g. ``"sk"``.
        nbytes: Number of random bytes drawn for the key. The default of 32
            matches the previous hard-coded value.

    Returns:
        A key of the form ``"<prefix>_<random>"``.

    Raises:
        ValueError: If ``nbytes`` is below 16, which would make the key
            too short to resist guessing.
    """
    if nbytes < 16:
        raise ValueError("nbytes must be at least 16")
    random_part = secrets.token_urlsafe(nbytes)
    return f"{prefix}_{random_part}"

# Generate keys for different users/apps
# Each entry calls generate_api_key() separately, so no two clients
# share a key.
user_keys = {
    client: generate_api_key()
    for client in ("user_123", "app_dashboard", "mobile_app")
}

# Store hashed versions in database
def hash_api_key(key: str) -> str:
    """Return the hex-encoded SHA-256 digest of ``key``."""
    digest = hashlib.sha256()
    digest.update(key.encode())
    return digest.hexdigest()

Implement API Key Middleware

from fastapi import Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware

# JSONResponse is used below but was never imported in this snippet;
# without it every rejected request fails with a NameError (HTTP 500).
from starlette.responses import JSONResponse


class APIKeyMiddleware(BaseHTTPMiddleware):
    """Reject requests that do not carry a valid ``X-API-Key`` header.

    Requests to ``/health`` pass through unauthenticated. For every other
    path the middleware answers 401 when the header is missing and 403 when
    the key is not accepted; otherwise it records the caller's user id and
    tier on ``request.state`` before handing the request on.
    """

    async def dispatch(self, request: Request, call_next):
        """Authenticate one request and forward it when the key is valid.

        Args:
            request: The incoming request.
            call_next: Callable that passes the request to the rest of
                the application and returns its response.

        Returns:
            The downstream response, or a JSON error response with status
            401 (no key) or 403 (invalid key).
        """
        # Skip auth for health check
        if request.url.path == "/health":
            return await call_next(request)

        # Get API key from header
        api_key = request.headers.get("X-API-Key")

        if not api_key:
            return JSONResponse(
                {"error": "API key required"},
                status_code=401,
            )

        # Verify API key
        # NOTE(review): the raw key is passed here; if the database stores
        # only hashes (see hash_api_key), presumably the helper hashes it
        # before the lookup -- confirm.
        if not await verify_key_in_database(api_key):
            return JSONResponse(
                {"error": "Invalid API key"},
                status_code=403,
            )

        # Add user info to request
        request.state.user_id = await get_user_from_key(api_key)
        request.state.tier = await get_user_tier(api_key)

        return await call_next(request)


# Add to app
app.add_middleware(APIKeyMiddleware)

Option 2: JWT Token Authentication

Generate JWT Tokens

from jose import JWTError, jwt
from datetime import datetime, timedelta

import os

# Signing secret, read from the environment so it never lives in code.
# os.getenv returns None when JWT_SECRET_KEY is unset, so make sure the
# variable is set before any token is issued or verified.
SECRET_KEY = os.getenv("JWT_SECRET_KEY")
ALGORITHM = "HS256"

def create_access_token(data: dict, expires_delta: timedelta = None):
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: Token lifetime. ``None`` selects the default of
            24 hours; any other value, including a zero delta, is used
            as given.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    from datetime import timezone

    # Test for None explicitly: `expires_delta or ...` treats timedelta(0)
    # as "not provided" and would silently issue a 24-hour token.
    lifetime = timedelta(hours=24) if expires_delta is None else expires_delta

    # Timezone-aware UTC timestamp; datetime.utcnow() is naive and deprecated.
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

# Login endpoint
from fastapi import Body


@app.post("/auth/login")
async def login(username: str = Body(...), password: str = Body(...)):
    """Exchange a username and password for a bearer token.

    Credentials are read from the JSON request body
    (``{"username": ..., "password": ...}``). Plain ``str`` parameters
    would be query parameters, which put the password in the URL where
    proxies and access logs record it.

    Args:
        username: Account name from the request body.
        password: Password from the request body.

    Returns:
        A dict with ``access_token`` and ``token_type``.

    Raises:
        HTTPException: 401 when the credentials are rejected.
    """
    user = authenticate_user(username, password)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # The JWT "sub" claim must be a string; JWT libraries reject a
    # non-string subject when the token is decoded.
    token = create_access_token(data={"sub": str(user.id), "role": user.role})
    return {"access_token": token, "token_type": "bearer"}

Verify JWT Tokens

from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer

security = HTTPBearer()


async def verify_jwt(credentials = Depends(security)):
    """Validate the bearer token and return the caller's identity.

    Args:
        credentials: Bearer credentials extracted by ``HTTPBearer``.

    Returns:
        A dict with ``user_id`` (the token's ``sub`` claim) and ``role``
        (the token's ``role`` claim, or ``None`` if the token has none).

    Raises:
        HTTPException: 403 when the token cannot be decoded or carries
            no ``sub`` claim.
    """
    token = credentials.credentials

    # Keep the try body to the one call that can raise JWTError.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=403, detail="Invalid token") from None

    user_id = payload.get("sub")
    if user_id is None:
        # A correctly signed token without a subject identifies nobody;
        # reject it rather than hand user_id=None to downstream code.
        raise HTTPException(status_code=403, detail="Invalid token")

    return {"user_id": user_id, "role": payload.get("role")}

# Protect endpoints
@app.post("/api/query")
async def query(request: QueryRequest, auth=Depends(verify_jwt)):
    # verify_jwt supplies the identity dict; pass the caller's id to the
    # agent together with the query text.
    caller_id = auth["user_id"]
    return await agent.run(request.query, user_id=caller_id)

Role-Based Access Control (RBAC)

from enum import Enum

class Role(Enum):
    """Roles a caller can hold; values match the token's ``role`` claim."""

    ADMIN = "admin"
    USER = "user"
    READONLY = "readonly"


# Role permissions
ROLE_PERMISSIONS = {
    # Admins also hold "view_history", so they are never denied an
    # endpoint that a lower role can reach.
    Role.ADMIN: ["query", "admin", "manage_users", "view_logs", "view_history"],
    Role.USER: ["query", "view_history"],
    Role.READONLY: ["view_history"],
}


def require_permission(required_permission: str):
    """Build a decorator that enforces ``required_permission`` on an endpoint.

    The decorated coroutine must be called with the ``auth`` keyword set to
    the identity dict (a mapping with a ``role`` entry). The wrapper keeps
    the endpoint's signature via ``functools.wraps`` so that FastAPI still
    sees its ``Depends(...)`` parameters, and it passes ``auth`` on to the
    endpoint when the endpoint declares it.

    Args:
        required_permission: Permission name that the caller's role must
            include in ``ROLE_PERMISSIONS``.

    Returns:
        A decorator for async endpoint functions.

    Raises:
        HTTPException: From the wrapped call -- 401 when no ``auth`` was
            supplied, 403 when the role is missing, unknown, or lacks the
            permission.
    """
    import functools
    import inspect

    def decorator(func):
        parameters = inspect.signature(func).parameters
        # Only forward `auth` to endpoints that can accept it.
        forwards_auth = "auth" in parameters or any(
            p.kind is inspect.Parameter.VAR_KEYWORD for p in parameters.values()
        )

        @functools.wraps(func)
        async def wrapper(*args, auth=None, **kwargs):
            if not auth:
                raise HTTPException(
                    status_code=401,
                    detail="Authentication required"
                )

            # A missing or unrecognised role grants nothing instead of
            # surfacing as an unhandled KeyError/ValueError (HTTP 500).
            try:
                user_role = Role(auth.get("role"))
            except ValueError:
                user_role = None
            permissions = ROLE_PERMISSIONS.get(user_role, [])

            if required_permission not in permissions:
                raise HTTPException(
                    status_code=403,
                    detail=f"Permission '{required_permission}' required"
                )

            if forwards_auth:
                # Without this the endpoint would see its default value
                # (the Depends marker) rather than the identity dict.
                kwargs["auth"] = auth
            return await func(*args, **kwargs)
        return wrapper
    return decorator

# Use on endpoints
@app.post("/api/query")
@require_permission("query")
async def query(request: QueryRequest, auth=Depends(verify_jwt)):
    # Guarded by the "query" permission via the decorator above.
    answer = await agent.run(request.query)
    return answer

@app.get("/api/admin/users")
@require_permission("manage_users")
async def list_users(auth=Depends(verify_jwt)):
    # Guarded by the "manage_users" permission via the decorator above.
    users = get_all_users()
    return users

Rate Limiting by User

from packages.rate_limiting import RateLimiter

# Different limits per tier
RATE_LIMITS = {
    "free": 10,          # 10 requests/minute
    "pro": 100,          # 100 requests/minute
    "enterprise": 1000,  # 1000 requests/minute
}


async def check_rate_limit(request: Request, auth = Depends(verify_jwt)):
    """Enforce the per-user request limit for the caller's tier.

    Args:
        request: The incoming request (not read by this check).
        auth: Identity dict from ``verify_jwt``; ``user_id`` is required,
            ``tier`` is optional.

    Raises:
        HTTPException: 429 when the caller's limiter refuses the request.
    """
    # NOTE(review): verify_jwt as shown in this guide returns only user_id
    # and role, so "tier" is absent and every caller gets the free limit
    # unless a tier claim is added -- confirm.
    user_tier = auth.get("tier") or "free"

    # An unrecognised tier falls back to the most restrictive limit rather
    # than raising KeyError (HTTP 500).
    limit = RATE_LIMITS.get(user_tier, RATE_LIMITS["free"])

    limiter = RateLimiter(user_id=auth["user_id"], limit=limit)

    if not await limiter.allow():
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit exceeded. Limit: {limit} req/min"
        )

# Apply to endpoints
@app.post("/api/query")
async def query(
    request: QueryRequest,
    auth=Depends(verify_jwt),
    _=Depends(check_rate_limit),
):
    # The rate-limit dependency is declared only for its side effect (it
    # raises 429 when the limit is hit); its return value is unused.
    answer = await agent.run(request.query)
    return answer

Monitoring Auth Events

from packages.observability import SecurityMonitor

from fastapi import Body, Request

monitor = SecurityMonitor()


# Log all auth attempts
@app.post("/auth/login")
async def login(
    username: str = Body(...),
    password: str = Body(...),
    *,
    request: Request,
):
    """Authenticate a user and record the attempt with the security monitor.

    Credentials are read from the JSON request body so the password does
    not end up in the URL. Every attempt is logged with the client IP; a
    failed attempt also triggers a brute-force alert once that IP has more
    than five recorded failures.

    Args:
        username: Account name from the request body.
        password: Password from the request body.
        request: The incoming request, injected by FastAPI and used for
            the client address.

    Returns:
        A dict with the issued ``token``.

    Raises:
        HTTPException: 401 when the credentials are rejected.
    """
    # request.client can be None, e.g. when the server has no peer address.
    client_ip = request.client.host if request.client else None

    # Treat both failure conventions the same way: the JWT section of this
    # guide checks for a falsy return, the original snippet caught AuthError.
    # NOTE(review): AuthError is assumed to be importable alongside
    # authenticate_user -- confirm.
    try:
        user = authenticate_user(username, password)
    except AuthError:
        user = None

    if not user:
        monitor.log_event("login_failed", {
            "username": username,
            "ip": client_ip
        })

        # Alert on suspicious activity
        if monitor.get_failed_attempts(client_ip) > 5:
            monitor.alert("potential_brute_force", {"ip": client_ip})

        raise HTTPException(status_code=401)

    monitor.log_event("login_success", {
        "user_id": user.id,
        "ip": client_ip
    })

    # Same claims as the login endpoint in the JWT section: a string "sub"
    # plus the role that the RBAC checks read.
    return {"token": create_access_token({"sub": str(user.id), "role": user.role})}

Testing Authentication

import pytest
from fastapi.testclient import TestClient

def test_requires_authentication():
    """Anonymous requests get 401; a request with a valid key gets 200."""
    client = TestClient(app)
    payload = {"query": "test"}

    # Without API key
    anonymous = client.post("/api/query", json=payload)
    assert anonymous.status_code == 401

    # With valid API key
    authenticated = client.post(
        "/api/query",
        json=payload,
        headers={"X-API-Key": "valid-key"},
    )
    assert authenticated.status_code == 200

def test_invalid_api_key():
    """A request carrying an unrecognised API key is rejected with 403."""
    bad_key_headers = {"X-API-Key": "invalid-key"}
    response = TestClient(app).post(
        "/api/query",
        json={"query": "test"},
        headers=bad_key_headers,
    )
    assert response.status_code == 403

Security Checklist

  • ✅ All endpoints require authentication (except /health)
  • ✅ API keys are hashed in database
  • ✅ Secrets stored in environment/vault (not code)
  • ✅ Rate limiting per user/tier
  • ✅ Role-based access control implemented
  • ✅ Auth events logged and monitored
  • ✅ Failed login attempts tracked
  • ✅ Token expiration configured
  • ✅ HTTPS enforced in production
  • ✅ CORS properly configured

What You've Accomplished

  • Implemented API key authentication for simple security
  • Configured JWT tokens for stateless auth
  • Set up role-based access control (RBAC)
  • Added user-specific rate limiting
  • Established security monitoring and alerts
  • Tested authentication flows

Next Steps