# main.py (10.4 kB)
"""
Webpage Server MCP Server
A Model Context Protocol server for querying webpages and page contents
"""
import asyncio
import math
import os
import sys
import time
import xml.etree.ElementTree as ET
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin, urlparse

from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

from agents.action_generation import action_agent, input_agent, task_agent, State, FunctionParameter
from agents.browser_agent import browser_agent
from agents.browser_loop import browser_loop
from src.tool_template import tool_template
# Pull settings from .env.local when it is present, then from a plain .env too
env_path = Path('.') / '.env.local'
if env_path.exists():
    load_dotenv(env_path)
load_dotenv()

# Every argument paired with the argument that follows it on the command line
_cli_pairs = list(zip(sys.argv, sys.argv[1:]))

# Port: PORT environment variable (default 8080), overridden by the first
# "--port <value>" pair on the command line
port = int(os.environ.get('PORT', '8080'))
_port_values = [value for flag, value in _cli_pairs if flag == '--port']
if _port_values:
    port = int(_port_values[0])

# Host: HOST environment variable (default 0.0.0.0), overridden by the first
# "--host <value>" pair on the command line
host = os.environ.get('HOST', '0.0.0.0')
_host_values = [value for flag, value in _cli_pairs if flag == '--host']
if _host_values:
    host = _host_values[0]
# Instructions text handed to FastMCP when the server object is created
_SERVER_INSTRUCTIONS = """This MCP server queries webpages and page contents of a specific website.
Available tools:
- list_pages(): List the path of all webpages
Example: list_pages() -> ["/", "/blog", "/blog/post-1", "/marketplace"]
Parse the sitemap.xml to get the list of pages.
Return the page paths instead of the full url.
- get_page(path): Get the html content of a webpage
Example: get_page("/blog/post-1") -> "<html>...</html>"
Get the html content by visiting the full url.
Resources:
- sitemap.xml: The sitemap of the website
This server includes rate limiting (10 requests/minute) to protect API keys."""

# Server instance; the tool and resource decorators below register against it
mcp = FastMCP(
    name='Webpage Server',
    instructions=_SERVER_INSTRUCTIONS,
    host=host,
    port=port,
)
# Site settings: root URL of the target website (BASE_URL environment variable,
# falling back to https://example.com) and the location of the sitemap file,
# which lives in assets/ two directory levels above this file's path.
BASE_URL = os.environ.get('BASE_URL', 'https://example.com')
SITEMAP_PATH = Path(__file__).parent.parent.joinpath('assets', 'sitemap.xml')
# Rate limiting for API protection
class RateLimiter:
    """Sliding-window rate limiter to protect API keys from abuse.

    Each identifier may have at most ``max_requests`` accepted requests
    whose timestamps (taken from ``time.time()``) lie within the last
    ``window_seconds`` seconds.
    """

    def __init__(self, max_requests: int = 10, window_seconds: int = 60):
        # Maximum number of accepted requests per identifier within one window
        self.max_requests = max_requests
        # Length of the sliding window, in seconds
        self.window_seconds = window_seconds
        # identifier -> timestamps of the requests that were accepted
        self.requests = defaultdict(list)

    def _recent(self, identifier: str, now: float) -> List[float]:
        """Return the identifier's timestamps that are still inside the window.

        Read-only: ``.get()`` is used so that merely asking about an unknown
        identifier does not create an entry in the defaultdict.
        """
        return [
            stamp
            for stamp in self.requests.get(identifier, ())
            if now - stamp < self.window_seconds
        ]

    def is_allowed(self, identifier: str) -> bool:
        """Record a request for ``identifier`` if it is under the limit.

        Args:
            identifier: Key the limit is tracked under.

        Returns:
            True if the request was accepted (and recorded), False if the
            identifier already has ``max_requests`` requests in the window.
        """
        now = time.time()
        recent = self._recent(identifier, now)
        allowed = len(recent) < self.max_requests
        if allowed:
            recent.append(now)
        if recent:
            self.requests[identifier] = recent
        else:
            # Nothing left to remember for this identifier; drop its entry
            # rather than keeping an empty list around.
            self.requests.pop(identifier, None)
        return allowed

    def get_reset_time(self, identifier: str) -> int:
        """Return the whole seconds until the oldest in-window request expires.

        The value is rounded up, so a caller that waits this long finds the
        oldest request outside the window. Returns 0 when the identifier has
        no requests inside the window.
        """
        now = time.time()
        recent = self._recent(identifier, now)
        if not recent:
            return 0
        remaining = self.window_seconds - (now - min(recent))
        # ceil, not int(): truncating 0.5s to 0 would tell the caller to retry
        # immediately while the request is still counted.
        return max(0, math.ceil(remaining))
# Module-wide limiter instance: 10 requests per 60-second window
rate_limiter = RateLimiter(window_seconds=60, max_requests=10)
def fetch_sitemap() -> str:
    """Read the sitemap.xml content from the local file at SITEMAP_PATH.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the platform's locale encoding.

    Returns:
        The sitemap file's text.

    Raises:
        ValueError: If no file exists at SITEMAP_PATH.
    """
    try:
        return SITEMAP_PATH.read_text(encoding='utf-8')
    except FileNotFoundError as exc:
        raise ValueError(f"Sitemap file not found at {SITEMAP_PATH}") from exc
