# Things MCP Server
# by excelsier
# - src
# - things_mcp
"""
Utility classes and functions for enhancing Things MCP reliability.
"""
import json
import time
import logging
import os
import platform
import subprocess
import urllib.parse
import mcp.types as types
from typing import Dict, Any, Optional, Callable, List, Union
# Module-level logger named after this module; shared by the helpers below.
logger = logging.getLogger(__name__)
def is_things_running(timeout: float = 2) -> bool:
    """Check whether the Things app is running and answering AppleScript.

    On macOS this runs two ``osascript`` commands: one asking System Events
    whether a process named "Things3" exists, and, if it does, one asking
    the app for its name as a responsiveness ping.

    Args:
        timeout: Maximum number of seconds to wait for *each* ``osascript``
            call. Defaults to 2 seconds.

    Returns:
        bool: True if Things is running and responded to the ping, False if
        it is not running, did not respond, timed out, or the check failed.
        On non-macOS platforms the check cannot be performed and True is
        returned after logging a warning.
    """
    try:
        if platform.system() != 'Darwin':
            logger.warning("Things availability check only works on macOS")
            return True

        # Both calls are bounded by the timeout so a stuck System Events or
        # Things process cannot block the caller indefinitely.
        result = subprocess.run(
            ['osascript', '-e', 'tell application "System Events" to (name of processes) contains "Things3"'],
            capture_output=True,
            text=True,
            check=False,
            timeout=timeout,
        )
        # The script's boolean result is compared as text: "true" / "false".
        is_running = result.stdout.strip().lower() == 'true'
        if not is_running:
            return False

        # The process exists; make sure it also answers a trivial request.
        ping_result = subprocess.run(
            ['osascript', '-e', 'tell application "Things3" to return name'],
            capture_output=True,
            text=True,
            check=False,
            timeout=timeout,
        )
        if ping_result.returncode != 0:
            logger.warning("Things is running but not responsive")
            return False
        return True
    except subprocess.TimeoutExpired:
        logger.warning("Things app check timed out")
        return False
    except Exception as e:
        # Covers e.g. a missing osascript binary; treated as "not available".
        logger.error(f"Error checking if Things is running: {str(e)}")
        return False
class ThingsAppState:
    """Track and cache whether the Things app is available.

    The availability probe is cached for ``check_interval`` seconds so that
    frequent callers do not run the probe on every call.

    Args:
        checker: Optional zero-argument callable returning a truthy value
            when the app is available. When omitted, the module-level
            ``is_things_running`` function is used.
    """

    # Class-level default so instances created without __init__ still work.
    _checker = None

    def __init__(self, checker=None):
        self._checker = checker
        self.is_available = False
        self.last_check_time = 0
        self.check_interval = 5  # seconds a probe result stays cached
        # Probe once at construction (last_check_time of 0 is always stale).
        self.update_app_state()

    def update_app_state(self, force=False):
        """Refresh the cached availability if it is stale, then return it.

        Args:
            force: When True, run the probe even if the cached result is
                younger than ``check_interval``.

        Returns:
            The (possibly refreshed) value of ``is_available``.
        """
        current_time = time.time()
        if force or current_time - self.last_check_time > self.check_interval:
            # Resolve the default probe lazily so it is looked up at call time.
            probe = self._checker if self._checker is not None else is_things_running
            self.is_available = probe()
            self.last_check_time = current_time
        return self.is_available

    def wait_for_app_availability(self, timeout=10, poll_interval=0.5):
        """Poll until the app becomes available or ``timeout`` elapses.

        Every poll forces a fresh probe; relying on the cached value would
        make the wait return stale results for up to ``check_interval``
        seconds.

        Args:
            timeout: Maximum number of seconds to wait. A value <= 0
                returns False without probing.
            poll_interval: Seconds to sleep between probes.

        Returns:
            True as soon as a probe reports the app available, False if the
            timeout expires first.
        """
        # Monotonic clock: the deadline is purely internal and must not be
        # affected by wall-clock adjustments.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self.update_app_state(force=True):
                return True
            # Never sleep past the deadline.
            remaining = deadline - time.monotonic()
            if remaining > 0:
                time.sleep(min(poll_interval, remaining))
        return False
