LPS MCP
by lpsDevelopers
Verified
#!/usr/bin/env python3
import os
import json
import sys
import pathlib
from datetime import datetime
from typing import List, Dict, Any, Optional, Union, TypedDict
from mcp.server.fastmcp import FastMCP, Context
# Create a FastMCP server instance
mcp = FastMCP("secure-filesystem-server")
# Command line argument parsing
if len(sys.argv) < 2:
print("Usage: python filesystem_server.py <allowed-directory> [additional-directories...]", file=sys.stderr)
sys.exit(1)
# Normalize all paths consistently
def normalize_path(p: str) -> str:
return os.path.normpath(p)
def expand_home(filepath: str) -> str:
if filepath.startswith('~/') or filepath == '~':
return os.path.join(os.path.expanduser('~'), filepath[1:])
return filepath
# Store allowed directories in normalized form
allowed_directories = [
normalize_path(os.path.abspath(expand_home(dir)))
for dir in sys.argv[1:]
]
# Validate that all directories exist and are accessible
for dir_path in sys.argv[1:]:
expanded_path = expand_home(dir_path)
try:
stats = os.stat(expanded_path)
if not os.path.isdir(expanded_path):
print(f"Error: {dir_path} is not a directory", file=sys.stderr)
sys.exit(1)
except OSError as e:
print(f"Error accessing directory {dir_path}: {e}", file=sys.stderr)
sys.exit(1)
# Security utilities
async def validate_path(requested_path: str) -> str:
"""Valida y resuelve las rutas de archivos contra directorios permitidos por seguridad."""
expanded_path = expand_home(requested_path)
absolute = os.path.abspath(expanded_path)
normalized_requested = normalize_path(absolute)
# Check if path is within allowed directories
is_allowed = any(normalized_requested.startswith(dir) for dir in allowed_directories)
if not is_allowed:
raise ValueError(f"Acceso denegado - ruta fuera de los directorios permitidos: {absolute} no está en {', '.join(allowed_directories)}")
# Handle symlinks by checking their real path
try:
real_path = os.path.realpath(absolute)
normalized_real = normalize_path(real_path)
is_real_path_allowed = any(normalized_real.startswith(dir) for dir in allowed_directories)
if not is_real_path_allowed:
raise ValueError("Acceso denegado - destino del enlace simbólico fuera de los directorios permitidos")
return real_path
except OSError:
# For paths that don't exist yet, verify parent directory
parent_dir = os.path.dirname(absolute)
try:
real_parent_path = os.path.realpath(parent_dir)
normalized_parent = normalize_path(real_parent_path)
is_parent_allowed = any(normalized_parent.startswith(dir) for dir in allowed_directories)
if not is_parent_allowed:
raise ValueError("Acceso denegado - directorio padre fuera de los directorios permitidos")
return absolute
except OSError:
raise ValueError(f"El directorio padre no existe: {parent_dir}")
async def get_file_stats(file_path: str) -> Dict[str, Union[int, str, bool]]:
"""Obtiene información detallada del archivo."""
stats = os.stat(file_path)
return {
"size": stats.st_size,
"created": datetime.fromtimestamp(stats.st_ctime).isoformat(),
"modified": datetime.fromtimestamp(stats.st_mtime).isoformat(),
"accessed": datetime.fromtimestamp(stats.st_atime).isoformat(),
"isDirectory": os.path.isdir(file_path),
"isFile": os.path.isfile(file_path),
"permissions": oct(stats.st_mode)[-3:],
}
async def search_files(
root_path: str,
pattern: str,
exclude_patterns: Optional[List[str]] = None
) -> List[str]:
"""Busca archivos que coincidan con un patrón, con exclusiones opcionales."""
if exclude_patterns is None:
exclude_patterns = []
results = []
for root, dirs, files in os.walk(root_path):
# Check if we should process this directory based on exclude patterns
try:
# Validate each path before processing
await validate_path(root)
# Filter out directories in exclude list
rel_path = os.path.relpath(root, root_path)
dirs[:] = [d for d in dirs if not any(
os.path.relpath(os.path.join(root, d), root_path).startswith(exclude_pattern)
for exclude_pattern in exclude_patterns
)]
# Check all entries in this directory
for name in dirs + files:
full_path = os.path.join(root, name)
try:
await validate_path(full_path)
if pattern.lower() in name.lower():
results.append(full_path)
except ValueError:
# Skip invalid paths
continue
except ValueError:
# Skip invalid paths
continue
return results
# Sequential Thinking Tool
class ThoughtData(TypedDict, total=False):
thought: str
thoughtNumber: int
totalThoughts: int
nextThoughtNeeded: bool
isRevision: Optional[bool]
revisesThought: Optional[int]
branchFromThought: Optional[int]
branchId: Optional[str]
needsMoreThoughts: Optional[bool]
class SequentialThinkingServer:
def __init__(self):
self.thought_history = []
self.branches = {}
def validate_thought_data(self, data: Dict[str, Any]) -> ThoughtData:
if not isinstance(data.get('thought'), str):
raise ValueError('Invalid thought: must be a string')
if not isinstance(data.get('thoughtNumber'), int):
raise ValueError('Invalid thoughtNumber: must be a number')
if not isinstance(data.get('totalThoughts'), int):
raise ValueError('Invalid totalThoughts: must be a number')
if not isinstance(data.get('nextThoughtNeeded'), bool):
raise ValueError('Invalid nextThoughtNeeded: must be a boolean')
return {
'thought': data['thought'],
'thoughtNumber': data['thoughtNumber'],
'totalThoughts': data['totalThoughts'],
'nextThoughtNeeded': data['nextThoughtNeeded'],
'isRevision': data.get('isRevision'),
'revisesThought': data.get('revisesThought'),
'branchFromThought': data.get('branchFromThought'),
'branchId': data.get('branchId'),
'needsMoreThoughts': data.get('needsMoreThoughts')
}
def format_thought(self, thought_data: ThoughtData) -> str:
"""Formatea un pensamiento con bordes coloreados y contexto"""
thought_num = thought_data['thoughtNumber']
total = thought_data['totalThoughts']
thought = thought_data['thought']
is_revision = thought_data.get('isRevision', False)
revises = thought_data.get('revisesThought')
branch_from = thought_data.get('branchFromThought')
branch_id = thought_data.get('branchId')
# Create appropriate prefix and context
if is_revision:
prefix = "🔄 Revisión"
context = f" (revisando pensamiento {revises})"
elif branch_from:
prefix = "🌿 Rama"
context = f" (desde pensamiento {branch_from}, ID: {branch_id})"
else:
prefix = "💭 Pensamiento"
context = ""
header = f"{prefix} {thought_num}/{total}{context}"
border_len = max(len(header), len(thought)) + 4
border = "─" * border_len
# Build the formatted output
output = f"\n┌{border}┐\n"
output += f"│ {header.ljust(border_len)} │\n"
output += f"├{border}┤\n"
output += f"│ {thought.ljust(border_len)} │\n"
output += f"└{border}┘"
return output
def process_thought(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""Procesa un pensamiento y devuelve la respuesta"""
try:
validated_input = self.validate_thought_data(input_data)
if validated_input['thoughtNumber'] > validated_input['totalThoughts']:
validated_input['totalThoughts'] = validated_input['thoughtNumber']
self.thought_history.append(validated_input)
# Track branches if applicable
if validated_input.get('branchFromThought') and validated_input.get('branchId'):
branch_id = validated_input['branchId']
if branch_id not in self.branches:
self.branches[branch_id] = []
self.branches[branch_id].append(validated_input)
# Format and log the thought
formatted_thought = self.format_thought(validated_input)
print(formatted_thought, file=sys.stderr)
# Return response
return {
'thoughtNumber': validated_input['thoughtNumber'],
'totalThoughts': validated_input['totalThoughts'],
'nextThoughtNeeded': validated_input['nextThoughtNeeded'],
'branches': list(self.branches.keys()),
'thoughtHistoryLength': len(self.thought_history)
}
except Exception as e:
return {
'error': str(e),
'status': 'failed'
}
# Create a single instance of the sequential thinking server
thinking_server = SequentialThinkingServer()
# Tool implementations
@mcp.tool()
async def read_file(path: str) -> str:
"""Lee el contenido completo de un archivo del sistema de archivos.
Maneja varias codificaciones de texto y proporciona mensajes de error detallados
si el archivo no puede ser leído. Usa esta herramienta cuando necesites examinar
el contenido de un solo archivo. Solo funciona dentro de los directorios permitidos.
Args:
path: La ruta al archivo a leer
"""
valid_path = await validate_path(path)
with open(valid_path, "r", encoding="utf-8") as f:
return f.read()
@mcp.tool()
async def read_multiple_files(paths: List[str]) -> str:
"""Lee el contenido de múltiples archivos simultáneamente.
Esto es más eficiente que leer archivos uno por uno cuando necesitas analizar
o comparar múltiples archivos. El contenido de cada archivo se devuelve con su
ruta como referencia. Las lecturas fallidas para archivos individuales no detendrán
la operación completa. Solo funciona dentro de los directorios permitidos.
Args:
paths: Lista de rutas de archivos a leer
"""
results = []
for file_path in paths:
try:
valid_path = await validate_path(file_path)
with open(valid_path, "r", encoding="utf-8") as f:
content = f.read()
results.append(f"{file_path}:\n{content}\n")
except Exception as e:
results.append(f"{file_path}: Error - {str(e)}")
return "\n---\n".join(results)
@mcp.tool()
async def list_directory(path: str) -> str:
"""Obtiene un listado detallado de todos los archivos y directorios en una ruta especificada.
Los resultados distinguen claramente entre archivos y directorios con prefijos [FILE] y [DIR].
Esta herramienta es esencial para entender la estructura de directorios y
encontrar archivos específicos dentro de un directorio. Solo funciona dentro de los directorios permitidos.
Args:
path: Ruta del directorio a listar
"""
valid_path = await validate_path(path)
entries = os.listdir(valid_path)
formatted = []
for entry in entries:
entry_path = os.path.join(valid_path, entry)
is_dir = os.path.isdir(entry_path)
formatted.append(f"{'[DIR]' if is_dir else '[FILE]'} {entry}")
return "\n".join(formatted)
@mcp.tool()
async def directory_tree(path: str) -> str:
"""Obtiene una vista en árbol recursiva de archivos y directorios como una estructura JSON.
Cada entrada incluye 'name', 'type' (file/directory), y 'children' para directorios.
Los archivos no tienen array children, mientras que los directorios siempre tienen un array children (que puede estar vacío).
La salida está formateada con una indentación de 2 espacios para facilitar la lectura. Solo funciona dentro de los directorios permitidos.
Args:
path: Ruta del directorio raíz para el árbol
"""
valid_path = await validate_path(path)
async def build_tree(current_path):
entries = os.listdir(current_path)
result = []
for entry in entries:
entry_path = os.path.join(current_path, entry)
try:
await validate_path(entry_path)
is_dir = os.path.isdir(entry_path)
entry_data = {
"name": entry,
"type": "directory" if is_dir else "file"
}
if is_dir:
entry_data["children"] = await build_tree(entry_path)
result.append(entry_data)
except ValueError:
# Skip invalid paths
continue
return result
tree_data = await build_tree(valid_path)
return json.dumps(tree_data, indent=2)
@mcp.tool()
async def search_files_tool(path: str, pattern: str, exclude_patterns: Optional[List[str]] = None) -> str:
"""Busca recursivamente archivos y directorios que coincidan con un patrón.
Busca a través de todos los subdirectorios desde la ruta de inicio. La búsqueda
no distingue entre mayúsculas y minúsculas y coincide con nombres parciales. Devuelve rutas completas a todos
los elementos coincidentes. Excelente para encontrar archivos cuando no conoces su ubicación exacta.
Solo busca dentro de los directorios permitidos.
Args:
path: Directorio desde donde comenzar la búsqueda
pattern: Patrón de texto para buscar en nombres de archivos/directorios
exclude_patterns: Lista opcional de patrones a excluir de la búsqueda
"""
valid_path = await validate_path(path)
results = await search_files(valid_path, pattern, exclude_patterns or [])
return "\n".join(results) if results else "No se encontraron coincidencias"
@mcp.tool()
async def get_file_info(path: str) -> str:
"""Recupera metadatos detallados sobre un archivo o directorio.
Devuelve información completa incluyendo tamaño, tiempo de creación, tiempo de última modificación, permisos,
y tipo. Esta herramienta es perfecta para entender las características de un archivo
sin leer el contenido real. Solo funciona dentro de los directorios permitidos.
Args:
path: Ruta al archivo o directorio
"""
valid_path = await validate_path(path)
info = await get_file_stats(valid_path)
return "\n".join(f"{key}: {value}" for key, value in info.items())
@mcp.tool()
def list_allowed_directories() -> str:
"""Devuelve la lista de directorios a los que este servidor tiene permiso para acceder.
Usa esto para entender qué directorios están disponibles antes de intentar acceder a los archivos.
"""
return f"Directorios permitidos:\n{os.linesep.join(allowed_directories)}"
@mcp.tool()
def sequentialthinking(
thought: str,
thoughtNumber: int,
totalThoughts: int,
nextThoughtNeeded: bool,
isRevision: Optional[bool] = None,
revisesThought: Optional[int] = None,
branchFromThought: Optional[int] = None,
branchId: Optional[str] = None,
needsMoreThoughts: Optional[bool] = None
) -> str:
"""Una herramienta detallada para la resolución dinámica y reflexiva de problemas a través de pensamientos.
Esta herramienta ayuda a analizar problemas mediante un proceso de pensamiento flexible que puede adaptarse y evolucionar.
Cada pensamiento puede basarse en, cuestionar o revisar perspectivas previas a medida que se profundiza la comprensión.
Cuándo usar esta herramienta:
- Desglosar problemas complejos en pasos
- Planificación y diseño con espacio para revisión
- Análisis que podría necesitar corrección de rumbo
- Problemas donde el alcance completo podría no estar claro inicialmente
- Problemas que requieren una solución de múltiples pasos
- Tareas que necesitan mantener contexto a lo largo de múltiples pasos
- Situaciones donde la información irrelevante debe ser filtrada
Args:
thought: Tu paso de pensamiento actual
thoughtNumber: Número actual en la secuencia (puede ir más allá del total inicial si es necesario)
totalThoughts: Estimación actual de pensamientos necesarios (puede ajustarse hacia arriba/abajo)
nextThoughtNeeded: Si se necesita otro paso de pensamiento
isRevision: Si esto revisa un pensamiento previo
revisesThought: Qué pensamiento está siendo reconsiderado
branchFromThought: Número de pensamiento del punto de ramificación
branchId: Identificador de rama
needsMoreThoughts: Si se necesitan más pensamientos
"""
input_data = {
'thought': thought,
'thoughtNumber': thoughtNumber,
'totalThoughts': totalThoughts,
'nextThoughtNeeded': nextThoughtNeeded
}
# Add optional parameters if provided
if isRevision is not None:
input_data['isRevision'] = isRevision
if revisesThought is not None:
input_data['revisesThought'] = revisesThought
if branchFromThought is not None:
input_data['branchFromThought'] = branchFromThought
if branchId is not None:
input_data['branchId'] = branchId
if needsMoreThoughts is not None:
input_data['needsMoreThoughts'] = needsMoreThoughts
response = thinking_server.process_thought(input_data)
return json.dumps(response, indent=2)
# Run the server
if __name__ == "__main__":
print("Servidor de Sistema de Archivos MCP Seguro con Pensamiento Secuencial en ejecución", file=sys.stderr)
print(f"Directorios permitidos: {allowed_directories}", file=sys.stderr)
mcp.run(transport='stdio')