Skip to main content
Glama

MCP Server Neurolorap

by aindreyway
collector.py18.5 kB
"""Code collection functionality. This module provides functionality to collect and document code files from specified paths, creating a comprehensive markdown file containing the contents of all relevant files. """ import fnmatch import logging import os from pathlib import Path from typing import Dict, List, Optional, Union from .storage import StorageManager # Get module logger logger = logging.getLogger(__name__) class LanguageMap: """Mapping of file extensions to markdown code block languages.""" EXTENSIONS: Dict[str, str] = { ".py": "python", ".js": "javascript", ".ts": "typescript", ".jsx": "jsx", ".tsx": "tsx", ".html": "html", ".css": "css", ".scss": "scss", ".sass": "sass", ".less": "less", ".md": "markdown", ".json": "json", ".yml": "yaml", ".yaml": "yaml", ".sh": "bash", ".bash": "bash", ".zsh": "bash", ".bat": "batch", ".ps1": "powershell", ".sql": "sql", ".java": "java", ".cpp": "cpp", ".hpp": "cpp", ".c": "c", ".h": "c", ".rs": "rust", ".go": "go", ".rb": "ruby", ".php": "php", ".swift": "swift", ".kt": "kotlin", ".kts": "kotlin", ".r": "r", ".lua": "lua", ".m": "matlab", ".pl": "perl", ".xml": "xml", ".toml": "toml", ".ini": "ini", ".conf": "conf", } @classmethod def get_language(cls, file_path: Path) -> str: """Get language identifier for a file extension. Args: file_path: Path to get extension from. Returns: str: Language identifier for markdown code block. """ return cls.EXTENSIONS.get(file_path.suffix.lower(), "") class CodeCollector: """Main class for collecting and processing code files.""" def __init__( self, project_root: Optional[Path] = None, subproject_id: Optional[str] = None, ) -> None: """Initialize the CodeCollector. Args: project_root: Optional path to project root directory. If not provided, uses current working directory. subproject_id: Optional subproject identifier. If provided, will be appended to project name. """ # Get the project root directory self.project_root = project_root or Path.cwd() logger.debug("Project root: %s", self.project_root) logger.debug("Current working directory: %s", Path.cwd()) # Initialize storage manager self.storage = StorageManager(project_root, subproject_id) self.storage.setup() # Load ignore patterns self.ignore_patterns = self.load_ignore_patterns() def load_ignore_patterns(self) -> List[str]: """Load ignore patterns from .neuroloraignore file. Returns: List[str]: List of ignore patterns """ # First check for user's .neuroloraignore ignore_file = self.project_root / ".neuroloraignore" logger.debug(f"Looking for ignore file at: {ignore_file}") logger.debug(f"Ignore file exists: {ignore_file.exists()}") patterns: List[str] = [] try: if ignore_file.exists(): with open(ignore_file, "r", encoding="utf-8") as f: for line in f: line = line.strip() # Skip empty lines and comments if line and not line.startswith("#"): patterns.append(line) logger.debug(f"Loaded {len(patterns)} ignore patterns") else: logger.debug("No .neuroloraignore found, using empty patterns") except FileNotFoundError: logger.warning("Could not find .neuroloraignore file") except PermissionError: logger.error("Permission denied accessing .neuroloraignore") except UnicodeDecodeError: logger.error("Invalid file encoding in .neuroloraignore") except IOError as e: logger.error(f"I/O error reading .neuroloraignore: {str(e)}") except Exception as e: logger.error( f"Unexpected error loading .neuroloraignore: {str(e)}" ) logger.debug("Stack trace:", exc_info=True) logger.debug(f"Ignore patterns: {patterns}") return patterns def should_ignore_file(self, file_path: Path) -> bool: """Check if file should be ignored based on patterns. Args: file_path: Path to check. Returns: bool: True if file should be ignored, False otherwise. """ # Get relative path from project root try: relative_path = file_path.relative_to(self.project_root) logger.debug(f"Checking relative path: {relative_path}") except ValueError: relative_path = file_path logger.debug(f"Using absolute path: {relative_path}") str_path = str(relative_path) # Check each ignore pattern for pattern in self.ignore_patterns: # Handle directory patterns (ending with /) if pattern.endswith("/"): if any(part == pattern[:-1] for part in relative_path.parts): logger.debug( f"Ignoring {str_path} (matches dir pattern {pattern})" ) return True # Handle file patterns elif fnmatch.fnmatch(str_path, pattern) or fnmatch.fnmatch( file_path.name, pattern ): logger.debug( f"Ignoring {str_path} (matches file pattern {pattern})" ) return True # Additional checks if "FULL_CODE_" in str(file_path): logger.debug(f"Ignoring {str_path} (generated file)") return True # Always ignore .neuroloraignore files if file_path.name == ".neuroloraignore": logger.debug(f"Ignoring {str_path} (ignore file)") return True try: if ( file_path.exists() and file_path.stat().st_size > 1024 * 1024 ): # Skip files > 1MB logger.debug(f"Ignoring {str_path} (too large)") return True except (FileNotFoundError, PermissionError) as e: logger.error(f"Error checking file size for {str_path}: {str(e)}") return True logger.debug(f"Including {str_path}") return False def make_anchor(self, path: Path) -> str: """Create a valid markdown anchor from a path. Args: path: Path to convert to anchor. Returns: str: Valid markdown anchor. """ anchor = str(path).lower() return anchor.replace("/", "-").replace(".", "-").replace(" ", "-") def collect_files(self, input_paths: Union[str, List[str]]) -> List[Path]: """Collect all relevant files from input paths. Args: input_paths: Path(s) to process. Returns: List[Path]: List of files to process. """ all_files: List[Path] = [] # Convert single path to list if isinstance(input_paths, str): input_paths = [input_paths] logger.debug(f"Processing input paths: {input_paths}") for input_path in input_paths: try: # Convert relative path to absolute using project_root path = Path(input_path) if not path.is_absolute(): path = (self.project_root / path).resolve() else: path = path.resolve() logger.debug(f"Processing path: {path}") logger.debug(f"Path absolute: {path.absolute()}") logger.debug(f"Path exists: {path.exists()}") logger.debug(f"Path is file: {path.is_file()}") logger.debug(f"Path is dir: {path.is_dir()}") if not path.exists(): logger.error(f"Path does not exist: {path}") continue except FileNotFoundError: logger.error(f"Path not found: {input_path}") continue except PermissionError: logger.error(f"Permission denied accessing path: {input_path}") continue except OSError as e: logger.error( f"OS error processing path {input_path}: {str(e)}" ) continue except Exception as e: logger.error( f"Unexpected error processing path {input_path}: {str(e)}" ) logger.debug("Stack trace:", exc_info=True) continue if path.is_file(): if not self.should_ignore_file(path): all_files.append(path) else: for root, dirs, files in os.walk(path): root_path = Path(root) logger.debug(f"Walking directory: {root_path}") # Remove ignored directories in-place dirs[:] = [ d for d in dirs if not self.should_ignore_file(root_path / d) ] logger.debug(f"Filtered directories: {dirs}") for file in sorted(files): file_path = root_path / file if not self.should_ignore_file(file_path): all_files.append(file_path) # Sort files with PROJECT_SUMMARY.md first def sort_key(path: Path) -> tuple[int, str]: try: relative_path = path.relative_to(self.project_root) except ValueError: relative_path = path # Sort order: PROJECT_SUMMARY.md first, # then alphabetically is_summary = relative_path.name == "PROJECT_SUMMARY.md" return (0 if is_summary else 1, str(relative_path)) sorted_files = sorted(all_files, key=sort_key) logger.info(f"Found {len(sorted_files)} files to process") logger.debug(f"Files to process: {sorted_files}") return sorted_files def read_file_content(self, file_path: Path) -> str: """Read content of a file with proper encoding handling. Args: file_path: Path to the file to read. Returns: str: Content of the file or error message. """ try: with open(file_path, "r", encoding="utf-8") as f: return f.read() except FileNotFoundError: logger.error(f"File not found: {file_path}") return "[File not found]" except PermissionError: logger.error(f"Permission denied accessing file: {file_path}") return "[Permission denied]" except UnicodeDecodeError: logger.warning(f"Binary file detected: {file_path}") return "[Binary file content not shown]" except IOError as e: logger.error(f"I/O error reading file {file_path}: {str(e)}") return f"[I/O error: {str(e)}]" except Exception as e: logger.error( f"Unexpected error reading file {file_path}: {str(e)}" ) logger.debug("Stack trace:", exc_info=True) return f"[Unexpected error: {str(e)}]" def collect_code( self, input_paths: Union[str, List[str]], title: str = "Code Collection", ) -> Optional[Path]: """Process all files and generate markdown documentation. Args: input_paths: Path(s) to process. title: Title for the collection. Returns: Optional[Path]: Path to generated markdown file or None if failed. The path is returned as a Path object if successful, None if failed. """ try: all_files = self.collect_files(input_paths) if not all_files: logger.warning("No files found to process") return None # Create output files with timestamp and path info from datetime import datetime timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Convert input paths to string if isinstance(input_paths, str): path_str = input_paths else: path_str = "_".join(input_paths) # Clean up path string path_str = path_str.replace("/", "_").replace(".", "_") code_output_path = self.storage.get_output_path( f"FULL_CODE_{timestamp}_{path_str}_{title}.md" ) # Create parent directory if it doesn't exist code_output_path.parent.mkdir(parents=True, exist_ok=True) # Write code collection file with open(code_output_path, "w", encoding="utf-8") as output_file: # Write header output_file.write(f"# {title}\n\n") output_file.write( "This file contains code from the specified paths, " "organized by file path.\n\n" ) # Write table of contents output_file.write("## Table of Contents\n\n") for file_path in all_files: try: relative_path = file_path.relative_to( self.project_root ) except ValueError: relative_path = file_path anchor = self.make_anchor(relative_path) output_file.write(f"- [{relative_path}](#{anchor})\n") # Write file contents output_file.write("\n## Files\n\n") for file_path in all_files: try: relative_path = file_path.relative_to( self.project_root ) except ValueError: relative_path = file_path content = self.read_file_content(file_path) anchor = self.make_anchor(relative_path) lang = LanguageMap.get_language(file_path) output_file.write(f"### {relative_path} {{{anchor}}}\n") output_file.write(f"```{lang}\n{content}\n```\n\n") logger.debug(f"Processed: {relative_path}") # Force flush and sync output_file.flush() os.fsync(output_file.fileno()) # Force sync to ensure file is visible os.sync() # Wait a bit for filesystem to update import time time.sleep(0.1) # Create analysis prompt file with timestamp analyze_output_path = self.storage.get_output_path( f"PROMPT_ANALYZE_{timestamp}_{path_str}_{title}.md" ) prompt_path = ( Path(__file__).parent / "prompts" / "analyze.prompt.md" ) # Read code content first code_content = self.read_file_content(code_output_path) prompt_content = self.read_file_content(prompt_path) # Write analysis file with open( analyze_output_path, "w", encoding="utf-8" ) as analyze_file: analyze_file.write(prompt_content) analyze_file.write("\n") analyze_file.write(code_content) # Force flush and sync analyze_file.flush() os.fsync(analyze_file.fileno()) # Force sync to ensure file is visible os.sync() # Wait a bit for filesystem to update time.sleep(0.1) # Touch files and all parent directories # to trigger VSCode file watcher try: # Touch output files os.utime(code_output_path, None) os.utime(analyze_output_path, None) # Touch all parent directories up to project root current = code_output_path.parent while ( current != self.project_root and current != current.parent ): os.utime(current, None) current = current.parent os.utime(self.project_root, None) except Exception as e: logger.debug( f"Failed to touch project root: {str(e)}" ) # nosec B110 # Intentionally catching all exceptions for # filesystem operations # Verify files exist and are accessible if not code_output_path.exists(): raise RuntimeError( f"Failed to create code file: {code_output_path}" ) if not analyze_output_path.exists(): raise RuntimeError( f"Failed to create analysis file: {analyze_output_path}" ) logger.debug(f"Analysis prompt created: {analyze_output_path}") logger.info("Code collection complete!") logger.debug(f"Output file: {code_output_path}") return Path(code_output_path) except FileNotFoundError as e: logger.error(f"File not found error: {str(e)}") return None except PermissionError as e: logger.error(f"Permission denied error: {str(e)}") return None except OSError as e: logger.error(f"OS error during code collection: {str(e)}") return None except RuntimeError as e: logger.error(f"Runtime error during code collection: {str(e)}") return None except Exception as e: logger.error(f"Unexpected error during code collection: {str(e)}") logger.debug("Stack trace:", exc_info=True) return None

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aindreyway/mcp-server-neurolora-p'

If you have feedback or need assistance with the MCP directory API, please join our Discord server