"""MCP server for downloading Instagram videos via Instaloader."""
from __future__ import annotations
from pathlib import Path
import os
import re
from typing import Any
from mcp.server.fastmcp import FastMCP
import instaloader
# Directory used for downloads whenever a caller does not supply its own.
DEFAULT_DOWNLOAD_ROOT: Path = Path.home() / "Downloads" / "instagram"

# python-dotenv is an optional dependency. When the import works the loader
# is invoked once at import time; when it does not, ``load_dotenv`` stays
# bound to None so the name still exists at module level.
try:
    from dotenv import load_dotenv
except Exception:
    load_dotenv = None
else:
    load_dotenv()
def _resolve_download_root(download_root: str | None) -> Path:
    """Pick the directory that downloads are written under.

    Args:
        download_root: Caller-supplied directory, or None.

    Returns:
        ``DEFAULT_DOWNLOAD_ROOT`` when the argument is None or holds only
        whitespace; otherwise the stripped value as a ``Path`` with a
        leading ``~`` expanded.
    """
    override = download_root.strip() if download_root is not None else ""
    if override:
        return Path(override).expanduser()
    return DEFAULT_DOWNLOAD_ROOT
def extract_shortcode(url: str) -> str:
    """Extract an Instagram shortcode from a supported URL.

    Supported formats include:
    - https://www.instagram.com/p/<shortcode>/
    - https://www.instagram.com/reel/<shortcode>/
    - https://www.instagram.com/reels/<shortcode>/
    - https://www.instagram.com/tv/<shortcode>/

    The scheme and any subdomain (``www.``, ``m.``) are optional, and
    whatever follows the shortcode (trailing slash, query string) is
    ignored.

    Args:
        url: Instagram post/reel/tv URL.

    Returns:
        The extracted shortcode.

    Raises:
        ValueError: If the shortcode cannot be extracted.
    """
    pattern = (
        # The look-behind stops "instagram.com" from matching as the tail of
        # a longer host label, so look-alike domains such as
        # "notinstagram.com" are rejected while real subdomains (preceded by
        # ".") and bare hosts (preceded by "/" or nothing) still match.
        r"(?<![A-Za-z0-9-])(?:www\.|m\.)?instagram\.com/"
        # "reels" is accepted alongside the singular "reel".
        r"(?:p|reels?|tv)/([A-Za-z0-9_-]+)"
    )
    match = re.search(pattern, url)
    if match is None:
        raise ValueError("Unsupported Instagram URL format.")
    return match.group(1)
def _build_instaloader(
    download_root: Path,
    download_pictures: bool = False,
    download_videos: bool = True,
    save_metadata: bool = False,
    save_caption: bool = False,
) -> instaloader.Instaloader:
    """Create a quiet Instaloader configured to write under ``download_root``.

    Args:
        download_root: Base directory; the directory pattern handed to
            Instaloader is ``<download_root>/{target}``.
        download_pictures: Passed through as ``download_pictures``.
        download_videos: Passed through as ``download_videos``.
        save_metadata: Passed through as ``save_metadata``.
        save_caption: When true the caption text pattern is ``{caption}``;
            otherwise it is the empty string.

    Returns:
        The configured ``instaloader.Instaloader`` instance.
    """
    options: dict[str, Any] = {
        # Switches that are the same for every caller in this module.
        "quiet": True,
        "download_video_thumbnails": False,
        "download_geotags": False,
        "download_comments": False,
        "compress_json": False,
        # Caller-controlled switches.
        "download_pictures": download_pictures,
        "download_videos": download_videos,
        "save_metadata": save_metadata,
        # "{target}" is left as a literal placeholder here; presumably
        # Instaloader substitutes the download target name for it.
        "dirname_pattern": str(download_root / "{target}"),
    }
    if save_caption:
        options["post_metadata_txt_pattern"] = "{caption}"
    else:
        options["post_metadata_txt_pattern"] = ""
    return instaloader.Instaloader(**options)
