#
# Copyright (C) 2024 Billy Bryant
# Portions copyright (C) 2024 Sergey Parfenyuk (original MIT-licensed author)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# MIT License attribution: Portions of this file were originally licensed
# under the MIT License by Sergey Parfenyuk (2024).
#
"""Tests for command substitution functionality in config loader."""
import os
import subprocess
import tempfile
from pathlib import Path
from unittest.mock import Mock, patch
import pytest
from mcp_foxxy_bridge.config.config_loader import (
execute_command_substitution,
expand_env_vars,
validate_command_security,
)
# Constants for testing
DEBUG_LOG_TRUNCATE_LENGTH = 100
MIN_GIT_HASH_LENGTH = 7
DATE_FORMAT_LENGTH = 10 # YYYY-MM-DD format
@patch.dict(os.environ, {"MCP_ALLOW_COMMAND_SUBSTITUTION": "true"})
class TestExecuteCommandSubstitution:
"""Test cases for execute_command_substitution function."""
def test_simple_command_success(self) -> None:
"""Test successful execution of a simple command."""
result = execute_command_substitution("echo hello")
assert result == "hello"
def test_command_with_arguments(self) -> None:
"""Test command with multiple arguments."""
result = execute_command_substitution("echo -n test")
assert result == "test"
def test_command_strips_trailing_whitespace(self) -> None:
"""Test that trailing whitespace is stripped from output."""
result = execute_command_substitution("echo 'hello world'")
assert result == "hello world"
def test_multiline_output_preserves_internal_newlines(self) -> None:
"""Test that internal newlines are preserved but trailing ones stripped."""
result = execute_command_substitution("printf 'line1\\nline2\\n'")
assert result == "line1\nline2"
def test_empty_command_raises_error(self) -> None:
"""Test that empty command raises ValueError."""
with pytest.raises(ValueError, match="Empty command in substitution"):
execute_command_substitution("")
def test_whitespace_only_command_raises_error(self) -> None:
"""Test that whitespace-only command raises ValueError."""
with pytest.raises(ValueError, match="Empty command in substitution"):
execute_command_substitution(" ")
def test_nonexistent_command_raises_error(self) -> None:
"""Test that nonexistent command raises ValueError."""
with pytest.raises(ValueError, match="Invalid command substitution.*Command.*not in allow list"):
execute_command_substitution("nonexistent_command_12345")
def test_command_failure_raises_error(self) -> None:
"""Test that failed command raises ValueError with exit code."""
# Use 'grep' (which is allow-listed) with pattern that won't match to cause exit code 1
with pytest.raises(ValueError, match="Command substitution failed.*exit code"):
execute_command_substitution("grep nonexistent_pattern_12345")
def test_command_with_stderr_includes_error_message(self) -> None:
"""Test that stderr is included in error message."""
with pytest.raises(ValueError, match="Command substitution failed") as exc_info:
# Use grep with /dev/null to generate stderr and exit code 2
execute_command_substitution("grep pattern /dev/null/nonexistent_file")
# This will include "No such file or directory" or similar error message
assert exc_info.value
def test_shell_injection_protection(self) -> None:
"""Test that shell injection attempts are blocked by security validation."""
# Commands with dangerous shell metacharacters should be blocked
with pytest.raises(ValueError, match="Invalid command substitution.*Command validation failed"):
execute_command_substitution("echo hello; rm -rf /")
with pytest.raises(ValueError, match="Invalid command substitution.*Command validation failed"):
execute_command_substitution("echo test | cat")
with pytest.raises(ValueError, match="Invalid command substitution.*Command validation failed"):
execute_command_substitution("echo output > file")
def test_quoted_arguments_handled_correctly(self) -> None:
"""Test that quoted arguments are parsed correctly."""
result = execute_command_substitution("echo 'hello world'")
assert result == "hello world"
def test_environment_inheritance(self) -> None:
"""Test that commands inherit the current environment."""
# Set a test environment variable
test_var = "TEST_COMMAND_SUBSTITUTION"
test_value = "test_value_12345"
original_value = os.environ.get(test_var)
try:
os.environ[test_var] = test_value
# Use printenv (whitelisted) to check environment variable
result = execute_command_substitution(f"printenv {test_var}")
assert result == test_value
finally:
if original_value is not None:
os.environ[test_var] = original_value
else:
os.environ.pop(test_var, None)
@patch("subprocess.run")
def test_timeout_handling(self, mock_run: Mock) -> None:
"""Test that command timeout is handled properly."""
mock_run.side_effect = subprocess.TimeoutExpired("echo", 30)
with pytest.raises(ValueError, match="Command substitution timed out"):
execute_command_substitution("echo test")
def test_output_length_logging(self) -> None:
"""Test that long output is truncated in debug logging."""
# Create a long output (over 100 chars)
long_text = "a" * 150
with patch("mcp_foxxy_bridge.config.config_loader.logger") as mock_logger:
result = execute_command_substitution(f"echo {long_text}")
# Check that the result is complete
assert result == long_text
# Check that debug log was called - format may have changed
mock_logger.debug.assert_called_once()
call_args = mock_logger.debug.call_args[0]
# Check if any of the log arguments contains truncated text
for arg in call_args:
if isinstance(arg, str) and len(arg) == DEBUG_LOG_TRUNCATE_LENGTH:
break
# If no exact match, just verify debug was called with reasonable arguments
assert len(call_args) > 0, "Debug should be called with arguments"
class TestCommandSecurityValidation:
"""Test cases for command security validation."""
def test_command_substitution_disabled_by_default(self) -> None:
"""Test that command substitution is disabled by default for security."""
with (
patch.dict(os.environ, {}, clear=True), # Clear all environment variables
pytest.raises(ValueError, match="Command substitution is disabled by default"),
):
execute_command_substitution("echo hello")
@patch.dict(os.environ, {"MCP_ALLOW_COMMAND_SUBSTITUTION": "true"})
def test_allowed_commands_pass_validation(self) -> None:
"""Test that allow-listed commands pass validation."""
# Test basic commands with generic args (no special validation)
basic_commands = [
"echo",
"date",
"whoami",
"hostname",
"jq",
"base64",
"grep",
"printenv",
"printf",
"pwd",
"uname",
"cat",
]
for cmd in basic_commands:
# Should not raise any exception
validate_command_security([cmd, "arg1", "arg2"])
# Test git with valid subcommand
validate_command_security(["git", "status", "--short"])
validate_command_security(["git", "rev-parse", "HEAD"])
validate_command_security(["git", "log", "--oneline"])
# Test op (1Password) with valid operations
validate_command_security(["op", "read", "op://vault/item/field"])
validate_command_security(["op", "get", "item", "field"])
validate_command_security(["op", "list", "items"])
# Test vault with valid operations
validate_command_security(["vault", "read", "secret/path"])
validate_command_security(["vault", "kv", "get", "secret/path"])
validate_command_security(["vault", "list", "secret/"])
def test_dangerous_commands_blocked(self) -> None:
"""Test that dangerous commands are blocked."""
# Note: some previously dangerous commands like curl may now be allowed
dangerous_commands = ["rm", "mv", "ssh", "sudo", "bash", "apt", "ps", "systemctl", "service"]
for cmd in dangerous_commands:
with pytest.raises(ValueError, match="Command.*not in allow list"):
validate_command_security([cmd, "arg"])
def test_shell_metacharacters_blocked(self) -> None:
"""Test that shell metacharacters are blocked."""
dangerous_patterns = [
["echo", "test", "|", "cat"],
["echo", "output", ">", "file"],
["echo", "cmd1", "&&", "cmd2"],
["echo", "cmd1", ";", "cmd2"],
["echo", "`ls`"],
]
for cmd_parts in dangerous_patterns:
with pytest.raises(ValueError, match="Command validation failed"):
validate_command_security(cmd_parts)
def test_safe_arguments_allowed(self) -> None:
"""Test that safe arguments are allowed."""
safe_commands = [
["echo", "hello", "world"],
["date", "+%Y-%m-%d"],
["git", "rev-parse", "--short", "HEAD"],
["op", "read", "op://vault/item/field"],
["base64", "-d"],
]
for cmd_parts in safe_commands:
# Should not raise any exception
validate_command_security(cmd_parts)
def test_empty_command_parts_handled(self) -> None:
"""Test that empty command parts are handled gracefully."""
validate_command_security([]) # Should not raise
def test_case_insensitive_command_checking(self) -> None:
"""Test that command checking is case insensitive."""
with pytest.raises(ValueError, match="Command.*not in allow list"):
validate_command_security(["RM", "file"])
with pytest.raises(ValueError, match="Command.*not in allow list"):
validate_command_security(["Sudo", "ls"])
def test_comprehensive_security_error_messages(self) -> None:
"""Test that security error messages are helpful."""
with pytest.raises(ValueError, match="Command.*not in allow list") as exc_info:
validate_command_security(["dangerous_cmd"])
error_msg = str(exc_info.value)
assert "not in allow list" in error_msg
# The error message should be concise
assert len(error_msg) < 100 # Should not be overly verbose
@patch.dict(os.environ, {"MCP_ALLOW_COMMAND_SUBSTITUTION": "true"})
class TestExpandEnvVarsWithCommandSubstitution:
"""Test cases for expand_env_vars with command substitution support."""
def test_simple_command_substitution(self) -> None:
"""Test basic command substitution in string."""
result = expand_env_vars("prefix $(echo middle) suffix")
assert result == "prefix middle suffix"
def test_multiple_command_substitutions(self) -> None:
"""Test multiple command substitutions in same string."""
result = expand_env_vars("$(echo hello) $(echo world)")
assert result == "hello world"
def test_nested_quotes_in_command(self) -> None:
"""Test command with nested quotes."""
result = expand_env_vars("$(echo 'hello world')")
assert result == "hello world"
def test_command_substitution_with_env_vars(self) -> None:
"""Test combination of command substitution and environment variables."""
test_var = "TEST_COMBO_VAR"
test_value = "env_value"
original_value = os.environ.get(test_var)
try:
os.environ[test_var] = test_value
result = expand_env_vars("$(echo cmd_value) ${TEST_COMBO_VAR}")
assert result == "cmd_value env_value"
finally:
if original_value is not None:
os.environ[test_var] = original_value
else:
os.environ.pop(test_var, None)
def test_command_substitution_in_dict(self) -> None:
"""Test command substitution in dictionary values."""
config = {
"key1": "$(echo value1)",
"key2": {"nested": "prefix $(echo nested_value) suffix"},
}
result = expand_env_vars(config)
assert result["key1"] == "value1"
assert result["key2"]["nested"] == "prefix nested_value suffix"
def test_command_substitution_in_list(self) -> None:
"""Test command substitution in list items."""
config = ["$(echo item1)", "static_item", "$(echo item3)"]
result = expand_env_vars(config)
assert result == ["item1", "static_item", "item3"]
def test_failed_command_substitution_soft_failure(self) -> None:
"""Test that failed command substitution returns original pattern (soft failure)."""
result = expand_env_vars("$(grep nonexistent_pattern_12345)")
# Should return the original pattern unchanged due to soft failure handling
assert result == "$(grep nonexistent_pattern_12345)"
def test_empty_command_substitution_soft_failure(self) -> None:
"""Test that whitespace-only command substitution returns original pattern."""
result = expand_env_vars("$( )")
# Should return the original pattern unchanged due to soft failure handling
assert result == "$( )"
def test_truly_empty_command_substitution_ignored(self) -> None:
"""Test that completely empty $() is left as-is (no match)."""
result = expand_env_vars("prefix $() suffix")
assert result == "prefix $() suffix"
def test_command_order_execution(self) -> None:
"""Test that commands are executed before env var expansion."""
# Set up an environment variable
test_var = "TEST_ORDER_VAR"
original_value = os.environ.get(test_var)
try:
os.environ[test_var] = "from_env"
# Command should execute first, then env var should expand
result = expand_env_vars("$(echo ${TEST_ORDER_VAR})")
assert result == "from_env"
finally:
if original_value is not None:
os.environ[test_var] = original_value
else:
os.environ.pop(test_var, None)
def test_no_substitution_needed(self) -> None:
"""Test strings without substitution patterns are unchanged."""
test_string = "normal string without substitutions"
result = expand_env_vars(test_string)
assert result == test_string
def test_escaped_dollars_not_substituted(self) -> None:
"""Test that literal dollar signs don't trigger substitution."""
# Note: This tests the current behavior - literal $ characters
result = expand_env_vars("price is $10")
assert result == "price is $10"
@patch.dict(os.environ, {"MCP_ALLOW_COMMAND_SUBSTITUTION": "true"})
class TestRealWorldScenarios:
"""Test real-world usage scenarios."""
def test_onepassword_simulation(self) -> None:
"""Test OnePassword-style secret retrieval simulation."""
# Create a mock script that simulates 'op read'
with tempfile.TemporaryDirectory() as tmpdir:
mock_op_script = Path(tmpdir) / "op"
mock_op_script.write_text("#!/bin/bash\necho 'secret_token_12345'\n")
mock_op_script.chmod(0o755)
# Add the temp directory to PATH for this test
original_path = os.environ.get("PATH", "")
try:
os.environ["PATH"] = f"{tmpdir}:{original_path}"
result = expand_env_vars("$(op read 'op://vault/item/field')")
assert result == "secret_token_12345"
finally:
os.environ["PATH"] = original_path
def test_git_commit_hash_substitution(self) -> None:
"""Test getting git commit hash via command substitution."""
# Only run this test if we're in a git repository
try:
subprocess.run(["git", "rev-parse", "HEAD"], capture_output=True, check=True, timeout=5) # noqa: S607
result = expand_env_vars("version-$(git rev-parse --short HEAD)")
# Should be "version-" followed by a short hash (7-8 characters)
assert result.startswith("version-")
hash_part = result[8:] # Remove "version-" prefix
assert len(hash_part) >= MIN_GIT_HASH_LENGTH
assert all(c in "0123456789abcdef" for c in hash_part)
except (subprocess.CalledProcessError, FileNotFoundError):
pytest.skip("Not in a git repository or git not available")
def test_date_substitution(self) -> None:
"""Test date command substitution."""
result = expand_env_vars("built-on-$(date +%Y-%m-%d)")
# Should be "built-on-" followed by a date in YYYY-MM-DD format
assert result.startswith("built-on-")
date_part = result[9:] # Remove "built-on-" prefix
assert len(date_part) == DATE_FORMAT_LENGTH # YYYY-MM-DD is 10 characters
assert date_part[4] == "-"
assert date_part[7] == "-"
def test_complex_config_example(self) -> None:
"""Test a complex configuration with multiple substitutions."""
# Set up test environment
test_vars = {"DATABASE_HOST": "localhost", "APP_NAME": "test-app"}
original_values = {}
for var, value in test_vars.items():
original_values[var] = os.environ.get(var)
os.environ[var] = value
try:
config = {
"app": {
"name": "${APP_NAME}",
"version": "$(echo '1.0.0')",
"build_info": "built-$(date +%Y%m%d)-$(echo ${USER:-unknown})",
},
"database": {
"host": "${DATABASE_HOST}",
"connection_string": "postgres://${DATABASE_HOST}:5432/$(echo ${APP_NAME})",
},
}
result = expand_env_vars(config)
assert result["app"]["name"] == "test-app"
assert result["app"]["version"] == "1.0.0"
assert result["app"]["build_info"].startswith("built-")
assert result["database"]["host"] == "localhost"
assert result["database"]["connection_string"] == "postgres://localhost:5432/test-app"
finally:
# Restore original environment
for var, original_value in original_values.items():
if original_value is not None:
os.environ[var] = original_value
else:
os.environ.pop(var, None)