Skip to main content
Glama

Norman Finance MCP Server

Official
get_token.py7.75 kB
#!/usr/bin/env python3 """Get an OAuth token from the Norman MCP server for testing.""" import sys import requests import json import base64 import hashlib import secrets import urllib.parse import webbrowser import http.server import socketserver import threading import time # Server configuration SERVER_HOST = "localhost" SERVER_PORT = 8989 CALLBACK_HOST = f"http://{SERVER_HOST}:{SERVER_PORT}" CALLBACK_PATH = "/callback" CALLBACK_URL = f"{CALLBACK_HOST}{CALLBACK_PATH}" # MCP Server configuration MCP_HOST = "localhost" MCP_PORT = 3001 MCP_URL = f"http://{MCP_HOST}:{MCP_PORT}" # Global variables to store token information access_token = None refresh_token = None token_event = threading.Event() def generate_code_verifier(): """Generate a code verifier for PKCE.""" code_verifier = secrets.token_urlsafe(43) # Generate at least 43 bytes of random data return code_verifier def generate_code_challenge(code_verifier): """Generate a code challenge from code verifier using S256 method.""" code_verifier_bytes = code_verifier.encode('ascii') hash_object = hashlib.sha256(code_verifier_bytes) code_challenge_bytes = hash_object.digest() code_challenge = base64.urlsafe_b64encode(code_challenge_bytes).decode('ascii').rstrip('=') return code_challenge def register_client(): """Register a test client with the server.""" client_id = f"test_client_{secrets.token_hex(4)}" register_url = f"{MCP_URL}/norman/register-client?client_id={client_id}&redirect_uri={urllib.parse.quote(CALLBACK_URL)}" print(f"Registering client with ID: {client_id}") response = requests.get(register_url) if response.status_code != 200: print(f"Failed to register client: {response.text}") sys.exit(1) client_data = response.json() print(f"Client registered successfully:") print(f" Client ID: {client_data['client_id']}") print(f" Client Secret: {client_data['client_secret']}") print(f" Redirect URI: {client_data['redirect_uri']}") return client_data class TokenHandler(http.server.BaseHTTPRequestHandler): """HTTP request handler to receive OAuth callback.""" def do_GET(self): """Handle GET request - this will be the OAuth callback.""" global access_token, refresh_token if not self.path.startswith(CALLBACK_PATH): self.send_response(404) self.end_headers() self.wfile.write(b"Not found") return print(f"Received callback: {self.path}") # Extract the code parameter query_params = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query) code = query_params.get('code', [''])[0] if not code: self.send_response(400) self.end_headers() self.wfile.write(b"No code parameter in callback") return print(f"Received authorization code: {code}") # Exchange code for token client_id = self.server.client_data["client_id"] client_secret = self.server.client_data["client_secret"] code_verifier = self.server.code_verifier token_url = f"{MCP_URL}/token" token_data = { "grant_type": "authorization_code", "client_id": client_id, "client_secret": client_secret, "code": code, "redirect_uri": CALLBACK_URL, "code_verifier": code_verifier } try: response = requests.post(token_url, data=token_data) if response.status_code != 200: error_html = f""" <html><body> <h1>Error getting token</h1> <p>Status code: {response.status_code}</p> <p>Response: {response.text}</p> </body></html> """.encode('utf-8') self.send_response(200) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(error_html)) self.end_headers() self.wfile.write(error_html) return token_response = response.json() access_token = token_response.get("access_token") refresh_token = token_response.get("refresh_token") print(f"Received access token: {access_token}") if refresh_token: print(f"Received refresh token: {refresh_token}") # Notify the main thread that we have the token token_event.set() # Return success page success_html = f""" <html><body> <h1>Authentication Successful!</h1> <p>You can now close this window and return to the terminal.</p> <h2>Token Information:</h2> <pre>{json.dumps(token_response, indent=2)}</pre> </body></html> """.encode('utf-8') self.send_response(200) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(success_html)) self.end_headers() self.wfile.write(success_html) except Exception as e: print(f"Error exchanging code for token: {str(e)}") self.send_response(500) self.end_headers() self.wfile.write(f"Error: {str(e)}".encode('utf-8')) def log_message(self, format, *args): """Override to disable logging.""" return def start_callback_server(client_data, code_verifier): """Start the callback server to receive the OAuth redirect.""" handler = TokenHandler with socketserver.TCPServer((SERVER_HOST, SERVER_PORT), handler) as httpd: print(f"Callback server started at {CALLBACK_URL}") # Add client data and code verifier to the server instance httpd.client_data = client_data httpd.code_verifier = code_verifier # Run the server until we get the token while not token_event.is_set(): httpd.handle_request() def main(): """Main entry point.""" # Register a client client_data = register_client() # Generate PKCE code verifier and challenge code_verifier = generate_code_verifier() code_challenge = generate_code_challenge(code_verifier) print(f"Code verifier: {code_verifier}") print(f"Code challenge: {code_challenge}") # Start the callback server in a separate thread server_thread = threading.Thread( target=start_callback_server, args=(client_data, code_verifier) ) server_thread.daemon = True server_thread.start() # Build the authorization URL client_id = client_data["client_id"] authorize_params = { "response_type": "code", "client_id": client_id, "redirect_uri": CALLBACK_URL, "code_challenge": code_challenge, "code_challenge_method": "S256", } authorize_url = f"{MCP_URL}/authorize?{urllib.parse.urlencode(authorize_params)}" print(f"Opening browser to authorize URL: {authorize_url}") webbrowser.open(authorize_url) # Wait for the token to be received print("Waiting for authorization...") token_event.wait(timeout=300) # 5 minutes timeout if not access_token: print("Failed to get access token within the timeout period.") return 1 print("\nAccesss token for use in debugging:") print(f"{access_token}") return 0 if __name__ == "__main__": sys.exit(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/norman-finance/norman-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server