server.py•11.4 kB
from fastmcp import FastMCP
from fastmcp.prompts.prompt import PromptMessage, TextContent
from pydantic import BaseModel, Field, ConfigDict, computed_field
from typing import Optional, Dict, Any
import httpx
from contextlib import asynccontextmanager
from smithery.decorators import smithery
import solvedac_server.search_query as sq
TIER_NAMES = ["Bronze", "Silver", "Gold", "Platinum", "Diamond", "Ruby"]
TIER_SYMBOLS = ["V", "IV", "III", "II", "I"]
DEFAULT_LIMIT = 5
SOLVEDAC_API_BASE_URL = "https://solved.ac/api/v3"
state = {"http_client": None}
def get_tier_name_from_level(level: int) -> str:
    """solved.ac의 숫자 레벨을 문자열 티어 이름(예: Platinum V)으로 변환합니다."""
    if level == 0:
        return "Unrated"
    
    if 1 <= level <= 30:
        # (level - 1) // 5 = 0(Bronze) ~ 5(Ruby)
        tier_index = (level - 1) // 5
        # (level - 1) % 5 = 0(V) ~ 4(I)
        sub_tier_index = (level - 1) % 5
        
        main_tier = TIER_NAMES[tier_index]
        symbol = TIER_SYMBOLS[sub_tier_index]
        
        return f"{main_tier} {symbol}"
    
    if level == 31:
        return "Master"
    return "Unknown"
class UserShowResponse(BaseModel):
    model_config = ConfigDict(extra="ignore")
    
 
    handle: str
    tier: int   
    rating: int
    solvedCount: int
    @computed_field
    @property
    def tier_name(self) -> str:
        """
        API에서 받은 숫자 티어(self.tier)를
        헬퍼 함수를 이용해 문자열 이름(예: Platinum V)으로 변환합니다.
        """
        return get_tier_name_from_level(self.tier)
async def get_user_info_core(handle: str) -> UserShowResponse:
    client: httpx.AsyncClient = state.get("http_client")
    if not client:
        raise RuntimeError("HTTP Client is not available.")
    try:
        resp = await client.get("/user/show", params={"handle": handle})
        resp.raise_for_status()
        return UserShowResponse.model_validate(resp.json())
    except httpx.HTTPStatusError as e:
        s = e.response.status_code
        if s == 404:
            raise ValueError(
                f"사용자 핸들 '{handle}'을(를) 찾을 수 없습니다."
            ) from e
        if s == 429:
            raise RuntimeError(
                "요청이 많습니다(429). 잠시 후 다시 시도하세요."
            ) from e
        if 500 <= s < 600:
            raise RuntimeError(
                "solved.ac 서버 오류가 발생했습니다. 잠시 후 다시 시도하세요."
            ) from e
        raise
    except httpx.RequestError as e:
        raise RuntimeError(
            f"네트워크 오류로 사용자 정보를 가져오지 못했습니다: {e}"
        ) from e
        
@asynccontextmanager
async def lifespan(app: FastMCP):
    state["http_client"] = httpx.AsyncClient(
        base_url=SOLVEDAC_API_BASE_URL,
        headers={"X-Solvedac-Language": "ko"},
        timeout=10.0,
        follow_redirects=True,
    )
    try:
        yield
    finally:
        if state["http_client"]:
            await state["http_client"].aclose()
        state["http_client"] = None
