# server.py
"""FastMCP server for Toggl time tracking data aggregation."""
import asyncio
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Annotated, Optional

from fastmcp import FastMCP
from pydantic import Field

from .config import Config
from .services import CacheService, TogglService, ParserService, AggregatorService
from .utils import validate_date_range, log_info, log_error
# --- Logging setup -----------------------------------------------------------
import sys  # used below for the stderr log handler

# Create the logs directory if it doesn't exist.  The path is resolved
# relative to the project root (three levels up from this file) so logging
# works regardless of the process's working directory.
project_root = Path(__file__).parent.parent.parent
logs_dir = project_root / "logs"
logs_dir.mkdir(exist_ok=True)
log_file = logs_dir / "toggl_mcp.log"

# Log to stderr — stdout carries FastMCP's STDIO protocol and must stay clean —
# and mirror everything to a file for post-mortem debugging.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    force=True,  # replace any handlers installed by imported libraries
    handlers=[
        logging.StreamHandler(sys.stderr),  # log to stderr
        logging.FileHandler(log_file),      # also log to file
    ]
)
logger = logging.getLogger(__name__)

logger.info("=" * 80)
logger.info("Toggl MCP Server starting up")
logger.info("Logging to: %s", log_file)
logger.info("=" * 80)

# --- MCP server instance -----------------------------------------------------
mcp = FastMCP(
    name="Toggl MCP Server",
    instructions="Time tracking data fetcher and aggregator using Toggl API"
)

# --- Service singletons ------------------------------------------------------
# Created lazily by _initialize_services(); None means "not yet initialized".
# initialization_error holds the last failure message for the health check.
cache_service: Optional[CacheService] = None
toggl_service: Optional[TogglService] = None
parser_service: Optional[ParserService] = None
aggregator_service: Optional[AggregatorService] = None
initialization_error: Optional[str] = None
def _initialize_services():
    """Initialize the module-level service singletons from configuration.

    Populates ``cache_service``, ``toggl_service``, ``parser_service`` and
    ``aggregator_service``.  On success ``initialization_error`` is cleared;
    on failure it is set to a human-readable message and the exception is
    re-raised so the caller can decide how to surface it.

    Raises:
        ValueError: when configuration validation fails.
        Exception: any other error raised while constructing a service.
    """
    global cache_service, toggl_service, parser_service, aggregator_service, initialization_error
    try:
        logger.info("Starting service initialization...")
        Config.validate()
        logger.info("✓ Configuration validated")
        # Only a short token prefix is logged so the credential never lands
        # in the log file.
        logger.info(" - API Token: %s...", Config.TOGGL_API_TOKEN[:8])
        logger.info(" - Workspace ID: %s", Config.TOGGL_WORKSPACE_ID)
        logger.info(" - Cache Dir: %s", Config.CACHE_DIR)
        logger.info(" - Cache TTL: %s hours", Config.CACHE_TTL_HOURS)
        cache_service = CacheService(
            cache_dir=Config.CACHE_DIR,
            ttl_hours=Config.CACHE_TTL_HOURS
        )
        logger.info("✓ CacheService initialized (synchronous, ~0ms)")
        toggl_service = TogglService(
            api_token=Config.TOGGL_API_TOKEN,
            workspace_id=Config.TOGGL_WORKSPACE_ID
        )
        logger.info("✓ TogglService initialized (synchronous, ~0ms)")
        parser_service = ParserService()
        logger.info("✓ ParserService initialized (synchronous, ~0ms)")
        aggregator_service = AggregatorService()
        logger.info("✓ AggregatorService initialized (synchronous, ~0ms)")
        logger.info("=" * 80)
        logger.info("✓✓ All services initialized successfully in <1s!")
        logger.info("=" * 80)
        initialization_error = None
    except ValueError as e:
        # Configuration problems get a distinct message; no traceback needed.
        initialization_error = f"Configuration validation error: {e}"
        logger.error("=" * 80)
        logger.error("✗ %s", initialization_error)
        logger.error("=" * 80)
        raise
    except Exception as e:
        initialization_error = f"Failed to initialize services: {e}"
        logger.error("=" * 80)
        # logger.exception appends the full traceback automatically.
        logger.exception("✗ %s", initialization_error)
        logger.error("=" * 80)
        raise
