Skip to main content
Glama
agent_client.py23.7 kB
#!/usr/bin/env python3 """ Reltio MCP Client Chat Bot A CLI chat bot that connects to Reltio MCP server using OAuth 2.0 authentication and provides an interactive interface for querying Reltio data. """ import asyncio import base64 import http.server import os import signal import sys import threading import time import urllib.parse import webbrowser from collections import deque from datetime import datetime from typing import Any, Dict, List, Optional, Tuple import requests from langchain.chat_models import init_chat_model from langchain_core.messages import AIMessage, HumanMessage, ToolMessage from langchain_mcp_adapters.client import MultiServerMCPClient from langgraph.prebuilt import create_react_agent # ─────────── Configuration Constants ─────────── NAMESPACE="reltio_namespace" #namespace of the reltio instance RELTIO_CLIENT_ID = "client_id"#client id of the reltio instance RELTIO_CLIENT_SECRET = "client_secret"#client secret of the reltio instance # User Configuration - Only these need to be set by user MODEL_ID = "anthropic:claude-3-5-sonnet-20241022" # model id should be in the format of provider:model_id example: anthropic:claude-3-5-sonnet-20241022,google_genai:gemini-2.0-flash-001,openai:gpt-4o-mini API_KEY = "API_KEY" # User provides api key for the model provider # Environment variable names for different providers ENV_VAR_NAMES = { "openai": "OPENAI_API_KEY", "anthropic": "ANTHROPIC_API_KEY", "azure_openai": "AZURE_OPENAI_API_KEY", "google_genai": "GOOGLE_API_KEY" } ##use this to configure model settings based on your needs model_configs={ "temperature":0.1, # "max_tokens":300, # "top_p":1, } ##system prompt for the agent SYSTEM_PROMPT = """You are a helpful AI assistant that works with Reltio data. Provide detailed and comprehensive answers, don't skip important details. Be helpful, accurate, and thorough in your responses.""" # Global flag for graceful shutdown shutdown_flag = False DEBUG_MODE=False ##if you want to see the debug messages set it to True # ─────────── Signal Handling ─────────── def signal_handler(signum, frame): """Handle interrupt signals for graceful shutdown.""" global shutdown_flag print("\n🛑 Received interrupt signal. Shutting down gracefully...") shutdown_flag = True # Register signal handlers signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # ─────────── Utility Functions ─────────── def detect_provider_from_model(model_id: str) -> str: """Detect provider from model ID.""" model_lower = model_id.lower() if model_lower.startswith(('gpt-', 'text-embedding-', 'dall-e')): return "openai" elif model_lower.startswith(('claude-', 'sonnet', 'opus', 'haiku')): return "anthropic" elif model_lower.startswith(('gemini-', 'text-bison', 'chat-bison')): return "google_genai" elif model_lower.startswith(('gpt-4', 'gpt-35')): return "azure_openai" else: return "anthropic" def get_token_expiration_time(token_created_time: int, expires_in: int) -> str: """Get human-readable expiration time for token.""" expiration_timestamp = token_created_time + expires_in exp_time = datetime.fromtimestamp(expiration_timestamp) return exp_time.strftime("%Y-%m-%d %H:%M:%S") def is_token_expired(token_created_time: int, expires_in: int, buffer_minutes: int = 5) -> bool: """Check if token is expired or will expire within buffer time.""" current_time = int(time.time()) buffer_seconds = buffer_minutes * 60 expiration_time = token_created_time + expires_in return current_time + buffer_seconds >= expiration_time def get_user_input(prompt: str = "\n👤 You: ") -> Optional[str]: """Get user input with proper interrupt handling.""" try: return input(prompt).strip() except (KeyboardInterrupt, EOFError): print("\n👋 Goodbye!") global shutdown_flag shutdown_flag = True return None # ─────────── OAuth Handler ─────────── class OAuthRedirectHandler(http.server.BaseHTTPRequestHandler): """HTTP request handler for OAuth redirect.""" def do_GET(self) -> None: """Handle GET request for OAuth callback.""" parsed = urllib.parse.urlparse(self.path) query = urllib.parse.parse_qs(parsed.query) if "code" in query: code = query["code"][0] print(f"✅ Received auth code: {code}") self.send_response(200) self.end_headers() self.wfile.write(b"<h1>Authorization successful. You can close this window.</h1>") self.server.auth_code = code else: self.send_response(400) self.end_headers() self.wfile.write(b"<h1>Missing code.</h1>") def log_message(self, format: str, *args) -> None: """Suppress server logs.""" pass # ─────────── OAuth Functions ─────────── def run_temp_server(port: int = 8123, client_id: str = "reltio_ui", auth_endpoint: Optional[str] = None) -> Optional[str]: """Run a temporary HTTP server to capture OAuth redirect.""" ports_to_try = [port, 8124, 8125, 8126, 8127] for current_port in ports_to_try: server = None try: server = http.server.HTTPServer(('localhost', current_port), OAuthRedirectHandler) if auth_endpoint: auth_url = f"{auth_endpoint}?client_id={client_id}&redirect_uri=http://localhost:{current_port}/callback&response_type=code" else: auth_url = f"https://login.reltio.com?client_id={client_id}&redirect_uri=http://localhost:{current_port}/callback&response_type=code" print(f"🌐 Starting OAuth server on http://localhost:{current_port}/callback ...") time.sleep(0.5) # Ensure server is ready webbrowser.open(auth_url) print(f"🌐 Waiting for auth redirect...") # Use a timeout to prevent hanging server.timeout = 60 # 60 second timeout server.handle_request() return getattr(server, "auth_code", None) except OSError as e: if "Address already in use" in str(e): print(f"⚠️ Port {current_port} is busy, trying next port...") if server: try: server.server_close() except: pass continue else: print(f"❌ Server error: {e}") return None except KeyboardInterrupt: print("\n🛑 OAuth process interrupted by user") return None except Exception as e: print(f"❌ Error in OAuth server: {e}") return None finally: if server: try: server.server_close() except: pass print(f"❌ Could not find an available port. Tried: {ports_to_try}") return None def fetch_oauth_metadata(base_url: str) -> Tuple[bool, Any]: """Fetch OAuth metadata from the well-known endpoint.""" try: domain = base_url.replace('/ai/tools/mcp/', '') oauth_url = f"{domain}/ai/tools/.well-known/oauth-authorization-server" print(f"🔍 Fetching OAuth metadata from: {oauth_url}") response = requests.get(oauth_url, timeout=30) if response.status_code == 200: metadata = response.json() print(f"✅ OAuth metadata fetched successfully") return True, metadata else: return False, f"Status {response.status_code}: {response.text}" except Exception as e: return False, str(e) def exchange_code_for_token( token_endpoint: str, auth_code: str, client_id: str, client_secret: str ) -> Tuple[bool, Any]: """Exchange authorization code for access token.""" headers = { 'Content-Type': 'application/json', "Authorization": f"Basic {base64.b64encode(f'{client_id}:{client_secret}'.encode()).decode()}" } data = { 'grant_type': 'authorization_code', 'code': auth_code, } try: print(f"🔄 Exchanging auth code for token at: {token_endpoint}") response = requests.post(token_endpoint, json=data, headers=headers, timeout=30) if response.status_code == 200: token_data = response.json() print(f"✅ Access token obtained successfully") return True, token_data else: return False, f"Status {response.status_code}: {response.text}" except Exception as e: return False, str(e) # ─────────── Reltio MCP Client ─────────── class ReltioMcpClient: """Client for Reltio MCP server with OAuth authentication.""" def __init__(self, mcp_url: str, client_id: str, client_secret: str): self.mcp_url = mcp_url self.client_id = client_id self.client_secret = client_secret self.access_token: Optional[str] = None self.token_created_time: Optional[int] = None self.expires_in: Optional[int] = None self.token_endpoint: Optional[str] = None self.authorization_endpoint: Optional[str] = None self.oauth_metadata: Optional[Dict[str, Any]] = None def authenticate(self) -> bool: """Perform OAuth authentication flow.""" print(f"🔐 Starting OAuth authentication for: {self.mcp_url}") success, metadata = fetch_oauth_metadata(self.mcp_url) if not success: print(f"❌ Failed to fetch OAuth metadata: {metadata}") return False self.oauth_metadata = metadata self.authorization_endpoint = metadata.get("authorization_endpoint") self.token_endpoint = metadata.get("token_endpoint") if not self.authorization_endpoint or not self.token_endpoint: print(f"❌ Missing required OAuth endpoints in metadata") return False print(f"📍 Authorization endpoint: {self.authorization_endpoint}") print(f"📍 Token endpoint: {self.token_endpoint}") return self._perform_authentication() def _perform_authentication(self) -> bool: """Perform the actual authentication steps.""" auth_code = run_temp_server(client_id=self.client_id, auth_endpoint=self.authorization_endpoint) if not auth_code: print(f"❌ Failed to obtain authorization code") return False success, token_data = exchange_code_for_token( self.token_endpoint, auth_code, self.client_id, self.client_secret ) if not success: print(f"❌ Failed to exchange code for token: {token_data}") return False self.access_token = token_data.get("access_token") self.expires_in = token_data.get("expires_in") self.token_created_time = int(time.time()) if not self.access_token: print(f"❌ No access token in response") return False if not self.expires_in: print(f"⚠️ No expires_in in token response") self.expires_in = 3600 # Default to 1 hour if not provided exp_time = get_token_expiration_time(self.token_created_time, self.expires_in) print(f"⏰ Token expires at: {exp_time}") print(f"⏰ Token expires in: {self.expires_in} seconds") print(f"✅ Authentication successful!") return True def validate_and_refresh_token(self) -> bool: """Validate token and refresh if needed.""" if not self.access_token or not self.token_created_time or not self.expires_in: print("🔐 No access token available. Starting authentication...") return self.authenticate() if is_token_expired(self.token_created_time, self.expires_in): print("⏰ Token has expired. Starting re-authentication...") return self.authenticate() exp_time = get_token_expiration_time(self.token_created_time, self.expires_in) print(f"✅ Token is valid until: {exp_time}") return True def get_valid_token(self) -> Optional[str]: """Get a valid access token, refreshing if necessary.""" if self.validate_and_refresh_token(): return self.access_token return None # ─────────── Chat Bot ─────────── class ChatBot: """Interactive chat bot for Reltio MCP interactions.""" def __init__(self, agent, mcp_client: ReltioMcpClient, max_history: int = 20): self.agent = agent self.mcp_client = mcp_client self.message_history: deque = deque(maxlen=max_history) self.system_message = {"role": "system", "content": SYSTEM_PROMPT} def add_message(self, message) -> None: """Add a message to the history.""" self.message_history.append(message) def get_messages_for_agent(self) -> List[Dict[str, str]]: """Get messages in the format expected by the agent.""" messages = [self.system_message] messages.extend(list(self.message_history)) return messages async def chat_loop(self) -> None: """Main chat loop.""" self._print_welcome_message() while not shutdown_flag: try: if shutdown_flag: break user_input = get_user_input() if shutdown_flag or user_input is None: break if self._handle_special_commands(user_input): continue if not self.mcp_client.get_valid_token(): print("❌ Authentication failed. Please try again.") continue await self._process_user_message(user_input) except KeyboardInterrupt: print("\n👋 Interrupted by user") break except Exception as e: print(f"❌ Error: {e}") print("Please try again.") print("👋 Chat session ended.") def _print_welcome_message(self) -> None: """Print welcome message and available commands.""" print("🤖 Reltio MCP Chat Bot") print("=" * 50) print("Type 'quit', 'exit', or 'bye' to end the conversation") print("Type 'clear' to clear message history") print("Type 'history' to see recent messages") print("Type 'token' to check token status") print("Type 'reauth' to force re-authentication") print("Press Ctrl+C to exit at any time") print("-" * 50) def _handle_special_commands(self, user_input: str) -> bool: """Handle special commands and return True if command was handled.""" if user_input.lower() in ['quit', 'exit', 'bye']: print("👋 Goodbye!") global shutdown_flag shutdown_flag = True return True elif user_input.lower() == 'clear': self.message_history.clear() print("🗑️ Message history cleared!") return True elif user_input.lower() == 'history': self.show_history() return True elif user_input.lower() == 'token': self.show_token_status() return True elif user_input.lower() == 'reauth': print("🔄 Forcing re-authentication...") if self.mcp_client.authenticate(): print("✅ Re-authentication successful!") else: print("❌ Re-authentication failed!") return True elif not user_input: return True return False async def _process_user_message(self, user_input: str) -> None: """Process a user message and generate response.""" user_message = HumanMessage(content=user_input) self.add_message(user_message) messages = self.get_messages_for_agent() print("🤖 Bot: Thinking...") response = await self.agent.ainvoke({"messages": messages}) if "messages" in response: for msg in response["messages"]: if isinstance(msg, (AIMessage, HumanMessage, ToolMessage)): self.add_message(msg) combined_response = self._combine_response_messages(response.get("messages", [])) if combined_response: print(f"🤖 Bot: {combined_response}") else: print("🤖 Bot: Sorry, I couldn't generate a response.") def _combine_response_messages(self, messages: List) -> str: """Combine all messages from the response into a readable format.""" if not messages: return "" response_parts = [] for msg in messages[::-1]: if isinstance(msg, AIMessage): if msg.content: if isinstance(msg.content, list): combined_content = self._process_content_list(msg.content) if combined_content: response_parts.append(combined_content) else: content = str(msg.content) if msg.content else "" if content: response_parts.append(content) elif isinstance(msg, ToolMessage): continue elif isinstance(msg, HumanMessage): break string_parts = [str(part) for part in response_parts[::-1] if part] return "\n".join(string_parts) if string_parts else "" def _process_content_list(self, content_list: List) -> str: """Process a list of content elements and combine them intelligently.""" if not content_list: return "" combined_parts = [] for item in content_list: if isinstance(item, dict): if item.get('type') == 'text' and item.get('text'): combined_parts.append(item['text']) elif item.get('type') == 'tool_use': tool_name = item.get('name', 'Unknown Tool') tool_input = item.get('input', {}) tool_info = f"[Tool: {tool_name}] {tool_input}" combined_parts.append(tool_info) return " ".join(combined_parts) if combined_parts else "" def show_history(self) -> None: """Show recent message history.""" if not self.message_history: print("📝 No message history yet.") return print("\n📝 Recent Messages:") print("-" * 30) for i, msg in enumerate(self.message_history, 1): role_emoji, role_name = self._get_message_role_info(msg) content = msg.content[:100] + "..." if len(msg.content) > 100 else msg.content print(f"{i}. {role_emoji} {role_name}: {content}") print("-" * 30) def _get_message_role_info(self, msg) -> Tuple[str, str]: """Get emoji and role name for a message.""" if isinstance(msg, HumanMessage): return "👤", "User" elif isinstance(msg, AIMessage): return "🤖", "Assistant" elif isinstance(msg, ToolMessage): return "🔧", "Tool" else: return "❓", "Unknown" def show_token_status(self) -> None: """Show current token status.""" if not self.mcp_client.access_token: print("🔑 No access token available") return if not self.mcp_client.token_created_time or not self.mcp_client.expires_in: print("🔑 Token information incomplete") return exp_time = get_token_expiration_time(self.mcp_client.token_created_time, self.mcp_client.expires_in) is_expired = is_token_expired(self.mcp_client.token_created_time, self.mcp_client.expires_in) print("\n🔑 Token Status:") print("-" * 20) print(f"Status: {'❌ Expired' if is_expired else '✅ Valid'}") print(f"Expires: {exp_time}") print(f"Expires in: {self.mcp_client.expires_in} seconds") print(f"Token: {self.mcp_client.access_token[:20]}...") print("-" * 20) # ─────────── Main Logic ─────────── async def main() -> None: """Main function to run the Reltio MCP chat bot.""" RELTIO_MCP_SERVER = f"https://{NAMESPACE}.reltio.com/ai/tools/mcp/" try: config = { "RELTIO_MCP_SERVER": RELTIO_MCP_SERVER.rstrip('/') + '/', "RELTIO_CLIENT_ID": RELTIO_CLIENT_ID, "RELTIO_CLIENT_SECRET": RELTIO_CLIENT_SECRET, "MODEL_ID": MODEL_ID, "API_KEY": API_KEY } _print_configuration(config) provider = detect_provider_from_model(config['MODEL_ID']) os.environ[ENV_VAR_NAMES[provider]] = config["API_KEY"] client = ReltioMcpClient( config["RELTIO_MCP_SERVER"], config["RELTIO_CLIENT_ID"], config["RELTIO_CLIENT_SECRET"] ) if not client.authenticate(): print("❌ Authentication failed. Exiting.") return mcp_servers = { "reltio_server": { "transport": "streamable_http", "url": config["RELTIO_MCP_SERVER"], "headers": {"Authorization": f"Bearer {client.access_token}"}, } } mcp_client = MultiServerMCPClient(mcp_servers) tools = await mcp_client.get_tools() print("✅ Loaded tools:", [t.name for t in tools]) model = init_chat_model(config["MODEL_ID"],**model_configs) agent = create_react_agent(model, tools=tools,debug=DEBUG_MODE) chat_bot = ChatBot(agent, client) await chat_bot.chat_loop() except KeyboardInterrupt: print("\n👋 Application interrupted by user") except Exception as e: print(f"❌ Fatal error: {e}") finally: print("🔄 Cleaning up resources...") def _print_configuration(config: Dict[str, str]) -> None: """Print configuration information.""" print("🔧 Using Configuration:") print(f" MCP Server: {config['RELTIO_MCP_SERVER']}") print(f" Client ID: {config['RELTIO_CLIENT_ID']}") print(f" Provider: {detect_provider_from_model(config['MODEL_ID'])}") print(f" Model: {config['MODEL_ID']}") print() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\n👋 Application terminated by user") except Exception as e: print(f"❌ Unexpected error: {e}") finally: print("🏁 Application shutdown complete")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/reltio-ai/reltio-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server