Skip to main content
Glama

MCP Server

by DPoitrast
web_interface.py24 kB
#!/usr/bin/env python3 """Simple web interface for the interactive MCP agent.""" import os import sys import json import asyncio from typing import Dict, List, Optional, Any from datetime import datetime import argparse # Add the project root to the Python path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Depends, Cookie from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel import requests from agent.mcp_agent import MCPAgent class WebAgent: """Web-based interactive agent using MCP database.""" def __init__( self, base_url: str = "http://localhost:8000", openai_api_key: Optional[str] = None ): self.base_url = base_url self.agent = MCPAgent( base_url=base_url, openai_api_key=openai_api_key or os.getenv("OPENAI_API_KEY") ) # Store sessions in memory but authenticate with MCP database self.authenticated_sessions: Dict[str, Dict[str, Any]] = {} async def authenticate_user(self, username: str, password: str) -> Optional[str]: """Authenticate user with MCP database and return access token.""" try: response = requests.post( f"{self.base_url}/api/v1/token", data={ "username": username, "password": password }, headers={"Content-Type": "application/x-www-form-urlencoded"} ) if response.status_code == 200: token_data = response.json() return token_data.get("access_token") else: return None except Exception as e: print(f"Authentication error: {e}") return None async def verify_token(self, token: str) -> Optional[Dict[str, Any]]: """Verify token with MCP database and get user info.""" try: response = requests.get( f"{self.base_url}/api/v1/users/me", headers={"Authorization": f"Bearer {token}"} ) if response.status_code == 200: return response.json() else: return None except Exception as e: print(f"Token verification error: {e}") return None def get_session(self, session_id: str, user_id: str) -> Dict[str, Any]: """Get or create an authenticated session.""" session_key = f"{user_id}_{session_id}" if session_key not in self.authenticated_sessions: self.authenticated_sessions[session_key] = { "conversation_history": [], "chat_mode": True, "user_id": user_id, "session_id": session_id, "created_at": datetime.now().isoformat() } return self.authenticated_sessions[session_key] async def process_message( self, session_id: str, message: str, user_id: str, access_token: str, chat_mode: bool = True, stream: bool = False ) -> Dict[str, Any]: """Process a message in an authenticated session.""" session = self.get_session(session_id, user_id) if not self.agent.openai_client: return { "error": "OpenAI not available. Please set OPENAI_API_KEY environment variable." } try: if chat_mode: # Direct chat mode result = self.agent.chat_with_openai( user_message=message, conversation_history=session["conversation_history"], system_prompt="You are a helpful assistant for an MCP system. Be conversational and helpful.", model="gpt-4o-mini", stream=stream ) if result.get("is_streaming"): return {"stream": True, "generator": result["stream"]} else: session["conversation_history"] = result["conversation_history"] return { "response": result["response"], "usage": result.get("usage") } else: # Smart mode - use intelligent MCP query result = self.agent.intelligent_mcp_query( user_request=message, token=access_token, conversation_history=session["conversation_history"] ) session["conversation_history"] = result["conversation_history"] response = result["response"] if result.get("action_taken"): response += f"\n\n🔧 Executed: {result['action_taken']['tool']}" return { "response": response, "mcp_result": result.get("mcp_result"), "action_taken": result.get("action_taken") } except Exception as e: return {"error": f"Error: {e}"} # Global web agent instance web_agent = WebAgent() # FastAPI app app = FastAPI(title="Interactive MCP Agent Web Interface") # Security security = HTTPBearer() # HTML template HTML_TEMPLATE = """ <!DOCTYPE html> <html> <head> <title>Interactive MCP Agent</title> <meta charset="utf-8"> <meta name="viewport" content="width=device-width, initial-scale=1"> <style> body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; margin: 0; padding: 20px; background: #f5f5f5; line-height: 1.6; } .container { max-width: 800px; margin: 0 auto; background: white; border-radius: 10px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); overflow: hidden; } .header { background: #007bff; color: white; padding: 20px; text-align: center; } .chat-container { height: 500px; overflow-y: auto; padding: 20px; border-bottom: 1px solid #eee; } .message { margin-bottom: 15px; padding: 10px; border-radius: 8px; } .user-message { background: #e3f2fd; margin-left: 50px; } .agent-message { background: #f1f8e9; margin-right: 50px; } .input-container { padding: 20px; display: flex; gap: 10px; } #messageInput { flex: 1; padding: 12px; border: 1px solid #ddd; border-radius: 5px; font-size: 16px; } #sendButton { padding: 12px 24px; background: #007bff; color: white; border: none; border-radius: 5px; cursor: pointer; font-size: 16px; } #sendButton:hover { background: #0056b3; } #sendButton:disabled { background: #ccc; cursor: not-allowed; } .status { text-align: center; padding: 10px; font-size: 14px; color: #666; } .error { color: #dc3545; background: #f8d7da; padding: 10px; border-radius: 5px; margin: 10px 0; } .typing { font-style: italic; color: #666; } </style> </head> <body> <div class="container"> <div class="header"> <h1>🤖 Interactive MCP Agent</h1> <p>Chat with the AI-powered MCP agent</p> <div id="userInfo" style="display: none;"> <small>Logged in as: <span id="username"></span> | <button onclick="logout()" style="background: none; border: none; color: white; cursor: pointer;">Logout</button></small> </div> </div> <!-- Login Form --> <div id="loginForm" class="input-container" style="flex-direction: column; gap: 15px;"> <h3 style="margin: 0; text-align: center;">Login to MCP Database</h3> <input type="text" id="usernameInput" placeholder="Username (johndoe)" style="width: 100%;"> <input type="password" id="passwordInput" placeholder="Password (secret)" style="width: 100%;"> <button id="loginButton" style="width: 100%;">Login</button> <small style="text-align: center; color: #666;"> Default users: johndoe/secret or alice/wonderland </small> </div> <!-- Chat Interface (hidden until logged in) --> <div id="chatInterface" style="display: none;"> <div class="chat-container" id="chatContainer"> <div class="message agent-message"> <strong>🤖 Agent:</strong> Hello! I'm your interactive MCP agent connected to the same database as the CLI. I can help you with various tasks and answer questions. What would you like to know? </div> </div> <div style="padding: 10px; border-bottom: 1px solid #eee; text-align: center;"> <label> <input type="radio" name="mode" value="chat" checked> 💬 Chat Mode </label> <label style="margin-left: 20px;"> <input type="radio" name="mode" value="smart"> 🧠 Smart Mode (MCP Operations) </label> </div> <div class="status" id="status"> Ready to chat </div> <div class="input-container"> <input type="text" id="messageInput" placeholder="Type your message here..." autofocus> <button id="sendButton">Send</button> </div> </div> </div> <script> class MCPAgent { constructor() { this.sessionId = this.generateSessionId(); this.accessToken = null; this.userInfo = null; // UI elements this.chatContainer = document.getElementById('chatContainer'); this.messageInput = document.getElementById('messageInput'); this.sendButton = document.getElementById('sendButton'); this.status = document.getElementById('status'); this.loginForm = document.getElementById('loginForm'); this.chatInterface = document.getElementById('chatInterface'); this.usernameInput = document.getElementById('usernameInput'); this.passwordInput = document.getElementById('passwordInput'); this.loginButton = document.getElementById('loginButton'); this.setupEventListeners(); this.checkExistingAuth(); } generateSessionId() { return 'session_' + Math.random().toString(36).substr(2, 9); } setupEventListeners() { // Chat events this.sendButton.addEventListener('click', () => this.sendMessage()); this.messageInput.addEventListener('keypress', (e) => { if (e.key === 'Enter') this.sendMessage(); }); // Login events this.loginButton.addEventListener('click', () => this.login()); this.usernameInput.addEventListener('keypress', (e) => { if (e.key === 'Enter') this.login(); }); this.passwordInput.addEventListener('keypress', (e) => { if (e.key === 'Enter') this.login(); }); } checkExistingAuth() { const token = localStorage.getItem('mcp_access_token'); if (token) { this.accessToken = token; this.showChatInterface(); } } async login() { const username = this.usernameInput.value.trim(); const password = this.passwordInput.value.trim(); if (!username || !password) { alert('Please enter both username and password'); return; } this.loginButton.disabled = true; this.loginButton.textContent = 'Logging in...'; try { const response = await fetch('/login', { method: 'POST', headers: { 'Content-Type': 'application/json', }, body: JSON.stringify({ username: username, password: password }) }); if (response.ok) { const data = await response.json(); this.accessToken = data.access_token; this.userInfo = data.user_info; // Store token localStorage.setItem('mcp_access_token', this.accessToken); this.showChatInterface(); } else { const error = await response.json(); alert(`Login failed: ${error.detail || 'Unknown error'}`); } } catch (error) { alert(`Login error: ${error.message}`); } finally { this.loginButton.disabled = false; this.loginButton.textContent = 'Login'; } } showChatInterface() { this.loginForm.style.display = 'none'; this.chatInterface.style.display = 'block'; if (this.userInfo) { document.getElementById('username').textContent = this.userInfo.username || 'User'; document.getElementById('userInfo').style.display = 'block'; } this.messageInput.focus(); } logout() { localStorage.removeItem('mcp_access_token'); this.accessToken = null; this.userInfo = null; this.loginForm.style.display = 'block'; this.chatInterface.style.display = 'none'; document.getElementById('userInfo').style.display = 'none'; // Clear form this.usernameInput.value = ''; this.passwordInput.value = ''; this.usernameInput.focus(); } async sendMessage() { const message = this.messageInput.value.trim(); if (!message || !this.accessToken) return; // Get selected mode const chatMode = document.querySelector('input[name="mode"]:checked').value === 'chat'; this.addMessage('user', message); this.messageInput.value = ''; this.setLoading(true); try { const response = await fetch('/chat', { method: 'POST', headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${this.accessToken}` }, body: JSON.stringify({ session_id: this.sessionId, message: message, chat_mode: chatMode }) }); if (response.status === 401) { // Token expired, need to re-login this.logout(); alert('Session expired. Please log in again.'); return; } const data = await response.json(); if (data.error) { this.addMessage('agent', `❌ ${data.error}`, 'error'); } else { let responseText = data.response; if (data.action_taken) { responseText += `\\n\\n🔧 Executed: ${data.action_taken.tool}`; } this.addMessage('agent', responseText); } } catch (error) { this.addMessage('agent', `❌ Network error: ${error.message}`, 'error'); } finally { this.setLoading(false); } } addMessage(sender, content, className = '') { const messageDiv = document.createElement('div'); messageDiv.className = `message ${sender}-message ${className}`; const icon = sender === 'user' ? '👤' : '🤖'; const label = sender === 'user' ? 'You' : 'Agent'; messageDiv.innerHTML = `<strong>${icon} ${label}:</strong> ${content}`; this.chatContainer.appendChild(messageDiv); this.chatContainer.scrollTop = this.chatContainer.scrollHeight; } setLoading(loading) { this.sendButton.disabled = loading; this.status.textContent = loading ? 'Thinking...' : 'Ready to chat'; if (loading) { const typingDiv = document.createElement('div'); typingDiv.className = 'message agent-message typing'; typingDiv.id = 'typing'; typingDiv.innerHTML = '<strong>🤖 Agent:</strong> Typing...'; this.chatContainer.appendChild(typingDiv); } else { const typingDiv = document.getElementById('typing'); if (typingDiv) typingDiv.remove(); } this.chatContainer.scrollTop = this.chatContainer.scrollHeight; } } // Global functions function logout() { if (window.mcpAgent) { window.mcpAgent.logout(); } } // Initialize the agent when the page loads document.addEventListener('DOMContentLoaded', () => { window.mcpAgent = new MCPAgent(); }); </script> </body> </html> """ class LoginRequest(BaseModel): username: str password: str class ChatRequest(BaseModel): session_id: str message: str chat_mode: bool = True class AuthResponse(BaseModel): access_token: str token_type: str = "bearer" user_info: Dict[str, Any] async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): """Get current user from token.""" user_info = await web_agent.verify_token(credentials.credentials) if not user_info: raise HTTPException(status_code=401, detail="Invalid authentication credentials") return {"token": credentials.credentials, "user_info": user_info} @app.get("/", response_class=HTMLResponse) async def get_interface(): """Serve the web interface.""" return HTML_TEMPLATE @app.post("/login", response_model=AuthResponse) async def login_endpoint(request: LoginRequest): """Authenticate user with MCP database.""" try: access_token = await web_agent.authenticate_user(request.username, request.password) if not access_token: raise HTTPException(status_code=401, detail="Invalid username or password") # Get user info user_info = await web_agent.verify_token(access_token) if not user_info: raise HTTPException(status_code=401, detail="Authentication failed") return AuthResponse( access_token=access_token, user_info=user_info ) except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"Authentication error: {e}") @app.post("/chat") async def chat_endpoint( request: ChatRequest, current_user: Dict[str, Any] = Depends(get_current_user) ): """Handle authenticated chat messages.""" try: user_info = current_user["user_info"] access_token = current_user["token"] result = await web_agent.process_message( session_id=request.session_id, message=request.message, user_id=str(user_info.get("id", user_info.get("username", "unknown"))), access_token=access_token, chat_mode=request.chat_mode ) return result except Exception as e: return {"error": f"Internal error: {e}"} @app.get("/status") async def status_endpoint(current_user: Dict[str, Any] = Depends(get_current_user)): """Get authenticated agent status.""" user_info = current_user["user_info"] return { "status": "operational", "openai_available": web_agent.agent.openai_client is not None, "tools_available": len(web_agent.agent.get_available_tools()), "active_sessions": len(web_agent.authenticated_sessions), "user": user_info.get("username", "unknown"), "database_connected": True # Since we're using the same MCP database } @app.get("/tools") async def tools_endpoint(current_user: Dict[str, Any] = Depends(get_current_user)): """Get available MCP tools (authenticated).""" tools = web_agent.agent.get_available_tools() user_info = current_user["user_info"] return { "tools": tools, "count": len(tools), "user": user_info.get("username", "unknown") } def main(): """Run the web interface.""" parser = argparse.ArgumentParser(description="Web interface for Interactive MCP Agent") parser.add_argument("--host", default="localhost", help="Host to bind to") parser.add_argument("--port", type=int, default=8080, help="Port to bind to") parser.add_argument("--mcp-url", default="http://localhost:8000", help="MCP server URL") parser.add_argument("--openai-key", help="OpenAI API key") args = parser.parse_args() # Initialize global web agent global web_agent web_agent = WebAgent( base_url=args.mcp_url, openai_api_key=args.openai_key ) print(f"🌐 Starting web interface on http://{args.host}:{args.port}") print(f"🔗 MCP Server: {args.mcp_url}") print(f"🤖 OpenAI Available: {web_agent.agent.openai_client is not None}") print(f"🔧 Tools Available: {len(web_agent.agent.get_available_tools())}") import uvicorn uvicorn.run(app, host=args.host, port=args.port) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DPoitrast/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server