Skip to main content

Connect External APIs to Your Agents

Difficulty: ⭐⭐ Intermediate | Time: 45 minutes

🎯 The Problem

Your agents need real-time data from external systems - CRM, ERP, weather APIs, stock prices, etc. You need to securely connect to these APIs, handle authentication, rate limits, and errors without breaking your agent workflow.

This guide solves: Integrating external APIs as agent tools with proper error handling, rate limiting, and security.

⚡ TL;DR - Quick API Integration

from packages.agents.tools import APITool

# 1. Define API tool
class WeatherAPITool(APITool):
name = "get_weather"
description = "Get current weather for a location"
base_url = "https://api.weather.com/v1"

def __init__(self, api_key: str):
super().__init__()
self.api_key = api_key

async def _arun(self, location: str) -> dict:
return await self.get(
"/weather",
params={"location": location, "key": self.api_key}
)

# 2. Register with agent
registry.register_tool(WeatherAPITool(api_key="your-key"))

# 3. Agent can now fetch real-time weather!
result = await agent.run("What's the weather in San Francisco?")

Result: Agent autonomously calls external APIs when needed!


Full Integration Guide

Common API Patterns

REST API Integration

from packages.agents.tools import BaseTool
import aiohttp

class RESTAPITool(BaseTool):
"""Generic REST API integration"""

def __init__(self, base_url: str, api_key: str, headers: dict = None):
super().__init__()
self.base_url = base_url
self.api_key = api_key
self.headers = headers or {}
self.headers['Authorization'] = f'Bearer {api_key}'

async def _get(self, endpoint: str, params: dict = None):
"""GET request"""
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.base_url}{endpoint}",
params=params,
headers=self.headers,
timeout=aiohttp.ClientTimeout(total=10)
) as response:
response.raise_for_status()
return await response.json()

async def _post(self, endpoint: str, data: dict):
"""POST request"""
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.base_url}{endpoint}",
json=data,
headers=self.headers,
timeout=aiohttp.ClientTimeout(total=10)
) as response:
response.raise_for_status()
return await response.json()

GraphQL API Integration

class GraphQLAPITool(BaseTool):
"""GraphQL API integration"""

async def _arun(self, query: str, variables: dict = None):
"""Execute GraphQL query"""
async with aiohttp.ClientSession() as session:
async with session.post(
self.endpoint,
json={"query": query, "variables": variables},
headers=self.headers
) as response:
data = await response.json()
if "errors" in data:
return {"success": False, "errors": data["errors"]}
return {"success": True, "data": data["data"]}

Error Handling & Retries

from tenacity import retry, stop_after_attempt, wait_exponential

class ResilientAPITool(BaseTool):
"""API tool with automatic retries"""

@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
reraise=True
)
async def _arun(self, **kwargs):
"""Auto-retry on transient failures"""
try:
result = await self.call_api(**kwargs)
return {"success": True, "data": result}

except aiohttp.ClientTimeout:
# Timeout - retry
raise

except aiohttp.ClientResponseError as e:
if e.status in [429, 500, 502, 503, 504]:
# Transient errors - retry
raise
else:
# Permanent errors - don't retry
return {"success": False, "error": f"HTTP {e.status}"}

Rate Limiting

from packages.rate_limiting import RateLimiter
import asyncio

class RateLimitedAPITool(BaseTool):
"""API tool with rate limiting"""

def __init__(self, api_key: str, max_requests_per_minute: int = 60):
super().__init__()
self.api_key = api_key
self.rate_limiter = RateLimiter(
max_requests=max_requests_per_minute,
time_window=60
)

async def _arun(self, **kwargs):
"""Call API with rate limiting"""
# Wait if rate limit reached
await self.rate_limiter.acquire()

try:
result = await self.call_api(**kwargs)
return result
finally:
# Release rate limit slot
self.rate_limiter.release()

Authentication Patterns

API Key Authentication

headers = {"Authorization": f"Bearer {api_key}"}

OAuth 2.0

from authlib.integrations.httpx_client import AsyncOAuth2Client

class OAuth2APITool(BaseTool):
def __init__(self, client_id: str, client_secret: str):
self.client = AsyncOAuth2Client(
client_id=client_id,
client_secret=client_secret,
token_endpoint="https://api.example.com/oauth/token"
)

async def _arun(self, **kwargs):
# Get access token (auto-refreshed)
token = await self.client.fetch_token()

# Make API call with token
headers = {"Authorization": f"Bearer {token['access_token']}"}
# ... make request

Real-World Examples

Example 1: Salesforce Integration

class SalesforceTool(RESTAPITool):
name = "salesforce_query"
description = "Query Salesforce CRM for customer data"

def __init__(self, instance_url: str, access_token: str):
super().__init__(
base_url=f"{instance_url}/services/data/v58.0",
api_key=access_token
)

async def get_customer(self, email: str):
"""Get customer by email"""
query = f"SELECT Id, Name, Email, Account.Name FROM Contact WHERE Email = '{email}'"
return await self._get("/query", params={"q": query})

Example 2: Stripe Payment Info

import stripe

class StripeAPITool(BaseTool):
name = "stripe_lookup"
description = "Look up payment information from Stripe"

def __init__(self, api_key: str):
super().__init__()
stripe.api_key = api_key

async def _arun(self, customer_id: str):
"""Get customer payment info"""
try:
customer = stripe.Customer.retrieve(customer_id)
return {
"success": True,
"customer": {
"email": customer.email,
"balance": customer.balance,
"currency": customer.currency
}
}
except stripe.error.StripeError as e:
return {"success": False, "error": str(e)}

Example 3: Internal Microservices

class InternalServiceTool(BaseTool):
name = "analytics_service"
description = "Get analytics from internal service"

async def _arun(self, metric: str, timeframe: str):
"""Call internal analytics microservice"""
async with aiohttp.ClientSession() as session:
async with session.get(
"http://analytics-service.internal:8080/api/metrics",
params={"metric": metric, "timeframe": timeframe},
headers={"X-Internal-Auth": os.getenv("INTERNAL_AUTH_TOKEN")}
) as response:
return await response.json()

Best Practices

PracticeWhyExample
Use asyncNon-blocking I/Oasync def _arun()
Set timeoutsPrevent hangingtimeout=10s
Cache responsesReduce API callsCache for 5-60 minutes
Handle rate limitsAvoid 429 errorsImplement backoff
Log all callsDebugging, auditLog request/response
Sanitize inputsPrevent injectionValidate before API call
Mask sensitive dataSecurityRedact in logs

Security Considerations

Secure API Key Storage

# ❌ BAD: Hardcoded
api_key = "sk-1234567890"

# ✅ GOOD: Environment variables
api_key = os.getenv("API_KEY")

# ✅ BETTER: Secrets manager
from packages.security import SecretsManager
api_key = await SecretsManager().get_secret("api_key")

Request Signing

import hmac
import hashlib

def sign_request(payload: str, secret: str) -> str:
"""Sign API requests for verification"""
signature = hmac.new(
secret.encode(),
payload.encode(),
hashlib.sha256
).hexdigest()
return signature

What You've Learned

REST API integration - GET/POST requests with auth
GraphQL integration - Query external GraphQL APIs
Error handling - Retries and fallbacks
Rate limiting - Respect API limits
Authentication - API keys, OAuth, internal auth
Security - Secrets management and request signing
Real examples - Salesforce, Stripe, internal services

Next Steps