Skip to main content
Glama
bot_manager.py (16.7 kB)
#!/usr/bin/env python3
"""
IRIS Bot Manager
Unified script to manage the IRIS Telegram bot lifecycle
"""

import os
import sys
import time
import signal
import asyncio
import argparse
import subprocess
from pathlib import Path
from datetime import datetime
from typing import List, Optional

# Add project root to path
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

# Default size above which the bot log is rotated (10 MB).
DEFAULT_MAX_LOG_BYTES = 10 * 1024 * 1024


# Colors for terminal output
class Colors:
    """ANSI escape sequences used to colorize terminal output."""

    RED = '\033[0;31m'
    GREEN = '\033[0;32m'
    YELLOW = '\033[1;33m'
    BLUE = '\033[0;34m'
    MAGENTA = '\033[0;35m'
    CYAN = '\033[0;36m'
    BOLD = '\033[1m'
    NC = '\033[0m'  # No Color


def print_header(text: str):
    """Print a formatted header"""
    print(f"\n{'━' * 60}")
    print(f"{Colors.BOLD}{text}{Colors.NC}")
    print(f"{'━' * 60}\n")


def print_success(text: str):
    """Print success message"""
    print(f"{Colors.GREEN}✓ {text}{Colors.NC}")


def print_error(text: str):
    """Print error message"""
    print(f"{Colors.RED}✗ {text}{Colors.NC}")


def print_warning(text: str):
    """Print warning message"""
    print(f"{Colors.YELLOW}⚠️ {text}{Colors.NC}")


def print_info(text: str):
    """Print info message"""
    print(f"{Colors.BLUE}ℹ {text}{Colors.NC}")


