Skip to main content
Glama

adv_scan_file

Scans a file for security vulnerabilities using a Clean Architecture pipeline that combines Semgrep and LLM analysis, reporting only issues at or above the configured severity threshold.

Instructions

Scan a file for security vulnerabilities using Clean Architecture. Automatically uses session-aware analysis when LLM is configured.

Input Schema

Table / JSON Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| path | Yes | Path to the file to scan | — |
| use_semgrep | No | Enable Semgrep analysis | true |
| use_llm | No | Enable LLM analysis | false |
| use_validation | No | Enable LLM validation | false |
| severity_threshold | No | Minimum severity level | medium |
| timeout_seconds | No | Scan timeout in seconds | — |
| language | No | Programming language hint | — |
| output_format | No | Output format for persisted scan results | json |

Implementation Reference

  • The primary handler function for the 'adv_scan_file' MCP tool. Validates input using InputValidator, orchestrates scanning via ScanApplicationService.scan_file, persists results, formats output as JSON, and returns MCP TextContent.
    async def _handle_scan_file(
        self, name: str, arguments: dict
    ) -> list[types.TextContent]:
        """Handle 'adv_scan_file' MCP tool requests.

        Validates the incoming arguments, runs the scan through the shared
        application service, persists the result (best-effort), and returns
        the JSON-formatted result as MCP TextContent.

        Args:
            name: The MCP tool name (used for logging only).
            arguments: Raw tool arguments from the MCP client.

        Returns:
            A single-element list containing the JSON scan result.

        Raises:
            CleanAdversaryToolError: On validation, security, or configuration
                failures, or any unexpected internal error.
        """
        try:
            # Log MCP tool invocation at INFO level for visibility
            logger.info(f"MCP Tool Invoked: {name}")
            logger.info(f"Parameters: {arguments}")
            # Log raw arguments received
            logger.debug(f"MCP scan_file raw arguments: {arguments}")

            # Comprehensive input validation
            validated_args = self._input_validator.validate_mcp_arguments(
                arguments, tool_name="adv_scan_file"
            )
            logger.debug(f"MCP scan_file validated arguments: {validated_args}")

            # Extract validated parameters; path is the only required one.
            path = validated_args.get("path", "")
            if not path:
                raise CleanAdversaryToolError("Path parameter is required")

            use_semgrep = validated_args.get("use_semgrep", True)
            use_llm = validated_args.get("use_llm", False)
            use_validation = validated_args.get("use_validation", False)
            logger.debug(
                f"MCP scan_file parameters - use_semgrep: {use_semgrep}, "
                f"use_llm: {use_llm}, use_validation: {use_validation}"
            )
            severity_threshold = validated_args.get("severity_threshold", "medium")
            timeout_seconds = validated_args.get("timeout_seconds")
            language = validated_args.get("language")
            output_format = validated_args.get("output_format", "json")

            # Use the same scan service as CLI for consistency and proper
            # orchestration.
            # NOTE(review): requester is "cli" even though this is the MCP
            # entry point — confirm this is intentional.
            result = await self._scan_service.scan_file(
                file_path=path,
                requester="cli",
                enable_semgrep=use_semgrep,
                enable_llm=use_llm,
                enable_validation=use_validation,
                severity_threshold=severity_threshold,
                timeout_seconds=timeout_seconds,
                language=language,
            )

            # Persist scan result automatically (best-effort: a persistence
            # failure must not fail the scan itself). Track the persisted
            # path explicitly instead of probing locals() for a variable
            # that may be unbound after an exception.
            persisted_path = None
            try:
                output_format_enum = OutputFormat.from_string(output_format)
                persisted_path = await self._persistence_service.persist_scan_result(
                    result, output_format_enum
                )
                logger.info(f"Scan result persisted to {persisted_path}")
            except Exception as e:
                logger.warning(f"Failed to persist scan result: {e}")

            # Format result for MCP response and attach persistence info.
            formatted_result = self._format_scan_result(result)
            formatted_result["persistence"] = {
                "output_format": output_format,
                "file_path": persisted_path,
                "persisted": persisted_path is not None,
            }

            # Log successful completion with key metrics; guard attribute
            # access since result shape is provided by the scan service.
            threat_count = (
                len(result.threat_matches)
                if hasattr(result, "threat_matches")
                else 0
            )
            scan_duration = (
                getattr(result.metadata, "scan_duration_seconds", 0)
                if hasattr(result, "metadata")
                else 0
            )
            logger.info(
                f"[+] MCP Tool Completed: {name} | Threats: {threat_count} | "
                f"Duration: {scan_duration:.2f}s"
            )

            return [
                types.TextContent(
                    type="text",
                    text=json.dumps(formatted_result, indent=2, default=str),
                )
            ]
        except (ValidationError, SecurityError, ConfigurationError) as e:
            # Known failure categories: surface as a tool error, chained for
            # debuggability.
            logger.error(f"File scan failed: {e}")
            raise CleanAdversaryToolError(f"Scan failed: {str(e)}") from e
        except Exception as e:
            # Unexpected failure: log full traceback, then wrap.
            logger.error(f"Unexpected error in file scan: {e}")
            logger.error(traceback.format_exc())
            raise CleanAdversaryToolError(f"Internal error: {str(e)}") from e
  • MCP Tool schema definition including inputSchema with properties like path (required), use_semgrep, use_llm, severity_threshold, etc., defining the expected input structure and defaults for the adv_scan_file tool.
    # MCP tool definition for adv_scan_file. Defaults declared here mirror
    # the fallbacks used by the handler when a field is omitted.
    Tool(
        name="adv_scan_file",
        description="Scan a file for security vulnerabilities using Clean Architecture. Automatically uses session-aware analysis when LLM is configured.",
        inputSchema={
            "type": "object",
            "properties": {
                # The only required field: which file to scan.
                "path": {
                    "type": "string",
                    "description": "Path to the file to scan",
                },
                "use_semgrep": {
                    "type": "boolean",
                    "description": "Enable Semgrep analysis",
                    "default": True,
                },
                "use_llm": {
                    "type": "boolean",
                    "description": "Enable LLM analysis",
                    "default": False,
                },
                "use_validation": {
                    "type": "boolean",
                    "description": "Enable LLM validation",
                    "default": False,
                },
                "severity_threshold": {
                    "type": "string",
                    "description": "Minimum severity level",
                    "default": "medium",
                },
                "timeout_seconds": {
                    "type": "integer",
                    "description": "Scan timeout in seconds",
                },
                "language": {
                    "type": "string",
                    "description": "Programming language hint",
                },
                # Format for the automatically persisted scan result.
                "output_format": {
                    "type": "string",
                    "description": "Output format for persisted scan results",
                    "enum": ["json", "md", "markdown", "csv"],
                    "default": "json",
                },
            },
            "required": ["path"],
        },
    ),
  • Registration of the tool dispatcher decorated with @self.server.call_tool(). The dispatcher routes 'adv_scan_file' calls to the specific _handle_scan_file handler.
    @self.server.call_tool()
    async def tool_dispatcher(
        name: str, arguments: dict
    ) -> list[types.TextContent]:
        """Route an incoming MCP tool call to its handler by tool name."""
        # Table-driven dispatch: tool name -> bound handler coroutine.
        handlers = {
            "adv_scan_file": self._handle_scan_file,
            "adv_scan_folder": self._handle_scan_folder,
            "adv_scan_code": self._handle_scan_code,
            "adv_get_status": self._handle_get_status,
            "adv_get_version": self._handle_get_version,
            "adv_mark_false_positive": self._handle_mark_false_positive,
            "adv_unmark_false_positive": self._handle_unmark_false_positive,
        }
        handler = handlers.get(name)
        if handler is None:
            raise ValueError(f"Unknown tool: {name}")
        return await handler(name, arguments)
  • Input validation helper specifically for 'adv_scan_file' tool, ensuring the 'path' argument is a valid, secure file path using validate_file_path.
    elif tool_name in ("adv_scan_file",): # File scanning tools expect file paths validated[key] = str( InputValidator.validate_file_path(str(value)) )
  • Core scanning helper called by the handler. Coordinates domain services (ScanOrchestrator, ThreatAggregator) and adapters (Semgrep, LLM) to perform the actual file scan and return ScanResult.
    async def scan_file(
        self,
        file_path: str,
        *,
        requester: str = "application",
        enable_semgrep: bool = True,
        enable_llm: bool = False,
        enable_validation: bool = False,
        severity_threshold: str | None = None,
        timeout_seconds: int | None = None,
        language: str | None = None,
    ) -> ScanResult:
        """
        Scan a single file for security vulnerabilities.

        Args:
            file_path: Path to the file to scan
            requester: Who requested the scan
            enable_semgrep: Whether to enable Semgrep scanning
            enable_llm: Whether to enable LLM analysis
            enable_validation: Whether to enable LLM validation
            severity_threshold: Minimum severity level to include
            timeout_seconds: Scan timeout in seconds
            language: Programming language hint

        Returns:
            ScanResult containing found threats and metadata

        Raises:
            ValidationError: If scan parameters are invalid
            SecurityError: If file access is restricted
            ConfigurationError: If no scanners are enabled
        """
        # Build the domain-level scan context via its factory method.
        scan_context = ScanContext.for_file(
            file_path=file_path,
            requester=requester,
            language=language,
            timeout_seconds=timeout_seconds,
            enable_semgrep=enable_semgrep,
            enable_llm=enable_llm,
            enable_validation=enable_validation,
        )

        # Translate the optional string threshold to the domain enum;
        # an empty/absent threshold means "no filtering".
        threshold_level = None
        if severity_threshold:
            threshold_level = SeverityLevel.from_string(severity_threshold)

        scan_request = ScanRequest(
            context=scan_context,
            enable_semgrep=enable_semgrep,
            enable_llm=enable_llm,
            enable_validation=enable_validation,
            severity_threshold=threshold_level,
        )

        # Domain-level validation and security constraints before executing.
        self._validation_service.validate_scan_request(scan_request)
        self._validation_service.enforce_security_constraints(scan_context)

        # Delegate the actual work to the domain orchestrator.
        return await self._scan_orchestrator.execute_scan(scan_request)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.