"""RSS Feed collector implementation."""
import re
import logging
import hashlib
import time
from datetime import datetime, timezone, date
from typing import Dict, Any, List, Optional
from urllib.parse import urlparse
import aiohttp
from bs4 import BeautifulSoup
import xml.etree.ElementTree as ET
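# Note: xml.etree.ElementTree is not hardened against maliciously constructed XML;
# consider defusedxml when parsing feeds from untrusted sources.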
import asyncio
from src.collectors.base_collector import BaseCollector, CollectorError
class RSSParsingError(Exception):
"""RSS parsing specific error."""
pass
class RSSFeedCollector(BaseCollector):
"""RSS Feed collector."""
def __init__(self, feed_urls: Optional[List[str]] = None, **kwargs):
"""Initialize RSS Feed collector.
Args:
feed_urls: List of RSS/Atom feed URLs
**kwargs: Additional arguments passed to BaseCollector
"""
super().__init__(source_name="rss", **kwargs)
self.feed_urls = feed_urls or []
self.max_items_per_feed = 100
self.feed_cache = {}
self.cache_ttl = 300 # 5 minutes
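        # The cache plumbing (feed_cache, _generate_cache_key, _is_cache_expired) is
        # available to callers; _fetch_feed itself does not consult feed_cache.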
# Statistics tracking
self.stats.update({
"feeds_processed": 0,
"parsing_errors": 0,
"full_content_fetched": 0
})
self.logger = logging.getLogger("collector.rss")
def add_feed_url(self, url: str) -> None:
"""Add a feed URL to the collection list.
Args:
url: RSS/Atom feed URL
"""
if url not in self.feed_urls:
self.feed_urls.append(url)
def remove_feed_url(self, url: str) -> None:
"""Remove a feed URL from the collection list.
Args:
url: RSS/Atom feed URL to remove
"""
if url in self.feed_urls:
self.feed_urls.remove(url)
async def _fetch_feed(self, feed_url: str) -> str:
"""Fetch RSS/Atom feed content.
Args:
feed_url: URL of the RSS/Atom feed
Returns:
Raw XML content
Raises:
CollectorError: If feed fetch fails
"""
try:
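            # self.session is assumed to be an aiohttp.ClientSession prepared by
            # BaseCollector.initialize() before any request is made.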
async with self.session.get(feed_url) as response:
if response.status == 200:
return await response.text()
else:
raise CollectorError(f"Feed fetch failed: {response.status}")
except aiohttp.ClientError as e:
self.logger.error(f"Network error fetching feed {feed_url}: {e}")
raise CollectorError(f"Network error: {e}") from e
def _detect_feed_type(self, xml_content: str) -> str:
"""Detect feed type (RSS or Atom).
Args:
xml_content: Raw XML content
Returns:
Feed type: 'rss', 'atom', or 'unknown'
"""
try:
root = ET.fromstring(xml_content)
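            # ElementTree reports namespaced tags in Clark notation, e.g. the Atom
            # root element appears as "{http://www.w3.org/2005/Atom}feed".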
if root.tag == "rss":
return "rss"
elif root.tag == "{http://www.w3.org/2005/Atom}feed":
return "atom"
elif "atom" in root.tag.lower():
return "atom"
else:
return "unknown"
except ET.ParseError:
return "unknown"
def _parse_rss_feed(self, xml_content: str) -> List[Dict[str, Any]]:
"""Parse RSS feed content.
Args:
xml_content: Raw RSS XML content
Returns:
List of parsed feed items
Raises:
RSSParsingError: If parsing fails
"""
try:
root = ET.fromstring(xml_content)
items = []
# Check if this is a valid RSS feed
if root.tag != "rss":
raise RSSParsingError("Not a valid RSS feed")
# Find all item elements
for item in root.findall(".//item"):
parsed_item = {}
                # Extract basic fields; fall back to "" when an element is
                # missing or has no text (e.g. an empty <title/>)
                title_elem = item.find("title")
                parsed_item["title"] = (title_elem.text or "") if title_elem is not None else ""
                link_elem = item.find("link")
                parsed_item["link"] = (link_elem.text or "") if link_elem is not None else ""
                desc_elem = item.find("description")
                parsed_item["description"] = (desc_elem.text or "") if desc_elem is not None else ""
                pubdate_elem = item.find("pubDate")
                parsed_item["pubDate"] = (pubdate_elem.text or "") if pubdate_elem is not None else ""
                guid_elem = item.find("guid")
                parsed_item["guid"] = (guid_elem.text if guid_elem is not None else "") or parsed_item["link"]
                category_elem = item.find("category")
                parsed_item["category"] = (category_elem.text or "") if category_elem is not None else ""
                author_elem = item.find("author")
                parsed_item["author"] = (author_elem.text or "") if author_elem is not None else ""
items.append(parsed_item)
return items
except ET.ParseError as e:
self.stats["parsing_errors"] += 1
raise RSSParsingError(f"Failed to parse RSS feed: {e}") from e
def _parse_atom_feed(self, xml_content: str) -> List[Dict[str, Any]]:
"""Parse Atom feed content.
Args:
xml_content: Raw Atom XML content
Returns:
List of parsed feed items
Raises:
RSSParsingError: If parsing fails
"""
try:
root = ET.fromstring(xml_content)
items = []
# Atom namespace
ns = {'atom': 'http://www.w3.org/2005/Atom'}
# Find all entry elements
for entry in root.findall(".//atom:entry", ns):
parsed_item = {}
                # Extract basic fields; fall back to "" when an element is
                # missing or has no text
                title_elem = entry.find("atom:title", ns)
                parsed_item["title"] = (title_elem.text or "") if title_elem is not None else ""
                link_elem = entry.find("atom:link", ns)
                if link_elem is not None:
                    parsed_item["link"] = link_elem.get("href", "")
                else:
                    parsed_item["link"] = ""
                summary_elem = entry.find("atom:summary", ns)
                if summary_elem is None:
                    summary_elem = entry.find("atom:content", ns)
                parsed_item["description"] = (summary_elem.text or "") if summary_elem is not None else ""
                updated_elem = entry.find("atom:updated", ns)
                parsed_item["pubDate"] = (updated_elem.text or "") if updated_elem is not None else ""
                id_elem = entry.find("atom:id", ns)
                parsed_item["guid"] = (id_elem.text if id_elem is not None else "") or parsed_item["link"]
                category_elem = entry.find("atom:category", ns)
                if category_elem is not None:
                    parsed_item["category"] = category_elem.get("term", "")
                else:
                    parsed_item["category"] = ""
                author_elem = entry.find("atom:author/atom:name", ns)
                parsed_item["author"] = (author_elem.text or "") if author_elem is not None else ""
items.append(parsed_item)
return items
except ET.ParseError as e:
self.stats["parsing_errors"] += 1
raise RSSParsingError(f"Failed to parse Atom feed: {e}") from e
    async def collect_from_feed(self, feed_url: str, limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""Collect items from a single RSS feed.
Args:
feed_url: URL of the RSS/Atom feed
limit: Maximum number of items to collect
Returns:
List of collected items
"""
await self.initialize()
try:
# Fetch feed content
xml_content = await self._fetch_feed(feed_url)
# Detect feed type and parse accordingly
feed_type = self._detect_feed_type(xml_content)
if feed_type == "rss":
items = self._parse_rss_feed(xml_content)
elif feed_type == "atom":
items = self._parse_atom_feed(xml_content)
else:
raise RSSParsingError(f"Unknown feed type: {feed_type}")
            # Cap results: honor the caller's limit, otherwise fall back to max_items_per_feed
            items = items[:limit] if limit else items[:self.max_items_per_feed]
self.stats["feeds_processed"] += 1
return items
except Exception as e:
self.logger.error(f"Error collecting from feed {feed_url}: {e}")
raise
    async def collect(self, limit: Optional[int] = None, fetch_full_content: bool = False,
                      start_date: Optional[date] = None, end_date: Optional[date] = None,
                      **kwargs) -> List[Dict[str, Any]]:
"""Collect news from all RSS feeds.
Args:
limit: Maximum number of articles to collect
fetch_full_content: Whether to fetch full article content
start_date: Start date for filtering
end_date: End date for filtering
**kwargs: Additional arguments
Returns:
List of collected articles
"""
if not self.feed_urls:
self.logger.warning("No feed URLs configured")
return []
all_items = []
collected_count = 0
for feed_url in self.feed_urls:
if limit and collected_count >= limit:
break
try:
# Calculate remaining items needed
remaining = limit - collected_count if limit else None
# Collect from this feed
items = await self.collect_from_feed(feed_url, remaining)
# Filter by date if specified
filtered_items = self._filter_by_date(items, start_date, end_date)
# Fetch full content if requested
if fetch_full_content:
for item in filtered_items:
if item.get("link"):
full_content = await self._fetch_full_content(item["link"])
if full_content:
item["full_content"] = full_content
self.stats["full_content_fetched"] += 1
all_items.extend(filtered_items)
collected_count = len(all_items)
except Exception as e:
self.logger.error(f"Error collecting from feed {feed_url}: {e}")
continue # Continue with other feeds
# Deduplicate and sort items
all_items = self._deduplicate_items(all_items)
all_items = self._sort_items_by_date(all_items)
# Apply final limit
if limit:
all_items = all_items[:limit]
self.logger.info(f"Collected {len(all_items)} items from {len(self.feed_urls)} feeds")
return all_items
def _filter_by_date(self, items: List[Dict[str, Any]],
start_date: Optional[date] = None,
end_date: Optional[date] = None) -> List[Dict[str, Any]]:
"""Filter items by date range.
Args:
items: List of items to filter
start_date: Start date filter
end_date: End date filter
Returns:
Filtered list of items
"""
if not start_date and not end_date:
return items
filtered = []
for item in items:
try:
pub_date = self._parse_date(item.get("pubDate", ""))
if not pub_date:
continue
item_date = pub_date.date()
# Apply date filters
if start_date and item_date < start_date:
continue
if end_date and item_date > end_date:
continue
filtered.append(item)
except Exception:
# Include items with unparseable dates
filtered.append(item)
return filtered
async def parse(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""Parse raw feed item into standardized format.
Args:
raw_data: Raw feed item data
Returns:
Standardized news dictionary
"""
title = self._clean_html_tags(raw_data.get("title", ""))
description = self._clean_html_tags(raw_data.get("description", ""))
# Use full content if available, otherwise use description
content = raw_data.get("full_content", description)
parsed_data = {
"title": title,
"content": content,
"url": raw_data.get("link", ""),
"published_at": self._parse_date(raw_data.get("pubDate", "")),
"source": self.source_name,
"guid": self._extract_guid(raw_data),
"category": raw_data.get("category", ""),
"author": raw_data.get("author", ""),
"raw_data": raw_data # Keep original data for debugging
}
return parsed_data
def _clean_html_tags(self, text: str) -> str:
"""Remove HTML tags from text.
Args:
text: Text with HTML tags
Returns:
Clean text without HTML tags
"""
if not text:
return ""
# Remove HTML tags
clean_text = re.sub(r'<[^>]+>', '', text)
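        # HTML entities such as &amp; are left as-is; only tags are stripped here.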
return clean_text.strip()
def _parse_date(self, date_string: str) -> datetime:
"""Parse date string from RSS/Atom format.
Args:
date_string: Date string
Returns:
Parsed datetime object
"""
if not date_string:
return datetime.now(timezone.utc)
try:
# Try RFC 2822 format first (RSS)
from email.utils import parsedate_to_datetime
try:
parsed = parsedate_to_datetime(date_string)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed
except (ValueError, TypeError):
pass
# Try ISO 8601 format (Atom)
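            # python-dateutil is a third-party dependency (pip install python-dateutil)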
from dateutil.parser import parse
parsed = parse(date_string)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed
except Exception as e:
self.logger.warning(f"Failed to parse date '{date_string}': {e}")
return datetime.now(timezone.utc)
def _extract_guid(self, item: Dict[str, Any]) -> str:
"""Extract GUID from feed item.
Args:
item: Feed item data
Returns:
GUID or link if no GUID available
"""
return item.get("guid", item.get("link", ""))
async def _fetch_full_content(self, article_url: str) -> str:
"""Fetch full content from article URL.
Args:
article_url: URL of the article
Returns:
Full article content or empty string if failed
"""
if not article_url:
return ""
try:
async with self.session.get(article_url) as response:
if response.status != 200:
return ""
html_content = await response.text()
soup = BeautifulSoup(html_content, 'html.parser')
# Try different content selectors
content_selectors = [
"article", # HTML5 article tag
".article-content", # Common class
".post-content", # Blog posts
".entry-content", # WordPress
".content", # Generic
"main" # HTML5 main tag
]
for selector in content_selectors:
content_element = soup.select_one(selector)
if content_element:
# Clean up the content
for element in content_element.find_all(["script", "style", "nav", "header", "footer"]):
element.decompose()
text = content_element.get_text(separator='\n', strip=True)
if text and len(text) > 100: # Ensure we got substantial content
return text
# Fallback: try to get content from meta description or paragraph tags
meta_desc = soup.find("meta", property="og:description")
if meta_desc and meta_desc.get("content"):
return meta_desc["content"]
# Last resort: get first few paragraphs
paragraphs = soup.find_all("p")
if paragraphs:
content_parts = []
for p in paragraphs[:5]: # First 5 paragraphs
text = p.get_text(strip=True)
if text and len(text) > 20: # Skip very short paragraphs
content_parts.append(text)
if content_parts:
return '\n'.join(content_parts)
except Exception as e:
self.logger.debug(f"Failed to fetch full content from {article_url}: {e}")
return ""
def _categorize_content(self, item: Dict[str, Any]) -> str:
"""Categorize content based on available information.
Args:
item: Feed item data
Returns:
Category string
"""
# Return explicit category if available
if item.get("category"):
return item["category"]
# Try to categorize based on title/content
title = item.get("title", "").lower()
content = item.get("description", "").lower()
text = f"{title} {content}"
        # Simple keyword-based categorization (Korean news keywords)
        if any(word in text for word in ["경제", "주식", "금융", "경영"]):  # economy, stocks, finance, business
            return "경제"  # Economy
        elif any(word in text for word in ["정치", "정부", "선거", "국회"]):  # politics, government, election, National Assembly
            return "정치"  # Politics
        elif any(word in text for word in ["기술", "IT", "컴퓨터", "소프트웨어"]):  # technology, IT, computers, software
            return "기술"  # Technology
        elif any(word in text for word in ["스포츠", "축구", "야구", "농구"]):  # sports, soccer, baseball, basketball
            return "스포츠"  # Sports
        else:
            return "일반"  # General
def _deduplicate_items(self, items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Remove duplicate items based on GUID.
Args:
items: List of items to deduplicate
Returns:
Deduplicated list of items
"""
seen_guids = set()
deduplicated = []
for item in items:
guid = self._extract_guid(item)
if guid not in seen_guids:
seen_guids.add(guid)
deduplicated.append(item)
return deduplicated
def _sort_items_by_date(self, items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Sort items by publication date (newest first).
Args:
items: List of items to sort
Returns:
Sorted list of items
"""
def get_sort_key(item):
try:
return self._parse_date(item.get("pubDate", ""))
except Exception:
return datetime.min.replace(tzinfo=timezone.utc)
return sorted(items, key=get_sort_key, reverse=True)
def _is_valid_feed_url(self, url: str) -> bool:
"""Check if URL is a valid feed URL.
Args:
url: URL to validate
Returns:
True if valid feed URL, False otherwise
"""
if not url:
return False
try:
parsed = urlparse(url)
return parsed.scheme in ["http", "https"] and bool(parsed.netloc)
except Exception:
return False
async def collect_concurrent(self) -> List[Dict[str, Any]]:
"""Collect from all feeds concurrently.
Returns:
List of collected items from all feeds
"""
if not self.feed_urls:
return []
await self.initialize()
# Create tasks for concurrent collection
tasks = []
for feed_url in self.feed_urls:
task = asyncio.create_task(self.collect_from_feed(feed_url))
tasks.append(task)
# Wait for all tasks to complete
results = await asyncio.gather(*tasks, return_exceptions=True)
# Combine results and handle exceptions
all_items = []
for i, result in enumerate(results):
if isinstance(result, Exception):
self.logger.error(f"Error collecting from feed {self.feed_urls[i]}: {result}")
else:
all_items.extend(result)
return self._deduplicate_items(self._sort_items_by_date(all_items))
def _extract_feed_metadata(self, xml_content: str) -> Dict[str, str]:
"""Extract metadata from RSS feed.
Args:
xml_content: Raw XML content
Returns:
Feed metadata dictionary
"""
try:
root = ET.fromstring(xml_content)
metadata = {}
# RSS feed metadata
channel = root.find("channel")
if channel is not None:
title_elem = channel.find("title")
metadata["title"] = title_elem.text if title_elem is not None else ""
link_elem = channel.find("link")
metadata["link"] = link_elem.text if link_elem is not None else ""
desc_elem = channel.find("description")
metadata["description"] = desc_elem.text if desc_elem is not None else ""
lang_elem = channel.find("language")
metadata["language"] = lang_elem.text if lang_elem is not None else ""
return metadata
except ET.ParseError:
return {}
async def check_feeds_health(self) -> List[Dict[str, Any]]:
"""Check health status of all configured feeds.
Returns:
List of feed health reports
"""
health_reports = []
for feed_url in self.feed_urls:
report = {
"url": feed_url,
"status": "unknown",
"last_checked": datetime.now(timezone.utc),
"error": None
}
try:
xml_content = await self._fetch_feed(feed_url)
metadata = self._extract_feed_metadata(xml_content)
report["status"] = "healthy"
report["metadata"] = metadata
except Exception as e:
report["status"] = "unhealthy"
report["error"] = str(e)
health_reports.append(report)
return health_reports
def _generate_cache_key(self, feed_url: str) -> str:
"""Generate cache key for feed URL.
Args:
feed_url: Feed URL
Returns:
Cache key string
"""
return hashlib.md5(feed_url.encode()).hexdigest()
def _is_cache_expired(self, cache_key: str) -> bool:
"""Check if cache entry is expired.
Args:
cache_key: Cache key to check
Returns:
True if cache is expired or doesn't exist
"""
if cache_key not in self.feed_cache:
return True
cache_entry = self.feed_cache[cache_key]
return time.time() - cache_entry["timestamp"] > self.cache_ttl
def get_stats(self) -> Dict[str, Any]:
"""Get collector statistics.
Returns:
Dictionary with collection statistics
"""
return {
**super().get_stats(),
"feeds_processed": self.stats["feeds_processed"],
"parsing_errors": self.stats["parsing_errors"],
"full_content_fetched": self.stats["full_content_fetched"]
}
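

# Minimal usage sketch (not part of the collector). It assumes the placeholder feed
# URL is reachable and that BaseCollector needs no constructor arguments beyond those
# shown; adjust both for your environment.
if __name__ == "__main__":
    async def _demo() -> None:
        collector = RSSFeedCollector(feed_urls=["https://example.com/feed.xml"])
        # collect() fetches, filters, deduplicates and sorts the raw feed items
        items = await collector.collect(limit=5)
        for raw_item in items:
            # parse() normalizes a raw item into the standard article dictionary
            article = await collector.parse(raw_item)
            print(article["published_at"], article["title"])
        print(collector.get_stats())
        # If BaseCollector exposes a close/cleanup coroutine, call it here to
        # release the underlying aiohttp session.

    asyncio.run(_demo())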