Skip to main content
Glama

MCP YNAB Server

by klauern
server.py25.4 kB
import json import os from datetime import date, datetime, timedelta from pathlib import Path from typing import Annotated, Any, Dict, List, Optional, cast import mcp.types as types # Import MCP types from dotenv import load_dotenv from mcp.server.fastmcp import FastMCP from pydantic import Field from xdg import XDG_CONFIG_HOME from ynab.api.accounts_api import AccountsApi from ynab.api.budgets_api import BudgetsApi from ynab.api.categories_api import CategoriesApi from ynab.api.transactions_api import TransactionsApi from ynab.api_client import ApiClient from ynab.configuration import Configuration from ynab.models.account import Account from ynab.models.category import Category from ynab.models.category_group_with_categories import CategoryGroupWithCategories from ynab.models.existing_transaction import ExistingTransaction from ynab.models.new_transaction import NewTransaction from ynab.models.post_transactions_wrapper import PostTransactionsWrapper from ynab.models.put_transaction_wrapper import PutTransactionWrapper from ynab.models.transaction_detail import TransactionDetail # 1. Load environment variables load_dotenv(verbose=True) # 2. Globals / configuration ynab_api_key = os.environ.get("YNAB_API_KEY") # Set up XDG config directory CONFIG_DIR = Path(XDG_CONFIG_HOME) / "mcp-ynab" CONFIG_DIR.mkdir(parents=True, exist_ok=True) PREFERRED_BUDGET_ID_FILE = CONFIG_DIR / "preferred_budget_id.json" BUDGET_CATEGORY_CACHE_FILE = CONFIG_DIR / "budget_category_cache.json" # 3. Private helper functions async def _get_client() -> ApiClient: """Get a configured YNAB API client. Reads API key from environment variables.""" if not ynab_api_key: raise ValueError("YNAB_API_KEY not found in environment variables") configuration = Configuration(access_token=ynab_api_key) return ApiClient(configuration) class AsyncYNABClient: """Async context manager for YNAB API client.""" def __init__(self): self.client: Optional[ApiClient] = None async def __aenter__(self) -> ApiClient: self.client = await _get_client() return self.client async def __aexit__(self, exc_type, exc_val, exc_tb): if self.client: # ApiClient doesn't have a close method, but we'll keep the context manager pattern pass async def get_ynab_client() -> AsyncYNABClient: """Get an async YNAB client context manager.""" return AsyncYNABClient() def _get_empty_table(headers: List[str]) -> str: """Create an empty markdown table with just headers.""" widths = [len(h) + 2 for h in headers] header_line = "| " + " | ".join(f"{headers[i]:<{widths[i]}}" for i in range(len(headers))) + " |\n" sep_line = "|" + "|".join("-" * (widths[i] + 2) for i in range(len(headers))) + "|\n" return header_line + sep_line + "\n" def _get_column_widths(headers: List[str], rows: List[List[str]], col_count: int) -> List[int]: """Calculate column widths based on content.""" widths = [len(h) for h in headers] for row in rows: for i in range(col_count): widths[i] = max(widths[i], len(row[i])) return [w + 2 for w in widths] def _format_table_line(items: List[str], widths: List[int], alignments: List[str]) -> str: """Format a single line of the markdown table.""" line = "| " for i, item in enumerate(items): if alignments[i] == "right": line += f"{item:>{widths[i]}} | " else: line += f"{item:<{widths[i]}} | " return line.rstrip() + "\n" def _build_markdown_table( rows: List[List[str]], headers: List[str], alignments: Optional[List[str]] = None ) -> str: """Build a markdown table from rows and headers.""" if not rows: return _get_empty_table(headers) alignments = alignments if alignments is not None else ["left"] * len(headers) col_count = len(headers) widths = _get_column_widths(headers, rows, col_count) header_line = _format_table_line(headers, widths, alignments) sep_line = "|" + "|".join("-" * (w + 1) for w in widths) + "|\n" row_lines = "".join(_format_table_line(row, widths, alignments) for row in rows) return header_line + sep_line + row_lines def _format_accounts_output(accounts: List[Dict[str, Any]]) -> Dict[str, Any]: """Format account data into a user-friendly structure.""" account_groups: Dict[str, List[Dict[str, Any]]] = {} type_order = [ "checking", "savings", "creditCard", "mortgage", "autoLoan", "studentLoan", "otherAsset", "otherLiability", ] type_display_names = { "checking": "Checking Accounts", "savings": "Savings Accounts", "creditCard": "Credit Cards", "mortgage": "Mortgages", "autoLoan": "Auto Loans", "studentLoan": "Student Loans", "otherAsset": "Other Assets", "otherLiability": "Other Liabilities", } for account in accounts: if account.get("closed", False) or account.get("deleted", False): continue acct_type = account["type"] if acct_type not in account_groups: account_groups[acct_type] = [] balance = float(account["balance"]) / 1000 account_groups[acct_type].append( { "name": account["name"], "balance": f"${balance:,.2f}", "balance_raw": balance, "id": account["id"], } ) for group in account_groups.values(): group.sort(key=lambda x: abs(x["balance_raw"]), reverse=True) output: Dict[str, Any] = { "accounts": [], "summary": { "total_assets": 0.0, "total_liabilities": 0.0, "net_worth": 0.0, }, } for acct_type in type_order: if acct_type in account_groups and account_groups[acct_type]: group_data = { "type": type_display_names.get(acct_type, acct_type), "accounts": account_groups[acct_type], } group_total = sum(acct["balance_raw"] for acct in account_groups[acct_type]) group_data["total"] = f"${group_total:,.2f}" if acct_type in ["checking", "savings", "otherAsset"]: output["summary"]["total_assets"] += group_total elif acct_type in [ "creditCard", "mortgage", "autoLoan", "studentLoan", "otherLiability", ]: output["summary"]["total_liabilities"] += abs(group_total) output["accounts"].append(group_data) output["summary"]["net_worth_raw"] = ( output["summary"]["total_assets"] - output["summary"]["total_liabilities"] ) output["summary"]["total_assets"] = f"${output['summary']['total_assets']:,.2f}" output["summary"]["total_liabilities"] = f"${output['summary']['total_liabilities']:,.2f}" output["summary"]["net_worth"] = f"${output['summary']['net_worth_raw']:,.2f}" return output def _load_json_file(filename: str | Path) -> Dict[str, Any]: """Load JSON data from a file.""" try: with open(filename, "r") as f: return json.load(f) except FileNotFoundError: return {} def _save_json_file(filename: str | Path, data: Dict[str, Any]) -> None: """Save JSON data to a file.""" with open(filename, "w") as f: json.dump(data, f, indent=2) # 4. Create the MCP server instance mcp = FastMCP("YNAB") # Define resources class YNABResources: def __init__(self): self._preferred_budget_id: Optional[str] = None self._category_cache: Dict[str, List[Dict[str, Any]]] = {} self._load_data() def _load_data(self) -> None: """Load data from files.""" try: with open(PREFERRED_BUDGET_ID_FILE, "r") as f: self._preferred_budget_id = f.read().strip() or None except FileNotFoundError: self._preferred_budget_id = None try: self._category_cache = _load_json_file(BUDGET_CATEGORY_CACHE_FILE) except FileNotFoundError: self._category_cache = {} def get_preferred_budget_id(self) -> Optional[str]: """Get the preferred budget ID.""" return self._preferred_budget_id def set_preferred_budget_id(self, budget_id: str) -> None: """Set the preferred budget ID.""" self._preferred_budget_id = budget_id with open(PREFERRED_BUDGET_ID_FILE, "w") as f: f.write(budget_id) def get_cached_categories(self, budget_id: str) -> list[types.TextContent]: """Get categories from the cache formatted for MCP resources.""" cached_categories = self._category_cache.get(budget_id, []) return [ types.TextContent( type="text", text=f"{cat.get('name', 'Unnamed')} (ID: {cat.get('id', 'N/A')})" ) for cat in cached_categories ] def cache_categories(self, budget_id: str, categories: List[Dict[str, Any]]) -> None: """Cache categories for a budget ID.""" self._category_cache[budget_id] = [ { "id": cat.get("id"), "name": cat.get("name"), "group": cat.get("category_group_name"), } for cat in categories ] _save_json_file(BUDGET_CATEGORY_CACHE_FILE, self._category_cache) # Instantiate the resources ynab_resources = YNABResources() # Define resources using decorators @mcp.resource("ynab://preferences/budget_id") def get_preferred_budget_id() -> Optional[str]: """Get the preferred YNAB budget ID.""" return ynab_resources.get_preferred_budget_id() @mcp.resource("ynab://categories/{budget_id}") def get_cached_categories(budget_id: str) -> list[types.TextContent]: """Get cached categories for a budget ID.""" return ynab_resources.get_cached_categories(budget_id) # 5. Public tool functions async def _find_category_id(client: ApiClient, budget_id: str, category_name: str) -> Optional[str]: """Find a category ID by name.""" categories_api = CategoriesApi(client) categories_response = categories_api.get_categories(budget_id) categories = categories_response.data.category_groups for group in categories: for cat in group.categories: if cat.name.lower() == category_name.lower(): return cat.id return None @mcp.tool() async def create_transaction( account_id: str, amount: Annotated[float, Field(description="Amount in dollars")], payee_name: str, category_name: Optional[str] = None, memo: Optional[str] = None, ) -> Dict[str, Any]: """Create a new transaction in YNAB.""" async with await get_ynab_client() as client: transactions_api = TransactionsApi(client) budgets_api = BudgetsApi(client) amount_milliunits = int(amount * 1000) # Use preferred budget ID if available, otherwise fetch a list of budgets budget_id = ynab_resources.get_preferred_budget_id() if not budget_id: budgets_response = budgets_api.get_budgets() budget_id = budgets_response.data.budgets[0].id category_id = None if category_name: category_id = await _find_category_id(client, budget_id, category_name) # Create transaction data transaction = NewTransaction( account_id=account_id, date=date.today(), amount=amount_milliunits, payee_name=payee_name, memo=memo, category_id=category_id, ) wrapper = PostTransactionsWrapper(transaction=transaction) response = transactions_api.create_transaction(budget_id, wrapper) if response.data and response.data.transaction: return response.data.transaction.to_dict() return {} @mcp.tool() async def get_account_balance(account_id: str) -> float: """Get the current balance of a YNAB account (in dollars).""" async with await get_ynab_client() as client: accounts_api = AccountsApi(client) budgets_api = BudgetsApi(client) budgets_response = budgets_api.get_budgets() budget_id = budgets_response.data.budgets[0].id response = accounts_api.get_account_by_id(budget_id, account_id) return float(response.data.account.balance) / 1000 @mcp.tool() async def get_budgets() -> str: """List all YNAB budgets in Markdown format.""" async with await get_ynab_client() as client: budgets_api = BudgetsApi(client) budgets_response = budgets_api.get_budgets() budgets_list = budgets_response.data.budgets markdown = "# YNAB Budgets\n\n" if not budgets_list: markdown += "_No budgets found._" else: for budget in budgets_list: b = budget.to_dict() markdown += f"- **{b.get('name', 'Unnamed Budget')}** (ID: {b.get('id')})\n" return markdown @mcp.tool() async def get_accounts(budget_id: str) -> str: """List all YNAB accounts in a specific budget in Markdown format.""" async with await get_ynab_client() as client: accounts_api = AccountsApi(client) all_accounts: List[Dict[str, Any]] = [] response = accounts_api.get_accounts(budget_id) for account in response.data.accounts: if isinstance(account, Account): all_accounts.append(account.to_dict()) formatted = _format_accounts_output(all_accounts) markdown = "# YNAB Account Summary\n\n" markdown += "## Summary\n" markdown += f"- **Total Assets:** {formatted['summary']['total_assets']}\n" markdown += f"- **Total Liabilities:** {formatted['summary']['total_liabilities']}\n" markdown += f"- **Net Worth:** {formatted['summary']['net_worth']}\n\n" for group in formatted["accounts"]: markdown += f"## {group['type']}\n" markdown += f"**Group Total:** {group['total']}\n\n" rows = [] for acct in group["accounts"]: rows.append([acct["name"], acct["balance"], acct["id"]]) markdown += _build_markdown_table( rows, ["Account Name", "Balance", "ID"], ["left", "right", "left"] ) markdown += "\n" return markdown @mcp.tool() async def get_transactions(budget_id: str, account_id: str) -> str: """Get recent transactions for a specific account in a specific budget.""" async with await get_ynab_client() as client: transactions_api = TransactionsApi(client) all_transactions: List[TransactionDetail] = [] since_date = datetime.now().replace(day=1).date() response = transactions_api.get_transactions_by_account( budget_id, account_id, since_date=since_date ) all_transactions.extend(response.data.transactions) markdown = "# Recent Transactions\n\n" if not all_transactions: return markdown + "_No recent transactions found._\n" headers = ["ID", "Date", "Amount", "Payee Name", "Category Name", "Memo"] align = ["left", "left", "right", "left", "left", "left"] rows = [] for txn in all_transactions: amount_str = f"${txn.amount / 1000:,.2f}" rows.append( [ txn.id, txn.var_date.strftime("%Y-%m-%d"), amount_str, txn.payee_name or "N/A", txn.category_name or "N/A", txn.memo or "", ] ) markdown += _build_markdown_table(rows, headers, align) return markdown def _get_transaction_row( txn: TransactionDetail, account_map: Dict[str, str], filter_type: str ) -> List[str]: """Format a transaction into a row for the markdown table.""" amount_dollars = float(txn.amount) / 1000 amount_str = f"${abs(amount_dollars):,.2f}" if amount_dollars < 0: amount_str = f"-{amount_str}" status = [] if not txn.category_id: status.append("Uncategorized") if not txn.approved: status.append("Unapproved") return [ txn.id, txn.var_date.strftime("%Y-%m-%d"), account_map.get(txn.account_id, "Unknown"), amount_str, txn.payee_name or "N/A", ", ".join(status), txn.memo or "", ] def _filter_transactions( transactions: List[TransactionDetail], filter_type: str ) -> List[TransactionDetail]: """Filter transactions based on the filter type.""" needs_attention = [] for txn in transactions: if isinstance(txn, TransactionDetail): needs_category = filter_type in ["uncategorized", "both"] and not txn.category_id needs_approval = filter_type in ["unapproved", "both"] and not txn.approved if needs_category or needs_approval: needs_attention.append(txn) return needs_attention @mcp.tool() async def get_transactions_needing_attention( budget_id: str, filter_type: Annotated[ str, Field( description="Type of transactions to show. One of: 'uncategorized', 'unapproved', 'both'" ), ] = "both", days_back: Annotated[ Optional[int], Field(description="Number of days to look back (default 30, None for all)") ] = 30, ) -> str: """List transactions that need attention based on specified filter type in a YNAB budget.""" filter_type = filter_type.lower() if filter_type not in ["uncategorized", "unapproved", "both"]: return "Error: Invalid filter_type. Must be 'uncategorized', 'unapproved', or 'both'" async with await get_ynab_client() as client: transactions_api = TransactionsApi(client) accounts_api = AccountsApi(client) accounts_response = accounts_api.get_accounts(budget_id) account_map = { account.id: account.name for account in accounts_response.data.accounts if not account.closed and not account.deleted } since_date = (datetime.now() - timedelta(days=days_back)).date() if days_back else None response = transactions_api.get_transactions(budget_id, since_date=since_date) needs_attention = _filter_transactions(response.data.transactions, filter_type) markdown = f"# Transactions Needing Attention ({filter_type.title()})\n\n" if not needs_attention: return markdown + "_No transactions need attention._" markdown += "**Filters Applied:**\n" markdown += f"- Filter type: {filter_type}\n" if days_back: markdown += f"- Looking back {days_back} days\n" markdown += "\n" headers = ["ID", "Date", "Account", "Amount", "Payee", "Status", "Memo"] align = ["left", "left", "left", "right", "left", "left", "left"] rows = [_get_transaction_row(txn, account_map, filter_type) for txn in needs_attention] markdown += _build_markdown_table(rows, headers, align) return markdown @mcp.tool() def _find_transaction_by_id( transactions: List[TransactionDetail], transaction_id: str, id_type: str ) -> Optional[TransactionDetail]: """Find a transaction by its ID and ID type.""" for txn in transactions: if ( (id_type == "id" and txn.id == transaction_id) or (id_type == "import_id" and txn.import_id == transaction_id) or ( id_type == "transfer_transaction_id" and txn.transfer_transaction_id == transaction_id ) or ( id_type == "matched_transaction_id" and txn.matched_transaction_id == transaction_id ) ): return txn return None async def categorize_transaction( budget_id: str, transaction_id: str, category_id: str, id_type: str = "id", # One of: "id", "import_id", "transfer_transaction_id", "matched_transaction_id" ) -> str: """Categorize a transaction for a given YNAB budget with the provided category ID. Args: budget_id: The YNAB budget ID transaction_id: The transaction identifier category_id: The category ID to assign id_type: The type of transaction ID being provided. One of: - "id": Direct transaction ID (default) - "import_id": YNAB import ID format (YNAB:[milliunit_amount]:[iso_date]:[occurrence]) - "transfer_transaction_id": ID of a transfer transaction - "matched_transaction_id": ID of a matched transaction """ async with await get_ynab_client() as client: transactions_api = TransactionsApi(client) # Get since_date for import_id type since_date = None if id_type == "import_id" and ":" in transaction_id: try: since_date = datetime.strptime(transaction_id.split(":")[2], "%Y-%m-%d").date() except (ValueError, IndexError): pass response = transactions_api.get_transactions(budget_id, since_date=since_date) target_transaction = _find_transaction_by_id( response.data.transactions, transaction_id, id_type ) if target_transaction: wrapper = PutTransactionWrapper( transaction=ExistingTransaction( account_id=target_transaction.account_id, amount=target_transaction.amount, category_id=category_id, ) ) transactions_api.update_transaction( budget_id=budget_id, transaction_id=target_transaction.id, data=wrapper, ) return f"Transaction {transaction_id} (type: {id_type}) categorized as {category_id}." return f"Transaction {transaction_id} (type: {id_type}) not found." def _process_category_data(category: Category | Dict[str, Any]) -> tuple[str, str, float, float]: """Process category data and return tuple of (id, name, budgeted, activity).""" if isinstance(category, Category): return category.id, category.name, category.budgeted, category.activity cat_dict = cast(Dict[str, Any], category) return cat_dict["id"], cat_dict["name"], cat_dict["budgeted"], cat_dict["activity"] def _format_dollar_amount(amount: float) -> str: """Format a dollar amount with proper sign and formatting.""" amount_str = f"${abs(amount):,.2f}" return f"-{amount_str}" if amount < 0 else amount_str @mcp.tool() async def get_categories(budget_id: str) -> str: """List all transaction categories for a given YNAB budget in Markdown format.""" async with await get_ynab_client() as client: categories_api = CategoriesApi(client) response = categories_api.get_categories(budget_id) groups = response.data.category_groups markdown = "# YNAB Categories\n\n" headers = ["Category ID", "Category Name", "Budgeted", "Activity"] align = ["left", "left", "right", "right"] for group in groups: if isinstance(group, CategoryGroupWithCategories): categories_list = group.categories group_name = group.name else: group_dict = cast(Dict[str, Any], group.to_dict()) categories_list = group_dict["categories"] group_name = group_dict["name"] if not categories_list: continue markdown += f"## {group_name}\n\n" rows = [] for category in categories_list: cat_id, name, budgeted, activity = _process_category_data(category) budgeted_dollars = float(budgeted) / 1000 if budgeted else 0 activity_dollars = float(activity) / 1000 if activity else 0 rows.append( [ cat_id, name, _format_dollar_amount(budgeted_dollars), _format_dollar_amount(activity_dollars), ] ) table_md = _build_markdown_table(rows, headers, align) markdown += table_md + "\n" return markdown @mcp.tool() async def set_preferred_budget_id(budget_id: str) -> str: """Set the preferred YNAB budget ID.""" ynab_resources.set_preferred_budget_id(budget_id) return f"Preferred budget ID set to {budget_id}" @mcp.tool() async def cache_categories(budget_id: str) -> str: """Cache all categories for a given YNAB budget ID.""" async with await get_ynab_client() as client: categories_api = CategoriesApi(client) response = categories_api.get_categories(budget_id) groups = response.data.category_groups categories = [] for group in groups: if isinstance(group, CategoryGroupWithCategories): categories.extend(group.categories) ynab_resources.cache_categories(budget_id, [cat.to_dict() for cat in categories]) return f"Categories cached for budget ID {budget_id}"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/klauern/mcp-ynab'

If you have feedback or need assistance with the MCP directory API, please join our Discord server