Skip to main content
Glama
ddl_operations.py14.6 kB
""" Tools seguras para operações DDL (Data Definition Language) no MCP Databases. Permite criação de tabelas com validação rigorosa de segurança. """ from mcp_databases.db.factory import get_db from mcp_databases.logger import MCPLogger from mcp_databases.utils import ensure_valid_conn_params from typing import Dict, Any, List import re logger = MCPLogger.get_logger("mcp_databases.ddl_operations") class DDLSecurityError(Exception): """Exceção para operações DDL inseguras.""" pass def create_table(params: Dict[str, Any]): """ Cria uma nova tabela com validação de segurança. Espera: { "db_type": "mssql|mysql|postgres", "conn_params": {...}, "table_name": "nome_da_tabela", "columns": [ {"name": "id", "type": "INT", "constraints": ["PRIMARY KEY", "AUTO_INCREMENT"]}, {"name": "name", "type": "VARCHAR(255)", "constraints": ["NOT NULL"]}, {"name": "email", "type": "VARCHAR(255)", "constraints": ["UNIQUE"]} ], "options": { "if_not_exists": true, # Opcional "engine": "InnoDB", # MySQL específico "charset": "utf8mb4" # MySQL específico } } """ try: db_type = params["db_type"] conn_params = params["conn_params"] table_name = params["table_name"] columns = params["columns"] options = params.get("options", {}) # Validação de segurança _validate_table_creation(table_name, columns, options) # Garante que conn_params seja válido conn_params = ensure_valid_conn_params(db_type, conn_params) # Gera o SQL de criação de forma segura create_sql = _generate_create_table_sql(db_type, table_name, columns, options) logger.info(f"Criando tabela {table_name} no banco {db_type}") logger.info(f"SQL gerado: {create_sql}") db = get_db(db_type, conn_params) # Executa usando método específico para DDL (sem validação de SELECT) result = _execute_ddl_safely(db, create_sql, "CREATE TABLE") logger.info(f"Tabela {table_name} criada com sucesso") return { "success": True, "message": f"Tabela '{table_name}' criada com sucesso", "table_name": table_name, "sql_executed": create_sql, "columns_created": len(columns) } except Exception as e: logger.error(f"Erro ao criar tabela: {e}", exc_info=True) raise def alter_table(params: Dict[str, Any]): """ Altera uma tabela existente com validação de segurança. Espera: { "db_type": "mssql|mysql|postgres", "conn_params": {...}, "table_name": "nome_da_tabela", "operation": "ADD_COLUMN|DROP_COLUMN|MODIFY_COLUMN|RENAME_COLUMN", "column_spec": { "name": "nova_coluna", "type": "VARCHAR(100)", "constraints": ["NOT NULL"], "position": "AFTER existing_column" # MySQL específico } } """ try: db_type = params["db_type"] conn_params = params["conn_params"] table_name = params["table_name"] operation = params["operation"] column_spec = params["column_spec"] # Validação de segurança _validate_table_alteration(table_name, operation, column_spec) # Garante que conn_params seja válido conn_params = ensure_valid_conn_params(db_type, conn_params) # Gera o SQL de alteração de forma segura alter_sql = _generate_alter_table_sql(db_type, table_name, operation, column_spec) logger.info(f"Alterando tabela {table_name} no banco {db_type}") logger.info(f"SQL gerado: {alter_sql}") db = get_db(db_type, conn_params) # Executa usando método específico para DDL result = _execute_ddl_safely(db, alter_sql, "ALTER TABLE") logger.info(f"Tabela {table_name} alterada com sucesso") return { "success": True, "message": f"Tabela '{table_name}' alterada com sucesso", "table_name": table_name, "operation": operation, "sql_executed": alter_sql } except Exception as e: logger.error(f"Erro ao alterar tabela: {e}", exc_info=True) raise def drop_table(params: Dict[str, Any]): """ Remove uma tabela com confirmação dupla de segurança. Espera: { "db_type": "mssql|mysql|postgres", "conn_params": {...}, "table_name": "nome_da_tabela", "confirmation": "DELETE_TABLE_nome_da_tabela", # Confirmação obrigatória "if_exists": true # Opcional } """ try: db_type = params["db_type"] conn_params = params["conn_params"] table_name = params["table_name"] confirmation = params["confirmation"] if_exists = params.get("if_exists", False) # Validação de segurança com confirmação dupla expected_confirmation = f"DELETE_TABLE_{table_name}" if confirmation != expected_confirmation: raise DDLSecurityError(f"Confirmação de segurança incorreta. Esperado: '{expected_confirmation}'") _validate_table_name(table_name) # Garante que conn_params seja válido conn_params = ensure_valid_conn_params(db_type, conn_params) # Gera o SQL de remoção de forma segura drop_sql = _generate_drop_table_sql(db_type, table_name, if_exists) logger.warning(f"REMOVENDO tabela {table_name} no banco {db_type}") logger.warning(f"SQL gerado: {drop_sql}") db = get_db(db_type, conn_params) # Executa usando método específico para DDL result = _execute_ddl_safely(db, drop_sql, "DROP TABLE") logger.warning(f"Tabela {table_name} REMOVIDA com sucesso") return { "success": True, "message": f"Tabela '{table_name}' removida com sucesso", "table_name": table_name, "sql_executed": drop_sql, "warning": "OPERAÇÃO DE REMOÇÃO EXECUTADA - DADOS PERDIDOS PERMANENTEMENTE" } except Exception as e: logger.error(f"Erro ao remover tabela: {e}", exc_info=True) raise def _validate_table_creation(table_name: str, columns: List[Dict], options: Dict) -> None: """Valida parâmetros para criação de tabela.""" _validate_table_name(table_name) if not columns or not isinstance(columns, list): raise DDLSecurityError("Lista de colunas é obrigatória e deve ser uma lista") for col in columns: if not isinstance(col, dict) or "name" not in col or "type" not in col: raise DDLSecurityError("Cada coluna deve ter 'name' e 'type'") _validate_column_name(col["name"]) _validate_column_type(col["type"]) if "constraints" in col: _validate_column_constraints(col["constraints"]) def _validate_table_alteration(table_name: str, operation: str, column_spec: Dict) -> None: """Valida parâmetros para alteração de tabela.""" _validate_table_name(table_name) allowed_operations = ["ADD_COLUMN", "DROP_COLUMN", "MODIFY_COLUMN", "RENAME_COLUMN"] if operation not in allowed_operations: raise DDLSecurityError(f"Operação não permitida: {operation}. Permitidas: {allowed_operations}") if "name" not in column_spec: raise DDLSecurityError("Nome da coluna é obrigatório") _validate_column_name(column_spec["name"]) if operation in ["ADD_COLUMN", "MODIFY_COLUMN"] and "type" not in column_spec: raise DDLSecurityError(f"Tipo da coluna é obrigatório para operação {operation}") def _validate_table_name(table_name: str) -> None: """Valida nome de tabela.""" if not table_name or not isinstance(table_name, str): raise DDLSecurityError("Nome da tabela é obrigatório") # Apenas caracteres alfanuméricos e underscore if not re.match(r'^[a-zA-Z][a-zA-Z0-9_]*$', table_name): raise DDLSecurityError("Nome da tabela deve começar com letra e conter apenas letras, números e underscore") # Evita nomes reservados perigosos dangerous_names = ['admin', 'root', 'sys', 'system', 'master', 'information_schema'] if table_name.lower() in dangerous_names: raise DDLSecurityError(f"Nome de tabela '{table_name}' não é permitido por segurança") def _validate_column_name(column_name: str) -> None: """Valida nome de coluna.""" if not column_name or not isinstance(column_name, str): raise DDLSecurityError("Nome da coluna é obrigatório") # Apenas caracteres alfanuméricos e underscore if not re.match(r'^[a-zA-Z][a-zA-Z0-9_]*$', column_name): raise DDLSecurityError("Nome da coluna deve começar com letra e conter apenas letras, números e underscore") def _validate_column_type(column_type: str) -> None: """Valida tipo de coluna.""" if not column_type or not isinstance(column_type, str): raise DDLSecurityError("Tipo da coluna é obrigatório") # Lista de tipos permitidos (pode ser expandida) allowed_types_patterns = [ r'^INT(\(\d+\))?$', r'^VARCHAR\(\d+\)$', r'^CHAR\(\d+\)$', r'^TEXT$', r'^DECIMAL\(\d+,\d+\)$', r'^FLOAT(\(\d+,\d+\))?$', r'^DOUBLE(\(\d+,\d+\))?$', r'^DATE$', r'^DATETIME$', r'^TIMESTAMP$', r'^BOOLEAN$', r'^BOOL$', r'^TINYINT(\(\d+\))?$', r'^BIGINT(\(\d+\))?$', r'^MEDIUMTEXT$', r'^LONGTEXT$' ] column_type_upper = column_type.upper() if not any(re.match(pattern, column_type_upper) for pattern in allowed_types_patterns): raise DDLSecurityError(f"Tipo de coluna não permitido: {column_type}") def _validate_column_constraints(constraints: List[str]) -> None: """Valida constraints de coluna.""" if not isinstance(constraints, list): raise DDLSecurityError("Constraints devem ser uma lista") allowed_constraints = [ 'NOT NULL', 'NULL', 'PRIMARY KEY', 'UNIQUE', 'AUTO_INCREMENT', 'DEFAULT', 'CHECK', 'FOREIGN KEY', 'INDEX' ] for constraint in constraints: if not any(constraint.upper().startswith(allowed) for allowed in allowed_constraints): raise DDLSecurityError(f"Constraint não permitida: {constraint}") def _generate_create_table_sql(db_type: str, table_name: str, columns: List[Dict], options: Dict) -> str: """Gera SQL seguro para criação de tabela.""" # Escapar nome da tabela escaped_table = _escape_identifier(table_name, db_type) # Construir colunas column_definitions = [] for col in columns: col_def = f"{_escape_identifier(col['name'], db_type)} {col['type']}" if "constraints" in col: col_def += " " + " ".join(col["constraints"]) column_definitions.append(col_def) # Construir SQL base sql = f"CREATE TABLE" if options.get("if_not_exists"): sql += " IF NOT EXISTS" sql += f" {escaped_table} (\n " + ",\n ".join(column_definitions) + "\n)" # Opções específicas por banco if db_type.lower() == "mysql": if options.get("engine"): sql += f" ENGINE={options['engine']}" if options.get("charset"): sql += f" CHARACTER SET {options['charset']}" return sql def _generate_alter_table_sql(db_type: str, table_name: str, operation: str, column_spec: Dict) -> str: """Gera SQL seguro para alteração de tabela.""" escaped_table = _escape_identifier(table_name, db_type) escaped_column = _escape_identifier(column_spec["name"], db_type) sql = f"ALTER TABLE {escaped_table}" if operation == "ADD_COLUMN": sql += f" ADD COLUMN {escaped_column} {column_spec['type']}" if "constraints" in column_spec: sql += " " + " ".join(column_spec["constraints"]) if "position" in column_spec and db_type.lower() == "mysql": sql += f" {column_spec['position']}" elif operation == "DROP_COLUMN": sql += f" DROP COLUMN {escaped_column}" elif operation == "MODIFY_COLUMN": if db_type.lower() == "mysql": sql += f" MODIFY COLUMN {escaped_column} {column_spec['type']}" else: sql += f" ALTER COLUMN {escaped_column} TYPE {column_spec['type']}" elif operation == "RENAME_COLUMN": new_name = _escape_identifier(column_spec.get("new_name", ""), db_type) if db_type.lower() == "mysql": sql += f" CHANGE COLUMN {escaped_column} {new_name} {column_spec['type']}" else: sql += f" RENAME COLUMN {escaped_column} TO {new_name}" return sql def _generate_drop_table_sql(db_type: str, table_name: str, if_exists: bool) -> str: """Gera SQL seguro para remoção de tabela.""" escaped_table = _escape_identifier(table_name, db_type) sql = "DROP TABLE" if if_exists: sql += " IF EXISTS" sql += f" {escaped_table}" return sql def _escape_identifier(identifier: str, db_type: str) -> str: """Escapar identificadores SQL de forma segura.""" if db_type.lower() == "mysql": return f"`{identifier}`" elif db_type.lower() == "postgres": return f'"{identifier}"' elif db_type.lower() == "mssql": return f"[{identifier}]" else: return identifier def _execute_ddl_safely(db, sql: str, operation_type: str): """Executa DDL de forma segura sem passar pela validação de SELECT.""" import psycopg2 import mysql.connector import pyodbc try: if hasattr(db, '_connect'): conn = db._connect() cursor = conn.cursor() cursor.execute(sql) conn.commit() if hasattr(conn, 'close'): conn.close() return {"status": "success", "operation": operation_type} else: raise Exception("Método de conexão não encontrado") except Exception as e: logger.error(f"Erro ao executar {operation_type}: {e}") raise

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fean-developer/mcp-databases'

If you have feedback or need assistance with the MCP directory API, please join our Discord server