Skip to main content
Glama

Process Mining MCP Server

by mostapow
pm4py_tools.py (25.2 kB)
import os
import random
from contextlib import contextmanager
from typing import Any, Dict, List, Literal, Optional

import numpy as np
import pandas as pd
import pm4py
from dotenv import load_dotenv
from fastmcp import FastMCP
from pandas import DataFrame
from pydantic import BaseModel, Field
from sqlalchemy import create_engine, text

from config import DB_CONFIG, TABLE_SCHEMA, DatabaseConfig, TableSchema

# Load environment variables
load_dotenv()

# Standard pm4py attribute names; the SQL query aliases the source columns to these.
CASE_COL = "case:concept:name"
ACTIVITY_COL = "concept:name"
TIMESTAMP_COL = "time:timestamp"
RESOURCE_COL = "org:resource"


# ===========================
# DATABASE CONNECTION SCHEMAS
# ===========================

class LogSource(BaseModel):
    """Where an event log is loaded from, plus in-memory caching options."""

    # For database sources
    db_config: Optional[DatabaseConfig] = Field(None, description="Database configuration")
    table_schema: Optional[TableSchema] = Field(None, description="Event log table schema")

    # Caching options
    use_cache: bool = Field(default=True, description="Cache loaded event logs")
    cache_key: Optional[str] = Field(None, description="Custom cache key")


# ===========================
# DATABASE UTILITIES
# ===========================

class EventLogCache:
    """Simple in-memory cache for event logs.

    The dict lives on the class, so it is shared process-wide. Entries are only
    removed by ``clear()``.
    """

    _cache: Dict[str, DataFrame] = {}

    @classmethod
    def get(cls, key: str) -> Optional[DataFrame]:
        """Return the cached DataFrame for ``key``, or None if absent."""
        return cls._cache.get(key)

    @classmethod
    def set(cls, key: str, log: DataFrame):
        """Store ``log`` under ``key``, replacing any previous entry."""
        cls._cache[key] = log

    @classmethod
    def clear(cls):
        """Drop every cached event log."""
        cls._cache.clear()


@contextmanager
def get_db_connection(config: DatabaseConfig):
    """Yield a SQLAlchemy connection built from ``config.connection_string``.

    A fresh engine is created per call and disposed on exit, so no connection
    pool outlives the ``with`` block.
    """
    engine = create_engine(config.connection_string)
    try:
        conn = engine.connect()
        try:
            yield conn
        finally:
            conn.close()
    finally:
        # Dispose even when connect() itself failed, so the engine is not leaked.
        engine.dispose()


def build_event_log_query(schema: TableSchema, db_schema: str = "public") -> str:
    """Build the SELECT statement that extracts an event log.

    The case, activity and timestamp columns (and the resource column, if
    configured) are aliased to the pm4py standard attribute names.
    ``additional_columns`` are appended as given, and ``where_clause`` becomes
    a WHERE filter.

    NOTE: identifiers, ``additional_columns`` and ``where_clause`` are
    interpolated verbatim into the SQL text. They must come from trusted
    configuration, never from client-supplied tool arguments.
    """
    columns = [
        f'{schema.case_column} as "{CASE_COL}"',
        f'{schema.activity_column} as "{ACTIVITY_COL}"',
        f'{schema.timestamp_column} as "{TIMESTAMP_COL}"',
    ]

    # Add resource column if specified
    if schema.resource_column:
        columns.append(f'{schema.resource_column} as "{RESOURCE_COL}"')

    # Add additional columns
    if schema.additional_columns:
        columns.extend(schema.additional_columns)

    query = f"""
    SELECT {', '.join(columns)}
    FROM {db_schema}.{schema.table_name}
    """

    if schema.where_clause:
        query += f"\nWHERE {schema.where_clause}"

    return query


