Skip to main content
Glama
KaiQin04
by KaiQin04
server.py19.7 kB
"""MCP server for downloading Instagram videos via Instaloader.""" from __future__ import annotations from pathlib import Path import os import re from typing import Any from mcp.server.fastmcp import FastMCP import instaloader DEFAULT_DOWNLOAD_ROOT: Path = Path.home() / "Downloads" / "instagram" try: from dotenv import load_dotenv except Exception: load_dotenv = None if load_dotenv is not None: load_dotenv() def _resolve_download_root(download_root: str | None) -> Path: if download_root is None: return DEFAULT_DOWNLOAD_ROOT cleaned = download_root.strip() if not cleaned: return DEFAULT_DOWNLOAD_ROOT return Path(cleaned).expanduser() def extract_shortcode(url: str) -> str: """Extract an Instagram shortcode from a supported URL. Supported formats include: - https://www.instagram.com/p/<shortcode>/ - https://www.instagram.com/reel/<shortcode>/ - https://www.instagram.com/tv/<shortcode>/ Args: url: Instagram post/reel/tv URL. Returns: The extracted shortcode. Raises: ValueError: If the shortcode cannot be extracted. """ pattern = ( r"(?:https?://)?(?:www\.)?(?:m\.)?instagram\.com/" r"(?:p|reel|tv)/([A-Za-z0-9_-]+)" ) match = re.search(pattern, url) if not match: raise ValueError("Unsupported Instagram URL format.") return match.group(1) def _build_instaloader( download_root: Path, download_pictures: bool = False, download_videos: bool = True, save_metadata: bool = False, save_caption: bool = False, ) -> instaloader.Instaloader: dirname_pattern = str(download_root / "{target}") post_metadata_txt_pattern = "{caption}" if save_caption else "" return instaloader.Instaloader( quiet=True, dirname_pattern=dirname_pattern, download_pictures=download_pictures, download_videos=download_videos, download_video_thumbnails=False, download_geotags=False, download_comments=False, save_metadata=save_metadata, compress_json=False, post_metadata_txt_pattern=post_metadata_txt_pattern, ) def _login_if_needed( loader: instaloader.Instaloader, username: str | None, password: str | None, ) -> None: if username is None and password is None: return if not username or not password: raise ValueError( "Both username and password must be provided together." ) loader.login(username, password) def _normalize_credential(value: str | None) -> str | None: if value is None: return None stripped = value.strip() return stripped if stripped else None def _resolve_credentials( username: str | None, password: str | None, ) -> tuple[str | None, str | None]: user = _normalize_credential(username) pwd = _normalize_credential(password) if user is None or pwd is None: env_user = _normalize_credential(os.getenv("IG_USERNAME")) env_pwd = _normalize_credential(os.getenv("IG_PASSWORD")) if user is None: user = env_user if pwd is None: pwd = env_pwd return user, pwd def _require_login( username: str | None, password: str | None, feature: str, ) -> None: if not username or not password: raise ValueError( f"Downloading {feature} requires login. " "Provide username/password or set IG_USERNAME/IG_PASSWORD." ) def _post_has_video(post: instaloader.Post) -> bool: if getattr(post, "is_video", False): return True try: for node in post.get_sidecar_nodes(): if getattr(node, "is_video", False): return True return False except Exception: return False def _snapshot_files(root: Path) -> set[Path]: if not root.exists(): return set() return {path for path in root.rglob("*") if path.is_file()} def _collect_paths_by_suffixes( paths: set[Path], suffixes: set[str], ) -> list[str]: matched = [str(path) for path in paths if path.suffix.lower() in suffixes] return sorted(matched) def _sanitize_target_component(value: str) -> str: cleaned = value.strip() if not cleaned: return "untitled" sanitized = re.sub(r"[^A-Za-z0-9_-]+", "_", cleaned) sanitized = re.sub(r"_+", "_", sanitized).strip("_") return sanitized if sanitized else "untitled" def _download_post_for_shortcode( loader: instaloader.Instaloader, shortcode: str, download_root: Path, ) -> tuple[Path, set[Path]]: download_root.mkdir(parents=True, exist_ok=True) download_dir = download_root / shortcode before = _snapshot_files(download_dir) post = instaloader.Post.from_shortcode(loader.context, shortcode) loader.download_post(post, target=shortcode) after = _snapshot_files(download_dir) if not after: raise RuntimeError("Download completed but no files were found.") new_files = after - before return download_dir, new_files if new_files else after def _download_videos_for_shortcode( loader: instaloader.Instaloader, shortcode: str, download_root: Path, ) -> list[Path]: download_root.mkdir(parents=True, exist_ok=True) download_dir = download_root / shortcode before = _snapshot_files(download_dir) post = instaloader.Post.from_shortcode(loader.context, shortcode) if not _post_has_video(post): raise ValueError("The Instagram post does not contain video content.") loader.download_post(post, target=shortcode) after = _snapshot_files(download_dir) new_files = after - before video_files = sorted( [path for path in new_files if path.suffix.lower() == ".mp4"], key=lambda p: p.stat().st_mtime, reverse=True, ) if video_files: return video_files all_videos = sorted( [path for path in after if path.suffix.lower() == ".mp4"], key=lambda p: p.stat().st_mtime, reverse=True, ) if all_videos: return all_videos raise RuntimeError("Download completed but no video files were found.") mcp = FastMCP("ig-download-mcp") @mcp.tool() def download_instagram_video( url: str, username: str | None = None, password: str | None = None, save_metadata: bool = True, save_caption: bool = True, download_root: str | None = None, ) -> dict[str, Any]: """Download an Instagram video's media files to the local filesystem. Args: url: Instagram post/reel/tv URL. username: Optional Instagram username for authenticated access. password: Optional Instagram password for authenticated access. save_metadata: Save JSON metadata files when available. save_caption: Save caption into a text file when available. download_root: Optional override for download directory. Returns: A JSON-serializable dict containing download results. """ try: shortcode = extract_shortcode(url) target_root = _resolve_download_root(download_root) loader = _build_instaloader( target_root, download_pictures=False, download_videos=True, save_metadata=save_metadata, save_caption=save_caption, ) user, pwd = _resolve_credentials(username, password) _login_if_needed(loader, user, pwd) video_files = _download_videos_for_shortcode( loader, shortcode, target_root, ) download_dir = target_root / shortcode after = _snapshot_files(download_dir) metadata_files = _collect_paths_by_suffixes(after, {".json"}) caption_files = _collect_paths_by_suffixes(after, {".txt"}) return { "success": True, "shortcode": shortcode, "download_dir": str(download_dir), "video_files": [str(path) for path in video_files], "metadata_files": metadata_files, "caption_files": caption_files, } except instaloader.exceptions.InstaloaderException as exc: return { "success": False, "error": f"Instaloader error: {exc.__class__.__name__}: {exc}", } except Exception as exc: return {"success": False, "error": str(exc)} @mcp.tool() def download_instagram_post( url: str, include_videos: bool = True, save_metadata: bool = True, save_caption: bool = True, username: str | None = None, password: str | None = None, download_root: str | None = None, ) -> dict[str, Any]: """Download an Instagram post's media files to the local filesystem. Args: url: Instagram post/reel/tv URL. include_videos: Whether to download videos alongside pictures. save_metadata: Save JSON metadata files when available. save_caption: Save caption into a text file when available. username: Optional Instagram username for authenticated access. password: Optional Instagram password for authenticated access. download_root: Optional override for download directory. Returns: A JSON-serializable dict containing download results. """ try: shortcode = extract_shortcode(url) target_root = _resolve_download_root(download_root) loader = _build_instaloader( target_root, download_pictures=True, download_videos=include_videos, save_metadata=save_metadata, save_caption=save_caption, ) user, pwd = _resolve_credentials(username, password) _login_if_needed(loader, user, pwd) download_dir, downloaded_files = _download_post_for_shortcode( loader, shortcode, target_root, ) image_files = _collect_paths_by_suffixes( downloaded_files, {".jpg", ".jpeg", ".png", ".webp"}, ) video_files = _collect_paths_by_suffixes(downloaded_files, {".mp4"}) metadata_files = _collect_paths_by_suffixes( downloaded_files, {".json"}, ) caption_files = _collect_paths_by_suffixes( downloaded_files, {".txt"}, ) return { "success": True, "shortcode": shortcode, "download_dir": str(download_dir), "image_files": image_files, "video_files": video_files, "metadata_files": metadata_files, "caption_files": caption_files, } except instaloader.exceptions.InstaloaderException as exc: return { "success": False, "error": f"Instaloader error: {exc.__class__.__name__}: {exc}", } except Exception as exc: return {"success": False, "error": str(exc)} @mcp.tool() def download_instagram_profile_pic( username_target: str, username: str | None = None, password: str | None = None, download_root: str | None = None, ) -> dict[str, Any]: """Download an Instagram profile picture to the local filesystem. Args: username_target: Target Instagram username. username: Optional Instagram username for authenticated access. password: Optional Instagram password for authenticated access. download_root: Optional override for download directory. Returns: A JSON-serializable dict containing download results. """ try: target_username = username_target.strip() if not target_username: raise ValueError("username_target must not be empty.") target_root = _resolve_download_root(download_root) loader = _build_instaloader( target_root, download_pictures=True, download_videos=False, save_metadata=False, save_caption=False, ) user, pwd = _resolve_credentials(username, password) _login_if_needed(loader, user, pwd) profile = instaloader.Profile.from_username( loader.context, target_username, ) download_dir = target_root / target_username.lower() before = _snapshot_files(download_dir) loader.download_profilepic(profile) after = _snapshot_files(download_dir) if not after: raise RuntimeError("Download completed but no files were found.") downloaded_files = (after - before) or after image_files = _collect_paths_by_suffixes( downloaded_files, {".jpg", ".jpeg", ".png", ".webp"}, ) return { "success": True, "username_target": target_username, "download_dir": str(download_dir), "image_files": image_files, } except instaloader.exceptions.InstaloaderException as exc: return { "success": False, "error": f"Instaloader error: {exc.__class__.__name__}: {exc}", } except Exception as exc: return {"success": False, "error": str(exc)} @mcp.tool() def download_instagram_stories( username_target: str, username: str | None = None, password: str | None = None, download_root: str | None = None, ) -> dict[str, Any]: """Download Instagram stories for a target user. Note: This feature requires login. Args: username_target: Target Instagram username. username: Instagram username for authenticated access. password: Instagram password for authenticated access. download_root: Optional override for download directory. Returns: A JSON-serializable dict containing download results. """ try: target_username = username_target.strip() if not target_username: raise ValueError("username_target must not be empty.") target_root = _resolve_download_root(download_root) loader = _build_instaloader( target_root, download_pictures=True, download_videos=True, save_metadata=False, save_caption=False, ) user, pwd = _resolve_credentials(username, password) _require_login(user, pwd, "stories") _login_if_needed(loader, user, pwd) target_dirname = target_username.lower() profile = instaloader.Profile.from_username( loader.context, target_dirname, ) download_dir = target_root / target_dirname before = _snapshot_files(download_dir) num_items = 0 num_downloaded = 0 for story in loader.get_stories(userids=[profile.userid]): for item in story.get_items(): num_items += 1 if loader.download_storyitem(item, target=target_dirname): num_downloaded += 1 after = _snapshot_files(download_dir) downloaded_files = (after - before) or after image_files = _collect_paths_by_suffixes( downloaded_files, {".jpg", ".jpeg", ".png", ".webp"}, ) video_files = _collect_paths_by_suffixes(downloaded_files, {".mp4"}) return { "success": True, "username_target": target_username, "download_dir": str(download_dir), "num_items": num_items, "num_downloaded": num_downloaded, "image_files": image_files, "video_files": video_files, } except instaloader.exceptions.InstaloaderException as exc: return { "success": False, "error": f"Instaloader error: {exc.__class__.__name__}: {exc}", } except Exception as exc: return {"success": False, "error": str(exc)} @mcp.tool() def download_instagram_highlights( username_target: str, highlight_title: str | None = None, username: str | None = None, password: str | None = None, download_root: str | None = None, ) -> dict[str, Any]: """Download Instagram highlights for a target user. Note: This feature requires login. Args: username_target: Target Instagram username. highlight_title: Optional highlight title filter. username: Instagram username for authenticated access. password: Instagram password for authenticated access. download_root: Optional override for download directory. Returns: A JSON-serializable dict containing download results. """ try: target_username = username_target.strip() if not target_username: raise ValueError("username_target must not be empty.") title_filter = highlight_title.strip() if highlight_title else None target_root = _resolve_download_root(download_root) loader = _build_instaloader( target_root, download_pictures=True, download_videos=True, save_metadata=False, save_caption=False, ) user, pwd = _resolve_credentials(username, password) _require_login(user, pwd, "highlights") _login_if_needed(loader, user, pwd) base_username = target_username.lower() profile = instaloader.Profile.from_username( loader.context, base_username, ) results: list[dict[str, Any]] = [] for highlight in loader.get_highlights(profile): if title_filter and ( highlight.title.strip().casefold() != title_filter.casefold() ): continue safe_title = _sanitize_target_component(highlight.title) highlight_dirname = ( f"{base_username}_highlight_{safe_title}_{highlight.unique_id}" ) download_dir = target_root / highlight_dirname before = _snapshot_files(download_dir) num_items = 0 num_downloaded = 0 for item in highlight.get_items(): num_items += 1 if loader.download_storyitem(item, target=highlight_dirname): num_downloaded += 1 after = _snapshot_files(download_dir) downloaded_files = (after - before) or after results.append( { "title": highlight.title, "unique_id": highlight.unique_id, "download_dir": str(download_dir), "num_items": num_items, "num_downloaded": num_downloaded, "image_files": _collect_paths_by_suffixes( downloaded_files, {".jpg", ".jpeg", ".png", ".webp"}, ), "video_files": _collect_paths_by_suffixes( downloaded_files, {".mp4"}, ), } ) if title_filter and not results: raise ValueError( "No highlights matched the given highlight_title." ) return { "success": True, "username_target": target_username, "highlight_title": title_filter, "highlights": results, } except instaloader.exceptions.InstaloaderException as exc: return { "success": False, "error": f"Instaloader error: {exc.__class__.__name__}: {exc}", } except Exception as exc: return {"success": False, "error": str(exc)} def run() -> None: """Run the MCP server over stdio.""" mcp.run()

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/KaiQin04/ig-download-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server