Skip to main content
Glama

MCP Reminder Service

by xiaohui
main.py3.75 kB
#!/usr/bin/env python3 """ MCP 提醒服务入口点 基于 FastMCP 实现,支持多种传输方式: - stdio: 用于 Cursor MCP 集成 - http: HTTP API 服务器 - sse: Server-Sent Events 服务器 """ import os import sys from pathlib import Path # 添加项目根目录到 Python 路径 project_root = Path(__file__).parent sys.path.insert(0, str(project_root)) from fastmcp import FastMCP from mcp_reminder.config.settings import Settings from mcp_reminder.tools.message_tools import MessageTools from mcp_reminder.tools.scheduler_tools import SchedulerTools from mcp_reminder.utils.logger import setup_logging, get_logger def main(): """主函数""" # 加载配置 settings = Settings.from_env() settings.validate() # 设置日志 setup_logging(settings.logging) logger = get_logger(__name__) # 创建 FastMCP 应用 app = FastMCP("mcp-reminder") # 初始化工具 message_tools = MessageTools(settings) scheduler_tools = SchedulerTools(settings) # 注册工具 @app.tool() async def send_message(provider: str, message: str) -> str: """发送消息到指定平台""" result = await message_tools.send_message(provider, message) return str(result) @app.tool() async def send_to_all(message: str) -> str: """向所有配置的平台发送消息""" result = await message_tools.send_to_all(message) return str(result) @app.tool() async def test_connection() -> str: """测试所有提供商的连接""" result = await message_tools.test_connection() return str(result) @app.tool() async def create_schedule(name: str, cron: str, message: str, providers: list) -> str: """创建定时任务""" result = await scheduler_tools.create_schedule(name, cron, message, providers) return str(result) @app.tool() async def list_schedules() -> str: """列出所有定时任务""" result = await scheduler_tools.list_schedules() return str(result) @app.tool() async def delete_schedule(name: str) -> str: """删除定时任务""" result = await scheduler_tools.delete_schedule(name) return str(result) @app.tool() async def update_schedule(name: str, cron: str = None, message: str = None, providers: list = None) -> str: """更新定时任务""" result = await scheduler_tools.update_schedule(name, cron, message, providers) return str(result) # 注册资源 @app.resource("resource://status") def get_status(): """获取服务器状态""" return { "status": "running", "version": "3.0.0", "providers": settings.get_configured_providers(), } logger.info("=== MCP 提醒服务启动 ===") logger.info(f"服务器模式: {settings.server.mode}") logger.info(f"服务器地址: {settings.server.host}:{settings.server.port}") logger.info(f"已配置提供商: {', '.join([k for k, v in settings.get_configured_providers().items() if v])}") logger.info("=============================") # 根据模式启动服务器 if settings.server.mode == "http": app.run(transport="http", host=settings.server.host, port=settings.server.port) elif settings.server.mode == "sse": app.run(transport="sse", host=settings.server.host, port=settings.server.port) else: app.run() # 默认使用 stdio if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n收到中断信号,正在退出...") sys.exit(0) except Exception as e: print(f"服务启动失败: {e}") sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/xiaohui/mcp_reminder'

If you have feedback or need assistance with the MCP directory API, please join our Discord server