from typing import Dict, Any
from .base_tool import BaseTool
from src.database.connection import db_manager
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class DeleteTool(BaseTool):
    """Tool that deletes rows from a table, guarded by several safety checks.

    A delete is only attempted when the caller supplies a table name, a
    non-trivial WHERE clause and an explicit ``confirm`` flag.  The WHERE
    clause itself is interpolated into the statement verbatim, so it must
    come from a trusted source; values belong in ``where_params`` so the
    database driver can bind them.
    """

    # Predicates that match every row, in normalized form (lower-cased, all
    # whitespace removed, trailing semicolons stripped).  This is a heuristic
    # against accidental full-table deletes, not a complete SQL analysis.
    _UNSAFE_WHERE_CLAUSES = frozenset({'1=1', 'true', '1', 'where'})

    def __init__(self):
        """Register the tool under the name ``delete_data``."""
        super().__init__(
            name="delete_data",
            description="Delete data from a table with WHERE clause"
        )

    @staticmethod
    def _is_valid_table_name(table_name) -> bool:
        """Return True if *table_name* is a string of alphanumerics, ``_`` or ``-``."""
        if not isinstance(table_name, str):
            return False
        # Rejecting everything else (notably backticks) keeps the quoted
        # identifier in the generated statement from being broken out of.
        return table_name.replace('_', '').replace('-', '').isalnum()

    @classmethod
    def _is_unsafe_where(cls, where_clause: str) -> bool:
        """Return True if *where_clause* is a known match-everything predicate."""
        # Drop all whitespace and trailing semicolons so that variants such
        # as "1 = 1" or "TRUE ;" are caught as well as the exact spellings.
        normalized = ''.join(where_clause.lower().split()).rstrip(';')
        return normalized in cls._UNSAFE_WHERE_CLAUSES

    @staticmethod
    def _normalize_params(where_params) -> tuple:
        """Return *where_params* as a tuple of bind values.

        ``None`` means "no parameters".  A list or tuple is used as given.
        Any other value is treated as a single parameter, including falsy
        values such as ``0``, ``False`` or ``""`` which are legitimate
        bind values and must not be dropped.
        """
        if where_params is None:
            return ()
        if isinstance(where_params, (list, tuple)):
            return tuple(where_params)
        return (where_params,)

    def execute(self, **kwargs) -> Dict[str, Any]:
        """Validate the request and delete the matching rows.

        Keyword Args:
            table_name: Name of the table to delete from (required).
            where_clause: SQL condition placed after ``WHERE`` (required).
            where_params: Bind value(s) for placeholders in ``where_clause``;
                a list/tuple, a single scalar, or ``None``/omitted for none.
            confirm: Must be truthy, otherwise the delete is refused.

        Returns:
            The result of ``self.format_response``: a failure response with
            an error message when validation or execution fails, otherwise a
            success response carrying ``table_name``, ``deleted_rows`` (the
            value returned by ``db_manager.execute_update``) and
            ``where_clause``.
        """
        try:
            table_name = kwargs.get('table_name')
            where_clause = kwargs.get('where_clause')
            where_params = kwargs.get('where_params', [])
            confirm = kwargs.get('confirm', False)  # Safety confirmation

            # Validate required parameters
            if not table_name:
                return self.format_response(False, error="table_name is required")
            # A whitespace-only clause would yield "DELETE ... WHERE <nothing>",
            # so it is treated the same as a missing one.
            if not where_clause or (isinstance(where_clause, str) and not where_clause.strip()):
                return self.format_response(False, error="where_clause is required for safety")
            if not confirm:
                return self.format_response(False, error="confirm=True is required for delete operations")

            # Validate table name
            if not self._is_valid_table_name(table_name):
                return self.format_response(False, error="Invalid table name")

            # Additional safety checks
            if not isinstance(where_clause, str):
                return self.format_response(False, error="where_clause must be a string")
            if self._is_unsafe_where(where_clause):
                return self.format_response(False, error="Unsafe WHERE clause detected")

            # Build DELETE query; the table name has been validated above and
            # values are bound separately through the parameters.
            query = f"DELETE FROM `{table_name}` WHERE {where_clause}"

            # Prepare parameters
            params = self._normalize_params(where_params)

            logger.warning("DELETING data from %s with WHERE: %s", table_name, where_clause)

            # Execute delete; None is passed when there is nothing to bind.
            affected_rows = db_manager.execute_update(query, params or None)

            result = {
                'table_name': table_name,
                'deleted_rows': affected_rows,
                'where_clause': where_clause
            }
            logger.warning("Successfully deleted %s rows from %s", affected_rows, table_name)
            return self.format_response(True, result)
        except Exception as e:
            # Boundary handler: report the failure to the caller instead of
            # propagating, but keep the traceback in the log.
            logger.exception("Delete tool execution failed: %s", e)
            return self.format_response(False, error=str(e))