InterSystems IRIS MCP Server
by caretdev
Verified
- mcp-server-iris
- src
- mcp_server_iris
from enum import Enum
from mcp_server_iris.mcpserver import Context
from mcp_server_iris.tools import transaction, raise_on_error
from iris import IRISReference
class ProductionStatus(Enum):
    """Named production state codes.

    The status and start tools in this module build a member from the integer
    that ``Ens.Director.GetProductionStatus`` writes to its by-reference
    state argument, so the values here must stay numeric.
    """

    Unknown = 0
    Running = 1
    Stopped = 2
    Suspended = 3
    Troubled = 4
    NetworkStopped = 5
    ShardWorkerProhibited = 6
# Upper-case labels for log entry types. The order follows LogType's numbering:
# log_types[i] is the label for the LogType member whose value is i + 1.
log_types = ["ASSERT", "ERROR", "WARNING", "INFO", "TRACE", "ALERT"]
class LogType(Enum):
    """Numeric log entry type codes.

    The logs tool in this module binds these values as parameters of the
    ``type in (...)`` filter on ``Ens_Util.Log``.
    """

    Assert = 1
    Error = 2
    Warning = 3
    Info = 4
    Trace = 5
    Alert = 6
def production_items_status(iris, running: bool, name: str) -> list[str]:
    """Describe every configured item of production ``name``, one line each.

    A line has the form ``ItemName: Enabled=True; Status=value``. The
    ``Status`` part is only present for enabled items and is read from the
    ``^IRIS.Temp.EnsHostMonitor`` global, subscripted by the current
    namespace, the item name and ``"%Status"``.

    Args:
        iris: Connection object providing the ``classMethod*`` and
            ``getString`` calls used below.
        running: Accepted for the caller's convenience; not consulted here.
        name: Production name handed to ``Ens.Config.Production.%OpenId``.

    Returns:
        One description string per item, in collection order.

    Raises:
        ValueError: If ``%OpenId`` yields nothing for ``name``.
    """
    current_ns = iris.classMethodString("%SYSTEM.Process", "NameSpace")
    production = iris.classMethodObject("Ens.Config.Production", "%OpenId", name)
    if not production:
        raise ValueError(f"Production {name} not found")

    def describe(entry) -> str:
        # Build the "Name: Enabled=...[; Status=...]" line for one item.
        label = entry.getString("Name")
        is_enabled = entry.getBoolean("Enabled")
        parts = [f"Enabled={is_enabled}"]
        if is_enabled:
            monitor_value = iris.getString(
                "^IRIS.Temp.EnsHostMonitor", current_ns, label, "%Status"
            )
            parts.append(f"Status={monitor_value}")
        return f"{label}: " + "; ".join(parts)

    collection = production.getObject("Items")
    count = collection.invokeInteger("Count")
    # The collection is addressed with 1-based positions via GetAt.
    return [
        describe(collection.invokeObject("GetAt", position))
        for position in range(1, count + 1)
    ]
def init(server, logger):
    """Register the Interoperability Production tools on ``server``.

    Every tool is an async function decorated with ``server.tool(...)``.
    The tools fetch their IRIS connection ("iris") or DB-API connection
    ("db") from the request's lifespan context and report progress through
    ``logger``.
    """
@server.tool(description="Create an Interoperability Production")
async def interoperability_production_create(name: str, ctx: Context) -> bool:
    """Create, save and compile a new Interoperability Production class.

    Args:
        name: Package-qualified production class name, e.g. ``Pkg.Sub.Prod``.
        ctx: Request context; the IRIS connection is read from its lifespan
            context under the key ``"iris"``.

    Returns:
        True once the production has been saved and compiled.

    Raises:
        ValueError: If ``name`` is not package-qualified, or a class with
            that name already exists. Error statuses returned by IRIS are
            surfaced by ``raise_on_error``.
    """
    # A plain '"." in name' test would accept "Pkg.", ".Prod" or "Pkg..Prod".
    # Require at least two dot-separated segments and no empty segment.
    segments = name.split(".")
    if len(segments) < 2 or not all(segments):
        raise ValueError(
            "Production name must be in format packagenamespace.productionname, where packagenamespace can have multiple parts separated by dots"
        )
    iris = ctx.request_context.lifespan_context["iris"]
    with transaction(iris):
        existing = iris.classMethodObject(
            "%Dictionary.ClassDefinition", "%OpenId", name
        )
        if existing:
            raise ValueError(f"Class {name} already exists")
        logger.info(f"Creating Interoperability Production: {name}")
        prod = iris.classMethodObject("Ens.Config.Production", "%New", name)
        # Each call returns a status value; raise_on_error turns failures
        # into exceptions.
        raise_on_error(iris, prod.invokeString("SaveToClass"))
        raise_on_error(iris, prod.invokeString("%Save"))
        raise_on_error(
            iris, iris.classMethodString("%SYSTEM.OBJ", "Compile", name, "ck-d")
        )
        return True
