check_data_quality
Check data quality in CSV files by applying custom or predefined validation rules to identify issues.
Instructions
Check data quality based on predefined or custom rules. If no rules are supplied, a default set covering completeness, duplicates, data types, outliers, and consistency is applied.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| session_id | Yes | Session identifier | |
| rules | No | Custom quality rules to check; if omitted or None, the default rule set is used | None |
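The `rules` argument is a list of rule objects. A sketch of a custom rule set using every supported rule type (column names are illustrative; the keys shown match what the implementation below reads via `rule.get(...)`):

```python
rules = [
    {"type": "completeness", "threshold": 0.95, "columns": ["email", "age"]},
    {"type": "duplicates", "threshold": 0.01, "columns": ["order_id"]},
    {"type": "uniqueness", "column": "id", "expected_unique": True},
    {"type": "data_types"},
    {"type": "outliers", "threshold": 0.05},
    {"type": "consistency", "columns": ["start_date", "end_date"]},
]
```

Rule types other than these six are silently skipped by the handler.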
Output Schema
No output fields are declared. The tool returns a plain dict: `{"success": True, "quality_results": {...}}` on success, or `{"success": False, "error": "<message>"}` on failure.
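For reference, a sketch of the dict returned on success. The field names match the implementation below; the values here are made up:

```python
{
    "success": True,
    "quality_results": {
        "overall_score": 92.4,
        "quality_level": "Good",
        "checks": [
            {"type": "completeness", "column": "email", "completeness": 0.97,
             "threshold": 0.95, "passed": True, "score": 97.0},
            {"type": "duplicates", "duplicate_rows": 3, "duplicate_ratio": 0.003,
             "threshold": 0.01, "passed": True, "score": 99.7},
        ],
        "issues": [
            {"type": "outliers", "column": "price",
             "message": "Column 'price' has 42 outliers (7.0%)", "severity": "medium"},
        ],
        "recommendations": [
            "Consider running profile_data to get a comprehensive overview of data issues"
        ],
    },
}
```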
Implementation Reference
- Core implementation of check_data_quality (in validation.py). Accepts a session_id, optional custom rules, and a FastMCP context; runs quality checks (completeness, duplicates, uniqueness, data types, outliers, consistency) against the session's DataFrame and returns overall_score, quality_level, checks, issues, and recommendations. A standalone sketch of the outlier check appears at the end of this section.
```python
# Module-level dependencies assumed from the surrounding file: numpy as np,
# pandas as pd, the FastMCP Context type, get_session_manager, OperationType,
# and a module-level logger.
async def check_data_quality(
    session_id: str, rules: list[dict[str, Any]] | None = None, ctx: Context = None
) -> dict[str, Any]:
    """
    Check data quality based on predefined or custom rules.

    Args:
        session_id: Session identifier
        rules: Custom quality rules to check. If None, uses default rules.
               Example: [
                   {"type": "completeness", "threshold": 0.95},
                   {"type": "uniqueness", "column": "id"},
                   {"type": "consistency", "columns": ["start_date", "end_date"]}
               ]
        ctx: FastMCP context

    Returns:
        Dict with quality check results
    """
    try:
        manager = get_session_manager()
        session = manager.get_session(session_id)

        if not session or session.df is None:
            return {"success": False, "error": "Invalid session or no data loaded"}

        df = session.df
        quality_results = {
            "overall_score": 100.0,
            "checks": [],
            "issues": [],
            "recommendations": [],
        }

        # Default rules if none provided
        if not rules:
            rules = [
                {"type": "completeness", "threshold": 0.95},
                {"type": "duplicates", "threshold": 0.01},
                {"type": "data_types"},
                {"type": "outliers", "threshold": 0.05},
                {"type": "consistency"},
            ]

        total_score = 0
        score_count = 0

        for rule in rules:
            rule_type = rule.get("type")

            if rule_type == "completeness":
                # Check data completeness
                threshold = rule.get("threshold", 0.95)
                columns = rule.get("columns", df.columns.tolist())

                for col in columns:
                    if col in df.columns:
                        completeness = 1 - (df[col].isna().sum() / len(df))
                        passed = completeness >= threshold
                        score = completeness * 100

                        quality_results["checks"].append(
                            {
                                "type": "completeness",
                                "column": col,
                                "completeness": round(completeness, 4),
                                "threshold": threshold,
                                "passed": passed,
                                "score": round(score, 2),
                            }
                        )

                        if not passed:
                            quality_results["issues"].append(
                                {
                                    "type": "incomplete_data",
                                    "column": col,
                                    "message": f"Column '{col}' is only {round(completeness*100, 2)}% complete",
                                    "severity": "high" if completeness < 0.5 else "medium",
                                }
                            )

                        total_score += score
                        score_count += 1

            elif rule_type == "duplicates":
                # Check for duplicate rows
                threshold = rule.get("threshold", 0.01)
                subset = rule.get("columns")

                duplicates = df.duplicated(subset=subset)
                duplicate_ratio = duplicates.sum() / len(df)
                passed = duplicate_ratio <= threshold
                score = (1 - duplicate_ratio) * 100

                quality_results["checks"].append(
                    {
                        "type": "duplicates",
                        "duplicate_rows": int(duplicates.sum()),
                        "duplicate_ratio": round(duplicate_ratio, 4),
                        "threshold": threshold,
                        "passed": passed,
                        "score": round(score, 2),
                    }
                )

                if not passed:
                    quality_results["issues"].append(
                        {
                            "type": "duplicate_rows",
                            "message": f"Found {duplicates.sum()} duplicate rows ({round(duplicate_ratio*100, 2)}%)",
                            "severity": "high" if duplicate_ratio > 0.1 else "medium",
                        }
                    )
                    quality_results["recommendations"].append(
                        "Consider removing duplicate rows using the remove_duplicates tool"
                    )

                total_score += score
                score_count += 1

            elif rule_type == "uniqueness":
                # Check column uniqueness
                column = rule.get("column")
                if column and column in df.columns:
                    unique_ratio = df[column].nunique() / len(df)
                    expected_unique = rule.get("expected_unique", True)

                    if expected_unique:
                        passed = unique_ratio >= 0.99
                        score = unique_ratio * 100
                    else:
                        passed = True
                        score = 100

                    quality_results["checks"].append(
                        {
                            "type": "uniqueness",
                            "column": column,
                            "unique_values": int(df[column].nunique()),
                            "unique_ratio": round(unique_ratio, 4),
                            "passed": passed,
                            "score": round(score, 2),
                        }
                    )

                    if not passed and expected_unique:
                        quality_results["issues"].append(
                            {
                                "type": "non_unique_values",
                                "column": column,
                                "message": f"Column '{column}' expected to be unique but has duplicates",
                                "severity": "high",
                            }
                        )

                    total_score += score
                    score_count += 1

            elif rule_type == "data_types":
                # Check data type consistency
                for col in df.columns:
                    col_data = df[col].dropna()
                    if len(col_data) > 0:
                        # Check for mixed types
                        types = col_data.apply(type).unique()
                        mixed_types = len(types) > 1

                        # Check for numeric strings
                        if col_data.dtype == object:
                            numeric_strings = col_data.astype(str).str.match(r"^-?\d+\.?\d*$").sum()
                            numeric_ratio = numeric_strings / len(col_data)
                        else:
                            numeric_ratio = 0

                        score = 100 if not mixed_types else 50

                        quality_results["checks"].append(
                            {
                                "type": "data_type_consistency",
                                "column": col,
                                "dtype": str(df[col].dtype),
                                "mixed_types": mixed_types,
                                "numeric_strings": numeric_ratio > 0.9,
                                "score": score,
                            }
                        )

                        if numeric_ratio > 0.9:
                            quality_results["recommendations"].append(
                                f"Column '{col}' appears to contain numeric data stored as strings. "
                                f"Consider converting to numeric type using change_column_type tool"
                            )

                        total_score += score
                        score_count += 1

            elif rule_type == "outliers":
                # Check for outliers in numeric columns
                threshold = rule.get("threshold", 0.05)
                numeric_cols = df.select_dtypes(include=[np.number]).columns

                for col in numeric_cols:
                    Q1 = df[col].quantile(0.25)
                    Q3 = df[col].quantile(0.75)
                    IQR = Q3 - Q1
                    lower_bound = Q1 - 1.5 * IQR
                    upper_bound = Q3 + 1.5 * IQR

                    outliers = ((df[col] < lower_bound) | (df[col] > upper_bound)).sum()
                    outlier_ratio = outliers / len(df)
                    passed = outlier_ratio <= threshold
                    score = (1 - min(outlier_ratio, 1)) * 100

                    quality_results["checks"].append(
                        {
                            "type": "outliers",
                            "column": col,
                            "outlier_count": int(outliers),
                            "outlier_ratio": round(outlier_ratio, 4),
                            "threshold": threshold,
                            "passed": passed,
                            "score": round(score, 2),
                        }
                    )

                    if not passed:
                        quality_results["issues"].append(
                            {
                                "type": "outliers",
                                "column": col,
                                "message": f"Column '{col}' has {outliers} outliers ({round(outlier_ratio*100, 2)}%)",
                                "severity": "medium",
                            }
                        )

                    total_score += score
                    score_count += 1

            elif rule_type == "consistency":
                # Check data consistency
                columns = rule.get("columns", [])

                # Date consistency check
                date_cols = df.select_dtypes(include=["datetime64"]).columns
                if len(date_cols) >= 2 and not columns:
                    columns = date_cols.tolist()

                if len(columns) >= 2:
                    col1, col2 = columns[0], columns[1]
                    if col1 in df.columns and col2 in df.columns:
                        # Check if col1 should be before col2 (e.g., start_date < end_date)
                        if pd.api.types.is_datetime64_any_dtype(
                            df[col1]
                        ) and pd.api.types.is_datetime64_any_dtype(df[col2]):
                            inconsistent = (df[col1] > df[col2]).sum()
                            consistency_ratio = 1 - (inconsistent / len(df))
                            passed = consistency_ratio >= 0.99
                            score = consistency_ratio * 100

                            quality_results["checks"].append(
                                {
                                    "type": "consistency",
                                    "columns": [col1, col2],
                                    "consistent_rows": len(df) - inconsistent,
                                    "inconsistent_rows": int(inconsistent),
                                    "consistency_ratio": round(consistency_ratio, 4),
                                    "passed": passed,
                                    "score": round(score, 2),
                                }
                            )

                            if not passed:
                                quality_results["issues"].append(
                                    {
                                        "type": "data_inconsistency",
                                        "columns": [col1, col2],
                                        "message": f"Found {inconsistent} rows where {col1} > {col2}",
                                        "severity": "high",
                                    }
                                )

                            total_score += score
                            score_count += 1

        # Calculate overall score
        if score_count > 0:
            quality_results["overall_score"] = round(total_score / score_count, 2)

        # Determine quality level
        overall_score = quality_results["overall_score"]
        if overall_score >= 95:
            quality_results["quality_level"] = "Excellent"
        elif overall_score >= 85:
            quality_results["quality_level"] = "Good"
        elif overall_score >= 70:
            quality_results["quality_level"] = "Fair"
        else:
            quality_results["quality_level"] = "Poor"

        # Add general recommendations
        if not quality_results["recommendations"]:
            if overall_score < 85:
                quality_results["recommendations"].append(
                    "Consider running profile_data to get a comprehensive overview of data issues"
                )

        session.record_operation(
            OperationType.QUALITY_CHECK,
            {
                "rules_count": len(rules),
                "overall_score": overall_score,
                "issues_count": len(quality_results["issues"]),
            },
        )

        return {"success": True, "quality_results": quality_results}

    except Exception as e:
        logger.error(f"Error checking data quality: {e!s}")
        return {"success": False, "error": str(e)}
```

- src/csv_editor/server.py:391-396 (registration): MCP tool registration decorator (@mcp.tool) wrapping the handler. It exposes check_data_quality as a public tool that delegates to the implementation in validation.py.
```python
@mcp.tool
async def check_data_quality(
    session_id: str, rules: list[dict[str, Any]] | None = None, ctx: Context = None
) -> dict[str, Any]:
    """Check data quality based on predefined or custom rules."""
    return await _check_data_quality(session_id, rules, ctx)
```

- src/csv_editor/server.py:88-89 (registration): Tool listed under the 'data_validation' capability in the server's capabilities dictionary.
"data_validation": ["validate_schema", "check_data_quality", "find_anomalies"], "session_management": ["multi_session_support", "session_isolation", "auto_cleanup"], - Uses get_session_manager() and session.record_operation() for session management and operation logging.
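Finally, a hedged sketch of invoking the handler directly, for example from a test. It assumes check_data_quality is imported from the validation module and that a session with a loaded DataFrame already exists; the session id and rule below are illustrative:

```python
import asyncio

async def main() -> None:
    # Hypothetical session id; a real one comes from the session manager
    # after loading a CSV into a session.
    result = await check_data_quality(
        session_id="abc123",
        rules=[{"type": "completeness", "threshold": 0.9}],
    )
    if result["success"]:
        qr = result["quality_results"]
        print(qr["overall_score"], qr["quality_level"])
    else:
        print("error:", result["error"])

asyncio.run(main())
```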