Skip to main content
Glama
santoshray02

CSV Editor

by santoshray02

find_anomalies

Detect data anomalies in CSV files using multiple statistical methods to identify outliers and irregularities for data quality assessment.

Instructions

Find anomalies in the data using multiple detection methods.

Input Schema

Table | JSON Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| session_id | Yes | | |
| columns | No | | |
| sensitivity | No | | |
| methods | No | | |

Implementation Reference

  • Core handler function that executes the find_anomalies tool. It implements statistical outlier detection (Z-score and IQR), pattern-based rarity and format checks, and missing-value pattern detection. Behavior is adjustable through the sensitivity and methods parameters.
    async def find_anomalies(
        session_id: str,
        columns: Optional[List[str]] = None,
        sensitivity: float = 0.95,
        methods: Optional[List[str]] = None,
        ctx: Context = None
    ) -> Dict[str, Any]:
        """
        Find anomalies in the data using multiple detection methods.

        Three independent detectors can be run:

        * ``"statistical"`` - numeric columns only; a value is flagged when it
          is a Z-score outlier or lies outside the IQR fences.
        * ``"pattern"`` - object/string columns only; flags rare values and,
          when rare values exist, values whose letter case differs from the
          dominant case of the ten most frequent values.
        * ``"missing"`` - any column; flags columns with a partial
          (less than 50%) share of missing values that is either clustered
          in consecutive rows or falls between 10% and 30%.

        Args:
            session_id: Session identifier
            columns: Columns to check (None or empty for all)
            sensitivity: Detection sensitivity (0.0 to 1.0, higher = more
                sensitive for the statistical detector)
            methods: Detection methods to use
                (default: ["statistical", "pattern", "missing"])
            ctx: FastMCP context (not used by this function)

        Returns:
            On success, a dict with ``success=True``, the ``anomalies``
            report (``summary``, ``by_column``, ``by_method``), plus
            ``columns_analyzed``, ``methods_used`` and ``sensitivity``.
            On failure, ``{"success": False, "error": <message>}``;
            exceptions are logged and reported this way, not raised.
        """
        try:
            manager = get_session_manager()
            session = manager.get_session(session_id)

            if not session or session.df is None:
                return {"success": False, "error": "Invalid session or no data loaded"}

            df = session.df

            if columns:
                missing_cols = [col for col in columns if col not in df.columns]
                if missing_cols:
                    return {"success": False, "error": f"Columns not found: {missing_cols}"}
                target_cols = columns
            else:
                target_cols = df.columns.tolist()

            if not methods:
                methods = ["statistical", "pattern", "missing"]

            anomalies = {
                "summary": {
                    "total_anomalies": 0,
                    "affected_rows": set(),
                    "affected_columns": []
                },
                "by_column": {},
                "by_method": {}
            }
            summary = anomalies["summary"]

            # Statistical anomalies (outliers)
            if "statistical" in methods:
                numeric_cols = df[target_cols].select_dtypes(include=[np.number]).columns
                statistical_anomalies = {}

                # Both cut-offs shrink as sensitivity grows, so a higher
                # sensitivity flags more values. Loop-invariant, so hoisted.
                z_threshold = 3 * (1 - sensitivity + 0.5)
                iqr_factor = 1.5 * (2 - sensitivity)

                for col in numeric_cols:
                    col_data = df[col].dropna()
                    if col_data.empty:
                        continue

                    mean = col_data.mean()
                    std = col_data.std()

                    # Z-score method. std is NaN for a single value and 0 for
                    # a constant column; neither can have Z-score outliers,
                    # so skip the division instead of producing NaN/inf.
                    if pd.notna(std) and std > 0:
                        z_mask = ((col_data - mean).abs() / std) > z_threshold
                    else:
                        z_mask = pd.Series(False, index=col_data.index)

                    # IQR method
                    q1 = col_data.quantile(0.25)
                    q3 = col_data.quantile(0.75)
                    iqr = q3 - q1
                    lower = q1 - iqr_factor * iqr
                    upper = q3 + iqr_factor * iqr
                    iqr_mask = (col_data < lower) | (col_data > upper)

                    # Both masks share col_data's index, so the selection
                    # stays aligned with the rows even when the column
                    # contains NaNs, and keeps the original row order.
                    outliers = col_data[(z_mask | iqr_mask).to_numpy(dtype=bool)]
                    if outliers.empty:
                        continue

                    outlier_labels = outliers.index.tolist()
                    statistical_anomalies[col] = {
                        "anomaly_count": len(outlier_labels),
                        "anomaly_indices": outlier_labels[:100],
                        "anomaly_values": outliers.head(10).tolist(),
                        "mean": float(mean),
                        "std": float(std),
                        "lower_bound": float(lower),
                        "upper_bound": float(upper)
                    }

                    summary["total_anomalies"] += len(outlier_labels)
                    summary["affected_rows"].update(outlier_labels)
                    summary["affected_columns"].append(col)

                if statistical_anomalies:
                    anomalies["by_method"]["statistical"] = statistical_anomalies

            # Pattern anomalies
            if "pattern" in methods:
                pattern_anomalies = {}

                for col in target_cols:
                    is_text = df[col].dtype == object or pd.api.types.is_string_dtype(df[col])
                    if not is_text:
                        continue

                    col_data = df[col].dropna()
                    if col_data.empty:
                        continue

                    value_counts = col_data.value_counts()
                    total_count = len(col_data)

                    # Find rare values (relative frequency below threshold).
                    # NOTE(review): this threshold shrinks as sensitivity
                    # grows, i.e. higher sensitivity flags FEWER rare values,
                    # the opposite of the statistical detector. Left as-is
                    # because the intended scale is unclear - confirm.
                    threshold = (1 - sensitivity) * 0.01
                    rare_values = value_counts[value_counts / total_count < threshold]

                    # Format checks are only performed for columns that
                    # already have at least one rare value.
                    if rare_values.empty:
                        continue

                    # Infer the dominant letter case from the ten most
                    # frequent values (needs more than 10 distinct values).
                    common_pattern = None
                    if len(value_counts) > 10:
                        top_values = [str(v) for v in value_counts.head(10).index]
                        upper_count = sum(1 for v in top_values if v.isupper())
                        lower_count = sum(1 for v in top_values if v.islower())

                        if upper_count > 7:
                            common_pattern = "uppercase"
                        elif lower_count > 7:
                            common_pattern = "lowercase"

                    flagged_mask = col_data.isin(rare_values.index).to_numpy(dtype=bool)
                    if common_pattern:
                        conforms = str.isupper if common_pattern == "uppercase" else str.islower
                        format_mask = np.fromiter(
                            (not conforms(str(val)) for val in col_data),
                            dtype=bool,
                            count=len(col_data),
                        )
                        flagged_mask = flagged_mask | format_mask

                    flagged_labels = col_data[flagged_mask].index.tolist()
                    if not flagged_labels:
                        continue

                    pattern_anomalies[col] = {
                        "anomaly_count": len(flagged_labels),
                        "rare_values": rare_values.head(10).to_dict(),
                        "anomaly_indices": flagged_labels[:100],
                        "common_pattern": common_pattern
                    }

                    summary["total_anomalies"] += len(flagged_labels)
                    summary["affected_rows"].update(flagged_labels)
                    summary["affected_columns"].append(col)

                if pattern_anomalies:
                    anomalies["by_method"]["pattern"] = pattern_anomalies

            # Missing value anomalies
            if "missing" in methods:
                missing_anomalies = {}

                for col in target_cols:
                    null_mask = df[col].isna().to_numpy(dtype=bool)
                    null_count = int(null_mask.sum())
                    if null_count == 0:
                        continue

                    null_ratio = null_count / len(df)
                    # Only partially missing columns are considered.
                    if null_ratio >= 0.5:
                        continue

                    null_indices = df.index[null_mask].tolist()

                    # Group missing rows into runs of consecutive row
                    # positions. Positions (not index labels) are used so the
                    # check also works for non-integer or non-contiguous
                    # indexes; runs of length 1 are not clusters.
                    null_positions = np.flatnonzero(null_mask)
                    run_starts = np.flatnonzero(np.diff(null_positions) != 1) + 1
                    sequential_missing = [
                        run for run in np.split(null_positions, run_starts)
                        if len(run) > 1
                    ]

                    # NOTE(review): compares the NUMBER of runs (not the
                    # number of rows inside runs) against 30% of the missing
                    # count - kept from the original rule; confirm intent.
                    is_anomaly = len(sequential_missing) > len(null_indices) * 0.3

                    if is_anomaly or 0.1 < null_ratio < 0.3:
                        missing_anomalies[col] = {
                            "missing_count": null_count,
                            "missing_ratio": round(null_ratio, 4),
                            "missing_indices": null_indices[:100],
                            "sequential_clusters": len(sequential_missing),
                            "pattern": "clustered" if sequential_missing else "random"
                        }

                        # Missing-value findings mark the column as affected
                        # but do not add to total_anomalies / affected_rows.
                        summary["affected_columns"].append(col)

                if missing_anomalies:
                    anomalies["by_method"]["missing"] = missing_anomalies

            # Organize anomalies by column
            for method_name, method_anomalies in anomalies["by_method"].items():
                for col, col_anomalies in method_anomalies.items():
                    anomalies["by_column"].setdefault(col, {})[method_name] = col_anomalies

            # Take the row count BEFORE truncating the list, otherwise the
            # score is capped by the 1000-row sample on large datasets.
            affected_row_count = len(summary["affected_rows"])

            # Convert set to list for JSON serialization (sample capped at 1000)
            summary["affected_rows"] = list(summary["affected_rows"])[:1000]
            # De-duplicate while keeping first-seen order.
            summary["affected_columns"] = list(dict.fromkeys(summary["affected_columns"]))

            # Calculate anomaly score: share of (affected row x affected
            # column) cells among all analysed cells, as a percentage.
            total_cells = len(df) * len(target_cols)
            anomaly_cells = affected_row_count * len(summary["affected_columns"])
            if total_cells:
                anomaly_score = min(anomaly_cells / total_cells, 1.0) * 100
            else:
                # Empty frame or no columns: nothing to score.
                anomaly_score = 0.0

            summary["anomaly_score"] = round(anomaly_score, 2)
            summary["severity"] = (
                "high" if anomaly_score > 10
                else "medium" if anomaly_score > 5
                else "low"
            )

            session.record_operation(OperationType.ANOMALY_DETECTION, {
                "methods": methods,
                "sensitivity": sensitivity,
                "anomalies_found": summary["total_anomalies"]
            })

            return {
                "success": True,
                "anomalies": anomalies,
                "columns_analyzed": target_cols,
                "methods_used": methods,
                "sensitivity": sensitivity
            }

        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logger.error(f"Error finding anomalies: {str(e)}")
            return {"success": False, "error": str(e)}
  • FastMCP tool registration using @mcp.tool decorator. Defines input schema via annotated parameters (session_id, columns, sensitivity, methods, ctx) and delegates execution to the core handler in validation.py.
    @mcp.tool
    async def find_anomalies(
        session_id: str,
        columns: Optional[List[str]] = None,
        sensitivity: float = 0.95,
        methods: Optional[List[str]] = None,
        ctx: Context = None
    ) -> Dict[str, Any]:
        """Find anomalies in the data using multiple detection methods."""
        # Registration shim only: every argument is forwarded positionally,
        # unchanged, to the handler, and its result is returned as-is.
        detection_result = await _find_anomalies(
            session_id,
            columns,
            sensitivity,
            methods,
            ctx,
        )
        return detection_result
  • Input schema inferred from function parameters in the registered tool definition.
    async def find_anomalies(
        session_id: str,
        columns: Optional[List[str]] = None,
        sensitivity: float = 0.95,
        methods: Optional[List[str]] = None,
        ctx: Context = None
    ) -> Dict[str, Any]:
        """Find anomalies in the data using multiple detection methods."""
        # The parameters above define the tool's input schema; the body only
        # forwards them, in order, to the handler and returns its result.
        detection_result = await _find_anomalies(
            session_id,
            columns,
            sensitivity,
            methods,
            ctx,
        )
        return detection_result

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/santoshray02/csv-editor'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.