# llm_client.py
"""
LLM Client with Enhanced Security Analysis - Complete Implementation
Azure OpenAI + Claude (Anthropic) + GitHub Copilot Support
"""
import os
import json
import asyncio
from typing import Optional, List, Dict, Any
import sys
from pathlib import Path
from datetime import datetime
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent))
# Import MCP server components directly
from mcp.protocol import MCPProtocolHandler
from mcp.types import MCPMethod, MCPTool
from tools import ToolRegistry
# Try to import from existing config.py file
# Prefer credentials from a local config.py; when it is absent, fall back to
# empty configs so the provider initializers read environment variables instead.
try:
    from config import AZURE_CONFIG, CLAUDE_CONFIG, COPILOT_CONFIG, LLM_PROVIDER
    print("✅ Loaded configuration from config.py")
    CONFIG_FILE_FOUND = True
except ImportError:
    print("⚠️ config.py not found, will use environment variables")
    CONFIG_FILE_FOUND = False
    # Empty dicts make every .get(...) miss, so os.getenv fallbacks apply.
    AZURE_CONFIG = {}
    CLAUDE_CONFIG = {}
    COPILOT_CONFIG = {}
    LLM_PROVIDER = "azure"  # default provider when no config file exists
class DirectMCPClient:
    """MCP client that runs the server in-process (no subprocess).

    Wraps the ToolRegistry so security tools can be listed, converted to
    provider-specific function schemas, and executed directly.
    """

    def __init__(self):
        """Initialize protocol handler, tool registry, and the tool cache."""
        self.protocol = MCPProtocolHandler("MCPPentestBOT", "1.0.0")
        self.tool_registry = ToolRegistry()
        self.tools_cache = None  # populated lazily by list_tools()

    async def initialize(self) -> bool:
        """Load all registered security tools.

        Returns:
            True once loading completes.
        """
        print("🔧 Initializing MCP tools...")
        tool_count = await self.tool_registry.load_all_tools()
        print(f"✅ Loaded {tool_count} security tools")
        return True

    def list_tools(self) -> List[Dict[str, Any]]:
        """Return tool definitions, cached after the first lookup.

        NOTE(review): callers access .name/.description/.inputSchema as
        attributes, so get_tool_definitions() presumably returns objects
        rather than plain dicts — the Dict hint may be inaccurate; confirm
        against the ToolRegistry implementation.
        """
        # 'is None' (not truthiness) so an empty definition list is also cached.
        if self.tools_cache is None:
            self.tools_cache = self.tool_registry.get_tool_definitions()
        return self.tools_cache

    async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> tuple[Optional[str], bool]:
        """Execute a tool directly.

        Args:
            tool_name: Registered tool identifier.
            arguments: Tool arguments as a plain dict.

        Returns:
            (result_text, is_error) — joined text output and an error flag.
        """
        try:
            print(f"🔧 Executing: {tool_name}({arguments})")
            result = await self.tool_registry.execute_tool(tool_name, arguments)
            if not result:
                return "No result returned from tool", True
            text_parts = []
            is_error = False
            for item in result:
                if item.get("type") == "text":
                    text = item.get("text", "")
                    text_parts.append(text)
                    # Heuristic: tool output embeds these markers on failure.
                    if "Error:" in text or "Failed:" in text or "❌" in text:
                        is_error = True
            result_text = "\n".join(text_parts) if text_parts else "No result"
            if is_error:
                print("⚠️ Tool execution had errors")
            else:
                print("✅ Tool execution successful")
            return result_text, is_error
        except Exception as e:
            error_msg = f"Tool execution failed: {str(e)}"
            print(f"❌ {error_msg}")
            return error_msg, True

    def get_tool_schemas_for_llm(self, provider: str = "azure") -> List[Dict[str, Any]]:
        """Convert MCP tool definitions to a provider's function-call schema.

        Args:
            provider: "azure"/"copilot" (OpenAI style) or "claude" (Anthropic).

        Returns:
            Schema list in the provider's format; empty for unknown providers.
        """
        tools = self.list_tools()
        if provider in ("azure", "copilot"):
            # OpenAI-style function-calling schema.
            return [{
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema
                }
            } for tool in tools]
        if provider == "claude":
            # Anthropic tool-use schema.
            return [{
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.inputSchema
            } for tool in tools]
        return []
