opentip.py•7.6 kB
#!/usr/bin/env python3
#
# © 2025 AO Kaspersky Lab. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import os
import re
from enum import StrEnum
from typing import Any, Literal, Optional
import httpx
from mcp.server.fastmcp import FastMCP, Context
from mcp.types import ToolAnnotations
# Initialize FastMCP server
mcp = FastMCP("Kaspersky OpenTIP")
# Regex pattern for valid hash types (md5, sha1, sha256) and ips
hash_pattern = re.compile(r'^(0x)?(?:[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{64})$')
ip_pattern = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$')
# Constants
OPENTIP_API_BASE = "https://opentip.kaspersky.com/api/v1/"
OPENTIP_API_KEY = os.getenv("OPENTIP_API_KEY")
OPENTIP_API_TIMEOUT = float(os.getenv("OPENTIP_API_TIMEOUT", 30.0))
if OPENTIP_API_KEY is None:
    raise KeyError("Please, set OPENTIP_API_KEY evnironment variable.")
RequestType = Literal["get", "post"]
class Endpoints(StrEnum):
    search_hash = "search/hash"
    search_ip = "search/ip"
    search_domain = "search/domain"
    search_url = "search/url"
    analyze_file = "scan/file"
    get_analysis_results = "getresult/file"
async def opentip_request(
    endpoint: str,
    request_type: RequestType = "get",
    params: Optional[dict[str, Any]] = None,
    content: Optional[bytes] = None,
    headers: Optional[dict[str, str]] = None,
) -> dict[str, Any]:
    """Make a request to the OpenTIP API with proper error handling."""
    headers = headers or {}
    headers = {
        "user-agent": "opentip-mcp-client",
        "x-api-key": OPENTIP_API_KEY,
        **headers
    }
    async with httpx.AsyncClient() as client:
        try:
            url = f"{OPENTIP_API_BASE}{endpoint}"
            if request_type == "get":
                response = await client.get(
                    url, headers=headers, params=params, timeout=OPENTIP_API_TIMEOUT
                )
            elif request_type == "post":
                response = await client.post(
                    url, headers=headers, params=params, content=content, timeout=OPENTIP_API_TIMEOUT
                )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 400:
                return {"result": "error", "error_message": "Invalid parameters. Please check your input and try again."}
            elif e.response.status_code == 401:
                return {"result": "error", "error_message": "Authentication failed. Please ensure that you have provided the correct credentials and try again."}
            elif e.response.status_code == 403:
                return {"result": "error", "error_message": "Quota or request limit exceeded. Check your quota and limits and try again."}
            else:
                return {"result": "error", "error_message": str(e)}
        except Exception as e:  # noqa
            return {"result": "error", "error_message": str(e)}
@mcp.tool(
    description="Get threat intelligence information about a file by hash (md5, sha1, sha256)",
    annotations=ToolAnnotations(
        title="Investigate a file by hash",
        readOnlyHint=True,
        openWorldHint=True,
    ),
)
async def search_hash(file_hash: str) -> dict[str, Any] | None:
    """Get threat intelligence information about a file by hash (md5, sha1, sha256)
    Args:
        file_hash: hash that you want to investigate
    """
    if not hash_pattern.match(file_hash):
        return {"result": "error", "error_message": "Invalid hash format. Please provide a valid md5, sha1, or sha256 hash."}
    params = {"request": file_hash}
    return await opentip_request(Endpoints.search_hash, "get", params)
@mcp.tool(
    description="Get threat intelligence data about a web domain",
    annotations=ToolAnnotations(
        title="Investigate a domain",
        readOnlyHint=True,
        openWorldHint=True,
    ),
)
async def search_domain(domain: str) -> dict[str, Any] | None:
    """Get threat intelligence data about a web domain
    Args:
        domain: domain that you want to investigate
    """
    params = {"request": domain}
    return await opentip_request(Endpoints.search_domain, "get", params)
@mcp.tool(
    description="Get threat intelligence data about an IP address",
    annotations=ToolAnnotations(
        title="Investigate an IP",
        readOnlyHint=True,
        openWorldHint=True,
    ),
)
async def search_ip(ip: str) -> dict[str, Any] | None:
    """Get threat intelligence data about an IP address
    Args:
        ip: IPv4 address that you want to investigate
    """
    if not ip_pattern.match(ip):
        return {"result": "error", "error_message": "Invalid IP address format. Please provide a valid IPv4 address."}
    params = {"request": ip}
    return await opentip_request(Endpoints.search_ip, "get", params)
@mcp.tool(
    description="Get threat intelligence data about a URL",
    annotations=ToolAnnotations(
        title="Investigate a URL",
        readOnlyHint=True,
        openWorldHint=True,
    ),
)
async def search_url(url: str) -> dict[str, Any] | None:
    """Get threat intelligence data about a URL
    Args:
        url: the web address that you want to investigate
    """
    params = {"request": url}
    return await opentip_request(Endpoints.search_url, "get", params)
@mcp.tool(
    description="Get full analysis results for a file that was submitted via the web portal.",
    annotations=ToolAnnotations(
        title="Get full analysis results for a file",
        readOnlyHint=True,
        openWorldHint=True,
    ),
)
async def get_full_analysis_result(file_hash: str) -> dict[str, Any] | None:
    """Get full analysis results for a file that was submitted via the web portal.
    Args:
        file_hash: The hash of the file that you want to get analysis results for.
    """
    params = {"request": file_hash}
    return await opentip_request(Endpoints.get_analysis_results, "post", params)
@mcp.tool(
    description="Submit a file for basic analysis using the OpenTIP API.",
    annotations=ToolAnnotations(
        title="Analyze a file by uploading it",
        readOnlyHint=False,
        openWorldHint=True,
    ),
)
async def analyze_file(filename: str, full_file_path: str) -> dict[str, Any] | None:
    """Submit a file for basic analysis using the OpenTIP API.
    Args:
        filename: The name of the file to analyze.
        full_file_path: The full path to the file on the local system.
    """
    params = {"filename": filename}
    headers = {
        "Content-Type": "application/octet-stream",
    }
    try:
        with open(full_file_path, "rb") as f:
            file_data = f.read()
        return await opentip_request(
            endpoint=Endpoints.analyze_file,
            request_type="post",
            params=params,
            content=file_data,
            headers=headers,
        )
    except Exception as e:  # noqa
        return {"result": "error", "error_message": str(e)}
if __name__ == "__main__":
    mcp.run(transport="stdio")