mcp_server.py•9.23 kB
#!/usr/bin/env python3
import asyncio
import logging
import os
import sys
from typing import Any
import mcp.types as types
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from rag_tools import RagTools
# Configure logging.
# All log output goes to stderr so it doesn't interfere with the MCP
# protocol, which uses stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stderr)],
)
logger = logging.getLogger(__name__)

# Initialize the MCP server
server = Server("rag-mcp-server")

# Initialize tools. The process exits with status 1 if construction fails,
# because none of the tool handlers can work without a RagTools instance.
try:
    rag_tools = RagTools()
except Exception as e:
    # logger.exception records the traceback as well as the message, so a
    # startup failure can be diagnosed from the log alone.
    logger.exception("Failed to initialize RAG tools: %s", e)
    sys.exit(1)
else:
    logger.info("RAG tools initialized successfully")
def _string_param(description: str) -> dict[str, str]:
    """Build the schema entry for one string-typed tool parameter."""
    return {"type": "string", "description": description}


def _closed_object_schema(
    properties: dict[str, Any],
    required: list[str] | None = None,
) -> dict[str, Any]:
    """Build an object input schema that disallows undeclared properties.

    The "required" key is emitted only when a list is supplied, so a tool
    with no mandatory parameters gets a schema without that key.
    """
    schema: dict[str, Any] = {"type": "object", "properties": properties}
    if required is not None:
        schema["required"] = required
    schema["additionalProperties"] = False
    return schema


@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """Return the descriptors of the five tools this server exposes.

    Any exception raised while building the descriptors is logged and then
    re-raised unchanged.
    """
    try:
        logger.debug("Listing available tools")

        list_names_tool = types.Tool(
            name="listContentNames",
            description="List all content names from the organization's database optionally filtered by name",
            inputSchema=_closed_object_schema(
                {"name": _string_param("Optional name filter to search for specific content")},
            ),
        )
        search_tool = types.Tool(
            name="searchOrganizationContents",
            description="Search through the organization's content database using semantic search",
            inputSchema=_closed_object_schema(
                {"query": _string_param("The search query")},
                required=["query"],
            ),
        )
        delete_tool = types.Tool(
            name="deleteOrganizationContent",
            description="Delete specific content from the organization's database",
            inputSchema=_closed_object_schema(
                {"contentId": _string_param("The ID of the content to delete")},
                required=["contentId"],
            ),
        )
        upload_file_tool = types.Tool(
            name="uploadContentFileAboutOrganization",
            description="Upload content file about the organization",
            inputSchema=_closed_object_schema(
                {
                    "file": _string_param("The file content to upload"),
                    "fileName": _string_param("The file name"),
                    "role": _string_param("The roles of the user"),
                },
                required=["file", "fileName"],
            ),
        )
        upload_url_tool = types.Tool(
            name="uploadContentUrlAboutOrganization",
            description="Upload content url about the organization",
            inputSchema=_closed_object_schema(
                {
                    "url": _string_param("The URL to upload"),
                    "role": _string_param("The roles of the user"),
                },
                required=["url"],
            ),
        )

        return [
            list_names_tool,
            search_tool,
            delete_tool,
            upload_file_tool,
            upload_url_tool,
        ]
    except Exception as e:
        logger.error(f"Error listing tools: {e}")
        raise
def _require_arguments(
    arguments: dict[str, Any],
    keys: tuple[str, ...],
    message: str,
) -> None:
    """Raise ``ValueError(message)`` unless every key in *keys* is present."""
    if any(key not in arguments for key in keys):
        raise ValueError(message)


