# http_server.py (8.71 kB)
"""
MCP 서버용 FastAPI HTTP 래퍼
"""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Dict, Any, Optional
import asyncio
import json
from datetime import datetime
import sys
import os
# 프로젝트 루트 디렉토리를 Python 패스에 추가
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.services import CalendarService
from src.models import CalendarEventRequest, EventCategory, EventStatus
from src.exceptions import CalendarException
# Application object that the route decorators in this module attach to.
app = FastAPI(title="Ittae MCP Server", version="1.0.0")
# CORS configuration
# NOTE(review): wildcard origins are combined with allow_credentials=True here;
# the FastAPI CORS documentation advises against that combination — confirm
# whether an explicit origin list is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Service instance
calendar_service = CalendarService()
# User id passed to the calendar_service calls below.
DEFAULT_USER_ID = 1
# Request/response models
class McpRequest(BaseModel):
    """Request body accepted by the POST /mcp endpoint."""

    # Kind of call being made; the /mcp handler only accepts "tools/call".
    method: str
    # Call parameters; the /mcp handler reads the "name" and "arguments" keys.
    params: Dict[str, Any]
class McpResponse(BaseModel):
    """Response body returned by the POST /mcp endpoint."""

    # Return value of the executed function; left as None when the call failed.
    result: Optional[Any] = None
    # Failure details ("code" and "message"); left as None when the call succeeded.
    error: Optional[Dict[str, Any]] = None
@app.post("/mcp")
async def handle_mcp_call(request: McpRequest) -> McpResponse:
    """Handle an MCP function call.

    Args:
        request: Parsed request body. ``method`` must be ``"tools/call"``;
            ``params["name"]`` selects the function and ``params["arguments"]``
            (optional) carries its arguments.

    Returns:
        ``McpResponse`` with ``result`` set on success, or with ``error`` set
        to ``{"code": -1, "message": ...}`` when the function call fails.

    Raises:
        HTTPException: status 400 when ``method`` is not ``"tools/call"``.
    """
    # Checked outside the try block: the catch-all handler below would
    # otherwise swallow the HTTPException and the 400 status would never be
    # sent to the client.
    if request.method != "tools/call":
        raise HTTPException(status_code=400, detail="Unsupported method")

    function_name = request.params.get("name")
    # An explicit JSON null for "arguments" is treated like an omitted key.
    arguments = request.params.get("arguments") or {}

    try:
        # Function call
        result = await execute_function(function_name, arguments)
    except Exception as e:
        # Endpoint boundary: every function-level failure is reported in the
        # response body rather than as an HTTP error.
        return McpResponse(
            error={
                "code": -1,
                "message": str(e),
            }
        )
    return McpResponse(result=result)
