websocket_server.py•37.6 kB
#!/usr/bin/env python3
"""
WebSocket MCP Server for Jira-GitLab Integration
Provides WebSocket interface for MCP tools and resources
"""
import asyncio
import json
import logging
import uuid
from typing import Any, Dict, List, Optional, Set
import websockets
from websockets.server import serve, WebSocketServerProtocol
from websockets.exceptions import ConnectionClosed, WebSocketException
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.types import (
Resource,
Tool,
TextContent,
LoggingLevel
)
from connectors.jira_client import JiraClient
from connectors.github_client import GitHubClient
from connectors.openai_client import OpenAIClient, OpenAIError
from utils.error_handler import MCPError, handle_errors
from utils.config import load_config
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class WebSocketMCPServer:
"""WebSocket-based MCP Server"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.server = Server("jira-gitlab-mcp-ws")
self.connected_clients: Set[WebSocketServerProtocol] = set()
# Get project key from config
self.project_key = config.get("jira", {}).get("project_key", "DEMO")
# Initialize clients
self.jira_client: Optional[JiraClient] = None
self.github_client: Optional[GitHubClient] = None
self.openai_client: Optional[OpenAIClient] = None
# Setup MCP server handlers
self._setup_handlers()
def _setup_handlers(self):
"""Setup MCP server handlers"""
@self.server.list_resources()
async def handle_list_resources() -> List[Resource]:
"""List available MCP resources"""
return [
Resource(
uri="jira://issues",
name="Jira Issues",
description="Access to Jira issues in the configured project",
mimeType="application/json"
),
Resource(
uri="github://projects",
name="GitHub Projects",
description="Access to GitHub repositories and branches",
mimeType="application/json"
)
]
@self.server.read_resource()
async def handle_read_resource(uri: str) -> str:
"""Read MCP resource content"""
try:
if uri == "jira://issues":
if not self.jira_client:
raise MCPError("Jira client not initialized")
issues = await self.jira_client.get_issues()
return json.dumps({
"issues": [
{
"key": issue["key"],
"summary": issue["fields"]["summary"],
"status": issue["fields"]["status"]["name"],
"assignee": issue["fields"]["assignee"]["displayName"] if issue["fields"]["assignee"] else None,
"created": issue["fields"]["created"],
"updated": issue["fields"]["updated"]
}
for issue in issues
]
}, indent=2)
elif uri == "github://projects":
if not self.github_client:
raise MCPError("GitHub client not initialized")
projects = await self.github_client.get_projects()
return json.dumps({
"projects": [
{
"id": project["id"],
"name": project["name"],
"web_url": project["web_url"],
"default_branch": project["default_branch"]
}
for project in projects
]
}, indent=2)
else:
raise MCPError(f"Unknown resource URI: {uri}")
except Exception as e:
logger.error(f"Error reading resource {uri}: {e}")
raise MCPError(f"Failed to read resource: {str(e)}")
@self.server.list_tools()
async def handle_list_tools() -> List[Tool]:
"""List available MCP tools"""
return [
Tool(
name="create_branch_for_issue",
description="Create a GitLab branch for a Jira issue and link them",
inputSchema={
"type": "object",
"properties": {
"issue_key": {
"type": "string",
"description": "Jira issue key (e.g., PROJ-123)"
},
"project_id": {
"type": "integer",
"description": "GitLab project ID"
},
"base_branch": {
"type": "string",
"description": "Base branch to create from (default: main)",
"default": "main"
}
},
"required": ["issue_key", "project_id"]
}
),
Tool(
name="get_jira_issues",
description="Fetch Jira issues using JQL query",
inputSchema={
"type": "object",
"properties": {
"jql": {
"type": "string",
"description": "JQL query string (optional)",
"default": f"project = {self.project_key} AND status = 'To Do'"
},
"max_results": {
"type": "integer",
"description": "Maximum number of results to return",
"default": 50
}
},
"required": []
}
),
Tool(
name="comment_on_issue",
description="Add a comment to a Jira issue",
inputSchema={
"type": "object",
"properties": {
"issue_key": {
"type": "string",
"description": "Jira issue key"
},
"comment": {
"type": "string",
"description": "Comment text to add"
}
},
"required": ["issue_key", "comment"]
}
),
Tool(
name="get_issues_by_tags",
description="Fetch Jira issues by project and tags (labels)",
inputSchema={
"type": "object",
"properties": {
"project_key": {
"type": "string",
"description": "Jira project key (e.g., PROJ, DEMO)"
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "List of tags/labels to filter by (e.g., ['AI-Fix', 'AutoFix'])",
"minItems": 1,
"maxItems": 10
},
"max_results": {
"type": "integer",
"description": "Maximum number of results to return",
"default": 50,
"minimum": 1,
"maximum": 100
}
},
"required": ["project_key", "tags"]
}
),
Tool(
name="analyze_and_fix_issue",
description="Use AI to analyze a Jira issue and generate code fixes",
inputSchema={
"type": "object",
"properties": {
"issue_key": {
"type": "string",
"description": "Jira issue key to analyze and fix"
},
"model": {
"type": "string",
"description": "AI model to use for analysis",
"default": "gpt-4-turbo",
"enum": ["gpt-4-turbo", "gpt-4", "claude-3-sonnet", "claude-3-haiku"]
},
"validation_level": {
"type": "string",
"description": "Code validation strictness",
"default": "basic",
"enum": ["basic", "strict"]
},
"include_context": {
"type": "boolean",
"description": "Include repository context in analysis",
"default": True
}
},
"required": ["issue_key"]
}
)
]
@self.server.call_tool()
@handle_errors
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle MCP tool calls"""
if name == "create_branch_for_issue":
issue_key = arguments["issue_key"]
project_id = arguments["project_id"]
base_branch = arguments.get("base_branch", "main")
if not self.jira_client or not self.github_client:
raise MCPError("Clients not initialized")
# Create branch with correct naming convention: feature/{issueid}-fix
branch_name = f"feature/{issue_key}-fix"
try:
# Create the branch
branch_url = await self.github_client.create_branch(project_id, branch_name, base_branch)
# Add comment to Jira issue
comment = f"GitLab branch created: {branch_url}"
await self.jira_client.comment_issue(issue_key, comment)
return [TextContent(
type="text",
text=f"Successfully created branch '{branch_name}' and linked to issue {issue_key}.\nBranch URL: {branch_url}"
)]
except Exception as e:
logger.error(f"Error creating branch for issue {issue_key}: {e}")
raise MCPError(f"Failed to create branch: {str(e)}")
elif name == "get_jira_issues":
jql = arguments.get("jql", f"project = {self.project_key} AND status = 'To Do'")
max_results = arguments.get("max_results", 50)
if not self.jira_client:
raise MCPError("Jira client not initialized")
try:
issues = await self.jira_client.get_issues(jql, max_results)
result = {
"total": len(issues),
"issues": [
{
"key": issue["key"],
"summary": issue["fields"]["summary"],
"status": issue["fields"]["status"]["name"],
"assignee": issue["fields"]["assignee"]["displayName"] if issue["fields"]["assignee"] else None,
"priority": issue["fields"]["priority"]["name"] if issue["fields"]["priority"] else None,
"created": issue["fields"]["created"],
"updated": issue["fields"]["updated"]
}
for issue in issues
]
}
return [TextContent(
type="text",
text=json.dumps(result, indent=2)
)]
except Exception as e:
logger.error(f"Error fetching Jira issues: {e}")
raise MCPError(f"Failed to fetch issues: {str(e)}")
elif name == "comment_on_issue":
issue_key = arguments["issue_key"]
comment = arguments["comment"]
if not self.jira_client:
raise MCPError("Jira client not initialized")
try:
await self.jira_client.comment_issue(issue_key, comment)
return [TextContent(
type="text",
text=f"Successfully added comment to issue {issue_key}"
)]
except Exception as e:
logger.error(f"Error commenting on issue {issue_key}: {e}")
raise MCPError(f"Failed to add comment: {str(e)}")
elif name == "get_issues_by_tags":
project_key = arguments["project_key"]
tags = arguments["tags"]
max_results = arguments.get("max_results", 50)
if not self.jira_client:
raise MCPError("Jira client not initialized")
try:
issues = await self.jira_client.get_issues_by_tags(project_key, tags, max_results)
result = {
"project": project_key,
"tags": tags,
"total": len(issues),
"issues": [
{
"key": issue["key"],
"summary": issue["fields"]["summary"],
"status": issue["fields"]["status"]["name"],
"assignee": issue["fields"]["assignee"]["displayName"] if issue["fields"]["assignee"] else None,
"priority": issue["fields"]["priority"]["name"] if issue["fields"]["priority"] else None,
"created": issue["fields"]["created"],
"updated": issue["fields"]["updated"]
}
for issue in issues
]
}
return [TextContent(
type="text",
text=json.dumps(result, indent=2)
)]
except Exception as e:
logger.error(f"Error fetching issues by tags: {e}")
raise MCPError(f"Failed to fetch issues by tags: {str(e)}")
elif name == "analyze_and_fix_issue":
issue_key = arguments["issue_key"]
model = arguments.get("model", "gpt-4-turbo")
validation_level = arguments.get("validation_level", "basic")
include_context = arguments.get("include_context", True)
if not self.jira_client:
raise MCPError("Jira client not initialized")
try:
# Get issue details
issue = await self.jira_client.get_issue(issue_key)
if not issue:
raise MCPError(f"Issue {issue_key} not found")
# Use real OpenAI analysis if configured
if self.openai_client:
try:
analysis_result = await self.openai_client.analyze_issue(
issue, include_context, validation_level
)
# Format response
response = {
"issue_key": analysis_result.issue_key,
"summary": issue["fields"]["summary"],
"description": issue["fields"]["description"],
"analysis": analysis_result.analysis,
"suggested_fixes": analysis_result.suggested_fixes,
"confidence_score": analysis_result.confidence_score,
"model_used": analysis_result.model_used,
"tokens_used": analysis_result.tokens_used,
"cost_estimate": analysis_result.cost_estimate,
"validation_level": validation_level
}
return [TextContent(
type="text",
text=json.dumps(response, indent=2)
)]
except OpenAIError as e:
logger.warning(f"OpenAI analysis failed: {e}, falling back to simulation")
# Fall through to simulation
except Exception as e:
logger.warning(f"AI analysis error: {e}, falling back to simulation")
# Fall through to simulation
# Fallback: simulated analysis
analysis = {
"issue_key": issue_key,
"summary": issue["fields"]["summary"],
"description": issue["fields"]["description"],
"analysis": f"Simulated AI analysis (OpenAI not configured or failed)",
"suggested_fixes": [
{
"path": "src/main.py",
"content": f"# AI-generated fix for {issue_key}\n# {issue['fields']['summary']}\nprint('Fixed issue')",
"action": "update",
"language": "python",
"description": "Example fix implementation"
}
],
"confidence_score": 0.5,
"validation_level": validation_level,
"model_used": "simulation",
"tokens_used": 0,
"cost_estimate": 0.0
}
return [TextContent(
type="text",
text=json.dumps(analysis, indent=2)
)]
except Exception as e:
logger.error(f"Error analyzing issue {issue_key}: {e}")
raise MCPError(f"Failed to analyze issue: {str(e)}")
else:
raise MCPError(f"Unknown tool: {name}")
async def initialize_clients(self):
"""Initialize API clients"""
try:
# Initialize clients
self.jira_client = JiraClient(self.config["jira"])
self.github_client = GitHubClient(self.config["github"])
# Initialize OpenAI client if configured
if "ai" in self.config and self.config["ai"].get("provider") == "openai":
try:
self.openai_client = OpenAIClient(self.config["ai"])
await self.openai_client.test_connection()
logger.info("OpenAI client initialized successfully")
except Exception as e:
logger.warning(f"Failed to initialize OpenAI client: {e}")
logger.info("Continuing without AI capabilities")
self.openai_client = None
else:
logger.info("OpenAI not configured, AI analysis will use simulation")
self.openai_client = None
# Test connections
await self.jira_client.test_connection()
await self.github_client.test_connection()
logger.info("All clients initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize clients: {e}")
raise
async def handle_websocket_message(self, websocket: WebSocketServerProtocol, message: str):
"""Handle incoming WebSocket message"""
request_id = None
try:
# Parse message
data = json.loads(message)
request_id = data.get("request_id", str(uuid.uuid4()))
message_type = data.get("type")
response = {
"request_id": request_id,
"type": "response"
}
if message_type == "list_tools":
# Call the list_tools handler directly
tools = await self._get_tools()
response["data"] = [tool.model_dump() for tool in tools]
elif message_type == "list_resources":
# Call the list_resources handler directly
resources = await self._get_resources()
response["data"] = resources
elif message_type == "read_resource":
uri = data.get("uri")
if not uri:
raise MCPError("Missing resource URI")
content = await self._read_resource(uri)
response["data"] = content
elif message_type == "call_tool":
tool_name = data.get("name")
arguments = data.get("arguments", {})
if not tool_name:
raise MCPError("Missing tool name")
result = await self._call_tool(tool_name, arguments)
response["data"] = [content.model_dump() for content in result]
else:
raise MCPError(f"Unknown message type: {message_type}")
response["success"] = True
except Exception as e:
logger.error(f"Error handling WebSocket message: {e}")
response = {
"request_id": request_id or str(uuid.uuid4()),
"type": "error",
"success": False,
"error": str(e)
}
# Send response
await websocket.send(json.dumps(response))
async def _get_tools(self) -> List[Tool]:
"""Get list of available tools"""
return [
Tool(
name="create_branch_for_issue",
description="Create a GitLab branch for a Jira issue and link them",
inputSchema={
"type": "object",
"properties": {
"issue_key": {
"type": "string",
"description": "Jira issue key (e.g., PROJ-123)"
},
"project_id": {
"type": "integer",
"description": "GitLab project ID"
},
"base_branch": {
"type": "string",
"description": "Base branch to create from (default: main)",
"default": "main"
}
},
"required": ["issue_key", "project_id"]
}
),
Tool(
name="get_jira_issues",
description="Fetch Jira issues using JQL query",
inputSchema={
"type": "object",
"properties": {
"jql": {
"type": "string",
"description": "JQL query string (optional)",
"default": f"project = {self.project_key} AND status = 'To Do'"
},
"max_results": {
"type": "integer",
"description": "Maximum number of results to return",
"default": 50
}
},
"required": []
}
),
Tool(
name="comment_on_issue",
description="Add a comment to a Jira issue",
inputSchema={
"type": "object",
"properties": {
"issue_key": {
"type": "string",
"description": "Jira issue key"
},
"comment": {
"type": "string",
"description": "Comment text to add"
}
},
"required": ["issue_key", "comment"]
}
),
Tool(
name="get_issues_by_tags",
description="Fetch Jira issues by project and tags (labels)",
inputSchema={
"type": "object",
"properties": {
"project_key": {
"type": "string",
"description": "Jira project key (e.g., PROJ, DEMO)"
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "List of tags/labels to filter by (e.g., ['AI-Fix', 'AutoFix'])",
"minItems": 1,
"maxItems": 10
},
"max_results": {
"type": "integer",
"description": "Maximum number of results to return",
"default": 50,
"minimum": 1,
"maximum": 100
}
},
"required": ["project_key", "tags"]
}
),
Tool(
name="analyze_and_fix_issue",
description="Use AI to analyze a Jira issue and generate code fixes",
inputSchema={
"type": "object",
"properties": {
"issue_key": {
"type": "string",
"description": "Jira issue key to analyze and fix"
},
"model": {
"type": "string",
"description": "AI model to use for analysis",
"default": "gpt-4-turbo",
"enum": ["gpt-4-turbo", "gpt-4", "claude-3-sonnet", "claude-3-haiku"]
},
"validation_level": {
"type": "string",
"description": "Code validation strictness",
"default": "basic",
"enum": ["basic", "strict"]
},
"include_context": {
"type": "boolean",
"description": "Include repository context in analysis",
"default": True
}
},
"required": ["issue_key"]
}
)
]
async def _get_resources(self) -> List[Dict[str, Any]]:
"""Get list of available resources"""
return [
{
"uri": "jira://issues",
"name": "Jira Issues",
"description": "Access to Jira issues in the configured project",
"mimeType": "application/json"
},
{
"uri": "github://projects",
"name": "GitHub Projects",
"description": "Access to GitHub repositories and branches",
"mimeType": "application/json"
}
]
async def _read_resource(self, uri: str) -> str:
"""Read resource content"""
try:
if uri == "jira://issues":
if not self.jira_client:
raise MCPError("Jira client not initialized")
issues = await self.jira_client.get_issues()
return json.dumps({
"issues": [
{
"key": issue["key"],
"summary": issue["fields"]["summary"],
"status": issue["fields"]["status"]["name"],
"assignee": issue["fields"]["assignee"]["displayName"] if issue["fields"]["assignee"] else None,
"created": issue["fields"]["created"],
"updated": issue["fields"]["updated"]
}
for issue in issues
]
}, indent=2)
elif uri == "github://projects":
if not self.github_client:
raise MCPError("GitHub client not initialized")
projects = await self.github_client.get_projects()
return json.dumps({
"projects": [
{
"id": project["id"],
"name": project["name"],
"web_url": project["web_url"],
"default_branch": project["default_branch"]
}
for project in projects
]
}, indent=2)
else:
raise MCPError(f"Unknown resource URI: {uri}")
except Exception as e:
logger.error(f"Error reading resource {uri}: {e}")
raise MCPError(f"Failed to read resource: {str(e)}")
async def _call_tool(self, name: str, arguments: Dict[str, Any]) -> List[TextContent]:
"""Call tool with arguments"""
if name == "get_jira_issues":
jql = arguments.get("jql", f"project = {self.project_key} AND status = 'To Do'")
max_results = arguments.get("max_results", 50)
if not self.jira_client:
raise MCPError("Jira client not initialized")
try:
issues = await self.jira_client.get_issues(jql, max_results)
result = {
"total": len(issues),
"issues": [
{
"key": issue["key"],
"summary": issue["fields"]["summary"],
"status": issue["fields"]["status"]["name"],
"assignee": issue["fields"]["assignee"]["displayName"] if issue["fields"]["assignee"] else None,
"priority": issue["fields"]["priority"]["name"] if issue["fields"]["priority"] else None,
"created": issue["fields"]["created"],
"updated": issue["fields"]["updated"]
}
for issue in issues
]
}
return [TextContent(
type="text",
text=json.dumps(result, indent=2)
)]
except Exception as e:
logger.error(f"Error fetching Jira issues: {e}")
raise MCPError(f"Failed to fetch issues: {str(e)}")
elif name == "comment_on_issue":
issue_key = arguments["issue_key"]
comment = arguments["comment"]
if not self.jira_client:
raise MCPError("Jira client not initialized")
try:
await self.jira_client.comment_issue(issue_key, comment)
return [TextContent(
type="text",
text=f"Successfully added comment to issue {issue_key}"
)]
except Exception as e:
logger.error(f"Error commenting on issue {issue_key}: {e}")
raise MCPError(f"Failed to add comment: {str(e)}")
else:
# For now, return a simulated response for other tools
return [TextContent(
type="text",
text=f"Tool '{name}' executed successfully (simulated response)"
)]
async def handle_client_connection(self, websocket: WebSocketServerProtocol, path: str):
"""Handle new WebSocket client connection"""
client_id = f"{websocket.remote_address[0]}:{websocket.remote_address[1]}"
logger.info(f"New WebSocket connection from {client_id}")
# Add to connected clients
self.connected_clients.add(websocket)
try:
# Send welcome message
welcome = {
"type": "welcome",
"server": "jira-gitlab-mcp-ws",
"version": "1.0.0",
"capabilities": {
"tools": True,
"resources": True
}
}
await websocket.send(json.dumps(welcome))
# Handle messages
async for message in websocket:
await self.handle_websocket_message(websocket, message)
except ConnectionClosed:
logger.info(f"WebSocket connection closed for {client_id}")
except WebSocketException as e:
logger.error(f"WebSocket error for {client_id}: {e}")
except Exception as e:
logger.error(f"Unexpected error for {client_id}: {e}")
finally:
# Remove from connected clients
self.connected_clients.discard(websocket)
logger.info(f"Cleaned up connection for {client_id}")
async def start_server(self):
"""Start the WebSocket server"""
ws_config = self.config["server"]["websocket"]
host = ws_config["host"]
port = ws_config["port"]
logger.info(f"Starting WebSocket MCP server on {host}:{port}")
# Initialize clients first
await self.initialize_clients()
# Start WebSocket server
async with serve(
self.handle_client_connection,
host,
port,
ping_interval=ws_config.get("ping_interval", 20),
ping_timeout=ws_config.get("ping_timeout", 10),
max_size=ws_config.get("max_message_size", 1024 * 1024), # 1MB default
) as server:
logger.info(f"WebSocket MCP server running on ws://{host}:{port}")
# Keep server running
await asyncio.Future() # Run forever
async def main():
"""Main entry point for WebSocket server"""
try:
# Load configuration
config = load_config()
# Create and start WebSocket server
ws_server = WebSocketMCPServer(config)
await ws_server.start_server()
except KeyboardInterrupt:
logger.info("Server shutdown requested")
except Exception as e:
logger.error(f"Failed to start WebSocket server: {e}")
raise
if __name__ == "__main__":
asyncio.run(main())