Skip to main content
Glama

Have I Been Pwned MCP Server

index.ts (14.3 kB)
#!/usr/bin/env node

/**
 * Have I Been Pwned MCP Server
 *
 * This server provides integration with the Have I Been Pwned API to check if:
 * - Email addresses have been found in data breaches
 * - Passwords have been exposed in data breaches (using k-anonymity)
 * - Accounts have been compromised in specific breaches
 */
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ErrorCode,
  ListToolsRequestSchema,
  McpError,
} from "@modelcontextprotocol/sdk/types.js";
import axios, { AxiosInstance } from "axios";
import crypto from "crypto";

// API key should be provided as an environment variable
const API_KEY = process.env.HIBP_API_KEY;

/**
 * Tools whose upstream endpoint needs the `hibp-api-key` header.
 * Only account (email) searches are authenticated; breach metadata and the
 * Pwned Passwords range API are public.
 */
const TOOLS_REQUIRING_API_KEY: ReadonlySet<string> = new Set(["check_email"]);

/** Result shape returned to the MCP client for every tool call. */
type ToolResult = {
  content: { type: "text"; text: string }[];
  isError?: boolean;
};

/**
 * Subset of the HIBP breach model used for formatting.
 * Everything except `Name` is optional because truncated responses only
 * carry the breach name.
 */
type Breach = {
  Name: string;
  BreachDate?: string;
  Domain?: string;
  Description?: string;
  PwnCount?: number;
  DataClasses?: string[];
  IsVerified?: boolean;
  IsFabricated?: boolean;
  IsSensitive?: boolean;
  IsRetired?: boolean;
  IsSpamList?: boolean;
};

/** Wrap a plain string in the MCP text-content envelope. */
function textResult(text: string, isError = false): ToolResult {
  return isError
    ? { content: [{ type: "text", text }], isError: true }
    : { content: [{ type: "text", text }] };
}

/**
 * Normalise the optional tool arguments into an object so handlers can read
 * properties without crashing when the client sent no arguments at all.
 */
function toArgs(args: unknown): Record<string, unknown> {
  return typeof args === "object" && args !== null
    ? (args as Record<string, unknown>)
    : {};
}

/**
 * Read a required, non-empty string argument.
 *
 * @throws McpError with `InvalidParams` when the value is missing, empty or not a string.
 */
function requireString(
  args: Record<string, unknown>,
  key: string,
  message: string
): string {
  const value = args[key];
  if (typeof value !== "string" || value.length === 0) {
    throw new McpError(ErrorCode.InvalidParams, message);
  }
  return value;
}

/** Join the compromised data classes, tolerating responses that omit them. */
function formatDataClasses(breach: Breach): string {
  return (breach.DataClasses ?? []).join(", ");
}

/** Format the affected-account count, tolerating responses that omit it. */
function formatPwnCount(breach: Breach): string {
  return breach.PwnCount == null ? "unknown" : breach.PwnCount.toLocaleString();
}

/**
 * Have I Been Pwned MCP Server implementation.
 *
 * Registers four tools (`check_email`, `check_password`, `get_breach_details`,
 * `list_all_breaches`) and serves them over stdio.
 */
class HibpServer {
  private readonly server: Server;
  private readonly axiosInstance: AxiosInstance;

  constructor() {
    // Initialize the MCP server
    this.server = new Server(
      {
        name: "hibp-mcp-server",
        version: "0.1.0",
      },
      {
        capabilities: {
          tools: {},
        },
      }
    );

    // Check if API key is provided
    if (!API_KEY) {
      console.error("Warning: HIBP_API_KEY environment variable is not set");
      console.error("The server will start but check_email calls will fail");
    }

    // Only attach the key header when a key exists, so unauthenticated
    // endpoints are never sent an empty/undefined credential.
    const headers: Record<string, string> = {
      "User-Agent": "hibp-mcp-server/0.1.0",
    };
    if (API_KEY) {
      headers["hibp-api-key"] = API_KEY;
    }

    // Initialize Axios instance for API calls
    this.axiosInstance = axios.create({
      baseURL: "https://haveibeenpwned.com/api/v3",
      headers,
    });

    // Set up tool handlers
    this.setupToolHandlers();

    // Error handling
    this.server.onerror = (error) => console.error("[MCP Error]", error);
    process.on("SIGINT", async () => {
      await this.server.close();
      process.exit(0);
    });
  }