def _login_if_needed(
    loader: instaloader.Instaloader,
    username: str | None,
    password: str | None,
) -> None:
    """Log the loader in when credentials were supplied.

    Args:
        loader: Object whose ``login(username, password)`` is called.
        username: Account name, or None.
        password: Account password, or None.

    Raises:
        ValueError: If only one of the two values is usable (the other is
            None or empty).
    """
    nothing_supplied = username is None and password is None
    if nothing_supplied:
        # Anonymous access: leave the loader untouched.
        return
    if username and password:
        loader.login(username, password)
        return
    raise ValueError(
        "Both username and password must be provided together."
    )
def _normalize_credential(value: str | None) -> str | None:
    """Trim a credential string, mapping blank values to None.

    Args:
        value: Raw credential text, or None.

    Returns:
        The text without surrounding whitespace, or None when the input is
        None or nothing remains after trimming.
    """
    if value is None:
        return None
    # An empty result is falsy, so ``or`` turns it into None.
    return value.strip() or None
def _resolve_credentials(
    username: str | None,
    password: str | None,
) -> tuple[str | None, str | None]:
    """Combine explicit credentials with the environment fallback.

    Each value is normalised with ``_normalize_credential``. If either one
    is missing afterwards, ``IG_USERNAME`` / ``IG_PASSWORD`` are read from
    the environment and used to fill only the missing slot(s).

    Args:
        username: Explicit account name, or None.
        password: Explicit account password, or None.

    Returns:
        A ``(username, password)`` tuple; either element may be None.
    """
    given_user = _normalize_credential(username)
    given_pwd = _normalize_credential(password)
    if given_user is not None and given_pwd is not None:
        # Both supplied explicitly: the environment is never consulted.
        return given_user, given_pwd

    fallback_user = _normalize_credential(os.getenv("IG_USERNAME"))
    fallback_pwd = _normalize_credential(os.getenv("IG_PASSWORD"))
    return (
        fallback_user if given_user is None else given_user,
        fallback_pwd if given_pwd is None else given_pwd,
    )
def _require_login(
    username: str | None,
    password: str | None,
    feature: str,
) -> None:
    """Fail early when a login-only feature is requested without credentials.

    Args:
        username: Resolved account name, or None.
        password: Resolved account password, or None.
        feature: Feature name interpolated into the error message.

    Raises:
        ValueError: If either credential is None or empty.
    """
    if username and password:
        return
    raise ValueError(
        f"Downloading {feature} requires login. "
        "Provide username/password or set IG_USERNAME/IG_PASSWORD."
    )
def _post_has_video(post: instaloader.Post) -> bool:
    """Report whether a post, or any of its sidecar nodes, is a video.

    Args:
        post: Object exposing ``is_video`` and ``get_sidecar_nodes()``.

    Returns:
        True if ``post.is_video`` is truthy or any sidecar node has a truthy
        ``is_video``; False otherwise, including when inspecting the
        sidecar nodes raises.
    """
    if getattr(post, "is_video", False):
        return True
    try:
        return any(
            getattr(node, "is_video", False)
            for node in post.get_sidecar_nodes()
        )
    except Exception:
        # Best effort: a failure while walking the nodes counts as "no video".
        return False
def _snapshot_files(root: Path) -> set[Path]:
    """Collect every regular file found anywhere below ``root``.

    Args:
        root: Directory to scan recursively.

    Returns:
        The set of file paths under ``root``; empty when ``root`` does not
        exist.
    """
    found: set[Path] = set()
    if root.exists():
        for entry in root.rglob("*"):
            if entry.is_file():
                found.add(entry)
    return found
def _collect_paths_by_suffixes(
    paths: set[Path],
    suffixes: set[str],
) -> list[str]:
    """Return the paths whose extension is in ``suffixes``, as sorted strings.

    Args:
        paths: Candidate file paths.
        suffixes: Extensions to keep, lowercase and including the dot
            (each path's suffix is lowercased before the lookup).

    Returns:
        The matching paths converted with ``str`` and sorted ascending.
    """
    return sorted(
        str(candidate)
        for candidate in paths
        if candidate.suffix.lower() in suffixes
    )
