cfg_build_from_flags
Build a flat OpenSIPS configuration from WITH_* flags by inlining only enabled blocks, eliminating nested #!ifdef clutter for a clean, validated output.
Instructions
Build a flat opensips.cfg from a set of WITH_* flags.
Materialises the Kamailio-style flag vocabulary as a flat, no-#!ifdef
OpenSIPS config. Only the enabled blocks are inlined, so the output
reads cleanly without the nested-ifdef problem Kamailio users hit.
Parameters
flags:
Any combination of WITH_* flags (see cfg_list_flags).
site_params:
Optional site values (LISTEN, DB_URL, TLS_CERT_PATH,
TLS_KEY_PATH, RTP_PROXY_ADDR, CHILDREN).
validate:
Run the result through opensips -C when available.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| flags | Yes | ||
| site_params | No | ||
| validate | No |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- The MCP tool handler for 'cfg_build_from_flags'. Resolves WITH_* flags into a composition, renders a flat opensips.cfg, optionally validates with opensips -C, and returns the config, composition details, and validation results.
async def cfg_build_from_flags( ctx: Context, flags: list[str], site_params: dict[str, str] | None = None, validate: bool = True, ) -> dict[str, Any]: """Build a flat ``opensips.cfg`` from a set of ``WITH_*`` flags. Materialises the Kamailio-style flag vocabulary as a flat, no-``#!ifdef`` OpenSIPS config. Only the enabled blocks are inlined, so the output reads cleanly without the nested-ifdef problem Kamailio users hit. Parameters ---------- flags: Any combination of ``WITH_*`` flags (see ``cfg_list_flags``). site_params: Optional site values (``LISTEN``, ``DB_URL``, ``TLS_CERT_PATH``, ``TLS_KEY_PATH``, ``RTP_PROXY_ADDR``, ``CHILDREN``). validate: Run the result through ``opensips -C`` when available. """ try: composition = _composer.resolve(flags) config = _composer.render_config(composition, site_params or {}) except Exception as exc: # noqa: BLE001 return {"error": str(exc)} payload = { "config": config, "composition": composition.to_dict(), "lines": len(config.splitlines()), } if validate: payload["validation"] = await _validate_if_available(config) return payload - FlagComposer.render_config() called by cfg_build_from_flags to produce the flat OpenSIPS configuration text from a FlagComposition and site parameters. Includes comment with 'Generated by opensips-mcp cfg_build_from_flags.'
def render_config( self, composition: FlagComposition, site_params: dict[str, str] | None = None, ) -> str: """Produce a flat opensips.cfg with only the enabled features inlined.""" site_params = site_params or {} # Sanitize every interpolated site parameter up-front — any forbidden # char (quote, backslash, newline, backtick) raises before we emit a # single directive. This blocks cfg directive injection. safe_site: dict[str, str] = { k: sanitize_cfg_value(v, field=f"site_params[{k!r}]") for k, v in site_params.items() } # Global parameters header log_level = sanitize_cfg_value( composition.globals.get("log_level", "3"), field="log_level" ) debug_mode = sanitize_cfg_value( composition.globals.get("debug_mode", "no"), field="debug_mode" ) listen = safe_site.get("LISTEN", "udp:0.0.0.0:5060") children = safe_site.get("CHILDREN", "4") db_url = safe_site.get( "DB_URL", "mysql://opensips:opensipsrw@localhost/opensips" ) lines: list[str] = [] lines.append("#") lines.append("# OpenSIPS configuration — flat output of WITH_* feature flags.") lines.append("# Generated by opensips-mcp cfg_build_from_flags.") lines.append(f"# Flags enabled: {', '.join(composition.flags)}") lines.append("#") lines.append("") lines.append("# ---- Global Parameters ----") lines.append(f"debug_mode={debug_mode}") lines.append(f"log_level={log_level}") lines.append("log_stderror=no") lines.append("log_facility=LOG_LOCAL0") lines.append(f"children={children}") lines.append("auto_aliases=no") lines.append(f"listen={listen}") lines.append("") lines.append("# ---- Module Loading (dep-ordered) ----") for m in composition.modules: lines.append(f'loadmodule "{m}.so"') lines.append("") lines.append("# ---- Module Parameters ----") # Per-flag modparams / macros. self._emit_flag_modparams(composition.flags, safe_site, db_url, lines) lines.append("") lines.append("# ---- Routing Logic ----") self._emit_route(composition.flags, lines) return "\n".join(lines) + "\n" # -- flag body emitters ------------------------------------------------- @staticmethod def _emit_flag_modparams( flags: list[str], site: dict[str, str], db_url: str, out: list[str], ) -> None: if "WITH_MI_FIFO" in flags: out.append('modparam("mi_fifo", "fifo_name", "/tmp/opensips_fifo")') if "WITH_MI_HTTP" in flags: out.append('modparam("httpd", "port", 8888)') out.append('modparam("tm", "fr_timeout", 5)') out.append('modparam("tm", "fr_inv_timeout", 30)') out.append('modparam("rr", "append_fromtag", 1)') out.append('modparam("rr", "enable_full_lr", 1)') if "WITH_DIALOG" in flags: out.append('modparam("dialog", "db_mode", 1)') out.append(f'modparam("dialog", "db_url", "{db_url}")') out.append('modparam("dialog", "default_timeout", 43200)') if "WITH_USRLOCDB" in flags: out.append('modparam("usrloc", "db_mode", 2)') out.append(f'modparam("usrloc", "db_url", "{db_url}")') out.append('modparam("usrloc", "nat_bflag", "NAT")') if "WITH_AUTH" in flags: out.append(f'modparam("auth_db", "db_url", "{db_url}")') out.append('modparam("auth_db", "calculate_ha1", 1)') out.append('modparam("auth_db", "password_column", "password")') if "WITH_DISPATCHER" in flags: out.append(f'modparam("dispatcher", "db_url", "{db_url}")') out.append('modparam("dispatcher", "ds_ping_method", "OPTIONS")') out.append('modparam("dispatcher", "ds_ping_interval", 10)') if "WITH_DROUTING" in flags: out.append(f'modparam("drouting", "db_url", "{db_url}")') if "WITH_LOAD_BALANCER" in flags: out.append(f'modparam("load_balancer", "db_url", "{db_url}")') out.append('modparam("load_balancer", "probing_method", "OPTIONS")') out.append('modparam("load_balancer", "probing_interval", 30)') if "WITH_ACCDB" in flags or "WITH_CDRDB" in flags: out.append(f'modparam("acc", "db_url", "{db_url}")') out.append('modparam("acc", "db_flag", "ACC_DO")') out.append('modparam("acc", "db_missed_flag", "ACC_MISSED")') if "WITH_CDRDB" in flags: out.append('modparam("acc", "cdr_flag", "CDR_FLAG")') out.append( 'modparam("acc", "db_extra", ' '"from_hdr=$fU;to_hdr=$tU;src_ip=$si;callid=$ci")' ) if "WITH_NAT_TRAVERSAL" in flags or "WITH_NAT" in flags: out.append('modparam("nathelper", "natping_interval", 30)') out.append('modparam("nathelper", "sip_natping_flag", "SIP_PING_FLAG")') if "WITH_RTPENGINE" in flags: sock = site.get("RTP_PROXY_ADDR", "udp:127.0.0.1:22222") out.append(f'modparam("rtpengine", "rtpengine_sock", "{sock}")') if "WITH_TLS" in flags or "WITH_WEBRTC" in flags: out.append('modparam("tls_mgm", "server_domain", "srv")') out.append('modparam("tls_mgm", "match", "[srv]server_name=*")') cert = site.get("TLS_CERT_PATH", "/etc/opensips/tls/cert.pem") key = site.get("TLS_KEY_PATH", "/etc/opensips/tls/privkey.pem") out.append(f'modparam("tls_mgm", "certificate", "[srv]{cert}")') out.append(f'modparam("tls_mgm", "private_key", "[srv]{key}")') out.append('modparam("tls_mgm", "verify_cert", "[srv]0")') out.append('modparam("tls_mgm", "require_cert", "[srv]0")') if "WITH_PROMETHEUS" in flags: out.append('modparam("httpd", "port", 8888)') @staticmethod def _emit_route(flags: list[str], out: list[str]) -> None: out.append("route {") out.append(" if (!mf_process_maxfwd_header(10)) {") out.append(' sl_send_reply(483, "Too Many Hops");') out.append(" exit;") out.append(" }") out.append("") if "WITH_ANTIFLOOD" in flags or "WITH_PIKE" in flags: out.append(" if (!pike_check_req()) {") out.append(' xlog("L_ALERT","pike blocked $si\\n");') out.append(' sl_send_reply(503, "Service Unavailable");') out.append(" exit;") out.append(" }") out.append("") if "WITH_NAT" in flags or "WITH_NAT_TRAVERSAL" in flags: out.append(" force_rport();") out.append(" if (nat_uac_test(19)) {") out.append(' if (is_method("REGISTER")) {') out.append(" fix_nated_register();") out.append(' setbflag("NAT");') out.append(" } else {") out.append(" fix_nated_contact();") out.append(' setflag("NAT_FLAG");') out.append(" }") out.append(" }") out.append("") out.append(' if (is_method("INVITE|SUBSCRIBE")) {') out.append(" record_route();") out.append(" }") out.append("") out.append(" if (has_totag()) {") out.append(" if (loose_route()) {") if "WITH_CDRDB" in flags: out.append(' if (is_method("BYE")) {') out.append(' do_accounting("db", "cdr");') out.append(" }") out.append(" t_relay();") out.append(" exit;") out.append(" }") out.append(' sl_send_reply(404, "Not here");') out.append(" exit;") out.append(" }") out.append("") out.append(' if (is_method("CANCEL")) {') out.append(" if (t_check_trans()) { t_relay(); }") out.append(" exit;") out.append(" }") out.append(" t_check_trans();") out.append("") if "WITH_AUTH" in flags: out.append(' if (is_method("REGISTER")) {') out.append(' if (!www_authorize("", "subscriber")) {') out.append(' www_challenge("", 0);') out.append(" exit;") out.append(" }") out.append(" consume_credentials();") if "WITH_USRLOCDB" in flags: out.append(' if (!save("location")) {') out.append(' sl_send_reply(500, "Server Error");') out.append(" }") out.append(" exit;") out.append(" }") out.append("") if "WITH_DIALOG" in flags: out.append(' if (is_method("INVITE")) {') out.append(" create_dialog();") if "WITH_CDRDB" in flags: out.append(' setflag("ACC_DO");') out.append(' setflag("ACC_MISSED");') out.append(' setflag("CDR_FLAG");') out.append(' do_accounting("db", "cdr");') out.append(" }") out.append("") # Final dispatch if "WITH_DISPATCHER" in flags: out.append(" if (!ds_select_dst(1, 4)) {") out.append(' sl_send_reply(502, "No Destination");') out.append(" exit;") out.append(" }") elif "WITH_DROUTING" in flags: out.append(" if (!do_routing(0)) {") out.append(' sl_send_reply(404, "No Route");') out.append(" exit;") out.append(" }") elif "WITH_LOAD_BALANCER" in flags: out.append(' if (!lb_start(1, "pstn")) {') out.append(' sl_send_reply(503, "No Backend");') out.append(" exit;") out.append(" }") elif "WITH_USRLOCDB" in flags: out.append(' if (!lookup("location")) {') out.append(' sl_send_reply(404, "Not Found");') out.append(" exit;") out.append(" }") out.append(" t_relay();") out.append(" exit;") out.append("}") - FlagComposer.resolve() resolves WITH_* flags into a FlagComposition (modules, globals, implied flags) — called by cfg_build_from_flags as the first step before rendering.
def resolve(self, flags: list[str]) -> FlagComposition: # Canonicalize case — accept both ``WITH_AUTH`` and ``with_auth``. raw = [f.strip().upper() for f in flags if f.strip()] unknown = [f for f in raw if f not in FLAGS] known = [f for f in raw if f in FLAGS] resolved: set[str] = set(known) implied_by: dict[str, list[str]] = {} # Transitive implication closure. changed = True while changed: changed = False for flag in list(resolved): for implied in _FLAG_IMPLIES.get(flag, set()): if implied not in resolved: resolved.add(implied) implied_by.setdefault(implied, []).append(flag) changed = True modules: set[str] = set() globals_: dict[str, str] = {} for flag in resolved: modules.update(_FLAG_MODULES.get(flag, set())) globals_.update(_FLAG_GLOBALS.get(flag, {})) # Resolve transitive module dependencies from the catalog. from opensips_mcp.cfg.module_catalog import get_module deps_added = True while deps_added: deps_added = False for m in list(modules): info = get_module(m) if not info: continue for dep in info.depends_on: if dep not in modules: modules.add(dep) deps_added = True # Stable ordering: core first, then dependencies, then the rest. ordered = self._topologically_order(modules) return FlagComposition( flags=sorted(resolved), modules=ordered, globals=globals_, unknown_flags=sorted(unknown), implied_by=implied_by, ) @staticmethod def _topologically_order(modules: set[str]) -> list[str]: """Order modules so that every dependency loads before its dependant.""" from opensips_mcp.cfg.module_catalog import get_module order: list[str] = [] seen: set[str] = set() def visit(m: str) -> None: if m in seen: return info = get_module(m) if info: for dep in info.depends_on: if dep in modules: visit(dep) seen.add(m) order.append(m) # Load core / signaling first for readability. preferred = [ "signaling", "sl", "tm", "rr", "maxfwd", "textops", "sipmsgops", "uri", "proto_udp", "proto_tcp", "proto_tls", "proto_ws", "proto_wss", "tls_mgm", ] for p in preferred: if p in modules: visit(p) for m in sorted(modules): visit(m) return order - FlagComposition dataclass — the schema/return type produced by flag resolution and returned as part of the cfg_build_from_flags output under 'composition'.
@dataclass class FlagComposition: flags: list[str] = field(default_factory=list) modules: list[str] = field(default_factory=list) globals: dict[str, str] = field(default_factory=dict) unknown_flags: list[str] = field(default_factory=list) implied_by: dict[str, list[str]] = field(default_factory=dict) def to_dict(self) -> dict: return { "flags": self.flags, "modules": self.modules, "globals": self.globals, "unknown_flags": self.unknown_flags, "implied_by": self.implied_by, } - src/opensips_mcp/tools/cfg_tools.py:1365-1366 (registration)Tool registration via @mcp.tool() and @audited('cfg_build_from_flags') decorators on the cfg_build_from_flags handler function.
@mcp.tool() @audited("cfg_build_from_flags")