import os
import logging
from typing import Optional, Any, Dict
from contextlib import asynccontextmanager
from dataclasses import dataclass
from collections.abc import AsyncIterator
import httpx
from fastmcp import FastMCP, Context
from pydantic import BaseModel, Field
# Configure the root logger at INFO and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Leantime connection settings, read once at import time from the environment.
# The base URL falls back to "http://leantime:8080"; the API key falls back to "".
LEANTIME_URL = os.getenv("LEANTIME_URL", "http://leantime:8080")
LEANTIME_API_KEY = os.getenv("LEANTIME_API_KEY", "")
@dataclass
class AppContext:
    """State created by the server lifespan and shared by every RPC call."""

    # HTTP client used for all requests to Leantime.
    client: httpx.AsyncClient
    # Counter that rpc_call increments to derive the JSON-RPC "id" of each request.
    request_id: int
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
    """Provide the shared Leantime HTTP client for the lifetime of the server.

    Builds one ``httpx.AsyncClient`` bound to ``LEANTIME_URL`` that sends the
    API key and a JSON content type on every request, yields it wrapped in an
    ``AppContext`` whose request counter starts at 0, and closes the client
    when the context exits, whether normally or through an exception.

    Args:
        server: The FastMCP instance this lifespan belongs to (not used here).

    Yields:
        The ``AppContext`` holding the client and the request counter.
    """
    default_headers = {"x-api-key": LEANTIME_API_KEY, "Content-Type": "application/json"}
    http = httpx.AsyncClient(base_url=LEANTIME_URL, headers=default_headers, timeout=30.0)
    try:
        logger.info(f"Leantime MCP server connected to {LEANTIME_URL}")
        yield AppContext(client=http, request_id=0)
    finally:
        # Always release the client, even if the server body raised.
        await http.aclose()
class LeantimeAPIError(Exception):
    """Raised when Leantime answers a JSON-RPC call with an error or an unusable body."""


async def rpc_call(ctx: Context, method: str, params: Optional[Dict] = None) -> Any:
    """Send one JSON-RPC 2.0 request to Leantime and return its ``result`` member.

    Args:
        ctx: Tool context; ``ctx.request_context.lifespan_context`` must expose
            the shared ``client`` and the mutable ``request_id`` counter.
        method: JSON-RPC method name to invoke.
        params: Parameters for the method; ``None`` (or any falsy value) is
            sent as an empty object.

    Returns:
        The ``result`` member of the response, or ``None`` when it is absent.

    Raises:
        LeantimeAPIError: The body is not JSON, is not a JSON object, or
            carries a non-null ``error`` member. It subclasses ``Exception``,
            so callers catching ``Exception`` are unaffected.
        Exception: Whatever ``response.raise_for_status()`` raises for an
            unsuccessful HTTP status is propagated unchanged.
    """
    app_ctx = ctx.request_context.lifespan_context
    # No await sits between the increment and the read, so concurrent calls on
    # the same event loop cannot observe the same id.
    app_ctx.request_id += 1
    payload = {
        "method": method,
        "jsonrpc": "2.0",
        "id": str(app_ctx.request_id),
        "params": params or {},
    }

    response = await app_ctx.client.post("/api/jsonrpc", json=payload)
    response.raise_for_status()

    try:
        body = response.json()
    except ValueError as err:
        raise LeantimeAPIError("Leantime returned a response that is not valid JSON") from err

    if not isinstance(body, dict):
        raise LeantimeAPIError(
            f"Unexpected JSON-RPC response of type {type(body).__name__}; expected an object"
        )

    # An explicit null "error" member means success, so test the value, not the key.
    error = body.get("error")
    if error is not None:
        if isinstance(error, dict):
            code, message = error.get("code"), error.get("message")
        else:
            # Non-object error payloads are reported as-is instead of crashing on .get().
            code, message = None, error
        raise LeantimeAPIError(f"Leantime API Error [{code}]: {message}")

    return body.get("result")
