"""Daum News collector implementation."""
import re
import logging
from datetime import datetime, timezone, date
from typing import Dict, Any, List, Optional
from urllib.parse import urlencode, urlparse
import aiohttp
from bs4 import BeautifulSoup
from dateutil.parser import parse as parse_datetime
from src.collectors.base_collector import BaseCollector, CollectorError
class DaumAPIError(Exception):
"""Daum API specific error."""
pass
class DaumNewsCollector(BaseCollector):
"""Daum Search API collector."""
def __init__(self, api_key: str, **kwargs):
"""Initialize Daum News collector.
Args:
api_key: Kakao/Daum API key
**kwargs: Additional arguments passed to BaseCollector
"""
if not api_key:
raise ValueError("Daum API key is required")
super().__init__(source_name="daum", **kwargs)
self.api_key = api_key
self.base_url = "https://dapi.kakao.com/v2/search/web"
self.max_results_per_request = 50 # Daum API limit
self.requests_per_minute = 100
self.concurrent_requests = 10
# Statistics tracking
self.stats.update({
"api_errors": 0,
"rate_limited_requests": 0,
"full_content_fetched": 0
})
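        # (Extends the base statistics dict; assumes BaseCollector initializes `self.stats`.)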
self.logger = logging.getLogger("collector.daum")
def _build_api_url(self, keyword: str, page: int = 1, size: int = 50,
sort: str = "recency") -> str:
"""Build Daum API URL with parameters.
Args:
keyword: Search keyword
page: Page number (1-based)
            size: Number of results per page (clamped to the API maximum of 50)
sort: Sort order ('recency' or 'accuracy')
Returns:
Complete API URL
"""
params = {
"query": keyword,
"page": page,
"size": min(size, self.max_results_per_request),
"sort": self._validate_sort_option(sort)
}
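        # Illustrative example (hypothetical query "금리", page=1, size=10, sort="recency"):
        #   https://dapi.kakao.com/v2/search/web?query=%EA%B8%88%EB%A6%AC&page=1&size=10&sort=recency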
return f"{self.base_url}?{urlencode(params, encoding='utf-8')}"
def _build_headers(self) -> Dict[str, str]:
"""Build headers for Daum API request.
Returns:
Headers dictionary
"""
return {
"Authorization": f"KakaoAK {self.api_key}",
"User-Agent": "NewsCollector/1.0 (Compatible Bot)"
}
def _validate_sort_option(self, sort: str) -> str:
"""Validate and normalize sort option.
Args:
sort: Sort option to validate
Returns:
Valid sort option
"""
valid_sorts = {"recency", "accuracy"}
return sort if sort in valid_sorts else "recency"
async def _make_api_request(self, keyword: str, page: int = 1,
size: int = 50, sort: str = "recency") -> Dict[str, Any]:
"""Make request to Daum API.
Args:
keyword: Search keyword
page: Page number
size: Number of results
sort: Sort order
Returns:
API response data
Raises:
DaumAPIError: If API request fails
CollectorError: If network error occurs
"""
url = self._build_api_url(keyword, page, size, sort)
headers = self._build_headers()
try:
async with self.session.get(url, headers=headers) as response:
if response.status == 200:
return await response.json()
elif response.status == 429:
self.stats["rate_limited_requests"] += 1
raise DaumAPIError("Rate limit exceeded")
else:
error_text = await response.text()
self.stats["api_errors"] += 1
raise DaumAPIError(f"API request failed: {response.status} - {error_text}")
except aiohttp.ClientError as e:
self.logger.error(f"Network error during API request: {e}")
raise CollectorError(f"Network error: {e}") from e
    async def collect(self, keyword: Optional[str] = None, limit: int = 50,
sort: str = "recency", fetch_full_content: bool = False,
start_date: Optional[date] = None, end_date: Optional[date] = None,
category: Optional[str] = None, **kwargs) -> List[Dict[str, Any]]:
"""Collect news from Daum API.
Args:
keyword: Search keyword (required)
limit: Maximum number of articles to collect
sort: Sort order ('recency' or 'accuracy')
fetch_full_content: Whether to fetch full article content
start_date: Start date for filtering
end_date: End date for filtering
category: Category filter
**kwargs: Additional arguments
Returns:
List of raw news data
Raises:
            CollectorError: If the keyword is missing or a network error occurs
            DaumAPIError: If the Daum API returns an error response
"""
if not keyword:
raise CollectorError("Keyword is required for Daum news collection")
await self.initialize()
all_articles = []
page = 1
collected_count = 0
try:
while collected_count < limit:
# Calculate how many to request this time
remaining = limit - collected_count
size = min(remaining, self.max_results_per_request)
# Make API request
response = await self._make_api_request(
keyword=keyword,
page=page,
size=size,
sort=sort
)
documents = response.get("documents", [])
if not documents:
break # No more results
# Filter by date if specified
filtered_documents = self._filter_by_date(documents, start_date, end_date)
# Fetch full content if requested
if fetch_full_content:
for doc in filtered_documents:
                        full_content = await self._fetch_full_content(doc.get("url", ""))
if full_content:
doc["full_content"] = full_content
self.stats["full_content_fetched"] += 1
all_articles.extend(filtered_documents)
collected_count = len(all_articles)
# Check if we've reached the end of available results
meta = response.get("meta", {})
if meta.get("is_end", True) or len(documents) < size:
break
page += 1
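                # Note: the Kakao search API also caps the `page` parameter (documented
                # as 1-50 at the time of writing), so very large limits may end with an
                # API error rather than additional results.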
self.logger.info(f"Collected {len(all_articles)} articles for keyword '{keyword}'")
return all_articles
except Exception as e:
self.logger.error(f"Error during collection: {e}")
raise
def _filter_by_date(self, documents: List[Dict[str, Any]],
start_date: Optional[date] = None,
end_date: Optional[date] = None) -> List[Dict[str, Any]]:
"""Filter articles by date range.
Args:
documents: List of documents to filter
start_date: Start date filter
end_date: End date filter
Returns:
Filtered list of documents
"""
if not start_date and not end_date:
return documents
filtered = []
for doc in documents:
try:
pub_date = self._parse_date(doc.get("datetime", ""))
if not pub_date:
continue
article_date = pub_date.date()
# Apply date filters
if start_date and article_date < start_date:
continue
if end_date and article_date > end_date:
continue
filtered.append(doc)
except Exception:
# Include documents with unparseable dates
filtered.append(doc)
return filtered
async def parse(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""Parse raw Daum API response into standardized format.
Args:
raw_data: Raw document data from Daum API
Returns:
Standardized news dictionary
"""
title = self._clean_html_tags(raw_data.get("title", ""))
contents = self._clean_html_tags(raw_data.get("contents", ""))
# Use full content if available, otherwise use contents
content = raw_data.get("full_content", contents)
parsed_data = {
"title": title,
"content": content,
"url": raw_data.get("url", ""),
"published_at": self._parse_date(raw_data.get("datetime", "")),
"source": self.source_name,
"blogname": raw_data.get("blogname", ""),
"thumbnail": raw_data.get("thumbnail", ""),
"article_id": self._extract_article_id(raw_data.get("url", "")),
"raw_data": raw_data # Keep original data for debugging
}
return parsed_data
def _clean_html_tags(self, text: str) -> str:
"""Remove HTML tags from text.
Args:
text: Text with HTML tags
Returns:
Clean text without HTML tags
"""
if not text:
return ""
# Remove HTML tags
clean_text = re.sub(r'<[^>]+>', '', text)
return clean_text.strip()
def _parse_date(self, date_string: str) -> datetime:
"""Parse date string from Daum API format.
Args:
date_string: Date string (e.g., "2024-07-23T14:30:00.000+09:00")
Returns:
Parsed datetime object
"""
if not date_string:
return datetime.now(timezone.utc)
try:
            # Daum returns ISO 8601 timestamps; dateutil handles the "+09:00" offset
            parsed = parse_datetime(date_string)
            # Assume UTC when the timestamp carries no timezone information
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed
except Exception as e:
self.logger.warning(f"Failed to parse date '{date_string}': {e}")
return datetime.now(timezone.utc)
def _extract_article_id(self, url: str) -> Optional[str]:
"""Extract article ID from Daum news URL.
Args:
url: Daum news URL
Returns:
Article ID or None
"""
if not url or "v.daum.net" not in url:
return None
try:
# Extract ID from URL like https://v.daum.net/v/20240723143000123
match = re.search(r'/v/(\d+)', url)
if match:
return match.group(1)
except Exception as e:
self.logger.debug(f"Failed to extract article ID from URL {url}: {e}")
return None
async def _fetch_full_content(self, article_url: str) -> str:
"""Fetch full content from article URL.
Args:
article_url: URL of the article
Returns:
Full article content or empty string if failed
"""
if not article_url:
return ""
try:
async with self.session.get(article_url) as response:
if response.status != 200:
return ""
html_content = await response.text()
soup = BeautifulSoup(html_content, 'html.parser')
# Try different content selectors for Daum news
content_selectors = [
".news_view", # Common Daum layout
".article_view", # Alternative layout
".news_article", # Another layout
"#mArticle", # Mobile layout
".view_txt" # Old layout
]
for selector in content_selectors:
content_element = soup.select_one(selector)
if content_element:
# Clean up the content
# Remove script and style elements
for element in content_element.find_all(["script", "style"]):
element.decompose()
text = content_element.get_text(separator='\n', strip=True)
if text and len(text) > 100: # Ensure we got substantial content
return text
# Fallback: try to get content from meta description or paragraph tags
meta_desc = soup.find("meta", property="og:description")
if meta_desc and meta_desc.get("content"):
return meta_desc["content"]
# Last resort: get first few paragraphs
paragraphs = soup.find_all("p")
if paragraphs:
content_parts = []
for p in paragraphs[:5]: # First 5 paragraphs
text = p.get_text(strip=True)
if text and len(text) > 20: # Skip very short paragraphs
content_parts.append(text)
if content_parts:
return '\n'.join(content_parts)
except Exception as e:
self.logger.debug(f"Failed to fetch full content from {article_url}: {e}")
return ""
def _build_search_filters(self, start_date: Optional[date] = None,
end_date: Optional[date] = None) -> Dict[str, Any]:
"""Build search filters for API request.
Args:
start_date: Start date for filtering
end_date: End date for filtering
Returns:
Filter dictionary
"""
filters = {}
if start_date:
filters["start_date"] = start_date.isoformat()
if end_date:
filters["end_date"] = end_date.isoformat()
return filters
def _process_thumbnail_url(self, thumbnail_url: str) -> str:
"""Process thumbnail URL for standardization.
Args:
thumbnail_url: Original thumbnail URL
Returns:
Processed thumbnail URL
"""
if not thumbnail_url:
return ""
        # Thumbnail URLs, including Daum CDN URLs, are already usable; return unchanged
        return thumbnail_url
def _extract_source_info(self, raw_data: Dict[str, Any]) -> Dict[str, str]:
"""Extract source information from raw data.
Args:
raw_data: Raw document data
Returns:
Source information dictionary
"""
blogname = raw_data.get("blogname", "")
return {
"blogname": blogname,
"source_type": "media" if blogname else "unknown"
}
def _is_valid_daum_url(self, url: str) -> bool:
"""Check if URL is a valid Daum domain URL.
Args:
url: URL to validate
Returns:
True if valid Daum URL, False otherwise
"""
if not url:
return False
        try:
            netloc = urlparse(url).netloc.lower()
            daum_domains = ("v.daum.net", "news.v.daum.net")
            # Match the host exactly or as a subdomain rather than by substring,
            # so unrelated hosts that merely contain a Daum domain are rejected
            return any(netloc == domain or netloc.endswith("." + domain)
                       for domain in daum_domains)
except Exception:
return False
def get_stats(self) -> Dict[str, Any]:
"""Get collector statistics.
Returns:
Dictionary with collection statistics
"""
return {
**super().get_stats(),
"api_errors": self.stats["api_errors"],
"rate_limited_requests": self.stats["rate_limited_requests"],
"full_content_fetched": self.stats["full_content_fetched"]
}
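# Minimal usage sketch (illustrative only; assumes BaseCollector provides `initialize()`
# and an aiohttp session as `self.session`, and that a valid Kakao REST API key is set):
#
#     import asyncio
#
#     async def main():
#         collector = DaumNewsCollector(api_key="YOUR_KAKAO_REST_API_KEY")
#         raw_docs = await collector.collect(keyword="example keyword", limit=20)
#         articles = [await collector.parse(doc) for doc in raw_docs]
#         print(f"collected {len(articles)} articles")
#
#     asyncio.run(main())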