"""Qlik Sense Engine API client."""
import json
import websocket
import ssl
from typing import Dict, List, Any, Optional, Union
from datetime import datetime
from .config import (
QlikSenseConfig,
DEFAULT_WS_TIMEOUT,
DEFAULT_WS_RETRIES,
DEFAULT_HYPERCUBE_MAX_ROWS,
MAX_TABLES_AND_KEYS_DIM,
MAX_TABLES,
)
from .exceptions import QlikConnectionError, QlikEngineError
import logging
import os
logger = logging.getLogger(__name__)
class QlikEngineAPI:
"""Client for Qlik Sense Engine API using WebSocket."""
def __init__(self, config: QlikSenseConfig):
self.config = config
self.ws = None
self.request_id = 0
# Timeouts / retries from env
ws_timeout_env = os.getenv("QLIK_WS_TIMEOUT")
try:
self.ws_timeout_seconds = float(ws_timeout_env) if ws_timeout_env else DEFAULT_WS_TIMEOUT
except ValueError:
self.ws_timeout_seconds = DEFAULT_WS_TIMEOUT
retries_env = os.getenv("QLIK_WS_RETRIES")
try:
self.ws_retries = int(retries_env) if retries_env else DEFAULT_WS_RETRIES
except ValueError:
self.ws_retries = DEFAULT_WS_RETRIES
def _get_next_request_id(self) -> int:
"""Get next request ID."""
self.request_id += 1
return self.request_id
def connect(self, app_id: Optional[str] = None) -> None:
"""Connect to Engine API via WebSocket."""
# Try different WebSocket endpoints
server_host = self.config.server_url.replace("https://", "").replace(
"http://", ""
)
# Order and count of endpoints controlled by retries setting
endpoints_all = [
f"wss://{server_host}:{self.config.engine_port}/app/engineData",
f"wss://{server_host}:{self.config.engine_port}/app",
f"ws://{server_host}:{self.config.engine_port}/app/engineData",
f"ws://{server_host}:{self.config.engine_port}/app",
]
endpoints_to_try = endpoints_all[: max(1, min(self.ws_retries, len(endpoints_all)))]
# Setup SSL context
ssl_context = ssl.create_default_context()
if not self.config.verify_ssl:
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
if self.config.client_cert_path and self.config.client_key_path:
ssl_context.load_cert_chain(
self.config.client_cert_path, self.config.client_key_path
)
if self.config.ca_cert_path:
ssl_context.load_verify_locations(self.config.ca_cert_path)
# Headers for authentication
headers = [
f"X-Qlik-User: UserDirectory={self.config.user_directory}; UserId={self.config.user_id}"
]
last_error = None
for url in endpoints_to_try:
try:
if url.startswith("wss://"):
self.ws = websocket.create_connection(
url, sslopt={"context": ssl_context}, header=headers, timeout=self.ws_timeout_seconds
)
else:
self.ws = websocket.create_connection(
url, header=headers, timeout=self.ws_timeout_seconds
)
# initial recv to establish session
self.ws.recv()
return # Success
except Exception as e:
last_error = e
if self.ws:
try:
self.ws.close()
except Exception:
pass
self.ws = None
continue
raise ConnectionError(
f"Failed to connect to Engine API. Last error: {str(last_error)}"
)
def disconnect(self) -> None:
"""Disconnect from Engine API."""
if self.ws:
self.ws.close()
self.ws = None
def send_request(
self, method: str, params: List[Any] = None, handle: int = -1
) -> Dict[str, Any]:
"""
Send JSON-RPC 2.0 request to Qlik Engine API and return response.
Args:
method: Engine API method name
params: Method parameters list
handle: Object handle for scoped operations (-1 for global)
Returns:
Response dictionary from Engine API
"""
if not self.ws:
raise ConnectionError("Not connected to Engine API")
request = {
"jsonrpc": "2.0",
"id": self._get_next_request_id(),
"handle": handle,
"method": method,
"params": params or [],
}
self.ws.send(json.dumps(request))
while True:
data = self.ws.recv()
if "result" in data or "error" in data:
break
response = json.loads(data)
if "error" in response:
raise Exception(f"Engine API error: {response['error']}")
return response.get("result", {})
def get_doc_list(self) -> List[Dict[str, Any]]:
"""Get list of available documents."""
try:
# Connect to global engine first
result = self.send_request("GetDocList")
doc_list = result.get("qDocList", [])
# Ensure we return a list even if empty
if isinstance(doc_list, list):
return doc_list
else:
return []
except Exception as e:
# Return empty list on error for compatibility
return []
def open_doc(self, app_id: str, no_data: bool = True) -> Dict[str, Any]:
"""
Open Qlik Sense application document.
Args:
app_id: Application ID to open
no_data: If True, open without loading data (faster for metadata operations)
Returns:
Response with document handle
"""
try:
if no_data:
return self.send_request("OpenDoc", [app_id, "", "", "", True])
else:
return self.send_request("OpenDoc", [app_id])
except Exception as e:
# If app is already open, try to get existing handle
if "already open" in str(e).lower():
try:
# Try to get the already open document
doc_list = self.get_doc_list()
for doc in doc_list:
if doc.get("qDocId") == app_id:
# Return mock response with existing handle
return {
"qReturn": {
"qHandle": doc.get("qHandle", -1),
"qGenericId": app_id
}
}
except Exception:
pass
raise e
def close_doc(self, app_handle: int) -> bool:
"""Close application document."""
try:
result = self.send_request("CloseDoc", [], handle=app_handle)
return result.get("qReturn", {}).get("qSuccess", False)
except Exception:
return False
def get_active_doc(self) -> Dict[str, Any]:
"""Get currently active document if any."""
try:
result = self.send_request("GetActiveDoc")
return result
except Exception:
return {}
def open_doc_safe(self, app_id: str, no_data: bool = True) -> Dict[str, Any]:
"""
Safely open document with better error handling for already open apps.
Args:
app_id: Application ID to open
no_data: If True, open without loading data
Returns:
Response with document handle
"""
try:
# First try to open normally
if no_data:
return self.send_request("OpenDoc", [app_id, "", "", "", True])
else:
return self.send_request("OpenDoc", [app_id])
except Exception as e:
error_msg = str(e)
# Handle "already open" errors specially
if "already open" in error_msg.lower() or "app already open" in error_msg.lower():
try:
# Try to get active document
active_doc = self.get_active_doc()
if active_doc and "qReturn" in active_doc:
return active_doc
# Try to find in document list
doc_list = self.get_doc_list()
for doc in doc_list:
if doc.get("qDocId") == app_id or doc.get("qDocName") == app_id:
return {
"qReturn": {
"qHandle": doc.get("qHandle", -1),
"qGenericId": app_id
}
}
# If still not found, re-raise original error
raise e
except Exception:
# If all recovery attempts fail, re-raise original error
raise e
else:
# For other errors, just re-raise
raise e
def get_app_properties(self, app_handle: int) -> Dict[str, Any]:
"""Get app properties."""
return self.send_request("GetAppProperties", handle=app_handle)
def get_script(self, app_handle: int) -> str:
"""Get load script."""
result = self.send_request("GetScript", [], handle=app_handle)
return result.get("qScript", "")
def set_script(self, app_handle: int, script: str) -> bool:
"""Set load script."""
result = self.send_request("SetScript", [script], handle=app_handle)
return result.get("qReturn", {}).get("qSuccess", False)
def do_save(self, app_handle: int, file_name: Optional[str] = None) -> bool:
"""Save app."""
params = {}
if file_name:
params["qFileName"] = file_name
result = self.send_request("DoSave", params, handle=app_handle)
return result.get("qReturn", {}).get("qSuccess", False)
def get_objects(
self, app_handle: int, object_type: Optional[str] = None
) -> List[Dict[str, Any]]:
"""Get app objects."""
# Build parameters based on whether specific object_type is requested
if object_type:
# Get specific object type
params = {
"qOptions": {
"qTypes": [object_type],
"qIncludeSessionObjects": True,
"qData": {},
}
}
else:
# Get ALL objects - don't specify qTypes to get everything including extensions
params = {
"qOptions": {
"qIncludeSessionObjects": True,
"qData": {},
}
}
# Debug logging
logger.debug(f"get_objects params: {params}")
result = self.send_request("GetObjects", params, handle=app_handle)
# Debug result
if "error" in str(result) or "Missing Types" in str(result):
logger.debug(f"get_objects error result: {result}")
return result.get("qList", {}).get("qItems", [])
def get_sheets(self, app_handle: int) -> List[Dict[str, Any]]:
"""Get app sheets."""
try:
sheet_list_def = {
"qInfo": {"qType": "SheetList"},
"qAppObjectListDef": {
"qType": "sheet",
"qData": {
"title": "/qMetaDef/title",
"description": "/qMetaDef/description",
"thumbnail": "/thumbnail",
"cells": "/cells",
"rank": "/rank",
"columns": "/columns",
"rows": "/rows"
}
}
}
create_result = self.send_request("CreateSessionObject", [sheet_list_def], handle=app_handle)
if "qReturn" not in create_result or "qHandle" not in create_result["qReturn"]:
logger.warning(f"Failed to create SheetList object: {create_result}")
return []
sheet_list_handle = create_result["qReturn"]["qHandle"]
layout_result = self.send_request("GetLayout", [], handle=sheet_list_handle)
if "qLayout" not in layout_result or "qAppObjectList" not in layout_result["qLayout"]:
logger.warning(f"No sheet list in layout: {layout_result}")
return []
sheets = layout_result["qLayout"]["qAppObjectList"]["qItems"]
logger.info(f"Found {len(sheets)} sheets")
return sheets
except Exception as e:
logger.error(f"get_sheets exception: {str(e)}")
return []
def get_sheet_objects(self, app_handle: int, sheet_id: str) -> List[Dict[str, Any]]:
"""Get objects on a specific sheet."""
try:
# First get the sheet object
sheet_params = {"qId": sheet_id}
sheet_result = self.send_request(
"GetObject", sheet_params, handle=app_handle
)
if not sheet_result or "qReturn" not in sheet_result:
return {"error": "Could not get sheet object", "sheet_id": sheet_id}
sheet_handle = sheet_result["qReturn"]["qHandle"]
# Get sheet layout to find child objects
layout_result = self.send_request("GetLayout", {}, handle=sheet_handle)
if not layout_result or "qLayout" not in layout_result:
return {"error": "Could not get sheet layout", "sheet_id": sheet_id}
# Extract child objects from layout
layout = layout_result["qLayout"]
child_objects = []
# Look for cells or children in the layout
if "qChildList" in layout:
child_objects = layout["qChildList"]["qItems"]
elif "cells" in layout:
child_objects = layout["cells"]
elif "qChildren" in layout:
child_objects = layout["qChildren"]
return child_objects
except Exception as e:
return {
"error": str(e),
"details": f"Error getting objects for sheet {sheet_id}",
}
def get_sheets_with_objects(self, app_id: str) -> Dict[str, Any]:
"""Get sheets and their objects with detailed field usage analysis."""
try:
self.connect()
# Open the app
app_result = self.open_doc(app_id, no_data=False)
if "qReturn" not in app_result or "qHandle" not in app_result["qReturn"]:
return {"error": "Failed to open app", "response": app_result}
app_handle = app_result["qReturn"]["qHandle"]
# Get sheets using correct API sequence
sheets = self.get_sheets(app_handle)
logger.debug(f"get_sheets returned {len(sheets)} sheets")
if not sheets:
return {
"sheets": [],
"total_sheets": 0,
"field_usage": {},
"debug_info": {
"sheets_from_api": 0,
"error_reason": "get_sheets returned empty list"
}
}
detailed_sheets = []
field_usage_map = {}
for sheet in sheets:
if not isinstance(sheet, dict) or "qInfo" not in sheet:
continue
sheet_id = sheet["qInfo"]["qId"]
sheet_title = sheet.get("qMeta", {}).get("title", "")
logger.info(f"Processing sheet {sheet_id}: {sheet_title}")
sheet_objects = self._get_sheet_objects_detailed(app_handle, sheet_id)
for obj in sheet_objects:
if isinstance(obj, dict) and "fields_used" in obj:
for field_name in obj["fields_used"]:
if field_name not in field_usage_map:
field_usage_map[field_name] = {"objects": [], "sheets": []}
field_usage_map[field_name]["objects"].append({
"object_id": obj.get("object_id", ""),
"object_type": obj.get("object_type", ""),
"object_title": obj.get("object_title", ""),
"sheet_id": sheet_id,
"sheet_title": sheet_title
})
sheet_already_added = any(
s["sheet_id"] == sheet_id
for s in field_usage_map[field_name]["sheets"]
)
if not sheet_already_added:
field_usage_map[field_name]["sheets"].append({
"sheet_id": sheet_id,
"sheet_title": sheet_title
})
sheet_info = {
"sheet_info": sheet,
"objects": sheet_objects,
"objects_count": len(sheet_objects)
}
detailed_sheets.append(sheet_info)
return {
"sheets": detailed_sheets,
"total_sheets": len(detailed_sheets),
"field_usage": field_usage_map,
"debug_info": {
"sheets_from_api": len(sheets),
"processed_sheets": len(detailed_sheets),
"fields_with_usage": len([k for k, v in field_usage_map.items() if v["objects"]])
}
}
except Exception as e:
return {
"error": str(e),
"details": "Error in get_sheets_with_objects method",
}
def _get_sheet_objects_detailed(self, app_handle: int, sheet_id: str) -> List[Dict[str, Any]]:
"""Get detailed information about objects on a sheet."""
try:
sheet_result = self.send_request("GetObject", {"qId": sheet_id}, handle=app_handle)
if "qReturn" not in sheet_result or "qHandle" not in sheet_result["qReturn"]:
logger.warning(f"Failed to get sheet object {sheet_id}: {sheet_result}")
return []
sheet_handle = sheet_result["qReturn"]["qHandle"]
sheet_layout = self.send_request("GetLayout", [], handle=sheet_handle)
if "qLayout" not in sheet_layout or "qChildList" not in sheet_layout["qLayout"]:
logger.warning(f"No child objects in sheet {sheet_id}")
return []
child_objects = sheet_layout["qLayout"]["qChildList"]["qItems"]
detailed_objects = []
for child_obj in child_objects:
obj_id = child_obj.get("qInfo", {}).get("qId", "")
obj_type = child_obj.get("qInfo", {}).get("qType", "")
if not obj_id:
continue
try:
obj_result = self.send_request("GetObject", {"qId": obj_id}, handle=app_handle)
if "qReturn" not in obj_result or "qHandle" not in obj_result["qReturn"]:
continue
obj_handle = obj_result["qReturn"]["qHandle"]
obj_layout = self.send_request("GetLayout", [], handle=obj_handle)
if "qLayout" not in obj_layout:
continue
fields_used = self._extract_fields_from_object(obj_layout["qLayout"])
detailed_obj = {
"object_id": obj_id,
"object_type": obj_type,
"object_title": obj_layout["qLayout"].get("title", ""),
"object_subtitle": obj_layout["qLayout"].get("subtitle", ""),
"fields_used": fields_used,
"basic_info": child_obj,
"detailed_layout": obj_layout["qLayout"]
}
detailed_objects.append(detailed_obj)
logger.info(f"Processed object {obj_id} ({obj_type}) with {len(fields_used)} fields")
except Exception as obj_error:
logger.warning(f"Error processing object {obj_id}: {obj_error}")
continue
return detailed_objects
except Exception as e:
logger.error(f"_get_sheet_objects_detailed error: {str(e)}")
return []
def _extract_fields_from_object(self, obj_layout: Dict[str, Any]) -> List[str]:
"""Extract field names used in an object layout."""
fields = set()
try:
if "qHyperCube" in obj_layout:
hypercube = obj_layout["qHyperCube"]
for dim_info in hypercube.get("qDimensionInfo", []):
field_defs = dim_info.get("qGroupFieldDefs", [])
for field_def in field_defs:
field_name = self._extract_field_name_from_expression(field_def)
if field_name:
fields.add(field_name)
for measure_info in hypercube.get("qMeasureInfo", []):
measure_def = measure_info.get("qDef", "")
extracted_fields = self._extract_fields_from_expression(measure_def)
fields.update(extracted_fields)
if "qListObject" in obj_layout:
list_obj = obj_layout["qListObject"]
for dim_info in list_obj.get("qDimensionInfo", []):
field_defs = dim_info.get("qGroupFieldDefs", [])
for field_def in field_defs:
field_name = self._extract_field_name_from_expression(field_def)
if field_name:
fields.add(field_name)
if "qChildList" in obj_layout:
for child in obj_layout["qChildList"].get("qItems", []):
pass
except Exception as e:
logger.warning(f"Error extracting fields from object: {e}")
return list(fields)
def _extract_field_name_from_expression(self, expression: str) -> Optional[str]:
"""Extract field name from a simple field expression."""
if not expression:
return None
expression = expression.strip()
if expression.startswith('[') and expression.endswith(']') and expression.count('[') == 1:
return expression[1:-1]
if ' ' not in expression and '(' not in expression and not any(op in expression for op in ['=', '+', '-', '*', '/']):
return expression
return None
def _extract_fields_from_expression(self, expression: str) -> List[str]:
"""Extract field names from a complex expression."""
import re
fields = []
if not expression:
return fields
bracket_fields = re.findall(r'\[([^\]]+)\]', expression)
fields.extend(bracket_fields)
return list(set(fields))
def get_fields(self, app_handle: int) -> List[Dict[str, Any]]:
"""Get app fields using GetTablesAndKeys method."""
try:
# Use correct GetTablesAndKeys method as in qsea.py
result = self.send_request(
"GetTablesAndKeys",
[
{"qcx": 1000, "qcy": 1000}, # Max dimensions
{"qcx": 0, "qcy": 0}, # Min dimensions
30, # Max tables
True, # Include system tables
False, # Include hidden fields
],
handle=app_handle,
)
fields_info = []
if "qtr" in result:
for table in result["qtr"]:
table_name = table.get("qName", "Unknown")
if "qFields" in table:
for field in table["qFields"]:
field_info = {
"field_name": field.get("qName", ""),
"table_name": table_name,
"data_type": field.get("qType", ""),
"is_key": field.get("qIsKey", False),
"is_system": field.get("qIsSystem", False),
"is_hidden": field.get("qIsHidden", False),
"is_semantic": field.get("qIsSemantic", False),
"distinct_values": field.get(
"qnTotalDistinctValues", 0
),
"present_distinct_values": field.get(
"qnPresentDistinctValues", 0
),
"rows_count": field.get("qnRows", 0),
"subset_ratio": field.get("qSubsetRatio", 0),
"key_type": field.get("qKeyType", ""),
"tags": field.get("qTags", []),
}
fields_info.append(field_info)
return {
"fields": fields_info,
"tables_count": len(result.get("qtr", [])),
"total_fields": len(fields_info),
}
except Exception as e:
return {"error": str(e), "details": "Error in get_fields method"}
def get_tables(self, app_handle: int) -> List[Dict[str, Any]]:
"""Get app tables."""
result = self.send_request("GetTablesList", handle=app_handle)
return result.get("qtr", [])
def create_session_object(
self, app_handle: int, obj_def: Dict[str, Any]
) -> Dict[str, Any]:
"""Create session object."""
return self.send_request(
"CreateSessionObject", {"qProp": obj_def}, handle=app_handle
)
def get_object(self, app_handle: int, object_id: str) -> Dict[str, Any]:
"""Get object by ID."""
return self.send_request("GetObject", {"qId": object_id}, handle=app_handle)
def evaluate_expression(self, app_handle: int, expression: str) -> Any:
"""Evaluate expression."""
result = self.send_request(
"Evaluate", {"qExpression": expression}, handle=app_handle
)
return result.get("qReturn", {})
def select_in_field(
self, app_handle: int, field_name: str, values: List[str], toggle: bool = False
) -> bool:
"""Select values in field."""
params = {"qFieldName": field_name, "qValues": values, "qToggleMode": toggle}
result = self.send_request("SelectInField", params, handle=app_handle)
return result.get("qReturn", False)
def clear_selections(self, app_handle: int, locked_also: bool = False) -> bool:
"""Clear all selections."""
params = {"qLockedAlso": locked_also}
result = self.send_request("ClearAll", params, handle=app_handle)
return result.get("qReturn", False)
def get_current_selections(self, app_handle: int) -> List[Dict[str, Any]]:
"""Get current selections."""
result = self.send_request("GetCurrentSelections", handle=app_handle)
return result.get("qSelections", [])
def get_data_model(self, app_handle: int) -> Dict[str, Any]:
"""Get complete data model with tables and associations."""
try:
# Use GetAllInfos to get basic structure information
all_infos = self.send_request("GetAllInfos", [], handle=app_handle)
# Analyze the objects to understand data structure
sheets = []
visualizations = []
measures = []
dimensions = []
for info in all_infos.get("qInfos", []):
obj_type = info.get("qType", "")
obj_id = info.get("qId", "")
if obj_type == "sheet":
sheets.append({"id": obj_id, "type": obj_type})
elif obj_type in [
"table",
"barchart",
"linechart",
"piechart",
"combochart",
"kpi",
"listbox",
]:
visualizations.append({"id": obj_id, "type": obj_type})
elif obj_type == "measure":
measures.append({"id": obj_id, "type": obj_type})
elif obj_type == "dimension":
dimensions.append({"id": obj_id, "type": obj_type})
return {
"app_structure": {
"total_objects": len(all_infos.get("qInfos", [])),
"sheets": sheets,
"visualizations": visualizations,
"measures": measures,
"dimensions": dimensions,
},
"raw_info": all_infos,
}
except Exception as e:
return {"error": str(e)}
def get_field_description(self, app_handle: int, field_name: str) -> Dict[str, Any]:
"""Get detailed field information including values."""
# Use correct structure as in pyqlikengine
params = [{"qFieldName": field_name, "qStateName": "$"}]
result = self.send_request("GetField", params, handle=app_handle)
return result
def create_hypercube(
self,
app_handle: int,
dimensions: List[Dict[str, Any]] = None,
measures: List[Dict[str, Any]] = None,
max_rows: int = 1000,
) -> Dict[str, Any]:
"""Create hypercube for data extraction with proper structure."""
try:
# Handle empty dimensions/measures
if dimensions is None:
dimensions = []
if measures is None:
measures = []
# Convert old format (list of strings) to new format (list of dicts) for backward compatibility
converted_dimensions = []
for dim in dimensions:
if isinstance(dim, str):
# Old format - just field name
converted_dimensions.append({
"field": dim,
"sort_by": {
"qSortByNumeric": 0,
"qSortByAscii": 1, # Default: ASCII ascending
"qSortByExpression": 0,
"qExpression": ""
}
})
else:
# New format - dict with field and sort options
# Set defaults if not specified
if "sort_by" not in dim:
dim["sort_by"] = {
"qSortByNumeric": 0,
"qSortByAscii": 1, # Default: ASCII ascending
"qSortByExpression": 0,
"qExpression": ""
}
converted_dimensions.append(dim)
converted_measures = []
for measure in measures:
if isinstance(measure, str):
# Old format - just expression
converted_measures.append({
"expression": measure,
"sort_by": {
"qSortByNumeric": -1 # Default: numeric descending
}
})
else:
# New format - dict with expression and sort options
# Set defaults if not specified
if "sort_by" not in measure:
measure["sort_by"] = {
"qSortByNumeric": -1 # Default: numeric descending
}
converted_measures.append(measure)
# Create correct hypercube structure
hypercube_def = {
"qDimensions": [
{
"qDef": {
"qFieldDefs": [dim["field"]],
"qSortCriterias": [
{
"qSortByState": 0,
"qSortByFrequency": 0,
"qSortByNumeric": dim["sort_by"].get("qSortByNumeric", 0),
"qSortByAscii": dim["sort_by"].get("qSortByAscii", 1),
"qSortByLoadOrder": 0,
"qSortByExpression": dim["sort_by"].get("qSortByExpression", 0),
"qExpression": {"qv": dim["sort_by"].get("qExpression", "")},
}
],
},
"qNullSuppression": False,
"qIncludeElemValue": True,
}
for dim in converted_dimensions
],
"qMeasures": [
{
"qDef": {"qDef": measure["expression"], "qLabel": measure.get("label", f"Measure_{i}")},
"qSortBy": measure["sort_by"],
}
for i, measure in enumerate(converted_measures)
],
"qInitialDataFetch": [
{
"qTop": 0,
"qLeft": 0,
"qHeight": max_rows,
"qWidth": len(converted_dimensions) + len(converted_measures),
}
],
"qSuppressZero": False,
"qSuppressMissing": False,
"qMode": "S",
"qInterColumnSortOrder": list(range(len(converted_dimensions) + len(converted_measures))),
}
obj_def = {
"qInfo": {
"qId": f"hypercube-{len(converted_dimensions)}d-{len(converted_measures)}m",
"qType": "HyperCube",
},
"qHyperCubeDef": hypercube_def,
}
result = self.send_request(
"CreateSessionObject", [obj_def], handle=app_handle
)
if "qReturn" not in result or "qHandle" not in result["qReturn"]:
return {"error": "Failed to create hypercube", "response": result}
cube_handle = result["qReturn"]["qHandle"]
# Get layout with data
layout = self.send_request("GetLayout", [], handle=cube_handle)
if "qLayout" not in layout or "qHyperCube" not in layout["qLayout"]:
return {"error": "No hypercube in layout", "layout": layout}
hypercube = layout["qLayout"]["qHyperCube"]
return {
"hypercube_handle": cube_handle,
"hypercube_data": hypercube,
"dimensions": converted_dimensions,
"measures": converted_measures,
"total_rows": hypercube.get("qSize", {}).get("qcy", 0),
"total_columns": hypercube.get("qSize", {}).get("qcx", 0),
}
except Exception as e:
return {"error": str(e), "details": "Error in create_hypercube method"}
def get_hypercube_data(
self,
hypercube_handle: int,
page_top: int = 0,
page_height: int = 1000,
page_left: int = 0,
page_width: int = 50,
) -> Dict[str, Any]:
"""Get data from existing hypercube with pagination."""
try:
# Use correct GetHyperCubeData method
params = [
{
"qPath": "/qHyperCubeDef",
"qPages": [
{
"qTop": page_top,
"qLeft": page_left,
"qHeight": page_height,
"qWidth": page_width,
}
],
}
]
result = self.send_request(
"GetHyperCubeData", params, handle=hypercube_handle
)
return result
except Exception as e:
return {"error": str(e), "details": "Error in get_hypercube_data method"}
def get_table_data(
self, app_handle: int, table_name: str = None, max_rows: int = 1000
) -> Dict[str, Any]:
"""Get data from a specific table by creating hypercube with all table fields."""
try:
if not table_name:
# Get list of available tables
fields_result = self.get_fields(app_handle)
if "error" in fields_result:
return fields_result
tables = {}
for field in fields_result.get("fields", []):
table = field.get("table_name", "Unknown")
if table not in tables:
tables[table] = []
tables[table].append(field["field_name"])
return {
"message": "Please specify table_name parameter",
"available_tables": tables,
"note": "Use one of the available table names to get data",
}
# Get fields for specified table
fields_result = self.get_fields(app_handle)
if "error" in fields_result:
return fields_result
table_fields = []
for field in fields_result.get("fields", []):
if field.get("table_name") == table_name:
table_fields.append(field["field_name"])
if not table_fields:
return {"error": f"Table '{table_name}' not found or has no fields"}
# Limit number of fields to avoid too wide tables
max_fields = 20
if len(table_fields) > max_fields:
table_fields = table_fields[:max_fields]
truncated = True
else:
truncated = False
# Create hypercube with all table fields as dimensions
hypercube_def = {
"qDimensions": [
{
"qDef": {
"qFieldDefs": [field],
"qSortCriterias": [
{
"qSortByState": 0,
"qSortByFrequency": 0,
"qSortByNumeric": 1,
"qSortByAscii": 1,
"qSortByLoadOrder": 1,
"qSortByExpression": 0,
"qExpression": {"qv": ""},
}
],
},
"qNullSuppression": False,
"qIncludeElemValue": True,
}
for field in table_fields
],
"qMeasures": [],
"qInitialDataFetch": [
{
"qTop": 0,
"qLeft": 0,
"qHeight": max_rows,
"qWidth": len(table_fields),
}
],
"qSuppressZero": False,
"qSuppressMissing": False,
"qMode": "S",
}
obj_def = {
"qInfo": {"qId": f"table-data-{table_name}", "qType": "HyperCube"},
"qHyperCubeDef": hypercube_def,
}
result = self.send_request(
"CreateSessionObject", [obj_def], handle=app_handle
)
if "qReturn" not in result or "qHandle" not in result["qReturn"]:
return {
"error": "Failed to create hypercube for table data",
"response": result,
}
cube_handle = result["qReturn"]["qHandle"]
layout = self.send_request("GetLayout", [], handle=cube_handle)
if "qLayout" not in layout or "qHyperCube" not in layout["qLayout"]:
try:
self.send_request(
"DestroySessionObject",
[f"table-data-{table_name}"],
handle=app_handle,
)
except Exception:
pass
return {"error": "No hypercube in layout", "layout": layout}
hypercube = layout["qLayout"]["qHyperCube"]
# Process data into convenient format
table_data = []
headers = table_fields
for page in hypercube.get("qDataPages", []):
for row in page.get("qMatrix", []):
row_data = {}
for i, cell in enumerate(row):
if i < len(headers):
row_data[headers[i]] = {
"text": cell.get("qText", ""),
"numeric": (
cell.get("qNum", None)
if cell.get("qNum") != "NaN"
else None
),
"is_numeric": cell.get("qIsNumeric", False),
"state": cell.get("qState", "O"),
}
table_data.append(row_data)
result_data = {
"table_name": table_name,
"headers": headers,
"data": table_data,
"total_rows": hypercube.get("qSize", {}).get("qcy", 0),
"returned_rows": len(table_data),
"total_columns": len(headers),
"truncated_fields": truncated,
"dimension_info": hypercube.get("qDimensionInfo", []),
}
# Cleanup created session object
try:
self.send_request(
"DestroySessionObject",
[f"table-data-{table_name}"],
handle=app_handle,
)
except Exception as cleanup_error:
result_data["cleanup_warning"] = str(cleanup_error)
return result_data
except Exception as e:
return {"error": str(e), "details": "Error in get_table_data method"}
def get_field_values(
self,
app_handle: int,
field_name: str,
max_values: int = 100,
include_frequency: bool = True,
) -> Dict[str, Any]:
"""Get field values with frequency information using ListObject."""
try:
# Use correct structure
list_def = {
"qInfo": {"qId": f"field-values-{field_name}", "qType": "ListObject"},
"qListObjectDef": {
"qStateName": "$",
"qLibraryId": "",
"qDef": {
"qFieldDefs": [field_name],
"qFieldLabels": [],
"qSortCriterias": [
{
"qSortByState": 0,
"qSortByFrequency": 1 if include_frequency else 0,
"qSortByNumeric": 1,
"qSortByAscii": 1,
"qSortByLoadOrder": 0,
"qSortByExpression": 0,
"qExpression": {"qv": ""},
}
],
},
"qInitialDataFetch": [
{"qTop": 0, "qLeft": 0, "qHeight": max_values, "qWidth": 1}
],
},
}
# Create session object - use correct parameter format
result = self.send_request(
"CreateSessionObject", [list_def], handle=app_handle
)
if "qReturn" not in result or "qHandle" not in result["qReturn"]:
return {"error": "Failed to create session object", "response": result}
list_handle = result["qReturn"]["qHandle"]
layout = self.send_request("GetLayout", [], handle=list_handle)
# Correct path to qListObject - it's in qLayout
if "qLayout" not in layout or "qListObject" not in layout["qLayout"]:
# Clean up object before returning error
try:
self.send_request(
"DestroySessionObject",
[f"field-values-{field_name}"],
handle=app_handle,
)
except Exception:
pass
return {"error": "No list object in layout", "layout": layout}
list_object = layout["qLayout"]["qListObject"]
values_data = []
# Process data
for page in list_object.get("qDataPages", []):
for row in page.get("qMatrix", []):
if row and len(row) > 0:
cell = row[0]
value_info = {
"value": cell.get("qText", ""),
"state": cell.get(
"qState", "O"
), # O=Optional, S=Selected, A=Alternative, X=Excluded
"numeric_value": cell.get("qNum", None),
"is_numeric": cell.get("qIsNumeric", False),
}
# Add frequency if available
if "qFrequency" in cell:
value_info["frequency"] = cell.get("qFrequency", 0)
values_data.append(value_info)
# Get general field information
field_info = {
"field_name": field_name,
"values": values_data,
"total_values": list_object.get("qSize", {}).get("qcy", 0),
"returned_count": len(values_data),
"dimension_info": list_object.get("qDimensionInfo", {}),
"debug_info": {
"list_handle": list_handle,
"data_pages_count": len(list_object.get("qDataPages", [])),
"raw_size": list_object.get("qSize", {}),
},
}
try:
self.send_request(
"DestroySessionObject",
[f"field-values-{field_name}"],
handle=app_handle,
)
except Exception as cleanup_error:
field_info["cleanup_warning"] = str(cleanup_error)
return field_info
except Exception as e:
return {"error": str(e), "details": "Error in get_field_values method"}
def get_field_statistics(self, app_handle: int, field_name: str) -> Dict[str, Any]:
"""Get comprehensive statistics for a field."""
debug_log = []
debug_log.append(f"get_field_statistics called with app_handle={app_handle}, field_name={field_name}")
try:
# Create expressions for statistics
stats_expressions = [
f"Count(DISTINCT [{field_name}])", # Unique values
f"Count([{field_name}])", # Total count
f"Count({{$<[{field_name}]={{'*'}}>}})", # Non-null count
f"Min([{field_name}])", # Minimum value
f"Max([{field_name}])", # Maximum value
f"Avg([{field_name}])", # Average value
f"Sum([{field_name}])", # Sum (if numeric)
f"Median([{field_name}])", # Median
f"Mode([{field_name}])", # Mode (most frequent)
f"Stdev([{field_name}])", # Standard deviation
]
debug_log.append(f"Created {len(stats_expressions)} expressions: {stats_expressions}")
# Create hypercube for statistics calculation
hypercube_def = {
"qDimensions": [],
"qMeasures": [
{"qDef": {"qDef": expr, "qLabel": f"Stat_{i}"}}
for i, expr in enumerate(stats_expressions)
],
"qInitialDataFetch": [
{
"qTop": 0,
"qLeft": 0,
"qHeight": 1,
"qWidth": len(stats_expressions),
}
],
"qSuppressZero": False,
"qSuppressMissing": False,
}
obj_def = {
"qInfo": {"qId": f"field-stats-{field_name}", "qType": "HyperCube"},
"qHyperCubeDef": hypercube_def,
}
# Create session object
debug_log.append(f"Creating session object with obj_def: {obj_def}")
result = self.send_request(
"CreateSessionObject", [obj_def], handle=app_handle
)
debug_log.append(f"CreateSessionObject result: {result}")
if "qReturn" not in result or "qHandle" not in result["qReturn"]:
debug_log.append(f"Failed to create session object, returning error")
return {
"error": "Failed to create statistics hypercube",
"response": result,
"debug_log": debug_log
}
cube_handle = result["qReturn"]["qHandle"]
# Get layout with data
layout = self.send_request("GetLayout", [], handle=cube_handle)
if "qLayout" not in layout or "qHyperCube" not in layout["qLayout"]:
try:
self.send_request(
"DestroySessionObject",
[f"field-stats-{field_name}"],
handle=app_handle,
)
except Exception:
pass
return {"error": "No hypercube in statistics layout", "layout": layout, "debug_log": debug_log}
hypercube = layout["qLayout"]["qHyperCube"]
# Extract statistics values
stats_labels = [
"unique_values",
"total_count",
"non_null_count",
"min_value",
"max_value",
"avg_value",
"sum_value",
"median_value",
"mode_value",
"std_deviation",
]
statistics = {"field_name": field_name}
for page in hypercube.get("qDataPages", []):
for row in page.get("qMatrix", []):
for i, cell in enumerate(row):
if i < len(stats_labels):
stat_name = stats_labels[i]
statistics[stat_name] = {
"text": cell.get("qText", ""),
"numeric": (
cell.get("qNum", None)
if cell.get("qNum") != "NaN"
else None
),
"is_numeric": cell.get("qIsNumeric", False),
}
# Calculate additional derived statistics
debug_log.append(f"Statistics before calculation: {statistics}")
if "total_count" in statistics and "non_null_count" in statistics:
# Handle None values safely
total_dict = statistics["total_count"]
non_null_dict = statistics["non_null_count"]
debug_log.append(f"total_dict: {total_dict}")
debug_log.append(f"non_null_dict: {non_null_dict}")
total = total_dict.get("numeric", 0) if total_dict.get("numeric") is not None else 0
non_null = non_null_dict.get("numeric", 0) if non_null_dict.get("numeric") is not None else 0
debug_log.append(f"total: {total} (type: {type(total)})")
debug_log.append(f"non_null: {non_null} (type: {type(non_null)})")
if total > 0:
debug_log.append(f"Calculating percentages...")
debug_log.append(f"Calculation: ({total} - {non_null}) / {total} * 100")
statistics["null_percentage"] = round(
(total - non_null) / total * 100, 2
)
statistics["completeness_percentage"] = round(
non_null / total * 100, 2
)
debug_log.append(f"Percentages calculated successfully")
# Cleanup
try:
self.send_request(
"DestroySessionObject",
[f"field-stats-{field_name}"],
handle=app_handle,
)
except Exception as cleanup_error:
statistics["cleanup_warning"] = str(cleanup_error)
statistics["debug_log"] = debug_log
return statistics
except Exception as e:
import traceback
debug_log.append(f"Exception in get_field_statistics: {e}")
debug_log.append(f"Traceback: {traceback.format_exc()}")
return {
"error": str(e),
"details": "Error in get_field_statistics method",
"traceback": traceback.format_exc(),
"debug_log": debug_log
}
def get_object_data(self, app_handle: int, object_id: str) -> Dict[str, Any]:
"""Get data from existing visualization object."""
obj_result = self.send_request(
"GetObject", {"qId": object_id}, handle=app_handle
)
obj_handle = obj_result.get("qReturn", {}).get("qHandle", -1)
if obj_handle != -1:
layout = self.send_request("GetLayout", handle=obj_handle)
return layout
return {}
def export_data_to_csv(
self, app_handle: int, object_id: str, file_path: str = "/tmp/export.csv"
) -> Dict[str, Any]:
"""Export object data to CSV."""
params = {
"qObjectId": object_id,
"qPath": file_path,
"qExportState": "A", # All data
}
result = self.send_request("ExportData", params, handle=app_handle)
return result
def search_objects(
self, app_handle: int, search_terms: List[str], object_types: List[str] = None
) -> List[Dict[str, Any]]:
"""Search for objects by terms."""
params = {
"qOptions": {"qSearchFields": ["*"], "qContext": "LockedFieldsOnly"},
"qTerms": search_terms,
"qPage": {"qOffset": 0, "qCount": 100, "qMaxNbrFieldMatches": 5},
}
if object_types:
params["qOptions"]["qTypes"] = object_types
result = self.send_request("SearchObjects", params, handle=app_handle)
return result.get("qResult", {}).get("qSearchTerms", [])
def get_field_and_variable_list(self, app_handle: int) -> Dict[str, Any]:
"""Get comprehensive list of fields and variables."""
result = self.send_request("GetFieldAndVariableList", {}, handle=app_handle)
return result
def get_measures(self, app_handle: int) -> List[Dict[str, Any]]:
"""Get master measures."""
result = self.send_request("GetMeasureList", handle=app_handle)
return result.get("qMeasureList", {}).get("qItems", [])
def get_dimensions(self, app_handle: int) -> List[Dict[str, Any]]:
"""Get master dimensions."""
result = self.send_request("GetDimensionList", handle=app_handle)
return result.get("qDimensionList", {}).get("qItems", [])
def get_variables(self, app_handle: int) -> List[Dict[str, Any]]:
"""Get variables."""
result = self.send_request("GetVariableList", handle=app_handle)
return result.get("qVariableList", {}).get("qItems", [])
def create_list_object(
self, app_handle: int, field_name: str, sort_by_frequency: bool = True
) -> Dict[str, Any]:
"""Create optimized list object for field analysis."""
list_def = {
"qInfo": {"qType": "ListObject"},
"qListObjectDef": {
"qDef": {
"qFieldDefs": [field_name],
"qSortCriterias": [
{
"qSortByFrequency": 1 if sort_by_frequency else 0,
"qSortByNumeric": 1,
"qSortByAscii": 1,
}
],
},
"qInitialDataFetch": [
{"qTop": 0, "qLeft": 0, "qHeight": 100, "qWidth": 1}
],
},
}
result = self.send_request(
"CreateSessionObject", {"qProp": list_def}, handle=app_handle
)
return result
def get_pivot_table_data(
self,
app_handle: int,
dimensions: List[str],
measures: List[str],
max_rows: int = 1000,
) -> Dict[str, Any]:
"""Create pivot table for complex data analysis."""
pivot_def = {
"qInfo": {"qType": "PivotTable"},
"qHyperCubeDef": {
"qDimensions": [
{"qDef": {"qFieldDefs": [dim]}, "qNullSuppression": True}
for dim in dimensions
],
"qMeasures": [
{"qDef": {"qDef": measure}, "qSortBy": {"qSortByNumeric": -1}}
for measure in measures
],
"qInitialDataFetch": [
{
"qTop": 0,
"qLeft": 0,
"qHeight": max_rows,
"qWidth": len(converted_dimensions) + len(converted_measures),
}
],
"qSuppressZero": True,
"qSuppressMissing": True,
},
}
result = self.send_request(
"CreateSessionObject", {"qProp": pivot_def}, handle=app_handle
)
return result
def calculate_expression(
self, app_handle: int, expression: str, dimensions: List[str] = None
) -> Dict[str, Any]:
"""Calculate expression with optional grouping by dimensions."""
if dimensions:
# Create hypercube for grouped calculation
hypercube_def = {
"qDimensions": [{"qDef": {"qFieldDefs": [dim]}} for dim in dimensions],
"qMeasures": [{"qDef": {"qDef": expression}}],
"qInitialDataFetch": [
{
"qTop": 0,
"qLeft": 0,
"qHeight": 1000,
"qWidth": len(dimensions) + 1,
}
],
}
obj_def = {
"qInfo": {"qType": "calculation"},
"qHyperCubeDef": hypercube_def,
}
result = self.send_request(
"CreateSessionObject", {"qProp": obj_def}, handle=app_handle
)
return result
else:
# Simple expression evaluation
return self.evaluate_expression(app_handle, expression)
def get_bookmarks(self, app_handle: int) -> List[Dict[str, Any]]:
"""Get bookmarks (saved selections)."""
result = self.send_request("GetBookmarkList", handle=app_handle)
return result.get("qBookmarkList", {}).get("qItems", [])
def apply_bookmark(self, app_handle: int, bookmark_id: str) -> bool:
"""Apply bookmark selections."""
result = self.send_request(
"ApplyBookmark", {"qBookmarkId": bookmark_id}, handle=app_handle
)
return result.get("qReturn", False)
def get_locale_info(self, app_handle: int) -> Dict[str, Any]:
"""Get locale information for proper number/date formatting."""
result = self.send_request("GetLocaleInfo", handle=app_handle)
return result
def search_suggest(
self, app_handle: int, search_terms: List[str], object_types: List[str] = None
) -> List[Dict[str, Any]]:
"""Get search suggestions for better field/value discovery."""
params = {
"qSuggestions": {
"qSuggestionTypes": (
["Field", "Value", "Object"] if not object_types else object_types
)
},
"qTerms": search_terms,
}
result = self.send_request("SearchSuggest", params, handle=app_handle)
return result.get("qResult", {}).get("qSuggestions", [])
def create_data_export(
self,
app_handle: int,
table_name: str = None,
fields: List[str] = None,
format_type: str = "json",
max_rows: int = 10000,
filters: Dict[str, Any] = None,
) -> Dict[str, Any]:
"""Create data export in various formats (JSON, CSV-like structure)."""
try:
# If no specific fields provided, get all fields from table
if not fields:
if table_name:
fields_result = self.get_fields(app_handle)
if "error" in fields_result:
return fields_result
table_fields = []
for field in fields_result.get("fields", []):
if field.get("table_name") == table_name:
table_fields.append(field["field_name"])
if not table_fields:
return {"error": f"No fields found for table '{table_name}'"}
fields = table_fields[:50] # Limit to 50 fields max
else:
return {
"error": "Either table_name or fields list must be provided"
}
# Create hypercube for data extraction
hypercube_def = {
"qDimensions": [
{
"qDef": {
"qFieldDefs": [field],
"qSortCriterias": [
{
"qSortByState": 0,
"qSortByFrequency": 0,
"qSortByNumeric": 1,
"qSortByAscii": 1,
"qSortByLoadOrder": 1,
"qSortByExpression": 0,
"qExpression": {"qv": ""},
}
],
},
"qNullSuppression": False,
"qIncludeElemValue": True,
}
for field in fields
],
"qMeasures": [],
"qInitialDataFetch": [
{"qTop": 0, "qLeft": 0, "qHeight": max_rows, "qWidth": len(fields)}
],
"qSuppressZero": False,
"qSuppressMissing": False,
"qMode": "S",
}
# Apply filters if provided
if filters:
# Add selection expressions as calculated dimensions
for field_name, filter_values in filters.items():
if isinstance(filter_values, list):
values_str = ", ".join([f"'{v}'" for v in filter_values])
filter_expr = f"If(Match([{field_name}], {values_str}), [{field_name}], Null())"
else:
filter_expr = f"If([{field_name}] = '{filter_values}', [{field_name}], Null())"
# Replace the original field with filtered version
for dim in hypercube_def["qDimensions"]:
if dim["qDef"]["qFieldDefs"][0] == field_name:
dim["qDef"]["qFieldDefs"] = [filter_expr]
break
obj_def = {
"qInfo": {
"qId": f"data-export-{table_name or 'custom'}",
"qType": "HyperCube",
},
"qHyperCubeDef": hypercube_def,
}
# Create session object
result = self.send_request(
"CreateSessionObject", [obj_def], handle=app_handle
)
if "qReturn" not in result or "qHandle" not in result["qReturn"]:
return {
"error": "Failed to create export hypercube",
"response": result,
}
cube_handle = result["qReturn"]["qHandle"]
# Get layout with data
layout = self.send_request("GetLayout", [], handle=cube_handle)
if "qLayout" not in layout or "qHyperCube" not in layout["qLayout"]:
try:
self.send_request(
"DestroySessionObject",
[f"data-export-{table_name or 'custom'}"],
handle=app_handle,
)
except Exception:
pass
return {"error": "No hypercube in export layout", "layout": layout}
hypercube = layout["qLayout"]["qHyperCube"]
# Process data based on format
export_data = []
headers = fields
for page in hypercube.get("qDataPages", []):
for row in page.get("qMatrix", []):
if format_type.lower() == "json":
row_data = {}
for i, cell in enumerate(row):
if i < len(headers):
row_data[headers[i]] = {
"text": cell.get("qText", ""),
"numeric": (
cell.get("qNum", None)
if cell.get("qNum") != "NaN"
else None
),
"is_numeric": cell.get("qIsNumeric", False),
}
export_data.append(row_data)
elif format_type.lower() == "csv":
# CSV-like structure (list of values)
row_values = []
for cell in row:
row_values.append(cell.get("qText", ""))
export_data.append(row_values)
elif format_type.lower() == "simple":
# Simple key-value structure
row_data = {}
for i, cell in enumerate(row):
if i < len(headers):
row_data[headers[i]] = cell.get("qText", "")
export_data.append(row_data)
result_data = {
"export_format": format_type,
"table_name": table_name,
"fields": headers,
"data": export_data,
"metadata": {
"total_rows": hypercube.get("qSize", {}).get("qcy", 0),
"exported_rows": len(export_data),
"total_columns": len(headers),
"filters_applied": filters is not None,
"export_timestamp": None, # Could be added with datetime.now() if needed
"dimension_info": hypercube.get("qDimensionInfo", []),
},
}
# Add CSV headers if CSV format
if format_type.lower() == "csv":
result_data["csv_headers"] = headers
# Cleanup
try:
self.send_request(
"DestroySessionObject",
[f"data-export-{table_name or 'custom'}"],
handle=app_handle,
)
except Exception as cleanup_error:
result_data["cleanup_warning"] = str(cleanup_error)
return result_data
except Exception as e:
return {"error": str(e), "details": "Error in create_data_export method"}
def get_visualization_data(self, app_handle: int, object_id: str) -> Dict[str, Any]:
"""Get data from existing visualization object (chart, table, etc.)."""
try:
obj_result = self.send_request("GetObject", [object_id], handle=app_handle)
if "qReturn" not in obj_result or "qHandle" not in obj_result["qReturn"]:
return {"error": f"Failed to get object with ID: {object_id}", "response": obj_result}
obj_handle = obj_result["qReturn"]["qHandle"]
layout = self.send_request("GetLayout", [], handle=obj_handle)
if "qLayout" not in layout:
return {"error": "No layout found for object", "layout": layout}
obj_layout = layout["qLayout"]
obj_info = obj_layout.get("qInfo", {})
obj_type = obj_info.get("qType", "unknown")
result = {
"object_id": object_id,
"object_type": obj_type,
"object_title": obj_layout.get("qMeta", {}).get("title", ""),
"data": None,
"structure": None,
}
if "qHyperCube" in obj_layout:
hypercube = obj_layout["qHyperCube"]
table_data = []
dimensions = []
measures = []
for dim_info in hypercube.get("qDimensionInfo", []):
dimensions.append({
"title": dim_info.get("qFallbackTitle", ""),
"field": (dim_info.get("qGroupFieldDefs", [""])[0] if dim_info.get("qGroupFieldDefs") else ""),
"cardinal": dim_info.get("qCardinal", 0),
})
for measure_info in hypercube.get("qMeasureInfo", []):
measures.append({
"title": measure_info.get("qFallbackTitle", ""),
"expression": measure_info.get("qDef", ""),
"format": measure_info.get("qNumFormat", {}),
})
for page in hypercube.get("qDataPages", []):
for row in page.get("qMatrix", []):
row_data = {}
for i, cell in enumerate(row[: len(dimensions)]):
if i < len(dimensions):
row_data[f"dim_{i}_{dimensions[i]['title']}"] = {
"text": cell.get("qText", ""),
"numeric": (cell.get("qNum", None) if cell.get("qNum") != "NaN" else None),
"state": cell.get("qState", "O"),
}
for i, cell in enumerate(row[len(dimensions) :]):
if i < len(measures):
row_data[f"measure_{i}_{measures[i]['title']}"] = {
"text": cell.get("qText", ""),
"numeric": (cell.get("qNum", None) if cell.get("qNum") != "NaN" else None),
}
table_data.append(row_data)
result["data"] = table_data
result["structure"] = {
"dimensions": dimensions,
"measures": measures,
"total_rows": hypercube.get("qSize", {}).get("qcy", 0),
"total_columns": hypercube.get("qSize", {}).get("qcx", 0),
"returned_rows": len(table_data),
}
elif "qListObject" in obj_layout:
list_object = obj_layout["qListObject"]
values_data = []
for page in list_object.get("qDataPages", []):
for row in page.get("qMatrix", []):
if row and len(row) > 0:
cell = row[0]
values_data.append({
"value": cell.get("qText", ""),
"state": cell.get("qState", "O"),
"frequency": cell.get("qFrequency", 0),
})
result["data"] = values_data
result["structure"] = {
"field_name": list_object.get("qDimensionInfo", {}).get("qFallbackTitle", ""),
"total_values": list_object.get("qSize", {}).get("qcy", 0),
"returned_values": len(values_data),
}
elif "qPivotTable" in obj_layout:
pivot_table = obj_layout["qPivotTable"]
result["data"] = pivot_table.get("qDataPages", [])
result["structure"] = {"type": "pivot_table", "size": pivot_table.get("qSize", {})}
else:
result["data"] = obj_layout
result["structure"] = {"type": "unknown", "raw_layout": True}
return result
except Exception as e:
return {"error": str(e), "details": "Error in get_visualization_data method"}
def get_detailed_app_metadata(self, app_id: str) -> Dict[str, Any]:
"""Get detailed app metadata similar to /api/v1/apps/{app_id}/data/metadata endpoint."""
try:
self.connect()
# Open the app
app_result = self.open_doc(app_id, no_data=False)
if "qReturn" not in app_result or "qHandle" not in app_result["qReturn"]:
return {"error": "Failed to open app", "response": app_result}
app_handle = app_result["qReturn"]["qHandle"]
# Get app layout and properties using correct methods
try:
layout = self.send_request("GetAppLayout", [], handle=app_handle)
except Exception:
layout = {}
try:
properties = self.send_request(
"GetAppProperties", [], handle=app_handle
)
except Exception:
properties = {}
# Get fields information
fields_result = self.get_fields(app_handle)
# Get tables information using GetTablesAndKeys
tables_result = self.send_request(
"GetTablesAndKeys",
[
{"qcx": 1000, "qcy": 1000}, # Max dimensions
{"qcx": 0, "qcy": 0}, # Min dimensions
30, # Max tables
True, # Include system tables
False, # Include hidden fields
],
handle=app_handle,
)
# Process fields data
fields_metadata = []
if "fields" in fields_result:
for field in fields_result["fields"]:
field_meta = {
"name": field.get("field_name", ""),
"src_tables": [field.get("table_name", "")],
"is_system": field.get("is_system", False),
"is_hidden": field.get("is_hidden", False),
"is_semantic": field.get("is_semantic", False),
"distinct_only": False,
"cardinal": field.get("distinct_values", 0),
"total_count": field.get("rows_count", 0),
"is_locked": False,
"always_one_selected": False,
"is_numeric": "numeric" in field.get("tags", []),
"comment": "",
"tags": field.get("tags", []),
"byte_size": 0, # Not available via Engine API
"hash": "", # Not available via Engine API
}
fields_metadata.append(field_meta)
# Process tables data
tables_metadata = []
if "qtr" in tables_result:
for table in tables_result["qtr"]:
table_meta = {
"name": table.get("qName", ""),
"is_system": table.get("qIsSystem", False),
"is_semantic": table.get("qIsSemantic", False),
"is_loose": table.get("qIsLoose", False),
"no_of_rows": table.get("qNoOfRows", 0),
"no_of_fields": len(table.get("qFields", [])),
"no_of_key_fields": len(
[
f
for f in table.get("qFields", [])
if f.get("qIsKey", False)
]
),
"comment": table.get("qComment", ""),
"byte_size": 0, # Not available via Engine API
}
tables_metadata.append(table_meta)
# Get reload metadata if available
reload_meta = {
"cpu_time_spent_ms": 0, # Not available via Engine API
"hardware": {"logical_cores": 0, "total_memory": 0},
"peak_memory_bytes": 0,
"fullReloadPeakMemoryBytes": 0,
"partialReloadPeakMemoryBytes": 0,
}
# Calculate static byte size approximation
static_byte_size = sum(
table.get("byte_size", 0) for table in tables_metadata
)
# Build response similar to the expected format
metadata = {
"reload_meta": reload_meta,
"static_byte_size": static_byte_size,
"fields": fields_metadata,
"tables": tables_metadata,
"has_section_access": False, # Would need to check script for this
"tables_profiling_data": [],
"is_direct_query_mode": False,
"usage": "ANALYTICS",
"source": "engine_api",
"app_layout": layout,
"app_properties": properties,
}
return metadata
except Exception as e:
return {"error": str(e), "details": "Error in get_detailed_app_metadata"}
finally:
self.disconnect()
def get_app_details(self, app_id: str) -> Dict[str, Any]:
"""
Get comprehensive information about application for initial analysis.
Fast overview including:
- App metadata (size, dates, reload status)
- Data model structure (tables, fields, types, cardinality)
- Master items (only user-created measures and dimensions)
- Variables (only user-created)
- Object counts by type
- Table relationships (key fields)
Returns optimized JSON report for quick app understanding.
"""
try:
self.connect()
# Open app once and reuse connection
app_result = self.open_doc(app_id, no_data=False)
if "qReturn" not in app_result or "qHandle" not in app_result["qReturn"]:
return {"error": "Failed to open app", "response": app_result}
app_handle = app_result["qReturn"]["qHandle"]
# Get app metadata and layout
app_metadata = self._get_app_metadata_fast(app_handle)
# Get data model structure
data_model = self._get_data_model_structure(app_handle)
# Get master items (only user-created)
master_items = self._get_user_master_items(app_handle)
# Get user variables (exclude system)
user_variables = self._get_user_variables(app_handle)
# Get object counts by type
object_counts = self._get_object_counts(app_handle)
# Get table relationships
table_relationships = self._get_table_relationships(app_handle)
# Build optimized response
report = {
"app_metadata": {
"app_id": app_id,
"name": app_metadata.get("title", ""),
"description": app_metadata.get("description", ""),
"filename": app_metadata.get("filename", ""),
"size_bytes": app_metadata.get("size", 0),
"size_mb": round(app_metadata.get("size", 0) / (1024 * 1024), 2),
"created_date": app_metadata.get("created_date", ""),
"modified_date": app_metadata.get("modified_date", ""),
"last_reload_time": app_metadata.get("last_reload_time", ""),
"has_script": app_metadata.get("has_script", False),
"has_data": app_metadata.get("has_data", False),
"is_published": app_metadata.get("published", False)
},
"data_model": {
"tables": data_model.get("tables", []),
"total_tables": len(data_model.get("tables", [])),
"total_fields": sum(len(table.get("fields", [])) for table in data_model.get("tables", [])),
"table_relationships": table_relationships
},
"master_items": {
"measures": master_items.get("measures", []),
"dimensions": master_items.get("dimensions", []),
"total_measures": len(master_items.get("measures", [])),
"total_dimensions": len(master_items.get("dimensions", []))
},
"variables": {
"user_variables": user_variables,
"total_variables": len(user_variables)
},
"object_counts": object_counts,
"reload_info": app_metadata.get("reload_info", {}),
"summary": {
"analysis_type": "quick_overview",
"analysis_timestamp": datetime.now().isoformat()
}
}
return report
except Exception as e:
return {"error": str(e), "details": "Error in get_app_details method"}
finally:
self.disconnect()
def _get_app_metadata_fast(self, app_handle: int) -> Dict[str, Any]:
"""Get basic app metadata without heavy analysis."""
try:
# Get app layout
layout_response = self.send_request("GetAppLayout", [], handle=app_handle)
layout = layout_response.get("qLayout", {})
# Get app properties
properties_response = self.send_request("GetAppProperties", [], handle=app_handle)
properties = properties_response.get("qProperties", {})
return {
"title": layout.get("qTitle", ""),
"filename": layout.get("qFileName", ""),
"description": properties.get("qMetaDef", {}).get("description", ""),
"size": layout.get("qStaticByteSize", 0),
"created_date": layout.get("createdDate", ""),
"modified_date": layout.get("modifiedDate", ""),
"last_reload_time": layout.get("qLastReloadTime", ""),
"has_script": layout.get("qHasScript", False),
"has_data": layout.get("qHasData", False),
"published": layout.get("published", False),
"reload_info": {
"last_execution_time": layout.get("qLastReloadTime", ""),
"is_partial_reload": layout.get("qIsPartialReload", False),
"has_data": layout.get("qHasData", False)
}
}
except Exception as e:
return {"error": str(e)}
def _get_data_model_structure(self, app_handle: int) -> Dict[str, Any]:
"""Get tables and fields structure without usage analysis."""
try:
# Get tables structure using GetTablesAndKeys
tables_result = self.send_request(
"GetTablesAndKeys",
[
{"qcx": 1000, "qcy": 1000}, # Max dimensions
{"qcx": 0, "qcy": 0}, # Min dimensions
50, # Max tables
False, # Include system tables
False, # Include hidden fields
],
handle=app_handle,
)
tables = []
for table in tables_result.get("qtr", []):
table_name = table.get("qName", "")
table_fields = []
for field in table.get("qFields", []):
field_info = {
"name": field.get("qName", ""),
"data_type": self._determine_data_type(field.get("qTags", [])),
"total_rows": field.get("qnRows", 0),
"distinct_values": field.get("qnTotalDistinctValues", 0),
"present_distinct_values": field.get("qnPresentDistinctValues", 0),
"completeness_pct": round(
(field.get("qnNonNulls", 0) / max(field.get("qnRows", 1), 1)) * 100, 1
),
"is_key": field.get("qIsKey", False),
"key_type": field.get("qKeyType", "")
}
table_fields.append(field_info)
table_info = {
"name": table_name,
"total_rows": table.get("qNoOfRows", 0),
"field_count": len(table_fields),
"fields": table_fields,
"is_system": table.get("qIsSystem", False),
"is_semantic": table.get("qIsSemantic", False)
}
tables.append(table_info)
return {"tables": tables}
except Exception as e:
return {"error": str(e), "tables": []}
def _determine_data_type(self, tags: List[str]) -> str:
"""Determine data type from field tags."""
if "$numeric" in tags:
if "$integer" in tags:
return "integer"
else:
return "numeric"
elif "$text" in tags:
return "text"
elif "$date" in tags:
return "date"
elif "$timestamp" in tags:
return "timestamp"
else:
return "unknown"
def _get_user_master_items(self, app_handle: int) -> Dict[str, Any]:
"""Get only user-created master items (exclude system)."""
try:
# Get master measures
measures = self._get_master_measures(app_handle)
user_measures = []
for measure in measures:
# Filter out system measures
if not measure.get("qMeta", {}).get("qIsHidden", False):
user_measures.append({
"name": measure.get("qMeta", {}).get("title", ""),
"description": measure.get("qMeta", {}).get("description", ""),
"definition": measure.get("qMeasure", {}).get("qDef", ""),
"created_date": measure.get("qMeta", {}).get("createdDate", ""),
"modified_date": measure.get("qMeta", {}).get("modifiedDate", ""),
"owner": measure.get("qMeta", {}).get("owner", {}).get("name", "")
})
# Get master dimensions
dimensions = self._get_master_dimensions(app_handle)
user_dimensions = []
for dimension in dimensions:
# Filter out system dimensions
if not dimension.get("qMeta", {}).get("qIsHidden", False):
user_dimensions.append({
"name": dimension.get("qMeta", {}).get("title", ""),
"description": dimension.get("qMeta", {}).get("description", ""),
"field_definitions": dimension.get("qDim", {}).get("qFieldDefs", []),
"created_date": dimension.get("qMeta", {}).get("createdDate", ""),
"modified_date": dimension.get("qMeta", {}).get("modifiedDate", ""),
"owner": dimension.get("qMeta", {}).get("owner", {}).get("name", "")
})
return {
"measures": user_measures,
"dimensions": user_dimensions
}
except Exception as e:
return {"error": str(e), "measures": [], "dimensions": []}
def _get_user_variables(self, app_handle: int) -> List[Dict[str, Any]]:
"""Get only user-created variables (exclude system)."""
try:
# Create VariableList object
variable_list_def = {
"qInfo": {"qType": "VariableList"},
"qVariableListDef": {
"qType": "variable",
"qShowReserved": False, # Exclude system variables
"qShowConfig": False,
"qData": {"tags": "/tags"}
}
}
variable_list_response = self.send_request("CreateSessionObject", [variable_list_def], handle=app_handle)
if "qReturn" not in variable_list_response:
return []
variable_list_handle = variable_list_response["qReturn"]["qHandle"]
layout_response = self.send_request("GetLayout", [], handle=variable_list_handle)
variables = layout_response.get("qLayout", {}).get("qVariableList", {}).get("qItems", [])
user_variables = []
for variable in variables:
# Additional filter for user variables only
if not variable.get("qIsReserved", False) and not variable.get("qIsConfig", False):
definition = variable.get("qDefinition", "")
user_variables.append({
"name": variable.get("qName", ""),
"text_value": definition,
"is_script_created": variable.get("qIsScriptCreated", False)
})
return user_variables
except Exception as e:
return []
def _get_object_counts(self, app_handle: int) -> Dict[str, int]:
"""Get count of objects by type."""
try:
# Get all app objects
all_infos = self.send_request("GetAllInfos", [], handle=app_handle)
object_counts = {}
for info in all_infos.get("qInfos", []):
obj_type = info.get("qType", "unknown")
object_counts[obj_type] = object_counts.get(obj_type, 0) + 1
# Group similar types for better readability
grouped_counts = {
"sheets": object_counts.get("sheet", 0),
"charts": (
object_counts.get("barchart", 0) +
object_counts.get("linechart", 0) +
object_counts.get("piechart", 0) +
object_counts.get("combochart", 0) +
object_counts.get("scatterplot", 0)
),
"tables": object_counts.get("table", 0),
"kpis": object_counts.get("kpi", 0),
"filters": (
object_counts.get("listbox", 0) +
object_counts.get("filterpane", 0)
),
"text_objects": object_counts.get("text-image", 0),
"other": sum(v for k, v in object_counts.items()
if k not in ["sheet", "barchart", "linechart", "piechart",
"combochart", "scatterplot", "table", "kpi",
"listbox", "filterpane", "text-image"])
}
# Add total count
grouped_counts["total_objects"] = sum(grouped_counts.values())
return grouped_counts
except Exception as e:
return {"error": str(e)}
def _get_table_relationships(self, app_handle: int) -> List[Dict[str, Any]]:
"""Get relationships between tables based on key fields."""
try:
# Get tables with key information
tables_result = self.send_request(
"GetTablesAndKeys",
[
{"qcx": 1000, "qcy": 1000},
{"qcx": 0, "qcy": 0},
50,
False,
False
],
handle=app_handle,
)
relationships = []
tables = tables_result.get("qtr", [])
# Find relationships based on field names and key types
for i, table1 in enumerate(tables):
table1_name = table1.get("qName", "")
table1_keys = [f for f in table1.get("qFields", []) if f.get("qIsKey", False)]
for j, table2 in enumerate(tables[i+1:], i+1):
table2_name = table2.get("qName", "")
table2_keys = [f for f in table2.get("qFields", []) if f.get("qIsKey", False)]
# Find common key fields
common_keys = []
for key1 in table1_keys:
for key2 in table2_keys:
if key1.get("qName") == key2.get("qName"):
common_keys.append({
"field_name": key1.get("qName"),
"key_type": key1.get("qKeyType", "")
})
if common_keys:
relationships.append({
"table1": table1_name,
"table2": table2_name,
"relationship_type": "key_match",
"common_fields": common_keys
})
return relationships
except Exception as e:
return []