import json
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional, Union, Any
from fastmcp import FastMCP, Context
import tempfile
import uuid
# Create the FastMCP server
# All tool coroutines below are registered against this instance via @mcp.tool.
mcp = FastMCP(name="Jupyter Notebook Server")
@mcp.tool
async def read_notebook_cells(
    notebook_path: str,
    cell_type: Optional[str] = None,
    ctx: Context = None
) -> Union[List[Dict[str, Any]], Dict[str, str]]:
    """
    Read cells from a Jupyter notebook.

    Args:
        notebook_path: Absolute path to the .ipynb file
        cell_type: Optional filter by cell type ('code', 'markdown', 'raw')

    Returns:
        List of cell dictionaries (index, cell_type, source, metadata, plus
        execution_count/outputs for code cells), or an {"error": ...} dict
        on failure.
    """
    try:
        path = Path(notebook_path)
        if not path.exists():
            return {"error": f"Notebook not found: {notebook_path}"}
        if path.suffix != '.ipynb':
            return {"error": f"File is not a Jupyter notebook: {notebook_path}"}
        with open(path, 'r', encoding='utf-8') as f:
            notebook = json.load(f)
        # Pair each cell with its position in the FULL notebook so the
        # reported 'index' stays valid for execute_notebook_cell even when a
        # cell_type filter is applied. (Previously the index was the position
        # within the filtered list, which did not match the notebook.)
        indexed_cells = list(enumerate(notebook.get('cells', [])))
        if cell_type:
            indexed_cells = [(i, c) for i, c in indexed_cells
                             if c.get('cell_type') == cell_type]
        # Format cells for better readability
        formatted_cells = []
        for i, cell in indexed_cells:
            formatted_cell = {
                'index': i,
                'cell_type': cell.get('cell_type', 'unknown'),
                # nbformat stores source as a list of lines; join into one string
                'source': ''.join(cell.get('source', [])),
                'metadata': cell.get('metadata', {}),
            }
            # Add execution info for code cells
            if cell.get('cell_type') == 'code':
                formatted_cell['execution_count'] = cell.get('execution_count')
                formatted_cell['outputs'] = cell.get('outputs', [])
            formatted_cells.append(formatted_cell)
        if ctx:
            await ctx.info(f"Read {len(formatted_cells)} cells from {notebook_path}")
        return formatted_cells
    except json.JSONDecodeError as e:
        return {"error": f"Invalid JSON in notebook: {str(e)}"}
    except Exception as e:
        return {"error": f"Error reading notebook: {str(e)}"}
@mcp.tool
async def add_cell_to_notebook(
    notebook_path: str,
    cell_content: str,
    cell_type: str = "code",
    position: Optional[int] = None,
    metadata: Optional[Dict[str, Any]] = None,
    ctx: Context = None
) -> Dict[str, Any]:
    """
    Add a new cell to a Jupyter notebook, creating the notebook if missing.

    Args:
        notebook_path: Absolute path to the .ipynb file
        cell_content: Content of the new cell
        cell_type: Type of cell ('code', 'markdown', 'raw')
        position: Position to insert cell; omitted or out-of-range appends
        metadata: Optional cell metadata

    Returns:
        Dict with success/message/position/total_cells, or {"error": ...}.
    """
    try:
        path = Path(notebook_path)
        # Validate the extension up front so a brand-new file cannot be
        # created with a non-.ipynb name (previously this was only checked
        # for existing files).
        if path.suffix != '.ipynb':
            return {"error": f"File is not a Jupyter notebook: {notebook_path}"}
        # Reject unknown cell types before touching the file.
        if cell_type not in ("code", "markdown", "raw"):
            return {"error": f"Invalid cell type: {cell_type}"}
        # Create new notebook if it doesn't exist
        if not path.exists():
            # Minimal valid nbformat-4 skeleton
            notebook = {
                "cells": [],
                "metadata": {},
                "nbformat": 4,
                "nbformat_minor": 4
            }
            if ctx:
                await ctx.info(f"Creating new notebook: {notebook_path}")
        else:
            with open(path, 'r', encoding='utf-8') as f:
                notebook = json.load(f)
        # Create the new cell; nbformat stores source as a list of lines
        # with trailing newlines preserved, hence splitlines(True).
        new_cell = {
            "cell_type": cell_type,
            "metadata": metadata or {},
            "source": cell_content.splitlines(True)
        }
        # Add cell-type specific fields
        if cell_type == "code":
            new_cell["execution_count"] = None
            new_cell["outputs"] = []
        # Insert cell at specified position, or append when omitted/out of range
        if position is not None and 0 <= position <= len(notebook["cells"]):
            notebook["cells"].insert(position, new_cell)
            if ctx:
                await ctx.info(f"Inserted cell at position {position}")
        else:
            notebook["cells"].append(new_cell)
            position = len(notebook["cells"]) - 1
            if ctx:
                await ctx.info(f"Added cell at end (position {position})")
        # Ensure the target directory exists before writing — previously a
        # missing parent directory made notebook creation fail.
        path.parent.mkdir(parents=True, exist_ok=True)
        # Save the notebook
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(notebook, f, indent=2, ensure_ascii=False)
        return {
            "success": True,
            "message": f"Added {cell_type} cell to {notebook_path}",
            "position": position,
            "total_cells": len(notebook["cells"])
        }
    except json.JSONDecodeError as e:
        return {"error": f"Invalid JSON in notebook: {str(e)}"}
    except Exception as e:
        return {"error": f"Error adding cell: {str(e)}"}
