Skip to main content
Glama
keides2
by keides2
tools.py17.1 kB
""" MCP Tools for Coverity Connect integration """ import json import csv from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Any from mcp.server.fastmcp import FastMCP # Global configuration (will be set by main module) config = None def set_config(cfg): """Set the global configuration""" global config config = cfg async def get_coverity_projects() -> str: """ Retrieve list of accessible Coverity projects Based on Black Duck Coverity Connect Web Services API """ try: # Check for covautolib availability try: from covautolib import covautolib_2, covautolib_3 except ImportError: return "Error: covautolib not available. Please check your Python environment." # Import required SOAP libraries from suds.client import Client from suds.wsse import Security, UsernameToken # Construct WebService URL wsdl_url = config.server_url + "/ws/v9/configurationservice?wsdl" # Create SOAP client with authentication client = Client(wsdl_url, proxy=config.proxies) security = Security() auth_key = UsernameToken(config.username, config.auth_key) security.tokens.append(auth_key) client.set_options(wsse=security, proxy=config.proxies) # Retrieve project list projectIdDO = client.factory.create("projectFilterSpecDataObj") projectIdDO.namePattern = "*" results = client.service.getProjects(projectIdDO) projects_info = [] for project in results: projects_info.append({ "project_id": project.projectKey, "project_name": project.id.name, "date_created": str(project.dateCreated), "user_created": project.userCreated }) return f"Found {len(projects_info)} Coverity projects:\n" + \ "\n".join([f"- {p['project_name']} (ID: {p['project_id']})" for p in projects_info]) except Exception as e: return f"Error retrieving projects: {str(e)}" async def get_project_streams(project_name: str) -> str: """ Retrieve streams for a specific project Args: project_name: Name of the Coverity project """ try: try: from covautolib import covautolib_2, covautolib_3 except ImportError: return "Error: covautolib not available" # Use SOAP API to get streams from suds.client import Client from suds.wsse import Security, UsernameToken wsdl_url = config.server_url + "/ws/v9/configurationservice?wsdl" client = Client(wsdl_url, proxy=config.proxies) security = Security() auth_key = UsernameToken(config.username, config.auth_key) security.tokens.append(auth_key) client.set_options(wsse=security, proxy=config.proxies) # Search for project projectIdDO = client.factory.create("projectFilterSpecDataObj") projectIdDO.namePattern = project_name projects = client.service.getProjects(projectIdDO) if not projects: return f"Project '{project_name}' not found" project = projects[0] streams_info = [] if hasattr(project, "streams") and project.streams: for stream in project.streams: streams_info.append({ "stream_name": stream.id.name, "description": getattr(stream, 'description', 'N/A') }) elif hasattr(project, "streamLinks") and project.streamLinks: for stream_link in project.streamLinks: streams_info.append({ "stream_name": stream_link.id.name, "description": "Linked stream" }) if not streams_info: return f"No streams found for project '{project_name}'" return f"Streams in project '{project_name}':\n" + \ "\n".join([f"- {s['stream_name']}" for s in streams_info]) except Exception as e: return f"Error retrieving streams: {str(e)}" async def get_stream_snapshots(stream_name: str, limit: int = 10) -> str: """ Retrieve snapshot history for a stream Args: stream_name: Name of the Coverity stream limit: Maximum number of snapshots to retrieve (default: 10) """ try: try: from covautolib import covautolib_2, covautolib_3 except ImportError: return "Error: covautolib not available" from suds.client import Client from suds.wsse import Security, UsernameToken wsdl_url = config.server_url + "/ws/v9/configurationservice?wsdl" client = Client(wsdl_url, proxy=config.proxies) security = Security() auth_key = UsernameToken(config.username, config.auth_key) security.tokens.append(auth_key) client.set_options(wsse=security, proxy=config.proxies) # Get snapshots for stream streamIdDO = client.factory.create("streamIdDataObj") streamIdDO.name = stream_name results = client.service.getSnapshotsForStream(streamIdDO) # Get latest snapshots up to limit snapshots_info = [] for i, snapshot in enumerate(results[:limit]): snapshots_info.append({ "snapshot_id": snapshot.id, "date_created": str(getattr(snapshot, 'dateCreated', 'N/A')), "user": getattr(snapshot, 'user', 'N/A') }) return f"Latest {len(snapshots_info)} snapshots for stream '{stream_name}':\n" + \ "\n".join([f"- ID: {s['snapshot_id']} ({s['date_created']})" for s in snapshots_info]) except Exception as e: return f"Error retrieving snapshots: {str(e)}" async def analyze_snapshot_defects( stream_name: str, snapshot_id: str, max_defects: int = 100 ) -> str: """ Analyze defects in a specific snapshot Args: stream_name: Name of the Coverity stream snapshot_id: ID of the snapshot to analyze max_defects: Maximum number of defects to analyze (default: 100) """ try: try: from covautolib import covautolib_2, covautolib_3 except ImportError: return "Error: covautolib not available" # Note: Simplified implementation for security # Full implementation would require project name resolution project_name = "Unknown" # In production, resolve from stream from suds.client import Client from suds.wsse import Security, UsernameToken defect_wsdl = config.server_url + "/ws/v9/defectservice?wsdl" client = Client(defect_wsdl, proxy=config.proxies) security = Security() auth_key = UsernameToken(config.username, config.auth_key) security.tokens.append(auth_key) client.set_options(wsse=security, proxy=config.proxies) # Create project reference projectIdDO = client.factory.create("projectIdDataObj") projectIdDO.name = project_name filterSpecDO = client.factory.create("snapshotScopeDefectFilterSpecDataObj") pageSpecDO = client.factory.create("pageSpecDataObj") pageSpecDO.pageSize = min(max_defects, 1000) pageSpecDO.startIndex = 0 snapshotScopeDO = client.factory.create("snapshotScopeSpecDataObj") snapshotScopeDO.showSelector = str(snapshot_id) # Get defects for snapshot try: mergedDefects = client.service.getMergedDefectsForSnapshotScope( projectIdDO, filterSpecDO, pageSpecDO, snapshotScopeDO ) total_defects = mergedDefects.totalNumberOfRecords if total_defects == 0: return f"No defects found in snapshot {snapshot_id}" # Aggregate by severity and checker severity_counts = {"High": 0, "Medium": 0, "Low": 0} checker_counts = {} defects_summary = [] for i, defect_id in enumerate(mergedDefects.mergedDefectIds[:max_defects]): cid = defect_id.cid # In production, get detailed info using additional API calls defects_summary.append(f"CID {cid}") # Simplified aggregation (in production, get actual severity) severity_counts["Medium"] += 1 analysis_result = f""" Snapshot {snapshot_id} Analysis Results: ===================================== Total Defects: {total_defects} Analyzed: {len(defects_summary)} defects Severity Distribution: - High: {severity_counts['High']} - Medium: {severity_counts['Medium']} - Low: {severity_counts['Low']} Sample Defects: {chr(10).join(defects_summary[:10])} {"..." if len(defects_summary) > 10 else ""} """ return analysis_result except Exception as e: return f"Error analyzing snapshot {snapshot_id}: {str(e)}" except Exception as e: return f"Error in snapshot analysis: {str(e)}" async def run_coverity_automation( group: str, project: str, branch: str, commit_message: str = "Manual trigger" ) -> str: """ Execute Coverity automation pipeline Args: group: Project group name project: Project name branch: Branch name commit_message: Commit message for the build """ try: # Enterprise automation workflow base_dir = Path(config.base_dir) project_dir = base_dir / "groups" / group / project / branch # Check project configuration config_file = base_dir / "config" / f"{project}.cfg" if not config_file.exists(): return f"Error: Configuration file {config_file} not found" # Create log directory log_dir = base_dir / "log" log_dir.mkdir(exist_ok=True) # Generate timestamp timestamp = datetime.now().strftime('%Y%m%d%H%M%S') log_file = log_dir / f"cov_auto_mcp_{group}_{project}_{branch}_{timestamp}.log" automation_steps = [ "1. Git repository update", "2. Build environment setup", "3. Coverity build execution", "4. Coverity analysis execution", "5. Results commit to Coverity Connect", "6. Issues parsing and filtering", "7. CSV report generation" ] # Simulated execution result (replace with actual pipeline execution) result = f""" Coverity Automation Pipeline Execution ===================================== Project: {group}/{project} Branch: {branch} Commit: {commit_message} Timestamp: {timestamp} Execution Steps: {chr(10).join(automation_steps)} Status: Simulated execution completed Log file: {log_file} Note: This is an MCP simulation. For actual execution, implement the full automation pipeline with proper environment setup. """ return result except Exception as e: return f"Error in automation pipeline: {str(e)}" async def parse_coverity_issues( group: str, project: str, branch: str, commit_short: str, severity_filter: str = "all" ) -> str: """ Parse and analyze Coverity issues Args: group: Project group name project: Project name branch: Branch name commit_short: Short commit hash severity_filter: Filter by severity (all, high, medium, low) """ try: # Standard Coverity issues file path base_dir = Path(config.base_dir) issues_dir = base_dir / "groups" / group / project / "cov_issues" issues_file = issues_dir / "issues.json" if not issues_file.exists(): return f"Error: Issues file {issues_file} not found" # Read JSON results try: with open(issues_file, 'r', encoding='utf-8') as f: issues_dict = json.load(f) except Exception as e: return f"Error reading issues file: {str(e)}" issues_list = issues_dict.get("issues", []) total_issues = len(issues_list) if total_issues == 0: return "No issues found in the analysis results" # Filter and aggregate by severity severity_counts = {"High": 0, "Medium": 0, "Low": 0} checker_counts = {} filtered_issues = [] for issue in issues_list: impact = issue.get("checkerProperties", {}).get("impact", "Unknown") checker = issue.get("checkerName", "Unknown") if impact in severity_counts: severity_counts[impact] += 1 checker_counts[checker] = checker_counts.get(checker, 0) + 1 # Apply severity filter if severity_filter == "all" or severity_filter.lower() == impact.lower(): filtered_issues.append({ "file": issue.get("mainEventFilePathname", "N/A"), "line": issue.get("mainEventLineNumber", "N/A"), "function": issue.get("functionDisplayName", "N/A"), "impact": impact, "checker": checker, "category": issue.get("checkerProperties", {}).get("category", "N/A") }) # Output CSV file path csv_filename = f"issues-{group}-{project}-{branch}-{commit_short}.csv" csv_filepath = issues_dir / csv_filename # Top checkers top_checkers = sorted(checker_counts.items(), key=lambda x: x[1], reverse=True)[:5] analysis_result = f""" Coverity Issues Analysis Results =============================== Project: {group}/{project}/{branch} Commit: {commit_short} Total Issues: {total_issues} Filtered Issues: {len(filtered_issues)} (filter: {severity_filter}) Severity Distribution: - High: {severity_counts['High']} - Medium: {severity_counts['Medium']} - Low: {severity_counts['Low']} Top 5 Checker Types: {chr(10).join([f"- {checker}: {count} issues" for checker, count in top_checkers])} Sample Issues: {chr(10).join([f"- {issue['file']}:{issue['line']} ({issue['impact']}) - {issue['checker']}" for issue in filtered_issues[:10]])} Output file: {csv_filepath} """ return analysis_result except Exception as e: return f"Error parsing issues: {str(e)}" async def generate_quality_report( project_name: str, include_trends: bool = True ) -> str: """ Generate comprehensive project quality report Args: project_name: Name of the project include_trends: Include trend analysis (default: True) """ try: # Quality report generation base_dir = Path(config.base_dir) # Search for project directories project_dirs = list(base_dir.glob(f"groups/*/{project_name}")) if not project_dirs: return f"Project '{project_name}' not found in {base_dir}/groups" project_dir = project_dirs[0] issues_dir = project_dir / "cov_issues" # Search for latest analysis results issue_files = list(issues_dir.glob("issues-*.csv")) if not issue_files: return f"No analysis results found for project '{project_name}'" # Report generation timestamp report_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # Calculate quality score (simplified) quality_score = 85 # In production, calculate from actual metrics quality_report = f""" COVERITY QUALITY REPORT ====================== Project: {project_name} Generated: {report_time} EXECUTIVE SUMMARY ================ Quality Score: {quality_score}/100 Status: {"Good" if quality_score >= 80 else "Needs Attention" if quality_score >= 60 else "Critical"} ANALYSIS FILES FOUND ================== {chr(10).join([f"- {f.name}" for f in issue_files[-5:]])} RECOMMENDATIONS ============== 1. Focus on high-severity security issues 2. Establish regular scanning cadence 3. Implement automated triage workflows 4. Review coding standards compliance NEXT STEPS ========= 1. Run detailed analysis with parse_coverity_issues tool 2. Review specific defects with analyze_snapshot_defects tool 3. Set up automated pipeline with run_coverity_automation tool Note: This is a summary report. Use specific analysis tools for detailed insights. """ return quality_report except Exception as e: return f"Error generating quality report: {str(e)}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keides2/coverity-connect-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server