Skip to main content
Glama
bluesky_mcp.py24.5 kB
import os import sys # 强制 Windows 使用二进制模式标准输入输出,避免 \r\n 问题 # Antigravity 的 MCP 客户端对 \r (CR) 非常敏感,会导致 "invalid trailing data" 错误 if sys.platform == 'win32': import msvcrt msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY) msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) import json from datetime import datetime, timezone from typing import Optional, Any from functools import lru_cache from mcp.server.fastmcp import FastMCP from atproto import Client, client_utils from pydantic import BaseModel, Field from dotenv import load_dotenv # 加载环境变量 load_dotenv() # 创建 MCP 服务器 mcp = FastMCP( name="Bluesky MCP", instructions="""A client for the Bluesky social network (AT Protocol). This toolset allows you to function as an autonomous user on Bluesky. Capabilities: - **Read**: Fetch timelines, user profiles (`get_profile`), and search for posts (`search_posts`). - **Write**: Create new posts (`send_post`) and reply to others (`reply_to_post`). - **React**: Like (`like_post`) and Repost (`repost`) content. - **Monitor**: Check notifications (`get_notifications`). Operational Rules: 1. **Character Limit**: Maximum 300 characters per post. The API will fail if exceeded. 2. **Threading**: To reply, use `reply_to_post` with the target post's URI. The tool handles the threading references automatically. 3. **Awareness**: Before posting about a topic, it is recommended to search or check the timeline to understand the context. """ ) class BlueskyClient: """Bluesky 客户端单例,管理登录状态""" _instance: Optional["BlueskyClient"] = None _client: Optional[Client] = None _logged_in: bool = False def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def get_client(self) -> Client: """获取已登录的客户端""" if self._client is None: self._client = Client() if not self._logged_in: handle = os.getenv("BLUESKY_HANDLE") password = os.getenv("BLUESKY_PASSWORD") if not handle or not password: raise ValueError( "Missing BLUESKY_HANDLE or BLUESKY_PASSWORD environment variables. " "Please set them before using this MCP server." ) self._client.login(handle, password) self._logged_in = True return self._client @property def me(self): """获取当前登录用户的信息""" return self.get_client().me def get_client() -> Client: """获取 Bluesky 客户端""" return BlueskyClient().get_client() def _get_attr(obj: Any, path: str, default: Any = None) -> Any: """Helper to safely get nested attributes from atproto objects or dicts""" parts = path.split(".") current = obj for part in parts: if isinstance(current, dict): current = current.get(part) else: current = getattr(current, part, None) if current is None: return default return current def format_post(post_data: Any, include_reply_context: bool = False) -> dict: """格式化帖子数据,使其更易读""" # Handle both dict and object input if isinstance(post_data, dict): post = post_data.get("post", post_data) else: post = getattr(post_data, "post", post_data) # Helper for attribute access def get(obj, attr, default=None): if isinstance(obj, dict): return obj.get(attr, default) return getattr(obj, attr, default) author = get(post, "author") record = get(post, "record") result = { "uri": get(post, "uri", ""), "cid": get(post, "cid", ""), "author": { "handle": get(author, "handle", ""), "display_name": get(author, "display_name", get(author, "displayName", get(author, "handle", ""))), "avatar": get(author, "avatar", ""), }, "text": get(record, "text", ""), "created_at": get(record, "created_at", get(record, "createdAt", "")), "likes": get(post, "like_count", get(post, "likeCount", 0)), "reposts": get(post, "repost_count", get(post, "repostCount", 0)), "replies": get(post, "reply_count", get(post, "replyCount", 0)), "indexed_at": get(post, "indexed_at", get(post, "indexedAt", "")), } # 如果有嵌入内容(链接卡片、图片等) embed = get(post, "embed") if embed: embed_type = get(embed, "$type") or getattr(embed, "py_type", "") if "external" in str(embed_type) or hasattr(embed, "external"): external = get(embed, "external") result["embed"] = { "type": "link", "url": get(external, "uri", ""), "title": get(external, "title", ""), "description": get(external, "description", ""), } elif "images" in str(embed_type) or hasattr(embed, "images"): images = get(embed, "images", []) result["embed"] = { "type": "images", "images": [ {"url": get(img, "fullsize", ""), "alt": get(img, "alt", "")} for img in images ] } # 如果是回复,包含回复上下文 if include_reply_context: reply = get(post_data, "reply") if reply: parent = get(reply, "parent") if parent: parent_author = get(parent, "author") parent_record = get(parent, "record") parent_text = get(parent_record, "text", "") result["reply_to"] = { "uri": get(parent, "uri", ""), "author": get(parent_author, "handle", ""), "text": parent_text, } return result def format_notification(notif: Any) -> dict: """格式化通知数据""" def get(obj, attr, default=None): if isinstance(obj, dict): return obj.get(attr, default) return getattr(obj, attr, default) author = get(notif, "author") record = get(notif, "record") return { "uri": get(notif, "uri", ""), "cid": get(notif, "cid", ""), "reason": get(notif, "reason", ""), # like, repost, follow, mention, reply, quote "author": { "handle": get(author, "handle", ""), "display_name": get(author, "display_name", get(author, "displayName", "")), }, "record_text": get(record, "text", ""), "indexed_at": get(notif, "indexed_at", get(notif, "indexedAt", "")), "is_read": get(notif, "is_read", get(notif, "isRead", False)), # 对于 like/repost,包含被互动的帖子信息 "subject_uri": get(notif, "reason_subject", get(notif, "reasonSubject", "")), } # ============================================================================ # 发帖相关工具 # ============================================================================ @mcp.tool() def send_post( text: str, link_url: Optional[str] = None, link_title: Optional[str] = None, link_description: Optional[str] = None, ) -> str: """ 发送一条 Bluesky 帖子。 CRITICAL LIMITATION: Bluesky posts are strictly limited to 300 characters (300 graphemes). If your text exceeds this, the API will return a 400 InvalidRequest error. You MUST condense your message to fit within this limit. Be concise. Link URLs count towards the limit. Args: text: 帖子内容 (Must be <= 300 chars) link_url: 可选的链接 URL(将在文本末尾添加链接) link_title: 链接标题(仅在提供 link_url 时有效) link_description: 链接描述(仅在提供 link_url 时有效) Returns: 发送成功后的帖子 URI,或者包含长度信息的错误提示 """ client = get_client() # 估算长度 (近似值,Bluesky 使用 grapheme 计数,Python len() 是 code points) input_length = len(text) try: if link_url: # 使用 TextBuilder 构建带链接的帖子 text_builder = client_utils.TextBuilder() text_builder.text(text) if not text.endswith(" ") and not text.endswith("\n"): text_builder.text(" ") text_builder.link(link_title or link_url, link_url) post = client.send_post(text_builder) else: post = client.send_post(text=text) return json.dumps({ "success": True, "uri": post.uri, "cid": post.cid, "message": f"Post sent successfully!" }, ensure_ascii=False, indent=2) except Exception as e: return json.dumps({ "success": False, "error": "Failed to send post", "details": str(e), "input_length_approx": input_length, "limit": 300, "instruction": "Text is likely too long. Please shorten it to under 300 characters and try again." }, ensure_ascii=False, indent=2) @mcp.tool() def reply_to_post( post_uri: str, text: str, ) -> str: """ 回复一条帖子。 CRITICAL LIMITATION: Text must be <= 300 characters. Args: post_uri: 要回复的帖子 URI (格式: at://did:plc:xxx/app.bsky.feed.post/xxx) text: 回复内容 Returns: 回复帖子的 URI,或者包含长度信息的错误提示 """ client = get_client() input_length = len(text) try: # 获取原帖信息以构建回复引用 parent_post = client.get_post_thread(post_uri) parent = parent_post.thread.post # 构建回复引用 reply_ref = { "root": { "uri": parent.uri, "cid": parent.cid, }, "parent": { "uri": parent.uri, "cid": parent.cid, } } # 如果原帖本身是回复,需要追溯到根帖子 if hasattr(parent.record, "reply") and parent.record.reply: reply_ref["root"] = { "uri": parent.record.reply.root.uri, "cid": parent.record.reply.root.cid, } post = client.send_post(text=text, reply_to=reply_ref) return json.dumps({ "success": True, "uri": post.uri, "cid": post.cid, "replied_to": parent.author.handle, "message": f"Replied successfully to @{parent.author.handle}!" }, ensure_ascii=False, indent=2) except Exception as e: return json.dumps({ "success": False, "error": "Failed to reply to post", "details": str(e), "input_length_approx": input_length, "limit": 300, "instruction": "Text is likely too long. Please shorten it to under 300 characters and try again." }, ensure_ascii=False, indent=2) @mcp.tool() def delete_post(post_uri: str) -> str: """ 删除一条帖子。 Args: post_uri: 要删除的帖子 URI Returns: 删除结果 """ client = get_client() # 使用 unsend 来删除帖子 (delete_post 需要 rkey,unsend 更方便) success = client.delete_post(post_uri) return json.dumps({ "success": True, "deleted_uri": post_uri, "message": "Post deleted successfully!" }, ensure_ascii=False, indent=2) # ============================================================================ # 浏览相关工具 # ============================================================================ @mcp.tool() def get_timeline(limit: int = 20, cursor: Optional[str] = None) -> str: """ 获取首页时间线(关注的人的帖子)。 Args: limit: 获取帖子数量,最大 100 cursor: 分页游标,用于获取下一页 Returns: 时间线帖子列表 """ client = get_client() timeline = client.get_timeline(limit=min(limit, 100), cursor=cursor) posts = [format_post(item, include_reply_context=True) for item in timeline.feed] return json.dumps({ "posts": posts, "cursor": timeline.cursor, "count": len(posts), }, ensure_ascii=False, indent=2) @mcp.tool() def get_author_feed( handle: str, limit: int = 20, cursor: Optional[str] = None, ) -> str: """ 获取某个用户的帖子列表。 Args: handle: 用户 handle (例如: nocturne.bsky.social) limit: 获取帖子数量,最大 100 cursor: 分页游标 Returns: 用户帖子列表 """ client = get_client() feed = client.get_author_feed(actor=handle, limit=min(limit, 100), cursor=cursor) posts = [format_post(item, include_reply_context=True) for item in feed.feed] return json.dumps({ "author": handle, "posts": posts, "cursor": feed.cursor, "count": len(posts), }, ensure_ascii=False, indent=2) @mcp.tool() def get_post_thread(post_uri: str, depth: int = 6) -> str: """ 获取帖子及其回复线程。 Args: post_uri: 帖子 URI depth: 获取回复深度,最大 6 Returns: 帖子线程(包括父帖和回复) """ client = get_client() thread = client.get_post_thread(uri=post_uri, depth=min(depth, 6)) def format_thread_post(thread_item): """递归格式化线程中的帖子""" if not thread_item or not hasattr(thread_item, "post"): return None result = format_post({"post": thread_item.post}) # 处理回复 if hasattr(thread_item, "replies") and thread_item.replies: result["replies"] = [ format_thread_post(reply) for reply in thread_item.replies if reply and hasattr(reply, "post") ] result["replies"] = [r for r in result["replies"] if r] return result # 格式化主帖 main_post = format_thread_post(thread.thread) # 格式化父帖(如果有) parent_chain = [] if hasattr(thread.thread, "parent") and thread.thread.parent: parent = thread.thread.parent while parent and hasattr(parent, "post"): parent_chain.insert(0, format_post({"post": parent.post})) parent = getattr(parent, "parent", None) return json.dumps({ "parent_chain": parent_chain, "post": main_post, }, ensure_ascii=False, indent=2) # ============================================================================ # 互动相关工具 # ============================================================================ @mcp.tool() def like_post(post_uri: str) -> str: """ 点赞一条帖子。 Args: post_uri: 帖子 URI Returns: 点赞结果 """ client = get_client() # 获取帖子的 cid thread = client.get_post_thread(uri=post_uri) post = thread.thread.post like = client.like(uri=post.uri, cid=post.cid) return json.dumps({ "success": True, "liked_post": post_uri, "like_uri": like.uri, "author": post.author.handle, "message": f"Liked @{post.author.handle}'s post!" }, ensure_ascii=False, indent=2) @mcp.tool() def unlike_post(post_uri: str) -> str: """ 取消点赞一条帖子。 Args: post_uri: 帖子 URI Returns: 取消点赞结果 """ client = get_client() success = client.unlike(post_uri) return json.dumps({ "success": True, "unliked_post": post_uri, "message": "Unliked successfully!" }, ensure_ascii=False, indent=2) @mcp.tool() def repost(post_uri: str) -> str: """ 转发一条帖子。 Args: post_uri: 帖子 URI Returns: 转发结果 """ client = get_client() # 获取帖子的 cid thread = client.get_post_thread(uri=post_uri) post = thread.thread.post repost_ref = client.repost(uri=post.uri, cid=post.cid) return json.dumps({ "success": True, "reposted_post": post_uri, "repost_uri": repost_ref.uri, "author": post.author.handle, "message": f"Reposted @{post.author.handle}'s post!" }, ensure_ascii=False, indent=2) @mcp.tool() def unrepost(post_uri: str) -> str: """ 取消转发一条帖子。 Args: post_uri: 帖子 URI Returns: 取消转发结果 """ client = get_client() success = client.unrepost(post_uri) return json.dumps({ "success": True, "unreposted_post": post_uri, "message": "Unreposted successfully!" }, ensure_ascii=False, indent=2) # ============================================================================ # 通知相关工具 # ============================================================================ @mcp.tool() def get_notifications( limit: int = 25, cursor: Optional[str] = None, filter_reason: Optional[str] = None, unread_only: bool = True, ) -> str: """ 获取通知列表(被提及、回复、点赞、转发、关注等)。 Args: limit: 获取通知数量,最大 100 cursor: 分页游标 filter_reason: 可选,筛选特定类型的通知 (like, repost, follow, mention, reply, quote) unread_only: 只返回未读通知,默认 True。获取未读通知后会自动标记为已读。 Returns: 通知列表 """ client = get_client() notifs = client.app.bsky.notification.list_notifications( {"limit": min(limit, 100), "cursor": cursor} ) notifications = [format_notification(n) for n in notifs.notifications] # 如果指定了筛选条件 if filter_reason: notifications = [n for n in notifications if n["reason"] == filter_reason] # 只返回未读通知 if unread_only: notifications = [n for n in notifications if not n["is_read"]] # 获取未读通知后自动标记为已读 if notifications: now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") client.app.bsky.notification.update_seen({"seenAt": now}) return json.dumps({ "notifications": notifications, "cursor": notifs.cursor if not unread_only else None, # 未读模式下不返回 cursor,因为过滤后分页无意义 "count": len(notifications), }, ensure_ascii=False, indent=2) @mcp.tool() def get_unread_count() -> str: """ 获取未读通知数量。 Returns: 未读通知数量 """ client = get_client() unread = client.app.bsky.notification.get_unread_count({}) return json.dumps({ "unread_count": unread.count, }, ensure_ascii=False, indent=2) # ============================================================================ # 社交关系相关工具 # ============================================================================ @mcp.tool() def get_profile(handle: str) -> str: """ 获取用户资料。 Args: handle: 用户 handle (例如: nocturne.bsky.social) Returns: 用户资料信息 """ client = get_client() profile = client.get_profile(actor=handle) return json.dumps({ "did": profile.did, "handle": profile.handle, "display_name": profile.display_name or profile.handle, "description": profile.description or "", "avatar": profile.avatar or "", "banner": profile.banner or "", "followers_count": profile.followers_count or 0, "follows_count": profile.follows_count or 0, "posts_count": profile.posts_count or 0, "indexed_at": profile.indexed_at or "", }, ensure_ascii=False, indent=2) @mcp.tool() def get_my_profile() -> str: """ 获取当前登录用户(Nocturne)的资料。 Returns: 当前用户资料信息 """ client = get_client() me = client.me # 获取完整资料 profile = client.get_profile(actor=me.handle) return json.dumps({ "did": me.did, "handle": me.handle, "display_name": profile.display_name or me.handle, "description": profile.description or "", "avatar": profile.avatar or "", "followers_count": profile.followers_count or 0, "follows_count": profile.follows_count or 0, "posts_count": profile.posts_count or 0, }, ensure_ascii=False, indent=2) @mcp.tool() def follow_user(handle: str) -> str: """ 关注一个用户。 Args: handle: 要关注的用户 handle Returns: 关注结果 """ client = get_client() # 先获取用户的 DID profile = client.get_profile(actor=handle) follow = client.follow(profile.did) return json.dumps({ "success": True, "followed": handle, "follow_uri": follow.uri, "message": f"Now following @{handle}!" }, ensure_ascii=False, indent=2) @mcp.tool() def unfollow_user(handle: str) -> str: """ 取消关注一个用户。 Args: handle: 要取消关注的用户 handle Returns: 取消关注结果 """ client = get_client() # 先获取用户的 DID profile = client.get_profile(actor=handle) success = client.unfollow(profile.did) return json.dumps({ "success": True, "unfollowed": handle, "message": f"Unfollowed @{handle}!" }, ensure_ascii=False, indent=2) @mcp.tool() def search_posts( query: str, limit: int = 25, cursor: Optional[str] = None, ) -> str: """ 搜索帖子。 Args: query: 搜索关键词 limit: 返回数量,最大 100 cursor: 分页游标 Returns: 搜索结果 """ client = get_client() # 使用 app.bsky.feed.searchPosts results = client.app.bsky.feed.search_posts({ "q": query, "limit": min(limit, 100), "cursor": cursor, }) posts = [format_post({"post": p}) for p in results.posts] return json.dumps({ "query": query, "posts": posts, "cursor": results.cursor if hasattr(results, "cursor") else None, "count": len(posts), }, ensure_ascii=False, indent=2) @mcp.tool() def search_users( query: str, limit: int = 25, cursor: Optional[str] = None, ) -> str: """ 搜索用户。 Args: query: 搜索关键词 limit: 返回数量,最大 100 cursor: 分页游标 Returns: 搜索结果 """ client = get_client() # 使用 app.bsky.actor.searchActors results = client.app.bsky.actor.search_actors({ "q": query, "limit": min(limit, 100), "cursor": cursor, }) users = [ { "did": u.did, "handle": u.handle, "display_name": u.display_name or u.handle, "description": (u.description or "")[:200], "avatar": u.avatar or "", } for u in results.actors ] return json.dumps({ "query": query, "users": users, "cursor": results.cursor if hasattr(results, "cursor") else None, "count": len(users), }, ensure_ascii=False, indent=2) # ============================================================================ # MCP 资源 (可选,用于暴露一些静态信息) # ============================================================================ @mcp.resource("bluesky://profile") def get_current_profile_resource() -> str: """ 当前登录用户的资料(作为 MCP 资源)。 """ return get_my_profile() @mcp.resource("bluesky://notifications/unread") def get_unread_count_resource() -> str: """ 未读通知数量(作为 MCP 资源)。 """ return get_unread_count() # ============================================================================ # 入口点 # ============================================================================ if __name__ == "__main__": # 使用 stdio 传输运行 MCP 服务器 mcp.run()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Dataojitori/mcp-server-bluesky-py'

If you have feedback or need assistance with the MCP directory API, please join our Discord server