# remote.py (8.32 kB)
from fastmcp import FastMCP
import os
import aiosqlite
import asyncio
# Directory containing this module; the data files below live next to it.
_BASE_DIR = os.path.dirname(__file__)

# MCP server object on which the tools and resources in this file register.
mcp = FastMCP(name="expense_tracker")

# SQLite database file that stores the expenses table.
DB_PATH = os.path.join(_BASE_DIR, "expenses.db")
# JSON file returned by the expense://categories resource.
CATEGORIES_PATH = os.path.join(_BASE_DIR, "categories.json")
async def init_db():
    """Create the ``expenses`` table in the SQLite database if it is missing.

    Connects to the database at ``DB_PATH``, runs a
    ``CREATE TABLE IF NOT EXISTS`` statement and commits, so calling this
    when the table already exists leaves it untouched.
    """
    create_table_sql = (
        "\n"
        "CREATE TABLE IF NOT EXISTS expenses (\n"
        "id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
        "user_id TEXT NOT NULL,\n"
        "date TEXT NOT NULL,\n"
        "amount REAL NOT NULL,\n"
        "category TEXT NOT NULL,\n"
        "subcategory TEXT DEFAULT NULL,\n"
        "note TEXT\n"
        ")\n"
    )
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(create_table_sql)
        await db.commit()
def _init_db_on_startup():
    """Make sure ``init_db`` actually runs when this module is loaded.

    Scheduling the coroutine with ``asyncio.get_event_loop().create_task``
    at import time only queues it on a loop that nothing is running yet, so
    the table creation is never guaranteed to execute (and newer Python
    versions warn or raise when no current loop exists).

    Returns:
        asyncio.Task | None: The scheduled task when a loop is already
        running (the module was imported from async code), otherwise
        ``None`` because initialization has already completed.
    """
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running (normal import / script start-up): run the
        # initialization to completion right now.
        asyncio.run(init_db())
        return None
    # Imported from inside a running loop: asyncio.run() is not allowed
    # there, so schedule the work on that loop instead.
    return running_loop.create_task(init_db())


# Ensure DB initialized on startup. The task (if any) is kept in a
# module-level name so it cannot be garbage-collected before it finishes.
_init_db_task = _init_db_on_startup()
@mcp.tool
async def add_expense(
    user_id: str,
    date: str,
    amount: float,
    category: str,
    subcategory: str | None = None,
    note: str | None = None,
):
    """Record a new expense for a user.

    Args:
        user_id (str): ID of the user who owns the expense
        date (str): Date of the expense in YYYY-MM-DD format
        amount (float): Amount of the expense
        category (str): Category of the expense
        subcategory (str|None): Subcategory of the expense (optional)
        note (str|None): Free-form note about the expense (optional)
    Returns:
        dict: ``status``, the ``id`` of the inserted row, and a ``message``
    """
    insert_sql = (
        "\n"
        "INSERT INTO expenses (user_id, date, amount, category, subcategory, note)\n"
        "VALUES (?, ?, ?, ?, ?, ?)\n"
    )
    params = (user_id, date, amount, category, subcategory, note)
    async with aiosqlite.connect(DB_PATH) as db:
        result = await db.execute(insert_sql, params)
        await db.commit()
        new_id = result.lastrowid
    return {"status": "ok", "id": new_id, "message": "Expense added successfully"}
@mcp.tool
async def list_expenses(user_id: str):
    """Return every expense stored for a user, newest date first.

    Args:
        user_id (str): ID of the user
    Returns:
        list: One dict per expense with the keys ``id``, ``user_id``,
        ``date``, ``amount``, ``category``, ``subcategory`` and ``note``
    """
    # Output keys, listed in the positional order they are read from each row.
    keys = ("id", "user_id", "date", "amount", "category", "subcategory", "note")
    query = 'SELECT * FROM expenses WHERE user_id = ? ORDER BY date DESC'
    async with aiosqlite.connect(DB_PATH) as db:
        result = await db.execute(query, (user_id,))
        records = await result.fetchall()
    return [
        {key: record[pos] for pos, key in enumerate(keys)}
        for record in records
    ]
@mcp.tool
async def list_expenses_in_range(user_id: str, start_date: str, end_date: str):
    """Return a user's expenses whose date lies in a range, newest first.

    The range is evaluated with SQL ``BETWEEN``, so both bounds are inclusive.

    Args:
        user_id (str): ID of the user
        start_date (str): Start date in YYYY-MM-DD format
        end_date (str): End date in YYYY-MM-DD format
    Returns:
        list: One dict per matching expense with the keys ``id``,
        ``user_id``, ``date``, ``amount``, ``category``, ``subcategory``
        and ``note``
    """
    # Output keys, listed in the positional order they are read from each row.
    keys = ("id", "user_id", "date", "amount", "category", "subcategory", "note")
    query = """SELECT * FROM expenses WHERE user_id = ? AND date BETWEEN ? AND ? ORDER BY date DESC"""
    bounds = (user_id, start_date, end_date)
    async with aiosqlite.connect(DB_PATH) as db:
        result = await db.execute(query, bounds)
        records = await result.fetchall()
    return [
        {key: record[pos] for pos, key in enumerate(keys)}
        for record in records
    ]
