"""Collection scheduler implementation."""
import asyncio
import logging
import json
from datetime import datetime, timezone, timedelta
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from src.collectors.base_collector import BaseCollector
class SchedulerError(Exception):
"""Scheduler specific error."""
pass
@dataclass
class ScheduleInfo:
"""Schedule information dataclass."""
name: str
collector_name: str
interval: int
keywords: List[str]
limit: Optional[int] = None
priority: str = "medium"
retry_attempts: int = 1
retry_delay: float = 1.0
paused: bool = False
next_run: Optional[datetime] = None
last_run: Optional[datetime] = None
last_result_count: int = 0
failure_count: int = 0
class CollectionScheduler:
"""Collection scheduler for automated news gathering."""
def __init__(self, collectors: Optional[List[BaseCollector]] = None):
"""Initialize collection scheduler.
Args:
collectors: List of collectors to use
"""
self.collectors = collectors or []
self.schedules: Dict[str, ScheduleInfo] = {}
self.is_running = False
self._stop_event = asyncio.Event()
self._running_tasks: Dict[str, asyncio.Task] = {}
# Statistics tracking
self.stats = {
"total_collections": 0,
"successful_collections": 0,
"failed_collections": 0,
"scheduler_start_time": None
}
self.logger = logging.getLogger("scheduler")
def add_collector(self, collector: BaseCollector) -> None:
"""Add a collector to the scheduler.
Args:
collector: Collector instance to add
"""
if collector not in self.collectors:
self.collectors.append(collector)
self.logger.info(f"Added collector: {collector.source_name}")
def remove_collector(self, collector: BaseCollector) -> None:
"""Remove a collector from the scheduler.
Args:
collector: Collector instance to remove
"""
if collector in self.collectors:
self.collectors.remove(collector)
self.logger.info(f"Removed collector: {collector.source_name}")
def add_schedule(self, name: str, config: Dict[str, Any]) -> None:
"""Add a collection schedule.
Args:
name: Schedule name
config: Schedule configuration
"""
if not self._validate_schedule_config(config):
raise SchedulerError(f"Invalid schedule configuration for {name}")
schedule_info = ScheduleInfo(
name=name,
collector_name=config["collector_name"],
interval=config["interval"],
keywords=config.get("keywords", []),
limit=config.get("limit"),
priority=config.get("priority", "medium"),
retry_attempts=config.get("retry_attempts", 1),
retry_delay=config.get("retry_delay", 1.0),
next_run=self._calculate_next_run_time(config["interval"])
)
self.schedules[name] = schedule_info
self.logger.info(f"Added schedule: {name}")
def remove_schedule(self, name: str) -> None:
"""Remove a collection schedule.
Args:
name: Schedule name to remove
"""
if name in self.schedules:
del self.schedules[name]
self.logger.info(f"Removed schedule: {name}")
def update_schedule(self, name: str, config: Dict[str, Any]) -> None:
"""Update an existing schedule.
Args:
name: Schedule name
config: New schedule configuration
"""
if name not in self.schedules:
raise SchedulerError(f"Schedule {name} not found")
if not self._validate_schedule_config(config):
raise SchedulerError(f"Invalid schedule configuration for {name}")
# Update existing schedule
schedule = self.schedules[name]
schedule.collector_name = config["collector_name"]
schedule.interval = config["interval"]
schedule.keywords = config.get("keywords", [])
schedule.limit = config.get("limit")
schedule.priority = config.get("priority", "medium")
schedule.retry_attempts = config.get("retry_attempts", 1)
schedule.retry_delay = config.get("retry_delay", 1.0)
        # Recalculate next run time using the (possibly updated) interval
