Integrate with OpenSearch
Difficulty: ⭐⭐ Intermediate | Time: 1-1.5 hours
🎯 The Problem
You need production-grade vector storage with hybrid search capabilities, but setting up OpenSearch seems complex. You're not sure about cluster configuration, index mappings, or how to optimize for RAG workloads.
This guide solves: Getting OpenSearch fully integrated with RecoAgent for production use, including hybrid search, proper indexing, and performance tuning.
⚡ TL;DR - Quick Start
# 1. Start OpenSearch with docker-compose
docker-compose -f docker-compose.opensearch.yml up -d

# 2. Connect from Python
from packages.rag.stores import OpenSearchStore

store = OpenSearchStore(
    endpoint="http://localhost:9200",
    index_name="knowledge_base",
    hybrid_search=True  # Enable BM25 + vector search
)

# 3. Index documents
docs = ["Doc 1 content", "Doc 2 content"]
for i, doc in enumerate(docs):
    store.add_document(f"doc_{i}", doc)

# 4. Hybrid search
results = store.hybrid_search("query", k=5)
print(f"✅ Hybrid search working! Found {len(results)} results")
Expected: Hybrid search returns ranked results combining BM25 and vector scores!
Full Integration Guide
This guide will walk you through integrating RecoAgent with OpenSearch for vector storage, including setup, configuration, and optimization.
Why OpenSearch?
OpenSearch is an open-source search and analytics suite that provides excellent vector search capabilities. It's ideal for RecoAgent because it supports:
- Hybrid search combining BM25 and vector similarity (see the raw query sketch after this list)
- Scalability for large document collections
- Real-time indexing for dynamic content
- Rich querying with complex filters and aggregations
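To make the hybrid-search bullet concrete, here is roughly what such a query looks like at the OpenSearch level, using the official opensearch-py client directly. This is a minimal sketch, not necessarily the exact query RecoAgent's OpenSearchStore builds for you; the index and field names (knowledge_base, content, embedding) and the placeholder vector are assumptions matching the examples in this guide.

# raw_hybrid_query.py — illustrative sketch of a BM25 + kNN query
from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])
query_vector = [0.1] * 1536  # in practice, the embedded query text

response = client.search(
    index="knowledge_base",
    body={
        "size": 5,
        "query": {
            "bool": {
                "should": [
                    {"match": {"content": "hybrid retrieval"}},  # BM25 leg
                    {"knn": {"embedding": {"vector": query_vector, "k": 5}}},  # vector leg
                ]
            }
        },
    },
)
for hit in response["hits"]["hits"]:
    print(hit["_score"], hit["_source"].get("content", "")[:60])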
Prerequisites
- Python 3.8+
- RecoAgent installed
- Docker (for local setup) or OpenSearch cluster
- Basic understanding of search concepts
Step 1: OpenSearch Setup
Option A: Local Development with Docker
Create Docker Compose Configuration:
# docker-compose.opensearch.yml
version: '3.8'

services:
  opensearch:
    image: opensearchproject/opensearch:2.11.0
    container_name: opensearch
    environment:
      - discovery.type=single-node
      - "OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g"
      - "DISABLE_INSTALL_DEMO_CONFIG=true"
      - "DISABLE_SECURITY_PLUGIN=true"
      - "plugins.security.disabled=true"
    ports:
      - 9200:9200
      - 9600:9600
    volumes:
      - opensearch-data:/usr/share/opensearch/data
      - ./opensearch.yml:/usr/share/opensearch/config/opensearch.yml
    networks:
      - opensearch-net

  opensearch-dashboards:
    image: opensearchproject/opensearch-dashboards:2.11.0
    container_name: opensearch-dashboards
    ports:
      - 5601:5601
    environment:
      - 'OPENSEARCH_HOSTS=["http://opensearch:9200"]'
      - "DISABLE_SECURITY_DASHBOARDS_PLUGIN=true"
    depends_on:
      - opensearch
    networks:
      - opensearch-net

volumes:
  opensearch-data:

networks:
  opensearch-net:
    driver: bridge
Start OpenSearch:
# Start OpenSearch cluster
docker-compose -f docker-compose.opensearch.yml up -d
# Verify cluster is running
curl -X GET "localhost:9200/_cluster/health?pretty"
# Expected output:
# {
# "cluster_name" : "docker-cluster",
# "status" : "green",
# "timed_out" : false,
# "number_of_nodes" : 1,
# "number_of_data_nodes" : 1,
# "active_primary_shards" : 0,
# "active_shards" : 0,
# "relocating_shards" : 0,
# "initializing_shards" : 0,
# "unassigned_shards" : 0,
# "delayed_unassigned_shards" : 0,
# "number_of_pending_tasks" : 0,
# "number_of_in_flight_fetch" : 0,
# "task_max_waiting_in_queue_millis" : 0,
# "active_shards_percent_as_number" : 100.0
# }
Option B: OpenSearch Cloud
AWS OpenSearch Service:
# Create OpenSearch domain using AWS CLI
aws opensearch create-domain \
  --domain-name recoagent-search \
  --cluster-config InstanceType=t3.small.search,InstanceCount=1 \
  --ebs-options EBSEnabled=true,VolumeType=gp2,VolumeSize=10 \
  --access-policies '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {"AWS": "*"},
        "Action": "es:*",
        "Resource": "arn:aws:es:us-east-1:123456789012:domain/recoagent-search/*"
      }
    ]
  }'
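Once the domain is active, connecting from Python requires SigV4 request signing. Below is a sketch using opensearch-py with requests-aws4auth; the domain endpoint is a placeholder, and note that the wide-open access policy above should be tightened for real deployments.

# aws_connect_sketch.py — sketch; the domain endpoint below is a placeholder
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

region = "us-east-1"
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    "es",  # service name for OpenSearch domains
    session_token=credentials.token,
)

client = OpenSearch(
    hosts=[{"host": "search-recoagent-search-abc123.us-east-1.es.amazonaws.com", "port": 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)
print(client.cluster.health()["status"])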
Self-Managed OpenSearch:
# Download OpenSearch
wget https://artifacts.opensearch.org/releases/bundle/opensearch/2.11.0/opensearch-2.11.0-linux-x64.tar.gz
tar -xzf opensearch-2.11.0-linux-x64.tar.gz
cd opensearch-2.11.0
# Configure OpenSearch
cat > config/opensearch.yml << EOF
cluster.name: recoagent-cluster
node.name: node-1
path.data: /var/lib/opensearch
path.logs: /var/log/opensearch
network.host: 0.0.0.0
discovery.type: single-node
plugins.security.disabled: true
EOF
# Start OpenSearch
./bin/opensearch
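Whichever option you chose, it's worth verifying connectivity from Python before wiring up RecoAgent. A minimal check with the opensearch-py client, assuming a local, security-disabled cluster as configured above:

# connectivity_check.py
from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}], use_ssl=False)
info = client.info()
health = client.cluster.health()
print(f"Connected to {info['version']['distribution']} {info['version']['number']}")
print(f"Cluster status: {health['status']}")  # expect "green" (or "yellow" with replicas > 0)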
Step 2: Configure RecoAgent for OpenSearch
Basic Configuration
# opensearch_config.py
import os
from dotenv import load_dotenv
from recoagent import RecoAgent
from recoagent.stores import OpenSearchStore

# Load environment variables
load_dotenv()

def setup_opensearch_agent():
    """Set up RecoAgent with OpenSearch."""
    # OpenSearch configuration
    opensearch_config = {
        "host": os.getenv("OPENSEARCH_HOST", "localhost"),
        "port": int(os.getenv("OPENSEARCH_PORT", "9200")),
        "index_name": os.getenv("OPENSEARCH_INDEX_NAME", "recoagent_kb"),
        "embedding_dimension": 1536,  # OpenAI ada-002 dimension
        "similarity": "cosine"
    }

    # Initialize RecoAgent with OpenSearch
    agent = RecoAgent(
        llm_provider="openai",
        llm_model="gpt-3.5-turbo",
        embedding_model="text-embedding-ada-002",
        vector_store_config=opensearch_config,
        chunk_size=500,
        chunk_overlap=50
    )

    return agent

# Test the configuration
if __name__ == "__main__":
    agent = setup_opensearch_agent()
    print("✅ RecoAgent configured with OpenSearch")
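The load_dotenv() call reads these settings from a .env file in the project root. A minimal example matching the defaults above; the values shown (including the OpenAI key placeholder) are illustrative:

# .env
OPENSEARCH_HOST=localhost
OPENSEARCH_PORT=9200
OPENSEARCH_INDEX_NAME=recoagent_kb
OPENAI_API_KEY=sk-your-key-here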
Advanced Configuration
# advanced_opensearch_config.py
from recoagent import RecoAgent
from recoagent.config import VectorStoreConfig

def setup_advanced_opensearch():
    """Set up advanced OpenSearch configuration."""
    # Advanced OpenSearch configuration
    vector_store_config = VectorStoreConfig(
        store_type="opensearch",
        host="localhost",
        port=9200,
        index_name="recoagent_advanced",
        embedding_dimension=1536,
        similarity="cosine",
        # Index settings
        index_settings={
            "number_of_shards": 2,
            "number_of_replicas": 1,
            "analysis": {
                "analyzer": {
                    "custom_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": ["lowercase", "stop", "snowball"]
                    }
                }
            }
        },
        # Mapping settings
        mapping_settings={
            "properties": {
                "content": {
                    "type": "text",
                    "analyzer": "custom_analyzer"
                },
                "embedding": {
                    "type": "knn_vector",
                    "dimension": 1536,
                    "method": {
                        "name": "hnsw",
                        "space_type": "cosinesimil",
                        "engine": "nmslib",
                        "parameters": {
                            "ef_construction": 128,
                            "m": 24
                        }
                    }
                },
                "metadata": {
                    "type": "object",
                    "properties": {
                        "source": {"type": "keyword"},
                        "created_at": {"type": "date"},
                        "category": {"type": "keyword"}
                    }
                }
            }
        },
        # Connection settings
        connection_settings={
            "timeout": 30,
            "max_retries": 3,
            "retry_on_timeout": True,
            "verify_certs": False,  # Set to True for production
            "ssl_show_warn": False
        }
    )

    # Initialize RecoAgent
    agent = RecoAgent(
        vector_store_config=vector_store_config,
        llm_provider="openai",
        embedding_model="text-embedding-ada-002"
    )

    return agent
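A note on the HNSW parameters: m controls graph connectivity and ef_construction the build-time candidate list size. Raising either generally improves recall at the cost of more memory and slower indexing, so the values above (m=24, ef_construction=128) are a sensible middle ground rather than hard requirements; tune them against your own recall and latency measurements.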
Step 3: Create and Configure Index
Basic Index Creation
# create_index.py
from recoagent.stores import OpenSearchStore

def create_basic_index():
    """Create a basic OpenSearch index for RecoAgent."""
    store = OpenSearchStore(
        host="localhost",
        port=9200,
        index_name="recoagent_basic",
        embedding_dimension=1536
    )

    # Create index with default settings
    store.create_index()
    print("✅ Basic index created successfully")

if __name__ == "__main__":
    create_basic_index()
Advanced Index Creation
# create_advanced_index.py
from recoagent.stores import OpenSearchStore

def create_advanced_index():
    """Create an advanced OpenSearch index with custom settings."""
    # Custom index settings
    index_settings = {
        "settings": {
            "number_of_shards": 2,
            "number_of_replicas": 1,
            "index.knn": True,
            "analysis": {
                "analyzer": {
                    "custom_text_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": [
                            "lowercase",
                            "stop",
                            "snowball",
                            "synonym_filter"
                        ]
                    }
                },
                "filter": {
                    "synonym_filter": {
                        "type": "synonym",
                        "synonyms": [
                            "car,automobile,vehicle",
                            "phone,mobile,cellphone"
                        ]
                    }
                }
            }
        },
        "mappings": {
            "properties": {
                "content": {
                    "type": "text",
                    "analyzer": "custom_text_analyzer",
                    "fields": {
                        "keyword": {
                            "type": "keyword"
                        }
                    }
                },
                "embedding": {
                    "type": "knn_vector",
                    "dimension": 1536,
                    "method": {
                        "name": "hnsw",
                        "space_type": "cosinesimil",
                        "engine": "nmslib",
                        "parameters": {
                            "ef_construction": 128,
                            "m": 24
                        }
                    }
                },
                "metadata": {
                    "type": "object",
                    "properties": {
                        "source": {"type": "keyword"},
                        "created_at": {"type": "date"},
                        "category": {"type": "keyword"},
                        "tags": {"type": "keyword"},
                        "priority": {"type": "integer"}
                    }
                }
            }
        }
    }

    store = OpenSearchStore(
        host="localhost",
        port=9200,
        index_name="recoagent_advanced",
        embedding_dimension=1536,
        index_settings=index_settings
    )

    # Create index with custom settings
    store.create_index()
    print("✅ Advanced index created successfully")

if __name__ == "__main__":
    create_advanced_index()
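After creation, it's a good habit to confirm that the k-NN mapping actually took effect, since a typo can silently fall back to a default dynamic mapping. A quick check against the raw client; this assumes store.client exposes the underlying opensearch-py client, as the optimization code in Step 6 does:

# verify_mapping.py — sketch
mapping = store.client.indices.get_mapping(index="recoagent_advanced")
embedding_type = mapping["recoagent_advanced"]["mappings"]["properties"]["embedding"]["type"]
assert embedding_type == "knn_vector", f"unexpected mapping: {embedding_type}"
print("✅ embedding field is mapped as knn_vector")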
Step 4: Load Documents
Basic Document Loading
# load_documents.py
import os
from dotenv import load_dotenv
from recoagent import RecoAgent

load_dotenv()

def load_sample_documents():
    """Load sample documents into OpenSearch."""
    # Initialize agent with OpenSearch
    agent = RecoAgent(
        llm_provider="openai",
        embedding_model="text-embedding-ada-002",
        vector_store_config={
            "store_type": "opensearch",
            "host": "localhost",
            "port": 9200,
            "index_name": "recoagent_basic"
        }
    )

    # Sample documents
    documents = [
        "RecoAgent is an enterprise RAG platform built with LangGraph and LangChain.",
        "It supports hybrid retrieval combining BM25 and vector search for better results.",
        "The platform includes built-in evaluation with RAGAS metrics for continuous improvement.",
        "OpenSearch integration provides scalable vector storage with real-time indexing.",
        "RecoAgent supports multiple vector stores including OpenSearch, Azure AI Search, and Vertex AI."
    ]

    # Add documents
    agent.add_documents(documents)
    print(f"✅ Loaded {len(documents)} documents into OpenSearch")

    return agent

if __name__ == "__main__":
    agent = load_sample_documents()

    # Test query
    response = agent.ask("What is RecoAgent?")
    print("Query: What is RecoAgent?")
    print(f"Answer: {response.answer}")
    print(f"Sources: {len(response.sources)} documents")
Advanced Document Loading with Metadata
# load_documents_advanced.py
from datetime import datetime
from dotenv import load_dotenv
from recoagent.stores import OpenSearchStore

load_dotenv()

def load_documents_with_metadata():
    """Load documents with rich metadata."""
    # Initialize OpenSearch store
    store = OpenSearchStore(
        host="localhost",
        port=9200,
        index_name="recoagent_advanced",
        embedding_dimension=1536
    )

    # Documents with metadata
    documents_with_metadata = [
        {
            "content": "RecoAgent provides enterprise-grade RAG capabilities with LangGraph orchestration.",
            "metadata": {
                "source": "product_docs.pdf",
                "created_at": datetime.now().isoformat(),
                "category": "product",
                "tags": ["rag", "enterprise", "langgraph"],
                "priority": 1
            }
        },
        {
            "content": "Hybrid retrieval combines BM25 keyword search with vector similarity for optimal results.",
            "metadata": {
                "source": "technical_spec.md",
                "created_at": datetime.now().isoformat(),
                "category": "technical",
                "tags": ["retrieval", "bm25", "vector", "search"],
                "priority": 2
            }
        },
        {
            "content": "OpenSearch integration enables scalable vector storage with real-time indexing capabilities.",
            "metadata": {
                "source": "integration_guide.md",
                "created_at": datetime.now().isoformat(),
                "category": "integration",
                "tags": ["opensearch", "vector", "storage", "scalability"],
                "priority": 1
            }
        }
    ]

    # Add documents with metadata
    for doc in documents_with_metadata:
        store.add_document(
            content=doc["content"],
            metadata=doc["metadata"]
        )

    print(f"✅ Loaded {len(documents_with_metadata)} documents with metadata")
    return store

if __name__ == "__main__":
    store = load_documents_with_metadata()

    # Test search with metadata filters
    results = store.search(
        query="What is RecoAgent?",
        top_k=5,
        filters={"category": "product"}
    )

    print(f"Found {len(results)} results for category 'product'")
    for result in results:
        print(f"- {result.document[:50]}... (Score: {result.score:.3f})")
Step 5: Implement Hybrid Search
Basic Hybrid Search
# hybrid_search.py
from recoagent.retrievers import HybridRetriever
from recoagent.stores import OpenSearchStore

def setup_hybrid_search():
    """Set up hybrid search with OpenSearch."""
    # Initialize OpenSearch store
    store = OpenSearchStore(
        host="localhost",
        port=9200,
        index_name="recoagent_advanced",
        embedding_dimension=1536
    )

    # Initialize hybrid retriever
    retriever = HybridRetriever(
        vector_store=store,
        vector_weight=0.7,  # 70% vector, 30% BM25
        bm25_weight=0.3,
        fusion_method="rrf",  # Reciprocal Rank Fusion
        top_k=20  # Retrieve top 20 from each method
    )

    return retriever

def test_hybrid_search():
    """Test hybrid search functionality."""
    retriever = setup_hybrid_search()

    # Test queries
    test_queries = [
        "What is RecoAgent?",
        "How does hybrid retrieval work?",
        "OpenSearch vector storage capabilities",
        "Enterprise RAG platform features"
    ]

    for query in test_queries:
        print(f"\n🔍 Query: {query}")
        results = retriever.search(query, top_k=5)

        print(f"Found {len(results)} results:")
        for i, result in enumerate(results, 1):
            print(f"  {i}. Score: {result.score:.3f} - {result.document[:60]}...")

if __name__ == "__main__":
    test_hybrid_search()
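For intuition about what fusion_method="rrf" does, here is Reciprocal Rank Fusion in a few lines. This is an illustrative sketch of the standard algorithm (score(d) = Σ 1/(k + rank_d)), not RecoAgent's internal implementation:

# rrf_sketch.py — illustrative only
def reciprocal_rank_fusion(ranked_lists, k=60):
    """Fuse ranked lists of doc IDs: each list contributes 1/(k + rank) per doc."""
    scores = {}
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

bm25_hits = ["doc_3", "doc_1", "doc_7"]
vector_hits = ["doc_1", "doc_5", "doc_3"]
print(reciprocal_rank_fusion([bm25_hits, vector_hits]))  # doc_1 and doc_3 rise to the top

Because RRF works on ranks rather than raw scores, it sidesteps the problem that BM25 and cosine scores live on incomparable scales, which is why it is a common default for hybrid fusion.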
Advanced Hybrid Search with Reranking
# advanced_hybrid_search.py
from recoagent.retrievers import HybridRetriever, CrossEncoderReranker
from recoagent.stores import OpenSearchStore

def setup_advanced_hybrid_search():
    """Set up advanced hybrid search with reranking."""
    # Initialize OpenSearch store
    store = OpenSearchStore(
        host="localhost",
        port=9200,
        index_name="recoagent_advanced",
        embedding_dimension=1536
    )

    # Initialize hybrid retriever
    retriever = HybridRetriever(
        vector_store=store,
        vector_weight=0.6,
        bm25_weight=0.4,
        fusion_method="rrf",
        top_k=50  # Retrieve more candidates for reranking
    )

    # Initialize cross-encoder reranker
    reranker = CrossEncoderReranker(
        model_name="cross-encoder/ms-marco-MiniLM-L-6-v2",
        top_k=10  # Keep top 10 after reranking
    )

    # Combine retriever and reranker
    retriever.add_reranker(reranker)

    return retriever

def test_advanced_search():
    """Test advanced hybrid search with reranking."""
    retriever = setup_advanced_hybrid_search()

    # Complex query
    query = "How can I implement enterprise RAG with RecoAgent and OpenSearch for scalable vector search?"
    print(f"🔍 Complex Query: {query}")

    results = retriever.search(query, top_k=5)

    print("\n📊 Search Results (with reranking):")
    for i, result in enumerate(results, 1):
        print(f"  {i}. Score: {result.score:.3f}")
        print(f"     {result.document[:80]}...")
        if hasattr(result, 'rerank_score'):
            print(f"     Rerank Score: {result.rerank_score:.3f}")
        print()

if __name__ == "__main__":
    test_advanced_search()
Step 6: Performance Optimization
Index Optimization
# optimize_index.py
import time
from recoagent.stores import OpenSearchStore

def optimize_opensearch_index():
    """Optimize OpenSearch index for better performance."""
    store = OpenSearchStore(
        host="localhost",
        port=9200,
        index_name="recoagent_advanced"
    )

    # Force merge segments for better performance
    print("🔄 Force merging segments...")
    store.client.indices.forcemerge(
        index=store.index_name,
        max_num_segments=1,
        wait_for_completion=True
    )

    # Refresh index to make changes visible
    print("🔄 Refreshing index...")
    store.client.indices.refresh(index=store.index_name)

    # Optimize index settings
    print("⚙️ Optimizing index settings...")
    store.client.indices.put_settings(
        index=store.index_name,
        body={
            "index": {
                "refresh_interval": "30s",  # Reduce refresh frequency
                "number_of_replicas": 0,  # No replicas for development
                "translog.durability": "async",  # Async durability for better performance
                "translog.sync_interval": "30s"
            }
        }
    )

    print("✅ Index optimization completed")

def benchmark_search_performance():
    """Benchmark search performance."""
    store = OpenSearchStore(
        host="localhost",
        port=9200,
        index_name="recoagent_advanced"
    )

    test_queries = [
        "What is RecoAgent?",
        "How does vector search work?",
        "OpenSearch integration benefits",
        "Enterprise RAG platform features",
        "Hybrid retrieval implementation"
    ]

    total_time = 0
    total_queries = len(test_queries)

    print("⏱️ Benchmarking search performance...")
    for query in test_queries:
        start_time = time.time()
        results = store.search(query, top_k=10)
        end_time = time.time()

        query_time = end_time - start_time
        total_time += query_time
        print(f"Query: '{query}' - {query_time:.3f}s - {len(results)} results")

    avg_time = total_time / total_queries
    print("\n📊 Performance Summary:")
    print(f"  Total queries: {total_queries}")
    print(f"  Total time: {total_time:.3f}s")
    print(f"  Average time per query: {avg_time:.3f}s")
    print(f"  Queries per second: {1/avg_time:.1f}")

if __name__ == "__main__":
    optimize_opensearch_index()
    benchmark_search_performance()
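The relaxed settings above (30s refresh, zero replicas, async translog) trade durability and freshness for indexing throughput, which is fine for development and bulk loads. Before serving production traffic, restore safer values; a sketch using the same store handle, with illustrative defaults rather than mandates:

# restore_production_settings.py — sketch
store.client.indices.put_settings(
    index=store.index_name,
    body={
        "index": {
            "refresh_interval": "1s",         # near-real-time visibility again
            "number_of_replicas": 1,          # redundancy against node failure
            "translog.durability": "request"  # fsync on every request
        }
    }
)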
Caching Implementation
# caching.py
import hashlib
import json
import time
from typing import Dict, List

import redis

from recoagent.stores import OpenSearchStore

class CachedOpenSearchStore(OpenSearchStore):
    """OpenSearch store with Redis caching."""

    def __init__(self, redis_url="redis://localhost:6379", cache_ttl=3600, **kwargs):
        super().__init__(**kwargs)
        self.redis_client = redis.from_url(redis_url)
        self.cache_ttl = cache_ttl

    def _get_cache_key(self, query: str, top_k: int, filters: Dict = None) -> str:
        """Generate cache key for query."""
        cache_data = {
            "query": query,
            "top_k": top_k,
            "filters": filters or {}
        }
        cache_string = json.dumps(cache_data, sort_keys=True)
        return f"opensearch:search:{hashlib.md5(cache_string.encode()).hexdigest()}"

    def search(self, query: str, top_k: int = 10, filters: Dict = None) -> List:
        """Search with caching."""
        # Check cache first
        cache_key = self._get_cache_key(query, top_k, filters)
        cached_result = self.redis_client.get(cache_key)

        if cached_result:
            print("📦 Cache hit!")
            # Note: cached hits deserialize to plain dicts, not result objects
            return json.loads(cached_result)

        print("🔍 Cache miss, searching OpenSearch...")

        # Perform search
        results = super().search(query, top_k, filters)

        # Cache results
        self.redis_client.setex(
            cache_key,
            self.cache_ttl,
            json.dumps([r.__dict__ for r in results], default=str)
        )

        return results

def test_cached_search():
    """Test cached search performance."""
    # Initialize cached store
    cached_store = CachedOpenSearchStore(
        host="localhost",
        port=9200,
        index_name="recoagent_advanced",
        embedding_dimension=1536,
        redis_url="redis://localhost:6379",
        cache_ttl=3600
    )

    query = "What is RecoAgent and how does it work?"

    # First search (cache miss)
    print("First search (cache miss):")
    start_time = time.time()
    results1 = cached_store.search(query, top_k=5)
    time1 = time.time() - start_time
    print(f"Time: {time1:.3f}s, Results: {len(results1)}")

    # Second search (cache hit)
    print("\nSecond search (cache hit):")
    start_time = time.time()
    results2 = cached_store.search(query, top_k=5)
    time2 = time.time() - start_time
    print(f"Time: {time2:.3f}s, Results: {len(results2)}")

    print(f"\nSpeedup: {time1/time2:.1f}x faster with caching")

if __name__ == "__main__":
    test_cached_search()
Step 7: Monitoring and Maintenance
Health Monitoring
# monitoring.py
import time
from datetime import datetime
from recoagent.stores import OpenSearchStore

class OpenSearchMonitor:
    """Monitor OpenSearch cluster health and performance."""

    def __init__(self, host="localhost", port=9200):
        self.store = OpenSearchStore(host=host, port=port)
        self.client = self.store.client

    def check_cluster_health(self):
        """Check cluster health status."""
        health = self.client.cluster.health()

        print(f"🏥 Cluster Health Check - {datetime.now().isoformat()}")
        print(f"  Status: {health['status']}")
        print(f"  Nodes: {health['number_of_nodes']}")
        print(f"  Active Shards: {health['active_shards']}")
        print(f"  Unassigned Shards: {health['unassigned_shards']}")

        if health['status'] != 'green':
            print(f"⚠️ Warning: Cluster status is {health['status']}")

        return health

    def check_index_stats(self, index_name):
        """Check index statistics."""
        stats = self.client.indices.stats(index=index_name)
        index_stats = stats['indices'][index_name]

        print(f"\n📊 Index Stats for {index_name}")
        print(f"  Documents: {index_stats['total']['docs']['count']}")
        print(f"  Size: {index_stats['total']['store']['size_in_bytes'] / 1024 / 1024:.1f} MB")
        print(f"  Indexing Rate: {index_stats['total']['indexing']['index_total']}")
        print(f"  Search Rate: {index_stats['total']['search']['query_total']}")

        return index_stats

    def check_query_performance(self, index_name, test_query="test query"):
        """Check query performance."""
        start_time = time.time()

        try:
            response = self.client.search(
                index=index_name,
                body={
                    "query": {"match": {"content": test_query}},
                    "size": 10
                }
            )
            end_time = time.time()
            query_time = end_time - start_time

            print("\n⚡ Query Performance")
            print(f"  Query Time: {query_time:.3f}s")
            print(f"  Results: {len(response['hits']['hits'])}")
            print(f"  Total Hits: {response['hits']['total']['value']}")

            return query_time
        except Exception as e:
            print(f"❌ Query failed: {e}")
            return None

    def run_health_check(self, index_name):
        """Run complete health check."""
        print("🔍 Running OpenSearch Health Check")
        print("=" * 50)

        # Check cluster health
        cluster_health = self.check_cluster_health()

        # Check index stats
        index_stats = self.check_index_stats(index_name)

        # Check query performance
        query_time = self.check_query_performance(index_name)

        # Summary
        print("\n📋 Health Check Summary")
        print(f"  Cluster Status: {cluster_health['status']}")
        print(f"  Index Documents: {index_stats['total']['docs']['count']}")
        print(f"  Query Time: {query_time:.3f}s" if query_time else "  Query Time: Failed")

        return {
            "cluster_health": cluster_health,
            "index_stats": index_stats,
            "query_time": query_time
        }

def main():
    """Run monitoring checks."""
    monitor = OpenSearchMonitor()

    # Run health check
    results = monitor.run_health_check("recoagent_advanced")

    # Check if everything is healthy
    if results["cluster_health"]["status"] == "green" and results["query_time"]:
        print("\n✅ OpenSearch is healthy and performing well!")
    else:
        print("\n⚠️ OpenSearch may need attention. Check the details above.")

if __name__ == "__main__":
    main()
Backup and Recovery
# backup_recovery.py
import json
from datetime import datetime
from recoagent.stores import OpenSearchStore

class OpenSearchBackup:
    """Handle OpenSearch backup and recovery."""

    def __init__(self, host="localhost", port=9200):
        self.store = OpenSearchStore(host=host, port=port)
        self.client = self.store.client

    def backup_index(self, index_name, backup_file=None):
        """Backup index data to file."""
        if not backup_file:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_file = f"{index_name}_backup_{timestamp}.json"

        print(f"📦 Creating backup of {index_name}...")

        # Get all documents
        response = self.client.search(
            index=index_name,
            body={"query": {"match_all": {}}},
            size=10000,  # Adjust based on your data size
            scroll="5m"
        )

        documents = response['hits']['hits']

        # Save to file
        backup_data = {
            "index_name": index_name,
            "backup_time": datetime.now().isoformat(),
            "document_count": len(documents),
            "documents": documents
        }

        with open(backup_file, 'w') as f:
            json.dump(backup_data, f, indent=2, default=str)

        print(f"✅ Backup completed: {backup_file}")
        print(f"  Documents backed up: {len(documents)}")

        return backup_file

    def restore_index(self, backup_file, new_index_name=None):
        """Restore index from backup file."""
        print(f"📥 Restoring from backup: {backup_file}")

        with open(backup_file, 'r') as f:
            backup_data = json.load(f)

        original_index = backup_data['index_name']
        documents = backup_data['documents']

        # Use new index name if provided
        target_index = new_index_name or f"{original_index}_restored"

        print(f"  Original index: {original_index}")
        print(f"  Target index: {target_index}")
        print(f"  Documents to restore: {len(documents)}")

        # Create new index (you may need to adjust settings)
        try:
            self.client.indices.create(index=target_index)
            print(f"  Created index: {target_index}")
        except Exception as e:
            print(f"  Index may already exist: {e}")

        # Restore documents
        bulk_data = []
        for doc in documents:
            bulk_data.append({
                "index": {
                    "_index": target_index,
                    "_id": doc['_id']
                }
            })
            bulk_data.append(doc['_source'])

        # Bulk insert
        response = self.client.bulk(body=bulk_data)

        if response['errors']:
            print("❌ Some documents failed to restore")
            for item in response['items']:
                if 'error' in item.get('index', {}):
                    print(f"  Error: {item['index']['error']}")
        else:
            print("✅ All documents restored successfully")

        # Refresh index
        self.client.indices.refresh(index=target_index)

        return target_index

def main():
    """Example backup and restore."""
    backup_manager = OpenSearchBackup()

    # Create backup
    backup_file = backup_manager.backup_index("recoagent_advanced")

    # Restore to new index
    restored_index = backup_manager.restore_index(backup_file, "recoagent_restored")

    print("\n🎉 Backup and restore completed!")
    print(f"  Backup file: {backup_file}")
    print(f"  Restored index: {restored_index}")

if __name__ == "__main__":
    main()
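One caveat on backup_index above: a single search call returns at most size documents (10,000 here) even though it opens a scroll context. For larger indexes, iterate the scroll instead, for example with opensearch-py's scan helper; a sketch:

# full_export_sketch.py
from opensearchpy import helpers

def all_documents(client, index_name):
    """Yield every document in the index via the scroll API."""
    yield from helpers.scan(
        client,
        index=index_name,
        query={"query": {"match_all": {}}},
    )

# usage: documents = list(all_documents(backup_manager.client, "recoagent_advanced"))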
Troubleshooting
Common Issues
Connection Issues:
# Check if OpenSearch is running
curl -X GET "localhost:9200/_cluster/health?pretty"
# Check network connectivity
telnet localhost 9200
# Check Docker container status
docker ps | grep opensearch
Index Creation Issues:
# (assumes `client` is an opensearch-py client, e.g. store.client)
# Check index exists
response = client.indices.exists(index="your_index_name")
print(f"Index exists: {response}")

# Delete and recreate index
client.indices.delete(index="your_index_name")
client.indices.create(index="your_index_name")
Performance Issues:
# Check cluster settings
health = client.cluster.health()
print(f"Cluster status: {health['status']}")
# Check shard allocation
allocation = client.cat.allocation(v=True)
print(allocation)
Getting Help
If you encounter issues:
- Check OpenSearch logs - Look for error messages
- Verify configuration - Ensure all settings are correct
- Test connectivity - Verify network and authentication
- Monitor performance - Check cluster health and resource usage
- Contact support - Email support@recohut.com for assistance
Next Steps
Your OpenSearch integration is now complete! You can:
- 🚀 Quickstart Guide - Build your first agent with OpenSearch
- 📚 Browse Examples - See OpenSearch in action
- 🔧 How-To Guides - Learn more advanced techniques
- 📖 API Reference - Explore the full OpenSearch API
Summary
You've successfully:
- ✅ Set up OpenSearch locally or in the cloud
- ✅ Configured RecoAgent to use OpenSearch
- ✅ Created optimized indexes with custom mappings
- ✅ Implemented hybrid search with BM25 and vector search
- ✅ Added performance optimizations and caching
- ✅ Set up monitoring and backup procedures
Your RecoAgent system now has enterprise-grade vector storage with OpenSearch!
Ready for more? Check out the Vector Store Setup guide to learn about other vector store options!