def _sanitize_target_component(value: str) -> str:
    """Reduce free text to a token usable inside a directory name.

    Args:
        value: Arbitrary text, e.g. a highlight title.

    Returns:
        The text with every run of characters outside ``A-Za-z0-9_-``
        replaced by one underscore, repeated underscores collapsed and
        leading/trailing underscores removed; ``"untitled"`` when nothing
        is left.
    """
    token = re.sub(r"[^A-Za-z0-9_-]+", "_", value.strip())
    token = re.sub(r"_+", "_", token).strip("_")
    # Blank input falls through both substitutions as "" and lands here too.
    return token or "untitled"
def _download_post_for_shortcode(
    loader: instaloader.Instaloader,
    shortcode: str,
    download_root: Path,
) -> tuple[Path, set[Path]]:
    """Download one post and report which files it produced.

    Args:
        loader: Configured Instaloader instance.
        shortcode: Post shortcode; also used as the download target name.
        download_root: Base directory, created if missing.

    Returns:
        ``(download_dir, files)`` where ``download_dir`` is
        ``download_root / shortcode`` and ``files`` are the files that
        appeared during this call, or every file in the directory when
        nothing new appeared.

    Raises:
        RuntimeError: If the directory holds no files after the download.
    """
    download_root.mkdir(parents=True, exist_ok=True)
    post_dir = download_root / shortcode

    # Snapshot before and after so the difference isolates this call's files.
    existing = _snapshot_files(post_dir)
    post = instaloader.Post.from_shortcode(loader.context, shortcode)
    loader.download_post(post, target=shortcode)
    present = _snapshot_files(post_dir)

    if not present:
        raise RuntimeError("Download completed but no files were found.")
    created = present - existing
    return post_dir, (created or present)
def _download_videos_for_shortcode(
    loader: instaloader.Instaloader,
    shortcode: str,
    download_root: Path,
) -> list[Path]:
    """Download a post that contains video and return its ``.mp4`` files.

    Args:
        loader: Configured Instaloader instance.
        shortcode: Post shortcode; also used as the download target name.
        download_root: Base directory, created if missing.

    Returns:
        The ``.mp4`` files created by this call, newest modification time
        first. When none were created, every ``.mp4`` already present in
        the post directory is returned in the same order instead.

    Raises:
        ValueError: If ``_post_has_video`` reports no video for the post.
        RuntimeError: If no ``.mp4`` file exists after the download.
    """

    def newest_first_mp4(candidates: set[Path]) -> list[Path]:
        # Keep only .mp4 files and order them by mtime, most recent first.
        clips = [c for c in candidates if c.suffix.lower() == ".mp4"]
        clips.sort(key=lambda c: c.stat().st_mtime, reverse=True)
        return clips

    download_root.mkdir(parents=True, exist_ok=True)
    post_dir = download_root / shortcode
    existing = _snapshot_files(post_dir)

    post = instaloader.Post.from_shortcode(loader.context, shortcode)
    if not _post_has_video(post):
        raise ValueError("The Instagram post does not contain video content.")
    loader.download_post(post, target=shortcode)

    present = _snapshot_files(post_dir)
    # Prefer files created by this call; fall back to whatever is on disk.
    for pool in (present - existing, present):
        clips = newest_first_mp4(pool)
        if clips:
            return clips
    raise RuntimeError("Download completed but no video files were found.")
