Skip to main content
Glama

AiDD MCP Server

by skydeckai
web_tools.py35.5 kB
import os import random import time from typing import List from urllib.parse import urlparse import requests from mcp.types import TextContent from .state import state def web_fetch_tool(): return { "name": "web_fetch", "description": "Fetches content from a URL. " "WHEN TO USE: When you need to retrieve data from web APIs, download documentation, " "check external resources, or gather information from websites. Useful for getting " "real-time data, documentation, or referencing external content. " "WHEN NOT TO USE: When you need to interact with complex websites requiring authentication " "or session management, when the data needs to be processed in a specific format not supported, " "or when you need to make authenticated API calls with OAuth. " "TIP: Use 'web_search' first to find relevant URLs, then use this tool to fetch detailed content. " "RETURNS: The content of the URL as text. For HTML pages, returns the raw HTML content. " "For JSON endpoints, returns the JSON content as a string. Successful response includes HTTP " "status code. Failed requests include error details. Maximum request size enforced for safety.", "inputSchema": { "type": "object", "properties": { "url": { "type": "string", "description": "The URL to fetch content from. Must be a valid URL with supported protocol " "(http or https). Examples: 'https://example.com', 'https://api.github.com/repos/user/repo'. " "The URL must be publicly accessible." }, "headers": { "type": "object", "description": "Optional HTTP headers to include in the request. Useful for API calls that " "require specific headers like User-Agent or Accept. Example: {'User-Agent': 'SkyDeckAI', " "'Accept': 'application/json'}.", "default": {} }, "timeout": { "type": "integer", "description": "Request timeout in seconds. Maximum time to wait for the server to respond before " "aborting the request. Defaults to 10 seconds.", "default": 10 }, "save_to_file": { "type": "string", "description": "Optional path to save the response content to a file. If provided, the content " "will be saved to this location. Must be within the allowed directory. Example: " "'downloads/page.html', 'data/api_response.json'.", "default": None }, "convert_html_to_markdown": { "type": "boolean", "description": "If set to true and the content is HTML, it will be converted to markdown format " "for better readability. This is especially useful for web pages with a lot of content.", "default": True } }, "required": ["url"] } } async def handle_web_fetch(arguments: dict) -> List[TextContent]: """Handle fetching content from a URL.""" url = arguments.get("url") headers = arguments.get("headers", {}) timeout = arguments.get("timeout", 10) save_to_file = arguments.get("save_to_file") convert_html_to_markdown = arguments.get("convert_html_to_markdown", True) if not url: raise ValueError("URL must be provided") # Basic URL validation parsed_url = urlparse(url) if not parsed_url.scheme or not parsed_url.netloc: raise ValueError(f"Invalid URL: {url}. Must include scheme (http/https) and domain.") if parsed_url.scheme not in ["http", "https"]: raise ValueError(f"Unsupported URL scheme: {parsed_url.scheme}. Only http and https are supported.") # Add a default User-Agent if not provided if "User-Agent" not in headers: headers["User-Agent"] = "SkyDeckAI-Web-Fetch/1.0" # Validate and prepare file path if saving to file full_save_path = None if save_to_file: if os.path.isabs(save_to_file): full_save_path = os.path.abspath(save_to_file) else: full_save_path = os.path.abspath(os.path.join(state.allowed_directory, save_to_file)) # Security check if not full_save_path.startswith(state.allowed_directory): raise ValueError(f"Access denied: Path ({full_save_path}) must be within allowed directory") # Create parent directories if they don't exist os.makedirs(os.path.dirname(full_save_path), exist_ok=True) try: # Make the request with a maximum size limit to prevent abuse response = requests.get( url, headers=headers, timeout=timeout, stream=True # Use streaming for better control over large responses ) # Check if response is successful response.raise_for_status() # Get content type from headers content_type = response.headers.get("Content-Type", "").lower() # Maximum size limit (10MB) max_size = 10 * 1024 * 1024 content = b"" for chunk in response.iter_content(chunk_size=8192): content += chunk if len(content) > max_size: raise ValueError(f"Response too large. Maximum size is {max_size // (1024 * 1024)}MB.") # Save to file if requested if full_save_path: with open(full_save_path, 'wb') as f: f.write(content) # Try to decode the content try: text_content = content.decode('utf-8') # Convert HTML to markdown if requested and content appears to be HTML if convert_html_to_markdown and ("html" in content_type or text_content.strip().startswith(("<!DOCTYPE", "<html"))): try: # Using the html2text library to convert HTML to markdown # Need to import here to avoid dependency issues if the library is not installed import html2text h = html2text.HTML2Text() h.ignore_links = False h.ignore_images = False h.ignore_emphasis = False h.body_width = 0 # Don't wrap text text_content = h.handle(text_content) except ImportError: # Add note that html2text needs to be installed text_content = f"NOTE: Could not convert HTML to markdown because html2text library is not installed.\n\n{text_content}" except UnicodeDecodeError: # If content can't be decoded as utf-8, provide info about binary content if full_save_path: return [TextContent( type="text", text=f"Binary content saved to {save_to_file} (size: {len(content)} bytes, type: {content_type})" )] else: return [TextContent( type="text", text=f"Binary content received (size: {len(content)} bytes, type: {content_type})" )] # Success message status_info = f"HTTP {response.status_code}" size_info = f"{len(content)} bytes" save_info = f", saved to {save_to_file}" if full_save_path else "" format_info = " (converted to markdown)" if convert_html_to_markdown and ("html" in content_type or text_content.strip().startswith(("<!DOCTYPE", "<html"))) else "" result = [TextContent( type="text", text=f"{status_info}, {size_info}{save_info}{format_info}:\n\n{text_content}" )] return result except requests.exceptions.RequestException as e: # Handle request-related errors error_message = str(e) if hasattr(e, 'response') and e.response is not None: error_message = f"HTTP {e.response.status_code}: {error_message}" raise ValueError(f"Error fetching URL ({url}): {error_message}") except Exception as e: # Handle other errors raise ValueError(f"Error processing content from {url}: {str(e)}") def web_search_tool(): return { "name": "web_search", "description": "Performs a web search and returns the search results. " "WHEN TO USE: When you need to find information on the web, get up-to-date data, " "or research a topic. This provides more current information than your training data. " "WHEN NOT TO USE: For queries requiring complex authentication, accessing private data, " "or when you want to browse interactive websites. " "TIP: For best results, use this tool to find relevant URLs, then use 'web_fetch' to get the full content of specific pages. " "RETURNS: A list of search results including titles, URLs, and snippets for each result.", "inputSchema": { "type": "object", "properties": { "query": { "type": "string", "description": "The search query to send to search engine. Be specific to get better results. " "Example: 'latest python release features' or 'climate change statistics 2023'." }, "num_results": { "type": "integer", "description": "Number of search results to return. Maximum is 20 to prevent abuse.", "default": 10 }, "convert_html_to_markdown": { "type": "boolean", "description": "If true, search result snippets will be converted from HTML to markdown " "for better readability.", "default": True }, "search_engine": { "type": "string", "description": "Specifies which search engine to use. Options: 'auto' (tries all in sequence), " "'bing', or 'duckduckgo'. Some engines may block automated requests.", "enum": ["auto", "bing", "duckduckgo"], "default": "auto" } }, "required": ["query"] } } def _process_ddg_url(url): """Process DuckDuckGo URLs to get the actual target URL.""" try: import urllib.parse url_parts = urllib.parse.urlparse(url) # Case 1: Traditional uddg parameter format if 'uddg' in url_parts.query: query_parts = urllib.parse.parse_qs(url_parts.query) extracted_url = query_parts.get('uddg', [''])[0] if extracted_url: return extracted_url # Case 2: Advertising/redirect y.js format elif 'y.js' in url_parts.path: query_parts = urllib.parse.parse_qs(url_parts.query) # Try ad_domain first if 'ad_domain' in query_parts and query_parts['ad_domain'][0]: return f"https://{query_parts['ad_domain'][0]}" # Then try du parameter elif 'du' in query_parts and query_parts['du'][0]: return query_parts['du'][0] # Try other known parameters for param in ['u', 'l']: if param in query_parts and query_parts[param][0]: return query_parts[param][0] # Case 3: Direct URL elif url.startswith('http'): return url except Exception as e: print(f"Error processing DuckDuckGo URL: {str(e)}") # Default to original URL if all else fails return url def _process_bing_url(url): """Process Bing URLs to get the actual target URL.""" try: import urllib.parse parsed_url = urllib.parse.urlparse(url) # Check if it's a Bing redirect URL if parsed_url.netloc == 'www.bing.com' and parsed_url.path == '/ck/a': # Try to extract the actual URL from Bing's redirect query_dict = urllib.parse.parse_qs(parsed_url.query) if 'u' in query_dict: # Bing stores the actual URL in the 'u' parameter, often base64 encoded import base64 try: # Try to decode if it's base64 real_url = base64.b64decode(query_dict['u'][0]).decode('utf-8') return real_url except Exception: # If not base64, just use it directly return query_dict['u'][0] # Try other known redirect parameters for param in ['purl', 'r']: if param in query_dict: return query_dict[param][0] except Exception as e: print(f"Error processing Bing URL: {str(e)}") # Default to original URL if all else fails return url async def handle_web_search(arguments: dict) -> List[TextContent]: """Handle performing a web search using direct HTML scraping with anti-detection measures.""" query = arguments.get("query") num_results = min(arguments.get("num_results", 10), 20) # Cap at 20 results max convert_html_to_markdown = arguments.get("convert_html_to_markdown", True) search_engine = arguments.get("search_engine", "auto").lower() engine_warning = None if not query: raise ValueError("Search query must be provided") # Validate search engine parameter valid_engines = ["auto", "bing", "duckduckgo"] if search_engine not in valid_engines: if search_engine == "google": engine_warning = "Warning: Google search engine is no longer supported due to blocking automated requests. Falling back to 'auto' mode." else: engine_warning = f"Warning: Unsupported search engine '{search_engine}'. Valid options are: {', '.join(valid_engines)}. Falling back to 'auto' mode." print(engine_warning) search_engine = "auto" # Default to auto if invalid # Create a list of common user agents to rotate through user_agents = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36" ] # Use a random user agent user_agent = random.choice(user_agents) # Set up params for the request params = { "q": query, "num": num_results + 5, # Request a few more results than needed "hl": "en", # Language hint "gl": "us", # Geolocation hint (helps avoid redirect to country-specific sites) } # Set up headers to more closely mimic a real browser headers = { "User-Agent": user_agent, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9", "Accept-Encoding": "gzip, deflate", "Referer": "https://www.skydeck.ai/", "Connection": "keep-alive", "Cache-Control": "max-age=0", "Upgrade-Insecure-Requests": "1", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "same-origin", "Sec-Fetch-User": "?1", } # Define search engines configurations search_engines = [ { "name": "DuckDuckGo HTML", "id": "duckduckgo", "url": "https://html.duckduckgo.com/html/", "params": {"q": query}, "headers": { "User-Agent": user_agent, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9", "Accept-Encoding": "gzip, deflate", "Referer": "https://duckduckgo.com/", "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1" }, "result_selector": [ ".web-result", ".result:not(.result--ad)", ".results_links:not(.result--ad)", ".result" ], "title_selector": [ ".result__title", ".result__a", "h2", ".result__title a" ], "link_selector": [ "a.result__a", "a.result__url", ".result__title a", "a[href^='http']" ], "snippet_selector": [ ".result__snippet", ".result__snippet p", ".result__desc", ".result__body", ".snippet" ] }, { "name": "Bing", "id": "bing", "url": "https://www.bing.com/search", "params": {"q": query, "count": num_results}, "headers": { "User-Agent": user_agent, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9", "Accept-Encoding": "gzip, deflate", "Referer": "https://www.bing.com/", "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1" }, "result_selector": [ ".b_algo", "li.b_algo", ".b_results > li:not(.b_ad)", "ol#b_results > li" ], "title_selector": [ "h2", ".b_title", "h2 a", "a" ], "link_selector": [ "h2 a", "a.tilk", "cite", ".b_attribution > cite", "a[href^='http']" ], "snippet_selector": [ ".b_caption p", ".b_snippet", ".b_richcard", ".b_caption", ".b_algoSlug" ] } ] # Filter engines based on user preference if search_engine != "auto": filtered_engines = [engine for engine in search_engines if engine["id"] == search_engine] if filtered_engines: search_engines = filtered_engines # If no matching engine found, keep the original list (fallback to auto) # Track URLs we've already seen to prevent duplicates seen_urls = set() # Try each search engine until one works for engine in search_engines: try: print(f"Trying search with {engine['name']}...") # Add a small delay to avoid rate limiting time.sleep(random.uniform(0.5, 1.5)) # Make the request response = requests.get( engine["url"], params=engine["params"], headers=engine["headers"], timeout=15 ) # Check if the response was successful if response.status_code == 200: # Parse the HTML response try: from bs4 import BeautifulSoup soup = BeautifulSoup(response.text, 'html.parser') search_results = [] # Special handling for DuckDuckGo which uses different URL structure is_ddg = engine["name"] == "DuckDuckGo HTML" # Convert single selector to list for consistent handling result_selectors = engine["result_selector"] if isinstance(result_selectors, str): result_selectors = [result_selectors] # Try each result selector until we find results result_elements = [] for selector in result_selectors: result_elements = soup.select(selector) if result_elements: print(f"Found {len(result_elements)} results with selector '{selector}'") break print(f"Found {len(result_elements)} potential results with {engine['name']}") for result in result_elements: if len(search_results) >= num_results: break # Try all title selectors title_selectors = engine["title_selector"] if isinstance(title_selectors, str): title_selectors = [title_selectors] title_element = None for selector in title_selectors: title_element = result.select_one(selector) if title_element: break # Try all link selectors link_selectors = engine["link_selector"] if isinstance(link_selectors, str): link_selectors = [link_selectors] link_element = None for selector in link_selectors: link_element = result.select_one(selector) if link_element and 'href' in link_element.attrs: break # Try all snippet selectors snippet_selectors = engine["snippet_selector"] if isinstance(snippet_selectors, str): snippet_selectors = [snippet_selectors] snippet_element = None for selector in snippet_selectors: snippet_element = result.select_one(selector) if snippet_element: break # If we couldn't find link or title, try looking for any anchor tag with text if not link_element and not title_element: for anchor in result.find_all('a', href=True): if anchor.text.strip() and len(anchor.text.strip()) > 3: link_element = anchor title_element = anchor break if title_element and link_element and 'href' in link_element.attrs: # Process URL url = link_element['href'] # Process URL based on search engine if is_ddg: url = _process_ddg_url(url) elif engine["id"] == "bing": url = _process_bing_url(url) # Skip duplicate URLs canonical_url = url.split('?')[0].rstrip('/') # Remove query params and trailing slash for comparison if canonical_url in seen_urls: continue seen_urls.add(canonical_url) # Ensure URL is valid if not url or not url.startswith('http'): continue # Get title and snippet title = title_element.text.strip() snippet = snippet_element.text.strip() if snippet_element else "No description available" # Add to results if we have valid data if title: search_results.append({ "title": title, "link": url, "snippet": snippet }) # If we found results, format and return them if search_results: print(f"Success! Found {len(search_results)} results with {engine['name']}") return _format_search_results(query, search_results, convert_html_to_markdown, engine["name"], engine_warning) except Exception as parse_error: print(f"Error parsing {engine['name']} results: {str(parse_error)}") # Continue to the next engine else: print(f"{engine['name']} returned status code: {response.status_code}") except Exception as e: print(f"Error with {engine['name']}: {str(e)}") # Continue to the next engine # If all engines fail, try a last-resort approach: extract any links from the last response try: if 'response' in locals() and response.status_code == 200: from bs4 import BeautifulSoup soup = BeautifulSoup(response.text, 'html.parser') print("Attempting emergency link extraction...") emergency_results = [] # Look for common result containers first potential_containers = [ # Common search result containers soup.select("div.g, div.b_algo, .result, .web-result, .results_links, li[data-bm], div[data-hveid]"), # Any div with title-like content soup.select("div:has(h1), div:has(h2), div:has(h3), div:has(h4)"), # Main content areas soup.select("main, #main, #content, .content, #results, .results"), # Fallback to any link with reasonable text soup.select("a[href^='http']") ] # Process each container type in order until we find enough results for container_set in potential_containers: if container_set and len(emergency_results) < num_results: for container in container_set: # For containers, look for links inside if container.name != 'a': links = container.select("a[href^='http']") or [] # Process each link in the container for link in links: url = link.get('href', '') title = link.text.strip() # Skip navigation links or empty links if not url or not title or len(title) < 5: continue # Skip search engine internal links if any(s in url for s in ['google.com/search', 'bing.com/search', 'duckduckgo.com']): continue # Skip duplicate URLs canonical_url = url.split('?')[0].rstrip('/') if canonical_url in seen_urls: continue seen_urls.add(canonical_url) # Process URL based on domain if 'bing.com' in url: url = _process_bing_url(url) elif 'duckduckgo.com' in url: url = _process_ddg_url(url) # Find snippet text near the link if possible snippet = "No description available" # Try to get snippet from surrounding paragraph or div parent = link.parent if parent: # Look for sibling paragraphs or divs sibling = parent.find_next_sibling(['p', 'div', 'span']) if sibling and sibling.text.strip(): snippet = sibling.text.strip() # Or try parent's text excluding the link text elif parent.name in ['p', 'div', 'span'] and len(parent.text) > len(title): snippet_text = parent.text.replace(title, '').strip() if snippet_text: snippet = snippet_text emergency_results.append({ "title": title, "link": url, "snippet": snippet }) if len(emergency_results) >= num_results: break else: # Process direct link url = container.get('href', '') title = container.text.strip() # Skip invalid links if not url or not title or len(title) < 5: continue # Skip search engine internal links if any(s in url for s in ['google.com/search', 'bing.com/search', 'duckduckgo.com']): continue # Skip duplicate URLs canonical_url = url.split('?')[0].rstrip('/') if canonical_url in seen_urls: continue seen_urls.add(canonical_url) emergency_results.append({ "title": title, "link": url, "snippet": "No description available" }) if len(emergency_results) >= num_results: break if len(emergency_results) >= num_results: break if emergency_results: print(f"Found {len(emergency_results)} emergency results by extracting links") return _format_search_results(query, emergency_results, convert_html_to_markdown, "Emergency Links", engine_warning) except Exception as e: print(f"Error in emergency link extraction: {str(e)}") # If all search methods fail, provide helpful fallback information print("All search methods failed, providing search fallback") return _provide_search_fallback(query, engine_warning) def _format_search_results(query: str, search_results: list, convert_html_to_markdown: bool, engine_name: str = None, engine_warning: str = None) -> List[TextContent]: """Format search results into markdown.""" formatted_results = ["# Web Search Results\n\n"] formatted_results.append(f"**Query:** {query}\n\n") if engine_warning: formatted_results.append(f"**{engine_warning}**\n\n") if engine_name: formatted_results.append(f"**Source:** {engine_name}\n\n") for i, item in enumerate(search_results, 1): title = item.get("title", "No title") link = item.get("link", "") snippet = item.get("snippet", "No description available") # Convert HTML in snippet to markdown if requested if convert_html_to_markdown: try: import html2text h = html2text.HTML2Text() h.ignore_links = False h.ignore_images = True h.body_width = 0 # Don't wrap text # Remove HTML tags from title and snippet title = h.handle(title) if '<' in title else title snippet = h.handle(snippet) if '<' in snippet else snippet except ImportError: # Continue without conversion if html2text is not available # Just strip basic HTML tags as a fallback import re title = re.sub(r'<[^>]*>', '', title) snippet = re.sub(r'<[^>]*>', '', snippet) formatted_results.append(f"## {i}. {title}\n") formatted_results.append(f"**URL:** {link}\n\n") formatted_results.append(f"{snippet}\n\n---\n\n") return [TextContent( type="text", text="".join(formatted_results) )] def _provide_search_fallback(query: str, engine_warning: str = None) -> List[TextContent]: """Provide a useful fallback when search fails.""" # Create a helpful response with suggestions for alternative approaches formatted_results = ["# Web Search Results\n\n"] formatted_results.append(f"**Query:** {query}\n\n") if engine_warning: formatted_results.append(f"**{engine_warning}**\n\n") formatted_results.append("I couldn't retrieve search results at this time.\n\n") # Add explanation about limitations formatted_results.append("## Why search might be unavailable\n\n") formatted_results.append("Web search APIs often have restrictions on automated access, which can cause searches to fail. When this happens, it's better to:\n\n") formatted_results.append("1. Try a different search engine (Bing or DuckDuckGo which are more reliable for automated access)\n") formatted_results.append("2. Visit specific authoritative sites directly\n") formatted_results.append("3. Try the search again later, or with different terms\n") return [TextContent( type="text", text="".join(formatted_results) )]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/skydeckai/mcp-server-aidd'

If you have feedback or need assistance with the MCP directory API, please join our Discord server