Skip to main content
Glama
JoshDoesIT

Network Security Control Reviews

by JoshDoesIT
test_integration.py16.5 kB
""" Integration Tests for AWS Security Groups Parser This module contains integration tests that make real AWS API calls. These tests verify end-to-end functionality with actual AWS resources. Prerequisites: - AWS credentials configured (via aws configure or environment variables) - AWS test environment deployed via Terraform (see ../../docs/aws-setup.md) - Terraform outputs available (VPC IDs) Usage: # Run integration tests (requires AWS credentials) python mcp-server/tests/test_integration.py # Or skip if credentials not available (tests will be skipped automatically) python mcp-server/tests/test_integration.py Note: These tests make real AWS API calls and may incur minimal costs. Tests are automatically skipped if AWS credentials are not available. """ import os import sys import subprocess from pathlib import Path from typing import Optional, Tuple # Add parent directory to Python path for imports # Tests are in mcp-server/tests/, need to import from mcp-server/src/ sys.path.insert(0, str(Path(__file__).parent.parent)) # Import parser module from src.parsers.aws_security_groups import AWSSecurityGroupsParser # Try to import boto3 for credential checking try: import boto3 from botocore.exceptions import NoCredentialsError, ClientError, MissingDependencyException AWS_AVAILABLE = True except ImportError: AWS_AVAILABLE = False def check_aws_credentials() -> bool: """ Check if AWS credentials are available and valid. Uses the 'mcp' profile to avoid SSO/login credential issues. Returns: True if credentials are available and valid, False otherwise """ if not AWS_AVAILABLE: return False try: # Try to create a session with the mcp profile and get caller identity # This avoids SSO/login credential issues by using access key credentials session = boto3.Session(profile_name='mcp') sts = session.client('sts') sts.get_caller_identity() return True except (NoCredentialsError, ClientError, MissingDependencyException): # MissingDependencyException occurs when trying to use SSO/login credentials # without botocore[crt] installed return False def get_terraform_output(output_name: str) -> Optional[str]: """ Get a Terraform output value. Args: output_name: Name of the Terraform output to retrieve Returns: Output value as string, or None if not available """ try: # Terraform directory is two levels up from mcp-server/tests/ terraform_dir = Path(__file__).parent.parent.parent / "terraform" if not terraform_dir.exists(): return None # Run terraform output command result = subprocess.run( ["terraform", "output", "-raw", output_name], cwd=terraform_dir, capture_output=True, text=True, timeout=10 ) if result.returncode == 0 and result.stdout.strip(): return result.stdout.strip() return None except (subprocess.TimeoutExpired, FileNotFoundError, Exception): return None def get_vpc_ids() -> Tuple[Optional[str], Optional[str]]: """ Get VPC IDs from Terraform outputs or environment variables. Returns: Tuple of (production_vpc_id, development_vpc_id) """ # Try Terraform outputs first prod_vpc = get_terraform_output("production_vpc_id") dev_vpc = get_terraform_output("development_vpc_id") # Fall back to environment variables if not prod_vpc: prod_vpc = os.getenv("TEST_PRODUCTION_VPC_ID") if not dev_vpc: dev_vpc = os.getenv("TEST_DEVELOPMENT_VPC_ID") return (prod_vpc, dev_vpc) def skip_if_no_credentials(test_func): """ Decorator to skip a test if AWS credentials are not available. Usage: @skip_if_no_credentials def test_something(): ... """ def wrapper(*args, **kwargs): if not check_aws_credentials(): print(f"⏭️ Skipping {test_func.__name__}: AWS credentials not available") return return test_func(*args, **kwargs) wrapper.__name__ = test_func.__name__ return wrapper @skip_if_no_credentials def test_load_production_vpc_from_aws(): """ Integration test: Load production VPC security groups from AWS. Verifies: - Can connect to AWS API - Can load security groups from a real VPC - Parser correctly handles real AWS API responses - Summary generation works with real data """ prod_vpc_id, _ = get_vpc_ids() if not prod_vpc_id: print("⏭️ Skipping test: Production VPC ID not available") print(" Deploy Terraform infrastructure or set TEST_PRODUCTION_VPC_ID env var") return print(f"\n🧪 Testing: Load production VPC ({prod_vpc_id}) from AWS...") try: # Create parser instance parser = AWSSecurityGroupsParser(aws_region="us-east-1", aws_profile="mcp") # Load from AWS (real API call) security_groups = parser.load_from_aws(vpc_id=prod_vpc_id) # Verify we got security groups assert len(security_groups) > 0, "Should load at least one security group" print(f" ✓ Loaded {len(security_groups)} security groups") # Verify summary generation summary = parser.get_summary() assert summary["total_security_groups"] > 0 assert summary["total_rules"] > 0 assert prod_vpc_id in summary["vpcs"] assert summary["aws_source"] == f"AWS VPC: {prod_vpc_id}" print(f" ✓ Summary: {summary['total_security_groups']} SGs, {summary['total_rules']} rules") # Verify security groups have expected structure for sg_id, sg in security_groups.items(): assert sg.group_id == sg_id assert sg.vpc_id == prod_vpc_id assert len(sg.ip_permissions) >= 0 # Can have zero ingress rules assert len(sg.ip_permissions_egress) >= 0 # Can have zero egress rules print(" ✓ All security groups have valid structure") print("✅ test_load_production_vpc_from_aws passed") except Exception as e: print(f"❌ test_load_production_vpc_from_aws failed: {e}") raise @skip_if_no_credentials def test_load_development_vpc_from_aws(): """ Integration test: Load development VPC security groups from AWS. Verifies: - Can load development environment security groups - Parser handles different VPC configurations """ _, dev_vpc_id = get_vpc_ids() if not dev_vpc_id: print("⏭️ Skipping test: Development VPC ID not available") print(" Deploy Terraform infrastructure or set TEST_DEVELOPMENT_VPC_ID env var") return print(f"\n🧪 Testing: Load development VPC ({dev_vpc_id}) from AWS...") try: # Create parser instance parser = AWSSecurityGroupsParser(aws_region="us-east-1", aws_profile="mcp") # Load from AWS (real API call) security_groups = parser.load_from_aws(vpc_id=dev_vpc_id) # Verify we got security groups assert len(security_groups) > 0, "Should load at least one security group" print(f" ✓ Loaded {len(security_groups)} security groups") # Verify summary summary = parser.get_summary() assert dev_vpc_id in summary["vpcs"] print(f" ✓ Summary: {summary['total_security_groups']} SGs, {summary['total_rules']} rules") print("✅ test_load_development_vpc_from_aws passed") except Exception as e: print(f"❌ test_load_development_vpc_from_aws failed: {e}") raise @skip_if_no_credentials def test_query_rules_with_real_data(): """ Integration test: Query rules from real AWS security groups. Verifies: - Rule querying works with real AWS data - Tag-based filtering works - Source CIDR filtering works """ prod_vpc_id, _ = get_vpc_ids() if not prod_vpc_id: print("⏭️ Skipping test: Production VPC ID not available") return print(f"\n🧪 Testing: Query rules from production VPC ({prod_vpc_id})...") try: # Create parser and load from AWS parser = AWSSecurityGroupsParser(aws_region="us-east-1", aws_profile="mcp") parser.load_from_aws(vpc_id=prod_vpc_id) # Query all rules all_rules = parser.get_all_rules() assert len(all_rules) > 0, "Should have at least one rule" print(f" ✓ Found {len(all_rules)} total rules") # Query by tag (if tags exist) # Try to find rules from security groups with Environment tag env_rules = parser.query_rules(tag_key="Environment") print(f" ✓ Found {len(env_rules)} rules with Environment tag") # Query for overly permissive rules (0.0.0.0/0) permissive_rules = parser.query_rules(source="0.0.0.0/0") print(f" ✓ Found {len(permissive_rules)} permissive rules (0.0.0.0/0)") # Query by port (if any rules exist) if all_rules: # Try to find rules for common ports for port in [80, 443, 22]: port_rules = parser.query_rules(port=port) if port_rules: print(f" ✓ Found {len(port_rules)} rules for port {port}") print("✅ test_query_rules_with_real_data passed") except Exception as e: print(f"❌ test_query_rules_with_real_data failed: {e}") raise @skip_if_no_credentials def test_load_by_security_group_ids(): """ Integration test: Load specific security groups by their IDs. Verifies: - Can load specific security groups by ID - Works with real AWS security group IDs """ prod_vpc_id, _ = get_vpc_ids() if not prod_vpc_id: print("⏭️ Skipping test: Production VPC ID not available") return print(f"\n🧪 Testing: Load specific security groups by ID...") try: # First, load all security groups to get some IDs parser_all = AWSSecurityGroupsParser(aws_region="us-east-1", aws_profile="mcp") all_sgs = parser_all.load_from_aws(vpc_id=prod_vpc_id) if len(all_sgs) == 0: print("⏭️ Skipping test: No security groups found in VPC") return # Get first two security group IDs sg_ids = list(all_sgs.keys())[:2] print(f" ✓ Testing with security group IDs: {sg_ids}") # Load specific security groups by ID parser_specific = AWSSecurityGroupsParser(aws_region="us-east-1", aws_profile="mcp") specific_sgs = parser_specific.load_from_aws(security_group_ids=sg_ids) # Verify we got the expected security groups assert len(specific_sgs) == len(sg_ids), f"Expected {len(sg_ids)} SGs, got {len(specific_sgs)}" for sg_id in sg_ids: assert sg_id in specific_sgs, f"Security group {sg_id} should be loaded" print(f" ✓ Successfully loaded {len(specific_sgs)} security groups by ID") print("✅ test_load_by_security_group_ids passed") except Exception as e: print(f"❌ test_load_by_security_group_ids failed: {e}") raise @skip_if_no_credentials def test_segmentation_analysis(): """ Integration test: Test segmentation analysis between production and development. Verifies: - Can load both environments - Can identify cross-network connections - Segmentation analysis works with real data """ prod_vpc_id, dev_vpc_id = get_vpc_ids() if not prod_vpc_id or not dev_vpc_id: print("⏭️ Skipping test: VPC IDs not available") return print(f"\n🧪 Testing: Segmentation analysis between VPCs...") print(f" Production VPC: {prod_vpc_id}") print(f" Development VPC: {dev_vpc_id}") try: # Load production security groups parser_prod = AWSSecurityGroupsParser(aws_region="us-east-1", aws_profile="mcp") prod_sgs = parser_prod.load_from_aws(vpc_id=prod_vpc_id) prod_summary = parser_prod.get_summary() print(f" ✓ Production: {prod_summary['total_security_groups']} SGs, {prod_summary['total_rules']} rules") # Load development security groups parser_dev = AWSSecurityGroupsParser(aws_region="us-east-1", aws_profile="mcp") dev_sgs = parser_dev.load_from_aws(vpc_id=dev_vpc_id) dev_summary = parser_dev.get_summary() print(f" ✓ Development: {dev_summary['total_security_groups']} SGs, {dev_summary['total_rules']} rules") # Check for overly permissive rules in development dev_permissive = parser_dev.query_rules(source="0.0.0.0/0") print(f" ✓ Development permissive rules: {len(dev_permissive)}") # Check for production permissive rules prod_permissive = parser_prod.query_rules(source="0.0.0.0/0") print(f" ✓ Production permissive rules: {len(prod_permissive)}") # Note: Cross-network connection detection would require loading both VPCs # into the same parser instance, which is a more advanced use case print("✅ test_segmentation_analysis passed") except Exception as e: print(f"❌ test_segmentation_analysis failed: {e}") raise if __name__ == "__main__": """ Run all integration tests when executed directly. Usage: # Run all integration tests (requires AWS credentials) python mcp-server/tests/test_integration.py Note: Tests are automatically skipped if AWS credentials are not available. VPC IDs are read from Terraform outputs or environment variables. """ print("=" * 70) print("Integration Tests for AWS Security Groups Parser") print("=" * 70) # Check prerequisites if not AWS_AVAILABLE: print("\n❌ boto3 not installed. Install with: pip install boto3") print(" Skipping all integration tests.") sys.exit(0) if not check_aws_credentials(): print("\n⏭️ AWS credentials not available.") print(" Configure credentials using: aws configure") print(" Or set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables") print(" Skipping all integration tests.") sys.exit(0) print("\n✓ AWS credentials available") # Check for VPC IDs prod_vpc, dev_vpc = get_vpc_ids() if prod_vpc: print(f"✓ Production VPC ID: {prod_vpc}") else: print("⚠️ Production VPC ID not found (from Terraform or TEST_PRODUCTION_VPC_ID)") if dev_vpc: print(f"✓ Development VPC ID: {dev_vpc}") else: print("⚠️ Development VPC ID not found (from Terraform or TEST_DEVELOPMENT_VPC_ID)") print("\n" + "=" * 70) print("Running Integration Tests...") print("=" * 70) # Run tests tests = [ test_load_production_vpc_from_aws, test_load_development_vpc_from_aws, test_query_rules_with_real_data, test_load_by_security_group_ids, test_segmentation_analysis, ] passed = 0 skipped = 0 failed = 0 for test in tests: try: test() passed += 1 except AssertionError as e: print(f"\n❌ {test.__name__} failed: {e}") failed += 1 except Exception as e: # Check if it was skipped (no exception means skip) if "Skipping" in str(e) or "not available" in str(e): skipped += 1 else: print(f"\n❌ {test.__name__} failed with error: {e}") failed += 1 print("\n" + "=" * 70) print("Integration Test Results") print("=" * 70) print(f"Passed: {passed}") print(f"Skipped: {skipped}") print(f"Failed: {failed}") print("=" * 70) if failed > 0: sys.exit(1) else: print("\n✅ All integration tests passed!")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/JoshDoesIT/Network-Security-Control-Reviews-with-MCP-and-LLMs'

If you have feedback or need assistance with the MCP directory API, please join our Discord server