Skip to main content
Glama
ZeroPathAI

ZeroPath MCP Server

Official
by ZeroPathAI
server.py33.9 kB
""" ZeroPath MCP Server Provides tools for interacting with ZeroPath security findings via the MCP protocol. """ import json import os import requests from mcp.server.fastmcp import FastMCP # Environment variables token_id = os.getenv("ZEROPATH_TOKEN_ID") token_secret = os.getenv("ZEROPATH_TOKEN_SECRET") org_id = os.getenv("ZEROPATH_ORG_ID") # API base URL API_BASE_URL = "https://zeropath.com/api/v1" def make_api_request(endpoint, payload=None, include_org=True): """Make authenticated API request to ZeroPath.""" if not token_id or not token_secret: return None, "Error: Zeropath API credentials not found in environment variables" headers = { "X-ZeroPath-API-Token-Id": token_id, "X-ZeroPath-API-Token-Secret": token_secret, "Content-Type": "application/json" } if payload is None: payload = {} if include_org and org_id: payload["organizationId"] = org_id try: response = requests.post( f"{API_BASE_URL}/{endpoint}", headers=headers, json=payload ) return response, None except Exception as e: return None, f"Error: {str(e)}" def handle_response(response, error, success_processor=None): """Handle API response with standard error checking.""" if error: return error if response.status_code == 200: if success_processor: return success_processor(response.json()) return response.json() elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" # Check if required environment variables are set if not token_id or not token_secret or not org_id: missing_vars = [] if not token_id: missing_vars.append("ZEROPATH_TOKEN_ID") if not token_secret: missing_vars.append("ZEROPATH_TOKEN_SECRET") if not org_id: missing_vars.append("ZEROPATH_ORG_ID") raise EnvironmentError(f"Missing required environment variables: {', '.join(missing_vars)}") mcp = FastMCP("Zeropath") # ============================================================================= # NOTE: This MCP server requires an admin/organization API key from ZeroPath. # Regular user API keys will not have access to most of these endpoints. # Generate an org API key at: https://zeropath.com/settings/api-keys # ============================================================================= @mcp.tool() def search_vulnerabilities(search_query=None): """ Search for vulnerabilities using the Zeropath API with a simple search query. """ payload = {} if search_query: payload["searchQuery"] = search_query response, error = make_api_request("issues/search", payload) if error: return error if response.status_code == 200: return process_vulnerability_response(response.json()) elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" @mcp.tool() def get_issue(issue_id): """ Get a specific vulnerability issue by its ID, including patch information if available. Args: issue_id (str): The ID of the issue to retrieve """ if not issue_id: return "Error: Issue ID is required" response, error = make_api_request("issues/get", {"issueId": issue_id}) if error: return error if response.status_code == 200: raw_response = response.json() if not raw_response: return "Error: Empty response received from API" return process_issue_response(raw_response) elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" @mcp.tool() def approve_patch(issue_id): """ Approve a patch for a specific vulnerability issue. Args: issue_id (str): The ID of the issue whose patch should be approved """ if not issue_id: return "Error: Issue ID is required" response, error = make_api_request("issues/approve-patch", {"issueId": issue_id}) if error: return error if response.status_code == 200: return "Patch approved successfully" elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" # ============================================================================= # BUG TRIAGE TOOLS # ============================================================================= @mcp.tool() def mark_true_positive(issue_id: str) -> str: """ Mark a security issue as a true positive (confirmed vulnerability). Args: issue_id: The ID of the issue to mark as true positive """ if not issue_id: return "Error: Issue ID is required" response, error = make_api_request( "issues/mark-true-positive", {"issueId": issue_id} ) if error: return error if response.status_code == 200: return f"Issue {issue_id} marked as true positive successfully" elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" @mcp.tool() def mark_false_positive(issue_id: str) -> str: """ Mark a security issue as a false positive (not a real vulnerability). Args: issue_id: The ID of the issue to mark as false positive """ if not issue_id: return "Error: Issue ID is required" response, error = make_api_request( "issues/mark-false-positive", {"issueId": issue_id} ) if error: return error if response.status_code == 200: return f"Issue {issue_id} marked as false positive successfully" elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" @mcp.tool() def archive_issue(issue_id: str) -> str: """ Archive a security issue to remove it from active view. Args: issue_id: The ID of the issue to archive """ if not issue_id: return "Error: Issue ID is required" response, error = make_api_request( "issues/archive", {"issueId": issue_id} ) if error: return error if response.status_code == 200: return f"Issue {issue_id} archived successfully" elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" @mcp.tool() def unarchive_issue(issue_id: str) -> str: """ Unarchive a previously archived security issue to restore it to active view. Args: issue_id: The ID of the issue to unarchive """ if not issue_id: return "Error: Issue ID is required" response, error = make_api_request( "issues/unarchive", {"issueId": issue_id} ) if error: return error if response.status_code == 200: return f"Issue {issue_id} unarchived successfully" elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" @mcp.tool() def generate_patch(issue_id: str) -> str: """ Generate an automated patch/fix for a security issue. Args: issue_id: The ID of the issue to generate a patch for """ if not issue_id: return "Error: Issue ID is required" response, error = make_api_request( "issues/generate-patch", {"issueId": issue_id} ) if error: return error if response.status_code == 200: result = response.json() if result.get("patch"): return f"Patch generated successfully for issue {issue_id}. Use get_issue({issue_id}) to view the patch details." return f"Patch generation initiated for issue {issue_id}. This may take a moment." elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" # ============================================================================= # SCAN MANAGEMENT TOOLS # ============================================================================= @mcp.tool() def start_scan(repository_ids: list[str] = None) -> str: """ Start a new security scan on one or more repositories. Args: repository_ids: List of repository IDs to scan """ if not repository_ids: return "Error: At least one repository ID is required" # Handle case where repository_ids might be passed as a JSON string if isinstance(repository_ids, str): try: repository_ids = json.loads(repository_ids) except json.JSONDecodeError: # If it's a single ID as a string, wrap it in a list repository_ids = [repository_ids] response, error = make_api_request( "scans/start", {"repositoryIds": repository_ids} ) if error: return error if response.status_code == 200: result = response.json() scan_id = result.get("scanId", result.get("id", "unknown")) return f"Scan started successfully. Scan ID: {scan_id}" elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" @mcp.tool() def list_scans( search_query: str = None, repository_ids: list[str] = None, scan_type: str = None, page: int = 1, page_size: int = 10 ) -> str: """ List security scans with optional filtering and pagination. Args: search_query: Optional search term to filter scans repository_ids: Optional list of repository IDs to filter by scan_type: Optional scan type filter (FullScan, PrScan, SCAScan) page: Page number (default: 1) page_size: Number of results per page (default: 10) """ payload = { "page": page, "pageSize": page_size } if search_query: payload["searchQuery"] = search_query if repository_ids: payload["repositoryIds"] = repository_ids if scan_type: valid_types = ["FullScan", "PrScan", "SCAScan"] if scan_type not in valid_types: return f"Error: Invalid scan type. Must be one of: {', '.join(valid_types)}" payload["scanType"] = scan_type response, error = make_api_request("scans/list", payload) if error: return error if response.status_code == 200: return process_scans_response(response.json()) elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" def process_scans_response(raw_response): """Process scans list response into readable format.""" if "error" in raw_response: return f"Error: {raw_response['error']}" scans = raw_response.get("scans", raw_response.get("items", [])) if not scans: return "No scans found." total_count = raw_response.get("totalCount", len(scans)) result = f"Found {total_count} scan(s).\n\n" for i, scan in enumerate(scans, 1): result += f"Scan {i}:\n" result += f" ID: {scan.get('id', 'N/A')}\n" result += f" Status: {scan.get('status', 'N/A')}\n" result += f" Type: {scan.get('scanType', 'N/A')}\n" result += f" Repository: {scan.get('repositoryName', scan.get('repositoryId', 'N/A'))}\n" result += f" Branch: {scan.get('branch', 'N/A')}\n" result += f" Created: {scan.get('createdAt', 'N/A')}\n" result += f" Updated: {scan.get('updatedAt', 'N/A')}\n" # Issue counts if available if scan.get('openIssues') is not None: result += f" Open Issues: {scan.get('openIssues', 0)}\n" if scan.get('patchedIssues') is not None: result += f" Patched Issues: {scan.get('patchedIssues', 0)}\n" if scan.get('falsePositiveIssues') is not None: result += f" False Positives: {scan.get('falsePositiveIssues', 0)}\n" result += "\n" # Pagination info if "page" in raw_response or "currentPage" in raw_response: result += f"Page: {raw_response.get('page', raw_response.get('currentPage', 1))}\n" result += f"Page Size: {raw_response.get('pageSize', len(scans))}\n" return result # ============================================================================= # REPOSITORY MANAGEMENT TOOLS # ============================================================================= @mcp.tool() def list_repositories(search_query: str = None) -> str: """ List all repositories in the organization. Args: search_query: Optional search term to filter repositories """ payload = {} if search_query: payload["searchQuery"] = search_query response, error = make_api_request("repositories/list", payload) if error: return error if response.status_code == 200: return process_repositories_response(response.json()) elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" def process_repositories_response(raw_response): """Process repositories list response into readable format.""" # Handle list response directly if isinstance(raw_response, list): repos = raw_response else: if "error" in raw_response: return f"Error: {raw_response['error']}" repos = raw_response.get("repositories", raw_response.get("items", [])) if not repos: return "No repositories found." total_count = len(repos) result = f"Found {total_count} repository(ies).\n\n" for i, repo in enumerate(repos, 1): result += f"Repository {i}:\n" result += f" ID: {repo.get('id', 'N/A')}\n" result += f" Name: {repo.get('name', 'N/A')}\n" result += f" Full Name: {repo.get('fullName', repo.get('full_name', 'N/A'))}\n" result += f" Provider: {repo.get('provider', 'N/A')}\n" result += f" Default Branch: {repo.get('defaultBranch', repo.get('scanBranch', 'N/A'))}\n" result += f" PR Scanning: {repo.get('prScanningEnabled', 'N/A')}\n" result += f" Last Scanned: {repo.get('lastScannedAt', 'N/A')}\n" result += "\n" return result # ============================================================================= # STATS & ANALYTICS TOOLS # ============================================================================= @mcp.tool() def get_security_posture() -> str: """ Get the overall security posture metrics for the organization. Returns security score, vulnerability trends, and risk assessment. """ response, error = make_api_request("stats/securityPosture") if error: return error if response.status_code == 200: return process_stats_response(response.json(), "Security Posture") elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" @mcp.tool() def get_issues_by_vuln_class() -> str: """ Get issue statistics grouped by vulnerability class/type. Shows distribution of vulnerabilities by category (XSS, SQLi, etc.). """ response, error = make_api_request("stats/issuesByVulnClass") if error: return error if response.status_code == 200: return process_stats_response(response.json(), "Issues by Vulnerability Class") elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" @mcp.tool() def get_summary_statistics() -> str: """ Get aggregated summary statistics across the organization. Includes total issues, patches, repositories, and key metrics. """ response, error = make_api_request("stats/summary") if error: return error if response.status_code == 200: return process_stats_response(response.json(), "Summary Statistics") elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" def process_stats_response(raw_response, title): """Process stats response into readable format.""" # Handle list response directly if isinstance(raw_response, list): result = f"=== {title} ===\n\n" for i, item in enumerate(raw_response, 1): if isinstance(item, dict): result += f"Item {i}:\n" for k, v in item.items(): formatted_key = ''.join(' ' + c if c.isupper() else c for c in str(k)).strip().title() result += f" {formatted_key}: {v}\n" result += "\n" else: result += f" - {item}\n" return result if isinstance(raw_response, dict) and "error" in raw_response: return f"Error: {raw_response['error']}" result = f"=== {title} ===\n\n" def format_value(key, value, indent=0): """Format a key-value pair with proper indentation.""" prefix = " " * indent if isinstance(value, dict): output = f"{prefix}{key}:\n" for k, v in value.items(): output += format_value(k, v, indent + 1) return output elif isinstance(value, list): output = f"{prefix}{key}:\n" for i, item in enumerate(value): if isinstance(item, dict): output += f"{prefix} Item {i + 1}:\n" for k, v in item.items(): output += format_value(k, v, indent + 2) else: output += f"{prefix} - {item}\n" return output else: # Format the key nicely (camelCase to Title Case) formatted_key = ''.join(' ' + c if c.isupper() else c for c in key).strip().title() return f"{prefix}{formatted_key}: {value}\n" for key, value in raw_response.items(): result += format_value(key, value) return result # ============================================================================= # SCA (SOFTWARE COMPOSITION ANALYSIS) TOOLS # ============================================================================= @mcp.tool() def list_sca_vulnerabilities( search_query: str = None, repository_ids: list[str] = None, ecosystems: list[str] = None, transitivity: str = None, page: int = 1, page_size: int = 50 ) -> str: """ Search for SCA (Software Composition Analysis) vulnerabilities in dependencies. Args: search_query: Optional search term to filter vulnerabilities repository_ids: Optional list of repository IDs to filter by ecosystems: Optional list of ecosystems to filter (npm, pip, maven, etc.) transitivity: Optional filter by dependency type (direct, transitive) page: Page number (default: 1) page_size: Number of results per page (default: 50) """ payload = { "page": page, "pageSize": page_size } if search_query: payload["searchQuery"] = search_query if repository_ids: payload["repositoryIds"] = repository_ids if ecosystems: payload["ecosystems"] = ecosystems if transitivity: if transitivity not in ["direct", "transitive"]: return "Error: transitivity must be 'direct' or 'transitive'" payload["transitivity"] = transitivity response, error = make_api_request("sca/vulnerabilities/search", payload) if error: return error if response.status_code == 200: return process_sca_vulnerabilities_response(response.json()) elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" @mcp.tool() def get_sca_vulnerability(vulnerability_id: str) -> str: """ Get detailed information about a specific SCA vulnerability. Args: vulnerability_id: The ID of the vulnerability to retrieve """ if not vulnerability_id: return "Error: Vulnerability ID is required" response, error = make_api_request( "sca/vulnerabilities/get", {"id": vulnerability_id} ) if error: return error if response.status_code == 200: vuln = response.json() output = "SCA Vulnerability Details:\n\n" output += f"ID: {vuln.get('id', 'N/A')}\n" output += f"Package: {vuln.get('packageName', 'N/A')}\n" output += f"Version: {vuln.get('version', 'N/A')}\n" output += f"Ecosystem: {vuln.get('ecosystem', 'N/A')}\n" output += f"Severity: {vuln.get('severity', 'N/A')}\n" output += f"CVSS Score: {vuln.get('cvssScore', vuln.get('severityScore', 'N/A'))}\n" # Advisory info if vuln.get('aliases'): output += f"Aliases: {', '.join(vuln['aliases'])}\n" if vuln.get('cve'): output += f"CVE: {vuln['cve']}\n" output += f"\nSummary: {vuln.get('summary', 'N/A')}\n" output += f"\nDescription: {vuln.get('description', 'N/A')}\n" # Fix info if vuln.get('fixedVersion'): output += f"\nFixed in Version: {vuln['fixedVersion']}\n" if vuln.get('references'): output += "\nReferences:\n" for ref in vuln['references'][:5]: # Limit to 5 references output += f" - {ref}\n" return output elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" @mcp.tool() def list_sca_repositories() -> str: """ List repositories with their aggregated dependency inventory information. Shows which repositories have been analyzed for dependencies. """ response, error = make_api_request("sca/repositories/search") if error: return error if response.status_code == 200: result = response.json() repos = result.get("repositories", result if isinstance(result, list) else []) if not repos: return "No repositories with SCA data found." output = f"Found {len(repos)} repository(ies) with SCA data:\n\n" for i, repo in enumerate(repos, 1): output += f"Repository {i}:\n" output += f" ID: {repo.get('id', repo.get('repositoryId', 'N/A'))}\n" output += f" Name: {repo.get('name', repo.get('repositoryName', 'N/A'))}\n" output += f" Total Dependencies: {repo.get('totalDependencies', repo.get('dependencyCount', 'N/A'))}\n" output += f" Vulnerable Packages: {repo.get('vulnerablePackages', 'N/A')}\n" output += f" Critical: {repo.get('criticalCount', 'N/A')}\n" output += f" High: {repo.get('highCount', 'N/A')}\n" output += f" Medium: {repo.get('mediumCount', 'N/A')}\n" output += f" Low: {repo.get('lowCount', 'N/A')}\n" output += "\n" return output elif response.status_code == 401: return "Error: Unauthorized - check API credentials" elif response.status_code == 400: return f"Error: Bad request - {response.text}" else: return f"Error: API returned status {response.status_code}: {response.text}" def process_sca_vulnerabilities_response(raw_response): """Process SCA vulnerabilities search response into readable format.""" if "error" in raw_response: return f"Error: {raw_response['error']}" vulns = raw_response.get("vulnerabilities", raw_response.get("items", [])) if not vulns: return "No SCA vulnerabilities found." total_count = raw_response.get("totalCount", len(vulns)) result = f"Found {total_count} SCA vulnerability(ies).\n\n" for i, vuln in enumerate(vulns, 1): result += f"Vulnerability {i}:\n" result += f" ID: {vuln.get('id', 'N/A')}\n" # Package info pkg = vuln.get('package', {}) if pkg: result += f" Package: {pkg.get('name', 'N/A')} @ {pkg.get('version', 'N/A')}\n" result += f" Ecosystem: {pkg.get('ecosystem', 'N/A')}\n" result += f" Manifest: {pkg.get('manifestPath', 'N/A')}\n" else: result += f" Package: {vuln.get('packageName', 'N/A')}\n" # Metadata meta = vuln.get('metadata', {}) if meta: result += f" Severity: {meta.get('severity', 'N/A')}\n" result += f" Score: {meta.get('severityScore', 'N/A')}\n" result += f" Summary: {meta.get('summary', 'N/A')}\n" if meta.get('aliases'): result += f" Aliases: {', '.join(meta['aliases'][:3])}\n" else: result += f" Severity: {vuln.get('severity', 'N/A')}\n" result += f" Repository: {vuln.get('repositoryId', 'N/A')}\n" result += f" Branch: {vuln.get('branch', 'N/A')}\n" result += "\n" # Pagination info if "page" in raw_response: result += f"Page: {raw_response.get('page', 1)} | " result += f"Page Size: {raw_response.get('pageSize', len(vulns))} | " result += f"Total: {total_count}\n" return result def process_issue_response(issue): """ Process a single issue response into a readable format, focusing on the issue details and patch. """ if not issue: return "Error: Empty issue data" if "error" in issue and issue["error"]: return f"Error: {issue['error']}" # Check if we have a valid issue (must have an id at minimum) if not issue.get('id'): return "Error: Invalid issue data received - missing ID" # Get patch information if available patch = issue.get("patch") or issue.get("vulnerabilityPatch") result = "Issue Details:\n" result += f"ID: {issue.get('id', 'N/A')}\n" result += f"Status: {issue.get('status', 'N/A')}\n" result += f"Title: {issue.get('generatedTitle', 'N/A')}\n" result += f"Description: {issue.get('generatedDescription', 'N/A')}\n" result += f"Language: {issue.get('language', 'N/A')}\n" result += f"Vulnerability Class: {issue.get('vulnClass', 'N/A')}\n" if issue.get("cwes"): result += f"CWEs: {', '.join(issue.get('cwes', []))}\n" result += f"Severity: {issue.get('severity', 'N/A')}\n" result += f"Affected File: {issue.get('affectedFile', 'N/A')}\n" if issue.get("startLine") and issue.get("endLine"): result += f"Location: Lines {issue.get('startLine')} to {issue.get('endLine')}\n" result += f"Validation Status: {issue.get('validated', 'N/A')}\n" result += f"Unpatchable: {issue.get('unpatchable', False)}\n" result += f"Triage Phase: {issue.get('triagePhase', 'N/A')}\n" # Add code segment if available if issue.get("sastCodeSegment"): result += "\nVulnerable Code Segment:\n" result += f"```\n{issue.get('sastCodeSegment')}\n```\n" # Add patch information if available if patch and not issue.get("unpatchable", False): result += "\n========== PATCH INFORMATION ==========\n" result += f"PR Link: {patch.get('prLink', 'N/A')}\n" result += f"PR Title: {patch.get('prTitle', 'N/A')}\n" result += f"PR Description: {patch.get('prDescription', 'N/A')}\n" result += f"PR Status: {patch.get('pullRequestStatus', 'N/A')}\n" result += f"Validation Status: {patch.get('validated', 'N/A')}\n" result += f"Created At: {patch.get('createdAt', 'N/A')}\n" result += f"Updated At: {patch.get('updatedAt', 'N/A')}\n" # Add git diff if available if patch.get("gitDiff"): result += "\n========== PATCH ID & GIT DIFF ==========\n" result += f"PATCH ID: {patch.get('id', 'N/A')}\n" result += "========================================\n" result += "Git Diff:\n" result += f"```diff\n{patch.get('gitDiff')}\n```\n" return result def process_vulnerability_response(raw_response): """ Process the raw API response into a more readable format for LLMs. Extracts and organizes the most relevant information in plain text format. """ if "error" in raw_response: return f"Error: {raw_response['error']}" if "issues" not in raw_response: return "No vulnerability issues found in the response." # Count totals and categorize issues total_issues = len(raw_response["issues"]) patchable_count = sum(1 for issue in raw_response["issues"] if not issue.get("unpatchable", False)) unpatchable_count = sum(1 for issue in raw_response["issues"] if issue.get("unpatchable", True)) # Build a formatted text response result = f"Found {total_issues} vulnerability issues. {patchable_count} are patchable, {unpatchable_count} are unpatchable.\n\n" # Process each issue for i, issue in enumerate(raw_response["issues"], 1): result += f"Issue {i}:\n" result += f"ID: {issue.get('id')}\n" result += f"Status: {issue.get('status', 'unknown')}\n" # Include all fields that exist if issue.get("type"): result += f"Type: {issue.get('type')}\n" if issue.get("patchable") is not None: patchable = not issue.get("unpatchable", False) result += f"Patchable: {patchable}\n" if issue.get("language"): result += f"Language: {issue['language']}\n" if issue.get("score") is not None: result += f"Score: {issue['score']}\n" if issue.get("severity") is not None: result += f"Severity: {issue['severity']}\n" if issue.get("generatedTitle"): result += f"Title: {issue['generatedTitle']}\n" if issue.get("generatedDescription"): result += f"Description: {issue['generatedDescription']}\n" if issue.get("affectedFile"): result += f"Affected File: {issue['affectedFile']}\n" if issue.get("cwes"): result += f"CWEs: {', '.join(issue['cwes'])}\n" if issue.get("validated"): result += f"Validation Status: {issue['validated']}\n" if issue.get("triagePhase"): result += f"Triage Phase: {issue['triagePhase']}\n" # Add patch information if available if issue.get("vulnerabilityPatch") and not issue.get("unpatchable", False): patch = issue["vulnerabilityPatch"] result += "\n--- PATCH INFORMATION ---\n" result += f"PATCH ID: {patch.get('id', 'N/A')}\n" result += "------------------------\n" result += "Has Patch: Yes\n" if patch.get("pullRequestStatus"): result += f"Patch Status: {patch['pullRequestStatus']}\n" # Add extra space between issues result += "\n" # Include pagination info if available if "currentPage" in raw_response or "pageSize" in raw_response: result += "Pagination Info:\n" result += f"Current Page: {raw_response.get('currentPage', 1)}\n" result += f"Page Size: {raw_response.get('pageSize', total_issues)}\n" return result

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ZeroPathAI/zeropath-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server