import os
import sys
# 强制 Windows 使用二进制模式标准输入输出,避免 \r\n 问题
# Antigravity 的 MCP 客户端对 \r (CR) 非常敏感,会导致 "invalid trailing data" 错误
if sys.platform == 'win32':
import msvcrt
msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
import json
from datetime import datetime, timezone
from typing import Optional, Any
from functools import lru_cache
from mcp.server.fastmcp import FastMCP
from atproto import Client, client_utils
from pydantic import BaseModel, Field
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# 创建 MCP 服务器
mcp = FastMCP(
name="Bluesky MCP",
instructions="""A client for the Bluesky social network (AT Protocol).
This toolset allows you to function as an autonomous user on Bluesky.
Capabilities:
- **Read**: Fetch timelines, user profiles (`get_profile`), and search for posts (`search_posts`).
- **Write**: Create new posts (`send_post`) and reply to others (`reply_to_post`).
- **React**: Like (`like_post`) and Repost (`repost`) content.
- **Monitor**: Check notifications (`get_notifications`).
Operational Rules:
1. **Character Limit**: Maximum 300 characters per post. The API will fail if exceeded.
2. **Threading**: To reply, use `reply_to_post` with the target post's URI. The tool handles the threading references automatically.
3. **Awareness**: Before posting about a topic, it is recommended to search or check the timeline to understand the context.
"""
)
class BlueskyClient:
"""Bluesky 客户端单例,管理登录状态"""
_instance: Optional["BlueskyClient"] = None
_client: Optional[Client] = None
_logged_in: bool = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def get_client(self) -> Client:
"""获取已登录的客户端"""
if self._client is None:
self._client = Client()
if not self._logged_in:
handle = os.getenv("BLUESKY_HANDLE")
password = os.getenv("BLUESKY_PASSWORD")
if not handle or not password:
raise ValueError(
"Missing BLUESKY_HANDLE or BLUESKY_PASSWORD environment variables. "
"Please set them before using this MCP server."
)
self._client.login(handle, password)
self._logged_in = True
return self._client
@property
def me(self):
"""获取当前登录用户的信息"""
return self.get_client().me
def get_client() -> Client:
"""获取 Bluesky 客户端"""
return BlueskyClient().get_client()
def _get_attr(obj: Any, path: str, default: Any = None) -> Any:
"""Helper to safely get nested attributes from atproto objects or dicts"""
parts = path.split(".")
current = obj
for part in parts:
if isinstance(current, dict):
current = current.get(part)
else:
current = getattr(current, part, None)
if current is None:
return default
return current
def format_post(post_data: Any, include_reply_context: bool = False) -> dict:
"""格式化帖子数据,使其更易读"""
# Handle both dict and object input
if isinstance(post_data, dict):
post = post_data.get("post", post_data)
else:
post = getattr(post_data, "post", post_data)
# Helper for attribute access
def get(obj, attr, default=None):
if isinstance(obj, dict):
return obj.get(attr, default)
return getattr(obj, attr, default)
author = get(post, "author")
record = get(post, "record")
result = {
"uri": get(post, "uri", ""),
"cid": get(post, "cid", ""),
"author": {
"handle": get(author, "handle", ""),
"display_name": get(author, "display_name", get(author, "displayName", get(author, "handle", ""))),
"avatar": get(author, "avatar", ""),
},
"text": get(record, "text", ""),
"created_at": get(record, "created_at", get(record, "createdAt", "")),
"likes": get(post, "like_count", get(post, "likeCount", 0)),
"reposts": get(post, "repost_count", get(post, "repostCount", 0)),
"replies": get(post, "reply_count", get(post, "replyCount", 0)),
"indexed_at": get(post, "indexed_at", get(post, "indexedAt", "")),
}
# 如果有嵌入内容(链接卡片、图片等)
embed = get(post, "embed")
if embed:
embed_type = get(embed, "$type") or getattr(embed, "py_type", "")
if "external" in str(embed_type) or hasattr(embed, "external"):
external = get(embed, "external")
result["embed"] = {
"type": "link",
"url": get(external, "uri", ""),
"title": get(external, "title", ""),
"description": get(external, "description", ""),
}
elif "images" in str(embed_type) or hasattr(embed, "images"):
images = get(embed, "images", [])
result["embed"] = {
"type": "images",
"images": [
{"url": get(img, "fullsize", ""), "alt": get(img, "alt", "")}
for img in images
]
}
# 如果是回复,包含回复上下文
if include_reply_context:
reply = get(post_data, "reply")
if reply:
parent = get(reply, "parent")
if parent:
parent_author = get(parent, "author")
parent_record = get(parent, "record")
parent_text = get(parent_record, "text", "")
result["reply_to"] = {
"uri": get(parent, "uri", ""),
"author": get(parent_author, "handle", ""),
"text": parent_text,
}
return result
def format_notification(notif: Any) -> dict:
"""格式化通知数据"""
def get(obj, attr, default=None):
if isinstance(obj, dict):
return obj.get(attr, default)
return getattr(obj, attr, default)
author = get(notif, "author")
record = get(notif, "record")
return {
"uri": get(notif, "uri", ""),
"cid": get(notif, "cid", ""),
"reason": get(notif, "reason", ""), # like, repost, follow, mention, reply, quote
"author": {
"handle": get(author, "handle", ""),
"display_name": get(author, "display_name", get(author, "displayName", "")),
},
"record_text": get(record, "text", ""),
"indexed_at": get(notif, "indexed_at", get(notif, "indexedAt", "")),
"is_read": get(notif, "is_read", get(notif, "isRead", False)),
# 对于 like/repost,包含被互动的帖子信息
"subject_uri": get(notif, "reason_subject", get(notif, "reasonSubject", "")),
}
# ============================================================================
# 发帖相关工具
# ============================================================================
@mcp.tool()
def send_post(
text: str,
link_url: Optional[str] = None,
link_title: Optional[str] = None,
link_description: Optional[str] = None,
) -> str:
"""
发送一条 Bluesky 帖子。
CRITICAL LIMITATION: Bluesky posts are strictly limited to 300 characters (300 graphemes).
If your text exceeds this, the API will return a 400 InvalidRequest error.
You MUST condense your message to fit within this limit. Be concise.
Link URLs count towards the limit.
Args:
text: 帖子内容 (Must be <= 300 chars)
link_url: 可选的链接 URL(将在文本末尾添加链接)
link_title: 链接标题(仅在提供 link_url 时有效)
link_description: 链接描述(仅在提供 link_url 时有效)
Returns:
发送成功后的帖子 URI,或者包含长度信息的错误提示
"""
client = get_client()
# 估算长度 (近似值,Bluesky 使用 grapheme 计数,Python len() 是 code points)
input_length = len(text)
try:
if link_url:
# 使用 TextBuilder 构建带链接的帖子
text_builder = client_utils.TextBuilder()
text_builder.text(text)
if not text.endswith(" ") and not text.endswith("\n"):
text_builder.text(" ")
text_builder.link(link_title or link_url, link_url)
post = client.send_post(text_builder)
else:
post = client.send_post(text=text)
return json.dumps({
"success": True,
"uri": post.uri,
"cid": post.cid,
"message": f"Post sent successfully!"
}, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({
"success": False,
"error": "Failed to send post",
"details": str(e),
"input_length_approx": input_length,
"limit": 300,
"instruction": "Text is likely too long. Please shorten it to under 300 characters and try again."
}, ensure_ascii=False, indent=2)
@mcp.tool()
def reply_to_post(
post_uri: str,
text: str,
) -> str:
"""
回复一条帖子。
CRITICAL LIMITATION: Text must be <= 300 characters.
Args:
post_uri: 要回复的帖子 URI (格式: at://did:plc:xxx/app.bsky.feed.post/xxx)
text: 回复内容
Returns:
回复帖子的 URI,或者包含长度信息的错误提示
"""
client = get_client()
input_length = len(text)
try:
# 获取原帖信息以构建回复引用
parent_post = client.get_post_thread(post_uri)
parent = parent_post.thread.post
# 构建回复引用
reply_ref = {
"root": {
"uri": parent.uri,
"cid": parent.cid,
},
"parent": {
"uri": parent.uri,
"cid": parent.cid,
}
}
# 如果原帖本身是回复,需要追溯到根帖子
if hasattr(parent.record, "reply") and parent.record.reply:
reply_ref["root"] = {
"uri": parent.record.reply.root.uri,
"cid": parent.record.reply.root.cid,
}
post = client.send_post(text=text, reply_to=reply_ref)
return json.dumps({
"success": True,
"uri": post.uri,
"cid": post.cid,
"replied_to": parent.author.handle,
"message": f"Replied successfully to @{parent.author.handle}!"
}, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({
"success": False,
"error": "Failed to reply to post",
"details": str(e),
"input_length_approx": input_length,
"limit": 300,
"instruction": "Text is likely too long. Please shorten it to under 300 characters and try again."
}, ensure_ascii=False, indent=2)
@mcp.tool()
def delete_post(post_uri: str) -> str:
"""
删除一条帖子。
Args:
post_uri: 要删除的帖子 URI
Returns:
删除结果
"""
client = get_client()
# 使用 unsend 来删除帖子 (delete_post 需要 rkey,unsend 更方便)
success = client.delete_post(post_uri)
return json.dumps({
"success": True,
"deleted_uri": post_uri,
"message": "Post deleted successfully!"
}, ensure_ascii=False, indent=2)
# ============================================================================
# 浏览相关工具
# ============================================================================
@mcp.tool()
def get_timeline(limit: int = 20, cursor: Optional[str] = None) -> str:
"""
获取首页时间线(关注的人的帖子)。
Args:
limit: 获取帖子数量,最大 100
cursor: 分页游标,用于获取下一页
Returns:
时间线帖子列表
"""
client = get_client()
timeline = client.get_timeline(limit=min(limit, 100), cursor=cursor)
posts = [format_post(item, include_reply_context=True) for item in timeline.feed]
return json.dumps({
"posts": posts,
"cursor": timeline.cursor,
"count": len(posts),
}, ensure_ascii=False, indent=2)
@mcp.tool()
def get_author_feed(
handle: str,
limit: int = 20,
cursor: Optional[str] = None,
) -> str:
"""
获取某个用户的帖子列表。
Args:
handle: 用户 handle (例如: nocturne.bsky.social)
limit: 获取帖子数量,最大 100
cursor: 分页游标
Returns:
用户帖子列表
"""
client = get_client()
feed = client.get_author_feed(actor=handle, limit=min(limit, 100), cursor=cursor)
posts = [format_post(item, include_reply_context=True) for item in feed.feed]
return json.dumps({
"author": handle,
"posts": posts,
"cursor": feed.cursor,
"count": len(posts),
}, ensure_ascii=False, indent=2)
@mcp.tool()
def get_post_thread(post_uri: str, depth: int = 6) -> str:
"""
获取帖子及其回复线程。
Args:
post_uri: 帖子 URI
depth: 获取回复深度,最大 6
Returns:
帖子线程(包括父帖和回复)
"""
client = get_client()
thread = client.get_post_thread(uri=post_uri, depth=min(depth, 6))
def format_thread_post(thread_item):
"""递归格式化线程中的帖子"""
if not thread_item or not hasattr(thread_item, "post"):
return None
result = format_post({"post": thread_item.post})
# 处理回复
if hasattr(thread_item, "replies") and thread_item.replies:
result["replies"] = [
format_thread_post(reply)
for reply in thread_item.replies
if reply and hasattr(reply, "post")
]
result["replies"] = [r for r in result["replies"] if r]
return result
# 格式化主帖
main_post = format_thread_post(thread.thread)
# 格式化父帖(如果有)
parent_chain = []
if hasattr(thread.thread, "parent") and thread.thread.parent:
parent = thread.thread.parent
while parent and hasattr(parent, "post"):
parent_chain.insert(0, format_post({"post": parent.post}))
parent = getattr(parent, "parent", None)
return json.dumps({
"parent_chain": parent_chain,
"post": main_post,
}, ensure_ascii=False, indent=2)
# ============================================================================
# 互动相关工具
# ============================================================================
@mcp.tool()
def like_post(post_uri: str) -> str:
"""
点赞一条帖子。
Args:
post_uri: 帖子 URI
Returns:
点赞结果
"""
client = get_client()
# 获取帖子的 cid
thread = client.get_post_thread(uri=post_uri)
post = thread.thread.post
like = client.like(uri=post.uri, cid=post.cid)
return json.dumps({
"success": True,
"liked_post": post_uri,
"like_uri": like.uri,
"author": post.author.handle,
"message": f"Liked @{post.author.handle}'s post!"
}, ensure_ascii=False, indent=2)
@mcp.tool()
def unlike_post(post_uri: str) -> str:
"""
取消点赞一条帖子。
Args:
post_uri: 帖子 URI
Returns:
取消点赞结果
"""
client = get_client()
success = client.unlike(post_uri)
return json.dumps({
"success": True,
"unliked_post": post_uri,
"message": "Unliked successfully!"
}, ensure_ascii=False, indent=2)
@mcp.tool()
def repost(post_uri: str) -> str:
"""
转发一条帖子。
Args:
post_uri: 帖子 URI
Returns:
转发结果
"""
client = get_client()
# 获取帖子的 cid
thread = client.get_post_thread(uri=post_uri)
post = thread.thread.post
repost_ref = client.repost(uri=post.uri, cid=post.cid)
return json.dumps({
"success": True,
"reposted_post": post_uri,
"repost_uri": repost_ref.uri,
"author": post.author.handle,
"message": f"Reposted @{post.author.handle}'s post!"
}, ensure_ascii=False, indent=2)
@mcp.tool()
def unrepost(post_uri: str) -> str:
"""
取消转发一条帖子。
Args:
post_uri: 帖子 URI
Returns:
取消转发结果
"""
client = get_client()
success = client.unrepost(post_uri)
return json.dumps({
"success": True,
"unreposted_post": post_uri,
"message": "Unreposted successfully!"
}, ensure_ascii=False, indent=2)
# ============================================================================
# 通知相关工具
# ============================================================================
@mcp.tool()
def get_notifications(
limit: int = 25,
cursor: Optional[str] = None,
filter_reason: Optional[str] = None,
unread_only: bool = True,
) -> str:
"""
获取通知列表(被提及、回复、点赞、转发、关注等)。
Args:
limit: 获取通知数量,最大 100
cursor: 分页游标
filter_reason: 可选,筛选特定类型的通知 (like, repost, follow, mention, reply, quote)
unread_only: 只返回未读通知,默认 True。获取未读通知后会自动标记为已读。
Returns:
通知列表
"""
client = get_client()
notifs = client.app.bsky.notification.list_notifications(
{"limit": min(limit, 100), "cursor": cursor}
)
notifications = [format_notification(n) for n in notifs.notifications]
# 如果指定了筛选条件
if filter_reason:
notifications = [n for n in notifications if n["reason"] == filter_reason]
# 只返回未读通知
if unread_only:
notifications = [n for n in notifications if not n["is_read"]]
# 获取未读通知后自动标记为已读
if notifications:
now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
client.app.bsky.notification.update_seen({"seenAt": now})
return json.dumps({
"notifications": notifications,
"cursor": notifs.cursor if not unread_only else None, # 未读模式下不返回 cursor,因为过滤后分页无意义
"count": len(notifications),
}, ensure_ascii=False, indent=2)
@mcp.tool()
def get_unread_count() -> str:
"""
获取未读通知数量。
Returns:
未读通知数量
"""
client = get_client()
unread = client.app.bsky.notification.get_unread_count({})
return json.dumps({
"unread_count": unread.count,
}, ensure_ascii=False, indent=2)
# ============================================================================
# 社交关系相关工具
# ============================================================================
@mcp.tool()
def get_profile(handle: str) -> str:
"""
获取用户资料。
Args:
handle: 用户 handle (例如: nocturne.bsky.social)
Returns:
用户资料信息
"""
client = get_client()
profile = client.get_profile(actor=handle)
return json.dumps({
"did": profile.did,
"handle": profile.handle,
"display_name": profile.display_name or profile.handle,
"description": profile.description or "",
"avatar": profile.avatar or "",
"banner": profile.banner or "",
"followers_count": profile.followers_count or 0,
"follows_count": profile.follows_count or 0,
"posts_count": profile.posts_count or 0,
"indexed_at": profile.indexed_at or "",
}, ensure_ascii=False, indent=2)
@mcp.tool()
def get_my_profile() -> str:
"""
获取当前登录用户(Nocturne)的资料。
Returns:
当前用户资料信息
"""
client = get_client()
me = client.me
# 获取完整资料
profile = client.get_profile(actor=me.handle)
return json.dumps({
"did": me.did,
"handle": me.handle,
"display_name": profile.display_name or me.handle,
"description": profile.description or "",
"avatar": profile.avatar or "",
"followers_count": profile.followers_count or 0,
"follows_count": profile.follows_count or 0,
"posts_count": profile.posts_count or 0,
}, ensure_ascii=False, indent=2)
@mcp.tool()
def follow_user(handle: str) -> str:
"""
关注一个用户。
Args:
handle: 要关注的用户 handle
Returns:
关注结果
"""
client = get_client()
# 先获取用户的 DID
profile = client.get_profile(actor=handle)
follow = client.follow(profile.did)
return json.dumps({
"success": True,
"followed": handle,
"follow_uri": follow.uri,
"message": f"Now following @{handle}!"
}, ensure_ascii=False, indent=2)
@mcp.tool()
def unfollow_user(handle: str) -> str:
"""
取消关注一个用户。
Args:
handle: 要取消关注的用户 handle
Returns:
取消关注结果
"""
client = get_client()
# 先获取用户的 DID
profile = client.get_profile(actor=handle)
success = client.unfollow(profile.did)
return json.dumps({
"success": True,
"unfollowed": handle,
"message": f"Unfollowed @{handle}!"
}, ensure_ascii=False, indent=2)
@mcp.tool()
def search_posts(
query: str,
limit: int = 25,
cursor: Optional[str] = None,
) -> str:
"""
搜索帖子。
Args:
query: 搜索关键词
limit: 返回数量,最大 100
cursor: 分页游标
Returns:
搜索结果
"""
client = get_client()
# 使用 app.bsky.feed.searchPosts
results = client.app.bsky.feed.search_posts({
"q": query,
"limit": min(limit, 100),
"cursor": cursor,
})
posts = [format_post({"post": p}) for p in results.posts]
return json.dumps({
"query": query,
"posts": posts,
"cursor": results.cursor if hasattr(results, "cursor") else None,
"count": len(posts),
}, ensure_ascii=False, indent=2)
@mcp.tool()
def search_users(
query: str,
limit: int = 25,
cursor: Optional[str] = None,
) -> str:
"""
搜索用户。
Args:
query: 搜索关键词
limit: 返回数量,最大 100
cursor: 分页游标
Returns:
搜索结果
"""
client = get_client()
# 使用 app.bsky.actor.searchActors
results = client.app.bsky.actor.search_actors({
"q": query,
"limit": min(limit, 100),
"cursor": cursor,
})
users = [
{
"did": u.did,
"handle": u.handle,
"display_name": u.display_name or u.handle,
"description": (u.description or "")[:200],
"avatar": u.avatar or "",
}
for u in results.actors
]
return json.dumps({
"query": query,
"users": users,
"cursor": results.cursor if hasattr(results, "cursor") else None,
"count": len(users),
}, ensure_ascii=False, indent=2)
# ============================================================================
# MCP 资源 (可选,用于暴露一些静态信息)
# ============================================================================
@mcp.resource("bluesky://profile")
def get_current_profile_resource() -> str:
"""
当前登录用户的资料(作为 MCP 资源)。
"""
return get_my_profile()
@mcp.resource("bluesky://notifications/unread")
def get_unread_count_resource() -> str:
"""
未读通知数量(作为 MCP 资源)。
"""
return get_unread_count()
# ============================================================================
# 入口点
# ============================================================================
if __name__ == "__main__":
# 使用 stdio 传输运行 MCP 服务器
mcp.run()