Skip to main content
Glama

Microsoft SQL Server MCP Server

by RichardHan
test_performance.py12.9 kB
"""Performance and load tests for production readiness.""" import pytest import asyncio import time from unittest.mock import Mock, patch from concurrent.futures import ThreadPoolExecutor import gc import psutil import os from mssql_mcp_server.server import app class TestPerformance: """Test performance characteristics under load.""" @pytest.mark.asyncio async def test_query_response_time(self): """Test that queries respond within acceptable time limits.""" mock_conn = Mock() mock_cursor = Mock() mock_conn.cursor.return_value = mock_cursor # Simulate reasonable query execution mock_cursor.description = [('id',), ('name',)] mock_cursor.fetchall.return_value = [(i, f'user_{i}') for i in range(100)] with patch('pymssql.connect', return_value=mock_conn): with patch.dict('os.environ', { 'MSSQL_USER': 'test', 'MSSQL_PASSWORD': 'test', 'MSSQL_DATABASE': 'testdb' }): start_time = time.time() result = await app.call_tool("execute_sql", {"query": "SELECT * FROM users"}) end_time = time.time() # Query should complete in reasonable time (< 1 second for mock) assert end_time - start_time < 1.0 assert len(result) == 1 assert "user_99" in result[0].text @pytest.mark.asyncio async def test_concurrent_query_performance(self): """Test performance under concurrent query load.""" mock_conn = Mock() mock_cursor = Mock() mock_conn.cursor.return_value = mock_cursor mock_cursor.description = [('count',)] mock_cursor.fetchall.return_value = [(42,)] with patch('pymssql.connect', return_value=mock_conn): with patch.dict('os.environ', { 'MSSQL_USER': 'test', 'MSSQL_PASSWORD': 'test', 'MSSQL_DATABASE': 'testdb' }): # Run 50 concurrent queries start_time = time.time() tasks = [ app.call_tool("execute_sql", {"query": f"SELECT COUNT(*) FROM table_{i}"}) for i in range(50) ] results = await asyncio.gather(*tasks) end_time = time.time() # All queries should complete assert len(results) == 50 assert all("42" in r[0].text for r in results) # Should complete in reasonable time (< 5 seconds for 50 queries) assert end_time - start_time < 5.0 @pytest.mark.asyncio async def test_large_result_set_performance(self): """Test performance with large result sets.""" mock_conn = Mock() mock_cursor = Mock() mock_conn.cursor.return_value = mock_cursor # Create large result set (10,000 rows) large_result = [(i, f'user_{i}', f'email_{i}@test.com', i % 100) for i in range(10000)] mock_cursor.description = [('id',), ('name',), ('email',), ('status',)] mock_cursor.fetchall.return_value = large_result with patch('pymssql.connect', return_value=mock_conn): with patch.dict('os.environ', { 'MSSQL_USER': 'test', 'MSSQL_PASSWORD': 'test', 'MSSQL_DATABASE': 'testdb' }): start_time = time.time() result = await app.call_tool("execute_sql", {"query": "SELECT * FROM large_table"}) end_time = time.time() # Should handle large results efficiently assert len(result) == 1 lines = result[0].text.split('\n') assert len(lines) == 10001 # Header + 10000 rows # Should complete in reasonable time (< 10 seconds) assert end_time - start_time < 10.0 class TestMemoryUsage: """Test memory usage and leak prevention.""" @pytest.mark.asyncio async def test_memory_usage_stability(self): """Test that memory usage remains stable over time.""" if not hasattr(psutil.Process(), 'memory_info'): pytest.skip("Memory monitoring not available") mock_conn = Mock() mock_cursor = Mock() mock_conn.cursor.return_value = mock_cursor mock_cursor.fetchall.return_value = [('table1',), ('table2',)] with patch('pymssql.connect', return_value=mock_conn): with patch.dict('os.environ', { 'MSSQL_USER': 'test', 'MSSQL_PASSWORD': 'test', 'MSSQL_DATABASE': 'testdb' }): process = psutil.Process(os.getpid()) # Get baseline memory gc.collect() baseline_memory = process.memory_info().rss / 1024 / 1024 # MB # Run many operations for _ in range(100): await app.list_resources() # Check memory after operations gc.collect() final_memory = process.memory_info().rss / 1024 / 1024 # MB # Memory growth should be minimal (< 50 MB) memory_growth = final_memory - baseline_memory assert memory_growth < 50, f"Memory grew by {memory_growth} MB" @pytest.mark.asyncio async def test_large_data_memory_handling(self): """Test memory handling with large data sets.""" mock_conn = Mock() mock_cursor = Mock() mock_conn.cursor.return_value = mock_cursor # Create very large result def generate_large_result(): for i in range(100000): yield (i, f'data_{i}' * 100) # Large strings mock_cursor.description = [('id',), ('data',)] mock_cursor.fetchall.return_value = list(generate_large_result()) with patch('pymssql.connect', return_value=mock_conn): with patch.dict('os.environ', { 'MSSQL_USER': 'test', 'MSSQL_PASSWORD': 'test', 'MSSQL_DATABASE': 'testdb' }): # Should handle large data without excessive memory use result = await app.call_tool("execute_sql", {"query": "SELECT * FROM big_table"}) # Result should be created assert len(result) == 1 # Memory should be released after operation result = None gc.collect() class TestLoadHandling: """Test system behavior under various load conditions.""" @pytest.mark.asyncio async def test_burst_load_handling(self): """Test handling of sudden burst loads.""" mock_conn = Mock() mock_cursor = Mock() mock_conn.cursor.return_value = mock_cursor mock_cursor.fetchall.return_value = [('result',)] mock_cursor.description = [('data',)] with patch('pymssql.connect', return_value=mock_conn): with patch.dict('os.environ', { 'MSSQL_USER': 'test', 'MSSQL_PASSWORD': 'test', 'MSSQL_DATABASE': 'testdb' }): # Simulate burst of 100 requests start_time = time.time() tasks = [] for _ in range(100): tasks.append(app.call_tool("execute_sql", {"query": "SELECT 1"})) results = await asyncio.gather(*tasks, return_exceptions=True) end_time = time.time() # Count successful results successful = sum(1 for r in results if not isinstance(r, Exception)) # Most requests should succeed assert successful >= 90 # Allow 10% failure rate # Should complete within reasonable time assert end_time - start_time < 30.0 @pytest.mark.asyncio async def test_sustained_load_handling(self): """Test handling of sustained load over time.""" mock_conn = Mock() mock_cursor = Mock() mock_conn.cursor.return_value = mock_cursor mock_cursor.fetchall.return_value = [('ok',)] mock_cursor.description = [('status',)] with patch('pymssql.connect', return_value=mock_conn): with patch.dict('os.environ', { 'MSSQL_USER': 'test', 'MSSQL_PASSWORD': 'test', 'MSSQL_DATABASE': 'testdb' }): # Run continuous load for 10 seconds start_time = time.time() request_count = 0 error_count = 0 while time.time() - start_time < 10: try: result = await app.call_tool("execute_sql", {"query": "SELECT 'ok'"}) request_count += 1 assert "ok" in result[0].text except Exception: error_count += 1 # Small delay to prevent overwhelming await asyncio.sleep(0.01) # Should handle sustained load assert request_count > 500 # At least 50 req/sec assert error_count < request_count * 0.05 # Less than 5% errors class TestScalability: """Test scalability characteristics.""" @pytest.mark.asyncio async def test_resource_scaling(self): """Test handling of increasing number of resources.""" mock_conn = Mock() mock_cursor = Mock() mock_conn.cursor.return_value = mock_cursor # Test with different table counts table_counts = [10, 100, 1000] with patch('pymssql.connect', return_value=mock_conn): with patch.dict('os.environ', { 'MSSQL_USER': 'test', 'MSSQL_PASSWORD': 'test', 'MSSQL_DATABASE': 'testdb' }): for count in table_counts: # Create table list tables = [(f'table_{i}',) for i in range(count)] mock_cursor.fetchall.return_value = tables start_time = time.time() resources = await app.list_resources() end_time = time.time() assert len(resources) == count # Time should scale reasonably (not exponentially) time_per_table = (end_time - start_time) / count assert time_per_table < 0.01 # Less than 10ms per table @pytest.mark.asyncio async def test_query_complexity_scaling(self): """Test performance with increasingly complex queries.""" mock_conn = Mock() mock_cursor = Mock() mock_conn.cursor.return_value = mock_cursor with patch('pymssql.connect', return_value=mock_conn): with patch.dict('os.environ', { 'MSSQL_USER': 'test', 'MSSQL_PASSWORD': 'test', 'MSSQL_DATABASE': 'testdb' }): # Test simple to complex queries queries = [ "SELECT 1", "SELECT * FROM users WHERE id = 1", "SELECT u.*, o.* FROM users u JOIN orders o ON u.id = o.user_id", """SELECT u.name, COUNT(o.id), SUM(o.total), AVG(o.total) FROM users u LEFT JOIN orders o ON u.id = o.user_id GROUP BY u.name HAVING COUNT(o.id) > 5 ORDER BY SUM(o.total) DESC""" ] mock_cursor.description = [('result',)] mock_cursor.fetchall.return_value = [('data',)] for query in queries: start_time = time.time() result = await app.call_tool("execute_sql", {"query": query}) end_time = time.time() # All queries should complete successfully assert len(result) == 1 # Response time should be reasonable assert end_time - start_time < 2.0

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/RichardHan/mssql_mcp_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server