LPS MCP

by lpsDevelopers
Verified
#!/usr/bin/env python3
"""Secure filesystem MCP server with a sequential-thinking tool.

Usage:
    python filesystem_server.py <allowed-directory> [additional-directories...]

Every filesystem tool validates the requested path against the directories
given on the command line (including after symlink resolution) before it
touches the disk.  Diagnostics go to stderr because stdout carries the MCP
stdio transport.
"""
import os
import json
import stat
import sys
import pathlib
from datetime import datetime
from typing import List, Dict, Any, Optional, Union, TypedDict

from mcp.server.fastmcp import FastMCP, Context

# Create a FastMCP server instance
mcp = FastMCP("secure-filesystem-server")

# Command line argument parsing
if len(sys.argv) < 2:
    print(
        "Usage: python filesystem_server.py <allowed-directory> [additional-directories...]",
        file=sys.stderr,
    )
    sys.exit(1)


# Normalize all paths consistently
def normalize_path(p: str) -> str:
    """Return *p* with redundant separators and up-level references collapsed."""
    return os.path.normpath(p)


def expand_home(filepath: str) -> str:
    """Expand a leading ``~`` or ``~/`` to the current user's home directory.

    Only the bare ``~`` forms are expanded; ``~otheruser`` is returned
    unchanged.
    """
    if filepath == '~' or filepath.startswith('~/'):
        # os.path.join(home, "/foo") would discard ``home`` because the second
        # component is absolute, so let expanduser do the splice instead.
        return os.path.expanduser(filepath)
    return filepath


# Store allowed directories in normalized form
allowed_directories = [
    normalize_path(os.path.abspath(expand_home(directory)))
    for directory in sys.argv[1:]
]

# Validate that all directories exist and are accessible
for dir_path in sys.argv[1:]:
    expanded_path = expand_home(dir_path)
    try:
        # os.stat raises OSError for missing/inaccessible paths; isdir alone
        # would just return False and hide the reason.
        os.stat(expanded_path)
        if not os.path.isdir(expanded_path):
            print(f"Error: {dir_path} is not a directory", file=sys.stderr)
            sys.exit(1)
    except OSError as e:
        print(f"Error accessing directory {dir_path}: {e}", file=sys.stderr)
        sys.exit(1)


def _build_allowed_roots(directories: List[str]) -> List[str]:
    """Return the comparison roots for *directories*.

    Each allowed directory contributes both its given form and its
    symlink-resolved form, so an allowed directory that is itself a symlink
    still matches the real paths of the files beneath it.
    """
    roots: List[str] = []
    for directory in directories:
        resolved = normalize_path(os.path.realpath(directory))
        for candidate in (directory, resolved):
            key = os.path.normcase(candidate)
            if key not in roots:
                roots.append(key)
    return roots


_allowed_roots = _build_allowed_roots(allowed_directories)


def _is_within_allowed(path: str) -> bool:
    """Return True if *path* is an allowed root or lies beneath one.

    A plain ``startswith(root)`` would also accept siblings that merely share
    a name prefix (``/data/allowed-evil`` for root ``/data/allowed``), so the
    match must end exactly at the root or at a path separator.
    """
    candidate = os.path.normcase(path)
    for root in _allowed_roots:
        if candidate == root:
            return True
        prefix = root if root.endswith(os.sep) else root + os.sep
        if candidate.startswith(prefix):
            return True
    return False


