Skip to main content
Glama

DingTalk MCP Server V2

by wllcnm
import asyncio import logging import os import time import hmac import base64 import hashlib import json from typing import List from urllib.parse import quote_plus from datetime import datetime, timedelta import aiohttp from mcp.server import Server from mcp.types import Tool, TextContent from mcp.server.stdio import stdio_server # 日志配置 logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger("dingding_mcp_server") class DingdingMCPServer: def __init__(self): logger.info("Initializing DingTalk MCP server...") self.app = Server("dingding_mcp_server") self.setup_tools() self.access_token = None self.token_expires = 0 self.v2_access_token = None self.v2_token_expires = 0 self.session = None logger.info("Server initialized successfully") async def ensure_session(self): if self.session is None: logger.debug("Creating new aiohttp session") self.session = aiohttp.ClientSession() async def get_access_token(self): """获取旧版 API 的 access_token""" logger.debug("Getting access token...") if self.access_token and time.time() < self.token_expires: logger.debug("Using cached access token") return self.access_token await self.ensure_session() app_key = os.environ.get("DINGTALK_APP_KEY") app_secret = os.environ.get("DINGTALK_APP_SECRET") if not all([app_key, app_secret]): logger.error("Missing DingTalk API credentials") raise ValueError("Missing DingTalk API credentials in environment variables") url = "https://oapi.dingtalk.com/gettoken" params = { "appkey": app_key, "appsecret": app_secret } logger.debug(f"Requesting access token from {url}") async with self.session.get(url, params=params) as response: data = await response.json() logger.debug(f"Access token response: {data}") if data.get("errcode") == 0: self.access_token = data["access_token"] self.token_expires = time.time() + data["expires_in"] - 200 return self.access_token else: logger.error(f"Failed to get access token: {data}") raise Exception(f"Failed to get access token: {data}") async def get_v2_access_token(self): """获取新版 API 的 access_token""" logger.debug("Getting v2 access token...") if self.v2_access_token and time.time() < self.v2_token_expires: logger.debug("Using cached v2 access token") return self.v2_access_token await self.ensure_session() app_key = os.environ.get("DINGTALK_APP_KEY") app_secret = os.environ.get("DINGTALK_APP_SECRET") if not all([app_key, app_secret]): logger.error("Missing DingTalk API credentials") raise ValueError("Missing DingTalk API credentials in environment variables") url = "https://api.dingtalk.com/v1.0/oauth2/accessToken" data = { "appKey": app_key, "appSecret": app_secret } logger.debug(f"Requesting v2 access token from {url}") async with self.session.post(url, json=data) as response: result = await response.json() logger.debug(f"V2 access token response: {result}") if "accessToken" in result: self.v2_access_token = result["accessToken"] self.v2_token_expires = time.time() + result.get("expireIn", 7200) - 200 return self.v2_access_token else: logger.error(f"Failed to get v2 access token: {result}") raise Exception(f"Failed to get v2 access token: {result}") async def get_department_list(self, access_token: str): """获取部门列表""" url = "https://oapi.dingtalk.com/department/list" params = {"access_token": access_token} async with self.session.get(url, params=params) as response: data = await response.json() if data.get("errcode") == 0: return data["department"] else: raise Exception(f"Failed to get department list: {data}") async def get_department_users(self, access_token: str, department_id: int): """获取部门用户基础信息""" url = "https://oapi.dingtalk.com/user/simplelist" params = { "access_token": access_token, "department_id": department_id } async with self.session.get(url, params=params) as response: data = await response.json() if data.get("errcode") == 0: return data["userlist"] else: raise Exception(f"Failed to get department users: {data}") async def get_user_detail(self, access_token: str, userid: str): """获取用户详细信息""" url = "https://oapi.dingtalk.com/user/get" params = { "access_token": access_token, "userid": userid } async with self.session.get(url, params=params) as response: data = await response.json() if data.get("errcode") == 0: return data else: raise Exception(f"Failed to get user detail: {data}") async def get_user_unionid(self, access_token: str, userid: str): """获取用户的 unionId""" url = "https://oapi.dingtalk.com/topapi/v2/user/get" params = { "access_token": access_token } data = { "userid": userid } headers = { "Content-Type": "application/json" } async with self.session.post(url, params=params, json=data, headers=headers) as response: result = await response.json() if result.get("errcode") == 0: return result["result"]["unionid"] else: raise Exception(f"Failed to get user unionid: {result}") async def get_calendar_list(self, access_token: str, userid: str, start_time: int = None, end_time: int = None, max_results: int = 50, next_token: str = None): """获取用户日程列表""" # 1. 先获取用户的 unionId unionid = await self.get_user_unionid(access_token, userid) # 2. 构建日历 API URL url = f"https://api.dingtalk.com/v1.0/calendar/users/{unionid}/calendars/primary/events" # 3. 如果没有指定时间范围,默认查询从现在开始7天内的日程 if not start_time: start_time = int(time.time() * 1000) if not end_time: end_time = int((time.time() + 7 * 24 * 3600) * 1000) # 默认7天 params = { "maxResults": max_results, "timeMin": datetime.fromtimestamp(start_time / 1000).strftime("%Y-%m-%dT%H:%M:%S+08:00"), "timeMax": datetime.fromtimestamp(end_time / 1000).strftime("%Y-%m-%dT%H:%M:%S+08:00") } # 4. 添加分页 token if next_token: params["nextToken"] = next_token headers = { "x-acs-dingtalk-access-token": access_token, "Content-Type": "application/json" } logger.debug(f"Requesting calendar list with params: {params}") async with self.session.get(url, params=params, headers=headers) as response: result = await response.json() logger.debug(f"Calendar list response: {result}") if "events" in result: return result else: raise Exception(f"Failed to get calendar list: {result}") def setup_tools(self): logger.info("Setting up tools...") @self.app.list_tools() async def list_tools() -> List[Tool]: logger.debug("Listing available tools") tools = [ Tool( name="get_access_token", description="获取钉钉 access_token。当你需要测试 API 连接状态,或者需要手动获取 access_token 时使用此工具。" "注意:通常不需要手动调用此工具,因为其他工具会自动获取和管理 token。" "每个 access_token 的有效期为 7200 秒,有效期内重复获取会返回相同结果,并自动续期。", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="find_user_by_name", description="根据用户姓名查询用户的详细信息。当你只知道用户的姓名,需要获取该用户的其他信息(如 userid、手机号、邮箱等)时使用此工具。" "此工具会执行以下步骤:1) 获取所有部门列表;2) 遍历每个部门查找指定姓名的用户;3) 返回所有匹配用户的详细信息。" "如果公司内有多个同名用户,会返回所有匹配用户的信息。" "适用场景:根据用户姓名查找其 userid、获取用户联系方式、验证用户身份等。", inputSchema={ "type": "object", "properties": { "name": { "type": "string", "description": "要查询的用户姓名,例如:张三" } }, "required": ["name"] } ), Tool( name="get_department_list", description="获取企业内所有部门的列表信息。当你需要了解公司的组织架构,或者需要获取某个部门的 ID 时使用此工具。" "返回的信息包括:部门ID、部门名称、父部门ID等。" "这通常是查询用户信息的第一步,因为很多其他 API 都需要部门 ID。" "适用场景:获取组织架构、查找部门ID、统计部门数量等。", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="get_department_users", description="获取指定部门下的所有成员列表。当你需要了解某个部门有哪些成员,或者需要批量获取部门成员信息时使用此工具。" "返回的是成员的基础信息,包括:userid、名字、职位等。" "如果需要某个用户的详细信息,需要再调用 get_user_detail 工具。" "适用场景:统计部门人数、获取部门成员列表、查找部门内特定职位的人员等。", inputSchema={ "type": "object", "properties": { "department_id": { "type": "integer", "description": "部门的ID,可以通过 get_department_list 工具获取" } }, "required": ["department_id"] } ), Tool( name="get_user_detail", description="获取指定用户的详细信息。当你已经知道用户的 userid,需要获取该用户的详细信息时使用此工具。" "返回的详细信息包括:姓名、手机号、邮箱、所在部门、职位、工号、是否管理员等。" "这是获取用户最完整信息的方法,通常在确认用户身份、获取用户联系方式时使用。" "适用场景:获取用户联系方式、验证用户身份、查询用户所在部门和职位等。", inputSchema={ "type": "object", "properties": { "userid": { "type": "string", "description": "用户的 userid,例如:202407101414" } }, "required": ["userid"] } ), Tool( name="get_calendar_list", description="查询指定用户的日程列表。当你需要查看用户的日程安排,或者需要了解会议室预订情况时使用此工具。" "支持的功能:1) 按时间范围筛选日程;2) 分页获取日程列表;3) 获取会议详情。" "返回的信息包括:日程标题、开始时间、结束时间、地点、组织者、参与者、会议链接等。" "如果不指定时间范围,默认查询从当前时间开始7天内的日程。" "适用场景:查询用户日程安排、获取会议室预订情况、统计会议时长、导出日程表等。", inputSchema={ "type": "object", "properties": { "userid": { "type": "string", "description": "要查询日程的用户ID,例如:202407101414" }, "start_time": { "type": "integer", "description": "开始时间的时间戳(毫秒),例如:1704067200000 表示 2024-01-01 00:00:00" }, "end_time": { "type": "integer", "description": "结束时间的时间戳(毫秒),例如:1704153600000 表示 2024-01-02 00:00:00" }, "max_results": { "type": "integer", "description": "单次返回的最大日程数量,默认为 50,建议不要设置太大以免影响性能", "default": 50 }, "next_token": { "type": "string", "description": "分页标记,从上一次查询结果中获取,用于获取下一页数据" } }, "required": ["userid"] } ) ] logger.debug(f"Available tools: {[tool.name for tool in tools]}") return tools @self.app.call_tool() async def call_tool(name: str, arguments: dict) -> List[TextContent]: logger.info(f"Calling tool: {name} with arguments: {arguments}") await self.ensure_session() try: if name == "get_access_token": access_token = await self.get_access_token() return [TextContent(type="text", text=f"Access Token: {access_token}")] elif name == "find_user_by_name": logger.debug(f"Finding user by name: {arguments['name']}") name = arguments["name"] access_token = await self.get_access_token() found_users = [] departments = await self.get_department_list(access_token) for dept in departments: users = await self.get_department_users(access_token, dept["id"]) for user in users: if user["name"] == name: user_detail = await self.get_user_detail(access_token, user["userid"]) found_users.append(user_detail) if found_users: response = { "total": len(found_users), "users": found_users } return [TextContent(type="text", text=f"Found users: {json.dumps(response, ensure_ascii=False, indent=2)}")] else: return [TextContent(type="text", text=f"User not found: {name}")] elif name == "get_department_list": logger.debug("Getting department list") access_token = await self.get_access_token() departments = await self.get_department_list(access_token) return [TextContent(type="text", text=f"Department list: {json.dumps(departments, ensure_ascii=False, indent=2)}")] elif name == "get_department_users": logger.debug(f"Getting users for department: {arguments['department_id']}") access_token = await self.get_access_token() department_id = arguments["department_id"] users = await self.get_department_users(access_token, department_id) return [TextContent(type="text", text=f"Department users: {json.dumps(users, ensure_ascii=False, indent=2)}")] elif name == "get_user_detail": logger.debug(f"Getting user detail for: {arguments['userid']}") access_token = await self.get_access_token() userid = arguments["userid"] user_detail = await self.get_user_detail(access_token, userid) return [TextContent(type="text", text=f"User detail: {json.dumps(user_detail, ensure_ascii=False, indent=2)}")] elif name == "get_calendar_list": logger.debug(f"Getting calendar list for user: {arguments['userid']}") access_token = await self.get_v2_access_token() userid = arguments["userid"] start_time = arguments.get("start_time") end_time = arguments.get("end_time") max_results = arguments.get("max_results", 50) next_token = arguments.get("next_token") calendar_list = await self.get_calendar_list( access_token, userid, start_time, end_time, max_results, next_token ) formatted_events = [] for event in calendar_list.get("events", []): formatted_event = { "summary": event.get("summary", "无标题"), "start_time": event.get("start", {}).get("dateTime"), "end_time": event.get("end", {}).get("dateTime"), "location": event.get("location", {}).get("meetingRooms", ["无地点"])[0] if event.get("location", {}).get("meetingRooms") else "无地点", "organizer": event.get("organizer", {}).get("displayName", "未知"), "description": event.get("description", "无描述"), "status": event.get("status", "未知"), "attendees": [ { "name": attendee.get("displayName", "未知"), "response": attendee.get("responseStatus", "未知") } for attendee in event.get("attendees", []) ], "online_meeting": event.get("onlineMeetingInfo", {}).get("extraInfo", {}).get("extraUrl", "无会议链接") } formatted_events.append(formatted_event) response = { "events": formatted_events, "next_token": calendar_list.get("nextToken"), "total": len(formatted_events) } return [TextContent(type="text", text=f"Calendar events: {json.dumps(response, ensure_ascii=False, indent=2)}")] else: logger.warning(f"Unknown tool: {name}") return [TextContent(type="text", text=f"Unknown tool: {name}")] except Exception as e: logger.error(f"Error calling tool {name}: {str(e)}", exc_info=True) return [TextContent(type="text", text=f"Error calling tool {name}: {str(e)}")] async def run(self): logger.info("Starting DingTalk MCP server...") async with stdio_server() as (read_stream, write_stream): try: logger.debug("Initializing MCP server") await self.app.run( read_stream, write_stream, self.app.create_initialization_options() ) except Exception as e: logger.error(f"Server error: {str(e)}", exc_info=True) raise finally: if self.session: logger.debug("Closing aiohttp session") await self.session.close() def main(): logger.info("Starting main function") server = DingdingMCPServer() asyncio.run(server.run()) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wllcnm/dingding_mcp_v2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server