Skip to main content
Glama
inspirit941

Kakao Bot MCP Server

by inspirit941
kauth.py15.7 kB
"""OAuth 2.0 helpers for the Kakao API.

Covers building the authorization URL, exchanging an authorization code for
credentials, persisting credentials on disk (one JSON file per e-mail
address), refreshing expired access tokens, and fetching the user's profile.
"""

import argparse
import base64
import datetime
import json
import logging
import os
import time

import pydantic
import requests
from oauth2client.client import (
    Credentials,
    FlowExchangeError,
    OAuth2Credentials,
    flow_from_clientsecrets,
)

# Kakao API endpoints
KAKAO_AUTH_URL = "https://kauth.kakao.com/oauth/authorize"
KAKAO_TOKEN_URL = "https://kauth.kakao.com/oauth/token"
KAKAO_USER_INFO_URL = "https://kapi.kakao.com/v2/user/me"

# Upper bound (seconds) for every HTTP call made from this module, so that an
# unresponsive endpoint cannot block the caller indefinitely.
REQUEST_TIMEOUT_SECONDS = 10


def _get_cli_option(flag: str, default: str, help_text: str) -> str:
    """Return the value of a single ``--flag`` command-line option.

    Args:
        flag: Option name including the leading dashes, e.g. ``"--kauth-file"``.
        default: Value returned when the option is absent from the command line.
        help_text: Help string registered for the option.

    Returns:
        The option value as a string.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(flag, type=str, default=default, help=help_text)
    # parse_known_args ignores every other argument present on the command
    # line instead of aborting on them.
    args, _ = parser.parse_known_args()
    # argparse derives the attribute name from the flag: "--a-b" -> "a_b".
    return getattr(args, flag.lstrip("-").replace("-", "_"))


def get_kauth_file() -> str:
    """Return the path given by ``--kauth-file`` (default ``./.kauth.json``)."""
    return _get_cli_option(
        "--kauth-file", "./.kauth.json", "Path to client secrets file"
    )


def get_credentials_dir() -> str:
    """Return the directory given by ``--credentials-dir`` (default ``.``)."""
    return _get_cli_option(
        "--credentials-dir", ".", "Directory to store OAuth2 credentials"
    )


def get_accounts_file() -> str:
    """Return the path given by ``--accounts-file`` (default ``./.accounts.json``)."""
    return _get_cli_option(
        "--accounts-file",
        "./.accounts.json",
        "Path to accounts configuration file",
    )


CLIENTSECRETS_LOCATION = get_kauth_file()

# Configuration - should be moved to environment variables
REDIRECT_URI = "http://localhost:8000/code"
TOKEN_INFO_URL = "https://kapi.kakao.com/v1/user/access_token_info"
SCOPES = ["openid", "profile_nickname", "talk_message", "account_email"]


class GetCredentialsException(Exception):
    """Error raised when an error occurred while retrieving credentials.

    Attributes:
        authorization_url: Authorization URL to redirect the user to in order
            to request offline access.
    """

    def __init__(self, authorization_url):
        """Construct a GetCredentialsException."""
        super().__init__(authorization_url)
        self.authorization_url = authorization_url


class CodeExchangeException(GetCredentialsException):
    """Error raised when a code exchange has failed."""


class NoRefreshTokenException(GetCredentialsException):
    """Error raised when no refresh token has been found."""


class NoUserEmailException(Exception):
    """Error raised when no user e-mail address could be retrieved."""


class TokenRefreshError(Exception):
    """Error raised when token refresh fails."""


class AccountInfo(pydantic.BaseModel):
    """One configured account entry from the accounts file."""

    email: str
    account_type: str
    # Defaulted so that model_validate() accepts entries without "extra_info",
    # matching the optional parameter of __init__ below.
    extra_info: str = ""

    def __init__(self, email: str, account_type: str, extra_info: str = ""):
        super().__init__(email=email, account_type=account_type, extra_info=extra_info)

    def to_description(self):
        """Return a human-readable one-line description of the account."""
        return f"""Account for email: {self.email} of type: {self.account_type}. Extra info for: {self.extra_info}"""


def get_account_info() -> list[AccountInfo]:
    """Load the accounts file and return its ``"accounts"`` entries.

    Returns:
        One AccountInfo per entry; an empty list when the ``"accounts"`` key
        is missing.
    """
    accounts_file = get_accounts_file()
    with open(accounts_file) as f:
        data = json.load(f)
    accounts = data.get("accounts", [])
    return [AccountInfo.model_validate(acc) for acc in accounts]


def _get_credential_filename(email_address: str) -> str:
    """Return the path of the credential file used for ``email_address``."""
    creds_dir = get_credentials_dir()
    return os.path.join(creds_dir, f".oauth2.{email_address}.json")


def get_authorization_url(email_address: str, state: str):
    """Retrieve the authorization URL.

    Args:
        email_address: User's e-mail address (currently unused).
        state: State for the authorization URL.

    Returns:
        Authorization URL to redirect the user to.
    """
    flow = flow_from_clientsecrets(
        CLIENTSECRETS_LOCATION, " ".join(SCOPES), redirect_uri=REDIRECT_URI
    )
    if state != "":
        flow.params["state"] = state
    return flow.step1_get_authorize_url(state=state)


def get_stored_credentials(email_address: str) -> OAuth2Credentials | None:
    """Retrieve stored credentials for the provided e-mail address.

    Args:
        email_address: User's email address.

    Returns:
        Stored oauth2client.client.OAuth2Credentials if found, None when the
        file does not exist or cannot be read/parsed (the error is logged).
    """
    try:
        cred_file_path = _get_credential_filename(email_address=email_address)
        if not os.path.exists(cred_file_path):
            logging.warning(
                f"No stored Oauth2 credentials yet at path: {cred_file_path}"
            )
            return None

        with open(cred_file_path, "r") as f:
            data = f.read()
        return Credentials.new_from_json(data)
    except Exception as e:
        # Deliberate best-effort: a broken credential file is reported as
        # "no credentials" rather than crashing the caller.
        logging.error(e)
        return None


def store_credentials(credentials: OAuth2Credentials, email_address: str):
    """Store OAuth 2.0 credentials in the specified directory.

    Args:
        credentials: Credentials to serialize with ``to_json()``.
        email_address: E-mail address used to derive the file name.
    """
    cred_file_path = _get_credential_filename(email_address=email_address)
    directory = os.path.dirname(cred_file_path)
    # os.makedirs("") raises, so only create a directory when the path has one.
    if directory:
        os.makedirs(directory, exist_ok=True)

    data = credentials.to_json()
    with open(cred_file_path, "w") as f:
        f.write(data)


def exchange_code(authorization_code):
    """Exchange an authorization code for OAuth 2.0 credentials.

    Args:
        authorization_code: Authorization code to exchange for OAuth 2.0
            credentials.

    Returns:
        oauth2client.client.OAuth2Credentials instance.

    Raises:
        CodeExchangeException: an error occurred.
    """
    flow = flow_from_clientsecrets(CLIENTSECRETS_LOCATION, " ".join(SCOPES))
    flow.redirect_uri = REDIRECT_URI
    flow.token_info_uri = TOKEN_INFO_URL
    try:
        return flow.step2_exchange(authorization_code)
    except FlowExchangeError as error:
        logging.error("An error occurred: %s", error)
        # The authorization URL is filled in later by get_credentials().
        raise CodeExchangeException(None) from error


def _load_client_credentials() -> tuple[str, str]:
    """Read the client id and secret from the client secrets file.

    Returns:
        ``(client_id, client_secret)`` taken from the file's ``"web"`` section.

    Raises:
        TokenRefreshError: If either value is missing or empty.
    """
    with open(CLIENTSECRETS_LOCATION, "r") as f:
        web_section = json.load(f).get("web", {})

    client_id = web_section.get("client_id")
    client_secret = web_section.get("client_secret")
    if not client_id or not client_secret:
        logging.error("Client ID or secret not found in client secrets file")
        raise TokenRefreshError(
            "Client ID or secret not found in client secrets file"
        )
    return client_id, client_secret


def _decode_jwt_payload(id_token_jwt: str) -> dict:
    """Decode the payload segment of a JWT without verifying its signature.

    Args:
        id_token_jwt: Token in ``header.payload.signature`` form.

    Returns:
        The JSON-decoded payload.
    """
    payload_base64 = id_token_jwt.split(".")[1]
    # JWT segments drop base64 padding; restore it to a multiple of four.
    payload_base64 += "=" * (-len(payload_base64) % 4)
    payload_bytes = base64.urlsafe_b64decode(payload_base64)
    return json.loads(payload_bytes.decode("utf-8"))


def _apply_token_response(credentials: OAuth2Credentials, token_data: dict) -> None:
    """Copy the fields of a token-endpoint response onto ``credentials``.

    Args:
        credentials: Credentials object updated in place.
        token_data: Parsed JSON body of the refresh response.

    Raises:
        TokenRefreshError: If the response carries no access token.
    """
    access_token = token_data.get("access_token")
    if not access_token:
        # Never overwrite (and later persist) the token with None.
        logging.error("Token refresh response did not contain an access token")
        raise TokenRefreshError(
            "Token refresh response did not contain an access token"
        )
    credentials.access_token = access_token
    credentials.token_type = token_data.get("token_type", "bearer")

    id_token_jwt = token_data.get("id_token")
    if id_token_jwt:
        # Keep the raw JWT even when its payload cannot be parsed.
        credentials.id_token_jwt = id_token_jwt
        try:
            credentials.id_token = _decode_jwt_payload(id_token_jwt)
        except Exception as e:
            logging.warning(f"Failed to parse id_token JWT: {str(e)}")

    # Update refresh token only if a new one is provided in the response.
    new_refresh_token = token_data.get("refresh_token")
    if new_refresh_token:
        credentials.refresh_token = new_refresh_token

    expires_in = token_data.get("expires_in")
    if expires_in is not None:
        # Stored as a naive UTC datetime, the same form the original
        # utcnow()-based code produced, to avoid naive/aware comparisons.
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        expiry_time = now_utc + datetime.timedelta(seconds=int(expires_in))
        credentials.token_expiry = expiry_time
        expiry_str = expiry_time.strftime("%Y-%m-%dT%H:%M:%SZ")
        logging.info(f"New token expiry set to: {expiry_str}")

    # Update scopes - only if present in the response.
    scope_string = token_data.get("scope")
    if scope_string:
        credentials.scopes = scope_string.split()


def refresh_token(credentials: OAuth2Credentials, email_address: str):
    """Refresh the access token of the given credentials if expired.

    Args:
        credentials: OAuth2Credentials instance with refresh token
        email_address: User's e-mail address, used to store the refreshed
            credentials.

    Returns:
        Updated OAuth2Credentials instance (the same object, unchanged when
        the access token has not expired).

    Raises:
        TokenRefreshError: If token refresh fails
    """
    if not credentials.refresh_token:
        logging.error("No refresh token available")
        raise TokenRefreshError("No refresh token available")

    if not credentials.access_token_expired:
        # Token not expired, no need to refresh
        return credentials

    try:
        logging.info("Access token expired, refreshing...")
        client_id, client_secret = _load_client_credentials()

        refresh_data = {
            "grant_type": "refresh_token",
            "client_id": client_id,
            "client_secret": client_secret,
            "refresh_token": credentials.refresh_token,
        }
        response = requests.post(
            KAKAO_TOKEN_URL,
            data=refresh_data,
            headers={"Content-Type": "application/x-www-form-urlencoded;charset=utf-8"},
            timeout=REQUEST_TIMEOUT_SECONDS,
        )

        if response.status_code != 200:
            logging.error(f"Token refresh failed: {response.status_code}")
            logging.error(f"Response content: {response.text}")
            raise TokenRefreshError(f"Token refresh failed: {response.status_code}")

        _apply_token_response(credentials, response.json())

        # The credentials object was updated in place; persist it.
        store_credentials(credentials=credentials, email_address=email_address)
        logging.info("Successfully refreshed access token")
        return credentials
    except TokenRefreshError:
        # Already logged and already the right type: do not wrap it again.
        raise
    except Exception as e:
        logging.error(f"Token refresh failed: {str(e)}")
        raise TokenRefreshError(f"Error refreshing token: {str(e)}") from e


def get_user_info(credentials: OAuth2Credentials):
    """Send a request to the UserInfo API to retrieve the user's information.

    The access token is refreshed first when it has expired, and the
    credentials are stored under the e-mail address found in the response.

    Args:
        credentials: oauth2client.client.OAuth2Credentials instance to
            authorize the request.

    Returns:
        User information as a dict.

    Raises:
        TokenRefreshError: If the expired access token could not be refreshed.
        NoUserEmailException: If no e-mail address is available, either in the
            id_token (needed to refresh) or in the API response.
        Exception: If the request fails or returns a non-200 status.
    """
    try:
        if credentials.access_token_expired:
            logging.info("Access token is expired, refreshing")
            # id_token may be absent; fail with a meaningful error instead of
            # a TypeError/KeyError on the lookup.
            token_email = (credentials.id_token or {}).get("email")
            if not token_email:
                logging.error("No email found in id_token; cannot refresh token")
                raise NoUserEmailException()
            credentials = refresh_token(credentials, token_email)

        headers = {
            "Authorization": f"Bearer {credentials.access_token}",
            "Content-type": "application/x-www-form-urlencoded;charset=utf-8",
        }
        response = requests.get(
            KAKAO_USER_INFO_URL, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS
        )

        if response.status_code != 200:
            logging.error(f"Error fetching user info: {response.status_code}")
            logging.error(f"Response content: {response.text}")
            raise Exception(f"Error fetching user info: {response.status_code}")

        user_info = response.json()

        # The e-mail address is the key under which credentials are stored.
        if (
            not user_info
            or "kakao_account" not in user_info
            or "email" not in user_info["kakao_account"]
        ):
            logging.error("No user ID found in response")
            raise NoUserEmailException()

        email_address = user_info["kakao_account"]["email"]
        store_credentials(credentials, email_address=email_address)

        return user_info
    except TokenRefreshError:
        raise
    except Exception as e:
        logging.error(f"An error occurred retrieving user info: {e}")
        raise


def get_credentials(authorization_code: str, state: str):
    """Retrieve credentials using the provided authorization code.

    Exchanges the authorization code for credentials, queries the UserInfo
    API for the user's e-mail address, and stores the credentials on disk
    under that address.

    Args:
        authorization_code: Authorization code to use to retrieve an access
            token.
        state: State to set to the authorization URL in case of error.

    Returns:
        oauth2client.client.OAuth2Credentials instance.

    Raises:
        CodeExchangeException: Could not exchange the authorization code; its
            ``authorization_url`` is set to a fresh authorization URL.
        NoRefreshTokenException: No user e-mail address could be retrieved;
            carries the authorization URL to redirect the user to.
        Exception: Any other failure from get_user_info() is propagated.
    """
    email_address = ""
    try:
        credentials = exchange_code(authorization_code)

        try:
            user_info = get_user_info(credentials)
            if not (
                user_info
                and "kakao_account" in user_info
                and "email" in user_info["kakao_account"]
            ):
                raise NoUserEmailException()
            email_address = user_info["kakao_account"]["email"]
        except Exception as e:
            # Without the e-mail address there is no key to store under, so
            # the failure is logged and propagated.
            logging.warning(f"Failed to get user info: {e}")
            raise

        store_credentials(credentials, email_address=email_address)
        return credentials
    except CodeExchangeException as error:
        logging.error("An error occurred during code exchange.")
        # Attach the URL the user must visit to restart authorization.
        error.authorization_url = get_authorization_url(
            email_address=email_address, state=state
        )
        raise error
    except NoUserEmailException:
        logging.error("No user email could be retrieved.")
        authorization_url = get_authorization_url(
            email_address=email_address, state=state
        )
        raise NoRefreshTokenException(authorization_url)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/inspirit941/kakao-bot-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.