get_analytics

Generate structured analytics reports in table format for performance analysis, metric comparison, and data export from Kaltura media content.

Instructions

Get detailed analytics in TABLE format for reporting.

USE WHEN: Creating reports, comparing metrics, ranking content, analyzing performance, exporting data.
RETURNS: Structured data with headers/rows.
EXAMPLES: 'Show top 10 videos by views', 'Compare user engagement by category', 'Export monthly performance report'.

Use list_analytics_capabilities to see all 60+ report types. For charts/graphs, use get_analytics_timeseries instead.

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| from_date | Yes | Start date in YYYY-MM-DD format (e.g., '2024-01-01') | |
| to_date | Yes | End date in YYYY-MM-DD format (e.g., '2024-01-31') | |
| report_type | No | Type of analytics report. Common options: 'content' (video performance), 'user_engagement' (viewer behavior), 'geographic' (location data), 'platforms' (device/OS breakdown). Run list_analytics_capabilities for all 60+ types. | 'content' |
| entry_id | No | Optional: Filter analytics for specific video (e.g., '1_abc123'). Leave empty for all content. | |
| user_id | No | Optional user ID for user-specific reports | |
| categories | No | Optional category filter | |
| dimension | No | Optional dimension for grouping (e.g., 'device', 'country') | |
| limit | No | Max results per page | 50 |
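
A set of arguments that satisfies this schema, together with a client-side check, might look like the sketch below. This is not the server's own validation: `validate_arguments` is a hypothetical helper, the property names and required list are copied from the schema above, and the `strptime` calendar check is an extra step that goes beyond what the schema itself states.

```python
import re
from datetime import datetime

# Copied from the input schema: two required properties, six optional ones.
REQUIRED = ("from_date", "to_date")
OPTIONAL = ("report_type", "entry_id", "user_id", "categories", "dimension", "limit")
DATE_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}$")


def validate_arguments(arguments: dict) -> list[str]:
    """Return a list of problems; an empty list means the arguments look valid.

    Hypothetical helper for illustration only.
    """
    problems = []
    for name in REQUIRED:
        if name not in arguments:
            problems.append(f"missing required argument: {name}")
    for name in arguments:
        if name not in REQUIRED and name not in OPTIONAL:
            problems.append(f"unknown argument: {name}")
    for name in REQUIRED:
        value = arguments.get(name)
        if value is None:
            continue
        if not isinstance(value, str) or not DATE_PATTERN.match(value):
            problems.append(f"{name} must use YYYY-MM-DD")
            continue
        try:
            # The pattern alone accepts strings such as '2024-13-45'.
            datetime.strptime(value, "%Y-%m-%d")
        except ValueError:
            problems.append(f"{name} is not a real calendar date")
    limit = arguments.get("limit")
    if limit is not None and (isinstance(limit, bool) or not isinstance(limit, int)):
        problems.append("limit must be an integer")
    return problems


example_arguments = {
    "from_date": "2024-01-01",
    "to_date": "2024-01-31",
    "report_type": "content",
    "limit": 10,
}
print(validate_arguments(example_arguments))
```

Checking the calendar date up front gives a more specific message than relying on the date-format pattern alone.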

Implementation Reference

  • Main handler function for the 'get_analytics' MCP tool. Takes parameters like dates, report_type, filters, etc., and delegates to get_analytics_enhanced from analytics_core.
    async def get_analytics(
        manager: KalturaClientManager,
        from_date: str,
        to_date: str,
        report_type: str = "content",
        entry_id: Optional[str] = None,
        user_id: Optional[str] = None,
        categories: Optional[str] = None,
        dimension: Optional[str] = None,
        filters: Optional[Dict[str, str]] = None,
        limit: int = 50,
        page_index: int = 1,
        order_by: Optional[str] = None,
    ) -> str:
        """
        Get analytics data for comprehensive reporting and analysis.
    
        This is the primary analytics function that provides access to all report types
        in a table format suitable for detailed analysis, rankings, and comparisons.
    
        USE WHEN:
        - Getting performance metrics for content, users, or platform
        - Comparing data across multiple items
        - Creating rankings or leaderboards
        - Analyzing trends with specific breakdowns
        - Exporting detailed reports
    
        Args:
            manager: Kaltura client manager
            from_date: Start date (YYYY-MM-DD)
            to_date: End date (YYYY-MM-DD)
            report_type: Type of report (see available types below)
            entry_id: Optional media entry ID for content-specific reports
            user_id: Optional user ID for user-specific reports
            categories: Optional category filter (full category name)
            dimension: Optional dimension for grouping (e.g., "device", "country")
            filters: Optional additional filters as dict
            limit: Maximum results per page (default: 50)
            page_index: Page number for pagination (default: 1)
            order_by: Optional sort field
    
        Available Report Types:
            Content: content, content_dropoff, content_interactions, content_contributions
            Users: user_engagement, user_usage, unique_users, user_highlights
            Geographic: geographic, geographic_country, geographic_region
            Platform: platforms, operating_system, browsers
            Distribution: syndication, sources, playback_context
            Infrastructure: partner_usage, storage, bandwidth, cdn_bandwidth
            Advanced: percentiles, qoe_overview, realtime
    
        Returns:
            JSON with structured table data including headers, rows, and metadata
    
        Examples:
            # Top performing videos
            get_analytics(manager, from_date, to_date, report_type="content", limit=10)
    
            # User engagement by category
            get_analytics(manager, from_date, to_date, report_type="user_engagement",
                         categories="Training", dimension="device")
    
            # Geographic distribution
            get_analytics(manager, from_date, to_date, report_type="geographic_country")
        """
        # Use the enhanced analytics implementation
        from .analytics_core import get_analytics_enhanced
    
        return await get_analytics_enhanced(
            manager=manager,
            from_date=from_date,
            to_date=to_date,
            report_type=report_type,
            entry_id=entry_id,
            user_id=user_id,
            categories=categories,
            dimension=dimension,
            filters=filters,
            limit=limit,
            page_index=page_index,
            order_by=order_by,
            response_format="json",
        )
  • Input schema definition for the get_analytics tool, defining parameters like from_date, to_date, report_type, entry_id, etc., with descriptions and requirements.
        "type": "object",
        "properties": {
            "from_date": {
                "type": "string",
                "description": "Start date in YYYY-MM-DD format (e.g., '2024-01-01')",
            },
            "to_date": {
                "type": "string",
                "description": "End date in YYYY-MM-DD format (e.g., '2024-01-31')",
            },
            "report_type": {
                "type": "string",
                "description": "Type of analytics report (default: 'content'). Common options: 'content' (video performance), 'user_engagement' (viewer behavior), 'geographic' (location data), 'platforms' (device/OS breakdown). Run list_analytics_capabilities for all 60+ types.",
            },
            "entry_id": {
                "type": "string",
                "description": "Optional: Filter analytics for specific video (e.g., '1_abc123'). Leave empty for all content.",
            },
            "user_id": {
                "type": "string",
                "description": "Optional user ID for user-specific reports",
            },
            "categories": {
                "type": "string",
                "description": "Optional category filter",
            },
            "dimension": {
                "type": "string",
                "description": "Optional dimension for grouping (e.g., 'device', 'country')",
            },
            "limit": {
                "type": "integer",
                "description": "Max results per page (default: 50)",
            },
        },
        "required": ["from_date", "to_date"],
    },
  • Registration of the get_analytics tool in the MCP server's list_tools(), including name, description, and schema.
    types.Tool(
        name="get_analytics",
        description="Get detailed analytics in TABLE format for reporting. USE WHEN: Creating reports, comparing metrics, ranking content, analyzing performance, exporting data. RETURNS: Structured data with headers/rows. EXAMPLES: 'Show top 10 videos by views', 'Compare user engagement by category', 'Export monthly performance report'. Use list_analytics_capabilities to see all 60+ report types. For charts/graphs, use get_analytics_timeseries instead.",
        inputSchema={
            "type": "object",
            "properties": {
                "from_date": {
                    "type": "string",
                    "description": "Start date in YYYY-MM-DD format (e.g., '2024-01-01')",
                },
                "to_date": {
                    "type": "string",
                    "description": "End date in YYYY-MM-DD format (e.g., '2024-01-31')",
                },
                "report_type": {
                    "type": "string",
                    "description": "Type of analytics report (default: 'content'). Common options: 'content' (video performance), 'user_engagement' (viewer behavior), 'geographic' (location data), 'platforms' (device/OS breakdown). Run list_analytics_capabilities for all 60+ types.",
                },
                "entry_id": {
                    "type": "string",
                    "description": "Optional: Filter analytics for specific video (e.g., '1_abc123'). Leave empty for all content.",
                },
                "user_id": {
                    "type": "string",
                    "description": "Optional user ID for user-specific reports",
                },
                "categories": {
                    "type": "string",
                    "description": "Optional category filter",
                },
                "dimension": {
                    "type": "string",
                    "description": "Optional dimension for grouping (e.g., 'device', 'country')",
                },
                "limit": {
                    "type": "integer",
                    "description": "Max results per page (default: 50)",
                },
            },
            "required": ["from_date", "to_date"],
        },
    ),
  • Dispatch/execution registration in call_tool(): calls get_analytics(kaltura_manager, **arguments) when name == 'get_analytics'.
    elif name == "get_analytics":
        result = await get_analytics(kaltura_manager, **arguments)
  • Core helper function get_analytics_enhanced() that performs the actual Kaltura API calls for analytics reports, handles all report types, parsing, and formatting.
    async def get_analytics_enhanced(
        manager: KalturaClientManager,
        from_date: str,
        to_date: str,
        report_type: str = "content",
        entry_id: Optional[str] = None,
        user_id: Optional[str] = None,
        object_ids: Optional[str] = None,
        metrics: Optional[List[str]] = None,
        categories: Optional[str] = None,
        dimension: Optional[str] = None,
        interval: Optional[str] = None,
        filters: Optional[Dict[str, str]] = None,
        limit: int = 20,
        page_index: int = 1,
        order_by: Optional[str] = None,
        response_format: str = "json",
    ) -> str:
        """Enhanced analytics with support for all report types and advanced features.
    
        Args:
            manager: Kaltura client manager
            from_date: Start date (YYYY-MM-DD)
            to_date: End date (YYYY-MM-DD)
            report_type: Type of report (see REPORT_TYPE_MAP keys)
            entry_id: Optional specific entry ID
            user_id: Optional specific user ID
            object_ids: Optional comma-separated object IDs
            metrics: Requested metrics (for reference)
            categories: Category filter
            dimension: Dimension for grouping (e.g., "device", "country")
            interval: Time interval (e.g., "days", "months", "years")
            filters: Additional filters (customVar1In, countryIn, etc.)
            limit: Maximum results
            page_index: Page number for pagination
            order_by: Sort field
            response_format: "json", "csv", or "raw" (returns unprocessed API response)
        """
        # Validate dates
        date_pattern = r"^\d{4}-\d{2}-\d{2}$"
        if not re.match(date_pattern, from_date) or not re.match(date_pattern, to_date):
            return json.dumps({"error": "Invalid date format. Use YYYY-MM-DD"}, indent=2)
    
        # Validate entry ID if provided
        if entry_id and not validate_entry_id(entry_id):
            return json.dumps({"error": "Invalid entry ID format"}, indent=2)
    
        # Get report type ID
        report_type_id = REPORT_TYPE_MAP.get(report_type)
        if not report_type_id:
            return json.dumps(
                {
                    "error": f"Unknown report type: {report_type}",
                    "available_types": list(REPORT_TYPE_MAP.keys()),
                },
                indent=2,
            )
    
        # Check if object IDs are required
        if report_type in OBJECT_ID_REQUIRED_REPORTS and not (entry_id or user_id or object_ids):
            return json.dumps(
                {
                    "error": f"Report type '{report_type}' requires object IDs",
                    "suggestion": "Provide entry_id, user_id, or object_ids parameter",
                },
                indent=2,
            )
    
        # If requesting raw format and imports might fail, return early with a simpler approach
        if response_format == "raw":
            try:
                # Try the simple approach first for raw format
                client = manager.get_client()
    
                # Direct API call without complex objects
                start_time = int(datetime.strptime(from_date, "%Y-%m-%d").timestamp())
                end_time = int(datetime.strptime(to_date, "%Y-%m-%d").timestamp())
    
                # Try to get the report directly
                try:
                    # Prepare object IDs
                    if object_ids:
                        obj_ids = object_ids
                    elif entry_id:
                        obj_ids = entry_id
                    elif user_id:
                        obj_ids = user_id
                    else:
                        obj_ids = None
    
                    # Try direct call with minimal parameters
                    report_result = client.report.getTable(
                        reportType=report_type_id,
                        reportInputFilter={
                            "fromDate": start_time,
                            "toDate": end_time,
                            "entryIdIn": entry_id if entry_id else None,
                            "userIds": user_id if user_id else None,
                            "categories": categories if categories else None,
                        },
                        pager={"pageSize": min(limit, 500), "pageIndex": page_index},
                        order=order_by,
                        objectIds=obj_ids,
                    )
    
                    # Return raw response
                    return json.dumps(
                        {
                            "kaltura_response": {
                                "header": getattr(report_result, "header", ""),
                                "data": getattr(report_result, "data", ""),
                                "totalCount": getattr(report_result, "totalCount", 0),
                            },
                            "request_info": {
                                "report_type": report_type,
                                "report_type_id": report_type_id,
                                "from_date": from_date,
                                "to_date": to_date,
                                "entry_id": entry_id,
                                "user_id": user_id,
                            },
                        },
                        indent=2,
                    )
                except Exception:
                    # If direct call fails, fall through to normal processing
                    pass
            except Exception:
                # If anything fails, continue with normal processing
                pass
    
        client = manager.get_client()
    
        try:
            from KalturaClient.Plugins.Core import (
                KalturaEndUserReportInputFilter,
                KalturaFilterPager,
                KalturaReportInputFilter,
                KalturaReportInterval,
            )
    
            # Convert dates
            start_time = int(datetime.strptime(from_date, "%Y-%m-%d").timestamp())
            end_time = int(datetime.strptime(to_date, "%Y-%m-%d").timestamp())
    
            # Create appropriate filter
            if report_type in END_USER_REPORTS:
                report_filter = KalturaEndUserReportInputFilter()
            else:
                report_filter = KalturaReportInputFilter()
    
            # Set date range
            report_filter.fromDate = start_time
            report_filter.toDate = end_time
    
            # Set categories if provided
            if categories:
                report_filter.categories = categories
    
            # Set interval if provided
            if interval:
                interval_map = {
                    "days": KalturaReportInterval.DAYS,
                    "months": KalturaReportInterval.MONTHS,
                    "years": KalturaReportInterval.YEARS,
                }
                if interval in interval_map:
                    report_filter.interval = interval_map[interval]
    
            # Apply additional filters
            if filters:
                for key, value in filters.items():
                    if hasattr(report_filter, key):
                        setattr(report_filter, key, value)
    
            # Create pager
            pager = KalturaFilterPager()
            pager.pageSize = min(limit, 500)  # Allow larger pages
            pager.pageIndex = page_index
    
            # Prepare object IDs
            if object_ids:
                obj_ids = object_ids
            elif entry_id:
                obj_ids = entry_id
            elif user_id:
                obj_ids = user_id
            else:
                obj_ids = None
    
            # Get the report type enum value
            # For numeric IDs, just use the ID directly
            kaltura_report_type = report_type_id
    
            # Call appropriate API method
            if response_format == "raw":
                # Get raw table data without processing
                report_result = client.report.getTable(
                    reportType=kaltura_report_type,
                    reportInputFilter=report_filter,
                    pager=pager,
                    order=order_by,
                    objectIds=obj_ids,
                )
    
                # Return raw Kaltura response with minimal wrapping
                return json.dumps(
                    {
                        "kaltura_response": {
                            "header": getattr(report_result, "header", ""),
                            "data": getattr(report_result, "data", ""),
                            "totalCount": getattr(report_result, "totalCount", 0),
                        },
                        "request_info": {
                            "report_type": report_type,
                            "report_type_id": report_type_id,
                            "from_date": from_date,
                            "to_date": to_date,
                            "entry_id": entry_id,
                            "user_id": user_id,
                        },
                    },
                    indent=2,
                )
    
            elif response_format == "csv":
                # Get CSV export URL
                csv_result = client.report.getUrlForReportAsCsv(
                    reportTitle=f"{REPORT_TYPE_NAMES.get(report_type, 'Report')}_{from_date}_{to_date}",
                    reportText=f"Report from {from_date} to {to_date}",
                    headers=",".join(metrics) if metrics else None,
                    reportType=kaltura_report_type,
                    reportInputFilter=report_filter,
                    dimension=dimension,
                    pager=pager,
                    order=order_by,
                    objectIds=obj_ids,
                )
    
                return json.dumps(
                    {
                        "format": "csv",
                        "download_url": csv_result,
                        "expires_in": "300 seconds",
                        "report_type": REPORT_TYPE_NAMES.get(report_type, report_type),
                    },
                    indent=2,
                )
    
            else:
                # Get table data
                # Note: getTable doesn't support dimension parameter
                # If dimension is requested, we'll include it in metadata but cannot group by it
                report_result = client.report.getTable(
                    reportType=kaltura_report_type,
                    reportInputFilter=report_filter,
                    pager=pager,
                    order=order_by,
                    objectIds=obj_ids,
                )
    
                # Parse results
                analytics_data = {
                    "reportType": REPORT_TYPE_NAMES.get(report_type, "Analytics Report"),
                    "reportTypeCode": report_type,
                    "reportTypeId": report_type_id,
                    "dateRange": {"from": from_date, "to": to_date},
                    "filters": {
                        "categories": categories,
                        "dimension": dimension,
                        "interval": interval,
                        "objectIds": obj_ids,
                        "additionalFilters": filters,
                    },
                    "pagination": {
                        "pageSize": pager.pageSize,
                        "pageIndex": pager.pageIndex,
                        "totalCount": getattr(report_result, "totalCount", 0),
                    },
                    "headers": [],
                    "data": [],
                }
    
                # Parse headers
                if report_result.header:
                    analytics_data["headers"] = [h.strip() for h in report_result.header.split(",")]
    
                # Parse data with enhanced handling
                if report_result.data:
                    data_rows = report_result.data.split("\n")
                    for row in data_rows:
                        if row.strip():
                            # Handle different data formats
                            if ";" in row and report_type == "engagement_timeline":
                                # Special handling for timeline data
                                timeline_data = parse_timeline_data(row)
                                analytics_data["data"].append(timeline_data)
                            elif report_type in [
                                "percentiles",
                                "video_timeline",
                                "retention_curve",
                                "viewer_retention",
                                "drop_off_analysis",
                                "replay_detection",
                            ]:
                                # Special handling for PERCENTILES report (ID 43)
                                # This report uses semicolon-separated rows with pipe-separated values
                                if "|" in row:
                                    values = row.split("|")
                                    if len(values) >= 3:
                                        row_dict = {
                                            "percentile": convert_value(values[0]),
                                            "count_viewers": convert_value(values[1]),
                                            "unique_known_users": convert_value(values[2]),
                                        }
                                        analytics_data["data"].append(row_dict)
                                else:
                                    # Fallback to standard CSV parsing if no pipes found
                                    row_values = parse_csv_row(row)
                                    if len(row_values) >= len(analytics_data["headers"]):
                                        row_dict = {}
                                        for i, header in enumerate(analytics_data["headers"]):
                                            if i < len(row_values):
                                                row_dict[header] = convert_value(row_values[i])
                                        analytics_data["data"].append(row_dict)
                            else:
                                # Standard CSV parsing
                                row_values = parse_csv_row(row)
                                if len(row_values) >= len(analytics_data["headers"]):
                                    row_dict = {}
                                    for i, header in enumerate(analytics_data["headers"]):
                                        if i < len(row_values):
                                            row_dict[header] = convert_value(row_values[i])
                                    analytics_data["data"].append(row_dict)
    
                analytics_data["totalResults"] = len(analytics_data["data"])
    
                # Add note if dimension was requested but not applied
                if dimension:
                    analytics_data["note"] = (
                        f"Dimension '{dimension}' was requested but grouping is not supported in table format. "
                        "Use get_analytics_graph() or response_format='graph' for dimensional analysis."
                    )
    
                # Add summary for certain reports
                if report_type in ["partner_usage", "var_usage", "cdn_bandwidth"]:
                    summary_result = client.report.getTotal(
                        reportType=kaltura_report_type,
                        reportInputFilter=report_filter,
                        objectIds=obj_ids,
                    )
                    if summary_result:
                        analytics_data["summary"] = parse_summary_data(summary_result)
    
                return json.dumps(analytics_data, indent=2)
    
        except ImportError as e:
            return json.dumps(
                {
                    "error": "Analytics functionality not available",
                    "detail": str(e),
                    "suggestion": "Ensure Kaltura client has Report plugin",
                },
                indent=2,
            )
    
        except Exception as e:
            return json.dumps(
                {
                    "error": f"Failed to retrieve analytics: {str(e)}",
                    "report_type": report_type,
                    "suggestion": "Check permissions and report availability",
                },
                indent=2,
            )
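
The standard table-parsing path in get_analytics_enhanced() can be sketched in isolation. The listing calls `parse_csv_row` and `convert_value` without showing them, so the versions below are hypothetical stand-ins built on the standard library, `table_to_rows` is an illustrative name, and the sample header and data strings are made up. Only the overall shape follows the code above: a comma-separated header, newline-separated rows, and rows shorter than the header skipped.

```python
import csv
import io
import re

_INT = re.compile(r"-?\d+")
_FLOAT = re.compile(r"-?\d+\.\d+")


def parse_csv_row(row: str) -> list[str]:
    """Hypothetical stand-in: split one comma-separated row, honoring quotes."""
    return next(csv.reader(io.StringIO(row)), [])


def convert_value(value: str):
    """Hypothetical stand-in: convert plain numeric strings, keep the rest as text."""
    text = value.strip()
    if _INT.fullmatch(text):
        return int(text)
    if _FLOAT.fullmatch(text):
        return float(text)
    return text


def table_to_rows(header: str, data: str) -> list[dict]:
    """Turn a header string and a newline-separated data string into row dicts."""
    headers = [h.strip() for h in header.split(",")]
    rows = []
    for line in data.split("\n"):
        if not line.strip():
            continue  # skip blank lines
        values = parse_csv_row(line)
        if len(values) < len(headers):
            continue  # skip rows that do not cover every header
        rows.append({h: convert_value(v) for h, v in zip(headers, values)})
    return rows


# Made-up sample data, not real Kaltura output.
sample_header = "entry_id,entry_name,count_plays"
sample_data = '1_abc123,"Intro, part 1",42\n1_def456,Outro,7\n\nbroken_row'
for row in table_to_rows(sample_header, sample_data):
    print(row)
```

The regular expressions are deliberately strict so that identifiers such as '1_abc123' stay strings instead of being coerced into numbers.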

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/zoharbabin/kaltura-mcp'
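
The same request can be made from Python with only the standard library. In this minimal sketch the URL pattern is taken from the curl command above; the helper names `server_url` and `fetch_server_info`, and the assumption that the endpoint returns a JSON body, are illustrative and not confirmed by this page.

```python
import json
import urllib.request

API_BASE = "https://glama.ai/api/mcp/v1/servers"


def server_url(owner: str, name: str) -> str:
    """Build the directory URL for one MCP server (pattern from the curl example)."""
    return f"{API_BASE}/{owner}/{name}"


def fetch_server_info(owner: str, name: str, timeout: float = 10.0):
    """GET the server record; assumes the response body is JSON."""
    request = urllib.request.Request(server_url(owner, name), method="GET")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


print(server_url("zoharbabin", "kaltura-mcp"))
```

Calling `fetch_server_info("zoharbabin", "kaltura-mcp")` performs the network request; it is left out of the block so the sketch runs offline.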

If you have feedback or need assistance with the MCP directory API, please join our Discord server.