Skip to main content
Glama
server.py45.4 kB
""" ProPublica Nonprofit Explorer MCP Server This module implements the main MCP server with tools for accessing ProPublica's Nonprofit Explorer API data for CRM integration and prospect research. """ import asyncio import logging from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Union import json import csv import io import sys import os from mcp.server.fastmcp import FastMCP import mcp.types as types from .api_client import ProPublicaClient from .models import ( NonprofitOrganization, Filing, SearchResult, FinancialSummary, CRMExport, APIError, NTEE_CATEGORIES, SUBSECTION_CODES, US_STATES ) # Configure logging to go to stderr (not stdout) for MCP compatibility import sys logging.basicConfig( level=logging.INFO, stream=sys.stderr, # Ensure logs go to stderr, not stdout format='%(asctime)s [%(levelname)-8s] %(message)s' ) logger = logging.getLogger(__name__) # Initialize FastMCP server mcp = FastMCP("propublica-mcp") # Initialize API client api_client = ProPublicaClient() @mcp.tool() async def search_nonprofits( query: str, state: Optional[str] = None, ntee_code: Optional[str] = None, subsection_code: Optional[str] = None, page: int = 0, per_page: int = 25 ) -> str: """ Search for nonprofit organizations using ProPublica's database. Args: query: Search term (organization name, keywords, etc.) state: Two-letter state code (e.g., 'CA', 'NY') ntee_code: NTEE category code (e.g., 'A01', 'B20') subsection_code: 501(c) subsection code (e.g., '3', '4', '6') page: Page number for pagination (default: 0) per_page: Results per page, max 25 (default: 25) Returns: JSON string with search results including organizations and metadata """ try: # Validate inputs if state and state not in US_STATES: return json.dumps({ "error": f"Invalid state code '{state}'. Must be one of: {', '.join(sorted(US_STATES))}" }) if ntee_code and (not ntee_code.isdigit() or int(ntee_code) not in NTEE_CATEGORIES): return json.dumps({ "error": f"Invalid NTEE code '{ntee_code}'. Check NTEE category list." }) if subsection_code and (not subsection_code.isdigit() or int(subsection_code) not in SUBSECTION_CODES): return json.dumps({ "error": f"Invalid subsection code '{subsection_code}'. Must be one of: {', '.join(map(str, SUBSECTION_CODES.keys()))}" }) if per_page > 25: per_page = 25 # Perform search results = await api_client.search_organizations( query=query, state=state, ntee_category=int(ntee_code) if ntee_code else None, subsection_code=int(subsection_code) if subsection_code else None, page=page, limit=per_page ) # Format response response = { "search_query": query, "filters": { "state": state, "ntee_code": ntee_code, "subsection_code": subsection_code }, "search_metadata": { "query": query, "filters_applied": { "state": state, "ntee_code": ntee_code, "subsection_code": subsection_code } }, "pagination": { "page": page, "per_page": per_page, "total_results": results.total_results, "has_more": len(results.organizations) == per_page }, "organizations": [org.model_dump() for org in results.organizations], "generated_at": datetime.now(timezone.utc).isoformat() } return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error searching nonprofits: {e}") return json.dumps({ "error": f"Search failed: {str(e)}", "error_type": type(e).__name__ }) @mcp.tool() async def get_organization(ein: str) -> str: """ Get detailed information about a specific nonprofit organization. Args: ein: Employer Identification Number (9 digits, with or without hyphen) Returns: JSON string with detailed organization information """ try: # Clean EIN format clean_ein = ein.replace("-", "").strip() if not clean_ein.isdigit() or len(clean_ein) != 9: return json.dumps({ "error": "Invalid EIN format. Must be 9 digits (e.g., '123456789' or '12-3456789')" }) # Get organization details organization = await api_client.get_organization(clean_ein) # Format response response = { "organization": organization.model_dump(), "retrieved_at": datetime.now(timezone.utc).isoformat() } return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error getting organization {ein}: {e}") return json.dumps({ "error": f"Failed to retrieve organization: {str(e)}", "error_type": type(e).__name__ }) @mcp.tool() async def get_organization_filings(ein: str, limit: int = 10) -> str: """ Get Form 990 filings for a specific nonprofit organization. Args: ein: Employer Identification Number (9 digits, with or without hyphen) limit: Maximum number of filings to retrieve (default: 10, max: 100) Returns: JSON string with filing information and financial data """ try: # Clean EIN format clean_ein = ein.replace("-", "").strip() if not clean_ein.isdigit() or len(clean_ein) != 9: return json.dumps({ "error": "Invalid EIN format. Must be 9 digits (e.g., '123456789' or '12-3456789')" }) # Limit validation if limit > 100: limit = 100 # Get filings filings = await api_client.get_organization_filings(clean_ein) # Limit results limited_filings = filings[:limit] if len(filings) > limit else filings # Create filing summary filing_summary = { "total_filings": len(filings), "filings_returned": len(limited_filings), "year_range": { "earliest": min([f.tax_year for f in filings if f.tax_year]) if filings else None, "latest": max([f.tax_year for f in filings if f.tax_year]) if filings else None }, "form_types": list(set([f.form_type for f in filings if f.form_type])), "total_revenue_range": { "min": min([f.totrevenue for f in filings if f.totrevenue]) if filings else None, "max": max([f.totrevenue for f in filings if f.totrevenue]) if filings else None } } # Format response response = { "ein": clean_ein, "total_filings_available": len(filings), "filings_returned": len(limited_filings), "filing_summary": filing_summary, "filings": [filing.model_dump() for filing in limited_filings], "retrieved_at": datetime.now(timezone.utc).isoformat() } return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error getting filings for {ein}: {e}") return json.dumps({ "error": f"Failed to retrieve filings: {str(e)}", "error_type": type(e).__name__ }) @mcp.tool() async def analyze_nonprofit_financials(ein: str, years: int = 3) -> str: """ Analyze financial trends and key metrics for a nonprofit organization. Args: ein: Employer Identification Number (9 digits, with or without hyphen) years: Number of recent years to analyze (default: 3, max: 10) Returns: JSON string with financial analysis and trends """ try: # Clean EIN format clean_ein = ein.replace("-", "").strip() if not clean_ein.isdigit() or len(clean_ein) != 9: return json.dumps({ "error": "Invalid EIN format. Must be 9 digits (e.g., '123456789' or '12-3456789')" }) # Limit years if years > 10: years = 10 # Get organization and filings organization = await api_client.get_organization(clean_ein) filings = await api_client.get_organization_filings(clean_ein) # Limit to recent filings recent_filings = filings[:years] if len(filings) > years else filings if not recent_filings: return json.dumps({ "error": "No financial data available for analysis" }) # Calculate financial trends financial_data = [] for filing in recent_filings: if filing.totrevenue is not None or filing.totfuncexpns is not None: financial_data.append({ "tax_year": filing.tax_year, "total_revenue": filing.totrevenue, "total_expenses": filing.totfuncexpns, "net_assets": filing.net_assets, "filing_date": filing.filing_date }) # Calculate trends trends = {} if len(financial_data) >= 2: latest = financial_data[0] previous = financial_data[1] if latest["total_revenue"] and previous["total_revenue"]: revenue_change = ((latest["total_revenue"] - previous["total_revenue"]) / previous["total_revenue"]) * 100 trends["revenue_change_percent"] = round(revenue_change, 2) if latest["total_expenses"] and previous["total_expenses"]: expense_change = ((latest["total_expenses"] - previous["total_expenses"]) / previous["total_expenses"]) * 100 trends["expense_change_percent"] = round(expense_change, 2) # Calculate ratios for latest year ratios = {} if financial_data: latest = financial_data[0] if latest["total_revenue"] and latest["total_expenses"]: ratios["expense_ratio"] = round((latest["total_expenses"] / latest["total_revenue"]) * 100, 2) ratios["surplus_deficit"] = latest["total_revenue"] - latest["total_expenses"] # Create summary using only fields that exist in FinancialSummary model summary = FinancialSummary( ein=clean_ein, organization_name=organization.name, year_range_start=financial_data[-1]["tax_year"] if financial_data else datetime.now(timezone.utc).year - years, year_range_end=financial_data[0]["tax_year"] if financial_data else datetime.now(timezone.utc).year, filings_analyzed=len(financial_data), avg_revenue=sum(f["total_revenue"] for f in financial_data if f["total_revenue"]) / len([f for f in financial_data if f["total_revenue"]]) if any(f["total_revenue"] for f in financial_data) else None, revenue_trend="increasing" if trends.get("revenue_change_percent", 0) > 5 else "decreasing" if trends.get("revenue_change_percent", 0) < -5 else "stable", avg_expenses=sum(f["total_expenses"] for f in financial_data if f["total_expenses"]) / len([f for f in financial_data if f["total_expenses"]]) if any(f["total_expenses"] for f in financial_data) else None, avg_expense_ratio=ratios.get("expense_ratio", 0) / 100 if ratios.get("expense_ratio") else None, expense_trend="increasing" if trends.get("expense_change_percent", 0) > 5 else "decreasing" if trends.get("expense_change_percent", 0) < -5 else "stable", avg_net_assets=sum(f["net_assets"] for f in financial_data if f["net_assets"]) / len([f for f in financial_data if f["net_assets"]]) if any(f["net_assets"] for f in financial_data) else None, annual_data=financial_data ) # Format response response = { "financial_summary": summary.model_dump(), "detailed_data": financial_data, "trends": trends, "ratios": ratios, "analysis_notes": [ f"Analysis covers {len(financial_data)} years of financial data", "Revenue and expense trends calculated year-over-year", "Expense ratio shows total expenses as % of total revenue", "All amounts in USD" ] } return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error analyzing financials for {ein}: {e}") return json.dumps({ "error": f"Financial analysis failed: {str(e)}", "error_type": type(e).__name__ }) @mcp.tool() async def search_similar_nonprofits( ein: str, radius_miles: Optional[int] = None, same_ntee: bool = True, min_revenue: Optional[int] = None, max_revenue: Optional[int] = None, limit: int = 10 ) -> str: """ Find nonprofits similar to a given organization based on various criteria. Args: ein: Reference organization's EIN (9 digits, with or without hyphen) radius_miles: Geographic radius for location-based search same_ntee: Whether to limit to same NTEE category (default: True) min_revenue: Minimum annual revenue filter max_revenue: Maximum annual revenue filter limit: Maximum number of similar organizations to return (default: 10, max: 25) Returns: JSON string with similar organizations and comparison metrics """ try: # Clean EIN format clean_ein = ein.replace("-", "").strip() if not clean_ein.isdigit() or len(clean_ein) != 9: return json.dumps({ "error": "Invalid EIN format. Must be 9 digits (e.g., '123456789' or '12-3456789')" }) # Limit validation if limit > 25: limit = 25 # Get reference organization reference_org = await api_client.get_organization(clean_ein) # Build search criteria based on reference organization search_params = {} # Extract NTEE category number from the NTEE code (e.g., "A01" -> 1) ntee_category = None if same_ntee and reference_org.ntee_code: # NTEE codes start with a letter indicating the category # Map letters to numbers: A=1, B=2, etc. ntee_letter = reference_org.ntee_code[0].upper() ntee_category = ord(ntee_letter) - ord('A') + 1 if ntee_letter.isalpha() else None if reference_org.state: search_params["state"] = reference_org.state # Search for similar organizations # Use organization type/category as search term if available search_query = reference_org.ntee_code or "nonprofit" results = await api_client.search_organizations( query=search_query, limit=limit + 5, # Get a few extra to filter out the reference org state=search_params.get("state"), ntee_category=ntee_category ) # Filter out the reference organization and apply revenue filters similar_orgs = [] for org in results.organizations: if org.ein == clean_ein: continue # Skip the reference organization # Apply revenue filters if specified # Note: NonprofitOrganization model doesn't have income_amount field # Revenue filtering would require filing data, skipping for now pass similar_orgs.append(org) if len(similar_orgs) >= limit: break # Create comparison metrics comparisons = [] for org in similar_orgs: comparison = { "organization": org.model_dump(), "similarity_factors": { "same_state": org.state == reference_org.state, "same_ntee_category": (org.ntee_code and reference_org.ntee_code and org.ntee_code[:3] == reference_org.ntee_code[:3]), "similar_revenue_range": "unknown" # Revenue data not available in basic org data } } comparisons.append(comparison) # Format response response = { "reference_organization": { "ein": reference_org.ein, "name": reference_org.name, "state": reference_org.state, "ntee_code": reference_org.ntee_code, "revenue": None # Revenue data not available in basic org data }, "search_criteria": { "same_ntee": same_ntee, "radius_miles": radius_miles, "min_revenue": min_revenue, "max_revenue": max_revenue, "limit": limit }, "similar_organizations_found": len(comparisons), "similar_organizations": comparisons, "generated_at": datetime.now(timezone.utc).isoformat() } return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error finding similar nonprofits for {ein}: {e}") return json.dumps({ "error": f"Similar organization search failed: {str(e)}", "error_type": type(e).__name__ }) @mcp.tool() async def search_nonprofits_with_pdfs( query: str, limit: int = 10 ) -> str: """ Search for nonprofit organizations that have PDF Form 990 filings available. Args: query: Search term (organization name, keywords, etc.) limit: Maximum number of organizations to return (default: 10) Returns: JSON string with organizations that have PDF filings available """ try: # Use the client for the PDF search method organizations = await api_client.get_organizations_with_pdfs(query, limit) response = { "search_query": query, "pdf_organizations_found": len(organizations), "organizations": organizations, "search_criteria": { "have_pdfs": True, "pdf_url_required": True }, "generated_at": datetime.now(timezone.utc).isoformat() } return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error searching nonprofits with PDFs: {e}") return json.dumps({ "error": f"PDF search failed: {str(e)}", "error_type": type(e).__name__ }) @mcp.tool() async def get_most_recent_pdf(ein: str) -> str: """ Get the most recent Form 990 PDF filing available for a specific organization. This function searches through all filings for an organization, starting with the most recent year and working backwards until it finds a filing with an available PDF. Args: ein: Employer Identification Number (9 digits, with or without hyphen) Returns: JSON string with the most recent PDF filing info or error message """ try: # Clean EIN format clean_ein = ein.replace("-", "").strip() if not clean_ein.isdigit() or len(clean_ein) != 9: return json.dumps({ "error": "Invalid EIN format. Must be 9 digits (e.g., '123456789' or '12-3456789')" }) # Get the most recent PDF filing pdf_filing = await api_client.get_most_recent_pdf_filing(clean_ein) if pdf_filing is None: return json.dumps({ "ein": clean_ein, "has_pdf": False, "message": "No PDF filings found for this organization", "searched_at": datetime.now(timezone.utc).isoformat() }) response = { "ein": clean_ein, "organization_name": pdf_filing["organization_name"], "has_pdf": True, "most_recent_pdf": { "tax_year": pdf_filing["tax_year"], "form_type": pdf_filing["form_type"], "pdf_url": pdf_filing["pdf_url"], "filing_date": pdf_filing["filing_date"] }, "download_instructions": { "method": "GET", "url": pdf_filing["pdf_url"], "note": "This URL will redirect to the actual PDF file on ProPublica's servers" }, "retrieved_at": datetime.now(timezone.utc).isoformat() } return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error getting most recent PDF for {ein}: {e}") return json.dumps({ "error": f"Failed to get most recent PDF: {str(e)}", "error_type": type(e).__name__ }) @mcp.tool() async def export_nonprofit_data( eins: List[str], format: str = "json", include_financials: bool = True, include_filings: bool = False, max_filings_per_org: int = 3 ) -> str: """ Export comprehensive data for multiple nonprofit organizations in various formats. Args: eins: List of EINs to export (up to 10 organizations) format: Export format: 'json', 'csv' (default: 'json') include_financials: Whether to include financial analysis (default: True) include_filings: Whether to include recent filings (default: False) max_filings_per_org: Max filings per org if include_filings=True (default: 3) Returns: Formatted data export suitable for CRM integration or analysis """ try: # Validate inputs if not eins or len(eins) == 0: return json.dumps({ "error": "No EINs provided for export" }) if len(eins) > 10: return json.dumps({ "error": "Maximum 10 organizations allowed per export" }) if format not in ["json", "csv"]: return json.dumps({ "error": "Invalid format. Must be 'json' or 'csv'" }) # Clean EINs clean_eins = [] for ein in eins: clean_ein = ein.replace("-", "").strip() if not clean_ein.isdigit() or len(clean_ein) != 9: return json.dumps({ "error": f"Invalid EIN format: {ein}. Must be 9 digits" }) clean_eins.append(clean_ein) # Collect data for each organization export_data = [] errors = [] for ein in clean_eins: try: # Get basic organization data org = await api_client.get_organization(ein) org_data = { "ein": ein, "organization_name": org.name, "sub_name": org.sub_name, "street_address": org.address, "city": org.city, "state": org.state, "zipcode": org.zipcode, "ntee_code": org.ntee_code, "subsection_code": org.subseccd, "guidestar_url": org.guidestar_url, "nccs_url": org.nccs_url, "updated": org.updated.isoformat() if org.updated else None } # Add financial analysis if requested if include_financials: try: filings = await api_client.get_organization_filings(ein) if filings: latest_filing = filings[0] org_data.update({ "latest_filing_year": latest_filing.tax_year, "latest_total_revenue": latest_filing.totrevenue, "latest_total_expenses": latest_filing.totfuncexpns, "latest_net_assets": latest_filing.net_assets, "latest_filing_date": latest_filing.filing_date.isoformat() if latest_filing.filing_date else None }) except Exception as e: logger.warning(f"Could not get financial data for {ein}: {e}") # Add recent filings if requested if include_filings: try: filings = await api_client.get_organization_filings(ein) recent_filings = filings[:max_filings_per_org] org_data["recent_filings"] = [filing.model_dump() for filing in recent_filings] except Exception as e: logger.warning(f"Could not get filings for {ein}: {e}") export_data.append(org_data) except Exception as e: errors.append({ "ein": ein, "error": str(e) }) logger.error(f"Error exporting data for {ein}: {e}") # Create export result - using a simple dict instead of CRMExport model for now export_result = { "export_id": f"propublica_export_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}", "generated_at": datetime.now(timezone.utc).isoformat(), "total_organizations": len(clean_eins), "successful_exports": len(export_data), "failed_exports": len(errors), "export_format": format, "organizations": export_data, "errors": errors, "metadata": { "include_financials": include_financials, "include_filings": include_filings, "max_filings_per_org": max_filings_per_org if include_filings else 0, "api_version": "v2", "source": "ProPublica Nonprofit Explorer" } } # Format output based on requested format if format == "json": return json.dumps(export_result, indent=2) elif format == "csv": # Create CSV output output = io.StringIO() if export_data: # Define the fieldnames in a specific order starting with key fields key_fields = ["ein", "organization_name", "sub_name", "street_address", "city", "state", "zipcode", "ntee_code", "subsection_code"] # Get all possible fieldnames from the data all_fieldnames = set() for org in export_data: all_fieldnames.update(org.keys()) # Remove complex fields that don't work well in CSV all_fieldnames.discard("recent_filings") all_fieldnames.discard("classification_codes") # Start with key fields, then add remaining fields fieldnames = [] for field in key_fields: if field in all_fieldnames: fieldnames.append(field) all_fieldnames.remove(field) # Add remaining fields in sorted order fieldnames.extend(sorted(list(all_fieldnames))) writer = csv.DictWriter(output, fieldnames=fieldnames) writer.writeheader() for org in export_data: # Create a clean row without complex fields clean_row = {k: v for k, v in org.items() if k in fieldnames} writer.writerow(clean_row) csv_content = output.getvalue() output.close() # Return CSV with metadata return json.dumps({ "export_metadata": { "export_id": export_result["export_id"], "generated_at": export_result["generated_at"], "total_organizations": export_result["total_organizations"], "successful_exports": export_result["successful_exports"], "failed_exports": export_result["failed_exports"], "errors": export_result["errors"] }, "csv_data": csv_content }, indent=2) except Exception as e: logger.error(f"Error exporting nonprofit data: {e}") return json.dumps({ "error": f"Export failed: {str(e)}", "error_type": type(e).__name__ }) def _get_revenue_similarity(revenue1: Optional[int], revenue2: Optional[int]) -> str: """Helper function to determine revenue similarity category.""" if not revenue1 or not revenue2: return "unknown" ratio = min(revenue1, revenue2) / max(revenue1, revenue2) if ratio > 0.8: return "very_similar" elif ratio > 0.5: return "similar" elif ratio > 0.2: return "somewhat_similar" else: return "different" # Note: Resource handling removed for now to focus on core tools # Resources can be added back later once the core functionality is working # Main function to run the server def main(): """Main function to run the ProPublica MCP server.""" import argparse import os parser = argparse.ArgumentParser(description="ProPublica Nonprofit Explorer MCP Server") parser.add_argument("--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"]) parser.add_argument("--http", action="store_true", help="Run HTTP server instead of stdio") parser.add_argument("--host", default="127.0.0.1", help="Host to bind HTTP server to") parser.add_argument("--port", type=int, default=8080, help="Port to bind HTTP server to") args = parser.parse_args() # Set log level logging.getLogger().setLevel(getattr(logging, args.log_level)) # Auto-detect HTTP mode for cloud deployment # Check for common cloud deployment environment variables cloud_deployment = any([ os.getenv("PORT"), # Common cloud port env var os.getenv("DO_APP_URL"), # DigitalOcean specific os.getenv("RAILWAY_PUBLIC_DOMAIN"), # Railway os.getenv("RENDER_EXTERNAL_URL"), # Render os.getenv("CF_PAGES_URL"), # Cloudflare Pages/Workers args.http # Explicit flag ]) if cloud_deployment: # Run HTTP server for cloud deployment with Streamable HTTP transport logger.info("Starting ProPublica MCP server in Streamable HTTP mode") # Use PORT environment variable if available (common for cloud platforms) port = int(os.getenv("PORT", args.port)) host = "0.0.0.0" if cloud_deployment else args.host # Bind to all interfaces in cloud try: from starlette.applications import Starlette from starlette.routing import Route from starlette.responses import JSONResponse, StreamingResponse from starlette.requests import Request import uvicorn import uuid logger.info(f"HTTP server will bind to {host}:{port}") # Store for session management sessions = {} async def mcp_endpoint(request: Request): """Single MCP endpoint that handles both GET and POST as per 2025-03-26 spec""" try: # Validate Origin header for security (when present) origin = request.headers.get("origin") if origin and origin not in ["https://cursor.sh", "https://localhost", "http://localhost"]: # For now, we'll allow all origins but log them logger.warning(f"Request from origin: {origin}") if request.method == "POST": # Handle JSON-RPC messages sent to server # Check required Accept header accept_header = request.headers.get("accept", "") if "application/json" not in accept_header and "text/event-stream" not in accept_header: return JSONResponse( {"error": "Accept header must include application/json and/or text/event-stream"}, status_code=400 ) # Parse request body try: body = await request.body() if not body: return JSONResponse({"error": "Empty request body"}, status_code=400) json_data = json.loads(body.decode('utf-8')) except json.JSONDecodeError as e: return JSONResponse({"error": f"Invalid JSON: {str(e)}"}, status_code=400) # Check session ID if required session_id = request.headers.get("mcp-session-id") # Handle the JSON-RPC message through FastMCP try: # Handle different types of JSON-RPC messages if isinstance(json_data, dict): # Single message response = await handle_jsonrpc_message(json_data, session_id) elif isinstance(json_data, list): # Batch messages responses = [] for msg in json_data: resp = await handle_jsonrpc_message(msg, session_id) responses.append(resp) response = responses else: return JSONResponse( {"error": "Invalid JSON-RPC format"}, status_code=400 ) # For initialize requests, optionally set session ID if (isinstance(json_data, dict) and json_data.get("method") == "initialize"): new_session_id = str(uuid.uuid4()) sessions[new_session_id] = {"created": datetime.now(timezone.utc)} headers = {"mcp-session-id": new_session_id} return JSONResponse(response, headers=headers) return JSONResponse(response) except Exception as e: logger.error(f"Error processing MCP message: {e}") return JSONResponse( {"error": f"Failed to process message: {str(e)}"}, status_code=500 ) elif request.method == "GET": # Handle GET requests for SSE streams (optional in spec) accept_header = request.headers.get("accept", "") if "text/event-stream" not in accept_header: return JSONResponse( {"error": "GET requires Accept: text/event-stream"}, status_code=405 ) # For now, we don't implement GET SSE streams # This is optional per the spec return JSONResponse( {"error": "GET SSE streams not implemented"}, status_code=405 ) else: return JSONResponse( {"error": "Method not allowed. Use POST or GET."}, status_code=405 ) except Exception as e: logger.error(f"Endpoint error: {e}") return JSONResponse( {"error": f"Internal server error: {str(e)}"}, status_code=500 ) async def handle_jsonrpc_message(message: dict, session_id: Optional[str] = None): """Handle a single JSON-RPC message""" try: method = message.get("method") params = message.get("params", {}) msg_id = message.get("id") # Handle initialize if method == "initialize": return { "jsonrpc": "2.0", "result": { "protocolVersion": "2025-03-26", "capabilities": { "tools": {} }, "serverInfo": { "name": "propublica-mcp", "version": "1.0.0" } }, "id": msg_id } # Handle tools/list elif method == "tools/list": try: # Use FastMCP's built-in list_tools method tools_list = await mcp.list_tools() # Convert tool objects to the expected JSON format tools = [] for tool in tools_list: tools.append({ "name": tool.name, "description": tool.description, "inputSchema": tool.inputSchema }) return { "jsonrpc": "2.0", "result": { "tools": tools }, "id": msg_id } except Exception as e: logger.error(f"Error listing tools: {e}") return { "jsonrpc": "2.0", "error": { "code": -32603, "message": f"Failed to list tools: {str(e)}" }, "id": msg_id } # Handle tools/call elif method == "tools/call": tool_name = params.get("name") arguments = params.get("arguments", {}) if not tool_name: return { "jsonrpc": "2.0", "error": { "code": -32602, "message": "Missing tool name" }, "id": msg_id } # Call the tool using FastMCP try: result = await mcp.call_tool(tool_name, arguments) # result is a list of TextContent objects content = [] for item in result: if hasattr(item, 'type') and hasattr(item, 'text'): content.append({ "type": item.type, "text": item.text }) else: # Fallback for other content types content.append({ "type": "text", "text": str(item) }) return { "jsonrpc": "2.0", "result": { "content": content, "isError": False }, "id": msg_id } except Exception as e: logger.error(f"Tool {tool_name} error: {e}") return { "jsonrpc": "2.0", "error": { "code": -32603, "message": f"Tool execution failed: {str(e)}" }, "id": msg_id } # Handle notifications (no response needed) elif msg_id is None: logger.info(f"Received notification: {method}") return None # Unknown method else: return { "jsonrpc": "2.0", "error": { "code": -32601, "message": f"Method not found: {method}" }, "id": msg_id } except Exception as e: logger.error(f"Error handling JSON-RPC message: {e}") return { "jsonrpc": "2.0", "error": { "code": -32603, "message": f"Internal error: {str(e)}" }, "id": message.get("id") } async def health_check(request: Request): """Health check endpoint for cloud platforms""" try: return JSONResponse({ "status": "healthy", "server": "propublica-mcp", "version": "2025-03-26", "transport": "streamable-http" }) except Exception as e: logger.error(f"Health check error: {e}") return JSONResponse({"status": "unhealthy", "error": str(e)}, status_code=500) # Create Starlette app with single MCP endpoint app = Starlette( routes=[ Route("/", endpoint=mcp_endpoint, methods=["GET", "POST"]), Route("/health", endpoint=health_check, methods=["GET"]), ] ) # Run with uvicorn uvicorn.run(app, host=host, port=port, log_level=args.log_level.lower()) except ImportError as e: logger.error(f"HTTP dependencies not available: {e}") logger.error("Please install: pip install starlette uvicorn") return 1 else: # Run stdio server for local MCP usage logger.info("Starting ProPublica MCP server in stdio mode") mcp.run() if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/asachs01/propublica-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server