Skip to main content
Glama
routes.py10.9 kB
from typing import Optional, List from ..connector import execute_mikrotik_command from ..logger import app_logger def mikrotik_add_route( dst_address: str, gateway: str, distance: Optional[int] = None, scope: Optional[int] = None, target_scope: Optional[int] = None, routing_mark: Optional[str] = None, comment: Optional[str] = None, disabled: bool = False, vrf_interface: Optional[str] = None, pref_src: Optional[str] = None, check_gateway: Optional[str] = None ) -> str: app_logger.info(f"Adding route: dst={dst_address}, gateway={gateway}") cmd = f"/ip route add dst-address={dst_address} gateway={gateway}" if distance is not None: cmd += f" distance={distance}" if scope is not None: cmd += f" scope={scope}" if target_scope is not None: cmd += f" target-scope={target_scope}" if routing_mark: cmd += f' routing-mark="{routing_mark}"' if comment: cmd += f' comment="{comment}"' if disabled: cmd += " disabled=yes" if vrf_interface: cmd += f' vrf-interface="{vrf_interface}"' if pref_src: cmd += f" pref-src={pref_src}" if check_gateway: cmd += f" check-gateway={check_gateway}" result = execute_mikrotik_command(cmd) if result.strip(): if "*" in result or result.strip().isdigit(): route_id = result.strip() details_cmd = f"/ip route print detail where .id={route_id}" details = execute_mikrotik_command(details_cmd) if details.strip(): return f"Route added successfully:\n\n{details}" else: return f"Route added with ID: {result}" else: return f"Failed to add route: {result}" else: details_cmd = f'/ip route print detail where dst-address="{dst_address}" and gateway="{gateway}"' details = execute_mikrotik_command(details_cmd) if details.strip(): return f"Route added successfully:\n\n{details}" else: return "Route addition completed but unable to verify." def mikrotik_list_routes( dst_filter: Optional[str] = None, gateway_filter: Optional[str] = None, routing_mark_filter: Optional[str] = None, distance_filter: Optional[int] = None, active_only: bool = False, disabled_only: bool = False, dynamic_only: bool = False, static_only: bool = False ) -> str: app_logger.info(f"Listing routes with filters: dst={dst_filter}, gateway={gateway_filter}") cmd = "/ip route print" filters = [] if dst_filter: filters.append(f'dst-address~"{dst_filter}"') if gateway_filter: filters.append(f'gateway~"{gateway_filter}"') if routing_mark_filter: filters.append(f'routing-mark="{routing_mark_filter}"') if distance_filter is not None: filters.append(f"distance={distance_filter}") if active_only: filters.append("active=yes") if disabled_only: filters.append("disabled=yes") if dynamic_only: filters.append("dynamic=yes") if static_only: filters.append("static=yes") if filters: cmd += " where " + " ".join(filters) result = execute_mikrotik_command(cmd) if not result or result.strip() == "" or result.strip() == "no such item": return "No routes found matching the criteria." return f"ROUTES:\n\n{result}" def mikrotik_get_route(route_id: str) -> str: app_logger.info(f"Getting route details: route_id={route_id}") cmd = f"/ip route print detail where .id={route_id}" result = execute_mikrotik_command(cmd) if not result or result.strip() == "": return f"Route with ID '{route_id}' not found." return f"ROUTE DETAILS:\n\n{result}" def mikrotik_update_route( route_id: str, dst_address: Optional[str] = None, gateway: Optional[str] = None, distance: Optional[int] = None, scope: Optional[int] = None, target_scope: Optional[int] = None, routing_mark: Optional[str] = None, comment: Optional[str] = None, disabled: Optional[bool] = None, vrf_interface: Optional[str] = None, pref_src: Optional[str] = None, check_gateway: Optional[str] = None ) -> str: app_logger.info(f"Updating route: route_id={route_id}") cmd = f"/ip route set {route_id}" updates = [] if dst_address: updates.append(f"dst-address={dst_address}") if gateway: updates.append(f"gateway={gateway}") if distance is not None: updates.append(f"distance={distance}") if scope is not None: updates.append(f"scope={scope}") if target_scope is not None: updates.append(f"target-scope={target_scope}") if routing_mark is not None: if routing_mark == "": updates.append("!routing-mark") else: updates.append(f'routing-mark="{routing_mark}"') if comment is not None: updates.append(f'comment="{comment}"') if disabled is not None: updates.append(f'disabled={"yes" if disabled else "no"}') if vrf_interface is not None: if vrf_interface == "": updates.append("!vrf-interface") else: updates.append(f'vrf-interface="{vrf_interface}"') if pref_src is not None: if pref_src == "": updates.append("!pref-src") else: updates.append(f"pref-src={pref_src}") if check_gateway is not None: updates.append(f"check-gateway={check_gateway}") if not updates: return "No updates specified." cmd += " " + " ".join(updates) result = execute_mikrotik_command(cmd) if "failure:" in result.lower() or "error" in result.lower(): return f"Failed to update route: {result}" details_cmd = f"/ip route print detail where .id={route_id}" details = execute_mikrotik_command(details_cmd) return f"Route updated successfully:\n\n{details}" def mikrotik_remove_route(route_id: str) -> str: app_logger.info(f"Removing route: route_id={route_id}") check_cmd = f"/ip route print count-only where .id={route_id}" count = execute_mikrotik_command(check_cmd) if count.strip() == "0": return f"Route with ID '{route_id}' not found." cmd = f"/ip route remove {route_id}" result = execute_mikrotik_command(cmd) if "failure:" in result.lower() or "error" in result.lower(): return f"Failed to remove route: {result}" return f"Route with ID '{route_id}' removed successfully." def mikrotik_enable_route(route_id: str) -> str: return mikrotik_update_route(route_id, disabled=False) def mikrotik_disable_route(route_id: str) -> str: return mikrotik_update_route(route_id, disabled=True) def mikrotik_get_routing_table( table_name: Optional[str] = "main", protocol_filter: Optional[str] = None, active_only: bool = True ) -> str: app_logger.info(f"Getting routing table: table={table_name}") cmd = "/ip route print" filters = [] if table_name and table_name != "main": filters.append(f'routing-table="{table_name}"') if protocol_filter: filters.append(f'protocol="{protocol_filter}"') if active_only: filters.append("active=yes") if filters: cmd += " where " + " ".join(filters) result = execute_mikrotik_command(cmd) if not result or result.strip() == "": return f"No routes found in table '{table_name}'." return f"ROUTING TABLE ({table_name}):\n\n{result}" def mikrotik_check_route_path( destination: str, source: Optional[str] = None, routing_mark: Optional[str] = None ) -> str: app_logger.info(f"Checking route path to: {destination}") cmd = f"/ip route check {destination}" if source: cmd += f" src-address={source}" if routing_mark: cmd += f' routing-mark="{routing_mark}"' result = execute_mikrotik_command(cmd) if not result: return f"Unable to check route to {destination}" return f"ROUTE PATH TO {destination}:\n\n{result}" def mikrotik_get_route_cache() -> str: app_logger.info("Getting route cache") cmd = "/ip route cache print" result = execute_mikrotik_command(cmd) if not result or result.strip() == "": return "Route cache is empty." return f"ROUTE CACHE:\n\n{result}" def mikrotik_flush_route_cache() -> str: app_logger.info("Flushing route cache") cmd = "/ip route cache flush" result = execute_mikrotik_command(cmd) if not result.strip(): return "Route cache flushed successfully." else: return f"Flush result: {result}" def mikrotik_add_default_route( gateway: str, distance: int = 1, comment: Optional[str] = None, check_gateway: str = "ping" ) -> str: return mikrotik_add_route( dst_address="0.0.0.0/0", gateway=gateway, distance=distance, comment=comment or "Default route", check_gateway=check_gateway ) def mikrotik_add_blackhole_route( dst_address: str, distance: int = 1, comment: Optional[str] = None ) -> str: app_logger.info(f"Adding blackhole route: dst={dst_address}") cmd = f"/ip route add dst-address={dst_address} type=blackhole distance={distance}" if comment: cmd += f' comment="{comment}"' result = execute_mikrotik_command(cmd) if result.strip(): if "*" in result or result.strip().isdigit(): return f"Blackhole route added successfully. ID: {result}" else: return f"Failed to add blackhole route: {result}" else: return "Blackhole route added successfully." def mikrotik_get_route_statistics() -> str: app_logger.info("Getting route statistics") total_cmd = "/ip route print count-only" total_count = execute_mikrotik_command(total_cmd) active_cmd = "/ip route print count-only where active=yes" active_count = execute_mikrotik_command(active_cmd) dynamic_cmd = "/ip route print count-only where dynamic=yes" dynamic_count = execute_mikrotik_command(dynamic_cmd) static_cmd = "/ip route print count-only where static=yes" static_count = execute_mikrotik_command(static_cmd) disabled_cmd = "/ip route print count-only where disabled=yes" disabled_count = execute_mikrotik_command(disabled_cmd) stats = [ f"Total routes: {total_count.strip()}", f"Active routes: {active_count.strip()}", f"Dynamic routes: {dynamic_count.strip()}", f"Static routes: {static_count.strip()}", f"Disabled routes: {disabled_count.strip()}" ] return "ROUTE STATISTICS:\n\n" + "\n".join(stats)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tarcisiodier/mcp-mikrotik'

If you have feedback or need assistance with the MCP directory API, please join our Discord server