class BotManager:
    """Start, stop and inspect the bot process launched from the project root.

    The bot is run as ``python -m src.telegram_bot.bot`` in a new session.
    Its PID is stored in ``.bot.pid`` and its output is appended to
    ``bot.log``, both located in the project root.
    """

    def __init__(self):
        self.project_root = PROJECT_ROOT
        self.bot_log = self.project_root / "bot.log"
        self.bot_pid_file = self.project_root / ".bot.pid"
        self.env_file = self.project_root / ".env"

    def get_bot_pid(self) -> Optional[int]:
        """Get bot PID from file.

        Returns:
            The PID stored in the PID file, or ``None`` when the file is
            missing, unreadable, not an integer, or not a positive number.
        """
        if not self.bot_pid_file.exists():
            return None
        try:
            pid = int(self.bot_pid_file.read_text().strip())
        except (OSError, ValueError):
            return None
        # PIDs <= 0 have special meaning for kill() (process groups / "every
        # process"), so a corrupted PID file must never be handed to os.kill().
        return pid if pid > 0 else None

    def is_bot_running(self, pid: int) -> bool:
        """Check if bot process is running.

        Sends signal 0 to ``pid``. Any ``OSError`` (including a permission
        error for a process owned by someone else) is reported as "not
        running", because such a process could not be managed from here.
        """
        if pid <= 0:
            return False
        try:
            os.kill(pid, 0)
        except OSError:
            return False
        return True

    def check_environment(self) -> bool:
        """Check environment configuration.

        Loads the ``.env`` file and verifies the required variables are set.

        Returns:
            ``True`` when the file exists and every required variable has a
            non-empty value, ``False`` otherwise.
        """
        print_info("Checking environment configuration...")

        if not self.env_file.exists():
            print_error(".env file not found!")
            return False

        print_success(".env file found")

        # Load .env
        from dotenv import load_dotenv
        load_dotenv(self.env_file)

        # Check required variables
        required_vars = [
            "TELEGRAM_BOT_TOKEN",
            "ANTHROPIC_API_KEY",
            "OPENAI_API_KEY",
            "REDIS_HOST",
        ]
        missing = [var for var in required_vars if not os.getenv(var)]

        if missing:
            print_error(f"Missing required environment variables: {', '.join(missing)}")
            return False

        print_success("All required environment variables set")
        return True

    async def check_redis(self) -> bool:
        """Check Redis connection.

        Connects using ``REDIS_HOST`` / ``REDIS_PORT`` / ``REDIS_DB`` (defaults
        ``localhost`` / ``6379`` / ``0``), pings the server and prints its
        version and key count.

        Returns:
            ``True`` when the server answered, ``False`` on any failure.
        """
        print_info("Checking Redis connection...")

        from dotenv import load_dotenv
        load_dotenv(self.env_file)

        redis_host = os.getenv("REDIS_HOST", "localhost")
        redis_port = int(os.getenv("REDIS_PORT", "6379"))
        redis_db = int(os.getenv("REDIS_DB", "0"))

        client = None
        try:
            import redis.asyncio as redis

            client = redis.Redis(
                host=redis_host,
                port=redis_port,
                db=redis_db,
                socket_connect_timeout=3,
            )
            await client.ping()
            info = await client.info("server")
            keys_count = await client.dbsize()
        except Exception as e:
            print_error(f"Cannot connect to Redis at {redis_host}:{redis_port}")
            print(f" Error: {e}")
            return False
        finally:
            # Release the connection on the failure path as well; a problem
            # while closing must not mask the result of the check itself.
            if client is not None:
                try:
                    await client.aclose()
                except Exception:
                    pass

        print_success(f"Redis connected at {redis_host}:{redis_port}/{redis_db}")
        print(f" Version: {info.get('redis_version')}")
        print(f" Keys: {keys_count}")
        return True

    def rotate_log_if_needed(self, max_bytes: int = DEFAULT_MAX_LOG_BYTES):
        """Rotate log file if too large (>10MB by default).

        Args:
            max_bytes: Size threshold in bytes; the log is moved to
                ``<name>.log.old`` only when it is strictly larger.
        """
        if not self.bot_log.exists():
            return

        size = self.bot_log.stat().st_size
        if size > max_bytes:
            print_warning("Log file too large, rotating...")
            old_log = self.bot_log.with_suffix(".log.old")
            # replace() overwrites a previous .old file on every platform.
            self.bot_log.replace(old_log)
            print_success(f"Log rotated to {old_log}")

    def _read_log_lines(self) -> List[str]:
        """Return the log file split into lines, tolerating undecodable bytes."""
        text = self.bot_log.read_text(encoding="utf-8", errors="replace")
        return text.splitlines()

    def start(self, force: bool = False):
        """Start the bot.

        Args:
            force: When the bot is already running, restart it without asking
                for confirmation.

        Exits the interpreter with status 1 when the environment check, the
        Redis check, or the bot startup fails.
        """
        print_header("🚀 IRIS BOT - STARTUP")

        # Check if already running
        pid = self.get_bot_pid()
        if pid and self.is_bot_running(pid):
            print_warning(f"Bot is already running (PID: {pid})")
            if not force:
                response = input("Do you want to restart it? (y/n): ")
                if response.lower() != 'y':
                    print_info("Keeping existing bot process")
                    return

            print_warning("Stopping old bot process...")
            self.stop(quiet=True)
            time.sleep(2)

        # Environment checks
        if not self.check_environment():
            sys.exit(1)

        # Redis check
        if not asyncio.run(self.check_redis()):
            sys.exit(1)

        # Rotate log if needed
        self.rotate_log_if_needed()
        print_success(f"Log file: {self.bot_log}")

        # Start the bot
        print_info("Starting IRIS bot...")

        # The child inherits its own copy of the descriptor, so the parent's
        # handle can be closed as soon as the process has been spawned.
        with open(self.bot_log, 'a') as log_file:
            process = subprocess.Popen(
                [sys.executable, "-m", "src.telegram_bot.bot"],
                cwd=self.project_root,
                stdout=log_file,
                stderr=subprocess.STDOUT,
                start_new_session=True,
            )

        # Save PID
        self.bot_pid_file.write_text(str(process.pid))

        # Wait and check if process is still running
        time.sleep(3)

        # poll() reaps the child. Probing with signal 0 would still succeed
        # for a child that has exited but has not been waited for, which
        # would report a crashed bot as running.
        if process.poll() is None:
            print_success("Bot started successfully!")

            from dotenv import load_dotenv
            load_dotenv(self.env_file)

            print_header("✅ IRIS BOT IS RUNNING")
            print(f"\n{Colors.CYAN}📊 Bot Information:{Colors.NC}")
            print(f" • PID: {process.pid}")
            # Same default port as check_redis(), so an unset REDIS_PORT is
            # not displayed as "None".
            print(f" • Redis: {os.getenv('REDIS_HOST')}:{os.getenv('REDIS_PORT', '6379')}")
            print(f" • Log: {self.bot_log}")
            print(f" • Provider: {os.getenv('LLM_PROVIDER', 'anthropic')}")

            print(f"\n{Colors.CYAN}🛠️ Useful Commands:{Colors.NC}")
            print(" • View logs: python scripts/bot_manager.py logs")
            print(" • Check status: python scripts/bot_manager.py status")
            print(" • Stop bot: python scripts/bot_manager.py stop")
            print()
        else:
            print_error("Bot failed to start!")
            print(f"Check logs: tail -n 50 {self.bot_log}")
            self.bot_pid_file.unlink(missing_ok=True)
            sys.exit(1)

    def _stop_untracked(self, quiet: bool):
        """Find bot processes with ``pgrep`` and terminate them (no PID file)."""
        try:
            result = subprocess.run(
                ["pgrep", "-f", "src.telegram_bot.bot"],
                capture_output=True,
                text=True,
            )
        except FileNotFoundError:
            # pgrep not available, manual check
            if not quiet:
                print_success("No bot processes found")
            return

        if result.returncode != 0:
            if not quiet:
                print_success("No bot processes running")
            return

        pids = result.stdout.split()
        print_warning(f"Found bot processes: {', '.join(pids)}")

        if not quiet:
            response = input("Kill these processes? (y/n): ")
            if response.lower() != 'y':
                return

        for pid_str in pids:
            try:
                os.kill(int(pid_str), signal.SIGTERM)
            except (ValueError, OSError) as exc:
                # Best effort: the process may already be gone or may not be
                # ours to signal. Keep going with the remaining PIDs.
                if not quiet:
                    print_warning(f"Could not signal process {pid_str}: {exc}")
            else:
                print_success(f"Killed process {pid_str}")

    def stop(self, quiet: bool = False):
        """Stop the bot.

        Sends SIGTERM to the PID from the PID file, waits up to 10 seconds,
        then sends SIGKILL if the process is still alive. Without a usable
        PID file, falls back to searching for bot processes with ``pgrep``.

        Args:
            quiet: Suppress the header, progress messages and the
                confirmation prompt.
        """
        if not quiet:
            print_header("🛑 IRIS BOT - STOP")

        pid = self.get_bot_pid()

        if not pid:
            if not quiet:
                print_warning("No PID file found")
                print_info("Checking for running bot processes...")

            # Try to find process manually
            self._stop_untracked(quiet)
            return

        if not quiet:
            print_info(f"Stopping bot (PID: {pid})...")

        if self.is_bot_running(pid):
            # Graceful shutdown
            try:
                os.kill(pid, signal.SIGTERM)
            except OSError:
                # The process exited between the liveness check and the signal.
                pass

            # Wait up to 10 seconds
            for _ in range(10):
                if not self.is_bot_running(pid):
                    break
                time.sleep(1)

            # Force kill if still running
            if self.is_bot_running(pid):
                if not quiet:
                    print_warning("Force killing bot...")
                try:
                    os.kill(pid, signal.SIGKILL)
                except OSError:
                    pass

            if not quiet:
                print_success("Bot stopped successfully")
        else:
            if not quiet:
                print_warning("Process not running (stale PID file)")

        self.bot_pid_file.unlink(missing_ok=True)

    async def status(self):
        """Show bot status.

        Prints the process state, Redis connectivity, which API keys are set,
        log file statistics with the most recent error line, and the active
        configuration values.
        """
        print_header("📊 IRIS BOT - STATUS CHECK")

        # Load environment
        from dotenv import load_dotenv
        load_dotenv(self.env_file)

        # Check bot process
        print(f"{Colors.BLUE}🤖 Bot Process:{Colors.NC}")
        pid = self.get_bot_pid()

        if pid and self.is_bot_running(pid):
            # Get process info
            try:
                uptime = subprocess.check_output(
                    ["ps", "-p", str(pid), "-o", "etime="],
                    text=True,
                ).strip()

                mem = subprocess.check_output(
                    ["ps", "-p", str(pid), "-o", "rss="],
                    text=True,
                ).strip()
                mem_mb = int(mem) / 1024
            except (OSError, subprocess.SubprocessError, ValueError):
                # ps is missing, failed, or printed something unexpected:
                # fall back to reporting the PID only.
                print_success("RUNNING")
                print(f" PID: {pid}")
            else:
                print_success("RUNNING")
                print(f" PID: {pid}")
                print(f" Uptime: {uptime}")
                print(f" Memory: {mem_mb:.1f} MB")
        else:
            print_error("NOT RUNNING")
            if pid:
                self.bot_pid_file.unlink(missing_ok=True)

        print()

        # Check Redis
        print(f"{Colors.BLUE}💾 Redis Connection:{Colors.NC}")
        await self.check_redis()
        print()

        # Check API keys
        print(f"{Colors.BLUE}🔑 API Keys:{Colors.NC}")
        keys = {
            "Telegram": os.getenv("TELEGRAM_BOT_TOKEN"),
            "Anthropic": os.getenv("ANTHROPIC_API_KEY"),
            "OpenAI": os.getenv("OPENAI_API_KEY"),
        }

        for name, key in keys.items():
            if key:
                print(f" {name}: {Colors.GREEN}✓{Colors.NC}")
            else:
                print(f" {name}: {Colors.RED}✗{Colors.NC}")

        print()

        # Log file info
        print(f"{Colors.BLUE}📝 Log File:{Colors.NC}")
        if self.bot_log.exists():
            size = self.bot_log.stat().st_size
            size_mb = size / (1024 * 1024)
            # Read the file once and reuse the lines for both statistics.
            log_lines = self._read_log_lines()
            print(f" Path: {self.bot_log}")
            print(f" Size: {size_mb:.2f} MB ({len(log_lines)} lines)")

            # Check for recent errors
            error_lines = [
                line for line in log_lines
                if 'error' in line.lower() or 'exception' in line.lower()
            ]
            if error_lines:
                last_error = error_lines[-1]
                print(f" {Colors.YELLOW}Last Error:{Colors.NC}")
                print(f" {last_error[:100]}")
        else:
            print(" No log file found")

        print()

        # Configuration
        print(f"{Colors.BLUE}⚙️ Configuration:{Colors.NC}")
        print(f" LLM Provider: {os.getenv('LLM_PROVIDER', 'anthropic')}")
        print(f" Environment: {os.getenv('ENVIRONMENT', 'development')}")
        print(f" Log Level: {os.getenv('LOG_LEVEL', 'INFO')}")
        print()

    def logs(self, mode: str = "follow", lines: int = 50):
        """View bot logs.

        Args:
            mode: ``"follow"`` (run ``tail -f``), ``"last"`` (print the last
                ``lines`` lines), ``"errors"`` (print the last 20 lines that
                mention error/exception/failed) or ``"clear"`` (truncate).
            lines: Number of lines for ``"last"``; zero or a negative value
                prints nothing.

        Exits the interpreter with status 1 when the log file does not exist.
        """
        if not self.bot_log.exists():
            print_error(f"Log file not found: {self.bot_log}")
            sys.exit(1)

        if mode == "follow":
            print_info("Following bot logs (Ctrl+C to stop)...")
            print("━" * 60)
            try:
                subprocess.run(["tail", "-f", str(self.bot_log)])
            except KeyboardInterrupt:
                print("\n" + "━" * 60)
                print_info("Stopped following logs")

        elif mode == "last":
            print_info(f"Last {lines} lines of bot logs:")
            print("━" * 60)
            log_lines = self._read_log_lines()
            # A plain [-lines:] slice would return the whole file for 0.
            tail = log_lines[-lines:] if lines > 0 else []
            for line in tail:
                print(line)

        elif mode == "errors":
            print_info("Errors in bot logs:")
            print("━" * 60)
            keywords = ('error', 'exception', 'failed')
            error_lines = [
                line for line in self._read_log_lines()
                if any(word in line.lower() for word in keywords)
            ]
            for line in error_lines[-20:]:
                print(line)

        elif mode == "clear":
            print_warning("Clearing log file...")
            self.bot_log.write_text("")
            print_success("Log file cleared")


