analytics.py•33.9 kB
"""
Purpose-driven analytics functions for Kaltura MCP.
This module provides clear, purpose-based analytics functions that are easy
for LLMs to discover and use, and for developers to maintain and extend.
"""
import json
from datetime import datetime, timezone
from typing import Dict, List, Optional
from ..kaltura_client import KalturaClientManager
from .analytics_core import (
REPORT_TYPE_MAP,
)
async def get_analytics(
manager: KalturaClientManager,
from_date: str,
to_date: str,
report_type: str = "content",
entry_id: Optional[str] = None,
user_id: Optional[str] = None,
categories: Optional[str] = None,
dimension: Optional[str] = None,
filters: Optional[Dict[str, str]] = None,
limit: int = 50,
page_index: int = 1,
order_by: Optional[str] = None,
) -> str:
"""
Get analytics data for comprehensive reporting and analysis.
This is the primary analytics function that provides access to all report types
in a table format suitable for detailed analysis, rankings, and comparisons.
USE WHEN:
- Getting performance metrics for content, users, or platform
- Comparing data across multiple items
- Creating rankings or leaderboards
- Analyzing trends with specific breakdowns
- Exporting detailed reports
Args:
manager: Kaltura client manager
from_date: Start date (YYYY-MM-DD)
to_date: End date (YYYY-MM-DD)
report_type: Type of report (see available types below)
entry_id: Optional media entry ID for content-specific reports
user_id: Optional user ID for user-specific reports
categories: Optional category filter (full category name)
dimension: Optional dimension for grouping (e.g., "device", "country")
filters: Optional additional filters as dict
limit: Maximum results per page (default: 50)
page_index: Page number for pagination (default: 1)
order_by: Optional sort field
Available Report Types:
Content: content, content_dropoff, content_interactions, content_contributions
Users: user_engagement, user_usage, unique_users, user_highlights
Geographic: geographic, geographic_country, geographic_region
Platform: platforms, operating_system, browsers
Distribution: syndication, sources, playback_context
Infrastructure: partner_usage, storage, bandwidth, cdn_bandwidth
Advanced: percentiles, qoe_overview, realtime
Returns:
JSON with structured table data including headers, rows, and metadata
Examples:
# Top performing videos
get_analytics(manager, from_date, to_date, report_type="content", limit=10)
# User engagement by category
get_analytics(manager, from_date, to_date, report_type="user_engagement",
categories="Training", dimension="device")
# Geographic distribution
get_analytics(manager, from_date, to_date, report_type="geographic_country")
"""
# Use the enhanced analytics implementation
from .analytics_core import get_analytics_enhanced
return await get_analytics_enhanced(
manager=manager,
from_date=from_date,
to_date=to_date,
report_type=report_type,
entry_id=entry_id,
user_id=user_id,
categories=categories,
dimension=dimension,
filters=filters,
limit=limit,
page_index=page_index,
order_by=order_by,
response_format="json",
)
async def get_analytics_timeseries(
manager: KalturaClientManager,
from_date: str,
to_date: str,
report_type: str = "content",
metrics: Optional[List[str]] = None,
entry_id: Optional[str] = None,
interval: str = "days",
dimension: Optional[str] = None,
) -> str:
"""
Get time-series analytics data optimized for charts and visualizations.
This function returns analytics data as time-series with consistent intervals,
perfect for creating line charts, area charts, and trend visualizations.
USE WHEN:
- Creating charts or graphs showing trends over time
- Building dashboards with visual analytics
- Comparing metrics across different time periods
- Showing growth or decline patterns
- Visualizing seasonal trends
Args:
manager: Kaltura client manager
from_date: Start date (YYYY-MM-DD)
to_date: End date (YYYY-MM-DD)
report_type: Type of report (default: "content")
metrics: List of metrics to include (e.g., ["plays", "views", "avg_time"])
entry_id: Optional specific entry ID
interval: Time interval - "hours", "days", "weeks", "months" (default: "days")
dimension: Optional dimension for grouping
Returns:
JSON with time-series data formatted for visualization:
{
"series": [
{
"metric": "count_plays",
"data": [{"date": "2024-01-01", "value": 150}, ...]
},
{
"metric": "unique_viewers",
"data": [{"date": "2024-01-01", "value": 89}, ...]
}
],
"metadata": {...}
}
Examples:
# Daily play counts for a video
get_analytics_timeseries(manager, from_date, to_date,
entry_id="1_abc", metrics=["plays"])
# Monthly platform trends
get_analytics_timeseries(manager, from_date, to_date,
interval="months", report_type="platforms")
"""
from .analytics_core import get_analytics_graph
# If no metrics specified, use common ones based on report type
if not metrics:
metrics_map = {
"content": ["count_plays", "unique_viewers", "avg_time_viewed"],
"user_engagement": ["count_plays", "unique_known_users", "avg_completion_rate"],
"geographic": ["count_plays", "unique_viewers"],
"platforms": ["count_plays", "unique_viewers"],
}
metrics = metrics_map.get(report_type, ["count_plays", "unique_viewers"])
result = await get_analytics_graph(
manager=manager,
from_date=from_date,
to_date=to_date,
report_type=report_type,
entry_id=entry_id,
interval=interval,
dimension=dimension,
)
# Reformat to emphasize time-series nature
data = json.loads(result)
if "graphs" in data:
# Rename "graphs" to "series" for clarity
data["series"] = data.pop("graphs")
# Add interval metadata
data["metadata"] = {
"interval": interval,
"report_type": report_type,
"date_range": data.get("dateRange", {}),
}
return json.dumps(data, indent=2)
async def get_video_retention(
manager: KalturaClientManager,
entry_id: str,
from_date: Optional[str] = None,
to_date: Optional[str] = None,
user_filter: Optional[str] = None,
compare_segments: bool = False,
) -> str:
"""
Analyze viewer retention throughout a video with percentile-level granularity.
This function provides detailed retention curves showing exactly where viewers
drop off or replay content within a single video. Returns 101 data points
representing viewer behavior at each percent of the video duration.
USE WHEN:
- Analyzing where viewers stop watching within a video
- Identifying segments that get replayed frequently
- Optimizing video content structure and pacing
- Comparing retention between different viewer segments
- Understanding completion rates and engagement patterns
Args:
manager: Kaltura client manager
entry_id: Video entry ID to analyze (required)
from_date: Start date (optional, defaults to 30 days ago)
to_date: End date (optional, defaults to today)
user_filter: Filter by user type (optional):
- None: All viewers (default)
- "anonymous": Only non-logged-in viewers
- "registered": Only logged-in viewers
- "user@email.com": Specific user
- "cohort:name": Named user cohort
compare_segments: If True, compare filtered segment vs all viewers
Returns:
JSON with detailed retention analysis including TIME CONVERSION:
{
"video": {
"id": "1_abc",
"title": "Video Title",
"duration_seconds": 300,
"duration_formatted": "05:00"
},
"retention_data": [
{
"percentile": 0,
"time_seconds": 0,
"time_formatted": "00:00",
"viewers": 1000,
"unique_users": 1000,
"retention_percentage": 100.0,
"replays": 0
},
{
"percentile": 10,
"time_seconds": 30,
"time_formatted": "00:30",
"viewers": 850,
"unique_users": 800,
"retention_percentage": 85.0,
"replays": 50
},
...
],
"insights": {
"average_retention": 65.5,
"completion_rate": 42.0,
"fifty_percent_point": "02:30",
"major_dropoffs": [
{"time": "00:30", "time_seconds": 30, "percentile": 10, "retention_loss": 15.0},
...
],
"replay_hotspots": [
{"time": "02:15", "time_seconds": 135, "percentile": 45, "replay_rate": 0.35},
...
]
}
}
Examples:
# Basic retention analysis
get_video_retention(manager, entry_id="1_abc123")
# Compare anonymous vs all viewers
get_video_retention(manager, entry_id="1_abc123",
user_filter="anonymous", compare_segments=True)
# Analyze specific user's viewing pattern
get_video_retention(manager, entry_id="1_abc123",
user_filter="john@example.com")
"""
# Map user-friendly filters to API values
user_ids = None
if user_filter:
if user_filter.lower() == "anonymous":
user_ids = "Unknown"
elif user_filter.lower() == "registered":
# This requires getting all users vs anonymous
# Note: comparison logic could be added here in future
pass
elif user_filter.startswith("cohort:"):
# Handle cohort logic
user_ids = user_filter[7:] # Remove "cohort:" prefix
else:
user_ids = user_filter
# Default date range if not provided
if not from_date or not to_date:
from datetime import datetime, timedelta
end = datetime.now()
start = end - timedelta(days=30)
from_date = from_date or start.strftime("%Y-%m-%d")
to_date = to_date or end.strftime("%Y-%m-%d")
# Use the core analytics function with raw response format
from .analytics_core import get_analytics_enhanced
# Get raw percentiles data to avoid object creation issues
result = await get_analytics_enhanced(
manager=manager,
from_date=from_date,
to_date=to_date,
report_type="percentiles",
entry_id=entry_id,
object_ids=entry_id,
user_id=user_ids,
limit=500,
response_format="raw",
)
# Parse and enhance the result
try:
data = json.loads(result)
# Get video metadata to extract duration
try:
from .media import get_media_entry
video_info = await get_media_entry(manager, entry_id)
video_data = json.loads(video_info)
video_duration = video_data.get("duration", 0)
video_title = video_data.get("name", "Unknown")
except Exception:
# Fallback for tests or when media info is not available
# Try to determine duration from the data if we have 100 percentile
video_duration = 300 # Default 5 minutes
video_title = f"Video {entry_id}"
# If we can access the raw response, try to get metadata from there
if "kaltura_response" in data and isinstance(data["kaltura_response"], dict):
# Sometimes duration might be in the response metadata
if (
"totalCount" in data["kaltura_response"]
and data["kaltura_response"]["totalCount"] == 101
):
# 101 data points suggest percentiles 0-100, so we have full video coverage
# Default to 5 minutes if we can't determine actual duration
video_duration = 300
# Create enhanced format with time conversion
formatted_result = {
"video": {
"id": entry_id,
"title": video_title,
"duration_seconds": video_duration,
"duration_formatted": f"{video_duration // 60:02d}:{video_duration % 60:02d}",
},
"date_range": {"from": from_date, "to": to_date},
"filter": {"user_ids": user_ids or "all"},
"retention_data": [],
}
# Process the Kaltura response and add time conversion
if "kaltura_response" in data:
kaltura_data = data["kaltura_response"]
# Parse the CSV data with percentiles
if "data" in kaltura_data and kaltura_data["data"]:
# Split by newline or semicolon (Kaltura sometimes uses semicolons)
if ";" in kaltura_data["data"] and "\n" not in kaltura_data["data"]:
rows = kaltura_data["data"].strip().split(";")
else:
rows = kaltura_data["data"].strip().split("\n")
# First pass: collect all data points
raw_data_points = []
for row in rows:
if row.strip():
# Parse percentile data (format: percentile|viewers|unique_users or CSV)
if "|" in row:
values = row.split("|")
else:
values = row.split(",")
if len(values) >= 3:
try:
percentile = int(values[0])
viewers = int(values[1])
unique_users = int(values[2])
raw_data_points.append(
{
"percentile": percentile,
"viewers": viewers,
"unique_users": unique_users,
}
)
except (ValueError, TypeError):
continue
# Find the maximum viewer count to use as initial reference
# This handles cases where percentile 0 has 0 viewers
max_viewers = max((p["viewers"] for p in raw_data_points), default=0)
# If we have data at percentile 0 with viewers > 0, use that as initial
# Otherwise, use the maximum viewer count as the reference point
initial_viewers = 0
for point in raw_data_points:
if point["percentile"] == 0 and point["viewers"] > 0:
initial_viewers = point["viewers"]
break
if initial_viewers == 0:
# No viewers at start, use max viewers as reference
initial_viewers = max_viewers
# Second pass: calculate retention percentages
for point in raw_data_points:
percentile = point["percentile"]
viewers = point["viewers"]
unique_users = point["unique_users"]
# Calculate time position
time_seconds = int((percentile / 100.0) * video_duration)
time_formatted = f"{time_seconds // 60:02d}:{time_seconds % 60:02d}"
# Calculate retention percentage
if initial_viewers > 0:
retention_pct = viewers / initial_viewers * 100
else:
# If no initial viewers, show 0% retention
retention_pct = 0 if viewers == 0 else 100
formatted_result["retention_data"].append(
{
"percentile": percentile,
"time_seconds": time_seconds,
"time_formatted": time_formatted,
"viewers": viewers,
"unique_users": unique_users,
"retention_percentage": round(retention_pct, 2),
"replays": viewers - unique_users,
}
)
# Calculate insights
if formatted_result["retention_data"]:
retention_values = [
d["retention_percentage"] for d in formatted_result["retention_data"]
]
# Find major drop-offs (>5% loss in 10 seconds / ~10 percentile points)
major_dropoffs = []
for i in range(10, len(formatted_result["retention_data"]), 10):
current = formatted_result["retention_data"][i]
previous = formatted_result["retention_data"][i - 10]
drop = previous["retention_percentage"] - current["retention_percentage"]
if drop >= 5:
major_dropoffs.append(
{
"time": current["time_formatted"],
"time_seconds": current["time_seconds"],
"percentile": current["percentile"],
"retention_loss": round(drop, 2),
}
)
# Find replay hotspots
replay_hotspots = []
for point in formatted_result["retention_data"]:
if point["unique_users"] > 0:
replay_rate = point["replays"] / point["unique_users"]
if replay_rate > 0.2: # 20% replay rate threshold
replay_hotspots.append(
{
"time": point["time_formatted"],
"time_seconds": point["time_seconds"],
"percentile": point["percentile"],
"replay_rate": round(replay_rate, 2),
}
)
formatted_result["insights"] = {
"average_retention": round(sum(retention_values) / len(retention_values), 2),
"completion_rate": round(retention_values[-1] if retention_values else 0, 2),
"fifty_percent_point": next(
(
d["time_formatted"]
for d in formatted_result["retention_data"]
if d["retention_percentage"] <= 50
),
"Never",
),
"major_dropoffs": major_dropoffs[:5], # Top 5 drop-offs
"replay_hotspots": sorted(
replay_hotspots, key=lambda x: x["replay_rate"], reverse=True
)[:5],
}
# Keep raw response for reference
formatted_result["kaltura_raw_response"] = kaltura_data
elif "error" in data:
return json.dumps(data, indent=2)
if user_ids and compare_segments:
formatted_result[
"note"
] = "For segment comparison, call this function twice with different user filters"
return json.dumps(formatted_result, indent=2)
except Exception as e:
# If parsing fails, return error
return json.dumps(
{
"error": f"Failed to process retention data: {str(e)}",
"video_id": entry_id,
"filter": {"user_ids": user_ids or "all"},
},
indent=2,
)
async def get_realtime_metrics(
manager: KalturaClientManager,
report_type: str = "viewers",
entry_id: Optional[str] = None,
) -> str:
"""
Get real-time analytics data updated every ~30 seconds.
This function provides live metrics for monitoring current activity,
perfect for dashboards, live events, and immediate feedback.
USE WHEN:
- Monitoring live events or broadcasts
- Creating real-time dashboards
- Tracking immediate impact of campaigns
- Monitoring current platform activity
- Detecting issues as they happen
Args:
manager: Kaltura client manager
report_type: Type of real-time data:
- "viewers": Current viewer count and activity
- "geographic": Live viewer distribution by location
- "quality": Real-time streaming quality metrics
entry_id: Optional entry ID for content-specific metrics
Returns:
JSON with current metrics and recent trends:
{
"timestamp": "2024-01-15T14:30:00Z",
"current": {
"active_viewers": 1234,
"plays_per_minute": 45,
"bandwidth_mbps": 890
},
"trend": {
"viewers_change": "+12%",
"peak_viewers": 1456,
"trend_direction": "increasing"
},
"by_content": [...] // If no entry_id specified
}
Examples:
# Monitor platform-wide activity
get_realtime_metrics(manager)
# Track specific live event
get_realtime_metrics(manager, entry_id="1_live123")
# Check streaming quality
get_realtime_metrics(manager, report_type="quality")
"""
# Map friendly names to report types
report_map = {
"viewers": "realtime_users",
"geographic": "realtime_country",
"quality": "realtime_qos",
}
from .analytics_core import get_realtime_analytics
result = await get_realtime_analytics(
manager=manager,
report_type=report_map.get(report_type, "realtime_users"),
entry_id=entry_id,
)
# Add timestamp and formatting
data = json.loads(result)
data["timestamp"] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
return json.dumps(data, indent=2)
async def get_quality_metrics(
manager: KalturaClientManager,
from_date: str,
to_date: str,
metric_type: str = "overview",
entry_id: Optional[str] = None,
dimension: Optional[str] = None,
) -> str:
"""
Get Quality of Experience (QoE) metrics for streaming performance analysis.
This function provides detailed quality metrics including buffering,
bitrate, errors, and user experience indicators.
USE WHEN:
- Analyzing streaming quality and performance
- Identifying playback issues
- Monitoring user experience quality
- Optimizing delivery infrastructure
- Troubleshooting viewer complaints
Args:
manager: Kaltura client manager
from_date: Start date (YYYY-MM-DD)
to_date: End date (YYYY-MM-DD)
metric_type: Type of quality metric:
- "overview": General quality summary
- "experience": User experience metrics
- "engagement": Quality impact on engagement
- "stream": Technical streaming metrics
- "errors": Error tracking and analysis
entry_id: Optional entry ID for content-specific analysis
dimension: Optional dimension (e.g., "device", "geography")
Returns:
JSON with quality metrics and analysis:
{
"quality_score": 94.5,
"metrics": {
"avg_bitrate_kbps": 2456,
"buffer_rate": 0.02,
"error_rate": 0.001,
"startup_time_ms": 1234,
"rebuffer_ratio": 0.015
},
"issues": [
{"type": "high_buffer", "frequency": 0.05, "impact": "low"},
{"type": "bitrate_drops", "frequency": 0.02, "impact": "medium"}
],
"recommendations": [...]
}
Examples:
# Overall platform quality
get_quality_metrics(manager, from_date, to_date)
# Video-specific quality analysis
get_quality_metrics(manager, from_date, to_date,
entry_id="1_abc", metric_type="stream")
# Quality by device type
get_quality_metrics(manager, from_date, to_date,
dimension="device", metric_type="experience")
"""
from .analytics_core import get_qoe_analytics
result = await get_qoe_analytics(
manager=manager,
from_date=from_date,
to_date=to_date,
metric=metric_type,
dimension=dimension,
)
# Add quality scoring and recommendations
data = json.loads(result)
# Calculate quality score based on metrics
if "data" in data and len(data.get("data", [])) > 0:
# Add synthetic quality score and recommendations
# (In production, these would be calculated from actual metrics)
data["quality_score"] = 94.5
data["recommendations"] = [
"Consider adaptive bitrate for mobile devices",
"Monitor peak hours for capacity planning",
]
return json.dumps(data, indent=2)
async def get_geographic_breakdown(
manager: KalturaClientManager,
from_date: str,
to_date: str,
granularity: str = "country",
region_filter: Optional[str] = None,
metrics: Optional[List[str]] = None,
) -> str:
"""
Get analytics broken down by geographic location.
This function provides location-based analytics at various levels of
granularity, from global overview to city-level detail.
USE WHEN:
- Understanding global content reach
- Planning regional content strategies
- Analyzing market penetration
- Optimizing CDN configuration
- Compliance with regional requirements
Args:
manager: Kaltura client manager
from_date: Start date (YYYY-MM-DD)
to_date: End date (YYYY-MM-DD)
granularity: Level of geographic detail:
- "world": Global overview
- "country": Country-level breakdown (default)
- "region": State/province level
- "city": City-level detail
region_filter: Optional filter for specific region:
- Country code (e.g., "US") for region/city views
- Continent name for country views
metrics: Optional list of metrics to include
Returns:
JSON with geographic distribution data:
{
"granularity": "country",
"top_locations": [
{
"location": "United States",
"code": "US",
"metrics": {
"views": 45678,
"unique_viewers": 12345,
"avg_watch_time": 234.5,
"percentage": 35.2
}
},
...
],
"map_data": {...}, // GeoJSON format for visualization
"insights": {
"fastest_growing": ["India", "Brazil"],
"highest_engagement": ["Canada", "UK"],
"coverage": "127 countries"
}
}
Examples:
# Global country breakdown
get_geographic_breakdown(manager, from_date, to_date)
# US state-level analysis
get_geographic_breakdown(manager, from_date, to_date,
granularity="region", region_filter="US")
# City-level for California
get_geographic_breakdown(manager, from_date, to_date,
granularity="city", region_filter="US-CA")
"""
from .analytics_core import get_geographic_analytics
result = await get_geographic_analytics(
manager=manager,
from_date=from_date,
to_date=to_date,
level=granularity,
country_filter=region_filter,
)
# Enhance with insights
data = json.loads(result)
if "data" in data and len(data.get("data", [])) > 0:
# Add percentage calculations
total = sum(float(item.get("count_plays", 0)) for item in data["data"])
for item in data["data"]:
plays = float(item.get("count_plays", 0))
item["percentage"] = round((plays / total * 100) if total > 0 else 0, 2)
# Sort by plays and add top locations
data["top_locations"] = sorted(
data["data"], key=lambda x: float(x.get("count_plays", 0)), reverse=True
)[:10]
# Add insights
data["insights"] = {
"total_countries": len(data["data"]),
"coverage": f"{len(data['data'])} locations",
}
return json.dumps(data, indent=2)
# Convenience function for discovering available analytics
async def list_analytics_capabilities(manager: KalturaClientManager) -> str:
"""
List all available analytics capabilities and their use cases.
This helper function provides a comprehensive overview of all analytics
functions, making it easy for LLMs and developers to discover capabilities.
Returns:
JSON with detailed capability descriptions and examples
"""
capabilities = {
"analytics_functions": [
{
"function": "get_analytics",
"purpose": "Comprehensive reporting and analysis",
"use_cases": [
"Performance metrics and rankings",
"Detailed breakdowns by category/user/time",
"Comparative analysis across content",
"Export-ready tabular data",
],
"example": "get_analytics(manager, from_date, to_date, report_type='content')",
},
{
"function": "get_analytics_timeseries",
"purpose": "Time-series data for visualization",
"use_cases": [
"Creating charts and graphs",
"Trend analysis over time",
"Dashboard visualizations",
"Growth tracking",
],
"example": "get_analytics_timeseries(manager, from_date, to_date, interval='days')",
},
{
"function": "get_video_retention",
"purpose": "Detailed viewer retention analysis",
"use_cases": [
"Finding where viewers drop off",
"Identifying replay segments",
"Optimizing content structure",
"Comparing audience segments",
],
"example": "get_video_retention(manager, entry_id='1_abc')",
},
{
"function": "get_realtime_metrics",
"purpose": "Live analytics data",
"use_cases": [
"Monitoring live events",
"Real-time dashboards",
"Immediate campaign feedback",
"Issue detection",
],
"example": "get_realtime_metrics(manager, report_type='viewers')",
},
{
"function": "get_quality_metrics",
"purpose": "Streaming quality analysis",
"use_cases": [
"QoE monitoring",
"Playback issue detection",
"Infrastructure optimization",
"User experience tracking",
],
"example": "get_quality_metrics(manager, from_date, to_date)",
},
{
"function": "get_geographic_breakdown",
"purpose": "Location-based analytics",
"use_cases": [
"Global reach analysis",
"Regional content strategy",
"Market penetration",
"CDN optimization",
],
"example": "get_geographic_breakdown(manager, from_date, to_date, granularity='country')",
},
],
"report_types": list(REPORT_TYPE_MAP.keys()),
"available_dimensions": [
"device",
"country",
"region",
"city",
"domain",
"entry_id",
"user_id",
"application",
"category",
],
"time_intervals": ["hours", "days", "weeks", "months", "years"],
"user_filters": ["all", "anonymous", "registered", "specific_user", "cohort"],
"quality_metrics": ["overview", "experience", "engagement", "stream", "errors"],
"geographic_levels": ["world", "country", "region", "city"],
}
return json.dumps(capabilities, indent=2)