schedule.next_run = self._calculate_next_run_time(schedule.interval)
self.logger.info(f"Updated schedule: {name}")
def pause_schedule(self, name: str) -> None:
"""Pause a schedule.
Args:
name: Schedule name to pause
"""
if name in self.schedules:
self.schedules[name].paused = True
self.logger.info(f"Paused schedule: {name}")
def resume_schedule(self, name: str) -> None:
"""Resume a paused schedule.
Args:
name: Schedule name to resume
"""
if name in self.schedules:
self.schedules[name].paused = False
# Recalculate next run time
self.schedules[name].next_run = self._calculate_next_run_time(
self.schedules[name].interval
)
self.logger.info(f"Resumed schedule: {name}")
async def start(self) -> None:
"""Start the scheduler."""
if self.is_running:
self.logger.warning("Scheduler is already running")
return
self.is_running = True
self.stats["scheduler_start_time"] = datetime.now(timezone.utc)
self._stop_event.clear()
self.logger.info("Starting collection scheduler")
try:
while not self._stop_event.is_set():
# Check for due schedules
await self._check_and_run_due_schedules()
# Wait before next check (1 second intervals)
try:
await asyncio.wait_for(self._stop_event.wait(), timeout=1.0)
break # Stop event was set
except asyncio.TimeoutError:
continue # Continue checking schedules
except Exception as e:
self.logger.error(f"Scheduler error: {e}")
finally:
# Wait for running tasks to complete
if self._running_tasks:
await self._wait_for_running_tasks()
self.is_running = False
self.logger.info("Collection scheduler stopped")
async def stop(self) -> None:
"""Stop the scheduler."""
if not self.is_running:
return
self.logger.info("Stopping collection scheduler")
self._stop_event.set()
# Cancel running tasks
for task in self._running_tasks.values():
if not task.done():
task.cancel()
# Wait for tasks to complete or be cancelled
if self._running_tasks:
await asyncio.gather(*self._running_tasks.values(), return_exceptions=True)
self._running_tasks.clear()
async def _check_and_run_due_schedules(self) -> None:
"""Check for due schedules and run them."""
for name, schedule in self.schedules.items():
if (not schedule.paused and
schedule.next_run and
self._is_schedule_due(schedule.next_run) and
name not in self._running_tasks):
# Create task for this collection
task = asyncio.create_task(
self._run_scheduled_collection(name, schedule)
)
self._running_tasks[name] = task
async def _run_scheduled_collection(self, name: str, schedule: ScheduleInfo) -> None:
"""Run a scheduled collection.
Args:
name: Schedule name
schedule: Schedule information
"""
try:
self.logger.info(f"Running scheduled collection: {name}")
# Update last run time
schedule.last_run = datetime.now(timezone.utc)
# Prepare collection config
config = {
"collector_name": schedule.collector_name,
"keywords": schedule.keywords,
"limit": schedule.limit
}
# Run collection with retry
result = await self._run_collection_with_retry(name, config)
# Update statistics
schedule.last_result_count = len(result)
schedule.failure_count = 0 # Reset failure count on success
self.stats["successful_collections"] += 1
self.logger.info(f"Completed collection {name}: {len(result)} items")
except Exception as e:
schedule.failure_count += 1
self.stats["failed_collections"] += 1
self.logger.error(f"Failed collection {name}: {e}")
finally:
# Schedule next run
schedule.next_run = self._calculate_next_run_time(schedule.interval)
# Remove from running tasks
if name in self._running_tasks:
del self._running_tasks[name]
async def _run_collection(self, name: str, config: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Run a single collection.
Args:
name: Collection job name
config: Collection configuration
Returns:
List of collected items
"""
collector_name = config["collector_name"]
# Find collector
collector = None
for c in self.collectors:
if c.source_name == collector_name:
collector = c
break
if not collector:
raise SchedulerError(f"Collector {collector_name} not found")
try:
# Run collection
self.stats["total_collections"] += 1
# Prepare arguments for collection
collect_args = {}
if "keywords" in config:
collect_args["keywords"] = config["keywords"]
if "limit" in config:
collect_args["limit"] = config["limit"]
# Add other config parameters except the ones we already handled
for key, value in config.items():
if key not in ["collector_name", "keywords", "limit"]:
collect_args[key] = value
result = await collector.collect_and_process(**collect_args)
return result
        except Exception as e:
            self.logger.error(f"Collection error in {name}: {e}")
            # Re-raise so retry handling and failure statistics see the error
            raise
async def _run_collection_with_retry(self, name: str, config: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Run collection with retry logic.
Args:
name: Collection job name
config: Collection configuration
Returns:
List of collected items
"""
schedule = self.schedules.get(name)
retry_attempts = schedule.retry_attempts if schedule else 1
retry_delay = schedule.retry_delay if schedule else 1.0
last_exception = None
for attempt in range(retry_attempts):
try:
return await self._run_collection(name, config)
except Exception as e:
last_exception = e
if attempt < retry_attempts - 1:
self.logger.warning(f"Collection {name} failed (attempt {attempt + 1}), retrying in {retry_delay}s")
await asyncio.sleep(retry_delay)
else:
self.logger.error(f"Collection {name} failed after {retry_attempts} attempts")
# If we get here, all attempts failed
if last_exception:
raise last_exception
return []
async def run_schedule_now(self, name: str) -> List[Dict[str, Any]]:
"""Run a schedule immediately.
Args:
name: Schedule name
Returns:
Collection results
"""
if name not in self.schedules:
raise SchedulerError(f"Schedule {name} not found")
schedule = self.schedules[name]
config = {
"collector_name": schedule.collector_name,
"keywords": schedule.keywords,
"limit": schedule.limit
}
return await self._run_collection(name, config)
def _calculate_next_run_time(self, interval_seconds: int) -> datetime:
"""Calculate next run time based on interval.
Args:
interval_seconds: Interval in seconds
Returns:
Next run datetime
"""
return datetime.now(timezone.utc) + timedelta(seconds=interval_seconds)
def _is_schedule_due(self, next_run_time: datetime) -> bool:
"""Check if a schedule is due to run.
Args:
next_run_time: Next scheduled run time
Returns:
True if schedule is due
"""
return datetime.now(timezone.utc) >= next_run_time
def _validate_schedule_config(self, config: Dict[str, Any]) -> bool:
"""Validate schedule configuration.
Args:
config: Schedule configuration
Returns:
True if valid
"""
required_fields = ["collector_name", "interval"]
# Check required fields
for field in required_fields:
if field not in config:
return False
        # Check interval is a positive number
        if not isinstance(config["interval"], (int, float)) or config["interval"] <= 0:
            return False
        # The referenced collector is intentionally not required to exist yet;
        # it may be registered via add_collector() after the schedule is added.
        return True
async def _wait_for_running_tasks(self) -> None:
"""Wait for all running tasks to complete."""
if not self._running_tasks:
return
self.logger.info(f"Waiting for {len(self._running_tasks)} running tasks to complete")
# Wait for all tasks with timeout
try:
await asyncio.wait_for(
asyncio.gather(*self._running_tasks.values(), return_exceptions=True),
timeout=30.0 # 30 second timeout
)
        except asyncio.TimeoutError:
            self.logger.warning("Timeout waiting for tasks to complete, cancelling remaining tasks")
            for task in self._running_tasks.values():
                if not task.done():
                    task.cancel()
            # Await the cancelled tasks so their cancellation is processed cleanly
            await asyncio.gather(*self._running_tasks.values(), return_exceptions=True)
def get_schedule_status(self, name: str) -> Dict[str, Any]:
"""Get status of a specific schedule.
Args:
name: Schedule name
Returns:
Schedule status information
"""
if name not in self.schedules:
raise SchedulerError(f"Schedule {name} not found")
schedule = self.schedules[name]
return {
"name": name,
"collector": schedule.collector_name,
"interval": schedule.interval,
"keywords": schedule.keywords,
"limit": schedule.limit,
"priority": schedule.priority,
"paused": schedule.paused,
"next_run": schedule.next_run.isoformat() if schedule.next_run else None,
"last_run": schedule.last_run.isoformat() if schedule.last_run else None,
"last_result_count": schedule.last_result_count,
"failure_count": schedule.failure_count
}
def get_all_schedules_status(self) -> List[Dict[str, Any]]:
"""Get status of all schedules.
Returns:
List of schedule status information
"""
return [self.get_schedule_status(name) for name in self.schedules.keys()]
def get_statistics(self) -> Dict[str, Any]:
"""Get scheduler statistics.
Returns:
Statistics dictionary
"""
active_schedules = sum(1 for s in self.schedules.values() if not s.paused)
paused_schedules = sum(1 for s in self.schedules.values() if s.paused)
return {
"total_schedules": len(self.schedules),
"active_schedules": active_schedules,
"paused_schedules": paused_schedules,
"running_tasks": len(self._running_tasks),
"is_running": self.is_running,
"uptime_seconds": (
(datetime.now(timezone.utc) - self.stats["scheduler_start_time"]).total_seconds()
if self.stats["scheduler_start_time"] else 0
),
**self.stats
}
def detect_schedule_conflicts(self) -> List[Dict[str, Any]]:
"""Detect potential schedule conflicts.
Returns:
List of detected conflicts
"""
conflicts = []
schedules_by_collector = {}
# Group schedules by collector
for name, schedule in self.schedules.items():
collector_name = schedule.collector_name
if collector_name not in schedules_by_collector:
schedules_by_collector[collector_name] = []
schedules_by_collector[collector_name].append((name, schedule))
# Check for conflicts within each collector
for collector_name, collector_schedules in schedules_by_collector.items():
if len(collector_schedules) > 1:
# Multiple schedules for same collector
intervals = [s[1].interval for s in collector_schedules]
min_interval = min(intervals)
if min_interval < 60: # Less than 1 minute
conflicts.append({
"type": "high_frequency",
"collector": collector_name,
"schedules": [s[0] for s in collector_schedules],
"description": f"Multiple high-frequency schedules for {collector_name}"
})
return conflicts
def get_prioritized_schedules(self) -> List[Dict[str, Any]]:
"""Get schedules ordered by priority.
Returns:
List of schedules ordered by priority
"""
priority_order = {"high": 0, "medium": 1, "low": 2}
sorted_schedules = sorted(
self.schedules.items(),
key=lambda x: priority_order.get(x[1].priority, 1)
)
        # get_schedule_status already includes the priority field
        return [self.get_schedule_status(name) for name, _ in sorted_schedules]
def export_schedules(self) -> Dict[str, Any]:
"""Export schedules configuration.
Returns:
Serializable schedules configuration
"""
schedules_config = {}
for name, schedule in self.schedules.items():
schedules_config[name] = {
"collector_name": schedule.collector_name,
"interval": schedule.interval,
"keywords": schedule.keywords,
"limit": schedule.limit,
"priority": schedule.priority,
"retry_attempts": schedule.retry_attempts,
"retry_delay": schedule.retry_delay
}
return {"schedules": schedules_config}
def import_schedules(self, config: Dict[str, Any]) -> None:
"""Import schedules from configuration.
Args:
config: Schedules configuration
"""
if "schedules" not in config:
raise SchedulerError("Invalid configuration format")
for name, schedule_config in config["schedules"].items():
try:
self.add_schedule(name, schedule_config)
except Exception as e:
self.logger.error(f"Failed to import schedule {name}: {e}")
def load_schedules_from_file(self, file_path: str) -> None:
"""Load schedules from JSON file.
Args:
file_path: Path to JSON configuration file
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
config = json.load(f)
self.import_schedules(config)
self.logger.info(f"Loaded schedules from {file_path}")
except Exception as e:
raise SchedulerError(f"Failed to load schedules from {file_path}: {e}")
def save_schedules_to_file(self, file_path: str) -> None:
"""Save schedules to JSON file.
Args:
file_path: Path to save configuration file
"""
try:
config = self.export_schedules()
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(config, f, ensure_ascii=False, indent=2)
self.logger.info(f"Saved schedules to {file_path}")
except Exception as e:
raise SchedulerError(f"Failed to save schedules to {file_path}: {e}")