# server.py
# Instagram API MCP Server - EnsembleData API を使用
import asyncio
import logging
from mcp.server.fastmcp import FastMCP
from typing import Optional
from ensembledata.api import EDClient
# ログ設定
import os
import tempfile
log_dir = os.path.join(tempfile.gettempdir(), 'insta-mcp-server')
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, 'debug.log')
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# EnsembleData API設定
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# 実際のAPIキーを使用する場合は、環境変数から読み込むことを推奨
API_KEY = os.getenv("ENSEMBLEDATA_API_KEY", "d6O4Z7TpUlFZ0lMO")
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# MCPサーバー作成
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
mcp = FastMCP("insta-mcp-server")
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# ヘルパー関数
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def _get_client() -> EDClient:
"""EnsembleData クライアントを取得"""
if not API_KEY:
raise ValueError("ENSEMBLEDATA_API_KEY not configured")
return EDClient(token=API_KEY)
def format_user_info(data: dict, detailed: bool = False) -> str:
"""ユーザー情報を整形"""
if "error" in data:
return f"❌ エラー: {data['error']}"
result = "━━━━━━━━━━━━━━━━━━━━\n"
result += "👤 Instagram ユーザー詳細情報\n"
result += "━━━━━━━━━━━━━━━━━━━━\n\n"
# 基本情報
if "username" in data:
result += f"🔤 ユーザー名: @{data['username']}\n"
if "full_name" in data:
result += f"📛 表示名: {data['full_name']}\n"
if "id" in data:
result += f"🆔 ユーザーID: {data['id']}\n"
# 認証バッジ
if "is_verified" in data and data["is_verified"]:
result += "✅ 認証済みアカウント\n"
# アカウントタイプ
account_types = []
if data.get("is_private"):
account_types.append("非公開")
if data.get("is_business_account"):
account_types.append("ビジネス")
if data.get("is_professional_account"):
account_types.append("プロフェッショナル")
if account_types:
result += f"📋 アカウントタイプ: {', '.join(account_types)}\n"
# カテゴリー
if "category_name" in data and data["category_name"]:
result += f"🏷️ カテゴリー: {data['category_name']}\n"
result += "\n"
# プロフィール
if "biography" in data and data["biography"]:
bio_text = data["biography"]
# 長いプロフィールは省略
if len(bio_text) > 200:
bio_text = bio_text[:200] + "..."
result += f"📝 プロフィール:\n{bio_text}\n\n"
# 外部リンク
if "external_url" in data and data["external_url"]:
result += f"🔗 外部リンク: {data['external_url']}\n"
# バイオリンク
if "bio_links" in data and data["bio_links"]:
result += "🔗 バイオリンク:\n"
for link in data["bio_links"][:3]: # 最大3つまで表示
if "url" in link:
title = link.get("title", "リンク")
result += f" • {title if title else link['url']}\n"
result += "\n"
# 統計情報
result += "📊 統計情報:\n"
# フォロワー数(複数のフィールド形式に対応)
follower_count = 0
if "edge_followed_by" in data and "count" in data["edge_followed_by"]:
follower_count = data["edge_followed_by"]["count"]
elif "follower_count" in data:
follower_count = data["follower_count"]
elif "followers" in data:
follower_count = data["followers"]
if follower_count > 0:
result += f" 👥 フォロワー: {follower_count:,}\n"
# フォロー数
following_count = 0
if "edge_follow" in data and "count" in data["edge_follow"]:
following_count = data["edge_follow"]["count"]
elif "following_count" in data:
following_count = data["following_count"]
elif "following" in data:
following_count = data["following"]
if following_count > 0:
result += f" ➕ フォロー中: {following_count:,}\n"
# 投稿数
media_count = 0
if "edge_owner_to_timeline_media" in data and "count" in data["edge_owner_to_timeline_media"]:
media_count = data["edge_owner_to_timeline_media"]["count"]
elif "media_count" in data:
media_count = data["media_count"]
if media_count > 0:
result += f" 📸 投稿数: {media_count:,}\n"
# リール/動画数
if "edge_felix_video_timeline" in data:
video_count = data["edge_felix_video_timeline"].get("count", 0)
if video_count > 0:
result += f" 🎬 動画/リール: {video_count:,}\n"
# ハイライト数
if "highlight_reel_count" in data and data["highlight_reel_count"] > 0:
result += f" ⭐ ハイライト: {data['highlight_reel_count']}\n"
result += "\n"
# プロフィール画像
if "profile_pic_url_hd" in data:
result += f"🖼️ プロフィール画像(HD):\n{data['profile_pic_url_hd']}\n\n"
elif "profile_pic_url" in data:
result += f"🖼️ プロフィール画像:\n{data['profile_pic_url']}\n\n"
# ビジネス情報(該当する場合)
if data.get("is_business_account") or data.get("is_professional_account"):
business_info = []
if "business_email" in data and data["business_email"]:
business_info.append(f"📧 Email: {data['business_email']}")
if "business_phone_number" in data and data["business_phone_number"]:
business_info.append(f"📞 電話: {data['business_phone_number']}")
if "business_category_name" in data and data["business_category_name"]:
business_info.append(f"🏢 業種: {data['business_category_name']}")
if business_info:
result += "💼 ビジネス情報:\n"
for info in business_info:
result += f" {info}\n"
result += "\n"
return result
def format_posts(data: dict, limit: int = 10) -> str:
"""投稿リストを整形"""
if "error" in data:
return f"❌ エラー: {data['error']}"
posts = data.get("posts", data.get("data", []))
if not posts:
return "投稿が見つかりませんでした。"
result = f"━━━━━━━━━━━━━━━━━━━━\n"
result += f"📸 Instagram 投稿({min(len(posts), limit)}件)\n"
result += f"━━━━━━━━━━━━━━━━━━━━\n\n"
for idx, post in enumerate(posts[:limit], 1):
result += f"【投稿 {idx}】\n"
# キャプション
if "edge_media_to_caption" in post:
edges = post["edge_media_to_caption"].get("edges", [])
if edges and len(edges) > 0:
caption_text = edges[0].get("node", {}).get("text", "")
if caption_text:
caption = caption_text[:100] + "..." if len(caption_text) > 100 else caption_text
result += f"📝 {caption}\n"
elif "caption" in post:
caption = post["caption"][:100] + "..." if len(post.get("caption", "")) > 100 else post.get("caption", "")
result += f"📝 {caption}\n"
# いいね数
if "edge_liked_by" in post and "count" in post["edge_liked_by"]:
result += f"❤️ {post['edge_liked_by']['count']:,} いいね\n"
elif "like_count" in post:
result += f"❤️ {post['like_count']:,} いいね\n"
# コメント数
if "edge_media_to_comment" in post and "count" in post["edge_media_to_comment"]:
result += f"💬 {post['edge_media_to_comment']['count']:,} コメント\n"
elif "comment_count" in post:
result += f"💬 {post['comment_count']:,} コメント\n"
# URL
if "shortcode" in post:
result += f"🔗 https://instagram.com/p/{post['shortcode']}\n"
# メディアタイプ
if "is_video" in post:
media_type = "🎥 動画" if post["is_video"] else "📷 画像"
result += f"タイプ: {media_type}\n"
result += "\n"
return result
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# ツール定義
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@mcp.tool()
async def instagram_user_info(username: str) -> str:
"""
Instagramユーザーの詳細情報を取得します。
プロフィール、統計、外部リンクなどの包括的な情報を提供します。
Args:
username: Instagramユーザー名(@なし)
Returns:
ユーザー詳細情報の文字列
"""
logger.info(f"instagram_user_info called with username={username}")
try:
client = _get_client()
# 非同期で実行するため、asyncio.to_threadを使用
result = await asyncio.to_thread(
client.instagram.user_detailed_info,
username=username
)
data = result.data if hasattr(result, 'data') else {}
# API使用量情報を追加
formatted_result = format_user_info(data, detailed=True)
if hasattr(result, 'units_charged'):
formatted_result += f"\n⚡ API使用量: {result.units_charged} ユニット\n"
return formatted_result
except Exception as e:
logger.exception(f"Error in instagram_user_info: {e}")
return format_user_info({"error": str(e)})
@mcp.tool()
async def instagram_user_posts(username: str, count: int = 12) -> str:
"""
Instagramユーザーの投稿一覧を取得します。
Args:
username: Instagramユーザー名(@なし)
count: 取得する投稿数(デフォルト: 12)
Returns:
投稿一覧の文字列
"""
logger.info(f"instagram_user_posts called with username={username}, count={count}")
try:
client = _get_client()
# ユーザー詳細情報を取得(投稿情報も含まれる)
user_info = await asyncio.to_thread(
client.instagram.user_detailed_info,
username=username
)
data = user_info.data if hasattr(user_info, 'data') else {}
# 投稿情報を取得
posts_edges = data.get('edge_owner_to_timeline_media', {}).get('edges', [])
posts = [edge.get('node', {}) for edge in posts_edges[:count]]
return format_posts({"posts": posts}, limit=count)
except Exception as e:
logger.exception(f"Error in instagram_user_posts: {e}")
return format_posts({"error": str(e)}, limit=count)
@mcp.tool()
async def instagram_user_stats(username: str) -> str:
"""
Instagramユーザーの統計情報を取得します。
Args:
username: Instagramユーザー名(@なし)
Returns:
統計情報の文字列
"""
logger.info(f"instagram_user_stats called with username={username}")
try:
client = _get_client()
# まずuser_idを取得
user_info = await asyncio.to_thread(
client.instagram.user_detailed_info,
username=username
)
user_id = user_info.data.get('id') if hasattr(user_info, 'data') else None
if not user_id:
return "❌ エラー: ユーザーIDが取得できませんでした"
# user_idで基本統計を取得
stats_result = await asyncio.to_thread(
client.instagram.user_basic_stats,
user_id=int(user_id)
)
data = stats_result.data if hasattr(stats_result, 'data') else {}
result = "━━━━━━━━━━━━━━━━━━━━\n"
result += "📊 Instagram 統計情報\n"
result += "━━━━━━━━━━━━━━━━━━━━\n\n"
if "followers" in data:
result += f"👥 フォロワー数: {data['followers']:,}\n"
elif "follower_count" in data:
result += f"👥 フォロワー数: {data['follower_count']:,}\n"
if "following" in data:
result += f"➕ フォロー数: {data['following']:,}\n"
elif "following_count" in data:
result += f"➕ フォロー数: {data['following_count']:,}\n"
if "media_count" in data:
result += f"📸 投稿数: {data['media_count']:,}\n"
if "engagement_rate" in data:
result += f"💫 エンゲージメント率: {data['engagement_rate']:.2f}%\n"
return result
except Exception as e:
logger.exception(f"Error in instagram_user_stats: {e}")
return f"❌ エラー: {str(e)}"
@mcp.tool()
async def instagram_user_reels(username: str, count: int = 12) -> str:
"""
Instagramユーザーのリール(短尺動画)を取得します。
Args:
username: Instagramユーザー名(@なし)
count: 取得するリール数(デフォルト: 12)
Returns:
リール一覧の文字列
"""
logger.info(f"instagram_user_reels called with username={username}, count={count}")
try:
client = _get_client()
# まずuser_idを取得
user_info = await asyncio.to_thread(
client.instagram.user_detailed_info,
username=username
)
user_id = user_info.data.get('id') if hasattr(user_info, 'data') else None
if not user_id:
return "❌ エラー: ユーザーIDが取得できませんでした"
# depthを計算(countに基づいて適切なdepthを設定)
depth = max(1, (count // 10) + 1)
# リールを取得
reels_result = await asyncio.to_thread(
client.instagram.user_reels,
user_id=int(user_id),
depth=depth
)
reels_data = reels_result.data if hasattr(reels_result, 'data') else {}
reels = reels_data.get('reels', []) if isinstance(reels_data, dict) else []
if not reels:
return "リールが見つかりませんでした。"
result = f"━━━━━━━━━━━━━━━━━━━━\n"
result += f"🎬 Instagram リール({min(len(reels), count)}件)\n"
result += f"━━━━━━━━━━━━━━━━━━━━\n\n"
for idx, reel in enumerate(reels[:count], 1):
media = reel.get('media', {}) if isinstance(reel, dict) else {}
result += f"【リール {idx}】\n"
if "code" in media:
result += f"🔗 https://www.instagram.com/reel/{media['code']}/\n"
if "play_count" in media:
result += f"▶️ 再生数: {media['play_count']:,}\n"
if "like_count" in media:
result += f"❤️ いいね: {media['like_count']:,}\n"
if "comment_count" in media:
result += f"💬 コメント: {media['comment_count']:,}\n"
result += "\n"
return result
except Exception as e:
logger.exception(f"Error in instagram_user_reels: {e}")
return f"❌ エラー: {str(e)}"
@mcp.tool()
async def instagram_search(
query: str,
search_type: str = "users"
) -> str:
"""
Instagramでユーザーやハッシュタグを検索します。
Args:
query: 検索キーワード
search_type: 検索タイプ("users", "hashtags", "places")- 現在は"users"のみ対応
Returns:
検索結果の文字列
"""
logger.info(f"instagram_search called with query={query}, type={search_type}")
try:
client = _get_client()
# 検索を実行
search_result = await asyncio.to_thread(
client.instagram.search,
text=query
)
data = search_result.data if hasattr(search_result, 'data') else {}
users_data = data.get('users', []) if isinstance(data, dict) else []
if not users_data:
return "検索結果が見つかりませんでした。"
# userオブジェクトを抽出
users = [item.get('user', {}) for item in users_data if isinstance(item, dict) and 'user' in item]
if not users:
return "検索結果が見つかりませんでした。"
result = f"━━━━━━━━━━━━━━━━━━━━\n"
result += f"🔍 検索結果: {query}\n"
result += f"━━━━━━━━━━━━━━━━━━━━\n\n"
for idx, user in enumerate(users[:10], 1):
result += f"【{idx}】"
if "username" in user:
result += f" @{user['username']}"
if "full_name" in user:
result += f" - {user['full_name']}"
if "follower_count" in user:
result += f" ({user['follower_count']:,} フォロワー)"
elif "pk" in user:
result += f" (ID: {user['pk']})"
result += "\n"
return result
except Exception as e:
logger.exception(f"Error in instagram_search: {e}")
return f"❌ エラー: {str(e)}"
@mcp.tool()
async def instagram_post_info(post_url: str, n_comments: int = 10) -> str:
"""
Instagram投稿の詳細情報とコメントを取得します。
Args:
post_url: Instagram投稿のURL(例: https://instagram.com/p/xxxxx)
n_comments: 取得するコメント数(デフォルト: 10、最大: 50)
Returns:
投稿情報の文字列
"""
logger.info(f"instagram_post_info called with post_url={post_url}, n_comments={n_comments}")
try:
# URLからショートコードを抽出
shortcode = post_url.split("/p/")[-1].split("/")[0].split("?")[0]
# コメント数の上限をチェック
n_comments = min(max(0, n_comments), 50)
client = _get_client()
# 投稿情報とコメントを取得
post_result = await asyncio.to_thread(
client.instagram.post_info_and_comments,
code=shortcode,
n_comments_to_fetch=n_comments
)
data = post_result.data if hasattr(post_result, 'data') else {}
if not data or "error" in data:
return f"❌ エラー: 投稿情報を取得できませんでした"
result = "━━━━━━━━━━━━━━━━━━━━\n"
result += "📸 Instagram 投稿詳細\n"
result += "━━━━━━━━━━━━━━━━━━━━\n\n"
# 基本情報
result += f"🔗 URL: https://instagram.com/p/{shortcode}\n"
if "id" in data:
result += f"📌 投稿ID: {data['id']}\n"
# 投稿者情報
if "owner" in data:
owner = data["owner"]
if "username" in owner:
result += f"👤 投稿者: @{owner['username']}"
if "full_name" in owner:
result += f" ({owner['full_name']})"
result += "\n"
if "is_verified" in owner and owner["is_verified"]:
result += "✓ 認証済みアカウント\n"
# キャプション
if "edge_media_to_caption" in data:
edges = data["edge_media_to_caption"].get("edges", [])
if edges and len(edges) > 0:
caption_text = edges[0].get("node", {}).get("text", "")
if caption_text:
# キャプションが長い場合は省略
display_caption = caption_text[:200] + "..." if len(caption_text) > 200 else caption_text
result += f"\n📝 キャプション:\n{display_caption}\n"
# 画像/動画情報
result += "\n📊 投稿統計:\n"
if "is_video" in data:
media_type = "🎥 動画" if data["is_video"] else "📷 画像"
result += f"タイプ: {media_type}\n"
if "dimensions" in data:
dims = data["dimensions"]
result += f"サイズ: {dims.get('width', 0)} x {dims.get('height', 0)}px\n"
# エンゲージメント統計
if "edge_media_preview_like" in data:
like_count = data["edge_media_preview_like"].get("count", 0)
result += f"❤️ いいね: {like_count:,}\n"
if "edge_media_to_comment" in data:
comment_count = data["edge_media_to_comment"].get("count", 0)
result += f"💬 コメント: {comment_count:,}\n"
# 投稿日時
if "taken_at_timestamp" in data:
from datetime import datetime
timestamp = data["taken_at_timestamp"]
dt = datetime.fromtimestamp(timestamp)
result += f"📅 投稿日時: {dt.strftime('%Y-%m-%d %H:%M:%S')}\n"
# 位置情報
if "location" in data and data["location"]:
location = data["location"]
if "name" in location:
result += f"📍 位置情報: {location['name']}\n"
# 画像URL
if "display_url" in data:
result += f"\n🖼️ 画像URL:\n{data['display_url']}\n"
# コメント情報
if n_comments > 0 and "edge_media_to_comment" in data:
comment_edges = data["edge_media_to_comment"].get("edges", [])
if comment_edges:
result += f"\n💬 コメント({len(comment_edges)}件):\n"
result += "━━━━━━━━━━━━━━━━━━━━\n"
for idx, edge in enumerate(comment_edges[:n_comments], 1):
comment_node = edge.get("node", {})
if comment_node:
result += f"\n【コメント {idx}】\n"
# コメント投稿者
comment_owner = comment_node.get("owner", {})
if "username" in comment_owner:
result += f"👤 @{comment_owner['username']}: "
# コメント内容
comment_text = comment_node.get("text", "")
display_text = comment_text[:100] + "..." if len(comment_text) > 100 else comment_text
result += f"{display_text}\n"
# いいね数
if "edge_liked_by" in comment_node:
likes = comment_node["edge_liked_by"].get("count", 0)
if likes > 0:
result += f" ❤️ {likes:,} いいね\n"
# API使用量情報
if hasattr(post_result, 'units_charged'):
result += f"\n⚡ API使用量: {post_result.units_charged} ユニット\n"
return result
except Exception as e:
logger.exception(f"Error in instagram_post_info: {e}")
return f"❌ エラー: {str(e)}"
@mcp.tool()
async def instagram_follower_count(username: str) -> str:
"""
Instagramユーザーのフォロワー数を取得します。
Args:
username: Instagramユーザー名(@なし)
Returns:
フォロワー数の文字列
"""
logger.info(f"instagram_follower_count called with username={username}")
try:
client = _get_client()
# まずuser_idを取得
user_info = await asyncio.to_thread(
client.instagram.user_detailed_info,
username=username
)
user_id = user_info.data.get('id') if hasattr(user_info, 'data') else None
if not user_id:
return "❌ エラー: ユーザーIDが取得できませんでした"
# user_idで基本統計を取得
stats_result = await asyncio.to_thread(
client.instagram.user_basic_stats,
user_id=int(user_id)
)
data = stats_result.data if hasattr(stats_result, 'data') else {}
count = data.get("followers", data.get("follower_count", 0))
return f"👥 @{username} のフォロワー数: {count:,}"
except Exception as e:
logger.exception(f"Error in instagram_follower_count: {e}")
return f"❌ エラー: {str(e)}"
@mcp.tool()
async def instagram_user_tagged_posts(
username: str,
cursor: str = ""
) -> str:
"""
指定されたユーザーがタグ付けされた投稿を取得します。
各リクエストで最新10件のタグ付けされた投稿を返します。
Args:
username: Instagramユーザー名(@なし)
cursor: ページネーション用のカーソル(次のページを取得する場合に使用、空文字列の場合は最初のページ)
Returns:
タグ付けされた投稿一覧の文字列
"""
logger.info(f"instagram_user_tagged_posts called with username={username}, cursor={cursor}")
try:
client = _get_client()
# まずuser_idを取得
user_info = await asyncio.to_thread(
client.instagram.user_detailed_info,
username=username
)
user_id = user_info.data.get('id') if hasattr(user_info, 'data') else None
if not user_id:
return "❌ エラー: ユーザーIDが取得できませんでした"
# タグ付けされた投稿を取得
# cursorが空文字列の場合はNoneを渡す(API仕様に合わせる)
kwargs = {"user_id": int(user_id)}
if cursor:
kwargs["cursor"] = cursor
tagged_result = await asyncio.to_thread(
client.instagram.user_tagged_posts,
**kwargs
)
data = tagged_result.data if hasattr(tagged_result, 'data') else {}
# レスポンス形式に応じて投稿を抽出
if isinstance(data, dict):
posts_data = data.get('posts', [])
next_cursor = data.get('nextCursor', '')
else:
posts_data = []
next_cursor = ''
# posts配列からnodeを抽出
posts = []
for item in posts_data:
if isinstance(item, dict):
if 'node' in item:
posts.append(item['node'])
else:
posts.append(item)
if not posts:
return f"@{username} がタグ付けされた投稿が見つかりませんでした。"
result = f"━━━━━━━━━━━━━━━━━━━━\n"
result += f"🏷️ @{username} がタグ付けされた投稿({len(posts)}件)\n"
result += f"━━━━━━━━━━━━━━━━━━━━\n\n"
for idx, post in enumerate(posts, 1):
result += f"【投稿 {idx}】\n"
# 投稿者情報
if "owner" in post:
owner = post["owner"]
if "username" in owner:
result += f"👤 投稿者: @{owner['username']}\n"
# キャプション
if "edge_media_to_caption" in post:
edges = post["edge_media_to_caption"].get("edges", [])
if edges and len(edges) > 0:
caption_text = edges[0].get("node", {}).get("text", "")
if caption_text:
caption = caption_text[:100] + "..." if len(caption_text) > 100 else caption_text
result += f"📝 {caption}\n"
# いいね数
if "edge_liked_by" in post and "count" in post["edge_liked_by"]:
result += f"❤️ {post['edge_liked_by']['count']:,} いいね\n"
elif "edge_media_preview_like" in post and "count" in post["edge_media_preview_like"]:
result += f"❤️ {post['edge_media_preview_like']['count']:,} いいね\n"
# コメント数
if "edge_media_to_comment" in post and "count" in post["edge_media_to_comment"]:
result += f"💬 {post['edge_media_to_comment']['count']:,} コメント\n"
# URL
if "shortcode" in post:
result += f"🔗 https://instagram.com/p/{post['shortcode']}\n"
# メディアタイプ
if "is_video" in post:
media_type = "🎥 動画" if post["is_video"] else "📷 画像"
result += f"タイプ: {media_type}\n"
if post["is_video"] and "video_view_count" in post:
result += f"▶️ 再生数: {post['video_view_count']:,}\n"
# 投稿日時
if "taken_at_timestamp" in post:
from datetime import datetime
timestamp = post["taken_at_timestamp"]
dt = datetime.fromtimestamp(timestamp)
result += f"📅 投稿日時: {dt.strftime('%Y-%m-%d %H:%M:%S')}\n"
result += "\n"
# 次のページがある場合
if next_cursor:
result += f"📄 次のページがあります(cursor: {next_cursor[:50]}...)\n"
# API使用量情報
if hasattr(tagged_result, 'units_charged'):
result += f"\n⚡ API使用量: {tagged_result.units_charged} ユニット\n"
return result
except Exception as e:
logger.exception(f"Error in instagram_user_tagged_posts: {e}")
return f"❌ エラー: {str(e)}"
@mcp.tool()
async def instagram_music_posts(
music_id: str,
cursor: str = ""
) -> str:
"""
指定された音楽IDに関連する投稿を取得します。
ページネーションに対応しています。
Args:
music_id: 音楽ID(例: "3473785566185564")
cursor: ページネーション用のカーソル(次のページを取得する場合に使用、空文字列の場合は最初のページ)
Returns:
音楽に関連する投稿一覧の文字列
"""
logger.info(f"instagram_music_posts called with music_id={music_id}, cursor={cursor}")
try:
client = _get_client()
# 音楽に関連する投稿を取得
# cursorが空文字列の場合はNoneを渡す(API仕様に合わせる)
kwargs = {"music_id": music_id}
if cursor:
kwargs["cursor"] = cursor
music_result = await asyncio.to_thread(
client.instagram.music_posts,
**kwargs
)
data = music_result.data if hasattr(music_result, 'data') else {}
# レスポンス形式に応じて投稿を抽出
if isinstance(data, dict):
posts_data = data.get('posts', [])
next_cursor = data.get('nextCursor', '')
else:
posts_data = []
next_cursor = ''
# posts配列からnodeを抽出
posts = []
for item in posts_data:
if isinstance(item, dict):
if 'node' in item:
posts.append(item['node'])
else:
posts.append(item)
if not posts:
return f"音楽ID {music_id} に関連する投稿が見つかりませんでした。"
result = f"━━━━━━━━━━━━━━━━━━━━\n"
result += f"🎵 音楽ID {music_id} に関連する投稿({len(posts)}件)\n"
result += f"━━━━━━━━━━━━━━━━━━━━\n\n"
for idx, post in enumerate(posts, 1):
result += f"【投稿 {idx}】\n"
# 投稿者情報
if "owner" in post:
owner = post["owner"]
if "username" in owner:
result += f"👤 投稿者: @{owner['username']}"
if "full_name" in owner:
result += f" ({owner['full_name']})"
result += "\n"
# キャプション
if "edge_media_to_caption" in post:
edges = post["edge_media_to_caption"].get("edges", [])
if edges and len(edges) > 0:
caption_text = edges[0].get("node", {}).get("text", "")
if caption_text:
caption = caption_text[:100] + "..." if len(caption_text) > 100 else caption_text
result += f"📝 {caption}\n"
elif "caption" in post:
caption = post["caption"][:100] + "..." if len(post.get("caption", "")) > 100 else post.get("caption", "")
result += f"📝 {caption}\n"
# いいね数
if "edge_liked_by" in post and "count" in post["edge_liked_by"]:
result += f"❤️ {post['edge_liked_by']['count']:,} いいね\n"
elif "edge_media_preview_like" in post and "count" in post["edge_media_preview_like"]:
result += f"❤️ {post['edge_media_preview_like']['count']:,} いいね\n"
elif "like_count" in post:
result += f"❤️ {post['like_count']:,} いいね\n"
# コメント数
if "edge_media_to_comment" in post and "count" in post["edge_media_to_comment"]:
result += f"💬 {post['edge_media_to_comment']['count']:,} コメント\n"
elif "comment_count" in post:
result += f"💬 {post['comment_count']:,} コメント\n"
# URL
if "shortcode" in post:
result += f"🔗 https://instagram.com/p/{post['shortcode']}\n"
elif "code" in post:
result += f"🔗 https://instagram.com/p/{post['code']}\n"
# メディアタイプ
if "is_video" in post:
media_type = "🎥 動画" if post["is_video"] else "📷 画像"
result += f"タイプ: {media_type}\n"
if post["is_video"] and "video_view_count" in post:
result += f"▶️ 再生数: {post['video_view_count']:,}\n"
# 投稿日時
if "taken_at_timestamp" in post:
from datetime import datetime
timestamp = post["taken_at_timestamp"]
dt = datetime.fromtimestamp(timestamp)
result += f"📅 投稿日時: {dt.strftime('%Y-%m-%d %H:%M:%S')}\n"
# 画像URL
if "display_url" in post:
result += f"🖼️ 画像URL: {post['display_url'][:80]}...\n"
elif "thumbnail_src" in post:
result += f"🖼️ サムネイル: {post['thumbnail_src'][:80]}...\n"
result += "\n"
# 次のページがある場合
if next_cursor:
result += f"📄 次のページがあります(cursor: {next_cursor[:50]}...)\n"
# API使用量情報
if hasattr(music_result, 'units_charged'):
result += f"\n⚡ API使用量: {music_result.units_charged} ユニット\n"
return result
except Exception as e:
logger.exception(f"Error in instagram_music_posts: {e}")
return f"❌ エラー: {str(e)}"
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# サーバー起動
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def main():
mcp.run(transport="stdio")
if __name__ == "__main__":
main()