tools.py•11.1 kB
"""MCP tools implementation for Google Contacts."""
import asyncio
from typing import Dict, List, Optional, Any, Union
from mcp.server.fastmcp import FastMCP
import traceback
from mcp_google_contacts_server.google_contacts_service import GoogleContactsService, GoogleContactsError
from mcp_google_contacts_server.formatters import format_contact, format_contacts_list, format_directory_people
from mcp_google_contacts_server.config import config
# Global service instance
contacts_service = None
def init_service() -> Optional[GoogleContactsService]:
    """Initialize and return a Google Contacts service instance.
    
    Returns:
        GoogleContactsService instance or None if initialization fails
    """
    global contacts_service
    
    if contacts_service:
        return contacts_service
    
    try:
        # First try environment variables
        try:
            contacts_service = GoogleContactsService.from_env()
            print("Successfully loaded credentials from environment variables.")
            return contacts_service
        except GoogleContactsError:
            pass
            
        # Then try default file locations
        for path in config.credentials_paths:
            if path.exists():
                try:
                    print(f"Found credentials file at {path}")
                    contacts_service = GoogleContactsService.from_file(path)
                    print("Successfully loaded credentials from file.")
                    return contacts_service
                except GoogleContactsError as e:
                    print(f"Error with credentials at {path}: {e}")
                    continue
                
        print("No valid credentials found. Please provide credentials to use Google Contacts.")
        return None
        
    except Exception as e:
        print(f"Error initializing Google Contacts service: {str(e)}")
        traceback.print_exc()
        return None
