"""Tests for collection scheduler."""
import pytest
from unittest.mock import Mock, AsyncMock, patch, MagicMock
from datetime import datetime, timezone, timedelta
import asyncio
from src.scheduler.collection_scheduler import CollectionScheduler, SchedulerError
from src.collectors.base_collector import BaseCollector
class TestCollectionScheduler:
    """Test cases for CollectionScheduler.

    The tests cover collector and schedule bookkeeping, single collection
    runs through ``_run_collection``, the start/stop lifecycle of the
    scheduler loop, status/statistics reporting, and schedule
    import/export/loading.
    """

    # ------------------------------------------------------------------
    # Helpers (not collected by pytest: names do not start with "test")
    # ------------------------------------------------------------------

    @staticmethod
    async def _wait_for_task(task, timeout=1.0):
        """Wait for a background scheduler *task* to finish.

        Args:
            task: The ``asyncio.Task`` wrapping ``scheduler.start()``.
            timeout: Seconds to wait before giving up on a clean exit.

        A timeout is tolerated (the task is cancelled instead); any other
        exception raised by the task propagates and fails the test.
        """
        try:
            await asyncio.wait_for(task, timeout=timeout)
        except asyncio.TimeoutError:
            # Make sure the task has really finished so that it cannot leak
            # into the next test as a pending task.
            task.cancel()
            await asyncio.gather(task, return_exceptions=True)

    async def _shutdown(self, scheduler, task, timeout=1.0):
        """Stop *scheduler* and wait for its background *task* to finish.

        Args:
            scheduler: The scheduler whose ``stop()`` coroutine is awaited.
            task: The ``asyncio.Task`` wrapping ``scheduler.start()``.
            timeout: Seconds to wait for the task after ``stop()`` returns.
        """
        try:
            await scheduler.stop()
        finally:
            # Always reap the task, even if stop() itself raised.
            await self._wait_for_task(task, timeout=timeout)

    # ------------------------------------------------------------------
    # Fixtures
    # ------------------------------------------------------------------

    @pytest.fixture
    def mock_collector(self):
        """Create a mock collector named "test" that yields two news items."""
        collector = AsyncMock(spec=BaseCollector)
        collector.source_name = "test"
        collector.collect_and_process.return_value = [
            {"title": "Test News", "url": "https://example.com/1"},
            {"title": "Test News 2", "url": "https://example.com/2"},
        ]
        return collector

    @pytest.fixture
    def scheduler(self, mock_collector):
        """Create a CollectionScheduler that owns only ``mock_collector``."""
        return CollectionScheduler(collectors=[mock_collector])

    # ------------------------------------------------------------------
    # Initialization and collector management
    # ------------------------------------------------------------------

    def test_scheduler_initialization(self, scheduler, mock_collector):
        """Test CollectionScheduler initialization."""
        assert len(scheduler.collectors) == 1
        assert scheduler.collectors[0] == mock_collector
        assert scheduler.is_running is False
        assert isinstance(scheduler.schedules, dict)

    def test_scheduler_without_collectors(self):
        """Test scheduler initialization without collectors."""
        scheduler = CollectionScheduler()
        assert len(scheduler.collectors) == 0

    def test_add_collector(self, scheduler):
        """Test adding collectors to scheduler."""
        new_collector = AsyncMock(spec=BaseCollector)
        new_collector.source_name = "new_test"

        scheduler.add_collector(new_collector)

        assert len(scheduler.collectors) == 2
        assert new_collector in scheduler.collectors

    def test_remove_collector(self, scheduler, mock_collector):
        """Test removing collectors from scheduler."""
        scheduler.remove_collector(mock_collector)

        assert len(scheduler.collectors) == 0
        assert mock_collector not in scheduler.collectors

    # ------------------------------------------------------------------
    # Schedule management
    # ------------------------------------------------------------------

    def test_add_schedule(self, scheduler):
        """Test adding collection schedules."""
        schedule_config = {
            "collector_name": "test",
            "interval": 300,  # 5 minutes
            "keywords": ["주식", "경제"],
            "limit": 50,
        }

        scheduler.add_schedule("economy_news", schedule_config)

        assert "economy_news" in scheduler.schedules
        assert scheduler.schedules["economy_news"].interval == 300

    def test_remove_schedule(self, scheduler):
        """Test removing collection schedules."""
        schedule_config = {
            "collector_name": "test",
            "interval": 300,
            "keywords": ["주식"],
        }
        scheduler.add_schedule("test_schedule", schedule_config)

        scheduler.remove_schedule("test_schedule")

        assert "test_schedule" not in scheduler.schedules

    def test_update_schedule(self, scheduler):
        """Test updating existing schedules."""
        original_config = {
            "collector_name": "test",
            "interval": 300,
            "keywords": ["주식"],
        }
        updated_config = {
            "collector_name": "test",
            "interval": 600,
            "keywords": ["주식", "경제"],
        }
        scheduler.add_schedule("test_schedule", original_config)

        scheduler.update_schedule("test_schedule", updated_config)

        assert scheduler.schedules["test_schedule"].interval == 600
        assert len(scheduler.schedules["test_schedule"].keywords) == 2

    # ------------------------------------------------------------------
    # Single collection runs
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_run_single_collection(self, scheduler, mock_collector):
        """Test running a single collection task."""
        schedule_config = {
            "collector_name": "test",
            "keywords": ["테스트"],
            "limit": 10,
        }

        result = await scheduler._run_collection("test_job", schedule_config)

        assert len(result) == 2
        mock_collector.collect_and_process.assert_called_once()

    @pytest.mark.asyncio
    async def test_run_collection_with_invalid_collector(self, scheduler):
        """Test running collection with invalid collector name."""
        schedule_config = {
            "collector_name": "nonexistent",
            "keywords": ["테스트"],
        }

        with pytest.raises(SchedulerError):
            await scheduler._run_collection("invalid_job", schedule_config)

    @pytest.mark.asyncio
    async def test_run_collection_with_error(self, scheduler, mock_collector):
        """Test handling errors during collection."""
        mock_collector.collect_and_process.side_effect = Exception("Collection failed")
        schedule_config = {
            "collector_name": "test",
            "keywords": ["테스트"],
        }

        # Should handle error gracefully
        result = await scheduler._run_collection("error_job", schedule_config)

        assert result == []  # Returns empty list on error

    # ------------------------------------------------------------------
    # Scheduler lifecycle
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_start_scheduler(self, scheduler):
        """Test starting the scheduler."""
        schedule_config = {
            "collector_name": "test",
            "interval": 1,  # 1 second for quick test
            "keywords": ["테스트"],
        }
        scheduler.add_schedule("quick_test", schedule_config)

        # Start scheduler in background, let it run briefly, then stop it.
        scheduler_task = asyncio.create_task(scheduler.start())
        await asyncio.sleep(0.1)
        await self._shutdown(scheduler, scheduler_task)

        assert scheduler.is_running is False

    @pytest.mark.asyncio
    async def test_stop_scheduler(self, scheduler):
        """Test stopping the scheduler."""
        assert scheduler.is_running is False

        scheduler_task = asyncio.create_task(scheduler.start())
        try:
            await asyncio.sleep(0.1)
            assert scheduler.is_running is True
        finally:
            # Stop even when the assertion above fails, so the background
            # task is never left running after the test.
            await self._shutdown(scheduler, scheduler_task)

        assert scheduler.is_running is False

    # ------------------------------------------------------------------
    # Timing helpers
    # ------------------------------------------------------------------

    def test_calculate_next_run_time(self, scheduler):
        """Test calculating next run time for schedules."""
        now = datetime.now(timezone.utc)

        # Test with 5 minute interval
        next_run = scheduler._calculate_next_run_time(300)
        expected = now + timedelta(seconds=300)

        # Allow for small time differences
        assert abs((next_run - expected).total_seconds()) < 5

    def test_is_schedule_due(self, scheduler):
        """Test checking if schedule is due to run."""
        now = datetime.now(timezone.utc)

        # Schedule that should run (past due)
        past_time = now - timedelta(seconds=10)
        assert scheduler._is_schedule_due(past_time) is True

        # Schedule that shouldn't run yet (future)
        future_time = now + timedelta(seconds=10)
        assert scheduler._is_schedule_due(future_time) is False

    # ------------------------------------------------------------------
    # Status reporting
    # ------------------------------------------------------------------

    def test_get_schedule_status(self, scheduler):
        """Test getting schedule status information."""
        schedule_config = {
            "collector_name": "test",
            "interval": 300,
            "keywords": ["테스트"],
        }
        scheduler.add_schedule("status_test", schedule_config)

        status = scheduler.get_schedule_status("status_test")

        assert status["name"] == "status_test"
        assert status["collector"] == "test"
        assert status["interval"] == 300
        assert "next_run" in status
        assert "last_run" in status

    def test_get_all_schedules_status(self, scheduler):
        """Test getting status of all schedules."""
        schedule_config1 = {
            "collector_name": "test",
            "interval": 300,
            "keywords": ["테스트1"],
        }
        schedule_config2 = {
            "collector_name": "test",
            "interval": 600,
            "keywords": ["테스트2"],
        }
        scheduler.add_schedule("schedule1", schedule_config1)
        scheduler.add_schedule("schedule2", schedule_config2)

        all_status = scheduler.get_all_schedules_status()

        assert len(all_status) == 2
        # Collect the names once instead of rebuilding the list per check.
        names = {entry["name"] for entry in all_status}
        assert "schedule1" in names
        assert "schedule2" in names

    @pytest.mark.asyncio
    async def test_pause_and_resume_schedule(self, scheduler):
        """Test pausing and resuming individual schedules."""
        schedule_config = {
            "collector_name": "test",
            "interval": 300,
            "keywords": ["테스트"],
        }
        scheduler.add_schedule("pausable", schedule_config)

        # Test pause
        scheduler.pause_schedule("pausable")
        status = scheduler.get_schedule_status("pausable")
        assert status["paused"] is True

        # Test resume
        scheduler.resume_schedule("pausable")
        status = scheduler.get_schedule_status("pausable")
        assert status["paused"] is False

    @pytest.mark.asyncio
    async def test_run_schedule_immediately(self, scheduler, mock_collector):
        """Test running a schedule immediately."""
        schedule_config = {
            "collector_name": "test",
            "interval": 300,
            "keywords": ["즉시실행"],
            "limit": 5,
        }
        scheduler.add_schedule("immediate", schedule_config)

        result = await scheduler.run_schedule_now("immediate")

        assert len(result) == 2
        mock_collector.collect_and_process.assert_called()

    # ------------------------------------------------------------------
    # Validation, concurrency and conflicts
    # ------------------------------------------------------------------

    def test_validate_schedule_config(self, scheduler):
        """Test schedule configuration validation."""
        # Valid config
        valid_config = {
            "collector_name": "test",
            "interval": 300,
            "keywords": ["테스트"],
        }
        assert scheduler._validate_schedule_config(valid_config) is True

        # Invalid config - missing collector_name
        invalid_config1 = {
            "interval": 300,
            "keywords": ["테스트"],
        }
        assert scheduler._validate_schedule_config(invalid_config1) is False

        # Invalid config - negative interval
        invalid_config2 = {
            "collector_name": "test",
            "interval": -100,
            "keywords": ["테스트"],
        }
        assert scheduler._validate_schedule_config(invalid_config2) is False

    @pytest.mark.asyncio
    async def test_concurrent_collections(self, scheduler):
        """Test handling concurrent collection tasks."""
        collector1 = AsyncMock(spec=BaseCollector)
        collector1.source_name = "collector1"
        collector1.collect_and_process.return_value = [{"title": "News 1"}]

        collector2 = AsyncMock(spec=BaseCollector)
        collector2.source_name = "collector2"
        collector2.collect_and_process.return_value = [{"title": "News 2"}]

        scheduler.add_collector(collector1)
        scheduler.add_collector(collector2)

        config1 = {"collector_name": "collector1", "keywords": ["테스트1"], "interval": 300}
        config2 = {"collector_name": "collector2", "keywords": ["테스트2"], "interval": 300}

        # Run collections concurrently
        results = await asyncio.gather(
            scheduler._run_collection("job1", config1),
            scheduler._run_collection("job2", config2),
        )

        assert len(results[0]) == 1
        assert len(results[1]) == 1

    def test_schedule_conflict_detection(self, scheduler):
        """Test detection of schedule conflicts."""
        config1 = {
            "collector_name": "test",
            "interval": 60,
            "keywords": ["테스트"],
        }
        config2 = {
            "collector_name": "test",
            "interval": 30,
            "keywords": ["테스트"],
        }
        scheduler.add_schedule("schedule1", config1)
        scheduler.add_schedule("schedule2", config2)

        conflicts = scheduler.detect_schedule_conflicts()

        # Should detect potential conflicts with same collector
        assert len(conflicts) > 0

    def test_get_scheduler_statistics(self, scheduler, mock_collector):
        """Test getting scheduler statistics."""
        stats = scheduler.get_statistics()

        assert "total_schedules" in stats
        assert "active_schedules" in stats
        assert "paused_schedules" in stats
        assert "total_collections" in stats
        assert "successful_collections" in stats
        assert "failed_collections" in stats

    # ------------------------------------------------------------------
    # Shutdown, persistence and recovery
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_cleanup_on_shutdown(self, scheduler):
        """Test proper cleanup when scheduler is shut down."""
        schedule_config = {
            "collector_name": "test",
            "interval": 300,
            "keywords": ["테스트"],
        }
        scheduler.add_schedule("cleanup_test", schedule_config)

        # Start and immediately stop
        scheduler_task = asyncio.create_task(scheduler.start())
        await asyncio.sleep(0.1)
        await self._shutdown(scheduler, scheduler_task)

        # Check that cleanup was performed
        assert scheduler.is_running is False

    def test_schedule_persistence(self, scheduler):
        """Test saving and loading schedule configurations."""
        schedule_config = {
            "collector_name": "test",
            "interval": 300,
            "keywords": ["지속성테스트"],
        }
        scheduler.add_schedule("persistent", schedule_config)

        # Save schedules
        saved_config = scheduler.export_schedules()

        # Create new scheduler and load
        new_scheduler = CollectionScheduler()
        new_scheduler.import_schedules(saved_config)

        assert "persistent" in new_scheduler.schedules
        assert new_scheduler.schedules["persistent"].interval == 300

    @pytest.mark.asyncio
    async def test_error_recovery(self, scheduler, mock_collector):
        """Test error recovery and continuation of other schedules."""
        # Add a failing collector
        failing_collector = AsyncMock(spec=BaseCollector)
        failing_collector.source_name = "failing"
        failing_collector.collect_and_process.side_effect = Exception("Always fails")
        scheduler.add_collector(failing_collector)

        # Add schedules for both collectors
        scheduler.add_schedule("working", {
            "collector_name": "test",
            "interval": 1,
            "keywords": ["작동"],
        })
        scheduler.add_schedule("failing", {
            "collector_name": "failing",
            "interval": 1,
            "keywords": ["실패"],
        })

        # Start scheduler briefly. If the scheduler loop crashed because of
        # the failing collector, _shutdown re-raises that exception here.
        scheduler_task = asyncio.create_task(scheduler.start())
        await asyncio.sleep(0.5)
        await self._shutdown(scheduler, scheduler_task)

        # The scheduler must have survived the failing collector and shut
        # down cleanly, and statistics must still be reported.
        assert scheduler.is_running is False
        stats = scheduler.get_statistics()
        assert "total_collections" in stats
        assert "successful_collections" in stats
        assert "failed_collections" in stats
        # NOTE(review): stronger checks on the success/failure counts depend
        # on whether the scheduler fires jobs immediately on start() or only
        # after the first interval has elapsed - confirm before adding them.

    def test_dynamic_schedule_modification(self, scheduler):
        """Test modifying a schedule and reading the change back via status."""
        original_config = {
            "collector_name": "test",
            "interval": 300,
            "keywords": ["원본"],
        }
        modified_config = {
            "collector_name": "test",
            "interval": 600,
            "keywords": ["수정됨"],
        }
        scheduler.add_schedule("dynamic", original_config)

        # Modify schedule
        scheduler.update_schedule("dynamic", modified_config)

        # Check modification was applied
        status = scheduler.get_schedule_status("dynamic")
        assert status["interval"] == 600
        assert "수정됨" in status["keywords"]

    @pytest.mark.asyncio
    async def test_graceful_shutdown_with_running_tasks(self, scheduler, mock_collector):
        """Test graceful shutdown when collections are running."""
        # Make collection take some time
        async def slow_collection(*args, **kwargs):
            await asyncio.sleep(2)
            return [{"title": "Slow news"}]

        mock_collector.collect_and_process.side_effect = slow_collection
        scheduler.add_schedule("slow", {
            "collector_name": "test",
            "interval": 1,
            "keywords": ["느림"],
        })

        # Start scheduler
        scheduler_task = asyncio.create_task(scheduler.start())

        # Wait for collection to start
        await asyncio.sleep(0.5)

        # Stop scheduler while collection is running. The event loop clock
        # is monotonic, so the measurement is immune to wall-clock changes.
        loop = asyncio.get_running_loop()
        stop_started = loop.time()
        try:
            await scheduler.stop()
            stop_duration = loop.time() - stop_started
        finally:
            # Should stop gracefully; reap the task either way.
            await self._wait_for_task(scheduler_task, timeout=3.0)

        # Should not take too long to stop
        assert stop_duration < 5  # Should stop within reasonable time

    # ------------------------------------------------------------------
    # Priorities, retries and file loading
    # ------------------------------------------------------------------

    def test_schedule_priority_handling(self, scheduler):
        """Test handling of schedule priorities."""
        high_priority_config = {
            "collector_name": "test",
            "interval": 300,
            "keywords": ["중요"],
            "priority": "high",
        }
        low_priority_config = {
            "collector_name": "test",
            "interval": 300,
            "keywords": ["일반"],
            "priority": "low",
        }
        scheduler.add_schedule("high_priority", high_priority_config)
        scheduler.add_schedule("low_priority", low_priority_config)

        # Get prioritized schedule list
        prioritized = scheduler.get_prioritized_schedules()

        # High priority should come first
        assert prioritized[0]["priority"] == "high"

    @pytest.mark.asyncio
    async def test_schedule_retry_mechanism(self, scheduler, mock_collector):
        """Test that retry settings are stored on the schedule.

        Only the configuration is checked here; no collection is run and no
        retry is actually triggered.
        """
        schedule_config = {
            "collector_name": "test",
            "interval": 300,
            "keywords": ["재시도"],
            "retry_attempts": 3,
            "retry_delay": 0.1,
        }

        # Add the schedule first so retry logic can find it
        scheduler.add_schedule("retry_test", schedule_config)

        # Check that schedule was added with retry configuration
        schedule_info = scheduler.schedules["retry_test"]
        assert schedule_info.retry_attempts == 3
        assert schedule_info.retry_delay == 0.1

    def test_load_schedules_from_config_file(self, scheduler, tmp_path):
        """Test loading schedules from configuration file."""
        import json

        config_file = tmp_path / "schedules.json"
        config_data = {
            "schedules": {
                "economy_news": {
                    "collector_name": "naver",
                    "interval": 300,
                    "keywords": ["경제", "주식"],
                    "limit": 50,
                },
                "politics_news": {
                    "collector_name": "daum",
                    "interval": 600,
                    "keywords": ["정치", "선거"],
                    "limit": 30,
                },
            }
        }
        # ensure_ascii=False emits raw Korean text, so pin the encoding
        # instead of relying on the platform's locale default.
        config_file.write_text(
            json.dumps(config_data, ensure_ascii=False),
            encoding="utf-8",
        )

        scheduler.load_schedules_from_file(str(config_file))

        assert len(scheduler.schedules) == 2
        assert "economy_news" in scheduler.schedules
        assert "politics_news" in scheduler.schedules