get_analytics

Generate structured analytics reports in table format for performance analysis, metric comparison, and data export from Kaltura media content.

Instructions

Get detailed analytics in TABLE format for reporting. USE WHEN: Creating reports, comparing metrics, ranking content, analyzing performance, exporting data. RETURNS: Structured data with headers/rows. EXAMPLES: 'Show top 10 videos by views', 'Compare user engagement by category', 'Export monthly performance report'. Use list_analytics_capabilities to see all 60+ report types. For charts/graphs, use get_analytics_timeseries instead.

Input Schema

from_date (required): Start date in YYYY-MM-DD format (e.g., '2024-01-01').
to_date (required): End date in YYYY-MM-DD format (e.g., '2024-01-31').
report_type (optional, default 'content'): Type of analytics report. Common options: 'content' (video performance), 'user_engagement' (viewer behavior), 'geographic' (location data), 'platforms' (device/OS breakdown). Run list_analytics_capabilities for all 60+ types.
entry_id (optional): Filter analytics to a specific video (e.g., '1_abc123'). Leave empty for all content.
user_id (optional): User ID for user-specific reports.
categories (optional): Category filter.
dimension (optional): Dimension for grouping (e.g., 'device', 'country').
limit (optional, default 50): Max results per page.
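
For example, the request 'Show top 10 videos by views' from the tool description maps onto these parameters roughly as follows (an illustrative sketch; the dates are placeholders):

    arguments = {
        "from_date": "2024-01-01",  # start of the reporting window
        "to_date": "2024-01-31",    # end of the reporting window
        "report_type": "content",   # video performance report (the default)
        "limit": 10,                # top 10 rows
    }

Only from_date and to_date are required; the remaining fields fall back to the defaults noted above.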

Implementation Reference

  • Main handler function for the 'get_analytics' MCP tool. Takes parameters like dates, report_type, filters, etc., and delegates to get_analytics_enhanced from analytics_core.
    async def get_analytics(
        manager: KalturaClientManager,
        from_date: str,
        to_date: str,
        report_type: str = "content",
        entry_id: Optional[str] = None,
        user_id: Optional[str] = None,
        categories: Optional[str] = None,
        dimension: Optional[str] = None,
        filters: Optional[Dict[str, str]] = None,
        limit: int = 50,
        page_index: int = 1,
        order_by: Optional[str] = None,
    ) -> str:
        """
        Get analytics data for comprehensive reporting and analysis.

        This is the primary analytics function that provides access to all report
        types in a table format suitable for detailed analysis, rankings, and
        comparisons.

        USE WHEN:
        - Getting performance metrics for content, users, or platform
        - Comparing data across multiple items
        - Creating rankings or leaderboards
        - Analyzing trends with specific breakdowns
        - Exporting detailed reports

        Args:
            manager: Kaltura client manager
            from_date: Start date (YYYY-MM-DD)
            to_date: End date (YYYY-MM-DD)
            report_type: Type of report (see available types below)
            entry_id: Optional media entry ID for content-specific reports
            user_id: Optional user ID for user-specific reports
            categories: Optional category filter (full category name)
            dimension: Optional dimension for grouping (e.g., "device", "country")
            filters: Optional additional filters as dict
            limit: Maximum results per page (default: 50)
            page_index: Page number for pagination (default: 1)
            order_by: Optional sort field

        Available Report Types:
            Content: content, content_dropoff, content_interactions, content_contributions
            Users: user_engagement, user_usage, unique_users, user_highlights
            Geographic: geographic, geographic_country, geographic_region
            Platform: platforms, operating_system, browsers
            Distribution: syndication, sources, playback_context
            Infrastructure: partner_usage, storage, bandwidth, cdn_bandwidth
            Advanced: percentiles, qoe_overview, realtime

        Returns:
            JSON with structured table data including headers, rows, and metadata

        Examples:
            # Top performing videos
            get_analytics(manager, from_date, to_date, report_type="content", limit=10)

            # User engagement by category
            get_analytics(manager, from_date, to_date, report_type="user_engagement",
                          categories="Training", dimension="device")

            # Geographic distribution
            get_analytics(manager, from_date, to_date, report_type="geographic_country")
        """
        # Use the enhanced analytics implementation
        from .analytics_core import get_analytics_enhanced

        return await get_analytics_enhanced(
            manager=manager,
            from_date=from_date,
            to_date=to_date,
            report_type=report_type,
            entry_id=entry_id,
            user_id=user_id,
            categories=categories,
            dimension=dimension,
            filters=filters,
            limit=limit,
            page_index=page_index,
            order_by=order_by,
            response_format="json",
        )
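
A minimal usage sketch for the handler above, assuming an already-configured KalturaClientManager (credential setup and surrounding imports are omitted; the top_videos_report wrapper is illustrative, not part of the server):

    import asyncio
    import json

    async def top_videos_report(manager) -> None:
        # `manager` is assumed to be a KalturaClientManager wired up with valid Kaltura credentials.
        result = await get_analytics(
            manager,
            from_date="2024-01-01",
            to_date="2024-01-31",
            report_type="content",
            limit=10,
        )
        report = json.loads(result)  # the handler returns a JSON string
        for row in report.get("data", []):
            print(row)

    # asyncio.run(top_videos_report(manager))
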
  • Input schema definition for the get_analytics tool, defining parameters like from_date, to_date, report_type, entry_id, etc., with descriptions and requirements.
    "type": "object", "properties": { "from_date": { "type": "string", "description": "Start date in YYYY-MM-DD format (e.g., '2024-01-01')", }, "to_date": { "type": "string", "description": "End date in YYYY-MM-DD format (e.g., '2024-01-31')", }, "report_type": { "type": "string", "description": "Type of analytics report (default: 'content'). Common options: 'content' (video performance), 'user_engagement' (viewer behavior), 'geographic' (location data), 'platforms' (device/OS breakdown). Run list_analytics_capabilities for all 60+ types.", }, "entry_id": { "type": "string", "description": "Optional: Filter analytics for specific video (e.g., '1_abc123'). Leave empty for all content.", }, "user_id": { "type": "string", "description": "Optional user ID for user-specific reports", }, "categories": { "type": "string", "description": "Optional category filter", }, "dimension": { "type": "string", "description": "Optional dimension for grouping (e.g., 'device', 'country')", }, "limit": { "type": "integer", "description": "Max results per page (default: 50)", }, }, "required": ["from_date", "to_date"], },
  • Registration of the get_analytics tool in the MCP server's list_tools(), including name, description, and schema.
    types.Tool(
        name="get_analytics",
        description="Get detailed analytics in TABLE format for reporting. USE WHEN: Creating reports, comparing metrics, ranking content, analyzing performance, exporting data. RETURNS: Structured data with headers/rows. EXAMPLES: 'Show top 10 videos by views', 'Compare user engagement by category', 'Export monthly performance report'. Use list_analytics_capabilities to see all 60+ report types. For charts/graphs, use get_analytics_timeseries instead.",
        inputSchema={
            "type": "object",
            "properties": {
                "from_date": {
                    "type": "string",
                    "description": "Start date in YYYY-MM-DD format (e.g., '2024-01-01')",
                },
                "to_date": {
                    "type": "string",
                    "description": "End date in YYYY-MM-DD format (e.g., '2024-01-31')",
                },
                "report_type": {
                    "type": "string",
                    "description": "Type of analytics report (default: 'content'). Common options: 'content' (video performance), 'user_engagement' (viewer behavior), 'geographic' (location data), 'platforms' (device/OS breakdown). Run list_analytics_capabilities for all 60+ types.",
                },
                "entry_id": {
                    "type": "string",
                    "description": "Optional: Filter analytics for specific video (e.g., '1_abc123'). Leave empty for all content.",
                },
                "user_id": {
                    "type": "string",
                    "description": "Optional user ID for user-specific reports",
                },
                "categories": {
                    "type": "string",
                    "description": "Optional category filter",
                },
                "dimension": {
                    "type": "string",
                    "description": "Optional dimension for grouping (e.g., 'device', 'country')",
                },
                "limit": {
                    "type": "integer",
                    "description": "Max results per page (default: 50)",
                },
            },
            "required": ["from_date", "to_date"],
        },
    ),
  • Dispatch/execution registration in call_tool(): calls get_analytics(kaltura_manager, **arguments) when name == 'get_analytics'.
    elif name == "get_analytics":
        result = await get_analytics(kaltura_manager, **arguments)
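
For context, this dispatch typically sits inside an MCP call_tool handler; a rough sketch of that wrapper is shown below (the server name, handler name, and error handling are assumptions about the surrounding module, not the project's actual code):

    import mcp.types as types
    from mcp.server import Server

    server = Server("kaltura-mcp")  # assumed server name

    @server.call_tool()
    async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent]:
        # `kaltura_manager` is assumed to be a module-level KalturaClientManager.
        if name == "get_analytics":
            result = await get_analytics(kaltura_manager, **arguments)
        else:
            raise ValueError(f"Unknown tool: {name}")
        return [types.TextContent(type="text", text=result)]
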
  • Core helper function get_analytics_enhanced() that performs the actual Kaltura API calls for analytics reports, handles all report types, parsing, and formatting.
    async def get_analytics_enhanced(
        manager: KalturaClientManager,
        from_date: str,
        to_date: str,
        report_type: str = "content",
        entry_id: Optional[str] = None,
        user_id: Optional[str] = None,
        object_ids: Optional[str] = None,
        metrics: Optional[List[str]] = None,
        categories: Optional[str] = None,
        dimension: Optional[str] = None,
        interval: Optional[str] = None,
        filters: Optional[Dict[str, str]] = None,
        limit: int = 20,
        page_index: int = 1,
        order_by: Optional[str] = None,
        response_format: str = "json",
    ) -> str:
        """Enhanced analytics with support for all report types and advanced features.

        Args:
            manager: Kaltura client manager
            from_date: Start date (YYYY-MM-DD)
            to_date: End date (YYYY-MM-DD)
            report_type: Type of report (see REPORT_TYPE_MAP keys)
            entry_id: Optional specific entry ID
            user_id: Optional specific user ID
            object_ids: Optional comma-separated object IDs
            metrics: Requested metrics (for reference)
            categories: Category filter
            dimension: Dimension for grouping (e.g., "device", "country")
            interval: Time interval (e.g., "days", "months", "years")
            filters: Additional filters (customVar1In, countryIn, etc.)
            limit: Maximum results
            page_index: Page number for pagination
            order_by: Sort field
            response_format: "json", "csv", or "raw" (returns unprocessed API response)
        """
        # Validate dates
        date_pattern = r"^\d{4}-\d{2}-\d{2}$"
        if not re.match(date_pattern, from_date) or not re.match(date_pattern, to_date):
            return json.dumps({"error": "Invalid date format. Use YYYY-MM-DD"}, indent=2)

        # Validate entry ID if provided
        if entry_id and not validate_entry_id(entry_id):
            return json.dumps({"error": "Invalid entry ID format"}, indent=2)

        # Get report type ID
        report_type_id = REPORT_TYPE_MAP.get(report_type)
        if not report_type_id:
            return json.dumps(
                {
                    "error": f"Unknown report type: {report_type}",
                    "available_types": list(REPORT_TYPE_MAP.keys()),
                },
                indent=2,
            )

        # Check if object IDs are required
        if report_type in OBJECT_ID_REQUIRED_REPORTS and not (entry_id or user_id or object_ids):
            return json.dumps(
                {
                    "error": f"Report type '{report_type}' requires object IDs",
                    "suggestion": "Provide entry_id, user_id, or object_ids parameter",
                },
                indent=2,
            )

        # If requesting raw format and imports might fail, return early with a simpler approach
        if response_format == "raw":
            try:
                # Try the simple approach first for raw format
                client = manager.get_client()

                # Direct API call without complex objects
                start_time = int(datetime.strptime(from_date, "%Y-%m-%d").timestamp())
                end_time = int(datetime.strptime(to_date, "%Y-%m-%d").timestamp())

                # Try to get the report directly
                try:
                    # Prepare object IDs
                    if object_ids:
                        obj_ids = object_ids
                    elif entry_id:
                        obj_ids = entry_id
                    elif user_id:
                        obj_ids = user_id
                    else:
                        obj_ids = None

                    # Try direct call with minimal parameters
                    report_result = client.report.getTable(
                        reportType=report_type_id,
                        reportInputFilter={
                            "fromDate": start_time,
                            "toDate": end_time,
                            "entryIdIn": entry_id if entry_id else None,
                            "userIds": user_id if user_id else None,
                            "categories": categories if categories else None,
                        },
                        pager={"pageSize": min(limit, 500), "pageIndex": page_index},
                        order=order_by,
                        objectIds=obj_ids,
                    )

                    # Return raw response
                    return json.dumps(
                        {
                            "kaltura_response": {
                                "header": getattr(report_result, "header", ""),
                                "data": getattr(report_result, "data", ""),
                                "totalCount": getattr(report_result, "totalCount", 0),
                            },
                            "request_info": {
                                "report_type": report_type,
                                "report_type_id": report_type_id,
                                "from_date": from_date,
                                "to_date": to_date,
                                "entry_id": entry_id,
                                "user_id": user_id,
                            },
                        },
                        indent=2,
                    )
                except Exception:
                    # If direct call fails, fall through to normal processing
                    pass
            except Exception:
                # If anything fails, continue with normal processing
                pass

        client = manager.get_client()

        try:
            from KalturaClient.Plugins.Core import (
                KalturaEndUserReportInputFilter,
                KalturaFilterPager,
                KalturaReportInputFilter,
                KalturaReportInterval,
            )

            # Convert dates
            start_time = int(datetime.strptime(from_date, "%Y-%m-%d").timestamp())
            end_time = int(datetime.strptime(to_date, "%Y-%m-%d").timestamp())

            # Create appropriate filter
            if report_type in END_USER_REPORTS:
                report_filter = KalturaEndUserReportInputFilter()
            else:
                report_filter = KalturaReportInputFilter()

            # Set date range
            report_filter.fromDate = start_time
            report_filter.toDate = end_time

            # Set categories if provided
            if categories:
                report_filter.categories = categories

            # Set interval if provided
            if interval:
                interval_map = {
                    "days": KalturaReportInterval.DAYS,
                    "months": KalturaReportInterval.MONTHS,
                    "years": KalturaReportInterval.YEARS,
                }
                if interval in interval_map:
                    report_filter.interval = interval_map[interval]

            # Apply additional filters
            if filters:
                for key, value in filters.items():
                    if hasattr(report_filter, key):
                        setattr(report_filter, key, value)

            # Create pager
            pager = KalturaFilterPager()
            pager.pageSize = min(limit, 500)  # Allow larger pages
            pager.pageIndex = page_index

            # Prepare object IDs
            if object_ids:
                obj_ids = object_ids
            elif entry_id:
                obj_ids = entry_id
            elif user_id:
                obj_ids = user_id
            else:
                obj_ids = None

            # Get the report type enum value
            # For numeric IDs, just use the ID directly
            kaltura_report_type = report_type_id

            # Call appropriate API method
            if response_format == "raw":
                # Get raw table data without processing
                report_result = client.report.getTable(
                    reportType=kaltura_report_type,
                    reportInputFilter=report_filter,
                    pager=pager,
                    order=order_by,
                    objectIds=obj_ids,
                )

                # Return raw Kaltura response with minimal wrapping
                return json.dumps(
                    {
                        "kaltura_response": {
                            "header": getattr(report_result, "header", ""),
                            "data": getattr(report_result, "data", ""),
                            "totalCount": getattr(report_result, "totalCount", 0),
                        },
                        "request_info": {
                            "report_type": report_type,
                            "report_type_id": report_type_id,
                            "from_date": from_date,
                            "to_date": to_date,
                            "entry_id": entry_id,
                            "user_id": user_id,
                        },
                    },
                    indent=2,
                )
            elif response_format == "csv":
                # Get CSV export URL
                csv_result = client.report.getUrlForReportAsCsv(
                    reportTitle=f"{REPORT_TYPE_NAMES.get(report_type, 'Report')}_{from_date}_{to_date}",
                    reportText=f"Report from {from_date} to {to_date}",
                    headers=",".join(metrics) if metrics else None,
                    reportType=kaltura_report_type,
                    reportInputFilter=report_filter,
                    dimension=dimension,
                    pager=pager,
                    order=order_by,
                    objectIds=obj_ids,
                )

                return json.dumps(
                    {
                        "format": "csv",
                        "download_url": csv_result,
                        "expires_in": "300 seconds",
                        "report_type": REPORT_TYPE_NAMES.get(report_type, report_type),
                    },
                    indent=2,
                )
            else:
                # Get table data
                # Note: getTable doesn't support dimension parameter
                # If dimension is requested, we'll include it in metadata but cannot group by it
                report_result = client.report.getTable(
                    reportType=kaltura_report_type,
                    reportInputFilter=report_filter,
                    pager=pager,
                    order=order_by,
                    objectIds=obj_ids,
                )

                # Parse results
                analytics_data = {
                    "reportType": REPORT_TYPE_NAMES.get(report_type, "Analytics Report"),
                    "reportTypeCode": report_type,
                    "reportTypeId": report_type_id,
                    "dateRange": {"from": from_date, "to": to_date},
                    "filters": {
                        "categories": categories,
                        "dimension": dimension,
                        "interval": interval,
                        "objectIds": obj_ids,
                        "additionalFilters": filters,
                    },
                    "pagination": {
                        "pageSize": pager.pageSize,
                        "pageIndex": pager.pageIndex,
                        "totalCount": getattr(report_result, "totalCount", 0),
                    },
                    "headers": [],
                    "data": [],
                }

                # Parse headers
                if report_result.header:
                    analytics_data["headers"] = [h.strip() for h in report_result.header.split(",")]

                # Parse data with enhanced handling
                if report_result.data:
                    data_rows = report_result.data.split("\n")
                    for row in data_rows:
                        if row.strip():
                            # Handle different data formats
                            if ";" in row and report_type == "engagement_timeline":
                                # Special handling for timeline data
                                timeline_data = parse_timeline_data(row)
                                analytics_data["data"].append(timeline_data)
                            elif report_type in [
                                "percentiles",
                                "video_timeline",
                                "retention_curve",
                                "viewer_retention",
                                "drop_off_analysis",
                                "replay_detection",
                            ]:
                                # Special handling for PERCENTILES report (ID 43)
                                # This report uses semicolon-separated rows with pipe-separated values
                                if "|" in row:
                                    values = row.split("|")
                                    if len(values) >= 3:
                                        row_dict = {
                                            "percentile": convert_value(values[0]),
                                            "count_viewers": convert_value(values[1]),
                                            "unique_known_users": convert_value(values[2]),
                                        }
                                        analytics_data["data"].append(row_dict)
                                else:
                                    # Fallback to standard CSV parsing if no pipes found
                                    row_values = parse_csv_row(row)
                                    if len(row_values) >= len(analytics_data["headers"]):
                                        row_dict = {}
                                        for i, header in enumerate(analytics_data["headers"]):
                                            if i < len(row_values):
                                                row_dict[header] = convert_value(row_values[i])
                                        analytics_data["data"].append(row_dict)
                            else:
                                # Standard CSV parsing
                                row_values = parse_csv_row(row)
                                if len(row_values) >= len(analytics_data["headers"]):
                                    row_dict = {}
                                    for i, header in enumerate(analytics_data["headers"]):
                                        if i < len(row_values):
                                            row_dict[header] = convert_value(row_values[i])
                                    analytics_data["data"].append(row_dict)

                analytics_data["totalResults"] = len(analytics_data["data"])

                # Add note if dimension was requested but not applied
                if dimension:
                    analytics_data["note"] = (
                        f"Dimension '{dimension}' was requested but grouping is not supported in table format. "
                        "Use get_analytics_graph() or response_format='graph' for dimensional analysis."
                    )

                # Add summary for certain reports
                if report_type in ["partner_usage", "var_usage", "cdn_bandwidth"]:
                    summary_result = client.report.getTotal(
                        reportType=kaltura_report_type,
                        reportInputFilter=report_filter,
                        objectIds=obj_ids,
                    )
                    if summary_result:
                        analytics_data["summary"] = parse_summary_data(summary_result)

                return json.dumps(analytics_data, indent=2)

        except ImportError as e:
            return json.dumps(
                {
                    "error": "Analytics functionality not available",
                    "detail": str(e),
                    "suggestion": "Ensure Kaltura client has Report plugin",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps(
                {
                    "error": f"Failed to retrieve analytics: {str(e)}",
                    "report_type": report_type,
                    "suggestion": "Check permissions and report availability",
                },
                indent=2,
            )
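
As a closing sketch, the CSV branch of get_analytics_enhanced can be used when a downloadable export is preferred over the JSON table (again assuming a configured manager; the export_monthly_report wrapper is illustrative):

    import json

    async def export_monthly_report(manager) -> str:
        # response_format="csv" returns a JSON envelope with a short-lived download URL.
        result = await get_analytics_enhanced(
            manager=manager,
            from_date="2024-01-01",
            to_date="2024-01-31",
            report_type="content",
            response_format="csv",
        )
        envelope = json.loads(result)
        if "error" in envelope:
            raise RuntimeError(envelope["error"])
        return envelope["download_url"]  # valid for roughly 300 seconds per the code above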
