server.py•8.27 kB
from mcp.server import FastMCP
import base64
import json
from typing import Optional
# Create the MCP server instance
server = FastMCP(name="jwtAuditor-Mcp", version="0.1.0")
@server.tool()
def jwt_decode(token: str) -> dict:
"""Decode a JWT and return its header, payload, and signature (no verification)."""
try:
header_b64, payload_b64, signature_b64 = token.split(".")
def b64decode(data):
# Add padding if needed
rem = len(data) % 4
if rem:
data += '=' * (4 - rem)
return base64.urlsafe_b64decode(data.encode())
header = json.loads(b64decode(header_b64))
payload = json.loads(b64decode(payload_b64))
signature = signature_b64
return {
"header": header,
"payload": payload,
"signature": signature
}
except Exception as e:
return {"error": str(e)}
@server.tool()
def jwt_analyze(token: str) -> dict:
"""Analyze a JWT for common vulnerabilities and issues."""
import re
import time
findings = []
try:
header_b64, payload_b64, signature_b64 = token.split(".")
def b64decode(data):
rem = len(data) % 4
if rem:
data += '=' * (4 - rem)
return base64.urlsafe_b64decode(data.encode())
header = json.loads(b64decode(header_b64))
payload = json.loads(b64decode(payload_b64))
# 1. Algorithm vulnerabilities
alg = header.get("alg", "")
if alg.lower() == "none":
findings.append({"type": "alg_none", "severity": "critical", "description": "JWT uses 'none' algorithm (no signature)."})
if alg.lower() in ["hs256", "hs384", "hs512"]:
findings.append({"type": "weak_alg", "severity": "warning", "description": f"JWT uses symmetric algorithm: {alg}. Check for secret reuse and brute-force risk."})
if alg.lower() in ["rs256", "rs384", "rs512"]:
findings.append({"type": "asymmetric_alg", "severity": "info", "description": f"JWT uses asymmetric algorithm: {alg}."})
# 2. Sensitive data exposure
sensitive_patterns = [
(re.compile(r"(password|secret|api[_-]?key|access[_-]?token)", re.I), "Potential credential exposure"),
(re.compile(r"\b(?:\d[ -]*?){13,16}\b"), "Possible credit card number"),
(re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}"), "Possible email address (PII)")
]
for k, v in payload.items():
for pat, desc in sensitive_patterns:
if isinstance(v, str) and pat.search(v):
findings.append({"type": "sensitive_data", "severity": "warning", "description": f"{desc} in claim '{k}'"})
# 3. Missing security claims
for claim in ["exp", "iss", "aud", "jti"]:
if claim not in payload:
findings.append({"type": "missing_claim", "severity": "warning", "description": f"Missing recommended claim: {claim}"})
# 4. Header injection vulnerabilities (kid)
if "kid" in header:
if isinstance(header["kid"], str) and (".." in header["kid"] or "/" in header["kid"]):
findings.append({"type": "header_injection", "severity": "critical", "description": "Potential header injection in 'kid' parameter."})
# 5. Token lifetime and replay attack analysis
now = int(time.time())
if "exp" in payload:
try:
exp = int(payload["exp"])
if exp < now:
findings.append({"type": "expired", "severity": "critical", "description": "Token is expired."})
elif exp - now > 60*60*24*7:
findings.append({"type": "long_lifetime", "severity": "warning", "description": "Token lifetime is unusually long (> 7 days)."})
except Exception:
findings.append({"type": "invalid_exp", "severity": "warning", "description": "Invalid 'exp' claim format."})
# 6. Replay attack analysis (jti)
if "jti" not in payload:
findings.append({"type": "replay_attack", "severity": "info", "description": "No 'jti' claim: token may be replayable."})
# 7. Algorithm confusion (alg confusion attacks)
if alg.lower().startswith("hs") and "kid" in header and header["kid"].endswith(".pem"):
findings.append({"type": "alg_confusion", "severity": "critical", "description": "Potential algorithm confusion: HS* with 'kid' referencing a PEM file."})
return {"findings": findings, "header": header, "payload": payload}
except Exception as e:
return {"error": str(e)}
@server.tool()
def jwt_bruteforce(token: str, wordlist: Optional[list] = None) -> dict:
"""Bruteforce the secret for HS256/HS384/HS512 JWTs using a common wordlist or a custom one."""
import hmac
import hashlib
import base64
import json
import time
# Default wordlist (short for demo; in production, use a large list)
common_secrets = [
'secret', 'password', '123456', 'admin', 'jwtsecret', 'letmein', 'qwerty', 'iloveyou', 'welcome', 'monkey',
'abc123', '1q2w3e4r', 'test', 'changeme', 'default', 'passw0rd', 'supersecret', 'trustno1', 'hunter2', 'root'
]
secrets = wordlist if wordlist else common_secrets
try:
header_b64, payload_b64, signature_b64 = token.split(".")
def b64decode(data):
rem = len(data) % 4
if rem:
data += '=' * (4 - rem)
return base64.urlsafe_b64decode(data.encode())
def b64encode(data):
return base64.urlsafe_b64encode(data).rstrip(b'=').decode()
header = json.loads(b64decode(header_b64))
alg = header.get("alg", "").upper()
if alg not in ["HS256", "HS384", "HS512"]:
return {"error": f"Bruteforce only supported for HS256/HS384/HS512, got {alg}"}
hash_alg = {"HS256": hashlib.sha256, "HS384": hashlib.sha384, "HS512": hashlib.sha512}[alg]
signing_input = f"{header_b64}.{payload_b64}".encode()
for secret in secrets:
sig = hmac.new(secret.encode(), signing_input, hash_alg).digest()
sig_b64 = b64encode(sig)
if sig_b64 == signature_b64:
return {"result": "success", "secret": secret}
return {"result": "not found", "tested": len(secrets)}
except Exception as e:
return {"error": str(e)}
@server.tool()
def jwt_generate(header: dict, payload: dict, alg: str, key: str) -> dict:
"""Generate a JWT with the given header, payload, algorithm, and key (HS* or RS*)."""
import base64
import json
import hmac
import hashlib
try:
def b64encode(data):
return base64.urlsafe_b64encode(data).rstrip(b'=').decode()
header_b64 = b64encode(json.dumps(header, separators=(",", ":")).encode())
payload_b64 = b64encode(json.dumps(payload, separators=(",", ":")).encode())
signing_input = f"{header_b64}.{payload_b64}".encode()
alg_upper = alg.upper()
if alg_upper in ["HS256", "HS384", "HS512"]:
hash_alg = {"HS256": hashlib.sha256, "HS384": hashlib.sha384, "HS512": hashlib.sha512}[alg_upper]
sig = hmac.new(key.encode(), signing_input, hash_alg).digest()
signature_b64 = b64encode(sig)
elif alg_upper in ["RS256", "RS384", "RS512"]:
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding
private_key = serialization.load_pem_private_key(key.encode(), password=None)
hash_alg = {"RS256": hashes.SHA256(), "RS384": hashes.SHA384(), "RS512": hashes.SHA512()}[alg_upper]
sig = private_key.sign(
signing_input,
padding.PKCS1v15(),
hash_alg
)
signature_b64 = b64encode(sig)
else:
return {"error": f"Unsupported algorithm: {alg}"}
jwt = f"{header_b64}.{payload_b64}.{signature_b64}"
return {"jwt": jwt}
except Exception as e:
return {"error": str(e)}
if __name__ == "__main__":
server.run("stdio")