Skip to main content
Glama

JWT Auditor MCP Server

by mohdhaji87
server.py8.27 kB
from mcp.server import FastMCP import base64 import json from typing import Optional # Create the MCP server instance server = FastMCP(name="jwtAuditor-Mcp", version="0.1.0") @server.tool() def jwt_decode(token: str) -> dict: """Decode a JWT and return its header, payload, and signature (no verification).""" try: header_b64, payload_b64, signature_b64 = token.split(".") def b64decode(data): # Add padding if needed rem = len(data) % 4 if rem: data += '=' * (4 - rem) return base64.urlsafe_b64decode(data.encode()) header = json.loads(b64decode(header_b64)) payload = json.loads(b64decode(payload_b64)) signature = signature_b64 return { "header": header, "payload": payload, "signature": signature } except Exception as e: return {"error": str(e)} @server.tool() def jwt_analyze(token: str) -> dict: """Analyze a JWT for common vulnerabilities and issues.""" import re import time findings = [] try: header_b64, payload_b64, signature_b64 = token.split(".") def b64decode(data): rem = len(data) % 4 if rem: data += '=' * (4 - rem) return base64.urlsafe_b64decode(data.encode()) header = json.loads(b64decode(header_b64)) payload = json.loads(b64decode(payload_b64)) # 1. Algorithm vulnerabilities alg = header.get("alg", "") if alg.lower() == "none": findings.append({"type": "alg_none", "severity": "critical", "description": "JWT uses 'none' algorithm (no signature)."}) if alg.lower() in ["hs256", "hs384", "hs512"]: findings.append({"type": "weak_alg", "severity": "warning", "description": f"JWT uses symmetric algorithm: {alg}. Check for secret reuse and brute-force risk."}) if alg.lower() in ["rs256", "rs384", "rs512"]: findings.append({"type": "asymmetric_alg", "severity": "info", "description": f"JWT uses asymmetric algorithm: {alg}."}) # 2. Sensitive data exposure sensitive_patterns = [ (re.compile(r"(password|secret|api[_-]?key|access[_-]?token)", re.I), "Potential credential exposure"), (re.compile(r"\b(?:\d[ -]*?){13,16}\b"), "Possible credit card number"), (re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}"), "Possible email address (PII)") ] for k, v in payload.items(): for pat, desc in sensitive_patterns: if isinstance(v, str) and pat.search(v): findings.append({"type": "sensitive_data", "severity": "warning", "description": f"{desc} in claim '{k}'"}) # 3. Missing security claims for claim in ["exp", "iss", "aud", "jti"]: if claim not in payload: findings.append({"type": "missing_claim", "severity": "warning", "description": f"Missing recommended claim: {claim}"}) # 4. Header injection vulnerabilities (kid) if "kid" in header: if isinstance(header["kid"], str) and (".." in header["kid"] or "/" in header["kid"]): findings.append({"type": "header_injection", "severity": "critical", "description": "Potential header injection in 'kid' parameter."}) # 5. Token lifetime and replay attack analysis now = int(time.time()) if "exp" in payload: try: exp = int(payload["exp"]) if exp < now: findings.append({"type": "expired", "severity": "critical", "description": "Token is expired."}) elif exp - now > 60*60*24*7: findings.append({"type": "long_lifetime", "severity": "warning", "description": "Token lifetime is unusually long (> 7 days)."}) except Exception: findings.append({"type": "invalid_exp", "severity": "warning", "description": "Invalid 'exp' claim format."}) # 6. Replay attack analysis (jti) if "jti" not in payload: findings.append({"type": "replay_attack", "severity": "info", "description": "No 'jti' claim: token may be replayable."}) # 7. Algorithm confusion (alg confusion attacks) if alg.lower().startswith("hs") and "kid" in header and header["kid"].endswith(".pem"): findings.append({"type": "alg_confusion", "severity": "critical", "description": "Potential algorithm confusion: HS* with 'kid' referencing a PEM file."}) return {"findings": findings, "header": header, "payload": payload} except Exception as e: return {"error": str(e)} @server.tool() def jwt_bruteforce(token: str, wordlist: Optional[list] = None) -> dict: """Bruteforce the secret for HS256/HS384/HS512 JWTs using a common wordlist or a custom one.""" import hmac import hashlib import base64 import json import time # Default wordlist (short for demo; in production, use a large list) common_secrets = [ 'secret', 'password', '123456', 'admin', 'jwtsecret', 'letmein', 'qwerty', 'iloveyou', 'welcome', 'monkey', 'abc123', '1q2w3e4r', 'test', 'changeme', 'default', 'passw0rd', 'supersecret', 'trustno1', 'hunter2', 'root' ] secrets = wordlist if wordlist else common_secrets try: header_b64, payload_b64, signature_b64 = token.split(".") def b64decode(data): rem = len(data) % 4 if rem: data += '=' * (4 - rem) return base64.urlsafe_b64decode(data.encode()) def b64encode(data): return base64.urlsafe_b64encode(data).rstrip(b'=').decode() header = json.loads(b64decode(header_b64)) alg = header.get("alg", "").upper() if alg not in ["HS256", "HS384", "HS512"]: return {"error": f"Bruteforce only supported for HS256/HS384/HS512, got {alg}"} hash_alg = {"HS256": hashlib.sha256, "HS384": hashlib.sha384, "HS512": hashlib.sha512}[alg] signing_input = f"{header_b64}.{payload_b64}".encode() for secret in secrets: sig = hmac.new(secret.encode(), signing_input, hash_alg).digest() sig_b64 = b64encode(sig) if sig_b64 == signature_b64: return {"result": "success", "secret": secret} return {"result": "not found", "tested": len(secrets)} except Exception as e: return {"error": str(e)} @server.tool() def jwt_generate(header: dict, payload: dict, alg: str, key: str) -> dict: """Generate a JWT with the given header, payload, algorithm, and key (HS* or RS*).""" import base64 import json import hmac import hashlib try: def b64encode(data): return base64.urlsafe_b64encode(data).rstrip(b'=').decode() header_b64 = b64encode(json.dumps(header, separators=(",", ":")).encode()) payload_b64 = b64encode(json.dumps(payload, separators=(",", ":")).encode()) signing_input = f"{header_b64}.{payload_b64}".encode() alg_upper = alg.upper() if alg_upper in ["HS256", "HS384", "HS512"]: hash_alg = {"HS256": hashlib.sha256, "HS384": hashlib.sha384, "HS512": hashlib.sha512}[alg_upper] sig = hmac.new(key.encode(), signing_input, hash_alg).digest() signature_b64 = b64encode(sig) elif alg_upper in ["RS256", "RS384", "RS512"]: from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric import padding private_key = serialization.load_pem_private_key(key.encode(), password=None) hash_alg = {"RS256": hashes.SHA256(), "RS384": hashes.SHA384(), "RS512": hashes.SHA512()}[alg_upper] sig = private_key.sign( signing_input, padding.PKCS1v15(), hash_alg ) signature_b64 = b64encode(sig) else: return {"error": f"Unsupported algorithm: {alg}"} jwt = f"{header_b64}.{payload_b64}.{signature_b64}" return {"jwt": jwt} except Exception as e: return {"error": str(e)} if __name__ == "__main__": server.run("stdio")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mohdhaji87/JWTAuditorMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server