brain-trust

by bernierllc
server.py (46.2 kB)
#!/usr/bin/env python3
"""
Contextual Q&A MCP Server with OpenAI integration and plan review capabilities.
"""

import asyncio
import json
import logging
import os
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Annotated, Any, Dict, List, Optional, cast

import openai
import structlog
from fastapi import HTTPException, Request
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastmcp import FastMCP
from fastmcp.server.dependencies import get_http_headers
from openai.types.chat import ChatCompletionMessageParam
from pydantic import BaseModel, Field
from starlette.responses import Response
from starlette.status import HTTP_404_NOT_FOUND

try:
    import importlib  # lazy optional import to avoid static import errors

    _pc = importlib.import_module("prometheus_client")
    CONTENT_TYPE_LATEST = _pc.CONTENT_TYPE_LATEST
    Counter = _pc.Counter  # type: ignore[attr-defined]
    generate_latest = _pc.generate_latest  # type: ignore[attr-defined]
    PROMETHEUS_AVAILABLE = True
except (
    ModuleNotFoundError,
    ImportError,
    AttributeError,
):  # pragma: no cover - optional dependency fallback
    PROMETHEUS_AVAILABLE = False

    class _DummyCounter:
        def labels(self, *args: Any, **kwargs: Any) -> "_DummyCounter":
            _ = (args, kwargs)
            return self

        def inc(self, n: int = 1) -> None:
            _ = n
            return None

    def Counter(_name: str, _desc: str, _labelnames: List[str]) -> _DummyCounter:  # type: ignore[misc]
        return _DummyCounter()

    CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8"

    def generate_latest() -> bytes:  # type: ignore[misc]
        return b""


# Get environment and log level from environment variables
ENVIRONMENT = os.getenv("ENVIRONMENT", "development")
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG" if ENVIRONMENT == "development" else "INFO")
TRACK_METRICS_DB = os.getenv("TRACK_METRICS_DB", "false").strip().lower() == "true"
DATABASE_URL = os.getenv("DATABASE_URL") or os.getenv("SUPABASE_DB_URL")
ENABLE_DEMOS = (
    os.getenv("ENABLE_DEMOS", "true" if ENVIRONMENT == "development" else "false")
    .strip()
    .lower()
    == "true"
)

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer(),
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

# Set the log level
logging.basicConfig(level=LOG_LEVEL)
logger = structlog.get_logger()

# Initialize FastMCP server
mcp = FastMCP("brain-trust")

# Create HTTP app for custom routes/static assets
http_app = mcp.http_app()

# Serve static files from dist directory
dist_path = Path(__file__).parent / "dist"
if dist_path.exists():
    http_app.mount(
        "/assets", StaticFiles(directory=str(dist_path / "assets")), name="assets"
    )


def get_config_from_headers() -> Dict[str, Any]:
    """Extract configuration from HTTP headers."""
    headers = get_http_headers()
    config: Dict[str, Any] = {}

    # Read API key from header
    if api_key := headers.get("x-openai-api-key"):
        config["api_key"] = api_key

    # Read model from header
    if model := headers.get("x-openai-model"):
        config["model"] = model

    # Read max tokens from header
    if max_tokens_str := headers.get("x-openai-max-tokens"):
        try:
            config["max_tokens"] = int(max_tokens_str)
        except ValueError:
            logger.warning(f"Invalid max_tokens header: {max_tokens_str}")

    logger.debug(
        "Configuration from headers",
        has_api_key=bool(config.get("api_key")),
        api_key_length=len(config.get("api_key", "")),
        model=config.get("model"),
        max_tokens=config.get("max_tokens"),
    )

    return config


# Helper functions for logging
def mask_api_key(api_key: str) -> str:
    """Mask API key for logging in production."""
    if not api_key:
        return "None"
    if ENVIRONMENT == "production" or LOG_LEVEL != "DEBUG":
        return f"{api_key[:8]}...{api_key[-4:]}" if len(api_key) > 12 else "***"
    return api_key


def log_openai_request(
    model: str, messages: List[Dict[str, Any]], max_tokens: int, api_key: str
) -> None:
    """Log OpenAI request details."""
    # Only log minimal, aggregate details; never log prompts or headers
    message_count = len(messages)
    prompt_chars = 0
    for message in messages:
        content = message.get("content", "")
        prompt_chars += (
            len(content) if isinstance(content, str) else len(json.dumps(content))
        )
    logger.debug(
        "OpenAI API Request",
        environment=ENVIRONMENT,
        log_level=LOG_LEVEL,
        model=model,
        message_count=message_count,
        prompt_chars=prompt_chars,
        max_tokens=max_tokens,
        has_api_key=bool(api_key),
        api_key_length=len(api_key) if api_key else 0,
    )


def log_openai_response(response: Any) -> None:
    """Log OpenAI response details."""
    logger.debug(
        "OpenAI API Response",
        environment=ENVIRONMENT,
        log_level=LOG_LEVEL,
        response_id=getattr(response, "id", None),
        model=getattr(response, "model", None),
        usage=getattr(response, "usage", None),
        choices_count=len(response.choices) if hasattr(response, "choices") else 0,
        response_headers={
            "content-type": "application/json",
        },
    )


def log_mcp_call(tool_name: str, **kwargs: Any) -> None:
    """Log incoming MCP tool call."""
    # Mask sensitive parameters
    safe_kwargs: Dict[str, Any] = {}
    for key, value in kwargs.items():
        # Drop API keys entirely
        if key.lower() in {"api_key", "authorization", "proxy_authorization"}:
            continue
        # Replace potentially sensitive textual content with lengths only
        if key in {"question", "context", "plan_content", "plan_content_preview"}:
            if isinstance(value, str):
                safe_kwargs[f"{key}_length"] = len(value)
            else:
                safe_kwargs[f"{key}_present"] = value is not None
            continue
        # Sanitize messages arrays if ever passed
        if key == "messages":
            safe_kwargs["message_count"] = (
                len(value) if hasattr(value, "__len__") else 0  # type: ignore[arg-type]
            )
            continue
        # Pass through non-sensitive metadata
        safe_kwargs[key] = value

    logger.debug(
        "MCP Tool Call Received",
        environment=ENVIRONMENT,
        log_level=LOG_LEVEL,
        tool_name=tool_name,
        parameters=safe_kwargs,
    )


# Data Models
class ReviewLevel(str, Enum):
    """Review levels for plan analysis."""

    QUICK = "quick"  # Basic structure and completeness
    STANDARD = "standard"  # Detailed analysis with suggestions
    COMPREHENSIVE = "comprehensive"  # Deep analysis with alternatives
    DEEP_DIVE = "deep_dive"  # Technical analysis with implementation considerations
    EXPERT = "expert"  # Professional-level review with best practices


class PlanReview(BaseModel):
    """Plan review result."""

    plan_id: str
    review_level: ReviewLevel
    overall_score: float
    strengths: List[str]
    weaknesses: List[str]
    suggestions: List[str]
    detailed_feedback: str
    reviewed_at: datetime = Field(default_factory=datetime.now)


# In-memory storage for plan reviews
plan_reviews: Dict[str, PlanReview] = {}

# Metrics: minimal, aggregate counters
REQUEST_COUNTER = Counter(
    "proxied_requests_total",
    "Count of requests proxied/handled by this server",
    ["tool", "status"],
)

# Simple in-memory tallies for homepage display
REQUEST_TALLIES: Dict[str, Dict[str, int]] = {}


def increment_tally(tool: str, status: str) -> None:
    tool_map = REQUEST_TALLIES.setdefault(tool, {})
    tool_map[status] = tool_map.get(status, 0) + 1


# Optional Postgres metrics storage
_db_conn: Any = None
_metrics_table_ready: bool = False


def initialize_metrics_db() -> None:
    """Initialize DB connection and ensure metrics table exists if enabled."""
    global _db_conn, _metrics_table_ready
    if not TRACK_METRICS_DB:
        return
    if not DATABASE_URL:
        logger.error(
            "DB metrics enabled but DATABASE_URL/SUPABASE_DB_URL is not set",
            track_metrics_db=TRACK_METRICS_DB,
        )
        return
    # Import lazily to avoid hard dependency when disabled
    try:
        psycopg = __import__("psycopg")
    except ImportError as exc:
        _metrics_table_ready = False
        logger.error("DB metrics init failed: psycopg not installed", error=str(exc))
        return
    try:
        if _db_conn is None:
            _db_conn = psycopg.connect(DATABASE_URL, autocommit=True)  # type: ignore[arg-type]
        with _db_conn.cursor() as cur:  # type: ignore[union-attr]
            cur.execute(
                """
                create table if not exists public.request_counts (
                    tool text not null,
                    status text not null,
                    day date not null,
                    count bigint not null default 0,
                    primary key (tool, status, day)
                );
                """
            )
        _metrics_table_ready = True
        logger.info("Metrics table ready", table="public.request_counts")
    except psycopg.Error as exc:  # type: ignore[attr-defined]
        _metrics_table_ready = False
        logger.error("DB metrics init failed", error=str(exc))


def db_increment(tool: str, status: str) -> None:
    """Increment DB counters; safe no-op if disabled or unavailable."""
    if not TRACK_METRICS_DB or not DATABASE_URL:
        return
    # Read-only access to globals; no global declaration needed
    # Import lazily; if missing, log and return
    try:
        psycopg = __import__("psycopg")
    except ImportError as exc:
        logger.error(
            "DB metrics increment failed: psycopg not installed", error=str(exc)
        )
        return
    if _db_conn is None or not _metrics_table_ready:
        initialize_metrics_db()
    if _db_conn is None or not _metrics_table_ready:
        return
    try:
        with _db_conn.cursor() as cur:  # type: ignore[union-attr]
            cur.execute(
                """
                insert into public.request_counts (tool, status, day, count)
                values (%s, %s, current_date, 1)
                on conflict (tool, status, day)
                do update set count = public.request_counts.count + 1;
                """,
                (tool, status),
            )
    except psycopg.Error as exc:  # type: ignore[attr-defined]
        logger.error("DB metrics increment failed", error=str(exc))


async def async_db_increment(tool: str, status: str) -> None:
    await asyncio.to_thread(db_increment, tool, status)


# OpenAI Integration Tools
@mcp.tool()
async def phone_a_friend(
    question: Annotated[str, "The question to ask OpenAI"],
    context: Annotated[
        Optional[str],
        "Optional context information to provide background for the question",
    ] = None,
    model: Annotated[
        Optional[str], "OpenAI model to use (optional if set in headers)"
    ] = None,
    max_tokens: Annotated[
        Optional[int], "Maximum tokens for response (optional if set in headers)"
    ] = None,
) -> str:
    """Phone a friend (OpenAI) to get help with a question."""
    # Get configuration from headers
    header_config = get_config_from_headers()

    # Use parameters if provided, otherwise fall back to headers
    final_api_key = header_config.get("api_key")
    final_model = model or header_config.get("model", "gpt-4")
    final_max_tokens = max_tokens or header_config.get("max_tokens", 1000)

    # Validate API key is available
    if not final_api_key:
        raise ValueError("API key must be provided in X-OpenAI-API-Key header")

    # Log incoming MCP call
    log_mcp_call(
        "phone_a_friend",
        question=question[:100] if len(question) > 100 else question,
        context=context[:100] if context and len(context) > 100 else context,
        model=final_model,
        model_source="parameter" if model else "header",
        max_tokens=final_max_tokens,
        max_tokens_source="parameter" if max_tokens else "header",
    )

    # Build prompt with optional context
    if context:
        prompt = (
            f"Context: {context}\n\n"
            f"Question: {question}\n\n"
            f"Please provide a comprehensive answer."
        )
    else:
        prompt = f"Question: {question}\n\nPlease provide a comprehensive answer."

    messages = cast(
        List[ChatCompletionMessageParam], [{"role": "user", "content": prompt}]
    )

    try:
        # Create OpenAI client with API key from headers
        client = openai.OpenAI(api_key=final_api_key)

        # Log OpenAI request
        log_openai_request(
            final_model,
            [{"role": "user", "content": prompt}],
            final_max_tokens,
            final_api_key,
        )

        response = client.chat.completions.create(
            model=final_model,
            messages=messages,  # type: ignore[arg-type]
            max_tokens=final_max_tokens,
            temperature=0.3,
        )

        # Log OpenAI response
        log_openai_response(response)

        answer = response.choices[0].message.content
        if not answer:
            raise ValueError("Empty response from OpenAI")

        logger.info("Friend called successfully", question=question[:50])
        result: str = answer.strip()
        # Avoid logging content; record only length
        logger.debug("Friend answer produced", question_length=len(question))

        # Metrics: success
        REQUEST_COUNTER.labels(tool="phone_a_friend", status="success").inc()
        increment_tally("phone_a_friend", "success")
        asyncio.create_task(async_db_increment("phone_a_friend", "success"))
        return result

    except Exception as e:
        # Metrics: error
        REQUEST_COUNTER.labels(tool="phone_a_friend", status="error").inc()
        increment_tally("phone_a_friend", "error")
        asyncio.create_task(async_db_increment("phone_a_friend", "error"))
        logger.error(
            "Failed to phone a friend",
            error=str(e),
            error_type=type(e).__name__,
            api_key_provided=bool(final_api_key),
            api_key_length=len(final_api_key) if final_api_key else 0,
        )
        raise


# Plan Review Tool
@mcp.tool()
async def review_plan(
    plan_content: Annotated[str, "The full content of the plan document to review"],
    review_level: Annotated[
        ReviewLevel,
        "Level of review depth: 'quick', 'standard', 'comprehensive', 'deep_dive', or 'expert'",
    ] = ReviewLevel.STANDARD,
    context: Annotated[
        Optional[str],
        "Optional context information about the project, team, or constraints",
    ] = None,
    plan_id: Annotated[Optional[str], "Optional identifier for the plan"] = None,
    focus_areas: Annotated[
        Optional[List[str]],
        (
            "Specific areas to focus on "
            "(e.g., 'timeline', 'resources', 'risks', 'budget')"
        ),
    ] = None,
    model: Annotated[
        Optional[str], "OpenAI model to use (optional if set in headers)"
    ] = None,
    max_tokens: Annotated[
        Optional[int], "Maximum tokens for response (optional if set in headers)"
    ] = None,
) -> Dict[str, Any]:
    """
    Review a plan file and provide feedback based on the specified review level.

    Args:
        plan_content: The content of the plan file to review
        review_level: Level of review depth (quick, standard, comprehensive, deep_dive, expert)
        context: Optional context information about the project, team, or constraints
        plan_id: Optional identifier for the plan
        focus_areas: Optional list of specific areas to focus the review on
        model: OpenAI model to use (optional if set in headers, default: gpt-4)
        max_tokens: Maximum tokens for response (optional if set in headers, default: 2000)

    Returns:
        Dictionary containing review results and feedback
    """
    # Get configuration from headers
    header_config = get_config_from_headers()

    # Use parameters if provided, otherwise fall back to headers
    final_api_key = header_config.get("api_key")
    final_model = model or header_config.get("model", "gpt-4")
    final_max_tokens = max_tokens or header_config.get("max_tokens", 2000)

    # Validate API key is available
    if not final_api_key:
        raise ValueError("API key must be provided in X-OpenAI-API-Key header")

    # Log incoming MCP call
    log_mcp_call(
        "review_plan",
        plan_content_length=len(plan_content),
        review_level=review_level,
        context_length=len(context) if context else 0,
        plan_id=plan_id,
        focus_areas=focus_areas,
        model=final_model,
        model_source="parameter" if model else "header",
        max_tokens=final_max_tokens,
        max_tokens_source="parameter" if max_tokens else "header",
    )

    # Generate plan ID if not provided
    if not plan_id:
        plan_id = f"plan_{int(datetime.now().timestamp())}"

    # Create review prompt based on level
    # All reviews follow the Master Review Framework with varying depth
    review_prompts = {
        ReviewLevel.QUICK: """
Provide a quick review of this plan using the following framework:

STRUCTURE & ORGANIZATION:
- Is the plan logically structured and easy to follow?

COMPLETENESS:
- Are key sections present (objectives, scope, timeline)?

CLARITY:
- Any obvious ambiguities or "must-fix" clarity issues?

ASSUMPTIONS & RISKS:
- Any glaring unstated assumptions or obvious risks?

Keep feedback concise, actionable, and in checklist form if possible.
Provide 1-2 key improvement suggestions.
""",
        ReviewLevel.STANDARD: """
Provide a standard review of this plan using the following framework:

STRUCTURE & ORGANIZATION:
- Is the plan logically structured and easy to follow?

COMPLETENESS:
- Are all key sections present (objectives, scope, resources, risks, timeline, success criteria)?
- Is there appropriate level of detail for each section?

CLARITY:
- Is the language unambiguous, readable, and accessible to stakeholders?

ASSUMPTIONS & DEPENDENCIES:
- Are hidden assumptions, constraints, or external dependencies called out?

RISKS:
- What risks or failure modes are unaddressed?

FEASIBILITY:
- Are timeline and resource estimates realistic?

Provide specific suggestions and 2-3 clarifying questions the plan should answer.
""",
        ReviewLevel.COMPREHENSIVE: """
Provide a comprehensive review of this plan using the following framework:

STRUCTURE & ORGANIZATION:
- Is the plan logically structured and easy to follow?
- Does it flow naturally from problem → solution → implementation?

COMPLETENESS:
- Are all key sections present and thoroughly developed (objectives, scope, resources, risks, timeline, success criteria)?
- Is the level of detail appropriate for each section?

CLARITY:
- Is the language unambiguous, readable, and accessible to all stakeholders?
- Are technical terms and concepts clearly explained?

ASSUMPTIONS & DEPENDENCIES:
- Are hidden assumptions, constraints, or external dependencies explicitly called out?
- What implicit assumptions need to be validated?

RISKS:
- What risks, failure modes, or edge cases are unaddressed?
- Are mitigation strategies defined for key risks?

FEASIBILITY:
- Are timeline and resource estimates realistic?
- Is the plan testable and measurable?
- Can success be validated objectively?

ALTERNATIVES:
- Have trade-offs and alternative approaches been considered?
- Are design decisions justified?

VALIDATION:
- Does the plan define success criteria, KPIs, or metrics?
- How will progress be tracked and measured?

STAKEHOLDERS:
- Are roles, responsibilities, and stakeholder impacts clear?

LONG-TERM SUSTAINABILITY:
- Does the plan account for scalability, maintainability, and adaptability?

Provide detailed feedback with examples, alternatives, and 3-5 clarifying questions that expose potential blind spots.
""",
        ReviewLevel.DEEP_DIVE: """
Provide a deep-dive technical review of this plan using the following framework:

STRUCTURE & ORGANIZATION:
- Evaluate logical flow, section coherence, and information architecture
- Assess whether structure supports understanding and execution

COMPLETENESS:
- Section-by-section completeness audit (objectives, scope, resources, risks, timeline, success criteria, rollout plan)
- Identify missing technical details, specifications, or requirements

CLARITY:
- Evaluate technical precision and unambiguous language
- Assess readability for both technical and non-technical stakeholders

ASSUMPTIONS & DEPENDENCIES:
- Identify ALL stated and unstated assumptions
- Map out dependency chains and potential bottlenecks
- Validate technical feasibility of each assumption

RISKS:
- Comprehensive risk analysis: technical, operational, security, performance
- Failure mode analysis (FMEA-style): what could go wrong and when?
- Edge cases, race conditions, and boundary conditions
- Mitigation and rollback strategies for each major risk

FEASIBILITY:
- Detailed timeline realism check with critical path analysis
- Resource allocation validation (team capacity, skills, budget)
- Technical feasibility of proposed solutions
- Testing strategy and validation approach

ALTERNATIVES:
- Compare against alternative technical approaches
- Evaluate trade-offs (performance vs complexity, cost vs speed, etc.)
- Justify architectural and design decisions

VALIDATION:
- Define measurable success criteria and KPIs
- Specify testing, monitoring, and observability requirements
- Outline validation checkpoints throughout implementation

STAKEHOLDERS:
- Map stakeholder roles, responsibilities, and approval gates
- Identify communication touchpoints and escalation paths

LONG-TERM SUSTAINABILITY:
- Scalability analysis: how will this perform at 10x, 100x scale?
- Maintainability: code quality, documentation, knowledge transfer
- Adaptability: how easily can this evolve with changing requirements?
- Operational considerations: deployment, monitoring, incident response

Provide rigorous, technically detailed feedback with specific examples, actionable improvements, and 4-6 probing questions.
""",
        ReviewLevel.EXPERT: """
Provide an expert-level review of this plan using the Master Review Framework with professional rigor:

STRUCTURE & ORGANIZATION:
- Evaluate against industry-standard plan structures (PRDs, RFCs, technical specifications)
- Assess information architecture and accessibility for diverse audiences

COMPLETENESS:
- Comprehensive audit of all sections (objectives, scope, resources, risks, timeline, success criteria, rollout, communication plan)
- Evaluate against professional planning standards and best practices
- Identify gaps that would concern executive stakeholders or auditors

CLARITY:
- Assess precision, unambiguity, and professional communication standards
- Evaluate for multi-stakeholder accessibility (technical, business, executive)
- Check for regulatory or compliance language requirements

ASSUMPTIONS & DEPENDENCIES:
- Exhaustive mapping of assumptions with validation requirements
- Dependency analysis including external systems, teams, and third parties
- Constraint analysis (technical, business, legal, compliance)
- Market or competitive landscape assumptions

RISKS:
- Enterprise-level risk assessment (technical, operational, business, legal, reputational)
- Comprehensive failure mode analysis with probability and impact assessment
- Security, privacy, and compliance risks
- Business continuity and disaster recovery considerations
- Risk mitigation, transfer, acceptance strategies

FEASIBILITY:
- Multi-dimensional feasibility analysis: technical, operational, financial, organizational
- Realistic timeline assessment with uncertainty ranges
- Resource allocation optimization and capacity planning
- Financial modeling and ROI analysis where applicable
- Testability, measurability, and validation strategy

ALTERNATIVES:
- Comprehensive alternatives analysis with decision matrices
- Trade-off evaluation across multiple dimensions (cost, time, quality, risk)
- Competitive analysis and industry benchmarking
- Build vs buy vs partner considerations

VALIDATION:
- Define SMART success criteria and KPIs aligned with business objectives
- Comprehensive testing strategy (unit, integration, system, acceptance)
- Monitoring, observability, and alerting requirements
- Metrics dashboard and reporting cadence
- Go/no-go decision criteria at each milestone

STAKEHOLDERS:
- Complete stakeholder mapping with RACI matrix
- Communication plan with appropriate cadence and channels
- Change management and stakeholder buy-in strategy
- Executive reporting and governance structure

LONG-TERM SUSTAINABILITY:
- Scalability with specific load projections and capacity planning
- Maintainability with documentation, knowledge transfer, and support plans
- Adaptability and extensibility for future requirements
- Total cost of ownership (TCO) analysis
- Technical debt management strategy
- Operational excellence: SLAs, SLOs, error budgets
- Team sustainability: on-call rotation, burnout prevention

Provide expert insights with industry context, citing best practices and standards where relevant.
Suggest measurable improvements with business impact.
Provide 5-7 strategic questions the leadership team should address before execution.
""",
    }

    base_prompt = review_prompts[review_level]

    # Add focus areas if specified
    if focus_areas:
        areas = ", ".join(focus_areas)
        focus_text = f"\n\nFocus the review specifically on these areas: {areas}"
        base_prompt += focus_text

    # Add context if provided
    context_section = ""
    if context:
        context_section = f"\n\nAdditional Context:\n{context}\n"

    prompt = f"""
{base_prompt}
{context_section}
Plan Content:
{plan_content}

Please provide your review in the following JSON format:
{{
    "overall_score": 0.0-1.0,
    "strengths": ["strength1", "strength2", ...],
    "weaknesses": ["weakness1", "weakness2", ...],
    "suggestions": ["suggestion1", "suggestion2", ...],
    "detailed_feedback": "comprehensive feedback text"
}}
"""

    messages = [{"role": "user", "content": prompt}]

    try:
        # Create OpenAI client with API key from headers
        client = openai.OpenAI(api_key=final_api_key)

        # Log OpenAI request
        log_openai_request(
            final_model, [dict(m) for m in messages], final_max_tokens, final_api_key
        )

        response = client.chat.completions.create(
            model=final_model,
            messages=messages,  # type: ignore[arg-type]
            max_tokens=final_max_tokens,
            temperature=0.3,
        )

        # Log OpenAI response
        log_openai_response(response)

        # Parse JSON response
        review_content = response.choices[0].message.content
        if not review_content:
            raise ValueError("Empty response from OpenAI")

        review_text: str = review_content.strip()

        # Try to extract JSON from the response
        try:
            # Look for JSON in the response
            start_idx = review_text.find("{")
            end_idx = review_text.rfind("}") + 1
            if start_idx != -1 and end_idx != 0:
                json_text = review_text[start_idx:end_idx]
                review_data = json.loads(json_text)
            else:
                # Fallback if no JSON found
                review_data = {
                    "overall_score": 0.7,
                    "strengths": ["Plan structure is present"],
                    "weaknesses": ["Unable to parse detailed review"],
                    "suggestions": ["Review the plan manually"],
                    "detailed_feedback": review_text,
                }
        except json.JSONDecodeError:
            # Fallback if JSON parsing fails
            review_data = {
                "overall_score": 0.7,
                "strengths": ["Plan structure is present"],
                "weaknesses": ["Unable to parse detailed review"],
                "suggestions": ["Review the plan manually"],
                "detailed_feedback": review_text,
            }

        # Create plan review object
        plan_review = PlanReview(
            plan_id=plan_id,
            review_level=review_level,
            overall_score=review_data.get("overall_score", 0.7),
            strengths=review_data.get("strengths", []),
            weaknesses=review_data.get("weaknesses", []),
            suggestions=review_data.get("suggestions", []),
            detailed_feedback=review_data.get("detailed_feedback", review_text),
        )

        # Store the review
        plan_reviews[plan_id] = plan_review

        logger.info(
            "Plan reviewed",
            plan_id=plan_id,
            review_level=review_level,
            score=plan_review.overall_score,
        )

        reviewed_at_value: datetime = cast(
            datetime, getattr(plan_review, "reviewed_at")
        )
        reviewed_at_iso: str = reviewed_at_value.isoformat()

        # Metrics: success
        REQUEST_COUNTER.labels(tool="review_plan", status="success").inc()
        increment_tally("review_plan", "success")
        asyncio.create_task(async_db_increment("review_plan", "success"))

        return {
            "plan_id": plan_id,
            "review_level": review_level,
            "overall_score": plan_review.overall_score,
            "strengths": plan_review.strengths,
            "weaknesses": plan_review.weaknesses,
            "suggestions": plan_review.suggestions,
            "detailed_feedback": plan_review.detailed_feedback,
            "reviewed_at": reviewed_at_iso,
        }

    except Exception as e:
        # Metrics: error
        REQUEST_COUNTER.labels(tool="review_plan", status="error").inc()
        increment_tally("review_plan", "error")
        asyncio.create_task(async_db_increment("review_plan", "error"))
        logger.error(
            "Failed to review plan",
            error=str(e),
            error_type=type(e).__name__,
            plan_id=plan_id,
            api_key_provided=bool(final_api_key),
            api_key_length=len(final_api_key) if final_api_key else 0,
        )
        raise


# Health check endpoint
@mcp.tool()
async def health_check() -> Dict[str, Any]:
    """Check server health and status."""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "plan_reviews_count": len(plan_reviews),
    }


# REST API endpoints for interactive demo
class DemoRequest(BaseModel):
    """Request model for demo API endpoints."""

    question: Optional[str] = None
    context: Optional[str] = None
    plan_content: Optional[str] = None
    review_level: Optional[str] = "standard"
    focus_areas: Optional[List[str]] = None
    api_key: str


@mcp.custom_route("/api/demo/phone-a-friend", methods=["POST"])
async def demo_phone_a_friend(request: Request) -> JSONResponse:
    """REST API endpoint for phone_a_friend demo."""
    try:
        if not ENABLE_DEMOS:
            raise HTTPException(status_code=404, detail="Not Found")
        payload = await request.json()
        data = DemoRequest.model_validate(payload)

        if not data.question:
            raise HTTPException(status_code=400, detail="Question is required")
        if not data.api_key:
            raise HTTPException(status_code=400, detail="API key is required")

        if data.context:
            prompt = (
                f"Context: {data.context}\n\n"
                f"Question: {data.question}\n\n"
                f"Please provide a comprehensive answer."
            )
        else:
            prompt = (
                f"Question: {data.question}\n\n"
                f"Please provide a comprehensive answer."
            )

        messages = cast(
            List[ChatCompletionMessageParam], [{"role": "user", "content": prompt}]
        )

        client = openai.OpenAI(api_key=data.api_key)
        response = client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            max_tokens=1000,
            temperature=0.3,
        )

        answer = response.choices[0].message.content
        if not answer:
            raise HTTPException(status_code=500, detail="Empty response from OpenAI")

        # Metrics: success
        REQUEST_COUNTER.labels(tool="demo_phone_a_friend", status="success").inc()
        increment_tally("demo_phone_a_friend", "success")
        asyncio.create_task(async_db_increment("demo_phone_a_friend", "success"))

        return JSONResponse(content={"answer": answer.strip()})

    except openai.AuthenticationError as exc:
        REQUEST_COUNTER.labels(tool="demo_phone_a_friend", status="error").inc()
        increment_tally("demo_phone_a_friend", "error")
        asyncio.create_task(async_db_increment("demo_phone_a_friend", "error"))
        raise HTTPException(status_code=401, detail="Invalid API key") from exc
    except openai.RateLimitError as exc:
        REQUEST_COUNTER.labels(tool="demo_phone_a_friend", status="error").inc()
        increment_tally("demo_phone_a_friend", "error")
        asyncio.create_task(async_db_increment("demo_phone_a_friend", "error"))
        raise HTTPException(status_code=429, detail="Rate limit exceeded") from exc
    except Exception as exc:
        logger.error("Demo API error", error=str(exc))
        REQUEST_COUNTER.labels(tool="demo_phone_a_friend", status="error").inc()
        increment_tally("demo_phone_a_friend", "error")
        asyncio.create_task(async_db_increment("demo_phone_a_friend", "error"))
        raise HTTPException(status_code=500, detail=str(exc)) from exc


@mcp.custom_route("/api/metrics/summary", methods=["GET"])
async def metrics_summary(_request: Request) -> JSONResponse:
    """Return minimal in-memory tallies for homepage display."""
    # Prefer DB totals when enabled; fall back to in-memory tallies
    if TRACK_METRICS_DB and DATABASE_URL and _metrics_table_ready:
        try:
            psycopg = __import__("psycopg")
            if _db_conn is None:
                initialize_metrics_db()
            if _db_conn is not None and _metrics_table_ready:
                with _db_conn.cursor() as cur:  # type: ignore[union-attr]
                    cur.execute(
                        """
                        select tool, status, sum(count) as total
                        from public.request_counts
                        group by tool, status
                        order by tool, status;
                        """
                    )
                    rows = cur.fetchall()
                tallies: Dict[str, Dict[str, int]] = {}
                for tool, status, total in rows:
                    per = tallies.setdefault(tool, {})
                    per[status] = int(total)
                return JSONResponse(
                    content={
                        "tallies": tallies,
                        "source": "database",
                        "generated_at": datetime.now().isoformat(),
                    }
                )
        except psycopg.Error as exc:  # type: ignore[attr-defined]
            logger.error("DB metrics summary query failed", error=str(exc))

    # Shallow copy to avoid mutation during serialization
    snapshot: Dict[str, Dict[str, int]] = {
        k: dict(v) for k, v in REQUEST_TALLIES.items()
    }
    return JSONResponse(
        content={
            "tallies": snapshot,
            "source": "memory",
            "generated_at": datetime.now().isoformat(),
        }
    )


@mcp.custom_route("/api/demo/review-plan", methods=["POST"])
async def demo_review_plan(request: Request) -> JSONResponse:
    """REST API endpoint for review_plan demo."""
    try:
        if not ENABLE_DEMOS:
            raise HTTPException(status_code=404, detail="Not Found")
        payload = await request.json()
        data = DemoRequest.model_validate(payload)

        if not data.plan_content:
            raise HTTPException(status_code=400, detail="Plan content is required")
        if not data.api_key:
            raise HTTPException(status_code=400, detail="API key is required")

        review_level = ReviewLevel(data.review_level or "standard")

        review_prompts = {
            ReviewLevel.QUICK: "Provide a quick review focusing on structure and completeness.",
            ReviewLevel.STANDARD: "Provide a standard review covering key areas.",
            ReviewLevel.COMPREHENSIVE: "Provide a comprehensive review with detailed analysis.",
            ReviewLevel.DEEP_DIVE: "Provide a deep-dive technical review.",
            ReviewLevel.EXPERT: "Provide an expert-level professional review.",
        }

        prompt = f"""
{review_prompts[review_level]}

Plan Content:
{data.plan_content}

Please provide your review in JSON format:
{{
    "overall_score": 0.0-1.0,
    "strengths": ["strength1", "strength2"],
    "weaknesses": ["weakness1", "weakness2"],
    "suggestions": ["suggestion1", "suggestion2"],
    "detailed_feedback": "comprehensive feedback text"
}}
"""

        messages = cast(
            List[ChatCompletionMessageParam], [{"role": "user", "content": prompt}]
        )

        client = openai.OpenAI(api_key=data.api_key)
        response = client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            max_tokens=2000,
            temperature=0.3,
        )

        review_content = response.choices[0].message.content
        if not review_content:
            raise HTTPException(status_code=500, detail="Empty response from OpenAI")

        review_text = review_content.strip()
        try:
            start_idx = review_text.find("{")
            end_idx = review_text.rfind("}") + 1
            if start_idx != -1 and end_idx != 0:
                json_text = review_text[start_idx:end_idx]
                review_data = json.loads(json_text)
            else:
                review_data = {
                    "overall_score": 0.7,
                    "strengths": ["Plan structure is present"],
                    "weaknesses": ["Unable to parse detailed review"],
                    "suggestions": ["Review the plan manually"],
                    "detailed_feedback": review_text,
                }
        except json.JSONDecodeError:
            review_data = {
                "overall_score": 0.7,
                "strengths": ["Plan structure is present"],
                "weaknesses": ["Unable to parse detailed review"],
                "suggestions": ["Review the plan manually"],
                "detailed_feedback": review_text,
            }

        # Metrics: success
        REQUEST_COUNTER.labels(tool="demo_review_plan", status="success").inc()
        increment_tally("demo_review_plan", "success")
        asyncio.create_task(async_db_increment("demo_review_plan", "success"))

        return JSONResponse(content=review_data)

    except openai.AuthenticationError as exc:
        REQUEST_COUNTER.labels(tool="demo_review_plan", status="error").inc()
        increment_tally("demo_review_plan", "error")
        asyncio.create_task(async_db_increment("demo_review_plan", "error"))
        raise HTTPException(status_code=401, detail="Invalid API key") from exc
    except openai.RateLimitError as exc:
        REQUEST_COUNTER.labels(tool="demo_review_plan", status="error").inc()
        increment_tally("demo_review_plan", "error")
        asyncio.create_task(async_db_increment("demo_review_plan", "error"))
        raise HTTPException(status_code=429, detail="Rate limit exceeded") from exc
    except Exception as exc:
        logger.error("Demo API error", error=str(exc))
        REQUEST_COUNTER.labels(tool="demo_review_plan", status="error").inc()
        increment_tally("demo_review_plan", "error")
        asyncio.create_task(async_db_increment("demo_review_plan", "error"))
        raise HTTPException(status_code=500, detail=str(exc)) from exc


@mcp.custom_route("/api/demo/health", methods=["GET"])
async def demo_health(_request: Request) -> JSONResponse:
    """REST API endpoint for health check demo."""
    return JSONResponse(
        content={
            "status": "healthy",
            "timestamp": datetime.now().isoformat(),
            "plan_reviews_count": len(plan_reviews),
        }
    )


@mcp.custom_route("/", methods=["GET"])
async def serve_homepage(_request: Request) -> FileResponse:
    """Serve the homepage."""
    index_path = dist_path / "index.html"
    if index_path.exists():
        return FileResponse(str(index_path))
    return FileResponse("frontend/index.html")


# Container/infra health probe
@mcp.custom_route("/health", methods=["GET"])
async def health_route(_request: Request) -> JSONResponse:
    return JSONResponse(
        content={
            "status": "healthy",
            "timestamp": datetime.now().isoformat(),
            "plan_reviews_count": len(plan_reviews),
        }
    )


# Static asset fallback route (ensures assets are served even if mount is not applied)
@mcp.custom_route("/assets/{asset_path:path}", methods=["GET"])
async def serve_asset(_request: Request) -> Response:
    # Extract the matched path from the URL
    # Starlette injects it into scope; FastMCP passes through Request
    # Using request.url.path to compute relative path after "/assets/"
    url_path = _request.url.path
    prefix = "/assets/"
    if url_path.startswith(prefix):
        path = url_path.replace(prefix, "", 1)
    else:
        path = url_path.lstrip("/")
    asset_file = dist_path / "assets" / path
    if asset_file.is_file():
        return FileResponse(str(asset_file))
    return JSONResponse({"detail": "Not Found"}, status_code=HTTP_404_NOT_FOUND)


@mcp.custom_route("/favicon.ico", methods=["GET"])
async def favicon(_request: Request) -> Response:
    # Prefer SVG; many browsers accept SVG favicons
    svg_path = dist_path / "favicon.svg"
    if svg_path.is_file():
        return FileResponse(str(svg_path), media_type="image/svg+xml")
    # Fallback to built icon in frontend assets
    svg_src = Path(__file__).parent / "frontend" / "src" / "assets" / "favicon.svg"
    if svg_src.is_file():
        return FileResponse(str(svg_src), media_type="image/svg+xml")
    return JSONResponse({"detail": "Not Found"}, status_code=HTTP_404_NOT_FOUND)


@mcp.custom_route("/favicon.svg", methods=["GET"])
async def favicon_svg(_request: Request) -> Response:
    svg_path = dist_path / "favicon.svg"
    if svg_path.is_file():
        return FileResponse(str(svg_path), media_type="image/svg+xml")
    svg_src = Path(__file__).parent / "frontend" / "src" / "assets" / "favicon.svg"
    if svg_src.is_file():
        return FileResponse(str(svg_src), media_type="image/svg+xml")
    return JSONResponse({"detail": "Not Found"}, status_code=HTTP_404_NOT_FOUND)


# Server startup logging
logger.info(
    "MCP server initialized successfully",
    environment=ENVIRONMENT,
    log_level=LOG_LEVEL,
    debug_logging_enabled=(LOG_LEVEL == "DEBUG"),
    sensitive_data_logging=(ENVIRONMENT == "development" and LOG_LEVEL == "DEBUG"),
)


# Metrics exposition endpoint
@mcp.custom_route("/metrics", methods=["GET"])
async def metrics_endpoint(_request: Request) -> Response:
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)


# Rebind exported http_app after all routes are registered, for tests and embedding
http_app = mcp.http_app()


if __name__ == "__main__":
    # Run the server
    # Initialize DB metrics table if enabled
    initialize_metrics_db()
    logger.info(
        "Starting MCP server",
        transport="http",
        host="0.0.0.0",
        port=8000,
        environment=ENVIRONMENT,
        log_level=LOG_LEVEL,
    )
    mcp.run(transport="http", host="0.0.0.0", port=8000)
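
Usage note: the demo REST endpoint registered above (/api/demo/phone-a-friend) can be exercised directly over HTTP once the server is running. The snippet below is a minimal sketch, not part of server.py; it assumes the server was started with the defaults shown (mcp.run on port 8000), that ENABLE_DEMOS evaluates to true, and that a valid OpenAI key is available in a hypothetical OPENAI_API_KEY environment variable. It uses only the Python standard library.

# Sketch: call the demo phone-a-friend endpoint of a locally running brain-trust server.
# Assumptions: server on localhost:8000, ENABLE_DEMOS=true, key in OPENAI_API_KEY (hypothetical).
import json
import os
import urllib.request

payload = {
    "question": "What are the key risks in a two-week database migration plan?",
    "context": "Small team, production Postgres instance",
    # The demo endpoint expects the OpenAI key in the request body (DemoRequest.api_key).
    "api_key": os.environ["OPENAI_API_KEY"],
}

req = urllib.request.Request(
    "http://localhost:8000/api/demo/phone-a-friend",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)

with urllib.request.urlopen(req) as resp:
    # The route returns a JSON body of the form {"answer": "..."}.
    print(json.loads(resp.read())["answer"])

The MCP tools themselves (phone_a_friend, review_plan) are reached through an MCP client over the HTTP transport instead; there, the OpenAI key, model, and max-token settings travel in the X-OpenAI-API-Key, X-OpenAI-Model, and X-OpenAI-Max-Tokens request headers read by get_config_from_headers().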
