MCP JIRA Server

""" MCP (Model Context Protocol) implementation for Jira integration. Handles function definitions, resource management, and protocol handlers. """ from enum import Enum from typing import Dict, Any, List, Optional, Union, Callable from pydantic import BaseModel, Field import asyncio import logging from datetime import datetime from .types import ( Issue, Sprint, TeamMember, SprintStatus, IssueType, Priority, Risk ) from .jira_client import JiraClient from .scrum_master import ScrumMaster from .auth import TokenAuth logger = logging.getLogger(__name__) class MCPResourceType(str, Enum): """MCP Resource Types""" ISSUE = "issue" SPRINT = "sprint" TEAM = "team" METRICS = "metrics" REPORT = "report" class MCPFunction(BaseModel): """MCP Function Definition""" name: str description: str resource_type: MCPResourceType parameters: Dict[str, Any] returns: Dict[str, Any] handler: Optional[str] = None class MCPContext(BaseModel): """MCP Context Information""" conversation_id: str user_id: str timestamp: datetime = Field(default_factory=datetime.utcnow) metadata: Dict[str, Any] = Field(default_factory=dict) api_key: str # Add API key to context class MCPRequest(BaseModel): """MCP Request Structure""" function: str parameters: Dict[str, Any] context: MCPContext resource_type: MCPResourceType class MCPResponse(BaseModel): """MCP Response Structure""" status: str data: Optional[Dict[str, Any]] = None error: Optional[str] = None context: MCPContext class MCPProtocolHandler: """ Main handler for MCP protocol implementation. Manages resources, functions, and request processing. """ def __init__(self, jira_client: JiraClient, scrum_master: ScrumMaster, auth_handler: TokenAuth): self.jira = jira_client self.scrum_master = scrum_master self.auth_handler = auth_handler self.functions: Dict[str, MCPFunction] = {} self._register_core_functions() def _register_core_functions(self): """Register core MCP functions""" self.register_function( MCPFunction( name="create_issue", description="Create a new Jira issue", resource_type=MCPResourceType.ISSUE, parameters={ "summary": {"type": "string", "required": True}, "description": {"type": "string", "required": True}, "issue_type": {"type": "string", "enum": [t.value for t in IssueType]}, "priority": {"type": "string", "enum": [p.value for p in Priority]}, "story_points": {"type": "number", "required": False}, "assignee": {"type": "string", "required": False} }, returns={ "issue_key": {"type": "string"} }, handler="handle_create_issue" ) ) self.register_function( MCPFunction( name="plan_sprint", description="Plan a sprint with automated workload balancing", resource_type=MCPResourceType.SPRINT, parameters={ "sprint_id": {"type": "integer", "required": True}, "target_velocity": {"type": "number", "required": True}, "team_members": {"type": "array", "items": {"type": "string"}} }, returns={ "recommendations": {"type": "array", "items": {"type": "string"}} }, handler="handle_plan_sprint" ) ) # Add more core functions... def register_function(self, function: MCPFunction): """Register a new MCP function""" self.functions[function.name] = function logger.info(f"Registered MCP function: {function.name}") async def process_request(self, request: MCPRequest) -> MCPResponse: """Process an MCP request""" try: # Verify API key try: await self.auth_handler.verify_token(request.context.api_key) except Exception as auth_error: return MCPResponse( status="error", error="Authentication failed: Invalid API key", context=request.context ) # Rest of the validation and processing remains the same if request.function not in self.functions: return MCPResponse( status="error", error=f"Unknown function: {request.function}", context=request.context ) function = self.functions[request.function] if function.resource_type != request.resource_type: return MCPResponse( status="error", error=f"Invalid resource type for function {request.function}", context=request.context ) handler = getattr(self, function.handler) if not handler: return MCPResponse( status="error", error=f"Handler not implemented: {function.handler}", context=request.context ) result = await handler(request.parameters, request.context) return MCPResponse( status="success", data=result, context=request.context ) except Exception as e: logger.exception(f"Error processing MCP request: {str(e)}") return MCPResponse( status="error", error=str(e), context=request.context ) # Handler implementations async def handle_create_issue( self, parameters: Dict[str, Any], context: MCPContext ) -> Dict[str, Any]: """Handle create_issue function""" issue_key = await self.jira.create_issue( summary=parameters["summary"], description=parameters["description"], issue_type=IssueType(parameters["issue_type"]), priority=Priority(parameters["priority"]), story_points=parameters.get("story_points"), assignee=parameters.get("assignee") ) return {"issue_key": issue_key} async def handle_plan_sprint( self, parameters: Dict[str, Any], context: MCPContext ) -> Dict[str, Any]: """Handle plan_sprint function""" recommendations = await self.scrum_master.plan_sprint( sprint_id=parameters["sprint_id"], target_velocity=parameters["target_velocity"], team_members=parameters["team_members"] ) return {"recommendations": recommendations} # Resource handlers async def get_resource( self, resource_type: MCPResourceType, resource_id: str ) -> Dict[str, Any]: """Get a resource by type and ID""" handlers = { MCPResourceType.ISSUE: self.jira.get_issue, MCPResourceType.SPRINT: self.jira.get_sprint, # Add more resource handlers... } handler = handlers.get(resource_type) if not handler: raise ValueError(f"Unknown resource type: {resource_type}") return await handler(resource_id) async def update_resource( self, resource_type: MCPResourceType, resource_id: str, updates: Dict[str, Any] ) -> Dict[str, Any]: """Update a resource""" # Implement resource update logic pass