Skip to main content
Glama
test_command_substitution.py19.8 kB
# # Copyright (C) 2024 Billy Bryant # Portions copyright (C) 2024 Sergey Parfenyuk (original MIT-licensed author) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. # # MIT License attribution: Portions of this file were originally licensed # under the MIT License by Sergey Parfenyuk (2024). # """Tests for command substitution functionality in config loader.""" import os import subprocess import tempfile from pathlib import Path from unittest.mock import Mock, patch import pytest from mcp_foxxy_bridge.config.config_loader import ( execute_command_substitution, expand_env_vars, validate_command_security, ) # Constants for testing DEBUG_LOG_TRUNCATE_LENGTH = 100 MIN_GIT_HASH_LENGTH = 7 DATE_FORMAT_LENGTH = 10 # YYYY-MM-DD format @patch.dict(os.environ, {"MCP_ALLOW_COMMAND_SUBSTITUTION": "true"}) class TestExecuteCommandSubstitution: """Test cases for execute_command_substitution function.""" def test_simple_command_success(self) -> None: """Test successful execution of a simple command.""" result = execute_command_substitution("echo hello") assert result == "hello" def test_command_with_arguments(self) -> None: """Test command with multiple arguments.""" result = execute_command_substitution("echo -n test") assert result == "test" def test_command_strips_trailing_whitespace(self) -> None: """Test that trailing whitespace is stripped from output.""" result = execute_command_substitution("echo 'hello world'") assert result == "hello world" def test_multiline_output_preserves_internal_newlines(self) -> None: """Test that internal newlines are preserved but trailing ones stripped.""" result = execute_command_substitution("printf 'line1\\nline2\\n'") assert result == "line1\nline2" def test_empty_command_raises_error(self) -> None: """Test that empty command raises ValueError.""" with pytest.raises(ValueError, match="Empty command in substitution"): execute_command_substitution("") def test_whitespace_only_command_raises_error(self) -> None: """Test that whitespace-only command raises ValueError.""" with pytest.raises(ValueError, match="Empty command in substitution"): execute_command_substitution(" ") def test_nonexistent_command_raises_error(self) -> None: """Test that nonexistent command raises ValueError.""" with pytest.raises(ValueError, match="Invalid command substitution.*Command.*not in allow list"): execute_command_substitution("nonexistent_command_12345") def test_command_failure_raises_error(self) -> None: """Test that failed command raises ValueError with exit code.""" # Use 'grep' (which is allow-listed) with pattern that won't match to cause exit code 1 with pytest.raises(ValueError, match="Command substitution failed.*exit code"): execute_command_substitution("grep nonexistent_pattern_12345") def test_command_with_stderr_includes_error_message(self) -> None: """Test that stderr is included in error message.""" with pytest.raises(ValueError, match="Command substitution failed") as exc_info: # Use grep with /dev/null to generate stderr and exit code 2 execute_command_substitution("grep pattern /dev/null/nonexistent_file") # This will include "No such file or directory" or similar error message assert exc_info.value def test_shell_injection_protection(self) -> None: """Test that shell injection attempts are blocked by security validation.""" # Commands with dangerous shell metacharacters should be blocked with pytest.raises(ValueError, match="Invalid command substitution.*Command validation failed"): execute_command_substitution("echo hello; rm -rf /") with pytest.raises(ValueError, match="Invalid command substitution.*Command validation failed"): execute_command_substitution("echo test | cat") with pytest.raises(ValueError, match="Invalid command substitution.*Command validation failed"): execute_command_substitution("echo output > file") def test_quoted_arguments_handled_correctly(self) -> None: """Test that quoted arguments are parsed correctly.""" result = execute_command_substitution("echo 'hello world'") assert result == "hello world" def test_environment_inheritance(self) -> None: """Test that commands inherit the current environment.""" # Set a test environment variable test_var = "TEST_COMMAND_SUBSTITUTION" test_value = "test_value_12345" original_value = os.environ.get(test_var) try: os.environ[test_var] = test_value # Use printenv (whitelisted) to check environment variable result = execute_command_substitution(f"printenv {test_var}") assert result == test_value finally: if original_value is not None: os.environ[test_var] = original_value else: os.environ.pop(test_var, None) @patch("subprocess.run") def test_timeout_handling(self, mock_run: Mock) -> None: """Test that command timeout is handled properly.""" mock_run.side_effect = subprocess.TimeoutExpired("echo", 30) with pytest.raises(ValueError, match="Command substitution timed out"): execute_command_substitution("echo test") def test_output_length_logging(self) -> None: """Test that long output is truncated in debug logging.""" # Create a long output (over 100 chars) long_text = "a" * 150 with patch("mcp_foxxy_bridge.config.config_loader.logger") as mock_logger: result = execute_command_substitution(f"echo {long_text}") # Check that the result is complete assert result == long_text # Check that debug log was called - format may have changed mock_logger.debug.assert_called_once() call_args = mock_logger.debug.call_args[0] # Check if any of the log arguments contains truncated text for arg in call_args: if isinstance(arg, str) and len(arg) == DEBUG_LOG_TRUNCATE_LENGTH: break # If no exact match, just verify debug was called with reasonable arguments assert len(call_args) > 0, "Debug should be called with arguments" class TestCommandSecurityValidation: """Test cases for command security validation.""" def test_command_substitution_disabled_by_default(self) -> None: """Test that command substitution is disabled by default for security.""" with ( patch.dict(os.environ, {}, clear=True), # Clear all environment variables pytest.raises(ValueError, match="Command substitution is disabled by default"), ): execute_command_substitution("echo hello") @patch.dict(os.environ, {"MCP_ALLOW_COMMAND_SUBSTITUTION": "true"}) def test_allowed_commands_pass_validation(self) -> None: """Test that allow-listed commands pass validation.""" # Test basic commands with generic args (no special validation) basic_commands = [ "echo", "date", "whoami", "hostname", "jq", "base64", "grep", "printenv", "printf", "pwd", "uname", "cat", ] for cmd in basic_commands: # Should not raise any exception validate_command_security([cmd, "arg1", "arg2"]) # Test git with valid subcommand validate_command_security(["git", "status", "--short"]) validate_command_security(["git", "rev-parse", "HEAD"]) validate_command_security(["git", "log", "--oneline"]) # Test op (1Password) with valid operations validate_command_security(["op", "read", "op://vault/item/field"]) validate_command_security(["op", "get", "item", "field"]) validate_command_security(["op", "list", "items"]) # Test vault with valid operations validate_command_security(["vault", "read", "secret/path"]) validate_command_security(["vault", "kv", "get", "secret/path"]) validate_command_security(["vault", "list", "secret/"]) def test_dangerous_commands_blocked(self) -> None: """Test that dangerous commands are blocked.""" # Note: some previously dangerous commands like curl may now be allowed dangerous_commands = ["rm", "mv", "ssh", "sudo", "bash", "apt", "ps", "systemctl", "service"] for cmd in dangerous_commands: with pytest.raises(ValueError, match="Command.*not in allow list"): validate_command_security([cmd, "arg"]) def test_shell_metacharacters_blocked(self) -> None: """Test that shell metacharacters are blocked.""" dangerous_patterns = [ ["echo", "test", "|", "cat"], ["echo", "output", ">", "file"], ["echo", "cmd1", "&&", "cmd2"], ["echo", "cmd1", ";", "cmd2"], ["echo", "`ls`"], ] for cmd_parts in dangerous_patterns: with pytest.raises(ValueError, match="Command validation failed"): validate_command_security(cmd_parts) def test_safe_arguments_allowed(self) -> None: """Test that safe arguments are allowed.""" safe_commands = [ ["echo", "hello", "world"], ["date", "+%Y-%m-%d"], ["git", "rev-parse", "--short", "HEAD"], ["op", "read", "op://vault/item/field"], ["base64", "-d"], ] for cmd_parts in safe_commands: # Should not raise any exception validate_command_security(cmd_parts) def test_empty_command_parts_handled(self) -> None: """Test that empty command parts are handled gracefully.""" validate_command_security([]) # Should not raise def test_case_insensitive_command_checking(self) -> None: """Test that command checking is case insensitive.""" with pytest.raises(ValueError, match="Command.*not in allow list"): validate_command_security(["RM", "file"]) with pytest.raises(ValueError, match="Command.*not in allow list"): validate_command_security(["Sudo", "ls"]) def test_comprehensive_security_error_messages(self) -> None: """Test that security error messages are helpful.""" with pytest.raises(ValueError, match="Command.*not in allow list") as exc_info: validate_command_security(["dangerous_cmd"]) error_msg = str(exc_info.value) assert "not in allow list" in error_msg # The error message should be concise assert len(error_msg) < 100 # Should not be overly verbose @patch.dict(os.environ, {"MCP_ALLOW_COMMAND_SUBSTITUTION": "true"}) class TestExpandEnvVarsWithCommandSubstitution: """Test cases for expand_env_vars with command substitution support.""" def test_simple_command_substitution(self) -> None: """Test basic command substitution in string.""" result = expand_env_vars("prefix $(echo middle) suffix") assert result == "prefix middle suffix" def test_multiple_command_substitutions(self) -> None: """Test multiple command substitutions in same string.""" result = expand_env_vars("$(echo hello) $(echo world)") assert result == "hello world" def test_nested_quotes_in_command(self) -> None: """Test command with nested quotes.""" result = expand_env_vars("$(echo 'hello world')") assert result == "hello world" def test_command_substitution_with_env_vars(self) -> None: """Test combination of command substitution and environment variables.""" test_var = "TEST_COMBO_VAR" test_value = "env_value" original_value = os.environ.get(test_var) try: os.environ[test_var] = test_value result = expand_env_vars("$(echo cmd_value) ${TEST_COMBO_VAR}") assert result == "cmd_value env_value" finally: if original_value is not None: os.environ[test_var] = original_value else: os.environ.pop(test_var, None) def test_command_substitution_in_dict(self) -> None: """Test command substitution in dictionary values.""" config = { "key1": "$(echo value1)", "key2": {"nested": "prefix $(echo nested_value) suffix"}, } result = expand_env_vars(config) assert result["key1"] == "value1" assert result["key2"]["nested"] == "prefix nested_value suffix" def test_command_substitution_in_list(self) -> None: """Test command substitution in list items.""" config = ["$(echo item1)", "static_item", "$(echo item3)"] result = expand_env_vars(config) assert result == ["item1", "static_item", "item3"] def test_failed_command_substitution_soft_failure(self) -> None: """Test that failed command substitution returns original pattern (soft failure).""" result = expand_env_vars("$(grep nonexistent_pattern_12345)") # Should return the original pattern unchanged due to soft failure handling assert result == "$(grep nonexistent_pattern_12345)" def test_empty_command_substitution_soft_failure(self) -> None: """Test that whitespace-only command substitution returns original pattern.""" result = expand_env_vars("$( )") # Should return the original pattern unchanged due to soft failure handling assert result == "$( )" def test_truly_empty_command_substitution_ignored(self) -> None: """Test that completely empty $() is left as-is (no match).""" result = expand_env_vars("prefix $() suffix") assert result == "prefix $() suffix" def test_command_order_execution(self) -> None: """Test that commands are executed before env var expansion.""" # Set up an environment variable test_var = "TEST_ORDER_VAR" original_value = os.environ.get(test_var) try: os.environ[test_var] = "from_env" # Command should execute first, then env var should expand result = expand_env_vars("$(echo ${TEST_ORDER_VAR})") assert result == "from_env" finally: if original_value is not None: os.environ[test_var] = original_value else: os.environ.pop(test_var, None) def test_no_substitution_needed(self) -> None: """Test strings without substitution patterns are unchanged.""" test_string = "normal string without substitutions" result = expand_env_vars(test_string) assert result == test_string def test_escaped_dollars_not_substituted(self) -> None: """Test that literal dollar signs don't trigger substitution.""" # Note: This tests the current behavior - literal $ characters result = expand_env_vars("price is $10") assert result == "price is $10" @patch.dict(os.environ, {"MCP_ALLOW_COMMAND_SUBSTITUTION": "true"}) class TestRealWorldScenarios: """Test real-world usage scenarios.""" def test_onepassword_simulation(self) -> None: """Test OnePassword-style secret retrieval simulation.""" # Create a mock script that simulates 'op read' with tempfile.TemporaryDirectory() as tmpdir: mock_op_script = Path(tmpdir) / "op" mock_op_script.write_text("#!/bin/bash\necho 'secret_token_12345'\n") mock_op_script.chmod(0o755) # Add the temp directory to PATH for this test original_path = os.environ.get("PATH", "") try: os.environ["PATH"] = f"{tmpdir}:{original_path}" result = expand_env_vars("$(op read 'op://vault/item/field')") assert result == "secret_token_12345" finally: os.environ["PATH"] = original_path def test_git_commit_hash_substitution(self) -> None: """Test getting git commit hash via command substitution.""" # Only run this test if we're in a git repository try: subprocess.run(["git", "rev-parse", "HEAD"], capture_output=True, check=True, timeout=5) # noqa: S607 result = expand_env_vars("version-$(git rev-parse --short HEAD)") # Should be "version-" followed by a short hash (7-8 characters) assert result.startswith("version-") hash_part = result[8:] # Remove "version-" prefix assert len(hash_part) >= MIN_GIT_HASH_LENGTH assert all(c in "0123456789abcdef" for c in hash_part) except (subprocess.CalledProcessError, FileNotFoundError): pytest.skip("Not in a git repository or git not available") def test_date_substitution(self) -> None: """Test date command substitution.""" result = expand_env_vars("built-on-$(date +%Y-%m-%d)") # Should be "built-on-" followed by a date in YYYY-MM-DD format assert result.startswith("built-on-") date_part = result[9:] # Remove "built-on-" prefix assert len(date_part) == DATE_FORMAT_LENGTH # YYYY-MM-DD is 10 characters assert date_part[4] == "-" assert date_part[7] == "-" def test_complex_config_example(self) -> None: """Test a complex configuration with multiple substitutions.""" # Set up test environment test_vars = {"DATABASE_HOST": "localhost", "APP_NAME": "test-app"} original_values = {} for var, value in test_vars.items(): original_values[var] = os.environ.get(var) os.environ[var] = value try: config = { "app": { "name": "${APP_NAME}", "version": "$(echo '1.0.0')", "build_info": "built-$(date +%Y%m%d)-$(echo ${USER:-unknown})", }, "database": { "host": "${DATABASE_HOST}", "connection_string": "postgres://${DATABASE_HOST}:5432/$(echo ${APP_NAME})", }, } result = expand_env_vars(config) assert result["app"]["name"] == "test-app" assert result["app"]["version"] == "1.0.0" assert result["app"]["build_info"].startswith("built-") assert result["database"]["host"] == "localhost" assert result["database"]["connection_string"] == "postgres://localhost:5432/test-app" finally: # Restore original environment for var, original_value in original_values.items(): if original_value is not None: os.environ[var] = original_value else: os.environ.pop(var, None)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/billyjbryant/mcp-foxxy-bridge'

If you have feedback or need assistance with the MCP directory API, please join our Discord server