# MCP Toggl Server - Implementation Spec
## Overview
FastMCP-based MCP server that fetches and aggregates Toggl time tracking data.
**References**:
- FastMCP: `/jlowin/fastmcp` (Context7)
- Toggl API v9: `/websites/engineering_toggl` (Context7)
---
## 1. Tools
### Tool 1: `get_workspace_users()`
Lists all users in the Toggl workspace. Use this to discover available users before filtering.
```python
from typing import Annotated
from pydantic import Field

# `mcp` is the FastMCP server instance created in section 8.
@mcp.tool(
    name="get_workspace_users",
    description="""
    Retrieve a list of all users in the Toggl workspace.

    Use this tool when you need to:
    - Discover available users for filtering
    - Get user IDs and emails for aggregation
    - Build user lists for batch operations

    Returns array of user objects with id, email, and name.
    """,
    tags={"users", "workspace", "reference"},
    meta={"version": "1.0", "requires_auth": True}
)
async def get_workspace_users() -> dict:
    """
    List all users in Toggl workspace.

    Returns:
        {
            "status": "success" | "error",
            "data": [
                {"id": "12345", "email": "user@example.com", "name": "John Doe"},
                ...
            ],
            "error": null | {"code": str, "message": str}
        }
    """
```
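A `TogglService.get_workspace_users` sketch that could back this tool is shown below. The endpoint path, the `httpx.BasicAuth` usage, and the response field names (`id`, `email`, `fullname`) are assumptions to be verified against the Toggl API v9 reference listed above.

```python
# Sketch only: endpoint path and response field names are assumptions, not confirmed API details.
import httpx


class TogglService:
    BASE_URL = "https://api.track.toggl.com/api/v9"

    def __init__(self, api_token: str, workspace_id: str):
        self.workspace_id = workspace_id
        # Toggl Basic Auth: token as username, literal "api_token" as password
        self.auth = httpx.BasicAuth(api_token, "api_token")

    async def get_workspace_users(self) -> list[dict]:
        # Assumed endpoint: GET /workspaces/{workspace_id}/users
        url = f"{self.BASE_URL}/workspaces/{self.workspace_id}/users"
        async with httpx.AsyncClient(auth=self.auth) as client:
            response = await client.get(url)
            response.raise_for_status()
            # Keep only the fields the tool contract promises (id, email, name)
            return [
                {
                    "id": str(u.get("id")),
                    "email": u.get("email"),
                    "name": u.get("fullname") or u.get("name"),
                }
                for u in response.json()
            ]
```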
### Tool 2: `get_toggl_aggregated_data(start_date, end_date, user_id=None)`
Fetches time entries from Toggl API for a date range and returns aggregated data grouped by user and matched entities.
```python
from typing import Annotated
from pydantic import Field

@mcp.tool(
    name="get_toggl_aggregated_data",
    description="""
    Fetch and aggregate Toggl time tracking data for a given date range.

    Use this tool when you need to:
    - Get time entry summaries for a specific period
    - Analyze matched Fibery entity references (#ID [DB] [TYPE])
    - Calculate total hours by user and project
    - Generate time tracking reports

    The tool automatically:
    - Parses descriptions to extract Fibery entity references
    - Groups entries by user and entity
    - Aggregates durations and calculates hours
    - Caches results for 1 hour

    Max date range: 7 days per request (due to API limits).
    For larger ranges, make multiple requests and aggregate client-side.
    """,
    tags={"time-tracking", "aggregation", "reports"},
    meta={"version": "1.0", "requires_auth": True, "cached": True}
)
async def get_toggl_aggregated_data(
    start_date: Annotated[str, Field(
        description="Start date in ISO 8601 format (YYYY-MM-DD). Example: '2025-10-06'",
        pattern=r'^\d{4}-\d{2}-\d{2}$'
    )],
    end_date: Annotated[str, Field(
        description="End date in ISO 8601 format (YYYY-MM-DD). Must be >= start_date and <= start_date + 7 days. Example: '2025-10-13'",
        pattern=r'^\d{4}-\d{2}-\d{2}$'
    )],
    user_id: Annotated[str | None, Field(
        description="Optional: Filter to single user by ID. If omitted, returns all users. Example: '12345'",
        default=None
    )] = None
) -> dict:
    """
    Fetch and aggregate Toggl time entries.

    The response contains aggregated time data grouped by:
    1. User (email)
    2. For matched entries: (database, type, entity_id, description)
    3. For unmatched entries: description

    Example response:
        {
            "status": "success",
            "data": {
                "users": {
                    "user@example.com": {
                        "user_email": "user@example.com",
                        "matched_entities": [
                            {
                                "entity_database": "Scrum",
                                "entity_type": "Task",
                                "entity_id": "456",
                                "description": "Design UI",
                                "duration_hours": 7.5
                            }
                        ],
                        "unmatched_activities": [
                            {
                                "description": "Team meeting",
                                "duration_hours": 1.0
                            }
                        ],
                        "statistics": {
                            "total_duration_hours": 8.5,
                            "matched_duration_hours": 7.5,
                            "unmatched_duration_hours": 1.0
                        }
                    }
                },
                "statistics": {
                    "total_users": 1,
                    "total_duration_hours": 8.5,
                    "total_matched_duration_hours": 7.5,
                    "total_unmatched_duration_hours": 1.0
                }
            },
            "error": null
        }
    """
```
Output structure (same regardless of filters):
```json
{
  "status": "success" | "error",
  "data": {
    "users": {
      "email@domain.com": {
        "user_email": "email@domain.com",
        "matched_entities": [
          {
            "entity_database": "Scrum",
            "entity_type": "Task",
            "entity_id": "456",
            "description": "Design UI",
            "duration_hours": 7.665
          }
        ],
        "unmatched_activities": [
          {
            "description": "Team meeting",
            "duration_hours": 1.19
          }
        ],
        "statistics": {
          "total_duration_hours": 8.855,
          "matched_duration_hours": 7.665,
          "unmatched_duration_hours": 1.19
        }
      }
    },
    "statistics": {
      "total_users": 1,
      "total_duration_hours": 8.855,
      "total_matched_duration_hours": 7.665,
      "total_unmatched_duration_hours": 1.19
    }
  },
  "error": null | {"code": str, "message": str}
}
```
---
## 2. Toggl API Integration
### Authentication
HTTP Basic Auth, with the API token as the username and the literal string `api_token` as the password (base64 encoded). Credentials are loaded from a `.env` file:
```
TOGGL_API_TOKEN=<your_token>
TOGGL_WORKSPACE_ID=<workspace_id>
```
```python
from base64 import b64encode
import httpx
import os
from dotenv import load_dotenv

load_dotenv()

api_token = os.getenv("TOGGL_API_TOKEN")
workspace_id = os.getenv("TOGGL_WORKSPACE_ID")

credentials = b64encode(f"{api_token}:api_token".encode()).decode()
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Basic {credentials}"
}
```
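Before wiring up the tools, the credentials can be smoke-tested against Toggl's `me` endpoint. A throwaway sketch reusing the `headers` built above (the exact response fields are not guaranteed here):

```python
import asyncio
import httpx

async def verify_credentials() -> None:
    # A 200 from /api/v9/me confirms the token and Basic Auth header are valid.
    async with httpx.AsyncClient(headers=headers) as client:
        response = await client.get("https://api.track.toggl.com/api/v9/me")
        response.raise_for_status()
        print("Authenticated as:", response.json().get("email"))

if __name__ == "__main__":
    asyncio.run(verify_credentials())
```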
### Time Entries Fetch (Reports API v3)
**Endpoint**: `GET https://api.track.toggl.com/reports/api/v3/workspace/{workspace_id}/search/time_entries`
**Strategy**: Day-by-day pagination with page parameter
For each day in the date range:
```
start_date=2025-10-06
end_date=2025-10-06 (same day)
user_id=12301838 (optional, single user filter)
page=1 (increment for pagination)
```
**Response**: Array of time entries with pagination headers:
- `X-Next-ID`: Next group ID for pagination
- `X-Next-Row-Number`: Next row number for pagination
- `X-Is-Final`: Whether this is final page (false = more pages)
**Time Entry Structure**:
```json
{
  "id": 4132978931,
  "workspace_id": 1637944,
  "user_id": 12301838,
  "user_email": "user@example.com",
  "project_name": "ProjectA",
  "description": "Design UI #456 [Scrum] [Task]",
  "start": "2025-10-13T08:22:03+02:00",
  "stop": "2025-10-13T08:48:33+02:00",
  "duration": 1590
}
```
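If typed models are preferred over raw dicts, a Pydantic model mirroring the example payload above could live in `models.py`. Field names are taken from the example only; any extra fields the API returns are ignored:

```python
from pydantic import BaseModel, ConfigDict


class TimeEntry(BaseModel):
    # Field names mirror the example payload above; unknown fields are ignored.
    model_config = ConfigDict(extra="ignore")

    id: int
    workspace_id: int
    user_id: int
    user_email: str
    project_name: str | None = None
    description: str | None = None
    start: str
    stop: str | None = None
    duration: int  # seconds
```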
### Rate Limiting
Toggl: 3 requests per second
**Implementation**: Exponential backoff on 429 response
- Attempt 1: 60 seconds
- Attempt 2: 120 seconds (2 × 60)
- Attempt 3: 240 seconds (2 × 120)
- Max 3 retries, then fail
```python
import asyncio

# `client` is a shared httpx.AsyncClient; `headers` are the Basic Auth headers from above.
async def _fetch_with_backoff(url, params, max_retries=3):
    for attempt in range(max_retries):
        response = await client.get(url, params=params, headers=headers)
        if response.status_code == 429:
            # 60s, 120s, 240s for attempts 1-3
            wait_time = 60 * (2 ** attempt)
            await asyncio.sleep(wait_time)
            continue
        return response
    raise Exception("Max retries exceeded")
```
### Day-by-Day Pagination
Fetch each day separately to avoid pagination complexity:
```python
entries = []
for day in date_range(start_date, end_date):  # date_range helper defined below
    day_str = day.strftime("%Y-%m-%d")
    page = 1
    while True:
        params = {
            "start_date": day_str,
            "end_date": day_str,
            "page": page,
        }
        if user_id:
            params["user_id"] = user_id
        response = await _fetch_with_backoff(url, params)
        entries.extend(response.json())

        # Check if more pages for this day
        if not response.headers.get("X-Next-ID"):
            break  # Move to next day
        page += 1
```
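The `date_range` helper referenced in the loop is not a library function; a minimal implementation that yields every calendar day in the range, inclusive:

```python
from datetime import date, timedelta
from typing import Iterator


def date_range(start_date: str, end_date: str) -> Iterator[date]:
    # Yields each calendar day from start_date through end_date, inclusive.
    current = date.fromisoformat(start_date)
    end = date.fromisoformat(end_date)
    while current <= end:
        yield current
        current += timedelta(days=1)
```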
---
## 3. Description Parsing
**Pattern**: Extract Fibery entity references from Toggl descriptions
Format: `"Text #ID [DATABASE] [TYPE] [PROJECT]"`
Examples:
- `"Design UI #456 [Scrum] [Task] [Moneyball]"` → matched
- `"Team meeting"` → unmatched
- `"#123"` → matched (minimal)
**Rules**:
- Use **rightmost** `#ID` if multiple exist
- Extract first 3 bracket values as: `[DATABASE]`, `[TYPE]`, `[PROJECT]`
- Everything before `#ID` is description_clean
**Implementation**:
```python
import re

# ParsedEntry is a Pydantic model defined in models.py (see section 9).

ENTITY_PATTERN = re.compile(r'#(\d+)')
METADATA_PATTERN = re.compile(r'\[([^\]]+)\]')


def parse_description(description: str) -> ParsedEntry:
    matches = list(ENTITY_PATTERN.finditer(description))
    if not matches:
        return ParsedEntry(description_clean=description, is_matched=False)

    match = matches[-1]  # Rightmost #ID wins
    entity_id = match.group(1)
    description_clean = description[:match.start()].strip()
    text_after_id = description[match.end():]
    brackets = [m.group(1) for m in METADATA_PATTERN.finditer(text_after_id)]

    return ParsedEntry(
        description_clean=description_clean,
        entity_id=entity_id,
        entity_database=brackets[0] if len(brackets) > 0 else None,
        entity_type=brackets[1] if len(brackets) > 1 else None,
        project=brackets[2] if len(brackets) > 2 else None,
        is_matched=True
    )
```
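Applied to the examples from the rules above (attribute names follow the `ParsedEntry` model used in the snippet):

```python
entry = parse_description("Design UI #456 [Scrum] [Task] [Moneyball]")
assert entry.is_matched
assert entry.entity_id == "456"
assert entry.entity_database == "Scrum"
assert entry.entity_type == "Task"
assert entry.project == "Moneyball"
assert entry.description_clean == "Design UI"

assert parse_description("Team meeting").is_matched is False
assert parse_description("#123").entity_id == "123"  # minimal match, no brackets
```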
---
## 4. Data Aggregation
**Group by**:
1. User (email)
2. For matched: (database, type, entity_id, description) - sum duration
3. For unmatched: description - sum duration
**Calculate**: duration_hours (duration_seconds / 3600)
**Example input** (4 entries):
```
Entry 1: user=alice@, desc="Task #123 [Scrum] [Task]", duration=3600 (1 hour)
Entry 2: user=alice@, desc="Task #123 [Scrum] [Task]", duration=1800 (0.5 hour)
Entry 3: user=alice@, desc="Another task #123 [Scrum] [Task]", duration=1800 (0.5 hour)
Entry 4: user=alice@, desc="Lunch", duration=3600 (1 hour)
```
**Example output**:
```json
{
  "users": {
    "alice@email.com": {
      "user_email": "alice@email.com",
      "matched_entities": [
        {
          "entity_database": "Scrum",
          "entity_type": "Task",
          "entity_id": "123",
          "description": "Task",
          "duration_hours": 1.5
        },
        {
          "entity_database": "Scrum",
          "entity_type": "Task",
          "entity_id": "123",
          "description": "Another task",
          "duration_hours": 0.5
        }
      ],
      "unmatched_activities": [
        {
          "description": "Lunch",
          "duration_hours": 1.0
        }
      ],
      "statistics": {
        "total_duration_hours": 3.0,
        "matched_duration_hours": 2.0,
        "unmatched_duration_hours": 1.0
      }
    }
  }
}
```
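A minimal `AggregatorService.aggregate` sketch consistent with the grouping rules and the output schema above. The method signature matches its usage in section 8; the `_`-prefixed helper keys are illustrative only:

```python
from collections import defaultdict


class AggregatorService:
    def aggregate(self, parsed_entries: list, time_entries: list[dict]) -> dict:
        # Phase 1: accumulate hours per user, keyed by entity tuple or plain description
        users: dict[str, dict] = {}
        for parsed, raw in zip(parsed_entries, time_entries):
            user = users.setdefault(raw["user_email"], {
                "_matched": defaultdict(float),    # key: (db, type, entity_id, description)
                "_unmatched": defaultdict(float),  # key: description
            })
            hours = raw["duration"] / 3600
            if parsed.is_matched:
                key = (parsed.entity_database, parsed.entity_type,
                       parsed.entity_id, parsed.description_clean)
                user["_matched"][key] += hours
            else:
                user["_unmatched"][parsed.description_clean] += hours

        # Phase 2: flatten into the documented response shape
        result_users: dict[str, dict] = {}
        totals = {"total_users": 0, "total_duration_hours": 0.0,
                  "total_matched_duration_hours": 0.0, "total_unmatched_duration_hours": 0.0}
        for email, u in users.items():
            matched = [
                {"entity_database": db, "entity_type": et, "entity_id": eid,
                 "description": desc, "duration_hours": hrs}
                for (db, et, eid, desc), hrs in u["_matched"].items()
            ]
            unmatched = [
                {"description": desc, "duration_hours": hrs}
                for desc, hrs in u["_unmatched"].items()
            ]
            m = sum(e["duration_hours"] for e in matched)
            un = sum(e["duration_hours"] for e in unmatched)
            result_users[email] = {
                "user_email": email,
                "matched_entities": matched,
                "unmatched_activities": unmatched,
                "statistics": {
                    "total_duration_hours": m + un,
                    "matched_duration_hours": m,
                    "unmatched_duration_hours": un,
                },
            }
            totals["total_users"] += 1
            totals["total_duration_hours"] += m + un
            totals["total_matched_duration_hours"] += m
            totals["total_unmatched_duration_hours"] += un

        return {"users": result_users, "statistics": totals}
```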
---
## 5. Caching
**Cache Key**: MD5 hash of dates + optional user_id
```python
import hashlib


def get_cache_key(start_date, end_date, user_id=None):
    filter_str = f":{user_id}" if user_id else ""
    key = f"{start_date}:{end_date}{filter_str}"
    return hashlib.md5(key.encode()).hexdigest()
```
**Storage**:
- Cache directory: `cache/`
- SQLite index: `cache/index.db`
- JSON files: `cache/data/{uuid}.json`
**SQLite Schema**:
```sql
CREATE TABLE cache (
    id INTEGER PRIMARY KEY,
    hash TEXT NOT NULL UNIQUE,
    filename TEXT NOT NULL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
```
**TTL**: 1 hour
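A sketch of the lookup path under these assumptions: the SQLite index maps hash to filename and `created_at` drives the TTL. The standalone `cache_get` name is illustrative; in the real `CacheService` this would be a method.

```python
import json
import sqlite3
from datetime import datetime, timedelta, timezone
from pathlib import Path


def cache_get(cache_dir: Path, start_date: str, end_date: str,
              user_id: str | None = None, ttl_hours: int = 1) -> dict | None:
    # Look up the hash in the SQLite index and return the JSON payload if still fresh.
    key_hash = get_cache_key(start_date, end_date, user_id)
    with sqlite3.connect(cache_dir / "index.db") as conn:
        row = conn.execute(
            "SELECT filename, created_at FROM cache WHERE hash = ?", (key_hash,)
        ).fetchone()
    if row is None:
        return None

    filename, created_at = row
    created = datetime.fromisoformat(created_at)
    if created.tzinfo is None:
        created = created.replace(tzinfo=timezone.utc)  # SQLite CURRENT_TIMESTAMP is UTC
    if datetime.now(timezone.utc) - created > timedelta(hours=ttl_hours):
        return None  # expired; caller refetches and overwrites

    return json.loads((cache_dir / "data" / filename).read_text())
```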
---
## 6. Validation & Errors
### Date Range Validation
```python
from datetime import datetime


def validate_date_range(start_date: str, end_date: str) -> dict | None:
    try:
        start = datetime.fromisoformat(start_date).date()
        end = datetime.fromisoformat(end_date).date()
    except ValueError:
        return {"code": "INVALID_DATE_FORMAT", "message": "ISO 8601 format required"}

    if end < start:
        return {"code": "INVALID_DATE_RANGE", "message": "end_date must be >= start_date"}
    if (end - start).days > 7:
        return {"code": "DATE_RANGE_EXCEEDS_LIMIT", "message": "Max 7 days per request"}
    return None
```
### Error Codes
- `INVALID_DATE_FORMAT` — dates not ISO 8601
- `INVALID_DATE_RANGE` — end < start
- `DATE_RANGE_EXCEEDS_LIMIT` — more than 7 days
- `USER_NOT_FOUND` — requested user ID not found in the workspace
- `API_ERROR` — Toggl API error
- `INTERNAL_ERROR` — server error
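Both tools return the same envelope on failure, so a tiny helper (name illustrative) keeps the error shape consistent:

```python
def error_response(code: str, message: str) -> dict:
    # Uniform envelope used by both tools on failure.
    return {"status": "error", "data": None, "error": {"code": code, "message": message}}

# Example:
# return error_response("DATE_RANGE_EXCEEDS_LIMIT", "Max 7 days per request")
```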
---
## 7. Pipeline (get_toggl_aggregated_data)
1. **Validate** date range (format, range <= 7 days)
2. **Cache Check** → if hit, return
3. **Fetch Time Entries** (day-by-day, with backoff on 429, filter by user_id if provided)
4. **Parse Descriptions** (extract #ID and [brackets])
5. **Aggregate** (group by user → entity → description)
6. **Cache Save** (JSON + SQLite)
7. **Return** aggregated data
---
## 8. FastMCP Integration
```python
import os

from dotenv import load_dotenv
from fastmcp import FastMCP
from typing import Annotated
from pydantic import Field

# CacheService, TogglService, ParserService, AggregatorService live in services/ (see section 9).

# Create server instance
mcp = FastMCP(
    name="Toggl MCP Server",
    instructions="Time tracking data fetcher and aggregator using Toggl API"
)

# Load credentials (see sections 2 and 10)
load_dotenv()
api_token = os.getenv("TOGGL_API_TOKEN")
workspace_id = os.getenv("TOGGL_WORKSPACE_ID")

# Initialize services at startup
cache_service = CacheService(cache_dir="./cache", ttl_hours=1)
toggl_service = TogglService(api_token, workspace_id)
parser_service = ParserService()
aggregator_service = AggregatorService()
# Tool 1: Get Workspace Users
@mcp.tool(
    name="get_workspace_users",
    description="""
    Retrieve a list of all users in the Toggl workspace.

    Use this tool when you need to:
    - Discover available users for filtering
    - Get user IDs and emails for aggregation
    - Build user lists for batch operations

    Returns array of user objects with id, email, and name.
    """,
    tags={"users", "workspace", "reference"},
    meta={"version": "1.0", "requires_auth": True}
)
async def get_workspace_users() -> dict:
    """List workspace users"""
    try:
        users = await toggl_service.get_workspace_users()
        return {
            "status": "success",
            "data": users,
            "error": None
        }
    except Exception as e:
        return {
            "status": "error",
            "data": None,
            "error": {"code": "API_ERROR", "message": str(e)}
        }
# Tool 2: Get Toggl Aggregated Data
@mcp.tool(
    name="get_toggl_aggregated_data",
    description="""
    Fetch and aggregate Toggl time tracking data for a given date range.

    Use this tool when you need to:
    - Get time entry summaries for a specific period
    - Analyze matched Fibery entity references (#ID [DB] [TYPE])
    - Calculate total hours by user and project
    - Generate time tracking reports

    The tool automatically:
    - Parses descriptions to extract Fibery entity references
    - Groups entries by user and entity
    - Aggregates durations and calculates hours
    - Caches results for 1 hour

    Max date range: 7 days per request (due to API limits).
    For larger ranges, make multiple requests and aggregate client-side.
    """,
    tags={"time-tracking", "aggregation", "reports"},
    meta={"version": "1.0", "requires_auth": True, "cached": True}
)
async def get_toggl_aggregated_data(
    start_date: Annotated[str, Field(
        description="Start date in ISO 8601 format (YYYY-MM-DD). Example: '2025-10-06'",
        pattern=r'^\d{4}-\d{2}-\d{2}$'
    )],
    end_date: Annotated[str, Field(
        description="End date in ISO 8601 format (YYYY-MM-DD). Must be >= start_date and <= start_date + 7 days. Example: '2025-10-13'",
        pattern=r'^\d{4}-\d{2}-\d{2}$'
    )],
    user_id: Annotated[str | None, Field(
        description="Optional: Filter to single user by ID. If omitted, returns all users. Example: '12345'",
        default=None
    )] = None
) -> dict:
    """Fetch and aggregate Toggl data for all users or a single user"""
    try:
        # 1. Validate dates
        validation_error = validate_date_range(start_date, end_date)
        if validation_error:
            return {
                "status": "error",
                "data": None,
                "error": validation_error
            }

        # 2. Try cache
        cached_data = cache_service.get(start_date, end_date, user_id)
        if cached_data:
            return {
                "status": "success",
                "data": cached_data,
                "error": None,
                "metadata": {"source": "cache"}
            }

        # 3. Fetch time entries
        time_entries = await toggl_service.get_time_entries(
            start_date, end_date, user_id=user_id
        )

        # 4. Parse descriptions (entries may have no description)
        parsed_entries = [
            parser_service.parse_description(e.get("description") or "")
            for e in time_entries
        ]

        # 5. Aggregate
        aggregated = aggregator_service.aggregate(parsed_entries, time_entries)

        # 6. Build response
        response_data = {
            "users": aggregated["users"],
            "statistics": aggregated["statistics"]
        }

        # 7. Cache
        cache_service.set(start_date, end_date, response_data, user_id)

        return {
            "status": "success",
            "data": response_data,
            "error": None,
            "metadata": {
                "source": "api",
                "entries_fetched": len(time_entries)
            }
        }
    except Exception as e:
        return {
            "status": "error",
            "data": None,
            "error": {
                "code": "INTERNAL_ERROR",
                "message": str(e)
            }
        }

# Server entry point
if __name__ == "__main__":
    mcp.run()
```
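For a quick local test, the server can be exercised with the FastMCP client. The `Client` usage below is an assumption; confirm the exact client API against the FastMCP reference listed at the top of this spec.

```python
# Rough local test sketch; the FastMCP `Client` API shown here is an assumption.
import asyncio
from fastmcp import Client

async def main() -> None:
    async with Client("src/toggl_mcp/server.py") as client:
        result = await client.call_tool(
            "get_toggl_aggregated_data",
            {"start_date": "2025-10-06", "end_date": "2025-10-12"},
        )
        print(result)

if __name__ == "__main__":
    asyncio.run(main())
```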
---
## 9. Project Structure
```
src/toggl_mcp/
├── server.py # FastMCP server + tools
├── services/
│ ├── cache_service.py # SQLite + JSON caching
│ ├── toggl_service.py # Toggl API client (get_workspace_users, get_time_entries)
│ ├── parser_service.py # Description parsing (#ID [brackets])
│ └── aggregator_service.py # Grouping and statistics
├── models.py # Pydantic models (ParsedEntry, etc.)
├── config.py # Config from env vars
└── utils.py # Date validation, logging
cache/
├── data/ # JSON cache files
└── index.db # SQLite index
tests/
├── test_parser_service.py
├── test_aggregator_service.py
└── test_toggl_service.py
```
---
## 10. Configuration
Create `.env` file in project root:
```
TOGGL_API_TOKEN=<your_token>
TOGGL_WORKSPACE_ID=<workspace_id>
```
Cache directory is hardcoded to: `./cache`
Cache TTL is hardcoded to: 1 hour
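A minimal `config.py` consistent with these settings, using plain `os.getenv` (the `Config` dataclass shape is just one option):

```python
import os
from dataclasses import dataclass

from dotenv import load_dotenv

load_dotenv()


@dataclass(frozen=True)
class Config:
    api_token: str = os.getenv("TOGGL_API_TOKEN", "")
    workspace_id: str = os.getenv("TOGGL_WORKSPACE_ID", "")
    cache_dir: str = "./cache"   # hardcoded per this section
    cache_ttl_hours: int = 1     # hardcoded per this section


config = Config()
```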
---
## 11. Implementation Checklist
- [ ] Setup FastMCP server structure
- [ ] Implement CacheService (MD5 hash, SQLite, JSON)
- [ ] Implement ParserService (regex #ID + brackets)
- [ ] Implement AggregatorService (grouping, statistics)
- [ ] Implement TogglService (API + backoff, user_id filtering)
- [ ] Date range validation (≤ 7 days)
- [ ] Error handling with codes
- [ ] Tool 1: get_workspace_users()
- [ ] Tool 2: get_toggl_aggregated_data(start_date, end_date, user_id=None)
- [ ] Unit tests
- [ ] Integration test