Skip to main content
Glama
server.py74.1 kB
"""Main MCP Server for Qlik Sense APIs.""" import asyncio import json import sys from typing import Any, Dict, List, Optional from mcp.server import Server from mcp.server.models import InitializationOptions from mcp.server.stdio import stdio_server from mcp.types import ServerCapabilities, Tool from mcp.types import CallToolResult, TextContent from .config import QlikSenseConfig from .repository_api import QlikRepositoryAPI from .engine_api import QlikEngineAPI from .utils import generate_xrfkey import logging import os import requests from urllib3.exceptions import InsecureRequestWarning from dotenv import load_dotenv requests.packages.urllib3.disable_warnings(InsecureRequestWarning) # Initialize logging configuration early load_dotenv() LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() # Configure root logger to stderr (stdout reserved for MCP protocol) _logging_level = getattr(logging, LOG_LEVEL, logging.INFO) if not logging.getLogger().handlers: handler = logging.StreamHandler(stream=sys.stderr) formatter = logging.Formatter( fmt="%(asctime)s %(levelname)s [%(name)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) handler.setFormatter(formatter) logging.getLogger().addHandler(handler) logging.getLogger().setLevel(_logging_level) logger = logging.getLogger(__name__) class QlikSenseMCPServer: """MCP Server for Qlik Sense Enterprise APIs.""" def __init__(self): try: self.config = QlikSenseConfig.from_env() self.config_valid = self._validate_config() except Exception as e: self.config = None self.config_valid = False # Initialize API clients safely self.repository_api = None self.engine_api = None if self.config_valid: try: self.repository_api = QlikRepositoryAPI(self.config) self.engine_api = QlikEngineAPI(self.config) except Exception as e: # API clients will be None, tools will return errors logging.getLogger(__name__).warning("Failed to initialize APIs: %s", e) self.server = Server("qlik-sense-mcp-server") self._setup_handlers() def _validate_config(self) -> bool: """Validate that required configuration is present.""" if not self.config: return False return bool( self.config.server_url and self.config.user_directory and self.config.user_id ) def _get_qlik_ticket(self) -> Optional[str]: """Get Qlik Sense ticket for user authentication.""" ticket_url = f"{self.config.server_url}:{self.config.proxy_port}/qps/ticket" ticket_data = { "UserDirectory": self.config.user_directory, "UserId": self.config.user_id, "Attributes": [] } xrfkey = generate_xrfkey() headers = { "Content-Type": "application/json", "X-Qlik-Xrfkey": xrfkey } params = {"xrfkey": xrfkey} cert = (self.config.client_cert_path, self.config.client_key_path) if self.config.client_cert_path and self.config.client_key_path else None verify_ssl = self.config.ca_cert_path if self.config.ca_cert_path else False try: response = requests.post( ticket_url, json=ticket_data, headers=headers, params=params, cert=cert, verify=verify_ssl, timeout=30 ) response.raise_for_status() ticket_response = response.json() ticket = ticket_response.get("Ticket") if not ticket: raise ValueError("Ticket not found in response") return ticket except requests.exceptions.RequestException as e: logger.error(f"Failed to get ticket: {e}") return None def _get_app_metadata_via_proxy(self, app_id: str, ticket: str) -> Dict[str, Any]: """Get application metadata via Qlik Sense Proxy API.""" server_url = self.config.server_url if self.config.http_port: server_url = f"{server_url}:{self.config.http_port}" metadata_url = f"{server_url}/api/v1/apps/{app_id}/data/metadata?qlikTicket={ticket}" xrfkey = generate_xrfkey() headers = { "X-Qlik-Xrfkey": xrfkey } params = {"xrfkey": xrfkey} cert = (self.config.client_cert_path, self.config.client_key_path) if self.config.client_cert_path and self.config.client_key_path else None verify_ssl = self.config.verify_ssl try: response = requests.get( metadata_url, headers=headers, params=params, cert=cert, verify=verify_ssl, timeout=30 ) response.raise_for_status() metadata = response.json() return self._filter_metadata(metadata) except requests.exceptions.RequestException as e: logger.error(f"Failed to get app metadata: {e}") return {"error": str(e)} def _filter_metadata(self, metadata: Dict[str, Any]) -> Dict[str, Any]: """Filter metadata to remove system fields and hidden items.""" # Fields to remove from output fields_to_remove = { 'is_system', 'is_hidden', 'is_semantic', 'distinct_only', 'is_locked', 'always_one_selected', 'is_numeric', 'hash', 'tags', 'has_section_access', 'tables_profiling_data', 'is_direct_query_mode', 'usage', 'reload_meta', 'static_byte_size', 'byte_size', 'no_of_key_fields' } # Qlik Sense reserved fields to remove qlik_reserved_fields = {'$Field', '$Table', '$Rows', '$Fields', '$FieldNo', '$Info'} def filter_object(obj): """Recursively filter object.""" if isinstance(obj, dict): filtered = {} for key, value in obj.items(): # Skip fields to remove if key in fields_to_remove: continue # Skip fields with is_system: true or is_hidden: true if isinstance(value, dict): if value.get('is_system') or value.get('is_hidden'): continue # Replace cardinal with unique_count if key == 'cardinal': filtered['unique_count'] = value continue # Recursively filter nested objects filtered[key] = filter_object(value) return filtered elif isinstance(obj, list): # Filter field arrays, removing reserved Qlik fields if obj and isinstance(obj[0], dict) and 'name' in obj[0]: # This is a fields array return [filter_object(item) for item in obj if item.get('name') not in qlik_reserved_fields] else: return [filter_object(item) for item in obj] else: return obj # Keep only fields and tables filtered = filter_object(metadata) result = {} if 'fields' in filtered: result['fields'] = filtered['fields'] if 'tables' in filtered: result['tables'] = filtered['tables'] return result def _setup_handlers(self): """Setup MCP server handlers.""" @self.server.list_tools() async def handle_list_tools(): """ List all available MCP tools for Qlik Sense operations. Returns tool definitions with schemas for Repository API and Engine API operations including applications, analytics tools, and data export. """ tools_list = [ Tool( name="get_apps", description="Get list of Qlik Sense applications with essential fields and filters (name, stream, published) and pagination.", inputSchema={ "type": "object", "properties": { "limit": { "type": "integer", "description": "Maximum number of apps to return (default: 25, max: 50)", "default": 25 }, "offset": { "type": "integer", "description": "Number of apps to skip for pagination (default: 0)", "default": 0 }, "name": { "type": "string", "description": "Wildcard case-insensitive search in application name" }, "stream": { "type": "string", "description": "Wildcard case-insensitive search in stream name" }, "published": { "type": "string", "description": "Filter by published status (true/false or 1/0). Default: true", "default": "true" } } } ), Tool( name="get_app_details", description="Get compact application info with filters by guid or name (case-insensitive). Returns metainfo, tables/fields list, master items, sheets and objects with used fields.", inputSchema={ "type": "object", "properties": { "app_id": {"type": "string", "description": "Application GUID (preferred if known)"}, "name": {"type": "string", "description": "Case-insensitive fuzzy search by app name"} }, "oneOf": [ {"required": ["app_id"]}, {"required": ["name"]} ] } ), Tool(name="get_app_script", description="Get load script from app", inputSchema={"type": "object", "properties": {"app_id": {"type": "string", "description": "Application ID"}}, "required": ["app_id"]}), Tool(name="get_app_field_statistics", description="Get comprehensive statistics for a field", inputSchema={"type": "object", "properties": {"app_id": {"type": "string", "description": "Application ID"}, "field_name": {"type": "string", "description": "Field name"}}, "required": ["app_id", "field_name"]}), Tool(name="engine_create_hypercube", description="Create hypercube for data analysis with custom sorting options. IMPORTANT: To get top-N records, use qSortByExpression: 1 in dimension sorting with qExpression containing the measure formula (e.g., 'Count(field)' for ascending, '-Count(field)' for descending). Measure sorting is ignored by Qlik Engine.", inputSchema={ "type": "object", "properties": { "app_id": {"type": "string", "description": "Application ID"}, "dimensions": { "type": "array", "items": { "type": "object", "properties": { "field": {"type": "string", "description": "Field name for dimension"}, "label": {"type": "string", "description": "Optional label for dimension"}, "sort_by": { "type": "object", "properties": { "qSortByNumeric": {"type": "integer", "description": "Sort by numeric value (-1 desc, 0 none, 1 asc)", "default": 0}, "qSortByAscii": {"type": "integer", "description": "Sort by ASCII value (-1 desc, 0 none, 1 asc)", "default": 1}, "qSortByExpression": {"type": "integer", "description": "Use expression for sorting (0/1). For top-N results, set to 1 and use qExpression with measure formula", "default": 0}, "qExpression": {"type": "string", "description": "Expression for custom sorting. For top-N: 'Count(field)' for ascending, '-Count(field)' for descending", "default": ""} }, "additionalProperties": False } }, "additionalProperties": False }, "description": "List of dimension definitions with optional sorting" }, "measures": { "type": "array", "items": { "type": "object", "properties": { "expression": {"type": "string", "description": "Measure expression"}, "label": {"type": "string", "description": "Optional label for measure"}, "sort_by": { "type": "object", "properties": { "qSortByNumeric": {"type": "integer", "description": "Sort by numeric value (-1 desc, 0 none, 1 asc). NOTE: Measure sorting is ignored by Qlik Engine - use dimension sorting with qSortByExpression for top-N results", "default": -1} }, "additionalProperties": False } }, "additionalProperties": False }, "description": "List of measure definitions with optional sorting" }, "max_rows": {"type": "integer", "description": "Maximum rows to return", "default": 1000} }, "required": ["app_id"] }) , Tool( name="get_app_field", description="Return values of a single field from app with pagination and wildcard search (supports * and %).", inputSchema={ "type": "object", "properties": { "app_id": {"type": "string", "description": "Application GUID"}, "field_name": {"type": "string", "description": "Field name"}, "limit": {"type": "integer", "description": "Max values to return (default: 10, max: 100)", "default": 10}, "offset": {"type": "integer", "description": "Offset for pagination (default: 0)", "default": 0}, "search_string": {"type": "string", "description": "Wildcard text search mask (* and % supported), case-insensitive by default"}, "search_number": {"type": "string", "description": "Wildcard numeric search mask (* and % supported)"}, "case_sensitive": {"type": "boolean", "description": "Case sensitive matching for search_string", "default": False} }, "required": ["app_id", "field_name"], } ), Tool( name="get_app_variables", description="Return variables split by source (script/ui) with pagination and wildcard search.", inputSchema={ "type": "object", "properties": { "app_id": {"type": "string", "description": "Application GUID"}, "limit": {"type": "integer", "description": "Max variables to return (default: 10, max: 100)", "default": 10}, "offset": {"type": "integer", "description": "Offset for pagination (default: 0)", "default": 0}, "created_in_script": {"type": "string", "description": "Return only variables created in script (true/false). If omitted, return both"}, "search_string": {"type": "string", "description": "Wildcard search by variable name or text value (* and % supported), case-insensitive by default"}, "search_number": {"type": "string", "description": "Wildcard search among numeric variable values (* and % supported)"}, "case_sensitive": {"type": "boolean", "description": "Case sensitive matching for search_string", "default": False} }, "required": ["app_id"], } ), Tool( name="get_app_sheets", description="Get list of sheets from application with title and description.", inputSchema={ "type": "object", "properties": { "app_id": {"type": "string", "description": "Application GUID"} }, "required": ["app_id"] } ), Tool( name="get_app_sheet_objects", description="Get list of objects from specific sheet with object ID, type and description.", inputSchema={ "type": "object", "properties": { "app_id": {"type": "string", "description": "Application GUID"}, "sheet_id": {"type": "string", "description": "Sheet GUID"} }, "required": ["app_id", "sheet_id"] } ), Tool( name="get_app_object", description="Get specific object layout by calling GetObject and GetLayout sequentially via WebSocket.", inputSchema={ "type": "object", "properties": { "app_id": {"type": "string", "description": "Application GUID"}, "object_id": {"type": "string", "description": "Object ID to retrieve"} }, "required": ["app_id", "object_id"] } ) ] return tools_list @self.server.call_tool() async def handle_call_tool(name: str, arguments: Dict[str, Any]): # Check configuration before processing any tool calls if not self.config_valid: error_msg = { "error": "Qlik Sense configuration missing", "message": "Please set the following environment variables:", "required": [ "QLIK_SERVER_URL - Qlik Sense server URL", "QLIK_USER_DIRECTORY - User directory", "QLIK_USER_ID - User ID", "QLIK_CLIENT_CERT_PATH - Path to client certificate", "QLIK_CLIENT_KEY_PATH - Path to client key", "QLIK_CA_CERT_PATH - Path to CA certificate" ], "example": "uvx --with-env QLIK_SERVER_URL=https://qlik.company.com qlik-sense-mcp-server" } return [TextContent(type="text", text=json.dumps(error_msg, indent=2))] """ Handle MCP tool calls by routing to appropriate API handlers. Args: name: Tool name to execute arguments: Tool-specific parameters Returns: TextContent with JSON response from Qlik Sense APIs """ try: if name == "get_apps": limit = arguments.get("limit", 25) offset = arguments.get("offset", 0) name_filter = arguments.get("name") stream_filter = arguments.get("stream") published_arg = arguments.get("published", True) if limit is None or limit < 1: limit = 25 if limit > 50: limit = 50 def _to_bool(value: Any, default: bool = True) -> bool: if isinstance(value, bool): return value if isinstance(value, int): return value != 0 if isinstance(value, str): v = value.strip().lower() if v in ("true", "1", "yes", "y"): return True if v in ("false", "0", "no", "n"): return False return default published_bool = _to_bool(published_arg, True) apps_payload = await asyncio.to_thread( self.repository_api.get_comprehensive_apps, limit, offset, name_filter, stream_filter, published_bool, ) return [ TextContent( type="text", text=json.dumps(apps_payload, indent=2, ensure_ascii=False) ) ] elif name == "get_app_details": req_app_id = arguments.get("app_id") req_name = arguments.get("name") def _resolve_app() -> Dict[str, Any]: """Resolve application by ID or name from Repository API.""" try: if req_app_id: app_meta = self.repository_api.get_app_by_id(req_app_id) if isinstance(app_meta, dict) and app_meta.get("id"): return { "app_id": app_meta.get("id"), "name": app_meta.get("name", ""), "description": app_meta.get("description") or "", "stream": (app_meta.get("stream", {}) or {}).get("name", "") if app_meta.get("published") else "", "modified_dttm": app_meta.get("modifiedDate", "") or "", "reload_dttm": app_meta.get("lastReloadTime", "") or "" } return {"error": "App not found by provided app_id"} if req_name: apps_payload = self.repository_api.get_comprehensive_apps(limit=50, offset=0, name=req_name, stream=None, published=None) apps = apps_payload.get("apps", []) if isinstance(apps_payload, dict) else [] if not apps: return {"error": "No apps found by name"} lowered = req_name.lower() exact = [a for a in apps if a.get("name", "").lower() == lowered] selected = exact[0] if exact else apps[0] selected["app_id"] = selected.pop("guid", "") return selected return {"error": "Either app_id or name must be provided"} except Exception as e: return {"error": str(e)} def _get_app_details(): """Get application details with metadata, fields and tables.""" try: # Get app metadata from Repository API resolved = _resolve_app() if "error" in resolved: return resolved app_id = resolved.get("app_id") # Get Qlik ticket for authentication ticket = self._get_qlik_ticket() if not ticket: return {"error": "Failed to obtain Qlik ticket"} # Get fields and tables metadata via Proxy API metadata = self._get_app_metadata_via_proxy(app_id, ticket) if "error" in metadata: return metadata # Build final response result = { "metainfo": { "app_id": app_id, "name": resolved.get("name", ""), "description": resolved.get("description", ""), "stream": resolved.get("stream", ""), "modified_dttm": resolved.get("modified_dttm", ""), "reload_dttm": resolved.get("reload_dttm", "") }, "fields": metadata.get("fields", []), "tables": metadata.get("tables", []) } return result except Exception as e: return {"error": str(e)} details = await asyncio.to_thread(_get_app_details) return [ TextContent(type="text", text=json.dumps(details, indent=2, ensure_ascii=False)) ] elif name == "get_app_script": app_id = arguments["app_id"] def _get_script(): app_handle = -1 try: self.engine_api.connect() app_result = self.engine_api.open_doc_safe(app_id, no_data=True) if "qReturn" not in app_result: raise Exception(f"Failed to open app: invalid response {app_result}") app_handle = app_result["qReturn"].get("qHandle", -1) if app_handle == -1: raise Exception(f"Failed to get app handle: {app_result}") script = self.engine_api.get_script(app_handle) return { "qScript": script, "app_id": app_id, "app_handle": app_handle, "script_length": len(script) if script else 0 } except Exception as e: error_msg = str(e) if "already open" in error_msg.lower(): error_msg = f"App {app_id} is already open in another session. Try again later or use a different session." elif "failed to open app" in error_msg.lower(): error_msg = f"Could not open app {app_id}. Check if app exists and you have access." return { "error": error_msg, "app_id": app_id, "app_handle": app_handle } finally: if app_handle != -1: try: self.engine_api.close_doc(app_handle) except: pass self.engine_api.disconnect() script = await asyncio.to_thread(_get_script) return [ TextContent( type="text", text=json.dumps(script, indent=2, ensure_ascii=False) ) ] elif name == "get_app_field_statistics": app_id = arguments["app_id"] field_name = arguments["field_name"] def _get_field_statistics(): app_handle = -1 debug_info = [] try: debug_info.append(f"Starting field statistics for app_id={app_id}, field_name={field_name}") self.engine_api.connect() debug_info.append("Connected to engine") app_result = self.engine_api.open_doc_safe(app_id, no_data=False) debug_info.append(f"App open result: {app_result}") app_handle = app_result.get("qReturn", {}).get("qHandle", -1) debug_info.append(f"App handle: {app_handle}") if app_handle != -1: result = self.engine_api.get_field_statistics(app_handle, field_name) debug_info.append("Field statistics method completed") if isinstance(result, dict) and "debug_log" not in result: result["server_debug"] = debug_info return result else: raise Exception(f"Failed to open app: {app_result}") except Exception as e: import traceback debug_info.append(f"Exception in server handler: {e}") debug_info.append(f"Traceback: {traceback.format_exc()}") return { "error": str(e), "server_debug": debug_info, "traceback": traceback.format_exc() } finally: debug_info.append("Disconnecting from engine") self.engine_api.disconnect() result = await asyncio.to_thread(_get_field_statistics) return [ TextContent( type="text", text=json.dumps(result, indent=2, ensure_ascii=False) ) ] elif name == "engine_create_hypercube": app_id = arguments["app_id"] dimensions = arguments.get("dimensions", []) measures = arguments.get("measures", []) max_rows = arguments.get("max_rows", 1000) def _create_hypercube(): try: self.engine_api.connect() app_result = self.engine_api.open_doc(app_id, no_data=False) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle != -1: return self.engine_api.create_hypercube(app_handle, dimensions, measures, max_rows) else: raise Exception("Failed to open app") except Exception as e: return {"error": str(e)} finally: self.engine_api.disconnect() result = await asyncio.to_thread(_create_hypercube) return [ TextContent( type="text", text=json.dumps(result, indent=2, ensure_ascii=False) ) ] elif name == "get_app_reload_chain": app_id = arguments["app_id"] def _get_reload_chain(): tasks = self.repository_api.get_reload_tasks_for_app(app_id) chain_info = { "app_id": app_id, "reload_tasks": [], "execution_history": [] } for task in tasks: task_id = task.get("id") if task_id: executions = self.repository_api.get_task_executions(task_id, 10) chain_info["reload_tasks"].append(task) chain_info["execution_history"].extend(executions) return chain_info chain = await asyncio.to_thread(_get_reload_chain) return [ TextContent( type="text", text=json.dumps(chain, indent=2, ensure_ascii=False) ) ] elif name == "get_app_objects_detailed": app_id = arguments["app_id"] object_type = arguments.get("object_type") objects = await asyncio.to_thread(self.repository_api.get_app_objects, app_id, object_type) return [ TextContent( type="text", text=json.dumps(objects, indent=2, ensure_ascii=False) ) ] elif name == "get_app_field": app_id = arguments["app_id"] field_name = arguments["field_name"] limit = arguments.get("limit", 10) offset = arguments.get("offset", 0) search_string = arguments.get("search_string") search_number = arguments.get("search_number") case_sensitive = arguments.get("case_sensitive", False) if limit is None or limit < 1: limit = 10 if limit > 100: limit = 100 if offset is None or offset < 0: offset = 0 def _wildcard_to_regex(pattern: str, case_sensitive_flag: bool) -> Any: import re escaped = re.escape(pattern).replace("\\*", ".*").replace("%", ".*") regex = f"^{escaped}$" return re.compile(regex, 0 if case_sensitive_flag else re.IGNORECASE) def _get_values(): try: self.engine_api.connect() app_result = self.engine_api.open_doc_safe(app_id, no_data=False) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle == -1: return {"error": "Failed to open app"} fetch_size = max(limit + offset, 500) if fetch_size > 5000: fetch_size = 5000 field_data = self.engine_api.get_field_values(app_handle, field_name, fetch_size, include_frequency=False) values = [v.get("value", "") for v in field_data.get("values", [])] if search_string: rx = _wildcard_to_regex(search_string, case_sensitive) values = [val for val in values if isinstance(val, str) and rx.match(val)] if search_number: rxn = _wildcard_to_regex(search_number, case_sensitive) filtered = [] for idx, vobj in enumerate(field_data.get("values", [])): cell_text = vobj.get("value", "") qnum = vobj.get("numeric_value", None) if qnum is not None: if rxn.match(str(qnum)) or rxn.match(str(cell_text)): filtered.append(cell_text) values = filtered sliced = values[offset:offset + limit] return {"field_values": sliced} except Exception as e: return {"error": str(e)} finally: self.engine_api.disconnect() result = await asyncio.to_thread(_get_values) return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))] elif name == "get_app_variables": app_id = arguments["app_id"] limit = arguments.get("limit", 10) offset = arguments.get("offset", 0) created_in_script_arg = arguments.get("created_in_script", None) search_string = arguments.get("search_string") search_number = arguments.get("search_number") case_sensitive = arguments.get("case_sensitive", False) if limit is None or limit < 1: limit = 10 if limit > 100: limit = 100 if offset is None or offset < 0: offset = 0 def _to_bool(value: Any, default: Optional[bool] = None) -> Optional[bool]: if value is None: return default if isinstance(value, bool): return value if isinstance(value, int): return value != 0 if isinstance(value, str): v = value.strip().lower() if v in ("true", "1", "yes", "y"): return True if v in ("false", "0", "no", "n"): return False return default created_in_script = _to_bool(created_in_script_arg, None) def _wildcard_to_regex(pattern: str, case_sensitive_flag: bool): import re escaped = re.escape(pattern).replace("\\*", ".*").replace("%", ".*") regex = f"^{escaped}$" return re.compile(regex, 0 if case_sensitive_flag else re.IGNORECASE) def _get_variables(): try: self.engine_api.connect() app_result = self.engine_api.open_doc_safe(app_id, no_data=False) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle == -1: return {"error": "Failed to open app"} # Use the alternative method for getting variables var_list = self.engine_api._get_user_variables(app_handle) or [] prepared = [] for v in var_list: name = v.get("name", "") text_val = v.get("text_value", "") is_script = v.get("is_script_created", False) prepared.append({ "name": name, "text_value": text_val if text_val is not None else "", "is_script": is_script }) if created_in_script is True: prepared = [x for x in prepared if x["is_script"]] elif created_in_script is False: prepared = [x for x in prepared if not x["is_script"]] else: # По умолчанию показываем только переменные из UI prepared = [x for x in prepared if not x["is_script"]] if search_string: rx = _wildcard_to_regex(search_string, case_sensitive) prepared = [x for x in prepared if rx.match(x["name"]) or rx.match(x.get("text_value", ""))] from_script = [x for x in prepared if x["is_script"]] from_ui = [x for x in prepared if not x["is_script"]] def _slice_and_map(items): sliced = items[offset:offset + limit] result_map = {} for it in sliced: val = it.get("text_value", "") result_map[it["name"]] = val return result_map res_script = _slice_and_map(from_script) res_ui = _slice_and_map(from_ui) return { "variables_from_script": res_script if res_script else "", "variables_from_ui": res_ui if res_ui else "" } except Exception as e: return {"error": str(e)} finally: self.engine_api.disconnect() result = await asyncio.to_thread(_get_variables) return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))] elif name == "get_app_sheets": app_id = arguments["app_id"] def _get_app_sheets(): try: self.engine_api.connect() app_result = self.engine_api.open_doc_safe(app_id, no_data=True) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle == -1: return {"error": "Failed to open app"} sheets = self.engine_api.get_sheets(app_handle) sheets_list = [] for sheet in sheets: sheet_info = sheet.get("qMeta", {}) sheet_data = sheet.get("qData", {}) sheets_list.append({ "sheet_id": sheet.get("qInfo", {}).get("qId", ""), "title": sheet_info.get("title", ""), "description": sheet_info.get("description", "") }) return { "app_id": app_id, "total_sheets": len(sheets_list), "sheets": sheets_list } except Exception as e: return {"error": str(e)} finally: self.engine_api.disconnect() result = await asyncio.to_thread(_get_app_sheets) return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))] elif name == "get_app_sheet_objects": app_id = arguments["app_id"] sheet_id = arguments["sheet_id"] def _get_sheet_objects(): try: self.engine_api.connect() app_result = self.engine_api.open_doc_safe(app_id, no_data=True) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle == -1: return {"error": "Failed to open app"} # Get detailed objects from the sheet objects = self.engine_api._get_sheet_objects_detailed(app_handle, sheet_id) or [] # Format objects according to requirements: id объекта, тип объекта, описание объекта formatted_objects = [] for obj in objects: if isinstance(obj, dict): obj_info = { "object_id": obj.get("object_id", ""), "object_type": obj.get("object_type", ""), "object_description": obj.get("object_title", "") } formatted_objects.append(obj_info) return { "app_id": app_id, "sheet_id": sheet_id, "total_objects": len(formatted_objects), "objects": formatted_objects } except Exception as e: return {"error": str(e), "details": f"Error getting objects for sheet {sheet_id} in app {app_id}"} finally: self.engine_api.disconnect() result = await asyncio.to_thread(_get_sheet_objects) return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))] elif name == "engine_get_field_info": app_id = arguments["app_id"] field_name = arguments["field_name"] def _get_field_info(): self.engine_api.connect(app_id) try: app_result = self.engine_api.open_doc(app_id) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle != -1: field_desc = self.engine_api.get_field_description(app_handle, field_name) field_values = self.engine_api.get_field_values(app_handle, field_name, 50) return { "field_description": field_desc, "sample_values": field_values } else: raise Exception("Failed to open app") finally: self.engine_api.disconnect() field_info = await asyncio.to_thread(_get_field_info) return [ TextContent( type="text", text=json.dumps(field_info, indent=2, ensure_ascii=False) ) ] elif name == "engine_extract_data": app_id = arguments["app_id"] dimensions = arguments.get("dimensions", []) measures = arguments.get("measures", []) max_rows = arguments.get("max_rows", 1000) def _extract_data(): self.engine_api.connect(app_id) try: app_result = self.engine_api.open_doc(app_id) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle != -1: cube_result = self.engine_api.create_hypercube(app_handle, dimensions, measures, max_rows) cube_handle = cube_result.get("qReturn", {}).get("qHandle", -1) if cube_handle != -1: data = self.engine_api.get_hypercube_data(cube_handle, 0, max_rows) return { "dimensions": dimensions, "measures": measures, "data": data } else: raise Exception("Failed to create hypercube") else: raise Exception("Failed to open app") finally: self.engine_api.disconnect() data = await asyncio.to_thread(_extract_data) return [ TextContent( type="text", text=json.dumps(data, indent=2, ensure_ascii=False) ) ] elif name == "engine_get_visualization_data": app_id = arguments["app_id"] object_id = arguments["object_id"] def _get_viz_data(): self.engine_api.connect(app_id) try: app_result = self.engine_api.open_doc(app_id) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle != -1: return self.engine_api.get_object_data(app_handle, object_id) else: raise Exception("Failed to open app") finally: self.engine_api.disconnect() viz_data = await asyncio.to_thread(_get_viz_data) return [ TextContent( type="text", text=json.dumps(viz_data, indent=2, ensure_ascii=False) ) ] elif name == "engine_search_and_analyze": app_id = arguments["app_id"] search_terms = arguments["search_terms"] def _search_analyze(): self.engine_api.connect(app_id) try: app_result = self.engine_api.open_doc(app_id) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle != -1: search_results = self.engine_api.search_objects(app_handle, search_terms) fields = self.engine_api.get_fields(app_handle) matching_fields = [] for field in fields: field_name = field.get("qName", "").lower() for term in search_terms: if term.lower() in field_name: matching_fields.append(field) break return { "search_terms": search_terms, "object_matches": search_results, "field_matches": matching_fields } else: raise Exception("Failed to open app") finally: self.engine_api.disconnect() results = await asyncio.to_thread(_search_analyze) return [ TextContent( type="text", text=json.dumps(results, indent=2, ensure_ascii=False) ) ] elif name == "engine_get_master_items": app_id = arguments["app_id"] def _get_master_items(): self.engine_api.connect(app_id) try: app_result = self.engine_api.open_doc(app_id) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle != -1: dimensions = self.engine_api.get_dimensions(app_handle) measures = self.engine_api.get_measures(app_handle) variables = self.engine_api.get_variables(app_handle) return { "master_dimensions": dimensions, "master_measures": measures, "variables": variables } else: raise Exception("Failed to open app") finally: self.engine_api.disconnect() master_items = await asyncio.to_thread(_get_master_items) return [ TextContent( type="text", text=json.dumps(master_items, indent=2, ensure_ascii=False) ) ] elif name == "engine_calculate_expression": app_id = arguments["app_id"] expression = arguments["expression"] dimensions = arguments.get("dimensions", []) def _calculate_expression(): self.engine_api.connect(app_id) try: app_result = self.engine_api.open_doc(app_id) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle != -1: return self.engine_api.calculate_expression(app_handle, expression, dimensions) else: raise Exception("Failed to open app") finally: self.engine_api.disconnect() result = await asyncio.to_thread(_calculate_expression) return [ TextContent( type="text", text=json.dumps(result, indent=2, ensure_ascii=False) ) ] elif name == "engine_get_associations": app_id = arguments["app_id"] def _get_associations(): self.engine_api.connect(app_id) try: app_result = self.engine_api.open_doc(app_id) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle != -1: associations = self.engine_api.get_associations(app_handle) data_model = self.engine_api.get_data_model(app_handle) return { "associations": associations, "data_model": data_model } else: raise Exception("Failed to open app") finally: self.engine_api.disconnect() associations = await asyncio.to_thread(_get_associations) return [ TextContent( type="text", text=json.dumps(associations, indent=2, ensure_ascii=False) ) ] elif name == "engine_smart_search": app_id = arguments["app_id"] search_terms = arguments["search_terms"] def _smart_search(): self.engine_api.connect(app_id) try: app_result = self.engine_api.open_doc(app_id) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle != -1: suggestions = self.engine_api.search_suggest(app_handle, search_terms) return { "search_terms": search_terms, "suggestions": suggestions } else: raise Exception("Failed to open app") finally: self.engine_api.disconnect() suggestions = await asyncio.to_thread(_smart_search) return [ TextContent( type="text", text=json.dumps(suggestions, indent=2, ensure_ascii=False) ) ] elif name == "engine_create_pivot_analysis": app_id = arguments["app_id"] dimensions = arguments.get("dimensions", []) measures = arguments.get("measures", []) max_rows = arguments.get("max_rows", 1000) def _create_pivot(): self.engine_api.connect(app_id) try: app_result = self.engine_api.open_doc(app_id) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle != -1: pivot_result = self.engine_api.get_pivot_table_data(app_handle, dimensions, measures, max_rows) pivot_handle = pivot_result.get("qReturn", {}).get("qHandle", -1) if pivot_handle != -1: layout = self.engine_api.send_request("GetLayout", handle=pivot_handle) return { "dimensions": dimensions, "measures": measures, "pivot_data": layout } else: raise Exception("Failed to create pivot table") else: raise Exception("Failed to open app") finally: self.engine_api.disconnect() pivot_data = await asyncio.to_thread(_create_pivot) return [ TextContent( type="text", text=json.dumps(pivot_data, indent=2, ensure_ascii=False) ) ] elif name == "engine_get_visualization_data": app_id = arguments["app_id"] object_id = arguments["object_id"] def _get_visualization_data(): try: self.engine_api.connect() app_result = self.engine_api.open_doc(app_id, no_data=False) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle != -1: return self.engine_api.get_visualization_data(app_handle, object_id) else: raise Exception("Failed to open app") except Exception as e: return {"error": str(e)} finally: self.engine_api.disconnect() result = await asyncio.to_thread(_get_visualization_data) return [ TextContent( type="text", text=json.dumps(result, indent=2, ensure_ascii=False) ) ] elif name == "engine_create_simple_table": app_id = arguments["app_id"] dimensions = arguments["dimensions"] measures = arguments.get("measures", []) max_rows = arguments.get("max_rows", 1000) def _create_simple_table(): try: self.engine_api.connect() app_result = self.engine_api.open_doc(app_id, no_data=False) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle != -1: return self.engine_api.create_simple_table(app_handle, dimensions, measures, max_rows) else: raise Exception("Failed to open app") except Exception as e: return {"error": str(e)} finally: self.engine_api.disconnect() result = await asyncio.to_thread(_create_simple_table) return [ TextContent( type="text", text=json.dumps(result, indent=2, ensure_ascii=False) ) ] elif name == "engine_get_chart_data": app_id = arguments["app_id"] chart_type = arguments["chart_type"] dimensions = arguments.get("dimensions", []) measures = arguments.get("measures", []) max_rows = arguments.get("max_rows", 1000) def _get_chart_data(): try: self.engine_api.connect() app_result = self.engine_api.open_doc(app_id, no_data=False) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle != -1: return self.engine_api.get_chart_data(app_handle, chart_type, dimensions, measures, max_rows) else: raise Exception("Failed to open app") except Exception as e: return {"error": str(e)} finally: self.engine_api.disconnect() result = await asyncio.to_thread(_get_chart_data) return [ TextContent( type="text", text=json.dumps(result, indent=2, ensure_ascii=False) ) ] elif name == "engine_create_hypercube": app_id = arguments["app_id"] dimensions = arguments.get("dimensions", []) measures = arguments.get("measures", []) max_rows = arguments.get("max_rows", 1000) def _create_hypercube(): try: self.engine_api.connect() app_result = self.engine_api.open_doc(app_id, no_data=False) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle != -1: return self.engine_api.create_hypercube(app_handle, dimensions, measures, max_rows) else: raise Exception("Failed to open app") except Exception as e: return {"error": str(e)} finally: self.engine_api.disconnect() result = await asyncio.to_thread(_create_hypercube) return [ TextContent( type="text", text=json.dumps(result, indent=2, ensure_ascii=False) ) ] elif name == "engine_export_visualization_to_csv": app_id = arguments["app_id"] object_id = arguments["object_id"] file_path = arguments.get("file_path", "/tmp/export.csv") def _export_visualization(): try: self.engine_api.connect() app_result = self.engine_api.open_doc(app_id, no_data=False) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle != -1: return self.engine_api.export_visualization_to_csv(app_handle, object_id, file_path) else: raise Exception("Failed to open app") except Exception as e: return {"error": str(e)} finally: self.engine_api.disconnect() result = await asyncio.to_thread(_export_visualization) return [ TextContent( type="text", text=json.dumps(result, indent=2, ensure_ascii=False) ) ] elif name == "get_app_object": app_id = arguments["app_id"] object_id = arguments["object_id"] def _get_app_object(): try: self.engine_api.connect() app_result = self.engine_api.open_doc(app_id, no_data=False) app_handle = app_result.get("qReturn", {}).get("qHandle", -1) if app_handle == -1: return {"error": "Failed to open app"} # Get object obj_result = self.engine_api.send_request("GetObject", {"qId": object_id}, handle=app_handle) if "qReturn" not in obj_result: return {"error": f"Object {object_id} not found"} obj_handle = obj_result["qReturn"]["qHandle"] # Get layout layout_result = self.engine_api.send_request("GetLayout", [], handle=obj_handle) if "qLayout" not in layout_result: return {"error": "Failed to get object layout"} # Return only the result part as requested return layout_result except Exception as e: return {"error": str(e), "app_id": app_id, "object_id": object_id} finally: self.engine_api.disconnect() result = await asyncio.to_thread(_get_app_object) return [ TextContent( type="text", text=json.dumps(result, indent=2, ensure_ascii=False) ) ] else: return [TextContent(type="text", text=json.dumps({"error": f"Unknown tool: {name}"}, indent=2, ensure_ascii=False))] except Exception as e: return [TextContent(type="text", text=json.dumps({"error": str(e)}, indent=2, ensure_ascii=False))] async def run(self): """ Start the MCP server with stdio transport. Initializes server capabilities and begins listening for MCP protocol messages over stdin/stdout for communication with MCP clients. """ async with stdio_server() as (read_stream, write_stream): await self.server.run( read_stream, write_stream, InitializationOptions( server_name="qlik-sense-mcp-server", server_version="1.3.4", capabilities=ServerCapabilities( tools={} ), ), ) async def async_main(): """ Async main entry point for the Qlik Sense MCP Server. Creates and starts the MCP server instance with configured Qlik Sense Repository and Engine API connections. """ server = QlikSenseMCPServer() await server.run() def main(): """ Synchronous entry point for CLI usage. This function is used as the entry point in pyproject.toml for the qlik-sense-mcp-server command. """ # Handle command line arguments if len(sys.argv) > 1: if sys.argv[1] in ["--help", "-h"]: print_help() return elif sys.argv[1] in ["--version", "-v"]: sys.stderr.write("qlik-sense-mcp-server 1.3.4\n") sys.stderr.flush() return asyncio.run(async_main()) def print_help(): """Print help information using logging instead of print.""" help_text = """ Qlik Sense MCP Server - Model Context Protocol server for Qlik Sense Enterprise APIs USAGE: qlik-sense-mcp-server [OPTIONS] uvx qlik-sense-mcp-server [OPTIONS] OPTIONS: -h, --help Show this help message -v, --version Show version information CONFIGURATION: Set these environment variables before running: QLIK_SERVER_URL - Qlik Sense server URL (required) Example: https://qlik.company.com QLIK_USER_DIRECTORY - User directory (required) Example: COMPANY QLIK_USER_ID - User ID (required) Example: your-username QLIK_CLIENT_CERT_PATH - Path to client certificate (required) Example: /path/to/certs/client.pem QLIK_CLIENT_KEY_PATH - Path to client key (required) Example: /path/to/certs/client_key.pem QLIK_CA_CERT_PATH - Path to CA certificate (required) Example: /path/to/certs/root.pem QLIK_REPOSITORY_PORT - Repository API port (optional, default: 4242) QLIK_ENGINE_PORT - Engine API port (optional, default: 4747) QLIK_HTTP_PORT - HTTP API port for metadata requests (optional) QLIK_VERIFY_SSL - Verify SSL certificates (optional, default: true) EXAMPLES: # Using uvx with environment variables uvx --with-env QLIK_SERVER_URL=https://qlik.company.com \ --with-env QLIK_USER_DIRECTORY=COMPANY \ --with-env QLIK_USER_ID=username \ --with-env QLIK_CLIENT_CERT_PATH=/path/to/client.pem \ --with-env QLIK_CLIENT_KEY_PATH=/path/to/client_key.pem \ --with-env QLIK_CA_CERT_PATH=/path/to/root.pem \ qlik-sense-mcp-server # Using environment file export QLIK_SERVER_URL=https://qlik.company.com export QLIK_USER_DIRECTORY=COMPANY export QLIK_USER_ID=username qlik-sense-mcp-server AVAILABLE TOOLS: Repository API: get_apps, get_app_details Engine API: get_app_sheets, get_app_sheet_objects, get_app_script, get_app_field, get_app_variables, get_app_field_statistics, engine_create_hypercube, get_app_object Total: 10 tools for Qlik Sense analytics operations MORE INFO: GitHub: https://github.com/bintocher/qlik-sense-mcp PyPI: https://pypi.org/project/qlik-sense-mcp-server/ """ # Use stderr for help output to avoid mixing with MCP protocol output import sys sys.stderr.write(help_text + "\n") sys.stderr.flush() if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bintocher/qlik-sense-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server