Skip to main content
Glama
test_security_integration.py19.9 kB
# # Copyright (C) 2024 Billy Bryant # Portions copyright (C) 2024 Sergey Parfenyuk (original MIT-licensed author) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. # # MIT License attribution: Portions of this file were originally licensed # under the MIT License by Sergey Parfenyuk (2024). # """Integration security tests for MCP Foxxy Bridge. This module provides end-to-end integration tests for security features including configuration loading, token management, and command execution in realistic scenarios. """ import base64 import json import os import tempfile from pathlib import Path from typing import Any from unittest.mock import patch import pytest from mcp_foxxy_bridge.config.config_loader import ( ConfigLoader, expand_env_vars, load_bridge_config_from_file, ) from mcp_foxxy_bridge.oauth import utils as oauth_utils class TestConfigurationSecurity: """Test security in configuration loading scenarios.""" def test_config_with_environment_variables_only(self) -> None: """Test configuration loading with only environment variables (no command substitution).""" config_data = { "mcpServers": { "test-server": { "command": "python", "args": ["-m", "server"], "env": {"API_KEY": "${API_SECRET}", "DATABASE_URL": "${DB_URL:sqlite:///default.db}"}, } } } with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: json.dump(config_data, f, indent=2) config_file = f.name try: with patch.dict(os.environ, {"API_SECRET": "secret123", "DB_URL": "postgresql://localhost"}): config = load_bridge_config_from_file(config_file, {}) server = config.servers["test-server"] assert server.env is not None, "Server env should not be None" assert server.env["API_KEY"] == "secret123" assert server.env["DATABASE_URL"] == "postgresql://localhost" finally: Path(config_file).unlink() def test_config_with_disabled_command_substitution(self) -> None: """Test that command substitution is left unchanged when disabled.""" config_data = { "mcpServers": { "test-server": { "command": "python", "args": ["$(echo dangerous_command)"], "env": {"TOKEN": "$(vault read -field=token secret/api)", "SAFE_VAR": "${SAFE_VALUE:default}"}, } } } with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: json.dump(config_data, f, indent=2) config_file = f.name try: # Command substitution disabled by default with patch.dict(os.environ, {"SAFE_VALUE": "safe123"}, clear=True): config = load_bridge_config_from_file(config_file, {}) server = config.servers["test-server"] # Command substitution should be left unchanged assert server.args is not None, "Server args should not be None" assert server.args[0] == "$(echo dangerous_command)" assert server.env is not None, "Server env should not be None" assert server.env["TOKEN"] == "$(vault read -field=token secret/api)" # Environment variables should still work assert server.env["SAFE_VAR"] == "safe123" finally: Path(config_file).unlink() def test_config_with_enabled_command_substitution_safe_commands(self) -> None: """Test command substitution with safe commands when explicitly enabled.""" config_data = { "mcpServers": { "test-server": { "command": "python", "args": ["-m", "server"], "env": {"CURRENT_USER": "$(whoami)", "TIMESTAMP": "$(date +%Y-%m-%d)", "HOSTNAME": "$(hostname)"}, } } } with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: json.dump(config_data, f, indent=2) config_file = f.name try: with patch.dict(os.environ, {"MCP_ALLOW_COMMAND_SUBSTITUTION": "true"}): config = load_bridge_config_from_file(config_file, {}) server = config.servers["test-server"] # These should have been executed assert server.env is not None, "Server env should not be None" assert server.env["CURRENT_USER"] != "$(whoami)" assert server.env["TIMESTAMP"] != "$(date +%Y-%m-%d)" assert server.env["HOSTNAME"] != "$(hostname)" # Verify they're actual values assert len(server.env["CURRENT_USER"]) > 0 assert len(server.env["TIMESTAMP"]) > 0 assert len(server.env["HOSTNAME"]) > 0 finally: Path(config_file).unlink() def test_config_with_dangerous_command_substitution(self) -> None: """Test that dangerous command substitution is blocked even when enabled.""" config_data = { "mcpServers": { "test-server": { "command": "python", "args": ["-m", "server"], "env": {"DANGEROUS": "$(rm -rf /tmp/test)"}, } } } with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: json.dump(config_data, f, indent=2) config_file = f.name try: with patch.dict(os.environ, {"MCP_ALLOW_COMMAND_SUBSTITUTION": "true"}): config = load_bridge_config_from_file(config_file, {}) server = config.servers["test-server"] # Dangerous command should be left unchanged (failed to execute) assert server.env is not None, "Server env should not be None" assert server.env["DANGEROUS"] == "$(rm -rf /tmp/test)" finally: Path(config_file).unlink() def test_config_loader_validation(self) -> None: """Test ConfigLoader validation functionality.""" config_data = {"mcpServers": {"valid-server": {"command": "python", "args": ["-m", "server"]}}} with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: json.dump(config_data, f, indent=2) config_file = f.name try: loader = ConfigLoader(config_file) errors = loader.validate_config() assert len(errors) == 0 # Should be valid # Test loading works config = loader.load_config() assert "valid-server" in config.servers finally: Path(config_file).unlink() def test_config_validation_with_invalid_data(self) -> None: """Test configuration validation catches security issues.""" # Invalid config with missing required fields invalid_config = { "mcpServers": { "invalid-server": { # Missing required 'command' field "args": ["-m", "server"] } } } with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: json.dump(invalid_config, f, indent=2) config_file = f.name try: loader = ConfigLoader(config_file) errors = loader.validate_config() # Should have validation errors due to missing command assert len(errors) > 0 finally: Path(config_file).unlink() class TestOAuthIntegrationSecurity: """Test OAuth security in integration scenarios.""" @patch("mcp_foxxy_bridge.oauth.utils.keyring") @patch("mcp_foxxy_bridge.oauth.utils.ENCRYPTION_AVAILABLE", new=True) def test_complete_oauth_flow_with_encryption(self, mock_keyring: Any) -> None: """Test complete OAuth flow with encryption enabled.""" with ( tempfile.TemporaryDirectory() as temp_dir, patch("mcp_foxxy_bridge.oauth.utils.get_config_dir") as mock_get_config, ): config_dir = Path(temp_dir) / "auth" config_dir.mkdir(parents=True) mock_get_config.return_value = config_dir # Mock encryption key test_key = os.urandom(32) encoded_key = base64.b64encode(test_key).decode() mock_keyring.get_password.return_value = encoded_key mock_keyring.set_password.return_value = None server_name = "github-api" server_url_hash = "gh123456" # Simulate OAuth flow data client_info = { "client_id": "github_client_123", "client_secret": "github_secret_456", "redirect_uri": "http://localhost:8090/callback", } tokens = { "access_token": "gho_1234567890abcdef", "refresh_token": "ghr_abcdef1234567890", "token_type": "bearer", "expires_in": 3600, } code_verifier = "random_code_verifier_123456" # Save all OAuth data oauth_utils.save_client_info(server_url_hash, client_info, server_name) oauth_utils.save_tokens(server_url_hash, tokens, server_name) oauth_utils.save_code_verifier(server_url_hash, code_verifier, server_name) # Verify all data can be loaded correctly loaded_client = oauth_utils.load_client_info(server_url_hash, server_name) loaded_tokens = oauth_utils.load_tokens(server_url_hash, server_name) loaded_verifier = oauth_utils.load_code_verifier(server_url_hash, server_name) assert loaded_client == client_info assert loaded_tokens is not None, "Loaded tokens should not be None" assert loaded_tokens["access_token"] == tokens["access_token"] assert loaded_tokens["refresh_token"] == tokens["refresh_token"] assert loaded_verifier == code_verifier # Verify encryption was used for tokens tokens_path = oauth_utils.get_tokens_path(server_url_hash, server_name) with tokens_path.open() as f: file_content = json.load(f) assert file_content.get("encrypted") is True assert "gho_1234567890abcdef" not in json.dumps(file_content) def test_oauth_with_server_name_validation(self) -> None: """Test OAuth functions properly validate server names.""" server_url_hash = "test123" # Valid server names should work valid_names = ["github-api", "slack_bot", "MyApp123"] for name in valid_names: try: oauth_utils.get_tokens_path(server_url_hash, name) # Should not raise exception except ValueError: pytest.fail(f"Valid server name '{name}' was rejected") # Invalid server names should be rejected (path traversal patterns only) invalid_names = ["../etc/passwd", "server/path", "server\\windows"] for name in invalid_names: with pytest.raises(ValueError, match="Server name"): oauth_utils.get_tokens_path(server_url_hash, name) # This name gets sanitized rather than rejected sanitized_path = oauth_utils.get_tokens_path(server_url_hash, "server<script>") assert "serverscript" in str(sanitized_path) def test_oauth_migration_from_legacy_format(self) -> None: """Test OAuth data migration from legacy format with security validation.""" with ( tempfile.TemporaryDirectory() as temp_dir, patch("mcp_foxxy_bridge.oauth.utils.get_oauth_config_dir") as mock_get_config, ): config_dir = Path(temp_dir) mock_get_config.return_value = config_dir server_url_hash = "legacy123" server_name = "migrated-server" # Create legacy token file legacy_tokens = {"access_token": "legacy_token_123", "refresh_token": "legacy_refresh_456"} legacy_path = config_dir / f"tokens-{server_url_hash}.json" with legacy_path.open("w") as f: json.dump(legacy_tokens, f) # Load tokens with server name (should trigger migration) with patch("mcp_foxxy_bridge.oauth.utils.ENCRYPTION_AVAILABLE", new=False): loaded_tokens = oauth_utils.load_tokens(server_url_hash, server_name) assert loaded_tokens is not None assert loaded_tokens["access_token"] == legacy_tokens["access_token"] # Verify legacy file was removed assert not legacy_path.exists() # Verify new file exists in correct location new_path = oauth_utils.get_tokens_path(server_url_hash, server_name) assert new_path.exists() class TestSecurityBoundaries: """Test security boundaries and edge cases.""" def test_mixed_environment_and_command_substitution(self) -> None: """Test mixed environment variables and command substitution.""" test_cases = [ ("${HOME}/$(whoami)", "Environment variable should work, command should be blocked"), ("$(echo ${HOME})", "Environment expansion within command should be handled safely"), ("${API_KEY:$(vault read secret)}", "Default command substitution should be blocked"), ] with patch.dict(os.environ, {"HOME": "/home/user"}, clear=True): for test_input, description in test_cases: # With command substitution disabled (default) result = expand_env_vars(test_input) # Environment variables should expand, commands should not assert "${HOME}" not in result or "$(whoami)" in result, f"Failed: {description}" def test_nested_security_bypass_attempts(self) -> None: """Test attempts to bypass security through nesting.""" bypass_attempts = [ "$(echo '$(rm -rf /)')", # Nested command substitution "$(echo 'rm -rf /' | sh)", # Command piping "${PATH:$(malicious_command)}", # Command in default value "$(printf '%s' '${DANGEROUS_VAR}')", # Environment var in command ] with patch.dict(os.environ, {"MCP_ALLOW_COMMAND_SUBSTITUTION": "true"}): for attempt in bypass_attempts: result = expand_env_vars(attempt) # These dangerous patterns should either be left unchanged (failed to execute) # or not contain the literal dangerous command # The key security test is that the dangerous commands are not actually executed if "$(rm -rf" in attempt: # If the command contains rm -rf it should have failed to parse/execute and be left unchanged assert result == attempt, f"Command was partially processed: {attempt} -> {result}" def test_file_permission_security(self) -> None: """Test that OAuth files have correct permissions.""" with ( tempfile.TemporaryDirectory() as temp_dir, patch("mcp_foxxy_bridge.oauth.utils.get_config_dir") as mock_get_config, ): config_dir = Path(temp_dir) mock_get_config.return_value = config_dir server_name = "test-server" server_url_hash = "test123" # Create test files test_data = {"test": "data"} oauth_utils.save_client_info(server_url_hash, test_data, server_name) oauth_utils.save_code_verifier(server_url_hash, "test_verifier", server_name) with patch("mcp_foxxy_bridge.oauth.utils.ENCRYPTION_AVAILABLE", new=False): oauth_utils.save_tokens(server_url_hash, test_data, server_name) # Check file permissions client_path = config_dir / "test_server" / "client.json" verifier_path = config_dir / "test_server" / "verifier.txt" tokens_path = config_dir / "test_server" / "tokens.json" for file_path in [client_path, verifier_path, tokens_path]: if file_path.exists(): # Check permissions are 600 (owner read/write only) file_mode = oct(file_path.stat().st_mode)[-3:] assert file_mode == "600", f"File {file_path} has incorrect permissions: {file_mode}" def test_configuration_with_all_security_features(self) -> None: """Test configuration loading with all security features enabled.""" config_data = { "mcpServers": { "secure-server": { "command": "python", "args": ["-m", "secure_server"], "env": {"API_KEY": "${API_SECRET}", "USER": "$(whoami)", "TIMESTAMP": "$(date +%s)"}, "oauth_config": { "enabled": True, "type": "proxy", "client_id": "${OAUTH_CLIENT_ID}", "authorization_url": "https://api.example.com/oauth/authorize", }, } }, "bridge": {"host": "127.0.0.1", "port": 8080, "oauth_port": 8090}, } with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: json.dump(config_data, f, indent=2) config_file = f.name try: env_vars = { "API_SECRET": "secret_api_key_123", "OAUTH_CLIENT_ID": "oauth_client_456", "MCP_ALLOW_COMMAND_SUBSTITUTION": "true", } with patch.dict(os.environ, env_vars): config = load_bridge_config_from_file(config_file, {}) server = config.servers["secure-server"] # Environment variables should be expanded assert server.env is not None, "Server env should not be None" assert server.env["API_KEY"] == "secret_api_key_123" assert server.oauth_config is not None, "OAuth config should not be None" assert server.oauth_config["client_id"] == "oauth_client_456" # Safe command substitution should work assert server.env["USER"] != "$(whoami)" assert server.env["TIMESTAMP"] != "$(date +%s)" # OAuth should be configured assert server.is_oauth_enabled() assert server.needs_oauth_proxy() # Bridge configuration should be secure assert config.bridge is not None, "Bridge config should not be None" assert config.bridge.host == "127.0.0.1" # Localhost only assert config.bridge.port == 8080 assert config.bridge.oauth_port == 8090 finally: Path(config_file).unlink()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/billyjbryant/mcp-foxxy-bridge'

If you have feedback or need assistance with the MCP directory API, please join our Discord server