simple_mcp_server.py•4.48 kB
#!/usr/bin/env python3
"""
Simple MCP Server for FastAPI Tools
"""
import json
import sys
def main():
"""Simple MCP server that responds to JSON-RPC requests"""
print("MCP Server started. Listening for requests...", file=sys.stderr)
try:
while True:
line = sys.stdin.readline()
if not line:
break
try:
request = json.loads(line.strip())
response = handle_request(request)
print(json.dumps(response))
sys.stdout.flush()
except json.JSONDecodeError:
continue
except Exception as e:
error_response = {
"jsonrpc": "2.0",
"id": request.get("id"),
"error": {
"code": -32700,
"message": f"Parse error: {str(e)}"
}
}
print(json.dumps(error_response))
sys.stdout.flush()
except KeyboardInterrupt:
print("MCP Server shutting down...", file=sys.stderr)
def handle_request(request):
"""Handle MCP protocol requests"""
method = request.get("method")
request_id = request.get("id")
if method == "initialize":
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "fastapi-mcp-server",
"version": "1.0.0"
}
}
}
elif method == "tools/list":
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"tools": [
{
"name": "calculate_sum",
"description": "Calculate the sum of two numbers",
"inputSchema": {
"type": "object",
"properties": {
"a": {"type": "number", "description": "First number"},
"b": {"type": "number", "description": "Second number"}
},
"required": ["a", "b"]
}
},
{
"name": "greet_user",
"description": "Generate a greeting message for a user",
"inputSchema": {
"type": "object",
"properties": {
"name": {"type": "string", "description": "Name of the person to greet"}
},
"required": ["name"]
}
}
]
}
}
elif method == "tools/call":
params = request.get("params", {})
tool_name = params.get("name")
arguments = params.get("arguments", {})
if tool_name == "calculate_sum":
a = arguments.get("a", 0)
b = arguments.get("b", 0)
result = a + b
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [{"type": "text", "text": f"The sum of {a} and {b} is {result}"}]
}
}
elif tool_name == "greet_user":
name = arguments.get("name", "World")
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [{"type": "text", "text": f"Hello, {name}! Welcome to our MCP server!"}]
}
}
else:
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32601,
"message": f"Method not found: {tool_name}"
}
}
else:
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32601,
"message": f"Method not found: {method}"
}
}
if __name__ == "__main__":
main()