# Implement Production Observability for RAG Systems

**Difficulty:** ⭐⭐⭐⭐ Expert | **Time:** 3 hours
## 🎯 The Problem

You need comprehensive observability for your RAG system in production: monitoring, tracing, alerting, and evaluation. Without it, you can't detect issues, understand performance bottlenecks, or verify that your system meets quality standards.

**This guide solves:** implementing end-to-end observability with OpenTelemetry tracing, Prometheus metrics, Grafana dashboards, SLO monitoring, synthetic testing, and RAG evaluation integration.
## ⚡ TL;DR - Quick Observability Setup
```python
from packages.observability import ObservabilitySetup, RAGSLORegistry, SyntheticMonitor

# 1. Initialize observability
obs_setup = ObservabilitySetup(service_name="recoagent-rag")

# 2. Setup FastAPI app
obs_setup.setup_fastapi(app)

# 3. Configure SLOs
slo_registry = RAGSLORegistry(obs_setup.get_metrics_collector())
slo_registry.initialize_default_slos()

# 4. Setup synthetic monitoring
synthetic_monitor = SyntheticMonitor(
    base_url="http://localhost:8000",
    metrics_collector=obs_setup.get_metrics_collector(),
    logger=obs_setup.get_logger(),
    trace_collector=obs_setup.get_trace_collector(),
)

# 5. Start monitoring
await synthetic_monitor.start_monitoring()

# Expected: full observability stack with dashboards, alerts, and evaluation
```
**Impact:** Production-ready monitoring with 99.9% uptime visibility and quality assurance!
## Full Observability Implementation Guide

### Architecture Overview

### Core Components

#### 1. Observability Setup

The main orchestrator that sets up all observability components.
```python
from packages.observability import ObservabilitySetup

# Initialize complete observability stack
obs_setup = ObservabilitySetup(service_name="recoagent-rag")

# Setup FastAPI application
obs_setup.setup_fastapi(app)

# Get individual components
metrics_collector = obs_setup.get_metrics_collector()
trace_collector = obs_setup.get_trace_collector()
logger = obs_setup.get_logger()
```
**Features:**
- OpenTelemetry Integration: Automatic instrumentation
- Correlation IDs: Request tracing across services
- Health Checks: Built-in health monitoring
- Metrics Endpoint: Prometheus metrics exposure
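The correlation IDs listed above are what tie one request's logs and spans together across services. `ObservabilitySetup` wires this up for you; as a standalone sketch of the underlying mechanism (function names here are illustrative, not part of the package), Python's `contextvars` carries the ID across `await` boundaries so every log record emitted while handling a request can read the same value:

```python
import contextvars
import uuid

# Context variable holding the current request's correlation ID.
# contextvars values propagate across awaits in the same task, so
# nested helpers see the ID without it being threaded as a parameter.
correlation_id = contextvars.ContextVar("correlation_id", default=None)

def new_correlation_id() -> str:
    """Generate and bind a correlation ID for the current request."""
    cid = uuid.uuid4().hex
    correlation_id.set(cid)
    return cid

def annotate_log(message: str) -> dict:
    """Attach the bound correlation ID to a structured log record."""
    return {"message": message, "correlation_id": correlation_id.get()}

cid = new_correlation_id()
record = annotate_log("query received")  # carries the same cid
```

In a real deployment the ID is typically read from (or echoed into) a request header so downstream services join the same trace.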
#### 2. Metrics Collection

Comprehensive metrics for RAG system monitoring.
```python
from packages.observability import MetricsCollector

# Initialize metrics collector
metrics = MetricsCollector()

# Track RAG-specific metrics
metrics.record_rag_query(
    query_id="query_123",
    latency_ms=850.0,
    success=True,
    model="gpt-4",
    tokens_used=150,
)

# Track business metrics
metrics.record_user_interaction(
    user_id="user_456",
    action="query",
    session_id="session_789",
)

# Get metrics summary
summary = metrics.get_metrics_summary()
print(f"Total queries: {summary.total_queries}")
print(f"Average latency: {summary.avg_latency_ms}ms")
print(f"Success rate: {summary.success_rate:.2%}")
```
**Key Metrics:**
- Performance: Latency, throughput, error rates
- Business: User interactions, query patterns
- Quality: RAG evaluation scores
- Infrastructure: CPU, memory, disk usage
#### 3. Distributed Tracing

End-to-end request tracing with OpenTelemetry.
```python
from packages.observability import TraceCollector, RAGTracer

# Initialize trace collector
trace_collector = TraceCollector(service_name="recoagent-rag")

# Start a trace
with trace_collector.start_span("rag_query_processing") as span:
    span.set_attribute("query", "What is machine learning?")
    span.set_attribute("user_id", "user_123")

    # Process query
    result = await rag_agent.process_query(query)
    span.set_attribute("response_length", len(result["response"]))
    span.set_attribute("tokens_used", result["tokens_used"])

# RAG-specific tracer
rag_tracer = RAGTracer(trace_collector)

# Trace the RAG pipeline
with rag_tracer.trace_rag_pipeline(query) as pipeline_span:
    # Retrieval phase
    with rag_tracer.trace_retrieval() as retrieval_span:
        docs = await retriever.retrieve(query)
        retrieval_span.set_attribute("documents_found", len(docs))

    # Generation phase
    with rag_tracer.trace_generation() as generation_span:
        response = await generator.generate(query, docs)
        generation_span.set_attribute("response_tokens", response["tokens"])
```
#### 4. SLO Monitoring

Service Level Objectives with burn-rate alerting.
```python
from packages.observability import (
    RAGSLORegistry,
    RAGSLODefinition,
    SLOThreshold,
    BurnRateRule,
)

# Initialize SLO registry
slo_registry = RAGSLORegistry(metrics_collector)

# Define a custom SLO
custom_slo = RAGSLODefinition(
    name="response_quality",
    description="RAG response quality score",
    metric_name="rag_response_quality_score",
    metric_query="avg(rag_response_quality_score)",
    thresholds=[
        SLOThreshold(
            name="target",
            target=0.85,  # 85% quality target
            window="5m",
            description="85% of responses should meet quality standards",
            severity="warning",
        )
    ],
    burn_rate_rules=[
        BurnRateRule(
            name="quality_degradation",
            slo_name="response_quality",
            window="10m",
            threshold=2.0,  # 2x burn rate
            severity="critical",
            description="Quality degradation detected",
            runbook_url="https://docs.recoagent.com/runbooks/quality-degradation",
        )
    ],
)

# Register the SLO
slo_registry.register_slo(custom_slo)

# Check SLO status
slo_status = slo_registry.get_slo_status("response_quality")
print(f"SLO Status: {slo_status.status}")
print(f"Current Value: {slo_status.current_value}")
print(f"Error Budget: {slo_status.error_budget_remaining}")
```
**Default SLOs:**
- Query Success Rate: 99% success rate target
- Response Latency: P95 < 2 seconds
- Response Quality: 85% quality score
- Availability: 99.9% uptime
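The burn-rate thresholds used above have a simple meaning: a burn rate of N means the error budget is being consumed N times faster than the SLO allows, so a budget meant to last 30 days would be gone in 30/N days. A minimal sketch of the arithmetic (the function name is illustrative, not part of the package):

```python
def burn_rate(error_rate: float, slo_target: float) -> float:
    """Burn rate = observed error rate / budgeted error rate.

    A 99% success SLO budgets a 1% error rate; if errors are
    running at 2%, the budget burns at 2x the sustainable pace.
    """
    budget = 1.0 - slo_target
    if budget <= 0:
        raise ValueError("SLO target must be below 1.0")
    return error_rate / budget

# A 99% SLO with a 2% observed error rate burns budget at roughly 2x,
# matching the threshold=2.0 BurnRateRule shown earlier.
rate = burn_rate(error_rate=0.02, slo_target=0.99)
```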
#### 5. Synthetic Monitoring

Automated testing and monitoring of your RAG system.
```python
from packages.observability import SyntheticMonitor, SyntheticTest

# Initialize synthetic monitor
synthetic_monitor = SyntheticMonitor(
    base_url="http://localhost:8000",
    metrics_collector=metrics_collector,
    logger=logger,
    trace_collector=trace_collector,
)

# Define a custom test
custom_test = SyntheticTest(
    name="rag_query_test",
    description="Test RAG query endpoint",
    endpoint="/api/query",
    method="POST",
    payload={
        "query": "What is artificial intelligence?",
        "user_id": "synthetic_user",
    },
    expected_status=200,
    expected_latency_ms=5000,
    frequency_seconds=300,  # every 5 minutes
)

# Add the test
synthetic_monitor.add_test(custom_test)

# Start monitoring
await synthetic_monitor.start_monitoring()

# Check test results
results = synthetic_monitor.get_test_results("rag_query_test")
for result in results[-10:]:  # last 10 results
    print(f"Test: {result.test_name}")
    print(f"Success: {result.success}")
    print(f"Latency: {result.latency_ms}ms")
```
**Default Tests:**
- Health Check: Basic endpoint availability
- RAG Query: End-to-end query processing
- Authentication: Auth endpoint testing
- Database: Database connectivity
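Each synthetic run ultimately reduces to the two expectations declared on `SyntheticTest`: status code and latency budget. A minimal sketch of that pass/fail logic (the types and names here are illustrative, not `SyntheticMonitor` internals):

```python
from dataclasses import dataclass

@dataclass
class SyntheticResult:
    status_code: int
    latency_ms: float

def evaluate(result: SyntheticResult, expected_status: int,
             expected_latency_ms: float) -> bool:
    """A run passes only if the status matches AND the observed
    latency stays under the budget; either failure alone marks
    the test red."""
    return (result.status_code == expected_status
            and result.latency_ms <= expected_latency_ms)

evaluate(SyntheticResult(200, 850.0), 200, 5000)   # passes
evaluate(SyntheticResult(200, 7200.0), 200, 5000)  # fails: too slow
```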
#### 6. RAG Evaluation Integration

Real-time RAG quality evaluation with RAGAS and TruLens.
```python
from packages.observability import RAGEvaluationPipeline, RagasEvaluator, TruLensEvaluator

# Initialize evaluation pipeline
eval_pipeline = RAGEvaluationPipeline(
    metrics_collector=metrics_collector,
    logger=logger,
)

# Setup RAGAS evaluator
ragas_evaluator = RagasEvaluator(
    metrics=[
        "context_precision",
        "context_recall",
        "faithfulness",
        "answer_relevancy",
    ]
)

# Setup TruLens evaluator
trulens_evaluator = TruLensEvaluator(
    feedback_functions=[
        "answer_relevance",
        "context_relevance",
        "groundedness",
    ]
)

# Add evaluators to the pipeline
eval_pipeline.add_evaluator(ragas_evaluator)
eval_pipeline.add_evaluator(trulens_evaluator)

# Evaluate a RAG response
evaluation_result = await eval_pipeline.evaluate_response(
    query="What is machine learning?",
    answer="Machine learning is a subset of AI...",
    context=["ML is a method of data analysis...", "AI includes ML and other techniques..."],
    ground_truth="Machine learning is a subset of artificial intelligence...",
)

# Record evaluation metrics
eval_pipeline.record_evaluation_metrics(evaluation_result)

# Get evaluation dashboard data
dashboard_data = eval_pipeline.get_evaluation_dashboard_data()
print(f"Average Quality Score: {dashboard_data['avg_quality_score']:.2f}")
print(f"Context Precision: {dashboard_data['context_precision']:.2f}")
print(f"Faithfulness: {dashboard_data['faithfulness']:.2f}")
```
## Complete Implementation

### Step 1: Install Dependencies
```bash
# Install observability packages
pip install opentelemetry-api opentelemetry-sdk
pip install opentelemetry-instrumentation-fastapi
pip install opentelemetry-instrumentation-requests
pip install opentelemetry-instrumentation-psycopg2
pip install opentelemetry-instrumentation-redis
pip install opentelemetry-exporter-jaeger
pip install prometheus-client
pip install structlog

# Install evaluation frameworks (optional)
pip install ragas trulens-eval

# Install monitoring tools (asyncio ships with Python; only aiohttp is needed)
pip install aiohttp
```
### Step 2: Setup Infrastructure
```yaml
# docker-compose.observability.yml
version: '3.8'

services:
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-storage:/var/lib/grafana

  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "14268:14268"
    environment:
      - COLLECTOR_OTLP_ENABLED=true

volumes:
  grafana-storage:
```
### Step 3: Configure Prometheus
```yaml
# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'recoagent-api'
    static_configs:
      - targets: ['host.docker.internal:8000']
    metrics_path: '/metrics/prometheus'
    scrape_interval: 5s

  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

rule_files:
  - "rules/*.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093
```
### Step 4: Setup FastAPI Application
```python
from fastapi import FastAPI
from pydantic import BaseModel

from packages.observability import ObservabilitySetup

# Create FastAPI app
app = FastAPI(title="RecoAgent API", version="1.0.0")

# Initialize observability
obs_setup = ObservabilitySetup(service_name="recoagent-rag")

# Setup observability for the app
obs_setup.setup_fastapi(app)


class QueryRequest(BaseModel):
    query: str
    user_id: str | None = None


# Your existing routes
@app.post("/api/query")
async def query_endpoint(request: QueryRequest):
    # This call is automatically instrumented
    result = await rag_agent.process_query(request.query)
    return result


# Start the application
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
### Step 5: Setup SLO Monitoring
```python
from packages.observability import RAGSLORegistry, RAGSLODefinition, SLOThreshold

# Initialize SLO registry
slo_registry = RAGSLORegistry(obs_setup.get_metrics_collector())

# Initialize default SLOs
slo_registry.initialize_default_slos()

# Add custom SLOs for your use case
custom_slo = RAGSLODefinition(
    name="business_metric_slo",
    description="Custom business metric SLO",
    metric_name="business_metric",
    metric_query="avg(business_metric)",
    thresholds=[
        SLOThreshold(
            name="target",
            target=0.90,
            window="5m",
            description="90% target for business metric",
            severity="warning",
        )
    ],
)

slo_registry.register_slo(custom_slo)
```
### Step 6: Setup Synthetic Monitoring
```python
import asyncio

from packages.observability import SyntheticMonitor


async def setup_synthetic_monitoring():
    # Initialize synthetic monitor
    synthetic_monitor = SyntheticMonitor(
        base_url="http://localhost:8000",
        metrics_collector=obs_setup.get_metrics_collector(),
        logger=obs_setup.get_logger(),
        trace_collector=obs_setup.get_trace_collector(),
    )

    # Start monitoring
    await synthetic_monitor.start_monitoring()

    # Keep running
    try:
        while True:
            await asyncio.sleep(60)
    except KeyboardInterrupt:
        await synthetic_monitor.stop_monitoring()


# Run synthetic monitoring
asyncio.run(setup_synthetic_monitoring())
```
### Step 7: Setup RAG Evaluation
```python
from packages.observability import RAGEvaluationPipeline

# Initialize evaluation pipeline
eval_pipeline = RAGEvaluationPipeline(
    metrics_collector=obs_setup.get_metrics_collector(),
    logger=obs_setup.get_logger(),
)

# Setup evaluators; each is optional and only added if its
# underlying framework (ragas / trulens-eval) is installed
try:
    from packages.observability import RagasEvaluator
    eval_pipeline.add_evaluator(RagasEvaluator())
except ImportError:
    pass  # ragas not installed

try:
    from packages.observability import TruLensEvaluator
    eval_pipeline.add_evaluator(TruLensEvaluator())
except ImportError:
    pass  # trulens-eval not installed

# Start evaluation pipeline
await eval_pipeline.start_evaluation_pipeline()
```
## Grafana Dashboards

### RAG Performance Dashboard
```json
{
  "dashboard": {
    "title": "RAG Performance Dashboard",
    "panels": [
      {
        "title": "Query Success Rate",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(rate(rag_queries_total{status=\"success\"}[5m])) / sum(rate(rag_queries_total[5m]))"
          }
        ]
      },
      {
        "title": "Response Latency",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(rag_query_duration_seconds_bucket[5m]))"
          }
        ]
      },
      {
        "title": "Quality Scores",
        "type": "graph",
        "targets": [
          {
            "expr": "avg(rag_response_quality_score)"
          }
        ]
      }
    ]
  }
}
```
### SLO Dashboard
```json
{
  "dashboard": {
    "title": "SLO Dashboard",
    "panels": [
      {
        "title": "Error Budget Burn Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(slo_error_budget_burn_rate[1h])"
          }
        ]
      },
      {
        "title": "SLO Status",
        "type": "table",
        "targets": [
          {
            "expr": "slo_status"
          }
        ]
      }
    ]
  }
}
```
## Alerting Rules

### Prometheus Alert Rules
```yaml
# rules/rag-alerts.yml
groups:
  - name: rag-alerts
    rules:
      - alert: HighErrorRate
        expr: rate(rag_queries_total{status="error"}[5m]) > 0.1
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} errors per second"

      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(rag_query_duration_seconds_bucket[5m])) > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "P95 latency is {{ $value }} seconds"

      - alert: SLOBurnRate
        expr: rate(slo_error_budget_burn_rate[5m]) > 2
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "SLO burn rate exceeded"
          description: "Error budget burn rate is {{ $value }}"
```
## Best Practices

### 1. Metrics Design
- Use consistent naming: Follow Prometheus naming conventions
- Include labels: Add relevant dimensions for filtering
- Avoid high cardinality: Don't use user IDs as labels
- Use histograms: For latency and size metrics
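To see why histograms are the right shape for latency: Prometheus histograms store cumulative bucket counts, which is what lets `histogram_quantile` (used in the dashboards below) estimate percentiles server-side. A stdlib sketch of that bucketing scheme (illustrative, not the prometheus-client API):

```python
def bucket_counts(observations, buckets):
    """Cumulative histogram counts, Prometheus-style: each bucket
    counts observations <= its upper bound ("le"), and the implicit
    +Inf bucket counts everything."""
    counts = {le: sum(1 for o in observations if o <= le) for le in buckets}
    counts[float("inf")] = len(observations)
    return counts

latencies_s = [0.12, 0.48, 0.95, 1.7, 3.2]
bucket_counts(latencies_s, buckets=(0.25, 0.5, 1.0, 2.5))
# → {0.25: 1, 0.5: 2, 1.0: 3, 2.5: 4, inf: 5}
```

Because the counts are cumulative, buckets from many instances can be summed before the quantile is estimated, which is exactly what a high-cardinality label set would break.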
### 2. SLO Definition
- Set realistic targets: Based on historical data
- Define error budgets: Allow for planned downtime
- Use multiple windows: Short and long-term monitoring
- Create runbooks: Document response procedures
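The "multiple windows" advice is usually implemented as multiwindow burn-rate alerting: page only when both a long and a short window exceed the threshold, so the alert catches sustained burn but resets quickly once the issue clears. A sketch of the combining logic (the function is illustrative; 14.4 is the fast-burn threshold commonly cited for a 30-day SLO):

```python
def multiwindow_alert(short_burn: float, long_burn: float,
                      threshold: float) -> bool:
    """Fire only when both windows exceed the threshold.

    The long window confirms the burn is sustained; the short
    window makes the condition clear promptly after recovery.
    """
    return short_burn > threshold and long_burn > threshold

# A brief spike trips the short window but not the long one: no page.
multiwindow_alert(short_burn=15.0, long_burn=1.2, threshold=14.4)
# Sustained burn trips both windows: page.
multiwindow_alert(short_burn=15.0, long_burn=15.0, threshold=14.4)
```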
### 3. Alerting Strategy
- Avoid alert fatigue: Set appropriate thresholds
- Use severity levels: Critical, warning, info
- Include context: Add relevant information to alerts
- Test alerts: Regularly verify alert functionality
### 4. Synthetic Testing
- Test critical paths: Focus on important user journeys
- Use realistic data: Test with production-like data
- Monitor from multiple locations: Test from different regions
- Regular updates: Keep tests current with application changes
## Troubleshooting
| Problem | Cause | Solution |
|---|---|---|
| Metrics not appearing | Prometheus not scraping | Check scrape configuration |
| High cardinality | Too many label values | Reduce label cardinality |
| Missing traces | Jaeger not receiving data | Check OTLP endpoint |
| SLO alerts not firing | Incorrect query | Validate SLO queries |
| Synthetic tests failing | Network issues | Check connectivity |
## What You've Accomplished
✅ Implemented comprehensive observability stack
✅ Set up OpenTelemetry tracing and metrics
✅ Configured SLO monitoring with burn-rate alerting
✅ Deployed synthetic monitoring with automated endpoint tests
✅ Integrated RAG evaluation with real-time dashboards
✅ Created Grafana dashboards and alerting rules
## Next Steps
- 📊 Advanced Metrics - Create custom business metrics
- 🚨 Alert Management - Advanced alerting strategies
- 🔍 Troubleshooting - Debug observability issues
- 🚀 Production Deployment - Deploy to production
Your RAG system now has enterprise-grade observability! Monitor, trace, and optimize with confidence. 📊✨