Skip to main content
Glama
engine_api.py95.4 kB
"""Qlik Sense Engine API client.""" import json import websocket import ssl from typing import Dict, List, Any, Optional, Union from datetime import datetime from .config import QlikSenseConfig import logging import os logger = logging.getLogger(__name__) class QlikEngineAPI: """Client for Qlik Sense Engine API using WebSocket.""" def __init__(self, config: QlikSenseConfig): self.config = config self.ws = None self.request_id = 0 # Timeouts / retries from env ws_timeout_env = os.getenv("QLIK_WS_TIMEOUT") try: self.ws_timeout_seconds = float(ws_timeout_env) if ws_timeout_env else 8.0 except ValueError: self.ws_timeout_seconds = 8.0 retries_env = os.getenv("QLIK_WS_RETRIES") try: self.ws_retries = int(retries_env) if retries_env else 2 except ValueError: self.ws_retries = 2 def _get_next_request_id(self) -> int: """Get next request ID.""" self.request_id += 1 return self.request_id def connect(self, app_id: Optional[str] = None) -> None: """Connect to Engine API via WebSocket.""" # Try different WebSocket endpoints server_host = self.config.server_url.replace("https://", "").replace( "http://", "" ) # Order and count of endpoints controlled by retries setting endpoints_all = [ f"wss://{server_host}:{self.config.engine_port}/app/engineData", f"wss://{server_host}:{self.config.engine_port}/app", f"ws://{server_host}:{self.config.engine_port}/app/engineData", f"ws://{server_host}:{self.config.engine_port}/app", ] endpoints_to_try = endpoints_all[: max(1, min(self.ws_retries, len(endpoints_all)))] # Setup SSL context ssl_context = ssl.create_default_context() if not self.config.verify_ssl: ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE if self.config.client_cert_path and self.config.client_key_path: ssl_context.load_cert_chain( self.config.client_cert_path, self.config.client_key_path ) if self.config.ca_cert_path: ssl_context.load_verify_locations(self.config.ca_cert_path) # Headers for authentication headers = [ f"X-Qlik-User: UserDirectory={self.config.user_directory}; UserId={self.config.user_id}" ] last_error = None for url in endpoints_to_try: try: if url.startswith("wss://"): self.ws = websocket.create_connection( url, sslopt={"context": ssl_context}, header=headers, timeout=self.ws_timeout_seconds ) else: self.ws = websocket.create_connection( url, header=headers, timeout=self.ws_timeout_seconds ) # initial recv to establish session self.ws.recv() return # Success except Exception as e: last_error = e if self.ws: try: self.ws.close() except Exception: pass self.ws = None continue raise ConnectionError( f"Failed to connect to Engine API. Last error: {str(last_error)}" ) def disconnect(self) -> None: """Disconnect from Engine API.""" if self.ws: self.ws.close() self.ws = None def send_request( self, method: str, params: List[Any] = None, handle: int = -1 ) -> Dict[str, Any]: """ Send JSON-RPC 2.0 request to Qlik Engine API and return response. Args: method: Engine API method name params: Method parameters list handle: Object handle for scoped operations (-1 for global) Returns: Response dictionary from Engine API """ if not self.ws: raise ConnectionError("Not connected to Engine API") request = { "jsonrpc": "2.0", "id": self._get_next_request_id(), "handle": handle, "method": method, "params": params or [], } self.ws.send(json.dumps(request)) while True: data = self.ws.recv() if "result" in data or "error" in data: break response = json.loads(data) if "error" in response: raise Exception(f"Engine API error: {response['error']}") return response.get("result", {}) def get_doc_list(self) -> List[Dict[str, Any]]: """Get list of available documents.""" try: # Connect to global engine first result = self.send_request("GetDocList") doc_list = result.get("qDocList", []) # Ensure we return a list even if empty if isinstance(doc_list, list): return doc_list else: return [] except Exception as e: # Return empty list on error for compatibility return [] def open_doc(self, app_id: str, no_data: bool = True) -> Dict[str, Any]: """ Open Qlik Sense application document. Args: app_id: Application ID to open no_data: If True, open without loading data (faster for metadata operations) Returns: Response with document handle """ try: if no_data: return self.send_request("OpenDoc", [app_id, "", "", "", True]) else: return self.send_request("OpenDoc", [app_id]) except Exception as e: # If app is already open, try to get existing handle if "already open" in str(e).lower(): try: # Try to get the already open document doc_list = self.get_doc_list() for doc in doc_list: if doc.get("qDocId") == app_id: # Return mock response with existing handle return { "qReturn": { "qHandle": doc.get("qHandle", -1), "qGenericId": app_id } } except: pass raise e def close_doc(self, app_handle: int) -> bool: """Close application document.""" try: result = self.send_request("CloseDoc", [], handle=app_handle) return result.get("qReturn", {}).get("qSuccess", False) except Exception: return False def get_active_doc(self) -> Dict[str, Any]: """Get currently active document if any.""" try: result = self.send_request("GetActiveDoc") return result except Exception: return {} def open_doc_safe(self, app_id: str, no_data: bool = True) -> Dict[str, Any]: """ Safely open document with better error handling for already open apps. Args: app_id: Application ID to open no_data: If True, open without loading data Returns: Response with document handle """ try: # First try to open normally if no_data: return self.send_request("OpenDoc", [app_id, "", "", "", True]) else: return self.send_request("OpenDoc", [app_id]) except Exception as e: error_msg = str(e) # Handle "already open" errors specially if "already open" in error_msg.lower() or "app already open" in error_msg.lower(): try: # Try to get active document active_doc = self.get_active_doc() if active_doc and "qReturn" in active_doc: return active_doc # Try to find in document list doc_list = self.get_doc_list() for doc in doc_list: if doc.get("qDocId") == app_id or doc.get("qDocName") == app_id: return { "qReturn": { "qHandle": doc.get("qHandle", -1), "qGenericId": app_id } } # If still not found, re-raise original error raise e except Exception: # If all recovery attempts fail, re-raise original error raise e else: # For other errors, just re-raise raise e def get_app_properties(self, app_handle: int) -> Dict[str, Any]: """Get app properties.""" return self.send_request("GetAppProperties", handle=app_handle) def get_script(self, app_handle: int) -> str: """Get load script.""" result = self.send_request("GetScript", [], handle=app_handle) return result.get("qScript", "") def set_script(self, app_handle: int, script: str) -> bool: """Set load script.""" result = self.send_request("SetScript", [script], handle=app_handle) return result.get("qReturn", {}).get("qSuccess", False) def do_save(self, app_handle: int, file_name: Optional[str] = None) -> bool: """Save app.""" params = {} if file_name: params["qFileName"] = file_name result = self.send_request("DoSave", params, handle=app_handle) return result.get("qReturn", {}).get("qSuccess", False) def get_objects( self, app_handle: int, object_type: Optional[str] = None ) -> List[Dict[str, Any]]: """Get app objects.""" # Build parameters based on whether specific object_type is requested if object_type: # Get specific object type params = { "qOptions": { "qTypes": [object_type], "qIncludeSessionObjects": True, "qData": {}, } } else: # Get ALL objects - don't specify qTypes to get everything including extensions params = { "qOptions": { "qIncludeSessionObjects": True, "qData": {}, } } # Debug logging logger.debug(f"get_objects params: {params}") result = self.send_request("GetObjects", params, handle=app_handle) # Debug result if "error" in str(result) or "Missing Types" in str(result): logger.debug(f"get_objects error result: {result}") return result.get("qList", {}).get("qItems", []) def get_sheets(self, app_handle: int) -> List[Dict[str, Any]]: """Get app sheets.""" try: sheet_list_def = { "qInfo": {"qType": "SheetList"}, "qAppObjectListDef": { "qType": "sheet", "qData": { "title": "/qMetaDef/title", "description": "/qMetaDef/description", "thumbnail": "/thumbnail", "cells": "/cells", "rank": "/rank", "columns": "/columns", "rows": "/rows" } } } create_result = self.send_request("CreateSessionObject", [sheet_list_def], handle=app_handle) if "qReturn" not in create_result or "qHandle" not in create_result["qReturn"]: logger.warning(f"Failed to create SheetList object: {create_result}") return [] sheet_list_handle = create_result["qReturn"]["qHandle"] layout_result = self.send_request("GetLayout", [], handle=sheet_list_handle) if "qLayout" not in layout_result or "qAppObjectList" not in layout_result["qLayout"]: logger.warning(f"No sheet list in layout: {layout_result}") return [] sheets = layout_result["qLayout"]["qAppObjectList"]["qItems"] logger.info(f"Found {len(sheets)} sheets") return sheets except Exception as e: logger.error(f"get_sheets exception: {str(e)}") return [] def get_sheet_objects(self, app_handle: int, sheet_id: str) -> List[Dict[str, Any]]: """Get objects on a specific sheet.""" try: # First get the sheet object sheet_params = {"qId": sheet_id} sheet_result = self.send_request( "GetObject", sheet_params, handle=app_handle ) if not sheet_result or "qReturn" not in sheet_result: return {"error": "Could not get sheet object", "sheet_id": sheet_id} sheet_handle = sheet_result["qReturn"]["qHandle"] # Get sheet layout to find child objects layout_result = self.send_request("GetLayout", {}, handle=sheet_handle) if not layout_result or "qLayout" not in layout_result: return {"error": "Could not get sheet layout", "sheet_id": sheet_id} # Extract child objects from layout layout = layout_result["qLayout"] child_objects = [] # Look for cells or children in the layout if "qChildList" in layout: child_objects = layout["qChildList"]["qItems"] elif "cells" in layout: child_objects = layout["cells"] elif "qChildren" in layout: child_objects = layout["qChildren"] return child_objects except Exception as e: return { "error": str(e), "details": f"Error getting objects for sheet {sheet_id}", } def get_sheets_with_objects(self, app_id: str) -> Dict[str, Any]: """Get sheets and their objects with detailed field usage analysis.""" try: self.connect() # Open the app app_result = self.open_doc(app_id, no_data=False) if "qReturn" not in app_result or "qHandle" not in app_result["qReturn"]: return {"error": "Failed to open app", "response": app_result} app_handle = app_result["qReturn"]["qHandle"] # Get sheets using correct API sequence sheets = self.get_sheets(app_handle) logger.debug(f"get_sheets returned {len(sheets)} sheets") if not sheets: return { "sheets": [], "total_sheets": 0, "field_usage": {}, "debug_info": { "sheets_from_api": 0, "error_reason": "get_sheets returned empty list" } } detailed_sheets = [] field_usage_map = {} for sheet in sheets: if not isinstance(sheet, dict) or "qInfo" not in sheet: continue sheet_id = sheet["qInfo"]["qId"] sheet_title = sheet.get("qMeta", {}).get("title", "") logger.info(f"Processing sheet {sheet_id}: {sheet_title}") sheet_objects = self._get_sheet_objects_detailed(app_handle, sheet_id) for obj in sheet_objects: if isinstance(obj, dict) and "fields_used" in obj: for field_name in obj["fields_used"]: if field_name not in field_usage_map: field_usage_map[field_name] = {"objects": [], "sheets": []} field_usage_map[field_name]["objects"].append({ "object_id": obj.get("object_id", ""), "object_type": obj.get("object_type", ""), "object_title": obj.get("object_title", ""), "sheet_id": sheet_id, "sheet_title": sheet_title }) sheet_already_added = any( s["sheet_id"] == sheet_id for s in field_usage_map[field_name]["sheets"] ) if not sheet_already_added: field_usage_map[field_name]["sheets"].append({ "sheet_id": sheet_id, "sheet_title": sheet_title }) sheet_info = { "sheet_info": sheet, "objects": sheet_objects, "objects_count": len(sheet_objects) } detailed_sheets.append(sheet_info) return { "sheets": detailed_sheets, "total_sheets": len(detailed_sheets), "field_usage": field_usage_map, "debug_info": { "sheets_from_api": len(sheets), "processed_sheets": len(detailed_sheets), "fields_with_usage": len([k for k, v in field_usage_map.items() if v["objects"]]) } } except Exception as e: return { "error": str(e), "details": "Error in get_sheets_with_objects method", } def _get_sheet_objects_detailed(self, app_handle: int, sheet_id: str) -> List[Dict[str, Any]]: """Get detailed information about objects on a sheet.""" try: sheet_result = self.send_request("GetObject", {"qId": sheet_id}, handle=app_handle) if "qReturn" not in sheet_result or "qHandle" not in sheet_result["qReturn"]: logger.warning(f"Failed to get sheet object {sheet_id}: {sheet_result}") return [] sheet_handle = sheet_result["qReturn"]["qHandle"] sheet_layout = self.send_request("GetLayout", [], handle=sheet_handle) if "qLayout" not in sheet_layout or "qChildList" not in sheet_layout["qLayout"]: logger.warning(f"No child objects in sheet {sheet_id}") return [] child_objects = sheet_layout["qLayout"]["qChildList"]["qItems"] detailed_objects = [] for child_obj in child_objects: obj_id = child_obj.get("qInfo", {}).get("qId", "") obj_type = child_obj.get("qInfo", {}).get("qType", "") if not obj_id: continue try: obj_result = self.send_request("GetObject", {"qId": obj_id}, handle=app_handle) if "qReturn" not in obj_result or "qHandle" not in obj_result["qReturn"]: continue obj_handle = obj_result["qReturn"]["qHandle"] obj_layout = self.send_request("GetLayout", [], handle=obj_handle) if "qLayout" not in obj_layout: continue fields_used = self._extract_fields_from_object(obj_layout["qLayout"]) detailed_obj = { "object_id": obj_id, "object_type": obj_type, "object_title": obj_layout["qLayout"].get("title", ""), "object_subtitle": obj_layout["qLayout"].get("subtitle", ""), "fields_used": fields_used, "basic_info": child_obj, "detailed_layout": obj_layout["qLayout"] } detailed_objects.append(detailed_obj) logger.info(f"Processed object {obj_id} ({obj_type}) with {len(fields_used)} fields") except Exception as obj_error: logger.warning(f"Error processing object {obj_id}: {obj_error}") continue return detailed_objects except Exception as e: logger.error(f"_get_sheet_objects_detailed error: {str(e)}") return [] def _extract_fields_from_object(self, obj_layout: Dict[str, Any]) -> List[str]: """Extract field names used in an object layout.""" fields = set() try: if "qHyperCube" in obj_layout: hypercube = obj_layout["qHyperCube"] for dim_info in hypercube.get("qDimensionInfo", []): field_defs = dim_info.get("qGroupFieldDefs", []) for field_def in field_defs: field_name = self._extract_field_name_from_expression(field_def) if field_name: fields.add(field_name) for measure_info in hypercube.get("qMeasureInfo", []): measure_def = measure_info.get("qDef", "") extracted_fields = self._extract_fields_from_expression(measure_def) fields.update(extracted_fields) if "qListObject" in obj_layout: list_obj = obj_layout["qListObject"] for dim_info in list_obj.get("qDimensionInfo", []): field_defs = dim_info.get("qGroupFieldDefs", []) for field_def in field_defs: field_name = self._extract_field_name_from_expression(field_def) if field_name: fields.add(field_name) if "qChildList" in obj_layout: for child in obj_layout["qChildList"].get("qItems", []): pass except Exception as e: logger.warning(f"Error extracting fields from object: {e}") return list(fields) def _extract_field_name_from_expression(self, expression: str) -> Optional[str]: """Extract field name from a simple field expression.""" if not expression: return None expression = expression.strip() if expression.startswith('[') and expression.endswith(']') and expression.count('[') == 1: return expression[1:-1] if ' ' not in expression and '(' not in expression and not any(op in expression for op in ['=', '+', '-', '*', '/']): return expression return None def _extract_fields_from_expression(self, expression: str) -> List[str]: """Extract field names from a complex expression.""" import re fields = [] if not expression: return fields bracket_fields = re.findall(r'\[([^\]]+)\]', expression) fields.extend(bracket_fields) return list(set(fields)) def get_fields(self, app_handle: int) -> List[Dict[str, Any]]: """Get app fields using GetTablesAndKeys method.""" try: # Use correct GetTablesAndKeys method as in qsea.py result = self.send_request( "GetTablesAndKeys", [ {"qcx": 1000, "qcy": 1000}, # Max dimensions {"qcx": 0, "qcy": 0}, # Min dimensions 30, # Max tables True, # Include system tables False, # Include hidden fields ], handle=app_handle, ) fields_info = [] if "qtr" in result: for table in result["qtr"]: table_name = table.get("qName", "Unknown") if "qFields" in table: for field in table["qFields"]: field_info = { "field_name": field.get("qName", ""), "table_name": table_name, "data_type": field.get("qType", ""), "is_key": field.get("qIsKey", False), "is_system": field.get("qIsSystem", False), "is_hidden": field.get("qIsHidden", False), "is_semantic": field.get("qIsSemantic", False), "distinct_values": field.get( "qnTotalDistinctValues", 0 ), "present_distinct_values": field.get( "qnPresentDistinctValues", 0 ), "rows_count": field.get("qnRows", 0), "subset_ratio": field.get("qSubsetRatio", 0), "key_type": field.get("qKeyType", ""), "tags": field.get("qTags", []), } fields_info.append(field_info) return { "fields": fields_info, "tables_count": len(result.get("qtr", [])), "total_fields": len(fields_info), } except Exception as e: return {"error": str(e), "details": "Error in get_fields method"} def get_tables(self, app_handle: int) -> List[Dict[str, Any]]: """Get app tables.""" result = self.send_request("GetTablesList", handle=app_handle) return result.get("qtr", []) def create_session_object( self, app_handle: int, obj_def: Dict[str, Any] ) -> Dict[str, Any]: """Create session object.""" return self.send_request( "CreateSessionObject", {"qProp": obj_def}, handle=app_handle ) def get_object(self, app_handle: int, object_id: str) -> Dict[str, Any]: """Get object by ID.""" return self.send_request("GetObject", {"qId": object_id}, handle=app_handle) def evaluate_expression(self, app_handle: int, expression: str) -> Any: """Evaluate expression.""" result = self.send_request( "Evaluate", {"qExpression": expression}, handle=app_handle ) return result.get("qReturn", {}) def select_in_field( self, app_handle: int, field_name: str, values: List[str], toggle: bool = False ) -> bool: """Select values in field.""" params = {"qFieldName": field_name, "qValues": values, "qToggleMode": toggle} result = self.send_request("SelectInField", params, handle=app_handle) return result.get("qReturn", False) def clear_selections(self, app_handle: int, locked_also: bool = False) -> bool: """Clear all selections.""" params = {"qLockedAlso": locked_also} result = self.send_request("ClearAll", params, handle=app_handle) return result.get("qReturn", False) def get_current_selections(self, app_handle: int) -> List[Dict[str, Any]]: """Get current selections.""" result = self.send_request("GetCurrentSelections", handle=app_handle) return result.get("qSelections", []) def get_data_model(self, app_handle: int) -> Dict[str, Any]: """Get complete data model with tables and associations.""" try: # Use GetAllInfos to get basic structure information all_infos = self.send_request("GetAllInfos", [], handle=app_handle) # Analyze the objects to understand data structure sheets = [] visualizations = [] measures = [] dimensions = [] for info in all_infos.get("qInfos", []): obj_type = info.get("qType", "") obj_id = info.get("qId", "") if obj_type == "sheet": sheets.append({"id": obj_id, "type": obj_type}) elif obj_type in [ "table", "barchart", "linechart", "piechart", "combochart", "kpi", "listbox", ]: visualizations.append({"id": obj_id, "type": obj_type}) elif obj_type == "measure": measures.append({"id": obj_id, "type": obj_type}) elif obj_type == "dimension": dimensions.append({"id": obj_id, "type": obj_type}) return { "app_structure": { "total_objects": len(all_infos.get("qInfos", [])), "sheets": sheets, "visualizations": visualizations, "measures": measures, "dimensions": dimensions, }, "raw_info": all_infos, } except Exception as e: return {"error": str(e)} def get_field_description(self, app_handle: int, field_name: str) -> Dict[str, Any]: """Get detailed field information including values.""" # Use correct structure as in pyqlikengine params = [{"qFieldName": field_name, "qStateName": "$"}] result = self.send_request("GetField", params, handle=app_handle) return result def create_hypercube( self, app_handle: int, dimensions: List[Dict[str, Any]] = None, measures: List[Dict[str, Any]] = None, max_rows: int = 1000, ) -> Dict[str, Any]: """Create hypercube for data extraction with proper structure.""" try: # Handle empty dimensions/measures if dimensions is None: dimensions = [] if measures is None: measures = [] # Convert old format (list of strings) to new format (list of dicts) for backward compatibility converted_dimensions = [] for dim in dimensions: if isinstance(dim, str): # Old format - just field name converted_dimensions.append({ "field": dim, "sort_by": { "qSortByNumeric": 0, "qSortByAscii": 1, # Default: ASCII ascending "qSortByExpression": 0, "qExpression": "" } }) else: # New format - dict with field and sort options # Set defaults if not specified if "sort_by" not in dim: dim["sort_by"] = { "qSortByNumeric": 0, "qSortByAscii": 1, # Default: ASCII ascending "qSortByExpression": 0, "qExpression": "" } converted_dimensions.append(dim) converted_measures = [] for measure in measures: if isinstance(measure, str): # Old format - just expression converted_measures.append({ "expression": measure, "sort_by": { "qSortByNumeric": -1 # Default: numeric descending } }) else: # New format - dict with expression and sort options # Set defaults if not specified if "sort_by" not in measure: measure["sort_by"] = { "qSortByNumeric": -1 # Default: numeric descending } converted_measures.append(measure) # Create correct hypercube structure hypercube_def = { "qDimensions": [ { "qDef": { "qFieldDefs": [dim["field"]], "qSortCriterias": [ { "qSortByState": 0, "qSortByFrequency": 0, "qSortByNumeric": dim["sort_by"].get("qSortByNumeric", 0), "qSortByAscii": dim["sort_by"].get("qSortByAscii", 1), "qSortByLoadOrder": 0, "qSortByExpression": dim["sort_by"].get("qSortByExpression", 0), "qExpression": {"qv": dim["sort_by"].get("qExpression", "")}, } ], }, "qNullSuppression": False, "qIncludeElemValue": True, } for dim in converted_dimensions ], "qMeasures": [ { "qDef": {"qDef": measure["expression"], "qLabel": measure.get("label", f"Measure_{i}")}, "qSortBy": measure["sort_by"], } for i, measure in enumerate(converted_measures) ], "qInitialDataFetch": [ { "qTop": 0, "qLeft": 0, "qHeight": max_rows, "qWidth": len(converted_dimensions) + len(converted_measures), } ], "qSuppressZero": False, "qSuppressMissing": False, "qMode": "S", "qInterColumnSortOrder": list(range(len(converted_dimensions) + len(converted_measures))), } obj_def = { "qInfo": { "qId": f"hypercube-{len(converted_dimensions)}d-{len(converted_measures)}m", "qType": "HyperCube", }, "qHyperCubeDef": hypercube_def, } result = self.send_request( "CreateSessionObject", [obj_def], handle=app_handle ) if "qReturn" not in result or "qHandle" not in result["qReturn"]: return {"error": "Failed to create hypercube", "response": result} cube_handle = result["qReturn"]["qHandle"] # Получаем layout с данными layout = self.send_request("GetLayout", [], handle=cube_handle) if "qLayout" not in layout or "qHyperCube" not in layout["qLayout"]: return {"error": "No hypercube in layout", "layout": layout} hypercube = layout["qLayout"]["qHyperCube"] return { "hypercube_handle": cube_handle, "hypercube_data": hypercube, "dimensions": converted_dimensions, "measures": converted_measures, "total_rows": hypercube.get("qSize", {}).get("qcy", 0), "total_columns": hypercube.get("qSize", {}).get("qcx", 0), } except Exception as e: return {"error": str(e), "details": "Error in create_hypercube method"} def get_hypercube_data( self, hypercube_handle: int, page_top: int = 0, page_height: int = 1000, page_left: int = 0, page_width: int = 50, ) -> Dict[str, Any]: """Get data from existing hypercube with pagination.""" try: # Use correct GetHyperCubeData method params = [ { "qPath": "/qHyperCubeDef", "qPages": [ { "qTop": page_top, "qLeft": page_left, "qHeight": page_height, "qWidth": page_width, } ], } ] result = self.send_request( "GetHyperCubeData", params, handle=hypercube_handle ) return result except Exception as e: return {"error": str(e), "details": "Error in get_hypercube_data method"} def get_table_data( self, app_handle: int, table_name: str = None, max_rows: int = 1000 ) -> Dict[str, Any]: """Get data from a specific table by creating hypercube with all table fields.""" try: if not table_name: # Get list of available tables fields_result = self.get_fields(app_handle) if "error" in fields_result: return fields_result tables = {} for field in fields_result.get("fields", []): table = field.get("table_name", "Unknown") if table not in tables: tables[table] = [] tables[table].append(field["field_name"]) return { "message": "Please specify table_name parameter", "available_tables": tables, "note": "Use one of the available table names to get data", } # Get fields for specified table fields_result = self.get_fields(app_handle) if "error" in fields_result: return fields_result table_fields = [] for field in fields_result.get("fields", []): if field.get("table_name") == table_name: table_fields.append(field["field_name"]) if not table_fields: return {"error": f"Table '{table_name}' not found or has no fields"} # Limit number of fields to avoid too wide tables max_fields = 20 if len(table_fields) > max_fields: table_fields = table_fields[:max_fields] truncated = True else: truncated = False # Create hypercube with all table fields as dimensions hypercube_def = { "qDimensions": [ { "qDef": { "qFieldDefs": [field], "qSortCriterias": [ { "qSortByState": 0, "qSortByFrequency": 0, "qSortByNumeric": 1, "qSortByAscii": 1, "qSortByLoadOrder": 1, "qSortByExpression": 0, "qExpression": {"qv": ""}, } ], }, "qNullSuppression": False, "qIncludeElemValue": True, } for field in table_fields ], "qMeasures": [], "qInitialDataFetch": [ { "qTop": 0, "qLeft": 0, "qHeight": max_rows, "qWidth": len(table_fields), } ], "qSuppressZero": False, "qSuppressMissing": False, "qMode": "S", } obj_def = { "qInfo": {"qId": f"table-data-{table_name}", "qType": "HyperCube"}, "qHyperCubeDef": hypercube_def, } result = self.send_request( "CreateSessionObject", [obj_def], handle=app_handle ) if "qReturn" not in result or "qHandle" not in result["qReturn"]: return { "error": "Failed to create hypercube for table data", "response": result, } cube_handle = result["qReturn"]["qHandle"] layout = self.send_request("GetLayout", [], handle=cube_handle) if "qLayout" not in layout or "qHyperCube" not in layout["qLayout"]: try: self.send_request( "DestroySessionObject", [f"table-data-{table_name}"], handle=app_handle, ) except: pass return {"error": "No hypercube in layout", "layout": layout} hypercube = layout["qLayout"]["qHyperCube"] # Process data into convenient format table_data = [] headers = table_fields for page in hypercube.get("qDataPages", []): for row in page.get("qMatrix", []): row_data = {} for i, cell in enumerate(row): if i < len(headers): row_data[headers[i]] = { "text": cell.get("qText", ""), "numeric": ( cell.get("qNum", None) if cell.get("qNum") != "NaN" else None ), "is_numeric": cell.get("qIsNumeric", False), "state": cell.get("qState", "O"), } table_data.append(row_data) result_data = { "table_name": table_name, "headers": headers, "data": table_data, "total_rows": hypercube.get("qSize", {}).get("qcy", 0), "returned_rows": len(table_data), "total_columns": len(headers), "truncated_fields": truncated, "dimension_info": hypercube.get("qDimensionInfo", []), } # Очищаем созданный объект try: self.send_request( "DestroySessionObject", [f"table-data-{table_name}"], handle=app_handle, ) except Exception as cleanup_error: result_data["cleanup_warning"] = str(cleanup_error) return result_data except Exception as e: return {"error": str(e), "details": "Error in get_table_data method"} def get_field_values( self, app_handle: int, field_name: str, max_values: int = 100, include_frequency: bool = True, ) -> Dict[str, Any]: """Get field values with frequency information using ListObject.""" try: # Use correct structure list_def = { "qInfo": {"qId": f"field-values-{field_name}", "qType": "ListObject"}, "qListObjectDef": { "qStateName": "$", "qLibraryId": "", "qDef": { "qFieldDefs": [field_name], "qFieldLabels": [], "qSortCriterias": [ { "qSortByState": 0, "qSortByFrequency": 1 if include_frequency else 0, "qSortByNumeric": 1, "qSortByAscii": 1, "qSortByLoadOrder": 0, "qSortByExpression": 0, "qExpression": {"qv": ""}, } ], }, "qInitialDataFetch": [ {"qTop": 0, "qLeft": 0, "qHeight": max_values, "qWidth": 1} ], }, } # Create session object - use correct parameter format result = self.send_request( "CreateSessionObject", [list_def], handle=app_handle ) if "qReturn" not in result or "qHandle" not in result["qReturn"]: return {"error": "Failed to create session object", "response": result} list_handle = result["qReturn"]["qHandle"] layout = self.send_request("GetLayout", [], handle=list_handle) # Correct path to qListObject - it's in qLayout if "qLayout" not in layout or "qListObject" not in layout["qLayout"]: # Clean up object before returning error try: self.send_request( "DestroySessionObject", [f"field-values-{field_name}"], handle=app_handle, ) except: pass return {"error": "No list object in layout", "layout": layout} list_object = layout["qLayout"]["qListObject"] values_data = [] # Process data for page in list_object.get("qDataPages", []): for row in page.get("qMatrix", []): if row and len(row) > 0: cell = row[0] value_info = { "value": cell.get("qText", ""), "state": cell.get( "qState", "O" ), # O=Optional, S=Selected, A=Alternative, X=Excluded "numeric_value": cell.get("qNum", None), "is_numeric": cell.get("qIsNumeric", False), } # Add frequency if available if "qFrequency" in cell: value_info["frequency"] = cell.get("qFrequency", 0) values_data.append(value_info) # Get general field information field_info = { "field_name": field_name, "values": values_data, "total_values": list_object.get("qSize", {}).get("qcy", 0), "returned_count": len(values_data), "dimension_info": list_object.get("qDimensionInfo", {}), "debug_info": { "list_handle": list_handle, "data_pages_count": len(list_object.get("qDataPages", [])), "raw_size": list_object.get("qSize", {}), }, } try: self.send_request( "DestroySessionObject", [f"field-values-{field_name}"], handle=app_handle, ) except Exception as cleanup_error: field_info["cleanup_warning"] = str(cleanup_error) return field_info except Exception as e: return {"error": str(e), "details": "Error in get_field_values method"} def get_field_statistics(self, app_handle: int, field_name: str) -> Dict[str, Any]: """Get comprehensive statistics for a field.""" debug_log = [] debug_log.append(f"get_field_statistics called with app_handle={app_handle}, field_name={field_name}") try: # Create expressions for statistics stats_expressions = [ f"Count(DISTINCT [{field_name}])", # Unique values f"Count([{field_name}])", # Total count f"Count({{$<[{field_name}]={{'*'}}>}})", # Non-null count f"Min([{field_name}])", # Minimum value f"Max([{field_name}])", # Maximum value f"Avg([{field_name}])", # Average value f"Sum([{field_name}])", # Sum (if numeric) f"Median([{field_name}])", # Median f"Mode([{field_name}])", # Mode (most frequent) f"Stdev([{field_name}])", # Standard deviation ] debug_log.append(f"Created {len(stats_expressions)} expressions: {stats_expressions}") # Create hypercube for statistics calculation hypercube_def = { "qDimensions": [], "qMeasures": [ {"qDef": {"qDef": expr, "qLabel": f"Stat_{i}"}} for i, expr in enumerate(stats_expressions) ], "qInitialDataFetch": [ { "qTop": 0, "qLeft": 0, "qHeight": 1, "qWidth": len(stats_expressions), } ], "qSuppressZero": False, "qSuppressMissing": False, } obj_def = { "qInfo": {"qId": f"field-stats-{field_name}", "qType": "HyperCube"}, "qHyperCubeDef": hypercube_def, } # Create session object debug_log.append(f"Creating session object with obj_def: {obj_def}") result = self.send_request( "CreateSessionObject", [obj_def], handle=app_handle ) debug_log.append(f"CreateSessionObject result: {result}") if "qReturn" not in result or "qHandle" not in result["qReturn"]: debug_log.append(f"Failed to create session object, returning error") return { "error": "Failed to create statistics hypercube", "response": result, "debug_log": debug_log } cube_handle = result["qReturn"]["qHandle"] # Get layout with data layout = self.send_request("GetLayout", [], handle=cube_handle) if "qLayout" not in layout or "qHyperCube" not in layout["qLayout"]: try: self.send_request( "DestroySessionObject", [f"field-stats-{field_name}"], handle=app_handle, ) except: pass return {"error": "No hypercube in statistics layout", "layout": layout, "debug_log": debug_log} hypercube = layout["qLayout"]["qHyperCube"] # Extract statistics values stats_labels = [ "unique_values", "total_count", "non_null_count", "min_value", "max_value", "avg_value", "sum_value", "median_value", "mode_value", "std_deviation", ] statistics = {"field_name": field_name} for page in hypercube.get("qDataPages", []): for row in page.get("qMatrix", []): for i, cell in enumerate(row): if i < len(stats_labels): stat_name = stats_labels[i] statistics[stat_name] = { "text": cell.get("qText", ""), "numeric": ( cell.get("qNum", None) if cell.get("qNum") != "NaN" else None ), "is_numeric": cell.get("qIsNumeric", False), } # Calculate additional derived statistics debug_log.append(f"Statistics before calculation: {statistics}") if "total_count" in statistics and "non_null_count" in statistics: # Handle None values safely total_dict = statistics["total_count"] non_null_dict = statistics["non_null_count"] debug_log.append(f"total_dict: {total_dict}") debug_log.append(f"non_null_dict: {non_null_dict}") total = total_dict.get("numeric", 0) if total_dict.get("numeric") is not None else 0 non_null = non_null_dict.get("numeric", 0) if non_null_dict.get("numeric") is not None else 0 debug_log.append(f"total: {total} (type: {type(total)})") debug_log.append(f"non_null: {non_null} (type: {type(non_null)})") if total > 0: debug_log.append(f"Calculating percentages...") debug_log.append(f"Calculation: ({total} - {non_null}) / {total} * 100") statistics["null_percentage"] = round( (total - non_null) / total * 100, 2 ) statistics["completeness_percentage"] = round( non_null / total * 100, 2 ) debug_log.append(f"Percentages calculated successfully") # Cleanup try: self.send_request( "DestroySessionObject", [f"field-stats-{field_name}"], handle=app_handle, ) except Exception as cleanup_error: statistics["cleanup_warning"] = str(cleanup_error) statistics["debug_log"] = debug_log return statistics except Exception as e: import traceback debug_log.append(f"Exception in get_field_statistics: {e}") debug_log.append(f"Traceback: {traceback.format_exc()}") return { "error": str(e), "details": "Error in get_field_statistics method", "traceback": traceback.format_exc(), "debug_log": debug_log } def get_object_data(self, app_handle: int, object_id: str) -> Dict[str, Any]: """Get data from existing visualization object.""" obj_result = self.send_request( "GetObject", {"qId": object_id}, handle=app_handle ) obj_handle = obj_result.get("qReturn", {}).get("qHandle", -1) if obj_handle != -1: layout = self.send_request("GetLayout", handle=obj_handle) return layout return {} def export_data_to_csv( self, app_handle: int, object_id: str, file_path: str = "/tmp/export.csv" ) -> Dict[str, Any]: """Export object data to CSV.""" params = { "qObjectId": object_id, "qPath": file_path, "qExportState": "A", # All data } result = self.send_request("ExportData", params, handle=app_handle) return result def search_objects( self, app_handle: int, search_terms: List[str], object_types: List[str] = None ) -> List[Dict[str, Any]]: """Search for objects by terms.""" params = { "qOptions": {"qSearchFields": ["*"], "qContext": "LockedFieldsOnly"}, "qTerms": search_terms, "qPage": {"qOffset": 0, "qCount": 100, "qMaxNbrFieldMatches": 5}, } if object_types: params["qOptions"]["qTypes"] = object_types result = self.send_request("SearchObjects", params, handle=app_handle) return result.get("qResult", {}).get("qSearchTerms", []) def get_field_and_variable_list(self, app_handle: int) -> Dict[str, Any]: """Get comprehensive list of fields and variables.""" result = self.send_request("GetFieldAndVariableList", {}, handle=app_handle) return result def get_measures(self, app_handle: int) -> List[Dict[str, Any]]: """Get master measures.""" result = self.send_request("GetMeasureList", handle=app_handle) return result.get("qMeasureList", {}).get("qItems", []) def get_dimensions(self, app_handle: int) -> List[Dict[str, Any]]: """Get master dimensions.""" result = self.send_request("GetDimensionList", handle=app_handle) return result.get("qDimensionList", {}).get("qItems", []) def get_variables(self, app_handle: int) -> List[Dict[str, Any]]: """Get variables.""" result = self.send_request("GetVariableList", handle=app_handle) return result.get("qVariableList", {}).get("qItems", []) def create_list_object( self, app_handle: int, field_name: str, sort_by_frequency: bool = True ) -> Dict[str, Any]: """Create optimized list object for field analysis.""" list_def = { "qInfo": {"qType": "ListObject"}, "qListObjectDef": { "qDef": { "qFieldDefs": [field_name], "qSortCriterias": [ { "qSortByFrequency": 1 if sort_by_frequency else 0, "qSortByNumeric": 1, "qSortByAscii": 1, } ], }, "qInitialDataFetch": [ {"qTop": 0, "qLeft": 0, "qHeight": 100, "qWidth": 1} ], }, } result = self.send_request( "CreateSessionObject", {"qProp": list_def}, handle=app_handle ) return result def get_pivot_table_data( self, app_handle: int, dimensions: List[str], measures: List[str], max_rows: int = 1000, ) -> Dict[str, Any]: """Create pivot table for complex data analysis.""" pivot_def = { "qInfo": {"qType": "PivotTable"}, "qHyperCubeDef": { "qDimensions": [ {"qDef": {"qFieldDefs": [dim]}, "qNullSuppression": True} for dim in dimensions ], "qMeasures": [ {"qDef": {"qDef": measure}, "qSortBy": {"qSortByNumeric": -1}} for measure in measures ], "qInitialDataFetch": [ { "qTop": 0, "qLeft": 0, "qHeight": max_rows, "qWidth": len(converted_dimensions) + len(converted_measures), } ], "qSuppressZero": True, "qSuppressMissing": True, }, } result = self.send_request( "CreateSessionObject", {"qProp": pivot_def}, handle=app_handle ) return result def calculate_expression( self, app_handle: int, expression: str, dimensions: List[str] = None ) -> Dict[str, Any]: """Calculate expression with optional grouping by dimensions.""" if dimensions: # Create hypercube for grouped calculation hypercube_def = { "qDimensions": [{"qDef": {"qFieldDefs": [dim]}} for dim in dimensions], "qMeasures": [{"qDef": {"qDef": expression}}], "qInitialDataFetch": [ { "qTop": 0, "qLeft": 0, "qHeight": 1000, "qWidth": len(dimensions) + 1, } ], } obj_def = { "qInfo": {"qType": "calculation"}, "qHyperCubeDef": hypercube_def, } result = self.send_request( "CreateSessionObject", {"qProp": obj_def}, handle=app_handle ) return result else: # Simple expression evaluation return self.evaluate_expression(app_handle, expression) def get_bookmarks(self, app_handle: int) -> List[Dict[str, Any]]: """Get bookmarks (saved selections).""" result = self.send_request("GetBookmarkList", handle=app_handle) return result.get("qBookmarkList", {}).get("qItems", []) def apply_bookmark(self, app_handle: int, bookmark_id: str) -> bool: """Apply bookmark selections.""" result = self.send_request( "ApplyBookmark", {"qBookmarkId": bookmark_id}, handle=app_handle ) return result.get("qReturn", False) def get_locale_info(self, app_handle: int) -> Dict[str, Any]: """Get locale information for proper number/date formatting.""" result = self.send_request("GetLocaleInfo", handle=app_handle) return result def search_suggest( self, app_handle: int, search_terms: List[str], object_types: List[str] = None ) -> List[Dict[str, Any]]: """Get search suggestions for better field/value discovery.""" params = { "qSuggestions": { "qSuggestionTypes": ( ["Field", "Value", "Object"] if not object_types else object_types ) }, "qTerms": search_terms, } result = self.send_request("SearchSuggest", params, handle=app_handle) return result.get("qResult", {}).get("qSuggestions", []) def create_data_export( self, app_handle: int, table_name: str = None, fields: List[str] = None, format_type: str = "json", max_rows: int = 10000, filters: Dict[str, Any] = None, ) -> Dict[str, Any]: """Create data export in various formats (JSON, CSV-like structure).""" try: # If no specific fields provided, get all fields from table if not fields: if table_name: fields_result = self.get_fields(app_handle) if "error" in fields_result: return fields_result table_fields = [] for field in fields_result.get("fields", []): if field.get("table_name") == table_name: table_fields.append(field["field_name"]) if not table_fields: return {"error": f"No fields found for table '{table_name}'"} fields = table_fields[:50] # Limit to 50 fields max else: return { "error": "Either table_name or fields list must be provided" } # Create hypercube for data extraction hypercube_def = { "qDimensions": [ { "qDef": { "qFieldDefs": [field], "qSortCriterias": [ { "qSortByState": 0, "qSortByFrequency": 0, "qSortByNumeric": 1, "qSortByAscii": 1, "qSortByLoadOrder": 1, "qSortByExpression": 0, "qExpression": {"qv": ""}, } ], }, "qNullSuppression": False, "qIncludeElemValue": True, } for field in fields ], "qMeasures": [], "qInitialDataFetch": [ {"qTop": 0, "qLeft": 0, "qHeight": max_rows, "qWidth": len(fields)} ], "qSuppressZero": False, "qSuppressMissing": False, "qMode": "S", } # Apply filters if provided if filters: # Add selection expressions as calculated dimensions for field_name, filter_values in filters.items(): if isinstance(filter_values, list): values_str = ", ".join([f"'{v}'" for v in filter_values]) filter_expr = f"If(Match([{field_name}], {values_str}), [{field_name}], Null())" else: filter_expr = f"If([{field_name}] = '{filter_values}', [{field_name}], Null())" # Replace the original field with filtered version for dim in hypercube_def["qDimensions"]: if dim["qDef"]["qFieldDefs"][0] == field_name: dim["qDef"]["qFieldDefs"] = [filter_expr] break obj_def = { "qInfo": { "qId": f"data-export-{table_name or 'custom'}", "qType": "HyperCube", }, "qHyperCubeDef": hypercube_def, } # Create session object result = self.send_request( "CreateSessionObject", [obj_def], handle=app_handle ) if "qReturn" not in result or "qHandle" not in result["qReturn"]: return { "error": "Failed to create export hypercube", "response": result, } cube_handle = result["qReturn"]["qHandle"] # Get layout with data layout = self.send_request("GetLayout", [], handle=cube_handle) if "qLayout" not in layout or "qHyperCube" not in layout["qLayout"]: try: self.send_request( "DestroySessionObject", [f"data-export-{table_name or 'custom'}"], handle=app_handle, ) except: pass return {"error": "No hypercube in export layout", "layout": layout} hypercube = layout["qLayout"]["qHyperCube"] # Process data based on format export_data = [] headers = fields for page in hypercube.get("qDataPages", []): for row in page.get("qMatrix", []): if format_type.lower() == "json": row_data = {} for i, cell in enumerate(row): if i < len(headers): row_data[headers[i]] = { "text": cell.get("qText", ""), "numeric": ( cell.get("qNum", None) if cell.get("qNum") != "NaN" else None ), "is_numeric": cell.get("qIsNumeric", False), } export_data.append(row_data) elif format_type.lower() == "csv": # CSV-like structure (list of values) row_values = [] for cell in row: row_values.append(cell.get("qText", "")) export_data.append(row_values) elif format_type.lower() == "simple": # Simple key-value structure row_data = {} for i, cell in enumerate(row): if i < len(headers): row_data[headers[i]] = cell.get("qText", "") export_data.append(row_data) result_data = { "export_format": format_type, "table_name": table_name, "fields": headers, "data": export_data, "metadata": { "total_rows": hypercube.get("qSize", {}).get("qcy", 0), "exported_rows": len(export_data), "total_columns": len(headers), "filters_applied": filters is not None, "export_timestamp": None, # Could be added with datetime.now() if needed "dimension_info": hypercube.get("qDimensionInfo", []), }, } # Add CSV headers if CSV format if format_type.lower() == "csv": result_data["csv_headers"] = headers # Cleanup try: self.send_request( "DestroySessionObject", [f"data-export-{table_name or 'custom'}"], handle=app_handle, ) except Exception as cleanup_error: result_data["cleanup_warning"] = str(cleanup_error) return result_data except Exception as e: return {"error": str(e), "details": "Error in create_data_export method"} def get_visualization_data(self, app_handle: int, object_id: str) -> Dict[str, Any]: """Get data from existing visualization object (chart, table, etc.).""" try: obj_result = self.send_request("GetObject", [object_id], handle=app_handle) if "qReturn" not in obj_result or "qHandle" not in obj_result["qReturn"]: return {"error": f"Failed to get object with ID: {object_id}", "response": obj_result} obj_handle = obj_result["qReturn"]["qHandle"] layout = self.send_request("GetLayout", [], handle=obj_handle) if "qLayout" not in layout: return {"error": "No layout found for object", "layout": layout} obj_layout = layout["qLayout"] obj_info = obj_layout.get("qInfo", {}) obj_type = obj_info.get("qType", "unknown") result = { "object_id": object_id, "object_type": obj_type, "object_title": obj_layout.get("qMeta", {}).get("title", ""), "data": None, "structure": None, } if "qHyperCube" in obj_layout: hypercube = obj_layout["qHyperCube"] table_data = [] dimensions = [] measures = [] for dim_info in hypercube.get("qDimensionInfo", []): dimensions.append({ "title": dim_info.get("qFallbackTitle", ""), "field": (dim_info.get("qGroupFieldDefs", [""])[0] if dim_info.get("qGroupFieldDefs") else ""), "cardinal": dim_info.get("qCardinal", 0), }) for measure_info in hypercube.get("qMeasureInfo", []): measures.append({ "title": measure_info.get("qFallbackTitle", ""), "expression": measure_info.get("qDef", ""), "format": measure_info.get("qNumFormat", {}), }) for page in hypercube.get("qDataPages", []): for row in page.get("qMatrix", []): row_data = {} for i, cell in enumerate(row[: len(dimensions)]): if i < len(dimensions): row_data[f"dim_{i}_{dimensions[i]['title']}"] = { "text": cell.get("qText", ""), "numeric": (cell.get("qNum", None) if cell.get("qNum") != "NaN" else None), "state": cell.get("qState", "O"), } for i, cell in enumerate(row[len(dimensions) :]): if i < len(measures): row_data[f"measure_{i}_{measures[i]['title']}"] = { "text": cell.get("qText", ""), "numeric": (cell.get("qNum", None) if cell.get("qNum") != "NaN" else None), } table_data.append(row_data) result["data"] = table_data result["structure"] = { "dimensions": dimensions, "measures": measures, "total_rows": hypercube.get("qSize", {}).get("qcy", 0), "total_columns": hypercube.get("qSize", {}).get("qcx", 0), "returned_rows": len(table_data), } elif "qListObject" in obj_layout: list_object = obj_layout["qListObject"] values_data = [] for page in list_object.get("qDataPages", []): for row in page.get("qMatrix", []): if row and len(row) > 0: cell = row[0] values_data.append({ "value": cell.get("qText", ""), "state": cell.get("qState", "O"), "frequency": cell.get("qFrequency", 0), }) result["data"] = values_data result["structure"] = { "field_name": list_object.get("qDimensionInfo", {}).get("qFallbackTitle", ""), "total_values": list_object.get("qSize", {}).get("qcy", 0), "returned_values": len(values_data), } elif "qPivotTable" in obj_layout: pivot_table = obj_layout["qPivotTable"] result["data"] = pivot_table.get("qDataPages", []) result["structure"] = {"type": "pivot_table", "size": pivot_table.get("qSize", {})} else: result["data"] = obj_layout result["structure"] = {"type": "unknown", "raw_layout": True} return result except Exception as e: return {"error": str(e), "details": "Error in get_visualization_data method"} def get_detailed_app_metadata(self, app_id: str) -> Dict[str, Any]: """Get detailed app metadata similar to /api/v1/apps/{app_id}/data/metadata endpoint.""" try: self.connect() # Open the app app_result = self.open_doc(app_id, no_data=False) if "qReturn" not in app_result or "qHandle" not in app_result["qReturn"]: return {"error": "Failed to open app", "response": app_result} app_handle = app_result["qReturn"]["qHandle"] # Get app layout and properties using correct methods try: layout = self.send_request("GetAppLayout", [], handle=app_handle) except: layout = {} try: properties = self.send_request( "GetAppProperties", [], handle=app_handle ) except: properties = {} # Get fields information fields_result = self.get_fields(app_handle) # Get tables information using GetTablesAndKeys tables_result = self.send_request( "GetTablesAndKeys", [ {"qcx": 1000, "qcy": 1000}, # Max dimensions {"qcx": 0, "qcy": 0}, # Min dimensions 30, # Max tables True, # Include system tables False, # Include hidden fields ], handle=app_handle, ) # Process fields data fields_metadata = [] if "fields" in fields_result: for field in fields_result["fields"]: field_meta = { "name": field.get("field_name", ""), "src_tables": [field.get("table_name", "")], "is_system": field.get("is_system", False), "is_hidden": field.get("is_hidden", False), "is_semantic": field.get("is_semantic", False), "distinct_only": False, "cardinal": field.get("distinct_values", 0), "total_count": field.get("rows_count", 0), "is_locked": False, "always_one_selected": False, "is_numeric": "numeric" in field.get("tags", []), "comment": "", "tags": field.get("tags", []), "byte_size": 0, # Not available via Engine API "hash": "", # Not available via Engine API } fields_metadata.append(field_meta) # Process tables data tables_metadata = [] if "qtr" in tables_result: for table in tables_result["qtr"]: table_meta = { "name": table.get("qName", ""), "is_system": table.get("qIsSystem", False), "is_semantic": table.get("qIsSemantic", False), "is_loose": table.get("qIsLoose", False), "no_of_rows": table.get("qNoOfRows", 0), "no_of_fields": len(table.get("qFields", [])), "no_of_key_fields": len( [ f for f in table.get("qFields", []) if f.get("qIsKey", False) ] ), "comment": table.get("qComment", ""), "byte_size": 0, # Not available via Engine API } tables_metadata.append(table_meta) # Get reload metadata if available reload_meta = { "cpu_time_spent_ms": 0, # Not available via Engine API "hardware": {"logical_cores": 0, "total_memory": 0}, "peak_memory_bytes": 0, "fullReloadPeakMemoryBytes": 0, "partialReloadPeakMemoryBytes": 0, } # Calculate static byte size approximation static_byte_size = sum( table.get("byte_size", 0) for table in tables_metadata ) # Build response similar to the expected format metadata = { "reload_meta": reload_meta, "static_byte_size": static_byte_size, "fields": fields_metadata, "tables": tables_metadata, "has_section_access": False, # Would need to check script for this "tables_profiling_data": [], "is_direct_query_mode": False, "usage": "ANALYTICS", "source": "engine_api", "app_layout": layout, "app_properties": properties, } return metadata except Exception as e: return {"error": str(e), "details": "Error in get_detailed_app_metadata"} finally: self.disconnect() def get_app_details(self, app_id: str) -> Dict[str, Any]: """ Get comprehensive information about application for initial analysis. Fast overview including: - App metadata (size, dates, reload status) - Data model structure (tables, fields, types, cardinality) - Master items (only user-created measures and dimensions) - Variables (only user-created) - Object counts by type - Table relationships (key fields) Returns optimized JSON report for quick app understanding. """ try: self.connect() # Open app once and reuse connection app_result = self.open_doc(app_id, no_data=False) if "qReturn" not in app_result or "qHandle" not in app_result["qReturn"]: return {"error": "Failed to open app", "response": app_result} app_handle = app_result["qReturn"]["qHandle"] # Get app metadata and layout app_metadata = self._get_app_metadata_fast(app_handle) # Get data model structure data_model = self._get_data_model_structure(app_handle) # Get master items (only user-created) master_items = self._get_user_master_items(app_handle) # Get user variables (exclude system) user_variables = self._get_user_variables(app_handle) # Get object counts by type object_counts = self._get_object_counts(app_handle) # Get table relationships table_relationships = self._get_table_relationships(app_handle) # Build optimized response report = { "app_metadata": { "app_id": app_id, "name": app_metadata.get("title", ""), "description": app_metadata.get("description", ""), "filename": app_metadata.get("filename", ""), "size_bytes": app_metadata.get("size", 0), "size_mb": round(app_metadata.get("size", 0) / (1024 * 1024), 2), "created_date": app_metadata.get("created_date", ""), "modified_date": app_metadata.get("modified_date", ""), "last_reload_time": app_metadata.get("last_reload_time", ""), "has_script": app_metadata.get("has_script", False), "has_data": app_metadata.get("has_data", False), "is_published": app_metadata.get("published", False) }, "data_model": { "tables": data_model.get("tables", []), "total_tables": len(data_model.get("tables", [])), "total_fields": sum(len(table.get("fields", [])) for table in data_model.get("tables", [])), "table_relationships": table_relationships }, "master_items": { "measures": master_items.get("measures", []), "dimensions": master_items.get("dimensions", []), "total_measures": len(master_items.get("measures", [])), "total_dimensions": len(master_items.get("dimensions", [])) }, "variables": { "user_variables": user_variables, "total_variables": len(user_variables) }, "object_counts": object_counts, "reload_info": app_metadata.get("reload_info", {}), "summary": { "analysis_type": "quick_overview", "analysis_timestamp": datetime.now().isoformat() } } return report except Exception as e: return {"error": str(e), "details": "Error in get_app_details method"} finally: self.disconnect() def _get_app_metadata_fast(self, app_handle: int) -> Dict[str, Any]: """Get basic app metadata without heavy analysis.""" try: # Get app layout layout_response = self.send_request("GetAppLayout", [], handle=app_handle) layout = layout_response.get("qLayout", {}) # Get app properties properties_response = self.send_request("GetAppProperties", [], handle=app_handle) properties = properties_response.get("qProperties", {}) return { "title": layout.get("qTitle", ""), "filename": layout.get("qFileName", ""), "description": properties.get("qMetaDef", {}).get("description", ""), "size": layout.get("qStaticByteSize", 0), "created_date": layout.get("createdDate", ""), "modified_date": layout.get("modifiedDate", ""), "last_reload_time": layout.get("qLastReloadTime", ""), "has_script": layout.get("qHasScript", False), "has_data": layout.get("qHasData", False), "published": layout.get("published", False), "reload_info": { "last_execution_time": layout.get("qLastReloadTime", ""), "is_partial_reload": layout.get("qIsPartialReload", False), "has_data": layout.get("qHasData", False) } } except Exception as e: return {"error": str(e)} def _get_data_model_structure(self, app_handle: int) -> Dict[str, Any]: """Get tables and fields structure without usage analysis.""" try: # Get tables structure using GetTablesAndKeys tables_result = self.send_request( "GetTablesAndKeys", [ {"qcx": 1000, "qcy": 1000}, # Max dimensions {"qcx": 0, "qcy": 0}, # Min dimensions 50, # Max tables False, # Include system tables False, # Include hidden fields ], handle=app_handle, ) tables = [] for table in tables_result.get("qtr", []): table_name = table.get("qName", "") table_fields = [] for field in table.get("qFields", []): field_info = { "name": field.get("qName", ""), "data_type": self._determine_data_type(field.get("qTags", [])), "total_rows": field.get("qnRows", 0), "distinct_values": field.get("qnTotalDistinctValues", 0), "present_distinct_values": field.get("qnPresentDistinctValues", 0), "completeness_pct": round( (field.get("qnNonNulls", 0) / max(field.get("qnRows", 1), 1)) * 100, 1 ), "is_key": field.get("qIsKey", False), "key_type": field.get("qKeyType", "") } table_fields.append(field_info) table_info = { "name": table_name, "total_rows": table.get("qNoOfRows", 0), "field_count": len(table_fields), "fields": table_fields, "is_system": table.get("qIsSystem", False), "is_semantic": table.get("qIsSemantic", False) } tables.append(table_info) return {"tables": tables} except Exception as e: return {"error": str(e), "tables": []} def _determine_data_type(self, tags: List[str]) -> str: """Determine data type from field tags.""" if "$numeric" in tags: if "$integer" in tags: return "integer" else: return "numeric" elif "$text" in tags: return "text" elif "$date" in tags: return "date" elif "$timestamp" in tags: return "timestamp" else: return "unknown" def _get_user_master_items(self, app_handle: int) -> Dict[str, Any]: """Get only user-created master items (exclude system).""" try: # Get master measures measures = self._get_master_measures(app_handle) user_measures = [] for measure in measures: # Filter out system measures if not measure.get("qMeta", {}).get("qIsHidden", False): user_measures.append({ "name": measure.get("qMeta", {}).get("title", ""), "description": measure.get("qMeta", {}).get("description", ""), "definition": measure.get("qMeasure", {}).get("qDef", ""), "created_date": measure.get("qMeta", {}).get("createdDate", ""), "modified_date": measure.get("qMeta", {}).get("modifiedDate", ""), "owner": measure.get("qMeta", {}).get("owner", {}).get("name", "") }) # Get master dimensions dimensions = self._get_master_dimensions(app_handle) user_dimensions = [] for dimension in dimensions: # Filter out system dimensions if not dimension.get("qMeta", {}).get("qIsHidden", False): user_dimensions.append({ "name": dimension.get("qMeta", {}).get("title", ""), "description": dimension.get("qMeta", {}).get("description", ""), "field_definitions": dimension.get("qDim", {}).get("qFieldDefs", []), "created_date": dimension.get("qMeta", {}).get("createdDate", ""), "modified_date": dimension.get("qMeta", {}).get("modifiedDate", ""), "owner": dimension.get("qMeta", {}).get("owner", {}).get("name", "") }) return { "measures": user_measures, "dimensions": user_dimensions } except Exception as e: return {"error": str(e), "measures": [], "dimensions": []} def _get_user_variables(self, app_handle: int) -> List[Dict[str, Any]]: """Get only user-created variables (exclude system).""" try: # Create VariableList object variable_list_def = { "qInfo": {"qType": "VariableList"}, "qVariableListDef": { "qType": "variable", "qShowReserved": False, # Exclude system variables "qShowConfig": False, "qData": {"tags": "/tags"} } } variable_list_response = self.send_request("CreateSessionObject", [variable_list_def], handle=app_handle) if "qReturn" not in variable_list_response: return [] variable_list_handle = variable_list_response["qReturn"]["qHandle"] layout_response = self.send_request("GetLayout", [], handle=variable_list_handle) variables = layout_response.get("qLayout", {}).get("qVariableList", {}).get("qItems", []) user_variables = [] for variable in variables: # Additional filter for user variables only if not variable.get("qIsReserved", False) and not variable.get("qIsConfig", False): definition = variable.get("qDefinition", "") user_variables.append({ "name": variable.get("qName", ""), "text_value": definition, "is_script_created": variable.get("qIsScriptCreated", False) }) return user_variables except Exception as e: return [] def _get_object_counts(self, app_handle: int) -> Dict[str, int]: """Get count of objects by type.""" try: # Get all app objects all_infos = self.send_request("GetAllInfos", [], handle=app_handle) object_counts = {} for info in all_infos.get("qInfos", []): obj_type = info.get("qType", "unknown") object_counts[obj_type] = object_counts.get(obj_type, 0) + 1 # Group similar types for better readability grouped_counts = { "sheets": object_counts.get("sheet", 0), "charts": ( object_counts.get("barchart", 0) + object_counts.get("linechart", 0) + object_counts.get("piechart", 0) + object_counts.get("combochart", 0) + object_counts.get("scatterplot", 0) ), "tables": object_counts.get("table", 0), "kpis": object_counts.get("kpi", 0), "filters": ( object_counts.get("listbox", 0) + object_counts.get("filterpane", 0) ), "text_objects": object_counts.get("text-image", 0), "other": sum(v for k, v in object_counts.items() if k not in ["sheet", "barchart", "linechart", "piechart", "combochart", "scatterplot", "table", "kpi", "listbox", "filterpane", "text-image"]) } # Add total count grouped_counts["total_objects"] = sum(grouped_counts.values()) return grouped_counts except Exception as e: return {"error": str(e)} def _get_table_relationships(self, app_handle: int) -> List[Dict[str, Any]]: """Get relationships between tables based on key fields.""" try: # Get tables with key information tables_result = self.send_request( "GetTablesAndKeys", [ {"qcx": 1000, "qcy": 1000}, {"qcx": 0, "qcy": 0}, 50, False, False ], handle=app_handle, ) relationships = [] tables = tables_result.get("qtr", []) # Find relationships based on field names and key types for i, table1 in enumerate(tables): table1_name = table1.get("qName", "") table1_keys = [f for f in table1.get("qFields", []) if f.get("qIsKey", False)] for j, table2 in enumerate(tables[i+1:], i+1): table2_name = table2.get("qName", "") table2_keys = [f for f in table2.get("qFields", []) if f.get("qIsKey", False)] # Find common key fields common_keys = [] for key1 in table1_keys: for key2 in table2_keys: if key1.get("qName") == key2.get("qName"): common_keys.append({ "field_name": key1.get("qName"), "key_type": key1.get("qKeyType", "") }) if common_keys: relationships.append({ "table1": table1_name, "table2": table2_name, "relationship_type": "key_match", "common_fields": common_keys }) return relationships except Exception as e: return []

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bintocher/qlik-sense-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server