#!/usr/bin/env python3
"""
MCP Web Research Scraper Server
Converts the web scraper into an MCP tool for research automation
"""
import asyncio
import json
import logging
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
import mcp.server.stdio
import mcp.types as types
from .scraper import MCPWebScraper
from .database import DatabaseManager
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp-web-scraper")
app = Server("web-research-scraper")
class WebResearchScraperServer:
    def __init__(self):
        self.db_manager = DatabaseManager()
        self.scraper = MCPWebScraper(self.db_manager)
        self.active_sessions = {}
# Create server instance
server_instance = WebResearchScraperServer()
@app.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""List available tools"""
return [
types.Tool(
name="scrape_url",
description="Scrape a single URL for specific keywords",
inputSchema={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "URL to scrape"
},
"keywords": {
"type": "array",
"items": {"type": "string"},
"description": "Keywords to search for"
},
"extract_links": {
"type": "boolean",
"description": "Whether to extract and follow internal links",
"default": False
},
"max_depth": {
"type": "integer",
"description": "Maximum crawl depth if extract_links is True",
"default": 1
}
},
"required": ["url", "keywords"]
}
),
types.Tool(
name="search_and_scrape",
description="Search for a query and scrape the results",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
},
"keywords": {
"type": "array",
"items": {"type": "string"},
"description": "Keywords to search for in results"
},
"search_engine_url": {
"type": "string",
"description": "Search engine URL (defaults to SearXNG)",
"default": "https://searx.gophernuttz.us/search/"
},
"max_results": {
"type": "integer",
"description": "Maximum number of search results to process",
"default": 10
}
},
"required": ["query", "keywords"]
}
),
types.Tool(
name="get_scraping_results",
description="Get scraping results from the database",
inputSchema={
"type": "object",
"properties": {
"keyword_filter": {
"type": "string",
"description": "Filter results by specific keyword"
},
"limit": {
"type": "integer",
"description": "Maximum number of results to return",
"default": 50
}
}
}
),
types.Tool(
name="export_results",
description="Export scraping results to various formats",
inputSchema={
"type": "object",
"properties": {
"format": {
"type": "string",
"enum": ["json", "markdown", "csv"],
"description": "Export format",
"default": "json"
},
"keyword_filter": {
"type": "string",
"description": "Filter results by specific keyword"
},
"output_path": {
"type": "string",
"description": "Output file path (optional)"
}
},
"required": ["format"]
}
),
types.Tool(
name="get_scraping_stats",
description="Get scraping statistics and status",
inputSchema={
"type": "object",
"properties": {}
}
)
]
@app.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent]:
"""Handle tool calls"""
logger.info(f"Tool call: {name} with args: {arguments}")
try:
if name == "scrape_url":
result = await server_instance.scraper.scrape_url(
url=arguments["url"],
keywords=arguments["keywords"],
extract_links=arguments.get("extract_links", False),
max_depth=arguments.get("max_depth", 1)
)
return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "search_and_scrape":
result = await server_instance.scraper.search_and_scrape(
query=arguments["query"],
keywords=arguments["keywords"],
search_engine_url=arguments.get("search_engine_url", "https://searx.gophernuttz.us/search/"),
max_results=arguments.get("max_results", 10)
)
return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "get_scraping_results":
result = server_instance.db_manager.get_scraping_results(
keyword_filter=arguments.get("keyword_filter"),
limit=arguments.get("limit", 50)
)
return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "export_results":
result = server_instance.scraper.export_results(
format=arguments["format"],
keyword_filter=arguments.get("keyword_filter"),
output_path=arguments.get("output_path")
)
return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "get_scraping_stats":
result = server_instance.scraper.get_stats()
return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
else:
raise ValueError(f"Unknown tool: {name}")
except Exception as e:
logger.error(f"Tool error: {e}")
return [types.TextContent(type="text", text=json.dumps({"error": str(e)}, indent=2))]
@app.list_resources()
async def handle_list_resources() -> list[types.Resource]:
"""List available resources"""
return [
types.Resource(
uri="scraper://stats/overview",
name="Scraping Statistics",
description="Current scraping statistics and status"
),
types.Resource(
uri="scraper://config/settings",
name="Scraper Configuration",
description="Current scraper configuration settings"
)
]
@app.read_resource()
async def handle_read_resource(uri: str) -> str:
"""Read resources"""
if uri == "scraper://stats/overview":
stats = server_instance.scraper.get_stats()
return json.dumps(stats, indent=2)
elif uri == "scraper://config/settings":
config = server_instance.scraper.get_config()
return json.dumps(config, indent=2)
else:
raise ValueError(f"Unknown resource: {uri}")
async def main():
"""Main entry point"""
# Run the server
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
InitializationOptions(
server_name="web-research-scraper",
server_version="0.1.0",
capabilities=app.get_capabilities(
notification_options=None,
experimental_capabilities=None,
),
),
)
if __name__ == "__main__":
    asyncio.run(main())
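
# Example (untested sketch) of exercising this server from a stdio MCP client.
# It assumes the standard `mcp` client API and that the server is launched as
# `python server.py`; adjust the command/args for your environment. Kept as a
# comment so it does not affect the server module itself.
#
#   from mcp import ClientSession, StdioServerParameters
#   from mcp.client.stdio import stdio_client
#
#   async def demo():
#       params = StdioServerParameters(command="python", args=["server.py"])
#       async with stdio_client(params) as (read, write):
#           async with ClientSession(read, write) as session:
#               await session.initialize()
#               tools = await session.list_tools()  # lists the five tools defined above
#               result = await session.call_tool(
#                   "scrape_url",
#                   {"url": "https://example.com", "keywords": ["example"]},
#               )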