Skip to main content
Glama

Jira-GitLab MCP Server

by gabbar910
jira_github_agent.py (12.9 kB)
#!/usr/bin/env python3
"""
Jira-GitHub Agent
Automated agent that monitors Jira issues and creates GitHub branches
"""

import asyncio
import json
import logging
import random
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import yaml
import websockets
from websockets.exceptions import ConnectionClosed, WebSocketException


class JiraGitHubAgent:
    """Agent that monitors Jira issues and creates GitHub branches automatically.

    The agent talks to an MCP server over a single WebSocket connection using
    JSON-RPC 2.0 ``tools/call`` requests. It polls Jira for issues matching the
    configured project, statuses and labels, asks the server to create a branch
    for every issue it has not handled yet, and then moves the issue to the
    configured status.

    All settings are read from a YAML file (see ``_load_config``); the keys
    used are ``agent``, ``mcp``, ``jira`` and ``github``.
    """

    # Number of differently named branches tried for one issue before giving
    # up when the server reports that the name already exists.
    MAX_BRANCH_ATTEMPTS = 3

    def __init__(self, config_path: str = "agents/agent_config.yaml"):
        """Load the configuration and set up logging.

        Args:
            config_path: Path of the YAML configuration file.

        Raises:
            FileNotFoundError: If the configuration file does not exist.
            yaml.YAMLError: If the file is not valid YAML.
            ValueError: If the file does not contain a YAML mapping.
        """
        self.config_path = config_path
        self.websocket = None
        self.running = False
        # Issues whose branch AND status update both succeeded.
        self.processed_issues = set()
        # Issues whose branch exists but whose status update has not succeeded
        # yet; used so a retry does not create a second branch.
        self._branched_issues = set()
        # Serializes request/response exchanges on the shared socket. Created
        # lazily inside the running event loop (see send_mcp_request).
        self._request_lock: Optional[asyncio.Lock] = None
        self._last_request_id = 0

        # Initialize basic logger first
        self.logger = logging.getLogger(__name__)

        # Load config
        self.config = self._load_config()

        # Setup logging with config; accept "info" as well as "INFO".
        logging.basicConfig(
            level=getattr(logging, str(self.config['agent']['log_level']).upper()),
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

    def _load_config(self) -> Dict:
        """Load configuration from YAML file.

        Returns:
            The parsed configuration mapping.

        Raises:
            FileNotFoundError: If ``self.config_path`` does not exist.
            yaml.YAMLError: If the file cannot be parsed.
            ValueError: If the parsed document is not a mapping (for example
                an empty file, which ``yaml.safe_load`` returns as ``None``).
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f)
        except FileNotFoundError:
            print(f"Configuration file not found: {self.config_path}")
            raise
        except yaml.YAMLError as e:
            print(f"Error parsing YAML configuration: {e}")
            raise

        if not isinstance(config, dict):
            # Fail here with a clear message instead of a TypeError on the
            # first config['...'] lookup.
            raise ValueError(
                f"Configuration file {self.config_path} must contain a YAML mapping"
            )

        print(f"Configuration loaded from {self.config_path}")
        return config

    async def connect_mcp(self) -> bool:
        """Connect to MCP server with retry logic.

        Uses ``mcp.host``, ``mcp.timeout``, ``mcp.retries`` and
        ``mcp.retry_delay`` from the configuration.

        Returns:
            True once a connection is established, False if every attempt
            failed.
        """
        mcp_config = self.config['mcp']
        retries = mcp_config['retries']

        for attempt in range(retries):
            try:
                self.logger.info(f"Connecting to MCP server at {mcp_config['host']} (attempt {attempt + 1})")
                self.websocket = await asyncio.wait_for(
                    websockets.connect(mcp_config['host']),
                    timeout=mcp_config['timeout']
                )
                self.logger.info("Successfully connected to MCP server")
                return True
            except Exception as e:
                # Any failure (refused, timeout, bad URI, ...) counts as one
                # failed attempt; the loop decides whether to retry.
                self.logger.warning(f"Connection attempt {attempt + 1} failed: {e}")
                if attempt < retries - 1:
                    await asyncio.sleep(mcp_config['retry_delay'])

        self.logger.error("Failed to connect to MCP server after all retries")
        return False

    async def send_mcp_request(self, method: str, params: Dict) -> Optional[Dict]:
        """Send request to MCP server and get response.

        Exchanges are serialized with a lock: ``monitor_loop`` processes
        several issues concurrently over one shared socket, and without the
        lock one task could read the response that belongs to another.

        Args:
            method: JSON-RPC method name.
            params: JSON-RPC ``params`` object.

        Returns:
            The decoded response message, or None when there is no connection,
            the socket fails, or the response is not valid JSON.
        """
        if self.websocket is None:
            self.logger.error("No MCP connection available")
            return None

        if self._request_lock is None:
            # Created on first use so the lock belongs to the running loop.
            self._request_lock = asyncio.Lock()

        async with self._request_lock:
            websocket = self.websocket
            if websocket is None:
                # The connection was dropped while this task waited its turn.
                self.logger.error("No MCP connection available")
                return None

            # Millisecond timestamp as before, bumped when two requests fall
            # into the same millisecond so ids stay unique.
            request_id = max(int(time.time() * 1000), self._last_request_id + 1)
            self._last_request_id = request_id

            request = {
                "jsonrpc": "2.0",
                "id": request_id,
                "method": method,
                "params": params
            }

            try:
                await websocket.send(json.dumps(request))
                while True:
                    response = json.loads(await websocket.recv())
                    if isinstance(response, dict):
                        if "method" in response and "id" not in response:
                            # JSON-RPC notification from the server, not the
                            # answer to this request: keep waiting.
                            self.logger.debug(f"Ignoring server notification: {response.get('method')}")
                            continue
                        if response.get("id") not in (None, request_id):
                            self.logger.warning(
                                f"Response id {response.get('id')} does not match request id {request_id}"
                            )
                    return response
            except ConnectionClosed as e:
                self.logger.error(f"WebSocket error: {e}")
                # Drop the dead socket so monitor_loop can reconnect.
                if self.websocket is websocket:
                    self.websocket = None
                return None
            except WebSocketException as e:
                self.logger.error(f"WebSocket error: {e}")
                return None
            except json.JSONDecodeError as e:
                self.logger.error(f"JSON decode error: {e}")
                return None

    async def get_monitored_issues(self) -> List[Dict]:
        """Get Jira issues that match monitoring criteria.

        Builds a JQL query from ``jira.project_key``, ``jira.status_filters``
        and (when non-empty) ``jira.monitored_tags`` and sends it through the
        ``get_jira_issues`` tool.

        Returns:
            The ``issues`` list from the tool's JSON payload, or an empty list
            when the request fails or the payload cannot be parsed.
        """
        jira_config = self.config['jira']

        # Build JQL query
        status_list = ', '.join(f"'{s}'" for s in jira_config['status_filters'])
        jql_parts = [
            f"project = {jira_config['project_key']}",
            f"status IN ({status_list})"
        ]

        if jira_config['monitored_tags']:
            labels_query = ' OR '.join(f"labels = '{tag}'" for tag in jira_config['monitored_tags'])
            jql_parts.append(f"({labels_query})")

        jql = " AND ".join(jql_parts)
        self.logger.debug(f"JQL Query: {jql}")

        response = await self.send_mcp_request("tools/call", {
            "name": "get_jira_issues",
            "arguments": {
                "jql": jql,
                "max_results": 50
            }
        })

        if response and "result" in response:
            try:
                # The tool result is expected as JSON text in the first
                # content item of the response.
                result_data = json.loads(response["result"]["content"][0]["text"])
                return result_data.get("issues", [])
            except (KeyError, IndexError, TypeError, AttributeError, json.JSONDecodeError) as e:
                self.logger.error(f"Error parsing issues response: {e}")
                return []

        return []

    def generate_branch_name(self, issue_key: str, attempt: int = 0) -> str:
        """Generate unique branch name with suffix.

        Args:
            issue_key: Jira issue key embedded in the branch name.
            attempt: Zero-based retry counter; any non-zero value is appended
                to the suffix to avoid repeating a conflicting name.

        Returns:
            ``<branch_prefix><issue_key>-<suffix>`` where the suffix is picked
            at random from ``github.branch_suffixes``.
        """
        github_config = self.config['github']
        prefix = github_config['branch_prefix']

        suffix = random.choice(github_config['branch_suffixes'])
        if attempt != 0:
            # Subsequent attempts: add number
            suffix = f"{suffix}-{attempt}"

        return f"{prefix}{issue_key}-{suffix}"

    async def create_github_branch(self, issue: Dict) -> bool:
        """Create GitHub branch for Jira issue with retry logic.

        Calls the ``create_branch_for_issue`` tool, retrying with a new name
        (up to ``MAX_BRANCH_ATTEMPTS`` names in total) when the server reports
        that the branch already exists. In dry-run mode nothing is sent.

        Args:
            issue: Issue dict; only ``issue['key']`` is read.

        Returns:
            True if the branch was created (or would be, in dry-run mode),
            False otherwise.
        """
        issue_key = issue['key']
        github_config = self.config['github']
        last_attempt = self.MAX_BRANCH_ATTEMPTS - 1

        # Try creating branch with different names if conflicts occur
        for attempt in range(self.MAX_BRANCH_ATTEMPTS):
            branch_name = self.generate_branch_name(issue_key, attempt)

            if self.config['agent']['dry_run']:
                self.logger.info(f"DRY RUN: Would create branch '{branch_name}' for issue {issue_key}")
                return True

            self.logger.info(f"Creating GitHub branch '{branch_name}' for issue {issue_key}")

            response = await self.send_mcp_request("tools/call", {
                "name": "create_branch_for_issue",
                "arguments": {
                    "issue_key": issue_key,
                    "project_id": f"{github_config['repo_owner']}/{github_config['repo_name']}",
                    "base_branch": github_config['base_branch'],
                    "branch_name": branch_name
                }
            })

            if response and "result" in response:
                self.logger.info(f"Successfully created branch '{branch_name}' for issue {issue_key}")
                return True

            if response and "error" in response:
                error_msg = response["error"].get("message", "Unknown error")
                if "already exists" in error_msg.lower() and attempt < last_attempt:
                    self.logger.warning(f"Branch name conflict, trying alternative: {error_msg}")
                    continue
                self.logger.error(f"Failed to create branch for {issue_key}: {error_msg}")
                return False

            self.logger.error(f"Unexpected response for branch creation: {response}")
            return False

        return False

    async def update_jira_status(self, issue_key: str) -> bool:
        """Update Jira issue status after branch creation.

        Moves the issue to ``agent.jira_status_on_branch_create`` through the
        ``update_issue_status`` tool and attaches a timestamped comment. In
        dry-run mode nothing is sent.

        Args:
            issue_key: Key of the Jira issue to update.

        Returns:
            True on success (or in dry-run mode), False otherwise.
        """
        target_status = self.config['agent']['jira_status_on_branch_create']

        if self.config['agent']['dry_run']:
            self.logger.info(f"DRY RUN: Would update {issue_key} status to '{target_status}'")
            return True

        self.logger.info(f"Updating {issue_key} status to '{target_status}'")

        response = await self.send_mcp_request("tools/call", {
            "name": "update_issue_status",
            "arguments": {
                "issue_key": issue_key,
                "status": target_status,
                "comment": f"GitHub branch created automatically by Jira-GitHub Agent at {datetime.now().isoformat()}"
            }
        })

        if response and "result" in response:
            self.logger.info(f"Successfully updated {issue_key} status")
            return True

        self.logger.error(f"Failed to update {issue_key} status: {response}")
        return False

    async def process_issue(self, issue: Dict) -> bool:
        """Process a single Jira issue.

        Creates the branch, then updates the Jira status. If the branch was
        already created on an earlier pass but the status update failed, only
        the status update is retried, so no second branch is created.

        Args:
            issue: Issue dict; only ``issue['key']`` is read.

        Returns:
            True when both steps have succeeded, False otherwise.
        """
        issue_key = issue['key']

        try:
            if issue_key in self._branched_issues:
                self.logger.info(f"Branch already created for {issue_key}, retrying status update")
            else:
                # Create GitHub branch
                if not await self.create_github_branch(issue):
                    self.logger.error(f"Failed to create branch for {issue_key}")
                    return False
                self._branched_issues.add(issue_key)

            # Update Jira status
            if not await self.update_jira_status(issue_key):
                self.logger.warning(f"Branch created but failed to update status for {issue_key}")
                return False

            self._branched_issues.discard(issue_key)
            self.processed_issues.add(issue_key)
            self.logger.info(f"Successfully processed issue {issue_key}")
            return True

        except Exception as e:
            # Boundary for one issue: a failure here must not abort the
            # other issues processed in the same batch.
            self.logger.error(f"Error processing issue {issue_key}: {e}")
            return False

    async def monitor_loop(self):
        """Main monitoring loop.

        While ``self.running`` is set: reconnect if the connection was lost,
        fetch matching issues, process the ones not yet handled (at most
        ``agent.max_concurrent_issues`` in flight), then sleep for
        ``jira.check_interval`` seconds. On an unexpected error the loop stops
        unless ``agent.continue_on_error`` is set, in which case it retries
        after 30 seconds.
        """
        check_interval = self.config['jira']['check_interval']
        max_concurrent = self.config['agent']['max_concurrent_issues']

        self.logger.info(f"Starting monitoring loop (check every {check_interval}s)")

        while self.running:
            try:
                if self.websocket is None:
                    # send_mcp_request dropped a closed connection.
                    self.logger.warning("MCP connection lost, attempting to reconnect")
                    if not await self.connect_mcp():
                        await asyncio.sleep(check_interval)
                        continue

                # Get issues to process
                issues = await self.get_monitored_issues()

                if not issues:
                    self.logger.debug("No issues found matching criteria")
                else:
                    self.logger.info(f"Found {len(issues)} issues to evaluate")

                    # Filter out already processed issues
                    new_issues = [
                        issue for issue in issues
                        if issue['key'] not in self.processed_issues
                    ]

                    if new_issues:
                        self.logger.info(f"Processing {len(new_issues)} new issues")

                        # Process issues with concurrency limit
                        semaphore = asyncio.Semaphore(max_concurrent)

                        async def process_with_semaphore(issue):
                            async with semaphore:
                                return await self.process_issue(issue)

                        tasks = [process_with_semaphore(issue) for issue in new_issues]
                        results = await asyncio.gather(*tasks, return_exceptions=True)

                        # Exceptions returned by gather are not `True`, so
                        # they count as failures.
                        successful = sum(1 for r in results if r is True)
                        self.logger.info(f"Processed {successful}/{len(new_issues)} issues successfully")
                    else:
                        self.logger.debug("No new issues to process")

                # Wait before next check
                await asyncio.sleep(check_interval)

            except Exception as e:
                self.logger.error(f"Error in monitoring loop: {e}")
                if not self.config['agent']['continue_on_error']:
                    break
                await asyncio.sleep(30)  # Wait before retrying

    async def start(self):
        """Start the agent.

        Connects to the MCP server and runs the monitoring loop until it ends
        or is interrupted; the connection is always closed on the way out.
        """
        self.logger.info("Starting Jira-GitHub Agent")

        # Connect to MCP server
        if not await self.connect_mcp():
            self.logger.error("Failed to connect to MCP server, exiting")
            return

        self.running = True

        try:
            await self.monitor_loop()
        except KeyboardInterrupt:
            self.logger.info("Received interrupt signal")
        finally:
            await self.stop()

    async def stop(self):
        """Stop the agent and close the MCP connection if one is open."""
        self.logger.info("Stopping Jira-GitHub Agent")
        self.running = False

        if self.websocket:
            await self.websocket.close()
            self.logger.info("MCP connection closed")


async def main():
    """Main entry point"""
    agent = JiraGitHubAgent()
    await agent.start()


if __name__ == "__main__":
    asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gabbar910/MCPJiraGitlab'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.