Skip to main content
Glama

Wireshark MCP

security_signature_example.py9.22 kB
#!/usr/bin/env python3 """ Example demonstrating the use of message signatures in Wireshark MCP. This example shows how to use the generate_message_signature method to create secure signatures for agent-to-agent communication, and how to verify those signatures to ensure message integrity and authenticity. """ import os import sys import json import time import logging from typing import Dict, Any # Add the parent directory to the Python path sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) # Import the security module from wireshark_mcp.security import ( AgentSecurityWrapper, SecurityMonitor, DEFAULT_SECURITY_POLICIES ) # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def print_separator(title: str = None): """Print a separator line with an optional title.""" print("\n" + "=" * 80) if title: print(f"{title.center(80)}") print("=" * 80) print() def example_1_basic_signature(): """Example 1: Basic message signature generation and verification.""" print_separator("Example 1: Basic Message Signatures") # Create a mock agent and security wrapper class MockAgent: def __init__(self, name: str): self.name = name agent = MockAgent("Agent1") security_monitor = SecurityMonitor() security_wrapper = AgentSecurityWrapper( agent=agent, security_monitor=security_monitor, policies=DEFAULT_SECURITY_POLICIES ) # Create a test message message = "Hello, this is a secure message from Agent1." # Generate a signature using the default settings (HMAC-SHA256 with timestamp) print("Generating signature with default settings (HMAC-SHA256 with timestamp)...") signature_data = security_wrapper.generate_message_signature(message) # Print the signature data print("\nSignature Data:") print(json.dumps(signature_data, indent=2)) # Verify the signature print("\nVerifying signature...") is_valid = security_wrapper.verify_message_signature(message, signature_data) print(f"Signature valid: {is_valid}") # Try to verify a tampered message print("\nTrying to verify a tampered message...") tampered_message = message + " This part was added by an attacker." is_valid = security_wrapper.verify_message_signature(tampered_message, signature_data) print(f"Tampered message signature valid: {is_valid}") def example_2_algorithm_options(): """Example 2: Comparing different signature algorithms.""" print_separator("Example 2: Comparing Different Signature Algorithms") # Create a mock agent and security wrapper class MockAgent: def __init__(self, name: str): self.name = name agent = MockAgent("Agent2") security_monitor = SecurityMonitor() security_wrapper = AgentSecurityWrapper( agent=agent, security_monitor=security_monitor, policies=DEFAULT_SECURITY_POLICIES ) # Create a test message message = "This message will be signed with different algorithms." # Test different algorithms algorithms = ["sha256", "sha512", "hmac-sha256", "hmac-sha512"] for algorithm in algorithms: print(f"\nTesting algorithm: {algorithm}") # Generate a signature with the current algorithm signature_data = security_wrapper.generate_message_signature( message=message, algorithm=algorithm, include_timestamp=True ) # Print the signature data print(f"Signature data size: {len(json.dumps(signature_data))} bytes") print(f"Signature value: {signature_data['signature'][:20]}... (truncated)") # Verify the signature is_valid = security_wrapper.verify_message_signature(message, signature_data) print(f"Signature valid: {is_valid}") def example_3_agent_to_agent_communication(): """Example 3: Simulating secure agent-to-agent communication.""" print_separator("Example 3: Secure Agent-to-Agent Communication") # Create two mock agents and their security wrappers class MockAgent: def __init__(self, name: str): self.name = name self.received_messages = [] def receive_message(self, message: str, sender: str, signature_data: Dict[str, Any] = None): self.received_messages.append({ "message": message, "sender": sender, "signature_data": signature_data, "timestamp": time.time() }) return f"Received message from {sender}: {message[:30]}..." # Create Agent A agent_a = MockAgent("AgentA") security_monitor_a = SecurityMonitor() security_wrapper_a = AgentSecurityWrapper( agent=agent_a, security_monitor=security_monitor_a, policies=DEFAULT_SECURITY_POLICIES ) # Create Agent B agent_b = MockAgent("AgentB") security_monitor_b = SecurityMonitor() security_wrapper_b = AgentSecurityWrapper( agent=agent_b, security_monitor=security_monitor_b, policies=DEFAULT_SECURITY_POLICIES ) # Generate a shared secret key for both agents shared_secret = "supersecretkey123" # In a real system, use a secure key exchange print("Simulating secure communication between AgentA and AgentB") # AgentA sends a message to AgentB message_from_a = "Hello AgentB, here's some confidential data: XYZ-123." print(f"\nAgentA -> AgentB: {message_from_a}") # AgentA signs the message signature_data = security_wrapper_a.generate_message_signature( message=message_from_a, key=shared_secret, algorithm="hmac-sha256" ) print("Message signed by AgentA:") print(json.dumps(signature_data, indent=2)) # Simulate message transmission print("\nTransmitting message over the network...") # AgentB receives and verifies the message print("\nAgentB verifying the message signature...") # In a real implementation, AgentB would need to register the shared secret security_wrapper_b.secrets[signature_data["key_id"]] = shared_secret is_valid = security_wrapper_b.verify_message_signature(message_from_a, signature_data) print(f"Signature valid: {is_valid}") if is_valid: # Process the message only if the signature is valid agent_b.receive_message(message_from_a, "AgentA", signature_data) print(f"AgentB has processed the message from AgentA") print(f"AgentB received messages: {len(agent_b.received_messages)}") else: print("AgentB rejected the message due to invalid signature") def example_4_message_expiration(): """Example 4: Message expiration based on timestamp.""" print_separator("Example 4: Message Expiration") # Create mock agents class MockAgent: def __init__(self, name: str): self.name = name agent = MockAgent("TimeAgent") security_monitor = SecurityMonitor() security_wrapper = AgentSecurityWrapper( agent=agent, security_monitor=security_monitor, policies=DEFAULT_SECURITY_POLICIES ) # Create a test message message = "This message will expire after a short time." # Generate a signature with a custom timestamp (5 minutes in the past) current_time = int(time.time()) past_time = current_time - (5 * 60) # 5 minutes ago # Override the current time function for the test original_time = time.time try: # First, set the time to 5 minutes ago to create the signature time.time = lambda: past_time print("Generating signature with timestamp from 5 minutes ago...") signature_data = security_wrapper.generate_message_signature(message) print(f"Signature timestamp: {signature_data['timestamp']} ({time.ctime(signature_data['timestamp'])})") # Now, restore the current time to verify with current time time.time = original_time print(f"\nCurrent time: {current_time} ({time.ctime(current_time)})") print("Verifying signature with current time...") # The default expiration is 5 minutes, so this should fail is_valid = security_wrapper.verify_message_signature(message, signature_data) print(f"Signature valid: {is_valid} (Expected: False due to expiration)") finally: # Restore the original time function time.time = original_time def main(): """Main function to run the examples.""" print("Wireshark MCP Security Signature Examples") print("=========================================") # Run the examples example_1_basic_signature() example_2_algorithm_options() example_3_agent_to_agent_communication() example_4_message_expiration() print("\nAll examples completed.") if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sarthaksiddha/Wireshark-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server