"""Base collector abstract class and common utilities."""
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
import asyncio
import hashlib
import logging
from datetime import datetime, timezone
from urllib.parse import urlparse, urlunparse
import aiohttp
from asyncio_throttle import Throttler
from src.utils.deduplicator import NewsDeduplicator
from src.utils.preprocessor import NewsPreprocessor
class CollectorError(Exception):
"""Base exception for collector-related errors."""
pass
class BaseCollector(ABC):
"""Abstract base class for news collectors."""
def __init__(self, source_name: str, max_requests_per_minute: int = 60):
"""Initialize base collector.
Args:
source_name: Name of the news source (e.g., 'naver', 'daum')
            max_requests_per_minute: Maximum number of requests allowed per 60-second window (rate limit)
"""
self.source_name = source_name
self.logger = logging.getLogger(f"collector.{source_name}")
# HTTP session for requests
self.session: Optional[aiohttp.ClientSession] = None
# Rate limiting
self.rate_limiter = Throttler(rate_limit=max_requests_per_minute, period=60)
# Utility classes
self.preprocessor = NewsPreprocessor()
self.deduplicator = NewsDeduplicator()
# Statistics
self.stats = {
"requests_made": 0,
"articles_collected": 0,
"duplicates_filtered": 0,
"errors_occurred": 0
}
async def __aenter__(self):
"""Async context manager entry."""
await self.initialize()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.close()
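    # Typical lifecycle (illustrative; assumes a concrete subclass, e.g. a
    # hypothetical NaverCollector implementing collect() and parse()):
    #
    #     async with NaverCollector("naver") as collector:
    #         items = await collector.collect_and_process(keyword="economy")
    #
    # The context manager guarantees the aiohttp session created in
    # initialize() is closed even if collection raises.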
async def initialize(self):
"""Initialize the collector (create session, etc.)."""
if self.session is None:
timeout = aiohttp.ClientTimeout(total=30, connect=10)
connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
self.session = aiohttp.ClientSession(
timeout=timeout,
connector=connector,
headers={
'User-Agent': 'NewsCollector/1.0 (Compatible Bot)'
}
)
async def close(self):
"""Clean up resources."""
if self.session:
await self.session.close()
self.session = None
@abstractmethod
async def collect(self, **kwargs) -> List[Dict[str, Any]]:
"""Collect raw news data from the source.
Args:
**kwargs: Collection parameters (keyword, limit, etc.)
Returns:
List of raw news data dictionaries
Raises:
CollectorError: If collection fails
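        Example (illustrative; raw fields are source-specific, and the
        keyword/limit parameters below are assumptions, not a fixed contract):
            raw_items = await collector.collect(keyword="economy", limit=20)
            # e.g. [{"headline": "...", "link": "...", "pubDate": "..."}, ...]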
"""
pass
@abstractmethod
async def parse(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""Parse raw news data into standardized format.
Args:
raw_data: Raw news data from collect()
Returns:
Standardized news dictionary with keys:
- title: str
- content: str
- url: str
- published_at: datetime
- source: str
- hash: str
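        Example (illustrative output; a concrete subclass maps its own raw
        fields onto these keys):
            {
                "title": "Sample headline",
                "content": "Article body ...",
                "url": "https://news.example.com/article/1",
                "published_at": datetime(2024, 1, 1, tzinfo=timezone.utc),
                "source": "naver",
                "hash": "<sha-256 hex digest>",
            }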
"""
pass
async def validate(self, parsed_data: Dict[str, Any]) -> bool:
"""Validate parsed news data.
Args:
parsed_data: Parsed news data from parse()
Returns:
True if data is valid, False otherwise
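        Example (illustrative):
            await collector.validate({"title": "Headline"})  # False, 'url' missing
            await collector.validate(
                {"title": "Headline", "url": "https://example.com"})  # True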
"""
required_fields = ['title', 'url']
for field in required_fields:
if not parsed_data.get(field):
self.logger.warning(f"Missing required field: {field}")
return False
# Validate URL format
try:
parsed_url = urlparse(parsed_data['url'])
if not parsed_url.netloc:
self.logger.warning(f"Invalid URL format: {parsed_data['url']}")
return False
except Exception:
self.logger.warning(f"URL parsing failed: {parsed_data['url']}")
return False
return True
def generate_hash(self, data: Dict[str, Any]) -> str:
"""Generate unique hash for news item.
Args:
data: News data dictionary
Returns:
SHA-256 hash as hex string
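        Example (illustrative; SHA-256 is deterministic, so identical
        title/url/content prefixes always produce the same digest):
            h1 = collector.generate_hash({"title": "A", "url": "https://x.test/a", "content": "body"})
            h2 = collector.generate_hash({"title": "A", "url": "https://x.test/a", "content": "body"})
            assert h1 == h2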
"""
# Create hash from title + url + key content parts
hash_components = [
            data.get('title') or '',
            data.get('url') or '',
            (data.get('content') or '')[:500]  # First 500 chars; tolerate a None content value
]
hash_string = '|'.join(hash_components).encode('utf-8')
return hashlib.sha256(hash_string).hexdigest()
def normalize_url(self, url: str) -> str:
"""Normalize URL for consistency.
Args:
url: Original URL
Returns:
Normalized URL
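        Example (derived from the rules below: forced HTTPS, lowercasing,
        dot-segment resolution, and tracking-parameter removal):
            normalize_url("HTTP://Example.COM/News/../Article?utm_source=x&id=1")
            # -> "https://example.com/article?id=1"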
"""
url = url.strip()
# Add scheme if missing (case-insensitive check)
if not url.lower().startswith(('http://', 'https://')):
url = 'https://' + url
# Parse URL
parsed = urlparse(url)
# Force HTTPS if HTTP
scheme = 'https' if parsed.scheme.lower() == 'http' else parsed.scheme.lower()
# Lowercase hostname
netloc = parsed.netloc.lower()
# Clean path (remove ../, etc.) and normalize case
path = parsed.path.lower() if parsed.path else '/'
if path and path != '/':
# Split and process path components to handle .. and .
path_parts = []
for part in path.split('/'):
if part == '' or part == '.':
continue
elif part == '..':
# Go up one level by removing last component
if path_parts:
path_parts.pop()
else:
path_parts.append(part)
path = '/' + '/'.join(path_parts) if path_parts else '/'
# Remove common tracking parameters
query = parsed.query
if query:
tracking_params = [
'utm_source', 'utm_medium', 'utm_campaign', 'utm_term', 'utm_content',
'fbclid', 'gclid', '_ga', 'mc_cid', 'mc_eid'
]
            query_parts = []
            for part in query.split('&'):
                # Keep the parameter unless its key matches a tracking parameter;
                # parts without '=' are kept instead of being silently dropped.
                key = part.split('=', 1)[0]
                if part and key not in tracking_params:
                    query_parts.append(part)
            query = '&'.join(query_parts)
# Reconstruct URL
return urlunparse((scheme, netloc, path, parsed.params, query, ''))
async def collect_with_retry(self, max_retries: int = 3, **kwargs) -> List[Dict[str, Any]]:
"""Collect with retry mechanism.
Args:
max_retries: Maximum number of retry attempts
**kwargs: Parameters to pass to collect()
Returns:
List of collected news data
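        Example (illustrative; with max_retries=3, transient aiohttp.ClientError
        or asyncio.TimeoutError failures are retried after 1s, 2s and 4s before
        CollectorError is raised):
            items = await collector.collect_with_retry(keyword="economy")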
"""
last_exception = None
for attempt in range(max_retries + 1):
try:
return await self.collect(**kwargs)
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
last_exception = e
if attempt < max_retries:
wait_time = 2 ** attempt # Exponential backoff
self.logger.warning(
f"Attempt {attempt + 1} failed, retrying in {wait_time}s: {e}"
)
await asyncio.sleep(wait_time)
else:
self.logger.error(f"All {max_retries + 1} attempts failed")
raise CollectorError(f"Collection failed after {max_retries + 1} attempts") from last_exception
async def collect_and_process(self, **kwargs) -> List[Dict[str, Any]]:
"""Full collection and processing pipeline.
Args:
**kwargs: Collection parameters
Returns:
List of processed and validated news items
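        Example (illustrative; assumes a hypothetical NaverCollector subclass,
        and the keyword parameter is whatever its collect() accepts):
            async with NaverCollector("naver") as collector:
                articles = await collector.collect_and_process(keyword="economy")
                print(len(articles), collector.get_stats())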
"""
processed_items = []
try:
# Ensure session is initialized
await self.initialize()
# Apply rate limiting
async with self.rate_limiter:
# Collect raw data
raw_items = await self.collect_with_retry(**kwargs)
self.stats["requests_made"] += 1
# Process each item
for raw_item in raw_items:
try:
# Parse the item
parsed_item = await self.parse(raw_item)
# Validate the parsed item
if not await self.validate(parsed_item):
continue
# Generate hash
parsed_item['hash'] = self.generate_hash(parsed_item)
# Check for duplicates
if await self.deduplicator.is_duplicate(parsed_item):
self.stats["duplicates_filtered"] += 1
self.logger.debug(f"Filtered duplicate: {parsed_item.get('title', '')[:50]}")
continue
# Preprocess content
if parsed_item.get('content'):
parsed_item['content'] = self.preprocessor.process(parsed_item['content'])
# Normalize URL
parsed_item['url'] = self.normalize_url(parsed_item['url'])
# Add source and timestamp
parsed_item['source'] = self.source_name
if 'collected_at' not in parsed_item:
parsed_item['collected_at'] = datetime.now(timezone.utc)
processed_items.append(parsed_item)
self.stats["articles_collected"] += 1
except Exception as e:
self.stats["errors_occurred"] += 1
self.logger.error(f"Error processing item: {e}")
continue
except Exception as e:
self.stats["errors_occurred"] += 1
self.logger.error(f"Collection pipeline failed: {e}")
raise CollectorError(f"Collection pipeline failed: {e}") from e
self.logger.info(
f"Collection completed: {len(processed_items)} items processed, "
f"{self.stats['duplicates_filtered']} duplicates filtered, "
f"{self.stats['errors_occurred']} errors"
)
return processed_items
def get_stats(self) -> Dict[str, Any]:
"""Get collector statistics.
Returns:
Dictionary with collection statistics
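        Example (illustrative values):
            {"requests_made": 2, "articles_collected": 35,
             "duplicates_filtered": 4, "errors_occurred": 0}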
"""
return self.stats.copy()
def reset_stats(self):
"""Reset collector statistics."""
self.stats = {
"requests_made": 0,
"articles_collected": 0,
"duplicates_filtered": 0,
"errors_occurred": 0
}
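

# ---------------------------------------------------------------------------
# Illustrative sketch of a concrete collector (not part of the production API).
# The endpoint URL, response shape, and field names below are assumptions made
# purely for demonstration; a real subclass would target its source's actual
# API and parse the source's real fields.
# ---------------------------------------------------------------------------
class ExampleJSONCollector(BaseCollector):
    """Minimal example subclass showing how collect() and parse() fit together."""

    FEED_URL = "https://news.example.com/api/latest"  # hypothetical endpoint

    async def collect(self, keyword: str = "", limit: int = 20, **kwargs) -> List[Dict[str, Any]]:
        await self.initialize()
        # aiohttp.ClientError / asyncio.TimeoutError are left to propagate so
        # collect_with_retry() can apply its backoff-and-retry logic.
        async with self.session.get(
            self.FEED_URL, params={"q": keyword, "count": limit}
        ) as response:
            response.raise_for_status()
            payload = await response.json()
        # Assumed response shape: {"items": [ {...}, ... ]}
        return payload.get("items", [])

    async def parse(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
        # Map the assumed raw fields onto the standardized keys documented in
        # BaseCollector.parse(); a real subclass would also parse the source's
        # published timestamp instead of using the current time.
        return {
            "title": raw_data.get("headline", ""),
            "content": raw_data.get("body", ""),
            "url": raw_data.get("link", ""),
            "published_at": datetime.now(timezone.utc),
            "source": self.source_name,
        }


if __name__ == "__main__":  # pragma: no cover - demonstration only
    async def _demo() -> None:
        async with ExampleJSONCollector("example") as collector:
            articles = await collector.collect_and_process(keyword="economy", limit=5)
            print(f"Collected {len(articles)} articles:", collector.get_stats())

    asyncio.run(_demo())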