# find_anomalies
Detect data anomalies using statistical, pattern, or missing value methods to identify outliers and irregularities in datasets for quality assurance.
## Instructions
Find anomalies in the data using multiple detection methods.
Returns: FindAnomaliesResult with comprehensive anomaly detection results
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| columns | No | List of columns to analyze (None = all columns) | `None` |
| sensitivity | No | Sensitivity threshold for anomaly detection (0-1) | `0.95` |
| methods | No | Detection methods to use (None = all methods) | `None` |
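
For example, a client might invoke the tool with arguments matching this schema. The following is a minimal sketch assuming a FastMCP client over HTTP; the endpoint URL, the column names, and the assumption that a dataset is already loaded into the session are hypothetical:

```python
import asyncio

from fastmcp import Client


async def main() -> None:
    # Hypothetical endpoint; use whatever transport your DataBeak server exposes.
    async with Client("http://localhost:8000/mcp") as client:
        result = await client.call_tool(
            "find_anomalies",
            {
                "columns": ["price", "quantity"],   # omit (None) to analyze all columns
                "sensitivity": 0.95,                # default shown in the table above
                "methods": ["statistical", "missing"],
            },
        )
        print(result)


asyncio.run(main())
```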
## Implementation Reference
- The core handler function implementing the `find_anomalies` tool. It performs anomaly detection using statistical methods (Z-score and IQR outliers), pattern-based detection for categorical data, and missing-value pattern analysis, with configurable columns, sensitivity, and methods.

```python
def find_anomalies(
    ctx: Annotated[Context, Field(description="FastMCP context for session access")],
    columns: Annotated[
        list[str] | None,
        Field(description="List of columns to analyze (None = all columns)"),
    ] = None,
    sensitivity: Annotated[
        float,
        Field(description="Sensitivity threshold for anomaly detection (0-1)"),
    ] = 0.95,
    methods: Annotated[
        list[Literal["statistical", "pattern", "missing"]] | None,
        Field(description="Detection methods to use (None = all methods)"),
    ] = None,
) -> FindAnomaliesResult:
    """Find anomalies in the data using multiple detection methods.

    Returns:
        FindAnomaliesResult with comprehensive anomaly detection results
    """
    session_id = ctx.session_id
    _session, df = get_session_data(session_id)
    settings = get_settings()

    # Apply resource management for large datasets
    logger.info("Finding anomalies in %d rows, %d columns", len(df), len(df.columns))
    if len(df) > settings.max_anomaly_sample_size:
        logger.warning(
            "Large dataset (%d rows), using sample of %d for anomaly detection",
            len(df),
            settings.max_anomaly_sample_size,
        )
        df = sample_large_dataset(df, settings.max_anomaly_sample_size, "Anomaly detection")

    if columns:
        missing_cols = [col for col in columns if col not in df.columns]
        if missing_cols:
            # Raise error for first missing column
            raise ColumnNotFoundError(missing_cols[0], df.columns.tolist())
        target_cols = columns
    else:
        target_cols = df.columns.tolist()

    if not methods:
        methods = ["statistical", "pattern", "missing"]

    # Track anomalies using proper data structures
    total_anomalies = 0
    affected_rows: set[int] = set()
    affected_columns: list[str] = []
    by_column: dict[str, StatisticalAnomaly | PatternAnomaly | MissingAnomaly] = {}
    by_method: dict[str, dict[str, StatisticalAnomaly | PatternAnomaly | MissingAnomaly]] = {}

    # Statistical anomalies (outliers)
    if "statistical" in methods:
        numeric_cols = df[target_cols].select_dtypes(include=[np.number]).columns
        statistical_anomalies: dict[str, StatisticalAnomaly] = {}

        for col in numeric_cols:
            col_data = df[col].dropna()
            if len(col_data) > 0:
                # Z-score method
                z_scores = np.abs((col_data - col_data.mean()) / col_data.std())
                z_threshold = 3 * (1 - sensitivity + 0.5)  # Adjust threshold based on sensitivity
                z_anomalies = col_data.index[z_scores > z_threshold].tolist()

                # IQR method
                q1 = col_data.quantile(0.25)
                q3 = col_data.quantile(0.75)
                iqr = q3 - q1
                iqr_factor = 1.5 * (2 - sensitivity)  # Adjust factor based on sensitivity
                lower = q1 - iqr_factor * iqr
                upper = q3 + iqr_factor * iqr
                iqr_anomalies = df.index[(df[col] < lower) | (df[col] > upper)].tolist()

                # Combine both methods
                combined_anomalies = list(set(z_anomalies) | set(iqr_anomalies))

                if combined_anomalies:
                    statistical_anomaly = StatisticalAnomaly(
                        anomaly_count=len(combined_anomalies),
                        anomaly_indices=combined_anomalies[:100],
                        anomaly_values=[
                            float(v) for v in df.loc[combined_anomalies[:10], col].tolist()
                        ],
                        mean=float(col_data.mean()),
                        std=float(col_data.std()),
                        lower_bound=float(lower),
                        upper_bound=float(upper),
                    )
                    statistical_anomalies[col] = statistical_anomaly
                    total_anomalies += len(combined_anomalies)
                    affected_rows.update(combined_anomalies)
                    affected_columns.append(col)

        if statistical_anomalies:
            # Type cast for mypy
            by_method["statistical"] = dict(statistical_anomalies.items())

    # Pattern anomalies
    if "pattern" in methods:
        pattern_anomalies: dict[str, PatternAnomaly] = {}

        for col in target_cols:
            if df[col].dtype == object or pd.api.types.is_string_dtype(df[col]):
                col_data = df[col].dropna()
                if len(col_data) > 0:
                    # Detect unusual patterns
                    value_counts = col_data.value_counts()
                    total_count = len(col_data)

                    # Find rare values (appearing less than threshold)
                    threshold = (1 - sensitivity) * 0.01  # Adjust threshold
                    rare_values = value_counts[value_counts / total_count < threshold]

                    if len(rare_values) > 0:
                        rare_indices = df[df[col].isin(rare_values.index)].index.tolist()

                        # Check for format anomalies (e.g., different case, special characters)
                        common_pattern = None
                        if len(value_counts) > settings.max_category_display:
                            # Detect common pattern from frequent values
                            top_values = value_counts.head(10).index
                            # Check if most values are uppercase/lowercase
                            upper_count = sum(1 for v in top_values if str(v).isupper())
                            lower_count = sum(1 for v in top_values if str(v).islower())
                            if upper_count > settings.min_length_threshold:
                                common_pattern = "uppercase"
                            elif lower_count > settings.min_length_threshold:
                                common_pattern = "lowercase"

                        format_anomalies = []
                        if common_pattern:
                            for idx, val in col_data.items():
                                if (common_pattern == "uppercase" and not str(val).isupper()) or (
                                    common_pattern == "lowercase" and not str(val).islower()
                                ):
                                    format_anomalies.append(idx)

                        all_pattern_anomalies = list(set(rare_indices + format_anomalies))

                        if all_pattern_anomalies:
                            pattern_anomaly = PatternAnomaly(
                                anomaly_count=len(all_pattern_anomalies),
                                anomaly_indices=all_pattern_anomalies[:100],
                                sample_values=[str(v) for v in rare_values.head(10).index.tolist()],
                                expected_patterns=[common_pattern] if common_pattern else [],
                            )
                            pattern_anomalies[col] = pattern_anomaly
                            total_anomalies += len(all_pattern_anomalies)
                            affected_rows.update(all_pattern_anomalies)
                            if col not in affected_columns:
                                affected_columns.append(col)

        if pattern_anomalies:
            # Type cast for mypy
            by_method["pattern"] = dict(pattern_anomalies.items())

    # Missing value anomalies
    if "missing" in methods:
        missing_anomalies: dict[str, MissingAnomaly] = {}

        for col in target_cols:
            null_mask = df[col].isna()
            null_count = null_mask.sum()

            if null_count > 0:
                null_ratio = null_count / len(df)

                # Check for suspicious missing patterns
                if 0 < null_ratio < settings.data_completeness_threshold:  # Partially missing
                    # Check if missing values are clustered
                    null_indices = df.index[null_mask].tolist()

                    # Check for sequential missing values
                    sequential_missing: list[list[int]] = []
                    if len(null_indices) > 1:
                        for i in range(len(null_indices) - 1):
                            if null_indices[i + 1] - null_indices[i] == 1 and (
                                not sequential_missing
                                or null_indices[i] - sequential_missing[-1][-1] == 1
                            ):
                                if sequential_missing:
                                    sequential_missing[-1].append(null_indices[i + 1])
                                else:
                                    sequential_missing.append(
                                        [null_indices[i], null_indices[i + 1]],
                                    )

                    # Flag as anomaly if there are suspicious patterns
                    is_anomaly = (
                        len(sequential_missing) > 0
                        and len(sequential_missing) > len(null_indices) * 0.3
                    )

                    if is_anomaly or (
                        null_ratio > settings.outlier_detection_threshold
                        and null_ratio < settings.correlation_threshold
                    ):
                        missing_anomaly = MissingAnomaly(
                            missing_count=int(null_count),
                            missing_ratio=round(null_ratio, 4),
                            missing_indices=null_indices[:100],
                            sequential_clusters=len(sequential_missing),
                            pattern="clustered" if sequential_missing else "random",
                        )
                        missing_anomalies[col] = missing_anomaly
                        if col not in affected_columns:
                            affected_columns.append(col)

        if missing_anomalies:
            # Type cast for mypy
            by_method["missing"] = dict(missing_anomalies.items())

    # Organize anomalies by column
    for method_anomalies in by_method.values():
        for col, col_anomalies in method_anomalies.items():
            if col not in by_column:
                by_column[col] = col_anomalies
                # Note: For simplicity, we're taking the first anomaly type per column
                # In practice, you might want to combine multiple anomaly types

    # Create summary
    affected_rows_list = list(affected_rows)[:1000]  # Limit for performance
    unique_affected_columns = list(set(affected_columns))

    summary = AnomalySummary(
        total_anomalies=total_anomalies,
        affected_rows=len(affected_rows_list),
        affected_columns=unique_affected_columns,
    )

    # Create final results
    anomaly_results = AnomalyResults(
        summary=summary,
        by_column=by_column,
        by_method=by_method,
    )

    # No longer recording operations (simplified MCP architecture)

    return FindAnomaliesResult(
        anomalies=anomaly_results,
        columns_analyzed=target_cols,
        methods_used=[str(m) for m in methods],  # Convert to list[str] for compatibility
        sensitivity=sensitivity,
    )
```
- `src/databeak/servers/validation_server.py:1309-1309` (registration) — Registration of the `find_anomalies` tool on the DataBeak-Validation FastMCP server instance:

```python
validation_server.tool(name="find_anomalies")(find_anomalies)
```
- Pydantic models defining the output schema for `find_anomalies`, including detailed anomaly types (statistical, pattern, missing), summaries, and per-column/per-method results. The input schema is defined via the `Annotated` function parameters shown above.

```python
class StatisticalAnomaly(BaseModel):
    """Statistical anomaly detection result."""

    anomaly_count: int = Field(description="Number of statistical anomalies detected")
    anomaly_indices: list[int] = Field(description="Row indices where anomalies were found")
    anomaly_values: list[float] = Field(description="Sample of anomalous values found")
    mean: float = Field(description="Mean value of the column")
    std: float = Field(description="Standard deviation of the column")
    lower_bound: float = Field(description="Lower bound for normal values")
    upper_bound: float = Field(description="Upper bound for normal values")


class PatternAnomaly(BaseModel):
    """Pattern-based anomaly detection result."""

    anomaly_count: int = Field(description="Number of pattern anomalies detected")
    anomaly_indices: list[int] = Field(description="Row indices where pattern anomalies were found")
    sample_values: list[str] = Field(description="Sample values that don't match expected patterns")
    expected_patterns: list[str] = Field(description="List of expected patterns that were violated")


class MissingAnomaly(BaseModel):
    """Missing value anomaly detection result.

    Represents anomalies found in missing value patterns within a column. This
    includes both the quantity of missing values and their distribution patterns
    (clustered vs random), which can indicate data quality issues or systematic
    collection problems.

    Attributes:
        missing_count: Total number of missing/null values in the column
        missing_ratio: Proportion of missing values (0.0 to 1.0)
        missing_indices: Row indices where missing values occur (limited to first 100)
        sequential_clusters: Number of consecutive missing value sequences found
        pattern: Distribution pattern of missing values ('clustered' or 'random')
    """

    missing_count: int = Field(
        description="Total number of missing/null values found in the column",
    )
    missing_ratio: float = Field(
        description="Ratio of missing values to total values (0.0 to 1.0)",
        ge=0.0,
        le=1.0,
    )
    missing_indices: list[int] = Field(
        description="Row indices where missing values occur (limited to first 100 for performance)",
    )
    sequential_clusters: int = Field(
        description="Number of consecutive missing value sequences detected",
        ge=0,
    )
    pattern: Literal["clustered", "random"] = Field(
        description="Distribution pattern of missing values ('clustered' or 'random')",
    )


class AnomalySummary(BaseModel):
    """Summary of anomaly detection results."""

    total_anomalies: int = Field(description="Total number of anomalies found across all columns")
    affected_rows: int = Field(description="Number of rows containing at least one anomaly")
    affected_columns: list[str] = Field(
        description="Names of columns where anomalies were detected",
    )


class AnomalyResults(BaseModel):
    """Comprehensive anomaly detection results."""

    model_config = ConfigDict(extra="forbid")

    summary: AnomalySummary = Field(description="Summary statistics of anomaly detection results")
    by_column: dict[str, StatisticalAnomaly | PatternAnomaly | MissingAnomaly] = Field(
        description="Anomalies organized by column name",
    )
    by_method: dict[str, dict[str, StatisticalAnomaly | PatternAnomaly | MissingAnomaly]] = Field(
        description="Anomalies organized by detection method",
    )


class FindAnomaliesResult(BaseModel):
    """Response model for anomaly detection operations."""

    anomalies: AnomalyResults = Field(description="Comprehensive anomaly detection results")
    columns_analyzed: list[str] = Field(
        description="Names of columns that were analyzed for anomalies",
    )
    methods_used: list[str] = Field(description="Detection methods that were applied")
    sensitivity: float = Field(description="Sensitivity threshold used for detection (0.0-1.0)")
```
- `src/databeak/servers/system_server.py:234-234` (registration) — The `find_anomalies` tool is listed in the server's capabilities under the `data_validation` category in `get_server_info`:

```python
"find_anomalies",
```