Implement Production Observability for RAG Systems

Difficulty: ⭐⭐⭐⭐ Expert | Time: 3 hours

🎯 The Problem

You need comprehensive observability for your RAG system in production: monitoring, tracing, alerting, and evaluation. Without it, you can't detect issues, find performance bottlenecks, or verify that the system meets quality standards.

This guide solves: Implementing end-to-end observability with OpenTelemetry tracing, Prometheus metrics, Grafana dashboards, SLO monitoring, synthetic testing, and RAG evaluation integration.

⚡ TL;DR - Quick Observability Setup

from packages.observability import ObservabilitySetup, RAGSLORegistry, SyntheticMonitor

# 1. Initialize observability
obs_setup = ObservabilitySetup(service_name="recoagent-rag")

# 2. Setup FastAPI app
obs_setup.setup_fastapi(app)

# 3. Configure SLOs
slo_registry = RAGSLORegistry(obs_setup.get_metrics_collector())
slo_registry.initialize_default_slos()

# 4. Setup synthetic monitoring
synthetic_monitor = SyntheticMonitor(
    base_url="http://localhost:8000",
    metrics_collector=obs_setup.get_metrics_collector(),
    logger=obs_setup.get_logger(),
    trace_collector=obs_setup.get_trace_collector()
)

# 5. Start monitoring
await synthetic_monitor.start_monitoring()

# Expected: Full observability stack with dashboards, alerts, and evaluation

Impact: Production-ready monitoring with 99.9% uptime visibility and quality assurance!


Full Observability Implementation Guide

Architecture Overview

Core Components

1. Observability Setup

The main orchestrator that sets up all observability components.

from packages.observability import ObservabilitySetup

# Initialize complete observability stack
obs_setup = ObservabilitySetup(service_name="recoagent-rag")

# Setup FastAPI application
obs_setup.setup_fastapi(app)

# Get individual components
metrics_collector = obs_setup.get_metrics_collector()
trace_collector = obs_setup.get_trace_collector()
logger = obs_setup.get_logger()

Features:

  • OpenTelemetry Integration: Automatic instrumentation
  • Correlation IDs: Request tracing across services
  • Health Checks: Built-in health monitoring
  • Metrics Endpoint: Prometheus metrics exposure
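
The correlation-ID feature can be sketched with just the standard library. This is an illustrative stand-in for whatever `ObservabilitySetup` does internally; `correlation_id`, `new_correlation_id`, and `annotate` are hypothetical names, not part of `packages.observability`:

```python
import contextvars
import uuid

# A context variable propagates across awaits, so every log/trace call made
# while handling one request sees the same ID.
correlation_id: contextvars.ContextVar[str] = contextvars.ContextVar(
    "correlation_id", default=""
)

def new_correlation_id() -> str:
    """Generate and bind a fresh correlation ID (one per incoming request)."""
    cid = uuid.uuid4().hex
    correlation_id.set(cid)
    return cid

def annotate(log_record: dict) -> dict:
    """Attach the current correlation ID to a structured log record."""
    return {**log_record, "correlation_id": correlation_id.get()}

cid = new_correlation_id()
record = annotate({"event": "rag_query", "latency_ms": 850})
assert record["correlation_id"] == cid
```

In a real service, middleware would call `new_correlation_id()` at the start of each request and echo the ID back in a response header.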

2. Metrics Collection

Comprehensive metrics for RAG system monitoring.

from packages.observability import MetricsCollector

# Initialize metrics collector
metrics = MetricsCollector()

# Track RAG-specific metrics
metrics.record_rag_query(
    query_id="query_123",
    latency_ms=850.0,
    success=True,
    model="gpt-4",
    tokens_used=150
)

# Track business metrics
metrics.record_user_interaction(
    user_id="user_456",
    action="query",
    session_id="session_789"
)

# Get metrics summary
summary = metrics.get_metrics_summary()
print(f"Total queries: {summary.total_queries}")
print(f"Average latency: {summary.avg_latency_ms}ms")
print(f"Success rate: {summary.success_rate:.2%}")

Key Metrics:

  • Performance: Latency, throughput, error rates
  • Business: User interactions, query patterns
  • Quality: RAG evaluation scores
  • Infrastructure: CPU, memory, disk usage
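
To make those summary fields concrete, here is a minimal stdlib sketch of what a collector computes from raw samples. `SimpleMetrics` is a toy stand-in for illustration, not the real `MetricsCollector`:

```python
import statistics
from dataclasses import dataclass, field

@dataclass
class QuerySample:
    latency_ms: float
    success: bool

@dataclass
class SimpleMetrics:
    """Toy in-memory collector mirroring the summary fields above."""
    samples: list = field(default_factory=list)

    def record(self, latency_ms: float, success: bool) -> None:
        self.samples.append(QuerySample(latency_ms, success))

    @property
    def success_rate(self) -> float:
        return sum(s.success for s in self.samples) / len(self.samples)

    @property
    def p95_latency_ms(self) -> float:
        latencies = sorted(s.latency_ms for s in self.samples)
        # the 19th of 20 quantile cut points is the 95th percentile
        return statistics.quantiles(latencies, n=20)[18]

m = SimpleMetrics()
for ms in range(100, 2100, 100):   # 20 samples: 100..2000 ms, one failure
    m.record(latency_ms=ms, success=ms < 2000)
print(f"success rate: {m.success_rate:.0%}, p95: {m.p95_latency_ms:.0f} ms")
# → success rate: 95%, p95: 1995 ms
```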

3. Distributed Tracing

End-to-end request tracing with OpenTelemetry.

from packages.observability import TraceCollector, RAGTracer

# Initialize trace collector
trace_collector = TraceCollector(service_name="recoagent-rag")

# Start a trace
with trace_collector.start_span("rag_query_processing") as span:
    span.set_attribute("query", "What is machine learning?")
    span.set_attribute("user_id", "user_123")

    # Process query
    result = await rag_agent.process_query(query)

    span.set_attribute("response_length", len(result["response"]))
    span.set_attribute("tokens_used", result["tokens_used"])

# RAG-specific tracer
rag_tracer = RAGTracer(trace_collector)

# Trace RAG pipeline
with rag_tracer.trace_rag_pipeline(query) as pipeline_span:
    # Retrieval phase
    with rag_tracer.trace_retrieval() as retrieval_span:
        docs = await retriever.retrieve(query)
        retrieval_span.set_attribute("documents_found", len(docs))

    # Generation phase
    with rag_tracer.trace_generation() as generation_span:
        response = await generator.generate(query, docs)
        generation_span.set_attribute("response_tokens", response["tokens"])

4. SLO Monitoring

Service Level Objectives with burn-rate alerting.

from packages.observability import (
    RAGSLORegistry,
    RAGSLODefinition,
    SLOThreshold,
    BurnRateRule,
)

# Initialize SLO registry
slo_registry = RAGSLORegistry(metrics_collector)

# Define custom SLO
custom_slo = RAGSLODefinition(
    name="response_quality",
    description="RAG response quality score",
    metric_name="rag_response_quality_score",
    metric_query="avg(rag_response_quality_score)",
    thresholds=[
        SLOThreshold(
            name="target",
            target=0.85,  # 85% quality target
            window="5m",
            description="85% of responses should meet quality standards",
            severity="warning"
        )
    ],
    burn_rate_rules=[
        BurnRateRule(
            name="quality_degradation",
            slo_name="response_quality",
            window="10m",
            threshold=2.0,  # 2x burn rate
            severity="critical",
            description="Quality degradation detected",
            runbook_url="https://docs.recoagent.com/runbooks/quality-degradation"
        )
    ]
)

# Register SLO
slo_registry.register_slo(custom_slo)

# Check SLO status
slo_status = slo_registry.get_slo_status("response_quality")
print(f"SLO Status: {slo_status.status}")
print(f"Current Value: {slo_status.current_value}")
print(f"Error Budget: {slo_status.error_budget_remaining}")

Default SLOs:

  • Query Success Rate: 99% success rate target
  • Response Latency: P95 < 2 seconds
  • Response Quality: 85% quality score
  • Availability: 99.9% uptime
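
Behind these SLOs, burn rate is simply the observed error rate divided by the error-budget rate the SLO allows. A minimal sketch of the arithmetic (illustrative helpers, not the `RAGSLORegistry` API):

```python
def error_budget(slo_target: float) -> float:
    """Fraction of requests allowed to fail, e.g. 0.001 for a 99.9% SLO."""
    return 1.0 - slo_target

def burn_rate(observed_error_rate: float, slo_target: float) -> float:
    """How fast the budget is consumed: 1.0 means it lasts exactly the
    SLO window, 2.0 means it is gone in half the window."""
    return observed_error_rate / error_budget(slo_target)

def budget_exhausted_in_days(rate: float, window_days: float = 30.0) -> float:
    """Days until the budget runs out at the current burn rate."""
    return window_days / rate

# 0.5% errors against a 99.9% target burns the budget 5x too fast,
# exhausting a 30-day budget in about 6 days.
rate = burn_rate(observed_error_rate=0.005, slo_target=0.999)
print(f"burn rate {rate:.1f}x, budget gone in {budget_exhausted_in_days(rate):.0f} days")
# → burn rate 5.0x, budget gone in 6 days
```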

5. Synthetic Monitoring

Automated testing and monitoring of your RAG system.

from packages.observability import SyntheticMonitor, SyntheticTest

# Initialize synthetic monitor
synthetic_monitor = SyntheticMonitor(
    base_url="http://localhost:8000",
    metrics_collector=metrics_collector,
    logger=logger,
    trace_collector=trace_collector
)

# Define custom test
custom_test = SyntheticTest(
    name="rag_query_test",
    description="Test RAG query endpoint",
    endpoint="/api/query",
    method="POST",
    payload={
        "query": "What is artificial intelligence?",
        "user_id": "synthetic_user"
    },
    expected_status=200,
    expected_latency_ms=5000,
    frequency_seconds=300  # Every 5 minutes
)

# Add test
synthetic_monitor.add_test(custom_test)

# Start monitoring
await synthetic_monitor.start_monitoring()

# Check test results
results = synthetic_monitor.get_test_results("rag_query_test")
for result in results[-10:]:  # Last 10 results
    print(f"Test: {result.test_name}")
    print(f"Success: {result.success}")
    print(f"Latency: {result.latency_ms}ms")

Default Tests:

  • Health Check: Basic endpoint availability
  • RAG Query: End-to-end query processing
  • Authentication: Auth endpoint testing
  • Database: Database connectivity

6. RAG Evaluation Integration

Real-time RAG quality evaluation with RAGAS and TruLens.

from packages.observability import RAGEvaluationPipeline, RagasEvaluator, TruLensEvaluator

# Initialize evaluation pipeline
eval_pipeline = RAGEvaluationPipeline(
    metrics_collector=metrics_collector,
    logger=logger
)

# Setup RAGAS evaluator
ragas_evaluator = RagasEvaluator(
    metrics=[
        "context_precision",
        "context_recall",
        "faithfulness",
        "answer_relevancy"
    ]
)

# Setup TruLens evaluator
trulens_evaluator = TruLensEvaluator(
    feedback_functions=[
        "answer_relevance",
        "context_relevance",
        "groundedness"
    ]
)

# Add evaluators to pipeline
eval_pipeline.add_evaluator(ragas_evaluator)
eval_pipeline.add_evaluator(trulens_evaluator)

# Evaluate RAG response
evaluation_result = await eval_pipeline.evaluate_response(
    query="What is machine learning?",
    answer="Machine learning is a subset of AI...",
    context=["ML is a method of data analysis...", "AI includes ML and other techniques..."],
    ground_truth="Machine learning is a subset of artificial intelligence..."
)

# Record evaluation metrics
eval_pipeline.record_evaluation_metrics(evaluation_result)

# Get evaluation dashboard data
dashboard_data = eval_pipeline.get_evaluation_dashboard_data()
print(f"Average Quality Score: {dashboard_data['avg_quality_score']:.2f}")
print(f"Context Precision: {dashboard_data['context_precision']:.2f}")
print(f"Faithfulness: {dashboard_data['faithfulness']:.2f}")
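
RAGAS and TruLens score these dimensions with LLM-assisted judges, so they can't be reproduced in a few lines. As intuition for what a groundedness-style metric measures, here is a deliberately crude token-overlap proxy (not how either framework actually computes it):

```python
import re

def tokenize(text: str) -> set[str]:
    return set(re.findall(r"[a-z0-9]+", text.lower()))

def groundedness_proxy(answer: str, contexts: list[str]) -> float:
    """Fraction of answer tokens that appear anywhere in the retrieved
    context -- a crude stand-in for LLM-judged faithfulness."""
    answer_tokens = tokenize(answer)
    context_tokens = set().union(*(tokenize(c) for c in contexts))
    if not answer_tokens:
        return 0.0
    return len(answer_tokens & context_tokens) / len(answer_tokens)

score = groundedness_proxy(
    "Machine learning is a subset of AI",
    ["ML is a method of data analysis", "AI includes ML and other techniques"],
)
print(f"{score:.2f}")  # → 0.57 (4 of 7 answer tokens grounded)
```

Real evaluators also account for paraphrase and entailment, which pure token overlap cannot capture.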

Complete Implementation

Step 1: Install Dependencies

# Install observability packages
pip install opentelemetry-api opentelemetry-sdk
pip install opentelemetry-instrumentation-fastapi
pip install opentelemetry-instrumentation-requests
pip install opentelemetry-instrumentation-psycopg2
pip install opentelemetry-instrumentation-redis
pip install opentelemetry-exporter-jaeger
pip install prometheus-client
pip install structlog

# Install evaluation frameworks (optional)
pip install ragas trulens-eval

# Install async HTTP client used by synthetic monitoring
# (asyncio ships with Python and needs no install)
pip install aiohttp

Step 2: Setup Infrastructure

# docker-compose.observability.yml
version: '3.8'
services:
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-storage:/var/lib/grafana

  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "14268:14268"
    environment:
      - COLLECTOR_OTLP_ENABLED=true

volumes:
  grafana-storage:

Step 3: Configure Prometheus

# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'recoagent-api'
    static_configs:
      - targets: ['host.docker.internal:8000']
    metrics_path: '/metrics/prometheus'
    scrape_interval: 5s

  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

rule_files:
  - "rules/*.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093

Step 4: Setup FastAPI Application

from fastapi import FastAPI
from pydantic import BaseModel
from packages.observability import ObservabilitySetup

# Request model for the query endpoint
class QueryRequest(BaseModel):
    query: str

# Create FastAPI app
app = FastAPI(title="RecoAgent API", version="1.0.0")

# Initialize observability
obs_setup = ObservabilitySetup(service_name="recoagent-rag")

# Setup observability for the app
obs_setup.setup_fastapi(app)

# Your existing routes
@app.post("/api/query")
async def query_endpoint(request: QueryRequest):
    # This will be automatically instrumented
    result = await rag_agent.process_query(request.query)
    return result

# Start the application
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Step 5: Setup SLO Monitoring

from packages.observability import RAGSLORegistry, RAGSLODefinition, SLOThreshold

# Initialize SLO registry
slo_registry = RAGSLORegistry(obs_setup.get_metrics_collector())

# Initialize default SLOs
slo_registry.initialize_default_slos()

# Add custom SLOs for your use case
custom_slo = RAGSLODefinition(
    name="business_metric_slo",
    description="Custom business metric SLO",
    metric_name="business_metric",
    metric_query="avg(business_metric)",
    thresholds=[
        SLOThreshold(
            name="target",
            target=0.90,
            window="5m",
            description="90% target for business metric",
            severity="warning"
        )
    ]
)

slo_registry.register_slo(custom_slo)

Step 6: Setup Synthetic Monitoring

import asyncio
from packages.observability import SyntheticMonitor

async def setup_synthetic_monitoring():
    # Initialize synthetic monitor
    synthetic_monitor = SyntheticMonitor(
        base_url="http://localhost:8000",
        metrics_collector=obs_setup.get_metrics_collector(),
        logger=obs_setup.get_logger(),
        trace_collector=obs_setup.get_trace_collector()
    )

    # Start monitoring
    await synthetic_monitor.start_monitoring()

    # Keep running until interrupted
    try:
        while True:
            await asyncio.sleep(60)
    except KeyboardInterrupt:
        await synthetic_monitor.stop_monitoring()

# Run synthetic monitoring
asyncio.run(setup_synthetic_monitoring())

Step 7: Setup RAG Evaluation

import asyncio
from packages.observability import RAGEvaluationPipeline

# Evaluators are optional extras; guard the imports so a missing
# framework doesn't crash startup
try:
    from packages.observability import RagasEvaluator
    RAGAS_AVAILABLE = True
except ImportError:
    RAGAS_AVAILABLE = False

try:
    from packages.observability import TruLensEvaluator
    TRULENS_AVAILABLE = True
except ImportError:
    TRULENS_AVAILABLE = False

async def setup_evaluation():
    # Initialize evaluation pipeline
    eval_pipeline = RAGEvaluationPipeline(
        metrics_collector=obs_setup.get_metrics_collector(),
        logger=obs_setup.get_logger()
    )

    # Setup evaluators
    if RAGAS_AVAILABLE:
        eval_pipeline.add_evaluator(RagasEvaluator())
    if TRULENS_AVAILABLE:
        eval_pipeline.add_evaluator(TruLensEvaluator())

    # Start evaluation pipeline
    await eval_pipeline.start_evaluation_pipeline()

asyncio.run(setup_evaluation())

Grafana Dashboards

RAG Performance Dashboard

{
  "dashboard": {
    "title": "RAG Performance Dashboard",
    "panels": [
      {
        "title": "Query Success Rate",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(rate(rag_queries_total{status=\"success\"}[5m])) / sum(rate(rag_queries_total[5m]))"
          }
        ]
      },
      {
        "title": "Response Latency",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(rag_query_duration_seconds_bucket[5m]))"
          }
        ]
      },
      {
        "title": "Quality Scores",
        "type": "graph",
        "targets": [
          {
            "expr": "avg(rag_response_quality_score)"
          }
        ]
      }
    ]
  }
}

SLO Dashboard

{
  "dashboard": {
    "title": "SLO Dashboard",
    "panels": [
      {
        "title": "Error Budget Burn Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(slo_error_budget_burn_rate[1h])"
          }
        ]
      },
      {
        "title": "SLO Status",
        "type": "table",
        "targets": [
          {
            "expr": "slo_status"
          }
        ]
      }
    ]
  }
}

Alerting Rules

Prometheus Alert Rules

# rules/rag-alerts.yml
groups:
  - name: rag-alerts
    rules:
      - alert: HighErrorRate
        expr: rate(rag_queries_total{status="error"}[5m]) > 0.1
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} errors per second"

      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(rag_query_duration_seconds_bucket[5m])) > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "P95 latency is {{ $value }} seconds"

      - alert: SLOBurnRate
        expr: rate(slo_error_budget_burn_rate[5m]) > 2
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "SLO burn rate exceeded"
          description: "Error budget burn rate is {{ $value }}"

Best Practices

1. Metrics Design

  • Use consistent naming: Follow Prometheus naming conventions
  • Include labels: Add relevant dimensions for filtering
  • Avoid high cardinality: Don't use user IDs as labels
  • Use histograms: For latency and size metrics
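
One simple way to enforce the cardinality rule is to map unbounded label values onto a fixed vocabulary before they reach the metrics library. `bounded_label` is an illustrative helper, not a Prometheus client API:

```python
def bounded_label(value: str, allowed: frozenset[str]) -> str:
    """Collapse unbounded label values (user IDs, raw queries) onto a
    fixed vocabulary so time-series cardinality stays bounded."""
    return value if value in allowed else "other"

# Allow-list of label values we actually want as separate time series
MODELS = frozenset({"gpt-4", "gpt-4o-mini", "claude-3"})

assert bounded_label("gpt-4", MODELS) == "gpt-4"
assert bounded_label("user_12345", MODELS) == "other"  # high-cardinality value collapsed
```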

2. SLO Definition

  • Set realistic targets: Based on historical data
  • Define error budgets: Allow for planned downtime
  • Use multiple windows: Short and long-term monitoring
  • Create runbooks: Document response procedures
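
The "multiple windows" advice usually means pairing a long window (is the problem sustained?) with a short one (is it still happening?), as in the Google SRE Workbook's multiwindow burn-rate alerts. A sketch of the combination logic; `multiwindow_alert` is an illustrative helper, not part of `packages.observability`:

```python
def multiwindow_alert(short_window_burn: float,
                      long_window_burn: float,
                      threshold: float) -> bool:
    """Fire only when BOTH windows burn too fast: the long window proves
    the problem is sustained, the short window proves it is ongoing."""
    return short_window_burn > threshold and long_window_burn > threshold

# e.g. the SRE Workbook pairs a 1h long window with a 5m short window at a
# 14.4x threshold (2% of a 30-day budget burned in one hour).
assert multiwindow_alert(14.5, 14.6, threshold=14.4) is True
assert multiwindow_alert(14.5, 14.2, threshold=14.4) is False  # not sustained enough to page
```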

3. Alerting Strategy

  • Avoid alert fatigue: Set appropriate thresholds
  • Use severity levels: Critical, warning, info
  • Include context: Add relevant information to alerts
  • Test alerts: Regularly verify alert functionality

4. Synthetic Testing

  • Test critical paths: Focus on important user journeys
  • Use realistic data: Test with production-like data
  • Monitor from multiple locations: Test from different regions
  • Regular updates: Keep tests current with application changes

Troubleshooting

| Problem | Cause | Solution |
|---|---|---|
| Metrics not appearing | Prometheus not scraping | Check scrape configuration |
| High cardinality | Too many label values | Reduce label cardinality |
| Missing traces | Jaeger not receiving data | Check OTLP endpoint |
| SLO alerts not firing | Incorrect query | Validate SLO queries |
| Synthetic tests failing | Network issues | Check connectivity |

What You've Accomplished

  • Implemented a comprehensive observability stack
  • Set up OpenTelemetry tracing and metrics
  • Configured SLO monitoring with burn-rate alerting
  • Deployed synthetic monitoring with automated endpoint tests
  • Integrated RAG evaluation with real-time dashboards
  • Created Grafana dashboards and alerting rules

Your RAG system now has enterprise-grade observability! Monitor, trace, and optimize with confidence. 📊✨