"""Sync Agent for orchestrating task synchronization.
Coordinates adapter calls, normalization, and database upserts.
"""
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

from loguru import logger

from mcp_task_aggregator.adapters import (
    JiraAdapter,
    JiraConfig,
    MarkdownAdapter,
    MarkdownConfig,
    StmAdapter,
    StmConfig,
)
from mcp_task_aggregator.config import get_settings
from mcp_task_aggregator.models import Todo, TodoSource
from mcp_task_aggregator.storage import Database, SyncLogRepository, TodoRepository
@dataclass
class SyncSummary:
    """Aggregated outcome of a single sync run against one source system."""

    source_system: str  # machine name of the source, e.g. "jira", "markdown", "stm"
    tasks_synced: int = 0  # total tasks successfully upserted
    tasks_created: int = 0  # subset of synced tasks that were newly inserted
    tasks_updated: int = 0  # subset of synced tasks that already existed
    errors: list[dict[str, Any]] = field(default_factory=list)  # per-task error records
    duration_ms: int = 0  # wall-clock duration of the run
    status: str = "completed"  # "completed" | "partial" | "failed" | "skipped"

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable dict of this summary's fields.

        A plain dataclass stores its fields in ``__dict__`` in declaration
        order, so a shallow copy of ``vars(self)`` yields exactly the same
        mapping as spelling every key out by hand.
        """
        return dict(vars(self))
class SyncAgent:
    """Agent that orchestrates synchronization from external systems.

    Each public ``sync_*`` method checks source-specific configuration and
    builds the matching adapter, then delegates the shared
    fetch -> normalize -> upsert -> sync-log pipeline to :meth:`_run_sync`.
    """

    def __init__(self, db: Database | None = None) -> None:
        """Initialize the sync agent.

        Args:
            db: Database instance. If not provided, one is created lazily
                from settings on first access of :attr:`db`.
        """
        self.settings = get_settings()
        self._db = db
        self._todo_repo: TodoRepository | None = None
        self._sync_log_repo: SyncLogRepository | None = None

    @property
    def db(self) -> Database:
        """Get or lazily create the database connection."""
        if self._db is None:
            self._db = Database(self.settings.database_path)
        return self._db

    @property
    def todo_repo(self) -> TodoRepository:
        """Get or lazily create the todo repository."""
        if self._todo_repo is None:
            self._todo_repo = TodoRepository(self.db)
        return self._todo_repo

    @property
    def sync_log_repo(self) -> SyncLogRepository:
        """Get or lazily create the sync log repository."""
        if self._sync_log_repo is None:
            self._sync_log_repo = SyncLogRepository(self.db)
        return self._sync_log_repo

    def _run_sync(
        self,
        source_name: str,
        label: str,
        source_enum: TodoSource,
        full_refresh: bool,
        adapter_factory: Callable[[], Any],
        error_context: Callable[[dict[str, Any]], dict[str, Any]],
    ) -> SyncSummary:
        """Execute the shared sync pipeline for one source system.

        Creates a sync-log entry, fetches raw tasks via the adapter,
        normalizes and upserts each one, then finalizes the summary and
        the log entry. Per-task failures are collected as error records;
        a failure outside the per-task loop (including adapter
        construction) marks the whole run as failed.

        Args:
            source_name: Machine name recorded in summaries and the sync log.
            label: Human-readable name used in log messages.
            source_enum: TodoSource value stored on upserted todos.
            full_refresh: Recorded as sync type ("full" vs "incremental").
            adapter_factory: Zero-arg callable building the source adapter.
                Invoked inside the try block so construction errors are
                captured as a failed sync rather than propagating.
            error_context: Maps a raw task dict to identifying fields
                (e.g. task key, file path) for per-task error records.

        Returns:
            SyncSummary with counts, errors, duration, and final status.
        """
        start_time = datetime.now()
        summary = SyncSummary(source_system=source_name)
        # Record the attempt up front so even a hard failure leaves a log row.
        log_id = self.sync_log_repo.create_log(
            source_system=source_name,
            sync_type="full" if full_refresh else "incremental",
        )
        try:
            adapter = adapter_factory()
            logger.info(f"Starting {label} sync (full_refresh={full_refresh})")
            raw_tasks = adapter.fetch_tasks()
            for raw_task in raw_tasks:
                # Per-task errors are collected, not fatal: one bad task
                # must not abort the rest of the run.
                try:
                    task = adapter.normalize_task(raw_task)
                    _, was_created = self.todo_repo.upsert_by_source_id(
                        source_system=source_enum,
                        source_id=task.source_id,
                        content=task.content,
                        status=task.status,
                        priority=task.priority,
                        due_date=task.due_date,
                        source_url=task.source_url,
                        external_metadata=(
                            task.external_metadata.model_dump_json()
                            if task.external_metadata
                            else None
                        ),
                        sync_hash=task.sync_hash,
                        tags=[t.name for t in task.tags],
                    )
                    summary.tasks_synced += 1
                    if was_created:
                        summary.tasks_created += 1
                    else:
                        summary.tasks_updated += 1
                except Exception as e:
                    context = error_context(raw_task)
                    logger.error(f"Error processing {label} task {context}: {e}")
                    summary.errors.append({**context, "error": str(e)})
            summary.duration_ms = int(
                (datetime.now() - start_time).total_seconds() * 1000
            )
            # "partial" means some tasks landed despite per-task errors.
            if summary.errors:
                summary.status = "partial" if summary.tasks_synced > 0 else "failed"
            else:
                summary.status = "completed"
            logger.info(
                f"{label} sync completed: {summary.tasks_synced} tasks "
                f"({summary.tasks_created} created, {summary.tasks_updated} updated)"
            )
        except Exception as e:
            logger.exception(f"{label} sync failed: {e}")
            summary.status = "failed"
            summary.errors.append({"error": str(e)})
            summary.duration_ms = int(
                (datetime.now() - start_time).total_seconds() * 1000
            )
        # Always reflect the final outcome on the sync-log row.
        self.sync_log_repo.update_log(
            log_id,
            status=summary.status,
            tasks_synced=summary.tasks_synced,
            tasks_created=summary.tasks_created,
            tasks_updated=summary.tasks_updated,
            errors=summary.errors if summary.errors else None,
        )
        return summary

    def sync_jira(self, full_refresh: bool = False) -> SyncSummary:
        """Sync tasks from Jira.

        Args:
            full_refresh: If True, syncs all tasks. If False, only updates changed.

        Returns:
            SyncSummary with results. Status is "failed" (with an error
            record, no sync-log entry) when Jira credentials are missing.
        """
        if not self.settings.jira_configured:
            summary = SyncSummary(source_system="jira", status="failed")
            summary.errors.append(
                {
                    "error": "Jira not configured",
                    "details": "Missing JIRA_URL, JIRA_EMAIL, or JIRA_API_TOKEN",
                }
            )
            logger.error("Jira sync failed: credentials not configured")
            return summary

        def make_adapter() -> JiraAdapter:
            # Built lazily inside _run_sync's try block so config errors
            # are recorded as a failed sync.
            return JiraAdapter(
                JiraConfig(
                    url=self.settings.jira_url,
                    email=self.settings.jira_email,
                    api_token=self.settings.jira_api_token,
                    project_key=self.settings.jira_project_key,
                )
            )

        return self._run_sync(
            source_name="jira",
            label="Jira",
            source_enum=TodoSource.JIRA,
            full_refresh=full_refresh,
            adapter_factory=make_adapter,
            error_context=lambda raw: {"task_key": raw.get("key")},
        )

    def sync_markdown(self, full_refresh: bool = False) -> SyncSummary:
        """Sync tasks from local markdown files.

        Args:
            full_refresh: If True, syncs all tasks. If False, only updates changed.

        Returns:
            SyncSummary with results. Status is "skipped" (no sync-log
            entry) when markdown sync is not configured or disabled.
        """
        if not self.settings.markdown_configured:
            summary = SyncSummary(source_system="markdown", status="skipped")
            logger.debug("Markdown sync skipped: not configured or disabled")
            return summary

        def make_adapter() -> MarkdownAdapter:
            return MarkdownAdapter(
                MarkdownConfig(
                    search_paths=self.settings.markdown_search_paths,
                    file_patterns=self.settings.markdown_file_patterns,
                )
            )

        return self._run_sync(
            source_name="markdown",
            label="Markdown",
            source_enum=TodoSource.MARKDOWN,
            full_refresh=full_refresh,
            adapter_factory=make_adapter,
            error_context=lambda raw: {
                "file_path": raw.get("file_path"),
                "line_number": raw.get("line_number"),
            },
        )

    def sync_stm(self, full_refresh: bool = False) -> SyncSummary:
        """Sync tasks from STM (Simple Task Master) workspaces.

        Args:
            full_refresh: If True, syncs all tasks. If False, only updates changed.

        Returns:
            SyncSummary with results. Status is "skipped" (no sync-log
            entry) when STM sync is not configured or disabled.
        """
        if not self.settings.stm_configured:
            summary = SyncSummary(source_system="stm", status="skipped")
            logger.debug("STM sync skipped: not configured or disabled")
            return summary

        def make_adapter() -> StmAdapter:
            return StmAdapter(
                StmConfig(
                    search_paths=self.settings.stm_search_paths,
                    stm_binary=self.settings.stm_binary,
                )
            )

        return self._run_sync(
            source_name="stm",
            label="STM",
            source_enum=TodoSource.STM,
            full_refresh=full_refresh,
            adapter_factory=make_adapter,
            error_context=lambda raw: {
                "task_id": raw.get("id"),
                "workspace": raw.get("_workspace_path"),
            },
        )

    def sync_all(self, full_refresh: bool = False) -> list[SyncSummary]:
        """Sync tasks from all configured sources.

        Args:
            full_refresh: If True, syncs all tasks from all sources.

        Returns:
            List of SyncSummary for each configured source, in the order
            Jira, markdown, STM. Unconfigured sources are omitted.
        """
        summaries = []
        if self.settings.jira_configured:
            summaries.append(self.sync_jira(full_refresh))
        if self.settings.markdown_configured:
            summaries.append(self.sync_markdown(full_refresh))
        if self.settings.stm_configured:
            summaries.append(self.sync_stm(full_refresh))
        # GitHub and Linear would be added here in Phase 2
        return summaries

    def list_tasks(
        self,
        source_system: str | None = None,
        status: str | None = None,
        priority: int | None = None,
        tags: list[str] | None = None,
        limit: int = 50,
        offset: int = 0,
    ) -> list[Todo]:
        """List tasks with optional filtering.

        Args:
            source_system: Filter by source (local, jira, github, linear).
                Raises ValueError if not a valid TodoSource value.
            status: Filter by status.
            priority: Filter by minimum priority.
            tags: Filter by tags (all must match).
            limit: Maximum results to return.
            offset: Number of results to skip.

        Returns:
            List of matching Todo objects.
        """
        source = TodoSource(source_system) if source_system else None
        return self.todo_repo.list(
            source_system=source,
            status=status,
            priority=priority,
            tags=tags,
            limit=limit,
            offset=offset,
        )