Skip to main content
Glama

adv_scan_code

Analyze source code for security vulnerabilities using Clean Architecture. Detects issues with session-aware analysis and project context integration.

Instructions

Scan code content for security vulnerabilities using Clean Architecture. Automatically uses session-aware analysis with project context when available.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
contentYesSource code to analyze
languageYesProgramming language of the code
use_semgrepNoEnable Semgrep analysis
use_llmNoEnable LLM analysis
use_validationNoEnable LLM validation
severity_thresholdNoMinimum severity levelmedium
output_formatNoOutput format for persisted scan resultsjson

Implementation Reference

  • Primary MCP tool handler for 'adv_scan_code'. Validates inputs, chooses between session-aware or standard scanning, calls ScanApplicationService.scan_code or session analysis, persists results, formats JSON response with threats and metadata.
    async def _handle_scan_code(
        self, name: str, arguments: dict
    ) -> list[types.TextContent]:
        """Handle code scanning requests."""
        try:
            # Log MCP tool invocation at INFO level for visibility
            logger.info(f"MCP Tool Invoked: {name}")
            logger.info(f"Parameters: {arguments}")
    
            # Comprehensive input validation
            validated_args = self._input_validator.validate_mcp_arguments(
                arguments, tool_name="adv_scan_code"
            )
    
            content = validated_args.get("content", "")
            language = validated_args.get("language", "")
    
            if not content:
                raise CleanAdversaryToolError("Content parameter is required")
            if not language:
                raise CleanAdversaryToolError("Language parameter is required")
    
            use_semgrep = validated_args.get("use_semgrep", True)
            use_llm = validated_args.get(
                "use_llm", True
            )  # Default to true for code analysis
            use_validation = validated_args.get("use_validation", False)
            severity_threshold = validated_args.get("severity_threshold", "medium")
            output_format = validated_args.get("output_format", "json")
    
            # Auto-detect project context from current working directory
            project_path = Path.cwd()
    
            # Try to use session-aware analysis when available
            if self._session_manager and (use_llm or use_validation):
                # Auto-warm cache if this looks like a new project
                if not self._session_manager.session_cache.get_cached_project_context(
                    project_path
                ):
                    self._session_manager.warm_project_cache(project_path)
    
                # Use session-aware code analysis with auto-detected project context
                result = await self._handle_session_code_analysis(
                    content=content,
                    language=language,
                    project_path=str(project_path),
                    use_semgrep=use_semgrep,
                    use_llm=use_llm,
                    use_validation=use_validation,
                    severity_threshold=severity_threshold,
                    output_format=output_format,
                )
            else:
                # Fall back to standard code scan
                result = await self._scan_service.scan_code(
                    code_content=content,
                    language=language,
                    requester="mcp_client",
                    enable_semgrep=use_semgrep,
                    enable_llm=use_llm,
                    enable_validation=use_validation,
                    severity_threshold=severity_threshold,
                )
    
            # Persist scan result automatically
            try:
                output_format_enum = OutputFormat.from_string(output_format)
                file_path = await self._persistence_service.persist_scan_result(
                    result, output_format_enum
                )
                logger.info(f"Scan result persisted to {file_path}")
            except Exception as e:
                logger.warning(f"Failed to persist scan result: {e}")
                # Don't fail the scan if persistence fails
    
            formatted_result = self._format_scan_result(result)
    
            # Add persistence info to the response
            formatted_result["persistence"] = {
                "output_format": output_format,
                "file_path": file_path if "file_path" in locals() else None,
                "persisted": "file_path" in locals(),
            }
    
            # Log successful completion with key metrics
            threat_count = (
                len(result.threat_matches) if hasattr(result, "threat_matches") else 0
            )
            scan_duration = (
                getattr(result.metadata, "scan_duration_seconds", 0)
                if hasattr(result, "metadata")
                else 0
            )
            code_length = len(content) if content else 0
            logger.info(
                f"[+] MCP Tool Completed: {name} | Threats: {threat_count} | Code: {code_length} chars | Duration: {scan_duration:.2f}s"
            )
    
            return [
                types.TextContent(
                    type="text",
                    text=json.dumps(formatted_result, indent=2, default=str),
                )
            ]
    
        except (ValidationError, SecurityError, ConfigurationError) as e:
            logger.error(f"Code scan failed: {e}")
            raise CleanAdversaryToolError(f"Scan failed: {str(e)}")
        except Exception as e:
            logger.error(f"Unexpected error in code scan: {e}")
            logger.error(traceback.format_exc())
            raise CleanAdversaryToolError(f"Internal error: {str(e)}")
  • Tool dispatcher registration via @server.call_tool() decorator. Routes 'adv_scan_code' calls to the _handle_scan_code handler.
    @self.server.call_tool()
    async def tool_dispatcher(
        name: str, arguments: dict
    ) -> list[types.TextContent]:
        """Dispatch MCP tool calls to the appropriate handler."""
        if name == "adv_scan_file":
            return await self._handle_scan_file(name, arguments)
        elif name == "adv_scan_folder":
            return await self._handle_scan_folder(name, arguments)
        elif name == "adv_scan_code":
            return await self._handle_scan_code(name, arguments)
        elif name == "adv_get_status":
            return await self._handle_get_status(name, arguments)
        elif name == "adv_get_version":
            return await self._handle_get_version(name, arguments)
        elif name == "adv_mark_false_positive":
            return await self._handle_mark_false_positive(name, arguments)
        elif name == "adv_unmark_false_positive":
            return await self._handle_unmark_false_positive(name, arguments)
        else:
            raise ValueError(f"Unknown tool: {name}")
  • Input schema definition for the 'adv_scan_code' tool, including required 'content' and 'language' parameters, optional scanner toggles, severity threshold, and output format.
    Tool(
        name="adv_scan_code",
        description="Scan code content for security vulnerabilities using Clean Architecture. Automatically uses session-aware analysis with project context when available.",
        inputSchema={
            "type": "object",
            "properties": {
                "content": {
                    "type": "string",
                    "description": "Source code to analyze",
                },
                "language": {
                    "type": "string",
                    "description": "Programming language of the code",
                },
                "use_semgrep": {
                    "type": "boolean",
                    "description": "Enable Semgrep analysis",
                    "default": True,
                },
                "use_llm": {
                    "type": "boolean",
                    "description": "Enable LLM analysis",
                    "default": True,
                },
                "use_validation": {
                    "type": "boolean",
                    "description": "Enable LLM validation",
                    "default": False,
                },
                "severity_threshold": {
                    "type": "string",
                    "description": "Minimum severity level",
                    "default": "medium",
                },
                "output_format": {
                    "type": "string",
                    "description": "Output format for persisted scan results",
                    "enum": ["json", "md", "markdown", "csv"],
                    "default": "json",
                },
            },
            "required": ["content", "language"],
        },
    ),
  • Core application service method implementing code scanning logic. Creates domain ScanRequest from parameters, validates, and orchestrates scan via domain ScanOrchestrator.
    async def scan_code(
        self,
        code_content: str,
        language: str,
        *,
        requester: str = "application",
        enable_semgrep: bool = True,
        enable_llm: bool = True,
        enable_validation: bool = False,
        severity_threshold: str | None = None,
    ) -> ScanResult:
        """
        Scan code content for security vulnerabilities.
    
        Args:
            code_content: Source code to analyze
            language: Programming language of the code
            requester: Who requested the scan
            enable_semgrep: Whether to enable Semgrep scanning
            enable_llm: Whether to enable LLM analysis
            enable_validation: Whether to enable LLM validation
            severity_threshold: Minimum severity level to include
    
        Returns:
            ScanResult containing found threats and metadata
        """
        # Create domain objects
        metadata = ScanMetadata(
            scan_id=str(uuid.uuid4()),
            scan_type="code",
            timestamp=datetime.now(UTC),
            requester=requester,
            language=language,
            enable_semgrep=enable_semgrep,
            enable_llm=enable_llm,
            enable_validation=enable_validation,
        )
    
        # Use a virtual file path for code analysis
        virtual_path = FilePath.from_string(
            f"/virtual/code.{self._get_extension_for_language(language)}"
        )
    
        context = ScanContext(
            target_path=virtual_path,
            metadata=metadata,
            content=code_content,
            language=language,
        )
    
        severity_level = (
            SeverityLevel.from_string(severity_threshold)
            if severity_threshold
            else None
        )
    
        request = ScanRequest(
            context=context,
            enable_semgrep=enable_semgrep,
            enable_llm=enable_llm,
            enable_validation=enable_validation,
            severity_threshold=severity_level,
        )
    
        # Validate and execute
        self._validation_service.validate_scan_request(request)
        self._validation_service.enforce_security_constraints(context)
    
        result = await self._scan_orchestrator.execute_scan(request)
    
        return result

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server