# Server instance named "ig-download-mcp"; the functions decorated with
# @mcp.tool() in this module register themselves on it.
mcp = FastMCP("ig-download-mcp")
@mcp.tool()
def download_instagram_video(
    url: str,
    username: str | None = None,
    password: str | None = None,
    save_metadata: bool = True,
    save_caption: bool = True,
    download_root: str | None = None,
) -> dict[str, Any]:
    """Download an Instagram video's media files to the local filesystem.

    Args:
        url: Instagram post/reel/tv URL.
        username: Optional Instagram username for authenticated access.
        password: Optional Instagram password for authenticated access.
        save_metadata: Save JSON metadata files when available.
        save_caption: Save caption into a text file when available.
        download_root: Optional override for download directory.

    Returns:
        A JSON-serializable dict containing download results.
    """
    # NOTE(review): the docstring wording is kept as-is because FastMCP
    # presumably exposes it as the tool description — confirm before editing.
    try:
        shortcode = extract_shortcode(url)
        root = _resolve_download_root(download_root)
        # Video-only loader: pictures are never requested by this tool.
        ig = _build_instaloader(
            root,
            download_pictures=False,
            download_videos=True,
            save_metadata=save_metadata,
            save_caption=save_caption,
        )
        ig_user, ig_password = _resolve_credentials(username, password)
        _login_if_needed(ig, ig_user, ig_password)

        videos = _download_videos_for_shortcode(ig, shortcode, root)

        # Metadata and caption files are taken from everything currently in
        # the post directory, not only from files created by this call.
        post_dir = root / shortcode
        on_disk = _snapshot_files(post_dir)
        return {
            "success": True,
            "shortcode": shortcode,
            "download_dir": str(post_dir),
            "video_files": list(map(str, videos)),
            "metadata_files": _collect_paths_by_suffixes(on_disk, {".json"}),
            "caption_files": _collect_paths_by_suffixes(on_disk, {".txt"}),
        }
    except instaloader.exceptions.InstaloaderException as exc:
        # Library failures are reported with their exception class name.
        message = f"Instaloader error: {exc.__class__.__name__}: {exc}"
        return {"success": False, "error": message}
    except Exception as exc:
        # Tool boundary: every other failure becomes an error payload.
        return {"success": False, "error": str(exc)}
@mcp.tool()
def download_instagram_post(
    url: str,
    include_videos: bool = True,
    save_metadata: bool = True,
    save_caption: bool = True,
    username: str | None = None,
    password: str | None = None,
    download_root: str | None = None,
) -> dict[str, Any]:
    """Download an Instagram post's media files to the local filesystem.

    Args:
        url: Instagram post/reel/tv URL.
        include_videos: Whether to download videos alongside pictures.
        save_metadata: Save JSON metadata files when available.
        save_caption: Save caption into a text file when available.
        username: Optional Instagram username for authenticated access.
        password: Optional Instagram password for authenticated access.
        download_root: Optional override for download directory.

    Returns:
        A JSON-serializable dict containing download results.
    """
    # NOTE(review): the docstring wording is kept as-is because FastMCP
    # presumably exposes it as the tool description — confirm before editing.
    try:
        shortcode = extract_shortcode(url)
        root = _resolve_download_root(download_root)
        # Pictures are always fetched; videos only when requested.
        ig = _build_instaloader(
            root,
            download_pictures=True,
            download_videos=include_videos,
            save_metadata=save_metadata,
            save_caption=save_caption,
        )
        ig_user, ig_password = _resolve_credentials(username, password)
        _login_if_needed(ig, ig_user, ig_password)

        post_dir, files = _download_post_for_shortcode(ig, shortcode, root)

        # Result key -> file extensions reported under that key. Insertion
        # order fixes the order of the keys in the returned dict.
        groups = {
            "image_files": {".jpg", ".jpeg", ".png", ".webp"},
            "video_files": {".mp4"},
            "metadata_files": {".json"},
            "caption_files": {".txt"},
        }
        payload: dict[str, Any] = {
            "success": True,
            "shortcode": shortcode,
            "download_dir": str(post_dir),
        }
        for key, suffixes in groups.items():
            payload[key] = _collect_paths_by_suffixes(files, suffixes)
        return payload
    except instaloader.exceptions.InstaloaderException as exc:
        # Library failures are reported with their exception class name.
        message = f"Instaloader error: {exc.__class__.__name__}: {exc}"
        return {"success": False, "error": message}
    except Exception as exc:
        # Tool boundary: every other failure becomes an error payload.
        return {"success": False, "error": str(exc)}
