Skip to main content
Glama

MCP Kali Pentest

by Root1856
lm_studio_client.py • 9.73 kB
"""
LM Studio Client for AI-powered decision making
Integrates with local LM Studio instance for autonomous pentesting
"""
import asyncio  # moved to top: asyncio.TimeoutError is used in chat_completion
import json
import logging
from typing import Any, Dict, List, Optional

import aiohttp

logger = logging.getLogger(__name__)


class LMStudioClient:
    """Client for the LM Studio local LLM (OpenAI-compatible HTTP API).

    Call :meth:`connect` before any other coroutine, and :meth:`close`
    when finished to release the HTTP session.
    """

    def __init__(self, base_url: str = "http://localhost:1234/v1", model: str = "local-model"):
        """
        Args:
            base_url: Base URL of the LM Studio OpenAI-compatible endpoint.
            model: Model identifier sent with each chat completion request.
        """
        self.base_url = base_url
        self.model = model
        self.session: Optional[aiohttp.ClientSession] = None
        self.connected = False

    async def _discard_session(self) -> None:
        """Close and drop the HTTP session after a failed connection attempt."""
        if self.session:
            await self.session.close()
            self.session = None
        self.connected = False

    async def connect(self) -> bool:
        """Open an HTTP session and verify the LM Studio endpoint is reachable.

        Probes the ``/models`` endpoint. On any failure the session is
        closed before returning (the original left it open, leaking the
        connection pool).

        Returns:
            True if the endpoint responded with HTTP 200, else False.
        """
        try:
            self.session = aiohttp.ClientSession()
            # Test connection
            async with self.session.get(f"{self.base_url}/models") as response:
                if response.status == 200:
                    models = await response.json()
                    logger.info(f"Connected to LM Studio. Available models: {models}")
                    self.connected = True
                    return True
                logger.error(f"Failed to connect to LM Studio: {response.status}")
                await self._discard_session()
                return False
        except Exception as e:
            logger.error(f"Error connecting to LM Studio: {e}")
            await self._discard_session()
            return False

    async def chat_completion(
        self,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
        max_tokens: int = 2000
    ) -> str:
        """Send a chat completion request and return the assistant's text.

        Args:
            messages: OpenAI-style message dicts (``role``/``content``).
            temperature: Sampling temperature forwarded to the model.
            max_tokens: Completion token budget forwarded to the model.

        Returns:
            The content of the first choice's message.

        Raises:
            RuntimeError: if not connected, on a non-200 API response,
                or on a request timeout (120 s total).
        """
        if not self.connected or not self.session:
            raise RuntimeError("Not connected to LM Studio")

        try:
            payload = {
                "model": self.model,
                "messages": messages,
                "temperature": temperature,
                "max_tokens": max_tokens,
                "stream": False
            }
            async with self.session.post(
                f"{self.base_url}/chat/completions",
                json=payload,
                timeout=aiohttp.ClientTimeout(total=120)
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    return data["choices"][0]["message"]["content"]
                error_text = await response.text()
                logger.error(f"LM Studio API error: {error_text}")
                raise RuntimeError(f"LM Studio API error: {response.status}")
        except asyncio.TimeoutError:
            logger.error("LM Studio request timed out")
            raise RuntimeError("Request timed out")
        except Exception as e:
            # NOTE: the RuntimeError raised for a non-200 status above is
            # also caught and re-logged here before being re-raised —
            # preserved from the original behavior.
            logger.error(f"Error in chat completion: {e}")
            raise

    async def analyze_scan_results(
        self,
        scan_type: str,
        results: Dict[str, Any],
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Ask the model to analyze scan results and return structured insights.

        Returns the parsed JSON analysis, or ``{"analysis": <raw text>,
        "parsed": False}`` when the model's reply is not valid JSON.
        """
        system_prompt = """You are an expert penetration tester and security analyst. Analyze the provided scan results and provide: 1. Summary of findings 2. Risk assessment (Critical, High, Medium, Low) 3. Suggested next steps 4. Potential exploitation paths 5. Remediation recommendations Respond in JSON format with these fields."""

        user_prompt = f"""Scan Type: {scan_type} Results: {json.dumps(results, indent=2)} {f'Context: {json.dumps(context, indent=2)}' if context else ''} Provide your analysis in JSON format."""

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ]

        # Low temperature: analysis should be deterministic, not creative.
        response = await self.chat_completion(messages, temperature=0.3)

        try:
            # Parse JSON response
            analysis = json.loads(response)
            return analysis
        except json.JSONDecodeError:
            # If response is not JSON, wrap it
            return {
                "analysis": response,
                "parsed": False
            }

    async def suggest_next_action(
        self,
        current_findings: List[Dict[str, Any]],
        completed_scans: List[str],
        target_info: Dict[str, Any],
        rules_of_engagement: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Ask the model for the next best pentest action.

        Returns the parsed JSON suggestion, or a fallback dict with
        ``recommended_action: "manual_review"`` when the reply is not JSON.
        """
        system_prompt = """You are an autonomous penetration testing AI. Based on current findings and completed scans, suggest the next best action. 
Consider: - What information has been gathered - What vulnerabilities have been found - What tools should be used next - Risk vs reward of different approaches - Rules of engagement constraints Respond in JSON format with: { "recommended_action": "tool_name", "reasoning": "why this action", "parameters": {parameters for the tool}, "expected_outcome": "what we expect to find", "risk_level": "low/medium/high", "alternatives": [list of alternative actions] }"""

        user_prompt = f"""Current Findings: {json.dumps(current_findings, indent=2)} Completed Scans: {json.dumps(completed_scans, indent=2)} Target Information: {json.dumps(target_info, indent=2)} Rules of Engagement: {json.dumps(rules_of_engagement, indent=2)} What should we do next?"""

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ]

        response = await self.chat_completion(messages, temperature=0.5)

        try:
            suggestion = json.loads(response)
            return suggestion
        except json.JSONDecodeError:
            return {
                "recommended_action": "manual_review",
                "reasoning": response,
                "parsed": False
            }

    async def create_attack_plan(
        self,
        target: str,
        scope: List[str],
        objectives: List[str],
        constraints: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Ask the model for a phased penetration-testing engagement plan.

        Returns the parsed JSON plan, or ``{"plan": <raw text>,
        "parsed": False}`` when the reply is not valid JSON.
        """
        system_prompt = """You are a professional penetration tester creating an engagement plan. 
Create a detailed, phased approach considering: - Reconnaissance (passive and active) - Vulnerability identification - Exploitation planning - Post-exploitation objectives - Cleanup and reporting Provide a structured JSON plan with phases, tools, and methodologies."""

        user_prompt = f"""Target: {target} Scope: {json.dumps(scope)} Objectives: {json.dumps(objectives)} Constraints: {json.dumps(constraints)} Create a comprehensive penetration testing plan."""

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ]

        # Larger token budget: plans are the longest structured output.
        response = await self.chat_completion(messages, temperature=0.4, max_tokens=3000)

        try:
            plan = json.loads(response)
            return plan
        except json.JSONDecodeError:
            # Return structured response anyway
            return {
                "plan": response,
                "parsed": False
            }

    async def identify_vulnerabilities(
        self,
        service_info: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Ask the model to identify likely vulnerabilities for a service.

        Returns a list of vulnerability dicts. A single-object JSON reply
        is wrapped in a list; a non-JSON reply yields a one-element
        ``"Analysis Error"`` list.
        """
        system_prompt = """You are a vulnerability research expert. Analyze the provided service information and identify potential vulnerabilities, CVEs, and attack vectors. Consider version numbers, configurations, and known vulnerabilities. 
Respond with JSON array of vulnerabilities: [ { "name": "vulnerability name", "cve": "CVE-YYYY-XXXX or null", "severity": "critical/high/medium/low", "description": "description", "exploitation_difficulty": "easy/medium/hard", "suggested_tools": ["tool1", "tool2"], "references": ["url1", "url2"] } ]"""

        user_prompt = f"""Service Information: {json.dumps(service_info, indent=2)} Identify potential vulnerabilities."""

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ]

        # Lowest temperature in the client: CVE matching must be precise.
        response = await self.chat_completion(messages, temperature=0.2)

        try:
            vulnerabilities = json.loads(response)
            return vulnerabilities if isinstance(vulnerabilities, list) else [vulnerabilities]
        except json.JSONDecodeError:
            return [{
                "name": "Analysis Error",
                "description": response,
                "parsed": False
            }]

    async def generate_report_summary(
        self,
        findings: List[Dict[str, Any]],
        target: str,
        duration: str
    ) -> str:
        """Ask the model for an executive summary of the engagement.

        Returns the model's reply as free text (no JSON parsing).
        """
        system_prompt = """You are a penetration testing report writer. Create a professional executive summary suitable for technical and non-technical audiences. Include: - Overview of engagement - Key findings summary - Risk assessment - Critical recommendations - Overall security posture assessment"""

        user_prompt = f"""Target: {target} Duration: {duration} Findings: {json.dumps(findings, indent=2)} Generate executive summary."""

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ]

        response = await self.chat_completion(messages, temperature=0.4, max_tokens=2000)
        return response

    async def close(self):
        """Close the client session and mark the client disconnected."""
        if self.session:
            await self.session.close()
        self.connected = False

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Root1856/mcpkali'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.