Skip to main content
Glama

Meraki Magic MCP

by MKutka
meraki-mcp.py (39.5 kB)
"""Meraki Magic MCP server.

Exposes Cisco Meraki Dashboard API operations as MCP tools through FastMCP.
Configuration is read from the environment (optionally via a ``.env`` file):
``MERAKI_API_KEY`` authenticates the SDK client and ``MERAKI_ORG_ID`` is the
organization used whenever a tool is called without an explicit ``org_id``.
"""

import asyncio
import functools
import json
import os
from typing import Dict, List, Optional, Any, TypedDict, Union, Callable

import meraki
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field

# Load environment variables from .env file
load_dotenv()

# Create an MCP server
mcp = FastMCP("Meraki Magic MCP")

# Configuration
MERAKI_API_KEY = os.getenv("MERAKI_API_KEY")
MERAKI_ORG_ID = os.getenv("MERAKI_ORG_ID")

# Initialize Meraki API client using Meraki SDK
dashboard = meraki.DashboardAPI(api_key=MERAKI_API_KEY, suppress_logging=True)


###################
# INTERNAL HELPERS
###################

def _resolve_org_id(org_id: Optional[str]) -> str:
    """Return ``org_id`` if given, otherwise the configured MERAKI_ORG_ID.

    Raises:
        ValueError: If neither an explicit ID nor MERAKI_ORG_ID is available,
            so the SDK is never called with ``None`` as the organization.
    """
    organization_id = org_id or MERAKI_ORG_ID
    if not organization_id:
        raise ValueError("No organization ID available: pass org_id or set MERAKI_ORG_ID")
    return organization_id


def _to_json(payload: Any) -> str:
    """Serialize an SDK response as indented JSON text for the MCP client."""
    return json.dumps(payload, indent=2)


###################
# ASYNC UTILITIES
###################

