Argus

"""Repository analysis and security assessment tools.""" from mcp.server.fastmcp import FastMCP, server from mcp.server.fastmcp.resources import types from pydantic import BaseModel import os import subprocess from typing import List, Optional, Union, Dict, Any import tempfile import shutil from pathlib import Path import hashlib import gitdb from git import Repo, GitCommandError import json from enum import Enum import xml.etree.ElementTree as ET import platform import urllib.request import stat from collections import Counter import re from datetime import datetime # def is_libmagic_installed() -> bool: # """Check if libmagic is installed by trying to use it.""" # try: # import magic # magic.Magic() # return True # except: # return False # def ensure_system_dependencies() -> None: # """Ensure system-level dependencies are installed.""" # # Skip if SKIP_SYSTEM_CHECK environment variable is set # if os.environ.get("SKIP_SYSTEM_CHECK"): # return # # Skip if libmagic is already working # if is_libmagic_installed(): # return # system = platform.system().lower() # if system == "darwin": # try: # # Check if Homebrew exists but don't raise error if it doesn't # homebrew_exists = subprocess.run( # ["which", "brew"], # capture_output=True # ).returncode == 0 # if homebrew_exists: # # Only try to install libmagic if Homebrew is available # try: # subprocess.run(["brew", "list", "libmagic"], check=True, capture_output=True) # except subprocess.CalledProcessError: # subprocess.run(["brew", "install", "libmagic"], check=True) # else: # print("WARNING: Homebrew not found. Please install libmagic manually if needed.") # except Exception as e: # print(f"WARNING: Could not install libmagic: {str(e)}") # elif system == "linux": # try: # subprocess.run(["sudo", "apt-get", "update"], check=True) # subprocess.run(["sudo", "apt-get", "install", "-y", "libmagic1"], check=True) # except subprocess.CalledProcessError as e: # print(f"WARNING: Could not install libmagic: {str(e)}") # print("Please install libmagic manually if needed.") def ensure_dependencies() -> None: """Ensure all required tools are installed.""" # First ensure system dependencies # ensure_system_dependencies() # Then check and install PMD if not is_pmd_installed(): install_pmd() # Check and install Trivy if not is_trivy_installed(): install_trivy() def is_pmd_installed() -> bool: """Check if PMD is installed.""" try: subprocess.run(["pmd", "--version"], capture_output=True) return True except FileNotFoundError: return False def is_trivy_installed() -> bool: """Check if Trivy is installed.""" try: subprocess.run(["trivy", "--version"], capture_output=True) return True except FileNotFoundError: return False def install_pmd() -> None: """Install PMD.""" system = platform.system().lower() if system == "darwin": # macOS subprocess.run(["brew", "install", "pmd"], check=True) elif system == "linux": # Download and install PMD pmd_version = "7.0.0-rc4" pmd_url = f"https://github.com/pmd/pmd/releases/download/pmd_releases/{pmd_version}/pmd-bin-{pmd_version}.zip" install_dir = os.path.expanduser("~/.local/bin") # Create install directory if it doesn't exist os.makedirs(install_dir, exist_ok=True) # Download and extract PMD with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp_file: urllib.request.urlretrieve(pmd_url, tmp_file.name) with zipfile.ZipFile(tmp_file.name, 'r') as zip_ref: zip_ref.extractall(install_dir) # Create symlink to PMD script pmd_script = os.path.join(install_dir, f"pmd-bin-{pmd_version}/bin/pmd") pmd_link = os.path.join(install_dir, "pmd") os.chmod(pmd_script, os.stat(pmd_script).st_mode | stat.S_IEXEC) if os.path.exists(pmd_link): os.remove(pmd_link) os.symlink(pmd_script, pmd_link) # Add to PATH if not already there if install_dir not in os.environ['PATH']: with open(os.path.expanduser("~/.bashrc"), "a") as bashrc: bashrc.write(f'\nexport PATH="{install_dir}:$PATH"\n') else: raise Exception(f"Unsupported operating system: {system}") def install_trivy() -> None: """Install Trivy.""" system = platform.system().lower() if system == "darwin": # macOS subprocess.run(["brew", "install", "aquasecurity/trivy/trivy"], check=True) elif system == "linux": # Add Trivy repository and install subprocess.run([ "sudo", "apt-get", "install", "wget", "apt-transport-https", "gnupg", "lsb-release" ], check=True) # Download and add Trivy GPG key subprocess.run([ "wget", "-qO", "-", "https://aquasecurity.github.io/trivy-repo/deb/public.key", "|", "gpg", "--dearmor", "|", "sudo", "tee", "/usr/share/keyrings/trivy.gpg", ">", "/dev/null" ], check=True, shell=True) # Add Trivy repository subprocess.run([ "echo", "deb [signed-by=/usr/share/keyrings/trivy.gpg]", "https://aquasecurity.github.io/trivy-repo/deb", "$(lsb_release -sc)", "main", "|", "sudo", "tee", "/etc/apt/sources.list.d/trivy.list" ], check=True, shell=True) # Update and install Trivy subprocess.run(["sudo", "apt-get", "update"], check=True) subprocess.run(["sudo", "apt-get", "install", "trivy", "-y"], check=True) else: raise Exception(f"Unsupported operating system: {system}") # Initialize dependencies when module loads ensure_dependencies() # Input schemas class GitLabCredentials(BaseModel): api_key: str class AnalyzeRepositoryInput(BaseModel): repo_url: str gitlab_credentials: Optional[GitLabCredentials] = None branch: Optional[str] = None class InspectFilesInput(BaseModel): repo_url: str file_paths: List[str] gitlab_credentials: Optional[GitLabCredentials] = None branch: Optional[str] = None class EnumerateBranchesInput(BaseModel): repo_url: str gitlab_credentials: Optional[GitLabCredentials] = None class DiffInput(BaseModel): repo_url: str source: Optional[str] = None # Branch or commit ID, if None uses current target: Optional[str] = None # Branch or commit ID, if None uses previous commit file_path: Optional[str] = None # Specific file to diff, if None diffs all changes gitlab_credentials: Optional[GitLabCredentials] = None # Add new input schemas class SecurityScanInput(BaseModel): repo_url: str gitlab_credentials: Optional[GitLabCredentials] = None scan_type: Optional[str] = "trivy" # Only trivy option remains branch: Optional[str] = None class CodeQualityInput(BaseModel): repo_url: str language: str # "go" or "java" gitlab_credentials: Optional[GitLabCredentials] = None branch: Optional[str] = None class TeamsWebhookConfig(BaseModel): """Microsoft Teams webhook configuration.""" url: str channel: Optional[str] = None mention_users: Optional[List[str]] = None mcp = FastMCP( "Repository Tools", dependencies=[ "GitPython", "gitdb", "requests", "pylint", "bandit", ], log_level="WARNING" ) LANGUAGE_PATTERNS = { 'go': { 'extensions': {'.go'}, 'markers': {'package ', 'import (', 'func '}, }, 'java': { 'extensions': {'.java', '.kt', '.scala'}, 'markers': {'public class', 'package ', 'import java'}, }, 'python': { 'extensions': {'.py', '.pyx', '.pyi'}, 'markers': {'def ', 'class ', 'import ', 'from '}, }, 'javascript': { 'extensions': {'.js', '.jsx', '.ts', '.tsx'}, 'markers': {'function', 'const ', 'let ', 'import '}, } } ANALYSIS_TOOLS = { 'go': ['gocyclo', 'golangci-lint', 'trivy'], 'java': ['pmd', 'trivy'], 'python': ['pylint', 'bandit', 'trivy'], 'javascript': ['eslint', 'trivy'] } def get_authenticated_url(repo_url: str, gitlab_credentials: Optional[GitLabCredentials] = None) -> str: """Convert repository URL to include authentication if credentials provided.""" if not gitlab_credentials: return repo_url # Handle GitLab URLs if "gitlab.com" in repo_url: if repo_url.startswith("https://"): return repo_url.replace("https://", f"https://oauth2:{gitlab_credentials.api_key}@") elif repo_url.startswith("git@"): return repo_url.replace("git@gitlab.com:", f"https://oauth2:{gitlab_credentials.api_key}@gitlab.com/") return repo_url def clone_repo(repo_url: str, gitlab_credentials: Optional[GitLabCredentials] = None, branch: Optional[str] = None) -> str: """Clone or retrieve an existing repository from cache and return its path.""" # Generate cache directory name based on URL, credentials and branch cache_key = f"{repo_url}:{gitlab_credentials.api_key if gitlab_credentials else ''}:{branch or 'default'}" repo_hash = hashlib.sha256(cache_key.encode()).hexdigest()[:12] temp_dir = os.path.join(tempfile.gettempdir(), f"repo_cache_{repo_hash}") authenticated_url = get_authenticated_url(repo_url, gitlab_credentials) # If directory exists and is a valid git repo, fetch updates if os.path.exists(temp_dir): try: repo = Repo(temp_dir) if not repo.bare and repo.remote().url == authenticated_url: repo.remote().fetch() if branch: repo.git.checkout(branch) return temp_dir # If URLs don't match, clean up and re-clone shutil.rmtree(temp_dir, ignore_errors=True) except: shutil.rmtree(temp_dir, ignore_errors=True) # Create directory and clone repository os.makedirs(temp_dir, exist_ok=True) try: if branch: Repo.clone_from(authenticated_url, temp_dir, branch=branch) else: Repo.clone_from(authenticated_url, temp_dir) return temp_dir except Exception as e: shutil.rmtree(temp_dir, ignore_errors=True) raise Exception(f"Repository cloning failed: {str(e)}") def get_directory_tree(path: str, prefix: str = "") -> str: """Generate a tree-like directory structure string""" output = "" entries = os.listdir(path) entries.sort() for i, entry in enumerate(entries): if entry.startswith('.git'): continue is_last = i == len(entries) - 1 current_prefix = "└── " if is_last else "├── " next_prefix = " " if is_last else "│ " entry_path = os.path.join(path, entry) output += prefix + current_prefix + entry + "\n" if os.path.isdir(entry_path): output += get_directory_tree(entry_path, prefix + next_prefix) return output def create_gitlab_credentials(creds: Optional[Union[str, dict]]) -> Optional[GitLabCredentials]: """Convert various credential formats to GitLabCredentials.""" if not creds: return None if isinstance(creds, str): return GitLabCredentials(api_key=creds) if isinstance(creds, dict): return GitLabCredentials(**creds) return None def get_diff_changes(repo_path: str, source: Optional[str], target: Optional[str], file_path: Optional[str] = None) -> str: """Get diff between two commits/branches.""" try: repo = Repo(repo_path) # Handle source if source: source_commit = repo.commit(source) else: source_commit = repo.head.commit # Handle target if target: target_commit = repo.commit(target) else: target_commit = source_commit.parents[0] if source_commit.parents else None if not target_commit: return "No previous commit found to compare with." # Generate diff if file_path: diff = repo.git.diff(target_commit, source_commit, '--', file_path) else: diff = repo.git.diff(target_commit, source_commit) return diff if diff else "No changes found." except GitCommandError as e: return f"Git diff failed: {str(e)}" except Exception as e: return f"Error generating diff: {str(e)}" def run_trivy_scan(repo_path: str) -> Dict[str, Any]: """Run Trivy vulnerability scanner on repository.""" try: result = subprocess.run( ["trivy", "fs", "--format", "json", repo_path], capture_output=True, text=True, check=True ) return json.loads(result.stdout) except subprocess.CalledProcessError as e: return {"error": f"Trivy scan failed: {e.stderr}"} except json.JSONDecodeError: return {"error": "Failed to parse Trivy output"} except FileNotFoundError: return {"error": "Trivy not installed. Please install Trivy first."} def run_gocyclo_analysis(repo_path: str) -> Dict[str, Any]: """Run cyclomatic complexity analysis on Go code.""" try: result = subprocess.run( ["gocyclo", "-avg", "-over=10", "."], cwd=repo_path, capture_output=True, text=True ) metrics = { "cyclomatic_complexity": [], "average_complexity": 0, "high_complexity_functions": 0 } if result.stdout: lines = result.stdout.strip().split('\n') total_complexity = 0 for line in lines: if line: parts = line.split() if len(parts) >= 4: complexity = int(parts[0]) function_name = parts[-1] file_path = parts[-2] metrics["cyclomatic_complexity"].append({ "complexity": complexity, "function": function_name, "file": file_path }) total_complexity += complexity if complexity > 10: metrics["high_complexity_functions"] += 1 if len(lines) > 0: metrics["average_complexity"] = total_complexity / len(lines) return metrics except FileNotFoundError: return {"error": "gocyclo not installed. Please install with: go install github.com/fzipp/gocyclo/cmd/gocyclo@latest"} except Exception as e: return {"error": f"Failed to run gocyclo: {str(e)}"} def run_pmd_analysis(repo_path: str) -> Dict[str, Any]: """Run PMD static code analysis on Java code and return raw output.""" try: # Create temporary file for PMD output with tempfile.NamedTemporaryFile(suffix='.xml', delete=False) as tmp_file: output_path = tmp_file.name # Run PMD with output to file result = subprocess.run( [ "pmd", "check", "-d", repo_path, "-R", "rulesets/java/quickstart.xml", "-f", "xml", "-r", output_path ], capture_output=True, text=True ) try: with open(output_path, 'r') as f: xml_content = f.read() return {"raw_output": xml_content} finally: os.unlink(output_path) except FileNotFoundError: return {"error": "PMD not installed. Please install PMD from https://pmd.github.io/"} except Exception as e: return {"error": f"Failed to run PMD: {str(e)}"} def detect_repository_languages(repo_path: str) -> Dict[str, float]: """ Detect programming languages used in the repository. Returns a dictionary of language -> confidence score (0-1). """ file_count = Counter() language_confidence = {} total_files = 0 for root, _, files in os.walk(repo_path): if '.git' in root: continue for file in files: file_path = os.path.join(root, file) _, ext = os.path.splitext(file_path) # Skip files larger than 1MB if os.path.getsize(file_path) > 1_000_000: continue # Check file extension for lang, patterns in LANGUAGE_PATTERNS.items(): if ext in patterns['extensions']: try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read(1024) # Read first 1KB marker_count = sum(1 for marker in patterns['markers'] if marker in content) if marker_count > 0: file_count[lang] += 1 total_files += 1 except: continue # Calculate confidence scores if total_files > 0: for lang, count in file_count.items(): language_confidence[lang] = count / total_files return language_confidence def get_analysis_tools(languages: Dict[str, float], min_confidence: float = 0.1) -> List[str]: """Get appropriate analysis tools based on detected languages.""" selected_tools = set() for lang, confidence in languages.items(): if confidence >= min_confidence: selected_tools.update(ANALYSIS_TOOLS.get(lang, [])) return list(selected_tools) def run_language_specific_analysis(repo_path: str, language: str) -> Dict[str, Any]: """Run language-specific analysis tools.""" results = {} if language == 'go': results['gocyclo'] = run_gocyclo_analysis(repo_path) # Add golangci-lint results here if implemented elif language == 'java': results['pmd'] = run_pmd_analysis(repo_path) elif language == 'python': # Implement Python-specific analysis results['pylint'] = run_pylint_analysis(repo_path) results['bandit'] = run_bandit_analysis(repo_path) elif language == 'javascript': # Implement JavaScript-specific analysis results['eslint'] = run_eslint_analysis(repo_path) return results def run_pylint_analysis(repo_path: str) -> Dict[str, Any]: """Run Pylint analysis on Python code.""" try: result = subprocess.run( ["pylint", "--output-format=json", repo_path], capture_output=True, text=True ) return json.loads(result.stdout) if result.stdout else {"error": "No output from pylint"} except Exception as e: return {"error": f"Pylint analysis failed: {str(e)}"} def run_bandit_analysis(repo_path: str) -> Dict[str, Any]: """Run Bandit security analysis on Python code.""" try: result = subprocess.run( ["bandit", "-r", "-f", "json", repo_path], capture_output=True, text=True ) return json.loads(result.stdout) if result.stdout else {"error": "No output from bandit"} except Exception as e: return {"error": f"Bandit analysis failed: {str(e)}"} def run_eslint_analysis(repo_path: str) -> Dict[str, Any]: """Run ESLint analysis on JavaScript/TypeScript code.""" try: result = subprocess.run( ["eslint", "-f", "json", repo_path], capture_output=True, text=True ) return json.loads(result.stdout) if result.stdout else {"error": "No output from eslint"} except Exception as e: return {"error": f"ESLint analysis failed: {str(e)}"} def format_trivy_results(scan_results: Dict[str, Any]) -> Dict[str, Any]: """Format Trivy scan results for Teams message.""" vulnerabilities = scan_results.get("vulnerabilities", []) return { "Total Vulnerabilities": len(vulnerabilities), "Critical": sum(1 for v in vulnerabilities if v.get("severity") == "CRITICAL"), "High": sum(1 for v in vulnerabilities if v.get("severity") == "HIGH"), "Medium": sum(1 for v in vulnerabilities if v.get("severity") == "MEDIUM"), "Low": sum(1 for v in vulnerabilities if v.get("severity") == "LOW") } @mcp.tool() def analyze_repository_structure(*, repo_url: str, gitlab_credentials: Optional[Union[str, dict]] = None, branch: Optional[str] = None) -> str: """ Generate a tree representation of a repository's file structure. Args: repo_url: Repository URL to analyze gitlab_credentials: Optional GitLab token string or credentials dict branch: Optional branch name to clone """ try: creds = create_gitlab_credentials(gitlab_credentials) repo_path = clone_repo(repo_url, creds, branch) tree = get_directory_tree(repo_path) return tree except Exception as e: return f"Repository analysis failed: {str(e)}" @mcp.tool() def inspect_repository_files(*, repo_url: str, file_paths: List[str], gitlab_credentials: Optional[Union[str, dict]] = None, branch: Optional[str] = None) -> dict[str, str]: """Extract and return contents of specified repository files.""" # Log the input arguments print(f"inspect_repository_files called with repo_url={repo_url}, file_paths={file_paths}, gitlab_credentials={gitlab_credentials}") try: creds = create_gitlab_credentials(gitlab_credentials) repo_path = clone_repo(repo_url, creds, branch) results = {} for file_path in file_paths: full_path = os.path.join(repo_path, file_path) # Check if file exists if not os.path.isfile(full_path): results[file_path] = f"Error: File not found" continue try: with open(full_path, 'r', encoding='utf-8') as f: results[file_path] = f.read() except Exception as e: results[file_path] = f"Error reading file: {str(e)}" return results except Exception as e: return {"error": f"Repository inspection failed: {str(e)}"} @mcp.tool() def enumerate_branches(*, repo_url: str, gitlab_credentials: Optional[Union[str, dict]] = None) -> List[str]: """Retrieve all branch names from a repository.""" try: creds = create_gitlab_credentials(gitlab_credentials) repo_path = clone_repo(repo_url, creds) repo = Repo(repo_path) # Get list of branches branches = [branch.name for branch in repo.branches] return branches except Exception as e: return [f"Branch enumeration failed: {str(e)}"] @mcp.tool() def compare_git_changes(*, repo_url: str, source: Optional[str] = None, target: Optional[str] = None, file_path: Optional[str] = None, gitlab_credentials: Optional[Union[str, dict]] = None ) -> str: """ Compare changes between git commits or branches. Args: repo_url: Repository URL source: Source branch/commit (default: current HEAD) target: Target branch/commit (default: previous commit) file_path: Specific file to compare (optional) gitlab_credentials: Optional GitLab credentials """ try: creds = create_gitlab_credentials(gitlab_credentials) repo_path = clone_repo(repo_url, creds) return get_diff_changes(repo_path, source, target, file_path) except Exception as e: return f"Comparison failed: {str(e)}" @mcp.tool() def get_commit_history(*, repo_url: str, branch: Optional[str] = None, max_count: int = 10, gitlab_credentials: Optional[Union[str, dict]] = None ) -> List[dict]: """ Get commit history for a repository branch. Args: repo_url: Repository URL branch: Branch name (default: current branch) max_count: Maximum number of commits to return gitlab_credentials: Optional GitLab credentials """ try: creds = create_gitlab_credentials(gitlab_credentials) repo_path = clone_repo(repo_url, creds, branch) repo = Repo(repo_path) if (branch): repo.git.checkout(branch) commits = [] for commit in repo.iter_commits(max_count=max_count): commits.append({ 'hash': commit.hexsha, 'author': f"{commit.author.name} <{commit.author.email}>", 'date': commit.committed_datetime.isoformat(), 'message': commit.message.strip() }) return commits except Exception as e: return [{'error': f"Failed to get commit history: {str(e)}"}] @mcp.tool() def security_scan_repository(*, repo_url: str, scan_type: str = "trivy", gitlab_credentials: Optional[Union[str, dict]] = None, branch: Optional[str] = None ) -> Dict[str, Any]: """Perform security scanning on a repository using Trivy.""" try: creds = create_gitlab_credentials(gitlab_credentials) repo_path = clone_repo(repo_url, creds, branch) if scan_type != "trivy": return {"error": "Only Trivy scanning is supported"} scan_results = run_trivy_scan(repo_path) return { "trivy_scan": scan_results, "summary": format_trivy_results(scan_results) } except Exception as e: return {"error": f"Security scan failed: {str(e)}"} @mcp.tool() def fetch_all_branches(*, repo_url: str, gitlab_credentials: Optional[Union[str, dict]] = None) -> Dict[str, Any]: """ Fetch all branches from a repository and ensure they are up to date. Args: repo_url: Repository URL to fetch from gitlab_credentials: Optional GitLab credentials Returns: Dictionary containing branch information or error """ try: creds = create_gitlab_credentials(gitlab_credentials) repo_path = clone_repo(repo_url, creds) repo = Repo(repo_path) # Fetch all remotes and their branches for remote in repo.remotes: remote.fetch() # Get all branches (both local and remote) branches = { "local": [branch.name for branch in repo.heads], "remote": [ref.name for ref in repo.remote().refs if not ref.name.endswith('/HEAD')], "current": repo.active_branch.name } return { "status": "success", "branches": branches } except Exception as e: return { "status": "error", "error": f"Failed to fetch branches: {str(e)}" } @mcp.prompt() def analyze_pmd_violations(pmd_output: str) -> str: return f"""As a seasoned software engineer, meticulously review the code quality analysis results generated by PMD. Craft a comprehensive, professional, and technically detailed summary that encompasses the following aspects: 1. Provide an overall assessment of the codebase's health, highlighting the general adherence to coding standards and best practices. 2. Identify and emphasize the most critical issues that demand immediate attention, detailing their potential impact on system stability, performance, and security. 3. Analyze and describe any recurring patterns or trends in the issues found, offering insights into common pitfalls or areas of weakness in the code. 4. Offer specific, actionable recommendations for improvement, suggesting concrete steps to remediate identified issues and enhance code quality. 5. Prioritize the issues in a logical order for addressing them, balancing urgency, impact, and effort required for resolution. PMD output: {pmd_output}""" @mcp.tool() def analyze_code_quality(*, repo_url: str, language: Optional[str] = None, gitlab_credentials: Optional[Union[str, dict]] = None, branch: Optional[str] = None ) -> Dict[str, Any]: """ Analyze code quality with automatic language detection and tool selection. Args: repo_url: Repository URL language: Optional language override gitlab_credentials: Optional GitLab credentials branch: Optional branch name to clone Returns: Dictionary containing analysis results """ try: creds = create_gitlab_credentials(gitlab_credentials) repo_path = clone_repo(repo_url, creds, branch) # Detect languages if not specified if not language: detected_languages = detect_repository_languages(repo_path) if not detected_languages: return { "status": "error", "error": "No supported programming languages detected" } else: detected_languages = {language.lower(): 1.0} # Get appropriate analysis tools tools = get_analysis_tools(detected_languages) if not tools: return { "status": "error", "error": "No suitable analysis tools found for detected languages" } # Run analysis for each detected language results = { "status": "success", "languages": detected_languages, "analysis": {} } for lang, confidence in detected_languages.items(): if confidence >= 0.1: # Only analyze if confidence is above 10% results["analysis"][lang] = run_language_specific_analysis(repo_path, lang) # Always run Trivy for security scanning results["security_scan"] = run_trivy_scan(repo_path) return results except Exception as e: return { "status": "error", "error": f"Code quality analysis failed: {str(e)}" }