
API Reference Overview

This section provides comprehensive API documentation for all RecoAgent components. The API is organized into three main packages: packages.agents, packages.rag, and packages.observability.

Package Structure

packages/
├── agents/ # Agent orchestration and workflows
├── rag/ # Retrieval, reranking, and evaluation
└── observability/ # Monitoring, tracing, and metrics

Quick Start

# Basic imports
from recoagent import RecoAgent
from recoagent.retrievers import HybridRetriever
from recoagent.evaluators import RAGASEvaluator

# Create agent
agent = RecoAgent(
    llm_provider="openai",
    llm_model="gpt-3.5-turbo",
    embedding_model="text-embedding-ada-002"
)

# Add documents
agent.add_documents(["Document 1", "Document 2"])

# Ask a question
response = agent.ask("Your question here")
print(f"Answer: {response.answer}")
print(f"Confidence: {response.confidence}")
print(f"Sources: {len(response.sources)}")

Main RecoAgent Class

The RecoAgent class is the primary interface for interacting with the system. It provides a high-level API that abstracts the complexity of the underlying components.

Constructor

class RecoAgent:
    def __init__(
        self,
        llm_provider: str = "openai",
        llm_model: str = "gpt-3.5-turbo",
        embedding_model: str = "text-embedding-ada-002",
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        vector_store_config: Optional[Dict[str, Any]] = None,
        safety_policy: Optional[SafetyPolicy] = None,
        max_tokens: int = 1000,
        temperature: float = 0.1,
        timeout: int = 30,
        enable_hybrid_search: bool = True,
        enable_reranking: bool = False,
        max_steps: int = 5
    ):
        """Initialize RecoAgent with configuration.

        Args:
            llm_provider: LLM provider ('openai', 'anthropic', 'google')
            llm_model: Model name for the LLM
            embedding_model: Model name for embeddings
            chunk_size: Size of document chunks
            chunk_overlap: Overlap between chunks
            vector_store_config: Configuration for vector store
            safety_policy: Safety policy configuration
            max_tokens: Maximum tokens for LLM responses
            temperature: Temperature for LLM generation
            timeout: Timeout for operations in seconds
            enable_hybrid_search: Whether to use hybrid retrieval
            enable_reranking: Whether to use cross-encoder reranking
            max_steps: Maximum reasoning steps for complex queries
        """

Core Methods

add_documents()

def add_documents(
    self,
    documents: List[str],
    metadata: Optional[List[Dict[str, Any]]] = None,
    batch_size: int = 100
) -> None:
    """Add documents to the knowledge base.

    Args:
        documents: List of document texts
        metadata: Optional metadata for each document
        batch_size: Batch size for processing documents

    Example:
        agent.add_documents([
            "RecoAgent is an enterprise RAG platform.",
            "It supports hybrid retrieval and evaluation."
        ])

        # With metadata
        agent.add_documents(
            documents=["Document 1", "Document 2"],
            metadata=[
                {"source": "manual.pdf", "page": 1},
                {"source": "guide.pdf", "page": 5}
            ]
        )
    """

ask()

def ask(
    self,
    question: str,
    context: Optional[str] = None,
    max_steps: Optional[int] = None,
    include_sources: bool = True,
    include_reasoning: bool = False
) -> AgentResponse:
    """Ask a question and get a response.

    Args:
        question: The question to ask
        context: Optional additional context
        max_steps: Override default max reasoning steps
        include_sources: Whether to include source citations
        include_reasoning: Whether to include reasoning steps

    Returns:
        AgentResponse with answer, confidence, sources, and metadata

    Example:
        response = agent.ask("What is RecoAgent?")
        print(f"Answer: {response.answer}")
        print(f"Confidence: {response.confidence:.2f}")

        for source in response.sources:
            print(f"Source: {source.content[:100]}...")
    """

add_tool()

def add_tool(self, tool: Tool) -> None:
    """Add a custom tool to the agent.

    Args:
        tool: Tool instance to add

    Example:
        from recoagent.tools import Tool

        def search_web(query: str) -> str:
            # Your web search implementation
            return f"Search results for: {query}"

        agent.add_tool(Tool(
            name="web_search",
            description="Search the web for current information",
            func=search_web
        ))
    """

set_safety_policy()

def set_safety_policy(self, policy: SafetyPolicy) -> None:
    """Set safety policies for the agent.

    Args:
        policy: Safety policy configuration

    Example:
        from recoagent.policies import SafetyPolicy

        policy = SafetyPolicy(
            enable_pii_detection=True,
            blocked_topics=["sensitive_topic"],
            max_query_length=1000
        )
        agent.set_safety_policy(policy)
    """

get_conversation_history()

def get_conversation_history(self) -> List[Dict[str, Any]]:
    """Get the conversation history.

    Returns:
        List of conversation entries with questions, answers, and metadata

    Example:
        history = agent.get_conversation_history()
        for entry in history:
            print(f"Q: {entry['question']}")
            print(f"A: {entry['answer'][:100]}...")
            print(f"Time: {entry['timestamp']}")
    """

clear_history()

def clear_history(self) -> None:
    """Clear the conversation history."""

health_check()

def health_check(self) -> Dict[str, Any]:
    """Check the health of the agent and its components.

    Returns:
        Health status information including component status,
        configuration, and performance metrics

    Example:
        health = agent.health_check()
        print(f"Status: {health['status']}")
        print(f"LLM: {health['llm_status']}")
        print(f"Vector Store: {health['vector_store_status']}")
    """

Response Object

The AgentResponse class contains the results of a query:

@dataclass
class AgentResponse:
    answer: str
    confidence: float
    sources: List[Source]
    reasoning_steps: Optional[List[ReasoningStep]]
    tools_used: List[str]
    cost: float
    latency_ms: float
    timestamp: datetime
    metadata: Dict[str, Any]
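
A short sketch of inspecting a response; the field names are the ones listed above, and reasoning_steps is assumed to be populated only when the query was made with include_reasoning=True.

response = agent.ask("How does hybrid retrieval work?", include_reasoning=True)

print(f"Answer: {response.answer}")
print(f"Cost: ${response.cost:.4f}, latency: {response.latency_ms:.0f} ms")
print(f"Tools used: {response.tools_used}")

# reasoning_steps is Optional; guard against None before iterating.
for step in response.reasoning_steps or []:
    print(f"[{step.step_number}] {step.type}: {step.description}")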

Source Object

@dataclass
class Source:
    content: str
    metadata: Dict[str, Any]
    score: float
    retrieval_method: str

ReasoningStep Object

@dataclass
class ReasoningStep:
    step_number: int
    type: str  # "retrieval", "planning", "action", "generation"
    description: str
    input_data: Optional[Dict[str, Any]]
    output_data: Optional[Dict[str, Any]]
    tool_name: Optional[str]
    tool_input: Optional[Dict[str, Any]]
    tool_output: Optional[str]
    latency_ms: float
    cost: float

Core Classes

Agent Classes

RAGAgentGraph

Main agent class that orchestrates the complete RAG workflow.

class RAGAgentGraph:
    def __init__(
        self,
        config: AgentConfig,
        tool_registry: ToolRegistry,
        safety_policy: Optional[SafetyPolicy] = None,
        callback_handlers: Optional[List[AgentCallbackHandler]] = None
    )

    async def run(
        self,
        query: str,
        user_id: Optional[str] = None
    ) -> Dict[str, Any]

Constructor parameters:

  • config: Agent configuration settings
  • tool_registry: Registry of available tools
  • safety_policy: Safety and content filtering policies
  • callback_handlers: Observability and monitoring handlers

run() returns:

  • Dictionary containing answer, metadata, cost, and latency information
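
A minimal sketch of driving the graph directly. The import path and the no-argument ToolRegistry() construction are assumptions, as neither is specified in this reference; the result keys follow the return description above.

import asyncio

# Import path and ToolRegistry() construction are assumptions.
from recoagent.agents import AgentConfig, RAGAgentGraph, ToolRegistry

config = AgentConfig(model_name="gpt-4", max_steps=5, cost_limit=0.10)
graph = RAGAgentGraph(config=config, tool_registry=ToolRegistry())

async def main() -> None:
    result = await graph.run("What changed in the last release?", user_id="user-123")
    # Keys follow the documented return value: answer, metadata, cost, latency.
    print(result["answer"], result.get("cost"))

asyncio.run(main())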

AgentConfig

Configuration class for agent behavior and limits.

@dataclass
class AgentConfig:
    model_name: str = "gpt-4"
    temperature: float = 0.1
    max_tokens: int = 1000
    max_steps: int = 5
    cost_limit: float = 0.10
    timeout_seconds: int = 30
    safety_enabled: bool = True

AgentState

State management for agent execution flow.

class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], "Chat messages"]
    query: str
    retrieved_docs: List[Dict[str, Any]]
    reranked_docs: List[Dict[str, Any]]
    plan: Optional[str]
    action: Optional[str]
    answer: Optional[str]
    error: Optional[str]
    metadata: Dict[str, Any]
    step_count: int
    max_steps: int
    cost_tracker: Dict[str, float]
    latency_tracker: Dict[str, float]

RAG Classes

HybridRetriever

Combines BM25 keyword search and vector similarity search, blending their scores for more robust retrieval.

class HybridRetriever(BaseRetriever):
    def __init__(
        self,
        vector_retriever: VectorRetriever,
        bm25_retriever: BM25Retriever,
        alpha: float = 0.5,
        vector_k: int = 20,
        bm25_k: int = 20
    )

    def retrieve(self, query: str, k: int = 5) -> List[RetrievalResult]

Parameters:

  • vector_retriever: Vector-based retriever instance
  • bm25_retriever: BM25-based retriever instance
  • alpha: Weight for vector search (1-alpha for BM25)
  • vector_k: Number of results from vector search
  • bm25_k: Number of results from BM25 search
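
A sketch of wiring the retriever together. The VectorRetriever and BM25Retriever constructor arguments shown here are assumptions, since only HybridRetriever's signature appears in this reference; the result fields come from RetrievalResult, documented under Data Models.

# VectorRetriever and BM25Retriever are assumed to live alongside HybridRetriever;
# their constructor arguments below are illustrative only.
from recoagent.retrievers import BM25Retriever, HybridRetriever, VectorRetriever

documents = ["RecoAgent supports hybrid retrieval.", "BM25 matches exact keywords."]
vector_retriever = VectorRetriever(documents=documents)
bm25_retriever = BM25Retriever(documents=documents)

retriever = HybridRetriever(
    vector_retriever=vector_retriever,
    bm25_retriever=bm25_retriever,
    alpha=0.7,       # weight vector similarity more heavily than BM25
    vector_k=20,
    bm25_k=20
)

for result in retriever.retrieve("keyword matching", k=5):
    print(f"{result.score:.3f} [{result.retrieval_method}] {result.chunk.content[:80]}")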

CrossEncoderReranker

Reranks retrieved results using cross-encoder models.

class CrossEncoderReranker(BaseReranker):
    def __init__(
        self,
        model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2",
        top_k: int = 5,
        budget_ms: int = 100
    )

    def rerank(
        self,
        query: str,
        documents: List[Chunk]
    ) -> List[RerankResult]
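
A usage sketch that reranks candidates produced by a retriever; the import path is an assumption, and the chunks are pulled from RetrievalResult objects as documented under Data Models.

# Import path is an assumption; adjust to where the reranker lives in your install.
from recoagent.rerankers import CrossEncoderReranker

reranker = CrossEncoderReranker(top_k=5, budget_ms=100)

# Take a wide candidate set from the retriever, then let the cross-encoder narrow it.
candidates = [r.chunk for r in retriever.retrieve("reset a forgotten password", k=20)]
for r in reranker.rerank(query="reset a forgotten password", documents=candidates):
    print(f"{r.reranked_score:.3f} (was {r.original_score:.3f}) {r.chunk.content[:80]}")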

RAGASEvaluator

Evaluates RAG systems using RAGAS metrics.

class RAGASEvaluator:
    def __init__(
        self,
        langsmith_api_key: Optional[str] = None,
        langsmith_project: str = "recoagent-eval"
    )

    def evaluate_samples(
        self,
        samples: List[EvaluationSample]
    ) -> List[EvaluationResult]

    def compute_aggregate_metrics(
        self,
        results: List[EvaluationResult]
    ) -> Dict[str, float]
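
A sketch of evaluating a single sample. The RAGASEvaluator import matches the Quick Start; where EvaluationSample is imported from is not shown in this reference, so that line is an assumption.

from recoagent.evaluators import RAGASEvaluator
from recoagent.evaluators import EvaluationSample  # import location is an assumption

sample = EvaluationSample(
    question="What is RecoAgent?",
    ground_truth="RecoAgent is an enterprise RAG platform.",
    answer="RecoAgent is an enterprise platform for retrieval-augmented generation.",
    contexts=["RecoAgent is an enterprise RAG platform."],
    source="docs",
    metadata={}
)

evaluator = RAGASEvaluator(langsmith_project="recoagent-eval")
results = evaluator.evaluate_samples([sample])
print(evaluator.compute_aggregate_metrics(results))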

Vector Store Classes

OpenSearchStore

OpenSearch vector store implementation.

class OpenSearchStore(VectorStore):
    def __init__(
        self,
        endpoint: str,
        index_name: str,
        embedding_dimension: int = 1536,
        username: Optional[str] = None,
        password: Optional[str] = None
    )

    def add_document(
        self,
        document_id: str,
        content: str,
        embedding: List[float],
        metadata: Dict[str, Any]
    ) -> bool

    def search(
        self,
        query_embedding: List[float],
        k: int = 5,
        include_metadata: bool = True
    ) -> List[Dict[str, Any]]

    def health_check(self) -> Dict[str, Any]
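
A sketch of basic store operations. The import path and the endpoint are placeholders, and the zero vectors stand in for real embeddings of dimension 1536.

import os

# Import path is an assumption based on the packages.rag layout above.
from recoagent.vector_stores import OpenSearchStore

store = OpenSearchStore(
    endpoint="https://opensearch.example.internal:9200",   # placeholder endpoint
    index_name="kb-articles",
    embedding_dimension=1536,
    username=os.environ.get("OPENSEARCH_USER"),
    password=os.environ.get("OPENSEARCH_PASSWORD"),
)

# Zero vectors are placeholders; use your embedding model in practice.
store.add_document(
    document_id="doc-001",
    content="RecoAgent supports hybrid retrieval.",
    embedding=[0.0] * 1536,
    metadata={"source": "manual.pdf", "page": 1},
)

hits = store.search(query_embedding=[0.0] * 1536, k=5)
print(len(hits), store.health_check())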

AzureAISearchStore

Azure AI Search vector store implementation.

class AzureAISearchStore(VectorStore):
    def __init__(
        self,
        endpoint: str,
        api_key: str,
        index_name: str,
        embedding_dimension: int = 1536
    )

VertexAIVectorStore

Vertex AI Vector Search implementation.

class VertexAIVectorStore(VectorStore):
    def __init__(
        self,
        project_id: str,
        location: str,
        index_name: str,
        embedding_dimension: int = 1536,
        credentials_info: Optional[Dict[str, Any]] = None
    )

Observability Classes

LangSmithClient

LangSmith integration for tracing and monitoring.

class LangSmithClient:
    def __init__(self, config: LangSmithConfig)

    def create_run(
        self,
        name: str,
        run_type: str = "chain",
        inputs: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str

    def update_run(
        self,
        run_id: str,
        outputs: Optional[Dict[str, Any]] = None,
        error: Optional[str] = None,
        end_time: Optional[datetime] = None,
        metadata: Optional[Dict[str, Any]] = None
    )

    def get_project_metrics(self) -> Dict[str, Any]
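
A tracing sketch: create a run, attach outputs, and read project metrics. The import path is an assumption; LangSmithConfig is documented under Configuration Classes.

import os
from datetime import datetime, timezone

# Import path is an assumption based on the packages.observability layout above.
from recoagent.observability import LangSmithClient, LangSmithConfig

client = LangSmithClient(LangSmithConfig(
    api_key=os.environ["LANGSMITH_API_KEY"],
    project="recoagent-rag",
))

run_id = client.create_run(
    name="ask",
    run_type="chain",
    inputs={"question": "What is RecoAgent?"},
    tags=["api-reference-example"],
)
client.update_run(
    run_id,
    outputs={"answer": "RecoAgent is an enterprise RAG platform."},
    end_time=datetime.now(timezone.utc),
)
print(client.get_project_metrics())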

MetricsCollector

Metrics collection and reporting.

class MetricsCollector:
    def __init__(
        self,
        prometheus_enabled: bool = True,
        custom_metrics: Optional[Dict[str, Any]] = None
    )

    def record_query_metrics(
        self,
        query_id: str,
        user_id: str,
        latency_ms: float,
        cost_usd: float,
        status: str
    )

    def get_summary(
        self,
        time_window_hours: int = 24,
        include_cost_metrics: bool = True,
        include_performance_metrics: bool = True
    ) -> Dict[str, Any]
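
A sketch of recording per-query metrics and pulling a daily summary; the import path is an assumption, and the values are illustrative.

# Import path is an assumption; adjust to your installation.
from recoagent.observability import MetricsCollector

metrics = MetricsCollector(prometheus_enabled=True)

metrics.record_query_metrics(
    query_id="q-42",
    user_id="user-123",
    latency_ms=850.0,
    cost_usd=0.012,
    status="success",
)

print(metrics.get_summary(time_window_hours=24))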

Data Models

Retrieval Models

RetrievalResult

@dataclass
class RetrievalResult:
    chunk: Chunk
    score: float
    retrieval_method: str

RerankResult

@dataclass
class RerankResult:
    chunk: Chunk
    original_score: float
    reranked_score: float
    reranking_method: str

Evaluation Models

EvaluationSample

@dataclass
class EvaluationSample:
    question: str
    ground_truth: str
    answer: str
    contexts: List[str]
    source: str
    metadata: Dict[str, Any]

EvaluationResult

@dataclass
class EvaluationResult:
    sample_id: str
    question: str
    ground_truth: str
    answer: str
    contexts: List[str]
    metrics: Dict[str, float]
    cost: float
    latency_ms: float
    timestamp: datetime

Document Models

Chunk

@dataclass
class Chunk:
    content: str
    metadata: Dict[str, Any]
    chunk_id: str
    source: str
    start_char: int
    end_char: int

VectorDocument

@dataclass
class VectorDocument:
    id: str
    content: str
    embedding: List[float]
    metadata: Dict[str, Any]

Configuration Classes

LangSmithConfig

@dataclass
class LangSmithConfig:
    api_key: str
    project: str = "recoagent-rag"
    endpoint: str = "https://api.smith.langchain.com"
    tracing_enabled: bool = True
    experiment_tracking: bool = True

Safety and Policy Classes

SafetyPolicy

class SafetyPolicy(BasePolicy):
    def __init__(
        self,
        enable_pii_detection: bool = True,
        enable_content_filtering: bool = True,
        blocked_topics: Optional[List[str]] = None,
        max_query_length: int = 1000
    )

    def should_block(self, input_data: Dict[str, Any]) -> bool
    def get_blocking_reasons(self, input_data: Dict[str, Any]) -> List[str]

EscalationPolicy

class EscalationPolicy(BasePolicy):
    def __init__(
        self,
        max_cost: float = 0.05,
        max_steps: int = 3,
        error_threshold: int = 2,
        sensitive_topics: Optional[List[str]] = None
    )

Utility Functions

Retriever Factory

def get_retriever(retriever_type: str, **kwargs) -> BaseRetriever:
    """Factory function to get the appropriate retriever."""
    retrievers = {
        "bm25": BM25Retriever,
        "vector": VectorRetriever,
        "hybrid": HybridRetriever,
    }

    if retriever_type not in retrievers:
        raise ValueError(f"Unknown retriever type: {retriever_type}")

    return retrievers[retriever_type](**kwargs)
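
The factory forwards keyword arguments straight to the selected class, so a hybrid retriever can be built as below; the vector_retriever and bm25_retriever instances are assumed to have been constructed as in the HybridRetriever section above.

retriever = get_retriever(
    "hybrid",
    vector_retriever=vector_retriever,
    bm25_retriever=bm25_retriever,
    alpha=0.6,
)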

Vector Store Factory

def get_vector_store(store_type: str, **kwargs) -> VectorStore:
    """Factory function to get the appropriate vector store."""
    stores = {
        "opensearch": OpenSearchStore,
        "azure": AzureAISearchStore,
        "vertex": VertexAIVectorStore,
    }

    if store_type not in stores:
        raise ValueError(f"Unknown vector store type: {store_type}")

    return stores[store_type](**kwargs)
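
Likewise for vector stores; the endpoint below is a placeholder, and the keyword arguments are those of the OpenSearchStore constructor documented above.

store = get_vector_store(
    "opensearch",
    endpoint="https://opensearch.example.internal:9200",
    index_name="kb-articles",
    embedding_dimension=1536,
)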

Error Handling

Common Exceptions

RecoAgentError

Base exception for all RecoAgent errors.

class RecoAgentError(Exception):
    """Base exception for RecoAgent errors."""
    pass

RetrievalError

class RetrievalError(RecoAgentError):
    """Error during document retrieval."""
    pass

EvaluationError

class EvaluationError(RecoAgentError):
    """Error during evaluation."""
    pass

VectorStoreError

class VectorStoreError(RecoAgentError):
    """Error with vector store operations."""
    pass
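
Because every exception derives from RecoAgentError, one handler can catch anything the library raises while still branching on specific failures; a brief sketch, with the import path assumed.

# Import path is an assumption; the exception hierarchy is documented above.
from recoagent.exceptions import RecoAgentError, RetrievalError

try:
    response = agent.ask("What is RecoAgent?")
except RetrievalError as exc:
    print(f"Retrieval failed; consider retrying or falling back: {exc}")
except RecoAgentError as exc:
    print(f"RecoAgent error: {exc}")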

Performance Considerations

Optimization Tips

  1. Batch Operations: Use batch methods for adding multiple documents (see the sketch after this list)
  2. Connection Pooling: Configure appropriate connection pool sizes
  3. Caching: Cache embeddings and frequent queries
  4. Async Operations: Use async methods for better concurrency
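
A minimal sketch of batched ingestion using the batch_size parameter of add_documents() documented above; the generated corpus is a stand-in for real documents.

# Ingest a large corpus in batches instead of one call per document.
documents = [f"Document body {i}" for i in range(10_000)]  # stand-in corpus
agent.add_documents(documents, batch_size=200)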

Memory Management

  1. Streaming: Use streaming for large document sets
  2. Chunking: Optimize chunk sizes for your use case
  3. Cleanup: Properly close connections and clean up resources

Security Considerations

API Keys

  • Store API keys in environment variables
  • Use secret management systems in production
  • Rotate keys regularly

Data Privacy

  • Enable PII detection and filtering
  • Use content filtering for sensitive topics
  • Implement proper access controls

Network Security

  • Use HTTPS for all external communications
  • Implement proper authentication and authorization
  • Monitor for suspicious activity

Best Practices

Code Organization

  1. Modular Design: Keep components loosely coupled
  2. Error Handling: Implement comprehensive error handling
  3. Logging: Use structured logging for better observability
  4. Testing: Write comprehensive tests for all components

Production Deployment

  1. Health Checks: Implement health check endpoints
  2. Monitoring: Set up comprehensive monitoring and alerting
  3. Scaling: Design for horizontal scaling
  4. Backup: Implement backup and recovery strategies

Migration Guide

Version Compatibility

  • Check version compatibility before upgrading
  • Test thoroughly in staging environments
  • Plan for backward compatibility when possible

Configuration Migration

  • Update configuration files for new versions
  • Migrate custom components as needed
  • Update documentation and examples

Support and Resources

Documentation

Community

Professional Support

  • Enterprise support available
  • Custom implementation services
  • Training and consulting