
RAG Integration Guide

Step-by-Step Implementation Guide for Production RAG Systems


🎯 Overview

This guide provides step-by-step instructions for implementing a production-ready RAG system, from basic setup through advanced optimizations, with practical examples and best practices.

Implementation Paths

  • 🚀 Quick Start: Basic RAG system in 30 minutes
  • ⚡ Advanced Setup: Full-featured system in 2-4 hours
  • 🏢 Enterprise Setup: Complete enterprise deployment in 1-2 days

🚀 Quick Start (30 minutes)

Prerequisites

  • Python 3.8+ installed
  • Basic understanding of Python and APIs
  • Access to at least one LLM provider (OpenAI, Anthropic, or Google)
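Before starting, a quick pre-flight check can save debugging time later. This is a standalone convenience sketch, not part of RecoAgent; the key names match the environment variables used throughout this guide.

```python
import os
import sys

def check_prerequisites():
    """Verify the Python version and that at least one provider key is set."""
    if sys.version_info < (3, 8):
        raise RuntimeError("Python 3.8+ is required")
    keys = ["OPENAI_API_KEY", "ANTHROPIC_API_KEY", "GOOGLE_API_KEY"]
    available = [k for k in keys if os.getenv(k)]
    if not available:
        raise RuntimeError("Set at least one provider API key")
    return available
```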

Step 1: Install Dependencies

# Create virtual environment
python -m venv rag-env
source rag-env/bin/activate # On Windows: rag-env\Scripts\activate

# Install core dependencies
pip install recoagent
pip install openai anthropic google-generativeai
pip install redis # For caching

Step 2: Basic Configuration

Create config.py:

import os

from recoagent import RAGSystem
from recoagent.config import RAGConfig

# Basic configuration
config = RAGConfig(
    # LLM Provider
    llm_provider="openai",
    openai_api_key=os.getenv("OPENAI_API_KEY"),

    # Vector Store
    vector_store="chroma",
    vector_store_path="./vector_db",

    # Basic settings
    chunk_size=1000,
    chunk_overlap=200,
    top_k=5,
    temperature=0.7,
)

# Initialize RAG system
rag = RAGSystem(config)

Step 3: Load Documents

# Load documents
documents = [
    "Your document content here...",
    "Another document...",
    # Add more documents
]

# Process and index documents
rag.add_documents(documents)
print("Documents indexed successfully!")

Step 4: Basic Query

# Simple query
query = "What is the main topic of the documents?"
response = rag.query(query)
print(f"Answer: {response['answer']}")
print(f"Sources: {response['sources']}")

Step 5: Test Your System

# Test with different queries
test_queries = [
    "Summarize the key points",
    "What are the main benefits?",
    "How does this work?",
]

for query in test_queries:
    response = rag.query(query)
    print(f"Q: {query}")
    print(f"A: {response['answer']}\n")

🎉 Congratulations! You now have a basic RAG system running.


⚡ Advanced Setup (2-4 hours)

Step 1: Multi-LLM Provider Setup

Update config.py:

from recoagent.config import RAGConfig, MultiLLMConfig

# Multi-LLM configuration
multi_llm_config = MultiLLMConfig(
    providers={
        "openai": {
            "api_key": os.getenv("OPENAI_API_KEY"),
            "model": "gpt-4",
            "cost_per_token": 0.01,
        },
        "anthropic": {
            "api_key": os.getenv("ANTHROPIC_API_KEY"),
            "model": "claude-3-sonnet-20240229",
            "cost_per_token": 0.003,
        },
        "google": {
            "api_key": os.getenv("GOOGLE_API_KEY"),
            "model": "gemini-pro",
            "cost_per_token": 0.0005,
        },
    },
    routing_strategy="cost_optimized",  # or "quality", "latency", "manual"
    fallback_enabled=True,
)

config = RAGConfig(
    multi_llm=multi_llm_config,
    # ... other settings
)
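To make the `routing_strategy` options concrete, here is a minimal sketch of what cost-optimized routing with fallback might look like. This illustrates the idea only; it is not RecoAgent's actual implementation.

```python
def pick_provider(providers, healthy, fallback_enabled=True):
    """Pick the cheapest healthy provider, falling back to the next cheapest.

    providers: {name: {"cost_per_token": float, ...}}
    healthy:   set of provider names that passed their last health check
    """
    by_cost = sorted(providers, key=lambda n: providers[n]["cost_per_token"])
    for name in by_cost:
        if name in healthy:
            return name
        if not fallback_enabled:
            # Manual/no-fallback mode: only the cheapest provider is tried.
            break
    return None
```

With the per-token costs above, Google is tried first; if it is unhealthy, routing falls back to Anthropic, then OpenAI.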

Step 2: Advanced Retrieval Setup

from recoagent.retrieval import HybridRetriever, ColBERTReranker

# Configure hybrid retrieval
retriever = HybridRetriever(
    vector_store="chroma",
    bm25_weight=0.3,
    semantic_weight=0.7,
    top_k=20,
)

# Add ColBERT reranking
reranker = ColBERTReranker(
    model_name="colbert-ir/colbertv2.0",
    top_k=5,
)

config.retriever = retriever
config.reranker = reranker

Step 3: Prompt Compression

from recoagent.optimization import PromptCompressor

# Configure prompt compression
compressor = PromptCompressor(
    model_name="microsoft/llmlingua-2-xlm-roberta-large-meetingbank",
    compression_ratio=0.5,   # 50% compression
    quality_threshold=0.9,   # 90% quality preservation
)

config.prompt_compressor = compressor

Step 4: Semantic Caching

from recoagent.caching import SemanticCache

# Configure semantic caching
cache = SemanticCache(
    backend="redis",
    host="localhost",
    port=6379,
    similarity_threshold=0.85,
    ttl=3600,  # 1 hour
)

config.cache = cache
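The `similarity_threshold` controls when a cached answer is reused: the incoming query's embedding is compared against cached query embeddings, and a hit requires cosine similarity at or above the threshold. A toy sketch of the idea, where the character-count `embed` stands in for a real sentence encoder:

```python
import math

def embed(text):
    # Toy bag-of-letters embedding; a real cache would use a sentence encoder.
    vec = [0.0] * 26
    for ch in text.lower():
        if ch.isalpha() and ch.isascii():
            vec[ord(ch) - ord("a")] += 1.0
    return vec

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

class ToySemanticCache:
    def __init__(self, similarity_threshold=0.85):
        self.threshold = similarity_threshold
        self.entries = []  # list of (embedding, answer) pairs

    def put(self, query, answer):
        self.entries.append((embed(query), answer))

    def get(self, query):
        q = embed(query)
        best = max(self.entries, key=lambda e: cosine(q, e[0]), default=None)
        if best and cosine(q, best[0]) >= self.threshold:
            return best[1]  # near-duplicate query: reuse the cached answer
        return None         # cache miss: fall through to the full RAG pipeline
```

A higher threshold trades cache hit rate for safety: 0.85 reuses answers for near-duplicate phrasings while sending genuinely new questions through the pipeline.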

Step 5: Advanced RAG System

# Initialize advanced RAG system
rag = RAGSystem(config)

# Load documents with metadata
documents_with_metadata = [
    {
        "content": "Document content...",
        "metadata": {
            "title": "Document Title",
            "author": "Author Name",
            "date": "2024-01-01",
            "category": "Technical",
        },
    }
]

rag.add_documents(documents_with_metadata)

Step 6: Advanced Querying

# Advanced query with options
response = rag.query(
    query="What are the main benefits?",
    options={
        "use_cache": True,
        "compression_enabled": True,
        "reranking_enabled": True,
        "max_tokens": 1000,
        "temperature": 0.7,
    },
)

print(f"Answer: {response['answer']}")
print(f"Sources: {response['sources']}")
print(f"Metadata: {response['metadata']}")
print(f"Cost: ${response['cost']}")
print(f"Latency: {response['latency']}ms")

๐Ÿข Enterprise Setup (1-2 days)โ€‹

Step 1: Infrastructure Setupโ€‹

Docker Compose Configurationโ€‹

Create docker-compose.yml:

version: '3.8'

services:
  rag-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - VECTOR_DB_URL=postgresql://postgres:password@postgres:5432/vectordb
    depends_on:
      - redis
      - postgres
      - elasticsearch

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: vectordb
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data

  elasticsearch:
    image: elasticsearch:8.8.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data

  monitoring:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

volumes:
  redis_data:
  postgres_data:
  es_data:

Step 2: Enterprise Configuration

Create enterprise_config.py:

import os

from recoagent.config import EnterpriseRAGConfig, MultiLLMConfig
from recoagent.security import SecurityConfig
from recoagent.monitoring import MonitoringConfig

# Enterprise configuration
config = EnterpriseRAGConfig(
    # Multi-LLM with failover
    multi_llm=MultiLLMConfig(
        providers={
            "openai": {"api_key": os.getenv("OPENAI_API_KEY")},
            "anthropic": {"api_key": os.getenv("ANTHROPIC_API_KEY")},
            "google": {"api_key": os.getenv("GOOGLE_API_KEY")},
        },
        routing_strategy="cost_optimized",
        health_check_interval=30,
        failover_threshold=3,
    ),

    # Security configuration
    security=SecurityConfig(
        authentication_required=True,
        rate_limiting_enabled=True,
        data_encryption=True,
        audit_logging=True,
    ),

    # Monitoring configuration
    monitoring=MonitoringConfig(
        metrics_enabled=True,
        tracing_enabled=True,
        alerting_enabled=True,
        dashboard_url="http://localhost:3000",
    ),

    # Advanced features
    prompt_compression=True,
    semantic_caching=True,
    colbert_reranking=True,
    dspy_optimization=True,
)

Step 3: API Server Setup

Create app.py:

import logging
from datetime import datetime
from typing import Any, Dict, List, Optional

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from pydantic import BaseModel
from recoagent import EnterpriseRAGSystem

from enterprise_config import config

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Request models (fields inferred from the handlers below)
class QueryRequest(BaseModel):
    query: str
    options: Optional[Dict[str, Any]] = None

class DocumentRequest(BaseModel):
    documents: List[str]
    metadata: Optional[Dict[str, Any]] = None

# Initialize FastAPI app
app = FastAPI(
    title="RAG API",
    description="Production RAG System API",
    version="1.0.0",
)

# Add middleware (tighten the wildcard origins/hosts before going to production)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["*"],
)

# Initialize RAG system
rag_system = EnterpriseRAGSystem(config)

@app.post("/query")
async def query_documents(request: QueryRequest):
    """Query documents and get AI-generated responses."""
    try:
        response = await rag_system.query_async(
            query=request.query,
            options=request.options,
        )
        return response
    except Exception as e:
        logger.error(f"Query error: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/documents")
async def add_documents(request: DocumentRequest):
    """Add documents to the knowledge base."""
    try:
        result = await rag_system.add_documents_async(
            documents=request.documents,
            metadata=request.metadata,
        )
        return {"status": "success", "documents_added": result}
    except Exception as e:
        logger.error(f"Document addition error: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "timestamp": datetime.utcnow()}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Step 4: Monitoring Setup

Create monitoring.py:

from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Metrics
query_counter = Counter('rag_queries_total', 'Total number of queries')
query_duration = Histogram('rag_query_duration_seconds', 'Query duration')
cache_hit_rate = Gauge('rag_cache_hit_rate', 'Cache hit rate')
cost_per_query = Histogram('rag_cost_per_query', 'Cost per query')

class MetricsCollector:
    def __init__(self):
        self.query_count = 0
        self.cache_hits = 0
        self.total_cost = 0.0

    def record_query(self, duration, cost, cache_hit=False):
        query_counter.inc()
        query_duration.observe(duration)
        cost_per_query.observe(cost)

        if cache_hit:
            self.cache_hits += 1

        self.query_count += 1
        self.total_cost += cost

        # Update cache hit rate
        cache_hit_rate.set(self.cache_hits / self.query_count)

# Start metrics server
start_http_server(8001)

Step 5: Deployment

Kubernetes Deployment

Create k8s-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: rag-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: rag-api
  template:
    metadata:
      labels:
        app: rag-api
    spec:
      containers:
        - name: rag-api
          image: rag-api:latest
          ports:
            - containerPort: 8000
          env:
            - name: REDIS_URL
              value: "redis://redis-service:6379"
            - name: VECTOR_DB_URL
              value: "postgresql://postgres:password@postgres-service:5432/vectordb"
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: rag-api-service
spec:
  selector:
    app: rag-api
  ports:
    - port: 80
      targetPort: 8000
  type: LoadBalancer

🔧 Configuration Options

Environment Variables

# LLM Providers
OPENAI_API_KEY=your_openai_key
ANTHROPIC_API_KEY=your_anthropic_key
GOOGLE_API_KEY=your_google_key

# Database
REDIS_URL=redis://localhost:6379
VECTOR_DB_URL=postgresql://user:password@localhost:5432/vectordb

# Monitoring
PROMETHEUS_URL=http://localhost:9090
GRAFANA_URL=http://localhost:3000

# Security
JWT_SECRET=your_jwt_secret
ENCRYPTION_KEY=your_encryption_key

# Performance
MAX_CONCURRENT_QUERIES=100
CACHE_TTL=3600
RATE_LIMIT_PER_MINUTE=1000

Configuration Files

config.yaml:

rag:
  llm:
    providers:
      openai:
        api_key: ${OPENAI_API_KEY}
        model: gpt-4
        max_tokens: 2000
      anthropic:
        api_key: ${ANTHROPIC_API_KEY}
        model: claude-3-sonnet-20240229
        max_tokens: 2000
      google:
        api_key: ${GOOGLE_API_KEY}
        model: gemini-pro
        max_tokens: 2000

    routing:
      strategy: cost_optimized
      fallback_enabled: true
      health_check_interval: 30

  retrieval:
    vector_store: chroma
    embedding_model: sentence-transformers/all-MiniLM-L6-v2
    chunk_size: 1000
    chunk_overlap: 200
    top_k: 5

  optimization:
    prompt_compression: true
    semantic_caching: true
    colbert_reranking: true
    dspy_optimization: true

  security:
    authentication_required: true
    rate_limiting_enabled: true
    data_encryption: true
    audit_logging: true

  monitoring:
    metrics_enabled: true
    tracing_enabled: true
    alerting_enabled: true

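The `${VAR}` placeholders in config.yaml are shell-style references to the environment variables above; they must be expanded before the YAML is parsed. Whether RecoAgent's loader does this itself is not shown here, so this is a standalone sketch using only the standard library:

```python
import os
import re

def expand_env(text):
    """Replace ${VAR} with the value of VAR, leaving unset variables as-is."""
    return re.sub(
        r"\$\{(\w+)\}",
        lambda m: os.environ.get(m.group(1), m.group(0)),
        text,
    )

# Typical use (assuming PyYAML is installed):
# raw = open("config.yaml").read()
# settings = yaml.safe_load(expand_env(raw))
```

Leaving unset variables intact, rather than substituting an empty string, makes missing keys show up loudly at parse time instead of silently producing blank credentials.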
🧪 Testing Your Implementation

Unit Tests

Create test_rag.py:

import pytest
from recoagent import RAGSystem
from recoagent.config import RAGConfig

@pytest.fixture
def rag_system():
    config = RAGConfig(
        llm_provider="openai",
        openai_api_key="test_key",
        vector_store="chroma",
        vector_store_path=":memory:",
    )
    return RAGSystem(config)

def test_document_indexing(rag_system):
    """Test document indexing functionality."""
    documents = ["Test document 1", "Test document 2"]
    result = rag_system.add_documents(documents)
    assert result["status"] == "success"
    assert result["documents_added"] == 2

def test_query_processing(rag_system):
    """Test query processing functionality."""
    rag_system.add_documents(["This is a test document about AI."])
    response = rag_system.query("What is this document about?")
    assert "answer" in response
    assert "sources" in response
    assert len(response["sources"]) > 0

def test_cache_functionality(rag_system):
    """Test caching functionality."""
    query = "Test query"
    response1 = rag_system.query(query)
    response2 = rag_system.query(query)

    # Second query should be faster (cached)
    assert response2["latency"] < response1["latency"]

Integration Tests

import requests

def test_api_integration():
    """Test API integration."""
    base_url = "http://localhost:8000"

    # Test health check
    response = requests.get(f"{base_url}/health")
    assert response.status_code == 200

    # Test document addition
    doc_data = {
        "documents": ["Test document"],
        "metadata": {"title": "Test"},
    }
    response = requests.post(f"{base_url}/documents", json=doc_data)
    assert response.status_code == 200

    # Test query
    query_data = {"query": "What is this about?"}
    response = requests.post(f"{base_url}/query", json=query_data)
    assert response.status_code == 200
    assert "answer" in response.json()

Performance Tests

import time
import concurrent.futures

def test_performance(rag_system):
    """Test system performance under load (reuses the rag_system fixture)."""
    def make_query(query_id):
        start_time = time.time()
        response = rag_system.query(f"Test query {query_id}")
        end_time = time.time()
        return {
            "query_id": query_id,
            "latency": end_time - start_time,
            "success": "answer" in response,
        }

    # Test concurrent queries
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(make_query, i) for i in range(100)]
        results = [future.result() for future in futures]

    # Analyze results
    successful_queries = [r for r in results if r["success"]]
    avg_latency = sum(r["latency"] for r in successful_queries) / len(successful_queries)

    assert len(successful_queries) >= 95  # 95% success rate
    assert avg_latency < 2.0  # Average latency under 2 seconds

🚀 Deployment Checklist

Pre-deployment

  • All tests passing
  • Configuration validated
  • Security review completed
  • Performance benchmarks met
  • Documentation updated

Deployment

  • Infrastructure provisioned
  • Application deployed
  • Database migrations run
  • Monitoring configured
  • Health checks passing

Post-deployment

  • Smoke tests executed
  • Performance monitoring active
  • Alerting configured
  • Backup procedures tested
  • Rollback plan ready


Ready to deploy? Follow the deployment checklist and start with the Quick Start guide for immediate results.