import os
from celery import Celery
BROKER_URL = os.getenv("BROKER_URL", "amqp://guest:guest@localhost:5672//")
# 결과 백엔드: REDIS_URL 우선, 없으면 SQLite DB 사용
REDIS_URL = os.getenv("REDIS_URL")
if REDIS_URL:
RESULT_BACKEND = REDIS_URL
else:
# Celery SQLAlchemy result backend 사용 (로컬 파일 DB)
RESULT_BACKEND = os.getenv(
"SQLITE_URL", "db+sqlite:///./celery_results.sqlite3"
)
celery_app = Celery(
"task_scheduler_mcp",
broker=BROKER_URL,
backend=RESULT_BACKEND,
)
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
)
@celery_app.task(bind=True)
def example_long_task(self, payload: dict) -> dict:
"""
데모용 긴 작업 예제 태스크.
실제 MCP 도구에서 이 태스크를 재사용하거나
별도 태스크를 정의해서 스케줄링에 사용하면 된다.
"""
import time
for i in range(5):
time.sleep(1)
self.update_state(
state="PROGRESS",
meta={"step": i + 1, "total": 5},
)
return {
"status": "done",
"echo": payload,
}