#!/usr/bin/env python3
"""
Calculator MCP Server with JWT Authentication
Requires: pip install PyJWT mcp
"""
import asyncio
import os
import sys
from datetime import datetime, timedelta, timezone
from typing import Any

import jwt
import mcp.server.stdio
from mcp.server import Server
from mcp.types import Tool, TextContent
# JWT configuration.
# The signing secret is read from the JWT_SECRET environment variable; when the
# variable is unset, the placeholder literal below is used instead.
# NOTE(review): the placeholder is a publicly visible value, so tokens signed
# with it can be forged by anyone -- set JWT_SECRET in any real deployment.
JWT_SECRET = os.getenv("JWT_SECRET", "your-secret-key-change-this")
# Algorithm name passed to both jwt.encode and jwt.decode in this file.
JWT_ALGORITHM = "HS256"
# MCP server instance; the tool handlers in this file register on it through
# the @app.list_tools() and @app.call_tool() decorators.
app = Server("calculator-jwt-server")
def verify_jwt_token(token: str) -> bool:
    """Return True only for a valid, unexpired token granting "calculator".

    The signature and the ``exp`` claim (when present) are checked by
    ``jwt.decode``. The decoded payload must then carry a ``permissions``
    list containing the exact entry ``"calculator"``.

    Args:
        token: Encoded JWT as received from the caller.

    Returns:
        True when the token verifies and grants the calculator permission.
        False for every failure: malformed token, bad signature, expired
        token, or a missing/mistyped ``permissions`` claim. Never raises.
    """
    try:
        # jwt.decode verifies the signature and rejects an expired "exp"
        # claim on its own (ExpiredSignatureError is a subclass of
        # InvalidTokenError), so no separate clock comparison is needed.
        payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.InvalidTokenError:
        return False
    except Exception:
        # Fail closed: an unexpected decoding error must never grant access.
        return False

    permissions = payload.get("permissions", [])
    # Accept only a real list. With a bare string the membership test below
    # would degrade into a substring search, so a claim such as
    # "calculator-readonly" would wrongly be treated as a grant.
    if not isinstance(permissions, list):
        return False
    return "calculator" in permissions
def generate_token(user_id: str, permissions: list) -> str:
    """Create a signed JWT for *user_id* that expires in 24 hours (for testing).

    Args:
        user_id: Identifier stored in the ``user_id`` claim.
        permissions: Permission names stored in the ``permissions`` claim.

    Returns:
        The token produced by ``jwt.encode`` with JWT_SECRET and
        JWT_ALGORITHM.
    """
    # Use a timezone-aware UTC datetime. PyJWT converts datetime claims with
    # utctimetuple(), which reads a naive local time as if it were already
    # UTC and would shift the expiry by the machine's UTC offset.
    expires_at = datetime.now(timezone.utc) + timedelta(hours=24)
    payload = {
        "user_id": user_id,
        "permissions": permissions,
        "exp": expires_at,
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
@app.list_tools()
async def list_tools() -> list[Tool]:
    """Describe the tools this server exposes.

    Returns:
        Two ``Tool`` entries, in this order: ``add`` (takes numbers ``a`` and
        ``b`` plus a ``jwt_token`` string, all required) and
        ``generate_test_token`` (takes a required ``user_id`` string).
    """
    # JSON Schema for the "add" tool's arguments.
    add_schema = {
        "type": "object",
        "properties": {
            "a": {"type": "number"},
            "b": {"type": "number"},
            "jwt_token": {
                "type": "string",
                "description": "JWT authentication token",
            },
        },
        "required": ["a", "b", "jwt_token"],
    }
    # JSON Schema for the "generate_test_token" tool's arguments.
    token_schema = {
        "type": "object",
        "properties": {
            "user_id": {"type": "string", "description": "User identifier"},
        },
        "required": ["user_id"],
    }

    add_tool = Tool(
        name="add",
        description="Add two numbers (requires JWT token)",
        inputSchema=add_schema,
    )
    token_tool = Tool(
        name="generate_test_token",
        description="Generate a test JWT token (for development only)",
        inputSchema=token_schema,
    )
    return [add_tool, token_tool]
def _text_result(message: str) -> list[TextContent]:
    """Wrap *message* in the single-item text response every tool returns."""
    return [TextContent(type="text", text=message)]


def _is_number(value: Any) -> bool:
    """Return True for int/float operands; bool is rejected explicitly."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    """Dispatch a tool call, enforcing JWT authentication where required.

    ``generate_test_token`` is served without authentication. Every other
    tool name requires a ``jwt_token`` argument accepted by
    ``verify_jwt_token``.

    Args:
        name: Tool name requested by the client.
        arguments: Tool arguments; anything that is not a dict is treated as
            an empty argument set.

    Returns:
        A one-element list holding a text response: the tool result on
        success, or a message starting with ``"Error:"`` on failure.
    """
    # A missing or malformed arguments object must not crash the handler.
    if not isinstance(arguments, dict):
        arguments = {}

    if name == "generate_test_token":
        # NOTE(review): this branch issues a token carrying the "calculator"
        # permission to any caller, without authentication. It is labelled
        # development-only; confirm it is not reachable in production.
        user_id = arguments.get("user_id", "test_user")
        token = generate_token(user_id, ["calculator"])
        return _text_result(
            f"Test JWT Token: {token}\n\nThis token is valid for 24 hours."
        )

    # Every remaining tool requires a valid token.
    if not verify_jwt_token(arguments.get("jwt_token", "")):
        return _text_result(
            "Error: Invalid or expired JWT token. "
            "Use 'generate_test_token' to get a valid token."
        )

    if name != "add":
        return _text_result(f"Error: Unknown tool '{name}'")

    # Report missing operands by name instead of surfacing a bare KeyError.
    missing = [key for key in ("a", "b") if key not in arguments]
    if missing:
        return _text_result(
            f"Error: Missing required argument(s): {', '.join(missing)}"
        )

    a, b = arguments["a"], arguments["b"]
    # Without this check "+" would silently concatenate strings or lists.
    if not (_is_number(a) and _is_number(b)):
        return _text_result("Error: Arguments 'a' and 'b' must be numbers")

    try:
        result = a + b
    except OverflowError as e:
        # Mixing a huge int with a float can overflow the float conversion.
        return _text_result(f"Error: {str(e)}")
    return _text_result(f"{a} + {b} = {result}")
async def main():
    """Run the calculator MCP server over stdio with JWT authentication.

    Startup diagnostics are written to stderr, and no part of the signing
    secret is printed. The function returns when ``app.run`` completes.
    """
    # stdout carries the protocol messages of the stdio transport, so any
    # status output has to go to stderr to avoid corrupting that stream.
    print("Starting calculator server with JWT authentication", file=sys.stderr)
    if "JWT_SECRET" not in os.environ:
        # Report only whether the placeholder is in use; never echo the secret.
        print(
            "Warning: JWT_SECRET is not set; using the built-in placeholder secret",
            file=sys.stderr,
        )
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options(),
        )


# Start the server only when the file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())