Connect External APIs to Your Agents
Difficulty: ⭐⭐ Intermediate | Time: 45 minutes
🎯 The Problem
Your agents need real-time data from external systems - CRM, ERP, weather APIs, stock prices, etc. You need to securely connect to these APIs, handle authentication, rate limits, and errors without breaking your agent workflow.
This guide solves: Integrating external APIs as agent tools with proper error handling, rate limiting, and security.
⚡ TL;DR - Quick API Integration
from packages.agents.tools import APITool
# 1. Define API tool
class WeatherAPITool(APITool):
name = "get_weather"
description = "Get current weather for a location"
base_url = "https://api.weather.com/v1"
def __init__(self, api_key: str):
super().__init__()
self.api_key = api_key
async def _arun(self, location: str) -> dict:
return await self.get(
"/weather",
params={"location": location, "key": self.api_key}
)
# 2. Register with agent
registry.register_tool(WeatherAPITool(api_key="your-key"))
# 3. Agent can now fetch real-time weather!
result = await agent.run("What's the weather in San Francisco?")
Result: Agent autonomously calls external APIs when needed!
Full Integration Guide
Common API Patterns
REST API Integration
from packages.agents.tools import BaseTool
import aiohttp
class RESTAPITool(BaseTool):
"""Generic REST API integration"""
def __init__(self, base_url: str, api_key: str, headers: dict = None):
super().__init__()
self.base_url = base_url
self.api_key = api_key
self.headers = headers or {}
self.headers['Authorization'] = f'Bearer {api_key}'
async def _get(self, endpoint: str, params: dict = None):
"""GET request"""
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.base_url}{endpoint}",
params=params,
headers=self.headers,
timeout=aiohttp.ClientTimeout(total=10)
) as response:
response.raise_for_status()
return await response.json()
async def _post(self, endpoint: str, data: dict):
"""POST request"""
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.base_url}{endpoint}",
json=data,
headers=self.headers,
timeout=aiohttp.ClientTimeout(total=10)
) as response:
response.raise_for_status()
return await response.json()
GraphQL API Integration
class GraphQLAPITool(BaseTool):
"""GraphQL API integration"""
async def _arun(self, query: str, variables: dict = None):
"""Execute GraphQL query"""
async with aiohttp.ClientSession() as session:
async with session.post(
self.endpoint,
json={"query": query, "variables": variables},
headers=self.headers
) as response:
data = await response.json()
if "errors" in data:
return {"success": False, "errors": data["errors"]}
return {"success": True, "data": data["data"]}
Error Handling & Retries
from tenacity import retry, stop_after_attempt, wait_exponential
class ResilientAPITool(BaseTool):
"""API tool with automatic retries"""
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
reraise=True
)
async def _arun(self, **kwargs):
"""Auto-retry on transient failures"""
try:
result = await self.call_api(**kwargs)
return {"success": True, "data": result}
except aiohttp.ClientTimeout:
# Timeout - retry
raise
except aiohttp.ClientResponseError as e:
if e.status in [429, 500, 502, 503, 504]:
# Transient errors - retry
raise
else:
# Permanent errors - don't retry
return {"success": False, "error": f"HTTP {e.status}"}
Rate Limiting
from packages.rate_limiting import RateLimiter
import asyncio
class RateLimitedAPITool(BaseTool):
"""API tool with rate limiting"""
def __init__(self, api_key: str, max_requests_per_minute: int = 60):
super().__init__()
self.api_key = api_key
self.rate_limiter = RateLimiter(
max_requests=max_requests_per_minute,
time_window=60
)
async def _arun(self, **kwargs):
"""Call API with rate limiting"""
# Wait if rate limit reached
await self.rate_limiter.acquire()
try:
result = await self.call_api(**kwargs)
return result
finally:
# Release rate limit slot
self.rate_limiter.release()
Authentication Patterns
API Key Authentication
headers = {"Authorization": f"Bearer {api_key}"}
OAuth 2.0
from authlib.integrations.httpx_client import AsyncOAuth2Client
class OAuth2APITool(BaseTool):
def __init__(self, client_id: str, client_secret: str):
self.client = AsyncOAuth2Client(
client_id=client_id,
client_secret=client_secret,
token_endpoint="https://api.example.com/oauth/token"
)
async def _arun(self, **kwargs):
# Get access token (auto-refreshed)
token = await self.client.fetch_token()
# Make API call with token
headers = {"Authorization": f"Bearer {token['access_token']}"}
# ... make request
Real-World Examples
Example 1: Salesforce Integration
class SalesforceTool(RESTAPITool):
name = "salesforce_query"
description = "Query Salesforce CRM for customer data"
def __init__(self, instance_url: str, access_token: str):
super().__init__(
base_url=f"{instance_url}/services/data/v58.0",
api_key=access_token
)
async def get_customer(self, email: str):
"""Get customer by email"""
query = f"SELECT Id, Name, Email, Account.Name FROM Contact WHERE Email = '{email}'"
return await self._get("/query", params={"q": query})
Example 2: Stripe Payment Info
import stripe
class StripeAPITool(BaseTool):
name = "stripe_lookup"
description = "Look up payment information from Stripe"
def __init__(self, api_key: str):
super().__init__()
stripe.api_key = api_key
async def _arun(self, customer_id: str):
"""Get customer payment info"""
try:
customer = stripe.Customer.retrieve(customer_id)
return {
"success": True,
"customer": {
"email": customer.email,
"balance": customer.balance,
"currency": customer.currency
}
}
except stripe.error.StripeError as e:
return {"success": False, "error": str(e)}
Example 3: Internal Microservices
class InternalServiceTool(BaseTool):
name = "analytics_service"
description = "Get analytics from internal service"
async def _arun(self, metric: str, timeframe: str):
"""Call internal analytics microservice"""
async with aiohttp.ClientSession() as session:
async with session.get(
"http://analytics-service.internal:8080/api/metrics",
params={"metric": metric, "timeframe": timeframe},
headers={"X-Internal-Auth": os.getenv("INTERNAL_AUTH_TOKEN")}
) as response:
return await response.json()
Best Practices
Practice | Why | Example |
---|---|---|
Use async | Non-blocking I/O | async def _arun() |
Set timeouts | Prevent hanging | timeout=10s |
Cache responses | Reduce API calls | Cache for 5-60 minutes |
Handle rate limits | Avoid 429 errors | Implement backoff |
Log all calls | Debugging, audit | Log request/response |
Sanitize inputs | Prevent injection | Validate before API call |
Mask sensitive data | Security | Redact in logs |
Security Considerations
Secure API Key Storage
# ❌ BAD: Hardcoded
api_key = "sk-1234567890"
# ✅ GOOD: Environment variables
api_key = os.getenv("API_KEY")
# ✅ BETTER: Secrets manager
from packages.security import SecretsManager
api_key = await SecretsManager().get_secret("api_key")
Request Signing
import hmac
import hashlib
def sign_request(payload: str, secret: str) -> str:
"""Sign API requests for verification"""
signature = hmac.new(
secret.encode(),
payload.encode(),
hashlib.sha256
).hexdigest()
return signature
What You've Learned
✅ REST API integration - GET/POST requests with auth
✅ GraphQL integration - Query external GraphQL APIs
✅ Error handling - Retries and fallbacks
✅ Rate limiting - Respect API limits
✅ Authentication - API keys, OAuth, internal auth
✅ Security - Secrets management and request signing
✅ Real examples - Salesforce, Stripe, internal services
Next Steps
- 🔧 Add Custom Tools - Build more tool types
- 🔒 Handle Authentication - Secure your API
- 📊 Monitor API Performance - Track external calls