"""Main link checker implementation."""
import asyncio
import logging
from collections import deque
from urllib.parse import urlparse
import httpx
from .models import CheckResult, CheckSummary, LinkResult
from .link_extractor import LinkExtractor
from .link_validator import LinkValidator
from .robots_parser import RobotsParser
from .rate_limiter import RateLimiter
logger = logging.getLogger(__name__)
class LinkChecker:
"""Main link checker for scanning pages and domains."""
def __init__(
self,
timeout: float = 10.0,
rate_limit_delay: float = 1.0,
user_agent: str = "BrokenLinkChecker/1.0",
):
"""
Initialize the link checker.
Args:
timeout: Request timeout in seconds
rate_limit_delay: Default delay in seconds between requests to the same domain
user_agent: User agent string for requests
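Example (illustrative values, not required defaults):
    checker = LinkChecker(timeout=5.0, rate_limit_delay=2.0, user_agent="MyBot/0.1")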
"""
self.timeout = timeout
self.user_agent = user_agent
self.extractor = LinkExtractor()
self.validator = LinkValidator(timeout=timeout)
self.robots = RobotsParser(user_agent=user_agent)
self.rate_limiter = RateLimiter(default_delay=rate_limit_delay)
async def _fetch_page(self, url: str) -> str | None:
"""
Fetch a page's HTML content.
Args:
url: The URL to fetch
Returns:
The HTML content, or None if the fetch failed or was blocked by robots.txt
"""
try:
# Check robots.txt
if not await self.robots.can_fetch(url):
logger.info(f"Skipping {url} (blocked by robots.txt)")
return None
# Apply rate limiting
await self.rate_limiter.acquire(url)
# Fetch the page
async with httpx.AsyncClient(
timeout=self.timeout,
headers={"User-Agent": self.user_agent},
follow_redirects=True,
) as client:
response = await client.get(url)
response.raise_for_status()
return response.text
except Exception as e:
logger.error(f"Failed to fetch {url}: {e}")
return None
async def check_page(self, url: str) -> CheckResult:
"""
Check all links on a single page.
Args:
url: The URL of the page to check
Returns:
CheckResult containing all link validation results
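Example:
    Illustrative use from async code; the URL is a placeholder:
        checker = LinkChecker()
        result = await checker.check_page("https://example.com")
        print(result.summary.bad_links, "broken links found")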
"""
logger.info(f"Checking page: {url}")
# Fetch the page
html = await self._fetch_page(url)
if html is None:
return CheckResult(
results=[],
summary=CheckSummary(
total_links=0, good_links=0, bad_links=0, pages_scanned=0
),
)
# Extract all links
links = self.extractor.extract_links(html, url)
logger.info(f"Found {len(links)} links on {url}")
if not links:
return CheckResult(
results=[],
summary=CheckSummary(
total_links=0, good_links=0, bad_links=0, pages_scanned=1
),
)
# Get unique URLs to validate
unique_urls = list({link.url for link in links})
# Validate all links concurrently
validation_results = await self.validator.validate_links(unique_urls)
# Build results
results = []
for link in links:
status = validation_results.get(link.url, "Bad")
results.append(
LinkResult(
page_url=url,
link_reference=link.reference,
link_url=link.url,
status=status,
)
)
# Calculate summary
good_count = sum(1 for r in results if r.status == "Good")
bad_count = sum(1 for r in results if r.status == "Bad")
return CheckResult(
results=results,
summary=CheckSummary(
total_links=len(results),
good_links=good_count,
bad_links=bad_count,
pages_scanned=1,
),
)
async def check_domain(self, url: str, max_depth: int = -1) -> CheckResult:
"""
Crawl all pages in the domain reachable from the root URL and check their links.
Args:
url: The root URL of the domain to check
max_depth: Maximum crawl depth (-1 for unlimited)
Returns:
CheckResult containing all link validation results across the domain
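Example:
    Illustrative use from async code; the URL is a placeholder:
        checker = LinkChecker()
        result = await checker.check_domain("https://example.com", max_depth=2)
        print(result.summary.pages_scanned, "pages scanned,", result.summary.bad_links, "broken links")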
"""
logger.info(f"Checking domain: {url} (max_depth: {max_depth})")
# Get domain from URL
parsed = urlparse(url)
domain = f"{parsed.scheme}://{parsed.netloc}"
# Check if there's a crawl delay specified in robots.txt
crawl_delay = await self.robots.get_crawl_delay(url)
if crawl_delay:
logger.info(f"Setting crawl delay for {domain} to {crawl_delay}s")
self.rate_limiter.set_domain_delay(parsed.netloc, crawl_delay)
# Track visited pages and all results
visited_pages: set[str] = set()
all_results: list[LinkResult] = []
pages_to_visit: deque[tuple[str, int]] = deque([(url, 0)])  # (url, depth)
while pages_to_visit:
current_url, depth = pages_to_visit.popleft()
# Skip if already visited
if current_url in visited_pages:
continue
# Skip if max depth exceeded
if max_depth >= 0 and depth > max_depth:
continue
visited_pages.add(current_url)
logger.info(f"Crawling {current_url} (depth: {depth})")
# Fetch and parse the page
html = await self._fetch_page(current_url)
if html is None:
continue
# Extract links
links = self.extractor.extract_links(html, current_url)
# Separate internal links for further crawling
internal_links = []
links_to_validate = []
for link in links:
if self.extractor.is_internal_link(link.url, domain):
# Only follow hyperlinks for crawling (not images, scripts, etc.)
if link.element_type == "a":
internal_links.append(link.url)
links_to_validate.append(link)
# Add internal links to crawl queue
for internal_link in internal_links:
if internal_link not in visited_pages:
pages_to_visit.append((internal_link, depth + 1))
# Validate all links on this page
if links_to_validate:
unique_urls = list({link.url for link in links_to_validate})
validation_results = await self.validator.validate_links(unique_urls)
# Build results for this page
for link in links_to_validate:
status = validation_results.get(link.url, "Bad")
all_results.append(
LinkResult(
page_url=current_url,
link_reference=link.reference,
link_url=link.url,
status=status,
)
)
# Calculate summary
good_count = sum(1 for r in all_results if r.status == "Good")
bad_count = sum(1 for r in all_results if r.status == "Bad")
logger.info(
f"Domain check complete: {len(visited_pages)} pages, {len(all_results)} links"
)
return CheckResult(
results=all_results,
summary=CheckSummary(
total_links=len(all_results),
good_links=good_count,
bad_links=bad_count,
pages_scanned=len(visited_pages),
),
)
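# Minimal usage sketch (not part of the library API): drives check_page and
# check_domain with asyncio.run. The URL below is a placeholder, and because this
# module uses relative imports it must be run in its package context, e.g.
# "python -m <package>.<module>" (package and module names elided here).
async def _demo() -> None:
    checker = LinkChecker(timeout=10.0, rate_limit_delay=1.0)
    # Single-page check: validate every link found on one page.
    page_result = await checker.check_page("https://example.com")
    summary = page_result.summary
    print(f"{summary.good_links} good / {summary.bad_links} bad of {summary.total_links} links")
    # Domain crawl: follow internal <a> links up to depth 2 and list broken links.
    domain_result = await checker.check_domain("https://example.com", max_depth=2)
    for result in domain_result.results:
        if result.status == "Bad":
            print(f"Broken link on {result.page_url}: {result.link_url} ({result.link_reference})")


if __name__ == "__main__":
    asyncio.run(_demo())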