"""MCP Server implementation using FastMCP.
Provides MCP tools for task listing and synchronization.
"""
from __future__ import annotations
from typing import Any
from fastmcp import FastMCP
from loguru import logger
from pydantic import BaseModel, Field
from mcp_task_aggregator.agents import SyncAgent
from mcp_task_aggregator.config import get_settings
from mcp_task_aggregator.logging import setup_logging
# Module-level FastMCP server instance; the @mcp.tool() functions in this
# file register themselves on it and run_server() starts it.
mcp = FastMCP(name="mcp-task-aggregator", version="0.1.0")
class ListTasksParams(BaseModel):
    """Filter and pagination arguments for the list_tasks tool.

    All four filters are optional and default to None. ``limit`` is bounded
    to 1-200 (default 50) and ``offset`` must be non-negative (default 0).
    """

    # --- optional filters -------------------------------------------------
    source_system: str | None = Field(
        None,
        description="Filter by source system (local, jira, github, linear, markdown, stm)",
    )
    status: str | None = Field(
        None,
        description="Filter by status (todo, in_progress, done, blocked, in_review, cancelled)",
    )
    # The 0-5 range is stated in the description only; no ge/le bound is
    # declared for this field.
    priority: int | None = Field(
        None,
        description="Filter by minimum priority (0-5)",
    )
    tags: list[str] | None = Field(
        None,
        description="Filter by tags (all must match)",
    )

    # --- pagination -------------------------------------------------------
    limit: int = Field(
        50,
        ge=1,
        le=200,
        description="Maximum number of tasks to return",
    )
    offset: int = Field(
        0,
        ge=0,
        description="Number of tasks to skip for pagination",
    )
class SyncTasksParams(BaseModel):
    """Arguments for the sync_tasks tool.

    ``source`` defaults to "all" and ``full_refresh`` defaults to False.
    """

    # Declared as a free-form string; the model itself does not restrict it
    # to the names listed in the description.
    source: str = Field(
        "all",
        description="Source to sync (jira, github, linear, markdown, stm, all)",
    )
    full_refresh: bool = Field(
        False,
        description="If true, sync all tasks regardless of changes",
    )
class TaskResponse(BaseModel):
    """Response model for a single task.

    The field names match the keys of the per-task dictionaries assembled in
    list_tasks_impl; that function returns plain dicts and does not
    instantiate this model itself.
    """

    # Task identifier; may be None.
    id: int | None
    # Task text.
    content: str
    # Status as a string (list_tasks_impl fills the same-named key from
    # task.status.value).
    status: str
    priority: int
    # Originating system as a string (list_tasks_impl uses
    # task.source_system.value for the same-named key).
    source_system: str
    # Identifier and URL in the originating system; either may be None.
    source_id: str | None
    source_url: str | None
    # Dates are carried as strings; list_tasks_impl produces them with
    # isoformat(). due_date may be None.
    due_date: str | None
    # Tag names.
    tags: list[str]
    created_at: str
    updated_at: str
class ListTasksResponse(BaseModel):
    """Response model for list_tasks tool.

    Mirrors the "tasks"/"total"/"limit"/"offset" keys of the dictionary
    returned by list_tasks_impl. The extra "error" key that function adds on
    failure is not declared here.
    """

    # Tasks on the returned page.
    tasks: list[TaskResponse]
    # NOTE(review): list_tasks_impl sets "total" to the number of tasks in
    # the returned page, not the number of all matching tasks - confirm this
    # is the intended meaning.
    total: int
    # Pagination values (list_tasks_impl echoes back what it was called with).
    limit: int
    offset: int
class SyncResponse(BaseModel):
    """Response model for sync_tasks tool.

    Mirrors the keys of the dictionary returned by sync_tasks_impl. The
    extra "error" key that function adds on failure is not declared here.
    """

    # One dictionary per synced source (sync_tasks_impl uses each summary's
    # to_dict() output).
    summaries: list[dict[str, Any]]
    # Totals summed across all summaries.
    total_synced: int
    total_created: int
    total_updated: int
    # Count of error entries across all summaries.
    total_errors: int
