# Toggl Data Pipeline - Detailed Architecture & Implementation
**Date**: 2025-10-16
**Purpose**: Comprehensive documentation of how Toggl data is fetched, parsed, and aggregated in volt-agent
---
## Table of Contents
1. [Overview](#overview)
2. [Stage 1: Fetching from Toggl API](#stage-1-fetching-from-toggl-api)
3. [Stage 2: Parsing Descriptions](#stage-2-parsing-descriptions)
4. [Stage 3: Aggregating by User](#stage-3-aggregating-by-user)
5. [Output Structures](#output-structures)
6. [Code Examples](#code-examples)
7. [Performance & Scaling](#performance--scaling)
---
## Overview
The Toggl pipeline consists of three distinct stages that transform raw time tracking data into a structured format that can be matched with Fibery entities:
```
┌─────────────────────────────────────────────────────────────────┐
│ INPUT: Start Date, End Date, (Optional) User Email Filter │
└──────────────────────┬──────────────────────────────────────────┘
│
▼
┌──────────────────────────────┐
│ STAGE 1: Fetch from Toggl │ (Activity: fetch_toggl_data)
│ - Get all time entries │
│ - Day-by-day pagination │
│ - Backoff/retry on 429 │
│ - Returns: raw_toggl_data.json
└──────────────────┬───────────┘
│
▼
┌──────────────────────────────┐
│ STAGE 2: Parse Descriptions │ (Part of aggregate_toggl_data)
│ - Extract Fibery references │
│ - Pattern: #ID [DB] [TYPE] │
│ - Clean descriptions │
│ - Classify matched/unmatched │
└──────────────────┬───────────┘
│
▼
┌──────────────────────────────┐
│ STAGE 3: Aggregate by User │ (Activity: aggregate_toggl_data)
│ - Group by user │
│ - Group matched by entity │
│ - Calculate statistics │
│ - Returns: toggl_aggregated.json
└──────────────────┬───────────┘
│
▼
┌───────────────────────────────────────────────────────┐
│ OUTPUT: toggl_aggregated.json │
│ Ready for STAGE 2: Fibery Enrichment │
└───────────────────────────────────────────────────────┘
```
---
## STAGE 1: Fetching from Toggl API
### 1.1 Entry Point: `fetch_toggl_data()`
**File**: `src/activities/toggl_activities.py`
```python
@activity.defn(name="fetch_toggl_data")
async def fetch_toggl_data(
run_id: str,
start_date: str, # "2025-10-06"
end_date: str, # "2025-10-13"
user_emails: list[str] = None # ["user@email.com", ...]
) -> dict:
"""
Fetches all Toggl time entries for the given date range.
Process:
1. Initialize TogglClient
2. Fetch all users from Toggl workspace
3. Filter users by email (if provided)
4. Fetch time entries for filtered users
5. Save raw data to storage
"""
```
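The body is elided above. A condensed sketch of the flow described in the docstring could look like the following; the function name and the `storage.save_raw_toggl_data` helper are illustrative (only `storage.load_raw_toggl_data` appears later in this document), while `TogglClient` and `WORKSPACE_ID` are described in the next subsection:
```python
# Condensed sketch of the activity body (illustrative names, not production code)
async def _fetch_toggl_data_impl(run_id, start_date, end_date, user_emails=None):
    client = TogglClient()  # see src/toggl/client.py

    # 1-2. Fetch all workspace users, optionally filtered by email
    users = client.get_workspace_users(WORKSPACE_ID)
    if user_emails:
        wanted = set(user_emails)
        users = [u for u in users if u["email"] in wanted]

    # 3-4. Fetch time entries for the selected users (day-by-day inside the client)
    entries = client.get_time_entries(
        start_date=start_date,
        end_date=end_date,
        user_ids=[u["id"] for u in users],
    )

    # 5. Persist the raw payload for the aggregation activity
    storage.save_raw_toggl_data(run_id, {  # hypothetical counterpart of load_raw_toggl_data
        "run_id": run_id,
        "start_date": start_date,
        "end_date": end_date,
        "user_emails_filter": user_emails,
        "time_entries": entries,
    })
    return {"run_id": run_id, "entries_fetched": len(entries)}
```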
### 1.2 TogglClient: API Communication
**File**: `src/toggl/client.py`
#### Configuration
```python
TOGGL_API_BASE = "https://api.track.toggl.com/reports/api/v3"
TOGGL_API_TOKEN = os.environ.get("TOGGL_API_TOKEN")
WORKSPACE_ID = os.environ.get("TOGGL_WORKSPACE_ID")
```
#### Key Methods
**`get_workspace_users(workspace_id)`**
- Fetches all users in workspace
- Returns: `List[{id, name, email}]`
- Used to map user IDs to emails
**`get_time_entries(start_date, end_date, user_ids=None)`**
- Fetches time entries for date range
- **Day-by-day chunking** to avoid pagination limits
- Returns: List of time entries grouped by user/project/description
### 1.3 Day-by-Day Pagination Strategy
Why day-by-day?
- Toggl Reports API paginates within a day (50 entries per page)
- Multi-day requests can have thousands of entries → difficult to paginate
- **Solution**: Fetch each day independently, concatenate results
```python
# Pseudocode for day-by-day pagination
entries = []
for each_day in date_range:          # one day at a time, start_date..end_date
    page_num = 1
    while True:
        response = toggl_api.get_time_entries(
            workspace_id=WORKSPACE_ID,
            start_date=each_day,
            end_date=each_day,
            page=page_num,
            user_ids=user_ids,
        )
        entries.extend(response.data)
        if not response.has_more:
            break        # done with this day, move on to the next one
        page_num += 1    # continue pagination within the same day
```
#### Pagination Headers (Grouped Response)
Toggl API response when `grouped=true`:
```
Response Headers:
├── X-Next-ID: "5321857" (next group ID to fetch)
├── X-Next-Row-Number: "42" (next row in group)
└── X-Is-Final: false (more data available)
Response Body (grouped entries):
[
{
"id": 4132978931,
"user_id": 12301838,
"user_email": "aleksandr.pylaev@wearevolt.com",
"project_name": "ProjectA",
"description": "Task #123 [Scrum] [Feature]",
"start": "2025-10-13T08:22:03+02:00",
"stop": "2025-10-13T08:48:33+02:00",
"duration": 1590, // seconds
"time_entries": [
// Individual entries within this group
]
}
]
```
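A minimal sketch of following these headers; the request-side continuation fields (`first_id`, `first_row_number`) are assumptions here, since only the response headers are documented above:
```python
import requests

def fetch_grouped_report(url: str, payload: dict, headers: dict) -> list:
    """Follow X-Next-ID / X-Next-Row-Number until X-Is-Final signals the last page."""
    groups = []
    while True:
        response = requests.post(url, json=payload, headers=headers)
        response.raise_for_status()
        groups.extend(response.json())

        # Stop when the API marks this response as the final page
        if response.headers.get("X-Is-Final", "true").lower() == "true":
            return groups

        # Assumed continuation fields echoed back in the next request body
        payload = {
            **payload,
            "first_id": int(response.headers["X-Next-ID"]),
            "first_row_number": int(response.headers["X-Next-Row-Number"]),
        }
```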
### 1.4 Rate Limiting & Retry Logic
Toggl API rate limit: **3 requests per second**
Implementation:
```python
def _fetch_with_backoff(self, url, method="GET", max_retries=3):
    """
    Fetches with exponential backoff on 429 (rate limit).

    Strategy:
    1. First attempt: immediate
    2. If 429 received:
       - Retry 1: wait 60 seconds
       - Retry 2: wait 120 seconds (2 * 60)
       - Retry 3: wait 240 seconds (2 * 120)
    3. After max_retries: raise exception
    """
    for attempt in range(max_retries):
        try:
            response = requests.request(method, url, headers=auth_headers)
            if response.status_code == 429:
                wait_time = 60 * (2 ** attempt)  # Exponential backoff: 60s, 120s, 240s
                activity.logger.warning(f"Rate limited. Waiting {wait_time}s...")
                time.sleep(wait_time)
                continue
            return response
        except Exception:
            if attempt == max_retries - 1:
                raise  # Give up after the last attempt

    # All attempts were rate limited: surface the failure instead of returning None
    raise RuntimeError(f"Toggl API still rate limited after {max_retries} attempts: {url}")
```
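Because the documented limit is 3 requests per second, a simple client-side throttle can avoid most 429s before the backoff ever triggers. A sketch (not the production implementation):
```python
import time

class RateLimiter:
    """Enforce a minimum interval between requests (~3 req/sec)."""

    def __init__(self, requests_per_second: float = 3.0):
        self.min_interval = 1.0 / requests_per_second
        self._last_call = 0.0

    def wait(self) -> None:
        # Sleep just long enough to keep the average rate under the limit
        elapsed = time.monotonic() - self._last_call
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self._last_call = time.monotonic()

# Usage inside the client, before every API call:
# limiter = RateLimiter()
# limiter.wait()
# response = self._fetch_with_backoff(url)
```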
### 1.5 Raw Toggl Data Output
**File**: `tmp/runs/{run_id}/raw_toggl_data.json`
```json
{
"run_id": "run_2025-10-14-09-27-18",
"start_date": "2025-10-13",
"end_date": "2025-10-13",
"user_emails_filter": null,
"fetched_at": "2025-10-14T09:27:18.877898Z",
"time_entries": [
{
"id": 4132978931,
"workspace_id": 1637944,
"user_id": 12301838,
"username": "Aleksandr Pylaev",
"user_email": "aleksandr.pylaev@wearevolt.com",
"description": "Design user interface #456 [Product] [Design]",
"start": "2025-10-13T08:22:03+02:00",
"stop": "2025-10-13T08:48:33+02:00",
"duration": 1590,
"tags": ["design", "ui"],
"project_id": 189340,
"project_name": "ProjectName",
"billable": false
},
// ... more entries
],
"statistics": {
"total_entries": 50,
"total_duration_seconds": 331691,
"unique_users": 13
}
}
```
---
## STAGE 2: Parsing Descriptions
### 2.1 Overview
Each Toggl time entry has a free-text description that may contain a Fibery entity reference and optional metadata:
```
Format: "<Description> #<ENTITY_ID> [<DATABASE>] [<TYPE>] [<PROJECT>]"
Examples:
✅ "Design user interface #456 [Scrum] [Task] [Moneyball]"
→ Matched to Scrum/Task with ID 456
✅ "Review code #789"
→ Partially matched (no database/type, will need fallback)
✅ "Team meeting"
→ Unmatched (no entity reference)
✅ "#123"
→ Matched but minimal metadata
```
### 2.2 FiberyParser Implementation
**File**: `src/parser/fibery_parser.py`
```python
class FiberyParser:
"""Parses Toggl descriptions to extract Fibery entity references."""
ENTITY_PATTERN = re.compile(r'#(\d+)') # Rightmost #ID
METADATA_PATTERN = re.compile(r'\[([^\]]+)\]') # Bracket tags
def parse_description(self, description: str) -> dict:
"""
Extracts entity metadata from description.
Returns:
{
'description_clean': str, # Text before entity reference
'entity_id': str | None, # "456"
'entity_database': str | None, # "Scrum"
'entity_type': str | None, # "Task"
'project': str | None, # "Moneyball"
'is_matched': bool # True if entity_id found
}
"""
```
#### Parsing Logic (Step-by-Step)
**Step 1: Find Entity ID**
```python
# Search for rightmost #<number>
matches = list(ENTITY_PATTERN.finditer(description))
if not matches:
return {"is_matched": False, "description_clean": description}
# Use LAST match (rightmost)
match = matches[-1]
entity_id = match.group(1) # "456"
description_before_id = description[:match.start()].strip()
```
**Step 2: Extract Metadata Brackets**
```python
# Find all [xxx] tags after the entity ID
text_after_id = description[match.end():]
bracket_matches = list(METADATA_PATTERN.finditer(text_after_id))
# Extract bracket contents
brackets = [m.group(1) for m in bracket_matches]
# ["Scrum", "Task", "Moneyball"]
# Map to fields (default to None so Step 3 never references undefined names)
entity_database = brackets[0] if len(brackets) >= 1 else None  # "Scrum"
entity_type = brackets[1] if len(brackets) >= 2 else None      # "Task"
project = brackets[2] if len(brackets) >= 3 else None          # "Moneyball"
```
**Step 3: Return Parsed Result**
```python
return {
'description_clean': description_before_id,
'entity_id': entity_id,
'entity_database': entity_database,
'entity_type': entity_type,
'project': project,
'is_matched': True
}
```
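Putting the three steps together, a minimal self-contained version of `parse_description` looks like this (a sketch that mirrors the steps above, not a copy of the production code in `src/parser/fibery_parser.py`):
```python
import re

ENTITY_PATTERN = re.compile(r'#(\d+)')          # Matches #<number>
METADATA_PATTERN = re.compile(r'\[([^\]]+)\]')  # Matches [tag] blocks

def parse_description(description: str) -> dict:
    """Combine Steps 1-3 into one function."""
    matches = list(ENTITY_PATTERN.finditer(description))
    if not matches:
        # No #<number> found -> unmatched entry
        return {
            'description_clean': description,
            'entity_id': None,
            'entity_database': None,
            'entity_type': None,
            'project': None,
            'is_matched': False,
        }

    match = matches[-1]  # Rightmost #ID wins
    brackets = METADATA_PATTERN.findall(description[match.end():])

    return {
        'description_clean': description[:match.start()].strip(),
        'entity_id': match.group(1),
        'entity_database': brackets[0] if len(brackets) >= 1 else None,
        'entity_type': brackets[1] if len(brackets) >= 2 else None,
        'project': brackets[2] if len(brackets) >= 3 else None,
        'is_matched': True,
    }
```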
### 2.3 Parser Edge Cases
| Description | Result | Notes |
|-------------|--------|-------|
| `"Work #123 #456 [Scrum]"` | Uses `#456` | Rightmost wins |
| `"Design #123 [Scrum] [Task] [P1] [P2]"` | DB=Scrum, Type=Task, Project=P1 | Takes first 3 brackets |
| `"#123"` | Matched, no metadata | Valid but incomplete |
| `"Issue 123"` | Unmatched | No # sign |
| `"#abc"` | Unmatched | Not a number |
| `"Task\n#456\n[Scrum]"` | Matched | Regex handles newlines |
---
## STAGE 3: Aggregating by User
### 3.1 Aggregation Logic
**File**: `src/activities/toggl_activities.py` → `aggregate_toggl_data(run_id)`
#### Input
- `raw_toggl_data.json`: List of ~50-100 raw time entries
#### Process
**Step 1: Load and Parse All Entries**
```python
raw_entries = storage.load_raw_toggl_data(run_id)
parsed_entries = []
for entry in raw_entries["time_entries"]:
parsed = parser.parse_description(entry["description"])
parsed_entries.append({
**entry,
**parsed # Add is_matched, entity_id, etc.
})
```
**Step 2: Group by User**
```python
entries_by_user = defaultdict(list)
for entry in parsed_entries:
user_email = entry["user_email"]
entries_by_user[user_email].append(entry)
```
**Step 3: For Each User, Group Matched Entries**
```python
for user_email, user_entries in entries_by_user.items():
# Separate matched from unmatched
matched = [e for e in user_entries if e["is_matched"]]
unmatched = [e for e in user_entries if not e["is_matched"]]
# Group matched by (database, type, entity_id)
matched_grouped = defaultdict(list)
for entry in matched:
key = (
entry["entity_database"],
entry["entity_type"],
entry["entity_id"]
)
matched_grouped[key].append(entry)
# Within each entity, group by description
matched_entities = []
for (db, typ, eid), entries_for_entity in matched_grouped.items():
# Group by description
by_desc = defaultdict(list)
for e in entries_for_entity:
by_desc[e["description_clean"]].append(e)
# Create aggregated entries
entity_data = {
"entity_database": db,
"entity_type": typ,
"entity_id": eid,
"project": entries_for_entity[0].get("project"),
"duration_seconds": sum(e["duration"] for e in entries_for_entity),
"entries_count": len(entries_for_entity),
"entries": [
{
"description": desc,
"duration_seconds": sum(e["duration"] for e in desc_entries),
"duration_hours": sum(e["duration"] for e in desc_entries) / 3600,
"entry_count": len(desc_entries)
}
for desc, desc_entries in by_desc.items()
]
}
matched_entities.append(entity_data)
# Group unmatched by description only
unmatched_grouped = defaultdict(list)
for entry in unmatched:
unmatched_grouped[entry["description_clean"]].append(entry)
unmatched_activities = [
{
"description": desc,
"duration_seconds": sum(e["duration"] for e in entries),
"duration_hours": sum(e["duration"] for e in entries) / 3600,
"entries_count": len(entries)
}
for desc, entries in unmatched_grouped.items()
]
```
**Step 4: Calculate User Statistics**
```python
user_data = {
"user_email": user_email,
"matched_entities": matched_entities,
"unmatched_activities": unmatched_activities,
"statistics": {
"total_duration_seconds": sum(e["duration"] for e in user_entries),
"matched_duration_seconds": sum(e["duration"] for e in matched),
"unmatched_duration_seconds": sum(e["duration"] for e in unmatched),
"total_entries": len(user_entries),
"matched_entries": len(matched),
"unmatched_entries": len(unmatched)
}
}
```
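The per-user dictionaries are finally rolled up into the global `statistics` block shown in the output below; a minimal sketch of that rollup (the function name is illustrative):
```python
def build_global_statistics(users_data: dict) -> dict:
    """Roll per-user statistics (Step 4) up into the top-level statistics block."""
    user_stats = [u["statistics"] for u in users_data.values()]
    return {
        "total_users": len(users_data),
        "total_matched_entities": sum(len(u["matched_entities"]) for u in users_data.values()),
        "total_unmatched_activities": sum(len(u["unmatched_activities"]) for u in users_data.values()),
        "total_duration_seconds": sum(s["total_duration_seconds"] for s in user_stats),
        "total_matched_duration_seconds": sum(s["matched_duration_seconds"] for s in user_stats),
        "total_unmatched_duration_seconds": sum(s["unmatched_duration_seconds"] for s in user_stats),
    }
```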
### 3.2 Final Aggregated Output
**File**: `tmp/runs/{run_id}/toggl_aggregated.json`
```json
{
"run_id": "run_2025-10-14-09-27-18",
"aggregated_at": "2025-10-14T09:27:18.901445Z",
"start_date": "2025-10-13",
"end_date": "2025-10-13",
"users": {
"aleksandr.pylaev@wearevolt.com": {
"user_email": "aleksandr.pylaev@wearevolt.com",
"matched_entities": [
{
"entity_database": "Scrum",
"entity_type": "Task",
"entity_id": "456",
"project": "Moneyball",
"duration_seconds": 27594,
"entries_count": 2,
"entries": [
{
"description": "Design user interface",
"duration_seconds": 27594,
"duration_hours": 7.665,
"entry_count": 2
}
]
}
],
"unmatched_activities": [
{
"description": "Team meeting",
"duration_seconds": 4284,
"duration_hours": 1.19,
"entries_count": 1
}
],
"statistics": {
"total_duration_seconds": 31878,
"matched_duration_seconds": 27594,
"unmatched_duration_seconds": 4284,
"total_entries": 3,
"matched_entries": 2,
"unmatched_entries": 1
}
},
// ... more users
},
"statistics": {
"total_users": 13,
"total_matched_entities": 19,
"total_unmatched_activities": 15,
"total_duration_seconds": 331878,
"total_matched_duration_seconds": 299594,
"total_unmatched_duration_seconds": 32284
}
}
```
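For reference, the shape of this file can be written down as `TypedDict`s; these are a documentation aid only and are not assumed to exist in the codebase:
```python
from typing import Optional, TypedDict

class EntryGroup(TypedDict):
    description: str
    duration_seconds: int
    duration_hours: float
    entry_count: int

class MatchedEntity(TypedDict):
    entity_database: Optional[str]
    entity_type: Optional[str]
    entity_id: str
    project: Optional[str]
    duration_seconds: int
    entries_count: int
    entries: list[EntryGroup]

class UnmatchedActivity(TypedDict):
    description: str
    duration_seconds: int
    duration_hours: float
    entries_count: int

class UserStatistics(TypedDict):
    total_duration_seconds: int
    matched_duration_seconds: int
    unmatched_duration_seconds: int
    total_entries: int
    matched_entries: int
    unmatched_entries: int

class UserData(TypedDict):
    user_email: str
    matched_entities: list[MatchedEntity]
    unmatched_activities: list[UnmatchedActivity]
    statistics: UserStatistics
```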
---
## Output Structures
### Raw Toggl Data (`raw_toggl_data.json`)
- **Time Entry** (individual record from Toggl API)
- **Statistics** (count and total duration)
### Aggregated Toggl Data (`toggl_aggregated.json`)
- **Per-User Data**
- Matched Entities (grouped by database/type/entity_id)
- Unmatched Activities (grouped by description)
- Statistics (user totals)
- **Global Statistics** (team totals)
### Data Flow to Next Stages
```
toggl_aggregated.json
↓
[STAGE 2: Fibery Enrichment]
- Loads toggl_aggregated.json
- For each matched entity, queries Fibery GraphQL
- Combines with enriched Fibery data
- Outputs: enriched_data.json
↓
[STAGE 3: Markdown Reports]
- Uses enriched_data + toggl_aggregated
- Generates individual and team reports
```
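A minimal sketch of how the enrichment stage might consume this file (the iteration helper and the `query_fibery` call are placeholders, not the actual Stage 2 code):
```python
import json

def iter_matched_entities(path: str):
    """Yield (user_email, matched_entity) pairs from toggl_aggregated.json."""
    with open(path) as f:
        aggregated = json.load(f)
    for user_email, user_data in aggregated["users"].items():
        for entity in user_data["matched_entities"]:
            yield user_email, entity

# Stage 2 would iterate these pairs and query Fibery per entity, e.g.:
# for user_email, entity in iter_matched_entities("tmp/runs/<run_id>/toggl_aggregated.json"):
#     fibery_entity = query_fibery(entity["entity_database"],   # placeholder function
#                                  entity["entity_type"],
#                                  entity["entity_id"])
```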
---
## Code Examples
### Example 1: Parsing a Description
```python
parser = FiberyParser()
# Matched entry
result = parser.parse_description("Design UI #456 [Scrum] [Task] [Moneyball]")
# {
# 'description_clean': 'Design UI',
# 'entity_id': '456',
# 'entity_database': 'Scrum',
# 'entity_type': 'Task',
# 'project': 'Moneyball',
# 'is_matched': True
# }
# Unmatched entry
result = parser.parse_description("Team sync meeting")
# {
# 'description_clean': 'Team sync meeting',
# 'entity_id': None,
# 'is_matched': False
# }
```
### Example 2: Aggregation Result
Input (5 raw entries):
```
[
{entry_id: 1, user: "alice@email.com", description: "Task #123 [Scrum]", duration: 3600},
{entry_id: 2, user: "alice@email.com", description: "Task #123 [Scrum]", duration: 1800},
{entry_id: 3, user: "alice@email.com", description: "Task #456 [Scrum]", duration: 5400},
{entry_id: 4, user: "alice@email.com", description: "Lunch", duration: 3600},
{entry_id: 5, user: "bob@email.com", description: "Task #123 [Scrum]", duration: 2700},
]
```
Output (aggregated):
```json
{
"users": {
"alice@email.com": {
"matched_entities": [
{
"entity_id": "123",
"duration_seconds": 5400, // 3600 + 1800
"entries_count": 2,
"entries": [{"description": "Task", "duration_seconds": 5400, "entry_count": 2}]
},
{
"entity_id": "456",
"duration_seconds": 5400,
"entries_count": 1
}
],
"unmatched_activities": [
{"description": "Lunch", "duration_seconds": 3600}
]
},
"bob@email.com": {
"matched_entities": [
{"entity_id": "123", "duration_seconds": 2700}
]
}
}
}
```
---
## Performance & Scaling
### 1. Fetch Performance
**Typical Scenario**:
- Date range: 1 week (7 days)
- Users: 13
- Entries per user per day: ~5-10
**Calculation**:
- API calls: 13 users × 7 days = 91 calls
- Rate limit: 3 req/sec → ~30 seconds minimum
- With backoff delays: 40-50 seconds typical
- With occasional 429s: 50-90 seconds
### 2. Parse Performance
**Per Entry**: ~1ms (regex match + extraction)
- 1000 entries × 1ms = 1 second
### 3. Aggregation Performance
**Grouping**: O(n) where n = number of entries
- 1000 entries: < 100ms
**Total Pipeline**: ~60-120 seconds for typical week
### 4. Memory Usage
- Raw entries: ~50KB per 100 entries
- Parsed entries: ~60KB per 100 entries
- Aggregated result: ~30KB per 100 entries
- Typical: <20MB total
### 5. Bottlenecks
1. **Toggl API Rate Limiting** (3 req/sec)
- Solution: Parallelize by user (already implemented)
2. **Pagination** (50 entries per page)
- Solution: Day-by-day chunking reduces pages needed
3. **Large Descriptions** (some are very long)
- Solution: None needed, minimal impact
---
## Integration with volt-agent Pipeline
### Current Workflow (Temporal)
```
TogglFiberyPipeline (workflow)
├── cleanup_toggl_stage
├── fetch_toggl_data (Activity 1)
│ └── Calls TogglClient.get_time_entries()
│ └── Saves: raw_toggl_data.json
├── aggregate_toggl_data (Activity 2)
│ ├── Loads: raw_toggl_data.json
│ ├── Calls: FiberyParser.parse_description() for each entry
│ ├── Groups and aggregates
│ └── Saves: toggl_aggregated.json
├── generate_toggl_report (Activity 3)
│ └── Markdown summary
└── ... [next stages]
```
### MCP Integration (Future)
```
TogglFiberyPipeline (workflow)
├── Call MCP Server: get_toggl_aggregated_data(start, end)
│ └── MCP Server internally:
│ ├── Fetches from Toggl
│ ├── Parses descriptions
│ └── Aggregates results
│ └── Returns: toggl_aggregated.json
└── ... [next stages]
```
---
## Appendix: Configuration & Secrets
### Environment Variables
```bash
# Required
TOGGL_API_TOKEN=your_toggl_api_token
TOGGL_WORKSPACE_ID=1637944
# Optional
TOGGL_API_LOG_LEVEL=info
TOGGL_RETRY_MAX_ATTEMPTS=3
TOGGL_RETRY_INITIAL_BACKOFF_SECONDS=60
```
### Getting Toggl API Token
1. Go to https://toggl.com/app/profile
2. Scroll to "API Token"
3. Copy token
### Toggl API Documentation
- Reports API v3: https://engineering.toggl.com/docs/reports/timesheets/reports
- Rate Limiting: 3 requests per second per token
- Pagination: Max 50 entries per request