@server.tool(description="Status of an Interoperability Production")
async def interoperability_production_status(
    ctx: Context,
    name: str = None,
    full_status: bool = False,
) -> str:
    """Report the state of a production and whether it needs an update.

    Args:
        ctx: Request context; the IRIS connection is read from its lifespan
            context under the key ``"iris"``.
        name: Optional production name, used to pre-fill the by-reference
            name argument of ``Ens.Director.GetProductionStatus``.
        full_status: When true and the production is running, include one
            line per production item.

    Returns:
        A text summary: the production name and state (or the per-item
        listing), followed by the update reason when
        ``Ens.Director.ProductionNeedsUpdate`` returns true.

    Raises:
        ValueError: If the name reference is empty after the status call.
    """
    # The conditional must be parenthesised: a conditional expression binds
    # looser than "+", so without the parentheses an empty string is logged
    # whenever no name is supplied.
    logger.info(
        "Interoperability Production Status" + (f": {name}" if name else "")
    )
    iris = ctx.request_context.lifespan_context["iris"]
    refname = IRISReference(iris)
    refname.setValue(name)
    refstatus = IRISReference(iris)
    raise_on_error(
        iris,
        iris.classMethodString(
            "Ens.Director", "GetProductionStatus", refname, refstatus
        ),
    )
    if not refname.getValue():
        raise ValueError("No running production found")
    name = refname.getValue()
    status = ProductionStatus(int(refstatus.getValue()))
    reason = IRISReference(iris)
    needsupdate = iris.classMethodBoolean(
        "Ens.Director", "ProductionNeedsUpdate", reason
    )
    reason_update = (
        f"Production needs update: {reason.getValue()}" if needsupdate else ""
    )
    if status == ProductionStatus.Running and full_status:
        # This branch is only reached when the production is running.
        items_status = production_items_status(iris, True, name)
        # Join outside the f-string: same-type quotes and backslashes inside
        # a replacement field are only accepted from Python 3.12 on.
        items_text = "\n".join(items_status)
        return f"Production {name} is running with items: \n{items_text}\n{reason_update}"
    return f"Production {name} with status: {status.name}\n{reason_update}"
@server.tool(description="Start an Interoperability Production")
async def interoperability_production_start(
    ctx: Context,
    name: str = None,
) -> str:
    """Start a production and verify that it reports the Running state.

    Args:
        ctx: Request context; the IRIS connection is read from its lifespan
            context under the key ``"iris"``.
        name: Optional production name. When omitted,
            ``Ens.Director.StartProduction`` is called without arguments and
            the name is taken from the follow-up status call.

    Returns:
        ``"Started production"`` on success.

    Raises:
        ValueError: If the follow-up ``GetProductionStatus`` call does not
            return ``"1"`` or the reported state is not Running. Error
            statuses from ``StartProduction`` are surfaced by
            ``raise_on_error``.
    """
    # Parenthesise the conditional; otherwise only "." is logged when no
    # name is given, because "+" binds tighter than "if ... else".
    logger.info(
        "Starting Interoperability Production" + (f": {name}" if name else ".")
    )
    iris = ctx.request_context.lifespan_context["iris"]
    raise_on_error(
        iris,
        iris.classMethodString(
            "Ens.Director", "StartProduction", *([name] if name else [])
        ),
    )
    refname = IRISReference(iris)
    if name:
        refname.setValue(name)
    refstatus = IRISReference(iris)
    status = iris.classMethodString(
        "Ens.Director", "GetProductionStatus", refname, refstatus
    )
    if not name:
        name = refname.getValue()
    # Short-circuit: the state reference is only parsed when the status call
    # itself returned "1".
    if (
        status != "1"
        or ProductionStatus(int(refstatus.getValue())) != ProductionStatus.Running
    ):
        raise ValueError(f"Production {name} not started.")
    return "Started production"
@server.tool(description="Stop an Interoperability Production")
async def interoperability_production_stop(
    ctx: Context,
    timeout: int = None,
    force: bool = False,
) -> str:
    """Stop the production via ``Ens.Director.StopProduction``.

    Args:
        ctx: Request context; the IRIS connection is read from its lifespan
            context under the key ``"iris"``.
        timeout: Timeout forwarded to ``StopProduction``; 10 is used when
            omitted. An explicit ``0`` is forwarded as ``0``.
        force: Force flag forwarded to ``StopProduction``.

    Returns:
        ``"Stopped production"`` on success. Error statuses are surfaced by
        ``raise_on_error``.
    """
    logger.info("Stopping Interoperability Production.")
    iris = ctx.request_context.lifespan_context["iris"]
    # Use an explicit None check: "timeout or 10" would silently replace a
    # caller-supplied 0 with 10.
    effective_timeout = 10 if timeout is None else timeout
    raise_on_error(
        iris,
        iris.classMethodString(
            "Ens.Director", "StopProduction", effective_timeout, force
        ),
    )
    return "Stopped production"
@server.tool(description="Recover an Interoperability Production")
async def interoperability_production_recover(
    ctx: Context,
) -> str:
    """Call ``Ens.Director.RecoverProduction`` and report success.

    The IRIS connection is read from the lifespan context under ``"iris"``.
    The returned status is checked by ``raise_on_error``; on success the
    literal ``"Recovered"`` is returned.
    """
    logger.info("Recovering Interoperability Production")
    connection = ctx.request_context.lifespan_context["iris"]
    outcome = connection.classMethodString("Ens.Director", "RecoverProduction")
    raise_on_error(connection, outcome)
    return "Recovered"
