Add Custom Tools to Agents
Difficulty: ⭐⭐ Intermediate | Time: 1-1.5 hours
🎯 The Problem
Your agents need to do more than just retrieve documents - they need to query databases, call APIs, perform calculations, or interact with external systems. The built-in tools aren't enough for your specific use case.
This guide solves: Creating custom tools that extend your agent's capabilities, from simple functions to complex integrations.
⚡ TL;DR - Quick Custom Tool
from packages.agents.tools import BaseTool

# 1. Define your tool
class DatabaseQueryTool(BaseTool):
    name = "database_query"
    description = "Query the user database for information"

    def _run(self, sql_query: str) -> dict:
        # Your logic here; execute_query is a placeholder for your own data-access function
        results = execute_query(sql_query)
        return {"results": results, "count": len(results)}

# 2. Register with agent
from packages.agents import ToolRegistry

registry = ToolRegistry()
registry.register_tool(DatabaseQueryTool())

# 3. The agent will automatically use it when needed!
# (RAGAgentGraph must be imported from your RecoAgent installation first.)
agent = RAGAgentGraph(tool_registry=registry)
Result: Agent can now query your database autonomously!
Full Guide
Understanding Tools in RecoAgent
Tools are functions that agents can call to extend their capabilities beyond text generation. They:
- Execute actions (API calls, database queries, calculations)
- Retrieve external data (web search, real-time data)
- Perform computations (math, data processing)
- Interact with systems (send emails, create tickets)
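To make the idea concrete, here is a minimal, self-contained sketch of how an agent runtime might dispatch a tool call by name. `SimpleTool`, `SimpleRegistry`, `dispatch`, and `add_numbers` are hypothetical stand-ins written for this illustration; they are not the RecoAgent `BaseTool` or `ToolRegistry` classes, whose internals may differ.

```python
from typing import Any, Callable, Dict


class SimpleTool:
    """Hypothetical stand-in for a tool: a name, a description, and a callable."""

    def __init__(self, name: str, description: str, func: Callable[..., Dict[str, Any]]):
        self.name = name
        self.description = description  # This is the text an LLM would read
        self.func = func


class SimpleRegistry:
    """Hypothetical stand-in for a registry that maps tool names to tools."""

    def __init__(self) -> None:
        self._tools: Dict[str, SimpleTool] = {}

    def register(self, tool: SimpleTool) -> None:
        self._tools[tool.name] = tool

    def dispatch(self, name: str, **kwargs: Any) -> Dict[str, Any]:
        # The model picks a tool name and arguments; the runtime looks the tool up and calls it.
        tool = self._tools.get(name)
        if tool is None:
            return {"success": False, "error": f"Unknown tool: {name}"}
        return tool.func(**kwargs)


def add_numbers(a: float, b: float) -> Dict[str, Any]:
    return {"success": True, "result": a + b}


registry_demo = SimpleRegistry()
registry_demo.register(SimpleTool("add", "Add two numbers", add_numbers))
```

Calling `registry_demo.dispatch("add", a=2, b=3)` returns the tool's result dictionary, while an unknown tool name yields a structured error instead of an exception.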
Tool Anatomy
from packages.agents.tools import BaseTool
from typing import Dict, Any, Optional

class MyCustomTool(BaseTool):
    """Template for custom tools"""

    # Required attributes
    name = "my_tool"  # Unique identifier
    description = "What this tool does (LLM sees this)"  # Be descriptive!

    # Optional attributes
    return_direct = False  # Return result directly without further processing
    handle_tool_error = True  # Catch and handle errors gracefully

    def _run(self, **kwargs) -> Dict[str, Any]:
        """Synchronous execution"""
        # Your logic here
        result = do_something(kwargs)
        return {"result": result}

    async def _arun(self, **kwargs) -> Dict[str, Any]:
        """Async execution (preferred for I/O operations)"""
        result = await async_do_something(kwargs)
        return {"result": result}
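If a tool only has blocking logic, one common way to still offer an async entry point is to run the synchronous method in a worker thread. The sketch below assumes Python 3.9+ (for `asyncio.to_thread`) and uses a hypothetical `SyncOnlyTool` class that does not inherit from the RecoAgent `BaseTool`, so it runs on its own.

```python
import asyncio
import time
from typing import Any, Dict, List


class SyncOnlyTool:
    """Hypothetical stand-in tool that only implements blocking logic."""

    name = "slow_lookup"

    def _run(self, key: str) -> Dict[str, Any]:
        time.sleep(0.05)  # Simulates blocking I/O
        return {"result": key.upper()}

    async def _arun(self, key: str) -> Dict[str, Any]:
        # Run the blocking method in a worker thread so the event loop stays free.
        return await asyncio.to_thread(self._run, key)


async def run_many() -> List[Dict[str, Any]]:
    tool = SyncOnlyTool()
    # The three calls overlap instead of running back to back.
    return await asyncio.gather(*(tool._arun(k) for k in ("a", "b", "c")))
```

`asyncio.run(run_many())` returns the three results in input order. For tools that are I/O-bound from the start, a native async client is usually the better choice.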
Real-World Examples
Example 1: Web Search Tool
import asyncio
from typing import Any, Dict

import requests
from packages.agents.tools import BaseTool

class WebSearchTool(BaseTool):
    name = "web_search"
    description = """
    Search the web for current information.
    Use this when the query requires up-to-date information not in the knowledge base.
    Input: search_query (str) - What to search for
    Returns: List of search results with titles, URLs, and snippets
    """

    def __init__(self, api_key: str):
        super().__init__()
        self.api_key = api_key

    async def _arun(self, search_query: str) -> Dict[str, Any]:
        """Search using an external API"""
        if not search_query.strip():
            return {"success": False, "error": "search_query is required", "results": []}
        try:
            # requests is blocking, so run it in a worker thread (Python 3.9+)
            # to avoid stalling the event loop.
            response = await asyncio.to_thread(
                requests.get,
                "https://api.search.com/search",
                params={"q": search_query, "key": self.api_key},
                timeout=10,
            )
            response.raise_for_status()
            results = response.json()["results"]
            return {
                "success": True,
                "results": results[:5],  # Top 5 results
                "count": len(results),  # Total returned by the API, before truncation
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "results": [],
            }

# Usage
tool = WebSearchTool(api_key="your-key")
registry.register_tool(tool)
Example 2: Database Query Tool
from typing import Any, Dict

import asyncpg
from packages.agents.tools import BaseTool

class PostgresQueryTool(BaseTool):
    name = "query_database"
    description = """
    Query the PostgreSQL database for user data, analytics, or reports.
    Input: sql_query (str) - SQL SELECT statement (read-only)
    Returns: Query results as list of dictionaries
    """

    def __init__(self, connection_string: str):
        super().__init__()
        self.connection_string = connection_string

    async def _arun(self, sql_query: str) -> Dict[str, Any]:
        """Execute database query"""
        # Security: only allow SELECT statements. This prefix check is a first
        # line of defense, not a sandbox; also connect with a read-only database role.
        if not sql_query.strip().upper().startswith('SELECT'):
            return {
                "success": False,
                "error": "Only SELECT queries allowed"
            }
        try:
            conn = await asyncpg.connect(self.connection_string)
            try:
                results = await conn.fetch(sql_query)
            finally:
                # Always release the connection, even if the query fails
                await conn.close()
            # Convert to list of dicts
            data = [dict(row) for row in results]
            return {
                "success": True,
                "data": data,
                "count": len(data)
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "data": []
            }
Example 3: Calculation Tool
from typing import Any, Dict

from packages.agents.tools import BaseTool
import math

class CalculatorTool(BaseTool):
    name = "calculator"
    description = """
    Perform mathematical calculations.
    Input: expression (str) - Math expression to evaluate (e.g., "2 + 2", "sqrt(16)")
    Returns: Calculation result
    """

    def _run(self, expression: str) -> Dict[str, Any]:
        """Evaluate math expressions in a restricted namespace"""
        # Whitelist of allowed functions
        safe_functions = {
            'sqrt': math.sqrt,
            'pow': math.pow,
            'log': math.log,
            'sin': math.sin,
            'cos': math.cos,
        }
        try:
            # WARNING: eval() with empty __builtins__ limits the namespace but is
            # not a real sandbox; attribute access on literals can still escape it.
            # Do not expose this to untrusted input without a stricter parser.
            result = eval(expression, {"__builtins__": {}}, safe_functions)
            return {
                "success": True,
                "result": result,
                "expression": expression
            }
        except Exception as e:
            return {
                "success": False,
                "error": f"Invalid expression: {str(e)}"
            }
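Because `eval` with stripped builtins can still be escaped through attribute access, a stricter option is to parse the expression and walk the syntax tree, accepting only numbers, arithmetic operators, and named math functions. The following is a minimal sketch of that approach using the standard-library `ast` module; `safe_calculate` and the helper names are hypothetical and not part of RecoAgent.

```python
import ast
import math
import operator
from typing import Any, Dict

# Operators and functions the evaluator accepts; anything else is rejected.
_BIN_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}
_FUNCS = {"sqrt": math.sqrt, "log": math.log, "sin": math.sin, "cos": math.cos}


def _eval_node(node: ast.AST) -> float:
    # Plain numeric literals only (bool is excluded even though it subclasses int).
    if isinstance(node, ast.Constant):
        if isinstance(node.value, (int, float)) and not isinstance(node.value, bool):
            return node.value
        raise ValueError("Only numeric literals are allowed")
    if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
        return _BIN_OPS[type(node.op)](_eval_node(node.left), _eval_node(node.right))
    if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
        return _UNARY_OPS[type(node.op)](_eval_node(node.operand))
    if (
        isinstance(node, ast.Call)
        and isinstance(node.func, ast.Name)
        and node.func.id in _FUNCS
        and not node.keywords
    ):
        return _FUNCS[node.func.id](*[_eval_node(arg) for arg in node.args])
    raise ValueError(f"Unsupported expression element: {type(node).__name__}")


def safe_calculate(expression: str) -> Dict[str, Any]:
    """Evaluate a math expression without calling eval()."""
    try:
        tree = ast.parse(expression, mode="eval")
        return {"success": True, "result": _eval_node(tree.body), "expression": expression}
    except Exception as e:
        return {"success": False, "error": f"Invalid expression: {e}"}
```

Attribute access, names, and arbitrary calls never reach execution because the walker rejects any node type it does not recognize. The sketch does not bound computation cost (for example, very large exponents), so a production tool may also want limits on expression length or operand size.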
Example 4: API Integration Tool
import asyncio
from typing import Any, Dict

import aiohttp
from packages.agents.tools import BaseTool

class SalesforceAPITool(BaseTool):
    name = "salesforce_query"
    description = """
    Query Salesforce CRM for customer data.
    Input: customer_email (str) - Customer email to look up
    Returns: Customer information including account details and history
    """

    def __init__(self, api_token: str, instance_url: str):
        super().__init__()
        self.api_token = api_token
        self.instance_url = instance_url

    async def _arun(self, customer_email: str) -> Dict[str, Any]:
        """Query Salesforce API"""
        headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }
        # Escape backslashes and single quotes so the email cannot break out
        # of the SOQL string literal (prevents SOQL injection).
        safe_email = customer_email.replace("\\", "\\\\").replace("'", "\\'")
        timeout = aiohttp.ClientTimeout(total=10)  # Avoid hanging on a slow API
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(
                    f"{self.instance_url}/services/data/v58.0/query",
                    params={"q": f"SELECT Id, Name, Email FROM Contact WHERE Email = '{safe_email}'"},
                    headers=headers
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        return {
                            "success": True,
                            "customer": data["records"][0] if data["records"] else None
                        }
                    else:
                        return {
                            "success": False,
                            "error": f"API error: {response.status}"
                        }
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            return {"success": False, "error": f"Request failed: {e}"}
Best Practices
1. Clear Descriptions
❌ Bad:
description = "Query database"
✅ Good:
description = """
Query the user database for account information.
Use when: User asks about account status, order history, or profile data.
Input: user_id (str) - The user's unique identifier
Returns: Dict with user account details, order history, and preferences
Example: "What orders has user john@example.com placed?"
"""
2. Error Handling
import logging

logger = logging.getLogger(__name__)

# Inside your tool class:
def _run(self, **kwargs):
    try:
        result = risky_operation()
        return {"success": True, "data": result}
    except TimeoutError:
        return {"success": False, "error": "Operation timed out", "retry": True}
    except Exception as e:
        # Log error for debugging
        logger.error(f"Tool error: {e}")
        return {"success": False, "error": str(e), "retry": False}
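The `retry` flag in these result dictionaries only helps if the caller acts on it. As an illustration, the sketch below retries a call with exponential backoff only when the tool reports a retryable failure. `call_with_retry` and its parameters are hypothetical; RecoAgent may already provide its own retry handling.

```python
import time
from typing import Any, Callable, Dict


def call_with_retry(
    tool_call: Callable[[], Dict[str, Any]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
) -> Dict[str, Any]:
    """Call a tool and retry only when it reports a retryable failure."""
    result: Dict[str, Any] = {"success": False, "error": "not attempted", "retry": False}
    for attempt in range(1, max_attempts + 1):
        result = tool_call()
        # Stop on success, or on a failure the tool marked as not worth retrying.
        if result.get("success") or not result.get("retry"):
            return result
        if attempt < max_attempts:
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            time.sleep(base_delay * 2 ** (attempt - 1))
    return result
```

A caller would wrap the tool invocation in a zero-argument callable, for example `call_with_retry(lambda: tool._run(query="..."))`. Non-retryable errors return after a single attempt.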
3. Input Validation
import re

def _run(self, sql_query: str, max_rows: int = 100):
    # Validate inputs
    if not sql_query:
        return {"success": False, "error": "sql_query is required"}
    if max_rows < 1 or max_rows > 1000:
        return {"success": False, "error": "max_rows must be between 1 and 1000"}
    # Reject write keywords. Match whole words so names like "dropbox_users" pass.
    # This is a coarse filter, not full injection protection; use a read-only role too.
    if re.search(r"\b(DROP|DELETE|UPDATE|INSERT)\b", sql_query, re.IGNORECASE):
        return {"success": False, "error": "Only SELECT queries allowed"}
    # Execute...
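A blocklist of keywords is easy to bypass, so it can help to combine it with an allowlist check: exactly one statement, and that statement must start with `SELECT`. The helper below is a sketch of that idea; `validate_select_query` is a hypothetical name, and the keyword list is illustrative rather than exhaustive. It is deliberately conservative and will reject some legitimate queries, such as ones with a semicolon inside a string literal.

```python
import re
from typing import Optional

# Keywords that indicate a write or schema change. Illustrative, not exhaustive.
_FORBIDDEN = re.compile(
    r"\b(INSERT|UPDATE|DELETE|DROP|ALTER|TRUNCATE|CREATE|GRANT|REVOKE)\b",
    re.IGNORECASE,
)


def validate_select_query(sql_query: str, max_rows: int = 100) -> Optional[str]:
    """Return an error message if the query is rejected, or None if it passes."""
    if not sql_query or not sql_query.strip():
        return "sql_query is required"
    if not 1 <= max_rows <= 1000:
        return "max_rows must be between 1 and 1000"
    # Drop trailing semicolons, then refuse anything that still contains one.
    statement = sql_query.strip().rstrip(";").strip()
    if ";" in statement:
        return "Only a single statement is allowed"
    if not re.match(r"SELECT\b", statement, re.IGNORECASE):
        return "Only SELECT queries allowed"
    if _FORBIDDEN.search(statement):
        return "Only SELECT queries allowed"
    return None
```

Inside a tool, the returned message can be wrapped as `{"success": False, "error": message}` before any database call is made. Validation like this complements, and does not replace, a read-only database role.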
4. Performance Considerations
import time

from packages.agents.tools import BaseTool

class ExpensiveAPITool(BaseTool):
    def __init__(self):
        super().__init__()
        # Add caching for expensive operations
        self._cache = {}
        self._cache_ttl = 3600  # 1 hour

    async def _arun(self, query: str):
        # Check cache first; using the query string itself as the key avoids hash collisions
        cache_key = query
        if cache_key in self._cache:
            cached_result, timestamp = self._cache[cache_key]
            if time.time() - timestamp < self._cache_ttl:
                return cached_result
        # Make expensive API call (expensive_api_call is a placeholder for your own client)
        result = await expensive_api_call(query)
        # Cache result
        self._cache[cache_key] = (result, time.time())
        return result
Testing Your Custom Tools
Unit Tests
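import pytest
import requests
from your_tools import PostgresQueryTool, WebSearchTool

# @pytest.mark.asyncio requires the pytest-asyncio plugin.

class FakeResponse:
    """Stand-in for requests.Response so the test never touches the network"""
    def raise_for_status(self):
        pass
    def json(self):
        return {"results": [{"title": "RecoAgent docs", "url": "https://example.com"}]}

@pytest.mark.asyncio
async def test_web_search_tool(monkeypatch):
    """Test web search tool"""
    tool = WebSearchTool(api_key="test-key")
    # Test successful search against a stubbed HTTP call
    monkeypatch.setattr(requests, "get", lambda *args, **kwargs: FakeResponse())
    result = await tool._arun("RecoAgent documentation")
    assert result["success"] is True
    assert len(result["results"]) > 0

    # Test error handling by simulating a network failure
    def raise_connection_error(*args, **kwargs):
        raise requests.ConnectionError("network down")
    monkeypatch.setattr(requests, "get", raise_connection_error)
    result = await tool._arun("RecoAgent documentation")
    assert result["success"] is False
    assert "error" in result

@pytest.mark.asyncio
async def test_database_tool_security():
    """Test database tool rejects non-SELECT statements"""
    tool = PostgresQueryTool("postgresql://...")
    # Should reject dangerous queries before opening a connection
    result = await tool._arun("DROP TABLE users;")
    assert result["success"] is False
    assert "Only SELECT" in result["error"]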
Integration Tests
import pytest

@pytest.mark.asyncio
async def test_tool_with_agent():
    """Test tool works with agent"""
    registry = ToolRegistry()
    registry.register_tool(WebSearchTool(api_key="key"))
    agent = RAGAgentGraph(tool_registry=registry)
    # Agent should use web search for current info
    result = await agent.run("What's the weather today?")
    assert "web_search" in result["tools_used"]
    assert result["answer"] is not None
Deployment Checklist
Before deploying custom tools to production:
- ✅ Unit tests pass with 80%+ coverage
- ✅ Error handling for all failure modes
- ✅ Input validation and sanitization
- ✅ Timeout limits set (prevent hanging)
- ✅ Rate limiting if calling external APIs
- ✅ Caching for expensive operations
- ✅ Logging for debugging
- ✅ Cost tracking if tool incurs charges
- ✅ Security review completed
- ✅ Documentation updated
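Two of the checklist items, timeout limits and rate limiting, can be sketched with the standard library alone. The example below wraps any async tool call with a minimum interval between calls and a hard timeout. `MinIntervalLimiter` and `guarded_call` are hypothetical helpers written for this illustration, not RecoAgent APIs, and a production setup would likely want a more capable limiter such as a token bucket.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict


class MinIntervalLimiter:
    """Allow at most one call per `min_interval` seconds (a very simple rate limiter)."""

    def __init__(self, min_interval: float) -> None:
        self.min_interval = min_interval
        self._last_call = float("-inf")
        self._lock = asyncio.Lock()

    async def wait(self) -> None:
        async with self._lock:
            # Sleep until min_interval has passed since the previous call.
            delay = self._last_call + self.min_interval - time.monotonic()
            if delay > 0:
                await asyncio.sleep(delay)
            self._last_call = time.monotonic()


async def guarded_call(
    call: Callable[[], Awaitable[Dict[str, Any]]],
    limiter: MinIntervalLimiter,
    timeout_s: float = 10.0,
) -> Dict[str, Any]:
    """Apply the rate limit, then enforce a hard timeout on the tool call."""
    await limiter.wait()
    try:
        return await asyncio.wait_for(call(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return {"success": False, "error": f"Timed out after {timeout_s}s", "retry": True}
```

A tool call would be passed as a zero-argument callable, for example `await guarded_call(lambda: tool._arun("query"), limiter)`. Creating the limiter inside the running event loop keeps the sketch compatible with older Python versions.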
Common Patterns
| Tool Type | Use Case | Example |
|---|---|---|
| Data Retrieval | Get structured data | Database queries, API calls |
| Computation | Calculate, analyze | Math, statistics, data processing |
| Action | Perform operations | Send email, create ticket, update record |
| External Service | Third-party integration | Weather API, stock prices, news |
| File Operations | Read/write files | PDF parsing, image analysis |
What You've Learned
✅ Tool structure - How to create a custom tool with BaseTool
✅ Best practices - Clear descriptions, error handling, validation
✅ Real examples - Web search, database, calculator, API integration
✅ Testing - Unit and integration tests for tools
✅ Production - Deployment checklist covering testing, safety, and logging
Next Steps
- 🔌 Connect External APIs - Integrate third-party services
- 🛡️ Implement Guardrails - Control tool usage
- 📊 Monitor Tool Performance - Track tool execution