local_backend.py•13.9 kB
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License
"""
Local Backend Implementation
This backend uses the Jupyter Server's local API directly when running as an extension.
It provides efficient local access to contents_manager and kernel_manager.
"""
from typing import Optional, Any, Union, Literal, TYPE_CHECKING
import asyncio
from mcp.types import ImageContent
from jupyter_mcp_server.jupyter_extension.backends.base import Backend
from jupyter_mcp_server.utils import safe_extract_outputs
if TYPE_CHECKING:
    from jupyter_server.serverapp import ServerApp
class LocalBackend(Backend):
    """
    Backend that uses local Jupyter Server API directly.
    
    Uses:
    - serverapp.contents_manager for notebook file operations
    - serverapp.kernel_manager for kernel management
    - serverapp.kernel_spec_manager for kernel specs
    
    This backend is only available when running as a Jupyter Server extension
    with document_url="local" or runtime_url="local".
    """
    
    def __init__(self, serverapp: 'ServerApp'):
        """
        Initialize local backend with direct serverapp access.
        
        Args:
            serverapp: Jupyter ServerApp instance
        """
        self.serverapp = serverapp
        self.contents_manager = serverapp.contents_manager
        self.kernel_manager = serverapp.kernel_manager
        self.kernel_spec_manager = serverapp.kernel_spec_manager
    
    # Notebook operations
    
    async def get_notebook_content(self, path: str) -> dict[str, Any]:
        """
        Get notebook content using local contents_manager.
        
        Args:
            path: Path to notebook file
            
        Returns:
            Notebook content dictionary
        """
        model = await asyncio.to_thread(
            self.contents_manager.get,
            path,
            type='notebook',
            content=True
        )
        return model['content']
    
    async def list_notebooks(self, path: str = "") -> list[str]:
        """
        List all notebooks recursively using local contents_manager.
        
        Args:
            path: Directory path to search
            
        Returns:
            List of notebook paths
        """
        notebooks = []
        await self._list_notebooks_recursive(path, notebooks)
        return notebooks
    
    async def _list_notebooks_recursive(self, path: str, notebooks: list[str]) -> None:
        """Helper to recursively list notebooks."""
        try:
            model = await asyncio.to_thread(
                self.contents_manager.get,
                path,
                content=True
            )
            
            if model['type'] == 'directory':
                for item in model['content']:
                    item_path = f"{path}/{item['name']}" if path else item['name']
                    
                    if item['type'] == 'directory':
                        await self._list_notebooks_recursive(item_path, notebooks)
                    elif item['type'] == 'notebook' or item['name'].endswith('.ipynb'):
                        notebooks.append(item_path)
        except Exception:
            # Skip directories we can't access
            pass
    
    async def notebook_exists(self, path: str) -> bool:
        """
        Check if notebook exists using local contents_manager.
        
        Args:
            path: Path to notebook
            
        Returns:
            True if exists
        """
        try:
            await asyncio.to_thread(
                self.contents_manager.get,
                path,
                content=False
            )
            return True
        except Exception:
            return False
    
    async def create_notebook(self, path: str) -> dict[str, Any]:
        """
        Create a new notebook using local contents_manager.
        
        Args:
            path: Path for new notebook
            
        Returns:
            Created notebook content
        """
        model = await asyncio.to_thread(
            self.contents_manager.new,
            path=path
        )
        return model['content']
    
    # Cell operations
    
    async def read_cells(
        self, 
        path: str, 
        start_index: Optional[int] = None,
        end_index: Optional[int] = None
    ) -> list[dict[str, Any]]:
        """
        Read cells from notebook.
        
        Args:
            path: Notebook path
            start_index: Start index
            end_index: End index
            
        Returns:
            List of cells
        """
        content = await self.get_notebook_content(path)
        cells = content.get('cells', [])
        
        if start_index is not None or end_index is not None:
            start = start_index or 0
            end = end_index if end_index is not None else len(cells)
            cells = cells[start:end]
        
        return cells
    
    async def append_cell(
        self, 
        path: str, 
        cell_type: Literal["code", "markdown"],
        source: Union[str, list[str]]
    ) -> int:
        """
        Append a cell to notebook.
        
        Args:
            path: Notebook path
            cell_type: Cell type
            source: Cell source
            
        Returns:
            Index of appended cell
        """
        content = await self.get_notebook_content(path)
        cells = content.get('cells', [])
        
        # Normalize source to list of strings
        if isinstance(source, str):
            source = source.splitlines(keepends=True)
        
        new_cell = {
            'cell_type': cell_type,
            'metadata': {},
            'source': source
        }
        
        if cell_type == 'code':
            new_cell['outputs'] = []
            new_cell['execution_count'] = None
        
        cells.append(new_cell)
        content['cells'] = cells
        
        # Save updated notebook
        await asyncio.to_thread(
            self.contents_manager.save,
            {
                'type': 'notebook',
                'content': content
            },
            path
        )
        
        return len(cells) - 1
    
    async def insert_cell(
        self,
        path: str,
        cell_index: int,
        cell_type: Literal["code", "markdown"],
        source: Union[str, list[str]]
    ) -> int:
        """
        Insert a cell at specific index.
        
        Args:
            path: Notebook path
            cell_index: Insert position
            cell_type: Cell type
            source: Cell source
            
        Returns:
            Index of inserted cell
        """
        content = await self.get_notebook_content(path)
        cells = content.get('cells', [])
        
        # Normalize source
        if isinstance(source, str):
            source = source.splitlines(keepends=True)
        
        new_cell = {
            'cell_type': cell_type,
            'metadata': {},
            'source': source
        }
        
        if cell_type == 'code':
            new_cell['outputs'] = []
            new_cell['execution_count'] = None
        
        cells.insert(cell_index, new_cell)
        content['cells'] = cells
        
        # Save updated notebook
        await asyncio.to_thread(
            self.contents_manager.save,
            {
                'type': 'notebook',
                'content': content
            },
            path
        )
        
        return cell_index
    
    async def delete_cell(self, path: str, cell_index: int) -> None:
        """
        Delete a cell from notebook.
        
        Args:
            path: Notebook path
            cell_index: Index to delete
        """
        content = await self.get_notebook_content(path)
        cells = content.get('cells', [])
        
        if 0 <= cell_index < len(cells):
            cells.pop(cell_index)
            content['cells'] = cells
            
            await asyncio.to_thread(
                self.contents_manager.save,
                {
                    'type': 'notebook',
                    'content': content
                },
                path
            )
    
    async def overwrite_cell(
        self,
        path: str,
        cell_index: int,
        new_source: Union[str, list[str]]
    ) -> tuple[str, str]:
        """
        Overwrite cell content.
        
        Args:
            path: Notebook path
            cell_index: Cell index
            new_source: New source
            
        Returns:
            Tuple of (old_source, new_source)
        """
        content = await self.get_notebook_content(path)
        cells = content.get('cells', [])
        
        if cell_index < 0 or cell_index >= len(cells):
            raise ValueError(f"Cell index {cell_index} out of range")
        
        cell = cells[cell_index]
        old_source = ''.join(cell['source']) if isinstance(cell['source'], list) else cell['source']
        
        # Normalize new source
        if isinstance(new_source, str):
            new_source_str = new_source
            new_source = new_source.splitlines(keepends=True)
        else:
            new_source_str = ''.join(new_source)
        
        cell['source'] = new_source
        content['cells'] = cells
        
        await asyncio.to_thread(
            self.contents_manager.save,
            {
                'type': 'notebook',
                'content': content
            },
            path
        )
        
        return (old_source, new_source_str)
    
    # Kernel operations
    
    async def get_or_create_kernel(self, path: str, kernel_id: Optional[str] = None) -> str:
        """
        Get existing kernel or create new one.
        
        Args:
            path: Notebook path (for context)
            kernel_id: Specific kernel ID
            
        Returns:
            Kernel ID
        """
        if kernel_id and kernel_id in self.kernel_manager:
            return kernel_id
        
        # Start new kernel
        kernel_id = await self.kernel_manager.start_kernel()
        return kernel_id
    
    async def execute_cell(
        self,
        path: str,
        cell_index: int,
        kernel_id: str,
        timeout_seconds: int = 300
    ) -> list[Union[str, ImageContent]]:
        """
        Execute a cell using local kernel manager.
        
        Args:
            path: Notebook path
            cell_index: Cell index
            kernel_id: Kernel ID
            timeout_seconds: Timeout
            
        Returns:
            List of outputs
        """
        # Get cell source
        cells = await self.read_cells(path)
        if cell_index < 0 or cell_index >= len(cells):
            raise ValueError(f"Cell index {cell_index} out of range")
        
        cell = cells[cell_index]
        source = ''.join(cell['source']) if isinstance(cell['source'], list) else cell['source']
        
        # Get kernel client
        kernel = self.kernel_manager.get_kernel(kernel_id)
        client = kernel.client()
        
        # Execute code
        msg_id = client.execute(source)
        
        # Collect outputs
        outputs = []
        start_time = asyncio.get_event_loop().time()
        
        while True:
            if asyncio.get_event_loop().time() - start_time > timeout_seconds:
                raise TimeoutError(f"Cell execution exceeded {timeout_seconds} seconds")
            
            try:
                msg = await asyncio.wait_for(
                    asyncio.to_thread(client.get_iopub_msg, timeout=1),
                    timeout=2
                )
                
                msg_type = msg['header']['msg_type']
                
                if msg_type == 'status':
                    if msg['content']['execution_state'] == 'idle':
                        break
                elif msg_type in ['execute_result', 'display_data']:
                    outputs.append(msg['content'])
                elif msg_type == 'stream':
                    outputs.append(msg['content'])
                elif msg_type == 'error':
                    outputs.append(msg['content'])
            except asyncio.TimeoutError:
                continue
            except Exception:
                break
        
        # Update cell with outputs
        content = await self.get_notebook_content(path)
        if cell_index < len(content['cells']):
            content['cells'][cell_index]['outputs'] = outputs
            await asyncio.to_thread(
                self.contents_manager.save,
                {'type': 'notebook', 'content': content},
                path
            )
        
        return safe_extract_outputs(outputs)
    
    async def interrupt_kernel(self, kernel_id: str) -> None:
        """Interrupt a kernel."""
        if kernel_id in self.kernel_manager:
            kernel = self.kernel_manager.get_kernel(kernel_id)
            await kernel.interrupt()
    
    async def restart_kernel(self, kernel_id: str) -> None:
        """Restart a kernel."""
        if kernel_id in self.kernel_manager:
            await self.kernel_manager.restart_kernel(kernel_id)
    
    async def shutdown_kernel(self, kernel_id: str) -> None:
        """Shutdown a kernel."""
        if kernel_id in self.kernel_manager:
            await self.kernel_manager.shutdown_kernel(kernel_id)
    
    async def list_kernels(self) -> list[dict[str, Any]]:
        """List all running kernels."""
        return [
            {
                'id': kid,
                'name': self.kernel_manager.get_kernel(kid).kernel_name
            }
            for kid in self.kernel_manager.list_kernel_ids()
        ]
    
    async def kernel_exists(self, kernel_id: str) -> bool:
        """Check if kernel exists."""
        return kernel_id in self.kernel_manager