security_signature_example.py•9.22 kB
#!/usr/bin/env python3
"""
Example demonstrating the use of message signatures in Wireshark MCP.
This example shows how to use the generate_message_signature method to create
secure signatures for agent-to-agent communication, and how to verify those
signatures to ensure message integrity and authenticity.
"""
import os
import sys
import json
import time
import logging
from typing import Dict, Any
# Add the parent directory to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# Import the security module
from wireshark_mcp.security import (
    AgentSecurityWrapper,
    SecurityMonitor,
    DEFAULT_SECURITY_POLICIES
)
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def print_separator(title: str = None):
    """Print a separator line with an optional title."""
    print("\n" + "=" * 80)
    if title:
        print(f"{title.center(80)}")
        print("=" * 80)
    print()
def example_1_basic_signature():
    """Example 1: Basic message signature generation and verification."""
    print_separator("Example 1: Basic Message Signatures")
    
    # Create a mock agent and security wrapper
    class MockAgent:
        def __init__(self, name: str):
            self.name = name
    
    agent = MockAgent("Agent1")
    security_monitor = SecurityMonitor()
    security_wrapper = AgentSecurityWrapper(
        agent=agent,
        security_monitor=security_monitor,
        policies=DEFAULT_SECURITY_POLICIES
    )
    
    # Create a test message
    message = "Hello, this is a secure message from Agent1."
    
    # Generate a signature using the default settings (HMAC-SHA256 with timestamp)
    print("Generating signature with default settings (HMAC-SHA256 with timestamp)...")
    signature_data = security_wrapper.generate_message_signature(message)
    
    # Print the signature data
    print("\nSignature Data:")
    print(json.dumps(signature_data, indent=2))
    
    # Verify the signature
    print("\nVerifying signature...")
    is_valid = security_wrapper.verify_message_signature(message, signature_data)
    print(f"Signature valid: {is_valid}")
    
    # Try to verify a tampered message
    print("\nTrying to verify a tampered message...")
    tampered_message = message + " This part was added by an attacker."
    is_valid = security_wrapper.verify_message_signature(tampered_message, signature_data)
    print(f"Tampered message signature valid: {is_valid}")
def example_2_algorithm_options():
    """Example 2: Comparing different signature algorithms."""
    print_separator("Example 2: Comparing Different Signature Algorithms")
    
    # Create a mock agent and security wrapper
    class MockAgent:
        def __init__(self, name: str):
            self.name = name
    
    agent = MockAgent("Agent2")
    security_monitor = SecurityMonitor()
    security_wrapper = AgentSecurityWrapper(
        agent=agent,
        security_monitor=security_monitor,
        policies=DEFAULT_SECURITY_POLICIES
    )
    
    # Create a test message
    message = "This message will be signed with different algorithms."
    
    # Test different algorithms
    algorithms = ["sha256", "sha512", "hmac-sha256", "hmac-sha512"]
    
    for algorithm in algorithms:
        print(f"\nTesting algorithm: {algorithm}")
        
        # Generate a signature with the current algorithm
        signature_data = security_wrapper.generate_message_signature(
            message=message,
            algorithm=algorithm,
            include_timestamp=True
        )
        
        # Print the signature data
        print(f"Signature data size: {len(json.dumps(signature_data))} bytes")
        print(f"Signature value: {signature_data['signature'][:20]}... (truncated)")
        
        # Verify the signature
        is_valid = security_wrapper.verify_message_signature(message, signature_data)
        print(f"Signature valid: {is_valid}")
