Skip to main content
Glama
mcp_claude_client.py5.48 kB
""" This Client Example is for RELTIO ENTERPRISE MCP SERVER ONLY """ import asyncio import base64 import requests from typing import Optional from contextlib import AsyncExitStack from anthropic import Anthropic from dotenv import load_dotenv from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client load_dotenv() # Constants NAMESPACE="namepace" #reltio namspace eg test prod, etc MODEL_NAME = "model_name" #claude-3-5-sonnet-20241022 MESSAGE_HISTORY_LIMIT = 10 CLIENT_ID ="client_id" #reltio client id CLIENT_SECRET = "client_secret" #reltio client secret AUTH_SERVER_URL = "https://auth.reltio.com" #reltio auth server url for prod: https://auth.reltio.com, for stg: https://auth-stg.reltio.com API_KEY="api_key" #model api key def get_auth_token() -> str: """Fetch access token using client credentials.""" credentials = f"{CLIENT_ID}:{CLIENT_SECRET}" basic_token = base64.b64encode(credentials.encode()).decode() headers = { "Authorization": f"Basic {basic_token}", "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json", } response = requests.post(f"{AUTH_SERVER_URL}/oauth/token?grant_type=client_credentials", headers=headers) response.raise_for_status() token_payload = response.json() return token_payload["access_token"] class MCPClient: def __init__(self): self.session: Optional[ClientSession] = None self.anthropic = Anthropic(api_key=API_KEY) self.exit_stack = AsyncExitStack() self.messages = [] self.token = get_auth_token() async def connect_to_streamable_http_server(self, server_url: str) -> None: """Establish a Streamable HTTP connection to the MCP server and initialize the client session.""" headers = {"Authorization": f"Bearer {self.token}"} self._streams_context = streamablehttp_client(url=server_url, headers=headers) read_stream, write_stream, _ = await self._streams_context.__aenter__() self._session_context = ClientSession(read_stream, write_stream) self.session: ClientSession = await self._session_context.__aenter__() await self.session.initialize() response = await self.session.list_tools() tool_names = [tool.name for tool in response.tools] print("Initialized Streamable HTTP client...") print("Connected tools:", tool_names) async def cleanup(self) -> None: """Clean up resources and exit contexts properly.""" if self._session_context: await self._session_context.__aexit__(None, None, None) if self._streams_context: await self._streams_context.__aexit__(None, None, None) async def process_query(self, query: str) -> str: """Send a query to Claude, handle any tool invocations, and return the response.""" self.messages.append({"role": "user", "content": query}) response = await self.session.list_tools() tools = [ {"name": tool.name, "description": tool.description, "input_schema": tool.inputSchema} for tool in response.tools ] final_text = [] claude_response = self.anthropic.messages.create( model=MODEL_NAME, max_tokens=1000, messages=self.messages, tools=tools, ) for content in claude_response.content: if content.type == "text": final_text.append(content.text) elif content.type == "tool_use": tool_name = content.name tool_args = content.input result = await self.session.call_tool(tool_name, tool_args) final_text.append(f"[Calling tool {tool_name} with args {tool_args}]") if getattr(content, "text", None): self.messages.append({"role": "assistant", "content": content.text}) self.messages.append({"role": "user", "content": result.content}) claude_followup = self.anthropic.messages.create( model=MODEL_NAME, max_tokens=1000, messages=self.messages, ) if claude_followup.content: followup_text = claude_followup.content[0].text final_text.append(followup_text) assistant_reply = "\n".join(final_text) self.messages.append({"role": "assistant", "content": assistant_reply}) self.messages = self.messages[-MESSAGE_HISTORY_LIMIT:] return assistant_reply async def chat_loop(self) -> None: """Run an interactive REPL for chatting with Claude.""" print("\nMCP Client Started!") print("Type your queries or 'quit' to exit.") while True: try: query = input("\nQuery: ").strip() if query.lower() == "quit": break response = await self.process_query(query) print("\n" + response) except Exception as err: print(f"\nError: {str(err)}") async def main(): client = MCPClient() mcp_url=f"https://{NAMESPACE}.reltio.com/ai/tools/mcp/" try: await client.connect_to_streamable_http_server(mcp_url) await client.chat_loop() finally: await client.cleanup() if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/reltio-ai/reltio-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server