# MCP Toggl Server - Implementation Spec

## Overview

FastMCP-based MCP server that fetches and aggregates Toggl time tracking data.

**References**:
- FastMCP: `/jlowin/fastmcp` (Context7)
- Toggl API v9: `/websites/engineering_toggl` (Context7)

---

## 1. Tools

### Tool 1: `get_workspace_users()`

Lists all users in the Toggl workspace. Use this to discover available users before filtering.

```python
from typing import Annotated
from pydantic import Field

@mcp.tool(
    name="get_workspace_users",
    description="""
    Retrieve a list of all users in the Toggl workspace.

    Use this tool when you need to:
    - Discover available users for filtering
    - Get user IDs and emails for aggregation
    - Build user lists for batch operations

    Returns array of user objects with id, email, and name.
    """,
    tags={"users", "workspace", "reference"},
    meta={"version": "1.0", "requires_auth": True}
)
def get_workspace_users() -> dict:
    """
    List all users in Toggl workspace.

    Returns:
        {
            "status": "success" | "error",
            "data": [
                {"id": "12345", "email": "user@example.com", "name": "John Doe"},
                ...
            ],
            "error": null | {"code": str, "message": str}
        }
    """
```

### Tool 2: `get_toggl_aggregated_data(start_date, end_date, user_id=None)`

Fetches time entries from the Toggl API for a date range and returns aggregated data grouped by user and matched entities.

```python
from typing import Annotated
from pydantic import Field

@mcp.tool(
    name="get_toggl_aggregated_data",
    description="""
    Fetch and aggregate Toggl time tracking data for a given date range.

    Use this tool when you need to:
    - Get time entry summaries for a specific period
    - Analyze matched Fibery entity references (#ID [DB] [TYPE])
    - Calculate total hours by user and project
    - Generate time tracking reports

    The tool automatically:
    - Parses descriptions to extract Fibery entity references
    - Groups entries by user and entity
    - Aggregates durations and calculates hours
    - Caches results for 1 hour

    Max date range: 7 days per request (due to API limits).
    For larger ranges, make multiple requests and aggregate client-side.
    """,
    tags={"time-tracking", "aggregation", "reports"},
    meta={"version": "1.0", "requires_auth": True, "cached": True}
)
def get_toggl_aggregated_data(
    start_date: Annotated[str, Field(
        description="Start date in ISO 8601 format (YYYY-MM-DD). Example: '2025-10-06'",
        pattern=r'^\d{4}-\d{2}-\d{2}$'
    )],
    end_date: Annotated[str, Field(
        description="End date in ISO 8601 format (YYYY-MM-DD). Must be >= start_date and <= start_date + 7 days. Example: '2025-10-13'",
        pattern=r'^\d{4}-\d{2}-\d{2}$'
    )],
    user_id: Annotated[str | None, Field(
        description="Optional: Filter to single user by ID. If omitted, returns all users. Example: '12345'",
        default=None
    )] = None
) -> dict:
    """
    Fetch and aggregate Toggl time entries.

    The response contains aggregated time data grouped by:
    1. User (email)
    2. For matched entries: (database, type, entity_id, description)
    3. For unmatched entries: description

    Example response:
        {
            "status": "success",
            "data": {
                "users": {
                    "user@example.com": {
                        "user_email": "user@example.com",
                        "matched_entities": [
                            {
                                "entity_database": "Scrum",
                                "entity_type": "Task",
                                "entity_id": "456",
                                "description": "Design UI",
                                "duration_hours": 7.5
                            }
                        ],
                        "unmatched_activities": [
                            {
                                "description": "Team meeting",
                                "duration_hours": 1.0
                            }
                        ],
                        "statistics": {
                            "total_duration_hours": 8.5,
                            "matched_duration_hours": 7.5,
                            "unmatched_duration_hours": 1.0
                        }
                    }
                },
                "statistics": {
                    "total_users": 1,
                    "total_duration_hours": 8.5,
                    "total_matched_duration_hours": 7.5,
                    "total_unmatched_duration_hours": 1.0
                }
            },
            "error": null
        }
    """
```

Output structure (same regardless of filters):

```json
{
  "status": "success" | "error",
  "data": {
    "users": {
      "email@domain.com": {
        "user_email": "email@domain.com",
        "matched_entities": [
          {
            "entity_database": "Scrum",
            "entity_type": "Task",
            "entity_id": "456",
            "description": "Design UI",
            "duration_hours": 7.665
          }
        ],
        "unmatched_activities": [
          {
            "description": "Team meeting",
            "duration_hours": 1.19
          }
        ],
        "statistics": {
          "total_duration_hours": 8.855,
          "matched_duration_hours": 7.665,
          "unmatched_duration_hours": 1.19
        }
      }
    },
    "statistics": {
      "total_users": 1,
      "total_duration_hours": 8.855,
      "total_matched_duration_hours": 7.665,
      "total_unmatched_duration_hours": 1.19
    }
  },
  "error": null | {"code": str, "message": str}
}
```

---

## 2. Toggl API Integration

### Authentication

HTTP Basic Auth (base64 encoded). Load from `.env` file:

```
TOGGL_API_TOKEN=<your_token>
TOGGL_WORKSPACE_ID=<workspace_id>
```

```python
from base64 import b64encode
import httpx
import os
from dotenv import load_dotenv

load_dotenv()

api_token = os.getenv("TOGGL_API_TOKEN")
workspace_id = os.getenv("TOGGL_WORKSPACE_ID")

credentials = b64encode(f"{api_token}:api_token".encode()).decode()
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Basic {credentials}"
}
```

### Time Entries Fetch (Reports API v3)

**Endpoint**: `GET https://api.track.toggl.com/reports/api/v3/workspace/{workspace_id}/search/time_entries`

**Strategy**: Day-by-day pagination with page parameter

For each day in the date range:

```
start_date=2025-10-06
end_date=2025-10-06 (same day)
user_id=12301838 (optional, single user filter)
page=1 (increment for pagination)
```

**Response**: Array of time entries with pagination headers:
- `X-Next-ID`: Next group ID for pagination
- `X-Next-Row-Number`: Next row number for pagination
- `X-Is-Final`: Whether this is final page (false = more pages)

**Time Entry Structure**:

```json
{
  "id": 4132978931,
  "workspace_id": 1637944,
  "user_id": 12301838,
  "user_email": "user@example.com",
  "project_name": "ProjectA",
  "description": "Design UI #456 [Scrum] [Task]",
  "start": "2025-10-13T08:22:03+02:00",
  "stop": "2025-10-13T08:48:33+02:00",
  "duration": 1590
}
```

### Rate Limiting

Toggl: 3 requests per second

**Implementation**: Exponential backoff on 429 response
- Attempt 1: 60 seconds
- Attempt 2: 120 seconds (2 × 60)
- Attempt 3: 240 seconds (2 × 120)
- Max 3 retries, then fail

```python
import asyncio

# `client` is a shared httpx.AsyncClient; `headers` are built in the Authentication section
async def _fetch_with_backoff(url, params, max_retries=3):
    for attempt in range(max_retries):
        response = await client.get(url, params=params, headers=headers)
        if response.status_code == 429:
            # Rate limited: wait 60s, 120s, then 240s before the next attempt
            wait_time = 60 * (2 ** attempt)
            await asyncio.sleep(wait_time)
            continue
        return response
    raise Exception("Max retries exceeded")
```

### Day-by-Day Pagination

Fetch each day separately to avoid pagination complexity:

```python
entries = []
for day in date_range(start_date, end_date):
    day_str = day.strftime("%Y-%m-%d")
    page = 1

    while True:
        params = {
            "start_date": day_str,
            "end_date": day_str,
            "page": page,
        }
        if user_id:
            params["user_id"] = user_id  # only sent when a user filter is provided

        response = await _fetch_with_backoff(url, params)
        entries.extend(response.json())

        # Check if more pages for this day
        if not response.headers.get("X-Next-ID"):
            break  # Move to next day
        page += 1
```

---

## 3. Description Parsing

**Pattern**: Extract Fibery entity references from Toggl descriptions

Format: `"Text #ID [DATABASE] [TYPE] [PROJECT]"`

Examples:
- `"Design UI #456 [Scrum] [Task] [Moneyball]"` → matched
- `"Team meeting"` → unmatched
- `"#123"` → matched (minimal)

**Rules**:
- Use **rightmost** `#ID` if multiple exist
- Extract first 3 bracket values as: `[DATABASE]`, `[TYPE]`, `[PROJECT]`
- Everything before `#ID` is description_clean

**Implementation**:

```python
import re

# ParsedEntry is the Pydantic model defined in models.py
ENTITY_PATTERN = re.compile(r'#(\d+)')
METADATA_PATTERN = re.compile(r'\[([^\]]+)\]')

def parse_description(description: str) -> ParsedEntry:
    matches = list(ENTITY_PATTERN.finditer(description))

    if not matches:
        return ParsedEntry(description_clean=description, is_matched=False)

    match = matches[-1]  # Rightmost
    entity_id = match.group(1)
    description_clean = description[:match.start()].strip()

    # Only brackets that follow the #ID count as metadata
    text_after_id = description[match.end():]
    brackets = [m.group(1) for m in METADATA_PATTERN.finditer(text_after_id)]

    return ParsedEntry(
        description_clean=description_clean,
        entity_id=entity_id,
        entity_database=brackets[0] if len(brackets) > 0 else None,
        entity_type=brackets[1] if len(brackets) > 1 else None,
        project=brackets[2] if len(brackets) > 2 else None,
        is_matched=True
    )
```

---

## 4. Data Aggregation

**Group by**:
1. User (email)
2. For matched: (database, type, entity_id, description) - sum duration
3. For unmatched: description - sum duration

**Calculate**: duration_hours (duration_seconds / 3600)

**Example input** (4 entries):

```
Entry 1: user=alice@email.com, desc="Task #123 [Scrum] [Task]", duration=3600 (1 hour)
Entry 2: user=alice@email.com, desc="Task #123 [Scrum] [Task]", duration=1800 (0.5 hour)
Entry 3: user=alice@email.com, desc="Another task #123 [Scrum] [Task]", duration=1800 (0.5 hour)
Entry 4: user=alice@email.com, desc="Lunch", duration=3600 (1 hour)
```

**Example output** (entries 1 and 2 share a description and merge into 1.5 hours; entry 3 has the same entity but a different description, so it stays separate):

```json
{
  "users": {
    "alice@email.com": {
      "user_email": "alice@email.com",
      "matched_entities": [
        {
          "entity_database": "Scrum",
          "entity_type": "Task",
          "entity_id": "123",
          "description": "Task",
          "duration_hours": 1.5
        },
        {
          "entity_database": "Scrum",
          "entity_type": "Task",
          "entity_id": "123",
          "description": "Another task",
          "duration_hours": 0.5
        }
      ],
      "unmatched_activities": [
        {
          "description": "Lunch",
          "duration_hours": 1.0
        }
      ],
      "statistics": {
        "total_duration_hours": 3.0,
        "matched_duration_hours": 2.0,
        "unmatched_duration_hours": 1.0
      }
    }
  }
}
```

---

## 5. Caching

**Cache Key**: MD5 hash of dates + optional user_id

```python
import hashlib

def get_cache_key(start_date, end_date, user_id=None):
    filter_str = ""
    if user_id:
        filter_str = f":{user_id}"
    key = f"{start_date}:{end_date}{filter_str}"
    return hashlib.md5(key.encode()).hexdigest()
```

**Storage**:
- Cache directory: `cache/`
- SQLite index: `cache/index.db`
- JSON files: `cache/data/{uuid}.json`

**SQLite Schema**:

```sql
CREATE TABLE cache (
    id INTEGER PRIMARY KEY,
    hash TEXT NOT NULL UNIQUE,
    filename TEXT NOT NULL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
```

**TTL**: 1 hour

---

## 6. Validation & Errors

### Date Range Validation

```python
from datetime import datetime

def validate_date_range(start_date: str, end_date: str) -> dict | None:
    try:
        start = datetime.fromisoformat(start_date).date()
        end = datetime.fromisoformat(end_date).date()
    except ValueError:
        return {"code": "INVALID_DATE_FORMAT", "message": "ISO 8601 format required"}

    if end < start:
        return {"code": "INVALID_DATE_RANGE", "message": "end_date must be >= start_date"}

    if (end - start).days > 7:
        return {"code": "DATE_RANGE_EXCEEDS_LIMIT", "message": "Max 7 days per request"}

    return None
```

### Error Codes

- `INVALID_DATE_FORMAT`: dates not ISO 8601
- `INVALID_DATE_RANGE`: end < start
- `DATE_RANGE_EXCEEDS_LIMIT`: more than 7 days
- `USER_NOT_FOUND`: user_id not found in workspace
- `API_ERROR`: Toggl API error
- `INTERNAL_ERROR`: server error

---

## 7. Pipeline (get_toggl_aggregated_data)

1. **Validate** date range (format, range <= 7 days)
2. **Cache Check** → if hit, return
3. **Fetch Time Entries** (day-by-day, with backoff on 429, filter by user_id if provided)
4. **Parse Descriptions** (extract #ID and [brackets])
5. **Aggregate** (group by user → entity → description)
6. **Cache Save** (JSON + SQLite)
7. **Return** aggregated data

---

## 8. FastMCP Integration

```python
from fastmcp import FastMCP
from typing import Annotated
from pydantic import Field

# Create server instance
mcp = FastMCP(
    name="Toggl MCP Server",
    instructions="Time tracking data fetcher and aggregator using Toggl API"
)

# Initialize services at startup
cache_service = CacheService(cache_dir="./cache", ttl_hours=1)
toggl_service = TogglService(api_token, workspace_id)
parser_service = ParserService()
aggregator_service = AggregatorService()

# Tool 1: Get Workspace Users
@mcp.tool(
    name="get_workspace_users",
    description="""
    Retrieve a list of all users in the Toggl workspace.
    Use this tool when you need to:
    - Discover available users for filtering
    - Get user IDs and emails for aggregation
    - Build user lists for batch operations

    Returns array of user objects with id, email, and name.
    """,
    tags={"users", "workspace", "reference"},
    meta={"version": "1.0", "requires_auth": True}
)
async def get_workspace_users() -> dict:
    """List workspace users"""
    try:
        users = await toggl_service.get_workspace_users()
        return {
            "status": "success",
            "data": users,
            "error": None
        }
    except Exception as e:
        return {
            "status": "error",
            "data": None,
            "error": {"code": "API_ERROR", "message": str(e)}
        }

# Tool 2: Get Toggl Aggregated Data
@mcp.tool(
    name="get_toggl_aggregated_data",
    description="""
    Fetch and aggregate Toggl time tracking data for a given date range.

    Use this tool when you need to:
    - Get time entry summaries for a specific period
    - Analyze matched Fibery entity references (#ID [DB] [TYPE])
    - Calculate total hours by user and project
    - Generate time tracking reports

    The tool automatically:
    - Parses descriptions to extract Fibery entity references
    - Groups entries by user and entity
    - Aggregates durations and calculates hours
    - Caches results for 1 hour

    Max date range: 7 days per request (due to API limits).
    For larger ranges, make multiple requests and aggregate client-side.
    """,
    tags={"time-tracking", "aggregation", "reports"},
    meta={"version": "1.0", "requires_auth": True, "cached": True}
)
async def get_toggl_aggregated_data(
    start_date: Annotated[str, Field(
        description="Start date in ISO 8601 format (YYYY-MM-DD). Example: '2025-10-06'",
        pattern=r'^\d{4}-\d{2}-\d{2}$'
    )],
    end_date: Annotated[str, Field(
        description="End date in ISO 8601 format (YYYY-MM-DD). Must be >= start_date and <= start_date + 7 days. Example: '2025-10-13'",
        pattern=r'^\d{4}-\d{2}-\d{2}$'
    )],
    user_id: Annotated[str | None, Field(
        description="Optional: Filter to single user by ID. If omitted, returns all users. Example: '12345'",
        default=None
    )] = None
) -> dict:
    """Fetch and aggregate Toggl data for all users or single user"""
    try:
        # 1. Validate dates
        validation_error = validate_date_range(start_date, end_date)
        if validation_error:
            return {
                "status": "error",
                "data": None,
                "error": validation_error
            }

        # 2. Try cache
        cached_data = cache_service.get(start_date, end_date, user_id)
        if cached_data:
            return {
                "status": "success",
                "data": cached_data,
                "error": None,
                "metadata": {"source": "cache"}
            }

        # 3. Fetch time entries
        time_entries = await toggl_service.get_time_entries(
            start_date,
            end_date,
            user_id=user_id if user_id else None
        )

        # 4. Parse descriptions
        parsed_entries = [
            parser_service.parse_description(e["description"])
            for e in time_entries
        ]

        # 5. Aggregate
        aggregated = aggregator_service.aggregate(parsed_entries, time_entries)

        # 6. Build response
        response_data = {
            "users": aggregated["users"],
            "statistics": aggregated["statistics"]
        }

        # 7. Cache
        cache_service.set(start_date, end_date, response_data, user_id)

        return {
            "status": "success",
            "data": response_data,
            "error": None,
            "metadata": {
                "source": "api",
                "entries_fetched": len(time_entries)
            }
        }

    except Exception as e:
        return {
            "status": "error",
            "data": None,
            "error": {
                "code": "INTERNAL_ERROR",
                "message": str(e)
            }
        }

# Server entry point
if __name__ == "__main__":
    mcp.run()
```

---

## 9. Project Structure

```
src/toggl_mcp/
├── server.py                  # FastMCP server + tools
├── services/
│   ├── cache_service.py       # SQLite + JSON caching
│   ├── toggl_service.py       # Toggl API client (get_workspace_users, get_time_entries)
│   ├── parser_service.py      # Description parsing (#ID [brackets])
│   └── aggregator_service.py  # Grouping and statistics
├── models.py                  # Pydantic models (ParsedEntry, etc.)
├── config.py                  # Config from env vars
└── utils.py                   # Date validation, logging

cache/
├── data/                      # JSON cache files
└── index.db                   # SQLite index

tests/
├── test_parser_service.py
├── test_aggregator_service.py
└── test_toggl_service.py
```

---

## 10. Configuration

Create `.env` file in project root:

```
TOGGL_API_TOKEN=<your_token>
TOGGL_WORKSPACE_ID=<workspace_id>
```

Cache directory is hardcoded to: `./cache`

Cache TTL is hardcoded to: 1 hour

---

## 11. Implementation Checklist

- [ ] Setup FastMCP server structure
- [ ] Implement CacheService (MD5 hash, SQLite, JSON)
- [ ] Implement ParserService (regex #ID + brackets)
- [ ] Implement AggregatorService (grouping, statistics)
- [ ] Implement TogglService (API + backoff, user_id filtering)
- [ ] Date range validation (≤ 7 days)
- [ ] Error handling with codes
- [ ] Tool 1: get_workspace_users()
- [ ] Tool 2: get_toggl_aggregated_data(start_date, end_date, user_id=None)
- [ ] Unit tests
- [ ] Integration test
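
As a possible starting point for the AggregatorService item, the grouping rules from section 4 could be sketched roughly as below. This is only an illustration under stated assumptions: the standalone `aggregate` function and the flat entry dicts (keys `user_email`, `duration`, `is_matched`, `description_clean`, plus the entity fields) are hypothetical and simpler than the `aggregate(parsed_entries, time_entries)` call used in section 8.

```python
from collections import defaultdict


def aggregate(entries: list[dict]) -> dict:
    """Group parsed entries by user, then by entity key or description.

    Assumption: each entry is a flat dict that already merges the parsed
    description fields with the raw Toggl duration (in seconds).
    """
    matched = defaultdict(lambda: defaultdict(int))    # user -> (db, type, id, desc) -> seconds
    unmatched = defaultdict(lambda: defaultdict(int))  # user -> description -> seconds

    for e in entries:
        user = e["user_email"]
        if e["is_matched"]:
            key = (e.get("entity_database"), e.get("entity_type"),
                   e["entity_id"], e["description_clean"])
            matched[user][key] += e["duration"]
        else:
            unmatched[user][e["description_clean"]] += e["duration"]

    def hours(seconds: int) -> float:
        return seconds / 3600

    users = {}
    for user in sorted(set(matched) | set(unmatched)):
        m_secs = sum(matched[user].values())
        u_secs = sum(unmatched[user].values())
        users[user] = {
            "user_email": user,
            "matched_entities": [
                {"entity_database": db, "entity_type": typ, "entity_id": eid,
                 "description": desc, "duration_hours": hours(secs)}
                for (db, typ, eid, desc), secs in matched[user].items()
            ],
            "unmatched_activities": [
                {"description": desc, "duration_hours": hours(secs)}
                for desc, secs in unmatched[user].items()
            ],
            "statistics": {
                "total_duration_hours": hours(m_secs + u_secs),
                "matched_duration_hours": hours(m_secs),
                "unmatched_duration_hours": hours(u_secs),
            },
        }

    total_m = sum(sum(v.values()) for v in matched.values())
    total_u = sum(sum(v.values()) for v in unmatched.values())
    return {
        "users": users,
        "statistics": {
            "total_users": len(users),
            "total_duration_hours": hours(total_m + total_u),
            "total_matched_duration_hours": hours(total_m),
            "total_unmatched_duration_hours": hours(total_u),
        },
    }


# The four example entries from section 4, already parsed (hypothetical shape)
entity = {"entity_database": "Scrum", "entity_type": "Task", "entity_id": "123"}
sample = [
    {"user_email": "alice@email.com", "duration": 3600, "is_matched": True,
     "description_clean": "Task", **entity},
    {"user_email": "alice@email.com", "duration": 1800, "is_matched": True,
     "description_clean": "Task", **entity},
    {"user_email": "alice@email.com", "duration": 1800, "is_matched": True,
     "description_clean": "Another task", **entity},
    {"user_email": "alice@email.com", "duration": 3600, "is_matched": False,
     "description_clean": "Lunch"},
]
result = aggregate(sample)
print(result["users"]["alice@email.com"]["statistics"])
```

Durations are summed in whole seconds and converted to hours only when the output is built, so repeated entries do not accumulate floating-point error before the division.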