# Security utilities
async def validate_path(requested_path: str) -> str:
    """Validate and resolve file paths against allowed directories for security.

    Args:
        requested_path: Path supplied by the client; may start with ``~``.

    Returns:
        The symlink-resolved path, or the absolute path when the real path
        could not be resolved but the parent directory is allowed.

    Raises:
        ValueError: If the path, its symlink target, or its parent directory
            lies outside the allowed directories, or the parent cannot be
            resolved.
    """
    expanded_path = expand_home(requested_path)
    absolute = os.path.abspath(expanded_path)
    normalized_requested = normalize_path(absolute)

    # Check if path is within allowed directories
    if not _is_within_allowed(normalized_requested):
        raise ValueError(
            f"Access denied - path outside allowed directories: {absolute} not in {', '.join(allowed_directories)}"
        )

    # Handle symlinks by checking their real path
    try:
        real_path = os.path.realpath(absolute)
    except OSError:
        real_path = None

    if real_path is not None:
        if not _is_within_allowed(normalize_path(real_path)):
            raise ValueError("Access denied - symlink target outside allowed directories")
        return real_path

    # For paths that could not be resolved, verify the parent directory
    parent_dir = os.path.dirname(absolute)
    try:
        real_parent_path = os.path.realpath(parent_dir)
    except OSError as err:
        raise ValueError(f"Parent directory does not exist: {parent_dir}") from err
    if not _is_within_allowed(normalize_path(real_parent_path)):
        raise ValueError("Access denied - parent directory outside allowed directories")
    return absolute


async def get_file_stats(file_path: str) -> Dict[str, Union[int, str, bool]]:
    """Get detailed file information.

    Args:
        file_path: Path to stat (symlinks are followed).

    Returns:
        A dict with size, created/modified/accessed ISO timestamps,
        isDirectory/isFile flags, and three-digit octal permissions.

    Raises:
        OSError: If the path cannot be stat'ed.
    """
    stats = os.stat(file_path)
    # st_ctime is the inode-change time on Unix; prefer the real creation
    # time where the platform exposes it.
    created_ts = getattr(stats, "st_birthtime", stats.st_ctime)
    return {
        "size": stats.st_size,
        "created": datetime.fromtimestamp(created_ts).isoformat(),
        "modified": datetime.fromtimestamp(stats.st_mtime).isoformat(),
        "accessed": datetime.fromtimestamp(stats.st_atime).isoformat(),
        "isDirectory": stat.S_ISDIR(stats.st_mode),
        "isFile": stat.S_ISREG(stats.st_mode),
        "permissions": f"{stats.st_mode & 0o777:03o}",
    }


async def search_files(
    root_path: str,
    pattern: str,
    exclude_patterns: Optional[List[str]] = None
) -> List[str]:
    """Search for files matching a pattern, with optional exclusions.

    Args:
        root_path: Directory to walk.
        pattern: Case-insensitive substring matched against entry names.
        exclude_patterns: Prefixes (relative to *root_path*) of directories
            to prune from the walk.  Empty strings are ignored.

    Returns:
        Full paths of all matching files and directories that pass
        ``validate_path``.
    """
    # An empty prefix matches every path and would prune the whole tree.
    excludes = [p for p in (exclude_patterns or []) if p]
    needle = pattern.lower()

    results = []
    for root, dirs, files in os.walk(root_path):
        try:
            # Validate each directory before processing its entries
            await validate_path(root)
        except ValueError:
            # Skip invalid paths
            continue

        # Prune excluded directories in place so os.walk does not descend
        dirs[:] = [
            d for d in dirs
            if not any(
                os.path.relpath(os.path.join(root, d), root_path).startswith(excluded)
                for excluded in excludes
            )
        ]

        # Check all entries in this directory
        for name in dirs + files:
            full_path = os.path.join(root, name)
            try:
                await validate_path(full_path)
            except ValueError:
                # Skip invalid paths
                continue
            if needle in name.lower():
                results.append(full_path)

    return results


# Sequential Thinking Tool
class ThoughtData(TypedDict, total=False):
    """Shape of a single validated thought record."""

    thought: str
    thoughtNumber: int
    totalThoughts: int
    nextThoughtNeeded: bool
    isRevision: Optional[bool]
    revisesThought: Optional[int]
    branchFromThought: Optional[int]
    branchId: Optional[str]
    needsMoreThoughts: Optional[bool]


