#!/usr/bin/env python3
"""
IRIS Bot Manager
Unified script to manage the IRIS Telegram bot lifecycle
"""
import os
import sys
import time
import signal
import asyncio
import argparse
from pathlib import Path
from datetime import datetime
from typing import Optional
# Treat the parent of this script's directory as the project root and put it
# first on sys.path (the usage examples invoke this file as
# ``scripts/bot_manager.py``, i.e. one level below the root).
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
# Colors for terminal output
class Colors:
    """ANSI escape sequences used to colour terminal output."""

    # Foreground colours
    RED = "\033[0;31m"
    GREEN = "\033[0;32m"
    YELLOW = "\033[1;33m"
    BLUE = "\033[0;34m"
    MAGENTA = "\033[0;35m"
    CYAN = "\033[0;36m"

    # Text attributes
    BOLD = "\033[1m"
    NC = "\033[0m"  # No Color: resets every attribute set above
def print_header(text: str):
    """Print *text* in bold between two 60-character horizontal rules.

    A blank line is emitted before the top rule and after the bottom rule.
    """
    rule = '━' * 60
    title = f"{Colors.BOLD}{text}{Colors.NC}"
    print(f"\n{rule}")
    print(title)
    print(f"{rule}\n")
def print_success(text: str):
    """Print *text* in green, prefixed with a check mark."""
    message = f"{Colors.GREEN}✓ {text}{Colors.NC}"
    print(message)
def print_error(text: str):
    """Print *text* in red, prefixed with a cross mark."""
    message = f"{Colors.RED}✗ {text}{Colors.NC}"
    print(message)
def print_warning(text: str):
    """Print *text* in yellow, prefixed with a warning sign."""
    message = f"{Colors.YELLOW}⚠️ {text}{Colors.NC}"
    print(message)
def print_info(text: str):
    """Print *text* in blue, prefixed with an information sign."""
    message = f"{Colors.BLUE}ℹ {text}{Colors.NC}"
    print(message)