@server.tool(description="Check if an Interoperability Production needs update")
async def interoperability_production_needsupdate(
    ctx: Context,
) -> str:
    """Check ``Ens.Director.ProductionNeedsUpdate``.

    Returns:
        ``"Production does not need update"`` when the check is false.

    Raises:
        ValueError: When the check is true; the message carries the reason
            written to the by-reference argument.
    """
    logger.info("Checking if Interoperability Production needs update")
    connection = ctx.request_context.lifespan_context["iris"]
    why = IRISReference(connection)
    if connection.classMethodBoolean("Ens.Director", "ProductionNeedsUpdate", why):
        raise ValueError(f"Production needs update: {why.getValue()}")
    return "Production does not need update"
@server.tool(description="Update Interoperability Production")
async def interoperability_production_update(
    ctx: Context,
    timeout: int = None,
    force: bool = False,
) -> str:
    """Apply pending changes via ``Ens.Director.UpdateProduction``.

    Args:
        ctx: Request context; the IRIS connection is read from its lifespan
            context under the key ``"iris"``.
        timeout: Timeout forwarded to ``UpdateProduction``; 10 is used when
            omitted, the same fallback the stop tool applies.
        force: Force flag forwarded to ``UpdateProduction``.

    Returns:
        ``"Production updated"`` on success. Error statuses are surfaced by
        ``raise_on_error``.
    """
    # Logged for parity with the other Ens.Director tools in this module.
    logger.info("Updating Interoperability Production")
    iris = ctx.request_context.lifespan_context["iris"]
    # NOTE(review): an explicit None is assumed to reach IRIS as an empty
    # value rather than selecting the method's own default - confirm. Using
    # a concrete number avoids depending on that.
    effective_timeout = 10 if timeout is None else timeout
    raise_on_error(
        iris,
        iris.classMethodString(
            "Ens.Director", "UpdateProduction", effective_timeout, force
        ),
    )
    return "Production updated"
@server.tool(description="Get Interoperability Production logs")
async def interoperability_production_logs(
    ctx: Context,
    item_name: str = None,
    limit: int = 10,
    log_type_info: bool = False,
    log_type_alert: bool = False,
    log_type_error: bool = True,
    log_type_warning: bool = True,
) -> str:
    """Return the newest ``Ens_Util.Log`` rows as text, one row per line.

    Args:
        ctx: Request context; the DB-API connection is read from its
            lifespan context under the key ``"db"``.
        item_name: When given, only rows whose ``ConfigName`` equals it.
        limit: Bound for the ``top ?`` clause.
        log_type_info, log_type_alert, log_type_error, log_type_warning:
            Each true flag adds the matching ``LogType`` value to the
            ``type in (...)`` filter. With no flag set, no type filter is
            applied.

    Returns:
        Lines of ``TimeLogged Type ConfigName Text`` separated by newlines,
        ordered by descending id.
    """
    # Same order as the flags are declared: Info, Alert, Error, Warning.
    selected = [
        kind.value
        for wanted, kind in (
            (log_type_info, LogType.Info),
            (log_type_alert, LogType.Alert),
            (log_type_error, LogType.Error),
            (log_type_warning, LogType.Warning),
        )
        if wanted
    ]
    db = ctx.request_context.lifespan_context["db"]

    name_filter = "ConfigName = ?" if item_name else "1=1"
    type_filter = (
        f"and type in ({', '.join(['?'] * len(selected))})" if selected else ""
    )
    # Only placeholders are interpolated; all values are bound as parameters.
    sql = (
        "\n"
        "select top ? TimeLogged , %External(Type) Type, ConfigName, Text\n"
        "from Ens_Util.Log\n"
        "where\n"
        f"{name_filter}\n"
        f"{type_filter}\n"
        "order by id desc\n"
    )

    # Parameter order mirrors placeholder order: top, ConfigName, types.
    params = [limit]
    if item_name:
        params.append(item_name)
    params.extend(selected)

    with db.cursor() as cur:
        cur.execute(sql, params)
        lines = [f"{row[0]} {row[1]} {row[2]} {row[3]}" for row in cur.fetchall()]
    return "\n".join(lines)
@server.tool(description="Get Interoperability Production queues")
async def interoperability_production_queues(
    ctx: Context,
) -> str:
    """Return the rows of ``Ens.Queue_Enumerate()`` as text.

    The DB-API connection is read from the lifespan context under ``"db"``.
    Each row becomes one line with its cells joined by ``", "``; lines are
    separated by newlines.
    """
    db = ctx.request_context.lifespan_context["db"]
    with db.cursor() as cursor:
        cursor.execute("select * from Ens.Queue_Enumerate()")
        rendered = [
            ", ".join(f"{cell}" for cell in record) for record in cursor.fetchall()
        ]
    return "\n".join(rendered)