"""Container output streaming and file synchronization."""
import os
import io
import time
import asyncio
import tarfile
import collections
from enum import Enum
from datetime import datetime
from typing import Any, AsyncGenerator, Callable, Dict, Iterator, List, Optional, Tuple, Union
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from ..utils.logging import setup_logging
from ..utils.errors import StreamError, SyncError
logger = setup_logging(__name__)
class OutputFormat(str, Enum):
"""Output stream formats."""
STDOUT = "stdout"
STDERR = "stderr"
COMBINED = "combined"
FORMATTED = "formatted"
class StreamConfig:
    """Stream configuration.

    buffer_size is the number of characters to accumulate before a chunk
    is flushed to the consumer.
    """
def __init__(
self,
format: OutputFormat = OutputFormat.COMBINED,
buffer_size: int = 1024,
filters: Optional[List[str]] = None,
timestamp: bool = False
):
self.format = format
self.buffer_size = buffer_size
self.filters = filters or []
self.timestamp = timestamp
class SyncConfig:
    """Synchronization configuration.

    sync_interval is the echo-suppression window in seconds: a change the
    opposite side wrote within this window is ignored to prevent sync loops.
    """
def __init__(
self,
ignore_patterns: Optional[List[str]] = None,
sync_interval: float = 1.0,
atomic: bool = True
):
self.ignore_patterns = ignore_patterns or []
self.sync_interval = sync_interval
self.atomic = atomic
class StreamInfo:
"""Information about an active stream."""
    def __init__(self, task: Optional[asyncio.Task], config: StreamConfig):
self.task = task
self.config = config
self.start_time = datetime.now()
class EnhancedOutputStreamManager:
"""Enhanced streaming output manager."""
def __init__(self, docker_manager):
self.docker_manager = docker_manager
self.active_streams: Dict[str, StreamInfo] = {}
self._buffer = collections.deque(maxlen=1000) # Keep last 1000 messages
    async def start_stream(
        self,
        container_name: str,
        command: str,
        config: StreamConfig,
        callback: Optional[Callable] = None
    ) -> AsyncGenerator[str, None]:
        """Start enhanced output stream."""
        try:
            container = self.docker_manager.containers.get(container_name)
            if not container:
                raise StreamError(f"Container not found: {container_name}")
            # Create execution; demux=True yields (stdout, stderr) tuples
            exec_result = container.exec_run(
                command,
                stream=True,
                demux=True
            )

            async def stream_handler() -> AsyncGenerator[str, None]:
                buffer: List[str] = []
                buffered_len = 0
                loop = asyncio.get_running_loop()
                iterator = iter(exec_result.output)
                try:
                    while True:
                        # docker-py's stream is a synchronous generator;
                        # pull each chunk in a worker thread so the event
                        # loop is not blocked
                        data = await loop.run_in_executor(None, next, iterator, None)
                        if data is None:
                            break
                        # Apply format and filtering
                        processed = self._process_stream_data(data, config)
                        if processed:
                            buffer.append(processed)
                            buffered_len += len(processed)
                            if buffered_len >= config.buffer_size:
                                output = ''.join(buffer)
                                buffer.clear()
                                buffered_len = 0
                                self._buffer.append(output)
                                if callback:
                                    await callback(output)
                                yield output
                    # Flush whatever remains once the stream is exhausted
                    if buffer:
                        output = ''.join(buffer)
                        self._buffer.append(output)
                        if callback:
                            await callback(output)
                        yield output
                except Exception as e:
                    logger.error(f"Stream processing error: {e}")
                    raise StreamError(f"Stream processing error: {e}") from e
                finally:
                    # Only clean up here; yielding inside a finally block
                    # breaks generator close/cancellation
                    self.active_streams.pop(container_name, None)

            # An async generator cannot be wrapped in asyncio.create_task(),
            # so track the consuming task instead; stop_stream() cancels it
            self.active_streams[container_name] = StreamInfo(
                asyncio.current_task(), config
            )
            async for output in stream_handler():
                yield output
        except StreamError:
            raise
        except Exception as e:
            logger.error(f"Failed to start stream: {e}")
            raise StreamError(f"Failed to start stream: {e}") from e
    def _process_stream_data(
        self,
        data: Union[bytes, Tuple[Optional[bytes], Optional[bytes]]],
        config: StreamConfig
    ) -> Optional[str]:
        """Process stream data according to config."""
        if not data:
            return None
        # With demux=True docker-py yields (stdout, stderr) tuples
        stdout, stderr = data if isinstance(data, tuple) else (data, None)
        # Apply format; errors="replace" guards against chunks that split
        # a multi-byte UTF-8 sequence
        if config.format == OutputFormat.STDOUT and stdout:
            output = stdout.decode(errors="replace")
        elif config.format == OutputFormat.STDERR and stderr:
            output = stderr.decode(errors="replace")
        elif config.format == OutputFormat.COMBINED:
            output = ''
            if stdout:
                output += stdout.decode(errors="replace")
            if stderr:
                output += stderr.decode(errors="replace")
        elif config.format == OutputFormat.FORMATTED:
            output = self._format_output(stdout, stderr)
        else:
            return None
        if not output:
            return None
        # Drop output matching any filter pattern
        for filter_pattern in config.filters:
            if filter_pattern in output:
                return None
        # Add timestamp if requested
        if config.timestamp:
            output = f"[{datetime.now().isoformat()}] {output}"
        return output
    @staticmethod
    def _format_output(stdout: Optional[bytes], stderr: Optional[bytes]) -> str:
        """Format output with ANSI-colored [OUT]/[ERR] prefixes."""
        output = []
        if stdout:
            output.append(f"\033[32m[OUT]\033[0m {stdout.decode(errors='replace')}")
        if stderr:
            output.append(f"\033[31m[ERR]\033[0m {stderr.decode(errors='replace')}")
        return '\n'.join(output)
    async def stop_stream(self, container_name: str) -> None:
        """Stop streaming from a container."""
        if stream_info := self.active_streams.get(container_name):
            if stream_info.task:
                stream_info.task.cancel()
                try:
                    await stream_info.task
                except asyncio.CancelledError:
                    pass
            # The stream's finally block may already have removed the entry
            self.active_streams.pop(container_name, None)
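
    # A minimal consumption sketch (hypothetical `docker_manager`; the
    # container name and command below are illustrative only):
    #
    #     manager = EnhancedOutputStreamManager(docker_manager)
    #     config = StreamConfig(format=OutputFormat.FORMATTED, timestamp=True)
    #     async for chunk in manager.start_stream("dev-container", "pytest -q", config):
    #         print(chunk, end="")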
class BiDirectionalSync:
"""Enhanced bi-directional file synchronization."""
    def __init__(self, docker_manager):
        self.docker_manager = docker_manager
        self.sync_handlers: Dict[str, EnhancedSyncHandler] = {}
        self._watches: Dict[str, Any] = {}  # per-container watchdog watches
        self.observer = Observer()
        self.observer.start()
async def start_sync(
self,
container_name: str,
host_path: str,
container_path: str,
config: SyncConfig
) -> None:
"""Start bi-directional file sync."""
try:
# Validate paths
if not os.path.exists(host_path):
raise SyncError(f"Host path does not exist: {host_path}")
container = self.docker_manager.containers.get(container_name)
if not container:
raise SyncError(f"Container not found: {container_name}")
# Create sync handler
handler = EnhancedSyncHandler(
container=container,
container_path=container_path,
host_path=host_path,
config=config
)
            # Watch the host side; keep the watch handle so stop_sync()
            # can unschedule just this container's watch
            self._watches[container_name] = self.observer.schedule(
                handler,
                host_path,
                recursive=True
            )
            # Start container file watcher
            await handler.start_container_watcher()
            self.sync_handlers[container_name] = handler
            logger.info(f"Started bi-directional sync for container: {container_name}")
        except SyncError:
            raise
        except Exception as e:
            logger.error(f"Failed to start sync: {e}")
            raise SyncError(f"Failed to start sync: {e}") from e
    async def stop_sync(self, container_name: str) -> None:
        """Stop synchronization for a container."""
        if handler := self.sync_handlers.get(container_name):
            # Unschedule only this container's watch; unschedule_all()
            # would tear down every other container's sync as well
            if watch := self._watches.pop(container_name, None):
                self.observer.unschedule(watch)
            await handler.stop_container_watcher()
            del self.sync_handlers[container_name]
            logger.info(f"Stopped sync for container: {container_name}")
async def cleanup(self) -> None:
"""Clean up all synchronization handlers."""
for container_name in list(self.sync_handlers.keys()):
await self.stop_sync(container_name)
self.observer.stop()
self.observer.join()
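
    # A minimal usage sketch (hypothetical `docker_manager` exposing a
    # `containers` dict of docker-py Container objects, matching the
    # attribute used above; names and paths are illustrative):
    #
    #     sync = BiDirectionalSync(docker_manager)
    #     await sync.start_sync(
    #         "dev-container",
    #         host_path="./src",
    #         container_path="/app/src",
    #         config=SyncConfig(ignore_patterns=[".git", "__pycache__"]),
    #     )
    #     ...
    #     await sync.cleanup()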
class EnhancedSyncHandler(FileSystemEventHandler):
"""Enhanced sync handler with bi-directional support."""
def __init__(
self,
container,
container_path: str,
host_path: str,
config: SyncConfig
):
super().__init__()
self.container = container
self.container_path = container_path
self.host_path = host_path
self.config = config
self.sync_lock = asyncio.Lock()
self.pending_syncs: Dict[str, float] = {}
self._container_watcher: Optional[asyncio.Task] = None
    async def start_container_watcher(self) -> None:
        """Start watching container files."""
        # Requires inotify-tools to be installed inside the container
        cmd = (
            "inotifywait -m -r -e modify,create,delete,move "
            f"{self.container_path}"
        )
        # detach=True would discard the output stream, so keep the exec
        # attached and stream its stdout
        exec_result = self.container.exec_run(
            cmd,
            stream=True
        )
        self._container_watcher = asyncio.create_task(
            self._handle_container_events(exec_result.output)
        )
async def stop_container_watcher(self) -> None:
"""Stop container file watcher."""
if self._container_watcher:
self._container_watcher.cancel()
try:
await self._container_watcher
except asyncio.CancelledError:
pass
self._container_watcher = None
    async def _handle_container_events(self, output_stream: Iterator[bytes]) -> None:
        """Handle container file events."""
        try:
            loop = asyncio.get_running_loop()
            iterator = iter(output_stream)
            while True:
                # docker-py yields raw bytes synchronously; read each chunk
                # in a worker thread and split it into inotifywait lines
                chunk = await loop.run_in_executor(None, next, iterator, None)
                if chunk is None:
                    break
                for line in chunk.decode(errors="replace").splitlines():
                    if line:
                        await self._handle_container_change(line)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error(f"Container watcher error: {e}")
    async def _handle_container_change(self, event: str) -> None:
        """Handle container file change."""
        try:
            # Parse an inotifywait line: "<dir> <EVENT[,EVENT]> <filename>"
            parts = event.strip().split()
            if len(parts) >= 3:
                path, change_type, filename = parts[0], parts[1], parts[2]
                container_path = os.path.join(path, filename)
                host_path = self._container_to_host_path(container_path)
                # Apply filters
                if self._should_ignore(host_path):
                    return
                async with self.sync_lock:
                    # Skip changes that were just written by a host sync
                    if host_path in self.pending_syncs:
                        if time.time() - self.pending_syncs[host_path] < self.config.sync_interval:
                            return
                    if "DELETE" in change_type:
                        # Mirror deletions; get_archive() would fail on a
                        # file that no longer exists
                        if os.path.isfile(host_path):
                            os.remove(host_path)
                        return
                    # Sync from container to host
                    await self._sync_to_host(container_path, host_path)
        except Exception as e:
            logger.error(f"Error handling container change: {e}")
def _container_to_host_path(self, container_path: str) -> str:
"""Convert container path to host path."""
rel_path = os.path.relpath(container_path, self.container_path)
return os.path.join(self.host_path, rel_path)
def _should_ignore(self, path: str) -> bool:
"""Check if path should be ignored."""
return any(pattern in path for pattern in self.config.ignore_patterns)
    async def _sync_to_host(
        self,
        container_path: str,
        host_path: str
    ) -> None:
        """Sync file from container to host."""
        try:
            # get_archive() returns a tar stream, so the payload must be
            # extracted rather than written to disk verbatim
            stream, _ = self.container.get_archive(container_path)
            archive = io.BytesIO(b''.join(stream))
            # Create parent directories
            os.makedirs(os.path.dirname(host_path), exist_ok=True)
            with tarfile.open(fileobj=archive) as tar:
                member = tar.getmembers()[0]
                extracted = tar.extractfile(member)
                if extracted is None:
                    return
                data = extracted.read()
            if self.config.atomic:
                # Write to a temporary file, then rename into place
                tmp_path = f"{host_path}.tmp"
                with open(tmp_path, 'wb') as f:
                    f.write(data)
                os.replace(tmp_path, host_path)
            else:
                # Direct write
                with open(host_path, 'wb') as f:
                    f.write(data)
            # Record the sync so the host watcher doesn't echo it back
            self.pending_syncs[host_path] = time.time()
        except Exception as e:
            logger.error(f"Error syncing to host: {e}")
            raise SyncError(f"Failed to sync file {container_path}: {e}") from e