"""Tests for RSS feed collector."""
import pytest
from unittest.mock import AsyncMock, patch
from datetime import datetime, date
import aiohttp
from src.collectors.rss_collector import RSSFeedCollector, RSSParsingError
from src.collectors.base_collector import CollectorError
class TestRSSFeedCollector:
"""Test cases for RSSFeedCollector."""
@pytest.fixture
def collector(self):
"""Create RSSFeedCollector instance for testing."""
return RSSFeedCollector()
@pytest.fixture
def sample_rss_feed(self):
"""Sample RSS feed XML."""
return """<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
<channel>
<title>테스트 뉴스 RSS</title>
<link>https://example.com/news</link>
<description>테스트용 뉴스 피드</description>
<language>ko-KR</language>
<lastBuildDate>Tue, 23 Jul 2024 14:30:00 +0900</lastBuildDate>
<item>
<title>첫 번째 테스트 뉴스</title>
<link>https://example.com/news/1</link>
<description>첫 번째 뉴스의 요약 내용입니다.</description>
<pubDate>Tue, 23 Jul 2024 14:30:00 +0900</pubDate>
<guid>https://example.com/news/1</guid>
<category>경제</category>
<author>기자1</author>
</item>
<item>
<title>두 번째 테스트 뉴스</title>
<link>https://example.com/news/2</link>
<description>두 번째 뉴스의 요약 내용입니다.</description>
<pubDate>Tue, 23 Jul 2024 13:15:00 +0900</pubDate>
<guid>https://example.com/news/2</guid>
<category>정치</category>
<author>기자2</author>
</item>
</channel>
</rss>"""
@pytest.fixture
def sample_atom_feed(self):
"""Sample Atom feed XML."""
return """<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<title>테스트 Atom 피드</title>
<link href="https://example.com/atom"/>
<updated>2024-07-23T14:30:00+09:00</updated>
<id>https://example.com/atom</id>
<entry>
<title>첫 번째 Atom 뉴스</title>
<link href="https://example.com/atom/1"/>
<id>https://example.com/atom/1</id>
<updated>2024-07-23T14:30:00+09:00</updated>
<summary>첫 번째 Atom 뉴스의 요약입니다.</summary>
<author>
<name>Atom 기자1</name>
</author>
<category term="기술"/>
</entry>
</feed>"""
def test_rss_collector_initialization(self, collector):
"""Test RSSFeedCollector initialization."""
assert collector.source_name == "rss"
assert hasattr(collector, 'feed_urls')
assert isinstance(collector.feed_urls, list)
assert collector.max_items_per_feed == 100
def test_rss_collector_with_feed_urls(self):
"""Test RSSFeedCollector initialization with feed URLs."""
feed_urls = [
"https://example.com/rss1.xml",
"https://example.com/rss2.xml"
]
collector = RSSFeedCollector(feed_urls=feed_urls)
assert collector.feed_urls == feed_urls
def test_add_feed_url(self, collector):
"""Test adding feed URLs."""
collector.add_feed_url("https://example.com/rss.xml")
assert "https://example.com/rss.xml" in collector.feed_urls
# Test duplicate prevention
collector.add_feed_url("https://example.com/rss.xml")
assert collector.feed_urls.count("https://example.com/rss.xml") == 1
def test_remove_feed_url(self, collector):
"""Test removing feed URLs."""
collector.add_feed_url("https://example.com/rss.xml")
collector.remove_feed_url("https://example.com/rss.xml")
assert "https://example.com/rss.xml" not in collector.feed_urls
@pytest.mark.asyncio
async def test_fetch_feed_success(self, collector, sample_rss_feed):
"""Test successful RSS feed fetching."""
with patch('aiohttp.ClientSession.get') as mock_get:
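            # The collector is assumed to open responses with
            # ``async with session.get(url) as resp``, so the patched get()
            # must behave as an async context manager that yields the fake
            # response from __aenter__.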
# Mock response
mock_response = AsyncMock()
mock_response.status = 200
mock_response.text.return_value = sample_rss_feed
mock_response.headers = {'content-type': 'application/rss+xml'}
# Configure the async context manager
mock_get.return_value.__aenter__.return_value = mock_response
mock_get.return_value.__aexit__.return_value = False
# Initialize collector session
await collector.initialize()
# Test feed fetch
result = await collector._fetch_feed("https://example.com/rss.xml")
assert result == sample_rss_feed
mock_get.assert_called_once()
@pytest.mark.asyncio
async def test_fetch_feed_error(self, collector):
"""Test RSS feed fetch with error."""
with patch('aiohttp.ClientSession.get') as mock_get:
# Mock network error
mock_get.side_effect = aiohttp.ClientError("Network error")
# Initialize collector session
await collector.initialize()
with pytest.raises(CollectorError):
await collector._fetch_feed("https://invalid-url.com")
def test_parse_rss_feed(self, collector, sample_rss_feed):
"""Test RSS feed parsing."""
items = collector._parse_rss_feed(sample_rss_feed)
assert len(items) == 2
assert items[0]["title"] == "첫 번째 테스트 뉴스"
assert items[0]["link"] == "https://example.com/news/1"
assert items[0]["description"] == "첫 번째 뉴스의 요약 내용입니다."
assert items[0]["category"] == "경제"
assert items[0]["author"] == "기자1"
def test_parse_atom_feed(self, collector, sample_atom_feed):
"""Test Atom feed parsing."""
items = collector._parse_atom_feed(sample_atom_feed)
assert len(items) == 1
assert items[0]["title"] == "첫 번째 Atom 뉴스"
assert items[0]["link"] == "https://example.com/atom/1"
assert items[0]["description"] == "첫 번째 Atom 뉴스의 요약입니다."
assert items[0]["category"] == "기술"
assert items[0]["author"] == "Atom 기자1"
def test_parse_invalid_feed(self, collector):
"""Test parsing invalid feed."""
invalid_xml = "<invalid>xml content</invalid>"
with pytest.raises(RSSParsingError):
collector._parse_rss_feed(invalid_xml)
def test_detect_feed_type(self, collector, sample_rss_feed, sample_atom_feed):
"""Test feed type detection."""
assert collector._detect_feed_type(sample_rss_feed) == "rss"
assert collector._detect_feed_type(sample_atom_feed) == "atom"
# Test unknown feed type
invalid_feed = "<unknown>content</unknown>"
assert collector._detect_feed_type(invalid_feed) == "unknown"
@pytest.mark.asyncio
async def test_collect_from_single_feed(self, collector, sample_rss_feed):
"""Test collecting from a single RSS feed."""
collector._fetch_feed = AsyncMock(return_value=sample_rss_feed)
result = await collector.collect_from_feed("https://example.com/rss.xml")
assert len(result) == 2
assert result[0]["title"] == "첫 번째 테스트 뉴스"
assert result[1]["title"] == "두 번째 테스트 뉴스"
@pytest.mark.asyncio
async def test_collect_from_multiple_feeds(self, collector, sample_rss_feed):
"""Test collecting from multiple RSS feeds."""
collector.feed_urls = [
"https://example.com/rss1.xml",
"https://example.com/rss2.xml"
]
# Create different feeds with unique GUIDs
        feed1 = (sample_rss_feed
                 .replace("https://example.com/news/1", "https://example.com/feed1/1")
                 .replace("https://example.com/news/2", "https://example.com/feed1/2"))
        feed2 = (sample_rss_feed
                 .replace("https://example.com/news/1", "https://example.com/feed2/1")
                 .replace("https://example.com/news/2", "https://example.com/feed2/2"))
collector._fetch_feed = AsyncMock(side_effect=[feed1, feed2])
result = await collector.collect(limit=10)
# Should collect from all feeds
assert collector._fetch_feed.call_count == 2
assert len(result) == 4 # 2 items per feed × 2 feeds
@pytest.mark.asyncio
async def test_collect_with_limit(self, collector, sample_rss_feed):
"""Test collecting with item limit."""
collector.feed_urls = ["https://example.com/rss.xml"]
collector._fetch_feed = AsyncMock(return_value=sample_rss_feed)
result = await collector.collect(limit=1)
assert len(result) == 1
@pytest.mark.asyncio
async def test_collect_with_date_filter(self, collector, sample_rss_feed):
"""Test collecting with date filtering."""
collector.feed_urls = ["https://example.com/rss.xml"]
collector._fetch_feed = AsyncMock(return_value=sample_rss_feed)
result = await collector.collect(
start_date=date(2024, 7, 20),
end_date=date(2024, 7, 23)
)
        # Both sample items were published on 23 Jul 2024, inside the window,
        # so the filter should keep them both.
        assert len(result) == 2
@pytest.mark.asyncio
async def test_parse_feed_item(self, collector):
"""Test parsing individual feed item."""
raw_item = {
"title": "테스트 뉴스 제목",
"link": "https://example.com/news/123",
"description": "테스트 뉴스 내용입니다.",
"pubDate": "Tue, 23 Jul 2024 14:30:00 +0900",
"guid": "https://example.com/news/123",
"category": "경제",
"author": "테스트 기자"
}
parsed = await collector.parse(raw_item)
assert parsed["title"] == "테스트 뉴스 제목"
assert parsed["content"] == "테스트 뉴스 내용입니다."
assert parsed["url"] == "https://example.com/news/123"
assert parsed["source"] == "rss"
assert parsed["category"] == "경제"
assert parsed["author"] == "테스트 기자"
assert isinstance(parsed["published_at"], datetime)
def test_parse_date(self, collector):
"""Test date parsing from RSS format."""
test_dates = [
"Tue, 23 Jul 2024 14:30:00 +0900",
"Mon, 01 Jan 2024 00:00:00 +0900",
"2024-07-23T14:30:00+09:00" # ISO format
]
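        # RFC 822 timestamps are the standard RSS 2.0 <pubDate> format, while
        # the ISO 8601 form is what Atom feeds use; the parser should accept both.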
for date_str in test_dates:
parsed_date = collector._parse_date(date_str)
assert isinstance(parsed_date, datetime)
assert parsed_date.tzinfo is not None
def test_parse_invalid_date(self, collector):
"""Test parsing invalid date."""
invalid_dates = [
"Invalid date format",
"",
None
]
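        # Assumption: _parse_date falls back to a default timestamp (e.g. the
        # current time) for unparseable input instead of raising.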
for invalid_date in invalid_dates:
parsed_date = collector._parse_date(invalid_date)
assert isinstance(parsed_date, datetime)
def test_extract_guid(self, collector):
"""Test GUID extraction."""
item_with_guid = {"guid": "https://example.com/guid/123"}
item_without_guid = {"link": "https://example.com/link/456"}
assert collector._extract_guid(item_with_guid) == "https://example.com/guid/123"
assert collector._extract_guid(item_without_guid) == "https://example.com/link/456"
def test_clean_html_description(self, collector):
"""Test HTML cleaning in descriptions."""
html_description = "This is <b>bold</b> and <i>italic</i> text."
cleaned = collector._clean_html_tags(html_description)
assert cleaned == "This is bold and italic text."
assert "<b>" not in cleaned
assert "<i>" not in cleaned
@pytest.mark.asyncio
async def test_collect_with_error_recovery(self, collector):
"""Test error recovery during collection."""
collector.feed_urls = [
"https://example.com/good.xml",
"https://example.com/bad.xml"
]
# First feed succeeds, second fails
collector._fetch_feed = AsyncMock(side_effect=[
"<rss><channel><item><title>Good</title></item></channel></rss>",
aiohttp.ClientError("Network error")
])
# Should continue collecting from other feeds despite errors
result = await collector.collect()
        # Both feeds must have been attempted; the failing feed is skipped
        # instead of aborting the whole run.
        assert collector._fetch_feed.call_count == 2
        assert isinstance(result, list)
def test_feed_url_validation(self, collector):
"""Test feed URL validation."""
valid_urls = [
"https://example.com/rss.xml",
"http://example.com/feed.rss",
"https://example.com/atom.xml"
]
invalid_urls = [
"not-a-url",
"ftp://example.com/file.txt",
""
]
for url in valid_urls:
assert collector._is_valid_feed_url(url) is True
for url in invalid_urls:
assert collector._is_valid_feed_url(url) is False
@pytest.mark.asyncio
async def test_collect_with_fetch_full_content(self, collector, sample_rss_feed):
"""Test collection with full content fetching."""
collector.feed_urls = ["https://example.com/rss.xml"]
collector._fetch_feed = AsyncMock(return_value=sample_rss_feed)
collector._fetch_full_content = AsyncMock(return_value="전체 기사 내용입니다.")
result = await collector.collect(fetch_full_content=True)
# Should attempt to fetch full content for each article
assert collector._fetch_full_content.call_count == 2
@pytest.mark.asyncio
async def test_fetch_full_content_from_url(self, collector):
"""Test fetching full content from article URL."""
html_content = """
<html>
<body>
<article>
<p>전체 기사 내용입니다. 이것은 충분히 긴 내용으로 테스트를 위해 작성된 가짜 뉴스 기사입니다.</p>
<p>두번째 문단입니다. 추가적인 내용을 포함하여 길이 제한을 통과할 수 있도록 합니다.</p>
</article>
</body>
</html>
"""
with patch('aiohttp.ClientSession.get') as mock_get:
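            # Same async-context-manager mocking pattern as in
            # test_fetch_feed_success above.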
# Mock response
mock_response = AsyncMock()
mock_response.text.return_value = html_content
mock_response.status = 200
# Configure the async context manager
mock_get.return_value.__aenter__.return_value = mock_response
mock_get.return_value.__aexit__.return_value = False
# Initialize collector session
await collector.initialize()
content = await collector._fetch_full_content("https://example.com/news/123")
assert "전체 기사 내용입니다" in content
assert "두번째 문단입니다" in content
def test_categorize_content(self, collector):
"""Test content categorization."""
# Test with explicit category
item_with_category = {"category": "정치", "title": "정치 뉴스"}
category = collector._categorize_content(item_with_category)
assert category == "정치"
# Test with title-based categorization
item_without_category = {"title": "경제 상황이 좋아졌습니다"}
category = collector._categorize_content(item_without_category)
        # Should fall back to title/content-based categorization; the exact
        # label depends on the implementation's keyword rules.
        assert category is not None
def test_deduplicate_items(self, collector):
"""Test item deduplication."""
items = [
{"guid": "1", "title": "뉴스 1"},
{"guid": "2", "title": "뉴스 2"},
{"guid": "1", "title": "뉴스 1 (중복)"} # Duplicate
]
deduplicated = collector._deduplicate_items(items)
assert len(deduplicated) == 2
assert deduplicated[0]["guid"] == "1"
assert deduplicated[1]["guid"] == "2"
def test_sort_items_by_date(self, collector):
"""Test sorting items by publication date."""
items = [
{"pubDate": "Tue, 23 Jul 2024 14:30:00 +0900"},
{"pubDate": "Tue, 23 Jul 2024 13:15:00 +0900"},
{"pubDate": "Tue, 23 Jul 2024 15:45:00 +0900"}
]
sorted_items = collector._sort_items_by_date(items)
# Should be sorted in descending order (newest first)
assert sorted_items[0]["pubDate"] == "Tue, 23 Jul 2024 15:45:00 +0900"
assert sorted_items[2]["pubDate"] == "Tue, 23 Jul 2024 13:15:00 +0900"
@pytest.mark.asyncio
async def test_concurrent_feed_collection(self, collector, sample_rss_feed):
"""Test concurrent collection from multiple feeds."""
collector.feed_urls = [
"https://example.com/rss1.xml",
"https://example.com/rss2.xml",
"https://example.com/rss3.xml"
]
# Create different feeds with unique GUIDs
        feed1 = (sample_rss_feed
                 .replace("https://example.com/news/1", "https://example.com/feed1/1")
                 .replace("https://example.com/news/2", "https://example.com/feed1/2"))
        feed2 = (sample_rss_feed
                 .replace("https://example.com/news/1", "https://example.com/feed2/1")
                 .replace("https://example.com/news/2", "https://example.com/feed2/2"))
        feed3 = (sample_rss_feed
                 .replace("https://example.com/news/1", "https://example.com/feed3/1")
                 .replace("https://example.com/news/2", "https://example.com/feed3/2"))
collector._fetch_feed = AsyncMock(side_effect=[feed1, feed2, feed3])
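        # The mock hands out one feed body per call; the assertions below
        # depend only on totals, not on which URL receives which feed.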
# Test concurrent collection
result = await collector.collect_concurrent()
# Should collect from all feeds concurrently
assert collector._fetch_feed.call_count == 3
assert len(result) == 6 # 2 items per feed × 3 feeds
def test_feed_metadata_extraction(self, collector, sample_rss_feed):
"""Test extraction of feed metadata."""
metadata = collector._extract_feed_metadata(sample_rss_feed)
assert metadata["title"] == "테스트 뉴스 RSS"
assert metadata["link"] == "https://example.com/news"
assert metadata["description"] == "테스트용 뉴스 피드"
assert metadata["language"] == "ko-KR"
def test_get_collector_stats(self, collector):
"""Test collector statistics."""
stats = collector.get_stats()
assert "requests_made" in stats
assert "articles_collected" in stats
assert "feeds_processed" in stats
assert "parsing_errors" in stats
@pytest.mark.asyncio
async def test_feed_health_check(self, collector):
"""Test RSS feed health checking."""
collector.feed_urls = [
"https://example.com/working.xml",
"https://example.com/broken.xml"
]
# Mock responses
async def mock_fetch_feed(url):
if "working" in url:
return "<rss><channel><title>Working</title></channel></rss>"
else:
raise aiohttp.ClientError("Feed is down")
collector._fetch_feed = AsyncMock(side_effect=mock_fetch_feed)
health_report = await collector.check_feeds_health()
assert len(health_report) == 2
assert health_report[0]["status"] == "healthy"
assert health_report[1]["status"] == "unhealthy"
def test_feed_caching(self, collector):
"""Test RSS feed caching mechanism."""
# Test cache key generation
cache_key = collector._generate_cache_key("https://example.com/rss.xml")
assert isinstance(cache_key, str)
assert len(cache_key) > 0
# Test cache expiry
assert collector._is_cache_expired(cache_key) is True # No cache initially
@pytest.mark.asyncio
async def test_rate_limiting_between_feeds(self, collector):
"""Test rate limiting between feed requests."""
collector.feed_urls = [
"https://example.com/rss1.xml",
"https://example.com/rss2.xml"
]
collector._fetch_feed = AsyncMock(return_value="<rss></rss>")
import time
start_time = time.time()
await collector.collect()
end_time = time.time()
# Should have some delay between requests (rate limiting)
# This is a basic test - actual implementation may vary