detect_outliers
Identify statistical outliers in numeric CSV data columns using configurable detection methods to flag anomalies for data quality analysis.
Instructions
Detect outliers in numeric columns.
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| session_id | Yes | ||
| columns | No | ||
| method | No | | iqr |
| threshold | No | | 1.5 |
Implementation Reference
# src/csv_editor/server.py:370-379 (registration)
# Registration of the 'detect_outliers' tool using @mcp.tool decorator. This
# defines the tool interface and delegates to the implementation in analytics.py.
@mcp.tool
async def detect_outliers(
    session_id: str,
    columns: Optional[List[str]] = None,
    method: str = "iqr",
    threshold: float = 1.5,
    ctx: Context = None
) -> Dict[str, Any]:
    """Detect outliers in numeric columns."""
    # NOTE(review): the one-line docstring above matches the tool's published
    # "Instructions" text, so it is presumably what the decorator exposes to
    # clients -- confirm before rewording it.
    #
    # Thin pass-through: every argument is forwarded positionally and unchanged,
    # and the implementation's result dict is returned as-is. The default
    # threshold of 1.5 is forwarded regardless of which method is selected.
    return await _detect_outliers(session_id, columns, method, threshold, ctx)
# Core implementation of the detect_outliers tool logic, handling IQR and
# Z-score methods for outlier detection in numeric columns of the CSV data.
async def detect_outliers(
    session_id: str,
    columns: Optional[List[str]] = None,
    method: str = "iqr",
    threshold: float = 1.5,
    ctx: Context = None
) -> Dict[str, Any]:
    """
    Detect outliers in numeric columns.

    Args:
        session_id: Session identifier
        columns: Columns to check (None for all numeric). Every name must
            exist in the data; duplicate names are checked once and
            non-numeric columns are ignored.
        method: Detection method ('iqr' or 'zscore')
        threshold: Multiplier applied to the IQR for 'iqr', or the absolute
            z-score cutoff for 'zscore'. The default of 1.5 is used for BOTH
            methods, so pass an explicit value (3 is the usual choice) when
            using 'zscore'.
        ctx: FastMCP context (not used by this function)

    Returns:
        On success: {"success": True, "method", "threshold", "outliers",
        "total_outliers", "columns_analyzed"}, where "outliers" maps each
        analyzed column to its per-column statistics and at most the first
        100 outlier index labels.
        On failure: {"success": False, "error": <message>}. Exceptions are
        logged and reported this way rather than raised.
    """
    try:
        manager = get_session_manager()
        session = manager.get_session(session_id)

        if not session or session.df is None:
            return {"success": False, "error": "Invalid session or no data loaded"}

        df = session.df

        # Select numeric columns
        if columns:
            missing_cols = [col for col in columns if col not in df.columns]
            if missing_cols:
                return {"success": False, "error": f"Columns not found: {missing_cols}"}
            # De-duplicate while keeping the caller's order: selecting the same
            # label twice makes numeric_df[col] a DataFrame instead of a Series,
            # which breaks the scalar statistics computed below.
            unique_cols = list(dict.fromkeys(columns))
            numeric_df = df[unique_cols].select_dtypes(include=[np.number])
        else:
            numeric_df = df.select_dtypes(include=[np.number])

        if numeric_df.empty:
            return {"success": False, "error": "No numeric columns found"}

        # Percentages are relative to all rows of the frame, not only the
        # non-null values of each column.
        total_rows = len(df)
        outliers = {}

        if method == "iqr":
            for col in numeric_df.columns:
                series = numeric_df[col]
                Q1 = series.quantile(0.25)
                Q3 = series.quantile(0.75)
                IQR = Q3 - Q1

                lower_bound = Q1 - threshold * IQR
                upper_bound = Q3 + threshold * IQR

                outlier_mask = (series < lower_bound) | (series > upper_bound)
                outlier_indices = df.index[outlier_mask].tolist()

                outliers[col] = {
                    "method": "IQR",
                    "lower_bound": float(lower_bound),
                    "upper_bound": float(upper_bound),
                    "outlier_count": len(outlier_indices),
                    "outlier_percentage": round(len(outlier_indices) / total_rows * 100, 2),
                    "outlier_indices": outlier_indices[:100],  # Limit to first 100
                    "q1": float(Q1),
                    "q3": float(Q3),
                    "iqr": float(IQR)
                }

        elif method == "zscore":
            for col in numeric_df.columns:
                series = numeric_df[col]
                # Compute each statistic once and reuse it for both the
                # z-scores and the reported summary.
                mean = series.mean()
                std = series.std()

                z_scores = np.abs((series - mean) / std)
                outlier_mask = z_scores > threshold
                outlier_indices = df.index[outlier_mask].tolist()

                outliers[col] = {
                    "method": "Z-Score",
                    "threshold": threshold,
                    "outlier_count": len(outlier_indices),
                    "outlier_percentage": round(len(outlier_indices) / total_rows * 100, 2),
                    "outlier_indices": outlier_indices[:100],  # Limit to first 100
                    "mean": float(mean),
                    "std": float(std)
                }

        else:
            return {"success": False, "error": f"Unknown method: {method}"}

        # Summary statistics
        total_outliers = sum(info["outlier_count"] for info in outliers.values())

        session.record_operation(OperationType.ANALYZE, {
            "type": "outlier_detection",
            "method": method,
            "threshold": threshold,
            "columns": list(outliers.keys())
        })

        return {
            "success": True,
            "method": method,
            "threshold": threshold,
            "outliers": outliers,
            "total_outliers": total_outliers,
            "columns_analyzed": list(outliers.keys())
        }

    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error("Error detecting outliers: %s", e)
        return {"success": False, "error": str(e)}
- src/csv_editor/server.py:371-376 (schema)Input schema and parameters for the detect_outliers tool, defining expected arguments and their types/defaults.async def detect_outliers( session_id: str, columns: Optional[List[str]] = None, method: str = "iqr", threshold: float = 1.5, ctx: Context = None