Skip to main content
Glama

check_data_quality

Validate CSV data quality by checking completeness, consistency, duplicates, uniqueness, data types, and outliers using predefined or custom rules.

Instructions

Check data quality based on predefined or custom rules.

Returns: DataQualityResult with comprehensive quality assessment results

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
rulesNoList of quality rules to check (None = use default rules)

Implementation Reference

  • Main handler function implementing the check_data_quality tool. Loads session data, applies configurable quality rules (or defaults), computes scores, detects issues, and returns comprehensive results with recommendations.
    def check_data_quality(
        ctx: Annotated[Context, Field(description="FastMCP context for session access")],
        rules: Annotated[
            list[QualityRuleType] | None,
            Field(description="List of quality rules to check (None = use default rules)"),
        ] = None,
    ) -> DataQualityResult:
        """Check data quality based on predefined or custom rules.
    
        Returns:
            DataQualityResult with comprehensive quality assessment results
    
        """
        session_id = ctx.session_id
        _session, df = get_session_data(session_id)
        settings = get_settings()
        rule_results: list[QualityRuleResult] = []
        quality_issues: list[QualityIssue] = []
        recommendations: list[str] = []
    
        # Apply resource management for large datasets
        logger.info("Checking data quality for %d rows, %d columns", len(df), len(df.columns))
        if len(df) > settings.max_anomaly_sample_size:
            logger.warning(
                "Large dataset (%d rows), using sample of %d for quality check",
                len(df),
                settings.max_anomaly_sample_size,
            )
            df = sample_large_dataset(df, settings.max_anomaly_sample_size, "Data quality check")
    
        # Use default rules if none provided
        if rules is None:
            rules = [
                CompletenessRule(threshold=0.95),
                DuplicatesRule(threshold=0.01),
                DataTypesRule(),
                OutliersRule(threshold=0.05),
                ConsistencyRule(),
            ]
    
        total_score: float = 0
        score_count = 0
    
        for rule in rules:
            if isinstance(rule, CompletenessRule):
                # Check data completeness
                threshold = rule.threshold
                columns = rule.columns if rule.columns is not None else df.columns.tolist()
    
                for col in columns:
                    if col in df.columns:
                        completeness = 1 - (df[col].isna().sum() / len(df))
                        passed = completeness >= threshold
                        score = completeness * 100
    
                        # Create issue if failed
                        rule_issues = []
                        if not passed:
                            issue = QualityIssue(
                                type="incomplete_data",
                                severity="high"
                                if completeness < settings.data_completeness_threshold
                                else "medium",
                                column=col,
                                message=f"Column '{col}' is only {round(completeness * 100, 2)}% complete",
                                affected_rows=int(df[col].isna().sum()),
                                metric_value=completeness,
                                threshold=float(threshold),
                            )
                            rule_issues.append(issue)
                            quality_issues.append(issue)
    
                        # Add rule result
                        rule_results.append(
                            QualityRuleResult(
                                rule_type="completeness",
                                passed=passed,
                                score=round(score, 2),
                                issues=rule_issues,
                                column=col,
                            ),
                        )
    
                        total_score += score
                        score_count += 1
    
            elif isinstance(rule, DuplicatesRule):
                # Check for duplicate rows
                threshold = rule.threshold
                subset = rule.columns
    
                duplicates = df.duplicated(subset=subset)
                duplicate_ratio = duplicates.sum() / len(df)
                passed = duplicate_ratio <= threshold
                score = (1 - duplicate_ratio) * 100
    
                # Create issue if failed
                rule_issues = []
                if not passed:
                    issue = QualityIssue(
                        type="duplicate_rows",
                        severity="high"
                        if duplicate_ratio > settings.outlier_detection_threshold
                        else "medium",
                        message=f"Found {duplicates.sum()} duplicate rows ({round(duplicate_ratio * 100, 2)}%)",
                        affected_rows=int(duplicates.sum()),
                        metric_value=duplicate_ratio,
                        threshold=float(threshold),
                    )
                    rule_issues.append(issue)
                    quality_issues.append(issue)
                    recommendations.append(
                        "Consider removing duplicate rows using the remove_duplicates tool",
                    )
    
                # Add rule result
                rule_results.append(
                    QualityRuleResult(
                        rule_type="duplicates",
                        passed=passed,
                        score=round(score, 2),
                        issues=rule_issues,
                    ),
                )
    
                total_score += score
                score_count += 1
    
            elif isinstance(rule, UniquenessRule):
                # Check column uniqueness
                column = rule.column
                if column in df.columns:
                    unique_ratio = df[column].nunique() / len(df)
                    expected_unique = rule.expected_unique
    
                    if expected_unique:
                        passed = unique_ratio >= settings.uniqueness_threshold
                        score = unique_ratio * 100
                    else:
                        passed = True
                        score = 100.0
    
                    # Create issue if failed
                    rule_issues = []
                    if not passed and expected_unique:
                        duplicate_count = len(df) - df[column].nunique()
                        issue = QualityIssue(
                            type="non_unique_values",
                            severity="high",
                            column=str(column),
                            message=f"Column '{column}' expected to be unique but has duplicates",
                            affected_rows=duplicate_count,
                            metric_value=unique_ratio,
                            threshold=settings.uniqueness_threshold,
                        )
                        rule_issues.append(issue)
                        quality_issues.append(issue)
    
                    # Add rule result
                    rule_results.append(
                        QualityRuleResult(
                            rule_type="uniqueness",
                            passed=passed,
                            score=round(score, 2),
                            issues=rule_issues,
                            column=str(column),
                        ),
                    )
    
                    total_score += score
                    score_count += 1
    
            elif isinstance(rule, DataTypesRule):
                # Check data type consistency
                for col in df.columns:
                    col_data = df[col].dropna()
                    if len(col_data) > 0:
                        # Check for mixed types
                        types = col_data.apply(lambda x: type(x).__name__).unique()
                        mixed_types = len(types) > 1
    
                        # Check for numeric strings
                        if col_data.dtype == object:
                            numeric_strings = col_data.astype(str).str.match(r"^-?\d+\.?\d*$").sum()
                            numeric_ratio = numeric_strings / len(col_data)
                        else:
                            numeric_ratio = 0
    
                        score = 100.0 if not mixed_types else 50.0
    
                        # Create recommendations for numeric strings
                        if numeric_ratio > settings.high_quality_threshold:
                            recommendations.append(
                                f"Column '{col}' appears to contain numeric data stored as strings. "
                                f"Consider converting to numeric type using change_column_type tool",
                            )
    
                        # Add rule result
                        rule_results.append(
                            QualityRuleResult(
                                rule_type="data_type_consistency",
                                passed=not mixed_types,
                                score=score,
                                issues=[],
                                column=col,
                            ),
                        )
    
                        total_score += score
                        score_count += 1
    
            elif isinstance(rule, OutliersRule):
                # Check for outliers in numeric columns
                threshold = rule.threshold
                numeric_cols = df.select_dtypes(include=[np.number]).columns
    
                for col in numeric_cols:
                    q1 = df[col].quantile(0.25)
                    q3 = df[col].quantile(0.75)
                    iqr = q3 - q1
    
                    lower_bound = q1 - 1.5 * iqr
                    upper_bound = q3 + 1.5 * iqr
    
                    outliers = ((df[col] < lower_bound) | (df[col] > upper_bound)).sum()
                    outlier_ratio = outliers / len(df)
                    passed = outlier_ratio <= threshold
                    score = (1 - min(outlier_ratio, 1)) * 100
    
                    # Create issue if failed
                    rule_issues = []
                    if not passed:
                        issue = QualityIssue(
                            type="outliers",
                            severity="medium",
                            column=col,
                            message=f"Column '{col}' has {outliers} outliers ({round(outlier_ratio * 100, 2)}%)",
                            affected_rows=int(outliers),
                            metric_value=outlier_ratio,
                            threshold=float(threshold),
                        )
                        rule_issues.append(issue)
                        quality_issues.append(issue)
    
                    # Add rule result
                    rule_results.append(
                        QualityRuleResult(
                            rule_type="outliers",
                            passed=passed,
                            score=round(score, 2),
                            issues=rule_issues,
                            column=col,
                        ),
                    )
    
                    total_score += score
                    score_count += 1
    
            elif isinstance(rule, ConsistencyRule):
                # Check data consistency
                columns = rule.columns
    
                # Date consistency check
                date_cols = df.select_dtypes(include=["datetime64"]).columns
                if len(date_cols) >= settings.min_statistical_sample_size and not columns:
                    columns = date_cols.tolist()
    
                if len(columns) >= settings.min_statistical_sample_size:
                    col1, col2 = str(columns[0]), str(columns[1])
                    if (
                        col1 in df.columns
                        and col2 in df.columns
                        and pd.api.types.is_datetime64_any_dtype(df[col1])
                        and pd.api.types.is_datetime64_any_dtype(df[col2])
                    ):
                        inconsistent = (df[col1] > df[col2]).sum()
                        consistency_ratio = 1 - (inconsistent / len(df))
                        passed = consistency_ratio >= settings.uniqueness_threshold
                        score = consistency_ratio * 100
    
                        # Create issue if failed
                        rule_issues = []
                        if not passed:
                            issue = QualityIssue(
                                type="data_inconsistency",
                                severity="high",
                                message=f"Found {inconsistent} rows where {col1} > {col2}",
                                affected_rows=int(inconsistent),
                                metric_value=consistency_ratio,
                                threshold=settings.uniqueness_threshold,
                            )
                            rule_issues.append(issue)
                            quality_issues.append(issue)
    
                        # Add rule result
                        rule_results.append(
                            QualityRuleResult(
                                rule_type="consistency",
                                passed=passed,
                                score=round(score, 2),
                                issues=rule_issues,
                            ),
                        )
    
                        total_score += score
                        score_count += 1
    
        # Calculate overall score
        overall_score = round(total_score / score_count, 2) if score_count > 0 else 100.0
    
        # Add general recommendations
        if not recommendations and overall_score < settings.character_score_threshold:
            recommendations.append(
                "Consider running profile_data to get a comprehensive overview of data issues",
            )
    
        # Count passed/failed rules
        passed_rules = sum(1 for rule in rule_results if rule.passed)
        failed_rules = len(rule_results) - passed_rules
    
        # Apply limits to quality issues to prevent resource exhaustion
        limited_issues, was_truncated = apply_violation_limits(
            quality_issues, settings.max_validation_violations, "Data quality check"
        )
    
        if was_truncated:
            logger.warning(
                "Quality check found %d issues, limited to %d",
                len(quality_issues),
                settings.max_validation_violations,
            )
    
        # Create QualityResults
        quality_results = QualityResults(
            overall_score=overall_score,
            passed_rules=passed_rules,
            failed_rules=failed_rules,
            total_issues=len(limited_issues),
            rule_results=rule_results,
            issues=limited_issues,
            recommendations=recommendations,
        )
    
        # No longer recording operations (simplified MCP architecture)
    
        return DataQualityResult(
            quality_results=quality_results,
        )
  • Registers the check_data_quality function as an MCP tool on the DataBeak-Validation FastMCP server instance.
    validation_server.tool(name="validate_schema")(validate_schema)
    validation_server.tool(name="check_data_quality")(check_data_quality)
    validation_server.tool(name="find_anomalies")(find_anomalies)
  • Output schema model for the tool response, containing QualityResults with overall score, rule results, issues, and recommendations.
    class DataQualityResult(BaseModel):
        """Response model for data quality check operations."""
    
        quality_results: QualityResults = Field(description="Comprehensive quality assessment results")
  • Input schema: Discriminated union type for quality rules (completeness, duplicates, etc.) used to configure custom checks.
    QualityRuleType = Annotated[
        CompletenessRule
        | DuplicatesRule
        | UniquenessRule
        | DataTypesRule
        | OutliersRule
        | ConsistencyRule,
        Field(discriminator="type"),
    ]
  • Default quality rules applied when no custom rules are provided.
    rules = [
        CompletenessRule(threshold=0.95),
        DuplicatesRule(threshold=0.01),
        DataTypesRule(),
        OutliersRule(threshold=0.05),
        ConsistencyRule(),

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jonpspri/databeak'

If you have feedback or need assistance with the MCP directory API, please join our Discord server