def example_3_agent_to_agent_communication():
    """Example 3: Simulating secure agent-to-agent communication."""
    print_separator("Example 3: Secure Agent-to-Agent Communication")
    
    # Create two mock agents and their security wrappers
    class MockAgent:
        def __init__(self, name: str):
            self.name = name
            self.received_messages = []
        
        def receive_message(self, message: str, sender: str, signature_data: Dict[str, Any] = None):
            self.received_messages.append({
                "message": message,
                "sender": sender,
                "signature_data": signature_data,
                "timestamp": time.time()
            })
            return f"Received message from {sender}: {message[:30]}..."
    
    # Create Agent A
    agent_a = MockAgent("AgentA")
    security_monitor_a = SecurityMonitor()
    security_wrapper_a = AgentSecurityWrapper(
        agent=agent_a,
        security_monitor=security_monitor_a,
        policies=DEFAULT_SECURITY_POLICIES
    )
    
    # Create Agent B
    agent_b = MockAgent("AgentB")
    security_monitor_b = SecurityMonitor()
    security_wrapper_b = AgentSecurityWrapper(
        agent=agent_b,
        security_monitor=security_monitor_b,
        policies=DEFAULT_SECURITY_POLICIES
    )
    
    # Generate a shared secret key for both agents
    shared_secret = "supersecretkey123"  # In a real system, use a secure key exchange
    
    print("Simulating secure communication between AgentA and AgentB")
    
    # AgentA sends a message to AgentB
    message_from_a = "Hello AgentB, here's some confidential data: XYZ-123."
    print(f"\nAgentA -> AgentB: {message_from_a}")
    
    # AgentA signs the message
    signature_data = security_wrapper_a.generate_message_signature(
        message=message_from_a,
        key=shared_secret,
        algorithm="hmac-sha256"
    )
    
    print("Message signed by AgentA:")
    print(json.dumps(signature_data, indent=2))
    
    # Simulate message transmission
    print("\nTransmitting message over the network...")
    
    # AgentB receives and verifies the message
    print("\nAgentB verifying the message signature...")
    
    # In a real implementation, AgentB would need to register the shared secret
    security_wrapper_b.secrets[signature_data["key_id"]] = shared_secret
    
    is_valid = security_wrapper_b.verify_message_signature(message_from_a, signature_data)
    print(f"Signature valid: {is_valid}")
    
    if is_valid:
        # Process the message only if the signature is valid
        agent_b.receive_message(message_from_a, "AgentA", signature_data)
        print(f"AgentB has processed the message from AgentA")
        print(f"AgentB received messages: {len(agent_b.received_messages)}")
    else:
        print("AgentB rejected the message due to invalid signature")
def example_4_message_expiration():
    """Example 4: Message expiration based on timestamp."""
    print_separator("Example 4: Message Expiration")
    
    # Create mock agents
    class MockAgent:
        def __init__(self, name: str):
            self.name = name
    
    agent = MockAgent("TimeAgent")
    security_monitor = SecurityMonitor()
    security_wrapper = AgentSecurityWrapper(
        agent=agent,
        security_monitor=security_monitor,
        policies=DEFAULT_SECURITY_POLICIES
    )
    
    # Create a test message
    message = "This message will expire after a short time."
    
    # Generate a signature with a custom timestamp (5 minutes in the past)
    current_time = int(time.time())
    past_time = current_time - (5 * 60)  # 5 minutes ago
    
    # Override the current time function for the test
    original_time = time.time
    try:
        # First, set the time to 5 minutes ago to create the signature
        time.time = lambda: past_time
        
        print("Generating signature with timestamp from 5 minutes ago...")
        signature_data = security_wrapper.generate_message_signature(message)
        print(f"Signature timestamp: {signature_data['timestamp']} ({time.ctime(signature_data['timestamp'])})")
        
        # Now, restore the current time to verify with current time
        time.time = original_time
        
        print(f"\nCurrent time: {current_time} ({time.ctime(current_time)})")
        print("Verifying signature with current time...")
        
        # The default expiration is 5 minutes, so this should fail
        is_valid = security_wrapper.verify_message_signature(message, signature_data)
        print(f"Signature valid: {is_valid} (Expected: False due to expiration)")
        
    finally:
        # Restore the original time function
        time.time = original_time
def main():
    """Main function to run the examples."""
    print("Wireshark MCP Security Signature Examples")
    print("=========================================")
    
    # Run the examples
    example_1_basic_signature()
    example_2_algorithm_options()
    example_3_agent_to_agent_communication()
    example_4_message_expiration()
    
    print("\nAll examples completed.")
if __name__ == "__main__":
    main()