@mcp.tool(
    name="health_check",
    description="""
    Check the health and connection status of the Toggl MCP server.
    Returns:
    - status: "healthy" if all services are initialized, "unhealthy" otherwise
    - services: Status of each service
    - error: Any initialization errors encountered
    - configuration: Masked configuration details for debugging
    Use this tool to diagnose connection issues before calling other tools.
    """,
)
async def health_check() -> dict:
    """Report server health: service states, masked config, init errors.

    Attempts lazy initialization if services are missing, then returns a
    ``{"status", "data", "error"}`` envelope describing each service.
    Never raises — failures are reported inside the envelope.
    """
    try:
        logger.info("Health check requested")
        # Lazily initialize on first call.  Errors are swallowed here on
        # purpose: a health check should report the failure, not crash on it.
        if not toggl_service and not initialization_error:
            try:
                logger.info("Services not initialized, attempting initialization...")
                _initialize_services()
            except Exception as e:
                logger.error("Health check: initialization failed with: %s", e)
        # Build status response
        status = {
            "status": "healthy" if toggl_service else "unhealthy",
            # Real wall-clock UTC timestamp.  The previous implementation used
            # asyncio's event-loop time, which is an arbitrary monotonic value
            # (and get_event_loop() is deprecated outside a running loop).
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "services": {
                "cache_service": "initialized" if cache_service else "not initialized",
                "toggl_service": "initialized" if toggl_service else "not initialized",
                "parser_service": "initialized" if parser_service else "not initialized",
                "aggregator_service": "initialized" if aggregator_service else "not initialized",
            },
            "configuration": {
                # Mask the token so health output can be shared safely.
                "api_token": Config.TOGGL_API_TOKEN[:8] + "..." if Config.TOGGL_API_TOKEN else "NOT SET",
                "workspace_id": Config.TOGGL_WORKSPACE_ID,
                "cache_dir": Config.CACHE_DIR,
            },
        }
        if initialization_error:
            status["initialization_error"] = initialization_error
        # If the Toggl service exists, note that it's ready
        if toggl_service:
            logger.info("Health check: Toggl service is ready")
            status["toggl_api"] = "ready"
        logger.info("Health check result: %s", status["status"])
        return {
            "status": "success",
            "data": status,
            "error": None,
        }
    except Exception as e:
        logger.error("Health check error: %s", e)
        return {
            "status": "error",
            "data": None,
            "error": {
                "code": "HEALTH_CHECK_FAILED",
                "message": str(e),
            },
        }
@mcp.tool(
    name="get_workspace_users",
    description="""
    Retrieve a list of all users in the Toggl workspace.
    Use this tool when you need to:
    - Discover available users for filtering
    - Get user IDs and emails for aggregation
    - Build user lists for batch operations
    Returns array of user objects with id, email, and name.
    """,
)
async def get_workspace_users() -> dict:
    """List all users in the Toggl workspace.

    Lazily initializes services on first use and bounds the API call with a
    20-second timeout.  Returns a ``{"status", "data", "error"}`` envelope;
    never raises — failures are reported inside the envelope.
    """
    try:
        logger.info("=" * 80)
        logger.info("get_workspace_users() called")
        # Initialize services if not already done
        if not toggl_service:
            logger.info("Services not initialized, initializing now...")
            _initialize_services()
        if not toggl_service:
            error_msg = "Toggl service not initialized"
            logger.error("✗ %s", error_msg)
            return {
                "status": "error",
                "data": None,
                "error": {"code": "SERVICE_NOT_INITIALIZED", "message": error_msg}
            }
        logger.info("Fetching workspace users...")
        try:
            # Add a timeout so a hung API request cannot block the tool forever.
            users = await asyncio.wait_for(
                toggl_service.get_workspace_users(),
                timeout=20.0  # 20 seconds max for this operation
            )
            logger.info("✓ Successfully fetched %d users", len(users))
        except asyncio.TimeoutError:
            error_msg = "Request timed out after 20 seconds while fetching users"
            logger.error("✗ %s", error_msg)
            return {
                "status": "error",
                "data": None,
                "error": {"code": "TIMEOUT", "message": error_msg}
            }
        return {
            "status": "success",
            "data": users,
            "error": None
        }
    except Exception as e:
        logger.error("=" * 80)
        # logger.exception logs the message plus the full traceback.
        logger.exception("✗ Error fetching users: %s", e)
        logger.error("=" * 80)
        return {
            "status": "error",
            "data": None,
            "error": {"code": "API_ERROR", "message": str(e)}
        }
