MySQL-Performance-Tuner-Mcp

test_tools.py
""" Unit tests for all tool handlers. """ import pytest import json from unittest.mock import AsyncMock, MagicMock from mysqltuner_mcp.tools import ( # Base handler ToolHandler, # Original tools (performance, index, health) GetSlowQueriesToolHandler, AnalyzeQueryToolHandler, TableStatsToolHandler, IndexRecommendationsToolHandler, UnusedIndexesToolHandler, IndexStatsToolHandler, DatabaseHealthToolHandler, ActiveQueriesToolHandler, SettingsReviewToolHandler, WaitEventsToolHandler, # InnoDB tools InnoDBStatusToolHandler, InnoDBBufferPoolToolHandler, InnoDBTransactionsToolHandler, # Statement tools StatementAnalysisToolHandler, StatementsTempTablesToolHandler, StatementsSortingToolHandler, StatementsFullScansToolHandler, StatementErrorsToolHandler, # Memory tools MemoryCalculationsToolHandler, MemoryByHostToolHandler, TableMemoryUsageToolHandler, # Engine tools StorageEngineAnalysisToolHandler, FragmentedTablesToolHandler, AutoIncrementAnalysisToolHandler, # Replication tools ReplicationStatusToolHandler, GaleraClusterToolHandler, GroupReplicationToolHandler, # Security tools SecurityAnalysisToolHandler, UserPrivilegesToolHandler, AuditLogToolHandler, ) def create_mock_sql_driver(): """Create a mock SQL driver for testing.""" driver = MagicMock() driver.execute_query = AsyncMock(return_value=[]) driver.execute_one = AsyncMock(return_value={}) driver.execute_scalar = AsyncMock(return_value=None) driver.get_server_status = AsyncMock(return_value={}) driver.get_server_variables = AsyncMock(return_value={}) return driver # ============================================================================= # Base Tool Handler Tests # ============================================================================= class TestToolHandlerBase: """Tests for the base ToolHandler class.""" def test_format_json_result(self): """Test JSON result formatting.""" driver = create_mock_sql_driver() handler = GetSlowQueriesToolHandler(driver) result = handler.format_json_result({"key": "value", "number": 42}) assert len(result) == 1 assert result[0].type == "text" # Parse the JSON to verify it's valid parsed = json.loads(result[0].text) assert parsed["key"] == "value" assert parsed["number"] == 42 def test_format_error(self): """Test error formatting.""" driver = create_mock_sql_driver() handler = GetSlowQueriesToolHandler(driver) result = handler.format_error(ValueError("Test error")) assert len(result) == 1 assert result[0].type == "text" assert "error" in result[0].text.lower() assert "Test error" in result[0].text def test_validate_required_args_success(self): """Test required args validation success.""" driver = create_mock_sql_driver() handler = GetSlowQueriesToolHandler(driver) # Should not raise handler.validate_required_args( {"arg1": "value1", "arg2": "value2"}, ["arg1", "arg2"] ) def test_validate_required_args_failure(self): """Test required args validation failure.""" driver = create_mock_sql_driver() handler = GetSlowQueriesToolHandler(driver) with pytest.raises(ValueError, match="Missing required"): handler.validate_required_args( {"arg1": "value1"}, ["arg1", "arg2", "arg3"] ) def test_get_annotations(self): """Test annotation generation.""" driver = create_mock_sql_driver() handler = GetSlowQueriesToolHandler(driver) annotations = handler.get_annotations() assert annotations["title"] == handler.title assert annotations["readOnlyHint"] == handler.read_only_hint assert annotations["destructiveHint"] == handler.destructive_hint # ============================================================================= # 
Performance Tool Tests (Original) # ============================================================================= class TestGetSlowQueriesToolHandler: """Tests for GetSlowQueriesToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = GetSlowQueriesToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "get_slow_queries" assert "slow" in definition.description.lower() assert "inputSchema" in dir(definition) or hasattr(definition, "inputSchema") @pytest.mark.asyncio async def test_run_tool(self): """Test running the tool.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(return_value=[ { "query_text": "SELECT * FROM users WHERE id = ?", "schema_name": "testdb", "exec_count": 100, "total_time_sec": 5.5, "avg_time_sec": 0.055, "max_time_sec": 0.2, "rows_examined": 10000, "rows_sent": 100, "rows_affected": 0, "full_scans": 0, "no_good_index": 0, "tmp_tables": 0, "tmp_disk_tables": 0, "full_joins": 0, "sort_rows": 0, "first_seen": "2024-01-01", "last_seen": "2024-06-01" } ]) handler = GetSlowQueriesToolHandler(driver) result = await handler.run_tool({"limit": 10}) assert len(result) == 1 assert result[0].type == "text" parsed = json.loads(result[0].text) assert "queries" in parsed assert len(parsed["queries"]) == 1 class TestAnalyzeQueryToolHandler: """Tests for AnalyzeQueryToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = AnalyzeQueryToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "analyze_query" assert "query" in definition.inputSchema["properties"] @pytest.mark.asyncio async def test_run_tool_explain(self): """Test running EXPLAIN.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(return_value=[ { "id": 1, "select_type": "SIMPLE", "table": "users", "type": "ref", "possible_keys": "idx_email", "key": "idx_email", "key_len": "767", "ref": "const", "rows": 1, "Extra": "Using index" } ]) handler = AnalyzeQueryToolHandler(driver) result = await handler.run_tool({ "query": "SELECT * FROM users WHERE email = 'test@test.com'", "format": "traditional" }) assert len(result) == 1 parsed = json.loads(result[0].text) assert "plan" in parsed @pytest.mark.asyncio async def test_run_tool_missing_query(self): """Test error when query is missing.""" driver = create_mock_sql_driver() handler = AnalyzeQueryToolHandler(driver) result = await handler.run_tool({}) assert len(result) == 1 assert "error" in result[0].text.lower() class TestTableStatsToolHandler: """Tests for TableStatsToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = TableStatsToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "get_table_stats" @pytest.mark.asyncio async def test_run_tool(self): """Test running table stats.""" driver = create_mock_sql_driver() driver.execute_scalar = AsyncMock(return_value="testdb") driver.execute_query = AsyncMock(return_value=[ { "TABLE_NAME": "users", "TABLE_TYPE": "BASE TABLE", "ENGINE": "InnoDB", "ROW_FORMAT": "Dynamic", "TABLE_ROWS": 1000, "AVG_ROW_LENGTH": 100, "DATA_LENGTH": 10485760, "INDEX_LENGTH": 2621440, "DATA_FREE": 0, "AUTO_INCREMENT": 1001, "CREATE_TIME": "2024-01-01 00:00:00", "UPDATE_TIME": "2024-06-01 00:00:00", "TABLE_COLLATION": "utf8mb4_general_ci" } ]) handler = TableStatsToolHandler(driver) # Set include_indexes=False to avoid secondary query 
for index data result = await handler.run_tool({"include_indexes": False}) assert len(result) == 1 parsed = json.loads(result[0].text) assert "tables" in parsed # ============================================================================= # Index Tool Tests (Original) # ============================================================================= class TestIndexRecommendationsToolHandler: """Tests for IndexRecommendationsToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = IndexRecommendationsToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "get_index_recommendations" def test_extract_table_name(self): """Test table name extraction.""" driver = create_mock_sql_driver() handler = IndexRecommendationsToolHandler(driver) assert handler._extract_table_name("SELECT * FROM users") == "users" assert handler._extract_table_name("UPDATE orders SET status = 1") == "orders" assert handler._extract_table_name("DELETE FROM logs WHERE id > 100") == "logs" def test_extract_where_columns(self): """Test WHERE column extraction.""" driver = create_mock_sql_driver() handler = IndexRecommendationsToolHandler(driver) columns = handler._extract_where_columns( "SELECT * FROM users WHERE email = 'test' AND status = 1" ) assert "email" in columns assert "status" in columns @pytest.mark.asyncio async def test_run_tool(self): """Test running index recommendations.""" driver = create_mock_sql_driver() driver.execute_scalar = AsyncMock(return_value="testdb") driver.execute_query = AsyncMock(return_value=[]) handler = IndexRecommendationsToolHandler(driver) result = await handler.run_tool({}) assert len(result) == 1 parsed = json.loads(result[0].text) assert "recommendations" in parsed class TestUnusedIndexesToolHandler: """Tests for UnusedIndexesToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = UnusedIndexesToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "find_unused_indexes" @pytest.mark.asyncio async def test_run_tool(self): """Test running unused index finder.""" driver = create_mock_sql_driver() driver.execute_scalar = AsyncMock(return_value="testdb") driver.execute_query = AsyncMock(return_value=[]) handler = UnusedIndexesToolHandler(driver) result = await handler.run_tool({}) assert len(result) == 1 parsed = json.loads(result[0].text) assert "unused_indexes" in parsed assert "summary" in parsed class TestIndexStatsToolHandler: """Tests for IndexStatsToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = IndexStatsToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "get_index_stats" @pytest.mark.asyncio async def test_run_tool(self): """Test running index stats.""" driver = create_mock_sql_driver() driver.execute_scalar = AsyncMock(return_value="testdb") driver.execute_query = AsyncMock(return_value=[ { "TABLE_NAME": "users", "INDEX_NAME": "PRIMARY", "NON_UNIQUE": 0, "INDEX_TYPE": "BTREE", "columns": "id", "cardinality": 1000, "TABLE_ROWS": 1000, "read_count": 5000, "write_count": 100, "read_time_ms": 50.0, "write_time_ms": 10.0 } ]) handler = IndexStatsToolHandler(driver) result = await handler.run_tool({}) assert len(result) == 1 parsed = json.loads(result[0].text) assert "indexes" in parsed # ============================================================================= # Health Tool Tests 
(Original) # ============================================================================= class TestDatabaseHealthToolHandler: """Tests for DatabaseHealthToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = DatabaseHealthToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "check_database_health" @pytest.mark.asyncio async def test_run_tool_healthy(self): """Test health check on healthy database.""" driver = create_mock_sql_driver() driver.get_server_status = AsyncMock(return_value={ "Threads_connected": "10", "Threads_running": "2", "Innodb_buffer_pool_reads": "100", "Innodb_buffer_pool_read_requests": "100000", "Questions": "1000000", "Slow_queries": "10", "Handler_read_rnd_next": "1000", "Handler_read_rnd": "100", "Com_select": "50000", "Created_tmp_tables": "1000", "Created_tmp_disk_tables": "10", "Threads_created": "50", "Connections": "1000", "Uptime": "86400", "Key_reads": "100", "Key_read_requests": "10000" }) driver.get_server_variables = AsyncMock(return_value={ "max_connections": "151", "innodb_buffer_pool_size": "134217728" }) handler = DatabaseHealthToolHandler(driver) result = await handler.run_tool({}) assert len(result) == 1 parsed = json.loads(result[0].text) assert "health_score" in parsed assert "status" in parsed assert "checks" in parsed @pytest.mark.asyncio async def test_run_tool_with_issues(self): """Test health check that detects issues.""" driver = create_mock_sql_driver() driver.get_server_status = AsyncMock(return_value={ "Threads_connected": "140", # High connection usage "Threads_running": "50", "Innodb_buffer_pool_reads": "50000", # Low hit ratio "Innodb_buffer_pool_read_requests": "100000", "Questions": "1000", "Slow_queries": "100", # High slow query % "Handler_read_rnd_next": "1000000", "Handler_read_rnd": "100", "Com_select": "100", "Created_tmp_tables": "100", "Created_tmp_disk_tables": "50", # High disk temp usage "Threads_created": "900", # Low thread cache hit "Connections": "1000", "Uptime": "86400", "Key_reads": "100", "Key_read_requests": "10000" }) driver.get_server_variables = AsyncMock(return_value={ "max_connections": "151", "innodb_buffer_pool_size": "134217728" }) handler = DatabaseHealthToolHandler(driver) result = await handler.run_tool({"include_recommendations": True}) parsed = json.loads(result[0].text) assert parsed["health_score"] < 100 assert len(parsed["issues"]) > 0 assert len(parsed["recommendations"]) > 0 class TestActiveQueriesToolHandler: """Tests for ActiveQueriesToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = ActiveQueriesToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "get_active_queries" @pytest.mark.asyncio async def test_run_tool(self): """Test active query monitoring.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(return_value=[ { "process_id": 1, "user": "root", "host": "localhost", "database_name": "testdb", "command": "Query", "duration_sec": 5, "state": "executing", "query": "SELECT * FROM large_table" } ]) handler = ActiveQueriesToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert "queries" in parsed assert "summary" in parsed class TestSettingsReviewToolHandler: """Tests for SettingsReviewToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = 
SettingsReviewToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "review_settings" @pytest.mark.asyncio async def test_run_tool(self): """Test settings review.""" driver = create_mock_sql_driver() driver.get_server_variables = AsyncMock(return_value={ "innodb_buffer_pool_size": "134217728", "max_connections": "151", "thread_cache_size": "8", "slow_query_log": "ON", "long_query_time": "2" }) handler = SettingsReviewToolHandler(driver) result = await handler.run_tool({"category": "all"}) parsed = json.loads(result[0].text) assert "settings" in parsed assert "recommendations" in parsed class TestWaitEventsToolHandler: """Tests for WaitEventsToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = WaitEventsToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "analyze_wait_events" @pytest.mark.asyncio async def test_run_tool(self): """Test wait events analysis.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(return_value=[ { "EVENT_NAME": "wait/io/file/innodb/innodb_data_file", "total_count": 1000, "total_wait_sec": 5.5, "avg_wait_ms": 5.5, "max_wait_ms": 100.0 } ]) handler = WaitEventsToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert "wait_events" in parsed assert "summary" in parsed # ============================================================================= # InnoDB Tool Tests # ============================================================================= class TestInnoDBStatusToolHandler: """Tests for InnoDBStatusToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = InnoDBStatusToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "get_innodb_status" assert "innodb" in definition.description.lower() assert "include_raw_output" in definition.inputSchema["properties"] assert "detailed_analysis" in definition.inputSchema["properties"] @pytest.mark.asyncio async def test_run_tool_basic(self): """Test running the InnoDB status tool.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(return_value=[ {"Status": "INNODB STATUS OUTPUT\nBuffer pool size 16384\n"} ]) driver.get_server_variables = AsyncMock(return_value={ "innodb_buffer_pool_size": "134217728", "innodb_buffer_pool_instances": "1", "innodb_log_file_size": "50331648", "innodb_log_files_in_group": "2", "innodb_log_buffer_size": "16777216", "innodb_file_per_table": "ON", "innodb_flush_log_at_trx_commit": "1" }) driver.get_server_status = AsyncMock(return_value={ "Innodb_buffer_pool_pages_total": "8192", "Innodb_buffer_pool_pages_free": "1000", "Innodb_buffer_pool_pages_data": "7000", "Innodb_buffer_pool_pages_dirty": "100", "Innodb_buffer_pool_read_requests": "100000", "Innodb_buffer_pool_reads": "100", "Innodb_log_waits": "0", "Innodb_log_writes": "1000", "Innodb_rows_read": "50000", "Innodb_rows_inserted": "1000", "Innodb_data_read": "104857600", "Innodb_data_written": "52428800" }) handler = InnoDBStatusToolHandler(driver) result = await handler.run_tool({"detailed_analysis": False}) assert len(result) == 1 parsed = json.loads(result[0].text) assert "buffer_pool" in parsed assert "log_info" in parsed assert "recommendations" in parsed @pytest.mark.asyncio async def test_run_tool_with_low_hit_ratio(self): """Test InnoDB status detects low buffer pool hit ratio.""" driver = create_mock_sql_driver() 
driver.execute_query = AsyncMock(return_value=[{"Status": ""}]) driver.get_server_variables = AsyncMock(return_value={ "innodb_buffer_pool_size": "134217728", "innodb_buffer_pool_instances": "1", "innodb_file_per_table": "ON", "innodb_flush_log_at_trx_commit": "1" }) driver.get_server_status = AsyncMock(return_value={ "Innodb_buffer_pool_pages_total": "8192", "Innodb_buffer_pool_pages_free": "100", "Innodb_buffer_pool_read_requests": "1000", "Innodb_buffer_pool_reads": "500", # 50% miss rate = 50% hit ratio }) handler = InnoDBStatusToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) # Should detect low hit ratio assert any("hit ratio" in issue.lower() or "buffer pool" in issue.lower() for issue in parsed.get("issues", []) + parsed.get("recommendations", [])) @pytest.mark.asyncio async def test_parse_deadlock_info(self): """Test parsing deadlock information from status.""" driver = create_mock_sql_driver() status_with_deadlock = """ ------------------------ LATEST DETECTED DEADLOCK ------------------------ 2024-01-15 10:30:00 *** (1) TRANSACTION: TRANSACTION 12345, ACTIVE 5 sec *** (2) TRANSACTION: TRANSACTION 12346, ACTIVE 3 sec ------------------------ TRANSACTIONS """ driver.execute_query = AsyncMock(return_value=[{"Status": status_with_deadlock}]) driver.get_server_variables = AsyncMock(return_value={ "innodb_buffer_pool_size": "134217728", "innodb_file_per_table": "ON", "innodb_flush_log_at_trx_commit": "1" }) driver.get_server_status = AsyncMock(return_value={ "Innodb_buffer_pool_pages_total": "8192", "Innodb_buffer_pool_pages_free": "1000", "Innodb_buffer_pool_read_requests": "100000", "Innodb_buffer_pool_reads": "100" }) handler = InnoDBStatusToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert parsed["deadlock_info"]["has_deadlock"] is True class TestInnoDBBufferPoolToolHandler: """Tests for InnoDBBufferPoolToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = InnoDBBufferPoolToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "analyze_buffer_pool" assert "by_schema" in definition.inputSchema["properties"] assert "by_table" in definition.inputSchema["properties"] @pytest.mark.asyncio async def test_run_tool(self): """Test running buffer pool analysis.""" driver = create_mock_sql_driver() driver.get_server_variables = AsyncMock(return_value={ "innodb_buffer_pool_size": "1073741824" # 1GB }) driver.get_server_status = AsyncMock(return_value={ "Innodb_buffer_pool_pages_total": "65536", "Innodb_buffer_pool_pages_free": "1000", "Innodb_buffer_pool_pages_data": "60000", "Innodb_buffer_pool_pages_dirty": "500", "Innodb_buffer_pool_pages_misc": "4036", "Innodb_buffer_pool_read_requests": "1000000", "Innodb_buffer_pool_reads": "100" }) handler = InnoDBBufferPoolToolHandler(driver) result = await handler.run_tool({"by_schema": False, "by_table": False}) parsed = json.loads(result[0].text) assert "buffer_pool_summary" in parsed assert parsed["buffer_pool_summary"]["size_gb"] == 1.0 assert parsed["buffer_pool_summary"]["hit_ratio_pct"] > 99 class TestInnoDBTransactionsToolHandler: """Tests for InnoDBTransactionsToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = InnoDBTransactionsToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "analyze_innodb_transactions" assert "include_queries" in 
definition.inputSchema["properties"] @pytest.mark.asyncio async def test_run_tool_with_transactions(self): """Test transaction analysis with active transactions.""" driver = create_mock_sql_driver() driver.execute_scalar = AsyncMock(return_value="REPEATABLE-READ") driver.execute_query = AsyncMock(side_effect=[ # First call: active transactions [ { "trx_id": "12345", "trx_state": "RUNNING", "trx_started": "2024-01-15 10:00:00", "duration_sec": 120, "trx_requested_lock_id": None, "trx_wait_started": None, "trx_weight": 10, "trx_mysql_thread_id": 100, "trx_query": "SELECT * FROM large_table", "trx_operation_state": "fetching rows", "trx_tables_in_use": 1, "trx_tables_locked": 0, "trx_lock_structs": 0, "trx_rows_locked": 0, "trx_rows_modified": 0 } ], # Second call: lock waits [] ]) driver.get_server_status = AsyncMock(return_value={ "Innodb_history_list_length": "100" }) handler = InnoDBTransactionsToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert parsed["transaction_summary"]["total_active"] == 1 assert parsed["transaction_summary"]["isolation_level"] == "REPEATABLE-READ" assert len(parsed["active_transactions"]) == 1 assert parsed["active_transactions"][0]["is_long_running"] is True # ============================================================================= # Statement Tool Tests # ============================================================================= class TestStatementAnalysisToolHandler: """Tests for StatementAnalysisToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = StatementAnalysisToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "analyze_statements" assert "order_by" in definition.inputSchema["properties"] assert "limit" in definition.inputSchema["properties"] @pytest.mark.asyncio async def test_run_tool_with_performance_schema(self): """Test statement analysis with performance_schema.""" driver = create_mock_sql_driver() driver.execute_scalar = AsyncMock(return_value="1") # performance_schema enabled driver.execute_query = AsyncMock(return_value=[ { "query": "SELECT * FROM users WHERE id = ?", "db": "testdb", "full_scan": "", "exec_count": 1000, "total_latency": "5.00 s", "avg_latency": "5.00 ms", "rows_sent": 1000, "rows_sent_avg": 1, "rows_examined": 50000, "rows_examined_avg": 50 } ]) handler = StatementAnalysisToolHandler(driver) result = await handler.run_tool({"limit": 10}) parsed = json.loads(result[0].text) assert "statements" in parsed assert len(parsed["statements"]) == 1 assert parsed["statements"][0]["exec_count"] == 1000 @pytest.mark.asyncio async def test_run_tool_disabled_performance_schema(self): """Test behavior when performance_schema is disabled.""" driver = create_mock_sql_driver() driver.execute_scalar = AsyncMock(return_value="0") handler = StatementAnalysisToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert "error" in parsed assert "performance_schema" in parsed["error"] class TestStatementsTempTablesToolHandler: """Tests for StatementsTempTablesToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = StatementsTempTablesToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "get_statements_with_temp_tables" assert "disk_only" in definition.inputSchema["properties"] @pytest.mark.asyncio async def test_run_tool(self): """Test statements with temp tables 
analysis.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(return_value=[ { "query": "SELECT * FROM users GROUP BY name", "db": "testdb", "exec_count": 100, "total_latency": "10.00 s", "memory_tmp_tables": 100, "disk_tmp_tables": 50, "avg_tmp_tables_per_query": 1.5 } ]) handler = StatementsTempTablesToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert "statements" in parsed assert parsed["summary"]["total_disk_tmp_tables"] == 50 assert len(parsed["recommendations"]) > 0 class TestStatementsSortingToolHandler: """Tests for StatementsSortingToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = StatementsSortingToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "get_statements_with_sorting" @pytest.mark.asyncio async def test_run_tool(self): """Test statements with sorting analysis.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(return_value=[ { "query": "SELECT * FROM users ORDER BY name", "db": "testdb", "exec_count": 500, "total_latency": "30.00 s", "sort_merge_passes": 100, "avg_sort_merges": 0.2, "sorts_using_scans": 400, "sort_using_range": 100, "rows_sorted": 50000, "avg_rows_sorted": 100 } ]) handler = StatementsSortingToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert parsed["summary"]["total_sort_merge_passes"] == 100 class TestStatementsFullScansToolHandler: """Tests for StatementsFullScansToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = StatementsFullScansToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "get_statements_with_full_scans" @pytest.mark.asyncio async def test_run_tool(self): """Test full table scan analysis.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(return_value=[ { "query": "SELECT * FROM users WHERE status = 1", "db": "testdb", "exec_count": 1000, "total_latency": "60.00 s", "no_index_used_count": 1000, "no_good_index_used_count": 0, "no_index_used_pct": 100.0, "rows_sent": 100, "rows_examined": 100000, "rows_sent_avg": 0.1, "rows_examined_avg": 100 } ]) handler = StatementsFullScansToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert len(parsed["statements"]) == 1 assert parsed["statements"][0]["scan_efficiency_ratio"] == 1000.0 class TestStatementErrorsToolHandler: """Tests for StatementErrorsToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = StatementErrorsToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "get_statements_with_errors" @pytest.mark.asyncio async def test_run_tool(self): """Test statement errors analysis.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(return_value=[ { "query": "INSERT INTO users (id, name) VALUES (?, ?)", "db": "testdb", "exec_count": 1000, "total_latency": "5.00 s", "errors": 50, "error_pct": 5.0, "warnings": 100, "warning_pct": 10.0 } ]) handler = StatementErrorsToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert parsed["summary"]["total_errors"] == 50 assert parsed["summary"]["total_warnings"] == 100 # ============================================================================= # Memory Tool Tests # 
============================================================================= class TestMemoryCalculationsToolHandler: """Tests for MemoryCalculationsToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = MemoryCalculationsToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "calculate_memory_usage" assert "physical_memory_gb" in definition.inputSchema["properties"] @pytest.mark.asyncio async def test_run_tool(self): """Test memory calculations.""" driver = create_mock_sql_driver() driver.get_server_variables = AsyncMock(return_value={ "key_buffer_size": "134217728", # 128MB "innodb_buffer_pool_size": "1073741824", # 1GB "innodb_log_buffer_size": "16777216", # 16MB "query_cache_size": "0", "read_buffer_size": "262144", # 256KB "read_rnd_buffer_size": "524288", # 512KB "sort_buffer_size": "524288", # 512KB "join_buffer_size": "262144", # 256KB "thread_stack": "262144", "binlog_stmt_cache_size": "32768", "net_buffer_length": "16384", "tmp_table_size": "16777216", "max_heap_table_size": "16777216", "max_connections": "151" }) driver.get_server_status = AsyncMock(return_value={ "Threads_connected": "10", "Max_used_connections": "50" }) handler = MemoryCalculationsToolHandler(driver) result = await handler.run_tool({"physical_memory_gb": 16, "detailed": False}) parsed = json.loads(result[0].text) assert "server_buffers" in parsed assert "per_thread_buffers" in parsed assert "memory_summary" in parsed assert parsed["memory_summary"]["max_connections"] == 151 @pytest.mark.asyncio async def test_run_tool_memory_exceeds_physical(self): """Test detection when MySQL memory exceeds physical memory.""" driver = create_mock_sql_driver() driver.get_server_variables = AsyncMock(return_value={ "key_buffer_size": "0", "innodb_buffer_pool_size": "17179869184", # 16GB "innodb_log_buffer_size": "16777216", "query_cache_size": "0", "read_buffer_size": "262144", "read_rnd_buffer_size": "524288", "sort_buffer_size": "524288", "join_buffer_size": "262144", "thread_stack": "262144", "binlog_stmt_cache_size": "32768", "net_buffer_length": "16384", "tmp_table_size": "16777216", "max_heap_table_size": "16777216", "max_connections": "500" }) driver.get_server_status = AsyncMock(return_value={ "Threads_connected": "10", "Max_used_connections": "50" }) handler = MemoryCalculationsToolHandler(driver) result = await handler.run_tool({"physical_memory_gb": 8, "detailed": False}) parsed = json.loads(result[0].text) # Should detect memory issue assert len(parsed["issues"]) > 0 class TestMemoryByHostToolHandler: """Tests for MemoryByHostToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = MemoryByHostToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "get_memory_by_host" assert "group_by" in definition.inputSchema["properties"] @pytest.mark.asyncio async def test_run_tool(self): """Test memory by host analysis.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(side_effect=[ [ { "host": "localhost", "current_count_used": 1000, "current_bytes": 104857600, "current_allocated": "100.00 MB", "current_avg_alloc": "104.86 KB", "current_max_alloc": "16.00 MB", "total_allocated": "500.00 MB" } ], 104857600 # Total query ]) driver.execute_scalar = AsyncMock(return_value=104857600) handler = MemoryByHostToolHandler(driver) result = await handler.run_tool({"group_by": "host"}) parsed = json.loads(result[0].text) 
assert "memory_usage" in parsed class TestTableMemoryUsageToolHandler: """Tests for TableMemoryUsageToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = TableMemoryUsageToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "get_table_memory_usage" @pytest.mark.asyncio async def test_run_tool(self): """Test table memory usage analysis.""" driver = create_mock_sql_driver() driver.get_server_variables = AsyncMock(return_value={ "table_open_cache": "4000", "table_open_cache_instances": "16", "table_definition_cache": "2000" }) driver.get_server_status = AsyncMock(return_value={ "Open_tables": "500", "Opened_tables": "1000", "Open_table_definitions": "400", "Opened_table_definitions": "500" }) driver.execute_query = AsyncMock(return_value=[]) handler = TableMemoryUsageToolHandler(driver) result = await handler.run_tool({"include_buffer_pool": False}) parsed = json.loads(result[0].text) assert "table_cache" in parsed assert parsed["table_cache"]["table_open_cache"] == 4000 # ============================================================================= # Engine Tool Tests # ============================================================================= class TestStorageEngineAnalysisToolHandler: """Tests for StorageEngineAnalysisToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = StorageEngineAnalysisToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "analyze_storage_engines" @pytest.mark.asyncio async def test_run_tool(self): """Test storage engine analysis.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(side_effect=[ # Available engines [ { "ENGINE": "InnoDB", "SUPPORT": "DEFAULT", "COMMENT": "Supports transactions", "TRANSACTIONS": "YES", "XA": "YES", "SAVEPOINTS": "YES" }, { "ENGINE": "MyISAM", "SUPPORT": "YES", "COMMENT": "MyISAM storage engine", "TRANSACTIONS": "NO", "XA": "NO", "SAVEPOINTS": "NO" } ], # Engine stats [ { "ENGINE": "InnoDB", "table_count": 50, "total_rows": 1000000, "data_size": 1073741824, "index_size": 268435456, "total_size": 1342177280, "data_free": 10485760 }, { "ENGINE": "MyISAM", "table_count": 5, "total_rows": 10000, "data_size": 10485760, "index_size": 1048576, "total_size": 11534336, "data_free": 0 } ], # Non-InnoDB tables [], # Fragmented tables [] ]) driver.get_server_variables = AsyncMock(return_value={ "innodb_buffer_pool_size": "1073741824", "innodb_buffer_pool_instances": "1", "innodb_file_per_table": "ON", "innodb_flush_method": "O_DIRECT", "key_buffer_size": "16777216" }) driver.get_server_status = AsyncMock(return_value={ "Innodb_buffer_pool_read_requests": "1000000", "Innodb_buffer_pool_reads": "100", "Key_read_requests": "10000", "Key_reads": "10", "Key_blocks_used": "100", "Key_blocks_unused": "1000" }) handler = StorageEngineAnalysisToolHandler(driver) result = await handler.run_tool({"include_table_details": False}) parsed = json.loads(result[0].text) assert "available_engines" in parsed assert "engine_usage" in parsed assert "InnoDB" in parsed["engine_usage"] class TestFragmentedTablesToolHandler: """Tests for FragmentedTablesToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = FragmentedTablesToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "get_fragmented_tables" assert "min_fragmentation_pct" in 
definition.inputSchema["properties"] @pytest.mark.asyncio async def test_run_tool(self): """Test fragmented tables analysis.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(return_value=[ { "TABLE_SCHEMA": "testdb", "TABLE_NAME": "orders", "ENGINE": "InnoDB", "TABLE_ROWS": 100000, "DATA_LENGTH": 104857600, "INDEX_LENGTH": 26214400, "DATA_FREE": 52428800, "fragmentation_pct": 50.0 } ]) handler = FragmentedTablesToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert len(parsed["fragmented_tables"]) == 1 assert parsed["summary"]["total_wasted_space_mb"] == 50.0 class TestAutoIncrementAnalysisToolHandler: """Tests for AutoIncrementAnalysisToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = AutoIncrementAnalysisToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "analyze_auto_increment" assert "warning_threshold_pct" in definition.inputSchema["properties"] @pytest.mark.asyncio async def test_run_tool_with_at_risk_table(self): """Test auto-increment analysis with at-risk table.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(return_value=[ { "TABLE_SCHEMA": "testdb", "TABLE_NAME": "users", "AUTO_INCREMENT": 2000000000, # ~93% of signed int "COLUMN_NAME": "id", "COLUMN_TYPE": "int", "DATA_TYPE": "int" } ]) handler = AutoIncrementAnalysisToolHandler(driver) result = await handler.run_tool({"warning_threshold_pct": 75}) parsed = json.loads(result[0].text) assert len(parsed["at_risk_tables"]) == 1 assert parsed["at_risk_tables"][0]["usage_pct"] > 75 # ============================================================================= # Replication Tool Tests # ============================================================================= class TestReplicationStatusToolHandler: """Tests for ReplicationStatusToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = ReplicationStatusToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "get_replication_status" @pytest.mark.asyncio async def test_run_tool_as_master(self): """Test replication status on master.""" driver = create_mock_sql_driver() driver.get_server_variables = AsyncMock(return_value={ "log_bin": "ON", "server_id": "1", "binlog_format": "ROW", "gtid_mode": "ON" }) driver.get_server_status = AsyncMock(return_value={}) driver.execute_query = AsyncMock(side_effect=[ # SHOW BINARY LOG STATUS / SHOW MASTER STATUS [ { "File": "mysql-bin.000001", "Position": 1234567, "Binlog_Do_DB": "", "Binlog_Ignore_DB": "", "Executed_Gtid_Set": "uuid:1-100" } ], # SHOW BINARY LOGS [ {"Log_name": "mysql-bin.000001", "File_size": 104857600} ], # SHOW REPLICAS / SHOW SLAVE HOSTS [ { "Server_id": 2, "Host": "replica1", "Port": 3306, "Replica_UUID": "uuid-replica-1" } ], # SHOW REPLICA STATUS / SHOW SLAVE STATUS [] ]) handler = ReplicationStatusToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert parsed["is_master"] is True assert parsed["master_status"]["file"] == "mysql-bin.000001" @pytest.mark.asyncio async def test_run_tool_as_slave(self): """Test replication status on slave.""" driver = create_mock_sql_driver() driver.get_server_variables = AsyncMock(return_value={ "log_bin": "OFF", "server_id": "2" }) driver.get_server_status = AsyncMock(return_value={}) driver.execute_query = AsyncMock(side_effect=[ # SHOW REPLICA STATUS / SHOW SLAVE 
STATUS [ { "Channel_Name": "", "Master_Host": "master1", "Master_Port": 3306, "Master_User": "repl", "Slave_IO_Running": "Yes", "Slave_SQL_Running": "Yes", "Seconds_Behind_Master": 0, "Last_IO_Error": "", "Last_SQL_Error": "", "Relay_Log_File": "relay.000001", "Relay_Log_Pos": 12345, "Master_Log_File": "mysql-bin.000001", "Read_Master_Log_Pos": 1234567, "Exec_Master_Log_Pos": 1234567, "Executed_Gtid_Set": "uuid:1-100", "Auto_Position": 1 } ] ]) handler = ReplicationStatusToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert parsed["is_slave"] is True assert len(parsed["slave_status"]) == 1 assert parsed["slave_status"][0]["io_running"] == "Yes" class TestGaleraClusterToolHandler: """Tests for GaleraClusterToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = GaleraClusterToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "get_galera_status" @pytest.mark.asyncio async def test_run_tool_galera_enabled(self): """Test Galera status when enabled.""" driver = create_mock_sql_driver() driver.get_server_status = AsyncMock(return_value={ "wsrep_on": "ON", "wsrep_ready": "ON", "wsrep_connected": "ON", "wsrep_cluster_name": "my_cluster", "wsrep_cluster_size": "3", "wsrep_cluster_state_uuid": "uuid-123", "wsrep_cluster_status": "Primary", "wsrep_local_state": "4", "wsrep_local_state_comment": "Synced", "wsrep_node_name": "node1", "wsrep_local_recv_queue": "0", "wsrep_local_recv_queue_avg": "0.0", "wsrep_local_send_queue": "0", "wsrep_local_send_queue_avg": "0.0", "wsrep_local_cert_failures": "0", "wsrep_local_bf_aborts": "0", "wsrep_flow_control_paused": "0.0", "wsrep_flow_control_sent": "0", "wsrep_flow_control_recv": "0" }) handler = GaleraClusterToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert parsed["is_galera"] is True assert parsed["cluster_status"]["cluster_size"] == 3 @pytest.mark.asyncio async def test_run_tool_galera_disabled(self): """Test Galera status when disabled.""" driver = create_mock_sql_driver() driver.get_server_status = AsyncMock(return_value={}) handler = GaleraClusterToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert parsed["is_galera"] is False class TestGroupReplicationToolHandler: """Tests for GroupReplicationToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = GroupReplicationToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "get_group_replication_status" @pytest.mark.asyncio async def test_run_tool_gr_enabled(self): """Test Group Replication status when enabled.""" driver = create_mock_sql_driver() driver.get_server_variables = AsyncMock(return_value={ "group_replication_group_name": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", "group_replication_single_primary_mode": "ON" }) driver.execute_query = AsyncMock(side_effect=[ # Members query [ { "CHANNEL_NAME": "group_replication_applier", "MEMBER_ID": "uuid-1", "MEMBER_HOST": "node1", "MEMBER_PORT": 3306, "MEMBER_STATE": "ONLINE", "MEMBER_ROLE": "PRIMARY", "MEMBER_VERSION": "8.0.32" }, { "CHANNEL_NAME": "group_replication_applier", "MEMBER_ID": "uuid-2", "MEMBER_HOST": "node2", "MEMBER_PORT": 3306, "MEMBER_STATE": "ONLINE", "MEMBER_ROLE": "SECONDARY", "MEMBER_VERSION": "8.0.32" } ], # Local stats query [ { "MEMBER_ID": "uuid-1", "COUNT_TRANSACTIONS_IN_QUEUE": 0, 
"COUNT_TRANSACTIONS_CHECKED": 1000, "COUNT_CONFLICTS_DETECTED": 0, "COUNT_TRANSACTIONS_ROWS_VALIDATING": 0 } ] ]) handler = GroupReplicationToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert parsed["is_group_replication"] is True assert len(parsed["members"]) == 2 # ============================================================================= # Security Tool Tests # ============================================================================= class TestSecurityAnalysisToolHandler: """Tests for SecurityAnalysisToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = SecurityAnalysisToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "analyze_security" assert "include_user_list" in definition.inputSchema["properties"] @pytest.mark.asyncio async def test_run_tool_secure_setup(self): """Test security analysis on secure setup.""" driver = create_mock_sql_driver() # Mock all security check queries driver.execute_query = AsyncMock(side_effect=[ [], # No anonymous users [], # No users without password [], # No root remote access [], # No dangerous privileges [], # No wildcard hosts [], # No test databases ]) driver.execute_scalar = AsyncMock(side_effect=[ 1, # Root exists ]) driver.get_server_variables = AsyncMock(return_value={ "validate_password.policy": "MEDIUM", "validate_password.length": "8", "have_ssl": "YES", "require_secure_transport": "ON", "tls_version": "TLSv1.2,TLSv1.3" }) driver.get_server_status = AsyncMock(return_value={ "Ssl_accepts": "1000", "Ssl_finished_accepts": "995" }) handler = SecurityAnalysisToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert "security_score" in parsed assert parsed["password_policy"]["enabled"] is True @pytest.mark.asyncio async def test_run_tool_insecure_setup(self): """Test security analysis detects issues.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(side_effect=[ [{"User": "", "Host": "%"}], # Anonymous user found [{"User": "testuser", "Host": "%", "plugin": "mysql_native_password"}], # User without password [{"User": "root", "Host": "%"}], # Root remote access [{"User": "admin", "Host": "%", "Super_priv": "Y", "File_priv": "N", "Process_priv": "N", "Shutdown_priv": "N", "Grant_priv": "N"}], # Dangerous privileges [{"User": "app", "Host": "%"}], # Wildcard host [{"SCHEMA_NAME": "test"}], # Test database exists ]) driver.execute_scalar = AsyncMock(return_value=1) driver.get_server_variables = AsyncMock(return_value={ "have_ssl": "NO" }) driver.get_server_status = AsyncMock(return_value={}) handler = SecurityAnalysisToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert parsed["security_score"] < 100 assert len(parsed["issues"]) > 0 class TestUserPrivilegesToolHandler: """Tests for UserPrivilegesToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = UserPrivilegesToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "analyze_user_privileges" assert "username" in definition.inputSchema["properties"] @pytest.mark.asyncio async def test_run_tool_specific_user(self): """Test privilege analysis for specific user.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(side_effect=[ # Global privileges [ { "User": "testuser", "Host": "%", "Select_priv": "Y", "Insert_priv": "Y", 
"Update_priv": "N", "Super_priv": "N" } ], # Database privileges [ { "Db": "testdb", "Select_priv": "Y", "Insert_priv": "Y" } ], # Table privileges [] ]) handler = UserPrivilegesToolHandler(driver) result = await handler.run_tool({"username": "testuser", "hostname": "%"}) parsed = json.loads(result[0].text) assert len(parsed["users"]) == 1 assert parsed["users"][0]["user"] == "testuser" class TestAuditLogToolHandler: """Tests for AuditLogToolHandler.""" def test_tool_definition(self): """Test tool definition.""" driver = create_mock_sql_driver() handler = AuditLogToolHandler(driver) definition = handler.get_tool_definition() assert definition.name == "check_audit_log" @pytest.mark.asyncio async def test_run_tool_audit_enabled(self): """Test audit log check with MySQL Enterprise Audit.""" driver = create_mock_sql_driver() driver.get_server_variables = AsyncMock(return_value={ "audit_log_file": "/var/log/mysql/audit.log", "audit_log_format": "JSON", "audit_log_policy": "ALL" }) driver.execute_query = AsyncMock(return_value=[ {"PLUGIN_NAME": "audit_log", "PLUGIN_STATUS": "ACTIVE"} ]) handler = AuditLogToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert parsed["audit_enabled"] is True assert parsed["audit_plugin"] == "MySQL Enterprise Audit" @pytest.mark.asyncio async def test_run_tool_audit_disabled(self): """Test audit log check when disabled.""" driver = create_mock_sql_driver() driver.get_server_variables = AsyncMock(return_value={}) driver.execute_query = AsyncMock(return_value=[]) handler = AuditLogToolHandler(driver) result = await handler.run_tool({}) parsed = json.loads(result[0].text) assert parsed["audit_enabled"] is False assert len(parsed["recommendations"]) > 0 class TestErrorHandling: """Tests for error handling across all handlers.""" @pytest.mark.asyncio async def test_innodb_status_error_handling(self): """Test InnoDB status error handling.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(side_effect=Exception("Connection lost")) handler = InnoDBStatusToolHandler(driver) result = await handler.run_tool({}) assert len(result) == 1 assert "error" in result[0].text.lower() @pytest.mark.asyncio async def test_statement_analysis_error_handling(self): """Test statement analysis error handling.""" driver = create_mock_sql_driver() driver.execute_scalar = AsyncMock(side_effect=Exception("Access denied")) handler = StatementAnalysisToolHandler(driver) result = await handler.run_tool({}) assert len(result) == 1 assert "error" in result[0].text.lower() @pytest.mark.asyncio async def test_memory_calculation_error_handling(self): """Test memory calculation error handling.""" driver = create_mock_sql_driver() driver.get_server_variables = AsyncMock(side_effect=Exception("Timeout")) handler = MemoryCalculationsToolHandler(driver) result = await handler.run_tool({}) assert len(result) == 1 assert "error" in result[0].text.lower() @pytest.mark.asyncio async def test_security_analysis_error_handling(self): """Test security analysis error handling.""" driver = create_mock_sql_driver() driver.execute_query = AsyncMock(side_effect=Exception("Permission denied")) handler = SecurityAnalysisToolHandler(driver) result = await handler.run_tool({}) assert len(result) == 1 assert "error" in result[0].text.lower() class TestAnnotations: """Tests for tool annotations.""" def test_all_handlers_have_annotations(self): """Test that all handlers have proper annotations.""" driver = create_mock_sql_driver() handlers = [ 
InnoDBStatusToolHandler(driver), InnoDBBufferPoolToolHandler(driver), InnoDBTransactionsToolHandler(driver), StatementAnalysisToolHandler(driver), StatementsTempTablesToolHandler(driver), StatementsSortingToolHandler(driver), StatementsFullScansToolHandler(driver), StatementErrorsToolHandler(driver), MemoryCalculationsToolHandler(driver), MemoryByHostToolHandler(driver), TableMemoryUsageToolHandler(driver), StorageEngineAnalysisToolHandler(driver), FragmentedTablesToolHandler(driver), AutoIncrementAnalysisToolHandler(driver), ReplicationStatusToolHandler(driver), GaleraClusterToolHandler(driver), GroupReplicationToolHandler(driver), SecurityAnalysisToolHandler(driver), UserPrivilegesToolHandler(driver), AuditLogToolHandler(driver), ] for handler in handlers: annotations = handler.get_annotations() assert "title" in annotations assert "readOnlyHint" in annotations assert "destructiveHint" in annotations assert annotations["readOnlyHint"] is True assert annotations["destructiveHint"] is False
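
Since every async test above is marked with @pytest.mark.asyncio, running the suite requires the pytest-asyncio plugin (or an equivalent). Locally, something like the following should work, assuming the mysqltuner_mcp package is importable; the file path is illustrative:

pip install pytest pytest-asyncio
pytest test_tools.py -v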

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/isdaniel/MySQL-Performance-Tuner-Mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.