# container_guard_mcp.py (10.7 kB)
#!/usr/bin/env python
# encoding: utf-8
# @author: rockmelodies
# @license: (C) Copyright 2013-2024, 360 Corporation Limited.
# @contact: rockysocket@gmail.com
# @software: garner
# @file: container_guard_mcp.py
# @time: 2025/9/21 08:20
# @desc:
# !/usr/bin/env python3
"""
ContainerGuard MCP - 容器安全监控与管理MCP服务
提供远程容器管理、安全日志分析和事件监控功能
"""
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp import Context
import docker
import paramiko
import json
import asyncio
from typing import Dict, List, Any, Optional, Tuple
import subprocess
import logging
from datetime import datetime
# Configure logging: INFO level on the root logger, plus a named logger for this module
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ContainerGuard")
# Initialize the MCP server (the version parameter was removed)
mcp = FastMCP("ContainerGuard")
class ContainerManager:
    """Manage one Docker daemon connection and per-host SSH connections.

    The manager keeps at most one Docker client and one SSH client per host
    name, and offers coroutine helpers to list containers, read logs, run
    commands inside a container and collect basic security/resource data.

    NOTE: the Docker SDK and paramiko calls made here are ordinary
    synchronous calls, so each coroutine occupies the event loop for as long
    as the underlying call takes.
    """

    def __init__(self):
        # Docker client of the most recent successful connect_to_docker() call
        self.docker_client = None
        # SSH clients keyed by host name (the port is not part of the key)
        self.ssh_clients: Dict[str, paramiko.SSHClient] = {}
        self.connected_containers: Dict[str, Any] = {}

    @staticmethod
    def _close_quietly(client: Any) -> None:
        """Close *client* if one is given; a failing close() is only logged."""
        if client is None:
            return
        try:
            client.close()
        except Exception as e:
            # Best-effort cleanup: the connection is being discarded anyway.
            logger.debug(f"关闭连接失败: {str(e)}")

    @staticmethod
    def _via_shell(command: str) -> str:
        """Wrap *command* so that it is interpreted by ``sh -c``.

        ``exec_run`` splits a string command into an argument vector and
        executes it directly, so pipes, ``||`` and redirections only work
        when a shell is asked to interpret them.
        """
        import shlex

        return f"sh -c {shlex.quote(command)}"

    async def connect_to_docker(self, host: str, port: int = 2375,
                                tls_verify: bool = False,
                                cert_path: Optional[str] = None) -> Tuple[bool, str]:
        """Connect to a Docker daemon over TCP and verify it with a ping.

        Args:
            host: Host name or address of the Docker daemon.
            port: TCP port of the daemon.
            tls_verify: When true, connect with TLS using the files
                ``ca.pem``, ``cert.pem`` and ``key.pem`` in ``cert_path``.
            cert_path: Directory holding the TLS files; required when
                ``tls_verify`` is true.

        Returns:
            ``(success, message)``. The new client replaces the stored one
            only after the ping succeeded; a failed attempt closes the new
            client and leaves any previously stored client untouched.
        """
        base_url = f"tcp://{host}:{port}"
        client = None
        try:
            if tls_verify:
                if not cert_path:
                    return False, "TLS验证需要证书路径"
                try:
                    tls_config = docker.tls.TLSConfig(
                        ca_cert=f"{cert_path}/ca.pem",
                        client_cert=(f"{cert_path}/cert.pem", f"{cert_path}/key.pem"),
                        verify=True
                    )
                    client = docker.DockerClient(base_url=base_url, tls=tls_config)
                except Exception as e:
                    return False, f"TLS配置失败: {str(e)}"
            else:
                client = docker.DockerClient(base_url=base_url)
            # Test the connection before publishing the client
            client.ping()
        except Exception as e:
            # Do not keep (or leak) a client that never answered the ping.
            self._close_quietly(client)
            error_msg = f"连接Docker失败: {str(e)}"
            logger.error(error_msg)
            return False, error_msg

        previous, self.docker_client = self.docker_client, client
        self._close_quietly(previous)
        logger.info(f"成功连接到Docker守护进程: {host}:{port}")
        return True, "连接成功"

    async def connect_via_ssh(self, host: str, port: int, username: str,
                              password: Optional[str] = None,
                              key_path: Optional[str] = None) -> Tuple[bool, str]:
        """Open an SSH connection to a remote host and remember it by host.

        Args:
            host: Remote host name or address.
            port: SSH port.
            username: Login name.
            password: Password, used only when ``key_path`` is not given.
            key_path: Path to a private key file; takes precedence over
                ``password``. The key type is detected by paramiko, so
                non-RSA keys are accepted as well.

        Returns:
            ``(success, message)``. On success the client is stored in
            ``ssh_clients[host]``, replacing and closing an earlier client
            for the same host.
        """
        if not key_path and not password:
            return False, "需要密码或密钥文件进行SSH认证"

        ssh = None
        try:
            ssh = paramiko.SSHClient()
            # NOTE(security): AutoAddPolicy accepts unknown host keys without
            # verification; kept for compatibility with existing behaviour.
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            if key_path:
                try:
                    ssh.connect(hostname=host, port=port, username=username,
                                key_filename=key_path)
                except Exception as e:
                    self._close_quietly(ssh)
                    return False, f"SSH密钥认证失败: {str(e)}"
            else:
                ssh.connect(hostname=host, port=port, username=username, password=password)
        except Exception as e:
            self._close_quietly(ssh)
            error_msg = f"SSH连接失败: {str(e)}"
            logger.error(error_msg)
            return False, error_msg

        # Release an older connection to the same host before replacing it.
        self._close_quietly(self.ssh_clients.get(host))
        self.ssh_clients[host] = ssh
        logger.info(f"成功通过SSH连接到主机: {host}:{port}")
        return True, "SSH连接成功"

    async def get_containers(self, all_containers: bool = False) -> List[Dict]:
        """Return a summary dict for each container known to the daemon.

        Args:
            all_containers: Passed to the SDK as ``all=``.

        Returns:
            A list of dicts with the keys ``id`` (first 12 characters),
            ``name``, ``status``, ``image`` (first tag or ``'unknown'``) and
            ``created``; or a single-element list holding an ``error`` dict
            when not connected or when the listing fails.
        """
        if not self.docker_client:
            return [{"error": "未连接到Docker守护进程"}]
        try:
            containers = self.docker_client.containers.list(all=all_containers)
            return [
                {
                    'id': container.id[:12],
                    'name': container.name,
                    'status': container.status,
                    'image': container.image.tags[0] if container.image.tags else 'unknown',
                    'created': container.attrs['Created'],
                }
                for container in containers
            ]
        except Exception as e:
            logger.error(f"获取容器列表失败: {str(e)}")
            return [{"error": f"获取容器列表失败: {str(e)}"}]

    async def get_container_logs(self, container_id: str, tail: int = 100) -> str:
        """Return the last ``tail`` log lines of a container as text.

        Undecodable bytes are dropped. Problems are reported in the returned
        string rather than raised.
        """
        if not self.docker_client:
            return "未连接到Docker守护进程"
        try:
            container = self.docker_client.containers.get(container_id)
            return container.logs(tail=tail).decode('utf-8', errors='ignore')
        except Exception as e:
            return f"获取日志失败: {str(e)}"

    async def execute_in_container(self, container_id: str, command: str) -> str:
        """Run ``command`` inside a container and return its output.

        The command is handed to ``exec_run`` as given, i.e. it is not
        interpreted by a shell. A non-zero exit code is reported as a
        failure message that includes the exit code and the output.
        """
        if not self.docker_client:
            return "未连接到Docker守护进程"
        try:
            container = self.docker_client.containers.get(container_id)
            result = container.exec_run(command)
            output = result.output.decode('utf-8', errors='ignore')
            if result.exit_code == 0:
                return output
            return f"命令执行失败 (退出码: {result.exit_code}): {output}"
        except Exception as e:
            return f"执行命令失败: {str(e)}"

    async def analyze_security_logs(self, container_id: str) -> Dict:
        """Run a fixed set of inspection commands inside a container.

        Returns:
            A dict mapping each check name to the text produced by
            ``execute_in_container`` for that check.
        """
        security_checks = {
            'suspicious_processes': 'ps aux',
            'network_connections': 'netstat -tuln',
            'user_accounts': 'cat /etc/passwd',
            # These two rely on redirection, `||` and pipes, so they must be
            # interpreted by a shell inside the container.
            'cron_jobs': self._via_shell('crontab -l 2>/dev/null || echo "No cron jobs"'),
            'setuid_files': self._via_shell('find / -perm -4000 -type f 2>/dev/null | head -20'),
        }
        results = {}
        for check_name, command in security_checks.items():
            try:
                results[check_name] = await self.execute_in_container(container_id, command)
            except Exception as e:
                results[check_name] = f"检查失败: {str(e)}"
        return results

    async def get_container_stats(self, container_id: str) -> Dict:
        """Return a one-shot resource usage snapshot for a container.

        Returns:
            A dict with ``cpu_usage``, ``memory_usage``, ``memory_limit`` and
            ``network_io``. Memory values are ``None`` and ``network_io`` is
            an empty dict when the daemon does not report them. On failure a
            dict with a single ``error`` key is returned.
        """
        if not self.docker_client:
            return {"error": "未连接到Docker守护进程"}
        try:
            container = self.docker_client.containers.get(container_id)
            stats = container.stats(stream=False)
            memory = stats.get('memory_stats') or {}
            return {
                'cpu_usage': stats['cpu_stats']['cpu_usage']['total_usage'],
                'memory_usage': memory.get('usage'),
                'memory_limit': memory.get('limit'),
                # The 'networks' section may be absent from the stats payload.
                'network_io': stats.get('networks', {}),
            }
        except Exception as e:
            return {"error": f"获取统计信息失败: {str(e)}"}
