Skip to main content
Glama

LiveKit RAG Assistant

by THENABILMAN
mcp_client.py8.62 kB
""" MCP Client for connecting to mcp_server_standard.py Provides a persistent connection interface for Streamlit application """ import asyncio import json import sys import os import logging from typing import Optional, List, Dict, Any from contextlib import asynccontextmanager logger = logging.getLogger(__name__) class MCPClient: """Manages persistent connection to MCP server.""" def __init__(self): self.process = None self.session = None self._initialized = False @asynccontextmanager async def _get_session(self): """Context manager for MCP client session.""" try: from mcp.client.stdio import stdio_client, StdioServerParameters from mcp.client.session import ClientSession # Get path to server server_path = os.path.join(os.path.dirname(__file__), "mcp_server_standard.py") # Create server parameters server_params = StdioServerParameters( command=sys.executable, args=[server_path] ) # Connect to server via stdio async with stdio_client(server_params) as (read, write): logger.info("✅ Connected to MCP Server") # Create session async with ClientSession(read, write) as session: yield session except Exception as e: logger.error(f"Failed to connect to MCP server: {str(e)}") raise async def search_documentation( self, query: str, top_k: int = 4 ) -> List[str]: """ Search LiveKit documentation via MCP server. Args: query: Search query top_k: Number of results to return Returns: List of formatted documentation chunks """ try: async with self._get_session() as session: logger.info(f"Searching documentation for: {query}") # Call tool via MCP protocol response = await session.call_tool( name="search_documentation", arguments={ "query": query, "top_k": top_k } ) # Extract text from response if response and hasattr(response, 'content') and response.content: # Response is a list of TextContent objects text_content = response.content[0].text if hasattr(response.content[0], 'text') else str(response.content[0]) # If response is already good, return as-is if text_content.strip(): # Split by document separators and filter docs = [] for section in text_content.split("---"): section = section.strip() if section and len(section) > 20: docs.append(section) if docs: logger.info(f"✓ Retrieved {len(docs)} documentation chunks") return docs logger.warning(f"Empty or invalid response from documentation search: {text_content[:100]}") return [] else: logger.warning(f"No content in response: {response}") return [] except Exception as e: logger.error(f"Documentation search error: {str(e)}", exc_info=True) raise async def search_web( self, query: str, topic: str = "general" ) -> List[Dict[str, str]]: """ Search web via MCP server. Args: query: Search query topic: Search topic type ('general' or 'news') Returns: List of search results with title, url, content """ try: async with self._get_session() as session: logger.info(f"Searching web for: {query}") # Call tool via MCP protocol response = await session.call_tool( name="search_web", arguments={ "query": query, "topic": topic } ) # Extract text from response if response and hasattr(response, 'content') and response.content: text_content = response.content[0].text if hasattr(response.content[0], 'text') else str(response.content[0]) # Parse formatted results results = [] for result_section in text_content.split("---"): result_section = result_section.strip() if result_section and len(result_section) > 20: results.append(result_section) logger.info(f"✓ Retrieved {len(results)} web results") return results if results else [] else: logger.warning("Empty response from web search") return [] except Exception as e: logger.error(f"Web search error: {str(e)}") raise async def list_available_tools(self) -> List[Dict[str, Any]]: """List available MCP tools.""" try: async with self._get_client() as (read, write): tools = await read.list_tools() logger.info(f"✓ Available tools: {[t.name for t in tools]}") return tools except Exception as e: logger.error(f"Error listing tools: {str(e)}") raise # Global client instance (for Streamlit caching) _mcp_client: Optional[MCPClient] = None def get_mcp_client() -> MCPClient: """Get or create global MCP client instance.""" global _mcp_client if _mcp_client is None: _mcp_client = MCPClient() return _mcp_client # Streamlit-friendly wrapper functions def search_documentation_sync(query: str, top_k: int = 4) -> List[str]: """ Synchronous wrapper for documentation search. Use this in Streamlit apps. """ client = get_mcp_client() try: # Handle event loop for Streamlit compatibility try: loop = asyncio.get_event_loop() if loop.is_closed(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) logger.info(f"🔍 Searching for: {query}") # Run async function results = loop.run_until_complete( client.search_documentation(query, top_k) ) if results: logger.info(f"✅ Found {len(results)} results") else: logger.warning(f"⚠️ No results found for: {query}") return results except Exception as e: logger.error(f"Search failed: {str(e)}", exc_info=True) return [] def search_web_sync(query: str, topic: str = "general") -> List[Dict[str, str]]: """ Synchronous wrapper for web search. Use this in Streamlit apps. """ client = get_mcp_client() try: # Handle event loop for Streamlit compatibility try: loop = asyncio.get_event_loop() if loop.is_closed(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) logger.info(f"🌐 Web searching for: {query}") # Run async function results = loop.run_until_complete( client.search_web(query, topic) ) if results: logger.info(f"✅ Found {len(results)} web results") else: logger.warning(f"⚠️ No web results found for: {query}") return results except Exception as e: logger.error(f"Web search failed: {str(e)}", exc_info=True) return []

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/THENABILMAN/THENABILMAN_LiveKit_MCP_Assistant'

If you have feedback or need assistance with the MCP directory API, please join our Discord server