def load_from_database(config: DatabaseConfig, schema: TableSchema) -> DataFrame:
    """Load the event log described by ``schema`` into a pandas DataFrame.

    Timestamps are parsed as timezone-aware UTC datetimes. Rows are sorted by
    case and then timestamp, because the query has no ORDER BY and the
    per-case "next event" computations downstream rely on chronological order.
    """
    query = build_event_log_query(schema, config.db_schema)

    with get_db_connection(config) as conn:
        df = pd.read_sql(text(query), conn)

    df[TIMESTAMP_COL] = pd.to_datetime(df[TIMESTAMP_COL], utc=True)
    return df.sort_values([CASE_COL, TIMESTAMP_COL], kind="stable").reset_index(drop=True)


# ===========================
# ENHANCED LOAD FUNCTION
# ===========================

def load_event_log(source: LogSource) -> DataFrame:
    """Load the event log described by ``source``, consulting the cache first.

    The returned DataFrame may be the cached object itself. Callers that add
    or modify columns must work on a copy.

    Raises:
        ValueError: if ``source`` lacks ``db_config`` or ``table_schema``.
    """
    if source.db_config is None or source.table_schema is None:
        raise ValueError("LogSource requires both db_config and table_schema")

    cache_key: Optional[str] = None
    if source.use_cache:
        cache_key = source.cache_key or (
            f"db:{source.db_config.database}:{source.table_schema.table_name}"
        )
        cached_log = EventLogCache.get(cache_key)
        if cached_log is not None:
            return cached_log

    df = load_from_database(source.db_config, source.table_schema)

    if cache_key is not None:
        EventLogCache.set(cache_key, df)

    return df


# ===========================
# SHARED ANALYSIS HELPERS
# ===========================

def _require_non_empty(df: DataFrame) -> None:
    """Raise ValueError for an empty log instead of failing later with ZeroDivisionError or NaT errors."""
    if df.empty:
        raise ValueError("The event log is empty; nothing to analyze.")


def _sample_cases(df: DataFrame, fraction: float, seed: int) -> DataFrame:
    """Return the events of a reproducible random sample of ``fraction`` of the cases.

    ``fraction >= 1.0`` returns ``df`` unchanged. For a non-empty log, at least
    one case is kept, so a tiny fraction cannot silently produce an empty log.
    A local ``random.Random(seed)`` yields the same sample as
    ``random.seed(seed); random.sample(...)`` without touching global state.
    """
    if fraction >= 1.0:
        return df
    case_ids = list(df[CASE_COL].unique())
    if not case_ids:
        return df
    sample_size = max(1, int(len(case_ids) * fraction))
    sampled_cases = random.Random(seed).sample(case_ids, sample_size)
    return df[df[CASE_COL].isin(sampled_cases)]


def _case_duration_hours(df: DataFrame) -> pd.Series:
    """Return each case's throughput time (last minus first timestamp) in hours, indexed by case ID."""
    bounds = df.groupby(CASE_COL)[TIMESTAMP_COL].agg(["min", "max"])
    return (bounds["max"] - bounds["min"]).dt.total_seconds() / 3600


_PETRI_NET_MINERS = {
    "inductive": pm4py.discover_petri_net_inductive,
    "alpha": pm4py.discover_petri_net_alpha,
    "heuristic": pm4py.discover_petri_net_heuristics,
}


def _discover_petri_net(log, algorithm: Optional[str]):
    """Discover ``(net, initial_marking, final_marking)`` with the named pm4py miner.

    ``None`` falls back to the documented default, ``"inductive"``.
    """
    return _PETRI_NET_MINERS[algorithm or "inductive"](log)


mcp = FastMCP("process-mining-server")

# NOTE(review): these module-level FieldInfo objects are not referenced anywhere
# in this module; they look like leftovers from an earlier input model. They are
# kept only so that any external import of these names keeps working.
table_name: str = Field(description="Name of the event log table")
case_column: str = Field(description="Column containing case IDs")
activity_column: str = Field(description="Column containing activity names")
timestamp_column: str = Field(description="Column containing timestamps")

