Skip to main content
Glama
enhanced_client.py•10.7 kB
#!/usr/bin/env python3 """ Enhanced MCP Client - Full-featured client for your MCP server """ import asyncio import json import logging from typing import Dict, List, Any, Optional from datetime import datetime import sys import os # Add project root to path sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from mcp_client.base_client import BaseMCPClient, MCPResponse class EnhancedMCPClient(BaseMCPClient): """Enhanced MCP client with advanced features.""" def __init__(self, server_url: str = "http://localhost:8000", client_name: str = "Enhanced MCP Client"): super().__init__(server_url, client_name) # Enhanced capabilities self.capabilities.update({ "document_processing": True, "agent_collaboration": True, "file_upload": True, "session_management": True }) # Session management self.session_id = f"client_{datetime.now().strftime('%Y%m%d_%H%M%S')}" self.command_history = [] self.agent_cache = {} async def initialize(self) -> bool: """Initialize the client with server handshake.""" if not await self.connect(): return False try: # Get server info server_info = await self.get_server_info() self.logger.info(f"Connected to: {server_info.get('status', 'Unknown server')}") # Cache available agents agents_info = await self.get_agents() self.agent_cache = agents_info.get("agents", {}) self.logger.info(f"Available agents: {len(self.agent_cache)}") return True except Exception as e: self.logger.error(f"Failed to initialize client: {e}") return False async def send_command(self, command: str, documents: List[Dict[str, Any]] = None, rag_mode: bool = False) -> Dict[str, Any]: """Send a command with optional document context.""" params = { "session_id": self.session_id, "rag_mode": rag_mode } if documents: params["documents_context"] = documents # Store in history self.command_history.append({ "command": command, "timestamp": datetime.now().isoformat(), "documents_count": len(documents) if documents else 0 }) response = await self.process_command(command, **params) # Log response if response.get("status") == "success": self.logger.info(f"Command processed successfully: {command[:50]}...") else: self.logger.warning(f"Command failed: {response.get('message', 'Unknown error')}") return response async def upload_document(self, filename: str, content: str, doc_type: str = "text") -> Dict[str, Any]: """Upload a document for processing.""" document = { "filename": filename, "content": content, "type": doc_type, "uploaded_at": datetime.now().isoformat() } return document async def analyze_document(self, filename: str, content: str, query: str = "analyze this document") -> Dict[str, Any]: """Analyze a document with a specific query.""" document = await self.upload_document(filename, content) return await self.send_command(query, documents=[document], rag_mode=True) async def ask_about_author(self, filename: str, content: str) -> Dict[str, Any]: """Ask about the author of a document.""" return await self.analyze_document(filename, content, "who is the author and what is the summary") async def get_weather(self, location: str) -> Dict[str, Any]: """Get weather information for a location.""" return await self.send_command(f"get weather for {location}") async def search_documents(self, query: str) -> Dict[str, Any]: """Search for documents.""" return await self.send_command(f"search for {query}") async def get_agent_status(self) -> Dict[str, Any]: """Get detailed agent status from server.""" try: response = await self.send_request("mcp/agents/status") return response.result or response.error except Exception as e: return {"status": "error", "message": str(e)} async def reload_agents(self) -> Dict[str, Any]: """Reload agents on the server.""" try: endpoint = f"{self.server_url}/api/mcp/agents/reload" async with self.session.post(endpoint) as response: if response.status == 200: result = await response.json() # Update agent cache agents_info = await self.get_agents() self.agent_cache = agents_info.get("agents", {}) return result else: return {"status": "error", "code": response.status} except Exception as e: return {"status": "error", "message": str(e)} async def test_document_analysis(self) -> Dict[str, Any]: """Test general document analysis functionality.""" sample_content = ''' Sample Document Content Content Type: Text Document Processing Confidence: 95% Text Content: "This is a sample document for testing the analysis capabilities of the MCP system." Additional Context: This document demonstrates the system's ability to process and analyze various types of content, extract meaningful information, and provide comprehensive responses through agent collaboration. ''' return await self.analyze_document("sample_document.txt", sample_content, "analyze this document") async def batch_process_documents(self, documents: List[Dict[str, Any]], query: str = "analyze these documents") -> List[Dict[str, Any]]: """Process multiple documents in batch.""" results = [] for doc in documents: try: result = await self.analyze_document( doc.get("filename", "unknown"), doc.get("content", ""), query ) results.append({ "document": doc.get("filename", "unknown"), "result": result, "status": "success" }) except Exception as e: results.append({ "document": doc.get("filename", "unknown"), "error": str(e), "status": "error" }) return results def get_session_info(self) -> Dict[str, Any]: """Get current session information.""" return { "session_id": self.session_id, "command_history_count": len(self.command_history), "cached_agents": len(self.agent_cache), "connection_status": self.get_connection_status(), "recent_commands": self.command_history[-5:] if self.command_history else [] } async def interactive_mode(self): """Start interactive mode for testing.""" print(f"šŸ¤– {self.client_name} - Interactive Mode") print("=" * 50) print("Commands:") print(" help - Show this help") print(" status - Show connection status") print(" agents - Show available agents") print(" test - Test document analysis") print(" weather <location> - Get weather") print(" quit - Exit interactive mode") print("=" * 50) while True: try: command = input("\n> ").strip() if command.lower() in ['quit', 'exit', 'q']: break elif command.lower() == 'help': print("Available commands: help, status, agents, test, weather <location>, quit") elif command.lower() == 'status': status = self.get_session_info() print(f"Session: {status['session_id']}") print(f"Commands sent: {status['command_history_count']}") print(f"Connected: {status['connection_status']['connected']}") elif command.lower() == 'agents': agents = await self.get_agents() print(f"Available agents: {agents.get('total_agents', 0)}") for agent_id, agent_info in agents.get('agents', {}).items(): print(f" • {agent_id}: {agent_info.get('name', 'Unknown')}") elif command.lower() == 'test': print("Testing document analysis...") result = await self.test_document_analysis() print(f"Result: {result.get('status', 'unknown')}") if 'comprehensive_answer' in result: print(f"Answer: {result['comprehensive_answer'][:100]}...") elif 'message' in result: print(f"Message: {result['message'][:100]}...") elif command.lower().startswith('weather '): location = command[8:].strip() if location: result = await self.get_weather(location) print(f"Weather result: {result.get('status', 'unknown')}") else: # Send as general command result = await self.send_command(command) print(f"Response: {result.get('status', 'unknown')}") if result.get('message'): print(f"Message: {result['message']}") except KeyboardInterrupt: break except Exception as e: print(f"Error: {e}") print("\nšŸ‘‹ Goodbye!") # Convenience function async def create_client(server_url: str = "http://localhost:8000") -> EnhancedMCPClient: """Create and initialize an enhanced MCP client.""" client = EnhancedMCPClient(server_url) if await client.initialize(): return client else: raise ConnectionError(f"Failed to connect to MCP server at {server_url}") if __name__ == "__main__": async def main(): try: client = await create_client() await client.interactive_mode() except Exception as e: print(f"Error: {e}") finally: if 'client' in locals(): await client.disconnect() asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tensorwhiz141/MCP2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server