class BotManager:
    """Start, stop and inspect the IRIS Telegram bot process.

    The bot is launched as ``python -m src.telegram_bot.bot`` in its own
    session. Its PID is stored in ``.bot.pid`` and its output is appended to
    ``bot.log``, both located in the project root.
    """

    # Module run with ``python -m``; also the pattern handed to ``pgrep -f``.
    BOT_MODULE = "src.telegram_bot.bot"
    # Log size above which ``rotate_log_if_needed`` rotates the file (10MB).
    MAX_LOG_BYTES = 10 * 1024 * 1024

    def __init__(self):
        """Resolve the log, PID and ``.env`` paths under the project root."""
        self.project_root = PROJECT_ROOT
        self.bot_log = self.project_root / "bot.log"
        self.bot_pid_file = self.project_root / ".bot.pid"
        self.env_file = self.project_root / ".env"

    def get_bot_pid(self) -> Optional[int]:
        """Return the PID stored in the PID file.

        Returns:
            The PID as a positive integer, or ``None`` when the file is
            missing, unreadable, not an integer, or not a positive number.
        """
        try:
            pid = int(self.bot_pid_file.read_text().strip())
        except (OSError, ValueError):
            return None
        # Zero and negative values address process groups in kill(2);
        # passing them on to os.kill() could signal unrelated processes.
        return pid if pid > 0 else None

    def is_bot_running(self, pid: int) -> bool:
        """Return ``True`` when signal 0 can be delivered to *pid*.

        Non-positive PIDs are rejected up front because they do not denote a
        single process.
        """
        if pid <= 0:
            return False
        try:
            os.kill(pid, 0)
        except (OSError, OverflowError):
            return False
        return True

    def check_environment(self) -> bool:
        """Verify that ``.env`` exists and defines every required variable.

        Loads ``.env`` into the process environment as a side effect.

        Returns:
            ``True`` when the file exists and all required variables are
            non-empty, ``False`` otherwise (the reason is printed).
        """
        print_info("Checking environment configuration...")
        if not self.env_file.exists():
            print_error(".env file not found!")
            return False
        print_success(".env file found")

        from dotenv import load_dotenv
        load_dotenv(self.env_file)

        required_vars = [
            "TELEGRAM_BOT_TOKEN",
            "ANTHROPIC_API_KEY",
            "OPENAI_API_KEY",
            "REDIS_HOST",
        ]
        missing = [var for var in required_vars if not os.getenv(var)]
        if missing:
            print_error(f"Missing required environment variables: {', '.join(missing)}")
            return False
        print_success("All required environment variables set")
        return True

    async def check_redis(self) -> bool:
        """Ping the Redis server configured through the environment.

        Reads ``REDIS_HOST`` (default ``localhost``), ``REDIS_PORT`` (default
        ``6379``) and ``REDIS_DB`` (default ``0``) after loading ``.env``.

        Returns:
            ``True`` when the server answers; ``False`` when the settings are
            invalid or the connection fails (the reason is printed).
        """
        print_info("Checking Redis connection...")
        from dotenv import load_dotenv
        load_dotenv(self.env_file)

        redis_host = os.getenv("REDIS_HOST", "localhost")
        try:
            redis_port = int(os.getenv("REDIS_PORT", "6379"))
            redis_db = int(os.getenv("REDIS_DB", "0"))
        except ValueError as exc:
            print_error("REDIS_PORT and REDIS_DB must be integers")
            print(f" Error: {exc}")
            return False

        # Broad catch on purpose: a missing redis package and every kind of
        # connection failure are all reported the same way to the operator.
        try:
            import redis.asyncio as redis
            client = redis.Redis(
                host=redis_host,
                port=redis_port,
                db=redis_db,
                socket_connect_timeout=3,
            )
            try:
                await client.ping()
                info = await client.info("server")
                keys_count = await client.dbsize()
            finally:
                # Release the connection even when one of the calls fails.
                await client.aclose()
        except Exception as e:
            print_error(f"Cannot connect to Redis at {redis_host}:{redis_port}")
            print(f" Error: {e}")
            return False

        print_success(f"Redis connected at {redis_host}:{redis_port}/{redis_db}")
        print(f" Version: {info.get('redis_version')}")
        print(f" Keys: {keys_count}")
        return True

    def rotate_log_if_needed(self):
        """Move the log to ``<name>.log.old`` when it is larger than 10MB.

        Does nothing when the log file does not exist or is small enough. A
        previously rotated file is overwritten.
        """
        if not self.bot_log.exists():
            return
        if self.bot_log.stat().st_size <= self.MAX_LOG_BYTES:
            return
        print_warning("Log file too large, rotating...")
        old_log = self.bot_log.with_suffix(".log.old")
        # replace() overwrites an existing target on every platform, whereas
        # rename() fails on some when the target already exists.
        self.bot_log.replace(old_log)
        print_success(f"Log rotated to {old_log}")

    def start(self, force: bool = False):
        """Start the bot as a detached background process.

        Args:
            force: When the bot is already running, restart it without
                asking for confirmation.

        Exits the interpreter with status 1 when the environment or Redis
        check fails, or when the bot terminates within three seconds of
        being launched.
        """
        print_header("🚀 IRIS BOT - STARTUP")

        pid = self.get_bot_pid()
        if pid and self.is_bot_running(pid):
            print_warning(f"Bot is already running (PID: {pid})")
            if not force:
                response = input("Do you want to restart it? (y/n): ")
                if response.lower() != 'y':
                    print_info("Keeping existing bot process")
                    return
            print_warning("Stopping old bot process...")
            self.stop(quiet=True)
            time.sleep(2)

        if not self.check_environment():
            sys.exit(1)
        if not asyncio.run(self.check_redis()):
            sys.exit(1)

        self.rotate_log_if_needed()
        print_success(f"Log file: {self.bot_log}")

        print_info("Starting IRIS bot...")
        import subprocess
        # The child inherits its own copy of the descriptor, so the parent's
        # handle can be closed as soon as the process has been spawned.
        with open(self.bot_log, 'a') as log_file:
            process = subprocess.Popen(
                [sys.executable, "-m", self.BOT_MODULE],
                cwd=self.project_root,
                stdout=log_file,
                stderr=subprocess.STDOUT,
                start_new_session=True,
            )
        self.bot_pid_file.write_text(str(process.pid))

        # Give the bot a moment to fail fast. poll() is used rather than
        # signal 0 because an exited but un-reaped child still accepts
        # signal 0 and would be mistaken for a running bot.
        time.sleep(3)
        if process.poll() is not None:
            print_error("Bot failed to start!")
            print(f"Check logs: tail -n 50 {self.bot_log}")
            self.bot_pid_file.unlink(missing_ok=True)
            sys.exit(1)

        print_success("Bot started successfully!")
        print_header("✅ IRIS BOT IS RUNNING")
        print(f"\n{Colors.CYAN}📊 Bot Information:{Colors.NC}")
        print(f" • PID: {process.pid}")
        # Same default port as check_redis(), so an unset REDIS_PORT is not
        # displayed as "None".
        print(f" • Redis: {os.getenv('REDIS_HOST')}:{os.getenv('REDIS_PORT', '6379')}")
        print(f" • Log: {self.bot_log}")
        print(f" • Provider: {os.getenv('LLM_PROVIDER', 'anthropic')}")
        print(f"\n{Colors.CYAN}🛠️ Useful Commands:{Colors.NC}")
        print(" • View logs: python scripts/bot_manager.py logs")
        print(" • Check status: python scripts/bot_manager.py status")
        print(" • Stop bot: python scripts/bot_manager.py stop")
        print()

    def stop(self, quiet: bool = False):
        """Stop the bot, first with SIGTERM and then with SIGKILL.

        Args:
            quiet: Suppress informational output and confirmation prompts.

        When no usable PID file exists, falls back to looking for bot
        processes with ``pgrep``. Otherwise the recorded process gets up to
        ten seconds to exit after SIGTERM before SIGKILL is sent, and the
        PID file is removed afterwards.
        """
        if not quiet:
            print_header("🛑 IRIS BOT - STOP")

        pid = self.get_bot_pid()
        if not pid:
            self._stop_untracked_processes(quiet)
            return

        if not quiet:
            print_info(f"Stopping bot (PID: {pid})...")

        if self.is_bot_running(pid):
            self._send_signal(pid, signal.SIGTERM, quiet)
            # Wait up to 10 seconds for a graceful shutdown.
            for _ in range(10):
                if not self.is_bot_running(pid):
                    break
                time.sleep(1)
            if self.is_bot_running(pid):
                if not quiet:
                    print_warning("Force killing bot...")
                self._send_signal(pid, signal.SIGKILL, quiet)
            if not quiet:
                print_success("Bot stopped successfully")
        elif not quiet:
            print_warning("Process not running (stale PID file)")

        self.bot_pid_file.unlink(missing_ok=True)

    def _send_signal(self, pid: int, sig: int, quiet: bool) -> None:
        """Send *sig* to *pid*, reporting a failure instead of raising."""
        try:
            os.kill(pid, sig)
        except ProcessLookupError:
            # The process exited on its own in the meantime; nothing to do.
            pass
        except OSError as exc:
            if not quiet:
                print_warning(f"Could not signal process {pid}: {exc}")

    def _stop_untracked_processes(self, quiet: bool) -> None:
        """Find bot processes with ``pgrep`` and terminate them with SIGTERM.

        Asks for confirmation first unless *quiet* is set.
        """
        if not quiet:
            print_warning("No PID file found")
            print_info("Checking for running bot processes...")

        import subprocess
        try:
            result = subprocess.run(
                ["pgrep", "-f", self.BOT_MODULE],
                capture_output=True,
                text=True,
            )
        except FileNotFoundError:
            # pgrep is not installed, so there is nothing to search with.
            if not quiet:
                print_success("No bot processes found")
            return

        if result.returncode != 0:
            if not quiet:
                print_success("No bot processes running")
            return

        pids = result.stdout.split()
        print_warning(f"Found bot processes: {', '.join(pids)}")
        if not quiet:
            response = input("Kill these processes? (y/n): ")
            if response.lower() != 'y':
                return
        for pid_str in pids:
            try:
                os.kill(int(pid_str), signal.SIGTERM)
            except (ValueError, OSError) as exc:
                print_warning(f"Could not kill process {pid_str}: {exc}")
            else:
                print_success(f"Killed process {pid_str}")

    async def status(self):
        """Print a report on the process, Redis, API keys, log and config."""
        print_header("📊 IRIS BOT - STATUS CHECK")

        from dotenv import load_dotenv
        load_dotenv(self.env_file)

        # Bot process
        print(f"{Colors.BLUE}🤖 Bot Process:{Colors.NC}")
        pid = self.get_bot_pid()
        if pid and self.is_bot_running(pid):
            print_success("RUNNING")
            print(f" PID: {pid}")
            import subprocess
            try:
                uptime = subprocess.check_output(
                    ["ps", "-p", str(pid), "-o", "etime="],
                    text=True,
                ).strip()
                mem = subprocess.check_output(
                    ["ps", "-p", str(pid), "-o", "rss="],
                    text=True,
                ).strip()
                # ps reports rss in kilobytes.
                mem_mb = int(mem) / 1024
            except (OSError, subprocess.SubprocessError, ValueError):
                # ps is unavailable or its output is unusable; the PID alone
                # has already been shown.
                pass
            else:
                print(f" Uptime: {uptime}")
                print(f" Memory: {mem_mb:.1f} MB")
        else:
            print_error("NOT RUNNING")
            if pid:
                # The recorded process is gone, so the PID file is stale.
                self.bot_pid_file.unlink(missing_ok=True)
        print()

        # Redis
        print(f"{Colors.BLUE}💾 Redis Connection:{Colors.NC}")
        await self.check_redis()
        print()

        # API keys: only presence is reported, never the values.
        print(f"{Colors.BLUE}🔑 API Keys:{Colors.NC}")
        keys = {
            "Telegram": os.getenv("TELEGRAM_BOT_TOKEN"),
            "Anthropic": os.getenv("ANTHROPIC_API_KEY"),
            "OpenAI": os.getenv("OPENAI_API_KEY"),
        }
        for name, key in keys.items():
            if key:
                print(f" {name}: {Colors.GREEN}✓{Colors.NC}")
            else:
                print(f" {name}: {Colors.RED}✗{Colors.NC}")
        print()

        # Log file
        print(f"{Colors.BLUE}📝 Log File:{Colors.NC}")
        if self.bot_log.exists():
            size_mb = self.bot_log.stat().st_size / (1024 * 1024)
            # A single streaming pass counts the lines and remembers the most
            # recent error, so the log is never held in memory as a whole.
            line_count = 0
            last_error = None
            for line in self._iter_log_lines():
                line_count += 1
                lowered = line.lower()
                if 'error' in lowered or 'exception' in lowered:
                    last_error = line
            print(f" Path: {self.bot_log}")
            print(f" Size: {size_mb:.2f} MB ({line_count} lines)")
            if last_error is not None:
                print(f" {Colors.YELLOW}Last Error:{Colors.NC}")
                print(f" {last_error[:100]}")
        else:
            print(" No log file found")
        print()

        # Configuration
        print(f"{Colors.BLUE}⚙️ Configuration:{Colors.NC}")
        print(f" LLM Provider: {os.getenv('LLM_PROVIDER', 'anthropic')}")
        print(f" Environment: {os.getenv('ENVIRONMENT', 'development')}")
        print(f" Log Level: {os.getenv('LOG_LEVEL', 'INFO')}")
        print()

    def _iter_log_lines(self):
        """Yield the log's lines without their trailing line break.

        Undecodable bytes are replaced rather than raising, because the log
        holds whatever the bot process wrote to stdout and stderr.
        """
        with open(self.bot_log, errors="replace") as handle:
            for line in handle:
                yield line.rstrip("\r\n")

    def logs(self, mode: str = "follow", lines: int = 50):
        """Show or clear the bot log.

        Args:
            mode: ``"follow"`` streams the log with ``tail -f``; ``"last"``
                prints the final *lines* lines; ``"errors"`` prints the last
                20 lines containing "error", "exception" or "failed"
                (case-insensitive); ``"clear"`` truncates the file. Any
                other value does nothing.
            lines: Number of lines printed by ``"last"``. Zero or a negative
                number prints nothing.

        Exits the interpreter with status 1 when the log file is missing.
        """
        if not self.bot_log.exists():
            print_error(f"Log file not found: {self.bot_log}")
            sys.exit(1)

        from collections import deque

        if mode == "follow":
            print_info("Following bot logs (Ctrl+C to stop)...")
            print("━" * 60)
            import subprocess
            try:
                subprocess.run(["tail", "-f", str(self.bot_log)])
            except KeyboardInterrupt:
                print("\n" + "━" * 60)
                print_info("Stopped following logs")
        elif mode == "last":
            print_info(f"Last {lines} lines of bot logs:")
            print("━" * 60)
            # A bounded deque keeps only the tail. Slicing with [-lines:]
            # would return the whole file for lines == 0.
            for line in deque(self._iter_log_lines(), maxlen=max(lines, 0)):
                print(line)
        elif mode == "errors":
            print_info("Errors in bot logs:")
            print("━" * 60)
            keywords = ('error', 'exception', 'failed')
            matches = (
                line for line in self._iter_log_lines()
                if any(word in line.lower() for word in keywords)
            )
            for line in deque(matches, maxlen=20):
                print(line)
        elif mode == "clear":
            print_warning("Clearing log file...")
            self.bot_log.write_text("")
            print_success("Log file cleared")
