Skip to main content
Glama

MCP Server

by DPoitrast
interactive_agent.py (21.3 kB)
#!/usr/bin/env python3
"""Interactive MCP Agent with OpenAI integration.

Runs a small read-eval-print loop. Input starting with ``/`` is dispatched to
a command handler; anything else is sent to the agent, either as a plain
OpenAI chat message ("chat mode") or as a request the agent may turn into an
MCP operation ("smart mode").
"""

import argparse
import asyncio
import inspect
import json
import os
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional

try:
    import readline
except ImportError:
    # readline is an optional, platform-dependent stdlib module (it is not
    # available on Windows). The agent works without it; only line editing,
    # persistent input history and tab completion are lost.
    readline = None

# Add the project root to the Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from agent.mcp_agent import MCPAgent  # noqa: E402  (needs the sys.path tweak above)

# File used to persist readline input history between runs.
_HISTORY_FILE = os.path.expanduser('~/.mcp_agent_history')

# A saved session older than this is ignored when the agent starts.
_SESSION_MAX_AGE_SECONDS = 86400  # 24 hours

# Upper bound for the login HTTP request so a dead server cannot hang the REPL.
_LOGIN_TIMEOUT_SECONDS = 10

_HELP_TEXT = """
🤖 Interactive MCP Agent Commands:

CONVERSATION:
  /chat              - Switch to chat mode (talk to OpenAI)
  /smart             - Switch to smart mode (AI + MCP operations)
  /stream            - Toggle streaming responses
  /clear             - Clear conversation history
  /history           - Show conversation history

MCP OPERATIONS:
  /tools             - List available MCP tools
  /execute <tool>    - Execute an MCP tool
  /status            - Show agent status

SESSION MANAGEMENT:
  /session           - Show session information
  /model [name]      - Show/change OpenAI model (default: gpt-4o-mini)
  /login             - Re-authenticate with the server
  /save <file>       - Save conversation to file
  /load <file>       - Load conversation from file

SYSTEM:
  /help              - Show this help
  /quit, /exit       - Exit the agent

MODES:
  - Chat Mode (💬): Direct conversation with OpenAI
  - Smart Mode (🧠): AI interprets requests and executes MCP operations

FEATURES:
  - 📡 Streaming: Real-time response streaming (toggle with /stream)
  - 💾 Auto-save: Sessions automatically saved and restored
  - 🔄 Tab completion: Command and tool name completion

Just type normally to chat with the agent!
"""


