Skip to main content
Glama

Revenue Intelligence MCP Server

by drew6050
server.py (18.8 kB)
#!/usr/bin/env python3
"""
Revenue Intelligence MCP Server

A production-ready MCP server demonstrating ML system integration patterns
for customer-facing business teams. Provides lead scoring, churn detection,
and conversion insights with full observability.
"""

import json
import logging
from typing import Any
from datetime import datetime, timedelta, timezone

import mcp.types as types
from mcp.server import Server
from mcp.server.stdio import stdio_server

from config import (
    MODEL_VERSION,
    TRAINING_DATE,
    MODEL_PERFORMANCE_METRICS,
    FEATURE_IMPORTANCE,
    LOG_LEVEL,
    LOG_FORMAT
)
from data_store import (
    get_account,
    get_lead,
    store_prediction_log,
    get_prediction_count_24h,
    get_all_accounts
)
from scoring import (
    score_lead,
    detect_churn_risk,
    calculate_conversion_probability
)

# Configure logging
logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT)
logger = logging.getLogger(__name__)

# Initialize MCP server
app = Server("revenue-intel-mcp")

# Track server start time for uptime calculation (timezone-aware UTC so the
# subtraction in check_model_health never mixes naive and aware datetimes).
SERVER_START_TIME = datetime.now(timezone.utc)

# URI prefixes understood by read_resource.
ACCOUNTS_URI_PREFIX = "crm://accounts/"
LEADS_URI_PREFIX = "crm://leads/"
MODEL_METADATA_URI = "models://lead_scorer/metadata"

# Simplified drift heuristic: a 24h prediction volume at or above this value
# is reported as drift. Shared by the metadata resource and the health tool so
# the two can never disagree at the boundary.
DRIFT_PREDICTION_THRESHOLD = 1000


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _drift_detected(prediction_count: int) -> bool:
    """Return True when the 24h prediction volume reaches the drift threshold."""
    return prediction_count >= DRIFT_PREDICTION_THRESHOLD


def _json_text(payload: Any) -> list[types.TextContent]:
    """Wrap *payload*, serialized as indented JSON, in a single TextContent."""
    return [types.TextContent(type="text", text=json.dumps(payload, indent=2))]


def _require_argument(arguments: dict[str, Any], key: str) -> Any:
    """Return ``arguments[key]`` or raise a descriptive ValueError if absent."""
    try:
        return arguments[key]
    except KeyError:
        raise ValueError(f"Missing required argument: {key}") from None


def _get_account_or_raise(account_id: str) -> Any:
    """Look up an account, raising ValueError when the store returns nothing."""
    account = get_account(account_id)
    if not account:
        raise ValueError(f"Account not found: {account_id}")
    return account


def _get_lead_or_raise(lead_id: str) -> Any:
    """Look up a lead, raising ValueError when the store returns nothing."""
    lead = get_lead(lead_id)
    if not lead:
        raise ValueError(f"Lead not found: {lead_id}")
    return lead


def _user_prompt(description: str, text: str) -> types.GetPromptResult:
    """Build a GetPromptResult holding one user-role text message."""
    return types.GetPromptResult(
        description=description,
        messages=[
            types.PromptMessage(
                role="user",
                content=types.TextContent(type="text", text=text)
            )
        ]
    )


# ---------------------------------------------------------------------------
# Resources
# ---------------------------------------------------------------------------

@app.list_resources()
async def list_resources() -> list[types.Resource]:
    """
    List available resources in the Revenue Intelligence system.

    Resources expose CRM data and model metadata.
    """
    logger.info("Listing available resources")

    return [
        types.Resource(
            uri="crm://accounts/list",
            name="All CRM Accounts",
            mimeType="application/json",
            description="List of all customer accounts with usage signals"
        ),
        types.Resource(
            uri="models://lead_scorer/metadata",
            name="Lead Scorer Model Metadata",
            mimeType="application/json",
            description="Model version, performance metrics, and drift status"
        )
    ]