# The server instance every @mcp.tool() below registers with; app_lifespan
# supplies the shared AppContext for the server's lifetime.
mcp = FastMCP(
    name="Leantime MCP Server",
    instructions="MCP server for Leantime project management operations",
    lifespan=app_lifespan,
)
@mcp.tool()
async def list_projects(ctx: Context) -> dict:
    """Get all accessible projects."""
    # Only id, name and clientId are forwarded; any other field of each
    # returned project is dropped. A falsy RPC result yields an empty list.
    raw = await rpc_call(ctx, "leantime.rpc.projects.getAllProjects")
    summaries = []
    for entry in raw or []:
        summaries.append({field: entry.get(field) for field in ("id", "name", "clientId")})
    return {"projects": summaries}
@mcp.tool()
async def get_project_users(ctx: Context, project_id: int) -> dict:
    """Get users assigned to a specific project."""
    request = {"projectId": project_id}
    assigned = await rpc_call(ctx, "leantime.rpc.projects.getUsersAssignedToProject", request)
    # Normalise a falsy RPC result (None, empty, ...) to an empty list.
    if not assigned:
        return {"users": []}
    return {"users": assigned}
class TicketInput(BaseModel):
    # Input payload for create_ticket. Only `headline` is required; every
    # other field has a default.

    # Core content.
    headline: str = Field(description="Task title")
    description: str = Field(default="", description="Task description")
    type: str = Field(default="task", description="Type: task, bug, story, milestone, subtask")

    # Workflow state and assignment.
    priority: str = Field(default="", description="Priority level")
    status: int = Field(default=0, description="Status: 0=Backlog, 1=To Do, 2=In Progress, 3=In Review, 4=Done")
    editor_id: Optional[int] = Field(default=None, description="Assigned user ID")

    # Planning.
    milestone_id: int = Field(default=0, description="Milestone ID")
    sprint: int = Field(default=0, description="Sprint ID")
    tags: str = Field(default="", description="Comma-separated tags")
    due_date: Optional[str] = Field(default=None, description="Due date (YYYY-MM-DD)")
    plan_hours: float = Field(default=0, description="Planned hours")
@mcp.tool()
async def create_ticket(ctx: Context, project_id: int, ticket: TicketInput) -> dict:
    """Create a new ticket/task in Leantime."""
    # Fields that are always sent, translated to the key names used in the RPC payload.
    values = {
        "headline": ticket.headline,
        "type": ticket.type,
        "description": ticket.description,
        "projectId": project_id,
        "priority": ticket.priority,
        "status": ticket.status,
        "milestoneid": ticket.milestone_id,
        "sprint": ticket.sprint,
        "tags": ticket.tags,
        "planHours": ticket.plan_hours,
    }

    # Assignee and due date are added only when they hold a truthy value.
    optional_fields = (("editorId", ticket.editor_id), ("dateToFinish", ticket.due_date))
    for key, supplied in optional_fields:
        if supplied:
            values[key] = supplied

    # Whatever the RPC returns is reported back under "id".
    new_id = await rpc_call(ctx, "leantime.rpc.tickets.addTicket", {"values": values})
    return {"id": new_id, "headline": ticket.headline, "created": True}
@mcp.tool()
async def get_ticket(ctx: Context, ticket_id: int) -> dict:
    """Get details of a specific ticket."""
    # The RPC is called with the id rendered as a string.
    found = await rpc_call(ctx, "leantime.rpc.tickets.getTicket", {"id": str(ticket_id)})
    # A falsy result is reported as an error dict rather than raised.
    return found if found else {"error": f"Ticket {ticket_id} not found"}
@mcp.tool()
async def update_ticket(ctx: Context, ticket_id: int, headline: Optional[str] = None, description: Optional[str] = None,
                        status: Optional[int] = None, priority: Optional[str] = None, editor_id: Optional[int] = None,
                        due_date: Optional[str] = None) -> dict:
    """Update an existing ticket's fields."""
    # Map each optional argument to the key name used in the RPC payload.
    supplied = {
        "headline": headline,
        "description": description,
        "status": status,
        "priority": priority,
        "editorId": editor_id,
        "dateToFinish": due_date,
    }
    # Arguments left as None are omitted, so only explicitly provided fields
    # are sent; falsy values such as 0 or "" are still forwarded.
    values = {"id": ticket_id}
    values.update((key, value) for key, value in supplied.items() if value is not None)

    # The RPC's return value is not inspected; a failure surfaces only as an
    # exception raised by rpc_call.
    await rpc_call(ctx, "leantime.rpc.tickets.updateTicket", {"id": ticket_id, "values": values})
    return {"id": ticket_id, "updated": True}
