Skip to main content
Glama

ContainerGuard MCP

by rockmelodies
container_guard_mcp.py (10.7 kB)
#!/usr/bin/env python
# encoding: utf-8
# @author: rockmelodies
# @license: (C) Copyright 2013-2024, 360 Corporation Limited.
# @contact: rockysocket@gmail.com
# @software: garner
# @file: container_guard_mcp.py
# @time: 2025/9/21 08:20
# @desc:
"""
ContainerGuard MCP - container security monitoring and management MCP service.

Provides remote container management, security log analysis and event
monitoring, exposed as MCP tools.
"""

import asyncio
import json
import logging
import subprocess
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import docker
import paramiko
from mcp.server.fastmcp import Context, FastMCP

# Logging configuration. basicConfig() without a stream argument logs to
# stderr, which keeps stdout free for the MCP stdio transport.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ContainerGuard")

# Initialise the MCP server (no version argument is passed).
mcp = FastMCP("ContainerGuard")


def _close_quietly(resource: Any) -> None:
    """Call ``resource.close()`` and log, rather than raise, if it fails."""
    try:
        resource.close()
    except Exception as exc:  # best-effort cleanup must never mask the real error
        logger.warning("Failed to close connection: %s", exc)


class ContainerManager:
    """Hold one Docker daemon connection plus SSH sessions keyed by host.

    Every public method reports failure through its return value (an error
    string, an ``{"error": ...}`` dict or a ``(False, message)`` tuple) instead
    of raising.

    NOTE(review): the Docker SDK and paramiko calls below are plain
    synchronous calls inside ``async def`` methods, so each one blocks the
    event loop while it runs.
    """

    def __init__(self):
        # Docker client of the most recent successful connect_to_docker() call.
        self.docker_client = None
        # Open SSH sessions, keyed by the host name they were opened with.
        self.ssh_clients: Dict[str, paramiko.SSHClient] = {}
        # Not read or written anywhere else in this file.
        self.connected_containers: Dict[str, Any] = {}

    async def connect_to_docker(self, host: str, port: int = 2375, tls_verify: bool = False,
                                cert_path: Optional[str] = None) -> Tuple[bool, str]:
        """Connect to a Docker daemon over TCP and verify it with a ping.

        Args:
            host: Daemon host name or address.
            port: Daemon TCP port.
            tls_verify: When true, connect with TLS using the files
                ``ca.pem``, ``cert.pem`` and ``key.pem`` under ``cert_path``.
            cert_path: Directory holding the TLS files; required when
                ``tls_verify`` is true.

        Returns:
            ``(True, message)`` on success, ``(False, error_message)`` otherwise.
            The stored client is replaced only on success, so a failed attempt
            never leaves a dead client behind (and keeps an earlier working
            connection usable).
        """
        if tls_verify and not cert_path:
            return False, "TLS验证需要证书路径"

        base_url = f"tcp://{host}:{port}"
        client = None
        try:
            if tls_verify:
                try:
                    tls_config = docker.tls.TLSConfig(
                        ca_cert=f"{cert_path}/ca.pem",
                        client_cert=(f"{cert_path}/cert.pem", f"{cert_path}/key.pem"),
                        verify=True,
                    )
                    client = docker.DockerClient(base_url=base_url, tls=tls_config)
                except Exception as e:
                    return False, f"TLS配置失败: {str(e)}"
            else:
                client = docker.DockerClient(base_url=base_url)

            # Test the connection before publishing the client.
            client.ping()
        except Exception as e:
            if client is not None:
                _close_quietly(client)
            error_msg = f"连接Docker失败: {str(e)}"
            logger.error(error_msg)
            return False, error_msg

        previous, self.docker_client = self.docker_client, client
        if previous is not None:
            _close_quietly(previous)
        logger.info(f"成功连接到Docker守护进程: {host}:{port}")
        return True, "连接成功"

    async def connect_via_ssh(self, host: str, port: int, username: str,
                              password: Optional[str] = None,
                              key_path: Optional[str] = None) -> Tuple[bool, str]:
        """Open an SSH session to a remote host and remember it under ``host``.

        ``key_path`` takes precedence over ``password``. The key file is handed
        to paramiko via ``key_filename`` so the key type is detected by
        paramiko rather than being restricted to RSA.

        Returns:
            ``(True, message)`` on success, ``(False, error_message)`` otherwise.
        """
        if not key_path and not password:
            return False, "需要密码或密钥文件进行SSH认证"

        ssh = paramiko.SSHClient()
        # SECURITY NOTE: AutoAddPolicy accepts any unknown host key without
        # verification. Kept to preserve existing behaviour; review before
        # using against untrusted networks.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            if key_path:
                try:
                    ssh.connect(hostname=host, port=port, username=username,
                                key_filename=key_path)
                except Exception as e:
                    _close_quietly(ssh)
                    return False, f"SSH密钥认证失败: {str(e)}"
            else:
                ssh.connect(hostname=host, port=port, username=username, password=password)
        except Exception as e:
            _close_quietly(ssh)
            error_msg = f"SSH连接失败: {str(e)}"
            logger.error(error_msg)
            return False, error_msg

        # Close a session previously stored for this host instead of leaking it.
        previous = self.ssh_clients.get(host)
        self.ssh_clients[host] = ssh
        if previous is not None:
            _close_quietly(previous)
        logger.info(f"成功通过SSH连接到主机: {host}:{port}")
        return True, "SSH连接成功"

    async def get_containers(self, all_containers: bool = False) -> List[Dict]:
        """Return a summary dict per container.

        Args:
            all_containers: Passed to the SDK as ``all=``; when true, the list
                is not limited to running containers.

        Returns:
            A list of dicts with ``id`` (first 12 characters), ``name``,
            ``status``, ``image`` (first tag or ``'unknown'``) and ``created``;
            or a single-element list holding an ``{"error": ...}`` dict.
        """
        if not self.docker_client:
            return [{"error": "未连接到Docker守护进程"}]

        try:
            containers = self.docker_client.containers.list(all=all_containers)
            result = []
            for container in containers:
                tags = container.image.tags
                result.append({
                    'id': container.id[:12],
                    'name': container.name,
                    'status': container.status,
                    'image': tags[0] if tags else 'unknown',
                    'created': container.attrs['Created'],
                })
            return result
        except Exception as e:
            logger.error(f"获取容器列表失败: {str(e)}")
            return [{"error": f"获取容器列表失败: {str(e)}"}]

    async def get_container_logs(self, container_id: str, tail: int = 100) -> str:
        """Return the last ``tail`` log lines of a container as text.

        Bytes that are not valid UTF-8 are dropped. On failure an error
        message string is returned instead.
        """
        if not self.docker_client:
            return "未连接到Docker守护进程"

        try:
            container = self.docker_client.containers.get(container_id)
            return container.logs(tail=tail).decode('utf-8', errors='ignore')
        except Exception as e:
            return f"获取日志失败: {str(e)}"

    async def execute_in_container(self, container_id: str, command: str,
                                   shell: bool = False) -> str:
        """Run a command inside a container and return its output as text.

        Args:
            container_id: Container ID or name.
            command: Command line to run.
            shell: When false (the default, matching the previous behaviour)
                the string is given to ``exec_run`` as-is, which runs it
                without a shell, so ``|``, ``||`` and redirections are NOT
                interpreted. When true the command runs as ``sh -c <command>``
                so shell operators work; this requires ``sh`` in the container.

        Returns:
            The command output when the exit code is 0; otherwise a failure
            message containing the exit code and the output, or an error
            message if the call itself failed.
        """
        if not self.docker_client:
            return "未连接到Docker守护进程"

        try:
            container = self.docker_client.containers.get(container_id)
            cmd = ["sh", "-c", command] if shell else command
            result = container.exec_run(cmd)
            output = result.output.decode('utf-8', errors='ignore')
            if result.exit_code == 0:
                return output
            return f"命令执行失败 (退出码: {result.exit_code}): {output}"
        except Exception as e:
            return f"执行命令失败: {str(e)}"

    async def run_commands(self, container_id: str, commands: Dict[str, str], *,
                           shell: bool = False,
                           failure_label: str = "检查失败") -> Dict[str, str]:
        """Run several named commands in a container, one after another.

        Args:
            container_id: Container ID or name.
            commands: Mapping of result key to command line.
            shell: Forwarded to :meth:`execute_in_container`.
            failure_label: Prefix of the message stored when a command raises.

        Returns:
            A dict with the same keys as ``commands`` (same order) whose values
            are the text returned by :meth:`execute_in_container`.
        """
        results: Dict[str, str] = {}
        for name, command in commands.items():
            try:
                results[name] = await self.execute_in_container(container_id, command, shell=shell)
            except Exception as e:
                results[name] = f"{failure_label}: {str(e)}"
        return results

    async def analyze_security_logs(self, container_id: str) -> Dict:
        """Collect process, network, account, cron and setuid information.

        The commands use shell operators, so they are run through ``sh -c``.
        """
        security_checks = {
            'suspicious_processes': 'ps aux',
            'network_connections': 'netstat -tuln',
            'user_accounts': 'cat /etc/passwd',
            'cron_jobs': 'crontab -l 2>/dev/null || echo "No cron jobs"',
            'setuid_files': 'find / -perm -4000 -type f 2>/dev/null | head -20',
        }
        return await self.run_commands(container_id, security_checks, shell=True)

    async def get_container_stats(self, container_id: str) -> Dict:
        """Return a one-shot snapshot of a container's resource counters.

        Returns:
            A dict with ``cpu_usage``, ``memory_usage``, ``memory_limit`` and
            ``network_io`` taken from the SDK's stats payload, or an
            ``{"error": ...}`` dict. Fields missing from the payload are
            reported as ``None`` (``{}`` for ``network_io``) rather than
            failing the whole call.
        """
        if not self.docker_client:
            return {"error": "未连接到Docker守护进程"}

        try:
            container = self.docker_client.containers.get(container_id)
            stats = container.stats(stream=False)
            cpu_usage = stats.get('cpu_stats', {}).get('cpu_usage', {})
            memory = stats.get('memory_stats', {})
            return {
                'cpu_usage': cpu_usage.get('total_usage'),
                'memory_usage': memory.get('usage'),
                'memory_limit': memory.get('limit'),
                'network_io': stats.get('networks', {}),
            }
        except Exception as e:
            return {"error": f"获取统计信息失败: {str(e)}"}


