# server.py • 12.4 kB
#!/usr/bin/env python3
"""
MCP Server Implementation (Python)
This server provides various tools and resources that AI models can use
through the Model Context Protocol (MCP).
"""
import asyncio
import json
import os
import platform
import subprocess
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

try:
    from mcp.server import Server, NotificationOptions
    from mcp.server.models import InitializationOptions
    import mcp.server.stdio
    import mcp.types as types
except ImportError:
    print("MCP SDK not installed. Install with: pip install mcp")
    sys.exit(1)

import psutil
import requests
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
load_dotenv()
class MCPServer:
    """MCP server exposing file, system, command and URL-fetch tools.

    One instance owns two front ends:

    * ``self.server`` -- the MCP ``Server`` on which the tool handlers are
      registered; it is driven over stdio by :meth:`start_stdio`.
    * ``self.app`` -- a FastAPI application served by :meth:`start_http`,
      offering ``/health`` and a placeholder ``/mcp`` route.
    """

    def __init__(self):
        """Build the FastAPI app and the MCP server, then register handlers."""
        self.app = FastAPI(title="MCP Server", version="1.0.0")
        self.setup_fastapi()
        self.server = Server("custom-mcp-server")
        self.setup_handlers()

    def setup_fastapi(self):
        """Setup FastAPI server for HTTP interface"""
        # NOTE(review): wildcard origins together with allow_credentials=True
        # is a very permissive CORS policy; restrict the origin list before
        # exposing this interface beyond a trusted network.
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

        @self.app.get("/health")
        async def health_check():
            return {
                "status": "healthy",
                # Wall-clock epoch seconds. The event loop clock used
                # previously is monotonic with no defined reference point,
                # so it was meaningless to a client as a timestamp.
                "timestamp": time.time(),
            }

        @self.app.post("/mcp")
        async def mcp_endpoint(request: dict):
            # Handle MCP requests over HTTP
            return {"message": "MCP HTTP transport not yet implemented"}

    def setup_handlers(self):
        """Setup MCP request handlers"""

        @self.server.list_tools()
        async def handle_list_tools() -> list[types.Tool]:
            """List available tools"""
            return [
                types.Tool(
                    name="read_file",
                    description="Read the contents of a file",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "The path to the file to read",
                            }
                        },
                        "required": ["path"],
                    },
                ),
                types.Tool(
                    name="write_file",
                    description="Write content to a file",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "The path to the file to write",
                            },
                            "content": {
                                "type": "string",
                                "description": "The content to write to the file",
                            },
                        },
                        "required": ["path", "content"],
                    },
                ),
                types.Tool(
                    name="list_directory",
                    description="List the contents of a directory",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "The path to the directory to list",
                            }
                        },
                        "required": ["path"],
                    },
                ),
                types.Tool(
                    name="get_system_info",
                    description="Get system information",
                    inputSchema={
                        "type": "object",
                        "properties": {},
                    },
                ),
                types.Tool(
                    name="execute_command",
                    description="Execute a system command (use with caution)",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "command": {
                                "type": "string",
                                "description": "The command to execute",
                            },
                            "cwd": {
                                "type": "string",
                                "description": "Working directory for the command",
                            },
                        },
                        "required": ["command"],
                    },
                ),
                types.Tool(
                    name="fetch_url",
                    description="Fetch content from a URL",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "url": {
                                "type": "string",
                                "description": "The URL to fetch",
                            },
                            "method": {
                                "type": "string",
                                "description": "HTTP method (GET, POST, etc.)",
                                "default": "GET",
                            },
                            "headers": {
                                "type": "object",
                                "description": "HTTP headers to include",
                            },
                        },
                        "required": ["url"],
                    },
                ),
            ]

        @self.server.call_tool()
        async def handle_call_tool(
            name: str, arguments: dict
        ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
            """Dispatch a tool call by name.

            Any failure (unknown tool, missing argument, tool error) is
            returned as a single text item of the form ``Error: <message>``
            instead of being raised.
            """
            # Tolerate a call that carries no arguments object at all.
            arguments = arguments or {}

            def required(key: str):
                # A bare KeyError would render as just "'path'"; name the
                # problem so the caller can correct the request.
                if key not in arguments:
                    raise ValueError(f"Missing required argument: {key}")
                return arguments[key]

            try:
                if name == "read_file":
                    return await self.read_file(required("path"))
                elif name == "write_file":
                    return await self.write_file(required("path"), required("content"))
                elif name == "list_directory":
                    return await self.list_directory(required("path"))
                elif name == "get_system_info":
                    return await self.get_system_info()
                elif name == "execute_command":
                    return await self.execute_command(
                        required("command"), arguments.get("cwd")
                    )
                elif name == "fetch_url":
                    return await self.fetch_url(
                        required("url"),
                        arguments.get("method", "GET"),
                        arguments.get("headers", {}),
                    )
                else:
                    raise ValueError(f"Unknown tool: {name}")
            except Exception as error:
                return [types.TextContent(type="text", text=f"Error: {str(error)}")]

    # Tool implementations
    async def read_file(self, file_path: str) -> list[types.TextContent]:
        """Read a file as UTF-8 text.

        Args:
            file_path: Path of the file to read.

        Returns:
            A single text item holding the file contents.

        Raises:
            Exception: ``Failed to read file: ...`` wrapping the underlying
                error, which is kept as ``__cause__``.
        """
        try:
            path = Path(file_path)
            content = path.read_text(encoding="utf-8")
            return [types.TextContent(type="text", text=content)]
        except Exception as error:
            raise Exception(f"Failed to read file: {str(error)}") from error

    async def write_file(self, file_path: str, content: str) -> list[types.TextContent]:
        """Write text to a file as UTF-8, creating parent directories.

        Args:
            file_path: Path of the file to write; an existing file is
                overwritten.
            content: Text to store.

        Returns:
            A single text item confirming the write.

        Raises:
            Exception: ``Failed to write file: ...`` wrapping the underlying
                error, which is kept as ``__cause__``.
        """
        try:
            path = Path(file_path)
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(content, encoding="utf-8")
            return [types.TextContent(type="text", text=f"Successfully wrote to {file_path}")]
        except Exception as error:
            raise Exception(f"Failed to write file: {str(error)}") from error

    async def list_directory(self, dir_path: str) -> list[types.TextContent]:
        """List the immediate entries of a directory.

        Args:
            dir_path: Path of the directory to list.

        Returns:
            A single text item containing a JSON array; each element has
            ``name``, ``type`` (``"directory"`` or ``"file"``) and ``path``.

        Raises:
            Exception: ``Failed to list directory: ...`` wrapping the
                underlying error, which is kept as ``__cause__``.
        """
        try:
            items = [
                {
                    "name": item.name,
                    "type": "directory" if item.is_dir() else "file",
                    "path": str(item),
                }
                for item in Path(dir_path).iterdir()
            ]
            return [types.TextContent(type="text", text=json.dumps(items, indent=2))]
        except Exception as error:
            raise Exception(f"Failed to list directory: {str(error)}") from error

    async def get_system_info(self) -> list[types.TextContent]:
        """Report basic facts about the host as a JSON document.

        Returns:
            A single text item containing a JSON object with platform,
            architecture, hostname, CPU count, total/available memory in GB,
            uptime in seconds, Python version and current directory.
        """
        # Take one memory sample so both figures describe the same instant.
        memory = psutil.virtual_memory()
        # boot_time() is the epoch timestamp of the boot, not a duration;
        # uptime is the distance from that moment to now.
        uptime_seconds = time.time() - psutil.boot_time()
        info = {
            "platform": platform.system(),
            "arch": platform.machine(),
            "hostname": platform.node(),
            "cpus": psutil.cpu_count(),
            "totalMemory": f"{round(memory.total / (1024**3))} GB",
            "freeMemory": f"{round(memory.available / (1024**3))} GB",
            "uptime": f"{round(uptime_seconds)} seconds",
            "pythonVersion": sys.version,
            "currentDirectory": str(Path.cwd()),
        }
        return [types.TextContent(type="text", text=json.dumps(info, indent=2))]

    @staticmethod
    def _format_command_output(command: str, stdout: str, stderr: str) -> str:
        """Render a finished command as the text report returned to callers."""
        output = f"Command: {command}\nOutput:\n{stdout}"
        if stderr:
            output += f"\nErrors:\n{stderr}"
        return output

    async def execute_command(self, command: str, cwd: Optional[str] = None) -> list[types.TextContent]:
        """Run a shell command and return its captured output.

        Args:
            command: Command line, interpreted by the system shell.
            cwd: Working directory; defaults to the current directory.

        Returns:
            A single text item with the command, its stdout and, when
            non-empty, its stderr.

        Raises:
            Exception: ``Command execution failed: ...`` wrapping the
                underlying error (including the 30 second timeout), which is
                kept as ``__cause__``.
        """
        try:
            # NOTE(review): shell=True hands the caller-supplied string to the
            # shell unmodified. That is what this tool is for, but it means
            # the server must only be reachable by trusted clients.
            #
            # subprocess.run blocks, so it is pushed to a worker thread to
            # keep the event loop free while the command runs.
            result = await asyncio.to_thread(
                subprocess.run,
                command,
                shell=True,
                capture_output=True,
                text=True,
                # Do not let the child inherit our stdin: under the stdio
                # transport that is the MCP channel and the child could
                # consume protocol data from it.
                stdin=subprocess.DEVNULL,
                cwd=cwd or Path.cwd(),
                timeout=30,
            )
            output = self._format_command_output(command, result.stdout, result.stderr)
            return [types.TextContent(type="text", text=output)]
        except Exception as error:
            raise Exception(f"Command execution failed: {str(error)}") from error

    async def fetch_url(
        self, url: str, method: str = "GET", headers: Optional[dict] = None
    ) -> list[types.TextContent]:
        """Perform an HTTP request and return the response as JSON text.

        Args:
            url: Address to request.
            method: HTTP method name.
            headers: Request headers; ``None`` sends no extra headers.

        Returns:
            A single text item containing a JSON object with ``status``,
            ``statusText``, ``headers`` and ``content`` (the body as text).

        Raises:
            Exception: ``Failed to fetch URL: ...`` wrapping the underlying
                error (including the 30 second timeout), which is kept as
                ``__cause__``.
        """
        try:
            # requests is synchronous; run it in a worker thread so a slow
            # remote host does not stall the event loop.
            response = await asyncio.to_thread(
                requests.request,
                method=method,
                url=url,
                headers=headers or {},
                timeout=30,
            )
            response_info = {
                "status": response.status_code,
                "statusText": response.reason,
                "headers": dict(response.headers),
                "content": response.text,
            }
            return [types.TextContent(type="text", text=json.dumps(response_info, indent=2))]
        except Exception as error:
            raise Exception(f"Failed to fetch URL: {str(error)}") from error

    async def start_stdio(self):
        """Start MCP server with stdio transport"""
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="custom-mcp-server",
                    server_version="1.0.0",
                    capabilities=self.server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )

    def start_http(self, port: int = 3000):
        """Serve the FastAPI app with uvicorn on all interfaces.

        Args:
            port: TCP port to listen on.
        """
        print(f"Starting MCP Server HTTP interface on port {port}")
        print(f"Health check: http://localhost:{port}/health")
        # NOTE(review): 0.0.0.0 listens on every network interface.
        uvicorn.run(self.app, host="0.0.0.0", port=port)
def main():
    """Main entry point.

    Starts the HTTP interface when the first command-line argument is
    ``--http`` (port taken from the ``PORT`` environment variable, default
    3000); otherwise runs the MCP server over stdio.
    """
    server = MCPServer()
    # Check if we should start HTTP server or stdio
    if len(sys.argv) > 1 and sys.argv[1] == "--http":
        port = int(os.getenv("PORT", "3000"))
        server.start_http(port)
    else:
        # Under the stdio transport stdout carries the protocol messages, so
        # the human-readable banner must go to stderr to avoid corrupting
        # the stream the client is parsing.
        print("MCP Server started with stdio transport", file=sys.stderr)
        asyncio.run(server.start_stdio())


if __name__ == "__main__":
    main()