Skip to main content
Glama
by mikemc
tasks.py23.9 kB
#!/usr/bin/env python3 import logging import json from typing import Optional, Dict, Any from mcp.server.fastmcp import Context logger = logging.getLogger("todoist-mcp-server") def todoist_add_task( ctx: Context, content: str, description: Optional[str] = None, project_id: Optional[str] = None, section_id: Optional[str] = None, parent_id: Optional[str] = None, order: Optional[int] = None, labels: Optional[list[str]] = None, priority: Optional[int] = None, due_string: Optional[str] = None, due_date: Optional[str] = None, due_datetime: Optional[str] = None, due_lang: Optional[str] = None, assignee_id: Optional[str] = None, duration: Optional[int] = None, duration_unit: Optional[str] = None, deadline_date: Optional[str] = None, deadline_lang: Optional[str] = None ) -> str: """Create a new task in Todoist with optional description, due date, and priority Args: content: The content/title of the task description: Detailed description of the task (optional) project_id: Task project ID. If not set, task is put to user's Inbox (optional) section_id: ID of section to put task into (optional) parent_id: Parent task ID (optional) order: Non-zero integer value used to sort tasks under the same parent (optional) labels: The task's labels (a list of names that may represent either personal or shared labels) (optional) priority: Task priority from 1 (normal) to 4 (urgent) (optional) due_string: Natural language due date like 'tomorrow', 'next Monday', 'Jan 23' (optional) due_date: Specific date in YYYY-MM-DD format relative to user's timezone (optional) due_datetime: Specific date and time in RFC3339 format in UTC (optional) due_lang: 2-letter code specifying language in case due_string is not written in English (optional) assignee_id: The responsible user ID (only applies to shared tasks) (optional) duration: A positive integer for the amount of duration_unit the task will take (optional) duration_unit: The unit of time that the duration field represents (minute or day) (optional) deadline_date: Specific date in YYYY-MM-DD format relative to user's timezone (optional) deadline_lang: 2-letter code specifying language of deadline (optional) """ todoist_client = ctx.request_context.lifespan_context.todoist_client try: logger.info(f"Creating task: {content}") task_params = {"content": content} # Efficiently filter out None values to avoid sending unnecessary API parameters optional_params = { "description": description, "project_id": project_id, "section_id": section_id, "parent_id": parent_id, "order": order, "labels": labels, "assignee_id": assignee_id, "due_string": due_string, "due_lang": due_lang, "deadline_lang": deadline_lang, } for key, value in optional_params.items(): if value is not None: task_params[key] = value # Transform string dates to objects since v3 API expects proper date/datetime types if due_date is not None: from datetime import date if isinstance(due_date, str): task_params["due_date"] = date.fromisoformat(due_date) else: task_params["due_date"] = due_date if due_datetime is not None: from datetime import datetime if isinstance(due_datetime, str): # Normalize RFC3339 format to Python's expected format if due_datetime.endswith('Z'): due_datetime = due_datetime[:-1] + '+00:00' task_params["due_datetime"] = datetime.fromisoformat(due_datetime) else: task_params["due_datetime"] = due_datetime if deadline_date is not None: from datetime import date if isinstance(deadline_date, str): task_params["deadline_date"] = date.fromisoformat(deadline_date) else: task_params["deadline_date"] = deadline_date # Validate priority bounds to prevent API errors if priority is not None and 1 <= priority <= 4: task_params["priority"] = priority # Duration requires both values to be meaningful - enforce this constraint if duration is not None and duration_unit is not None: if duration > 0 and duration_unit in ["minute", "day"]: task_params["duration"] = duration task_params["duration_unit"] = duration_unit else: logger.warning("Invalid duration parameters: duration must be > 0 and unit must be 'minute' or 'day'") task = todoist_client.add_task(**task_params) logger.info(f"Task created successfully: {task.id}") return json.dumps(task.to_dict(), indent=2, default=str) except Exception as error: logger.error(f"Error creating task: {error}") return f"Error creating task: {str(error)}" def todoist_get_tasks( ctx: Context, project_id: Optional[str] = None, section_id: Optional[str] = None, parent_id: Optional[str] = None, label: Optional[str] = None, ids: Optional[list[str]] = None, nmax: Optional[int] = 100, limit: int = 200 ) -> str: """Get a list of tasks from Todoist with basic filters This is a wrapper around the Todoist API's get_tasks method that handles pagination automatically. By default, it will fetch up to 100 matching tasks. Set nmax=None to fetch ALL matching tasks across multiple API calls. For natural language filtering (like 'today', 'overdue'), use todoist_filter_tasks instead. Examples: # Get up to 100 tasks (default) todoist_get_tasks(ctx) # Get all tasks in a project (up to 100 by default) todoist_get_tasks(ctx, project_id="12345") # Get first 500 tasks total todoist_get_tasks(ctx, nmax=500) # Get ALL tasks (unlimited) todoist_get_tasks(ctx, nmax=None) # Get specific tasks by ID todoist_get_tasks(ctx, ids=["task1", "task2", "task3"]) Args: project_id: Filter tasks by project ID (optional) section_id: Filter tasks by section ID (optional) parent_id: Filter tasks by parent task ID (optional) label: Filter tasks by label name (optional) ids: A list of the IDs of the tasks to retrieve (optional) nmax: Maximum total number of tasks to return. Set to None for ALL matching tasks (default: 100) limit: Number of tasks to fetch per API request (default: 200, max: 200) """ todoist_client = ctx.request_context.lifespan_context.todoist_client try: logger.info(f"Getting tasks with project_id: {project_id}, section_id: {section_id}, parent_id: {parent_id}, label: {label}, nmax: {nmax}, limit: {limit}") # Early exit for zero requests to avoid unnecessary API calls if nmax is not None: if nmax == 0: logger.info("nmax=0 specified, returning empty result") return [] elif nmax < 0: logger.warning(f"Invalid nmax {nmax}, using default of 100") nmax = 100 if limit > 200: logger.warning(f"Limit {limit} exceeds API maximum of 200, using 200 instead") limit = 200 elif limit <= 0: logger.warning(f"Invalid limit {limit}, using default of 200") limit = 200 # Key optimization: match page size to actual need to reduce API payload effective_limit = limit if nmax is not None and nmax < limit: effective_limit = nmax logger.info(f"Optimized limit from {limit} to {effective_limit} to match nmax") params = {} if project_id: params["project_id"] = project_id if section_id: params["section_id"] = section_id if parent_id: params["parent_id"] = parent_id if label: params["label"] = label if ids: params["ids"] = ids params["limit"] = effective_limit tasks_iterator = todoist_client.get_tasks(**params) all_tasks = [] pages_fetched = 0 for task_batch in tasks_iterator: pages_fetched += 1 all_tasks.extend(task_batch) logger.info(f"Fetched page {pages_fetched} with {len(task_batch)} tasks (total: {len(all_tasks)})") if nmax is not None and len(all_tasks) >= nmax: # Trim excess - rare due to effective_limit optimization, but handles edge cases all_tasks = all_tasks[:nmax] logger.info(f"Reached nmax of {nmax} tasks, stopping pagination") break # Todoist API signals end of results by returning fewer items than requested if len(task_batch) < effective_limit: logger.info(f"Received {len(task_batch)} tasks (less than limit {effective_limit}), reached end of results") break if not all_tasks: logger.info("No tasks found matching the criteria") return "No tasks found matching the criteria" logger.info(f"Retrieved {len(all_tasks)} tasks total across {pages_fetched} pages") if nmax is None: logger.info("Fetched ALL matching tasks (nmax=None specified)") elif len(all_tasks) == nmax: logger.info(f"Retrieved exactly the requested {nmax} tasks") return json.dumps([task.to_dict() for task in all_tasks], indent=2, default=str) except Exception as error: logger.error(f"Error getting tasks: {error}") return f"Error getting tasks: {str(error)}" def todoist_filter_tasks( ctx: Context, filter: str, lang: Optional[str] = None, nmax: Optional[int] = 100, limit: int = 200 ) -> str: """Get tasks using Todoist's natural language filter This uses the new filter_tasks method for queries like 'today', 'overdue', 'priority 1', etc. Args: filter: Natural language filter like 'today', 'tomorrow', 'next week', 'priority 1', 'overdue' lang: Language for task content (e.g., 'en') (optional) nmax: Maximum total number of tasks to return. Set to None for ALL matching tasks (default: 100) limit: Number of tasks to fetch per API request (default: 200, max: 200) """ todoist_client = ctx.request_context.lifespan_context.todoist_client try: logger.info(f"Filtering tasks with filter: '{filter}', lang: {lang}, nmax: {nmax}, limit: {limit}") # Early exit for zero requests to avoid unnecessary API calls if nmax is not None: if nmax == 0: logger.info("nmax=0 specified, returning empty result") return [] elif nmax < 0: logger.warning(f"Invalid nmax {nmax}, using default of 100") nmax = 100 if limit > 200: logger.warning(f"Limit {limit} exceeds API maximum of 200, using 200 instead") limit = 200 elif limit <= 0: logger.warning(f"Invalid limit {limit}, using default of 200") limit = 200 # Key optimization: match page size to actual need to reduce API payload effective_limit = limit if nmax is not None and nmax < limit: effective_limit = nmax logger.info(f"Optimized limit from {limit} to {effective_limit} to match nmax") params = {"query": filter} if lang: params["lang"] = lang params["limit"] = effective_limit tasks_iterator = todoist_client.filter_tasks(**params) all_tasks = [] pages_fetched = 0 for task_batch in tasks_iterator: pages_fetched += 1 all_tasks.extend(task_batch) logger.info(f"Fetched page {pages_fetched} with {len(task_batch)} tasks (total: {len(all_tasks)})") if nmax is not None and len(all_tasks) >= nmax: # Trim excess - rare due to effective_limit optimization, but handles edge cases all_tasks = all_tasks[:nmax] logger.info(f"Reached nmax of {nmax} tasks, stopping pagination") break # Todoist API signals end of results by returning fewer items than requested if len(task_batch) < effective_limit: logger.info(f"Received {len(task_batch)} tasks (less than limit {effective_limit}), reached end of results") break if not all_tasks: logger.info("No tasks found matching the filter") return "No tasks found matching the filter" logger.info(f"Retrieved {len(all_tasks)} tasks total across {pages_fetched} pages") if nmax is None: logger.info("Fetched ALL matching tasks (nmax=None specified)") elif len(all_tasks) == nmax: logger.info(f"Retrieved exactly the requested {nmax} tasks") return json.dumps([task.to_dict() for task in all_tasks], indent=2, default=str) except Exception as error: logger.error(f"Error filtering tasks: {error}") return f"Error filtering tasks: {str(error)}" def todoist_get_task(ctx: Context, task_id: str) -> str: """Get an active task from Todoist Args: task_id: ID of the task to retrieve """ todoist_client = ctx.request_context.lifespan_context.todoist_client try: logger.info(f"Getting task with ID: {task_id}") task = todoist_client.get_task(task_id=task_id) if not task: logger.info(f"No task found with ID: {task_id}") return f"No task found with ID: {task_id}" logger.info(f"Retrieved task: {task.id}") return json.dumps(task.to_dict(), indent=2, default=str) except Exception as error: logger.error(f"Error getting task: {error}") return f"Error getting task: {str(error)}" def todoist_update_task( ctx: Context, task_id: str, content: Optional[str] = None, description: Optional[str] = None, labels: Optional[list[str]] = None, priority: Optional[int] = None, due_string: Optional[str] = None, due_date: Optional[str] = None, due_datetime: Optional[str] = None, due_lang: Optional[str] = None, assignee_id: Optional[str] = None, duration: Optional[int] = None, duration_unit: Optional[str] = None, deadline_date: Optional[str] = None, deadline_lang: Optional[str] = None ) -> str: """Update an existing task in Todoist Args: task_id: ID of the task to update content: New content/title for the task (optional) description: New description for the task (optional) labels: New labels for the task (optional) priority: New priority level from 1 (normal) to 4 (urgent) (optional) due_string: New due date in natural language like 'tomorrow', 'next Monday' (optional) due_date: New specific date in YYYY-MM-DD format (optional) due_datetime: New specific date and time in RFC3339 format in UTC (optional) due_lang: 2-letter code specifying language in case due_string is not written in English (optional) assignee_id: The responsible user ID or null to unset (for shared tasks) (optional) duration: A positive integer for the amount of duration_unit the task will take (optional) duration_unit: The unit of time that the duration field represents (minute or day) (optional) deadline_date: Specific date in YYYY-MM-DD format relative to user's timezone (optional) deadline_lang: 2-letter code specifying language of deadline (optional) """ todoist_client = ctx.request_context.lifespan_context.todoist_client try: logger.info(f"Updating task with ID: {task_id}") # Verify task exists before attempting update to provide better error messages try: task = todoist_client.get_task(task_id=task_id) original_content = task.content except Exception as error: logger.warning(f"Error getting task with ID: {task_id}: {error}") return f"Could not verify task with ID: {task_id}. Update aborted." update_data = {} # Apply same parameter filtering strategy as create optional_params = { "content": content, "description": description, "labels": labels, "due_string": due_string, "due_lang": due_lang, "assignee_id": assignee_id, "deadline_lang": deadline_lang, } for key, value in optional_params.items(): if value is not None: update_data[key] = value # Apply same date transformation logic as create for consistency if due_date is not None: from datetime import date if isinstance(due_date, str): update_data["due_date"] = date.fromisoformat(due_date) else: update_data["due_date"] = due_date if due_datetime is not None: from datetime import datetime if isinstance(due_datetime, str): if due_datetime.endswith('Z'): due_datetime = due_datetime[:-1] + '+00:00' update_data["due_datetime"] = datetime.fromisoformat(due_datetime) else: update_data["due_datetime"] = due_datetime if deadline_date is not None: from datetime import date if isinstance(deadline_date, str): update_data["deadline_date"] = date.fromisoformat(deadline_date) else: update_data["deadline_date"] = deadline_date if priority is not None and 1 <= priority <= 4: update_data["priority"] = priority if duration is not None and duration_unit is not None: if duration > 0 and duration_unit in ["minute", "day"]: update_data["duration"] = duration update_data["duration_unit"] = duration_unit else: logger.warning("Invalid duration parameters: duration must be > 0 and unit must be 'minute' or 'day'") if len(update_data) == 0: return f"No update parameters provided for task: {original_content} (ID: {task_id})" updated_task = todoist_client.update_task(task_id, **update_data) logger.info(f"Task updated successfully: {task_id}") return json.dumps(updated_task.to_dict(), indent=2, default=str) except Exception as error: logger.error(f"Error updating task: {error}") return f"Error updating task: {str(error)}" def todoist_complete_task(ctx: Context, task_id: str) -> str: """Close a task in Todoist (i.e., mark the task as complete) Args: task_id: ID of the task to close """ todoist_client = ctx.request_context.lifespan_context.todoist_client try: logger.info(f"Closing task with ID: {task_id}") # Pre-fetch task content for meaningful success messages try: task = todoist_client.get_task(task_id=task_id) task_content = task.content except Exception as error: logger.warning(f"Error getting task with ID: {task_id}: {error}") return f"Could not verify task with ID: {task_id}. Task closing aborted." is_success = todoist_client.complete_task(task_id=task_id) logger.info(f"Task closed successfully: {task_id}") return f"Successfully closed task: {task_content} (ID: {task_id})" except Exception as error: logger.error(f"Error closing task: {error}") return f"Error closing task: {str(error)}" def todoist_uncomplete_task(ctx: Context, task_id: str) -> str: """Reopen a task in Todoist (i.e., mark the task as incomplete) Args: task_id: ID of the task to reopen """ todoist_client = ctx.request_context.lifespan_context.todoist_client try: logger.info(f"Reopening task with ID: {task_id}") try: task = todoist_client.get_task(task_id=task_id) task_content = task.content except Exception as error: logger.warning(f"Error getting task with ID: {task_id}: {error}") return f"Could not verify task with ID: {task_id}. Task reopening aborted." is_success = todoist_client.uncomplete_task(task_id=task_id) logger.info(f"Task reopened successfully: {task_id}") return f"Successfully reopened task: {task_content} (ID: {task_id})" except Exception as error: logger.error(f"Error reopening task: {error}") return f"Error reopening task: {str(error)}" def todoist_move_task( ctx: Context, task_id: str, parent_id: Optional[str] = None, section_id: Optional[str] = None, project_id: Optional[str] = None ) -> str: """Move a task to a different location Args: task_id: ID of the task to move parent_id: ID of the destination parent task (optional) section_id: ID of the destination section (optional) project_id: ID of the destination project (optional) Note: Only one of parent_id, section_id or project_id must be set. """ todoist_client = ctx.request_context.lifespan_context.todoist_client try: logger.info(f"Moving task with ID: {task_id}") try: task = todoist_client.get_task(task_id=task_id) task_content = task.content except Exception as error: logger.warning(f"Error getting task with ID: {task_id}: {error}") return f"Could not verify task with ID: {task_id}. Task move aborted." # Validate exclusive destination constraint - API requirement destination_count = sum(1 for x in [parent_id, section_id, project_id] if x is not None) if destination_count != 1: return "Error: Exactly one of parent_id, section_id, or project_id must be specified" is_success = todoist_client.move_task( task_id=task_id, parent_id=parent_id, section_id=section_id, project_id=project_id ) if is_success: logger.info(f"Task moved successfully: {task_id}") return f"Successfully moved task: {task_content} (ID: {task_id})" else: error_msg = "Failed to move task" logger.error(error_msg) return error_msg except Exception as error: logger.error(f"Error moving task: {error}") return f"Error moving task: {str(error)}" def todoist_delete_task(ctx: Context, task_id: str) -> str: """Delete a task from Todoist Args: task_id: ID of the task to delete """ todoist_client = ctx.request_context.lifespan_context.todoist_client try: logger.info(f"Deleting task with ID: {task_id}") try: task = todoist_client.get_task(task_id=task_id) task_content = task.content except Exception as error: logger.warning(f"Error getting task with ID: {task_id}: {error}") return f"Could not verify task with ID: {task_id}. Deletion aborted." is_success = todoist_client.delete_task(task_id=task_id) logger.info(f"Task deleted successfully: {task_id}") return f"Successfully deleted task: {task_content} (ID: {task_id})" except Exception as error: logger.error(f"Error deleting task: {error}") return f"Error deleting task: {str(error)}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mikemc/todoist-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server