bot.pyā¢9.47 kB
#!/usr/bin/env python3
"""
Simple Telegram Bot for Agno Agent Integration
Forwards user messages to the expense tracker agent and returns responses
"""
import os
import asyncio
import logging
import requests
from typing import Optional
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Configuration
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
AGNO_API_URL = os.getenv("AGNO_API_URL", "http://localhost:7777")
AGENT_ID = os.getenv("AGENT_ID", "expense-tracker-agent")
DEFAULT_USER_ID = os.getenv("DEFAULT_USER_ID", "MUHAMMAD")
# Setup logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
class AgnoAgentClient:
"""Client for communicating with Agno Agent API"""
def __init__(self, api_url: str, agent_id: str):
self.api_url = api_url.rstrip('/')
self.agent_id = agent_id
def send_message(self, message: str, user_id: str, session_id: Optional[str] = None) -> dict:
"""Send message to Agno agent and get response"""
endpoint = f"{self.api_url}/agents/{self.agent_id}/runs"
# Prepare form data
data = {
'message': message,
'stream': 'false',
'user_id': user_id,
}
# Add session_id if provided
if session_id:
data['session_id'] = session_id
else:
data['session_id'] = f"telegram_{user_id}"
try:
logger.info(f"Sending request to {endpoint}")
logger.info(f"Data: {data}")
response = requests.post(
endpoint,
data=data,
timeout=30
)
logger.info(f"Response status: {response.status_code}")
if response.status_code == 200:
result = response.json()
logger.info(f"Response received: {result.get('status', 'unknown')}")
return result
else:
logger.error(f"API error: {response.status_code} - {response.text}")
return {
"error": f"API returned status {response.status_code}",
"details": response.text
}
except requests.exceptions.RequestException as e:
logger.error(f"Request failed: {e}")
return {
"error": "Failed to connect to Agno agent",
"details": str(e)
}
# Initialize Agno client
agno_client = AgnoAgentClient(AGNO_API_URL, AGENT_ID)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /start command"""
welcome_message = """
š¤ **Expense Tracker Bot**
Hi! I'm your personal expense tracking assistant. I can help you:
⢠š° Track your income and expenses
⢠š Get spending summaries and analytics
⢠š Search through your transactions
⢠š View category breakdowns
Just send me a message like:
- "What was my last expense?"
- "Show me my spending summary"
- "Add a $50 grocery expense"
- "How much did I spend on food this month?"
Let's get started! š
"""
await update.message.reply_text(
welcome_message,
parse_mode='Markdown'
)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /help command"""
help_text = """
š **Help - Expense Tracker Bot**
**Available Commands:**
⢠`/start` - Welcome message and introduction
⢠`/help` - Show this help message
⢠`/status` - Check bot and agent status
**Example Messages:**
⢠"What's my balance?"
⢠"Show last 5 transactions"
⢠"Add $25 coffee expense"
⢠"How much did I spend on transportation?"
⢠"Delete my last transaction"
⢠"Show spending by category"
**Tips:**
⢠Be specific with your requests
⢠Include amounts with currency symbols ($25, ā¬50)
⢠Mention categories (food, transport, entertainment)
⢠Use natural language - I understand context!
Need more help? Just ask me anything! š¬
"""
await update.message.reply_text(
help_text,
parse_mode='Markdown'
)
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /status command - check if agent is accessible"""
try:
# Try to get agent health status
health_url = f"{AGNO_API_URL}/health"
response = requests.get(health_url, timeout=5)
if response.status_code == 200:
status_message = """
ā
**Status: All Systems Operational**
š¤ Bot: Online
š Agno Agent: Connected
š Expense Tracker: Ready
You can start asking questions about your expenses!
"""
else:
status_message = """
ā ļø **Status: Partial Service**
š¤ Bot: Online
š Agno Agent: Connection issues
š Expense Tracker: Unknown
Please try again later or contact support.
"""
except Exception as e:
status_message = f"""
ā **Status: Service Unavailable**
š¤ Bot: Online
š Agno Agent: Offline
š Expense Tracker: Unavailable
Error: {str(e)[:100]}...
"""
await update.message.reply_text(
status_message,
parse_mode='Markdown'
)
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle regular messages by forwarding to Agno agent"""
user_message = update.message.text
user_id = str(update.effective_user.id)
username = update.effective_user.username or update.effective_user.first_name or "User"
logger.info(f"Received message from {username} ({user_id}): {user_message}")
# Show typing indicator
await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing")
try:
# Send message to Agno agent
response = agno_client.send_message(
message=user_message,
user_id=f"{DEFAULT_USER_ID}_{user_id}", # Combine default user with telegram user ID
session_id=f"telegram_{user_id}"
)
if "error" in response:
# Handle API errors
error_message = f"ā **Error:** {response['error']}\n\n"
if "details" in response:
error_message += f"Details: {response['details'][:200]}..."
await update.message.reply_text(
error_message,
parse_mode='Markdown'
)
else:
# Extract and send the agent's response
agent_response = response.get('content', 'No response from agent')
# Add some metadata if available
if 'metrics' in response:
metrics = response['metrics']
duration = metrics.get('duration', 0)
tokens = metrics.get('total_tokens', 0)
# Add a small footer with metrics (optional)
if duration > 0:
agent_response += f"\n\n_Response time: {duration:.1f}s"
if tokens > 0:
agent_response += f" ⢠Tokens: {tokens}_"
# Send the response
await update.message.reply_text(
agent_response,
parse_mode='Markdown'
)
logger.info(f"Response sent to {username}")
except Exception as e:
logger.error(f"Error handling message: {e}")
await update.message.reply_text(
f"ā **Sorry, something went wrong!**\n\nError: {str(e)[:100]}...\n\nPlease try again later.",
parse_mode='Markdown'
)
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle errors"""
logger.error(f"Exception while handling an update: {context.error}")
def main():
"""Start the bot"""
if not TELEGRAM_BOT_TOKEN:
logger.error("TELEGRAM_BOT_TOKEN environment variable is required!")
print("ā Error: Please set TELEGRAM_BOT_TOKEN environment variable")
print(" Get your token from @BotFather on Telegram")
return
logger.info("Starting Expense Tracker Telegram Bot...")
logger.info(f"Agno API URL: {AGNO_API_URL}")
logger.info(f"Agent ID: {AGENT_ID}")
# Create application
application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
# Add handlers
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("status", status))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
# Add error handler
application.add_error_handler(error_handler)
# Start the bot
print("š¤ Expense Tracker Telegram Bot is starting...")
print(f"š Connected to Agno Agent at: {AGNO_API_URL}")
print(f"š Using agent: {AGENT_ID}")
print("š Bot is ready! Send /start to begin.")
print("š Press Ctrl+C to stop")
# Run the bot
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == '__main__':
main()