scan_content
Scan text content for AVE vulnerabilities, including toxic flow combinations. Get findings with severity, OWASP categories, and remediation guidance.
Instructions
Scan raw text content for AVE security vulnerabilities.
Use this to check skill file content, system prompts, MCP tool descriptions, or any agentic AI component before using it.
Returns findings with AVE IDs, severity, OWASP MCP categories, and links to full remediation guidance. Also detects toxic flows where two findings combine into a complete attack chain.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| content | Yes | The text content to scan (skill file, system prompt, etc.) | |
| label | No | Optional label for the content in the output (default: submitted-content) | submitted-content |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes |
Implementation Reference
- bawbel_mcp/server.py:194-230 (handler)The scan_content function is the primary handler for the scan_content tool. It accepts raw text content, writes it to a temporary file, runs the 'bawbel scan' CLI command on it via _run_bawbel, and formats the result using _format_scan_result.
def scan_content( content: str, label: str = "submitted-content", ) -> str: """ Scan raw text content for AVE security vulnerabilities. Use this to check skill file content, system prompts, MCP tool descriptions, or any agentic AI component before using it. Returns findings with AVE IDs, severity, OWASP MCP categories, and links to full remediation guidance. Also detects toxic flows where two findings combine into a complete attack chain. Args: content: The text content to scan (skill file, system prompt, etc.) label: Optional label for the content in the output (default: submitted-content) """ if not content or not content.strip(): return "Error: content is empty" if len(content.encode("utf-8")) > MAX_CONTENT_BYTES: return f"Error: content exceeds {MAX_CONTENT_BYTES // 1024}KB limit" with tempfile.NamedTemporaryFile( mode="w", suffix=".md", prefix="bawbel_mcp_", delete=False, encoding="utf-8" ) as f: f.write(content) tmp_path = f.name try: result = _run_bawbel(["scan"], input_file=tmp_path) finally: Path(tmp_path).unlink(missing_ok=True) return _format_scan_result(result) - bawbel_mcp/server.py:193-193 (registration)The tool is registered via the @mcp.tool() decorator from FastMCP, which automatically registers scan_content as an MCP tool.
@mcp.tool() - bawbel_mcp/server.py:54-102 (helper)The _run_bawbel helper function executes the 'bawbel' CLI with arguments and parses JSON output. scan_content calls it with ['scan'] and the temp file path.
def _run_bawbel(args: list[str], input_file: Optional[str] = None) -> dict: """ Run bawbel CLI and return parsed JSON output. Returns error dict on failure. """ cmd = ["bawbel"] + args + ["--format", "json"] if input_file: cmd.append(input_file) try: result = subprocess.run( # nosec B603 # noqa: S603 cmd, capture_output=True, text=True, timeout=60, ) raw = result.stdout.strip() if not raw: return { "error": result.stderr.strip() or "Scanner produced no output", "findings": [], "toxic_flows": [], "risk_score": 0, } # JSON output is a list of file results start = raw.find("[") if start < 0: return {"error": raw[:300], "findings": [], "toxic_flows": [], "risk_score": 0} results = json.loads(raw[start:]) if results: return results[0] return {"findings": [], "toxic_flows": [], "risk_score": 0} except subprocess.TimeoutExpired: return {"error": "Scan timeout (60s)", "findings": [], "toxic_flows": [], "risk_score": 0} except json.JSONDecodeError as e: return {"error": f"Parse error: {e}", "findings": [], "toxic_flows": [], "risk_score": 0} except FileNotFoundError: return { "error": ( "bawbel CLI not found. " "Install with: pip install bawbel-scanner" ), "findings": [], "toxic_flows": [], "risk_score": 0, } - bawbel_mcp/server.py:134-187 (helper)The _format_scan_result helper formats the scan result dict into a human-readable string for the MCP response. scan_content calls this to return results to the user.
def _format_scan_result(result: dict) -> str: """Format scan result for human-readable MCP response.""" if result.get("error"): return f"Error: {result['error']}" findings = result.get("findings", []) toxic_flows = result.get("toxic_flows", []) risk_score = result.get("risk_score", 0) max_severity = result.get("max_severity", "NONE") lines = [] if not findings and not toxic_flows: lines.append("Clean: no findings detected.") lines.append(f"Risk score: {risk_score:.1f}/10") return "\n".join(lines) lines.append(f"Risk score: {risk_score:.1f}/10 ({max_severity})") lines.append(f"Findings: {len(findings)} Toxic flows: {len(toxic_flows)}") lines.append("") if findings: lines.append("FINDINGS") lines.append("-" * 50) for f in findings: sev = f.get("severity", "?") ave_id = f.get("ave_id", "") title = f.get("title", "") line_no = f.get("line_number") owasp_mcp = ", ".join(f.get("owasp_mcp", [])) lines.append(f"[{sev}] {ave_id} {title}") if line_no: lines.append(f" Line {line_no}") if owasp_mcp: lines.append(f" OWASP MCP: {owasp_mcp}") lines.append( f" Details: {PIRANHA_API}/records/{ave_id}" ) lines.append("") if toxic_flows: lines.append("TOXIC FLOWS DETECTED") lines.append("-" * 50) for flow in toxic_flows: title = flow.get("title", "") cvss = flow.get("cvss_ai", 0) caps = " + ".join(flow.get("capabilities", [])) ave_ids = ", ".join(flow.get("ave_ids", [])) lines.append(f"⛓ CRITICAL {cvss} {title}") lines.append(f" Chain: {caps}") lines.append(f" AVEs: {ave_ids}") lines.append("") return "\n".join(lines)