Skip to main content
Glama
mcp_gemini_client.py (6.79 kB)
"""Example chat client for the RELTIO ENTERPRISE MCP SERVER ONLY.

The script obtains an OAuth token from the Reltio auth server, opens a
streamable-HTTP MCP session against the tenant's MCP endpoint, exposes the
server's tools to a Gemini model and runs an interactive chat loop on stdin.
"""

import asyncio
import base64
import os
from collections import deque
from contextlib import AsyncExitStack
from typing import Deque, List, Optional

import requests
from google import genai
from google.genai import types
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

# Constants
MODEL_NAME: str = "gemini-2.0-flash"  # model name
MESSAGE_HISTORY_LIMIT = 10  # message history limit
CLIENT_ID = "client_id"  # reltio client id
CLIENT_SECRET = "client_secret"  # reltio client secret
# reltio auth server url
# for prod: https://auth.reltio.com, for stg: https://auth-stg.reltio.com
TOKEN_URL = "https://auth.reltio.com"
NAMESPACE = "namespace"  # reltio namespace eg test prod, etc
GOOGLE_API_KEY = "MODEL_API_KEY"  # model api key


def get_auth_token(timeout: float = 30.0) -> str:
    """Fetch an access token using the client-credentials grant.

    Args:
        timeout: Seconds to wait for the auth server before giving up.
            Without a timeout ``requests`` can block indefinitely.

    Returns:
        The ``access_token`` field of the JSON response.

    Raises:
        requests.HTTPError: If the auth server answers with an error status.
        requests.Timeout: If the auth server does not answer in time.
        KeyError: If the response JSON has no ``access_token`` field.
    """
    credentials = f"{CLIENT_ID}:{CLIENT_SECRET}"
    basic_token = base64.b64encode(credentials.encode()).decode()
    headers = {
        "Authorization": f"Basic {basic_token}",
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "application/json",
    }
    response = requests.post(
        f"{TOKEN_URL}/oauth/token?grant_type=client_credentials",
        headers=headers,
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()["access_token"]


class MCPChatClient:
    """Chat client that lets a Gemini model call tools on an MCP server."""

    def __init__(self) -> None:
        """Fetch an auth token and create the Gemini client and history."""
        # One exit stack owns every context entered by connect(), so that
        # cleanup() unwinds exactly what was entered, in reverse order.
        self._exit_stack = AsyncExitStack()
        self.session: Optional[ClientSession] = None
        self.token = get_auth_token()
        self.gemini = genai.Client(api_key=GOOGLE_API_KEY)
        # Bounded history: the oldest turns are dropped automatically.
        self.history: Deque[types.Content] = deque(maxlen=MESSAGE_HISTORY_LIMIT)

    async def connect(self, server_url: str) -> None:
        """Open the MCP session at *server_url* and print the tool names.

        If any step fails, the contexts entered so far stay registered on
        the exit stack and are released by ``cleanup()``.
        """
        headers = {"Authorization": f"Bearer {self.token}"}
        read_stream, write_stream, _ = await self._exit_stack.enter_async_context(
            streamablehttp_client(url=server_url, headers=headers)
        )
        self.session = await self._exit_stack.enter_async_context(
            ClientSession(read_stream, write_stream)
        )
        await self.session.initialize()

        tools = (await self.session.list_tools()).tools
        print(f"Connected to MCP server; available tools: {[tool.name for tool in tools]}")

    async def cleanup(self) -> None:
        """Close the MCP session and transport; safe to call if never connected."""
        self.session = None
        await self._exit_stack.aclose()

    def _filter_tool_schema(self, tool: types.Tool) -> types.Tool:
        """Strip keys Gemini's ``Schema`` does not define from a tool's input schema.

        The tool's ``inputSchema`` is modified in place and the same tool
        object is returned. Tools without an ``inputSchema`` or without a
        ``properties`` entry are returned untouched.

        NOTE(review): the annotation says ``types.Tool`` but ``process`` passes
        the tool objects returned by ``session.list_tools()``; confirm and
        adjust the annotation.
        NOTE(review): keys are matched against ``Schema.model_fields`` names
        only, so schema keywords spelled differently there are dropped as well.
        """
        schema = getattr(tool, "inputSchema", None)
        if not schema or "properties" not in schema:
            return tool

        allowed_keys = frozenset(types.Schema.model_fields)

        def prune(node) -> None:
            """Recursively drop unsupported keys from one schema dict."""
            if not isinstance(node, dict):
                return
            for key in [k for k in node if k not in allowed_keys]:
                del node[key]
            for key, value in node.items():
                if key == "properties" and isinstance(value, dict):
                    # A nested ``properties`` map is keyed by property *names*,
                    # not schema keywords: keep every name and only prune the
                    # sub-schemas, otherwise nested object fields vanish.
                    for sub_schema in value.values():
                        prune(sub_schema)
                elif isinstance(value, dict):
                    prune(value)
                elif isinstance(value, list):
                    for item in value:
                        prune(item)

        for prop_schema in schema.get("properties", {}).values():
            prune(prop_schema)
        return tool

    @staticmethod
    def _candidate_parts(response) -> list:
        """Return the parts of the first candidate, or ``[]`` if there are none."""
        candidates = getattr(response, "candidates", None) or []
        if not candidates:
            return []
        content = candidates[0].content
        if content is None:
            return []
        return list(content.parts or [])

    async def process(self, user_query: str) -> str:
        """Send *user_query* to Gemini, run any requested tools, return the reply.

        For every function call in the model's first answer, the tool is
        invoked over MCP, the call and its result are appended to the history
        and the model is asked once more for a final answer. Function calls
        in that follow-up answer are not executed.

        Raises:
            AssertionError: If ``connect()`` has not established a session.
        """
        # Raised explicitly (not via ``assert``) so the check survives ``-O``;
        # chat_loop relies on AssertionError to detect this condition.
        if self.session is None:
            raise AssertionError("MCP session not initialized")

        self.history.append(
            types.Content(role="user", parts=[types.Part(text=user_query)])
        )

        tools = (await self.session.list_tools()).tools
        filtered_tools = [self._filter_tool_schema(tool) for tool in tools]
        gemini_tools: List[types.Tool] = [
            types.Tool(
                function_declarations=[
                    types.FunctionDeclaration(
                        name=tool.name,
                        description=tool.description,
                        parameters=tool.inputSchema,
                    )
                ]
            )
            for tool in filtered_tools
        ]
        config = types.GenerateContentConfig(tools=gemini_tools)

        # Use the async client so the event loop (and the MCP transport
        # running on it) is not blocked while waiting for the model.
        base_response = await self.gemini.aio.models.generate_content(
            model=MODEL_NAME,
            contents=list(self.history),
            config=config,
        )

        chunks: List[str] = []
        for part in self._candidate_parts(base_response):
            function_call = part.function_call
            if not function_call:
                chunks.append(part.text or "")
                continue

            chunks.append(
                f"\nCalling tool: {function_call.name} with args: {function_call.args}\n"
            )
            result = await self.session.call_tool(function_call.name, function_call.args)

            # Gemini accepts only the "user" and "model" roles: the function
            # call is the model's turn, the function response goes back as a
            # user turn.
            self.history.append(
                types.Content(
                    role="model",
                    parts=[types.Part(function_call=function_call)],
                )
            )
            result_part = types.Part.from_function_response(
                name=function_call.name,
                response={"result": result},
            )
            self.history.append(types.Content(role="user", parts=[result_part]))

            final_resp = await self.gemini.aio.models.generate_content(
                model=MODEL_NAME,
                contents=list(self.history),
                config=config,
            )
            # Join every text part; parts without text contribute nothing.
            chunks.append(
                "".join(p.text or "" for p in self._candidate_parts(final_resp))
            )

        response_text = "".join(chunks)
        self.history.append(
            types.Content(role="model", parts=[types.Part(text=response_text)])
        )
        return response_text

    async def chat_loop(self) -> None:
        """Read queries from stdin until 'quit'/'exit' or end of input."""
        print("Starting chat session (type 'quit' to exit)\n")
        while True:
            try:
                user_input = input("You: ").strip()
            except EOFError:
                # stdin was closed (e.g. piped input ran out): end the chat.
                break
            if user_input.lower() in ("quit", "exit"):
                break
            try:
                answer = await self.process(user_input)
                print(f"Assistant: {answer}\n")
            except AssertionError as ae:
                print(f"Configuration error: {ae}")
                break
            except Exception as exc:
                # Top-level boundary: report and keep the chat alive.
                print(f"Error processing query: {exc}")


async def main() -> None:
    """Connect to the namespace's MCP endpoint and run the chat loop."""
    client = MCPChatClient()
    try:
        # Full URL like https://<ns>.reltio.com/ai/tools/mcp/
        mcp_server_url = f"https://{NAMESPACE}.reltio.com/ai/tools/mcp/"
        await client.connect(server_url=mcp_server_url)
        await client.chat_loop()
    finally:
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/reltio-ai/reltio-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.