Skip to main content
Glama
4R9UN

FastMCP ThreatIntel

server.py (4.63 kB)
"""Command-line entry points for the ThreatIntel MCP server.

Exposes a Typer application with three commands: ``run`` (start the MCP
server), ``batch`` (analyze IOCs listed in a file) and ``interactive``
(prompt for IOCs one at a time).
"""

import asyncio
import logging
import os
import sys

import typer
from fastmcp import Context, FastMCP

from .settings import settings
from .tools import analyze_iocs_from_file, process_single_ioc, register_tools

# Create a logger
logger = logging.getLogger("threatintel")


def configure_logging(log_level: str = "INFO") -> None:
    """Configure logging for the application.

    Log records are written to stderr.

    Args:
        log_level: Name of a ``logging`` level (case-insensitive), e.g.
            ``"DEBUG"`` or ``"warning"``.

    Raises:
        typer.BadParameter: If ``log_level`` does not name a logging level.
    """
    # Look the name up defensively: an unknown name would otherwise raise a
    # bare AttributeError, and names such as "BASIC_FORMAT" resolve to
    # attributes of the logging module that are not levels at all.
    level = getattr(logging, log_level.upper(), None)
    if not isinstance(level, int):
        raise typer.BadParameter(
            f"Invalid log level {log_level!r}; "
            "expected one of DEBUG, INFO, WARNING, ERROR, CRITICAL",
            param_hint="--log-level",
        )

    logging.basicConfig(
        level=level,
        stream=sys.stderr,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    logger.setLevel(level)


def create_server() -> FastMCP:
    """Create and configure the MCP server with all tools registered.

    Returns:
        The ``FastMCP`` instance after ``register_tools`` has been applied.

    Raises:
        Exception: Any error raised during construction or tool registration
            is logged and re-raised unchanged.
    """
    try:
        mcp = FastMCP("ThreatIntel")
        register_tools(mcp)
        logger.info("ThreatIntel MCP server created and tools registered successfully")
        return mcp
    except Exception as e:
        logger.error("Failed to create MCP server: %s", e)
        raise


app = typer.Typer(help="ThreatIntel MCP Server - Analyze IOCs using threat intelligence services")


def _override_api_keys(
    vt_api_key: str | None,
    otx_api_key: str | None,
    abuseipdb_api_key: str | None,
) -> None:
    """Override API keys from CLI options.

    Each key that is provided (non-empty) is written both to the process
    environment and to the shared ``settings`` object; keys that are ``None``
    or empty leave the existing configuration untouched.
    """
    overrides = (
        ("VIRUSTOTAL_API_KEY", "virustotal_api_key", vt_api_key),
        ("OTX_API_KEY", "otx_api_key", otx_api_key),
        ("ABUSEIPDB_API_KEY", "abuseipdb_api_key", abuseipdb_api_key),
    )
    for env_name, settings_attr, value in overrides:
        if value:
            os.environ[env_name] = value
            setattr(settings, settings_attr, value)


# NOTE: Typer renders a command's docstring as its --help text, so the
# docstrings of the commands below are kept short and user-facing.


@app.command()
def run(
    log_level: str = typer.Option("INFO", help="Logging level (DEBUG, INFO, WARNING, ERROR)"),
    vt_api_key: str | None = typer.Option(None, help="VirusTotal API key (overrides environment)"),
    otx_api_key: str | None = typer.Option(None, help="OTX API key (overrides environment)"),
    abuseipdb_api_key: str | None = typer.Option(
        None, help="AbuseIPDB API key (overrides environment)"
    ),
):
    """Run the ThreatIntel MCP server."""
    configure_logging(log_level)
    _override_api_keys(vt_api_key, otx_api_key, abuseipdb_api_key)

    logger.info("Starting ThreatIntel MCP server")
    try:
        server = create_server()
        server.run()
    except Exception as e:
        # Top-level boundary: report the failure and exit with a non-zero status.
        logger.critical("Server failed to start: %s", e)
        sys.exit(1)


@app.command()
def batch(
    file_path: str = typer.Argument(..., help="Path to the file containing IOCs (one per line)"),
    log_level: str = typer.Option("INFO", help="Logging level"),
    vt_api_key: str | None = typer.Option(None, help="VirusTotal API key"),
    otx_api_key: str | None = typer.Option(None, help="OTX API key"),
    abuseipdb_api_key: str | None = typer.Option(None, help="AbuseIPDB API key"),
):
    """Analyze IOCs from a file in batch mode."""
    configure_logging(log_level)
    _override_api_keys(vt_api_key, otx_api_key, abuseipdb_api_key)

    async def _run_batch() -> None:
        # Create a mock context for standalone use
        mock_ctx = Context(server=None, tool_name="batch")
        await analyze_iocs_from_file(file_path, mock_ctx)

    asyncio.run(_run_batch())


@app.command()
def interactive(
    log_level: str = typer.Option("INFO", help="Logging level"),
    vt_api_key: str | None = typer.Option(None, help="VirusTotal API key"),
    otx_api_key: str | None = typer.Option(None, help="OTX API key"),
    abuseipdb_api_key: str | None = typer.Option(None, help="AbuseIPDB API key"),
):
    """Start an interactive REPL session for IOC analysis."""
    configure_logging(log_level)
    _override_api_keys(vt_api_key, otx_api_key, abuseipdb_api_key)

    async def _run_interactive() -> None:
        # Resolve the helpers once, before the loop, instead of re-running the
        # import statement for every IOC entered.
        from .tools import create_ioc_table, get_ioc_type

        # A single mock context is shared by every lookup in the session.
        mock_ctx = Context(server=None, tool_name="interactive")
        typer.echo("ThreatIntel Interactive Mode. Type 'exit' to quit.")

        while True:
            # Strip surrounding whitespace so "exit " still quits and IOCs
            # pasted with stray spaces are analyzed as intended.
            ioc_string = typer.prompt("Enter IOC to analyze").strip()
            if not ioc_string:
                continue
            if ioc_string.lower() == "exit":
                break

            ioc_type = await get_ioc_type(ioc_string, mock_ctx)
            if ioc_type == "unknown":
                typer.echo(f"Could not determine IOC type for: {ioc_string}")
                continue

            result = await process_single_ioc(ioc_string, ioc_type, mock_ctx)
            typer.echo(create_ioc_table([result]))

    asyncio.run(_run_interactive())


def main() -> None:
    """Entry point for the CLI."""
    app()


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/4R9UN/fastmcp-threatintel'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.