#!/usr/bin/env python3
"""
MCP Anki Server - A proper Model Context Protocol server for Anki flashcards.
This server provides MCP-compliant tools for retrieving Anki data via JSON-RPC over stdin/stdout.
The server acts as a data bridge - Claude handles all analysis.
"""
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional
import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# Load configuration
def load_config():
"""Load configuration from config.json if available, fallback to environment variables."""
config = {
"anki_connect": {
"url": "http://localhost:8765",
"timeout": 30
},
"server": {
"name": "anki-mcp",
"log_level": "INFO"
},
"limits": {
"max_cards_per_query": 50,
"max_search_results": 100
},
"write_operations": {
"enabled": True,
"require_confirmation": ["delete_cards", "delete_deck"],
"batch_size_limit": 100,
"backup_before_delete": False
}
}
# Try to load from config.json
config_file = os.getenv("CONFIG_FILE", "config.json")
if os.path.exists(config_file):
try:
with open(config_file, 'r') as f:
file_config = json.load(f)
# Merge file config with defaults
config.update(file_config)
except Exception as e:
print(f"Warning: Could not load {config_file}: {e}")
# Override with environment variables if present
config["anki_connect"]["url"] = os.getenv("ANKI_CONNECT_URL", config["anki_connect"]["url"])
config["server"]["log_level"] = os.getenv("LOG_LEVEL", config["server"]["log_level"])
return config
# Load configuration
CONFIG = load_config()
# Configure logging
log_level = getattr(logging, CONFIG["server"]["log_level"].upper(), logging.INFO)
logging.basicConfig(level=log_level)
logger = logging.getLogger(__name__)
# Configuration variables for backward compatibility
ANKI_CONNECT_URL = CONFIG["anki_connect"]["url"]
ANKI_CONNECT_TIMEOUT = CONFIG["anki_connect"]["timeout"]
MAX_CARDS_PER_QUERY = CONFIG["limits"]["max_cards_per_query"]
WRITE_OPERATIONS_ENABLED = CONFIG["write_operations"]["enabled"]
BATCH_SIZE_LIMIT = CONFIG["write_operations"]["batch_size_limit"]
# Create MCP server instance
server = Server(CONFIG["server"]["name"])
# AnkiConnect client functions
async def anki_connect_request(action: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
"""Send a request to AnkiConnect and return the response."""
if params is None:
params = {}
payload = {
"action": action,
"version": 6,
"params": params
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(ANKI_CONNECT_URL, json=payload, timeout=ANKI_CONNECT_TIMEOUT)
response.raise_for_status()
result = response.json()
if result.get("error"):
raise Exception(f"AnkiConnect error: {result['error']}")
return result.get("result")
except httpx.RequestError as e:
raise Exception(f"Failed to connect to AnkiConnect: {str(e)}")
except Exception as e:
raise Exception(f"AnkiConnect request failed: {str(e)}")
# Helper functions for write operations
def create_success_response(operation: str, result: Any, message: str = None) -> Dict[str, Any]:
"""Create a standardized success response for write operations."""
return {
"success": True,
"operation": operation,
"result": result,
"message": message or f"{operation} completed successfully",
"timestamp": datetime.now().isoformat()
}
def create_error_response(operation: str, error_type: str, message: str, details: Dict[str, Any] = None) -> Dict[str, Any]:
"""Create a standardized error response for write operations."""
return {
"success": False,
"operation": operation,
"error": {
"type": error_type,
"message": message,
"details": details or {}
},
"timestamp": datetime.now().isoformat()
}
def validate_write_operations_enabled(operation: str) -> None:
"""Check if write operations are enabled."""
if not WRITE_OPERATIONS_ENABLED:
raise Exception(f"Write operations are disabled. Enable them in config to use {operation}.")
def validate_confirmation_required(operation: str, confirmation: bool = False) -> None:
"""Check if confirmation is required for destructive operations."""
if operation in CONFIG["write_operations"]["require_confirmation"]:
if not confirmation:
raise Exception(f"Confirmation required for {operation}. Set confirm_deletion=true to proceed.")
def validate_batch_size(items: List[Any], operation: str) -> None:
"""Validate batch operation size."""
if len(items) > BATCH_SIZE_LIMIT:
raise Exception(f"Batch size {len(items)} exceeds limit of {BATCH_SIZE_LIMIT} for {operation}.")
# MCP Tool Definitions
@server.list_tools()
async def handle_list_tools() -> List[Tool]:
"""List available MCP tools."""
return [
Tool(
name="get_card_data",
description="Retrieve detailed information about a specific Anki card including content, statistics, and scheduling data.",
inputSchema={
"type": "object",
"properties": {
"card_id": {
"type": "integer",
"description": "The ID of the Anki card to retrieve"
}
},
"required": ["card_id"]
}
),
Tool(
name="get_deck_overview",
description="Retrieve comprehensive statistics and information about an Anki deck.",
inputSchema={
"type": "object",
"properties": {
"deck_name": {
"type": "string",
"description": "The name of the Anki deck to analyze"
}
},
"required": ["deck_name"]
}
),
Tool(
name="get_deck_cards",
description="Retrieve all cards from a specified deck with their detailed information.",
inputSchema={
"type": "object",
"properties": {
"deck_name": {
"type": "string",
"description": "The name of the Anki deck"
},
"limit": {
"type": "integer",
"description": "Maximum number of cards to return (optional, default 50)",
"default": 50
}
},
"required": ["deck_name"]
}
),
Tool(
name="get_lapsed_cards",
description="Retrieve cards that were failed (lapsed) within a recent number of days.",
inputSchema={
"type": "object",
"properties": {
"deck_name": {
"type": "string",
"description": "The name of the Anki deck (use '*' for all decks)"
},
"days": {
"type": "integer",
"description": "Number of recent days to look back for lapsed cards"
}
},
"required": ["deck_name", "days"]
}
),
Tool(
name="search_cards",
description="Search for cards using Anki's search syntax.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Anki search query (e.g., 'tag:difficult', 'deck:Spanish', 'is:due')"
},
"limit": {
"type": "integer",
"description": "Maximum number of cards to return (optional, default 50)",
"default": 50
}
},
"required": ["query"]
}
),
Tool(
name="list_decks",
description="List all available Anki decks with basic information.",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
# Write Operations - Card Creation
Tool(
name="create_card",
description="Create a single flashcard in a specified deck",
inputSchema={
"type": "object",
"properties": {
"deck_name": {"type": "string", "description": "Target deck name"},
"model_name": {"type": "string", "description": "Note type/model name"},
"fields": {
"type": "object",
"description": "Field name-value pairs",
"additionalProperties": {"type": "string"}
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Optional tags for the card"
}
},
"required": ["deck_name", "model_name", "fields"]
}
),
Tool(
name="create_cards_batch",
description="Create multiple cards in a single operation for efficiency",
inputSchema={
"type": "object",
"properties": {
"cards": {
"type": "array",
"items": {
"type": "object",
"properties": {
"deck_name": {"type": "string"},
"model_name": {"type": "string"},
"fields": {"type": "object"},
"tags": {"type": "array", "items": {"type": "string"}}
},
"required": ["deck_name", "model_name", "fields"]
}
}
},
"required": ["cards"]
}
),
# Write Operations - Card Updates
Tool(
name="update_card",
description="Update fields and tags of an existing card",
inputSchema={
"type": "object",
"properties": {
"note_id": {"type": "integer", "description": "Anki note ID"},
"fields": {
"type": "object",
"description": "Fields to update (partial update supported)",
"additionalProperties": {"type": "string"}
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Complete tags array (replaces existing tags)"
}
},
"required": ["note_id"]
}
),
Tool(
name="update_cards_batch",
description="Update multiple cards efficiently",
inputSchema={
"type": "object",
"properties": {
"updates": {
"type": "array",
"items": {
"type": "object",
"properties": {
"note_id": {"type": "integer"},
"fields": {"type": "object"},
"tags": {"type": "array", "items": {"type": "string"}}
},
"required": ["note_id"]
}
}
},
"required": ["updates"]
}
),
# Write Operations - Card Management
Tool(
name="suspend_cards",
description="Suspend cards to temporarily remove them from review",
inputSchema={
"type": "object",
"properties": {
"card_ids": {
"type": "array",
"items": {"type": "integer"},
"description": "Array of card IDs to suspend"
}
},
"required": ["card_ids"]
}
),
Tool(
name="unsuspend_cards",
description="Unsuspend previously suspended cards",
inputSchema={
"type": "object",
"properties": {
"card_ids": {
"type": "array",
"items": {"type": "integer"},
"description": "Array of card IDs to unsuspend"
}
},
"required": ["card_ids"]
}
),
Tool(
name="delete_cards",
description="Permanently delete cards (requires confirmation)",
inputSchema={
"type": "object",
"properties": {
"note_ids": {
"type": "array",
"items": {"type": "integer"},
"description": "Array of note IDs to delete"
},
"confirm_deletion": {
"type": "boolean",
"description": "Must be true to confirm permanent deletion"
}
},
"required": ["note_ids", "confirm_deletion"]
}
),
# Write Operations - Deck Management
Tool(
name="create_deck",
description="Create a new deck or subdeck",
inputSchema={
"type": "object",
"properties": {
"deck_name": {
"type": "string",
"description": "Deck name (use :: for subdecks, e.g., 'Languages::Spanish')"
}
},
"required": ["deck_name"]
}
),
Tool(
name="move_cards_to_deck",
description="Move cards between decks",
inputSchema={
"type": "object",
"properties": {
"card_ids": {
"type": "array",
"items": {"type": "integer"},
"description": "Array of card IDs to move"
},
"target_deck": {
"type": "string",
"description": "Target deck name"
}
},
"required": ["card_ids", "target_deck"]
}
),
Tool(
name="delete_deck",
description="Delete a deck and optionally its cards",
inputSchema={
"type": "object",
"properties": {
"deck_name": {"type": "string"},
"delete_cards": {
"type": "boolean",
"description": "If true, delete cards; if false, move to default deck"
},
"confirm_deletion": {
"type": "boolean",
"description": "Must be true to confirm deck deletion"
}
},
"required": ["deck_name", "delete_cards", "confirm_deletion"]
}
),
# Write Operations - Tag Management
Tool(
name="add_tags_to_cards",
description="Add tags to multiple cards without replacing existing tags",
inputSchema={
"type": "object",
"properties": {
"note_ids": {
"type": "array",
"items": {"type": "integer"},
"description": "Array of note IDs to tag"
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Tags to add"
}
},
"required": ["note_ids", "tags"]
}
),
Tool(
name="remove_tags_from_cards",
description="Remove specific tags from multiple cards",
inputSchema={
"type": "object",
"properties": {
"note_ids": {
"type": "array",
"items": {"type": "integer"},
"description": "Array of note IDs"
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Tags to remove"
}
},
"required": ["note_ids", "tags"]
}
)
]
# MCP Tool Implementations
@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle MCP tool calls."""
try:
if name == "get_card_data":
card_id = arguments.get("card_id")
if not card_id:
return [TextContent(type="text", text="Error: card_id is required")]
result = await get_card_data_function(card_id)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "get_deck_overview":
deck_name = arguments.get("deck_name")
if not deck_name:
return [TextContent(type="text", text="Error: deck_name is required")]
result = await get_deck_overview_function(deck_name)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "get_deck_cards":
deck_name = arguments.get("deck_name")
limit = arguments.get("limit", MAX_CARDS_PER_QUERY)
if not deck_name:
return [TextContent(type="text", text="Error: deck_name is required")]
result = await get_deck_cards_function(deck_name, limit)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "get_lapsed_cards":
deck_name = arguments.get("deck_name")
days = arguments.get("days")
if not deck_name or days is None:
return [TextContent(type="text", text="Error: deck_name and days are required")]
result = await get_lapsed_cards_function(deck_name, days)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "search_cards":
query = arguments.get("query")
limit = arguments.get("limit", MAX_CARDS_PER_QUERY)
if not query:
return [TextContent(type="text", text="Error: query is required")]
result = await search_cards_function(query, limit)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "list_decks":
result = await list_decks_function()
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# Write Operations - Card Creation
elif name == "create_card":
result = await create_card_function(**arguments)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "create_cards_batch":
result = await create_cards_batch_function(**arguments)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# Write Operations - Card Updates
elif name == "update_card":
result = await update_card_function(**arguments)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "update_cards_batch":
result = await update_cards_batch_function(**arguments)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# Write Operations - Card Management
elif name == "suspend_cards":
result = await suspend_cards_function(**arguments)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "unsuspend_cards":
result = await unsuspend_cards_function(**arguments)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "delete_cards":
result = await delete_cards_function(**arguments)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# Write Operations - Deck Management
elif name == "create_deck":
result = await create_deck_function(**arguments)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "move_cards_to_deck":
result = await move_cards_to_deck_function(**arguments)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "delete_deck":
result = await delete_deck_function(**arguments)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# Write Operations - Tag Management
elif name == "add_tags_to_cards":
result = await add_tags_to_cards_function(**arguments)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "remove_tags_from_cards":
result = await remove_tags_from_cards_function(**arguments)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
else:
return [TextContent(type="text", text=f"Error: Unknown tool '{name}'")]
except Exception as e:
logger.error(f"Tool execution failed for {name}: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
# Data retrieval functions
async def get_card_data_function(card_id: int) -> Dict[str, Any]:
"""Retrieve detailed card information."""
try:
# Get card information
card_info = await anki_connect_request("cardsInfo", {"cards": [card_id]})
if not card_info:
return {"error": f"Card {card_id} not found", "card_id": card_id}
card_data = card_info[0]
# Get note information
note_id = card_data.get("note")
note_info = await anki_connect_request("notesInfo", {"notes": [note_id]})
note_data = note_info[0] if note_info else {}
# Get model information
model_name = note_data.get("modelName", "")
# Get deck name
deck_name = card_data.get("deckName", "")
# Structure the response
return {
"card_id": card_id,
"note_id": note_id,
"deck_name": deck_name,
"model_name": model_name,
"card_type": card_data.get("type"),
"queue": card_data.get("queue"),
"due": card_data.get("due"),
"interval": card_data.get("interval"),
"ease_factor": card_data.get("factor", 0) / 1000.0 if card_data.get("factor") else 0,
"reviews": card_data.get("reps", 0),
"lapses": card_data.get("lapses", 0),
"left": card_data.get("left", 0),
"fields": note_data.get("fields", {}),
"tags": note_data.get("tags", []),
"creation_time": card_data.get("cardId", 0),
"modification_time": note_data.get("mod", 0),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Failed to get card data for {card_id}: {e}")
return {"error": f"Failed to retrieve card {card_id}: {str(e)}", "card_id": card_id}
async def get_deck_overview_function(deck_name: str) -> Dict[str, Any]:
"""Get comprehensive deck overview."""
try:
# Get deck statistics
deck_stats = await anki_connect_request("getDeckStats", {"decks": [deck_name]})
# Get all cards in deck for additional analysis
card_ids = await anki_connect_request("findCards", {"query": f"deck:\"{deck_name}\""})
# Get sample cards for analysis
sample_cards = []
if card_ids:
sample_size = min(10, len(card_ids))
for i in range(0, sample_size):
card_data = await get_card_data_function(card_ids[i])
if "error" not in card_data:
sample_cards.append(card_data)
# Calculate additional metrics
card_types = {"new": 0, "learning": 0, "review": 0, "relearning": 0}
ease_factors = []
intervals = []
lapses_total = 0
for card_id in card_ids[:100]: # Limit to first 100 for performance
try:
card_info = await anki_connect_request("cardsInfo", {"cards": [card_id]})
if card_info:
card = card_info[0]
card_type = card.get("type", 0)
if card_type == 0:
card_types["new"] += 1
elif card_type == 1:
card_types["learning"] += 1
elif card_type == 2:
card_types["review"] += 1
elif card_type == 3:
card_types["relearning"] += 1
if card.get("factor"):
ease_factors.append(card["factor"] / 1000.0)
if card.get("interval"):
intervals.append(card["interval"])
if card.get("lapses"):
lapses_total += card["lapses"]
except:
continue
return {
"deck_name": deck_name,
"total_cards": len(card_ids),
"basic_stats": deck_stats.get(deck_name, {}) if isinstance(deck_stats, dict) else deck_stats,
"card_distribution": card_types,
"difficulty_metrics": {
"avg_ease_factor": sum(ease_factors) / len(ease_factors) if ease_factors else 0,
"min_ease_factor": min(ease_factors) if ease_factors else 0,
"max_ease_factor": max(ease_factors) if ease_factors else 0,
"avg_interval": sum(intervals) / len(intervals) if intervals else 0,
"max_interval": max(intervals) if intervals else 0,
"total_lapses": lapses_total,
"cards_analyzed": min(100, len(card_ids))
},
"sample_cards": sample_cards,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Failed to get deck overview for {deck_name}: {e}")
return {"error": f"Failed to get overview for deck '{deck_name}': {str(e)}", "deck_name": deck_name}
async def get_deck_cards_function(deck_name: str, limit: int = None) -> Dict[str, Any]:
"""Get all cards from a deck."""
try:
# Use configured default if no limit specified
if limit is None:
limit = MAX_CARDS_PER_QUERY
# Find all cards in the deck
card_ids = await anki_connect_request("findCards", {"query": f"deck:\"{deck_name}\""})
if not card_ids:
return {
"deck_name": deck_name,
"total_cards": 0,
"returned_cards": 0,
"cards": [],
"message": "No cards found in this deck.",
"timestamp": datetime.now().isoformat()
}
# Limit the number of cards
if limit:
card_ids = card_ids[:limit]
# Get detailed info for each card
cards = []
for card_id in card_ids:
try:
card_data = await get_card_data_function(card_id)
if "error" not in card_data:
cards.append(card_data)
except Exception as e:
logger.error(f"Failed to get data for card {card_id}: {e}")
continue
return {
"deck_name": deck_name,
"total_cards_in_deck": len(await anki_connect_request("findCards", {"query": f"deck:\"{deck_name}\""})),
"returned_cards": len(cards),
"cards": cards,
"limit_applied": limit,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Failed to get cards for deck {deck_name}: {e}")
return {"error": f"Failed to retrieve cards from deck '{deck_name}': {str(e)}", "deck_name": deck_name}
async def get_lapsed_cards_function(deck_name: str, days: int) -> Dict[str, Any]:
"""Get cards that were failed within the specified number of days."""
try:
# Build query for lapsed cards
if deck_name == "*":
query = f"rated:{days}:1"
else:
query = f"deck:\"{deck_name}\" rated:{days}:1"
# Find lapsed cards
card_ids = await anki_connect_request("findCards", {"query": query})
if not card_ids:
return {
"deck_name": deck_name,
"days": days,
"lapsed_cards": [],
"count": 0,
"message": "No lapsed cards found for the specified criteria.",
"timestamp": datetime.now().isoformat()
}
# Get detailed card information
cards = []
for card_id in card_ids:
try:
card_data = await get_card_data_function(card_id)
if "error" not in card_data:
cards.append(card_data)
except Exception as e:
logger.error(f"Failed to get data for lapsed card {card_id}: {e}")
continue
return {
"deck_name": deck_name,
"days": days,
"lapsed_cards": cards,
"count": len(cards),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Failed to get lapsed cards for {deck_name}: {e}")
return {"error": f"Failed to retrieve lapsed cards: {str(e)}", "deck_name": deck_name, "days": days}
async def search_cards_function(query: str, limit: int = None) -> Dict[str, Any]:
"""Search for cards using Anki's search syntax."""
try:
# Use configured default if no limit specified
if limit is None:
limit = MAX_CARDS_PER_QUERY
# Find cards matching the query
card_ids = await anki_connect_request("findCards", {"query": query})
if not card_ids:
return {
"query": query,
"cards": [],
"count": 0,
"message": "No cards found matching the search criteria.",
"timestamp": datetime.now().isoformat()
}
# Limit results
if limit:
card_ids = card_ids[:limit]
# Get detailed card information
cards = []
for card_id in card_ids:
try:
card_data = await get_card_data_function(card_id)
if "error" not in card_data:
cards.append(card_data)
except Exception as e:
logger.error(f"Failed to get data for card {card_id}: {e}")
continue
return {
"query": query,
"total_matches": len(await anki_connect_request("findCards", {"query": query})),
"returned_cards": len(cards),
"cards": cards,
"limit_applied": limit,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Failed to search cards with query '{query}': {e}")
return {"error": f"Search failed: {str(e)}", "query": query}
async def list_decks_function() -> Dict[str, Any]:
"""List all available decks."""
try:
# Get all deck names
deck_names = await anki_connect_request("deckNames")
# Get detailed info for each deck
decks = []
for deck_name in deck_names:
try:
# Get card count
card_ids = await anki_connect_request("findCards", {"query": f"deck:\"{deck_name}\""})
card_count = len(card_ids)
# Get deck stats
try:
deck_stats = await anki_connect_request("getDeckStats", {"decks": [deck_name]})
stats = deck_stats.get(deck_name, {}) if isinstance(deck_stats, dict) else {}
except:
stats = {}
decks.append({
"name": deck_name,
"card_count": card_count,
"stats": stats
})
except Exception as e:
logger.error(f"Failed to get info for deck {deck_name}: {e}")
decks.append({
"name": deck_name,
"card_count": 0,
"error": str(e)
})
return {
"decks": decks,
"total_decks": len(decks),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Failed to list decks: {e}")
return {"error": f"Failed to list decks: {str(e)}"}
# ============================================================================
# WRITE OPERATIONS - Card Creation
# ============================================================================
async def create_card_function(deck_name: str, model_name: str, fields: Dict[str, str], tags: List[str] = None) -> Dict[str, Any]:
"""Create a single flashcard in a specified deck."""
try:
validate_write_operations_enabled("create_card")
# Prepare note data
note_data = {
"deckName": deck_name,
"modelName": model_name,
"fields": fields,
"tags": tags or []
}
# Create the note
result = await anki_connect_request("addNote", {"note": note_data})
if result:
return create_success_response(
"create_card",
{"note_id": result},
f"Card created successfully in deck '{deck_name}'"
)
else:
return create_error_response("create_card", "CreationError", "Failed to create card")
except Exception as e:
logger.error(f"Failed to create card: {e}")
return create_error_response("create_card", "CreationError", str(e))
async def create_cards_batch_function(cards: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Create multiple cards in a single operation for efficiency."""
try:
validate_write_operations_enabled("create_cards_batch")
validate_batch_size(cards, "create_cards_batch")
# Prepare notes data
notes_data = []
for card in cards:
note_data = {
"deckName": card["deck_name"],
"modelName": card["model_name"],
"fields": card["fields"],
"tags": card.get("tags", [])
}
notes_data.append(note_data)
# Create all notes
result = await anki_connect_request("addNotes", {"notes": notes_data})
success_count = len([note_id for note_id in result if note_id is not None])
failure_count = len([note_id for note_id in result if note_id is None])
return create_success_response(
"create_cards_batch",
{
"note_ids": result,
"success_count": success_count,
"failure_count": failure_count,
"total_attempted": len(cards)
},
f"Batch creation completed: {success_count} succeeded, {failure_count} failed"
)
except Exception as e:
logger.error(f"Failed to create cards batch: {e}")
return create_error_response("create_cards_batch", "BatchCreationError", str(e))
# ============================================================================
# WRITE OPERATIONS - Card Updates
# ============================================================================
async def update_card_function(note_id: int, fields: Dict[str, str] = None, tags: List[str] = None) -> Dict[str, Any]:
"""Update fields and tags of an existing card."""
try:
validate_write_operations_enabled("update_card")
# Update fields if provided
if fields:
update_data = {
"note": {
"id": note_id,
"fields": fields
}
}
await anki_connect_request("updateNoteFields", update_data)
# Update tags if provided
if tags is not None:
await anki_connect_request("updateNoteTags", {
"note": note_id,
"tags": " ".join(tags)
})
return create_success_response(
"update_card",
{"note_id": note_id, "updated_fields": bool(fields), "updated_tags": tags is not None},
f"Card {note_id} updated successfully"
)
except Exception as e:
logger.error(f"Failed to update card {note_id}: {e}")
return create_error_response("update_card", "UpdateError", str(e))
async def update_cards_batch_function(updates: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Update multiple cards efficiently."""
try:
validate_write_operations_enabled("update_cards_batch")
validate_batch_size(updates, "update_cards_batch")
success_count = 0
failure_count = 0
errors = []
for update in updates:
try:
note_id = update["note_id"]
fields = update.get("fields")
tags = update.get("tags")
# Update fields if provided
if fields:
await anki_connect_request("updateNoteFields", {
"note": {"id": note_id, "fields": fields}
})
# Update tags if provided
if tags is not None:
await anki_connect_request("updateNoteTags", {
"note": note_id,
"tags": " ".join(tags)
})
success_count += 1
except Exception as e:
failure_count += 1
errors.append(f"Note {note_id}: {str(e)}")
return create_success_response(
"update_cards_batch",
{
"success_count": success_count,
"failure_count": failure_count,
"total_attempted": len(updates),
"errors": errors
},
f"Batch update completed: {success_count} succeeded, {failure_count} failed"
)
except Exception as e:
logger.error(f"Failed to update cards batch: {e}")
return create_error_response("update_cards_batch", "BatchUpdateError", str(e))
# ============================================================================
# WRITE OPERATIONS - Card Management
# ============================================================================
async def suspend_cards_function(card_ids: List[int]) -> Dict[str, Any]:
"""Suspend cards to temporarily remove them from review."""
try:
validate_write_operations_enabled("suspend_cards")
await anki_connect_request("suspend", {"cards": card_ids})
return create_success_response(
"suspend_cards",
{"suspended_count": len(card_ids), "card_ids": card_ids},
f"Successfully suspended {len(card_ids)} cards"
)
except Exception as e:
logger.error(f"Failed to suspend cards: {e}")
return create_error_response("suspend_cards", "SuspendError", str(e))
async def unsuspend_cards_function(card_ids: List[int]) -> Dict[str, Any]:
"""Unsuspend previously suspended cards."""
try:
validate_write_operations_enabled("unsuspend_cards")
await anki_connect_request("unsuspend", {"cards": card_ids})
return create_success_response(
"unsuspend_cards",
{"unsuspended_count": len(card_ids), "card_ids": card_ids},
f"Successfully unsuspended {len(card_ids)} cards"
)
except Exception as e:
logger.error(f"Failed to unsuspend cards: {e}")
return create_error_response("unsuspend_cards", "UnsuspendError", str(e))
async def delete_cards_function(note_ids: List[int], confirm_deletion: bool) -> Dict[str, Any]:
"""Permanently delete cards (requires confirmation)."""
try:
validate_write_operations_enabled("delete_cards")
validate_confirmation_required("delete_cards", confirm_deletion)
await anki_connect_request("deleteNotes", {"notes": note_ids})
return create_success_response(
"delete_cards",
{"deleted_count": len(note_ids), "note_ids": note_ids},
f"Successfully deleted {len(note_ids)} notes"
)
except Exception as e:
logger.error(f"Failed to delete cards: {e}")
return create_error_response("delete_cards", "DeletionError", str(e))
# ============================================================================
# WRITE OPERATIONS - Deck Management
# ============================================================================
async def create_deck_function(deck_name: str) -> Dict[str, Any]:
"""Create a new deck or subdeck."""
try:
validate_write_operations_enabled("create_deck")
deck_id = await anki_connect_request("createDeck", {"deck": deck_name})
return create_success_response(
"create_deck",
{"deck_id": deck_id, "deck_name": deck_name},
f"Successfully created deck '{deck_name}'"
)
except Exception as e:
logger.error(f"Failed to create deck {deck_name}: {e}")
return create_error_response("create_deck", "DeckCreationError", str(e))
async def move_cards_to_deck_function(card_ids: List[int], target_deck: str) -> Dict[str, Any]:
"""Move cards between decks."""
try:
validate_write_operations_enabled("move_cards_to_deck")
await anki_connect_request("changeDeck", {"cards": card_ids, "deck": target_deck})
return create_success_response(
"move_cards_to_deck",
{"moved_count": len(card_ids), "card_ids": card_ids, "target_deck": target_deck},
f"Successfully moved {len(card_ids)} cards to '{target_deck}'"
)
except Exception as e:
logger.error(f"Failed to move cards to deck {target_deck}: {e}")
return create_error_response("move_cards_to_deck", "MoveError", str(e))
async def delete_deck_function(deck_name: str, delete_cards: bool, confirm_deletion: bool) -> Dict[str, Any]:
"""Delete a deck and optionally its cards."""
try:
validate_write_operations_enabled("delete_deck")
validate_confirmation_required("delete_deck", confirm_deletion)
await anki_connect_request("deleteDecks", {"decks": [deck_name], "cardsToo": delete_cards})
return create_success_response(
"delete_deck",
{"deck_name": deck_name, "deleted_cards": delete_cards},
f"Successfully deleted deck '{deck_name}'" + (" and its cards" if delete_cards else "")
)
except Exception as e:
logger.error(f"Failed to delete deck {deck_name}: {e}")
return create_error_response("delete_deck", "DeckDeletionError", str(e))
# ============================================================================
# WRITE OPERATIONS - Tag Management
# ============================================================================
async def add_tags_to_cards_function(note_ids: List[int], tags: List[str]) -> Dict[str, Any]:
"""Add tags to multiple cards without replacing existing tags."""
try:
validate_write_operations_enabled("add_tags_to_cards")
await anki_connect_request("addTags", {"notes": note_ids, "tags": " ".join(tags)})
return create_success_response(
"add_tags_to_cards",
{"tagged_count": len(note_ids), "note_ids": note_ids, "tags_added": tags},
f"Successfully added tags {tags} to {len(note_ids)} notes"
)
except Exception as e:
logger.error(f"Failed to add tags to cards: {e}")
return create_error_response("add_tags_to_cards", "TaggingError", str(e))
async def remove_tags_from_cards_function(note_ids: List[int], tags: List[str]) -> Dict[str, Any]:
"""Remove specific tags from multiple cards."""
try:
validate_write_operations_enabled("remove_tags_from_cards")
await anki_connect_request("removeTags", {"notes": note_ids, "tags": " ".join(tags)})
return create_success_response(
"remove_tags_from_cards",
{"untagged_count": len(note_ids), "note_ids": note_ids, "tags_removed": tags},
f"Successfully removed tags {tags} from {len(note_ids)} notes"
)
except Exception as e:
logger.error(f"Failed to remove tags from cards: {e}")
return create_error_response("remove_tags_from_cards", "UntaggingError", str(e))
async def main():
"""Run the MCP server."""
logger.info("Starting Anki MCP Data Bridge Server...")
logger.info(f"AnkiConnect URL: {ANKI_CONNECT_URL}")
logger.info("This server provides Anki data - Claude handles all analysis")
# Test AnkiConnect connection
try:
version = await anki_connect_request("version")
logger.info(f"✅ AnkiConnect connection successful (version {version})")
except Exception as e:
logger.warning(f"⚠️ AnkiConnect connection failed: {e}")
logger.info(" Make sure Anki is running with AnkiConnect add-on installed")
logger.info("🚀 MCP Data Bridge ready - Claude Desktop can now connect")
# Run the MCP server
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())