Skip to main content
Glama

adv_scan_code

Analyze source code for security vulnerabilities using Clean Architecture. Detects issues with session-aware analysis and project context integration.

Instructions

Scan code content for security vulnerabilities using Clean Architecture. Automatically uses session-aware analysis with project context when available.

Input Schema

Table / JSON Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| content | Yes | Source code to analyze | — |
| language | Yes | Programming language of the code | — |
| use_semgrep | No | Enable Semgrep analysis | true |
| use_llm | No | Enable LLM analysis | true |
| use_validation | No | Enable LLM validation | false |
| severity_threshold | No | Minimum severity level | medium |
| output_format | No | Output format for persisted scan results | json |

Implementation Reference

  • Primary MCP tool handler for 'adv_scan_code'. Validates inputs, chooses between session-aware or standard scanning, calls ScanApplicationService.scan_code or session analysis, persists results, formats JSON response with threats and metadata.
    async def _handle_scan_code(
        self, name: str, arguments: dict
    ) -> list[types.TextContent]:
        """Handle code scanning requests for the 'adv_scan_code' MCP tool.

        Validates the incoming arguments, then runs either a session-aware
        analysis (when a session manager is available and LLM analysis or
        validation is enabled) or a standard code scan. The result is
        persisted best-effort and returned as a JSON-formatted text block.

        Args:
            name: The MCP tool name being invoked (used for logging).
            arguments: Raw tool arguments; must contain 'content' and
                'language', may contain scanner toggles, a severity
                threshold, and an output format.

        Returns:
            A single-element list with the JSON-serialized scan result.

        Raises:
            CleanAdversaryToolError: On missing parameters, validation or
                security failures, or any unexpected internal error.
        """
        try:
            # Log MCP tool invocation at INFO level for visibility.
            logger.info(f"MCP Tool Invoked: {name}")
            logger.info(f"Parameters: {arguments}")

            # Comprehensive input validation before touching any argument.
            validated_args = self._input_validator.validate_mcp_arguments(
                arguments, tool_name="adv_scan_code"
            )

            content = validated_args.get("content", "")
            language = validated_args.get("language", "")

            if not content:
                raise CleanAdversaryToolError("Content parameter is required")
            if not language:
                raise CleanAdversaryToolError("Language parameter is required")

            use_semgrep = validated_args.get("use_semgrep", True)
            # Default to true for code analysis.
            use_llm = validated_args.get("use_llm", True)
            use_validation = validated_args.get("use_validation", False)
            severity_threshold = validated_args.get("severity_threshold", "medium")
            output_format = validated_args.get("output_format", "json")

            # Auto-detect project context from the current working directory.
            project_path = Path.cwd()

            # Try to use session-aware analysis when available.
            if self._session_manager and (use_llm or use_validation):
                # Auto-warm cache if this looks like a new project.
                if not self._session_manager.session_cache.get_cached_project_context(
                    project_path
                ):
                    self._session_manager.warm_project_cache(project_path)

                # Session-aware code analysis with auto-detected project context.
                result = await self._handle_session_code_analysis(
                    content=content,
                    language=language,
                    project_path=str(project_path),
                    use_semgrep=use_semgrep,
                    use_llm=use_llm,
                    use_validation=use_validation,
                    severity_threshold=severity_threshold,
                    output_format=output_format,
                )
            else:
                # Fall back to the standard code scan.
                result = await self._scan_service.scan_code(
                    code_content=content,
                    language=language,
                    requester="mcp_client",
                    enable_semgrep=use_semgrep,
                    enable_llm=use_llm,
                    enable_validation=use_validation,
                    severity_threshold=severity_threshold,
                )

            # Persist the scan result automatically (best-effort: a
            # persistence failure must not fail the scan itself).
            # FIX: track the persisted path in an explicitly initialized
            # variable instead of the original's fragile
            # `"file_path" in locals()` introspection.
            persisted_path = None
            try:
                output_format_enum = OutputFormat.from_string(output_format)
                persisted_path = await self._persistence_service.persist_scan_result(
                    result, output_format_enum
                )
                logger.info(f"Scan result persisted to {persisted_path}")
            except Exception as e:
                logger.warning(f"Failed to persist scan result: {e}")

            formatted_result = self._format_scan_result(result)

            # Add persistence info to the response.
            formatted_result["persistence"] = {
                "output_format": output_format,
                "file_path": persisted_path,
                "persisted": persisted_path is not None,
            }

            # Log successful completion with key metrics. The hasattr guards
            # tolerate result objects from either scan path.
            threat_count = (
                len(result.threat_matches)
                if hasattr(result, "threat_matches")
                else 0
            )
            scan_duration = (
                getattr(result.metadata, "scan_duration_seconds", 0)
                if hasattr(result, "metadata")
                else 0
            )
            code_length = len(content) if content else 0
            logger.info(
                f"[+] MCP Tool Completed: {name} | Threats: {threat_count} | Code: {code_length} chars | Duration: {scan_duration:.2f}s"
            )

            return [
                types.TextContent(
                    type="text",
                    text=json.dumps(formatted_result, indent=2, default=str),
                )
            ]
        except (ValidationError, SecurityError, ConfigurationError) as e:
            logger.error(f"Code scan failed: {e}")
            raise CleanAdversaryToolError(f"Scan failed: {str(e)}")
        except Exception as e:
            logger.error(f"Unexpected error in code scan: {e}")
            logger.error(traceback.format_exc())
            raise CleanAdversaryToolError(f"Internal error: {str(e)}")
  • Tool dispatcher registration via @server.call_tool() decorator. Routes 'adv_scan_code' calls to the _handle_scan_code handler.
    @self.server.call_tool()
    async def tool_dispatcher(
        name: str, arguments: dict
    ) -> list[types.TextContent]:
        """Route an incoming MCP tool call to its registered handler.

        Raises:
            ValueError: If the tool name is not recognized.
        """
        # Table-driven dispatch: tool name -> bound handler coroutine.
        handlers = {
            "adv_scan_file": self._handle_scan_file,
            "adv_scan_folder": self._handle_scan_folder,
            "adv_scan_code": self._handle_scan_code,
            "adv_get_status": self._handle_get_status,
            "adv_get_version": self._handle_get_version,
            "adv_mark_false_positive": self._handle_mark_false_positive,
            "adv_unmark_false_positive": self._handle_unmark_false_positive,
        }
        handler = handlers.get(name)
        if handler is None:
            raise ValueError(f"Unknown tool: {name}")
        return await handler(name, arguments)
  • Input schema definition for the 'adv_scan_code' tool, including required 'content' and 'language' parameters, optional scanner toggles, severity threshold, and output format.
    # Schema for the 'adv_scan_code' MCP tool: requires the code and its
    # language; scanner toggles, severity threshold, and persisted-output
    # format are optional with the defaults shown below.
    Tool(
        name="adv_scan_code",
        description="Scan code content for security vulnerabilities using Clean Architecture. Automatically uses session-aware analysis with project context when available.",
        inputSchema={
            "type": "object",
            "properties": {
                "content": {
                    "type": "string",
                    "description": "Source code to analyze",
                },
                "language": {
                    "type": "string",
                    "description": "Programming language of the code",
                },
                # Static (Semgrep) and LLM analysis default on; LLM
                # validation defaults off.
                "use_semgrep": {
                    "type": "boolean",
                    "description": "Enable Semgrep analysis",
                    "default": True,
                },
                "use_llm": {
                    "type": "boolean",
                    "description": "Enable LLM analysis",
                    "default": True,
                },
                "use_validation": {
                    "type": "boolean",
                    "description": "Enable LLM validation",
                    "default": False,
                },
                "severity_threshold": {
                    "type": "string",
                    "description": "Minimum severity level",
                    "default": "medium",
                },
                "output_format": {
                    "type": "string",
                    "description": "Output format for persisted scan results",
                    "enum": ["json", "md", "markdown", "csv"],
                    "default": "json",
                },
            },
            "required": ["content", "language"],
        },
    ),
  • Core application service method implementing code scanning logic. Creates domain ScanRequest from parameters, validates, and orchestrates scan via domain ScanOrchestrator.
    async def scan_code(
        self,
        code_content: str,
        language: str,
        *,
        requester: str = "application",
        enable_semgrep: bool = True,
        enable_llm: bool = True,
        enable_validation: bool = False,
        severity_threshold: str | None = None,
    ) -> ScanResult:
        """
        Scan code content for security vulnerabilities.

        Args:
            code_content: Source code to analyze
            language: Programming language of the code
            requester: Who requested the scan
            enable_semgrep: Whether to enable Semgrep scanning
            enable_llm: Whether to enable LLM analysis
            enable_validation: Whether to enable LLM validation
            severity_threshold: Minimum severity level to include

        Returns:
            ScanResult containing found threats and metadata
        """
        # Parse the optional severity filter up front.
        min_severity = None
        if severity_threshold:
            min_severity = SeverityLevel.from_string(severity_threshold)

        # Domain metadata describing this scan request.
        scan_metadata = ScanMetadata(
            scan_id=str(uuid.uuid4()),
            scan_type="code",
            timestamp=datetime.now(UTC),
            requester=requester,
            language=language,
            enable_semgrep=enable_semgrep,
            enable_llm=enable_llm,
            enable_validation=enable_validation,
        )

        # Raw code has no on-disk location, so a virtual path with a
        # language-appropriate extension stands in for the target file.
        extension = self._get_extension_for_language(language)
        scan_context = ScanContext(
            target_path=FilePath.from_string(f"/virtual/code.{extension}"),
            metadata=scan_metadata,
            content=code_content,
            language=language,
        )

        scan_request = ScanRequest(
            context=scan_context,
            enable_semgrep=enable_semgrep,
            enable_llm=enable_llm,
            enable_validation=enable_validation,
            severity_threshold=min_severity,
        )

        # Validate, enforce security constraints, then delegate execution
        # to the domain scan orchestrator.
        self._validation_service.validate_scan_request(scan_request)
        self._validation_service.enforce_security_constraints(scan_context)
        return await self._scan_orchestrator.execute_scan(scan_request)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server