Skip to main content
Glama
marlinkcyber

SpiderFoot MCP Server

by marlinkcyber
server.py14.3 kB
#!/usr/bin/env python3 """ SpiderFoot MCP Server A Model Context Protocol server that provides SpiderFoot scanning capabilities. Supports starting scans, monitoring progress, and retrieving results. """ import asyncio import json import logging import os from typing import Any, Dict, List, Optional from dotenv import load_dotenv from mcp.server.fastmcp.server import FastMCP from spiderfoot_client import SpiderFootClient, SpiderFootAPIError # Load environment variables load_dotenv() # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize MCP server mcp = FastMCP("SpiderFoot Scanner") # Global client instance spiderfoot_client: Optional[SpiderFootClient] = None # Scan tracking active_scans: Dict[str, Dict[str, Any]] = {} def get_client() -> SpiderFootClient: """Get or create SpiderFoot client instance.""" global spiderfoot_client if spiderfoot_client is None: url = os.getenv('SPIDERFOOT_URL', 'http://localhost:5001') username = os.getenv('SPIDERFOOT_USERNAME', 'admin') password = os.getenv('SPIDERFOOT_PASSWORD', '') if not password: raise ValueError("SPIDERFOOT_PASSWORD environment variable is required") spiderfoot_client = SpiderFootClient(url, username, password) # logger.info(f"Initialized SpiderFoot client for {url}") return spiderfoot_client @mcp.tool() def start_scan(target: str, scan_name: str, modules: Optional[List[str]] = None, use_case: str = "all") -> Dict[str, Any]: """ Start a new SpiderFoot scan. Args: target: Target to scan (domain, IP address, etc.) scan_name: Unique name for the scan modules: List of specific modules to use (optional, uses all if not specified) use_case: Scan use case - "all", "investigate", "passive", or "footprint" Returns: Dictionary with scan information including scan_id """ try: client = get_client() result = client.start_scan(target, scan_name, modules, use_case) # Extract scan ID from result scan_id = result.get('id') if not scan_id: raise SpiderFootAPIError("No scan ID returned from API") # Track the scan active_scans[scan_id] = { 'target': target, 'scan_name': scan_name, 'modules': modules or [], 'use_case': use_case, 'status': 'started', 'created_at': result.get('created', '') } return { 'success': True, 'scan_id': scan_id, 'message': f"Started scan '{scan_name}' for target '{target}'", 'details': result } except Exception as e: logger.error(f"Failed to start scan: {e}") return { 'success': False, 'error': str(e), 'message': f"Failed to start scan '{scan_name}'" } @mcp.tool() def get_scan_status(scan_id: str) -> Dict[str, Any]: """ Get the current status of a scan. Args: scan_id: The scan ID to check Returns: Dictionary with scan status information """ try: client = get_client() status = client.get_scan_status(scan_id) # Update tracked scan info if scan_id in active_scans: active_scans[scan_id]['status'] = status.get('status', 'unknown') return { 'success': True, 'scan_id': scan_id, 'status': status } except Exception as e: logger.error(f"Failed to get scan status: {e}") return { 'success': False, 'error': str(e), 'message': f"Failed to get status for scan {scan_id}" } @mcp.tool() def list_scans() -> Dict[str, Any]: """ List all scans on the SpiderFoot server. Returns: Dictionary with list of all scans """ try: client = get_client() scans = client.get_scan_list() return { 'success': True, 'scans': scans, 'count': len(scans) } except Exception as e: logger.error(f"Failed to list scans: {e}") return { 'success': False, 'error': str(e), 'message': "Failed to retrieve scan list" } @mcp.tool() def stop_scan(scan_id: str) -> Dict[str, Any]: """ Stop a running scan. Args: scan_id: The scan ID to stop Returns: Dictionary with operation result """ try: client = get_client() result = client.stop_scan(scan_id) # Update tracked scan status if scan_id in active_scans: active_scans[scan_id]['status'] = 'stopped' return { 'success': True, 'scan_id': scan_id, 'message': f"Stopped scan {scan_id}", 'details': result } except Exception as e: logger.error(f"Failed to stop scan: {e}") return { 'success': False, 'error': str(e), 'message': f"Failed to stop scan {scan_id}" } @mcp.tool() def delete_scan(scan_id: str) -> Dict[str, Any]: """ Delete a scan and all its data. Args: scan_id: The scan ID to delete Returns: Dictionary with operation result """ try: client = get_client() result = client.delete_scan(scan_id) # Remove from tracked scans if scan_id in active_scans: del active_scans[scan_id] return { 'success': True, 'scan_id': scan_id, 'message': f"Deleted scan {scan_id}", 'details': result } except Exception as e: logger.error(f"Failed to delete scan: {e}") return { 'success': False, 'error': str(e), 'message': f"Failed to delete scan {scan_id}" } @mcp.tool() def get_scan_results(scan_id: str, event_type: Optional[str] = None) -> Dict[str, Any]: """ Get results from a completed or running scan. Args: scan_id: The scan ID to get results for event_type: Optional filter for specific event types Returns: Dictionary with scan results """ try: client = get_client() results = client.get_scan_results(scan_id, event_type) return { 'success': True, 'scan_id': scan_id, 'results': results, 'count': len(results), 'event_type_filter': event_type } except Exception as e: logger.error(f"Failed to get scan results: {e}") return { 'success': False, 'error': str(e), 'message': f"Failed to get results for scan {scan_id}" } @mcp.tool() def get_scan_summary(scan_id: str, by: str = "module") -> Dict[str, Any]: """ Get a summary of scan results. Args: scan_id: The scan ID to get summary for by: Summary filter type ("module", "type", etc.) Returns: Dictionary with scan summary """ try: client = get_client() summary = client.get_scan_summary(scan_id, by) return { 'success': True, 'scan_id': scan_id, 'summary': summary } except Exception as e: logger.error(f"Failed to get scan summary: {e}") return { 'success': False, 'error': str(e), 'message': f"Failed to get summary for scan {scan_id}" } @mcp.tool() def wait_for_scan_completion(scan_id: str, poll_interval: int = 5, timeout: Optional[int] = None) -> Dict[str, Any]: """ Wait for a scan to complete and return final status. Args: scan_id: The scan ID to monitor poll_interval: Seconds between status checks (default: 5) timeout: Maximum seconds to wait (default: no timeout) Returns: Dictionary with final scan status """ try: client = get_client() final_status = client.wait_for_completion(scan_id, poll_interval, timeout) # Update tracked scan status if scan_id in active_scans: active_scans[scan_id]['status'] = final_status.get('status', 'unknown') return { 'success': True, 'scan_id': scan_id, 'message': f"Scan {scan_id} completed", 'final_status': final_status } except Exception as e: logger.error(f"Failed to wait for scan completion: {e}") return { 'success': False, 'error': str(e), 'message': f"Failed to wait for scan {scan_id} completion" } @mcp.tool() def export_scan_results(scan_id: str, export_format: str = 'json') -> Dict[str, Any]: """ Export scan results in specified format. Args: scan_id: The scan ID to export export_format: Export format (json, csv, etc.) Returns: Dictionary with exported data """ try: client = get_client() exported_data = client.export_scan_results(scan_id, export_format) return { 'success': True, 'scan_id': scan_id, 'format': export_format, 'data': exported_data } except Exception as e: logger.error(f"Failed to export scan results: {e}") return { 'success': False, 'error': str(e), 'message': f"Failed to export results for scan {scan_id}" } @mcp.tool() def get_available_modules() -> Dict[str, Any]: """ Get list of available SpiderFoot modules. Returns: Dictionary with available modules """ try: client = get_client() modules = client.get_modules() return { 'success': True, 'modules': modules } except Exception as e: logger.error(f"Failed to get modules: {e}") return { 'success': False, 'error': str(e), 'message': "Failed to retrieve available modules" } @mcp.tool() def search_scan_results(query: str, scan_id: Optional[str] = None) -> Dict[str, Any]: """ Search across scan results. Args: query: Search query string scan_id: Optional scan ID to limit search to specific scan Returns: Dictionary with search results """ try: client = get_client() results = client.search_results(query, scan_id) return { 'success': True, 'query': query, 'scan_id': scan_id, 'results': results, 'count': len(results) } except Exception as e: logger.error(f"Failed to search results: {e}") return { 'success': False, 'error': str(e), 'message': f"Failed to search for '{query}'" } @mcp.tool() def get_scan_log(scan_id: str, limit: Optional[int] = None, from_rowid: Optional[int] = None) -> Dict[str, Any]: """ Get log entries for a scan. Args: scan_id: The scan ID to get logs for limit: Maximum number of log entries to return from_rowid: Start from specific row ID for pagination Returns: Dictionary with log entries """ try: client = get_client() log_entries = client.get_scan_log(scan_id, limit, from_rowid) return { 'success': True, 'scan_id': scan_id, 'log_entries': log_entries, 'count': len(log_entries), 'limit': limit, 'from_rowid': from_rowid } except Exception as e: logger.error(f"Failed to get scan log: {e}") return { 'success': False, 'error': str(e), 'message': f"Failed to get log for scan {scan_id}" } @mcp.tool() def get_active_scans_summary() -> Dict[str, Any]: """ Get summary of scans being tracked by this MCP server. Returns: Dictionary with active scans summary """ return { 'success': True, 'active_scans': active_scans, 'count': len(active_scans) } @mcp.tool() def ping() -> Dict[str, Any]: """ Test connectivity to the SpiderFoot server. Returns: Dictionary with server connectivity status and version information """ try: client = get_client() ping_result = client.ping() return { 'success': True, 'status': ping_result.get('status'), 'server_version': ping_result.get('server_version'), 'message': ping_result.get('message', 'Server ping successful'), 'server_url': client.base_url } except Exception as e: logger.error(f"Failed to ping server: {e}") return { 'success': False, 'error': str(e), 'message': "Failed to ping SpiderFoot server" } def main(): """Main entry point for the MCP server.""" import sys # Validate environment variables required_env_vars = ['SPIDERFOOT_URL', 'SPIDERFOOT_USERNAME', 'SPIDERFOOT_PASSWORD'] missing_vars = [var for var in required_env_vars if not os.getenv(var)] if missing_vars: logger.error(f"Missing required environment variables: {', '.join(missing_vars)}") logger.error("Please set these variables or create a .env file") sys.exit(1) # Test connection try: client = get_client() scans = client.get_scan_list() except Exception as e: logger.error(f"Failed to connect to SpiderFoot: {e}") sys.exit(1) # Run the server mcp.run() if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/marlinkcyber/spiderfoot-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server