def main():
    """Parse the command line and dispatch to the matching BotManager action."""
    parser = argparse.ArgumentParser(
        description="IRIS Bot Manager - Manage the IRIS Telegram bot lifecycle",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python scripts/bot_manager.py start              # Start the bot
  python scripts/bot_manager.py stop               # Stop the bot
  python scripts/bot_manager.py status             # Check bot status
  python scripts/bot_manager.py logs               # Follow logs in real-time
  python scripts/bot_manager.py logs last -n 100   # Show last 100 lines
  python scripts/bot_manager.py logs errors        # Show only errors
""",
    )

    parser.add_argument(
        "command",
        choices=["start", "stop", "status", "logs", "restart"],
        help="Command to execute",
    )

    parser.add_argument(
        "subcommand",
        nargs="?",
        choices=["follow", "last", "errors", "clear"],
        default="follow",
        help="Logs subcommand (only with 'logs' command)",
    )

    parser.add_argument(
        "-n", "--lines",
        type=int,
        default=50,
        help="Number of lines to show (for 'logs last')",
    )

    parser.add_argument(
        "-f", "--force",
        action="store_true",
        help="Force restart without confirmation",
    )

    args = parser.parse_args()
    manager = BotManager()

    try:
        if args.command == "start":
            manager.start(force=args.force)

        elif args.command == "stop":
            manager.stop()

        elif args.command == "restart":
            print_header("🔄 IRIS BOT - RESTART")
            manager.stop(quiet=True)
            time.sleep(2)
            manager.start(force=True)

        elif args.command == "status":
            asyncio.run(manager.status())

        elif args.command == "logs":
            manager.logs(mode=args.subcommand, lines=args.lines)

    except KeyboardInterrupt:
        print("\n\nInterrupted by user")
        sys.exit(0)
    except Exception as e:
        # Top-level boundary: report any unexpected failure and exit non-zero.
        print_error(f"Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ilvolodel/iris-legacy'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.