# Global container manager instance shared by all tools below.
container_manager = ContainerManager()


# NOTE: the ``ctx: Context = None`` parameters are left exactly as written;
# the annotation is what the MCP framework inspects, so it is not rewritten
# as Optional[Context].

@mcp.tool()
async def connect_docker(
        host: str,
        port: int = 2375,
        tls_verify: bool = False,
        cert_path: Optional[str] = None,
        ctx: Context = None
) -> str:
    """Connect to a remote Docker daemon."""
    _, message = await container_manager.connect_to_docker(host, port, tls_verify, cert_path)
    return message


@mcp.tool()
async def connect_ssh(
        host: str,
        username: str,
        port: int = 22,
        password: Optional[str] = None,
        key_path: Optional[str] = None,
        ctx: Context = None
) -> str:
    """Connect to a remote host over SSH."""
    _, message = await container_manager.connect_via_ssh(host, port, username, password, key_path)
    return message


@mcp.tool()
async def list_containers(
        all_containers: bool = False,
        ctx: Context = None
) -> List[Dict]:
    """List containers; set all_containers to include stopped ones."""
    return await container_manager.get_containers(all_containers)


@mcp.tool()
async def get_container_logs(
        container_id: str,
        tail: int = 100,
        ctx: Context = None
) -> str:
    """Get the logs of the given container."""
    return await container_manager.get_container_logs(container_id, tail)


