Skip to main content
Glama

check_data_quality

Validate CSV data quality by applying predefined or custom rules to identify errors, inconsistencies, and compliance issues in your datasets.

Instructions

Check data quality based on predefined or custom rules.

Input Schema

Table | JSON Schema
| Name | Required | Description | Default |
|------|----------|-------------|---------|
| session_id | Yes | | |
| rules | No | | |

Implementation Reference

  • Core handler function implementing the check_data_quality tool. Performs comprehensive data quality checks including completeness, duplicates, uniqueness, data types, outliers, and consistency based on provided or default rules. Computes overall quality score and provides issues and recommendations.
    async def check_data_quality(
        session_id: str,
        rules: Optional[List[Dict[str, Any]]] = None,
        ctx: Context = None
    ) -> Dict[str, Any]:
        """
        Check data quality based on predefined or custom rules.

        Each rule is a dict whose "type" selects the check to run:

        - "completeness": per-column share of non-null values, compared with
          "threshold" (default 0.95); optional "columns" restricts the check.
        - "duplicates": share of duplicated rows, compared with "threshold"
          (default 0.01); optional "columns" is used as the duplicate subset.
        - "uniqueness": share of distinct values in "column"; fails below 0.99
          unless "expected_unique" is False.
        - "data_types": flags columns holding more than one Python type and
          object columns that are mostly numeric-looking strings.
        - "outliers": share of values outside 1.5 * IQR in each numeric column,
          compared with "threshold" (default 0.05).
        - "consistency": counts rows where the first of two datetime columns
          is later than the second; uses "columns" or, when omitted, the first
          two datetime columns of the frame.

        Rules with any other "type" are ignored.

        Args:
            session_id: Session identifier
            rules: Custom quality rules to check. If None (or empty), uses default rules.
                Example: [
                    {"type": "completeness", "threshold": 0.95},
                    {"type": "uniqueness", "column": "id"},
                    {"type": "consistency", "columns": ["start_date", "end_date"]}
                ]
            ctx: FastMCP context

        Returns:
            On success: {"success": True, "quality_results": {...}} where the
            results hold "overall_score" (mean of all check scores, 100.0 when
            no check ran), "quality_level", "checks", "issues" and
            "recommendations". On failure: {"success": False, "error": str}.
            A session without data, or with zero rows, is reported as a failure.
        """
        try:
            manager = get_session_manager()
            session = manager.get_session(session_id)

            if not session or session.df is None:
                return {"success": False, "error": "Invalid session or no data loaded"}

            df = session.df
            row_count = len(df)

            # Every ratio below divides by the row count; with zero rows the
            # checks would silently produce NaN scores, so fail explicitly.
            if row_count == 0:
                return {"success": False, "error": "No rows available for quality checks"}

            quality_results = {
                "overall_score": 100.0,
                "checks": [],
                "issues": [],
                "recommendations": []
            }

            # Default rules if none provided
            if not rules:
                rules = [
                    {"type": "completeness", "threshold": 0.95},
                    {"type": "duplicates", "threshold": 0.01},
                    {"type": "data_types"},
                    {"type": "outliers", "threshold": 0.05},
                    {"type": "consistency"}
                ]

            total_score = 0
            score_count = 0

            # NOTE: pandas/NumPy reductions return NumPy scalars. Counts and
            # ratios are converted to native int/float as soon as they are
            # computed so that every value placed in the result (including the
            # derived "passed" flags) is a plain Python type.
            for rule in rules:
                rule_type = rule.get("type")

                if rule_type == "completeness":
                    # Check data completeness
                    threshold = rule.get("threshold", 0.95)
                    columns = rule.get("columns", df.columns.tolist())

                    for col in columns:
                        if col not in df.columns:
                            continue

                        missing = int(df[col].isna().sum())
                        completeness = 1 - (missing / row_count)
                        passed = completeness >= threshold
                        score = completeness * 100

                        quality_results["checks"].append({
                            "type": "completeness",
                            "column": col,
                            "completeness": round(completeness, 4),
                            "threshold": threshold,
                            "passed": passed,
                            "score": round(score, 2)
                        })

                        if not passed:
                            quality_results["issues"].append({
                                "type": "incomplete_data",
                                "column": col,
                                "message": f"Column '{col}' is only {round(completeness*100, 2)}% complete",
                                "severity": "high" if completeness < 0.5 else "medium"
                            })

                        total_score += score
                        score_count += 1

                elif rule_type == "duplicates":
                    # Check for duplicate rows
                    threshold = rule.get("threshold", 0.01)
                    subset = rule.get("columns")

                    duplicate_count = int(df.duplicated(subset=subset).sum())
                    duplicate_ratio = duplicate_count / row_count
                    passed = duplicate_ratio <= threshold
                    score = (1 - duplicate_ratio) * 100

                    quality_results["checks"].append({
                        "type": "duplicates",
                        "duplicate_rows": duplicate_count,
                        "duplicate_ratio": round(duplicate_ratio, 4),
                        "threshold": threshold,
                        "passed": passed,
                        "score": round(score, 2)
                    })

                    if not passed:
                        quality_results["issues"].append({
                            "type": "duplicate_rows",
                            "message": f"Found {duplicate_count} duplicate rows ({round(duplicate_ratio*100, 2)}%)",
                            "severity": "high" if duplicate_ratio > 0.1 else "medium"
                        })
                        quality_results["recommendations"].append(
                            "Consider removing duplicate rows using the remove_duplicates tool"
                        )

                    total_score += score
                    score_count += 1

                elif rule_type == "uniqueness":
                    # Check column uniqueness
                    column = rule.get("column")
                    if column and column in df.columns:
                        unique_count = int(df[column].nunique())
                        unique_ratio = unique_count / row_count
                        expected_unique = rule.get("expected_unique", True)

                        if expected_unique:
                            passed = unique_ratio >= 0.99
                            score = unique_ratio * 100
                        else:
                            # Uniqueness is not required: the check is informational only.
                            passed = True
                            score = 100

                        quality_results["checks"].append({
                            "type": "uniqueness",
                            "column": column,
                            "unique_values": unique_count,
                            "unique_ratio": round(unique_ratio, 4),
                            "passed": passed,
                            "score": round(score, 2)
                        })

                        if not passed and expected_unique:
                            quality_results["issues"].append({
                                "type": "non_unique_values",
                                "column": column,
                                "message": f"Column '{column}' expected to be unique but has duplicates",
                                "severity": "high"
                            })

                        total_score += score
                        score_count += 1

                elif rule_type == "data_types":
                    # Check data type consistency
                    for col in df.columns:
                        col_data = df[col].dropna()
                        if len(col_data) == 0:
                            # Nothing to inspect in an all-null column.
                            continue

                        # Check for mixed types
                        value_types = col_data.apply(type).unique()
                        mixed_types = len(value_types) > 1

                        # Check for numeric strings
                        if col_data.dtype == object:
                            numeric_strings = int(
                                col_data.astype(str).str.match(r'^-?\d+\.?\d*$').sum()
                            )
                            numeric_ratio = numeric_strings / len(col_data)
                        else:
                            numeric_ratio = 0

                        looks_numeric = numeric_ratio > 0.9
                        score = 100 if not mixed_types else 50

                        quality_results["checks"].append({
                            "type": "data_type_consistency",
                            "column": col,
                            "dtype": str(df[col].dtype),
                            "mixed_types": mixed_types,
                            "numeric_strings": looks_numeric,
                            "score": score
                        })

                        if looks_numeric:
                            quality_results["recommendations"].append(
                                f"Column '{col}' appears to contain numeric data stored as strings. "
                                f"Consider converting to numeric type using change_column_type tool"
                            )

                        total_score += score
                        score_count += 1

                elif rule_type == "outliers":
                    # Check for outliers in numeric columns
                    threshold = rule.get("threshold", 0.05)
                    numeric_cols = df.select_dtypes(include=[np.number]).columns

                    for col in numeric_cols:
                        q1 = df[col].quantile(0.25)
                        q3 = df[col].quantile(0.75)
                        iqr = q3 - q1
                        lower_bound = q1 - 1.5 * iqr
                        upper_bound = q3 + 1.5 * iqr

                        outlier_count = int(
                            ((df[col] < lower_bound) | (df[col] > upper_bound)).sum()
                        )
                        outlier_ratio = outlier_count / row_count
                        passed = outlier_ratio <= threshold
                        score = (1 - min(outlier_ratio, 1)) * 100

                        quality_results["checks"].append({
                            "type": "outliers",
                            "column": col,
                            "outlier_count": outlier_count,
                            "outlier_ratio": round(outlier_ratio, 4),
                            "threshold": threshold,
                            "passed": passed,
                            "score": round(score, 2)
                        })

                        if not passed:
                            quality_results["issues"].append({
                                "type": "outliers",
                                "column": col,
                                "message": f"Column '{col}' has {outlier_count} outliers ({round(outlier_ratio*100, 2)}%)",
                                "severity": "medium"
                            })

                        total_score += score
                        score_count += 1

                elif rule_type == "consistency":
                    # Check data consistency
                    # `or []` also covers a rule that passes "columns": None.
                    columns = rule.get("columns") or []

                    # Date consistency check
                    date_cols = df.select_dtypes(include=['datetime64']).columns

                    if len(date_cols) >= 2 and not columns:
                        columns = date_cols.tolist()

                    if len(columns) >= 2:
                        col1, col2 = columns[0], columns[1]
                        if col1 in df.columns and col2 in df.columns:
                            # Check if col1 should be before col2 (e.g., start_date < end_date)
                            if (
                                pd.api.types.is_datetime64_any_dtype(df[col1])
                                and pd.api.types.is_datetime64_any_dtype(df[col2])
                            ):
                                inconsistent = int((df[col1] > df[col2]).sum())
                                consistency_ratio = 1 - (inconsistent / row_count)
                                passed = consistency_ratio >= 0.99
                                score = consistency_ratio * 100

                                quality_results["checks"].append({
                                    "type": "consistency",
                                    "columns": [col1, col2],
                                    "consistent_rows": row_count - inconsistent,
                                    "inconsistent_rows": inconsistent,
                                    "consistency_ratio": round(consistency_ratio, 4),
                                    "passed": passed,
                                    "score": round(score, 2)
                                })

                                if not passed:
                                    quality_results["issues"].append({
                                        "type": "data_inconsistency",
                                        "columns": [col1, col2],
                                        "message": f"Found {inconsistent} rows where {col1} > {col2}",
                                        "severity": "high"
                                    })

                                total_score += score
                                score_count += 1

            # Calculate overall score (stays at the initial 100.0 when no check ran)
            if score_count > 0:
                quality_results["overall_score"] = round(total_score / score_count, 2)

            # Determine quality level
            overall_score = quality_results["overall_score"]
            if overall_score >= 95:
                quality_results["quality_level"] = "Excellent"
            elif overall_score >= 85:
                quality_results["quality_level"] = "Good"
            elif overall_score >= 70:
                quality_results["quality_level"] = "Fair"
            else:
                quality_results["quality_level"] = "Poor"

            # Add general recommendations only when no specific one was produced
            if not quality_results["recommendations"] and overall_score < 85:
                quality_results["recommendations"].append(
                    "Consider running profile_data to get a comprehensive overview of data issues"
                )

            session.record_operation(OperationType.QUALITY_CHECK, {
                "rules_count": len(rules),
                "overall_score": overall_score,
                "issues_count": len(quality_results["issues"])
            })

            return {
                "success": True,
                "quality_results": quality_results
            }

        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logger.error(f"Error checking data quality: {str(e)}")
            return {"success": False, "error": str(e)}
  • MCP tool registration for check_data_quality. Imports the handler from validation.py as _check_data_quality and registers it with the @mcp.tool decorator, using a matching signature.
    async def check_data_quality(
        session_id: str,
        rules: Optional[List[Dict[str, Any]]] = None,
        ctx: Context = None
    ) -> Dict[str, Any]:
        """Check data quality based on predefined or custom rules."""
        # The docstring above is a runtime string (presumably surfaced as the
        # tool description), so it is kept verbatim.
        # Thin shim: hand every argument, by name, to the handler imported as
        # _check_data_quality and relay its result unchanged.
        outcome = await _check_data_quality(
            session_id=session_id,
            rules=rules,
            ctx=ctx,
        )
        return outcome
  • The docstring in the handler provides a detailed description of the input schema, with examples of the rules structure and the expected return format.
    """ Check data quality based on predefined or custom rules. Args: session_id: Session identifier rules: Custom quality rules to check. If None, uses default rules. Example: [ {"type": "completeness", "threshold": 0.95}, {"type": "uniqueness", "column": "id"}, {"type": "consistency", "columns": ["start_date", "end_date"]} ] ctx: FastMCP context Returns: Dict with quality check results """

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/santoshray02/csv-editor'

If you have feedback or need assistance with the MCP directory API, please join our Discord server