class SequentialThinkingServer:
    """In-memory store of thoughts and branches for the thinking tool."""

    def __init__(self):
        self.thought_history = []
        self.branches = {}

    @staticmethod
    def _is_int(value: Any) -> bool:
        """Return True for real integers; bool is an int subclass, so exclude it."""
        return isinstance(value, int) and not isinstance(value, bool)

    def validate_thought_data(self, data: Dict[str, Any]) -> ThoughtData:
        """Check required fields of *data* and return a normalized record.

        Raises:
            ValueError: If a required field is missing or has the wrong type.
        """
        if not isinstance(data.get('thought'), str):
            raise ValueError('Invalid thought: must be a string')
        if not self._is_int(data.get('thoughtNumber')):
            raise ValueError('Invalid thoughtNumber: must be a number')
        if not self._is_int(data.get('totalThoughts')):
            raise ValueError('Invalid totalThoughts: must be a number')
        if not isinstance(data.get('nextThoughtNeeded'), bool):
            raise ValueError('Invalid nextThoughtNeeded: must be a boolean')

        return {
            'thought': data['thought'],
            'thoughtNumber': data['thoughtNumber'],
            'totalThoughts': data['totalThoughts'],
            'nextThoughtNeeded': data['nextThoughtNeeded'],
            'isRevision': data.get('isRevision'),
            'revisesThought': data.get('revisesThought'),
            'branchFromThought': data.get('branchFromThought'),
            'branchId': data.get('branchId'),
            'needsMoreThoughts': data.get('needsMoreThoughts')
        }

    def format_thought(self, thought_data: ThoughtData) -> str:
        """Format a thought inside a box with a header describing its context.

        Returns:
            A string starting with a newline, followed by the boxed header
            and one boxed row per line of the thought.
        """
        thought_num = thought_data['thoughtNumber']
        total = thought_data['totalThoughts']
        thought = thought_data['thought']
        is_revision = thought_data.get('isRevision', False)
        revises = thought_data.get('revisesThought')
        branch_from = thought_data.get('branchFromThought')
        branch_id = thought_data.get('branchId')

        # Create appropriate prefix and context
        if is_revision:
            prefix = "🔄 Revision"
            context = f" (revising thought {revises})"
        elif branch_from:
            prefix = "🌿 Branch"
            context = f" (from thought {branch_from}, ID: {branch_id})"
        else:
            prefix = "💭 Thought"
            context = ""

        header = f"{prefix} {thought_num}/{total}{context}"
        # One row per line so multi-line thoughts stay inside the box.
        body_lines = thought.splitlines() or [""]
        inner_width = max([len(header)] + [len(line) for line in body_lines])
        # Text rows are "│ " + text + " │", i.e. two columns wider than the text.
        border = "─" * (inner_width + 2)

        # Build the formatted output
        rows = [
            f"┌{border}┐",
            f"│ {header.ljust(inner_width)} │",
            f"├{border}┤",
        ]
        rows.extend(f"│ {line.ljust(inner_width)} │" for line in body_lines)
        rows.append(f"└{border}┘")
        return "\n" + "\n".join(rows)

    def process_thought(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate, record, and log a thought, then return a status summary.

        Returns:
            On success, a dict with thoughtNumber, totalThoughts,
            nextThoughtNeeded, branches, and thoughtHistoryLength.
            On any failure, ``{'error': <message>, 'status': 'failed'}``.
        """
        try:
            validated_input = self.validate_thought_data(input_data)

            # The estimate may be exceeded; grow it to match.
            if validated_input['thoughtNumber'] > validated_input['totalThoughts']:
                validated_input['totalThoughts'] = validated_input['thoughtNumber']

            self.thought_history.append(validated_input)

            # Track branches if applicable
            if validated_input.get('branchFromThought') and validated_input.get('branchId'):
                branch_id = validated_input['branchId']
                self.branches.setdefault(branch_id, []).append(validated_input)

            # Format and log the thought (stderr: stdout is the MCP transport)
            print(self.format_thought(validated_input), file=sys.stderr)

            # Return response
            return {
                'thoughtNumber': validated_input['thoughtNumber'],
                'totalThoughts': validated_input['totalThoughts'],
                'nextThoughtNeeded': validated_input['nextThoughtNeeded'],
                'branches': list(self.branches.keys()),
                'thoughtHistoryLength': len(self.thought_history)
            }
        except Exception as e:
            # Tool boundary: report the failure to the client instead of raising.
            return {
                'error': str(e),
                'status': 'failed'
            }


# Create a single instance of the sequential thinking server
thinking_server = SequentialThinkingServer()


# Tool implementations
# NOTE: the docstrings of the @mcp.tool() functions below are published to MCP
# clients as the tool descriptions, so their wording is kept as-is.
@mcp.tool()
async def read_file(path: str) -> str:
    """Read the complete contents of a file from the file system.
    Handles various text encodings and provides detailed error messages
    if the file cannot be read. Use this tool when you need to examine
    the contents of a single file. Only works within allowed directories.

    Args:
        path: The path to the file to read
    """
    # NOTE(review): only UTF-8 is actually attempted here.
    valid_path = await validate_path(path)
    with open(valid_path, "r", encoding="utf-8") as f:
        return f.read()


@mcp.tool()
async def read_multiple_files(paths: List[str]) -> str:
    """Read the contents of multiple files simultaneously. This is more
    efficient than reading files one by one when you need to analyze
    or compare multiple files. Each file's content is returned with its
    path as a reference. Failed reads for individual files won't stop
    the entire operation. Only works within allowed directories.

    Args:
        paths: List of file paths to read
    """
    results = []
    for file_path in paths:
        try:
            valid_path = await validate_path(file_path)
            with open(valid_path, "r", encoding="utf-8") as f:
                content = f.read()
            results.append(f"{file_path}:\n{content}\n")
        except Exception as e:
            # Per-file best effort: report the error and keep going.
            results.append(f"{file_path}: Error - {str(e)}")

    return "\n---\n".join(results)


@mcp.tool()
async def list_directory(path: str) -> str:
    """Get a detailed listing of all files and directories in a specified path.
    Results clearly distinguish between files and directories with [FILE] and [DIR]
    prefixes. This tool is essential for understanding directory structure and
    finding specific files within a directory. Only works within allowed directories.

    Args:
        path: Directory path to list
    """
    valid_path = await validate_path(path)
    formatted = []
    for entry in os.listdir(valid_path):
        is_dir = os.path.isdir(os.path.join(valid_path, entry))
        formatted.append(f"{'[DIR]' if is_dir else '[FILE]'} {entry}")
    return "\n".join(formatted)


@mcp.tool()
async def directory_tree(path: str) -> str:
    """Get a recursive tree view of files and directories as a JSON structure.
    Each entry includes 'name', 'type' (file/directory), and 'children' for directories.
    Files have no children array, while directories always have a children array (which may be empty).
    The output is formatted with 2-space indentation for readability. Only works within allowed directories.

    Args:
        path: Root directory path for the tree
    """
    valid_path = await validate_path(path)

    async def build_tree(current_path, ancestors):
        # ``ancestors`` holds the real paths of the directories on the current
        # descent so a symlink back to one of them cannot recurse forever.
        result = []
        for entry in os.listdir(current_path):
            entry_path = os.path.join(current_path, entry)
            try:
                real_entry = await validate_path(entry_path)
            except ValueError:
                # Skip invalid paths
                continue

            is_dir = os.path.isdir(entry_path)
            entry_data = {
                "name": entry,
                "type": "directory" if is_dir else "file"
            }
            if is_dir:
                key = os.path.normcase(real_entry)
                if key in ancestors:
                    # Symlink cycle: list the directory but do not descend.
                    entry_data["children"] = []
                else:
                    entry_data["children"] = await build_tree(entry_path, ancestors | {key})
            result.append(entry_data)
        return result

    root_key = os.path.normcase(os.path.realpath(valid_path))
    tree_data = await build_tree(valid_path, frozenset({root_key}))
    return json.dumps(tree_data, indent=2)


@mcp.tool()
async def search_files_tool(path: str, pattern: str, exclude_patterns: Optional[List[str]] = None) -> str:
    """Recursively search for files and directories matching a pattern.
    Searches through all subdirectories from the starting path. The search
    is case-insensitive and matches partial names. Returns full paths to all
    matching items. Great for finding files when you don't know their exact location.
    Only searches within allowed directories.

    Args:
        path: Directory to start searching from
        pattern: Text pattern to search for in file/directory names
        exclude_patterns: Optional list of patterns to exclude from search
    """
    valid_path = await validate_path(path)
    results = await search_files(valid_path, pattern, exclude_patterns or [])
    return "\n".join(results) if results else "No matches found"


@mcp.tool()
async def get_file_info(path: str) -> str:
    """Retrieve detailed metadata about a file or directory. Returns comprehensive
    information including size, creation time, last modified time, permissions,
    and type. This tool is perfect for understanding file characteristics
    without reading the actual content. Only works within allowed directories.

    Args:
        path: Path to the file or directory
    """
    valid_path = await validate_path(path)
    info = await get_file_stats(valid_path)
    return "\n".join(f"{key}: {value}" for key, value in info.items())


@mcp.tool()
def list_allowed_directories() -> str:
    """Returns the list of directories that this server is allowed to access.
    Use this to understand which directories are available before trying to access files.
    """
    # Plain "\n" like every other tool; os.linesep would inject "\r\n" on Windows.
    listing = "\n".join(allowed_directories)
    return f"Allowed directories:\n{listing}"


@mcp.tool()
def sequentialthinking(
    thought: str,
    thoughtNumber: int,
    totalThoughts: int,
    nextThoughtNeeded: bool,
    isRevision: Optional[bool] = None,
    revisesThought: Optional[int] = None,
    branchFromThought: Optional[int] = None,
    branchId: Optional[str] = None,
    needsMoreThoughts: Optional[bool] = None
) -> str:
    """A detailed tool for dynamic and reflective problem-solving through thoughts.
    This tool helps analyze problems through a flexible thinking process that can adapt and evolve.
    Each thought can build on, question, or revise previous insights as understanding deepens.

    When to use this tool:
    - Breaking down complex problems into steps
    - Planning and design with room for revision
    - Analysis that might need course correction
    - Problems where the full scope might not be clear initially
    - Problems that require a multi-step solution
    - Tasks that need to maintain context over multiple steps
    - Situations where irrelevant information needs to be filtered out

    Args:
        thought: Your current thinking step
        thoughtNumber: Current number in sequence (can go beyond initial total if needed)
        totalThoughts: Current estimate of thoughts needed (can be adjusted up/down)
        nextThoughtNeeded: Whether another thought step is needed
        isRevision: Whether this revises previous thinking
        revisesThought: Which thought is being reconsidered
        branchFromThought: Branching point thought number
        branchId: Branch identifier
        needsMoreThoughts: If more thoughts are needed
    """
    input_data = {
        'thought': thought,
        'thoughtNumber': thoughtNumber,
        'totalThoughts': totalThoughts,
        'nextThoughtNeeded': nextThoughtNeeded
    }

    # Add optional parameters if provided
    optional_fields = {
        'isRevision': isRevision,
        'revisesThought': revisesThought,
        'branchFromThought': branchFromThought,
        'branchId': branchId,
        'needsMoreThoughts': needsMoreThoughts,
    }
    input_data.update(
        {key: value for key, value in optional_fields.items() if value is not None}
    )

    response = thinking_server.process_thought(input_data)
    return json.dumps(response, indent=2)


# Run the server
if __name__ == "__main__":
    print("Secure MCP Filesystem Server with Sequential Thinking running", file=sys.stderr)
    print(f"Allowed directories: {allowed_directories}", file=sys.stderr)
    mcp.run(transport='stdio')