  /**
   * Set up handlers for the MCP tools
   */
  private setupToolHandlers(): void {
    // List available tools
    this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
      tools: [
        {
          name: "check_email",
          description: "Check if an email address has been found in data breaches",
          inputSchema: {
            type: "object",
            properties: {
              email: {
                type: "string",
                description: "Email address to check",
              },
              include_unverified: {
                type: "boolean",
                description: "Include unverified breaches in the results",
                default: true,
              },
              truncate_response: {
                type: "boolean",
                description: "Truncate the response to only include breach names",
                default: false,
              },
            },
            required: ["email"],
          },
        },
        {
          name: "check_password",
          description:
            "Check if a password has been exposed in data breaches (using k-anonymity)",
          inputSchema: {
            type: "object",
            properties: {
              password: {
                type: "string",
                description: "Password to check",
              },
            },
            required: ["password"],
          },
        },
        {
          name: "get_breach_details",
          description: "Get details about a specific data breach",
          inputSchema: {
            type: "object",
            properties: {
              breach_name: {
                type: "string",
                description: "Name of the breach to get details for",
              },
            },
            required: ["breach_name"],
          },
        },
        {
          name: "list_all_breaches",
          description: "List all breaches in the system",
          inputSchema: {
            type: "object",
            properties: {
              domain: {
                type: "string",
                description: "Filter breaches by domain",
              },
            },
          },
        },
      ],
    }));

    // Handle tool calls
    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      const toolName = request.params.name;
      const toolArgs = request.params.arguments;

      // Check if API key is available for endpoints that require it
      if (!API_KEY && TOOLS_REQUIRING_API_KEY.has(toolName)) {
        return textResult(
          "Error: Have I Been Pwned API key is not configured. Please set the HIBP_API_KEY environment variable.",
          true
        );
      }

      try {
        switch (toolName) {
          case "check_email":
            return await this.handleCheckEmail(toolArgs);
          case "check_password":
            return await this.handleCheckPassword(toolArgs);
          case "get_breach_details":
            return await this.handleGetBreachDetails(toolArgs);
          case "list_all_breaches":
            return await this.handleListAllBreaches(toolArgs);
          default:
            throw new McpError(
              ErrorCode.MethodNotFound,
              `Unknown tool: ${toolName}`
            );
        }
      } catch (error: unknown) {
        // Anything that is not an HTTP failure (e.g. McpError) propagates to the SDK.
        if (!axios.isAxiosError(error)) {
          throw error;
        }

        const status = error.response?.status;

        // HIBP answers 404 when the account appears in no breach
        if (status === 404 && toolName === "check_email") {
          return textResult(
            "Good news! This email address has not been found in any known data breaches."
          );
        }

        // HIBP answers 404 for an unknown breach name
        if (status === 404 && toolName === "get_breach_details") {
          return textResult(
            `No information found for breach: ${String(toArgs(toolArgs).breach_name)}`
          );
        }

        return textResult(
          `API Error: ${error.response?.data?.message || error.message} (Status: ${status ?? "unknown"})`,
          true
        );
      }
    });
  }

  /**
   * Handle the check_email tool.
   *
   * @param args Tool arguments: `email` (required), `include_unverified`,
   *   `truncate_response` (defaults to `false`).
   * @throws McpError with `InvalidParams` when `email` is missing.
   */
  private async handleCheckEmail(args: unknown): Promise<ToolResult> {
    const input = toArgs(args);
    const email = requireString(input, "email", "Email address is required");

    // The API truncates by default, so the flag is always sent explicitly;
    // otherwise the detailed formatting below would receive name-only objects.
    const truncate = input.truncate_response === true;
    const params: Record<string, unknown> = { truncateResponse: truncate };
    if (input.include_unverified !== undefined) {
      params.includeUnverified = input.include_unverified;
    }

    const response = await this.axiosInstance.get<Breach[]>(
      `/breachedaccount/${encodeURIComponent(email)}`,
      { params }
    );

    const breaches = response.data;
    if (!Array.isArray(breaches) || breaches.length === 0) {
      return textResult(
        "Good news! This email address has not been found in any known data breaches."
      );
    }

    // Format the breach data for better readability
    const breachCount = breaches.length;
    let summary = `⚠️ This email address was found in ${breachCount} data breach${breachCount > 1 ? "es" : ""}.\n\n`;

    if (truncate) {
      // If truncated, just list the breach names
      summary += "Breaches: " + breaches.map((breach) => breach.Name).join(", ");
    } else {
      // Otherwise, provide detailed information
      summary += "Breach details:\n\n";
      summary += breaches
        .map(
          (breach, index) =>
            `${index + 1}. ${breach.Name} (${breach.BreachDate})\n` +
            ` Domain: ${breach.Domain}\n` +
            ` Description: ${breach.Description}\n` +
            ` Compromised data: ${formatDataClasses(breach)}\n`
        )
        .join("\n");
      summary += "\nRecommendations:\n";
      summary += "- Change your password for these services immediately\n";
      summary += "- If you used the same password elsewhere, change those too\n";
      summary += "- Enable two-factor authentication where available\n";
      summary += "- Consider using a password manager";
    }

    return textResult(summary);
  }

  /**
   * Handle the check_password tool.
   * Uses the k-anonymity model: only the first five hex characters of the
   * SHA-1 hash are sent; the match is done locally against the returned suffixes.
   *
   * @param args Tool arguments: `password` (required).
   * @throws McpError with `InvalidParams` when `password` is missing.
   */
  private async handleCheckPassword(args: unknown): Promise<ToolResult> {
    const password = requireString(toArgs(args), "password", "Password is required");

    // Hash the password with SHA-1
    const sha1Hash = crypto
      .createHash("sha1")
      .update(password)
      .digest("hex")
      .toUpperCase();

    // Get the first 5 characters (prefix) and the rest (suffix)
    const prefix = sha1Hash.substring(0, 5);
    const suffix = sha1Hash.substring(5);

    // Query the API with just the prefix (k-anonymity); ask for raw text so
    // the body is never run through JSON parsing.
    const response = await axios.get<string>(
      `https://api.pwnedpasswords.com/range/${prefix}`,
      { responseType: "text" }
    );

    // Parse the "SUFFIX:COUNT" lines to find whether our suffix is listed
    const body = typeof response.data === "string" ? response.data : String(response.data);
    let found = false;
    let occurrences = 0;
    for (const line of body.split("\n")) {
      const [hashSuffix, count] = line.trim().split(":");
      if (hashSuffix !== undefined && hashSuffix.trim() === suffix) {
        const parsed = Number.parseInt((count ?? "").trim(), 10);
        found = true;
        // A malformed count must not crash the lookup; the match itself still stands.
        occurrences = Number.isNaN(parsed) ? 0 : parsed;
        break;
      }
    }

    if (found) {
      return textResult(
        `⚠️ This password has been exposed in data breaches ${occurrences.toLocaleString()} times!\n\nRecommendations:\n- Stop using this password immediately\n- Change it on any site where you use it\n- Use a unique, strong password for each account\n- Consider using a password manager`
      );
    }

    return textResult(
      "Good news! This password hasn't been found in any known data breaches. However, remember to use strong, unique passwords for each account and consider using a password manager."
    );
  }

  /**
   * Handle the get_breach_details tool.
   *
   * @param args Tool arguments: `breach_name` (required).
   * @throws McpError with `InvalidParams` when `breach_name` is missing.
   */
  private async handleGetBreachDetails(args: unknown): Promise<ToolResult> {
    const breachName = requireString(
      toArgs(args),
      "breach_name",
      "Breach name is required"
    );

    const response = await this.axiosInstance.get<Breach | undefined>(
      `/breach/${encodeURIComponent(breachName)}`
    );

    const breach = response.data;
    if (!breach) {
      return textResult(`No information found for breach: ${breachName}`);
    }

    // Format the breach data for better readability
    let details = `# ${breach.Name} Data Breach\n\n`;
    details += `**Date:** ${breach.BreachDate}\n`;
    details += `**Domain:** ${breach.Domain}\n`;
    details += `**Accounts affected:** ${formatPwnCount(breach)}\n`;
    details += `**Verified:** ${breach.IsVerified ? "Yes" : "No"}\n`;
    details += `**Data leaked:** ${formatDataClasses(breach)}\n\n`;
    details += `**Description:**\n${breach.Description}\n\n`;

    if (breach.IsFabricated) {
      details += "⚠️ Note: This breach has been flagged as potentially fabricated.\n\n";
    }
    if (breach.IsSensitive) {
      details += "⚠️ Note: This breach contains sensitive information.\n\n";
    }
    if (breach.IsRetired) {
      details += "ℹ️ Note: This breach has been retired from active display.\n\n";
    }
    if (breach.IsSpamList) {
      details += "ℹ️ Note: This breach is from a spam list.\n\n";
    }

    details += "**Recommendations:**\n";
    details += "- If you had an account on this service, change your password\n";
    details += "- If you used the same password elsewhere, change those too\n";
    details += "- Monitor your accounts for suspicious activity\n";
    details += "- Be cautious of phishing attempts that may use this leaked information";

    return textResult(details);
  }

  /**
   * Handle the list_all_breaches tool.
   *
   * @param args Tool arguments: optional `domain` filter. May be omitted entirely.
   */
  private async handleListAllBreaches(args: unknown): Promise<ToolResult> {
    const input = toArgs(args);
    const domain =
      typeof input.domain === "string" && input.domain.length > 0
        ? input.domain
        : undefined;

    const params: Record<string, string> = {};
    if (domain !== undefined) {
      params.domain = domain;
    }

    const response = await this.axiosInstance.get<Breach[]>("/breaches", { params });

    const breaches = response.data;
    if (!Array.isArray(breaches) || breaches.length === 0) {
      return textResult(
        domain !== undefined
          ? `No breaches found for domain: ${domain}`
          : "No breaches found in the system."
      );
    }

    // Format the breach list for better readability
    const noun = breaches.length === 1 ? "breach" : "breaches";
    let summary =
      domain !== undefined
        ? `Found ${breaches.length} ${noun} for domain ${domain}:\n\n`
        : `Found ${breaches.length} ${noun} in the system:\n\n`;

    // Sort breaches by date (newest first) on a copy, leaving the response untouched
    const newestFirst = [...breaches].sort(
      (a, b) =>
        new Date(b.BreachDate ?? 0).getTime() - new Date(a.BreachDate ?? 0).getTime()
    );

    summary += newestFirst
      .map(
        (breach, index) =>
          `${index + 1}. ${breach.Name} (${breach.BreachDate})\n` +
          ` Domain: ${breach.Domain}\n` +
          ` Accounts affected: ${formatPwnCount(breach)}\n` +
          ` Compromised data: ${formatDataClasses(breach)}\n`
      )
      .join("\n");

    return textResult(summary);
  }

  /**
   * Start the server on a stdio transport.
   */
  async run(): Promise<void> {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
    // stdout carries the MCP protocol stream, so status output goes to stderr.
    console.error("Have I Been Pwned MCP server running on stdio");
  }
}

// Create and start the server
const server = new HibpServer();
server.run().catch(console.error);

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Cyreslab-AI/hibp-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.