#!/usr/bin/env python3
"""
Paymo Timesheet Automation Script
Automate time entry creation in Paymo from structured meeting/work data.
Can run as CLI or MCP server.
"""
import requests
import yaml
import json
import sys
import time
import os
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Tuple, Any
from pathlib import Path
import pytz
from dateutil import parser as dateparser
from rich.console import Console
from rich.table import Table
from rich import print as rprint
import click
import io
# When running as MCP server, disable all console output to avoid filling Claude's context
# Claude Desktop captures stderr output and includes it in the conversation
_is_mcp_mode = len(sys.argv) > 1 and sys.argv[1] == 'mcp'
console = Console(file=io.StringIO() if _is_mcp_mode else sys.stdout, quiet=_is_mcp_mode)
# MCP Server support (optional)
MCP_AVAILABLE = False
try:
from mcp.server.fastmcp import FastMCP
MCP_AVAILABLE = True
except ImportError:
pass
class PaymoClient:
"""Wrapper for Paymo API calls"""
def __init__(self, api_key: str, base_url: str = "https://app.paymoapp.com/api/"):
self.api_key = api_key
self.base_url = base_url.rstrip('/') + '/'
self.session = requests.Session()
self.session.auth = (api_key, 'X')
self.session.headers.update({
'Accept': 'application/json',
'Content-Type': 'application/json'
})
def _request(self, method: str, endpoint: str, **kwargs) -> Dict:
"""Make API request with error handling"""
url = f"{self.base_url}{endpoint.lstrip('/')}"
try:
response = self.session.request(method, url, **kwargs)
# Check rate limiting headers
# Note: Paymo may return multiple values like "4, 3" for different rate limit buckets
remaining = response.headers.get('X-Ratelimit-Remaining')
limit = response.headers.get('X-Ratelimit-Limit')
decay = response.headers.get('X-Ratelimit-Decay-Period')
if remaining:
try:
# Handle comma-separated values by taking the minimum
remaining_values = [int(x.strip()) for x in remaining.split(',')]
remaining_min = min(remaining_values)
if remaining_min < 5:
console.print(f"[yellow]⚠ Rate limit: {remaining}/{limit} remaining (resets in {decay}s)[/yellow]")
except ValueError:
pass # Ignore malformed rate limit headers
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
retry_after = e.response.headers.get('Retry-After', '60')
console.print(f"[red]Rate limit exceeded! Must wait {retry_after}s[/red]")
# Re-raise with the retry_after info attached
e.retry_after = int(retry_after)
else:
console.print(f"[red]API Error: {e}[/red]")
if hasattr(e.response, 'text'):
console.print(f"[red]Response: {e.response.text}[/red]")
raise
except requests.exceptions.RequestException as e:
console.print(f"[red]Request failed: {e}[/red]")
raise
def get_clients(self, active_only: bool = True) -> List[Dict]:
"""List all clients"""
endpoint = "clients"
if active_only:
endpoint += "?where=active=true"
response = self._request('GET', endpoint)
return response.get('clients', [])
def create_client(self, name: str, **kwargs) -> Dict:
"""Create a new client"""
data = {'name': name, **kwargs}
response = self._request('POST', 'clients', json=data)
return response.get('clients', [{}])[0] if 'clients' in response else response
def get_projects(self, active_only: bool = True) -> List[Dict]:
"""List all projects"""
endpoint = "projects"
if active_only:
endpoint += "?where=active=true"
response = self._request('GET', endpoint)
return response.get('projects', [])
def get_tasks(self, project_id: Optional[int] = None) -> List[Dict]:
"""List tasks, optionally filtered by project"""
endpoint = "tasks"
if project_id:
endpoint += f"?where=project_id={project_id}"
response = self._request('GET', endpoint)
return response.get('tasks', [])
def get_entries(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> List[Dict]:
"""List time entries within date range"""
endpoint = "entries"
if start_date and end_date:
# Convert dates to ISO format
start_dt = datetime.strptime(start_date, '%Y-%m-%d')
end_dt = datetime.strptime(end_date, '%Y-%m-%d')
start_iso = start_dt.strftime('%Y-%m-%dT00:00:00Z')
end_iso = end_dt.strftime('%Y-%m-%dT23:59:59Z')
endpoint += f'?where=time_interval in ("{start_iso}","{end_iso}")'
response = self._request('GET', endpoint)
return response.get('entries', [])
def create_entry(self, task_id: int, **kwargs) -> Dict:
"""
Create a new time entry
Args:
task_id: Required task ID
**kwargs: Either (start_time, end_time) or (date, duration)
Plus optional description, billed, etc.
"""
data = {'task_id': task_id, **kwargs}
response = self._request('POST', 'entries', json=data)
return response
def create_entries_batch(self, entries: List[Dict]) -> Dict:
"""
Create multiple time entries in one API call
Args:
entries: List of entry dicts, each with task_id and time data
"""
response = self._request('POST', 'entries', json=entries)
return response
def delete_entry(self, entry_id: int) -> Dict:
"""Delete a time entry by ID"""
response = self._request('DELETE', f'entries/{entry_id}')
return response
def update_entry(self, entry_id: int, **kwargs) -> Dict:
"""Update a time entry (billed, description, etc.)"""
response = self._request('PUT', f'entries/{entry_id}', json=kwargs)
return response.get('entries', [response])[0] if 'entries' in response else response
def create_task(self, project_id: int, name: str, billable: bool = True) -> Dict:
"""Create a new task in a project"""
data = {
'project_id': project_id,
'name': name,
'billable': billable
}
response = self._request('POST', 'tasks', json=data)
return response
def update_task(self, task_id: int, **kwargs) -> Dict:
"""Update a task (name, billable, etc.)"""
response = self._request('PUT', f'tasks/{task_id}', json=kwargs)
return response.get('tasks', [response])[0] if 'tasks' in response else response
def create_project(self, name: str, client_id: int, **kwargs) -> Dict:
"""Create a new project"""
data = {'name': name, 'client_id': client_id, **kwargs}
response = self._request('POST', 'projects', json=data)
return response.get('projects', [{}])[0] if 'projects' in response else response
def update_project(self, project_id: int, **kwargs) -> Dict:
"""Update an existing project"""
response = self._request('PUT', f'projects/{project_id}', json=kwargs)
return response.get('projects', [{}])[0] if 'projects' in response else response
def get_invoices(self, client_id: Optional[int] = None, status: Optional[str] = None) -> List[Dict]:
"""
List invoices, optionally filtered by client and status
Args:
client_id: Filter by client ID
status: Filter by status (sent, viewed, paid, etc.)
"""
endpoint = "invoices"
filters = []
if client_id:
filters.append(f"client_id={client_id}")
if status:
filters.append(f"status={status}")
if filters:
endpoint += "?where=" + " and ".join(filters)
response = self._request('GET', endpoint)
return response.get('invoices', [])
def get_invoice(self, invoice_id: int) -> Dict:
"""Get detailed invoice information"""
response = self._request('GET', f'invoices/{invoice_id}')
return response.get('invoices', [{}])[0]
def get_outstanding_invoices_last_week(self) -> List[Dict]:
"""Get outstanding invoices (sent or viewed) from the last 7 days"""
from datetime import datetime, timedelta
all_invoices = self.get_invoices()
# Filter for outstanding (sent or viewed) from last 7 days
week_ago = datetime.now() - timedelta(days=7)
outstanding = []
for inv in all_invoices:
status = inv.get('status', '').lower()
if status in ['sent', 'viewed']:
inv_date_str = inv.get('date', '')
if inv_date_str:
inv_date = datetime.strptime(inv_date_str, '%Y-%m-%d')
if inv_date >= week_ago:
outstanding.append(inv)
return outstanding
def export_invoice_entries_csv(self, invoice_id: int,
include_date: bool = True,
include_start_time: bool = True,
include_end_time: bool = True) -> str:
"""
Export CSV of entries that are actually on a specific invoice
Args:
invoice_id: Invoice ID
include_date: Include date column (default True)
include_start_time: Include start time column (default True)
include_end_time: Include end time column (default True)
Returns:
CSV content as string
"""
import csv
import io
import html
import time
# Get invoice with items
response = self._request('GET', f'invoices/{invoice_id}?include=invoiceitems')
invoice = response.get('invoices', [{}])[0]
invoice_items = invoice.get('invoiceitems', [])
# Get invoice item IDs
invoice_item_ids = set(item.get('id') for item in invoice_items if item.get('id'))
if not invoice_item_ids:
# Build header based on included columns using csv writer
output = io.StringIO()
writer = csv.writer(output, quoting=csv.QUOTE_ALL)
header_parts = []
if include_date:
header_parts.append('Date')
if include_start_time:
header_parts.append('Start Time')
if include_end_time:
header_parts.append('End Time')
header_parts.extend(['Duration (hours)', 'Task', 'Description', 'Billed', 'Entry ID'])
writer.writerow(header_parts)
return output.getvalue()
# Get all entries and filter by invoice_item_id
# We need to fetch entries to check their invoice_item_id
# Use a broad date range - go back 3 months from invoice date to catch all entries
inv_date = invoice.get('date', '')
if inv_date:
from datetime import datetime, timedelta
inv_dt = datetime.strptime(inv_date, '%Y-%m-%d')
# Get entries from 90 days before invoice to invoice date
start_date = (inv_dt - timedelta(days=90)).strftime('%Y-%m-%d')
end_date = inv_date
else:
# Fallback - get last 90 days
from datetime import datetime, timedelta
now = datetime.now()
start_date = (now - timedelta(days=90)).strftime('%Y-%m-%d')
end_date = now.strftime('%Y-%m-%d')
all_entries = self.get_entries(start_date, end_date)
# Filter to only entries on this invoice
entries = [e for e in all_entries if e.get('invoice_item_id') in invoice_item_ids]
# Sort entries by start date (earliest first)
def get_entry_sort_key(entry):
# Use start_time if available, otherwise use date
if entry.get('start_time'):
return entry.get('start_time')
elif entry.get('date'):
return entry.get('date')
else:
# Fallback to entry ID if no date info
return str(entry.get('id', 0)).zfill(20)
entries.sort(key=get_entry_sort_key)
# Build task cache - fetch all unique tasks upfront
task_cache = {}
unique_task_ids = set(e.get('task_id') for e in entries if e.get('task_id'))
for task_id in unique_task_ids:
try:
time.sleep(2) # 2 second delay to avoid rate limits
task_response = self._request('GET', f'tasks/{task_id}')
task_data = task_response.get('tasks', [{}])[0] if 'tasks' in task_response else {}
task_cache[task_id] = task_data.get('name', '')
except Exception as e:
# If we hit a rate limit, wait and retry once
if '429' in str(e):
console.print(f"Rate limit hit, waiting 6 seconds...")
time.sleep(6)
try:
task_response = self._request('GET', f'tasks/{task_id}')
task_data = task_response.get('tasks', [{}])[0] if 'tasks' in task_response else {}
task_cache[task_id] = task_data.get('name', '')
except Exception as retry_err:
console.print(f"Warning: Failed to fetch task {task_id} after retry: {retry_err}")
task_cache[task_id] = ''
else:
console.print(f"Warning: Failed to fetch task {task_id}: {e}")
task_cache[task_id] = ''
# Create CSV with QUOTE_ALL for maximum compatibility
output = io.StringIO()
writer = csv.writer(output, quoting=csv.QUOTE_ALL)
# Header - build based on included columns
header = []
if include_date:
header.append('Date')
if include_start_time:
header.append('Start Time')
if include_end_time:
header.append('End Time')
header.extend(['Duration (hours)', 'Task', 'Description', 'Billed', 'Entry ID'])
writer.writerow(header)
# Rows
for entry in entries:
# Get task name from cache
task_id = entry.get('task_id')
task_name = task_cache.get(task_id, '') if task_id else ''
# Clean description (strip HTML tags and decode entities)
description = entry.get('description', '')
if description:
# Remove HTML tags
import re
description = re.sub(r'<[^>]+>', '', description)
# Decode HTML entities
description = html.unescape(description)
description = description.strip()
# Calculate duration
if entry.get('duration'):
duration_hours = entry['duration'] / 3600
else:
start = dateparser.parse(entry.get('start_time', ''))
end = dateparser.parse(entry.get('end_time', ''))
duration_hours = (end - start).total_seconds() / 3600 if start and end else 0
# Extract date from start_time if date field is empty
entry_date = entry.get('date', '')
if not entry_date and entry.get('start_time'):
entry_date = entry.get('start_time', '')[:10]
# Build row based on included columns
row = []
if include_date:
row.append(entry_date)
if include_start_time:
row.append(entry.get('start_time', ''))
if include_end_time:
row.append(entry.get('end_time', ''))
row.extend([
f"{duration_hours:.2f}",
task_name,
description,
'Yes' if entry.get('billed') else 'No',
entry.get('id', '')
])
writer.writerow(row)
return output.getvalue()
def export_timesheet_csv(self, start_date: str, end_date: str,
project_id: Optional[int] = None) -> str:
"""
Export timesheet to CSV format by fetching entries and formatting
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
project_id: Optional project filter
Returns:
CSV content as string
"""
import csv
import io
import html
import time
# Get all entries for date range
entries = self.get_entries(start_date, end_date)
# Filter by project if specified
if project_id:
entries = [e for e in entries if e.get('project_id') == project_id]
# Sort entries by start date (earliest first)
def get_entry_sort_key(entry):
# Use start_time if available, otherwise use date
if entry.get('start_time'):
return entry.get('start_time')
elif entry.get('date'):
return entry.get('date')
else:
# Fallback to entry ID if no date info
return str(entry.get('id', 0)).zfill(20)
entries.sort(key=get_entry_sort_key)
# Build task cache - fetch all unique tasks upfront
task_cache = {}
unique_task_ids = set(e.get('task_id') for e in entries if e.get('task_id'))
for task_id in unique_task_ids:
try:
time.sleep(2) # 2 second delay to avoid rate limits
task_response = self._request('GET', f'tasks/{task_id}')
task_data = task_response.get('tasks', [{}])[0] if 'tasks' in task_response else {}
task_cache[task_id] = task_data.get('name', '')
except Exception as e:
# If we hit a rate limit, wait and retry once
if '429' in str(e):
console.print(f"Rate limit hit, waiting 6 seconds...")
time.sleep(6)
try:
task_response = self._request('GET', f'tasks/{task_id}')
task_data = task_response.get('tasks', [{}])[0] if 'tasks' in task_response else {}
task_cache[task_id] = task_data.get('name', '')
except Exception as retry_err:
console.print(f"Warning: Failed to fetch task {task_id} after retry: {retry_err}")
task_cache[task_id] = ''
else:
console.print(f"Warning: Failed to fetch task {task_id}: {e}")
task_cache[task_id] = ''
# Create CSV with QUOTE_ALL for maximum compatibility
output = io.StringIO()
writer = csv.writer(output, quoting=csv.QUOTE_ALL)
# Header
writer.writerow(['Date', 'Start Time', 'End Time', 'Duration (hours)',
'Task', 'Description', 'Billed', 'Entry ID'])
# Rows
for entry in entries:
# Get task name from cache
task_id = entry.get('task_id')
task_name = task_cache.get(task_id, '') if task_id else ''
# Clean description (strip HTML tags and decode entities)
description = entry.get('description', '')
if description:
# Remove HTML tags
import re
description = re.sub(r'<[^>]+>', '', description)
# Decode HTML entities
description = html.unescape(description)
description = description.strip()
# Calculate duration
if entry.get('duration'):
duration_hours = entry['duration'] / 3600
else:
start = dateparser.parse(entry.get('start_time', ''))
end = dateparser.parse(entry.get('end_time', ''))
duration_hours = (end - start).total_seconds() / 3600 if start and end else 0
# Extract date from start_time if date field is empty
entry_date = entry.get('date', '')
if not entry_date and entry.get('start_time'):
entry_date = entry.get('start_time', '')[:10]
writer.writerow([
entry_date,
entry.get('start_time', ''),
entry.get('end_time', ''),
f"{duration_hours:.2f}",
task_name,
description,
'Yes' if entry.get('billed') else 'No',
entry.get('id', '')
])
return output.getvalue()
def find_project_by_name(self, name: str) -> Optional[Dict]:
"""Find project by partial name match (case-insensitive)"""
projects = self.get_projects()
name_lower = name.lower()
for project in projects:
if name_lower in project.get('name', '').lower():
return project
return None
def find_task_by_name(self, project_id: int, name: str) -> Optional[Dict]:
"""Find task within project by name"""
tasks = self.get_tasks(project_id)
name_lower = name.lower()
for task in tasks:
if name_lower in task.get('name', '').lower():
return task
return None
def get_expenses(self, project_id: Optional[int] = None, client_id: Optional[int] = None,
start_date: Optional[str] = None, end_date: Optional[str] = None,
billed: Optional[bool] = None) -> List[Dict]:
"""
List expenses with optional filters
Args:
project_id: Filter by project ID
client_id: Filter by client ID
start_date: Filter from date (YYYY-MM-DD)
end_date: Filter to date (YYYY-MM-DD)
billed: Filter by billed status (True=billed, False=unbilled, None=all)
Returns:
List of expenses
"""
endpoint = "expenses"
filters = []
if project_id:
filters.append(f"project_id={project_id}")
if client_id:
filters.append(f"client_id={client_id}")
if start_date:
filters.append(f"date>={start_date}")
if end_date:
filters.append(f"date<={end_date}")
if billed is not None:
filters.append(f"billed={'true' if billed else 'false'}")
if filters:
endpoint += "?where=" + " and ".join(filters)
response = self._request('GET', endpoint)
return response.get('expenses', [])
def create_expense(self, project_id: int, amount: float, date: str,
description: Optional[str] = None,
expense_category_id: Optional[int] = None,
billable: bool = True,
file_path: Optional[str] = None) -> Dict:
"""
Create a new expense entry
Args:
project_id: Project to attach expense to
amount: Expense amount
date: Date in YYYY-MM-DD format
description: Expense description
expense_category_id: Category ID (travel, meals, etc.)
billable: Whether expense is billable (default True)
file_path: Path to receipt image/PDF for upload
Returns:
Created expense with id, amount, date, description, billable, billed
"""
data = {
'project_id': project_id,
'amount': amount,
'date': date,
'billable': billable
}
if description:
data['description'] = description
if expense_category_id:
data['expense_category_id'] = expense_category_id
if file_path:
# Handle file upload with multipart/form-data
import os
if not os.path.exists(file_path):
raise ValueError(f"File not found: {file_path}")
with open(file_path, 'rb') as f:
files = {'file': f}
# For file upload, send as form data instead of JSON
headers = {'Accept': 'application/json'}
url = f"{self.base_url}expenses"
response = self.session.post(url, data=data, files=files, headers=headers)
# Check rate limiting
remaining = response.headers.get('X-Ratelimit-Remaining')
limit = response.headers.get('X-Ratelimit-Limit')
decay = response.headers.get('X-Ratelimit-Decay-Period')
if remaining and int(remaining) < 5:
console.print(f"[yellow]⚠ Rate limit: {remaining}/{limit} remaining (resets in {decay}s)[/yellow]")
response.raise_for_status()
return response.json().get('expenses', [response.json()])[0] if 'expenses' in response.json() else response.json()
else:
response = self._request('POST', 'expenses', json=data)
return response.get('expenses', [response])[0] if 'expenses' in response else response
def delete_expense(self, expense_id: int) -> Dict:
"""
Delete an expense by ID
Args:
expense_id: Expense ID to delete
Returns:
Success confirmation
"""
response = self._request('DELETE', f'expenses/{expense_id}')
return response
def update_expense(self, expense_id: int, **kwargs) -> Dict:
"""
Update an expense (mark as billed, update amount, etc.)
Args:
expense_id: Expense ID
**kwargs: Fields to update (billed, amount, description, etc.)
Returns:
Updated expense
"""
response = self._request('PUT', f'expenses/{expense_id}', json=kwargs)
return response.get('expenses', [response])[0] if 'expenses' in response else response
def get_expense_categories(self) -> List[Dict]:
"""
List available expense categories
Returns:
List of categories with id, name
"""
try:
response = self._request('GET', 'expensecategories')
return response.get('expensecategories', [])
except:
# Try with underscore if camelCase fails
response = self._request('GET', 'expense_categories')
return response.get('expense_categories', [])
class TimesheetProcessor:
"""Process timesheet YAML and create Paymo entries"""
def __init__(self, client: PaymoClient, config: Dict):
self.client = client
self.config = config
self.default_tz = pytz.timezone(config.get('timezone', 'America/Chicago'))
def load_timesheet(self, filepath: str) -> Dict:
"""Load and validate timesheet YAML"""
with open(filepath, 'r') as f:
data = yaml.safe_load(f)
# Validate required fields
if 'entries' not in data:
raise ValueError("Timesheet must have 'entries' field")
return data
def resolve_project_task(self, matter: str) -> Tuple[int, int]:
"""Resolve matter name to (project_id, task_id)"""
# First check config mappings
projects_config = self.config.get('projects', {})
if matter in projects_config:
project_id = projects_config[matter].get('project_id')
task_id = projects_config[matter].get('task_id')
return project_id, task_id
# Otherwise search by name
project = self.client.find_project_by_name(matter)
if not project:
raise ValueError(f"Could not find project matching '{matter}'")
project_id = project['id']
# Get first task in project (or could prompt user)
tasks = self.client.get_tasks(project_id)
if not tasks:
raise ValueError(f"Project '{project['name']}' has no tasks")
task_id = tasks[0]['id']
console.print(f"[yellow]Using project: {project['name']} (ID: {project_id})[/yellow]")
console.print(f"[yellow]Using task: {tasks[0]['name']} (ID: {task_id})[/yellow]")
return project_id, task_id
def convert_to_utc(self, date: str, time: str, tz: str) -> str:
"""Convert local datetime to UTC ISO format"""
timezone = pytz.timezone(tz) if tz else self.default_tz
# Parse date and time
dt_str = f"{date} {time}"
local_dt = datetime.strptime(dt_str, '%Y-%m-%d %H:%M')
# Localize and convert to UTC
local_dt = timezone.localize(local_dt)
utc_dt = local_dt.astimezone(pytz.UTC)
return utc_dt.strftime('%Y-%m-%dT%H:%M:%SZ')
def process_entry(self, entry: Dict, task_id: int) -> Dict:
"""Convert timesheet entry to Paymo API format"""
# Allow entry to override task_id
entry_task_id = entry.get('task_id', task_id)
api_entry = {'task_id': entry_task_id}
# Get timezone
tz = entry.get('timezone', self.config.get('timezone', 'America/Chicago'))
# Handle two formats: (start_time, end_time) or (duration_hours)
if 'start_time' in entry and 'end_time' in entry:
# Convert to UTC
api_entry['start_time'] = self.convert_to_utc(
entry['date'], entry['start_time'], tz
)
api_entry['end_time'] = self.convert_to_utc(
entry['date'], entry['end_time'], tz
)
elif 'duration_hours' in entry:
# Use date + duration
api_entry['date'] = entry['date']
api_entry['duration'] = int(entry['duration_hours'] * 3600)
else:
raise ValueError(f"Entry must have either (start_time, end_time) or duration_hours: {entry}")
# Add description
if 'description' in entry:
api_entry['description'] = entry['description']
# Add billed flag if specified
if 'billed' in entry:
api_entry['billed'] = entry['billed']
return api_entry
def calculate_duration(self, entry: Dict) -> float:
"""Calculate duration in hours for preview"""
if 'duration_hours' in entry:
return entry['duration_hours']
# Calculate from start/end times
tz = entry.get('timezone', self.config.get('timezone', 'America/Chicago'))
timezone = pytz.timezone(tz)
start_str = f"{entry['date']} {entry['start_time']}"
end_str = f"{entry['date']} {entry['end_time']}"
start_dt = datetime.strptime(start_str, '%Y-%m-%d %H:%M')
end_dt = datetime.strptime(end_str, '%Y-%m-%d %H:%M')
start_dt = timezone.localize(start_dt)
end_dt = timezone.localize(end_dt)
duration = (end_dt - start_dt).total_seconds() / 3600
return duration
def preview(self, filepath: str) -> List[Dict]:
"""Preview entries without creating"""
data = self.load_timesheet(filepath)
entries = data['entries']
matter = data.get('matter', 'Unknown')
rate = data.get('rate', 0)
# Create table
table = Table(title=f"Timesheet Preview: {matter}")
table.add_column("Date", style="cyan")
table.add_column("Time", style="magenta")
table.add_column("Duration", style="green")
table.add_column("Hours", style="yellow")
table.add_column("Description", style="white")
total_hours = 0
for entry in entries:
date = entry['date']
duration_hours = self.calculate_duration(entry)
total_hours += duration_hours
# Format time range or duration
if 'start_time' in entry:
time_str = f"{entry['start_time']}-{entry['end_time']}"
else:
time_str = "—"
# Format duration
hours = int(duration_hours)
minutes = int((duration_hours - hours) * 60)
duration_str = f"{hours}:{minutes:02d}"
description = entry.get('description', '')
if len(description) > 50:
description = description[:47] + "..."
table.add_row(
date,
time_str,
duration_str,
f"{duration_hours:.2f}",
description
)
console.print(table)
# Summary
total_billing = total_hours * rate if rate else 0
console.print(f"\n[bold]Total: {total_hours:.2f} hours[/bold]", end="")
if rate:
console.print(f" [bold green](${total_billing:,.2f} at ${rate}/hr)[/bold green]")
else:
console.print()
return entries
def submit(self, filepath: str, dry_run: bool = False, auto_confirm: bool = False) -> List[Dict]:
"""Create all entries from timesheet"""
data = self.load_timesheet(filepath)
entries = data['entries']
matter = data.get('matter')
if not matter:
raise ValueError("Timesheet must specify 'matter' field")
# Resolve project and task
console.print(f"\n[bold]Resolving project for matter: {matter}[/bold]")
project_id, task_id = self.resolve_project_task(matter)
# Preview first
console.print(f"\n[bold]Preview of entries to create:[/bold]")
self.preview(filepath)
if dry_run:
console.print("\n[yellow]Dry run - no entries created[/yellow]")
return []
# Confirm
if not auto_confirm:
if not click.confirm("\nCreate these entries in Paymo?"):
console.print("[yellow]Cancelled[/yellow]")
return []
else:
console.print("\n[green]Auto-confirmed - proceeding with creation[/green]")
# Create entries as batch
console.print(f"\n[bold]Creating {len(entries)} entries in batch...[/bold]")
try:
# Process all entries
api_entries = [self.process_entry(entry, task_id) for entry in entries]
# Try batch creation first
try:
result = self.client.create_entries_batch(api_entries)
console.print(f"[green]✓ Successfully created {len(entries)} entries in one API call[/green]")
return result
except Exception as batch_error:
# If batch fails, fall back to individual creation
console.print(f"[yellow]Batch creation failed, trying individual entries...[/yellow]")
console.print(f"[yellow]Error: {batch_error}[/yellow]")
created = []
for i, (entry, api_entry) in enumerate(zip(entries, api_entries), 1):
try:
console.print(f"[{i}/{len(entries)}] Creating entry for {entry['date']}...", end=" ")
result = self.client.create_entry(**api_entry)
created.append(result)
console.print("[green]✓[/green]")
# Add delay between calls to avoid rate limiting
if i < len(entries):
time.sleep(2)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
retry_after = getattr(e, 'retry_after', 60)
console.print(f"[yellow]⏳ Rate limited, waiting {retry_after}s...[/yellow]")
time.sleep(retry_after)
# Retry this entry
try:
result = self.client.create_entry(**api_entry)
created.append(result)
console.print("[green]✓ (after retry)[/green]")
except Exception as retry_error:
console.print(f"[red]✗ Retry failed: {retry_error}[/red]")
else:
console.print(f"[red]✗ Error: {e}[/red]")
except Exception as e:
console.print(f"[red]✗ Error: {e}[/red]")
console.print(f"\n[bold green]Successfully created {len(created)} entries[/bold green]")
return created
except Exception as e:
console.print(f"[red]Error processing entries: {e}[/red]")
return []
def load_config() -> Dict:
"""Load configuration from ~/.mcp-config/paymo/ and ~/.mcp-auth/paymo/"""
config_dir = Path.home() / '.mcp-config' / 'paymo'
auth_dir = Path.home() / '.mcp-auth' / 'paymo'
# Start with defaults
config = {
'timezone': 'America/Chicago',
'projects': {}
}
# Load non-sensitive config
config_path = config_dir / 'config.json'
if config_path.exists():
with open(config_path, 'r') as f:
config.update(json.load(f))
# Load API key from auth
auth_path = auth_dir / 'auth.json'
if auth_path.exists():
with open(auth_path, 'r') as f:
auth_data = json.load(f)
config['api_key'] = auth_data.get('api_key')
else:
console.print(f"[yellow]Warning: Auth file not found at {auth_path}[/yellow]")
console.print("[yellow]Will prompt for API key if needed[/yellow]")
return config
@click.group()
def cli():
"""Paymo Timesheet Automation Tool"""
pass
@cli.command()
def list_projects():
"""List all active Paymo projects"""
config = load_config()
api_key = config.get('api_key') or click.prompt('Paymo API Key', hide_input=True)
client = PaymoClient(api_key)
projects = client.get_projects()
table = Table(title="Paymo Projects")
table.add_column("ID", style="cyan")
table.add_column("Name", style="white")
table.add_column("Client", style="magenta")
table.add_column("Active", style="green")
for project in projects:
table.add_row(
str(project['id']),
project.get('name', ''),
project.get('client_name', ''),
"✓" if project.get('active') else "✗"
)
console.print(table)
@cli.command()
@click.option('--project-id', type=int, required=True, help='Project ID')
def list_tasks(project_id: int):
"""List tasks for a project"""
config = load_config()
api_key = config.get('api_key') or click.prompt('Paymo API Key', hide_input=True)
client = PaymoClient(api_key)
tasks = client.get_tasks(project_id)
table = Table(title=f"Tasks for Project {project_id}")
table.add_column("ID", style="cyan")
table.add_column("Name", style="white")
table.add_column("Billable", style="green")
for task in tasks:
table.add_row(
str(task['id']),
task.get('name', ''),
"✓" if task.get('billable') else "✗"
)
console.print(table)
@cli.command()
@click.option('--start', help='Start date (YYYY-MM-DD)')
@click.option('--end', help='End date (YYYY-MM-DD)')
def list_entries(start: str, end: str):
"""List time entries for a date range"""
config = load_config()
api_key = config.get('api_key') or click.prompt('Paymo API Key', hide_input=True)
client = PaymoClient(api_key)
entries = client.get_entries(start, end)
table = Table(title=f"Time Entries ({start} to {end})")
table.add_column("ID", style="cyan")
table.add_column("Date", style="magenta")
table.add_column("Duration", style="green")
table.add_column("Description", style="white")
total_seconds = 0
for entry in entries:
entry_id = str(entry['id'])
date = entry.get('date', '')
# Calculate duration
if entry.get('duration'):
duration_sec = entry['duration']
else:
start_time = dateparser.parse(entry.get('start_time', ''))
end_time = dateparser.parse(entry.get('end_time', ''))
duration_sec = (end_time - start_time).total_seconds()
total_seconds += duration_sec
hours = int(duration_sec / 3600)
minutes = int((duration_sec % 3600) / 60)
duration_str = f"{hours}:{minutes:02d}"
description = entry.get('description', '')[:50]
table.add_row(entry_id, date, duration_str, description)
console.print(table)
total_hours = total_seconds / 3600
console.print(f"\n[bold]Total: {total_hours:.2f} hours[/bold]")
@cli.command()
@click.argument('filepath', type=click.Path(exists=True))
def preview(filepath: str):
"""Preview timesheet entries without creating them"""
config = load_config()
api_key = config.get('api_key') or click.prompt('Paymo API Key', hide_input=True)
client = PaymoClient(api_key)
processor = TimesheetProcessor(client, config)
processor.preview(filepath)
@cli.command()
@click.argument('filepath', type=click.Path(exists=True))
@click.option('--dry-run', is_flag=True, help='Preview only, do not create entries')
@click.option('--yes', '-y', is_flag=True, help='Skip confirmation prompt')
def submit(filepath: str, dry_run: bool, yes: bool):
"""Submit timesheet entries to Paymo"""
config = load_config()
api_key = config.get('api_key') or click.prompt('Paymo API Key', hide_input=True)
client = PaymoClient(api_key)
processor = TimesheetProcessor(client, config)
processor.submit(filepath, dry_run=dry_run, auto_confirm=yes)
@cli.command()
@click.argument('entry_ids', nargs=-1, type=int, required=True)
@click.option('--yes', '-y', is_flag=True, help='Skip confirmation prompt')
def delete(entry_ids: tuple, yes: bool):
"""Delete time entries by ID"""
config = load_config()
api_key = config.get('api_key') or click.prompt('Paymo API Key', hide_input=True)
client = PaymoClient(api_key)
console.print(f"\n[bold red]About to delete {len(entry_ids)} entries:[/bold red]")
for entry_id in entry_ids:
console.print(f" - Entry ID: {entry_id}")
if not yes:
if not click.confirm("\nAre you sure you want to delete these entries?"):
console.print("[yellow]Cancelled[/yellow]")
return
deleted = 0
for entry_id in entry_ids:
try:
console.print(f"Deleting entry {entry_id}...", end=" ")
client.delete_entry(entry_id)
console.print("[green]✓[/green]")
deleted += 1
except Exception as e:
console.print(f"[red]✗ Error: {e}[/red]")
console.print(f"\n[bold green]Successfully deleted {deleted} entries[/bold green]")
# Main entry point moved to end
@cli.command()
@click.option('--client-id', type=int, help='Filter by client ID')
def list_invoices(client_id: Optional[int]):
"""List Paymo invoices"""
config = load_config()
api_key = config.get('api_key') or click.prompt('Paymo API Key', hide_input=True)
client = PaymoClient(api_key)
invoices = client.get_invoices(client_id)
table = Table(title="Paymo Invoices")
table.add_column("ID", style="cyan")
table.add_column("Number", style="white")
table.add_column("Client", style="magenta")
table.add_column("Amount", style="green")
table.add_column("Date", style="yellow")
table.add_column("Status", style="blue")
for invoice in invoices:
table.add_row(
str(invoice.get('id', '')),
invoice.get('number', ''),
invoice.get('client_name', ''),
f"${invoice.get('total', 0):,.2f}",
invoice.get('date', ''),
invoice.get('status', '')
)
console.print(table)
@cli.command()
@click.option('--start', required=True, help='Start date (YYYY-MM-DD)')
@click.option('--end', required=True, help='End date (YYYY-MM-DD)')
@click.option('--project-id', type=int, help='Filter by project ID')
@click.option('--output', '-o', help='Output file path')
def export_timesheet(start: str, end: str, project_id: Optional[int], output: Optional[str]):
"""Export timesheet to CSV"""
config = load_config()
api_key = config.get('api_key') or click.prompt('Paymo API Key', hide_input=True)
client = PaymoClient(api_key)
console.print(f"\n[bold]Exporting timesheet: {start} to {end}[/bold]")
if project_id:
console.print(f"[yellow]Project ID: {project_id}[/yellow]")
try:
csv_content = client.export_timesheet_csv(start, end, project_id)
# Determine output filename
if not output:
output = f"paymo_timesheet_{start}_{end}.csv"
# Save file
with open(output, 'w') as f:
f.write(csv_content)
console.print(f"[green]✓ Exported to: {output}[/green]")
console.print(f"[green] Size: {len(csv_content):,} bytes[/green]")
except Exception as e:
console.print(f"[red]Error exporting timesheet: {e}[/red]")
raise
# MCP Server Implementation
if MCP_AVAILABLE:
mcp = FastMCP("Paymo Timesheet Manager")
@mcp.tool()
def list_paymo_clients(include_inactive: bool = False) -> List[Dict[str, Any]]:
"""
List Paymo clients with essential details only
Args:
include_inactive: If True, includes inactive/archived clients (default: False)
"""
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured in ~/.mcp-auth/paymo/auth.json")
client = PaymoClient(api_key)
clients = client.get_clients(active_only=not include_inactive)
# Return only essential fields to minimize context usage
return [{
'id': c.get('id'),
'name': c.get('name'),
'active': c.get('active', True)
} for c in clients]
@mcp.tool()
def create_paymo_client(
name: str,
address: Optional[str] = None,
city: Optional[str] = None,
state: Optional[str] = None,
postal_code: Optional[str] = None,
country: Optional[str] = None,
phone: Optional[str] = None,
email: Optional[str] = None,
website: Optional[str] = None,
fiscal_information: Optional[str] = None,
active: bool = True
) -> Dict[str, Any]:
"""
Create a new Paymo client with contact information
Args:
name: Client/company name (required, e.g., "Baker McKenzie")
address: Street address
city: City
state: State/Province
postal_code: ZIP/Postal code
country: Country
phone: Phone number
email: Primary contact email
website: Website URL
fiscal_information: Tax ID or fiscal info
active: Whether client is active (default: True)
Returns:
Created client details: id, name, email, active status
"""
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured in ~/.mcp-auth/paymo/auth.json")
client = PaymoClient(api_key)
# Build kwargs for optional parameters
kwargs = {'active': active}
if address:
kwargs['address'] = address
if city:
kwargs['city'] = city
if state:
kwargs['state'] = state
if postal_code:
kwargs['postal_code'] = postal_code
if country:
kwargs['country'] = country
if phone:
kwargs['phone'] = phone
if email:
kwargs['email'] = email
if website:
kwargs['website'] = website
if fiscal_information:
kwargs['fiscal_information'] = fiscal_information
c = client.create_client(name, **kwargs)
return {
'id': c.get('id'),
'name': c.get('name'),
'email': c.get('email'),
'active': c.get('active')
}
@mcp.tool()
def list_paymo_projects(include_inactive: bool = False) -> List[Dict[str, Any]]:
"""
List Paymo projects with essential details only
Args:
include_inactive: If True, includes archived/inactive projects (default: False)
"""
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured in ~/.mcp-auth/paymo/auth.json")
client = PaymoClient(api_key)
projects = client.get_projects(active_only=not include_inactive)
# Return essential fields for filtering/querying while removing noise
# Keep: identification, status, billing fields
# Remove: UI fields (color), internal IDs (workflow_id, budget_id), arrays (users, managers), timestamps
return [{
'id': p.get('id'),
'name': p.get('name'),
'code': p.get('code'),
'client_id': p.get('client_id'),
'client_name': p.get('client_name'),
'active': p.get('active'),
'billable': p.get('billable'),
'price_per_hour': p.get('price_per_hour'),
'flat_billing': p.get('flat_billing'),
'invoiced': p.get('invoiced')
} for p in projects]
@mcp.tool()
def create_paymo_project(
name: str,
client_id: int,
code: Optional[str] = None,
price_per_hour: Optional[float] = None,
billable: bool = True,
flat_billing: bool = False,
active: bool = True,
hourly_billing_mode: str = "project",
adjustable_hours: bool = True
) -> Dict[str, Any]:
"""
Create a new Paymo project
Args:
name: Project name (e.g., "MacKinnon v. Meta")
client_id: Paymo client ID
code: Short project code (e.g., "MVM")
price_per_hour: Hourly billing rate (e.g., 675)
billable: Whether project is billable (default: True)
flat_billing: Use flat rate instead of hourly (default: False)
active: Whether project is active (default: True)
hourly_billing_mode: Billing mode - "project" or "task" (default: "project")
adjustable_hours: Auto-adjust budget based on task budgets (default: True)
"""
# Convert parameters to proper types (MCP may pass strings)
try:
client_id = int(client_id)
except (ValueError, TypeError) as e:
raise ValueError(f"Invalid client_id '{client_id}': {e}")
if price_per_hour is not None:
try:
price_per_hour = float(price_per_hour)
except (ValueError, TypeError) as e:
raise ValueError(f"Invalid price_per_hour '{price_per_hour}': {e}")
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured in ~/.mcp-auth/paymo/auth.json")
client = PaymoClient(api_key)
# Build kwargs for optional parameters
kwargs = {
'billable': billable,
'flat_billing': flat_billing,
'active': active,
'hourly_billing_mode': hourly_billing_mode,
'adjustable_hours': adjustable_hours
}
if code:
kwargs['code'] = code
if price_per_hour is not None:
kwargs['price_per_hour'] = price_per_hour
p = client.create_project(name, client_id, **kwargs)
return {
'id': p.get('id'),
'name': p.get('name'),
'code': p.get('code'),
'client_id': p.get('client_id'),
'price_per_hour': p.get('price_per_hour'),
'hourly_billing_mode': p.get('hourly_billing_mode'),
'adjustable_hours': p.get('adjustable_hours'),
'billable': p.get('billable'),
'active': p.get('active')
}
@mcp.tool()
def update_paymo_project(
project_id: int,
name: Optional[str] = None,
client_id: Optional[int] = None,
code: Optional[str] = None,
price_per_hour: Optional[float] = None,
billable: Optional[bool] = None,
flat_billing: Optional[bool] = None,
active: Optional[bool] = None,
hourly_billing_mode: Optional[str] = None,
adjust_price: Optional[bool] = None
) -> Dict[str, Any]:
"""
Update an existing Paymo project
Args:
project_id: Paymo project ID
name: Project name
client_id: Change the client assigned to this project
code: Short project code
price_per_hour: Hourly billing rate
billable: Whether project is billable
flat_billing: Use flat rate instead of hourly
active: Whether project is active
hourly_billing_mode: Billing mode - "project" or "task"
adjust_price: Budget estimate adjusted automatically
"""
# Convert parameters to proper types (MCP may pass strings)
try:
project_id = int(project_id)
except (ValueError, TypeError) as e:
raise ValueError(f"Invalid project_id '{project_id}': {e}")
if client_id is not None:
try:
client_id = int(client_id)
except (ValueError, TypeError) as e:
raise ValueError(f"Invalid client_id '{client_id}': {e}")
if price_per_hour is not None:
try:
price_per_hour = float(price_per_hour)
except (ValueError, TypeError) as e:
raise ValueError(f"Invalid price_per_hour '{price_per_hour}': {e}")
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured in ~/.mcp-auth/paymo/auth.json")
client = PaymoClient(api_key)
# Build payload with only provided values
payload = {}
if name is not None:
payload['name'] = name
if client_id is not None:
payload['client_id'] = client_id
if code is not None:
payload['code'] = code
if price_per_hour is not None:
payload['price_per_hour'] = price_per_hour
if billable is not None:
payload['billable'] = billable
if flat_billing is not None:
payload['flat_billing'] = flat_billing
if active is not None:
payload['active'] = active
if hourly_billing_mode is not None:
payload['hourly_billing_mode'] = hourly_billing_mode
if adjust_price is not None:
payload['adjust_price'] = adjust_price
p = client.update_project(project_id, **payload)
return {
'id': p.get('id'),
'name': p.get('name'),
'code': p.get('code'),
'client_id': p.get('client_id'),
'price_per_hour': p.get('price_per_hour'),
'hourly_billing_mode': p.get('hourly_billing_mode'),
'adjust_price': p.get('adjust_price'),
'billable': p.get('billable'),
'active': p.get('active')
}
@mcp.tool()
def list_paymo_tasks(project_id: int) -> List[Dict[str, Any]]:
"""List tasks for a specific Paymo project with essential details only"""
# Convert parameters to proper types (MCP may pass strings)
project_id = int(project_id)
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured")
client = PaymoClient(api_key)
tasks = client.get_tasks(project_id)
# Return only essential fields to minimize context usage
# Include description since it's often empty but useful when present
return [{
'id': t.get('id'),
'name': t.get('name'),
'description': t.get('description', ''),
'billable': t.get('billable', True)
} for t in tasks]
@mcp.tool()
def rename_paymo_task(task_id: int, name: str) -> Dict[str, Any]:
"""
Rename a Paymo task
Args:
task_id: Paymo task ID
name: New name for the task
"""
# Convert parameters to proper types (MCP may pass strings)
task_id = int(task_id)
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured")
client = PaymoClient(api_key)
return client.update_task(task_id, name=name)
@mcp.tool()
def create_paymo_task(
project_id: int,
name: str,
billable: bool = True
) -> Dict[str, Any]:
"""
Create a new task in a Paymo project
Args:
project_id: Paymo project ID
name: Task name (e.g., "Document Review")
billable: Whether task is billable (default: True)
"""
# Convert parameters to proper types (MCP may pass strings)
project_id = int(project_id)
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured")
client = PaymoClient(api_key)
result = client.create_task(project_id, name, billable)
t = result.get('tasks', [{}])[0] if 'tasks' in result else result
return {
'id': t.get('id'),
'name': t.get('name'),
'project_id': t.get('project_id'),
'billable': t.get('billable')
}
@mcp.tool()
def create_paymo_entry(
task_id: int,
date: str,
description: str,
duration_hours: float = None,
start_time: str = None,
end_time: str = None,
added_manually: bool = True,
timezone: str = "America/Chicago"
) -> Dict[str, Any]:
"""
Create a single time entry in Paymo.
Provide EITHER:
- duration_hours for simple duration logging (e.g., "3 hours on this task"), OR
- start_time + end_time for precise time blocks (e.g., "12:30 PM - 3:21 PM")
Args:
task_id: Paymo task ID
date: Date in YYYY-MM-DD format
description: Entry description
duration_hours: Hours worked (use this OR start_time/end_time)
start_time: Start time in HH:MM 24-hour format (use with end_time)
end_time: End time in HH:MM 24-hour format (use with start_time)
added_manually: Entry type - True for manual/form entry (default), False for timer-tracked
timezone: IANA timezone for start/end times (default: America/Chicago)
"""
# Convert parameters to proper types (MCP may pass strings)
task_id = int(task_id)
if duration_hours is not None:
duration_hours = float(duration_hours)
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured")
client = PaymoClient(api_key)
# Build payload based on what was provided
payload = {
'task_id': task_id,
'date': date,
'description': description,
'added_manually': added_manually
}
if start_time and end_time:
# Precise time block entry - Paymo calculates duration from start/end
# Convert local timezone to UTC for API
from zoneinfo import ZoneInfo
local_tz = ZoneInfo(timezone)
utc = ZoneInfo("UTC")
start_dt = datetime.strptime(f"{date} {start_time}", "%Y-%m-%d %H:%M")
start_dt = start_dt.replace(tzinfo=local_tz).astimezone(utc)
end_dt = datetime.strptime(f"{date} {end_time}", "%Y-%m-%d %H:%M")
end_dt = end_dt.replace(tzinfo=local_tz).astimezone(utc)
payload['start_time'] = start_dt.strftime("%Y-%m-%dT%H:%M:%S")
payload['end_time'] = end_dt.strftime("%Y-%m-%dT%H:%M:%S")
elif duration_hours is not None:
# Simple duration entry
payload['duration'] = int(duration_hours * 3600)
else:
raise ValueError("Provide either duration_hours OR both start_time and end_time")
return client.create_entry(**payload)
@mcp.tool()
def submit_paymo_timesheet(yaml_content: str) -> Dict[str, Any]:
"""
Submit a complete timesheet from YAML content
Args:
yaml_content: YAML timesheet content with entries
Returns:
Summary of created entries
"""
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured")
# Parse YAML
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
f.write(yaml_content)
yaml_file = f.name
try:
client = PaymoClient(api_key)
processor = TimesheetProcessor(client, config)
created = processor.submit(yaml_file, auto_confirm=True)
return {
"success": True,
"entries_created": len(created),
"entries": created
}
finally:
os.unlink(yaml_file)
@mcp.tool()
def export_paymo_timesheet(
start_date: str,
end_date: str,
project_id: Optional[int] = None
) -> str:
"""
Export timesheet as CSV content
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
project_id: Optional project filter
Returns:
CSV content as string (can be saved to file or displayed)
"""
# Convert parameters to proper types (MCP may pass strings)
if project_id is not None:
project_id = int(project_id)
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured")
client = PaymoClient(api_key)
return client.export_timesheet_csv(start_date, end_date, project_id)
@mcp.tool()
def list_paymo_invoices(client_id: Optional[int] = None, status: Optional[str] = None) -> List[Dict[str, Any]]:
"""
List Paymo invoices with essential details only
Args:
client_id: Filter by client ID
status: Filter by status (sent, viewed, paid)
"""
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured")
client = PaymoClient(api_key)
invoices = client.get_invoices(client_id, status)
# Return only essential fields to minimize context usage
# Keep: identification, client, amounts, dates, status
# Remove: internal IDs, arrays, detailed line items
return [{
'id': inv.get('id'),
'number': inv.get('number'),
'client_id': inv.get('client_id'),
'client_name': inv.get('client_name'),
'date': inv.get('date'),
'due_date': inv.get('due_date'),
'status': inv.get('status'),
'subtotal': inv.get('subtotal'),
'total': inv.get('total'),
'currency': inv.get('currency', 'USD')
} for inv in invoices]
@mcp.tool()
def get_projects_without_recent_invoices(days: int = 30) -> List[Dict[str, Any]]:
"""
Get active projects that haven't been invoiced in the specified number of days.
Efficient for queries like "which projects haven't I invoiced this month?"
Args:
days: Number of days to look back (default 30)
Returns:
List of projects without recent invoices: project name, client, last invoice date, days since last invoice
"""
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured")
client = PaymoClient(api_key)
from datetime import datetime, timedelta
cutoff_date = datetime.now() - timedelta(days=days)
# Get all active projects
projects = client.get_projects()
# Get all invoices (we only need recent ones but API doesn't support date filtering)
invoices = client.get_invoices()
# Build map of project -> most recent invoice date
project_last_invoice = {}
for invoice in invoices:
inv_date_str = invoice.get('date', '')
if not inv_date_str:
continue
inv_date = datetime.strptime(inv_date_str, '%Y-%m-%d')
# Find which projects are on this invoice by checking invoice items
# For now, use client_id as proxy (simplification)
client_id = invoice.get('client_id')
for project in projects:
if project.get('client_id') == client_id:
project_id = project.get('id')
if project_id not in project_last_invoice or inv_date > project_last_invoice[project_id]:
project_last_invoice[project_id] = inv_date
# Build result: projects without recent invoices
result = []
for project in projects:
if not project.get('active'):
continue
project_id = project.get('id')
last_invoice_date = project_last_invoice.get(project_id)
# Include if: no invoice ever, or last invoice before cutoff
if not last_invoice_date or last_invoice_date < cutoff_date:
days_since = (datetime.now() - last_invoice_date).days if last_invoice_date else 999
result.append({
'project_id': project_id,
'project_name': project.get('name'),
'client_name': project.get('client_name'),
'last_invoice_date': last_invoice_date.strftime('%Y-%m-%d') if last_invoice_date else 'Never',
'days_since_invoice': days_since
})
# Sort by days since invoice descending
result.sort(key=lambda x: x['days_since_invoice'], reverse=True)
return result
@mcp.tool()
def get_outstanding_invoices_last_week() -> List[Dict[str, Any]]:
"""Get outstanding invoices (sent or viewed) from the last 7 days"""
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured")
client = PaymoClient(api_key)
return client.get_outstanding_invoices_last_week()
@mcp.tool()
def export_invoice_timesheet(invoice_id: int) -> str:
"""
Export timesheet CSV for entries on a specific invoice
Args:
invoice_id: Invoice ID to export
Returns:
CSV content as string (can be saved to file or displayed)
"""
# Convert parameters to proper types (MCP may pass strings)
invoice_id = int(invoice_id)
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured")
client = PaymoClient(api_key)
return client.export_invoice_entries_csv(invoice_id)
@mcp.tool()
def delete_paymo_entry(entry_id: int) -> str:
"""
Delete a time entry by ID
Args:
entry_id: The ID of the time entry to delete
"""
# Convert parameters to proper types (MCP may pass strings)
entry_id = int(entry_id)
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured")
client = PaymoClient(api_key)
try:
client.delete_entry(entry_id)
return f"Successfully deleted entry {entry_id}"
except Exception as e:
return f"Failed to delete entry: {e}"
@mcp.tool()
def mark_paymo_entry_billed(entry_id: int, billed: bool = True) -> Dict[str, Any]:
"""
Mark a time entry as billed or unbilled
Args:
entry_id: The ID of the time entry
billed: True to mark as billed, False to mark as unbilled (default: True)
"""
# Convert parameters to proper types (MCP may pass strings)
entry_id = int(entry_id)
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured")
client = PaymoClient(api_key)
return client.update_entry(entry_id, billed=billed)
@mcp.tool()
def list_paymo_entries(
start_date: str,
end_date: str,
project_id: Optional[int] = None,
billed: Optional[bool] = None
) -> List[Dict[str, Any]]:
"""
List time entries with optional filters
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
project_id: Optional project filter
billed: Optional filter - True for billed, False for unbilled, None for all
Returns:
List of time entries with task names, durations, descriptions
"""
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured")
client = PaymoClient(api_key)
# Get entries
entries = client.get_entries(start_date, end_date)
# Filter by project if specified
if project_id is not None:
entries = [e for e in entries if e.get('project_id') == project_id]
# Filter by billed status if specified
if billed is not None:
entries = [e for e in entries if e.get('billed') == billed]
# Enhance entries with task names and readable data
# Use cache to avoid repeated API calls for same task_id
task_cache = {}
result = []
for entry in entries:
# Get task name (with caching)
task_id = entry.get('task_id')
task_name = ''
if task_id:
# Check cache first
if task_id in task_cache:
task_name = task_cache[task_id]
else:
# Fetch from API and cache result
try:
import time
time.sleep(0.5) # Small delay to avoid rate limits
task_response = client._request('GET', f'tasks/{task_id}')
task_data = task_response.get('tasks', [{}])[0]
task_name = task_data.get('name', '')
task_cache[task_id] = task_name
except Exception as e:
# If rate limited, retry once after delay
if '429' in str(e):
try:
time.sleep(2)
task_response = client._request('GET', f'tasks/{task_id}')
task_data = task_response.get('tasks', [{}])[0]
task_name = task_data.get('name', '')
task_cache[task_id] = task_name
except Exception as retry_err:
task_name = f'Task {task_id}'
task_cache[task_id] = task_name
else:
task_name = f'Task {task_id}'
task_cache[task_id] = task_name
# Calculate duration in hours
duration_hours = entry.get('duration', 0) / 3600 if entry.get('duration') else 0
# Clean description
description = entry.get('description', '')
if description:
import re
import html
description = re.sub(r'<[^>]+>', '', description)
description = html.unescape(description).strip()
result.append({
'id': entry.get('id'),
'project_id': entry.get('project_id'),
'task_id': task_id,
'task_name': task_name,
'date': entry.get('date', ''),
'start_time': entry.get('start_time', ''),
'end_time': entry.get('end_time', ''),
'duration_hours': round(duration_hours, 2),
'description': description,
'billed': entry.get('billed', False),
'price': entry.get('price', 0)
})
return result
@mcp.tool()
def get_projects_needing_invoicing(
month: str = None,
min_unbilled_hours: float = 0.0
) -> Dict[str, Any]:
"""
Single efficient query combining invoice recency, unbilled hours, and filtering.
Perfect for "what active projects need invoicing?"
Args:
month: Month in YYYY-MM format (defaults to current month)
min_unbilled_hours: Minimum unbilled hours to include (default 0)
Returns:
Dict with projects_needing_invoicing, month, total_unbilled
"""
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured")
client = PaymoClient(api_key)
from datetime import datetime, timedelta
# Parse month or default to current
if month:
month_start = datetime.strptime(month + '-01', '%Y-%m-%d')
else:
month_start = datetime.now().replace(day=1)
month = month_start.strftime('%Y-%m')
# Calculate date range: start of month to now (or end of month if past month)
end_date = datetime.now()
if end_date < month_start:
# Future month requested, use end of that month
next_month = month_start.replace(day=28) + timedelta(days=4)
end_date = next_month - timedelta(days=next_month.day)
# Get all active projects
projects = client.get_projects()
active_projects = [p for p in projects if p.get('active')]
# Get all entries for the last 90 days (to check both billed and unbilled)
# Use tomorrow as end date to catch entries for "today" in all timezones
start_search = (datetime.now() - timedelta(days=90)).strftime('%Y-%m-%d')
end_search = (datetime.now() + timedelta(days=1)).strftime('%Y-%m-%d')
all_entries = client.get_entries(start_search, end_search)
# Process each project
results = []
total_unbilled = 0
for project in active_projects:
project_id = project.get('id')
# Get entries for this project
project_entries = [e for e in all_entries if e.get('project_id') == project_id]
# Find last invoice date (most recent billed entry)
billed_entries = [e for e in project_entries if e.get('invoice_item_id')]
last_invoice_date = None
if billed_entries:
# Find most recent billed entry
for entry in billed_entries:
entry_date_str = entry.get('date') or entry.get('start_time', '')[:10]
if entry_date_str:
entry_date = datetime.strptime(entry_date_str, '%Y-%m-%d')
if not last_invoice_date or entry_date > last_invoice_date:
last_invoice_date = entry_date
# Calculate unbilled hours/amount
unbilled_entries = [e for e in project_entries if not e.get('billed', False)]
unbilled_hours = sum(e.get('duration', 0) / 3600 for e in unbilled_entries)
rate = project.get('price_per_hour', 0)
unbilled_amount = unbilled_hours * rate
# Filter: must have unbilled hours AND not invoiced this month
has_unbilled = unbilled_hours >= min_unbilled_hours
not_invoiced_this_month = not last_invoice_date or last_invoice_date < month_start
if has_unbilled and not_invoiced_this_month:
results.append({
'project_id': project_id,
'project_name': project.get('name'),
'client_name': project.get('client_name'),
'rate': rate,
'last_invoice_date': last_invoice_date.strftime('%Y-%m-%d') if last_invoice_date else None,
'unbilled_hours': round(unbilled_hours, 2),
'unbilled_amount': round(unbilled_amount, 2)
})
total_unbilled += unbilled_amount
# Sort by unbilled amount descending
results.sort(key=lambda x: x['unbilled_amount'], reverse=True)
return {
'projects_needing_invoicing': results,
'month': month,
'total_unbilled': round(total_unbilled, 2)
}
@mcp.tool()
def get_unbilled_summary(
start_date: str = None,
end_date: str = None
) -> List[Dict[str, Any]]:
"""
Get unbilled hours and revenue summary by project.
Efficient query that returns only aggregated data, not individual entries.
Args:
start_date: Optional start date (YYYY-MM-DD), defaults to 60 days ago
end_date: Optional end date (YYYY-MM-DD), defaults to tomorrow (to catch today's entries in all timezones)
Returns:
List of projects with unbilled summary: project name, client, rate, unbilled hours, unbilled amount
"""
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured")
client = PaymoClient(api_key)
# Default date range: last 60 days to tomorrow (to catch timezone edge cases)
from datetime import datetime, timedelta
if not start_date:
start_date = (datetime.now() - timedelta(days=60)).strftime('%Y-%m-%d')
if not end_date:
# Use tomorrow to ensure we catch entries for "today" in all timezones
# and any entries dated for tomorrow that were entered early
end_date = (datetime.now() + timedelta(days=1)).strftime('%Y-%m-%d')
# Get all projects
projects = client.get_projects()
# Get all unbilled entries (without fetching task names - more efficient)
all_entries = client.get_entries(start_date, end_date)
unbilled_entries = [e for e in all_entries if not e.get('billed', False)]
# Aggregate by project
project_summary = {}
for entry in unbilled_entries:
project_id = entry.get('project_id')
if not project_id:
continue
if project_id not in project_summary:
project_summary[project_id] = {
'total_hours': 0,
'total_amount': 0
}
# Add hours
duration_hours = entry.get('duration', 0) / 3600 if entry.get('duration') else 0
project_summary[project_id]['total_hours'] += duration_hours
# Add amount (use entry price if available, otherwise calculate from duration)
price = entry.get('price', 0)
if not price and duration_hours > 0:
# Find project rate
project = next((p for p in projects if p.get('id') == project_id), None)
if project:
price = duration_hours * project.get('price_per_hour', 0)
project_summary[project_id]['total_amount'] += price
# Build result with project details
result = []
for project_id, summary in project_summary.items():
project = next((p for p in projects if p.get('id') == project_id), None)
if project and summary['total_hours'] > 0:
result.append({
'project_id': project_id,
'project_name': project.get('name'),
'client_name': project.get('client_name'),
'rate': project.get('price_per_hour'),
'unbilled_hours': round(summary['total_hours'], 2),
'unbilled_amount': round(summary['total_amount'], 2)
})
# Sort by unbilled amount descending
result.sort(key=lambda x: x['unbilled_amount'], reverse=True)
return result
@mcp.tool()
def list_paymo_expenses(
project_id: Optional[int] = None,
client_id: Optional[int] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
billed: Optional[bool] = None
) -> List[Dict[str, Any]]:
"""
List expenses with optional filters
Args:
project_id: Filter by project ID
client_id: Filter by client ID
start_date: Filter from date (YYYY-MM-DD)
end_date: Filter to date (YYYY-MM-DD)
billed: Filter by billed status (True=billed, False=unbilled, None=all)
Returns:
List of expenses with id, project_name, client_name, date, amount, description, billed
"""
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured in ~/.mcp-auth/paymo/auth.json")
client = PaymoClient(api_key)
expenses = client.get_expenses(
project_id=project_id,
client_id=client_id,
start_date=start_date,
end_date=end_date,
billed=billed
)
# Enrich with project and client names
projects = {p['id']: p for p in client.get_projects()}
clients = {c['id']: c for c in client.get_clients()}
result = []
for expense in expenses:
project = projects.get(expense.get('project_id'), {})
client_obj = clients.get(project.get('client_id'), {})
result.append({
'id': expense.get('id'),
'project_name': project.get('name'),
'client_name': client_obj.get('name'),
'date': expense.get('date'),
'amount': expense.get('amount'),
'description': expense.get('description'),
'billable': expense.get('billable'),
'billed': expense.get('billed'),
'expense_category_id': expense.get('expense_category_id')
})
return result
@mcp.tool()
def create_paymo_expense(
project_id: int,
amount: float,
date: str,
description: Optional[str] = None,
expense_category_id: Optional[int] = None,
billable: bool = True,
file_path: Optional[str] = None
) -> Dict[str, Any]:
"""
Create a new expense entry
Args:
project_id: Project to attach expense to
amount: Expense amount (decimal number)
date: Date in YYYY-MM-DD format
description: Expense description
expense_category_id: Category ID (use list_expense_categories to see options)
billable: Whether expense is billable (default True)
file_path: Optional path to receipt image/PDF for upload
Returns:
Created expense with id, amount, date, description, billable, billed
"""
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured in ~/.mcp-auth/paymo/auth.json")
client = PaymoClient(api_key)
expense = client.create_expense(
project_id=project_id,
amount=amount,
date=date,
description=description,
expense_category_id=expense_category_id,
billable=billable,
file_path=file_path
)
return {
'id': expense.get('id'),
'project_id': expense.get('project_id'),
'amount': expense.get('amount'),
'date': expense.get('date'),
'description': expense.get('description'),
'billable': expense.get('billable'),
'billed': expense.get('billed'),
'expense_category_id': expense.get('expense_category_id')
}
@mcp.tool()
def delete_paymo_expense(expense_id: int) -> Dict[str, str]:
"""
Delete an expense by ID
Args:
expense_id: Expense ID to delete
Returns:
Success confirmation
"""
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured in ~/.mcp-auth/paymo/auth.json")
client = PaymoClient(api_key)
client.delete_expense(expense_id)
return {'status': 'success', 'message': f'Expense {expense_id} deleted'}
@mcp.tool()
def mark_paymo_expense_billed(expense_id: int, billed: bool = True) -> Dict[str, Any]:
"""
Mark an expense as billed or unbilled
Args:
expense_id: Expense ID
billed: Billed status (default True)
Returns:
Updated expense
"""
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured in ~/.mcp-auth/paymo/auth.json")
client = PaymoClient(api_key)
expense = client.update_expense(expense_id, billed=billed)
return {
'id': expense.get('id'),
'amount': expense.get('amount'),
'date': expense.get('date'),
'description': expense.get('description'),
'billed': expense.get('billed')
}
@mcp.tool()
def list_expense_categories() -> List[Dict[str, Any]]:
"""
List available expense categories
Returns:
List of categories with id and name
"""
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured in ~/.mcp-auth/paymo/auth.json")
client = PaymoClient(api_key)
categories = client.get_expense_categories()
return [
{
'id': cat.get('id'),
'name': cat.get('name')
}
for cat in categories
]
@mcp.tool()
def get_unbilled_expenses_summary(
start_date: Optional[str] = None,
end_date: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
Get unbilled expenses summary by project (mirrors get_unbilled_summary for time entries)
Args:
start_date: Optional start date (YYYY-MM-DD), defaults to 60 days ago
end_date: Optional end date (YYYY-MM-DD), defaults to tomorrow
Returns:
List of projects with unbilled_expense_count and unbilled_expense_total
"""
config = load_config()
api_key = config.get('api_key')
if not api_key:
raise ValueError("API key not configured in ~/.mcp-auth/paymo/auth.json")
client = PaymoClient(api_key)
# Default date range: last 60 days to tomorrow
from datetime import datetime, timedelta
if not start_date:
start_date = (datetime.now() - timedelta(days=60)).strftime('%Y-%m-%d')
if not end_date:
end_date = (datetime.now() + timedelta(days=1)).strftime('%Y-%m-%d')
# Get all projects
projects = {p['id']: p for p in client.get_projects()}
clients = {c['id']: c for c in client.get_clients()}
# Get unbilled expenses
all_expenses = client.get_expenses(start_date=start_date, end_date=end_date, billed=False)
# Aggregate by project
project_summary = {}
for expense in all_expenses:
project_id = expense.get('project_id')
if project_id not in project_summary:
project_summary[project_id] = {
'count': 0,
'total': 0.0
}
project_summary[project_id]['count'] += 1
project_summary[project_id]['total'] += float(expense.get('amount', 0))
# Build result
result = []
for project_id, summary in project_summary.items():
project = projects.get(project_id, {})
client_obj = clients.get(project.get('client_id'), {})
result.append({
'project_id': project_id,
'project_name': project.get('name'),
'client_name': client_obj.get('name'),
'unbilled_expense_count': summary['count'],
'unbilled_expense_total': round(summary['total'], 2)
})
# Sort by unbilled total descending
result.sort(key=lambda x: x['unbilled_expense_total'], reverse=True)
return result
def run_mcp_server():
"""Run as MCP server"""
if not MCP_AVAILABLE:
console.print("[red]Error: fastmcp not installed. Install with: pip install fastmcp[/red]")
sys.exit(1)
# Don't print to stdout - interferes with MCP JSON-RPC protocol
mcp.run()
@cli.command()
@click.option('--status', help='Filter by status (sent, viewed, paid)')
@click.option('--last-week', is_flag=True, help='Only show invoices from last 7 days')
def list_invoices_filtered(status: Optional[str], last_week: bool):
"""List Paymo invoices with filters"""
config = load_config()
api_key = config.get('api_key') or click.prompt('Paymo API Key', hide_input=True)
client = PaymoClient(api_key)
if last_week:
invoices = client.get_outstanding_invoices_last_week()
console.print(f"\n[bold]Outstanding invoices from last 7 days[/bold]\n")
else:
invoices = client.get_invoices(status=status)
table = Table(title="Paymo Invoices")
table.add_column("ID", style="cyan")
table.add_column("Number", style="white")
table.add_column("Client", style="magenta")
table.add_column("Amount", style="green")
table.add_column("Date", style="yellow")
table.add_column("Status", style="blue")
total = 0
for invoice in invoices:
amount = invoice.get('total', 0)
total += amount
table.add_row(
str(invoice.get('id', '')),
invoice.get('number', ''),
invoice.get('client_name', ''),
f"${amount:,.2f}",
invoice.get('date', ''),
invoice.get('status', '')
)
console.print(table)
console.print(f"\n[bold]Total: ${total:,.2f}[/bold]")
console.print(f"[bold]Count: {len(invoices)} invoices[/bold]\n")
@cli.command()
@click.option('--invoice-id', type=int, help='Specific invoice ID')
@click.option('--last-week', is_flag=True, help='Export for all outstanding invoices from last week')
@click.option('--output-dir', '-o', default='.', help='Output directory for exports')
@click.option('--no-date', is_flag=True, help='Exclude date column')
@click.option('--no-start-time', is_flag=True, help='Exclude start time column')
@click.option('--no-end-time', is_flag=True, help='Exclude end time column')
def export_invoice_timesheets(invoice_id: Optional[int], last_week: bool, output_dir: str,
no_date: bool, no_start_time: bool, no_end_time: bool):
"""Export timesheets for invoice(s)"""
config = load_config()
api_key = config.get('api_key') or click.prompt('Paymo API Key', hide_input=True)
client = PaymoClient(api_key)
# Determine which invoices to export
if invoice_id:
invoices = [client.get_invoice(invoice_id)]
elif last_week:
invoices = client.get_outstanding_invoices_last_week()
console.print(f"\n[bold]Found {len(invoices)} outstanding invoices from last week[/bold]\n")
else:
console.print("[red]Error: Must specify --invoice-id or --last-week[/red]")
return
if not invoices:
console.print("[yellow]No invoices found[/yellow]")
return
# Export each invoice
import os
os.makedirs(output_dir, exist_ok=True)
for inv in invoices:
inv_id = inv.get('id')
inv_number = inv.get('number', f'INV-{inv_id}')
inv_date = inv.get('date', '')
# Get invoice details to find time entries
# Use invoice date and calculate billing period (usually monthly)
if inv_date:
from datetime import datetime, timedelta
inv_dt = datetime.strptime(inv_date, '%Y-%m-%d')
# Assume monthly billing - use first day of month to invoice date
start_date = inv_dt.replace(day=1).strftime('%Y-%m-%d')
end_date = inv_date
else:
# Fallback - use current month
from datetime import datetime
now = datetime.now()
start_date = now.replace(day=1).strftime('%Y-%m-%d')
end_date = now.strftime('%Y-%m-%d')
console.print(f"\n[bold]Exporting: {inv_number}[/bold]")
console.print(f" Period: {start_date} to {end_date}")
console.print(f" Amount: ${inv.get('total', 0):,.2f}")
try:
# Export timesheet - use invoice-specific method to get only entries on this invoice
csv_content = client.export_invoice_entries_csv(
inv_id,
include_date=not no_date,
include_start_time=not no_start_time,
include_end_time=not no_end_time
)
# Save file
filename = f"{inv_number.replace('#', '').replace('/', '-')}_timesheet.csv"
output_path = os.path.join(output_dir, filename)
with open(output_path, 'w') as f:
f.write(csv_content)
console.print(f" [green]✓ Saved: {output_path}[/green]")
except Exception as e:
console.print(f" [red]✗ Error: {e}[/red]")
console.print(f"\n[bold green]Exported {len(invoices)} invoice timesheets[/bold green]\n")
if __name__ == '__main__':
# Check if running as MCP server
if len(sys.argv) > 1 and sys.argv[1] == 'mcp':
run_mcp_server()
else:
cli()