Skip to main content
Glama

MCP JIRA Server

by Warzuponus
mcp_protocol.py6 kB
""" MCP (Model Context Protocol) implementation for Jira integration. Handles function definitions, resource management, and protocol handlers. """ from enum import Enum from typing import Dict, Any, List, Optional, Union, Callable from pydantic import BaseModel, Field import asyncio import logging from datetime import datetime, timezone from .types import ( Issue, Sprint, TeamMember, SprintStatus, IssueType, Priority, Risk ) from .jira_client import JiraClient logger = logging.getLogger(__name__) class MCPResourceType(str, Enum): """MCP Resource Types""" ISSUE = "issue" SPRINT = "sprint" TEAM = "team" METRICS = "metrics" REPORT = "report" class MCPFunction(BaseModel): """MCP Function Definition""" name: str description: str resource_type: MCPResourceType parameters: Dict[str, Any] returns: Dict[str, Any] handler: Optional[str] = None class MCPContext(BaseModel): """MCP Context Information""" conversation_id: str user_id: str timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) metadata: Dict[str, Any] = Field(default_factory=dict) class MCPRequest(BaseModel): """MCP Request Structure""" function: str parameters: Dict[str, Any] context: MCPContext resource_type: MCPResourceType class MCPResponse(BaseModel): """MCP Response Structure""" status: str data: Optional[Dict[str, Any]] = None error: Optional[str] = None context: MCPContext class MCPProtocolHandler: """ Main handler for MCP protocol implementation. Manages resources, functions, and request processing. """ def __init__(self, jira_client: JiraClient): self.jira = jira_client self.functions: Dict[str, MCPFunction] = {} self._register_core_functions() def _register_core_functions(self): """Register core MCP functions""" self.register_function( MCPFunction( name="create_issue", description="Create a new Jira issue", resource_type=MCPResourceType.ISSUE, parameters={ "summary": {"type": "string", "required": True}, "description": {"type": "string", "required": True}, "issue_type": {"type": "string", "enum": [t.value for t in IssueType]}, "priority": {"type": "string", "enum": [p.value for p in Priority]}, "story_points": {"type": "number", "required": False}, "assignee": {"type": "string", "required": False} }, returns={ "issue_key": {"type": "string"} }, handler="handle_create_issue" ) ) def register_function(self, function: MCPFunction): """Register a new MCP function""" self.functions[function.name] = function logger.info(f"Registered MCP function: {function.name}") async def process_request(self, request: MCPRequest) -> MCPResponse: """Process an MCP request""" try: if request.function not in self.functions: return MCPResponse( status="error", error=f"Unknown function: {request.function}", context=request.context ) function = self.functions[request.function] if function.resource_type != request.resource_type: return MCPResponse( status="error", error=f"Invalid resource type for function {request.function}", context=request.context ) handler = getattr(self, function.handler) if not handler: return MCPResponse( status="error", error=f"Handler not implemented: {function.handler}", context=request.context ) result = await handler(request.parameters, request.context) return MCPResponse( status="success", data=result, context=request.context ) except Exception as e: logger.exception(f"Error processing MCP request: {str(e)}") return MCPResponse( status="error", error=str(e), context=request.context ) # Handler implementations async def handle_create_issue( self, parameters: Dict[str, Any], context: MCPContext ) -> Dict[str, Any]: """Handle create_issue function""" issue_key = await self.jira.create_issue( summary=parameters["summary"], description=parameters["description"], issue_type=IssueType(parameters["issue_type"]), priority=Priority(parameters["priority"]), story_points=parameters.get("story_points"), assignee=parameters.get("assignee") ) return {"issue_key": issue_key} # Resource handlers async def get_resource( self, resource_type: MCPResourceType, resource_id: str ) -> Dict[str, Any]: """Get a resource by type and ID""" handlers = { MCPResourceType.ISSUE: self.jira.get_issue, MCPResourceType.SPRINT: self.jira.get_sprint, # Add more resource handlers... } handler = handlers.get(resource_type) if not handler: raise ValueError(f"Unknown resource type: {resource_type}") return await handler(resource_id) async def update_resource( self, resource_type: MCPResourceType, resource_id: str, updates: Dict[str, Any] ) -> Dict[str, Any]: """Update a resource""" # Implement resource update logic pass

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Warzuponus/mcp-jira'

If you have feedback or need assistance with the MCP directory API, please join our Discord server