# The single event log that every tool in this server analyzes.
log_source = LogSource(
    db_config=DB_CONFIG,
    table_schema=TABLE_SCHEMA,
    cache_key=f"db:{TABLE_SCHEMA.table_name}",
)


class BasicStatsOutput(BaseModel):
    """Output for basic statistics analysis"""
    total_cases: int
    total_events: int
    total_activities: int
    avg_events_per_case: float
    log_timeframe: Dict[str, Any]
    case_duration_stats: Dict[str, float]
    resource_stats: Optional[Dict[str, Any]] = None
    activity_frequencies: Optional[Dict[str, float]] = None


@mcp.tool("get_basic_stats")
def get_basic_stats() -> BasicStatsOutput:
    """
    Get basic statistics about an event log.

    Returns fundamental metrics including:
    - Total number of cases, events, and unique activities
    - Average events per case
    - Log timeframe (start, end, duration)
    - Case duration statistics (mean, median, min, max)
    - Resource utilization patterns
    - Activity frequencies (relative frequencies for all activities)
    """
    df = load_event_log(log_source)
    _require_non_empty(df)

    durations = _case_duration_hours(df)
    num_cases = df[CASE_COL].nunique()
    first_ts = df[TIMESTAMP_COL].min()
    last_ts = df[TIMESTAMP_COL].max()

    basic_stats: Dict[str, Any] = {
        "total_cases": num_cases,
        "total_events": len(df),
        "total_activities": df[ACTIVITY_COL].nunique(),
        "avg_events_per_case": len(df) / num_cases,
        "log_timeframe": {
            "start": first_ts.isoformat(),
            "end": last_ts.isoformat(),
            "duration_days": (last_ts - first_ts).days,
        },
        "case_duration_stats": {
            "mean_hours": float(durations.mean()),
            "median_hours": float(durations.median()),
            "std_hours": float(durations.std()),
            "min_hours": float(durations.min()),
            "max_hours": float(durations.max()),
        },
    }

    # Resource statistics; skipped when the column is absent or entirely null
    # (an empty groupby would make the int() conversions below fail on NaN).
    if RESOURCE_COL in df.columns and df[RESOURCE_COL].notna().any():
        per_resource = df.groupby(RESOURCE_COL).agg(
            cases=(CASE_COL, "nunique"),
            events=(ACTIVITY_COL, "count"),
        )
        basic_stats["resource_stats"] = {
            "total_resources": len(per_resource),
            "avg_events_per_resource": float(per_resource["events"].mean()),
            "avg_cases_per_resource": float(per_resource["cases"].mean()),
            "resource_utilization": {
                "min_cases": int(per_resource["cases"].min()),
                "max_cases": int(per_resource["cases"].max()),
                "min_events": int(per_resource["events"].min()),
                "max_events": int(per_resource["events"].max()),
            },
        }

    # Relative frequency of every activity, as a percentage of all events.
    basic_stats["activity_frequencies"] = (
        df[ACTIVITY_COL].value_counts(normalize=True) * 100
    ).to_dict()

    return BasicStatsOutput(**basic_stats)


class ProcessDiscoveryInput(BaseModel):
    """Input for process discovery"""
    discovery_type: Literal["dfg", "petri_net"] = Field(
        default="petri_net",
        description="Type of process model to discover"
    )
    discovery_algorithm: Optional[Literal["inductive", "alpha", "heuristic"]] = Field(
        default="inductive",
        description="Algorithm to use for Petri net discovery (only used if discovery_type is petri_net)"
    )
    save_model: bool = Field(
        default=False,
        description="Save the model to a file"
    )


class ProcessInfo(BaseModel):
    """Information about discovered process model"""
    model_type: str
    model_description: str
    model_stats: Dict[str, Any]


class ProcessDiscoveryOutput(BaseModel):
    """Output for process discovery"""
    process_info: ProcessInfo


