server.py•20.4 kB
from __future__ import annotations
import json
from dataclasses import dataclass, field
from fastmcp import FastMCP
mcp = FastMCP("crossword_tools")
@dataclass(frozen=True)
class Clue:
"探索ロジックが参照する検証済みのカギ定義"
clue_id: str
direction: str
length: int
positions: tuple[tuple[int, int], ...]
text: str
@dataclass
class PuzzleState:
"グリッドとそのカギ・候補をまとめて保持する状態表現"
grid: list[list[str]] = field(default_factory=list)
clues: dict[str, Clue] = field(default_factory=dict)
candidates: dict[str, list[str]] = field(default_factory=dict)
state = PuzzleState()
FILLABLE_CELL = "?"
BLOCK_CELL = "#"
_FULLWIDTH_DIGIT_TO_ASCII = str.maketrans({chr(ord("0") + i): str(i) for i in range(10)})
def _to_fullwidth_number(n: int) -> str:
return "".join(chr(ord("0") + int(d)) for d in str(n))
def _load_grid(grid_text: str) -> list[list[str]]:
"グリッド文字列を読み込み、全角表記の入力からセル行列を構築する"
grid: list[list[str]] = []
for raw_line in grid_text.splitlines():
if FILLABLE_CELL not in raw_line and BLOCK_CELL not in raw_line:
continue
normalized_line = raw_line.translate(_FULLWIDTH_DIGIT_TO_ASCII)
parts = normalized_line.strip().split()
if not parts:
continue
if parts[0].isdigit():
tokens = parts[1:]
else:
tokens = parts
if not tokens:
continue
for token in tokens:
if token not in {FILLABLE_CELL, BLOCK_CELL}:
raise ValueError(f"未知のマス表現を検出しました: {token}")
if grid and len(tokens) != len(grid[0]):
raise ValueError("行ごとのマス数が一致しません。入力を確認してください。")
grid.append(tokens)
return grid
def _validate_clue_payload(
payload: dict[str, object],
grid: list[list[str]],
seen_ids: set[str],
) -> Clue:
"JSON 行から取得したカギ情報を検証し、内部で扱いやすい形へ変換する"
required_keys = ("id", "direction", "row", "col", "length", "clue")
for key in required_keys:
if key not in payload:
raise ValueError(f"カギ情報に {key} がありません。")
clue_id = str(payload["id"]).strip()
if not clue_id:
raise ValueError("clue_id が空です。")
if clue_id in seen_ids:
raise ValueError(f"clue_id={clue_id} が重複しています。")
direction = str(payload["direction"]).strip().lower()
if direction not in {"across", "down"}:
raise ValueError(f"clue_id={clue_id} の direction が不正です: {payload['direction']}")
try:
row = int(payload["row"])
col = int(payload["col"])
length = int(payload["length"])
except (TypeError, ValueError) as exc:
raise ValueError(f"clue_id={clue_id} の row/col/length を整数に変換できません。") from exc
if row <= 0 or col <= 0 or length <= 0:
raise ValueError(f"clue_id={clue_id} の row/col/length が不正です。")
rows = len(grid)
cols = len(grid[0]) if rows else 0
start_r = row - 1
start_c = col - 1
if start_r >= rows or start_c >= cols:
raise ValueError(f"clue_id={clue_id} の開始位置が盤面外です。")
positions: list[tuple[int, int]] = []
for offset in range(length):
r = start_r + (offset if direction == "down" else 0)
c = start_c + (offset if direction == "across" else 0)
if r >= rows or c >= cols:
raise ValueError(f"clue_id={clue_id} の単語が盤面外にはみ出します。")
if grid[r][c] == BLOCK_CELL:
raise ValueError(f"clue_id={clue_id} が黒マスを含んでいます。")
positions.append((r, c))
clue_text = str(payload["clue"]).strip()
if not clue_text:
raise ValueError(f"clue_id={clue_id} の clue が空です。")
seen_ids.add(clue_id)
return Clue(
clue_id=clue_id,
direction=direction,
length=length,
positions=tuple(positions),
text=clue_text,
)
def _load_clues(clue_text: str, grid: list[list[str]]) -> dict[str, Clue]:
"JSON Lines 形式のカギ定義を読み込み、検証した Clue オブジェクトへ変換する"
lines = [line.strip() for line in clue_text.splitlines() if line.strip()]
if not lines:
raise ValueError("カギ情報が存在しません。")
seen_ids: set[str] = set()
clues: dict[str, Clue] = {}
for line_no, raw in enumerate(lines, start=1):
try:
payload = json.loads(raw)
except json.JSONDecodeError as exc:
raise ValueError(f"{line_no} 行目のカギ情報を JSON として読み込めません。") from exc
if not isinstance(payload, dict):
raise ValueError(f"{line_no} 行目のカギ情報が辞書形式ではありません。")
clue = _validate_clue_payload(payload, grid, seen_ids)
clues[clue.clue_id] = clue
return clues
def _ensure_setup() -> None:
"セットアップ済みかを検査し、未設定なら利用者に通知する"
if not state.clues or not state.grid:
raise RuntimeError("setup を先に呼び出してください。")
@mcp.tool()
async def setup(grid_text: str, clue_text: str) -> list[list[str]]:
"""クロスワードの盤面とカギ定義を読み込み、状態を初期化する。
Args:
grid_text (str): 行番号つきの盤面テキスト。列・行番号は全角数字で表記し、
文字が入るマスは "?"、黒マスは "#" で記述する。各行のマス数が一致している
必要がある。
clue_text (str): JSON Lines 形式のカギ定義。各行は `id`/`direction`/`row`/`col`
/`length`/`clue` を持つ辞書で、`direction` は "across" か "down"。`row` と
`col` は 1 起点の正整数。
Returns:
list[list[str]]: 正規化済みセル行列。各要素は "?" または "#" のシンボル。
Raises:
ValueError: 盤面の行長不一致・未知のセル記号・カギ定義の欠損や不正値など、
入力内容が検証に失敗した場合。
Notes:
この関数を呼び出すと既存の候補リストは破棄され、状態が再初期化される。
"""
grid = _load_grid(grid_text)
clues = _load_clues(clue_text, grid)
state.grid = grid
state.clues = clues
state.candidates.clear()
return grid
@mcp.tool()
async def register_candidates(
clue_id: str,
candidates: list[str],
) -> dict[str, list[str]]:
"""指定したカギに対して文字数がマッチする候補語を追加登録する。登録済みと除外された語をまとめて返す。
Args:
clue_id (str): 登録対象のカギ ID。`setup` で読み込んだカギ定義に存在している
必要がある。前後の空白は自動で除去される。
candidates (list[str]): 追加したい候補語のリスト。空文字は許容されない。
カギの `length` と文字数が一致しない語は登録されず、除外リストに入る。
Returns:
dict[str, list[str]]: `registered` に登録後の候補語リスト(過去の登録分を含む)、
`rejected` に長さ不一致で追加できなかった語のリストを格納する辞書。
Notes:
長さが一致した語のみ状態に追加され、既存の候補リストは保持したまま追記される。
同じ語が既に登録済みの場合は無視される。
Raises:
RuntimeError: `setup` をまだ呼び出していない場合。
ValueError: `clue_id` が空、または候補語が空文字だった場合。
KeyError: 指定した `clue_id` のカギが存在しない場合。
"""
_ensure_setup()
if not clue_id:
raise ValueError("clue_id は必須です。")
clue_id = clue_id.strip()
if clue_id not in state.clues:
raise KeyError("指定された clue_id のカギが存在しません。")
clue = state.clues[clue_id]
existing = state.candidates.get(clue_id)
if existing is None:
existing = []
rejected: list[str] = []
for candidate in candidates:
word = candidate.strip()
if not word:
raise ValueError("空の候補は登録できません。")
if len(word) == clue.length:
if word not in existing:
existing.append(word)
else:
rejected.append(word)
state.candidates[clue_id] = existing
final_registered = state.candidates.get(clue_id, [])
return {"登録済みの候補語リスト": list(final_registered), "文字数がマッチせず登録されなかった候補語リスト": rejected}
@mcp.tool()
async def get_candidates(clue_id: str) -> list[str]:
"""登録済みの候補語リストを取得する。
Args:
clue_id (str): 取得対象のカギ ID。事前に `register_candidates` で候補を登録して
いる必要がある。
Returns:
list[str]: 登録済み候補語のリスト。登録時に渡した文字列を順序どおりに返す。
Raises:
ValueError: `clue_id` が空の場合。
KeyError: 指定した `clue_id` の候補が未登録の場合。
"""
if not clue_id:
raise ValueError("clue_id は必須です。")
clue_id = clue_id.strip()
if clue_id not in state.candidates:
raise KeyError("指定された clue_id の候補が登録されていません。")
return state.candidates[clue_id]
def _solve(target_info: list[tuple[str, tuple[tuple[int, int], ...], list[str]]]) -> list[dict[str, str]]:
"候補語を総当たりして交差一致を判定し、最大数のカギが一致する割当てを収集する"
cell_letters: dict[tuple[int, int], str] = {}
assignments: dict[str, str] = {}
best_assignments: list[dict[str, str]] = []
best_size = 0
signatures: set[tuple[tuple[str, str], ...]] = set()
def update_best() -> None:
nonlocal best_size
current_size = len(assignments)
signature = tuple(sorted(assignments.items()))
if current_size > best_size:
best_size = current_size
best_assignments.clear()
signatures.clear()
if current_size == best_size and signature not in signatures:
best_assignments.append(dict(assignments))
signatures.add(signature)
def backtrack(index: int) -> None:
if index == len(target_info):
update_best()
return
clue_id, positions, candidates = target_info[index]
for word in candidates:
conflict = False
placed: list[tuple[int, int]] = []
for (row, col), char in zip(positions, word):
existing = cell_letters.get((row, col))
if existing is not None and existing != char:
conflict = True
break
if conflict:
continue
assignments[clue_id] = word
for (row, col), char in zip(positions, word):
if (row, col) not in cell_letters:
cell_letters[(row, col)] = char
placed.append((row, col))
backtrack(index + 1)
for cell in placed:
del cell_letters[cell]
del assignments[clue_id]
backtrack(index + 1)
backtrack(0)
if best_size == 0:
return []
return best_assignments
@mcp.tool()
async def search_consistent_sets(target_clue_ids: list[str] | None = None) -> list[dict[str, str]]:
"""登録済み候補から交差条件を満たす割当てを探索する。
Args:
target_clue_ids (list[str] | None): 探索対象とするカギ ID のリスト。`None` または
空リストを渡した場合は、候補が登録済みのすべてのカギを対象とする。
Returns:
list[dict[str, str]]: 整合性が取れた解集合のリスト。各要素は `clue_id` をキー、
採用した候補語を値とする辞書。複数の最大解が存在する場合は重複しない
形で列挙し、整合するカギが 1 件も無い場合は空リストを返す。
Raises:
RuntimeError: `setup` が未実行、または候補語が一件も登録されていない場合。
KeyError: `target_clue_ids` に含まれる ID がカギ定義または候補登録に存在しない
場合。
"""
_ensure_setup()
if not state.candidates:
raise RuntimeError("register_candidates を先に呼び出して候補ワードを登録してください。")
if target_clue_ids is None:
target_ids = list(state.candidates.keys())
else:
target_ids = [cid.strip() for cid in target_clue_ids if cid and cid.strip()]
if not target_ids:
target_ids = list(state.candidates.keys())
unique_ids = list(dict.fromkeys(target_ids))
target_info: list[tuple[str, tuple[tuple[int, int], ...], list[str]]] = []
for clue_id in unique_ids:
if clue_id not in state.clues:
raise KeyError(f"clue_id={clue_id} のカギが存在しません。")
if clue_id not in state.candidates:
raise KeyError(f"clue_id={clue_id} の候補が登録されていません。")
clue = state.clues[clue_id]
candidates = state.candidates[clue_id]
target_info.append((clue_id, clue.positions, candidates))
return _solve(target_info)
@mcp.tool()
async def render_solution(assignments: dict[str, str]) -> str:
"""全カギの解答を盤面へ反映し、整合性チェックを通過した描画結果を返す。
Args:
assignments (dict[str, str]): `clue_id` をキーとした解答文字列の辞書。`setup`
済みのすべてのカギに対して、黒マス以外のセルを埋める語を指定する。
各文字列は純粋なひらがなのみで構成されている必要がある。
Returns:
str: 列・行番号付きで整形したグリッド文字列。交差が一致している場合のみ
返される。
Raises:
RuntimeError: `setup` が未実行の場合。
ValueError: 未指定または未知の `clue_id` がある、解答が空文字、長さ不一致、
ひらがな以外の文字が含まれている、あるいは黒マスとの衝突・既存文字との
矛盾が発生した場合。
"""
_ensure_setup()
if not assignments:
raise ValueError("assignments が空です。全てのカギに対する解答を指定してください。")
expected_ids = set(state.clues.keys())
provided_ids = set(assignments.keys())
missing = expected_ids - provided_ids
extra = provided_ids - expected_ids
if missing:
raise ValueError(f"未指定の clue_id があります: {sorted(missing)}")
if extra:
raise ValueError(f"未知の clue_id が含まれています: {sorted(extra)}")
grid = [row[:] for row in state.grid]
for clue_id, answer in assignments.items():
clue = state.clues[clue_id]
word = answer.strip()
if not word:
raise ValueError(f"clue_id={clue_id} の解答が空文字です。")
if len(word) != clue.length:
raise ValueError(f"clue_id={clue_id} の長さが一致しません: {len(word)} ≠ {clue.length}")
for char in word:
code = ord(char)
if not (0x3041 <= code <= 0x309F):
raise ValueError(f"clue_id={clue_id} の解答にひらがな以外の文字が含まれています: {char}")
for (row_idx, col_idx), char in zip(clue.positions, word):
cell = grid[row_idx][col_idx]
if cell == BLOCK_CELL:
raise ValueError(f"clue_id={clue_id} の配置が黒マスと衝突しています。")
if cell != FILLABLE_CELL and cell != char:
raise ValueError(f"位置({row_idx + 1},{col_idx + 1}) で文字が衝突しました: {cell} vs {char}")
grid[row_idx][col_idx] = char
if not grid:
return ""
width = len(grid[0])
header = " " + " ".join(_to_fullwidth_number(i + 1) for i in range(width))
lines = [header]
for idx, row in enumerate(grid, start=1):
lines.append(f"{_to_fullwidth_number(idx)} {' '.join(row)}")
return "\n".join(lines)
@mcp.prompt()
def solve_crossword(grid_path: str, clue_path: str) -> str:
"クロスワード解法時の試行手順を詳細に案内する対話用プロンプト"
prompt = f"""
あなたはクロスワードを解くアシスタントです。以下の MCP ツールを活用しながら、
カレントディレクトリのdata/フォルダ配下にある{grid_path} のグリッドと {clue_path} のカギ定義から正解と納得できる解答集を導き出してください。
グリッドは全角数字の座標で表記され、入力可能なマスは「?」、黒マスは「#」です。解答語はひらがなで揃えて扱ってください。
利用できる主なツール:
- setup(grid_text, clue_text): グリッドとカギを読み込み初期化します。まず最初に必ず実行します。
- register_candidates(clue_id, candidates): 候補語のリストをカギに追加登録します。既存候補は保持され、重複は無視されます。
- get_candidates(clue_id): あるカギに登録済みの候補語を確認します。
- search_consistent_sets([clue_ids]): 今ある候補語の組合せから整合する割当てを探索します。部分的にチェックしたい場合は ID を絞り込んでください。
- render_solution(assignments): 全カギの解答を盤面に反映し、最終確認用の描画済みグリッドを取得します。解答はひらがなで指定してください。
解法方針:
1. まず setup を実行して状態を初期化します。
2. 候補語が不足している、あるいは整合する解が見つからない場合は register_candidates を使って語を追加します。クロスワードでは様々な可能性を考慮することが重要なので、候補単語については思いつく限り多く列挙してください。
3. search_consistent_sets を使って整合性チェックを行いますが、整合結果が出ても即確定せず、解が妥当か検討してください。納得できない場合は候補語をさらに追加したり、特定のカギだけに絞って再チェックしたりします。
4. 行き詰まったら、候補語のバリエーションを広げたり、同じカギに複数セットの候補を追加した上で再び search_consistent_sets を呼び出してください。部分的に良さそうな組を見つけたら、そのカギの候補を洗い替えするのも有効です。
5. 途中で諦めず、複数回の検索と候補追加を繰り返し、納得のいく解答が得られるまで試行錯誤を続けてください。
6. 納得できる割当てを得たら、解答をすべてひらがなでそろえたうえで render_solution を呼び出し、盤面を描画して視覚的な矛盾がないか確認してください。
重要: search_consistent_sets で得られる組み合わせはあくまで整合性を満たす候補です。内容に確信が持てない場合は、さらに候補を追加し直して再実行し、より良い解を目指してください。
最終的に render_solution で得た盤面を添えて、各カギの解答と根拠、試行錯誤が必要だったポイントの2つを整理したレポートを提示してください。
"""
return prompt
if __name__ == "__main__":
mcp.run(transport="stdio")