def parse_sitemap_urls(sitemap_xml: str) -> List[str]:
    """Parse sitemap XML and extract the URLs of its <url>/<loc> entries.

    Entries are looked up in the sitemaps.org 0.9 namespace first; if none are
    found, the lookup is repeated without a namespace. Surrounding whitespace
    in each <loc> is stripped and empty entries are skipped.

    Args:
        sitemap_xml: The sitemap document as text.

    Returns:
        The URLs in document order.

    Raises:
        ValueError: If the document cannot be parsed; the underlying error is
            attached as the cause.
    """
    try:
        root = ET.fromstring(sitemap_xml)
    except Exception as exc:
        raise ValueError(f"Failed to parse sitemap: {exc}") from exc

    namespace = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
    # Namespaced lookup first; fall back to un-namespaced documents
    loc_nodes = root.findall('.//ns:url/ns:loc', namespace) or root.findall('.//url/loc')

    # Pretty-printed sitemaps put newlines/indentation around the URL text
    stripped = ((node.text or '').strip() for node in loc_nodes)
    return [url for url in stripped if url]
@mcp.resource('sitemap://sitemap.xml')
def get_sitemap() -> str:
    """
    Expose the local sitemap.xml as the resource sitemap://sitemap.xml

    Returns:
        The text of the sitemap.xml file, as read by fetch_sitemap()
    """
    sitemap_text = fetch_sitemap()
    return sitemap_text
@mcp.tool()
def list_pages() -> List[str]:
    """
    Return the path of every page listed in sitemap.xml

    Each sitemap URL is reduced to its path component; a URL with an empty
    path counts as "/".

    Returns:
        Sorted list of unique page paths (e.g., ["/", "/blog", "/blog/post-1"])
    """
    locations = parse_sitemap_urls(fetch_sitemap())
    # A set drops duplicate paths before sorting
    unique_paths = {urlparse(location).path or '/' for location in locations}
    return sorted(unique_paths)