# Core implementation functions (testable without MCP decorator)
def _task_to_dict(task: Any) -> dict[str, Any]:
    """Flatten one task object into a dictionary of plain values."""
    due = task.due_date
    return {
        "id": task.id,
        "content": task.content,
        "status": task.status.value,
        "priority": task.priority,
        "source_system": task.source_system.value,
        "source_id": task.source_id,
        "source_url": task.source_url,
        "due_date": due.isoformat() if due else None,
        "tags": [tag.name for tag in task.tags],
        "created_at": task.created_at.isoformat(),
        "updated_at": task.updated_at.isoformat(),
    }


def list_tasks_impl(
    source_system: str | None = None,
    status: str | None = None,
    priority: int | None = None,
    tags: list[str] | None = None,
    limit: int = 50,
    offset: int = 0,
) -> dict[str, Any]:
    """Query the unified task database and return one page of tasks.

    All arguments are forwarded unchanged to ``SyncAgent.list_tasks``; no
    range checking is done here.

    Args:
        source_system: Filter by source (local, jira, github, linear, markdown, stm)
        status: Filter by status (todo, in_progress, done, blocked, in_review, cancelled)
        priority: Filter by minimum priority (0-5)
        tags: Filter by tags (all must match)
        limit: Maximum results (1-200, default 50)
        offset: Skip results for pagination

    Returns:
        Dictionary with "tasks" (one dict per task), "total", "limit" and
        "offset". "total" is the number of tasks in this page, and
        "limit"/"offset" echo the arguments. If anything raises an
        ``Exception``, it is logged and the result instead carries an
        "error" message with an empty task list and a total of 0.
    """
    try:
        found = SyncAgent().list_tasks(
            source_system=source_system,
            status=status,
            priority=priority,
            tags=tags,
            limit=limit,
            offset=offset,
        )
        page = [_task_to_dict(item) for item in found]
    except Exception as exc:
        logger.exception(f"Error listing tasks: {exc}")
        return {
            "error": str(exc),
            "tasks": [],
            "total": 0,
            "limit": limit,
            "offset": offset,
        }

    return {
        "tasks": page,
        "total": len(page),
        "limit": limit,
        "offset": offset,
    }
def list_todos_impl(
    status: str | None = None,
    priority: int | None = None,
    limit: int = 50,
) -> dict[str, Any]:
    """Return locally-created tasks only.

    Shorthand for ``list_tasks_impl`` with ``source_system`` pinned to
    "local". No tag filter or offset is passed, so those keep the defaults
    of ``list_tasks_impl``.

    Args:
        status: Filter by status
        priority: Filter by minimum priority
        limit: Maximum results

    Returns:
        The dictionary produced by ``list_tasks_impl`` for the local source
    """
    local_query = {
        "source_system": "local",
        "status": status,
        "priority": priority,
        "limit": limit,
    }
    return list_tasks_impl(**local_query)
def sync_tasks_impl(
    source: str = "all",
    full_refresh: bool = False,
) -> dict[str, Any]:
    """Synchronize tasks from external systems.

    Creates a ``SyncAgent`` and runs either ``sync_all`` (when ``source`` is
    "all") or the agent's ``sync_<source>`` method for a single source, then
    totals the resulting summaries.

    Args:
        source: Source to sync (jira, github, linear, markdown, stm, all)
        full_refresh: If true, sync all tasks regardless of changes;
            forwarded unchanged to the agent's sync method

    Returns:
        Dictionary with "summaries" (each summary's ``to_dict()`` output) and
        the totals "total_synced", "total_created", "total_updated" and
        "total_errors". On failure the same keys are present with empty/zero
        values plus an "error" message. Any ``Exception`` raised while
        syncing is logged and reported this way instead of being raised.
    """
    # Every single-source name the tool advertises. Previously "github" and
    # "linear" were listed as valid but fell through to the "Unknown source"
    # branch, whose message then named them as valid options.
    single_sources = ("jira", "github", "linear", "markdown", "stm")

    def _failure(message: str) -> dict[str, Any]:
        # Single place that defines the shape of an unsuccessful result.
        return {
            "error": message,
            "summaries": [],
            "total_synced": 0,
            "total_created": 0,
            "total_updated": 0,
            "total_errors": 0,
        }

    try:
        agent = SyncAgent()

        if source == "all":
            summaries = agent.sync_all(full_refresh=full_refresh)
        elif source in single_sources:
            # Look the method up by name so an advertised source is used when
            # the agent provides it, and reported accurately when it does not.
            sync_one = getattr(agent, f"sync_{source}", None)
            if not callable(sync_one):
                return _failure(
                    f"Source '{source}' is recognized but not supported by this agent "
                    f"(no sync_{source} method)"
                )
            summaries = [sync_one(full_refresh=full_refresh)]
        else:
            return _failure(
                f"Unknown source: {source}. Valid options: jira, github, linear, markdown, stm, all"
            )

        # Aggregate results across all per-source summaries.
        return {
            "summaries": [s.to_dict() for s in summaries],
            "total_synced": sum(s.tasks_synced for s in summaries),
            "total_created": sum(s.tasks_created for s in summaries),
            "total_updated": sum(s.tasks_updated for s in summaries),
            "total_errors": sum(len(s.errors) for s in summaries),
        }
    except Exception as e:
        logger.exception(f"Error syncing tasks: {e}")
        return _failure(str(e))
