Skip to main content
Glama

PR Reviewer

pr_analyzer.py4.07 kB
import sys import os import traceback from typing import Any, List, Dict from mcp.server.fastmcp import FastMCP from github_integration import fetch_pr_changes from notion_client import Client from dotenv import load_dotenv class PRAnalyzer: def __init__(self): # Load environment variables load_dotenv() # Initialize MCP Server self.mcp = FastMCP("github_pr_analysis") print("MCP Server initialized", file=sys.stderr) # Initialize Notion client self._init_notion() # Register MCP tools self._register_tools() def _init_notion(self): """Initialize the Notion client with API key and page ID.""" try: self.notion_api_key = os.getenv("NOTION_API_KEY") self.notion_page_id = os.getenv("NOTION_PAGE_ID") if not self.notion_api_key or not self.notion_page_id: raise ValueError("Missing Notion API key or page ID in environment variables") self.notion = Client(auth=self.notion_api_key) print(f"Notion client initialized successfully", file=sys.stderr) print(f"Using Notion page ID: {self.notion_page_id}", file=sys.stderr) except Exception as e: print(f"Error initializing Notion client: {str(e)}", file=sys.stderr) traceback.print_exc(file=sys.stderr) sys.exit(1) def _register_tools(self): """Register MCP tools for PR analysis.""" @self.mcp.tool() async def fetch_pr(repo_owner: str, repo_name: str, pr_number: int) -> Dict[str, Any]: """Fetch changes from a GitHub pull request.""" print(f"Fetching PR #{pr_number} from {repo_owner}/{repo_name}", file=sys.stderr) try: pr_info = fetch_pr_changes(repo_owner, repo_name, pr_number) if pr_info is None: print("No changes returned from fetch_pr_changes", file=sys.stderr) return {} print(f"Successfully fetched PR information", file=sys.stderr) return pr_info except Exception as e: print(f"Error fetching PR: {str(e)}", file=sys.stderr) traceback.print_exc(file=sys.stderr) return {} @self.mcp.tool() async def create_notion_page(title: str, content: str) -> str: """Create a Notion page with PR analysis.""" print(f"Creating Notion page: {title}", file=sys.stderr) try: self.notion.pages.create( parent={"type": "page_id", "page_id": self.notion_page_id}, properties={"title": {"title": [{"text": {"content": title}}]}}, children=[{ "object": "block", "type": "paragraph", "paragraph": { "rich_text": [{ "type": "text", "text": {"content": content} }] } }] ) print(f"Notion page '{title}' created successfully!", file=sys.stderr) return f"Notion page '{title}' created successfully!" except Exception as e: error_msg = f"Error creating Notion page: {str(e)}" print(error_msg, file=sys.stderr) traceback.print_exc(file=sys.stderr) return error_msg def run(self): """Start the MCP server.""" try: print("Running MCP Server for GitHub PR Analysis...", file=sys.stderr) self.mcp.run(transport="stdio") except Exception as e: print(f"Fatal Error in MCP Server: {str(e)}", file=sys.stderr) traceback.print_exc(file=sys.stderr) sys.exit(1) if __name__ == "__main__": analyzer = PRAnalyzer() analyzer.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/willianmarcel/mcp-pr-reviewer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server