def main():
    """Parse the command line and run the requested bot-manager command.

    Commands: ``start``, ``stop``, ``restart``, ``status`` and ``logs``. The
    optional second positional selects the ``logs`` mode and is ignored by
    the other commands.

    Exits with status 0 on Ctrl+C and status 1 on any other exception.
    """
    # NOTE: the "logs last" example uses -n because the parser defines no
    # positional for the line count; "logs last 100" is rejected by argparse.
    parser = argparse.ArgumentParser(
        description="IRIS Bot Manager - Manage the IRIS Telegram bot lifecycle",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python scripts/bot_manager.py start # Start the bot
python scripts/bot_manager.py stop # Stop the bot
python scripts/bot_manager.py status # Check bot status
python scripts/bot_manager.py logs # Follow logs in real-time
python scripts/bot_manager.py logs last -n 100 # Show last 100 lines
python scripts/bot_manager.py logs errors # Show only errors
"""
    )
    parser.add_argument(
        "command",
        choices=["start", "stop", "status", "logs", "restart"],
        help="Command to execute"
    )
    parser.add_argument(
        "subcommand",
        nargs="?",
        choices=["follow", "last", "errors", "clear"],
        default="follow",
        help="Logs subcommand (only with 'logs' command)"
    )
    parser.add_argument(
        "-n", "--lines",
        type=int,
        default=50,
        help="Number of lines to show (for 'logs last')"
    )
    parser.add_argument(
        "-f", "--force",
        action="store_true",
        help="Force restart without confirmation"
    )
    args = parser.parse_args()

    manager = BotManager()
    try:
        if args.command == "start":
            manager.start(force=args.force)
        elif args.command == "stop":
            manager.stop()
        elif args.command == "restart":
            print_header("🔄 IRIS BOT - RESTART")
            manager.stop(quiet=True)
            # Short pause between stopping the old process and starting anew.
            time.sleep(2)
            manager.start(force=True)
        elif args.command == "status":
            asyncio.run(manager.status())
        elif args.command == "logs":
            manager.logs(mode=args.subcommand, lines=args.lines)
    except KeyboardInterrupt:
        print("\n\nInterrupted by user")
        sys.exit(0)
    except Exception as e:
        # Top-level boundary: report any failure as a one-line error and a
        # non-zero exit status rather than a traceback.
        print_error(f"Error: {e}")
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()