import os
import logging
from typing import Optional
from datetime import date
from contextlib import asynccontextmanager
from dataclasses import dataclass
from collections.abc import AsyncIterator
import httpx
from fastmcp import FastMCP, Context
from pydantic import BaseModel, Field
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
OPENPROJECT_URL = os.environ.get("OPENPROJECT_URL", "http://openproject:8080")
OPENPROJECT_API_KEY = os.environ.get("OPENPROJECT_API_KEY", "")
@dataclass
class AppContext:
client: httpx.AsyncClient
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
import base64
credentials = base64.b64encode(f"apikey:{OPENPROJECT_API_KEY}".encode()).decode()
client = httpx.AsyncClient(
base_url=f"{OPENPROJECT_URL}/api/v3",
headers={
"Authorization": f"Basic {credentials}",
"Content-Type": "application/json",
"Accept": "application/hal+json"
},
timeout=30.0
)
try:
logger.info(f"OpenProject MCP server connected to {OPENPROJECT_URL}")
yield AppContext(client=client)
finally:
await client.aclose()
mcp = FastMCP(
name="OpenProject MCP Server",
instructions="MCP server for OpenProject project management operations",
lifespan=app_lifespan
)
@mcp.tool()
async def list_projects(ctx: Context, page: int = 1, page_size: int = 25) -> dict:
"""List all accessible projects with pagination."""
client = ctx.request_context.lifespan_context.client
response = await client.get("/projects", params={"offset": page, "pageSize": page_size})
response.raise_for_status()
data = response.json()
return {
"projects": [{"id": p["id"], "name": p["name"], "identifier": p["identifier"]}
for p in data.get("_embedded", {}).get("elements", [])],
"total": data.get("total", 0)
}
@mcp.tool()
async def get_project(ctx: Context, project_id: int) -> dict:
"""Get details of a specific project."""
client = ctx.request_context.lifespan_context.client
response = await client.get(f"/projects/{project_id}")
if response.status_code == 404:
return {"error": f"Project {project_id} not found"}
response.raise_for_status()
return response.json()
class WorkPackageInput(BaseModel):
subject: str = Field(description="Title of the work package")
description: str = Field(default="", description="Detailed description (markdown)")
type_id: int = Field(description="Type ID (1=Task, 2=Milestone, etc.)")
status_id: int = Field(default=1, description="Status ID")
priority_id: int = Field(default=2, description="Priority ID")
assignee_id: Optional[int] = Field(default=None, description="User ID to assign")
start_date: Optional[str] = Field(default=None, description="Start date (YYYY-MM-DD)")
due_date: Optional[str] = Field(default=None, description="Due date (YYYY-MM-DD)")
estimated_hours: Optional[float] = Field(default=None, description="Estimated hours")
@mcp.tool()
async def create_work_package(ctx: Context, project_id: int, work_package: WorkPackageInput) -> dict:
"""Create a new work package in a project."""
client = ctx.request_context.lifespan_context.client
payload = {
"subject": work_package.subject,
"description": {"format": "markdown", "raw": work_package.description},
"_links": {
"type": {"href": f"/api/v3/types/{work_package.type_id}"},
"status": {"href": f"/api/v3/statuses/{work_package.status_id}"},
"priority": {"href": f"/api/v3/priorities/{work_package.priority_id}"}
}
}
if work_package.assignee_id:
payload["_links"]["assignee"] = {"href": f"/api/v3/users/{work_package.assignee_id}"}
if work_package.start_date:
payload["startDate"] = work_package.start_date
if work_package.due_date:
payload["dueDate"] = work_package.due_date
if work_package.estimated_hours:
payload["estimatedTime"] = f"PT{work_package.estimated_hours}H"
response = await client.post(f"/projects/{project_id}/work_packages", json=payload)
response.raise_for_status()
result = response.json()
return {"id": result["id"], "subject": result["subject"], "status": "created"}
@mcp.tool()
async def list_work_packages(ctx: Context, project_id: Optional[int] = None, status: str = "open",
assignee_id: Optional[int] = None, page: int = 1, page_size: int = 25) -> dict:
"""List work packages with filtering options."""
import json
client = ctx.request_context.lifespan_context.client
filters = []
if status == "open":
filters.append({"status_id": {"operator": "o", "values": []}})
elif status == "closed":
filters.append({"status_id": {"operator": "c", "values": []}})
if assignee_id:
filters.append({"assignee": {"operator": "=", "values": [str(assignee_id)]}})
params = {"offset": page, "pageSize": page_size}
if filters:
params["filters"] = json.dumps(filters)
url = f"/projects/{project_id}/work_packages" if project_id else "/work_packages"
response = await client.get(url, params=params)
response.raise_for_status()
data = response.json()
return {
"work_packages": [
{
"id": wp["id"], "subject": wp["subject"],
"status": wp["_links"]["status"]["title"],
"assignee": wp["_links"].get("assignee", {}).get("title", "Unassigned"),
"due_date": wp.get("dueDate")
}
for wp in data.get("_embedded", {}).get("elements", [])
],
"total": data.get("total", 0)
}
@mcp.tool()
async def update_work_package_status(ctx: Context, work_package_id: int, status_id: int, comment: str = "") -> dict:
"""Update a work package's status with optional comment."""
client = ctx.request_context.lifespan_context.client
response = await client.get(f"/work_packages/{work_package_id}")
if response.status_code == 404:
return {"error": f"Work package {work_package_id} not found"}
response.raise_for_status()
current = response.json()
payload = {
"lockVersion": current["lockVersion"],
"_links": {"status": {"href": f"/api/v3/statuses/{status_id}"}}
}
response = await client.patch(f"/work_packages/{work_package_id}", json=payload)
response.raise_for_status()
if comment:
await client.post(f"/work_packages/{work_package_id}/activities", json={"comment": {"raw": comment}})
return {"id": work_package_id, "new_status_id": status_id, "updated": True}
@mcp.tool()
async def get_work_package(ctx: Context, work_package_id: int) -> dict:
"""Get detailed information about a specific work package."""
client = ctx.request_context.lifespan_context.client
response = await client.get(f"/work_packages/{work_package_id}")
if response.status_code == 404:
return {"error": f"Work package {work_package_id} not found"}
response.raise_for_status()
wp = response.json()
return {
"id": wp["id"], "subject": wp["subject"],
"description": wp.get("description", {}).get("raw", ""),
"status": wp["_links"]["status"]["title"],
"type": wp["_links"]["type"]["title"],
"priority": wp["_links"]["priority"]["title"],
"assignee": wp["_links"].get("assignee", {}).get("title", "Unassigned"),
"start_date": wp.get("startDate"), "due_date": wp.get("dueDate"),
"estimated_hours": wp.get("estimatedTime"), "spent_hours": wp.get("spentTime"),
"percent_done": wp.get("percentageDone", 0), "lock_version": wp["lockVersion"]
}
@mcp.tool()
async def log_time(ctx: Context, work_package_id: int, hours: float, activity_id: int,
comment: str = "", spent_on: Optional[str] = None) -> dict:
"""Log time against a work package."""
client = ctx.request_context.lifespan_context.client
payload = {
"hours": f"PT{hours}H", "comment": {"raw": comment},
"spentOn": spent_on or date.today().isoformat(),
"_links": {
"workPackage": {"href": f"/api/v3/work_packages/{work_package_id}"},
"activity": {"href": f"/api/v3/time_entries/activities/{activity_id}"}
}
}
response = await client.post("/time_entries", json=payload)
response.raise_for_status()
result = response.json()
return {"id": result["id"], "hours": hours, "logged": True}
@mcp.tool()
async def get_time_entry_activities(ctx: Context) -> dict:
"""Get available time entry activity types."""
client = ctx.request_context.lifespan_context.client
response = await client.post("/time_entries/form", json={})
response.raise_for_status()
data = response.json()
activities = data.get("_embedded", {}).get("schema", {}).get("activity", {}).get("_embedded", {}).get("allowedValues", [])
return {"activities": [{"id": a["id"], "name": a["name"]} for a in activities]}
@mcp.tool()
async def get_statuses(ctx: Context) -> dict:
"""Get all available work package statuses."""
client = ctx.request_context.lifespan_context.client
response = await client.get("/statuses")
response.raise_for_status()
data = response.json()
return {"statuses": [{"id": s["id"], "name": s["name"], "is_closed": s.get("isClosed", False)}
for s in data.get("_embedded", {}).get("elements", [])]}
@mcp.tool()
async def get_types(ctx: Context) -> dict:
"""Get all available work package types."""
client = ctx.request_context.lifespan_context.client
response = await client.get("/types")
response.raise_for_status()
data = response.json()
return {"types": [{"id": t["id"], "name": t["name"], "color": t.get("color")}
for t in data.get("_embedded", {}).get("elements", [])]}
@mcp.tool()
async def get_users(ctx: Context, project_id: Optional[int] = None) -> dict:
"""Get users, optionally filtered by project membership."""
client = ctx.request_context.lifespan_context.client
params = {}
if project_id:
params["filters"] = f'[{{"member":{{"operator":"=","values":["{project_id}"]}}}}]'
response = await client.get("/users", params=params)
response.raise_for_status()
data = response.json()
return {"users": [{"id": u["id"], "name": u["name"], "email": u.get("email")}
for u in data.get("_embedded", {}).get("elements", [])]}
@mcp.tool()
async def add_comment(ctx: Context, work_package_id: int, comment: str) -> dict:
"""Add a comment to a work package."""
client = ctx.request_context.lifespan_context.client
response = await client.post(f"/work_packages/{work_package_id}/activities", json={"comment": {"raw": comment}})
response.raise_for_status()
result = response.json()
return {"id": result["id"], "commented": True}
if __name__ == "__main__":
mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)