Skip to main content
Glama

MCP Chat

agent_service.py2.34 kB
import asyncio from openai import AsyncOpenAI from agents import Agent, OpenAIChatCompletionsModel, Runner, RunResult from agents.tool import FunctionTool from mcp.types import Tool from core.tools import ToolManager from mcp_client import MCPClient async def convert_to_sdk_tool(tools_schema: list[Tool], mcp_clients: dict[str, MCPClient]) -> list[FunctionTool]: converted_tools = [] for tool in tools_schema: client = await ToolManager._find_client_with_tool( list(mcp_clients.values()), tool.name ) if client: converted_tools.append( FunctionTool( name=tool.name, description=tool.description or "", params_json_schema=tool.inputSchema, on_invoke_tool=ToolManager.execute_tool_dynamically(tool.name, client) ) ) else: raise ValueError(f"No client found for tool: {tool.name}") return converted_tools class AgentService: def __init__(self, model: str, api_key: str, base_url: str | None = None, clients=None): self.model = model self.api_key = api_key self.messages = [] self.client = AsyncOpenAI( api_key=api_key, base_url=base_url if base_url else None ) self.agent = Agent( name="Assistant", instructions="You are a helpful AI assistant that can use tools to answer questions.", model=OpenAIChatCompletionsModel( model=model, openai_client=self.client ), ) async def chat( self, query: str, system=None, mcp_clients: dict[str, MCPClient] = {}, ) -> RunResult: if system: self.agent.instructions = system tools = await ToolManager.get_all_tools(mcp_clients) if tools: self.agent.tools = await convert_to_sdk_tool(tools, mcp_clients) or [] # type: ignore self.messages.append({"role": "user", "content": query}) result = await Runner.run( self.agent, self.messages ) self.messages = result.to_input_list() return result

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Abdullah-1121/MCP-2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server