Skip to main content
Glama
test_security_quality_integration.pyโ€ข16.4 kB
#!/usr/bin/env python3 """ Integration tests for security and quality analysis MCP tools. Tests the integration of security and quality analysis tools with the main MCP server and other FastApply components, including real-world scenarios and error handling. Phase 7 Implementation Tests - Security & Quality Integration """ import shutil import tempfile import unittest from pathlib import Path from fastapply.main import call_tool class TestSecurityQualityMCPIntegration(unittest.TestCase): """Test integration of security and quality MCP tools.""" def setUp(self): """Set up test environment.""" self.temp_dir = tempfile.mkdtemp() def tearDown(self): """Clean up test environment.""" shutil.rmtree(self.temp_dir, ignore_errors=True) def create_test_project_with_vulnerabilities(self): """Create a test project with security vulnerabilities and quality issues.""" project_structure = { "main.py": """ import os import sqlite3 # Hardcoded secrets (security issue) API_KEY = "sk-1234567890abcdef" DB_PASSWORD = "supersecret123" def get_user_data(user_id): # SQL injection vulnerability query = "SELECT * FROM users WHERE id = " + user_id conn = sqlite3.connect('database.db') cursor = conn.cursor() cursor.execute(query) return cursor.fetchall() def render_user_input(user_input): # XSS vulnerability return f"<div>{user_input}</div>" def read_config_file(filename): # Path traversal vulnerability config_path = "/app/config/" + filename with open(config_path, 'r') as f: return f.read() # Complex function with quality issues def process_complex_data(data): result = [] for item in data: if item is not None: if isinstance(item, dict): if item.get('active'): if item.get('verified'): if item.get('score', 0) > 50: if item.get('category') == 'premium': result.append(item) return result def main(): user_id = input("Enter user ID: ") user_data = get_user_data(user_id) print(f"User data: {user_data}") user_input = input("Enter message: ") rendered = render_user_input(user_input) print(f"Rendered: {rendered}") if __name__ == "__main__": main() """, "requirements.txt": """ requests==2.28.0 # Known vulnerable version django==4.2.0 flask==2.0.0 # Known vulnerable version sqlalchemy==1.4.0 """, "config.py": """ # Configuration file with security issues DEBUG = True # Security risk in production SECRET_KEY = "insecure-secret-key" # Hardcoded secret ALLOWED_HOSTS = ['*'] # Insecure host configuration DATABASE_URL = "sqlite:///database.db" API_VERSION = "v1" """, "utils.py": """ import hashlib import base64 def weak_hash_password(password): # Weak password hashing return hashlib.md5(password.encode()).hexdigest() def insecure_encode(data): # Insecure encoding return base64.b64encode(data.encode()).decode() def duplicate_function(items): # Code duplication total = 0 for item in items: total += item['price'] * item['quantity'] return total def another_duplicate_function(items): # Duplicate code total = 0 for item in items: total += item['price'] * item['quantity'] return total """, } for file_name, content in project_structure.items(): file_path = Path(self.temp_dir) / file_name file_path.write_text(content.strip()) return self.temp_dir async def test_security_scan_comprehensive_mcp_tool(self): """Test security_scan_comprehensive MCP tool.""" project_path = self.create_test_project_with_vulnerabilities() arguments = { "project_path": project_path, "scan_types": ["pattern", "dependencies", "configuration"], "output_format": "json" } result = await call_tool("security_scan_comprehensive", arguments) self.assertIsInstance(result, list) self.assertEqual(len(result), 1) # Should return one text response # The result should contain JSON with security analysis data response_text = result[0]["text"] self.assertIn("security_score", response_text) self.assertIn("total_vulnerabilities", response_text) self.assertIn("vulnerabilities", response_text) async def test_quality_assessment_comprehensive_mcp_tool(self): """Test quality_assessment_comprehensive MCP tool.""" project_path = self.create_test_project_with_vulnerabilities() arguments = { "project_path": project_path, "analysis_types": ["complexity", "code_smells", "maintainability"], "output_format": "json" } result = await call_tool("quality_assessment_comprehensive", arguments) self.assertIsInstance(result, list) response_text = result[0]["text"] self.assertIn("overall_score", response_text) self.assertIn("file_metrics", response_text) async def test_compliance_reporting_generate_mcp_tool(self): """Test compliance_reporting_generate MCP tool.""" project_path = self.create_test_project_with_vulnerabilities() arguments = { "project_path": project_path, "compliance_standards": ["owasp_top_10", "pci_dss"], "output_format": "json" } result = await call_tool("compliance_reporting_generate", arguments) self.assertIsInstance(result, list) response_text = result[0]["text"] self.assertIn("overall_compliance_score", response_text) self.assertIn("owasp_top_10", response_text) async def test_quality_gates_evaluate_mcp_tool(self): """Test quality_gates_evaluate MCP tool.""" project_path = self.create_test_project_with_vulnerabilities() arguments = { "project_path": project_path, "output_format": "json" } result = await call_tool("quality_gates_evaluate", arguments) self.assertIsInstance(result, list) response_text = result[0]["text"] self.assertIn("overall_result", response_text) self.assertIn("passed_gates", response_text) self.assertIn("failed_gates", response_text) async def test_vulnerability_database_check_mcp_tool(self): """Test vulnerability_database_check MCP tool.""" project_path = self.create_test_project_with_vulnerabilities() arguments = { "project_path": project_path, "check_dependencies": True, "check_patterns": True, "output_format": "json" } result = await call_tool("vulnerability_database_check", arguments) self.assertIsInstance(result, list) response_text = result[0]["text"] self.assertIn("vulnerabilities_found", response_text) self.assertIn("dependency_vulnerabilities", response_text) async def test_custom_quality_gates_mcp_tool(self): """Test quality gates with custom gate definitions.""" project_path = self.create_test_project_with_vulnerabilities() custom_gates = [ { "name": "Complexity Limit", "metric": "cyclomatic_complexity", "threshold": 10, "operator": "<=", "severity": "high", "enabled": True }, { "name": "Maintainability Minimum", "metric": "maintainability_index", "threshold": 70, "operator": ">=", "severity": "medium", "enabled": True } ] arguments = { "project_path": project_path, "gates": custom_gates, "fail_on_error": True, "output_format": "json" } result = await call_tool("quality_gates_evaluate", arguments) self.assertIsInstance(result, list) response_text = result[0]["text"] self.assertIn("custom_gates", response_text) async def test_security_quality_integration_workflow(self): """Test complete security and quality analysis workflow.""" project_path = self.create_test_project_with_vulnerabilities() # Step 1: Security scan security_args = { "project_path": project_path, "scan_types": ["pattern", "dependencies"], "output_format": "json" } security_result = await call_tool("security_scan_comprehensive", security_args) # Step 2: Quality assessment quality_args = { "project_path": project_path, "analysis_types": ["complexity", "code_smells"], "output_format": "json" } quality_result = await call_tool("quality_assessment_comprehensive", quality_args) # Step 3: Compliance reporting compliance_args = { "project_path": project_path, "compliance_standards": ["owasp_top_10"], "output_format": "json" } compliance_result = await call_tool("compliance_reporting_generate", compliance_args) # Step 4: Quality gates evaluation gates_args = { "project_path": project_path, "output_format": "json" } gates_result = await call_tool("quality_gates_evaluate", gates_args) # Verify all steps worked self.assertIsInstance(security_result, list) self.assertIsInstance(quality_result, list) self.assertIsInstance(compliance_result, list) self.assertIsInstance(gates_result, list) # Verify security issues detected security_text = security_result[0]["text"] self.assertIn("vulnerabilities", security_text) # Verify quality issues detected quality_text = quality_result[0]["text"] self.assertIn("overall_score", quality_text) # Verify compliance calculated compliance_text = compliance_result[0]["text"] self.assertIn("overall_compliance_score", compliance_text) # Verify gates evaluated gates_text = gates_result[0]["text"] self.assertIn("overall_result", gates_text) class TestSecurityQualityErrorHandling(unittest.TestCase): """Test error handling in security and quality operations.""" def setUp(self): """Set up test environment.""" self.temp_dir = tempfile.mkdtemp() def tearDown(self): """Clean up test environment.""" shutil.rmtree(self.temp_dir, ignore_errors=True) async def test_invalid_project_path_handling(self): """Test handling of invalid project paths.""" arguments = {"project_path": "/nonexistent/path", "output_format": "json"} result = await call_tool("security_scan_comprehensive", arguments) self.assertIsInstance(result, list) # Should handle error gracefully response_text = result[0]["text"] self.assertIn("error", response_text.lower()) async def test_permission_error_handling(self): """Test handling of permission errors.""" # Create a directory with restricted permissions restricted_dir = Path(self.temp_dir) / "restricted" restricted_dir.mkdir(mode=0o000) try: arguments = {"project_path": str(restricted_dir), "output_format": "json"} result = await call_tool("quality_assessment_comprehensive", arguments) self.assertIsInstance(result, list) # Should handle permission error gracefully response_text = result[0]["text"] self.assertIn("error", response_text.lower()) finally: # Restore permissions for cleanup restricted_dir.chmod(0o755) async def test_invalid_compliance_standards(self): """Test handling of invalid compliance standards.""" arguments = { "project_path": self.temp_dir, "compliance_standards": ["invalid_standard"], "output_format": "json" } result = await call_tool("compliance_reporting_generate", arguments) self.assertIsInstance(result, list) # Should handle invalid standard gracefully response_text = result[0]["text"] self.assertIn("error", response_text.lower()) async def test_invalid_quality_gate_config(self): """Test handling of invalid quality gate configuration.""" invalid_gates = [ { "name": "Invalid Gate", "metric": "invalid_metric", # Invalid metric "threshold": 10, "operator": "invalid_operator", # Invalid operator "severity": "invalid_severity", # Invalid severity "enabled": True } ] arguments = { "project_path": self.temp_dir, "gates": invalid_gates, "output_format": "json" } result = await call_tool("quality_gates_evaluate", arguments) self.assertIsInstance(result, list) # Should handle invalid configuration gracefully response_text = result[0]["text"] self.assertIn("error", response_text.lower()) class TestSecurityQualityPerformance(unittest.TestCase): """Test performance characteristics of security and quality operations.""" def setUp(self): """Set up test environment.""" self.temp_dir = tempfile.mkdtemp() def tearDown(self): """Clean up test environment.""" shutil.rmtree(self.temp_dir, ignore_errors=True) def create_large_test_project(self, num_files=20): """Create a large test project for performance testing.""" for i in range(num_files): file_path = Path(self.temp_dir) / f"test_file_{i}.py" content = f""" # Test file {i} import os import sys # Security issue: hardcoded secret API_KEY_{i} = "sk-1234567890abcdef-{i}" def complex_function_{i}(data): # Quality issue: high complexity result = [] for item in data: if item is not None: if isinstance(item, dict): if item.get('active'): if item.get('verified'): if item.get('score', 0) > 50: result.append(item) return result def sql_injection_{i}(user_id): # Security issue: SQL injection query = "SELECT * FROM users WHERE id = " + user_id return execute_query(query) if __name__ == "__main__": print("Test file {i}") """ file_path.write_text(content.strip()) return self.temp_dir async def test_large_project_security_scan_performance(self): """Test performance of security scanning on large projects.""" import time project_path = self.create_large_test_project(15) start_time = time.time() result = await call_tool("security_scan_comprehensive", { "project_path": project_path, "scan_types": ["pattern"], "output_format": "json" }) end_time = time.time() execution_time = end_time - start_time print(f"Security scan performance: {execution_time:.2f}s for 15 files") self.assertIsInstance(result, list) self.assertLess(execution_time, 30.0, f"Security scan took {execution_time:.2f}s, expected < 30s") async def test_large_project_quality_assessment_performance(self): """Test performance of quality assessment on large projects.""" import time project_path = self.create_large_test_project(10) start_time = time.time() result = await call_tool("quality_assessment_comprehensive", { "project_path": project_path, "analysis_types": ["complexity"], "output_format": "json" }) end_time = time.time() execution_time = end_time - start_time print(f"Quality assessment performance: {execution_time:.2f}s for 10 files") self.assertIsInstance(result, list) self.assertLess(execution_time, 20.0, f"Quality assessment took {execution_time:.2f}s, expected < 20s") if __name__ == "__main__": unittest.main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/betmoar/FastApply-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server