"""
MCP 태스크 스케줄러 서버 (HTTP + 스트리밍).
- MCP Python SDK + FastAPI/uvicorn 기반
- Celery(RabbitMQ broker, Redis/SQLite backend)로 실제 작업 실행/스케줄링
"""
from __future__ import annotations
import asyncio
import logging
import os
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from fastapi import FastAPI
from pydantic import BaseModel, Field
from uvicorn import Config, Server
from celery import states
from celery_app import celery_app, example_long_task
from notification import get_default_notifier
try:
# MCP Python SDK (가정: fastmcp 스타일 인터페이스)
from mcp.server.fastmcp import FastMCP
except ImportError: # pragma: no cover - 런타임 환경에 따라 다를 수 있음
FastMCP = None # type: ignore
logger = logging.getLogger("task_scheduler_mcp.server")
logging.basicConfig(level=logging.INFO)
app = FastAPI(title="Task Scheduler MCP HTTP Server")
class ScheduleRequest(BaseModel):
"""
MCP 도구에서 사용할 공통 페이로드 모델.
"""
payload: Dict[str, Any] = Field(default_factory=dict)
run_at: Optional[str] = Field(
default=None,
description="예약 실행 시간 (ISO 8601, UTC 기준). None 이면 즉시 비동기 실행.",
)
notify_target: Optional[str] = Field(
default=None,
description="결과 알림 대상 (예: 카카오톡 user id 또는 식별자).",
)
class TaskInfo(BaseModel):
task_id: str
status: str
scheduled_for: Optional[str] = None
result: Optional[Dict[str, Any]] = None
notifier = get_default_notifier()
def _parse_iso8601(ts: str) -> datetime:
dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
def _schedule_celery_task(req: ScheduleRequest) -> str:
"""
Celery 태스크를 즉시 또는 ETA 스케줄링.
"""
eta: Optional[datetime] = None
if req.run_at:
eta = _parse_iso8601(req.run_at)
# 예시로 example_long_task 사용, 실제로는 별도 도메인 태스크를 연결 가능
async_result = example_long_task.apply_async(
kwargs={"payload": req.payload},
eta=eta,
)
return async_result.id
async def _watch_and_notify(task_id: str, notify_target: Optional[str]) -> None:
"""
Celery 결과를 폴링해서 완료 시 Notifier 로 결과 전송.
MCP 툴은 즉시 응답하고, 이 코루틴은 백그라운드에서 동작.
"""
if not notify_target:
return
loop = asyncio.get_event_loop()
def _get_result():
async_res = celery_app.AsyncResult(task_id)
return async_res.state, async_res.info if async_res.ready() else None
while True:
state, info = await loop.run_in_executor(None, _get_result)
if state in states.READY_STATES:
notifier.notify(
target=notify_target,
message=f"Task {task_id} finished with state={state}",
extra={"result": info},
)
break
await asyncio.sleep(2)
#
# MCP 서버 설정
#
if FastMCP is not None:
mcp = FastMCP("task-scheduler-mcp")
@mcp.tool()
async def enqueue_task(
payload: Dict[str, Any],
notify_target: Optional[str] = None,
) -> Dict[str, Any]:
"""
**즉시 비동기 실행** 도구.
- Celery 로 바로 태스크를 던지고
- MCP 응답은 task_id 만 즉시 반환.
- 결과는 notify_target 으로 별도 알림 전송.
"""
req = ScheduleRequest(payload=payload, run_at=None, notify_target=notify_target)
task_id = _schedule_celery_task(req)
# 백그라운드에서 결과 감시 및 알림
asyncio.create_task(_watch_and_notify(task_id, notify_target))
return {
"task_id": task_id,
"status": "queued",
}
@mcp.tool()
async def schedule_task(
run_at: str,
payload: Dict[str, Any],
notify_target: Optional[str] = None,
) -> Dict[str, Any]:
"""
**예약 실행** 도구.
- run_at: ISO 8601 UTC (예: 2026-01-09T12:00:00Z)
- 지정 시간에 Celery ETA 기능으로 실행.
"""
req = ScheduleRequest(payload=payload, run_at=run_at, notify_target=notify_target)
task_id = _schedule_celery_task(req)
asyncio.create_task(_watch_and_notify(task_id, notify_target))
return {
"task_id": task_id,
"status": "scheduled",
"run_at": run_at,
}
@mcp.tool()
async def get_task_status(task_id: str) -> TaskInfo:
"""
특정 작업의 상태/결과 조회.
"""
async_res = celery_app.AsyncResult(task_id)
info: Optional[Dict[str, Any]] = None
if async_res.ready():
try:
result_val = async_res.get()
except Exception as exc: # noqa: BLE001
result_val = {"error": str(exc)}
info = {"raw": result_val}
return TaskInfo(
task_id=task_id,
status=async_res.state,
result=info,
)
@mcp.tool()
async def cancel_task(task_id: str) -> Dict[str, Any]:
"""
대기 중/실행 중인 작업 취소 시도.
"""
async_res = celery_app.AsyncResult(task_id)
revoked = async_res.revoke(terminate=True)
return {
"task_id": task_id,
"revoked": bool(revoked),
}
@app.get("/health")
async def health() -> Dict[str, str]:
return {"status": "ok"}
def run() -> None:
"""
uvicorn 을 이용해서 HTTP + MCP 서버 실행.
MCP SDK 의 HTTP 어댑터가 따로 있다면
여기서 app 에 마운트하는 식으로 확장 가능.
"""
host = os.getenv("HOST", "0.0.0.0")
port = int(os.getenv("PORT", "8000"))
config = Config(app=app, host=host, port=port, log_level="info")
server = Server(config)
logger.info("Starting Task Scheduler MCP HTTP server on %s:%s", host, port)
asyncio.run(server.serve())
if __name__ == "__main__":
run()