@mcp.tool("discover_process")
def discover_process(params: ProcessDiscoveryInput) -> ProcessDiscoveryOutput:
    """
    Discover a process model from the event log.

    Args:
        params: ProcessDiscoveryInput containing:
            - discovery_type: Type of model to discover ("dfg" or "petri_net")
            - discovery_algorithm: Algorithm to use for Petri net discovery (only for petri_net)
            - save_model: If true, save a PNG rendering of the model in the process_models directory

    Returns:
        ProcessDiscoveryOutput containing:
            - process_info: Information about the discovered model including:
                - model_type: Type of discovered model
                - model_description: Natural language description of the model
                - model_stats: Statistics about the model
    """
    df = load_event_log(log_source)
    _require_non_empty(df)
    log = pm4py.convert_to_event_log(df)

    output_dir = "process_models"

    if params.discovery_type == "dfg":
        # pm4py.discover_dfg returns (arc -> frequency, start activities, end activities).
        dfg, start_activities, end_activities = pm4py.discover_dfg(log)

        # Get DFG abstraction using LLM
        model_description = pm4py.llm.abstract_dfg(log)

        activities = (
            {activity for arc in dfg for activity in arc}
            | set(start_activities)
            | set(end_activities)
        )
        model_stats = {
            "num_activities": len(activities),
            "num_arcs": len(dfg),
            "max_frequency": max(dfg.values()) if dfg else 0,
            "min_frequency": min(dfg.values()) if dfg else 0,
        }

        if params.save_model:
            from pm4py.visualization.dfg import visualizer as dfg_visualizer

            os.makedirs(output_dir, exist_ok=True)
            gviz = dfg_visualizer.apply(dfg, log=log, variant=dfg_visualizer.Variants.FREQUENCY)
            dfg_visualizer.save(gviz, os.path.join(output_dir, "dfg_model.png"))

        process_info = ProcessInfo(
            model_type="directly-follows graph",
            model_description=model_description,
            model_stats=model_stats,
        )
    else:  # petri_net
        algorithm = params.discovery_algorithm or "inductive"
        net, im, fm = _discover_petri_net(log, algorithm)

        # Get Petri net abstraction using LLM
        model_description = pm4py.llm.abstract_petri_net(net, im, fm)

        model_stats = {
            "places": len(net.places),
            "transitions": len(net.transitions),
            "arcs": len(net.arcs),
            "algorithm": algorithm,
        }

        if params.save_model:
            from pm4py.visualization.petri_net import visualizer as pn_visualizer

            os.makedirs(output_dir, exist_ok=True)
            gviz = pn_visualizer.apply(net, im, fm, variant=pn_visualizer.Variants.WO_DECORATION)
            pn_visualizer.save(gviz, os.path.join(output_dir, "petri_net_model.png"))

        process_info = ProcessInfo(
            model_type="petri net",
            model_description=model_description,
            model_stats=model_stats,
        )

    return ProcessDiscoveryOutput(process_info=process_info)


@mcp.tool("get_process_variants_summary")
def get_process_variants_summary() -> Dict[str, Any]:
    """
    Get a natural language summary of process variants.
    """
    # Returns {"summary": <str>, "total_variants": <int>}. The value types are
    # mixed, so the annotation is Dict[str, Any] rather than Dict[str, str].
    df = load_event_log(log_source)
    log = pm4py.convert_to_event_log(df)

    return {
        "summary": pm4py.llm.abstract_variants(log),
        "total_variants": len(pm4py.get_variants(log)),
    }


class ConformanceCheckInput(BaseModel):
    """Input for conformance checking"""
    case_id: str
    discovery_algorithm: Optional[Literal["inductive", "alpha", "heuristic"]] = Field(
        default="inductive",
        description="Algorithm to use for discovering the reference Petri net"
    )