@smithery.server()
def create_server():
    app = FastMCP(name="SolvedAcAPI", lifespan=lifespan)
    # ---------- 1) 도구 함수: 모델 호출 함수(실행 버튼) ----------
    @app.tool(
        name="solvedac_get_user_info",
        description="solved.ac 사용자의 레이팅/티어/푼 문제 수 조회"
    )
    async def get_user_info_tool(
        handle: str = Field(..., description="사용자 핸들")
    ):
        return await get_user_info_core(handle)
    @app.tool(
        name="solvedac_refresh_tag_catalog",
        description="solved.ac 태그 카탈로그를 강제로 다시 로드합니다.",
    )
    async def refresh_tag_catalog_tool():
        client: httpx.AsyncClient = state.get("http_client")
        raw = await sq.fetch_tag_list_core(client)
        cat = sq.build_tag_catalog(raw)
        state["tag_catalog"] = cat
        return {"refreshed": True, "count": len(cat.tags)}
    @app.tool(
        name="solvedac_search_problems",
        description=(
            "난이도/태그/키워드 검색 (예: tier:g5..p5 #dp). "
            "공식 태그 정규화(옵션) 후, -@handle/-t@handle + sort=random."
        ),
    )
    async def search_problems_tool(
        query: str = Field(
            ...,
            description="검색 쿼리 (예: 'tier:g5..p5 #dp' 또는 '완전탐색')",
        ),
        limit: int = Field(
            DEFAULT_LIMIT, ge=1, le=20, description="반환 개수"
        ),
        handle: Optional[str] = Field(
            None, description="사용자 solved.ac 핸들"
        ),
        max_attempts: int = Field(
            3, ge=1, le=5, description="랜덤 정렬 재시도(중복 제거)"
        ),
        strict_tags: bool = Field(
            False, description="모르는 태그는 에러(True)/무시(False)"
        ),
        enable_bare_infer: bool = Field(
            False, description="맨바닥 단어(예: '완전탐색')를 태그로 승격"
        ),
    ):
        q0 = (query or "").strip()
        # 0) 태그가 실제로 필요할 때만 카탈로그 사용
        if sq._needs_tag_catalog(q0, enable_bare_infer):
            cat = await sq._ensure_tag_catalog(state)
            # 1) 명시 태그(#dp, tag:dp ...) 정규화
            q_norm, unknown, _ = sq.normalize_query_tags(
                q0, cat.token_to_key, strict=strict_tags
            )
            if strict_tags and unknown:
                raise ValueError(f"알 수 없는 태그: {', '.join(unknown)}")
            # 2) 요청 시에만 bare-word 승격 (예: '완전탐색' → '#bruteforce')
            if enable_bare_infer:
                inferred = sq.infer_bare_tag_keys(q_norm, cat.token_to_key)
                if inferred:
                    q_norm = (
                        f"{q_norm} " + " ".join(f"#{k}" for k in inferred)
                    )
        else:
            # 태그 필요 없음 → 그대로 진행
            q_norm = q0
        # 3) 푼/시도 제외 필터 부착
        q = sq._augment_query_with_user_filters(q_norm, handle)
        # 4) 서버 랜덤 정렬 + 중복 제거 + 재시도
        uniq: Dict[int, Any] = {}
        last_resp = None
        for _ in range(max_attempts):
            resp = await sq.search_problems_core(
                state, query=q, page=1, sort="random"
            )
            last_resp = resp
            for it in (resp.items or []):
                pid = getattr(it, "problemId", None) or getattr(it, "id", None)
                if pid is not None and pid not in uniq:
                    uniq[pid] = it
                    if len(uniq) >= limit:
                        break
            if len(uniq) >= limit:
                break
        if last_resp is None:
            raise RuntimeError("검색 응답이 없습니다.")
        items = list(uniq.values())[:limit]
        last_resp.items = items
        last_resp.count = len(items)  # ✅ 반환 개수로 세팅 (테스트가 기대하는 계약)
        return last_resp
    
    # ---------- 2) 리소스 함수: 모델이 참고할 자료(자료실 주소) ----------
    @app.resource(
        "solvedac://tags/catalog",
        description="solved.ac 공식 태그 목록을 로드하여 서버 메모리에 보관합니다.",
    )
    async def solvedac_tag_catalog_resource():
        client: httpx.AsyncClient = state.get("http_client")
        if not client:
            raise RuntimeError("HTTP Client is not available.")
        cat = state.get("tag_catalog")
        if cat is None:
            raw = await sq.fetch_tag_list_core(client)
            cat = sq.build_tag_catalog(raw)
            state["tag_catalog"] = cat
        return {"count": len(cat.tags), "keys": [t.key for t in cat.tags]}
    @app.resource(
        "solvedac://tags/catalog",
        description="solved.ac 공식 태그 목록을 로드하여 서버 메모리에 보관합니다.",
    )
    async def solvedac_tag_catalog_resource():
        cat = await sq._ensure_tag_catalog(state)  # ← 내부 헬퍼 사용
        return {"count": len(cat.tags), "keys": [t.key for t in cat.tags]}
    
    @app.resource("solvedac://users/{handle}",
                  description="특정 solved.ac 사용자의 기본 정보(레이팅, 티어, 푼 문제 수)를 조회합니다.")
    async def get_user_info(
        handle: str = Field(..., description="조회하려는 사용자의 solved.ac 핸들/아이디"),
    ) -> UserShowResponse:
        return await get_user_info_core(handle)
    @app.resource("solvedac://problems/search/{stub}",
                  description="난이도, 태그, 키워드 등으로 solved.ac 문제를 검색합니다. (예: query='tier:s5..g5 tag:dp')")
    async def search_problems(
        query: str = Field(..., description="문제 검색 쿼리"),
        page: int = Field(1, ge=1, description="페이지 번호(1부터 시작)"),
        stub: str = Field("_", description="템플릿 제약 대응용 더미 세그먼트(무시됨)"),
    ) -> sq.ProblemSearchResponse:
        return await sq.search_problems_core(state, query=query, page=page)
    # ---------- 3) 프롬프트 함수: 모델이 참고할 행동(행동 절차 카드) ----------
    @app.prompt(
        name="solvedac.search-workflow",
        description="자연어 조건을 solved.ac 검색 쿼리로 변환하고, 해당 쿼리로 문제 후보를 검토합니다.",
        tags={"solvedac", "search"},
    )
    def search_workflow_prompt(
        natural_request: str = Field(
            ..., description="예: '실버~골드 사이 DP 5문제, 비슷한 태그는 제외'"
        ),
        page: int = Field(1, ge=1, description="검색 페이지"),
    ) -> list[PromptMessage]:
        sys = PromptMessage(
            role="assistant",
            content=TextContent(
                type="text",
                text=(
                    "You are a Solved.ac search assistant.\n"
                    "1) Convert the user's request into a precise Solved.ac query string "
                    "(e.g., `tier:g5..p5 tag:dfs -tag:greedy`).\n"
                    "2) Do NOT browse the web.\n"
                    "3) Call the MCP TOOL `solvedac_search_problems` with {query, page}.\n"
                    "4) Rank top 5 by suitability and show: problemId, titleKo, level."
                ),
            ),
        )
        usr = PromptMessage(
            role="user",
            content=TextContent(
                type="text",
                text=(
                    f"요청(자연어): {natural_request}\n"
                    f"페이지: {page}\n"
                    f"규칙: 쿼리를 먼저 제시하고, 이어서 리소스를 호출해 결과를 평가하세요."
                ),
            ),
        )
        return [sys, usr]
    get_user_info_for_test = get_user_info_core
    search_problems_for_test = sq.search_problems_core
    search_workflow_prompt_for_test = sq.search_workflow_prompt_core
    
    return app