adv_scan_code

Scan source code for security vulnerabilities, with a configurable severity threshold, optional exploit examples, and optional LLM-based validation. Results are saved in JSON or Markdown format for detailed analysis.

Instructions

Scan source code for security vulnerabilities. Results are saved as .adversary.json or .adversary.md in the specified directory.

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| content | Yes | Source code content to scan | |
| language | Yes | Programming language of the code | |
| include_exploits | No | Whether to include exploit examples | |
| output_format | No | Output format for results (json, md, markdown, or csv) | json |
| path | No | Directory path where results should be saved | |
| severity_threshold | No | Minimum severity threshold (low, medium, high, critical) | medium |
| use_llm | No | Whether to include LLM analysis prompts (for use with your client's LLM) | true |
| use_semgrep | No | Whether to include Semgrep analysis | true |
| use_validation | No | Whether to use LLM validation to filter false positives | false |
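For reference, a complete arguments object for an adv_scan_code call might look like the following; the code snippet and save path are illustrative values, not defaults:

```json
{
  "content": "import pickle\npickle.loads(user_input)",
  "language": "python",
  "include_exploits": false,
  "use_semgrep": true,
  "use_llm": true,
  "use_validation": false,
  "severity_threshold": "medium",
  "output_format": "json",
  "path": "/tmp/adversary-results"
}
```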

Implementation Reference

  • The primary handler function for the 'adv_scan_code' MCP tool. Validates input parameters, determines whether to use session-aware analysis or standard scanning, executes the scan via ScanApplicationService.scan_code or session analysis, persists results, formats the output as JSON, and returns a TextContent response.
```python
async def _handle_scan_code(
    self, name: str, arguments: dict
) -> list[types.TextContent]:
    """Handle code scanning requests."""
    try:
        # Log MCP tool invocation at INFO level for visibility
        logger.info(f"MCP Tool Invoked: {name}")
        logger.info(f"Parameters: {arguments}")

        # Comprehensive input validation
        validated_args = self._input_validator.validate_mcp_arguments(
            arguments, tool_name="adv_scan_code"
        )

        content = validated_args.get("content", "")
        language = validated_args.get("language", "")

        if not content:
            raise CleanAdversaryToolError("Content parameter is required")
        if not language:
            raise CleanAdversaryToolError("Language parameter is required")

        use_semgrep = validated_args.get("use_semgrep", True)
        # Default to true for code analysis
        use_llm = validated_args.get("use_llm", True)
        use_validation = validated_args.get("use_validation", False)
        severity_threshold = validated_args.get("severity_threshold", "medium")
        output_format = validated_args.get("output_format", "json")

        # Auto-detect project context from current working directory
        project_path = Path.cwd()

        # Try to use session-aware analysis when available
        if self._session_manager and (use_llm or use_validation):
            # Auto-warm cache if this looks like a new project
            if not self._session_manager.session_cache.get_cached_project_context(
                project_path
            ):
                self._session_manager.warm_project_cache(project_path)

            # Use session-aware code analysis with auto-detected project context
            result = await self._handle_session_code_analysis(
                content=content,
                language=language,
                project_path=str(project_path),
                use_semgrep=use_semgrep,
                use_llm=use_llm,
                use_validation=use_validation,
                severity_threshold=severity_threshold,
                output_format=output_format,
            )
        else:
            # Fall back to standard code scan
            result = await self._scan_service.scan_code(
                code_content=content,
                language=language,
                requester="mcp_client",
                enable_semgrep=use_semgrep,
                enable_llm=use_llm,
                enable_validation=use_validation,
                severity_threshold=severity_threshold,
            )

        # Persist scan result automatically
        try:
            output_format_enum = OutputFormat.from_string(output_format)
            file_path = await self._persistence_service.persist_scan_result(
                result, output_format_enum
            )
            logger.info(f"Scan result persisted to {file_path}")
        except Exception as e:
            logger.warning(f"Failed to persist scan result: {e}")
            # Don't fail the scan if persistence fails

        formatted_result = self._format_scan_result(result)

        # Add persistence info to the response
        formatted_result["persistence"] = {
            "output_format": output_format,
            "file_path": file_path if "file_path" in locals() else None,
            "persisted": "file_path" in locals(),
        }

        # Log successful completion with key metrics
        threat_count = (
            len(result.threat_matches) if hasattr(result, "threat_matches") else 0
        )
        scan_duration = (
            getattr(result.metadata, "scan_duration_seconds", 0)
            if hasattr(result, "metadata")
            else 0
        )
        code_length = len(content) if content else 0
        logger.info(
            f"[+] MCP Tool Completed: {name} | Threats: {threat_count} | Code: {code_length} chars | Duration: {scan_duration:.2f}s"
        )

        return [
            types.TextContent(
                type="text",
                text=json.dumps(formatted_result, indent=2, default=str),
            )
        ]

    except (ValidationError, SecurityError, ConfigurationError) as e:
        logger.error(f"Code scan failed: {e}")
        raise CleanAdversaryToolError(f"Scan failed: {str(e)}")
    except Exception as e:
        logger.error(f"Unexpected error in code scan: {e}")
        logger.error(traceback.format_exc())
        raise CleanAdversaryToolError(f"Internal error: {str(e)}")
```
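Note that persistence failures are deliberately non-fatal: the scan result is still returned, and the response's persistence block simply records whether a file was written. Based on the handler above, a successful response would carry a fragment like this (the directory is illustrative; the .adversary.json name follows the convention described under Instructions):

```json
"persistence": {
  "output_format": "json",
  "file_path": "/home/user/project/.adversary.json",
  "persisted": true
}
```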
  • Input schema and Tool definition for 'adv_scan_code' returned by get_tools(), defining parameters like content, language, scanner options, and output format.
```python
Tool(
    name="adv_scan_code",
    description=(
        "Scan code content for security vulnerabilities using Clean Architecture. "
        "Automatically uses session-aware analysis with project context when available."
    ),
    inputSchema={
        "type": "object",
        "properties": {
            "content": {
                "type": "string",
                "description": "Source code to analyze",
            },
            "language": {
                "type": "string",
                "description": "Programming language of the code",
            },
            "use_semgrep": {
                "type": "boolean",
                "description": "Enable Semgrep analysis",
                "default": True,
            },
            "use_llm": {
                "type": "boolean",
                "description": "Enable LLM analysis",
                "default": True,
            },
            "use_validation": {
                "type": "boolean",
                "description": "Enable LLM validation",
                "default": False,
            },
            "severity_threshold": {
                "type": "string",
                "description": "Minimum severity level",
                "default": "medium",
            },
            "output_format": {
                "type": "string",
                "description": "Output format for persisted scan results",
                "enum": ["json", "md", "markdown", "csv"],
                "default": "json",
            },
        },
        "required": ["content", "language"],
    },
),
```
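To see the schema in action, here is a minimal sketch of calling the tool from a Python MCP client over stdio. It assumes the server can be launched with an adversary-mcp-server command; adjust the command (and the sample arguments) to match your installation.

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Assumed launch command; replace with however you start the server.
    server_params = StdioServerParameters(command="adversary-mcp-server")

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Call the tool with the two required fields plus common options.
            result = await session.call_tool(
                "adv_scan_code",
                arguments={
                    "content": "eval(input('expr: '))",
                    "language": "python",
                    "severity_threshold": "medium",
                    "output_format": "json",
                },
            )
            # This handler returns TextContent items containing JSON.
            for item in result.content:
                print(item.text)


asyncio.run(main())
```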
  • Registration of the tool dispatcher in _register_tools() that routes 'adv_scan_code' calls to the _handle_scan_code handler function.
```python
@self.server.call_tool()
async def tool_dispatcher(
    name: str, arguments: dict
) -> list[types.TextContent]:
    """Dispatch MCP tool calls to the appropriate handler."""
    if name == "adv_scan_file":
        return await self._handle_scan_file(name, arguments)
    elif name == "adv_scan_folder":
        return await self._handle_scan_folder(name, arguments)
    elif name == "adv_scan_code":
        return await self._handle_scan_code(name, arguments)
    elif name == "adv_get_status":
        return await self._handle_get_status(name, arguments)
    elif name == "adv_get_version":
        return await self._handle_get_version(name, arguments)
    elif name == "adv_mark_false_positive":
        return await self._handle_mark_false_positive(name, arguments)
    elif name == "adv_unmark_false_positive":
        return await self._handle_unmark_false_positive(name, arguments)
    else:
        raise ValueError(f"Unknown tool: {name}")
```
  • Application service method scan_code() called by the handler for standard (non-session) code scanning. Creates domain ScanRequest from parameters and delegates to domain ScanOrchestrator.execute_scan() for actual scanning.
```python
async def scan_code(
    self,
    code_content: str,
    language: str,
    *,
    requester: str = "application",
    enable_semgrep: bool = True,
    enable_llm: bool = True,
    enable_validation: bool = False,
    severity_threshold: str | None = None,
) -> ScanResult:
    """
    Scan code content for security vulnerabilities.

    Args:
        code_content: Source code to analyze
        language: Programming language of the code
        requester: Who requested the scan
        enable_semgrep: Whether to enable Semgrep scanning
        enable_llm: Whether to enable LLM analysis
        enable_validation: Whether to enable LLM validation
        severity_threshold: Minimum severity level to include

    Returns:
        ScanResult containing found threats and metadata
    """
    # Create domain objects
    metadata = ScanMetadata(
        scan_id=str(uuid.uuid4()),
        scan_type="code",
        timestamp=datetime.now(UTC),
        requester=requester,
        language=language,
        enable_semgrep=enable_semgrep,
        enable_llm=enable_llm,
        enable_validation=enable_validation,
    )

    # Use a virtual file path for code analysis
    virtual_path = FilePath.from_string(
        f"/virtual/code.{self._get_extension_for_language(language)}"
    )

    context = ScanContext(
        target_path=virtual_path,
        metadata=metadata,
        content=code_content,
        language=language,
    )

    severity_level = (
        SeverityLevel.from_string(severity_threshold)
        if severity_threshold
        else None
    )

    request = ScanRequest(
        context=context,
        enable_semgrep=enable_semgrep,
        enable_llm=enable_llm,
        enable_validation=enable_validation,
        severity_threshold=severity_level,
    )

    # Validate and execute
    self._validation_service.validate_scan_request(request)
    self._validation_service.enforce_security_constraints(context)

    result = await self._scan_orchestrator.execute_scan(request)
    return result
```
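The _get_extension_for_language helper used to build the virtual path is not reproduced on this page. A plausible sketch of the mapping it performs is below; this is a hypothetical reconstruction, and the project's actual table may cover more languages or spell extensions differently.

```python
def _get_extension_for_language(self, language: str) -> str:
    """Map a language name to a file extension for the virtual path.

    Hypothetical sketch; the real mapping in the project may differ.
    """
    extensions = {
        "python": "py",
        "javascript": "js",
        "typescript": "ts",
        "java": "java",
        "go": "go",
        "ruby": "rb",
        "php": "php",
        "c": "c",
        "cpp": "cpp",
    }
    # Fall back to a generic extension when the language is unrecognized.
    return extensions.get(language.lower(), "txt")
```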

MCP directory API

We provide all the information about MCP servers via our MCP API.

```
curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'
```

If you have feedback or need assistance with the MCP directory API, please join our Discord server.