CSV Editor

by santoshray02

find_anomalies

Find anomalies in CSV data by applying multiple detection methods. Customize sensitivity and target columns for outlier identification.

Instructions

Find anomalies in the data using multiple detection methods.

Input Schema

Name          Required
session_id    Yes
columns       No
sensitivity   No
methods       No

(The schema provides no parameter descriptions or defaults.)
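Since the schema above lists only parameter names, here is a hedged illustration of a call; the argument values are hypothetical, and the defaults follow the implementation shown below.

```python
# Hypothetical arguments for find_anomalies; only session_id is required.
args = {
    "session_id": "sess-123",               # hypothetical session identifier
    "columns": ["salary"],                  # None (the default) checks all columns
    "sensitivity": 0.95,                    # 0.0-1.0; higher flags more rows
    "methods": ["statistical", "pattern"],  # subset of the supported methods
}
print(sorted(args))  # → ['columns', 'methods', 'sensitivity', 'session_id']
```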

Output Schema

No arguments.

Implementation Reference

  • Core implementation of the find_anomalies tool. Uses three detection methods: (1) statistical (Z-score + IQR for numeric columns), (2) pattern (rare values + format casing anomalies for string columns), and (3) missing values (clustered/sequential null patterns). Returns anomaly summary with affected rows, columns, severity score, and per-column/per-method breakdowns.
    async def find_anomalies(
        session_id: str,
        columns: list[str] | None = None,
        sensitivity: float = 0.95,
        methods: list[str] | None = None,
        ctx: Context = None,
    ) -> dict[str, Any]:
        """
        Find anomalies in the data using multiple detection methods.
    
        Args:
            session_id: Session identifier
            columns: Columns to check (None for all)
            sensitivity: Detection sensitivity (0.0 to 1.0, higher = more sensitive)
            methods: Detection methods to use (default: ["statistical", "pattern", "missing"])
            ctx: FastMCP context
    
        Returns:
            Dict with anomaly detection results
        """
        try:
            manager = get_session_manager()
            session = manager.get_session(session_id)
    
            if not session or session.df is None:
                return {"success": False, "error": "Invalid session or no data loaded"}
    
            df = session.df
    
            if columns:
                missing_cols = [col for col in columns if col not in df.columns]
                if missing_cols:
                    return {"success": False, "error": f"Columns not found: {missing_cols}"}
                target_cols = columns
            else:
                target_cols = df.columns.tolist()
    
            if not methods:
                methods = ["statistical", "pattern", "missing"]
    
            anomalies = {
                "summary": {"total_anomalies": 0, "affected_rows": set(), "affected_columns": []},
                "by_column": {},
                "by_method": {},
            }
    
            # Statistical anomalies (outliers)
            if "statistical" in methods:
                numeric_cols = df[target_cols].select_dtypes(include=[np.number]).columns
                statistical_anomalies = {}
    
                for col in numeric_cols:
                    col_data = df[col].dropna()
                    if len(col_data) > 0:
                        # Z-score method; index into col_data (NaNs dropped) so the
                        # boolean mask and the index have matching lengths
                        z_scores = np.abs((col_data - col_data.mean()) / col_data.std())
                        z_threshold = 3 * (
                            1 - sensitivity + 0.5
                        )  # Adjust threshold based on sensitivity
                        z_anomalies = col_data.index[z_scores > z_threshold].tolist()
    
                        # IQR method
                        Q1 = col_data.quantile(0.25)
                        Q3 = col_data.quantile(0.75)
                        IQR = Q3 - Q1
                        iqr_factor = 1.5 * (2 - sensitivity)  # Adjust factor based on sensitivity
                        lower = Q1 - iqr_factor * IQR
                        upper = Q3 + iqr_factor * IQR
                        iqr_anomalies = df.index[(df[col] < lower) | (df[col] > upper)].tolist()
    
                        # Combine both methods
                        combined_anomalies = list(set(z_anomalies) | set(iqr_anomalies))
    
                        if combined_anomalies:
                            statistical_anomalies[col] = {
                                "anomaly_count": len(combined_anomalies),
                                "anomaly_indices": combined_anomalies[:100],
                                "anomaly_values": df.loc[combined_anomalies[:10], col].tolist(),
                                "mean": float(col_data.mean()),
                                "std": float(col_data.std()),
                                "lower_bound": float(lower),
                                "upper_bound": float(upper),
                            }
    
                            anomalies["summary"]["total_anomalies"] += len(combined_anomalies)
                            anomalies["summary"]["affected_rows"].update(combined_anomalies)
                            anomalies["summary"]["affected_columns"].append(col)
    
                if statistical_anomalies:
                    anomalies["by_method"]["statistical"] = statistical_anomalies
    
            # Pattern anomalies
            if "pattern" in methods:
                pattern_anomalies = {}
    
                for col in target_cols:
                    if df[col].dtype == object or pd.api.types.is_string_dtype(df[col]):
                        col_data = df[col].dropna()
                        if len(col_data) > 0:
                            # Detect unusual patterns
                            value_counts = col_data.value_counts()
                            total_count = len(col_data)
    
                            # Find rare values (appearing less than threshold)
                            threshold = (1 - sensitivity) * 0.01  # Adjust threshold
                            rare_values = value_counts[value_counts / total_count < threshold]
    
                            if len(rare_values) > 0:
                                rare_indices = df[df[col].isin(rare_values.index)].index.tolist()
    
                                # Check for format anomalies (e.g., different case, special characters)
                                common_pattern = None
                                if len(value_counts) > 10:
                                    # Detect common pattern from frequent values
                                    top_values = value_counts.head(10).index
    
                                    # Check if most values are uppercase/lowercase
                                    upper_count = sum(1 for v in top_values if str(v).isupper())
                                    lower_count = sum(1 for v in top_values if str(v).islower())
    
                                    if upper_count > 7:
                                        common_pattern = "uppercase"
                                    elif lower_count > 7:
                                        common_pattern = "lowercase"
    
                                format_anomalies = []
                                if common_pattern:
                                    for idx, val in col_data.items():
                                        if (
                                            common_pattern == "uppercase" and not str(val).isupper()
                                        ) or (common_pattern == "lowercase" and not str(val).islower()):
                                            format_anomalies.append(idx)
    
                                all_pattern_anomalies = list(set(rare_indices + format_anomalies))
    
                                if all_pattern_anomalies:
                                    pattern_anomalies[col] = {
                                        "anomaly_count": len(all_pattern_anomalies),
                                        "rare_values": rare_values.head(10).to_dict(),
                                        "anomaly_indices": all_pattern_anomalies[:100],
                                        "common_pattern": common_pattern,
                                    }
    
                                    anomalies["summary"]["total_anomalies"] += len(
                                        all_pattern_anomalies
                                    )
                                    anomalies["summary"]["affected_rows"].update(all_pattern_anomalies)
                                    if col not in anomalies["summary"]["affected_columns"]:
                                        anomalies["summary"]["affected_columns"].append(col)
    
                if pattern_anomalies:
                    anomalies["by_method"]["pattern"] = pattern_anomalies
    
            # Missing value anomalies
            if "missing" in methods:
                missing_anomalies = {}
    
                for col in target_cols:
                    null_mask = df[col].isna()
                    null_count = null_mask.sum()
    
                    if null_count > 0:
                        null_ratio = null_count / len(df)
    
                        # Check for suspicious missing patterns
                        if 0 < null_ratio < 0.5:  # Partially missing
                            # Check if missing values are clustered
                            null_indices = df.index[null_mask].tolist()
    
                            # Check for sequential missing values
                            sequential_missing = []
                            if len(null_indices) > 1:
                                for i in range(len(null_indices) - 1):
                                    if null_indices[i + 1] - null_indices[i] == 1:
                                        if (
                                            not sequential_missing
                                            or null_indices[i] - sequential_missing[-1][-1] == 1
                                        ):
                                            if sequential_missing:
                                                sequential_missing[-1].append(null_indices[i + 1])
                                            else:
                                                sequential_missing.append(
                                                    [null_indices[i], null_indices[i + 1]]
                                                )
    
                            # Flag as anomaly if there are suspicious patterns
                            is_anomaly = (
                                len(sequential_missing) > 0
                                and len(sequential_missing) > len(null_indices) * 0.3
                            )
    
                            if is_anomaly or (null_ratio > 0.1 and null_ratio < 0.3):
                                missing_anomalies[col] = {
                                    "missing_count": int(null_count),
                                    "missing_ratio": round(null_ratio, 4),
                                    "missing_indices": null_indices[:100],
                                    "sequential_clusters": len(sequential_missing),
                                    "pattern": "clustered" if sequential_missing else "random",
                                }
    
                                anomalies["summary"]["affected_columns"].append(col)
    
                if missing_anomalies:
                    anomalies["by_method"]["missing"] = missing_anomalies
    
            # Organize anomalies by column
            for method_name, method_anomalies in anomalies["by_method"].items():
                for col, col_anomalies in method_anomalies.items():
                    if col not in anomalies["by_column"]:
                        anomalies["by_column"][col] = {}
                    anomalies["by_column"][col][method_name] = col_anomalies
    
            # Convert set to list for JSON serialization
            anomalies["summary"]["affected_rows"] = list(anomalies["summary"]["affected_rows"])[:1000]
            anomalies["summary"]["affected_columns"] = list(
                set(anomalies["summary"]["affected_columns"])
            )
    
            # Calculate anomaly score
            total_cells = len(df) * len(target_cols)
            anomaly_cells = len(anomalies["summary"]["affected_rows"]) * len(
                anomalies["summary"]["affected_columns"]
            )
            anomaly_score = min(anomaly_cells / total_cells, 1.0) * 100
    
            anomalies["summary"]["anomaly_score"] = round(anomaly_score, 2)
            anomalies["summary"]["severity"] = (
                "high" if anomaly_score > 10 else "medium" if anomaly_score > 5 else "low"
            )
    
            session.record_operation(
                OperationType.ANOMALY_DETECTION,
                {
                    "methods": methods,
                    "sensitivity": sensitivity,
                    "anomalies_found": anomalies["summary"]["total_anomalies"],
                },
            )
    
            return {
                "success": True,
                "anomalies": anomalies,
                "columns_analyzed": target_cols,
                "methods_used": methods,
                "sensitivity": sensitivity,
            }
    
        except Exception as e:
            logger.error(f"Error finding anomalies: {e!s}")
            return {"success": False, "error": str(e)}
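To make the statistical branch concrete, here is a self-contained sketch of the same Z-score + IQR combination on a single numeric column. The toy data and helper name are illustrative; the threshold scaling mirrors the sensitivity adjustments used above.

```python
import numpy as np
import pandas as pd

def statistical_anomalies(col: pd.Series, sensitivity: float = 0.95) -> list:
    """Return indices flagged by either the Z-score or the IQR rule."""
    col_data = col.dropna()
    if col_data.empty or col_data.std() == 0:
        return []
    # Z-score rule: threshold shrinks as sensitivity rises
    z_scores = np.abs((col_data - col_data.mean()) / col_data.std())
    z_threshold = 3 * (1 - sensitivity + 0.5)
    z_flagged = set(z_scores.index[z_scores > z_threshold])
    # IQR rule: fence factor shrinks as sensitivity rises
    q1, q3 = col_data.quantile(0.25), col_data.quantile(0.75)
    iqr = q3 - q1
    iqr_factor = 1.5 * (2 - sensitivity)
    lower, upper = q1 - iqr_factor * iqr, q3 + iqr_factor * iqr
    iqr_flagged = set(col_data.index[(col_data < lower) | (col_data > upper)])
    # Union of both rules, as in the implementation above
    return sorted(z_flagged | iqr_flagged)

s = pd.Series([10, 11, 9, 10, 12, 11, 10, 500])  # 500 is an obvious outlier
print(statistical_anomalies(s))  # → [7]
```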
  • MCP tool registration for find_anomalies. Decorated with @mcp.tool, it's a thin wrapper that calls _find_anomalies from the validation module.
    @mcp.tool
    async def find_anomalies(
        session_id: str,
        columns: list[str] | None = None,
        sensitivity: float = 0.95,
        methods: list[str] | None = None,
        ctx: Context = None,
    ) -> dict[str, Any]:
        """Find anomalies in the data using multiple detection methods."""
        return await _find_anomalies(session_id, columns, sensitivity, methods, ctx)
  • Lists find_anomalies under the 'data_validation' tool category in the server's tool listing metadata.
    "data_validation": ["validate_schema", "check_data_quality", "find_anomalies"],
  • Import of find_anomalies from src/csv_editor/tools/validation module, aliased as _find_anomalies.
    from .tools.validation import find_anomalies as _find_anomalies
  • Integration test calling find_anomalies(session_id, columns=['salary']) and printing total anomalies with a per-method breakdown.
    # Find anomalies (results are nested under the "anomalies" key)
    result = await find_anomalies(session_id=session_id, columns=["salary"])
    if result["success"]:
        summary = result["anomalies"]["summary"]
        total_anomalies = summary["total_anomalies"]
        print_success(f"Found {total_anomalies} anomaly(ies)")
        if total_anomalies > 0:
            print_info("Anomalies by method:")
            for method, cols in result["anomalies"]["by_method"].items():
                print(f"    {method}: {len(cols)} column(s)")
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description must carry the full burden. It mentions 'using multiple detection methods' but does not disclose what those methods are, whether data is modified, or the nature of the output (output schema exists but is not referenced).

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, concise sentence that front-loads the verb and resource. It avoids wordiness, making it easy to parse quickly.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Despite having an output schema, the description does not mention return values or structure. With 4 parameters and no schema descriptions, the description is too sparse to fully guide an agent. Important context like available detection methods and sensitivity meaning is missing.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 1/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 0%, and the description adds no parameter-level information. Parameters like 'sensitivity', 'methods', and 'columns' are completely unexplained. The phrase 'multiple detection methods' hints at the 'methods' parameter but is insufficient.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.
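As a sketch of the parameter-level documentation this critique asks for, a rewritten description might read as follows; the wording is hypothetical, not taken from the project.

```python
# Hypothetical rewritten tool description covering methods, ranges, and defaults.
IMPROVED_DESCRIPTION = (
    "Find anomalies in a loaded CSV session. Read-only: data is not modified. "
    "columns limits the scan to the named columns (default: all). "
    "sensitivity is 0.0-1.0; higher flags more rows (default 0.95). "
    "methods is any subset of ['statistical', 'pattern', 'missing'] "
    "(default: all three). statistical applies Z-score and IQR rules to "
    "numeric columns; pattern flags rare values and casing outliers in "
    "string columns; missing flags clustered runs of nulls."
)
print("sensitivity" in IMPROVED_DESCRIPTION)  # → True
```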

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the verb 'find' and resource 'anomalies in the data', which gives a good sense of the tool's purpose. However, it does not differentiate from the sibling tool 'detect_outliers', which likely has a similar function.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

No guidance on when to use this tool versus alternatives like 'detect_outliers'. There is no mention of prerequisites, limitations, or typical use cases. The description is entirely generic.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
