flow.py•7.14 kB
from typing import Any, Dict, List
from mcp.server.fastmcp import FastMCP
from onos_mcp_server.api_client import make_onos_request
from urllib.parse import quote
# Get all flows
async def get_flows() -> str:
"""Get all flow entries.
Returns array of all flow rules in the system.
"""
try:
flows = await make_onos_request("get", "/flows")
return str(flows)
except Exception as e:
return f"Error retrieving flows: {str(e)}"
# Get flows by device
async def get_device_flows(deviceId: str) -> str:
"""Get all flow entries for a specific device.
Args:
deviceId: ID of the device to query
Returns array of all flow rules for the specified device.
"""
try:
flows = await make_onos_request("get", f"/flows/{deviceId}")
return str(flows)
except Exception as e:
return f"Error retrieving flows for device {deviceId}: {str(e)}"
# Get a specific flow rule
async def get_flow(deviceId: str, flowId: str | int) -> str:
"""Get a specific flow rule by device ID and flow ID.
Args:
deviceId: Device identifier
flowId: Flow rule identifier formatted as a string
Returns the flow entry specified by the device id and flow rule id.
"""
try:
flow = await make_onos_request("get", f"/flows/{deviceId}/{flowId}")
return str(flow)
except Exception as e:
return f"Error retrieving flow {flowId} for device {deviceId}: {str(e)}"
# Get flows by table
async def get_table_flows(tableId: int) -> str:
"""Get all flow entries for a table.
Args:
tableId: Table identifier
Returns array of all flow rules for a table.
"""
try:
flows = await make_onos_request("get", f"/flows/table/{tableId}")
return str(flows)
except Exception as e:
return f"Error retrieving flows for table {tableId}: {str(e)}"
# Get flows by application
async def get_application_flows(appId: str) -> str:
"""Get flow rules generated by an application.
Args:
appId: Application identifier
Returns the flow rule specified by the application id.
"""
try:
flows = await make_onos_request("get", f"/flows/application/{appId}")
return str(flows)
except Exception as e:
return f"Error retrieving flows for application {appId}: {str(e)}"
# Get pending flows
async def get_pending_flows() -> str:
"""Get all pending flow entries.
Returns array of all pending flow rules in the system.
"""
try:
flows = await make_onos_request("get", "/flows/pending")
return str(flows)
except Exception as e:
return f"Error retrieving pending flows: {str(e)}"
async def add_flow(
device_id: str,
priority: int,
timeout: int,
is_permanent: bool,
criteria: List[Dict[str, Any]],
instructions: List[Dict[str, Any]],
) -> str:
"""
Add a flow rule to a device with comprehensive criteria and instruction support.
Args:
device_id: Device ID to add the flow to
priority: Flow priority (higher values = higher priority)
timeout: Flow timeout in seconds (0 for no timeout)
is_permanent: Whether the flow is permanent
criteria: List of criteria dictionaries. Each criterion must have 'type' and associated fields.
For example: [{"type": "ETH_TYPE", "ethType": "0x88cc"},
{"type": "IN_PORT", "port": "1"}]
instructions: List of instruction dictionaries. Each instruction must have 'type' and associated fields.
For example: [{"type": "OUTPUT", "port": "2"},
{"type": "GROUP", "groupId": 1}]
"""
try:
# Create flow rule with full criteria and instruction support
flow_data = {
"priority": priority,
"timeout": timeout,
"isPermanent": is_permanent,
"deviceId": device_id,
"treatment": {"instructions": instructions},
"selector": {"criteria": criteria},
}
params = {"appId": "org.onosproject.mcp"}
result = await make_onos_request(
"post", f"/flows/{device_id}", json=flow_data, params=params
)
return f"Flow added successfully: {result}"
except Exception as e:
return f"Error adding flow: {str(e)}"
# Add batch of flow rules
async def add_flows(appId: str, flows: List[Dict[str, Any]]) -> str:
"""Create multiple flow rules.
Args:
appId: Application identifier
flows: List of flow rule specifications
Creates and installs multiple flow rules.
"""
try:
flows_data = {"flows": flows}
params = {"appId": appId}
result = await make_onos_request(
"post", "/flows", json=flows_data, params=params
)
return f"Flow rules added successfully: {result}"
except Exception as e:
return f"Error adding flow rules: {str(e)}"
# Remove flow rule
async def remove_flow(deviceId: str, flowId: str | int) -> str:
"""Remove a specific flow rule.
Args:
deviceId: Device identifier
flowId: Flow rule identifier
Removes the specified flow rule.
"""
try:
encoded_device_id = quote(deviceId, safe="")
encoded_flow_id = quote(str(flowId), safe="")
await make_onos_request(
"delete", f"/flows/{encoded_device_id}/{encoded_flow_id}"
)
return f"Flow {flowId} removed successfully from device {deviceId}"
except Exception as e:
return f"Error removing flow {flowId} from device {deviceId}: {str(e)}"
# Remove flows by application
async def remove_application_flows(appId: str) -> str:
"""Remove flow rules generated by an application.
Args:
appId: Application identifier
Removes a collection of flow rules generated by the given application.
"""
try:
await make_onos_request("delete", f"/flows/application/{appId}")
return f"All flows for application {appId} removed successfully"
except Exception as e:
return f"Error removing flows for application {appId}: {str(e)}"
# Remove batch of flow rules
async def remove_flows(flows: List[Dict[str, Any]]) -> str:
"""Remove a batch of flow rules.
Args:
flows: List of flow rule specifications to remove
"""
try:
await make_onos_request("delete", "/flows", json=flows)
return f"Flow rules batch removed successfully"
except Exception as e:
return f"Error removing flow rules batch: {str(e)}"
def register_tools(mcp_server: FastMCP):
"""Register all flow management tools with the MCP server."""
mcp_server.tool()(get_flows)
mcp_server.tool()(get_device_flows)
mcp_server.tool()(get_flow)
mcp_server.tool()(get_table_flows)
mcp_server.tool()(get_application_flows)
mcp_server.tool()(get_pending_flows)
mcp_server.tool()(add_flow)
mcp_server.tool()(add_flows)
mcp_server.tool()(remove_flow)
mcp_server.tool()(remove_application_flows)
mcp_server.tool()(remove_flows)