# Global container manager instance used by the MCP tool functions in this module
container_manager = ContainerManager()
@mcp.tool()
async def connect_docker(
        host: str,
        port: int = 2375,
        tls_verify: bool = False,
        cert_path: Optional[str] = None,
        ctx: Context = None
) -> str:
    """Connect to a remote Docker daemon.

    Delegates to the global container manager and returns only the
    human-readable status message of the attempt.
    """
    outcome = await container_manager.connect_to_docker(host, port, tls_verify, cert_path)
    _ok, status_text = outcome
    return status_text
@mcp.tool()
async def connect_ssh(
        host: str,
        username: str,
        port: int = 22,
        password: Optional[str] = None,
        key_path: Optional[str] = None,
        ctx: Context = None
) -> str:
    """Connect to a remote host over SSH.

    Delegates to the global container manager and returns only the
    human-readable status message of the attempt.
    """
    outcome = await container_manager.connect_via_ssh(host, port, username, password, key_path)
    _ok, status_text = outcome
    return status_text
@mcp.tool()
async def list_containers(all_containers: bool = False, ctx: Context = None) -> List[Dict]:
    """List containers; set ``all_containers`` to also include stopped ones."""
    listing = await container_manager.get_containers(all_containers)
    return listing
@mcp.tool()
async def get_container_logs(container_id: str, tail: int = 100, ctx: Context = None) -> str:
    """Fetch the logs of the given container (last ``tail`` lines)."""
    log_text = await container_manager.get_container_logs(container_id, tail)
    return log_text