@mcp.tool("check_partial_conformance")
def check_partial_conformance(params: ConformanceCheckInput) -> bool:
    """
    Check if a partial trace conforms to the discovered process model.

    Discovers a process model from the event log and checks if the current trace
    for the specified case follows the expected process flow using token-based replay.

    Args:
        params: ConformanceCheckInput with case_id and discovery_algorithm

    Returns:
        bool: True if conformant (no missing tokens), False if violations detected
    """
    from pm4py.algo.conformance.tokenreplay import algorithm as token_replay

    df = load_event_log(log_source)

    # Compare as strings: the database may store case IDs as integers while the
    # tool always receives a string.
    is_target = df[CASE_COL].astype(str) == str(params.case_id)
    case_trace = df[is_target].reset_index(drop=True)
    if case_trace.empty:
        raise ValueError(f"Case {params.case_id!r} not found in the event log")

    # Discover the reference model from all *other* cases, so the checked trace
    # does not shape the model it is checked against.
    reference_log = df[~is_target].copy()
    net, im, fm = _discover_petri_net(reference_log, params.discovery_algorithm)

    # A trace is conformant when replay never needs to create a missing token.
    replayed_traces = token_replay.apply(case_trace, net, im, fm)
    return bool(replayed_traces[0]["missing_tokens"] == 0)


class PerformanceAnalysisInput(BaseModel):
    """Input for performance analysis"""
    bottleneck_threshold: float = Field(
        default=1.5,
        description="Threshold for bottleneck detection (times the median duration)"
    )
    percentage_of_cases: float = Field(
        default=1.0,
        ge=0.0,
        le=1.0,
        description="Percentage of cases to analyze"
    )


class ActivityTiming(BaseModel):
    """Timing information for an activity"""
    name: str
    avg_duration: float  # in hours
    median_duration: float  # in hours
    min_duration: float  # in hours
    max_duration: float  # in hours
    std_duration: float  # in hours
    is_bottleneck: bool
    waiting_time: Optional[float] = None  # in hours
    frequency: int


class PathTiming(BaseModel):
    """Timing information for a path"""
    path: List[str]
    avg_duration: float  # in hours
    median_duration: float  # in hours
    frequency: int
    bottleneck_activities: List[str]


class PerformanceAnalysisOutput(BaseModel):
    """Output for performance analysis"""
    case_duration_stats: Dict[str, float]
    activity_timings: List[ActivityTiming]
    bottleneck_analysis: Dict[str, Any]


@mcp.tool("analyze_performance")
def analyze_performance(params: PerformanceAnalysisInput) -> PerformanceAnalysisOutput:
    """
    Analyze process performance including timing and bottlenecks.

    Args:
        params: PerformanceAnalysisInput containing:
            - bottleneck_threshold: Threshold for bottleneck detection
            - percentage_of_cases: Percentage of cases to analyze

    Returns:
        PerformanceAnalysisOutput containing:
            - case_duration_stats: Statistics about case durations
            - activity_timings: Detailed timing information for each activity
            - bottleneck_analysis: Analysis of bottlenecks
    """
    df = _sample_cases(load_event_log(log_source), params.percentage_of_cases, seed=112)
    _require_non_empty(df)

    # Work on a private, chronologically sorted copy. The loaded log may be the
    # cached DataFrame shared with the other tools, and the helper column added
    # below must not leak into it.
    df = df.copy()
    df[TIMESTAMP_COL] = pd.to_datetime(df[TIMESTAMP_COL])
    df = df.sort_values([CASE_COL, TIMESTAMP_COL], kind="stable")

    case_durations = _case_duration_hours(df)

    # An event's duration is the time until the next event of the same case.
    # The last event of each case has no successor, gets NaN, and is therefore
    # excluded from the stats and from "count".
    next_timestamp = df.groupby(CASE_COL)[TIMESTAMP_COL].shift(-1)
    df["activity_duration"] = (next_timestamp - df[TIMESTAMP_COL]).dt.total_seconds() / 3600

    activity_stats = df.groupby(ACTIVITY_COL)["activity_duration"].agg(
        ["mean", "median", "min", "max", "std", "count"]
    )

    # An activity is a bottleneck when its median duration exceeds
    # `bottleneck_threshold` times the median of all activities' medians.
    bottleneck_threshold = activity_stats["median"].median() * params.bottleneck_threshold

    activity_timings = [
        ActivityTiming(
            name=activity,
            avg_duration=float(row["mean"]),
            median_duration=float(row["median"]),
            min_duration=float(row["min"]),
            max_duration=float(row["max"]),
            std_duration=float(row["std"]),
            is_bottleneck=bool(row["median"] > bottleneck_threshold),
            frequency=int(row["count"]),
        )
        for activity, row in activity_stats.iterrows()
    ]

    bottlenecks = [timing for timing in activity_timings if timing.is_bottleneck]
    bottleneck_names = [timing.name for timing in bottlenecks]
    # Distinct cases containing at least one bottleneck activity. (Summing the
    # per-activity event frequencies would double-count repeated activities.)
    affected_cases = df.loc[df[ACTIVITY_COL].isin(bottleneck_names), CASE_COL].nunique()

    bottleneck_analysis = {
        "total_bottlenecks": len(bottlenecks),
        "bottleneck_activities": bottleneck_names,
        "bottleneck_impact": {
            "avg_delay": float(sum(t.avg_duration for t in bottlenecks) / len(bottlenecks))
            if bottlenecks else 0.0,
            "max_delay": float(max(t.max_duration for t in bottlenecks)) if bottlenecks else 0.0,
            "affected_cases": int(affected_cases),
        },
    }

    return PerformanceAnalysisOutput(
        case_duration_stats={
            "mean": float(case_durations.mean()),
            "median": float(case_durations.median()),
            "min": float(case_durations.min()),
            "max": float(case_durations.max()),
            "std": float(case_durations.std()),
        },
        activity_timings=activity_timings,
        bottleneck_analysis=bottleneck_analysis,
    )


