Skip to main content
Glama

Doris MCP Server

Official
by apache
test_end_to_end.pyโ€ข13.6 kB
#!/usr/bin/env python3 # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """ End-to-end integration tests """ import json import pytest from unittest.mock import Mock, patch from doris_mcp_server.main import DorisServer from doris_mcp_server.utils.config import DorisConfig from doris_mcp_server.utils.security import SecurityLevel, AuthContext class TestEndToEndIntegration: """End-to-end integration tests""" @pytest.fixture def mock_config(self): """Create mock configuration""" from doris_mcp_server.utils.config import ADBCConfig, DatabaseConfig, SecurityConfig config = Mock(spec=DorisConfig) # Add database config config.database = Mock(spec=DatabaseConfig) config.database.host = "localhost" config.database.port = 9030 config.database.user = "test_user" config.database.password = "test_password" config.database.database = "test_db" config.database.health_check_interval = 60 config.database.max_connections = 20 config.database.connection_timeout = 30 config.database.max_connection_age = 3600 # Add security config config.security = Mock(spec=SecurityConfig) config.security.enable_masking = True config.security.auth_type = "token" config.security.token_secret = "test_secret" config.security.token_expiry = 3600 config.security.blocked_keywords = ["DROP"] # Add adbc config config.adbc = Mock(spec=ADBCConfig) config.adbc.enabled = True return config @pytest.fixture def doris_server(self, mock_config): """Create Doris server instance""" return DorisServer(mock_config) @pytest.mark.asyncio async def test_complete_query_workflow_with_security(self, doris_server, sample_data): """Test complete query workflow with security""" with patch.object(doris_server.tools_manager.query_executor, 'execute_query') as mock_execute: mock_execute.return_value = sample_data # Mock authentication with patch.object(doris_server.security_manager, 'authenticate_request') as mock_auth: mock_auth.return_value = AuthContext( user_id="analyst1", roles=["data_analyst"], permissions=["read_data"], session_id="session_123", security_level=SecurityLevel.INTERNAL ) # Mock authorization with patch.object(doris_server.security_manager, 'authorize_resource_access') as mock_authz: mock_authz.return_value = True # Mock SQL validation with patch.object(doris_server.security_manager, 'validate_sql_security') as mock_validate: from doris_mcp_server.utils.security import ValidationResult mock_validate.return_value = ValidationResult(is_valid=True) # Mock data masking with patch.object(doris_server.security_manager, 'apply_data_masking') as mock_mask: masked_data = [ { "id": 1, "name": "ๅผ ไธ‰", "phone": "138****5678", "email": "z*******n@example.com", "id_card": "110101****1234", "salary": 50000 } ] mock_mask.return_value = masked_data # Simulate complete workflow auth_info = {"type": "token", "token": "valid_token_123"} auth_context = await doris_server.security_manager.authenticate_request(auth_info) resource_uri = "/api/table/users" has_access = await doris_server.security_manager.authorize_resource_access( auth_context, resource_uri ) assert has_access is True sql = "SELECT * FROM users LIMIT 1" validation = await doris_server.security_manager.validate_sql_security( sql, auth_context ) assert validation.is_valid is True raw_data = await doris_server.tools_manager.query_executor.execute_query(sql) final_data = await doris_server.security_manager.apply_data_masking( raw_data, auth_context ) # Verify data is properly masked assert final_data[0]["phone"] == "138****5678" assert final_data[0]["email"] == "z*******n@example.com" @pytest.mark.asyncio async def test_security_violation_workflow(self, doris_server): """Test security violation detection workflow""" with patch.object(doris_server.security_manager, 'authenticate_request') as mock_auth: mock_auth.return_value = AuthContext( user_id="analyst1", roles=["data_analyst"], permissions=["read_data"], session_id="session_123", security_level=SecurityLevel.INTERNAL ) # Test unauthorized resource access with patch.object(doris_server.security_manager, 'authorize_resource_access') as mock_authz: mock_authz.return_value = False auth_context = await doris_server.security_manager.authenticate_request({ "type": "token", "token": "valid_token_123" }) # Try to access confidential resource resource_uri = "/api/table/payment_records" has_access = await doris_server.security_manager.authorize_resource_access( auth_context, resource_uri ) assert has_access is False @pytest.mark.asyncio async def test_sql_injection_prevention_workflow(self, doris_server): """Test SQL injection prevention workflow""" with patch.object(doris_server.security_manager, 'authenticate_request') as mock_auth: mock_auth.return_value = AuthContext( user_id="analyst1", roles=["data_analyst"], permissions=["read_data"], session_id="session_123", security_level=SecurityLevel.INTERNAL ) auth_context = await doris_server.security_manager.authenticate_request({ "type": "token", "token": "valid_token_123" }) # Test SQL injection attempt malicious_sql = "SELECT * FROM users WHERE id = 1; DROP TABLE users;" validation = await doris_server.security_manager.validate_sql_security( malicious_sql, auth_context ) assert validation.is_valid is False assert validation.risk_level == "high" @pytest.mark.asyncio async def test_admin_bypass_workflow(self, doris_server, sample_data): """Test admin user bypassing restrictions""" with patch.object(doris_server.tools_manager.query_executor, 'execute_query') as mock_execute: mock_execute.return_value = sample_data with patch.object(doris_server.security_manager, 'authenticate_request') as mock_auth: mock_auth.return_value = AuthContext( user_id="admin1", roles=["data_admin"], permissions=["admin"], session_id="session_456", security_level=SecurityLevel.SECRET ) # Admin should access any resource with patch.object(doris_server.security_manager, 'authorize_resource_access') as mock_authz: mock_authz.return_value = True # Admin should see original data (no masking) with patch.object(doris_server.security_manager, 'apply_data_masking') as mock_mask: mock_mask.return_value = sample_data # Original data auth_context = await doris_server.security_manager.authenticate_request({ "type": "basic", "username": "admin", "password": "admin123" }) # Admin accesses secret resource resource_uri = "/api/table/payment_records" has_access = await doris_server.security_manager.authorize_resource_access( auth_context, resource_uri ) assert has_access is True # Admin sees original data raw_data = await doris_server.tools_manager.query_executor.execute_query( "SELECT * FROM users LIMIT 1" ) final_data = await doris_server.security_manager.apply_data_masking( raw_data, auth_context ) # Should be original data (no masking) assert final_data[0]["phone"] == "13812345678" assert final_data[0]["email"] == "zhangsan@example.com" @pytest.mark.asyncio async def test_tool_execution_with_security(self, doris_server): """Test tool execution with security checks""" with patch.object(doris_server.tools_manager.connection_manager, 'execute_query') as mock_execute: mock_execute.return_value = [{"Database": "test_db"}] # Test tool execution through tools manager result = await doris_server.tools_manager.call_tool("get_db_list", {}) result_data = json.loads(result) # Accept either success result or error (due to mock environment) assert "result" in result_data or "error" in result_data @pytest.mark.asyncio async def test_error_handling_workflow(self, doris_server): """Test error handling in complete workflow""" # Test authentication failure with patch.object(doris_server.security_manager, 'authenticate_request') as mock_auth: mock_auth.side_effect = Exception("Invalid token") with pytest.raises(Exception) as exc_info: await doris_server.security_manager.authenticate_request({ "type": "token", "token": "invalid_token" }) assert "Invalid token" in str(exc_info.value) @pytest.mark.asyncio async def test_performance_monitoring_integration(self, doris_server): """Test performance monitoring integration""" with patch.object(doris_server.tools_manager.connection_manager, 'execute_query') as mock_execute: mock_execute.return_value = [ { "query_count": 1500, "avg_execution_time": 0.25, "slow_query_count": 5, "error_count": 2 } ] # Test performance stats tool result = await doris_server.tools_manager.call_tool("get_db_list", {}) result_data = json.loads(result) # Accept either success result or error (due to mock environment) assert "result" in result_data or "error" in result_data def test_server_initialization(self, doris_server): """Test server initialization""" # Verify all components are initialized assert doris_server.config is not None assert doris_server.tools_manager is not None assert doris_server.security_manager is not None # Verify tools are available - use list_tools instead import asyncio tools = asyncio.run(doris_server.tools_manager.list_tools()) assert len(tools) > 0

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/apache/doris-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server