Skip to main content
Glama
santoshray02

CSV Editor

by santoshray02

check_data_quality

Check data quality in CSV files by applying custom or predefined validation rules to identify issues.

Instructions

Check data quality based on predefined or custom rules.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
session_idYes
rulesNo

Output Schema

TableJSON Schema
NameRequiredDescriptionDefault

No arguments

Implementation Reference

  • Core implementation of check_data_quality. Accepts session_id, optional custom rules, and a FastMCP context. Runs quality checks (completeness, duplicates, uniqueness, data types, outliers, consistency) against a DataFrame, returns overall_score, quality_level, checks, issues, and recommendations.
    async def check_data_quality(
        session_id: str, rules: list[dict[str, Any]] | None = None, ctx: Context = None
    ) -> dict[str, Any]:
        """
        Check data quality based on predefined or custom rules.
    
        Args:
            session_id: Session identifier
            rules: Custom quality rules to check. If None, uses default rules.
                   Example: [
                       {"type": "completeness", "threshold": 0.95},
                       {"type": "uniqueness", "column": "id"},
                       {"type": "consistency", "columns": ["start_date", "end_date"]}
                   ]
            ctx: FastMCP context
    
        Returns:
            Dict with quality check results
        """
        try:
            manager = get_session_manager()
            session = manager.get_session(session_id)
    
            if not session or session.df is None:
                return {"success": False, "error": "Invalid session or no data loaded"}
    
            df = session.df
            quality_results = {
                "overall_score": 100.0,
                "checks": [],
                "issues": [],
                "recommendations": [],
            }
    
            # Default rules if none provided
            if not rules:
                rules = [
                    {"type": "completeness", "threshold": 0.95},
                    {"type": "duplicates", "threshold": 0.01},
                    {"type": "data_types"},
                    {"type": "outliers", "threshold": 0.05},
                    {"type": "consistency"},
                ]
    
            total_score = 0
            score_count = 0
    
            for rule in rules:
                rule_type = rule.get("type")
    
                if rule_type == "completeness":
                    # Check data completeness
                    threshold = rule.get("threshold", 0.95)
                    columns = rule.get("columns", df.columns.tolist())
    
                    for col in columns:
                        if col in df.columns:
                            completeness = 1 - (df[col].isna().sum() / len(df))
                            passed = completeness >= threshold
                            score = completeness * 100
    
                            quality_results["checks"].append(
                                {
                                    "type": "completeness",
                                    "column": col,
                                    "completeness": round(completeness, 4),
                                    "threshold": threshold,
                                    "passed": passed,
                                    "score": round(score, 2),
                                }
                            )
    
                            if not passed:
                                quality_results["issues"].append(
                                    {
                                        "type": "incomplete_data",
                                        "column": col,
                                        "message": f"Column '{col}' is only {round(completeness*100, 2)}% complete",
                                        "severity": "high" if completeness < 0.5 else "medium",
                                    }
                                )
    
                            total_score += score
                            score_count += 1
    
                elif rule_type == "duplicates":
                    # Check for duplicate rows
                    threshold = rule.get("threshold", 0.01)
                    subset = rule.get("columns")
    
                    duplicates = df.duplicated(subset=subset)
                    duplicate_ratio = duplicates.sum() / len(df)
                    passed = duplicate_ratio <= threshold
                    score = (1 - duplicate_ratio) * 100
    
                    quality_results["checks"].append(
                        {
                            "type": "duplicates",
                            "duplicate_rows": int(duplicates.sum()),
                            "duplicate_ratio": round(duplicate_ratio, 4),
                            "threshold": threshold,
                            "passed": passed,
                            "score": round(score, 2),
                        }
                    )
    
                    if not passed:
                        quality_results["issues"].append(
                            {
                                "type": "duplicate_rows",
                                "message": f"Found {duplicates.sum()} duplicate rows ({round(duplicate_ratio*100, 2)}%)",
                                "severity": "high" if duplicate_ratio > 0.1 else "medium",
                            }
                        )
                        quality_results["recommendations"].append(
                            "Consider removing duplicate rows using the remove_duplicates tool"
                        )
    
                    total_score += score
                    score_count += 1
    
                elif rule_type == "uniqueness":
                    # Check column uniqueness
                    column = rule.get("column")
                    if column and column in df.columns:
                        unique_ratio = df[column].nunique() / len(df)
                        expected_unique = rule.get("expected_unique", True)
    
                        if expected_unique:
                            passed = unique_ratio >= 0.99
                            score = unique_ratio * 100
                        else:
                            passed = True
                            score = 100
    
                        quality_results["checks"].append(
                            {
                                "type": "uniqueness",
                                "column": column,
                                "unique_values": int(df[column].nunique()),
                                "unique_ratio": round(unique_ratio, 4),
                                "passed": passed,
                                "score": round(score, 2),
                            }
                        )
    
                        if not passed and expected_unique:
                            quality_results["issues"].append(
                                {
                                    "type": "non_unique_values",
                                    "column": column,
                                    "message": f"Column '{column}' expected to be unique but has duplicates",
                                    "severity": "high",
                                }
                            )
    
                        total_score += score
                        score_count += 1
    
                elif rule_type == "data_types":
                    # Check data type consistency
                    for col in df.columns:
                        col_data = df[col].dropna()
                        if len(col_data) > 0:
                            # Check for mixed types
                            types = col_data.apply(type).unique()
                            mixed_types = len(types) > 1
    
                            # Check for numeric strings
                            if col_data.dtype == object:
                                numeric_strings = col_data.astype(str).str.match(r"^-?\d+\.?\d*$").sum()
                                numeric_ratio = numeric_strings / len(col_data)
                            else:
                                numeric_ratio = 0
    
                            score = 100 if not mixed_types else 50
    
                            quality_results["checks"].append(
                                {
                                    "type": "data_type_consistency",
                                    "column": col,
                                    "dtype": str(df[col].dtype),
                                    "mixed_types": mixed_types,
                                    "numeric_strings": numeric_ratio > 0.9,
                                    "score": score,
                                }
                            )
    
                            if numeric_ratio > 0.9:
                                quality_results["recommendations"].append(
                                    f"Column '{col}' appears to contain numeric data stored as strings. "
                                    f"Consider converting to numeric type using change_column_type tool"
                                )
    
                            total_score += score
                            score_count += 1
    
                elif rule_type == "outliers":
                    # Check for outliers in numeric columns
                    threshold = rule.get("threshold", 0.05)
                    numeric_cols = df.select_dtypes(include=[np.number]).columns
    
                    for col in numeric_cols:
                        Q1 = df[col].quantile(0.25)
                        Q3 = df[col].quantile(0.75)
                        IQR = Q3 - Q1
    
                        lower_bound = Q1 - 1.5 * IQR
                        upper_bound = Q3 + 1.5 * IQR
    
                        outliers = ((df[col] < lower_bound) | (df[col] > upper_bound)).sum()
                        outlier_ratio = outliers / len(df)
                        passed = outlier_ratio <= threshold
                        score = (1 - min(outlier_ratio, 1)) * 100
    
                        quality_results["checks"].append(
                            {
                                "type": "outliers",
                                "column": col,
                                "outlier_count": int(outliers),
                                "outlier_ratio": round(outlier_ratio, 4),
                                "threshold": threshold,
                                "passed": passed,
                                "score": round(score, 2),
                            }
                        )
    
                        if not passed:
                            quality_results["issues"].append(
                                {
                                    "type": "outliers",
                                    "column": col,
                                    "message": f"Column '{col}' has {outliers} outliers ({round(outlier_ratio*100, 2)}%)",
                                    "severity": "medium",
                                }
                            )
    
                        total_score += score
                        score_count += 1
    
                elif rule_type == "consistency":
                    # Check data consistency
                    columns = rule.get("columns", [])
    
                    # Date consistency check
                    date_cols = df.select_dtypes(include=["datetime64"]).columns
                    if len(date_cols) >= 2 and not columns:
                        columns = date_cols.tolist()
    
                    if len(columns) >= 2:
                        col1, col2 = columns[0], columns[1]
                        if col1 in df.columns and col2 in df.columns:
                            # Check if col1 should be before col2 (e.g., start_date < end_date)
                            if pd.api.types.is_datetime64_any_dtype(
                                df[col1]
                            ) and pd.api.types.is_datetime64_any_dtype(df[col2]):
                                inconsistent = (df[col1] > df[col2]).sum()
                                consistency_ratio = 1 - (inconsistent / len(df))
                                passed = consistency_ratio >= 0.99
                                score = consistency_ratio * 100
    
                                quality_results["checks"].append(
                                    {
                                        "type": "consistency",
                                        "columns": [col1, col2],
                                        "consistent_rows": len(df) - inconsistent,
                                        "inconsistent_rows": int(inconsistent),
                                        "consistency_ratio": round(consistency_ratio, 4),
                                        "passed": passed,
                                        "score": round(score, 2),
                                    }
                                )
    
                                if not passed:
                                    quality_results["issues"].append(
                                        {
                                            "type": "data_inconsistency",
                                            "columns": [col1, col2],
                                            "message": f"Found {inconsistent} rows where {col1} > {col2}",
                                            "severity": "high",
                                        }
                                    )
    
                                total_score += score
                                score_count += 1
    
            # Calculate overall score
            if score_count > 0:
                quality_results["overall_score"] = round(total_score / score_count, 2)
    
            # Determine quality level
            overall_score = quality_results["overall_score"]
            if overall_score >= 95:
                quality_results["quality_level"] = "Excellent"
            elif overall_score >= 85:
                quality_results["quality_level"] = "Good"
            elif overall_score >= 70:
                quality_results["quality_level"] = "Fair"
            else:
                quality_results["quality_level"] = "Poor"
    
            # Add general recommendations
            if not quality_results["recommendations"]:
                if overall_score < 85:
                    quality_results["recommendations"].append(
                        "Consider running profile_data to get a comprehensive overview of data issues"
                    )
    
            session.record_operation(
                OperationType.QUALITY_CHECK,
                {
                    "rules_count": len(rules),
                    "overall_score": overall_score,
                    "issues_count": len(quality_results["issues"]),
                },
            )
    
            return {"success": True, "quality_results": quality_results}
    
        except Exception as e:
            logger.error(f"Error checking data quality: {e!s}")
            return {"success": False, "error": str(e)}
  • MCP tool registration decorator (@mcp.tool) wrapping the handler. Exposes check_data_quality as a public tool that delegates to the implementation in validation.py.
    @mcp.tool
    async def check_data_quality(
        session_id: str, rules: list[dict[str, Any]] | None = None, ctx: Context = None
    ) -> dict[str, Any]:
        """Check data quality based on predefined or custom rules."""
        return await _check_data_quality(session_id, rules, ctx)
  • Tool listed under 'data_validation' capability in the server's capabilities dictionary.
    "data_validation": ["validate_schema", "check_data_quality", "find_anomalies"],
    "session_management": ["multi_session_support", "session_isolation", "auto_cleanup"],
  • Uses get_session_manager() and session.record_operation() for session management and operation logging.
    async def check_data_quality(
        session_id: str, rules: list[dict[str, Any]] | None = None, ctx: Context = None
    ) -> dict[str, Any]:
        """
        Check data quality based on predefined or custom rules.
    
        Args:
            session_id: Session identifier
            rules: Custom quality rules to check. If None, uses default rules.
                   Example: [
                       {"type": "completeness", "threshold": 0.95},
                       {"type": "uniqueness", "column": "id"},
                       {"type": "consistency", "columns": ["start_date", "end_date"]}
                   ]
            ctx: FastMCP context
    
        Returns:
            Dict with quality check results
        """
        try:
            manager = get_session_manager()
            session = manager.get_session(session_id)
    
            if not session or session.df is None:
                return {"success": False, "error": "Invalid session or no data loaded"}
    
            df = session.df
            quality_results = {
                "overall_score": 100.0,
                "checks": [],
                "issues": [],
                "recommendations": [],
            }
    
            # Default rules if none provided
            if not rules:
                rules = [
                    {"type": "completeness", "threshold": 0.95},
                    {"type": "duplicates", "threshold": 0.01},
                    {"type": "data_types"},
                    {"type": "outliers", "threshold": 0.05},
                    {"type": "consistency"},
                ]
    
            total_score = 0
            score_count = 0
    
            for rule in rules:
                rule_type = rule.get("type")
    
                if rule_type == "completeness":
                    # Check data completeness
                    threshold = rule.get("threshold", 0.95)
                    columns = rule.get("columns", df.columns.tolist())
    
                    for col in columns:
                        if col in df.columns:
                            completeness = 1 - (df[col].isna().sum() / len(df))
                            passed = completeness >= threshold
                            score = completeness * 100
    
                            quality_results["checks"].append(
                                {
                                    "type": "completeness",
                                    "column": col,
                                    "completeness": round(completeness, 4),
                                    "threshold": threshold,
                                    "passed": passed,
                                    "score": round(score, 2),
                                }
                            )
    
                            if not passed:
                                quality_results["issues"].append(
                                    {
                                        "type": "incomplete_data",
                                        "column": col,
                                        "message": f"Column '{col}' is only {round(completeness*100, 2)}% complete",
                                        "severity": "high" if completeness < 0.5 else "medium",
                                    }
                                )
    
                            total_score += score
                            score_count += 1
    
                elif rule_type == "duplicates":
                    # Check for duplicate rows
                    threshold = rule.get("threshold", 0.01)
                    subset = rule.get("columns")
    
                    duplicates = df.duplicated(subset=subset)
                    duplicate_ratio = duplicates.sum() / len(df)
                    passed = duplicate_ratio <= threshold
                    score = (1 - duplicate_ratio) * 100
    
                    quality_results["checks"].append(
                        {
                            "type": "duplicates",
                            "duplicate_rows": int(duplicates.sum()),
                            "duplicate_ratio": round(duplicate_ratio, 4),
                            "threshold": threshold,
                            "passed": passed,
                            "score": round(score, 2),
                        }
                    )
    
                    if not passed:
                        quality_results["issues"].append(
                            {
                                "type": "duplicate_rows",
                                "message": f"Found {duplicates.sum()} duplicate rows ({round(duplicate_ratio*100, 2)}%)",
                                "severity": "high" if duplicate_ratio > 0.1 else "medium",
                            }
                        )
                        quality_results["recommendations"].append(
                            "Consider removing duplicate rows using the remove_duplicates tool"
                        )
    
                    total_score += score
                    score_count += 1
    
                elif rule_type == "uniqueness":
                    # Check column uniqueness
                    column = rule.get("column")
                    if column and column in df.columns:
                        unique_ratio = df[column].nunique() / len(df)
                        expected_unique = rule.get("expected_unique", True)
    
                        if expected_unique:
                            passed = unique_ratio >= 0.99
                            score = unique_ratio * 100
                        else:
                            passed = True
                            score = 100
    
                        quality_results["checks"].append(
                            {
                                "type": "uniqueness",
                                "column": column,
                                "unique_values": int(df[column].nunique()),
                                "unique_ratio": round(unique_ratio, 4),
                                "passed": passed,
                                "score": round(score, 2),
                            }
                        )
    
                        if not passed and expected_unique:
                            quality_results["issues"].append(
                                {
                                    "type": "non_unique_values",
                                    "column": column,
                                    "message": f"Column '{column}' expected to be unique but has duplicates",
                                    "severity": "high",
                                }
                            )
    
                        total_score += score
                        score_count += 1
    
                elif rule_type == "data_types":
                    # Check data type consistency
                    for col in df.columns:
                        col_data = df[col].dropna()
                        if len(col_data) > 0:
                            # Check for mixed types
                            types = col_data.apply(type).unique()
                            mixed_types = len(types) > 1
    
                            # Check for numeric strings
                            if col_data.dtype == object:
                                numeric_strings = col_data.astype(str).str.match(r"^-?\d+\.?\d*$").sum()
                                numeric_ratio = numeric_strings / len(col_data)
                            else:
                                numeric_ratio = 0
    
                            score = 100 if not mixed_types else 50
    
                            quality_results["checks"].append(
                                {
                                    "type": "data_type_consistency",
                                    "column": col,
                                    "dtype": str(df[col].dtype),
                                    "mixed_types": mixed_types,
                                    "numeric_strings": numeric_ratio > 0.9,
                                    "score": score,
                                }
                            )
    
                            if numeric_ratio > 0.9:
                                quality_results["recommendations"].append(
                                    f"Column '{col}' appears to contain numeric data stored as strings. "
                                    f"Consider converting to numeric type using change_column_type tool"
                                )
    
                            total_score += score
                            score_count += 1
    
                elif rule_type == "outliers":
                    # Check for outliers in numeric columns
                    threshold = rule.get("threshold", 0.05)
                    numeric_cols = df.select_dtypes(include=[np.number]).columns
    
                    for col in numeric_cols:
                        Q1 = df[col].quantile(0.25)
                        Q3 = df[col].quantile(0.75)
                        IQR = Q3 - Q1
    
                        lower_bound = Q1 - 1.5 * IQR
                        upper_bound = Q3 + 1.5 * IQR
    
                        outliers = ((df[col] < lower_bound) | (df[col] > upper_bound)).sum()
                        outlier_ratio = outliers / len(df)
                        passed = outlier_ratio <= threshold
                        score = (1 - min(outlier_ratio, 1)) * 100
    
                        quality_results["checks"].append(
                            {
                                "type": "outliers",
                                "column": col,
                                "outlier_count": int(outliers),
                                "outlier_ratio": round(outlier_ratio, 4),
                                "threshold": threshold,
                                "passed": passed,
                                "score": round(score, 2),
                            }
                        )
    
                        if not passed:
                            quality_results["issues"].append(
                                {
                                    "type": "outliers",
                                    "column": col,
                                    "message": f"Column '{col}' has {outliers} outliers ({round(outlier_ratio*100, 2)}%)",
                                    "severity": "medium",
                                }
                            )
    
                        total_score += score
                        score_count += 1
    
                elif rule_type == "consistency":
                    # Check data consistency
                    columns = rule.get("columns", [])
    
                    # Date consistency check
                    date_cols = df.select_dtypes(include=["datetime64"]).columns
                    if len(date_cols) >= 2 and not columns:
                        columns = date_cols.tolist()
    
                    if len(columns) >= 2:
                        col1, col2 = columns[0], columns[1]
                        if col1 in df.columns and col2 in df.columns:
                            # Check if col1 should be before col2 (e.g., start_date < end_date)
                            if pd.api.types.is_datetime64_any_dtype(
                                df[col1]
                            ) and pd.api.types.is_datetime64_any_dtype(df[col2]):
                                inconsistent = (df[col1] > df[col2]).sum()
                                consistency_ratio = 1 - (inconsistent / len(df))
                                passed = consistency_ratio >= 0.99
                                score = consistency_ratio * 100
    
                                quality_results["checks"].append(
                                    {
                                        "type": "consistency",
                                        "columns": [col1, col2],
                                        "consistent_rows": len(df) - inconsistent,
                                        "inconsistent_rows": int(inconsistent),
                                        "consistency_ratio": round(consistency_ratio, 4),
                                        "passed": passed,
                                        "score": round(score, 2),
                                    }
                                )
    
                                if not passed:
                                    quality_results["issues"].append(
                                        {
                                            "type": "data_inconsistency",
                                            "columns": [col1, col2],
                                            "message": f"Found {inconsistent} rows where {col1} > {col2}",
                                            "severity": "high",
                                        }
                                    )
    
                                total_score += score
                                score_count += 1
    
            # Calculate overall score
            if score_count > 0:
                quality_results["overall_score"] = round(total_score / score_count, 2)
    
            # Determine quality level
            overall_score = quality_results["overall_score"]
            if overall_score >= 95:
                quality_results["quality_level"] = "Excellent"
            elif overall_score >= 85:
                quality_results["quality_level"] = "Good"
            elif overall_score >= 70:
                quality_results["quality_level"] = "Fair"
            else:
                quality_results["quality_level"] = "Poor"
    
            # Add general recommendations
            if not quality_results["recommendations"]:
                if overall_score < 85:
                    quality_results["recommendations"].append(
                        "Consider running profile_data to get a comprehensive overview of data issues"
                    )
    
            session.record_operation(
                OperationType.QUALITY_CHECK,
                {
                    "rules_count": len(rules),
                    "overall_score": overall_score,
                    "issues_count": len(quality_results["issues"]),
                },
            )
    
            return {"success": True, "quality_results": quality_results}
    
        except Exception as e:
            logger.error(f"Error checking data quality: {e!s}")
            return {"success": False, "error": str(e)}
Behavior2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations, the description must bear the burden of behavioral disclosure, but it only vaguely mentions 'predefined or custom rules.' It does not specify side effects (e.g., does it modify data?), required permissions, or output behavior beyond what the output schema might imply.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness3/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single concise sentence, which is efficient but sacrifices necessary detail. It front-loads the purpose but omits elaboration that would fit in a brief paragraph.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the complexity of the sibling tools and the lack of schema descriptions, this description is too sparse. An output schema exists but is not described; the tool's role among many related operations remains unclear.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters2/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 0%, so the description should compensate, but it only loosely references 'rules' without explaining their format or valid values. The 'session_id' parameter is not mentioned at all, adding minimal meaning beyond the schema.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description uses a specific verb ('check') and resource ('data quality'), distinguishing this tool from sibling transformation tools. However, it lacks detail on what 'data quality' entails (e.g., missing values, outliers), which slightly reduces clarity.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

No guidance is provided on when to use this tool versus alternatives like detect_outliers or profile_data. There is no mention of prerequisites, context, or exclusions, leaving the agent to infer usage independently.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/santoshray02/csv-editor'

If you have feedback or need assistance with the MCP directory API, please join our Discord server