Skip to main content
Glama

Toggl MCP Server

by ikido
aggregator_service.py (7.09 kB)
"""Aggregator service for grouping and computing statistics on time entries.""" from typing import List, Dict, Any from collections import defaultdict from ..models import ( ParsedEntry, TimeEntry, MatchedEntity, UnmatchedActivity, UserStatistics, UserAggregation, OverallStatistics, AggregatedData, ) class AggregatorService: """Aggregates time entries by user and entity.""" def aggregate( self, parsed_entries: List[ParsedEntry], time_entries: List[Dict[str, Any]] ) -> Dict[str, Any]: """ Aggregate time entries grouped by user and entity. Rules: 1. Group by user (email or user_id) 2. For matched entries: group by (database, type, entity_id, description_clean) 3. For unmatched entries: group by description_clean 4. Calculate duration_hours (duration_seconds / 3600) Args: parsed_entries: List of ParsedEntry objects time_entries: List of raw time entry dicts from Toggl API Note: Toggl Reports API returns nested structure with 'time_entries' array Returns: Dict with 'users' and 'statistics' keys """ # Group by user users_data: Dict[str, Dict[str, Any]] = defaultdict( lambda: {"matched_entities": defaultdict(float), "unmatched_activities": defaultdict(float)} ) # Process each entry skipped_count = 0 processed_count = 0 from ..utils import log_info for i, (parsed, time_entry) in enumerate(zip(parsed_entries, time_entries)): # Toggl Reports API returns entries with nested 'time_entries' array # Each "time_entry" in the response is actually a group with multiple nested time entries nested_entries = time_entry.get("time_entries", []) if i < 3: log_info(f"Entry {i}: has {len(nested_entries)} nested time_entries, keys={list(time_entry.keys())[:5]}") # If no nested entries, this might be a flat structure - process as single entry if not nested_entries: nested_entries = [time_entry] # Process each nested time entry for j, nested_entry in enumerate(nested_entries): # Get user identifier user_identifier = nested_entry.get("user_id") or time_entry.get("user_id") if not 
user_identifier: user_identifier = nested_entry.get("username") or time_entry.get("username") or "unknown" # Get duration - try various field names duration_ms = nested_entry.get("duration_ms") or nested_entry.get("dur") or nested_entry.get("duration") if duration_ms is None: duration_ms = time_entry.get("duration_ms") or time_entry.get("dur") or time_entry.get("duration", 0) # Log first nested entries to see structure if i < 2 and j == 0: log_info(f"Nested entry {i}.0: user={user_identifier}, duration_ms={duration_ms}, nested_keys={list(nested_entry.keys())}") # Convert to seconds if in milliseconds if duration_ms and duration_ms > 100000: # Likely milliseconds duration_seconds = duration_ms / 1000 else: duration_seconds = duration_ms or 0 # Skip entries with invalid duration if duration_seconds <= 0: skipped_count += 1 continue processed_count += 1 duration_hours = duration_seconds / 3600.0 # Group by parsed entity data if parsed.is_matched: # Matched entry - group by (db, type, entity_id, description_clean) key = ( parsed.entity_database, parsed.entity_type, parsed.entity_id, parsed.description_clean, ) users_data[user_identifier]["matched_entities"][key] += duration_hours else: # Unmatched entry - group by description_clean users_data[user_identifier]["unmatched_activities"][parsed.description_clean] += ( duration_hours ) # Log processing summary from ..utils import log_info log_info(f"Aggregation: processed {processed_count} entries, skipped {skipped_count} entries, got {len(users_data)} users") # Build response users_response = {} total_matched_hours = 0.0 total_unmatched_hours = 0.0 for user_identifier, data in users_data.items(): # Build matched entities list matched_entities = [] for (db, entity_type, entity_id, desc_clean), hours in data[ "matched_entities" ].items(): matched_entities.append( MatchedEntity( entity_database=db, entity_type=entity_type, entity_id=entity_id, description=desc_clean, duration_hours=round(hours, 3), ) ) total_matched_hours += 
hours # Build unmatched activities list unmatched_activities = [] for desc, hours in data["unmatched_activities"].items(): unmatched_activities.append( UnmatchedActivity(description=desc, duration_hours=round(hours, 3)) ) total_unmatched_hours += hours # Calculate user statistics total_hours = sum(e.duration_hours for e in matched_entities) + sum( a.duration_hours for a in unmatched_activities ) user_stats = UserStatistics( total_duration_hours=round(total_hours, 3), matched_duration_hours=sum(e.duration_hours for e in matched_entities), unmatched_duration_hours=sum(a.duration_hours for a in unmatched_activities), ) users_response[user_identifier] = UserAggregation( user_email=user_identifier, matched_entities=matched_entities, unmatched_activities=unmatched_activities, statistics=user_stats, ).model_dump() # Calculate overall statistics total_hours = total_matched_hours + total_unmatched_hours overall_stats = OverallStatistics( total_users=len(users_response), total_duration_hours=round(total_hours, 3), total_matched_duration_hours=round(total_matched_hours, 3), total_unmatched_duration_hours=round(total_unmatched_hours, 3), ) return { "users": users_response, "statistics": overall_stats.model_dump(), }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ikido/toggl-mcp-custom'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.