mi_execute
Run a registered OpenSIPS MI command with optional parameters, enforcing read or write permission scopes based on the command's registration.
Instructions
Execute a known OpenSIPS MI command.
The command must be registered in :data:MI_COMMANDS. The specific
permission recorded on the catalogue entry (mi.read or mi.write)
is enforced in addition to the outer mi.execute scope, so a future
role with read-only MI access cannot use this tool to run a write
command.
Parameters
command:
The MI command name (e.g. ul_dump, lb_list).
params:
Optional parameters for the MI command.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| command | Yes | ||
| params | No |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- src/opensips_mcp/tools/mi_tools.py:17-55 (handler)The main mi_execute handler function. Decorated with @mcp.tool() to register as an MCP tool, @audited('mi_execute') for audit logging, and @require_permission('mi.execute') for RBAC. Looks up the command in MI_COMMANDS registry, enforces per-command dynamic permission (mi.read/mi.write), sanitizes params, and delegates to mi_client.execute().
@mcp.tool() @audited("mi_execute") @require_permission("mi.execute") async def mi_execute( ctx: Context, command: str, params: dict[str, Any] | None = None, ) -> dict[str, Any]: """Execute a known OpenSIPS MI command. The *command* must be registered in :data:`MI_COMMANDS`. The specific permission recorded on the catalogue entry (``mi.read`` or ``mi.write``) is enforced in addition to the outer ``mi.execute`` scope, so a future role with read-only MI access cannot use this tool to run a write command. Parameters ---------- command: The MI command name (e.g. ``ul_dump``, ``lb_list``). params: Optional parameters for the MI command. """ cmd = MI_COMMANDS.get(command) if cmd is None: return { "error": ( f"Unknown MI command {command!r}. " "Only commands registered in MI_COMMANDS may be executed; " "call mi_list_commands() to discover available names." ), } # Enforce the per-command permission on top of the outer mi.execute gate. require_dynamic_permission(ctx, cmd.permission) app = ctx.request_context.lifespan_context clean_params = sanitize_mi_params(params) if params else None result = await app.mi_client.execute(command, clean_params) return result - MI command registry defining the MICommand dataclass and the MI_COMMANDS dict. This is the schema/registry that mi_execute uses to validate command names and enforce per-command permissions (mi.read vs mi.write).
"""MI command registry — catalogue of known OpenSIPS MI commands.""" from __future__ import annotations from dataclasses import dataclass, field @dataclass class MICommand: """Describes a single OpenSIPS Management Interface command.""" name: str module: str description: str params: list[str] = field(default_factory=list) permission: str = "mi.read" category: str = "core" MI_COMMANDS: dict[str, MICommand] = {} def _r( name: str, module: str, desc: str, params: list[str] | None = None, permission: str = "mi.read", category: str = "core", ) -> None: MI_COMMANDS[name] = MICommand(name, module, desc, params or [], permission, category) # --------------------------------------------------------------------------- # Core # --------------------------------------------------------------------------- _r("ps", "core", "List running OpenSIPS processes") _r("which", "core", "List available MI commands") _r("get_statistics", "core", "Retrieve runtime statistics", ["statistics"]) _r("reset_statistics", "core", "Reset a statistic to zero", ["statistics"], "mi.write") _r("list_tcp_conns", "core", "List active TCP/TLS connections") _r("mem", "core", "Show memory usage details") _r("uptime", "core", "Show server uptime") _r("version", "core", "Show server version string") # --------------------------------------------------------------------------- # Dispatcher # --------------------------------------------------------------------------- _r("ds_list", "dispatcher", "List dispatcher destination sets", category="dispatcher") _r("ds_reload", "dispatcher", "Reload dispatcher sets from database", permission="mi.write", category="dispatcher") _r("ds_set_state", "dispatcher", "Set state of a dispatcher destination", ["state", "group", "address"], "mi.write", "dispatcher") _r("ds_push_script_attrs", "dispatcher", "Push script attributes to a dispatcher destination", ["group", "address", "attrs"], "mi.write", "dispatcher") # --------------------------------------------------------------------------- # Dialog # --------------------------------------------------------------------------- _r("dlg_list", "dialog", "List active dialogs", ["callid", "from_tag"], category="dialog") _r("dlg_list_ctx", "dialog", "List dialog contexts", ["callid", "from_tag"], category="dialog") _r("dlg_end_dlg", "dialog", "End (tear down) a dialog", ["dialog_id", "extra_hdrs"], "mi.write", "dialog") _r("profile_get_size", "dialog", "Get number of dialogs in a profile", ["profile", "value"], category="dialog") _r("profile_list_dlgs", "dialog", "List dialogs in a profile", ["profile", "value"], category="dialog") _r("profile_get_values", "dialog", "Get values in a dialog profile", ["profile"], category="dialog") _r("profile_end_dlgs", "dialog", "End all dialogs in a profile", ["profile", "value"], "mi.write", "dialog") _r("dlg_db_sync", "dialog", "Synchronize dialogs to database", permission="mi.write", category="dialog") _r("dlg_restore_db", "dialog", "Restore dialogs from database", permission="mi.write", category="dialog") _r("list_all_profiles", "dialog", "List all dialog profiles", category="dialog") _r("dlg_push_var", "dialog", "Push a variable into a dialog", ["callid", "from_tag", "var_name", "var_value"], "mi.write", "dialog") _r("dlg_send_sequential", "dialog", "Send sequential request in dialog", ["callid", "from_tag"], "mi.write", "dialog") _r("set_dlg_profile", "dialog", "Add a dialog to a profile", ["callid", "from_tag", "profile", "value"], "mi.write", "dialog") _r("unset_dlg_profile", "dialog", "Remove a dialog from a profile", ["callid", "from_tag", "profile", "value"], "mi.write", "dialog") # --------------------------------------------------------------------------- # Usrloc # --------------------------------------------------------------------------- _r("ul_dump", "usrloc", "Dump user location table contents", ["brief"], category="usrloc") _r("ul_show_contact", "usrloc", "Show contacts for an AOR", ["table_name", "aor"], category="usrloc") _r("ul_add", "usrloc", "Add a contact to usrloc", ["table_name", "aor", "contact", "expires", "q", "flags", "cflags", "methods"], "mi.write", "usrloc") _r("ul_rm", "usrloc", "Remove all contacts for an AOR", ["table_name", "aor"], "mi.write", "usrloc") _r("ul_rm_contact", "usrloc", "Remove a specific contact", ["table_name", "aor", "contact"], "mi.write", "usrloc") _r("ul_flush", "usrloc", "Flush usrloc to database", permission="mi.write", category="usrloc") _r("ul_sync", "usrloc", "Synchronize usrloc from database", permission="mi.write", category="usrloc") # --------------------------------------------------------------------------- # Load Balancer # --------------------------------------------------------------------------- _r("lb_list", "load_balancer", "List load balancer destinations", category="load_balancer") _r("lb_reload", "load_balancer", "Reload load balancer from database", permission="mi.write", category="load_balancer") _r("lb_resize", "load_balancer", "Resize capacity of a resource", ["group_id", "dst_id", "resource", "new_capacity"], "mi.write", "load_balancer") _r("lb_status", "load_balancer", "Get or set status of a LB destination", ["group_id", "dst_id", "new_status"], category="load_balancer") # --------------------------------------------------------------------------- # Dynamic Routing # --------------------------------------------------------------------------- _r("dr_reload", "drouting", "Reload dynamic routing data", permission="mi.write", category="drouting") _r("dr_gw_status", "drouting", "Get or set gateway status", ["id", "status"], category="drouting") _r("dr_carrier_status", "drouting", "Get or set carrier status", ["id", "status"], category="drouting") _r("dr_reload_status", "drouting", "Check reload status", category="drouting") _r("dr_number_routing", "drouting", "Test number routing", ["number", "group_id"], category="drouting") _r("dr_enable_probing", "drouting", "Enable or disable gateway probing", ["status"], "mi.write", "drouting") # --------------------------------------------------------------------------- # Permissions # --------------------------------------------------------------------------- _r("address_reload", "permissions", "Reload address table", permission="mi.write", category="permissions") _r("address_dump", "permissions", "Dump address table contents", category="permissions") _r("subnet_dump", "permissions", "Dump subnet table contents", category="permissions") _r("allow_uri", "permissions", "Check if a URI is allowed", ["basename", "uri", "contact"], category="permissions") # --------------------------------------------------------------------------- # RTPEngine # --------------------------------------------------------------------------- _r("rtpengine_show", "rtpengine", "Show RTPEngine instances and status", category="rtpengine") _r("rtpengine_enable", "rtpengine", "Enable or disable an RTPEngine instance", ["url", "enable"], "mi.write", "rtpengine") _r("rtpengine_reload", "rtpengine", "Reload RTPEngine configuration", permission="mi.write", category="rtpengine") _r("teardown", "rtpengine", "Tear down an active media session", ["callid"], "mi.write", "rtpengine") # --------------------------------------------------------------------------- # Tracer # --------------------------------------------------------------------------- _r("trace", "tracer", "Show current tracing status", category="tracer") _r("trace_start", "tracer", "Start SIP tracing", ["filter"], "mi.write", "tracer") _r("trace_stop", "tracer", "Stop SIP tracing", permission="mi.write", category="tracer") # --------------------------------------------------------------------------- # Rate Limit # --------------------------------------------------------------------------- _r("rl_list", "ratelimit", "List rate limiting pipes", category="ratelimit") _r("rl_dump_pipe", "ratelimit", "Dump pipe details", category="ratelimit") _r("rl_reset_pipe", "ratelimit", "Reset a rate limiting pipe", ["pipe"], "mi.write", "ratelimit") _r("rl_set_pid", "ratelimit", "Set PID controller parameters", ["ki", "kp", "kd"], "mi.write", "ratelimit") _r("rl_get_pid", "ratelimit", "Get PID controller parameters", category="ratelimit") _r("rl_bin_status", "ratelimit", "Show binary replication status", category="ratelimit") # --------------------------------------------------------------------------- # Clusterer # --------------------------------------------------------------------------- _r("clusterer_list", "clusterer", "List cluster nodes", category="clusterer") _r("clusterer_list_topology", "clusterer", "List cluster topology", category="clusterer") _r("clusterer_set_status", "clusterer", "Set status of a cluster node", ["cluster_id", "node_id", "status"], "mi.write", "clusterer") _r("clusterer_reload", "clusterer", "Reload clusterer data", permission="mi.write", category="clusterer") _r("cluster_send_mi", "clusterer", "Send MI command to a specific cluster node", ["cluster_id", "node_id", "cmd", "params"], "mi.write", "clusterer") _r("cluster_broadcast_mi", "clusterer", "Broadcast MI command to all cluster nodes", ["cluster_id", "cmd", "params"], "mi.write", "clusterer") _r("clusterer_list_cap", "clusterer", "List cluster capabilities", category="clusterer") _r("clusterer_set_cap_status", "clusterer", "Set capability status", ["cluster_id", "capability", "status"], "mi.write", "clusterer") _r("clusterer_shtag_set_active", "clusterer", "Set a sharing tag as active", ["tag"], "mi.write", "clusterer") _r("clusterer_list_shtags", "clusterer", "List sharing tags", category="clusterer") _r("clusterer_remove_node", "clusterer", "Remove a node from the cluster", ["cluster_id", "node_id"], "mi.write", "clusterer") # --------------------------------------------------------------------------- # Call Center # --------------------------------------------------------------------------- _r("cc_reload", "call_center", "Reload call center agents and flows", permission="mi.write", category="call_center") _r("cc_agent_login", "call_center", "Log a call center agent in or out", ["agent_id", "state"], "mi.write", "call_center") _r("cc_list_queue", "call_center", "List queued calls", category="call_center") _r("cc_list_flows", "call_center", "List call center flows", category="call_center") _r("cc_list_agents", "call_center", "List call center agents", category="call_center") _r("cc_list_calls", "call_center", "List active call center calls", category="call_center") _r("cc_dispatch_call_to_agent", "call_center", "Dispatch a queued call to a specific agent", ["call_id", "agent_id"], "mi.write", "call_center") _r("cc_reset_stats", "call_center", "Reset call center statistics", permission="mi.write", category="call_center") # --------------------------------------------------------------------------- # Transaction (TM) # --------------------------------------------------------------------------- _r("t_uac_dlg", "tm", "Generate a SIP request via UAC", ["method", "ruri", "headers", "body"], "mi.write", "tm") _r("t_uac_cancel", "tm", "Cancel a pending UAC transaction", ["callid", "cseq"], "mi.write", "tm") _r("t_hash", "tm", "Show transaction hash table statistics", category="tm") _r("t_reply", "tm", "Send a reply to a transaction", ["code", "reason", "trans_id", "to_tag"], "mi.write", "tm") # --------------------------------------------------------------------------- # CFG Utils # --------------------------------------------------------------------------- _r("rand_set_prob", "cfgutils", "Set random probability", ["prob"], "mi.write", "cfgutils") _r("rand_reset_prob", "cfgutils", "Reset random probability to default", permission="mi.write", category="cfgutils") _r("rand_get_prob", "cfgutils", "Get current random probability", category="cfgutils") _r("check_config_hash", "cfgutils", "Check if config file hash has changed", category="cfgutils") _r("get_config_hash", "cfgutils", "Get current config file hash", category="cfgutils") _r("shv_set", "cfgutils", "Set a shared variable", ["name", "type", "value"], "mi.write", "cfgutils") _r("shv_get", "cfgutils", "Get a shared variable value", ["name"], category="cfgutils") # --------------------------------------------------------------------------- # Pike # --------------------------------------------------------------------------- _r("pike_list", "pike", "List blocked IP addresses", category="pike") _r("pike_rm", "pike", "Unblock an IP address", ["ip"], "mi.write", "pike") # --------------------------------------------------------------------------- # TLS Management # --------------------------------------------------------------------------- _r("tls_reload", "tls_mgm", "Reload TLS domains from database", permission="mi.write", category="tls_mgm") _r("tls_list", "tls_mgm", "List configured TLS domains", category="tls_mgm") _r("tls_info", "tls_mgm", "Show TLS domain configuration details", category="tls_mgm") _r("tls_trace", "tls_mgm", "Enable or disable TLS tracing", ["mode"], "mi.write", "tls_mgm") # --------------------------------------------------------------------------- # Registrar # --------------------------------------------------------------------------- _r("reg_dump", "registrar", "Dump registrar state", category="registrar") # --------------------------------------------------------------------------- # Domain # --------------------------------------------------------------------------- _r("domain_reload", "domain", "Reload domain table from database", permission="mi.write", category="domain") _r("domain_dump", "domain", "Dump the domain table", category="domain") # --------------------------------------------------------------------------- # Dialplan # --------------------------------------------------------------------------- _r("dp_reload", "dialplan", "Reload dialplan rules from database", permission="mi.write", category="dialplan") _r("dp_translate", "dialplan", "Test dialplan translation", ["dpid", "input"], category="dialplan") # --------------------------------------------------------------------------- # NAT Helper # --------------------------------------------------------------------------- _r("nh_enable_ping", "nathelper", "Enable or disable NAT pinging", ["status"], "mi.write", "nathelper") # --------------------------------------------------------------------------- # Accounting # --------------------------------------------------------------------------- _r("acc_reload", "acc", "Reload accounting configuration", permission="mi.write", category="acc") # --------------------------------------------------------------------------- # Fraud Detection # --------------------------------------------------------------------------- _r("fraud_reload", "fraud_detection", "Reload fraud detection rules", permission="mi.write", category="fraud_detection") # --------------------------------------------------------------------------- # Stateless (SL) # --------------------------------------------------------------------------- _r("sl_stats", "sl", "Show stateless reply statistics", category="sl") # --------------------------------------------------------------------------- # B2B Logic / Entities / SCA # --------------------------------------------------------------------------- _r("b2b_list", "b2b_logic", "List active B2B sessions", category="b2b_logic") _r("b2b_terminate", "b2b_logic", "Terminate a B2B session", ["key"], "mi.write", "b2b_logic") _r("b2be_list", "b2b_entities", "List B2B entities (UAC/UAS/server)", category="b2b_entities") _r("b2b_trigger_scenario", "b2b_logic", "Trigger a B2B scenario by id", ["id", "custom_hdrs", "params"], "mi.write", "b2b_logic") _r("b2b_bridge", "b2b_logic", "Bridge a B2B call to a new destination", ["key", "new_dst"], "mi.write", "b2b_logic") _r("b2b_sca_list", "b2b_sca", "List SCA (Shared Call Appearance) subscriptions", category="b2b_sca") # --------------------------------------------------------------------------- # Presence / PUA # --------------------------------------------------------------------------- _r("presence_list_phtable", "presence", "List presence hash table (subscriptions)", category="presence") _r("presence_refresh_watchers", "presence", "Refresh presence watchers", ["pres_uri", "event", "refresh_type"], "mi.write", "presence") _r("presence_cleanup", "presence", "Force cleanup of expired presence records", permission="mi.write", category="presence") _r("pres_expose", "presence", "Expose a presence rule", ["presentity_uri", "rules_doc"], "mi.write", "presence") _r("pua_publish", "pua", "Publish a PUA event", ["pres_uri", "expires", "event", "content_type", "id", "etag", "outbound_proxy", "extra_headers", "body"], "mi.write", "pua") _r("pua_list", "pua", "List active PUA publications", category="pua") # --------------------------------------------------------------------------- # Mid Registrar # --------------------------------------------------------------------------- _r("mid_reg_dump", "mid_registrar", "Dump mid-registrar AoR state", category="mid_registrar") _r("mid_reg_update", "mid_registrar", "Update a mid-registrar record", ["aor"], "mi.write", "mid_registrar") # --------------------------------------------------------------------------- # Carrier Route # --------------------------------------------------------------------------- _r("cr_reload_routes", "carrierroute", "Reload carrier route tables", permission="mi.write", category="carrierroute") _r("cr_dump_routes", "carrierroute", "Dump carrier route tables", category="carrierroute") _r("cr_replace_host", "carrierroute", "Replace host in carrier route", ["carrier", "domain", "prefix", "host", "new_host"], "mi.write", "carrierroute") _r("cr_deactivate_host", "carrierroute", "Deactivate a host in carrier route", ["carrier", "domain", "prefix", "host"], "mi.write", "carrierroute") _r("cr_activate_host", "carrierroute", "Activate a host in carrier route", ["carrier", "domain", "prefix", "host"], "mi.write", "carrierroute") # --------------------------------------------------------------------------- # LCR (Least Cost Routing) # --------------------------------------------------------------------------- _r("lcr_reload", "lcr", "Reload LCR rules and gateways", permission="mi.write", category="lcr") _r("lcr_dump", "lcr", "Dump LCR rules and gateways", category="lcr") # --------------------------------------------------------------------------- # UAC / UAC Registrant # --------------------------------------------------------------------------- _r("uac_reg_list", "uac_registrant", "List UAC registrations", ["aor", "contact"], category="uac_registrant") _r("uac_reg_reload", "uac_registrant", "Reload UAC registrant data", permission="mi.write", category="uac_registrant") _r("uac_reg_enable", "uac_registrant", "Enable a UAC registration", ["aor"], "mi.write", "uac_registrant") _r("uac_reg_disable", "uac_registrant", "Disable a UAC registration", ["aor"], "mi.write", "uac_registrant") _r("uac_reg_force_register", "uac_registrant", "Force registration for an AoR", ["aor"], "mi.write", "uac_registrant") # --------------------------------------------------------------------------- # Event Interface # --------------------------------------------------------------------------- _r("event_list", "event_interface", "List available event types", category="event_interface") _r("event_subscribers_list", "event_interface", "List subscribers to events", ["event", "socket"], category="event_interface") _r("event_subscribe", "event_interface", "Subscribe to an event", ["event", "socket", "expire"], "mi.write", "event_interface") _r("events_subscribe", "event_interface", "Subscribe to an event (alias)", ["event", "socket", "expire"], "mi.write", "event_interface") # --------------------------------------------------------------------------- # Topology Hiding # --------------------------------------------------------------------------- _r("th_list", "topology_hiding", "List topology-hidden dialogs", category="topology_hiding") # --------------------------------------------------------------------------- # AVP Operations # --------------------------------------------------------------------------- _r("avp_get", "avpops", "Get an AVP value", ["name"], category="avpops") _r("avp_set", "avpops", "Set an AVP value", ["name", "value"], "mi.write", "avpops") _r("avp_delete", "avpops", "Delete an AVP", ["name"], "mi.write", "avpops") # --------------------------------------------------------------------------- # Lookup helpers # --------------------------------------------------------------------------- def get_command(name: str) -> MICommand | None: """Return the :class:`MICommand` for *name*, or ``None``.""" return MI_COMMANDS.get(name) def get_commands_by_category(category: str) -> list[MICommand]: """Return all commands belonging to *category*.""" return [c for c in MI_COMMANDS.values() if c.category == category] def get_all_commands() -> list[MICommand]: """Return every registered MI command.""" return list(MI_COMMANDS.values()) - src/opensips_mcp/server.py:138-185 (registration)The MCP server instance (FastMCP('opensips-mcp-server')) created at module level. The mi_tools module is imported at line 185 (from opensips_mcp.tools import mi_tools as _mi_tools), which triggers the @mcp.tool() decorator on mi_execute, registering it as an MCP tool.
mcp = FastMCP("opensips-mcp-server", lifespan=app_lifespan) # Register tool modules (imported here to avoid circular imports). # Register prompt modules. from opensips_mcp.prompts import config_prompts as _cfg_prompts # noqa: E402, F401 # Register ecosystem prompt modules. from opensips_mcp.prompts import ecosystem_prompts as _eco_prompts # noqa: E402, F401 from opensips_mcp.prompts import educational_prompts as _edu_prompts # noqa: E402, F401 from opensips_mcp.prompts import migration_prompts as _mig_prompts # noqa: E402, F401 from opensips_mcp.prompts import security_prompts as _sec_prompts # noqa: E402, F401 from opensips_mcp.prompts import sip_prompts as _sip_prompts # noqa: E402, F401 from opensips_mcp.prompts import troubleshoot_prompts as _ts_prompts # noqa: E402, F401 from opensips_mcp.resources import config_resources as _cfg_res # noqa: E402, F401 from opensips_mcp.resources import db_resources as _db_res # noqa: E402, F401 from opensips_mcp.resources import docs_resources as _docs_res # noqa: E402, F401 # Register ecosystem resource modules. from opensips_mcp.resources import ecosystem_resources as _eco_res # noqa: E402, F401 from opensips_mcp.resources import scenario_resources as _scenario_res # noqa: E402, F401 # Register resource modules. from opensips_mcp.resources import system_resources as _sys_res # noqa: E402, F401 from opensips_mcp.tools import acc_tools as _acc_tools # noqa: E402, F401 from opensips_mcp.tools import avpops_tools as _avpops_tools # noqa: E402, F401 from opensips_mcp.tools import b2b_tools as _b2b_tools # noqa: E402, F401 from opensips_mcp.tools import benchmark_tools as _benchmark_tools # noqa: E402, F401 from opensips_mcp.tools import call_center_tools as _cc_tools # noqa: E402, F401 from opensips_mcp.tools import carrierroute_tools as _carrierroute_tools # noqa: E402, F401 from opensips_mcp.tools import cdr_tools as _cdr_tools # noqa: E402, F401 from opensips_mcp.tools import cfg_tools as _cfg_tools # noqa: E402, F401 from opensips_mcp.tools import cluster_tools as _cluster_tools # noqa: E402, F401 from opensips_mcp.tools import db_backup_tools as _db_backup_tools # noqa: E402, F401 from opensips_mcp.tools import diagnostics_tools as _diagnostics_tools # noqa: E402, F401 from opensips_mcp.tools import dialog_tools as _dialog_tools # noqa: E402, F401 from opensips_mcp.tools import dialplan_tools as _dialplan_tools # noqa: E402, F401 from opensips_mcp.tools import dispatcher_tools as _dispatcher_tools # noqa: E402, F401 from opensips_mcp.tools import docker_tools as _docker_tools # noqa: E402, F401 from opensips_mcp.tools import domain_tools as _domain_tools # noqa: E402, F401 from opensips_mcp.tools import drouting_tools as _drouting_tools # noqa: E402, F401 from opensips_mcp.tools import ecosystem_tools as _ecosystem_tools # noqa: E402, F401 from opensips_mcp.tools import event_tools as _event_tools # noqa: E402, F401 from opensips_mcp.tools import fail2ban_tools as _fail2ban_tools # noqa: E402, F401 from opensips_mcp.tools import fraud_tools as _fraud_tools # noqa: E402, F401 from opensips_mcp.tools import homer_tools as _homer_tools # noqa: E402, F401 from opensips_mcp.tools import lcr_tools as _lcr_tools # noqa: E402, F401 from opensips_mcp.tools import load_balancer_tools as _lb_tools # noqa: E402, F401 from opensips_mcp.tools import mi_tools as _mi_tools # noqa: E402, F401 - src/opensips_mcp/mi/client.py:51-205 (helper)MIClient class with execute() method that sends JSON-RPC 2.0 requests to the OpenSIPS MI HTTP endpoint. This is the underlying client that mi_execute calls via app.mi_client.execute(command, clean_params).
class MIClient: """Async JSON-RPC 2.0 client for the OpenSIPS MI HTTP interface. Parameters ---------- http_client: A shared ``httpx.AsyncClient`` instance. base_url: The MI endpoint URL, e.g. ``http://127.0.0.1:8888/mi``. """ def __init__(self, http_client: httpx.AsyncClient, base_url: str) -> None: self._http = http_client self._base_url = base_url.rstrip("/") self._id_counter = itertools.count(1) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ async def execute( self, method: str, params: dict[str, Any] | list[Any] | None = None, ) -> dict[str, Any]: """Send a JSON-RPC 2.0 request to the MI endpoint. Returns the ``result`` field on success. Raises an appropriate :class:`MIError` subclass on failure. """ payload: dict[str, Any] = { "jsonrpc": "2.0", "method": method, "id": next(self._id_counter), } if params is not None: payload["params"] = params response_data = await self._send_with_retry(payload, method=method) return self._parse_response(response_data, method) async def health_check(self) -> bool: """Return ``True`` if the MI endpoint is reachable and responsive.""" try: await self.execute("which") return True except MIError: return False # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ async def _send_with_retry( self, payload: dict[str, Any], *, method: str | None = None, ) -> dict[str, Any]: """POST *payload* with retries for transient errors only. Retry policy: * ``ConnectError`` / ``TimeoutException`` — the request never reached the server (or we never got a response). Safe to retry regardless of method, retried with exponential backoff. * HTTP 5xx — the server processed the request and returned an error. **Only retried for commands declared idempotent in the MI command registry.** Non-idempotent writes (``dlg_end_dlg``, ``ul_add``, ``*_reload``, ...) are NOT retried because a 5xx may indicate a partially-applied state mutation; retrying could double-terminate dialogs, create duplicate contacts, or leave reload state inconsistent. Unknown methods default to non-idempotent (safe default). """ last_exc: Exception | None = None idempotent = _is_idempotent(method) for attempt in range(_MAX_RETRIES): try: response = await self._http.post( self._base_url, json=payload, headers={"Content-Type": "application/json"}, ) if response.status_code >= 500: err = MIError( f"MI returned HTTP {response.status_code}", code=response.status_code, ) if not idempotent: # Don't retry blind — a 5xx after a write may have # already mutated server state. raise err last_exc = err logger.warning( "MI %s returned 5xx (attempt %d/%d) — idempotent, retrying", method or "<unknown>", attempt + 1, _MAX_RETRIES, ) await self._backoff(attempt) continue return response.json() # type: ignore[no-any-return] except httpx.ConnectError as exc: last_exc = exc logger.warning( "MI connection failed (attempt %d/%d)", attempt + 1, _MAX_RETRIES, ) await self._backoff(attempt) except httpx.TimeoutException as exc: last_exc = exc logger.warning( "MI request timed out (attempt %d/%d)", attempt + 1, _MAX_RETRIES, ) await self._backoff(attempt) except httpx.HTTPError as exc: # Non-retryable HTTP errors raise MIConnectionError(str(exc)) from exc # Exhausted retries if isinstance(last_exc, httpx.TimeoutException): raise MITimeoutError() from last_exc if isinstance(last_exc, httpx.ConnectError): raise MIConnectionError(str(last_exc)) from last_exc raise MIConnectionError(str(last_exc)) from last_exc @staticmethod async def _backoff(attempt: int) -> None: delay = _BACKOFF_BASE * (2**attempt) await asyncio.sleep(delay) @staticmethod def _parse_response(data: dict[str, Any], method: str) -> dict[str, Any]: """Extract the result from a JSON-RPC response or raise on error.""" if "error" in data: err = data["error"] code = err.get("code", -1) message = err.get("message", "Unknown MI error") if code == -32601: raise MICommandNotFoundError(method) raise MIError(message=message, code=code) result = data.get("result") if result is None: # Some MI commands return an empty success — normalise to dict. return {} if isinstance(result, dict): return result # Wrap scalar / list results so callers always get a dict. return {"result": result} - The @audited decorator applied to mi_execute. Logs audit entries with operation='mi_execute', the tool parameters, result status, and the active role.
def audited(operation: str): """Decorator that logs audit entries for tool calls.""" def decorator(func): @wraps(func) async def wrapper(ctx: Context, *args, **kwargs): app = ctx.request_context.lifespan_context role = getattr(app.settings, "role", "unknown") try: result = await func(ctx, *args, **kwargs) audit_log(operation, kwargs, "success", role) return result except Exception as e: audit_log(operation, kwargs, f"error: {e}", role) raise return wrapper return decorator