Pure Agentic MCP Server

by adamsalah13
streamlit_app.py (17.3 kB)
""" Streamlit App for Pure Agentic MCP Server Clean interface for interacting with all MCP tools via HTTP """ import streamlit as st import asyncio import httpx import json import logging from typing import Optional, Dict, List # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Configuration DEFAULT_SERVER_URL = "http://localhost:8000" async def get_server_status(server_url: str = DEFAULT_SERVER_URL) -> Dict: """Get server status and available tools""" try: async with httpx.AsyncClient(timeout=10.0) as client: # Try ping first ping_response = await client.get(f"{server_url}/ping") ping_response.raise_for_status() # Get tools list tools_response = await client.get(f"{server_url}/tools") tools_response.raise_for_status() # Get agent status status_response = await client.get(f"{server_url}/agent/status") status_response.raise_for_status() return { "healthy": True, "ping": ping_response.json(), "tools": tools_response.json(), "agents": status_response.json() } except Exception as e: logger.error(f"Error getting server status: {e}") return {"healthy": False, "error": str(e)} async def call_tool(tool_name: str, arguments: Dict, server_url: str = DEFAULT_SERVER_URL) -> Dict: """Call a specific MCP tool""" try: async with httpx.AsyncClient(timeout=30.0) as client: payload = { "tool_name": tool_name, "arguments": arguments } response = await client.post(f"{server_url}/tools/call", json=payload) response.raise_for_status() return response.json() except httpx.ConnectError: return {"status": "error", "message": "Cannot connect to server. Start with: python simple_mcp_host.py"} except httpx.TimeoutException: return {"status": "error", "message": "Request timed out. Please try again."} except Exception as e: logger.error(f"Error calling tool {tool_name}: {e}") return {"status": "error", "message": str(e)} async def chat_with_ai(message: str, agent: str, server_url: str = DEFAULT_SERVER_URL) -> str: """Chat with AI agents (OpenAI or Ollama)""" tool_name = f"{agent}_chat" arguments = {"message": message} result = await call_tool(tool_name, arguments, server_url) if result.get("status") == "success": response_data = result.get("result", {}) if isinstance(response_data, dict): content = response_data.get("content", "") model = response_data.get("model", agent) return f"{content}\n\n_({model})_" return str(response_data) else: return f"❌ Error: {result.get('message', 'Unknown error')}" async def analyze_text(text: str, analysis_type: str = "general", server_url: str = DEFAULT_SERVER_URL) -> str: """Analyze text using available AI tools""" try: # Try the smart analyze endpoint first async with httpx.AsyncClient(timeout=30.0) as client: payload = { "text": text, "analysis_type": analysis_type } response = await client.post(f"{server_url}/analyze", json=payload) response.raise_for_status() result = response.json() if result.get("status") == "success": analysis_data = result.get("result", {}) if isinstance(analysis_data, dict): analysis = analysis_data.get("analysis", "") used_tool = result.get("used_tool", "unknown") return f"{analysis}\n\n_Analysis by: {used_tool}_" return str(analysis_data) else: return f"❌ Analysis Error: {result.get('message', 'Unknown error')}" except Exception as e: return f"❌ Analysis failed: {str(e)}" # Streamlit UI Configuration st.set_page_config( page_title="Agentic MCP Server", page_icon="🤖", layout="wide", initial_sidebar_state="expanded" ) st.title("🤖 Pure Agentic MCP Server") st.markdown("**Clean interface for interacting with MCP tools 
organized by intelligent agents**") # Sidebar Configuration with st.sidebar: st.header("⚙️ Configuration") # Server URL server_url = st.text_input("Server URL:", value=DEFAULT_SERVER_URL) # Check server status if st.button("🔍 Check Server Status"): with st.spinner("Checking server..."): status = asyncio.run(get_server_status(server_url)) if status.get("healthy"): st.success("✅ Server is running") # Show agent status agent_status = status.get("agents", {}).get("agent_status", {}) if agent_status: st.subheader("🤖 Agents") for agent_name, info in agent_status.get("agents", {}).items(): icon = "🟢" if info.get("available") else "🔴" tool_count = info.get("tool_count", 0) st.write(f"{icon} **{agent_name.title()}**: {tool_count} tools") # Show available tools tools_data = status.get("tools", {}) if tools_data.get("tools"): st.subheader("🔧 Available Tools") tools = tools_data["tools"] st.write(f"Total: {len(tools)} tools") # Group tools by agent tool_groups = {} for tool in tools: tool_name = tool["name"] agent_name = tool_name.split("_")[0] if agent_name not in tool_groups: tool_groups[agent_name] = [] tool_groups[agent_name].append(tool_name) for agent, agent_tools in tool_groups.items(): st.write(f"**{agent.title()}**: {', '.join(agent_tools)}") else: st.error("❌ Server not responding") st.info("Start server with: `python simple_mcp_host.py`") if status.get("error"): st.error(f"Error: {status['error']}") # Main Interface Tabs tab1, tab2, tab3, tab4 = st.tabs(["💬 AI Chat", "📄 Text Analysis", "📁 File Operations", "🔧 Raw Tool Calls"]) # Tab 1: AI Chat with tab1: st.header("💬 Chat with AI Agents") # Agent selection col1, col2 = st.columns([3, 1]) with col1: agent = st.selectbox( "Select AI Agent:", ["openai", "ollama"], format_func=lambda x: {"openai": "🌐 OpenAI (Cloud)", "ollama": "🏠 Ollama (Local)"}[x] ) with col2: if st.button("🔄 Test Agent"): with st.spinner(f"Testing {agent}..."): test_result = asyncio.run(chat_with_ai("Hello! 
Just testing the connection.", agent, server_url)) st.text_area("Test Result:", test_result, height=100) # Chat interface if "chat_history" not in st.session_state: st.session_state.chat_history = [] # Chat input user_message = st.text_input("Your message:", placeholder=f"Ask {agent} anything...") col1, col2 = st.columns([1, 1]) with col1: if st.button("📨 Send Message", disabled=not user_message.strip()): if user_message.strip(): with st.spinner(f"🤔 {agent.title()} is thinking..."): response = asyncio.run(chat_with_ai(user_message, agent, server_url)) st.session_state.chat_history.append({ "user": user_message, "agent": agent, "response": response }) with col2: if st.button("🗑️ Clear Chat"): st.session_state.chat_history = [] # Display chat history if st.session_state.chat_history: st.subheader("📜 Chat History") for i, chat in enumerate(reversed(st.session_state.chat_history)): with st.expander(f"💬 {chat['agent'].title()}: {chat['user'][:50]}..."): st.markdown(f"**👤 You:** {chat['user']}") st.markdown(f"**🤖 {chat['agent'].title()}:**") st.markdown(chat['response']) # Tab 2: Text Analysis with tab2: st.header("📄 Intelligent Text Analysis") # Analysis input text_to_analyze = st.text_area( "Text to analyze:", placeholder="Enter text for AI-powered analysis...", height=150 ) analysis_type = st.selectbox( "Analysis Type:", ["general", "sentiment", "summary", "keywords", "classification"], format_func=lambda x: x.title() ) if st.button("🔍 Analyze Text", disabled=not text_to_analyze.strip()): if text_to_analyze.strip(): with st.spinner("🧠 Analyzing text..."): analysis = asyncio.run(analyze_text(text_to_analyze, analysis_type, server_url)) st.subheader("📊 Analysis Result") st.markdown(analysis) # Tab 3: File Operations with tab3: st.header("📁 File System Operations") # File operation selector operation = st.selectbox( "Select Operation:", ["list", "read", "info", "search"], format_func=lambda x: { "list": "📋 List Files", "read": "📖 Read File", "info": "ℹ️ File Info", "search": "🔍 Search Files" }[x] ) if operation == "list": col1, col2 = st.columns(2) with col1: path = st.text_input("Directory path:", value=".") pattern = st.text_input("File pattern:", value="*", help="e.g., *.py, *.txt") with col2: recursive = st.checkbox("Recursive", value=False) show_hidden = st.checkbox("Show hidden files", value=False) if st.button("📋 List Files"): with st.spinner("📁 Reading directory..."): result = asyncio.run(call_tool("file_list", { "path": path, "pattern": pattern, "recursive": recursive, "show_hidden": show_hidden }, server_url)) if result.get("status") == "success": file_data = result["result"] st.success(f"Found {file_data.get('total_files', 0)} files, {file_data.get('total_directories', 0)} directories") # Show directories if file_data.get("directories"): st.subheader("📁 Directories") for dir_info in file_data["directories"][:10]: # Limit display st.write(f"📁 {dir_info['name']}") # Show files if file_data.get("files"): st.subheader("📄 Files") for file_info in file_data["files"][:20]: # Limit display size_kb = file_info["size"] / 1024 st.write(f"📄 {file_info['name']} ({size_kb:.1f} KB)") else: st.error(f"Error: {result.get('message', 'Unknown error')}") elif operation == "read": file_path = st.text_input("File path:", placeholder="path/to/file.txt") if st.button("📖 Read File") and file_path: with st.spinner("📖 Reading file..."): result = asyncio.run(call_tool("file_read", {"path": file_path}, server_url)) if result.get("status") == "success": file_data = result["result"] st.success(f"File size: 
{file_data.get('size', 0)} bytes") st.text_area("File content:", file_data.get("content", ""), height=300) else: st.error(f"Error: {result.get('message', 'Unknown error')}") elif operation == "info": file_path = st.text_input("Path:", placeholder="path/to/file_or_directory") if st.button("ℹ️ Get Info") and file_path: with st.spinner("ℹ️ Getting file info..."): result = asyncio.run(call_tool("file_info", {"path": file_path}, server_url)) if result.get("status") == "success": info = result["result"] col1, col2 = st.columns(2) with col1: st.write(f"**Name:** {info.get('name', 'Unknown')}") st.write(f"**Type:** {'Directory' if info.get('is_directory') else 'File'}") st.write(f"**Size:** {info.get('size', 0)} bytes") with col2: st.write(f"**Exists:** {info.get('exists', False)}") st.write(f"**Permissions:** {info.get('permissions', 'Unknown')}") if info.get('extension'): st.write(f"**Extension:** {info['extension']}") else: st.error(f"Error: {result.get('message', 'Unknown error')}") # Tab 4: Raw Tool Calls with tab4: st.header("🔧 Raw MCP Tool Interface") st.markdown("Direct access to all MCP tools for advanced users") # Get available tools for dropdown if st.button("🔄 Refresh Tools"): with st.spinner("Getting available tools..."): status = asyncio.run(get_server_status(server_url)) if status.get("healthy"): tools_data = status.get("tools", {}) st.session_state.available_tools = tools_data.get("tools", []) else: st.error("Cannot get tools - server not responding") if "available_tools" not in st.session_state: st.session_state.available_tools = [] if st.session_state.available_tools: # Tool selection tool_names = [tool["name"] for tool in st.session_state.available_tools] selected_tool = st.selectbox("Select Tool:", tool_names) # Show tool info if selected_tool: tool_info = next((tool for tool in st.session_state.available_tools if tool["name"] == selected_tool), None) if tool_info: st.subheader(f"🔧 {selected_tool}") st.write(f"**Description:** {tool_info.get('description', 'No description')}") # Show schema schema = tool_info.get("inputSchema", {}) if schema.get("properties"): st.write("**Parameters:**") for prop_name, prop_info in schema["properties"].items(): required = "✓" if prop_name in schema.get("required", []) else " " st.write(f"- `{prop_name}` ({prop_info.get('type', 'unknown')}) {required} - {prop_info.get('description', '')}") # Arguments input st.subheader("📝 Tool Arguments") arguments_json = st.text_area( "Arguments (JSON):", placeholder='{"param1": "value1", "param2": "value2"}', height=100 ) # Call tool if st.button("⚡ Execute Tool"): if selected_tool and arguments_json: try: arguments = json.loads(arguments_json) if arguments_json.strip() else {} with st.spinner(f"⚡ Executing {selected_tool}..."): result = asyncio.run(call_tool(selected_tool, arguments, server_url)) st.subheader("📊 Result") if result.get("status") == "success": st.success("✅ Tool executed successfully") st.json(result.get("result", {})) else: st.error(f"❌ Tool execution failed: {result.get('message', 'Unknown error')}") except json.JSONDecodeError: st.error("❌ Invalid JSON in arguments") else: st.warning("⚠️ Please select a tool and provide arguments") else: st.info("📡 Click 'Refresh Tools' to load available tools") # Footer st.markdown("---") st.markdown(""" **Pure Agentic MCP Server** - Intelligent tool organization through domain experts 🤖 **AI Agents**: OpenAI (cloud) and Ollama (local) 📁 **File Agent**: Complete file system operations 🔧 **Raw Tools**: Direct MCP tool access """)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/adamsalah13/mcp_server_full'
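The same lookup can be made from Python; this is a minimal sketch of the curl request above using httpx (only the URL is taken from the command, and the response is assumed to be JSON):

import httpx

# Same request as the curl command above; prints the raw directory entry.
resp = httpx.get("https://glama.ai/api/mcp/v1/servers/adamsalah13/mcp_server_full", timeout=10.0)
resp.raise_for_status()
print(resp.json())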

If you have feedback or need assistance with the MCP directory API, please join our Discord server.