Skip to main content
Glama
test_mcp_security.py8.76 kB
"""Unit tests for MCP security module. Tests API key generation, hashing, and validation. """ import hashlib import os from unittest.mock import MagicMock, patch import pytest from fastapi import HTTPException, Request from src.mcp.security import ( generate_api_key, get_supabase_client, hash_api_key, verify_api_key, ) class TestApiKeyGeneration: """Test suite for API key generation and hashing.""" def test_generate_api_key_format(self): """Test generated API key has correct format.""" api_key = generate_api_key() assert api_key.startswith("mcp_") assert len(api_key) > 40 # mcp_ prefix + 32+ characters def test_generate_api_key_uniqueness(self): """Test generated keys are unique.""" keys = [generate_api_key() for _ in range(100)] assert len(set(keys)) == 100 # All unique def test_hash_api_key(self): """Test API key hashing produces consistent results.""" api_key = "mcp_test_key_12345" hash1 = hash_api_key(api_key) hash2 = hash_api_key(api_key) assert hash1 == hash2 assert len(hash1) == 64 # SHA-256 hex digest is 64 characters assert hash1 == hashlib.sha256(api_key.encode()).hexdigest() def test_hash_api_key_different_keys(self): """Test different keys produce different hashes.""" key1 = "mcp_key_1" key2 = "mcp_key_2" hash1 = hash_api_key(key1) hash2 = hash_api_key(key2) assert hash1 != hash2 class TestSupabaseClient: """Test suite for Supabase client initialization.""" @patch.dict( os.environ, {"SUPABASE_URL": "https://test.supabase.co", "SUPABASE_SERVICE_KEY": "test-key"} ) @patch("src.mcp.security.create_client") def test_get_supabase_client_success(self, mock_create_client): """Test Supabase client creation with valid credentials.""" mock_client = MagicMock() mock_create_client.return_value = mock_client client = get_supabase_client() assert client == mock_client mock_create_client.assert_called_once_with("https://test.supabase.co", "test-key") @patch.dict(os.environ, {}, clear=True) def test_get_supabase_client_missing_url(self): """Test error when SUPABASE_URL missing.""" with pytest.raises( ValueError, match="SUPABASE_URL and SUPABASE_SERVICE_KEY must be configured" ): get_supabase_client() @patch.dict(os.environ, {"SUPABASE_URL": "https://test.supabase.co"}, clear=True) def test_get_supabase_client_missing_key(self): """Test error when SUPABASE_SERVICE_KEY missing.""" with pytest.raises( ValueError, match="SUPABASE_URL and SUPABASE_SERVICE_KEY must be configured" ): get_supabase_client() class TestVerifyApiKey: """Test suite for API key verification.""" @pytest.fixture def mock_request(self): """Create mock request.""" request = MagicMock(spec=Request) request.url.path = "/api/v1/test" request.client.host = "127.0.0.1" request.state = MagicMock() return request @pytest.mark.asyncio async def test_verify_api_key_missing(self, mock_request): """Test verification fails when API key missing.""" with pytest.raises(HTTPException) as exc_info: await verify_api_key(mock_request, x_api_key=None) assert exc_info.value.status_code == 401 assert "Missing API key" in exc_info.value.detail @pytest.mark.asyncio @patch("src.mcp.security.get_supabase_client") async def test_verify_api_key_invalid(self, mock_get_client, mock_request): """Test verification fails with invalid API key.""" # Mock Supabase response for invalid key mock_client = MagicMock() mock_response = MagicMock() mock_response.data = None mock_client.table().select().eq().eq().single().execute.return_value = mock_response mock_get_client.return_value = mock_client with pytest.raises(HTTPException) as exc_info: await verify_api_key(mock_request, x_api_key="invalid_key") assert exc_info.value.status_code == 401 assert "Invalid or inactive API key" in exc_info.value.detail @pytest.mark.asyncio @patch("src.mcp.security.get_supabase_client") async def test_verify_api_key_success(self, mock_get_client, mock_request): """Test successful API key verification.""" api_key = "mcp_valid_key_12345" # Mock Supabase response for valid key mock_client = MagicMock() mock_select_response = MagicMock() mock_select_response.data = { "id": "key-123", "organization_id": "org-456", "key_hash": hash_api_key(api_key), "is_active": True, } # Build the chain of method calls mock_query = MagicMock() mock_client.table.return_value = mock_query mock_query.select.return_value = mock_query mock_query.eq.return_value = mock_query mock_query.single.return_value = mock_query mock_query.execute.return_value = mock_select_response # Mock update call mock_update_query = MagicMock() mock_query.update.return_value = mock_update_query mock_update_query.eq.return_value = mock_update_query mock_update_query.execute.return_value = MagicMock() mock_get_client.return_value = mock_client result = await verify_api_key(mock_request, x_api_key=api_key) assert result["organization_id"] == "org-456" assert result["api_key_id"] == "key-123" assert mock_request.state.organization_id == "org-456" assert mock_request.state.api_key_id == "key-123" @pytest.mark.asyncio @patch("src.mcp.security.get_supabase_client") async def test_verify_api_key_updates_last_used(self, mock_get_client, mock_request): """Test that last_used_at timestamp is updated.""" api_key = "mcp_valid_key_12345" # Mock Supabase client mock_client = MagicMock() mock_select_response = MagicMock() mock_select_response.data = { "id": "key-123", "organization_id": "org-456", "key_hash": hash_api_key(api_key), "is_active": True, } # Build query chain mock_query = MagicMock() mock_client.table.return_value = mock_query mock_query.select.return_value = mock_query mock_query.eq.return_value = mock_query mock_query.single.return_value = mock_query mock_query.execute.return_value = mock_select_response # Mock update call mock_update_query = MagicMock() mock_query.update.return_value = mock_update_query mock_update_query.eq.return_value = mock_update_query mock_update_query.execute.return_value = MagicMock() mock_get_client.return_value = mock_client await verify_api_key(mock_request, x_api_key=api_key) # Verify update was called mock_query.update.assert_called_once_with({"last_used_at": "now()"}) @pytest.mark.asyncio @patch("src.mcp.security.get_supabase_client") async def test_verify_api_key_database_error(self, mock_get_client, mock_request): """Test handling of database errors.""" mock_client = MagicMock() mock_client.table().select().eq().eq().single().execute.side_effect = Exception( "Database error" ) mock_get_client.return_value = mock_client with pytest.raises(HTTPException) as exc_info: await verify_api_key(mock_request, x_api_key="some_key") assert exc_info.value.status_code == 401 assert "Authentication failed" in exc_info.value.detail @pytest.mark.asyncio @patch("src.mcp.security.get_supabase_client") async def test_verify_api_key_inactive(self, mock_get_client, mock_request): """Test verification fails for inactive API key.""" api_key = "mcp_inactive_key" # Mock Supabase response for inactive key (is_active=False won't match query) mock_client = MagicMock() mock_response = MagicMock() mock_response.data = None # .eq("is_active", True) filters it out mock_client.table().select().eq().eq().single().execute.return_value = mock_response mock_get_client.return_value = mock_client with pytest.raises(HTTPException) as exc_info: await verify_api_key(mock_request, x_api_key=api_key) assert exc_info.value.status_code == 401 assert "Invalid or inactive API key" in exc_info.value.detail

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/darrentmorgan/hostaway-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server