togmal_mcp.py•51.6 kB
"""
ToGMAL MCP Server - Taxonomy of Generative Model Apparent Limitations
This MCP server provides tools to detect out-of-distribution behaviors and apparent
limitations in LLM interactions, offering safety interventions when needed.
Features:
- Heuristic-based anomaly detection for prompts and responses
- Pattern matching for known problematic scenarios
- Client-side evidence submission for taxonomy building
- Intervention recommendations (step breakdown, human-in-the-loop, web search)
- Privacy-preserving analysis (no LLM judge, deterministic tests)
"""
from mcp.server.fastmcp import FastMCP, Context
from pydantic import BaseModel, Field, field_validator, ConfigDict
from typing import Optional, List, Dict, Any, Literal
from enum import Enum
import re
import json
import hashlib
from datetime import datetime
import math
import os
from togmal_ml_integration import get_ml_detector, combine_detections
from togmal.context_analyzer import analyze_conversation_context
from togmal.ml_tools import get_ml_discovered_tools
from togmal.config import DYNAMIC_TOOLS_ENABLED, CORE_TOOLS, ML_CLUSTERING_ENABLED, ML_TOOLS_MIN_CONFIDENCE
class FeatureExtractor:
"""
Shim for unpickling models that reference __main__.FeatureExtractor
Provides minimal methods used by ML-enhanced detection to avoid pickle import errors.
The pickled object may populate attributes like 'prompt_vectorizer' and 'scaler'.
"""
def transform_prompts(self, prompts: List[str]):
# Try to use stored vectorizer/scaler if available
try:
pv = getattr(self, "prompt_vectorizer", None)
sc = getattr(self, "scaler", None)
if pv is not None:
X = pv.transform(prompts)
X_arr = X.toarray() if hasattr(X, "toarray") else X
if sc is not None:
try:
X_arr = sc.transform(X_arr)
except Exception:
# If scaler fails, return unscaled
pass
return X_arr
except Exception:
pass
# Fallback: safe zero features
try:
import numpy as np
return np.zeros((len(prompts), 1))
except Exception:
return [[0] for _ in prompts]
# Initialize MCP server
mcp = FastMCP("togmal_mcp")
# ----------------------------------------------------------------------------
# Environment defaults for stdio MCP server
# Ensures consistent behavior when launched by Inspector or other clients
# ----------------------------------------------------------------------------
try:
# Derive HOME and USER from workspace path
_workspace = "/Users/hetalksinmaths/togmal"
_home_default = "/Users/hetalksinmaths"
_user_default = "hetalksinmaths"
_venv_bin = f"{_workspace}/.venv/bin"
except Exception:
_home_default = os.path.expanduser("~")
_user_default = os.environ.get("USER", "user")
_venv_bin = os.path.join(os.getcwd(), ".venv", "bin")
# Set common env defaults if missing
os.environ.setdefault("HOME", _home_default)
os.environ.setdefault("USER", _user_default)
os.environ.setdefault("LOGNAME", _user_default)
os.environ.setdefault("SHELL", "/bin/zsh")
# Do not override TERM; leave as provided by client/terminal
# Ensure venv bin is prefixed on PATH
_path_current = os.environ.get("PATH", "")
if _venv_bin and _venv_bin not in _path_current.split(":"):
os.environ["PATH"] = f"{_venv_bin}:{_path_current}" if _path_current else _venv_bin
# Constants
CHARACTER_LIMIT = 25000
MAX_EVIDENCE_ENTRIES = 1000
# Persistence for taxonomy (in production, this would use persistent storage)
TAXONOMY_FILE = "./data/taxonomy.json"
os.makedirs("./data", exist_ok=True)
DEFAULT_TAXONOMY: Dict[str, List[Dict[str, Any]]] = {
"math_physics_speculation": [],
"ungrounded_medical_advice": [],
"dangerous_file_operations": [],
"vibe_coding_overreach": [],
"unsupported_claims": []
}
# Initialize taxonomy and then optionally load from disk
TAXONOMY_DB: Dict[str, List[Dict[str, Any]]] = DEFAULT_TAXONOMY.copy()
try:
if os.path.exists(TAXONOMY_FILE):
with open(TAXONOMY_FILE, "r") as f:
data = json.load(f)
if isinstance(data, dict):
for k, v in data.items():
if k in TAXONOMY_DB and isinstance(v, list):
TAXONOMY_DB[k] = v
except Exception:
# If loading fails, continue with default taxonomy
pass
def save_taxonomy() -> None:
"""Persist taxonomy to disk."""
try:
with open(TAXONOMY_FILE, "w") as f:
json.dump(TAXONOMY_DB, f, indent=2, default=str)
except Exception:
# Persistence failures should not break tool execution
pass
# ============================================================================
# ENUMS AND DATA MODELS
# ============================================================================
class RiskLevel(str, Enum):
"""Risk level classification."""
LOW = "low"
MODERATE = "moderate"
HIGH = "high"
CRITICAL = "critical"
class InterventionType(str, Enum):
"""Types of safety interventions."""
STEP_BREAKDOWN = "step_breakdown"
HUMAN_IN_LOOP = "human_in_loop"
WEB_SEARCH = "web_search"
SIMPLIFIED_SCOPE = "simplified_scope"
NONE = "none"
class CategoryType(str, Enum):
"""Taxonomy categories for limitations."""
MATH_PHYSICS_SPECULATION = "math_physics_speculation"
UNGROUNDED_MEDICAL_ADVICE = "ungrounded_medical_advice"
DANGEROUS_FILE_OPERATIONS = "dangerous_file_operations"
VIBE_CODING_OVERREACH = "vibe_coding_overreach"
UNSUPPORTED_CLAIMS = "unsupported_claims"
class ResponseFormat(str, Enum):
"""Output format for tool responses."""
MARKDOWN = "markdown"
JSON = "json"
class SubmissionReason(str, Enum):
"""Why this evidence is being submitted."""
FALSE_NEGATIVE = "false_negative" # Detection missed a real issue
FALSE_POSITIVE = "false_positive" # Incorrectly flagged content
NEW_PATTERN = "new_pattern" # Novel limitation pattern not in taxonomy
LLM_FAILURE = "llm_failure" # LLM produced problematic output
EDGE_CASE = "edge_case" # Boundary case worth documenting
# ============================================================================
# INPUT MODELS
# ============================================================================
class AnalyzePromptInput(BaseModel):
"""Input model for prompt analysis."""
model_config = ConfigDict(
str_strip_whitespace=True,
validate_assignment=True,
extra='forbid'
)
prompt: str = Field(
...,
description="The user prompt to analyze for potential limitations (e.g., 'Build me a quantum gravity theory', 'Write 5000 lines of code for a social network')",
min_length=1,
max_length=50000
)
response_format: ResponseFormat = Field(
default=ResponseFormat.MARKDOWN,
description="Output format: 'markdown' for human-readable or 'json' for machine-readable"
)
class AnalyzeResponseInput(BaseModel):
"""Input model for response analysis."""
model_config = ConfigDict(
str_strip_whitespace=True,
validate_assignment=True,
extra='forbid'
)
response: str = Field(
...,
description="The LLM response to analyze for potential issues (e.g., medical advice, file operations, speculative claims)",
min_length=1,
max_length=100000
)
context: Optional[str] = Field(
default=None,
description="Optional context about the original prompt for better analysis",
max_length=10000
)
response_format: ResponseFormat = Field(
default=ResponseFormat.MARKDOWN,
description="Output format: 'markdown' for human-readable or 'json' for machine-readable"
)
class SubmitEvidenceInput(BaseModel):
"""Input model for submitting evidence of limitations."""
model_config = ConfigDict(
str_strip_whitespace=True,
validate_assignment=True,
extra='forbid'
)
category: CategoryType = Field(
...,
description="Category of limitation: 'math_physics_speculation', 'ungrounded_medical_advice', 'dangerous_file_operations', 'vibe_coding_overreach', or 'unsupported_claims'"
)
prompt: str = Field(
...,
description="The prompt that led to the limitation",
min_length=1,
max_length=10000
)
response: str = Field(
...,
description="The problematic response from the LLM",
min_length=1,
max_length=50000
)
description: str = Field(
...,
description="Description of why this is problematic (e.g., 'Made up physics equations', 'Suggested dangerous medication without sources')",
min_length=10,
max_length=2000
)
severity: RiskLevel = Field(
...,
description="Severity level: 'low', 'moderate', 'high', or 'critical'"
)
reason: SubmissionReason = Field(
...,
description="Why this evidence is being submitted (false_negative, false_positive, new_pattern, llm_failure, edge_case)"
)
reason_details: Optional[str] = Field(
default=None,
description="Optional free-text elaboration about the reason"
)
class GetTaxonomyInput(BaseModel):
"""Input model for retrieving taxonomy entries."""
model_config = ConfigDict(
str_strip_whitespace=True,
validate_assignment=True,
extra='forbid'
)
category: Optional[CategoryType] = Field(
default=None,
description="Filter by category, or None for all categories"
)
min_severity: Optional[RiskLevel] = Field(
default=None,
description="Minimum severity level to include"
)
limit: int = Field(
default=20,
description="Maximum number of entries to return",
ge=1,
le=100
)
offset: int = Field(
default=0,
description="Number of entries to skip for pagination",
ge=0
)
response_format: ResponseFormat = Field(
default=ResponseFormat.MARKDOWN,
description="Output format: 'markdown' for human-readable or 'json' for machine-readable"
)
# ============================================================================
# HELPER FUNCTIONS - HEURISTIC DETECTION
# ============================================================================
def detect_math_physics_speculation(text: str) -> Dict[str, Any]:
"""Detect speculative math/physics theories without proper grounding."""
patterns = {
'grand_unification': [
r'theory of everything',
r'unified (field )?theory',
r'grand unification',
r'quantum gravity (theory|model|solution)',
],
'equation_invention': [
r'new equation',
r'i discovered',
r'my formula',
r'novel equation',
],
'fundamental_constants': [
r'(redefine|change|modify) (the )?(gravitational|planck|speed of light)',
r'new fundamental constant',
],
'particle_invention': [
r'new particle',
r'i propose a particle',
r'undiscovered particle',
]
}
matches = []
for category, pattern_list in patterns.items():
for pattern in pattern_list:
if re.search(pattern, text.lower()):
matches.append(category)
break
# Check for excessive use of mathematical notation without context
math_notation_count = len(re.findall(r'[∂∫∑∏√±×÷≠≈≤≥∞∇⊗⊕]', text))
if math_notation_count > 20:
matches.append('excessive_notation')
return {
'detected': len(matches) > 0,
'categories': matches,
'confidence': min(len(matches) * 0.25, 1.0)
}
def detect_ungrounded_medical_advice(text: str) -> Dict[str, Any]:
"""Detect medical advice that lacks proper sources or qualifications."""
patterns = {
'diagnosis_claims': [
r'you (probably|definitely|likely) have',
r'sounds like (you have|it\'s)',
r'this is (clearly|obviously) (a case of)?',
],
'treatment_recommendations': [
r'you should (take|use|try)',
r'i recommend (taking|using)',
r'(the )?(best|recommended) treatment is',
],
'drug_suggestions': [
r'(take|try) \d+\s?(mg|mcg|ml)',
r'(start|begin) (taking|with)',
],
'dismissive': [
r'(don\'t|no need to) (worry|see a doctor)',
r'(it\'s|that\'s) (nothing|not serious)',
]
}
# Check for medical terms
medical_terms = [
'diagnosis', 'treatment', 'medication', 'dosage', 'prescription',
'disease', 'condition', 'syndrome', 'disorder', 'therapy'
]
matches = []
medical_term_count = sum(1 for term in medical_terms if term in text.lower())
for category, pattern_list in patterns.items():
for pattern in pattern_list:
if re.search(pattern, text.lower()):
matches.append(category)
break
# Check if sources are cited
has_sources = bool(re.search(r'(according to|source:|study|research|clinical trial)', text.lower()))
is_problematic = len(matches) > 0 and medical_term_count > 2 and not has_sources
return {
'detected': is_problematic,
'categories': matches,
'medical_term_count': medical_term_count,
'has_sources': has_sources,
'confidence': min((len(matches) + medical_term_count / 5) * 0.2, 1.0) if not has_sources else 0.0
}
def detect_dangerous_file_operations(text: str) -> Dict[str, Any]:
"""Detect potentially dangerous file operations without safeguards."""
patterns = {
'mass_deletion': [
r'(rm|del|delete|remove) (-rf?|/s|all)',
r'(shutil\.)?rmtree',
r'delete.*\*',
r'unlink.*\*',
],
'test_file_operations': [
r'(rm|del|delete|remove).*test',
r'delete.*"test"',
],
'recursive_operations': [
r'(recursive|recursively) (delete|remove)',
r'walk.*delete',
r'glob.*delete',
],
'no_confirmation': [
r'(delete|remove).*(without|no).*(prompt|confirm|ask)',
r'force.*delete',
]
}
matches = []
for category, pattern_list in patterns.items():
for pattern in pattern_list:
if re.search(pattern, text.lower()):
matches.append(category)
break
# Check for safeguards
has_safeguards = bool(re.search(
r'(confirm|prompt|ask|verify|check|backup|dry.?run|human.?in.?the.?loop)',
text.lower()
))
is_dangerous = len(matches) > 0 and not has_safeguards
return {
'detected': is_dangerous,
'categories': matches,
'has_safeguards': has_safeguards,
'confidence': min(len(matches) * 0.3, 1.0) if not has_safeguards else 0.0
}
def detect_vibe_coding_overreach(text: str) -> Dict[str, Any]:
"""Detect requests for overly ambitious coding projects without proper scoping."""
patterns = {
'massive_scope': [
r'build (me |a )?(complete|full|entire|whole) (app|application|system|platform)',
r'create.*social network',
r'make.*operating system',
r'build.*from scratch',
],
'line_count': [
r'\d{4,}\s*(lines?|loc)',
r'(thousand|several thousand|5000|10000).*lines',
],
'everything': [
r'(all|everything|every) (feature|function|capability)',
r'complete (implementation|solution)',
],
'unrealistic_timeframe': [
r'(in|within) (one|a single|5|ten) (minute|hour|day)',
]
}
matches = []
for category, pattern_list in patterns.items():
for pattern in pattern_list:
if re.search(pattern, text.lower()):
matches.append(category)
break
# Check for architectural planning words (suggests proper scoping)
has_planning = bool(re.search(
r'(architecture|design|plan|structure|module|component|phase|step)',
text.lower()
))
# Extract potential line count
line_match = re.search(r'(\d+)\s*lines', text.lower())
estimated_lines = int(line_match.group(1)) if line_match else 0
is_overreach = len(matches) > 0 or estimated_lines > 500
return {
'detected': is_overreach,
'categories': matches,
'estimated_lines': estimated_lines,
'has_planning': has_planning,
'confidence': min((len(matches) * 0.25 + (estimated_lines / 1000)) * 0.5, 1.0)
}
def detect_unsupported_claims(text: str) -> Dict[str, Any]:
"""Detect strong claims without evidence or sources."""
patterns = {
'absolute_statements': [
r'(always|never|definitely|certainly|absolutely|guaranteed)',
r'(all|every|no) (scientists|experts|doctors|researchers)',
],
'statistical_claims': [
r'\d+%.*(?!according|source|study)',
r'(most|majority|minority).*(?!according|source|study)',
],
'future_predictions': [
r'will (definitely|certainly|surely)',
r'guaranteed to (happen|work|succeed)',
]
}
matches = []
for category, pattern_list in patterns.items():
for pattern in pattern_list:
if re.search(pattern, text.lower()):
matches.append(category)
# Check for hedging language (shows appropriate uncertainty)
hedging = bool(re.search(
r'(may|might|could|possibly|potentially|likely|probably|suggests|indicates)',
text.lower()
))
# Check for sources
has_sources = bool(re.search(
r'(according to|source:|study|research|citation|reference|\[\d+\])',
text.lower()
))
is_unsupported = len(matches) > 2 and not has_sources and not hedging
return {
'detected': is_unsupported,
'categories': matches,
'has_hedging': hedging,
'has_sources': has_sources,
'confidence': min(len(matches) * 0.15, 1.0) if not (has_sources or hedging) else 0.0
}
# ============================================================================
# HELPER FUNCTIONS - INTERVENTION RECOMMENDATIONS
# ============================================================================
def recommend_interventions(analysis_results: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Recommend appropriate interventions based on analysis results."""
interventions = []
# Math/physics speculation -> step breakdown + web search
if analysis_results['math_physics']['detected']:
interventions.append({
'type': InterventionType.STEP_BREAKDOWN,
'reason': 'Complex theoretical physics/math requires step-by-step verification',
'suggestion': 'Break down the theory into testable components and verify each against established physics'
})
interventions.append({
'type': InterventionType.WEB_SEARCH,
'reason': 'Claims should be validated against peer-reviewed literature',
'suggestion': 'Search for existing research on similar theories and fundamental physics principles'
})
# Medical advice -> human in loop + web search
if analysis_results['medical_advice']['detected']:
interventions.append({
'type': InterventionType.HUMAN_IN_LOOP,
'reason': 'Medical decisions require professional oversight',
'suggestion': 'Consult with a qualified healthcare professional before acting on this advice'
})
interventions.append({
'type': InterventionType.WEB_SEARCH,
'reason': 'Medical recommendations should cite authoritative sources',
'suggestion': 'Search for peer-reviewed medical literature and clinical guidelines'
})
# Dangerous file operations -> human in loop + simplified scope
if analysis_results['file_operations']['detected']:
interventions.append({
'type': InterventionType.HUMAN_IN_LOOP,
'reason': 'Destructive file operations are irreversible',
'suggestion': 'Implement confirmation prompts before executing any delete operations'
})
interventions.append({
'type': InterventionType.STEP_BREAKDOWN,
'reason': 'File operations should be explicit and reviewable',
'suggestion': 'Show exactly which files will be affected before proceeding'
})
# Vibe coding -> step breakdown + simplified scope
if analysis_results['vibe_coding']['detected']:
interventions.append({
'type': InterventionType.SIMPLIFIED_SCOPE,
'reason': 'Overly ambitious scope leads to poor quality and maintainability',
'suggestion': 'Start with a minimal viable product focusing on core functionality'
})
interventions.append({
'type': InterventionType.STEP_BREAKDOWN,
'reason': 'Large projects need proper architectural planning',
'suggestion': 'Create a phased implementation plan with clear milestones'
})
# Unsupported claims -> web search
if analysis_results['unsupported_claims']['detected']:
interventions.append({
'type': InterventionType.WEB_SEARCH,
'reason': 'Claims need verification from authoritative sources',
'suggestion': 'Search for credible sources to support or refute these claims'
})
return interventions
def calculate_risk_level(analysis_results: Dict[str, Any]) -> RiskLevel:
"""Calculate overall risk level from analysis results."""
risk_score = 0.0
# Weight different types of issues
if analysis_results['math_physics']['detected']:
risk_score += analysis_results['math_physics']['confidence'] * 0.5
if analysis_results['medical_advice']['detected']:
risk_score += analysis_results['medical_advice']['confidence'] * 1.5 # Higher weight
if analysis_results['file_operations']['detected']:
risk_score += analysis_results['file_operations']['confidence'] * 2.0 # Highest weight
if analysis_results['vibe_coding']['detected']:
risk_score += analysis_results['vibe_coding']['confidence'] * 0.4
if analysis_results['unsupported_claims']['detected']:
risk_score += analysis_results['unsupported_claims']['confidence'] * 0.3
# ML enhancement contribution
if analysis_results.get('ml', {}).get('detected'):
risk_score += analysis_results['ml'].get('confidence', 0.0) * 0.3
# Map score to risk level
if risk_score >= 1.5:
return RiskLevel.CRITICAL
elif risk_score >= 1.0:
return RiskLevel.HIGH
elif risk_score >= 0.5:
return RiskLevel.MODERATE
else:
return RiskLevel.LOW
def format_analysis_markdown(analysis: Dict[str, Any]) -> str:
"""Format analysis results as markdown."""
output = []
output.append(f"# ToGMAL Analysis Report\n")
output.append(f"**Risk Level:** {analysis['risk_level'].upper()}\n")
output.append(f"**Analysis Type:** {analysis['type']}\n\n")
# Detection results
output.append("## Detection Results\n")
if analysis['math_physics']['detected']:
output.append(f"### ⚠️ Math/Physics Speculation Detected")
output.append(f"- **Confidence:** {analysis['math_physics']['confidence']:.2%}")
output.append(f"- **Categories:** {', '.join(analysis['math_physics']['categories'])}\n")
if analysis['medical_advice']['detected']:
output.append(f"### 🏥 Ungrounded Medical Advice Detected")
output.append(f"- **Confidence:** {analysis['medical_advice']['confidence']:.2%}")
output.append(f"- **Categories:** {', '.join(analysis['medical_advice']['categories'])}")
output.append(f"- **Has Sources:** {'Yes' if analysis['medical_advice']['has_sources'] else 'No'}\n")
if analysis['file_operations']['detected']:
output.append(f"### 💾 Dangerous File Operations Detected")
output.append(f"- **Confidence:** {analysis['file_operations']['confidence']:.2%}")
output.append(f"- **Categories:** {', '.join(analysis['file_operations']['categories'])}")
output.append(f"- **Has Safeguards:** {'Yes' if analysis['file_operations']['has_safeguards'] else 'No'}\n")
if analysis['vibe_coding']['detected']:
output.append(f"### 💻 Vibe Coding Overreach Detected")
output.append(f"- **Confidence:** {analysis['vibe_coding']['confidence']:.2%}")
output.append(f"- **Categories:** {', '.join(analysis['vibe_coding']['categories'])}")
if analysis['vibe_coding']['estimated_lines'] > 0:
output.append(f"- **Estimated Lines:** {analysis['vibe_coding']['estimated_lines']}\n")
if analysis['unsupported_claims']['detected']:
output.append(f"### 📊 Unsupported Claims Detected")
output.append(f"- **Confidence:** {analysis['unsupported_claims']['confidence']:.2%}")
output.append(f"- **Has Hedging:** {'Yes' if analysis['unsupported_claims']['has_hedging'] else 'No'}")
output.append(f"- **Has Sources:** {'Yes' if analysis['unsupported_claims']['has_sources'] else 'No'}\n")
# ML Clustering results (if available)
if 'ml' in analysis:
output.append(f"### 🤖 ML Clustering Signal")
output.append(f"- **Detected:** {'Yes' if analysis['ml'].get('detected') else 'No'}")
output.append(f"- **Confidence:** {analysis['ml'].get('confidence', 0.0):.2%}")
if 'cluster_id' in analysis['ml']:
output.append(f"- **Cluster ID:** {analysis['ml'].get('cluster_id', -1)}")
if 'is_dangerous_cluster' in analysis['ml']:
output.append(f"- **Dangerous Cluster:** {'Yes' if analysis['ml'].get('is_dangerous_cluster') else 'No'}\n")
# Recommendations
if analysis['interventions']:
output.append("\n## Recommended Interventions\n")
for i, intervention in enumerate(analysis['interventions'], 1):
output.append(f"### {i}. {intervention['type'].replace('_', ' ').title()}")
output.append(f"**Reason:** {intervention['reason']}")
output.append(f"**Suggestion:** {intervention['suggestion']}\n")
if not any([
analysis['math_physics']['detected'],
analysis['medical_advice']['detected'],
analysis['file_operations']['detected'],
analysis['vibe_coding']['detected'],
analysis['unsupported_claims']['detected']
]):
output.append("\n✅ No significant issues detected. The content appears to be within normal parameters.\n")
return '\n'.join(output)
# ============================================================================
# MCP TOOLS
# ============================================================================
@mcp.tool(
name="togmal_analyze_prompt",
annotations={
"title": "Analyze Prompt for Potential Limitations",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False
} # type: ignore[arg-type]
)
async def analyze_prompt(params: AnalyzePromptInput) -> str:
"""Analyze a user prompt for potential out-of-distribution behaviors and apparent limitations.
This tool performs deterministic heuristic analysis to detect:
- Speculative math/physics theories (theory of everything, quantum gravity, etc.)
- Requests for medical diagnoses or treatment advice
- Dangerous file operations (mass deletion, recursive removal)
- Vibe coding overreach (overly ambitious scope, thousands of lines)
- Unsupported claims requiring verification
The analysis is privacy-preserving (no external API calls) and provides intervention
recommendations when issues are detected.
Args:
params (AnalyzePromptInput): Input parameters containing:
- prompt (str): The user prompt to analyze
- response_format (str): Output format ('markdown' or 'json')
Returns:
str: Analysis results with risk level, detected issues, and intervention recommendations
"""
# Run all detection heuristics
analysis_results = {
'type': 'prompt_analysis',
'math_physics': detect_math_physics_speculation(params.prompt),
'medical_advice': detect_ungrounded_medical_advice(params.prompt),
'file_operations': detect_dangerous_file_operations(params.prompt),
'vibe_coding': detect_vibe_coding_overreach(params.prompt),
'unsupported_claims': detect_unsupported_claims(params.prompt)
}
# Optional ML enhancement
try:
ml_detector = get_ml_detector(models_dir="./models")
analysis_results['ml'] = ml_detector.analyze_prompt_ml(params.prompt)
except Exception:
analysis_results['ml'] = {'detected': False, 'confidence': 0.0, 'method': 'ml_clustering_unavailable'}
# Calculate risk level
analysis_results['risk_level'] = calculate_risk_level(analysis_results)
# Get intervention recommendations
analysis_results['interventions'] = recommend_interventions(analysis_results)
# Format response
if params.response_format == ResponseFormat.JSON:
result = json.dumps(analysis_results, indent=2, default=str)
else:
result = format_analysis_markdown(analysis_results)
# Check character limit
if len(result) > CHARACTER_LIMIT:
truncated = result[:CHARACTER_LIMIT]
result = truncated + f"\n\n[Truncated at {CHARACTER_LIMIT} characters. Use JSON format for complete results.]"
return result
@mcp.tool(
name="togmal_analyze_response",
annotations={
"title": "Analyze LLM Response for Issues",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False
} # type: ignore[arg-type]
)
async def analyze_response(params: AnalyzeResponseInput) -> str:
"""Analyze an LLM response for potential issues and limitation indicators.
This tool examines responses for:
- Ungrounded medical advice without proper sources
- Speculative physics/math claims
- Dangerous file operation instructions without safeguards
- Unsupported statistical or factual claims
- Over-confident predictions without hedging
Particularly useful for detecting when an LLM has given advice on:
- Medical diagnoses or treatments (should have human oversight)
- Destructive file operations (should have confirmations)
- Speculative scientific theories (should cite sources)
Args:
params (AnalyzeResponseInput): Input parameters containing:
- response (str): The LLM response to analyze
- context (Optional[str]): Original prompt context
- response_format (str): Output format ('markdown' or 'json')
Returns:
str: Analysis results with detected issues and recommended safety interventions
"""
# Combine response with context for better analysis
text_to_analyze = params.response
if params.context:
text_to_analyze = f"{params.context}\n\n{params.response}"
# Run all detection heuristics
analysis_results = {
'type': 'response_analysis',
'math_physics': detect_math_physics_speculation(text_to_analyze),
'medical_advice': detect_ungrounded_medical_advice(text_to_analyze),
'file_operations': detect_dangerous_file_operations(text_to_analyze),
'vibe_coding': detect_vibe_coding_overreach(text_to_analyze),
'unsupported_claims': detect_unsupported_claims(text_to_analyze)
}
# Optional ML enhancement
try:
ml_detector = get_ml_detector(models_dir="./models")
if params.context:
analysis_results['ml'] = ml_detector.analyze_pair_ml(params.context, params.response)
else:
analysis_results['ml'] = ml_detector.analyze_prompt_ml(text_to_analyze)
except Exception:
analysis_results['ml'] = {'detected': False, 'confidence': 0.0, 'method': 'ml_clustering_unavailable'}
# Calculate risk level
analysis_results['risk_level'] = calculate_risk_level(analysis_results)
# Get intervention recommendations
analysis_results['interventions'] = recommend_interventions(analysis_results)
# Format response
if params.response_format == ResponseFormat.JSON:
result = json.dumps(analysis_results, indent=2, default=str)
else:
result = format_analysis_markdown(analysis_results)
# Check character limit
if len(result) > CHARACTER_LIMIT:
truncated = result[:CHARACTER_LIMIT]
result = truncated + f"\n\n[Truncated at {CHARACTER_LIMIT} characters. Use JSON format for complete results.]"
return result
@mcp.tool(
name="togmal_submit_evidence",
annotations={
"title": "Submit Evidence of LLM Limitation",
"readOnlyHint": False,
"destructiveHint": False,
"idempotentHint": False,
"openWorldHint": False
} # type: ignore[arg-type]
)
async def submit_evidence(params: SubmitEvidenceInput, ctx: Context) -> str:
"""Submit evidence of an LLM limitation to build the taxonomy database.
This tool allows users to contribute examples of problematic LLM behaviors to improve
the detection system. All submissions are stored with metadata and can be retrieved
for analysis and pattern detection.
Use this when you encounter:
- LLM generating speculative physics/math without proper grounding
- Medical advice that lacks sources or professional disclaimers
- Code that performs dangerous file operations without confirmations
- Overly confident claims without evidence
- Any other concerning out-of-distribution behavior
The system will prompt for confirmation before submitting to ensure intentional reporting.
Args:
params (SubmitEvidenceInput): Evidence details containing:
- category (CategoryType): Type of limitation
- prompt (str): The prompt that triggered the issue
- response (str): The problematic LLM response
- description (str): Why this is problematic
- severity (RiskLevel): How severe the issue is
Returns:
str: Confirmation of submission with assigned entry ID
"""
# Request confirmation from user (human-in-the-loop)
confirmation = await ctx.elicit(
prompt=f"You are about to submit evidence of a '{params.category}' limitation with severity '{params.severity}' and reason '{params.reason}'. This will be added to the taxonomy database. Confirm submission? (yes/no)",
input_type="text"
) # type: ignore[call-arg] # type: ignore[call-arg]
if confirmation.lower() not in ['yes', 'y']:
return "Evidence submission cancelled by user."
# Check if taxonomy is getting too large
total_entries = sum(len(entries) for entries in TAXONOMY_DB.values())
if total_entries >= MAX_EVIDENCE_ENTRIES:
return f"Taxonomy database is at capacity ({MAX_EVIDENCE_ENTRIES} entries). Cannot accept new submissions."
# Create evidence entry
entry_id = hashlib.sha256(
f"{params.category}{params.prompt}{params.response}{datetime.now().isoformat()}".encode()
).hexdigest()[:12]
evidence_entry = {
'id': entry_id,
'category': params.category,
'prompt': params.prompt,
'response': params.response,
'description': params.description,
'severity': params.severity,
'reason': params.reason,
'reason_details': params.reason_details,
'timestamp': datetime.now().isoformat(),
'prompt_hash': hashlib.sha256(params.prompt.encode()).hexdigest()[:8]
}
# Add to taxonomy
TAXONOMY_DB[params.category].append(evidence_entry)
save_taxonomy()
result = {
'status': 'success',
'entry_id': entry_id,
'category': params.category,
'severity': params.severity,
'total_entries_in_category': len(TAXONOMY_DB[params.category]),
'message': 'Evidence successfully submitted to taxonomy database'
}
return json.dumps(result, indent=2)
@mcp.tool(
name="togmal_get_taxonomy",
annotations={
"title": "Retrieve Taxonomy Entries",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False
} # type: ignore[arg-type]
)
async def get_taxonomy(params: GetTaxonomyInput) -> str:
"""Retrieve entries from the taxonomy database of LLM limitations.
This tool provides access to the accumulated evidence of LLM limitations, allowing
analysis of patterns and trends. Useful for:
- Understanding common failure modes
- Training improved detection heuristics
- Research into LLM behavior patterns
- Building better safety interventions
Results can be filtered by category and severity, and support pagination for
large result sets.
Args:
params (GetTaxonomyInput): Query parameters containing:
- category (Optional[CategoryType]): Filter by limitation type
- min_severity (Optional[RiskLevel]): Minimum severity to include
- limit (int): Maximum entries to return (1-100)
- offset (int): Pagination offset
- response_format (str): Output format ('markdown' or 'json')
Returns:
str: Filtered taxonomy entries with pagination information
"""
# Collect relevant entries
all_entries = []
if params.category:
# Filter by category
all_entries = TAXONOMY_DB.get(params.category, [])
else:
# Get all entries
for entries in TAXONOMY_DB.values():
all_entries.extend(entries)
# Filter by severity if specified
if params.min_severity:
severity_order = {
RiskLevel.LOW: 0,
RiskLevel.MODERATE: 1,
RiskLevel.HIGH: 2,
RiskLevel.CRITICAL: 3
}
min_level = severity_order[params.min_severity]
all_entries = [
e for e in all_entries
if severity_order.get(e['severity'], 0) >= min_level
]
# Sort by timestamp (newest first)
all_entries.sort(key=lambda x: x.get('timestamp', ''), reverse=True)
# Apply pagination
total_count = len(all_entries)
paginated_entries = all_entries[params.offset:params.offset + params.limit]
# Prepare response
response_data = {
'total': total_count,
'count': len(paginated_entries),
'offset': params.offset,
'limit': params.limit,
'has_more': total_count > params.offset + params.limit,
'next_offset': params.offset + params.limit if total_count > params.offset + params.limit else None,
'entries': paginated_entries
}
# Format output
if params.response_format == ResponseFormat.JSON:
result = json.dumps(response_data, indent=2, default=str)
else:
# Markdown format
output = []
output.append("# ToGMAL Taxonomy Database\n")
output.append(f"**Total Entries:** {total_count}")
output.append(f"**Showing:** {len(paginated_entries)} entries (offset: {params.offset})")
if response_data['has_more']:
output.append(f"**More Available:** Use offset={response_data['next_offset']} to see more\n")
output.append("")
for i, entry in enumerate(paginated_entries, 1):
output.append(f"## Entry {params.offset + i}: {entry['id']}")
output.append(f"- **Category:** {entry['category']}")
output.append(f"- **Severity:** {entry['severity']}")
output.append(f"- **Timestamp:** {entry['timestamp']}")
output.append(f"- **Description:** {entry['description']}")
output.append(f"- **Prompt Hash:** {entry['prompt_hash']}")
output.append(f"- **Reason:** {entry.get('reason', 'N/A')}")
if entry.get('reason_details'):
output.append(f"- **Reason Details:** {entry['reason_details']}")
output.append("")
result = '\n'.join(output)
# Check character limit
if len(result) > CHARACTER_LIMIT:
truncated = result[:CHARACTER_LIMIT]
result = truncated + f"\n\n[Truncated at {CHARACTER_LIMIT} characters. Reduce limit or use offset parameter.]"
return result
@mcp.tool(
name="togmal_get_statistics",
annotations={
"title": "Get Taxonomy Statistics",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False
} # type: ignore[arg-type]
)
async def get_statistics(response_format: ResponseFormat = ResponseFormat.MARKDOWN) -> str:
"""Get statistical overview of the taxonomy database.
Provides aggregate statistics about the taxonomy including:
- Total entries by category
- Severity distribution
- Most common limitation patterns
- Temporal trends
Useful for understanding the landscape of LLM limitations and identifying
areas where additional safety measures are most needed.
Args:
response_format (ResponseFormat): Output format ('markdown' or 'json')
Returns:
str: Statistical summary of taxonomy database
"""
# Calculate statistics
stats = {
'total_entries': sum(len(entries) for entries in TAXONOMY_DB.values()),
'by_category': {},
'by_severity': {
'low': 0,
'moderate': 0,
'high': 0,
'critical': 0
},
'database_capacity': MAX_EVIDENCE_ENTRIES
}
for category, entries in TAXONOMY_DB.items():
stats['by_category'][category] = len(entries)
for entry in entries:
severity = entry.get('severity', 'low')
stats['by_severity'][severity] = stats['by_severity'].get(severity, 0) + 1
# Format output
if response_format == ResponseFormat.JSON:
return json.dumps(stats, indent=2)
else:
output = []
output.append("# ToGMAL Taxonomy Statistics\n")
output.append(f"**Total Entries:** {stats['total_entries']} / {stats['database_capacity']}\n")
output.append("## Entries by Category\n")
for category, count in stats['by_category'].items():
percentage = (count / stats['total_entries'] * 100) if stats['total_entries'] > 0 else 0
output.append(f"- **{category}:** {count} ({percentage:.1f}%)")
output.append("\n## Entries by Severity\n")
for severity, count in stats['by_severity'].items():
percentage = (count / stats['total_entries'] * 100) if stats['total_entries'] > 0 else 0
output.append(f"- **{severity.title()}:** {count} ({percentage:.1f}%)")
return '\n'.join(output)
# ============================================================================
# DYNAMIC CHECKS TOOL
# ============================================================================
@mcp.tool(
name="togmal_get_recommended_checks",
annotations={
"title": "Get Recommended Checks Based on Conversation Context",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False
} # type: ignore[arg-type]
)
async def get_recommended_checks(
conversation_history: Optional[List[Dict[str, str]]] = None,
user_context: Optional[Dict[str, Any]] = None
) -> str:
"""
Recommend which ToGMAL checks are most relevant given the conversation context.
Returns JSON with detected domains and recommended checks.
"""
if not DYNAMIC_TOOLS_ENABLED:
return json.dumps({
"mode": "static",
"recommended_tools": ["togmal_analyze_prompt", "togmal_analyze_response", "togmal_get_taxonomy", "togmal_get_statistics"],
"recommended_checks": ["math_physics_speculation", "ungrounded_medical_advice", "dangerous_file_operations", "vibe_coding_overreach", "unsupported_claims"]
}, indent=2)
domains = await analyze_conversation_context(
conversation_history=conversation_history,
user_context=user_context
)
# Map domains to internal checks
domain_to_checks = {
"mathematics": ["math_physics_speculation"],
"physics": ["math_physics_speculation"],
"medicine": ["ungrounded_medical_advice"],
"healthcare": ["ungrounded_medical_advice"],
"coding": ["vibe_coding_overreach", "dangerous_file_operations", "unsupported_claims"],
"file_system": ["dangerous_file_operations"],
"finance": ["unsupported_claims"],
"law": ["unsupported_claims"]
}
checks = set()
for d in domains:
for c in domain_to_checks.get(d, []):
checks.add(c)
# Core checks are always available via analyze tools
if not checks:
checks.update(["unsupported_claims"])
response = {
"mode": "dynamic",
"domains_detected": domains,
"recommended_tools": ["togmal_analyze_prompt", "togmal_analyze_response"],
"core_tools": CORE_TOOLS,
"recommended_checks": sorted(list(checks)),
"ml_tools_enabled": ML_CLUSTERING_ENABLED
}
return json.dumps(response, indent=2)
# ============================================================================
# LIST TOOLS DYNAMIC ENDPOINT
# ============================================================================
@mcp.tool(
name="togmal_list_tools_dynamic",
annotations={
"title": "List Tools Dynamically Based on Context",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False
} # type: ignore[arg-type]
)
async def togmal_list_tools_dynamic(
conversation_history: Optional[List[Dict[str, str]]] = None,
user_context: Optional[Dict[str, Any]] = None
) -> str:
"""
Return a context-filtered list of MCP tools and checks to use.
Also includes optional ML-discovered pattern IDs if enabled and available.
"""
# Base tool names (MCP tools provided by this server)
base_tools = [
"togmal_analyze_prompt",
"togmal_analyze_response",
"togmal_get_taxonomy",
"togmal_get_statistics",
"togmal_check_prompt_difficulty" # New vector DB tool
]
# Detect domains
domains = await analyze_conversation_context(
conversation_history=conversation_history,
user_context=user_context
)
# Map domains to internal checks
domain_to_checks = {
"mathematics": ["math_physics_speculation"],
"physics": ["math_physics_speculation"],
"medicine": ["ungrounded_medical_advice"],
"healthcare": ["ungrounded_medical_advice"],
"coding": ["vibe_coding_overreach", "dangerous_file_operations", "unsupported_claims"],
"file_system": ["dangerous_file_operations"],
"finance": ["unsupported_claims"],
"law": ["unsupported_claims"]
}
checks = set()
for d in domains:
for c in domain_to_checks.get(d, []):
checks.add(c)
if not checks:
checks.update(["unsupported_claims"]) # sensible default
# Optionally include ML-discovered patterns
ml_patterns: List[str] = []
if ML_CLUSTERING_ENABLED:
try:
ml_tools = await get_ml_discovered_tools(
relevant_domains=domains or None,
min_confidence=ML_TOOLS_MIN_CONFIDENCE
)
# Return pattern IDs (strip the "check_" prefix)
ml_patterns = [t["name"].replace("check_", "") for t in ml_tools]
except Exception:
ml_patterns = []
response = {
"mode": "dynamic",
"domains_detected": domains,
"tool_names": base_tools,
"check_names": sorted(list(checks)),
"ml_patterns": ml_patterns,
"core_tools": CORE_TOOLS
}
return json.dumps(response, indent=2)
@mcp.tool()
async def togmal_check_prompt_difficulty(
prompt: str,
k: int = 5,
domain_filter: Optional[str] = None
) -> str:
"""
Check if a prompt is similar to hard benchmark questions using vector similarity.
Uses a vector database of benchmark questions (GPQA, MMLU-Pro, MATH) to find
the k most similar questions and compute a weighted difficulty score.
Args:
prompt: The user's prompt/question to check
k: Number of similar benchmark questions to retrieve (default: 5)
domain_filter: Optional domain filter (e.g., 'physics', 'mathematics')
Returns:
JSON with difficulty assessment, similar questions, and recommendations
"""
try:
from benchmark_vector_db import BenchmarkVectorDB
from pathlib import Path
# Initialize vector DB (uses persistent storage)
db = BenchmarkVectorDB(
db_path=Path("./data/benchmark_vector_db"),
embedding_model="all-MiniLM-L6-v2"
)
# Check if database is populated
stats = db.get_statistics()
if stats.get("total_questions", 0) == 0:
return json.dumps({
"error": "Vector database not initialized",
"message": "Run 'python benchmark_vector_db.py' to build the database first"
}, indent=2)
# Query similar questions
result = db.query_similar_questions(
prompt=prompt,
k=k,
domain_filter=domain_filter
)
# Add metadata
result["database_stats"] = stats
result["query_params"] = {
"k": k,
"domain_filter": domain_filter
}
return json.dumps(result, indent=2)
except ImportError as e:
return json.dumps({
"error": "Dependencies not installed",
"message": "Run: uv pip install sentence-transformers chromadb datasets",
"details": str(e)
}, indent=2)
except Exception as e:
return json.dumps({
"error": "Failed to check prompt difficulty",
"details": str(e)
}, indent=2)
# ============================================================================
# SERVER ENTRY POINT
# ============================================================================
if __name__ == "__main__":
# Preload ML models into memory if available
try:
get_ml_detector(models_dir="./models")
except Exception:
pass
mcp.run()