Skip to main content
Glama

ONOS MCP Server

by davidlin2k
flow.py7.14 kB
from typing import Any, Dict, List from mcp.server.fastmcp import FastMCP from onos_mcp_server.api_client import make_onos_request from urllib.parse import quote # Get all flows async def get_flows() -> str: """Get all flow entries. Returns array of all flow rules in the system. """ try: flows = await make_onos_request("get", "/flows") return str(flows) except Exception as e: return f"Error retrieving flows: {str(e)}" # Get flows by device async def get_device_flows(deviceId: str) -> str: """Get all flow entries for a specific device. Args: deviceId: ID of the device to query Returns array of all flow rules for the specified device. """ try: flows = await make_onos_request("get", f"/flows/{deviceId}") return str(flows) except Exception as e: return f"Error retrieving flows for device {deviceId}: {str(e)}" # Get a specific flow rule async def get_flow(deviceId: str, flowId: str | int) -> str: """Get a specific flow rule by device ID and flow ID. Args: deviceId: Device identifier flowId: Flow rule identifier formatted as a string Returns the flow entry specified by the device id and flow rule id. """ try: flow = await make_onos_request("get", f"/flows/{deviceId}/{flowId}") return str(flow) except Exception as e: return f"Error retrieving flow {flowId} for device {deviceId}: {str(e)}" # Get flows by table async def get_table_flows(tableId: int) -> str: """Get all flow entries for a table. Args: tableId: Table identifier Returns array of all flow rules for a table. """ try: flows = await make_onos_request("get", f"/flows/table/{tableId}") return str(flows) except Exception as e: return f"Error retrieving flows for table {tableId}: {str(e)}" # Get flows by application async def get_application_flows(appId: str) -> str: """Get flow rules generated by an application. Args: appId: Application identifier Returns the flow rule specified by the application id. """ try: flows = await make_onos_request("get", f"/flows/application/{appId}") return str(flows) except Exception as e: return f"Error retrieving flows for application {appId}: {str(e)}" # Get pending flows async def get_pending_flows() -> str: """Get all pending flow entries. Returns array of all pending flow rules in the system. """ try: flows = await make_onos_request("get", "/flows/pending") return str(flows) except Exception as e: return f"Error retrieving pending flows: {str(e)}" async def add_flow( device_id: str, priority: int, timeout: int, is_permanent: bool, criteria: List[Dict[str, Any]], instructions: List[Dict[str, Any]], ) -> str: """ Add a flow rule to a device with comprehensive criteria and instruction support. Args: device_id: Device ID to add the flow to priority: Flow priority (higher values = higher priority) timeout: Flow timeout in seconds (0 for no timeout) is_permanent: Whether the flow is permanent criteria: List of criteria dictionaries. Each criterion must have 'type' and associated fields. For example: [{"type": "ETH_TYPE", "ethType": "0x88cc"}, {"type": "IN_PORT", "port": "1"}] instructions: List of instruction dictionaries. Each instruction must have 'type' and associated fields. For example: [{"type": "OUTPUT", "port": "2"}, {"type": "GROUP", "groupId": 1}] """ try: # Create flow rule with full criteria and instruction support flow_data = { "priority": priority, "timeout": timeout, "isPermanent": is_permanent, "deviceId": device_id, "treatment": {"instructions": instructions}, "selector": {"criteria": criteria}, } params = {"appId": "org.onosproject.mcp"} result = await make_onos_request( "post", f"/flows/{device_id}", json=flow_data, params=params ) return f"Flow added successfully: {result}" except Exception as e: return f"Error adding flow: {str(e)}" # Add batch of flow rules async def add_flows(appId: str, flows: List[Dict[str, Any]]) -> str: """Create multiple flow rules. Args: appId: Application identifier flows: List of flow rule specifications Creates and installs multiple flow rules. """ try: flows_data = {"flows": flows} params = {"appId": appId} result = await make_onos_request( "post", "/flows", json=flows_data, params=params ) return f"Flow rules added successfully: {result}" except Exception as e: return f"Error adding flow rules: {str(e)}" # Remove flow rule async def remove_flow(deviceId: str, flowId: str | int) -> str: """Remove a specific flow rule. Args: deviceId: Device identifier flowId: Flow rule identifier Removes the specified flow rule. """ try: encoded_device_id = quote(deviceId, safe="") encoded_flow_id = quote(str(flowId), safe="") await make_onos_request( "delete", f"/flows/{encoded_device_id}/{encoded_flow_id}" ) return f"Flow {flowId} removed successfully from device {deviceId}" except Exception as e: return f"Error removing flow {flowId} from device {deviceId}: {str(e)}" # Remove flows by application async def remove_application_flows(appId: str) -> str: """Remove flow rules generated by an application. Args: appId: Application identifier Removes a collection of flow rules generated by the given application. """ try: await make_onos_request("delete", f"/flows/application/{appId}") return f"All flows for application {appId} removed successfully" except Exception as e: return f"Error removing flows for application {appId}: {str(e)}" # Remove batch of flow rules async def remove_flows(flows: List[Dict[str, Any]]) -> str: """Remove a batch of flow rules. Args: flows: List of flow rule specifications to remove """ try: await make_onos_request("delete", "/flows", json=flows) return f"Flow rules batch removed successfully" except Exception as e: return f"Error removing flow rules batch: {str(e)}" def register_tools(mcp_server: FastMCP): """Register all flow management tools with the MCP server.""" mcp_server.tool()(get_flows) mcp_server.tool()(get_device_flows) mcp_server.tool()(get_flow) mcp_server.tool()(get_table_flows) mcp_server.tool()(get_application_flows) mcp_server.tool()(get_pending_flows) mcp_server.tool()(add_flow) mcp_server.tool()(add_flows) mcp_server.tool()(remove_flow) mcp_server.tool()(remove_application_flows) mcp_server.tool()(remove_flows)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/davidlin2k/onos-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server