class InteractiveAgent:
    """Interactive agent with conversation management and commands.

    Wraps an ``MCPAgent`` and keeps the REPL state: the conversation history,
    the current mode (chat/smart), the selected model, the streaming flag and
    the access token obtained from ``<base_url>/api/v1/token``.
    """

    # Models accepted by the /model command.
    VALID_MODELS = ("gpt-4o-mini", "gpt-4o", "gpt-4-turbo", "gpt-3.5-turbo")

    def __init__(
        self,
        base_url: str = "http://localhost:8000",
        openai_api_key: Optional[str] = None,
        username: str = "johndoe",
        password: str = "secret",
        enable_streaming: bool = True
    ):
        """Create the agent, set up line editing and restore a recent session.

        Args:
            base_url: Base URL of the MCP server.
            openai_api_key: OpenAI key; falls back to the ``OPENAI_API_KEY``
                environment variable when omitted.
            username: User name sent to the token endpoint.
            password: Password sent to the token endpoint.
            enable_streaming: Whether chat responses are streamed by default.
        """
        self.base_url = base_url
        self.username = username
        self.password = password
        self.conversation_history: List[Dict[str, str]] = []
        self.session_data: Dict[str, Any] = {}
        self.access_token: Optional[str] = None
        self.enable_streaming = enable_streaming
        self.session_file = os.path.expanduser('~/.mcp_agent_session.json')

        # Completion cache: readline calls the completer once per candidate
        # (state 0, 1, 2, ...), so matches are computed once per input text.
        self._completion_text: Optional[str] = None
        self._completion_matches: List[str] = []

        # Initialize the agent
        self.agent = MCPAgent(
            base_url=base_url,
            openai_api_key=openai_api_key or os.getenv("OPENAI_API_KEY")
        )

        # Available commands (handlers may be plain or async functions)
        self.commands = {
            '/help': self._cmd_help,
            '/quit': self._cmd_quit,
            '/exit': self._cmd_quit,
            '/clear': self._cmd_clear,
            '/history': self._cmd_history,
            '/login': self._cmd_login,
            '/tools': self._cmd_tools,
            '/status': self._cmd_status,
            '/save': self._cmd_save,
            '/load': self._cmd_load,
            '/execute': self._cmd_execute,
            '/chat': self._cmd_chat_mode,
            '/smart': self._cmd_smart_mode,
            '/stream': self._cmd_toggle_streaming,
            '/session': self._cmd_session_info,
            '/model': self._cmd_model_info,
        }

        self.chat_mode = True  # Default to chat mode
        self.current_model = "gpt-4o-mini"  # Default model

        # Setup readline for better input handling
        self._setup_readline()

        # Load previous session if available
        self._load_session()

    def _setup_readline(self):
        """Setup readline for command history and completion (best effort)."""
        if readline is None:
            return
        try:
            # The libedit backend (used on macOS) has its own binding syntax;
            # 'tab: complete' is only understood by GNU readline.
            if 'libedit' in (getattr(readline, '__doc__', None) or ''):
                readline.parse_and_bind('bind ^I rl_complete')
            else:
                readline.parse_and_bind('tab: complete')

            # Set completion function
            readline.set_completer(self._completer)

            # Load history if it exists
            if os.path.exists(_HISTORY_FILE):
                readline.read_history_file(_HISTORY_FILE)
        except Exception:
            # Deliberately broad: line editing is a convenience and must never
            # prevent the agent from starting.
            pass

    def _completer(self, text: str, state: int) -> Optional[str]:
        """Tab completion for commands and tools.

        Args:
            text: The word being completed.
            state: Index of the candidate requested by readline.

        Returns:
            The ``state``-th match, or ``None`` when there are no more matches.
        """
        if state == 0 or text != self._completion_text:
            if text.startswith('/'):
                # Command completion
                matches = [cmd for cmd in self.commands if cmd.startswith(text)]
            else:
                # Tool name completion
                tool_names = (
                    tool.get('name', '')
                    for tool in self.agent.get_available_tools()
                )
                matches = [name for name in tool_names if name.startswith(text)]
            self._completion_text = text
            self._completion_matches = matches

        try:
            return self._completion_matches[state]
        except IndexError:
            return None

    def _save_history(self):
        """Save readline input history (best effort)."""
        if readline is None:
            return
        try:
            readline.write_history_file(_HISTORY_FILE)
        except Exception:
            # Losing input history is harmless; never fail on exit because of it.
            pass

    @staticmethod
    def _coerce_history(value: Any) -> List[Dict[str, Any]]:
        """Return ``value`` as a list of dict messages, dropping anything else."""
        if not isinstance(value, list):
            return []
        return [msg for msg in value if isinstance(msg, dict)]

    def _load_session(self):
        """Restore conversation history and session data from the session file.

        Only sessions whose stored timestamp is less than 24 hours old are
        restored. Any problem with the file leaves the agent with a fresh state.
        """
        try:
            if not os.path.exists(self.session_file):
                return
            with open(self.session_file, 'r', encoding='utf-8') as f:
                stored = json.load(f)
            if not isinstance(stored, dict):
                return

            # Restore conversation history if recent (within 24 hours)
            timestamp = stored.get('timestamp')
            if not timestamp:
                return
            age = (datetime.now() - datetime.fromisoformat(timestamp)).total_seconds()
            if age < _SESSION_MAX_AGE_SECONDS:
                self.conversation_history = self._coerce_history(
                    stored.get('conversation_history', [])
                )
                saved_data = stored.get('session_data', {})
                self.session_data = saved_data if isinstance(saved_data, dict) else {}
        except (OSError, ValueError, TypeError):
            # Unreadable, malformed or oddly-typed file: start fresh.
            pass

    def _save_session(self):
        """Save current session data to the session file (best effort)."""
        try:
            # Serialize before opening the file so a serialization error cannot
            # leave a truncated session file behind.
            payload = json.dumps(
                {
                    'timestamp': datetime.now().isoformat(),
                    'conversation_history': self.conversation_history,
                    'session_data': self.session_data,
                    'chat_mode': self.chat_mode,
                    'enable_streaming': self.enable_streaming
                },
                indent=2
            )
            with open(self.session_file, 'w', encoding='utf-8') as f:
                f.write(payload)
        except (OSError, TypeError, ValueError):
            pass

    async def _get_access_token(self) -> bool:
        """Get access token for authentication.

        Posts the configured credentials to ``<base_url>/api/v1/token`` and
        stores the returned ``access_token``.

        Returns:
            True when a token was obtained and stored, False otherwise. The
            outcome is also printed.
        """
        import requests

        try:
            # NOTE: this is a blocking call inside a coroutine; the timeout
            # keeps an unreachable server from freezing the prompt forever.
            response = requests.post(
                f"{self.base_url}/api/v1/token",
                data={
                    "username": self.username,
                    "password": self.password
                },
                headers={"Content-Type": "application/x-www-form-urlencoded"},
                timeout=_LOGIN_TIMEOUT_SECONDS
            )

            if response.status_code != 200:
                print(f"❌ Login failed: {response.status_code}")
                return False

            token = response.json().get("access_token")
            if not token:
                # A 200 without a token is not a successful login.
                print("❌ Login failed: no access token in response")
                return False

            self.access_token = token
            print(f"✓ Logged in as {self.username}")
            return True
        except Exception as e:
            print(f"❌ Login error: {e}")
            return False

    def _print_welcome(self):
        """Print welcome message."""
        print("🤖 Interactive MCP Agent")
        print("=" * 50)
        print(f"Connected to: {self.base_url}")
        print(f"OpenAI Available: {'✓' if self.agent.openai_client else '❌'}")
        print(f"MCP Tools: {len(self.agent.get_available_tools())}")
        print()
        print("Type '/help' for commands or just start chatting!")
        print("Use '/quit' to exit")
        print("-" * 50)

    def _cmd_help(self, args: str = "") -> str:
        """Show help information."""
        return _HELP_TEXT.strip()

    def _cmd_quit(self, args: str = "") -> str:
        """Quit the interactive agent.

        Saves the input history and exits the process; this never returns.
        """
        self._save_history()
        print("\n👋 Goodbye!")
        sys.exit(0)

    def _cmd_clear(self, args: str = "") -> str:
        """Clear conversation history."""
        self.conversation_history = []
        return "✓ Conversation history cleared"

    def _cmd_history(self, args: str = "") -> str:
        """Show conversation history, truncating each message to 100 characters."""
        if not self.conversation_history:
            return "No conversation history"

        lines = []
        for index, msg in enumerate(self.conversation_history, start=1):
            # History may come from a user-supplied file, so tolerate entries
            # with a missing or non-string role/content.
            role = str(msg.get('role', 'unknown')).capitalize()
            content = msg.get('content')
            content = '' if content is None else str(content)
            if len(content) > 100:
                content = content[:100] + "..."
            lines.append(f"{index}. {role}: {content}\n")

        return "\n📋 Conversation History:\n" + "-" * 30 + "\n" + "".join(lines)

    async def _cmd_login(self, args: str = "") -> str:
        """Re-authenticate with the server."""
        if await self._get_access_token():
            return "✓ Successfully re-authenticated"
        return "❌ Authentication failed"

    def _cmd_tools(self, args: str = "") -> str:
        """List available MCP tools (at most the first 20)."""
        tools = self.agent.get_available_tools()

        parts = [f"\n🔧 Available MCP Tools ({len(tools)} total):\n" + "-" * 40 + "\n"]
        for tool in tools[:20]:  # Show first 20 tools
            name = tool.get('name', 'Unknown')
            desc = tool.get('description', 'No description')
            parts.append(f"• {name}\n {desc}\n\n")

        if len(tools) > 20:
            parts.append(f"... and {len(tools) - 20} more tools\n")

        return "".join(parts)

    def _cmd_status(self, args: str = "") -> str:
        """Show agent status."""
        status = {
            "Agent": "✓ Active",
            "OpenAI": "✓ Connected" if self.agent.openai_client else "❌ Not connected",
            "MCP Server": self.base_url,
            "Authentication": "✓ Logged in" if self.access_token else "❌ Not logged in",
            "Tools Available": len(self.agent.get_available_tools()),
            "Conversation Messages": len(self.conversation_history),
            "Mode": "Smart Mode" if not self.chat_mode else "Chat Mode"
        }

        rows = "".join(f"{key}: {value}\n" for key, value in status.items())
        return "\n📊 Agent Status:\n" + "-" * 20 + "\n" + rows

    def _cmd_save(self, args: str = "") -> str:
        """Save conversation to file.

        Args:
            args: Target filename; a timestamped ``conversation_*.json`` name
                is generated when empty.
        """
        filename = args.strip() or f"conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

        try:
            # Serialize first so a failure does not leave a truncated file.
            payload = json.dumps(
                {
                    "timestamp": datetime.now().isoformat(),
                    "conversation_history": self.conversation_history,
                    "session_data": self.session_data
                },
                indent=2
            )
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(payload)
            return f"✓ Conversation saved to {filename}"
        except Exception as e:
            return f"❌ Failed to save: {e}"

    def _cmd_load(self, args: str = "") -> str:
        """Load conversation from file.

        Args:
            args: Name of a JSON file with ``conversation_history`` and,
                optionally, ``session_data`` keys.
        """
        filename = args.strip()
        if not filename:
            return "❌ Please specify a filename"

        try:
            with open(filename, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except Exception as e:
            return f"❌ Failed to load: {e}"

        # Reject shapes that would break later history handling instead of
        # silently storing them.
        if not isinstance(data, dict):
            return "❌ Failed to load: expected a JSON object"
        history = data.get("conversation_history", [])
        if not isinstance(history, list):
            return "❌ Failed to load: 'conversation_history' must be a list"
        session_data = data.get("session_data", {})

        self.conversation_history = self._coerce_history(history)
        self.session_data = session_data if isinstance(session_data, dict) else {}
        return f"✓ Conversation loaded from {filename} ({len(self.conversation_history)} messages)"

    async def _cmd_execute(self, args: str = "") -> str:
        """Execute an MCP tool directly.

        Args:
            args: Name of the tool, passed to ``MCPAgent.execute_operation``
                together with the access token.
        """
        tool_name = args.strip()
        if not tool_name:
            return "❌ Please specify a tool name"

        if not self.access_token:
            if not await self._get_access_token():
                return "❌ Authentication required"

        try:
            result = self.agent.execute_operation(tool_name, self.access_token)
        except Exception as e:
            return f"❌ Tool execution failed: {e}"

        try:
            rendered = json.dumps(result, indent=2)
        except (TypeError, ValueError):
            # The tool ran; a result that is not JSON-serializable must not be
            # reported as an execution failure.
            rendered = str(result)
        return f"✓ Tool executed successfully:\n{rendered}"

    def _cmd_chat_mode(self, args: str = "") -> str:
        """Switch to chat mode."""
        self.chat_mode = True
        return "✓ Switched to Chat Mode - Direct conversation with OpenAI"

    def _cmd_smart_mode(self, args: str = "") -> str:
        """Switch to smart mode."""
        self.chat_mode = False
        return "✓ Switched to Smart Mode - AI will execute MCP operations"

    def _cmd_toggle_streaming(self, args: str = "") -> str:
        """Toggle streaming mode."""
        self.enable_streaming = not self.enable_streaming
        status = "enabled" if self.enable_streaming else "disabled"
        return f"✓ Streaming {status}"

    def _cmd_session_info(self, args: str = "") -> str:
        """Show session information."""
        return "\n".join([
            "📱 Session Information:",
            f"- Messages in history: {len(self.conversation_history)}",
            f"- Mode: {'Chat' if self.chat_mode else 'Smart'}",
            f"- Model: {self.current_model}",
            f"- Streaming: {'enabled' if self.enable_streaming else 'disabled'}",
            f"- Session file: {self.session_file}",
            "- Auto-save: enabled",
        ])

    def _cmd_model_info(self, args: str = "") -> str:
        """Show or change the current model.

        Args:
            args: Empty to show the current model, otherwise the (case
                insensitive) name of one of ``VALID_MODELS`` to switch to.
        """
        new_model = args.strip().lower()
        available = ', '.join(self.VALID_MODELS)

        if not new_model:
            # Show current model info
            return "\n".join([
                "🤖 Model Information:",
                f"- Current model: {self.current_model}",
                f"- Available models: {available}",
                "",
                "Usage: /model <model_name> to switch models",
                "Example: /model gpt-4o",
            ])

        # Change model (simple validation against the known list)
        if new_model not in self.VALID_MODELS:
            return f"❌ Invalid model: {new_model}. Available: {available}"

        old_model = self.current_model
        self.current_model = new_model
        return f"✓ Model changed from {old_model} to {new_model}"

    async def _process_command(self, user_input: str) -> str:
        """Dispatch a ``/command [args]`` line to its handler.

        Args:
            user_input: The full input line, starting with the command name.

        Returns:
            The handler's response text, or an error for unknown commands.
        """
        # Split on any whitespace so tabs/multiple spaces separate arguments too.
        parts = user_input.split(None, 1)
        command = parts[0] if parts else ""
        args = parts[1] if len(parts) > 1 else ""

        handler = self.commands.get(command)
        if handler is None:
            return f"❌ Unknown command: {command}. Type '/help' for available commands."

        result = handler(args)
        # Handlers are a mix of plain and async functions.
        if inspect.isawaitable(result):
            result = await result
        return result

    def _consume_stream(self, stream: Any, user_input: str) -> Optional[str]:
        """Print a streamed reply chunk by chunk and record it in the history.

        Returns:
            None when the stream was fully printed, or an error message when
            iterating the stream failed (the history is left untouched then).
        """
        print("🤖 Agent: ", end="", flush=True)
        chunks = []
        try:
            for chunk in stream:
                print(chunk, end="", flush=True)
                chunks.append(chunk)
            full_response = "".join(chunks)
        except Exception as e:
            # End the partially printed line; the caller prints the returned
            # message, so it is not printed here as well.
            print()
            return f"❌ Streaming error: {e}"

        print()  # New line after streaming

        # Update conversation history manually since streaming doesn't do it
        # automatically
        self.conversation_history.append({"role": "user", "content": user_input})
        self.conversation_history.append({"role": "assistant", "content": full_response})
        return None  # Indicate streaming was handled

    def _chat(self, user_input: str) -> Optional[str]:
        """Send a message straight to OpenAI (chat mode)."""
        result = self.agent.chat_with_openai(
            user_message=user_input,
            conversation_history=self.conversation_history,
            system_prompt="You are a helpful assistant for an MCP (Model Context Protocol) system. Be conversational and helpful.",
            model=self.current_model,
            stream=self.enable_streaming
        )

        if result.get("is_streaming") and self.enable_streaming:
            return self._consume_stream(result["stream"], user_input)

        # Non-streaming response
        self.conversation_history = result["conversation_history"]
        return result["response"]

    async def _smart_query(self, user_input: str) -> str:
        """Let the agent interpret the request and run MCP operations (smart mode)."""
        if not self.access_token:
            await self._get_access_token()
        if not self.access_token:
            return "❌ Authentication required for MCP operations"

        result = self.agent.intelligent_mcp_query(
            user_request=user_input,
            token=self.access_token,
            conversation_history=self.conversation_history
        )

        self.conversation_history = result["conversation_history"]
        response = result["response"]

        action = result.get("action_taken")
        if action:
            # Tolerate an action record without a 'tool' entry.
            tool = action.get('tool', 'unknown') if isinstance(action, dict) else action
            response += f"\n\n🔧 Executed: {tool}"

        return response

    async def _process_conversation(self, user_input: str) -> Optional[str]:
        """Process a conversation message with streaming support.

        Returns:
            The text to print, or None when the reply was already printed
            while streaming.
        """
        if not self.agent.openai_client:
            return "❌ OpenAI not available. Please set OPENAI_API_KEY environment variable."

        try:
            if self.chat_mode:
                return self._chat(user_input)
            # Smart mode - AI + MCP operations (no streaming for now)
            return await self._smart_query(user_input)
        except Exception as e:
            return f"❌ Error: {e}"

    async def run(self):
        """Run the interactive agent until EOF or ``/quit``."""
        self._print_welcome()

        # Try to authenticate
        if not await self._get_access_token():
            print("⚠️ Authentication failed - some features may be limited")
        print()

        try:
            while True:
                try:
                    # Show current mode in prompt
                    mode_indicator = "💬" if self.chat_mode else "🧠"
                    stream_indicator = "📡" if self.enable_streaming else ""
                    user_input = input(f"{mode_indicator}{stream_indicator} You: ").strip()

                    if not user_input:
                        continue

                    # Process commands or conversation
                    if user_input.startswith('/'):
                        response = await self._process_command(user_input)
                        print(f"🤖 Agent: {response}")
                    else:
                        response = await self._process_conversation(user_input)
                        # Only print response if it wasn't handled by streaming
                        if response is not None:
                            print(f"🤖 Agent: {response}")

                    print()

                    # Auto-save session after every handled input
                    self._save_session()

                except KeyboardInterrupt:
                    print("\n\nUse '/quit' to exit gracefully.")
                    continue
                except EOFError:
                    break
        finally:
            self._save_history()


async def main():
    """Parse command-line options and run the interactive agent."""
    parser = argparse.ArgumentParser(description="Interactive MCP Agent with OpenAI")
    parser.add_argument("--url", default="http://localhost:8000", help="MCP server URL")
    parser.add_argument("--username", default="johndoe", help="Username for authentication")
    parser.add_argument("--password", default="secret", help="Password for authentication")
    parser.add_argument("--openai-key", help="OpenAI API key (or set OPENAI_API_KEY env var)")

    args = parser.parse_args()

    agent = InteractiveAgent(
        base_url=args.url,
        openai_api_key=args.openai_key,
        username=args.username,
        password=args.password
    )

    await agent.run()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n👋 Goodbye!")
        sys.exit(0)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DPoitrast/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.