Skip to main content
Glama

K8s MCP

by rahul007-bit
mcp_client.py13.7 kB
""" MCP Client with Google Gemini - STREAMING VERSION (New SDK) """ import asyncio import json import os from typing import List, Dict, Any from fastmcp import Client from google import genai GENI_API_KEY = os.getenv("GOOGLE_API_KEY") if not GENI_API_KEY: raise ValueError("GOOGLE_API_KEY environment variable not set") class GeminiMCPClient: def __init__(self, mcp_server_url: str = "http://localhost:8000/sse", model: str = "gemini-2.5-flash", api_key: str = ""): self.mcp_server_url = mcp_server_url self.model_name = model self.api_key = api_key or os.getenv("GOOGLE_API_KEY") print(self.api_key) self.client = genai.Client(api_key=self.api_key) self.mcp_client = None self.available_tools = [] self.conversation_history = [] def _clean_schema_for_gemini(self, schema: Dict) -> Dict: """Remove Gemini-incompatible fields from schema.""" if not isinstance(schema, dict): return schema cleaned = {} for key, value in schema.items(): # Skip incompatible fields if key in ['additional_properties', 'additionalProperties', 'anyOf', 'any_of', 'allOf', 'all_of', 'oneOf', 'one_of']: continue # Recursively clean nested objects if isinstance(value, dict): cleaned[key] = self._clean_schema_for_gemini(value) elif isinstance(value, list): cleaned[key] = [ self._clean_schema_for_gemini(item) if isinstance(item, dict) else item for item in value ] else: cleaned[key] = value return cleaned async def connect(self): """Connect to the MCP server and discover available tools.""" self.mcp_client = Client(self.mcp_server_url) await self.mcp_client.__aenter__() tools_response = await self.mcp_client.list_tools() self.available_tools = tools_response print(f"✓ Connected to {self.mcp_server_url}") print(f"✓ Found {len(self.available_tools)} tools:") for tool in self.available_tools: print(f" • {tool.name}") print() async def disconnect(self): if self.mcp_client: await self.mcp_client.__aexit__(None, None, None) def _format_tools_for_gemini(self) -> List[Dict]: """Convert MCP tools to Gemini format.""" gemini_tools = [] for tool in self.available_tools: properties = {} required = [] if hasattr(tool, 'inputSchema') and tool.inputSchema: schema = tool.inputSchema raw_properties = schema.get('properties', {}) required = schema.get('required', []) # Clean each property for Gemini compatibility for prop_name, prop_schema in raw_properties.items(): properties[prop_name] = self._clean_schema_for_gemini(prop_schema) gemini_tools.append({ 'function_declarations': [{ 'name': tool.name, 'description': tool.description or f"Execute {tool.name}", 'parameters': { 'type': 'OBJECT', 'properties': properties, 'required': required } }] }) return gemini_tools def _create_system_instruction(self) -> str: """Create system instruction for the model.""" return """You are a Senior DevOps Engineer and CKA (Certified Kubernetes Administrator) certified expert assistant working in a CRITICAL PRODUCTION ENVIRONMENT. PERSONA & EXPERTISE: - Senior DevOps Engineer with 10+ years of experience in cloud-native technologies - CKA Certified Kubernetes Administrator with deep understanding of K8s internals - Expert in production-grade deployments, high availability, and disaster recovery - Strong background in security best practices, RBAC, and compliance - Experienced in multi-cluster management, GitOps, and CI/CD pipelines - Proficient in troubleshooting complex distributed systems under pressure CRITICAL ENVIRONMENT PROTOCOLS: ⚠️ PRODUCTION SYSTEM - Exercise extreme caution with all operations - Always verify namespace, cluster context, and resource names before operations - Never perform destructive operations without explicit user confirmation - Prioritize system stability and minimize downtime - Flag any high-risk operations (deletions, scaling, restarts) with clear warnings - Assume zero-downtime requirements unless stated otherwise OPERATIONAL RULES: 1. Tool Usage: - Only call K8s tools when user requests specific cluster information - For greetings/general questions, respond directly WITHOUT tools - Call multiple tools if needed for comprehensive analysis - Always validate tool results before presenting to user 2. Response Format: - Provide clear, professionally formatted responses - Use tables for structured data (pods, deployments, services) - Include relevant metrics (CPU, memory, replicas, age) - Highlight anomalies, errors, or potential issues - Add actionable recommendations when problems detected 3. Communication Style: - Professional, concise, and technically accurate - Use industry-standard terminology - Explain complex concepts clearly when needed - Proactively warn about risks and implications - Suggest best practices aligned with production standards 4. Decision Framework: - Does this query require cluster data? → Use tools - Is this a destructive operation? → Request confirmation + warn - Is this a general question? → Respond directly with expertise - Is there ambiguity? → Ask clarifying questions before acting 5. Change Verification Protocol: - After ANY create/update/delete operation, ALWAYS verify the change - Use appropriate kubectl get/describe commands to confirm resource state - Check that pods are running, services are ready, and configurations are applied - Verify rollout status for deployments and statefulsets - Report back to user with confirmation that changes are live and healthy - If verification fails, immediately alert user and provide troubleshooting steps SECURITY & COMPLIANCE: - Never expose sensitive data (secrets, tokens, passwords) - Respect RBAC policies and namespace boundaries - Assume all environments are production unless specified as dev/test - Follow principle of least privilege Think before acting: Does this require tools? What's the risk level? How can I provide maximum value safely?""" async def _call_mcp_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: """Call an MCP tool.""" try: result = await self.mcp_client.call_tool(tool_name, arguments) if hasattr(result, 'content') and result.content: if isinstance(result.content, list): return '\n'.join([ item.text if hasattr(item, 'text') else str(item) for item in result.content ]) return str(result.content) return str(result) except Exception as e: return f"Error: {str(e)}" async def chat_stream(self, user_message: str, max_iterations: int = 10): """Process user message with streaming output.""" # Add user message to history self.conversation_history.append({ 'role': 'user', 'parts': [{'text': user_message}] }) tools = self._format_tools_for_gemini() iteration = 0 while iteration < max_iterations: iteration += 1 # Create config config = { 'system_instruction': self._create_system_instruction(), 'tools': tools if tools else None, } # Generate content with streaming response = self.client.models.generate_content_stream( model=self.model_name, contents=self.conversation_history, config=config ) current_text = "" function_calls = [] # Process streaming chunks for chunk in response: # Check for function calls and text in parts if hasattr(chunk, 'candidates') and chunk.candidates: for candidate in chunk.candidates: if hasattr(candidate, 'content') and candidate.content: if hasattr(candidate.content, 'parts') and candidate.content.parts: for part in candidate.content.parts: # Handle text parts if hasattr(part, 'text') and part.text: current_text += part.text print(part.text, end='', flush=True) # Handle function calls if hasattr(part, 'function_call') and part.function_call: function_calls.append(part.function_call) # If no function calls, we're done if not function_calls: if current_text: self.conversation_history.append({ 'role': 'model', 'parts': [{'text': current_text}] }) break # Execute function calls print(f"\n\n[Iteration {iteration}] Calling {len(function_calls)} tool(s)...") # Add assistant message with function calls self.conversation_history.append({ 'role': 'model', 'parts': [{'function_call': fc} for fc in function_calls] }) # Execute each function and collect responses function_response_parts = [] for fc in function_calls: tool_name = fc.name tool_args = dict(fc.args) if hasattr(fc, 'args') else {} print(f" → {tool_name}({json.dumps(tool_args)})") # Call the MCP tool tool_result = await self._call_mcp_tool(tool_name, tool_args) print(f" ✓ Got result") # Add function response function_response_parts.append({ 'function_response': { 'name': tool_name, 'response': {'result': tool_result} } }) # Add function responses to history self.conversation_history.append({ 'role': 'user', 'parts': function_response_parts }) print() if iteration >= max_iterations: print("\n\n(Max iterations reached)") async def run_interactive(self): """Interactive chat loop with streaming.""" print("="*60) print(" MCP Client with Google Gemini - STREAMING MODE") print("="*60) print(f" Model: {self.model_name}") print(" Commands: clear, tools, exit") print("="*60) print() while True: try: user_input = input("\nYou: ").strip() if not user_input: continue if user_input.lower() in ['exit', 'quit']: break if user_input.lower() == 'clear': self.conversation_history = [] print("✓ History cleared") continue if user_input.lower() == 'tools': for tool in self.available_tools: print(f" • {tool.name}") continue print("\nA: ", end='', flush=True) await self.chat_stream(user_input) print() except KeyboardInterrupt: print("\n\nGoodbye!") break except Exception as e: print(f"\nError: {str(e)}") import traceback traceback.print_exc() async def main(): client = GeminiMCPClient( mcp_server_url="http://localhost:8000/sse", model="gemini-2.5-flash", api_key= GENI_API_KEY ) try: await client.connect() await client.run_interactive() finally: await client.disconnect() if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rahul007-bit/k8s-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server