#!/usr/bin/env python3
"""
MCP Server for Claude Desktop - File Operations
Provides read and write access to files in the documents subfolder.
"""
import asyncio
import logging
import pathlib
from datetime import datetime
from typing import Any, Dict
from mcp.server import FastMCP
from mcp.types import CallToolResult, TextContent
# Log files are written to a "logs" folder relative to the current working
# directory; create the folder on first use.
logs_dir = pathlib.Path("logs")
logs_dir.mkdir(exist_ok=True)

# Each server start gets its own timestamped log file.
log_filename = logs_dir.joinpath(
    f"mcp_server_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
)

# Send every record both to the log file and to a stream handler
# (logging.StreamHandler writes to sys.stderr by default).
logging.basicConfig(
    handlers=[
        logging.FileHandler(log_filename, encoding="utf-8"),
        logging.StreamHandler(),
    ],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Root folder that every tool is confined to: "<parent of this file's
# directory>/documents".
DOCUMENTS_DIR = pathlib.Path(__file__).parents[1] / "documents"
class FileMCPServer:
    """MCP server exposing file and PDF tools confined to the documents folder.

    Every tool is an async method that takes a single ``arguments`` dict and
    returns a ``CallToolResult`` holding one text item. Expected failures
    (missing file, wrong kind of path, I/O errors) are reported as text in
    that result. Two failures are raised instead of reported: a path that
    escapes the documents folder raises ``ValueError`` (from
    ``_validate_path``) and a missing required key raises ``KeyError``.
    """

    # Retry policy for recursive deletes (kept from the original retry logic,
    # which was added for Windows access issues).
    _DELETE_MAX_RETRIES = 3
    _DELETE_INITIAL_DELAY = 0.5  # seconds; doubled after every failed attempt
    # How many locked file names are spelled out in an access-denied message.
    _MAX_LOCKED_FILES_SHOWN = 5
    # Text modes accepted by write_file; anything else is rejected up front
    # instead of being handed to open().
    _WRITE_MODES = ("w", "a", "x")

    def __init__(self):
        """Create the FastMCP server instance and register every tool on it."""
        self.server = FastMCP("file-operations")
        self.setup_tools()

    def setup_tools(self):
        """Register all available tools."""
        tools = (
            (
                self.read_file,
                "read_file",
                "Read the contents of a file from the documents folder",
            ),
            (
                self.write_file,
                "write_file",
                "Write content to a file in the documents folder",
            ),
            (
                self.list_files,
                "list_files",
                "List all files and directories in the documents folder",
            ),
            (
                self.delete_file,
                "delete_file",
                "Delete a file or directory from the documents folder",
            ),
            (
                self.create_directory,
                "create_directory",
                "Create a directory in the documents folder",
            ),
            (
                self.delete_directory,
                "delete_directory",
                "Delete a directory and all its contents recursively from the documents folder",
            ),
            (
                self.read_pdf,
                "read_PDF",
                "Extract and read text content from PDF files in the documents folder",
            ),
            (
                self.get_pdf_info,
                "get_PDF_info",
                "Get PDF metadata and information (title, author, pages, etc.)",
            ),
        )
        for fn, name, description in tools:
            self.server.add_tool(fn=fn, name=name, description=description)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _text_result(text: str) -> CallToolResult:
        """Wrap ``text`` in a CallToolResult containing a single text item."""
        return CallToolResult(content=[TextContent(type="text", text=text)])

    @staticmethod
    def _documents_root() -> pathlib.Path:
        """Return the resolved (absolute, symlink-free) documents directory.

        Paths returned by ``_validate_path`` are resolved, so they must be
        compared against, and made relative to, this resolved root rather
        than the raw ``DOCUMENTS_DIR`` constant.
        """
        return DOCUMENTS_DIR.resolve()

    def _validate_path(self, file_path: str) -> pathlib.Path:
        """Validate and return a safe path within the documents directory.

        Args:
            file_path: Path supplied by the caller, interpreted relative to
                the documents directory.

        Returns:
            The resolved absolute path.

        Raises:
            ValueError: If the resolved path is not the documents directory
                itself or something beneath it.
        """
        documents_abs = self._documents_root()
        full_path = (DOCUMENTS_DIR / file_path).resolve()
        # Compare path components, not string prefixes: a plain startswith()
        # would accept a sibling such as "<root>/documents_evil/x".
        try:
            full_path.relative_to(documents_abs)
        except ValueError:
            raise ValueError(
                f"Path {file_path} is outside the allowed documents directory"
            ) from None
        return full_path

    def _check_file_locks(self, path: pathlib.Path) -> list:
        """Check for potentially locked files in a directory.

        Best effort: every regular file under ``path`` is opened for
        read/write, and files that cannot be opened are reported. Errors
        while walking the tree are logged and end the scan early.

        Returns:
            Names of the files that could not be opened, relative to the
            documents directory where possible.
        """
        documents_abs = self._documents_root()
        locked_files = []
        try:
            for item in path.rglob("*"):
                if not item.is_file():
                    continue
                try:
                    # Opening for read/write fails when the file is in use
                    # or otherwise not writable.
                    with open(item, "r+b"):
                        pass
                except OSError:
                    try:
                        shown = str(item.relative_to(documents_abs))
                    except ValueError:
                        shown = str(item)
                    locked_files.append(shown)
        except OSError as e:
            logger.warning(f"Could not finish scanning {path} for locked files: {e}")
        return locked_files

    def _access_denied_message(self, full_path: pathlib.Path, display_path: str) -> str:
        """Build the error text used when a recursive delete is denied access."""
        error_msg = (
            f"Error deleting directory '{display_path}': Access denied. "
            "The directory may contain files that are currently in use."
        )
        locked_files = self._check_file_locks(full_path)
        if locked_files:
            limit = self._MAX_LOCKED_FILES_SHOWN
            error_msg += f" Potentially locked files: {', '.join(locked_files[:limit])}"
            if len(locked_files) > limit:
                error_msg += f" and {len(locked_files) - limit} more"
        error_msg += (
            " Please close any applications that might be using files in this"
            " directory and try again."
        )
        return error_msg

    async def _delete_tree(
        self, full_path: pathlib.Path, display_path: str, outcome: str
    ) -> CallToolResult:
        """Recursively delete ``full_path``, retrying on OS errors.

        Args:
            full_path: Validated directory to remove.
            display_path: The caller-supplied path, used in messages.
            outcome: Word inserted into the success message
                ("recursively" or "successfully").

        Returns:
            A success result including the number of entries counted before
            deletion, or an error result once all attempts have failed.
        """
        import shutil

        # Count items before deletion for better feedback.
        item_count = sum(1 for _ in full_path.rglob("*"))

        max_retries = self._DELETE_MAX_RETRIES
        retry_delay = self._DELETE_INITIAL_DELAY
        for attempt in range(1, max_retries + 1):
            try:
                shutil.rmtree(full_path)
                break
            except OSError as e:
                access_denied = isinstance(e, PermissionError)
                if attempt < max_retries:
                    kind = "Permission" if access_denied else "OS"
                    logger.warning(
                        f"{kind} error deleting {display_path} "
                        f"(attempt {attempt}/{max_retries}): {e}"
                    )
                    # Non-blocking wait so the event loop keeps serving
                    # other requests between attempts.
                    await asyncio.sleep(retry_delay)
                    retry_delay *= 2  # Exponential backoff
                    continue
                # Final attempt failed.
                if access_denied:
                    return self._text_result(
                        self._access_denied_message(full_path, display_path)
                    )
                return self._text_result(
                    f"Error deleting directory '{display_path}': {str(e)}"
                )

        return self._text_result(
            f"Directory '{display_path}' deleted {outcome} "
            f"({item_count} items removed)"
        )

    @staticmethod
    def _pdf_path_problem(file_path: str, full_path: pathlib.Path):
        """Return an error message if the path is not an existing .pdf file.

        Returns ``None`` when the path exists, is a regular file and the
        caller-supplied name ends in ".pdf" (case-insensitive).
        """
        if not full_path.exists():
            return f"PDF file not found: {file_path}"
        if not full_path.is_file():
            return f"Path is not a file: {file_path}"
        if not file_path.lower().endswith(".pdf"):
            return f"File is not a PDF: {file_path}"
        return None

    # ------------------------------------------------------------------
    # Tools
    # ------------------------------------------------------------------

    async def read_file(self, arguments: Dict[str, Any]) -> CallToolResult:
        """Read a file from the documents folder.

        Args:
            arguments: Must contain ``file_path`` (relative to the documents
                folder). The file is decoded as UTF-8.

        Returns:
            The file contents, or a text message describing why the file
            could not be read.
        """
        file_path = arguments["file_path"]
        full_path = self._validate_path(file_path)

        if not full_path.exists():
            return self._text_result(f"File not found: {file_path}")
        if not full_path.is_file():
            return self._text_result(f"Path is not a file: {file_path}")

        try:
            with open(full_path, "r", encoding="utf-8") as f:
                content = f.read()
        except Exception as e:
            logger.exception("Error reading file %s", file_path)
            return self._text_result(f"Error reading file: {str(e)}")

        return self._text_result(f"File contents of {file_path}:\n\n{content}")

    async def write_file(self, arguments: Dict[str, Any]) -> CallToolResult:
        """Write content to a file in the documents folder.

        Args:
            arguments: Must contain ``file_path`` and ``content``. Optional
                ``mode`` is "w" (overwrite, default), "a" (append) or "x"
                (create, fail if the file exists). Missing parent
                directories are created.

        Returns:
            A confirmation message, or a text message describing the error.
        """
        file_path = arguments["file_path"]
        content = arguments["content"]
        mode = arguments.get("mode", "w")
        full_path = self._validate_path(file_path)

        # The mode comes from the caller; never pass arbitrary values to open().
        if mode not in self._WRITE_MODES:
            return self._text_result(
                f"Invalid write mode '{mode}'. Supported modes: "
                "'w' (overwrite), 'a' (append), 'x' (create new)."
            )

        try:
            # Create parent directories if they don't exist.
            full_path.parent.mkdir(parents=True, exist_ok=True)
            with open(full_path, mode, encoding="utf-8") as f:
                f.write(content)
        except Exception as e:
            logger.exception("Error writing file %s", file_path)
            return self._text_result(f"Error writing file: {str(e)}")

        action = "appended to" if mode == "a" else "written to"
        return self._text_result(f"Content {action} file: {file_path}")

    async def list_files(self, arguments: Dict[str, Any]) -> CallToolResult:
        """List files in the documents folder.

        Args:
            arguments: Optional keys: ``subdirectory`` (folder to list,
                default the documents root), ``search_pattern`` (glob
                pattern; must not contain ".." components) and
                ``recursive`` (descend into sub-folders, default False).

        Returns:
            A sorted listing of matching files and directories with paths
            relative to the documents folder, or a text message describing
            why nothing could be listed.
        """
        subdirectory = arguments.get("subdirectory", "")
        search_pattern = arguments.get("search_pattern", "")
        recursive = arguments.get("recursive", False)

        documents_abs = self._documents_root()
        list_path = self._validate_path(subdirectory) if subdirectory else documents_abs
        location = subdirectory or "documents"

        if not list_path.exists():
            return self._text_result(f"Directory not found: {location}")
        if not list_path.is_dir():
            return self._text_result(f"Path is not a directory: {location}")

        try:
            if search_pattern:
                # A ".." component would let the glob walk out of the
                # documents folder, so refuse it outright.
                if ".." in search_pattern.replace("\\", "/").split("/"):
                    return self._text_result(
                        f"Invalid search pattern (must not contain '..'): {search_pattern}"
                    )
                glob_pattern = f"**/{search_pattern}" if recursive else search_pattern
                matches = list_path.glob(glob_pattern)
            elif recursive:
                matches = list_path.rglob("*")
            else:
                matches = list_path.iterdir()

            files = []
            for item in matches:
                if item.is_file():
                    files.append(f"📄 {item.relative_to(documents_abs)}")
                elif item.is_dir():
                    files.append(f"📁 {item.relative_to(documents_abs)}/")

            search_info = f" matching '{search_pattern}'" if search_pattern else ""
            if recursive:
                search_info += " (recursive)"

            if files:
                result_text = (
                    f"Files in {location} directory{search_info}:\n"
                    + "\n".join(sorted(files))
                )
            else:
                result_text = f"No files found in {location} directory{search_info}"
            return self._text_result(result_text)
        except Exception as e:
            logger.exception("Error listing files in %s", location)
            return self._text_result(f"Error listing files: {str(e)}")

    async def delete_file(self, arguments: Dict[str, Any]) -> CallToolResult:
        """Delete a file or directory from the documents folder.

        Args:
            arguments: Must contain ``file_path``. Optional ``recursive``
                (default False) allows deleting a non-empty directory.

        Returns:
            A confirmation message, or a text message describing why the
            path was not deleted. The documents folder itself is never
            deleted.
        """
        file_path = arguments["file_path"]
        recursive = arguments.get("recursive", False)
        full_path = self._validate_path(file_path)

        # Protect the documents directory itself. full_path is resolved, so
        # it has to be compared with the resolved root.
        if full_path == self._documents_root():
            return self._text_result(
                "Cannot delete the documents directory itself for safety reasons."
            )
        if not full_path.exists():
            return self._text_result(f"File or directory not found: {file_path}")

        try:
            if full_path.is_file():
                full_path.unlink()
                return self._text_result(f"File deleted: {file_path}")

            if full_path.is_dir():
                if recursive:
                    return await self._delete_tree(full_path, file_path, "recursively")
                if any(full_path.iterdir()):
                    return self._text_result(
                        f"Directory '{file_path}' is not empty. "
                        "Use recursive=true to delete non-empty directories."
                    )
                full_path.rmdir()
                return self._text_result(f"Empty directory deleted: {file_path}")

            return self._text_result(
                f"Path is neither a file nor directory: {file_path}"
            )
        except Exception as e:
            logger.exception("Error deleting %s", file_path)
            return self._text_result(f"Error deleting file or directory: {str(e)}")

    async def create_directory(self, arguments: Dict[str, Any]) -> CallToolResult:
        """Create a directory in the documents folder.

        Args:
            arguments: Must contain ``dir_path``. Missing parent
                directories are created as well.

        Returns:
            A confirmation message, a notice that the path already exists,
            or a text message describing the error.
        """
        dir_path = arguments["dir_path"]
        full_path = self._validate_path(dir_path)

        if full_path.is_dir():
            return self._text_result(f"Directory already exists: {dir_path}")
        if full_path.exists():
            # Something that is not a directory already occupies the path.
            return self._text_result(
                f"Path already exists and is not a directory: {dir_path}"
            )

        try:
            full_path.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            logger.exception("Error creating directory %s", dir_path)
            return self._text_result(f"Error creating directory: {str(e)}")

        return self._text_result(f"Directory created: {dir_path}")

    async def delete_directory(self, arguments: Dict[str, Any]) -> CallToolResult:
        """Delete a directory and all its contents recursively.

        Args:
            arguments: Must contain ``dir_path``.

        Returns:
            A confirmation message with the number of removed items, or a
            text message describing why the directory was not deleted. The
            documents folder itself is never deleted.
        """
        dir_path = arguments["dir_path"]
        full_path = self._validate_path(dir_path)

        # Protect the documents directory itself. full_path is resolved, so
        # it has to be compared with the resolved root.
        if full_path == self._documents_root():
            return self._text_result(
                "Cannot delete the documents directory itself for safety reasons."
            )
        if not full_path.exists():
            return self._text_result(f"Directory not found: {dir_path}")
        if not full_path.is_dir():
            return self._text_result(f"Path is not a directory: {dir_path}")

        try:
            return await self._delete_tree(full_path, dir_path, "successfully")
        except Exception as e:
            logger.exception("Error deleting directory %s", dir_path)
            return self._text_result(f"Error deleting directory: {str(e)}")

    async def read_pdf(self, arguments: Dict[str, Any]) -> CallToolResult:
        """Extract and read text content from PDF files.

        Args:
            arguments: Must contain ``file_path`` naming a ".pdf" file in
                the documents folder.

        Returns:
            The extracted text, one section per page, or a text message
            when the file is unsuitable, contains no extractable text, or
            cannot be read.
        """
        file_path = arguments["file_path"]
        full_path = self._validate_path(file_path)

        problem = self._pdf_path_problem(file_path, full_path)
        if problem is not None:
            return self._text_result(problem)

        try:
            # Imported here so a missing pdfplumber install is reported as
            # a tool error rather than preventing the server from starting.
            import pdfplumber

            page_sections = []
            found_text = False
            with pdfplumber.open(full_path) as pdf:
                total_pages = len(pdf.pages)
                for page_num, page in enumerate(pdf.pages, 1):
                    page_text = page.extract_text()
                    if page_text:
                        found_text = True
                    else:
                        page_text = "[No text content found]"
                    page_sections.append(f"--- Page {page_num} ---\n{page_text}")

            # Report "no text" when no page produced any text at all, not
            # only when the PDF has zero pages.
            if not found_text:
                return self._text_result(
                    f"No text content found in PDF: {file_path}"
                )

            full_text = "\n\n".join(page_sections)
            return self._text_result(
                f"PDF content from {file_path} ({total_pages} pages):\n\n{full_text}"
            )
        except Exception as e:
            logger.exception("Error reading PDF %s", file_path)
            return self._text_result(f"Error reading PDF: {str(e)}")

    async def get_pdf_info(self, arguments: Dict[str, Any]) -> CallToolResult:
        """Get PDF metadata and information.

        Args:
            arguments: Must contain ``file_path`` naming a ".pdf" file in
                the documents folder.

        Returns:
            File name, size, page count, any available metadata fields and
            the encryption status, or a text message describing the error.
            A failure while reading metadata is reported as one line of
            the output instead of failing the whole call.
        """
        file_path = arguments["file_path"]
        full_path = self._validate_path(file_path)

        problem = self._pdf_path_problem(file_path, full_path)
        if problem is not None:
            return self._text_result(problem)

        # (label shown to the user, attribute on the PyPDF2 metadata object)
        metadata_fields = (
            ("Title", "title"),
            ("Author", "author"),
            ("Subject", "subject"),
            ("Creator", "creator"),
            ("Producer", "producer"),
            ("Creation Date", "creation_date"),
            ("Modification Date", "modification_date"),
        )

        try:
            # Imported here so a missing PDF library is reported as a tool
            # error rather than preventing the server from starting.
            import pdfplumber
            import PyPDF2

            info = [
                f"File: {file_path}",
                f"Size: {full_path.stat().st_size:,} bytes",
            ]

            # Get basic info with pdfplumber.
            with pdfplumber.open(full_path) as pdf:
                info.append(f"Pages: {len(pdf.pages)}")

            # Get metadata with PyPDF2.
            try:
                with open(full_path, "rb") as file:
                    pdf_reader = PyPDF2.PdfReader(file)
                    metadata = pdf_reader.metadata
                    if metadata:
                        for label, attr in metadata_fields:
                            value = getattr(metadata, attr)
                            if value:
                                info.append(f"{label}: {value}")

                    # Check if PDF is encrypted.
                    if pdf_reader.is_encrypted:
                        info.append("Status: Encrypted (password protected)")
                    else:
                        info.append("Status: Not encrypted")
            except Exception as e:
                info.append(f"Metadata extraction error: {str(e)}")

            return self._text_result("PDF Information:\n\n" + "\n".join(info))
        except Exception as e:
            logger.exception("Error reading PDF info for %s", file_path)
            return self._text_result(f"Error reading PDF info: {str(e)}")
async def main():
    """Start the file-operations MCP server on stdio and run it until it stops.

    Creates the documents directory if it is missing, builds the
    ``FileMCPServer`` and awaits its stdio loop. A ``KeyboardInterrupt`` is
    logged and swallowed; any other exception is logged together with its
    type and traceback and then re-raised. A shutdown message is logged in
    every case.
    """
    logger.info("Starting MCP File Operations Server...")

    # Make sure the documents directory is there before the server starts.
    DOCUMENTS_DIR.mkdir(exist_ok=True)
    logger.info(f"Documents directory ready: {DOCUMENTS_DIR.absolute()}")

    app = FileMCPServer()
    logger.info("Server initialized successfully")

    try:
        logger.info("MCP stdio server started, waiting for connections...")
        await app.server.run_stdio_async()
    except KeyboardInterrupt:
        logger.info("Server interrupted by user")
    except Exception as exc:
        import traceback

        # Log the message, the exception type and the full traceback as
        # three separate error records, then let the exception propagate.
        details = (
            f"Server error: {exc}",
            f"Error type: {type(exc).__name__}",
            f"Traceback: {traceback.format_exc()}",
        )
        for detail in details:
            logger.error(detail)
        raise
    finally:
        logger.info("Server shutdown complete")
# Run the server only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())