jira_github_agent.py•12.9 kB
#!/usr/bin/env python3
"""
Jira-GitHub Agent
Automated agent that monitors Jira issues and creates GitHub branches
"""
import asyncio
import json
import logging
import random
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
import yaml
import websockets
from websockets.exceptions import ConnectionClosed, WebSocketException
class JiraGitHubAgent:
"""Agent that monitors Jira issues and creates GitHub branches automatically"""
def __init__(self, config_path: str = "agents/agent_config.yaml"):
self.config_path = config_path
self.websocket = None
self.running = False
self.processed_issues = set()
# Initialize basic logger first
self.logger = logging.getLogger(__name__)
# Load config
self.config = self._load_config()
# Setup logging with config
logging.basicConfig(
level=getattr(logging, self.config['agent']['log_level']),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
def _load_config(self) -> Dict:
"""Load configuration from YAML file"""
try:
with open(self.config_path, 'r') as f:
config = yaml.safe_load(f)
print(f"Configuration loaded from {self.config_path}")
return config
except FileNotFoundError:
print(f"Configuration file not found: {self.config_path}")
raise
except yaml.YAMLError as e:
print(f"Error parsing YAML configuration: {e}")
raise
async def connect_mcp(self) -> bool:
"""Connect to MCP server with retry logic"""
mcp_config = self.config['mcp']
for attempt in range(mcp_config['retries']):
try:
self.logger.info(f"Connecting to MCP server at {mcp_config['host']} (attempt {attempt + 1})")
self.websocket = await asyncio.wait_for(
websockets.connect(mcp_config['host']),
timeout=mcp_config['timeout']
)
self.logger.info("Successfully connected to MCP server")
return True
except Exception as e:
self.logger.warning(f"Connection attempt {attempt + 1} failed: {e}")
if attempt < mcp_config['retries'] - 1:
await asyncio.sleep(mcp_config['retry_delay'])
self.logger.error("Failed to connect to MCP server after all retries")
return False
async def send_mcp_request(self, method: str, params: Dict) -> Optional[Dict]:
"""Send request to MCP server and get response"""
if not self.websocket:
self.logger.error("No MCP connection available")
return None
request = {
"jsonrpc": "2.0",
"id": int(time.time() * 1000),
"method": method,
"params": params
}
try:
await self.websocket.send(json.dumps(request))
response = await self.websocket.recv()
return json.loads(response)
except (ConnectionClosed, WebSocketException) as e:
self.logger.error(f"WebSocket error: {e}")
return None
except json.JSONDecodeError as e:
self.logger.error(f"JSON decode error: {e}")
return None
async def get_monitored_issues(self) -> List[Dict]:
"""Get Jira issues that match monitoring criteria"""
jira_config = self.config['jira']
# Build JQL query
status_list = ', '.join([f"'{s}'" for s in jira_config['status_filters']])
jql_parts = [
f"project = {jira_config['project_key']}",
f"status IN ({status_list})"
]
if jira_config['monitored_tags']:
labels_query = ' OR '.join([f"labels = '{tag}'" for tag in jira_config['monitored_tags']])
jql_parts.append(f"({labels_query})")
jql = " AND ".join(jql_parts)
self.logger.debug(f"JQL Query: {jql}")
response = await self.send_mcp_request("tools/call", {
"name": "get_jira_issues",
"arguments": {
"jql": jql,
"max_results": 50
}
})
if response and "result" in response:
try:
result_data = json.loads(response["result"]["content"][0]["text"])
return result_data.get("issues", [])
except (KeyError, IndexError, json.JSONDecodeError) as e:
self.logger.error(f"Error parsing issues response: {e}")
return []
return []
def generate_branch_name(self, issue_key: str, attempt: int = 0) -> str:
"""Generate unique branch name with suffix"""
github_config = self.config['github']
prefix = github_config['branch_prefix']
if attempt == 0:
# First attempt: use random suffix
suffix = random.choice(github_config['branch_suffixes'])
else:
# Subsequent attempts: add number
suffix = f"{random.choice(github_config['branch_suffixes'])}-{attempt}"
return f"{prefix}{issue_key}-{suffix}"
async def create_github_branch(self, issue: Dict) -> bool:
"""Create GitHub branch for Jira issue with retry logic"""
issue_key = issue['key']
github_config = self.config['github']
# Try creating branch with different names if conflicts occur
for attempt in range(3):
branch_name = self.generate_branch_name(issue_key, attempt)
if self.config['agent']['dry_run']:
self.logger.info(f"DRY RUN: Would create branch '{branch_name}' for issue {issue_key}")
return True
self.logger.info(f"Creating GitHub branch '{branch_name}' for issue {issue_key}")
response = await self.send_mcp_request("tools/call", {
"name": "create_branch_for_issue",
"arguments": {
"issue_key": issue_key,
"project_id": f"{github_config['repo_owner']}/{github_config['repo_name']}",
"base_branch": github_config['base_branch'],
"branch_name": branch_name
}
})
if response and "result" in response:
self.logger.info(f"Successfully created branch '{branch_name}' for issue {issue_key}")
return True
elif response and "error" in response:
error_msg = response["error"].get("message", "Unknown error")
if "already exists" in error_msg.lower() and attempt < 2:
self.logger.warning(f"Branch name conflict, trying alternative: {error_msg}")
continue
else:
self.logger.error(f"Failed to create branch for {issue_key}: {error_msg}")
return False
else:
self.logger.error(f"Unexpected response for branch creation: {response}")
return False
return False
async def update_jira_status(self, issue_key: str) -> bool:
"""Update Jira issue status after branch creation"""
target_status = self.config['agent']['jira_status_on_branch_create']
if self.config['agent']['dry_run']:
self.logger.info(f"DRY RUN: Would update {issue_key} status to '{target_status}'")
return True
self.logger.info(f"Updating {issue_key} status to '{target_status}'")
response = await self.send_mcp_request("tools/call", {
"name": "update_issue_status",
"arguments": {
"issue_key": issue_key,
"status": target_status,
"comment": f"GitHub branch created automatically by Jira-GitHub Agent at {datetime.now().isoformat()}"
}
})
if response and "result" in response:
self.logger.info(f"Successfully updated {issue_key} status")
return True
else:
self.logger.error(f"Failed to update {issue_key} status: {response}")
return False
async def process_issue(self, issue: Dict) -> bool:
"""Process a single Jira issue"""
issue_key = issue['key']
try:
# Create GitHub branch
branch_created = await self.create_github_branch(issue)
if branch_created:
# Update Jira status
status_updated = await self.update_jira_status(issue_key)
if status_updated:
self.processed_issues.add(issue_key)
self.logger.info(f"Successfully processed issue {issue_key}")
return True
else:
self.logger.warning(f"Branch created but failed to update status for {issue_key}")
return False
else:
self.logger.error(f"Failed to create branch for {issue_key}")
return False
except Exception as e:
self.logger.error(f"Error processing issue {issue_key}: {e}")
return False
async def monitor_loop(self):
"""Main monitoring loop"""
check_interval = self.config['jira']['check_interval']
max_concurrent = self.config['agent']['max_concurrent_issues']
self.logger.info(f"Starting monitoring loop (check every {check_interval}s)")
while self.running:
try:
# Get issues to process
issues = await self.get_monitored_issues()
if not issues:
self.logger.debug("No issues found matching criteria")
else:
self.logger.info(f"Found {len(issues)} issues to evaluate")
# Filter out already processed issues
new_issues = [
issue for issue in issues
if issue['key'] not in self.processed_issues
]
if new_issues:
self.logger.info(f"Processing {len(new_issues)} new issues")
# Process issues with concurrency limit
semaphore = asyncio.Semaphore(max_concurrent)
async def process_with_semaphore(issue):
async with semaphore:
return await self.process_issue(issue)
tasks = [process_with_semaphore(issue) for issue in new_issues]
results = await asyncio.gather(*tasks, return_exceptions=True)
successful = sum(1 for r in results if r is True)
self.logger.info(f"Processed {successful}/{len(new_issues)} issues successfully")
else:
self.logger.debug("No new issues to process")
# Wait before next check
await asyncio.sleep(check_interval)
except Exception as e:
self.logger.error(f"Error in monitoring loop: {e}")
if not self.config['agent']['continue_on_error']:
break
await asyncio.sleep(30) # Wait before retrying
async def start(self):
"""Start the agent"""
self.logger.info("Starting Jira-GitHub Agent")
# Connect to MCP server
if not await self.connect_mcp():
self.logger.error("Failed to connect to MCP server, exiting")
return
self.running = True
try:
await self.monitor_loop()
except KeyboardInterrupt:
self.logger.info("Received interrupt signal")
finally:
await self.stop()
async def stop(self):
"""Stop the agent"""
self.logger.info("Stopping Jira-GitHub Agent")
self.running = False
if self.websocket:
await self.websocket.close()
self.logger.info("MCP connection closed")
async def main():
"""Main entry point"""
agent = JiraGitHubAgent()
await agent.start()
if __name__ == "__main__":
asyncio.run(main())