Skip to main content
Glama
server.py31.2 kB
""" GPT MCP Server - Main Server ============================ 설계: James Park (Backend Lead) MCP (Model Context Protocol) 서버 - GPT Desktop에서 로컬 파일시스템 접근 가능 - SSE (Server-Sent Events) 기반 통신 - JSON-RPC 2.0 프로토콜 사용법: 1. python server.py 2. ngrok http 8765 3. ChatGPT에 URL 등록 """ import os import sys import json import asyncio import logging from pathlib import Path from typing import Optional, List, Dict, Any, AsyncIterator from datetime import datetime from dataclasses import dataclass from contextlib import asynccontextmanager # FastAPI & Uvicorn from fastapi import FastAPI, Request, HTTPException from fastapi.responses import StreamingResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware import uvicorn # Async File I/O import aiofiles import aiofiles.os # 로컬 모듈 from config import get_config, AppConfig, create_default_config_file from security import ( SecurityLayer, SecurityConfig, SecurityError, AccessDeniedError, FileTooLargeError, BlockedFileError, AccessMode, get_security_layer, reset_security_layer ) # 로깅 설정 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # ============================================================================ # MCP Protocol Types # ============================================================================ @dataclass class MCPTool: """MCP Tool 정의""" name: str description: str inputSchema: Dict[str, Any] @dataclass class MCPRequest: """MCP 요청""" jsonrpc: str method: str params: Optional[Dict[str, Any]] = None id: Optional[int] = None @dataclass class MCPResponse: """MCP 응답""" jsonrpc: str = "2.0" result: Optional[Any] = None error: Optional[Dict[str, Any]] = None id: Optional[int] = None def to_dict(self) -> Dict: d = {"jsonrpc": self.jsonrpc} if self.result is not None: d["result"] = self.result if self.error is not None: d["error"] = self.error if self.id is not None: d["id"] = self.id return d # ============================================================================ # File Operations (비동기) # ============================================================================ class FileOperations: """ 파일 작업 핸들러 James Park의 설계 원칙: - 모든 I/O는 비동기 - 에러 핸들링 철저히 - 보안 레이어 통과 필수 """ def __init__(self, security: SecurityLayer, config: AppConfig): self.security = security self.config = config async def list_files( self, directory: str, pattern: str = "*", recursive: bool = False ) -> List[Dict[str, Any]]: """ 디렉토리 파일 목록 조회 Args: directory: 조회할 디렉토리 경로 pattern: glob 패턴 (예: *.py, *.txt) recursive: 하위 디렉토리 포함 여부 Returns: 파일 정보 리스트 """ # 보안 검증 path = self.security.validate_path(directory) if not path.exists(): raise FileNotFoundError(f"Directory not found: {directory}") if not path.is_dir(): raise ValueError(f"Not a directory: {directory}") files = [] max_items = self.config.filesystem.max_directory_items try: if recursive: glob_iter = path.rglob(pattern) else: glob_iter = path.glob(pattern) for item in glob_iter: if len(files) >= max_items: logger.warning(f"Directory listing truncated at {max_items} items") break try: # 각 파일도 보안 검증 (차단 패턴 등) self.security.validate_path(str(item)) file_info = self.security.get_safe_file_info(item) files.append(file_info) except SecurityError: # 차단된 파일은 건너뜀 continue except PermissionError as e: logger.error(f"Permission denied: {e}") raise AccessDeniedError(f"Permission denied: {directory}") # 정렬: 디렉토리 먼저, 이름순 files.sort(key=lambda x: (not x.get("is_dir", False), x.get("name", "").lower())) return files async def read_file( self, file_path: str, encoding: str = "utf-8", start_line: Optional[int] = None, end_line: Optional[int] = None ) -> Dict[str, Any]: """ 파일 내용 읽기 Args: file_path: 파일 경로 encoding: 문자 인코딩 start_line: 시작 라인 (1부터 시작) end_line: 끝 라인 Returns: 파일 내용 및 메타데이터 """ # 보안 검증 path = self.security.validate_path(file_path) if not path.exists(): raise FileNotFoundError(f"File not found: {file_path}") if not path.is_file(): raise ValueError(f"Not a file: {file_path}") # 파일 크기 검증 self.security.check_file_size(path) # 파일 정보 stat_info = path.stat() try: async with aiofiles.open(path, mode='r', encoding=encoding) as f: if start_line is not None or end_line is not None: # 라인 범위 읽기 lines = await f.readlines() start = (start_line or 1) - 1 end = end_line or len(lines) content = "".join(lines[start:end]) total_lines = len(lines) else: content = await f.read() total_lines = content.count('\n') + 1 # 최대 라인 수 제한 if total_lines > self.config.filesystem.max_lines: lines = content.split('\n')[:self.config.filesystem.max_lines] content = '\n'.join(lines) content += f"\n\n... [Truncated at {self.config.filesystem.max_lines} lines]" # 민감 정보 마스킹 (설정에 따라) if self.config.security.mask_sensitive_content: content = self.security.sanitize_content(content) return { "path": str(path), "content": content, "encoding": encoding, "size": stat_info.st_size, "lines": total_lines, "modified": datetime.fromtimestamp(stat_info.st_mtime).isoformat(), } except UnicodeDecodeError: # 바이너리 파일인 경우 return { "path": str(path), "content": None, "error": "Binary file - cannot read as text", "size": stat_info.st_size, "is_binary": True, } async def get_file_info(self, file_path: str) -> Dict[str, Any]: """파일 메타데이터 조회""" # 보안 검증 path = self.security.validate_path(file_path) if not path.exists(): raise FileNotFoundError(f"Path not found: {file_path}") return self.security.get_safe_file_info(path) async def write_file( self, file_path: str, content: str, encoding: str = "utf-8", create_dirs: bool = False ) -> Dict[str, Any]: """파일 쓰기/생성""" self.security.check_write_permission() path = self.security.validate_path(file_path) if create_dirs and not path.parent.exists(): path.parent.mkdir(parents=True, exist_ok=True) if not path.parent.exists(): raise FileNotFoundError(f"Parent directory not found: {path.parent}") existed = path.exists() try: async with aiofiles.open(path, mode='w', encoding=encoding) as f: await f.write(content) stat_info = path.stat() return { "path": str(path), "size": stat_info.st_size, "lines": content.count('\n') + 1, "encoding": encoding, "created": not existed, "success": True } except PermissionError: raise AccessDeniedError(f"Permission denied: {file_path}") async def create_directory(self, dir_path: str, parents: bool = True) -> Dict[str, Any]: """디렉토리 생성""" self.security.check_write_permission() path = self.security.validate_path(dir_path) if path.exists(): return {"path": str(path), "created": False, "exists": True, "success": True} try: path.mkdir(parents=parents, exist_ok=True) return {"path": str(path), "created": True, "exists": True, "success": True} except PermissionError: raise AccessDeniedError(f"Permission denied: {dir_path}") async def delete_file(self, file_path: str) -> Dict[str, Any]: """파일 삭제""" self.security.check_write_permission() path = self.security.validate_path(file_path) if not path.exists(): raise FileNotFoundError(f"File not found: {file_path}") if not path.is_file(): raise ValueError(f"Not a file: {file_path}") try: path.unlink() return {"path": str(path), "deleted": True, "success": True} except PermissionError: raise AccessDeniedError(f"Permission denied: {file_path}") async def search_files( self, directory: str, query: str, file_pattern: str = "*", case_sensitive: bool = False, max_results: Optional[int] = None ) -> List[Dict[str, Any]]: """ 파일 내용 검색 Args: directory: 검색 디렉토리 query: 검색어 file_pattern: 파일 필터 패턴 case_sensitive: 대소문자 구분 max_results: 최대 결과 수 Returns: 검색 결과 리스트 """ # 보안 검증 path = self.security.validate_path(directory) if not path.exists() or not path.is_dir(): raise FileNotFoundError(f"Directory not found: {directory}") results = [] max_results = max_results or self.config.filesystem.max_search_results search_query = query if case_sensitive else query.lower() for file_path in path.rglob(file_pattern): if len(results) >= max_results: break if not file_path.is_file(): continue try: # 보안 검증 self.security.validate_path(str(file_path)) self.security.check_file_size(file_path) # 파일 읽기 async with aiofiles.open(file_path, 'r', encoding='utf-8') as f: content = await f.read() search_content = content if case_sensitive else content.lower() if search_query in search_content: # 매칭 라인 찾기 matches = [] for i, line in enumerate(content.split('\n'), 1): search_line = line if case_sensitive else line.lower() if search_query in search_line: matches.append({ "line_number": i, "content": line.strip()[:200] # 200자 제한 }) if len(matches) >= 5: # 파일당 최대 5개 매치 break results.append({ "path": str(file_path), "matches": matches, "match_count": len(matches) }) except (SecurityError, UnicodeDecodeError, PermissionError): # 접근 불가 파일 건너뜀 continue return results # ============================================================================ # MCP Server # ============================================================================ class MCPServer: """ MCP (Model Context Protocol) 서버 GPT Desktop과 통신하는 메인 서버 클래스 """ # MCP 프로토콜 버전 PROTOCOL_VERSION = "2024-11-05" # 서버 정보 SERVER_INFO = { "name": "gpt-filesystem-mcp", "version": "1.1.0", "description": "Local filesystem access for GPT Desktop (read/write)" } def __init__(self, config: AppConfig): self.config = config self.security = self._create_security_layer() self.file_ops = FileOperations(self.security, config) self.tools = self._define_tools() def _create_security_layer(self) -> SecurityLayer: """보안 레이어 생성""" from security import SecurityConfig as SecConfig sec_config = SecConfig( allowed_directories=self.config.filesystem.allowed_directories, blocked_patterns=self.config.security.blocked_file_patterns, blocked_directories=self.config.security.blocked_directories, max_file_size=self.config.filesystem.max_file_size, max_lines=self.config.filesystem.max_lines, max_directory_items=self.config.filesystem.max_directory_items, max_search_results=self.config.filesystem.max_search_results, access_mode=AccessMode.READ_WRITE if self.config.filesystem.write_enabled else AccessMode.READ_ONLY, follow_symlinks=self.config.filesystem.follow_symlinks, ) return SecurityLayer(sec_config) def _define_tools(self) -> List[MCPTool]: """MCP Tools 정의""" tools = [ MCPTool( name="list_files", description="디렉토리의 파일 및 폴더 목록을 조회합니다. 허용된 디렉토리 내에서만 작동합니다.", inputSchema={ "type": "object", "properties": { "path": { "type": "string", "description": "조회할 디렉토리 경로" }, "pattern": { "type": "string", "description": "파일 필터 패턴 (예: *.py, *.txt)", "default": "*" }, "recursive": { "type": "boolean", "description": "하위 디렉토리 포함 여부", "default": False } }, "required": ["path"] } ), MCPTool( name="read_file", description="파일의 내용을 읽습니다. 텍스트 파일만 지원합니다.", inputSchema={ "type": "object", "properties": { "path": { "type": "string", "description": "읽을 파일의 경로" }, "encoding": { "type": "string", "description": "문자 인코딩", "default": "utf-8" }, "start_line": { "type": "integer", "description": "시작 라인 번호 (1부터 시작)" }, "end_line": { "type": "integer", "description": "끝 라인 번호" } }, "required": ["path"] } ), MCPTool( name="get_file_info", description="파일 또는 디렉토리의 메타데이터를 조회합니다 (크기, 수정일 등).", inputSchema={ "type": "object", "properties": { "path": { "type": "string", "description": "파일 또는 디렉토리 경로" } }, "required": ["path"] } ), MCPTool( name="search_files", description="지정된 디렉토리에서 파일 내용을 검색합니다.", inputSchema={ "type": "object", "properties": { "path": { "type": "string", "description": "검색할 디렉토리 경로" }, "query": { "type": "string", "description": "검색어" }, "pattern": { "type": "string", "description": "파일 필터 패턴 (예: *.py)", "default": "*" }, "case_sensitive": { "type": "boolean", "description": "대소문자 구분", "default": False } }, "required": ["path", "query"] } ), MCPTool( name="get_allowed_directories", description="접근 가능한 디렉토리 목록을 반환합니다.", inputSchema={ "type": "object", "properties": {} } ), ] if self.config.filesystem.write_enabled: tools.extend([ MCPTool( name="write_file", description="파일을 생성하거나 덮어씁니다.", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "파일 경로"}, "content": {"type": "string", "description": "파일 내용"}, "encoding": {"type": "string", "default": "utf-8"}, "create_dirs": {"type": "boolean", "default": False} }, "required": ["path", "content"] } ), MCPTool( name="create_directory", description="디렉토리를 생성합니다.", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "디렉토리 경로"}, "parents": {"type": "boolean", "default": True} }, "required": ["path"] } ), MCPTool( name="delete_file", description="파일을 삭제합니다.", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "파일 경로"} }, "required": ["path"] } ), ]) return tools async def handle_initialize(self, params: Dict) -> Dict: """MCP initialize 핸들러""" return { "protocolVersion": self.PROTOCOL_VERSION, "serverInfo": self.SERVER_INFO, "capabilities": { "tools": {} } } async def handle_list_tools(self) -> Dict: """tools/list 핸들러""" return { "tools": [ { "name": tool.name, "description": tool.description, "inputSchema": tool.inputSchema } for tool in self.tools ] } async def handle_call_tool(self, params: Dict) -> Dict: """tools/call 핸들러""" tool_name = params.get("name") arguments = params.get("arguments", {}) logger.info(f"Tool call: {tool_name} with args: {arguments}") try: if tool_name == "list_files": result = await self.file_ops.list_files( directory=arguments["path"], pattern=arguments.get("pattern", "*"), recursive=arguments.get("recursive", False) ) return {"content": [{"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}]} elif tool_name == "read_file": result = await self.file_ops.read_file( file_path=arguments["path"], encoding=arguments.get("encoding", "utf-8"), start_line=arguments.get("start_line"), end_line=arguments.get("end_line") ) return {"content": [{"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}]} elif tool_name == "get_file_info": result = await self.file_ops.get_file_info( file_path=arguments["path"] ) return {"content": [{"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}]} elif tool_name == "search_files": result = await self.file_ops.search_files( directory=arguments["path"], query=arguments["query"], file_pattern=arguments.get("pattern", "*"), case_sensitive=arguments.get("case_sensitive", False) ) return {"content": [{"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}]} elif tool_name == "get_allowed_directories": result = { "allowed_directories": self.config.filesystem.allowed_directories, "write_enabled": self.config.filesystem.write_enabled, "max_file_size": self.config.filesystem.max_file_size } return {"content": [{"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}]} elif tool_name == "write_file": result = await self.file_ops.write_file( file_path=arguments["path"], content=arguments["content"], encoding=arguments.get("encoding", "utf-8"), create_dirs=arguments.get("create_dirs", False) ) return {"content": [{"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}]} elif tool_name == "create_directory": result = await self.file_ops.create_directory( dir_path=arguments["path"], parents=arguments.get("parents", True) ) return {"content": [{"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}]} elif tool_name == "delete_file": result = await self.file_ops.delete_file( file_path=arguments["path"] ) return {"content": [{"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}]} else: raise ValueError(f"Unknown tool: {tool_name}") except FileNotFoundError as e: return {"content": [{"type": "text", "text": f"Error: {e}"}], "isError": True} except SecurityError as e: return {"content": [{"type": "text", "text": f"Security Error: {e}"}], "isError": True} except Exception as e: logger.error(f"Tool error: {e}") return {"content": [{"type": "text", "text": f"Error: {e}"}], "isError": True} async def handle_request(self, request: MCPRequest) -> MCPResponse: """MCP 요청 처리""" try: if request.method == "initialize": result = await self.handle_initialize(request.params or {}) elif request.method == "tools/list": result = await self.handle_list_tools() elif request.method == "tools/call": result = await self.handle_call_tool(request.params or {}) elif request.method == "ping": result = {} else: return MCPResponse( error={"code": -32601, "message": f"Method not found: {request.method}"}, id=request.id ) return MCPResponse(result=result, id=request.id) except Exception as e: logger.error(f"Request error: {e}") return MCPResponse( error={"code": -32603, "message": str(e)}, id=request.id ) # ============================================================================ # FastAPI Application # ============================================================================ def create_app(config: Optional[AppConfig] = None) -> FastAPI: """FastAPI 애플리케이션 생성""" if config is None: config = get_config() # MCP 서버 생성 mcp_server = MCPServer(config) @asynccontextmanager async def lifespan(app: FastAPI): logger.info("=" * 50) logger.info("GPT MCP Server Starting") logger.info(f"Allowed directories: {config.filesystem.allowed_directories}") logger.info(f"Write enabled: {config.filesystem.write_enabled}") logger.info("=" * 50) yield logger.info("GPT MCP Server Shutting down") app = FastAPI( title="GPT MCP Server", description="MCP Server for GPT Desktop - Local Filesystem Access", version="1.1.0", lifespan=lifespan ) # CORS 설정 app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/") async def root(): """서버 상태 확인""" return { "status": "running", "server": MCPServer.SERVER_INFO, "protocol_version": MCPServer.PROTOCOL_VERSION, "write_enabled": config.filesystem.write_enabled } @app.get("/health") async def health(): """헬스체크""" return {"status": "healthy"} @app.post("/mcp") async def mcp_endpoint(request: Request): """MCP JSON-RPC 엔드포인트""" try: body = await request.json() logger.debug(f"MCP Request: {body}") mcp_request = MCPRequest( jsonrpc=body.get("jsonrpc", "2.0"), method=body.get("method", ""), params=body.get("params"), id=body.get("id") ) response = await mcp_server.handle_request(mcp_request) return JSONResponse(content=response.to_dict()) except Exception as e: logger.error(f"MCP endpoint error: {e}") return JSONResponse( content={ "jsonrpc": "2.0", "error": {"code": -32700, "message": str(e)}, "id": None }, status_code=400 ) @app.get("/sse") async def sse_endpoint(request: Request): """SSE 엔드포인트 (MCP over SSE)""" async def event_generator() -> AsyncIterator[str]: # 연결 시 서버 정보 전송 init_event = { "jsonrpc": "2.0", "method": "notifications/initialized", "params": {"serverInfo": MCPServer.SERVER_INFO} } yield f"data: {json.dumps(init_event)}\n\n" # Keep-alive while True: if await request.is_disconnected(): break yield f": keep-alive\n\n" await asyncio.sleep(30) return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) return app # ============================================================================ # Main Entry Point # ============================================================================ def main(): """메인 실행 함수""" # 설정 로드 config = get_config() # 설정 파일이 없으면 생성 config_path = Path(__file__).parent / "config.yaml" if not config_path.exists(): create_default_config_file(str(config_path)) logger.info(f"Created default config file: {config_path}") logger.info("Please edit config.yaml to set allowed directories") # 로그 레벨 설정 logging.getLogger().setLevel(getattr(logging, config.server.log_level.upper())) # 서버 시작 정보 print("\n" + "=" * 60) print(" GPT MCP Server v1.1.0") print("=" * 60) print(f" Host: {config.server.host}") print(f" Port: {config.server.port}") print(f" Debug: {config.server.debug}") print(f" Write Enabled: {config.filesystem.write_enabled}") print("-" * 60) print(" Allowed Directories:") for d in config.filesystem.allowed_directories: print(f" - {d}") print("-" * 60) print(" Next Steps:") print(" 1. Run: ngrok http 8765") print(" 2. Copy the HTTPS URL") print(" 3. Add to ChatGPT MCP settings") print("=" * 60 + "\n") # FastAPI 앱 생성 및 실행 app = create_app(config) uvicorn.run( app, host=config.server.host, port=config.server.port, log_level=config.server.log_level.lower(), ) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/seanshin0214/gpt-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server