# MCP Tool registrations
@mcp.tool()
def list_tasks(
    source_system: str | None = None,
    status: str | None = None,
    priority: int | None = None,
    tags: list[str] | None = None,
    limit: int = 50,
    offset: int = 0,
) -> dict[str, Any]:
    """List tasks from the unified task database.

    Retrieves tasks with optional filtering by source system, status,
    priority, and tags. Supports pagination.

    Args:
        source_system: Filter by source (local, jira, github, linear, markdown, stm)
        status: Filter by status (todo, in_progress, done, blocked, in_review, cancelled)
        priority: Filter by minimum priority (0-5)
        tags: Filter by tags (all must match)
        limit: Maximum results (1-200, default 50)
        offset: Skip results for pagination

    Returns:
        Dictionary with tasks list and pagination info
    """
    # NOTE: docstring wording is kept as-is because FastMCP exposes it to
    # clients as the tool description. This wrapper only registers the tool;
    # all work happens in list_tasks_impl.
    query = {
        "source_system": source_system,
        "status": status,
        "priority": priority,
        "tags": tags,
        "limit": limit,
        "offset": offset,
    }
    return list_tasks_impl(**query)
@mcp.tool()
def list_todos(
    status: str | None = None,
    priority: int | None = None,
    limit: int = 50,
) -> dict[str, Any]:
    """List local todos (alias for list_tasks with source_system=local).

    Convenience method for listing locally-created tasks only.

    Args:
        status: Filter by status
        priority: Filter by minimum priority
        limit: Maximum results

    Returns:
        Dictionary with todos list
    """
    # NOTE: docstring wording is kept as-is because FastMCP exposes it to
    # clients as the tool description. The wrapper just delegates to
    # list_todos_impl.
    todos = list_todos_impl(
        status=status,
        priority=priority,
        limit=limit,
    )
    return todos
@mcp.tool()
def sync_tasks(
    source: str = "all",
    full_refresh: bool = False,
) -> dict[str, Any]:
    """Synchronize tasks from external systems.

    Fetches tasks from configured external systems (Jira, GitHub, Linear,
    Markdown, STM) and upserts them into the local database.

    Args:
        source: Source to sync (jira, github, linear, markdown, stm, all)
        full_refresh: If true, sync all tasks regardless of changes

    Returns:
        Dictionary with sync results for each source
    """
    # NOTE: docstring wording is kept as-is because FastMCP exposes it to
    # clients as the tool description. The wrapper just delegates to
    # sync_tasks_impl.
    outcome = sync_tasks_impl(
        source=source,
        full_refresh=full_refresh,
    )
    return outcome
def run_server() -> None:
    """Configure logging, log the database path from settings, and start the server.

    Calls ``mcp.run()`` last; whatever that call does (including whether it
    blocks) is determined by FastMCP.
    """
    # Logging is set up first so the startup message goes through it.
    setup_logging()
    db_path = get_settings().database_path
    logger.info(f"Starting MCP Task Aggregator server (db: {db_path})")
    mcp.run()