"""Tests for SyncAgent.
Comprehensive tests for task synchronization orchestration.
"""
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
from mcp_task_aggregator.agents import SyncAgent
from mcp_task_aggregator.models import TodoSource, TodoStatus
from mcp_task_aggregator.storage import Database
@pytest.fixture
def temp_db():
"""Create a temporary database for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test.db"
db = Database(db_path)
yield db, db_path
db.close()
@pytest.fixture
def sync_agent(temp_db):
"""Create a SyncAgent with temp database."""
db, db_path = temp_db
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.database_path = db_path
mock_settings.return_value.jira_configured = False
mock_settings.return_value.github_configured = False
mock_settings.return_value.linear_configured = False
mock_settings.return_value.markdown_configured = False
mock_settings.return_value.stm_configured = False
agent = SyncAgent(db=db)
yield agent
class TestSyncAgentInitialization:
"""Tests for SyncAgent initialization."""
def test_sync_agent_with_provided_db(self, temp_db):
"""Test SyncAgent initialization with provided database."""
db, db_path = temp_db
agent = SyncAgent(db=db)
assert agent._db is db
assert agent.db is db
def test_sync_agent_lazy_db_creation(self):
"""Test SyncAgent creates database lazily."""
with (
patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings,
tempfile.TemporaryDirectory() as tmpdir,
):
db_path = Path(tmpdir) / "lazy.db"
mock_settings.return_value.database_path = db_path
agent = SyncAgent()
assert agent._db is None
# Access db property to trigger lazy creation
db = agent.db
assert db is not None
assert agent._db is db
def test_sync_agent_lazy_repo_creation(self, sync_agent):
"""Test SyncAgent creates repositories lazily."""
assert sync_agent._todo_repo is None
assert sync_agent._sync_log_repo is None
# Access properties to trigger lazy creation
todo_repo = sync_agent.todo_repo
sync_log_repo = sync_agent.sync_log_repo
assert todo_repo is not None
assert sync_log_repo is not None
assert sync_agent._todo_repo is todo_repo
assert sync_agent._sync_log_repo is sync_log_repo
class TestSyncJira:
"""Tests for Jira synchronization."""
def test_sync_jira_not_configured(self, sync_agent):
"""Test sync_jira fails gracefully when not configured."""
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.jira_configured = False
summary = sync_agent.sync_jira()
assert summary.source_system == "jira"
assert summary.status == "failed"
assert summary.tasks_synced == 0
assert len(summary.errors) == 1
assert "not configured" in summary.errors[0]["error"].lower()
def test_sync_jira_creates_sync_log(self, temp_db):
"""Test sync_jira creates sync log entry even when it fails."""
db, db_path = temp_db
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.database_path = db_path
mock_settings.return_value.jira_configured = False
agent = SyncAgent(db=db)
summary = agent.sync_jira()
# Verify summary shows failure
assert summary.status == "failed"
assert summary.tasks_synced == 0
def test_sync_summary_to_dict(self, sync_agent):
"""Test SyncSummary to_dict conversion."""
summary = sync_agent.sync_jira()
result = summary.to_dict()
assert "source_system" in result
assert "tasks_synced" in result
assert "tasks_created" in result
assert "tasks_updated" in result
assert "errors" in result
assert "duration_ms" in result
assert "status" in result
class TestListTasks:
"""Tests for list_tasks method."""
def test_list_tasks_all(self, sync_agent):
"""Test listing all tasks."""
# Create some test tasks
sync_agent.todo_repo.create(content="Task 1", priority=3)
sync_agent.todo_repo.create(content="Task 2", priority=5)
tasks = sync_agent.list_tasks()
assert len(tasks) == 2
def test_list_tasks_filter_by_source(self, sync_agent):
"""Test filtering tasks by source system."""
sync_agent.todo_repo.create(content="Local task", source_system=TodoSource.LOCAL)
sync_agent.todo_repo.create(content="Jira task", source_system=TodoSource.JIRA, source_id="TEST-1")
local_tasks = sync_agent.list_tasks(source_system="local")
jira_tasks = sync_agent.list_tasks(source_system="jira")
assert len(local_tasks) == 1
assert len(jira_tasks) == 1
assert local_tasks[0].content == "Local task"
assert jira_tasks[0].content == "Jira task"
def test_list_tasks_filter_by_status(self, sync_agent):
"""Test filtering tasks by status."""
sync_agent.todo_repo.create(content="Todo task", status=TodoStatus.TODO)
sync_agent.todo_repo.create(content="Done task", status=TodoStatus.DONE)
todo_tasks = sync_agent.list_tasks(status="todo")
done_tasks = sync_agent.list_tasks(status="done")
assert len(todo_tasks) == 1
assert len(done_tasks) == 1
def test_list_tasks_filter_by_priority(self, sync_agent):
"""Test filtering tasks by minimum priority."""
sync_agent.todo_repo.create(content="Low priority", priority=1)
sync_agent.todo_repo.create(content="High priority", priority=5)
high_priority_tasks = sync_agent.list_tasks(priority=4)
assert len(high_priority_tasks) == 1
assert high_priority_tasks[0].content == "High priority"
def test_list_tasks_filter_by_tags(self, sync_agent):
"""Test filtering tasks by tags."""
sync_agent.todo_repo.create(content="Backend task", tags=["backend", "urgent"])
sync_agent.todo_repo.create(content="Frontend task", tags=["frontend"])
backend_tasks = sync_agent.list_tasks(tags=["backend"])
assert len(backend_tasks) == 1
assert backend_tasks[0].content == "Backend task"
def test_list_tasks_pagination(self, sync_agent):
"""Test task list pagination."""
for i in range(5):
sync_agent.todo_repo.create(content=f"Task {i}", priority=i)
# First page
page1 = sync_agent.list_tasks(limit=2, offset=0)
# Second page
page2 = sync_agent.list_tasks(limit=2, offset=2)
assert len(page1) == 2
assert len(page2) == 2
# Verify different tasks
assert page1[0].content != page2[0].content
class TestSyncAll:
"""Tests for sync_all method."""
def test_sync_all_no_sources_configured(self, sync_agent):
"""Test sync_all with no sources configured."""
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.jira_configured = False
mock_settings.return_value.github_configured = False
mock_settings.return_value.linear_configured = False
mock_settings.return_value.markdown_configured = False
mock_settings.return_value.stm_configured = False
summaries = sync_agent.sync_all()
assert len(summaries) == 0
def test_sync_all_jira_configured(self, temp_db):
"""Test sync_all includes Jira when configured."""
db, db_path = temp_db
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.database_path = db_path
mock_settings.return_value.jira_configured = True
mock_settings.return_value.jira_url = "https://test.atlassian.net"
mock_settings.return_value.jira_email = "test@example.com"
mock_settings.return_value.jira_api_token = "token"
mock_settings.return_value.jira_project_key = "TEST"
mock_settings.return_value.github_configured = False
mock_settings.return_value.linear_configured = False
mock_settings.return_value.markdown_configured = False
mock_settings.return_value.stm_configured = False
agent = SyncAgent(db=db)
with patch.object(agent, "sync_jira") as mock_sync:
from mcp_task_aggregator.agents.sync_agent import SyncSummary
mock_sync.return_value = SyncSummary(source_system="jira")
summaries = agent.sync_all()
assert len(summaries) == 1
assert summaries[0].source_system == "jira"
mock_sync.assert_called_once()
def test_sync_all_markdown_configured(self, temp_db):
"""Test sync_all includes markdown when configured."""
db, db_path = temp_db
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.database_path = db_path
mock_settings.return_value.jira_configured = False
mock_settings.return_value.github_configured = False
mock_settings.return_value.linear_configured = False
mock_settings.return_value.markdown_configured = True
mock_settings.return_value.stm_configured = False
agent = SyncAgent(db=db)
with patch.object(agent, "sync_markdown") as mock_sync:
from mcp_task_aggregator.agents.sync_agent import SyncSummary
mock_sync.return_value = SyncSummary(source_system="markdown")
summaries = agent.sync_all()
assert len(summaries) == 1
assert summaries[0].source_system == "markdown"
mock_sync.assert_called_once()
def test_sync_all_stm_configured(self, temp_db):
"""Test sync_all includes STM when configured."""
db, db_path = temp_db
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.database_path = db_path
mock_settings.return_value.jira_configured = False
mock_settings.return_value.github_configured = False
mock_settings.return_value.linear_configured = False
mock_settings.return_value.markdown_configured = False
mock_settings.return_value.stm_configured = True
agent = SyncAgent(db=db)
with patch.object(agent, "sync_stm") as mock_sync:
from mcp_task_aggregator.agents.sync_agent import SyncSummary
mock_sync.return_value = SyncSummary(source_system="stm")
summaries = agent.sync_all()
assert len(summaries) == 1
assert summaries[0].source_system == "stm"
mock_sync.assert_called_once()
class TestSyncMarkdown:
"""Tests for markdown synchronization."""
def test_sync_markdown_not_configured(self, sync_agent):
"""Test sync_markdown returns skipped when not configured."""
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.markdown_configured = False
summary = sync_agent.sync_markdown()
assert summary.source_system == "markdown"
assert summary.status == "skipped"
assert summary.tasks_synced == 0
def test_sync_markdown_success(self, temp_db):
"""Test successful markdown sync with mocked adapter."""
from mcp_task_aggregator.models import Tag, Todo, TodoSource, TodoStatus
db, db_path = temp_db
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.database_path = db_path
mock_settings.return_value.markdown_configured = True
mock_settings.return_value.markdown_search_paths = ["/fake/path"]
mock_settings.return_value.markdown_file_patterns = ["*.md"]
agent = SyncAgent(db=db)
# Mock the adapter
with patch("mcp_task_aggregator.agents.sync_agent.MarkdownAdapter") as mock_adapter_cls:
mock_adapter = mock_adapter_cls.return_value
mock_adapter.fetch_tasks.return_value = [
{"content": "Test task", "file_path": "/fake/file.md", "line_number": 1}
]
mock_adapter.normalize_task.return_value = Todo(
content="Test task from markdown",
status=TodoStatus.TODO,
source_system=TodoSource.MARKDOWN,
source_id="md:fake/file.md:1",
tags=[Tag(name="markdown")],
)
summary = agent.sync_markdown()
assert summary.source_system == "markdown"
assert summary.status == "completed"
assert summary.tasks_synced == 1
assert summary.tasks_created == 1
assert len(summary.errors) == 0
def test_sync_markdown_with_errors(self, temp_db):
"""Test markdown sync handles task processing errors gracefully."""
db, db_path = temp_db
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.database_path = db_path
mock_settings.return_value.markdown_configured = True
mock_settings.return_value.markdown_search_paths = ["/fake/path"]
mock_settings.return_value.markdown_file_patterns = ["*.md"]
agent = SyncAgent(db=db)
with patch("mcp_task_aggregator.agents.sync_agent.MarkdownAdapter") as mock_adapter_cls:
mock_adapter = mock_adapter_cls.return_value
mock_adapter.fetch_tasks.return_value = [
{"content": "Bad task", "file_path": "/fake/bad.md", "line_number": 1}
]
mock_adapter.normalize_task.side_effect = ValueError("Invalid task format")
summary = agent.sync_markdown()
assert summary.source_system == "markdown"
assert summary.status == "failed"
assert summary.tasks_synced == 0
assert len(summary.errors) == 1
def test_sync_markdown_adapter_failure(self, temp_db):
"""Test markdown sync handles adapter failures."""
db, db_path = temp_db
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.database_path = db_path
mock_settings.return_value.markdown_configured = True
mock_settings.return_value.markdown_search_paths = ["/fake/path"]
mock_settings.return_value.markdown_file_patterns = ["*.md"]
agent = SyncAgent(db=db)
with patch("mcp_task_aggregator.agents.sync_agent.MarkdownAdapter") as mock_adapter_cls:
mock_adapter = mock_adapter_cls.return_value
mock_adapter.fetch_tasks.side_effect = RuntimeError("Adapter failed")
summary = agent.sync_markdown()
assert summary.source_system == "markdown"
assert summary.status == "failed"
assert len(summary.errors) == 1
assert "Adapter failed" in summary.errors[0]["error"]
class TestSyncStm:
"""Tests for STM synchronization."""
def test_sync_stm_not_configured(self, sync_agent):
"""Test sync_stm returns skipped when not configured."""
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.stm_configured = False
summary = sync_agent.sync_stm()
assert summary.source_system == "stm"
assert summary.status == "skipped"
assert summary.tasks_synced == 0
def test_sync_stm_success(self, temp_db):
"""Test successful STM sync with mocked adapter."""
from mcp_task_aggregator.models import Tag, Todo, TodoSource, TodoStatus
db, db_path = temp_db
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.database_path = db_path
mock_settings.return_value.stm_configured = True
mock_settings.return_value.stm_search_paths = ["/fake/workspace"]
mock_settings.return_value.stm_binary = "stm"
agent = SyncAgent(db=db)
with patch("mcp_task_aggregator.agents.sync_agent.StmAdapter") as mock_adapter_cls:
mock_adapter = mock_adapter_cls.return_value
mock_adapter.fetch_tasks.return_value = [
{"id": "task-1", "content": "STM task", "_workspace_path": "/fake/workspace"}
]
mock_adapter.normalize_task.return_value = Todo(
content="Test task from STM",
status=TodoStatus.TODO,
source_system=TodoSource.STM,
source_id="stm:task-1",
tags=[Tag(name="stm")],
)
summary = agent.sync_stm()
assert summary.source_system == "stm"
assert summary.status == "completed"
assert summary.tasks_synced == 1
assert summary.tasks_created == 1
assert len(summary.errors) == 0
def test_sync_stm_with_errors(self, temp_db):
"""Test STM sync handles task processing errors gracefully."""
db, db_path = temp_db
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.database_path = db_path
mock_settings.return_value.stm_configured = True
mock_settings.return_value.stm_search_paths = ["/fake/workspace"]
mock_settings.return_value.stm_binary = "stm"
agent = SyncAgent(db=db)
with patch("mcp_task_aggregator.agents.sync_agent.StmAdapter") as mock_adapter_cls:
mock_adapter = mock_adapter_cls.return_value
mock_adapter.fetch_tasks.return_value = [
{"id": "bad-task", "content": "Bad", "_workspace_path": "/fake"}
]
mock_adapter.normalize_task.side_effect = ValueError("Invalid STM task")
summary = agent.sync_stm()
assert summary.source_system == "stm"
assert summary.status == "failed"
assert summary.tasks_synced == 0
assert len(summary.errors) == 1
def test_sync_stm_adapter_failure(self, temp_db):
"""Test STM sync handles adapter failures."""
db, db_path = temp_db
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.database_path = db_path
mock_settings.return_value.stm_configured = True
mock_settings.return_value.stm_search_paths = ["/fake/workspace"]
mock_settings.return_value.stm_binary = "stm"
agent = SyncAgent(db=db)
with patch("mcp_task_aggregator.agents.sync_agent.StmAdapter") as mock_adapter_cls:
mock_adapter = mock_adapter_cls.return_value
mock_adapter.fetch_tasks.side_effect = RuntimeError("STM binary not found")
summary = agent.sync_stm()
assert summary.source_system == "stm"
assert summary.status == "failed"
assert len(summary.errors) == 1
assert "STM binary not found" in summary.errors[0]["error"]
class TestSyncJiraSuccess:
"""Tests for successful Jira synchronization flows."""
def test_sync_jira_success(self, temp_db):
"""Test successful Jira sync with mocked adapter."""
from mcp_task_aggregator.models import Tag, Todo, TodoSource, TodoStatus
db, db_path = temp_db
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.database_path = db_path
mock_settings.return_value.jira_configured = True
mock_settings.return_value.jira_url = "https://test.atlassian.net"
mock_settings.return_value.jira_email = "test@example.test"
mock_settings.return_value.jira_api_token = "fake-token"
mock_settings.return_value.jira_project_key = "TEST"
agent = SyncAgent(db=db)
with patch("mcp_task_aggregator.agents.sync_agent.JiraAdapter") as mock_adapter_cls:
mock_adapter = mock_adapter_cls.return_value
mock_adapter.fetch_tasks.return_value = [
{"key": "TEST-1", "summary": "Test issue"}
]
mock_adapter.normalize_task.return_value = Todo(
content="[TEST-1] Test issue",
status=TodoStatus.TODO,
source_system=TodoSource.JIRA,
source_id="TEST-1",
source_url="https://test.atlassian.net/browse/TEST-1",
tags=[Tag(name="jira")],
)
summary = agent.sync_jira()
assert summary.source_system == "jira"
assert summary.status == "completed"
assert summary.tasks_synced == 1
assert summary.tasks_created == 1
assert len(summary.errors) == 0
def test_sync_jira_with_task_processing_error(self, temp_db):
"""Test Jira sync handles individual task errors gracefully."""
from mcp_task_aggregator.models import Tag, Todo, TodoSource, TodoStatus
db, db_path = temp_db
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.database_path = db_path
mock_settings.return_value.jira_configured = True
mock_settings.return_value.jira_url = "https://test.atlassian.net"
mock_settings.return_value.jira_email = "test@example.test"
mock_settings.return_value.jira_api_token = "fake-token"
mock_settings.return_value.jira_project_key = "TEST"
agent = SyncAgent(db=db)
with patch("mcp_task_aggregator.agents.sync_agent.JiraAdapter") as mock_adapter_cls:
mock_adapter = mock_adapter_cls.return_value
mock_adapter.fetch_tasks.return_value = [
{"key": "TEST-1", "summary": "Good issue"},
{"key": "TEST-2", "summary": "Bad issue"},
]
def normalize_side_effect(raw_task):
if raw_task["key"] == "TEST-2":
raise ValueError("Invalid task")
return Todo(
content=f"[{raw_task['key']}] {raw_task['summary']}",
status=TodoStatus.TODO,
source_system=TodoSource.JIRA,
source_id=raw_task["key"],
tags=[Tag(name="jira")],
)
mock_adapter.normalize_task.side_effect = normalize_side_effect
summary = agent.sync_jira()
assert summary.source_system == "jira"
assert summary.status == "partial"
assert summary.tasks_synced == 1
assert summary.tasks_created == 1
assert len(summary.errors) == 1
assert summary.errors[0]["task_key"] == "TEST-2"
def test_sync_jira_adapter_failure(self, temp_db):
"""Test Jira sync handles adapter failures."""
db, db_path = temp_db
with patch("mcp_task_aggregator.agents.sync_agent.get_settings") as mock_settings:
mock_settings.return_value.database_path = db_path
mock_settings.return_value.jira_configured = True
mock_settings.return_value.jira_url = "https://test.atlassian.net"
mock_settings.return_value.jira_email = "test@example.test"
mock_settings.return_value.jira_api_token = "fake-token"
mock_settings.return_value.jira_project_key = "TEST"
agent = SyncAgent(db=db)
with patch("mcp_task_aggregator.agents.sync_agent.JiraAdapter") as mock_adapter_cls:
mock_adapter = mock_adapter_cls.return_value
mock_adapter.fetch_tasks.side_effect = RuntimeError("Connection failed")
summary = agent.sync_jira()
assert summary.source_system == "jira"
assert summary.status == "failed"
assert len(summary.errors) == 1
assert "Connection failed" in summary.errors[0]["error"]