#!/usr/bin/env python3
"""
Complete Terminal MCP - 완전한 터미널 명령 실행 시스템
안전하고 효율적인 터미널 명령 실행 기능을 제공하는 모듈화된 MCP 패키지
"""
import os
import subprocess
import shlex
import time
import logging
import json
import platform
import psutil
import threading
from typing import Dict, List, Any, Optional, Union
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
from .models import (
CommandCategory, CommandInfo, ExecutionResult, ProcessInfo,
SystemInfo, CommandHistory, EnvironmentVariable
)
logger = logging.getLogger(__name__)
class CompleteTerminalMCP:
"""완전한 터미널 MCP 구현"""
def __init__(self, working_directory: str = None, timeout_default: int = 120):
"""
CompleteTerminalMCP 초기화
Args:
working_directory: 기본 작업 디렉토리
timeout_default: 기본 타임아웃 (초)
"""
self.logger = logging.getLogger("CompleteTerminalMCP")
# 기본 설정
self.working_directory = working_directory or os.getcwd()
self.timeout_default = timeout_default
self.command_history: List[CommandHistory] = []
self.active_processes: Dict[int, subprocess.Popen] = {}
# 환경변수 캐시
self.env_cache: Dict[str, str] = dict(os.environ)
# 안전한 명령어 목록
self.safe_commands = {
'ls', 'pwd', 'echo', 'cat', 'head', 'tail', 'grep', 'find',
'ps', 'top', 'df', 'du', 'free', 'uname', 'whoami', 'date',
'which', 'whereis', 'file', 'wc', 'sort', 'uniq', 'cut'
}
# 위험한 명령어 목록
self.dangerous_commands = {
'rm', 'del', 'rmdir', 'format', 'fdisk', 'mkfs', 'dd',
'chmod', 'chown', 'su', 'sudo', 'passwd', 'useradd', 'userdel',
'shutdown', 'reboot', 'halt', 'init', 'kill', 'killall'
}
# GPU 환경변수 설정 (CLAUDE.md 지침)
self._setup_gpu_environment()
self.logger.info("CompleteTerminalMCP 초기화 완료")
def _setup_gpu_environment(self):
"""GPU 환경변수 설정 (CLAUDE.md 지침 반영)"""
try:
self.env_cache.update({
'TF_CPP_MIN_LOG_LEVEL': '2',
'TF_FORCE_GPU_ALLOW_GROWTH': 'true',
'CUDA_VISIBLE_DEVICES': '0'
})
self.logger.debug("GPU 환경변수 설정 완료")
except Exception as e:
self.logger.warning(f"GPU 환경변수 설정 실패: {e}")
def _categorize_command(self, command: str) -> CommandCategory:
"""명령어 카테고리 분류"""
cmd_parts = shlex.split(command.strip())
if not cmd_parts:
return CommandCategory.SAFE
base_cmd = cmd_parts[0].lower()
if base_cmd in self.dangerous_commands:
return CommandCategory.DANGEROUS
elif base_cmd in self.safe_commands:
return CommandCategory.SAFE
elif base_cmd in ['python', 'node', 'npm', 'pip', 'git']:
return CommandCategory.DEVELOPMENT
elif base_cmd in ['curl', 'wget', 'ping', 'nslookup']:
return CommandCategory.NETWORK
else:
return CommandCategory.SYSTEM
def _validate_command(self, command: str) -> tuple[bool, str]:
"""명령어 안전성 검증"""
try:
# 기본 검증
if not command or not command.strip():
return False, "빈 명령어입니다"
# 위험한 패턴 검사
dangerous_patterns = ['rm -rf /', 'format c:', '> /dev/null', 'dd if=']
for pattern in dangerous_patterns:
if pattern in command.lower():
return False, f"위험한 패턴 감지: {pattern}"
# 카테고리 확인
category = self._categorize_command(command)
if category == CommandCategory.DANGEROUS:
return False, "위험한 명령어입니다"
return True, "안전한 명령어입니다"
except Exception as e:
return False, f"명령어 검증 실패: {str(e)}"
# =============================================================================
# MCP 표준 인터페이스 메서드들
# =============================================================================
def execute_command(
self,
command: str,
timeout: int = None,
cwd: str = None,
env: Dict[str, str] = None
) -> Dict[str, Any]:
"""
터미널 명령 실행 (MCP 표준 인터페이스)
Args:
command: 실행할 명령어
timeout: 타임아웃 (초, 기본값: self.timeout_default)
cwd: 작업 디렉토리 (기본값: self.working_directory)
env: 환경변수 (기본값: self.env_cache)
Returns:
실행 결과
"""
start_time = time.time()
try:
self.logger.info(f"명령 실행 요청: {command}")
# 명령어 검증
is_safe, validation_msg = self._validate_command(command)
if not is_safe:
return {
"success": False,
"error": f"명령어 검증 실패: {validation_msg}",
"command": command
}
# 기본값 설정
timeout = timeout or self.timeout_default
cwd = cwd or self.working_directory
env_vars = {**self.env_cache, **(env or {})}
# 타임아웃이 있는 명령 실행 (CLAUDE.md 지침)
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(self._execute_subprocess, command, cwd, env_vars)
try:
result = future.result(timeout=timeout)
except FutureTimeoutError:
return {
"success": False,
"error": f"명령 실행 타임아웃 ({timeout}초)",
"command": command,
"execution_time": time.time() - start_time
}
# 실행 시간 계산
execution_time = time.time() - start_time
result["execution_time"] = execution_time
# 히스토리에 추가
self._add_to_history(command, cwd, result.get("exit_code", -1), execution_time, len(result.get("output", "")))
self.logger.info(f"명령 실행 완료: {command} (시간: {execution_time:.2f}초)")
return result
except Exception as e:
error_msg = f"명령 실행 실패: {str(e)}"
self.logger.error(error_msg)
return {
"success": False,
"error": error_msg,
"command": command,
"execution_time": time.time() - start_time
}
def _execute_subprocess(self, command: str, cwd: str, env: Dict[str, str]) -> Dict[str, Any]:
"""서브프로세스 실행"""
try:
# 프로세스 시작
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=cwd,
env=env
)
# 활성 프로세스 목록에 추가
self.active_processes[process.pid] = process
try:
# 프로세스 완료 대기
stdout, stderr = process.communicate()
exit_code = process.returncode
return {
"success": exit_code == 0,
"output": stdout,
"error": stderr if stderr else None,
"exit_code": exit_code,
"command": command,
"pid": process.pid
}
finally:
# 활성 프로세스 목록에서 제거
if process.pid in self.active_processes:
del self.active_processes[process.pid]
except Exception as e:
return {
"success": False,
"error": f"프로세스 실행 실패: {str(e)}",
"command": command
}
def get_command_history(self, limit: int = 50) -> Dict[str, Any]:
"""명령 히스토리 조회"""
try:
history = self.command_history[-limit:] if limit > 0 else self.command_history
return {
"success": True,
"total_commands": len(self.command_history),
"returned_commands": len(history),
"history": [
{
"id": h.id,
"command": h.command,
"working_directory": h.working_directory,
"exit_code": h.exit_code,
"execution_time": h.execution_time,
"timestamp": h.timestamp
}
for h in history
]
}
except Exception as e:
return {"success": False, "error": f"히스토리 조회 실패: {str(e)}"}
def kill_process(self, pid: int, force: bool = False) -> Dict[str, Any]:
"""프로세스 종료"""
try:
# 프로세스 존재 확인
if not psutil.pid_exists(pid):
return {"success": False, "error": f"프로세스 {pid}가 존재하지 않습니다"}
process = psutil.Process(pid)
process_name = process.name()
# 프로세스 종료
if force:
process.kill() # SIGKILL
action = "강제 종료"
else:
process.terminate() # SIGTERM
action = "정상 종료"
# 종료 확인 (최대 5초 대기)
try:
process.wait(timeout=5)
self.logger.info(f"프로세스 {action}: {pid} ({process_name})")
return {
"success": True,
"message": f"프로세스 {pid} ({process_name})이 {action}되었습니다",
"pid": pid,
"process_name": process_name,
"action": action
}
except psutil.TimeoutExpired:
return {
"success": False,
"error": f"프로세스 {pid} 종료 시간 초과",
"pid": pid
}
except psutil.NoSuchProcess:
return {"success": False, "error": f"프로세스 {pid}를 찾을 수 없습니다"}
except psutil.AccessDenied:
return {"success": False, "error": f"프로세스 {pid}에 대한 접근 권한이 없습니다"}
except Exception as e:
return {"success": False, "error": f"프로세스 종료 실패: {str(e)}"}
def get_system_info(self) -> Dict[str, Any]:
"""시스템 정보 조회"""
try:
# 기본 시스템 정보
system_info = {
"platform": platform.system(),
"platform_release": platform.release(),
"platform_version": platform.version(),
"architecture": platform.architecture()[0],
"hostname": platform.node(),
"cpu_count": psutil.cpu_count(),
"cpu_freq": dict(psutil.cpu_freq()._asdict()) if psutil.cpu_freq() else {},
}
# 메모리 정보
memory = psutil.virtual_memory()
system_info.update({
"memory_total": memory.total,
"memory_available": memory.available,
"memory_percent": memory.percent
})
# 디스크 정보
disk = psutil.disk_usage('/')
system_info["disk_usage"] = {
"total": disk.total,
"used": disk.used,
"free": disk.free,
"percent": (disk.used / disk.total) * 100
}
# 네트워크 인터페이스
system_info["network_interfaces"] = {}
for interface, addrs in psutil.net_if_addrs().items():
system_info["network_interfaces"][interface] = [
{"family": addr.family.name, "address": addr.address, "netmask": addr.netmask}
for addr in addrs
]
# 부팅 시간과 로드 평균
system_info["boot_time"] = psutil.boot_time()
if hasattr(os, 'getloadavg'):
system_info["load_average"] = list(os.getloadavg())
else:
system_info["load_average"] = [0.0, 0.0, 0.0] # Windows 등
return {
"success": True,
"system_info": system_info
}
except Exception as e:
return {"success": False, "error": f"시스템 정보 조회 실패: {str(e)}"}
def change_directory(self, path: str) -> Dict[str, Any]:
"""작업 디렉토리 변경"""
try:
# 경로 정규화
new_path = os.path.abspath(os.path.expanduser(path))
# 디렉토리 존재 확인
if not os.path.exists(new_path):
return {"success": False, "error": f"디렉토리가 존재하지 않습니다: {new_path}"}
if not os.path.isdir(new_path):
return {"success": False, "error": f"파일입니다, 디렉토리가 아닙니다: {new_path}"}
# 디렉토리 변경
old_path = self.working_directory
self.working_directory = new_path
os.chdir(new_path)
self.logger.info(f"작업 디렉토리 변경: {old_path} -> {new_path}")
return {
"success": True,
"message": "작업 디렉토리가 변경되었습니다",
"old_directory": old_path,
"new_directory": new_path
}
except Exception as e:
return {"success": False, "error": f"디렉토리 변경 실패: {str(e)}"}
def get_environment_variables(self, pattern: str = None) -> Dict[str, Any]:
"""환경변수 조회"""
try:
env_vars = {}
for name, value in self.env_cache.items():
if pattern is None or pattern.lower() in name.lower():
env_vars[name] = value
return {
"success": True,
"total_variables": len(self.env_cache),
"matched_variables": len(env_vars),
"variables": env_vars
}
except Exception as e:
return {"success": False, "error": f"환경변수 조회 실패: {str(e)}"}
def set_environment_variable(self, name: str, value: str, export: bool = True) -> Dict[str, Any]:
"""환경변수 설정"""
try:
# 환경변수 설정
self.env_cache[name] = value
if export:
os.environ[name] = value
self.logger.info(f"환경변수 설정: {name}={value}")
return {
"success": True,
"message": f"환경변수 {name}이 설정되었습니다",
"name": name,
"value": value,
"exported": export
}
except Exception as e:
return {"success": False, "error": f"환경변수 설정 실패: {str(e)}"}
def _add_to_history(self, command: str, cwd: str, exit_code: int, execution_time: float, output_length: int):
"""히스토리에 명령 추가"""
try:
history_entry = CommandHistory(
id=len(self.command_history) + 1,
command=command,
working_directory=cwd,
exit_code=exit_code,
execution_time=execution_time,
output_length=output_length,
timestamp=time.strftime("%Y-%m-%d %H:%M:%S")
)
self.command_history.append(history_entry)
# 히스토리 크기 제한 (최대 1000개)
if len(self.command_history) > 1000:
self.command_history = self.command_history[-1000:]
except Exception as e:
self.logger.error(f"히스토리 추가 실패: {e}")
def cleanup(self):
"""리소스 정리"""
try:
self.logger.info("CompleteTerminalMCP 정리 시작")
# 활성 프로세스 종료
for pid, process in list(self.active_processes.items()):
try:
if process.poll() is None: # 아직 실행 중
process.terminate()
process.wait(timeout=5)
except Exception as e:
self.logger.warning(f"프로세스 {pid} 정리 실패: {e}")
self.active_processes.clear()
self.logger.info("CompleteTerminalMCP 정리 완료")
except Exception as e:
self.logger.error(f"정리 중 오류: {e}")
# MCP 표준 인터페이스를 위한 래퍼 함수들
def execute_command(command: str, timeout: int = None, cwd: str = None, env: Dict[str, str] = None) -> Dict[str, Any]:
"""전역 execute_command 함수 (MCP 호환성)"""
terminal = CompleteTerminalMCP()
return terminal.execute_command(command, timeout, cwd, env)
def get_command_history(limit: int = 50) -> Dict[str, Any]:
"""전역 get_command_history 함수 (MCP 호환성)"""
terminal = CompleteTerminalMCP()
return terminal.get_command_history(limit)
def kill_process(pid: int, force: bool = False) -> Dict[str, Any]:
"""전역 kill_process 함수 (MCP 호환성)"""
terminal = CompleteTerminalMCP()
return terminal.kill_process(pid, force)
def get_system_info() -> Dict[str, Any]:
"""전역 get_system_info 함수 (MCP 호환성)"""
terminal = CompleteTerminalMCP()
return terminal.get_system_info()
def change_directory(path: str) -> Dict[str, Any]:
"""전역 change_directory 함수 (MCP 호환성)"""
terminal = CompleteTerminalMCP()
return terminal.change_directory(path)