find_anomalies
Find anomalies in CSV data by applying multiple detection methods. Customize sensitivity and target columns for outlier identification.
Instructions
Find anomalies in the data using multiple detection methods.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
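| session_id | Yes | Session identifier | |
| columns | No | Columns to check (all columns when omitted) | None |
| sensitivity | No | Detection sensitivity (0.0 to 1.0, higher = more sensitive) | 0.95 |
| methods | No | Detection methods to use: "statistical", "pattern", "missing" (all three when omitted) | None |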
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Implementation Reference
- Core implementation of the find_anomalies tool. Uses three detection methods: (1) statistical (Z-score + IQR for numeric columns), (2) pattern (rare values + format casing anomalies for string columns), and (3) missing values (clustered/sequential null patterns). Returns anomaly summary with affected rows, columns, severity score, and per-column/per-method breakdowns.
async def find_anomalies(
    session_id: str,
    columns: list[str] | None = None,
    sensitivity: float = 0.95,
    methods: list[str] | None = None,
    ctx: Context = None,
) -> dict[str, Any]:
    """
    Find anomalies in the data using multiple detection methods.

    Args:
        session_id: Session identifier
        columns: Columns to check (None for all)
        sensitivity: Detection sensitivity (0.0 to 1.0, higher = more sensitive)
        methods: Detection methods to use; any of "statistical", "pattern",
            "missing" (default: all three)
        ctx: FastMCP context

    Returns:
        On success, a dict with ``success=True``, the ``anomalies`` report
        (``summary``, ``by_column``, ``by_method``), ``columns_analyzed``,
        ``methods_used`` and ``sensitivity``. On failure, a dict with
        ``success=False`` and an ``error`` message.
    """
    try:
        manager = get_session_manager()
        session = manager.get_session(session_id)

        if not session or session.df is None:
            return {"success": False, "error": "Invalid session or no data loaded"}

        # Outside the documented range the thresholds become meaningless
        # (e.g. a negative z-threshold flags every row), so reject early.
        if not 0.0 <= sensitivity <= 1.0:
            return {
                "success": False,
                "error": f"sensitivity must be between 0.0 and 1.0, got {sensitivity}",
            }

        df = session.df

        if columns:
            missing_cols = [col for col in columns if col not in df.columns]
            if missing_cols:
                return {"success": False, "error": f"Columns not found: {missing_cols}"}
            target_cols = columns
        else:
            target_cols = df.columns.tolist()

        if not methods:
            methods = ["statistical", "pattern", "missing"]

        total_anomalies = 0
        # Dicts are used as insertion-ordered sets so the output is deterministic.
        affected_rows: dict[Any, None] = {}
        affected_columns: dict[Any, None] = {}
        by_method: dict[str, dict] = {}

        row_detectors = {
            "statistical": _statistical_anomalies,
            "pattern": _pattern_anomalies,
        }
        for method_name, detect in row_detectors.items():
            if method_name not in methods:
                continue
            details, flagged = detect(df, target_cols, sensitivity)
            for col, indices in flagged.items():
                total_anomalies += len(indices)
                affected_rows.update(dict.fromkeys(indices))
                affected_columns[col] = None
            if details:
                by_method[method_name] = details

        if "missing" in methods:
            # Missing-value findings are column-level: they mark the column as
            # affected but do not add to the row or anomaly totals.
            missing_details = _missing_anomalies(df, target_cols)
            affected_columns.update(dict.fromkeys(missing_details))
            if missing_details:
                by_method["missing"] = missing_details

        # Organize anomalies by column
        by_column: dict[Any, dict] = {}
        for method_name, method_details in by_method.items():
            for col, col_details in method_details.items():
                by_column.setdefault(col, {})[method_name] = col_details

        # Score from the full row count, before the reported list is capped.
        total_cells = len(df) * len(target_cols)
        anomaly_cells = len(affected_rows) * len(affected_columns)
        if total_cells:
            anomaly_score = min(anomaly_cells / total_cells, 1.0) * 100
        else:
            anomaly_score = 0.0
        anomaly_score = round(anomaly_score, 2)

        anomalies = {
            "summary": {
                "total_anomalies": total_anomalies,
                # Capped so the response stays a manageable size.
                "affected_rows": list(affected_rows)[:1000],
                "affected_columns": list(affected_columns),
                "anomaly_score": anomaly_score,
                "severity": (
                    "high" if anomaly_score > 10 else "medium" if anomaly_score > 5 else "low"
                ),
            },
            "by_column": by_column,
            "by_method": by_method,
        }

        session.record_operation(
            OperationType.ANOMALY_DETECTION,
            {
                "methods": methods,
                "sensitivity": sensitivity,
                "anomalies_found": total_anomalies,
            },
        )

        return {
            "success": True,
            "anomalies": anomalies,
            "columns_analyzed": target_cols,
            "methods_used": methods,
            "sensitivity": sensitivity,
        }

    except Exception as e:
        logger.error(f"Error finding anomalies: {e!s}")
        return {"success": False, "error": str(e)}


def _statistical_anomalies(
    df: pd.DataFrame, target_cols: list, sensitivity: float
) -> tuple[dict, dict]:
    """Flag numeric outliers per column using the union of Z-score and IQR tests.

    Returns ``(details, flagged)``: the per-column report and the complete
    list of flagged index labels per column.
    """
    details: dict = {}
    flagged: dict = {}

    # Both cut-offs depend only on sensitivity, so compute them once.
    z_threshold = 3 * (1 - sensitivity + 0.5)
    iqr_factor = 1.5 * (2 - sensitivity)

    numeric_cols = df[target_cols].select_dtypes(include=[np.number]).columns
    for col in numeric_cols:
        col_data = df[col].dropna()
        if col_data.empty:
            continue

        mean = col_data.mean()
        std = col_data.std()

        # IQR method
        q1 = col_data.quantile(0.25)
        q3 = col_data.quantile(0.75)
        iqr = q3 - q1
        lower = q1 - iqr_factor * iqr
        upper = q3 + iqr_factor * iqr
        outlier_mask = (col_data < lower) | (col_data > upper)

        # Z-score method. std is NaN for a single value and 0 for a constant
        # column; in both cases z-scores are undefined, so skip this test.
        if pd.notna(std) and std > 0:
            z_scores = ((col_data - mean) / std).abs()
            outlier_mask |= z_scores > z_threshold

        # Index through col_data, not df: the mask has col_data's length, and
        # applying it to df.index fails whenever NaNs were dropped.
        indices = list(dict.fromkeys(col_data.index[outlier_mask].tolist()))
        if not indices:
            continue

        flagged[col] = indices
        details[col] = {
            "anomaly_count": len(indices),
            "anomaly_indices": indices[:100],
            "anomaly_values": df.loc[indices[:10], col].tolist(),
            "mean": float(mean),
            "std": float(std),
            "lower_bound": float(lower),
            "upper_bound": float(upper),
        }

    return details, flagged


def _pattern_anomalies(
    df: pd.DataFrame, target_cols: list, sensitivity: float
) -> tuple[dict, dict]:
    """Flag rare values and letter-case deviations in string columns.

    Returns ``(details, flagged)``: the per-column report and the complete
    list of flagged index labels per column.
    """
    details: dict = {}
    flagged: dict = {}

    # Relative frequency below which a value counts as rare.
    threshold = (1 - sensitivity) * 0.01

    for col in target_cols:
        series = df[col]
        if not (series.dtype == object or pd.api.types.is_string_dtype(series)):
            continue

        col_data = series.dropna()
        if col_data.empty:
            continue

        value_counts = col_data.value_counts()
        rare_values = value_counts[value_counts / len(col_data) < threshold]
        # A column is only reported when it contains rare values; the casing
        # check below refines such a report and never creates one by itself.
        if rare_values.empty:
            continue

        rare_indices = col_data.index[col_data.isin(rare_values.index)].tolist()

        # Infer the dominant letter case from the ten most frequent values.
        common_pattern = None
        if len(value_counts) > 10:
            top_values = [str(v) for v in value_counts.head(10).index]
            if sum(v.isupper() for v in top_values) > 7:
                common_pattern = "uppercase"
            elif sum(v.islower() for v in top_values) > 7:
                common_pattern = "lowercase"

        if common_pattern == "uppercase":
            format_anomalies = [i for i, v in col_data.items() if not str(v).isupper()]
        elif common_pattern == "lowercase":
            format_anomalies = [i for i, v in col_data.items() if not str(v).islower()]
        else:
            format_anomalies = []

        indices = list(dict.fromkeys(rare_indices + format_anomalies))
        if not indices:
            continue

        flagged[col] = indices
        details[col] = {
            "anomaly_count": len(indices),
            "rare_values": rare_values.head(10).to_dict(),
            "anomaly_indices": indices[:100],
            "common_pattern": common_pattern,
        }

    return details, flagged


def _consecutive_runs(positions: list[int]) -> list[list[int]]:
    """Group ascending integer positions into runs of two or more neighbours."""
    runs: list[list[int]] = []
    current: list[int] = []
    for pos in positions:
        if current and pos - current[-1] == 1:
            current.append(pos)
            continue
        if len(current) > 1:
            runs.append(current)
        current = [pos]
    if len(current) > 1:
        runs.append(current)
    return runs


def _missing_anomalies(df: pd.DataFrame, target_cols: list) -> dict:
    """Report columns whose missing values look suspicious.

    Only partially missing columns (null ratio below 0.5) are considered. A
    column is reported when more than 30% of its nulls sit in runs of
    consecutive rows, or when its null ratio lies strictly between 0.1 and 0.3.
    """
    details: dict = {}
    row_count = len(df)

    for col in target_cols:
        null_mask = df[col].isna()
        null_count = int(null_mask.sum())
        if null_count == 0:
            continue

        null_ratio = null_count / row_count
        if not 0 < null_ratio < 0.5:
            continue

        # Use row positions rather than index labels so adjacency is
        # well-defined for string, datetime or otherwise non-integer indexes.
        null_positions = np.flatnonzero(null_mask.to_numpy()).tolist()
        clusters = _consecutive_runs(null_positions)
        clustered_nulls = sum(len(run) for run in clusters)

        is_clustered = clustered_nulls > null_count * 0.3
        if not (is_clustered or 0.1 < null_ratio < 0.3):
            continue

        details[col] = {
            "missing_count": null_count,
            "missing_ratio": round(float(null_ratio), 4),
            "missing_indices": df.index[null_mask].tolist()[:100],
            "sequential_clusters": len(clusters),
            "pattern": "clustered" if clusters else "random",
        }

    return details
# - src/csv_editor/server.py:399-408 (registration)MCP tool registration for find_anomalies. Decorated with @mcp.tool, it's a thin wrapper that calls _find_anomalies from the validation module.
@mcp.tool
async def find_anomalies(
    session_id: str,
    columns: list[str] | None = None,
    sensitivity: float = 0.95,
    methods: list[str] | None = None,
    ctx: Context = None,
) -> dict[str, Any]:
    """Find anomalies in the data using multiple detection methods."""
    # Thin registration wrapper: every argument is forwarded unchanged to the
    # _find_anomalies implementation and its result is returned as-is.
    result = await _find_anomalies(
        session_id=session_id,
        columns=columns,
        sensitivity=sensitivity,
        methods=methods,
        ctx=ctx,
    )
    return result
# - src/csv_editor/server.py:88-88 (registration)Lists find_anomalies under the 'data_validation' tool category in the server's tool listing metadata.
"data_validation": ["validate_schema", "check_data_quality", "find_anomalies"], - src/csv_editor/server.py:379-379 (registration)Import of find_anomalies from src/csv_editor/tools/validation module, aliased as _find_anomalies.
from .tools.validation import find_anomalies as _find_anomalies - tests/test_integration.py:284-292 (helper)Integration test calling find_anomalies(session_id, columns=['salary']) and printing total anomalies and their types.
# Find anomalies result = await find_anomalies(session_id=session_id, columns=["salary"]) if result["success"]: total_anomalies = result["summary"]["total_anomalies"] print_success(f"Found {total_anomalies} anomaly(ies)") if total_anomalies > 0: print_info("Anomaly types:") for atype, count in result["summary"]["by_type"].items(): print(f" {atype}: {count}")