server.py•25.4 kB
import json
import os
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Annotated, Any, Dict, List, Optional, cast
import mcp.types as types # Import MCP types
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from pydantic import Field
from xdg import XDG_CONFIG_HOME
from ynab.api.accounts_api import AccountsApi
from ynab.api.budgets_api import BudgetsApi
from ynab.api.categories_api import CategoriesApi
from ynab.api.transactions_api import TransactionsApi
from ynab.api_client import ApiClient
from ynab.configuration import Configuration
from ynab.models.account import Account
from ynab.models.category import Category
from ynab.models.category_group_with_categories import CategoryGroupWithCategories
from ynab.models.existing_transaction import ExistingTransaction
from ynab.models.new_transaction import NewTransaction
from ynab.models.post_transactions_wrapper import PostTransactionsWrapper
from ynab.models.put_transaction_wrapper import PutTransactionWrapper
from ynab.models.transaction_detail import TransactionDetail
# 1. Load environment variables
load_dotenv(verbose=True)
# 2. Globals / configuration
ynab_api_key = os.environ.get("YNAB_API_KEY")
# Set up XDG config directory
CONFIG_DIR = Path(XDG_CONFIG_HOME) / "mcp-ynab"
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
PREFERRED_BUDGET_ID_FILE = CONFIG_DIR / "preferred_budget_id.json"
BUDGET_CATEGORY_CACHE_FILE = CONFIG_DIR / "budget_category_cache.json"
# 3. Private helper functions
async def _get_client() -> ApiClient:
"""Get a configured YNAB API client. Reads API key from environment variables."""
if not ynab_api_key:
raise ValueError("YNAB_API_KEY not found in environment variables")
configuration = Configuration(access_token=ynab_api_key)
return ApiClient(configuration)
class AsyncYNABClient:
"""Async context manager for YNAB API client."""
def __init__(self):
self.client: Optional[ApiClient] = None
async def __aenter__(self) -> ApiClient:
self.client = await _get_client()
return self.client
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.client:
# ApiClient doesn't have a close method, but we'll keep the context manager pattern
pass
async def get_ynab_client() -> AsyncYNABClient:
"""Get an async YNAB client context manager."""
return AsyncYNABClient()
def _get_empty_table(headers: List[str]) -> str:
"""Create an empty markdown table with just headers."""
widths = [len(h) + 2 for h in headers]
header_line = "| " + " | ".join(f"{headers[i]:<{widths[i]}}" for i in range(len(headers))) + " |\n"
sep_line = "|" + "|".join("-" * (widths[i] + 2) for i in range(len(headers))) + "|\n"
return header_line + sep_line + "\n"
def _get_column_widths(headers: List[str], rows: List[List[str]], col_count: int) -> List[int]:
"""Calculate column widths based on content."""
widths = [len(h) for h in headers]
for row in rows:
for i in range(col_count):
widths[i] = max(widths[i], len(row[i]))
return [w + 2 for w in widths]
def _format_table_line(items: List[str], widths: List[int], alignments: List[str]) -> str:
"""Format a single line of the markdown table."""
line = "| "
for i, item in enumerate(items):
if alignments[i] == "right":
line += f"{item:>{widths[i]}} | "
else:
line += f"{item:<{widths[i]}} | "
return line.rstrip() + "\n"
def _build_markdown_table(
rows: List[List[str]], headers: List[str], alignments: Optional[List[str]] = None
) -> str:
"""Build a markdown table from rows and headers."""
if not rows:
return _get_empty_table(headers)
alignments = alignments if alignments is not None else ["left"] * len(headers)
col_count = len(headers)
widths = _get_column_widths(headers, rows, col_count)
header_line = _format_table_line(headers, widths, alignments)
sep_line = "|" + "|".join("-" * (w + 1) for w in widths) + "|\n"
row_lines = "".join(_format_table_line(row, widths, alignments) for row in rows)
return header_line + sep_line + row_lines
def _format_accounts_output(accounts: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Format account data into a user-friendly structure."""
account_groups: Dict[str, List[Dict[str, Any]]] = {}
type_order = [
"checking",
"savings",
"creditCard",
"mortgage",
"autoLoan",
"studentLoan",
"otherAsset",
"otherLiability",
]
type_display_names = {
"checking": "Checking Accounts",
"savings": "Savings Accounts",
"creditCard": "Credit Cards",
"mortgage": "Mortgages",
"autoLoan": "Auto Loans",
"studentLoan": "Student Loans",
"otherAsset": "Other Assets",
"otherLiability": "Other Liabilities",
}
for account in accounts:
if account.get("closed", False) or account.get("deleted", False):
continue
acct_type = account["type"]
if acct_type not in account_groups:
account_groups[acct_type] = []
balance = float(account["balance"]) / 1000
account_groups[acct_type].append(
{
"name": account["name"],
"balance": f"${balance:,.2f}",
"balance_raw": balance,
"id": account["id"],
}
)
for group in account_groups.values():
group.sort(key=lambda x: abs(x["balance_raw"]), reverse=True)
output: Dict[str, Any] = {
"accounts": [],
"summary": {
"total_assets": 0.0,
"total_liabilities": 0.0,
"net_worth": 0.0,
},
}
for acct_type in type_order:
if acct_type in account_groups and account_groups[acct_type]:
group_data = {
"type": type_display_names.get(acct_type, acct_type),
"accounts": account_groups[acct_type],
}
group_total = sum(acct["balance_raw"] for acct in account_groups[acct_type])
group_data["total"] = f"${group_total:,.2f}"
if acct_type in ["checking", "savings", "otherAsset"]:
output["summary"]["total_assets"] += group_total
elif acct_type in [
"creditCard",
"mortgage",
"autoLoan",
"studentLoan",
"otherLiability",
]:
output["summary"]["total_liabilities"] += abs(group_total)
output["accounts"].append(group_data)
output["summary"]["net_worth_raw"] = (
output["summary"]["total_assets"] - output["summary"]["total_liabilities"]
)
output["summary"]["total_assets"] = f"${output['summary']['total_assets']:,.2f}"
output["summary"]["total_liabilities"] = f"${output['summary']['total_liabilities']:,.2f}"
output["summary"]["net_worth"] = f"${output['summary']['net_worth_raw']:,.2f}"
return output
def _load_json_file(filename: str | Path) -> Dict[str, Any]:
"""Load JSON data from a file."""
try:
with open(filename, "r") as f:
return json.load(f)
except FileNotFoundError:
return {}
def _save_json_file(filename: str | Path, data: Dict[str, Any]) -> None:
"""Save JSON data to a file."""
with open(filename, "w") as f:
json.dump(data, f, indent=2)
# 4. Create the MCP server instance
mcp = FastMCP("YNAB")
# Define resources
class YNABResources:
def __init__(self):
self._preferred_budget_id: Optional[str] = None
self._category_cache: Dict[str, List[Dict[str, Any]]] = {}
self._load_data()
def _load_data(self) -> None:
"""Load data from files."""
try:
with open(PREFERRED_BUDGET_ID_FILE, "r") as f:
self._preferred_budget_id = f.read().strip() or None
except FileNotFoundError:
self._preferred_budget_id = None
try:
self._category_cache = _load_json_file(BUDGET_CATEGORY_CACHE_FILE)
except FileNotFoundError:
self._category_cache = {}
def get_preferred_budget_id(self) -> Optional[str]:
"""Get the preferred budget ID."""
return self._preferred_budget_id
def set_preferred_budget_id(self, budget_id: str) -> None:
"""Set the preferred budget ID."""
self._preferred_budget_id = budget_id
with open(PREFERRED_BUDGET_ID_FILE, "w") as f:
f.write(budget_id)
def get_cached_categories(self, budget_id: str) -> list[types.TextContent]:
"""Get categories from the cache formatted for MCP resources."""
cached_categories = self._category_cache.get(budget_id, [])
return [
types.TextContent(
type="text", text=f"{cat.get('name', 'Unnamed')} (ID: {cat.get('id', 'N/A')})"
)
for cat in cached_categories
]
def cache_categories(self, budget_id: str, categories: List[Dict[str, Any]]) -> None:
"""Cache categories for a budget ID."""
self._category_cache[budget_id] = [
{
"id": cat.get("id"),
"name": cat.get("name"),
"group": cat.get("category_group_name"),
}
for cat in categories
]
_save_json_file(BUDGET_CATEGORY_CACHE_FILE, self._category_cache)
# Instantiate the resources
ynab_resources = YNABResources()
# Define resources using decorators
@mcp.resource("ynab://preferences/budget_id")
def get_preferred_budget_id() -> Optional[str]:
"""Get the preferred YNAB budget ID."""
return ynab_resources.get_preferred_budget_id()
@mcp.resource("ynab://categories/{budget_id}")
def get_cached_categories(budget_id: str) -> list[types.TextContent]:
"""Get cached categories for a budget ID."""
return ynab_resources.get_cached_categories(budget_id)
# 5. Public tool functions
async def _find_category_id(client: ApiClient, budget_id: str, category_name: str) -> Optional[str]:
"""Find a category ID by name."""
categories_api = CategoriesApi(client)
categories_response = categories_api.get_categories(budget_id)
categories = categories_response.data.category_groups
for group in categories:
for cat in group.categories:
if cat.name.lower() == category_name.lower():
return cat.id
return None
@mcp.tool()
async def create_transaction(
account_id: str,
amount: Annotated[float, Field(description="Amount in dollars")],
payee_name: str,
category_name: Optional[str] = None,
memo: Optional[str] = None,
) -> Dict[str, Any]:
"""Create a new transaction in YNAB."""
async with await get_ynab_client() as client:
transactions_api = TransactionsApi(client)
budgets_api = BudgetsApi(client)
amount_milliunits = int(amount * 1000)
# Use preferred budget ID if available, otherwise fetch a list of budgets
budget_id = ynab_resources.get_preferred_budget_id()
if not budget_id:
budgets_response = budgets_api.get_budgets()
budget_id = budgets_response.data.budgets[0].id
category_id = None
if category_name:
category_id = await _find_category_id(client, budget_id, category_name)
# Create transaction data
transaction = NewTransaction(
account_id=account_id,
date=date.today(),
amount=amount_milliunits,
payee_name=payee_name,
memo=memo,
category_id=category_id,
)
wrapper = PostTransactionsWrapper(transaction=transaction)
response = transactions_api.create_transaction(budget_id, wrapper)
if response.data and response.data.transaction:
return response.data.transaction.to_dict()
return {}
@mcp.tool()
async def get_account_balance(account_id: str) -> float:
"""Get the current balance of a YNAB account (in dollars)."""
async with await get_ynab_client() as client:
accounts_api = AccountsApi(client)
budgets_api = BudgetsApi(client)
budgets_response = budgets_api.get_budgets()
budget_id = budgets_response.data.budgets[0].id
response = accounts_api.get_account_by_id(budget_id, account_id)
return float(response.data.account.balance) / 1000
@mcp.tool()
async def get_budgets() -> str:
"""List all YNAB budgets in Markdown format."""
async with await get_ynab_client() as client:
budgets_api = BudgetsApi(client)
budgets_response = budgets_api.get_budgets()
budgets_list = budgets_response.data.budgets
markdown = "# YNAB Budgets\n\n"
if not budgets_list:
markdown += "_No budgets found._"
else:
for budget in budgets_list:
b = budget.to_dict()
markdown += f"- **{b.get('name', 'Unnamed Budget')}** (ID: {b.get('id')})\n"
return markdown
@mcp.tool()
async def get_accounts(budget_id: str) -> str:
"""List all YNAB accounts in a specific budget in Markdown format."""
async with await get_ynab_client() as client:
accounts_api = AccountsApi(client)
all_accounts: List[Dict[str, Any]] = []
response = accounts_api.get_accounts(budget_id)
for account in response.data.accounts:
if isinstance(account, Account):
all_accounts.append(account.to_dict())
formatted = _format_accounts_output(all_accounts)
markdown = "# YNAB Account Summary\n\n"
markdown += "## Summary\n"
markdown += f"- **Total Assets:** {formatted['summary']['total_assets']}\n"
markdown += f"- **Total Liabilities:** {formatted['summary']['total_liabilities']}\n"
markdown += f"- **Net Worth:** {formatted['summary']['net_worth']}\n\n"
for group in formatted["accounts"]:
markdown += f"## {group['type']}\n"
markdown += f"**Group Total:** {group['total']}\n\n"
rows = []
for acct in group["accounts"]:
rows.append([acct["name"], acct["balance"], acct["id"]])
markdown += _build_markdown_table(
rows, ["Account Name", "Balance", "ID"], ["left", "right", "left"]
)
markdown += "\n"
return markdown
@mcp.tool()
async def get_transactions(budget_id: str, account_id: str) -> str:
"""Get recent transactions for a specific account in a specific budget."""
async with await get_ynab_client() as client:
transactions_api = TransactionsApi(client)
all_transactions: List[TransactionDetail] = []
since_date = datetime.now().replace(day=1).date()
response = transactions_api.get_transactions_by_account(
budget_id, account_id, since_date=since_date
)
all_transactions.extend(response.data.transactions)
markdown = "# Recent Transactions\n\n"
if not all_transactions:
return markdown + "_No recent transactions found._\n"
headers = ["ID", "Date", "Amount", "Payee Name", "Category Name", "Memo"]
align = ["left", "left", "right", "left", "left", "left"]
rows = []
for txn in all_transactions:
amount_str = f"${txn.amount / 1000:,.2f}"
rows.append(
[
txn.id,
txn.var_date.strftime("%Y-%m-%d"),
amount_str,
txn.payee_name or "N/A",
txn.category_name or "N/A",
txn.memo or "",
]
)
markdown += _build_markdown_table(rows, headers, align)
return markdown
def _get_transaction_row(
txn: TransactionDetail, account_map: Dict[str, str], filter_type: str
) -> List[str]:
"""Format a transaction into a row for the markdown table."""
amount_dollars = float(txn.amount) / 1000
amount_str = f"${abs(amount_dollars):,.2f}"
if amount_dollars < 0:
amount_str = f"-{amount_str}"
status = []
if not txn.category_id:
status.append("Uncategorized")
if not txn.approved:
status.append("Unapproved")
return [
txn.id,
txn.var_date.strftime("%Y-%m-%d"),
account_map.get(txn.account_id, "Unknown"),
amount_str,
txn.payee_name or "N/A",
", ".join(status),
txn.memo or "",
]
def _filter_transactions(
transactions: List[TransactionDetail], filter_type: str
) -> List[TransactionDetail]:
"""Filter transactions based on the filter type."""
needs_attention = []
for txn in transactions:
if isinstance(txn, TransactionDetail):
needs_category = filter_type in ["uncategorized", "both"] and not txn.category_id
needs_approval = filter_type in ["unapproved", "both"] and not txn.approved
if needs_category or needs_approval:
needs_attention.append(txn)
return needs_attention
@mcp.tool()
async def get_transactions_needing_attention(
budget_id: str,
filter_type: Annotated[
str,
Field(
description="Type of transactions to show. One of: 'uncategorized', 'unapproved', 'both'"
),
] = "both",
days_back: Annotated[
Optional[int], Field(description="Number of days to look back (default 30, None for all)")
] = 30,
) -> str:
"""List transactions that need attention based on specified filter type in a YNAB budget."""
filter_type = filter_type.lower()
if filter_type not in ["uncategorized", "unapproved", "both"]:
return "Error: Invalid filter_type. Must be 'uncategorized', 'unapproved', or 'both'"
async with await get_ynab_client() as client:
transactions_api = TransactionsApi(client)
accounts_api = AccountsApi(client)
accounts_response = accounts_api.get_accounts(budget_id)
account_map = {
account.id: account.name
for account in accounts_response.data.accounts
if not account.closed and not account.deleted
}
since_date = (datetime.now() - timedelta(days=days_back)).date() if days_back else None
response = transactions_api.get_transactions(budget_id, since_date=since_date)
needs_attention = _filter_transactions(response.data.transactions, filter_type)
markdown = f"# Transactions Needing Attention ({filter_type.title()})\n\n"
if not needs_attention:
return markdown + "_No transactions need attention._"
markdown += "**Filters Applied:**\n"
markdown += f"- Filter type: {filter_type}\n"
if days_back:
markdown += f"- Looking back {days_back} days\n"
markdown += "\n"
headers = ["ID", "Date", "Account", "Amount", "Payee", "Status", "Memo"]
align = ["left", "left", "left", "right", "left", "left", "left"]
rows = [_get_transaction_row(txn, account_map, filter_type) for txn in needs_attention]
markdown += _build_markdown_table(rows, headers, align)
return markdown
@mcp.tool()
def _find_transaction_by_id(
transactions: List[TransactionDetail], transaction_id: str, id_type: str
) -> Optional[TransactionDetail]:
"""Find a transaction by its ID and ID type."""
for txn in transactions:
if (
(id_type == "id" and txn.id == transaction_id)
or (id_type == "import_id" and txn.import_id == transaction_id)
or (
id_type == "transfer_transaction_id"
and txn.transfer_transaction_id == transaction_id
)
or (
id_type == "matched_transaction_id" and txn.matched_transaction_id == transaction_id
)
):
return txn
return None
async def categorize_transaction(
budget_id: str,
transaction_id: str,
category_id: str,
id_type: str = "id", # One of: "id", "import_id", "transfer_transaction_id", "matched_transaction_id"
) -> str:
"""Categorize a transaction for a given YNAB budget with the provided category ID.
Args:
budget_id: The YNAB budget ID
transaction_id: The transaction identifier
category_id: The category ID to assign
id_type: The type of transaction ID being provided. One of:
- "id": Direct transaction ID (default)
- "import_id": YNAB import ID format (YNAB:[milliunit_amount]:[iso_date]:[occurrence])
- "transfer_transaction_id": ID of a transfer transaction
- "matched_transaction_id": ID of a matched transaction
"""
async with await get_ynab_client() as client:
transactions_api = TransactionsApi(client)
# Get since_date for import_id type
since_date = None
if id_type == "import_id" and ":" in transaction_id:
try:
since_date = datetime.strptime(transaction_id.split(":")[2], "%Y-%m-%d").date()
except (ValueError, IndexError):
pass
response = transactions_api.get_transactions(budget_id, since_date=since_date)
target_transaction = _find_transaction_by_id(
response.data.transactions, transaction_id, id_type
)
if target_transaction:
wrapper = PutTransactionWrapper(
transaction=ExistingTransaction(
account_id=target_transaction.account_id,
amount=target_transaction.amount,
category_id=category_id,
)
)
transactions_api.update_transaction(
budget_id=budget_id,
transaction_id=target_transaction.id,
data=wrapper,
)
return f"Transaction {transaction_id} (type: {id_type}) categorized as {category_id}."
return f"Transaction {transaction_id} (type: {id_type}) not found."
def _process_category_data(category: Category | Dict[str, Any]) -> tuple[str, str, float, float]:
"""Process category data and return tuple of (id, name, budgeted, activity)."""
if isinstance(category, Category):
return category.id, category.name, category.budgeted, category.activity
cat_dict = cast(Dict[str, Any], category)
return cat_dict["id"], cat_dict["name"], cat_dict["budgeted"], cat_dict["activity"]
def _format_dollar_amount(amount: float) -> str:
"""Format a dollar amount with proper sign and formatting."""
amount_str = f"${abs(amount):,.2f}"
return f"-{amount_str}" if amount < 0 else amount_str
@mcp.tool()
async def get_categories(budget_id: str) -> str:
"""List all transaction categories for a given YNAB budget in Markdown format."""
async with await get_ynab_client() as client:
categories_api = CategoriesApi(client)
response = categories_api.get_categories(budget_id)
groups = response.data.category_groups
markdown = "# YNAB Categories\n\n"
headers = ["Category ID", "Category Name", "Budgeted", "Activity"]
align = ["left", "left", "right", "right"]
for group in groups:
if isinstance(group, CategoryGroupWithCategories):
categories_list = group.categories
group_name = group.name
else:
group_dict = cast(Dict[str, Any], group.to_dict())
categories_list = group_dict["categories"]
group_name = group_dict["name"]
if not categories_list:
continue
markdown += f"## {group_name}\n\n"
rows = []
for category in categories_list:
cat_id, name, budgeted, activity = _process_category_data(category)
budgeted_dollars = float(budgeted) / 1000 if budgeted else 0
activity_dollars = float(activity) / 1000 if activity else 0
rows.append(
[
cat_id,
name,
_format_dollar_amount(budgeted_dollars),
_format_dollar_amount(activity_dollars),
]
)
table_md = _build_markdown_table(rows, headers, align)
markdown += table_md + "\n"
return markdown
@mcp.tool()
async def set_preferred_budget_id(budget_id: str) -> str:
"""Set the preferred YNAB budget ID."""
ynab_resources.set_preferred_budget_id(budget_id)
return f"Preferred budget ID set to {budget_id}"
@mcp.tool()
async def cache_categories(budget_id: str) -> str:
"""Cache all categories for a given YNAB budget ID."""
async with await get_ynab_client() as client:
categories_api = CategoriesApi(client)
response = categories_api.get_categories(budget_id)
groups = response.data.category_groups
categories = []
for group in groups:
if isinstance(group, CategoryGroupWithCategories):
categories.extend(group.categories)
ynab_resources.cache_categories(budget_id, [cat.to_dict() for cat in categories])
return f"Categories cached for budget ID {budget_id}"