"""Aggregator service for grouping and computing statistics on time entries."""
from typing import List, Dict, Any
from collections import defaultdict

from ..models import (
    ParsedEntry,
    MatchedEntity,
    UnmatchedActivity,
    UserStatistics,
    UserAggregation,
    OverallStatistics,
)
from ..utils import log_info

class AggregatorService:
    """Aggregates time entries by user and entity."""

def aggregate(
self, parsed_entries: List[ParsedEntry], time_entries: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Aggregate time entries grouped by user and entity.
Rules:
1. Group by user (email or user_id)
2. For matched entries: group by (database, type, entity_id, description_clean)
3. For unmatched entries: group by description_clean
4. Calculate duration_hours (duration_seconds / 3600)
Args:
parsed_entries: List of ParsedEntry objects
time_entries: List of raw time entry dicts from Toggl API
Note: Toggl Reports API returns nested structure with 'time_entries' array
Returns:
Dict with 'users' and 'statistics' keys
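
        Example (hypothetical values; the field names shown are the ones
        this method probes for):

            time_entries = [
                {"user_id": 42, "time_entries": [{"dur": 3_600_000}]},  # 1 h in ms
            ]
            result = aggregator.aggregate(parsed_entries, time_entries)
            # -> {"users": {42: {...}}, "statistics": {...}}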
"""
        # Group by user; each bucket accumulates hours per matched entity key
        # and per unmatched cleaned description.
        users_data: Dict[str, Dict[str, Any]] = defaultdict(
            lambda: {
                "matched_entities": defaultdict(float),
                "unmatched_activities": defaultdict(float),
            }
        )
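        # Resulting shape (illustrative keys and values):
        #   users_data[42]["matched_entities"][("crm", "task", "T-1", "Fix login")] -> 2.5
        #   users_data[42]["unmatched_activities"]["standup"] -> 0.25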

        skipped_count = 0
        processed_count = 0
        # parsed_entries and time_entries are expected to be parallel lists;
        # zip() silently truncates to the shorter of the two.
        for i, (parsed, time_entry) in enumerate(zip(parsed_entries, time_entries)):
            # The Toggl Reports API returns grouped rows: each top-level item
            # carries a nested 'time_entries' array of individual entries.
            nested_entries = time_entry.get("time_entries", [])
            if i < 3:
                log_info(
                    f"Entry {i}: has {len(nested_entries)} nested time_entries, "
                    f"keys={list(time_entry.keys())[:5]}"
                )
            # No nested entries: assume a flat structure and treat the row
            # itself as a single entry.
            if not nested_entries:
                nested_entries = [time_entry]
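            # e.g. grouped: {"user_id": 1, "time_entries": [{"dur": 1200}, ...]}
            #      flat:    {"user_id": 1, "dur": 1200, ...}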
# Process each nested time entry
            for j, nested_entry in enumerate(nested_entries):
                # User identifier: prefer user_id, fall back to username.
                user_identifier = nested_entry.get("user_id") or time_entry.get("user_id")
                if not user_identifier:
                    user_identifier = (
                        nested_entry.get("username") or time_entry.get("username") or "unknown"
                    )

                # Duration: the field name varies across Toggl API versions
                # ('duration_ms', 'dur', 'duration'), and so does the unit.
                raw_duration = (
                    nested_entry.get("duration_ms")
                    or nested_entry.get("dur")
                    or nested_entry.get("duration")
                )
                if raw_duration is None:
                    raw_duration = (
                        time_entry.get("duration_ms")
                        or time_entry.get("dur")
                        or time_entry.get("duration", 0)
                    )

                # Log the first nested entries to expose the response structure
                if i < 2 and j == 0:
                    log_info(
                        f"Nested entry {i}.0: user={user_identifier}, "
                        f"duration={raw_duration}, nested_keys={list(nested_entry.keys())}"
                    )

                # Heuristic unit detection: values above 100,000 (~27.8 hours
                # if read as seconds) are assumed to be milliseconds.
                if raw_duration and raw_duration > 100_000:
                    duration_seconds = raw_duration / 1000
                else:
                    duration_seconds = raw_duration or 0
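                # e.g. raw_duration=3_600_000 (ms) -> 3600 s -> 1.0 h;
                #      raw_duration=5400 (s) passes through -> 1.5 h.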
                # Skip non-positive durations (e.g. still-running entries)
if duration_seconds <= 0:
skipped_count += 1
continue
processed_count += 1
duration_hours = duration_seconds / 3600.0
# Group by parsed entity data
if parsed.is_matched:
# Matched entry - group by (db, type, entity_id, description_clean)
key = (
parsed.entity_database,
parsed.entity_type,
parsed.entity_id,
parsed.description_clean,
)
users_data[user_identifier]["matched_entities"][key] += duration_hours
                else:
                    # Unmatched entry: group by cleaned description
                    unmatched = users_data[user_identifier]["unmatched_activities"]
                    unmatched[parsed.description_clean] += duration_hours
        # Log processing summary
        log_info(
            f"Aggregation: processed {processed_count} entries, "
            f"skipped {skipped_count} entries, got {len(users_data)} users"
        )
# Build response
users_response = {}
total_matched_hours = 0.0
total_unmatched_hours = 0.0
for user_identifier, data in users_data.items():
# Build matched entities list
matched_entities = []
for (db, entity_type, entity_id, desc_clean), hours in data[
"matched_entities"
].items():
matched_entities.append(
MatchedEntity(
entity_database=db,
entity_type=entity_type,
entity_id=entity_id,
description=desc_clean,
duration_hours=round(hours, 3),
)
)
total_matched_hours += hours
# Build unmatched activities list
unmatched_activities = []
for desc, hours in data["unmatched_activities"].items():
unmatched_activities.append(
UnmatchedActivity(description=desc, duration_hours=round(hours, 3))
)
total_unmatched_hours += hours
# Calculate user statistics
total_hours = sum(e.duration_hours for e in matched_entities) + sum(
a.duration_hours for a in unmatched_activities
)
            # Round all reported figures so component sums match the totals
            user_stats = UserStatistics(
                total_duration_hours=round(total_hours, 3),
                matched_duration_hours=round(
                    sum(e.duration_hours for e in matched_entities), 3
                ),
                unmatched_duration_hours=round(
                    sum(a.duration_hours for a in unmatched_activities), 3
                ),
            )
            # NOTE: the identifier may be a numeric user_id or a username,
            # not necessarily an email address, despite the field name.
            users_response[user_identifier] = UserAggregation(
                user_email=user_identifier,
matched_entities=matched_entities,
unmatched_activities=unmatched_activities,
statistics=user_stats,
).model_dump()
# Calculate overall statistics
total_hours = total_matched_hours + total_unmatched_hours
overall_stats = OverallStatistics(
total_users=len(users_response),
total_duration_hours=round(total_hours, 3),
total_matched_duration_hours=round(total_matched_hours, 3),
total_unmatched_duration_hours=round(total_unmatched_hours, 3),
)
return {
"users": users_response,
"statistics": overall_stats.model_dump(),
}
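

# Minimal usage sketch (illustrative only: ParsedEntry construction is
# bypassed with SimpleNamespace stubs exposing just the attributes this
# service reads; real inputs come from the parser and the Toggl client):
#
#     from types import SimpleNamespace
#
#     parsed = [SimpleNamespace(is_matched=False, description_clean="standup")]
#     raw = [{"user_id": 7, "time_entries": [{"dur": 1_800_000}]}]  # 30 min in ms
#
#     result = AggregatorService().aggregate(parsed, raw)
#     # result["users"][7]["unmatched_activities"][0] ->
#     #     {"description": "standup", "duration_hours": 0.5}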