search_domain
Check web domains for security threats using Kaspersky's threat intelligence database to identify malicious activity.
Instructions
Get threat intelligence data about a web domain
Input Schema
Table / JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| domain | Yes | Domain that you want to investigate | |
Implementation Reference
- opentip-mcp/opentip.py:116-132 (handler) — The handler function for the 'search_domain' MCP tool, including registration via the @mcp.tool decorator. It calls the OpenTIP API to search for threat intelligence on a given domain.

  ```python
  @mcp.tool(
      description="Get threat intelligence data about a web domain",
      annotations=ToolAnnotations(
          title="Investigate a domain",
          readOnlyHint=True,
          openWorldHint=True,
      ),
  )
  async def search_domain(domain: str) -> dict[str, Any] | None:
      """Get threat intelligence data about a web domain

      Args:
          domain: domain that you want to investigate
      """
      params = {"request": domain}
      return await opentip_request(Endpoints.search_domain, "get", params)
  ```
- opentip-mcp/opentip.py:44-51 (helper) — Enum defining API endpoints used by the search_domain tool.

  ```python
  class Endpoints(StrEnum):
      search_hash = "search/hash"
      search_ip = "search/ip"
      search_domain = "search/domain"
      search_url = "search/url"
      analyze_file = "scan/file"
      get_analysis_results = "getresult/file"
  ```
- opentip-mcp/opentip.py:53-92 (helper) — Helper function that performs the actual API request for search_domain and other tools.

  ```python
  async def opentip_request(
      endpoint: str,
      request_type: RequestType = "get",
      params: Optional[dict[str, Any]] = None,
      content: Optional[bytes] = None,
      headers: Optional[dict[str, str]] = None,
  ) -> dict[str, Any]:
      """Make a request to the OpenTIP API with proper error handling."""
      headers = headers or {}
      headers = {
          "user-agent": "opentip-mcp-client",
          "x-api-key": OPENTIP_API_KEY,
          **headers
      }
      async with httpx.AsyncClient() as client:
          try:
              url = f"{OPENTIP_API_BASE}{endpoint}"
              if request_type == "get":
                  response = await client.get(
                      url, headers=headers, params=params, timeout=OPENTIP_API_TIMEOUT
                  )
              elif request_type == "post":
                  response = await client.post(
                      url, headers=headers, params=params, content=content, timeout=OPENTIP_API_TIMEOUT
                  )
              response.raise_for_status()
              return response.json()
          except httpx.HTTPStatusError as e:
              if e.response.status_code == 400:
                  return {"result": "error", "error_message": "Invalid parameters. Please check your input and try again."}
              elif e.response.status_code == 401:
                  return {"result": "error", "error_message": "Authentication failed. Please ensure that you have provided the correct credentials and try again."}
              elif e.response.status_code == 403:
                  return {"result": "error", "error_message": "Quota or request limit exceeded. Check your quota and limits and try again."}
              else:
                  return {"result": "error", "error_message": str(e)}
          except Exception as e:  # noqa
              return {"result": "error", "error_message": str(e)}
  ```