Skip to main content
Glama

StealthMole MCP Server

Official
by StealthMole
server.py•23.2 kB
""" StealthMole MCP Server Access threat intelligence from Deep & Dark Web through StealthMole API API Documentation: https://api.stealthmole.com """ import uuid import jwt import requests from datetime import datetime, timezone from typing import Optional, Dict, Any, List from mcp.server.fastmcp import Context, FastMCP from pydantic import BaseModel, Field from smithery.decorators import smithery # Configuration Schema class ConfigSchema(BaseModel): """StealthMole API configuration.""" access_key: str = Field(..., description="Your StealthMole API access key") secret_key: str = Field(..., description="Your StealthMole API secret key") # Helper Functions def generate_jwt_token(access_key: str, secret_key: str) -> str: """ Generate JWT authentication token for StealthMole API. Args: access_key: Issued access key secret_key: Issued secret key Returns: JWT token string """ payload = { 'access_key': access_key, 'nonce': str(uuid.uuid4()), 'iat': int(datetime.now(timezone.utc).timestamp()) } return jwt.encode(payload, secret_key, algorithm='HS256') def make_api_request( endpoint: str, access_key: str, secret_key: str, method: str = 'GET', params: Optional[Dict[str, Any]] = None, data: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Make authenticated request to StealthMole API. Args: endpoint: API endpoint path (e.g., '/v2/dt/search/keyword/targets') access_key: API access key secret_key: API secret key method: HTTP method (GET, POST, etc.) params: Query parameters data: Request body data Returns: API response as dictionary """ base_url = 'https://api.stealthmole.com' url = f"{base_url}{endpoint}" # Generate JWT token token = generate_jwt_token(access_key, secret_key) headers = { 'Authorization': f'Bearer {token}', 'Content-Type': 'application/json' } # Make request response = requests.request( method=method, url=url, headers=headers, params=params, json=data, timeout=30 ) # Handle response response.raise_for_status() return response.json() @smithery.server(config_schema=ConfigSchema) def create_server(): """Create and configure the StealthMole MCP server.""" server = FastMCP("StealthMole API") # ========== Darkweb Tracker API ========== @server.tool() def dt_search_targets(indicator: str, ctx: Context) -> str: """ Get list of searchable targets for a Darkweb Tracker indicator. Args: indicator: Search indicator (e.g., keyword, email, domain, ip, etc.) Available indicators: adsense, analyticsid, band, bitcoin, blueprint, creditcard, cve, discord, document, domain, email, ethereum, exefile, facebook, filehosting, googledrive, gps, hash, hashstring, i2p, i2purl, id, image, instagram, ioc, ip, kakaotalk, keyword, kssn, leakedaudio, leakedemailfile, leakedvideo, line, linkedin, malware, monero, otherfile, pastebin, pgp, serverstatus, session, shorten, sshkey, sslkey, tel, telegram, tor, torurl, twitter, url Returns: JSON string with list of searchable targets and total count """ config = ctx.session_config result = make_api_request( f'/v2/dt/search/{indicator}/targets', config.access_key, config.secret_key ) return str(result) @server.tool() def dt_search_target( indicator: str, targets: str, text: str, limit: int = 100, order_type: str = "createDate", order: str = "desc", ctx: Context = None ) -> str: """ Search Darkweb Tracker for specific indicator and targets. Args: indicator: Search indicator type targets: Comma-separated list of targets to search text: Search query text (supports AND, OR, NOT operators) limit: Number of results (default: 100, max: 100) order_type: Sort by createDate or value order: asc or desc (default: desc) Returns: JSON string with search results """ config = ctx.session_config params = { 'targets': targets, 'text': text, 'limit': limit, 'orderType': order_type, 'order': order } result = make_api_request( f'/v2/dt/search/{indicator}/target', config.access_key, config.secret_key, params=params ) return str(result) @server.tool() def dt_search_all( indicator: str, text: str, limit: int = 100, order_type: str = "createDate", order: str = "desc", ctx: Context = None ) -> str: """ Search Darkweb Tracker across all targets for an indicator. Args: indicator: Search indicator type text: Search query text limit: Number of results (default: 100, max: 100) order_type: Sort by createDate or value order: asc or desc Returns: JSON string with search results from all targets """ config = ctx.session_config params = { 'text': text, 'limit': limit, 'orderType': order_type, 'order': order } result = make_api_request( f'/v2/dt/search/{indicator}/target/all', config.access_key, config.secret_key, params=params ) return str(result) @server.tool() def dt_search_by_id( search_id: str, limit: int = 100, cursor: int = 0, order_type: str = "createDate", order: str = "desc", ctx: Context = None ) -> str: """ Get paginated Darkweb Tracker search results by search ID. Args: search_id: Search result ID from previous search limit: Number of results (default: 100, max: 100) cursor: Pagination cursor (default: 0) order_type: Sort by createDate or value order: asc or desc Returns: JSON string with paginated results """ config = ctx.session_config params = { 'limit': limit, 'cursor': cursor, 'orderType': order_type, 'order': order } result = make_api_request( f'/v2/dt/search/{search_id}', config.access_key, config.secret_key, params=params ) return str(result) @server.tool() def dt_get_node_details( node_id: str, parent_id: Optional[str] = None, data_from: bool = False, include_url: bool = False, include_contents: bool = True, ctx: Context = None ) -> str: """ Get detailed information for a Darkweb Tracker node. Args: node_id: Node ID from search results parent_id: Parent node ID (optional) data_from: Include data source list include_url: Include URL list include_contents: Include HTML source contents Returns: JSON string with detailed node information """ config = ctx.session_config params = { 'id': node_id, 'data_from': data_from, 'include_url': include_url, 'include_contents': include_contents } if parent_id: params['pid'] = parent_id result = make_api_request( '/v2/dt/node', config.access_key, config.secret_key, params=params ) return str(result) # ========== Telegram Tracker API ========== @server.tool() def tt_search_targets(indicator: str, ctx: Context) -> str: """ Get list of searchable targets for a Telegram Tracker indicator. Args: indicator: Search indicator (e.g., keyword, telegram.channel, telegram.user, etc.) Returns: JSON string with list of searchable targets """ config = ctx.session_config result = make_api_request( f'/v2/tt/search/{indicator}/targets', config.access_key, config.secret_key ) return str(result) @server.tool() def tt_search_target( indicator: str, targets: str, text: str, limit: int = 100, order_type: str = "createDate", order: str = "desc", ctx: Context = None ) -> str: """ Search Telegram Tracker for specific indicator and targets. Args: indicator: Search indicator type targets: Comma-separated list of targets text: Search query text limit: Number of results (max: 100) order_type: Sort by createDate or value order: asc or desc Returns: JSON string with search results """ config = ctx.session_config params = { 'targets': targets, 'text': text, 'limit': limit, 'orderType': order_type, 'order': order } result = make_api_request( f'/v2/tt/search/{indicator}/target', config.access_key, config.secret_key, params=params ) return str(result) @server.tool() def tt_get_node_details( node_id: str, parent_id: Optional[str] = None, data_from: bool = False, include_url: bool = False, include_contents: bool = True, ctx: Context = None ) -> str: """ Get detailed information for a Telegram Tracker node. Args: node_id: Node ID from search results parent_id: Parent node ID (optional) data_from: Include data source list include_url: Include URL list include_contents: Include HTML source contents Returns: JSON string with detailed node information """ config = ctx.session_config params = { 'id': node_id, 'data_from': data_from, 'include_url': include_url, 'include_contents': include_contents } if parent_id: params['pid'] = parent_id result = make_api_request( '/v2/tt/node', config.access_key, config.secret_key, params=params ) return str(result) # ========== Credential Lookout API ========== @server.tool() def cl_search( query: str, limit: int = 50, cursor: int = 0, order_type: str = "leakedDate", order: str = "desc", start: Optional[int] = None, end: Optional[int] = None, ctx: Context = None ) -> str: """ Search Credential Lookout for leaked credentials. Args: query: Search query with indicators (domain:, email:, id:, password:, after:, before:) limit: Number of results (default: 50, max: 50) cursor: Pagination cursor order_type: Sort by leakedDate, domain, email, password, or leakedFrom order: asc or desc start: Filter by UTC timestamp (data added after) end: Filter by UTC timestamp (data added before) Returns: JSON string with leaked credential results """ config = ctx.session_config params = { 'query': query, 'limit': limit, 'cursor': cursor, 'orderType': order_type, 'order': order } if start: params['start'] = start if end: params['end'] = end result = make_api_request( '/v2/cl/search', config.access_key, config.secret_key, params=params ) return str(result) # ========== Compromised Data Set API ========== @server.tool() def cds_search( query: str, limit: int = 50, cursor: int = 0, order_type: str = "leakedDate", order: str = "desc", start: Optional[int] = None, end: Optional[int] = None, ctx: Context = None ) -> str: """ Search Compromised Data Set for infected device leaks. Args: query: Search query with indicators (domain:, url:, email:, id:, password:, ip:, country:, after:, before:) limit: Number of results (default: 50, max: 50) cursor: Pagination cursor order_type: Sort by leakedDate, host, user, password, or regdate order: asc or desc start: Filter by UTC timestamp end: Filter by UTC timestamp Returns: JSON string with compromised data results """ config = ctx.session_config params = { 'query': query, 'limit': limit, 'cursor': cursor, 'orderType': order_type, 'order': order } if start: params['start'] = start if end: params['end'] = end result = make_api_request( '/v2/cds/search', config.access_key, config.secret_key, params=params ) return str(result) @server.tool() def cds_get_node_details(node_id: str, ctx: Context) -> str: """ Get detailed information for a Compromised Data Set node (Cyber Security Edition required). Args: node_id: Node ID from search results Returns: JSON string with detailed CDS information including stealer path and type """ config = ctx.session_config params = {'id': node_id} result = make_api_request( '/v2/cds/node', config.access_key, config.secret_key, params=params ) return str(result) # ========== Combo Binder API ========== @server.tool() def cb_search( query: str, limit: int = 50, cursor: int = 0, order_type: str = "leakedDate", order: str = "desc", start: Optional[int] = None, end: Optional[int] = None, ctx: Context = None ) -> str: """ Search Combo Binder for leaked ID/Password combos. Args: query: Search query with indicators (domain:, email:, id:, password:, after:, before:) limit: Number of results (max: 50) cursor: Pagination cursor order_type: Sort by leakedDate, user, or password order: asc or desc start: Filter by UTC timestamp end: Filter by UTC timestamp Returns: JSON string with combo leak results """ config = ctx.session_config params = { 'query': query, 'limit': limit, 'cursor': cursor, 'orderType': order_type, 'order': order } if start: params['start'] = start if end: params['end'] = end result = make_api_request( '/v2/cb/search', config.access_key, config.secret_key, params=params ) return str(result) # ========== ULP Binder API ========== @server.tool() def ub_search( query: str, limit: int = 50, cursor: int = 0, order_type: str = "leakedDate", order: str = "desc", start: Optional[int] = None, end: Optional[int] = None, ctx: Context = None ) -> str: """ Search ULP Binder for leaked accounts in URL-Login-Password format. Args: query: Search query with indicators (domain:, url:, email:, id:, password:, after:, before:) limit: Number of results (max: 50) cursor: Pagination cursor order_type: Sort by leakedDate, host, user, or password order: asc or desc start: Filter by UTC timestamp end: Filter by UTC timestamp Returns: JSON string with ULP leak results """ config = ctx.session_config params = { 'query': query, 'limit': limit, 'cursor': cursor, 'orderType': order_type, 'order': order } if start: params['start'] = start if end: params['end'] = end result = make_api_request( '/v2/ub/search', config.access_key, config.secret_key, params=params ) return str(result) # ========== Ransomware Monitoring API ========== @server.tool() def rm_search( query: Optional[str] = None, limit: int = 50, cursor: int = 0, order_type: str = "detectionTime", order: str = "desc", ctx: Context = None ) -> str: """ Search Ransomware Monitoring for ransomware breach incidents. Args: query: Search query with indicators (torurl:, domain:) or empty for recent list limit: Number of results (max: 50) cursor: Pagination cursor order_type: Sort by detectionTime, victim, or attackGroup order: asc or desc Returns: JSON string with ransomware incident results """ config = ctx.session_config params = { 'limit': limit, 'cursor': cursor, 'orderType': order_type, 'order': order } if query: params['query'] = query result = make_api_request( '/v2/rm/search', config.access_key, config.secret_key, params=params ) return str(result) # ========== Government Monitoring API ========== @server.tool() def gm_search( query: Optional[str] = None, limit: int = 50, cursor: int = 0, order_type: str = "detectionTime", order: str = "desc", ctx: Context = None ) -> str: """ Search Government Monitoring for threats against government and public sector. Args: query: Search query with indicators (url:, id:) or empty for recent list limit: Number of results (max: 50) cursor: Pagination cursor order_type: Sort by detectionTime, title, or author order: asc or desc Returns: JSON string with government threat results """ config = ctx.session_config params = { 'limit': limit, 'cursor': cursor, 'orderType': order_type, 'order': order } if query: params['query'] = query result = make_api_request( '/v2/gm/search', config.access_key, config.secret_key, params=params ) return str(result) # ========== Leaked Monitoring API ========== @server.tool() def lm_search( query: Optional[str] = None, limit: int = 50, cursor: int = 0, order_type: str = "detectionTime", order: str = "desc", ctx: Context = None ) -> str: """ Search Leaked Monitoring for threats against enterprise and private sector. Args: query: Search query with indicators (url:, id:) or empty for recent list limit: Number of results (max: 50) cursor: Pagination cursor order_type: Sort by detectionTime, title, or author order: asc or desc Returns: JSON string with leaked threat results """ config = ctx.session_config params = { 'limit': limit, 'cursor': cursor, 'orderType': order_type, 'order': order } if query: params['query'] = query result = make_api_request( '/v2/lm/search', config.access_key, config.secret_key, params=params ) return str(result) # ========== Management API ========== @server.tool() def get_user_quotas(ctx: Context) -> str: """ Get API query usage per service for the current month. Returns: JSON string with quota information (allowed and used queries) for each service """ config = ctx.session_config result = make_api_request( '/v2/user/quotas', config.access_key, config.secret_key ) return str(result) # ========== Resources ========== @server.resource("stealthmole://api-info") def api_info() -> str: """StealthMole API information and documentation.""" return """ StealthMole API v2.2 (November 2024) Base URL: https://api.stealthmole.com Available Services: 1. Darkweb Tracker (DT) - Search Deep & Dark web content 2. Telegram Tracker (TT) - Search Telegram channels, users, messages 3. Credential Lookout (CL) - Search leaked credentials 4. Compromised Data Set (CDS) - Search infected device leaks 5. Combo Binder (CB) - Search ID/Password combos 6. ULP Binder (UB) - Search URL-Login-Password leaks 7. Ransomware Monitoring (RM) - Monitor ransomware incidents 8. Government Monitoring (GM) - Monitor government sector threats 9. Leaked Monitoring (LM) - Monitor enterprise threats 10. Management API - Track API usage quotas Authentication: JWT tokens with HS256 signing Supported Operators: AND, OR, NOT (max 3 OR, max 5 total per query) For detailed documentation, visit: https://api.stealthmole.com """ @server.resource("stealthmole://indicators") def indicators_info() -> str: """Available search indicators for Darkweb Tracker.""" return """ Darkweb Tracker Indicators: Network & Infrastructure: - domain, ip, tor, torurl, i2p, i2purl, url Identifiers: - email, id, tel, kssn (Korean SSN) Financial: - bitcoin, ethereum, monero, creditcard Files: - document, exefile, image, otherfile, leakedaudio, leakedemailfile, leakedvideo - hash, hashstring, blueprint Social & Messaging: - facebook, twitter, instagram, linkedin, telegram, discord, kakaotalk, line, session Services & Data: - googledrive, filehosting, pastebin, shorten - pgp, sshkey, sslkey, serverstatus Security: - cve, ioc, malware Misc: - adsense, analyticsid, gps, keyword, band """ return server

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/StealthMole/stealthmole-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server