"""Comprehensive tests for Keyboard Maestro client integration functionality.
This module provides comprehensive test coverage for the KM client system with focus
on functional programming patterns, error handling monads, and connection management.
"""
from unittest.mock import Mock, patch
import pytest
from hypothesis import given
from hypothesis import strategies as st
from src.core.types import Duration, GroupId, MacroId, TriggerId
from src.integration.km_client import (
ConnectionMethod,
KMClient,
KMError,
TriggerDefinition,
create_client_with_fallback,
retry_with_backoff,
)
class TestConnectionMethod:
"""Test ConnectionMethod enumeration."""
def test_connection_method_values(self) -> None:
"""Test all connection method values."""
assert ConnectionMethod.APPLESCRIPT.value == "applescript"
assert ConnectionMethod.URL_SCHEME.value == "url_scheme"
assert ConnectionMethod.WEB_API.value == "web_api"
assert ConnectionMethod.REMOTE_TRIGGER.value == "remote_trigger"
def test_connection_method_enumeration(self) -> None:
"""Test connection method enumeration completeness."""
all_methods = list(ConnectionMethod)
assert len(all_methods) == 4
values = [method.value for method in all_methods]
assert len(values) == len(set(values)) # All unique
def test_connection_method_string_conversion(self) -> None:
"""Test connection method string representation."""
for method in ConnectionMethod:
assert isinstance(method.value, str)
assert len(method.value) > 0
class TestKMError:
"""Test KMError functionality."""
def test_km_error_creation(self) -> None:
"""Test creation of KM error instances."""
error = KMError(
error_type="connection_error",
message="Failed to connect to Keyboard Maestro",
details={"host": "localhost", "port": 8080}
)
assert error.error_type == "connection_error"
assert error.message == "Failed to connect to Keyboard Maestro"
assert error.details["host"] == "localhost"
assert error.details["port"] == 8080
def test_km_error_minimal(self) -> None:
"""Test creation of minimal KM error."""
error = KMError(
error_type="timeout",
message="Operation timed out"
)
assert error.error_type == "timeout"
assert error.message == "Operation timed out"
assert error.details == {}
def test_km_error_from_exception(self) -> None:
"""Test creation of KM error from exception."""
exception = ValueError("Invalid parameter")
error = KMError.from_exception(exception)
assert error.error_type == "ValueError"
assert "Invalid parameter" in error.message
assert "exception_type" in error.details
def test_km_error_connection_error(self) -> None:
"""Test creation of connection error."""
error = KMError.connection_error("localhost", 8080)
assert error.error_type == "connection_error"
assert "localhost" in error.message
assert "8080" in error.message
def test_km_error_timeout_error(self) -> None:
"""Test creation of timeout error."""
error = KMError.timeout_error(30.0)
assert error.error_type == "timeout_error"
assert "30.0" in error.message
assert error.details["timeout_seconds"] == 30.0
def test_km_error_validation_error(self) -> None:
"""Test creation of validation error."""
error = KMError.validation_error("macro_id", "invalid format")
assert error.error_type == "validation_error"
assert "macro_id" in error.message
assert "invalid format" in error.message
class TestKMConfig:
"""Test KMConfig functionality."""
def test_km_config_creation(self) -> None:
"""Test creation of KM configuration."""
config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="localhost",
port=8080,
api_key="test_api_key",
timeout=Duration(seconds=30),
retry_attempts=3
)
assert config.connection_method == ConnectionMethod.WEB_API
assert config.host == "localhost"
assert config.port == 8080
assert config.api_key == "test_api_key"
assert config.timeout.total_seconds() == 30
assert config.retry_attempts == 3
def test_km_config_defaults(self) -> None:
"""Test KM configuration with default values."""
config = KMConfig()
assert config.connection_method == ConnectionMethod.APPLESCRIPT
assert config.host == "localhost"
assert config.port == 8100
assert config.timeout.total_seconds() == 30
assert config.retry_attempts == 3
assert config.use_https is False
def test_km_config_validation_valid(self) -> None:
"""Test KM configuration validation with valid config."""
config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="valid.host.com",
port=8080,
api_key="valid_key"
)
result = config.validate()
assert result["valid"] is True
assert result["errors"] == []
def test_km_config_validation_invalid_port(self) -> None:
"""Test KM configuration validation with invalid port."""
config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="localhost",
port=0 # Invalid port
)
result = config.validate()
assert result["valid"] is False
assert any("port" in error.lower() for error in result["errors"])
def test_km_config_validation_missing_api_key(self) -> None:
"""Test KM configuration validation with missing API key for web API."""
config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="localhost",
port=8080,
api_key="" # Missing API key for web API
)
result = config.validate()
assert result["valid"] is False
assert any("api_key" in error.lower() for error in result["errors"])
def test_km_config_applescript_no_api_key_needed(self) -> None:
"""Test KM configuration validation for AppleScript (no API key needed)."""
config = KMConfig(
connection_method=ConnectionMethod.APPLESCRIPT,
host="localhost",
port=8080
# No API key needed for AppleScript
)
result = config.validate()
assert result["valid"] is True
def test_km_config_to_dict(self) -> None:
"""Test KM configuration serialization to dictionary."""
config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="example.com",
port=9000,
api_key="secret_key"
)
config_dict = config.to_dict()
assert config_dict["connection_method"] == "web_api"
assert config_dict["host"] == "example.com"
assert config_dict["port"] == 9000
assert config_dict["api_key"] == "secret_key"
def test_km_config_from_dict(self) -> None:
"""Test KM configuration creation from dictionary."""
config_dict = {
"connection_method": "url_scheme",
"host": "example.com",
"port": 9000,
"timeout_seconds": 60,
"retry_attempts": 5
}
config = KMConfig.from_dict(config_dict)
assert config.connection_method == ConnectionMethod.URL_SCHEME
assert config.host == "example.com"
assert config.port == 9000
assert config.timeout.total_seconds() == 60
assert config.retry_attempts == 5
class TestKMResponse:
"""Test KMResponse functionality."""
def test_km_response_creation(self) -> None:
"""Test creation of KM response."""
response = KMResponse(
status=KMStatus.SUCCESS,
data={"result": "operation completed"},
error=None,
execution_time=Duration(seconds=1.5)
)
assert response.status == KMStatus.SUCCESS
assert response.data["result"] == "operation completed"
assert response.error is None
assert response.execution_time.total_seconds() == 1.5
def test_km_response_error(self) -> None:
"""Test creation of KM error response."""
error = KMError(
error_type="execution_error",
message="Macro execution failed"
)
response = KMResponse(
status=KMStatus.ERROR,
data={},
error=error,
execution_time=Duration(seconds=0.5)
)
assert response.status == KMStatus.ERROR
assert response.error == error
assert response.data == {}
def test_km_response_success_factory(self) -> None:
"""Test KM response success factory method."""
data = {"macro_id": "test_macro", "executed": True}
response = KMResponse.success(data, execution_time=Duration(seconds=2.0))
assert response.status == KMStatus.SUCCESS
assert response.data == data
assert response.error is None
assert response.execution_time.total_seconds() == 2.0
def test_km_response_error_factory(self) -> None:
"""Test KM response error factory method."""
error = KMError.validation_error("macro_id", "not found")
response = KMResponse.error(error, execution_time=Duration(seconds=0.1))
assert response.status == KMStatus.ERROR
assert response.error == error
assert response.data == {}
def test_km_response_is_success(self) -> None:
"""Test KM response success checking."""
success_response = KMResponse.success({"result": "ok"})
error_response = KMResponse.error(KMError("test", "test error"))
assert success_response.is_success() is True
assert error_response.is_success() is False
def test_km_response_is_error(self) -> None:
"""Test KM response error checking."""
success_response = KMResponse.success({"result": "ok"})
error_response = KMResponse.error(KMError("test", "test error"))
assert success_response.is_error() is False
assert error_response.is_error() is True
class TestKMOperation:
"""Test KMOperation functionality."""
def test_km_operation_creation(self) -> None:
"""Test creation of KM operation."""
operation = KMOperation(
operation_type="execute_macro",
target_id=MacroId("test_macro"),
parameters={"param1": "value1", "param2": 42},
timeout=Duration(seconds=60)
)
assert operation.operation_type == "execute_macro"
assert operation.target_id == MacroId("test_macro")
assert operation.parameters["param1"] == "value1"
assert operation.parameters["param2"] == 42
assert operation.timeout.total_seconds() == 60
def test_km_operation_defaults(self) -> None:
"""Test KM operation with default values."""
operation = KMOperation(
operation_type="list_macros"
)
assert operation.operation_type == "list_macros"
assert operation.target_id is None
assert operation.parameters == {}
assert operation.timeout.total_seconds() == 30
def test_km_operation_execute_macro(self) -> None:
"""Test KM operation for macro execution."""
operation = KMOperation.execute_macro(
macro_id=MacroId("test_macro"),
parameters={"input": "test_input"}
)
assert operation.operation_type == "execute_macro"
assert operation.target_id == MacroId("test_macro")
assert operation.parameters["input"] == "test_input"
def test_km_operation_list_macros(self) -> None:
"""Test KM operation for listing macros."""
operation = KMOperation.list_macros(group_id=GroupId("test_group"))
assert operation.operation_type == "list_macros"
assert operation.parameters["group_id"] == GroupId("test_group")
def test_km_operation_get_macro_info(self) -> None:
"""Test KM operation for getting macro info."""
operation = KMOperation.get_macro_info(MacroId("test_macro"))
assert operation.operation_type == "get_macro_info"
assert operation.target_id == MacroId("test_macro")
def test_km_operation_enable_macro(self) -> None:
"""Test KM operation for enabling macro."""
operation = KMOperation.enable_macro(MacroId("test_macro"))
assert operation.operation_type == "enable_macro"
assert operation.target_id == MacroId("test_macro")
def test_km_operation_disable_macro(self) -> None:
"""Test KM operation for disabling macro."""
operation = KMOperation.disable_macro(MacroId("test_macro"))
assert operation.operation_type == "disable_macro"
assert operation.target_id == MacroId("test_macro")
def test_km_operation_trigger_macro(self) -> None:
"""Test KM operation for triggering macro."""
operation = KMOperation.trigger_macro(
trigger_id=TriggerId("test_trigger"),
context={"key": "value"}
)
assert operation.operation_type == "trigger_macro"
assert operation.target_id == TriggerId("test_trigger")
assert operation.parameters["context"]["key"] == "value"
def test_km_operation_validation_valid(self) -> None:
"""Test KM operation validation with valid operation."""
operation = KMOperation(
operation_type="execute_macro",
target_id=MacroId("valid_macro"),
parameters={"param": "value"}
)
result = operation.validate()
assert result["valid"] is True
assert result["errors"] == []
def test_km_operation_validation_invalid_type(self) -> None:
"""Test KM operation validation with invalid operation type."""
operation = KMOperation(
operation_type="invalid_operation",
target_id=MacroId("test_macro")
)
result = operation.validate()
assert result["valid"] is False
assert any("operation_type" in error.lower() for error in result["errors"])
def test_km_operation_to_dict(self) -> None:
"""Test KM operation serialization to dictionary."""
operation = KMOperation(
operation_type="execute_macro",
target_id=MacroId("test_macro"),
parameters={"param": "value"}
)
op_dict = operation.to_dict()
assert op_dict["operation_type"] == "execute_macro"
assert op_dict["target_id"] == "test_macro"
assert op_dict["parameters"]["param"] == "value"
class TestKMClient:
"""Test KMClient functionality."""
def setup_method(self) -> None:
"""Set up test fixtures."""
self.config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="localhost",
port=8080,
api_key="test_key"
)
self.client = KMClient(self.config)
def test_km_client_creation(self) -> None:
"""Test creation of KM client."""
assert self.client.config == self.config
assert self.client.is_connected() is False
assert hasattr(self.client, 'execute_operation')
def test_km_client_connect_success(self) -> None:
"""Test successful KM client connection."""
with patch.object(self.client, '_test_connection', return_value=Right(True)):
result = self.client.connect()
assert isinstance(result, Right)
assert result.value is True
assert self.client.is_connected() is True
def test_km_client_connect_failure(self) -> None:
"""Test failed KM client connection."""
error = KMError.connection_error("localhost", 8080)
with patch.object(self.client, '_test_connection', return_value=Left(error)):
result = self.client.connect()
assert isinstance(result, Left)
assert result.value == error
assert self.client.is_connected() is False
def test_km_client_disconnect(self) -> None:
"""Test KM client disconnection."""
# First connect
with patch.object(self.client, '_test_connection', return_value=Right(True)):
self.client.connect()
assert self.client.is_connected() is True
# Then disconnect
self.client.disconnect()
assert self.client.is_connected() is False
def test_km_client_execute_operation_not_connected(self) -> None:
"""Test operation execution when not connected."""
operation = KMOperation.list_macros()
result = self.client.execute_operation(operation)
assert isinstance(result, Left)
assert "not connected" in result.value.message.lower()
def test_km_client_execute_operation_success(self) -> None:
"""Test successful operation execution."""
# Connect first
with patch.object(self.client, '_test_connection', return_value=Right(True)):
self.client.connect()
operation = KMOperation.list_macros()
expected_response = KMResponse.success({"macros": ["macro1", "macro2"]})
with patch.object(self.client, '_execute_operation_internal', return_value=Right(expected_response)):
result = self.client.execute_operation(operation)
assert isinstance(result, Right)
assert result.value == expected_response
def test_km_client_execute_macro(self) -> None:
"""Test macro execution through client."""
# Connect first
with patch.object(self.client, '_test_connection', return_value=Right(True)):
self.client.connect()
expected_response = KMResponse.success({"executed": True, "result": "success"})
with patch.object(self.client, '_execute_operation_internal', return_value=Right(expected_response)):
result = self.client.execute_macro(MacroId("test_macro"), {"param": "value"})
assert isinstance(result, Right)
assert result.value.data["executed"] is True
def test_km_client_list_macros(self) -> None:
"""Test listing macros through client."""
# Connect first
with patch.object(self.client, '_test_connection', return_value=Right(True)):
self.client.connect()
expected_response = KMResponse.success({
"macros": [
{"id": "macro1", "name": "Test Macro 1"},
{"id": "macro2", "name": "Test Macro 2"}
]
})
with patch.object(self.client, '_execute_operation_internal', return_value=Right(expected_response)):
result = self.client.list_macros()
assert isinstance(result, Right)
assert len(result.value.data["macros"]) == 2
def test_km_client_get_macro_info(self) -> None:
"""Test getting macro info through client."""
# Connect first
with patch.object(self.client, '_test_connection', return_value=Right(True)):
self.client.connect()
expected_response = KMResponse.success({
"macro": {
"id": "test_macro",
"name": "Test Macro",
"enabled": True,
"group": "Test Group"
}
})
with patch.object(self.client, '_execute_operation_internal', return_value=Right(expected_response)):
result = self.client.get_macro_info(MacroId("test_macro"))
assert isinstance(result, Right)
assert result.value.data["macro"]["name"] == "Test Macro"
def test_km_client_enable_macro(self) -> None:
"""Test enabling macro through client."""
# Connect first
with patch.object(self.client, '_test_connection', return_value=Right(True)):
self.client.connect()
expected_response = KMResponse.success({"enabled": True})
with patch.object(self.client, '_execute_operation_internal', return_value=Right(expected_response)):
result = self.client.enable_macro(MacroId("test_macro"))
assert isinstance(result, Right)
assert result.value.data["enabled"] is True
def test_km_client_disable_macro(self) -> None:
"""Test disabling macro through client."""
# Connect first
with patch.object(self.client, '_test_connection', return_value=Right(True)):
self.client.connect()
expected_response = KMResponse.success({"enabled": False})
with patch.object(self.client, '_execute_operation_internal', return_value=Right(expected_response)):
result = self.client.disable_macro(MacroId("test_macro"))
assert isinstance(result, Right)
assert result.value.data["enabled"] is False
def test_km_client_trigger_macro(self) -> None:
"""Test triggering macro through client."""
# Connect first
with patch.object(self.client, '_test_connection', return_value=Right(True)):
self.client.connect()
expected_response = KMResponse.success({"triggered": True})
with patch.object(self.client, '_execute_operation_internal', return_value=Right(expected_response)):
result = self.client.trigger_macro(TriggerId("test_trigger"), {"context": "test"})
assert isinstance(result, Right)
assert result.value.data["triggered"] is True
def test_km_client_get_status(self) -> None:
"""Test getting KM status through client."""
# Connect first
with patch.object(self.client, '_test_connection', return_value=Right(True)):
self.client.connect()
expected_response = KMResponse.success({
"status": "running",
"version": "10.0",
"macros_count": 50
})
with patch.object(self.client, '_execute_operation_internal', return_value=Right(expected_response)):
result = self.client.get_status()
assert isinstance(result, Right)
assert result.value.data["status"] == "running"
def test_km_client_retry_mechanism(self) -> None:
"""Test client retry mechanism on failures."""
# Connect first
with patch.object(self.client, '_test_connection', return_value=Right(True)):
self.client.connect()
operation = KMOperation.list_macros()
# Mock to fail twice then succeed
call_count = 0
def mock_execute(op):
nonlocal call_count
call_count += 1
if call_count <= 2:
return Left(KMError.timeout_error(30.0))
else:
return Right(KMResponse.success({"macros": []}))
with patch.object(self.client, '_execute_operation_internal', side_effect=mock_execute):
result = self.client.execute_operation(operation)
assert isinstance(result, Right)
assert call_count == 3 # Failed twice, succeeded on third try
class TestAsyncKMClient:
"""Test AsyncKMClient functionality."""
def setup_method(self) -> None:
"""Set up test fixtures."""
self.config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="localhost",
port=8080,
api_key="test_key"
)
@pytest.mark.asyncio
async def test_async_km_client_creation(self) -> None:
"""Test creation of async KM client."""
async with AsyncKMClient(self.config) as client:
assert client.config == self.config
assert hasattr(client, 'execute_operation')
@pytest.mark.asyncio
async def test_async_km_client_connect_success(self) -> None:
"""Test successful async KM client connection."""
client = AsyncKMClient(self.config)
with patch.object(client, '_test_connection_async', return_value=Right(True)):
result = await client.connect()
assert isinstance(result, Right)
assert result.value is True
await client.close()
@pytest.mark.asyncio
async def test_async_km_client_connect_failure(self) -> None:
"""Test failed async KM client connection."""
client = AsyncKMClient(self.config)
error = KMError.connection_error("localhost", 8080)
with patch.object(client, '_test_connection_async', return_value=Left(error)):
result = await client.connect()
assert isinstance(result, Left)
assert result.value == error
await client.close()
@pytest.mark.asyncio
async def test_async_km_client_execute_operation(self) -> None:
"""Test async operation execution."""
client = AsyncKMClient(self.config)
# Mock connection
with patch.object(client, '_test_connection_async', return_value=Right(True)):
await client.connect()
operation = KMOperation.list_macros()
expected_response = KMResponse.success({"macros": ["macro1", "macro2"]})
with patch.object(client, '_execute_operation_async', return_value=Right(expected_response)):
result = await client.execute_operation(operation)
assert isinstance(result, Right)
assert result.value == expected_response
await client.close()
@pytest.mark.asyncio
async def test_async_km_client_execute_macro(self) -> None:
"""Test async macro execution."""
client = AsyncKMClient(self.config)
# Mock connection
with patch.object(client, '_test_connection_async', return_value=Right(True)):
await client.connect()
expected_response = KMResponse.success({"executed": True})
with patch.object(client, '_execute_operation_async', return_value=Right(expected_response)):
result = await client.execute_macro(MacroId("test_macro"))
assert isinstance(result, Right)
assert result.value.data["executed"] is True
await client.close()
@pytest.mark.asyncio
async def test_async_km_client_context_manager(self) -> None:
"""Test async KM client context manager."""
async with AsyncKMClient(self.config) as client:
assert client is not None
# Client should be properly initialized within context
@pytest.mark.asyncio
async def test_async_km_client_concurrent_operations(self) -> None:
"""Test concurrent async operations."""
client = AsyncKMClient(self.config)
# Mock connection
with patch.object(client, '_test_connection_async', return_value=Right(True)):
await client.connect()
# Mock operation execution
async def mock_execute(operation):
await asyncio.sleep(0.1) # Simulate async work
return Right(KMResponse.success({"operation": operation.operation_type}))
with patch.object(client, '_execute_operation_async', side_effect=mock_execute):
# Execute multiple operations concurrently
operations = [
KMOperation.list_macros(),
KMOperation.get_macro_info(MacroId("macro1")),
KMOperation.get_macro_info(MacroId("macro2"))
]
results = await asyncio.gather(*[
client.execute_operation(op) for op in operations
])
assert len(results) == 3
assert all(isinstance(result, Right) for result in results)
await client.close()
class TestHelperFunctions:
"""Test helper functions."""
def test_km_applescript_cmd(self) -> None:
"""Test AppleScript command generation."""
operation = KMOperation.execute_macro(MacroId("test_macro"))
cmd = km_applescript_cmd(operation)
assert isinstance(cmd, list)
assert "osascript" in cmd
assert "test_macro" in " ".join(cmd)
def test_km_url_scheme(self) -> None:
"""Test URL scheme generation."""
operation = KMOperation.execute_macro(
MacroId("test_macro"),
parameters={"param1": "value1", "param2": "value2"}
)
url = km_url_scheme(operation)
assert url.startswith("kmtrigger://")
assert "test_macro" in url
assert "param1=value1" in url
@pytest.mark.asyncio
async def test_km_web_api_request(self) -> None:
"""Test web API request generation."""
config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="localhost",
port=8080,
api_key="test_key"
)
operation = KMOperation.list_macros()
# Mock httpx response
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {"macros": ["macro1", "macro2"]}
with patch('httpx.AsyncClient.post', return_value=mock_response):
result = await km_web_api_request(config, operation)
assert isinstance(result, Right)
assert result.value.status == KMStatus.SUCCESS
def test_parse_km_response(self) -> None:
"""Test KM response parsing."""
raw_response = {
"status": "success",
"data": {"result": "operation completed"},
"execution_time": 1.5
}
response = parse_km_response(raw_response)
assert response.status == KMStatus.SUCCESS
assert response.data["result"] == "operation completed"
assert response.execution_time.total_seconds() == 1.5
def test_parse_km_response_error(self) -> None:
"""Test KM error response parsing."""
raw_response = {
"status": "error",
"error": {
"type": "execution_error",
"message": "Macro not found"
},
"execution_time": 0.1
}
response = parse_km_response(raw_response)
assert response.status == KMStatus.ERROR
assert response.error.error_type == "execution_error"
assert "Macro not found" in response.error.message
def test_validate_km_config(self) -> None:
"""Test KM configuration validation function."""
valid_config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="localhost",
port=8080,
api_key="valid_key"
)
result = validate_km_config(valid_config)
assert isinstance(result, Right)
def test_validate_km_config_invalid(self) -> None:
"""Test KM configuration validation with invalid config."""
invalid_config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="", # Invalid empty host
port=0, # Invalid port
api_key="" # Missing API key
)
result = validate_km_config(invalid_config)
assert isinstance(result, Left)
def test_create_km_client(self) -> None:
"""Test KM client creation helper."""
config = KMConfig(
connection_method=ConnectionMethod.APPLESCRIPT,
host="localhost"
)
client = create_km_client(config)
assert isinstance(client, KMClient)
assert client.config == config
def test_get_km_client_singleton(self) -> None:
"""Test KM client singleton access."""
config = KMConfig(
connection_method=ConnectionMethod.APPLESCRIPT,
host="localhost"
)
client1 = get_km_client(config)
client2 = get_km_client(config)
# Should return same instance for same config
assert client1 is client2
class TestConnectionMethods:
"""Test different connection method implementations."""
def test_applescript_connection_method(self) -> None:
"""Test AppleScript connection method."""
config = KMConfig(
connection_method=ConnectionMethod.APPLESCRIPT,
host="localhost"
)
client = KMClient(config)
operation = KMOperation.execute_macro(MacroId("test_macro"))
# Mock subprocess call
with patch('subprocess.run') as mock_run:
mock_run.return_value.returncode = 0
mock_run.return_value.stdout = "success"
# This would normally use the _execute_applescript method
# We'll test the command generation
cmd = km_applescript_cmd(operation)
assert "osascript" in cmd
def test_url_scheme_connection_method(self) -> None:
"""Test URL scheme connection method."""
config = KMConfig(
connection_method=ConnectionMethod.URL_SCHEME,
host="localhost"
)
client = KMClient(config)
operation = KMOperation.trigger_macro(TriggerId("test_trigger"))
# Test URL generation
url = km_url_scheme(operation)
assert url.startswith("kmtrigger://")
assert "test_trigger" in url
@pytest.mark.asyncio
async def test_web_api_connection_method(self) -> None:
"""Test Web API connection method."""
config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="localhost",
port=8080,
api_key="test_key"
)
operation = KMOperation.list_macros()
# Mock successful HTTP response
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"status": "success",
"data": {"macros": []},
"execution_time": 0.1
}
with patch('httpx.AsyncClient.post', return_value=mock_response):
result = await km_web_api_request(config, operation)
assert isinstance(result, Right)
assert result.value.status == KMStatus.SUCCESS
def test_remote_trigger_connection_method(self) -> None:
"""Test remote trigger connection method."""
config = KMConfig(
connection_method=ConnectionMethod.REMOTE_TRIGGER,
host="remote.host.com",
port=8080
)
client = KMClient(config)
# Remote trigger should work similarly to URL scheme
# but with different target host
assert client.config.host == "remote.host.com"
assert client.config.connection_method == ConnectionMethod.REMOTE_TRIGGER
class TestErrorHandling:
"""Test comprehensive error handling."""
def test_connection_timeout_error(self) -> None:
"""Test connection timeout handling."""
config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="nonexistent.host.com",
port=8080,
api_key="test_key",
timeout=Duration(seconds=1)
)
client = KMClient(config)
# Mock timeout exception
with patch.object(client, '_test_connection', side_effect=TimeoutError("Connection timed out")):
result = client.connect()
assert isinstance(result, Left)
assert "timeout" in result.value.error_type.lower()
def test_invalid_response_handling(self) -> None:
"""Test handling of invalid responses."""
invalid_responses = [
{"invalid": "response"},
{"status": "unknown_status"},
{"status": "success"}, # Missing data
None,
"not_a_dict"
]
for invalid_response in invalid_responses:
try:
response = parse_km_response(invalid_response)
# Should either handle gracefully or return error status
assert response.status in [KMStatus.SUCCESS, KMStatus.ERROR, KMStatus.TIMEOUT]
except Exception:
# Exception handling is also acceptable for truly invalid responses
pass
def test_network_error_handling(self) -> None:
"""Test network error handling."""
config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="localhost",
port=8080,
api_key="test_key"
)
client = KMClient(config)
# Mock network error
with patch.object(client, '_test_connection', side_effect=ConnectionError("Network unreachable")):
result = client.connect()
assert isinstance(result, Left)
assert "connection" in result.value.error_type.lower()
def test_authentication_error_handling(self) -> None:
"""Test authentication error handling."""
config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="localhost",
port=8080,
api_key="invalid_key"
)
client = KMClient(config)
# Mock authentication error
auth_error = KMError(
error_type="authentication_error",
message="Invalid API key"
)
with patch.object(client, '_test_connection', return_value=Left(auth_error)):
result = client.connect()
assert isinstance(result, Left)
assert result.value.error_type == "authentication_error"
class TestPropertyBasedTesting:
"""Property-based tests for KM client functionality."""
@given(st.text(min_size=1, max_size=50).filter(lambda x: x.strip() and x.isalnum()))
def test_macro_id_consistency(self, macro_id: str) -> None:
"""Property: Macro IDs should be handled consistently."""
operation = KMOperation.execute_macro(MacroId(macro_id))
assert operation.target_id == MacroId(macro_id)
assert str(operation.target_id) == macro_id
@given(st.integers(min_value=1, max_value=65535))
def test_port_validation_consistency(self, port: int) -> None:
"""Property: Port numbers should be validated consistently."""
config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="localhost",
port=port,
api_key="test_key"
)
result = config.validate()
assert result["valid"] is True
@given(st.sampled_from(list(ConnectionMethod)))
def test_connection_method_consistency(self, method: ConnectionMethod) -> None:
"""Property: All connection methods should be handled consistently."""
config = KMConfig(
connection_method=method,
host="localhost",
port=8080
)
# Add API key for methods that require it
if method in [ConnectionMethod.WEB_API, ConnectionMethod.REMOTE_TRIGGER]:
config = config.with_api_key("test_key")
client = create_km_client(config)
assert client.config.connection_method == method
@given(st.floats(min_value=0.1, max_value=300.0))
def test_timeout_consistency(self, timeout_seconds: float) -> None:
"""Property: Timeout values should be handled consistently."""
config = KMConfig(
connection_method=ConnectionMethod.APPLESCRIPT,
timeout=Duration(seconds=timeout_seconds)
)
assert config.timeout.total_seconds() == timeout_seconds
class TestIntegrationScenarios:
"""Test realistic integration scenarios."""
def test_complete_macro_execution_workflow(self) -> None:
"""Test complete workflow from connection to macro execution."""
config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="localhost",
port=8080,
api_key="test_key"
)
client = KMClient(config)
# Step 1: Connect
with patch.object(client, '_test_connection', return_value=Right(True)):
connect_result = client.connect()
assert isinstance(connect_result, Right)
# Step 2: List macros
with patch.object(client, '_execute_operation_internal') as mock_execute:
mock_execute.return_value = Right(KMResponse.success({
"macros": [{"id": "macro1", "name": "Test Macro"}]
}))
list_result = client.list_macros()
assert isinstance(list_result, Right)
# Step 3: Execute macro
with patch.object(client, '_execute_operation_internal') as mock_execute:
mock_execute.return_value = Right(KMResponse.success({
"executed": True,
"result": "Macro executed successfully"
}))
execute_result = client.execute_macro(MacroId("macro1"))
assert isinstance(execute_result, Right)
def test_error_recovery_workflow(self) -> None:
"""Test error recovery and retry workflow."""
config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="localhost",
port=8080,
api_key="test_key",
retry_attempts=3
)
client = KMClient(config)
# Connect successfully
with patch.object(client, '_test_connection', return_value=Right(True)):
client.connect()
operation = KMOperation.execute_macro(MacroId("test_macro"))
# Mock to fail twice, then succeed
call_count = 0
def mock_execute(op):
nonlocal call_count
call_count += 1
if call_count <= 2:
return Left(KMError.timeout_error(30.0))
else:
return Right(KMResponse.success({"executed": True}))
with patch.object(client, '_execute_operation_internal', side_effect=mock_execute):
result = client.execute_operation(operation)
assert isinstance(result, Right)
assert call_count == 3 # Two failures + one success
@pytest.mark.asyncio
async def test_async_batch_operations(self) -> None:
"""Test batch operations with async client."""
config = KMConfig(
connection_method=ConnectionMethod.WEB_API,
host="localhost",
port=8080,
api_key="test_key"
)
async with AsyncKMClient(config) as client:
# Mock connection
with patch.object(client, '_test_connection_async', return_value=Right(True)):
await client.connect()
# Mock batch operations
async def mock_execute(operation):
await asyncio.sleep(0.05) # Simulate async work
return Right(KMResponse.success({
"operation": operation.operation_type,
"target": str(operation.target_id) if operation.target_id else None
}))
with patch.object(client, '_execute_operation_async', side_effect=mock_execute):
# Execute multiple operations in batch
operations = [
client.execute_macro(MacroId(f"macro_{i}"))
for i in range(5)
]
results = await asyncio.gather(*operations)
assert len(results) == 5
assert all(isinstance(result, Right) for result in results)
def test_configuration_management_workflow(self) -> None:
"""Test configuration management and validation workflow."""
# Start with minimal config
config = KMConfig()
# Validate initial config
validation = config.validate()
assert validation["valid"] is True # AppleScript doesn't need API key
# Update to web API
config = config.with_connection_method(ConnectionMethod.WEB_API)
config = config.with_api_key("test_key")
config = config.with_host("api.example.com")
config = config.with_port(443)
config = config.with_https(True)
# Validate updated config
validation = config.validate()
assert validation["valid"] is True
# Export and import config
config_dict = config.to_dict()
restored_config = KMConfig.from_dict(config_dict)
assert restored_config.connection_method == ConnectionMethod.WEB_API
assert restored_config.api_key == "test_key"
assert restored_config.host == "api.example.com"
assert restored_config.port == 443
assert restored_config.use_https is True