@mcp.tool
async def execute_notebook_cell(
    notebook_path: str,
    cell_index: int,
    kernel_name: str = "python3",
    timeout: int = 30,
    ctx: Context = None
) -> Dict[str, Any]:
    """
    Execute a specific code cell in a Jupyter notebook.

    The cell source is run in a fresh subprocess under the current Python
    interpreter, so state from previously executed cells is NOT available.

    Args:
        notebook_path: Absolute path to the .ipynb file
        cell_index: Index of the cell to execute (0-based)
        kernel_name: Jupyter kernel name — accepted for interface
            compatibility but currently ignored; execution always uses
            ``sys.executable``
        timeout: Execution timeout in seconds

    Returns:
        Dict with success (False when the process exits non-zero),
        cell_index, execution_count, outputs and return_code;
        or {"error": ...} on failure.
    """
    try:
        path = Path(notebook_path)
        if not path.exists():
            return {"error": f"Notebook not found: {notebook_path}"}
        if path.suffix != '.ipynb':
            return {"error": f"File is not a Jupyter notebook: {notebook_path}"}
        with open(path, 'r', encoding='utf-8') as f:
            notebook = json.load(f)
        cells = notebook.get('cells', [])
        if cell_index < 0 or cell_index >= len(cells):
            return {"error": f"Cell index {cell_index} out of range. Notebook has {len(cells)} cells."}
        cell = cells[cell_index]
        if cell.get('cell_type') != 'code':
            return {"error": f"Cell {cell_index} is not a code cell"}
        # nbformat stores source as a list of lines; join into one script
        source_code = ''.join(cell.get('source', []))
        if ctx:
            await ctx.info(f"Executing cell {cell_index} in {notebook_path}")
        # SECURITY NOTE: this runs arbitrary notebook code on the host.
        # Write the cell to a temporary script file for the subprocess.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as temp_file:
            temp_file.write(source_code)
            temp_file_path = temp_file.name
        try:
            # Execute the code using subprocess (list argv, shell=False)
            result = subprocess.run(
                [sys.executable, temp_file_path],
                capture_output=True,
                text=True,
                timeout=timeout
            )
            # Mirror Jupyter bookkeeping: bump the execution counter.
            cell['execution_count'] = (cell.get('execution_count') or 0) + 1
            outputs = []
            # Add stdout output
            if result.stdout:
                outputs.append({
                    "output_type": "stream",
                    "name": "stdout",
                    "text": result.stdout.splitlines(True)
                })
            # Add stderr output
            if result.stderr:
                outputs.append({
                    "output_type": "stream",
                    "name": "stderr",
                    "text": result.stderr.splitlines(True)
                })
            # Add error output if execution failed
            if result.returncode != 0:
                outputs.append({
                    "output_type": "error",
                    "ename": "ExecutionError",
                    "evalue": f"Process exited with code {result.returncode}",
                    "traceback": result.stderr.splitlines()
                })
            cell['outputs'] = outputs
            # Persist updated outputs/execution_count back to the notebook
            with open(path, 'w', encoding='utf-8') as f:
                json.dump(notebook, f, indent=2, ensure_ascii=False)
            succeeded = result.returncode == 0
            if ctx:
                if succeeded:
                    await ctx.info(f"Cell {cell_index} executed successfully")
                else:
                    await ctx.warning(f"Cell {cell_index} exited with code {result.returncode}")
            return {
                # BUGFIX: was unconditionally True even when the process
                # failed, which made callers' stop_on_error checks a no-op.
                "success": succeeded,
                "cell_index": cell_index,
                "execution_count": cell['execution_count'],
                "outputs": outputs,
                "return_code": result.returncode
            }
        finally:
            # Clean up temporary file
            Path(temp_file_path).unlink(missing_ok=True)
    except subprocess.TimeoutExpired:
        return {"error": f"Cell execution timed out after {timeout} seconds"}
    except json.JSONDecodeError as e:
        return {"error": f"Invalid JSON in notebook: {str(e)}"}
    except Exception as e:
        return {"error": f"Error executing cell: {str(e)}"}
