Skip to main content
Glama
keides2
by keides2
main.py19.1 kB
#!/usr/bin/env python3 """ Coverity Connect MCP Server A Model Context Protocol server for interacting with Coverity Connect (Black Duck) """ import os import sys import logging from typing import List, Dict, Any, Optional from urllib.parse import urlparse from pathlib import Path import click from mcp.server.fastmcp import FastMCP # .envファイルを読み込む(重要:最初に実行) try: from dotenv import load_dotenv env_path = Path(__file__).parent.parent.parent / '.env' if env_path.exists(): load_dotenv(env_path) # Windows環境対応: Unicode文字を使わない print(f"[OK] Loaded .env from {env_path}") except ImportError: pass # python-dotenvがない場合は環境変数を使用 from .coverity_client import CoverityClient # Configure logging import tempfile # Set log file to project directory project_root = Path(__file__).parent.parent.parent log_file = project_root / "logs" / "coverity_mcp_server.log" # Create logs directory if it doesn't exist log_file.parent.mkdir(exist_ok=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler() # Also keep console output ] ) logger = logging.getLogger(__name__) # Global client instance coverity_client: Optional[CoverityClient] = None def initialize_client() -> CoverityClient: """Initialize Coverity client with environment variables""" global coverity_client # 既存のクライアントがあれば再利用 if coverity_client is not None: logger.info("Using existing Coverity client instance") return coverity_client # 環境変数から設定を取得 coverity_url = os.getenv('COVERITY_HOST', '').strip() username = os.getenv('COVAUTHUSER', '').strip() password = os.getenv('COVAUTHKEY', '').strip() # プロキシ設定を取得(.envファイルから) proxy_host = os.getenv('PROXY_HOST', 'bypsproxy.daikin.co.jp').strip() proxy_port = os.getenv('PROXY_PORT', '3128').strip() # プロキシを環境変数に設定(aiohttpが使用) if proxy_host and proxy_port: proxy_url = f"http://{proxy_host}:{proxy_port}" os.environ['HTTP_PROXY'] = proxy_url os.environ['HTTPS_PROXY'] = proxy_url os.environ['http_proxy'] = proxy_url os.environ['https_proxy'] = proxy_url logger.info(f"Proxy configured: {proxy_url}") # デバッグ情報の出力(パスワードは隠す) logger.info("=== Coverity Client Initialization ===") logger.info(f"COVERITY_HOST: {coverity_url}") logger.info(f"COVAUTHUSER: {username}") logger.info(f"COVAUTHKEY: {'*' * 8 if password else 'NOT SET'}") logger.info(f"Proxy: {proxy_host}:{proxy_port}") logger.info(f"Working Directory: {os.getcwd()}") # 必須パラメータのチェック if not coverity_url: logger.error("COVERITY_HOST environment variable is missing") raise ValueError( "COVERITY_HOST environment variable is required. " "Please set it in your .env file or environment." ) if not username: logger.error("COVAUTHUSER environment variable is missing") raise ValueError( "COVAUTHUSER environment variable is required. " "Please set your Coverity username." ) if not password: logger.error("COVAUTHKEY environment variable is missing") raise ValueError( "COVAUTHKEY environment variable is required. " "Please set your Coverity authentication key." ) # URLを正規化(末尾のスラッシュを削除) coverity_url = coverity_url.rstrip('/') # URLをパースして設定を抽出 try: # URLに適切なスキームがない場合は追加 if not coverity_url.startswith(('http://', 'https://')): logger.warning(f"No scheme in URL: {coverity_url}, assuming https://") coverity_url = f"https://{coverity_url}" parsed_url = urlparse(coverity_url) # ホスト名の抽出 host = parsed_url.hostname if not host: # hostnameが取得できない場合はnetlocから取得 host = parsed_url.netloc.split(':')[0] if not host: raise ValueError(f"Cannot extract hostname from URL: {coverity_url}") # ポート番号の抽出(テスト成功時と同じ設定) if parsed_url.port: port = parsed_url.port elif parsed_url.scheme == 'https': port = 443 # HTTPS標準ポート else: port = 8080 # Coverity Connect標準ポート # SSL設定の判定 use_ssl = parsed_url.scheme == 'https' logger.info(f"Parsed configuration:") logger.info(f" Host: {host}") logger.info(f" Port: {port}") logger.info(f" SSL: {use_ssl}") logger.info(f" Full URL: {parsed_url.scheme}://{host}:{port}") except Exception as e: logger.error(f"Error parsing URL {coverity_url}: {e}") raise ValueError(f"Invalid COVERITY_HOST URL: {coverity_url}") # Coverityクライアントの作成 try: coverity_client = CoverityClient( host=host, port=port, use_ssl=use_ssl, username=username, password=password ) # Windows環境対応: Unicode文字を使わない logger.info(f"[OK] Successfully initialized Coverity client") logger.info(f" Server: {host}:{port}") logger.info(f" User: {username}") logger.info(f" SSL: {use_ssl}") logger.info(f" Proxy: {proxy_url if proxy_host else 'None'}") return coverity_client except Exception as e: logger.error(f"Failed to create Coverity client: {e}") logger.error("Please check:") logger.error(" 1. Network connectivity to Coverity server") logger.error(" 2. Proxy settings (if behind corporate firewall)") logger.error(" 3. Authentication credentials") logger.error(" 4. Server URL and port") raise def create_server() -> FastMCP: """Create and configure the MCP server""" # Initialize the server mcp = FastMCP("Coverity Connect MCP Server") # Initialize Coverity client try: initialize_client() logger.info("Coverity client initialized successfully") except Exception as e: logger.error(f"Failed to initialize Coverity client: {e}") raise # Register resources using decorators @mcp.resource("coverity://projects/{project_id}/config") async def get_project_config(project_id: str) -> str: """Get project configuration""" try: client = initialize_client() project = await client.get_project(project_id) if not project: return f"Project {project_id} not found" config_info = { "project_id": project.get('projectKey'), "project_name": project.get('projectName'), "description": project.get('description', 'No description'), "created_date": project.get('createdDate'), "last_modified": project.get('lastModified'), "streams": project.get('streams', []) } return f"Project Configuration:\n{config_info}" except Exception as e: logger.error(f"Error getting project config: {e}") return f"Error: {str(e)}" @mcp.resource("coverity://streams/{stream_id}/defects") async def get_stream_defects(stream_id: str) -> str: """Get defects for a stream""" try: client = initialize_client() defects = await client.get_defects(stream_id=stream_id) if not defects: return f"No defects found for stream {stream_id}" defect_summary = [] for defect in defects[:10]: # Limit to first 10 for readability defect_summary.append({ "cid": defect.get('cid'), "checker": defect.get('checkerName'), "type": defect.get('displayType'), "severity": defect.get('displayImpact'), "status": defect.get('displayStatus'), "file": defect.get('displayFile'), "function": defect.get('displayFunction') }) return f"Stream {stream_id} Defects (showing first 10):\n{defect_summary}" except Exception as e: logger.error(f"Error getting stream defects: {e}") return f"Error: {str(e)}" # Register tools @mcp.tool() async def search_defects( query: str = "", stream_id: str = "", checker: str = "", severity: str = "", status: str = "", limit: int = 50 ) -> List[Dict[str, Any]]: """ Search for defects in Coverity Connect Args: query: General search query stream_id: Filter by stream ID checker: Filter by checker name severity: Filter by severity (High, Medium, Low) status: Filter by status (New, Triaged, Fixed, etc.) limit: Maximum number of results to return """ try: client = initialize_client() # Build filter parameters filters = {} if stream_id: filters['streamId'] = stream_id if checker: filters['checker'] = checker if severity: filters['severity'] = severity if status: filters['status'] = status defects = await client.get_defects( query=query, filters=filters, limit=limit ) return defects if defects else [] except Exception as e: logger.error(f"Error searching defects: {e}") return [{"error": str(e)}] @mcp.tool() async def get_defect_details(cid: str) -> Dict[str, Any]: """ Get detailed information about a specific defect Args: cid: Coverity Issue Identifier """ try: client = initialize_client() defect = await client.get_defect_details(cid) if not defect: return {"error": f"Defect {cid} not found"} return defect except Exception as e: logger.error(f"Error getting defect details: {e}") return {"error": str(e)} @mcp.tool() async def list_projects() -> List[Dict[str, Any]]: """List all projects in Coverity Connect""" try: client = initialize_client() projects = await client.get_projects() return projects if projects else [] except Exception as e: logger.error(f"Error listing projects: {e}") return [{"error": str(e)}] @mcp.tool() async def list_streams(project_id: str = "") -> List[Dict[str, Any]]: """ List streams, optionally filtered by project Args: project_id: Optional project ID to filter streams """ try: client = initialize_client() streams = await client.get_streams(project_id=project_id) return streams if streams else [] except Exception as e: logger.error(f"Error listing streams: {e}") return [{"error": str(e)}] @mcp.tool() async def get_project_summary(project_id: str) -> Dict[str, Any]: """ Get summary information for a project including defect counts Args: project_id: Project identifier """ try: client = initialize_client() # Get project details project = await client.get_project(project_id) if not project: return {"error": f"Project {project_id} not found"} # Get streams for this project streams = await client.get_streams(project_id=project_id) # Get defect summary for each stream stream_summaries = [] for stream in streams or []: stream_id = stream.get('name', '') defects = await client.get_defects(stream_id=stream_id, limit=1000) # Count defects by severity severity_counts = {'High': 0, 'Medium': 0, 'Low': 0} status_counts = {} for defect in defects or []: severity = defect.get('displayImpact', 'Unknown') status = defect.get('displayStatus', 'Unknown') if severity in severity_counts: severity_counts[severity] += 1 status_counts[status] = status_counts.get(status, 0) + 1 stream_summaries.append({ 'stream_name': stream_id, 'total_defects': len(defects or []), 'severity_breakdown': severity_counts, 'status_breakdown': status_counts }) return { 'project': project, 'streams': stream_summaries, 'total_streams': len(streams or []) } except Exception as e: logger.error(f"Error getting project summary: {e}") return {"error": str(e)} @mcp.tool() async def list_users( include_disabled: bool = False, limit: int = 200 ) -> List[Dict[str, Any]]: """ List all users in Coverity Connect Args: include_disabled: Include disabled users (default: False) limit: Maximum number of users to return (default: 200) """ try: client = initialize_client() users = await client.get_users( disabled=include_disabled, include_details=True, locked=False, limit=limit ) return users if users else [] except Exception as e: logger.error(f"Error listing users: {e}") return [{"error": str(e)}] @mcp.tool() async def get_user_details(username: str) -> Dict[str, Any]: """ Get detailed information about a specific user Args: username: Username to lookup """ try: client = initialize_client() user = await client.get_user_details(username) if not user: return {"error": f"User {username} not found"} return user except Exception as e: logger.error(f"Error getting user details: {e}") return {"error": str(e)} @mcp.tool() async def get_user_roles(username: str) -> Dict[str, Any]: """ Get role and permission information for a specific user Args: username: Username to lookup roles for """ try: client = initialize_client() user = await client.get_user_details(username) if not user: return {"error": f"User {username} not found"} # Extract role information roles_info = { "username": user.get('name'), "superUser": user.get('superUser', False), "groups": user.get('groupNames', []), "roles": [], "status": { "disabled": user.get('disabled', False), "locked": user.get('locked', False), "local": user.get('local', True) }, "lastLogin": user.get('lastLogin'), "dateCreated": user.get('dateCreated') } # Process role assignments for role in user.get('roleAssignments', []): role_info = { "roleName": role.get('roleName'), "scope": role.get('scope'), "roleAssignmentType": role.get('roleAssignmentType', 'user') } # Add description based on role name role_descriptions = { "administrator": "システム全体の管理権限", "projectOwner": "プロジェクトの所有者権限", "developer": "開発者権限", "analyst": "分析者権限", "viewer": "閲覧権限" } role_info["description"] = role_descriptions.get( role.get('roleName'), f"{role.get('roleName')} 権限" ) roles_info["roles"].append(role_info) return roles_info except Exception as e: logger.error(f"Error getting user roles: {e}") return {"error": str(e)} return mcp def run_server(): """Run the MCP server""" try: mcp = create_server() mcp.run() except Exception as e: logger.error(f"Failed to start server: {e}") sys.exit(1) @click.command() @click.option('--host', default='localhost', help='Coverity Connect host') @click.option('--port', default=8080, help='Coverity Connect port') @click.option('--ssl/--no-ssl', default=True, help='Use SSL connection') @click.option('--username', help='Coverity username (or set COVAUTHUSER env var)') @click.option('--password', help='Coverity password (or set COVAUTHKEY env var)') def cli(host, port, ssl, username, password): """Start the Coverity Connect MCP Server""" # Set environment variables if provided via CLI if host: os.environ['COVERITY_HOST'] = host if port: os.environ['COVERITY_PORT'] = str(port) if ssl is not None: os.environ['COVERITY_SSL'] = str(ssl) if username: os.environ['COVAUTHUSER'] = username if password: os.environ['COVAUTHKEY'] = password run_server() if __name__ == "__main__": cli()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keides2/coverity-connect-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server