Skip to main content
Glama
keides2
by keides2
main_production.py17.7 kB
#!/usr/bin/env python3 """ Coverity Connect MCP Server - Production Version A Model Context Protocol server for interacting with Coverity Connect (Black Duck) This version uses environment variables for configuration without hardcoded credentials. """ import os import sys import logging from typing import List, Dict, Any, Optional from urllib.parse import urlparse import click from mcp.server.fastmcp import FastMCP from .coverity_client import CoverityClient # Configure logging import tempfile from pathlib import Path # Set log file to project directory project_root = Path(__file__).parent.parent.parent log_file = project_root / "logs" / "coverity_mcp_server.log" # Create logs directory if it doesn't exist log_file.parent.mkdir(exist_ok=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler() # Also keep console output ] ) logger = logging.getLogger(__name__) # Global client instance coverity_client: Optional[CoverityClient] = None def initialize_client() -> CoverityClient: """Initialize Coverity client with environment variables""" global coverity_client if coverity_client is not None: return coverity_client # Get configuration from environment variables coverity_url = os.getenv('COVERITY_HOST') username = os.getenv('COVAUTHUSER') password = os.getenv('COVAUTHKEY') logger.info("=== Configuration Loading ===") logger.info(f"COVERITY_HOST: {coverity_url}") logger.info(f"COVAUTHUSER: {username}") logger.info(f"COVAUTHKEY: {'***' if password else 'None'}") # Set proxy environment variables if not already set if not os.getenv('HTTPS_PROXY') and not os.getenv('https_proxy'): proxy_host = os.getenv('PROXY_HOST') proxy_port = os.getenv('PROXY_PORT', '') if proxy_host: proxy_url = f'http://{proxy_host}:{proxy_port}' os.environ['HTTPS_PROXY'] = proxy_url os.environ['HTTP_PROXY'] = proxy_url logger.info(f"Set proxy: {proxy_url}") # Handle missing configuration gracefully if not coverity_url: logger.error("COVERITY_HOST environment variable is required") raise ValueError("COVERITY_HOST environment variable is not set") if not username: logger.error("COVAUTHUSER environment variable is required") raise ValueError("COVAUTHUSER environment variable is not set") if not password: logger.error("COVAUTHKEY environment variable is required") raise ValueError("COVAUTHKEY environment variable is not set") # Parse URL to extract host, port, and SSL setting try: parsed_url = urlparse(coverity_url) # Extract host (remove trailing slash if present) host = parsed_url.hostname or parsed_url.netloc.split(':')[0] # Handle cases where hostname is still empty if not host: logger.error(f"Could not parse hostname from URL: {coverity_url}") raise ValueError(f"Invalid COVERITY_HOST URL: {coverity_url}") # Extract port if parsed_url.port: port = parsed_url.port elif parsed_url.scheme == 'https': port = 443 else: port = 8080 # Determine SSL setting use_ssl = parsed_url.scheme == 'https' logger.info(f"Parsed URL - Host: {host}, Port: {port}, SSL: {use_ssl}") except Exception as e: logger.error(f"Error parsing URL {coverity_url}: {e}") raise ValueError(f"Invalid COVERITY_HOST URL format: {coverity_url}") coverity_client = CoverityClient( host=host, port=port, use_ssl=use_ssl, username=username, password=password ) logger.info(f"Initialized Coverity client for {host}:{port} (SSL: {use_ssl})") return coverity_client def create_server() -> FastMCP: """Create and configure the MCP server""" # Initialize the server mcp = FastMCP("Coverity Connect MCP Server") # Initialize Coverity client try: initialize_client() logger.info("Coverity client initialized successfully") except Exception as e: logger.error(f"Failed to initialize Coverity client: {e}") raise # Register resources using decorators @mcp.resource("coverity://projects/{project_id}/config") async def get_project_config(project_id: str) -> str: """Get project configuration""" try: client = initialize_client() project = await client.get_project(project_id) if not project: return f"Project {project_id} not found" config_info = { "project_id": project.get('projectKey'), "project_name": project.get('projectName'), "description": project.get('description', 'No description'), "created_date": project.get('createdDate'), "last_modified": project.get('lastModified'), "streams": project.get('streams', []) } return f"Project Configuration:\n{config_info}" except Exception as e: logger.error(f"Failed to get project config: {e}") return f"Error getting project configuration: {e}" @mcp.resource("coverity://streams/{stream_id}/status") async def get_stream_status(stream_id: str) -> str: """Get stream status and recent defects""" try: client = initialize_client() # Get stream details streams = await client.get_streams() stream = next((s for s in streams if s.get('name') == stream_id), None) if not stream: return f"Stream {stream_id} not found" # Get recent defects for this stream defects = await client.get_defects(stream_id=stream_id, limit=10) status_info = { "stream_id": stream.get('name'), "description": stream.get('description', 'No description'), "project_id": stream.get('projectId'), "language": stream.get('language'), "total_defects": len(defects), "recent_defects": [ { "cid": d.get('cid'), "checker": d.get('checkerName'), "impact": d.get('displayImpact'), "status": d.get('displayStatus') } for d in defects[:5] # Top 5 recent defects ] } return f"Stream Status:\n{status_info}" except Exception as e: logger.error(f"Failed to get stream status: {e}") return f"Error getting stream status: {e}" # Register tools using decorators @mcp.tool() async def list_projects() -> List[Dict[str, Any]]: """ List all projects in Coverity Connect Returns: List of projects with their basic information """ try: client = initialize_client() projects = await client.get_projects() logger.info(f"Retrieved {len(projects)} projects") return projects except Exception as e: logger.error(f"Failed to list projects: {e}") raise RuntimeError(f"Error listing projects: {e}") @mcp.tool() async def list_streams(project_id: str = "") -> List[Dict[str, Any]]: """ List streams, optionally filtered by project Args: project_id: Optional project ID to filter streams Returns: List of streams """ try: client = initialize_client() streams = await client.get_streams(project_id=project_id) logger.info(f"Retrieved {len(streams)} streams" + (f" for project {project_id}" if project_id else "")) return streams except Exception as e: logger.error(f"Failed to list streams: {e}") raise RuntimeError(f"Error listing streams: {e}") @mcp.tool() async def get_project_summary(project_id: str) -> Dict[str, Any]: """ Get summary information for a project including defect counts Args: project_id: Project identifier Returns: Project summary with defect statistics """ try: client = initialize_client() # Get project details project = await client.get_project(project_id) if not project: raise ValueError(f"Project {project_id} not found") # Get streams for this project streams = await client.get_streams(project_id=project_id) # Get defect counts for each stream total_defects = 0 stream_summaries = [] for stream in streams: stream_name = stream.get('name', '') defects = await client.get_defects(stream_id=stream_name, limit=1000) defect_count = len(defects) total_defects += defect_count stream_summaries.append({ "stream_name": stream_name, "defect_count": defect_count, "description": stream.get('description', '') }) summary = { "project_key": project.get('projectKey'), "project_name": project.get('projectName'), "description": project.get('description', ''), "created_date": project.get('createdDate'), "last_modified": project.get('lastModified'), "total_streams": len(streams), "total_defects": total_defects, "streams": stream_summaries } logger.info(f"Generated summary for project {project_id}") return summary except Exception as e: logger.error(f"Failed to get project summary: {e}") raise RuntimeError(f"Error getting project summary: {e}") @mcp.tool() async def search_defects(query: str = "", stream_id: str = "", checker: str = "", severity: str = "", status: str = "", limit: int = 50) -> List[Dict[str, Any]]: """ Search for defects in Coverity Connect Args: query: General search query stream_id: Filter by stream ID checker: Filter by checker name severity: Filter by severity (High, Medium, Low) status: Filter by status (New, Triaged, Fixed, etc.) limit: Maximum number of results to return Returns: List of defects matching the search criteria """ try: client = initialize_client() # Build filters dictionary filters = {} if checker: filters['checker'] = checker if severity: filters['severity'] = severity if status: filters['status'] = status defects = await client.get_defects( stream_id=stream_id, query=query, filters=filters, limit=limit ) logger.info(f"Found {len(defects)} defects matching search criteria") return defects except Exception as e: logger.error(f"Failed to search defects: {e}") raise RuntimeError(f"Error searching defects: {e}") @mcp.tool() async def get_defect_details(cid: str) -> Optional[Dict[str, Any]]: """ Get detailed information about a specific defect Args: cid: Coverity Issue Identifier Returns: Detailed defect information or None if not found """ try: client = initialize_client() defect = await client.get_defect_details(cid) if defect: logger.info(f"Retrieved details for defect {cid}") else: logger.warning(f"Defect {cid} not found") return defect except Exception as e: logger.error(f"Failed to get defect details: {e}") raise RuntimeError(f"Error getting defect details: {e}") @mcp.tool() async def list_users(include_disabled: bool = False, limit: int = 200) -> List[Dict[str, Any]]: """ List all users in Coverity Connect Args: include_disabled: Include disabled users (default: False) limit: Maximum number of users to return (default: 200) Returns: List of users with their information """ try: client = initialize_client() users = await client.get_users( disabled=include_disabled, limit=limit ) logger.info(f"Retrieved {len(users)} users") return users except Exception as e: logger.error(f"Failed to list users: {e}") raise RuntimeError(f"Error listing users: {e}") @mcp.tool() async def get_user_details(username: str) -> Optional[Dict[str, Any]]: """ Get detailed information about a specific user Args: username: Username to lookup Returns: User details or None if not found """ try: client = initialize_client() user = await client.get_user_details(username) if user: logger.info(f"Retrieved details for user {username}") else: logger.warning(f"User {username} not found") return user except Exception as e: logger.error(f"Failed to get user details: {e}") raise RuntimeError(f"Error getting user details: {e}") @mcp.tool() async def get_user_roles(username: str) -> Dict[str, Any]: """ Get role and permission information for a specific user Args: username: Username to lookup roles for Returns: User role information """ try: client = initialize_client() user = await client.get_user_details(username) if not user: raise ValueError(f"User {username} not found") role_info = { "username": user.get('name'), "email": user.get('email'), "groups": user.get('groupNames', []), "roles": user.get('roleAssignments', []), "super_user": user.get('superUser', False), "disabled": user.get('disabled', False), "locked": user.get('locked', False) } logger.info(f"Retrieved role information for user {username}") return role_info except Exception as e: logger.error(f"Failed to get user roles: {e}") raise RuntimeError(f"Error getting user roles: {e}") return mcp @click.command() @click.option("--host", default="localhost", help="Host to bind to") @click.option("--port", default=3000, help="Port to bind to") @click.option("--log-level", default="INFO", help="Logging level") def main(host: str, port: int, log_level: str): """Start the Coverity Connect MCP server""" # Set logging level logging.getLogger().setLevel(getattr(logging, log_level.upper())) # Check required environment variables required_vars = ['COVERITY_HOST', 'COVAUTHUSER', 'COVAUTHKEY'] missing_vars = [var for var in required_vars if not os.getenv(var)] if missing_vars: logger.error(f"Missing required environment variables: {', '.join(missing_vars)}") logger.error("Please set the following environment variables:") logger.error(" COVERITY_HOST - Coverity Connect server URL (e.g., https://coverity.company.com)") logger.error(" COVAUTHUSER - Coverity Connect username") logger.error(" COVAUTHKEY - Coverity Connect password/token") logger.error("Optional environment variables:") logger.error(" PROXY_HOST - Proxy hostname (if proxy is required)") logger.error(" PROXY_PORT - Proxy port (if proxy is required)") sys.exit(1) try: logger.info("Starting Coverity Connect MCP Server...") # Create and configure server server = create_server() # Run the server server.run(host=host, port=port) except KeyboardInterrupt: logger.info("Server stopped by user") except Exception as e: logger.error(f"Server error: {e}") sys.exit(1) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keides2/coverity-connect-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server