main.py•21.6 kB
#!/usr/bin/env python3
"""
Claude Code Multi-Process MCP Server
基于FastMCP的多进程Claude Code执行服务器
"""
import json
import os
import signal
import subprocess
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, List
from fastmcp import FastMCP
# Create the MCP server instance that the tool functions below register on.
server = FastMCP(name="cc-multi-process", instructions="提供Claude Code的多进程并行执行能力")
# Root directory for task data; one sub-directory per task ID is created under it.
TASK_STORAGE_DIR = Path("/tmp/cc_process_tasks")
TASK_STORAGE_DIR.mkdir(exist_ok=True)
# Log file that the logging configuration writes to.
LOG_FILE = Path("/tmp/cc_process_mcp.log")
def setup_logging():
    """Configure logging and return this module's logger.

    Records at DEBUG level and above are sent both to ``LOG_FILE`` and to a
    ``logging.StreamHandler`` created with its default stream.

    Returns:
        The ``logging.Logger`` named after this module.
    """
    import logging

    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(levelname)s - [%(funcName)s] %(message)s',
        handlers=[
            # Explicit UTF-8: log records include task prompts, which may hold
            # non-ASCII text that the locale's default encoding cannot encode.
            logging.FileHandler(LOG_FILE, encoding='utf-8'),
            logging.StreamHandler(),
        ],
    )
    return logging.getLogger(__name__)
# Module-level logger used by the functions and classes in this file.
logger = setup_logging()
def is_process_alive(pid: Optional[int]) -> bool:
    """Return whether ``pid`` refers to a live, non-zombie process.

    The process is first probed with signal 0 via ``os.kill``; if it exists,
    ``ps`` is asked for its state so that zombie processes count as dead.

    Args:
        pid: Process ID to probe. ``None``, non-integer and non-positive
            values are reported as not alive without probing anything.

    Returns:
        True when the process exists and is not a zombie, when it exists but
        may not be signalled (``PermissionError``), or when the ``ps`` state
        lookup itself raises; False otherwise.
    """
    # Zero and negative values make os.kill() address a process group (or
    # every process) instead of a single process, so never pass them on.
    if not isinstance(pid, int) or pid <= 0:
        return False

    try:
        # Signal 0 performs the existence/permission check without delivering anything.
        os.kill(pid, 0)
    except ProcessLookupError:
        # No such process.
        return False
    except PermissionError:
        # The process exists but belongs to someone else; assume it is alive.
        return True
    except Exception as e:
        logger.warning(f"Unexpected error checking PID {pid}: {e}")
        return False

    # The PID exists, but a zombie also passes the signal-0 probe, so look at
    # the process state reported by ps.
    try:
        result = subprocess.run(
            ['ps', '-p', str(pid), '-o', 'stat='],
            stdin=subprocess.DEVNULL,
            capture_output=True,
            text=True,
            timeout=1
        )
    except Exception as e:
        logger.warning(f"Failed to check process state for PID {pid}: {e}")
        # State unknown: conservatively treat the process as alive.
        return True

    if result.returncode != 0:
        # ps could not report the process; treat it as gone.
        return False

    stat = result.stdout.strip()
    if stat.startswith('Z'):
        logger.debug(f"PID {pid} is a zombie process")
        return False
    return True
def safe_read_file(file_path: Path) -> str:
    """Read a text file without raising, tolerating bad encodings.

    The file is read as strict UTF-8 first; if that fails to decode, it is
    re-read with undecodable bytes replaced.

    Args:
        file_path: Path of the file to read.

    Returns:
        The file content; ``""`` when the file does not exist; or a bracketed
        message (``"[Permission denied: ...]"``, ``"[Unexpected error: ...]"``,
        ``"[Error reading file: ...]"``) describing why reading failed.
    """
    try:
        # EAFP: attempt the read directly so a file removed between a check
        # and the read is still reported as "missing" rather than as an error.
        content = file_path.read_text(encoding='utf-8')
    except (FileNotFoundError, NotADirectoryError):
        logger.debug(f"File {file_path} does not exist")
        return ""
    except UnicodeDecodeError as e:
        # Fall back to replacement characters for undecodable bytes.
        logger.warning(f"UTF-8 decode error for {file_path}: {e}, using replace mode")
        try:
            content = file_path.read_text(encoding='utf-8', errors='replace')
        except Exception as e2:
            logger.error(f"Failed to read {file_path} even with replace mode: {e2}")
            return f"[Error reading file: {e2}]"
        logger.debug(f"Read {file_path} with replace mode ({len(content)} chars)")
        return content
    except PermissionError as e:
        logger.error(f"Permission denied reading {file_path}: {e}")
        return f"[Permission denied: {e}]"
    except Exception as e:
        logger.error(f"Unexpected error reading {file_path}: {e}")
        return f"[Unexpected error: {e}]"

    logger.debug(f"Successfully read {file_path} with UTF-8 encoding ({len(content)} chars)")
    return content
