Skip to main content
Glama

Jupiter Broadcasting Podcast Data MCP Server

by Red5d
rss_parser.py (12.2 kB)
"""RSS feed parser for Podcast 2.0 specification.""" import requests from datetime import datetime, timezone from typing import Dict, List, Optional, Any from lxml import etree import re class PodcastRSSParser: """Parser for Podcast 2.0 RSS feeds.""" def __init__(self, feeds: Dict[str, str]): """Initialize with a dictionary of show_name -> feed_url mappings.""" self.feeds = feeds self._cached_feeds: Dict[str, Any] = {} def _parse_date(self, date_string: str) -> Optional[datetime]: """Parse various date formats commonly found in RSS feeds.""" if not date_string: return None date_string = date_string.strip() # Common RSS date formats to try formats = [ # RFC 2822 format (most common in RSS) "%a, %d %b %Y %H:%M:%S %z", # "Sun, 05 Oct 2025 19:25:37 -0700" "%a, %d %b %Y %H:%M:%S %Z", # "Sun, 05 Oct 2025 19:25:37 PDT" "%a, %d %b %Y %H:%M:%S", # "Sun, 05 Oct 2025 19:25:37" # ISO 8601 formats "%Y-%m-%dT%H:%M:%S%z", # "2025-10-05T19:25:37-0700" "%Y-%m-%dT%H:%M:%SZ", # "2025-10-05T19:25:37Z" "%Y-%m-%dT%H:%M:%S", # "2025-10-05T19:25:37" "%Y-%m-%d %H:%M:%S", # "2025-10-05 19:25:37" "%Y-%m-%d", # "2025-10-05" # Other common formats "%d %b %Y %H:%M:%S %z", # "05 Oct 2025 19:25:37 -0700" "%d %b %Y", # "05 Oct 2025" ] # Clean up timezone offset format for Python compatibility # Convert "-0700" to "-07:00" format if needed if re.search(r'[+-]\d{4}$', date_string): date_string = re.sub(r'([+-]\d{2})(\d{2})$', r'\1:\2', date_string) parsed_dt = None for fmt in formats: try: parsed_dt = datetime.strptime(date_string, fmt) break except ValueError: continue # If all else fails, try to extract just the date part if parsed_dt is None: date_match = re.search(r'(\d{1,2})\s+(\w{3})\s+(\d{4})', date_string) if date_match: try: day, month, year = date_match.groups() month_map = { 'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12 } if month in month_map: parsed_dt = datetime(int(year), month_map[month], int(day)) except 
(ValueError, KeyError): pass if parsed_dt is None: print(f"Warning: Could not parse date: {date_string}") return None # Ensure all datetime objects are timezone-aware for consistent comparison # If no timezone info, assume UTC if parsed_dt.tzinfo is None: parsed_dt = parsed_dt.replace(tzinfo=timezone.utc) return parsed_dt def _get_feed(self, show_name: str) -> Optional[etree._Element]: """Get parsed feed data, with caching.""" if show_name not in self.feeds: return None if show_name not in self._cached_feeds: try: feed_url = self.feeds[show_name] # Handle file:// URLs for local testing if feed_url.startswith('file://'): file_path = feed_url[7:] # Remove 'file://' prefix with open(file_path, 'rb') as f: content = f.read() else: # Handle HTTP/HTTPS URLs response = requests.get(feed_url, timeout=30) response.raise_for_status() content = response.content # Parse XML with lxml parser = etree.XMLParser(strip_cdata=False) root = etree.fromstring(content, parser) self._cached_feeds[show_name] = root except Exception as e: print(f"Error parsing feed for {show_name}: {e}") return None return self._cached_feeds[show_name] def get_shows(self) -> List[str]: """Get list of available show names.""" return list(self.feeds.keys()) def search_episodes( self, show_name: Optional[str] = None, since_date: Optional[str] = None, before_date: Optional[str] = None, hosts: Optional[List[str]] = None, text_search: Optional[str] = None, ) -> List[Dict[str, Any]]: """Search episodes based on provided criteria.""" if not any([show_name, since_date, before_date, hosts, text_search]): raise ValueError("At least one search parameter must be provided") results = [] # Determine which shows to search shows_to_search = [show_name] if show_name else self.get_shows() # Parse date filters since_dt = None before_dt = None if since_date: since_dt = self._parse_date(since_date) if before_date: before_dt = self._parse_date(before_date) for show in shows_to_search: feed_root = self._get_feed(show) if 
feed_root is None: continue # Find all item elements (episodes) items = feed_root.xpath('//item') for item in items: episode_data = self._parse_episode(show, item) # Apply filters if since_dt and episode_data.get("published_date"): episode_dt = self._parse_date(episode_data["published_date"]) if episode_dt and episode_dt < since_dt: continue if before_dt and episode_data.get("published_date"): episode_dt = self._parse_date(episode_data["published_date"]) if episode_dt and episode_dt > before_dt: continue if hosts: episode_hosts = episode_data.get("hosts", []) if not any(host.lower() in [h.lower() for h in episode_hosts] for host in hosts): continue if text_search: search_text = text_search.lower() title = episode_data.get("title", "").lower() description = episode_data.get("description", "").lower() if search_text not in title and search_text not in description: continue results.append(episode_data) return results def get_episode(self, show_name: str, episode_number: str) -> Optional[Dict[str, Any]]: """Get specific episode data.""" feed_root = self._get_feed(show_name) if feed_root is None: return None # Find all item elements (episodes) items = feed_root.xpath('//item') for item in items: # Check GUID guid_elem = item.find('guid') if guid_elem is not None and guid_elem.text == episode_number: return self._parse_episode(show_name, item) # Check podcast:episode number episode_elem = item.find('.//{https://podcastindex.org/namespace/1.0}episode') if episode_elem is not None and episode_elem.text == episode_number: return self._parse_episode(show_name, item) return None def get_transcript(self, show_name: str, episode_number: str) -> Optional[str]: """Get transcript for an episode.""" episode = self.get_episode(show_name, episode_number) if not episode: return None transcript_url = episode.get("transcript_url") if not transcript_url: return None try: response = requests.get(transcript_url, timeout=30) response.raise_for_status() return response.text except Exception 
as e: print(f"Error fetching transcript: {e}") return None def _parse_episode(self, show_name: str, item: etree._Element) -> Dict[str, Any]: """Parse a single episode entry from XML.""" # Helper function to get text content safely def get_text(element): return element.text if element is not None else "" # Helper function to get CDATA content safely def get_cdata_content(element): if element is not None: # Handle CDATA content if element.text: return element.text.strip() return "" # Basic episode information title_elem = item.find('title') guid_elem = item.find('guid') link_elem = item.find('link') pubdate_elem = item.find('pubDate') description_elem = item.find('description') # iTunes elements duration_elem = item.find('.//{http://www.itunes.com/dtds/podcast-1.0.dtd}duration') # Podcast namespace elements episode_num_elem = item.find('.//{https://podcastindex.org/namespace/1.0}episode') episode_data = { "show_name": show_name, "id": get_text(guid_elem) or get_text(episode_num_elem), "episode_number": get_text(episode_num_elem), "title": get_text(title_elem), "description": get_cdata_content(description_elem), "published_date": get_text(pubdate_elem), "link": get_text(link_elem), "duration": get_text(duration_elem), "hosts": [], "transcript_urls": [], "enclosures": [], "chapters_url": None, } # Parse enclosures (audio files) enclosure_elems = item.findall('enclosure') for enclosure in enclosure_elems: episode_data["enclosures"].append({ "url": enclosure.get("url", ""), "type": enclosure.get("type", ""), "length": enclosure.get("length", "0"), }) # Parse podcast:person elements (hosts, guests, etc.) 
person_elems = item.findall('.//{https://podcastindex.org/namespace/1.0}person') for person in person_elems: role = person.get("role", "") if role == "host": person_name = person.text if person_name: episode_data["hosts"].append(person_name.strip()) # Parse podcast:transcript elements transcript_elems = item.findall('.//{https://podcastindex.org/namespace/1.0}transcript') for transcript in transcript_elems: transcript_url = transcript.get("url") transcript_type = transcript.get("type", "") if transcript_url: episode_data["transcript_urls"].append({ "url": transcript_url, "type": transcript_type, "language": transcript.get("language", "en-us") }) # For backward compatibility, set transcript_url to the first available transcript if episode_data["transcript_urls"]: episode_data["transcript_url"] = episode_data["transcript_urls"][0]["url"] else: episode_data["transcript_url"] = None # Parse podcast:chapters chapters_elem = item.find('.//{https://podcastindex.org/namespace/1.0}chapters') if chapters_elem is not None: episode_data["chapters_url"] = chapters_elem.get("url") return episode_data

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Red5d/jupiterbroadcasting_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.