class SecurityAnalysisClient:
    """Enhanced LLM client with detailed security analysis and reporting.

    Supports Azure OpenAI, Claude (Anthropic), and GitHub Copilot, executing
    MCP security tools via DirectMCPClient and producing structured reports.
    """

    def __init__(self, provider: str = None, config: Dict[str, Any] = None):
        """Initialize the security analysis client.

        Args:
            provider: "azure", "claude", or "copilot". Defaults to the
                LLM_PROVIDER from config.py when that file was loaded.
            config: Provider credentials/settings. Defaults to the matching
                *_CONFIG dict from config.py when available.
        """
        # Fall back to config.py's provider when none is given explicitly.
        if provider is None and CONFIG_FILE_FOUND:
            provider = LLM_PROVIDER
            print(f"📋 Using provider from config.py: {provider}")
        self.provider = (provider or "azure").lower()
        # Fall back to config.py's per-provider settings when none are given.
        if config is None and CONFIG_FILE_FOUND:
            if self.provider == "azure":
                self.config = AZURE_CONFIG
            elif self.provider == "claude":
                self.config = CLAUDE_CONFIG
            elif self.provider == "copilot":
                self.config = COPILOT_CONFIG
            else:
                self.config = {}
            print("📋 Using configuration from config.py")
        else:
            self.config = config or {}
        self.mcp_client = DirectMCPClient()
        self.conversation_history = []
        self.llm_client = None
        self.scan_results = []  # raw tool outputs, kept for report generation
        self._init_provider()

    def _init_provider(self):
        """Initialize the selected LLM provider.

        Raises:
            ValueError: If the provider name is not supported.
        """
        print(f"🤖 Initializing {self.provider.upper()} provider...")
        if self.provider == "azure":
            self._init_azure()
        elif self.provider == "claude":
            self._init_claude()
        elif self.provider == "copilot":
            self._init_copilot()
        else:
            raise ValueError(f"Unsupported provider: {self.provider}")

    def _init_azure(self):
        """Initialize the Azure OpenAI client from config or environment."""
        try:
            from openai import AzureOpenAI
            self.llm_client = AzureOpenAI(
                api_key=self.config.get("api_key") or os.getenv("AZURE_OPENAI_API_KEY"),
                api_version=self.config.get("api_version", "2024-02-15-preview"),
                azure_endpoint=self.config.get("endpoint") or os.getenv("AZURE_OPENAI_ENDPOINT")
            )
            self.deployment_name = self.config.get("deployment_name") or os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4")
            print(f"✅ Azure OpenAI initialized (deployment: {self.deployment_name})")
        except ImportError:
            print("❌ Azure OpenAI library not installed. Run: pip install openai")
            raise
        except Exception as e:
            print(f"❌ Azure initialization failed: {e}")
            raise

    def _init_claude(self):
        """Initialize the Claude (Anthropic) client from config or environment.

        Raises:
            ValueError: If no API key can be found.
        """
        try:
            from anthropic import Anthropic
            api_key = self.config.get("api_key") or os.getenv("ANTHROPIC_API_KEY")
            if not api_key:
                raise ValueError("Claude API key not found in config.py or ANTHROPIC_API_KEY environment variable")
            self.llm_client = Anthropic(api_key=api_key)
            self.model_name = self.config.get("model", "claude-sonnet-4-20250514")
            print(f"✅ Claude initialized (model: {self.model_name})")
        except ImportError:
            print("❌ Anthropic library not installed. Run: pip install anthropic")
            raise
        except Exception as e:
            print(f"❌ Claude initialization failed: {e}")
            raise

    def _init_copilot(self):
        """Initialize GitHub Copilot via the OpenAI-compatible endpoint."""
        try:
            from openai import OpenAI
            self.llm_client = OpenAI(
                api_key=self.config.get("api_key") or os.getenv("GITHUB_TOKEN"),
                base_url=self.config.get("base_url", "https://api.githubcopilot.com")
            )
            self.model_name = self.config.get("model", "gpt-4")
            print(f"✅ Copilot initialized (model: {self.model_name})")
        except ImportError:
            print("❌ OpenAI library not installed. Run: pip install openai")
            raise
        except Exception as e:
            print(f"❌ Copilot initialization failed: {e}")
            raise

    def _get_enhanced_system_prompt(self) -> str:
        """Return the system prompt that drives detailed security analysis."""
        return """You are MCPPentestBOT, an expert AI security analyst and penetration tester with deep knowledge of:
- Network security and vulnerability assessment
- SSL/TLS protocols and certificate validation
- Web application security (OWASP Top 10)
- Security headers and best practices
- Risk assessment and threat modeling
- Security compliance frameworks (PCI-DSS, HIPAA, ISO 27001)

When analyzing security scan results, you MUST provide:

1. **EXECUTIVE SUMMARY**
   - Brief overview of findings
   - Overall security posture (Critical/High/Medium/Low)
   - Key recommendations

2. **DETAILED FINDINGS**
   For EACH finding, include:
   - **Vulnerability/Issue Name**: Clear, specific title
   - **Severity**: Critical/High/Medium/Low/Info
   - **Description**: What was found and why it matters
   - **Risk**: Potential impact if exploited
   - **Evidence**: Specific details from scan results
   - **Affected Component**: Port, service, URL, etc.

3. **RISK ASSESSMENT**
   - **Likelihood**: How easy is it to exploit?
   - **Impact**: What damage could occur?
   - **Overall Risk Score**: Combine likelihood × impact

4. **REMEDIATION RECOMMENDATIONS**
   - **Immediate Actions**: Critical fixes needed now
   - **Short-term Fixes**: To implement within 30 days
   - **Long-term Improvements**: Strategic security enhancements
   - **Specific Steps**: How to fix each issue
   - **Testing Verification**: How to verify fixes work

5. **COMPLIANCE IMPACT**
   - Which standards are affected (PCI-DSS, HIPAA, etc.)
   - Compliance gaps identified
   - Audit implications

6. **ADDITIONAL CONTEXT**
   - Industry best practices
   - Related vulnerabilities to check
   - References (CVE, CWE, OWASP links)

FORMATTING RULES:
- Use clear headers (###) and bullet points
- **Bold** important terms
- Use emojis for severity: 🔴 Critical, 🟠 High, 🟡 Medium, 🔵 Low, ⚪ Info
- Include specific evidence and examples
- Provide actionable, not generic, recommendations
- Explain technical concepts clearly for both technical and non-technical audiences

Available security testing tools:
- **http_headers**: Security headers validation
- **nmap_scan**: Port scanning and service detection
- **ssl_check**: SSL/TLS certificate analysis
- **testssl_scan**: Comprehensive SSL/TLS security scan
- **nikto_scan**: Web vulnerability scanning
- **ping_sweep**: Network discovery
- **port_check**: Port connectivity testing

When a user requests a scan:
1. Execute appropriate tool(s)
2. Analyze results thoroughly
3. Provide comprehensive security report as described above
4. Suggest additional scans if needed

IMPORTANT: Only scan systems the user owns or has explicit permission to test. Always remind users of this.
"""

    async def start(self):
        """Start the client: load MCP tools and print the welcome banner.

        Raises:
            RuntimeError: If MCP tool initialization fails.
        """
        print()
        print("=" * 60)
        print("🛡️ MCPPentestBOT - Professional Security Analysis Platform")
        print("=" * 60)
        print()
        # Initialize MCP directly (no subprocess).
        if not await self.mcp_client.initialize():
            raise RuntimeError("Failed to initialize MCP tools")
        tools = self.mcp_client.list_tools()
        print()
        print(f"📦 Available {len(tools)} security tools:")
        for tool in tools:
            print(f"  • {tool.name}")
        print()
        print("=" * 60)
        print("💬 Ready for security analysis! Ask questions or request scans.")
        print()
        print("  📋 Commands:")
        print("    - 'Scan [target]' - Comprehensive security scan")
        print("    - 'Generate report for [target]' - Detailed security report")
        print("    - 'Assess risk for [finding]' - Risk analysis")
        print("    - 'help' - Show available commands")
        print("    - 'quit' - Exit")
        print("=" * 60)
        print()

    async def chat(self, user_message: str) -> str:
        """Route a user message to the active provider and return the reply."""
        if self.provider in ("azure", "copilot"):
            return await self._chat_azure(user_message)
        if self.provider == "claude":
            return await self._chat_claude(user_message)
        # _init_provider() rejects unknown providers, but fail loudly here
        # too instead of implicitly returning None.
        return f"Unsupported provider: {self.provider}"

    async def _chat_azure(self, user_message: str) -> str:
        """Chat using Azure OpenAI / Copilot with enhanced security analysis."""
        self.conversation_history.append({
            "role": "user",
            "content": user_message
        })
        tools = self.mcp_client.get_tool_schemas_for_llm("azure")
        system_message = {
            "role": "system",
            "content": self._get_enhanced_system_prompt()
        }
        try:
            response = self.llm_client.chat.completions.create(
                model=self.deployment_name if self.provider == "azure" else self.model_name,
                messages=[system_message] + self.conversation_history,
                tools=tools,
                tool_choice="auto",
                temperature=0.7,
                max_tokens=4000
            )
            assistant_message = response.choices[0].message
            if assistant_message.tool_calls:
                # Execute every requested tool and collect outputs.
                tool_results = []
                tool_outputs = []
                for tool_call in assistant_message.tool_calls:
                    tool_name = tool_call.function.name
                    tool_args = json.loads(tool_call.function.arguments)
                    print(f"\n🔧 Executing: {tool_name}({tool_args})")
                    result, is_error = await self.mcp_client.call_tool(tool_name, tool_args)
                    # Keep raw output for later report generation.
                    tool_outputs.append({
                        "tool": tool_name,
                        "args": tool_args,
                        "result": result,
                        "error": is_error,
                        "timestamp": datetime.now().isoformat()
                    })
                    tool_results.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": result
                    })
                self.scan_results.extend(tool_outputs)
                # Assistant tool-call turn must precede the tool-result turns.
                self.conversation_history.append({
                    "role": "assistant",
                    "content": assistant_message.content,
                    "tool_calls": [
                        {
                            "id": tc.id,
                            "type": "function",
                            "function": {
                                "name": tc.function.name,
                                "arguments": tc.function.arguments
                            }
                        } for tc in assistant_message.tool_calls
                    ]
                })
                self.conversation_history.extend(tool_results)
                # Ask the model for the structured final analysis.
                print("\n🔍 Analyzing results...")
                final_response = self.llm_client.chat.completions.create(
                    model=self.deployment_name if self.provider == "azure" else self.model_name,
                    messages=[system_message] + self.conversation_history + [{
                        "role": "user",
                        "content": """Please provide a COMPREHENSIVE SECURITY ANALYSIS following this structure:
1. EXECUTIVE SUMMARY (2-3 sentences)
2. DETAILED FINDINGS (for each vulnerability/issue found)
3. RISK ASSESSMENT (likelihood, impact, overall risk)
4. REMEDIATION RECOMMENDATIONS (immediate, short-term, long-term)
5. COMPLIANCE IMPACT (if applicable)
6. ADDITIONAL RECOMMENDATIONS

Use clear formatting, severity indicators (🔴🟠🟡🔵⚪), and provide specific, actionable advice."""
                    }],
                    temperature=0.7,
                    max_tokens=4000
                )
                final_message = final_response.choices[0].message.content
                self.conversation_history.append({
                    "role": "assistant",
                    "content": final_message
                })
                return final_message
            else:
                # No tool call — plain text response.
                content = assistant_message.content
                self.conversation_history.append({
                    "role": "assistant",
                    "content": content
                })
                return content
        except Exception as e:
            error_msg = f"Error communicating with LLM: {str(e)}"
            print(f"❌ {error_msg}")
            return error_msg

    async def _chat_claude(self, user_message: str) -> str:
        """Chat using Claude with enhanced security analysis."""
        tools = self.mcp_client.get_tool_schemas_for_llm("claude")
        # Record the user turn up front so history stays in role order.
        # (Previously the user message was dropped in the tool branch and
        # appended after the assistant turn in the no-tool branch.)
        self.conversation_history.append({
            "role": "user",
            "content": user_message
        })
        try:
            response = self.llm_client.messages.create(
                model=self.model_name,
                max_tokens=4096,
                system=self._get_enhanced_system_prompt(),
                messages=self.conversation_history,
                tools=tools
            )
            assistant_message_content = []
            tool_results = []
            # Walk the content blocks; execute any tool_use requests.
            for block in response.content:
                if block.type == "text":
                    assistant_message_content.append({
                        "type": "text",
                        "text": block.text
                    })
                elif block.type == "tool_use":
                    tool_name = block.name
                    tool_input = block.input
                    print(f"\n🔧 Executing: {tool_name}({tool_input})")
                    result, is_error = await self.mcp_client.call_tool(tool_name, tool_input)
                    # Keep raw output for later report generation.
                    self.scan_results.append({
                        "tool": tool_name,
                        "args": tool_input,
                        "result": result,
                        "error": is_error,
                        "timestamp": datetime.now().isoformat()
                    })
                    assistant_message_content.append({
                        "type": "tool_use",
                        "id": block.id,
                        "name": tool_name,
                        "input": tool_input
                    })
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result
                    })
            # Record the assistant turn (text and/or tool_use blocks).
            self.conversation_history.append({
                "role": "assistant",
                "content": assistant_message_content
            })
            if tool_results:
                # Tool results go back to Claude as a user turn.
                self.conversation_history.append({
                    "role": "user",
                    "content": tool_results
                })
                print("\n🔍 Analyzing results...")
                final_response = self.llm_client.messages.create(
                    model=self.model_name,
                    max_tokens=4096,
                    system=self._get_enhanced_system_prompt(),
                    messages=self.conversation_history + [{
                        "role": "user",
                        "content": """Please provide a COMPREHENSIVE SECURITY ANALYSIS following this structure:
1. EXECUTIVE SUMMARY (2-3 sentences)
2. DETAILED FINDINGS (for each vulnerability/issue found)
3. RISK ASSESSMENT (likelihood, impact, overall risk)
4. REMEDIATION RECOMMENDATIONS (immediate, short-term, long-term)
5. COMPLIANCE IMPACT (if applicable)
6. ADDITIONAL RECOMMENDATIONS

Use clear formatting, severity indicators (🔴🟠🟡🔵⚪), and provide specific, actionable advice."""
                    }]
                )
                final_text = "".join(
                    block.text for block in final_response.content if block.type == "text"
                )
                self.conversation_history.append({
                    "role": "assistant",
                    "content": [{"type": "text", "text": final_text}]
                })
                return final_text
            else:
                # No tools used — the assistant turn was already recorded above.
                return "".join(
                    block.text for block in response.content if block.type == "text"
                )
        except Exception as e:
            error_msg = f"Error communicating with Claude: {str(e)}"
            print(f"❌ {error_msg}")
            return error_msg

    async def generate_report(self, target: str = None) -> str:
        """Generate a comprehensive security report from collected scan results.

        Args:
            target: Optional target label for the report header.

        Returns:
            The report text, or a notice when no scans have been run yet.
        """
        if not self.scan_results:
            return "No scan results available. Please run some security scans first."
        report_prompt = f"""Generate a COMPREHENSIVE PROFESSIONAL SECURITY REPORT based on all scan results collected.

Target: {target or 'Multiple targets'}
Scan Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

Include:
1. **EXECUTIVE SUMMARY**
2. **SCOPE AND METHODOLOGY**
3. **DETAILED FINDINGS** (all vulnerabilities with severity, risk, evidence)
4. **RISK ASSESSMENT MATRIX**
5. **PRIORITIZED REMEDIATION ROADMAP**
6. **COMPLIANCE GAP ANALYSIS**
7. **CONCLUSIONS AND RECOMMENDATIONS**

Format as a professional penetration testing report."""
        return await self.chat(report_prompt)

    async def run_interactive(self):
        """Run the interactive security analysis REPL until quit/interrupt."""
        await self.start()
        try:
            while True:
                user_input = input("\n👤 You: ").strip()
                if not user_input:
                    continue
                if user_input.lower() in ['quit', 'exit', 'bye']:
                    print("\n👋 Goodbye! Stay secure!")
                    break
                if user_input.lower() == 'help':
                    self._show_help()
                    continue
                if user_input.lower().startswith('report'):
                    print("\n📊 Generating comprehensive security report...")
                    response = await self.generate_report()
                    print("\n" + response)
                    continue
                print("\n🤖 Security Analyst: ", end="", flush=True)
                response = await self.chat(user_input)
                print(response)
        except KeyboardInterrupt:
            print("\n\n👋 Interrupted. Stay secure!")

    def _show_help(self):
        """Print the command reference for the interactive session."""
        help_text = """
╔═══════════════════════════════════════════════════════════╗
║           MCPPentestBOT - Available Commands              ║
╚═══════════════════════════════════════════════════════════╝

🔍 SCANNING COMMANDS:
  • "Scan [target]" - Comprehensive security scan
  • "Check SSL for [target]" - SSL/TLS analysis
  • "Scan [target] for open ports" - Port scanning
  • "Check security headers for [target]" - HTTP headers

📊 REPORTING COMMANDS:
  • "report" - Generate comprehensive security report
  • "summarize findings" - Quick summary of all findings
  • "risk assessment for [target]" - Detailed risk analysis

📋 ANALYSIS COMMANDS:
  • "Explain [vulnerability]" - Get detailed explanation
  • "How to fix [issue]" - Remediation guidance
  • "What is the risk of [finding]?" - Risk assessment

💡 GENERAL:
  • "help" - Show this help
  • "quit" - Exit the program

EXAMPLES:
  👤 "Scan github.com comprehensively"
  👤 "Generate a security report"
  👤 "What's the risk of missing HSTS header?"
  👤 "How do I fix SSL certificate issues?"
═══════════════════════════════════════════════════════════
"""
        print(help_text)
async def main_async():
    """Async entry point: parse CLI arguments and run the interactive session.

    Requires config.py to be present; prints guidance and exits otherwise.
    """
    import argparse
    parser = argparse.ArgumentParser(description="MCPPentestBOT - Professional Security Analysis")
    parser.add_argument(
        "--provider",
        choices=["azure", "claude", "copilot"],
        default=None,
        help="LLM provider to use (optional if config.py exists)"
    )
    args = parser.parse_args()
    if CONFIG_FILE_FOUND:
        print("=" * 60)
        print("📋 Using configuration from config.py")
        print("=" * 60)
        print()
        try:
            client = SecurityAnalysisClient(provider=args.provider)
            await client.run_interactive()
        except Exception as e:
            # Startup failures (bad credentials, missing libs) are reported,
            # not re-raised, so the user gets actionable guidance.
            print(f"❌ Failed to start: {e}")
            print("\nCheck your config.py file for correct credentials")
    else:
        print("❌ config.py not found")
        print("\nPlease create a config.py file with your credentials")
def main():
    """Synchronous entry point wrapping the async main."""
    asyncio.run(main_async())


if __name__ == "__main__":
    main()