Skip to main content
Glama

SP-MCP

by organicmoron
MIT License
10
  • Linux
  • Apple
mcp_server.py19.9 kB
#!/usr/bin/env python3 # MCP Server for Super Productivity Integration import asyncio import json import logging import os import re import sys from pathlib import Path from typing import Any, Dict, List, Optional, Sequence import mcp.server.stdio import mcp.types as types from mcp.server import NotificationOptions, Server from mcp.server.models import InitializationOptions class SuperProductivityMCPServer: def __init__(self): self.server = Server("super-productivity") self.setup_directories() self.setup_logging() self.setup_tools() def setup_directories(self): if os.name == 'nt': # Windows data_dir = os.environ.get('APPDATA', os.path.expanduser('~/AppData/Roaming')) else: # Linux/Mac data_dir = os.environ.get('XDG_DATA_HOME', os.path.expanduser('~/.local/share')) self.base_dir = Path(data_dir) / 'super-productivity-mcp' self.command_dir = self.base_dir / 'plugin_commands' self.response_dir = self.base_dir / 'plugin_responses' # Create directories self.base_dir.mkdir(parents=True, exist_ok=True) self.command_dir.mkdir(parents=True, exist_ok=True) self.response_dir.mkdir(parents=True, exist_ok=True) logging.info(f"MCP Server using directory: {self.base_dir}") logging.info(f"Command directory: {self.command_dir}") logging.info(f"Response directory: {self.response_dir}") def setup_logging(self): log_file = self.base_dir / 'mcp_server.log' logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler(sys.stderr) ] ) def setup_tools(self): """Set up MCP tools""" @self.server.list_tools() async def handle_list_tools() -> List[types.Tool]: """List available tools""" return [ types.Tool( name="create_task", description="Create a new task in Super Productivity. When users provide natural language with time/date references, convert them to Super Productivity syntax in the title field.", inputSchema={ "type": "object", "properties": { "title": { "type": "string", "description": "Task title with Super Productivity syntax. Convert natural language time/date references to @syntax using days/weeks/months from TODAY (e.g., 'tomorrow' -> '@1days', 'Friday at 3pm' -> '@fri 3pm', 'next week' -> '@7days', 'push back a week' -> '@14days' if task was already a week out). Use @Xdays, @Yweeks, or @Zmonths where X/Y/Z is the number from today. Add #tags for urgency/priority and +projects as needed." }, "notes": { "type": "string", "description": "Task notes/description" }, "project_id": { "type": "string", "description": "Project ID to assign task to" }, "parent_id": { "type": "string", "description": "Parent task ID for subtasks" } }, "required": ["title"] } ), types.Tool( name="get_tasks", description="Get all tasks from Super Productivity", inputSchema={ "type": "object", "properties": { "include_done": { "type": "boolean", "description": "Include completed tasks", "default": True } } } ), types.Tool( name="update_task", description="Update an existing task. When users provide natural language with time/date references, convert them to Super Productivity syntax in the title field.", inputSchema={ "type": "object", "properties": { "task_id": { "type": "string", "description": "Task ID to update" }, "title": { "type": "string", "description": "New task title with Super Productivity syntax. Convert natural language time/date references to @syntax using days/weeks/months from TODAY (e.g., 'push back a week' -> '@14days' if task was already a week out, 'move to next Friday' -> '@5days' if next Friday is 5 days from today, 'reschedule for tomorrow' -> '@1days'). Use @Xdays, @Yweeks, or @Zmonths where X/Y/Z is the number from today. Add #tags for urgency/priority and +projects as needed." }, "notes": { "type": "string", "description": "New task notes" }, "is_done": { "type": "boolean", "description": "Mark task as done/undone" }, "time_estimate": { "type": "integer", "description": "Time estimate in milliseconds" }, "time_spent": { "type": "integer", "description": "Time spent in milliseconds" } }, "required": ["task_id"] } ), types.Tool( name="complete_and_archive_task", description="Complete a task (mark as done) in Super Productivity - NOTE: True deletion is not supported", inputSchema={ "type": "object", "properties": { "task_id": { "type": "string", "description": "Task ID to complete" } }, "required": ["task_id"] } ), types.Tool( name="get_projects", description="Get all projects from Super Productivity", inputSchema={ "type": "object", "properties": {} } ), types.Tool( name="create_project", description="Create a new project", inputSchema={ "type": "object", "properties": { "title": { "type": "string", "description": "Project title" }, "description": { "type": "string", "description": "Project description" }, "color": { "type": "string", "description": "Project color (hex code)" } }, "required": ["title"] } ), types.Tool( name="get_tags", description="Get all tags from Super Productivity", inputSchema={ "type": "object", "properties": {} } ), types.Tool( name="create_tag", description="Create a new tag", inputSchema={ "type": "object", "properties": { "title": { "type": "string", "description": "Tag title" }, "color": { "type": "string", "description": "Tag color (hex code)" } }, "required": ["title"] } ), types.Tool( name="show_notification", description="Show a notification in Super Productivity", inputSchema={ "type": "object", "properties": { "message": { "type": "string", "description": "Notification message" }, "type": { "type": "string", "enum": ["success", "info", "warning", "error"], "description": "Notification type", "default": "info" } }, "required": ["message"] } ), types.Tool( name="debug_directories", description="Debug the communication directories and show their status", inputSchema={ "type": "object", "properties": {} } ) ] @self.server.call_tool() async def handle_call_tool( name: str, arguments: Dict[str, Any] ) -> List[types.TextContent]: """Handle tool calls""" try: if name == "create_task": result = await self.create_task(arguments) elif name == "get_tasks": result = await self.get_tasks(arguments) elif name == "update_task": result = await self.update_task(arguments) elif name == "complete_and_archive_task": result = await self.complete_and_archive_task(arguments) elif name == "get_projects": result = await self.get_projects(arguments) elif name == "create_project": result = await self.create_project(arguments) elif name == "get_tags": result = await self.get_tags(arguments) elif name == "create_tag": result = await self.create_tag(arguments) elif name == "show_notification": result = await self.show_notification(arguments) elif name == "debug_directories": result = await self.debug_directories(arguments) else: raise ValueError(f"Unknown tool: {name}") return [types.TextContent(type="text", text=str(result))] except Exception as e: logging.error(f"Error in tool {name}: {str(e)}") return [types.TextContent(type="text", text=f"Error: {str(e)}")] async def send_command(self, action: str, **kwargs) -> Dict[str, Any]: """Send a command to Super Productivity plugin""" command = { "action": action, "id": f"{action}_{asyncio.get_event_loop().time()}", "timestamp": asyncio.get_event_loop().time(), **kwargs } # Write command file command_file = self.command_dir / f"{command['id']}.json" with open(command_file, 'w') as f: json.dump(command, f, indent=2) logging.info(f"Sent command: {action} -> {command_file}") # Wait for response (with timeout) response_file = self.response_dir / f"{command['id']}_response.json" for _ in range(30): # Wait up to 30 seconds if response_file.exists(): try: with open(response_file, 'r') as f: response = json.load(f) # Clean up response file response_file.unlink() logging.info(f"Received response for {action}: {response.get('success', 'unknown')}") return response except Exception as e: logging.error(f"Error reading response file: {e}") break await asyncio.sleep(1) # Timeout logging.warning(f"Timeout waiting for response to {action}") return {"success": False, "error": "Timeout waiting for response"} def parse_task_syntax(self, title: str) -> tuple: """Parse Super Productivity task syntax from title""" title_clean = title # Extract tags from title (format: #tagname) tag_matches = re.findall(r'#(\w+)', title_clean) title_clean = re.sub(r'\s*#\w+', '', title_clean).strip() # Extract projects from title (format: +projectname) project_matches = re.findall(r'\+(\w+)', title_clean) title_clean = re.sub(r'\s*\+\w+', '', title_clean).strip() # Extract scheduling syntax (format: @fri 4pm, @tomorrow, @2024-01-15, etc.) schedule_matches = re.findall(r'@(\w+(?:\s+\d+[ap]m)?)', title_clean, re.IGNORECASE) title_clean = re.sub(r'\s*@\w+(?:\s+\d+[ap]m)?', '', title_clean, flags=re.IGNORECASE).strip() # Extract time estimate/spent syntax (format: 10m/3h, 2h, 30m, etc.) time_matches = re.findall(r'(\d+[mh](?:/\d+[mh])?)', title_clean) title_clean = re.sub(r'\s*\d+[mh](?:/\d+[mh])?', '', title_clean).strip() return title_clean, tag_matches, project_matches, schedule_matches, time_matches async def create_task(self, args: Dict[str, Any]) -> Dict[str, Any]: """Create a new task""" title = args.get("title", "") # Create the task data - Claude should have already converted natural language to SP syntax task_data = { "title": title, # Use title as provided by Claude (should already have @syntax) "notes": args.get("notes", ""), "timeEstimate": args.get("time_estimate", 0), "projectId": args.get("project_id"), "parentId": args.get("parent_id"), "tagIds": [] } return await self.send_command("addTask", data=task_data) async def get_tasks(self, args: Dict[str, Any]) -> Dict[str, Any]: """Get all tasks""" return await self.send_command("getTasks") async def update_task(self, args: Dict[str, Any]) -> Dict[str, Any]: """Update a task""" task_id = args.get("task_id") if not task_id: return {"success": False, "error": "task_id is required"} updates = {} # Handle title - Claude should have already converted natural language to SP syntax if "title" in args: updates["title"] = args["title"] if "notes" in args: updates["notes"] = args["notes"] if "is_done" in args: updates["isDone"] = args["is_done"] if args["is_done"]: updates["doneOn"] = asyncio.get_event_loop().time() * 1000 else: updates["doneOn"] = None if "time_estimate" in args: updates["timeEstimate"] = args["time_estimate"] if "time_spent" in args: updates["timeSpent"] = args["time_spent"] return await self.send_command("updateTask", taskId=task_id, data=updates) async def complete_and_archive_task(self, args: Dict[str, Any]) -> Dict[str, Any]: """Complete a task (mark as done) - true deletion is not supported""" task_id = args.get("task_id") if not task_id: return {"success": False, "error": "task_id is required"} # Mark task as done instead of deleting return await self.send_command("setTaskDone", taskId=task_id) async def get_projects(self, args: Dict[str, Any]) -> Dict[str, Any]: """Get all projects""" return await self.send_command("getAllProjects") async def create_project(self, args: Dict[str, Any]) -> Dict[str, Any]: """Create a new project""" project_data = { "title": args.get("title", ""), "description": args.get("description", ""), "color": args.get("color", "#2196F3") } return await self.send_command("addProject", data=project_data) async def get_tags(self, args: Dict[str, Any]) -> Dict[str, Any]: """Get all tags""" return await self.send_command("getAllTags") async def create_tag(self, args: Dict[str, Any]) -> Dict[str, Any]: """Create a new tag""" tag_data = { "title": args.get("title", ""), "color": args.get("color", "#FF9800") } return await self.send_command("addTag", data=tag_data) async def show_notification(self, args: Dict[str, Any]) -> Dict[str, Any]: """Show a notification""" return await self.send_command("showSnack", message=args.get("message", "")) async def debug_directories(self, args: Dict[str, Any]) -> Dict[str, Any]: return { "success": True, "base_directory": str(self.base_dir), "command_directory": str(self.command_dir), "response_directory": str(self.response_dir), "directories_exist": { "base": self.base_dir.exists(), "commands": self.command_dir.exists(), "responses": self.response_dir.exists() } } async def run(self): """Run the MCP server""" logging.info("Starting Super Productivity MCP Server...") # Initialize server async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await self.server.run( read_stream, write_stream, InitializationOptions( server_name="super-productivity", server_version="1.0.0", capabilities=self.server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) async def main(): """Main entry point""" server = SuperProductivityMCPServer() await server.run() if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/organicmoron/SP-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server