Skip to main content
Glama
edit_tools.py11.3 kB
import logging import os import tempfile from typing import Dict, Any, List, Optional try: import fcntl except ImportError: fcntl = None # Windows support from ..core.document import Document from ..core.path_utils import resolve_path logger = logging.getLogger(__name__) class CachedDocument: """Wrapper for Document with cache invalidation support""" def __init__(self, document: Document, mtime: float): self.document = document self.mtime = mtime # File modification time when loaded class FileLock: """Context manager for file locking""" def __init__(self, file_path: str, exclusive: bool = True): self.file_path = file_path self.exclusive = exclusive self.lock_file = None self.lock_path = f"{file_path}.lock" def __enter__(self): # Create lock file if it doesn't exist self.lock_file = open(self.lock_path, "w") if fcntl: if self.exclusive: fcntl.flock(self.lock_file.fileno(), fcntl.LOCK_EX) else: fcntl.flock(self.lock_file.fileno(), fcntl.LOCK_SH) return self def __exit__(self, exc_type, exc_val, exc_tb): if self.lock_file: if fcntl: fcntl.flock(self.lock_file.fileno(), fcntl.LOCK_UN) self.lock_file.close() # Clean up lock file try: os.remove(self.lock_path) except OSError: pass # Ignore if already removed return False class EditTool: """Tool for working with document content""" def __init__(self): self._documents: Dict[str, CachedDocument] = {} def _get_file_mtime(self, file_path: str) -> float: """Get file modification time""" try: return os.path.getmtime(file_path) except OSError: return 0.0 def _is_cache_valid(self, abs_path: str) -> bool: """Check if cached document is still valid""" if abs_path not in self._documents: return False cached = self._documents[abs_path] current_mtime = self._get_file_mtime(abs_path) # Cache is valid if file hasn't been modified return cached.mtime == current_mtime def invalidate_cache(self, file_path: str) -> None: """Invalidate cache for a specific file""" abs_path = resolve_path(file_path) if abs_path in self._documents: del self._documents[abs_path] def invalidate_all_cache(self) -> None: """Invalidate all cached documents""" self._documents.clear() def get_doc(self, file_path: str) -> Document: """Get or load document by path with cache validation""" # Use centralized path resolution abs_path = resolve_path(file_path) # Check if cache is still valid if not self._is_cache_valid(abs_path): if os.path.exists(abs_path): with open(abs_path, "r", encoding="utf-8") as f: content = f.read() mtime = self._get_file_mtime(abs_path) self._documents[abs_path] = CachedDocument( document=Document(content=content), mtime=mtime ) else: self._documents[abs_path] = CachedDocument( document=Document(content=""), mtime=0.0 ) return self._documents[abs_path].document def _update_cache_mtime(self, abs_path: str) -> None: """Update cache mtime after writing file""" if abs_path in self._documents: self._documents[abs_path].mtime = self._get_file_mtime(abs_path) def _atomic_write(self, file_path: str, content: str) -> None: """Write file atomically using temp file + rename""" abs_path = resolve_path(file_path) dir_path = os.path.dirname(abs_path) # Write to temp file first fd, temp_path = tempfile.mkstemp(dir=dir_path, prefix=".tmp_") try: with os.fdopen(fd, "w", encoding="utf-8") as f: f.write(content) # Atomic rename os.replace(temp_path, abs_path) except Exception: # Clean up temp file on error try: os.unlink(temp_path) except OSError: pass raise async def get_structure( self, file_path: str, depth: Optional[int] = None ) -> List[dict]: doc = self.get_doc(file_path) return doc.get_structure(depth=depth) async def read(self, file_path: str, path: str) -> Dict[str, Any]: doc = self.get_doc(file_path) return doc.view_element(path) async def replace( self, file_path: str, path: str, new_content: str ) -> Dict[str, Any]: abs_path = resolve_path(file_path) with FileLock(abs_path): doc = self.get_doc(file_path) result = doc.replace(path, new_content) if "success" in result: try: self._atomic_write(file_path, doc.get_content()) self._update_cache_mtime(abs_path) # Confirm journal only after successful file write doc.confirm_journal() except Exception as e: # Rollback journal entry on write failure doc.rollback_last_entry() self.invalidate_cache(file_path) return {"error": f"Failed to write file: {e}"} # Remove internal field from result result.pop("_pending_entry", None) return result async def insert( self, file_path: str, path: str, element_type: str, content: str, where: str = "after", heading_level: int = 1, ) -> Dict[str, Any]: abs_path = resolve_path(file_path) with FileLock(abs_path): doc = self.get_doc(file_path) if where == "before": result = doc.insert_before(path, element_type, content, heading_level) else: result = doc.insert_after(path, element_type, content, heading_level) if "success" in result: try: self._atomic_write(file_path, doc.get_content()) self._update_cache_mtime(abs_path) doc.confirm_journal() except Exception as e: doc.rollback_last_entry() self.invalidate_cache(file_path) return {"error": f"Failed to write file: {e}"} return result async def delete(self, file_path: str, path: str) -> Dict[str, Any]: abs_path = resolve_path(file_path) with FileLock(abs_path): doc = self.get_doc(file_path) result = doc.delete(path) if "success" in result: try: self._atomic_write(file_path, doc.get_content()) self._update_cache_mtime(abs_path) doc.confirm_journal() except Exception as e: doc.rollback_last_entry() self.invalidate_cache(file_path) return {"error": f"Failed to write file: {e}"} return result async def undo(self, file_path: str, count: int = 1) -> Dict[str, Any]: abs_path = resolve_path(file_path) with FileLock(abs_path): doc = self.get_doc(file_path) result = doc.undo(count) if "success" in result: try: self._atomic_write(file_path, doc.get_content()) self._update_cache_mtime(abs_path) doc.confirm_journal() except Exception as e: self.invalidate_cache(file_path) return {"error": f"Failed to write file: {e}"} return result async def search(self, file_path: str, query: str) -> List[Dict[str, Any]]: doc = self.get_doc(file_path) return doc.search_text(query) async def get_context(self, file_path: str, path: str) -> Dict[str, Any]: doc = self.get_doc(file_path) return doc.get_context(path) async def move( self, file_path: str, src_path: str, dst_path: str, where: str = "after" ) -> Dict[str, Any]: abs_path = resolve_path(file_path) with FileLock(abs_path): doc = self.get_doc(file_path) result = doc.move_element(src_path, dst_path, where) if "success" in result: try: self._atomic_write(file_path, doc.get_content()) self._update_cache_mtime(abs_path) doc.confirm_journal() except Exception as e: self.invalidate_cache(file_path) return {"error": f"Failed to write file: {e}"} return result async def update_metadata( self, file_path: str, metadata: Dict[str, Any] ) -> Dict[str, Any]: abs_path = resolve_path(file_path) with FileLock(abs_path): doc = self.get_doc(file_path) result = doc.update_metadata(metadata) if "success" in result: try: self._atomic_write(file_path, doc.get_content()) self._update_cache_mtime(abs_path) doc.confirm_journal() except Exception as e: doc.rollback_last_entry() self.invalidate_cache(file_path) return {"error": f"Failed to write file: {e}"} return result # Global instance following the template _instance = EditTool() # Async wrappers following the template async def get_document_structure(file_path: str, depth: Optional[int] = None): return await _instance.get_structure(file_path, depth) async def read_element(file_path: str, path: str): return await _instance.read(file_path, path) async def replace_content(file_path: str, path: str, new_content: str): return await _instance.replace(file_path, path, new_content) async def insert_element( file_path: str, path: str, element_type: str, content: str, where: str = "after", heading_level: int = 1, ): return await _instance.insert( file_path, path, element_type, content, where, heading_level ) async def delete_element(file_path: str, path: str): return await _instance.delete(file_path, path) async def undo_changes(file_path: str, count: int = 1): return await _instance.undo(file_path, count) async def search_in_document(file_path: str, query: str): return await _instance.search(file_path, query) async def get_element_context(file_path: str, path: str): return await _instance.get_context(file_path, path) async def move_document_element( file_path: str, src_path: str, dst_path: str, where: str = "after" ): return await _instance.move(file_path, src_path, dst_path, where) async def update_document_metadata(file_path: str, metadata: Dict[str, Any]): return await _instance.update_metadata(file_path, metadata)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/KazKozDev/markdown-editor-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server