def validate_tool_registration(tools: list[types.Tool]) -> bool:
    """
    Confirm that every required Things MCP tool appears in ``tools``.

    NOTE(review): a second function with this same name is defined later in
    this module; being defined later, it replaces this one at import time.

    Args:
        tools: Registered tools; each must expose ``name`` and ``description``.
    Returns:
        bool: False (after logging an error) if any required tool name is
        absent, True otherwise. Tools whose description is empty or shorter
        than 10 characters only produce a warning.
    """
    expected = (
        "get-inbox", "get-today", "get-upcoming", "get-anytime",
        "get-someday", "get-logbook", "get-trash", "get-todos",
        "get-projects", "get-areas", "get-tags", "get-tagged-items",
        "search-todos", "search-advanced", "get-recent", "add-todo",
        "search-items", "add-project", "update-todo", "update-project", "show-item",
    )
    present = {entry.name for entry in tools}

    # Keep the required-list order so the logged message is stable.
    absent = [name for name in expected if name not in present]
    if absent:
        logger.error(f"Missing required tool registrations: {absent}")
        return False

    # Descriptions are advisory: a weak one is reported but does not fail.
    for entry in tools:
        text = entry.description
        if text and len(text) >= 10:
            continue
        logger.warning(f"Tool '{entry.name}' has an insufficient description")
    return True
