Skip to main content
Glama

ONOS MCP Server

by davidlin2k
device.py3.29 kB
from mcp.server.fastmcp import FastMCP from onos_mcp_server.api_client import make_onos_request async def get_devices() -> str: """Get information about all network devices. Returns array of all discovered infrastructure devices. """ try: devices = await make_onos_request("get", "/devices") return str(devices) except Exception as e: return f"Error retrieving devices: {str(e)}" async def get_device(deviceId: str) -> str: """Get detailed information about a specific device. Args: deviceId: ID of the device to query Returns details of the specified infrastructure device. """ try: device = await make_onos_request("get", f"/devices/{deviceId}") return str(device) except Exception as e: return f"Error retrieving device {deviceId}: {str(e)}" async def remove_device(deviceId: str) -> str: """Administratively remove a device from the inventory. Args: deviceId: ID of the device to remove Administratively deletes the specified device from the inventory of known devices. """ try: await make_onos_request("delete", f"/devices/{deviceId}") return f"Device {deviceId} removed successfully from the inventory" except Exception as e: return f"Error removing device {deviceId}: {str(e)}" async def get_all_device_ports() -> str: """Get information about ports on all infrastructure devices. Returns port details of all infrastructure devices. """ try: ports = await make_onos_request("get", "/devices/ports") return str(ports) except Exception as e: return f"Error retrieving all device ports: {str(e)}" async def get_device_ports(deviceId: str) -> str: """Get information about all ports on a specific device. Args: deviceId: ID of the device to query ports for Returns details of ports for the specified infrastructure device. """ try: ports = await make_onos_request("get", f"/devices/{deviceId}/ports") return str(ports) except Exception as e: return f"Error retrieving ports for device {deviceId}: {str(e)}" async def change_device_port_state(device_id: str, port_id: str, enabled: bool) -> str: """Change the administrative state of a device port. Args: device_id: Device identifier port_id: Port number enabled: True to enable the port, False to disable it Changes the administrative state of the specified port on the device. """ try: port_data = {"enabled": enabled} await make_onos_request( "post", f"/devices/{device_id}/portstate/{port_id}", json=port_data ) state = "enabled" if enabled else "disabled" return f"Port {port_id} on device {device_id} {state} successfully" except Exception as e: return f"Error changing port state: {str(e)}" def register_tools(mcp_server: FastMCP): """Register all device management tools with the MCP server.""" mcp_server.tool()(get_devices) mcp_server.tool()(get_device) mcp_server.tool()(remove_device) mcp_server.tool()(get_all_device_ports) mcp_server.tool()(get_device_ports) mcp_server.tool()(change_device_port_state)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/davidlin2k/onos-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server