@mcp.tool(
    name="get_toggl_aggregated_data",
    description="""
    Fetch and aggregate Toggl time tracking data for a given date range.
    Use this tool when you need to:
    - Get time entry summaries for a specific period
    - Analyze matched Fibery entity references (#ID [DB] [TYPE])
    - Calculate total hours by user and project
    - Generate time tracking reports
    The tool automatically:
    - Parses descriptions to extract Fibery entity references
    - Groups entries by user and entity
    - Aggregates durations and calculates hours
    - Caches results for 1 hour
    Max date range: 7 days per request (due to API limits).
    For larger ranges, make multiple requests and aggregate client-side.
    """,
)
async def get_toggl_aggregated_data(
    start_date: Annotated[str, Field(
        description="Start date in ISO 8601 format (YYYY-MM-DD). Example: '2025-10-06'",
        pattern=r'^\d{4}-\d{2}-\d{2}$'
    )],
    end_date: Annotated[str, Field(
        description="End date in ISO 8601 format (YYYY-MM-DD). Must be >= start_date and <= start_date + 7 days. Example: '2025-10-13'",
        pattern=r'^\d{4}-\d{2}-\d{2}$'
    )],
    user_id: Annotated[Optional[str], Field(
        description="Optional: Filter to single user by ID. If omitted, returns all users. Example: '12345'",
        default=None
    )] = None
) -> dict:
    """Fetch and aggregate Toggl data for all users or a single user.

    Pipeline: validate dates -> cache lookup -> API fetch (40s timeout) ->
    parse descriptions -> aggregate -> cache -> return.  Returns a
    ``{"status", "data", "error"[, "metadata"]}`` envelope; never raises.
    """
    try:
        # Initialize services if not already done
        if not toggl_service:
            _initialize_services()
        # All four services are required by the pipeline below.
        if not all([cache_service, toggl_service, parser_service, aggregator_service]):
            return {
                "status": "error",
                "data": None,
                "error": {"code": "SERVICE_NOT_INITIALIZED", "message": "Services not initialized"}
            }
        # 1. Validate dates
        validation_error = validate_date_range(start_date, end_date)
        if validation_error:
            return {
                "status": "error",
                "data": None,
                "error": validation_error
            }
        # 2. Try cache
        log_info(f"Checking cache for {start_date} to {end_date}")
        cached_data = cache_service.get(start_date, end_date, user_id)
        if cached_data:
            log_info("Cache hit")
            return {
                "status": "success",
                "data": cached_data,
                "error": None,
                "metadata": {"source": "cache"}
            }
        log_info("Cache miss, fetching from API")
        # 3. Fetch time entries with timeout
        try:
            time_entries = await asyncio.wait_for(
                toggl_service.get_time_entries(
                    start_date, end_date,
                    user_id=user_id
                ),
                timeout=40.0  # 40 seconds max (allows for retries with backoff)
            )
        except asyncio.TimeoutError:
            error_msg = f"Request timed out after 40 seconds fetching entries for {start_date} to {end_date}"
            log_error(error_msg)
            return {
                "status": "error",
                "data": None,
                "error": {"code": "TIMEOUT", "message": error_msg}
            }
        log_info(f"Fetched {len(time_entries)} time entries")
        # 4. Parse descriptions.  Entries may arrive with a missing or null
        # description; treat both as an empty string instead of crashing.
        parsed_entries = [
            parser_service.parse_description(e.get("description") or "")
            for e in time_entries
        ]
        # 5. Aggregate
        aggregated = aggregator_service.aggregate(parsed_entries, time_entries)
        log_info(f"Aggregated {len(aggregated.get('users', {}))} users from entries")
        # 6. Build response
        response_data = {
            "users": aggregated["users"],
            "statistics": aggregated["statistics"]
        }
        log_info(f"Response includes {len(response_data['users'])} users and statistics")
        # 7. Cache
        cache_service.set(start_date, end_date, response_data, user_id)
        log_info("Data cached")
        log_info(f"Returning success response with {len(response_data['users'])} users")
        return {
            "status": "success",
            "data": response_data,
            "error": None,
            "metadata": {
                "source": "api",
                "entries_fetched": len(time_entries)
            }
        }
    except Exception as e:
        log_error(f"Error in get_toggl_aggregated_data: {e}")
        # Route the traceback through the project's logging helper rather than
        # printing it directly, keeping all diagnostics in one place.
        import traceback
        log_error(traceback.format_exc())
        return {
            "status": "error",
            "data": None,
            "error": {
                "code": "INTERNAL_ERROR",
                "message": str(e)
            }
        }
if __name__ == "__main__":
    # Script entry point: announce startup, run the MCP event loop, and make
    # sure any fatal error is fully logged before the process dies.
    banner = "=" * 80
    logger.info(banner)
    logger.info("Starting Toggl MCP Server")
    logger.info(banner)
    try:
        mcp.run()
    except KeyboardInterrupt:
        logger.info("Shutdown requested by user")
    except Exception as exc:
        import traceback
        logger.error(banner)
        logger.error(f"✗ Fatal error: {exc}")
        logger.error(banner)
        logger.error(traceback.format_exc())
        raise