data_quality_report
Generate a comprehensive data quality assessment report to evaluate completeness, uniqueness, validity, and overall health of tabular data before analysis.
Instructions
Generate a comprehensive data quality assessment report.
Essential for understanding data health before analysis.
Args:
- file_path: Path to CSV or SQLite file
Returns:
Dictionary containing:
- completeness: Missing value analysis per column
- uniqueness: Duplicate detection
- validity: Data type consistency and outlier counts
- overall_score: Data quality score (0-100)
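For orientation, the returned dictionary is shaped roughly as follows. This is a minimal sketch with hypothetical values (a made-up 1000-row, 8-column CSV), not real output; the per-column maps contain one entry per column, only a few of which are shown.

```python
# Illustrative shape only; all values are hypothetical, not real output.
report = {
    "file": "data/sales.csv",  # hypothetical input path
    "shape": {"rows": 1000, "columns": 8},
    "overall_quality": {
        "score": 96.5,  # 0.4 * 95.0 + 0.3 * 100.0 + 0.3 * 95
        "grade": "A",
        "recommendation": "Excellent data quality. Ready for analysis.",
    },
    "completeness": {
        "score": 95.0,
        "total_missing_cells": 400,
        "missing_by_column": {"region": 400},        # one entry per column; only one shown
        "missing_pct_by_column": {"region": 40.0},
        "columns_with_missing": ["region"],
    },
    "uniqueness": {
        "score": 100.0,
        "duplicate_rows": 0,
        "duplicate_pct": 0.0,
        "column_uniqueness": {                        # one entry per column; only one shown
            "order_id": {"unique_count": 1000, "unique_pct": 100.0, "is_potential_id": True},
        },
    },
    "validity": {
        "score": 95,  # one issue: max(0, 100 - 1 * 5)
        "issues": [
            {"column": "price", "issue": "outliers", "count": 12, "pct": 1.2},
        ],
    },
}
```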
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| file_path | Yes | Path to CSV or SQLite file | (none) |
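As a usage reference, the tool takes a single `file_path` argument. The sketch below is an assumption rather than documented usage: it uses the MCP Python SDK's `ClientSession.call_tool` interface and a hypothetical CSV path; any MCP-capable client can issue the equivalent call.

```python
from mcp import ClientSession  # MCP Python SDK (assumed client)

async def run_quality_check(session: ClientSession) -> None:
    # Hypothetical CSV path; SQLite files (.db / .sqlite / .sqlite3) are also accepted.
    result = await session.call_tool(
        "data_quality_report",
        arguments={"file_path": "data/sales.csv"},
    )
    print(result)
```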
Implementation Reference
- src/mcp_tabular/server.py:688-817 (handler): The primary handler function, decorated with @mcp.tool(), implementing the full logic for generating a data quality report, including completeness, uniqueness, and validity checks, scoring, and grading. (A short sketch of the scoring arithmetic follows this list.)

```python
def data_quality_report(file_path: str) -> dict[str, Any]:
    """
    Generate a comprehensive data quality assessment report.

    Essential for understanding data health before analysis.

    Args:
        file_path: Path to CSV or SQLite file

    Returns:
        Dictionary containing:
        - completeness: Missing value analysis per column
        - uniqueness: Duplicate detection
        - validity: Data type consistency and outlier counts
        - overall_score: Data quality score (0-100)
    """
    df = _load_data(file_path)
    total_cells = df.size
    total_rows = len(df)

    # Completeness Analysis
    missing_per_column = df.isnull().sum().to_dict()
    missing_pct_per_column = (df.isnull().sum() / total_rows * 100).round(2).to_dict()
    total_missing = df.isnull().sum().sum()
    completeness_score = 100 - (total_missing / total_cells * 100)

    # Uniqueness Analysis
    duplicate_rows = df.duplicated().sum()
    duplicate_pct = (duplicate_rows / total_rows * 100) if total_rows > 0 else 0
    uniqueness_score = 100 - duplicate_pct

    # Column-level uniqueness
    column_uniqueness = {
        col: {
            "unique_count": df[col].nunique(),
            "unique_pct": round(df[col].nunique() / total_rows * 100, 2) if total_rows > 0 else 0,
            "is_potential_id": df[col].nunique() == total_rows,
        }
        for col in df.columns
    }

    # Validity Analysis
    validity_issues = []
    numeric_cols = _get_numeric_columns(df)

    for col in numeric_cols:
        col_data = df[col].dropna()
        if len(col_data) > 0:
            # Check for outliers using IQR
            q1, q3 = col_data.quantile([0.25, 0.75])
            iqr = q3 - q1
            outlier_count = ((col_data < q1 - 1.5 * iqr) | (col_data > q3 + 1.5 * iqr)).sum()
            if outlier_count > 0:
                validity_issues.append({
                    "column": col,
                    "issue": "outliers",
                    "count": int(outlier_count),
                    "pct": round(outlier_count / len(col_data) * 100, 2),
                })

            # Check for negative values in typically positive columns
            if col_data.min() < 0:
                neg_count = (col_data < 0).sum()
                validity_issues.append({
                    "column": col,
                    "issue": "negative_values",
                    "count": int(neg_count),
                    "min_value": float(col_data.min()),
                })

    # Check for empty strings in text columns
    text_cols = df.select_dtypes(include=["object"]).columns
    for col in text_cols:
        empty_strings = (df[col] == "").sum()
        if empty_strings > 0:
            validity_issues.append({
                "column": col,
                "issue": "empty_strings",
                "count": int(empty_strings),
            })

    validity_score = max(0, 100 - len(validity_issues) * 5)

    # Overall Data Quality Score
    overall_score = round(
        (completeness_score * 0.4 + uniqueness_score * 0.3 + validity_score * 0.3), 1
    )

    # Quality grade
    if overall_score >= 90:
        grade = "A"
        recommendation = "Excellent data quality. Ready for analysis."
    elif overall_score >= 80:
        grade = "B"
        recommendation = "Good data quality. Minor cleaning recommended."
    elif overall_score >= 70:
        grade = "C"
        recommendation = "Moderate data quality. Cleaning needed before analysis."
    elif overall_score >= 60:
        grade = "D"
        recommendation = "Poor data quality. Significant cleaning required."
    else:
        grade = "F"
        recommendation = "Critical data quality issues. Major data cleaning needed."

    return {
        "file": file_path,
        "shape": {"rows": total_rows, "columns": len(df.columns)},
        "overall_quality": {
            "score": overall_score,
            "grade": grade,
            "recommendation": recommendation,
        },
        "completeness": {
            "score": round(completeness_score, 1),
            "total_missing_cells": int(total_missing),
            "missing_by_column": missing_per_column,
            "missing_pct_by_column": missing_pct_per_column,
            "columns_with_missing": [col for col, pct in missing_pct_per_column.items() if pct > 0],
        },
        "uniqueness": {
            "score": round(uniqueness_score, 1),
            "duplicate_rows": int(duplicate_rows),
            "duplicate_pct": round(duplicate_pct, 2),
            "column_uniqueness": column_uniqueness,
        },
        "validity": {
            "score": validity_score,
            "issues": validity_issues,
        },
    }
```
- src/mcp_tabular/server.py:103-106 (helper): Helper function used by data_quality_report to identify numeric columns for validity analysis.

```python
def _get_numeric_columns(df: pd.DataFrame) -> list[str]:
    """Get list of numeric column names."""
    return df.select_dtypes(include=[np.number]).columns.tolist()
```
- src/mcp_tabular/server.py:70-101 (helper): Core helper used by data_quality_report (and other tools) to load CSV or SQLite data into a pandas DataFrame.

```python
def _load_data(file_path: str) -> pd.DataFrame:
    """Load data from CSV or SQLite file."""
    path = _resolve_path(file_path)

    if not path.exists():
        raise FileNotFoundError(
            f"File not found: {file_path}\n"
            f"Resolved to: {path}\n"
            f"Project root: {_PROJECT_ROOT}\n"
            f"Current working directory: {Path.cwd()}"
        )

    suffix = path.suffix.lower()
    if suffix == ".csv":
        return pd.read_csv(str(path))
    elif suffix in (".db", ".sqlite", ".sqlite3"):
        # For SQLite, list tables or load first table
        conn = sqlite3.connect(str(path))
        tables = pd.read_sql_query(
            "SELECT name FROM sqlite_master WHERE type='table'", conn
        )
        if tables.empty:
            conn.close()
            raise ValueError(f"No tables found in SQLite database: {file_path}")
        first_table = tables.iloc[0]["name"]
        df = pd.read_sql_query(f"SELECT * FROM {first_table}", conn)
        conn.close()
        return df
    else:
        raise ValueError(f"Unsupported file format: {suffix}. Use .csv or .db/.sqlite")
```
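To make the weighting in the handler concrete: the overall score is a weighted average of the three sub-scores (40% completeness, 30% uniqueness, 30% validity), followed by a threshold-based grade. Below is a standalone sketch of that arithmetic using hypothetical sub-scores.

```python
# Standalone sketch of the scoring step in data_quality_report, with hypothetical sub-scores.
completeness_score = 92.0              # e.g. 8% of all cells missing
uniqueness_score = 97.0                # e.g. 3% of rows are exact duplicates
validity_score = max(0, 100 - 3 * 5)   # e.g. 3 validity issues -> 85

overall_score = round(
    completeness_score * 0.4 + uniqueness_score * 0.3 + validity_score * 0.3, 1
)
print(overall_score)  # 91.4, which falls in the ">= 90" band -> grade "A"
```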
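Since the handler is a plain function, it can also be exercised directly in Python. The import path and file name below are assumptions based on the layout shown above (src/mcp_tabular/server.py), not documented usage.

```python
# Sketch: calling the handler directly, bypassing the MCP transport.
from mcp_tabular.server import data_quality_report  # assumed import path

report = data_quality_report("data/sales.csv")  # hypothetical CSV path
quality = report["overall_quality"]
print(f"Grade {quality['grade']} ({quality['score']}): {quality['recommendation']}")

# Columns worth a second look before analysis:
print(report["completeness"]["columns_with_missing"])
print([issue["column"] for issue in report["validity"]["issues"]])
```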