Skip to main content
Glama

MCP Everything

by s2005
mcp_client.py4.88 kB
import asyncio import sys from mcp.client.stdio import stdio_client from mcp import StdioServerParameters # Changed import from mcp.client.session import ClientSession from mcp.types import TextContent async def call_echo_tool(session: ClientSession, message: str) -> str: """Calls the 'echo' tool on the server and returns the text response.""" print(f"[CLIENT_LOG] Attempting to call 'echo' tool with message: '{message}'", flush=True) try: tool_result = await session.call_tool(name="echo", arguments={"message": message}) print(f"[CLIENT_LOG] 'echo' tool raw result: {tool_result}", flush=True) if tool_result.content and isinstance(tool_result.content, list) and len(tool_result.content) > 0: first_content = tool_result.content[0] if isinstance(first_content, TextContent) and hasattr(first_content, 'text'): print("[CLIENT_LOG] Parsed TextContent from 'echo' tool.", flush=True) return first_content.text elif isinstance(first_content, dict) and 'text' in first_content: print("[CLIENT_LOG] Parsed dict from 'echo' tool.", flush=True) return first_content['text'] elif isinstance(first_content, str): print("[CLIENT_LOG] Parsed str from 'echo' tool.", flush=True) return first_content error_message = f"Error: Could not parse echo tool result or result was empty. Result: {tool_result}" print(error_message, file=sys.stderr, flush=True) return error_message except Exception as e: error_detail = f"Exception during 'echo' tool call: {e}" print(error_detail, file=sys.stderr, flush=True) return error_detail async def main(): server_executable = sys.executable # Changed variable name server_args = ["mcp_server.py"] # Changed variable name # Use StdioServerParameters and pass command and args separately server_params = StdioServerParameters(command=server_executable, args=server_args, cwd=".") # Changed class and params try: print("Attempting to start and connect to MCP server...") async with stdio_client(server_params) as (reader, writer): print("Connected to server's stdio. Initializing client session...") async with ClientSession(reader, writer) as session: print("Client session started. Initializing MCP session...") init_response = await session.initialize() if init_response and init_response.server_info: print(f"Server initialized: Name={init_response.server_info.name}, Version={init_response.server_info.version}") else: print("Server initialization response not as expected or failed.", file=sys.stderr) return echo_message = "Hello MCP!" print(f"\nCalling 'echo' tool with message: '{echo_message}'") echo_response = await call_echo_tool(session, echo_message) print(f"--> 'echo' tool response: '{echo_response}'") a_val, b_val = 5, 7 print(f"\nCalling 'add' tool with a={a_val}, b={b_val}") add_response_structured = await session.call_tool(name="add", arguments={"a": a_val, "b": b_val}) add_response_text = f"Error: Could not parse 'add' tool result. Result: {add_response_structured}" if add_response_structured.content and \ isinstance(add_response_structured.content, list) and \ len(add_response_structured.content) > 0: first_content_add = add_response_structured.content[0] if isinstance(first_content_add, TextContent) and hasattr(first_content_add, 'text'): add_response_text = first_content_add.text elif isinstance(first_content_add, dict) and 'text' in first_content_add: add_response_text = first_content_add['text'] print(f"--> 'add' tool response: '{add_response_text}'") print("\nClient operations complete.") except ConnectionRefusedError: print("Connection to MCP server failed. Ensure the server script (mcp_server.py) is executable and correctly configured.", file=sys.stderr) except FileNotFoundError: # Corrected to refer to server_args[0] for the script name print(f"Error: Server script '{server_args[0]}' not found in the current directory. Ensure paths are correct. CWD for server: {server_params.cwd}", file=sys.stderr) except Exception as e: print(f"An unexpected error occurred: {e}", file=sys.stderr) finally: print("Client finished.") if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/s2005/mcp-everything'

If you have feedback or need assistance with the MCP directory API, please join our Discord server