
CSV Editor

by santoshray02

check_data_quality

Validate CSV data quality by applying predefined or custom rules to identify errors, inconsistencies, and compliance issues in your datasets.

Instructions

Check data quality based on predefined or custom rules.

Input Schema

Name         Required   Description   Default
session_id   Yes
rules        No
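
For orientation, here is a hedged example of the arguments an agent might pass. The rule shapes follow the handler's docstring under Implementation Reference below; the session identifier itself is hypothetical.

    # Hypothetical arguments for check_data_quality. The session_id must refer
    # to a CSV session created by an earlier load call; rules is optional and
    # the handler falls back to its default rule set when it is omitted.
    arguments = {
        "session_id": "abc123",  # hypothetical identifier from a prior load
        "rules": [
            {"type": "completeness", "threshold": 0.95},
            {"type": "uniqueness", "column": "id"},
            {"type": "consistency", "columns": ["start_date", "end_date"]},
        ],
    }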

Output Schema

No output fields are documented.
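
Although no output fields are published, the handler under Implementation Reference returns a dict of the following shape. The overall_score is the mean of the per-check scores and maps to a quality_level ("Excellent" at 95 or above, "Good" at 85, "Fair" at 70, otherwise "Poor"); for example, checks scoring 100, 95, and 82.5 average to 92.5, which maps to "Good". The values below are illustrative, not real output.

    # Illustrative response shape (values invented; the structure follows the
    # handler code shown under Implementation Reference).
    {
        "success": True,
        "quality_results": {
            "overall_score": 92.5,    # mean of all per-check scores
            "quality_level": "Good",  # 85 <= score < 95
            "checks": [
                {"type": "completeness", "column": "email",
                 "completeness": 0.97, "threshold": 0.95,
                 "passed": True, "score": 97.0},
            ],
            "issues": [
                {"type": "duplicate_rows", "severity": "medium",
                 "message": "Found 12 duplicate rows (1.2%)"},
            ],
            "recommendations": [
                "Consider removing duplicate rows using the remove_duplicates tool",
            ],
        },
    }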

Implementation Reference

  • Core handler function implementing the check_data_quality tool. Performs comprehensive data quality checks covering completeness, duplicates, uniqueness, data types, outliers, and consistency, based on the provided or default rules. Computes an overall quality score and returns issues and recommendations.
    # Imports assumed by this excerpt: Context comes from FastMCP, while
    # get_session_manager, OperationType, and logger are defined in the
    # project's own modules.
    from typing import Any, Dict, List, Optional

    import numpy as np
    import pandas as pd

    async def check_data_quality(
        session_id: str,
        rules: Optional[List[Dict[str, Any]]] = None,
        ctx: Context = None
    ) -> Dict[str, Any]:
        """
        Check data quality based on predefined or custom rules.
        
        Args:
            session_id: Session identifier
            rules: Custom quality rules to check. If None, uses default rules.
                   Example: [
                       {"type": "completeness", "threshold": 0.95},
                       {"type": "uniqueness", "column": "id"},
                       {"type": "consistency", "columns": ["start_date", "end_date"]}
                   ]
            ctx: FastMCP context
            
        Returns:
            Dict with quality check results
        """
        try:
            manager = get_session_manager()
            session = manager.get_session(session_id)
            
            if not session or session.df is None:
                return {"success": False, "error": "Invalid session or no data loaded"}
            
            df = session.df
            quality_results = {
                "overall_score": 100.0,
                "checks": [],
                "issues": [],
                "recommendations": []
            }
            
            # Default rules if none provided
            if not rules:
                rules = [
                    {"type": "completeness", "threshold": 0.95},
                    {"type": "duplicates", "threshold": 0.01},
                    {"type": "data_types"},
                    {"type": "outliers", "threshold": 0.05},
                    {"type": "consistency"}
                ]
            
            total_score = 0
            score_count = 0
            
            for rule in rules:
                rule_type = rule.get("type")
                
                if rule_type == "completeness":
                    # Check data completeness
                    threshold = rule.get("threshold", 0.95)
                    columns = rule.get("columns", df.columns.tolist())
                    
                    for col in columns:
                        if col in df.columns:
                            completeness = 1 - (df[col].isna().sum() / len(df))
                            passed = completeness >= threshold
                            score = completeness * 100
                            
                            quality_results["checks"].append({
                                "type": "completeness",
                                "column": col,
                                "completeness": round(completeness, 4),
                                "threshold": threshold,
                                "passed": passed,
                                "score": round(score, 2)
                            })
                            
                            if not passed:
                                quality_results["issues"].append({
                                    "type": "incomplete_data",
                                    "column": col,
                                    "message": f"Column '{col}' is only {round(completeness*100, 2)}% complete",
                                    "severity": "high" if completeness < 0.5 else "medium"
                                })
                            
                            total_score += score
                            score_count += 1
                
                elif rule_type == "duplicates":
                    # Check for duplicate rows
                    threshold = rule.get("threshold", 0.01)
                    subset = rule.get("columns")
                    
                    duplicates = df.duplicated(subset=subset)
                    duplicate_ratio = duplicates.sum() / len(df)
                    passed = duplicate_ratio <= threshold
                    score = (1 - duplicate_ratio) * 100
                    
                    quality_results["checks"].append({
                        "type": "duplicates",
                        "duplicate_rows": int(duplicates.sum()),
                        "duplicate_ratio": round(duplicate_ratio, 4),
                        "threshold": threshold,
                        "passed": passed,
                        "score": round(score, 2)
                    })
                    
                    if not passed:
                        quality_results["issues"].append({
                            "type": "duplicate_rows",
                            "message": f"Found {duplicates.sum()} duplicate rows ({round(duplicate_ratio*100, 2)}%)",
                            "severity": "high" if duplicate_ratio > 0.1 else "medium"
                        })
                        quality_results["recommendations"].append(
                            "Consider removing duplicate rows using the remove_duplicates tool"
                        )
                    
                    total_score += score
                    score_count += 1
                
                elif rule_type == "uniqueness":
                    # Check column uniqueness
                    column = rule.get("column")
                    if column and column in df.columns:
                        unique_ratio = df[column].nunique() / len(df)
                        expected_unique = rule.get("expected_unique", True)
                        
                        if expected_unique:
                            passed = unique_ratio >= 0.99
                            score = unique_ratio * 100
                        else:
                            passed = True
                            score = 100
                        
                        quality_results["checks"].append({
                            "type": "uniqueness",
                            "column": column,
                            "unique_values": int(df[column].nunique()),
                            "unique_ratio": round(unique_ratio, 4),
                            "passed": passed,
                            "score": round(score, 2)
                        })
                        
                        if not passed and expected_unique:
                            quality_results["issues"].append({
                                "type": "non_unique_values",
                                "column": column,
                                "message": f"Column '{column}' expected to be unique but has duplicates",
                                "severity": "high"
                            })
                        
                        total_score += score
                        score_count += 1
                
                elif rule_type == "data_types":
                    # Check data type consistency
                    for col in df.columns:
                        col_data = df[col].dropna()
                        if len(col_data) > 0:
                            # Check for mixed types
                            types = col_data.apply(type).unique()
                            mixed_types = len(types) > 1
                            
                            # Check for numeric strings
                            if col_data.dtype == object:
                                numeric_strings = col_data.astype(str).str.match(r'^-?\d+\.?\d*$').sum()
                                numeric_ratio = numeric_strings / len(col_data)
                            else:
                                numeric_ratio = 0
                            
                            score = 100 if not mixed_types else 50
                            
                            quality_results["checks"].append({
                                "type": "data_type_consistency",
                                "column": col,
                                "dtype": str(df[col].dtype),
                                "mixed_types": mixed_types,
                                "numeric_strings": numeric_ratio > 0.9,
                                "score": score
                            })
                            
                            if numeric_ratio > 0.9:
                                quality_results["recommendations"].append(
                                    f"Column '{col}' appears to contain numeric data stored as strings. "
                                    f"Consider converting to numeric type using change_column_type tool"
                                )
                            
                            total_score += score
                            score_count += 1
                
                elif rule_type == "outliers":
                    # Check for outliers in numeric columns
                    threshold = rule.get("threshold", 0.05)
                    numeric_cols = df.select_dtypes(include=[np.number]).columns
                    
                    for col in numeric_cols:
                        Q1 = df[col].quantile(0.25)
                        Q3 = df[col].quantile(0.75)
                        IQR = Q3 - Q1
                        
                        lower_bound = Q1 - 1.5 * IQR
                        upper_bound = Q3 + 1.5 * IQR
                        
                        outliers = ((df[col] < lower_bound) | (df[col] > upper_bound)).sum()
                        outlier_ratio = outliers / len(df)
                        passed = outlier_ratio <= threshold
                        score = (1 - min(outlier_ratio, 1)) * 100
                        
                        quality_results["checks"].append({
                            "type": "outliers",
                            "column": col,
                            "outlier_count": int(outliers),
                            "outlier_ratio": round(outlier_ratio, 4),
                            "threshold": threshold,
                            "passed": passed,
                            "score": round(score, 2)
                        })
                        
                        if not passed:
                            quality_results["issues"].append({
                                "type": "outliers",
                                "column": col,
                                "message": f"Column '{col}' has {outliers} outliers ({round(outlier_ratio*100, 2)}%)",
                                "severity": "medium"
                            })
                        
                        total_score += score
                        score_count += 1
                
                elif rule_type == "consistency":
                    # Check data consistency
                    columns = rule.get("columns", [])
                    
                    # Date consistency check
                    date_cols = df.select_dtypes(include=['datetime64']).columns
                    if len(date_cols) >= 2 and not columns:
                        columns = date_cols.tolist()
                    
                    if len(columns) >= 2:
                        col1, col2 = columns[0], columns[1]
                        if col1 in df.columns and col2 in df.columns:
                            # Check if col1 should be before col2 (e.g., start_date < end_date)
                            if pd.api.types.is_datetime64_any_dtype(df[col1]) and pd.api.types.is_datetime64_any_dtype(df[col2]):
                                inconsistent = (df[col1] > df[col2]).sum()
                                consistency_ratio = 1 - (inconsistent / len(df))
                                passed = consistency_ratio >= 0.99
                                score = consistency_ratio * 100
                                
                                quality_results["checks"].append({
                                    "type": "consistency",
                                    "columns": [col1, col2],
                                    "consistent_rows": len(df) - inconsistent,
                                    "inconsistent_rows": int(inconsistent),
                                    "consistency_ratio": round(consistency_ratio, 4),
                                    "passed": passed,
                                    "score": round(score, 2)
                                })
                                
                                if not passed:
                                    quality_results["issues"].append({
                                        "type": "data_inconsistency",
                                        "columns": [col1, col2],
                                        "message": f"Found {inconsistent} rows where {col1} > {col2}",
                                        "severity": "high"
                                    })
                                
                                total_score += score
                                score_count += 1
            
            # Calculate overall score
            if score_count > 0:
                quality_results["overall_score"] = round(total_score / score_count, 2)
            
            # Determine quality level
            overall_score = quality_results["overall_score"]
            if overall_score >= 95:
                quality_results["quality_level"] = "Excellent"
            elif overall_score >= 85:
                quality_results["quality_level"] = "Good"
            elif overall_score >= 70:
                quality_results["quality_level"] = "Fair"
            else:
                quality_results["quality_level"] = "Poor"
            
            # Add general recommendations
            if not quality_results["recommendations"]:
                if overall_score < 85:
                    quality_results["recommendations"].append(
                        "Consider running profile_data to get a comprehensive overview of data issues"
                    )
            
            session.record_operation(OperationType.QUALITY_CHECK, {
                "rules_count": len(rules),
                "overall_score": overall_score,
                "issues_count": len(quality_results["issues"])
            })
            
            return {
                "success": True,
                "quality_results": quality_results
            }
            
        except Exception as e:
            logger.error(f"Error checking data quality: {str(e)}")
            return {"success": False, "error": str(e)}
  • MCP tool registration for check_data_quality. Imports the handler from validation.py as _check_data_quality and registers it using the @mcp.tool decorator with a matching signature; a client-side usage sketch follows this list.
    @mcp.tool()  # registration decorator referenced in the note above
    async def check_data_quality(
        session_id: str,
        rules: Optional[List[Dict[str, Any]]] = None,
        ctx: Context = None
    ) -> Dict[str, Any]:
        """Check data quality based on predefined or custom rules."""
        return await _check_data_quality(session_id, rules, ctx)
  • The handler's docstring, shown in full above, doubles as the detailed input schema description, with examples of the rules structure and the expected return format.
    """
    Check data quality based on predefined or custom rules.
    
    Args:
        session_id: Session identifier
        rules: Custom quality rules to check. If None, uses default rules.
               Example: [
                   {"type": "completeness", "threshold": 0.95},
                   {"type": "uniqueness", "column": "id"},
                   {"type": "consistency", "columns": ["start_date", "end_date"]}
               ]
        ctx: FastMCP context
        
    Returns:
        Dict with quality check results
    """
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries the full burden of behavioral disclosure. It mentions checking based on rules but doesn't describe what the tool actually does (e.g., runs validation, returns metrics, flags issues), whether it modifies data, requires specific permissions, or has side effects. This is insufficient for a tool with potential data analysis implications.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, efficient sentence with no wasted words. It's front-loaded with the core action ('Check data quality') and includes a key detail ('based on predefined or custom rules'), making it appropriately concise for its limited content.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 3/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's complexity (data quality checking with rules), the absence of annotations, and the presence of an output schema (which reduces the need to describe returns), the description is minimally adequate. It covers the basic purpose but lacks details on behavior, parameters, and usage context, leaving gaps that could hinder effective tool selection.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 2/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 0%, so the description must compensate for undocumented parameters. It mentions 'predefined or custom rules', which loosely relates to the 'rules' parameter, but doesn't explain what 'session_id' is, the format of rules, or how they interact. This adds minimal value beyond the schema's structural information.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 3/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description states the tool checks data quality, which is a clear purpose, but it's vague about scope and resources. It mentions 'predefined or custom rules' but doesn't specify what data is being checked or how it relates to siblings like 'validate_schema' or 'detect_outliers'. This provides a basic understanding but lacks differentiation from similar tools.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

No guidance is provided on when to use this tool versus alternatives like 'validate_schema', 'detect_outliers', or 'find_anomalies'. The description implies it's for data quality checking but doesn't specify contexts, prerequisites, or exclusions, leaving the agent to infer usage from the tool name alone.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
