Skip to main content
Glama
main_window.py23.3 kB
""" MCP UI - Main Window Main window for the MCP Memory Server desktop application """ import logging import requests from typing import Dict, Any from PySide6.QtWidgets import ( QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QSplitter, QTextEdit, QLabel, QPushButton, QGroupBox, QTabWidget, QStatusBar, QMessageBox, ) from PySide6.QtCore import Qt, QTimer, Signal from PySide6.QtGui import QAction, QTextCursor from .widgets import ( GenericMemoryBrowserWidget, AgentManagerWidget, SessionManagerWidget ) from .widgets.enhanced_conversation_widget import EnhancedConversationWidget from .widgets.notification_panel import NotificationPanel from .services.realtime_service import RealtimeService from .services.notification_service import NotificationService from .services.data_export_import_service import DataExportImportService from .services.conversation_service import ConversationService from .services.session_service import SessionService from .dialogs.data_export_import_dialogs import show_export_dialog, show_import_dialog logger = logging.getLogger(__name__) class ServerWidget(QWidget): """Enhanced server connection widget with reconnection logic""" connection_status_changed = Signal(bool) def __init__(self, config: Dict[str, Any]): super().__init__() self.config = config self.server_url = ( f"http://{config['server']['host']}:{config['server']['port']}" ) self.connected = False self.reconnecting = False # Reconnection settings self.retry_attempts = 0 self.max_retry_attempts = 5 self.base_retry_delay = 2 # seconds self.current_retry_delay = self.base_retry_delay self.consecutive_failures = 0 self.last_successful_connection = None # Setup timers first, then UI (since UI setup calls check_connection) self.setup_timer() self.setup_ui() def setup_ui(self): """Set up the server widget UI""" layout = QVBoxLayout(self) # Connection status with info status_layout = QHBoxLayout() self.status_label = QLabel("Server: Checking...") self.status_label.setStyleSheet("QLabel { font-weight: bold; }") status_layout.addWidget(self.status_label) # Connect button self.connect_btn = QPushButton("Connect") self.connect_btn.setMaximumWidth(100) self.connect_btn.clicked.connect(self.manual_reconnect) status_layout.addWidget(self.connect_btn) layout.addLayout(status_layout) # Server URL url_label = QLabel(f"URL: {self.server_url}") layout.addWidget(url_label) # Connection info self.info_label = QLabel("") self.info_label.setStyleSheet("QLabel { color: gray; font-size: 10px; }") layout.addWidget(self.info_label) # Server log log_group = QGroupBox("Server Status") log_layout = QVBoxLayout(log_group) self.log_text = QTextEdit() self.log_text.setMaximumHeight(200) self.log_text.setReadOnly(True) log_layout.addWidget(self.log_text) layout.addWidget(log_group) # Initial connection check self.check_connection() def setup_timer(self): """Set up timer for periodic connection checks""" self.timer = QTimer() self.timer.timeout.connect(self.check_connection) self.timer.start(5000) # Check every 5 seconds (more responsive) # Separate timer for reconnection attempts self.reconnect_timer = QTimer() self.reconnect_timer.setSingleShot(True) self.reconnect_timer.timeout.connect(self.attempt_reconnection) def check_connection(self): """Check server connection status""" try: response = requests.get(f"{self.server_url}/health", timeout=3) if response.status_code == 200: # Connection successful if not self.connected or self.reconnecting: # We've recovered from a disconnection self.reset_retry_state() self.log_message("✅ Connection restored!") self.set_connected(True) data = response.json() status = data.get("status", "OK") self.log_message(f"✓ Server healthy: {status}") # Update last successful connection time import datetime self.last_successful_connection = datetime.datetime.now() else: error_msg = f"Server error: {response.status_code}" self.handle_connection_failure(error_msg) except Exception as e: self.handle_connection_failure(f"Connection failed: {str(e)}") def handle_connection_failure(self, error_message: str): """Handle connection failure with smart reconnection logic""" self.consecutive_failures += 1 if self.connected: # First failure - log and mark as disconnected self.set_connected(False) self.log_message(f"✗ {error_message}") if not self.reconnecting and self.retry_attempts < self.max_retry_attempts: # Start reconnection process self.start_reconnection_process() # Only log every few attempts to avoid spam if self.consecutive_failures == 1 or self.consecutive_failures % 3 == 0: self.log_message(f"✗ {error_message}") def start_reconnection_process(self): """Start the reconnection process""" if self.retry_attempts >= self.max_retry_attempts: msg = "❌ Max reconnection attempts exceeded. Manual reconnection required." self.log_message(msg) self.set_reconnecting(False) return self.set_reconnecting(True) self.retry_attempts += 1 # Calculate exponential backoff delay (max 30 seconds) delay = self.base_retry_delay * (2 ** (self.retry_attempts - 1)) self.current_retry_delay = min(delay, 30) msg = ( f"🔄 Attempting reconnection {self.retry_attempts}/" f"{self.max_retry_attempts} in {self.current_retry_delay}s..." ) self.log_message(msg) # Start reconnection timer self.reconnect_timer.start(int(self.current_retry_delay * 1000)) def attempt_reconnection(self): """Attempt to reconnect (called by timer)""" self.log_message(f"🔄 Reconnection attempt {self.retry_attempts}...") self.check_connection() if not self.connected: # This attempt failed, try again QTimer.singleShot(1000, self.start_reconnection_process) def manual_reconnect(self): """Manual reconnection triggered by user""" self.reset_retry_state() self.log_message("🔄 Manual reconnection requested...") self.check_connection() def reset_retry_state(self): """Reset the retry state after successful connection""" self.retry_attempts = 0 self.current_retry_delay = self.base_retry_delay self.consecutive_failures = 0 self.set_reconnecting(False) if self.reconnect_timer.isActive(): self.reconnect_timer.stop() def set_reconnecting(self, reconnecting: bool): """Update reconnecting status""" self.reconnecting = reconnecting self.update_ui_state() def update_ui_state(self): """Update UI based on current connection state""" import datetime current_time = datetime.datetime.now().strftime("%H:%M:%S") if self.connected: self.status_label.setText("Server: Connected") self.status_label.setStyleSheet( "QLabel { color: green; font-weight: bold; }" ) self.connect_btn.setText("Reconnect") self.info_label.setText(f"Last check: {current_time}") elif self.reconnecting: self.status_label.setText("Server: Reconnecting...") self.status_label.setStyleSheet( "QLabel { color: orange; font-weight: bold; }" ) self.connect_btn.setText("Retry Now") if self.retry_attempts > 0: retry_info = ( f"Retry {self.retry_attempts}/" f"{self.max_retry_attempts} - " f"Next in {self.current_retry_delay}s" ) self.info_label.setText(retry_info) else: self.status_label.setText("Server: Disconnected") self.status_label.setStyleSheet("QLabel { color: red; font-weight: bold; }") self.connect_btn.setText("Connect") if self.retry_attempts >= self.max_retry_attempts: msg = "Max retries exceeded - click Connect to retry" self.info_label.setText(msg) else: self.info_label.setText(f"Connection lost at {current_time}") def set_connected(self, connected: bool): """Update connection status""" if self.connected != connected: self.connected = connected self.update_ui_state() self.connection_status_changed.emit(connected) def log_message(self, message: str): """Add message to log with timestamp""" import datetime timestamp = datetime.datetime.now().strftime("%H:%M:%S") cursor = self.log_text.textCursor() cursor.movePosition(QTextCursor.End) cursor.insertText(f"[{timestamp}] {message}\n") self.log_text.setTextCursor(cursor) # Keep log size manageable if self.log_text.document().lineCount() > 100: cursor.movePosition(QTextCursor.Start) cursor.movePosition(QTextCursor.Down, QTextCursor.KeepAnchor, 10) cursor.removeSelectedText() class MCPMainWindow(QMainWindow): """Main window for MCP Memory Server UI""" def __init__(self, config: Dict[str, Any]): super().__init__() self.config = config # Set window properties self.setWindowTitle("MCP Memory Server UI") geometry = config["ui"]["window_geometry"] self.resize(geometry["width"], geometry["height"]) self.setMinimumSize(800, 600) # Initialize services self.setup_services() # Set up UI self.setup_menu() self.setup_central_widget() self.setup_status_bar() # Set up notification panel after services self.setup_notification_panel() # Connect real-time updates self.connect_realtime_services() logger.info("MCP main window initialized successfully") def setup_menu(self): """Set up menu bar""" menubar = self.menuBar() # File menu file_menu = menubar.addMenu("&File") new_action = QAction("&New Session", self) new_action.setShortcut("Ctrl+N") file_menu.addAction(new_action) file_menu.addSeparator() # Export/Import actions export_action = QAction("&Export Data...", self) export_action.setShortcut("Ctrl+E") export_action.triggered.connect(self.show_export_dialog) file_menu.addAction(export_action) import_action = QAction("&Import Data...", self) import_action.setShortcut("Ctrl+I") import_action.triggered.connect(self.show_import_dialog) file_menu.addAction(import_action) file_menu.addSeparator() exit_action = QAction("E&xit", self) exit_action.setShortcut("Ctrl+Q") exit_action.triggered.connect(self.close) file_menu.addAction(exit_action) # Settings menu settings_menu = menubar.addMenu("&Settings") notifications_action = QAction("&Notifications...", self) notifications_action.triggered.connect(self.show_notification_preferences) settings_menu.addAction(notifications_action) # Help menu help_menu = menubar.addMenu("&Help") about_action = QAction("&About", self) about_action.triggered.connect(self.show_about) help_menu.addAction(about_action) def setup_central_widget(self): """Set up the main UI layout""" central_widget = QWidget() self.setCentralWidget(central_widget) # Main horizontal splitter (three panels) main_splitter = QSplitter(Qt.Horizontal) # Left side - Server and Memory Browser left_widget = QTabWidget() # Server tab self.server_widget = ServerWidget(self.config) self.server_widget.connection_status_changed.connect( self.on_connection_status_changed ) server_tab_index = left_widget.addTab(self.server_widget, "Server") left_widget.setTabEnabled(server_tab_index, False) # DISABLED # Memory Browser - using new generic system (FUNCTIONAL) self.memory_browser = GenericMemoryBrowserWidget("generic") memory_tab_index = left_widget.addTab(self.memory_browser, "Memory") # Create dummy server_url for disabled tabs server_host = self.config["server"]["host"] server_port = self.config["server"]["port"] server_url = f"http://{server_host}:{server_port}" # Agent Manager tab - VISIBLE but DISABLED self.agent_manager = AgentManagerWidget(server_url) agent_tab_index = left_widget.addTab(self.agent_manager, "Agents") left_widget.setTabEnabled(agent_tab_index, False) # Session Manager tab - VISIBLE but DISABLED self.session_manager = SessionManagerWidget(server_url) session_tab_index = left_widget.addTab( self.session_manager, "Sessions" ) left_widget.setTabEnabled(session_tab_index, False) # Set Memory tab as the default active tab left_widget.setCurrentIndex(memory_tab_index) # Middle - Conversation (using enhanced conversation widget with services) self.conversation_widget = EnhancedConversationWidget( self.conversation_service, use_mcp=True ) # Right side - Notifications panel # (Will be initialized after services are set up) self.notification_panel_placeholder = QWidget() # Add to main splitter main_splitter.addWidget(left_widget) main_splitter.addWidget(self.conversation_widget) main_splitter.addWidget(self.notification_panel_placeholder) # Set splitter proportions (30% left, 50% middle, 20% right) main_splitter.setSizes([300, 500, 200]) # Add main splitter to central widget layout = QHBoxLayout(central_widget) layout.addWidget(main_splitter) def setup_notification_panel(self): """Set up the notification panel after services are initialized""" if hasattr(self, "notification_service"): self.notification_panel = NotificationPanel(self.notification_service) # Replace placeholder with actual notification panel parent_splitter = self.notification_panel_placeholder.parent() if parent_splitter: # Get the index of the placeholder for i in range(parent_splitter.count()): if ( parent_splitter.widget(i) == self.notification_panel_placeholder ): # noqa # Remove placeholder self.notification_panel_placeholder.deleteLater() # Insert notification panel at the same position parent_splitter.insertWidget(i, self.notification_panel) break def setup_status_bar(self): """Set up status bar""" self.status_bar = QStatusBar() self.setStatusBar(self.status_bar) self.status_bar.showMessage("Ready - MCP Memory Server UI") def on_connection_status_changed(self, connected: bool): """Handle server connection status changes""" if connected: self.status_bar.showMessage("Connected to MCP Memory Server") self.conversation_widget.send_btn.setEnabled(True) else: self.status_bar.showMessage("Disconnected from server") self.conversation_widget.send_btn.setEnabled(False) def show_about(self): """Show about dialog""" QMessageBox.about( self, "About MCP Memory Server UI", "MCP Memory Server UI\n\n" "A desktop interface for the MCP Memory Server.\n" "Built with PySide6 for vector memory management and agent operations.", ) def setup_services(self): """Initialize real-time and notification services""" server_config = self.config["server"] server_url = f"http://{server_config['host']}:{server_config['port']}" ws_url = f"ws://{server_config['host']}:{server_config['port']}" # Initialize services self.session_service = SessionService(server_url) self.conversation_service = ConversationService(self.session_service) self.realtime_service = RealtimeService(ws_url) self.notification_service = NotificationService(self) self.data_export_import_service = DataExportImportService(server_url) logger.info("Services initialized successfully") def connect_realtime_services(self): """Connect real-time service signals to UI updates""" # Connect realtime service to notification service self.realtime_service.notification_received.connect( self.notification_service.show_notification ) # Connect session updates self.realtime_service.session_updated.connect(self.on_session_updated) # Connect memory updates self.realtime_service.memory_updated.connect(self.on_memory_updated) # Connect agent status changes self.realtime_service.agent_status_changed.connect(self.on_agent_status_changed) # Connect server status changes self.realtime_service.server_status_changed.connect( self.on_server_status_changed ) # Connect session manager signals - DISABLED for memory testing # if hasattr(self, "session_manager"): # logger.info("Connecting session manager signals to conversation widget") # self.session_manager.session_started.connect( # lambda config: self._on_session_started_for_conversation(config) # ) # self.session_manager.session_ended.connect( # lambda session_id: self._on_session_ended_for_conversation(session_id) # ) # else: # logger.warning( # "No session_manager found - conversation widget won't receive session updates!" # ) logger.info("Real-time service connections established") def _on_session_started_for_conversation(self, config: dict): """Handle session started signal for conversation widget""" session_id = config.get("session_id") or config.get("name") logger.info(f"Main Window: Session started callback - session_id: {session_id}") logger.info(f"Main Window: Full config: {config}") if session_id: logger.info( f"Main Window: Setting conversation widget session_id to: {session_id}" ) self.conversation_widget.set_session_id(session_id) # CRITICAL FIX: Start the conversation in the service logger.info( f"Main Window: Starting conversation service for session: {session_id}" ) self.conversation_service.start_conversation(session_id, config) else: logger.warning("Session started but no session_id or name in config!") logger.warning(f"Config keys: {list(config.keys())}") def _on_session_ended_for_conversation(self, session_id: str): """Handle session ended signal for conversation widget""" logger.info(f"Main Window: Session ended callback - session_id: {session_id}") self.conversation_widget.set_session_id(None) def on_session_updated(self, session_id: str, update_data: dict): """Handle session update events""" # Session manager disabled for memory testing # if hasattr(self, "session_manager"): # # Update session manager if available # self.session_manager.refresh_sessions() # Update status bar with session status status = update_data.get("status", "unknown") self.status_bar.showMessage(f"Session {session_id}: {status}") def on_memory_updated(self, scope: str, update_data: dict): """Handle memory update events""" if hasattr(self, "memory_browser"): # Refresh memory browser self.memory_browser.refresh_memory() def on_agent_status_changed(self, agent_id: str, status_data: dict): """Handle agent status change events""" if hasattr(self, "agent_manager"): # Update agent manager if available pass # Agent manager can implement refresh if needed def on_server_status_changed(self, status_data: dict): """Handle server status change events""" server_status = status_data.get("status", "unknown") self.status_bar.showMessage(f"Server status: {server_status}") def connect_to_session(self, session_id: str): """Connect to a specific session for real-time updates""" self.realtime_service.connect_to_session(session_id) log_msg = f"Connected to real-time updates for session: {session_id}" logger.info(log_msg) def disconnect_from_session(self, session_id: str): """Disconnect from session real-time updates""" self.realtime_service.disconnect_from_session(session_id) logger.info(f"Disconnected from session: {session_id}") def show_notification_preferences(self): """Show notification preferences dialog""" self.notification_service.show_preferences_dialog() def show_export_dialog(self): """Show data export dialog""" try: show_export_dialog(self.data_export_import_service, self) except Exception as e: logger.error(f"Failed to show export dialog: {e}") QMessageBox.critical(self, "Error", f"Failed to open export dialog:\n{e}") def show_import_dialog(self): """Show data import dialog""" try: show_import_dialog(self.data_export_import_service, self) except Exception as e: logger.error(f"Failed to show import dialog: {e}") QMessageBox.critical(self, "Error", f"Failed to open import dialog:\n{e}") def closeEvent(self, event): """Handle window close event""" # Clean up services if hasattr(self, "realtime_service"): self.realtime_service.disconnect_all() logger.info("MCP main window closing") super().closeEvent(event)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hannesnortje/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server