@mcp.tool
async def execute_entire_notebook(
    notebook_path: str,
    kernel_name: str = "python3",
    timeout_per_cell: int = 30,
    stop_on_error: bool = True,
    ctx: Context = None
) -> Dict[str, Any]:
    """
    Execute all code cells in a Jupyter notebook sequentially.

    Args:
        notebook_path: Absolute path to the .ipynb file
        kernel_name: Jupyter kernel name (forwarded to execute_notebook_cell,
            which currently ignores it)
        timeout_per_cell: Timeout per cell in seconds
        stop_on_error: Whether to stop execution if a cell fails

    Returns:
        Execution summary with results for each cell, or {"error": ...}.
    """
    try:
        path = Path(notebook_path)
        if not path.exists():
            return {"error": f"Notebook not found: {notebook_path}"}
        # Consistent with the other tools: refuse non-.ipynb paths.
        if path.suffix != '.ipynb':
            return {"error": f"File is not a Jupyter notebook: {notebook_path}"}
        with open(path, 'r', encoding='utf-8') as f:
            notebook = json.load(f)
        cells = notebook.get('cells', [])
        code_cells = [(i, cell) for i, cell in enumerate(cells) if cell.get('cell_type') == 'code']
        if not code_cells:
            return {"error": "No code cells found in notebook"}
        if ctx:
            await ctx.info(f"Executing {len(code_cells)} code cells in {notebook_path}")
        # BUGFIX: @mcp.tool rebinds execute_notebook_cell to a FunctionTool
        # object, which is not awaitable directly — call the wrapped
        # coroutine function instead (fall back to the name itself in case
        # the decorator preserves it).
        run_cell = getattr(execute_notebook_cell, "fn", execute_notebook_cell)
        results = []
        executed_count = 0
        for cell_index, cell in code_cells:
            if ctx:
                await ctx.report_progress(executed_count, len(code_cells))
            # Execute the cell
            result = await run_cell(
                notebook_path,
                cell_index,
                kernel_name,
                timeout_per_cell,
                ctx
            )
            results.append({
                "cell_index": cell_index,
                "result": result
            })
            executed_count += 1
            # Stop on error if requested; error dicts and failed cells both
            # report success != True.
            if stop_on_error and not result.get("success", False):
                if ctx:
                    await ctx.warning(f"Execution stopped at cell {cell_index} due to error")
                break
        if ctx:
            await ctx.report_progress(len(code_cells), len(code_cells))
            await ctx.info(f"Executed {executed_count}/{len(code_cells)} cells")
        return {
            "success": True,
            "total_code_cells": len(code_cells),
            "executed_cells": executed_count,
            "results": results
        }
    except Exception as e:
        return {"error": f"Error executing notebook: {str(e)}"}
@mcp.tool
async def get_notebook_info(notebook_path: str, ctx: Context = None) -> Dict[str, Any]:
    """
    Get basic information about a Jupyter notebook.

    Args:
        notebook_path: Absolute path to the .ipynb file

    Returns:
        Notebook metadata and statistics, or {"error": ...} on failure.
    """
    try:
        nb_file = Path(notebook_path)
        if not nb_file.exists():
            return {"error": f"Notebook not found: {notebook_path}"}
        if nb_file.suffix != '.ipynb':
            return {"error": f"File is not a Jupyter notebook: {notebook_path}"}
        with open(nb_file, 'r', encoding='utf-8') as handle:
            nb_data = json.load(handle)
        all_cells = nb_data.get('cells', [])
        # Tally how many cells of each type the notebook contains.
        type_tally: Dict[str, int] = {}
        for entry in all_cells:
            kind = entry.get('cell_type', 'unknown')
            type_tally[kind] = type_tally.get(kind, 0) + 1
        # A code cell counts as executed once it carries an execution_count.
        run_total = 0
        for entry in all_cells:
            if entry.get('cell_type') == 'code' and entry.get('execution_count') is not None:
                run_total += 1
        summary = {
            "notebook_path": str(nb_file),
            "total_cells": len(all_cells),
            "cell_counts": type_tally,
            "executed_code_cells": run_total,
            "nbformat": nb_data.get('nbformat'),
            "nbformat_minor": nb_data.get('nbformat_minor'),
            "metadata": nb_data.get('metadata', {})
        }
        if ctx:
            await ctx.info(f"Retrieved info for {notebook_path}")
        return summary
    except json.JSONDecodeError as e:
        return {"error": f"Invalid JSON in notebook: {str(e)}"}
    except Exception as e:
        return {"error": f"Error reading notebook info: {str(e)}"}
def main():
    """Print the server name and a one-line summary of each available tool."""
    tool_summaries = (
        ("read_notebook_cells", "Read cells from a notebook"),
        ("add_cell_to_notebook", "Add a new cell to a notebook"),
        ("execute_notebook_cell", "Execute a specific cell"),
        ("execute_entire_notebook", "Execute all code cells"),
        ("get_notebook_info", "Get notebook metadata and statistics"),
    )
    print("Jupyter Notebook MCP Server")
    print("Available tools:")
    for tool_name, summary in tool_summaries:
        print(f"- {tool_name}: {summary}")
# Entry point: start the MCP server when run as a script.
# NOTE(review): main() above is defined but never invoked here — presumably
# informational output was dropped in favor of starting the server directly;
# confirm whether main() should be called before mcp.run().
if __name__ == "__main__":
    mcp.run()