"""Expense Tracker MCP Server - Main Entry Point"""
from pathlib import Path
from typing import Annotated
from fastmcp import FastMCP, Context
from fastmcp.exceptions import ToolError
from expense_tracker.categorizer import categorize_item
from expense_tracker.database import (
init_database,
insert_receipt,
insert_items,
query_item_history,
get_all_item_types,
)
from expense_tracker.models import LineItem
from expense_tracker.pdf_parser import parse_pdf_receipt
# Initialize FastMCP server
mcp = FastMCP(name="Expense Tracker", version="0.1.0")
# Initialize database on startup
init_database()
@mcp.tool
async def import_receipt_from_pdf(
pdf_path: Annotated[str, "Absolute path to PDF receipt file"],
ctx: Context,
) -> dict:
"""Import and parse a receipt from a PDF file.
This tool:
1. Extracts text from the PDF
2. Parses receipt metadata (store, date, totals)
3. Extracts line items with prices
4. Categorizes each item using hybrid approach (static rules + LLM)
5. Stores everything in SQLite database
Args:
pdf_path: Absolute path to the PDF receipt file
ctx: FastMCP context for logging and LLM access
Returns:
Summary of imported receipt including store, date, item count, and category breakdown
"""
try:
await ctx.info(f"Starting import of receipt: {pdf_path}")
# Validate path
path = Path(pdf_path).expanduser().resolve()
if not path.exists():
raise ToolError(f"PDF file not found: {pdf_path}")
if not path.suffix.lower() == ".pdf":
raise ToolError(f"File must be a PDF: {pdf_path}")
# Parse PDF
await ctx.info("Extracting text from PDF...")
receipt, raw_items = parse_pdf_receipt(path)
await ctx.info(
f"Parsed receipt: {receipt.store_name} on {receipt.purchase_date}"
)
await ctx.info(f"Found {len(raw_items)} line items")
# Categorize items
await ctx.info("Categorizing items...")
categorized_items = []
item_type_counts = {}
for idx, item_dict in enumerate(raw_items):
# Categorize using hybrid approach
item_type = await categorize_item(item_dict["item_name"], ctx)
# Create LineItem object
line_item = LineItem(
item_name_raw=item_dict["item_name"],
item_type=item_type,
quantity=item_dict["quantity"],
line_total=item_dict["price"],
)
categorized_items.append(line_item)
# Track category counts
item_type_counts[item_type] = item_type_counts.get(item_type, 0) + 1
await ctx.debug(
f" [{idx+1}/{len(raw_items)}] {item_dict['item_name']} -> {item_type}"
)
# Insert into database
await ctx.info("Saving to database...")
receipt_id = insert_receipt(receipt)
insert_items(receipt_id, categorized_items)
await ctx.info(f"Successfully imported receipt #{receipt_id}")
# Return summary
return {
"status": "success",
"receipt_id": receipt_id,
"store_name": receipt.store_name,
"purchase_date": receipt.purchase_date,
"total": receipt.total,
"items_count": len(categorized_items),
"item_types": item_type_counts,
"message": f"Successfully imported {len(categorized_items)} items from {receipt.store_name}",
}
except FileNotFoundError as e:
raise ToolError(f"File not found: {str(e)}")
except ValueError as e:
raise ToolError(f"Failed to parse receipt: {str(e)}")
except Exception as e:
await ctx.error(f"Unexpected error during import: {e}")
raise ToolError(f"Failed to import receipt: {str(e)}")
@mcp.tool
async def get_item_history(
item_type: Annotated[str, "Item category to query (e.g., 'milk', 'bread', 'eggs')"],
time_range_days: Annotated[
int, "Number of days to look back (default: 365)"
] = 365,
) -> dict:
"""Query purchase history for a specific item type.
Returns detailed purchase history including:
- List of all purchases with dates, stores, quantities, and prices
- Statistics: total purchases, date range, average frequency, total spent
Args:
item_type: Category to query (e.g., 'milk', 'bread', 'eggs')
time_range_days: Number of days to look back (default: 365)
Returns:
Dictionary with purchases list and aggregated statistics
"""
try:
result = query_item_history(item_type, time_range_days)
if not result["purchases"]:
return {
"item_type": item_type,
"purchases": [],
"stats": {
"total_purchases": 0,
"message": f"No purchases found for '{item_type}' in the last {time_range_days} days",
},
}
return {
"item_type": item_type,
"purchases": result["purchases"],
"stats": result["stats"],
}
except Exception as e:
raise ToolError(f"Failed to query item history: {str(e)}")
@mcp.tool
async def list_item_types() -> dict:
"""List all item types/categories in the database with statistics.
Returns summary statistics for each item type including:
- Total number of purchases
- Most recent purchase date
- Total amount spent
Useful for getting an overview of all tracked expense categories.
Returns:
Dictionary with list of all item types and their statistics
"""
try:
item_types = get_all_item_types()
if not item_types:
return {
"item_types": [],
"message": "No items in database yet. Import a receipt to get started!",
}
return {
"item_types": item_types,
"total_categories": len(item_types),
"message": f"Found {len(item_types)} different item categories",
}
except Exception as e:
raise ToolError(f"Failed to list item types: {str(e)}")
if __name__ == "__main__":
# Run the MCP server
mcp.run()