class VariantAnalysisInput(BaseModel):
    """Input for variant analysis"""
    top_k: int = Field(default=10, ge=1, le=100, description="Number of top variants to return")
    coverage_threshold: float = Field(default=0.8, ge=0.0, le=1.0,
                                      description="Calculate how many variants cover this percentage of cases")
    percentage_of_cases: float = Field(default=1.0, ge=0.0, le=1.0, description="Percentage of cases to analyze")


class VariantInfo(BaseModel):
    """Information about a process variant"""
    rank: int
    trace: tuple
    count: int
    percentage: float
    cumulative_percentage: float
    num_activities: int


class VariantAnalysisOutput(BaseModel):
    """Output for variant analysis"""
    total_variants: int
    top_variants: List[VariantInfo]
    coverage_analysis: Dict[str, Any]
    variant_complexity: Dict[str, float]


@mcp.tool("analyze_variants")
def analyze_variants(params: VariantAnalysisInput) -> VariantAnalysisOutput:
    """
    Analyze process variants (unique paths through the process).

    Args:
        params: VariantAnalysisInput containing:
            - top_k: Number of top variants to return
            - coverage_threshold: Threshold for coverage analysis
            - percentage_of_cases: Percentage of cases to analyze

    Returns:
        VariantAnalysisOutput containing:
            - total_variants: Total number of unique variants
            - top_variants: Details of top K most frequent variants
            - coverage_analysis: Coverage analysis results
            - variant_complexity: Variant complexity metrics
    """
    df = _sample_cases(load_event_log(log_source), params.percentage_of_cases, seed=113)
    _require_non_empty(df)
    log = pm4py.convert_to_event_log(df)

    # Depending on the pm4py version and input type, get_variants maps each
    # variant either to its list of traces or directly to its count.
    variant_counts = {
        variant: (int(value) if isinstance(value, (int, np.integer)) else len(value))
        for variant, value in pm4py.get_variants(log).items()
    }

    # Most frequent first; ties keep pm4py's order (sorted is stable).
    variants_sorted = sorted(variant_counts.items(), key=lambda item: item[1], reverse=True)
    total_cases = sum(variant_counts.values())

    # Smallest number of most-frequent variants whose cases reach the coverage threshold.
    variants_for_threshold = 0
    covered_cases = 0
    for _, count in variants_sorted:
        covered_cases += count
        variants_for_threshold += 1
        if covered_cases / total_cases >= params.coverage_threshold:
            break

    top_variants = []
    cumulative = 0
    for rank, (variant, count) in enumerate(variants_sorted[: params.top_k], start=1):
        cumulative += count
        trace = tuple(variant)
        top_variants.append(VariantInfo(
            rank=rank,
            trace=trace,
            count=count,
            percentage=float(count / total_cases * 100),
            cumulative_percentage=float(cumulative / total_cases * 100),
            num_activities=len(trace),
        ))

    variant_lengths = [len(variant) for variant, _ in variants_sorted]

    return VariantAnalysisOutput(
        total_variants=len(variants_sorted),
        top_variants=top_variants,
        coverage_analysis={
            "variants_for_threshold": variants_for_threshold,
            "threshold_percentage": params.coverage_threshold * 100,
            "percentage_of_total_variants": float(variants_for_threshold / len(variants_sorted) * 100),
        },
        variant_complexity={
            "mean_length": float(np.mean(variant_lengths)),
            "median_length": float(np.median(variant_lengths)),
            "min_length": int(np.min(variant_lengths)),
            "max_length": int(np.max(variant_lengths)),
        },
    )


