Skip to main content
Glama

ONOS MCP Server

by davidlin2k
multicast.py2.73 kB
from typing import Any, Dict, List from mcp.server.fastmcp import FastMCP from onos_mcp_server.api_client import make_onos_request async def get_multicast_routes() -> str: """Get all multicast routes. Returns array of all known multicast routes. """ try: routes = await make_onos_request("get", "/mcast") return str(routes) except Exception as e: return f"Error retrieving multicast routes: {str(e)}" async def add_multicast_route( group: str, source: str, sources: List[str], sinks: List[str] ) -> str: """Create a new multicast route. Args: group: Multicast group address source: Source IP address sources: List of source connection points sinks: List of sink connection points Creates a new route in the multicast RIB. """ try: route_data = { "group": group, "source": source, "sources": sources, "sinks": sinks, } result = await make_onos_request("post", "/mcast", json=route_data) return f"Multicast route added successfully for group {group}" except Exception as e: return f"Error adding multicast route: {str(e)}" async def remove_multicast_route(group: str, source: str) -> str: """Remove a multicast route. Args: group: Multicast group address source: Source IP address Removes a route from the multicast RIB. """ try: route_data = {"group": group, "source": source} await make_onos_request("delete", "/mcast", json=route_data) return f"Multicast route from {source} to group {group} removed successfully" except Exception as e: return f"Error removing multicast route: {str(e)}" async def add_multicast_sink(group: str, source: str, sink: str) -> str: """Create a sink for a multicast route. Args: group: Multicast group address source: Source IP address sink: Sink connection point Creates a new sink for an existing multicast route. """ try: sink_data = {"sink": sink} result = await make_onos_request( "post", f"/mcast/sinks/{group}/{source}", json=sink_data ) return ( f"Sink added successfully to multicast route from {source} to group {group}" ) except Exception as e: return f"Error adding sink to multicast route: {str(e)}" def register_tools(mcp_server: FastMCP): """Register all multicast management tools with the MCP server.""" mcp_server.tool()(get_multicast_routes) mcp_server.tool()(add_multicast_route) mcp_server.tool()(remove_multicast_route) mcp_server.tool()(add_multicast_sink)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/davidlin2k/onos-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server