@mcp.tool
async def delete_expense(user_id: str, expense_id: int):
    """Delete an expense entry by ID

    Only a row that matches both ``expense_id`` and ``user_id`` is removed,
    so a user cannot delete another user's expense.

    Args:
        user_id (str): ID of the user
        expense_id (int): ID of the expense to delete
    Returns:
        dict: ``{"status": "ok", ...}`` when a row was deleted, or
        ``{"status": "error", "message": "Expense not found"}`` when no row
        matched the given ID for this user
    """
    async with aiosqlite.connect(DB_PATH) as conn:
        cursor = await conn.execute(
            'DELETE FROM expenses WHERE id = ? AND user_id = ?',
            (expense_id, user_id),
        )
        await conn.commit()
        deleted_rows = cursor.rowcount
    # A DELETE that matched nothing must not be reported as a success.
    if deleted_rows == 0:
        return {"status": "error", "message": "Expense not found"}
    return {"status": "ok", "message": "Expense deleted successfully"}
@mcp.tool
async def get_expense(user_id: str, expense_id: int):
    """Fetch a single expense that belongs to a user.

    Args:
        user_id (str): ID of the user
        expense_id (int): ID of the expense to retrieve
    Returns:
        dict: The expense with the keys ``id``, ``user_id``, ``date``,
        ``amount``, ``category``, ``subcategory`` and ``note``, or a
        ``status``/``message`` error dict when no matching row exists
    """
    query = 'SELECT * FROM expenses WHERE id = ? AND user_id = ?'
    async with aiosqlite.connect(DB_PATH) as db:
        result = await db.execute(query, (expense_id, user_id))
        record = await result.fetchone()
    if not record:
        return {"status": "error", "message": "Expense not found"}
    # Output keys, listed in the positional order they are read from the row.
    keys = ("id", "user_id", "date", "amount", "category", "subcategory", "note")
    return {key: record[pos] for pos, key in enumerate(keys)}
@mcp.tool
async def update_expense(
    user_id: str,
    expense_id: int,
    date: str | None = None,
    amount: float | None = None,
    category: str | None = None,
    subcategory: str | None = None,
    note: str | None = None,
):
    """Update an existing expense entry by ID

    Only the fields passed with a non-``None`` value are changed. The row
    must match both ``expense_id`` and ``user_id``.

    Args:
        user_id (str): ID of the user
        expense_id (int): ID of the expense to update
        date (str|None): New date in YYYY-MM-DD format (optional)
        amount (float|None): New amount (optional)
        category (str|None): New category (optional)
        subcategory (str|None): New subcategory (optional)
        note (str|None): New note (optional)
    Returns:
        dict: ``{"status": "ok", ...}`` when a row was updated;
        ``{"status": "error", ...}`` when no field was supplied or when no
        expense with this ID exists for the user
    """
    candidates = {
        "date": date,
        "amount": amount,
        "category": category,
        "subcategory": subcategory,
        "note": note,
    }
    changes = {column: value for column, value in candidates.items() if value is not None}
    # Without at least one assignment the statement would read
    # "UPDATE expenses SET  WHERE ..." and fail with a SQL syntax error.
    if not changes:
        return {"status": "error", "message": "No fields provided to update"}

    # Column names come from the fixed keys above, never from caller input,
    # so formatting them into the statement is safe; values stay parameterized.
    assignments = ", ".join(f"{column} = ?" for column in changes)
    sql = f'UPDATE expenses SET {assignments} WHERE id = ? AND user_id = ?'
    values = [*changes.values(), expense_id, user_id]

    async with aiosqlite.connect(DB_PATH) as conn:
        cursor = await conn.execute(sql, values)
        await conn.commit()
        updated_rows = cursor.rowcount
    # An UPDATE that matched nothing must not be reported as a success.
    if updated_rows == 0:
        return {"status": "error", "message": "Expense not found"}
    return {"status": "ok", "message": "Expense updated successfully"}
@mcp.tool
async def summarize(user_id: str, start_date: str, end_date: str, category: str | None = None):
    """Total a user's expenses over a date range, optionally for one category.

    The range is evaluated with SQL ``BETWEEN``, so both bounds are
    inclusive. A falsy ``category`` (``None`` or an empty string) means no
    category filter is applied.

    Args:
        user_id (str): ID of the user
        start_date (str): Start date in YYYY-MM-DD format
        end_date (str): End date in YYYY-MM-DD format
        category (str|None): Category to filter by (optional)
    Returns:
        dict: ``{"total_expense": <sum>}``; the sum is 0 when nothing matches
    """
    query = """SELECT SUM(amount) FROM expenses WHERE user_id = ? AND date BETWEEN ? AND ?"""
    params = (user_id, start_date, end_date)
    if category:
        query = query + """ AND category = ?"""
        params = params + (category,)

    async with aiosqlite.connect(DB_PATH) as db:
        result = await db.execute(query, params)
        record = await result.fetchone()

    # SUM() yields NULL when no rows match; report that (and a zero sum) as 0.
    summed = record[0] if record else None
    return {"total_expense": summed or 0}
@mcp.resource("expense://categories", mime_type="application/json")
async def categories():
    """Get the list of expense categories from the categories.json file

    The file is read in a worker thread so the blocking disk access does not
    stall the event loop, and it is opened in a ``with`` block so the file
    handle is closed as soon as the read finishes.

    Returns:
        str: Raw text content of the file at ``CATEGORIES_PATH``
    Raises:
        OSError: If the file cannot be opened or read (for example, it is
        missing)
    """
    def _read_categories_file() -> str:
        with open(CATEGORIES_PATH, "r", encoding="utf-8") as categories_file:
            return categories_file.read()

    return await asyncio.to_thread(_read_categories_file)
if __name__ == "__main__":
    # Remote server: the default transport is "stdio"; "http" switches it to
    # streamable HTTP, listening on every interface at port 8000.
    mcp.run(
        transport="http",
        host="0.0.0.0",
        port=8000,
    )