Skip to main content
Glama
santoshray02

CSV Editor

by santoshray02

check_data_quality

Validate CSV data quality by applying predefined or custom rules to identify errors, inconsistencies, and compliance issues in your datasets.

Instructions

Check data quality based on predefined or custom rules.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
session_idYes
rulesNo

Implementation Reference

  • Core handler function implementing the check_data_quality tool. Performs comprehensive data quality checks including completeness, duplicates, uniqueness, data types, outliers, and consistency based on provided or default rules. Computes overall quality score and provides issues and recommendations.
    async def check_data_quality(
        session_id: str,
        rules: Optional[List[Dict[str, Any]]] = None,
        ctx: Context = None
    ) -> Dict[str, Any]:
        """
        Check data quality based on predefined or custom rules.
        
        Args:
            session_id: Session identifier
            rules: Custom quality rules to check. If None, uses default rules.
                   Example: [
                       {"type": "completeness", "threshold": 0.95},
                       {"type": "uniqueness", "column": "id"},
                       {"type": "consistency", "columns": ["start_date", "end_date"]}
                   ]
            ctx: FastMCP context
            
        Returns:
            Dict with quality check results
        """
        try:
            manager = get_session_manager()
            session = manager.get_session(session_id)
            
            if not session or session.df is None:
                return {"success": False, "error": "Invalid session or no data loaded"}
            
            df = session.df
            quality_results = {
                "overall_score": 100.0,
                "checks": [],
                "issues": [],
                "recommendations": []
            }
            
            # Default rules if none provided
            if not rules:
                rules = [
                    {"type": "completeness", "threshold": 0.95},
                    {"type": "duplicates", "threshold": 0.01},
                    {"type": "data_types"},
                    {"type": "outliers", "threshold": 0.05},
                    {"type": "consistency"}
                ]
            
            total_score = 0
            score_count = 0
            
            for rule in rules:
                rule_type = rule.get("type")
                
                if rule_type == "completeness":
                    # Check data completeness
                    threshold = rule.get("threshold", 0.95)
                    columns = rule.get("columns", df.columns.tolist())
                    
                    for col in columns:
                        if col in df.columns:
                            completeness = 1 - (df[col].isna().sum() / len(df))
                            passed = completeness >= threshold
                            score = completeness * 100
                            
                            quality_results["checks"].append({
                                "type": "completeness",
                                "column": col,
                                "completeness": round(completeness, 4),
                                "threshold": threshold,
                                "passed": passed,
                                "score": round(score, 2)
                            })
                            
                            if not passed:
                                quality_results["issues"].append({
                                    "type": "incomplete_data",
                                    "column": col,
                                    "message": f"Column '{col}' is only {round(completeness*100, 2)}% complete",
                                    "severity": "high" if completeness < 0.5 else "medium"
                                })
                            
                            total_score += score
                            score_count += 1
                
                elif rule_type == "duplicates":
                    # Check for duplicate rows
                    threshold = rule.get("threshold", 0.01)
                    subset = rule.get("columns")
                    
                    duplicates = df.duplicated(subset=subset)
                    duplicate_ratio = duplicates.sum() / len(df)
                    passed = duplicate_ratio <= threshold
                    score = (1 - duplicate_ratio) * 100
                    
                    quality_results["checks"].append({
                        "type": "duplicates",
                        "duplicate_rows": int(duplicates.sum()),
                        "duplicate_ratio": round(duplicate_ratio, 4),
                        "threshold": threshold,
                        "passed": passed,
                        "score": round(score, 2)
                    })
                    
                    if not passed:
                        quality_results["issues"].append({
                            "type": "duplicate_rows",
                            "message": f"Found {duplicates.sum()} duplicate rows ({round(duplicate_ratio*100, 2)}%)",
                            "severity": "high" if duplicate_ratio > 0.1 else "medium"
                        })
                        quality_results["recommendations"].append(
                            "Consider removing duplicate rows using the remove_duplicates tool"
                        )
                    
                    total_score += score
                    score_count += 1
                
                elif rule_type == "uniqueness":
                    # Check column uniqueness
                    column = rule.get("column")
                    if column and column in df.columns:
                        unique_ratio = df[column].nunique() / len(df)
                        expected_unique = rule.get("expected_unique", True)
                        
                        if expected_unique:
                            passed = unique_ratio >= 0.99
                            score = unique_ratio * 100
                        else:
                            passed = True
                            score = 100
                        
                        quality_results["checks"].append({
                            "type": "uniqueness",
                            "column": column,
                            "unique_values": int(df[column].nunique()),
                            "unique_ratio": round(unique_ratio, 4),
                            "passed": passed,
                            "score": round(score, 2)
                        })
                        
                        if not passed and expected_unique:
                            quality_results["issues"].append({
                                "type": "non_unique_values",
                                "column": column,
                                "message": f"Column '{column}' expected to be unique but has duplicates",
                                "severity": "high"
                            })
                        
                        total_score += score
                        score_count += 1
                
                elif rule_type == "data_types":
                    # Check data type consistency
                    for col in df.columns:
                        col_data = df[col].dropna()
                        if len(col_data) > 0:
                            # Check for mixed types
                            types = col_data.apply(type).unique()
                            mixed_types = len(types) > 1
                            
                            # Check for numeric strings
                            if col_data.dtype == object:
                                numeric_strings = col_data.astype(str).str.match(r'^-?\d+\.?\d*$').sum()
                                numeric_ratio = numeric_strings / len(col_data)
                            else:
                                numeric_ratio = 0
                            
                            score = 100 if not mixed_types else 50
                            
                            quality_results["checks"].append({
                                "type": "data_type_consistency",
                                "column": col,
                                "dtype": str(df[col].dtype),
                                "mixed_types": mixed_types,
                                "numeric_strings": numeric_ratio > 0.9,
                                "score": score
                            })
                            
                            if numeric_ratio > 0.9:
                                quality_results["recommendations"].append(
                                    f"Column '{col}' appears to contain numeric data stored as strings. "
                                    f"Consider converting to numeric type using change_column_type tool"
                                )
                            
                            total_score += score
                            score_count += 1
                
                elif rule_type == "outliers":
                    # Check for outliers in numeric columns
                    threshold = rule.get("threshold", 0.05)
                    numeric_cols = df.select_dtypes(include=[np.number]).columns
                    
                    for col in numeric_cols:
                        Q1 = df[col].quantile(0.25)
                        Q3 = df[col].quantile(0.75)
                        IQR = Q3 - Q1
                        
                        lower_bound = Q1 - 1.5 * IQR
                        upper_bound = Q3 + 1.5 * IQR
                        
                        outliers = ((df[col] < lower_bound) | (df[col] > upper_bound)).sum()
                        outlier_ratio = outliers / len(df)
                        passed = outlier_ratio <= threshold
                        score = (1 - min(outlier_ratio, 1)) * 100
                        
                        quality_results["checks"].append({
                            "type": "outliers",
                            "column": col,
                            "outlier_count": int(outliers),
                            "outlier_ratio": round(outlier_ratio, 4),
                            "threshold": threshold,
                            "passed": passed,
                            "score": round(score, 2)
                        })
                        
                        if not passed:
                            quality_results["issues"].append({
                                "type": "outliers",
                                "column": col,
                                "message": f"Column '{col}' has {outliers} outliers ({round(outlier_ratio*100, 2)}%)",
                                "severity": "medium"
                            })
                        
                        total_score += score
                        score_count += 1
                
                elif rule_type == "consistency":
                    # Check data consistency
                    columns = rule.get("columns", [])
                    
                    # Date consistency check
                    date_cols = df.select_dtypes(include=['datetime64']).columns
                    if len(date_cols) >= 2 and not columns:
                        columns = date_cols.tolist()
                    
                    if len(columns) >= 2:
                        col1, col2 = columns[0], columns[1]
                        if col1 in df.columns and col2 in df.columns:
                            # Check if col1 should be before col2 (e.g., start_date < end_date)
                            if pd.api.types.is_datetime64_any_dtype(df[col1]) and pd.api.types.is_datetime64_any_dtype(df[col2]):
                                inconsistent = (df[col1] > df[col2]).sum()
                                consistency_ratio = 1 - (inconsistent / len(df))
                                passed = consistency_ratio >= 0.99
                                score = consistency_ratio * 100
                                
                                quality_results["checks"].append({
                                    "type": "consistency",
                                    "columns": [col1, col2],
                                    "consistent_rows": len(df) - inconsistent,
                                    "inconsistent_rows": int(inconsistent),
                                    "consistency_ratio": round(consistency_ratio, 4),
                                    "passed": passed,
                                    "score": round(score, 2)
                                })
                                
                                if not passed:
                                    quality_results["issues"].append({
                                        "type": "data_inconsistency",
                                        "columns": [col1, col2],
                                        "message": f"Found {inconsistent} rows where {col1} > {col2}",
                                        "severity": "high"
                                    })
                                
                                total_score += score
                                score_count += 1
            
            # Calculate overall score
            if score_count > 0:
                quality_results["overall_score"] = round(total_score / score_count, 2)
            
            # Determine quality level
            overall_score = quality_results["overall_score"]
            if overall_score >= 95:
                quality_results["quality_level"] = "Excellent"
            elif overall_score >= 85:
                quality_results["quality_level"] = "Good"
            elif overall_score >= 70:
                quality_results["quality_level"] = "Fair"
            else:
                quality_results["quality_level"] = "Poor"
            
            # Add general recommendations
            if not quality_results["recommendations"]:
                if overall_score < 85:
                    quality_results["recommendations"].append(
                        "Consider running profile_data to get a comprehensive overview of data issues"
                    )
            
            session.record_operation(OperationType.QUALITY_CHECK, {
                "rules_count": len(rules),
                "overall_score": overall_score,
                "issues_count": len(quality_results["issues"])
            })
            
            return {
                "success": True,
                "quality_results": quality_results
            }
            
        except Exception as e:
            logger.error(f"Error checking data quality: {str(e)}")
            return {"success": False, "error": str(e)}
  • MCP tool registration for check_data_quality. Imports the handler from validation.py as _check_data_quality and registers it using @mcp.tool decorator with matching signature.
    async def check_data_quality(
        session_id: str,
        rules: Optional[List[Dict[str, Any]]] = None,
        ctx: Context = None
    ) -> Dict[str, Any]:
        """Check data quality based on predefined or custom rules."""
        return await _check_data_quality(session_id, rules, ctx)
  • Docstring in the handler provides detailed input schema description with examples of rules structure and expected return format.
    """
    Check data quality based on predefined or custom rules.
    
    Args:
        session_id: Session identifier
        rules: Custom quality rules to check. If None, uses default rules.
               Example: [
                   {"type": "completeness", "threshold": 0.95},
                   {"type": "uniqueness", "column": "id"},
                   {"type": "consistency", "columns": ["start_date", "end_date"]}
               ]
        ctx: FastMCP context
        
    Returns:
        Dict with quality check results
    """

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/santoshray02/csv-editor'

If you have feedback or need assistance with the MCP directory API, please join our Discord server