get_container_logs
Retrieve container logs from remote servers via SSH to monitor application behavior and troubleshoot issues by specifying parameters like container name, log tail length, and time range.
Instructions
获取指定容器的日志
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| hostname | Yes | ||
| username | Yes | ||
| password | No | ||
| port | No | ||
| container | No | ||
| tail | No | ||
| since | No | ||
| timeout | No |
Implementation Reference
- Core handler implementation for get_container_logs: connects via SSH, executes 'docker logs' command on remote server, parses and returns logs with metadata.@handle_exceptions def get_container_logs( hostname: str, username: str, password: str = "", port: int = 22, container: str = "", # 容器ID或名称 tail: int = 100, # 获取最后多少行日志 since: str = "", # 从什么时间开始的日志,例如 "2023-01-01T00:00:00" timeout: int = 30 ) -> dict: """获取指定容器的日志""" result = InspectionResult() if not container: result.status = "error" result.error = "必须指定容器ID或名称" return result.dict() try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 检查Docker是否安装 stdin, stdout, stderr = ssh.exec_command("command -v docker") if not stdout.read().strip(): result.status = "error" result.error = "Docker未安装在目标服务器上" return result.dict() # 构建命令 cmd = f"docker logs --tail {tail}" if since: cmd += f" --since '{since}'" cmd += f" {container}" # 执行命令 stdin, stdout, stderr = ssh.exec_command(cmd) log_output = stdout.read().decode('utf-8') error_output = stderr.read().decode('utf-8') if error_output: result.status = "error" result.error = f"获取容器日志失败: {error_output}" return result.dict() # 设置结果 result.status = "success" result.data = {"logs": log_output.strip().split("\n")} result.raw_outputs = {"container_logs": log_output} log_lines = len(log_output.strip().split("\n")) if log_output.strip() else 0 result.summary = f"获取到容器 {container} 的 {log_lines} 行日志" except Exception as e: result.status = "error" result.error = f"获取容器日志失败: {str(e)}" return result.dict()
- Alternative handler implementation for get_container_logs in SSE variant: similar SSH-based Docker logs retrieval.@handle_exceptions def get_container_logs( hostname: str, username: str, password: str = "", port: int = 22, container: str = "", # 容器ID或名称 tail: int = 100, # 获取最后多少行日志 since: str = "", # 从什么时间开始,如 "2021-01-01T00:00:00" timeout: int = 30 ) -> dict: """获取Docker容器的日志""" result = InspectionResult() if not container: result.status = "error" result.error = "必须指定容器ID或名称" return result.dict() try: with SSHManager(hostname, username, password, port, timeout) as ssh: # 检查Docker是否安装 stdin, stdout, stderr = ssh.exec_command("command -v docker", timeout=timeout) if not stdout.read().strip(): result.status = "error" result.error = "Docker未安装在目标服务器上" return result.dict() # 构建命令 cmd = f"docker logs --tail {tail}" if since: cmd += f" --since '{since}'" cmd += f" {container}" # 执行命令 stdin, stdout, stderr = ssh.exec_command(cmd, timeout=timeout) log_output = stdout.read().decode('utf-8') error_output = stderr.read().decode('utf-8') if error_output: result.status = "error" result.error = f"获取容器日志失败: {error_output}" return result.dict() # 设置结果 result.status = "success" result.data = {"logs": log_output.strip().split("\n")} result.raw_outputs = {"container_logs": log_output} log_lines = len(log_output.strip().split("\n")) if log_output.strip() else 0 result.summary = f"获取到容器 {container} 的 {log_lines} 行日志" except Exception as e: result.status = "error" result.error = f"获取容器日志失败: {str(e)}" return result.dict()
- Tool schema definition including name, description, and parameter types/defaults for get_container_logs.{"name": "get_container_logs", "description": "获取Docker容器的日志", "parameters": [ {"name": "hostname", "type": "str", "default": None}, {"name": "username", "type": "str", "default": None}, {"name": "password", "type": "str", "default": ""}, {"name": "port", "type": "int", "default": 22}, {"name": "container", "type": "str", "default": ""}, {"name": "tail", "type": "int", "default": 100}, {"name": "since", "type": "str", "default": ""}, {"name": "timeout", "type": "int", "default": 30} ]},
- server_monitor/main.py:42-61 (registration)Registration of get_container_logs in the tools dictionary and dynamic MCP tool registration.tools_dict = { 'get_memory_info': get_memory_info, 'remote_server_inspection': remote_server_inspection, 'get_system_load': get_system_load, 'monitor_processes': monitor_processes, 'check_service_status': check_service_status, 'get_os_details': get_os_details, 'check_ssh_risk_logins': check_ssh_risk_logins, 'check_firewall_config': check_firewall_config, 'security_vulnerability_scan': security_vulnerability_scan, 'backup_critical_files': backup_critical_files, 'inspect_network': inspect_network, 'analyze_logs': analyze_logs, 'list_docker_containers': list_docker_containers, 'list_docker_images': list_docker_images, 'list_docker_volumes': list_docker_volumes, 'get_container_logs': get_container_logs, 'monitor_container_stats': monitor_container_stats, 'check_docker_health': check_docker_health }
- server_monitor_sse/server.py:262-278 (registration)Dispatcher registration and argument handling for get_container_logs in SSE server tool handler.elif name == "get_container_logs": required_args = ["hostname", "username", "container"] for arg in required_args: if arg not in arguments: raise ValueError(f"Missing required argument '{arg}'") result = get_container_logs( hostname=arguments["hostname"], username=arguments["username"], password=arguments.get("password", ""), port=arguments.get("port", 22), container=arguments["container"], tail=arguments.get("tail", 100), since=arguments.get("since", ""), timeout=arguments.get("timeout", 30) )