@app.read_resource()
async def read_resource(uri: str) -> str:
    """
    Read a specific resource by URI and return it as a JSON string.

    Supported URIs:
    - crm://accounts/list - Get all accounts
    - crm://accounts/{account_id} - Get specific account data
    - crm://leads/{lead_id} - Get specific lead data
    - models://lead_scorer/metadata - Get model metadata

    Raises:
        ValueError: if the URI is not recognized, or the referenced account
            or lead does not exist.
    """
    # NOTE(review): the MCP SDK may hand this handler a URL object rather
    # than a plain str (confirm against the installed SDK version); str()
    # makes the prefix checks below work in either case.
    uri_text = str(uri)
    logger.info("Reading resource: %s", uri_text)

    if uri_text.startswith(ACCOUNTS_URI_PREFIX):
        # removeprefix strips only the leading prefix; str.replace would also
        # remove any later occurrence inside the identifier.
        account_id = uri_text.removeprefix(ACCOUNTS_URI_PREFIX)
        if account_id == "list":
            return json.dumps(get_all_accounts(), indent=2)
        return json.dumps(_get_account_or_raise(account_id), indent=2)

    if uri_text.startswith(LEADS_URI_PREFIX):
        lead_id = uri_text.removeprefix(LEADS_URI_PREFIX)
        return json.dumps(_get_lead_or_raise(lead_id), indent=2)

    if uri_text == MODEL_METADATA_URI:
        # Calculate drift status (simplified)
        prediction_count = get_prediction_count_24h()
        metadata = {
            "model_version": MODEL_VERSION,
            "training_date": TRAINING_DATE,
            "performance_metrics": MODEL_PERFORMANCE_METRICS,
            "feature_importance": FEATURE_IMPORTANCE,
            "drift_status": (
                "warning" if _drift_detected(prediction_count) else "normal"
            )
        }
        return json.dumps(metadata, indent=2)

    raise ValueError(f"Unknown resource URI: {uri_text}")


# ---------------------------------------------------------------------------
# Tools
# ---------------------------------------------------------------------------

@app.list_tools()
async def list_tools() -> list[types.Tool]:
    """
    List available tools for revenue intelligence operations.

    Tools enable ML predictions and system monitoring.
    """
    logger.info("Listing available tools")

    return [
        types.Tool(
            name="score_lead",
            description=(
                "Score a lead based on company attributes and engagement signals. "
                "Returns score (0-100), tier (hot/warm/cold), and feature attributions."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "company_name": {
                        "type": "string",
                        "description": "Name of the company"
                    },
                    "signals": {
                        "type": "object",
                        "description": "Engagement signals (website_visits_30d, demo_requested, email_engagement_score, etc.)",
                        "properties": {
                            "website_visits_30d": {"type": "number"},
                            "demo_requested": {"type": "boolean"},
                            "whitepaper_downloads": {"type": "number"},
                            "email_engagement_score": {"type": "number"},
                            "linkedin_engagement": {"type": "boolean"},
                            "free_trial_started": {"type": "boolean"}
                        }
                    },
                    "industry": {
                        "type": "string",
                        "description": "Company industry (technology, finance, healthcare, etc.)",
                        "default": "technology"
                    },
                    "employee_count": {
                        "type": "number",
                        "description": "Number of employees",
                        "default": 100
                    }
                },
                "required": ["company_name", "signals"]
            }
        ),
        types.Tool(
            name="get_conversion_insights",
            description="Analyze trial account and predict conversion probability with recommended actions.",
            inputSchema={
                "type": "object",
                "properties": {
                    "account_id": {
                        "type": "string",
                        "description": "Account ID (e.g., acc_002)"
                    }
                },
                "required": ["account_id"]
            }
        ),
        types.Tool(
            name="detect_churn_risk",
            description="Analyze account health and detect churn risk with suggested interventions.",
            inputSchema={
                "type": "object",
                "properties": {
                    "account_id": {
                        "type": "string",
                        "description": "Account ID (e.g., acc_001)"
                    }
                },
                "required": ["account_id"]
            }
        ),
        types.Tool(
            name="check_model_health",
            description="Check ML model health metrics, uptime, and drift status.",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        ),
        types.Tool(
            name="log_prediction",
            description="Log a prediction for monitoring and drift detection.",
            inputSchema={
                "type": "object",
                "properties": {
                    "prediction_data": {
                        "type": "object",
                        "description": "Prediction details including type, input, and result",
                        "properties": {
                            "prediction_type": {"type": "string"},
                            "input_data": {"type": "object"},
                            "prediction_result": {"type": "object"}
                        },
                        "required": ["prediction_type", "input_data", "prediction_result"]
                    }
                },
                "required": ["prediction_data"]
            }
        )
    ]


