Skip to main content
Glama

MCP Google Contacts Server

by RayanZaki
google_contacts_service.py22.5 kB
import json import os from typing import Dict, List, Optional, Union, Any from pathlib import Path from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build from google.auth.transport.requests import Request from googleapiclient.errors import HttpError from mcp_google_contacts_server.config import config class GoogleContactsError(Exception): """Exception raised for errors in the Google Contacts service.""" pass class GoogleContactsService: """Service to interact with Google Contacts API.""" def __init__(self, credentials_info: Optional[Dict[str, Any]] = None, token_path: Optional[Path] = None): """Initialize the Google Contacts service with credentials info. Args: credentials_info: OAuth client credentials information token_path: Path to store the token file """ self.credentials_info = credentials_info self.token_path = token_path or config.token_path self.service = self._authenticate() @classmethod def from_file(cls, credentials_path: Union[str, Path], token_path: Optional[Path] = None) -> 'GoogleContactsService': """Create service instance from a credentials file. Args: credentials_path: Path to the credentials.json file token_path: Optional custom path to store the token Returns: Configured GoogleContactsService instance Raises: GoogleContactsError: If credentials file cannot be read """ try: # Load the credentials from the provided file with open(credentials_path, 'r') as file: credentials_info = json.load(file) return cls(credentials_info, token_path) except (FileNotFoundError, json.JSONDecodeError, IOError) as e: raise GoogleContactsError(f"Failed to load credentials from {credentials_path}: {str(e)}") @classmethod def from_env(cls, token_path: Optional[Path] = None) -> 'GoogleContactsService': """Create service instance from environment variables. Args: token_path: Optional custom path to store the token Returns: Configured GoogleContactsService instance Raises: GoogleContactsError: If required environment variables are missing """ client_id = os.environ.get("GOOGLE_CLIENT_ID") or config.google_client_id client_secret = os.environ.get("GOOGLE_CLIENT_SECRET") or config.google_client_secret if not client_id or not client_secret: raise GoogleContactsError( "Missing Google API credentials. Set GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET " "environment variables or provide a credentials file." ) # Build credentials info from environment variables credentials_info = { "installed": { "client_id": client_id, "client_secret": client_secret, "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "redirect_uris": ["urn:ietf:wg:oauth:2.0:oob", "http://localhost"] } } return cls(credentials_info, token_path) def _authenticate(self): """Authenticate with Google using credentials info. Returns: Authenticated Google service client Raises: GoogleContactsError: If authentication fails """ try: creds = None token_path = self.token_path # Ensure token directory exists token_path.parent.mkdir(parents=True, exist_ok=True) # Check if we have existing token if token_path.exists(): with open(token_path, 'r') as token_file: creds = Credentials.from_authorized_user_info( json.load(token_file), config.scopes) # Check for refresh token in environment refresh_token = os.environ.get("GOOGLE_REFRESH_TOKEN") or config.google_refresh_token if not creds and refresh_token and self.credentials_info: client_id = self.credentials_info["installed"]["client_id"] client_secret = self.credentials_info["installed"]["client_secret"] creds = Credentials( None, # No access token initially refresh_token=refresh_token, token_uri="https://oauth2.googleapis.com/token", client_id=client_id, client_secret=client_secret, scopes=config.scopes ) # If credentials don't exist or are invalid, go through auth flow if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: if not self.credentials_info: raise GoogleContactsError( "No valid credentials found and no credentials info provided for authentication." ) flow = InstalledAppFlow.from_client_config( self.credentials_info, config.scopes) creds = flow.run_local_server(port=0) # Save the credentials for future use with open(token_path, 'w') as token: token.write(creds.to_json()) # Output refresh token for environment variable setup if creds.refresh_token: print("\nNew refresh token obtained. Consider setting this in your environment:") print(f"GOOGLE_REFRESH_TOKEN={creds.refresh_token}\n") # Build and return the Google Contacts service return build('people', 'v1', credentials=creds) except Exception as e: raise GoogleContactsError(f"Authentication failed: {str(e)}") def list_contacts(self, name_filter: Optional[str] = None, max_results: int = None) -> List[Dict[str, Any]]: """List contacts, optionally filtering by name. Args: name_filter: Optional filter to find contacts by name max_results: Maximum number of results to return Returns: List of contact dictionaries Raises: GoogleContactsError: If API request fails """ max_results = max_results or config.default_max_results try: # Get list of connections (contacts) results = self.service.people().connections().list( resourceName='people/me', pageSize=max_results, personFields='names,emailAddresses,phoneNumbers', sortOrder='FIRST_NAME_ASCENDING' ).execute() connections = results.get('connections', []) if not connections: return [] contacts = [] for person in connections: names = person.get('names', []) if not names: continue name = names[0] given_name = name.get('givenName', '') family_name = name.get('familyName', '') display_name = name.get('displayName', '') # Apply name filter if provided if name_filter and name_filter.lower() not in display_name.lower(): continue # Get email addresses emails = person.get('emailAddresses', []) email = emails[0].get('value') if emails else None # Get phone numbers phones = person.get('phoneNumbers', []) phone = phones[0].get('value') if phones else None contacts.append({ 'resourceName': person.get('resourceName'), 'givenName': given_name, 'familyName': family_name, 'displayName': display_name, 'email': email, 'phone': phone }) return contacts except HttpError as error: raise GoogleContactsError(f"Error listing contacts: {error}") def get_contact(self, identifier: str, include_email: bool = True, use_directory_api: bool = False) -> Dict[str, Any]: """Get a contact by resource name or email. Args: identifier: Resource name (people/*) or email address include_email: Whether to include email addresses use_directory_api: Whether to try the directory API as well Returns: Contact dictionary Raises: GoogleContactsError: If contact cannot be found or API request fails """ try: if identifier.startswith('people/'): # Determine which API to use based on parameters if use_directory_api: # For directory contacts try: person = self.service.people().people().get( resourceName=identifier, personFields='names,emailAddresses,phoneNumbers,organizations' ).execute() except HttpError: # Fall back to standard contacts API if directory API fails person = self.service.people().get( resourceName=identifier, personFields='names,emailAddresses,phoneNumbers' ).execute() else: # Standard contacts API person = self.service.people().get( resourceName=identifier, personFields='names,emailAddresses,phoneNumbers' ).execute() return self._format_contact(person) else: # Assume it's an email address and search for it contacts = self.list_contacts() for contact in contacts: if contact.get('email') == identifier: return contact # If not found in regular contacts, try directory if use_directory_api: directory_users = self.list_directory_people(query=identifier, max_results=1) if directory_users: return directory_users[0] raise GoogleContactsError(f"Contact with email {identifier} not found") except HttpError as error: raise GoogleContactsError(f"Error getting contact: {error}") def create_contact(self, given_name: str, family_name: Optional[str] = None, email: Optional[str] = None, phone: Optional[str] = None) -> Dict: """Create a new contact.""" try: contact_body = { 'names': [ { 'givenName': given_name, 'familyName': family_name or '' } ] } if email: contact_body['emailAddresses'] = [{'value': email}] if phone: contact_body['phoneNumbers'] = [{'value': phone}] person = self.service.people().createContact( body=contact_body ).execute() return self._format_contact(person) except HttpError as error: raise GoogleContactsError(f"Error creating contact: {error}") def update_contact(self, resource_name: str, given_name: Optional[str] = None, family_name: Optional[str] = None, email: Optional[str] = None, phone: Optional[str] = None) -> Dict: """Update an existing contact.""" try: # Get the etag for the contact first person = self.service.people().get( resourceName=resource_name, personFields='names,emailAddresses,phoneNumbers' ).execute() etag = person.get('etag') # Prepare update masks and body update_person = {'etag': etag, 'resourceName': resource_name} update_fields = [] # Update name if provided if given_name or family_name: current_name = person.get('names', [{}])[0] update_person['names'] = [{ 'givenName': given_name if given_name is not None else current_name.get('givenName', ''), 'familyName': family_name if family_name is not None else current_name.get('familyName', '') }] update_fields.append('names') # Update email if provided if email: update_person['emailAddresses'] = [{'value': email}] update_fields.append('emailAddresses') # Update phone if provided if phone: update_person['phoneNumbers'] = [{'value': phone}] update_fields.append('phoneNumbers') # Execute update if update_fields: updated_person = self.service.people().updateContact( resourceName=resource_name, updatePersonFields=','.join(update_fields), body=update_person ).execute() return self._format_contact(updated_person) else: return self._format_contact(person) except HttpError as error: raise GoogleContactsError(f"Error updating contact: {error}") def delete_contact(self, resource_name: str) -> Dict: """Delete a contact by resource name.""" try: self.service.people().deleteContact( resourceName=resource_name ).execute() return {'success': True, 'resourceName': resource_name} except HttpError as error: raise GoogleContactsError(f"Error deleting contact: {error}") def list_directory_people(self, query: Optional[str] = None, max_results: int = 50) -> List[Dict]: """List people from the Google Workspace directory. Args: query: Optional search query to filter directory results max_results: Maximum number of results to return Returns: List of formatted directory contact dictionaries """ try: # Check if directory API access is available directory_fields = 'names,emailAddresses,organizations,phoneNumbers' # Build the request, with or without a query if query: request = self.service.people().searchDirectoryPeople( query=query, readMask=directory_fields, sources=['DIRECTORY_SOURCE_TYPE_DOMAIN_CONTACT', 'DIRECTORY_SOURCE_TYPE_DOMAIN_PROFILE'], pageSize=max_results ) else: request = self.service.people().listDirectoryPeople( readMask=directory_fields, sources=['DIRECTORY_SOURCE_TYPE_DOMAIN_CONTACT', 'DIRECTORY_SOURCE_TYPE_DOMAIN_PROFILE'], pageSize=max_results ) # Execute the request response = request.execute() print("response; ", response) # Process the results people = response.get('people', []) if not people: return [] # Format each person entry directory_contacts = [] for person in people: contact = self._format_directory_person(person) directory_contacts.append(contact) return directory_contacts except HttpError as error: # Handle gracefully if not a Google Workspace account if error.resp.status == 403: print("Directory API access forbidden. This may not be a Google Workspace account.") return [] raise Exception(f"Error listing directory people: {error}") def search_directory(self, query: str, max_results: int = 20) -> List[Dict]: """Search for people in the Google Workspace directory. This is a more focused search function that uses the searchDirectoryPeople endpoint. Args: query: Search query to find specific users max_results: Maximum number of results to return Returns: List of matching directory contact dictionaries """ try: response = self.service.people().searchDirectoryPeople( query=query, readMask='names,emailAddresses,organizations,phoneNumbers', sources=['DIRECTORY_SOURCE_TYPE_DOMAIN_CONTACT', 'DIRECTORY_SOURCE_TYPE_DOMAIN_PROFILE'], pageSize=max_results ).execute() people = response.get('people', []) if not people: return [] # Format the results directory_results = [] for person in people: contact = self._format_directory_person(person) directory_results.append(contact) return directory_results except HttpError as error: if error.resp.status == 403: print("Directory search access forbidden. This may not be a Google Workspace account.") return [] raise Exception(f"Error searching directory: {error}") def get_other_contacts(self, max_results: int = 100) -> List[Dict]: """Get contacts from the 'Other contacts' section of Google Contacts. These are contacts that the user has interacted with but has not added to their contacts. Args: max_results: Maximum number of results to return Returns: List of other contact dictionaries """ try: response = self.service.otherContacts().list( readMask='names,emailAddresses,phoneNumbers', pageSize=max_results ).execute() other_contacts = response.get('otherContacts', []) if not other_contacts: return [] # Format the results contacts = [] for person in other_contacts: contact = self._format_contact(person) contacts.append(contact) return contacts except HttpError as error: raise Exception(f"Error getting other contacts: {error}") def _format_contact(self, person: Dict) -> Dict: """Format a Google People API person object into a simplified contact.""" names = person.get('names', []) emails = person.get('emailAddresses', []) phones = person.get('phoneNumbers', []) given_name = names[0].get('givenName', '') if names else '' family_name = names[0].get('familyName', '') if names else '' display_name = names[0].get('displayName', '') if names else f"{given_name} {family_name}".strip() return { 'resourceName': person.get('resourceName'), 'givenName': given_name, 'familyName': family_name, 'displayName': display_name, 'email': emails[0].get('value') if emails else None, 'phone': phones[0].get('value') if phones else None } def _format_directory_person(self, person: Dict) -> Dict: """Format a Google Directory API person object into a simplified contact. This handles the specific format of directory contacts which may have different organization and other fields compared to regular contacts. """ names = person.get('names', []) emails = person.get('emailAddresses', []) phones = person.get('phoneNumbers', []) orgs = person.get('organizations', []) given_name = names[0].get('givenName', '') if names else '' family_name = names[0].get('familyName', '') if names else '' display_name = names[0].get('displayName', '') if names else f"{given_name} {family_name}".strip() # Get organization details - these are often present in directory entries department = '' job_title = '' if orgs: department = orgs[0].get('department', '') job_title = orgs[0].get('title', '') return { 'resourceName': person.get('resourceName'), 'givenName': given_name, 'familyName': family_name, 'displayName': display_name, 'email': emails[0].get('value') if emails else None, 'phone': phones[0].get('value') if phones else None, 'department': department, 'jobTitle': job_title }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/RayanZaki/mcp-google-contacts-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server