# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License
"""
Integration tests for the 'mcp.server' module written in pytest with its async module `pytest-asyncio`.
This test file is organized as follows:
1. **Helpers**: Common methods and objects used to ease the writing and the execution of tests.
- `MCPClient`: A standard MCP client used to interact with the Jupyter MCP server.
- `_start_server`: Helper function that starts a web server (Jupyter Lab and MCP Server) as a python subprocess and wait until it's ready to accept connections.
2. **Fixtures**: Common setup and teardown logic for tests.
- `jupyter_server`: Spawn a Jupyter server (thanks to the `_start_server` helper).
- `jupyter_mcp_server`: Spawn a Jupyter MCP server connected to the Jupyter server.
- `mcp_client`: Returns the `MCPClient` connected to the Juypyter MCP server.
3. **Health tests**: Check that the main components are operating as expected.
- `test_jupyter_health`: Test that the Jupyter server is healthy.
- `test_mcp_health`: Test that the Jupyter MCP server is healthy (tests are made with different configuration runtime launched or not launched).
- `test_mcp_tool_list`: Test that the MCP server declare its tools.
4. **Integration tests**: Check that end to end tools (client -> Jupyter MCP -> Jupyter) are working as expected.
- `test_markdown_cell`: Test markdown cell manipulation (append, insert, read, delete).
- `test_code_cell`: Test code cell manipulation (append, insert, overwrite, execute, read, delete)
5. **Edge tests**: Check edge cases behavior.
- `test_bad_index`: Test behavior of all index-based tools if the index does not exist
Launch the tests
```
$ make test
# or
$ hatch test
```
"""
import pytest
import pytest_asyncio
import subprocess
import requests
import logging
import functools
import time
import platform
from http import HTTPStatus
from contextlib import AsyncExitStack
from requests.exceptions import ConnectionError
from mcp import ClientSession, types
from mcp.client.streamable_http import streamablehttp_client
import os
JUPYTER_TOKEN = "MY_TOKEN"
def windows_timeout_wrapper(timeout_seconds=30):
"""Decorator to add Windows-specific timeout handling to async test functions
Windows has known issues with asyncio and network timeouts that can cause
tests to hang indefinitely. This decorator adds a safety timeout specifically
for Windows platforms while allowing other platforms to run normally.
"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
if platform.system() == "Windows":
import asyncio
try:
return await asyncio.wait_for(func(*args, **kwargs), timeout=timeout_seconds)
except asyncio.TimeoutError:
pytest.skip(f"Test {func.__name__} timed out on Windows ({timeout_seconds}s) - known platform limitation")
except Exception as e:
# Check if it's a network timeout related to Windows
if "ReadTimeout" in str(e) or "TimeoutError" in str(e):
pytest.skip(f"Test {func.__name__} hit network timeout on Windows - known platform limitation: {e}")
raise
else:
return await func(*args, **kwargs)
return wrapper
return decorator
# TODO: could be retrieved from code (inspect)
JUPYTER_TOOLS = [
# Multi-Notebook Management Tools
"connect_notebook",
"list_notebook",
"restart_notebook",
"disconnect_notebook",
"switch_notebook",
# Cell Tools
"insert_cell",
"insert_execute_code_cell",
"overwrite_cell_source",
"execute_cell_with_progress",
"execute_cell_simple_timeout",
"execute_cell_streaming",
"read_all_cells",
"list_cell",
"read_cell",
"delete_cell",
"execute_ipython",
"list_all_files",
"list_kernel",
]
def requires_session(func):
"""
A decorator that checks if the instance has a connected session.
"""
@functools.wraps(func)
async def wrapper(self, *args, **kwargs):
if not self._session:
raise RuntimeError("Client session is not connected")
# If the session exists, call the original method
return await func(self, *args, **kwargs)
return wrapper
class MCPClient:
"""A standard MCP client used to interact with the Jupyter MCP server
Basically it's a client wrapper for the Jupyter MCP server.
It uses the `requires_session` decorator to check if the session is connected.
"""
def __init__(self, url):
self.url = f"{url}/mcp"
self._session: ClientSession | None = None
self._exit_stack = AsyncExitStack()
async def __aenter__(self):
"""Initiate the session (enter session context)"""
streams_context = streamablehttp_client(self.url)
read_stream, write_stream, _ = await self._exit_stack.enter_async_context(
streams_context
)
session_context = ClientSession(read_stream, write_stream)
self._session = await self._exit_stack.enter_async_context(session_context)
await self._session.initialize()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Close the session (exit session context)"""
if self._exit_stack:
await self._exit_stack.aclose()
self._session = None
@staticmethod
def _extract_text_content(result):
"""Extract text content from a result"""
try:
if hasattr(result, 'content') and result.content and len(result.content) > 0:
if isinstance(result.content[0], types.TextContent):
return result.content[0].text
except (AttributeError, IndexError, TypeError):
pass
return None
def _get_structured_content_safe(self, result):
"""Safely get structured content with fallback to text content parsing"""
content = getattr(result, 'structuredContent', None)
if content is None:
# Try to extract from text content as fallback
text_content = self._extract_text_content(result)
if text_content:
import json
try:
return json.loads(text_content)
except json.JSONDecodeError as e:
logging.warning(f"Failed to parse JSON from text content: {e}, content: {text_content[:200]}...")
else:
logging.warning(f"No text content available in result: {type(result)}")
return content
@requires_session
async def list_tools(self):
return await self._session.list_tools() # type: ignore
# Multi-Notebook Management Methods
@requires_session
async def connect_notebook(self, notebook_name, notebook_path, mode="connect", kernel_id=None):
result = await self._session.call_tool("connect_notebook", arguments={
"notebook_name": notebook_name,
"notebook_path": notebook_path,
"mode": mode,
"kernel_id": kernel_id
}) # type: ignore
return self._extract_text_content(result)
@requires_session
async def list_notebook(self):
result = await self._session.call_tool("list_notebook") # type: ignore
return self._extract_text_content(result)
@requires_session
async def restart_notebook(self, notebook_name):
result = await self._session.call_tool("restart_notebook", arguments={"notebook_name": notebook_name}) # type: ignore
return self._extract_text_content(result)
@requires_session
async def disconnect_notebook(self, notebook_name):
result = await self._session.call_tool("disconnect_notebook", arguments={"notebook_name": notebook_name}) # type: ignore
return self._extract_text_content(result)
@requires_session
async def switch_notebook(self, notebook_name):
result = await self._session.call_tool("switch_notebook", arguments={"notebook_name": notebook_name}) # type: ignore
return self._extract_text_content(result)
@requires_session
async def insert_cell(self, cell_index, cell_type, cell_source):
result = await self._session.call_tool("insert_cell", arguments={"cell_index": cell_index, "cell_type": cell_type, "cell_source": cell_source}) # type: ignore
return self._get_structured_content_safe(result)
@requires_session
async def insert_execute_code_cell(self, cell_index, cell_source):
result = await self._session.call_tool("insert_execute_code_cell", arguments={"cell_index": cell_index, "cell_source": cell_source}) # type: ignore
return self._get_structured_content_safe(result)
@requires_session
async def read_cell(self, cell_index):
result = await self._session.call_tool("read_cell", arguments={"cell_index": cell_index}) # type: ignore
return self._get_structured_content_safe(result)
@requires_session
async def read_all_cells(self):
result = await self._session.call_tool("read_all_cells") # type: ignore
return self._get_structured_content_safe(result)
@requires_session
async def list_cell(self, max_retries=3):
"""List cells with retry mechanism for Windows compatibility"""
for attempt in range(max_retries):
try:
result = await self._session.call_tool("list_cell") # type: ignore
text_result = self._extract_text_content(result)
if text_result is not None and not text_result.startswith("Error") and "Index\tType" in text_result:
return text_result
else:
logging.warning(f"list_cell returned invalid result on attempt {attempt + 1}/{max_retries}: {text_result}")
if attempt < max_retries - 1:
import asyncio
await asyncio.sleep(0.5 * (attempt + 1)) # Exponential backoff
except Exception as e:
logging.error(f"list_cell failed on attempt {attempt + 1}/{max_retries}: {e}")
if attempt < max_retries - 1:
import asyncio
await asyncio.sleep(0.5 * (attempt + 1)) # Exponential backoff
else:
# Return an error message instead of raising, to allow tests to handle gracefully
return f"Error executing tool list_cell: {e}"
return "Error: Failed to retrieve cell list after all retries"
@requires_session
async def delete_cell(self, cell_index):
result = await self._session.call_tool("delete_cell", arguments={"cell_index": cell_index}) # type: ignore
return self._get_structured_content_safe(result)
@requires_session
async def execute_cell_streaming(self, cell_index):
result = await self._session.call_tool("execute_cell_streaming", arguments={"cell_index": cell_index}) # type: ignore
return self._get_structured_content_safe(result)
@requires_session
async def execute_cell_with_progress(self, cell_index):
result = await self._session.call_tool("execute_cell_with_progress", arguments={"cell_index": cell_index}) # type: ignore
return self._get_structured_content_safe(result)
@requires_session
async def execute_cell_simple_timeout(self, cell_index):
result = await self._session.call_tool("execute_cell_simple_timeout", arguments={"cell_index": cell_index}) # type: ignore
return self._get_structured_content_safe(result)
@requires_session
async def overwrite_cell_source(self, cell_index, cell_source):
result = await self._session.call_tool("overwrite_cell_source", arguments={"cell_index": cell_index, "cell_source": cell_source}) # type: ignore
return self._get_structured_content_safe(result)
@requires_session
async def execute_ipython(self, code, timeout=60):
result = await self._session.call_tool("execute_ipython", arguments={"code": code, "timeout": timeout}) # type: ignore
return self._get_structured_content_safe(result)
@requires_session
async def append_execute_code_cell(self, cell_source):
"""Append and execute a code cell at the end of the notebook."""
return await self.insert_execute_code_cell(-1, cell_source)
@requires_session
async def append_markdown_cell(self, cell_source):
"""Append a markdown cell at the end of the notebook."""
return await self.insert_cell(-1, "markdown", cell_source)
# Helper method to get cell count from list_cell output
@requires_session
async def get_cell_count(self):
"""Get the number of cells by parsing list_cell output"""
cell_list = await self.list_cell()
if "Error" in cell_list or "Index\tType" not in cell_list:
return 0
lines = cell_list.split('\n')
data_lines = [line for line in lines if '\t' in line and not line.startswith('Index') and not line.startswith('-')]
return len(data_lines)
def _start_server(name, host, port, command, readiness_endpoint="/", max_retries=5):
"""A Helper that starts a web server as a python subprocess and wait until it's ready to accept connections
This method can be used to start both Jupyter and Jupyter MCP servers
"""
_log_prefix = name
url = f"http://{host}:{port}"
url_readiness = f"{url}{readiness_endpoint}"
logging.info(f"{_log_prefix}: starting ...")
p_serv = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
_log_prefix = f"{_log_prefix} [{p_serv.pid}]"
while max_retries > 0:
try:
response = requests.get(url_readiness, timeout=10)
if response is not None and response.status_code == HTTPStatus.OK:
logging.info(f"{_log_prefix}: started ({url})!")
yield url
break
except (ConnectionError, requests.exceptions.Timeout):
logging.debug(
f"{_log_prefix}: waiting to accept connections [{max_retries}]"
)
time.sleep(2)
max_retries -= 1
if not max_retries:
logging.error(f"{_log_prefix}: fail to start")
logging.debug(f"{_log_prefix}: stopping ...")
try:
p_serv.terminate()
p_serv.wait(timeout=5) # Reduced timeout for faster cleanup
logging.info(f"{_log_prefix}: stopped")
except subprocess.TimeoutExpired:
logging.warning(f"{_log_prefix}: terminate timeout, forcing kill")
p_serv.kill()
try:
p_serv.wait(timeout=2)
except subprocess.TimeoutExpired:
logging.error(f"{_log_prefix}: kill timeout, process may be stuck")
except Exception as e:
logging.error(f"{_log_prefix}: error during shutdown: {e}")
@pytest_asyncio.fixture(scope="function")
async def mcp_client(jupyter_mcp_server) -> MCPClient:
"""An MCP client that can connect to the Jupyter MCP server"""
return MCPClient(jupyter_mcp_server)
@pytest.fixture(scope="session")
def jupyter_server():
"""Start the Jupyter server and returns its URL"""
host = "localhost"
port = 8888
yield from _start_server(
name="Jupyter Lab",
host=host,
port=port,
command=[
"jupyter",
"lab",
"--port",
str(port),
"--IdentityProvider.token",
JUPYTER_TOKEN,
"--ip",
host,
"--ServerApp.root_dir",
"./dev/content",
"--no-browser",
],
readiness_endpoint="/api",
max_retries=10,
)
@pytest.fixture(scope="function")
def jupyter_mcp_server(request, jupyter_server):
"""Start the Jupyter MCP server and returns its URL"""
import socket
# Find an available port
def find_free_port():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('', 0))
s.listen(1)
port = s.getsockname()[1]
return port
host = "localhost"
port = find_free_port()
start_new_runtime = True
try:
start_new_runtime = request.param
except AttributeError:
# fixture not parametrized
pass
yield from _start_server(
name="Jupyter MCP",
host=host,
port=port,
command=[
"python",
"-m",
"jupyter_mcp_server",
"--transport",
"streamable-http",
"--document-url",
jupyter_server,
"--document-id",
"notebook.ipynb",
"--document-token",
JUPYTER_TOKEN,
"--runtime-url",
jupyter_server,
"--start-new-runtime",
str(start_new_runtime),
"--runtime-token",
JUPYTER_TOKEN,
"--port",
str(port),
],
readiness_endpoint="/api/healthz",
)
def test_jupyter_health(jupyter_server):
"""Test the Jupyter server health"""
logging.info(f"Testing service health ({jupyter_server})")
response = requests.get(
f"{jupyter_server}/api/status",
headers={
"Authorization": f"token {JUPYTER_TOKEN}",
},
)
assert response.status_code == HTTPStatus.OK
@pytest.mark.parametrize(
"jupyter_mcp_server,kernel_expected_status",
[(True, "alive"), (False, "not_initialized")],
indirect=["jupyter_mcp_server"],
ids=["start_runtime", "no_runtime"],
)
def test_mcp_health(jupyter_mcp_server, kernel_expected_status):
"""Test the MCP Jupyter server health"""
logging.info(f"Testing MCP server health ({jupyter_mcp_server})")
response = requests.get(f"{jupyter_mcp_server}/api/healthz")
assert response.status_code == HTTPStatus.OK
data = response.json()
logging.debug(data)
assert data.get("status") == "healthy"
assert data.get("kernel_status") == kernel_expected_status
@pytest.mark.asyncio
async def test_mcp_tool_list(mcp_client):
"""Check that the list of tools can be retrieved and match"""
async with mcp_client:
tools = await mcp_client.list_tools()
tools_name = [tool.name for tool in tools.tools]
logging.debug(f"tools_name :{tools_name}")
assert len(tools_name) == len(JUPYTER_TOOLS) and sorted(tools_name) == sorted(
JUPYTER_TOOLS
)
@pytest.mark.asyncio
@windows_timeout_wrapper(30)
async def test_markdown_cell(mcp_client, content="Hello **World** !"):
"""Test markdown cell manipulation using unified insert_cell API"""
async def check_and_delete_markdown_cell(mcp_client, index, content):
"""Check and delete a markdown cell"""
# reading and checking the content of the created cell
cell_info = await mcp_client.read_cell(index)
logging.debug(f"cell_info: {cell_info}")
assert cell_info["index"] == index
assert cell_info["type"] == "markdown"
# TODO: don't now if it's normal to get a list of characters instead of a string
assert "".join(cell_info["source"]) == content
# reading all cells
result = await mcp_client.read_all_cells()
assert result is not None, "read_all_cells result should not be None"
cells_info = result["result"]
logging.debug(f"cells_info: {cells_info}")
# Check that our cell is in the expected position with correct content
assert "".join(cells_info[index]["source"]) == content
# delete created cell
result = await mcp_client.delete_cell(index)
assert result is not None, "delete_cell result should not be None"
assert result["result"] == f"Cell {index} (markdown) deleted successfully."
async with mcp_client:
# Get initial cell count
initial_count = await mcp_client.get_cell_count()
if initial_count == 0:
pytest.skip("Could not retrieve cell count - likely a platform-specific network issue")
# append markdown cell using -1 index
result = await mcp_client.insert_cell(-1, "markdown", content)
assert result is not None, "insert_cell result should not be None"
assert "Cell inserted successfully" in result["result"]
assert f"index {initial_count} (markdown)" in result["result"]
await check_and_delete_markdown_cell(mcp_client, initial_count, content)
# insert markdown cell at the end (safer than index 0)
result = await mcp_client.insert_cell(initial_count, "markdown", content)
assert result is not None, "insert_cell result should not be None"
assert "Cell inserted successfully" in result["result"]
assert f"index {initial_count} (markdown)" in result["result"]
await check_and_delete_markdown_cell(mcp_client, initial_count, content)
@pytest.mark.asyncio
@windows_timeout_wrapper(30)
async def test_code_cell(mcp_client, content="1 + 1"):
"""Test code cell manipulation using unified APIs"""
async def check_and_delete_code_cell(mcp_client, index, content):
"""Check and delete a code cell"""
# reading and checking the content of the created cell
cell_info = await mcp_client.read_cell(index)
logging.debug(f"cell_info: {cell_info}")
assert cell_info["index"] == index
assert cell_info["type"] == "code"
assert "".join(cell_info["source"]) == content
# reading all cells
result = await mcp_client.read_all_cells()
cells_info = result["result"]
logging.debug(f"cells_info: {cells_info}")
# Check that our cell is in the expected position with correct content
assert "".join(cells_info[index]["source"]) == content
# delete created cell
result = await mcp_client.delete_cell(index)
assert result["result"] == f"Cell {index} (code) deleted successfully."
async with mcp_client:
# Get initial cell count
initial_count = await mcp_client.get_cell_count()
if initial_count == 0:
pytest.skip("Could not retrieve cell count - likely a platform-specific network issue")
# append and execute code cell using -1 index
index = initial_count
code_result = await mcp_client.insert_execute_code_cell(-1, content)
logging.debug(f"code_result: {code_result}")
assert code_result is not None, "insert_execute_code_cell result should not be None"
assert int(code_result["result"][0]) == eval(content)
await check_and_delete_code_cell(mcp_client, index, content)
# insert and execute code cell at the end (safer than index 0)
index = initial_count
code_result = await mcp_client.insert_execute_code_cell(index, content)
logging.debug(f"code_result: {code_result}")
expected_result = eval(content)
assert int(code_result["result"][0]) == expected_result
# overwrite content and test different cell execution modes
content = f"({content}) * 2"
expected_result = eval(content)
result = await mcp_client.overwrite_cell_source(index, content)
logging.debug(f"result: {result}")
# The server returns a message with diff content
assert "Cell" in result["result"] and "overwritten successfully" in result["result"]
assert "diff" in result["result"] # Should contain diff output
code_result = await mcp_client.execute_cell_with_progress(index)
assert int(code_result["result"][0]) == expected_result
code_result = await mcp_client.execute_cell_simple_timeout(index)
# Handle case where execute_cell_simple_timeout might return None result
if code_result and code_result.get("result") is not None:
assert int(code_result["result"][0]) == expected_result
else:
logging.warning("execute_cell_simple_timeout returned None result, skipping assertion")
await check_and_delete_code_cell(mcp_client, index, content)
@pytest.mark.asyncio
@windows_timeout_wrapper(30)
async def test_list_cell(mcp_client):
"""Test list_cell functionality"""
async with mcp_client:
# Test initial list_cell (notebook.ipynb has multiple cells)
cell_list = await mcp_client.list_cell()
logging.debug(f"Initial cell list: {cell_list}")
assert isinstance(cell_list, str)
# Check for error conditions and skip if network issues occur
if cell_list.startswith("Error executing tool list_cell") or cell_list.startswith("Error: Failed to retrieve"):
pytest.skip(f"Network timeout occurred during list_cell operation: {cell_list}")
assert "Index\tType\tCount\tFirst Line" in cell_list
# The notebook has both markdown and code cells - just verify structure
lines = cell_list.split('\n')
data_lines = [line for line in lines if '\t' in line and not line.startswith('Index')]
assert len(data_lines) >= 1 # Should have at least some cells
# Add a markdown cell and test again
markdown_content = "# Test Markdown Cell"
await mcp_client.insert_cell(-1, "markdown", markdown_content)
# Check list_cell with added markdown cell
cell_list = await mcp_client.list_cell()
logging.debug(f"Cell list after adding markdown: {cell_list}")
lines = cell_list.split('\n')
# Should have header, separator, and multiple data lines
assert len(lines) >= 4 # header + separator + at least some cells
assert "Index\tType\tCount\tFirst Line" in lines[0]
# Check that the added cell is listed
data_lines = [line for line in lines if '\t' in line and not line.startswith('Index')]
assert len(data_lines) >= 10 # Should have at least the original 10 cells
# Check that our added cell appears in the list
assert any("# Test Markdown Cell" in line for line in data_lines)
# Add a code cell with long content to test truncation
long_code = "# This is a very long comment that should be truncated when displayed in the list because it exceeds the 50 character limit"
await mcp_client.insert_execute_code_cell(-1, "print('Hello World')")
# Check list_cell with truncated content
cell_list = await mcp_client.list_cell()
logging.debug(f"Cell list after adding long code: {cell_list}")
# Clean up by deleting added cells (in reverse order)
# Get current cell count to determine indices of added cells
current_count = await mcp_client.get_cell_count()
# Delete the last two cells we added
await mcp_client.delete_cell(current_count - 1) # Remove the code cell
await mcp_client.delete_cell(current_count - 2) # Remove the markdown cell
@pytest.mark.asyncio
@windows_timeout_wrapper(30)
async def test_overwrite_cell_diff(mcp_client):
"""Test overwrite_cell_source diff functionality"""
async with mcp_client:
# Get initial cell count
initial_count = await mcp_client.get_cell_count()
if initial_count == 0:
pytest.skip("Could not retrieve cell count - likely a platform-specific network issue")
# Add a code cell with initial content
initial_content = "x = 10\nprint(x)"
await mcp_client.append_execute_code_cell(initial_content)
cell_index = initial_count
# Overwrite with modified content
new_content = "x = 20\ny = 30\nprint(x + y)"
result = await mcp_client.overwrite_cell_source(cell_index, new_content)
# Verify diff output format
assert result is not None, "overwrite_cell_source should not return None for valid input"
result_text = result.get("result", "") if isinstance(result, dict) else str(result)
assert f"Cell {cell_index} overwritten successfully!" in result_text
assert "```diff" in result_text
assert "```" in result_text # Should have closing diff block
# Verify diff content shows changes
assert "-" in result_text # Should show deletions
assert "+" in result_text # Should show additions
# Test overwriting with identical content (no changes)
result_no_change = await mcp_client.overwrite_cell_source(cell_index, new_content)
assert result_no_change is not None, "overwrite_cell_source should not return None"
no_change_text = result_no_change.get("result", "") if isinstance(result_no_change, dict) else str(result_no_change)
assert "no changes detected" in no_change_text
# Test overwriting markdown cell
await mcp_client.append_markdown_cell("# Original Title")
markdown_index = initial_count + 1
markdown_result = await mcp_client.overwrite_cell_source(markdown_index, "# Updated Title\n\nSome content")
assert markdown_result is not None, "overwrite_cell_source should not return None for markdown cell"
markdown_text = markdown_result.get("result", "") if isinstance(markdown_result, dict) else str(markdown_result)
assert f"Cell {markdown_index} overwritten successfully!" in markdown_text
assert "```diff" in markdown_text
assert "Updated Title" in markdown_text
# Clean up: delete the test cells
await mcp_client.delete_cell(markdown_index) # Delete markdown cell first (higher index)
await mcp_client.delete_cell(cell_index) # Then delete code cell
@pytest.mark.asyncio
@windows_timeout_wrapper(30)
async def test_bad_index(mcp_client, index=99):
"""Test behavior of all index-based tools if the index does not exist"""
async with mcp_client:
assert await mcp_client.read_cell(index) is None
assert await mcp_client.insert_cell(index, "markdown", "test") is None
assert await mcp_client.insert_execute_code_cell(index, "1 + 1") is None
assert await mcp_client.overwrite_cell_source(index, "1 + 1") is None
assert await mcp_client.execute_cell_with_progress(index) is None
assert await mcp_client.execute_cell_simple_timeout(index) is None
assert await mcp_client.delete_cell(index) is None
@pytest.mark.asyncio
@windows_timeout_wrapper(30)
async def test_multimodal_output(mcp_client):
"""Test multimodal output functionality with image generation"""
async with mcp_client:
# Get initial cell count
initial_count = await mcp_client.get_cell_count()
if initial_count == 0:
pytest.skip("Could not retrieve cell count - likely a platform-specific network issue")
# Test image generation code using PIL (lightweight)
image_code = """
from PIL import Image, ImageDraw
import io
import base64
# Create a simple test image using PIL
width, height = 200, 100
image = Image.new('RGB', (width, height), color='white')
draw = ImageDraw.Draw(image)
# Draw a simple pattern
draw.rectangle([10, 10, 190, 90], outline='blue', width=2)
draw.ellipse([20, 20, 80, 80], fill='red')
draw.text((100, 40), "Test Image", fill='black')
# Convert to PNG and display
buffer = io.BytesIO()
image.save(buffer, format='PNG')
buffer.seek(0)
# Display the image (this should generate image/png output)
from IPython.display import Image as IPythonImage, display
display(IPythonImage(buffer.getvalue()))
"""
# Execute the image generation code
result = await mcp_client.insert_execute_code_cell(-1, image_code)
cell_index = initial_count
# Check that result is not None and contains outputs
assert result is not None, "Result should not be None"
assert "result" in result, "Result should contain 'result' key"
outputs = result["result"]
assert isinstance(outputs, list), "Outputs should be a list"
# Check for image output or placeholder
has_image_output = False
for output in outputs:
if isinstance(output, str):
# Check for image placeholder or actual image content
if ("Image Output (PNG)" in output or
"image display" in output.lower() or
output.strip() == ''):
has_image_output = True
break
elif isinstance(output, dict):
# Check for ImageContent dictionary format
if (output.get('type') == 'image' and
'data' in output and
output.get('mimeType') == 'image/png'):
has_image_output = True
logging.info(f"Found ImageContent object with {len(output['data'])} bytes of PNG data")
break
elif hasattr(output, 'data') and hasattr(output, 'mimeType'):
# This would be an actual ImageContent object
if output.mimeType == "image/png":
has_image_output = True
break
# We should have some indication of image output
assert has_image_output, f"Expected image output indication, got: {outputs}"
# Test with ALLOW_IMG_OUTPUT environment variable control
# Note: In actual deployment, this would be controlled via environment variables
# For testing, we just verify the code structure is correct
logging.info(f"Multimodal test completed with outputs: {outputs}")
# Clean up: delete the test cell
await mcp_client.delete_cell(cell_index)
###############################################################################
# Multi-Notebook Management Tests
###############################################################################
@pytest.mark.asyncio
@windows_timeout_wrapper(30)
async def test_multi_notebook_management(mcp_client):
"""Test multi-notebook management functionality"""
async with mcp_client:
# Test initial state - should show default notebook or no notebooks
initial_list = await mcp_client.list_notebook()
logging.debug(f"Initial notebook list: {initial_list}")
# Connect to a new notebook
connect_result = await mcp_client.connect_notebook("test_notebook", "new.ipynb", "connect")
logging.debug(f"Connect result: {connect_result}")
assert "Successfully connected to notebook 'test_notebook'" in connect_result
assert "new.ipynb" in connect_result
# List notebooks - should now show the connected notebook
notebook_list = await mcp_client.list_notebook()
logging.debug(f"Notebook list after connect: {notebook_list}")
assert "test_notebook" in notebook_list
assert "new.ipynb" in notebook_list
assert "✓" in notebook_list # Should be marked as current
# Try to connect to the same notebook again (should fail)
duplicate_result = await mcp_client.connect_notebook("test_notebook", "new.ipynb")
assert "already connected" in duplicate_result
# Test switching between notebooks
if "default" in notebook_list:
switch_result = await mcp_client.switch_notebook("default")
logging.debug(f"Switch to default result: {switch_result}")
assert "Successfully switched to notebook 'default'" in switch_result
# Switch back to test notebook
switch_back_result = await mcp_client.switch_notebook("test_notebook")
assert "Successfully switched to notebook 'test_notebook'" in switch_back_result
# Test cell operations on the new notebook
# First get the cell count of new.ipynb (should have some cells)
cell_count = await mcp_client.get_cell_count()
assert cell_count >= 2, f"new.ipynb should have at least 2 cells, got {cell_count}"
# Add a test cell to the new notebook
test_content = "# Multi-notebook test\nprint('Testing multi-notebook')"
insert_result = await mcp_client.insert_cell(-1, "code", test_content)
assert "Cell inserted successfully" in insert_result["result"]
# Execute the cell
execute_result = await mcp_client.insert_execute_code_cell(-1, "2 + 3")
assert "5" in str(execute_result["result"])
# Test restart notebook
restart_result = await mcp_client.restart_notebook("test_notebook")
logging.debug(f"Restart result: {restart_result}")
assert "restarted successfully" in restart_result
# Test disconnect notebook
disconnect_result = await mcp_client.disconnect_notebook("test_notebook")
logging.debug(f"Disconnect result: {disconnect_result}")
assert "disconnected successfully" in disconnect_result
# Verify notebook is no longer in the list
final_list = await mcp_client.list_notebook()
logging.debug(f"Final notebook list: {final_list}")
if "No notebooks are currently connected" not in final_list:
assert "test_notebook" not in final_list
@pytest.mark.asyncio
@windows_timeout_wrapper(30)
async def test_multi_notebook_cell_operations(mcp_client):
"""Test cell operations across multiple notebooks"""
async with mcp_client:
# Connect to the new notebook
await mcp_client.connect_notebook("notebook_a", "new.ipynb")
# Get initial cell count for notebook A
count_a = await mcp_client.get_cell_count()
# Add a cell to notebook A
await mcp_client.insert_cell(-1, "markdown", "# This is notebook A")
# Connect to default notebook (if it exists)
try:
# Try to connect to notebook.ipynb as notebook_b
await mcp_client.connect_notebook("notebook_b", "notebook.ipynb")
# Switch to notebook B
await mcp_client.switch_notebook("notebook_b")
# Get cell count for notebook B
count_b = await mcp_client.get_cell_count()
# Add a cell to notebook B
await mcp_client.insert_cell(-1, "markdown", "# This is notebook B")
# Switch back to notebook A
await mcp_client.switch_notebook("notebook_a")
# Verify we're working with notebook A
cell_list_a = await mcp_client.list_cell()
assert "This is notebook A" in cell_list_a
# Switch to notebook B and verify
await mcp_client.switch_notebook("notebook_b")
cell_list_b = await mcp_client.list_cell()
assert "This is notebook B" in cell_list_b
# Clean up - disconnect both notebooks
await mcp_client.disconnect_notebook("notebook_a")
await mcp_client.disconnect_notebook("notebook_b")
except Exception as e:
logging.warning(f"Could not test with notebook.ipynb: {e}")
# Clean up notebook A only
await mcp_client.disconnect_notebook("notebook_a")
@pytest.mark.asyncio
@windows_timeout_wrapper(30)
async def test_notebook_error_cases(mcp_client):
"""Test error handling for notebook management"""
async with mcp_client:
# Test connecting to non-existent notebook
error_result = await mcp_client.connect_notebook("nonexistent", "nonexistent.ipynb")
logging.debug(f"Nonexistent notebook result: {error_result}")
assert "not found" in error_result.lower() or "not a valid file" in error_result.lower()
# Test operations on non-connected notebook
restart_error = await mcp_client.restart_notebook("nonexistent_notebook")
assert "not connected" in restart_error
disconnect_error = await mcp_client.disconnect_notebook("nonexistent_notebook")
assert "not connected" in disconnect_error
switch_error = await mcp_client.switch_notebook("nonexistent_notebook")
assert "not connected" in switch_error
# Test invalid notebook paths
invalid_path_result = await mcp_client.connect_notebook("test", "../invalid/path.ipynb")
assert "not found" in invalid_path_result.lower() or "not a valid file" in invalid_path_result.lower()
###############################################################################
# execute_ipython Tests
###############################################################################
@pytest.mark.asyncio
@windows_timeout_wrapper(30)
async def test_execute_ipython_python_code(mcp_client):
"""Test execute_ipython with basic Python code"""
async with mcp_client:
# Test simple Python code
result = await mcp_client.execute_ipython("print('Hello IPython World!')")
# On Windows, if result is None it's likely due to timeout - skip the test
if platform.system() == "Windows" and result is None:
pytest.skip("execute_ipython timed out on Windows - known platform limitation")
assert result is not None, "execute_ipython result should not be None"
assert "result" in result, "Result should contain 'result' key"
outputs = result["result"]
assert isinstance(outputs, list), "Outputs should be a list"
# Check for expected output
output_text = "".join(str(output) for output in outputs)
assert "Hello IPython World!" in output_text or "[No output generated]" in output_text
# Test mathematical calculation
calc_result = await mcp_client.execute_ipython("result = 2 ** 10\nprint(f'2^10 = {result}')")
if platform.system() == "Windows" and calc_result is None:
pytest.skip("execute_ipython timed out on Windows - known platform limitation")
assert calc_result is not None
calc_outputs = calc_result["result"]
calc_text = "".join(str(output) for output in calc_outputs)
assert "2^10 = 1024" in calc_text or "[No output generated]" in calc_text
@pytest.mark.asyncio
@windows_timeout_wrapper(30)
async def test_execute_ipython_magic_commands(mcp_client):
"""Test execute_ipython with IPython magic commands"""
async with mcp_client:
# Test %who magic command (list variables)
result = await mcp_client.execute_ipython("%who")
# On Windows, if result is None it's likely due to timeout - skip the test
if platform.system() == "Windows" and result is None:
pytest.skip("execute_ipython timed out on Windows - known platform limitation")
assert result is not None, "execute_ipython result should not be None"
outputs = result["result"]
assert isinstance(outputs, list), "Outputs should be a list"
# Set a variable first, then use %who to see it
var_result = await mcp_client.execute_ipython("test_var = 42")
if platform.system() == "Windows" and var_result is None:
pytest.skip("execute_ipython timed out on Windows - known platform limitation")
who_result = await mcp_client.execute_ipython("%who")
if platform.system() == "Windows" and who_result is None:
pytest.skip("execute_ipython timed out on Windows - known platform limitation")
who_outputs = who_result["result"]
who_text = "".join(str(output) for output in who_outputs)
# %who should show our variable (or no output if variables exist but aren't shown)
# This test mainly ensures %who doesn't crash
# Test %timeit magic command
timeit_result = await mcp_client.execute_ipython("%timeit sum(range(100))")
if platform.system() == "Windows" and timeit_result is None:
pytest.skip("execute_ipython timed out on Windows - known platform limitation")
assert timeit_result is not None
timeit_outputs = timeit_result["result"]
timeit_text = "".join(str(output) for output in timeit_outputs)
# timeit should produce some timing output or complete without error
assert len(timeit_text) >= 0 # Just ensure no crash
@pytest.mark.asyncio
@windows_timeout_wrapper(30)
async def test_execute_ipython_shell_commands(mcp_client):
"""Test execute_ipython with shell commands (! prefix)"""
async with mcp_client:
# Test basic shell command - echo (works on most systems)
result = await mcp_client.execute_ipython("!echo 'Hello from shell'")
# On Windows, if result is None it's likely due to timeout - skip the test
if platform.system() == "Windows" and result is None:
pytest.skip("execute_ipython timed out on Windows - known platform limitation")
assert result is not None, "execute_ipython result should not be None"
outputs = result["result"]
assert isinstance(outputs, list), "Outputs should be a list"
output_text = "".join(str(output) for output in outputs)
# Shell command should either work or be handled gracefully
assert len(output_text) >= 0 # Just ensure no crash
# Test Python version check
python_result = await mcp_client.execute_ipython("!python --version")
if platform.system() == "Windows" and python_result is None:
pytest.skip("execute_ipython timed out on Windows - known platform limitation")
assert python_result is not None
python_outputs = python_result["result"]
python_text = "".join(str(output) for output in python_outputs)
# Should show Python version or complete without error
assert len(python_text) >= 0
@pytest.mark.asyncio
@windows_timeout_wrapper(30)
async def test_execute_ipython_timeout(mcp_client):
"""Test execute_ipython timeout functionality"""
async with mcp_client:
# Test with very short timeout on a potentially long-running command
result = await mcp_client.execute_ipython("import time; time.sleep(5)", timeout=2)
# On Windows, if result is None it's likely due to timeout - skip the test
if platform.system() == "Windows" and result is None:
pytest.skip("execute_ipython timed out on Windows - known platform limitation")
assert result is not None
outputs = result["result"]
output_text = "".join(str(output) for output in outputs)
# Should either complete quickly or timeout
assert "TIMEOUT ERROR" in output_text or len(output_text) >= 0
@pytest.mark.asyncio
@windows_timeout_wrapper(30)
async def test_execute_ipython_error_handling(mcp_client):
"""Test execute_ipython error handling"""
async with mcp_client:
# Test syntax error
result = await mcp_client.execute_ipython("invalid python syntax <<<")
# On Windows, if result is None it's likely due to timeout - skip the test
if platform.system() == "Windows" and result is None:
pytest.skip("execute_ipython timed out on Windows - known platform limitation")
assert result is not None
outputs = result["result"]
output_text = "".join(str(output) for output in outputs)
# Should handle the error gracefully
assert len(output_text) >= 0 # Ensure no crash
# Test runtime error
runtime_result = await mcp_client.execute_ipython("undefined_variable")
if platform.system() == "Windows" and runtime_result is None:
pytest.skip("execute_ipython timed out on Windows - known platform limitation")
assert runtime_result is not None
runtime_outputs = runtime_result["result"]
runtime_text = "".join(str(output) for output in runtime_outputs)
# Should handle the error gracefully
assert len(runtime_text) >= 0