@mcp.tool()
async def execute_command(
        container_id: str,
        command: str,
        ctx: Context = None
) -> str:
    """Execute a command inside a container (run without a shell)."""
    return await container_manager.execute_in_container(container_id, command)


@mcp.tool()
async def security_analysis(
        container_id: str,
        ctx: Context = None
) -> Dict:
    """Run a security analysis on a container."""
    return await container_manager.analyze_security_logs(container_id)


@mcp.tool()
async def get_container_stats(
        container_id: str,
        ctx: Context = None
) -> Dict:
    """Get resource usage statistics of a container."""
    return await container_manager.get_container_stats(container_id)


@mcp.tool()
async def check_container_security(
        container_id: str,
        ctx: Context = None
) -> Dict:
    """Run a comprehensive security check on a container."""
    # Security check items. Several rely on shell operators, so the whole
    # set is run through ``sh -c``.
    checks = {
        "运行进程": "ps aux",
        "网络连接": "netstat -tuln",
        "文件权限": "find / -perm -4000 -type f 2>/dev/null | head -20",
        "敏感文件": "ls -la /etc/passwd /etc/shadow /etc/sudoers 2>/dev/null",
        "环境变量": "env",
        "安装的软件包": "dpkg -l 2>/dev/null || rpm -qa 2>/dev/null || apk info 2>/dev/null",
    }
    return await container_manager.run_commands(container_id, checks, shell=True)


@mcp.tool()
async def get_system_info(
        container_id: str,
        ctx: Context = None
) -> Dict:
    """Get system information from inside a container."""
    info_commands = {
        "os_info": "cat /etc/os-release",
        "kernel_version": "uname -a",
        "disk_usage": "df -h",
        "memory_info": "free -h",
        "uptime": "uptime",
    }
    # None of these need a shell, so they keep running without one.
    return await container_manager.run_commands(
        container_id, info_commands, failure_label="获取信息失败"
    )


if __name__ == "__main__":
    # With the stdio transport, stdout carries the MCP protocol stream, so the
    # startup notice goes through logging (stderr) instead of print().
    logger.info("开始启动服务")
    # Run the MCP server.
    mcp.run(transport='stdio')

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rockmelodies/ContainerGuardMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.