class ActivityAnalysisInput(BaseModel):
    """Input for activity analysis"""
    top_k: int = Field(default=10, ge=1, le=100, description="Number of top activities to analyze in detail")
    include_statistics: bool = Field(default=True, description="Include distribution statistics")


class ActivityInfo(BaseModel):
    """Information about a single activity"""
    name: str
    frequency: int
    percentage: float
    cases_involved: int
    first_occurrence: str
    last_occurrence: str


class ActivityAnalysisOutput(BaseModel):
    """Output for activity analysis"""
    unique_activities: int
    top_activities: List[ActivityInfo]
    distribution_stats: Optional[Dict[str, float]] = None


@mcp.tool("analyze_activities")
def analyze_activities(params: ActivityAnalysisInput) -> ActivityAnalysisOutput:
    """
    Analyze activities in the event log.

    Provides:
    - Count of unique activities
    - Detailed information about top K activities
    - Activity frequency distribution statistics
    """
    df = load_event_log(log_source)

    # Event count per activity, most frequent first.
    activity_freq = df[ACTIVITY_COL].value_counts()

    activity_stats = df.groupby(ACTIVITY_COL).agg(
        cases_involved=(CASE_COL, "nunique"),
        first_occurrence=(TIMESTAMP_COL, "min"),
        last_occurrence=(TIMESTAMP_COL, "max"),
    )

    total_events = len(df)
    top_activities = [
        ActivityInfo(
            name=activity,
            frequency=int(freq),
            percentage=float(freq / total_events * 100),
            cases_involved=int(activity_stats.at[activity, "cases_involved"]),
            first_occurrence=activity_stats.at[activity, "first_occurrence"].isoformat(),
            last_occurrence=activity_stats.at[activity, "last_occurrence"].isoformat(),
        )
        for activity, freq in activity_freq.head(params.top_k).items()
    ]

    distribution_stats = None
    # Skip on an empty log: min/max of an empty Series are NaN and int() would fail.
    if params.include_statistics and not activity_freq.empty:
        distribution_stats = {
            "mean": float(activity_freq.mean()),
            "std": float(activity_freq.std()),
            "min": int(activity_freq.min()),
            "max": int(activity_freq.max()),
        }

    return ActivityAnalysisOutput(
        unique_activities=len(activity_freq),
        top_activities=top_activities,
        distribution_stats=distribution_stats,
    )


if __name__ == "__main__":
    # Initialize and run the server over stdio.
    mcp.run(transport='stdio')
    # For SSE transport instead: mcp.run(transport="sse", host="127.0.0.1", port=8000)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mostapow/mcp4pm'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.