lm_studio_client.pyβ’9.73 kB
"""
LM Studio Client for AI-powered decision making
Integrates with local LM Studio instance for autonomous pentesting
"""
import aiohttp
import json
import logging
from typing import Dict, List, Optional, Any
logger = logging.getLogger(__name__)
class LMStudioClient:
"""Client for interacting with LM Studio local LLM"""
def __init__(self, base_url: str = "http://localhost:1234/v1", model: str = "local-model"):
self.base_url = base_url
self.model = model
self.session: Optional[aiohttp.ClientSession] = None
self.connected = False
async def connect(self) -> bool:
"""Connect to LM Studio instance"""
try:
self.session = aiohttp.ClientSession()
# Test connection
async with self.session.get(f"{self.base_url}/models") as response:
if response.status == 200:
models = await response.json()
logger.info(f"Connected to LM Studio. Available models: {models}")
self.connected = True
return True
else:
logger.error(f"Failed to connect to LM Studio: {response.status}")
return False
except Exception as e:
logger.error(f"Error connecting to LM Studio: {e}")
return False
async def chat_completion(
self,
messages: List[Dict[str, str]],
temperature: float = 0.7,
max_tokens: int = 2000
) -> str:
"""Send chat completion request to LM Studio"""
if not self.connected or not self.session:
raise RuntimeError("Not connected to LM Studio")
try:
payload = {
"model": self.model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
"stream": False
}
async with self.session.post(
f"{self.base_url}/chat/completions",
json=payload,
timeout=aiohttp.ClientTimeout(total=120)
) as response:
if response.status == 200:
data = await response.json()
return data["choices"][0]["message"]["content"]
else:
error_text = await response.text()
logger.error(f"LM Studio API error: {error_text}")
raise RuntimeError(f"LM Studio API error: {response.status}")
except asyncio.TimeoutError:
logger.error("LM Studio request timed out")
raise RuntimeError("Request timed out")
except Exception as e:
logger.error(f"Error in chat completion: {e}")
raise
async def analyze_scan_results(
self,
scan_type: str,
results: Dict[str, Any],
context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Analyze scan results and provide insights"""
system_prompt = """You are an expert penetration tester and security analyst.
Analyze the provided scan results and provide:
1. Summary of findings
2. Risk assessment (Critical, High, Medium, Low)
3. Suggested next steps
4. Potential exploitation paths
5. Remediation recommendations
Respond in JSON format with these fields."""
user_prompt = f"""Scan Type: {scan_type}
Results:
{json.dumps(results, indent=2)}
{f'Context: {json.dumps(context, indent=2)}' if context else ''}
Provide your analysis in JSON format."""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
response = await self.chat_completion(messages, temperature=0.3)
try:
# Parse JSON response
analysis = json.loads(response)
return analysis
except json.JSONDecodeError:
# If response is not JSON, wrap it
return {
"analysis": response,
"parsed": False
}
async def suggest_next_action(
self,
current_findings: List[Dict[str, Any]],
completed_scans: List[str],
target_info: Dict[str, Any],
rules_of_engagement: Dict[str, Any]
) -> Dict[str, Any]:
"""Suggest next penetration testing action"""
system_prompt = """You are an autonomous penetration testing AI.
Based on current findings and completed scans, suggest the next best action.
Consider:
- What information has been gathered
- What vulnerabilities have been found
- What tools should be used next
- Risk vs reward of different approaches
- Rules of engagement constraints
Respond in JSON format with:
{
"recommended_action": "tool_name",
"reasoning": "why this action",
"parameters": {parameters for the tool},
"expected_outcome": "what we expect to find",
"risk_level": "low/medium/high",
"alternatives": [list of alternative actions]
}"""
user_prompt = f"""Current Findings:
{json.dumps(current_findings, indent=2)}
Completed Scans:
{json.dumps(completed_scans, indent=2)}
Target Information:
{json.dumps(target_info, indent=2)}
Rules of Engagement:
{json.dumps(rules_of_engagement, indent=2)}
What should we do next?"""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
response = await self.chat_completion(messages, temperature=0.5)
try:
suggestion = json.loads(response)
return suggestion
except json.JSONDecodeError:
return {
"recommended_action": "manual_review",
"reasoning": response,
"parsed": False
}
async def create_attack_plan(
self,
target: str,
scope: List[str],
objectives: List[str],
constraints: Dict[str, Any]
) -> Dict[str, Any]:
"""Create comprehensive penetration testing plan"""
system_prompt = """You are a professional penetration tester creating an engagement plan.
Create a detailed, phased approach considering:
- Reconnaissance (passive and active)
- Vulnerability identification
- Exploitation planning
- Post-exploitation objectives
- Cleanup and reporting
Provide a structured JSON plan with phases, tools, and methodologies."""
user_prompt = f"""Target: {target}
Scope: {json.dumps(scope)}
Objectives: {json.dumps(objectives)}
Constraints: {json.dumps(constraints)}
Create a comprehensive penetration testing plan."""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
response = await self.chat_completion(messages, temperature=0.4, max_tokens=3000)
try:
plan = json.loads(response)
return plan
except json.JSONDecodeError:
# Return structured response anyway
return {
"plan": response,
"parsed": False
}
async def identify_vulnerabilities(
self,
service_info: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Identify potential vulnerabilities from service information"""
system_prompt = """You are a vulnerability research expert.
Analyze the provided service information and identify potential vulnerabilities, CVEs, and attack vectors.
Consider version numbers, configurations, and known vulnerabilities.
Respond with JSON array of vulnerabilities:
[
{
"name": "vulnerability name",
"cve": "CVE-YYYY-XXXX or null",
"severity": "critical/high/medium/low",
"description": "description",
"exploitation_difficulty": "easy/medium/hard",
"suggested_tools": ["tool1", "tool2"],
"references": ["url1", "url2"]
}
]"""
user_prompt = f"""Service Information:
{json.dumps(service_info, indent=2)}
Identify potential vulnerabilities."""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
response = await self.chat_completion(messages, temperature=0.2)
try:
vulnerabilities = json.loads(response)
return vulnerabilities if isinstance(vulnerabilities, list) else [vulnerabilities]
except json.JSONDecodeError:
return [{
"name": "Analysis Error",
"description": response,
"parsed": False
}]
async def generate_report_summary(
self,
findings: List[Dict[str, Any]],
target: str,
duration: str
) -> str:
"""Generate executive summary for penetration test report"""
system_prompt = """You are a penetration testing report writer.
Create a professional executive summary suitable for technical and non-technical audiences.
Include:
- Overview of engagement
- Key findings summary
- Risk assessment
- Critical recommendations
- Overall security posture assessment"""
user_prompt = f"""Target: {target}
Duration: {duration}
Findings:
{json.dumps(findings, indent=2)}
Generate executive summary."""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
response = await self.chat_completion(messages, temperature=0.4, max_tokens=2000)
return response
async def close(self):
"""Close the client session"""
if self.session:
await self.session.close()
self.connected = False
# Import asyncio for timeout
import asyncio