class CircuitBreaker:
    """Stop issuing operations after repeated failures, then probe for recovery."""

    # Possible values of ``state``
    CLOSED = "closed"        # operations flow normally
    OPEN = "open"            # operations are rejected
    HALF_OPEN = "half-open"  # operations allowed again to probe recovery

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold  # failures before opening
        self.recovery_timeout = recovery_timeout    # seconds to stay open
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = self.CLOSED

    def record_failure(self):
        """Count a failure; open the circuit once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count < self.failure_threshold:
            return
        self.state = self.OPEN
        logger.warning(f"Circuit breaker opened after {self.failure_count} failures")

    def record_success(self):
        """Clear the failure count; a success while half-open closes the circuit.

        A success reported while the circuit is open is ignored.
        """
        if self.state not in (self.HALF_OPEN, self.CLOSED):
            return
        was_probing = self.state == self.HALF_OPEN
        if was_probing:
            self.state = self.CLOSED
        self.failure_count = 0
        if was_probing:
            logger.info("Circuit breaker closed after successful operation")

    def allow_operation(self):
        """Return True if an operation may run now.

        An open circuit switches to half-open (and allows the call) once more
        than ``recovery_timeout`` seconds have passed since the last failure.
        While half-open, every call is allowed until a success or failure is
        recorded.
        """
        if self.state != self.OPEN:
            # Closed and half-open both let the operation through.
            return True
        seconds_open = time.time() - self.last_failure_time
        if seconds_open <= self.recovery_timeout:
            return False
        self.state = self.HALF_OPEN
        logger.info("Circuit breaker half-open, testing system recovery")
        return True
class DeadLetterQueue:
    """Store persistently failed operations for manual review.

    The queue is an in-memory list of dict entries that is mirrored to
    ``dlq_file`` as JSON after every change.

    Args:
        dlq_file: Path of the JSON file backing the queue. A relative path
            is resolved against the current working directory.
    """

    def __init__(self, dlq_file="things_dlq.json"):
        self.dlq_file = dlq_file
        self.queue = self._load_queue()

    def _load_queue(self):
        """Load the persisted queue.

        Returns:
            The list of dict entries read from ``dlq_file``, or an empty
            list if the file is missing, unreadable, or does not contain a
            JSON list. Non-dict items inside the list are dropped with a
            warning, since the rest of the class indexes entries by key.
        """
        try:
            with open(self.dlq_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            # No queue persisted yet.
            return []
        except Exception as e:
            logger.error(f"Error loading DLQ: {str(e)}")
            return []

        if not isinstance(data, list):
            logger.error(
                f"Error loading DLQ: expected a JSON list, got {type(data).__name__}"
            )
            return []

        entries = [item for item in data if isinstance(item, dict)]
        if len(entries) != len(data):
            logger.warning(
                f"Ignored {len(data) - len(entries)} malformed DLQ entries"
            )
        return entries

    def _save_queue(self):
        """Persist the queue to ``dlq_file``.

        The queue is serialised in memory first and written to a temporary
        sibling file that then replaces the target, so a serialisation or
        write failure leaves the previously saved file intact. Failures are
        logged, not raised.
        """
        tmp_path = f"{self.dlq_file}.tmp"
        try:
            # Serialise before touching the disk: an unserialisable entry
            # must not truncate the existing file.
            payload = json.dumps(self.queue)
            with open(tmp_path, 'w', encoding='utf-8') as f:
                f.write(payload)
            os.replace(tmp_path, self.dlq_file)
        except Exception as e:
            logger.error(f"Error saving DLQ: {str(e)}")
            # Best-effort cleanup of a partially written temporary file.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    def add_failed_operation(self, operation, params, error, attempts=1):
        """Append a failed operation to the queue and persist it.

        Args:
            operation: Identifier of the operation that failed.
            params: Parameters the operation was called with.
            error: The error (stored as ``str(error)``).
            attempts: Number of attempts made so far.
        """
        entry = {
            "operation": operation,
            "params": params,
            "error": str(error),
            "attempts": attempts,
            "timestamp": time.time(),
            "added_at": time.strftime("%Y-%m-%d %H:%M:%S")
        }
        self.queue.append(entry)
        self._save_queue()
        logger.warning(f"Added to DLQ: {operation} with error: {str(error)}")

    def retry_all(self):
        """Attempt to retry all operations in the DLQ.

        Entries whose retry returns a truthy result are removed; the others
        stay queued with their ``attempts`` counter incremented.

        Returns:
            dict: ``{"success": bool, "retried": int, "failed": int}`` where
            ``success`` is True only if no retry failed.
        """
        if not self.queue:
            return {"success": True, "retried": 0, "failed": 0}

        # Imported at call time rather than at module import; the helpers'
        # exact semantics are defined in those modules.
        from url_scheme import construct_url, execute_url
        from handlers import retry_operation

        success_count = 0
        failure_count = 0
        remaining_queue = []
        for entry in self.queue:
            try:
                url = construct_url(entry["operation"], entry["params"])
                succeeded = bool(retry_operation(lambda: execute_url(url)))
            except Exception as e:
                # Report the reason instead of dropping it silently.
                logger.error(
                    f"DLQ retry failed for {entry.get('operation')}: {str(e)}"
                )
                succeeded = False

            if succeeded:
                success_count += 1
            else:
                # Tolerate entries persisted without an attempts counter.
                entry["attempts"] = entry.get("attempts", 0) + 1
                remaining_queue.append(entry)
                failure_count += 1

        self.queue = remaining_queue
        self._save_queue()
        return {
            "success": failure_count == 0,
            "retried": success_count + failure_count,
            "failed": failure_count
        }
class RateLimiter:
    """Space out operations so at most ``operations_per_minute`` run per minute.

    Can be used directly via ``wait_if_needed()`` or as a decorator.

    Args:
        operations_per_minute: Maximum number of operations per minute;
            must be positive.

    Raises:
        ValueError: If ``operations_per_minute`` is zero or negative.
    """

    def __init__(self, operations_per_minute=30):
        if operations_per_minute <= 0:
            raise ValueError("operations_per_minute must be positive")
        self.operations_per_minute = operations_per_minute
        # Minimum number of seconds between two consecutive operations.
        self.operation_interval = 60 / operations_per_minute
        self.last_operation_time = 0

    def wait_if_needed(self):
        """Sleep just long enough to keep the configured spacing.

        Records the current time as the last operation time before returning.
        """
        time_since_last = time.time() - self.last_operation_time
        if time_since_last < self.operation_interval:
            # time.time() is wall-clock: if the clock was set backwards the
            # elapsed time is negative. Cap the sleep at one interval so that
            # case cannot block for an arbitrarily long time.
            wait_time = min(
                self.operation_interval,
                self.operation_interval - time_since_last,
            )
            time.sleep(wait_time)
        self.last_operation_time = time.time()

    def __call__(self, func):
        """Decorator to rate limit a function.

        The wrapper keeps the wrapped function's name, docstring and other
        metadata.
        """
        import functools

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            self.wait_if_needed()
            return func(*args, **kwargs)
        return wrapper
def get_auth_token() -> Optional[str]:
    """Get the Things authentication token from the available sources.

    The function tries, in order:
    1. Environment variable THINGS_AUTH_TOKEN
    2. The "auth_token" key of the JSON file ~/.things_config.json

    There is no hardcoded fallback.

    Returns:
        str: Authentication token if found, None otherwise. A config file
        that is unreadable, is not a JSON object, or holds a non-string or
        empty token is ignored with a warning.
    """
    # Try environment variable first
    token = os.environ.get('THINGS_AUTH_TOKEN')
    if token:
        logger.info("Using Things authentication token from environment variable")
        return token

    # Try local config file
    config_path = os.path.expanduser("~/.things_config.json")
    config = None
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except FileNotFoundError:
        # A missing config file is normal, not an error.
        pass
    except Exception as e:
        logger.warning(f"Failed to read auth token from config file: {str(e)}")

    if isinstance(config, dict):
        file_token = config.get('auth_token')
        if isinstance(file_token, str) and file_token:
            logger.info("Using Things authentication token from config file")
            return file_token
        if file_token:
            # Honour the declared Optional[str] return type.
            logger.warning("Ignoring non-string auth_token in config file")
    elif config is not None:
        logger.warning("Failed to read auth token from config file: not a JSON object")

    # No token found from dynamic sources
    logger.warning("No Things authentication token found in environment or config")
    return None
def detect_things_version(timeout: float = 5):
    """Detect the installed Things version via AppleScript.

    Args:
        timeout: Maximum number of seconds to wait for ``osascript``.

    Returns:
        The version string printed by the script, or None if the platform
        is not macOS, the command failed, timed out, or printed nothing.
    """
    # osascript is a macOS tool; skip the call elsewhere instead of logging
    # an error for a missing binary.
    if platform.system() != 'Darwin':
        return None

    # NOTE(review): addressing the app through AppleScript may launch it if
    # it is not already running - confirm this is acceptable for callers.
    script = 'tell application "Things3" to return version'
    try:
        result = subprocess.run(
            ['osascript', '-e', script],
            capture_output=True,
            text=True,
            check=False,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        logger.warning("Things version detection timed out")
        return None
    except Exception as e:
        logger.error(f"Error detecting Things version: {str(e)}")
        return None

    version = result.stdout.strip()
    if result.returncode == 0 and version:
        return version
    return None
def validate_tool_registration(tool_list):
    """Check that every required tool name is present in ``tool_list``.

    NOTE(review): this redefines a function of the same name declared
    earlier in this module, so this is the version in effect after import.
    Unlike the earlier one, its required list has no "search-items" entry
    and it does not inspect tool descriptions - confirm which is intended.

    Args:
        tool_list: Sized collection of objects exposing a ``name`` attribute.

    Returns:
        True (after an info log) when no required name is missing, False
        (after an error log listing the missing names) otherwise.
    """
    expected = (
        "get-inbox", "get-today", "get-upcoming", "get-anytime",
        "get-someday", "get-logbook", "get-trash", "get-todos",
        "get-projects", "get-areas", "get-tags", "get-tagged-items",
        "search-todos", "search-advanced", "get-recent", "add-todo",
        "add-project", "update-todo", "update-project", "show-item",
    )
    seen = {item.name for item in tool_list}

    # Preserve the required-list order in the reported names.
    absent = [name for name in expected if name not in seen]
    if not absent:
        logger.info(f"All {len(tool_list)} tools are properly registered")
        return True

    logger.error(f"Missing tool registrations: {absent}")
    return False
# Create global instances
# These singletons are built once, when this module is first imported.
# Construction has side effects: ThingsAppState() runs an availability check
# in its constructor, and DeadLetterQueue() reads its default
# "things_dlq.json" file (a relative path, so it resolves against the current
# working directory).
app_state = ThingsAppState()
circuit_breaker = CircuitBreaker()
dead_letter_queue = DeadLetterQueue()
rate_limiter = RateLimiter()