MCP JIRA Server
by Warzuponus
"""
MCP (Model Context Protocol) implementation for Jira integration.
Handles function definitions, resource management, and protocol handlers.
"""
from enum import Enum
from typing import Dict, Any, List, Optional, Union, Callable
from pydantic import BaseModel, Field
import asyncio
import logging
from datetime import datetime
from .types import (
Issue, Sprint, TeamMember, SprintStatus,
IssueType, Priority, Risk
)
from .jira_client import JiraClient
from .scrum_master import ScrumMaster
from .auth import TokenAuth
logger = logging.getLogger(__name__)
class MCPResourceType(str, Enum):
"""MCP Resource Types"""
ISSUE = "issue"
SPRINT = "sprint"
TEAM = "team"
METRICS = "metrics"
REPORT = "report"
class MCPFunction(BaseModel):
"""MCP Function Definition"""
name: str
description: str
resource_type: MCPResourceType
parameters: Dict[str, Any]
returns: Dict[str, Any]
handler: Optional[str] = None
class MCPContext(BaseModel):
"""MCP Context Information"""
conversation_id: str
user_id: str
timestamp: datetime = Field(default_factory=datetime.utcnow)
metadata: Dict[str, Any] = Field(default_factory=dict)
api_key: str # Add API key to context
class MCPRequest(BaseModel):
"""MCP Request Structure"""
function: str
parameters: Dict[str, Any]
context: MCPContext
resource_type: MCPResourceType
class MCPResponse(BaseModel):
"""MCP Response Structure"""
status: str
data: Optional[Dict[str, Any]] = None
error: Optional[str] = None
context: MCPContext
class MCPProtocolHandler:
"""
Main handler for MCP protocol implementation.
Manages resources, functions, and request processing.
"""
def __init__(self, jira_client: JiraClient, scrum_master: ScrumMaster, auth_handler: TokenAuth):
self.jira = jira_client
self.scrum_master = scrum_master
self.auth_handler = auth_handler
self.functions: Dict[str, MCPFunction] = {}
self._register_core_functions()
def _register_core_functions(self):
"""Register core MCP functions"""
self.register_function(
MCPFunction(
name="create_issue",
description="Create a new Jira issue",
resource_type=MCPResourceType.ISSUE,
parameters={
"summary": {"type": "string", "required": True},
"description": {"type": "string", "required": True},
"issue_type": {"type": "string", "enum": [t.value for t in IssueType]},
"priority": {"type": "string", "enum": [p.value for p in Priority]},
"story_points": {"type": "number", "required": False},
"assignee": {"type": "string", "required": False}
},
returns={
"issue_key": {"type": "string"}
},
handler="handle_create_issue"
)
)
self.register_function(
MCPFunction(
name="plan_sprint",
description="Plan a sprint with automated workload balancing",
resource_type=MCPResourceType.SPRINT,
parameters={
"sprint_id": {"type": "integer", "required": True},
"target_velocity": {"type": "number", "required": True},
"team_members": {"type": "array", "items": {"type": "string"}}
},
returns={
"recommendations": {"type": "array", "items": {"type": "string"}}
},
handler="handle_plan_sprint"
)
)
# Add more core functions...
def register_function(self, function: MCPFunction):
"""Register a new MCP function"""
self.functions[function.name] = function
logger.info(f"Registered MCP function: {function.name}")
async def process_request(self, request: MCPRequest) -> MCPResponse:
"""Process an MCP request"""
try:
# Verify API key
try:
await self.auth_handler.verify_token(request.context.api_key)
except Exception as auth_error:
return MCPResponse(
status="error",
error="Authentication failed: Invalid API key",
context=request.context
)
# Rest of the validation and processing remains the same
if request.function not in self.functions:
return MCPResponse(
status="error",
error=f"Unknown function: {request.function}",
context=request.context
)
function = self.functions[request.function]
if function.resource_type != request.resource_type:
return MCPResponse(
status="error",
error=f"Invalid resource type for function {request.function}",
context=request.context
)
handler = getattr(self, function.handler)
if not handler:
return MCPResponse(
status="error",
error=f"Handler not implemented: {function.handler}",
context=request.context
)
result = await handler(request.parameters, request.context)
return MCPResponse(
status="success",
data=result,
context=request.context
)
except Exception as e:
logger.exception(f"Error processing MCP request: {str(e)}")
return MCPResponse(
status="error",
error=str(e),
context=request.context
)
# Handler implementations
async def handle_create_issue(
self,
parameters: Dict[str, Any],
context: MCPContext
) -> Dict[str, Any]:
"""Handle create_issue function"""
issue_key = await self.jira.create_issue(
summary=parameters["summary"],
description=parameters["description"],
issue_type=IssueType(parameters["issue_type"]),
priority=Priority(parameters["priority"]),
story_points=parameters.get("story_points"),
assignee=parameters.get("assignee")
)
return {"issue_key": issue_key}
async def handle_plan_sprint(
self,
parameters: Dict[str, Any],
context: MCPContext
) -> Dict[str, Any]:
"""Handle plan_sprint function"""
recommendations = await self.scrum_master.plan_sprint(
sprint_id=parameters["sprint_id"],
target_velocity=parameters["target_velocity"],
team_members=parameters["team_members"]
)
return {"recommendations": recommendations}
# Resource handlers
async def get_resource(
self,
resource_type: MCPResourceType,
resource_id: str
) -> Dict[str, Any]:
"""Get a resource by type and ID"""
handlers = {
MCPResourceType.ISSUE: self.jira.get_issue,
MCPResourceType.SPRINT: self.jira.get_sprint,
# Add more resource handlers...
}
handler = handlers.get(resource_type)
if not handler:
raise ValueError(f"Unknown resource type: {resource_type}")
return await handler(resource_id)
async def update_resource(
self,
resource_type: MCPResourceType,
resource_id: str,
updates: Dict[str, Any]
) -> Dict[str, Any]:
"""Update a resource"""
# Implement resource update logic
pass