# port_check.py
"""
Port Check Tool
Simple port connectivity testing
"""
import asyncio
import socket
from typing import Dict, Any, List

from .base_tool import BaseTool
class PortCheckTool(BaseTool):
    """TCP port connectivity testing tool.

    Parses a comma-separated list of ports, attempts a TCP connection to each
    one on the target host, and reports one status line per port.
    """

    def __init__(self):
        super().__init__()
        self.name = "port_check"
        self.description = "Simple port connectivity testing. Checks if specific ports are open/accessible on a target host."

    def get_tool_definition(self) -> Dict[str, Any]:
        """Return MCP-compatible tool definition"""
        return {
            "name": self.name,
            "description": self.description,
            "inputSchema": {
                "type": "object",
                "properties": {
                    "target": {
                        "type": "string",
                        "description": "Target host or IP address (e.g., example.com or 192.168.1.1)"
                    },
                    "ports": {
                        "type": "string",
                        "description": "Comma-separated list of ports to check (e.g., '22,80,443' or '80')"
                    }
                },
                "required": ["target", "ports"]
            }
        }

    async def execute(self, arguments: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Check every requested port on the target and format the results.

        Args:
            arguments: Mapping with a "target" entry (host name or IP address)
                and a "ports" entry (comma-separated port numbers). A bare
                integer for "ports" is also accepted.

        Returns:
            Whatever ``self.format_success`` produces for the report text, or
            whatever ``self.format_error`` produces when an argument is
            missing or invalid, or when an unexpected exception occurs.
        """
        try:
            target = arguments.get("target", "")
            ports_raw = arguments.get("ports", "")
            if not target:
                return self.format_error("Target is required")
            if not ports_raw:
                return self.format_error("Ports are required")

            # str() lets a client that sends a JSON number (e.g. 80) through
            # instead of failing on a missing .split attribute.
            try:
                ports = [int(p.strip()) for p in str(ports_raw).split(",")]
            except ValueError:
                return self.format_error("Invalid port format. Use comma-separated numbers (e.g., '22,80,443')")

            # Reject impossible port numbers before opening any connection.
            out_of_range = [p for p in ports if not 1 <= p <= 65535]
            if out_of_range:
                bad = ", ".join(str(p) for p in out_of_range)
                return self.format_error(f"Invalid port number(s): {bad}. Ports must be between 1 and 65535")

            # _check_port blocks for up to its timeout, so run the probes in
            # the default executor rather than stalling the event loop.
            # gather() returns results in the same order as `ports`.
            loop = asyncio.get_running_loop()
            statuses = await asyncio.gather(
                *(loop.run_in_executor(None, self._check_port, target, port) for port in ports)
            )

            results = [f"Port {port}/tcp: {status}" for port, status in zip(ports, statuses)]
            formatted = f"✅ Port Check Results for {target}\n{'='*60}\n"
            formatted += "\n".join(results)
            formatted += f"\n{'='*60}"
            return self.format_success(formatted)
        except Exception as e:
            return self.format_error(f"Execution failed: {str(e)}")

    def _check_port(self, host: str, port: int, timeout: int = 2) -> str:
        """Attempt a TCP connection to ``host:port`` and describe the outcome.

        Args:
            host: Host name or IP address to connect to.
            port: TCP port number.
            timeout: Seconds to wait for each connection attempt.

        Returns:
            A status string: OPEN on a successful connect, TIMEOUT when the
            attempt times out, "Hostname resolution failed" when the name
            cannot be resolved, CLOSED for any other connection failure, or
            an ERROR string for unexpected exceptions.
        """
        try:
            # create_connection resolves the host (IPv4 or IPv6) and raises on
            # failure; the with-block guarantees the socket is closed.
            with socket.create_connection((host, port), timeout=timeout):
                return "✅ OPEN"
        except socket.gaierror:
            return "❌ Hostname resolution failed"
        except (socket.timeout, TimeoutError):
            # socket.timeout is an alias of TimeoutError only on newer
            # Python versions, so both are listed.
            return "⚠️ TIMEOUT"
        except OSError:
            # Refused, unreachable, and similar connect failures.
            return "❌ CLOSED"
        except Exception as e:
            return f"❌ ERROR: {str(e)}"
# Module-level PortCheckTool instance, constructed when this module is imported.
# NOTE(review): presumably looked up by name by whatever loads the tools — confirm.
tool_instance = PortCheckTool()