Channel Adapters

Channel adapters connect RecoAgent to Slack, Microsoft Teams, Telegram, and generic webhook endpoints for enterprise chatbot deployments.

Core Classes

SlackAdapter

Description: Slack workspace integration adapter

Parameters:

  • bot_token (str): Slack bot token
  • signing_secret (str): Slack signing secret
  • enable_threads (bool): Enable thread support (default: True)
  • enable_file_sharing (bool): Enable file sharing (default: True)
  • enable_slash_commands (bool): Enable slash commands (default: True)

Returns: SlackAdapter instance

Example:

import os

from recoagent.channels import SlackAdapter

# Create Slack adapter (credentials are read from the environment)
slack_adapter = SlackAdapter(
    bot_token=os.getenv("SLACK_BOT_TOKEN"),
    signing_secret=os.getenv("SLACK_SIGNING_SECRET"),
    enable_threads=True,
    enable_file_sharing=True,
    enable_slash_commands=True
)

# Send message (the await must run inside an async function)
response = await slack_adapter.send_message(
    channel="C1234567890",
    text="Hello from RecoAgent!",
    thread_ts="1234567890.123456"  # Optional: reply in thread
)
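
The signing_secret parameter exists so that incoming Slack requests can be authenticated. The page does not say whether SlackAdapter performs this check itself, so the following is only a sketch of Slack's documented v0 request-signing scheme. The function name verify_slack_signature and the now parameter are hypothetical, not part of recoagent.

```python
import hashlib
import hmac
import time
from typing import Optional


def verify_slack_signature(
    signing_secret: str,
    timestamp: str,
    body: bytes,
    signature: str,
    max_age_seconds: int = 300,
    now: Optional[float] = None,
) -> bool:
    """Return True if `signature` matches Slack's v0 signing scheme.

    `timestamp` and `signature` come from the X-Slack-Request-Timestamp
    and X-Slack-Signature request headers; `body` is the raw request body.
    `now` is a hypothetical hook that makes the freshness check testable.
    """
    current = time.time() if now is None else now
    try:
        request_time = int(timestamp)
    except ValueError:
        return False

    # Reject stale requests to limit replay attacks.
    if abs(current - request_time) > max_age_seconds:
        return False

    # Slack signs the string "v0:<timestamp>:<raw body>" with HMAC-SHA256.
    base_string = b"v0:" + timestamp.encode("ascii") + b":" + body
    digest = hmac.new(signing_secret.encode("utf-8"), base_string, hashlib.sha256)
    expected = "v0=" + digest.hexdigest()

    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected, signature)
```

A web framework handler would call this with the raw request body before handing the payload to any slash-command or button handler.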

TeamsAdapter

Description: Microsoft Teams integration adapter

Parameters:

  • app_id (str): Teams app ID
  • app_password (str): Teams app password
  • tenant_id (str): Azure tenant ID
  • enable_adaptive_cards (bool): Enable adaptive cards (default: True)
  • enable_file_attachments (bool): Enable file attachments (default: True)

Returns: TeamsAdapter instance

Example:

import os

from recoagent.channels import TeamsAdapter

# Create Teams adapter
teams_adapter = TeamsAdapter(
    app_id=os.getenv("TEAMS_APP_ID"),
    app_password=os.getenv("TEAMS_APP_PASSWORD"),
    tenant_id=os.getenv("AZURE_TENANT_ID"),
    enable_adaptive_cards=True,
    enable_file_attachments=True
)

# Send adaptive card
card = {
    "type": "AdaptiveCard",
    "version": "1.3",
    "body": [
        {"type": "TextBlock", "text": "RecoAgent Response", "weight": "bolder"},
        {"type": "TextBlock", "text": "Here's your AI-generated response"}
    ]
}

# The await must run inside an async function
response = await teams_adapter.send_adaptive_card(
    conversation_id="conversation_123",
    card=card
)
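
In the Bot Framework, an adaptive card travels as a message attachment with the content type application/vnd.microsoft.card.adaptive. Whether send_adaptive_card builds this envelope internally is not stated here, so the following is a sketch of what the wrapping might look like. The helper name wrap_adaptive_card and its default_version parameter are hypothetical.

```python
from typing import Any, Dict

ADAPTIVE_CARD_CONTENT_TYPE = "application/vnd.microsoft.card.adaptive"
ADAPTIVE_CARD_SCHEMA = "http://adaptivecards.io/schemas/adaptive-card.json"


def wrap_adaptive_card(card: Dict[str, Any], default_version: str = "1.3") -> Dict[str, Any]:
    """Wrap an adaptive card dict in a Bot Framework message activity.

    Hypothetical helper: fills in `version` and `$schema` when missing and
    leaves the caller's dict untouched.
    """
    if card.get("type") != "AdaptiveCard":
        raise ValueError("card['type'] must be 'AdaptiveCard'")

    content = dict(card)  # shallow copy so the input is not mutated
    content.setdefault("version", default_version)
    content.setdefault("$schema", ADAPTIVE_CARD_SCHEMA)

    return {
        "type": "message",
        "attachments": [
            {"contentType": ADAPTIVE_CARD_CONTENT_TYPE, "content": content}
        ],
    }
```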

TelegramAdapter

Description: Telegram bot integration adapter

Parameters:

  • bot_token (str): Telegram bot token
  • webhook_url (str, optional): Webhook URL for updates
  • enable_inline_keyboards (bool): Enable inline keyboards (default: True)
  • enable_file_sharing (bool): Enable file sharing (default: True)

Returns: TelegramAdapter instance

Example:

import os

from recoagent.channels import TelegramAdapter

# Create Telegram adapter
telegram_adapter = TelegramAdapter(
    bot_token=os.getenv("TELEGRAM_BOT_TOKEN"),
    webhook_url="https://your-domain.com/webhook/telegram",
    enable_inline_keyboards=True,
    enable_file_sharing=True
)

# Send message with inline keyboard (one button per row)
keyboard = [
    [{"text": "Option 1", "callback_data": "option1"}],
    [{"text": "Option 2", "callback_data": "option2"}]
]

# The await must run inside an async function
response = await telegram_adapter.send_message(
    chat_id="123456789",
    text="Choose an option:",
    reply_markup={"inline_keyboard": keyboard}
)
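
The Telegram Bot API limits each button's callback_data to between 1 and 64 bytes, so keyboards built from dynamic values are worth validating before sending. The following is a minimal sketch of such a check. The helper name build_inline_keyboard and the prefix scheme are illustrative assumptions, not part of TelegramAdapter.

```python
from typing import Dict, List

MAX_CALLBACK_DATA_BYTES = 64  # limit stated in the Telegram Bot API


def build_inline_keyboard(options: List[str], prefix: str = "option") -> Dict[str, list]:
    """Build a one-button-per-row inline keyboard with validated callback data."""
    rows = []
    for index, label in enumerate(options):
        callback_data = f"{prefix}_{index}"
        # The limit is in bytes, so measure the UTF-8 encoding, not len(str).
        size = len(callback_data.encode("utf-8"))
        if not 1 <= size <= MAX_CALLBACK_DATA_BYTES:
            raise ValueError(
                f"callback_data for {label!r} is {size} bytes; "
                f"Telegram allows 1-{MAX_CALLBACK_DATA_BYTES}"
            )
        rows.append([{"text": label, "callback_data": callback_data}])
    return {"inline_keyboard": rows}
```

The returned dict has the same shape as the reply_markup value passed to send_message above.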

WebhookAdapter

Description: Generic webhook-based integration adapter

Parameters:

  • webhook_url (str): Webhook endpoint URL
  • authentication (Dict, optional): Authentication configuration
  • retry_attempts (int): Number of retry attempts (default: 3)
  • timeout (int): Request timeout in seconds (default: 30)

Returns: WebhookAdapter instance

Example:

from datetime import datetime, timezone

from recoagent.channels import WebhookAdapter

# Create webhook adapter
webhook_adapter = WebhookAdapter(
    webhook_url="https://api.example.com/webhook",
    authentication={"type": "bearer", "token": "your_token"},
    retry_attempts=3,
    timeout=30
)

# Send webhook payload with a timezone-aware UTC timestamp
payload = {
    "message": "Hello from RecoAgent",
    "user_id": "user_123",
    "timestamp": datetime.now(timezone.utc).isoformat()
}

# The await must run inside an async function
response = await webhook_adapter.send_webhook(payload)
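
The retry_attempts and timeout parameters suggest a retry loop with a per-request deadline. How WebhookAdapter implements this is not documented here, so the sketch below shows one common approach: exponential backoff around any async send callable. The name send_with_retry, the base_delay parameter, and the reading of retry_attempts as "retries after the first try" are all assumptions.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def send_with_retry(
    send: Callable[[], Awaitable[T]],
    retry_attempts: int = 3,
    timeout: float = 30.0,
    base_delay: float = 0.5,
) -> T:
    """Call `send()` with a timeout, retrying transient failures.

    Assumes `retry_attempts` counts retries after the first try, so the
    callable runs at most retry_attempts + 1 times.
    """
    last_error: Optional[BaseException] = None
    for attempt in range(retry_attempts + 1):
        try:
            return await asyncio.wait_for(send(), timeout=timeout)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc
            if attempt < retry_attempts:
                # Back off exponentially: base_delay, 2x, 4x, ...
                await asyncio.sleep(base_delay * 2 ** attempt)
    raise RuntimeError(
        f"send failed after {retry_attempts + 1} attempts"
    ) from last_error
```

Only timeouts and connection errors are retried in this sketch; other exceptions propagate immediately, since retrying a rejected payload rarely helps.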

Usage Examples

Multi-Channel Bot Setup

import asyncio
import os
from typing import List, Optional

from recoagent.channels import SlackAdapter, TeamsAdapter, TelegramAdapter, WebhookAdapter

# Create multiple channel adapters
adapters = {
    "slack": SlackAdapter(
        bot_token=os.getenv("SLACK_BOT_TOKEN"),
        signing_secret=os.getenv("SLACK_SIGNING_SECRET")
    ),
    "teams": TeamsAdapter(
        app_id=os.getenv("TEAMS_APP_ID"),
        app_password=os.getenv("TEAMS_APP_PASSWORD"),
        tenant_id=os.getenv("AZURE_TENANT_ID")
    ),
    "telegram": TelegramAdapter(
        bot_token=os.getenv("TELEGRAM_BOT_TOKEN")
    ),
    "webhook": WebhookAdapter(
        webhook_url=os.getenv("WEBHOOK_URL")
    )
}

# Send message to all channels
async def broadcast_message(message: str, channels: Optional[List[str]] = None):
    """Broadcast message to specified channels."""
    if channels is None:
        channels = list(adapters.keys())

    results = {}
    for channel in channels:
        if channel in adapters:
            try:
                # get_channel_id is an application-defined helper that maps an
                # adapter name to its destination ID. The destination is passed
                # positionally because its keyword differs between adapters
                # (channel, chat_id, channel_or_id).
                response = await adapters[channel].send_message(
                    get_channel_id(channel),
                    text=message
                )
                results[channel] = {"success": True, "response": response}
            except Exception as e:
                results[channel] = {"success": False, "error": str(e)}

    return results

# Broadcast to all channels
async def main():
    results = await broadcast_message("RecoAgent is now online!")
    for channel, result in results.items():
        if result["success"]:
            print(f"✅ {channel}: Message sent successfully")
        else:
            print(f"❌ {channel}: {result['error']}")

asyncio.run(main())
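
The broadcast loop above sends to one channel at a time, so a slow channel delays the rest. As a sketch of an alternative, the sends can run concurrently with asyncio.gather while keeping the same per-channel result shape. The function name broadcast_concurrently and the plain async callables standing in for adapters are assumptions made so the example is self-contained.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Stand-in for an adapter's send method: takes the message, returns a response.
Sender = Callable[[str], Awaitable[Any]]


async def broadcast_concurrently(
    senders: Dict[str, Sender], message: str
) -> Dict[str, Dict[str, Any]]:
    """Send `message` through every sender at once and collect the outcomes."""
    names = list(senders)
    # return_exceptions=True keeps one failing channel from cancelling the rest.
    outcomes = await asyncio.gather(
        *(senders[name](message) for name in names),
        return_exceptions=True,
    )

    results: Dict[str, Dict[str, Any]] = {}
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, BaseException):
            results[name] = {"success": False, "error": str(outcome)}
        else:
            results[name] = {"success": True, "response": outcome}
    return results
```

With real adapters, each sender could be a small async wrapper that calls the adapter's send method with the right destination ID.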

Advanced Slack Integration

import os

from recoagent.channels import SlackAdapter

# Create advanced Slack adapter
slack_adapter = SlackAdapter(
    bot_token=os.getenv("SLACK_BOT_TOKEN"),
    signing_secret=os.getenv("SLACK_SIGNING_SECRET"),
    enable_threads=True,
    enable_file_sharing=True,
    enable_slash_commands=True
)

# Handle slash command
@slack_adapter.slash_command("/ask")
async def handle_ask_command(command_data):
    """Handle /ask slash command."""
    query = command_data.get("text", "")
    user_id = command_data.get("user_id")
    channel_id = command_data.get("channel_id")

    # Process query with RecoAgent
    # (process_query_with_recoagent is an application-defined coroutine)
    response = await process_query_with_recoagent(query, user_id)

    # Send response
    await slack_adapter.send_message(
        channel=channel_id,
        text=response,
        # Reply in thread when the payload carries a timestamp; otherwise
        # this is None and the message is posted to the channel.
        thread_ts=command_data.get("ts")
    )

# Send interactive message with buttons
async def send_interactive_message(channel_id: str, query: str):
    """Send message with interactive buttons."""
    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"Query: *{query}*\n\nHere's your response:"
            }
        },
        {
            "type": "actions",
            "elements": [
                {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "👍 Helpful"},
                    "action_id": "feedback_helpful",
                    "value": query
                },
                {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "👎 Not Helpful"},
                    "action_id": "feedback_not_helpful",
                    "value": query
                }
            ]
        }
    ]

    await slack_adapter.send_blocks(
        channel=channel_id,
        blocks=blocks
    )

# Handle button interactions
@slack_adapter.button_action("feedback_helpful")
async def handle_helpful_feedback(button_data):
    """Handle helpful feedback."""
    query = button_data.get("value")
    user_id = button_data.get("user_id")

    # Record positive feedback
    # (record_feedback is an application-defined coroutine)
    await record_feedback(query, user_id, "positive")

    # Send confirmation
    await slack_adapter.send_ephemeral_message(
        channel=button_data.get("channel_id"),
        user=user_id,
        text="Thank you for your feedback! 👍"
    )
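
The slash_command and button_action decorators above register a coroutine under a name and later route incoming payloads to it. The adapter's internals are not shown on this page, so the following is only a sketch of how such a decorator-based registry could work. CommandRegistry and dispatch are hypothetical names; the command key is the field Slack includes in slash-command payloads.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[Dict[str, Any]], Awaitable[Any]]


class CommandRegistry:
    """Maps slash-command names to async handlers (illustrative only)."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def slash_command(self, name: str) -> Callable[[Handler], Handler]:
        """Return a decorator that registers the wrapped coroutine under `name`."""
        def decorator(func: Handler) -> Handler:
            if name in self._handlers:
                raise ValueError(f"handler already registered for {name}")
            self._handlers[name] = func
            return func  # hand back the original so it stays directly callable
        return decorator

    async def dispatch(self, payload: Dict[str, Any]) -> Any:
        """Route a payload to its handler; return None for unknown commands."""
        handler = self._handlers.get(payload.get("command", ""))
        if handler is None:
            return None
        return await handler(payload)
```

Returning the undecorated function from the decorator keeps handlers easy to unit-test, because they can be awaited directly without going through the registry.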

Teams Adaptive Cards

import os
from typing import Dict, List

from recoagent.channels import TeamsAdapter

# Create Teams adapter
teams_adapter = TeamsAdapter(
    app_id=os.getenv("TEAMS_APP_ID"),
    app_password=os.getenv("TEAMS_APP_PASSWORD"),
    tenant_id=os.getenv("AZURE_TENANT_ID"),
    enable_adaptive_cards=True
)

# Create adaptive card for search results
def create_search_results_card(query: str, results: List[Dict]):
    """Create adaptive card for search results."""
    card = {
        "type": "AdaptiveCard",
        "version": "1.3",
        "body": [
            {
                "type": "TextBlock",
                "text": f"Search Results for: **{query}**",
                "weight": "bolder",
                "size": "medium"
            },
            {
                "type": "Container",
                "items": []
            }
        ]
    }

    # Add result items
    for result in results[:5]:  # Limit to 5 results
        # Truncate long snippets; add an ellipsis only when text was cut
        snippet = result.get("snippet", "")
        if len(snippet) > 200:
            snippet = snippet[:200] + "..."

        result_item = {
            "type": "Container",
            "style": "emphasis",
            "items": [
                {
                    "type": "TextBlock",
                    "text": result.get("title", "Untitled"),
                    "weight": "bolder",
                    "wrap": True
                },
                {
                    "type": "TextBlock",
                    "text": snippet,
                    "wrap": True
                },
                {
                    "type": "ActionSet",
                    "actions": [
                        {
                            "type": "Action.OpenUrl",
                            "title": "View Full Result",
                            "url": result.get("url", "#")
                        }
                    ]
                }
            ]
        }
        card["body"][1]["items"].append(result_item)

    return card

# Send search results
async def send_search_results(conversation_id: str, query: str, results: List[Dict]):
    """Send search results as adaptive card."""
    card = create_search_results_card(query, results)

    await teams_adapter.send_adaptive_card(
        conversation_id=conversation_id,
        card=card
    )

# Handle Teams message
@teams_adapter.message_handler()
async def handle_teams_message(message_data):
    """Handle incoming Teams message."""
    text = message_data.get("text", "")
    conversation_id = message_data.get("conversation_id")

    # Process with RecoAgent
    # (search_with_recoagent is an application-defined coroutine)
    results = await search_with_recoagent(text)

    # Send results as adaptive card
    await send_search_results(conversation_id, text, results)

Telegram Bot with Inline Keyboards

import os
from typing import List

from recoagent.channels import TelegramAdapter

# Create Telegram adapter
telegram_adapter = TelegramAdapter(
    bot_token=os.getenv("TELEGRAM_BOT_TOKEN"),
    enable_inline_keyboards=True,
    enable_file_sharing=True
)

# Create inline keyboard for options
def create_options_keyboard(options: List[str], callback_prefix: str = "option"):
    """Create inline keyboard for options."""
    keyboard = []
    for i, option in enumerate(options):
        keyboard.append([{
            "text": option,
            "callback_data": f"{callback_prefix}_{i}"
        }])
    return {"inline_keyboard": keyboard}

# Handle callback queries
@telegram_adapter.callback_query_handler()
async def handle_callback_query(callback_data):
    """Handle callback query from inline keyboard."""
    query_id = callback_data.get("id")
    user_id = callback_data.get("from", {}).get("id")
    data = callback_data.get("data") or ""  # guard against a missing payload

    if data.startswith("option_"):
        option_index = int(data.split("_", 1)[1])

        # Answer callback query first so the client stops its loading state
        await telegram_adapter.answer_callback_query(
            callback_query_id=query_id,
            text="Processing your selection..."
        )

        # Process selected option
        # (process_option_selection is an application-defined coroutine)
        response = await process_option_selection(user_id, option_index)

        # Send response
        await telegram_adapter.send_message(
            chat_id=user_id,
            text=response
        )

# Send message with options
async def send_message_with_options(chat_id: str, message: str, options: List[str]):
    """Send message with inline keyboard options."""
    keyboard = create_options_keyboard(options)

    await telegram_adapter.send_message(
        chat_id=chat_id,
        text=message,
        reply_markup=keyboard
    )

# Handle file uploads
@telegram_adapter.document_handler()
async def handle_document_upload(document_data):
    """Handle document upload."""
    file_id = document_data.get("file_id")
    user_id = document_data.get("from", {}).get("id")

    # Download file
    file_info = await telegram_adapter.get_file(file_id)
    file_content = await telegram_adapter.download_file(file_info["file_path"])

    # Process file with RecoAgent
    # (process_document_with_recoagent is an application-defined coroutine)
    result = await process_document_with_recoagent(file_content, user_id)

    # Send result
    await telegram_adapter.send_message(
        chat_id=user_id,
        text=f"Document processed:\n{result}"
    )

Webhook Integration

import asyncio
from datetime import datetime, timezone
from typing import Dict

from recoagent.channels import WebhookAdapter

# Create webhook adapter
webhook_adapter = WebhookAdapter(
    webhook_url="https://api.example.com/webhook",
    authentication={"type": "bearer", "token": "your_token"},
    retry_attempts=3,
    timeout=30
)

# Send structured webhook payload
async def send_webhook_notification(event_type: str, data: Dict):
    """Send webhook notification."""
    payload = {
        "event_type": event_type,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "data": data,
        "source": "recoagent"
    }

    try:
        response = await webhook_adapter.send_webhook(payload)
        print(f"Webhook sent successfully: {response.status_code}")
    except Exception as e:
        print(f"Webhook failed: {str(e)}")

# Handle different event types
async def handle_recoagent_events():
    """Handle various RecoAgent events."""
    events = [
        {
            "type": "query_processed",
            "data": {
                "user_id": "user_123",
                "query": "What is machine learning?",
                "response_time": 1.5,
                "confidence": 0.9
            }
        },
        {
            "type": "user_feedback",
            "data": {
                "user_id": "user_123",
                "query": "What is AI?",
                "rating": 5,
                "feedback": "Very helpful"
            }
        },
        {
            "type": "system_alert",
            "data": {
                "alert_type": "high_latency",
                "message": "Response time exceeded threshold",
                "value": 10.5,
                "threshold": 5.0
            }
        }
    ]

    for event in events:
        await send_webhook_notification(event["type"], event["data"])
        await asyncio.sleep(1)  # Rate limiting

# Run webhook notifications
asyncio.run(handle_recoagent_events())

API Reference

BaseChannelAdapter Methods

send_message(channel_or_id: str, text: str, **kwargs) -> ChannelResponse

Send text message to channel

Parameters:

  • channel_or_id (str): Channel identifier
  • text (str): Message text
  • **kwargs: Additional channel-specific parameters

Returns: ChannelResponse with delivery status

send_file(channel_or_id: str, file_path: str, **kwargs) -> ChannelResponse

Send file to channel

Parameters:

  • channel_or_id (str): Channel identifier
  • file_path (str): Path to file
  • **kwargs: Additional parameters

Returns: ChannelResponse with delivery status
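
This page does not define ChannelResponse or the base class, so the sketch below uses stand-in types to show how a custom adapter could follow the send_message(channel_or_id, text, **kwargs) contract. ChannelResponseSketch, ChannelAdapterSketch, InMemoryAdapter, and every field name are assumptions rather than the recoagent definitions. An in-memory adapter like this can also serve as a test double.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple


@dataclass
class ChannelResponseSketch:
    """Stand-in for ChannelResponse; the real fields are not documented here."""
    success: bool
    message_id: Optional[str] = None
    error: Optional[str] = None


class ChannelAdapterSketch(ABC):
    """Stand-in for BaseChannelAdapter with the documented send_message shape."""

    @abstractmethod
    async def send_message(
        self, channel_or_id: str, text: str, **kwargs: Any
    ) -> ChannelResponseSketch:
        ...


class InMemoryAdapter(ChannelAdapterSketch):
    """Records messages in a list instead of calling a remote service."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, str]] = []

    async def send_message(
        self, channel_or_id: str, text: str, **kwargs: Any
    ) -> ChannelResponseSketch:
        if not text:
            # Report the failure in the response rather than raising.
            return ChannelResponseSketch(success=False, error="empty message")
        self.sent.append((channel_or_id, text))
        return ChannelResponseSketch(success=True, message_id=str(len(self.sent)))
```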

SlackAdapter Methods

send_blocks(channel: str, blocks: List[Dict]) -> ChannelResponse

Send Slack blocks (rich formatting)

Parameters:

  • channel (str): Slack channel ID
  • blocks (List[Dict]): Slack block elements

Returns: ChannelResponse

send_ephemeral_message(channel: str, user: str, text: str) -> ChannelResponse

Send ephemeral message (only visible to user)

Parameters:

  • channel (str): Slack channel ID
  • user (str): User ID
  • text (str): Message text

Returns: ChannelResponse

TeamsAdapter Methods

send_adaptive_card(conversation_id: str, card: Dict) -> ChannelResponse

Send adaptive card

Parameters:

  • conversation_id (str): Teams conversation ID
  • card (Dict): Adaptive card definition

Returns: ChannelResponse

TelegramAdapter Methods

send_message(chat_id: str, text: str, reply_markup: Optional[Dict] = None) -> ChannelResponse

Send message with optional keyboard

Parameters:

  • chat_id (str): Telegram chat ID
  • text (str): Message text
  • reply_markup (Dict, optional): Inline or reply keyboard

Returns: ChannelResponse

answer_callback_query(callback_query_id: str, text: str) -> ChannelResponse

Answer callback query

Parameters:

  • callback_query_id (str): Callback query ID
  • text (str): Answer text

Returns: ChannelResponse

WebhookAdapter Methods

send_webhook(payload: Dict) -> ChannelResponse

Send webhook payload

Parameters:

  • payload (Dict): Webhook payload

Returns: ChannelResponse

See Also