def _tool_score_lead(arguments: dict[str, Any]) -> Any:
    """Score a lead from the supplied attributes and log the prediction."""
    company_name = _require_argument(arguments, "company_name")
    signals = _require_argument(arguments, "signals")
    industry = arguments.get("industry", "technology")
    employee_count = arguments.get("employee_count", 100)

    result = score_lead(company_name, signals, industry, employee_count)

    store_prediction_log(
        prediction_type="lead_score",
        input_data={
            "company_name": company_name,
            "signals": signals,
            "industry": industry,
            "employee_count": employee_count
        },
        prediction_result=result,
        model_version=MODEL_VERSION
    )
    return result


def _tool_get_conversion_insights(arguments: dict[str, Any]) -> Any:
    """Predict conversion for a trial account and log the prediction."""
    account_id = _require_argument(arguments, "account_id")
    account = _get_account_or_raise(account_id)

    # Non-trial accounts get an error payload rather than an exception, and
    # nothing is logged for them.
    if account["plan"] != "trial":
        return {
            "error": f"Account {account_id} is not a trial account (plan: {account['plan']})"
        }

    result = calculate_conversion_probability(account)

    store_prediction_log(
        prediction_type="conversion_probability",
        input_data={"account_id": account_id},
        prediction_result=result,
        model_version=MODEL_VERSION
    )
    return result


def _tool_detect_churn_risk(arguments: dict[str, Any]) -> Any:
    """Run churn-risk detection for an account and log the prediction."""
    account_id = _require_argument(arguments, "account_id")
    account = _get_account_or_raise(account_id)

    result = detect_churn_risk(account)

    store_prediction_log(
        prediction_type="churn_risk",
        input_data={"account_id": account_id},
        prediction_result=result,
        model_version=MODEL_VERSION
    )
    return result


def _tool_check_model_health(arguments: dict[str, Any]) -> Any:
    """Report model version, server uptime, 24h volume, and drift status."""
    uptime_seconds = (datetime.now(timezone.utc) - SERVER_START_TIME).total_seconds()
    uptime_hours = uptime_seconds / 3600

    prediction_count = get_prediction_count_24h()

    # Simple drift detection (would compare distributions in production)
    drift_detected = _drift_detected(prediction_count)

    return {
        "model_version": MODEL_VERSION,
        "uptime_hours": round(uptime_hours, 2),
        "prediction_count_24h": prediction_count,
        "drift_detected": drift_detected,
        "accuracy_last_7d": MODEL_PERFORMANCE_METRICS["accuracy"],
        "performance_metrics": MODEL_PERFORMANCE_METRICS,
        "alerts": [
            "High prediction volume - monitoring for drift"
        ] if drift_detected else []
    }


def _tool_log_prediction(arguments: dict[str, Any]) -> Any:
    """Store a caller-supplied prediction record and return the store's result."""
    prediction_data = _require_argument(arguments, "prediction_data")

    return store_prediction_log(
        prediction_type=_require_argument(prediction_data, "prediction_type"),
        input_data=_require_argument(prediction_data, "input_data"),
        prediction_result=_require_argument(prediction_data, "prediction_result"),
        model_version=MODEL_VERSION
    )


# Tool name -> handler. Each handler takes the arguments dict and returns a
# JSON-serializable payload.
_TOOL_HANDLERS = {
    "score_lead": _tool_score_lead,
    "get_conversion_insights": _tool_get_conversion_insights,
    "detect_churn_risk": _tool_detect_churn_risk,
    "check_model_health": _tool_check_model_health,
    "log_prediction": _tool_log_prediction,
}


@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[types.TextContent]:
    """
    Execute a tool with the provided arguments.

    Returns a single TextContent whose text is the tool's result as JSON.
    Any failure (unknown tool, missing argument, missing account, or an
    error raised by a handler) is logged and returned as a JSON object with
    "error" and "tool" keys instead of propagating.
    """
    logger.info("Calling tool: %s", name)

    # This is the tool boundary: every error is converted into an error
    # payload for the client, so a broad except is intentional here.
    try:
        handler = _TOOL_HANDLERS.get(name)
        if handler is None:
            raise ValueError(f"Unknown tool: {name}")
        # Tools without parameters may be called with no arguments at all.
        return _json_text(handler(arguments or {}))
    except Exception as e:
        logger.exception("Error executing tool %s: %s", name, e)
        return _json_text({"error": str(e), "tool": name})


# ---------------------------------------------------------------------------
# Prompts
# ---------------------------------------------------------------------------