def extract_result_from_claude_output(stdout: str, stderr: str, output_format: str = "text") -> str:
    """Extract the result text from captured Claude Code output.

    Args:
        stdout: Captured standard output.
        stderr: Captured standard error; used only when stdout is empty.
        output_format: ``"text"`` (default), ``"json"`` or ``"stream-json"``.

    Returns:
        * ``stream-json``: the ``result`` field of the first JSON object line
          whose ``type`` is ``"result"`` (``""`` if that field is absent or
          null; non-string values are JSON-encoded).
        * ``json``: the parsed object pretty-printed when stdout is a JSON
          object, otherwise stripped stdout.
        * In every other case: stripped stdout, else stripped stderr, else
          the literal ``"No output from Claude Code"``.
    """
    logger.debug(f"Extracting result: stdout_len={len(stdout)}, stderr_len={len(stderr)}, format={output_format}")
    text = stdout.strip()

    if output_format == "stream-json":
        lines = [line for line in text.split('\n') if line.strip()]
        logger.debug(f"stream-json: found {len(lines)} JSONL lines")
        for line in lines:
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            # Lines that are valid JSON but not objects (numbers, lists, ...)
            # must be skipped, not allowed to abort the scan.
            if not isinstance(obj, dict) or obj.get('type') != 'result':
                continue
            result = obj.get('result')
            if result is None:
                result = ''
            elif not isinstance(result, str):
                result = json.dumps(result, ensure_ascii=False)
            logger.info(f"Extracted stream-json result: {len(result)} chars")
            return result
        # No result object: fall through to the plain-text handling below so
        # empty stdout still falls back to stderr / the "no output" message.
        logger.warning("No 'result' object found in stream-json, returning raw output")
    elif output_format == "json":
        try:
            data = json.loads(text)
        except json.JSONDecodeError as e:
            logger.warning(f"JSON decode failed: {e}, falling back to text mode")
        else:
            if isinstance(data, dict):
                result = json.dumps(data, indent=2, ensure_ascii=False)
                logger.debug(f"Extracted JSON result: {len(result)} chars")
                return result
            return text

    result = text
    # Whitespace-only stdout strips down to nothing; log it for diagnosis.
    if len(stdout) > 0 and len(result) == 0:
        logger.warning(f"stdout.strip() returned empty! Original length={len(stdout)}, repr={repr(stdout[:100])}")
    if not result and stderr:
        result = stderr.strip()
        logger.debug(f"stdout is empty, using stderr: {len(result)} chars")
    if not result:
        logger.warning("No output from Claude Code (both stdout and stderr empty)")
        return "No output from Claude Code"
    logger.debug(f"Extracted text result: {len(result)} chars")
    return result