@mcp.tool()
def get_page(path: str, user_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Get the HTML content of a webpage on the configured website

    Args:
        path: Path to the webpage (e.g., "/blog/post-1")
        user_id: Optional user identifier for rate limiting; requests without
            one share the 'default' bucket

    Returns:
        On success, a dictionary with 'path', 'url', 'html', 'status_code' and
        'content_type'. On failure, a dictionary with an 'error' key
        ('Rate limit exceeded', 'Invalid path' or 'Failed to fetch page')
        and a human-readable 'message'.
    """
    # Rate limiting check
    identifier = user_id or 'default'
    if not rate_limiter.is_allowed(identifier):
        reset_time = rate_limiter.get_reset_time(identifier)
        return {
            'error': 'Rate limit exceeded',
            'message': f'Too many requests. Please wait {reset_time} seconds before trying again.',
            'reset_in_seconds': reset_time,
            'limit': '10 requests per minute',
        }

    # Normalise to exactly one leading slash. urljoin() treats a reference
    # starting with "//" as "//host/path", which would replace BASE_URL's host.
    path = '/' + path.lstrip('/')
    full_url = urljoin(BASE_URL, path)

    # Defence in depth: refuse anything that still resolves to a different
    # scheme or host than the configured website.
    base_parts = urlparse(BASE_URL)
    target_parts = urlparse(full_url)
    if (target_parts.scheme, target_parts.netloc) != (base_parts.scheme, base_parts.netloc):
        return {
            'error': 'Invalid path',
            'path': path,
            'url': full_url,
            'message': 'Path must resolve to a page on the configured website.',
        }

    try:
        # Imported lazily; a missing package is reported like any fetch failure
        import requests

        response = requests.get(full_url, timeout=10)
        response.raise_for_status()
        return {
            'path': path,
            'url': full_url,
            'html': response.text,
            'status_code': response.status_code,
            'content_type': response.headers.get('Content-Type', 'unknown'),
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising
        return {
            'error': 'Failed to fetch page',
            'path': path,
            'url': full_url,
            'message': str(e),
        }
def format_input_list(parameters: List[FunctionParameter]) -> str:
    """Render one ``'<name>': <name>`` line per parameter, joined by newlines."""
    rendered_lines = []
    for param in parameters:
        rendered_lines.append(f" '{param.name}': {param.name}")
    return "\n".join(rendered_lines)
def format_parameter_list(parameters: List[FunctionParameter]) -> str:
    """Render the parameters as a comma-separated ``<name>: <type>`` list."""
    return ", ".join(f"{param.name}: {param.type}" for param in parameters)
def _insert_after_marker(source: str, marker: str, snippet: str) -> str:
    """Return ``source`` with ``snippet`` inserted after the marker line.

    Only a line consisting solely of ``marker`` (ignoring surrounding
    whitespace) counts. A plain substring search would also match the marker
    text where it appears inside a string literal.

    Raises:
        ValueError: If no such line exists.
    """
    offset = 0
    for line in source.splitlines(keepends=True):
        if line.strip() == marker:
            insert_pos = offset + line.index(marker) + len(marker)
            return source[:insert_pos] + "\n\n" + snippet + source[insert_pos:]
        offset += len(line)
    raise ValueError(f"insertion marker not found: {marker!r}")


@mcp.tool()
async def add_tool(action_description: str) -> str:
    """
    Request to add a tool to the MCP server.

    Runs action_agent, task_agent and browser_loop (the latter limited to 180
    seconds) on the description, renders the outcome with tool_template and
    writes the generated code into this source file after the insertion
    marker line. No check for an already existing tool is performed.

    Args:
        action_description: The description of the action to add to the MCP server.

    Returns:
        One of:
        1. "Tool '<function_name>' added! Restart the MCP server to use it."
        2. "Error: <error_message>"
    """

    def log(message: str) -> None:
        # Progress goes to stderr: under the stdio transport, stdout carries
        # the protocol messages and must not receive free-form text.
        print(message, file=sys.stderr)

    # skip existence check for now
    state = State(
        action_description=action_description,
        website_url=BASE_URL,
    )

    log("1. action_agent...")
    state = await action_agent(state)
    log(f" ✓ {state.function_metadata.function_name}")

    log("2. task_agent...")
    state = await task_agent(state)
    log(" ✓ task created")

    log("3. browser_loop...")
    try:
        state = await asyncio.wait_for(browser_loop(state), timeout=180)
        log(f"\n✓ RESULT: {state.result}")
        log(f" Test results: {state.test_results}")
    except asyncio.TimeoutError:
        log("\n✗ Browser loop timed out after 3 minutes")
    except Exception as e:
        # Best effort: a failed loop leaves winning_task unset, handled below
        log(f"\n✗ Error: {e}")

    if state.winning_task is None:
        return "Error: tool cannot be added. Either the website lacks this feature, or the task is too complex."

    tool_code = tool_template.format(
        function_name=state.function_metadata.function_name,
        parameter_list=format_parameter_list(state.function_metadata.parameters),
        description=state.function_metadata.description,
        input_list=format_input_list(state.function_metadata.parameters),
        task_description=state.winning_task,
    )

    # Read this source file (explicit UTF-8: it contains non-ASCII characters)
    with open(__file__, 'r', encoding='utf-8') as f:
        content = f.read()

    # Insert the new tool after the marker line
    marker = "# INSERT TOOL CODE AFTER THIS LINE"
    try:
        new_content = _insert_after_marker(content, marker, tool_code)
    except ValueError as e:
        # Leave the file untouched rather than writing at a bogus position
        return f"Error: {e}"

    # Write it back
    with open(__file__, 'w', encoding='utf-8') as f:
        f.write(new_content)

    return f"Tool '{state.function_metadata.function_name}' added! Restart the MCP server to use it."
# INSERT TOOL CODE AFTER THIS LINE
def main():
    """Main entry point for the MCP server.

    Parses the command line, then either prints a short self-check
    (``--test``), serves over streamable HTTP (when a port is given via
    ``--port`` or the PORT environment variable and ``--stdio`` is absent),
    or serves over STDIO.

    Returns:
        0, intended as the process exit status.
    """
    import argparse

    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Webpage Server MCP Server')
    parser.add_argument('--port', type=int, help='Port for HTTP transport')
    # No argparse default: a default of '0.0.0.0' would always be truthy and
    # mask the host already resolved at module level (HOST env var / --host).
    parser.add_argument(
        '--host',
        type=str,
        default=None,
        help='Host for HTTP transport (default: HOST env var or 0.0.0.0)',
    )
    parser.add_argument('--stdio', action='store_true', help='Force STDIO transport')
    parser.add_argument('--test', action='store_true', help='Test mode')
    args = parser.parse_args()

    # Check if running in test mode
    if args.test:
        # Test mode - just verify everything loads
        print('Webpage Server MCP Server loaded successfully')
        print(f'Base URL: {BASE_URL}')
        print(f'Sitemap Path: {SITEMAP_PATH}')
        print('Tools available: list_pages, get_page')
        print('Resources available: sitemap.xml')
        return 0

    # Determine transport mode
    if (args.port or os.environ.get('PORT')) and not args.stdio:
        # HTTP transport mode
        actual_host = args.host or host
        actual_port = args.port or port
        # Apply the resolved values to the server so the address announced
        # below is the one that is served (also covers "--port=N" spellings
        # that the module-level argv scan does not recognise).
        mcp.settings.host = actual_host
        mcp.settings.port = actual_port
        print(f'Starting HTTP server on {actual_host}:{actual_port}')
        print(f'MCP endpoint: http://{actual_host}:{actual_port}/mcp')
        mcp.run(transport='streamable-http')
    else:
        # STDIO transport (default for MCP)
        mcp.run('stdio')
    return 0
if __name__ == '__main__':
    # Exit the process with whatever status main() returns
    raise SystemExit(main())