check_data_quality
Evaluate CSV data quality by applying predefined or custom rules to ensure accuracy and consistency within the MCP server's CSV Editor.
Instructions
Check data quality based on predefined or custom rules.
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
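| rules | No | Custom quality rules to check. If omitted, the default rules are used. | None |
| session_id | Yes | Session identifier | |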
Implementation Reference
# Core implementation of the check_data_quality tool. Performs comprehensive
# data quality assessment including completeness, duplicates, uniqueness,
# data types consistency, outliers detection, and logical consistency checks
# across multiple columns. Supports custom rules or defaults. Computes overall
# quality score and provides issues/recommendations.


def _safe_ratio(count: Any, total: int, empty: float = 0.0) -> float:
    """Return ``count / total`` as a native float, or ``empty`` when ``total`` is 0.

    Guards every per-row ratio against zero-row DataFrames, where the plain
    division would produce NaN/inf and poison the overall score.
    """
    if total == 0:
        return empty
    return float(count) / total


async def check_data_quality(
    session_id: str,
    rules: Optional[List[Dict[str, Any]]] = None,
    ctx: Context = None
) -> Dict[str, Any]:
    """
    Check data quality based on predefined or custom rules.

    Each rule is a dict whose ``"type"`` key selects the check:

    * ``completeness`` -- non-null share per column. Options: ``threshold``
      (default 0.95), ``columns`` (default: all columns). Columns not present
      in the data are skipped.
    * ``duplicates`` -- share of duplicated rows. Options: ``threshold``
      (default 0.01), ``columns`` (subset passed to ``DataFrame.duplicated``).
    * ``uniqueness`` -- distinct-value share of one column. Options:
      ``column`` (required), ``expected_unique`` (default True; passes when
      the unique ratio is at least 0.99).
    * ``data_types`` -- flags columns whose non-null values have more than one
      Python type, and recommends conversion when more than 90% of an object
      column's values look numeric.
    * ``outliers`` -- share of values outside 1.5 * IQR in each numeric
      column. Options: ``threshold`` (default 0.05).
    * ``consistency`` -- checks that the first column is not later than the
      second for a pair of datetime columns. Options: ``columns`` (default:
      the datetime columns of the data, when there are at least two). Only
      the first two listed columns are compared.

    Rules with an unrecognized type are skipped and logged as a warning.

    Args:
        session_id: Session identifier
        rules: Custom quality rules to check. If None or empty, uses default
            rules (completeness, duplicates, data_types, outliers,
            consistency).
            Example: [
                {"type": "completeness", "threshold": 0.95},
                {"type": "uniqueness", "column": "id"},
                {"type": "consistency", "columns": ["start_date", "end_date"]}
            ]
        ctx: FastMCP context (not used by this function)

    Returns:
        On success, ``{"success": True, "quality_results": {...}}`` where
        ``quality_results`` holds ``overall_score`` (mean of all check scores,
        100.0 when nothing was scored), ``quality_level``, ``checks``,
        ``issues`` and ``recommendations``. On failure,
        ``{"success": False, "error": <message>}``.
    """
    try:
        manager = get_session_manager()
        session = manager.get_session(session_id)

        if not session or session.df is None:
            return {"success": False, "error": "Invalid session or no data loaded"}

        df = session.df
        row_count = len(df)
        quality_results = {
            "overall_score": 100.0,
            "checks": [],
            "issues": [],
            "recommendations": []
        }

        # Default rules if none provided
        if not rules:
            rules = [
                {"type": "completeness", "threshold": 0.95},
                {"type": "duplicates", "threshold": 0.01},
                {"type": "data_types"},
                {"type": "outliers", "threshold": 0.05},
                {"type": "consistency"}
            ]

        total_score = 0
        score_count = 0

        for rule in rules:
            rule_type = rule.get("type")

            if rule_type == "completeness":
                # Check data completeness
                threshold = rule.get("threshold", 0.95)
                columns = rule.get("columns", df.columns.tolist())

                for col in columns:
                    if col not in df.columns:
                        continue
                    completeness = 1 - _safe_ratio(df[col].isna().sum(), row_count)
                    # bool()/int()/float() casts throughout keep NumPy scalars
                    # out of the result so it serializes as plain JSON.
                    passed = bool(completeness >= threshold)
                    score = completeness * 100

                    quality_results["checks"].append({
                        "type": "completeness",
                        "column": col,
                        "completeness": round(completeness, 4),
                        "threshold": threshold,
                        "passed": passed,
                        "score": round(score, 2)
                    })

                    if not passed:
                        quality_results["issues"].append({
                            "type": "incomplete_data",
                            "column": col,
                            "message": f"Column '{col}' is only {round(completeness*100, 2)}% complete",
                            "severity": "high" if completeness < 0.5 else "medium"
                        })

                    total_score += score
                    score_count += 1

            elif rule_type == "duplicates":
                # Check for duplicate rows
                threshold = rule.get("threshold", 0.01)
                subset = rule.get("columns")

                duplicate_count = int(df.duplicated(subset=subset).sum())
                duplicate_ratio = _safe_ratio(duplicate_count, row_count)
                passed = bool(duplicate_ratio <= threshold)
                score = (1 - duplicate_ratio) * 100

                quality_results["checks"].append({
                    "type": "duplicates",
                    "duplicate_rows": duplicate_count,
                    "duplicate_ratio": round(duplicate_ratio, 4),
                    "threshold": threshold,
                    "passed": passed,
                    "score": round(score, 2)
                })

                if not passed:
                    quality_results["issues"].append({
                        "type": "duplicate_rows",
                        "message": f"Found {duplicate_count} duplicate rows ({round(duplicate_ratio*100, 2)}%)",
                        "severity": "high" if duplicate_ratio > 0.1 else "medium"
                    })
                    quality_results["recommendations"].append(
                        "Consider removing duplicate rows using the remove_duplicates tool"
                    )

                total_score += score
                score_count += 1

            elif rule_type == "uniqueness":
                # Check column uniqueness
                column = rule.get("column")
                if column and column in df.columns:
                    unique_count = int(df[column].nunique())
                    # An empty column is vacuously unique, hence empty=1.0.
                    unique_ratio = _safe_ratio(unique_count, row_count, empty=1.0)
                    expected_unique = rule.get("expected_unique", True)

                    if expected_unique:
                        passed = unique_ratio >= 0.99
                        score = unique_ratio * 100
                    else:
                        passed = True
                        score = 100

                    quality_results["checks"].append({
                        "type": "uniqueness",
                        "column": column,
                        "unique_values": unique_count,
                        "unique_ratio": round(unique_ratio, 4),
                        "passed": passed,
                        "score": round(score, 2)
                    })

                    if not passed and expected_unique:
                        quality_results["issues"].append({
                            "type": "non_unique_values",
                            "column": column,
                            "message": f"Column '{column}' expected to be unique but has duplicates",
                            "severity": "high"
                        })

                    total_score += score
                    score_count += 1

            elif rule_type == "data_types":
                # Check data type consistency
                for col in df.columns:
                    col_data = df[col].dropna()
                    if len(col_data) == 0:
                        continue

                    # Check for mixed types
                    types = col_data.apply(type).unique()
                    mixed_types = len(types) > 1

                    # Check for numeric strings
                    if col_data.dtype == object:
                        numeric_strings = col_data.astype(str).str.match(r'^-?\d+\.?\d*$').sum()
                        numeric_ratio = float(numeric_strings) / len(col_data)
                    else:
                        numeric_ratio = 0
                    mostly_numeric_strings = numeric_ratio > 0.9

                    score = 100 if not mixed_types else 50

                    quality_results["checks"].append({
                        "type": "data_type_consistency",
                        "column": col,
                        "dtype": str(df[col].dtype),
                        "mixed_types": mixed_types,
                        "numeric_strings": mostly_numeric_strings,
                        "score": score
                    })

                    if mostly_numeric_strings:
                        quality_results["recommendations"].append(
                            f"Column '{col}' appears to contain numeric data stored as strings. "
                            f"Consider converting to numeric type using change_column_type tool"
                        )

                    total_score += score
                    score_count += 1

            elif rule_type == "outliers":
                # Check for outliers in numeric columns
                threshold = rule.get("threshold", 0.05)
                numeric_cols = df.select_dtypes(include=[np.number]).columns

                for col in numeric_cols:
                    Q1 = df[col].quantile(0.25)
                    Q3 = df[col].quantile(0.75)
                    IQR = Q3 - Q1

                    lower_bound = Q1 - 1.5 * IQR
                    upper_bound = Q3 + 1.5 * IQR

                    outliers = int(((df[col] < lower_bound) | (df[col] > upper_bound)).sum())
                    outlier_ratio = _safe_ratio(outliers, row_count)
                    passed = bool(outlier_ratio <= threshold)
                    score = (1 - min(outlier_ratio, 1)) * 100

                    quality_results["checks"].append({
                        "type": "outliers",
                        "column": col,
                        "outlier_count": outliers,
                        "outlier_ratio": round(outlier_ratio, 4),
                        "threshold": threshold,
                        "passed": passed,
                        "score": round(score, 2)
                    })

                    if not passed:
                        quality_results["issues"].append({
                            "type": "outliers",
                            "column": col,
                            "message": f"Column '{col}' has {outliers} outliers ({round(outlier_ratio*100, 2)}%)",
                            "severity": "medium"
                        })

                    total_score += score
                    score_count += 1

            elif rule_type == "consistency":
                # Check data consistency
                columns = rule.get("columns", [])

                # Date consistency check
                date_cols = df.select_dtypes(include=['datetime64']).columns

                if len(date_cols) >= 2 and not columns:
                    columns = date_cols.tolist()

                if len(columns) >= 2:
                    col1, col2 = columns[0], columns[1]

                    if col1 in df.columns and col2 in df.columns:
                        # Check if col1 should be before col2 (e.g., start_date < end_date)
                        if pd.api.types.is_datetime64_any_dtype(df[col1]) and pd.api.types.is_datetime64_any_dtype(df[col2]):
                            inconsistent = int((df[col1] > df[col2]).sum())
                            consistency_ratio = 1 - _safe_ratio(inconsistent, row_count)
                            passed = consistency_ratio >= 0.99
                            score = consistency_ratio * 100

                            quality_results["checks"].append({
                                "type": "consistency",
                                "columns": [col1, col2],
                                "consistent_rows": row_count - inconsistent,
                                "inconsistent_rows": inconsistent,
                                "consistency_ratio": round(consistency_ratio, 4),
                                "passed": passed,
                                "score": round(score, 2)
                            })

                            if not passed:
                                quality_results["issues"].append({
                                    "type": "data_inconsistency",
                                    "columns": [col1, col2],
                                    "message": f"Found {inconsistent} rows where {col1} > {col2}",
                                    "severity": "high"
                                })

                            total_score += score
                            score_count += 1

            else:
                # Surface typos in rule definitions instead of dropping them silently.
                logger.warning("Skipping unknown data quality rule type: %r", rule_type)

        # Calculate overall score
        if score_count > 0:
            quality_results["overall_score"] = round(total_score / score_count, 2)

        # Determine quality level
        overall_score = quality_results["overall_score"]
        if overall_score >= 95:
            quality_results["quality_level"] = "Excellent"
        elif overall_score >= 85:
            quality_results["quality_level"] = "Good"
        elif overall_score >= 70:
            quality_results["quality_level"] = "Fair"
        else:
            quality_results["quality_level"] = "Poor"

        # Add general recommendations
        if not quality_results["recommendations"] and overall_score < 85:
            quality_results["recommendations"].append(
                "Consider running profile_data to get a comprehensive overview of data issues"
            )

        session.record_operation(OperationType.QUALITY_CHECK, {
            "rules_count": len(rules),
            "overall_score": overall_score,
            "issues_count": len(quality_results["issues"])
        })

        return {
            "success": True,
            "quality_results": quality_results
        }

    except Exception as e:
        # Tool boundary: report the failure to the caller rather than raising.
        logger.error(f"Error checking data quality: {str(e)}")
        return {"success": False, "error": str(e)}
# src/csv_editor/server.py:410-417 (registration)
# MCP tool registration for check_data_quality using @mcp.tool decorator.
# This wrapper delegates to the actual implementation in tools/validation.py.
@mcp.tool
async def check_data_quality(
    session_id: str,
    rules: Optional[List[Dict[str, Any]]] = None,
    ctx: Context = None,
) -> Dict[str, Any]:
    """Check data quality based on predefined or custom rules."""
    # NOTE(review): the docstring is kept verbatim because FastMCP presumably
    # publishes it as the tool description shown to clients -- confirm before
    # rewording it.
    # Forward all arguments unchanged to the implementation.
    outcome = await _check_data_quality(session_id, rules, ctx)
    return outcome