"""Core MCP server implementation for Percepta."""
import asyncio
import logging
import json
import argparse
from typing import Any, Dict, List, Optional, Sequence, cast
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
Tool,
TextContent,
ImageContent,
EmbeddedResource
)
from .ai_router import get_ai_router
from .tools.browser_tools import BrowserAutomation
from .tools.visual_tools import VisualAnalysis
from .tools.web_tools import WebScraper
from .tools.test_generator import AutomatedTestGenerator
from .tools.devtools_analyzer import DevToolsAnalyzer
from .config import Settings, get_default_settings
logger = logging.getLogger(__name__)
class PerceptaMCPServer:
    """Main MCP server for Percepta."""

    def __init__(self, settings: Optional[Settings] = None):
        """Create the server and all tool backends.

        Args:
            settings: Optional configuration object; falls back to
                ``get_default_settings()`` when omitted.
        """
        self.settings = settings or get_default_settings()
        self.server = Server("percepta-mcp")
        self.ai_router = get_ai_router(self.settings)
        # Initialize tools
        self.browser_automation = BrowserAutomation(self.settings)
        self.visual_analysis = VisualAnalysis(self.settings)
        self.web_scraper = WebScraper(self.settings)
        # NOTE: test generator and DevTools analyzer reuse the router/browser
        # created above, so they must be constructed after them.
        self.test_generator = AutomatedTestGenerator(self.settings, self.ai_router, self.browser_automation)
        self.devtools_analyzer = DevToolsAnalyzer(self.settings, self.browser_automation)
        # Setup MCP handlers
        self._setup_handlers()
    def _setup_handlers(self):
        """Setup MCP protocol handlers.

        Registers two handlers on the underlying MCP ``Server``:
        ``list_tools`` (advertises every tool schema) and ``call_tool``
        (dispatches invocations to ``_execute_tool`` and converts results
        into MCP content objects).
        """
        @self.server.list_tools()
        async def handle_list_tools() -> List[Tool]:  # type: ignore
            """List available tools."""
            tools: List[Tool] = []
            # Browser automation tools
            tools.extend([
                Tool(
                    name="browser_navigate",
                    description="Navigate to a URL and capture a screenshot",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "url": {"type": "string", "description": "URL to navigate to"},
                            "wait_for": {"type": "string", "description": "CSS selector to wait for (optional)"},
                            "timeout": {"type": "number", "description": "Timeout in milliseconds (default: 30000)"}
                        },
                        "required": ["url"]
                    }
                ),
                Tool(
                    name="browser_click",
                    description="Click on an element in the browser",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "selector": {"type": "string", "description": "CSS selector or XPath of element to click"},
                            "wait_for_navigation": {"type": "boolean", "description": "Wait for navigation after click (default: false)"}
                        },
                        "required": ["selector"]
                    }
                ),
                Tool(
                    name="browser_fill",
                    description="Fill a form field in the browser",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "selector": {"type": "string", "description": "CSS selector of input field"},
                            "value": {"type": "string", "description": "Value to fill"},
                            "clear_first": {"type": "boolean", "description": "Clear field before filling (default: true)"}
                        },
                        "required": ["selector", "value"]
                    }
                ),
                Tool(
                    name="browser_screenshot",
                    description="Take a screenshot of the current page",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "full_page": {"type": "boolean", "description": "Capture full page (default: false)"},
                            "element_selector": {"type": "string", "description": "Capture specific element only"}
                        }
                    }
                ),
                Tool(
                    name="browser_extract_text",
                    description="Extract text from the current page or specific elements",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "selector": {"type": "string", "description": "CSS selector to extract text from (optional)"},
                            "include_links": {"type": "boolean", "description": "Include link URLs (default: false)"}
                        }
                    }
                ),
                Tool(
                    name="browser_wait_for_element",
                    description="Wait for an element to appear on the page",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "selector": {"type": "string", "description": "CSS selector to wait for"},
                            "timeout": {"type": "number", "description": "Timeout in milliseconds (default: 30000)"},
                            "state": {"type": "string", "description": "Element state to wait for (visible, hidden, attached, detached)"}
                        },
                        "required": ["selector"]
                    }
                )
            ])
            # Visual analysis tools
            tools.extend([
                Tool(
                    name="analyze_image",
                    description="Analyze an image using AI vision models",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "image_path": {"type": "string", "description": "Path to image file or base64 encoded image"},
                            "prompt": {"type": "string", "description": "Analysis prompt or question about the image"},
                            "model": {"type": "string", "description": "AI model to use for analysis (optional)"}
                        },
                        "required": ["image_path", "prompt"]
                    }
                ),
                Tool(
                    name="extract_text_from_image",
                    description="Extract text from an image using OCR",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "image_path": {"type": "string", "description": "Path to image file"},
                            "language": {"type": "string", "description": "OCR language (default: eng)"},
                            "preprocess": {"type": "boolean", "description": "Apply image preprocessing (default: true)"}
                        },
                        "required": ["image_path"]
                    }
                ),
                Tool(
                    name="compare_images",
                    description="Compare two images and find differences",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "image1_path": {"type": "string", "description": "Path to first image"},
                            "image2_path": {"type": "string", "description": "Path to second image"},
                            "threshold": {"type": "number", "description": "Difference threshold (0.0-1.0, default: 0.1)"}
                        },
                        "required": ["image1_path", "image2_path"]
                    }
                )
            ])
            # Web scraping tools
            tools.extend([
                Tool(
                    name="scrape_website",
                    description="Scrape content from a website",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "url": {"type": "string", "description": "URL to scrape"},
                            "selectors": {"type": "array", "items": {"type": "string"}, "description": "CSS selectors to extract"},
                            "follow_links": {"type": "boolean", "description": "Follow internal links (default: false)"},
                            "max_pages": {"type": "number", "description": "Maximum pages to scrape (default: 1)"}
                        },
                        "required": ["url"]
                    }
                ),
                Tool(
                    name="extract_structured_data",
                    description="Extract structured data from HTML",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "html": {"type": "string", "description": "HTML content to parse"},
                            "schema": {"type": "object", "description": "JSON schema for extraction"},
                            "ai_assist": {"type": "boolean", "description": "Use AI to assist extraction (default: false)"}
                        },
                        "required": ["html"]
                    }
                )
            ])
            # AI interaction tools
            tools.extend([
                Tool(
                    name="ai_analyze",
                    description="Analyze content using AI models",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "content": {"type": "string", "description": "Content to analyze"},
                            "task": {"type": "string", "description": "Analysis task or prompt"},
                            "model": {"type": "string", "description": "Specific AI model to use (optional)"},
                            "context": {"type": "object", "description": "Additional context for analysis"}
                        },
                        "required": ["content", "task"]
                    }
                ),
                Tool(
                    name="ai_generate",
                    description="Generate content using AI models",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "prompt": {"type": "string", "description": "Generation prompt"},
                            "model": {"type": "string", "description": "Specific AI model to use (optional)"},
                            "max_tokens": {"type": "number", "description": "Maximum tokens to generate"},
                            "temperature": {"type": "number", "description": "Generation temperature (0.0-2.0)"}
                        },
                        "required": ["prompt"]
                    }
                ),
                Tool(
                    name="ai_chat",
                    description="Interactive chat with AI models",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "message": {"type": "string", "description": "Chat message"},
                            "conversation_id": {"type": "string", "description": "Conversation ID for context"},
                            "model": {"type": "string", "description": "Specific AI model to use (optional)"},
                            "system_prompt": {"type": "string", "description": "System prompt for the conversation"}
                        },
                        "required": ["message"]
                    }
                )
            ])
            # Test generation tools
            tools.extend([
                Tool(
                    name="generate_test_case",
                    description="Generate automated test cases for web pages",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "url": {"type": "string", "description": "URL of the page to test"},
                            "description": {"type": "string", "description": "Description of the test scenario"},
                            "test_type": {"type": "string", "enum": ["navigation", "form", "visual", "accessibility", "e2e"], "description": "Type of test to generate"},
                            "elements": {"type": "array", "items": {"type": "object"}, "description": "Specific elements to test"}
                        },
                        "required": ["url", "description"]
                    }
                ),
                Tool(
                    name="execute_test_case",
                    description="Execute a previously generated test case",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "test_case_path": {"type": "string", "description": "Path to the test case file"},
                            "timeout": {"type": "number", "description": "Test execution timeout in seconds"}
                        },
                        "required": ["test_case_path"]
                    }
                ),
                Tool(
                    name="generate_and_execute_test",
                    description="Generate and immediately execute a test case",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "url": {"type": "string", "description": "URL of the page to test"},
                            "description": {"type": "string", "description": "Description of the test scenario"},
                            "test_type": {"type": "string", "enum": ["navigation", "form", "visual", "accessibility", "e2e"], "description": "Type of test to generate"},
                            "elements": {"type": "array", "items": {"type": "object"}, "description": "Specific elements to test"}
                        },
                        "required": ["url", "description"]
                    }
                )
            ])
            # DevTools analysis tools
            tools.extend([
                Tool(
                    name="start_devtools_monitoring",
                    description="Start monitoring DevTools data for a web page",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "url": {"type": "string", "description": "URL to monitor"},
                            "monitoring_types": {"type": "array", "items": {"type": "string"}, "description": "Types of data to monitor (console, network, performance)"}
                        },
                        "required": ["url"]
                    }
                ),
                Tool(
                    name="collect_devtools_data",
                    description="Collect current DevTools data from monitored page",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "include_raw_data": {"type": "boolean", "description": "Include raw DevTools data in response"}
                        }
                    }
                ),
                Tool(
                    name="analyze_devtools_anomalies",
                    description="Analyze DevTools data to detect anomalies and issues",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "severity_filter": {"type": "string", "enum": ["all", "high", "medium", "low"], "description": "Filter anomalies by severity level"}
                        }
                    }
                ),
                Tool(
                    name="generate_devtools_report",
                    description="Generate comprehensive DevTools analysis report",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "include_recommendations": {"type": "boolean", "description": "Include improvement recommendations"},
                            "format": {"type": "string", "enum": ["json", "html", "markdown"], "description": "Report format"}
                        }
                    }
                ),
                Tool(
                    name="monitor_and_analyze_page",
                    description="Complete monitoring and analysis of a web page",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "url": {"type": "string", "description": "URL to monitor and analyze"},
                            "duration_seconds": {"type": "number", "description": "Monitoring duration in seconds (default: 30)"},
                            "include_performance": {"type": "boolean", "description": "Include performance analysis"}
                        },
                        "required": ["url"]
                    }
                )
            ])
            return tools

        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> Sequence[TextContent | ImageContent | EmbeddedResource]:  # type: ignore
            """Handle tool calls.

            Delegates to ``_execute_tool`` and wraps the raw result as MCP
            content: dicts containing an "image" key become ImageContent,
            everything else is serialized to TextContent. Exceptions are
            reported as an error TextContent rather than propagated.
            """
            try:
                result = await self._execute_tool(name, arguments)
                # Convert result to MCP content format
                if isinstance(result, dict):
                    result_dict = cast(Dict[str, Any], result)
                    if "image" in result_dict:
                        # Type-safe dictionary access
                        image_data = result_dict.get("image")
                        mime_type = result_dict.get("mime_type", "image/png")
                        if image_data is not None:
                            return [
                                ImageContent(
                                    type="image",
                                    data=str(image_data),  # Ensure string type
                                    mimeType=str(mime_type)  # Ensure string type
                                )
                            ]
                    # Dicts without usable image data fall through to JSON text
                    return [TextContent(type="text", text=json.dumps(result_dict, indent=2))]
                elif isinstance(result, str):
                    return [TextContent(type="text", text=result)]
                else:
                    return [TextContent(type="text", text=str(result))]
            except Exception as e:
                logger.error(f"Tool execution failed: {name} - {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]
async def _execute_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
"""Execute a specific tool."""
# Browser automation tools
if name == "browser_navigate":
return await self.browser_automation.navigate(**arguments)
elif name == "browser_click":
return await self.browser_automation.click(**arguments)
elif name == "browser_fill":
return await self.browser_automation.fill(**arguments)
elif name == "browser_screenshot":
return await self.browser_automation.screenshot(**arguments)
elif name == "browser_extract_text":
return await self.browser_automation.extract_text(**arguments)
elif name == "browser_wait_for_element":
return await self.browser_automation.wait_for_element(**arguments)
# Visual analysis tools
elif name == "analyze_image":
return await self.visual_analysis.analyze_image(**arguments)
elif name == "extract_text_from_image":
return await self.visual_analysis.extract_text(**arguments)
elif name == "compare_images":
return await self.visual_analysis.compare_images(**arguments)
# Web scraping tools
elif name == "scrape_website":
return await self.web_scraper.scrape(**arguments)
elif name == "extract_structured_data":
# Use scrape with structured extraction type
return await self.web_scraper.scrape(extract_type="structured", **arguments)
# AI interaction tools
elif name == "ai_analyze":
return await self._ai_analyze(**arguments)
elif name == "ai_generate":
return await self._ai_generate(**arguments)
elif name == "ai_chat":
return await self._ai_chat(**arguments)
# Test generation tools
elif name == "generate_test_case":
return await self.test_generator.generate_test_case(**arguments)
elif name == "execute_test_case":
return await self.test_generator.execute_test_case(**arguments)
elif name == "generate_and_execute_test":
return await self.test_generator.generate_and_execute_test(**arguments)
# DevTools analysis tools
elif name == "start_devtools_monitoring":
return await self.devtools_analyzer.start_monitoring(**arguments)
elif name == "collect_devtools_data":
return await self.devtools_analyzer.collect_devtools_data(**arguments)
elif name == "analyze_devtools_anomalies":
return await self.devtools_analyzer.analyze_anomalies(**arguments)
elif name == "generate_devtools_report":
return await self.devtools_analyzer.generate_feedback_report(**arguments)
elif name == "monitor_and_analyze_page":
return await self.devtools_analyzer.monitor_and_analyze(**arguments)
else:
raise ValueError(f"Unknown tool: {name}")
async def _ai_analyze(self, content: str, task: str, model: Optional[str] = None, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Analyze content using AI."""
prompt = f"Task: {task}\n\nContent to analyze:\n{content}"
if context:
prompt += f"\n\nAdditional context:\n{json.dumps(context, indent=2)}"
response = await self.ai_router.generate(prompt, provider_name=model)
return {
"analysis": response.content,
"provider": response.provider,
"model": response.model,
"tokens_used": response.tokens_used,
"cost": response.cost,
"response_time": response.response_time,
"error": response.error
}
async def _ai_generate(self, prompt: str, model: Optional[str] = None, **kwargs: Any) -> Dict[str, Any]:
"""Generate content using AI."""
response = await self.ai_router.generate(prompt, provider_name=model, **kwargs)
return {
"content": response.content,
"generated_content": response.content,
"provider": response.provider,
"model": response.model,
"tokens_used": response.tokens_used,
"cost": response.cost,
"response_time": response.response_time,
"error": response.error
}
async def _ai_chat(self, message: str, conversation_id: Optional[str] = None,
model: Optional[str] = None, system_prompt: Optional[str] = None) -> Dict[str, Any]:
"""Interactive chat with AI."""
# TODO: Implement conversation context management
prompt = message
if system_prompt:
prompt = f"System: {system_prompt}\n\nUser: {message}"
response = await self.ai_router.generate(prompt, provider_name=model)
return {
"reply": response.content,
"conversation_id": conversation_id or "default",
"provider": response.provider,
"model": response.model,
"tokens_used": response.tokens_used,
"cost": response.cost,
"response_time": response.response_time,
"error": response.error
}
    async def start(self):
        """Start the MCP server.

        Only logs startup; no eager resource acquisition happens here.
        """
        logger.info("Starting Percepta MCP Server...")
        # Browser automation initializes on demand
        logger.info("Percepta MCP Server started successfully")
async def stop(self):
"""Stop the MCP server."""
logger.info("Stopping Percepta MCP Server...")
# Cleanup resources - Make sure both close methods are called even if one fails
browser_error = None
try:
await self.browser_automation.close()
except Exception as e:
logger.error(f"Error during browser automation cleanup: {e}")
browser_error = e
try:
await self.web_scraper.close()
except Exception as e:
logger.error(f"Error during web scraper cleanup: {e}")
if browser_error:
logger.error(f"Error during cleanup: {browser_error}")
logger.info("Percepta MCP Server stopped")
    def get_server(self) -> Server[Any, Any]:
        """Get the underlying MCP server instance."""
        return self.server
async def main():
    """Main entry point for running the server.

    Parses CLI arguments, configures logging, loads settings (YAML file if
    ``--config`` is given, defaults otherwise), and runs the MCP server over
    the stdio transport until the stream closes or an error occurs.
    """
    # NOTE: the redundant function-local `import argparse` was removed;
    # argparse is already imported at module level.
    parser = argparse.ArgumentParser(description="Percepta MCP Server")
    parser.add_argument("--config", type=str, help="Path to configuration file")
    parser.add_argument("--log-level", type=str, default="INFO", help="Log level")
    args = parser.parse_args()
    # Setup logging
    logging.basicConfig(
        level=getattr(logging, args.log_level.upper()),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    # Load settings
    if args.config:
        settings = Settings.load_from_yaml(args.config)
    else:
        settings = get_default_settings()
    # Create server
    percepta_server = PerceptaMCPServer(settings)
    # Run the server using stdio; always release resources on the way out
    try:
        async with stdio_server() as (read_stream, write_stream):
            await percepta_server.start()
            logger.info("Percepta MCP Server started with stdio transport")
            # Run the server
            await percepta_server.server.run(
                read_stream,
                write_stream,
                percepta_server.server.create_initialization_options()
            )
    except Exception as e:
        logger.error(f"Server error: {e}")
        raise
    finally:
        await percepta_server.stop()


if __name__ == "__main__":
    asyncio.run(main())