Skip to main content
Glama

Regression-JIRA Integration System

by nanyang12138
server.py22.9 kB
""" Regression-JIRA MCP Server Main entry point for the MCP server. Provides unified PostgreSQL and JIRA tools for regression test analysis. """ import os import json import sys from typing import Optional from mcp.server import Server from mcp.types import Tool, TextContent from dotenv import load_dotenv from .db_queries import RegressionDB from .jira_client import JiraClient from .log_analyzer import LogAnalyzer from .error_matcher import ErrorMatcher from .utils import extract_keywords_from_test_name # Load environment variables load_dotenv() # Initialize server app = Server("regression-jira-mcp") # Global instances db: Optional[RegressionDB] = None jira: Optional[JiraClient] = None log_analyzer: Optional[LogAnalyzer] = None error_matcher: Optional[ErrorMatcher] = None def initialize(): """Initialize all clients""" global db, jira, log_analyzer, error_matcher try: db = RegressionDB() jira = JiraClient() log_analyzer = LogAnalyzer( max_lines=None, # Scan entire log file, no limit ends_only=None # Scan entire file, not just ends ) error_matcher = ErrorMatcher() except Exception as e: print(f"Error initializing: {str(e)}", file=sys.stderr) raise # ============================================================================ # PostgreSQL Tools # ============================================================================ @app.list_tools() async def list_tools() -> list[Tool]: """List all available tools""" return [ Tool( name="query_failed_tests", description="Query failed tests from PostgreSQL database", inputSchema={ "type": "object", "properties": { "regression_run_id": { "type": "integer", "description": "Regression run ID (optional if project/regression names provided)" }, "project_name": { "type": "string", "description": "Project name (optional)" }, "regression_name": { "type": "string", "description": "Regression name (optional)" }, "limit": { "type": "integer", "description": "Maximum number of results (optional, omit for no limit)" }, "include_logs": { "type": "boolean", "description": "Whether to analyze log files (default: true)", "default": True } } } ), Tool( name="get_test_details", description="Get detailed information for a specific test", inputSchema={ "type": "object", "properties": { "test_name": { "type": "string", "description": "Test name" }, "regression_run_id": { "type": "integer", "description": "Regression run ID (optional)" }, "analyze_logs": { "type": "boolean", "description": "Whether to analyze log files (default: true)", "default": True } }, "required": ["test_name"] } ), Tool( name="get_regression_summary", description="Get summary statistics for a regression run", inputSchema={ "type": "object", "properties": { "regression_run_id": { "type": "integer", "description": "Regression run ID" } }, "required": ["regression_run_id"] } ), Tool( name="analyze_test_log", description="Analyze test log file to extract error information", inputSchema={ "type": "object", "properties": { "test_name": { "type": "string", "description": "Test name" }, "regression_run_id": { "type": "integer", "description": "Regression run ID" } }, "required": ["test_name", "regression_run_id"] } ), Tool( name="search_jira_issues", description="Search JIRA issues using JQL query", inputSchema={ "type": "object", "properties": { "jql": { "type": "string", "description": "JQL query string" }, "max_results": { "type": "integer", "description": "Maximum results (default: 50)", "default": 50 } }, "required": ["jql"] } ), Tool( name="search_jira_by_text", description="Simple text search in JIRA", inputSchema={ "type": "object", "properties": { "search_text": { "type": "string", "description": "Text to search for" }, "status_filter": { "type": "string", "description": "Filter by status (e.g., 'Resolved', 'Closed')" }, "project_filter": { "type": "string", "description": "Filter by project key" }, "max_results": { "type": "integer", "description": "Maximum results (default: 20)", "default": 20 } }, "required": ["search_text"] } ), Tool( name="get_jira_issue", description="Get detailed information for a specific JIRA issue", inputSchema={ "type": "object", "properties": { "issue_key": { "type": "string", "description": "JIRA issue key (e.g., 'PROJ-1234')" }, "include_comments": { "type": "boolean", "description": "Include comments (default: true)", "default": True } }, "required": ["issue_key"] } ), Tool( name="find_solutions_for_test", description="🌟 One-click solution finder: Find JIRA solutions for a failed test", inputSchema={ "type": "object", "properties": { "test_name": { "type": "string", "description": "Test name" }, "regression_run_id": { "type": "integer", "description": "Regression run ID (optional)" }, "max_jira_results": { "type": "integer", "description": "Maximum JIRA results (default: 10)", "default": 10 } }, "required": ["test_name"] } ), Tool( name="batch_find_solutions", description="Batch find JIRA solutions for multiple failed tests", inputSchema={ "type": "object", "properties": { "regression_run_id": { "type": "integer", "description": "Regression run ID" }, "limit": { "type": "integer", "description": "Number of failed tests to process (default: 10)", "default": 10 } }, "required": ["regression_run_id"] } ), Tool( name="list_regression_runs", description="List recent regression runs", inputSchema={ "type": "object", "properties": { "project_name": { "type": "string", "description": "Filter by project name (optional)" }, "limit": { "type": "integer", "description": "Maximum results (optional, omit for no limit)" } } } ) ] @app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: """Handle tool calls""" try: if name == "query_failed_tests": result = await query_failed_tests_tool(arguments) elif name == "get_test_details": result = await get_test_details_tool(arguments) elif name == "get_regression_summary": result = await get_regression_summary_tool(arguments) elif name == "analyze_test_log": result = await analyze_test_log_tool(arguments) elif name == "search_jira_issues": result = await search_jira_issues_tool(arguments) elif name == "search_jira_by_text": result = await search_jira_by_text_tool(arguments) elif name == "get_jira_issue": result = await get_jira_issue_tool(arguments) elif name == "find_solutions_for_test": result = await find_solutions_for_test_tool(arguments) elif name == "batch_find_solutions": result = await batch_find_solutions_tool(arguments) elif name == "list_regression_runs": result = await list_regression_runs_tool(arguments) else: raise ValueError(f"Unknown tool: {name}") return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))] except Exception as e: error_result = {"error": str(e), "tool": name} return [TextContent(type="text", text=json.dumps(error_result, indent=2))] # ============================================================================ # Tool Implementations # ============================================================================ async def query_failed_tests_tool(args: dict) -> dict: """Query failed tests from database""" regression_run_id = args.get('regression_run_id') project_name = args.get('project_name') regression_name = args.get('regression_name') limit = args.get('limit', 10) include_logs = args.get('include_logs', True) tests = db.query_failed_tests( regression_run_id=regression_run_id, project_name=project_name, regression_name=regression_name, limit=limit ) # Optionally analyze logs if include_logs and tests: for test in tests: try: log_path = db.get_log_file_path( test['test_object_run_id'], test.get('regression_run_id', regression_run_id) ) if log_path and os.path.exists(log_path): error_sig = log_analyzer.analyze_failure( log_path, test.get('block_name'), test['test_name'] ) test['error_analysis'] = { 'signature': error_sig.signature, 'keywords': error_sig.error_keywords, 'error_level': error_sig.error_level, 'tool': error_sig.tool } test['log_file'] = log_path except Exception as e: test['error_analysis'] = {'error': str(e)} return { 'total_failed': len(tests), 'regression_run_id': regression_run_id, 'tests': tests } async def get_test_details_tool(args: dict) -> dict: """Get test details with optional log analysis""" test_name = args['test_name'] regression_run_id = args.get('regression_run_id') analyze_logs = args.get('analyze_logs', True) test_info = db.get_test_by_name( test_name, regression_run_id=regression_run_id ) if not test_info: return {'error': f'Test {test_name} not found'} # Get log analysis if requested if analyze_logs and test_info.get('failed_job_run_ref'): try: log_path = db.get_log_file_path( test_info['test_object_run_id'], test_info['regression_run_id'] ) if log_path and os.path.exists(log_path): error_sig = log_analyzer.analyze_failure( log_path, test_info.get('block_name'), test_name ) test_info['error_analysis'] = error_sig.to_dict() test_info['log_file'] = log_path except Exception as e: test_info['error_analysis'] = {'error': str(e)} return test_info async def get_regression_summary_tool(args: dict) -> dict: """Get regression run summary""" regression_run_id = args['regression_run_id'] return db.get_regression_summary(regression_run_id) async def analyze_test_log_tool(args: dict) -> dict: """Analyze test log file""" test_name = args['test_name'] regression_run_id = args['regression_run_id'] # Get test info test_info = db.get_test_by_name(test_name, regression_run_id=regression_run_id) if not test_info: return {'error': f'Test {test_name} not found'} # Get log file path log_path = db.get_log_file_path( test_info['test_object_run_id'], regression_run_id ) if not log_path: return {'error': 'Log file path not found in database'} if not os.path.exists(log_path): return { 'error': 'Log file not accessible', 'log_path': log_path, 'note': 'Using test name for keyword extraction', 'keywords': extract_keywords_from_test_name(test_name) } # Analyze log error_sig = log_analyzer.analyze_failure( log_path, test_info.get('block_name'), test_name ) return { 'test_name': test_name, 'log_file': log_path, 'analysis': error_sig.to_dict(), 'log_tail': log_analyzer.get_log_tail(log_path, num_lines=50) } async def search_jira_issues_tool(args: dict) -> dict: """Search JIRA issues""" jql = args['jql'] max_results = args.get('max_results', 50) issues = jira.search_issues(jql, max_results=max_results) return { 'total': len(issues), 'query': jql, 'issues': issues } async def search_jira_by_text_tool(args: dict) -> dict: """Simple JIRA text search""" search_text = args['search_text'] status_filter = args.get('status_filter') project_filter = args.get('project_filter') max_results = args.get('max_results', 20) issues = jira.search_by_text( search_text, status_filter=status_filter, project_filter=project_filter, max_results=max_results ) return { 'total': len(issues), 'search_text': search_text, 'issues': issues } async def get_jira_issue_tool(args: dict) -> dict: """Get JIRA issue details""" issue_key = args['issue_key'] include_comments = args.get('include_comments', True) issue = jira.get_issue(issue_key, include_comments=include_comments) if not issue: return {'error': f'Issue {issue_key} not found'} # Extract solution solution = jira.extract_solution_from_issue(issue) if solution: issue['solution_summary'] = solution return issue async def find_solutions_for_test_tool(args: dict) -> dict: """ 🌟 ONE-CLICK SOLUTION FINDER The most powerful tool - combines PostgreSQL query, log analysis, and intelligent JIRA matching in one call. """ test_name = args['test_name'] regression_run_id = args.get('regression_run_id') max_jira_results = args.get('max_jira_results', 10) # Step 1: Get test info from database test_info = db.get_test_by_name(test_name, regression_run_id=regression_run_id) if not test_info: return {'error': f'Test {test_name} not found'} # Step 2: Analyze log file error_keywords = [] error_signature = "" log_analysis = None if test_info.get('failed_job_run_ref'): try: log_path = db.get_log_file_path( test_info['test_object_run_id'], test_info['regression_run_id'] ) if log_path and os.path.exists(log_path): error_sig = log_analyzer.analyze_failure( log_path, test_info.get('block_name'), test_name ) error_keywords = error_sig.error_keywords error_signature = error_sig.signature log_analysis = error_sig.to_dict() else: # Fallback to test name keywords error_keywords = extract_keywords_from_test_name(test_name) error_signature = f"Test {test_name} failed (log not accessible)" except Exception as e: error_keywords = extract_keywords_from_test_name(test_name) error_signature = f"Test {test_name} failed (log analysis error: {str(e)})" else: error_keywords = extract_keywords_from_test_name(test_name) error_signature = f"Test {test_name} failed" # Step 3: Search JIRA jira_results = [] if error_keywords: # Build JQL query jql = error_matcher.build_jira_jql( keywords=error_keywords[:5], status_filter='Resolved' ) try: jira_results = jira.search_issues(jql, max_results=max_jira_results * 2) except Exception as e: jira_results = [] # Step 4: Intelligent matching and ranking matched_issues = error_matcher.match_jira_issues( error_signature, error_keywords, jira_results, min_score=0.3, max_results=max_jira_results ) # Convert matches to dict jira_matches = [match.to_dict() for match in matched_issues] return { 'test_info': { 'test_name': test_name, 'status': test_info.get('status'), 'block_name': test_info.get('block_name'), 'regression_run_id': test_info.get('regression_run_id'), 'num_errors': test_info.get('num_error'), 'num_warnings': test_info.get('num_warning') }, 'error_analysis': { 'signature': error_signature, 'keywords': error_keywords, 'log_analysis': log_analysis }, 'jira_matches': jira_matches, 'summary': { 'total_jira_found': len(jira_results), 'relevant_matches': len(jira_matches), 'top_match_score': jira_matches[0]['similarity_score'] if jira_matches else 0 } } async def batch_find_solutions_tool(args: dict) -> dict: """Batch find solutions for multiple failed tests""" regression_run_id = args['regression_run_id'] limit = args.get('limit', 10) # Get failed tests failed_tests = db.query_failed_tests( regression_run_id=regression_run_id, limit=limit ) results = [] tests_with_solutions = 0 for test in failed_tests: # Find solutions for each test try: solution_result = await find_solutions_for_test_tool({ 'test_name': test['test_name'], 'regression_run_id': regression_run_id, 'max_jira_results': 5 }) has_solution = len(solution_result.get('jira_matches', [])) > 0 if has_solution: tests_with_solutions += 1 results.append({ 'test_name': test['test_name'], 'has_solution': has_solution, 'jira_matches': solution_result.get('jira_matches', [])[:3], # Top 3 'error_keywords': solution_result.get('error_analysis', {}).get('keywords', []) }) except Exception as e: results.append({ 'test_name': test['test_name'], 'has_solution': False, 'error': str(e) }) return { 'regression_run_id': regression_run_id, 'total_failed_tests': len(failed_tests), 'processed': len(results), 'tests_with_solutions': tests_with_solutions, 'tests_without_solutions': len(results) - tests_with_solutions, 'results': results } async def list_regression_runs_tool(args: dict) -> dict: """List regression runs""" project_name = args.get('project_name') limit = args.get('limit') # None if not provided runs = db.list_regression_runs(project_name=project_name, limit=limit) return { 'total': len(runs), 'project_filter': project_name, 'limit_used': limit, 'runs': runs } # ============================================================================ # Server Lifecycle # ============================================================================ def main(): """Main entry point""" import asyncio from mcp.server.stdio import stdio_server # Initialize clients initialize() async def run(): async with stdio_server() as (read_stream, write_stream): await app.run(read_stream, write_stream, app.create_initialization_options()) try: asyncio.run(run()) finally: # Cleanup if db: db.close() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nanyang12138/regression-jira-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server