def register_tools(mcp: FastMCP) -> None:
    """Register all Google Contacts tools with the MCP server.
    
    Args:
        mcp: FastMCP server instance
    """
    
    @mcp.tool()
    async def list_contacts(name_filter: Optional[str] = None, max_results: int = 100) -> str:
        """List all contacts or filter by name.
        
        Args:
            name_filter: Optional filter to find contacts by name
            max_results: Maximum number of results to return (default: 100)
        """
        service = init_service()
        if not service:
            return "Error: Google Contacts service is not available. Please check your credentials."
        
        try:
            contacts = service.list_contacts(name_filter, max_results)
            return format_contacts_list(contacts)
        except Exception as e:
            return f"Error: Failed to list contacts - {str(e)}"
    @mcp.tool()
    async def get_contact(identifier: str) -> str:
        """Get a contact by resource name or email.
        
        Args:
            identifier: Resource name (people/*) or email address of the contact
        """
        service = init_service()
        if not service:
            return "Error: Google Contacts service is not available. Please check your credentials."
        
        try:
            contact = service.get_contact(identifier)
            return format_contact(contact)
        except Exception as e:
            return f"Error: Failed to get contact - {str(e)}"
    @mcp.tool()
    async def create_contact(given_name: str, family_name: Optional[str] = None, 
                           email: Optional[str] = None, phone: Optional[str] = None) -> str:
        """Create a new contact.
        
        Args:
            given_name: First name of the contact
            family_name: Last name of the contact
            email: Email address of the contact
            phone: Phone number of the contact
        """
        service = init_service()
        if not service:
            return "Error: Google Contacts service is not available. Please check your credentials."
        
        try:
            contact = service.create_contact(
                given_name, 
                family_name, 
                email, 
                phone
            )
            return f"Contact created successfully!\n\n{format_contact(contact)}"
        except Exception as e:
            return f"Error: Failed to create contact - {str(e)}"
    @mcp.tool()
    async def update_contact(resource_name: str, given_name: Optional[str] = None, 
                           family_name: Optional[str] = None, email: Optional[str] = None,
                           phone: Optional[str] = None) -> str:
        """Update an existing contact.
        
        Args:
            resource_name: Contact resource name (people/*)
            given_name: Updated first name
            family_name: Updated last name
            email: Updated email address
            phone: Updated phone number
        """
        service = init_service()
        if not service:
            return "Error: Google Contacts service is not available. Please check your credentials."
        
        try:
            contact = service.update_contact(
                resource_name,
                given_name,
                family_name,
                email,
                phone
            )
            return f"Contact updated successfully!\n\n{format_contact(contact)}"
        except Exception as e:
            return f"Error: Failed to update contact - {str(e)}"
    @mcp.tool()
    async def delete_contact(resource_name: str) -> str:
        """Delete a contact by resource name.
        
        Args:
            resource_name: Contact resource name (people/*) to delete
        """
        service = init_service()
        if not service:
            return "Error: Google Contacts service is not available. Please check your credentials."
        
        try:
            result = service.delete_contact(resource_name)
            if result.get('success'):
                return f"Contact {resource_name} deleted successfully."
            else:
                return f"Failed to delete contact: {result.get('message', 'Unknown error')}"
        except Exception as e:
            return f"Error: Failed to delete contact - {str(e)}"
    @mcp.tool()
    async def search_contacts(query: str, max_results: int = 10) -> str:
        """Search contacts by name, email, or phone number.
        
        Args:
            query: Search term to find in contacts
            max_results: Maximum number of results to return (default: 10)
        """
        service = init_service()
        if not service:
            return "Error: Google Contacts service is not available. Please check your credentials."
        
        try:
            # Get all contacts and filter locally with more flexible search
            all_contacts = service.list_contacts(max_results=max(100, max_results*2))
            
            query = query.lower()
            matches = []
            
            for contact in all_contacts:
                if (query in contact.get('displayName', '').lower() or
                    query in contact.get('givenName', '').lower() or
                    query in contact.get('familyName', '').lower() or
                    query in str(contact.get('email', '')).lower() or
                    query in str(contact.get('phone', '')).lower()):
                    matches.append(contact)
                    
                if len(matches) >= max_results:
                    break
                    
            if not matches:
                return f"No contacts found matching '{query}'."
                
            return f"Search results for '{query}':\n\n{format_contacts_list(matches)}"
        except Exception as e:
            return f"Error: Failed to search contacts - {str(e)}"
    @mcp.tool()
    async def list_workspace_users(query: Optional[str] = None, max_results: int = 50) -> str:
        """List Google Workspace users in your organization's directory.
        
        This tool allows you to search and list users in your Google Workspace directory,
        including their email addresses and other information.
        
        Args:
            query: Optional search term to find specific users (name, email, etc.)
            max_results: Maximum number of results to return (default: 50)
        """
        service = init_service()
        if not service:
            return "Error: Google Contacts service is not available. Please check your credentials."
        
        try:
            workspace_users = service.list_directory_people(query=query, max_results=max_results)
            return format_directory_people(workspace_users, query)
        except Exception as e:
            return f"Error: Failed to list Google Workspace users - {str(e)}"
    @mcp.tool()
    async def search_directory(query: str, max_results: int = 20) -> str:
        """Search for people specifically in the Google Workspace directory.
        
        This performs a more targeted search of your organization's directory.
        
        Args:
            query: Search term to find specific directory members
            max_results: Maximum number of results to return (default: 20)
        """
        service = init_service()
        if not service:
            return "Error: Google Contacts service is not available. Please check your credentials."
        
        try:
            results = service.search_directory(query, max_results)
            return format_directory_people(results, query)
        except Exception as e:
            return f"Error: Failed to search directory - {str(e)}"
    @mcp.tool()
    async def get_other_contacts(max_results: int = 50) -> str:
        """Retrieve contacts from the 'Other contacts' section.
        
        Other contacts are people you've interacted with but haven't added to your contacts list.
        These often include email correspondents that aren't in your main contacts.
        
        Args:
            max_results: Maximum number of results to return (default: 50)
        """
        service = init_service()
        if not service:
            return "Error: Google Contacts service is not available. Please check your credentials."
        
        try:
            other_contacts = service.get_other_contacts(max_results)
            
            if not other_contacts:
                return "No 'Other contacts' found in your Google account."
            
            # Count how many have email addresses
            with_email = sum(1 for c in other_contacts if c.get('email'))
            
            # Format and return the results
            formatted_list = format_contacts_list(other_contacts)
            return f"Other Contacts (people you've interacted with but haven't added):\n\n{formatted_list}\n\n{with_email} of these contacts have email addresses."
        except Exception as e:
            return f"Error: Failed to retrieve other contacts - {str(e)}"