DingTalk MCP Server V2

  • src
import asyncio import logging import os import time import hmac import base64 import hashlib import json from typing import List from urllib.parse import quote_plus from datetime import datetime, timedelta import aiohttp from mcp.server import Server from mcp.types import Tool, TextContent from mcp.server.stdio import stdio_server # 日志配置 logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger("dingding_mcp_server") class DingdingMCPServer: def __init__(self): logger.info("Initializing DingTalk MCP server...") self.app = Server("dingding_mcp_server") self.setup_tools() self.access_token = None self.token_expires = 0 self.v2_access_token = None self.v2_token_expires = 0 self.session = None logger.info("Server initialized successfully") async def ensure_session(self): if self.session is None: logger.debug("Creating new aiohttp session") self.session = aiohttp.ClientSession() async def get_access_token(self): """获取旧版 API 的 access_token""" logger.debug("Getting access token...") if self.access_token and time.time() < self.token_expires: logger.debug("Using cached access token") return self.access_token await self.ensure_session() app_key = os.environ.get("DINGTALK_APP_KEY") app_secret = os.environ.get("DINGTALK_APP_SECRET") if not all([app_key, app_secret]): logger.error("Missing DingTalk API credentials") raise ValueError("Missing DingTalk API credentials in environment variables") url = "https://oapi.dingtalk.com/gettoken" params = { "appkey": app_key, "appsecret": app_secret } logger.debug(f"Requesting access token from {url}") async with self.session.get(url, params=params) as response: data = await response.json() logger.debug(f"Access token response: {data}") if data.get("errcode") == 0: self.access_token = data["access_token"] self.token_expires = time.time() + data["expires_in"] - 200 return self.access_token else: logger.error(f"Failed to get access token: {data}") raise Exception(f"Failed to get access token: {data}") async def get_v2_access_token(self): """获取新版 API 的 access_token""" logger.debug("Getting v2 access token...") if self.v2_access_token and time.time() < self.v2_token_expires: logger.debug("Using cached v2 access token") return self.v2_access_token await self.ensure_session() app_key = os.environ.get("DINGTALK_APP_KEY") app_secret = os.environ.get("DINGTALK_APP_SECRET") if not all([app_key, app_secret]): logger.error("Missing DingTalk API credentials") raise ValueError("Missing DingTalk API credentials in environment variables") url = "https://api.dingtalk.com/v1.0/oauth2/accessToken" data = { "appKey": app_key, "appSecret": app_secret } logger.debug(f"Requesting v2 access token from {url}") async with self.session.post(url, json=data) as response: result = await response.json() logger.debug(f"V2 access token response: {result}") if "accessToken" in result: self.v2_access_token = result["accessToken"] self.v2_token_expires = time.time() + result.get("expireIn", 7200) - 200 return self.v2_access_token else: logger.error(f"Failed to get v2 access token: {result}") raise Exception(f"Failed to get v2 access token: {result}") async def get_department_list(self, access_token: str): """获取部门列表""" url = "https://oapi.dingtalk.com/department/list" params = {"access_token": access_token} async with self.session.get(url, params=params) as response: data = await response.json() if data.get("errcode") == 0: return data["department"] else: raise Exception(f"Failed to get department list: {data}") async def get_department_users(self, access_token: str, department_id: int): """获取部门用户基础信息""" url = "https://oapi.dingtalk.com/user/simplelist" params = { "access_token": access_token, "department_id": department_id } async with self.session.get(url, params=params) as response: data = await response.json() if data.get("errcode") == 0: return data["userlist"] else: raise Exception(f"Failed to get department users: {data}") async def get_user_detail(self, access_token: str, userid: str): """获取用户详细信息""" url = "https://oapi.dingtalk.com/user/get" params = { "access_token": access_token, "userid": userid } async with self.session.get(url, params=params) as response: data = await response.json() if data.get("errcode") == 0: return data else: raise Exception(f"Failed to get user detail: {data}") async def get_user_unionid(self, access_token: str, userid: str): """获取用户的 unionId""" url = "https://oapi.dingtalk.com/topapi/v2/user/get" params = { "access_token": access_token } data = { "userid": userid } headers = { "Content-Type": "application/json" } async with self.session.post(url, params=params, json=data, headers=headers) as response: result = await response.json() if result.get("errcode") == 0: return result["result"]["unionid"] else: raise Exception(f"Failed to get user unionid: {result}") async def get_calendar_list(self, access_token: str, userid: str, start_time: int = None, end_time: int = None, max_results: int = 50, next_token: str = None): """获取用户日程列表""" # 1. 先获取用户的 unionId unionid = await self.get_user_unionid(access_token, userid) # 2. 构建日历 API URL url = f"https://api.dingtalk.com/v1.0/calendar/users/{unionid}/calendars/primary/events" # 3. 如果没有指定时间范围,默认查询从现在开始7天内的日程 if not start_time: start_time = int(time.time() * 1000) if not end_time: end_time = int((time.time() + 7 * 24 * 3600) * 1000) # 默认7天 params = { "maxResults": max_results, "timeMin": datetime.fromtimestamp(start_time / 1000).strftime("%Y-%m-%dT%H:%M:%S+08:00"), "timeMax": datetime.fromtimestamp(end_time / 1000).strftime("%Y-%m-%dT%H:%M:%S+08:00") } # 4. 添加分页 token if next_token: params["nextToken"] = next_token headers = { "x-acs-dingtalk-access-token": access_token, "Content-Type": "application/json" } logger.debug(f"Requesting calendar list with params: {params}") async with self.session.get(url, params=params, headers=headers) as response: result = await response.json() logger.debug(f"Calendar list response: {result}") if "events" in result: return result else: raise Exception(f"Failed to get calendar list: {result}") def setup_tools(self): logger.info("Setting up tools...") @self.app.list_tools() async def list_tools() -> List[Tool]: logger.debug("Listing available tools") tools = [ Tool( name="get_access_token", description="获取钉钉 access_token。当你需要测试 API 连接状态,或者需要手动获取 access_token 时使用此工具。" "注意:通常不需要手动调用此工具,因为其他工具会自动获取和管理 token。" "每个 access_token 的有效期为 7200 秒,有效期内重复获取会返回相同结果,并自动续期。", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="find_user_by_name", description="根据用户姓名查询用户的详细信息。当你只知道用户的姓名,需要获取该用户的其他信息(如 userid、手机号、邮箱等)时使用此工具。" "此工具会执行以下步骤:1) 获取所有部门列表;2) 遍历每个部门查找指定姓名的用户;3) 返回所有匹配用户的详细信息。" "如果公司内有多个同名用户,会返回所有匹配用户的信息。" "适用场景:根据用户姓名查找其 userid、获取用户联系方式、验证用户身份等。", inputSchema={ "type": "object", "properties": { "name": { "type": "string", "description": "要查询的用户姓名,例如:张三" } }, "required": ["name"] } ), Tool( name="get_department_list", description="获取企业内所有部门的列表信息。当你需要了解公司的组织架构,或者需要获取某个部门的 ID 时使用此工具。" "返回的信息包括:部门ID、部门名称、父部门ID等。" "这通常是查询用户信息的第一步,因为很多其他 API 都需要部门 ID。" "适用场景:获取组织架构、查找部门ID、统计部门数量等。", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="get_department_users", description="获取指定部门下的所有成员列表。当你需要了解某个部门有哪些成员,或者需要批量获取部门成员信息时使用此工具。" "返回的是成员的基础信息,包括:userid、名字、职位等。" "如果需要某个用户的详细信息,需要再调用 get_user_detail 工具。" "适用场景:统计部门人数、获取部门成员列表、查找部门内特定职位的人员等。", inputSchema={ "type": "object", "properties": { "department_id": { "type": "integer", "description": "部门的ID,可以通过 get_department_list 工具获取" } }, "required": ["department_id"] } ), Tool( name="get_user_detail", description="获取指定用户的详细信息。当你已经知道用户的 userid,需要获取该用户的详细信息时使用此工具。" "返回的详细信息包括:姓名、手机号、邮箱、所在部门、职位、工号、是否管理员等。" "这是获取用户最完整信息的方法,通常在确认用户身份、获取用户联系方式时使用。" "适用场景:获取用户联系方式、验证用户身份、查询用户所在部门和职位等。", inputSchema={ "type": "object", "properties": { "userid": { "type": "string", "description": "用户的 userid,例如:202407101414" } }, "required": ["userid"] } ), Tool( name="get_calendar_list", description="查询指定用户的日程列表。当你需要查看用户的日程安排,或者需要了解会议室预订情况时使用此工具。" "支持的功能:1) 按时间范围筛选日程;2) 分页获取日程列表;3) 获取会议详情。" "返回的信息包括:日程标题、开始时间、结束时间、地点、组织者、参与者、会议链接等。" "如果不指定时间范围,默认查询从当前时间开始7天内的日程。" "适用场景:查询用户日程安排、获取会议室预订情况、统计会议时长、导出日程表等。", inputSchema={ "type": "object", "properties": { "userid": { "type": "string", "description": "要查询日程的用户ID,例如:202407101414" }, "start_time": { "type": "integer", "description": "开始时间的时间戳(毫秒),例如:1704067200000 表示 2024-01-01 00:00:00" }, "end_time": { "type": "integer", "description": "结束时间的时间戳(毫秒),例如:1704153600000 表示 2024-01-02 00:00:00" }, "max_results": { "type": "integer", "description": "单次返回的最大日程数量,默认为 50,建议不要设置太大以免影响性能", "default": 50 }, "next_token": { "type": "string", "description": "分页标记,从上一次查询结果中获取,用于获取下一页数据" } }, "required": ["userid"] } ) ] logger.debug(f"Available tools: {[tool.name for tool in tools]}") return tools @self.app.call_tool() async def call_tool(name: str, arguments: dict) -> List[TextContent]: logger.info(f"Calling tool: {name} with arguments: {arguments}") await self.ensure_session() try: if name == "get_access_token": access_token = await self.get_access_token() return [TextContent(type="text", text=f"Access Token: {access_token}")] elif name == "find_user_by_name": logger.debug(f"Finding user by name: {arguments['name']}") name = arguments["name"] access_token = await self.get_access_token() found_users = [] departments = await self.get_department_list(access_token) for dept in departments: users = await self.get_department_users(access_token, dept["id"]) for user in users: if user["name"] == name: user_detail = await self.get_user_detail(access_token, user["userid"]) found_users.append(user_detail) if found_users: response = { "total": len(found_users), "users": found_users } return [TextContent(type="text", text=f"Found users: {json.dumps(response, ensure_ascii=False, indent=2)}")] else: return [TextContent(type="text", text=f"User not found: {name}")] elif name == "get_department_list": logger.debug("Getting department list") access_token = await self.get_access_token() departments = await self.get_department_list(access_token) return [TextContent(type="text", text=f"Department list: {json.dumps(departments, ensure_ascii=False, indent=2)}")] elif name == "get_department_users": logger.debug(f"Getting users for department: {arguments['department_id']}") access_token = await self.get_access_token() department_id = arguments["department_id"] users = await self.get_department_users(access_token, department_id) return [TextContent(type="text", text=f"Department users: {json.dumps(users, ensure_ascii=False, indent=2)}")] elif name == "get_user_detail": logger.debug(f"Getting user detail for: {arguments['userid']}") access_token = await self.get_access_token() userid = arguments["userid"] user_detail = await self.get_user_detail(access_token, userid) return [TextContent(type="text", text=f"User detail: {json.dumps(user_detail, ensure_ascii=False, indent=2)}")] elif name == "get_calendar_list": logger.debug(f"Getting calendar list for user: {arguments['userid']}") access_token = await self.get_v2_access_token() userid = arguments["userid"] start_time = arguments.get("start_time") end_time = arguments.get("end_time") max_results = arguments.get("max_results", 50) next_token = arguments.get("next_token") calendar_list = await self.get_calendar_list( access_token, userid, start_time, end_time, max_results, next_token ) formatted_events = [] for event in calendar_list.get("events", []): formatted_event = { "summary": event.get("summary", "无标题"), "start_time": event.get("start", {}).get("dateTime"), "end_time": event.get("end", {}).get("dateTime"), "location": event.get("location", {}).get("meetingRooms", ["无地点"])[0] if event.get("location", {}).get("meetingRooms") else "无地点", "organizer": event.get("organizer", {}).get("displayName", "未知"), "description": event.get("description", "无描述"), "status": event.get("status", "未知"), "attendees": [ { "name": attendee.get("displayName", "未知"), "response": attendee.get("responseStatus", "未知") } for attendee in event.get("attendees", []) ], "online_meeting": event.get("onlineMeetingInfo", {}).get("extraInfo", {}).get("extraUrl", "无会议链接") } formatted_events.append(formatted_event) response = { "events": formatted_events, "next_token": calendar_list.get("nextToken"), "total": len(formatted_events) } return [TextContent(type="text", text=f"Calendar events: {json.dumps(response, ensure_ascii=False, indent=2)}")] else: logger.warning(f"Unknown tool: {name}") return [TextContent(type="text", text=f"Unknown tool: {name}")] except Exception as e: logger.error(f"Error calling tool {name}: {str(e)}", exc_info=True) return [TextContent(type="text", text=f"Error calling tool {name}: {str(e)}")] async def run(self): logger.info("Starting DingTalk MCP server...") async with stdio_server() as (read_stream, write_stream): try: logger.debug("Initializing MCP server") await self.app.run( read_stream, write_stream, self.app.create_initialization_options() ) except Exception as e: logger.error(f"Server error: {str(e)}", exc_info=True) raise finally: if self.session: logger.debug("Closing aiohttp session") await self.session.close() def main(): logger.info("Starting main function") server = DingdingMCPServer() asyncio.run(server.run()) if __name__ == "__main__": main()