Skip to main content
Glama

DevDocs MCP Server

by cyberagiinc
crawler.py44.9 kB
from typing import List, Optional, Set, Dict, Any # Added Any import logging import sys import asyncio import os import requests import httpx # Added httpx import import json from datetime import datetime from pydantic import BaseModel from urllib.parse import urljoin, urlparse, urlsplit import re # Import status management functions from .status_manager import update_overall_status, update_url_status, is_cancellation_requested, get_job_status # Added get_job_status from .utils import normalize_url # Import from utils # Configure logging logger = logging.getLogger(__name__) # Increase recursion limit for complex pages sys.setrecursionlimit(10000) # Get Crawl4AI API URL and token from environment variables # Note: In Docker, we should use the container name, not localhost CRAWL4AI_URL = os.environ.get("CRAWL4AI_URL", "http://crawl4ai:11235") logger.info(f"Using Crawl4AI URL: {CRAWL4AI_URL}") # Hard-code a default API key for testing # This is a temporary solution for development/testing only DEFAULT_API_KEY = "devdocs-demo-key" CRAWL4AI_API_TOKEN = os.environ.get("CRAWL4AI_API_TOKEN", DEFAULT_API_KEY) # Set up headers for API requests headers = {"Authorization": f"Bearer {CRAWL4AI_API_TOKEN}"} logger.info(f"API token is {'set' if CRAWL4AI_API_TOKEN else 'not set'}") class InternalLink(BaseModel): href: str text: str status: str = 'pending' # Default status for internal links class DiscoveredPage(BaseModel): url: str title: Optional[str] = None status: str = "pending" # Default status for parent pages internalLinks: Optional[List[InternalLink]] = None class CrawlStats(BaseModel): subdomains_parsed: int = 0 pages_crawled: int = 0 data_extracted: str = "0 KB" errors_encountered: int = 0 class CrawlResult(BaseModel): markdown: str stats: CrawlStats # normalize_url function moved to utils.py def url_to_filename(url: str) -> str: """ Convert a URL to a valid filename. This function extracts the domain and path from a URL and converts it to a valid filename by replacing invalid characters with underscores. Args: url: The URL to convert Returns: A valid filename based on the URL """ # Parse the URL parsed = urlsplit(url) # Extract domain and path domain = parsed.netloc path = parsed.path # Remove www. prefix if present if domain.startswith('www.'): domain = domain[4:] # Replace dots and slashes with underscores domain = domain.replace('.', '_') # Clean up the path if path == '/' or not path: # If path is empty or just '/', use only the domain filename = domain else: # Remove leading and trailing slashes path = path.strip('/') # Replace slashes and other invalid characters with underscores path = re.sub(r'[\\/:*?"<>|]', '_', path) # Combine domain and path filename = f"{domain}_{path}" # Ensure the filename is not too long (max 255 characters) if len(filename) > 255: # If too long, truncate and add a hash of the original URL from hashlib import md5 url_hash = md5(url.encode()).hexdigest()[:8] filename = filename[:240] + '_' + url_hash # Convert to lowercase for consistency filename = filename.lower() logger.info(f"Converted URL '{url}' to filename '{filename}'") return filename # In-memory storage removed # MemoryFileObject class removed # is_individual_file function removed # is_consolidated_file function removed # redirect_file_writes function removed # _task_context removed # set_task_context function removed # Monkey patch for open function removed # Ensure storage/markdown directory exists os.makedirs("storage/markdown", exist_ok=True) # File redirection logging removed async def discover_pages( url: str, max_depth: int = 3, current_depth: int = 1, seen_urls: Set[str] = None, parent_urls: Set[str] = None, all_internal_links: Set[str] = None, root_url: str = None, root_task_id: str = None, # Keep for file naming consistency if needed, though job_id is primary now job_id: Optional[str] = None # Add job_id parameter ) -> List[DiscoveredPage]: """Recursively discovers internal links starting from a given URL.""" if seen_urls is None: seen_urls = set() if parent_urls is None: parent_urls = set() if all_internal_links is None: all_internal_links = set() # --- Cancellation Check --- if job_id and is_cancellation_requested(job_id): logger.warning(f"Cancellation requested for job {job_id} during discovery of {url}. Aborting discovery branch.") # No status update here, let the caller handle final status if needed, or rely on initial request handler return [] # Stop processing this branch # --- End Cancellation Check --- # If this is the root URL (first call), set root_url, generate root_task_id, and update status if root_url is None: root_url = url # Use a human-readable filename based on the URL root_task_id = url_to_filename(root_url) logger.info(f"Starting discovery for root URL: {root_url} with filename: {root_task_id} (Job ID: {job_id})") if job_id: update_overall_status(job_id, 'discovering') # No longer replacing docs.crawl4ai.com URLs logger.info(f"Processing URL: {url} without replacement") url = normalize_url(url) discovered_pages = [] logger.info(f"Starting discovery for URL: {url} at depth {current_depth}/{max_depth}") if url in seen_urls or current_depth > max_depth: logger.info(f"Skipping URL: {url} (seen: {url in seen_urls}, depth: {current_depth})") return discovered_pages seen_urls.add(url) parent_urls.add(url) try: # Update status for the current URL being discovered (no status code needed here) if job_id: update_url_status(job_id, normalize_url(url), 'discovering', statusCode=None) # --- Start: Re-added crawl4ai call for link discovery ONLY --- logger.info(f"Requesting link discovery for {url} from Crawl4AI (Job ID: {job_id})") simple_request = {"urls": url} task_id = None result = None try: # Submit job to Crawl4AI response = requests.post( f"{CRAWL4AI_URL}/crawl", headers=headers, json=simple_request, timeout=30 ) # Log the response status and headers logger.info(f"Response status code: {response.status_code}") logger.info(f"Response headers: {response.headers}") # Check if the response is valid JSON try: response_json = response.json() logger.info(f"Response JSON: {response_json}") except Exception as json_error: logger.error(f"Response is not valid JSON: {str(json_error)}") logger.error(f"Response text: {response.text[:500]}") # Return a page with error status return [DiscoveredPage( url=url, title=f"Invalid Response: Crawl4AI service returned invalid JSON", status="error", internalLinks=[] )] response.raise_for_status() task_id = response.json()["task_id"] logger.info(f"Submitted link discovery task for {url}, task ID: {task_id} (Job ID: {job_id})") # Poll for result # --- Configurable Polling Timeout --- poll_interval = 1 # Define poll interval (seconds) default_timeout = 300 discovery_timeout_str = os.environ.get("DISCOVERY_POLLING_TIMEOUT_SECONDS", str(default_timeout)) try: discovery_timeout = int(discovery_timeout_str) if discovery_timeout <= 0: logger.warning(f"Invalid DISCOVERY_POLLING_TIMEOUT_SECONDS value '{discovery_timeout_str}', using default {default_timeout}s.") discovery_timeout = default_timeout except ValueError: logger.warning(f"Non-integer DISCOVERY_POLLING_TIMEOUT_SECONDS value '{discovery_timeout_str}', using default {default_timeout}s.") discovery_timeout = default_timeout max_attempts = int(discovery_timeout / poll_interval) # Calculate max_attempts based on timeout and interval logger.info(f"Using discovery polling timeout: {discovery_timeout} seconds ({max_attempts} attempts at {poll_interval}s interval)") # --- End Configurable Polling Timeout --- logger.info(f"[Polling Debug] Starting polling for task {task_id} (URL: {url}, Job ID: {job_id})") # DEBUG LOG START for attempt in range(max_attempts): # --- Cancellation Check (Polling Loop) --- if job_id and is_cancellation_requested(job_id): logger.warning(f"Cancellation requested for job {job_id} while polling discovery task {task_id} for {url}. Aborting discovery.") # Update overall status to cancelled as this is a significant interruption point update_overall_status(job_id, 'cancelled') return [DiscoveredPage(url=url, title="Cancelled During Discovery", status="cancelled")] # Indicate cancellation # --- End Cancellation Check --- await asyncio.sleep(poll_interval) # Use await for asyncio.sleep logger.info(f"[Polling Debug] Polling task {task_id} (URL: {url}, Attempt: {attempt+1}/{max_attempts}, Job ID: {job_id})") # DEBUG LOG ATTEMPT status_response = requests.get( f"{CRAWL4AI_URL}/task/{task_id}", headers=headers, timeout=10 ) status_response.raise_for_status() status = status_response.json() logger.info(f"[Polling Debug] Task {task_id} status received: {status.get('status', 'N/A')} (Job ID: {job_id})") # DEBUG LOG STATUS RECEIVED if status["status"] == "completed": # Check if result field exists if "result" not in status: logger.error(f"Task completed but no 'result' field in response: {status}") return [DiscoveredPage( url=url, title=f"Invalid Response: No result in completed task response", status="error", internalLinks=[] )] result = status["result"] logger.info(f"Link discovery task {task_id} for {url} completed successfully (Job ID: {job_id})") # Mark URL as pending crawl, ready for the next phase (no status code needed here) if job_id: update_url_status(job_id, normalize_url(url), 'pending_crawl', statusCode=None) logger.info(f"[Polling Debug] Exiting poll loop for task {task_id} - COMPLETED (Job ID: {job_id})") # DEBUG LOG EXIT break # Exit polling loop on completion elif status["status"] == "failed": error_msg = status.get('error', 'Unknown error') logger.error(f"Link discovery task {task_id} for {url} failed: {error_msg} (Job ID: {job_id})") if job_id: update_url_status(job_id, normalize_url(url), 'discovery_error', statusCode=None, error_message=error_msg) # Pass error message result = None # Ensure result is None on failure logger.info(f"[Polling Debug] Exiting poll loop for task {task_id} - FAILED (Job ID: {job_id})") # DEBUG LOG EXIT break # Exit polling loop on failure # Continue polling if status is 'running' or other non-terminal state if result is None and status["status"] != "failed": # Check if loop finished without success or explicit failure timeout_error_msg = f"Timeout waiting for link discovery result (Task ID: {task_id})" logger.warning(f"{timeout_error_msg} for {url} (Job ID: {job_id})") if job_id: update_url_status(job_id, normalize_url(url), 'discovery_error', statusCode=None, error_message=timeout_error_msg) # Pass timeout message logger.warning(f"[Polling Debug] Exiting poll loop for task {task_id} - TIMEOUT (Job ID: {job_id})") # DEBUG LOG EXIT TIMEOUT except requests.exceptions.RequestException as e: request_error_msg = f"Request failed during link discovery: {str(e)}" logger.error(f"{request_error_msg} for {url}", exc_info=True) if job_id: update_url_status(job_id, normalize_url(url), 'discovery_error', statusCode=None, error_message=request_error_msg) # Pass request error message result = None # Ensure result is None on request error except Exception as e: unexpected_error_msg = f"Unexpected error during link discovery: {str(e)}" logger.error(f"{unexpected_error_msg} for {url}", exc_info=True) if job_id: update_url_status(job_id, normalize_url(url), 'discovery_error', statusCode=None, error_message=unexpected_error_msg) # Pass unexpected error message result = None # Ensure result is None on other errors logger.error(f"[Polling Debug] Exiting poll loop for task {task_id} due to unexpected error during discovery request/polling. (Job ID: {job_id})") # DEBUG LOG EXIT ERROR # --- End: Re-added crawl4ai call for link discovery ONLY --- # If discovery failed (result is None), return minimal error page for this branch if result is None: logger.warning(f"Skipping further processing for {url} due to discovery failure/timeout (Job ID: {job_id})") # Return minimal info indicating error for this branch return [DiscoveredPage(url=url, title="Error During Discovery", status="error")] # Extract title and links from result title = "Untitled Page" if "title" in result: title = result["title"] elif "markdown" in result and result["markdown"]: content_lines = result["markdown"].split('\n') if content_lines: potential_title = content_lines[0].strip('# ').strip() if potential_title: title = potential_title internal_links = [] if "links" in result and isinstance(result["links"], dict): seen_internal_links = set() for link in result["links"].get("internal", []): href = link.get("href", "") if not href: continue if not href.startswith(('http://', 'https://')): href = urljoin(url, href) href = normalize_url(href) if (href in parent_urls or href in all_internal_links or href in seen_internal_links): continue if any(excluded in href.lower() for excluded in [ "login", "signup", "register", "logout", "account", "profile", "admin" ]): continue base_domain = urlparse(url).netloc link_domain = urlparse(href).netloc if base_domain != link_domain: continue seen_internal_links.add(href) all_internal_links.add(href) internal_links.append(InternalLink( href=href, text=link.get("text", "").strip() )) logger.info(f"Found {len(internal_links)} unique internal links at depth {current_depth}") primary_page = DiscoveredPage( url=url, title=title, internalLinks=internal_links ) discovered_pages.append(primary_page) if current_depth < max_depth: for link in internal_links: # --- Cancellation Check (Before Recursive Call) --- if job_id and is_cancellation_requested(job_id): logger.warning(f"Cancellation requested for job {job_id} before recursive discovery for {link.href}. Stopping discovery.") update_overall_status(job_id, 'cancelled') # Return what we have discovered so far for this branch before stopping return discovered_pages # --- End Cancellation Check --- sub_pages = await discover_pages( url=link.href, max_depth=max_depth, current_depth=current_depth + 1, seen_urls=seen_urls, parent_urls=parent_urls, all_internal_links=all_internal_links, root_url=root_url, root_task_id=root_task_id, job_id=job_id # Pass job_id in recursive call ) discovered_pages.extend(sub_pages) # If this was the initial call (depth 1), mark discovery as complete # This signifies that all reachable URLs within the depth limit have been found # and marked as 'pending_crawl'. if current_depth == 1 and job_id: logger.info(f"Initial discovery call complete for job {job_id}. Marking overall status as discovery_complete.") update_overall_status(job_id, 'discovery_complete') return discovered_pages except Exception as e: error_message = f"Error discovering pages starting from {url}: {str(e)}" logger.error(error_message, exc_info=True) if job_id: # Mark current URL as error, passing the captured message update_url_status(job_id, normalize_url(url), 'discovery_error', statusCode=None, error_message=error_message) # Mark overall job as error only if it's the root call failing if current_depth == 1: update_overall_status(job_id, 'error', error_message) # Return minimal info indicating error for this branch return [DiscoveredPage(url=url, title="Error During Discovery", status="error")] async def _fetch_status_code(url: str) -> Optional[int]: """ Fetches the HTTP status code for a given URL using a HEAD request. Args: url: The URL to check. Returns: The HTTP status code as an integer, or None if the request fails or times out. """ try: async with httpx.AsyncClient(follow_redirects=True, timeout=10.0) as client: # Added follow_redirects=True logger.debug(f"Fetching HEAD for status code: {url}") response = await client.head(url) logger.debug(f"Received status code {response.status_code} for {url}") return response.status_code except httpx.TimeoutException: logger.warning(f"Timeout fetching status code for {url}") return None except httpx.RequestError as e: # Log specific request errors (e.g., connection refused, DNS error) logger.warning(f"Request error fetching status code for {url}: {e.__class__.__name__} - {e}") return None except Exception as e: # Catch broader exceptions (e.g., invalid URL format handled by httpx) logger.error(f"Unexpected error fetching status code for {url}: {e}", exc_info=True) return None async def crawl_pages(pages: List[DiscoveredPage], root_url: str = None, job_id: Optional[str] = None) -> CrawlResult: """ Crawl multiple pages, combine content, save to disk, and update job status including HTTP status code. Args: pages: List of pages to crawl (should have status 'pending_crawl'). root_url: The root URL that initiated the crawl. Used for file naming. job_id: The ID of the overall job for status updates. """ all_markdown = [] total_size = 0 errors = 0 crawled_urls = set() # --- Cancellation Check (Start of Crawl) --- if job_id and is_cancellation_requested(job_id): logger.warning(f"Cancellation requested for job {job_id} before starting crawl_pages. Aborting.") update_overall_status(job_id, 'cancelled') # Return empty results as nothing was crawled return CrawlResult(markdown="", stats=CrawlStats(errors_encountered=1)) # --- End Cancellation Check --- # Generate a consistent file ID based on the root URL if root_url: root_task_id = url_to_filename(root_url) logger.info(f"Using filename: {root_task_id} for root URL: {root_url} (Job ID: {job_id})") else: # If no root_url is provided, use the first page's URL if pages and len(pages) > 0: root_url = pages[0].url root_task_id = url_to_filename(root_url) logger.info(f"Generated filename: {root_task_id} from first page URL: {root_url} (Job ID: {job_id})") else: # Fallback if no pages are provided root_task_id = None logger.warning(f"No root URL or pages provided for job {job_id}, will use individual task IDs") try: # Set overall status to 'crawling' now that we are starting the crawl process if job_id: logger.info(f"Updating overall status to 'crawling' for job {job_id}") update_overall_status(job_id, 'crawling') for i, page in enumerate(pages): # --- Cancellation Check (Start of Loop Iteration) --- if job_id and is_cancellation_requested(job_id): logger.warning(f"Cancellation requested for job {job_id} during crawl loop (before processing {page.url}). Stopping crawl.") update_overall_status(job_id, 'cancelled') break # Exit the loop # --- End Cancellation Check --- if page.url in crawled_urls: logger.debug(f"Skipping already crawled URL: {page.url} (Job ID: {job_id})") continue try: normalized_page_url = normalize_url(page.url) # Normalize once logger.info(f"Crawling page: {normalized_page_url} (Job ID: {job_id})") if job_id: update_url_status(job_id, normalized_page_url, 'crawling', statusCode=None) # Initial status update # No longer replacing docs.crawl4ai.com URLs url = normalized_page_url # Use normalized URL logger.info(f"Processing URL: {url} without replacement") # Simplify the request for testing # The error in the logs shows that there's an issue with the 'magic' parameter # Let's remove any extra parameters and keep it simple simple_request = { "urls": url } # --- Cancellation Check (Before Submitting to Crawl4AI) --- if job_id and is_cancellation_requested(job_id): logger.warning(f"Cancellation requested for job {job_id} before submitting task for {url}. Stopping crawl.") update_overall_status(job_id, 'cancelled') break # Exit the loop # --- End Cancellation Check --- # Submit crawl job to Crawl4AI # * Limitation: We cannot easily cancel a task already submitted to Crawl4AI. # * This check prevents submitting *new* tasks after cancellation is requested. logger.info(f"Submitting content crawl task for {url} to Crawl4AI API (Job ID: {job_id})") # Log the full request for debugging logger.info(f"Request URL: {CRAWL4AI_URL}/crawl") logger.info(f"Headers: {headers}") logger.info(f"Request data: {json.dumps(simple_request)}") # Log environment variables for debugging logger.info(f"CRAWL4AI_URL environment variable: {os.environ.get('CRAWL4AI_URL', 'Not set')}") logger.info(f"CRAWL4AI_API_TOKEN environment variable: {'Set' if os.environ.get('CRAWL4AI_API_TOKEN') else 'Not set'}") response = requests.post( f"{CRAWL4AI_URL}/crawl", headers=headers, json=simple_request, timeout=30 ) response.raise_for_status() task_id = response.json()["task_id"] logger.info(f"Submitted content crawl task for {url}, task ID: {task_id} (Job ID: {job_id})") # Poll for result with increased frequency and longer total timeout result = None max_attempts = 120 # Increased from 60 to 120 for complex URLs poll_interval = 1 # Keep at 1 second for attempt in range(max_attempts): # --- Cancellation Check (Polling Loop) --- if job_id and is_cancellation_requested(job_id): logger.warning(f"Cancellation requested for job {job_id} while polling task {task_id} for {url}. Stopping crawl.") update_overall_status(job_id, 'cancelled') # Mark this specific URL as cancelled locally if needed, although overall status is key page.status = "cancelled" result = None # Ensure we don't process potentially incomplete results break # Exit polling loop # --- End Cancellation Check --- logger.info(f"Polling for task {task_id} result (attempt {attempt+1}/{max_attempts})") try: status_response = requests.get( f"{CRAWL4AI_URL}/task/{task_id}", headers=headers, timeout=10 ) status_response.raise_for_status() status = status_response.json() logger.info(f"Task {task_id} status: {status['status']}") if status["status"] == "completed": result = status["result"] logger.info(f"Content crawl task {task_id} for {url} completed successfully (Job ID: {job_id})") # --- Fetch HTTP Status Code (Scenario B) --- http_status_code: Optional[int] = None if job_id: http_status_code = await _fetch_status_code(url) logger.info(f"Fetched HTTP status code for {url}: {http_status_code} (Job ID: {job_id})") # --- End Fetch HTTP Status Code --- # Update status immediately upon successful task completion, including status code if job_id: update_url_status(job_id, url, 'completed', statusCode=http_status_code) # Pass fetched code # Content processing and saving follows # Save the result to files try: # Only create the storage directory for consolidated files # Disable creation of crawl_results directory to prevent individual files # os.makedirs("crawl_results", exist_ok=True) os.makedirs("storage/markdown", exist_ok=True) # Skip any code that might try to write to crawl_results if "crawl_results" in str(task_id): logger.warning(f"Attempted to create file in crawl_results directory - skipping") return # Use the root_task_id for file naming to consolidate all related content file_id = root_task_id if root_task_id else task_id # We no longer save individual files, only consolidated ones logger.info(f"Skipping individual file creation for task {task_id} - using consolidated approach only") # set_task_context call removed as it's obsolete # Extract the markdown content if available if "markdown" in result and result["markdown"]: # Log that we're using the consolidated approach logger.info(f"Using consolidated approach for task {task_id}") # For the consolidated file, we'll append to the root file storage_file = f"storage/markdown/{file_id}.md" # Determine if it's a new file before opening is_new_file = not os.path.exists(storage_file) # Construct the header only if it's a new file header = "" if is_new_file: header = f"# Consolidated Documentation for {root_url}\n\n" header += f"This file contains content from multiple pages related to {root_url}.\n" header += f"Each section represents a different page that was crawled.\n\n" header += "---\n" # Construct the section for the current page page_section = f"\n\n## {result.get('title', 'Untitled Page')}\n" page_section += f"URL: {url}\n\n" page_section += result["markdown"] page_section += "\n\n---\n\n" # Append to the file (mode 'a' creates if not exists) try: with open(storage_file, 'a', encoding='utf-8') as f: # Added encoding if is_new_file: f.write(header) # Write header only once for new files f.write(page_section) # Always append the current page section logger.info(f"{'Appended to' if not is_new_file else 'Created'} consolidated markdown file: {storage_file}") except IOError as io_err: logger.error(f"IOError writing to markdown file {storage_file}: {io_err}", exc_info=True) # Consider how to handle this - maybe mark URL as error? except Exception as e: logger.error(f"Unexpected error writing to markdown file {storage_file}: {e}", exc_info=True) # Consider how to handle this # Update the metadata file with this page's info metadata_file = f"storage/markdown/{file_id}.json" # Read existing metadata if it exists, with error handling metadata = {} if os.path.exists(metadata_file): try: with open(metadata_file, 'r', encoding='utf-8') as f: # Added encoding metadata = json.load(f) except json.JSONDecodeError: logger.error(f"JSONDecodeError reading metadata file {metadata_file}. Will overwrite.", exc_info=True) metadata = {} # Reset metadata if file is corrupt except IOError as io_err: logger.error(f"IOError reading metadata file {metadata_file}: {io_err}. Proceeding with empty metadata.", exc_info=True) metadata = {} # Reset metadata if file is unreadable except Exception as e: logger.error(f"Unexpected error reading metadata file {metadata_file}: {e}. Proceeding with empty metadata.", exc_info=True) metadata = {} # Reset metadata on other errors # Initialize or update the pages list if "pages" not in metadata: metadata = { "title": f"Documentation for {root_url}", "root_url": root_url, "timestamp": datetime.now().isoformat(), "pages": [], "is_consolidated": True } # Add this page to the pages list metadata["pages"].append({ "title": result.get("title", "Untitled"), "url": url, "timestamp": datetime.now().isoformat(), "internal_links": len(result.get("links", {}).get("internal", [])), "external_links": len(result.get("links", {}).get("external", [])) }) # Update the last_updated timestamp metadata["last_updated"] = datetime.now().isoformat() # Write the updated metadata, with error handling try: with open(metadata_file, 'w', encoding='utf-8') as f: # Added encoding json.dump(metadata, f, indent=2) logger.info(f"Updated metadata in {metadata_file}") except IOError as io_err: logger.error(f"IOError writing metadata file {metadata_file}: {io_err}", exc_info=True) except Exception as e: logger.error(f"Unexpected error writing metadata file {metadata_file}: {e}", exc_info=True) logger.info(f"Updated metadata in {metadata_file}") except Exception as e: logger.error(f"Error saving result to files: {str(e)}") break # Exit polling loop on success elif status["status"] == "failed": error_msg = status.get('error', 'Unknown error') logger.error(f"Content crawl task {task_id} for {url} failed: {error_msg} (Job ID: {job_id})") if job_id: update_url_status(job_id, url, 'crawl_error', statusCode=None, error_message=error_msg) # Pass error message result = None # Ensure result is None on failure break # Exit polling loop on failure elif status["status"] == "running": logger.info(f"Task {task_id} is still running") # Continue polling if status is 'running' or other non-terminal state except Exception as e: logger.error(f"Error polling content crawl task {task_id} for {url}: {str(e)} (Job ID: {job_id})") # Potentially mark as error here too, or rely on timeout await asyncio.sleep(poll_interval) # End of polling loop # Check if polling loop was exited due to cancellation if page.status == "cancelled": break # Exit the main page loop if cancellation happened during polling # Check if polling loop finished without success or explicit failure (timeout or other issue) if result is None and status["status"] != "failed": timeout_error_msg = f"Timeout or issue waiting for content crawl result (Task ID: {task_id})" logger.warning(f"{timeout_error_msg} for {url} (Job ID: {job_id})") if job_id: update_url_status(job_id, url, 'crawl_error', statusCode=None, error_message=timeout_error_msg) # Pass timeout message errors += 1 page.status = "error" continue # Move to the next page # --- Result Processing (only if result is not None) --- if result is None: # This case should now be handled by the checks after the polling loop logger.error(f"Reached result processing block unexpectedly with None result for {url}. This indicates a logic error.") errors += 1 page.status = "error" if job_id: update_url_status(job_id, url, 'crawl_error', statusCode=None, error_message="Logic error: Result was None during processing") # Pass specific message continue # Correct indentation for this block if "markdown" in result and result["markdown"]: page_markdown = f"# {page.title or 'Untitled Page'}\n" page_markdown += f"URL: {page.url}\n\n" content = result["markdown"] filtered_lines = [] skip_next = False for line in content.split('\n'): if skip_next: skip_next = False continue if 'To navigate the symbols, press' in line: skip_next = True continue if any(x in line for x in [ 'Skip Navigation', 'Search...', '⌘K', 'symbols inside <root>' ]): continue filtered_lines.append(line) filtered_content = '\n'.join(filtered_lines).strip() if filtered_content: page_markdown += filtered_content page_markdown += "\n\n---\n\n" all_markdown.append(page_markdown) total_size += len(page_markdown.encode('utf-8')) logger.info(f"Successfully extracted content from {url} (Job ID: {job_id})") # Status (including statusCode) was already updated when task completed successfully # Mark URL as crawled locally crawled_urls.add(url) # Use normalized URL page.status = "crawled" # Correct indentation else: logger.warning(f"Skipping {url} - filtered content was empty (Job ID: {job_id})") errors += 1 page.status = "error" # Keep local status for stats if job_id: update_url_status(job_id, url, 'crawl_error', statusCode=None, error_message="Filtered content was empty") # Pass specific message else: logger.warning(f"Skipping {url} - no markdown content available (Job ID: {job_id})") errors += 1 page.status = "error" # Keep local status for stats if job_id: update_url_status(job_id, url, 'crawl_error', statusCode=None, error_message="No markdown content available in result") # Pass specific message except Exception as e: error_message = f"Error crawling page {url}: {str(e)}" # Use normalized URL in log logger.error(error_message, exc_info=True) errors += 1 page.status = "error" # Keep local status for stats if job_id: update_url_status(job_id, url, 'crawl_error', statusCode=None, error_message=error_message) # Pass captured exception message # End of main page loop combined_markdown = "".join(all_markdown) size_str = f"{total_size} B" if total_size > 1024: size_str = f"{total_size/1024:.2f} KB" if total_size > 1024*1024: size_str = f"{total_size/(1024*1024):.2f} MB" # Calculate final stats based on what was processed stats = CrawlStats( subdomains_parsed=len(pages), # Total pages attempted pages_crawled=len(crawled_urls), # Pages successfully crawled data_extracted=size_str, errors_encountered=errors # Errors encountered during processing ) # Determine final status based on whether cancellation occurred final_status = 'unknown' # Default safety value if job_id and is_cancellation_requested(job_id): final_status = 'cancelled' logger.info(f"Crawl process for job {job_id} was cancelled. Returning partial results.") elif errors > 0: final_status = 'completed_with_errors' logger.info(f"Crawl process for job {job_id} completed with {errors} errors.") else: final_status = 'completed' logger.info(f"Crawl process for job {job_id} completed successfully.") # Update overall status only if it wasn't already set to 'cancelled' by a check if job_id and get_job_status(job_id).overall_status != 'cancelled': update_overall_status(job_id, final_status, data_extracted=size_str) elif not job_id: logger.warning("Cannot update final overall status as job_id is missing.") logger.info(f"Returning crawl results for job {job_id} with final status '{final_status}' and stats: {stats}") return CrawlResult( markdown=combined_markdown, # Return potentially partial markdown stats=stats ) except Exception as e: error_message = f"Error in crawl_pages for job {job_id}: {str(e)}" logger.error(error_message, exc_info=True) if job_id: update_overall_status(job_id, 'error', error_message) return CrawlResult( markdown="", stats=CrawlStats( subdomains_parsed=len(pages), pages_crawled=0, data_extracted="0 KB", errors_encountered=1 ) )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cyberagiinc/DevDocs'

If you have feedback or need assistance with the MCP directory API, please join our Discord server