PAELLADOC

by jlcases
================================================================
RepopackPy Output File
================================================================

This file was generated by RepopackPy on: 2025-04-23T04:47:53.216367

Purpose:
--------
This file contains a packed representation of the entire repository's
contents. It is designed to be easily consumable by AI systems for
analysis, code review, or other automated processes.

File Format:
------------
The content is organized as follows:
1. This header section
2. Repository structure
3. Multiple file entries, each consisting of:
   a. A separator line (================)
   b. The file path (File: path/to/file)
   c. Another separator line
   d. The full contents of the file
   e. A blank line

Usage Guidelines:
-----------------
1. This file should be treated as read-only. Any changes should be made to
   the original repository files, not this packed version.
2. When processing this file, use the separators and "File:" markers to
   distinguish between different files in the repository.
3. Be aware that this file may contain sensitive information. Handle it
   with the same level of security as you would the original repository.

Notes:
------
- Some files may have been excluded based on .gitignore rules and
  RepopackPy's configuration.
- Binary files are not included in this packed representation. Please refer
  to the Repository Structure section for a complete list of file paths,
  including binary files.

For more information about RepopackPy, visit:
https://github.com/abinthomasonline/repopack-py
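----------------------------------------------------------------
Example (not part of the repository): usage guideline 2 above can be
followed mechanically. A minimal sketch of such a consumer; the regex and
function name are illustrative, not part of RepopackPy itself.

import re
from pathlib import Path

# A file entry is: a line of '=', then "File: <path>", then another line of '='.
FILE_HEADER = re.compile(r"^=+\nFile: (?P<path>.+)\n=+\n", re.MULTILINE)

def split_packed_file(packed_text: str) -> dict[str, str]:
    """Split a RepopackPy dump into {file_path: file_contents}."""
    matches = list(FILE_HEADER.finditer(packed_text))
    files: dict[str, str] = {}
    for current, nxt in zip(matches, matches[1:] + [None]):
        end = nxt.start() if nxt else len(packed_text)
        files[current.group("path")] = packed_text[current.end():end].strip("\n")
    return files

if __name__ == "__main__":
    packed = Path("repopackpy-output.txt").read_text(encoding="utf-8")
    for path in split_packed_file(packed):
        print(path)
----------------------------------------------------------------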
open(file_path, "r") as f: content = f.read() # 1. Actualizar referencias a ProjectMetadata por ProjectInfo content = re.sub(r"Metadata as ProjectMetadata", "ProjectInfo", content) content = re.sub(r"ProjectMetadata", "ProjectInfo", content) # 2. Actualizar referencias a metadata por project_info content = re.sub(r"\.metadata\.", ".project_info.", content) content = re.sub(r"memory\.metadata", "memory.project_info", content) content = re.sub( r"original_memory\.metadata", "original_memory.project_info", content ) content = re.sub(r"project\.metadata", "project.project_info", content) with open(file_path, "w") as f: f.write(content) print(f"Actualizado: {file_path}") def find_and_update_test_files(directory): for root, _, files in os.walk(directory): for file in files: if file.endswith(".py"): file_path = os.path.join(root, file) update_references_in_file(file_path) if __name__ == "__main__": find_and_update_test_files("tests") # Tambi√©n actualizar los adaptadores find_and_update_test_files("paelladoc/adapters") ================ File: paelladoc/config/database.py ================ """Database configuration module.""" import os from pathlib import Path import json import logging logger = logging.getLogger(__name__) CONFIG_FILE_NAME = "paelladoc_config.json" def get_project_root() -> Path: """Get the project root directory.""" return Path(__file__).parent.parent.parent.parent def get_config_file() -> Path: """Get the path to the configuration file.""" # Check multiple locations in order of precedence possible_locations = [ Path.cwd() / CONFIG_FILE_NAME, # Current directory (development) Path.home() / ".paelladoc" / CONFIG_FILE_NAME, # User's home directory Path("/etc/paelladoc") / CONFIG_FILE_NAME, # System-wide configuration ] for location in possible_locations: if location.exists(): return location # If no config file exists, use the default in user's home default_location = Path.home() / ".paelladoc" / CONFIG_FILE_NAME default_location.parent.mkdir(parents=True, exist_ok=True) if not default_location.exists(): default_config = { "db_path": str(Path.home() / ".paelladoc" / "memory.db"), "environment": "production", } with open(default_location, "w") as f: json.dump(default_config, f, indent=2) return default_location def get_db_path() -> Path: """ Get the database path based on multiple configuration sources. Priority: 1. PAELLADOC_DB_PATH environment variable if set 2. Path specified in configuration file 3. Default path in user's home directory (~/.paelladoc/memory.db) The configuration can be set during package installation with: pip install paelladoc --install-option="--db-path=/path/to/db" Or by editing the config file at: - ./paelladoc_config.json (development) - ~/.paelladoc/paelladoc_config.json (user) - /etc/paelladoc/paelladoc_config.json (system) """ # 1. Check environment variable first (highest priority) env_path = os.getenv("PAELLADOC_DB_PATH") if env_path: db_path = Path(env_path) logger.info(f"Using database path from environment variable: {db_path}") return db_path # 2. Check configuration file config_file = get_config_file() try: with open(config_file) as f: config = json.load(f) if "db_path" in config: db_path = Path(config["db_path"]) logger.info( f"Using database path from config file {config_file}: {db_path}" ) return db_path except Exception as e: logger.warning(f"Error reading config file {config_file}: {e}") # 3. 
================
File: paelladoc/adapters/plugins/__init__.py
================
import pkgutil
import importlib
import logging
from pathlib import Path

logger = logging.getLogger(__name__)

# Dynamically import all submodules (like core, code, styles, etc.)
# This ensures their __init__.py files are executed, which should in turn
# import the actual plugin files containing @mcp.tool decorators.

package_path = str(Path(__file__).parent)
package_name = __name__

logger.info(f"Dynamically loading plugins from: {package_path}")

for module_info in pkgutil.iter_modules([package_path]):
    if module_info.ispkg:  # Only import potential packages (directories)
        sub_package_name = f"{package_name}.{module_info.name}"
        try:
            importlib.import_module(sub_package_name)
            logger.debug(f"Successfully imported plugin package: {sub_package_name}")
        except Exception as e:
            logger.warning(f"Could not import plugin package {sub_package_name}: {e}")

logger.info("Finished dynamic plugin package loading.")

================
File: paelladoc/adapters/plugins/core/verification.py
================
from paelladoc.domain.core_logic import mcp, logger
from typing import Dict, Any

from paelladoc.domain.models.project import ProjectMemory

# Domain models
from paelladoc.domain.models.project import (
    DocumentStatus,
    Bucket,
)

# Adapter for persistence
from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter

# Adapter for taxonomy loading
from paelladoc.adapters.output.filesystem.taxonomy_provider import (
    FileSystemTaxonomyProvider,
)

# Behavior configuration
BEHAVIOR_CONFIG = {
    "check_mece_coverage": True,
    "enforce_documentation_first": True,
    "block_code_generation_if_incomplete": True,
    "minimum_coverage_threshold": 0.7,  # 70% minimum coverage (default, can be overridden)
    "taxonomy_version_check": True,
}

# Instantiate the taxonomy provider
# TODO: Replace direct instantiation with Dependency Injection
TAXONOMY_PROVIDER = FileSystemTaxonomyProvider()


def validate_mece_structure(memory: ProjectMemory) -> dict:
    """Validates the MECE taxonomy structure of a project against available taxonomies."""
    validation = {
        "is_valid": True,
        "missing_dimensions": [],
        "invalid_combinations": [],
        "warnings": [],
    }

    # Get available taxonomies from the provider
    try:
        valid_taxonomies = TAXONOMY_PROVIDER.get_available_taxonomies()
    except Exception as e:
        logger.error(f"Failed to load taxonomies for validation: {e}", exc_info=True)
        validation["warnings"].append(
            "Could not load taxonomy definitions for validation."
        )
        # Mark as invalid if we can't load definitions?
        validation["is_valid"] = False
        return validation

    # Check required dimensions
    if not memory.platform_taxonomy:
        validation["missing_dimensions"].append("platform")
    elif memory.platform_taxonomy not in valid_taxonomies.get("platform", []):
        validation["invalid_combinations"].append(
            f"Invalid platform taxonomy: {memory.platform_taxonomy}"
        )

    if not memory.domain_taxonomy:
        validation["missing_dimensions"].append("domain")
    elif memory.domain_taxonomy not in valid_taxonomies.get("domain", []):
        validation["invalid_combinations"].append(
            f"Invalid domain taxonomy: {memory.domain_taxonomy}"
        )

    if not memory.size_taxonomy:
        validation["missing_dimensions"].append("size")
    elif memory.size_taxonomy not in valid_taxonomies.get("size", []):
        validation["invalid_combinations"].append(
            f"Invalid size taxonomy: {memory.size_taxonomy}"
        )

    # Compliance is optional
    if (
        memory.compliance_taxonomy
        and memory.compliance_taxonomy not in valid_taxonomies.get("compliance", [])
    ):
        validation["invalid_combinations"].append(
            f"Invalid compliance taxonomy: {memory.compliance_taxonomy}"
        )

    # Validate specific combinations
    if memory.platform_taxonomy and memory.domain_taxonomy:
        # Example: Mobile apps shouldn't use CMS domain
        if (
            memory.platform_taxonomy
            in ["ios-native", "android-native", "react-native", "flutter"]
            and memory.domain_taxonomy == "cms"
        ):
            validation["warnings"].append(
                "Mobile platforms rarely implement full CMS functionality"
            )

    # Update overall validity
    validation["is_valid"] = (
        not validation["missing_dimensions"]
        and not validation["invalid_combinations"]
    )

    return validation


@mcp.tool(
    name="core_verification",
    description="Verifies documentation coverage against the MECE taxonomy",
)
async def core_verification(project_name: str) -> dict:
    """Checks documentation against templates and project memory.

    Calculates an overall quality/completion score based on MECE taxonomy coverage.
    Returns an error if documentation is incomplete based on defined criteria.

    Args:
        project_name: The name of the project to verify

    Returns:
        A dictionary with verification results and coverage metrics
    """
    logger.info(f"Executing core.verification for project: {project_name}")

    # --- Initialize the memory adapter ---
    try:
        memory_adapter = SQLiteMemoryAdapter()
        logger.info(
            f"core.verification using DB path: {memory_adapter.db_path.resolve()}"
        )
    except Exception as e:
        logger.error(f"Failed to instantiate SQLiteMemoryAdapter: {e}", exc_info=True)
        return {
            "status": "error",
            "message": "Internal server error: Could not initialize memory adapter.",
        }

    # --- Load Project Memory ---
    try:
        memory = await memory_adapter.load_memory(project_name)
        if not memory:
            logger.warning(
                f"Project '{project_name}' not found for VERIFICATION command."
            )
            return {
                "status": "error",
                "message": f"Project '{project_name}' not found. Use PAELLA command to start it.",
            }
        logger.info(f"Successfully loaded memory for project: {project_name}")
    except Exception as e:
        logger.error(f"Error loading memory for '{project_name}': {e}", exc_info=True)
        return {
            "status": "error",
            "message": f"Failed to load project memory: {e}",
        }

    # Add MECE validation
    mece_validation = validate_mece_structure(memory)

    # Calculate coverage only if MECE structure is valid
    if not mece_validation["is_valid"]:
        return {
            "status": "error",
            "message": "Invalid MECE taxonomy structure",
            "validation": mece_validation,
        }

    # --- Check for custom taxonomy ---
    custom_taxonomy = None
    relevant_buckets = set()
    min_threshold = BEHAVIOR_CONFIG["minimum_coverage_threshold"]

    if hasattr(memory, "custom_taxonomy") and memory.custom_taxonomy:
        logger.info(f"Using custom taxonomy for project '{project_name}'")
        custom_taxonomy = memory.custom_taxonomy

        # Load relevant buckets from custom taxonomy
        relevant_buckets = set(custom_taxonomy.get("buckets", []))
        logger.info(f"Custom taxonomy has {len(relevant_buckets)} relevant buckets")

        # Use custom threshold if specified
        if "minimum_coverage_threshold" in custom_taxonomy:
            min_threshold = custom_taxonomy["minimum_coverage_threshold"]
            logger.info(f"Using custom threshold: {min_threshold}")
    else:
        logger.info("No custom taxonomy found, using all buckets")
        # Use all buckets except system ones
        relevant_buckets = {
            bucket.value for bucket in Bucket if bucket != Bucket.UNKNOWN
        }

    # --- Calculate MECE Coverage ---
    # Get completion stats for each bucket
    bucket_stats: Dict[str, Dict[str, Any]] = {}
    total_artifacts = 0
    total_completed = 0
    total_in_progress = 0
    total_pending = 0

    # Skip these buckets as they're more system-oriented, not documentation
    system_buckets = {
        Bucket.UNKNOWN,
        Bucket.MAINTAIN_CORE_FUNCTIONALITY,
        Bucket.GOVERN_TOOLING_SCRIPTS,
    }
    system_bucket_values = {b.value for b in system_buckets}

    # Custom bucket weights (either from custom taxonomy or defaults)
    bucket_weights = {}

    # If we have custom taxonomy with bucket details and weights
    if custom_taxonomy and "bucket_details" in custom_taxonomy:
        for bucket_name, details in custom_taxonomy["bucket_details"].items():
            if "weight" in details:
                bucket_weights[bucket_name] = details["weight"]

    # Default weights for important buckets if not specified in custom taxonomy
    if not bucket_weights:
        bucket_weights = {
            Bucket.INITIATE_INITIAL_PRODUCT_DOCS.value: 1.5,  # High importance
            Bucket.ELABORATE_SPECIFICATION_AND_PLANNING.value: 1.3,  # High importance
            Bucket.GOVERN_STANDARDS_METHODOLOGIES.value: 1.2,  # Medium-high importance
            Bucket.GENERATE_CORE_FUNCTIONALITY.value: 1.1,  # Medium-high importance
        }

    # Calculate stats for each bucket
    for bucket in Bucket:
        bucket_value = bucket.value

        # Skip system buckets and buckets not in the relevant set
        if (
            bucket in system_buckets
            or bucket_value in system_bucket_values
            or (relevant_buckets and bucket_value not in relevant_buckets)
        ):
            continue

        artifacts = memory.artifacts.get(bucket, [])
        if not artifacts:
            # If no artifacts but bucket is relevant, track as empty bucket
            if bucket_value in relevant_buckets:
                bucket_stats[bucket_value] = {
                    "total": 0,
                    "completed": 0,
                    "in_progress": 0,
                    "pending": 0,
                    "completion_percentage": 0.0,
                }
            continue

        bucket_total = len(artifacts)
        bucket_completed = sum(
            1 for a in artifacts if a.status == DocumentStatus.COMPLETED
        )
        bucket_in_progress = sum(
            1 for a in artifacts if a.status == DocumentStatus.IN_PROGRESS
        )
        bucket_pending = bucket_total - bucket_completed - bucket_in_progress

        # Calculate completion percentage
        completion_pct = (
            (bucket_completed + (bucket_in_progress * 0.5)) / bucket_total
            if bucket_total > 0
            else 0
        )

        # Store statistics
        bucket_stats[bucket_value] = {
            "total": bucket_total,
            "completed": bucket_completed,
            "in_progress": bucket_in_progress,
            "pending": bucket_pending,
            "completion_percentage": completion_pct,
        }

        # Update global counters
        total_artifacts += bucket_total
        total_completed += bucket_completed
        total_in_progress += bucket_in_progress
        total_pending += bucket_pending

    # Add custom buckets from taxonomy that aren't standard Bucket enums
    if custom_taxonomy and "buckets" in custom_taxonomy:
        for bucket_name in custom_taxonomy["buckets"]:
            # Skip if already processed above
            if bucket_name in bucket_stats:
                continue

            # This is a custom bucket not in the standard Bucket enum
            # For now, treat it as empty/pending
            bucket_stats[bucket_name] = {
                "total": 0,
                "completed": 0,
                "in_progress": 0,
                "pending": 0,
                "completion_percentage": 0.0,
                "custom": True,
            }

    # Calculate overall weighted completion percentage
    if total_artifacts > 0:
        # Simple (unweighted) calculation
        simple_completion_pct = (
            total_completed + (total_in_progress * 0.5)
        ) / total_artifacts

        # Weighted calculation
        weighted_sum = 0
        weight_sum = 0

        for bucket_name, stats in bucket_stats.items():
            if stats.get("total", 0) == 0:
                continue

            # Get weight for this bucket (default to 1.0)
            bucket_weight = bucket_weights.get(bucket_name, 1.0)
            weight_sum += bucket_weight
            weighted_sum += stats["completion_percentage"] * bucket_weight

        weighted_completion_pct = weighted_sum / weight_sum if weight_sum > 0 else 0
    else:
        simple_completion_pct = 0
        weighted_completion_pct = 0

    # Determine overall status
    is_complete = weighted_completion_pct >= min_threshold

    # Identify buckets that need attention (< 50% complete)
    needs_attention = []
    for bucket, stats in bucket_stats.items():
        if stats["completion_percentage"] < 0.5:
            needs_attention.append(
                {
                    "bucket": bucket,
                    "completion": stats["completion_percentage"],
                    "missing_docs": stats["pending"],
                }
            )

    # Sort by completion percentage (lowest first)
    needs_attention.sort(key=lambda x: x["completion"])

    # Create verification result
    result = {
        "status": "ok",
        "project_name": project_name,
        "overall_status": "complete" if is_complete else "incomplete",
        "completion_percentage": weighted_completion_pct,
        "simple_completion_percentage": simple_completion_pct,
        "meets_threshold": is_complete,
        "threshold": min_threshold,
        "total_artifacts": total_artifacts,
        "total_completed": total_completed,
        "total_in_progress": total_in_progress,
        "total_pending": total_pending,
        "bucket_stats": bucket_stats,
        "needs_attention": needs_attention,
        "taxonomy_version": memory.taxonomy_version,
        "custom_taxonomy": bool(custom_taxonomy),
        "message": (
            f"Documentation is {weighted_completion_pct:.1%} complete "
            f"({'meets' if is_complete else 'does not meet'} {min_threshold:.1%} threshold)."
        ),
        "allow_code_generation": is_complete
        or not BEHAVIOR_CONFIG["block_code_generation_if_incomplete"],
        "mece_validation": mece_validation,
        "taxonomy_structure": {
            "platform": memory.platform_taxonomy,
            "domain": memory.domain_taxonomy,
            "size": memory.size_taxonomy,
            "compliance": memory.compliance_taxonomy,
        },
    }

    return result
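----------------------------------------------------------------
Example (not part of the repository): the score above is a weighted average
of per-bucket completion, where an in-progress artifact counts as half a
completed one. The same arithmetic on made-up numbers; bucket names and
weights are illustrative, not taken from the Bucket enum.

# Two hypothetical buckets with made-up stats.
buckets = {
    "Initiate::InitialProductDocs": {"completed": 3, "in_progress": 2, "total": 10},
    "Generate::CoreFunctionality": {"completed": 1, "in_progress": 0, "total": 5},
}
weights = {"Initiate::InitialProductDocs": 1.5}  # others default to 1.0

weighted_sum = weight_sum = 0.0
for name, s in buckets.items():
    pct = (s["completed"] + 0.5 * s["in_progress"]) / s["total"]
    w = weights.get(name, 1.0)
    weighted_sum += pct * w
    weight_sum += w

print(weighted_sum / weight_sum)
# (0.4 * 1.5 + 0.2 * 1.0) / 2.5 = 0.32 -> below the 0.7 default threshold
----------------------------------------------------------------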
================
File: paelladoc/adapters/plugins/core/__init__.py
================
import pkgutil
import importlib
import logging
from pathlib import Path

logger = logging.getLogger(__name__)

# Dynamically import all modules within this 'core' package
# to ensure @mcp.tool decorators are executed.

package_path = str(Path(__file__).parent)
package_name = __name__

logger.info(f"Dynamically loading core plugins from: {package_path}")

for module_info in pkgutil.iter_modules([package_path]):
    # Import all .py files (except __init__.py itself)
    if module_info.name != "__init__" and not module_info.ispkg:
        module_name = f"{package_name}.{module_info.name}"
        try:
            importlib.import_module(module_name)
            logger.debug(f"Successfully loaded core plugin module: {module_name}")
        except Exception as e:
            logger.warning(f"Could not load core plugin module {module_name}: {e}")

logger.info("Finished dynamic core plugin loading.")

"""
Core plugins for PAELLADOC command handling.

Imports:
- help: Provides the HELP command functionality.
- paella: Initiates new documentation projects.
- continue_proj: Continues existing documentation projects.
- verification: Verifies documentation integrity.
- list_projects: Lists existing projects.
"""

# Removed explicit imports and __all__, relying on dynamic loading above
# from .help import core_help
# from .paella import core_paella  # This was causing issues
# from .continue_proj import core_continue
# from .verification import core_verification
# from .list_projects import list_projects
#
# __all__ = [
#     "core_help",
#     "core_paella",
#     "core_continue",
#     "core_verification",
#     "list_projects",
# ]

================
File: paelladoc/adapters/plugins/core/paella.py
================
"""PAELLADOC project initialization module."""

from pathlib import Path
from typing import Dict, Optional

# Import the shared FastMCP instance from core_logic
from paelladoc.domain.core_logic import mcp, logger

# Domain models and services
from paelladoc.domain.models.project import (
    ProjectMemory,
    ProjectInfo,
    Bucket,
    DocumentStatus,
    set_time_service,
)
from paelladoc.adapters.services.system_time_service import SystemTimeService

# Adapter for persistence
from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter

# Initialize logger for this module
# logger is already imported from core_logic

# Create FastMCP instance - REMOVED, using imported instance
# mcp = FastMCP("PAELLADOC")


@mcp.tool()
async def paella_init(
    base_path: str,
    documentation_language: str,
    interaction_language: str,
    new_project_name: str,
    platform_taxonomy: str,  # e.g., "pwa", "web-frontend", "vscode-extension"
    domain_taxonomy: str,
    size_taxonomy: str,
    compliance_taxonomy: str,
    custom_taxonomy: Optional[Dict] = None,  # Still optional
) -> Dict:
    """
    Initiates the conversational workflow to define and document a new PAELLADOC project.

    This tool gathers essential project details, including the core taxonomies
    (platform, domain, size, compliance) which are mandatory for project setup
    and analysis. It creates the project structure and persists the initial
    memory state with all provided information.

    Once executed successfully, the project is initialized with its defined
    taxonomies and ready for the next conversational steps.

    Args:
        base_path: Base path where the project documentation will be stored.
        documentation_language: Primary language for the generated documentation (e.g., 'en', 'es').
        interaction_language: Language used during conversational interactions (e.g., 'en', 'es').
        new_project_name: Unique name for the new PAELLADOC project.
        platform_taxonomy: Identifier for the target platform (e.g., "pwa", "web-frontend").
        domain_taxonomy: Identifier for the project's domain (e.g., "ecommerce", "healthcare").
        size_taxonomy: Identifier for the estimated project size (e.g., "mvp", "enterprise").
        compliance_taxonomy: Identifier for any compliance requirements (e.g., "gdpr", "none").
        custom_taxonomy: (Optional) A dictionary for any user-defined taxonomy.

    Returns:
        A dictionary confirming the project's creation ('status': 'ok') or detailing
        an error ('status': 'error'). On success, includes the 'project_name' and
        resolved 'base_path'.
    """
    logger.info(
        f"Initializing new project: {new_project_name} with taxonomies: "
        f"Platform={platform_taxonomy}, Domain={domain_taxonomy}, "
        f"Size={size_taxonomy}, Compliance={compliance_taxonomy}"
    )

    try:
        # Initialize TimeService with SystemTimeService implementation
        set_time_service(SystemTimeService())

        # Initialize memory adapter
        memory_adapter = SQLiteMemoryAdapter()

        # Create absolute path
        abs_base_path = Path(base_path).expanduser().resolve()

        # Ensure the base directory exists
        abs_base_path.mkdir(parents=True, exist_ok=True)

        # Create project memory - passing required taxonomies directly
        project_memory = ProjectMemory(
            project_info=ProjectInfo(
                name=new_project_name,
                interaction_language=interaction_language,
                documentation_language=documentation_language,
                base_path=abs_base_path,
                platform_taxonomy=platform_taxonomy,
                domain_taxonomy=domain_taxonomy,
                size_taxonomy=size_taxonomy,
                compliance_taxonomy=compliance_taxonomy,
                custom_taxonomy=custom_taxonomy if custom_taxonomy else {},
            ),
            artifacts={
                Bucket.INITIATE_INITIAL_PRODUCT_DOCS: [
                    {
                        "name": "Project Charter",
                        "status": DocumentStatus.PENDING,
                        "bucket": Bucket.INITIATE_INITIAL_PRODUCT_DOCS,
                        "path": Path("Project_Charter.md"),
                    }
                ]
            },
            platform_taxonomy=platform_taxonomy,
            domain_taxonomy=domain_taxonomy,
            size_taxonomy=size_taxonomy,
            compliance_taxonomy=compliance_taxonomy,
            custom_taxonomy=custom_taxonomy if custom_taxonomy else {},
        )

        # Save to memory
        await memory_adapter.save_memory(project_memory)

        return {
            "status": "ok",
            "message": f"Project '{new_project_name}' created successfully at {abs_base_path}",
            "project_name": new_project_name,
            "base_path": str(abs_base_path),
        }
    except Exception as e:
        logger.error(f"Error creating project: {str(e)}")
        return {"status": "error", "message": f"Failed to create project: {str(e)}"}


@mcp.tool()
async def paella_list() -> Dict:
    """Retrieves and lists the names of all PAELLADOC projects stored in the system memory.

    This is useful for identifying available projects that can be selected
    using the 'paella_select' or 'core_continue' tools to resume work.

    Returns:
        A dictionary containing the operation status ('ok' or 'error'), a list
        of project names under the 'projects' key, and a confirmation message.
    """
    try:
        memory_adapter = SQLiteMemoryAdapter()
        projects = await memory_adapter.list_projects()
        return {
            "status": "ok",
            "projects": projects,
            "message": "Projects retrieved successfully",
        }
    except Exception as e:
        logger.error(f"Error listing projects: {str(e)}")
        return {"status": "error", "message": f"Failed to list projects: {str(e)}"}


@mcp.tool()
async def paella_select(project_name: str) -> Dict:
    """
    Loads the memory of an existing PAELLADOC project and sets it as the active context.

    This tool makes the specified project the current focus for subsequent
    conversational commands and actions within the Paelladoc session. It
    retrieves the project's state from the persistent memory.

    Args:
        project_name: The name of the existing PAELLADOC project to activate.

    Returns:
        A dictionary containing the operation status ('ok' or 'error'), a
        confirmation message, and key details of the selected project (name,
        base path). Returns an error status if the project is not found.
    """
    try:
        memory_adapter = SQLiteMemoryAdapter()
        project_memory = await memory_adapter.load_memory(project_name)
        if project_memory:
            return {
                "status": "ok",
                "message": f"Project '{project_name}' selected",
                "project_name": project_name,
                "base_path": str(project_memory.project_info.base_path),
            }
        else:
            return {"status": "error", "message": f"Project '{project_name}' not found"}
    except Exception as e:
        logger.error(f"Error selecting project: {str(e)}")
        return {"status": "error", "message": f"Failed to select project: {str(e)}"}


# Remove the main execution block as this module is not meant to be run directly
# if __name__ == "__main__":
#     mcp.run()
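----------------------------------------------------------------
Example (not part of the repository): assuming the @mcp.tool() decorator
leaves the underlying coroutine callable, these tools can be exercised
directly in a script, outside the MCP transport, against the default SQLite
memory location. All argument values are illustrative.

import asyncio

from paelladoc.adapters.plugins.core.paella import paella_init, paella_list

async def main() -> None:
    result = await paella_init(
        base_path="~/projects/demo-docs",  # illustrative path
        documentation_language="en",
        interaction_language="en",
        new_project_name="demo",
        platform_taxonomy="web-frontend",
        domain_taxonomy="ecommerce",
        size_taxonomy="mvp",
        compliance_taxonomy="none",
    )
    print(result["status"], result.get("message"))
    print(await paella_list())

asyncio.run(main())
----------------------------------------------------------------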
================
File: paelladoc/adapters/plugins/core/list_projects.py
================
"""
Plugin for listing existing PAELLADOC projects.
"""

import logging
from typing import Dict, Any
from pathlib import Path

from paelladoc.domain.core_logic import mcp

# Adapter for persistence
from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter

# Project model is not needed here, we only list names

logger = logging.getLogger(__name__)


@mcp.tool(
    name="core_list_projects",
    description="Lists the names of existing PAELLADOC projects found in the memory.",
)
async def list_projects(
    db_path: str = None,
) -> Dict[str, Any]:  # Keep db_path for testing
    """Retrieves the list of project names from the persistence layer.

    Args:
        db_path: Optional database path to use (primarily for testing).

    Returns:
        A dictionary containing the status and a list of project names.
    """
    logger.info(f"Executing core.list_projects command. DB path: {db_path}")

    try:
        # Use the provided db_path (for tests) or the default path from the adapter
        memory_adapter = (
            SQLiteMemoryAdapter(db_path=Path(db_path))
            if db_path
            else SQLiteMemoryAdapter()
        )
        logger.info(
            f"core.list_projects using DB path: {memory_adapter.db_path.resolve()}"
        )  # Log the actual path used
    except Exception as e:
        logger.error(f"Failed to instantiate SQLiteMemoryAdapter: {e}", exc_info=True)
        return {
            "status": "error",
            "message": "Internal server error: Could not initialize memory adapter.",
            "projects": [],  # Return empty list on error
        }

    try:
        # Use the correct method to get only names
        project_names = await memory_adapter.list_projects()
        count = len(project_names)
        message = (
            f"Found {count} project{'s' if count != 1 else ''}."
            if count > 0
            else "No projects found."
        )
        logger.info(message)
        return {
            "status": "ok",  # Use 'ok' for success
            "message": message,
            "projects": project_names,  # Return the list of names
        }
    except Exception as e:
        logger.error(
            f"Error retrieving projects from memory adapter: {e}", exc_info=True
        )
        return {
            "status": "error",
            "message": f"Error retrieving projects: {str(e)}",
            "projects": [],  # Return empty list on error
        }

================
File: paelladoc/adapters/plugins/core/continue_proj.py
================
from paelladoc.domain.core_logic import mcp
import logging

# Initialize logger for this module
logger = logging.getLogger(__name__)

# Domain models
from paelladoc.domain.models.project import (
    DocumentStatus,
    Bucket,
)

# Adapter for persistence
from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter

# Extracted behavior configuration from the original MDC file
BEHAVIOR_CONFIG = {
    "calculate_documentation_completion": True,
    "code_after_documentation": True,
    "confirm_each_parameter": True,
    "conversation_required": True,
    "documentation_first": True,
    "documentation_section_sequence": [
        "project_definition",
        "market_research",
        "user_research",
        "problem_definition",
        "product_definition",
        "architecture_decisions",
        "product_roadmap",
        "user_stories",
        "technical_architecture",
        "technical_specifications",
        "component_specification",
        "api_specification",
        "database_design",
        "frontend_architecture",
        "testing_strategy",
        "devops_pipeline",
        "security_framework",
        "documentation_framework",
    ],
    "enforce_one_question_rule": True,
    "force_single_question_mode": True,
    "guide_documentation_sequence": True,
    "interactive": True,
    "load_memory_file": True,
    "max_questions_per_message": 1,
    "memory_path": "/docs/{project_name}/.memory.json",
    "one_parameter_at_a_time": True,
    "prevent_web_search": True,
    "prohibit_multiple_questions": True,
    "provide_section_guidance": True,
    "require_step_confirmation": True,
    "sequential_questions": True,
    "single_question_mode": True,
    "strict_parameter_sequence": True,
    "strict_question_sequence": True,
    "track_documentation_completion": True,
    "update_last_modified": True,
    "wait_for_response": True,
    "wait_for_user_response": True,
}

# Insert behavior config here
# TODO: Review imports and add any other necessary modules


@mcp.tool(
    name="core_continue", description="Continues work on an existing PAELLADOC project."
)
async def core_continue(
    project_name: str,
) -> dict:  # Added project_name argument, made async
    """Loads an existing project's memory and suggests the next steps.

    Args:
        project_name (str): The name of the project to continue.

    Behavior Config: this tool has associated behavior configuration extracted
    from the MDC file. See the `BEHAVIOR_CONFIG` variable in the source code.
    """
    logging.info(
        f"Executing initial implementation for core.continue for project: {project_name}..."
    )

    # --- Dependency Injection (Temporary Direct Instantiation) ---
    # TODO: Replace with proper dependency injection
    try:
        # Use the default path defined in the adapter (project root)
        memory_adapter = SQLiteMemoryAdapter()
        logger.info(f"core.continue using DB path: {memory_adapter.db_path.resolve()}")
        # Fetch the list of existing projects (Removed assignment as it's not used here)
        # existing_projects = await memory_adapter.list_projects()
    except Exception as e:
        logging.error(f"Failed to instantiate SQLiteMemoryAdapter: {e}", exc_info=True)
        return {
            "status": "error",
            "message": "Internal server error: Could not initialize memory adapter.",
        }

    # --- Load Project Memory ---
    try:
        memory = await memory_adapter.load_memory(project_name)
        if not memory:
            logging.warning(f"Project '{project_name}' not found for CONTINUE command.")
            return {
                "status": "error",
                "message": f"Project '{project_name}' not found. Use PAELLA command to start it.",
            }
        logging.info(f"Successfully loaded memory for project: {project_name}")
    except Exception as e:
        logging.error(f"Error loading memory for '{project_name}': {e}", exc_info=True)
        return {
            "status": "error",
            "message": f"Failed to load project memory: {e}",
        }

    # --- Calculate Next Step (Simplified) ---
    # TODO: Implement sophisticated logic based on BEHAVIOR_CONFIG['documentation_section_sequence']
    # For now, find the first pending artifact or report overall status.
    next_step_suggestion = (
        "No pending artifacts found. Project might be complete or need verification."
    )
    found_pending = False

    # Define a somewhat logical bucket order for checking progress
    # This could be moved to config or derived from the taxonomy later
    bucket_order = [
        Bucket.INITIATE_INITIAL_PRODUCT_DOCS,
        Bucket.ELABORATE_DISCOVERY_AND_RESEARCH,
        Bucket.ELABORATE_IDEATION_AND_DESIGN,
        Bucket.ELABORATE_SPECIFICATION_AND_PLANNING,
        Bucket.ELABORATE_CORE_AND_SUPPORT,
        Bucket.GOVERN_STANDARDS_METHODOLOGIES,
        Bucket.GOVERN_VERIFICATION_VALIDATION,
        Bucket.GENERATE_CORE_FUNCTIONALITY,
        Bucket.GENERATE_SUPPORTING_ELEMENTS,
        Bucket.DEPLOY_PIPELINES_AND_AUTOMATION,
        Bucket.DEPLOY_INFRASTRUCTURE_AND_CONFIG,
        Bucket.OPERATE_RUNBOOKS_AND_SOPS,
        Bucket.OPERATE_MONITORING_AND_ALERTING,
        Bucket.ITERATE_LEARNING_AND_ANALYSIS,
        Bucket.ITERATE_PLANNING_AND_RETROSPECTION,
        # Core/System/Other buckets can be checked last or based on context
        Bucket.INITIATE_CORE_SETUP,
        Bucket.GOVERN_CORE_SYSTEM,
        Bucket.GOVERN_MEMORY_TEMPLATES,
        Bucket.GOVERN_TOOLING_SCRIPTS,
        Bucket.MAINTAIN_CORE_FUNCTIONALITY,
        Bucket.MAINTAIN_SUPPORTING_ELEMENTS,
        Bucket.DEPLOY_GUIDES_AND_CHECKLISTS,
        Bucket.DEPLOY_SECURITY,
        Bucket.OPERATE_MAINTENANCE,
        Bucket.UNKNOWN,
    ]

    for bucket in bucket_order:
        # Use .get() to safely access potentially missing buckets in memory.artifacts
        artifacts_in_bucket = memory.artifacts.get(bucket, [])
        for artifact in artifacts_in_bucket:
            if artifact.status == DocumentStatus.PENDING:
                next_step_suggestion = (
                    f"Next suggested step: Work on artifact '{artifact.name}' "
                    f"({artifact.path}) in bucket '{bucket.value}'."
                )
                found_pending = True
                break  # Found the first pending, stop searching this bucket
        if found_pending:
            break  # Stop searching other buckets

    # Get overall phase completion for context
    phase_completion_summary = "Phase completion: "
    # Define phases based on Bucket enum prefixes
    phases = sorted(
        list(set(b.value.split("::")[0] for b in Bucket if "::" in b.value))
    )
    phase_summaries = []
    try:
        for phase in phases:
            stats = memory.get_phase_completion(phase)
            if stats["total"] > 0:  # Only show phases with artifacts
                phase_summaries.append(
                    f"{phase}({stats['completion_percentage']:.0f}%)"
                )
        if not phase_summaries:
            phase_completion_summary += "(No artifacts tracked yet)"
        else:
            phase_completion_summary += ", ".join(phase_summaries)
    except Exception as e:
        logging.warning(f"Could not calculate phase completion: {e}")
        phase_completion_summary += "(Calculation error)"

    # --- Return Status and Suggestion ---
    return {
        "status": "ok",
        "message": f"Project '{project_name}' loaded. {phase_completion_summary}",
        "next_step": next_step_suggestion,
        # Optionally return parts of the memory if needed by the client
        # "current_taxonomy_version": memory.taxonomy_version
    }

================
File: paelladoc/adapters/plugins/core/help.py
================
from paelladoc.domain.core_logic import mcp
import logging

# Adapter for taxonomy loading
from paelladoc.adapters.output.filesystem.taxonomy_provider import (
    FileSystemTaxonomyProvider,
)

# Instantiate the taxonomy provider
# TODO: Replace direct instantiation with Dependency Injection
TAXONOMY_PROVIDER = FileSystemTaxonomyProvider()

# Insert behavior config here
# TODO: Review imports and add any other necessary modules


@mcp.tool(
    name="core_help",
    description="Shows help information about available commands",
)
def core_help(command: str = None, format: str = "detailed") -> dict:
    """Provides help information about available PAELLADOC commands.

    Args:
        command: Optional specific command to get help for
        format: Output format (detailed, summary, examples)

    Returns:
        Dictionary with help information
    """
    logging.info(f"Executing core.help with command={command}, format={format}")

    # Define available commands
    commands = {
        "paella": {
            "description": "Initiates the documentation process for a new project",
            "parameters": [
                {
                    "name": "project_name",
                    "type": "string",
                    "required": True,
                    "description": "Name of the project to document",
                },
                {
                    "name": "base_path",
                    "type": "string",
                    "required": True,
                    "description": "Base path for project documentation",
                },
                {
                    "name": "documentation_language",
                    "type": "string",
                    "required": False,
                    "description": "Language for documentation (e.g. 'es', 'en')",
                },
                {
                    "name": "interaction_language",
                    "type": "string",
                    "required": False,
                    "description": "Language for interaction (e.g. 'es', 'en')",
                },
            ],
            "example": "PAELLA my_project ~/projects/my_project en en",
        },
        "continue": {
            "description": "Continues working on an existing project",
            "parameters": [
                {
                    "name": "project_name",
                    "type": "string",
                    "required": True,
                    "description": "Name of the project to continue with",
                },
            ],
            "example": "CONTINUE my_project",
        },
        "verification": {
            "description": "Verifies documentation coverage against the MECE taxonomy",
            "parameters": [
                {
                    "name": "project_name",
                    "type": "string",
                    "required": True,
                    "description": "Name of the project to verify",
                },
            ],
            "example": "VERIFY my_project",
        },
        "select_taxonomy": {
            "description": "Guides users through selecting and customizing a project taxonomy",
            "parameters": [
                {
                    "name": "project_name",
                    "type": "string",
                    "required": True,
                    "description": "Name of the project to customize taxonomy for",
                },
                {
                    "name": "size_category",
                    "type": "string",
                    "required": False,
                    "description": "Project size category (personal, hobbyist, mvp, startup, enterprise)",
                },
                {
                    "name": "domain_type",
                    "type": "string",
                    "required": False,
                    "description": "Project domain type (web, mobile, iot, ai/ml, etc.)",
                },
                {
                    "name": "platform_type",
                    "type": "string",
                    "required": False,
                    "description": "Platform implementation type (chrome-extension, ios-native, android-native, etc.)",
                },
                {
                    "name": "compliance_needs",
                    "type": "string",
                    "required": False,
                    "description": "Compliance requirements (none, hipaa, gdpr, etc.)",
                },
                {
                    "name": "custom_threshold",
                    "type": "float",
                    "required": False,
                    "description": "Custom coverage threshold (0.0-1.0)",
                },
            ],
            "example": "SELECT-TAXONOMY my_project --size=mvp --domain=web --platform=chrome-extension",
        },
        "taxonomy_info": {
            "description": "Shows information about available taxonomies and categories",
            "parameters": [],
            "example": "TAXONOMY-INFO",
        },
        "help": {
            "description": "Shows help information about available commands",
            "parameters": [
                {
                    "name": "command",
                    "type": "string",
                    "required": False,
                    "description": "Specific command to get help for",
                },
                {
                    "name": "format",
                    "type": "string",
                    "required": False,
                    "description": "Output format (detailed, summary, examples)",
                },
            ],
            "example": "HELP paella",
        },
    }

    # If a specific command is requested
    if command and command in commands:
        return {"status": "ok", "command": command, "help": commands[command]}

    # Otherwise return all commands
    result = {
        "status": "ok",
        "available_commands": list(commands.keys()),
        "format": format,
    }

    # Add command information based on format
    if format == "detailed":
        result["commands"] = commands
        try:
            available_taxonomies = TAXONOMY_PROVIDER.get_available_taxonomies()
            if "select_taxonomy" in commands:
                commands["select_taxonomy"]["available_options"] = available_taxonomies
            if "taxonomy_info" in commands:
                commands["taxonomy_info"]["available_taxonomies"] = available_taxonomies
        except Exception as e:
            logging.error(f"Failed to load taxonomies for help: {e}", exc_info=True)
            # Continue without taxonomy info if loading fails
    elif format == "summary":
        result["commands"] = {
            cmd: info["description"] for cmd, info in commands.items()
        }
    elif format == "examples":
        result["commands"] = {cmd: info["example"] for cmd, info in commands.items()}

    return result

================
File: paelladoc/adapters/plugins/memory/project_memory.py
================
from paelladoc.domain.core_logic import mcp
import logging

# Insert behavior config here
# TODO: Review imports and add any other necessary modules


@mcp.tool(
    name="memory.project_memory",
    description="Manages the project's memory file (.memory.json)",
)
def memory_project_memory() -> dict:
    """Handles operations related to the project memory.

    Likely used internally by other commands (PAELLA, CONTINUE, VERIFY)
    to load, save, and update project state, progress, and metadata.
    Provides the HELP CONTEXT (though this might be deprecated).
    """
    # TODO: Implement the actual logic of the command here
    # Access parameters using their variable names (e.g., param)
    # Access behavior config using BEHAVIOR_CONFIG dict (if present)
    logging.info("Executing stub for memory.project_memory...")

    # Example: Print parameters
    local_vars = locals()
    param_values = {}
    logging.info(f"Parameters received: {param_values}")

    # Replace with actual return value based on command logic
    return {
        "status": "ok",
        "message": "Successfully executed stub for memory.project_memory",
    }

================
File: paelladoc/adapters/plugins/code/generate_doc.py
================
from paelladoc.domain.core_logic import mcp
import logging

# Insert behavior config here
# TODO: Review imports and add any other necessary modules


@mcp.tool(name="code.generate_doc", description="3. Wait for user selection")
def code_generate_doc() -> dict:
    """3. Wait for user selection"""
    # TODO: Implement the actual logic of the command here
    # Access parameters using their variable names (e.g., param)
    # Access behavior config using BEHAVIOR_CONFIG dict (if present)
    logging.info("Executing stub for code.generate_doc...")

    # Example: Print parameters
    local_vars = locals()
    param_values = {}
    logging.info(f"Parameters received: {param_values}")

    # Replace with actual return value based on command logic
    return {
        "status": "ok",
        "message": "Successfully executed stub for code.generate_doc",
    }

================
File: paelladoc/adapters/plugins/code/generate_context.py
================
from paelladoc.domain.core_logic import mcp
import logging

# Insert behavior config here
# TODO: Review imports and add any other necessary modules


@mcp.tool(
    name="code.generate_context",
    description="This automatically creates the context file that will be used by GENERATE-DOC for interactive documentation generation.",
)
def code_generate_context() -> dict:
    """This automatically creates the context file that will be used by
    GENERATE-DOC for interactive documentation generation."""
    # TODO: Implement the actual logic of the command here
    # Access parameters using their variable names (e.g., param)
    # Access behavior config using BEHAVIOR_CONFIG dict (if present)
    logging.info("Executing stub for code.generate_context...")

    # Example: Print parameters
    local_vars = locals()
    param_values = {}
    logging.info(f"Parameters received: {param_values}")

    # Replace with actual return value based on command logic
    return {
        "status": "ok",
        "message": "Successfully executed stub for code.generate_context",
    }
================
File: paelladoc/adapters/plugins/code/code_generation.py
================
from paelladoc.domain.core_logic import mcp
from typing import Optional, List, Dict, Any  # Add necessary types
import logging

# Extracted behavior configuration from the original MDC file
BEHAVIOR_CONFIG = {
    'abort_if_documentation_incomplete': True,
    'code_after_documentation': True,
    'confirm_each_parameter': True,
    'conversation_required': True,
    'documentation_first': True,
    'documentation_verification_path': '/docs/{project_name}/.memory.json',
    'enforce_one_question_rule': True,
    'extract_from_complete_documentation': True,
    'force_single_question_mode': True,
    'guide_to_continue_command': True,
    'interactive': True,
    'max_questions_per_message': 1,
    'one_parameter_at_a_time': True,
    'prevent_web_search': True,
    'prohibit_multiple_questions': True,
    'require_complete_documentation': True,
    'require_step_confirmation': True,
    'required_documentation_sections': [
        'project_definition',
        'market_research',
        'user_research',
        'problem_definition',
        'product_definition',
        'architecture_decisions',
        'product_roadmap',
        'user_stories',
        'technical_architecture',
        'technical_specifications',
        'api_specification',
        'database_design',
    ],
    'sequential_questions': True,
    'single_question_mode': True,
    'strict_parameter_sequence': True,
    'strict_question_sequence': True,
    'verify_documentation_completeness': True,
    'wait_for_response': True,
    'wait_for_user_response': True,
}

# Insert behavior config here
# TODO: Review imports and add any other necessary modules


@mcp.tool(
    name="code.code_generation",
    description=(
        "The command uses the script at `.cursor/rules/scripts/extract_repo_content.py` "
        "to perform the repository extraction, which leverages repopack-py to convert "
        "the codebase to text."
    ),
)
def code_code_generation() -> dict:
    """The command uses the script at `.cursor/rules/scripts/extract_repo_content.py`
    to perform the repository extraction, which leverages repopack-py to convert
    the codebase to text.

    Behavior Config: this tool has associated behavior configuration extracted
    from the MDC file. See the `BEHAVIOR_CONFIG` variable in the source code.
    """
    # TODO: Implement the actual logic of the command here
    # Access parameters using their variable names (e.g., param)
    # Access behavior config using BEHAVIOR_CONFIG dict (if present)
    logging.info("Executing stub for code.code_generation...")

    # Example: Print parameters
    local_vars = locals()
    param_values = {}
    logging.info(f"Parameters received: {param_values}")

    # Replace with actual return value based on command logic
    return {
        "status": "ok",
        "message": "Successfully executed stub for code.code_generation",
    }
================
File: paelladoc/adapters/plugins/product/product_management.py
================
from paelladoc.domain.core_logic import mcp
import logging

# Insert behavior config here
# TODO: Review imports and add any other necessary modules


@mcp.tool(
    name="product.product_management",
    description='Manages product features like stories, tasks, etc. Access: stakeholder: ["read_only"]',
)
def product_product_management() -> dict:
    """Manages product management features.

    Handles user stories, tasks, sprints, meeting notes, reports, etc.
    Example access control mentioned in description: stakeholder: ["read_only"]
    """
    # TODO: Implement the actual logic of the command here
    # Access parameters using their variable names (e.g., param)
    # Access behavior config using BEHAVIOR_CONFIG dict (if present)
    logging.info("Executing stub for product.product_management...")

    # Example: Print parameters
    local_vars = locals()
    param_values = {}
    logging.info(f"Parameters received: {param_values}")

    # Replace with actual return value based on command logic
    return {
        "status": "ok",
        "message": "Successfully executed stub for product.product_management",
    }

================
File: paelladoc/adapters/plugins/styles/coding_styles.py
================
from paelladoc.domain.core_logic import mcp
import logging

# Insert behavior config here
# TODO: Review imports and add any other necessary modules


@mcp.tool(
    name="styles.coding_styles",
    description="Manages coding style guides for the project.",
)
def styles_coding_styles() -> dict:
    """Applies, customizes, or lists coding styles.

    Supports styles like frontend, backend, chrome_extension, etc.
    Uses operations: apply, customize, list, show.
    """
    # TODO: Implement the actual logic of the command here
    # Access parameters using their variable names (e.g., param)
    # Access behavior config using BEHAVIOR_CONFIG dict (if present)
    logging.info("Executing stub for styles.coding_styles...")

    # Example: Print parameters
    local_vars = locals()
    param_values = {}
    logging.info(f"Parameters received: {param_values}")

    # Replace with actual return value based on command logic
    return {
        "status": "ok",
        "message": "Successfully executed stub for styles.coding_styles",
    }

================
File: paelladoc/adapters/plugins/styles/git_workflows.py
================
from paelladoc.domain.core_logic import mcp
import logging

# Insert behavior config here
# TODO: Review imports and add any other necessary modules


@mcp.tool(
    name="styles.git_workflows",
    description="Manages Git workflow methodologies for the project.",
)
def styles_git_workflows() -> dict:
    """Applies or customizes Git workflows.

    Supports workflows like GitHub Flow, GitFlow, Trunk-Based.
    Provides guidance based on project complexity.
    Simple projects → GitHub Flow
    Complex projects → GitFlow or Trunk-Based
    """
    # TODO: Implement the actual logic of the command here
    # Access parameters using their variable names (e.g., param)
    # Access behavior config using BEHAVIOR_CONFIG dict (if present)
    logging.info("Executing stub for styles.git_workflows...")

    # Example: Print parameters
    local_vars = locals()
    param_values = {}
    logging.info(f"Parameters received: {param_values}")

    # Replace with actual return value based on command logic
    return {
        "status": "ok",
        "message": "Successfully executed stub for styles.git_workflows",
    }

================
File: paelladoc/adapters/plugins/templates/templates.py
================
from paelladoc.domain.core_logic import mcp
import logging

# Insert behavior config here
# TODO: Review imports and add any other necessary modules


@mcp.tool(name="templates.templates", description="Manages documentation templates.")
def templates_templates() -> dict:
    """Handles the lifecycle of documentation templates.

    Likely allows listing, showing, creating, or updating templates.
    The previous description mentioned workflows, which seems incorrect here.
    """
    # TODO: Implement the actual logic of the command here
    # Access parameters using their variable names (e.g., param)
    # Access behavior config using BEHAVIOR_CONFIG dict (if present)
    logging.info("Executing stub for templates.templates...")

    # Example: Print parameters
    local_vars = locals()
    param_values = {}
    logging.info(f"Parameters received: {param_values}")

    # Replace with actual return value based on command logic
    return {
        "status": "ok",
        "message": "Successfully executed stub for templates.templates",
    }
================
File: paelladoc/adapters/output/chroma/chroma_vector_store_adapter.py
================
import logging
import uuid
from typing import List, Dict, Any, Optional
from pathlib import Path

import chromadb
from chromadb.api.models.Collection import Collection

# Import NotFoundError from the appropriate module depending on chromadb version
try:
    from chromadb.errors import NotFoundError
except ImportError:
    try:
        from chromadb.api.errors import NotFoundError
    except ImportError:

        class NotFoundError(ValueError):
            """Fallback NotFoundError inheriting from ValueError for broader compatibility."""

            pass


# Ports and Domain Models/Helpers
from paelladoc.ports.output.vector_store_port import VectorStorePort, SearchResult

logger = logging.getLogger(__name__)

# Default path for persistent ChromaDB data
DEFAULT_CHROMA_PATH = Path.home() / ".paelladoc" / "chroma_data"


class ChromaSearchResult(SearchResult):
    """Concrete implementation of SearchResult for Chroma results."""

    def __init__(
        self,
        id: str,
        distance: Optional[float],
        metadata: Optional[Dict[str, Any]],
        document: Optional[str],
    ):
        self.id = id
        self.distance = distance
        self.metadata = metadata
        self.document = document


class ChromaVectorStoreAdapter(VectorStorePort):
    """ChromaDB implementation of the VectorStorePort."""

    def __init__(
        self,
        persist_path: Optional[Path] = DEFAULT_CHROMA_PATH,
        in_memory: bool = False,
    ):
        """Initializes the ChromaDB client.

        Args:
            persist_path: Path to store persistent Chroma data. Ignored if in_memory is True.
            in_memory: If True, runs ChromaDB entirely in memory (data is lost on exit).
        """
        if in_memory:
            logger.info("Initializing ChromaDB client in-memory.")
            self.client = chromadb.Client()
        else:
            self.persist_path = persist_path or DEFAULT_CHROMA_PATH
            self.persist_path.mkdir(parents=True, exist_ok=True)
            logger.info(
                f"Initializing persistent ChromaDB client at: {self.persist_path}"
            )
            self.client = chromadb.PersistentClient(path=str(self.persist_path))
        # TODO: Consider configuration for embedding function, distance function, etc.
        # Using Chroma's defaults for now (all-MiniLM-L6-v2 and cosine distance)

    async def get_or_create_collection(self, collection_name: str) -> Collection:
        """Gets or creates a Chroma collection."""
        try:
            collection = self.client.get_collection(name=collection_name)
            logger.debug(f"Retrieved existing Chroma collection: {collection_name}")
            return collection
        except (NotFoundError, ValueError) as e:
            # Handle case where collection does not exist (NotFoundError or ValueError)
            if "does not exist" in str(e):  # Check if the error indicates non-existence
                logger.debug(f"Collection '{collection_name}' not found, creating...")
                collection = self.client.create_collection(name=collection_name)
                logger.info(f"Created new Chroma collection: {collection_name}")
                return collection
            else:
                logger.error(
                    f"Unexpected error getting collection '{collection_name}': {e}",
                    exc_info=True,
                )
                raise
        except Exception as e:
            logger.error(
                f"Error getting or creating collection '{collection_name}': {e}",
                exc_info=True,
            )
            raise

    async def add_documents(
        self,
        collection_name: str,
        documents: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None,
        ids: Optional[List[str]] = None,
    ) -> List[str]:
        """Adds documents to the specified Chroma collection."""
        collection = await self.get_or_create_collection(collection_name)

        # Generate IDs if not provided
        if not ids:
            ids = [str(uuid.uuid4()) for _ in documents]
        elif len(ids) != len(documents):
            raise ValueError("Number of ids must match number of documents")

        # Add documents to the collection (this handles embedding generation)
        try:
            # collection.add is synchronous in the current chromadb client API
            collection.add(documents=documents, metadatas=metadatas, ids=ids)
            logger.info(
                f"Added {len(documents)} documents to collection '{collection_name}'."
            )
            return ids
        except Exception as e:
            logger.error(
                f"Error adding documents to collection '{collection_name}': {e}",
                exc_info=True,
            )
            raise

    async def search_similar(
        self,
        collection_name: str,
        query_texts: List[str],
        n_results: int = 5,
        where: Optional[Dict[str, Any]] = None,
        where_document: Optional[Dict[str, Any]] = None,
        include: Optional[List[str]] = ["metadatas", "documents", "distances"],
    ) -> List[List[SearchResult]]:
        """Searches for similar documents in the Chroma collection."""
        try:
            collection = self.client.get_collection(name=collection_name)
        except (NotFoundError, ValueError) as e:
            # Handle case where collection does not exist
            if "does not exist" in str(e):
                logger.warning(f"Collection '{collection_name}' not found for search.")
                return [[] for _ in query_texts]
            else:
                logger.error(
                    f"Unexpected error retrieving collection '{collection_name}' for search: {e}",
                    exc_info=True,
                )
                raise
        except Exception as e:
            logger.error(
                f"Error retrieving collection '{collection_name}' for search: {e}",
                exc_info=True,
            )
            raise

        try:
            # collection.query is synchronous
            results = collection.query(
                query_texts=query_texts,
                n_results=n_results,
                where=where,
                where_document=where_document,
                include=include,
            )

            # Map Chroma's result structure to our SearchResult list of lists
            # Chroma returns a dict with keys like 'ids', 'distances', 'metadatas', 'documents'
            # Each value is a list of lists (one inner list per query)
            mapped_results: List[List[SearchResult]] = []
            num_queries = len(query_texts)

            result_ids = results.get("ids") or [[] for _ in range(num_queries)]
            result_distances = results.get("distances") or [
                [] for _ in range(num_queries)
            ]
            result_metadatas = results.get("metadatas") or [
                [] for _ in range(num_queries)
            ]
            result_documents = results.get("documents") or [
                [] for _ in range(num_queries)
            ]

            for i in range(num_queries):
                query_results = []
                # Ensure all result lists have the expected length for the i-th query
                num_docs_for_query = (
                    len(result_ids[i]) if result_ids and i < len(result_ids) else 0
                )
                for j in range(num_docs_for_query):
                    query_results.append(
                        ChromaSearchResult(
                            id=result_ids[i][j]
                            if result_ids
                            and i < len(result_ids)
                            and j < len(result_ids[i])
                            else "N/A",
                            distance=result_distances[i][j]
                            if result_distances
                            and i < len(result_distances)
                            and j < len(result_distances[i])
                            else None,
                            metadata=result_metadatas[i][j]
                            if result_metadatas
                            and i < len(result_metadatas)
                            and j < len(result_metadatas[i])
                            else None,
                            document=result_documents[i][j]
                            if result_documents
                            and i < len(result_documents)
                            and j < len(result_documents[i])
                            else None,
                        )
                    )
                mapped_results.append(query_results)

            return mapped_results
        except Exception as e:
            logger.error(
                f"Error querying collection '{collection_name}': {e}", exc_info=True
            )
            raise

    async def delete_collection(self, collection_name: str) -> None:
        """Deletes a Chroma collection."""
        try:
            self.client.delete_collection(name=collection_name)
            logger.info(f"Deleted Chroma collection: {collection_name}")
        except (NotFoundError, ValueError) as e:
            # Handle case where collection does not exist
            if "does not exist" in str(e):
                logger.warning(
                    f"Attempted to delete non-existent collection: {collection_name}"
                )
            else:
                logger.error(
                    f"Unexpected error deleting collection '{collection_name}': {e}",
                    exc_info=True,
                )
                raise
        except Exception as e:
            logger.error(
                f"Error deleting collection '{collection_name}': {e}", exc_info=True
            )
            raise
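----------------------------------------------------------------
Example (not part of the repository): the adapter in use, run in-memory so
nothing touches disk. Collection name and texts are illustrative; note that
the first add_documents call may download Chroma's default embedding model.

import asyncio

from paelladoc.adapters.output.chroma.chroma_vector_store_adapter import (
    ChromaVectorStoreAdapter,
)

async def main() -> None:
    store = ChromaVectorStoreAdapter(in_memory=True)
    await store.add_documents(
        collection_name="demo",
        documents=["Paella recipe", "MCP server setup notes"],
        metadatas=[{"kind": "doc"}, {"kind": "doc"}],
    )
    results = await store.search_similar(
        "demo", ["how to configure the server"], n_results=1
    )
    for hit in results[0]:
        print(hit.id, hit.distance, hit.document)
    await store.delete_collection("demo")

asyncio.run(main())
----------------------------------------------------------------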
_ in range(num_queries) ] for i in range(num_queries): query_results = [] # Ensure all result lists have the expected length for the i-th query num_docs_for_query = ( len(result_ids[i]) if result_ids and i < len(result_ids) else 0 ) for j in range(num_docs_for_query): query_results.append( ChromaSearchResult( id=result_ids[i][j] if result_ids and i < len(result_ids) and j < len(result_ids[i]) else "N/A", distance=result_distances[i][j] if result_distances and i < len(result_distances) and j < len(result_distances[i]) else None, metadata=result_metadatas[i][j] if result_metadatas and i < len(result_metadatas) and j < len(result_metadatas[i]) else None, document=result_documents[i][j] if result_documents and i < len(result_documents) and j < len(result_documents[i]) else None, ) ) mapped_results.append(query_results) return mapped_results except Exception as e: logger.error( f"Error querying collection '{collection_name}': {e}", exc_info=True ) raise async def delete_collection(self, collection_name: str) -> None: """Deletes a Chroma collection.""" try: self.client.delete_collection(name=collection_name) logger.info(f"Deleted Chroma collection: {collection_name}") except (NotFoundError, ValueError) as e: # Handle case where collection does not exist if "does not exist" in str(e): logger.warning( f"Attempted to delete non-existent collection: {collection_name}" ) else: logger.error( f"Unexpected error deleting collection '{collection_name}': {e}", exc_info=True, ) raise except Exception as e: logger.error( f"Error deleting collection '{collection_name}': {e}", exc_info=True ) raise ================ File: paelladoc/adapters/output/filesystem/taxonomy_provider.py ================ import logging from pathlib import Path from typing import Dict, List from paelladoc.ports.output.taxonomy_provider import TaxonomyProvider logger = logging.getLogger(__name__) # Determine the base path relative to this file's location # Assumes this structure: src/paelladoc/adapters/output/filesystem/taxonomy_provider.py # And taxonomies are at: project_root/taxonomies/ ADAPTER_DIR = Path(__file__).parent SRC_DIR = ADAPTER_DIR.parent.parent.parent PROJECT_ROOT = SRC_DIR.parent TAXONOMY_BASE_PATH = PROJECT_ROOT / "taxonomies" class FileSystemTaxonomyProvider(TaxonomyProvider): """Provides available taxonomy information by scanning filesystem directories.""" def __init__(self, base_path: Path = TAXONOMY_BASE_PATH): """Initializes the provider with the base path to the taxonomy directories.""" self.base_path = base_path if not self.base_path.is_dir(): logger.error( f"Taxonomy base path not found or not a directory: {self.base_path.resolve()}" ) # Raise an error or handle appropriately? For now, log and continue. # raise FileNotFoundError(f"Taxonomy base path not found: {self.base_path}") self._cached_taxonomies: Dict[str, List[str]] | None = None def get_available_taxonomies(self) -> Dict[str, List[str]]: """Scans the taxonomy directories and loads available taxonomy names. Uses a simple cache to avoid repeated filesystem scans. 
""" if self._cached_taxonomies is not None: logger.debug("Returning cached taxonomies") return self._cached_taxonomies logger.debug(f"Scanning for taxonomies in: {self.base_path.resolve()}") available_taxonomies = {} categories = ["platform", "domain", "size", "compliance"] if not self.base_path.is_dir(): logger.error( f"Cannot scan taxonomies, base path is invalid: {self.base_path.resolve()}" ) return {cat: [] for cat in categories} # Return empty if base path is bad for category in categories: category_path = self.base_path / category if category_path.is_dir(): try: tax_files = sorted( f.stem # Get filename without .json extension for f in category_path.glob("*.json") if f.is_file() ) available_taxonomies[category] = tax_files logger.debug( f"Found {len(tax_files)} taxonomies in '{category}': {tax_files}" ) except OSError as e: logger.error( f"Error reading taxonomy directory {category_path}: {e}" ) available_taxonomies[category] = [] else: available_taxonomies[category] = [] logger.warning(f"Taxonomy directory not found: {category_path}") self._cached_taxonomies = available_taxonomies logger.info( f"Loaded {sum(len(v) for v in available_taxonomies.values())} taxonomies across {len(categories)} categories." ) return available_taxonomies ================ File: paelladoc/adapters/output/sqlite/models.py ================ from typing import List, Optional from sqlmodel import Field, Relationship, SQLModel, Column, JSON import datetime # Note: Domain Enums like DocumentStatus are not directly used here, # we store their string representation (e.g., 'pending'). # The adapter layer will handle the conversion. # --- Database Models --- # Forward references are needed for relationships defined before the target model class ProjectInfoDB(SQLModel, table=True): # Represents the metadata associated with a project memory entry id: Optional[int] = Field(default=None, primary_key=True) # name field is stored in ProjectMemoryDB as it's the primary identifier language: Optional[str] = None purpose: Optional[str] = None target_audience: Optional[str] = None objectives: Optional[List[str]] = Field(default=None, sa_column=Column(JSON)) # Define the one-to-one relationship back to ProjectMemoryDB # Use Optional because a metadata row might briefly exist before being linked project_memory: Optional["ProjectMemoryDB"] = Relationship( back_populates="project_meta" ) class ProjectDocumentDB(SQLModel, table=True): # Represents a single document tracked within a project memory id: Optional[int] = Field(default=None, primary_key=True) name: str = Field(index=True) # Name of the document file (e.g., "README.md") template_origin: Optional[str] = None status: str = Field(default="pending", index=True) # Store enum string value # Foreign key to link back to the main project memory entry project_memory_id: Optional[int] = Field( default=None, foreign_key="projectmemorydb.id" ) # Define the many-to-one relationship back to ProjectMemoryDB project_memory: Optional["ProjectMemoryDB"] = Relationship( back_populates="documents" ) class ProjectMemoryDB(SQLModel, table=True): # Represents the main project memory entry in the database id: Optional[int] = Field(default=None, primary_key=True) # Use project_name from metadata as the main unique identifier for lookups name: str = Field( index=True, unique=True ) # Changed from project_name to match domain model # New fields to match domain model base_path: str = Field(default="") # Store as string, convert to Path in adapter interaction_language: str = Field(default="en-US") 
    documentation_language: str = Field(default="en-US")
    taxonomy_version: str = Field(default="0.5")

    created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
    last_updated_at: datetime.datetime = Field(default_factory=datetime.datetime.now)

    # Foreign key to link to the associated metadata entry
    # (references the ProjectInfoDB table, whose default table name is "projectinfodb";
    # the previous "projectmetadatadb" target no longer exists after the rename)
    project_meta_id: Optional[int] = Field(
        default=None, foreign_key="projectinfodb.id", unique=True
    )
    # Define the one-to-one relationship to ProjectInfoDB
    project_meta: Optional[ProjectInfoDB] = Relationship(
        back_populates="project_memory"
    )

    # Define the one-to-many relationship to ProjectDocumentDB
    documents: List[ProjectDocumentDB] = Relationship(back_populates="project_memory")

================
File: paelladoc/adapters/output/sqlite/mapper.py
================
"""
Mapping functions between domain models and SQLite DB models.
"""

import logging
from typing import Dict, List, Optional
from pathlib import Path
import datetime
import uuid

# Domain Models
from paelladoc.domain.models.project import (
    ProjectMemory,
    ProjectInfo,
    ArtifactMeta,
    Bucket,  # Import if needed for mapping logic (e.g., default status)
)

# Database Models
from .db_models import ProjectMemoryDB, ArtifactMetaDB

logger = logging.getLogger(__name__)


def _ensure_utc(dt: Optional[datetime.datetime]) -> Optional[datetime.datetime]:
    """Ensures a datetime object is UTC, converting naive datetimes."""
    if dt is None:
        return None
    if dt.tzinfo is None:
        # Assume naive datetimes from DB are UTC, or handle conversion if needed
        return dt.replace(tzinfo=datetime.timezone.utc)
    return dt.astimezone(datetime.timezone.utc)


def map_db_to_domain(db_memory: ProjectMemoryDB) -> ProjectMemory:
    """Maps the DB model hierarchy to the domain ProjectMemory model."""
    # Map ProjectInfo (formerly metadata)
    domain_project_info = ProjectInfo(
        name=db_memory.name,
        language=db_memory.language,
        purpose=db_memory.purpose,
        target_audience=db_memory.target_audience,
        objectives=db_memory.objectives if db_memory.objectives else [],  # Handle potential None from DB
        base_path=Path(db_memory.base_path) if db_memory.base_path else None,
        interaction_language=db_memory.interaction_language,
        documentation_language=db_memory.documentation_language,
        taxonomy_version=db_memory.taxonomy_version,
        platform_taxonomy=db_memory.platform_taxonomy,
        domain_taxonomy=db_memory.domain_taxonomy,
        size_taxonomy=db_memory.size_taxonomy,
        compliance_taxonomy=db_memory.compliance_taxonomy,
        custom_taxonomy=db_memory.custom_taxonomy if db_memory.custom_taxonomy else {},  # Handle potential None
        taxonomy_validation=db_memory.taxonomy_validation
        if db_memory.taxonomy_validation
        else {},  # Handle potential None
    )

    # Map Artifacts
    domain_artifacts: Dict[Bucket, List[ArtifactMeta]] = {
        bucket: [] for bucket in Bucket  # Initialize all buckets
    }
    if db_memory.artifacts:  # Check if artifacts relationship is loaded/exists
        for db_artifact in db_memory.artifacts:
            try:
                # Attempt to get the bucket enum member; default to UNKNOWN if invalid
                bucket_enum = Bucket(db_artifact.bucket)
            except ValueError:
                logger.warning(
                    f"Artifact {db_artifact.id} has invalid bucket value '{db_artifact.bucket}' stored in DB. Mapping to UNKNOWN."
) bucket_enum = Bucket.UNKNOWN domain_artifact = ArtifactMeta( id=db_artifact.id, name=db_artifact.name, bucket=bucket_enum, path=Path(db_artifact.path), # Use path string directly created_at=_ensure_utc(db_artifact.created_at), updated_at=_ensure_utc(db_artifact.updated_at), created_by=db_artifact.created_by, modified_by=db_artifact.modified_by, status=db_artifact.status, ) # Append to the correct bucket list, handle UNKNOWN explicitly if needed elsewhere domain_artifacts[bucket_enum].append(domain_artifact) # Remove empty buckets if desired (or keep them as per domain logic) # domain_artifacts = {k: v for k, v in domain_artifacts.items() if v} # Assemble the final domain ProjectMemory object domain_memory = ProjectMemory( project_info=domain_project_info, artifacts=domain_artifacts, taxonomy_version=db_memory.taxonomy_version, created_at=_ensure_utc(db_memory.created_at), last_updated_at=_ensure_utc(db_memory.last_updated_at), created_by=db_memory.created_by, modified_by=db_memory.modified_by, # Map taxonomy fields from ProjectMemoryDB to ProjectMemory platform_taxonomy=db_memory.platform_taxonomy, domain_taxonomy=db_memory.domain_taxonomy, size_taxonomy=db_memory.size_taxonomy, compliance_taxonomy=db_memory.compliance_taxonomy, custom_taxonomy=db_memory.custom_taxonomy if db_memory.custom_taxonomy else {}, taxonomy_validation=db_memory.taxonomy_validation if db_memory.taxonomy_validation else {}, ) return domain_memory def map_domain_to_db( domain_memory: ProjectMemory, existing_db_memory: Optional[ProjectMemoryDB] = None ) -> ProjectMemoryDB: """ Maps the domain ProjectMemory model to a ProjectMemoryDB model. Handles both creating a new DB object and updating an existing one. """ now_utc = datetime.datetime.now(datetime.timezone.utc) # --- Map Project Info / Top-Level Fields --- project_info = domain_memory.project_info if existing_db_memory: db_memory = existing_db_memory # Update fields from ProjectInfo db_memory.language = project_info.language db_memory.purpose = project_info.purpose db_memory.target_audience = project_info.target_audience db_memory.objectives = project_info.objectives db_memory.base_path = ( str(project_info.base_path) if project_info.base_path else None ) db_memory.interaction_language = project_info.interaction_language db_memory.documentation_language = project_info.documentation_language # Update fields from ProjectMemory db_memory.taxonomy_version = domain_memory.taxonomy_version db_memory.last_updated_at = ( _ensure_utc(domain_memory.last_updated_at) or now_utc ) db_memory.modified_by = domain_memory.modified_by db_memory.platform_taxonomy = domain_memory.platform_taxonomy db_memory.domain_taxonomy = domain_memory.domain_taxonomy db_memory.size_taxonomy = domain_memory.size_taxonomy db_memory.compliance_taxonomy = domain_memory.compliance_taxonomy db_memory.custom_taxonomy = domain_memory.custom_taxonomy db_memory.taxonomy_validation = domain_memory.taxonomy_validation else: # Create new ProjectMemoryDB db_memory = ProjectMemoryDB( name=project_info.name, language=project_info.language, purpose=project_info.purpose, target_audience=project_info.target_audience, objectives=project_info.objectives, base_path=str(project_info.base_path) if project_info.base_path else None, interaction_language=project_info.interaction_language, documentation_language=project_info.documentation_language, taxonomy_version=domain_memory.taxonomy_version, created_at=_ensure_utc(domain_memory.created_at) or now_utc, last_updated_at=_ensure_utc(domain_memory.last_updated_at) or 
now_utc, created_by=domain_memory.created_by, modified_by=domain_memory.modified_by, platform_taxonomy=domain_memory.platform_taxonomy, domain_taxonomy=domain_memory.domain_taxonomy, size_taxonomy=domain_memory.size_taxonomy, compliance_taxonomy=domain_memory.compliance_taxonomy, custom_taxonomy=domain_memory.custom_taxonomy, taxonomy_validation=domain_memory.taxonomy_validation, artifacts=[], # Initialize relationship list ) # --- Map Artifacts --- # This logic needs the db_memory.id if creating new artifacts, # so it's better handled within the adapter's session context after flushing. # This function will return the populated/updated ProjectMemoryDB *without* # fully resolved artifacts if it's a new object. The adapter will handle artifact sync. # If updating, we can potentially return the artifact list structure needed? # For simplicity, let's return the main object mapping and let the adapter handle artifact sync. return db_memory def sync_artifacts_db( session, # Pass the SQLAlchemy session domain_memory: ProjectMemory, db_memory: ProjectMemoryDB, # Assumes db_memory exists and has an ID ) -> None: """ Synchronizes the ArtifactMetaDB entries based on the domain model's artifacts. This function should be called within the adapter's session context *after* the ProjectMemoryDB object exists and has an ID (i.e., after adding and flushing if new). Args: session: The active SQLAlchemy AsyncSession. domain_memory: The source domain model. db_memory: The target database model (must have an ID). """ if not db_memory.id: logger.error("Cannot sync artifacts: ProjectMemoryDB object has no ID.") # Or raise an error? return # Use eager loading if artifacts aren't already loaded # This check might be redundant depending on how db_memory was obtained if "artifacts" not in db_memory.__dict__: # Basic check if relationship is loaded logger.warning( "Artifacts relationship not loaded on db_memory. Explicit loading might be needed." 
) # Potentially load it here if necessary, but ideally it's loaded beforehand # await session.refresh(db_memory, attribute_names=['artifacts']) db_artifacts_map: Dict[uuid.UUID, ArtifactMetaDB] = { a.id: a for a in db_memory.artifacts } domain_artifact_ids = set() artifacts_to_add = [] artifacts_to_delete = [] for bucket, domain_artifact_list in domain_memory.artifacts.items(): for domain_artifact in domain_artifact_list: if not isinstance(domain_artifact, ArtifactMeta): logger.warning( f"Skipping non-ArtifactMeta item found in domain artifacts: {domain_artifact}" ) continue # Skip if somehow a non-artifact is in the list domain_artifact_ids.add(domain_artifact.id) db_artifact = db_artifacts_map.get(domain_artifact.id) if db_artifact: # Update existing artifact db_artifact.name = domain_artifact.name db_artifact.bucket = domain_artifact.bucket # Store enum directly db_artifact.path = str(domain_artifact.path) db_artifact.status = domain_artifact.status # Store enum directly db_artifact.updated_at = _ensure_utc( domain_artifact.updated_at ) or datetime.datetime.now(datetime.timezone.utc) db_artifact.modified_by = domain_artifact.modified_by # No need to add to session explicitly if object is already managed else: # Create new artifact DB object new_db_artifact = ArtifactMetaDB( id=domain_artifact.id, project_memory_id=db_memory.id, # Link to parent name=domain_artifact.name, bucket=domain_artifact.bucket, path=str(domain_artifact.path), created_at=_ensure_utc(domain_artifact.created_at) or datetime.datetime.now(datetime.timezone.utc), updated_at=_ensure_utc(domain_artifact.updated_at) or datetime.datetime.now(datetime.timezone.utc), created_by=domain_artifact.created_by, modified_by=domain_artifact.modified_by, status=domain_artifact.status, ) artifacts_to_add.append(new_db_artifact) # Identify artifacts to delete for db_artifact_id, db_artifact in db_artifacts_map.items(): if db_artifact_id not in domain_artifact_ids: artifacts_to_delete.append(db_artifact) # Perform session operations (caller should handle commit/rollback) if artifacts_to_add: session.add_all(artifacts_to_add) logger.debug( f"Adding {len(artifacts_to_add)} new artifacts to session for project {db_memory.name}." ) # Deleting requires awaiting async session.delete for each # This needs to be done carefully within the async context of the adapter # This function CANNOT await session.delete directly if it's synchronous. # Let's return the list of objects to delete. 
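    # Illustrative sketch of the expected caller flow (mirrors save_memory in
    # sqlite_memory_adapter.py), where the async adapter awaits each deletion:
    #
    #     artifacts_to_delete = sync_artifacts_db(session, memory, db_memory)
    #     for artifact in artifacts_to_delete:
    #         await session.delete(artifact)
    #     await session.commit()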
# Instead of deleting here, return the list to the async adapter method # for artifact_to_delete in artifacts_to_delete: # logger.debug(f"Marking artifact {artifact_to_delete.id} ({artifact_to_delete.name}) for deletion from project {db_memory.name}.") # # await session.delete(artifact_to_delete) # Cannot do async op here return artifacts_to_delete # Return list of DB objects to be deleted by the caller ================ File: paelladoc/adapters/output/sqlite/sqlite_memory_adapter.py ================ """SQLite adapter for project memory persistence.""" import logging from typing import Optional, List from pathlib import Path from sqlmodel import SQLModel, select from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker, selectinload from sqlalchemy.exc import IntegrityError # Ports and Domain Models from paelladoc.ports.output.memory_port import MemoryPort from paelladoc.domain.models.project import ( ProjectMemory, ProjectInfo, ) # Database Models for this adapter from .db_models import ProjectMemoryDB # Import the new mapper functions from .mapper import map_db_to_domain, map_domain_to_db, sync_artifacts_db # Configuration from paelladoc.config.database import get_db_path # Default database path (obtained via config logic) # DEFAULT_DB_PATH = get_db_path() # No longer needed as constant? __init__ uses get_db_path() logger = logging.getLogger(__name__) # Remove redundant/fragile PROJECT_ROOT calculation # PROJECT_ROOT = Path(__file__).parent.parent.parent.parent.parent # logger.info(f"Project root calculated as: {PROJECT_ROOT.resolve()}") # DEFAULT_DB_PATH = PROJECT_ROOT / "paelladoc_memory.db" # logger.info(f"Default database path set to: {DEFAULT_DB_PATH.resolve()}") class SQLiteMemoryAdapter(MemoryPort): """SQLite implementation of the MemoryPort using new MECE/Artifact models.""" # Keep __init__ from HEAD (using get_db_path) def __init__(self, db_path: str | Path | None = None): """ Initialize the SQLite adapter. Args: db_path: Optional custom database path. If not provided, uses the configured default. 
""" self.db_path = Path(db_path) if db_path else get_db_path() logger.info( f"Initializing SQLite adapter with database path: {self.db_path.resolve()}" ) # Ensure the parent directory exists self.db_path.parent.mkdir(parents=True, exist_ok=True) # Create async engine self.async_engine = create_async_engine( f"sqlite+aiosqlite:///{self.db_path}", echo=False, # Set to True for SQL query logging connect_args={"check_same_thread": False}, # Necessary for SQLite async ) # Create async session factory (named async_session) self.async_session = sessionmaker( self.async_engine, class_=AsyncSession, expire_on_commit=False ) logger.info("SQLiteMemoryAdapter initialized.") async def _create_db_and_tables(self): """Creates the database and tables if they don't exist.""" async with self.async_engine.begin() as conn: await conn.run_sync(SQLModel.metadata.create_all) logger.info("Database tables checked/created.") # --- MemoryPort Implementation --- # async def save_memory(self, memory: ProjectMemory) -> None: """Saves the project memory state (including artifacts) to SQLite using the mapper.""" project_name = memory.project_info.name logger.debug(f"Attempting to save memory for project: {project_name}") await self._create_db_and_tables() async with self.async_session() as session: try: # Try to load existing DB object statement = ( select(ProjectMemoryDB) .where(ProjectMemoryDB.name == project_name) .options( selectinload(ProjectMemoryDB.artifacts) ) # Eager load artifacts ) results = await session.execute(statement) existing_db_memory = results.scalars().first() # Use mapper to map domain object to DB object (create or update) db_memory = map_domain_to_db(memory, existing_db_memory) # Add the main object to the session (SQLModel handles INSERT or UPDATE) session.add(db_memory) # If creating, flush to get the ID before syncing artifacts if not existing_db_memory: await session.flush() # Get the db_memory.id logger.debug( f"Flushed new project {db_memory.name} with ID {db_memory.id}" ) # Sync artifacts (add/update/prepare for delete) # The mapper function now returns the list of artifacts to delete artifacts_to_delete = sync_artifacts_db(session, memory, db_memory) # Perform deletions if any artifacts were marked if artifacts_to_delete: logger.debug( f"Deleting {len(artifacts_to_delete)} artifacts from session for project {project_name}" ) for artifact_to_del in artifacts_to_delete: await session.delete(artifact_to_del) # Commit all changes (project add/update, artifact add/update/delete) await session.commit() logger.info(f"Successfully saved memory for project: {project_name}") except IntegrityError as e: await session.rollback() logger.error( f"Integrity error saving project '{project_name}': {e}", exc_info=True, ) raise ValueError( f"Project '{project_name}' might already exist or another integrity issue occurred." 
) from e except Exception as e: await session.rollback() logger.error( f"Unexpected error saving project '{project_name}': {e}", exc_info=True, ) raise async def load_memory(self, project_name: str) -> Optional[ProjectMemory]: """Loads project memory (including artifacts) from SQLite using the mapper.""" logger.debug(f"Attempting to load memory for project: {project_name}") await self._create_db_and_tables() async with self.async_session() as session: try: statement = ( select(ProjectMemoryDB) .where(ProjectMemoryDB.name == project_name) .options( selectinload(ProjectMemoryDB.artifacts) ) # Eager load artifacts ) results = await session.execute(statement) db_memory = results.scalars().first() if db_memory: logger.debug( f"Found project '{project_name}' in DB, mapping to domain model." ) # Use the mapper function return map_db_to_domain(db_memory) else: logger.debug(f"Project '{project_name}' not found in DB.") return None except Exception as e: logger.error( f"Error loading project '{project_name}': {e}", exc_info=True ) # Optional: Re-raise a custom domain exception? return None # Return None on error for now async def project_exists(self, project_name: str) -> bool: """Checks if a project memory exists in the SQLite database.""" logger.debug(f"Checking existence for project: {project_name}") await self._create_db_and_tables() async with self.async_session() as session: try: statement = select(ProjectMemoryDB.id).where( ProjectMemoryDB.name == project_name ) results = await session.execute(statement) exists = results.scalars().first() is not None logger.debug(f"Project '{project_name}' exists: {exists}") return exists except Exception as e: logger.error( f"Error checking project existence for '{project_name}': {e}", exc_info=True, ) return False async def list_projects(self) -> List[ProjectInfo]: # Return ProjectInfo objects """Lists basic info for all projects stored in the database.""" logger.debug("Listing all projects info from database.") await self._create_db_and_tables() projects_info: List[ProjectInfo] = [] async with self.async_session() as session: try: # Select necessary columns for ProjectInfo statement = select( ProjectMemoryDB.name, ProjectMemoryDB.language, ProjectMemoryDB.purpose, ProjectMemoryDB.target_audience, ProjectMemoryDB.objectives, ProjectMemoryDB.base_path, ProjectMemoryDB.interaction_language, ProjectMemoryDB.documentation_language, ProjectMemoryDB.taxonomy_version, # Added taxonomy version ProjectMemoryDB.platform_taxonomy, ProjectMemoryDB.domain_taxonomy, ProjectMemoryDB.size_taxonomy, ProjectMemoryDB.compliance_taxonomy, ProjectMemoryDB.custom_taxonomy, ProjectMemoryDB.taxonomy_validation, ) results = await session.execute(statement) for row in results.all(): # Manually map row to ProjectInfo domain model # Consider a dedicated mapper function if this gets complex info = ProjectInfo( name=row.name, language=row.language, purpose=row.purpose, target_audience=row.target_audience, objectives=row.objectives if row.objectives else [], base_path=Path(row.base_path) if row.base_path else None, interaction_language=row.interaction_language, documentation_language=row.documentation_language, taxonomy_version=row.taxonomy_version, platform_taxonomy=row.platform_taxonomy, domain_taxonomy=row.domain_taxonomy, size_taxonomy=row.size_taxonomy, compliance_taxonomy=row.compliance_taxonomy, custom_taxonomy=row.custom_taxonomy if row.custom_taxonomy else {}, taxonomy_validation=row.taxonomy_validation if row.taxonomy_validation else {}, ) projects_info.append(info) 
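                    # Hypothetical refactor sketch for the dedicated mapper
                    # suggested above (not implemented in this module):
                    #
                    #     def map_row_to_project_info(row) -> ProjectInfo:
                    #         return ProjectInfo(name=row.name, ...)  # remaining fields as above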
logger.debug(f"Found {len(projects_info)} projects.") return projects_info except Exception as e: logger.error(f"Error listing projects: {e}", exc_info=True) return [] # Return empty list on error # list_projects_names removed as list_projects now returns ProjectInfo # Remove ensure_utc helper method from the adapter (should be in mapper) # def ensure_utc(self, dt: datetime.datetime) -> datetime.datetime: # ... ================ File: paelladoc/adapters/output/sqlite/db_models.py ================ from typing import List, Optional from uuid import UUID, uuid4 from datetime import datetime from pathlib import Path from sqlmodel import Field, Relationship, SQLModel, Column # Import Column for JSON from sqlalchemy.sql.sqltypes import JSON # Import JSON type from paelladoc.domain.models.project import ( Bucket, DocumentStatus, ) # Import enums from domain # --- Knowledge Graph Documentation --- """ Knowledge Graph (KG) Ready Model Design This file defines SQLModel models with relationships specifically designed to be KG-compatible. Each relationship defined here (through foreign keys) represents a potential edge in a knowledge graph. Primary Nodes: - ProjectMemoryDB: Represents a project (central node) - ArtifactMetaDB: Represents documentation artifacts - TaxonomyDB: Represents MECE taxonomy selections Edge Types (Relationships): 1. HAS_ARTIFACT: ProjectMemoryDB -> ArtifactMetaDB - Direction: Project contains artifacts - Properties: None (simple containment) - FK: ArtifactMetaDB.project_memory_id -> ProjectMemoryDB.id 2. HAS_TAXONOMY: ProjectMemoryDB -> TaxonomyDB - Direction: Project uses taxonomy combinations - Properties: Selected categories - Validates MECE structure 3. IMPLEMENTS: ArtifactMetaDB -> TaxonomyDB - Direction: Artifact implements taxonomy requirements - Properties: Coverage metrics Future Potential Edges: 1. DEPENDS_ON: ArtifactMetaDB -> ArtifactMetaDB - Would represent dependencies between artifacts - Need to add a dependencies table or attribute 2. 
CREATED_BY: ArtifactMetaDB -> User - Connects artifacts to creators - Already tracking created_by/modified_by fields Query Patterns: - Find all artifacts for a project: ProjectMemoryDB -[HAS_ARTIFACT]-> ArtifactMetaDB - Find taxonomy coverage: ProjectMemoryDB -[HAS_TAXONOMY]-> TaxonomyDB - Validate MECE structure: TaxonomyDB -[IMPLEMENTS]-> ArtifactMetaDB MECE Structure Support: - Platform taxonomies (web, mobile, desktop, extensions) - Domain taxonomies (infra, tools, data/AI, business) - Size taxonomies (personal to enterprise) - Compliance taxonomies (GDPR, HIPAA, PCI) """ # --- Artifact Model --- class ArtifactMetaDB(SQLModel, table=True): """Database model for ArtifactMeta""" # Use the domain UUID as the primary key id: UUID = Field(default_factory=uuid4, primary_key=True, index=True) # KG Edge: HAS_ARTIFACT (ProjectMemoryDB -> ArtifactMetaDB) # This foreign key creates a directional relationship from Project to Artifact project_memory_id: UUID = Field(foreign_key="projectmemorydb.id", index=True) name: str = Field(index=True) bucket: Bucket = Field(index=True) # Store enum value directly path: str = Field(index=True) # Store Path as string created_at: datetime = Field(default_factory=datetime.utcnow) updated_at: datetime = Field(default_factory=datetime.utcnow) # KG Node Properties for actor/authorship tracking # These fields can be used to create CREATED_BY and MODIFIED_BY edges in a KG created_by: Optional[str] = Field(default=None, index=True) modified_by: Optional[str] = Field(default=None, index=True) status: DocumentStatus = Field(index=True) # Store enum value directly # Define the relationship back to ProjectMemoryDB # This defines the reverse navigation for the HAS_ARTIFACT relationship project_memory: "ProjectMemoryDB" = Relationship(back_populates="artifacts") # KG-Ready: Store Path as string for easier querying/linking def __init__(self, *, path: Path, **kwargs): super().__init__(path=str(path), **kwargs) @property def path_obj(self) -> Path: return Path(self.path) # --- Project Memory Model --- class ProjectMemoryDB(SQLModel, table=True): """Project memory database model.""" # Use a separate UUID for the DB primary key, keep metadata name unique? # Or use metadata.name as PK? For now, using UUID. 
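    # Illustrative lookup pattern (mirrors sqlite_memory_adapter.py): loads are
    # driven by the unique `name` column rather than the UUID primary key:
    #
    #     statement = select(ProjectMemoryDB).where(ProjectMemoryDB.name == project_name)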
id: UUID = Field(default_factory=uuid4, primary_key=True, index=True) name: str = Field(unique=True, index=True) # From metadata.name language: Optional[str] = Field(default=None) purpose: Optional[str] = Field(default=None) target_audience: Optional[str] = Field(default=None) objectives: Optional[List[str]] = Field( sa_column=Column(JSON), default=None ) # Store list as JSON base_path: Optional[str] = Field( default=None ) # Store as string representation of Path interaction_language: Optional[str] = Field(default=None) documentation_language: Optional[str] = Field(default=None) taxonomy_version: str created_at: datetime = Field(default_factory=datetime.utcnow) last_updated_at: datetime = Field(default_factory=datetime.utcnow) # KG Node Properties for actor/authorship tracking created_by: Optional[str] = Field(default=None, index=True) modified_by: Optional[str] = Field(default=None, index=True) # MECE Taxonomy Configuration platform_taxonomy: Optional[str] = Field(index=True) # Selected platform taxonomy domain_taxonomy: Optional[str] = Field(index=True) # Selected domain taxonomy size_taxonomy: Optional[str] = Field(index=True) # Selected size taxonomy compliance_taxonomy: Optional[str] = Field( index=True ) # Selected compliance taxonomy # Custom taxonomy configuration for this project custom_taxonomy: Optional[dict] = Field( sa_column=Column(JSON), default=None ) # Store as JSON object # MECE validation status taxonomy_validation: Optional[dict] = Field( sa_column=Column(JSON), default=None ) # Store validation results # Define the one-to-many relationship to ArtifactMetaDB # KG Edge: HAS_ARTIFACT (ProjectMemoryDB -> ArtifactMetaDB) # artifacts will be loaded automatically by SQLModel/SQLAlchemy when accessed artifacts: List["ArtifactMetaDB"] = Relationship(back_populates="project_memory") # TODO: Decide how to handle the old 'documents' field if migration is needed. # Could be another JSON field temporarily or migrated into ArtifactMetaDB. # For now, omitting it, assuming new structure only or migration handles it. ================ File: paelladoc/adapters/services/system_time_service.py ================ """System implementation of the time service.""" import datetime from ...domain.services.time_service import TimeService class SystemTimeService(TimeService): """System implementation of TimeService using system clock.""" def get_current_time(self) -> datetime.datetime: """Get current timestamp in UTC using system clock.""" return datetime.datetime.now(datetime.timezone.utc) def ensure_utc(self, dt: datetime.datetime) -> datetime.datetime: """Convert a datetime to UTC. If the datetime has no timezone info, assumes it's in UTC. """ if dt.tzinfo is None: return dt.replace(tzinfo=datetime.timezone.utc) return dt.astimezone(datetime.timezone.utc) ================ File: paelladoc/application/utils/behavior_enforcer.py ================ """ Utility for enforcing behavior rules defined in tool configurations. 
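
Example (illustrative sketch, not from the original module; assumes an MCP
context object whose ``progress`` dict carries a ``collected_params`` set,
and a hypothetical tool name):

    BehaviorEnforcer.enforce(
        tool_name="paella_init",
        behavior_config={"fixed_question_order": ["project_name", "base_path"]},
        ctx=ctx,
        provided_args={"project_name": "demo"},
    )

A BehaviorViolationError is raised when several new parameters arrive at once
or when the new parameter is not the next one in the configured sequence.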
""" import logging from typing import Dict, Any, Set, Optional # Assuming MCPContext structure or relevant parts are accessible # from mcp.context import Context as MCPContext # Or use Any for now logger = logging.getLogger(__name__) class BehaviorViolationError(Exception): """Custom exception raised when a behavior rule is violated.""" def __init__(self, message: str): self.message = message super().__init__(self.message) class BehaviorEnforcer: """Enforces conversational behavior based on tool config and context.""" @staticmethod def enforce( tool_name: str, behavior_config: Optional[Dict[str, Any]], ctx: Optional[Any], # Replace Any with actual MCPContext type if available provided_args: Optional[Dict[str, Any]] ): """Checks current context and arguments against behavior rules. Args: tool_name: The name of the tool being called. behavior_config: The BEHAVIOR_CONFIG dictionary for the tool. ctx: The current MCP context object (expected to have ctx.progress). provided_args: The arguments passed to the tool function in the current call. Raises: BehaviorViolationError: If a rule is violated. """ if not behavior_config: logger.debug(f"No behavior config for tool '{tool_name}', skipping enforcement.") return if not ctx or not hasattr(ctx, 'progress') or not provided_args: logger.warning(f"Behavior enforcement skipped for '{tool_name}': missing context or args.") # Decide if this should be an error or just skipped return # --- Enforce fixed_question_order --- if "fixed_question_order" in behavior_config: sequence = behavior_config["fixed_question_order"] if not isinstance(sequence, list): logger.warning(f"Invalid 'fixed_question_order' in config for {tool_name}. Skipping check.") return # Assume ctx.progress['collected_params'] holds previously gathered arguments collected_params: Set[str] = ctx.progress.get("collected_params", set()) # Identify arguments provided in *this* specific call (non-None values) current_call_args = {k for k, v in provided_args.items() if v is not None} # Identify which of the currently provided args are *new* (not already collected) newly_provided_params = current_call_args - collected_params if not newly_provided_params: # No *new* parameters were provided in this call. # This might be okay if just confirming or if sequence is done. # Or maybe it should error if the sequence is *not* done? # For now, allow proceeding. Behavior could be refined. logger.debug(f"Tool '{tool_name}': No new parameters provided, sequence check passes by default.") return # Find the first parameter in the defined sequence that hasn't been collected yet expected_next_param = None for param in sequence: if param not in collected_params: expected_next_param = param break if expected_next_param is None: # The defined sequence is complete. # Should we allow providing *other* (optional?) parameters now? # If strict_parameter_sequence is True, maybe disallow? # For now, allow extra parameters after the main sequence. logger.debug(f"Tool '{tool_name}': Sequence complete, allowing provided args: {newly_provided_params}") return # --- Enforce one_parameter_at_a_time (implicitly for sequence) --- # Check if exactly one *new* parameter was provided and if it's the expected one. if len(newly_provided_params) > 1: raise BehaviorViolationError( f"Tool '{tool_name}' expects parameters sequentially. " f"Expected next: '{expected_next_param}'. " f"Provided multiple new parameters: {newly_provided_params}. " f"Collected so far: {collected_params}." 
            )

        provided_param = list(newly_provided_params)[0]
        if provided_param != expected_next_param:
            raise BehaviorViolationError(
                f"Tool '{tool_name}' expects parameters sequentially. "
                f"Expected next: '{expected_next_param}'. "
                f"Got unexpected new parameter: '{provided_param}'. "
                f"Collected so far: {collected_params}."
            )

        # If we reach here, exactly one new parameter was provided and it was the expected one.
        logger.debug(f"Tool '{tool_name}': Correct sequential parameter '{provided_param}' provided.")

        # --- Add other rule checks here as needed ---
        # e.g., max_questions_per_message (more complex, needs turn context)
        # e.g., documentation_first (likely better as separate middleware/check)

        # If all checks pass
        return

================
File: paelladoc/application/services/vector_store_service.py
================
import logging
from typing import List, Dict, Any, Optional

# Ports and SearchResult
from paelladoc.ports.output.vector_store_port import VectorStorePort, SearchResult

logger = logging.getLogger(__name__)


class VectorStoreService:
    """Application service for interacting with the vector store.

    Uses the VectorStorePort to abstract the underlying vector database.
    """

    def __init__(self, vector_store_port: VectorStorePort):
        """Initializes the service with a VectorStorePort implementation."""
        self.vector_store_port = vector_store_port
        logger.info(f"VectorStoreService initialized with port: {type(vector_store_port).__name__}")

    async def add_texts_to_collection(
        self,
        collection_name: str,
        documents: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None,
        ids: Optional[List[str]] = None
    ) -> List[str]:
        """Adds text documents to a specific collection."""
        logger.debug(f"Service: Adding {len(documents)} documents to vector store collection '{collection_name}'")
        try:
            added_ids = await self.vector_store_port.add_documents(
                collection_name=collection_name,
                documents=documents,
                metadatas=metadatas,
                ids=ids
            )
            logger.info(f"Service: Successfully added documents to collection '{collection_name}' with IDs: {added_ids}")
            return added_ids
        except Exception as e:
            logger.error(f"Service: Error adding documents to collection '{collection_name}': {e}", exc_info=True)
            # Re-raise or handle specific exceptions as needed
            raise

    async def find_similar_texts(
        self,
        collection_name: str,
        query_texts: List[str],
        n_results: int = 5,
        filter_metadata: Optional[Dict[str, Any]] = None,
        filter_document: Optional[Dict[str, Any]] = None
    ) -> List[List[SearchResult]]:
        """Finds documents similar to the query texts within a collection."""
        logger.debug(f"Service: Searching collection '{collection_name}' for texts similar to: {query_texts} (n={n_results})")
        try:
            results = await self.vector_store_port.search_similar(
                collection_name=collection_name,
                query_texts=query_texts,
                n_results=n_results,
                where=filter_metadata,  # Pass filters to the port
                where_document=filter_document,
                # Include common fields by default. Chroma always returns ids,
                # and some chromadb client versions reject "ids" inside
                # 'include', so it is not requested here.
                include=["metadatas", "documents", "distances"]
            )
            logger.info(f"Service: Found {sum(len(r) for r in results)} potential results for {len(query_texts)} queries in '{collection_name}'.")
            return results
        except Exception as e:
            logger.error(f"Service: Error searching collection '{collection_name}': {e}", exc_info=True)
            # Re-raise or handle specific exceptions as needed
            raise

    async def ensure_collection_exists(self, collection_name: str):
        """Ensures a collection exists, creating it if necessary."""
        logger.debug(f"Service: Ensuring collection '{collection_name}' exists.")
        try:
            await
self.vector_store_port.get_or_create_collection(collection_name) logger.info(f"Service: Collection '{collection_name}' checked/created.") except Exception as e: logger.error(f"Service: Error ensuring collection '{collection_name}' exists: {e}", exc_info=True) raise async def remove_collection(self, collection_name: str): """Removes a collection entirely.""" logger.debug(f"Service: Attempting to remove collection '{collection_name}'.") try: await self.vector_store_port.delete_collection(collection_name) logger.info(f"Service: Collection '{collection_name}' removed.") except Exception as e: logger.error(f"Service: Error removing collection '{collection_name}': {e}", exc_info=True) raise ================ File: paelladoc/application/services/memory_service.py ================ import logging from typing import Optional, Dict, Any, List, Callable, Awaitable # Domain Models from paelladoc.domain.models.project import ( ProjectMemory, DocumentStatus, ArtifactMeta, Bucket, ) # Ports from paelladoc.ports.output.memory_port import MemoryPort logger = logging.getLogger(__name__) # Type definition for event handlers EventHandler = Callable[[str, Dict[str, Any]], Awaitable[None]] class MemoryService: """Application service for managing project memory operations. Uses the MemoryPort to interact with the persistence layer. """ def __init__(self, memory_port: MemoryPort): """Initializes the service with a MemoryPort implementation.""" self.memory_port = memory_port self._event_handlers: Dict[str, List[EventHandler]] = {} logger.info( f"MemoryService initialized with port: {type(memory_port).__name__}" ) # --- Event System --- async def _emit_event(self, event_name: str, event_data: Dict[str, Any]) -> None: """Emits an event to all registered handlers for that event type. Args: event_name: The name of the event (e.g., 'artifact_created') event_data: Dictionary with event data """ if event_name not in self._event_handlers: logger.debug(f"No handlers registered for event: {event_name}") return handlers = self._event_handlers[event_name] logger.debug(f"Emitting event '{event_name}' to {len(handlers)} handlers") for handler in handlers: try: await handler(event_name, event_data) except Exception as e: logger.error( f"Error in event handler for '{event_name}': {e}", exc_info=True ) def register_event_handler(self, event_name: str, handler: EventHandler) -> None: """Registers a handler function for a specific event type. Args: event_name: The event name to listen for handler: Async function that will be called when the event occurs """ if event_name not in self._event_handlers: self._event_handlers[event_name] = [] self._event_handlers[event_name].append(handler) logger.debug(f"Registered handler for event: {event_name}") def unregister_event_handler(self, event_name: str, handler: EventHandler) -> bool: """Unregisters a handler function for a specific event type. 
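
        Example (illustrative):

            async def on_artifact_created(event_name, event_data):
                ...

            service.register_event_handler("artifact_created", on_artifact_created)
            service.unregister_event_handler("artifact_created", on_artifact_created)
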
Args: event_name: The event name handler: The handler function to remove Returns: True if the handler was removed, False if not found """ if event_name not in self._event_handlers: return False handlers = self._event_handlers[event_name] if handler in handlers: handlers.remove(handler) logger.debug(f"Unregistered handler for event: {event_name}") return True return False # --- Memory Service Methods --- async def get_project_memory(self, project_name: str) -> Optional[ProjectMemory]: """Retrieves the memory for a specific project.""" logger.debug(f"Service: Attempting to get memory for project '{project_name}'") memory = await self.memory_port.load_memory(project_name) if memory: await self._emit_event( "memory_loaded", { "project_name": project_name, "memory_id": str(memory.project_info.name), "timestamp": memory.last_updated_at.isoformat() if memory.last_updated_at else None, }, ) return memory async def check_project_exists(self, project_name: str) -> bool: """Checks if a project memory already exists.""" logger.debug(f"Service: Checking existence for project '{project_name}'") return await self.memory_port.project_exists(project_name) async def create_project_memory(self, memory: ProjectMemory) -> ProjectMemory: """Creates a new project memory entry. Raises: ValueError: If a project with the same name already exists. """ project_name = memory.project_info.name logger.debug( f"Service: Attempting to create memory for project '{project_name}'" ) exists = await self.check_project_exists(project_name) if exists: logger.error(f"Cannot create project '{project_name}': already exists.") raise ValueError(f"Project memory for '{project_name}' already exists.") await self.memory_port.save_memory(memory) logger.info( f"Service: Successfully created memory for project '{project_name}'" ) # Emit project_created event await self._emit_event( "project_created", { "project_name": project_name, "base_path": str(memory.project_info.base_path) if memory.project_info.base_path else None, "timestamp": memory.created_at.isoformat() if memory.created_at else None, "project_info_details": { k: v for k, v in memory.project_info.dict().items() if k not in ["name", "base_path"] and v is not None }, }, ) # Emit taxonomy event if taxonomy fields were provided if ( memory.platform_taxonomy or memory.domain_taxonomy or memory.size_taxonomy or memory.compliance_taxonomy or memory.custom_taxonomy ): await self._emit_event( "taxonomy_updated", { "project_name": project_name, "new_taxonomy": { "platform": memory.platform_taxonomy, "domain": memory.domain_taxonomy, "size": memory.size_taxonomy, "compliance": memory.compliance_taxonomy, "custom": memory.custom_taxonomy, }, "old_taxonomy": None, # First time setting it }, ) # Emit artifact_created events for initial artifacts for bucket, artifacts in memory.artifacts.items(): for artifact in artifacts: await self._emit_event( "artifact_created", { "project_name": project_name, "artifact_id": str(artifact.id), "artifact_name": artifact.name, "bucket": bucket.value, "path": str(artifact.path), "status": artifact.status.value, "timestamp": artifact.created_at.isoformat() if artifact.created_at else None, "created_by": artifact.created_by, }, ) return memory # Return the saved object (could also reload it) async def update_project_memory(self, memory: ProjectMemory) -> ProjectMemory: """Updates an existing project memory entry. Raises: ValueError: If the project does not exist. 
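
        Example (illustrative flow; the taxonomy value is hypothetical):

            memory = await service.get_project_memory("demo-project")
            memory.platform_taxonomy = "web-frontend"
            await service.update_project_memory(memory)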
""" project_name = memory.project_info.name logger.debug( f"Service: Attempting to update memory for project '{project_name}'" ) # Ensure the project exists before attempting an update # Note: save_memory itself handles the create/update logic, but this check # makes the service layer's intent clearer and prevents accidental creation. exists = await self.check_project_exists(project_name) if not exists: logger.error(f"Cannot update project '{project_name}': does not exist.") raise ValueError( f"Project memory for '{project_name}' does not exist. Use create_project_memory first." ) # Get the old memory to compare changes old_memory = await self.memory_port.load_memory(project_name) # Save the updated memory await self.memory_port.save_memory(memory) logger.info( f"Service: Successfully updated memory for project '{project_name}'" ) # Emit project_updated event await self._emit_event( "project_updated", { "project_name": project_name, "timestamp": memory.last_updated_at.isoformat() if memory.last_updated_at else None, }, ) # Check if taxonomy fields changed if old_memory and ( memory.platform_taxonomy != old_memory.platform_taxonomy or memory.domain_taxonomy != old_memory.domain_taxonomy or memory.size_taxonomy != old_memory.size_taxonomy or memory.compliance_taxonomy != old_memory.compliance_taxonomy ): await self._emit_event( "taxonomy_updated", { "project_name": project_name, "timestamp": memory.last_updated_at.isoformat() if memory.last_updated_at else None, "new_taxonomy": { "platform": memory.platform_taxonomy, "domain": memory.domain_taxonomy, "size": memory.size_taxonomy, "compliance": memory.compliance_taxonomy, }, "old_taxonomy": { "platform": old_memory.platform_taxonomy, "domain": old_memory.domain_taxonomy, "size": old_memory.size_taxonomy, "compliance": old_memory.compliance_taxonomy, }, }, ) # Check for new or updated artifacts if old_memory: # Track artifacts by ID to detect changes for bucket, artifacts in memory.artifacts.items(): # Skip empty buckets if not artifacts: continue old_bucket_artifacts = old_memory.artifacts.get(bucket, []) old_artifact_ids = {str(a.id): a for a in old_bucket_artifacts} # Check each artifact in the new memory for artifact in artifacts: artifact_id = str(artifact.id) # If artifact didn't exist before, it's new if artifact_id not in old_artifact_ids: await self._emit_event( "artifact_created", { "project_name": project_name, "artifact_id": artifact_id, "artifact_name": artifact.name, "bucket": bucket.value, "path": str(artifact.path), "status": artifact.status.value, "timestamp": artifact.created_at.isoformat() if artifact.created_at else None, "created_by": artifact.created_by, }, ) else: # If artifact existed, check if it was updated old_artifact = old_artifact_ids[artifact_id] if ( artifact.status != old_artifact.status or artifact.updated_at != old_artifact.updated_at ): await self._emit_event( "artifact_updated", { "project_name": project_name, "artifact_id": artifact_id, "artifact_name": artifact.name, "bucket": bucket.value, "path": str(artifact.path), "old_status": old_artifact.status.value, "new_status": artifact.status.value, "timestamp": artifact.updated_at.isoformat() if artifact.updated_at else None, "modified_by": artifact.modified_by, }, ) return memory # Return the updated object # Example of a more specific use case method: async def update_document_status_in_memory( self, project_name: str, document_name: str, new_status: DocumentStatus ) -> Optional[ProjectMemory]: """Updates the status of a specific document within a project's 
memory.""" logger.debug( f"Service: Updating status for document '{document_name}' in project '{project_name}' to {new_status}" ) memory = await self.get_project_memory(project_name) if not memory: logger.warning( f"Project '{project_name}' not found, cannot update document status." ) return None if document_name not in memory.documents: logger.warning( f"Document '{document_name}' not found in project '{project_name}', cannot update status." ) # Or should we raise an error? return memory # Return unchanged memory? # Get old status for event old_status = memory.documents[document_name].status # Update status memory.update_document_status( document_name, new_status ) # Use domain model method # Save the updated memory await self.memory_port.save_memory(memory) logger.info( f"Service: Saved updated status for document '{document_name}' in project '{project_name}'" ) # Emit document_status_changed event await self._emit_event( "document_status_changed", { "project_name": project_name, "document_name": document_name, "old_status": old_status.value, "new_status": new_status.value, "timestamp": memory.last_updated_at.isoformat() if memory.last_updated_at else None, }, ) return memory async def add_artifact( self, project_name: str, artifact: ArtifactMeta, author: Optional[str] = None ) -> Optional[ProjectMemory]: """Adds a new artifact to a project's memory. Args: project_name: The name of the project artifact: The artifact to add author: Optional name of the author creating the artifact Returns: The updated project memory, or None if project not found """ logger.debug( f"Service: Adding artifact '{artifact.name}' to project '{project_name}'" ) # Set author if provided if author and not artifact.created_by: artifact.created_by = author artifact.modified_by = author memory = await self.get_project_memory(project_name) if not memory: logger.warning(f"Project '{project_name}' not found, cannot add artifact.") return None # Add the artifact added = memory.add_artifact(artifact) if not added: logger.warning( f"Artifact with path '{artifact.path}' already exists in project '{project_name}'" ) return memory # Save the updated memory await self.memory_port.save_memory(memory) logger.info( f"Service: Saved new artifact '{artifact.name}' in project '{project_name}'" ) # Emit artifact_created event await self._emit_event( "artifact_created", { "project_name": project_name, "artifact_id": str(artifact.id), "artifact_name": artifact.name, "bucket": artifact.bucket.value, "path": str(artifact.path), "status": artifact.status.value, "timestamp": artifact.created_at.isoformat() if artifact.created_at else None, "created_by": artifact.created_by, }, ) return memory async def update_artifact_status( self, project_name: str, bucket: Bucket, artifact_name: str, new_status: DocumentStatus, modifier: Optional[str] = None, ) -> Optional[ProjectMemory]: """Updates the status of a specific artifact within a project's memory. Args: project_name: The name of the project bucket: The bucket containing the artifact artifact_name: The name of the artifact to update new_status: The new status to set modifier: Optional name of the person making the change Returns: The updated project memory, or None if project not found """ logger.debug( f"Service: Updating status for artifact '{artifact_name}' in project '{project_name}' to {new_status}" ) memory = await self.get_project_memory(project_name) if not memory: logger.warning( f"Project '{project_name}' not found, cannot update artifact status." 
) return None # Get the artifact to check its current status artifact = memory.get_artifact(bucket, artifact_name) if not artifact: logger.warning( f"Artifact '{artifact_name}' not found in bucket '{bucket.value}' for project '{project_name}'" ) return memory old_status = artifact.status # Update the artifact status updated = memory.update_artifact_status( bucket, artifact_name, new_status, modifier ) if not updated: logger.warning( f"Failed to update status for artifact '{artifact_name}' in project '{project_name}'" ) return memory # Save the updated memory await self.memory_port.save_memory(memory) logger.info( f"Service: Saved updated status for artifact '{artifact_name}' in project '{project_name}'" ) # Emit artifact_updated event await self._emit_event( "artifact_updated", { "project_name": project_name, "artifact_id": str(artifact.id), "artifact_name": artifact_name, "bucket": bucket.value, "old_status": old_status.value, "new_status": new_status.value, "timestamp": artifact.updated_at.isoformat() if artifact.updated_at else None, "modified_by": modifier or artifact.modified_by, }, ) return memory ================ File: paelladoc/ports/input/mcp_port.py ================ from abc import ABC, abstractmethod from typing import Any, Dict class MCPPort(ABC): """Input port for MCP (Model-Command-Process) operations.""" @abstractmethod def process_command(self, command: str, args: Dict[str, Any]) -> Dict[str, Any]: """Process an MCP command with its arguments.""" pass @abstractmethod def register_plugin(self, plugin: Any) -> None: """Register a new plugin.""" pass ================ File: paelladoc/ports/input/mcp_server_adapter.py ================ #!/usr/bin/env python3 """ PAELLADOC MCP Server entry point (Input Adapter). Relies on paelladoc_core.py (now core_logic.py in domain) for MCP functionality and FastMCP instance. Simply runs the imported MCP instance. Adds server-specific resources and prompts using decorators. 
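
Typical invocation (illustrative; assumes the package is importable):

    python -m paelladoc.ports.input.mcp_server_adapter --stdio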
""" import sys import logging from pathlib import Path import time # Add time import # Import TextContent for prompt definition from mcp.types import TextContent # Assuming mcp is installed in .venv # Import the core FastMCP instance and logger from the domain layer from paelladoc.domain.core_logic import mcp, logger # Corrected import path # --- Import plugin packages to trigger their __init__.py dynamic loading --- # # This ensures decorators within the package modules are executed when the server starts # Import core plugins package # This will execute plugins/core/__init__.py which dynamically loads modules like paella.py # We might need other plugin packages later, e.g.: # from paelladoc.adapters.plugins import code_analysis # from paelladoc.adapters.plugins import product_management # --- Add specific tools/resources/prompts for this entry point using decorators --- # # These are defined directly in this adapter file and might be deprecated later @mcp.resource("docs://readme") # Use decorator def get_readme() -> str: """Get the project README content.""" try: # Assuming README.md is in the project root (cwd) readme_path = Path("README.md") if readme_path.exists(): return readme_path.read_text() else: logger.warning("README.md not found in project root.") return "README.md not found" # Keep simple return for resource except Exception as e: logger.error(f"Error reading README.md: {e}", exc_info=True) return f"Error reading README.md: {str(e)}" @mcp.resource("docs://templates/{template_name}") # Use decorator def get_template(template_name: str) -> str: """Get a documentation template.""" # Corrected path relative to src directory base_path = Path(__file__).parent.parent.parent.parent # Should point to src/ template_path = ( base_path / "paelladoc" / "adapters" / "plugins" / "templates" / f"{template_name}.md" ) try: if template_path.exists(): return template_path.read_text() else: logger.warning(f"Template {template_name} not found at {template_path}") return f"Error: Template {template_name} not found" except Exception as e: logger.error(f"Error reading template {template_name}: {e}", exc_info=True) return f"Error reading template {template_name}: {str(e)}" @mcp.prompt() # Use decorator def paella_command(project_name: str) -> TextContent: """Create a PAELLA command prompt.""" return TextContent( type="text", text=f"Initiating PAELLADOC for project: {project_name}.\n" f"Please specify: 1. Project type, 2. Methodologies, 3. Git workflow.", ) # --- Main Execution Logic --- # if __name__ == "__main__": # Configure file logging try: log_file = "paelladoc_server.log" file_handler = logging.FileHandler(log_file) file_handler.setFormatter( logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ) logging.getLogger().addHandler(file_handler) logging.getLogger().setLevel(logging.DEBUG) logger.info(f"Logging configured. Outputting to {log_file}") except Exception as log_e: logging.basicConfig( level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s" ) # Re-get logger after basicConfig potentially reconfigured root logger = logging.getLogger(__name__) logger.error(f"Could not configure file logging: {log_e}. Logging to stderr.") # Check command line arguments to determine run mode run_mode = ( "stdio" if "--stdio" in sys.argv else "web" ) # Default to stdio if --stdio present try: if run_mode == "stdio": logger.info( "Starting PAELLADOC MCP server in STDIO mode via FastMCP mcp.run(transport='stdio')..." 
) logger.debug("Waiting 10 seconds before mcp.run()...") time.sleep(10) # Add sleep before run logger.debug('Attempting mcp.run(transport="stdio")') mcp.run(transport="stdio") # Explicitly request stdio transport else: # Attempt to run the default web server (SSE) # Note: FastMCP's default run() might try stdio first anyway if no host/port specified logger.warning( "Starting PAELLADOC MCP server in default mode (likely web/SSE) via FastMCP mcp.run()..." ) logger.warning("Use --stdio argument for direct client integration.") mcp.run() # Run with default settings (tries SSE/web) logger.info(f"PAELLADOC MCP server finished (mode: {run_mode}).") except Exception as e: logger.critical(f"Failed to start or run MCP server: {e}", exc_info=True) sys.exit(1) ================ File: paelladoc/ports/output/taxonomy_provider.py ================ from abc import ABC, abstractmethod from typing import Dict, List class TaxonomyProvider(ABC): """Abstract interface for providing available taxonomy information.""" @abstractmethod def get_available_taxonomies(self) -> Dict[str, List[str]]: """Returns a dictionary of available taxonomies grouped by category. Example: { "platform": ["web-frontend", "ios-native", ...], "domain": ["ecommerce", "ai-ml", ...], ... } """ pass ================ File: paelladoc/ports/output/vector_store_port.py ================ from abc import ABC, abstractmethod from typing import List, Dict, Any, Optional class SearchResult(ABC): """Represents a single search result from the vector store.""" # Define common attributes for a search result id: str distance: Optional[float] = None metadata: Optional[Dict[str, Any]] = None document: Optional[str] = None class VectorStorePort(ABC): """Output Port defining operations for a vector store.""" @abstractmethod async def add_documents( self, collection_name: str, documents: List[str], metadatas: Optional[List[Dict[str, Any]]] = None, ids: Optional[List[str]] = None ) -> List[str]: """Adds documents (text) to a specific collection in the vector store. Embeddings are typically generated automatically by the implementation. Args: collection_name: The name of the collection to add documents to. documents: A list of text documents to add. metadatas: Optional list of metadata dictionaries corresponding to each document. ids: Optional list of unique IDs for each document. Returns: A list of IDs for the added documents. """ pass @abstractmethod async def search_similar( self, collection_name: str, query_texts: List[str], n_results: int = 5, where: Optional[Dict[str, Any]] = None, where_document: Optional[Dict[str, Any]] = None, include: Optional[List[str]] = ["metadatas", "documents", "distances"] ) -> List[List[SearchResult]]: """Searches for documents in a collection similar to the query texts. Args: collection_name: The name of the collection to search within. query_texts: A list of query texts to find similar documents for. n_results: The maximum number of results to return for each query. where: Optional filter criteria for metadata. where_document: Optional filter criteria for document content. include: Optional list specifying what data to include in results. Returns: A list of lists of SearchResult objects, one list per query text. """ pass @abstractmethod async def get_or_create_collection(self, collection_name: str) -> Any: """Gets or creates a collection in the vector store. The return type is Any for now, as it depends on the specific library's collection object representation (e.g., Chroma's Collection). 
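
        For illustration only (an editor's sketch, not part of the port
        contract), the body of a Chroma-backed implementation might look
        roughly like this, assuming the `chromadb` package is available:

            import chromadb

            client = chromadb.Client()
            return client.get_or_create_collection(name=collection_name)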
Args: collection_name: The name of the collection. Returns: The collection object. """ pass @abstractmethod async def delete_collection(self, collection_name: str) -> None: """Deletes a collection from the vector store. Args: collection_name: The name of the collection to delete. """ pass # Add other potential methods like: # async def delete_documents(self, collection_name: str, ids: List[str]) -> None: ... # async def update_documents(...) -> None: ... ================ File: paelladoc/ports/output/memory_port.py ================ from abc import ABC, abstractmethod from typing import Optional, List # Import the domain model it needs to interact with from paelladoc.domain.models.project import ProjectMemory class MemoryPort(ABC): """Output Port defining operations for project memory persistence.""" @abstractmethod async def save_memory(self, memory: ProjectMemory) -> None: """Saves the entire project memory state. Args: memory: The ProjectMemory object to save. """ pass @abstractmethod async def load_memory(self, project_name: str) -> Optional[ProjectMemory]: """Loads the project memory state for a given project name. Args: project_name: The unique name of the project to load. Returns: The ProjectMemory object if found, otherwise None. """ pass @abstractmethod async def project_exists(self, project_name: str) -> bool: """Checks if a project memory exists for the given name. Args: project_name: The unique name of the project to check. Returns: True if the project memory exists, False otherwise. """ pass @abstractmethod async def list_projects(self) -> List[str]: """Lists the names of all existing projects. Returns: A list of project names as strings. Returns an empty list if no projects exist. """ pass # Potentially add other methods later if needed, e.g., delete_memory ================ File: paelladoc/domain/core_logic.py ================ """ Core PAELLADOC MCP Logic. Handles MCP instance creation, plugin loading, and base tool registration. Uses FastMCP for compatibility with decorators. """ import logging from mcp.server.fastmcp import FastMCP # Use FastMCP from typing import Dict, Any # Configure base logger (handlers will be added by server.py) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Create the MCP server instance using FastMCP mcp = FastMCP("PAELLADOC") # --- Register Tools/Prompts --- # # Import plugins dynamically to register tools/prompts try: # Import from the new adapters location import paelladoc.adapters.plugins logger.info("Successfully loaded plugins from paelladoc.adapters.plugins") except ImportError as e: # Log as warning, server might still be usable with base tools logger.warning(f"Could not import plugins from paelladoc.adapters.plugins: {e}") except Exception as e: # Log as error for unexpected issues during import logger.error(f"An unexpected error occurred during plugin import: {e}", exc_info=True) @mcp.tool() # Use decorator again def ping(random_string: str = "") -> Dict[str, Any]: """ Basic health check; returns pong. Args: random_string (str, optional): Dummy parameter for no-parameter tools Returns: Dict[str, Any]: Response with status and message """ logger.debug(f"Ping tool called with parameter: {random_string}") return { "status": "ok", "message": "pong" } # Tools will be registered here by plugins # Note: No `if __name__ == "__main__":` block here. # This file is intended to be imported by the entry point (server.py). 
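
# --- Illustrative sketch (an editor's addition, not part of the original module) ---
# A plugin module registers extra tools on the shared `mcp` instance the same way
# `ping` is registered above. The `echo` tool below is hypothetical and exists only
# to show the pattern; `mcp`, `logger`, `Dict`, and `Any` are already in scope here.
@mcp.tool()
def echo(message: str = "") -> Dict[str, Any]:
    """Hypothetical example tool: echoes back the message it receives."""
    logger.debug(f"echo tool called with: {message}")
    return {"status": "ok", "message": message}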
================ File: paelladoc/domain/models/enums.py ================
from enum import Enum
from typing import Set


class DocumentStatus(str, Enum):
    """Status of a document in the project memory"""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    ARCHIVED = "archived"


class Bucket(str, Enum):
    """MECE taxonomy buckets for categorizing artifacts"""

    # Initiate categories
    INITIATE_CORE_SETUP = "Initiate::CoreSetup"
    INITIATE_INITIAL_PRODUCT_DOCS = "Initiate::InitialProductDocs"

    # Elaborate categories
    ELABORATE_DISCOVERY_AND_RESEARCH = "Elaborate::DiscoveryAndResearch"
    ELABORATE_IDEATION_AND_DESIGN = "Elaborate::IdeationAndDesign"
    ELABORATE_SPECIFICATION_AND_PLANNING = "Elaborate::SpecificationAndPlanning"
    ELABORATE_CORE_AND_SUPPORT = "Elaborate::CoreAndSupport"

    # Govern categories
    GOVERN_CORE_SYSTEM = "Govern::CoreSystem"
    GOVERN_STANDARDS_METHODOLOGIES = "Govern::StandardsMethodologies"
    GOVERN_VERIFICATION_VALIDATION = "Govern::VerificationValidation"
    GOVERN_MEMORY_TEMPLATES = "Govern::MemoryTemplates"
    GOVERN_TOOLING_SCRIPTS = "Govern::ToolingScripts"

    # Generate categories
    GENERATE_CORE_FUNCTIONALITY = "Generate::CoreFunctionality"
    GENERATE_SUPPORTING_ELEMENTS = "Generate::SupportingElements"

    # Maintain categories
    MAINTAIN_CORE_FUNCTIONALITY = "Maintain::CoreFunctionality"
    MAINTAIN_SUPPORTING_ELEMENTS = "Maintain::SupportingElements"

    # Deploy categories
    DEPLOY_PIPELINES_AND_AUTOMATION = "Deploy::PipelinesAndAutomation"
    DEPLOY_INFRASTRUCTURE_AND_CONFIG = "Deploy::InfrastructureAndConfig"
    DEPLOY_GUIDES_AND_CHECKLISTS = "Deploy::GuidesAndChecklists"
    DEPLOY_SECURITY = "Deploy::Security"

    # Operate categories
    OPERATE_RUNBOOKS_AND_SOPS = "Operate::RunbooksAndSOPs"
    OPERATE_MONITORING_AND_ALERTING = "Operate::MonitoringAndAlerting"
    OPERATE_MAINTENANCE = "Operate::Maintenance"

    # Iterate categories
    ITERATE_LEARNING_AND_ANALYSIS = "Iterate::LearningAndAnalysis"
    ITERATE_PLANNING_AND_RETROSPECTION = "Iterate::PlanningAndRetrospection"

    # Special bucket for artifacts not matching any pattern
    UNKNOWN = "Unknown"

    @classmethod
    def get_phase_buckets(cls, phase: str) -> Set["Bucket"]:
        """Get all buckets belonging to a specific phase"""
        return {bucket for bucket in cls if bucket.value.startswith(f"{phase}::")}

================ File: paelladoc/domain/models/fix_metadata.py ================
import re

# Read the file
with open("project.py", "r") as f:
    content = f.read()

# Rename the Metadata class to ProjectInfo to avoid reserved words
content = re.sub(
    r"class ProjectMetadata\(BaseModel\):", "class ProjectInfo(BaseModel):", content
)

# Update project_metadata references to use project_info
content = re.sub(
    r"project_metadata: ProjectMetadata", "project_info: ProjectInfo", content
)

# Make sure project_info is used instead of metadata throughout the code
content = re.sub(r"memory\.metadata\.", "memory.project_info.", content)
content = re.sub(r"self\.metadata\.", "self.project_info.", content)

# Fix utcnow() timestamps
content = content.replace(
    "datetime.utcnow()", "datetime.datetime.now(datetime.timezone.utc)"
)

# Save the modified file
with open("project.py", "w") as f:
    f.write(content)

print("Modification completed successfully")

================ File: paelladoc/domain/models/language.py ================
"""Language model for PAELLADOC.

This module defines the supported languages and their metadata.
Following BCP 47 language tags (e.g., en-US, es-ES).
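
Illustrative usage (an editor's sketch; the expected results follow directly
from the definitions below):

    >>> LanguageService.is_supported("en-US")
    True
    >>> SupportedLanguage.from_code("es").value
    'es-ES'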
""" from dataclasses import dataclass from typing import Dict, List from enum import Enum @dataclass class Language: """Represents a supported language with its code and name.""" code: str name: str native_name: str = "" class LanguageService: """Service for managing supported languages.""" # Core supported languages (minimal set for initial implementation) SUPPORTED_LANGUAGES: Dict[str, Language] = { "es-ES": Language("es-ES", "Spanish (Spain)", "Español (España)"), "en-US": Language("en-US", "English (US)", "English (US)"), } @classmethod def get_language(cls, code: str) -> Language: """Get language by code.""" return cls.SUPPORTED_LANGUAGES.get(code, Language(code, code, code)) @classmethod def get_all_languages(cls) -> List[Language]: """Get all supported languages.""" return list(cls.SUPPORTED_LANGUAGES.values()) @classmethod def is_supported(cls, code: str) -> bool: """Check if a language code is supported.""" return code in cls.SUPPORTED_LANGUAGES class SupportedLanguage(str, Enum): """ Supported languages for PAELLADOC interaction and documentation. Uses standard language codes (e.g., en-US, es-ES). """ EN_US = "en-US" # English (US) ES_ES = "es-ES" # Spanish (Spain) @classmethod def from_code(cls, code: str) -> "SupportedLanguage": """Convert a language code to a SupportedLanguage enum.""" code = code.lower() if code in ["en", "en-us"]: return cls.EN_US elif code in ["es", "es-es"]: return cls.ES_ES raise ValueError(f"Unsupported language code: {code}") def __str__(self) -> str: """Return the language code as a string.""" return self.value ================ File: paelladoc/domain/models/project.py ================ from typing import List, Dict, Optional, Any from pydantic import BaseModel, Field import datetime from pathlib import Path import uuid from .enums import DocumentStatus, Bucket from ..services.time_service import TimeService import logging logger = logging.getLogger(__name__) # Singleton instance of the time service # This will be injected by the application layer time_service: TimeService = None def set_time_service(service: TimeService): """Set the time service instance to be used by the domain models.""" global time_service time_service = service class ProjectDocument(BaseModel): name: str # e.g., "README.md", "CONTRIBUTING.md" template_origin: Optional[str] = None # Path or identifier of the template used status: DocumentStatus = DocumentStatus.PENDING class ProjectInfo(BaseModel): name: str = Field(..., min_length=1) language: Optional[str] = None purpose: Optional[str] = None target_audience: Optional[str] = None objectives: Optional[List[str]] = Field(default_factory=list) base_path: Optional[Path] = None interaction_language: Optional[str] = None documentation_language: Optional[str] = None taxonomy_version: str = "1.0" # Default or loaded? # Add the new taxonomy fields here as well for the domain model platform_taxonomy: str domain_taxonomy: str size_taxonomy: str compliance_taxonomy: str # Consider if this one could truly be optional sometimes? 
    custom_taxonomy: Optional[Dict[str, Any]] = Field(default_factory=dict)
    taxonomy_validation: Optional[Dict[str, Any]] = Field(default_factory=dict)

    model_config = {"arbitrary_types_allowed": True, "validate_assignment": True}


class ArtifactMeta(BaseModel):
    """Metadata for an artifact categorized according to the MECE taxonomy"""

    id: uuid.UUID = Field(default_factory=uuid.uuid4)
    name: str
    bucket: Bucket
    path: Path  # Relative path from project root
    created_at: Optional[datetime.datetime] = None
    updated_at: Optional[datetime.datetime] = None
    created_by: Optional[str] = None
    modified_by: Optional[str] = None
    status: DocumentStatus = DocumentStatus.PENDING

    def __init__(self, **data: Any):
        super().__init__(**data)
        if not time_service:
            raise RuntimeError("TimeService not initialized")
        now = time_service.get_current_time()
        if self.created_at is None:
            self.created_at = now
        if self.updated_at is None:
            self.updated_at = now
        if self.created_by is not None and self.modified_by is None:
            self.modified_by = self.created_by

    def update_timestamp(self, modifier: Optional[str] = None):
        if not time_service:
            raise RuntimeError("TimeService not initialized")
        self.updated_at = time_service.get_current_time()
        if modifier:
            self.modified_by = modifier

    def update_status(self, status: DocumentStatus, modifier: Optional[str] = None):
        self.status = status
        self.update_timestamp(modifier=modifier)


class ProjectMemory(BaseModel):
    project_info: ProjectInfo
    documents: Dict[str, ProjectDocument] = {}  # Dict key is document name/path

    # New taxonomy-based structure
    taxonomy_version: str = "0.5"
    artifacts: Dict[Bucket, List[ArtifactMeta]] = Field(
        default_factory=lambda: {bucket: [] for bucket in Bucket}
    )
    # Consider adding: achievements, issues, decisions later?

    created_at: Optional[datetime.datetime] = None
    last_updated_at: Optional[datetime.datetime] = None
    created_by: Optional[str] = None
    modified_by: Optional[str] = None

    # Add the new taxonomy fields here directly if they belong to the ProjectMemory state
    # Or ensure they are loaded/accessed via metadata if that's the design
    # Let's add them directly for consistency with the DB model and tests
    platform_taxonomy: str
    domain_taxonomy: str
    size_taxonomy: str
    compliance_taxonomy: str  # Consider if this one could truly be optional sometimes?
    custom_taxonomy: Optional[Dict[str, Any]] = Field(default_factory=dict)
    taxonomy_validation: Optional[Dict[str, Any]] = Field(default_factory=dict)

    def __init__(self, **data):
        # Map the legacy "metadata" key onto project_info for backward compatibility
        if "metadata" in data and "project_info" not in data:
            data["project_info"] = data.pop("metadata")
        super().__init__(**data)
        if not time_service:
            raise RuntimeError("TimeService not initialized")
        if self.created_at is None:
            self.created_at = time_service.get_current_time()
        if self.last_updated_at is None:
            self.last_updated_at = time_service.get_current_time()

    def update_timestamp(self):
        if not time_service:
            raise RuntimeError("TimeService not initialized")
        self.last_updated_at = time_service.get_current_time()

    def get_document(self, name: str) -> Optional[ProjectDocument]:
        return self.documents.get(name)

    def update_document_status(self, name: str, status: DocumentStatus):
        doc = self.get_document(name)
        if doc:
            doc.status = status
            self.update_timestamp()
        else:
            # TODO: Decide error handling (log or raise?)
# For now, just pass # Consider logging: logger.warning( # f"Attempted to update status for non-existent doc: {name}" # ) pass def add_document(self, doc: ProjectDocument): if doc.name not in self.documents: self.documents[doc.name] = doc self.update_timestamp() else: # TODO: Decide error handling (log or raise?) # For now, just pass # Consider logging: logger.warning( # f"Attempted to add duplicate document: {doc.name}" # ) pass # New methods for artifact management def get_artifact(self, bucket: Bucket, name: str) -> Optional[ArtifactMeta]: """Get an artifact by bucket and name""" for artifact in self.artifacts.get(bucket, []): if artifact.name == name: return artifact return None def get_artifact_by_path(self, path: Path) -> Optional[ArtifactMeta]: """Get an artifact by path, searching across all buckets""" path_str = str(path) for bucket_artifacts in self.artifacts.values(): for artifact in bucket_artifacts: if str(artifact.path) == path_str: return artifact return None def add_artifact(self, artifact: ArtifactMeta) -> bool: """Adds an artifact to the correct bucket, checking for path duplicates.""" # Check if artifact with the same path already exists in any bucket for bucket_artifacts in self.artifacts.values(): for existing_artifact in bucket_artifacts: if existing_artifact.path == artifact.path: logger.warning( f"Artifact with path {artifact.path} already exists." ) return False # Indicate artifact was not added if artifact.bucket not in self.artifacts: self.artifacts[artifact.bucket] = [] self.artifacts[artifact.bucket].append(artifact) self._update_timestamp() return True # Indicate artifact was added def update_artifact_status( self, bucket: Bucket, artifact_name: str, new_status: DocumentStatus, modifier: Optional[str] = None, ) -> bool: """Updates the status of a specific artifact.""" artifact = self.get_artifact(bucket, artifact_name) if artifact: artifact.status = new_status artifact.updated_at = datetime.datetime.now(datetime.timezone.utc) if modifier: artifact.modified_by = modifier self._update_timestamp() return True return False def _update_timestamp(self): """Updates the last_updated_at timestamp.""" self.last_updated_at = datetime.datetime.now(datetime.timezone.utc) def get_bucket_completion(self, bucket: Bucket) -> dict: """Get completion stats for a bucket""" artifacts = self.artifacts.get(bucket, []) total = len(artifacts) completed = sum(1 for a in artifacts if a.status == DocumentStatus.COMPLETED) in_progress = sum( 1 for a in artifacts if a.status == DocumentStatus.IN_PROGRESS ) pending = total - completed - in_progress return { "total": total, "completed": completed, "in_progress": in_progress, "pending": pending, "completion_percentage": (completed / total * 100) if total > 0 else 0, } def get_phase_completion(self, phase: str) -> dict: """Get completion stats for an entire phase""" phase_buckets = Bucket.get_phase_buckets(phase) total = 0 completed = 0 in_progress = 0 for bucket in phase_buckets: stats = self.get_bucket_completion(bucket) total += stats["total"] completed += stats["completed"] in_progress += stats["in_progress"] pending = total - completed - in_progress return { "phase": phase, "buckets": len(phase_buckets), "total": total, "completed": completed, "in_progress": in_progress, "pending": pending, "completion_percentage": (completed / total * 100) if total > 0 else 0, } ================ File: paelladoc/domain/services/time_service.py ================ """Time service for domain timestamp handling.""" from abc import ABC, abstractmethod import 
datetime


class TimeService(ABC):
    """Abstract base class for time operations in the domain."""

    @abstractmethod
    def get_current_time(self) -> datetime.datetime:
        """Get current timestamp in UTC.

        Returns:
            datetime.datetime: Current time in UTC.
        """
        pass

    @abstractmethod
    def ensure_utc(self, dt: datetime.datetime) -> datetime.datetime:
        """Ensure a datetime is in UTC.

        If the datetime has no timezone info, assumes it's in UTC.

        Args:
            dt: Datetime to convert

        Returns:
            datetime.datetime: UTC datetime with timezone info
        """
        pass

================ File: tests/conftest.py ================
import pytest
from datetime import datetime, timezone, timedelta
from pathlib import Path
import sys

# Ensure we can import Paelladoc modules
project_root = Path(__file__).parent.parent.absolute()
sys.path.insert(0, str(project_root))

# Import TimeService components
from paelladoc.domain.services.time_service import TimeService
from paelladoc.domain.models.project import set_time_service


class MockTimeService(TimeService):
    """Mock time service for testing."""

    def __init__(self, fixed_time=None):
        """Initialize with optional fixed time."""
        self.fixed_time = fixed_time or datetime.now(timezone.utc)
        self.call_count = 0

    def get_current_time(self) -> datetime:
        """Get the mocked current time, incrementing by microseconds on each call."""
        # Increment call count
        self.call_count += 1
        # Return fixed time plus microseconds based on call count to ensure
        # timestamps are different when multiple calls happen in sequence
        return self.fixed_time + timedelta(microseconds=self.call_count)

    def ensure_utc(self, dt: datetime) -> datetime:
        """Ensure a datetime is in UTC."""
        if dt.tzinfo is None:
            return dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)


@pytest.fixture(scope="session", autouse=True)
def setup_time_service():
    """Set up the time service globally for all tests."""
    # Using a fixed time for consistent testing
    fixed_time = datetime(2025, 4, 20, 12, 0, 0, tzinfo=timezone.utc)
    mock_service = MockTimeService(fixed_time)
    set_time_service(mock_service)
    return mock_service

================ File: tests/README.md ================
# MCP Server Tests

This directory contains tests for the Paelladoc MCP server following hexagonal architecture principles. Tests are organized into three main categories:

## Test Structure

```
tests/
├── unit/                   # Unit tests for individual components
│   └── test_ping_tool.py   # Tests for MCP tools in isolation
├── integration/            # Integration tests for component interactions
│   └── test_server.py      # Tests for server STDIO communication
└── e2e/                    # End-to-end tests simulating real-world usage
    └── test_cursor_simulation.py  # Simulates Cursor interaction
```

## Test Categories

1. **Unit Tests** (`unit/`)
   - Test individual functions/components in isolation
   - Don't require a running server
   - Fast to execute
   - Example: Testing the `ping()` function directly

2. **Integration Tests** (`integration/`)
   - Test interactions between components
   - Verify STDIO communication with the server
   - Example: Starting the server and sending/receiving messages

3. **End-to-End Tests** (`e2e/`)
   - Simulate real-world usage scenarios
   - Test the system as a whole
   - Example: Simulating how Cursor would interact with the server

## Running Tests

### Run All Tests

```bash
python -m unittest discover tests
```

### Run Tests by Category

```bash
# Unit tests only
python -m unittest discover tests/unit

# Integration tests only
python -m unittest discover tests/integration

# End-to-end tests only
python -m unittest discover tests/e2e
```

### Run a Specific Test File

```bash
python -m unittest tests/unit/test_ping_tool.py
```

### Run a Specific Test Case

```bash
python -m unittest tests.unit.test_ping_tool.TestPingTool
```

### Run a Specific Test Method

```bash
python -m unittest tests.unit.test_ping_tool.TestPingTool.test_ping_returns_dict
```

## TDD Process

These tests follow the Test-Driven Development (TDD) approach:

1. **RED**: Write failing tests first
2. **GREEN**: Implement the minimal code to make tests pass
3. **REFACTOR**: Improve the code while keeping tests passing

## Adding New Tests

When adding new MCP tools:

1. Create unit tests for the tool's functionality
2. Add integration tests for the tool's STDIO communication
3. Update E2E tests to verify Cursor interaction with the tool

================ File: tests/unit/test_ping_tool.py ================
"""
Unit tests for Paelladoc MCP tools.

Following TDD approach - tests are written before implementation.
"""

import unittest
import sys
from pathlib import Path

# Ensure we can import Paelladoc modules
project_root = Path(__file__).parent.parent.parent.absolute()
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Import directly from the domain layer
from paelladoc.domain import core_logic


class TestPingTool(unittest.TestCase):
    """Unit tests for the ping tool following TDD methodology."""

    def test_ping_exists(self):
        """Test that the ping function exists."""
        self.assertTrue(
            hasattr(core_logic, "ping"),
            "The ping function does not exist in core_logic",
        )

    def test_ping_returns_dict(self):
        """Test that ping returns a dictionary."""
        result = core_logic.ping()
        self.assertIsInstance(result, dict, "ping should return a dictionary")

    def test_ping_has_required_fields(self):
        """Test that ping response has the required fields."""
        result = core_logic.ping()
        self.assertIn("status", result, "ping response should contain a 'status' field")
        self.assertIn(
            "message", result, "ping response should contain a 'message' field"
        )

    def test_ping_returns_expected_values(self):
        """Test that ping returns the expected values."""
        result = core_logic.ping()
        self.assertEqual(
            result["status"],
            "ok",
            f"ping status should be 'ok', got '{result['status']}'",
        )
        self.assertEqual(
            result["message"],
            "pong",
            f"ping message should be 'pong', got '{result['message']}'",
        )


if __name__ == "__main__":
    unittest.main()

================ File: tests/unit/config/test_database.py ================
"""Unit tests for database configuration module."""

import os
from pathlib import Path

import pytest

from paelladoc.config.database import (
    get_project_root,
    get_db_path,
    PRODUCTION_DB_PATH,
)


@pytest.fixture
def clean_env():
    """Remove relevant environment variables before each test."""
    # Store original values
    original_db_path = os.environ.get("PAELLADOC_DB_PATH")
    original_env = os.environ.get("PAELLADOC_ENV")

    # Remove variables
    if "PAELLADOC_DB_PATH" in os.environ:
        del os.environ["PAELLADOC_DB_PATH"]
    if "PAELLADOC_ENV" in os.environ:
        del os.environ["PAELLADOC_ENV"]
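    # The test body runs while the environment is clean; the restore step after
    # the yield puts the original values back so settings cannot leak between tests.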
yield # Restore original values if original_db_path is not None: os.environ["PAELLADOC_DB_PATH"] = original_db_path if original_env is not None: os.environ["PAELLADOC_ENV"] = original_env def test_get_project_root(): """Test that get_project_root returns a valid directory.""" root = get_project_root() assert isinstance(root, Path) assert root.exists() assert root.is_dir() assert (root / "src").exists() assert (root / "src" / "paelladoc").exists() assert (root / "pyproject.toml").exists() def test_get_db_path_with_env_var(clean_env): """Test that PAELLADOC_DB_PATH environment variable takes precedence.""" custom_path = "/custom/path/db.sqlite" os.environ["PAELLADOC_DB_PATH"] = custom_path db_path = get_db_path() assert isinstance(db_path, Path) assert str(db_path) == custom_path def test_get_db_path_production_default(clean_env): """Test that production mode uses home directory.""" db_path = get_db_path() assert isinstance(db_path, Path) assert db_path == PRODUCTION_DB_PATH assert db_path.name == "memory.db" assert db_path.parent.name == ".paelladoc" assert db_path.parent.parent == Path.home() def test_production_db_path_constant(): """Test that PRODUCTION_DB_PATH is correctly set.""" assert isinstance(PRODUCTION_DB_PATH, Path) assert PRODUCTION_DB_PATH.name == "memory.db" assert PRODUCTION_DB_PATH.parent.name == ".paelladoc" assert PRODUCTION_DB_PATH.parent.parent == Path.home() ================ File: tests/unit/application/utils/test_behavior_enforcer.py ================ """ Unit tests for the BehaviorEnforcer utility. """ import unittest import sys from pathlib import Path from typing import Set, Optional # Ensure we can import Paelladoc modules project_root = Path(__file__).parent.parent.parent.parent.parent.absolute() sys.path.insert(0, str(project_root)) # Module to test from paelladoc.application.utils.behavior_enforcer import ( BehaviorEnforcer, BehaviorViolationError, ) # Mock context object for tests class MockContext: def __init__(self, collected_params: Optional[Set[str]] = None): self.progress = { "collected_params": collected_params if collected_params is not None else set() } class TestBehaviorEnforcer(unittest.TestCase): """Unit tests for the BehaviorEnforcer.""" def setUp(self): self.tool_name = "test.tool" self.sequence = ["param1", "param2", "param3"] self.behavior_config = {"fixed_question_order": self.sequence} def test_enforce_no_config(self): """Test that enforcement passes if no behavior_config is provided.""" try: BehaviorEnforcer.enforce(self.tool_name, None, MockContext(), {"arg": 1}) except BehaviorViolationError: self.fail("Enforcement should pass when no config is given.") def test_enforce_no_fixed_order(self): """Test enforcement passes if 'fixed_question_order' is not in config.""" config = {"other_rule": True} try: BehaviorEnforcer.enforce( self.tool_name, config, MockContext(), {"param1": "value"} ) except BehaviorViolationError: self.fail( "Enforcement should pass when fixed_question_order is not defined." ) def test_enforce_no_context_or_args(self): """Test enforcement passes (logs warning) if context or args are missing.""" # Note: Current implementation returns None (passes), might change behavior later. 
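        # Each call below omits the context, the args, or both; the enforcer
        # treats missing state as "nothing to validate" and returns without raising.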
try: BehaviorEnforcer.enforce(self.tool_name, self.behavior_config, None, None) BehaviorEnforcer.enforce( self.tool_name, self.behavior_config, MockContext(), None ) BehaviorEnforcer.enforce( self.tool_name, self.behavior_config, None, {"param1": "a"} ) except BehaviorViolationError: self.fail("Enforcement should pass when context or args are missing.") def test_enforce_no_new_params_provided(self): """Test enforcement passes if no *new* parameters are provided.""" ctx = MockContext(collected_params={"param1"}) # Providing only already collected param provided_args = {"param1": "new_value", "param2": None} try: BehaviorEnforcer.enforce( self.tool_name, self.behavior_config, ctx, provided_args ) except BehaviorViolationError as e: self.fail( f"Enforcement should pass when only old params are provided. Raised: {e}" ) def test_enforce_correct_first_param(self): """Test enforcement passes when the correct first parameter is provided.""" ctx = MockContext() provided_args = {"param1": "value1"} try: BehaviorEnforcer.enforce( self.tool_name, self.behavior_config, ctx, provided_args ) except BehaviorViolationError as e: self.fail(f"Enforcement failed for correct first param. Raised: {e}") def test_enforce_correct_second_param(self): """Test enforcement passes when the correct second parameter is provided.""" ctx = MockContext(collected_params={"param1"}) provided_args = { "param1": "value1", "param2": "value2", } # param1 is old, param2 is new try: BehaviorEnforcer.enforce( self.tool_name, self.behavior_config, ctx, provided_args ) except BehaviorViolationError as e: self.fail(f"Enforcement failed for correct second param. Raised: {e}") def test_enforce_incorrect_first_param(self): """Test enforcement fails when the wrong first parameter is provided.""" ctx = MockContext() provided_args = {"param2": "value2"} # Should be param1 with self.assertRaisesRegex( BehaviorViolationError, "Expected next: 'param1'. Got unexpected new parameter: 'param2'", ): BehaviorEnforcer.enforce( self.tool_name, self.behavior_config, ctx, provided_args ) def test_enforce_incorrect_second_param(self): """Test enforcement fails when the wrong second parameter is provided.""" ctx = MockContext(collected_params={"param1"}) provided_args = {"param1": "val1", "param3": "value3"} # Should be param2 with self.assertRaisesRegex( BehaviorViolationError, "Expected next: 'param2'. Got unexpected new parameter: 'param3'", ): BehaviorEnforcer.enforce( self.tool_name, self.behavior_config, ctx, provided_args ) def test_enforce_multiple_new_params_fails(self): """Test enforcement fails when multiple new parameters are provided at once.""" ctx = MockContext() provided_args = {"param1": "value1", "param2": "value2"} # Both are new # Adjust regex to match the more detailed error message expected_regex = ( r"Tool 'test.tool' expects parameters sequentially. " r"Expected next: 'param1'. " # Use regex to handle potential set order variations {'param1', 'param2'} or {'param2', 'param1'} r"Provided multiple new parameters: {('param1', 'param2'|'param2', 'param1')}. " r"Collected so far: set\(\)." 
) with self.assertRaisesRegex(BehaviorViolationError, expected_regex): BehaviorEnforcer.enforce( self.tool_name, self.behavior_config, ctx, provided_args ) def test_enforce_multiple_new_params_later_fails(self): """Test enforcement fails when multiple new params are provided later in sequence.""" ctx = MockContext(collected_params={"param1"}) provided_args = { "param1": "v1", "param2": "value2", "param3": "value3", } # param2 and param3 are new # Adjust regex to match the more detailed error message expected_regex = ( r"Tool 'test.tool' expects parameters sequentially. " r"Expected next: 'param2'. " # Use regex to handle potential set order variations r"Provided multiple new parameters: {('param2', 'param3'|'param3', 'param2')}. " r"Collected so far: {'param1'}." ) with self.assertRaisesRegex(BehaviorViolationError, expected_regex): BehaviorEnforcer.enforce( self.tool_name, self.behavior_config, ctx, provided_args ) def test_enforce_params_after_sequence_complete_passes(self): """Test enforcement passes when providing args after the sequence is complete.""" ctx = MockContext(collected_params={"param1", "param2", "param3"}) provided_args = { "param1": "v1", "param2": "v2", "param3": "v3", "optional_param": "opt", } try: BehaviorEnforcer.enforce( self.tool_name, self.behavior_config, ctx, provided_args ) except BehaviorViolationError as e: self.fail( f"Enforcement should pass for args after sequence complete. Raised: {e}" ) # if __name__ == "__main__": # unittest.main() ================ File: tests/unit/application/services/test_memory_service.py ================ """ Unit tests for the MemoryService. """ from unittest.mock import AsyncMock # Use AsyncMock for async methods import sys from pathlib import Path import pytest # Ensure we can import Paelladoc modules project_root = Path(__file__).parent.parent.parent.parent.parent.absolute() sys.path.insert(0, str(project_root)) # Modules to test from paelladoc.application.services.memory_service import MemoryService from paelladoc.domain.models.project import ( ProjectMemory, ProjectInfo, # Ensure this line is correct ) from paelladoc.ports.output.memory_port import MemoryPort # --- Pytest Fixtures --- @pytest.fixture def mock_memory_port() -> AsyncMock: """Provides a mocked MemoryPort instance for tests.""" return AsyncMock(spec=MemoryPort) @pytest.fixture def memory_service(mock_memory_port: AsyncMock) -> MemoryService: """Provides a MemoryService instance initialized with a mocked port.""" return MemoryService(memory_port=mock_memory_port) # --- Tests for Taxonomy Events (Pytest Style) --- @pytest.mark.asyncio async def test_update_project_memory_emits_taxonomy_updated_event( memory_service: MemoryService, mock_memory_port: AsyncMock ): """Test that taxonomy_updated event is emitted when taxonomy fields change.""" # Arrange project_name = "tax-event-project" old_memory = ProjectMemory( project_info=ProjectInfo( name=project_name, base_path="/fake", taxonomy_version="1.0" ) ) old_memory.platform_taxonomy = "web-frontend" old_memory.domain_taxonomy = "ecommerce" old_memory.size_taxonomy = "smb" old_memory.compliance_taxonomy = None new_memory = ProjectMemory( project_info=ProjectInfo( name=project_name, base_path="/fake", taxonomy_version="1.0" ) ) new_memory.platform_taxonomy = "ios-native" # Changed new_memory.domain_taxonomy = "ecommerce" new_memory.size_taxonomy = "enterprise" # Changed new_memory.compliance_taxonomy = "gdpr" # Changed # Mock the port methods mock_memory_port.project_exists.return_value = True 
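    # With these mocks the service sees an existing project whose persisted
    # state is old_memory, and saving appears to succeed silently.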
mock_memory_port.load_memory.return_value = old_memory mock_memory_port.save_memory.return_value = None # Async function returns None # Create and register a mock event handler mock_handler = AsyncMock() memory_service.register_event_handler("taxonomy_updated", mock_handler) # Also register for project_updated to ensure it's still called mock_project_updated_handler = AsyncMock() memory_service.register_event_handler( "project_updated", mock_project_updated_handler ) # Act await memory_service.update_project_memory(new_memory) # Assert mock_memory_port.save_memory.assert_awaited_once_with(new_memory) # Check project_updated event was called mock_project_updated_handler.assert_awaited_once() assert mock_project_updated_handler.await_args[0][0] == "project_updated" # Check taxonomy_updated event was called with correct data mock_handler.assert_awaited_once() event_name, event_data = mock_handler.await_args[0] assert event_name == "taxonomy_updated" assert event_data["project_name"] == project_name assert event_data["new_taxonomy"] == { "platform": "ios-native", "domain": "ecommerce", "size": "enterprise", "compliance": "gdpr", } assert event_data["old_taxonomy"] == { "platform": "web-frontend", "domain": "ecommerce", "size": "smb", "compliance": None, } @pytest.mark.asyncio async def test_update_project_memory_no_taxonomy_change_no_event( memory_service: MemoryService, mock_memory_port: AsyncMock ): """Test that taxonomy_updated event is NOT emitted if taxonomy fields don't change.""" # Arrange project_name = "no-tax-event-project" old_memory = ProjectMemory( project_info=ProjectInfo( name=project_name, base_path="/fake", taxonomy_version="1.0" ) ) old_memory.platform_taxonomy = "web-frontend" old_memory.domain_taxonomy = "ecommerce" old_memory.size_taxonomy = "smb" old_memory.compliance_taxonomy = None new_memory = ProjectMemory( project_info=ProjectInfo( name=project_name, base_path="/fake", taxonomy_version="1.0" ) ) # Keep taxonomy fields the same new_memory.platform_taxonomy = "web-frontend" new_memory.domain_taxonomy = "ecommerce" new_memory.size_taxonomy = "smb" new_memory.compliance_taxonomy = None # Make some other change to trigger update new_memory.project_info.purpose = "Updated purpose" # Mock the port methods mock_memory_port.project_exists.return_value = True mock_memory_port.load_memory.return_value = old_memory mock_memory_port.save_memory.return_value = None # Create and register a mock event handler mock_handler = AsyncMock() memory_service.register_event_handler("taxonomy_updated", mock_handler) # Also register for project_updated to ensure it's still called mock_project_updated_handler = AsyncMock() memory_service.register_event_handler( "project_updated", mock_project_updated_handler ) # Act await memory_service.update_project_memory(new_memory) # Assert mock_memory_port.save_memory.assert_awaited_once_with(new_memory) # Check project_updated event was called (because metadata changed) mock_project_updated_handler.assert_awaited_once() # Check taxonomy_updated event was NOT called mock_handler.assert_not_awaited() # NOTE: Keep the existing unittest class for other tests for now, or refactor all later. # If keeping both styles, ensure imports and module structure support it. # class TestMemoryService(unittest.IsolatedAsyncioTestCase): # ... (existing unittest tests) ... ================ File: tests/unit/application/services/test_vector_store_service.py ================ """ Unit tests for the VectorStoreService. 
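
These tests replace the port with unittest.mock.AsyncMock(spec=VectorStorePort),
so each service method can be checked for correct delegation to the port and for
error propagation without a real vector store.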
""" import unittest from unittest.mock import AsyncMock, MagicMock # Added MagicMock for SearchResult import sys from pathlib import Path from typing import List, Dict, Any # Ensure we can import Paelladoc modules project_root = Path(__file__).parent.parent.parent.parent.parent.absolute() sys.path.insert(0, str(project_root)) # Modules to test from paelladoc.application.services.vector_store_service import VectorStoreService from paelladoc.ports.output.vector_store_port import VectorStorePort, SearchResult # Dummy SearchResult implementation for tests class MockSearchResult(SearchResult): def __init__( self, id: str, distance: float, metadata: Dict[str, Any], document: str ): self.id = id self.distance = distance self.metadata = metadata self.document = document class TestVectorStoreService(unittest.IsolatedAsyncioTestCase): """Unit tests for the VectorStoreService using a mocked VectorStorePort.""" def setUp(self): """Set up a mocked VectorStorePort before each test.""" self.mock_vector_store_port = AsyncMock(spec=VectorStorePort) self.vector_store_service = VectorStoreService( vector_store_port=self.mock_vector_store_port ) # --- Test Cases --- # async def test_add_texts_to_collection_calls_port(self): """Verify add_texts_to_collection calls add_documents on the port.""" collection_name = "test_coll" documents = ["doc1", "doc2"] metadatas = [{"s": 1}, {"s": 2}] ids = ["id1", "id2"] expected_ids = ids self.mock_vector_store_port.add_documents.return_value = expected_ids actual_ids = await self.vector_store_service.add_texts_to_collection( collection_name, documents, metadatas, ids ) self.mock_vector_store_port.add_documents.assert_awaited_once_with( collection_name=collection_name, documents=documents, metadatas=metadatas, ids=ids, ) self.assertEqual(actual_ids, expected_ids) async def test_add_texts_to_collection_reraises_exception(self): """Verify add_texts_to_collection re-raises port exceptions.""" collection_name = "test_coll_fail" documents = ["doc1"] test_exception = ValueError("Port error") self.mock_vector_store_port.add_documents.side_effect = test_exception with self.assertRaises(ValueError) as cm: await self.vector_store_service.add_texts_to_collection( collection_name, documents ) self.assertEqual(cm.exception, test_exception) self.mock_vector_store_port.add_documents.assert_awaited_once() async def test_find_similar_texts_calls_port(self): """Verify find_similar_texts calls search_similar on the port.""" collection_name = "test_search_coll" query_texts = ["query1"] n_results = 3 filter_metadata = {"year": 2024} filter_document = None # Example expected_results: List[List[SearchResult]] = [ [MockSearchResult("res1", 0.5, {"year": 2024}, "doc text")] ] self.mock_vector_store_port.search_similar.return_value = expected_results actual_results = await self.vector_store_service.find_similar_texts( collection_name, query_texts, n_results, filter_metadata, filter_document ) self.mock_vector_store_port.search_similar.assert_awaited_once_with( collection_name=collection_name, query_texts=query_texts, n_results=n_results, where=filter_metadata, where_document=filter_document, include=[ "metadatas", "documents", "distances", "ids", ], # Check default include ) self.assertEqual(actual_results, expected_results) async def test_find_similar_texts_reraises_exception(self): """Verify find_similar_texts re-raises port exceptions.""" collection_name = "test_search_fail" query_texts = ["query1"] test_exception = RuntimeError("Search failed") 
self.mock_vector_store_port.search_similar.side_effect = test_exception with self.assertRaises(RuntimeError) as cm: await self.vector_store_service.find_similar_texts( collection_name, query_texts ) self.assertEqual(cm.exception, test_exception) self.mock_vector_store_port.search_similar.assert_awaited_once() async def test_ensure_collection_exists_calls_port(self): """Verify ensure_collection_exists calls get_or_create_collection on the port.""" collection_name = "ensure_coll" # Mock the port method to return a dummy collection object (can be anything) self.mock_vector_store_port.get_or_create_collection.return_value = MagicMock() await self.vector_store_service.ensure_collection_exists(collection_name) self.mock_vector_store_port.get_or_create_collection.assert_awaited_once_with( collection_name ) async def test_ensure_collection_exists_reraises_exception(self): """Verify ensure_collection_exists re-raises port exceptions.""" collection_name = "ensure_coll_fail" test_exception = ConnectionError("DB down") self.mock_vector_store_port.get_or_create_collection.side_effect = ( test_exception ) with self.assertRaises(ConnectionError) as cm: await self.vector_store_service.ensure_collection_exists(collection_name) self.assertEqual(cm.exception, test_exception) self.mock_vector_store_port.get_or_create_collection.assert_awaited_once_with( collection_name ) async def test_remove_collection_calls_port(self): """Verify remove_collection calls delete_collection on the port.""" collection_name = "remove_coll" self.mock_vector_store_port.delete_collection.return_value = ( None # Method returns None ) await self.vector_store_service.remove_collection(collection_name) self.mock_vector_store_port.delete_collection.assert_awaited_once_with( collection_name ) async def test_remove_collection_reraises_exception(self): """Verify remove_collection re-raises port exceptions.""" collection_name = "remove_coll_fail" test_exception = TimeoutError("Delete timed out") self.mock_vector_store_port.delete_collection.side_effect = test_exception with self.assertRaises(TimeoutError) as cm: await self.vector_store_service.remove_collection(collection_name) self.assertEqual(cm.exception, test_exception) self.mock_vector_store_port.delete_collection.assert_awaited_once_with( collection_name ) # if __name__ == "__main__": # unittest.main() ================ File: tests/unit/domain/models/test_project.py ================ import json import pytest from datetime import datetime from pathlib import Path from paelladoc.domain.models.project import ( DocumentStatus, Bucket, ArtifactMeta, ProjectInfo, ProjectMemory, # ProjectDocument, # Assuming this was removed or is internal ) class TestBucket: """Tests for the Bucket enum""" def test_bucket_values(self): """Test that all buckets have the correct string format""" for bucket in Bucket: if bucket is not Bucket.UNKNOWN: # Format should be "Phase::Subcategory" assert "::" in bucket.value phase, subcategory = bucket.value.split("::") assert phase in [ "Initiate", "Elaborate", "Govern", "Generate", "Maintain", "Deploy", "Operate", "Iterate", ] assert len(subcategory) > 0 else: assert bucket.value == "Unknown" def test_get_phase_buckets(self): """Test the get_phase_buckets class method""" initiate_buckets = Bucket.get_phase_buckets("Initiate") assert len(initiate_buckets) == 2 assert Bucket.INITIATE_CORE_SETUP in initiate_buckets assert Bucket.INITIATE_INITIAL_PRODUCT_DOCS in initiate_buckets elaborate_buckets = Bucket.get_phase_buckets("Elaborate") assert len(elaborate_buckets) == 4 # 
Should return empty set for non-existent phase nonexistent_buckets = Bucket.get_phase_buckets("NonExistent") assert len(nonexistent_buckets) == 0 class TestArtifactMeta: """Tests for the ArtifactMeta model""" def test_create_artifact_meta(self): """Test creating an ArtifactMeta instance""" artifact = ArtifactMeta( name="test_artifact", bucket=Bucket.INITIATE_CORE_SETUP, path=Path("docs/test_artifact.md"), status=DocumentStatus.IN_PROGRESS, ) assert artifact.name == "test_artifact" assert artifact.bucket == Bucket.INITIATE_CORE_SETUP assert artifact.path == Path("docs/test_artifact.md") assert artifact.status == DocumentStatus.IN_PROGRESS assert isinstance(artifact.created_at, datetime) assert isinstance(artifact.updated_at, datetime) def test_update_status(self): """Test updating an artifact's status""" artifact = ArtifactMeta( name="test_artifact", bucket=Bucket.INITIATE_CORE_SETUP, path=Path("docs/test_artifact.md"), ) # Default status should be PENDING assert artifact.status == DocumentStatus.PENDING # Store the original timestamp original_updated_at = artifact.updated_at # Update the status artifact.update_status(DocumentStatus.COMPLETED) # Check that status was updated assert artifact.status == DocumentStatus.COMPLETED # Check that timestamp was updated assert artifact.updated_at > original_updated_at def test_serialization_deserialization(self): """Test that ArtifactMeta can be serialized and deserialized""" artifact = ArtifactMeta( name="test_artifact", bucket=Bucket.ELABORATE_DISCOVERY_AND_RESEARCH, path=Path("docs/research/test_artifact.md"), status=DocumentStatus.COMPLETED, ) # Serialize to JSON artifact_json = artifact.model_dump_json() # Deserialize from JSON loaded_artifact = ArtifactMeta.model_validate_json(artifact_json) # Check that all fields were preserved assert loaded_artifact.name == artifact.name assert loaded_artifact.bucket == artifact.bucket assert loaded_artifact.path == artifact.path assert loaded_artifact.status == artifact.status assert loaded_artifact.created_at == artifact.created_at assert loaded_artifact.updated_at == artifact.updated_at @pytest.fixture def sample_project_memory(): """Fixture to create a sample ProjectMemory instance for testing.""" project_info = ProjectInfo( name="Test Project", description="A test project.", base_path="/path/to/project", documentation_language="en", interaction_language="en", platform_taxonomy="test_platform", # Added domain_taxonomy="test_domain", # Added size_taxonomy="test_size", # Added compliance_taxonomy="test_compliance" # Added ) return ProjectMemory( info=project_info, created_at=datetime.now(), updated_at=datetime.now(), metadata={"version": "1.0"} ) def test_project_info_initialization(): """Test ProjectInfo initialization.""" info = ProjectInfo( name="Another Test", description="Detailed desc.", base_path="/tmp", documentation_language="es", interaction_language="es", platform_taxonomy="test_platform_2", # Added domain_taxonomy="test_domain_2", # Added size_taxonomy="test_size_2", # Added compliance_taxonomy="test_compliance_2" # Added ) assert info.name == "Another Test" assert info.description == "Detailed desc." 
assert info.base_path == "/tmp" assert info.documentation_language == "es" assert info.interaction_language == "es" assert info.platform_taxonomy == "test_platform_2" assert info.domain_taxonomy == "test_domain_2" assert info.size_taxonomy == "test_size_2" assert info.compliance_taxonomy == "test_compliance_2" def test_project_memory_initialization(sample_project_memory): """Test ProjectMemory initialization using the fixture.""" assert sample_project_memory.info.name == "Test Project" assert "version" in sample_project_memory.metadata assert isinstance(sample_project_memory.created_at, datetime) assert isinstance(sample_project_memory.updated_at, datetime) # Check taxonomy fields added in fixture assert sample_project_memory.info.platform_taxonomy == "test_platform" assert sample_project_memory.info.domain_taxonomy == "test_domain" assert sample_project_memory.info.size_taxonomy == "test_size" assert sample_project_memory.info.compliance_taxonomy == "test_compliance" def test_project_memory_update(sample_project_memory): """Test ProjectMemory update.""" # Implementation of the test_project_memory_update method pass class TestProjectMemory: """Tests for the ProjectMemory model with taxonomy support""" def test_project_memory_initialization(self): """Test initializing ProjectMemory with taxonomy support""" project = ProjectMemory( project_info=ProjectInfo( name="test_project", # Add required taxonomy fields platform_taxonomy="test_platform", domain_taxonomy="test_domain", size_taxonomy="test_size", compliance_taxonomy="test_compliance", ), taxonomy_version="0.5", # Add required taxonomy fields also directly to ProjectMemory platform_taxonomy="test_platform", domain_taxonomy="test_domain", size_taxonomy="test_size", compliance_taxonomy="test_compliance", ) # Check that all buckets are initialized for bucket in Bucket: assert bucket in project.artifacts assert isinstance(project.artifacts[bucket], list) assert len(project.artifacts[bucket]) == 0 def test_add_artifact(self, sample_project_memory): """Test adding artifacts to ProjectMemory""" project = sample_project_memory # Check that artifacts were added to the correct buckets assert len(project.artifacts[Bucket.INITIATE_INITIAL_PRODUCT_DOCS]) == 1 assert len(project.artifacts[Bucket.ELABORATE_DISCOVERY_AND_RESEARCH]) == 1 assert len(project.artifacts[Bucket.ELABORATE_SPECIFICATION_AND_PLANNING]) == 1 # Check that artifact was added with correct fields initiate_artifact = project.artifacts[Bucket.INITIATE_INITIAL_PRODUCT_DOCS][0] assert initiate_artifact.name == "vision_doc" assert initiate_artifact.path == Path("docs/initiation/product_vision.md") assert initiate_artifact.status == DocumentStatus.PENDING # Test adding a duplicate (should return False) duplicate = ArtifactMeta( name="dup_vision", bucket=Bucket.INITIATE_CORE_SETUP, path=Path( "docs/initiation/product_vision.md" ), # Same path as existing artifact ) assert not project.add_artifact(duplicate) # Check that original buckets still have the same count assert len(project.artifacts[Bucket.INITIATE_INITIAL_PRODUCT_DOCS]) == 1 assert ( len(project.artifacts[Bucket.INITIATE_CORE_SETUP]) == 0 ) # Duplicate wasn't added def test_get_artifact(self, sample_project_memory): """Test retrieving artifacts by bucket and name""" project = sample_project_memory # Get existing artifact artifact = project.get_artifact( Bucket.ELABORATE_DISCOVERY_AND_RESEARCH, "user_research" ) assert artifact is not None assert artifact.name == "user_research" assert artifact.bucket == 
Bucket.ELABORATE_DISCOVERY_AND_RESEARCH

        # Get non-existent artifact
        non_existent = project.get_artifact(Bucket.DEPLOY_SECURITY, "security_plan")
        assert non_existent is None

    def test_get_artifact_by_path(self, sample_project_memory):
        """Test retrieving artifacts by path"""
        project = sample_project_memory

        # Get existing artifact by path
        artifact = project.get_artifact_by_path(Path("docs/specs/api_specification.md"))
        assert artifact is not None
        assert artifact.name == "api_spec"
        assert artifact.bucket == Bucket.ELABORATE_SPECIFICATION_AND_PLANNING

        # Get non-existent artifact
        non_existent = project.get_artifact_by_path(Path("nonexistent/path.md"))
        assert non_existent is None

    def test_update_artifact_status(self, sample_project_memory):
        """Test updating artifact status"""
        project = sample_project_memory

        # Update existing artifact
        success = project.update_artifact_status(
            Bucket.INITIATE_INITIAL_PRODUCT_DOCS, "vision_doc", DocumentStatus.COMPLETED
        )
        assert success

        # Verify the status was updated
        artifact = project.get_artifact(
            Bucket.INITIATE_INITIAL_PRODUCT_DOCS, "vision_doc"
        )
        assert artifact.status == DocumentStatus.COMPLETED

        # Try to update non-existent artifact
        success = project.update_artifact_status(
            Bucket.DEPLOY_SECURITY, "nonexistent", DocumentStatus.COMPLETED
        )
        assert not success

    def test_get_bucket_completion(self, sample_project_memory):
        """Test getting completion stats for buckets"""
        project = sample_project_memory

        # Bucket with one completed artifact
        elaborate_spec_stats = project.get_bucket_completion(
            Bucket.ELABORATE_SPECIFICATION_AND_PLANNING
        )
        assert elaborate_spec_stats["total"] == 1
        assert elaborate_spec_stats["completed"] == 1
        assert elaborate_spec_stats["in_progress"] == 0
        assert elaborate_spec_stats["pending"] == 0
        assert elaborate_spec_stats["completion_percentage"] == 100.0

        # Bucket with one in-progress artifact
        elaborate_research_stats = project.get_bucket_completion(
            Bucket.ELABORATE_DISCOVERY_AND_RESEARCH
        )
        assert elaborate_research_stats["total"] == 1
        assert elaborate_research_stats["completed"] == 0
        assert elaborate_research_stats["in_progress"] == 1
        assert elaborate_research_stats["pending"] == 0
        assert elaborate_research_stats["completion_percentage"] == 0.0

        # Empty bucket
        empty_bucket_stats = project.get_bucket_completion(Bucket.DEPLOY_SECURITY)
        assert empty_bucket_stats["total"] == 0
        assert empty_bucket_stats["completion_percentage"] == 0.0

    def test_get_phase_completion(self, sample_project_memory):
        """Test getting completion stats for entire phases"""
        project = sample_project_memory

        # Elaborate phase has 2 artifacts (1 completed, 1 in-progress)
        elaborate_stats = project.get_phase_completion("Elaborate")
        assert elaborate_stats["total"] == 2
        assert elaborate_stats["completed"] == 1
        assert elaborate_stats["in_progress"] == 1
        assert elaborate_stats["pending"] == 0
        assert elaborate_stats["completion_percentage"] == 50.0
        assert elaborate_stats["buckets"] == 4  # All Elaborate buckets

        # Initiate phase has 1 pending artifact
        initiate_stats = project.get_phase_completion("Initiate")
        assert initiate_stats["total"] == 1
        assert initiate_stats["completed"] == 0
        assert initiate_stats["pending"] == 1
        assert initiate_stats["completion_percentage"] == 0.0

        # Deploy phase has 0 artifacts
        deploy_stats = project.get_phase_completion("Deploy")
        assert deploy_stats["total"] == 0
        assert deploy_stats["completion_percentage"] == 0.0

    def test_serialization_deserialization(self, sample_project_memory):
        """Test that ProjectMemory with taxonomy can be serialized and deserialized"""
        project = sample_project_memory

        # Serialize to JSON
        project_json = project.model_dump_json()

        # Check that JSON is valid
        parsed_json = json.loads(project_json)
        assert parsed_json["taxonomy_version"] == "0.5"
        assert "artifacts" in parsed_json

        # Deserialize from JSON
        loaded_project = ProjectMemory.model_validate_json(project_json)

        # Check that all fields were preserved
        assert loaded_project.project_info.name == project.project_info.name
        assert loaded_project.taxonomy_version == project.taxonomy_version

        # Check artifacts
        assert Bucket.INITIATE_INITIAL_PRODUCT_DOCS in loaded_project.artifacts
        assert Bucket.ELABORATE_DISCOVERY_AND_RESEARCH in loaded_project.artifacts
        assert Bucket.ELABORATE_SPECIFICATION_AND_PLANNING in loaded_project.artifacts

        # Check specific artifact fields were preserved
        loaded_artifact = loaded_project.get_artifact(
            Bucket.ELABORATE_SPECIFICATION_AND_PLANNING, "api_spec"
        )
        assert loaded_artifact is not None
        assert loaded_artifact.name == "api_spec"
        assert loaded_artifact.path == Path("docs/specs/api_specification.md")
        assert loaded_artifact.status == DocumentStatus.COMPLETED

        # Verify completion stats are calculated correctly after deserialization
        stats = loaded_project.get_phase_completion("Elaborate")
        assert stats["completion_percentage"] == 50.0
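The tests above consume a sample_project_memory fixture defined earlier in test_project.py, outside this excerpt. A minimal sketch of what such a fixture might look like follows; the artifact names and paths are taken from the assertions above, while the ProjectInfo fields and the exact required arguments are assumptions based on how the model is constructed elsewhere in this suite.

# Sketch only: the real fixture lives earlier in test_project.py.
import pytest
from pathlib import Path

from paelladoc.domain.models.project import (
    ProjectMemory,
    ProjectInfo,
    ArtifactMeta,
    Bucket,
    DocumentStatus,
)


@pytest.fixture
def sample_project_memory() -> ProjectMemory:
    # One pending, one in-progress, and one completed artifact, matching
    # the completion-percentage assertions in the tests above.
    vision = ArtifactMeta(
        name="vision_doc",
        bucket=Bucket.INITIATE_INITIAL_PRODUCT_DOCS,
        path=Path("docs/vision.md"),  # hypothetical path
        status=DocumentStatus.PENDING,
    )
    research = ArtifactMeta(
        name="market_research",  # hypothetical name
        bucket=Bucket.ELABORATE_DISCOVERY_AND_RESEARCH,
        path=Path("docs/research/market_research.md"),  # hypothetical path
        status=DocumentStatus.IN_PROGRESS,
    )
    api_spec = ArtifactMeta(
        name="api_spec",
        bucket=Bucket.ELABORATE_SPECIFICATION_AND_PLANNING,
        path=Path("docs/specs/api_specification.md"),
        status=DocumentStatus.COMPLETED,
    )
    return ProjectMemory(
        project_info=ProjectInfo(
            name="sample-project",
            purpose="testing taxonomy helpers",
            target_audience="devs",
            objectives=["exercise artifact helpers"],
        ),
        artifacts={
            vision.bucket: [vision],
            research.bucket: [research],
            api_spec.bucket: [api_spec],
        },
        taxonomy_version="0.5",
    )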
================
File: tests/integration/test_server.py
================
#!/usr/bin/env python
"""
Integration tests for the Paelladoc MCP server.

These tests verify that the server correctly starts and responds to requests
via STDIO communication.
"""

import unittest
import sys
import os
import subprocess
from pathlib import Path

# Removed pty/select imports as PTY test is skipped
import signal

# Ensure we can import Paelladoc modules
project_root = Path(__file__).parent.parent.parent.parent
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Constants
SERVER_SCRIPT = project_root / "server.py"


class TestServerIntegration(unittest.TestCase):
    """Integration tests for the MCP server STDIO communication."""

    @unittest.skip(
        "Skipping PTY/STDIO test: FastMCP stdio interaction difficult to replicate reliably outside actual client environment."
    )
    def test_server_responds_to_ping(self):
        """Verify that the server responds to a ping request via PTY STDIO. (SKIPPED)"""
        # request_id = str(uuid.uuid4())  # F841 - Removed
        env = os.environ.copy()
        env["PYTHONPATH"] = str(project_root)
        env["PYTHONUNBUFFERED"] = "1"

        # --- Start server using PTY ---
        # master_fd, slave_fd = pty.openpty()  # PTY logic commented out
        server_process = None
        master_fd = None  # Ensure master_fd is defined for finally block

        try:
            # server_process = subprocess.Popen(...)
            # os.close(slave_fd)

            # --- Test Communication ---
            # response_data = None  # F841 - Removed
            # stderr_output = ""  # F841 - Removed again
            # time.sleep(2)
            # if server_process.poll() is not None:
            #     ...
            # mcp_request = {...}
            # request_json = json.dumps(mcp_request) + "\n"
            # print(f"Sending request via PTY: {request_json.strip()}")
            # os.write(master_fd, request_json.encode())

            # # Read response from PTY master fd with timeout
            # stdout_line = ""
            # buffer = b""
            # end_time = time.time() + 5
            # while time.time() < end_time:
            #     ...
            # print(f"Received raw response line: {stdout_line.strip()}")
            # if not stdout_line:
            #     ...
            # response_data = json.loads(stdout_line)
            # print(f"Parsed response: {response_data}")
            # self.assertEqual(...)
            pass  # Keep test structure but do nothing as it's skipped

        except Exception as e:
            # stderr_output = ""  # F841 - Removed
            # ... (error handling commented out) ...
            self.fail(f"An error occurred during the PTY test (should be skipped): {e}")

        finally:
            # --- Cleanup ---
            if master_fd:
                try:
                    os.close(master_fd)
                except OSError:
                    pass
            if server_process and server_process.poll() is None:
                print("Terminating server process (if it was started)...")
                try:
                    os.killpg(os.getpgid(server_process.pid), signal.SIGTERM)
                    server_process.wait(timeout=2)
                except (ProcessLookupError, subprocess.TimeoutExpired, AttributeError):
                    # Handle cases where process/pgid might not exist if startup failed early
                    print(
                        "Server cleanup notification: process termination might have failed or was not needed."
                    )
                    if server_process and server_process.poll() is None:
                        try:
                            os.killpg(os.getpgid(server_process.pid), signal.SIGKILL)
                        except Exception:
                            pass  # Final attempt
                except Exception as term_e:
                    print(f"Error during termination: {term_e}")

            # Read any remaining stderr
            if server_process and server_process.stderr:
                stderr_rem = server_process.stderr.read().decode(errors="ignore")
                if stderr_rem:
                    print(f"Remaining stderr: {stderr_rem}")


if __name__ == "__main__":
    unittest.main()
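The skipped test above stubs out PTY-based STDIO interaction with the server. For reference, a minimal, self-contained sketch of that pattern using only the standard library is shown below; the request payload shape and the server invocation are assumptions, not the actual FastMCP protocol.

# Sketch of driving a child process over a pseudo-terminal (POSIX only).
# The JSON-RPC payload here is hypothetical.
import json
import os
import pty
import subprocess

master_fd, slave_fd = pty.openpty()
proc = subprocess.Popen(
    ["python", "server.py"],  # assumed entry point, per SERVER_SCRIPT above
    stdin=slave_fd,
    stdout=slave_fd,
    stderr=subprocess.PIPE,
    close_fds=True,
)
os.close(slave_fd)  # the parent keeps only the master end

request = {"jsonrpc": "2.0", "id": 1, "method": "ping"}  # hypothetical shape
os.write(master_fd, (json.dumps(request) + "\n").encode())

# Read whatever the child wrote back; a real test would loop with a timeout.
response_line = os.read(master_fd, 65536).decode(errors="ignore")
print(f"Raw response: {response_line.strip()}")

proc.terminate()
os.close(master_fd)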
================
File: tests/integration/test_alembic_config.py
================
"""Integration tests for Alembic configuration."""

import os
import pytest
from pathlib import Path
import uuid
import subprocess  # Import subprocess

from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.runtime.migration import MigrationContext
from sqlalchemy.ext.asyncio import create_async_engine
import sys

# Import get_db_path to test its behavior directly
from paelladoc.config.database import get_db_path

# Get project root to build absolute paths if needed
PROJECT_ROOT = Path(__file__).parent.parent.parent.parent.absolute()
sys.path.insert(0, str(PROJECT_ROOT))


@pytest.fixture
def clean_env():
    """Remove relevant environment variables before each test."""
    original_db_path = os.environ.get("PAELLADOC_DB_PATH")
    original_env = os.environ.get("PAELLADOC_ENV")
    if "PAELLADOC_DB_PATH" in os.environ:
        del os.environ["PAELLADOC_DB_PATH"]
    if "PAELLADOC_ENV" in os.environ:
        del os.environ["PAELLADOC_ENV"]
    yield
    if original_db_path is not None:
        os.environ["PAELLADOC_DB_PATH"] = original_db_path
    if original_env is not None:
        os.environ["PAELLADOC_ENV"] = original_env


@pytest.fixture
def temp_db_path():
    """Create a temporary database path."""
    test_db_name = f"test_alembic_{uuid.uuid4()}.db"
    # Use a simpler temp directory structure to avoid potential permission issues
    test_dir = Path("/tmp") / "paelladoc_test_dbs"
    test_db_path = test_dir / test_db_name
    test_db_path.parent.mkdir(parents=True, exist_ok=True)
    yield test_db_path
    # Cleanup
    try:
        if test_db_path.exists():
            # No need for asyncio.sleep here as subprocess runs separately
            os.remove(test_db_path)
        if test_dir.exists() and not any(test_dir.iterdir()):
            test_dir.rmdir()
    except Exception as e:
        print(f"Error during cleanup: {e}")


def run_alembic_command(command: list, env: dict):
    """Helper function to run alembic CLI commands via subprocess."""
    # Ensure alembic is callable, adjust path if needed (e.g., use .venv/bin/alembic)
    alembic_executable = PROJECT_ROOT / ".venv" / "bin" / "alembic"
    if not alembic_executable.exists():
        # Fallback or error if venv structure is different
        pytest.fail(f"Alembic executable not found at {alembic_executable}")

    cmd = [str(alembic_executable)] + command
    print(f"\nRunning subprocess: {' '.join(cmd)}")
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        env={**os.environ, **env},  # Merge OS env with test-specific env
        cwd=PROJECT_ROOT,  # Run from project root where alembic.ini is
        check=False,  # Don't raise exception on non-zero exit, check manually
    )
    print(f"Subprocess stdout:\n{result.stdout}")
    print(f"Subprocess stderr:\n{result.stderr}")
    if result.returncode != 0:
        pytest.fail(
            f"Alembic command {' '.join(command)} failed with exit code {result.returncode}\nStderr: {result.stderr}"
        )
    return result


def test_alembic_config_uses_db_path_via_env(clean_env, temp_db_path):
    """Test that env.py logic picks up PAELLADOC_DB_PATH."""
    os.environ["PAELLADOC_DB_PATH"] = str(temp_db_path)
    # Verify that get_db_path() returns the expected path,
    # as this is what env.py uses to construct the URL.
    resolved_path = get_db_path()
    assert resolved_path == temp_db_path


@pytest.mark.asyncio
async def test_alembic_migrations_work_with_config(clean_env, temp_db_path):
    """Test that migrations work by running alembic upgrade via subprocess."""
    test_env = {"PAELLADOC_DB_PATH": str(temp_db_path)}

    # Ensure the temporary database file exists before running Alembic
    if not temp_db_path.exists():
        temp_db_path.touch()

    # Run alembic upgrade head in a subprocess
    run_alembic_command(["upgrade", "head"], env=test_env)

    # Verify migrations applied using an async engine
    # Need the actual URL alembic used (which comes from env var)
    db_url = f"sqlite+aiosqlite:///{temp_db_path}"
    engine = create_async_engine(db_url)
    try:
        async with engine.connect() as conn:
            # Define a sync function to get revision
            def get_rev_sync(sync_conn):
                # Need alembic config to find script directory
                cfg = Config("alembic.ini")  # Load config to get script location
                migration_context = MigrationContext.configure(
                    connection=sync_conn,
                    opts={"script": ScriptDirectory.from_config(cfg)},
                )
                return migration_context.get_current_revision()

            # Run the sync function using run_sync
            current_rev = await conn.run_sync(get_rev_sync)

            # Get head revision directly from script directory
            cfg = Config("alembic.ini")
            script = ScriptDirectory.from_config(cfg)
            head_rev = script.get_current_head()

            assert current_rev is not None, "DB revision is None after upgrade."
            assert current_rev == head_rev, (
                f"DB revision {current_rev} does not match head {head_rev}"
            )
    finally:
        await engine.dispose()


@pytest.mark.asyncio
async def test_alembic_downgrade_works_with_config(clean_env, temp_db_path):
    """Test that downgrades work by running alembic via subprocess."""
    test_env = {"PAELLADOC_DB_PATH": str(temp_db_path)}

    # Ensure the temporary database file exists before running Alembic
    if not temp_db_path.exists():
        temp_db_path.touch()

    # Run migrations up first
    run_alembic_command(["upgrade", "head"], env=test_env)

    # Run migrations down
    run_alembic_command(["downgrade", "base"], env=test_env)

    # Verify database is at base (no revision)
    db_url = f"sqlite+aiosqlite:///{temp_db_path}"
    engine = create_async_engine(db_url)
    try:
        async with engine.connect() as conn:
            # Define a sync function to get revision
            def get_rev_sync(sync_conn):
                cfg = Config("alembic.ini")  # Load config to get script location
                migration_context = MigrationContext.configure(
                    connection=sync_conn,
                    opts={"script": ScriptDirectory.from_config(cfg)},
                )
                return migration_context.get_current_revision()

            # Run the sync function using run_sync
            current_rev = await conn.run_sync(get_rev_sync)
            assert current_rev is None, (
                f"Expected base revision (None), got {current_rev}"
            )
    finally:
        await engine.dispose()


def test_alembic_respects_environment_precedence(clean_env, temp_db_path):
    """Test that PAELLADOC_DB_PATH takes precedence over PAELLADOC_ENV."""
    # Set both environment variables
    os.environ["PAELLADOC_DB_PATH"] = str(temp_db_path)
    os.environ["PAELLADOC_ENV"] = "development"  # This should be ignored

    # Verify that get_db_path() returns the path from PAELLADOC_DB_PATH
    resolved_path = get_db_path()
    assert resolved_path == temp_db_path
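Taken together, these tests pin down the resolution order for the database path. A sketch of that order follows; it is an illustration of the behavior the tests assert, not the actual implementation in paelladoc.config.database.

# Sketch of the path-resolution order exercised by the tests above.
import os
from pathlib import Path


def resolve_db_path() -> Path:
    # 1. An explicit PAELLADOC_DB_PATH override always wins
    #    (see test_alembic_respects_environment_precedence).
    env_path = os.environ.get("PAELLADOC_DB_PATH")
    if env_path:
        return Path(env_path)
    # 2. PAELLADOC_ENV may select an environment-specific database, but it
    #    is ignored whenever PAELLADOC_DB_PATH is set.
    # 3. Otherwise fall back to the default production location used
    #    elsewhere in this suite.
    return Path.home() / ".paelladoc" / "memory.db"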
""" import pytest import asyncio import sys import os from pathlib import Path import uuid # Ensure we can import Paelladoc modules project_root = Path(__file__).parent.parent.parent.parent.parent.parent.absolute() sys.path.insert(0, str(project_root)) from paelladoc.domain.models.language import SupportedLanguage from paelladoc.adapters.plugins.core.paella import ( paella_init, paella_list, paella_select, ) from paelladoc.domain.models.project import ( ProjectInfo, # Import Metadata and rename ) # Adapter for verification from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter # --- Pytest Fixture for Temporary DB --- # @pytest.fixture(scope="function") async def memory_adapter(): """Provides an initialized SQLiteMemoryAdapter with a temporary DB.""" test_db_name = f"test_paella_{uuid.uuid4()}.db" test_dir = Path(__file__).parent / "temp_dbs" test_db_path = test_dir / test_db_name test_db_path.parent.mkdir(parents=True, exist_ok=True) print(f"\nSetting up test with DB: {test_db_path}") adapter = SQLiteMemoryAdapter(db_path=test_db_path) await adapter._create_db_and_tables() yield adapter print(f"Tearing down test, removing DB: {test_db_path}") await asyncio.sleep(0.01) # Brief pause for file lock release try: if test_db_path.exists(): os.remove(test_db_path) print(f"Removed DB: {test_db_path}") try: test_db_path.parent.rmdir() print(f"Removed test directory: {test_db_path.parent}") except OSError: pass # Directory not empty, likely other tests running concurrently except Exception as e: print(f"Error during teardown removing {test_db_path}: {e}") # --- Test Cases --- # @pytest.mark.asyncio async def test_create_new_project_asks_for_base_path_and_saves_it( memory_adapter, monkeypatch, ): """ Verify the interactive flow for creating a new project: 1. Asks for interaction language. 2. Lists projects (if any) and asks action (create new). 3. Asks for documentation language. 4. Asks for new project name (checks for existence). 5. Asks for base path. 6. Creates the project, saves absolute base path, saves initial memory. """ print("\nRunning: test_create_new_project_asks_for_base_path_and_saves_it") interaction_lang = SupportedLanguage.EN_US.value doc_lang = SupportedLanguage.EN_US.value project_name = f"test-project-{uuid.uuid4()}" base_path_input = "./test_paella_docs" # Relative path input expected_abs_base_path = Path(base_path_input).resolve() # --- Monkeypatch the database path resolution --- # Patch get_db_path where SQLiteMemoryAdapter imports it, # so core_paella uses the temporary DB path when it creates its own adapter. 
monkeypatch.setattr( "paelladoc.adapters.output.sqlite.sqlite_memory_adapter.get_db_path", lambda: memory_adapter.db_path, # Return the path from the fixture ) # Initialize project init_result = await paella_init( base_path=base_path_input, documentation_language=doc_lang, interaction_language=interaction_lang, new_project_name=project_name, ) assert init_result["status"] == "ok" assert init_result["project_name"] == project_name assert init_result["base_path"] == str(expected_abs_base_path) # Clean up if expected_abs_base_path.exists(): import shutil shutil.rmtree(expected_abs_base_path) @pytest.mark.asyncio async def test_paella_workflow(): """Test the complete PAELLA workflow.""" # Test data project_name = f"test_project_{uuid.uuid4().hex[:8]}" base_path = f"docs/{project_name}" doc_language = SupportedLanguage.EN_US.value int_language = SupportedLanguage.EN_US.value # Initialize project init_result = await paella_init( base_path=base_path, documentation_language=doc_language, interaction_language=int_language, new_project_name=project_name, ) assert init_result["status"] == "ok" assert init_result["project_name"] == project_name assert init_result["base_path"] == str(Path(base_path).expanduser().resolve()) # List projects list_result = await paella_list() assert list_result["status"] == "ok" assert isinstance(list_result["projects"], list) # Extract names from ProjectInfo objects before checking membership project_names_list = [ info.name for info in list_result["projects"] if isinstance(info, ProjectInfo) ] assert project_name in project_names_list # Select project select_result = await paella_select(project_name=project_name) assert select_result["status"] == "ok" assert select_result["project_name"] == project_name # Clean up project_dir = Path(base_path) if project_dir.exists(): import shutil shutil.rmtree(project_dir) ================ File: tests/integration/adapters/plugins/core/test_list_projects.py ================ """ Integration tests for the core.list_projects plugin. 
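Outside pytest, the same init/list/select flow can be driven with a small script. The sketch below reuses the call signatures exercised above; the project name and base path are hypothetical.

# Sketch of driving the PAELLA workflow directly.
import asyncio

from paelladoc.domain.models.language import SupportedLanguage
from paelladoc.adapters.plugins.core.paella import (
    paella_init,
    paella_list,
    paella_select,
)


async def main() -> None:
    lang = SupportedLanguage.EN_US.value
    init_result = await paella_init(
        base_path="docs/demo-project",  # hypothetical path
        documentation_language=lang,
        interaction_language=lang,
        new_project_name="demo-project",  # hypothetical name
    )
    print(init_result["status"], init_result["base_path"])

    listing = await paella_list()
    print([info.name for info in listing["projects"]])

    selected = await paella_select(project_name="demo-project")
    print(selected["status"])


asyncio.run(main())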
""" import pytest import asyncio import sys import os from pathlib import Path import uuid # Ensure we can import Paelladoc modules project_root = Path(__file__).parent.parent.parent.parent.parent.parent.absolute() sys.path.insert(0, str(project_root)) # Adapter is needed to pre-populate the DB for the test from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter # Import domain models to create test data from paelladoc.domain.models.project import ( ProjectMemory, ProjectInfo, Bucket, ArtifactMeta, ) from paelladoc.domain.models.language import SupportedLanguage # --- Helper Function to create test data --- # def _create_sample_memory(name_suffix: str) -> ProjectMemory: """Helper to create a sample ProjectMemory object.""" project_name = f"test-project-{name_suffix}-{uuid.uuid4()}" # Add a dummy artifact to make it valid artifact = ArtifactMeta( name="dummy.md", bucket=Bucket.UNKNOWN, path=Path("dummy.md") ) memory = ProjectMemory( project_info=ProjectInfo( name=project_name, interaction_language=SupportedLanguage.EN_US, documentation_language=SupportedLanguage.EN_US, base_path=Path(f"./docs/{project_name}").resolve(), purpose="testing list projects", target_audience="devs", objectives=["test list"], ), artifacts={Bucket.UNKNOWN: [artifact]}, taxonomy_version="0.5", ) return memory # --- Pytest Fixture for Temporary DB (copied from test_paella) --- # @pytest.fixture(scope="function") async def memory_adapter(): """Provides an initialized SQLiteMemoryAdapter with a temporary DB.""" test_db_name = f"test_list_projects_{uuid.uuid4()}.db" test_dir = Path(__file__).parent / "temp_dbs_list" test_db_path = test_dir / test_db_name test_db_path.parent.mkdir(parents=True, exist_ok=True) print(f"\nSetting up test with DB: {test_db_path}") adapter = SQLiteMemoryAdapter(db_path=test_db_path) await adapter._create_db_and_tables() yield adapter # Provide the adapter to the test function # Teardown print(f"Tearing down test, removing DB: {test_db_path}") await asyncio.sleep(0.01) try: if test_db_path.exists(): os.remove(test_db_path) print(f"Removed DB: {test_db_path}") try: test_db_path.parent.rmdir() print(f"Removed test directory: {test_db_path.parent}") except OSError: pass except Exception as e: print(f"Error during teardown removing {test_db_path}: {e}") # --- Test Case --- # @pytest.mark.asyncio async def test_list_projects_returns_saved_projects( memory_adapter: SQLiteMemoryAdapter, ): """ Verify that core.list_projects correctly lists projects previously saved. THIS TEST WILL FAIL until the tool and adapter method are implemented. 
""" print("\nRunning: test_list_projects_returns_saved_projects") # Arrange: Save some projects directly using the adapter project1_memory = _create_sample_memory("list1") project2_memory = _create_sample_memory("list2") await memory_adapter.save_memory(project1_memory) await memory_adapter.save_memory(project2_memory) expected_project_names = sorted( [project1_memory.project_info.name, project2_memory.project_info.name] ) print(f"Saved projects: {expected_project_names}") # Act: Call the tool function with our test db_path from paelladoc.adapters.plugins.core.list_projects import list_projects # Pass the path to our temporary test database db_path_str = str(memory_adapter.db_path) print(f"Using test DB path: {db_path_str}") result = await list_projects(db_path=db_path_str) # Assert: Check the response assert result["status"] == "ok", f"Expected status ok, got {result.get('status')}" assert "projects" in result, "Response missing 'projects' key" assert isinstance(result["projects"], list), "'projects' should be a list" # Extract names from the ProjectInfo objects returned by the plugin returned_project_names = sorted( [info.name for info in result["projects"] if isinstance(info, ProjectInfo)] ) # Compare the sorted list of names assert returned_project_names == expected_project_names, ( f"Expected project names {expected_project_names}, but got {returned_project_names}" ) ================ File: tests/integration/adapters/output/test_sqlite_memory_adapter_config.py ================ """Integration tests for SQLite adapter configuration.""" import os import pytest import asyncio from pathlib import Path import uuid from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter from paelladoc.domain.models.project import ( ProjectMemory, ProjectInfo, ) @pytest.fixture def clean_env(): """Remove relevant environment variables before each test.""" original_db_path = os.environ.get("PAELLADOC_DB_PATH") original_env = os.environ.get("PAELLADOC_ENV") if "PAELLADOC_DB_PATH" in os.environ: del os.environ["PAELLADOC_DB_PATH"] if "PAELLADOC_ENV" in os.environ: del os.environ["PAELLADOC_ENV"] yield if original_db_path is not None: os.environ["PAELLADOC_DB_PATH"] = original_db_path if original_env is not None: os.environ["PAELLADOC_ENV"] = original_env @pytest.fixture async def temp_adapter(): """Create a temporary adapter with a unique database.""" test_db_name = f"test_config_{uuid.uuid4()}.db" test_dir = Path(__file__).parent / "temp_dbs" test_db_path = test_dir / test_db_name test_db_path.parent.mkdir(parents=True, exist_ok=True) adapter = SQLiteMemoryAdapter(db_path=test_db_path) await adapter._create_db_and_tables() yield adapter # Cleanup await asyncio.sleep(0.01) # Brief pause for file lock release try: if test_db_path.exists(): os.remove(test_db_path) test_db_path.parent.rmdir() except Exception as e: print(f"Error during cleanup: {e}") @pytest.mark.asyncio async def test_adapter_uses_custom_path(clean_env): """Verify adapter uses the path provided in __init__.""" custom_path = create_temp_db_path() adapter = SQLiteMemoryAdapter(db_path=custom_path) assert adapter.db_path == custom_path # Clean up the test file if it was created if custom_path.exists(): os.remove(custom_path) @pytest.mark.asyncio async def test_adapter_uses_env_var_path(clean_env): """Verify adapter uses PAELLADOC_DB_PATH environment variable if set.""" env_path = create_temp_db_path() os.environ["PAELLADOC_DB_PATH"] = str(env_path) adapter = SQLiteMemoryAdapter() # No path given, should use env var 
================
File: tests/integration/adapters/output/test_sqlite_memory_adapter_config.py
================
"""Integration tests for SQLite adapter configuration."""

import os
import pytest
import asyncio
from pathlib import Path
import uuid

from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter
from paelladoc.domain.models.project import (
    ProjectMemory,
    ProjectInfo,
)


@pytest.fixture
def clean_env():
    """Remove relevant environment variables before each test."""
    original_db_path = os.environ.get("PAELLADOC_DB_PATH")
    original_env = os.environ.get("PAELLADOC_ENV")
    if "PAELLADOC_DB_PATH" in os.environ:
        del os.environ["PAELLADOC_DB_PATH"]
    if "PAELLADOC_ENV" in os.environ:
        del os.environ["PAELLADOC_ENV"]
    yield
    if original_db_path is not None:
        os.environ["PAELLADOC_DB_PATH"] = original_db_path
    if original_env is not None:
        os.environ["PAELLADOC_ENV"] = original_env


@pytest.fixture
async def temp_adapter():
    """Create a temporary adapter with a unique database."""
    test_db_name = f"test_config_{uuid.uuid4()}.db"
    test_dir = Path(__file__).parent / "temp_dbs"
    test_db_path = test_dir / test_db_name
    test_db_path.parent.mkdir(parents=True, exist_ok=True)

    adapter = SQLiteMemoryAdapter(db_path=test_db_path)
    await adapter._create_db_and_tables()

    yield adapter

    # Cleanup
    await asyncio.sleep(0.01)  # Brief pause for file lock release
    try:
        if test_db_path.exists():
            os.remove(test_db_path)
        test_db_path.parent.rmdir()
    except Exception as e:
        print(f"Error during cleanup: {e}")


@pytest.mark.asyncio
async def test_adapter_uses_custom_path(clean_env):
    """Verify adapter uses the path provided in __init__."""
    custom_path = create_temp_db_path()
    adapter = SQLiteMemoryAdapter(db_path=custom_path)
    assert adapter.db_path == custom_path
    # Clean up the test file if it was created
    if custom_path.exists():
        os.remove(custom_path)


@pytest.mark.asyncio
async def test_adapter_uses_env_var_path(clean_env):
    """Verify adapter uses PAELLADOC_DB_PATH environment variable if set."""
    env_path = create_temp_db_path()
    os.environ["PAELLADOC_DB_PATH"] = str(env_path)
    adapter = SQLiteMemoryAdapter()  # No path given, should use env var
    assert adapter.db_path == env_path
    if env_path.exists():
        os.remove(env_path)


@pytest.mark.asyncio
async def test_adapter_uses_production_path(clean_env):
    """Verify adapter uses PRODUCTION_DB_PATH by default."""
    # Ensure no env vars are set that override the default
    os.environ.pop("PAELLADOC_DB_PATH", None)
    os.environ.pop("PAELLADOC_ENV", None)
    adapter = SQLiteMemoryAdapter()
    expected_path = Path.home() / ".paelladoc" / "memory.db"  # Get default directly
    assert adapter.db_path == expected_path


@pytest.mark.asyncio
async def test_adapter_creates_parent_directory(clean_env):
    """Verify the adapter ensures the parent directory for the DB exists."""
    test_subdir = Path.home() / ".paelladoc_test_dir" / str(uuid.uuid4())
    custom_path = test_subdir / "test_creation.db"

    # Ensure the directory does not exist initially
    if test_subdir.exists():
        for item in test_subdir.iterdir():  # Clear if exists
            os.remove(item)
        os.rmdir(test_subdir)
    assert not test_subdir.exists()

    # The adapter instantiation triggers the directory creation
    _ = SQLiteMemoryAdapter(db_path=custom_path)  # Assign to _ as intentionally unused

    # Initialization should create the parent directory
    assert test_subdir.exists()
    assert test_subdir.is_dir()

    # Clean up
    if custom_path.exists():
        os.remove(custom_path)
    if test_subdir.exists():
        os.rmdir(test_subdir)


@pytest.mark.asyncio
async def test_adapter_operations_with_custom_path(temp_adapter):
    """Test basic adapter operations with custom path."""
    # Create test project
    project = ProjectMemory(
        project_info=ProjectInfo(
            name=f"test-project-{uuid.uuid4()}",
            language="python",
            purpose="Test project",
            target_audience="Developers",
            objectives=["Test database configuration"],
        )
    )

    # Test operations
    await temp_adapter.save_memory(project)
    assert await temp_adapter.project_exists(project.project_info.name)

    loaded = await temp_adapter.load_memory(project.project_info.name)
    assert loaded is not None
    assert loaded.project_info.name == project.project_info.name

    projects_info = await temp_adapter.list_projects()
    # Extract names from the returned ProjectInfo objects
    project_names = [info.name for info in projects_info]
    assert project.project_info.name in project_names


# Helper function to create a unique temporary DB path
def create_temp_db_path(prefix="test_adapter_config") -> Path:
    test_db_name = f"{prefix}_{uuid.uuid4()}.db"
    # Use /tmp or a similar temporary directory standard across systems
    test_db_path = Path("/tmp") / test_db_name
    # test_db_path.parent.mkdir(parents=True, exist_ok=True)  # /tmp should exist
    print(f"\nGenerated temporary DB path: {test_db_path}")
    return test_db_path
""" import pytest # Use pytest import asyncio import sys import os from pathlib import Path import uuid import datetime from typing import Dict, List # Ensure we can import Paelladoc modules project_root = Path(__file__).parent.parent.parent.parent.parent.absolute() sys.path.insert(0, str(project_root)) # Module to test from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter # Import updated domain models from paelladoc.domain.models.project import ( ProjectMemory, ProjectInfo, ArtifactMeta, DocumentStatus, Bucket, ) # --- Pytest Fixture for Temporary DB --- # @pytest.fixture(scope="function") # Recreate DB for each test function async def memory_adapter(): """Provides an initialized SQLiteMemoryAdapter with a temporary DB.""" test_db_name = f"test_memory_{uuid.uuid4()}.db" test_db_path = Path("./temp_test_dbs") / test_db_name test_db_path.parent.mkdir(parents=True, exist_ok=True) print(f"\nSetting up test with DB: {test_db_path}") adapter = SQLiteMemoryAdapter(db_path=test_db_path) await adapter._create_db_and_tables() yield adapter # Provide the adapter to the test function # Teardown: clean up the database print(f"Tearing down test, removing DB: {test_db_path}") # Dispose engine if needed # await adapter.async_engine.dispose() await asyncio.sleep(0.01) try: if test_db_path.exists(): os.remove(test_db_path) print(f"Removed DB: {test_db_path}") try: test_db_path.parent.rmdir() print(f"Removed test directory: {test_db_path.parent}") except OSError: pass # Directory not empty or other issue except Exception as e: print(f"Error during teardown removing {test_db_path}: {e}") # --- Helper Function --- # def _create_sample_memory(name_suffix: str) -> ProjectMemory: """Helper to create a sample ProjectMemory object with Artifacts.""" project_name = f"test-project-{name_suffix}" # Create sample artifacts artifact1 = ArtifactMeta( name="README", bucket=Bucket.INITIATE_INITIAL_PRODUCT_DOCS, path=Path("README.md"), status=DocumentStatus.PENDING, ) artifact2 = ArtifactMeta( name="main.py generation script", bucket=Bucket.GENERATE_SUPPORTING_ELEMENTS, path=Path("scripts/generate_main.py"), status=DocumentStatus.IN_PROGRESS, ) artifacts: Dict[Bucket, List[ArtifactMeta]] = { Bucket.INITIATE_INITIAL_PRODUCT_DOCS: [artifact1], Bucket.GENERATE_SUPPORTING_ELEMENTS: [artifact2], } memory = ProjectMemory( project_info=ProjectInfo( name=project_name, language="python", purpose="testing adapter v2", target_audience="devs", objectives=["test save artifacts", "test load artifacts"], ), artifacts=artifacts, taxonomy_version="0.5", ) return memory # --- Test Cases (using pytest and pytest-asyncio) --- # @pytest.mark.asyncio async def test_project_exists_on_empty_db(memory_adapter: SQLiteMemoryAdapter): """Test project_exists returns False when the DB is empty/project not saved.""" print("Running: test_project_exists_on_empty_db") exists = await memory_adapter.project_exists("nonexistent-project") assert not exists @pytest.mark.asyncio async def test_load_memory_on_empty_db(memory_adapter: SQLiteMemoryAdapter): """Test load_memory returns None when the DB is empty/project not saved.""" print("Running: test_load_memory_on_empty_db") loaded_memory = await memory_adapter.load_memory("nonexistent-project") assert loaded_memory is None @pytest.mark.asyncio async def test_save_and_load_new_project(memory_adapter: SQLiteMemoryAdapter): """Test saving a new project with artifacts and loading it back.""" print("Running: test_save_and_load_new_project") original_memory = 
_create_sample_memory("save-load-artifacts") project_name = original_memory.project_info.name original_artifacts = original_memory.artifacts artifact1_id = original_artifacts[Bucket.INITIATE_INITIAL_PRODUCT_DOCS][0].id artifact2_id = original_artifacts[Bucket.GENERATE_SUPPORTING_ELEMENTS][0].id # Save await memory_adapter.save_memory(original_memory) print(f"Saved project: {project_name}") # Load loaded_memory = await memory_adapter.load_memory(project_name) print(f"Loaded project: {project_name}") # Assertions assert loaded_memory is not None assert loaded_memory.project_info.name == original_memory.project_info.name assert loaded_memory.project_info.language == original_memory.project_info.language assert ( loaded_memory.project_info.objectives == original_memory.project_info.objectives ) assert loaded_memory.taxonomy_version == original_memory.taxonomy_version # Check artifacts dictionary structure # Note: If the adapter pads with empty buckets, adjust this check # For now, assume only buckets with artifacts are loaded assert Bucket.INITIATE_INITIAL_PRODUCT_DOCS in loaded_memory.artifacts assert Bucket.GENERATE_SUPPORTING_ELEMENTS in loaded_memory.artifacts assert len(loaded_memory.artifacts[Bucket.INITIATE_INITIAL_PRODUCT_DOCS]) == 1 assert len(loaded_memory.artifacts[Bucket.GENERATE_SUPPORTING_ELEMENTS]) == 1 # assert len(loaded_memory.artifacts[Bucket.DEPLOY_SECURITY]) == 0 # Check depends on adapter behavior # Check artifact details loaded_artifact1 = loaded_memory.get_artifact_by_path(Path("README.md")) assert loaded_artifact1 is not None assert loaded_artifact1.id == artifact1_id assert loaded_artifact1.name == "README" assert loaded_artifact1.bucket == Bucket.INITIATE_INITIAL_PRODUCT_DOCS assert loaded_artifact1.status == DocumentStatus.PENDING loaded_artifact2 = loaded_memory.get_artifact_by_path( Path("scripts/generate_main.py") ) assert loaded_artifact2 is not None assert loaded_artifact2.id == artifact2_id assert loaded_artifact2.name == "main.py generation script" assert loaded_artifact2.bucket == Bucket.GENERATE_SUPPORTING_ELEMENTS assert loaded_artifact2.status == DocumentStatus.IN_PROGRESS # Check timestamps - don't compare exact values since they'll be different due to persistence/mocking # Just verify that created_at is a valid UTC timestamp assert loaded_memory.created_at.tzinfo == datetime.timezone.utc assert isinstance(loaded_memory.created_at, datetime.datetime) assert isinstance(loaded_memory.last_updated_at, datetime.datetime) # Verify the loaded timestamps are in a reasonable range # Current time should be >= last_updated_at assert datetime.datetime.now(datetime.timezone.utc) >= loaded_memory.last_updated_at @pytest.mark.asyncio async def test_project_exists_after_save(memory_adapter: SQLiteMemoryAdapter): """Test project_exists returns True after a project is saved.""" print("Running: test_project_exists_after_save") memory_to_save = _create_sample_memory("exists-artifacts") project_name = memory_to_save.project_info.name await memory_adapter.save_memory(memory_to_save) print(f"Saved project: {project_name}") exists = await memory_adapter.project_exists(project_name) assert exists @pytest.mark.asyncio async def test_save_updates_project(memory_adapter: SQLiteMemoryAdapter): """Test saving updates: changing artifact status, adding, removing.""" print("Running: test_save_updates_project") # 1. 
Create and save initial state memory = _create_sample_memory("update-artifacts") project_name = memory.project_info.name artifact1 = memory.artifacts[Bucket.INITIATE_INITIAL_PRODUCT_DOCS][0] # artifact2 = memory.artifacts[Bucket.GENERATE_SUPPORTING_ELEMENTS][0] # No need to store if removing await memory_adapter.save_memory(memory) print(f"Initial save for {project_name}") # 2. Modify the domain object artifact1.update_status(DocumentStatus.COMPLETED) artifact3 = ArtifactMeta( name="Deployment Script", bucket=Bucket.DEPLOY_PIPELINES_AND_AUTOMATION, path=Path("deploy.sh"), ) # Add artifact3 - ensure bucket exists in dict first if artifact3.bucket not in memory.artifacts: memory.artifacts[artifact3.bucket] = [] memory.artifacts[artifact3.bucket].append(artifact3) # Remove artifact2 - remove the list if it becomes empty del memory.artifacts[Bucket.GENERATE_SUPPORTING_ELEMENTS][0] if not memory.artifacts[Bucket.GENERATE_SUPPORTING_ELEMENTS]: del memory.artifacts[Bucket.GENERATE_SUPPORTING_ELEMENTS] # 3. Save the updated memory await memory_adapter.save_memory(memory) print(f"Saved updates for {project_name}") # 4. Load and verify loaded_memory = await memory_adapter.load_memory(project_name) assert loaded_memory is not None # Verify artifact1 status updated loaded_artifact1 = loaded_memory.get_artifact_by_path(Path("README.md")) assert loaded_artifact1 is not None assert loaded_artifact1.status == DocumentStatus.COMPLETED assert loaded_artifact1.id == artifact1.id # Verify artifact2 removed loaded_artifact2 = loaded_memory.get_artifact_by_path( Path("scripts/generate_main.py") ) assert loaded_artifact2 is None assert not loaded_memory.artifacts.get(Bucket.GENERATE_SUPPORTING_ELEMENTS) # Verify artifact3 added loaded_artifact3 = loaded_memory.get_artifact_by_path(Path("deploy.sh")) assert loaded_artifact3 is not None assert loaded_artifact3.name == "Deployment Script" assert loaded_artifact3.bucket == Bucket.DEPLOY_PIPELINES_AND_AUTOMATION assert loaded_artifact3.status == DocumentStatus.PENDING assert loaded_artifact3.id == artifact3.id # Run tests if executed directly (optional, better via test runner) # if __name__ == "__main__": # # Consider using asyncio.run() if needed for top-level execution # unittest.main() ================ File: tests/integration/adapters/output/test_chroma_vector_store_adapter.py ================ """ Integration tests for the ChromaVectorStoreAdapter. 
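Condensed, the save/load round trip these adapter tests verify looks like the sketch below. The database path and project name are illustrative; the constructor fields mirror test_adapter_operations_with_custom_path above.

# Sketch of the save/load round trip, outside pytest.
import asyncio
from pathlib import Path

from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter
from paelladoc.domain.models.project import ProjectMemory, ProjectInfo


async def round_trip() -> None:
    adapter = SQLiteMemoryAdapter(db_path=Path("/tmp/paelladoc_demo.db"))  # illustrative path
    await adapter._create_db_and_tables()
    memory = ProjectMemory(
        project_info=ProjectInfo(
            name="demo-project",  # hypothetical project name
            language="python",
            purpose="demo",
            target_audience="devs",
            objectives=["demo save/load"],
        )
    )
    await adapter.save_memory(memory)
    loaded = await adapter.load_memory("demo-project")
    assert loaded is not None and loaded.project_info.name == "demo-project"


asyncio.run(round_trip())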
""" import unittest import asyncio import sys from pathlib import Path import uuid # Ensure we can import Paelladoc modules project_root = Path(__file__).parent.parent.parent.parent.parent.absolute() sys.path.insert(0, str(project_root)) # Module to test from paelladoc.adapters.output.chroma.chroma_vector_store_adapter import ( ChromaVectorStoreAdapter, NotFoundError, ) from paelladoc.ports.output.vector_store_port import SearchResult # Import base class # Import Chroma specific types for assertions if needed from chromadb.api.models.Collection import Collection class TestChromaVectorStoreAdapterIntegration(unittest.IsolatedAsyncioTestCase): """Integration tests using an in-memory ChromaDB client.""" def setUp(self): """Set up an in-memory Chroma client and a unique collection name.""" print("\nSetting up test...") self.adapter = ChromaVectorStoreAdapter(in_memory=True) # Generate a unique collection name for each test to ensure isolation self.collection_name = f"test_collection_{uuid.uuid4()}" print(f"Using collection name: {self.collection_name}") async def asyncTearDown(self): """Attempt to clean up the test collection.""" print( f"Tearing down test, attempting to delete collection: {self.collection_name}" ) try: # Use the adapter's method to delete await self.adapter.delete_collection(self.collection_name) print(f"Deleted collection: {self.collection_name}") except Exception as e: # Log error if deletion fails, but don't fail the test run print( f"Error during teardown deleting collection {self.collection_name}: {e}" ) # We can also try listing collections to see if it exists try: collections = self.adapter.client.list_collections() collection_names = [col.name for col in collections] if self.collection_name in collection_names: print( f"Collection {self.collection_name} still exists after teardown attempt." ) else: print( f"Collection {self.collection_name} confirmed deleted or never existed." 
) except Exception as list_e: print(f"Error listing collections during teardown check: {list_e}") # --- Test Cases --- # async def test_get_or_create_collection_creates_new(self): """Test that a new collection is created if it doesn't exist.""" print(f"Running: {self._testMethodName}") collection = await self.adapter.get_or_create_collection(self.collection_name) self.assertIsInstance(collection, Collection) self.assertEqual(collection.name, self.collection_name) # Verify it exists in the client collections = self.adapter.client.list_collections() collection_names = [col.name for col in collections] self.assertIn(self.collection_name, collection_names) async def test_get_or_create_collection_retrieves_existing(self): """Test that an existing collection is retrieved.""" print(f"Running: {self._testMethodName}") # Create it first collection1 = await self.adapter.get_or_create_collection(self.collection_name) self.assertIsNotNone(collection1) # Get it again collection2 = await self.adapter.get_or_create_collection(self.collection_name) self.assertIsInstance(collection2, Collection) self.assertEqual(collection2.name, self.collection_name) # Check they are likely the same underlying collection (same ID) self.assertEqual(collection1.id, collection2.id) async def test_add_documents(self): """Test adding documents to a collection.""" print(f"Running: {self._testMethodName}") docs_to_add = ["doc one text", "doc two text"] metadatas = [{"source": "test1"}, {"source": "test2"}] ids = ["id1", "id2"] returned_ids = await self.adapter.add_documents( self.collection_name, docs_to_add, metadatas, ids ) self.assertEqual(returned_ids, ids) # Verify documents were added using the underlying client API collection = await self.adapter.get_or_create_collection(self.collection_name) results = collection.get(ids=ids, include=["metadatas", "documents"]) self.assertIsNotNone(results) self.assertListEqual(results["ids"], ids) self.assertListEqual(results["documents"], docs_to_add) self.assertListEqual(results["metadatas"], metadatas) self.assertEqual(collection.count(), 2) async def test_add_documents_without_ids(self): """Test adding documents letting Chroma generate IDs.""" print(f"Running: {self._testMethodName}") docs_to_add = ["auto id doc 1", "auto id doc 2"] metadatas = [{"type": "auto"}, {"type": "auto"}] returned_ids = await self.adapter.add_documents( self.collection_name, docs_to_add, metadatas ) self.assertEqual(len(returned_ids), 2) self.assertIsInstance(returned_ids[0], str) self.assertIsInstance(returned_ids[1], str) # Verify using the returned IDs collection = await self.adapter.get_or_create_collection(self.collection_name) results = collection.get(ids=returned_ids, include=["metadatas", "documents"]) self.assertIsNotNone(results) self.assertCountEqual( results["ids"], returned_ids ) # Order might not be guaranteed? 
self.assertCountEqual(results["documents"], docs_to_add) self.assertCountEqual(results["metadatas"], metadatas) self.assertEqual(collection.count(), 2) async def test_delete_collection(self): """Test deleting a collection.""" print(f"Running: {self._testMethodName}") # Create it first await self.adapter.get_or_create_collection(self.collection_name) # Verify it exists collections_before = self.adapter.client.list_collections() self.assertIn(self.collection_name, [c.name for c in collections_before]) # Delete it using the adapter await self.adapter.delete_collection(self.collection_name) # Verify it's gone collections_after = self.adapter.client.list_collections() self.assertNotIn(self.collection_name, [c.name for c in collections_after]) # Attempting to get it should now raise NotFoundError or ValueError (depending on Chroma version) with self.assertRaises((NotFoundError, ValueError)): self.adapter.client.get_collection(name=self.collection_name) async def _add_sample_search_data(self): """Helper to add some consistent data for search tests.""" docs = [ "This is the first document about apples.", "This document discusses oranges and citrus.", "A third document, focusing on bananas.", "Another apple document for testing similarity.", ] metadatas = [ {"source": "doc1", "type": "fruit", "year": 2023}, {"source": "doc2", "type": "fruit", "year": 2024}, {"source": "doc3", "type": "fruit", "year": 2023}, {"source": "doc4", "type": "fruit", "year": 2024}, ] ids = ["s_id1", "s_id2", "s_id3", "s_id4"] await self.adapter.add_documents(self.collection_name, docs, metadatas, ids) print(f"Added sample search data to collection: {self.collection_name}") # Short delay to allow potential indexing if needed (though likely not for in-memory) await asyncio.sleep(0.1) async def test_search_simple(self): """Test basic similarity search.""" print(f"Running: {self._testMethodName}") await self._add_sample_search_data() query = "Tell me about apples" results = await self.adapter.search_similar( self.collection_name, [query], n_results=2 ) self.assertEqual(len(results), 1) # One list for the single query self.assertEqual(len(results[0]), 2) # Two results requested # Check the content of the results (order might vary based on embedding similarity) result_docs = [r.document for r in results[0]] self.assertIn("This is the first document about apples.", result_docs) self.assertIn("Another apple document for testing similarity.", result_docs) # Check metadata and ID are included first_result = results[0][0] self.assertIsInstance(first_result, SearchResult) self.assertIsNotNone(first_result.id) self.assertIsNotNone(first_result.metadata) self.assertIsNotNone(first_result.distance) async def test_search_with_metadata_filter(self): """Test search with a 'where' clause for metadata filtering.""" print(f"Running: {self._testMethodName}") await self._add_sample_search_data() query = "Tell me about fruit" # Filter for documents from year 2023 where_filter = {"year": 2023} results = await self.adapter.search_similar( self.collection_name, [query], n_results=3, where=where_filter ) self.assertEqual(len(results), 1) # Should only find doc1 and doc3 from year 2023 self.assertLessEqual( len(results[0]), 2 ) # Might return fewer than n_results if filter is strict # Corrected: Access metadata via r.metadata, not r.project_info returned_sources = [r.metadata.get("source") for r in results[0] if r.metadata] # We expect only doc1 and doc3 from year 2023 expected_sources = ["doc1", "doc3"] self.assertCountEqual(returned_sources, 
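The adapter calls these tests rely on compose into a short indexing-and-query flow. The sketch below reuses the method signatures exercised above; the collection name, documents, and metadata are hypothetical.

# Sketch of the add-then-search flow the Chroma adapter tests exercise.
import asyncio

from paelladoc.adapters.output.chroma.chroma_vector_store_adapter import (
    ChromaVectorStoreAdapter,
)


async def demo() -> None:
    adapter = ChromaVectorStoreAdapter(in_memory=True)
    await adapter.add_documents(
        "demo_collection",  # hypothetical collection name
        ["First document about apples.", "Second document about oranges."],
        [{"year": 2023}, {"year": 2024}],
        ["d1", "d2"],
    )
    # One result list per query; metadata filtering via the where clause.
    results = await adapter.search_similar(
        "demo_collection", ["apples"], n_results=1, where={"year": 2023}
    )
    for hit in results[0]:
        print(hit.id, hit.distance, hit.metadata)


asyncio.run(demo())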
================
File: tests/e2e/test_cursor_simulation.py
================
"""
End-to-End tests for Paelladoc MCP Server.

This simulates how Cursor would interact with the server.
"""

import unittest
import sys
from pathlib import Path

# Ensure we can import Paelladoc modules
project_root = Path(__file__).parent.parent.parent.absolute()
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Import directly from the domain layer
from paelladoc.domain.core_logic import mcp, ping


class TestCursorE2E(unittest.TestCase):
    """End-to-End tests simulating Cursor interacting with Paelladoc."""

    def test_direct_ping_call(self):
        """Test direct call to the ping function."""
        # Call the ping function directly
        result = ping()

        # Verify the result
        self.assertIsInstance(result, dict, "Ping should return a dict")
        self.assertEqual(result["status"], "ok", "Status should be 'ok'")
        self.assertEqual(result["message"], "pong", "Message should be 'pong'")

    def test_ping_with_parameter(self):
        """Test ping function with a parameter."""
        # Call ping with a test parameter
        result = ping(random_string="test-parameter")

        # Verify the result
        self.assertIsInstance(result, dict, "Ping should return a dict")
        self.assertEqual(result["status"], "ok", "Status should be 'ok'")
        self.assertEqual(result["message"], "pong", "Message should be 'pong'")

    def test_mcp_tool_registration(self):
        """Verify that the ping tool is registered with MCP."""
        # Get tools registered with MCP
        tool_manager = getattr(mcp, "_tool_manager", None)
        self.assertIsNotNone(tool_manager, "MCP should have a tool manager")

        tools = tool_manager.list_tools()

        # Check if the ping tool is registered
        tool_names = [tool.name for tool in tools]
        self.assertIn("ping", tool_names, "Ping tool should be registered with MCP")


if __name__ == "__main__":
    unittest.main()
