videos.py

"""Video formatting methods for the Trakt MCP server."""

from __future__ import annotations

import re
from datetime import UTC, datetime
from html import escape
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse

from pydantic import ValidationError

from models.videos.video import ValidatedVideo

if TYPE_CHECKING:
    from models.types.api_responses import VideoResponse

# Pre-compiled regex patterns for YouTube video ID extraction
_YT_PATTERNS = [
    # YouTube domains (youtube.com, youtube-nocookie.com) with all path formats
    re.compile(
        r"(?:https?://)?(?:(?:www|m)\.)?youtube(?:-nocookie)?\.com/"
        + r"(?:watch\?(?:[^&]*&)*v=|embed/|shorts/)([A-Za-z0-9_-]{11})(?=[?#&/\s]|$)"
    ),
    # youtu.be short URLs
    re.compile(
        r"(?:https?://)?(?:(?:www|m)\.)?youtu\.be/([A-Za-z0-9_-]{11})(?=[?#&/\s]|$)"
    ),
]


class VideoFormatters:
    """Helper class for formatting video-related data for MCP responses."""

    @staticmethod
    def extract_youtube_video_id(url: str) -> str | None:
        """Extract YouTube video ID from various URL formats.

        Args:
            url: YouTube URL in various formats

        Returns:
            Video ID if found, None otherwise
        """
        if not url:
            return None

        url = url.strip()

        # Use pre-compiled patterns for better performance
        for pattern in _YT_PATTERNS:
            match = pattern.search(url)
            if match:
                return match.group(1)

        return None

    @staticmethod
    def normalize_site_name(site: str | None) -> str:
        """Normalize video site display names for consistent UX.

        Args:
            site: Site name from API response

        Returns:
            Normalized display name
        """
        # Coerce None to safe default to prevent AttributeError
        safe_site = site or "unknown"

        site_mapping = {
            "youtube": "YouTube",
        }
        return site_mapping.get(safe_site.lower(), safe_site.title())

    @staticmethod
    def normalize_video_type_for_display(video_type: str | None) -> str:
        """Normalize video type for display with proper pluralization.

        Args:
            video_type: Raw video type from API response

        Returns:
            Clean, properly pluralized display name
        """
        # Coerce None to safe default to prevent AttributeError
        safe_type = video_type or "unknown"

        # Clean the type - replace underscores with spaces and normalize case
        cleaned = safe_type.replace("_", " ").strip().lower()

        # Mapping for irregular plurals and special cases
        type_mapping = {
            "behind the scenes": "Behind the Scenes",
            "featurette": "Featurettes",
            "trailer": "Trailers",
            "teaser": "Teasers",
            "clip": "Clips",
            "unknown": "Unknown Videos",
        }

        if cleaned in type_mapping:
            return type_mapping[cleaned]

        # Fall back to safe pluralization - title case with 's' appended
        return cleaned.title() + "s"

    @staticmethod
    def escape_markdown(text: str) -> str:
        """Escape Markdown characters to prevent formatting injection.

        Args:
            text: Text that may contain Markdown characters

        Returns:
            Text with Markdown characters escaped
        """
        # Escape backslash first to avoid double-escaping newly added backslashes
        replacements = [
            ("\\", r"\\"),
            ("`", r"\`"),
            ("*", r"\*"),
            ("_", r"\_"),
            ("{", r"\{"),
            ("}", r"\}"),
            ("[", r"\["),
            ("]", r"\]"),
            ("(", r"\("),
            (")", r"\)"),
            ("#", r"\#"),
            ("+", r"\+"),
            ("-", r"\-"),
            (".", r"\."),
            ("!", r"\!"),
            ("|", r"\|"),
            (">", r"\>"),
        ]
        for a, b in replacements:
            text = text.replace(a, b)
        return text

    @staticmethod
    def parse_iso_datetime(date_string: str | None) -> datetime | None:
        """Parse ISO datetime string with Z timezone.

        Args:
            date_string: ISO datetime string, possibly with 'Z' timezone

        Returns:
            Timezone-aware datetime in UTC or None if parsing fails
        """
        if not date_string:
            return None

        try:
            # Parse datetime and ensure it's timezone-aware in UTC
            dt = datetime.fromisoformat(date_string.replace("Z", "+00:00"))
            # If somehow still naive, assume UTC
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=UTC)
            # Convert to UTC if not already
            return dt.astimezone(UTC)
        except (ValueError, TypeError):
            return None

    @staticmethod
    def validate_embed_url(url: str) -> bool:
        """Validate that embed URL is from trusted domains with safe schemes.

        Args:
            url: URL to validate

        Returns:
            True if URL is safe for embedding, False otherwise
        """
        try:
            parsed = urlparse(url)

            # Only allow https scheme for embed URLs
            if parsed.scheme != "https":
                return False

            # Only allow explicit embed paths
            if not parsed.path.startswith("/embed/"):
                return False

            # Only allow trusted YouTube embed domains (wildcard-based + country TLDs)
            domain = parsed.netloc.lower()

            # Standard domains
            if (
                domain == "youtube.com"
                or domain == "youtube-nocookie.com"
                or domain.endswith(".youtube.com")
                or domain.endswith(".youtube-nocookie.com")
            ):
                return True

            # Country TLD patterns: youtube.[cc], youtube.com.[cc], youtube.co.[cc]
            country_patterns = [
                # youtube.de, youtube.fr (exclude youtube.co)
                r"^youtube\.(?!co$)[a-z]{2}$",
                # youtube.com.au, youtube.com.mx
                r"^youtube\.com\.[a-z]{2}$",
                # youtube.co.uk, youtube.co.jp (requires country code)
                r"^youtube\.co\.[a-z]{2}$",
                # www.youtube.de (exclude .co)
                r"^[a-z0-9-]+\.youtube\.(?!co$)[a-z]{2}$",
                # www.youtube.com.au
                r"^[a-z0-9-]+\.youtube\.com\.[a-z]{2}$",
                # www.youtube.co.uk
                r"^[a-z0-9-]+\.youtube\.co\.[a-z]{2}$",
            ]

            return any(re.match(pattern, domain) for pattern in country_patterns)
        except Exception:
            return False

    @staticmethod
    def validate_watch_url(url: str) -> str | None:
        """Validate that watch URL uses safe schemes (http/https only).

        Args:
            url: URL to validate for safety

        Returns:
            Original URL if safe, None if unsafe or malformed
        """
        if not url:
            return None

        try:
            parsed = urlparse(url)
            # Only allow absolute http(s) URLs with a host and no whitespace
            if any(ch.isspace() for ch in url):
                return None
            if parsed.scheme in ("http", "https") and parsed.netloc:
                return url
            return None
        except Exception:
            return None

    @staticmethod
    def get_youtube_embed_url(url: str) -> str | None:
        """Get YouTube embed URL from various YouTube URL formats.

        Args:
            url: YouTube URL in various formats

        Returns:
            YouTube embed URL or None if not a YouTube video
        """
        video_id = VideoFormatters.extract_youtube_video_id(url)
        if video_id:
            # Use standard YouTube embed URL
            embed_url = f"https://www.youtube.com/embed/{video_id}"
            # Validate the constructed URL before returning
            if VideoFormatters.validate_embed_url(embed_url):
                return embed_url
        return None

    @staticmethod
    def validate_video_list(videos: list[dict[str, Any]]) -> list[ValidatedVideo]:
        """Validate a list of video dictionaries using Pydantic models.

        Args:
            videos: List of video data dictionaries from API responses

        Returns:
            List of validated video models

        Raises:
            ValueError: If any video fails validation with details
        """
        if not videos:
            return []

        validated_videos: list[ValidatedVideo] = []
        validation_errors: list[str] = []

        for i, video in enumerate(videos):
            try:
                validated_video = ValidatedVideo.model_validate(video)
                validated_videos.append(validated_video)
            except ValidationError as e:
                error_details: list[str] = []
                for error in e.errors():
                    field = ".".join(str(loc) for loc in error["loc"])
                    message = error["msg"]
                    error_details.append(f"{field}: {message}")
                validation_errors.append(f"Video {i}: {'; '.join(error_details)}")

        if validation_errors:
            raise ValueError(
                f"Video validation failed: {' | '.join(validation_errors)}"
            )

        return validated_videos

    @staticmethod
    def format_videos_list(
        videos: list[VideoResponse],
        title: str,
        embed_markdown: bool = True,
        validate_input: bool = True,
    ) -> str:
        """Format videos with optional embedded markdown.

        Args:
            videos: List of video response data
            title: Title of the movie/show for context
            embed_markdown: Use embedded markdown syntax for videos (default: True)
            validate_input: Whether to validate input data with Pydantic models (default: True)

        Returns:
            Formatted markdown text with videos grouped by type

        Raises:
            ValueError: If validate_input=True and video data is invalid
        """
        # Validate inputs if requested
        # (follows project guideline: "Validate all inputs with Pydantic models")
        if validate_input and videos:
            try:
                # Convert VideoResponse TypedDicts to plain dicts for validation
                video_dicts = [dict(video) for video in videos]
                VideoFormatters.validate_video_list(video_dicts)
            except ValueError as e:
                raise ValueError(f"Invalid video data provided: {e}") from e

        if not videos:
            return f"# Videos for {title}\n\nNo videos available."

        lines = [f"# Videos for {title}\n"]

        # Check if any YouTube videos will generate iframes
        # (only when embed_markdown=True)
        will_have_iframes = embed_markdown and any(
            (video.get("site") or "").lower() == "youtube"
            and VideoFormatters.get_youtube_embed_url(video.get("url") or "")
            is not None
            for video in videos
        )

        if will_have_iframes:
            message = (
                "**ALWAYS:** Preserve iframe HTML below exactly as provided "
                "so videos embed and display properly in the response\n"
            )
            lines.append(message)

        by_type: dict[str, list[VideoResponse]] = {}
        for video in videos:
            video_type = video.get("type") or "unknown"
            by_type.setdefault(video_type, []).append(video)

        for video_type in sorted(by_type):
            display_name = VideoFormatters.normalize_video_type_for_display(video_type)
            lines.append(f"## {display_name}\n")

            type_videos = by_type[video_type]

            def get_published_date(video: VideoResponse) -> datetime:
                parsed_date = VideoFormatters.parse_iso_datetime(
                    video.get("published_at")
                )
                return parsed_date or datetime.min.replace(tzinfo=UTC)

            sorted_videos = sorted(type_videos, key=get_published_date, reverse=True)

            for video in sorted_videos:
                raw_title = video.get("title", "Unknown Video")
                title_text = VideoFormatters.escape_markdown(raw_title)
                lines.append(f"### {title_text}")

                # Format video link
                url = video.get("url") or ""
                site = video.get("site") or "unknown"

                if embed_markdown:
                    # Generate HTML iframe for YouTube videos,
                    # fallback to markdown for others
                    if (site or "").lower() == "youtube":
                        embed_url = VideoFormatters.get_youtube_embed_url(url)
                        if embed_url:
                            iframe_html = (
                                f'<iframe width="560" height="315" '
                                f'src="{escape(embed_url, quote=True)}" '
                                f'title="YouTube video player" frameborder="0" '
                                f'allow="accelerometer; autoplay; clipboard-write; '
                                f"encrypted-media; gyroscope; picture-in-picture; "
                                f'web-share" '
                                f'referrerpolicy="strict-origin-when-cross-origin" '
                                f"allowfullscreen></iframe>"
                            )
                            lines.append(iframe_html)
                        else:
                            # Fallback to markdown link if embed URL extraction fails
                            site_display = VideoFormatters.normalize_site_name(site)
                            safe_url = VideoFormatters.validate_watch_url(url)
                            if safe_url:
                                lines.append(
                                    f"[▶️ Watch on {site_display}](<{safe_url}>)"
                                )
                            else:
                                lines.append("Watch link unavailable")
                    else:
                        # For non-YouTube videos, use simple markdown link
                        site_display = VideoFormatters.normalize_site_name(site)
                        safe_url = VideoFormatters.validate_watch_url(url)
                        if safe_url:
                            lines.append(f"[▶️ Watch on {site_display}](<{safe_url}>)")
                        else:
                            lines.append("Watch link unavailable")
                else:
                    # Simple text link
                    site_display = VideoFormatters.normalize_site_name(site)
                    safe_url = VideoFormatters.validate_watch_url(url)
                    if safe_url:
                        lines.append(f"[▶️ Watch on {site_display}](<{safe_url}>)")
                    else:
                        lines.append("Watch link unavailable")

                # Video metadata
                metadata: list[str] = []
                if video.get("official"):
                    metadata.append("**Official**")
                if size := video.get("size"):
                    metadata.append(f"{size}p")
                if language := video.get("language"):
                    metadata.append((language or "").upper())
                if country := video.get("country"):
                    metadata.append(f"({(country or '').upper()})")

                # Format publication date
                if published_at := video.get("published_at"):
                    date_obj = VideoFormatters.parse_iso_datetime(published_at)
                    if date_obj:
                        metadata.append(f"*{date_obj.strftime('%B %d, %Y')}*")
                    else:
                        metadata.append("*Date unknown*")

                if metadata:
                    lines.append(" • ".join(metadata))

                lines.append("")  # Empty line between videos

        return "\n".join(lines)

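Usage sketch (not part of videos.py): a minimal example of how the formatter above might be called. It assumes VideoFormatters from this module is in scope and uses made-up sample data shaped like the keys the formatter reads (url, site, type, title, official, size, language, country, published_at); validation is skipped here because the real ValidatedVideo schema lives elsewhere in the repository.

# Minimal usage sketch. Assumption: VideoFormatters from the module above is
# importable; the sample dict below is illustrative, not real Trakt data.
sample_videos = [
    {
        "title": "Official Trailer",
        "url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
        "site": "YouTube",
        "type": "trailer",
        "official": True,
        "size": 1080,
        "language": "en",
        "country": "us",
        "published_at": "2024-05-01T12:00:00.000Z",
    }
]

# validate_input=False skips the Pydantic step, since ValidatedVideo's exact
# schema is defined in models.videos.video and may require other fields.
markdown = VideoFormatters.format_videos_list(
    sample_videos,
    title="Example Movie",
    embed_markdown=True,
    validate_input=False,
)
print(markdown)
# Expected shape: "# Videos for Example Movie", a "## Trailers" section sorted
# newest-first, an <iframe> embed for the recognized YouTube URL, and a
# metadata line like "**Official** • 1080p • EN • (US) • *May 01, 2024*".
# With embed_markdown=False (or a non-YouTube site), the iframe is replaced by
# a plain "[▶️ Watch on …](<url>)" link.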