async def execute_function(function_name: str, args: Dict[str, Any]) -> Any:
"""MCP 함수를 실행합니다."""
if function_name == "get_all_events":
result = calendar_service.fetch_events(DEFAULT_USER_ID)
if result.success and result.data:
return [calendar_service.to_response(event).dict() for event in result.data]
return []
elif function_name == "get_event_by_id":
event_id = args.get("event_id")
if not event_id:
raise ValueError("event_id is required")
result = calendar_service.get_event_by_id(event_id, DEFAULT_USER_ID)
if result.success and result.data:
return calendar_service.to_response(result.data).dict()
raise ValueError(f"Event {event_id} not found")
elif function_name == "create_calendar_event":
# 시간 파싱
start_time_str = args.get("start_time")
if start_time_str:
try:
start_time = datetime.fromisoformat(start_time_str.replace('Z', '+00:00'))
except ValueError:
raise ValueError(f"Invalid start_time format: {start_time_str}")
else:
raise ValueError("start_time is required")
# 카테고리 검증
category_str = args.get("category")
if category_str not in [cat.value for cat in EventCategory]:
raise ValueError(f"Invalid category: {category_str}")
request = CalendarEventRequest(
title=args.get("title"),
description=args.get("description"),
location=args.get("location"),
start_time=start_time,
duration=args.get("duration"),
category=EventCategory(category_str),
stamina_cost=args.get("stamina_cost", 0)
)
result = calendar_service.create_event(request, DEFAULT_USER_ID)
if result.success and result.data:
return calendar_service.to_response(result.data).dict()
raise ValueError(result.error or "Failed to create event")
elif function_name == "update_calendar_event":
event_id = args.get("event_id")
if not event_id:
raise ValueError("event_id is required")
# 시간 파싱
start_time_str = args.get("start_time")
if start_time_str:
try:
start_time = datetime.fromisoformat(start_time_str.replace('Z', '+00:00'))
except ValueError:
raise ValueError(f"Invalid start_time format: {start_time_str}")
else:
raise ValueError("start_time is required")
# 카테고리 검증
category_str = args.get("category")
if category_str not in [cat.value for cat in EventCategory]:
raise ValueError(f"Invalid category: {category_str}")
# 상태 검증 (선택사항)
status = None
status_str = args.get("status")
if status_str:
if status_str not in [stat.value for stat in EventStatus]:
raise ValueError(f"Invalid status: {status_str}")
status = EventStatus(status_str)
request = CalendarEventRequest(
title=args.get("title"),
description=args.get("description"),
location=args.get("location"),
start_time=start_time,
duration=args.get("duration"),
category=EventCategory(category_str),
stamina_cost=args.get("stamina_cost", 0),
status=status
)
result = calendar_service.update_event(event_id, request, DEFAULT_USER_ID)
if result.success and result.data:
return calendar_service.to_response(result.data).dict()
raise ValueError(result.error or "Failed to update event")
elif function_name == "delete_calendar_event":
event_id = args.get("event_id")
if not event_id:
raise ValueError("event_id is required")
result = calendar_service.delete_event(event_id, DEFAULT_USER_ID)
if result.success:
return f"Event {event_id} deleted successfully"
raise ValueError(result.error or "Failed to delete event")
elif function_name == "complete_event":
event_id = args.get("event_id")
stamina_after = args.get("stamina_after")
if not event_id:
raise ValueError("event_id is required")
if stamina_after is None:
raise ValueError("stamina_after is required")
result = calendar_service.complete_event(event_id, DEFAULT_USER_ID, stamina_after)
if result.success and result.data:
return calendar_service.to_response(result.data).dict()
raise ValueError(result.error or "Failed to complete event")
elif function_name == "get_events_by_category":
category_str = args.get("category")
if not category_str:
raise ValueError("category is required")
if category_str not in [cat.value for cat in EventCategory]:
raise ValueError(f"Invalid category: {category_str}")
result = calendar_service.fetch_events(DEFAULT_USER_ID)
if result.success and result.data:
filtered_events = [
event for event in result.data
if event.category == category_str
]
return [calendar_service.to_response(event).dict() for event in filtered_events]
return []
elif function_name == "get_events_by_date":
date_str = args.get("date")
if not date_str:
raise ValueError("date is required")
try:
target_date = datetime.fromisoformat(date_str).date()
except ValueError:
raise ValueError(f"Invalid date format: {date_str}")
result = calendar_service.fetch_events(DEFAULT_USER_ID)
if result.success and result.data:
filtered_events = [
event for event in result.data
if event.start_time.date() == target_date
]
return [calendar_service.to_response(event).dict() for event in filtered_events]
return []
else:
raise ValueError(f"Unknown function: {function_name}")
@app.get("/health")
async def health_check():
    """Health-check endpoint.

    Returns a fixed status and service name together with the current local
    time in ISO format.
    """
    payload = {"status": "healthy", "service": "mcp-server"}
    payload["timestamp"] = datetime.now().isoformat()
    return payload
@app.get("/")
async def root():
    """Root endpoint: report the server name, version and endpoint paths."""
    endpoint_paths = {"mcp": "/mcp", "health": "/health"}
    return {
        "message": "Ittae MCP Server",
        "version": "1.0.0",
        "endpoints": endpoint_paths,
    }
if __name__ == "__main__":
    # Run as a script: serve the app with uvicorn on all interfaces, port 8001.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8001}
    uvicorn.run(app, **server_options)