main.py (10.4 kB)
""" Webpage Server MCP Server A Model Context Protocol server for querying webpages and page contents """ import os import sys import xml.etree.ElementTree as ET from pathlib import Path from typing import Dict, Any, List from collections import defaultdict import time from dotenv import load_dotenv from urllib.parse import urljoin, urlparse from mcp.server.fastmcp import FastMCP import asyncio from agents.action_generation import action_agent, input_agent, task_agent, State, FunctionParameter from agents.browser_agent import browser_agent from agents.browser_loop import browser_loop from src.tool_template import tool_template # Load environment variables env_path = Path('.') / '.env.local' if env_path.exists(): load_dotenv(env_path) load_dotenv() # Also load .env if exists # Get port from environment or command line args port = int(os.environ.get('PORT', '8080')) for i, arg in enumerate(sys.argv): if arg == '--port' and i + 1 < len(sys.argv): port = int(sys.argv[i + 1]) break # Get host from environment or command line args host = os.environ.get('HOST', '0.0.0.0') for i, arg in enumerate(sys.argv): if arg == '--host' and i + 1 < len(sys.argv): host = sys.argv[i + 1] break mcp = FastMCP( name='Webpage Server', host=host, port=port, instructions="""This MCP server queries webpages and page contents of a specific website. Available tools: - list_pages(): List the path of all webpages Example: list_pages() -> ["/", "/blog", "/blog/post-1", "/marketplace"] Parse the sitemap.xml to get the list of pages. Return the page paths instead of the full url. - get_page(path): Get the html content of a webpage Example: get_page("/blog/post-1") -> "<html>...</html>" Get the html content by visiting the full url. Resources: - sitemap.xml: The sitemap of the website This server includes rate limiting (10 requests/minute) to protect API keys.""", ) # Configuration BASE_URL = os.getenv('BASE_URL', 'https://example.com') SITEMAP_PATH = Path(__file__).parent.parent / 'assets' / 'sitemap.xml' # Rate limiting for API protection class RateLimiter: """Simple rate limiter to protect API keys from abuse""" def __init__(self, max_requests: int = 10, window_seconds: int = 60): self.max_requests = max_requests self.window_seconds = window_seconds self.requests = defaultdict(list) def is_allowed(self, identifier: str) -> bool: """Check if request is allowed for this identifier""" now = time.time() # Clean old requests outside window self.requests[identifier] = [ req_time for req_time in self.requests[identifier] if now - req_time < self.window_seconds ] # Check if under limit if len(self.requests[identifier]) < self.max_requests: self.requests[identifier].append(now) return True return False def get_reset_time(self, identifier: str) -> int: """Get seconds until rate limit resets""" if not self.requests[identifier]: return 0 oldest = min(self.requests[identifier]) return max(0, int(self.window_seconds - (time.time() - oldest))) # Initialize rate limiter (10 requests per minute) rate_limiter = RateLimiter(max_requests=10, window_seconds=60) def fetch_sitemap() -> str: """Read sitemap.xml content from local file""" if not SITEMAP_PATH.exists(): raise ValueError(f"Sitemap file not found at {SITEMAP_PATH}") return SITEMAP_PATH.read_text() def parse_sitemap_urls(sitemap_xml: str) -> List[str]: """Parse sitemap XML and extract URLs""" try: root = ET.fromstring(sitemap_xml) # Handle namespace namespace = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'} # Try with namespace first urls = root.findall('.//ns:url/ns:loc', 
                            namespace)
        if not urls:
            # Try without namespace
            urls = root.findall('.//url/loc')
        return [url.text for url in urls if url.text]
    except Exception as e:
        raise ValueError(f"Failed to parse sitemap: {str(e)}")


@mcp.resource('sitemap://sitemap.xml')
def get_sitemap() -> str:
    """
    Serve the sitemap.xml file

    Returns:
        Content of the sitemap.xml file
    """
    return fetch_sitemap()


@mcp.tool()
def list_pages() -> List[str]:
    """
    List the path of all webpages from sitemap.xml

    Returns:
        List of page paths (e.g., ["/", "/blog", "/blog/post-1"])
    """
    sitemap_content = fetch_sitemap()
    urls = parse_sitemap_urls(sitemap_content)

    # Convert full URLs to paths
    paths = []
    for url in urls:
        parsed = urlparse(url)
        path = parsed.path or '/'
        paths.append(path)

    return sorted(set(paths))  # Remove duplicates and sort


@mcp.tool()
def get_page(path: str, user_id: str = None) -> Dict[str, Any]:
    """
    Get the HTML content of a webpage

    Args:
        path: Path to the webpage (e.g., "/blog/post-1")
        user_id: Optional user identifier for rate limiting

    Returns:
        Dictionary with HTML content and metadata
    """
    # Rate limiting check
    identifier = user_id or 'default'
    if not rate_limiter.is_allowed(identifier):
        reset_time = rate_limiter.get_reset_time(identifier)
        return {
            'error': 'Rate limit exceeded',
            'message': f'Too many requests. Please wait {reset_time} seconds before trying again.',
            'reset_in_seconds': reset_time,
            'limit': '10 requests per minute',
        }

    # Construct full URL
    if not path.startswith('/'):
        path = '/' + path
    full_url = urljoin(BASE_URL, path)

    try:
        import requests

        response = requests.get(full_url, timeout=10)
        response.raise_for_status()
        return {
            'path': path,
            'url': full_url,
            'html': response.text,
            'status_code': response.status_code,
            'content_type': response.headers.get('Content-Type', 'unknown'),
        }
    except Exception as e:
        return {
            'error': 'Failed to fetch page',
            'path': path,
            'url': full_url,
            'message': str(e),
        }


def format_input_list(parameters: List[FunctionParameter]) -> str:
    return "\n".join([f" '{p.name}': {p.name}" for p in parameters])


def format_parameter_list(parameters: List[FunctionParameter]) -> str:
    return ", ".join([f"{p.name}: {p.type}" for p in parameters])


@mcp.tool()
async def add_tool(action_description: str) -> str:
    """
    Request to add a tool to the MCP server.

    Args:
        action_description: The description of the action to add to the MCP server.

    Returns:
        The response from the MCP server. Three options:
        1. "Tool already exists: <tool_name>"
        2. "Tool added successfully. You can redeploy the server to use it."
        3. "Error: <error_message>"
    """
    # skip existence check for now

    state = State(
        action_description=action_description,
        website_url=BASE_URL
    )

    print("1. action_agent...")
    state = await action_agent(state)
    print(f" ✓ {state.function_metadata.function_name}")

    print("2. task_agent...")
    state = await task_agent(state)
    print(f" ✓ task created")

    print("3. browser_loop...")
    try:
        state = await asyncio.wait_for(browser_loop(state), timeout=180)
        print(f"\n✓ RESULT: {state.result}")
        print(f" Test results: {state.test_results}")
    except asyncio.TimeoutError:
        print("\n✗ Browser loop timed out after 3 minutes")
    except Exception as e:
        print(f"\n✗ Error: {e}")

    if state.winning_task is None:
        return "Error: tool cannot be added. Either the website lacks this feature, or the task is too complex."
    tool_code = tool_template.format(
        function_name=state.function_metadata.function_name,
        parameter_list=format_parameter_list(state.function_metadata.parameters),
        description=state.function_metadata.description,
        input_list=format_input_list(state.function_metadata.parameters),
        task_description=state.winning_task,
    )

    # Read main.py
    with open(__file__, 'r') as f:
        content = f.read()

    # Find the insertion marker
    marker = "# INSERT TOOL CODE AFTER THIS LINE"
    insert_pos = content.find(marker) + len(marker)

    # Insert the new tool
    new_content = content[:insert_pos] + "\n\n" + tool_code + content[insert_pos:]

    # Write it back
    with open(__file__, 'w') as f:
        f.write(new_content)

    return f"Tool '{state.function_metadata.function_name}' added! Restart the MCP server to use it."


# INSERT TOOL CODE AFTER THIS LINE


def main():
    """Main entry point for the MCP server"""
    import argparse

    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Webpage Server MCP Server')
    parser.add_argument('--port', type=int, help='Port for HTTP transport')
    parser.add_argument(
        '--host', type=str, default='0.0.0.0', help='Host for HTTP transport'
    )
    parser.add_argument('--stdio', action='store_true', help='Force STDIO transport')
    parser.add_argument('--test', action='store_true', help='Test mode')
    args = parser.parse_args()

    # Check if running in test mode
    if args.test:
        # Test mode - just verify everything loads
        print('Webpage Server MCP Server loaded successfully')
        print(f'Base URL: {BASE_URL}')
        print(f'Sitemap Path: {SITEMAP_PATH}')
        print('Tools available: list_pages, get_page')
        print('Resources available: sitemap.xml')
        return 0

    # Determine transport mode
    if (args.port or os.environ.get('PORT')) and not args.stdio:
        # HTTP transport mode
        actual_host = host if not args.host else args.host
        actual_port = port if not args.port else args.port
        print(f'Starting HTTP server on {actual_host}:{actual_port}')
        print(f'MCP endpoint: http://{actual_host}:{actual_port}/mcp')
        mcp.run(transport='streamable-http')
    else:
        # STDIO transport (default for MCP)
        mcp.run('stdio')

    return 0


if __name__ == '__main__':
    import sys

    sys.exit(main())
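
For reference, here is a hedged example of the sitemap shape that parse_sitemap_urls() and list_pages() expect. The actual assets/sitemap.xml shipped with this server is not shown on this page, so the URLs below are illustrative only.

# Illustrative only: the real assets/sitemap.xml is not shown on this page.
# Assumes parse_sitemap_urls() from main.py above is in scope. Both the
# namespaced form below and a namespace-free <urlset> are handled, because
# the function falls back to './/url/loc' when the namespaced lookup is empty.
example_sitemap = """<?xml version="1.0" encoding="UTF-8"?>
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
  <url><loc>https://example.com/</loc></url>
  <url><loc>https://example.com/blog</loc></url>
  <url><loc>https://example.com/blog/post-1</loc></url>
</urlset>"""

print(parse_sitemap_urls(example_sitemap))
# ['https://example.com/', 'https://example.com/blog', 'https://example.com/blog/post-1']
# list_pages() would then reduce these to sorted, de-duplicated paths:
# ['/', '/blog', '/blog/post-1']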
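
The self-extension path in add_tool() hinges on src/tool_template.py, which is not shown on this page. Purely as a hypothetical sketch of its shape, inferred only from the keyword arguments passed to tool_template.format() above (the real template, and the browser_agent signature it presumably uses, may differ):

# HYPOTHETICAL sketch of src/tool_template.py, inferred from the format()
# keywords used in add_tool(). The real template is not shown on this page,
# and the browser_agent call signature below is an assumption.
tool_template = '''

@mcp.tool()
async def {function_name}({parameter_list}) -> str:
    """{description}"""
    inputs = {{
{input_list}
    }}
    # Replay the task that passed testing in browser_loop, with the caller's inputs.
    return await browser_agent("""{task_description}""", inputs, BASE_URL)
'''

Whatever the template actually contains, add_tool() splices the formatted result into main.py directly after the "# INSERT TOOL CODE AFTER THIS LINE" marker, so the new tool only becomes available once the server process is restarted.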
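
Finally, a minimal client-side sketch (not part of this repository), assuming the official `mcp` Python SDK is installed and the server was started in HTTP mode with `python main.py --port 8080`:

# Minimal client sketch, assuming the official `mcp` Python SDK and a server
# running on the streamable HTTP transport at http://localhost:8080/mcp.
import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client


async def demo() -> None:
    async with streamablehttp_client('http://localhost:8080/mcp') as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            pages = await session.call_tool('list_pages', {})
            print(pages)  # e.g. ["/", "/blog", "/blog/post-1"]
            page = await session.call_tool('get_page', {'path': '/blog/post-1'})
            print(page)


asyncio.run(demo())

Without --port or a PORT environment variable, the server falls back to the STDIO transport, which is the usual mode when an MCP client launches it as a subprocess.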