@app.list_prompts()
async def list_prompts() -> list[types.Prompt]:
    """
    List available prompt templates for revenue intelligence workflows.
    """
    logger.info("Listing available prompts")

    return [
        types.Prompt(
            name="analyze-account-expansion",
            description="Template for CS team to assess upsell opportunity for an account",
            arguments=[
                types.PromptArgument(
                    name="account_id",
                    description="Account ID to analyze",
                    required=True
                )
            ]
        ),
        types.Prompt(
            name="weekly-lead-report",
            description="Template for Sales leadership pipeline quality report",
            arguments=[
                types.PromptArgument(
                    name="week_number",
                    description="Week number for the report",
                    required=False
                )
            ]
        ),
        types.Prompt(
            name="explain-low-score",
            description="Template to generate explanation for why a lead scored poorly",
            arguments=[
                types.PromptArgument(
                    name="lead_id",
                    description="Lead ID to explain",
                    required=True
                )
            ]
        )
    ]


@app.get_prompt()
async def get_prompt(name: str, arguments: dict[str, str] | None) -> types.GetPromptResult:
    """
    Get a specific prompt template with arguments filled in.

    When an argument is not supplied, the fallbacks "acc_001", "45", and
    "lead_001" are used for account_id, week_number, and lead_id.

    Raises:
        ValueError: if the prompt name is unknown, or the referenced account
            or lead does not exist.
    """
    logger.info("Getting prompt: %s", name)

    supplied = arguments or {}

    if name == "analyze-account-expansion":
        account_id = supplied.get("account_id", "acc_001")
        account = _get_account_or_raise(account_id)

        prompt_text = f"""# Account Expansion Analysis: {account['company']}

**Account ID:** {account_id}
**Current Plan:** {account['plan']}
**MRR:** ${account['mrr']}

## Task
Analyze this account's usage signals and determine:
1. Upsell readiness score (0-100)
2. Recommended next tier
3. Key talking points for CS conversation
4. Estimated expansion revenue potential

## Usage Signals
{account['usage_signals']}

Please provide a structured analysis with specific recommendations.
"""

        return _user_prompt(
            f"Account expansion analysis for {account['company']}",
            prompt_text
        )

    if name == "weekly-lead-report":
        week_number = supplied.get("week_number", "45")

        prompt_text = f"""# Weekly Lead Quality Report - Week {week_number}

## Task
Generate a leadership summary of lead pipeline quality including:

1. **Lead Volume & Velocity**
   - Total new leads this week
   - Hot/Warm/Cold distribution
   - Week-over-week trend

2. **Quality Metrics**
   - Average lead score
   - Demo request rate
   - Trial start rate
   - Top performing industries

3. **Pipeline Health**
   - High-value opportunities (enterprise leads scoring >80)
   - At-risk leads (engaged but not converting)
   - Recommended focus areas

4. **Action Items**
   - Leads requiring immediate follow-up
   - Campaigns to optimize
   - Resource allocation recommendations

Please analyze the lead data and provide a concise executive summary.
"""

        return _user_prompt(
            f"Weekly lead pipeline quality report for week {week_number}",
            prompt_text
        )

    if name == "explain-low-score":
        lead_id = supplied.get("lead_id", "lead_001")
        lead = _get_lead_or_raise(lead_id)

        # Score the lead so the prompt can quote its current score and tier.
        result = score_lead(
            company_name=lead["company"],
            signals=lead["signals"],
            industry=lead["industry"],
            employee_count=lead["employee_count"]
        )

        prompt_text = f"""# Low Lead Score Explanation: {lead['company']}

**Lead ID:** {lead_id}
**Score:** {result['score']}/100 ({result['tier']} tier)
**Industry:** {lead['industry']}
**Size:** {lead['employee_count']} employees

## Current Engagement Signals
{lead['signals']}

## Task
This lead scored in the {result['tier']} tier. Please provide:

1. **Root Cause Analysis**
   - Which signals are weakest?
   - What's missing compared to high-scoring leads?

2. **Improvement Plan**
   - Specific actions to increase engagement
   - Content/campaigns to deploy
   - Timeline for re-scoring

3. **Resource Assessment**
   - Is this lead worth continued investment?
   - Should we adjust qualification criteria?

Provide a structured analysis with specific, actionable recommendations.
"""

        return _user_prompt(
            f"Explanation for low score of {lead['company']}",
            prompt_text
        )

    raise ValueError(f"Unknown prompt: {name}")


async def main():
    """Run the MCP server over stdio until the streams close."""
    logger.info("Starting Revenue Intelligence MCP Server (Model: %s)", MODEL_VERSION)

    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/drew6050/revenue-intel-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.