@mcp.tool()
async def update_ticket_status(ctx: Context, ticket_id: int, status: int) -> dict:
    """Update ticket status. 0=Backlog, 1=To Do, 2=In Progress, 3=In Review, 4=Done"""
    # Same RPC as a general ticket update, carrying only the id and the new status.
    request = {"id": ticket_id, "values": {"id": ticket_id, "status": status}}
    # The RPC's return value is not inspected; a failure surfaces only as an
    # exception raised by rpc_call.
    await rpc_call(ctx, "leantime.rpc.tickets.updateTicket", request)
    return {"id": ticket_id, "new_status": status, "updated": True}
@mcp.tool()
async def log_time(ctx: Context, ticket_id: int, hours: float, description: str = "", kind: str = "") -> dict:
    """Log time against a ticket."""
    # The entry details travel in a nested "params" object next to the ticket id.
    entry = {"hours": hours, "description": description, "kind": kind}
    # The RPC's return value is not inspected; a failure surfaces only as an
    # exception raised by rpc_call.
    await rpc_call(ctx, "leantime.rpc.Timesheets.Timesheets.logTime", {"ticketId": ticket_id, "params": entry})
    return {"ticket_id": ticket_id, "hours": hours, "logged": True}
@mcp.tool()
async def punch_in(ctx: Context, ticket_id: int) -> dict:
    """Start timer for a ticket."""
    # The RPC's return value is not inspected; a failure surfaces only as an
    # exception raised by rpc_call.
    await rpc_call(ctx, "leantime.rpc.Timesheets.Timesheets.punchIn", {"ticketId": ticket_id})
    return {"ticket_id": ticket_id, "timer_started": True}
@mcp.tool()
async def punch_out(ctx: Context, ticket_id: int) -> dict:
    """Stop timer for a ticket."""
    # The RPC's return value is not inspected; a failure surfaces only as an
    # exception raised by rpc_call.
    await rpc_call(ctx, "leantime.rpc.Timesheets.Timesheets.punchOut", {"ticketId": ticket_id})
    return {"ticket_id": ticket_id, "timer_stopped": True}
@mcp.tool()
async def get_timesheets(ctx: Context, date_from: str, date_to: str, project_id: Optional[int] = None, user_id: Optional[int] = None) -> dict:
    """Get timesheet entries within a date range."""
    query = {"dateFrom": date_from, "dateTo": date_to}
    # The project and user filters are included only when they hold a truthy value.
    for key, wanted in (("projectId", project_id), ("userId", user_id)):
        if wanted:
            query[key] = wanted
    entries = await rpc_call(ctx, "leantime.rpc.Timesheets.Timesheets.getAll", query)
    # Normalise a falsy RPC result to an empty list.
    return {"timesheets": entries if entries else []}
@mcp.tool()
async def get_sprints(ctx: Context, project_id: int) -> dict:
    """Get all sprints for a project."""
    request = {"projectId": project_id}
    found = await rpc_call(ctx, "leantime.rpc.sprints.getAllSprints", request)
    # Normalise a falsy RPC result to an empty list.
    if not found:
        return {"sprints": []}
    return {"sprints": found}
@mcp.tool()
async def get_goals(ctx: Context) -> dict:
    """Get all goals across projects."""
    found = await rpc_call(ctx, "leantime.rpc.goalcanvas.getAllGoals")
    # Normalise a falsy RPC result to an empty list.
    if not found:
        return {"goals": []}
    return {"goals": found}
if __name__ == "__main__":
    # When executed as a script, serve over streamable HTTP on all interfaces, port 8000.
    mcp.run(
        transport="streamable-http",
        host="0.0.0.0",
        port=8000,
    )