"""Integration tests for end-to-end workflows.
Tests complete workflows including database, adapters, sync agents, and MCP tools.
"""
# ruff: noqa: SIM117 # Allow nested context managers for readability
from unittest.mock import patch
from mcp_task_aggregator.agents import SyncAgent
from mcp_task_aggregator.models import TodoSource, TodoStatus
from mcp_task_aggregator.tools.server import list_tasks_impl, list_todos_impl, sync_tasks_impl
class TestEndToEndJiraSync:
"""Integration tests for complete Jira sync workflow."""
def test_full_jira_sync_workflow(self, temp_db, mock_jira_settings, jira_issue_factory):
"""Test complete workflow from Jira fetch to database storage."""
# Create sample Jira issues
jira_issues = [
jira_issue_factory(
key="PROJ-1",
summary="Implement feature A",
status="To Do",
priority="High",
assignee="John Doe",
assignee_email="john@example.com",
labels=["backend", "urgent"],
),
jira_issue_factory(
key="PROJ-2",
summary="Fix bug in service B",
status="In Progress",
priority="Medium",
assignee="Jane Smith",
assignee_email="jane@example.com",
),
]
with patch("mcp_task_aggregator.agents.sync_agent.get_settings", return_value=mock_jira_settings):
with patch("mcp_task_aggregator.agents.sync_agent.JiraAdapter") as mock_adapter_class:
# Setup mock adapter
mock_adapter = mock_adapter_class.return_value
mock_adapter.fetch_tasks.return_value = jira_issues
# Mock normalize_task to create actual Todo objects
def mock_normalize(issue):
from mcp_task_aggregator.models import Tag, Todo
return Todo(
content=issue["summary"],
status=TodoStatus.TODO if issue["status"] == "To Do" else TodoStatus.IN_PROGRESS,
priority=4 if issue["priority"] == "High" else 3,
source_system=TodoSource.JIRA,
source_id=issue["key"],
source_url=f"https://test.atlassian.net/browse/{issue['key']}",
tags=[Tag(name=label) for label in issue.get("labels", [])],
)
mock_adapter.normalize_task.side_effect = mock_normalize
# Execute sync
agent = SyncAgent(db=temp_db)
summary = agent.sync_jira()
# Verify sync summary
assert summary.status == "completed"
assert summary.tasks_synced == 2
assert summary.tasks_created == 2
assert summary.tasks_updated == 0
assert len(summary.errors) == 0
# Verify tasks in database
tasks = agent.todo_repo.list(source_system=TodoSource.JIRA)
assert len(tasks) == 2
# Verify first task
task1 = agent.todo_repo.get_by_source_id(TodoSource.JIRA, "PROJ-1")
assert task1 is not None
assert task1.content == "Implement feature A"
assert task1.status == TodoStatus.TODO
assert task1.priority == 4
# Verify second task
task2 = agent.todo_repo.get_by_source_id(TodoSource.JIRA, "PROJ-2")
assert task2 is not None
assert task2.content == "Fix bug in service B"
assert task2.status == TodoStatus.IN_PROGRESS
# Verify sync log created
logs = agent.sync_log_repo.list_logs(source_system="jira")
assert len(logs) >= 1
assert logs[0]["status"] == "completed"
def test_incremental_sync_with_updates(self, temp_db, mock_jira_settings, jira_issue_factory):
"""Test that incremental sync updates existing tasks."""
# Initial sync
initial_issue = jira_issue_factory(
key="PROJ-100",
summary="Original summary",
status="To Do",
priority="Low",
)
with patch("mcp_task_aggregator.agents.sync_agent.get_settings", return_value=mock_jira_settings):
# Create agent inside patch context so it gets mocked settings
agent = SyncAgent(db=temp_db)
with patch("mcp_task_aggregator.agents.sync_agent.JiraAdapter") as mock_adapter_class:
mock_adapter = mock_adapter_class.return_value
mock_adapter.fetch_tasks.return_value = [initial_issue]
mock_adapter.normalize_task.side_effect = self._mock_normalize
summary1 = agent.sync_jira()
assert summary1.tasks_created == 1
# Second sync with updated task
updated_issue = jira_issue_factory(
key="PROJ-100",
summary="Updated summary",
status="In Progress",
priority="High",
)
with patch("mcp_task_aggregator.agents.sync_agent.JiraAdapter") as mock_adapter_class:
mock_adapter = mock_adapter_class.return_value
mock_adapter.fetch_tasks.return_value = [updated_issue]
mock_adapter.normalize_task.side_effect = self._mock_normalize
summary2 = agent.sync_jira()
assert summary2.tasks_created == 0
assert summary2.tasks_updated == 1
# Verify update
task = agent.todo_repo.get_by_source_id(TodoSource.JIRA, "PROJ-100")
assert task.content == "Updated summary"
assert task.status == TodoStatus.IN_PROGRESS
@staticmethod
def _mock_normalize(issue):
"""Helper to mock normalize_task."""
from mcp_task_aggregator.models import Tag, Todo
status_map = {
"To Do": TodoStatus.TODO,
"In Progress": TodoStatus.IN_PROGRESS,
"Done": TodoStatus.DONE,
}
priority_map = {"High": 4, "Medium": 3, "Low": 2}
return Todo(
content=issue["summary"],
status=status_map.get(issue["status"], TodoStatus.TODO),
priority=priority_map.get(issue["priority"], 0),
source_system=TodoSource.JIRA,
source_id=issue["key"],
source_url=f"https://test.atlassian.net/browse/{issue['key']}",
tags=[Tag(name=label) for label in issue.get("labels", [])],
)
class TestMCPToolsIntegration:
"""Integration tests for MCP tools with database."""
def test_list_tasks_with_seeded_db(self, temp_db_path):
"""Test list_tasks tool with seeded database."""
with patch("mcp_task_aggregator.tools.server.get_settings") as mock_settings:
mock_settings.return_value.database_path = temp_db_path
result = list_tasks_impl()
assert "tasks" in result
assert result["total"] >= 0
assert result["limit"] == 50
assert result["offset"] == 0
def test_list_tasks_filter_integration(self, temp_db_path):
"""Test list_tasks with multiple filters."""
with patch("mcp_task_aggregator.tools.server.get_settings") as mock_settings:
mock_settings.return_value.database_path = temp_db_path
# Filter by source and status
result = list_tasks_impl(source_system="jira", status="todo")
assert "tasks" in result
for task in result["tasks"]:
assert task["source_system"] == "jira"
assert task["status"] == "todo"
def test_list_todos_only_local(self, temp_db_path):
"""Test list_todos returns only local tasks."""
with patch("mcp_task_aggregator.tools.server.get_settings") as mock_settings:
mock_settings.return_value.database_path = temp_db_path
result = list_todos_impl()
assert "tasks" in result
for task in result["tasks"]:
assert task["source_system"] == "local"
def test_sync_and_list_workflow(self, temp_db_path, mock_jira_settings, jira_issue_factory):
"""Test complete sync and list workflow."""
# Setup sync
jira_issue = jira_issue_factory(key="FLOW-1", summary="Test flow")
with patch("mcp_task_aggregator.agents.sync_agent.get_settings", return_value=mock_jira_settings):
with patch("mcp_task_aggregator.agents.sync_agent.JiraAdapter") as mock_adapter_class:
mock_adapter = mock_adapter_class.return_value
mock_adapter.fetch_tasks.return_value = [jira_issue]
def mock_normalize(issue):
from mcp_task_aggregator.models import Todo
return Todo(
content=issue["summary"],
status=TodoStatus.TODO,
priority=3,
source_system=TodoSource.JIRA,
source_id=issue["key"],
source_url=f"https://test.atlassian.net/browse/{issue['key']}",
)
mock_adapter.normalize_task.side_effect = mock_normalize
# Execute sync via tool
with patch("mcp_task_aggregator.tools.server.get_settings") as tool_settings:
tool_settings.return_value = mock_jira_settings
tool_settings.return_value.database_path = temp_db_path
sync_result = sync_tasks_impl(source="jira")
assert sync_result["total_synced"] == 1
assert sync_result["total_created"] == 1
# List tasks via tool - patch the agent's get_settings, not server's
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.database_path = temp_db_path
list_result = list_tasks_impl(source_system="jira")
assert list_result["total"] == 1
assert list_result["tasks"][0]["source_id"] == "FLOW-1"
class TestDatabasePersistence:
"""Integration tests for database persistence."""
def test_task_persistence_across_sessions(self, temp_db_path):
"""Test that tasks persist across database connections."""
from mcp_task_aggregator.storage import Database, TodoRepository
# First session - create task
db1 = Database(temp_db_path)
repo1 = TodoRepository(db1)
task1 = repo1.create(content="Persistent task", priority=5)
task1_id = task1.id
db1.close()
# Second session - retrieve task
db2 = Database(temp_db_path)
repo2 = TodoRepository(db2)
task2 = repo2.get_by_id(task1_id)
db2.close()
assert task2 is not None
assert task2.content == "Persistent task"
assert task2.priority == 5
def test_sync_log_persistence(self, temp_db_path):
"""Test that sync logs persist across sessions."""
from mcp_task_aggregator.storage import Database, SyncLogRepository
# First session - create log
db1 = Database(temp_db_path)
log_repo1 = SyncLogRepository(db1)
log_id = log_repo1.create_log("jira", "full")
log_repo1.update_log(log_id, status="completed", tasks_synced=10)
db1.close()
# Second session - retrieve log
db2 = Database(temp_db_path)
log_repo2 = SyncLogRepository(db2)
latest = log_repo2.get_latest_sync("jira")
db2.close()
assert latest is not None
assert latest["tasks_synced"] == 10
assert latest["status"] == "completed"
class TestErrorHandlingIntegration:
"""Integration tests for error handling across layers."""
def test_sync_with_invalid_jira_data(self, temp_db, mock_jira_settings):
"""Test sync handles invalid Jira data gracefully."""
invalid_issue = {
"key": "BAD-1",
"summary": None, # Invalid: null summary
}
with patch("mcp_task_aggregator.agents.sync_agent.get_settings", return_value=mock_jira_settings):
with patch("mcp_task_aggregator.agents.sync_agent.JiraAdapter") as mock_adapter_class:
mock_adapter = mock_adapter_class.return_value
mock_adapter.fetch_tasks.return_value = [invalid_issue]
mock_adapter.normalize_task.side_effect = ValueError("Invalid data")
agent = SyncAgent(db=temp_db)
summary = agent.sync_jira()
assert summary.status == "failed"
assert len(summary.errors) > 0
assert summary.tasks_synced == 0
def test_list_tasks_with_database_error(self, temp_db_path):
"""Test list_tasks handles database errors gracefully."""
with patch("mcp_task_aggregator.tools.server.get_settings") as mock_settings:
mock_settings.return_value.database_path = temp_db_path
with patch("mcp_task_aggregator.tools.server.SyncAgent") as mock_agent_class:
mock_agent_class.return_value.list_tasks.side_effect = Exception("Database error")
result = list_tasks_impl()
assert "error" in result
assert result["total"] == 0
assert result["tasks"] == []
class TestConcurrentOperations:
"""Integration tests for concurrent database operations."""
def test_concurrent_reads(self, temp_db_path):
"""Test that concurrent reads work correctly."""
from mcp_task_aggregator.storage import Database, TodoRepository
# Create multiple database connections
db1 = Database(temp_db_path)
db2 = Database(temp_db_path)
repo1 = TodoRepository(db1)
repo2 = TodoRepository(db2)
# Read from both connections
tasks1 = repo1.list(limit=10)
tasks2 = repo2.list(limit=10)
assert len(tasks1) == len(tasks2)
db1.close()
db2.close()
def test_write_then_read_different_connection(self, temp_db_path):
"""Test write in one connection is visible in another."""
from mcp_task_aggregator.storage import Database, TodoRepository
# Write with first connection
db1 = Database(temp_db_path)
repo1 = TodoRepository(db1)
task = repo1.create(content="Concurrent test", priority=3)
task_id = task.id
db1.close()
# Read with second connection
db2 = Database(temp_db_path)
repo2 = TodoRepository(db2)
found = repo2.get_by_id(task_id)
db2.close()
assert found is not None
assert found.content == "Concurrent test"