Channel Adapters
Communication channel adapters providing seamless integration with Slack, Microsoft Teams, Telegram, and webhook-based systems for enterprise chatbot deployments.
Core Classes
SlackAdapter
Description: Slack workspace integration adapter
Parameters:
bot_token(str): Slack bot tokensigning_secret(str): Slack signing secretenable_threads(bool): Enable thread support (default: True)enable_file_sharing(bool): Enable file sharing (default: True)enable_slash_commands(bool): Enable slash commands (default: True)
Returns: SlackAdapter instance
Example:
from recoagent.channels import SlackAdapter
# Create Slack adapter
slack_adapter = SlackAdapter(
bot_token=os.getenv("SLACK_BOT_TOKEN"),
signing_secret=os.getenv("SLACK_SIGNING_SECRET"),
enable_threads=True,
enable_file_sharing=True,
enable_slash_commands=True
)
# Send message
response = await slack_adapter.send_message(
channel="C1234567890",
text="Hello from RecoAgent!",
thread_ts="1234567890.123456" # Optional: reply in thread
)
TeamsAdapter
Description: Microsoft Teams integration adapter
Parameters:
app_id(str): Teams app IDapp_password(str): Teams app passwordtenant_id(str): Azure tenant IDenable_adaptive_cards(bool): Enable adaptive cards (default: True)enable_file_attachments(bool): Enable file attachments (default: True)
Returns: TeamsAdapter instance
Example:
from recoagent.channels import TeamsAdapter
# Create Teams adapter
teams_adapter = TeamsAdapter(
app_id=os.getenv("TEAMS_APP_ID"),
app_password=os.getenv("TEAMS_APP_PASSWORD"),
tenant_id=os.getenv("AZURE_TENANT_ID"),
enable_adaptive_cards=True,
enable_file_attachments=True
)
# Send adaptive card
card = {
"type": "AdaptiveCard",
"body": [
{"type": "TextBlock", "text": "RecoAgent Response", "weight": "bolder"},
{"type": "TextBlock", "text": "Here's your AI-generated response"}
]
}
response = await teams_adapter.send_adaptive_card(
conversation_id="conversation_123",
card=card
)
TelegramAdapter
Description: Telegram bot integration adapter
Parameters:
bot_token(str): Telegram bot tokenwebhook_url(str, optional): Webhook URL for updatesenable_inline_keyboards(bool): Enable inline keyboards (default: True)enable_file_sharing(bool): Enable file sharing (default: True)
Returns: TelegramAdapter instance
Example:
from recoagent.channels import TelegramAdapter
# Create Telegram adapter
telegram_adapter = TelegramAdapter(
bot_token=os.getenv("TELEGRAM_BOT_TOKEN"),
webhook_url="https://your-domain.com/webhook/telegram",
enable_inline_keyboards=True,
enable_file_sharing=True
)
# Send message with inline keyboard
keyboard = [
[{"text": "Option 1", "callback_data": "option1"}],
[{"text": "Option 2", "callback_data": "option2"}]
]
response = await telegram_adapter.send_message(
chat_id="123456789",
text="Choose an option:",
reply_markup={"inline_keyboard": keyboard}
)
WebhookAdapter
Description: Generic webhook-based integration adapter
Parameters:
webhook_url(str): Webhook endpoint URLauthentication(Dict, optional): Authentication configurationretry_attempts(int): Number of retry attempts (default: 3)timeout(int): Request timeout in seconds (default: 30)
Returns: WebhookAdapter instance
Example:
from recoagent.channels import WebhookAdapter
# Create webhook adapter
webhook_adapter = WebhookAdapter(
webhook_url="https://api.example.com/webhook",
authentication={"type": "bearer", "token": "your_token"},
retry_attempts=3,
timeout=30
)
# Send webhook payload
payload = {
"message": "Hello from RecoAgent",
"user_id": "user_123",
"timestamp": datetime.utcnow().isoformat()
}
response = await webhook_adapter.send_webhook(payload)
Usage Examples
Multi-Channel Bot Setup
from recoagent.channels import SlackAdapter, TeamsAdapter, TelegramAdapter, WebhookAdapter
import asyncio
# Create multiple channel adapters
adapters = {
"slack": SlackAdapter(
bot_token=os.getenv("SLACK_BOT_TOKEN"),
signing_secret=os.getenv("SLACK_SIGNING_SECRET")
),
"teams": TeamsAdapter(
app_id=os.getenv("TEAMS_APP_ID"),
app_password=os.getenv("TEAMS_APP_PASSWORD"),
tenant_id=os.getenv("AZURE_TENANT_ID")
),
"telegram": TelegramAdapter(
bot_token=os.getenv("TELEGRAM_BOT_TOKEN")
),
"webhook": WebhookAdapter(
webhook_url=os.getenv("WEBHOOK_URL")
)
}
# Send message to all channels
async def broadcast_message(message: str, channels: List[str] = None):
"""Broadcast message to specified channels."""
if channels is None:
channels = list(adapters.keys())
results = {}
for channel in channels:
if channel in adapters:
try:
response = await adapters[channel].send_message(
channel_or_id=get_channel_id(channel),
text=message
)
results[channel] = {"success": True, "response": response}
except Exception as e:
results[channel] = {"success": False, "error": str(e)}
return results
# Broadcast to all channels
results = await broadcast_message("RecoAgent is now online!")
for channel, result in results.items():
if result["success"]:
print(f"✅ {channel}: Message sent successfully")
else:
print(f"❌ {channel}: {result['error']}")
Advanced Slack Integration
from recoagent.channels import SlackAdapter
import asyncio
# Create advanced Slack adapter
slack_adapter = SlackAdapter(
bot_token=os.getenv("SLACK_BOT_TOKEN"),
signing_secret=os.getenv("SLACK_SIGNING_SECRET"),
enable_threads=True,
enable_file_sharing=True,
enable_slash_commands=True
)
# Handle slash command
@slack_adapter.slash_command("/ask")
async def handle_ask_command(command_data):
"""Handle /ask slash command."""
query = command_data.get("text", "")
user_id = command_data.get("user_id")
channel_id = command_data.get("channel_id")
# Process query with RecoAgent
response = await process_query_with_recoagent(query, user_id)
# Send response
await slack_adapter.send_message(
channel=channel_id,
text=response,
thread_ts=command_data.get("ts") # Reply in thread
)
# Send interactive message with buttons
async def send_interactive_message(channel_id: str, query: str):
"""Send message with interactive buttons."""
blocks = [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"Query: *{query}*\n\nHere's your response:"
}
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "👍 Helpful"},
"action_id": "feedback_helpful",
"value": query
},
{
"type": "button",
"text": {"type": "plain_text", "text": "👎 Not Helpful"},
"action_id": "feedback_not_helpful",
"value": query
}
]
}
]
await slack_adapter.send_blocks(
channel=channel_id,
blocks=blocks
)
# Handle button interactions
@slack_adapter.button_action("feedback_helpful")
async def handle_helpful_feedback(button_data):
"""Handle helpful feedback."""
query = button_data.get("value")
user_id = button_data.get("user_id")
# Record positive feedback
await record_feedback(query, user_id, "positive")
# Send confirmation
await slack_adapter.send_ephemeral_message(
channel=button_data.get("channel_id"),
user=user_id,
text="Thank you for your feedback! 👍"
)
Teams Adaptive Cards
from recoagent.channels import TeamsAdapter
import asyncio
# Create Teams adapter
teams_adapter = TeamsAdapter(
app_id=os.getenv("TEAMS_APP_ID"),
app_password=os.getenv("TEAMS_APP_PASSWORD"),
tenant_id=os.getenv("AZURE_TENANT_ID"),
enable_adaptive_cards=True
)
# Create adaptive card for search results
def create_search_results_card(query: str, results: List[Dict]):
"""Create adaptive card for search results."""
card = {
"type": "AdaptiveCard",
"version": "1.3",
"body": [
{
"type": "TextBlock",
"text": f"Search Results for: **{query}**",
"weight": "bolder",
"size": "medium"
},
{
"type": "Container",
"items": []
}
]
}
# Add result items
for i, result in enumerate(results[:5]): # Limit to 5 results
result_item = {
"type": "Container",
"style": "emphasis",
"items": [
{
"type": "TextBlock",
"text": result.get("title", "Untitled"),
"weight": "bolder",
"wrap": True
},
{
"type": "TextBlock",
"text": result.get("snippet", "")[:200] + "...",
"wrap": True
},
{
"type": "ActionSet",
"actions": [
{
"type": "Action.OpenUrl",
"title": "View Full Result",
"url": result.get("url", "#")
}
]
}
]
}
card["body"][1]["items"].append(result_item)
return card
# Send search results
async def send_search_results(conversation_id: str, query: str, results: List[Dict]):
"""Send search results as adaptive card."""
card = create_search_results_card(query, results)
await teams_adapter.send_adaptive_card(
conversation_id=conversation_id,
card=card
)
# Handle Teams message
@teams_adapter.message_handler()
async def handle_teams_message(message_data):
"""Handle incoming Teams message."""
text = message_data.get("text", "")
conversation_id = message_data.get("conversation_id")
# Process with RecoAgent
results = await search_with_recoagent(text)
# Send results as adaptive card
await send_search_results(conversation_id, text, results)
Telegram Bot with Inline Keyboards
from recoagent.channels import TelegramAdapter
import asyncio
# Create Telegram adapter
telegram_adapter = TelegramAdapter(
bot_token=os.getenv("TELEGRAM_BOT_TOKEN"),
enable_inline_keyboards=True,
enable_file_sharing=True
)
# Create inline keyboard for options
def create_options_keyboard(options: List[str], callback_prefix: str = "option"):
"""Create inline keyboard for options."""
keyboard = []
for i, option in enumerate(options):
keyboard.append([{
"text": option,
"callback_data": f"{callback_prefix}_{i}"
}])
return {"inline_keyboard": keyboard}
# Handle callback queries
@telegram_adapter.callback_query_handler()
async def handle_callback_query(callback_data):
"""Handle callback query from inline keyboard."""
query_id = callback_data.get("id")
user_id = callback_data.get("from", {}).get("id")
data = callback_data.get("data")
if data.startswith("option_"):
option_index = int(data.split("_")[1])
# Process selected option
response = await process_option_selection(user_id, option_index)
# Answer callback query
await telegram_adapter.answer_callback_query(
callback_query_id=query_id,
text="Processing your selection..."
)
# Send response
await telegram_adapter.send_message(
chat_id=user_id,
text=response
)
# Send message with options
async def send_message_with_options(chat_id: str, message: str, options: List[str]):
"""Send message with inline keyboard options."""
keyboard = create_options_keyboard(options)
await telegram_adapter.send_message(
chat_id=chat_id,
text=message,
reply_markup=keyboard
)
# Handle file uploads
@telegram_adapter.document_handler()
async def handle_document_upload(document_data):
"""Handle document upload."""
file_id = document_data.get("file_id")
user_id = document_data.get("from", {}).get("id")
# Download file
file_info = await telegram_adapter.get_file(file_id)
file_content = await telegram_adapter.download_file(file_info["file_path"])
# Process file with RecoAgent
result = await process_document_with_recoagent(file_content, user_id)
# Send result
await telegram_adapter.send_message(
chat_id=user_id,
text=f"Document processed:\n{result}"
)
Webhook Integration
from recoagent.channels import WebhookAdapter
import asyncio
# Create webhook adapter
webhook_adapter = WebhookAdapter(
webhook_url="https://api.example.com/webhook",
authentication={"type": "bearer", "token": "your_token"},
retry_attempts=3,
timeout=30
)
# Send structured webhook payload
async def send_webhook_notification(event_type: str, data: Dict):
"""Send webhook notification."""
payload = {
"event_type": event_type,
"timestamp": datetime.utcnow().isoformat(),
"data": data,
"source": "recoagent"
}
try:
response = await webhook_adapter.send_webhook(payload)
print(f"Webhook sent successfully: {response.status_code}")
except Exception as e:
print(f"Webhook failed: {str(e)}")
# Handle different event types
async def handle_recoagent_events():
"""Handle various RecoAgent events."""
events = [
{
"type": "query_processed",
"data": {
"user_id": "user_123",
"query": "What is machine learning?",
"response_time": 1.5,
"confidence": 0.9
}
},
{
"type": "user_feedback",
"data": {
"user_id": "user_123",
"query": "What is AI?",
"rating": 5,
"feedback": "Very helpful"
}
},
{
"type": "system_alert",
"data": {
"alert_type": "high_latency",
"message": "Response time exceeded threshold",
"value": 10.5,
"threshold": 5.0
}
}
]
for event in events:
await send_webhook_notification(event["type"], event["data"])
await asyncio.sleep(1) # Rate limiting
# Run webhook notifications
asyncio.run(handle_recoagent_events())
API Reference
BaseChannelAdapter Methods
send_message(channel_or_id: str, text: str, **kwargs) -> ChannelResponse
Send text message to channel
Parameters:
channel_or_id(str): Channel identifiertext(str): Message text**kwargs: Additional channel-specific parameters
Returns: ChannelResponse with delivery status
send_file(channel_or_id: str, file_path: str, **kwargs) -> ChannelResponse
Send file to channel
Parameters:
channel_or_id(str): Channel identifierfile_path(str): Path to file**kwargs: Additional parameters
Returns: ChannelResponse with delivery status
SlackAdapter Methods
send_blocks(channel: str, blocks: List[Dict]) -> ChannelResponse
Send Slack blocks (rich formatting)
Parameters:
channel(str): Slack channel IDblocks(List[Dict]): Slack block elements
Returns: ChannelResponse
send_ephemeral_message(channel: str, user: str, text: str) -> ChannelResponse
Send ephemeral message (only visible to user)
Parameters:
channel(str): Slack channel IDuser(str): User IDtext(str): Message text
Returns: ChannelResponse
TeamsAdapter Methods
send_adaptive_card(conversation_id: str, card: Dict) -> ChannelResponse
Send adaptive card
Parameters:
conversation_id(str): Teams conversation IDcard(Dict): Adaptive card definition
Returns: ChannelResponse
TelegramAdapter Methods
send_message(chat_id: str, text: str, reply_markup: Dict = None) -> ChannelResponse
Send message with optional keyboard
Parameters:
chat_id(str): Telegram chat IDtext(str): Message textreply_markup(Dict, optional): Inline or reply keyboard
Returns: ChannelResponse
answer_callback_query(callback_query_id: str, text: str) -> ChannelResponse
Answer callback query
Parameters:
callback_query_id(str): Callback query IDtext(str): Answer text
Returns: ChannelResponse
WebhookAdapter Methods
send_webhook(payload: Dict) -> ChannelResponse
Send webhook payload
Parameters:
payload(Dict): Webhook payload
Returns: ChannelResponse
See Also
- Agent Graphs - Agent integration with channels
- Conversational Search Core - Conversational interfaces
- Analytics Core - Channel usage analytics
- Security Core - Channel security