@mcp.tool()
async def execute_command(container_id: str, command: str, ctx: Context = None) -> str:
    """Execute a command inside the given container and return its output."""
    command_output = await container_manager.execute_in_container(container_id, command)
    return command_output
@mcp.tool()
async def security_analysis(container_id: str, ctx: Context = None) -> Dict:
    """Run the container manager's security analysis on the given container."""
    findings = await container_manager.analyze_security_logs(container_id)
    return findings
@mcp.tool()
async def get_container_stats(container_id: str, ctx: Context = None) -> Dict:
    """Fetch resource usage statistics for the given container."""
    usage = await container_manager.get_container_stats(container_id)
    return usage
@mcp.tool()
async def check_container_security(
        container_id: str,
        ctx: Context = None
) -> Dict:
    """Run a comprehensive set of security checks inside a container.

    Each check runs one command in the container through the global
    container manager. Checks whose command uses shell syntax (redirection,
    pipes, ``||``) are wrapped in ``sh -c``, because the command string is
    otherwise split into arguments and executed without a shell.

    Returns:
        A dict mapping each check name to the command output, or to a
        failure message for that check.
    """
    import shlex

    # Security check items: (name, command, needs a shell to interpret it)
    checks = [
        ("运行进程", "ps aux", False),
        ("网络连接", "netstat -tuln", False),
        ("文件权限", "find / -perm -4000 -type f 2>/dev/null | head -20", True),
        ("敏感文件", "ls -la /etc/passwd /etc/shadow /etc/sudoers 2>/dev/null", True),
        ("环境变量", "env", False),
        ("安装的软件包", "dpkg -l 2>/dev/null || rpm -qa 2>/dev/null || apk info 2>/dev/null", True),
    ]
    results = {}
    for check_name, command, needs_shell in checks:
        if needs_shell:
            command = f"sh -c {shlex.quote(command)}"
        try:
            results[check_name] = await container_manager.execute_in_container(container_id, command)
        except Exception as e:
            results[check_name] = f"检查失败: {str(e)}"
    return results
@mcp.tool()
async def get_system_info(container_id: str, ctx: Context = None) -> Dict:
    """Collect basic system information from inside a container.

    Runs a fixed list of commands through the global container manager and
    returns a dict keyed by ``os_info``, ``kernel_version``, ``disk_usage``,
    ``memory_info`` and ``uptime``.
    """
    probes = (
        ("os_info", "cat /etc/os-release"),
        ("kernel_version", "uname -a"),
        ("disk_usage", "df -h"),
        ("memory_info", "free -h"),
        ("uptime", "uptime"),
    )
    report = {}
    for label, probe_command in probes:
        try:
            answer = await container_manager.execute_in_container(container_id, probe_command)
        except Exception as exc:
            answer = f"获取信息失败: {str(exc)}"
        report[label] = answer
    return report
if __name__ == "__main__":
    import sys

    # With the stdio transport, stdout carries the protocol messages, so the
    # startup banner is written to stderr to keep that stream clean.
    print("开始启动服务", file=sys.stderr)
    # Run the MCP server
    mcp.run(transport='stdio')