Skip to main content
Glama
by pangeacyber
ip-intel.ts (6.91 kB)
import type { FastMCP } from 'fastmcp';
import { IPIntelService, PangeaConfig } from 'pangea-node-sdk';
import { z } from 'zod';

import { aiGuard } from '../guard.js';
import type { FastMCPSessionAuth, ServerContext } from '../types.js';

/**
 * Registers the IP intelligence tools on the given FastMCP server.
 *
 * Five tools are added, each accepting between 1 and 100 IP addresses:
 * `lookup_ip_address_reputation`, `lookup_domain_from_ip_address`,
 * `is_proxy`, `is_vpn` and `geolocate`. Every handler is wrapped with
 * `aiGuard` and calls the matching bulk method of `IPIntelService`.
 *
 * @param options.server - The FastMCP server the tools are added to.
 * @param options.context - Server context; `context.apiToken` is used to
 *   authenticate the `IPIntelService` client and the context is also passed
 *   to `aiGuard`.
 */
export function registerIpIntelTools<
  T extends FastMCPSessionAuth = FastMCPSessionAuth,
>({ server, context }: { server: FastMCP<T>; context: ServerContext }) {
  // All five tools take the same input, so one schema is shared between them.
  const ipAddressesParameters = z.object({
    ipAddresses: z
      .array(z.string().ip())
      .min(1)
      .max(100)
      .describe('The IP addresses to be looked up'),
  });

  // Builds a client per invocation so the token is read from `context` at
  // call time, exactly as each handler did before.
  const createIpIntel = () =>
    new IPIntelService(
      context.apiToken,
      new PangeaConfig({ domain: 'aws.us.pangea.cloud' })
    );

  // Wraps a string in the single-text-item result shape every tool returns.
  const textResult = (text: string) => ({
    content: [{ type: 'text' as const, text }],
  });

  server.addTool({
    name: 'lookup_ip_address_reputation',
    description: 'Look up reputation score(s) for one or more IP addresses.',
    parameters: ipAddressesParameters,
    execute: aiGuard<T, typeof ipAddressesParameters>(
      context,
      async ({ ipAddresses }) => {
        const response = await createIpIntel().reputationBulk(ipAddresses);
        if (!response.success) {
          return textResult('Failed to retrieve reputation data');
        }

        const formattedReputation = Object.entries(response.result.data).map(
          ([ipAddress, reputation]) =>
            `${ipAddress}: ${reputation.verdict} (score ${reputation.score}) (categories ${reputation.category.join(', ')})`
        );
        return textResult(
          `Reputation data:\n\n${formattedReputation.join('\n')}`
        );
      }
    ),
  });

  server.addTool({
    name: 'lookup_domain_from_ip_address',
    description:
      'Retrieve the domain name associated with one or more IP addresses.',
    parameters: ipAddressesParameters,
    execute: aiGuard<T, typeof ipAddressesParameters>(
      context,
      async ({ ipAddresses }) => {
        const response = await createIpIntel().getDomainBulk(ipAddresses);
        if (!response.success) {
          return textResult('Failed to retrieve domains');
        }

        const formattedDomains = Object.entries(response.result.data).map(
          ([ipAddress, result]) =>
            `${ipAddress}: ${result.domain_found ? result.domain : 'No domain found'}`
        );
        return textResult(`Domain data:\n\n${formattedDomains.join('\n')}`);
      }
    ),
  });

  server.addTool({
    name: 'is_proxy',
    description: 'Determine if one or more IP addresses originate from a proxy.',
    parameters: ipAddressesParameters,
    execute: aiGuard<T, typeof ipAddressesParameters>(
      context,
      async ({ ipAddresses }) => {
        const response = await createIpIntel().isProxyBulk(ipAddresses);
        if (!response.success) {
          // Previously reported "Failed to retrieve domains" (copy-paste slip).
          return textResult('Failed to retrieve proxy data');
        }

        const formattedResults = Object.entries(response.result.data).map(
          ([ipAddress, result]) =>
            `${ipAddress}: ${result.is_proxy ? 'Is a proxy.' : 'Is not a proxy.'}`
        );
        return textResult(`Proxy data:\n\n${formattedResults.join('\n')}`);
      }
    ),
  });

  server.addTool({
    name: 'is_vpn',
    description: 'Determine if one or more IP addresses originate from a VPN.',
    parameters: ipAddressesParameters,
    execute: aiGuard<T, typeof ipAddressesParameters>(
      context,
      async ({ ipAddresses }) => {
        const response = await createIpIntel().isVPNBulk(ipAddresses);
        if (!response.success) {
          // Previously reported "Failed to retrieve domains" (copy-paste slip).
          return textResult('Failed to retrieve VPN data');
        }

        const formattedResults = Object.entries(response.result.data).map(
          ([ipAddress, result]) =>
            `${ipAddress}: ${result.is_vpn ? 'Is a VPN.' : 'Is not a VPN.'}`
        );
        return textResult(`VPN data:\n\n${formattedResults.join('\n')}`);
      }
    ),
  });

  server.addTool({
    name: 'geolocate',
    description:
      'Geolocate, or retrieve location information associated with, one or more IP addresses.',
    parameters: ipAddressesParameters,
    execute: aiGuard<T, typeof ipAddressesParameters>(
      context,
      async ({ ipAddresses }) => {
        const response = await createIpIntel().geolocateBulk(ipAddresses);
        if (!response.success) {
          // Previously reported "Failed to retrieve domains" (copy-paste slip).
          return textResult('Failed to retrieve geolocation data');
        }

        // Geolocation results are returned verbatim as pretty-printed JSON.
        return textResult(JSON.stringify(response.result.data, null, 2));
      }
    ),
  });
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pangeacyber/pangea-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.