@server.call_tool()
async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]:
    """Dispatch one tool call to the matching ``rag_tools`` method.

    Args:
        name: Name of the tool to run; must be one of the names handled below.
        arguments: Tool arguments. ``None`` is treated as an empty mapping.

    Returns:
        A single-element list. On success the element carries ``str(result)``
        of the tool call. On any failure (missing required argument, unknown
        tool name, or an exception raised by the tool) it carries
        ``"Error: <message>"`` instead; exceptions are not propagated.
    """
    if arguments is None:
        arguments = {}
    try:
        # Log argument names only: the values can include the full content of
        # an uploaded file, which would bloat the log and leak data into it.
        logger.info(
            "Executing tool: %s with argument keys: %s", name, sorted(arguments)
        )

        if name == "listContentNames":
            result = rag_tools.list_content_names(arguments.get("name"))
        elif name == "searchOrganizationContents":
            _require_arguments(arguments, ("query",), "Query parameter is required")
            result = rag_tools.search_organization_contents(arguments["query"])
        elif name == "deleteOrganizationContent":
            _require_arguments(
                arguments, ("contentId",), "contentId parameter is required"
            )
            result = rag_tools.delete_organization_content(arguments["contentId"])
        elif name == "uploadContentFileAboutOrganization":
            _require_arguments(
                arguments,
                ("file", "fileName"),
                "file and fileName parameters are required",
            )
            result = rag_tools.upload_content_file_about_organization(
                arguments["file"],
                arguments["fileName"],
                arguments.get("role", ""),
            )
        elif name == "uploadContentUrlAboutOrganization":
            _require_arguments(arguments, ("url",), "url parameter is required")
            result = rag_tools.upload_content_url_about_organization(
                arguments["url"],
                arguments.get("role", ""),
            )
        else:
            raise ValueError(f"Unknown tool: {name}")

        logger.debug(f"Tool {name} executed successfully")
        return [types.TextContent(type="text", text=str(result))]
    except Exception as e:
        # Failures are reported back to the caller as text rather than raised.
        # logger.exception keeps the traceback in the server log.
        logger.exception("Error executing tool %s: %s", name, e)
        return [types.TextContent(type="text", text=f"Error: {e}")]
async def main():
    """Main entry point for the MCP server.

    Reads ``CONTENT_SERVICE_URL`` and ``USER_ID`` from the environment, logs
    them, writes the default content service URL back into ``os.environ``
    when the variable is unset or empty, and then runs the server over the
    stdio streams.

    Exits the process with status 0 on ``KeyboardInterrupt`` and with status
    1 on any other exception.
    """
    default_content_service_url = 'http://localhost:8080'
    try:
        logger.info("Starting RAG MCP Server...")

        # Environment variables for configuration.
        # Resolve the URL once so the logged value is always the one that
        # ends up in the environment; an empty variable counts as unset.
        configured_url = os.getenv('CONTENT_SERVICE_URL')
        content_service_url = configured_url or default_content_service_url
        user_id = os.getenv('USER_ID', 'invalid')
        logger.info(f"Content Service URL: {content_service_url}")
        logger.info(f"User ID: {user_id}")

        # Set default content service URL if not provided.
        # NOTE(review): rag_tools is constructed at import time, before this
        # default is written - confirm RagTools does not read
        # CONTENT_SERVICE_URL in its constructor.
        if not configured_url:
            os.environ['CONTENT_SERVICE_URL'] = content_service_url
            logger.info("Using default content service URL")

        logger.info("Initializing MCP server communication...")
        async with stdio_server() as (read_stream, write_stream):
            logger.info("MCP server communication established, starting server...")
            await server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="rag-mcp-server",
                    server_version="1.0.0",
                    capabilities=server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )
    except KeyboardInterrupt:
        logger.info("Server shutdown requested by user (Ctrl+C)")
        sys.exit(0)
    except Exception as e:
        logger.error(f"Fatal error starting MCP server: {e}")
        logger.exception("Full error traceback:")
        sys.exit(1)
if __name__ == "__main__":
    # Script entry point: run the server and map the outcome to an exit code
    # (0 on Ctrl+C, 1 on any other exception).
    try:
        # The banner must go to stderr: stdout carries the MCP protocol
        # stream, and a stray text line there would corrupt it.
        print("RAG MCP Server starting up...", file=sys.stderr)
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Server shutdown requested")
        sys.exit(0)
    except Exception as e:
        logger.error(f"Fatal error in main: {e}")
        logger.exception("Full error traceback:")
        sys.exit(1)
    finally:
        # Runs on every exit path, including the sys.exit() calls above.
        logger.info("RAG MCP Server shutdown complete")