def to_async(func: Callable) -> Callable:
    """
    Convert a synchronous function to an asynchronous function

    The wrapped call runs in the event loop's default executor so a blocking
    SDK request does not stall the loop.

    Args:
        func: The synchronous function to convert

    Returns:
        An asynchronous version of the function
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # get_running_loop() is the supported way to fetch the loop from
        # inside a coroutine; partial() binds the arguments for the executor.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))
    return wrapper


# Create async versions of commonly used Meraki API methods
async_get_organizations = to_async(dashboard.organizations.getOrganizations)
async_get_organization = to_async(dashboard.organizations.getOrganization)
async_get_organization_networks = to_async(dashboard.organizations.getOrganizationNetworks)
async_get_organization_devices = to_async(dashboard.organizations.getOrganizationDevices)
async_get_network = to_async(dashboard.networks.getNetwork)
async_get_network_devices = to_async(dashboard.networks.getNetworkDevices)
async_get_network_clients = to_async(dashboard.networks.getNetworkClients)
async_get_device = to_async(dashboard.devices.getDevice)
async_update_device = to_async(dashboard.devices.updateDevice)
async_get_wireless_ssids = to_async(dashboard.wireless.getNetworkWirelessSsids)
async_update_wireless_ssid = to_async(dashboard.wireless.updateNetworkWirelessSsid)


###################
# SCHEMA DEFINITIONS
###################

# Wireless SSID Schema
class Dot11wSettings(BaseModel):
    """802.11w settings nested inside an SSID update."""
    enabled: bool = Field(False, description="Whether 802.11w is enabled or not")
    required: bool = Field(False, description="Whether 802.11w is required or not")


class Dot11rSettings(BaseModel):
    """802.11r settings nested inside an SSID update."""
    enabled: bool = Field(False, description="Whether 802.11r is enabled or not")
    adaptive: bool = Field(False, description="Whether 802.11r is adaptive or not")


class RadiusServer(BaseModel):
    """One RADIUS server entry of an SSID update."""
    host: str = Field(..., description="IP address of the RADIUS server")
    port: int = Field(..., description="Port of the RADIUS server")
    secret: str = Field(..., description="Secret for the RADIUS server")
    radsecEnabled: Optional[bool] = Field(None, description="Whether RADSEC is enabled or not")
    openRoamingCertificateId: Optional[int] = Field(None, description="OpenRoaming certificate ID")
    caCertificate: Optional[str] = Field(None, description="CA certificate for RADSEC")


class SsidUpdateSchema(BaseModel):
    """Fields accepted by update_wireless_ssid; unset (None) fields are not sent."""
    name: Optional[str] = Field(None, description="The name of the SSID")
    enabled: Optional[bool] = Field(None, description="Whether the SSID is enabled or not")
    authMode: Optional[str] = Field(
        None, description="The auth mode for the SSID (e.g., 'open', 'psk', '8021x-radius')"
    )
    enterpriseAdminAccess: Optional[str] = Field(None, description="Enterprise admin access setting")
    encryptionMode: Optional[str] = Field(None, description="The encryption mode for the SSID")
    psk: Optional[str] = Field(
        None, description="The pre-shared key for the SSID when using PSK auth mode"
    )
    wpaEncryptionMode: Optional[str] = Field(
        None, description="WPA encryption mode (e.g., 'WPA1 and WPA2', 'WPA2 only')"
    )
    dot11w: Optional[Dot11wSettings] = Field(None, description="802.11w settings")
    dot11r: Optional[Dot11rSettings] = Field(None, description="802.11r settings")
    splashPage: Optional[str] = Field(None, description="The type of splash page for the SSID")
    radiusServers: Optional[List[RadiusServer]] = Field(None, description="List of RADIUS servers")
    visible: Optional[bool] = Field(None, description="Whether the SSID is visible or not")
    availableOnAllAps: Optional[bool] = Field(
        None, description="Whether the SSID is available on all APs"
    )
    bandSelection: Optional[str] = Field(
        None, description="Band selection for SSID (e.g., '5 GHz band only', 'Dual band operation')"
    )


# Firewall Rule Schema
class FirewallRule(BaseModel):
    """One L3 firewall rule as passed to update_firewall_rules."""
    comment: str = Field(..., description="Description of the firewall rule")
    policy: str = Field(..., description="'allow' or 'deny'")
    protocol: str = Field(..., description="The protocol (e.g., 'tcp', 'udp', 'any')")
    srcPort: Optional[str] = Field("Any", description="Source port (e.g., '80', '443-8080', 'Any')")
    srcCidr: str = Field("Any", description="Source CIDR (e.g., '192.168.1.0/24', 'Any')")
    destPort: Optional[str] = Field(
        "Any", description="Destination port (e.g., '80', '443-8080', 'Any')"
    )
    destCidr: str = Field("Any", description="Destination CIDR (e.g., '192.168.1.0/24', 'Any')")
    syslogEnabled: Optional[bool] = Field(
        False, description="Whether syslog is enabled for this rule"
    )


# Device Update Schema
class DeviceUpdateSchema(BaseModel):
    """Fields accepted by update_device; unset (None) fields are not sent."""
    name: Optional[str] = Field(None, description="The name of the device")
    tags: Optional[List[str]] = Field(None, description="List of tags for the device")
    lat: Optional[float] = Field(None, description="Latitude of the device")
    lng: Optional[float] = Field(None, description="Longitude of the device")
    address: Optional[str] = Field(None, description="Physical address of the device")
    notes: Optional[str] = Field(None, description="Notes for the device")
    moveMapMarker: Optional[bool] = Field(None, description="Whether to move the map marker or not")
    switchProfileId: Optional[str] = Field(None, description="Switch profile ID")
    floorPlanId: Optional[str] = Field(None, description="Floor plan ID")


# Network Update Schema
class NetworkUpdateSchema(BaseModel):
    """Fields accepted by update_network; unset (None) fields are not sent."""
    name: Optional[str] = Field(None, description="The name of the network")
    timeZone: Optional[str] = Field(None, description="The timezone of the network")
    tags: Optional[List[str]] = Field(None, description="List of tags for the network")
    enrollmentString: Optional[str] = Field(None, description="Enrollment string for the network")
    notes: Optional[str] = Field(None, description="Notes for the network")


# Admin Creation Schema
class AdminCreationSchema(BaseModel):
    """Shape of an organization admin creation request (not used by any tool in this file)."""
    email: str = Field(..., description="Email address of the admin")
    name: str = Field(..., description="Name of the admin")
    orgAccess: str = Field(..., description="Access level for the organization")
    tags: Optional[List[str]] = Field(None, description="List of tags for the admin")
    networks: Optional[List[dict]] = Field(None, description="Network access for the admin")


# Action Batch Schema
class ActionBatchSchema(BaseModel):
    """Shape of an action batch request (not used by any tool in this file)."""
    actions: List[dict] = Field(..., description="List of actions to perform")
    confirmed: bool = Field(True, description="Whether the batch is confirmed")
    synchronous: bool = Field(False, description="Whether the batch is synchronous")


# VPN Configuration Schema
class VpnSiteToSiteSchema(BaseModel):
    """Shape of a site-to-site VPN update (not used by any tool in this file)."""
    mode: str = Field(..., description="VPN mode (none, full, or hub-and-spoke)")
    hubs: Optional[List[dict]] = Field(None, description="List of hub configurations")
    subnets: Optional[List[dict]] = Field(None, description="List of subnet configurations")


# Content Filtering Schema
class ContentFilteringSchema(BaseModel):
    """Shape of a content filtering update (not used by any tool in this file)."""
    allowedUrls: Optional[List[str]] = Field(None, description="List of allowed URLs")
    blockedUrls: Optional[List[str]] = Field(None, description="List of blocked URLs")
    blockedUrlPatterns: Optional[List[str]] = Field(
        None, description="List of blocked URL patterns"
    )
    youtubeRestrictedForTeenagers: Optional[bool] = Field(
        None, description="Restrict YouTube for teenagers"
    )
    youtubeRestrictedForMature: Optional[bool] = Field(
        None, description="Restrict YouTube for mature content"
    )


# Traffic Shaping Schema
class TrafficShapingSchema(BaseModel):
    """Shape of a traffic shaping update (not used by any tool in this file)."""
    globalBandwidthLimits: Optional[dict] = Field(None, description="Global bandwidth limits")
    rules: Optional[List[dict]] = Field(None, description="Traffic shaping rules")


# Camera Sense Schema
class CameraSenseSchema(BaseModel):
    """Shape of a camera sense update (not used by any tool in this file)."""
    senseEnabled: Optional[bool] = Field(None, description="Whether camera sense is enabled")
    mqttBrokerId: Optional[str] = Field(None, description="MQTT broker ID")
    audioDetection: Optional[dict] = Field(None, description="Audio detection settings")


# Switch QoS Rule Schema
class SwitchQosRuleSchema(BaseModel):
    """Shape of a switch QoS rule (not used by any tool in this file)."""
    vlan: int = Field(..., description="VLAN ID")
    protocol: str = Field(..., description="Protocol (tcp, udp, any)")
    srcPort: int = Field(..., description="Source port")
    srcPortRange: Optional[str] = Field(None, description="Source port range")
    dstPort: Optional[int] = Field(None, description="Destination port")
    dstPortRange: Optional[str] = Field(None, description="Destination port range")
    dscp: Optional[int] = Field(None, description="DSCP value")


#######################
# ORGANIZATION TOOLS  #
#######################

# Get organizations
@mcp.tool()
async def get_organizations() -> str:
    """Get a list of organizations the user has access to"""
    organizations = await async_get_organizations()
    return _to_json(organizations)


# Get organization details
@mcp.tool()
async def get_organization_details(org_id: Optional[str] = None) -> str:
    """Get details for a specific organization, defaults to the configured organization"""
    org_details = await async_get_organization(_resolve_org_id(org_id))
    return _to_json(org_details)


# Get networks from Meraki
@mcp.tool()
async def get_networks(org_id: Optional[str] = None) -> str:
    """Get a list of networks from Meraki"""
    networks = await async_get_organization_networks(_resolve_org_id(org_id))
    return _to_json(networks)


# Get devices from Meraki
@mcp.tool()
async def get_devices(org_id: Optional[str] = None) -> str:
    """Get a list of devices from Meraki"""
    devices = await async_get_organization_devices(_resolve_org_id(org_id))
    return _to_json(devices)


# Create network in Meraki
@mcp.tool()
def create_network(name: str, tags: list[str], productTypes: list[str],
                   org_id: Optional[str] = None,
                   copyFromNetworkId: Optional[str] = None) -> str:
    """Create a new network in Meraki, optionally copying from another network."""
    kwargs = {}
    if copyFromNetworkId:
        kwargs['copyFromNetworkId'] = copyFromNetworkId
    network = dashboard.organizations.createOrganizationNetwork(
        _resolve_org_id(org_id), name, productTypes, tags=tags, **kwargs
    )
    return _to_json(network)


# Delete network in Meraki
@mcp.tool()
def delete_network(network_id: str) -> str:
    """Delete a network in Meraki"""
    dashboard.networks.deleteNetwork(network_id)
    return f"Network {network_id} deleted"


# Get organization status
@mcp.tool()
def get_organization_status(org_id: Optional[str] = None) -> str:
    """Get the status and health of an organization"""
    # NOTE(review): `organizations.getOrganizationStatus` does not appear in the
    # Meraki v1 SDK — confirm the intended endpoint against the installed SDK.
    status = dashboard.organizations.getOrganizationStatus(_resolve_org_id(org_id))
    return _to_json(status)


# Get organization inventory
@mcp.tool()
def get_organization_inventory(org_id: Optional[str] = None) -> str:
    """Get the inventory for an organization"""
    inventory = dashboard.organizations.getOrganizationInventoryDevices(_resolve_org_id(org_id))
    return _to_json(inventory)


# Get organization license state
@mcp.tool()
def get_organization_license(org_id: Optional[str] = None) -> str:
    """Get the license state for an organization"""
    license_state = dashboard.organizations.getOrganizationLicensesOverview(
        _resolve_org_id(org_id)
    )
    return _to_json(license_state)


# Get organization configuration changes
@mcp.tool()
def get_organization_conf_change(org_id: Optional[str] = None) -> str:
    """Get the org change state for an organization"""
    org_config_changes = dashboard.organizations.getOrganizationConfigurationChanges(
        _resolve_org_id(org_id)
    )
    return _to_json(org_config_changes)


#######################
# NETWORK TOOLS       #
#######################

# Get network details
@mcp.tool()
def get_network_details(network_id: str) -> str:
    """Get details for a specific network"""
    network = dashboard.networks.getNetwork(network_id)
    return _to_json(network)


# Get network devices
@mcp.tool()
def get_network_devices(network_id: str) -> str:
    """Get a list of devices in a specific network"""
    devices = dashboard.networks.getNetworkDevices(network_id)
    return _to_json(devices)


# Update network
@mcp.tool()
def update_network(network_id: str, update_data: NetworkUpdateSchema) -> str:
    """
    Update a network's properties using a schema-validated model

    Args:
        network_id: The ID of the network to update
        update_data: Network properties to update (name, timeZone, tags, enrollmentString, notes)

    Returns:
        JSON-formatted response of the update call
    """
    # Only send the fields the caller actually set
    update_dict = update_data.dict(exclude_none=True)
    result = dashboard.networks.updateNetwork(network_id, **update_dict)
    return _to_json(result)


# Get clients from Meraki
@mcp.tool()
def get_clients(network_id: str, timespan: int = 86400) -> str:
    """
    Get a list of clients from a specific Meraki network.

    Args:
        network_id (str): The ID of the Meraki network.
        timespan (int): The timespan in seconds to get clients (default: 24 hours)

    Returns:
        str: JSON-formatted list of clients.
    """
    clients = dashboard.networks.getNetworkClients(network_id, timespan=timespan)
    return _to_json(clients)


# Get client details
@mcp.tool()
def get_client_details(network_id: str, client_id: str) -> str:
    """Get details for a specific client in a network"""
    client = dashboard.networks.getNetworkClient(network_id, client_id)
    return _to_json(client)


# Get client usage history
@mcp.tool()
def get_client_usage(network_id: str, client_id: str) -> str:
    """Get the usage history for a client"""
    usage = dashboard.networks.getNetworkClientUsageHistory(network_id, client_id)
    return _to_json(usage)


# Get client policy from Meraki
@mcp.tool()
async def get_client_policy(network_id: str, client_id: str) -> str:
    """
    Get the policy for a specific client in a specific Meraki network.

    Args:
        network_id (str): The ID of the Meraki network.
        client_id (str): The ID (MAC address or client ID) of the client.

    Returns:
        str: JSON-formatted client policy.
    """
    # Reuse the shared executor wrapper instead of hand-rolling the hand-off
    fetch_policy = to_async(dashboard.networks.getNetworkClientPolicy)
    policy = await fetch_policy(network_id, client_id)
    return _to_json(policy)


# Update client policy
@mcp.tool()
def update_client_policy(network_id: str, client_id: str, device_policy: str,
                         group_policy_id: Optional[str] = None) -> str:
    """Update policy for a client"""
    kwargs = {'devicePolicy': device_policy}
    if group_policy_id:
        kwargs['groupPolicyId'] = group_policy_id
    result = dashboard.networks.updateNetworkClientPolicy(network_id, client_id, **kwargs)
    return _to_json(result)


# Get network traffic analysis
@mcp.tool()
def get_network_traffic(network_id: str, timespan: int = 86400) -> str:
    """Get traffic analysis data for a network"""
    traffic = dashboard.networks.getNetworkTraffic(network_id, timespan=timespan)
    return _to_json(traffic)


#######################
# DEVICE TOOLS        #
#######################

# Get device details
@mcp.tool()
async def get_device_details(serial: str) -> str:
    """Get details for a specific device by serial number"""
    device = await async_get_device(serial)
    return _to_json(device)


# Update device
@mcp.tool()
async def update_device(serial: str, device_settings: DeviceUpdateSchema) -> str:
    """
    Update a device in the Meraki organization using a schema-validated model

    Args:
        serial: The serial number of the device to update
        device_settings: Device properties to update (name, tags, lat, lng, address, notes, etc.)

    Returns:
        Confirmation of the update with the new settings
    """
    # Only send the fields the caller actually set
    update_dict = device_settings.dict(exclude_none=True)
    await async_update_device(serial, **update_dict)

    # Get the updated device details to return
    updated_device = await async_get_device(serial)

    return _to_json({
        "status": "success",
        "message": f"Device {serial} updated",
        "updated_settings": update_dict,
        "current_device": updated_device
    })


# Claim devices into the Meraki organization
@mcp.tool()
def claim_devices(network_id: str, serials: list[str]) -> str:
    """Claim one or more devices into a Meraki network"""
    dashboard.networks.claimNetworkDevices(network_id, serials)
    return f"Devices {serials} claimed into network {network_id}"


# Remove device from network
@mcp.tool()
def remove_device(serial: str, network_id: Optional[str] = None) -> str:
    """
    Remove a device from its network

    Args:
        serial: Serial number of the device to remove
        network_id: Network the device belongs to; looked up from the device
            record when omitted

    Returns:
        A confirmation message, or a notice that the device has no network
    """
    # The SDK call needs the network ID as well as the serial, so resolve it
    # from the device record when the caller did not provide it.
    if not network_id:
        network_id = dashboard.devices.getDevice(serial).get("networkId")
    if not network_id:
        return f"Device {serial} is not assigned to a network"
    dashboard.networks.removeNetworkDevices(network_id, serial)
    return f"Device {serial} removed from network"


# Reboot device
@mcp.tool()
def reboot_device(serial: str) -> str:
    """Reboot a device"""
    result = dashboard.devices.rebootDevice(serial)
    return _to_json(result)


# Get device clients
@mcp.tool()
def get_device_clients(serial: str, timespan: int = 86400) -> str:
    """Get clients connected to a specific device"""
    clients = dashboard.devices.getDeviceClients(serial, timespan=timespan)
    return _to_json(clients)


# Get device status
@mcp.tool()
def get_device_status(serial: str) -> str:
    """Get the current status of a device"""
    # NOTE(review): `devices.getDeviceStatuses` does not appear in the Meraki v1
    # SDK; `organizations.getOrganizationDevicesStatuses(org_id, serials=[serial])`
    # looks like the intended call — confirm against the installed SDK.
    status = dashboard.devices.getDeviceStatuses(serial)
    return _to_json(status)


# Get device uplink status
@mcp.tool()
def get_device_uplink(serial: str) -> str:
    """Get the uplink status of a device"""
    # NOTE(review): `devices.getDeviceUplink` does not appear in the Meraki v1
    # SDK — confirm the intended endpoint against the installed SDK.
    uplink = dashboard.devices.getDeviceUplink(serial)
    return _to_json(uplink)


#######################
# WIRELESS TOOLS      #
#######################

# Get wireless SSIDs
@mcp.tool()
async def get_wireless_ssids(network_id: str) -> str:
    """Get wireless SSIDs for a network"""
    ssids = await async_get_wireless_ssids(network_id)
    return _to_json(ssids)


# Update wireless SSID
@mcp.tool()
async def update_wireless_ssid(network_id: str, ssid_number: str,
                               ssid_settings: SsidUpdateSchema) -> str:
    """
    Update a wireless SSID with comprehensive schema validation

    Args:
        network_id: The ID of the network containing the SSID
        ssid_number: The number of the SSID to update
        ssid_settings: Comprehensive SSID settings following the Meraki schema

    Returns:
        The updated SSID configuration
    """
    # exclude_none also strips unset fields of nested models (e.g. the optional
    # RADIUS server fields), so no explicit nulls are sent to the API.
    update_dict = ssid_settings.dict(exclude_none=True)
    result = await async_update_wireless_ssid(network_id, ssid_number, **update_dict)
    return _to_json(result)


# Get wireless settings
@mcp.tool()
def get_wireless_settings(network_id: str) -> str:
    """Get wireless settings for a network"""
    settings = dashboard.wireless.getNetworkWirelessSettings(network_id)
    return _to_json(settings)


#######################
# SWITCH TOOLS        #
#######################

# Get switch ports
@mcp.tool()
def get_switch_ports(serial: str) -> str:
    """Get ports for a switch"""
    ports = dashboard.switch.getDeviceSwitchPorts(serial)
    return _to_json(ports)


# Update switch port
@mcp.tool()
def update_switch_port(serial: str, port_id: str, name: Optional[str] = None,
                       tags: Optional[list[str]] = None, enabled: Optional[bool] = None,
                       vlan: Optional[int] = None) -> str:
    """
    Update a switch port

    Args:
        serial: Serial number of the switch
        port_id: Identifier of the port to update
        name: New port name (ignored when empty)
        tags: New tag list; an empty list is sent as-is so tags can be cleared
        enabled: Whether the port is enabled
        vlan: VLAN number for the port
    """
    kwargs = {}
    if name:
        kwargs['name'] = name
    # Compare against None so an empty tag list is forwarded
    if tags is not None:
        kwargs['tags'] = tags
    if enabled is not None:
        kwargs['enabled'] = enabled
    if vlan is not None:
        kwargs['vlan'] = vlan
    result = dashboard.switch.updateDeviceSwitchPort(serial, port_id, **kwargs)
    return _to_json(result)


# Get switch VLAN settings
@mcp.tool()
def get_switch_vlans(network_id: str) -> str:
    """Get VLANs for a network"""
    # NOTE(review): `switch.getNetworkSwitchVlans` does not appear in the Meraki
    # v1 SDK; VLANs look to be exposed as `appliance.getNetworkApplianceVlans` —
    # confirm against the installed SDK.
    vlans = dashboard.switch.getNetworkSwitchVlans(network_id)
    return _to_json(vlans)


# Create switch VLAN
@mcp.tool()
def create_switch_vlan(network_id: str, vlan_id: int, name: str,
                       subnet: Optional[str] = None,
                       appliance_ip: Optional[str] = None) -> str:
    """Create a switch VLAN"""
    kwargs = {}
    if subnet:
        kwargs['subnet'] = subnet
    if appliance_ip:
        kwargs['applianceIp'] = appliance_ip
    # NOTE(review): `switch.createNetworkSwitchVlan` does not appear in the
    # Meraki v1 SDK; `appliance.createNetworkApplianceVlan` looks like the
    # intended call — confirm against the installed SDK.
    result = dashboard.switch.createNetworkSwitchVlan(network_id, vlan_id, name, **kwargs)
    return _to_json(result)


#######################
# APPLIANCE TOOLS     #
#######################

# Get security center
@mcp.tool()
def get_security_center(network_id: str) -> str:
    """Get security information for a network"""
    # NOTE(review): `appliance.getNetworkApplianceSecurityCenter` does not appear
    # in the Meraki v1 SDK — confirm the intended endpoint.
    security = dashboard.appliance.getNetworkApplianceSecurityCenter(network_id)
    return _to_json(security)


# Get VPN status
@mcp.tool()
def get_vpn_status(network_id: str) -> str:
    """Get VPN status for a network"""
    vpn_status = dashboard.appliance.getNetworkApplianceVpnSiteToSiteVpn(network_id)
    return _to_json(vpn_status)


# Get firewall rules
@mcp.tool()
def get_firewall_rules(network_id: str) -> str:
    """Get firewall rules for a network"""
    rules = dashboard.appliance.getNetworkApplianceFirewallL3FirewallRules(network_id)
    return _to_json(rules)


# Update firewall rules
@mcp.tool()
def update_firewall_rules(network_id: str, rules: List[FirewallRule]) -> str:
    """
    Update firewall rules for a network using schema-validated models

    Args:
        network_id: The ID of the network
        rules: List of firewall rules following the Meraki schema

    Returns:
        The updated firewall rules configuration
    """
    # Convert the list of Pydantic models to a list of dictionaries
    rules_dict = [rule.dict(exclude_none=True) for rule in rules]
    result = dashboard.appliance.updateNetworkApplianceFirewallL3FirewallRules(
        network_id, rules=rules_dict
    )
    return _to_json(result)


#######################
# CAMERA TOOLS        #
#######################

# Get camera video settings
@mcp.tool()
def get_camera_video_settings(network_id: str, serial: str) -> str:
    """Get video settings for a camera"""
    # network_id is accepted for interface compatibility but is not used:
    # the lookup is by camera serial only.
    settings = dashboard.camera.getDeviceCameraVideoSettings(serial)
    return _to_json(settings)


# Get camera quality and retention settings
@mcp.tool()
def get_camera_quality_settings(network_id: str) -> str:
    """Get quality and retention settings for cameras in a network"""
    settings = dashboard.camera.getNetworkCameraQualityRetentionProfiles(network_id)
    return _to_json(settings)


#######################
# ADVANCED ORGANIZATION TOOLS
#######################

# Get organization admins
@mcp.tool()
def get_organization_admins(org_id: Optional[str] = None) -> str:
    """Get a list of organization admins"""
    admins = dashboard.organizations.getOrganizationAdmins(_resolve_org_id(org_id))
    return _to_json(admins)


# Create organization admin
@mcp.tool()
def create_organization_admin(org_id: str, email: str, name: str, org_access: str,
                              tags: Optional[list[str]] = None,
                              networks: Optional[list[dict]] = None) -> str:
    """Create a new organization admin"""
    kwargs = {
        'email': email,
        'name': name,
        'orgAccess': org_access
    }
    if tags:
        kwargs['tags'] = tags
    if networks:
        kwargs['networks'] = networks
    result = dashboard.organizations.createOrganizationAdmin(_resolve_org_id(org_id), **kwargs)
    return _to_json(result)


# Get organization API requests
@mcp.tool()
def get_organization_api_requests(org_id: Optional[str] = None, timespan: int = 86400) -> str:
    """Get organization API request history"""
    api_requests = dashboard.organizations.getOrganizationApiRequests(
        _resolve_org_id(org_id), timespan=timespan
    )
    return _to_json(api_requests)


# Get organization webhook logs
@mcp.tool()
def get_organization_webhook_logs(org_id: Optional[str] = None, timespan: int = 86400) -> str:
    """Get organization webhook logs"""
    logs = dashboard.organizations.getOrganizationWebhooksLogs(
        _resolve_org_id(org_id), timespan=timespan
    )
    return _to_json(logs)


#######################
# ENHANCED NETWORK MONITORING
#######################

# Get network events
@mcp.tool()
def get_network_events(network_id: str, timespan: int = 86400, per_page: int = 100) -> str:
    """Get network events history"""
    events = dashboard.networks.getNetworkEvents(
        network_id, timespan=timespan, perPage=per_page
    )
    return _to_json(events)


# Get network event types
@mcp.tool()
def get_network_event_types(network_id: str) -> str:
    """Get available network event types"""
    event_types = dashboard.networks.getNetworkEventsEventTypes(network_id)
    return _to_json(event_types)


# Get network alerts history
@mcp.tool()
def get_network_alerts_history(network_id: str, timespan: int = 86400) -> str:
    """Get network alerts history"""
    alerts = dashboard.networks.getNetworkAlertsHistory(network_id, timespan=timespan)
    return _to_json(alerts)


# Get network alerts settings
@mcp.tool()
def get_network_alerts_settings(network_id: str) -> str:
    """Get network alerts settings"""
    settings = dashboard.networks.getNetworkAlertsSettings(network_id)
    return _to_json(settings)


# Update network alerts settings
@mcp.tool()
def update_network_alerts_settings(network_id: str,
                                   defaultDestinations: Optional[dict] = None,
                                   alerts: Optional[list[dict]] = None) -> str:
    """Update network alerts settings"""
    kwargs = {}
    if defaultDestinations:
        kwargs['defaultDestinations'] = defaultDestinations
    if alerts:
        kwargs['alerts'] = alerts
    result = dashboard.networks.updateNetworkAlertsSettings(network_id, **kwargs)
    return _to_json(result)


#######################
# LIVE DEVICE TOOLS
#######################

# Ping device
@mcp.tool()
def ping_device(serial: str, target_ip: str, count: int = 5) -> str:
    """Ping a device from another device"""
    result = dashboard.devices.createDeviceLiveToolsPing(serial, target_ip, count=count)
    return _to_json(result)


# Get ping results
@mcp.tool()
def get_device_ping_results(serial: str, ping_id: str) -> str:
    """Get results from a device ping test"""
    result = dashboard.devices.getDeviceLiveToolsPing(serial, ping_id)
    return _to_json(result)


# Cable test device
@mcp.tool()
def cable_test_device(serial: str, ports: list[str]) -> str:
    """Run cable test on device ports"""
    result = dashboard.devices.createDeviceLiveToolsCableTest(serial, ports)
    return _to_json(result)


# Get cable test results
@mcp.tool()
def get_device_cable_test_results(serial: str, cable_test_id: str) -> str:
    """Get results from a device cable test"""
    result = dashboard.devices.getDeviceLiveToolsCableTest(serial, cable_test_id)
    return _to_json(result)


# Blink device LEDs
@mcp.tool()
def blink_device_leds(serial: str, duration: int = 5) -> str:
    """Blink device LEDs for identification"""
    result = dashboard.devices.blinkDeviceLeds(serial, duration=duration)
    return _to_json(result)


# Wake on LAN
@mcp.tool()
def wake_on_lan_device(serial: str, mac: str, vlan_id: int = 1) -> str:
    """
    Send wake-on-LAN packet to a device

    Args:
        serial: Serial number of the device that sends the packet
        mac: MAC address of the target to wake
        vlan_id: VLAN to send the packet on (defaults to 1 — an assumed
            default; pass the target's actual VLAN)
    """
    # Pass vlanId and mac by keyword: the SDK expects the VLAN before the MAC,
    # so a positional (serial, mac) call would put the MAC in the VLAN slot.
    result = dashboard.devices.createDeviceLiveToolsWakeOnLan(serial, vlanId=vlan_id, mac=mac)
    return _to_json(result)


#######################
# ADVANCED WIRELESS TOOLS
#######################

# Get wireless RF profiles
@mcp.tool()
def get_wireless_rf_profiles(network_id: str) -> str:
    """Get wireless RF profiles for a network"""
    profiles = dashboard.wireless.getNetworkWirelessRfProfiles(network_id)
    return _to_json(profiles)


# Create wireless RF profile
@mcp.tool()
def create_wireless_rf_profile(network_id: str, name: str, band_selection_type: str,
                               **kwargs) -> str:
    """Create a wireless RF profile"""
    # Extra keyword arguments are forwarded unchanged to the SDK call
    result = dashboard.wireless.createNetworkWirelessRfProfile(
        network_id, name, bandSelectionType=band_selection_type, **kwargs
    )
    return _to_json(result)


# Get wireless channel utilization
@mcp.tool()
def get_wireless_channel_utilization(network_id: str, timespan: int = 86400) -> str:
    """Get wireless channel utilization history"""
    utilization = dashboard.wireless.getNetworkWirelessChannelUtilizationHistory(
        network_id, timespan=timespan
    )
    return _to_json(utilization)


# Get wireless signal quality
@mcp.tool()
def get_wireless_signal_quality(network_id: str, timespan: int = 86400) -> str:
    """Get wireless signal quality history"""
    quality = dashboard.wireless.getNetworkWirelessSignalQualityHistory(
        network_id, timespan=timespan
    )
    return _to_json(quality)


# Get wireless connection stats
@mcp.tool()
def get_wireless_connection_stats(network_id: str, timespan: int = 86400) -> str:
    """Get wireless connection statistics"""
    stats = dashboard.wireless.getNetworkWirelessConnectionStats(network_id, timespan=timespan)
    return _to_json(stats)


# Get wireless client connectivity events
@mcp.tool()
def get_wireless_client_connectivity_events(network_id: str, client_id: str,
                                            timespan: int = 86400) -> str:
    """Get wireless client connectivity events"""
    events = dashboard.wireless.getNetworkWirelessClientConnectivityEvents(
        network_id, client_id, timespan=timespan
    )
    return _to_json(events)


#######################
# ADVANCED SWITCH TOOLS
#######################

# Get switch port statuses
@mcp.tool()
def get_switch_port_statuses(serial: str) -> str:
    """Get switch port statuses"""
    statuses = dashboard.switch.getDeviceSwitchPortsStatuses(serial)
    return _to_json(statuses)


# Cycle switch ports
@mcp.tool()
def cycle_switch_ports(serial: str, ports: list[str]) -> str:
    """Cycle (restart) switch ports"""
    result = dashboard.switch.cycleDeviceSwitchPorts(serial, ports)
    return _to_json(result)


# Get switch access control lists
@mcp.tool()
def get_switch_access_control_lists(network_id: str) -> str:
    """Get switch access control lists"""
    acls = dashboard.switch.getNetworkSwitchAccessControlLists(network_id)
    return _to_json(acls)


# Update switch access control lists
@mcp.tool()
def update_switch_access_control_lists(network_id: str, rules: list[dict]) -> str:
    """Update switch access control lists"""
    result = dashboard.switch.updateNetworkSwitchAccessControlLists(network_id, rules)
    return _to_json(result)


# Get switch QoS rules
@mcp.tool()
def get_switch_qos_rules(network_id: str) -> str:
    """Get switch QoS rules"""
    rules = dashboard.switch.getNetworkSwitchQosRules(network_id)
    return _to_json(rules)


# Create switch QoS rule
@mcp.tool()
def create_switch_qos_rule(network_id: str, vlan: int, protocol: str, src_port: int,
                           src_port_range: Optional[str] = None,
                           dst_port: Optional[int] = None,
                           dst_port_range: Optional[str] = None,
                           dscp: Optional[int] = None) -> str:
    """
    Create a switch QoS rule

    Args:
        network_id: The ID of the network
        vlan: VLAN the rule applies to
        protocol: Protocol of the rule
        src_port: Source port
        src_port_range: Source port range
        dst_port: Destination port
        dst_port_range: Destination port range
        dscp: DSCP value to apply; 0 is forwarded, only None is omitted
    """
    kwargs = {
        'vlan': vlan,
        'protocol': protocol,
        'srcPort': src_port
    }
    if src_port_range:
        kwargs['srcPortRange'] = src_port_range
    # Numeric options are compared against None so a value of 0 is not dropped
    if dst_port is not None:
        kwargs['dstPort'] = dst_port
    if dst_port_range:
        kwargs['dstPortRange'] = dst_port_range
    if dscp is not None:
        kwargs['dscp'] = dscp
    result = dashboard.switch.createNetworkSwitchQosRule(network_id, **kwargs)
    return _to_json(result)


#######################
# ADVANCED APPLIANCE TOOLS
#######################

# Get appliance VPN site-to-site status
@mcp.tool()
def get_appliance_vpn_site_to_site(network_id: str) -> str:
    """Get appliance VPN site-to-site configuration"""
    vpn = dashboard.appliance.getNetworkApplianceVpnSiteToSiteVpn(network_id)
    return _to_json(vpn)


# Update appliance VPN site-to-site
@mcp.tool()
def update_appliance_vpn_site_to_site(network_id: str, mode: str,
                                      hubs: Optional[list[dict]] = None,
                                      subnets: Optional[list[dict]] = None) -> str:
    """Update appliance VPN site-to-site configuration"""
    kwargs = {'mode': mode}
    if hubs:
        kwargs['hubs'] = hubs
    if subnets:
        kwargs['subnets'] = subnets
    result = dashboard.appliance.updateNetworkApplianceVpnSiteToSiteVpn(network_id, **kwargs)
    return _to_json(result)


# Get appliance content filtering
@mcp.tool()
def get_appliance_content_filtering(network_id: str) -> str:
    """Get appliance content filtering settings"""
    filtering = dashboard.appliance.getNetworkApplianceContentFiltering(network_id)
    return _to_json(filtering)


# Update appliance content filtering
@mcp.tool()
def update_appliance_content_filtering(network_id: str,
                                       allowed_urls: Optional[list[str]] = None,
                                       blocked_urls: Optional[list[str]] = None,
                                       blocked_url_patterns: Optional[list[str]] = None,
                                       youtube_restricted_for_teenagers: Optional[bool] = None,
                                       youtube_restricted_for_mature: Optional[bool] = None) -> str:
    """
    Update appliance content filtering settings

    Only arguments that are not None are sent; an empty list is forwarded so an
    existing URL list can be cleared.
    """
    kwargs = {}
    if allowed_urls is not None:
        kwargs['allowedUrls'] = allowed_urls
    if blocked_urls is not None:
        kwargs['blockedUrls'] = blocked_urls
    if blocked_url_patterns is not None:
        kwargs['blockedUrlPatterns'] = blocked_url_patterns
    if youtube_restricted_for_teenagers is not None:
        kwargs['youtubeRestrictedForTeenagers'] = youtube_restricted_for_teenagers
    if youtube_restricted_for_mature is not None:
        kwargs['youtubeRestrictedForMature'] = youtube_restricted_for_mature
    result = dashboard.appliance.updateNetworkApplianceContentFiltering(network_id, **kwargs)
    return _to_json(result)


# Get appliance security events
@mcp.tool()
def get_appliance_security_events(network_id: str, timespan: int = 86400) -> str:
    """Get appliance security events"""
    events = dashboard.appliance.getNetworkApplianceSecurityEvents(network_id, timespan=timespan)
    return _to_json(events)


# Get appliance traffic shaping
@mcp.tool()
def get_appliance_traffic_shaping(network_id: str) -> str:
    """Get appliance traffic shaping settings"""
    shaping = dashboard.appliance.getNetworkApplianceTrafficShaping(network_id)
    return _to_json(shaping)


# Update appliance traffic shaping
@mcp.tool()
def update_appliance_traffic_shaping(network_id: str,
                                     global_bandwidth_limits: Optional[dict] = None) -> str:
    """Update appliance traffic shaping settings"""
    kwargs = {}
    if global_bandwidth_limits:
        kwargs['globalBandwidthLimits'] = global_bandwidth_limits
    result = dashboard.appliance.updateNetworkApplianceTrafficShaping(network_id, **kwargs)
    return _to_json(result)


#######################
# CAMERA TOOLS
#######################

# Get camera analytics live
@mcp.tool()
def get_camera_analytics_live(serial: str) -> str:
    """Get live camera analytics"""
    analytics = dashboard.camera.getDeviceCameraAnalyticsLive(serial)
    return _to_json(analytics)


# Get camera analytics overview
@mcp.tool()
def get_camera_analytics_overview(serial: str, timespan: int = 86400) -> str:
    """Get camera analytics overview"""
    overview = dashboard.camera.getDeviceCameraAnalyticsOverview(serial, timespan=timespan)
    return _to_json(overview)


# Get camera analytics zones
@mcp.tool()
def get_camera_analytics_zones(serial: str) -> str:
    """Get camera analytics zones"""
    zones = dashboard.camera.getDeviceCameraAnalyticsZones(serial)
    return _to_json(zones)


# Generate camera snapshot
@mcp.tool()
def generate_camera_snapshot(serial: str, timestamp: Optional[str] = None) -> str:
    """Generate a camera snapshot"""
    kwargs = {}
    if timestamp:
        kwargs['timestamp'] = timestamp
    result = dashboard.camera.generateDeviceCameraSnapshot(serial, **kwargs)
    return _to_json(result)


# Get camera sense
@mcp.tool()
def get_camera_sense(serial: str) -> str:
    """Get camera sense configuration"""
    sense = dashboard.camera.getDeviceCameraSense(serial)
    return _to_json(sense)


# Update camera sense
@mcp.tool()
def update_camera_sense(serial: str, sense_enabled: Optional[bool] = None,
                        mqtt_broker_id: Optional[str] = None,
                        audio_detection: Optional[dict] = None) -> str:
    """Update camera sense configuration"""
    kwargs = {}
    if sense_enabled is not None:
        kwargs['senseEnabled'] = sense_enabled
    if mqtt_broker_id:
        kwargs['mqttBrokerId'] = mqtt_broker_id
    if audio_detection:
        kwargs['audioDetection'] = audio_detection
    result = dashboard.camera.updateDeviceCameraSense(serial, **kwargs)
    return _to_json(result)


#######################
# NETWORK AUTOMATION TOOLS
#######################

# Create action batch
@mcp.tool()
def create_action_batch(org_id: str, actions: list[dict], confirmed: bool = True,
                        synchronous: bool = False) -> str:
    """Create an action batch for bulk operations"""
    result = dashboard.organizations.createOrganizationActionBatch(
        _resolve_org_id(org_id), actions, confirmed=confirmed, synchronous=synchronous
    )
    return _to_json(result)


# Get action batch status
@mcp.tool()
def get_action_batch_status(org_id: str, batch_id: str) -> str:
    """Get action batch status"""
    status = dashboard.organizations.getOrganizationActionBatch(
        _resolve_org_id(org_id), batch_id
    )
    return _to_json(status)


# Get action batches
@mcp.tool()
def get_action_batches(org_id: Optional[str] = None) -> str:
    """Get all action batches for an organization"""
    batches = dashboard.organizations.getOrganizationActionBatches(_resolve_org_id(org_id))
    return _to_json(batches)


# Define resources
# Add a dynamic greeting resource
# The URI template must not contain whitespace, otherwise a request for
# greeting://<name> can never match it.
@mcp.resource("greeting://{name}")
def greeting(name: str) -> str:
    """Greet a user by name"""
    return f"Hello {name}!"


# execute and return the stdio output
if __name__ == "__main__":
    mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MKutka/meraki-magic-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.