"""Naver News collector implementation."""
import html
import logging
import re
from datetime import datetime, timezone, date
from typing import Dict, Any, List, Optional
from urllib.parse import urlencode, urlparse, parse_qs
import aiohttp
from bs4 import BeautifulSoup
from src.collectors.base_collector import BaseCollector, CollectorError
class NaverAPIError(Exception):
"""Naver API specific error."""
pass
class NaverNewsCollector(BaseCollector):
"""Naver News API collector."""
def __init__(self, client_id: str, client_secret: str, **kwargs):
"""Initialize Naver News collector.
Args:
client_id: Naver API client ID
client_secret: Naver API client secret
**kwargs: Additional arguments passed to BaseCollector
"""
if not client_id or not client_secret:
raise ValueError("Naver API credentials (client_id, client_secret) are required")
super().__init__(source_name="naver", **kwargs)
self.client_id = client_id
self.client_secret = client_secret
self.base_url = "https://openapi.naver.com/v1/search/news.json"
self.max_results_per_request = 100 # Naver API limit
# Statistics tracking
self.stats.update({
"api_errors": 0,
"rate_limited_requests": 0,
"full_content_fetched": 0
})
self.logger = logging.getLogger("collector.naver")
def _build_api_url(self, keyword: str, start: int = 1, display: int = 100,
sort: str = "date") -> str:
"""Build Naver API URL with parameters.
Args:
keyword: Search keyword
start: Start index (1-based)
display: Number of results to display
sort: Sort order ('date' or 'sim')
Returns:
Complete API URL
"""
params = {
"query": keyword,
"start": start,
"display": min(display, self.max_results_per_request),
"sort": self._validate_sort_option(sort)
}
return f"{self.base_url}?{urlencode(params, encoding='utf-8')}"
def _build_headers(self) -> Dict[str, str]:
"""Build headers for Naver API request.
Returns:
Headers dictionary
"""
return {
"X-Naver-Client-Id": self.client_id,
"X-Naver-Client-Secret": self.client_secret,
"User-Agent": "NewsCollector/1.0 (Compatible Bot)"
}
def _validate_sort_option(self, sort: str) -> str:
"""Validate and normalize sort option.
Args:
sort: Sort option to validate
Returns:
Valid sort option
"""
valid_sorts = {"date", "sim"}
return sort if sort in valid_sorts else "date"
async def _make_api_request(self, keyword: str, start: int = 1,
display: int = 100, sort: str = "date") -> Dict[str, Any]:
"""Make request to Naver API.
Args:
keyword: Search keyword
start: Start index
display: Number of results
sort: Sort order
Returns:
API response data
Raises:
NaverAPIError: If API request fails
CollectorError: If network error occurs
"""
url = self._build_api_url(keyword, start, display, sort)
headers = self._build_headers()
try:
async with self.session.get(url, headers=headers) as response:
if response.status == 200:
return await response.json()
elif response.status == 429:
self.stats["rate_limited_requests"] += 1
raise NaverAPIError("Rate limit exceeded")
else:
error_text = await response.text()
self.stats["api_errors"] += 1
raise NaverAPIError(f"API request failed: {response.status} - {error_text}")
except aiohttp.ClientError as e:
self.logger.error(f"Network error during API request: {e}")
raise CollectorError(f"Network error: {e}") from e
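    # For reference, a successful Naver News search response is shaped roughly like the
    # sketch below (per Naver's documented limits, display is capped at 100 and start at 1000):
    #   {
    #       "lastBuildDate": "Tue, 23 Jul 2024 14:35:00 +0900",
    #       "total": 1234, "start": 1, "display": 100,
    #       "items": [{"title": "...", "originallink": "...", "link": "...",
    #                  "description": "...", "pubDate": "Tue, 23 Jul 2024 14:30:00 +0900"}]
    #   }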
    async def collect(self, keyword: Optional[str] = None, limit: int = 100,
                      sort: str = "date", fetch_full_content: bool = False,
                      start_date: Optional[date] = None, end_date: Optional[date] = None,
                      **kwargs) -> List[Dict[str, Any]]:
"""Collect news from Naver API.
Args:
keyword: Search keyword (required)
limit: Maximum number of articles to collect
sort: Sort order ('date' or 'sim')
fetch_full_content: Whether to fetch full article content
start_date: Start date for filtering
end_date: End date for filtering
**kwargs: Additional arguments
Returns:
List of raw news data
Raises:
CollectorError: If collection fails
"""
if not keyword:
raise CollectorError("Keyword is required for Naver news collection")
await self.initialize()
all_articles = []
start_index = 1
collected_count = 0
try:
while collected_count < limit:
# Calculate how many to request this time
remaining = limit - collected_count
display_count = min(remaining, self.max_results_per_request)
# Make API request
response = await self._make_api_request(
keyword=keyword,
start=start_index,
display=display_count,
sort=sort
)
items = response.get("items", [])
if not items:
break # No more results
# Filter by date if specified
filtered_items = self._filter_by_date(items, start_date, end_date)
# Fetch full content if requested
if fetch_full_content:
for item in filtered_items:
full_content = await self._fetch_full_content(item["link"])
if full_content:
item["full_content"] = full_content
self.stats["full_content_fetched"] += 1
all_articles.extend(filtered_items)
collected_count = len(all_articles)
# Check if we've reached the end of available results
if len(items) < display_count:
break
start_index += display_count
self.logger.info(f"Collected {len(all_articles)} articles for keyword '{keyword}'")
return all_articles
except Exception as e:
self.logger.error(f"Error during collection: {e}")
raise
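    # Illustrative call with date filtering and full-content fetching (dates are
    # applied client-side after each API page is retrieved):
    #
    #   from datetime import date
    #   items = await collector.collect(
    #       keyword="전기차", limit=200, sort="date",
    #       fetch_full_content=True,
    #       start_date=date(2024, 7, 1), end_date=date(2024, 7, 31),
    #   )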
def _filter_by_date(self, items: List[Dict[str, Any]],
start_date: Optional[date] = None,
end_date: Optional[date] = None) -> List[Dict[str, Any]]:
"""Filter articles by date range.
Args:
items: List of articles to filter
start_date: Start date filter
end_date: End date filter
Returns:
Filtered list of articles
"""
if not start_date and not end_date:
return items
filtered = []
for item in items:
try:
pub_date = self._parse_date(item.get("pubDate", ""))
if not pub_date:
continue
article_date = pub_date.date()
# Apply date filters
if start_date and article_date < start_date:
continue
if end_date and article_date > end_date:
continue
filtered.append(item)
except Exception:
# Include articles with unparseable dates
filtered.append(item)
return filtered
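    # Example: with start_date=date(2024, 7, 1) and end_date=date(2024, 7, 31), an item
    # whose pubDate parses to 2024-06-30 is dropped and one parsing to 2024-07-15 is kept.
    # Note that _parse_date falls back to the current UTC time for unparseable dates, so
    # such items are compared against "now" rather than reaching the except branch above.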
async def parse(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""Parse raw Naver API response into standardized format.
Args:
raw_data: Raw article data from Naver API
Returns:
Standardized news dictionary
"""
title = self._clean_html_tags(raw_data.get("title", ""))
description = self._clean_html_tags(raw_data.get("description", ""))
# Use full content if available, otherwise use description
content = raw_data.get("full_content", description)
parsed_data = {
"title": title,
"content": content,
"url": raw_data.get("link", ""),
"original_url": raw_data.get("originallink", ""),
"published_at": self._parse_date(raw_data.get("pubDate", "")),
"source": self.source_name,
"article_id": self._extract_article_id(raw_data.get("link", "")),
"raw_data": raw_data # Keep original data for debugging
}
return parsed_data
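    # Example (illustrative) of the dict returned by parse():
    #   {
    #       "title": "삼성전자, 2분기 실적 발표",
    #       "content": "...",            # full_content if fetched, else description
    #       "url": "https://news.naver.com/main/read.naver?oid=001&aid=0014823456",
    #       "original_url": "https://www.example-press.co.kr/news/123456",
    #       "published_at": <aware datetime, e.g. 2024-07-23 14:30:00+09:00>,
    #       "source": "naver",
    #       "article_id": "001_0014823456",
    #       "raw_data": {...},
    #   }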
def _clean_html_tags(self, text: str) -> str:
"""Remove HTML tags from text.
Args:
text: Text with HTML tags
Returns:
Clean text without HTML tags
"""
if not text:
return ""
        # Strip HTML tags (Naver wraps matched keywords in <b>...</b>) and unescape entities
        clean_text = re.sub(r'<[^>]+>', '', text)
        clean_text = html.unescape(clean_text)
        return clean_text.strip()
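    # Example: '<b>삼성전자</b> 실적 &quot;어닝 서프라이즈&quot;' becomes
    # '삼성전자 실적 "어닝 서프라이즈"' (tags stripped, entities unescaped).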
def _parse_date(self, date_string: str) -> datetime:
"""Parse date string from Naver API format.
Args:
date_string: Date string (e.g., "Tue, 23 Jul 2024 14:30:00 +0900")
Returns:
Parsed datetime object
"""
if not date_string:
return datetime.now(timezone.utc)
try:
# Naver uses RFC 2822 format
from email.utils import parsedate_to_datetime
parsed = parsedate_to_datetime(date_string)
            # Assume UTC when no timezone info is present (Naver normally includes +0900)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed
except Exception as e:
self.logger.warning(f"Failed to parse date '{date_string}': {e}")
return datetime.now(timezone.utc)
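    # Example: "Tue, 23 Jul 2024 14:30:00 +0900" parses to an aware datetime with a
    # +09:00 (KST) offset; aware values are returned as-is, not converted to UTC.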
def _extract_article_id(self, url: str) -> Optional[str]:
"""Extract article ID from Naver news URL.
Args:
url: Naver news URL
Returns:
Article ID in format "oid_aid" or None
"""
if not url or "news.naver.com" not in url:
return None
try:
parsed_url = urlparse(url)
query_params = parse_qs(parsed_url.query)
oid = query_params.get("oid", [None])[0]
aid = query_params.get("aid", [None])[0]
if oid and aid:
return f"{oid}_{aid}"
except Exception as e:
self.logger.debug(f"Failed to extract article ID from URL {url}: {e}")
return None
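    # Example: "https://news.naver.com/main/read.naver?oid=001&aid=0014823456" yields
    # "001_0014823456". Newer path-style URLs such as
    # "https://n.news.naver.com/article/001/0014823456" carry no oid/aid query parameters,
    # so this helper returns None for them.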
async def _fetch_full_content(self, article_url: str) -> str:
"""Fetch full content from article URL.
Args:
article_url: URL of the article
Returns:
Full article content or empty string if failed
"""
if not article_url:
return ""
try:
async with self.session.get(article_url) as response:
if response.status != 200:
return ""
html_content = await response.text()
soup = BeautifulSoup(html_content, 'html.parser')
# Try different content selectors for Naver news
content_selectors = [
".go_trans._article_content", # New Naver layout
"#articleBodyContents", # Old Naver layout
".news_end", # Another layout
"._article_body_contents" # Mobile layout
]
for selector in content_selectors:
content_element = soup.select_one(selector)
if content_element:
# Clean up the content
# Remove script and style elements
for element in content_element.find_all(["script", "style"]):
element.decompose()
text = content_element.get_text(separator='\n', strip=True)
if text and len(text) > 100: # Ensure we got substantial content
return text
# Fallback: try to get content from meta description or paragraph tags
meta_desc = soup.find("meta", property="og:description")
if meta_desc and meta_desc.get("content"):
return meta_desc["content"]
# Last resort: get first few paragraphs
paragraphs = soup.find_all("p")
if paragraphs:
content_parts = []
for p in paragraphs[:5]: # First 5 paragraphs
text = p.get_text(strip=True)
if text and len(text) > 20: # Skip very short paragraphs
content_parts.append(text)
if content_parts:
return '\n'.join(content_parts)
except Exception as e:
self.logger.debug(f"Failed to fetch full content from {article_url}: {e}")
return ""
def _build_search_filters(self, start_date: Optional[date] = None,
end_date: Optional[date] = None) -> Dict[str, Any]:
"""Build search filters for API request.
Args:
start_date: Start date for filtering
end_date: End date for filtering
Returns:
Filter dictionary
"""
filters = {}
if start_date:
filters["start_date"] = start_date.isoformat()
if end_date:
filters["end_date"] = end_date.isoformat()
return filters
def get_stats(self) -> Dict[str, Any]:
"""Get collector statistics.
Returns:
Dictionary with collection statistics
"""
return {
**super().get_stats(),
"api_errors": self.stats["api_errors"],
"rate_limited_requests": self.stats["rate_limited_requests"],
"full_content_fetched": self.stats["full_content_fetched"]
}