class TaskManager:
    """Track background task processes and persist per-task result files."""

    def __init__(self):
        # Maps task_id -> Popen handle of a task started by this server process.
        self.active_processes: Dict[str, subprocess.Popen] = {}
        self.setup_signal_handlers()

    def setup_signal_handlers(self):
        """Install a SIGCHLD handler that reaps finished tracked task processes.

        Only the ``Popen`` objects in ``active_processes`` are polled. Calling
        ``os.waitpid(-1, ...)`` here would also reap children that other code
        is waiting on (e.g. ``subprocess.run``), which then can no longer
        obtain the real exit status and report a return code of 0.

        Registration is skipped with a warning when SIGCHLD is unavailable or
        when this is not called from the main thread.
        """
        def sigchld_handler(signum, frame):
            """Collect the exit status of tracked children so they do not stay zombies."""
            # Iterate over a snapshot: the dict may be modified elsewhere.
            for task_id, proc in list(self.active_processes.items()):
                if proc.returncode is not None:
                    # Exit status already collected earlier.
                    continue
                try:
                    exit_code = proc.poll()
                except Exception as e:
                    logger.warning(f"Error in SIGCHLD handler: {e}")
                    continue
                if exit_code is not None:
                    logger.debug(f"Reaped task {task_id} (PID {proc.pid}), exit_code={exit_code}")

        sigchld = getattr(signal, 'SIGCHLD', None)
        if sigchld is None:
            logger.warning("SIGCHLD is not available on this platform; zombie reaping disabled")
            return
        try:
            signal.signal(sigchld, sigchld_handler)
        except ValueError as e:
            # signal.signal() raises ValueError outside the main thread.
            logger.warning(f"Could not register SIGCHLD handler: {e}")
            return
        logger.info("SIGCHLD handler registered for automatic zombie process reaping")

    def create_task_directory(self, task_id: str) -> Path:
        """Create (if needed) and return the directory dedicated to ``task_id``."""
        task_dir = TASK_STORAGE_DIR / task_id
        # parents=True: the storage root may have been removed since import time.
        task_dir.mkdir(parents=True, exist_ok=True)
        return task_dir

    def save_task_result(self, task_id: str, result_data: Dict[str, Any]) -> None:
        """Write ``result_data`` as JSON to the task's ``result.json`` file."""
        result_file = TASK_STORAGE_DIR / task_id / "result.json"
        with open(result_file, 'w', encoding='utf-8') as f:
            json.dump(result_data, f, ensure_ascii=False, indent=2)
        logger.info(f"Task result saved for {task_id}")

    def load_task_result(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return the parsed ``result.json`` of a task.

        Returns ``None`` when the file does not exist or cannot be read or
        parsed (the failure is logged).
        """
        result_file = TASK_STORAGE_DIR / task_id / "result.json"
        if result_file.exists():
            try:
                with open(result_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except (json.JSONDecodeError, IOError) as e:
                logger.error(f"Failed to load result for {task_id}: {e}")
        return None

    def is_task_running(self, task_id: str) -> bool:
        """Return True while the tracked process of ``task_id`` has not exited.

        A process found to have exited is dropped from ``active_processes``.
        Unknown task IDs yield False.
        """
        proc = self.active_processes.get(task_id)
        if proc is None:
            return False
        if proc.poll() is not None:
            # The process has finished; stop tracking it.
            del self.active_processes[task_id]
            return False
        return True
# Global task manager instance shared by the tool functions in this file.
task_manager = TaskManager()
def _captured_output_text(data) -> str:
    """Return output attached to a TimeoutExpired as text ('' when none was captured)."""
    if data is None:
        return ""
    if isinstance(data, bytes):
        # Output attached to TimeoutExpired can be bytes even when text=True was requested.
        return data.decode("utf-8", errors="replace")
    return data


@server.tool
def execute_cc_task(
    prompt: str,
    working_dir: Optional[str] = None,
    model: Optional[str] = None,
    skip_permissions: bool = True,
    timeout: Optional[int] = None
) -> str:
    """
    Run a Claude Code task synchronously.

    Args:
        prompt: Task description passed to the Claude Code CLI.
        working_dir: Working directory for the command (defaults to the
            current directory).
        model: Model to use (sonnet, opus, haiku).
        skip_permissions: Whether to skip permission checks.
        timeout: Timeout in seconds.

    Returns:
        JSON string describing the outcome. ``status`` is ``completed`` or
        ``failed`` (by return code), ``timeout``, or ``error``. On timeout,
        any output captured before the process was killed is included as
        ``stdout`` / ``stderr``.
    """
    task_id = str(uuid.uuid4())
    task_manager.create_task_directory(task_id)
    logger.info(f"Starting sync task {task_id}: {prompt[:100]}...")
    try:
        # Build the command in the Claude Code CLI's print mode.
        cmd = ['claude', '--print', prompt]
        if model:
            cmd.extend(['--model', model])
        if skip_permissions:
            cmd.append('--dangerously-skip-permissions')
        cwd = working_dir or os.getcwd()

        result = subprocess.run(
            cmd,
            stdin=subprocess.DEVNULL,  # keep the child from reading the parent's stdin
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=cwd
        )
        extracted_result = extract_result_from_claude_output(result.stdout, result.stderr)
        result_data = {
            "task_id": task_id,
            "status": "completed" if result.returncode == 0 else "failed",
            "return_code": result.returncode,
            "result": extracted_result,
            "stdout": result.stdout,
            "stderr": result.stderr,
            "completed_at": datetime.now().isoformat(),
            "command": " ".join(cmd),
            "working_dir": cwd
        }
        task_manager.save_task_result(task_id, result_data)
        logger.info(f"Sync task {task_id} completed with code {result.returncode}")
        return json.dumps(result_data, ensure_ascii=False, indent=2)
    except subprocess.TimeoutExpired as exc:
        logger.warning(f"Sync task {task_id} timed out after {timeout} seconds")
        result_data = {
            "task_id": task_id,
            "status": "timeout",
            "error": f"Task timed out after {timeout} seconds",
            # Keep whatever the task printed before it was killed.
            "stdout": _captured_output_text(exc.stdout),
            "stderr": _captured_output_text(exc.stderr),
            "completed_at": datetime.now().isoformat()
        }
        task_manager.save_task_result(task_id, result_data)
        return json.dumps(result_data, ensure_ascii=False)
    except Exception as e:
        logger.error(f"Error in sync task {task_id}: {e}")
        result_data = {
            "task_id": task_id,
            "status": "error",
            "error": str(e),
            "completed_at": datetime.now().isoformat()
        }
        task_manager.save_task_result(task_id, result_data)
        return json.dumps(result_data, ensure_ascii=False)
@server.tool
def start_cc_task_async(
    prompt: str,
    working_dir: Optional[str] = None,
    model: Optional[str] = None,
    skip_permissions: bool = True,
    timeout: Optional[int] = None
) -> str:
    """
    Start a Claude Code task in the background.

    Args:
        prompt: Task description passed to the Claude Code CLI.
        working_dir: Working directory for the command (defaults to the
            current directory).
        model: Model to use (sonnet, opus, haiku).
        skip_permissions: Whether to skip permission checks.
        timeout: Timeout in seconds; when given, the task's process group is
            killed if the task is still running after this long.

    Returns:
        The task ID string. It is returned even when the process could not
        be started; in that case the stored status is ``failed_to_start``.
    """
    # Local import: only needed for the optional timeout watchdog.
    import threading

    task_id = str(uuid.uuid4())
    task_dir = task_manager.create_task_directory(task_id)
    logger.info(f"Starting async task {task_id}: {prompt[:100]}...")
    try:
        # Build the command in the Claude Code CLI's print mode.
        cmd = ['claude', '--print', prompt]
        if model:
            cmd.extend(['--model', model])
        if skip_permissions:
            cmd.append('--dangerously-skip-permissions')

        # Output files live next to the task directory: <task_id>.stdout / <task_id>.stderr.
        stdout_file = task_dir.with_suffix('.stdout')
        stderr_file = task_dir.with_suffix('.stderr')
        cwd = working_dir or os.getcwd()

        with open(stdout_file, 'w') as stdout_f, open(stderr_file, 'w') as stderr_f:
            proc = subprocess.Popen(
                cmd,
                stdin=subprocess.DEVNULL,  # keep the child from competing for the parent's stdin
                stdout=stdout_f,
                stderr=stderr_f,
                text=True,
                cwd=cwd,
                start_new_session=True
            )

        # Keep the handle so the process can be polled and reaped later.
        task_manager.active_processes[task_id] = proc

        if timeout is not None:
            def _kill_on_timeout():
                """Kill the task's process group if it is still running."""
                if proc.poll() is not None:
                    return
                logger.warning(f"Async task {task_id} timed out after {timeout} seconds, killing PID {proc.pid}")
                try:
                    # start_new_session=True made the child the leader of its
                    # own process group, so its PID doubles as the group ID.
                    os.killpg(proc.pid, signal.SIGKILL)
                except ProcessLookupError:
                    pass
                except Exception as e:
                    logger.error(f"Failed to kill timed-out task {task_id}: {e}")

            watchdog = threading.Timer(timeout, _kill_on_timeout)
            # Daemon thread: a pending watchdog must not keep the server alive.
            watchdog.daemon = True
            watchdog.start()

        initial_status = {
            "task_id": task_id,
            "status": "running",
            "started_at": datetime.now().isoformat(),
            "command": " ".join(cmd),
            "working_dir": cwd,
            "pid": proc.pid,
            "timeout": timeout
        }
        task_manager.save_task_result(task_id, initial_status)
        logger.info(f"Async task {task_id} started with PID {proc.pid}")
        return task_id
    except Exception as e:
        logger.error(f"Failed to start async task {task_id}: {e}")
        error_status = {
            "task_id": task_id,
            "status": "failed_to_start",
            "error": str(e),
            "started_at": datetime.now().isoformat()
        }
        task_manager.save_task_result(task_id, error_status)
        return task_id
@server.tool
def check_task_status(task_id: str) -> str:
    """
    Check the status of a task.

    Args:
        task_id: Task ID.

    Returns:
        JSON string with the task status. While the task's process is alive
        the status is ``running``. The first call after it exits reads the
        output files, stores the result with status ``completed`` and returns
        it. Tasks whose stored status is anything other than ``running`` are
        returned exactly as stored. Unknown tasks yield ``not_found``.
    """
    logger.info(f"Checking status for task {task_id}")
    result_data = task_manager.load_task_result(task_id)
    if not result_data:
        return json.dumps({
            "task_id": task_id,
            "status": "not_found",
            "error": f"Task {task_id} not found"
        }, ensure_ascii=False)

    # Already finalized (sync result, failed_to_start, or completed by an
    # earlier call): return it as stored. Re-finalizing would overwrite the
    # real status/result and move completed_at forward on every poll.
    if result_data.get('status') != 'running':
        logger.debug(f"Task {task_id} already finalized with status {result_data.get('status')}")
        return json.dumps(result_data, ensure_ascii=False, indent=2)

    working_dir = result_data.get('working_dir', os.getcwd())

    # Liveness of the recorded PID alone decides whether the task is running;
    # a live process that has been silent for a while is still running.
    if is_process_alive(result_data.get('pid')):
        elapsed = time.time() - datetime.fromisoformat(result_data['started_at']).timestamp()
        logger.debug(f"Task {task_id} is still running, elapsed={elapsed:.1f}s")
        return json.dumps({
            'status': 'running',
            'task_id': task_id,
            'elapsed_seconds': int(elapsed),
            'command': result_data.get('command', 'N/A'),
            'working_dir': working_dir
        }, ensure_ascii=False)

    # The process has exited: collect its output and finalize the record.
    logger.debug(f"Task {task_id} completed, reading output files")
    task_dir = TASK_STORAGE_DIR / task_id
    stdout_file = task_dir.with_suffix('.stdout')
    stderr_file = task_dir.with_suffix('.stderr')
    try:
        stdout = safe_read_file(stdout_file)
        stderr = safe_read_file(stderr_file)
        result = extract_result_from_claude_output(stdout, stderr)
        started_ts = datetime.fromisoformat(result_data['started_at']).timestamp()
        result_data.update({
            'status': 'completed',
            'result': result,
            'completed_at': datetime.now().isoformat(),
            'elapsed_seconds': int(time.time() - started_ts)
        })
        task_manager.save_task_result(task_id, result_data)
        logger.info(f"Task {task_id} completed successfully, result length={len(result)}")
        return json.dumps(result_data, ensure_ascii=False, indent=2)
    except Exception as e:
        logger.error(f"Failed to read output for task {task_id}: {e}")
        return json.dumps({
            'status': 'error',
            'task_id': task_id,
            'error': f'Failed to read output: {str(e)}'
        }, ensure_ascii=False)
@server.tool
def list_active_tasks() -> str:
    """
    List all active tasks.

    A task is listed when its stored status is ``running`` and the process
    with its recorded PID is alive.

    Returns:
        JSON string with ``active_tasks`` (a list of task summaries) and
        ``total_count``.
    """
    active_tasks = []
    if TASK_STORAGE_DIR.exists():
        for task_dir in TASK_STORAGE_DIR.iterdir():
            if not task_dir.is_dir():
                continue
            task_id = task_dir.name
            result_data = task_manager.load_task_result(task_id)
            # Skip finalized tasks before probing the PID: their PID may have
            # been recycled by an unrelated process since they finished.
            if not result_data or result_data.get('status') != 'running':
                continue
            pid = result_data.get('pid')
            if not is_process_alive(pid):
                continue
            try:
                started_ts = datetime.fromisoformat(result_data['started_at']).timestamp()
                elapsed = int(time.time() - started_ts)
            except (KeyError, TypeError, ValueError) as e:
                # One malformed record must not abort the whole listing.
                logger.warning(f"Task {task_id} has no usable started_at: {e}")
                elapsed = None
            active_tasks.append({
                "task_id": task_id,
                "pid": pid,
                "status": "running",
                "elapsed_seconds": elapsed,
                "command": result_data.get("command", "Unknown"),
                "working_dir": result_data.get("working_dir", "N/A"),
                "started_at": result_data.get("started_at")
            })
    return json.dumps({
        "active_tasks": active_tasks,
        "total_count": len(active_tasks)
    }, ensure_ascii=False, indent=2)
@server.tool
def cleanup_task(task_id: str) -> str:
    """
    Clean up a task and its associated data.

    The task's process (if still alive) is sent SIGTERM, then SIGKILL after
    one second if needed; afterwards the task directory and its
    ``.stdout`` / ``.stderr`` output files are removed.

    Args:
        task_id: ID of the task to clean up. Must be a canonical UUID string,
            the form in which this server generates task IDs.

    Returns:
        JSON string with ``status`` ``cleaned`` on success, or ``error`` with
        an ``error`` message when the ID is invalid or removal failed.
    """
    logger.info(f"Cleaning up task {task_id}")

    # The ID is joined onto TASK_STORAGE_DIR and passed to shutil.rmtree, so
    # values such as "", ".." or an absolute path would delete directories
    # outside the task storage. Accept canonical UUID strings only.
    try:
        is_valid_id = str(uuid.UUID(task_id)) == task_id
    except (ValueError, AttributeError, TypeError):
        is_valid_id = False
    if not is_valid_id:
        logger.warning(f"Refusing to clean up invalid task id {task_id!r}")
        return json.dumps({
            "task_id": task_id,
            "status": "error",
            "error": "Invalid task ID"
        }, ensure_ascii=False)

    errors = []

    # Terminate the task's process if it is still running.
    result_data = task_manager.load_task_result(task_id)
    if result_data:
        pid = result_data.get('pid')
        if pid and is_process_alive(pid):
            try:
                os.kill(pid, signal.SIGTERM)
                # Give the process a moment to exit gracefully.
                time.sleep(1)
                if is_process_alive(pid):
                    os.kill(pid, signal.SIGKILL)
                    logger.info(f"Force killed process {pid} for task {task_id}")
                else:
                    logger.info(f"Process {pid} for task {task_id} terminated gracefully")
            except ProcessLookupError:
                logger.debug(f"Process {pid} for task {task_id} already terminated")
            except Exception as e:
                logger.error(f"Error terminating process for task {task_id}: {e}")
                errors.append(f"terminate: {e}")

    # Stop tracking the process handle so it does not accumulate.
    task_manager.active_processes.pop(task_id, None)

    # Remove the task directory and the output files stored beside it.
    task_dir = TASK_STORAGE_DIR / task_id
    try:
        import shutil
        if task_dir.exists():
            shutil.rmtree(task_dir)
            logger.info(f"Task directory {task_dir} removed")
        for file in (task_dir.with_suffix('.stdout'), task_dir.with_suffix('.stderr')):
            if file.exists():
                file.unlink()
                logger.debug(f"Removed output file {file}")
    except Exception as e:
        logger.error(f"Error removing task directory {task_dir}: {e}")
        errors.append(f"remove: {e}")

    if errors:
        return json.dumps({
            "task_id": task_id,
            "status": "error",
            "error": "; ".join(errors)
        }, ensure_ascii=False)
    return json.dumps({
        "task_id": task_id,
        "status": "cleaned",
        "message": "Task and associated data have been cleaned up"
    }, ensure_ascii=False)
# Script entry point: log the start-up and run the MCP server.
if __name__ == "__main__":
    logger.info("Starting Claude Code Multi-Process MCP Server")
    server.run()