from typing import Dict, Any
from .base_tool import BaseTool
from src.database.query_executor import query_executor
import logging
logger = logging.getLogger(__name__)
class SecurityStatusTool(BaseTool):
def __init__(self):
super().__init__(
name="check_security_status",
description="Check database security status and user privileges"
)
def execute(self, **kwargs) -> Dict[str, Any]:
try:
# Get current user info
user_info_query = "SELECT USER() as current_user, DATABASE() as current_database"
user_info = query_executor.db.execute_query(user_info_query)[0]
# Get user privileges
privileges_query = "SHOW GRANTS FOR CURRENT_USER()"
try:
privileges = query_executor.db.execute_query(privileges_query)
grant_statements = [list(grant.values())[0] for grant in privileges]
except:
grant_statements = ["Unable to retrieve grant information"]
# Check if user has full access to current database
has_full_access = any(
'ALL PRIVILEGES' in grant or
f'ON `{user_info["current_database"]}`.*' in grant or
'ON *.*' in grant
for grant in grant_statements
)
# Get tables user can access (basic check)
accessible_tables = []
try:
tables = query_executor.get_tables()
for table in tables:
try:
# Try to select from table to check access
query_executor.db.execute_query(f"SELECT 1 FROM `{table['table_name']}` LIMIT 1")
accessible_tables.append({
'table_name': table['table_name'],
'access_level': 'READ_ACCESS'
})
except:
pass
except Exception as e:
logger.warning(f"Could not check table access: {str(e)}")
result = {
'current_user': user_info['current_user'],
'current_database': user_info['current_database'],
'has_full_database_access': has_full_access,
'grant_statements': grant_statements,
'accessible_tables_count': len(accessible_tables),
'accessible_tables': accessible_tables[:10], # Show first 10
'security_warnings': []
}
# Add security warnings
if has_full_access:
result['security_warnings'].append("User has full database access")
if 'ON *.*' in str(grant_statements):
result['security_warnings'].append("User has global privileges")
logger.info("Security status check completed")
return self.format_response(True, result)
except Exception as e:
logger.error(f"Security status tool failed: {str(e)}")
return self.format_response(False, error=str(e))