Skip to main content
Glama

Toggl MCP Server

by ikido
server.py13.7 kB
"""FastMCP server for Toggl time tracking data aggregation.""" from fastmcp import FastMCP from typing import Annotated, Optional from pydantic import Field import asyncio import logging from pathlib import Path from .config import Config from .services import CacheService, TogglService, ParserService, AggregatorService from .utils import validate_date_range, log_info, log_error # Create logs directory if it doesn't exist # Use absolute path relative to project root project_root = Path(__file__).parent.parent.parent logs_dir = project_root / "logs" logs_dir.mkdir(exist_ok=True) log_file = logs_dir / "toggl_mcp.log" # Configure logging with both stderr and file output # Use stderr to avoid interfering with FastMCP's STDIO protocol import sys logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', force=True, handlers=[ logging.StreamHandler(sys.stderr), # Log to stderr logging.FileHandler(log_file) # Also log to file ] ) logger = logging.getLogger(__name__) logger.info("=" * 80) logger.info("Toggl MCP Server starting up") logger.info(f"Logging to: {log_file}") logger.info("=" * 80) # Create MCP server instance mcp = FastMCP( name="Toggl MCP Server", instructions="Time tracking data fetcher and aggregator using Toggl API" ) # Initialize services cache_service: Optional[CacheService] = None toggl_service: Optional[TogglService] = None parser_service: Optional[ParserService] = None aggregator_service: Optional[AggregatorService] = None initialization_error: Optional[str] = None def _initialize_services(): """Initialize services with timeout protection.""" global cache_service, toggl_service, parser_service, aggregator_service, initialization_error try: logger.info("Starting service initialization...") Config.validate() logger.info("✓ Configuration validated") logger.info(f" - API Token: {Config.TOGGL_API_TOKEN[:8]}...") logger.info(f" - Workspace ID: {Config.TOGGL_WORKSPACE_ID}") logger.info(f" - Cache Dir: {Config.CACHE_DIR}") logger.info(f" - Cache TTL: {Config.CACHE_TTL_HOURS} hours") cache_service = CacheService( cache_dir=Config.CACHE_DIR, ttl_hours=Config.CACHE_TTL_HOURS ) logger.info("✓ CacheService initialized (synchronous, ~0ms)") toggl_service = TogglService( api_token=Config.TOGGL_API_TOKEN, workspace_id=Config.TOGGL_WORKSPACE_ID ) logger.info("✓ TogglService initialized (synchronous, ~0ms)") parser_service = ParserService() logger.info("✓ ParserService initialized (synchronous, ~0ms)") aggregator_service = AggregatorService() logger.info("✓ AggregatorService initialized (synchronous, ~0ms)") logger.info("=" * 80) logger.info("✓✓ All services initialized successfully in <1s!") logger.info("=" * 80) initialization_error = None except ValueError as e: error_msg = f"Configuration validation error: {e}" logger.error("=" * 80) logger.error(f"✗ {error_msg}") logger.error("=" * 80) initialization_error = error_msg raise except Exception as e: error_msg = f"Failed to initialize services: {e}" logger.error("=" * 80) logger.error(f"✗ {error_msg}") logger.error("=" * 80) import traceback logger.error(traceback.format_exc()) initialization_error = error_msg raise @mcp.tool( name="health_check", description=""" Check the health and connection status of the Toggl MCP server. Returns: - status: "healthy" if all services are initialized, "unhealthy" otherwise - services: Status of each service - error: Any initialization errors encountered - configuration: Masked configuration details for debugging Use this tool to diagnose connection issues before calling other tools. """, ) async def health_check() -> dict: """Check MCP server health and connection status.""" try: logger.info("Health check requested") # Try to initialize if not done if not toggl_service and not initialization_error: try: logger.info("Services not initialized, attempting initialization...") _initialize_services() except Exception as e: logger.error(f"Health check: initialization failed with: {e}") # Build status response status = { "status": "healthy" if toggl_service else "unhealthy", "timestamp": str(asyncio.get_event_loop().time()), "services": { "cache_service": "initialized" if cache_service else "not initialized", "toggl_service": "initialized" if toggl_service else "not initialized", "parser_service": "initialized" if parser_service else "not initialized", "aggregator_service": "initialized" if aggregator_service else "not initialized", }, "configuration": { "api_token": Config.TOGGL_API_TOKEN[:8] + "..." if Config.TOGGL_API_TOKEN else "NOT SET", "workspace_id": Config.TOGGL_WORKSPACE_ID, "cache_dir": Config.CACHE_DIR, }, } if initialization_error: status["initialization_error"] = initialization_error # If Toggl service exists, note that it's ready if toggl_service: logger.info("Health check: Toggl service is ready") status["toggl_api"] = "ready" logger.info(f"Health check result: {status['status']}") return { "status": "success", "data": status, "error": None, } except Exception as e: logger.error(f"Health check error: {e}") return { "status": "error", "data": None, "error": { "code": "HEALTH_CHECK_FAILED", "message": str(e), }, } @mcp.tool( name="get_workspace_users", description=""" Retrieve a list of all users in the Toggl workspace. Use this tool when you need to: - Discover available users for filtering - Get user IDs and emails for aggregation - Build user lists for batch operations Returns array of user objects with id, email, and name. """, ) async def get_workspace_users() -> dict: """List all users in the Toggl workspace.""" try: logger.info("=" * 80) logger.info("get_workspace_users() called") # Initialize services if not already done if not toggl_service: logger.info("Services not initialized, initializing now...") _initialize_services() if not toggl_service: error_msg = "Toggl service not initialized" logger.error(f"✗ {error_msg}") return { "status": "error", "data": None, "error": {"code": "SERVICE_NOT_INITIALIZED", "message": error_msg} } logger.info("Fetching workspace users...") try: # Add timeout to prevent hanging indefinitely users = await asyncio.wait_for( toggl_service.get_workspace_users(), timeout=20.0 # 20 seconds max for this operation ) logger.info(f"✓ Successfully fetched {len(users)} users") except asyncio.TimeoutError: error_msg = "Request timed out after 20 seconds while fetching users" logger.error(f"✗ {error_msg}") return { "status": "error", "data": None, "error": {"code": "TIMEOUT", "message": error_msg} } return { "status": "success", "data": users, "error": None } except Exception as e: logger.error("=" * 80) logger.error(f"✗ Error fetching users: {e}") logger.error("=" * 80) import traceback logger.error(traceback.format_exc()) return { "status": "error", "data": None, "error": {"code": "API_ERROR", "message": str(e)} } @mcp.tool( name="get_toggl_aggregated_data", description=""" Fetch and aggregate Toggl time tracking data for a given date range. Use this tool when you need to: - Get time entry summaries for a specific period - Analyze matched Fibery entity references (#ID [DB] [TYPE]) - Calculate total hours by user and project - Generate time tracking reports The tool automatically: - Parses descriptions to extract Fibery entity references - Groups entries by user and entity - Aggregates durations and calculates hours - Caches results for 1 hour Max date range: 7 days per request (due to API limits). For larger ranges, make multiple requests and aggregate client-side. """, ) async def get_toggl_aggregated_data( start_date: Annotated[str, Field( description="Start date in ISO 8601 format (YYYY-MM-DD). Example: '2025-10-06'", pattern=r'^\d{4}-\d{2}-\d{2}$' )], end_date: Annotated[str, Field( description="End date in ISO 8601 format (YYYY-MM-DD). Must be >= start_date and <= start_date + 7 days. Example: '2025-10-13'", pattern=r'^\d{4}-\d{2}-\d{2}$' )], user_id: Annotated[Optional[str], Field( description="Optional: Filter to single user by ID. If omitted, returns all users. Example: '12345'", default=None )] = None ) -> dict: """Fetch and aggregate Toggl data for all users or single user.""" try: # Initialize services if not already done if not toggl_service: _initialize_services() # Check services if not all([cache_service, toggl_service, parser_service, aggregator_service]): return { "status": "error", "data": None, "error": {"code": "SERVICE_NOT_INITIALIZED", "message": "Services not initialized"} } # 1. Validate dates validation_error = validate_date_range(start_date, end_date) if validation_error: return { "status": "error", "data": None, "error": validation_error } # 2. Try cache log_info(f"Checking cache for {start_date} to {end_date}") cached_data = cache_service.get(start_date, end_date, user_id) if cached_data: log_info("Cache hit") return { "status": "success", "data": cached_data, "error": None, "metadata": {"source": "cache"} } log_info("Cache miss, fetching from API") # 3. Fetch time entries with timeout try: time_entries = await asyncio.wait_for( toggl_service.get_time_entries( start_date, end_date, user_id=user_id ), timeout=40.0 # 40 seconds max (allows for retries with backoff) ) except asyncio.TimeoutError: error_msg = f"Request timed out after 40 seconds fetching entries for {start_date} to {end_date}" log_error(error_msg) return { "status": "error", "data": None, "error": {"code": "TIMEOUT", "message": error_msg} } log_info(f"Fetched {len(time_entries)} time entries") # 4. Parse descriptions parsed_entries = [ parser_service.parse_description(e["description"]) for e in time_entries ] # 5. Aggregate aggregated = aggregator_service.aggregate(parsed_entries, time_entries) log_info(f"Aggregated {len(aggregated.get('users', {}))} users from entries") # 6. Build response response_data = { "users": aggregated["users"], "statistics": aggregated["statistics"] } log_info(f"Response includes {len(response_data['users'])} users and statistics") # 7. Cache cache_service.set(start_date, end_date, response_data, user_id) log_info("Data cached") log_info(f"Returning success response with {len(response_data['users'])} users") return { "status": "success", "data": response_data, "error": None, "metadata": { "source": "api", "entries_fetched": len(time_entries) } } except Exception as e: log_error(f"Error in get_toggl_aggregated_data: {e}") import traceback traceback.print_exc() return { "status": "error", "data": None, "error": { "code": "INTERNAL_ERROR", "message": str(e) } } if __name__ == "__main__": logger.info("=" * 80) logger.info("Starting Toggl MCP Server") logger.info("=" * 80) try: mcp.run() except KeyboardInterrupt: logger.info("Shutdown requested by user") except Exception as e: logger.error("=" * 80) logger.error(f"✗ Fatal error: {e}") logger.error("=" * 80) import traceback logger.error(traceback.format_exc()) raise

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ikido/toggl-mcp-custom'

If you have feedback or need assistance with the MCP directory API, please join our Discord server