"""
Webpage Server MCP Server
A Model Context Protocol server for querying webpages and page contents
"""
import os
import sys
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, Any, List, Optional
from collections import defaultdict
import time
from dotenv import load_dotenv
from urllib.parse import urljoin, urlparse
import requests
# ChatOpenAI here is browser-use's bundled OpenAI chat wrapper; the openai SDK itself
# does not export a ChatOpenAI class.
from browser_use.llm import ChatOpenAI
from browser_use import Agent, Browser
import asyncio
from mcp.server.fastmcp import FastMCP
# Load environment variables
env_path = Path('.') / '.env.local'
if env_path.exists():
load_dotenv(env_path)
load_dotenv() # Also load .env if exists
# Get port from environment or command line args
port = int(os.environ.get('PORT', '8080'))
for i, arg in enumerate(sys.argv):
if arg == '--port' and i + 1 < len(sys.argv):
port = int(sys.argv[i + 1])
break
# Get host from environment or command line args
host = os.environ.get('HOST', '0.0.0.0')
for i, arg in enumerate(sys.argv):
if arg == '--host' and i + 1 < len(sys.argv):
host = sys.argv[i + 1]
break
mcp = FastMCP(
name='Webpage Server',
host=host,
port=port,
instructions="""This MCP server queries webpages and page contents of a specific website.
Available tools:
- list_pages(): List the path of all webpages
  Example: list_pages() -> ["/", "/blog", "/blog/post-1", "/marketplace"]
  Parse the sitemap.xml to get the list of pages.
  Return the page paths instead of the full URL.
- get_page(path): Get the HTML content of a webpage
  Example: get_page("/blog/post-1") -> "<html>...</html>"
  Get the HTML content by visiting the full URL.
- get_kush_email(): Extract the primary contact email address published on kush.pw
Resources:
- sitemap.xml: The sitemap of the website
This server includes rate limiting (10 requests/minute) to protect API keys.""",
)
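# When launched with --port (or the PORT env var), the tools below are served over
# streamable HTTP at http://<host>:<port>/mcp; otherwise the server speaks MCP over
# STDIO (see main() below).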
# Configuration
BASE_URL = os.getenv('BASE_URL', 'https://example.com')
SITEMAP_PATH = Path(__file__).parent.parent / 'assets' / 'sitemap.xml'
# Rate limiting for API protection
class RateLimiter:
"""Simple rate limiter to protect API keys from abuse"""
def __init__(self, max_requests: int = 10, window_seconds: int = 60):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = defaultdict(list)
def is_allowed(self, identifier: str) -> bool:
"""Check if request is allowed for this identifier"""
now = time.time()
# Clean old requests outside window
self.requests[identifier] = [
req_time
for req_time in self.requests[identifier]
if now - req_time < self.window_seconds
]
# Check if under limit
if len(self.requests[identifier]) < self.max_requests:
self.requests[identifier].append(now)
return True
return False
def get_reset_time(self, identifier: str) -> int:
"""Get seconds until rate limit resets"""
if not self.requests[identifier]:
return 0
oldest = min(self.requests[identifier])
return max(0, int(self.window_seconds - (time.time() - oldest)))
# Initialize rate limiter (10 requests per minute)
rate_limiter = RateLimiter(max_requests=10, window_seconds=60)
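# Illustrative usage of the limiter, independent of the tools below ("client-123" is a
# hypothetical identifier; get_page() passes its user_id argument instead):
#   if rate_limiter.is_allowed("client-123"):
#       ...  # handle the request
#   else:
#       retry_after = rate_limiter.get_reset_time("client-123")  # seconds until a slot frees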
def fetch_sitemap() -> str:
"""Read sitemap.xml content from local file"""
    if not SITEMAP_PATH.exists():
        raise FileNotFoundError(f"Sitemap file not found at {SITEMAP_PATH}")
return SITEMAP_PATH.read_text()
def parse_sitemap_urls(sitemap_xml: str) -> List[str]:
"""Parse sitemap XML and extract URLs"""
try:
root = ET.fromstring(sitemap_xml)
# Handle namespace
namespace = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
# Try with namespace first
urls = root.findall('.//ns:url/ns:loc', namespace)
if not urls:
# Try without namespace
urls = root.findall('.//url/loc')
return [url.text for url in urls if url.text]
    except Exception as e:
        raise ValueError(f"Failed to parse sitemap: {e}") from e
@mcp.resource('sitemap://sitemap.xml')
def get_sitemap() -> str:
"""
Serve the sitemap.xml file
Returns:
Content of the sitemap.xml file
"""
return fetch_sitemap()
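# MCP clients read this resource at the URI "sitemap://sitemap.xml" registered above.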
@mcp.tool()
def list_pages() -> List[str]:
"""
List the path of all webpages from sitemap.xml
Returns:
List of page paths (e.g., ["/", "/blog", "/blog/post-1"])
"""
sitemap_content = fetch_sitemap()
urls = parse_sitemap_urls(sitemap_content)
# Convert full URLs to paths
paths = []
for url in urls:
parsed = urlparse(url)
path = parsed.path or '/'
paths.append(path)
return sorted(set(paths)) # Remove duplicates and sort
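# The conversion above keeps only the URL path, e.g.:
#   "https://example.com/blog/post-1" -> "/blog/post-1"
#   "https://example.com"             -> "/"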
@mcp.tool()
async def get_kush_email() -> str:
"""
Extract the primary contact email address from kush.pw
    Returns:
        Email address string, an empty string if not found, or an
        "ERROR:"/"TIMEOUT:"-prefixed message on failure
"""
prompt = """
TASK DESCRIPTION:
Objective
- Determine and return the primary contact email address published by the website at
https://kush.pw. If none can be found after limited on-site review, return an empty string.
What to extract
- Single email address that best represents the site's primary contact.
- Acceptable sources (in priority order):
1) Email addresses found in mailto: links in anchor tags.
2) Email addresses present in visible text content that match a strict email pattern (after
de-obfuscation if needed).
3) Email addresses present in structured/scripted content (e.g., JSON-LD with an "email"
field, inline script objects, or other structured markup).
4) Email-like values in meta tags (e.g., reply-to).
- If the landing page does not provide a valid email, consult at most one additional
in-domain page that is likely to contain contact information (choose the first internal link
whose text or href indicates "contact", "about", "support", or "email"). Do not leave the
domain kush.pw or its subdomains. Do not consult more than two pages in total (landing page +
at most one internal page).
Candidate discovery and interpretation rules
- Consider as candidates:
- mailto: anchors (extract the address only, not parameters like ?subject=).
- Text that matches a strict email regex (e.g., name@domain.tld).
- De-obfuscated formats such as:
- name [at] domain [dot] com
- name(at)domain(dot)com
- name at domain dot com
- name (at) domain.com
- name [@] domain . com
- Structured/scripted content (e.g., application/ld+json, inline JS objects) with fields
named "email".
- Meta tags whose content resembles an email (e.g., reply-to).
- Ignore images/screenshots and content that cannot be reliably converted to text.
- Treat external email addresses as valid candidates, but prefer addresses at kush.pw or its
subdomains during ranking.
Normalization
- For each candidate:
- Trim whitespace.
- Decode HTML entities and URL-encoding if present (e.g., %40 -> @).
- Remove mailto: prefix and any query parameters/fragments (anything after ? or #).
- Replace common obfuscations (e.g., [at], (at), " at " -> @; [dot], (dot), " dot " -> .),
removing surrounding spaces.
- Strip surrounding punctuation such as trailing periods, commas, semicolons, colons,
closing parentheses/brackets/quotes.
- Normalize domain to lowercase (local-part can remain as-is).
- Validate using a robust email pattern:
- Exactly one @, non-empty local and domain parts.
- Domain contains at least one dot with a valid TLD.
- No leading/trailing dots or hyphens in domain labels; no consecutive dots.
Ranking and selection
- Rank valid candidates using these rules, in order:
1) Presence in a mailto: link (highest priority).
2) Domain relevance: addresses at kush.pw or any subdomain of kush.pw outrank external
domains.
3) Non-role and non-no-reply preference: personal or named addresses outrank role accounts
(e.g., info@, support@, contact@, admin@, sales@, hello@, webmaster@, team@) and
no-reply/noreply@ addresses.
- Tie-breakers if still equal after the above:
- Prefer candidates found on the landing page over the secondary page.
- If still tied, select the first encountered in page order.
Limits and failure conditions
- Total pages considered: maximum two (the landing page plus at most one internal page likely
to contain contact information).
- Respect typical redirects; do not proceed if the destination is off-domain.
- If network issues, non-HTML content, or no valid candidates are discovered, the result is
an empty string.
Final output format
- Output exactly one value: the selected email address as a plain string with no surrounding
whitespace, no "mailto:", and no additional text.
- If no valid email is found, output an empty string (no characters, no whitespace, no
explanatory text).
"""
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        return "ERROR: OPENAI_API_KEY is not set"
    llm = ChatOpenAI(model="gpt-4o", api_key=api_key, temperature=0)
    # headless=False opens a visible browser window; use headless=True on machines without a display.
    browser = Browser(headless=False)
    try:
        await browser.start()
        agent = Agent(task=prompt, browser=browser, llm=llm)
        history = await asyncio.wait_for(agent.run(max_steps=15), timeout=180)
        # agent.run() returns the run history; extract the agent's final text answer.
        final = history.final_result()
        return final if final else ""
except asyncio.TimeoutError:
return "TIMEOUT: Browser agent exceeded 180 seconds"
except Exception as e:
return f"ERROR: {str(e)}"
finally:
await browser.stop()
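# The prompt above delegates candidate normalization (mailto: stripping, %40 decoding,
# "[at]"/"[dot]" de-obfuscation) to the browser agent itself. The helper below is an
# illustrative, unused sketch of those rules in plain Python; it is not called by any
# tool and only covers the common obfuscation patterns listed in the prompt.
def _normalize_email_candidate(raw: str) -> str:
    """Rough sketch of the normalization described in the get_kush_email prompt."""
    import re  # local import: the sketch is self-contained and unused elsewhere
    s = raw.strip()
    if s.lower().startswith('mailto:'):
        s = s[len('mailto:'):]
    s = s.split('?')[0].split('#')[0]      # drop query parameters and fragments
    s = s.replace('%40', '@')              # decode a URL-encoded @
    # "[at]", "(at)", " at " -> "@"; "[dot]", "(dot)", " dot " -> "."
    s = re.sub(r'\s*[\[\(]\s*(?:at|@)\s*[\)\]]\s*|\s+at\s+', '@', s, flags=re.IGNORECASE)
    s = re.sub(r'\s*[\[\(]\s*(?:dot|\.)\s*[\)\]]\s*|\s+dot\s+', '.', s, flags=re.IGNORECASE)
    s = re.sub(r'\s*([@.])\s*', r'\1', s)  # collapse stray spaces around @ and .
    s = s.strip(' .,;:)([]"\'')            # strip surrounding punctuation
    local, sep, domain = s.partition('@')
    return f"{local}@{domain.lower()}" if sep and local and domain else s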
@mcp.tool()
def get_page(path: str, user_id: Optional[str] = None) -> Dict[str, Any]:
"""
Get the HTML content of a webpage
Args:
path: Path to the webpage (e.g., "/blog/post-1")
user_id: Optional user identifier for rate limiting
Returns:
Dictionary with HTML content and metadata
"""
# Rate limiting check
identifier = user_id or 'default'
if not rate_limiter.is_allowed(identifier):
reset_time = rate_limiter.get_reset_time(identifier)
return {
'error': 'Rate limit exceeded',
'message': f'Too many requests. Please wait {reset_time} seconds before trying again.',
'reset_in_seconds': reset_time,
            'limit': f'{rate_limiter.max_requests} requests per {rate_limiter.window_seconds} seconds',
}
# Construct full URL
if not path.startswith('/'):
path = '/' + path
full_url = urljoin(BASE_URL, path)
try:
response = requests.get(full_url, timeout=10)
response.raise_for_status()
return {
'path': path,
'url': full_url,
'html': response.text,
'status_code': response.status_code,
'content_type': response.headers.get('Content-Type', 'unknown'),
}
except Exception as e:
return {
'error': 'Failed to fetch page',
'path': path,
'url': full_url,
'message': str(e),
}
def main():
"""Main entry point for the MCP server"""
import argparse
# Parse command line arguments
parser = argparse.ArgumentParser(description='Webpage Server MCP Server')
parser.add_argument('--port', type=int, help='Port for HTTP transport')
parser.add_argument(
'--host', type=str, default='0.0.0.0', help='Host for HTTP transport'
)
parser.add_argument('--stdio', action='store_true', help='Force STDIO transport')
parser.add_argument('--test', action='store_true', help='Test mode')
args = parser.parse_args()
# Check if running in test mode
if args.test:
# Test mode - just verify everything loads
print('Webpage Server MCP Server loaded successfully')
print(f'Base URL: {BASE_URL}')
print(f'Sitemap Path: {SITEMAP_PATH}')
        print('Tools available: list_pages, get_page, get_kush_email')
print('Resources available: sitemap.xml')
return 0
# Determine transport mode
if (args.port or os.environ.get('PORT')) and not args.stdio:
# HTTP transport mode
        # The FastMCP instance was already configured with host/port above; these values are for logging.
        actual_host = args.host or host
        actual_port = args.port or port
print(f'Starting HTTP server on {actual_host}:{actual_port}')
print(f'MCP endpoint: http://{actual_host}:{actual_port}/mcp')
mcp.run(transport='streamable-http')
else:
# STDIO transport (default for MCP)
        mcp.run(transport='stdio')
return 0
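# Typical invocations (illustrative; substitute the actual module filename):
#   python server.py --test        # verify the module loads and print the configuration
#   python server.py --port 8080   # serve MCP over streamable HTTP on 0.0.0.0:8080
#   python server.py               # default: STDIO transport for local MCP clients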
if __name__ == '__main__':
    sys.exit(main())