Skip to main content
Glama
scan_tools.py8.01 kB
"""YARA scanning tools for Claude MCP integration. This module provides tools for scanning files and URLs with YARA rules. It uses direct function calls with proper error handling. """ import base64 import logging from json import JSONDecodeError from typing import Any, Dict, List, Optional from yaraflux_mcp_server.mcp_tools.base import register_tool from yaraflux_mcp_server.storage import get_storage_client from yaraflux_mcp_server.yara_service import YaraError, yara_service # Configure logging logger = logging.getLogger(__name__) @register_tool() def scan_url( url: str, rule_names: Optional[List[str]] = None, sources: Optional[List[str]] = None, timeout: Optional[int] = None ) -> Dict[str, Any]: """Scan a file from a URL with YARA rules. This function downloads and scans a file from the provided URL using YARA rules. It's particularly useful for scanning potentially malicious files without storing them locally on the user's machine. For LLM users connecting through MCP, this can be invoked with natural language like: "Can you scan this URL for malware: https://example.com/suspicious-file.exe" "Analyze https://example.com/document.pdf for malicious patterns" "Check if the file at this URL contains known threats: https://example.com/sample.exe" Args: url: URL of the file to scan rule_names: Optional list of rule names to match (if None, match all) sources: Optional list of sources to match rules from (if None, match all) timeout: Optional timeout in seconds (if None, use default) Returns: Scan result containing file details, scan status, and any matches found """ try: # Fetch and scan the file result = yara_service.fetch_and_scan(url=url, rule_names=rule_names, sources=sources, timeout=timeout) return { "success": True, "scan_id": str(result.scan_id), "file_name": result.file_name, "file_size": result.file_size, "file_hash": result.file_hash, "scan_time": result.scan_time, "timeout_reached": result.timeout_reached, "matches": [match.model_dump() for match in result.matches], "match_count": len(result.matches), } except YaraError as e: logger.error(f"Error scanning URL {url}: {str(e)}") return {"success": False, "message": str(e), "error_type": "YaraError"} except Exception as e: logger.error(f"Unexpected error scanning URL {url}: {str(e)}") return {"success": False, "message": f"Unexpected error: {str(e)}"} @register_tool() def scan_data( data: str, filename: str, *, encoding: str = "base64", rule_names: Optional[List[str]] = None, sources: Optional[List[str]] = None, timeout: Optional[int] = None, ) -> Dict[str, Any]: """Scan in-memory data with YARA rules. This function scans provided binary or text data using YARA rules. It supports both base64-encoded data and plain text, making it versatile for various sources of potentially malicious content. For LLM users connecting through MCP, this can be invoked with natural language like: "Scan this base64 data: SGVsbG8gV29ybGQ=" "Can you check if this text contains malicious patterns: eval(atob('ZXZhbChwcm9tcHQoKSk7'))" "Analyze this string for malware signatures: document.write(unescape('%3C%73%63%72%69%70%74%3E'))" Args: data: Data to scan (base64-encoded by default) filename: Name of the file for reference encoding: Encoding of the data ("base64" or "text") rule_names: Optional list of rule names to match (if None, match all) sources: Optional list of sources to match rules from (if None, match all) timeout: Optional timeout in seconds (if None, use default) Returns: Scan result containing match details and file metadata """ try: # Validate parameters if not filename: raise ValueError("Filename cannot be empty") if not data: raise ValueError("Empty data") # Validate encoding if encoding not in ["base64", "text"]: raise ValueError(f"Unsupported encoding: {encoding}") # Decode the data if encoding == "base64": # Validate base64 format before attempting to decode # Check if the data contains valid base64 characters (allowing for padding) import re # pylint: disable=import-outside-toplevel if not re.match(r"^[A-Za-z0-9+/]*={0,2}$", data): raise ValueError("Invalid base64 format") try: decoded_data = base64.b64decode(data) except Exception as e: raise ValueError(f"Invalid base64 data: {str(e)}") from e else: # encoding == "text" decoded_data = data.encode("utf-8") # Scan the data result = yara_service.match_data( data=decoded_data, file_name=filename, rule_names=rule_names, sources=sources, timeout=timeout ) return { "success": True, "scan_id": str(result.scan_id), "file_name": result.file_name, "file_size": result.file_size, "file_hash": result.file_hash, "scan_time": result.scan_time, "timeout_reached": result.timeout_reached, "matches": [match.model_dump() for match in result.matches], "match_count": len(result.matches), } except YaraError as e: logger.error(f"Error scanning data: {str(e)}") return {"success": False, "message": str(e), "error_type": "YaraError"} except ValueError as e: logger.error(f"Value error in scan_data: {str(e)}") return {"success": False, "message": str(e), "error_type": "ValueError"} except Exception as e: logger.error(f"Unexpected error scanning data: {str(e)}") return {"success": False, "message": f"Unexpected error: {str(e)}"} @register_tool() def get_scan_result(scan_id: str) -> Dict[str, Any]: """Get a scan result by ID. This function retrieves previously saved scan results using their unique ID. It allows users to access historical scan data and analyze matches without rescanning the content. For LLM users connecting through MCP, this can be invoked with natural language like: "Show me the results from scan abc123" "Retrieve the details for scan ID xyz789" "What were the findings from my previous scan?" Args: scan_id: ID of the scan result Returns: Complete scan result including file metadata and any matches found """ try: # Validate scan_id if not scan_id: raise ValueError("Scan ID cannot be empty") # Get the result from storage storage = get_storage_client() result_data = storage.get_result(scan_id) # Validate result_data is valid JSON if isinstance(result_data, str): try: # Try to parse as JSON if it's a string import json # pylint: disable=import-outside-toplevel result_data = json.loads(result_data) except ImportError as e: raise ImportError(f"Error loading JSON module: {str(e)}") from e except JSONDecodeError as e: raise ValueError(f"Invalid JSON data: {str(e)}") from e except ValueError as e: raise ValueError(f"Invalid JSON data: {str(e)}") from e return {"success": True, "result": result_data} except ValueError as e: logger.error(f"Value error in get_scan_result: {str(e)}") return {"success": False, "message": str(e)} except Exception as e: # pylint: disable=broad-except logger.error(f"Error getting scan result {scan_id}: {str(e)}") return {"success": False, "message": str(e)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ThreatFlux/YaraFlux'

If you have feedback or need assistance with the MCP directory API, please join our Discord server