
validate_schema

Verify that CSV data loaded in a session complies with a predefined schema. Helps ensure data integrity and structural correctness in CSV files.

Instructions

Validate data against a schema definition.

Input Schema

Name         Required  Description                           Default
schema       Yes       Schema definition with column rules
session_id   Yes       Session identifier
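
A call supplies both fields at once. The sketch below shows a hypothetical set of arguments (the session ID and column names are made up; the rule keys mirror the schema format documented in the implementation reference below):

    # Hypothetical arguments for a validate_schema call.
    # "session_id" would come from a previously created CSV session.
    arguments = {
        "session_id": "4f2c9c1e-0000-0000-0000-example",   # hypothetical session ID
        "schema": {
            "age":    {"type": "int", "nullable": False, "min": 0, "max": 120},
            "email":  {"type": "str", "pattern": r"^[^@\s]+@[^@\s]+\.[^@\s]+$", "unique": True},
            "status": {"type": "str", "values": ["active", "inactive"]},
        },
    }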

Implementation Reference

  • Registration of the 'validate_schema' tool using the @mcp.tool decorator in the main server file. This thin wrapper delegates to the implementation in tools/validation.py.
    @mcp.tool
    async def validate_schema(
        session_id: str,
        schema: Dict[str, Dict[str, Any]],
        ctx: Context = None
    ) -> Dict[str, Any]:
        """Validate data against a schema definition."""
        return await _validate_schema(session_id, schema, ctx)
  • Core handler function implementing the validate_schema tool. Performs comprehensive schema validation on CSV data: checks column presence, types, nullability, ranges, patterns, allowed values, uniqueness, and string lengths. Returns detailed validation results including errors per column and summary statistics.
    async def validate_schema(
        session_id: str,
        schema: Dict[str, Dict[str, Any]],
        ctx: Context = None
    ) -> Dict[str, Any]:
        """
        Validate data against a schema definition.

        Args:
            session_id: Session identifier
            schema: Schema definition with column rules
                Example:
                {
                    "column_name": {
                        "type": "int",          # int, float, str, bool, datetime
                        "nullable": False,
                        "min": 0,
                        "max": 100,
                        "pattern": "^[A-Z]+$",
                        "values": ["A", "B", "C"],  # allowed values
                        "unique": True
                    }
                }
            ctx: FastMCP context

        Returns:
            Dict with validation results
        """
        try:
            manager = get_session_manager()
            session = manager.get_session(session_id)
            if not session or session.df is None:
                return {"success": False, "error": "Invalid session or no data loaded"}

            df = session.df
            validation_errors = {}
            validation_summary = {
                "total_columns": len(schema),
                "valid_columns": 0,
                "invalid_columns": 0,
                "missing_columns": [],
                "extra_columns": []
            }

            # Check for missing and extra columns
            schema_columns = set(schema.keys())
            df_columns = set(df.columns)
            validation_summary["missing_columns"] = list(schema_columns - df_columns)
            validation_summary["extra_columns"] = list(df_columns - schema_columns)

            # Validate each column in schema
            for col_name, rules in schema.items():
                if col_name not in df.columns:
                    validation_errors[col_name] = [{
                        "error": "column_missing",
                        "message": f"Column '{col_name}' not found in data"
                    }]
                    validation_summary["invalid_columns"] += 1
                    continue

                col_errors = []
                col_data = df[col_name]

                # Type validation
                expected_type = rules.get("type")
                if expected_type:
                    type_valid = False
                    if expected_type == "int":
                        type_valid = pd.api.types.is_integer_dtype(col_data)
                    elif expected_type == "float":
                        type_valid = pd.api.types.is_float_dtype(col_data)
                    elif expected_type == "str":
                        type_valid = pd.api.types.is_string_dtype(col_data) or col_data.dtype == object
                    elif expected_type == "bool":
                        type_valid = pd.api.types.is_bool_dtype(col_data)
                    elif expected_type == "datetime":
                        type_valid = pd.api.types.is_datetime64_any_dtype(col_data)
                    if not type_valid:
                        col_errors.append({
                            "error": "type_mismatch",
                            "message": f"Expected type '{expected_type}', got '{col_data.dtype}'",
                            "actual_type": str(col_data.dtype)
                        })

                # Nullable validation
                if not rules.get("nullable", True):
                    null_count = col_data.isna().sum()
                    if null_count > 0:
                        col_errors.append({
                            "error": "null_values",
                            "message": f"Column contains {null_count} null values",
                            "null_count": int(null_count),
                            "null_indices": df[col_data.isna()].index.tolist()[:100]
                        })

                # Min/Max validation for numeric columns
                if pd.api.types.is_numeric_dtype(col_data):
                    if "min" in rules:
                        min_val = rules["min"]
                        violations = col_data[col_data < min_val]
                        if len(violations) > 0:
                            col_errors.append({
                                "error": "min_violation",
                                "message": f"{len(violations)} values below minimum {min_val}",
                                "violation_count": len(violations),
                                "min_found": float(violations.min())
                            })
                    if "max" in rules:
                        max_val = rules["max"]
                        violations = col_data[col_data > max_val]
                        if len(violations) > 0:
                            col_errors.append({
                                "error": "max_violation",
                                "message": f"{len(violations)} values above maximum {max_val}",
                                "violation_count": len(violations),
                                "max_found": float(violations.max())
                            })

                # Pattern validation for string columns
                if "pattern" in rules and (col_data.dtype == object or pd.api.types.is_string_dtype(col_data)):
                    pattern = rules["pattern"]
                    try:
                        non_null = col_data.dropna()
                        if len(non_null) > 0:
                            matches = non_null.astype(str).str.match(pattern)
                            violations = non_null[~matches]
                            if len(violations) > 0:
                                col_errors.append({
                                    "error": "pattern_violation",
                                    "message": f"{len(violations)} values don't match pattern '{pattern}'",
                                    "violation_count": len(violations),
                                    "sample_violations": violations.head(10).tolist()
                                })
                    except Exception as e:
                        col_errors.append({
                            "error": "pattern_error",
                            "message": f"Invalid regex pattern: {str(e)}"
                        })

                # Allowed values validation
                if "values" in rules:
                    allowed = set(rules["values"])
                    actual = set(col_data.dropna().unique())
                    invalid = actual - allowed
                    if invalid:
                        col_errors.append({
                            "error": "invalid_values",
                            "message": f"Found {len(invalid)} invalid values",
                            "invalid_values": list(invalid)[:50]
                        })

                # Uniqueness validation
                if rules.get("unique", False):
                    duplicates = col_data.duplicated()
                    if duplicates.any():
                        col_errors.append({
                            "error": "duplicate_values",
                            "message": f"Column contains {duplicates.sum()} duplicate values",
                            "duplicate_count": int(duplicates.sum())
                        })

                # Length validation for strings
                if col_data.dtype == object or pd.api.types.is_string_dtype(col_data):
                    if "min_length" in rules:
                        min_len = rules["min_length"]
                        str_data = col_data.dropna().astype(str)
                        short = str_data[str_data.str.len() < min_len]
                        if len(short) > 0:
                            col_errors.append({
                                "error": "min_length_violation",
                                "message": f"{len(short)} values shorter than {min_len} characters",
                                "violation_count": len(short)
                            })
                    if "max_length" in rules:
                        max_len = rules["max_length"]
                        str_data = col_data.dropna().astype(str)
                        long = str_data[str_data.str.len() > max_len]
                        if len(long) > 0:
                            col_errors.append({
                                "error": "max_length_violation",
                                "message": f"{len(long)} values longer than {max_len} characters",
                                "violation_count": len(long)
                            })

                if col_errors:
                    validation_errors[col_name] = col_errors
                    validation_summary["invalid_columns"] += 1
                else:
                    validation_summary["valid_columns"] += 1

            is_valid = len(validation_errors) == 0 and len(validation_summary["missing_columns"]) == 0

            session.record_operation(OperationType.VALIDATE, {
                "type": "schema_validation",
                "is_valid": is_valid,
                "errors_count": len(validation_errors)
            })

            return {
                "success": True,
                "is_valid": is_valid,
                "summary": validation_summary,
                "validation_errors": validation_errors
            }

        except Exception as e:
            logger.error(f"Error validating schema: {str(e)}")
            return {"success": False, "error": str(e)}
  • Input schema definition documented in the function docstring, specifying the expected structure for the 'schema' parameter, including supported rules such as type, nullable, min/max, pattern, values, and unique (see the example schema after this list).
    """ Validate data against a schema definition. Args: session_id: Session identifier schema: Schema definition with column rules Example: { "column_name": { "type": "int", # int, float, str, bool, datetime "nullable": False, "min": 0, "max": 100, "pattern": "^[A-Z]+$", "values": ["A", "B", "C"], # allowed values "unique": True } } ctx: FastMCP context Returns: Dict with validation results """
