Skip to main content
Glama

adv_scan_file

Scan files for security vulnerabilities using Clean Architecture with Semgrep and LLM analysis to detect issues meeting specified severity thresholds.

Instructions

Scan a file for security vulnerabilities using Clean Architecture. Automatically uses session-aware analysis when LLM is configured.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
pathYesPath to the file to scan
use_semgrepNoEnable Semgrep analysis
use_llmNoEnable LLM analysis
use_validationNoEnable LLM validation
severity_thresholdNoMinimum severity levelmedium
timeout_secondsNoScan timeout in seconds
languageNoProgramming language hint
output_formatNoOutput format for persisted scan resultsjson

Implementation Reference

  • The primary handler function for the 'adv_scan_file' MCP tool. Validates input using InputValidator, orchestrates scanning via ScanApplicationService.scan_file, persists results, formats output as JSON, and returns MCP TextContent.
    async def _handle_scan_file(
        self, name: str, arguments: dict
    ) -> list[types.TextContent]:
        """Handle file scanning requests."""
        try:
            # Log MCP tool invocation at INFO level for visibility
            logger.info(f"MCP Tool Invoked: {name}")
            logger.info(f"Parameters: {arguments}")
    
            # Log raw arguments received
            logger.debug(f"MCP scan_file raw arguments: {arguments}")
    
            # Comprehensive input validation
            validated_args = self._input_validator.validate_mcp_arguments(
                arguments, tool_name="adv_scan_file"
            )
    
            # Log validated arguments
            logger.debug(f"MCP scan_file validated arguments: {validated_args}")
    
            # Extract validated parameters
            path = validated_args.get("path", "")
            if not path:
                raise CleanAdversaryToolError("Path parameter is required")
    
            use_semgrep = validated_args.get("use_semgrep", True)
            use_llm = validated_args.get("use_llm", False)
            use_validation = validated_args.get("use_validation", False)
    
            # Log extracted parameters for debugging
            logger.debug(
                f"MCP scan_file parameters - use_semgrep: {use_semgrep}, use_llm: {use_llm}, use_validation: {use_validation}"
            )
            severity_threshold = validated_args.get("severity_threshold", "medium")
            timeout_seconds = validated_args.get("timeout_seconds")
            language = validated_args.get("language")
            output_format = validated_args.get("output_format", "json")
    
            # Use the same scan service as CLI for consistency and proper orchestration
            result = await self._scan_service.scan_file(
                file_path=path,
                requester="cli",
                enable_semgrep=use_semgrep,
                enable_llm=use_llm,
                enable_validation=use_validation,
                severity_threshold=severity_threshold,
                timeout_seconds=timeout_seconds,
                language=language,
            )
    
            # Persist scan result automatically
            try:
                output_format_enum = OutputFormat.from_string(output_format)
                file_path = await self._persistence_service.persist_scan_result(
                    result, output_format_enum
                )
                logger.info(f"Scan result persisted to {file_path}")
            except Exception as e:
                logger.warning(f"Failed to persist scan result: {e}")
                # Don't fail the scan if persistence fails
    
            # Format result for MCP response
            formatted_result = self._format_scan_result(result)
    
            # Add persistence info to the response
            formatted_result["persistence"] = {
                "output_format": output_format,
                "file_path": file_path if "file_path" in locals() else None,
                "persisted": "file_path" in locals(),
            }
    
            # Log successful completion with key metrics
            threat_count = (
                len(result.threat_matches) if hasattr(result, "threat_matches") else 0
            )
            scan_duration = (
                getattr(result.metadata, "scan_duration_seconds", 0)
                if hasattr(result, "metadata")
                else 0
            )
            logger.info(
                f"[+] MCP Tool Completed: {name} | Threats: {threat_count} | Duration: {scan_duration:.2f}s"
            )
    
            return [
                types.TextContent(
                    type="text",
                    text=json.dumps(formatted_result, indent=2, default=str),
                )
            ]
    
        except (ValidationError, SecurityError, ConfigurationError) as e:
            logger.error(f"File scan failed: {e}")
            raise CleanAdversaryToolError(f"Scan failed: {str(e)}")
        except Exception as e:
            logger.error(f"Unexpected error in file scan: {e}")
            logger.error(traceback.format_exc())
            raise CleanAdversaryToolError(f"Internal error: {str(e)}")
  • MCP Tool schema definition including inputSchema with properties like path (required), use_semgrep, use_llm, severity_threshold, etc., defining the expected input structure and defaults for the adv_scan_file tool.
    Tool(
        name="adv_scan_file",
        description="Scan a file for security vulnerabilities using Clean Architecture. Automatically uses session-aware analysis when LLM is configured.",
        inputSchema={
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the file to scan",
                },
                "use_semgrep": {
                    "type": "boolean",
                    "description": "Enable Semgrep analysis",
                    "default": True,
                },
                "use_llm": {
                    "type": "boolean",
                    "description": "Enable LLM analysis",
                    "default": False,
                },
                "use_validation": {
                    "type": "boolean",
                    "description": "Enable LLM validation",
                    "default": False,
                },
                "severity_threshold": {
                    "type": "string",
                    "description": "Minimum severity level",
                    "default": "medium",
                },
                "timeout_seconds": {
                    "type": "integer",
                    "description": "Scan timeout in seconds",
                },
                "language": {
                    "type": "string",
                    "description": "Programming language hint",
                },
                "output_format": {
                    "type": "string",
                    "description": "Output format for persisted scan results",
                    "enum": ["json", "md", "markdown", "csv"],
                    "default": "json",
                },
            },
            "required": ["path"],
        },
    ),
  • Registration of the tool dispatcher decorated with @self.server.call_tool(). The dispatcher routes 'adv_scan_file' calls to the specific _handle_scan_file handler.
    @self.server.call_tool()
    async def tool_dispatcher(
        name: str, arguments: dict
    ) -> list[types.TextContent]:
        """Dispatch MCP tool calls to the appropriate handler."""
        if name == "adv_scan_file":
            return await self._handle_scan_file(name, arguments)
        elif name == "adv_scan_folder":
            return await self._handle_scan_folder(name, arguments)
        elif name == "adv_scan_code":
            return await self._handle_scan_code(name, arguments)
        elif name == "adv_get_status":
            return await self._handle_get_status(name, arguments)
        elif name == "adv_get_version":
            return await self._handle_get_version(name, arguments)
        elif name == "adv_mark_false_positive":
            return await self._handle_mark_false_positive(name, arguments)
        elif name == "adv_unmark_false_positive":
            return await self._handle_unmark_false_positive(name, arguments)
        else:
            raise ValueError(f"Unknown tool: {name}")
  • Input validation helper specifically for 'adv_scan_file' tool, ensuring the 'path' argument is a valid, secure file path using validate_file_path.
    elif tool_name in ("adv_scan_file",):
        # File scanning tools expect file paths
        validated[key] = str(
            InputValidator.validate_file_path(str(value))
        )
  • Core scanning helper called by the handler. Coordinates domain services (ScanOrchestrator, ThreatAggregator) and adapters (Semgrep, LLM) to perform the actual file scan and return ScanResult.
    async def scan_file(
        self,
        file_path: str,
        *,
        requester: str = "application",
        enable_semgrep: bool = True,
        enable_llm: bool = False,
        enable_validation: bool = False,
        severity_threshold: str | None = None,
        timeout_seconds: int | None = None,
        language: str | None = None,
    ) -> ScanResult:
        """
        Scan a single file for security vulnerabilities.
    
        Args:
            file_path: Path to the file to scan
            requester: Who requested the scan
            enable_semgrep: Whether to enable Semgrep scanning
            enable_llm: Whether to enable LLM analysis
            enable_validation: Whether to enable LLM validation
            severity_threshold: Minimum severity level to include
            timeout_seconds: Scan timeout in seconds
            language: Programming language hint
    
        Returns:
            ScanResult containing found threats and metadata
    
        Raises:
            ValidationError: If scan parameters are invalid
            SecurityError: If file access is restricted
            ConfigurationError: If no scanners are enabled
        """
        # Create domain objects using the proper factory method
        context = ScanContext.for_file(
            file_path=file_path,
            requester=requester,
            language=language,
            timeout_seconds=timeout_seconds,
            enable_semgrep=enable_semgrep,
            enable_llm=enable_llm,
            enable_validation=enable_validation,
        )
    
        severity_level = (
            SeverityLevel.from_string(severity_threshold)
            if severity_threshold
            else None
        )
    
        request = ScanRequest(
            context=context,
            enable_semgrep=enable_semgrep,
            enable_llm=enable_llm,
            enable_validation=enable_validation,
            severity_threshold=severity_level,
        )
    
        # Validate request using domain service
        self._validation_service.validate_scan_request(request)
        self._validation_service.enforce_security_constraints(context)
    
        # Execute scan using domain orchestrator
        result = await self._scan_orchestrator.execute_scan(request)
    
        return result

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server