test_aggregator_service.py•6.87 kB
"""Unit tests for AggregatorService."""
import sys
import os
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../src"))
from toggl_mcp.services.aggregator_service import AggregatorService
from toggl_mcp.models import ParsedEntry
def test_aggregator_basic():
"""Test basic aggregation."""
aggregator = AggregatorService()
# Create test entries
parsed = [
ParsedEntry(
description_clean="Task",
entity_id="123",
entity_database="Scrum",
entity_type="Task",
is_matched=True,
),
ParsedEntry(
description_clean="Lunch",
is_matched=False,
),
]
time_entries = [
{
"user_email": "alice@example.com",
"description": "Task #123 [Scrum] [Task]",
"duration": 3600, # 1 hour
},
{
"user_email": "alice@example.com",
"description": "Lunch",
"duration": 1800, # 0.5 hour
},
]
result = aggregator.aggregate(parsed, time_entries)
assert len(result["users"]) == 1
assert "alice@example.com" in result["users"]
user_data = result["users"]["alice@example.com"]
assert len(user_data["matched_entities"]) == 1
assert len(user_data["unmatched_activities"]) == 1
assert user_data["statistics"]["total_duration_hours"] == 1.5
assert user_data["statistics"]["matched_duration_hours"] == 1.0
assert user_data["statistics"]["unmatched_duration_hours"] == 0.5
def test_aggregator_multiple_users():
"""Test aggregation with multiple users."""
aggregator = AggregatorService()
parsed = [
ParsedEntry(
description_clean="Task",
entity_id="123",
entity_database="Scrum",
entity_type="Task",
is_matched=True,
),
ParsedEntry(
description_clean="Task",
entity_id="123",
entity_database="Scrum",
entity_type="Task",
is_matched=True,
),
]
time_entries = [
{
"user_email": "alice@example.com",
"description": "Task #123 [Scrum] [Task]",
"duration": 3600,
},
{
"user_email": "bob@example.com",
"description": "Task #123 [Scrum] [Task]",
"duration": 1800,
},
]
result = aggregator.aggregate(parsed, time_entries)
assert len(result["users"]) == 2
assert result["statistics"]["total_users"] == 2
assert result["statistics"]["total_duration_hours"] == 1.5
def test_aggregator_grouping_by_description():
"""Test that identical entities with different descriptions are grouped separately."""
aggregator = AggregatorService()
parsed = [
ParsedEntry(
description_clean="Task 1",
entity_id="123",
entity_database="Scrum",
entity_type="Task",
is_matched=True,
),
ParsedEntry(
description_clean="Task 1",
entity_id="123",
entity_database="Scrum",
entity_type="Task",
is_matched=True,
),
ParsedEntry(
description_clean="Another Task",
entity_id="123",
entity_database="Scrum",
entity_type="Task",
is_matched=True,
),
]
time_entries = [
{
"user_email": "alice@example.com",
"description": "Task 1 #123 [Scrum] [Task]",
"duration": 3600,
},
{
"user_email": "alice@example.com",
"description": "Task 1 #123 [Scrum] [Task]",
"duration": 1800,
},
{
"user_email": "alice@example.com",
"description": "Another Task #123 [Scrum] [Task]",
"duration": 1800,
},
]
result = aggregator.aggregate(parsed, time_entries)
user_data = result["users"]["alice@example.com"]
assert len(user_data["matched_entities"]) == 2
# Find the two entries
entries_by_desc = {e["description"]: e for e in user_data["matched_entities"]}
assert entries_by_desc["Task 1"]["duration_hours"] == 1.5 # 3600 + 1800 = 5400 / 3600 = 1.5
assert entries_by_desc["Another Task"]["duration_hours"] == 0.5 # 1800 / 3600 = 0.5
def test_aggregator_unmatched_grouping():
"""Test that unmatched activities are grouped by description."""
aggregator = AggregatorService()
parsed = [
ParsedEntry(description_clean="Lunch", is_matched=False),
ParsedEntry(description_clean="Lunch", is_matched=False),
ParsedEntry(description_clean="Meeting", is_matched=False),
]
time_entries = [
{
"user_email": "alice@example.com",
"description": "Lunch",
"duration": 1800,
},
{
"user_email": "alice@example.com",
"description": "Lunch",
"duration": 1800,
},
{
"user_email": "alice@example.com",
"description": "Meeting",
"duration": 3600,
},
]
result = aggregator.aggregate(parsed, time_entries)
user_data = result["users"]["alice@example.com"]
assert len(user_data["unmatched_activities"]) == 2
activities_by_desc = {a["description"]: a for a in user_data["unmatched_activities"]}
assert activities_by_desc["Lunch"]["duration_hours"] == 1.0
assert activities_by_desc["Meeting"]["duration_hours"] == 1.0
def test_aggregator_statistics():
"""Test that statistics are calculated correctly."""
aggregator = AggregatorService()
parsed = [
ParsedEntry(
description_clean="Task",
entity_id="123",
entity_database="Scrum",
entity_type="Task",
is_matched=True,
),
ParsedEntry(description_clean="Unmatched", is_matched=False),
]
time_entries = [
{
"user_email": "alice@example.com",
"description": "Task #123 [Scrum] [Task]",
"duration": 5400, # 1.5 hours
},
{
"user_email": "alice@example.com",
"description": "Unmatched",
"duration": 3600, # 1 hour
},
]
result = aggregator.aggregate(parsed, time_entries)
assert result["statistics"]["total_duration_hours"] == 2.5
assert result["statistics"]["total_matched_duration_hours"] == 1.5
assert result["statistics"]["total_unmatched_duration_hours"] == 1.0
if __name__ == "__main__":
test_aggregator_basic()
test_aggregator_multiple_users()
test_aggregator_grouping_by_description()
test_aggregator_unmatched_grouping()
test_aggregator_statistics()
print("All aggregator tests passed!")