@mcp.tool()
def download_instagram_profile_pic(
    username_target: str,
    username: str | None = None,
    password: str | None = None,
    download_root: str | None = None,
) -> dict[str, Any]:
    """Download an Instagram profile picture to the local filesystem.

    Args:
        username_target: Target Instagram username.
        username: Optional Instagram username for authenticated access.
        password: Optional Instagram password for authenticated access.
        download_root: Optional override for download directory.

    Returns:
        A JSON-serializable dict containing download results.
    """
    # NOTE(review): the docstring wording is kept as-is because FastMCP
    # presumably exposes it as the tool description — confirm before editing.
    try:
        account = username_target.strip()
        if account == "":
            raise ValueError("username_target must not be empty.")

        root = _resolve_download_root(download_root)
        # Pictures only; no videos, metadata or captions for this tool.
        ig = _build_instaloader(
            root,
            download_pictures=True,
            download_videos=False,
            save_metadata=False,
            save_caption=False,
        )
        ig_user, ig_password = _resolve_credentials(username, password)
        _login_if_needed(ig, ig_user, ig_password)

        profile = instaloader.Profile.from_username(ig.context, account)

        # NOTE(review): assumes Instaloader writes the picture under the
        # lowercased username — confirm against download_profilepic.
        picture_dir = root / account.lower()
        existing = _snapshot_files(picture_dir)
        ig.download_profilepic(profile)
        present = _snapshot_files(picture_dir)
        if not present:
            raise RuntimeError("Download completed but no files were found.")

        # Report new files, or everything on disk when nothing new appeared.
        reported = (present - existing) or present
        return {
            "success": True,
            "username_target": account,
            "download_dir": str(picture_dir),
            "image_files": _collect_paths_by_suffixes(
                reported,
                {".jpg", ".jpeg", ".png", ".webp"},
            ),
        }
    except instaloader.exceptions.InstaloaderException as exc:
        # Library failures are reported with their exception class name.
        message = f"Instaloader error: {exc.__class__.__name__}: {exc}"
        return {"success": False, "error": message}
    except Exception as exc:
        # Tool boundary: every other failure becomes an error payload.
        return {"success": False, "error": str(exc)}
@mcp.tool()
def download_instagram_stories(
    username_target: str,
    username: str | None = None,
    password: str | None = None,
    download_root: str | None = None,
) -> dict[str, Any]:
    """Download Instagram stories for a target user.

    Note: This feature requires login.

    Args:
        username_target: Target Instagram username.
        username: Instagram username for authenticated access.
        password: Instagram password for authenticated access.
        download_root: Optional override for download directory.

    Returns:
        A JSON-serializable dict containing download results.
    """
    # NOTE(review): the docstring wording is kept as-is because FastMCP
    # presumably exposes it as the tool description — confirm before editing.
    try:
        account = username_target.strip()
        if account == "":
            raise ValueError("username_target must not be empty.")

        root = _resolve_download_root(download_root)
        ig = _build_instaloader(
            root,
            download_pictures=True,
            download_videos=True,
            save_metadata=False,
            save_caption=False,
        )
        ig_user, ig_password = _resolve_credentials(username, password)
        # Credentials are checked before any login attempt is made.
        _require_login(ig_user, ig_password, "stories")
        _login_if_needed(ig, ig_user, ig_password)

        # The lowercased name is both the profile lookup key and the
        # download target (hence the directory name).
        folder = account.lower()
        profile = instaloader.Profile.from_username(ig.context, folder)
        story_dir = root / folder
        existing = _snapshot_files(story_dir)

        # One boolean per story item: whether download_storyitem returned a
        # truthy value for it.
        outcomes = [
            bool(ig.download_storyitem(item, target=folder))
            for story in ig.get_stories(userids=[profile.userid])
            for item in story.get_items()
        ]

        present = _snapshot_files(story_dir)
        # Report new files, or everything on disk when nothing new appeared.
        reported = (present - existing) or present
        return {
            "success": True,
            "username_target": account,
            "download_dir": str(story_dir),
            "num_items": len(outcomes),
            "num_downloaded": sum(outcomes),
            "image_files": _collect_paths_by_suffixes(
                reported,
                {".jpg", ".jpeg", ".png", ".webp"},
            ),
            "video_files": _collect_paths_by_suffixes(reported, {".mp4"}),
        }
    except instaloader.exceptions.InstaloaderException as exc:
        # Library failures are reported with their exception class name.
        message = f"Instaloader error: {exc.__class__.__name__}: {exc}"
        return {"success": False, "error": message}
    except Exception as exc:
        # Tool boundary: every other failure becomes an error payload.
        return {"success": False, "error": str(exc)}
