Databricks MCP Server

by JustTryAI
Verified
""" MCP client tests for the Databricks MCP server. This module contains tests that use the MCP client to connect to and test the server. """ import asyncio import json import logging import os import sys from typing import Any, Dict, List, Optional import pytest from mcp.client.stdio import StdioServerParameters, stdio_client from mcp.client.session import ClientSession # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger(__name__) async def run_tests(): """Connect to and test the Databricks MCP server.""" logger.info("Connecting to Databricks MCP server...") # IMPORTANT: In MCP, the client launches the server process # We don't connect to an already running server! # Define the environment variables the server needs env = os.environ.copy() # Create parameters for connecting to the server # This will launch the server using the PowerShell script params = StdioServerParameters( command="pwsh", # Use PowerShell args=["-File", "./scripts/start_server.ps1"], # Run the startup script env=env # Pass environment variables ) # Use the client to start the server and connect to it logger.info("Launching server process...") try: async with stdio_client(params) as (recv, send): logger.info("Server launched, creating session...") session = ClientSession(recv, send) logger.info("Initializing session...") await session.initialize() # List available tools tools_response = await session.list_tools() tool_names = [t.name for t in tools_response.tools] logger.info(f"Available tools: {tool_names}") # Run tests for clusters if "list_clusters" in tool_names: await test_list_clusters(session) await test_get_cluster(session) else: logger.warning("Cluster tools not available") # Run tests for notebooks if "list_notebooks" in tool_names: await test_list_notebooks(session) await test_export_notebook(session) else: logger.warning("Notebook tools not available") logger.info("All tests completed successfully!") return True except Exception as e: logger.error(f"Error during tests: {e}", exc_info=True) return False # Skip all these tests until we fix the hanging issues @pytest.mark.skip(reason="Test causes hanging issues - needs further investigation") @pytest.mark.asyncio async def test_list_clusters(session): """Test listing clusters.""" logger.info("Testing list_clusters...") response = await session.call_tool("list_clusters", {}) logger.info(f"list_clusters response: {json.dumps(response, indent=2)}") assert "clusters" in response, "Response should contain 'clusters' key" return response @pytest.mark.skip(reason="Test causes hanging issues - needs further investigation") @pytest.mark.asyncio async def test_get_cluster(session): """Test getting cluster details.""" logger.info("Testing get_cluster...") # First list clusters to get a cluster_id clusters_response = await test_list_clusters(session) if not clusters_response.get("clusters"): logger.warning("No clusters found to test get_cluster") return # Get the first cluster ID cluster_id = clusters_response["clusters"][0]["cluster_id"] # Get cluster details response = await session.call_tool("get_cluster", {"cluster_id": cluster_id}) logger.info(f"get_cluster response: {json.dumps(response, indent=2)}") assert "cluster_id" in response, "Response should contain 'cluster_id' key" assert response["cluster_id"] == cluster_id, "Returned cluster ID should match requested ID" @pytest.mark.skip(reason="Test causes hanging issues - needs further investigation") @pytest.mark.asyncio async def test_list_notebooks(session): """Test listing notebooks.""" logger.info("Testing list_notebooks...") response = await session.call_tool("list_notebooks", {"path": "/"}) logger.info(f"list_notebooks response: {json.dumps(response, indent=2)}") assert "objects" in response, "Response should contain 'objects' key" return response @pytest.mark.skip(reason="Test causes hanging issues - needs further investigation") @pytest.mark.asyncio async def test_export_notebook(session): """Test exporting a notebook.""" logger.info("Testing export_notebook...") # First list notebooks to get a notebook path notebooks_response = await test_list_notebooks(session) if not notebooks_response.get("objects"): logger.warning("No notebooks found to test export_notebook") return # Find the first notebook (not a directory) notebook = None for obj in notebooks_response["objects"]: if obj.get("object_type") == "NOTEBOOK": notebook = obj break if not notebook: logger.warning("No notebooks found to test export_notebook") return # Get notebook path notebook_path = notebook["path"] # Export notebook response = await session.call_tool( "export_notebook", {"path": notebook_path, "format": "SOURCE"} ) logger.info(f"export_notebook response (truncated): {str(response)[:200]}...") assert "content" in response, "Response should contain 'content' key" async def main(): """Run the tests.""" success = await run_tests() return 0 if success else 1 if __name__ == "__main__": """Run the tests directly.""" sys.exit(asyncio.run(main()))