
K8s MCP

by rahul007-bit
mcp-client-together.py (11.3 kB)
""" MCP Client with Together AI - STREAMING VERSION (FIXED) """ import asyncio import json import os from typing import List, Dict, Any from fastmcp import Client from together import Together class TogetherMCPClient: def __init__(self, mcp_server_url: str = "http://localhost:8000/sse", model: str = "meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", api_key: str = None): self.mcp_server_url = mcp_server_url self.model = model self.api_key = api_key or os.getenv("TOGETHER_API_KEY") self.together_client = Together(api_key=self.api_key) self.mcp_client = None self.available_tools = [] self.conversation_history = [] async def connect(self): """Connect to the MCP server and discover available tools.""" self.mcp_client = Client(self.mcp_server_url) await self.mcp_client.__aenter__() tools_response = await self.mcp_client.list_tools() self.available_tools = tools_response print(f"✓ Connected to {self.mcp_server_url}") print(f"✓ Found {len(self.available_tools)} tools:") for tool in self.available_tools: print(f" • {tool.name}") print() async def disconnect(self): if self.mcp_client: await self.mcp_client.__aexit__(None, None, None) def _format_tools_for_together(self) -> List[Dict[str, Any]]: """Convert MCP tools to Together AI format.""" together_tools = [] for tool in self.available_tools: properties = {} required = [] if hasattr(tool, 'inputSchema') and tool.inputSchema: schema = tool.inputSchema properties = schema.get('properties', {}) required = schema.get('required', []) together_tools.append({ 'type': 'function', 'function': { 'name': tool.name, 'description': tool.description or f"Execute {tool.name}", 'parameters': { 'type': 'object', 'properties': properties, 'required': required } } }) return together_tools def _create_system_prompt(self) -> str: """Create system prompt for the model.""" return """You are a Kubernetes assistant with access to tools. RULES: 1. Only call tools when the user asks for K8s information 2. For greetings/general questions, respond directly WITHOUT tools 3. You can call multiple tools if needed 4. After tool results, provide a clear, formatted response Think: Does this query need tools? 
If no, just respond naturally.""" async def _call_mcp_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: """Call an MCP tool.""" try: result = await self.mcp_client.call_tool(tool_name, arguments) if hasattr(result, 'content') and result.content: if isinstance(result.content, list): return '\n'.join([ item.text if hasattr(item, 'text') else str(item) for item in result.content ]) return str(result.content) return str(result) except Exception as e: return f"Error: {str(e)}" async def chat_stream(self, user_message: str, max_iterations: int = 5): """Process user message with streaming output.""" self.conversation_history.append({ 'role': 'user', 'content': user_message }) messages = [ {'role': 'system', 'content': self._create_system_prompt()}, *self.conversation_history ] tools = self._format_tools_for_together() iteration = 0 while iteration < max_iterations: iteration += 1 # Stream the response from Together AI stream = self.together_client.chat.completions.create( model=self.model, messages=messages, tools=tools, stream=True, ) current_content = "" tool_calls_list = [] tool_call_buffer = {} # Process stream chunks for chunk in stream: if not chunk.choices: continue delta = chunk.choices[0].delta # Handle content streaming if hasattr(delta, 'content') and delta.content: content = delta.content current_content += content print(content, end='', flush=True) # Handle tool calls (delta format) if hasattr(delta, 'tool_calls') and delta.tool_calls: for tc_delta in delta.tool_calls: # Get index - handle both object and dict if isinstance(tc_delta, dict): idx = tc_delta.get('index', 0) tc_id = tc_delta.get('id') tc_function = tc_delta.get('function', {}) else: idx = getattr(tc_delta, 'index', 0) tc_id = getattr(tc_delta, 'id', None) tc_function = getattr(tc_delta, 'function', None) # Initialize buffer for this index if idx not in tool_call_buffer: tool_call_buffer[idx] = { 'id': tc_id or f"call_{idx}", 'type': 'function', 'function': { 'name': '', 'arguments': '' } } # Update ID if present if tc_id: tool_call_buffer[idx]['id'] = tc_id # Update function name and arguments if tc_function: if isinstance(tc_function, dict): if 'name' in tc_function and tc_function['name']: tool_call_buffer[idx]['function']['name'] = tc_function['name'] if 'arguments' in tc_function and tc_function['arguments']: tool_call_buffer[idx]['function']['arguments'] += tc_function['arguments'] else: if hasattr(tc_function, 'name') and tc_function.name: tool_call_buffer[idx]['function']['name'] = tc_function.name if hasattr(tc_function, 'arguments') and tc_function.arguments: tool_call_buffer[idx]['function']['arguments'] += tc_function.arguments # Convert tool call buffer to list if tool_call_buffer: tool_calls_list = [tool_call_buffer[i] for i in sorted(tool_call_buffer.keys())] # Check if there were tool calls if tool_calls_list: print(f"\n\n[Iteration {iteration}] Calling {len(tool_calls_list)} tool(s)...") # Add assistant message with tool calls messages.append({ 'role': 'assistant', 'content': current_content or None, 'tool_calls': tool_calls_list }) for tool_call in tool_calls_list: tool_name = tool_call['function']['name'] tool_args_str = tool_call['function']['arguments'] # Parse arguments try: tool_args = json.loads(tool_args_str) if isinstance(tool_args_str, str) else tool_args_str except json.JSONDecodeError: tool_args = {} print(f" → {tool_name}({json.dumps(tool_args)})") tool_result = await self._call_mcp_tool(tool_name, tool_args) print(f" ✓ Got result") # Add tool result message messages.append({ 
'role': 'tool', 'tool_call_id': tool_call['id'], 'name': tool_name, 'content': tool_result, }) print() continue # No tool calls - final response if current_content: self.conversation_history.append({ 'role': 'assistant', 'content': current_content }) break if iteration >= max_iterations: print("\n\n(Max iterations reached)") async def run_interactive(self): """Interactive chat loop with streaming.""" print("="*60) print(" MCP Client with Together AI - STREAMING MODE") print("="*60) print(f" Model: {self.model}") print(" Commands: clear, tools, exit") print("="*60) print() while True: try: user_input = input("\nYou: ").strip() if not user_input: continue if user_input.lower() in ['exit', 'quit']: break if user_input.lower() == 'clear': self.conversation_history = [] print("✓ History cleared") continue if user_input.lower() == 'tools': for tool in self.available_tools: print(f" • {tool.name}") continue print("\nA: ", end='', flush=True) await self.chat_stream(user_input) print() except KeyboardInterrupt: print("\n\nGoodbye!") break except Exception as e: print(f"\nError: {str(e)}") import traceback traceback.print_exc() async def main(): client = TogetherMCPClient( mcp_server_url="http://localhost:8000/sse", model="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", ) try: await client.connect() await client.run_interactive() finally: await client.disconnect() if __name__ == "__main__": asyncio.run(main())
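
The client expects an MCP server exposing an SSE endpoint at http://localhost:8000/sse. For local testing without a real cluster, a minimal stand-in server can be sketched with FastMCP; the list_pods tool below is a hypothetical placeholder, not one of this repo's actual K8s tools:

""" minimal_server.py - hypothetical stand-in MCP server for local testing """
from fastmcp import FastMCP

mcp = FastMCP("k8s-demo")


@mcp.tool()
def list_pods(namespace: str = "default") -> str:
    """Return a fake pod listing (placeholder for a real kubectl-backed tool)."""
    return f"pod/nginx-abc123   Running   (namespace={namespace})"


if __name__ == "__main__":
    # Serve over SSE so the client above can connect to /sse
    mcp.run(transport="sse", host="127.0.0.1", port=8000)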
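
To try it end to end: export TOGETHER_API_KEY, start the MCP server, then run the script above for the interactive loop. The client can also be driven programmatically; here is a minimal one-shot sketch, assuming the file has been saved under an importable name such as mcp_client_together.py (the hyphenated filename shown above cannot be imported directly):

""" one_shot.py - hypothetical non-interactive use of TogetherMCPClient """
import asyncio

from mcp_client_together import TogetherMCPClient  # assumes an importable filename


async def one_shot(question: str):
    client = TogetherMCPClient()  # defaults: localhost:8000/sse, Llama-3.3-70B
    try:
        await client.connect()
        await client.chat_stream(question)  # streams the answer to stdout
    finally:
        await client.disconnect()


if __name__ == "__main__":
    asyncio.run(one_shot("How many pods are running in the default namespace?"))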

MCP directory API

We provide all the information about MCP servers via our MCP API. For example, to fetch this server's metadata:

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rahul007-bit/k8s-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.