Skip to main content
Glama

Fabric MCP

by aci-labs
test_security.py (11.1 kB)
""" Test script for the secure MCP server. Validates authentication, authorization, and security features. """ import requests import json import time import subprocess import threading from typing import Optional class SecurityTester: """Test suite for MCP server security.""" def __init__(self, base_url: str = "http://localhost:8081"): self.base_url = base_url.rstrip('/') self.session = requests.Session() self.session.verify = False # For self-signed certificates def test_health_check(self) -> bool: """Test health check endpoint.""" try: response = self.session.get(f"{self.base_url}/health", timeout=5) response.raise_for_status() health_data = response.json() print(f"✅ Health check passed: {health_data}") return True except Exception as e: print(f"❌ Health check failed: {e}") return False def test_unauthenticated_access(self) -> bool: """Test that unauthenticated requests are rejected.""" try: mcp_request = { "jsonrpc": "2.0", "id": 1, "method": "tools/list" } response = self.session.post( f"{self.base_url}/mcp", json=mcp_request, timeout=5 ) if response.status_code == 401: print("✅ Unauthenticated access properly rejected") return True else: print(f"❌ Unauthenticated access allowed (status: {response.status_code})") return False except Exception as e: print(f"❌ Error testing unauthenticated access: {e}") return False def test_authentication(self, username: str = "admin", password: str = "changeme") -> Optional[str]: """Test username/password authentication.""" try: response = self.session.post( f"{self.base_url}/auth/login", json={"username": username, "password": password}, timeout=5 ) if response.status_code == 200: token_data = response.json() token = token_data.get("access_token") print(f"✅ Authentication successful: {username}") return token else: print(f"❌ Authentication failed: {response.status_code} - {response.text}") return None except Exception as e: print(f"❌ Authentication error: {e}") return None def test_authenticated_access(self, token: str) -> 
bool: """Test authenticated MCP tool access.""" try: headers = {"Authorization": f"Bearer {token}"} mcp_request = { "jsonrpc": "2.0", "id": 1, "method": "tools/list" } response = self.session.post( f"{self.base_url}/mcp", json=mcp_request, headers=headers, timeout=5 ) if response.status_code == 200: result = response.json() print(f"✅ Authenticated access successful") if 'result' in result and 'tools' in result['result']: tools = result['result']['tools'] print(f" Available tools: {[t['name'] for t in tools]}") return True else: print(f"❌ Authenticated access failed: {response.status_code}") return False except Exception as e: print(f"❌ Authenticated access error: {e}") return False def test_token_verification(self, token: str) -> bool: """Test token verification endpoint.""" try: headers = {"Authorization": f"Bearer {token}"} response = self.session.get( f"{self.base_url}/auth/verify", headers=headers, timeout=5 ) if response.status_code == 200: verify_data = response.json() print(f"✅ Token verification passed: {verify_data}") return True else: print(f"❌ Token verification failed: {response.status_code}") return False except Exception as e: print(f"❌ Token verification error: {e}") return False def test_invalid_credentials(self) -> bool: """Test that invalid credentials are rejected.""" try: response = self.session.post( f"{self.base_url}/auth/login", json={"username": "invalid", "password": "invalid"}, timeout=5 ) if response.status_code == 401: print("✅ Invalid credentials properly rejected") return True else: print(f"❌ Invalid credentials accepted (status: {response.status_code})") return False except Exception as e: print(f"❌ Error testing invalid credentials: {e}") return False def test_rate_limiting(self, token: str) -> bool: """Test rate limiting functionality.""" try: headers = {"Authorization": f"Bearer {token}"} # Make multiple rapid requests success_count = 0 rate_limited = False for i in range(10): response = self.session.get( f"{self.base_url}/health", 
headers=headers, timeout=5 ) if response.status_code == 200: success_count += 1 elif response.status_code == 429: # Too Many Requests rate_limited = True break time.sleep(0.1) # Small delay between requests if success_count > 0: print(f"✅ Rate limiting configured (processed {success_count} requests)") if rate_limited: print(" Rate limit triggered as expected") return True else: print("❌ No requests succeeded") return False except Exception as e: print(f"❌ Rate limiting test error: {e}") return False def test_security_headers(self) -> bool: """Test that security headers are present.""" try: response = self.session.get(f"{self.base_url}/", timeout=5) headers = response.headers security_headers = { 'X-Content-Type-Options': 'nosniff', 'X-Frame-Options': 'DENY', 'X-XSS-Protection': '1; mode=block', 'Content-Security-Policy': "default-src 'self'" } missing_headers = [] for header, expected_value in security_headers.items(): if header not in headers: missing_headers.append(header) elif headers[header] != expected_value: print(f"⚠️ Security header {header} has unexpected value: {headers[header]}") if missing_headers: print(f"❌ Missing security headers: {missing_headers}") return False else: print("✅ All security headers present") return True except Exception as e: print(f"❌ Security headers test error: {e}") return False def run_all_tests(self) -> bool: """Run all security tests.""" print("🔒 Starting MCP Server Security Tests") print("=" * 50) test_results = [] # Test health check test_results.append(self.test_health_check()) # Test security headers test_results.append(self.test_security_headers()) # Test unauthenticated access test_results.append(self.test_unauthenticated_access()) # Test invalid credentials test_results.append(self.test_invalid_credentials()) # Test authentication token = self.test_authentication() if token: test_results.append(True) # Test authenticated access test_results.append(self.test_authenticated_access(token)) # Test token verification 
test_results.append(self.test_token_verification(token)) # Test rate limiting test_results.append(self.test_rate_limiting(token)) else: test_results.extend([False, False, False, False]) # Results summary passed = sum(test_results) total = len(test_results) print("\n" + "=" * 50) print(f"Test Results: {passed}/{total} passed") if passed == total: print("🎉 All security tests passed!") return True else: print("⚠️ Some security tests failed. Please review the output above.") return False def main(): """Main test runner.""" import argparse parser = argparse.ArgumentParser(description="Test MCP server security") parser.add_argument("--url", default="http://localhost:8081", help="Server URL") parser.add_argument("--start-server", action="store_true", help="Start secure server before testing") parser.add_argument("--server-args", default="", help="Additional server arguments") args = parser.parse_args() server_process = None if args.start_server: print("🚀 Starting secure MCP server...") server_cmd = f"python secure_fabric_mcp.py {args.server_args}" server_process = subprocess.Popen( server_cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE ) # Wait for server to start print("⏳ Waiting for server to start...") time.sleep(5) try: # Run tests tester = SecurityTester(args.url) success = tester.run_all_tests() if success: print("\n✅ Security validation completed successfully!") else: print("\n❌ Security validation failed!") finally: if server_process: print("\n🛑 Stopping server...") server_process.terminate() server_process.wait() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aci-labs/ms-fabric-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.