@mcp.tool()
def download_instagram_highlights(
    username_target: str,
    highlight_title: str | None = None,
    username: str | None = None,
    password: str | None = None,
    download_root: str | None = None,
) -> dict[str, Any]:
    """Download Instagram highlights for a target user.

    Note: This feature requires login.

    Args:
        username_target: Target Instagram username.
        highlight_title: Optional highlight title filter.
        username: Instagram username for authenticated access.
        password: Instagram password for authenticated access.
        download_root: Optional override for download directory.

    Returns:
        A JSON-serializable dict containing download results.
    """
    # NOTE(review): the docstring wording is kept as-is because FastMCP
    # presumably exposes it as the tool description — confirm before editing.
    try:
        account = username_target.strip()
        if account == "":
            raise ValueError("username_target must not be empty.")
        title_filter = highlight_title.strip() if highlight_title else None

        root = _resolve_download_root(download_root)
        ig = _build_instaloader(
            root,
            download_pictures=True,
            download_videos=True,
            save_metadata=False,
            save_caption=False,
        )
        ig_user, ig_password = _resolve_credentials(username, password)
        # Credentials are checked before any login attempt is made.
        _require_login(ig_user, ig_password, "highlights")
        _login_if_needed(ig, ig_user, ig_password)

        base_username = account.lower()
        profile = instaloader.Profile.from_username(ig.context, base_username)

        image_suffixes = {".jpg", ".jpeg", ".png", ".webp"}
        collected: list[dict[str, Any]] = []
        for highlight in ig.get_highlights(profile):
            # Title matching ignores case and surrounding whitespace.
            if title_filter:
                own_title = highlight.title.strip().casefold()
                if own_title != title_filter.casefold():
                    continue

            # Each highlight gets its own directory; the unique id keeps
            # highlights with identical sanitised titles apart.
            safe_title = _sanitize_target_component(highlight.title)
            highlight_dirname = (
                f"{base_username}_highlight_{safe_title}_{highlight.unique_id}"
            )
            highlight_dir = root / highlight_dirname
            existing = _snapshot_files(highlight_dir)

            # One boolean per item: whether download_storyitem returned a
            # truthy value for it.
            outcomes = [
                bool(ig.download_storyitem(item, target=highlight_dirname))
                for item in highlight.get_items()
            ]

            present = _snapshot_files(highlight_dir)
            # Report new files, or everything on disk when nothing is new.
            reported = (present - existing) or present
            collected.append(
                {
                    "title": highlight.title,
                    "unique_id": highlight.unique_id,
                    "download_dir": str(highlight_dir),
                    "num_items": len(outcomes),
                    "num_downloaded": sum(outcomes),
                    "image_files": _collect_paths_by_suffixes(
                        reported,
                        image_suffixes,
                    ),
                    "video_files": _collect_paths_by_suffixes(
                        reported,
                        {".mp4"},
                    ),
                }
            )

        # A filter that matched nothing is an error, not an empty success.
        if title_filter and not collected:
            raise ValueError(
                "No highlights matched the given highlight_title."
            )
        return {
            "success": True,
            "username_target": account,
            "highlight_title": title_filter,
            "highlights": collected,
        }
    except instaloader.exceptions.InstaloaderException as exc:
        # Library failures are reported with their exception class name.
        message = f"Instaloader error: {exc.__class__.__name__}: {exc}"
        return {"success": False, "error": message}
    except Exception as exc:
        # Tool boundary: every other failure becomes an error payload.
        return {"success": False, "error": str(exc)}
def run() -> None:
    """Run the MCP server over stdio.

    Calls ``mcp.run()`` on the module-level server instance with no
    arguments.
    """
    # NOTE(review): relies on FastMCP.run() defaulting to the stdio
    # transport — confirm against the installed MCP SDK version.
    mcp.run()