API Reference Overview
This section provides comprehensive API documentation for all RecoAgent components. The API is organized into three main packages: packages.agents, packages.rag, and packages.observability.
Package Structure
packages/
├── agents/ # Agent orchestration and workflows
├── rag/ # Retrieval, reranking, and evaluation
└── observability/ # Monitoring, tracing, and metrics
Quick Start
# Basic imports
from recoagent import RecoAgent
from recoagent.retrievers import HybridRetriever
from recoagent.evaluators import RAGASEvaluator
# Create agent
agent = RecoAgent(
    llm_provider="openai",
    llm_model="gpt-3.5-turbo",
    embedding_model="text-embedding-ada-002"
)
# Add documents
agent.add_documents(["Document 1", "Document 2"])
# Ask a question
response = agent.ask("Your question here")
print(f"Answer: {response.answer}")
print(f"Confidence: {response.confidence}")
print(f"Sources: {len(response.sources)}")
Main RecoAgent Class
The RecoAgent class is the primary interface for interacting with the system. It provides a high-level API that abstracts the complexity of the underlying components.
Constructor
class RecoAgent:
    def __init__(
        self,
        llm_provider: str = "openai",
        llm_model: str = "gpt-3.5-turbo",
        embedding_model: str = "text-embedding-ada-002",
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        vector_store_config: Optional[Dict[str, Any]] = None,
        safety_policy: Optional[SafetyPolicy] = None,
        max_tokens: int = 1000,
        temperature: float = 0.1,
        timeout: int = 30,
        enable_hybrid_search: bool = True,
        enable_reranking: bool = False,
        max_steps: int = 5
    ):
        """Initialize RecoAgent with configuration.

        Args:
            llm_provider: LLM provider ('openai', 'anthropic', 'google')
            llm_model: Model name for the LLM
            embedding_model: Model name for embeddings
            chunk_size: Size of document chunks
            chunk_overlap: Overlap between chunks
            vector_store_config: Configuration for vector store
            safety_policy: Safety policy configuration
            max_tokens: Maximum tokens for LLM responses
            temperature: Temperature for LLM generation
            timeout: Timeout for operations in seconds
            enable_hybrid_search: Whether to use hybrid retrieval
            enable_reranking: Whether to use cross-encoder reranking
            max_steps: Maximum reasoning steps for complex queries
        """
Core Methods
add_documents()
def add_documents(
    self,
    documents: List[str],
    metadata: Optional[List[Dict[str, Any]]] = None,
    batch_size: int = 100
) -> None:
    """Add documents to the knowledge base.

    Args:
        documents: List of document texts
        metadata: Optional metadata for each document
        batch_size: Batch size for processing documents

    Example:
        agent.add_documents([
            "RecoAgent is an enterprise RAG platform.",
            "It supports hybrid retrieval and evaluation."
        ])

        # With metadata
        agent.add_documents(
            documents=["Document 1", "Document 2"],
            metadata=[
                {"source": "manual.pdf", "page": 1},
                {"source": "guide.pdf", "page": 5}
            ]
        )
    """
ask()
def ask(
    self,
    question: str,
    context: Optional[str] = None,
    max_steps: Optional[int] = None,
    include_sources: bool = True,
    include_reasoning: bool = False
) -> AgentResponse:
    """Ask a question and get a response.

    Args:
        question: The question to ask
        context: Optional additional context
        max_steps: Override default max reasoning steps
        include_sources: Whether to include source citations
        include_reasoning: Whether to include reasoning steps

    Returns:
        AgentResponse with answer, confidence, sources, and metadata

    Example:
        response = agent.ask("What is RecoAgent?")
        print(f"Answer: {response.answer}")
        print(f"Confidence: {response.confidence:.2f}")
        for source in response.sources:
            print(f"Source: {source.content[:100]}...")
    """
add_tool()
def add_tool(self, tool: Tool) -> None:
    """Add a custom tool to the agent.

    Args:
        tool: Tool instance to add

    Example:
        from recoagent.tools import Tool

        def search_web(query: str) -> str:
            # Your web search implementation
            return f"Search results for: {query}"

        agent.add_tool(Tool(
            name="web_search",
            description="Search the web for current information",
            func=search_web
        ))
    """
set_safety_policy()
def set_safety_policy(self, policy: SafetyPolicy) -> None:
    """Set safety policies for the agent.

    Args:
        policy: Safety policy configuration

    Example:
        from recoagent.policies import SafetyPolicy

        policy = SafetyPolicy(
            max_cost_per_query=0.05,
            blocked_topics=["sensitive_topic"],
            enable_pii_detection=True
        )
        agent.set_safety_policy(policy)
    """
get_conversation_history()
def get_conversation_history(self) -> List[Dict[str, Any]]:
    """Get the conversation history.

    Returns:
        List of conversation entries with questions, answers, and metadata

    Example:
        history = agent.get_conversation_history()
        for entry in history:
            print(f"Q: {entry['question']}")
            print(f"A: {entry['answer'][:100]}...")
            print(f"Time: {entry['timestamp']}")
    """
clear_history()
def clear_history(self) -> None:
    """Clear the conversation history."""
health_check()
def health_check(self) -> Dict[str, Any]:
    """Check the health of the agent and its components.

    Returns:
        Health status information including component status,
        configuration, and performance metrics

    Example:
        health = agent.health_check()
        print(f"Status: {health['status']}")
        print(f"LLM: {health['llm_status']}")
        print(f"Vector Store: {health['vector_store_status']}")
    """
Response Object
The AgentResponse class contains the results of a query:
@dataclass
class AgentResponse:
    answer: str
    confidence: float
    sources: List[Source]
    reasoning_steps: Optional[List[ReasoningStep]]
    tools_used: List[str]
    cost: float
    latency_ms: float
    timestamp: datetime
    metadata: Dict[str, Any]
Source Object
@dataclass
class Source:
    content: str
    metadata: Dict[str, Any]
    score: float
    retrieval_method: str
ReasoningStep Object
@dataclass
class ReasoningStep:
    step_number: int
    type: str  # "retrieval", "planning", "action", "generation"
    description: str
    input_data: Optional[Dict[str, Any]]
    output_data: Optional[Dict[str, Any]]
    tool_name: Optional[str]
    tool_input: Optional[Dict[str, Any]]
    tool_output: Optional[str]
    latency_ms: float
    cost: float
Core Classes
Agent Classes
RAGAgentGraph
Main agent class that orchestrates the complete RAG workflow.
class RAGAgentGraph:
    def __init__(
        self,
        config: AgentConfig,
        tool_registry: ToolRegistry,
        safety_policy: Optional[SafetyPolicy] = None,
        callback_handlers: Optional[List[AgentCallbackHandler]] = None
    )

    async def run(
        self,
        query: str,
        user_id: Optional[str] = None
    ) -> Dict[str, Any]
Parameters:
- config: Agent configuration settings
- tool_registry: Registry of available tools
- safety_policy: Safety and content filtering policies
- callback_handlers: Observability and monitoring handlers
Returns:
- Dictionary containing answer, metadata, cost, and latency information
AgentConfig
Configuration class for agent behavior and limits.
@dataclass
class AgentConfig:
    model_name: str = "gpt-4"
    temperature: float = 0.1
    max_tokens: int = 1000
    max_steps: int = 5
    cost_limit: float = 0.10
    timeout_seconds: int = 30
    safety_enabled: bool = True
AgentState
State management for agent execution flow.
class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], "Chat messages"]
    query: str
    retrieved_docs: List[Dict[str, Any]]
    reranked_docs: List[Dict[str, Any]]
    plan: Optional[str]
    action: Optional[str]
    answer: Optional[str]
    error: Optional[str]
    metadata: Dict[str, Any]
    step_count: int
    max_steps: int
    cost_tracker: Dict[str, float]
    latency_tracker: Dict[str, float]
RAG Classes
HybridRetriever
Combines BM25 (keyword) and vector retrieval, weighting the two result sets with alpha.
class HybridRetriever(BaseRetriever):
    def __init__(
        self,
        vector_retriever: VectorRetriever,
        bm25_retriever: BM25Retriever,
        alpha: float = 0.5,
        vector_k: int = 20,
        bm25_k: int = 20
    )

    def retrieve(self, query: str, k: int = 5) -> List[RetrievalResult]
Parameters:
- vector_retriever: Vector-based retriever instance
- bm25_retriever: BM25-based retriever instance
- alpha: Weight for vector search (1-alpha for BM25)
- vector_k: Number of results from vector search
- bm25_k: Number of results from BM25 search
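The alpha parameter implies a weighted combination of the two result sets. The sketch below shows one way such a fusion could work, assuming min-max normalization followed by a linear blend; the reference does not say which scheme HybridRetriever actually uses (other schemes, such as reciprocal rank fusion, are equally plausible). The names fuse_scores and _min_max are hypothetical and not part of the RecoAgent API.

```python
from typing import Dict, List, Tuple


def _min_max(scores: Dict[str, float]) -> Dict[str, float]:
    """Rescale scores to [0, 1] so the two retrievers are comparable."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {doc_id: 1.0 for doc_id in scores}
    return {doc_id: (s - lo) / (hi - lo) for doc_id, s in scores.items()}


def fuse_scores(
    vector_scores: Dict[str, float],
    bm25_scores: Dict[str, float],
    alpha: float = 0.5,
    k: int = 5,
) -> List[Tuple[str, float]]:
    """Return the top-k (doc_id, fused_score) pairs, best first."""
    vec = _min_max(vector_scores)
    bm25 = _min_max(bm25_scores)
    # A document missing from one retriever contributes 0 from that side.
    fused = {
        doc_id: alpha * vec.get(doc_id, 0.0) + (1 - alpha) * bm25.get(doc_id, 0.0)
        for doc_id in set(vec) | set(bm25)
    }
    return sorted(fused.items(), key=lambda item: item[1], reverse=True)[:k]


results = fuse_scores(
    vector_scores={"a": 0.9, "b": 0.5, "c": 0.1},
    bm25_scores={"b": 12.0, "c": 4.0, "d": 8.0},
    alpha=0.5,
    k=3,
)
print([doc_id for doc_id, _ in results])
```

With alpha=1.0 the ranking follows the vector scores alone, and with alpha=0.0 it follows BM25 alone.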
CrossEncoderReranker
Reranks retrieved results using cross-encoder models.
class CrossEncoderReranker(BaseReranker):
    def __init__(
        self,
        model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2",
        top_k: int = 5,
        budget_ms: int = 100
    )

    def rerank(
        self,
        query: str,
        documents: List[Chunk]
    ) -> List[RerankResult]
RAGASEvaluator
Evaluates RAG systems using RAGAS metrics.
class RAGASEvaluator:
    def __init__(
        self,
        langsmith_api_key: Optional[str] = None,
        langsmith_project: str = "recoagent-eval"
    )

    def evaluate_samples(
        self,
        samples: List[EvaluationSample]
    ) -> List[EvaluationResult]

    def compute_aggregate_metrics(
        self,
        results: List[EvaluationResult]
    ) -> Dict[str, float]
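To make the relationship between per-sample results and aggregate metrics concrete, here is a minimal sketch that averages each metric across samples. It assumes a plain mean, which the reference does not confirm for compute_aggregate_metrics, and it uses plain dictionaries in place of EvaluationResult.metrics. The function name aggregate_metrics is hypothetical.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List


def aggregate_metrics(per_sample: List[Dict[str, float]]) -> Dict[str, float]:
    """Average each metric over the samples that reported it."""
    values = defaultdict(list)
    for metrics in per_sample:
        for name, value in metrics.items():
            values[name].append(value)
    return {name: mean(vals) for name, vals in values.items()}


summary = aggregate_metrics([
    {"faithfulness": 1.0, "answer_relevancy": 0.5},
    {"faithfulness": 0.5, "answer_relevancy": 1.0},
])
print(summary)
```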
Vector Store Classes
OpenSearchStore
OpenSearch vector store implementation.
class OpenSearchStore(VectorStore):
    def __init__(
        self,
        endpoint: str,
        index_name: str,
        embedding_dimension: int = 1536,
        username: Optional[str] = None,
        password: Optional[str] = None
    )

    def add_document(
        self,
        document_id: str,
        content: str,
        embedding: List[float],
        metadata: Dict[str, Any]
    ) -> bool

    def search(
        self,
        query_embedding: List[float],
        k: int = 5,
        include_metadata: bool = True
    ) -> List[Dict[str, Any]]

    def health_check(self) -> Dict[str, Any]
AzureAISearchStore
Azure AI Search vector store implementation.
class AzureAISearchStore(VectorStore):
    def __init__(
        self,
        endpoint: str,
        api_key: str,
        index_name: str,
        embedding_dimension: int = 1536
    )
VertexAIVectorStore
Vertex AI Vector Search implementation.
class VertexAIVectorStore(VectorStore):
    def __init__(
        self,
        project_id: str,
        location: str,
        index_name: str,
        embedding_dimension: int = 1536,
        credentials_info: Optional[Dict[str, Any]] = None
    )
Observability Classes
LangSmithClient
LangSmith integration for tracing and monitoring.
class LangSmithClient:
    def __init__(self, config: LangSmithConfig)

    def create_run(
        self,
        name: str,
        run_type: str = "chain",
        inputs: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str

    def update_run(
        self,
        run_id: str,
        outputs: Optional[Dict[str, Any]] = None,
        error: Optional[str] = None,
        end_time: Optional[datetime] = None,
        metadata: Optional[Dict[str, Any]] = None
    )

    def get_project_metrics(self) -> Dict[str, Any]
MetricsCollector
Metrics collection and reporting.
class MetricsCollector:
    def __init__(
        self,
        prometheus_enabled: bool = True,
        custom_metrics: Optional[Dict[str, Any]] = None
    )

    def record_query_metrics(
        self,
        query_id: str,
        user_id: str,
        latency_ms: float,
        cost_usd: float,
        status: str
    )

    def get_summary(
        self,
        time_window_hours: int = 24,
        include_cost_metrics: bool = True,
        include_performance_metrics: bool = True
    ) -> Dict[str, Any]
Data Models
Retrieval Models
RetrievalResult
@dataclass
class RetrievalResult:
    chunk: Chunk
    score: float
    retrieval_method: str
RerankResult
@dataclass
class RerankResult:
    chunk: Chunk
    original_score: float
    reranked_score: float
    reranking_method: str
Evaluation Models
EvaluationSample
@dataclass
class EvaluationSample:
    question: str
    ground_truth: str
    answer: str
    contexts: List[str]
    source: str
    metadata: Dict[str, Any]
EvaluationResult
@dataclass
class EvaluationResult:
    sample_id: str
    question: str
    ground_truth: str
    answer: str
    contexts: List[str]
    metrics: Dict[str, float]
    cost: float
    latency_ms: float
    timestamp: datetime
Document Models
Chunk
@dataclass
class Chunk:
    content: str
    metadata: Dict[str, Any]
    chunk_id: str
    source: str
    start_char: int
    end_char: int
VectorDocument
@dataclass
class VectorDocument:
    id: str
    content: str
    embedding: List[float]
    metadata: Dict[str, Any]
Configuration Classes
LangSmithConfig
@dataclass
class LangSmithConfig:
    api_key: str
    project: str = "recoagent-rag"
    endpoint: str = "https://api.smith.langchain.com"
    tracing_enabled: bool = True
    experiment_tracking: bool = True
Safety and Policy Classes
SafetyPolicy
class SafetyPolicy(BasePolicy):
    def __init__(
        self,
        enable_pii_detection: bool = True,
        enable_content_filtering: bool = True,
        blocked_topics: Optional[List[str]] = None,
        max_query_length: int = 1000
    )

    def should_block(self, input_data: Dict[str, Any]) -> bool

    def get_blocking_reasons(self, input_data: Dict[str, Any]) -> List[str]
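The two methods above suggest that blocking is a yes/no decision backed by a list of reasons. The following sketch shows one way that relationship could be implemented, using a stand-in class. SimplePolicy is hypothetical, the "query" key in input_data is an assumption, and topic matching by case-insensitive substring is a simplification; none of this is taken from RecoAgent's implementation.

```python
from typing import Any, Dict, List, Optional


class SimplePolicy:
    """Hypothetical stand-in that mirrors the SafetyPolicy method names."""

    def __init__(
        self,
        blocked_topics: Optional[List[str]] = None,
        max_query_length: int = 1000,
    ):
        self.blocked_topics = [t.lower() for t in (blocked_topics or [])]
        self.max_query_length = max_query_length

    def get_blocking_reasons(self, input_data: Dict[str, Any]) -> List[str]:
        query = str(input_data.get("query", ""))  # "query" key is an assumption
        reasons = []
        if len(query) > self.max_query_length:
            reasons.append(f"query exceeds {self.max_query_length} characters")
        for topic in self.blocked_topics:
            if topic in query.lower():
                reasons.append(f"blocked topic: {topic}")
        return reasons

    def should_block(self, input_data: Dict[str, Any]) -> bool:
        # Block whenever at least one reason applies.
        return bool(self.get_blocking_reasons(input_data))


policy = SimplePolicy(blocked_topics=["payroll"], max_query_length=50)
print(policy.should_block({"query": "Show me the payroll export"}))
print(policy.get_blocking_reasons({"query": "What is RecoAgent?"}))
```

Deriving should_block from get_blocking_reasons keeps the two answers consistent, so a caller never sees a blocked query with an empty reason list.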
EscalationPolicy
class EscalationPolicy(BasePolicy):
    def __init__(
        self,
        max_cost: float = 0.05,
        max_steps: int = 3,
        error_threshold: int = 2,
        sensitive_topics: Optional[List[str]] = None
    )
Utility Functions
Retriever Factory
def get_retriever(retriever_type: str, **kwargs) -> BaseRetriever:
    """Factory function to get the appropriate retriever."""
    retrievers = {
        "bm25": BM25Retriever,
        "vector": VectorRetriever,
        "hybrid": HybridRetriever,
    }
    if retriever_type not in retrievers:
        raise ValueError(f"Unknown retriever type: {retriever_type}")
    return retrievers[retriever_type](**kwargs)
Vector Store Factory
def get_vector_store(store_type: str, **kwargs) -> VectorStore:
    """Factory function to get the appropriate vector store."""
    stores = {
        "opensearch": OpenSearchStore,
        "azure": AzureAISearchStore,
        "vertex": VertexAIVectorStore,
    }
    if store_type not in stores:
        raise ValueError(f"Unknown vector store type: {store_type}")
    return stores[store_type](**kwargs)
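The factories follow a name-to-class registry pattern and raise ValueError for unknown names. The sketch below exercises both paths with stand-in classes so that it runs without RecoAgent installed. InMemoryStore and the "memory" key are hypothetical and exist only for this example.

```python
from typing import Any, Dict, Type


class VectorStore:
    """Stand-in base class for this sketch."""


class InMemoryStore(VectorStore):
    """Hypothetical store used only to make the example runnable."""

    def __init__(self, index_name: str):
        self.index_name = index_name


STORES: Dict[str, Type[VectorStore]] = {"memory": InMemoryStore}


def get_vector_store(store_type: str, **kwargs: Any) -> VectorStore:
    """Look up the store class by name and build it from keyword arguments."""
    if store_type not in STORES:
        raise ValueError(f"Unknown vector store type: {store_type}")
    return STORES[store_type](**kwargs)


store = get_vector_store("memory", index_name="docs")
print(type(store).__name__, store.index_name)

try:
    get_vector_store("redis")
except ValueError as exc:
    print(exc)
```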
Error Handling
Common Exceptions
RecoAgentError
Base exception for all RecoAgent errors.
class RecoAgentError(Exception):
    """Base exception for RecoAgent errors."""
    pass
RetrievalError
class RetrievalError(RecoAgentError):
    """Error during document retrieval."""
    pass
EvaluationError
class EvaluationError(RecoAgentError):
    """Error during evaluation."""
    pass
VectorStoreError
class VectorStoreError(RecoAgentError):
    """Error with vector store operations."""
    pass
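Because every exception derives from RecoAgentError, callers can handle a specific failure first and fall back to the base class for everything else. The sketch below redefines two of the classes locally so it is self-contained. The functions flaky_retrieve and safe_retrieve are hypothetical, and returning an empty list on failure is one possible policy, not a documented behavior.

```python
class RecoAgentError(Exception):
    """Base exception for RecoAgent errors."""


class RetrievalError(RecoAgentError):
    """Error during document retrieval."""


def flaky_retrieve(query: str) -> list:
    """Hypothetical function that always fails, used to exercise the handlers."""
    raise RetrievalError(f"no index available for query: {query!r}")


def safe_retrieve(query: str) -> list:
    try:
        return flaky_retrieve(query)
    except RetrievalError as exc:
        # Specific handler: degrade gracefully to an empty result set.
        print(f"retrieval failed: {exc}")
        return []
    except RecoAgentError as exc:
        # Fallback for any other library error: log it and re-raise.
        print(f"unexpected RecoAgent error: {exc}")
        raise


print(safe_retrieve("What is RecoAgent?"))
```

The order of the except clauses matters: the subclass must come before the base class, otherwise the base handler would catch everything.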
Performance Considerations
Optimization Tips
- Batch Operations: Use batch methods for adding multiple documents
- Connection Pooling: Configure appropriate connection pool sizes
- Caching: Cache embeddings and frequent queries
- Async Operations: Use async methods for better concurrency
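Two of the tips above, batching and caching, can be sketched in a few lines. The example assumes a placeholder embed function in place of a real embedding call, and uses functools.lru_cache as the cache; batched and embed are hypothetical helpers, not RecoAgent functions.

```python
from functools import lru_cache
from typing import Iterator, Sequence, Tuple


def batched(items: Sequence[str], batch_size: int = 100) -> Iterator[Sequence[str]]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


@lru_cache(maxsize=1024)
def embed(text: str) -> Tuple[float, ...]:
    """Placeholder embedding; a real one would call the embedding model."""
    return (float(len(text)), float(sum(map(ord, text)) % 97))


docs = [f"Document {i}" for i in range(250)]
print([len(batch) for batch in batched(docs, batch_size=100)])

embed("repeated query")
embed("repeated query")  # second call is served from the cache
print(embed.cache_info().hits)
```

The cached function returns a tuple rather than a list so that callers cannot mutate a value shared through the cache.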
Memory Management
- Streaming: Use streaming for large document sets
- Chunking: Optimize chunk sizes for your use case
- Cleanup: Properly close connections and clean up resources
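To show how the chunk_size and chunk_overlap constructor parameters interact, here is a minimal sketch of fixed-size chunking with overlap. It assumes both values are measured in characters, which the reference does not state (they could equally be tokens). The helper chunk_text is hypothetical and not part of the RecoAgent API.

```python
from typing import List


def chunk_text(text: str, chunk_size: int = 500, chunk_overlap: int = 50) -> List[str]:
    """Split text into windows of chunk_size characters that share chunk_overlap characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks


chunks = chunk_text("a" * 1200, chunk_size=500, chunk_overlap=50)
print([len(c) for c in chunks])
```

A larger overlap preserves more context across chunk boundaries but increases the number of chunks to embed and store.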
Security Considerations
API Keys
- Store API keys in environment variables
- Use secret management systems in production
- Rotate keys regularly
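A minimal sketch of the first point: read the key from an environment variable, fail early when it is missing, and avoid printing the full value. The variable name OPENAI_API_KEY is an assumption for this example, and load_api_key and mask are hypothetical helpers.

```python
import os


def load_api_key(var_name: str = "OPENAI_API_KEY") -> str:
    """Read an API key from the environment and fail loudly if it is missing."""
    key = os.environ.get(var_name, "").strip()
    if not key:
        raise RuntimeError(f"Environment variable {var_name} is not set")
    return key


def mask(key: str) -> str:
    """Return a log-safe form that shows at most the last four characters."""
    return "*" * max(len(key) - 4, 0) + key[-4:]


# Demo value only, so the sketch runs when the variable is not already set.
os.environ.setdefault("OPENAI_API_KEY", "sk-example-not-a-real-key")
print(mask(load_api_key()))
```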
Data Privacy
- Enable PII detection and filtering
- Use content filtering for sensitive topics
- Implement proper access controls
Network Security
- Use HTTPS for all external communications
- Implement proper authentication and authorization
- Monitor for suspicious activity
Best Practices
Code Organization
- Modular Design: Keep components loosely coupled
- Error Handling: Implement comprehensive error handling
- Logging: Use structured logging for better observability
- Testing: Write comprehensive tests for all components
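As an illustration of the structured logging point, the sketch below emits one JSON object per log line using only the standard library. The field names query_id and latency_ms are borrowed from the metrics signatures in this reference, but the JsonFormatter class and the logger name are hypothetical.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Fields passed via `extra=` become attributes on the record.
        for field in ("query_id", "latency_ms"):
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("recoagent.demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False  # avoid duplicate output through the root logger

logger.info("query answered", extra={"query_id": "q-1", "latency_ms": 182.5})
```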
Production Deployment
- Health Checks: Implement health check endpoints
- Monitoring: Set up comprehensive monitoring and alerting
- Scaling: Design for horizontal scaling
- Backup: Implement backup and recovery strategies
Migration Guide
Version Compatibility
- Check version compatibility before upgrading
- Test thoroughly in staging environments
- Plan for backward compatibility when possible
Configuration Migration
- Update configuration files for new versions
- Migrate custom components as needed
- Update documentation and examples
Support and Resources
Documentation
- Tutorials: Step-by-step learning guides
- How-To Guides: Task-focused implementation guides
- Examples: Working code examples
- Explanations: Architecture and design decisions
Community
- Email Support: support@recohut.com for questions and support
Professional Support
- Enterprise support available
- Custom implementation services
- Training and consulting