Skip to main content
Glama

MCPPentestBOT

by kannanprabu
llm_client.py • 28.3 kB
""" LLM Client with Enhanced Security Analysis - Complete Implementation Azure OpenAI + Claude (Anthropic) + GitHub Copilot Support """ import os import json import asyncio from typing import Optional, List, Dict, Any import sys from pathlib import Path from datetime import datetime # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent)) # Import MCP server components directly from mcp.protocol import MCPProtocolHandler from mcp.types import MCPMethod, MCPTool from tools import ToolRegistry # Try to import from existing config.py file try: from config import AZURE_CONFIG, CLAUDE_CONFIG, COPILOT_CONFIG, LLM_PROVIDER print("βœ… Loaded configuration from config.py") CONFIG_FILE_FOUND = True except ImportError: print("⚠️ config.py not found, will use environment variables") CONFIG_FILE_FOUND = False AZURE_CONFIG = {} CLAUDE_CONFIG = {} COPILOT_CONFIG = {} LLM_PROVIDER = "azure" class DirectMCPClient: """MCP Client that runs server directly (no subprocess)""" def __init__(self): """Initialize direct MCP client""" self.protocol = MCPProtocolHandler("MCPPentestBOT", "1.0.0") self.tool_registry = ToolRegistry() self.tools_cache = None async def initialize(self) -> bool: """Initialize MCP connection""" print("πŸ”§ Initializing MCP tools...") # Load all tools tool_count = await self.tool_registry.load_all_tools() print(f"βœ… Loaded {tool_count} security tools") return True def list_tools(self) -> List[Dict[str, Any]]: """List available tools""" if self.tools_cache: return self.tools_cache tools = self.tool_registry.get_tool_definitions() self.tools_cache = tools return tools async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> tuple[Optional[str], bool]: """Call a tool directly""" try: print(f"πŸ”§ Executing: {tool_name}({arguments})") # Execute tool directly result = await self.tool_registry.execute_tool(tool_name, arguments) if result and len(result) > 0: # Extract text from content text_parts = [] is_error = False for item in 
result: if item.get("type") == "text": text = item.get("text", "") text_parts.append(text) # Check for error indicators if "Error:" in text or "Failed:" in text or "❌" in text: is_error = True result_text = "\n".join(text_parts) if text_parts else "No result" if is_error: print("⚠️ Tool execution had errors") else: print("βœ… Tool execution successful") return result_text, is_error else: return "No result returned from tool", True except Exception as e: error_msg = f"Tool execution failed: {str(e)}" print(f"❌ {error_msg}") return error_msg, True def get_tool_schemas_for_llm(self, provider: str = "azure") -> List[Dict[str, Any]]: """Convert MCP tools to LLM-specific function schemas""" tools = self.list_tools() if provider == "azure" or provider == "copilot": # Azure OpenAI / Copilot format return [{ "type": "function", "function": { "name": tool.name, "description": tool.description, "parameters": tool.inputSchema } } for tool in tools] elif provider == "claude": # Claude (Anthropic) format return [{ "name": tool.name, "description": tool.description, "input_schema": tool.inputSchema } for tool in tools] return [] class SecurityAnalysisClient: """Enhanced LLM client with detailed security analysis and reporting""" def __init__(self, provider: str = None, config: Dict[str, Any] = None): """Initialize security analysis client""" # Use provider from config.py if not specified if provider is None and CONFIG_FILE_FOUND: provider = LLM_PROVIDER print(f"πŸ“ Using provider from config.py: {provider}") self.provider = (provider or "azure").lower() # Use config from config.py if not specified if config is None and CONFIG_FILE_FOUND: if self.provider == "azure": self.config = AZURE_CONFIG elif self.provider == "claude": self.config = CLAUDE_CONFIG elif self.provider == "copilot": self.config = COPILOT_CONFIG else: self.config = {} print(f"πŸ“ Using configuration from config.py") else: self.config = config or {} self.mcp_client = DirectMCPClient() self.conversation_history = 
[] self.llm_client = None self.scan_results = [] # Store scan results for reporting # Initialize provider self._init_provider() def _init_provider(self): """Initialize the selected LLM provider""" print(f"πŸ€– Initializing {self.provider.upper()} provider...") if self.provider == "azure": self._init_azure() elif self.provider == "claude": self._init_claude() elif self.provider == "copilot": self._init_copilot() else: raise ValueError(f"Unsupported provider: {self.provider}") def _init_azure(self): """Initialize Azure OpenAI""" try: from openai import AzureOpenAI self.llm_client = AzureOpenAI( api_key=self.config.get("api_key") or os.getenv("AZURE_OPENAI_API_KEY"), api_version=self.config.get("api_version", "2024-02-15-preview"), azure_endpoint=self.config.get("endpoint") or os.getenv("AZURE_OPENAI_ENDPOINT") ) self.deployment_name = self.config.get("deployment_name") or os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4") print(f"βœ… Azure OpenAI initialized (deployment: {self.deployment_name})") except ImportError: print("❌ Azure OpenAI library not installed. Run: pip install openai") raise except Exception as e: print(f"❌ Azure initialization failed: {e}") raise def _init_claude(self): """Initialize Claude (Anthropic)""" try: from anthropic import Anthropic api_key = self.config.get("api_key") or os.getenv("ANTHROPIC_API_KEY") if not api_key: raise ValueError("Claude API key not found in config.py or ANTHROPIC_API_KEY environment variable") self.llm_client = Anthropic(api_key=api_key) self.model_name = self.config.get("model", "claude-sonnet-4-20250514") print(f"βœ… Claude initialized (model: {self.model_name})") except ImportError: print("❌ Anthropic library not installed. 
Run: pip install anthropic") raise except Exception as e: print(f"❌ Claude initialization failed: {e}") raise def _init_copilot(self): """Initialize GitHub Copilot""" try: from openai import OpenAI self.llm_client = OpenAI( api_key=self.config.get("api_key") or os.getenv("GITHUB_TOKEN"), base_url=self.config.get("base_url", "https://api.githubcopilot.com") ) self.model_name = self.config.get("model", "gpt-4") print(f"βœ… Copilot initialized (model: {self.model_name})") except ImportError: print("❌ OpenAI library not installed. Run: pip install openai") raise except Exception as e: print(f"❌ Copilot initialization failed: {e}") raise def _get_enhanced_system_prompt(self) -> str: """Get enhanced system prompt for detailed security analysis""" return """You are MCPPentestBOT, an expert AI security analyst and penetration tester with deep knowledge of: - Network security and vulnerability assessment - SSL/TLS protocols and certificate validation - Web application security (OWASP Top 10) - Security headers and best practices - Risk assessment and threat modeling - Security compliance frameworks (PCI-DSS, HIPAA, ISO 27001) When analyzing security scan results, you MUST provide: 1. **EXECUTIVE SUMMARY** - Brief overview of findings - Overall security posture (Critical/High/Medium/Low) - Key recommendations 2. **DETAILED FINDINGS** For EACH finding, include: - **Vulnerability/Issue Name**: Clear, specific title - **Severity**: Critical/High/Medium/Low/Info - **Description**: What was found and why it matters - **Risk**: Potential impact if exploited - **Evidence**: Specific details from scan results - **Affected Component**: Port, service, URL, etc. 3. **RISK ASSESSMENT** - **Likelihood**: How easy is it to exploit? - **Impact**: What damage could occur? - **Overall Risk Score**: Combine likelihood Γ— impact 4. 
**REMEDIATION RECOMMENDATIONS** - **Immediate Actions**: Critical fixes needed now - **Short-term Fixes**: To implement within 30 days - **Long-term Improvements**: Strategic security enhancements - **Specific Steps**: How to fix each issue - **Testing Verification**: How to verify fixes work 5. **COMPLIANCE IMPACT** - Which standards are affected (PCI-DSS, HIPAA, etc.) - Compliance gaps identified - Audit implications 6. **ADDITIONAL CONTEXT** - Industry best practices - Related vulnerabilities to check - References (CVE, CWE, OWASP links) FORMATTING RULES: - Use clear headers (###) and bullet points - **Bold** important terms - Use emojis for severity: πŸ”΄ Critical, 🟠 High, 🟑 Medium, πŸ”΅ Low, βšͺ Info - Include specific evidence and examples - Provide actionable, not generic, recommendations - Explain technical concepts clearly for both technical and non-technical audiences Available security testing tools: - **http_headers**: Security headers validation - **nmap_scan**: Port scanning and service detection - **ssl_check**: SSL/TLS certificate analysis - **testssl_scan**: Comprehensive SSL/TLS security scan - **nikto_scan**: Web vulnerability scanning - **ping_sweep**: Network discovery - **port_check**: Port connectivity testing When a user requests a scan: 1. Execute appropriate tool(s) 2. Analyze results thoroughly 3. Provide comprehensive security report as described above 4. Suggest additional scans if needed IMPORTANT: Only scan systems the user owns or has explicit permission to test. Always remind users of this. 
""" async def start(self): """Start the security analysis client""" print() print("=" * 60) print("πŸ›‘οΈ MCPPentestBOT - Professional Security Analysis Platform") print("=" * 60) print() # Initialize MCP directly (no subprocess) if not await self.mcp_client.initialize(): raise RuntimeError("Failed to initialize MCP tools") # Load tools tools = self.mcp_client.list_tools() print() print(f"πŸ“¦ Available {len(tools)} security tools:") for tool in tools: print(f" β€’ {tool.name}") print() print("=" * 60) print("πŸ’¬ Ready for security analysis! Ask questions or request scans.") print() print(" πŸ“Š Commands:") print(" - 'Scan [target]' - Comprehensive security scan") print(" - 'Generate report for [target]' - Detailed security report") print(" - 'Assess risk for [finding]' - Risk analysis") print(" - 'help' - Show available commands") print(" - 'quit' - Exit") print("=" * 60) print() async def chat(self, user_message: str) -> str: """Send a message and get detailed security analysis""" if self.provider == "azure" or self.provider == "copilot": return await self._chat_azure(user_message) elif self.provider == "claude": return await self._chat_claude(user_message) async def _chat_azure(self, user_message: str) -> str: """Chat using Azure OpenAI with enhanced security analysis""" # Add user message to history self.conversation_history.append({ "role": "user", "content": user_message }) # Get tool schemas tools = self.mcp_client.get_tool_schemas_for_llm("azure") # Enhanced system message system_message = { "role": "system", "content": self._get_enhanced_system_prompt() } try: # Call LLM with enhanced prompting response = self.llm_client.chat.completions.create( model=self.deployment_name if self.provider == "azure" else self.model_name, messages=[system_message] + self.conversation_history, tools=tools, tool_choice="auto", temperature=0.7, max_tokens=4000 ) assistant_message = response.choices[0].message # Check if tool was called if assistant_message.tool_calls: # 
Execute tools and collect results tool_results = [] tool_outputs = [] for tool_call in assistant_message.tool_calls: tool_name = tool_call.function.name tool_args = json.loads(tool_call.function.arguments) print(f"\nπŸ”§ Executing: {tool_name}({tool_args})") # Call tool via direct MCP result, is_error = await self.mcp_client.call_tool(tool_name, tool_args) # Store for reporting tool_outputs.append({ "tool": tool_name, "args": tool_args, "result": result, "error": is_error, "timestamp": datetime.now().isoformat() }) tool_results.append({ "role": "tool", "tool_call_id": tool_call.id, "content": result }) # Store scan results self.scan_results.extend(tool_outputs) # Add assistant message with tool calls self.conversation_history.append({ "role": "assistant", "content": assistant_message.content, "tool_calls": [ { "id": tc.id, "type": "function", "function": { "name": tc.function.name, "arguments": tc.function.arguments } } for tc in assistant_message.tool_calls ] }) # Add tool results self.conversation_history.extend(tool_results) # Get final detailed analysis print("\nπŸ“Š Analyzing results...") final_response = self.llm_client.chat.completions.create( model=self.deployment_name if self.provider == "azure" else self.model_name, messages=[system_message] + self.conversation_history + [{ "role": "user", "content": """Please provide a COMPREHENSIVE SECURITY ANALYSIS following this structure: 1. EXECUTIVE SUMMARY (2-3 sentences) 2. DETAILED FINDINGS (for each vulnerability/issue found) 3. RISK ASSESSMENT (likelihood, impact, overall risk) 4. REMEDIATION RECOMMENDATIONS (immediate, short-term, long-term) 5. COMPLIANCE IMPACT (if applicable) 6. 
ADDITIONAL RECOMMENDATIONS Use clear formatting, severity indicators (πŸ”΄πŸŸ πŸŸ‘πŸ”΅βšͺ), and provide specific, actionable advice.""" }], temperature=0.7, max_tokens=4000 ) final_message = final_response.choices[0].message.content self.conversation_history.append({ "role": "assistant", "content": final_message }) return final_message else: # No tool call, just text response content = assistant_message.content self.conversation_history.append({ "role": "assistant", "content": content }) return content except Exception as e: error_msg = f"Error communicating with LLM: {str(e)}" print(f"❌ {error_msg}") return error_msg async def _chat_claude(self, user_message: str) -> str: """Chat using Claude with enhanced security analysis""" # Get tool schemas in Claude format tools = self.mcp_client.get_tool_schemas_for_llm("claude") # Build messages for Claude messages = self.conversation_history + [{ "role": "user", "content": user_message }] try: # Call Claude with tools response = self.llm_client.messages.create( model=self.model_name, max_tokens=4096, system=self._get_enhanced_system_prompt(), messages=messages, tools=tools ) # Process response assistant_message_content = [] tool_results = [] # Check if Claude wants to use tools for block in response.content: if block.type == "text": assistant_message_content.append({ "type": "text", "text": block.text }) elif block.type == "tool_use": # Execute tool tool_name = block.name tool_input = block.input print(f"\nπŸ”§ Executing: {tool_name}({tool_input})") # Call tool via MCP result, is_error = await self.mcp_client.call_tool(tool_name, tool_input) # Store for reporting self.scan_results.append({ "tool": tool_name, "args": tool_input, "result": result, "error": is_error, "timestamp": datetime.now().isoformat() }) # Add tool use to message assistant_message_content.append({ "type": "tool_use", "id": block.id, "name": tool_name, "input": tool_input }) # Prepare tool result tool_results.append({ "type": "tool_result", 
"tool_use_id": block.id, "content": result }) # Add assistant message to history self.conversation_history.append({ "role": "assistant", "content": assistant_message_content }) # If we used tools, get final analysis if tool_results: # Add tool results to conversation self.conversation_history.append({ "role": "user", "content": tool_results }) # Get final analysis print("\nπŸ“Š Analyzing results...") final_response = self.llm_client.messages.create( model=self.model_name, max_tokens=4096, system=self._get_enhanced_system_prompt(), messages=self.conversation_history + [{ "role": "user", "content": """Please provide a COMPREHENSIVE SECURITY ANALYSIS following this structure: 1. EXECUTIVE SUMMARY (2-3 sentences) 2. DETAILED FINDINGS (for each vulnerability/issue found) 3. RISK ASSESSMENT (likelihood, impact, overall risk) 4. REMEDIATION RECOMMENDATIONS (immediate, short-term, long-term) 5. COMPLIANCE IMPACT (if applicable) 6. ADDITIONAL RECOMMENDATIONS Use clear formatting, severity indicators (πŸ”΄πŸŸ πŸŸ‘πŸ”΅βšͺ), and provide specific, actionable advice.""" }] ) # Extract text from response final_text = "" for block in final_response.content: if block.type == "text": final_text += block.text # Add to history self.conversation_history.append({ "role": "assistant", "content": [{"type": "text", "text": final_text}] }) return final_text else: # No tools used, just return text response text_response = "" for block in response.content: if block.type == "text": text_response += block.text self.conversation_history.append({ "role": "user", "content": user_message }) return text_response except Exception as e: error_msg = f"Error communicating with Claude: {str(e)}" print(f"❌ {error_msg}") return error_msg async def generate_report(self, target: str = None) -> str: """Generate a comprehensive security report""" if not self.scan_results: return "No scan results available. Please run some security scans first." 
# Generate report using LLM report_prompt = f"""Generate a COMPREHENSIVE PROFESSIONAL SECURITY REPORT based on all scan results collected. Target: {target or 'Multiple targets'} Scan Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} Include: 1. **EXECUTIVE SUMMARY** 2. **SCOPE AND METHODOLOGY** 3. **DETAILED FINDINGS** (all vulnerabilities with severity, risk, evidence) 4. **RISK ASSESSMENT MATRIX** 5. **PRIORITIZED REMEDIATION ROADMAP** 6. **COMPLIANCE GAP ANALYSIS** 7. **CONCLUSIONS AND RECOMMENDATIONS** Format as a professional penetration testing report.""" return await self.chat(report_prompt) async def run_interactive(self): """Run interactive security analysis session""" await self.start() try: while True: # Get user input user_input = input("\nπŸ‘€ You: ").strip() if not user_input: continue if user_input.lower() in ['quit', 'exit', 'bye']: print("\nπŸ‘‹ Goodbye! Stay secure!") break if user_input.lower() == 'help': self._show_help() continue if user_input.lower().startswith('report'): # Generate comprehensive report print("\nπŸ“Š Generating comprehensive security report...") response = await self.generate_report() print("\n" + response) continue # Get response with detailed analysis print("\nπŸ€– Security Analyst: ", end="", flush=True) response = await self.chat(user_input) print(response) except KeyboardInterrupt: print("\n\nπŸ‘‹ Interrupted. 
Stay secure!") def _show_help(self): """Show available commands""" help_text = """ ╔═══════════════════════════════════════════════════════════╗ β•‘ MCPPentestBOT - Available Commands β•‘ β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β• πŸ“Š SCANNING COMMANDS: β€’ "Scan [target]" - Comprehensive security scan β€’ "Check SSL for [target]" - SSL/TLS analysis β€’ "Scan [target] for open ports" - Port scanning β€’ "Check security headers for [target]" - HTTP headers πŸ“‹ REPORTING COMMANDS: β€’ "report" - Generate comprehensive security report β€’ "summarize findings" - Quick summary of all findings β€’ "risk assessment for [target]" - Detailed risk analysis πŸ” ANALYSIS COMMANDS: β€’ "Explain [vulnerability]" - Get detailed explanation β€’ "How to fix [issue]" - Remediation guidance β€’ "What is the risk of [finding]?" - Risk assessment πŸ’‘ GENERAL: β€’ "help" - Show this help β€’ "quit" - Exit the program EXAMPLES: πŸ‘€ "Scan github.com comprehensively" πŸ‘€ "Generate a security report" πŸ‘€ "What's the risk of missing HSTS header?" πŸ‘€ "How do I fix SSL certificate issues?" 
═══════════════════════════════════════════════════════════ """ print(help_text) async def main_async(): """Async main entry point""" import argparse parser = argparse.ArgumentParser(description="MCPPentestBOT - Professional Security Analysis") parser.add_argument( "--provider", choices=["azure", "claude", "copilot"], default=None, help="LLM provider to use (optional if config.py exists)" ) args = parser.parse_args() # If config.py exists, use it if CONFIG_FILE_FOUND: print("=" * 60) print("πŸ“ Using configuration from config.py") print("=" * 60) print() try: client = SecurityAnalysisClient(provider=args.provider) await client.run_interactive() except Exception as e: print(f"❌ Failed to start: {e}") print("\nCheck your config.py file for correct credentials") else: print("❌ config.py not found") print("\nPlease create a config.py file with your credentials") def main(): """Main entry point""" asyncio.run(main_async()) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kannanprabu/MCPPentestBOT'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.