run_scheduled_tasks.py•5.86 kB
#!/usr/bin/env python3
"""
Scheduled Task Runner - Automatically execute eligible TimeLooker tasks
This script checks all active tasks and runs those that are eligible based on their frequency.
Use with cron or as a background service for automated task execution.
Usage:
# Run once (check all tasks, execute eligible ones)
python run_scheduled_tasks.py
# Cron example (every 5 minutes):
*/5 * * * * cd /path/to/MCP_Server && python run_scheduled_tasks.py
# Background service mode (check every minute):
python run_scheduled_tasks.py --daemon --interval 60
"""
import argparse
import json
import logging
import sys
import time
from datetime import datetime
# Import our Lambda architecture components
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from scripts.lambda_function import handle_execute_search
from src.api.task_manager_client import TaskManager
from src.core.models import db_manager
from src.utils.logging_config import setup_logging
# Configure logging: module-level logger obtained from the project's
# setup_logging helper and shared by every function in this script.
logger = setup_logging(__name__)
def _short_description(task, limit=50):
    """Return the task's description shortened to *limit* characters for logging.

    A missing or ``None`` description yields an empty string, so a purely
    cosmetic field can never prevent a task from being executed. The ellipsis
    is appended only when text was actually cut off.
    """
    try:
        text = task['task_description']
    except KeyError:
        text = None
    text = "" if text is None else str(text)
    return text if len(text) <= limit else text[:limit] + "..."


def _decode_body(result):
    """Return the handler response body as a dict, parsing it if it is a JSON string."""
    body = result.get('body', '{}')
    if isinstance(body, str):
        body = json.loads(body)
    return body


def run_eligible_tasks():
    """
    Check all active tasks and execute those that are eligible to run.

    Tasks are processed independently: any exception raised while inspecting
    or executing a single task is logged, counted as an error, and the loop
    moves on to the next task.

    Returns:
        dict: A summary with the keys ``total_tasks``, ``executed``,
        ``skipped``, ``errors`` and ``timestamp`` (``datetime.now()`` in ISO
        format). If the run as a whole fails (database setup or task listing
        raises), ``{"error": <message>}`` is returned instead.
    """
    try:
        # Initialize database
        db_manager.create_tables()
        task_manager = TaskManager()

        # Get all active tasks
        active_tasks = task_manager.get_active_tasks()
        if not active_tasks:
            logger.info("No active tasks found")
            # Same shape as the regular summary so callers can rely on the keys.
            return {
                "total_tasks": 0,
                "executed": 0,
                "skipped": 0,
                "errors": 0,
                "timestamp": datetime.now().isoformat(),
            }

        logger.info(f"Checking {len(active_tasks)} active tasks")

        executed = 0
        skipped = 0
        errors = 0

        for task in active_tasks:
            # Pre-bind so the error log below is valid even if the id lookup fails.
            task_id = None
            try:
                task_id = task['id']

                # Check if task should run
                if not task_manager.should_task_run(task_id):
                    logger.debug(f"Task {task_id} not ready to run (frequency not met)")
                    skipped += 1
                    continue

                logger.info(f"Executing task {task_id}: {_short_description(task)}")

                # Execute the task using Lambda handler
                result = handle_execute_search(task_id)
                if result.get('statusCode') != 200:
                    logger.error(f"Task {task_id} failed: {result}")
                    errors += 1
                    continue

                body = _decode_body(result)
                new_items = body.get('new_items_count', 0)
                total_items = body.get('items_found', 0)
                logger.info(f"Task {task_id} completed: {total_items} items, {new_items} new")
                executed += 1
            except Exception as e:
                # Per-task boundary: one bad task must not stop the others.
                # logger.exception keeps the traceback for diagnosis.
                logger.exception(f"Error executing task {task_id}: {e}")
                errors += 1

        summary = {
            "total_tasks": len(active_tasks),
            "executed": executed,
            "skipped": skipped,
            "errors": errors,
            "timestamp": datetime.now().isoformat(),
        }

        # Stay quiet at INFO level when every task was merely skipped.
        if executed > 0 or errors > 0:
            logger.info(f"Execution summary: {executed} executed, {skipped} skipped, {errors} errors")

        return summary
    except Exception as e:
        # Top-level boundary: report the failure to the caller as data.
        logger.exception(f"Error in run_eligible_tasks: {e}")
        return {"error": str(e)}
def daemon_mode(interval_seconds=60):
    """
    Loop forever, running the eligibility check and then sleeping.

    The loop ends when the user interrupts it (Ctrl+C) or when any other
    exception escapes an iteration; both cases are logged and the function
    returns normally.

    Args:
        interval_seconds: Pause, in seconds, between consecutive checks.
    """
    logger.info(f"Starting daemon mode, checking every {interval_seconds} seconds")
    logger.info("Press Ctrl+C to stop")

    try:
        while True:
            executed_count = run_eligible_tasks().get('executed', 0)
            # Only report passes in which at least one task actually ran.
            if executed_count > 0:
                logger.info(f"Executed {executed_count} tasks")
            time.sleep(interval_seconds)
    except KeyboardInterrupt:
        logger.info("Daemon mode stopped by user")
    except Exception as e:
        logger.error(f"Daemon mode error: {e}")
def main():
    """Main function with command line interface.

    Parses ``--daemon``, ``--interval`` and ``--verbose``, then either starts
    the daemon loop or performs a single pass over the tasks.

    Exit status in single-run mode: 1 when the run as a whole failed or when
    at least one task reported an error; otherwise the function returns
    normally. An invalid ``--interval`` in daemon mode terminates through
    ``parser.error`` (argparse usage error).
    """
    parser = argparse.ArgumentParser(description="TimeLooker Scheduled Task Runner")
    parser.add_argument(
        "--daemon",
        action="store_true",
        help="Run in daemon mode (continuous background checking)"
    )
    parser.add_argument(
        "--interval",
        type=int,
        default=60,
        help="Interval in seconds for daemon mode (default: 60)"
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable verbose logging"
    )
    args = parser.parse_args()

    # A negative interval makes time.sleep raise after the first pass and a
    # zero interval turns the daemon into a busy loop; reject both up front.
    if args.daemon and args.interval <= 0:
        parser.error("--interval must be a positive number of seconds")

    if args.verbose:
        # Requires the file-level `import logging`.
        logging.getLogger().setLevel(logging.DEBUG)

    if args.daemon:
        daemon_mode(args.interval)
    else:
        # Single run mode
        summary = run_eligible_tasks()
        if 'error' in summary:
            print(f"Error: {summary['error']}")
            sys.exit(1)
        else:
            print(f"Checked {summary['total_tasks']} tasks: "
                  f"{summary['executed']} executed, "
                  f"{summary['skipped']} skipped, "
                  f"{summary['errors']} errors")
            # Non-zero exit lets cron or other supervisors notice task failures.
            if summary['errors'] > 0:
                sys.exit(1)
# Run the command line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()