Azure AI Search MCP Server

by mm-repos
server.py (23.8 kB)
"""MCP Server implementation for Azure AI Search.""" import asyncio import logging import sys import time from typing import Any, Dict, List, Optional try: from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import ( GetPromptResult, Prompt, PromptArgument, PromptMessage, Resource, TextContent, Tool, ) from pydantic import AnyUrl MCP_AVAILABLE = True except ImportError: # Use stderr to avoid interfering with JSON-RPC stdout print( "MCP library not available. Please install with: pip install mcp", file=sys.stderr, ) MCP_AVAILABLE = False from langsmith import traceable from .chain import AzureSearchChain from .config import config from .prompt_manager import PromptManager # Set up logging to stderr to avoid interfering with JSON-RPC stdout logging.basicConfig( level=getattr(logging, config.mcp_server.log_level), format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler(sys.stderr)], ) logger = logging.getLogger(__name__) class AzureSearchMCPServer: """Azure AI Search MCP Server.""" def __init__(self): """Initialize the MCP server.""" self._init_warnings = [] # Check configuration before initializing components self._check_configuration() # Initialize components (even if config is incomplete - they'll handle it gracefully) self.search_chain = AzureSearchChain() self.prompt_manager = PromptManager() if MCP_AVAILABLE: self.server = Server(config.mcp_server.name) self._setup_handlers() def _check_configuration(self): """Check and report configuration issues.""" missing_azure = [] if not config.azure_search.endpoint: missing_azure.append("AZURE_SEARCH_ENDPOINT") if not config.azure_search.api_key: missing_azure.append("AZURE_SEARCH_API_KEY") if not config.azure_search.index_name: missing_azure.append("AZURE_SEARCH_INDEX_NAME") if missing_azure: warning = f"Azure Search configuration incomplete. Missing: {', '.join(missing_azure)}. Search functionality will be disabled." self._init_warnings.append(warning) print(f"[CONFIG_WARNING] {warning}", file=sys.stderr) if not config.gemini.api_key: warning = "Gemini API key not found. LLM-based formatting will be disabled." 
self._init_warnings.append(warning) print(f"[CONFIG_WARNING] {warning}", file=sys.stderr) def _setup_handlers(self): """Set up MCP server handlers.""" if not MCP_AVAILABLE: return @self.server.list_resources() async def handle_list_resources() -> List[Resource]: """List available resources.""" resources = [] # Add each tool as a resource tools = self._get_available_tools() for tool in tools: resources.append( Resource( uri=AnyUrl(f"tool://{tool.name}"), name=tool.name, description=tool.description, mimeType="application/json", ) ) # Add each prompt as a resource prompts = self._get_available_prompts() for prompt in prompts: resources.append( Resource( uri=AnyUrl(f"prompt://{prompt.name}"), name=prompt.name, description=prompt.description, mimeType="text/plain", ) ) return resources @self.server.list_tools() async def handle_list_tools() -> List[Tool]: """List available tools.""" return self._get_available_tools() @self.server.list_prompts() async def handle_list_prompts() -> List[Prompt]: """List available prompts.""" return self._get_available_prompts() @self.server.get_prompt() async def handle_get_prompt(name: str, arguments: Dict[str, str]) -> GetPromptResult: """Get a specific prompt.""" try: if arguments is None: arguments = {} query = arguments.get("query", "") documents = arguments.get("documents", "") if name.startswith("format_"): # Extract format type format_type = name[7:] # Remove "format_" prefix # Get the prompt template for this format template = self.prompt_manager.get_prompt_template_for_format( format_type ) # Format the prompt with the provided arguments formatted_prompt = template.format_messages( query=query, documents=documents ) return GetPromptResult( description=f"Format documents in {format_type} style", messages=[ PromptMessage( role="user", content=TextContent( type="text", text=str(msg.content) if hasattr(msg, "content") else str(msg), ), ) for msg in formatted_prompt ], ) elif name.startswith("persona_"): # Extract persona type persona_type = name[8:] # Remove "persona_" prefix # Get persona information persona_info = self.prompt_manager.get_persona_info(persona_type) # Create basic template basic_template = f"""Acting as {persona_info["name"]}: {persona_info["description"]} Please respond to the following query with the characteristics and expertise of {persona_info["name"]}: <query> {query} </query> **Response:**""" return GetPromptResult( description=f"Persona prompt for {persona_info['name']}", messages=[ PromptMessage( role="user", content=TextContent(type="text", text=basic_template), ) ], ) else: error_text = f"Unknown prompt: {name}" return GetPromptResult( description="Error", messages=[ PromptMessage( role="user", content=TextContent(type="text", text=error_text), ) ], ) except Exception as e: logger.error(f"Error getting prompt {name}: {str(e)}") error_text = f"Error getting prompt: {str(e)}" return GetPromptResult( description="Error", messages=[ PromptMessage( role="user", content=TextContent(type="text", text=error_text), ) ], ) @self.server.call_tool() @traceable(name="mcp_tool_call") async def handle_call_tool( name: str, arguments: Dict[str, Any] ) -> List[TextContent]: """Handle tool calls.""" import time start_time = time.time() # Add a timeout wrapper to prevent hanging async def _execute_tool(): try: if name == "search_documents": # Check if configuration is valid before attempting search if self._init_warnings: warning_msg = ( "Search functionality is disabled due to configuration issues:\n" + "\n".join(f"• {w}" for w in 
self._init_warnings) ) return [ TextContent( type="text", text=f"❌ {warning_msg}\n\nPlease set the required environment variables and restart the server.", ) ] query = arguments["query"] top_k = arguments.get("top_k", 5) format_type = arguments.get("format_type", "structured") search_result = await self.search_chain.run( query, top_k=top_k, output_format=format_type ) result = search_result["context"] text_content = TextContent(type="text", text=result) return [text_content] elif name == "search_and_summarize": # Check if configuration is valid before attempting search if self._init_warnings: warning_msg = ( "Search functionality is disabled due to configuration issues:\n" + "\n".join(f"• {w}" for w in self._init_warnings) ) return [ TextContent( type="text", text=f"❌ {warning_msg}\n\nPlease set the required environment variables and restart the server.", ) ] query = arguments["query"] top_k = arguments.get("top_k", 5) search_result = await self.search_chain.run( query, top_k=top_k, output_format="summary" ) result = search_result["context"] return [TextContent(type="text", text=result)] elif name == "search_with_analysis": # Check if configuration is valid before attempting search if self._init_warnings: warning_msg = ( "Search functionality is disabled due to configuration issues:\n" + "\n".join(f"• {w}" for w in self._init_warnings) ) return [ TextContent( type="text", text=f"❌ {warning_msg}\n\nPlease set the required environment variables and restart the server.", ) ] query = arguments["query"] top_k = arguments.get("top_k", 5) search_result = await self.search_chain.run( query, top_k=top_k, output_format="analysis" ) result = search_result["context"] return [TextContent(type="text", text=result)] elif name == "get_document_context": document_ids = arguments["document_ids"] result = await self.search_chain.get_document_context_tool( document_ids ) return [TextContent(type="text", text=result)] else: error_msg = f"Unknown tool: {name}" logger.error(error_msg) return [TextContent(type="text", text=f"Error: {error_msg}")] except asyncio.CancelledError: elapsed = time.time() - start_time logger.warning(f"Tool call cancelled: {name} after {elapsed:.2f}s") raise # Re-raise to properly handle cancellation except Exception as e: print(f"!!! EXCEPTION MESSAGE: {str(e)} !!!", file=sys.stderr) logger.error(f"Critical error in tool {name}: {e}") raise # Run with timeout try: return await asyncio.wait_for(_execute_tool(), timeout=30.0) except asyncio.TimeoutError: elapsed = time.time() - start_time logger.error(f"Tool call timeout: {name} after {elapsed:.2f}s") return [ TextContent( type="text", text=f"Error: Tool call timed out after {elapsed:.2f}s. 
The search service may be experiencing delays.", ) ] except Exception as e: elapsed = time.time() - start_time logger.error(f"Tool call error: {name} after {elapsed:.2f}s: {e}") return [ TextContent(type="text", text=f"Error: Tool call failed: {str(e)}") ] def _get_available_tools(self) -> List[Tool]: """Get list of available tools.""" return [ Tool( name="search_documents", description="Search for relevant documents in Azure AI Search with AI-enhanced analysis and formatting", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "The search query to find relevant documents", }, "top_k": { "type": "integer", "description": "Number of documents to return (default: 5)", "default": 5, }, "format_type": { "type": "string", "description": "Format type: 'summary' (AI-enhanced default), 'structured', or 'analysis'", "enum": ["summary", "structured", "analysis"], "default": "summary", }, }, "required": ["query"], }, ), Tool( name="search_and_summarize", description="Search for documents and provide a concise summary of findings", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "The search query to find relevant documents", }, "top_k": { "type": "integer", "description": "Number of documents to return (default: 5)", "default": 5, }, }, "required": ["query"], }, ), Tool( name="search_with_analysis", description="Search for documents and provide detailed relevance analysis", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "The search query to find relevant documents", }, "top_k": { "type": "integer", "description": "Number of documents to return (default: 5)", "default": 5, }, }, "required": ["query"], }, ), Tool( name="get_document_context", description="Retrieve specific documents by their IDs for detailed context", inputSchema={ "type": "object", "properties": { "document_ids": { "type": "string", "description": "Comma-separated list of document IDs to retrieve", }, "format_output": { "type": "boolean", "description": "Whether to format the output for readability (default: true)", "default": True, }, }, "required": ["document_ids"], }, ), ] def _get_available_prompts(self) -> List[Prompt]: """Get list of available prompts.""" return [ Prompt( name="search_query_optimization", description="Optimize search queries for better results", arguments=[ PromptArgument( name="query", description="The search query to optimize", required=True, ) ], ), Prompt( name="document_analysis", description="Analyze document relevance and extract insights", arguments=[ PromptArgument( name="documents", description="Documents to analyze", required=True, ), PromptArgument( name="query", description="The original search query", required=True, ), ], ), ] async def run_standalone_mode(self): """Run in standalone mode for testing without MCP.""" logger.info("Running in standalone mode (MCP not available)") # Simple command-line interface for testing print("Azure AI Search Server - Standalone Mode") print("Available commands:") print(" search <query> - Search for documents (structured format)") print(" summary <query> - Search and provide summary") print(" analysis <query> - Search with relevance analysis") print(" get <doc_ids> - Get document context") print(" quit - Exit") while True: try: command = input("\n> ").strip() if not command: continue if command.lower() in ["quit", "exit"]: break parts = command.split(" ", 1) if len(parts) < 2: print("Please provide a query or document IDs") continue cmd, 
query = parts if cmd == "search": search_result = await self.search_chain.run( query, output_format="structured" ) result = search_result["context"] print(f"\nSearch Results:\n{result}") elif cmd == "summary": search_result = await self.search_chain.run( query, output_format="summary" ) result = search_result["context"] print(f"\nSummary:\n{result}") elif cmd == "analysis": search_result = await self.search_chain.run( query, output_format="analysis" ) result = search_result["context"] print(f"\nAnalysis:\n{result}") elif cmd == "get": result = await self.search_chain.get_document_context_tool(query) print(f"\nDocument Context:\n{result}") else: print( "Unknown command. Use 'search', 'summary', 'analysis', 'get', or 'quit'" ) except KeyboardInterrupt: break except Exception as e: print(f"Error: {e}") await self.search_chain.close() async def run_mcp_mode(self): """Run in MCP mode.""" if not MCP_AVAILABLE: raise RuntimeError("MCP library not available") logger.info(f"Starting Azure AI Search MCP Server v{config.mcp_server.version}") try: # Use stdio transport from mcp.server.stdio import stdio_server async with stdio_server() as (read_stream, write_stream): from mcp.server import NotificationOptions from mcp.server.models import InitializationOptions initialization_options = InitializationOptions( server_name=config.mcp_server.name, server_version=config.mcp_server.version, capabilities=self.server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ) await self.server.run(read_stream, write_stream, initialization_options) except asyncio.CancelledError: logger.info("Server run was cancelled") raise except Exception as e: import traceback print(f"[{time.strftime('%H:%M:%S')}] SERVER_TRACEBACK:", file=sys.stderr) traceback.print_exc(file=sys.stderr) logger.error(f"Server error: {str(e)}") raise finally: await self.search_chain.close() def main(): """Main entry point.""" import argparse parser = argparse.ArgumentParser(description="Azure AI Search MCP Server") parser.add_argument( "--standalone", action="store_true", help="Run in standalone mode for testing" ) parser.add_argument("--debug", action="store_true", help="Enable debug logging") args = parser.parse_args() if args.debug: logging.getLogger().setLevel(logging.DEBUG) async def async_main(): server = AzureSearchMCPServer() # Determine mode based on arguments if args.standalone or not MCP_AVAILABLE: # Standalone mode for testing logger.info("Running in standalone mode") await server.run_standalone_mode() else: # MCP mode (default) logger.info("Running in MCP mode (awaiting client connection)") await server.run_mcp_mode() try: asyncio.run(async_main()) except KeyboardInterrupt: logger.info("Server stopped by user") except Exception as e: logger.error(f"Server failed: {e}") raise if __name__ == "__main__": main()
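For reference, the sketch below shows one way a client might exercise this server end to end over stdio using the MCP Python SDK's client classes. The three Azure environment variable names come from _check_configuration() above; the launch command ("python -m azure_search_mcp.server") and the GEMINI_API_KEY variable name are assumptions, since the package layout and config module are not shown in this file.

"""Minimal client-side sketch (assumptions noted inline); not part of server.py."""
import asyncio
import os

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Azure variable names are taken from _check_configuration(); the values and the
# GEMINI_API_KEY name are placeholders/assumptions.
env = {
    "AZURE_SEARCH_ENDPOINT": "https://<service>.search.windows.net",
    "AZURE_SEARCH_API_KEY": "<key>",
    "AZURE_SEARCH_INDEX_NAME": "<index>",
    "GEMINI_API_KEY": "<key>",  # assumed name; not shown in this file
}

# Assumed launch command; adjust to however the package is actually installed.
params = StdioServerParameters(
    command="python",
    args=["-m", "azure_search_mcp.server"],
    env={**os.environ, **env},
)


async def main():
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print([t.name for t in tools.tools])  # expect the four tools defined above
            result = await session.call_tool(
                "search_documents", {"query": "quarterly report", "top_k": 3}
            )
            print(result.content[0].text)


if __name__ == "__main__":
    asyncio.run(main())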

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mm-repos/langgraph-claude-azure-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.