================================================================
RepopackPy Output File
================================================================
This file was generated by RepopackPy on: 2025-04-23T04:47:53.216367
Purpose:
--------
This file contains a packed representation of the entire repository's contents.
It is designed to be easily consumable by AI systems for analysis, code review,
or other automated processes.
File Format:
------------
The content is organized as follows:
1. This header section
2. Repository structure
3. Multiple file entries, each consisting of:
a. A separator line (================)
b. The file path (File: path/to/file)
c. Another separator line
d. The full contents of the file
e. A blank line
Usage Guidelines:
-----------------
1. This file should be treated as read-only. Any changes should be made to the
original repository files, not this packed version.
2. When processing this file, use the separators and "File:" markers to
distinguish between different files in the repository.
3. Be aware that this file may contain sensitive information. Handle it with
the same level of security as you would the original repository.
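   For example, guideline 2 can be applied with a minimal Python sketch
   (illustrative, not part of RepopackPy) that splits a packed file into
   (path, content) pairs using the separators and "File:" markers:

       import re

       def split_packed_file(text):
           """Yield (path, content) pairs from a RepopackPy packed file."""
           header = re.compile(r"^=+\nFile: (.+)\n=+\n", re.MULTILINE)
           matches = list(header.finditer(text))
           for current, nxt in zip(matches, matches[1:] + [None]):
               end = nxt.start() if nxt else len(text)
               yield current.group(1), text[current.end():end].rstrip("\n")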
Notes:
------
- Some files may have been excluded based on .gitignore rules and RepopackPy's
configuration.
- Binary files are not included in this packed representation. Please refer to
the Repository Structure section for a complete list of file paths, including
binary files.
For more information about RepopackPy, visit: https://github.com/abinthomasonline/repopack-py
================================================================
Repository Structure
================================================================
paelladoc/
adapters/
input/
__init__.py
output/
chroma/
chroma_vector_store_adapter.py
filesystem/
taxonomy_provider.py
sqlite/
db_models.py
mapper.py
models.py
sqlite_memory_adapter.py
__init__.py
persistence/
__init__.py
plugins/
code/
__init__.py
code_generation.py
generate_context.py
generate_doc.py
core/
__init__.py
continue_proj.py
help.py
list_projects.py
paella.py
verification.py
memory/
__init__.py
project_memory.py
product/
__init__.py
product_management.py
styles/
__init__.py
coding_styles.py
git_workflows.py
templates/
__init__.py
templates.py
__init__.py
services/
system_time_service.py
__init__.py
application/
services/
memory_service.py
vector_store_service.py
utils/
behavior_enforcer.py
__init__.py
config/
database.py
domain/
models/
enums.py
fix_metadata.py
language.py
project.py
services/
time_service.py
__init__.py
core_logic.py
infrastructure/
__init__.py
ports/
input/
__init__.py
mcp_port.py
mcp_server_adapter.py
output/
__init__.py
memory_port.py
taxonomy_provider.py
vector_store_port.py
__init__.py
__init__.py
tests/
e2e/
test_cursor_simulation.py
integration/
adapters/
output/
test_chroma_vector_store_adapter.py
test_sqlite_memory_adapter.py
test_sqlite_memory_adapter_config.py
plugins/
core/
test_list_projects.py
test_paella.py
test_alembic_config.py
test_server.py
unit/
application/
services/
test_memory_service.py
test_vector_store_service.py
utils/
test_behavior_enforcer.py
config/
test_database.py
domain/
models/
test_project.py
test_ping_tool.py
README.md
conftest.py
update_test_references.py
================================================================
Repository Files
================================================================
================
File: update_test_references.py
================
#!/usr/bin/env python3
import os
import re
def update_references_in_file(file_path):
with open(file_path, "r") as f:
content = f.read()
    # 1. Replace references to ProjectMetadata with ProjectInfo
content = re.sub(r"Metadata as ProjectMetadata", "ProjectInfo", content)
content = re.sub(r"ProjectMetadata", "ProjectInfo", content)
    # 2. Replace references to metadata with project_info
content = re.sub(r"\.metadata\.", ".project_info.", content)
content = re.sub(r"memory\.metadata", "memory.project_info", content)
content = re.sub(
r"original_memory\.metadata", "original_memory.project_info", content
)
content = re.sub(r"project\.metadata", "project.project_info", content)
with open(file_path, "w") as f:
f.write(content)
print(f"Actualizado: {file_path}")
def find_and_update_test_files(directory):
for root, _, files in os.walk(directory):
for file in files:
if file.endswith(".py"):
file_path = os.path.join(root, file)
update_references_in_file(file_path)
if __name__ == "__main__":
find_and_update_test_files("tests")
    # Also update the adapters
find_and_update_test_files("paelladoc/adapters")
================
File: paelladoc/config/database.py
================
"""Database configuration module."""
import os
from pathlib import Path
import json
import logging
logger = logging.getLogger(__name__)
CONFIG_FILE_NAME = "paelladoc_config.json"
def get_project_root() -> Path:
"""Get the project root directory."""
return Path(__file__).parent.parent.parent.parent
def get_config_file() -> Path:
"""Get the path to the configuration file."""
# Check multiple locations in order of precedence
possible_locations = [
Path.cwd() / CONFIG_FILE_NAME, # Current directory (development)
Path.home() / ".paelladoc" / CONFIG_FILE_NAME, # User's home directory
Path("/etc/paelladoc") / CONFIG_FILE_NAME, # System-wide configuration
]
for location in possible_locations:
if location.exists():
return location
# If no config file exists, use the default in user's home
default_location = Path.home() / ".paelladoc" / CONFIG_FILE_NAME
default_location.parent.mkdir(parents=True, exist_ok=True)
if not default_location.exists():
default_config = {
"db_path": str(Path.home() / ".paelladoc" / "memory.db"),
"environment": "production",
}
with open(default_location, "w") as f:
json.dump(default_config, f, indent=2)
return default_location
def get_db_path() -> Path:
"""
Get the database path based on multiple configuration sources.
Priority:
1. PAELLADOC_DB_PATH environment variable if set
2. Path specified in configuration file
3. Default path in user's home directory (~/.paelladoc/memory.db)
The configuration can be set during package installation with:
pip install paelladoc --install-option="--db-path=/path/to/db"
Or by editing the config file at:
- ./paelladoc_config.json (development)
- ~/.paelladoc/paelladoc_config.json (user)
- /etc/paelladoc/paelladoc_config.json (system)
"""
# 1. Check environment variable first (highest priority)
env_path = os.getenv("PAELLADOC_DB_PATH")
if env_path:
db_path = Path(env_path)
logger.info(f"Using database path from environment variable: {db_path}")
return db_path
# 2. Check configuration file
config_file = get_config_file()
try:
with open(config_file) as f:
config = json.load(f)
if "db_path" in config:
db_path = Path(config["db_path"])
logger.info(
f"Using database path from config file {config_file}: {db_path}"
)
return db_path
except Exception as e:
logger.warning(f"Error reading config file {config_file}: {e}")
# 3. Default to production path in user's home
db_path = Path.home() / ".paelladoc" / "memory.db"
db_path.parent.mkdir(parents=True, exist_ok=True)
logger.info(f"Using default database path: {db_path}")
return db_path
def set_db_path(path: str | Path) -> None:
"""
Set the database path in the configuration file.
This can be used programmatically or during package installation.
"""
config_file = get_config_file()
try:
if config_file.exists():
with open(config_file) as f:
config = json.load(f)
else:
config = {}
config["db_path"] = str(Path(path).resolve())
with open(config_file, "w") as f:
json.dump(config, f, indent=2)
logger.info(f"Updated database path in {config_file} to: {path}")
except Exception as e:
logger.error(f"Error updating database path in config file: {e}")
raise
# Default paths for reference (These might become less relevant or just informative)
# DEVELOPMENT_DB_PATH = get_project_root() / "paelladoc_memory.db"
PRODUCTION_DB_PATH = Path.home() / ".paelladoc" / "memory.db"
DEFAULT_DB_PATH = get_db_path()
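# Minimal demo sketch (not part of the original module): exercises the
# resolution order documented in get_db_path(). The temporary path is
# illustrative only.
if __name__ == "__main__":
    import tempfile

    demo_path = str(Path(tempfile.gettempdir()) / "paelladoc_demo.db")
    os.environ["PAELLADOC_DB_PATH"] = demo_path
    assert get_db_path() == Path(demo_path)  # 1. environment variable wins
    del os.environ["PAELLADOC_DB_PATH"]
    print(f"Fallback path: {get_db_path()}")  # 2./3. config file or default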
================
File: paelladoc/adapters/plugins/__init__.py
================
import pkgutil
import importlib
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
# Dynamically import all submodules (like core, code, styles, etc.)
# This ensures their __init__.py files are executed, which should in turn
# import the actual plugin files containing @mcp.tool decorators.
package_path = str(Path(__file__).parent)
package_name = __name__
logger.info(f"Dynamically loading plugins from: {package_path}")
for module_info in pkgutil.iter_modules([package_path]):
if module_info.ispkg: # Only import potential packages (directories)
sub_package_name = f"{package_name}.{module_info.name}"
try:
importlib.import_module(sub_package_name)
logger.debug(f"Successfully imported plugin package: {sub_package_name}")
except Exception as e:
logger.warning(f"Could not import plugin package {sub_package_name}: {e}")
logger.info("Finished dynamic plugin package loading.")
================
File: paelladoc/adapters/plugins/core/verification.py
================
from paelladoc.domain.core_logic import mcp, logger
from typing import Dict, Any
from paelladoc.domain.models.project import ProjectMemory
# Domain models
from paelladoc.domain.models.project import (
DocumentStatus,
Bucket,
)
# Adapter for persistence
from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter
# Adapter for taxonomy loading
from paelladoc.adapters.output.filesystem.taxonomy_provider import (
FileSystemTaxonomyProvider,
)
# Behavior configuration
BEHAVIOR_CONFIG = {
"check_mece_coverage": True,
"enforce_documentation_first": True,
"block_code_generation_if_incomplete": True,
"minimum_coverage_threshold": 0.7, # 70% minimum coverage (default, can be overridden)
"taxonomy_version_check": True,
}
# Instantiate the taxonomy provider
# TODO: Replace direct instantiation with Dependency Injection
TAXONOMY_PROVIDER = FileSystemTaxonomyProvider()
def validate_mece_structure(memory: ProjectMemory) -> dict:
"""Validates the MECE taxonomy structure of a project against available taxonomies."""
validation = {
"is_valid": True,
"missing_dimensions": [],
"invalid_combinations": [],
"warnings": [],
}
# Get available taxonomies from the provider
try:
valid_taxonomies = TAXONOMY_PROVIDER.get_available_taxonomies()
except Exception as e:
logger.error(f"Failed to load taxonomies for validation: {e}", exc_info=True)
validation["warnings"].append(
"Could not load taxonomy definitions for validation."
)
# Mark as invalid if we can't load definitions?
validation["is_valid"] = False
return validation
# Check required dimensions
if not memory.platform_taxonomy:
validation["missing_dimensions"].append("platform")
elif memory.platform_taxonomy not in valid_taxonomies.get("platform", []):
validation["invalid_combinations"].append(
f"Invalid platform taxonomy: {memory.platform_taxonomy}"
)
if not memory.domain_taxonomy:
validation["missing_dimensions"].append("domain")
elif memory.domain_taxonomy not in valid_taxonomies.get("domain", []):
validation["invalid_combinations"].append(
f"Invalid domain taxonomy: {memory.domain_taxonomy}"
)
if not memory.size_taxonomy:
validation["missing_dimensions"].append("size")
elif memory.size_taxonomy not in valid_taxonomies.get("size", []):
validation["invalid_combinations"].append(
f"Invalid size taxonomy: {memory.size_taxonomy}"
)
# Compliance is optional
if (
memory.compliance_taxonomy
and memory.compliance_taxonomy not in valid_taxonomies.get("compliance", [])
):
validation["invalid_combinations"].append(
f"Invalid compliance taxonomy: {memory.compliance_taxonomy}"
)
# Validate specific combinations
if memory.platform_taxonomy and memory.domain_taxonomy:
# Example: Mobile apps shouldn't use CMS domain
if (
memory.platform_taxonomy
in ["ios-native", "android-native", "react-native", "flutter"]
and memory.domain_taxonomy == "cms"
):
validation["warnings"].append(
"Mobile platforms rarely implement full CMS functionality"
)
# Update overall validity
validation["is_valid"] = (
not validation["missing_dimensions"] and not validation["invalid_combinations"]
)
return validation
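# Example result sketch: a project missing its size taxonomy would yield
#   {"is_valid": False, "missing_dimensions": ["size"],
#    "invalid_combinations": [], "warnings": []}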
@mcp.tool(
name="core_verification",
description="Verifies documentation coverage against the MECE taxonomy",
)
async def core_verification(project_name: str) -> dict:
"""Checks documentation against templates and project memory.
Calculates an overall quality/completion score based on MECE taxonomy coverage.
Returns an error if documentation is incomplete based on defined criteria.
Args:
project_name: The name of the project to verify
Returns:
A dictionary with verification results and coverage metrics
"""
logger.info(f"Executing core.verification for project: {project_name}")
# --- Initialize the memory adapter ---
try:
memory_adapter = SQLiteMemoryAdapter()
logger.info(
f"core.verification using DB path: {memory_adapter.db_path.resolve()}"
)
except Exception as e:
logger.error(f"Failed to instantiate SQLiteMemoryAdapter: {e}", exc_info=True)
return {
"status": "error",
"message": "Internal server error: Could not initialize memory adapter.",
}
# --- Load Project Memory ---
try:
memory = await memory_adapter.load_memory(project_name)
if not memory:
logger.warning(
f"Project '{project_name}' not found for VERIFICATION command."
)
return {
"status": "error",
"message": f"Project '{project_name}' not found. Use PAELLA command to start it.",
}
logger.info(f"Successfully loaded memory for project: {project_name}")
except Exception as e:
logger.error(f"Error loading memory for '{project_name}': {e}", exc_info=True)
return {
"status": "error",
"message": f"Failed to load project memory: {e}",
}
# Add MECE validation
mece_validation = validate_mece_structure(memory)
# Calculate coverage only if MECE structure is valid
if not mece_validation["is_valid"]:
return {
"status": "error",
"message": "Invalid MECE taxonomy structure",
"validation": mece_validation,
}
# --- Check for custom taxonomy ---
custom_taxonomy = None
relevant_buckets = set()
min_threshold = BEHAVIOR_CONFIG["minimum_coverage_threshold"]
if hasattr(memory, "custom_taxonomy") and memory.custom_taxonomy:
logger.info(f"Using custom taxonomy for project '{project_name}'")
custom_taxonomy = memory.custom_taxonomy
# Load relevant buckets from custom taxonomy
relevant_buckets = set(custom_taxonomy.get("buckets", []))
logger.info(f"Custom taxonomy has {len(relevant_buckets)} relevant buckets")
# Use custom threshold if specified
if "minimum_coverage_threshold" in custom_taxonomy:
min_threshold = custom_taxonomy["minimum_coverage_threshold"]
logger.info(f"Using custom threshold: {min_threshold}")
else:
logger.info("No custom taxonomy found, using all buckets")
# Use all buckets except system ones
relevant_buckets = {
bucket.value for bucket in Bucket if bucket != Bucket.UNKNOWN
}
# --- Calculate MECE Coverage ---
# Get completion stats for each bucket
bucket_stats: Dict[str, Dict[str, Any]] = {}
total_artifacts = 0
total_completed = 0
total_in_progress = 0
total_pending = 0
# Skip these buckets as they're more system-oriented, not documentation
system_buckets = {
Bucket.UNKNOWN,
Bucket.MAINTAIN_CORE_FUNCTIONALITY,
Bucket.GOVERN_TOOLING_SCRIPTS,
}
system_bucket_values = {b.value for b in system_buckets}
# Custom bucket weights (either from custom taxonomy or defaults)
bucket_weights = {}
# If we have custom taxonomy with bucket details and weights
if custom_taxonomy and "bucket_details" in custom_taxonomy:
for bucket_name, details in custom_taxonomy["bucket_details"].items():
if "weight" in details:
bucket_weights[bucket_name] = details["weight"]
# Default weights for important buckets if not specified in custom taxonomy
if not bucket_weights:
bucket_weights = {
Bucket.INITIATE_INITIAL_PRODUCT_DOCS.value: 1.5, # High importance
Bucket.ELABORATE_SPECIFICATION_AND_PLANNING.value: 1.3, # High importance
Bucket.GOVERN_STANDARDS_METHODOLOGIES.value: 1.2, # Medium-high importance
Bucket.GENERATE_CORE_FUNCTIONALITY.value: 1.1, # Medium-high importance
}
# Calculate stats for each bucket
for bucket in Bucket:
bucket_value = bucket.value
# Skip system buckets and buckets not in the relevant set
if (
bucket in system_buckets
or bucket_value in system_bucket_values
or (relevant_buckets and bucket_value not in relevant_buckets)
):
continue
artifacts = memory.artifacts.get(bucket, [])
if not artifacts:
# If no artifacts but bucket is relevant, track as empty bucket
if bucket_value in relevant_buckets:
bucket_stats[bucket_value] = {
"total": 0,
"completed": 0,
"in_progress": 0,
"pending": 0,
"completion_percentage": 0.0,
}
continue
bucket_total = len(artifacts)
bucket_completed = sum(
1 for a in artifacts if a.status == DocumentStatus.COMPLETED
)
bucket_in_progress = sum(
1 for a in artifacts if a.status == DocumentStatus.IN_PROGRESS
)
bucket_pending = bucket_total - bucket_completed - bucket_in_progress
# Calculate completion percentage
completion_pct = (
(bucket_completed + (bucket_in_progress * 0.5)) / bucket_total
if bucket_total > 0
else 0
)
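        # e.g. 2 completed + 1 in progress out of 4 -> (2 + 0.5) / 4 = 0.625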
# Store statistics
bucket_stats[bucket_value] = {
"total": bucket_total,
"completed": bucket_completed,
"in_progress": bucket_in_progress,
"pending": bucket_pending,
"completion_percentage": completion_pct,
}
# Update global counters
total_artifacts += bucket_total
total_completed += bucket_completed
total_in_progress += bucket_in_progress
total_pending += bucket_pending
# Add custom buckets from taxonomy that aren't standard Bucket enums
if custom_taxonomy and "buckets" in custom_taxonomy:
for bucket_name in custom_taxonomy["buckets"]:
# Skip if already processed above
if bucket_name in bucket_stats:
continue
# This is a custom bucket not in the standard Bucket enum
# For now, treat it as empty/pending
bucket_stats[bucket_name] = {
"total": 0,
"completed": 0,
"in_progress": 0,
"pending": 0,
"completion_percentage": 0.0,
"custom": True,
}
# Calculate overall weighted completion percentage
if total_artifacts > 0:
# Simple (unweighted) calculation
simple_completion_pct = (
total_completed + (total_in_progress * 0.5)
) / total_artifacts
# Weighted calculation
weighted_sum = 0
weight_sum = 0
for bucket_name, stats in bucket_stats.items():
if stats.get("total", 0) == 0:
continue
# Get weight for this bucket (default to 1.0)
bucket_weight = bucket_weights.get(bucket_name, 1.0)
weight_sum += bucket_weight
weighted_sum += stats["completion_percentage"] * bucket_weight
weighted_completion_pct = weighted_sum / weight_sum if weight_sum > 0 else 0
else:
simple_completion_pct = 0
weighted_completion_pct = 0
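    # Worked example (illustrative): bucket A fully complete with weight 1.5
    # and bucket B half complete with weight 1.0 give
    # (1.0 * 1.5 + 0.5 * 1.0) / (1.5 + 1.0) = 0.8, i.e. 80% weighted completion.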
# Determine overall status
is_complete = weighted_completion_pct >= min_threshold
# Identify buckets that need attention (< 50% complete)
needs_attention = []
for bucket, stats in bucket_stats.items():
if stats["completion_percentage"] < 0.5:
needs_attention.append(
{
"bucket": bucket,
"completion": stats["completion_percentage"],
"missing_docs": stats["pending"],
}
)
# Sort by completion percentage (lowest first)
needs_attention.sort(key=lambda x: x["completion"])
# Create verification result
result = {
"status": "ok",
"project_name": project_name,
"overall_status": "complete" if is_complete else "incomplete",
"completion_percentage": weighted_completion_pct,
"simple_completion_percentage": simple_completion_pct,
"meets_threshold": is_complete,
"threshold": min_threshold,
"total_artifacts": total_artifacts,
"total_completed": total_completed,
"total_in_progress": total_in_progress,
"total_pending": total_pending,
"bucket_stats": bucket_stats,
"needs_attention": needs_attention,
"taxonomy_version": memory.taxonomy_version,
"custom_taxonomy": bool(custom_taxonomy),
"message": (
f"Documentation is {weighted_completion_pct:.1%} complete "
f"({'meets' if is_complete else 'does not meet'} {min_threshold:.1%} threshold)."
),
"allow_code_generation": is_complete
or not BEHAVIOR_CONFIG["block_code_generation_if_incomplete"],
"mece_validation": mece_validation,
"taxonomy_structure": {
"platform": memory.platform_taxonomy,
"domain": memory.domain_taxonomy,
"size": memory.size_taxonomy,
"compliance": memory.compliance_taxonomy,
},
}
return result
================
File: paelladoc/adapters/plugins/core/__init__.py
================
import pkgutil
import importlib
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
# Dynamically import all modules within this 'core' package
# to ensure @mcp.tool decorators are executed.
package_path = str(Path(__file__).parent)
package_name = __name__
logger.info(f"Dynamically loading core plugins from: {package_path}")
for module_info in pkgutil.iter_modules([package_path]):
# Import all .py files (except __init__.py itself)
if module_info.name != "__init__" and not module_info.ispkg:
module_name = f"{package_name}.{module_info.name}"
try:
importlib.import_module(module_name)
logger.debug(f"Successfully loaded core plugin module: {module_name}")
except Exception as e:
logger.warning(f"Could not load core plugin module {module_name}: {e}")
logger.info("Finished dynamic core plugin loading.")
"""
Core plugins for PAELLADOC command handling.
Imports:
- help: Provides the HELP command functionality.
- paella: Initiates new documentation projects.
- continue_proj: Continues existing documentation projects.
- verification: Verifies documentation integrity.
- list_projects: Lists existing projects.
"""
# Removed explicit imports and __all__, relying on dynamic loading above
# from .help import core_help
# from .paella import core_paella # This was causing issues
# from .continue_proj import core_continue
# from .verification import core_verification
# from .list_projects import list_projects
#
# __all__ = [
# "core_help",
# "core_paella",
# "core_continue",
# "core_verification",
# "list_projects",
# ]
================
File: paelladoc/adapters/plugins/core/paella.py
================
"""PAELLADOC project initialization module."""
from pathlib import Path
from typing import Dict, Optional
# Import the shared FastMCP instance from core_logic
from paelladoc.domain.core_logic import mcp, logger
# Domain models and services
from paelladoc.domain.models.project import (
ProjectMemory,
ProjectInfo,
Bucket,
DocumentStatus,
set_time_service,
)
from paelladoc.adapters.services.system_time_service import SystemTimeService
# Adapter for persistence
from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter
# Initialize logger for this module
# logger is already imported from core_logic
# Create FastMCP instance - REMOVED, using imported instance
# mcp = FastMCP("PAELLADOC")
@mcp.tool()
async def paella_init(
base_path: str,
documentation_language: str,
interaction_language: str,
new_project_name: str,
platform_taxonomy: str, # e.g., "pwa", "web-frontend", "vscode-extension"
domain_taxonomy: str,
size_taxonomy: str,
compliance_taxonomy: str,
custom_taxonomy: Optional[Dict] = None, # Still optional
) -> Dict:
"""
Initiates the conversational workflow to define and document a new PAELLADOC project.
This tool gathers essential project details, including the core taxonomies (platform,
domain, size, compliance) which are mandatory for project setup and analysis.
It creates the project structure and persists the initial memory state with all
provided information.
Once executed successfully, the project is initialized with its defined taxonomies and ready
for the next conversational steps.
Args:
base_path: Base path where the project documentation will be stored.
documentation_language: Primary language for the generated documentation (e.g., 'en', 'es').
interaction_language: Language used during conversational interactions (e.g., 'en', 'es').
new_project_name: Unique name for the new PAELLADOC project.
platform_taxonomy: Identifier for the target platform (e.g., "pwa", "web-frontend").
domain_taxonomy: Identifier for the project's domain (e.g., "ecommerce", "healthcare").
size_taxonomy: Identifier for the estimated project size (e.g., "mvp", "enterprise").
compliance_taxonomy: Identifier for any compliance requirements (e.g., "gdpr", "none").
custom_taxonomy: (Optional) A dictionary for any user-defined taxonomy.
Returns:
A dictionary confirming the project's creation ('status': 'ok') or detailing an error ('status': 'error').
On success, includes the 'project_name' and resolved 'base_path'.
"""
logger.info(
f"Initializing new project: {new_project_name} with taxonomies: Platform={platform_taxonomy}, Domain={domain_taxonomy}, Size={size_taxonomy}, Compliance={compliance_taxonomy}"
)
try:
# Initialize TimeService with SystemTimeService implementation
set_time_service(SystemTimeService())
# Initialize memory adapter
memory_adapter = SQLiteMemoryAdapter()
# Create absolute path
abs_base_path = Path(base_path).expanduser().resolve()
# Ensure the base directory exists
abs_base_path.mkdir(parents=True, exist_ok=True)
# Create project memory - passing required taxonomies directly
project_memory = ProjectMemory(
project_info=ProjectInfo(
name=new_project_name,
interaction_language=interaction_language,
documentation_language=documentation_language,
base_path=abs_base_path,
platform_taxonomy=platform_taxonomy,
domain_taxonomy=domain_taxonomy,
size_taxonomy=size_taxonomy,
compliance_taxonomy=compliance_taxonomy,
custom_taxonomy=custom_taxonomy if custom_taxonomy else {},
),
artifacts={
Bucket.INITIATE_INITIAL_PRODUCT_DOCS: [
{
"name": "Project Charter",
"status": DocumentStatus.PENDING,
"bucket": Bucket.INITIATE_INITIAL_PRODUCT_DOCS,
"path": Path("Project_Charter.md"),
}
]
},
platform_taxonomy=platform_taxonomy,
domain_taxonomy=domain_taxonomy,
size_taxonomy=size_taxonomy,
compliance_taxonomy=compliance_taxonomy,
custom_taxonomy=custom_taxonomy if custom_taxonomy else {},
)
# Save to memory
await memory_adapter.save_memory(project_memory)
return {
"status": "ok",
"message": f"Project '{new_project_name}' created successfully at {abs_base_path}",
"project_name": new_project_name,
"base_path": str(abs_base_path),
}
except Exception as e:
logger.error(f"Error creating project: {str(e)}")
return {"status": "error", "message": f"Failed to create project: {str(e)}"}
@mcp.tool()
async def paella_list() -> Dict:
"""Retrieves and lists the names of all PAELLADOC projects stored in the system memory.
This is useful for identifying available projects that can be selected using the
'paella_select' or 'core_continue' tools to resume work.
Returns:
A dictionary containing the operation status ('ok' or 'error'), a list of
project names under the 'projects' key, and a confirmation message.
"""
try:
memory_adapter = SQLiteMemoryAdapter()
projects = await memory_adapter.list_projects()
return {
"status": "ok",
"projects": projects,
"message": "Projects retrieved successfully",
}
except Exception as e:
logger.error(f"Error listing projects: {str(e)}")
return {"status": "error", "message": f"Failed to list projects: {str(e)}"}
@mcp.tool()
async def paella_select(project_name: str) -> Dict:
"""
Loads the memory of an existing PAELLADOC project and sets it as the active context.
This tool makes the specified project the current focus for subsequent conversational
commands and actions within the Paelladoc session. It retrieves the project's state
from the persistent memory.
Args:
project_name: The name of the existing PAELLADOC project to activate.
Returns:
A dictionary containing the operation status ('ok' or 'error'), a confirmation message,
and key details of the selected project (name, base path). Returns an error status
if the project is not found.
"""
try:
memory_adapter = SQLiteMemoryAdapter()
project_memory = await memory_adapter.load_memory(project_name)
if project_memory:
return {
"status": "ok",
"message": f"Project '{project_name}' selected",
"project_name": project_name,
"base_path": str(project_memory.project_info.base_path),
}
else:
return {"status": "error", "message": f"Project '{project_name}' not found"}
except Exception as e:
logger.error(f"Error selecting project: {str(e)}")
return {"status": "error", "message": f"Failed to select project: {str(e)}"}
# Remove the main execution block as this module is not meant to be run directly
# if __name__ == "__main__":
# mcp.run()
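# Usage sketch (hypothetical values; assumes the decorated coroutine remains
# directly awaitable after @mcp.tool() registration):
#
#   import asyncio
#
#   result = asyncio.run(paella_init(
#       base_path="~/docs/demo",
#       documentation_language="en",
#       interaction_language="en",
#       new_project_name="demo-project",
#       platform_taxonomy="web-frontend",
#       domain_taxonomy="ecommerce",
#       size_taxonomy="mvp",
#       compliance_taxonomy="none",
#   ))
#   assert result["status"] == "ok"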
================
File: paelladoc/adapters/plugins/core/list_projects.py
================
"""
Plugin for listing existing PAELLADOC projects.
"""
import logging
from typing import Any, Dict, Optional
from pathlib import Path
from paelladoc.domain.core_logic import mcp
# Adapter for persistence
from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter
# Project model is not needed here, we only list names
logger = logging.getLogger(__name__)
@mcp.tool(
name="core_list_projects",
description="Lists the names of existing PAELLADOC projects found in the memory.",
)
async def list_projects(
    db_path: Optional[str] = None,
) -> Dict[str, Any]:  # Keep db_path for testing
"""Retrieves the list of project names from the persistence layer.
Args:
db_path: Optional database path to use (primarily for testing).
Returns:
A dictionary containing the status and a list of project names.
"""
logger.info(f"Executing core.list_projects command. DB path: {db_path}")
try:
# Use the provided db_path (for tests) or the default path from the adapter
memory_adapter = (
SQLiteMemoryAdapter(db_path=Path(db_path))
if db_path
else SQLiteMemoryAdapter()
)
logger.info(
f"core.list_projects using DB path: {memory_adapter.db_path.resolve()}"
) # Log the actual path used
except Exception as e:
logger.error(f"Failed to instantiate SQLiteMemoryAdapter: {e}", exc_info=True)
return {
"status": "error",
"message": "Internal server error: Could not initialize memory adapter.",
"projects": [], # Return empty list on error
}
try:
# Use the correct method to get only names
project_names = await memory_adapter.list_projects()
count = len(project_names)
message = (
f"Found {count} project{'s' if count != 1 else ''}."
if count > 0
else "No projects found."
)
logger.info(message)
return {
"status": "ok", # Use 'ok' for success
"message": message,
"projects": project_names, # Return the list of names
}
except Exception as e:
logger.error(
f"Error retrieving projects from memory adapter: {e}", exc_info=True
)
return {
"status": "error",
"message": f"Error retrieving projects: {str(e)}",
"projects": [], # Return empty list on error
}
================
File: paelladoc/adapters/plugins/core/continue_proj.py
================
from paelladoc.domain.core_logic import mcp
import logging
# Initialize logger for this module
logger = logging.getLogger(__name__)
# Domain models
from paelladoc.domain.models.project import (
DocumentStatus,
Bucket,
)
# Adapter for persistence
from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter
# Extracted behavior configuration from the original MDC file
BEHAVIOR_CONFIG = {
"calculate_documentation_completion": True,
"code_after_documentation": True,
"confirm_each_parameter": True,
"conversation_required": True,
"documentation_first": True,
"documentation_section_sequence": [
"project_definition",
"market_research",
"user_research",
"problem_definition",
"product_definition",
"architecture_decisions",
"product_roadmap",
"user_stories",
"technical_architecture",
"technical_specifications",
"component_specification",
"api_specification",
"database_design",
"frontend_architecture",
"testing_strategy",
"devops_pipeline",
"security_framework",
"documentation_framework",
],
"enforce_one_question_rule": True,
"force_single_question_mode": True,
"guide_documentation_sequence": True,
"interactive": True,
"load_memory_file": True,
"max_questions_per_message": 1,
"memory_path": "/docs/{project_name}/.memory.json",
"one_parameter_at_a_time": True,
"prevent_web_search": True,
"prohibit_multiple_questions": True,
"provide_section_guidance": True,
"require_step_confirmation": True,
"sequential_questions": True,
"single_question_mode": True,
"strict_parameter_sequence": True,
"strict_question_sequence": True,
"track_documentation_completion": True,
"update_last_modified": True,
"wait_for_response": True,
"wait_for_user_response": True,
}
# Insert behavior config here
# TODO: Review imports and add any other necessary modules
@mcp.tool(
name="core_continue", description="Continues work on an existing PAELLADOC project."
)
async def core_continue(
project_name: str,
) -> dict: # Added project_name argument, made async
"""Loads an existing project's memory and suggests the next steps.
Args:
project_name (str): The name of the project to continue.
Behavior Config: this tool has associated behavior configuration extracted
from the MDC file. See the `BEHAVIOR_CONFIG` variable in the source code.
"""
logging.info(
f"Executing initial implementation for core.continue for project: {project_name}..."
)
# --- Dependency Injection (Temporary Direct Instantiation) ---
# TODO: Replace with proper dependency injection
try:
# Use the default path defined in the adapter (project root)
memory_adapter = SQLiteMemoryAdapter()
logger.info(f"core.continue using DB path: {memory_adapter.db_path.resolve()}")
# Fetch the list of existing projects (Removed assignment as it's not used here)
# existing_projects = await memory_adapter.list_projects()
except Exception as e:
logging.error(f"Failed to instantiate SQLiteMemoryAdapter: {e}", exc_info=True)
return {
"status": "error",
"message": "Internal server error: Could not initialize memory adapter.",
}
# --- Load Project Memory ---
try:
memory = await memory_adapter.load_memory(project_name)
if not memory:
logging.warning(f"Project '{project_name}' not found for CONTINUE command.")
return {
"status": "error",
"message": f"Project '{project_name}' not found. Use PAELLA command to start it.",
}
logging.info(f"Successfully loaded memory for project: {project_name}")
except Exception as e:
logging.error(f"Error loading memory for '{project_name}': {e}", exc_info=True)
return {
"status": "error",
"message": f"Failed to load project memory: {e}",
}
# --- Calculate Next Step (Simplified) ---
# TODO: Implement sophisticated logic based on BEHAVIOR_CONFIG['documentation_section_sequence']
# For now, find the first pending artifact or report overall status.
next_step_suggestion = (
"No pending artifacts found. Project might be complete or need verification."
)
found_pending = False
# Define a somewhat logical bucket order for checking progress
# This could be moved to config or derived from the taxonomy later
bucket_order = [
Bucket.INITIATE_INITIAL_PRODUCT_DOCS,
Bucket.ELABORATE_DISCOVERY_AND_RESEARCH,
Bucket.ELABORATE_IDEATION_AND_DESIGN,
Bucket.ELABORATE_SPECIFICATION_AND_PLANNING,
Bucket.ELABORATE_CORE_AND_SUPPORT,
Bucket.GOVERN_STANDARDS_METHODOLOGIES,
Bucket.GOVERN_VERIFICATION_VALIDATION,
Bucket.GENERATE_CORE_FUNCTIONALITY,
Bucket.GENERATE_SUPPORTING_ELEMENTS,
Bucket.DEPLOY_PIPELINES_AND_AUTOMATION,
Bucket.DEPLOY_INFRASTRUCTURE_AND_CONFIG,
Bucket.OPERATE_RUNBOOKS_AND_SOPS,
Bucket.OPERATE_MONITORING_AND_ALERTING,
Bucket.ITERATE_LEARNING_AND_ANALYSIS,
Bucket.ITERATE_PLANNING_AND_RETROSPECTION,
# Core/System/Other buckets can be checked last or based on context
Bucket.INITIATE_CORE_SETUP,
Bucket.GOVERN_CORE_SYSTEM,
Bucket.GOVERN_MEMORY_TEMPLATES,
Bucket.GOVERN_TOOLING_SCRIPTS,
Bucket.MAINTAIN_CORE_FUNCTIONALITY,
Bucket.MAINTAIN_SUPPORTING_ELEMENTS,
Bucket.DEPLOY_GUIDES_AND_CHECKLISTS,
Bucket.DEPLOY_SECURITY,
Bucket.OPERATE_MAINTENANCE,
Bucket.UNKNOWN,
]
for bucket in bucket_order:
# Use .get() to safely access potentially missing buckets in memory.artifacts
artifacts_in_bucket = memory.artifacts.get(bucket, [])
for artifact in artifacts_in_bucket:
if artifact.status == DocumentStatus.PENDING:
next_step_suggestion = f"Next suggested step: Work on artifact '{artifact.name}' ({artifact.path}) in bucket '{bucket.value}'."
found_pending = True
break # Found the first pending, stop searching this bucket
if found_pending:
break # Stop searching other buckets
# Get overall phase completion for context
phase_completion_summary = "Phase completion: "
# Define phases based on Bucket enum prefixes
phases = sorted(
list(set(b.value.split("::")[0] for b in Bucket if "::" in b.value))
)
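    # e.g. a bucket value like "Elaborate::Discovery And Research" (format
    # illustrative) contributes the phase "Elaborate"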
phase_summaries = []
try:
for phase in phases:
stats = memory.get_phase_completion(phase)
if stats["total"] > 0: # Only show phases with artifacts
phase_summaries.append(
f"{phase}({stats['completion_percentage']:.0f}%)"
)
if not phase_summaries:
phase_completion_summary += "(No artifacts tracked yet)"
else:
phase_completion_summary += ", ".join(phase_summaries)
except Exception as e:
logging.warning(f"Could not calculate phase completion: {e}")
phase_completion_summary += "(Calculation error)"
# --- Return Status and Suggestion ---
return {
"status": "ok",
"message": f"Project '{project_name}' loaded. {phase_completion_summary}",
"next_step": next_step_suggestion,
# Optionally return parts of the memory if needed by the client
# "current_taxonomy_version": memory.taxonomy_version
}
================
File: paelladoc/adapters/plugins/core/help.py
================
from paelladoc.domain.core_logic import mcp
import logging
from typing import Optional
# Adapter for taxonomy loading
from paelladoc.adapters.output.filesystem.taxonomy_provider import (
FileSystemTaxonomyProvider,
)
# Instantiate the taxonomy provider
# TODO: Replace direct instantiation with Dependency Injection
TAXONOMY_PROVIDER = FileSystemTaxonomyProvider()
# Insert behavior config here
# TODO: Review imports and add any other necessary modules
@mcp.tool(
name="core_help",
description="Shows help information about available commands",
)
def core_help(command: Optional[str] = None, format: str = "detailed") -> dict:
"""Provides help information about available PAELLADOC commands.
Args:
command: Optional specific command to get help for
format: Output format (detailed, summary, examples)
Returns:
Dictionary with help information
"""
logging.info(f"Executing core.help with command={command}, format={format}")
# Define available commands
commands = {
"paella": {
"description": "Initiates the documentation process for a new project",
"parameters": [
{
"name": "project_name",
"type": "string",
"required": True,
"description": "Name of the project to document",
},
{
"name": "base_path",
"type": "string",
"required": True,
"description": "Base path for project documentation",
},
{
"name": "documentation_language",
"type": "string",
"required": False,
"description": "Language for documentation (e.g. 'es', 'en')",
},
{
"name": "interaction_language",
"type": "string",
"required": False,
"description": "Language for interaction (e.g. 'es', 'en')",
},
],
"example": "PAELLA my_project ~/projects/my_project en en",
},
"continue": {
"description": "Continues working on an existing project",
"parameters": [
{
"name": "project_name",
"type": "string",
"required": True,
"description": "Name of the project to continue with",
},
],
"example": "CONTINUE my_project",
},
"verification": {
"description": "Verifies documentation coverage against the MECE taxonomy",
"parameters": [
{
"name": "project_name",
"type": "string",
"required": True,
"description": "Name of the project to verify",
},
],
"example": "VERIFY my_project",
},
"select_taxonomy": {
"description": "Guides users through selecting and customizing a project taxonomy",
"parameters": [
{
"name": "project_name",
"type": "string",
"required": True,
"description": "Name of the project to customize taxonomy for",
},
{
"name": "size_category",
"type": "string",
"required": False,
"description": "Project size category (personal, hobbyist, mvp, startup, enterprise)",
},
{
"name": "domain_type",
"type": "string",
"required": False,
"description": "Project domain type (web, mobile, iot, ai/ml, etc.)",
},
{
"name": "platform_type",
"type": "string",
"required": False,
"description": "Platform implementation type (chrome-extension, ios-native, android-native, etc.)",
},
{
"name": "compliance_needs",
"type": "string",
"required": False,
"description": "Compliance requirements (none, hipaa, gdpr, etc.)",
},
{
"name": "custom_threshold",
"type": "float",
"required": False,
"description": "Custom coverage threshold (0.0-1.0)",
},
],
"example": "SELECT-TAXONOMY my_project --size=mvp --domain=web --platform=chrome-extension",
},
"taxonomy_info": {
"description": "Shows information about available taxonomies and categories",
"parameters": [],
"example": "TAXONOMY-INFO",
},
"help": {
"description": "Shows help information about available commands",
"parameters": [
{
"name": "command",
"type": "string",
"required": False,
"description": "Specific command to get help for",
},
{
"name": "format",
"type": "string",
"required": False,
"description": "Output format (detailed, summary, examples)",
},
],
"example": "HELP paella",
},
}
# If a specific command is requested
if command and command in commands:
return {"status": "ok", "command": command, "help": commands[command]}
# Otherwise return all commands
result = {
"status": "ok",
"available_commands": list(commands.keys()),
"format": format,
}
# Add command information based on format
if format == "detailed":
result["commands"] = commands
try:
available_taxonomies = TAXONOMY_PROVIDER.get_available_taxonomies()
if "select_taxonomy" in commands:
commands["select_taxonomy"]["available_options"] = available_taxonomies
if "taxonomy_info" in commands:
commands["taxonomy_info"]["available_taxonomies"] = available_taxonomies
except Exception as e:
logging.error(f"Failed to load taxonomies for help: {e}", exc_info=True)
# Continue without taxonomy info if loading fails
elif format == "summary":
result["commands"] = {
cmd: info["description"] for cmd, info in commands.items()
}
elif format == "examples":
result["commands"] = {cmd: info["example"] for cmd, info in commands.items()}
return result
================
File: paelladoc/adapters/plugins/memory/project_memory.py
================
from paelladoc.domain.core_logic import mcp
import logging
# Insert behavior config here
# TODO: Review imports and add any other necessary modules
@mcp.tool(
name="memory.project_memory",
description="Manages the project's memory file (.memory.json)",
)
def memory_project_memory() -> dict:
"""Handles operations related to the project memory.
Likely used internally by other commands (PAELLA, CONTINUE, VERIFY)
to load, save, and update project state, progress, and metadata.
Provides the HELP CONTEXT (though this might be deprecated).
"""
# TODO: Implement the actual logic of the command here
# Access parameters using their variable names (e.g., param)
# Access behavior config using BEHAVIOR_CONFIG dict (if present)
logging.info("Executing stub for memory.project_memory...")
# Example: Print parameters
local_vars = locals()
param_values = {}
logging.info(f"Parameters received: {param_values}")
# Replace with actual return value based on command logic
return {
"status": "ok",
"message": "Successfully executed stub for memory.project_memory",
}
================
File: paelladoc/adapters/plugins/code/generate_doc.py
================
from paelladoc.domain.core_logic import mcp
import logging
# Insert behavior config here
# TODO: Review imports and add any other necessary modules
@mcp.tool(name="code.generate_doc", description="3. Wait for user selection")
def code_generate_doc() -> dict:
"""3. Wait for user selection"""
# TODO: Implement the actual logic of the command here
# Access parameters using their variable names (e.g., param)
# Access behavior config using BEHAVIOR_CONFIG dict (if present)
logging.info("Executing stub for code.generate_doc...")
# Example: Print parameters
local_vars = locals()
param_values = {}
logging.info(f"Parameters received: {param_values}")
# Replace with actual return value based on command logic
return {
"status": "ok",
"message": "Successfully executed stub for code.generate_doc",
}
================
File: paelladoc/adapters/plugins/code/generate_context.py
================
from paelladoc.domain.core_logic import mcp
import logging
# Insert behavior config here
# TODO: Review imports and add any other necessary modules
@mcp.tool(
name="code.generate_context",
description="This automatically creates the context file that will be used by GENERATE-DOC for interactive documentation generation.",
)
def code_generate_context() -> dict:
"""This automatically creates the context file that will be used by GENERATE-DOC for interactive documentation generation."""
# TODO: Implement the actual logic of the command here
# Access parameters using their variable names (e.g., param)
# Access behavior config using BEHAVIOR_CONFIG dict (if present)
logging.info("Executing stub for code.generate_context...")
# Example: Print parameters
local_vars = locals()
param_values = {}
logging.info(f"Parameters received: {param_values}")
# Replace with actual return value based on command logic
return {
"status": "ok",
"message": "Successfully executed stub for code.generate_context",
}
================
File: paelladoc/adapters/plugins/code/code_generation.py
================
from paelladoc.domain.core_logic import mcp
from typing import Optional, List, Dict, Any # Add necessary types
import logging
# Extracted behavior configuration from the original MDC file
BEHAVIOR_CONFIG = {
    "abort_if_documentation_incomplete": True,
    "code_after_documentation": True,
    "confirm_each_parameter": True,
    "conversation_required": True,
    "documentation_first": True,
    "documentation_verification_path": "/docs/{project_name}/.memory.json",
    "enforce_one_question_rule": True,
    "extract_from_complete_documentation": True,
    "force_single_question_mode": True,
    "guide_to_continue_command": True,
    "interactive": True,
    "max_questions_per_message": 1,
    "one_parameter_at_a_time": True,
    "prevent_web_search": True,
    "prohibit_multiple_questions": True,
    "require_complete_documentation": True,
    "require_step_confirmation": True,
    "required_documentation_sections": [
        "project_definition",
        "market_research",
        "user_research",
        "problem_definition",
        "product_definition",
        "architecture_decisions",
        "product_roadmap",
        "user_stories",
        "technical_architecture",
        "technical_specifications",
        "api_specification",
        "database_design",
    ],
    "sequential_questions": True,
    "single_question_mode": True,
    "strict_parameter_sequence": True,
    "strict_question_sequence": True,
    "verify_documentation_completeness": True,
    "wait_for_response": True,
    "wait_for_user_response": True,
}
# Insert behavior config here
# TODO: Review imports and add any other necessary modules
@mcp.tool(name="code.code_generation", description="The command uses the script at `.cursor/rules/scripts/extract_repo_content.py` to perform the repository extraction, which leverages repopack-py to convert the codebase to text.")
def code_code_generation() -> dict:
"""The command uses the script at `.cursor/rules/scripts/extract_repo_content.py` to perform the repository extraction, which leverages repopack-py to convert the codebase to text."""
Behavior Config: this tool has associated behavior configuration extracted from the MDC file. See the `BEHAVIOR_CONFIG` variable in the source code.
"""
# TODO: Implement the actual logic of the command here
# Access parameters using their variable names (e.g., param)
# Access behavior config using BEHAVIOR_CONFIG dict (if present)
logging.info(f"Executing stub for code.code_generation...")
# Example: Print parameters
local_vars = locals()
param_values = { }
logging.info(f"Parameters received: {param_values}")
# Replace with actual return value based on command logic
return {"status": "ok", "message": f"Successfully executed stub for code.code_generation"}
================
File: paelladoc/adapters/plugins/product/product_management.py
================
from paelladoc.domain.core_logic import mcp
import logging
# Insert behavior config here
# TODO: Review imports and add any other necessary modules
@mcp.tool(
name="product.product_management",
description='Manages product features like stories, tasks, etc. Access: stakeholder: ["read_only"]',
)
def product_product_management() -> dict:
"""Manages product management features.
Handles user stories, tasks, sprints, meeting notes, reports, etc.
Example access control mentioned in description: stakeholder: ["read_only"]
"""
# TODO: Implement the actual logic of the command here
# Access parameters using their variable names (e.g., param)
# Access behavior config using BEHAVIOR_CONFIG dict (if present)
logging.info("Executing stub for product.product_management...")
# Example: Print parameters
local_vars = locals()
param_values = {}
logging.info(f"Parameters received: {param_values}")
# Replace with actual return value based on command logic
return {
"status": "ok",
"message": "Successfully executed stub for product.product_management",
}
================
File: paelladoc/adapters/plugins/styles/coding_styles.py
================
from paelladoc.domain.core_logic import mcp
import logging
# Insert behavior config here
# TODO: Review imports and add any other necessary modules
@mcp.tool(
name="styles.coding_styles",
description="Manages coding style guides for the project.",
)
def styles_coding_styles() -> dict:
"""Applies, customizes, or lists coding styles.
Supports styles like frontend, backend, chrome_extension, etc.
Uses operations: apply, customize, list, show.
"""
# TODO: Implement the actual logic of the command here
# Access parameters using their variable names (e.g., param)
# Access behavior config using BEHAVIOR_CONFIG dict (if present)
logging.info("Executing stub for styles.coding_styles...")
# Example: Print parameters
local_vars = locals()
param_values = {}
logging.info(f"Parameters received: {param_values}")
# Replace with actual return value based on command logic
return {
"status": "ok",
"message": "Successfully executed stub for styles.coding_styles",
}
================
File: paelladoc/adapters/plugins/styles/git_workflows.py
================
from paelladoc.domain.core_logic import mcp
import logging
# Insert behavior config here
# TODO: Review imports and add any other necessary modules
@mcp.tool(
name="styles.git_workflows",
description="Manages Git workflow methodologies for the project.",
)
def styles_git_workflows() -> dict:
"""Applies or customizes Git workflows.
Supports workflows like GitHub Flow, GitFlow, Trunk-Based.
Provides guidance based on project complexity.
Simple projects → GitHub Flow
Complex projects → GitFlow or Trunk-Based
"""
# TODO: Implement the actual logic of the command here
# Access parameters using their variable names (e.g., param)
# Access behavior config using BEHAVIOR_CONFIG dict (if present)
logging.info("Executing stub for styles.git_workflows...")
# Example: Print parameters
local_vars = locals()
param_values = {}
logging.info(f"Parameters received: {param_values}")
# Replace with actual return value based on command logic
return {
"status": "ok",
"message": "Successfully executed stub for styles.git_workflows",
}
================
File: paelladoc/adapters/plugins/templates/templates.py
================
from paelladoc.domain.core_logic import mcp
import logging
# Insert behavior config here
# TODO: Review imports and add any other necessary modules
@mcp.tool(name="templates.templates", description="Manages documentation templates.")
def templates_templates() -> dict:
"""Handles the lifecycle of documentation templates.
Likely allows listing, showing, creating, or updating templates.
The previous description mentioned workflows, which seems incorrect here.
"""
# TODO: Implement the actual logic of the command here
# Access parameters using their variable names (e.g., param)
# Access behavior config using BEHAVIOR_CONFIG dict (if present)
logging.info("Executing stub for templates.templates...")
# Example: Print parameters
local_vars = locals()
param_values = {}
logging.info(f"Parameters received: {param_values}")
# Replace with actual return value based on command logic
return {
"status": "ok",
"message": "Successfully executed stub for templates.templates",
}
================
File: paelladoc/adapters/output/chroma/chroma_vector_store_adapter.py
================
import logging
import uuid
from typing import List, Dict, Any, Optional
from pathlib import Path
import chromadb
from chromadb.api.models.Collection import Collection
# Import NotFoundError from the appropriate module depending on chromadb version
try:
from chromadb.errors import NotFoundError
except ImportError:
try:
from chromadb.api.errors import NotFoundError
except ImportError:
class NotFoundError(ValueError):
"""Fallback NotFoundError inheriting from ValueError for broader compatibility."""
pass
# Ports and Domain Models/Helpers
from paelladoc.ports.output.vector_store_port import VectorStorePort, SearchResult
logger = logging.getLogger(__name__)
# Default path for persistent ChromaDB data
DEFAULT_CHROMA_PATH = Path.home() / ".paelladoc" / "chroma_data"
class ChromaSearchResult(SearchResult):
"""Concrete implementation of SearchResult for Chroma results."""
def __init__(
self,
id: str,
distance: Optional[float],
metadata: Optional[Dict[str, Any]],
document: Optional[str],
):
self.id = id
self.distance = distance
self.metadata = metadata
self.document = document
class ChromaVectorStoreAdapter(VectorStorePort):
"""ChromaDB implementation of the VectorStorePort."""
def __init__(
self,
persist_path: Optional[Path] = DEFAULT_CHROMA_PATH,
in_memory: bool = False,
):
"""Initializes the ChromaDB client.
Args:
persist_path: Path to store persistent Chroma data. Ignored if in_memory is True.
in_memory: If True, runs ChromaDB entirely in memory (data is lost on exit).
"""
if in_memory:
logger.info("Initializing ChromaDB client in-memory.")
self.client = chromadb.Client()
else:
self.persist_path = persist_path or DEFAULT_CHROMA_PATH
self.persist_path.mkdir(parents=True, exist_ok=True)
logger.info(
f"Initializing persistent ChromaDB client at: {self.persist_path}"
)
self.client = chromadb.PersistentClient(path=str(self.persist_path))
# TODO: Consider configuration for embedding function, distance function, etc.
# Using Chroma's defaults for now (all-MiniLM-L6-v2 and cosine distance)
async def get_or_create_collection(self, collection_name: str) -> Collection:
"""Gets or creates a Chroma collection."""
try:
collection = self.client.get_collection(name=collection_name)
logger.debug(f"Retrieved existing Chroma collection: {collection_name}")
return collection
except (NotFoundError, ValueError) as e:
# Handle case where collection does not exist (NotFoundError or ValueError)
if "does not exist" in str(e): # Check if the error indicates non-existence
logger.debug(f"Collection '{collection_name}' not found, creating...")
collection = self.client.create_collection(name=collection_name)
logger.info(f"Created new Chroma collection: {collection_name}")
return collection
else:
logger.error(
f"Unexpected error getting collection '{collection_name}': {e}",
exc_info=True,
)
raise
except Exception as e:
logger.error(
f"Error getting or creating collection '{collection_name}': {e}",
exc_info=True,
)
raise
async def add_documents(
self,
collection_name: str,
documents: List[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
ids: Optional[List[str]] = None,
) -> List[str]:
"""Adds documents to the specified Chroma collection."""
collection = await self.get_or_create_collection(collection_name)
# Generate IDs if not provided
if not ids:
ids = [str(uuid.uuid4()) for _ in documents]
elif len(ids) != len(documents):
raise ValueError("Number of ids must match number of documents")
# Add documents to the collection (this handles embedding generation)
try:
# collection.add is synchronous in the current chromadb client API
collection.add(documents=documents, metadatas=metadatas, ids=ids)
logger.info(
f"Added {len(documents)} documents to collection '{collection_name}'."
)
return ids
except Exception as e:
logger.error(
f"Error adding documents to collection '{collection_name}': {e}",
exc_info=True,
)
raise
async def search_similar(
self,
collection_name: str,
query_texts: List[str],
n_results: int = 5,
where: Optional[Dict[str, Any]] = None,
where_document: Optional[Dict[str, Any]] = None,
include: Optional[List[str]] = ["metadatas", "documents", "distances"],
) -> List[List[SearchResult]]:
"""Searches for similar documents in the Chroma collection."""
try:
collection = self.client.get_collection(name=collection_name)
except (NotFoundError, ValueError) as e:
# Handle case where collection does not exist
if "does not exist" in str(e):
logger.warning(f"Collection '{collection_name}' not found for search.")
return [[] for _ in query_texts]
else:
logger.error(
f"Unexpected error retrieving collection '{collection_name}' for search: {e}",
exc_info=True,
)
raise
except Exception as e:
logger.error(
f"Error retrieving collection '{collection_name}' for search: {e}",
exc_info=True,
)
raise
try:
# collection.query is synchronous
results = collection.query(
query_texts=query_texts,
n_results=n_results,
where=where,
where_document=where_document,
include=include,
)
# Map Chroma's result structure to our SearchResult list of lists
# Chroma returns a dict with keys like 'ids', 'distances', 'metadatas', 'documents'
# Each value is a list of lists (one inner list per query)
mapped_results: List[List[SearchResult]] = []
num_queries = len(query_texts)
result_ids = results.get("ids") or [[] for _ in range(num_queries)]
result_distances = results.get("distances") or [
[] for _ in range(num_queries)
]
result_metadatas = results.get("metadatas") or [
[] for _ in range(num_queries)
]
result_documents = results.get("documents") or [
[] for _ in range(num_queries)
]
for i in range(num_queries):
query_results = []
# Ensure all result lists have the expected length for the i-th query
num_docs_for_query = (
len(result_ids[i]) if result_ids and i < len(result_ids) else 0
)
for j in range(num_docs_for_query):
query_results.append(
ChromaSearchResult(
id=result_ids[i][j]
if result_ids
and i < len(result_ids)
and j < len(result_ids[i])
else "N/A",
distance=result_distances[i][j]
if result_distances
and i < len(result_distances)
and j < len(result_distances[i])
else None,
metadata=result_metadatas[i][j]
if result_metadatas
and i < len(result_metadatas)
and j < len(result_metadatas[i])
else None,
document=result_documents[i][j]
if result_documents
and i < len(result_documents)
and j < len(result_documents[i])
else None,
)
)
mapped_results.append(query_results)
return mapped_results
except Exception as e:
logger.error(
f"Error querying collection '{collection_name}': {e}", exc_info=True
)
raise
async def delete_collection(self, collection_name: str) -> None:
"""Deletes a Chroma collection."""
try:
self.client.delete_collection(name=collection_name)
logger.info(f"Deleted Chroma collection: {collection_name}")
except (NotFoundError, ValueError) as e:
# Handle case where collection does not exist
if "does not exist" in str(e):
logger.warning(
f"Attempted to delete non-existent collection: {collection_name}"
)
else:
logger.error(
f"Unexpected error deleting collection '{collection_name}': {e}",
exc_info=True,
)
raise
except Exception as e:
logger.error(
f"Error deleting collection '{collection_name}': {e}", exc_info=True
)
raise
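# --- Usage sketch ---
# A minimal sketch, assuming the adapter class defined earlier in this file is
# named ChromaVectorStoreAdapter and supports in_memory=True as documented in
# its __init__. Collection name and documents below are illustrative only.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        adapter = ChromaVectorStoreAdapter(in_memory=True)
        # IDs are generated (UUID4) when not supplied explicitly.
        ids = await adapter.add_documents(
            collection_name="demo_collection",
            documents=["paella recipe", "risotto recipe"],
            metadatas=[{"cuisine": "spanish"}, {"cuisine": "italian"}],
        )
        hits = await adapter.search_similar(
            collection_name="demo_collection",
            query_texts=["rice dish"],
            n_results=1,
        )
        print(ids, [h.document for h in hits[0]])
        await adapter.delete_collection("demo_collection")

    asyncio.run(_demo())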
================
File: paelladoc/adapters/output/filesystem/taxonomy_provider.py
================
import logging
from pathlib import Path
from typing import Dict, List
from paelladoc.ports.output.taxonomy_provider import TaxonomyProvider
logger = logging.getLogger(__name__)
# Determine the base path relative to this file's location
# Assumes this structure: src/paelladoc/adapters/output/filesystem/taxonomy_provider.py
# And taxonomies are at: project_root/taxonomies/
ADAPTER_DIR = Path(__file__).parent
SRC_DIR = ADAPTER_DIR.parents[3]  # filesystem -> output -> adapters -> paelladoc -> src
PROJECT_ROOT = SRC_DIR.parent
TAXONOMY_BASE_PATH = PROJECT_ROOT / "taxonomies"
class FileSystemTaxonomyProvider(TaxonomyProvider):
"""Provides available taxonomy information by scanning filesystem directories."""
def __init__(self, base_path: Path = TAXONOMY_BASE_PATH):
"""Initializes the provider with the base path to the taxonomy directories."""
self.base_path = base_path
if not self.base_path.is_dir():
logger.error(
f"Taxonomy base path not found or not a directory: {self.base_path.resolve()}"
)
# Raise an error or handle appropriately? For now, log and continue.
# raise FileNotFoundError(f"Taxonomy base path not found: {self.base_path}")
self._cached_taxonomies: Dict[str, List[str]] | None = None
def get_available_taxonomies(self) -> Dict[str, List[str]]:
"""Scans the taxonomy directories and loads available taxonomy names.
Uses a simple cache to avoid repeated filesystem scans.
"""
if self._cached_taxonomies is not None:
logger.debug("Returning cached taxonomies")
return self._cached_taxonomies
logger.debug(f"Scanning for taxonomies in: {self.base_path.resolve()}")
available_taxonomies = {}
categories = ["platform", "domain", "size", "compliance"]
if not self.base_path.is_dir():
logger.error(
f"Cannot scan taxonomies, base path is invalid: {self.base_path.resolve()}"
)
return {cat: [] for cat in categories} # Return empty if base path is bad
for category in categories:
category_path = self.base_path / category
if category_path.is_dir():
try:
tax_files = sorted(
f.stem # Get filename without .json extension
for f in category_path.glob("*.json")
if f.is_file()
)
available_taxonomies[category] = tax_files
logger.debug(
f"Found {len(tax_files)} taxonomies in '{category}': {tax_files}"
)
except OSError as e:
logger.error(
f"Error reading taxonomy directory {category_path}: {e}"
)
available_taxonomies[category] = []
else:
available_taxonomies[category] = []
logger.warning(f"Taxonomy directory not found: {category_path}")
self._cached_taxonomies = available_taxonomies
logger.info(
f"Loaded {sum(len(v) for v in available_taxonomies.values())} taxonomies across {len(categories)} categories."
)
return available_taxonomies
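# --- Usage sketch ---
# A minimal sketch; the explicit base_path here is a hypothetical directory for
# illustration, while real callers rely on the TAXONOMY_BASE_PATH default.
if __name__ == "__main__":
    provider = FileSystemTaxonomyProvider(base_path=Path("taxonomies"))
    for category, names in provider.get_available_taxonomies().items():
        print(f"{category}: {names}")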
================
File: paelladoc/adapters/output/sqlite/models.py
================
from typing import List, Optional
from sqlmodel import Field, Relationship, SQLModel, Column, JSON
import datetime
# Note: Domain Enums like DocumentStatus are not directly used here,
# we store their string representation (e.g., 'pending').
# The adapter layer will handle the conversion.
# --- Database Models ---
# Forward references are needed for relationships defined before the target model
class ProjectInfoDB(SQLModel, table=True):
# Represents the metadata associated with a project memory entry
id: Optional[int] = Field(default=None, primary_key=True)
# name field is stored in ProjectMemoryDB as it's the primary identifier
language: Optional[str] = None
purpose: Optional[str] = None
target_audience: Optional[str] = None
objectives: Optional[List[str]] = Field(default=None, sa_column=Column(JSON))
# Define the one-to-one relationship back to ProjectMemoryDB
# Use Optional because a metadata row might briefly exist before being linked
project_memory: Optional["ProjectMemoryDB"] = Relationship(
back_populates="project_meta"
)
class ProjectDocumentDB(SQLModel, table=True):
# Represents a single document tracked within a project memory
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True) # Name of the document file (e.g., "README.md")
template_origin: Optional[str] = None
status: str = Field(default="pending", index=True) # Store enum string value
# Foreign key to link back to the main project memory entry
project_memory_id: Optional[int] = Field(
default=None, foreign_key="projectmemorydb.id"
)
# Define the many-to-one relationship back to ProjectMemoryDB
project_memory: Optional["ProjectMemoryDB"] = Relationship(
back_populates="documents"
)
class ProjectMemoryDB(SQLModel, table=True):
# Represents the main project memory entry in the database
id: Optional[int] = Field(default=None, primary_key=True)
# Use project_name from metadata as the main unique identifier for lookups
name: str = Field(
index=True, unique=True
) # Changed from project_name to match domain model
# New fields to match domain model
base_path: str = Field(default="") # Store as string, convert to Path in adapter
interaction_language: str = Field(default="en-US")
documentation_language: str = Field(default="en-US")
taxonomy_version: str = Field(default="0.5")
created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
last_updated_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
# Foreign key to link to the associated metadata entry
    project_meta_id: Optional[int] = Field(
        default=None, foreign_key="projectinfodb.id", unique=True
    )
# Define the one-to-one relationship to ProjectInfoDB
project_meta: Optional[ProjectInfoDB] = Relationship(
back_populates="project_memory"
)
# Define the one-to-many relationship to ProjectDocumentDB
documents: List[ProjectDocumentDB] = Relationship(back_populates="project_memory")
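# --- Usage sketch ---
# A minimal sketch of creating this schema in a throwaway in-memory SQLite
# database; the project name is illustrative only.
if __name__ == "__main__":
    from sqlmodel import Session, create_engine

    engine = create_engine("sqlite://")
    SQLModel.metadata.create_all(engine)
    with Session(engine) as session:
        memory = ProjectMemoryDB(name="demo-project", project_meta=ProjectInfoDB())
        session.add(memory)
        session.commit()
        session.refresh(memory)
        print(memory.id, memory.created_at)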
================
File: paelladoc/adapters/output/sqlite/mapper.py
================
"""
Mapping functions between domain models and SQLite DB models.
"""
import logging
from typing import Dict, List, Optional
from pathlib import Path
import datetime
import uuid
# Domain Models
from paelladoc.domain.models.project import (
ProjectMemory,
ProjectInfo,
ArtifactMeta,
Bucket, # Import if needed for mapping logic (e.g., default status)
)
# Database Models
from .db_models import ProjectMemoryDB, ArtifactMetaDB
logger = logging.getLogger(__name__)
def _ensure_utc(dt: Optional[datetime.datetime]) -> Optional[datetime.datetime]:
"""Ensures a datetime object is UTC, converting naive datetimes."""
if dt is None:
return None
if dt.tzinfo is None:
# Assume naive datetimes from DB are UTC, or handle conversion if needed
return dt.replace(tzinfo=datetime.timezone.utc)
return dt.astimezone(datetime.timezone.utc)
def map_db_to_domain(db_memory: ProjectMemoryDB) -> ProjectMemory:
"""Maps the DB model hierarchy to the domain ProjectMemory model."""
# Map ProjectInfo (formerly metadata)
domain_project_info = ProjectInfo(
name=db_memory.name,
language=db_memory.language,
purpose=db_memory.purpose,
target_audience=db_memory.target_audience,
objectives=db_memory.objectives
if db_memory.objectives
else [], # Handle potential None from DB
base_path=Path(db_memory.base_path) if db_memory.base_path else None,
interaction_language=db_memory.interaction_language,
documentation_language=db_memory.documentation_language,
taxonomy_version=db_memory.taxonomy_version,
platform_taxonomy=db_memory.platform_taxonomy,
domain_taxonomy=db_memory.domain_taxonomy,
size_taxonomy=db_memory.size_taxonomy,
compliance_taxonomy=db_memory.compliance_taxonomy,
custom_taxonomy=db_memory.custom_taxonomy
if db_memory.custom_taxonomy
else {}, # Handle potential None
taxonomy_validation=db_memory.taxonomy_validation
if db_memory.taxonomy_validation
else {}, # Handle potential None
)
# Map Artifacts
domain_artifacts: Dict[Bucket, List[ArtifactMeta]] = {
bucket: []
for bucket in Bucket # Initialize all buckets
}
if db_memory.artifacts: # Check if artifacts relationship is loaded/exists
for db_artifact in db_memory.artifacts:
try:
# Attempt to get the bucket enum member; default to UNKNOWN if invalid
bucket_enum = Bucket(db_artifact.bucket)
except ValueError:
logger.warning(
f"Artifact {db_artifact.id} has invalid bucket value '{db_artifact.bucket}' stored in DB. Mapping to UNKNOWN."
)
bucket_enum = Bucket.UNKNOWN
domain_artifact = ArtifactMeta(
id=db_artifact.id,
name=db_artifact.name,
bucket=bucket_enum,
                path=Path(db_artifact.path),  # Convert the stored string back into a Path
created_at=_ensure_utc(db_artifact.created_at),
updated_at=_ensure_utc(db_artifact.updated_at),
created_by=db_artifact.created_by,
modified_by=db_artifact.modified_by,
status=db_artifact.status,
)
# Append to the correct bucket list, handle UNKNOWN explicitly if needed elsewhere
domain_artifacts[bucket_enum].append(domain_artifact)
# Remove empty buckets if desired (or keep them as per domain logic)
# domain_artifacts = {k: v for k, v in domain_artifacts.items() if v}
# Assemble the final domain ProjectMemory object
domain_memory = ProjectMemory(
project_info=domain_project_info,
artifacts=domain_artifacts,
taxonomy_version=db_memory.taxonomy_version,
created_at=_ensure_utc(db_memory.created_at),
last_updated_at=_ensure_utc(db_memory.last_updated_at),
created_by=db_memory.created_by,
modified_by=db_memory.modified_by,
# Map taxonomy fields from ProjectMemoryDB to ProjectMemory
platform_taxonomy=db_memory.platform_taxonomy,
domain_taxonomy=db_memory.domain_taxonomy,
size_taxonomy=db_memory.size_taxonomy,
compliance_taxonomy=db_memory.compliance_taxonomy,
custom_taxonomy=db_memory.custom_taxonomy if db_memory.custom_taxonomy else {},
taxonomy_validation=db_memory.taxonomy_validation
if db_memory.taxonomy_validation
else {},
)
return domain_memory
def map_domain_to_db(
domain_memory: ProjectMemory, existing_db_memory: Optional[ProjectMemoryDB] = None
) -> ProjectMemoryDB:
"""
Maps the domain ProjectMemory model to a ProjectMemoryDB model.
Handles both creating a new DB object and updating an existing one.
"""
now_utc = datetime.datetime.now(datetime.timezone.utc)
# --- Map Project Info / Top-Level Fields ---
project_info = domain_memory.project_info
if existing_db_memory:
db_memory = existing_db_memory
# Update fields from ProjectInfo
db_memory.language = project_info.language
db_memory.purpose = project_info.purpose
db_memory.target_audience = project_info.target_audience
db_memory.objectives = project_info.objectives
db_memory.base_path = (
str(project_info.base_path) if project_info.base_path else None
)
db_memory.interaction_language = project_info.interaction_language
db_memory.documentation_language = project_info.documentation_language
# Update fields from ProjectMemory
db_memory.taxonomy_version = domain_memory.taxonomy_version
db_memory.last_updated_at = (
_ensure_utc(domain_memory.last_updated_at) or now_utc
)
db_memory.modified_by = domain_memory.modified_by
db_memory.platform_taxonomy = domain_memory.platform_taxonomy
db_memory.domain_taxonomy = domain_memory.domain_taxonomy
db_memory.size_taxonomy = domain_memory.size_taxonomy
db_memory.compliance_taxonomy = domain_memory.compliance_taxonomy
db_memory.custom_taxonomy = domain_memory.custom_taxonomy
db_memory.taxonomy_validation = domain_memory.taxonomy_validation
else:
# Create new ProjectMemoryDB
db_memory = ProjectMemoryDB(
name=project_info.name,
language=project_info.language,
purpose=project_info.purpose,
target_audience=project_info.target_audience,
objectives=project_info.objectives,
base_path=str(project_info.base_path) if project_info.base_path else None,
interaction_language=project_info.interaction_language,
documentation_language=project_info.documentation_language,
taxonomy_version=domain_memory.taxonomy_version,
created_at=_ensure_utc(domain_memory.created_at) or now_utc,
last_updated_at=_ensure_utc(domain_memory.last_updated_at) or now_utc,
created_by=domain_memory.created_by,
modified_by=domain_memory.modified_by,
platform_taxonomy=domain_memory.platform_taxonomy,
domain_taxonomy=domain_memory.domain_taxonomy,
size_taxonomy=domain_memory.size_taxonomy,
compliance_taxonomy=domain_memory.compliance_taxonomy,
custom_taxonomy=domain_memory.custom_taxonomy,
taxonomy_validation=domain_memory.taxonomy_validation,
artifacts=[], # Initialize relationship list
)
    # --- Map Artifacts ---
    # Artifact rows need db_memory.id, which a new object only receives after a
    # session flush. This function therefore returns the mapped ProjectMemoryDB
    # without syncing artifacts; the adapter calls sync_artifacts_db() inside
    # its session context once the ID is available.
return db_memory
def sync_artifacts_db(
session, # Pass the SQLAlchemy session
domain_memory: ProjectMemory,
db_memory: ProjectMemoryDB, # Assumes db_memory exists and has an ID
) -> List[ArtifactMetaDB]:
    """
    Synchronizes the ArtifactMetaDB entries based on the domain model's artifacts.
    This function should be called within the adapter's session context *after*
    the ProjectMemoryDB object exists and has an ID (i.e., after adding and flushing if new).
    Args:
        session: The active SQLAlchemy AsyncSession.
        domain_memory: The source domain model.
        db_memory: The target database model (must have an ID).
    Returns:
        The ArtifactMetaDB rows to delete; deletion is async and is left to
        the caller's session.
    """
    if not db_memory.id:
        logger.error("Cannot sync artifacts: ProjectMemoryDB object has no ID.")
        return []
# Use eager loading if artifacts aren't already loaded
# This check might be redundant depending on how db_memory was obtained
if "artifacts" not in db_memory.__dict__: # Basic check if relationship is loaded
logger.warning(
"Artifacts relationship not loaded on db_memory. Explicit loading might be needed."
)
# Potentially load it here if necessary, but ideally it's loaded beforehand
# await session.refresh(db_memory, attribute_names=['artifacts'])
db_artifacts_map: Dict[uuid.UUID, ArtifactMetaDB] = {
a.id: a for a in db_memory.artifacts
}
domain_artifact_ids = set()
artifacts_to_add = []
artifacts_to_delete = []
for bucket, domain_artifact_list in domain_memory.artifacts.items():
for domain_artifact in domain_artifact_list:
if not isinstance(domain_artifact, ArtifactMeta):
logger.warning(
f"Skipping non-ArtifactMeta item found in domain artifacts: {domain_artifact}"
)
continue # Skip if somehow a non-artifact is in the list
domain_artifact_ids.add(domain_artifact.id)
db_artifact = db_artifacts_map.get(domain_artifact.id)
if db_artifact:
# Update existing artifact
db_artifact.name = domain_artifact.name
db_artifact.bucket = domain_artifact.bucket # Store enum directly
db_artifact.path = str(domain_artifact.path)
db_artifact.status = domain_artifact.status # Store enum directly
db_artifact.updated_at = _ensure_utc(
domain_artifact.updated_at
) or datetime.datetime.now(datetime.timezone.utc)
db_artifact.modified_by = domain_artifact.modified_by
# No need to add to session explicitly if object is already managed
else:
# Create new artifact DB object
new_db_artifact = ArtifactMetaDB(
id=domain_artifact.id,
project_memory_id=db_memory.id, # Link to parent
name=domain_artifact.name,
bucket=domain_artifact.bucket,
path=str(domain_artifact.path),
created_at=_ensure_utc(domain_artifact.created_at)
or datetime.datetime.now(datetime.timezone.utc),
updated_at=_ensure_utc(domain_artifact.updated_at)
or datetime.datetime.now(datetime.timezone.utc),
created_by=domain_artifact.created_by,
modified_by=domain_artifact.modified_by,
status=domain_artifact.status,
)
artifacts_to_add.append(new_db_artifact)
# Identify artifacts to delete
for db_artifact_id, db_artifact in db_artifacts_map.items():
if db_artifact_id not in domain_artifact_ids:
artifacts_to_delete.append(db_artifact)
# Perform session operations (caller should handle commit/rollback)
if artifacts_to_add:
session.add_all(artifacts_to_add)
logger.debug(
f"Adding {len(artifacts_to_add)} new artifacts to session for project {db_memory.name}."
)
    # AsyncSession.delete() must be awaited, so this synchronous helper cannot
    # perform the deletions itself. Return the stale rows and let the async
    # adapter method delete them within its own session context.
    return artifacts_to_delete
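# --- Usage sketch ---
# A minimal sketch of the intended call order inside an async session; this
# mirrors SQLiteMemoryAdapter.save_memory and is illustrative only. `session`
# is assumed to be an active SQLAlchemy AsyncSession with artifacts eagerly
# loaded on any existing ProjectMemoryDB.
async def _example_save_flow(
    session,
    domain_memory: ProjectMemory,
    existing_db_memory: Optional[ProjectMemoryDB] = None,
) -> None:
    db_memory = map_domain_to_db(domain_memory, existing_db_memory)
    session.add(db_memory)
    if existing_db_memory is None:
        await session.flush()  # assigns db_memory.id before artifacts are synced
    for stale in sync_artifacts_db(session, domain_memory, db_memory):
        await session.delete(stale)  # async deletes stay with the caller
    await session.commit()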
================
File: paelladoc/adapters/output/sqlite/sqlite_memory_adapter.py
================
"""SQLite adapter for project memory persistence."""
import logging
from typing import Optional, List
from pathlib import Path
from sqlmodel import SQLModel, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, selectinload
from sqlalchemy.exc import IntegrityError
# Ports and Domain Models
from paelladoc.ports.output.memory_port import MemoryPort
from paelladoc.domain.models.project import (
ProjectMemory,
ProjectInfo,
)
# Database Models for this adapter
from .db_models import ProjectMemoryDB
# Import the new mapper functions
from .mapper import map_db_to_domain, map_domain_to_db, sync_artifacts_db
# Configuration
from paelladoc.config.database import get_db_path
# The default database path is resolved lazily via get_db_path() in __init__;
# an earlier module-level DEFAULT_DB_PATH constant and its fragile
# PROJECT_ROOT calculation were removed.
logger = logging.getLogger(__name__)
class SQLiteMemoryAdapter(MemoryPort):
"""SQLite implementation of the MemoryPort using new MECE/Artifact models."""
    # The database path is resolved via get_db_path() when none is provided.
def __init__(self, db_path: str | Path | None = None):
"""
Initialize the SQLite adapter.
Args:
db_path: Optional custom database path. If not provided, uses the configured default.
"""
self.db_path = Path(db_path) if db_path else get_db_path()
logger.info(
f"Initializing SQLite adapter with database path: {self.db_path.resolve()}"
)
# Ensure the parent directory exists
self.db_path.parent.mkdir(parents=True, exist_ok=True)
# Create async engine
self.async_engine = create_async_engine(
f"sqlite+aiosqlite:///{self.db_path}",
echo=False, # Set to True for SQL query logging
connect_args={"check_same_thread": False}, # Necessary for SQLite async
)
# Create async session factory (named async_session)
self.async_session = sessionmaker(
self.async_engine, class_=AsyncSession, expire_on_commit=False
)
logger.info("SQLiteMemoryAdapter initialized.")
async def _create_db_and_tables(self):
"""Creates the database and tables if they don't exist."""
async with self.async_engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
logger.info("Database tables checked/created.")
# --- MemoryPort Implementation --- #
async def save_memory(self, memory: ProjectMemory) -> None:
"""Saves the project memory state (including artifacts) to SQLite using the mapper."""
project_name = memory.project_info.name
logger.debug(f"Attempting to save memory for project: {project_name}")
await self._create_db_and_tables()
async with self.async_session() as session:
try:
# Try to load existing DB object
statement = (
select(ProjectMemoryDB)
.where(ProjectMemoryDB.name == project_name)
.options(
selectinload(ProjectMemoryDB.artifacts)
) # Eager load artifacts
)
results = await session.execute(statement)
existing_db_memory = results.scalars().first()
# Use mapper to map domain object to DB object (create or update)
db_memory = map_domain_to_db(memory, existing_db_memory)
# Add the main object to the session (SQLModel handles INSERT or UPDATE)
session.add(db_memory)
# If creating, flush to get the ID before syncing artifacts
if not existing_db_memory:
await session.flush() # Get the db_memory.id
logger.debug(
f"Flushed new project {db_memory.name} with ID {db_memory.id}"
)
# Sync artifacts (add/update/prepare for delete)
# The mapper function now returns the list of artifacts to delete
artifacts_to_delete = sync_artifacts_db(session, memory, db_memory)
# Perform deletions if any artifacts were marked
if artifacts_to_delete:
logger.debug(
f"Deleting {len(artifacts_to_delete)} artifacts from session for project {project_name}"
)
for artifact_to_del in artifacts_to_delete:
await session.delete(artifact_to_del)
# Commit all changes (project add/update, artifact add/update/delete)
await session.commit()
logger.info(f"Successfully saved memory for project: {project_name}")
except IntegrityError as e:
await session.rollback()
logger.error(
f"Integrity error saving project '{project_name}': {e}",
exc_info=True,
)
raise ValueError(
f"Project '{project_name}' might already exist or another integrity issue occurred."
) from e
except Exception as e:
await session.rollback()
logger.error(
f"Unexpected error saving project '{project_name}': {e}",
exc_info=True,
)
raise
async def load_memory(self, project_name: str) -> Optional[ProjectMemory]:
"""Loads project memory (including artifacts) from SQLite using the mapper."""
logger.debug(f"Attempting to load memory for project: {project_name}")
await self._create_db_and_tables()
async with self.async_session() as session:
try:
statement = (
select(ProjectMemoryDB)
.where(ProjectMemoryDB.name == project_name)
.options(
selectinload(ProjectMemoryDB.artifacts)
) # Eager load artifacts
)
results = await session.execute(statement)
db_memory = results.scalars().first()
if db_memory:
logger.debug(
f"Found project '{project_name}' in DB, mapping to domain model."
)
# Use the mapper function
return map_db_to_domain(db_memory)
else:
logger.debug(f"Project '{project_name}' not found in DB.")
return None
except Exception as e:
logger.error(
f"Error loading project '{project_name}': {e}", exc_info=True
)
# Optional: Re-raise a custom domain exception?
return None # Return None on error for now
async def project_exists(self, project_name: str) -> bool:
"""Checks if a project memory exists in the SQLite database."""
logger.debug(f"Checking existence for project: {project_name}")
await self._create_db_and_tables()
async with self.async_session() as session:
try:
statement = select(ProjectMemoryDB.id).where(
ProjectMemoryDB.name == project_name
)
results = await session.execute(statement)
exists = results.scalars().first() is not None
logger.debug(f"Project '{project_name}' exists: {exists}")
return exists
except Exception as e:
logger.error(
f"Error checking project existence for '{project_name}': {e}",
exc_info=True,
)
return False
async def list_projects(self) -> List[ProjectInfo]: # Return ProjectInfo objects
"""Lists basic info for all projects stored in the database."""
logger.debug("Listing all projects info from database.")
await self._create_db_and_tables()
projects_info: List[ProjectInfo] = []
async with self.async_session() as session:
try:
# Select necessary columns for ProjectInfo
statement = select(
ProjectMemoryDB.name,
ProjectMemoryDB.language,
ProjectMemoryDB.purpose,
ProjectMemoryDB.target_audience,
ProjectMemoryDB.objectives,
ProjectMemoryDB.base_path,
ProjectMemoryDB.interaction_language,
ProjectMemoryDB.documentation_language,
ProjectMemoryDB.taxonomy_version, # Added taxonomy version
ProjectMemoryDB.platform_taxonomy,
ProjectMemoryDB.domain_taxonomy,
ProjectMemoryDB.size_taxonomy,
ProjectMemoryDB.compliance_taxonomy,
ProjectMemoryDB.custom_taxonomy,
ProjectMemoryDB.taxonomy_validation,
)
results = await session.execute(statement)
for row in results.all():
# Manually map row to ProjectInfo domain model
# Consider a dedicated mapper function if this gets complex
info = ProjectInfo(
name=row.name,
language=row.language,
purpose=row.purpose,
target_audience=row.target_audience,
objectives=row.objectives if row.objectives else [],
base_path=Path(row.base_path) if row.base_path else None,
interaction_language=row.interaction_language,
documentation_language=row.documentation_language,
taxonomy_version=row.taxonomy_version,
platform_taxonomy=row.platform_taxonomy,
domain_taxonomy=row.domain_taxonomy,
size_taxonomy=row.size_taxonomy,
compliance_taxonomy=row.compliance_taxonomy,
custom_taxonomy=row.custom_taxonomy
if row.custom_taxonomy
else {},
taxonomy_validation=row.taxonomy_validation
if row.taxonomy_validation
else {},
)
projects_info.append(info)
logger.debug(f"Found {len(projects_info)} projects.")
return projects_info
except Exception as e:
logger.error(f"Error listing projects: {e}", exc_info=True)
return [] # Return empty list on error
    # list_projects_names was removed now that list_projects returns ProjectInfo
    # objects; the ensure_utc helper moved to the mapper module.
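# --- Usage sketch ---
# A minimal sketch against a throwaway database file; saving requires a fully
# built domain ProjectMemory, so only the read-side calls are shown. The path
# below is illustrative only.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        adapter = SQLiteMemoryAdapter(db_path="./demo_memory.db")
        print(await adapter.project_exists("demo-project"))
        print([info.name for info in await adapter.list_projects()])

    asyncio.run(_demo())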
================
File: paelladoc/adapters/output/sqlite/db_models.py
================
from typing import List, Optional
from uuid import UUID, uuid4
from datetime import datetime
from pathlib import Path
from sqlmodel import Field, Relationship, SQLModel, Column # Import Column for JSON
from sqlalchemy.sql.sqltypes import JSON # Import JSON type
from paelladoc.domain.models.project import (
Bucket,
DocumentStatus,
) # Import enums from domain
# --- Knowledge Graph Documentation ---
"""
Knowledge Graph (KG) Ready Model Design
This file defines SQLModel models with relationships specifically designed to be
KG-compatible. Each relationship defined here (through foreign keys) represents a
potential edge in a knowledge graph.
Primary Nodes:
- ProjectMemoryDB: Represents a project (central node)
- ArtifactMetaDB: Represents documentation artifacts
- TaxonomyDB: Represents MECE taxonomy selections
Edge Types (Relationships):
1. HAS_ARTIFACT: ProjectMemoryDB -> ArtifactMetaDB
- Direction: Project contains artifacts
- Properties: None (simple containment)
- FK: ArtifactMetaDB.project_memory_id -> ProjectMemoryDB.id
2. HAS_TAXONOMY: ProjectMemoryDB -> TaxonomyDB
- Direction: Project uses taxonomy combinations
- Properties: Selected categories
- Validates MECE structure
3. IMPLEMENTS: ArtifactMetaDB -> TaxonomyDB
- Direction: Artifact implements taxonomy requirements
- Properties: Coverage metrics
Future Potential Edges:
1. DEPENDS_ON: ArtifactMetaDB -> ArtifactMetaDB
- Would represent dependencies between artifacts
- Need to add a dependencies table or attribute
2. CREATED_BY: ArtifactMetaDB -> User
- Connects artifacts to creators
- Already tracking created_by/modified_by fields
Query Patterns:
- Find all artifacts for a project: ProjectMemoryDB -[HAS_ARTIFACT]-> ArtifactMetaDB
- Find taxonomy coverage: ProjectMemoryDB -[HAS_TAXONOMY]-> TaxonomyDB
- Validate MECE structure: TaxonomyDB -[IMPLEMENTS]-> ArtifactMetaDB
MECE Structure Support:
- Platform taxonomies (web, mobile, desktop, extensions)
- Domain taxonomies (infra, tools, data/AI, business)
- Size taxonomies (personal to enterprise)
- Compliance taxonomies (GDPR, HIPAA, PCI)
"""
# --- Artifact Model ---
class ArtifactMetaDB(SQLModel, table=True):
"""Database model for ArtifactMeta"""
# Use the domain UUID as the primary key
id: UUID = Field(default_factory=uuid4, primary_key=True, index=True)
# KG Edge: HAS_ARTIFACT (ProjectMemoryDB -> ArtifactMetaDB)
# This foreign key creates a directional relationship from Project to Artifact
project_memory_id: UUID = Field(foreign_key="projectmemorydb.id", index=True)
name: str = Field(index=True)
bucket: Bucket = Field(index=True) # Store enum value directly
path: str = Field(index=True) # Store Path as string
    created_at: datetime = Field(default_factory=datetime.utcnow)  # naive UTC; mapper re-attaches tzinfo on load
    updated_at: datetime = Field(default_factory=datetime.utcnow)
# KG Node Properties for actor/authorship tracking
# These fields can be used to create CREATED_BY and MODIFIED_BY edges in a KG
created_by: Optional[str] = Field(default=None, index=True)
modified_by: Optional[str] = Field(default=None, index=True)
status: DocumentStatus = Field(index=True) # Store enum value directly
# Define the relationship back to ProjectMemoryDB
# This defines the reverse navigation for the HAS_ARTIFACT relationship
project_memory: "ProjectMemoryDB" = Relationship(back_populates="artifacts")
# KG-Ready: Store Path as string for easier querying/linking
def __init__(self, *, path: Path, **kwargs):
super().__init__(path=str(path), **kwargs)
@property
def path_obj(self) -> Path:
return Path(self.path)
# --- Project Memory Model ---
class ProjectMemoryDB(SQLModel, table=True):
"""Project memory database model."""
    # A dedicated UUID is used as the DB primary key; the project name from
    # metadata remains unique and serves as the natural lookup key.
id: UUID = Field(default_factory=uuid4, primary_key=True, index=True)
name: str = Field(unique=True, index=True) # From metadata.name
language: Optional[str] = Field(default=None)
purpose: Optional[str] = Field(default=None)
target_audience: Optional[str] = Field(default=None)
objectives: Optional[List[str]] = Field(
sa_column=Column(JSON), default=None
) # Store list as JSON
base_path: Optional[str] = Field(
default=None
) # Store as string representation of Path
interaction_language: Optional[str] = Field(default=None)
documentation_language: Optional[str] = Field(default=None)
taxonomy_version: str
created_at: datetime = Field(default_factory=datetime.utcnow)
last_updated_at: datetime = Field(default_factory=datetime.utcnow)
# KG Node Properties for actor/authorship tracking
created_by: Optional[str] = Field(default=None, index=True)
modified_by: Optional[str] = Field(default=None, index=True)
# MECE Taxonomy Configuration
platform_taxonomy: Optional[str] = Field(index=True) # Selected platform taxonomy
domain_taxonomy: Optional[str] = Field(index=True) # Selected domain taxonomy
size_taxonomy: Optional[str] = Field(index=True) # Selected size taxonomy
compliance_taxonomy: Optional[str] = Field(
index=True
) # Selected compliance taxonomy
# Custom taxonomy configuration for this project
custom_taxonomy: Optional[dict] = Field(
sa_column=Column(JSON), default=None
) # Store as JSON object
# MECE validation status
taxonomy_validation: Optional[dict] = Field(
sa_column=Column(JSON), default=None
) # Store validation results
# Define the one-to-many relationship to ArtifactMetaDB
# KG Edge: HAS_ARTIFACT (ProjectMemoryDB -> ArtifactMetaDB)
# artifacts will be loaded automatically by SQLModel/SQLAlchemy when accessed
artifacts: List["ArtifactMetaDB"] = Relationship(back_populates="project_memory")
# TODO: Decide how to handle the old 'documents' field if migration is needed.
# Could be another JSON field temporarily or migrated into ArtifactMetaDB.
# For now, omitting it, assuming new structure only or migration handles it.
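# --- Usage sketch ---
# A minimal sketch of projecting these rows into knowledge-graph edges, per the
# module docstring above; project_to_kg_edges and the (subject, predicate,
# object) triple convention are illustrative, not part of a fixed schema.
def project_to_kg_edges(project: "ProjectMemoryDB") -> List[tuple]:
    edges = []
    for artifact in project.artifacts:
        # KG Edge: HAS_ARTIFACT (ProjectMemoryDB -> ArtifactMetaDB)
        edges.append((str(project.id), "HAS_ARTIFACT", str(artifact.id)))
        if artifact.created_by:
            # KG Edge: CREATED_BY (ArtifactMetaDB -> User), see "Future Potential Edges"
            edges.append((str(artifact.id), "CREATED_BY", artifact.created_by))
    return edges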
================
File: paelladoc/adapters/services/system_time_service.py
================
"""System implementation of the time service."""
import datetime
from ...domain.services.time_service import TimeService
class SystemTimeService(TimeService):
"""System implementation of TimeService using system clock."""
def get_current_time(self) -> datetime.datetime:
"""Get current timestamp in UTC using system clock."""
return datetime.datetime.now(datetime.timezone.utc)
def ensure_utc(self, dt: datetime.datetime) -> datetime.datetime:
"""Convert a datetime to UTC.
If the datetime has no timezone info, assumes it's in UTC.
"""
if dt.tzinfo is None:
return dt.replace(tzinfo=datetime.timezone.utc)
return dt.astimezone(datetime.timezone.utc)
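# --- Usage sketch ---
# A minimal sketch of the naive-datetime convention: ensure_utc treats a naive
# value as already being UTC rather than local time.
if __name__ == "__main__":
    service = SystemTimeService()
    naive = datetime.datetime(2025, 1, 1, 12, 0, 0)
    print(service.ensure_utc(naive).isoformat())  # 2025-01-01T12:00:00+00:00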
================
File: paelladoc/application/utils/behavior_enforcer.py
================
"""
Utility for enforcing behavior rules defined in tool configurations.
"""
import logging
from typing import Dict, Any, Set, Optional
# Assuming MCPContext structure or relevant parts are accessible
# from mcp.context import Context as MCPContext # Or use Any for now
logger = logging.getLogger(__name__)
class BehaviorViolationError(Exception):
"""Custom exception raised when a behavior rule is violated."""
def __init__(self, message: str):
self.message = message
super().__init__(self.message)
class BehaviorEnforcer:
"""Enforces conversational behavior based on tool config and context."""
@staticmethod
def enforce(
tool_name: str,
behavior_config: Optional[Dict[str, Any]],
ctx: Optional[Any], # Replace Any with actual MCPContext type if available
provided_args: Optional[Dict[str, Any]]
):
"""Checks current context and arguments against behavior rules.
Args:
tool_name: The name of the tool being called.
behavior_config: The BEHAVIOR_CONFIG dictionary for the tool.
ctx: The current MCP context object (expected to have ctx.progress).
provided_args: The arguments passed to the tool function in the current call.
Raises:
BehaviorViolationError: If a rule is violated.
"""
if not behavior_config:
logger.debug(f"No behavior config for tool '{tool_name}', skipping enforcement.")
return
if not ctx or not hasattr(ctx, 'progress') or not provided_args:
logger.warning(f"Behavior enforcement skipped for '{tool_name}': missing context or args.")
# Decide if this should be an error or just skipped
return
# --- Enforce fixed_question_order ---
if "fixed_question_order" in behavior_config:
sequence = behavior_config["fixed_question_order"]
if not isinstance(sequence, list):
logger.warning(f"Invalid 'fixed_question_order' in config for {tool_name}. Skipping check.")
return
# Assume ctx.progress['collected_params'] holds previously gathered arguments
collected_params: Set[str] = ctx.progress.get("collected_params", set())
# Identify arguments provided in *this* specific call (non-None values)
current_call_args = {k for k, v in provided_args.items() if v is not None}
# Identify which of the currently provided args are *new* (not already collected)
newly_provided_params = current_call_args - collected_params
            if not newly_provided_params:
                # No *new* parameters in this call (e.g., a confirmation turn or
                # a completed sequence); allow it. A stricter policy could error
                # when the sequence is still incomplete.
                logger.debug(f"Tool '{tool_name}': No new parameters provided, sequence check passes by default.")
                return
# Find the first parameter in the defined sequence that hasn't been collected yet
expected_next_param = None
for param in sequence:
if param not in collected_params:
expected_next_param = param
break
            if expected_next_param is None:
                # The defined sequence is complete; extra (e.g., optional)
                # parameters are allowed from this point on.
                logger.debug(f"Tool '{tool_name}': Sequence complete, allowing provided args: {newly_provided_params}")
                return
# --- Enforce one_parameter_at_a_time (implicitly for sequence) ---
# Check if exactly one *new* parameter was provided and if it's the expected one.
if len(newly_provided_params) > 1:
raise BehaviorViolationError(
f"Tool '{tool_name}' expects parameters sequentially. "
f"Expected next: '{expected_next_param}'. "
f"Provided multiple new parameters: {newly_provided_params}. "
f"Collected so far: {collected_params}."
)
provided_param = list(newly_provided_params)[0]
if provided_param != expected_next_param:
raise BehaviorViolationError(
f"Tool '{tool_name}' expects parameters sequentially. "
f"Expected next: '{expected_next_param}'. "
f"Got unexpected new parameter: '{provided_param}'. "
f"Collected so far: {collected_params}."
)
# If we reach here, exactly one new parameter was provided and it was the expected one.
logger.debug(f"Tool '{tool_name}': Correct sequential parameter '{provided_param}' provided.")
# --- Add other rule checks here as needed ---
# e.g., max_questions_per_message (more complex, needs turn context)
# e.g., documentation_first (likely better as separate middleware/check)
# If all checks pass
return
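# --- Usage sketch ---
# A minimal sketch of the sequential-parameter check; SimpleNamespace stands in
# for the real MCP context, and the tool name and config are illustrative only.
if __name__ == "__main__":
    from types import SimpleNamespace

    config = {"fixed_question_order": ["project_name", "base_path"]}
    ctx = SimpleNamespace(progress={"collected_params": set()})
    # The first parameter in the sequence passes silently.
    BehaviorEnforcer.enforce("demo_tool", config, ctx, {"project_name": "demo"})
    try:
        # An out-of-order parameter raises BehaviorViolationError.
        BehaviorEnforcer.enforce("demo_tool", config, ctx, {"base_path": "/tmp/demo"})
    except BehaviorViolationError as err:
        print(err)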
================
File: paelladoc/application/services/vector_store_service.py
================
import logging
from typing import List, Dict, Any, Optional
# Ports and SearchResult
from paelladoc.ports.output.vector_store_port import VectorStorePort, SearchResult
logger = logging.getLogger(__name__)
class VectorStoreService:
"""Application service for interacting with the vector store.
Uses the VectorStorePort to abstract the underlying vector database.
"""
def __init__(self, vector_store_port: VectorStorePort):
"""Initializes the service with a VectorStorePort implementation."""
self.vector_store_port = vector_store_port
logger.info(f"VectorStoreService initialized with port: {type(vector_store_port).__name__}")
async def add_texts_to_collection(
self,
collection_name: str,
documents: List[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
ids: Optional[List[str]] = None
) -> List[str]:
"""Adds text documents to a specific collection."""
logger.debug(f"Service: Adding {len(documents)} documents to vector store collection '{collection_name}'")
try:
added_ids = await self.vector_store_port.add_documents(
collection_name=collection_name,
documents=documents,
metadatas=metadatas,
ids=ids
)
logger.info(f"Service: Successfully added documents to collection '{collection_name}' with IDs: {added_ids}")
return added_ids
except Exception as e:
logger.error(f"Service: Error adding documents to collection '{collection_name}': {e}", exc_info=True)
# Re-raise or handle specific exceptions as needed
raise
async def find_similar_texts(
self,
collection_name: str,
query_texts: List[str],
n_results: int = 5,
filter_metadata: Optional[Dict[str, Any]] = None,
filter_document: Optional[Dict[str, Any]] = None
) -> List[List[SearchResult]]:
"""Finds documents similar to the query texts within a collection."""
logger.debug(f"Service: Searching collection '{collection_name}' for texts similar to: {query_texts} (n={n_results})")
try:
results = await self.vector_store_port.search_similar(
collection_name=collection_name,
query_texts=query_texts,
n_results=n_results,
where=filter_metadata, # Pass filters to the port
where_document=filter_document,
                # Include common fields by default; Chroma always returns ids,
                # so "ids" is not a valid entry for the include list
                include=["metadatas", "documents", "distances"]
)
logger.info(f"Service: Found {sum(len(r) for r in results)} potential results for {len(query_texts)} queries in '{collection_name}'.")
return results
except Exception as e:
logger.error(f"Service: Error searching collection '{collection_name}': {e}", exc_info=True)
# Re-raise or handle specific exceptions as needed
raise
async def ensure_collection_exists(self, collection_name: str):
"""Ensures a collection exists, creating it if necessary."""
logger.debug(f"Service: Ensuring collection '{collection_name}' exists.")
try:
await self.vector_store_port.get_or_create_collection(collection_name)
logger.info(f"Service: Collection '{collection_name}' checked/created.")
except Exception as e:
logger.error(f"Service: Error ensuring collection '{collection_name}' exists: {e}", exc_info=True)
raise
async def remove_collection(self, collection_name: str):
"""Removes a collection entirely."""
logger.debug(f"Service: Attempting to remove collection '{collection_name}'.")
try:
await self.vector_store_port.delete_collection(collection_name)
logger.info(f"Service: Collection '{collection_name}' removed.")
except Exception as e:
logger.error(f"Service: Error removing collection '{collection_name}': {e}", exc_info=True)
raise
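# --- Usage sketch ---
# A minimal wiring sketch, assuming the Chroma adapter in
# paelladoc.adapters.output.chroma.chroma_vector_store_adapter implements
# VectorStorePort and supports in_memory=True; contents are illustrative only.
if __name__ == "__main__":
    import asyncio
    from paelladoc.adapters.output.chroma.chroma_vector_store_adapter import (
        ChromaVectorStoreAdapter,
    )

    async def _demo() -> None:
        service = VectorStoreService(ChromaVectorStoreAdapter(in_memory=True))
        await service.ensure_collection_exists("notes")
        await service.add_texts_to_collection("notes", ["hello vector store"])
        matches = await service.find_similar_texts("notes", ["greeting"], n_results=1)
        print(matches)

    asyncio.run(_demo())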
================
File: paelladoc/application/services/memory_service.py
================
import logging
from typing import Optional, Dict, Any, List, Callable, Awaitable
# Domain Models
from paelladoc.domain.models.project import (
ProjectMemory,
DocumentStatus,
ArtifactMeta,
Bucket,
)
# Ports
from paelladoc.ports.output.memory_port import MemoryPort
logger = logging.getLogger(__name__)
# Type definition for event handlers
EventHandler = Callable[[str, Dict[str, Any]], Awaitable[None]]
class MemoryService:
"""Application service for managing project memory operations.
Uses the MemoryPort to interact with the persistence layer.
"""
def __init__(self, memory_port: MemoryPort):
"""Initializes the service with a MemoryPort implementation."""
self.memory_port = memory_port
self._event_handlers: Dict[str, List[EventHandler]] = {}
logger.info(
f"MemoryService initialized with port: {type(memory_port).__name__}"
)
# --- Event System ---
async def _emit_event(self, event_name: str, event_data: Dict[str, Any]) -> None:
"""Emits an event to all registered handlers for that event type.
Args:
event_name: The name of the event (e.g., 'artifact_created')
event_data: Dictionary with event data
"""
if event_name not in self._event_handlers:
logger.debug(f"No handlers registered for event: {event_name}")
return
handlers = self._event_handlers[event_name]
logger.debug(f"Emitting event '{event_name}' to {len(handlers)} handlers")
for handler in handlers:
try:
await handler(event_name, event_data)
except Exception as e:
logger.error(
f"Error in event handler for '{event_name}': {e}", exc_info=True
)
def register_event_handler(self, event_name: str, handler: EventHandler) -> None:
"""Registers a handler function for a specific event type.
Args:
event_name: The event name to listen for
handler: Async function that will be called when the event occurs
"""
if event_name not in self._event_handlers:
self._event_handlers[event_name] = []
self._event_handlers[event_name].append(handler)
logger.debug(f"Registered handler for event: {event_name}")
def unregister_event_handler(self, event_name: str, handler: EventHandler) -> bool:
"""Unregisters a handler function for a specific event type.
Args:
event_name: The event name
handler: The handler function to remove
Returns:
True if the handler was removed, False if not found
"""
if event_name not in self._event_handlers:
return False
handlers = self._event_handlers[event_name]
if handler in handlers:
handlers.remove(handler)
logger.debug(f"Unregistered handler for event: {event_name}")
return True
return False
# --- Memory Service Methods ---
async def get_project_memory(self, project_name: str) -> Optional[ProjectMemory]:
"""Retrieves the memory for a specific project."""
logger.debug(f"Service: Attempting to get memory for project '{project_name}'")
memory = await self.memory_port.load_memory(project_name)
if memory:
await self._emit_event(
"memory_loaded",
{
"project_name": project_name,
"memory_id": str(memory.project_info.name),
"timestamp": memory.last_updated_at.isoformat()
if memory.last_updated_at
else None,
},
)
return memory
async def check_project_exists(self, project_name: str) -> bool:
"""Checks if a project memory already exists."""
logger.debug(f"Service: Checking existence for project '{project_name}'")
return await self.memory_port.project_exists(project_name)
async def create_project_memory(self, memory: ProjectMemory) -> ProjectMemory:
"""Creates a new project memory entry.
Raises:
ValueError: If a project with the same name already exists.
"""
project_name = memory.project_info.name
logger.debug(
f"Service: Attempting to create memory for project '{project_name}'"
)
exists = await self.check_project_exists(project_name)
if exists:
logger.error(f"Cannot create project '{project_name}': already exists.")
raise ValueError(f"Project memory for '{project_name}' already exists.")
await self.memory_port.save_memory(memory)
logger.info(
f"Service: Successfully created memory for project '{project_name}'"
)
# Emit project_created event
await self._emit_event(
"project_created",
{
"project_name": project_name,
"base_path": str(memory.project_info.base_path)
if memory.project_info.base_path
else None,
"timestamp": memory.created_at.isoformat()
if memory.created_at
else None,
"project_info_details": {
k: v
for k, v in memory.project_info.dict().items()
if k not in ["name", "base_path"] and v is not None
},
},
)
# Emit taxonomy event if taxonomy fields were provided
if (
memory.platform_taxonomy
or memory.domain_taxonomy
or memory.size_taxonomy
or memory.compliance_taxonomy
or memory.custom_taxonomy
):
await self._emit_event(
"taxonomy_updated",
{
"project_name": project_name,
"new_taxonomy": {
"platform": memory.platform_taxonomy,
"domain": memory.domain_taxonomy,
"size": memory.size_taxonomy,
"compliance": memory.compliance_taxonomy,
"custom": memory.custom_taxonomy,
},
"old_taxonomy": None, # First time setting it
},
)
# Emit artifact_created events for initial artifacts
for bucket, artifacts in memory.artifacts.items():
for artifact in artifacts:
await self._emit_event(
"artifact_created",
{
"project_name": project_name,
"artifact_id": str(artifact.id),
"artifact_name": artifact.name,
"bucket": bucket.value,
"path": str(artifact.path),
"status": artifact.status.value,
"timestamp": artifact.created_at.isoformat()
if artifact.created_at
else None,
"created_by": artifact.created_by,
},
)
return memory # Return the saved object (could also reload it)
async def update_project_memory(self, memory: ProjectMemory) -> ProjectMemory:
"""Updates an existing project memory entry.
Raises:
ValueError: If the project does not exist.
"""
project_name = memory.project_info.name
logger.debug(
f"Service: Attempting to update memory for project '{project_name}'"
)
# Ensure the project exists before attempting an update
# Note: save_memory itself handles the create/update logic, but this check
# makes the service layer's intent clearer and prevents accidental creation.
exists = await self.check_project_exists(project_name)
if not exists:
logger.error(f"Cannot update project '{project_name}': does not exist.")
raise ValueError(
f"Project memory for '{project_name}' does not exist. Use create_project_memory first."
)
# Get the old memory to compare changes
old_memory = await self.memory_port.load_memory(project_name)
# Save the updated memory
await self.memory_port.save_memory(memory)
logger.info(
f"Service: Successfully updated memory for project '{project_name}'"
)
# Emit project_updated event
await self._emit_event(
"project_updated",
{
"project_name": project_name,
"timestamp": memory.last_updated_at.isoformat()
if memory.last_updated_at
else None,
},
)
# Check if taxonomy fields changed
if old_memory and (
memory.platform_taxonomy != old_memory.platform_taxonomy
or memory.domain_taxonomy != old_memory.domain_taxonomy
or memory.size_taxonomy != old_memory.size_taxonomy
or memory.compliance_taxonomy != old_memory.compliance_taxonomy
):
await self._emit_event(
"taxonomy_updated",
{
"project_name": project_name,
"timestamp": memory.last_updated_at.isoformat()
if memory.last_updated_at
else None,
"new_taxonomy": {
"platform": memory.platform_taxonomy,
"domain": memory.domain_taxonomy,
"size": memory.size_taxonomy,
"compliance": memory.compliance_taxonomy,
},
"old_taxonomy": {
"platform": old_memory.platform_taxonomy,
"domain": old_memory.domain_taxonomy,
"size": old_memory.size_taxonomy,
"compliance": old_memory.compliance_taxonomy,
},
},
)
# Check for new or updated artifacts
if old_memory:
# Track artifacts by ID to detect changes
for bucket, artifacts in memory.artifacts.items():
# Skip empty buckets
if not artifacts:
continue
old_bucket_artifacts = old_memory.artifacts.get(bucket, [])
old_artifact_ids = {str(a.id): a for a in old_bucket_artifacts}
# Check each artifact in the new memory
for artifact in artifacts:
artifact_id = str(artifact.id)
# If artifact didn't exist before, it's new
if artifact_id not in old_artifact_ids:
await self._emit_event(
"artifact_created",
{
"project_name": project_name,
"artifact_id": artifact_id,
"artifact_name": artifact.name,
"bucket": bucket.value,
"path": str(artifact.path),
"status": artifact.status.value,
"timestamp": artifact.created_at.isoformat()
if artifact.created_at
else None,
"created_by": artifact.created_by,
},
)
else:
# If artifact existed, check if it was updated
old_artifact = old_artifact_ids[artifact_id]
if (
artifact.status != old_artifact.status
or artifact.updated_at != old_artifact.updated_at
):
await self._emit_event(
"artifact_updated",
{
"project_name": project_name,
"artifact_id": artifact_id,
"artifact_name": artifact.name,
"bucket": bucket.value,
"path": str(artifact.path),
"old_status": old_artifact.status.value,
"new_status": artifact.status.value,
"timestamp": artifact.updated_at.isoformat()
if artifact.updated_at
else None,
"modified_by": artifact.modified_by,
},
)
return memory # Return the updated object
    # Example of a more specific use case method. Note: this relies on the
    # legacy document-based memory model (memory.documents) rather than the
    # newer artifact/bucket structure.
async def update_document_status_in_memory(
self, project_name: str, document_name: str, new_status: DocumentStatus
) -> Optional[ProjectMemory]:
"""Updates the status of a specific document within a project's memory."""
logger.debug(
f"Service: Updating status for document '{document_name}' in project '{project_name}' to {new_status}"
)
memory = await self.get_project_memory(project_name)
if not memory:
logger.warning(
f"Project '{project_name}' not found, cannot update document status."
)
return None
if document_name not in memory.documents:
logger.warning(
f"Document '{document_name}' not found in project '{project_name}', cannot update status."
)
            # Returning the unchanged memory keeps this call non-fatal; raising
            # an error may be preferable once callers can handle it.
            return memory
# Get old status for event
old_status = memory.documents[document_name].status
# Update status
memory.update_document_status(
document_name, new_status
) # Use domain model method
# Save the updated memory
await self.memory_port.save_memory(memory)
logger.info(
f"Service: Saved updated status for document '{document_name}' in project '{project_name}'"
)
# Emit document_status_changed event
await self._emit_event(
"document_status_changed",
{
"project_name": project_name,
"document_name": document_name,
"old_status": old_status.value,
"new_status": new_status.value,
"timestamp": memory.last_updated_at.isoformat()
if memory.last_updated_at
else None,
},
)
return memory
async def add_artifact(
self, project_name: str, artifact: ArtifactMeta, author: Optional[str] = None
) -> Optional[ProjectMemory]:
"""Adds a new artifact to a project's memory.
Args:
project_name: The name of the project
artifact: The artifact to add
author: Optional name of the author creating the artifact
Returns:
The updated project memory, or None if project not found
"""
logger.debug(
f"Service: Adding artifact '{artifact.name}' to project '{project_name}'"
)
# Set author if provided
if author and not artifact.created_by:
artifact.created_by = author
artifact.modified_by = author
memory = await self.get_project_memory(project_name)
if not memory:
logger.warning(f"Project '{project_name}' not found, cannot add artifact.")
return None
# Add the artifact
added = memory.add_artifact(artifact)
if not added:
logger.warning(
f"Artifact with path '{artifact.path}' already exists in project '{project_name}'"
)
return memory
# Save the updated memory
await self.memory_port.save_memory(memory)
logger.info(
f"Service: Saved new artifact '{artifact.name}' in project '{project_name}'"
)
# Emit artifact_created event
await self._emit_event(
"artifact_created",
{
"project_name": project_name,
"artifact_id": str(artifact.id),
"artifact_name": artifact.name,
"bucket": artifact.bucket.value,
"path": str(artifact.path),
"status": artifact.status.value,
"timestamp": artifact.created_at.isoformat()
if artifact.created_at
else None,
"created_by": artifact.created_by,
},
)
return memory
async def update_artifact_status(
self,
project_name: str,
bucket: Bucket,
artifact_name: str,
new_status: DocumentStatus,
modifier: Optional[str] = None,
) -> Optional[ProjectMemory]:
"""Updates the status of a specific artifact within a project's memory.
Args:
project_name: The name of the project
bucket: The bucket containing the artifact
artifact_name: The name of the artifact to update
new_status: The new status to set
modifier: Optional name of the person making the change
Returns:
The updated project memory, or None if project not found
"""
logger.debug(
f"Service: Updating status for artifact '{artifact_name}' in project '{project_name}' to {new_status}"
)
memory = await self.get_project_memory(project_name)
if not memory:
logger.warning(
f"Project '{project_name}' not found, cannot update artifact status."
)
return None
# Get the artifact to check its current status
artifact = memory.get_artifact(bucket, artifact_name)
if not artifact:
logger.warning(
f"Artifact '{artifact_name}' not found in bucket '{bucket.value}' for project '{project_name}'"
)
return memory
old_status = artifact.status
# Update the artifact status
updated = memory.update_artifact_status(
bucket, artifact_name, new_status, modifier
)
if not updated:
logger.warning(
f"Failed to update status for artifact '{artifact_name}' in project '{project_name}'"
)
return memory
# Save the updated memory
await self.memory_port.save_memory(memory)
logger.info(
f"Service: Saved updated status for artifact '{artifact_name}' in project '{project_name}'"
)
# Emit artifact_updated event
await self._emit_event(
"artifact_updated",
{
"project_name": project_name,
"artifact_id": str(artifact.id),
"artifact_name": artifact_name,
"bucket": bucket.value,
"old_status": old_status.value,
"new_status": new_status.value,
"timestamp": artifact.updated_at.isoformat()
if artifact.updated_at
else None,
"modified_by": modifier or artifact.modified_by,
},
)
return memory
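# --- Usage sketch ---
# A minimal sketch of the event system: register an async handler and let the
# service fan events out to it. The SQLite adapter import follows this repo's
# layout; the handler and project name are illustrative only.
if __name__ == "__main__":
    import asyncio
    from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import (
        SQLiteMemoryAdapter,
    )

    async def _log_event(event_name: str, event_data: Dict[str, Any]) -> None:
        print(f"{event_name}: {event_data}")

    async def _demo() -> None:
        service = MemoryService(SQLiteMemoryAdapter(db_path="./demo_memory.db"))
        service.register_event_handler("artifact_created", _log_event)
        print(await service.check_project_exists("demo-project"))

    asyncio.run(_demo())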
================
File: paelladoc/ports/input/mcp_port.py
================
from abc import ABC, abstractmethod
from typing import Any, Dict
class MCPPort(ABC):
"""Input port for MCP (Model-Command-Process) operations."""
@abstractmethod
def process_command(self, command: str, args: Dict[str, Any]) -> Dict[str, Any]:
"""Process an MCP command with its arguments."""
pass
@abstractmethod
def register_plugin(self, plugin: Any) -> None:
"""Register a new plugin."""
pass
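# --- Usage sketch ---
# A minimal sketch of a concrete MCPPort implementation; InMemoryMCPAdapter
# and the plugin's handle(args) method are illustrative assumptions, not part
# of this repository.
class InMemoryMCPAdapter(MCPPort):
    def __init__(self) -> None:
        self._plugins: Dict[str, Any] = {}

    def register_plugin(self, plugin: Any) -> None:
        # Key plugins by a `name` attribute, falling back to the class name.
        self._plugins[getattr(plugin, "name", type(plugin).__name__)] = plugin

    def process_command(self, command: str, args: Dict[str, Any]) -> Dict[str, Any]:
        plugin = self._plugins.get(command)
        if plugin is None:
            return {"status": "error", "message": f"Unknown command: {command}"}
        return {"status": "ok", "result": plugin.handle(args)}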
================
File: paelladoc/ports/input/mcp_server_adapter.py
================
#!/usr/bin/env python3
"""
PAELLADOC MCP Server entry point (Input Adapter).
Relies on paelladoc_core.py (now core_logic.py in domain) for MCP functionality and FastMCP instance.
Simply runs the imported MCP instance.
Adds server-specific resources and prompts using decorators.
"""
import sys
import logging
from pathlib import Path
import time  # Used for the startup delay before mcp.run()
# Import TextContent for prompt definition
from mcp.types import TextContent # Assuming mcp is installed in .venv
# Import the core FastMCP instance and logger from the domain layer
from paelladoc.domain.core_logic import mcp, logger # Corrected import path
# --- Import plugin packages to trigger their __init__.py dynamic loading --- #
# This ensures decorators within the package modules are executed when the server starts
# Import the core plugins package: this executes plugins/core/__init__.py,
# which dynamically loads modules like paella.py so their tool decorators run.
from paelladoc.adapters.plugins import core  # noqa: F401
# We might need other plugin packages later, e.g.:
# from paelladoc.adapters.plugins import code_analysis
# from paelladoc.adapters.plugins import product_management
# --- Add specific tools/resources/prompts for this entry point using decorators --- #
# These are defined directly in this adapter file and might be deprecated later
@mcp.resource("docs://readme") # Use decorator
def get_readme() -> str:
"""Get the project README content."""
try:
# Assuming README.md is in the project root (cwd)
readme_path = Path("README.md")
if readme_path.exists():
return readme_path.read_text()
else:
logger.warning("README.md not found in project root.")
return "README.md not found" # Keep simple return for resource
except Exception as e:
logger.error(f"Error reading README.md: {e}", exc_info=True)
return f"Error reading README.md: {str(e)}"
@mcp.resource("docs://templates/{template_name}") # Use decorator
def get_template(template_name: str) -> str:
"""Get a documentation template."""
# Corrected path relative to src directory
base_path = Path(__file__).parent.parent.parent.parent # Should point to src/
template_path = (
base_path
/ "paelladoc"
/ "adapters"
/ "plugins"
/ "templates"
/ f"{template_name}.md"
)
try:
if template_path.exists():
return template_path.read_text()
else:
logger.warning(f"Template {template_name} not found at {template_path}")
return f"Error: Template {template_name} not found"
except Exception as e:
logger.error(f"Error reading template {template_name}: {e}", exc_info=True)
return f"Error reading template {template_name}: {str(e)}"
@mcp.prompt() # Use decorator
def paella_command(project_name: str) -> TextContent:
"""Create a PAELLA command prompt."""
return TextContent(
type="text",
text=f"Initiating PAELLADOC for project: {project_name}.\n"
f"Please specify: 1. Project type, 2. Methodologies, 3. Git workflow.",
)
# --- Main Execution Logic --- #
if __name__ == "__main__":
# Configure file logging
try:
log_file = "paelladoc_server.log"
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(
logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)
logging.getLogger().addHandler(file_handler)
logging.getLogger().setLevel(logging.DEBUG)
logger.info(f"Logging configured. Outputting to {log_file}")
except Exception as log_e:
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s"
)
# Re-get logger after basicConfig potentially reconfigured root
logger = logging.getLogger(__name__)
logger.error(f"Could not configure file logging: {log_e}. Logging to stderr.")
# Check command line arguments to determine run mode
    run_mode = (
        "stdio" if "--stdio" in sys.argv else "web"
    )  # stdio when --stdio is passed; otherwise the default web/SSE mode
try:
if run_mode == "stdio":
logger.info(
"Starting PAELLADOC MCP server in STDIO mode via FastMCP mcp.run(transport='stdio')..."
)
logger.debug("Waiting 10 seconds before mcp.run()...")
            time.sleep(10)  # deliberate startup delay (see debug log above)
logger.debug('Attempting mcp.run(transport="stdio")')
mcp.run(transport="stdio") # Explicitly request stdio transport
else:
# Attempt to run the default web server (SSE)
# Note: FastMCP's default run() might try stdio first anyway if no host/port specified
logger.warning(
"Starting PAELLADOC MCP server in default mode (likely web/SSE) via FastMCP mcp.run()..."
)
logger.warning("Use --stdio argument for direct client integration.")
mcp.run() # Run with default settings (tries SSE/web)
logger.info(f"PAELLADOC MCP server finished (mode: {run_mode}).")
except Exception as e:
logger.critical(f"Failed to start or run MCP server: {e}", exc_info=True)
sys.exit(1)
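# Typical invocations (assumed from the argument handling above; not an
# officially documented CLI):
#   python src/paelladoc/ports/input/mcp_server_adapter.py --stdio  # STDIO mode
#   python src/paelladoc/ports/input/mcp_server_adapter.py          # default web/SSE mode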
================
File: paelladoc/ports/output/taxonomy_provider.py
================
from abc import ABC, abstractmethod
from typing import Dict, List
class TaxonomyProvider(ABC):
"""Abstract interface for providing available taxonomy information."""
@abstractmethod
def get_available_taxonomies(self) -> Dict[str, List[str]]:
"""Returns a dictionary of available taxonomies grouped by category.
Example:
{
"platform": ["web-frontend", "ios-native", ...],
"domain": ["ecommerce", "ai-ml", ...],
...
}
"""
pass
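# Hedged sketch: a static in-memory provider, e.g. for tests. The class name
# is illustrative; the values mirror the docstring example above.
#
#     class StaticTaxonomyProvider(TaxonomyProvider):
#         def get_available_taxonomies(self) -> Dict[str, List[str]]:
#             return {
#                 "platform": ["web-frontend", "ios-native"],
#                 "domain": ["ecommerce", "ai-ml"],
#             }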
================
File: paelladoc/ports/output/vector_store_port.py
================
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
class SearchResult(ABC):
"""Represents a single search result from the vector store."""
# Define common attributes for a search result
id: str
distance: Optional[float] = None
metadata: Optional[Dict[str, Any]] = None
document: Optional[str] = None
class VectorStorePort(ABC):
"""Output Port defining operations for a vector store."""
@abstractmethod
async def add_documents(
self,
collection_name: str,
documents: List[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
ids: Optional[List[str]] = None
) -> List[str]:
"""Adds documents (text) to a specific collection in the vector store.
Embeddings are typically generated automatically by the implementation.
Args:
collection_name: The name of the collection to add documents to.
documents: A list of text documents to add.
metadatas: Optional list of metadata dictionaries corresponding to each document.
ids: Optional list of unique IDs for each document.
Returns:
A list of IDs for the added documents.
"""
pass
@abstractmethod
async def search_similar(
self,
collection_name: str,
query_texts: List[str],
n_results: int = 5,
where: Optional[Dict[str, Any]] = None,
where_document: Optional[Dict[str, Any]] = None,
include: Optional[List[str]] = ["metadatas", "documents", "distances"]
) -> List[List[SearchResult]]:
"""Searches for documents in a collection similar to the query texts.
Args:
collection_name: The name of the collection to search within.
query_texts: A list of query texts to find similar documents for.
n_results: The maximum number of results to return for each query.
where: Optional filter criteria for metadata.
where_document: Optional filter criteria for document content.
include: Optional list specifying what data to include in results.
Returns:
A list of lists of SearchResult objects, one list per query text.
"""
pass
@abstractmethod
async def get_or_create_collection(self, collection_name: str) -> Any:
"""Gets or creates a collection in the vector store.
The return type is Any for now, as it depends on the specific library's
collection object representation (e.g., Chroma's Collection).
Args:
collection_name: The name of the collection.
Returns:
The collection object.
"""
pass
@abstractmethod
async def delete_collection(self, collection_name: str) -> None:
"""Deletes a collection from the vector store.
Args:
collection_name: The name of the collection to delete.
"""
pass
# Add other potential methods like:
# async def delete_documents(self, collection_name: str, ids: List[str]) -> None: ...
# async def update_documents(...) -> None: ...
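# Hedged usage sketch (illustrative only; `adapter` stands for any concrete
# VectorStorePort implementation, such as the Chroma adapter):
#
#     ids = await adapter.add_documents(
#         collection_name="notes",
#         documents=["first note", "second note"],
#         metadatas=[{"year": 2024}, {"year": 2025}],
#     )
#     results = await adapter.search_similar("notes", ["note"], n_results=2)
#     # results[0] holds the SearchResult objects for the first query text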
================
File: paelladoc/ports/output/memory_port.py
================
from abc import ABC, abstractmethod
from typing import Optional, List
# Import the domain model it needs to interact with
from paelladoc.domain.models.project import ProjectMemory
class MemoryPort(ABC):
"""Output Port defining operations for project memory persistence."""
@abstractmethod
async def save_memory(self, memory: ProjectMemory) -> None:
"""Saves the entire project memory state.
Args:
memory: The ProjectMemory object to save.
"""
pass
@abstractmethod
async def load_memory(self, project_name: str) -> Optional[ProjectMemory]:
"""Loads the project memory state for a given project name.
Args:
project_name: The unique name of the project to load.
Returns:
The ProjectMemory object if found, otherwise None.
"""
pass
@abstractmethod
async def project_exists(self, project_name: str) -> bool:
"""Checks if a project memory exists for the given name.
Args:
project_name: The unique name of the project to check.
Returns:
True if the project memory exists, False otherwise.
"""
pass
@abstractmethod
async def list_projects(self) -> List[str]:
"""Lists the names of all existing projects.
Returns:
A list of project names as strings. Returns an empty list if no projects exist.
"""
pass
# Potentially add other methods later if needed, e.g., delete_memory
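# Hedged usage sketch (illustrative only; `port` is any concrete MemoryPort):
#
#     if await port.project_exists("my-project"):
#         memory = await port.load_memory("my-project")
#         # ... mutate the domain object ...
#         await port.save_memory(memory)
#     print(await port.list_projects())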
================
File: paelladoc/domain/core_logic.py
================
"""
Core PAELLADOC MCP Logic.
Handles MCP instance creation, plugin loading, and base tool registration.
Uses FastMCP for compatibility with decorators.
"""
import logging
from mcp.server.fastmcp import FastMCP # Use FastMCP
from typing import Dict, Any
# Configure base logger (handlers will be added by server.py)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Create the MCP server instance using FastMCP
mcp = FastMCP("PAELLADOC")
# --- Register Tools/Prompts --- #
# Import plugins dynamically to register tools/prompts
try:
# Import from the new adapters location
import paelladoc.adapters.plugins
logger.info("Successfully loaded plugins from paelladoc.adapters.plugins")
except ImportError as e:
# Log as warning, server might still be usable with base tools
logger.warning(f"Could not import plugins from paelladoc.adapters.plugins: {e}")
except Exception as e:
# Log as error for unexpected issues during import
logger.error(f"An unexpected error occurred during plugin import: {e}", exc_info=True)
@mcp.tool() # Use decorator again
def ping(random_string: str = "") -> Dict[str, Any]:
"""
Basic health check; returns pong.
Args:
random_string (str, optional): Dummy parameter for no-parameter tools
Returns:
Dict[str, Any]: Response with status and message
"""
logger.debug(f"Ping tool called with parameter: {random_string}")
return {
"status": "ok",
"message": "pong"
}
# Tools will be registered here by plugins
# Note: No `if __name__ == "__main__":` block here.
# This file is intended to be imported by the entry point (server.py).
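# Hedged sketch of how a plugin registers a tool: importing `mcp` from this
# module and decorating a function is sufficient. The `echo` tool below is
# illustrative, not an existing plugin.
#
#     from paelladoc.domain.core_logic import mcp
#
#     @mcp.tool()
#     def echo(text: str = "") -> Dict[str, Any]:
#         return {"status": "ok", "message": text}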
================
File: paelladoc/domain/models/enums.py
================
from enum import Enum
from typing import Set
class DocumentStatus(str, Enum):
"""Status of a document in the project memory"""
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
ARCHIVED = "archived"
class Bucket(str, Enum):
"""MECE taxonomy buckets for categorizing artifacts"""
# Initiate categories
INITIATE_CORE_SETUP = "Initiate::CoreSetup"
INITIATE_INITIAL_PRODUCT_DOCS = "Initiate::InitialProductDocs"
# Elaborate categories
ELABORATE_DISCOVERY_AND_RESEARCH = "Elaborate::DiscoveryAndResearch"
ELABORATE_IDEATION_AND_DESIGN = "Elaborate::IdeationAndDesign"
ELABORATE_SPECIFICATION_AND_PLANNING = "Elaborate::SpecificationAndPlanning"
ELABORATE_CORE_AND_SUPPORT = "Elaborate::CoreAndSupport"
# Govern categories
GOVERN_CORE_SYSTEM = "Govern::CoreSystem"
GOVERN_STANDARDS_METHODOLOGIES = "Govern::StandardsMethodologies"
GOVERN_VERIFICATION_VALIDATION = "Govern::VerificationValidation"
GOVERN_MEMORY_TEMPLATES = "Govern::MemoryTemplates"
GOVERN_TOOLING_SCRIPTS = "Govern::ToolingScripts"
# Generate categories
GENERATE_CORE_FUNCTIONALITY = "Generate::CoreFunctionality"
GENERATE_SUPPORTING_ELEMENTS = "Generate::SupportingElements"
# Maintain categories
MAINTAIN_CORE_FUNCTIONALITY = "Maintain::CoreFunctionality"
MAINTAIN_SUPPORTING_ELEMENTS = "Maintain::SupportingElements"
# Deploy categories
DEPLOY_PIPELINES_AND_AUTOMATION = "Deploy::PipelinesAndAutomation"
DEPLOY_INFRASTRUCTURE_AND_CONFIG = "Deploy::InfrastructureAndConfig"
DEPLOY_GUIDES_AND_CHECKLISTS = "Deploy::GuidesAndChecklists"
DEPLOY_SECURITY = "Deploy::Security"
# Operate categories
OPERATE_RUNBOOKS_AND_SOPS = "Operate::RunbooksAndSOPs"
OPERATE_MONITORING_AND_ALERTING = "Operate::MonitoringAndAlerting"
OPERATE_MAINTENANCE = "Operate::Maintenance"
# Iterate categories
ITERATE_LEARNING_AND_ANALYSIS = "Iterate::LearningAndAnalysis"
ITERATE_PLANNING_AND_RETROSPECTION = "Iterate::PlanningAndRetrospection"
# Special bucket for artifacts not matching any pattern
UNKNOWN = "Unknown"
@classmethod
def get_phase_buckets(cls, phase: str) -> Set["Bucket"]:
"""Get all buckets belonging to a specific phase"""
return {bucket for bucket in cls if bucket.value.startswith(f"{phase}::")}
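# Hedged usage sketch: phase lookup relies on the "Phase::Subcategory" value
# format above, which deliberately excludes UNKNOWN from every phase.
#
#     >>> Bucket.get_phase_buckets("Initiate") == {
#     ...     Bucket.INITIATE_CORE_SETUP,
#     ...     Bucket.INITIATE_INITIAL_PRODUCT_DOCS,
#     ... }
#     True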
================
File: paelladoc/domain/models/fix_metadata.py
================
import re
# Read the file
with open("project.py", "r") as f:
    content = f.read()
# Rename the ProjectMetadata class to ProjectInfo to avoid reserved words
content = re.sub(
    r"class ProjectMetadata\(BaseModel\):", "class ProjectInfo(BaseModel):", content
)
# Update project_metadata references to use project_info
content = re.sub(
    r"project_metadata: ProjectMetadata", "project_info: ProjectInfo", content
)
# Make sure project_info is used instead of metadata throughout the code
content = re.sub(r"memory\.metadata\.", "memory.project_info.", content)
content = re.sub(r"self\.metadata\.", "self.project_info.", content)
# Fix utcnow() timestamps
content = content.replace(
    "datetime.utcnow()", "datetime.datetime.now(datetime.timezone.utc)"
)
# Save the modified file
with open("project.py", "w") as f:
    f.write(content)
print("Modification completed successfully")
================
File: paelladoc/domain/models/language.py
================
"""Language model for PAELLADOC.
This module defines the supported languages and their metadata.
Following BCP 47 language tags (e.g., en-US, es-ES).
"""
from dataclasses import dataclass
from typing import Dict, List
from enum import Enum
@dataclass
class Language:
"""Represents a supported language with its code and name."""
code: str
name: str
native_name: str = ""
class LanguageService:
"""Service for managing supported languages."""
# Core supported languages (minimal set for initial implementation)
SUPPORTED_LANGUAGES: Dict[str, Language] = {
"es-ES": Language("es-ES", "Spanish (Spain)", "Español (España)"),
"en-US": Language("en-US", "English (US)", "English (US)"),
}
@classmethod
def get_language(cls, code: str) -> Language:
"""Get language by code."""
return cls.SUPPORTED_LANGUAGES.get(code, Language(code, code, code))
@classmethod
def get_all_languages(cls) -> List[Language]:
"""Get all supported languages."""
return list(cls.SUPPORTED_LANGUAGES.values())
@classmethod
def is_supported(cls, code: str) -> bool:
"""Check if a language code is supported."""
return code in cls.SUPPORTED_LANGUAGES
class SupportedLanguage(str, Enum):
"""
Supported languages for PAELLADOC interaction and documentation.
Uses standard language codes (e.g., en-US, es-ES).
"""
EN_US = "en-US" # English (US)
ES_ES = "es-ES" # Spanish (Spain)
@classmethod
def from_code(cls, code: str) -> "SupportedLanguage":
"""Convert a language code to a SupportedLanguage enum."""
code = code.lower()
if code in ["en", "en-us"]:
return cls.EN_US
elif code in ["es", "es-es"]:
return cls.ES_ES
raise ValueError(f"Unsupported language code: {code}")
def __str__(self) -> str:
"""Return the language code as a string."""
return self.value
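# Hedged usage sketch of the two lookup paths defined above:
#
#     >>> LanguageService.is_supported("en-US")
#     True
#     >>> SupportedLanguage.from_code("EN").value  # case-insensitive
#     'en-US'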
================
File: paelladoc/domain/models/project.py
================
from typing import List, Dict, Optional, Any
from pydantic import BaseModel, Field
import datetime
from pathlib import Path
import uuid
from .enums import DocumentStatus, Bucket
from ..services.time_service import TimeService
import logging
logger = logging.getLogger(__name__)
# Singleton instance of the time service
# This will be injected by the application layer
time_service: Optional[TimeService] = None
def set_time_service(service: TimeService):
"""Set the time service instance to be used by the domain models."""
global time_service
time_service = service
class ProjectDocument(BaseModel):
name: str # e.g., "README.md", "CONTRIBUTING.md"
template_origin: Optional[str] = None # Path or identifier of the template used
status: DocumentStatus = DocumentStatus.PENDING
class ProjectInfo(BaseModel):
name: str = Field(..., min_length=1)
language: Optional[str] = None
purpose: Optional[str] = None
target_audience: Optional[str] = None
objectives: Optional[List[str]] = Field(default_factory=list)
base_path: Optional[Path] = None
interaction_language: Optional[str] = None
documentation_language: Optional[str] = None
taxonomy_version: str = "1.0" # Default or loaded?
# Add the new taxonomy fields here as well for the domain model
platform_taxonomy: str
domain_taxonomy: str
size_taxonomy: str
compliance_taxonomy: str # Consider if this one could truly be optional sometimes?
custom_taxonomy: Optional[Dict[str, Any]] = Field(default_factory=dict)
taxonomy_validation: Optional[Dict[str, Any]] = Field(default_factory=dict)
model_config = {"arbitrary_types_allowed": True, "validate_assignment": True}
class ArtifactMeta(BaseModel):
"""Metadata for an artifact categorized according to the MECE taxonomy"""
id: uuid.UUID = Field(default_factory=uuid.uuid4)
name: str
bucket: Bucket
path: Path # Relative path from project root
    created_at: Optional[datetime.datetime] = None
    updated_at: Optional[datetime.datetime] = None
created_by: Optional[str] = None
modified_by: Optional[str] = None
status: DocumentStatus = DocumentStatus.PENDING
def __init__(self, **data: Any):
super().__init__(**data)
if not time_service:
raise RuntimeError("TimeService not initialized")
now = time_service.get_current_time()
if self.created_at is None:
self.created_at = now
if self.updated_at is None:
self.updated_at = now
if self.created_by is not None and self.modified_by is None:
self.modified_by = self.created_by
def update_timestamp(self, modifier: Optional[str] = None):
if not time_service:
raise RuntimeError("TimeService not initialized")
self.updated_at = time_service.get_current_time()
if modifier:
self.modified_by = modifier
def update_status(self, status: DocumentStatus, modifier: Optional[str] = None):
self.status = status
self.update_timestamp(modifier=modifier)
class ProjectMemory(BaseModel):
project_info: ProjectInfo
documents: Dict[str, ProjectDocument] = {} # Dict key is document name/path
# New taxonomy-based structure
taxonomy_version: str = "0.5"
artifacts: Dict[Bucket, List[ArtifactMeta]] = Field(
default_factory=lambda: {bucket: [] for bucket in Bucket}
)
# Consider adding: achievements, issues, decisions later?
    created_at: Optional[datetime.datetime] = None
    last_updated_at: Optional[datetime.datetime] = None
created_by: Optional[str] = None
modified_by: Optional[str] = None
# Add the new taxonomy fields here directly if they belong to the ProjectMemory state
# Or ensure they are loaded/accessed via metadata if that's the design
# Let's add them directly for consistency with the DB model and tests
platform_taxonomy: str
domain_taxonomy: str
size_taxonomy: str
compliance_taxonomy: str # Consider if this one could truly be optional sometimes?
custom_taxonomy: Optional[Dict[str, Any]] = Field(default_factory=dict)
taxonomy_validation: Optional[Dict[str, Any]] = Field(default_factory=dict)
    def __init__(self, **data):
        # Map the legacy "metadata" key to project_info for backward compatibility
        if "metadata" in data and "project_info" not in data:
            data["project_info"] = data.pop("metadata")
super().__init__(**data)
if not time_service:
raise RuntimeError("TimeService not initialized")
if self.created_at is None:
self.created_at = time_service.get_current_time()
if self.last_updated_at is None:
self.last_updated_at = time_service.get_current_time()
def update_timestamp(self):
if not time_service:
raise RuntimeError("TimeService not initialized")
self.last_updated_at = time_service.get_current_time()
def get_document(self, name: str) -> Optional[ProjectDocument]:
return self.documents.get(name)
    def update_document_status(self, name: str, status: DocumentStatus):
        doc = self.get_document(name)
        if doc:
            doc.status = status
            self.update_timestamp()
        else:
            logger.warning(
                f"Attempted to update status for non-existent doc: {name}"
            )
    def add_document(self, doc: ProjectDocument):
        if doc.name not in self.documents:
            self.documents[doc.name] = doc
            self.update_timestamp()
        else:
            logger.warning(f"Attempted to add duplicate document: {doc.name}")
# New methods for artifact management
def get_artifact(self, bucket: Bucket, name: str) -> Optional[ArtifactMeta]:
"""Get an artifact by bucket and name"""
for artifact in self.artifacts.get(bucket, []):
if artifact.name == name:
return artifact
return None
def get_artifact_by_path(self, path: Path) -> Optional[ArtifactMeta]:
"""Get an artifact by path, searching across all buckets"""
path_str = str(path)
for bucket_artifacts in self.artifacts.values():
for artifact in bucket_artifacts:
if str(artifact.path) == path_str:
return artifact
return None
def add_artifact(self, artifact: ArtifactMeta) -> bool:
"""Adds an artifact to the correct bucket, checking for path duplicates."""
# Check if artifact with the same path already exists in any bucket
for bucket_artifacts in self.artifacts.values():
for existing_artifact in bucket_artifacts:
if existing_artifact.path == artifact.path:
logger.warning(
f"Artifact with path {artifact.path} already exists."
)
return False # Indicate artifact was not added
if artifact.bucket not in self.artifacts:
self.artifacts[artifact.bucket] = []
self.artifacts[artifact.bucket].append(artifact)
self._update_timestamp()
return True # Indicate artifact was added
    def update_artifact_status(
        self,
        bucket: Bucket,
        artifact_name: str,
        new_status: DocumentStatus,
        modifier: Optional[str] = None,
    ) -> bool:
        """Updates the status of a specific artifact."""
        artifact = self.get_artifact(bucket, artifact_name)
        if artifact:
            # Delegate to ArtifactMeta so the timestamp comes from the
            # injected TimeService, consistent with the rest of the domain.
            artifact.update_status(new_status, modifier=modifier)
            self._update_timestamp()
            return True
        return False
    def _update_timestamp(self):
        """Updates the last_updated_at timestamp (delegates to update_timestamp)."""
        self.update_timestamp()
def get_bucket_completion(self, bucket: Bucket) -> dict:
"""Get completion stats for a bucket"""
artifacts = self.artifacts.get(bucket, [])
total = len(artifacts)
completed = sum(1 for a in artifacts if a.status == DocumentStatus.COMPLETED)
in_progress = sum(
1 for a in artifacts if a.status == DocumentStatus.IN_PROGRESS
)
pending = total - completed - in_progress
return {
"total": total,
"completed": completed,
"in_progress": in_progress,
"pending": pending,
"completion_percentage": (completed / total * 100) if total > 0 else 0,
}
def get_phase_completion(self, phase: str) -> dict:
"""Get completion stats for an entire phase"""
phase_buckets = Bucket.get_phase_buckets(phase)
total = 0
completed = 0
in_progress = 0
for bucket in phase_buckets:
stats = self.get_bucket_completion(bucket)
total += stats["total"]
completed += stats["completed"]
in_progress += stats["in_progress"]
pending = total - completed - in_progress
return {
"phase": phase,
"buckets": len(phase_buckets),
"total": total,
"completed": completed,
"in_progress": in_progress,
"pending": pending,
"completion_percentage": (completed / total * 100) if total > 0 else 0,
}
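# Hedged usage sketch (requires set_time_service() to have been called first;
# all field values below are illustrative):
#
#     memory = ProjectMemory(
#         project_info=ProjectInfo(
#             name="demo",
#             platform_taxonomy="web-frontend",
#             domain_taxonomy="ecommerce",
#             size_taxonomy="smb",
#             compliance_taxonomy="gdpr",
#         ),
#         platform_taxonomy="web-frontend",
#         domain_taxonomy="ecommerce",
#         size_taxonomy="smb",
#         compliance_taxonomy="gdpr",
#     )
#     memory.add_artifact(ArtifactMeta(
#         name="vision",
#         bucket=Bucket.INITIATE_INITIAL_PRODUCT_DOCS,
#         path=Path("docs/vision.md"),
#     ))
#     memory.get_bucket_completion(Bucket.INITIATE_INITIAL_PRODUCT_DOCS)
#     # -> {"total": 1, "completed": 0, "in_progress": 0, "pending": 1,
#     #     "completion_percentage": 0.0}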
================
File: paelladoc/domain/services/time_service.py
================
"""Time service for domain timestamp handling."""
from abc import ABC, abstractmethod
import datetime
class TimeService(ABC):
"""Abstract base class for time operations in the domain."""
@abstractmethod
def get_current_time(self) -> datetime.datetime:
"""Get current timestamp in UTC.
Returns:
datetime.datetime: Current time in UTC.
"""
pass
@abstractmethod
def ensure_utc(self, dt: datetime.datetime) -> datetime.datetime:
"""Ensure a datetime is in UTC.
If the datetime has no timezone info, assumes it's in UTC.
Args:
dt: Datetime to convert
Returns:
datetime.datetime: UTC datetime with timezone info
"""
pass
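# Hedged sketch of a concrete implementation. The repository ships one under
# adapters/services/system_time_service.py; this illustrative version may differ.
#
#     class SystemTimeService(TimeService):
#         def get_current_time(self) -> datetime.datetime:
#             return datetime.datetime.now(datetime.timezone.utc)
#
#         def ensure_utc(self, dt: datetime.datetime) -> datetime.datetime:
#             if dt.tzinfo is None:
#                 return dt.replace(tzinfo=datetime.timezone.utc)
#             return dt.astimezone(datetime.timezone.utc)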
================
File: tests/conftest.py
================
import pytest
from datetime import datetime, timezone, timedelta
from pathlib import Path
import sys
# Ensure we can import Paelladoc modules
project_root = Path(__file__).parent.parent.absolute()
sys.path.insert(0, str(project_root))
# Import TimeService components
from paelladoc.domain.services.time_service import TimeService
from paelladoc.domain.models.project import set_time_service
class MockTimeService(TimeService):
"""Mock time service for testing."""
def __init__(self, fixed_time=None):
"""Initialize with optional fixed time."""
self.fixed_time = fixed_time or datetime.now(timezone.utc)
self.call_count = 0
def get_current_time(self) -> datetime:
"""Get the mocked current time, incrementing by microseconds on each call."""
# Increment call count
self.call_count += 1
# Return fixed time plus microseconds based on call count to ensure
# timestamps are different when multiple calls happen in sequence
return self.fixed_time + timedelta(microseconds=self.call_count)
def ensure_utc(self, dt: datetime) -> datetime:
"""Ensure a datetime is in UTC."""
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
@pytest.fixture(scope="session", autouse=True)
def setup_time_service():
"""Set up the time service globally for all tests."""
# Using a fixed time for consistent testing
fixed_time = datetime(2025, 4, 20, 12, 0, 0, tzinfo=timezone.utc)
mock_service = MockTimeService(fixed_time)
set_time_service(mock_service)
return mock_service
================
File: tests/README.md
================
# MCP Server Tests
This directory contains tests for the Paelladoc MCP server following hexagonal architecture principles. Tests are organized into three main categories:
## Test Structure
```
tests/
├── unit/ # Unit tests for individual components
│ └── test_ping_tool.py # Tests for MCP tools in isolation
├── integration/ # Integration tests for component interactions
│ └── test_server.py # Tests for server STDIO communication
└── e2e/ # End-to-end tests simulating real-world usage
└── test_cursor_simulation.py # Simulates Cursor interaction
```
## Test Categories
1. **Unit Tests** (`unit/`)
- Test individual functions/components in isolation
- Don't require a running server
- Fast to execute
- Example: Testing the `ping()` function directly
2. **Integration Tests** (`integration/`)
- Test interactions between components
- Verify STDIO communication with the server
- Example: Starting the server and sending/receiving messages
3. **End-to-End Tests** (`e2e/`)
- Simulate real-world usage scenarios
- Test the system as a whole
- Example: Simulating how Cursor would interact with the server
## Running Tests
### Run All Tests
```bash
python -m unittest discover tests
```
### Run Tests by Category
```bash
# Unit tests only
python -m unittest discover tests/unit
# Integration tests only
python -m unittest discover tests/integration
# End-to-end tests only
python -m unittest discover tests/e2e
```
### Run a Specific Test File
```bash
python -m unittest tests/unit/test_ping_tool.py
```
### Run a Specific Test Case
```bash
python -m unittest tests.unit.test_ping_tool.TestPingTool
```
### Run a Specific Test Method
```bash
python -m unittest tests.unit.test_ping_tool.TestPingTool.test_ping_returns_dict
```
## TDD Process
These tests follow the Test-Driven Development (TDD) approach:
1. **RED**: Write failing tests first
2. **GREEN**: Implement the minimal code to make tests pass
3. **REFACTOR**: Improve the code while keeping tests passing
## Adding New Tests
When adding new MCP tools:
1. Create unit tests for the tool's functionality (see the sketch below)
2. Add integration tests for the tool's STDIO communication
3. Update E2E tests to verify Cursor interaction with the tool
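A minimal unit test for a new tool might look like this (a hedged sketch; `echo` is a hypothetical tool name, not an existing one):
```python
import unittest
from paelladoc.domain import core_logic

class TestEchoTool(unittest.TestCase):
    def test_echo_exists(self):
        # RED step: fails until the echo tool is implemented in core_logic
        self.assertTrue(hasattr(core_logic, "echo"))

if __name__ == "__main__":
    unittest.main()
```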
================
File: tests/unit/test_ping_tool.py
================
"""
Unit tests for Paelladoc MCP tools.
Following TDD approach - tests are written before implementation.
"""
import unittest
import sys
from pathlib import Path
# Ensure we can import Paelladoc modules
project_root = Path(__file__).parent.parent.parent.absolute()
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
# Import directly from the domain layer
from paelladoc.domain import core_logic
class TestPingTool(unittest.TestCase):
"""Unit tests for the ping tool following TDD methodology."""
def test_ping_exists(self):
"""Test that the ping function exists."""
self.assertTrue(
hasattr(core_logic, "ping"),
"The ping function does not exist in core_logic",
)
def test_ping_returns_dict(self):
"""Test that ping returns a dictionary."""
result = core_logic.ping()
self.assertIsInstance(result, dict, "ping should return a dictionary")
def test_ping_has_required_fields(self):
"""Test that ping response has the required fields."""
result = core_logic.ping()
self.assertIn("status", result, "ping response should contain a 'status' field")
self.assertIn(
"message", result, "ping response should contain a 'message' field"
)
def test_ping_returns_expected_values(self):
"""Test that ping returns the expected values."""
result = core_logic.ping()
self.assertEqual(
result["status"],
"ok",
f"ping status should be 'ok', got '{result['status']}'",
)
self.assertEqual(
result["message"],
"pong",
f"ping message should be 'pong', got '{result['message']}'",
)
if __name__ == "__main__":
unittest.main()
================
File: tests/unit/config/test_database.py
================
"""Unit tests for database configuration module."""
import os
from pathlib import Path
import pytest
from paelladoc.config.database import (
get_project_root,
get_db_path,
PRODUCTION_DB_PATH,
)
@pytest.fixture
def clean_env():
"""Remove relevant environment variables before each test."""
# Store original values
original_db_path = os.environ.get("PAELLADOC_DB_PATH")
original_env = os.environ.get("PAELLADOC_ENV")
# Remove variables
if "PAELLADOC_DB_PATH" in os.environ:
del os.environ["PAELLADOC_DB_PATH"]
if "PAELLADOC_ENV" in os.environ:
del os.environ["PAELLADOC_ENV"]
yield
# Restore original values
if original_db_path is not None:
os.environ["PAELLADOC_DB_PATH"] = original_db_path
if original_env is not None:
os.environ["PAELLADOC_ENV"] = original_env
def test_get_project_root():
"""Test that get_project_root returns a valid directory."""
root = get_project_root()
assert isinstance(root, Path)
assert root.exists()
assert root.is_dir()
assert (root / "src").exists()
assert (root / "src" / "paelladoc").exists()
assert (root / "pyproject.toml").exists()
def test_get_db_path_with_env_var(clean_env):
"""Test that PAELLADOC_DB_PATH environment variable takes precedence."""
custom_path = "/custom/path/db.sqlite"
os.environ["PAELLADOC_DB_PATH"] = custom_path
db_path = get_db_path()
assert isinstance(db_path, Path)
assert str(db_path) == custom_path
def test_get_db_path_production_default(clean_env):
"""Test that production mode uses home directory."""
db_path = get_db_path()
assert isinstance(db_path, Path)
assert db_path == PRODUCTION_DB_PATH
assert db_path.name == "memory.db"
assert db_path.parent.name == ".paelladoc"
assert db_path.parent.parent == Path.home()
def test_production_db_path_constant():
"""Test that PRODUCTION_DB_PATH is correctly set."""
assert isinstance(PRODUCTION_DB_PATH, Path)
assert PRODUCTION_DB_PATH.name == "memory.db"
assert PRODUCTION_DB_PATH.parent.name == ".paelladoc"
assert PRODUCTION_DB_PATH.parent.parent == Path.home()
================
File: tests/unit/application/utils/test_behavior_enforcer.py
================
"""
Unit tests for the BehaviorEnforcer utility.
"""
import unittest
import sys
from pathlib import Path
from typing import Set, Optional
# Ensure we can import Paelladoc modules
project_root = Path(__file__).parent.parent.parent.parent.parent.absolute()
sys.path.insert(0, str(project_root))
# Module to test
from paelladoc.application.utils.behavior_enforcer import (
BehaviorEnforcer,
BehaviorViolationError,
)
# Mock context object for tests
class MockContext:
def __init__(self, collected_params: Optional[Set[str]] = None):
self.progress = {
"collected_params": collected_params
if collected_params is not None
else set()
}
class TestBehaviorEnforcer(unittest.TestCase):
"""Unit tests for the BehaviorEnforcer."""
def setUp(self):
self.tool_name = "test.tool"
self.sequence = ["param1", "param2", "param3"]
self.behavior_config = {"fixed_question_order": self.sequence}
def test_enforce_no_config(self):
"""Test that enforcement passes if no behavior_config is provided."""
try:
BehaviorEnforcer.enforce(self.tool_name, None, MockContext(), {"arg": 1})
except BehaviorViolationError:
self.fail("Enforcement should pass when no config is given.")
def test_enforce_no_fixed_order(self):
"""Test enforcement passes if 'fixed_question_order' is not in config."""
config = {"other_rule": True}
try:
BehaviorEnforcer.enforce(
self.tool_name, config, MockContext(), {"param1": "value"}
)
except BehaviorViolationError:
self.fail(
"Enforcement should pass when fixed_question_order is not defined."
)
def test_enforce_no_context_or_args(self):
"""Test enforcement passes (logs warning) if context or args are missing."""
# Note: Current implementation returns None (passes), might change behavior later.
try:
BehaviorEnforcer.enforce(self.tool_name, self.behavior_config, None, None)
BehaviorEnforcer.enforce(
self.tool_name, self.behavior_config, MockContext(), None
)
BehaviorEnforcer.enforce(
self.tool_name, self.behavior_config, None, {"param1": "a"}
)
except BehaviorViolationError:
self.fail("Enforcement should pass when context or args are missing.")
def test_enforce_no_new_params_provided(self):
"""Test enforcement passes if no *new* parameters are provided."""
ctx = MockContext(collected_params={"param1"})
# Providing only already collected param
provided_args = {"param1": "new_value", "param2": None}
try:
BehaviorEnforcer.enforce(
self.tool_name, self.behavior_config, ctx, provided_args
)
except BehaviorViolationError as e:
self.fail(
f"Enforcement should pass when only old params are provided. Raised: {e}"
)
def test_enforce_correct_first_param(self):
"""Test enforcement passes when the correct first parameter is provided."""
ctx = MockContext()
provided_args = {"param1": "value1"}
try:
BehaviorEnforcer.enforce(
self.tool_name, self.behavior_config, ctx, provided_args
)
except BehaviorViolationError as e:
self.fail(f"Enforcement failed for correct first param. Raised: {e}")
def test_enforce_correct_second_param(self):
"""Test enforcement passes when the correct second parameter is provided."""
ctx = MockContext(collected_params={"param1"})
provided_args = {
"param1": "value1",
"param2": "value2",
} # param1 is old, param2 is new
try:
BehaviorEnforcer.enforce(
self.tool_name, self.behavior_config, ctx, provided_args
)
except BehaviorViolationError as e:
self.fail(f"Enforcement failed for correct second param. Raised: {e}")
def test_enforce_incorrect_first_param(self):
"""Test enforcement fails when the wrong first parameter is provided."""
ctx = MockContext()
provided_args = {"param2": "value2"} # Should be param1
with self.assertRaisesRegex(
BehaviorViolationError,
"Expected next: 'param1'. Got unexpected new parameter: 'param2'",
):
BehaviorEnforcer.enforce(
self.tool_name, self.behavior_config, ctx, provided_args
)
def test_enforce_incorrect_second_param(self):
"""Test enforcement fails when the wrong second parameter is provided."""
ctx = MockContext(collected_params={"param1"})
provided_args = {"param1": "val1", "param3": "value3"} # Should be param2
with self.assertRaisesRegex(
BehaviorViolationError,
"Expected next: 'param2'. Got unexpected new parameter: 'param3'",
):
BehaviorEnforcer.enforce(
self.tool_name, self.behavior_config, ctx, provided_args
)
def test_enforce_multiple_new_params_fails(self):
"""Test enforcement fails when multiple new parameters are provided at once."""
ctx = MockContext()
provided_args = {"param1": "value1", "param2": "value2"} # Both are new
# Adjust regex to match the more detailed error message
expected_regex = (
r"Tool 'test.tool' expects parameters sequentially. "
r"Expected next: 'param1'. "
# Use regex to handle potential set order variations {'param1', 'param2'} or {'param2', 'param1'}
r"Provided multiple new parameters: {('param1', 'param2'|'param2', 'param1')}. "
r"Collected so far: set\(\)."
)
with self.assertRaisesRegex(BehaviorViolationError, expected_regex):
BehaviorEnforcer.enforce(
self.tool_name, self.behavior_config, ctx, provided_args
)
def test_enforce_multiple_new_params_later_fails(self):
"""Test enforcement fails when multiple new params are provided later in sequence."""
ctx = MockContext(collected_params={"param1"})
provided_args = {
"param1": "v1",
"param2": "value2",
"param3": "value3",
} # param2 and param3 are new
# Adjust regex to match the more detailed error message
expected_regex = (
r"Tool 'test.tool' expects parameters sequentially. "
r"Expected next: 'param2'. "
# Use regex to handle potential set order variations
r"Provided multiple new parameters: {('param2', 'param3'|'param3', 'param2')}. "
r"Collected so far: {'param1'}."
)
with self.assertRaisesRegex(BehaviorViolationError, expected_regex):
BehaviorEnforcer.enforce(
self.tool_name, self.behavior_config, ctx, provided_args
)
def test_enforce_params_after_sequence_complete_passes(self):
"""Test enforcement passes when providing args after the sequence is complete."""
ctx = MockContext(collected_params={"param1", "param2", "param3"})
provided_args = {
"param1": "v1",
"param2": "v2",
"param3": "v3",
"optional_param": "opt",
}
try:
BehaviorEnforcer.enforce(
self.tool_name, self.behavior_config, ctx, provided_args
)
except BehaviorViolationError as e:
self.fail(
f"Enforcement should pass for args after sequence complete. Raised: {e}"
)
# if __name__ == "__main__":
# unittest.main()
================
File: tests/unit/application/services/test_memory_service.py
================
"""
Unit tests for the MemoryService.
"""
from unittest.mock import AsyncMock # Use AsyncMock for async methods
import sys
from pathlib import Path
import pytest
# Ensure we can import Paelladoc modules
project_root = Path(__file__).parent.parent.parent.parent.parent.absolute()
sys.path.insert(0, str(project_root))
# Modules to test
from paelladoc.application.services.memory_service import MemoryService
from paelladoc.domain.models.project import (
ProjectMemory,
    ProjectInfo,
)
from paelladoc.ports.output.memory_port import MemoryPort
# --- Pytest Fixtures ---
@pytest.fixture
def mock_memory_port() -> AsyncMock:
"""Provides a mocked MemoryPort instance for tests."""
return AsyncMock(spec=MemoryPort)
@pytest.fixture
def memory_service(mock_memory_port: AsyncMock) -> MemoryService:
"""Provides a MemoryService instance initialized with a mocked port."""
return MemoryService(memory_port=mock_memory_port)
# --- Tests for Taxonomy Events (Pytest Style) ---
@pytest.mark.asyncio
async def test_update_project_memory_emits_taxonomy_updated_event(
memory_service: MemoryService, mock_memory_port: AsyncMock
):
"""Test that taxonomy_updated event is emitted when taxonomy fields change."""
# Arrange
project_name = "tax-event-project"
    old_memory = ProjectMemory(
        project_info=ProjectInfo(
            name=project_name,
            base_path="/fake",
            taxonomy_version="1.0",
            platform_taxonomy="web-frontend",
            domain_taxonomy="ecommerce",
            size_taxonomy="smb",
            compliance_taxonomy="",
        ),
        platform_taxonomy="web-frontend",
        domain_taxonomy="ecommerce",
        size_taxonomy="smb",
        compliance_taxonomy="",
    )
    # The models currently type compliance_taxonomy as a required str, so the
    # None this test needs is set via (unvalidated) assignment afterwards.
    old_memory.compliance_taxonomy = None
    new_memory = ProjectMemory(
        project_info=ProjectInfo(
            name=project_name,
            base_path="/fake",
            taxonomy_version="1.0",
            platform_taxonomy="ios-native",
            domain_taxonomy="ecommerce",
            size_taxonomy="enterprise",
            compliance_taxonomy="gdpr",
        ),
        platform_taxonomy="ios-native",  # Changed
        domain_taxonomy="ecommerce",
        size_taxonomy="enterprise",  # Changed
        compliance_taxonomy="gdpr",  # Changed
    )
# Mock the port methods
mock_memory_port.project_exists.return_value = True
mock_memory_port.load_memory.return_value = old_memory
mock_memory_port.save_memory.return_value = None # Async function returns None
# Create and register a mock event handler
mock_handler = AsyncMock()
memory_service.register_event_handler("taxonomy_updated", mock_handler)
# Also register for project_updated to ensure it's still called
mock_project_updated_handler = AsyncMock()
memory_service.register_event_handler(
"project_updated", mock_project_updated_handler
)
# Act
await memory_service.update_project_memory(new_memory)
# Assert
mock_memory_port.save_memory.assert_awaited_once_with(new_memory)
# Check project_updated event was called
mock_project_updated_handler.assert_awaited_once()
assert mock_project_updated_handler.await_args[0][0] == "project_updated"
# Check taxonomy_updated event was called with correct data
mock_handler.assert_awaited_once()
event_name, event_data = mock_handler.await_args[0]
assert event_name == "taxonomy_updated"
assert event_data["project_name"] == project_name
assert event_data["new_taxonomy"] == {
"platform": "ios-native",
"domain": "ecommerce",
"size": "enterprise",
"compliance": "gdpr",
}
assert event_data["old_taxonomy"] == {
"platform": "web-frontend",
"domain": "ecommerce",
"size": "smb",
"compliance": None,
}
@pytest.mark.asyncio
async def test_update_project_memory_no_taxonomy_change_no_event(
memory_service: MemoryService, mock_memory_port: AsyncMock
):
"""Test that taxonomy_updated event is NOT emitted if taxonomy fields don't change."""
# Arrange
project_name = "no-tax-event-project"
    old_memory = ProjectMemory(
        project_info=ProjectInfo(
            name=project_name,
            base_path="/fake",
            taxonomy_version="1.0",
            platform_taxonomy="web-frontend",
            domain_taxonomy="ecommerce",
            size_taxonomy="smb",
            compliance_taxonomy="",
        ),
        platform_taxonomy="web-frontend",
        domain_taxonomy="ecommerce",
        size_taxonomy="smb",
        compliance_taxonomy="",
    )
    old_memory.compliance_taxonomy = None  # see note in the test above
    new_memory = ProjectMemory(
        project_info=ProjectInfo(
            name=project_name,
            base_path="/fake",
            taxonomy_version="1.0",
            platform_taxonomy="web-frontend",
            domain_taxonomy="ecommerce",
            size_taxonomy="smb",
            compliance_taxonomy="",
        ),
        # Keep taxonomy fields the same as old_memory
        platform_taxonomy="web-frontend",
        domain_taxonomy="ecommerce",
        size_taxonomy="smb",
        compliance_taxonomy="",
    )
    new_memory.compliance_taxonomy = None
    # Make some other change to trigger update
    new_memory.project_info.purpose = "Updated purpose"
# Mock the port methods
mock_memory_port.project_exists.return_value = True
mock_memory_port.load_memory.return_value = old_memory
mock_memory_port.save_memory.return_value = None
# Create and register a mock event handler
mock_handler = AsyncMock()
memory_service.register_event_handler("taxonomy_updated", mock_handler)
# Also register for project_updated to ensure it's still called
mock_project_updated_handler = AsyncMock()
memory_service.register_event_handler(
"project_updated", mock_project_updated_handler
)
# Act
await memory_service.update_project_memory(new_memory)
# Assert
mock_memory_port.save_memory.assert_awaited_once_with(new_memory)
# Check project_updated event was called (because metadata changed)
mock_project_updated_handler.assert_awaited_once()
# Check taxonomy_updated event was NOT called
mock_handler.assert_not_awaited()
# NOTE: Keep the existing unittest class for other tests for now, or refactor all later.
# If keeping both styles, ensure imports and module structure support it.
# class TestMemoryService(unittest.IsolatedAsyncioTestCase):
# ... (existing unittest tests) ...
================
File: tests/unit/application/services/test_vector_store_service.py
================
"""
Unit tests for the VectorStoreService.
"""
import unittest
from unittest.mock import AsyncMock, MagicMock # Added MagicMock for SearchResult
import sys
from pathlib import Path
from typing import List, Dict, Any
# Ensure we can import Paelladoc modules
project_root = Path(__file__).parent.parent.parent.parent.parent.absolute()
sys.path.insert(0, str(project_root))
# Modules to test
from paelladoc.application.services.vector_store_service import VectorStoreService
from paelladoc.ports.output.vector_store_port import VectorStorePort, SearchResult
# Dummy SearchResult implementation for tests
class MockSearchResult(SearchResult):
def __init__(
self, id: str, distance: float, metadata: Dict[str, Any], document: str
):
self.id = id
self.distance = distance
self.metadata = metadata
self.document = document
class TestVectorStoreService(unittest.IsolatedAsyncioTestCase):
"""Unit tests for the VectorStoreService using a mocked VectorStorePort."""
def setUp(self):
"""Set up a mocked VectorStorePort before each test."""
self.mock_vector_store_port = AsyncMock(spec=VectorStorePort)
self.vector_store_service = VectorStoreService(
vector_store_port=self.mock_vector_store_port
)
# --- Test Cases --- #
async def test_add_texts_to_collection_calls_port(self):
"""Verify add_texts_to_collection calls add_documents on the port."""
collection_name = "test_coll"
documents = ["doc1", "doc2"]
metadatas = [{"s": 1}, {"s": 2}]
ids = ["id1", "id2"]
expected_ids = ids
self.mock_vector_store_port.add_documents.return_value = expected_ids
actual_ids = await self.vector_store_service.add_texts_to_collection(
collection_name, documents, metadatas, ids
)
self.mock_vector_store_port.add_documents.assert_awaited_once_with(
collection_name=collection_name,
documents=documents,
metadatas=metadatas,
ids=ids,
)
self.assertEqual(actual_ids, expected_ids)
async def test_add_texts_to_collection_reraises_exception(self):
"""Verify add_texts_to_collection re-raises port exceptions."""
collection_name = "test_coll_fail"
documents = ["doc1"]
test_exception = ValueError("Port error")
self.mock_vector_store_port.add_documents.side_effect = test_exception
with self.assertRaises(ValueError) as cm:
await self.vector_store_service.add_texts_to_collection(
collection_name, documents
)
self.assertEqual(cm.exception, test_exception)
self.mock_vector_store_port.add_documents.assert_awaited_once()
async def test_find_similar_texts_calls_port(self):
"""Verify find_similar_texts calls search_similar on the port."""
collection_name = "test_search_coll"
query_texts = ["query1"]
n_results = 3
filter_metadata = {"year": 2024}
filter_document = None # Example
expected_results: List[List[SearchResult]] = [
[MockSearchResult("res1", 0.5, {"year": 2024}, "doc text")]
]
self.mock_vector_store_port.search_similar.return_value = expected_results
actual_results = await self.vector_store_service.find_similar_texts(
collection_name, query_texts, n_results, filter_metadata, filter_document
)
self.mock_vector_store_port.search_similar.assert_awaited_once_with(
collection_name=collection_name,
query_texts=query_texts,
n_results=n_results,
where=filter_metadata,
where_document=filter_document,
include=[
"metadatas",
"documents",
"distances",
"ids",
], # Check default include
)
self.assertEqual(actual_results, expected_results)
async def test_find_similar_texts_reraises_exception(self):
"""Verify find_similar_texts re-raises port exceptions."""
collection_name = "test_search_fail"
query_texts = ["query1"]
test_exception = RuntimeError("Search failed")
self.mock_vector_store_port.search_similar.side_effect = test_exception
with self.assertRaises(RuntimeError) as cm:
await self.vector_store_service.find_similar_texts(
collection_name, query_texts
)
self.assertEqual(cm.exception, test_exception)
self.mock_vector_store_port.search_similar.assert_awaited_once()
async def test_ensure_collection_exists_calls_port(self):
"""Verify ensure_collection_exists calls get_or_create_collection on the port."""
collection_name = "ensure_coll"
# Mock the port method to return a dummy collection object (can be anything)
self.mock_vector_store_port.get_or_create_collection.return_value = MagicMock()
await self.vector_store_service.ensure_collection_exists(collection_name)
self.mock_vector_store_port.get_or_create_collection.assert_awaited_once_with(
collection_name
)
async def test_ensure_collection_exists_reraises_exception(self):
"""Verify ensure_collection_exists re-raises port exceptions."""
collection_name = "ensure_coll_fail"
test_exception = ConnectionError("DB down")
self.mock_vector_store_port.get_or_create_collection.side_effect = (
test_exception
)
with self.assertRaises(ConnectionError) as cm:
await self.vector_store_service.ensure_collection_exists(collection_name)
self.assertEqual(cm.exception, test_exception)
self.mock_vector_store_port.get_or_create_collection.assert_awaited_once_with(
collection_name
)
async def test_remove_collection_calls_port(self):
"""Verify remove_collection calls delete_collection on the port."""
collection_name = "remove_coll"
self.mock_vector_store_port.delete_collection.return_value = (
None # Method returns None
)
await self.vector_store_service.remove_collection(collection_name)
self.mock_vector_store_port.delete_collection.assert_awaited_once_with(
collection_name
)
async def test_remove_collection_reraises_exception(self):
"""Verify remove_collection re-raises port exceptions."""
collection_name = "remove_coll_fail"
test_exception = TimeoutError("Delete timed out")
self.mock_vector_store_port.delete_collection.side_effect = test_exception
with self.assertRaises(TimeoutError) as cm:
await self.vector_store_service.remove_collection(collection_name)
self.assertEqual(cm.exception, test_exception)
self.mock_vector_store_port.delete_collection.assert_awaited_once_with(
collection_name
)
# if __name__ == "__main__":
# unittest.main()
================
File: tests/unit/domain/models/test_project.py
================
import json
import pytest
from datetime import datetime
from pathlib import Path
from paelladoc.domain.models.project import (
DocumentStatus,
Bucket,
ArtifactMeta,
ProjectInfo,
ProjectMemory,
# ProjectDocument, # Assuming this was removed or is internal
)
class TestBucket:
"""Tests for the Bucket enum"""
def test_bucket_values(self):
"""Test that all buckets have the correct string format"""
for bucket in Bucket:
if bucket is not Bucket.UNKNOWN:
# Format should be "Phase::Subcategory"
assert "::" in bucket.value
phase, subcategory = bucket.value.split("::")
assert phase in [
"Initiate",
"Elaborate",
"Govern",
"Generate",
"Maintain",
"Deploy",
"Operate",
"Iterate",
]
assert len(subcategory) > 0
else:
assert bucket.value == "Unknown"
def test_get_phase_buckets(self):
"""Test the get_phase_buckets class method"""
initiate_buckets = Bucket.get_phase_buckets("Initiate")
assert len(initiate_buckets) == 2
assert Bucket.INITIATE_CORE_SETUP in initiate_buckets
assert Bucket.INITIATE_INITIAL_PRODUCT_DOCS in initiate_buckets
elaborate_buckets = Bucket.get_phase_buckets("Elaborate")
assert len(elaborate_buckets) == 4
# Should return empty set for non-existent phase
nonexistent_buckets = Bucket.get_phase_buckets("NonExistent")
assert len(nonexistent_buckets) == 0
class TestArtifactMeta:
"""Tests for the ArtifactMeta model"""
def test_create_artifact_meta(self):
"""Test creating an ArtifactMeta instance"""
artifact = ArtifactMeta(
name="test_artifact",
bucket=Bucket.INITIATE_CORE_SETUP,
path=Path("docs/test_artifact.md"),
status=DocumentStatus.IN_PROGRESS,
)
assert artifact.name == "test_artifact"
assert artifact.bucket == Bucket.INITIATE_CORE_SETUP
assert artifact.path == Path("docs/test_artifact.md")
assert artifact.status == DocumentStatus.IN_PROGRESS
assert isinstance(artifact.created_at, datetime)
assert isinstance(artifact.updated_at, datetime)
def test_update_status(self):
"""Test updating an artifact's status"""
artifact = ArtifactMeta(
name="test_artifact",
bucket=Bucket.INITIATE_CORE_SETUP,
path=Path("docs/test_artifact.md"),
)
# Default status should be PENDING
assert artifact.status == DocumentStatus.PENDING
# Store the original timestamp
original_updated_at = artifact.updated_at
# Update the status
artifact.update_status(DocumentStatus.COMPLETED)
# Check that status was updated
assert artifact.status == DocumentStatus.COMPLETED
# Check that timestamp was updated
assert artifact.updated_at > original_updated_at
def test_serialization_deserialization(self):
"""Test that ArtifactMeta can be serialized and deserialized"""
artifact = ArtifactMeta(
name="test_artifact",
bucket=Bucket.ELABORATE_DISCOVERY_AND_RESEARCH,
path=Path("docs/research/test_artifact.md"),
status=DocumentStatus.COMPLETED,
)
# Serialize to JSON
artifact_json = artifact.model_dump_json()
# Deserialize from JSON
loaded_artifact = ArtifactMeta.model_validate_json(artifact_json)
# Check that all fields were preserved
assert loaded_artifact.name == artifact.name
assert loaded_artifact.bucket == artifact.bucket
assert loaded_artifact.path == artifact.path
assert loaded_artifact.status == artifact.status
assert loaded_artifact.created_at == artifact.created_at
assert loaded_artifact.updated_at == artifact.updated_at
@pytest.fixture
def sample_project_memory():
    """Fixture to create a sample ProjectMemory instance for testing."""
    project_info = ProjectInfo(
        name="Test Project",
        purpose="A test project.",
        base_path=Path("/path/to/project"),
        documentation_language="en",
        interaction_language="en",
        platform_taxonomy="test_platform",
        domain_taxonomy="test_domain",
        size_taxonomy="test_size",
        compliance_taxonomy="test_compliance",
    )
    memory = ProjectMemory(
        project_info=project_info,
        platform_taxonomy="test_platform",
        domain_taxonomy="test_domain",
        size_taxonomy="test_size",
        compliance_taxonomy="test_compliance",
    )
    # Seed the artifacts the TestProjectMemory cases below rely on. The
    # user_research path is illustrative; only the other two are asserted.
    for name, bucket, path, status in [
        ("vision_doc", Bucket.INITIATE_INITIAL_PRODUCT_DOCS, "docs/initiation/product_vision.md", DocumentStatus.PENDING),
        ("user_research", Bucket.ELABORATE_DISCOVERY_AND_RESEARCH, "docs/research/user_research.md", DocumentStatus.IN_PROGRESS),
        ("api_spec", Bucket.ELABORATE_SPECIFICATION_AND_PLANNING, "docs/specs/api_specification.md", DocumentStatus.COMPLETED),
    ]:
        memory.add_artifact(ArtifactMeta(name=name, bucket=bucket, path=Path(path), status=status))
    return memory
def test_project_info_initialization():
    """Test ProjectInfo initialization."""
    info = ProjectInfo(
        name="Another Test",
        purpose="Detailed desc.",
        base_path=Path("/tmp"),
        documentation_language="es",
        interaction_language="es",
        platform_taxonomy="test_platform_2",
        domain_taxonomy="test_domain_2",
        size_taxonomy="test_size_2",
        compliance_taxonomy="test_compliance_2",
    )
    assert info.name == "Another Test"
    assert info.purpose == "Detailed desc."
    assert info.base_path == Path("/tmp")
    assert info.documentation_language == "es"
    assert info.interaction_language == "es"
    assert info.platform_taxonomy == "test_platform_2"
    assert info.domain_taxonomy == "test_domain_2"
    assert info.size_taxonomy == "test_size_2"
    assert info.compliance_taxonomy == "test_compliance_2"
def test_project_memory_initialization(sample_project_memory):
    """Test ProjectMemory initialization using the fixture."""
    assert sample_project_memory.project_info.name == "Test Project"
    assert isinstance(sample_project_memory.created_at, datetime)
    assert isinstance(sample_project_memory.last_updated_at, datetime)
    # Check taxonomy fields added in fixture
    assert sample_project_memory.project_info.platform_taxonomy == "test_platform"
    assert sample_project_memory.project_info.domain_taxonomy == "test_domain"
    assert sample_project_memory.project_info.size_taxonomy == "test_size"
    assert sample_project_memory.project_info.compliance_taxonomy == "test_compliance"
def test_project_memory_update(sample_project_memory):
    """Test ProjectMemory update."""
    # TODO: exercise update behaviour (e.g. update_artifact_status) here
    pass
class TestProjectMemory:
"""Tests for the ProjectMemory model with taxonomy support"""
def test_project_memory_initialization(self):
"""Test initializing ProjectMemory with taxonomy support"""
project = ProjectMemory(
project_info=ProjectInfo(
name="test_project",
# Add required taxonomy fields
platform_taxonomy="test_platform",
domain_taxonomy="test_domain",
size_taxonomy="test_size",
compliance_taxonomy="test_compliance",
),
taxonomy_version="0.5",
# Add required taxonomy fields also directly to ProjectMemory
platform_taxonomy="test_platform",
domain_taxonomy="test_domain",
size_taxonomy="test_size",
compliance_taxonomy="test_compliance",
)
# Check that all buckets are initialized
for bucket in Bucket:
assert bucket in project.artifacts
assert isinstance(project.artifacts[bucket], list)
assert len(project.artifacts[bucket]) == 0
def test_add_artifact(self, sample_project_memory):
"""Test adding artifacts to ProjectMemory"""
project = sample_project_memory
# Check that artifacts were added to the correct buckets
assert len(project.artifacts[Bucket.INITIATE_INITIAL_PRODUCT_DOCS]) == 1
assert len(project.artifacts[Bucket.ELABORATE_DISCOVERY_AND_RESEARCH]) == 1
assert len(project.artifacts[Bucket.ELABORATE_SPECIFICATION_AND_PLANNING]) == 1
# Check that artifact was added with correct fields
initiate_artifact = project.artifacts[Bucket.INITIATE_INITIAL_PRODUCT_DOCS][0]
assert initiate_artifact.name == "vision_doc"
assert initiate_artifact.path == Path("docs/initiation/product_vision.md")
assert initiate_artifact.status == DocumentStatus.PENDING
# Test adding a duplicate (should return False)
duplicate = ArtifactMeta(
name="dup_vision",
bucket=Bucket.INITIATE_CORE_SETUP,
path=Path(
"docs/initiation/product_vision.md"
), # Same path as existing artifact
)
assert not project.add_artifact(duplicate)
# Check that original buckets still have the same count
assert len(project.artifacts[Bucket.INITIATE_INITIAL_PRODUCT_DOCS]) == 1
assert (
len(project.artifacts[Bucket.INITIATE_CORE_SETUP]) == 0
) # Duplicate wasn't added
def test_get_artifact(self, sample_project_memory):
"""Test retrieving artifacts by bucket and name"""
project = sample_project_memory
# Get existing artifact
artifact = project.get_artifact(
Bucket.ELABORATE_DISCOVERY_AND_RESEARCH, "user_research"
)
assert artifact is not None
assert artifact.name == "user_research"
assert artifact.bucket == Bucket.ELABORATE_DISCOVERY_AND_RESEARCH
# Get non-existent artifact
non_existent = project.get_artifact(Bucket.DEPLOY_SECURITY, "security_plan")
assert non_existent is None
def test_get_artifact_by_path(self, sample_project_memory):
"""Test retrieving artifacts by path"""
project = sample_project_memory
# Get existing artifact by path
artifact = project.get_artifact_by_path(Path("docs/specs/api_specification.md"))
assert artifact is not None
assert artifact.name == "api_spec"
assert artifact.bucket == Bucket.ELABORATE_SPECIFICATION_AND_PLANNING
# Get non-existent artifact
non_existent = project.get_artifact_by_path(Path("nonexistent/path.md"))
assert non_existent is None
def test_update_artifact_status(self, sample_project_memory):
"""Test updating artifact status"""
project = sample_project_memory
# Update existing artifact
success = project.update_artifact_status(
Bucket.INITIATE_INITIAL_PRODUCT_DOCS, "vision_doc", DocumentStatus.COMPLETED
)
assert success
# Verify the status was updated
artifact = project.get_artifact(
Bucket.INITIATE_INITIAL_PRODUCT_DOCS, "vision_doc"
)
assert artifact.status == DocumentStatus.COMPLETED
# Try to update non-existent artifact
success = project.update_artifact_status(
Bucket.DEPLOY_SECURITY, "nonexistent", DocumentStatus.COMPLETED
)
assert not success
def test_get_bucket_completion(self, sample_project_memory):
"""Test getting completion stats for buckets"""
project = sample_project_memory
# Bucket with one completed artifact
elaborate_spec_stats = project.get_bucket_completion(
Bucket.ELABORATE_SPECIFICATION_AND_PLANNING
)
assert elaborate_spec_stats["total"] == 1
assert elaborate_spec_stats["completed"] == 1
assert elaborate_spec_stats["in_progress"] == 0
assert elaborate_spec_stats["pending"] == 0
assert elaborate_spec_stats["completion_percentage"] == 100.0
# Bucket with one in-progress artifact
elaborate_research_stats = project.get_bucket_completion(
Bucket.ELABORATE_DISCOVERY_AND_RESEARCH
)
assert elaborate_research_stats["total"] == 1
assert elaborate_research_stats["completed"] == 0
assert elaborate_research_stats["in_progress"] == 1
assert elaborate_research_stats["pending"] == 0
assert elaborate_research_stats["completion_percentage"] == 0.0
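        # Note: completion_percentage counts only COMPLETED artifacts, so an
        # IN_PROGRESS artifact contributes 0%, as asserted above.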
# Empty bucket
empty_bucket_stats = project.get_bucket_completion(Bucket.DEPLOY_SECURITY)
assert empty_bucket_stats["total"] == 0
assert empty_bucket_stats["completion_percentage"] == 0.0
def test_get_phase_completion(self, sample_project_memory):
"""Test getting completion stats for entire phases"""
project = sample_project_memory
# Elaborate phase has 2 artifacts (1 completed, 1 in-progress)
elaborate_stats = project.get_phase_completion("Elaborate")
assert elaborate_stats["total"] == 2
assert elaborate_stats["completed"] == 1
assert elaborate_stats["in_progress"] == 1
assert elaborate_stats["pending"] == 0
assert elaborate_stats["completion_percentage"] == 50.0
assert elaborate_stats["buckets"] == 4 # All Elaborate buckets
# Initiate phase has 1 pending artifact
initiate_stats = project.get_phase_completion("Initiate")
assert initiate_stats["total"] == 1
assert initiate_stats["completed"] == 0
assert initiate_stats["pending"] == 1
assert initiate_stats["completion_percentage"] == 0.0
# Deploy phase has 0 artifacts
deploy_stats = project.get_phase_completion("Deploy")
assert deploy_stats["total"] == 0
assert deploy_stats["completion_percentage"] == 0.0
def test_serialization_deserialization(self, sample_project_memory):
"""Test that ProjectMemory with taxonomy can be serialized and deserialized"""
project = sample_project_memory
# Serialize to JSON
project_json = project.model_dump_json()
# Check that JSON is valid
parsed_json = json.loads(project_json)
assert parsed_json["taxonomy_version"] == "0.5"
assert "artifacts" in parsed_json
# Deserialize from JSON
loaded_project = ProjectMemory.model_validate_json(project_json)
# Check that all fields were preserved
assert loaded_project.project_info.name == project.project_info.name
assert loaded_project.taxonomy_version == project.taxonomy_version
# Check artifacts
assert Bucket.INITIATE_INITIAL_PRODUCT_DOCS in loaded_project.artifacts
assert Bucket.ELABORATE_DISCOVERY_AND_RESEARCH in loaded_project.artifacts
assert Bucket.ELABORATE_SPECIFICATION_AND_PLANNING in loaded_project.artifacts
# Check specific artifact fields were preserved
loaded_artifact = loaded_project.get_artifact(
Bucket.ELABORATE_SPECIFICATION_AND_PLANNING, "api_spec"
)
assert loaded_artifact is not None
assert loaded_artifact.name == "api_spec"
assert loaded_artifact.path == Path("docs/specs/api_specification.md")
assert loaded_artifact.status == DocumentStatus.COMPLETED
# Verify completion stats are calculated correctly after deserialization
stats = loaded_project.get_phase_completion("Elaborate")
assert stats["completion_percentage"] == 50.0
================
File: tests/integration/test_server.py
================
#!/usr/bin/env python
"""
Integration tests for the Paelladoc MCP server.
These tests verify that the server correctly starts and responds to requests
via STDIO communication.
"""
import unittest
import sys
import os
import subprocess
from pathlib import Path
# Removed pty/select imports as PTY test is skipped
import signal
# Ensure we can import Paelladoc modules
project_root = Path(__file__).parent.parent.parent
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
# Constants
SERVER_SCRIPT = project_root / "server.py"
class TestServerIntegration(unittest.TestCase):
"""Integration tests for the MCP server STDIO communication."""
    @unittest.skip(
        "Skipping PTY/STDIO test: FastMCP stdio interaction is difficult to replicate reliably outside an actual client environment."
    )
def test_server_responds_to_ping(self):
"""Verify that the server responds to a ping request via PTY STDIO. (SKIPPED)"""
# request_id = str(uuid.uuid4()) # F841 - Removed
env = os.environ.copy()
env["PYTHONPATH"] = str(project_root)
env["PYTHONUNBUFFERED"] = "1"
# --- Start server using PTY ---
# master_fd, slave_fd = pty.openpty() # PTY logic commented out
server_process = None
master_fd = None # Ensure master_fd is defined for finally block
try:
# server_process = subprocess.Popen(...)
# os.close(slave_fd)
# --- Test Communication ---
# response_data = None # F841 - Removed
# stderr_output = "" # F841 - Removed again
# time.sleep(2)
# if server_process.poll() is not None:
# ...
# mcp_request = {...}
# request_json = json.dumps(mcp_request) + "\n"
# print(f"Sending request via PTY: {request_json.strip()}")
# os.write(master_fd, request_json.encode())
# # Read response from PTY master fd with timeout
# stdout_line = ""
# buffer = b""
# end_time = time.time() + 5
# while time.time() < end_time:
# ...
# print(f"Received raw response line: {stdout_line.strip()}")
# if not stdout_line:
# ...
# response_data = json.loads(stdout_line)
# print(f"Parsed response: {response_data}")
# self.assertEqual(...)
pass # Keep test structure but do nothing as it's skipped
except Exception as e:
# stderr_output = "" # F841 - Removed
# ... (error handling commented out) ...
self.fail(f"An error occurred during the PTY test (should be skipped): {e}")
finally:
# --- Cleanup ---
if master_fd:
try:
os.close(master_fd)
except OSError:
pass
if server_process and server_process.poll() is None:
print("Terminating server process (if it was started)...")
try:
os.killpg(os.getpgid(server_process.pid), signal.SIGTERM)
server_process.wait(timeout=2)
except (ProcessLookupError, subprocess.TimeoutExpired, AttributeError):
# Handle cases where process/pgid might not exist if startup failed early
print(
"Server cleanup notification: process termination might have failed or was not needed."
)
if server_process and server_process.poll() is None:
try:
os.killpg(os.getpgid(server_process.pid), signal.SIGKILL)
except Exception:
pass # Final attempt
except Exception as term_e:
print(f"Error during termination: {term_e}")
# Read any remaining stderr
if server_process and server_process.stderr:
stderr_rem = server_process.stderr.read().decode(errors="ignore")
if stderr_rem:
print(f"Remaining stderr: {stderr_rem}")
if __name__ == "__main__":
unittest.main()
================
File: tests/integration/test_alembic_config.py
================
"""Integration tests for Alembic configuration."""
import os
import pytest
from pathlib import Path
import uuid
import subprocess # Import subprocess
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.runtime.migration import MigrationContext
from sqlalchemy.ext.asyncio import create_async_engine
import sys
# Import get_db_path to test its behavior directly
from paelladoc.config.database import get_db_path
# Get project root to build absolute paths if needed
PROJECT_ROOT = Path(__file__).parent.parent.parent.absolute()
sys.path.insert(0, str(PROJECT_ROOT))
@pytest.fixture
def clean_env():
"""Remove relevant environment variables before each test."""
original_db_path = os.environ.get("PAELLADOC_DB_PATH")
original_env = os.environ.get("PAELLADOC_ENV")
if "PAELLADOC_DB_PATH" in os.environ:
del os.environ["PAELLADOC_DB_PATH"]
if "PAELLADOC_ENV" in os.environ:
del os.environ["PAELLADOC_ENV"]
yield
if original_db_path is not None:
os.environ["PAELLADOC_DB_PATH"] = original_db_path
if original_env is not None:
os.environ["PAELLADOC_ENV"] = original_env
@pytest.fixture
def temp_db_path():
"""Create a temporary database path."""
test_db_name = f"test_alembic_{uuid.uuid4()}.db"
# Use a simpler temp directory structure to avoid potential permission issues
test_dir = Path("/tmp") / "paelladoc_test_dbs"
test_db_path = test_dir / test_db_name
test_db_path.parent.mkdir(parents=True, exist_ok=True)
yield test_db_path
# Cleanup
try:
if test_db_path.exists():
# No need for asyncio.sleep here as subprocess runs separately
os.remove(test_db_path)
if test_dir.exists() and not any(test_dir.iterdir()):
test_dir.rmdir()
except Exception as e:
print(f"Error during cleanup: {e}")
def run_alembic_command(command: list, env: dict):
"""Helper function to run alembic CLI commands via subprocess."""
# Ensure alembic is callable, adjust path if needed (e.g., use .venv/bin/alembic)
alembic_executable = PROJECT_ROOT / ".venv" / "bin" / "alembic"
if not alembic_executable.exists():
# Fallback or error if venv structure is different
pytest.fail(f"Alembic executable not found at {alembic_executable}")
cmd = [str(alembic_executable)] + command
print(f"\nRunning subprocess: {' '.join(cmd)}")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
env={**os.environ, **env}, # Merge OS env with test-specific env
cwd=PROJECT_ROOT, # Run from project root where alembic.ini is
check=False, # Don't raise exception on non-zero exit, check manually
)
print(f"Subprocess stdout:\n{result.stdout}")
print(f"Subprocess stderr:\n{result.stderr}")
if result.returncode != 0:
pytest.fail(
f"Alembic command {' '.join(command)} failed with exit code {result.returncode}\nStderr: {result.stderr}"
)
return result
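# Example call (mirroring the tests below); the env dict is merged over
# os.environ, so only the overrides need to be supplied:
#   run_alembic_command(["upgrade", "head"], env={"PAELLADOC_DB_PATH": str(temp_db_path)})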
def test_alembic_config_uses_db_path_via_env(clean_env, temp_db_path):
"""Test that env.py logic picks up PAELLADOC_DB_PATH."""
os.environ["PAELLADOC_DB_PATH"] = str(temp_db_path)
# Verify that get_db_path() returns the expected path
# as this is what env.py uses to construct the URL.
resolved_path = get_db_path()
assert resolved_path == temp_db_path
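    # Assumption: alembic's env.py builds the SQLAlchemy URL from get_db_path(),
    # roughly f"sqlite+aiosqlite:///{get_db_path()}" (the same form used in the
    # tests below), which is why asserting on get_db_path() is sufficient here.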
@pytest.mark.asyncio
async def test_alembic_migrations_work_with_config(clean_env, temp_db_path):
"""Test that migrations work by running alembic upgrade via subprocess."""
test_env = {"PAELLADOC_DB_PATH": str(temp_db_path)}
# Ensure the temporary database file exists before running Alembic
if not temp_db_path.exists():
temp_db_path.touch()
# Run alembic upgrade head in a subprocess
run_alembic_command(["upgrade", "head"], env=test_env)
# Verify migrations applied using an async engine
# Need the actual URL alembic used (which comes from env var)
db_url = f"sqlite+aiosqlite:///{temp_db_path}"
engine = create_async_engine(db_url)
try:
async with engine.connect() as conn:
# Define a sync function to get revision
def get_rev_sync(sync_conn):
# Need alembic config to find script directory
cfg = Config("alembic.ini") # Load config to get script location
migration_context = MigrationContext.configure(
connection=sync_conn,
opts={"script": ScriptDirectory.from_config(cfg)},
)
return migration_context.get_current_revision()
# Run the sync function using run_sync
current_rev = await conn.run_sync(get_rev_sync)
# Get head revision directly from script directory
cfg = Config("alembic.ini")
script = ScriptDirectory.from_config(cfg)
head_rev = script.get_current_head()
assert current_rev is not None, "DB revision is None after upgrade."
assert current_rev == head_rev, (
f"DB revision {current_rev} does not match head {head_rev}"
)
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_alembic_downgrade_works_with_config(clean_env, temp_db_path):
"""Test that downgrades work by running alembic via subprocess."""
test_env = {"PAELLADOC_DB_PATH": str(temp_db_path)}
# Ensure the temporary database file exists before running Alembic
if not temp_db_path.exists():
temp_db_path.touch()
# Run migrations up first
run_alembic_command(["upgrade", "head"], env=test_env)
# Run migrations down
run_alembic_command(["downgrade", "base"], env=test_env)
# Verify database is at base (no revision)
db_url = f"sqlite+aiosqlite:///{temp_db_path}"
engine = create_async_engine(db_url)
try:
async with engine.connect() as conn:
# Define a sync function to get revision
def get_rev_sync(sync_conn):
cfg = Config("alembic.ini") # Load config to get script location
migration_context = MigrationContext.configure(
connection=sync_conn,
opts={"script": ScriptDirectory.from_config(cfg)},
)
return migration_context.get_current_revision()
# Run the sync function using run_sync
current_rev = await conn.run_sync(get_rev_sync)
assert current_rev is None, (
f"Expected base revision (None), got {current_rev}"
)
finally:
await engine.dispose()
def test_alembic_respects_environment_precedence(clean_env, temp_db_path):
"""Test that PAELLADOC_DB_PATH takes precedence over PAELLADOC_ENV."""
# Set both environment variables
os.environ["PAELLADOC_DB_PATH"] = str(temp_db_path)
os.environ["PAELLADOC_ENV"] = "development" # This should be ignored
# Verify that get_db_path() returns the path from PAELLADOC_DB_PATH
resolved_path = get_db_path()
assert resolved_path == temp_db_path
================
File: tests/integration/adapters/plugins/core/test_paella.py
================
"""
Integration tests for the core.paella plugin.
"""
import pytest
import asyncio
import sys
import os
from pathlib import Path
import uuid
# Ensure we can import Paelladoc modules
project_root = Path(__file__).parent.parent.parent.parent.parent.parent.absolute()
sys.path.insert(0, str(project_root))
from paelladoc.domain.models.language import SupportedLanguage
from paelladoc.adapters.plugins.core.paella import (
paella_init,
paella_list,
paella_select,
)
from paelladoc.domain.models.project import (
    ProjectInfo,
)
# Adapter for verification
from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter
# --- Pytest Fixture for Temporary DB --- #
@pytest.fixture(scope="function")
async def memory_adapter():
"""Provides an initialized SQLiteMemoryAdapter with a temporary DB."""
test_db_name = f"test_paella_{uuid.uuid4()}.db"
test_dir = Path(__file__).parent / "temp_dbs"
test_db_path = test_dir / test_db_name
test_db_path.parent.mkdir(parents=True, exist_ok=True)
print(f"\nSetting up test with DB: {test_db_path}")
adapter = SQLiteMemoryAdapter(db_path=test_db_path)
await adapter._create_db_and_tables()
yield adapter
print(f"Tearing down test, removing DB: {test_db_path}")
await asyncio.sleep(0.01) # Brief pause for file lock release
try:
if test_db_path.exists():
os.remove(test_db_path)
print(f"Removed DB: {test_db_path}")
try:
test_db_path.parent.rmdir()
print(f"Removed test directory: {test_db_path.parent}")
except OSError:
pass # Directory not empty, likely other tests running concurrently
except Exception as e:
print(f"Error during teardown removing {test_db_path}: {e}")
# --- Test Cases --- #
@pytest.mark.asyncio
async def test_create_new_project_asks_for_base_path_and_saves_it(
memory_adapter,
monkeypatch,
):
"""
Verify the interactive flow for creating a new project:
1. Asks for interaction language.
2. Lists projects (if any) and asks action (create new).
3. Asks for documentation language.
4. Asks for new project name (checks for existence).
5. Asks for base path.
6. Creates the project, saves absolute base path, saves initial memory.
"""
print("\nRunning: test_create_new_project_asks_for_base_path_and_saves_it")
interaction_lang = SupportedLanguage.EN_US.value
doc_lang = SupportedLanguage.EN_US.value
project_name = f"test-project-{uuid.uuid4()}"
base_path_input = "./test_paella_docs" # Relative path input
expected_abs_base_path = Path(base_path_input).resolve()
# --- Monkeypatch the database path resolution ---
# Patch get_db_path where SQLiteMemoryAdapter imports it,
# so core_paella uses the temporary DB path when it creates its own adapter.
monkeypatch.setattr(
"paelladoc.adapters.output.sqlite.sqlite_memory_adapter.get_db_path",
lambda: memory_adapter.db_path, # Return the path from the fixture
)
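    # Patching at the importing module matters: if the adapter binds the name
    # via "from ... import get_db_path" at import time, patching the original
    # paelladoc.config.database.get_db_path would not affect that bound name.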
# Initialize project
init_result = await paella_init(
base_path=base_path_input,
documentation_language=doc_lang,
interaction_language=interaction_lang,
new_project_name=project_name,
)
assert init_result["status"] == "ok"
assert init_result["project_name"] == project_name
assert init_result["base_path"] == str(expected_abs_base_path)
# Clean up
if expected_abs_base_path.exists():
import shutil
shutil.rmtree(expected_abs_base_path)
@pytest.mark.asyncio
async def test_paella_workflow():
"""Test the complete PAELLA workflow."""
# Test data
project_name = f"test_project_{uuid.uuid4().hex[:8]}"
base_path = f"docs/{project_name}"
doc_language = SupportedLanguage.EN_US.value
int_language = SupportedLanguage.EN_US.value
# Initialize project
init_result = await paella_init(
base_path=base_path,
documentation_language=doc_language,
interaction_language=int_language,
new_project_name=project_name,
)
assert init_result["status"] == "ok"
assert init_result["project_name"] == project_name
assert init_result["base_path"] == str(Path(base_path).expanduser().resolve())
# List projects
list_result = await paella_list()
assert list_result["status"] == "ok"
assert isinstance(list_result["projects"], list)
# Extract names from ProjectInfo objects before checking membership
project_names_list = [
info.name for info in list_result["projects"] if isinstance(info, ProjectInfo)
]
assert project_name in project_names_list
# Select project
select_result = await paella_select(project_name=project_name)
assert select_result["status"] == "ok"
assert select_result["project_name"] == project_name
# Clean up
project_dir = Path(base_path)
if project_dir.exists():
import shutil
shutil.rmtree(project_dir)
================
File: tests/integration/adapters/plugins/core/test_list_projects.py
================
"""
Integration tests for the core.list_projects plugin.
"""
import pytest
import asyncio
import sys
import os
from pathlib import Path
import uuid
# Ensure we can import Paelladoc modules
project_root = Path(__file__).parent.parent.parent.parent.parent.parent.absolute()
sys.path.insert(0, str(project_root))
# Adapter is needed to pre-populate the DB for the test
from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter
# Import domain models to create test data
from paelladoc.domain.models.project import (
ProjectMemory,
ProjectInfo,
Bucket,
ArtifactMeta,
)
from paelladoc.domain.models.language import SupportedLanguage
# --- Helper Function to create test data --- #
def _create_sample_memory(name_suffix: str) -> ProjectMemory:
"""Helper to create a sample ProjectMemory object."""
project_name = f"test-project-{name_suffix}-{uuid.uuid4()}"
# Add a dummy artifact to make it valid
artifact = ArtifactMeta(
name="dummy.md", bucket=Bucket.UNKNOWN, path=Path("dummy.md")
)
memory = ProjectMemory(
project_info=ProjectInfo(
name=project_name,
interaction_language=SupportedLanguage.EN_US,
documentation_language=SupportedLanguage.EN_US,
base_path=Path(f"./docs/{project_name}").resolve(),
purpose="testing list projects",
target_audience="devs",
objectives=["test list"],
),
artifacts={Bucket.UNKNOWN: [artifact]},
taxonomy_version="0.5",
)
return memory
# --- Pytest Fixture for Temporary DB (copied from test_paella) --- #
@pytest.fixture(scope="function")
async def memory_adapter():
"""Provides an initialized SQLiteMemoryAdapter with a temporary DB."""
test_db_name = f"test_list_projects_{uuid.uuid4()}.db"
test_dir = Path(__file__).parent / "temp_dbs_list"
test_db_path = test_dir / test_db_name
test_db_path.parent.mkdir(parents=True, exist_ok=True)
print(f"\nSetting up test with DB: {test_db_path}")
adapter = SQLiteMemoryAdapter(db_path=test_db_path)
await adapter._create_db_and_tables()
yield adapter # Provide the adapter to the test function
# Teardown
print(f"Tearing down test, removing DB: {test_db_path}")
await asyncio.sleep(0.01)
try:
if test_db_path.exists():
os.remove(test_db_path)
print(f"Removed DB: {test_db_path}")
try:
test_db_path.parent.rmdir()
print(f"Removed test directory: {test_db_path.parent}")
except OSError:
pass
except Exception as e:
print(f"Error during teardown removing {test_db_path}: {e}")
# --- Test Case --- #
@pytest.mark.asyncio
async def test_list_projects_returns_saved_projects(
memory_adapter: SQLiteMemoryAdapter,
):
"""
Verify that core.list_projects correctly lists projects previously saved.
THIS TEST WILL FAIL until the tool and adapter method are implemented.
"""
print("\nRunning: test_list_projects_returns_saved_projects")
# Arrange: Save some projects directly using the adapter
project1_memory = _create_sample_memory("list1")
project2_memory = _create_sample_memory("list2")
await memory_adapter.save_memory(project1_memory)
await memory_adapter.save_memory(project2_memory)
expected_project_names = sorted(
[project1_memory.project_info.name, project2_memory.project_info.name]
)
print(f"Saved projects: {expected_project_names}")
# Act: Call the tool function with our test db_path
from paelladoc.adapters.plugins.core.list_projects import list_projects
# Pass the path to our temporary test database
db_path_str = str(memory_adapter.db_path)
print(f"Using test DB path: {db_path_str}")
result = await list_projects(db_path=db_path_str)
# Assert: Check the response
assert result["status"] == "ok", f"Expected status ok, got {result.get('status')}"
assert "projects" in result, "Response missing 'projects' key"
assert isinstance(result["projects"], list), "'projects' should be a list"
# Extract names from the ProjectInfo objects returned by the plugin
returned_project_names = sorted(
[info.name for info in result["projects"] if isinstance(info, ProjectInfo)]
)
# Compare the sorted list of names
assert returned_project_names == expected_project_names, (
f"Expected project names {expected_project_names}, but got {returned_project_names}"
)
================
File: tests/integration/adapters/output/test_sqlite_memory_adapter_config.py
================
"""Integration tests for SQLite adapter configuration."""
import os
import pytest
import asyncio
from pathlib import Path
import uuid
from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter
from paelladoc.domain.models.project import (
ProjectMemory,
ProjectInfo,
)
@pytest.fixture
def clean_env():
"""Remove relevant environment variables before each test."""
original_db_path = os.environ.get("PAELLADOC_DB_PATH")
original_env = os.environ.get("PAELLADOC_ENV")
if "PAELLADOC_DB_PATH" in os.environ:
del os.environ["PAELLADOC_DB_PATH"]
if "PAELLADOC_ENV" in os.environ:
del os.environ["PAELLADOC_ENV"]
yield
if original_db_path is not None:
os.environ["PAELLADOC_DB_PATH"] = original_db_path
if original_env is not None:
os.environ["PAELLADOC_ENV"] = original_env
@pytest.fixture
async def temp_adapter():
"""Create a temporary adapter with a unique database."""
test_db_name = f"test_config_{uuid.uuid4()}.db"
test_dir = Path(__file__).parent / "temp_dbs"
test_db_path = test_dir / test_db_name
test_db_path.parent.mkdir(parents=True, exist_ok=True)
adapter = SQLiteMemoryAdapter(db_path=test_db_path)
await adapter._create_db_and_tables()
yield adapter
# Cleanup
await asyncio.sleep(0.01) # Brief pause for file lock release
try:
if test_db_path.exists():
os.remove(test_db_path)
test_db_path.parent.rmdir()
except Exception as e:
print(f"Error during cleanup: {e}")
@pytest.mark.asyncio
async def test_adapter_uses_custom_path(clean_env):
"""Verify adapter uses the path provided in __init__."""
custom_path = create_temp_db_path()
adapter = SQLiteMemoryAdapter(db_path=custom_path)
assert adapter.db_path == custom_path
# Clean up the test file if it was created
if custom_path.exists():
os.remove(custom_path)
@pytest.mark.asyncio
async def test_adapter_uses_env_var_path(clean_env):
"""Verify adapter uses PAELLADOC_DB_PATH environment variable if set."""
env_path = create_temp_db_path()
os.environ["PAELLADOC_DB_PATH"] = str(env_path)
adapter = SQLiteMemoryAdapter() # No path given, should use env var
assert adapter.db_path == env_path
if env_path.exists():
os.remove(env_path)
@pytest.mark.asyncio
async def test_adapter_uses_production_path(clean_env):
"""Verify adapter uses PRODUCTION_DB_PATH by default."""
# Ensure no env vars are set that override the default
os.environ.pop("PAELLADOC_DB_PATH", None)
os.environ.pop("PAELLADOC_ENV", None)
adapter = SQLiteMemoryAdapter()
expected_path = Path.home() / ".paelladoc" / "memory.db" # Get default directly
assert adapter.db_path == expected_path
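    # Taken together, these tests cover the three configuration sources: an
    # explicit db_path argument, the PAELLADOC_DB_PATH environment variable,
    # and the production default under ~/.paelladoc.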
@pytest.mark.asyncio
async def test_adapter_creates_parent_directory(clean_env):
"""Verify the adapter ensures the parent directory for the DB exists."""
test_subdir = Path.home() / ".paelladoc_test_dir" / str(uuid.uuid4())
custom_path = test_subdir / "test_creation.db"
# Ensure the directory does not exist initially
if test_subdir.exists():
for item in test_subdir.iterdir(): # Clear if exists
os.remove(item)
os.rmdir(test_subdir)
assert not test_subdir.exists()
# The adapter instantiation triggers the directory creation
_ = SQLiteMemoryAdapter(db_path=custom_path) # Assign to _ as intentionally unused
# Initialization should create the parent directory
assert test_subdir.exists()
assert test_subdir.is_dir()
# Clean up
if custom_path.exists():
os.remove(custom_path)
if test_subdir.exists():
os.rmdir(test_subdir)
@pytest.mark.asyncio
async def test_adapter_operations_with_custom_path(temp_adapter):
"""Test basic adapter operations with custom path."""
# Create test project
project = ProjectMemory(
project_info=ProjectInfo(
name=f"test-project-{uuid.uuid4()}",
language="python",
purpose="Test project",
target_audience="Developers",
objectives=["Test database configuration"],
)
)
# Test operations
await temp_adapter.save_memory(project)
assert await temp_adapter.project_exists(project.project_info.name)
loaded = await temp_adapter.load_memory(project.project_info.name)
assert loaded is not None
assert loaded.project_info.name == project.project_info.name
projects_info = await temp_adapter.list_projects()
# Extract names from the returned ProjectInfo objects
project_names = [info.name for info in projects_info]
assert project.project_info.name in project_names
# Helper function to create a unique temporary DB path.
# (Defined after its callers; module-level names are resolved at call time,
# so the tests above can use it safely.)
def create_temp_db_path(prefix="test_adapter_config") -> Path:
test_db_name = f"{prefix}_{uuid.uuid4()}.db"
# Use /tmp or a similar temporary directory standard across systems
test_db_path = Path("/tmp") / test_db_name
# test_db_path.parent.mkdir(parents=True, exist_ok=True) # /tmp should exist
print(f"\nGenerated temporary DB path: {test_db_path}")
return test_db_path
================
File: tests/integration/adapters/output/test_sqlite_memory_adapter.py
================
"""
Integration tests for the SQLiteMemoryAdapter.
"""
import pytest # Use pytest
import asyncio
import sys
import os
from pathlib import Path
import uuid
import datetime
from typing import Dict, List
# Ensure we can import Paelladoc modules
project_root = Path(__file__).parent.parent.parent.parent.parent.absolute()
sys.path.insert(0, str(project_root))
# Module to test
from paelladoc.adapters.output.sqlite.sqlite_memory_adapter import SQLiteMemoryAdapter
# Import updated domain models
from paelladoc.domain.models.project import (
ProjectMemory,
ProjectInfo,
ArtifactMeta,
DocumentStatus,
Bucket,
)
# --- Pytest Fixture for Temporary DB --- #
@pytest.fixture(scope="function") # Recreate DB for each test function
async def memory_adapter():
"""Provides an initialized SQLiteMemoryAdapter with a temporary DB."""
test_db_name = f"test_memory_{uuid.uuid4()}.db"
test_db_path = Path("./temp_test_dbs") / test_db_name
test_db_path.parent.mkdir(parents=True, exist_ok=True)
print(f"\nSetting up test with DB: {test_db_path}")
adapter = SQLiteMemoryAdapter(db_path=test_db_path)
await adapter._create_db_and_tables()
yield adapter # Provide the adapter to the test function
# Teardown: clean up the database
print(f"Tearing down test, removing DB: {test_db_path}")
# Dispose engine if needed
# await adapter.async_engine.dispose()
await asyncio.sleep(0.01)
try:
if test_db_path.exists():
os.remove(test_db_path)
print(f"Removed DB: {test_db_path}")
try:
test_db_path.parent.rmdir()
print(f"Removed test directory: {test_db_path.parent}")
except OSError:
pass # Directory not empty or other issue
except Exception as e:
print(f"Error during teardown removing {test_db_path}: {e}")
# --- Helper Function --- #
def _create_sample_memory(name_suffix: str) -> ProjectMemory:
"""Helper to create a sample ProjectMemory object with Artifacts."""
project_name = f"test-project-{name_suffix}"
# Create sample artifacts
artifact1 = ArtifactMeta(
name="README",
bucket=Bucket.INITIATE_INITIAL_PRODUCT_DOCS,
path=Path("README.md"),
status=DocumentStatus.PENDING,
)
artifact2 = ArtifactMeta(
name="main.py generation script",
bucket=Bucket.GENERATE_SUPPORTING_ELEMENTS,
path=Path("scripts/generate_main.py"),
status=DocumentStatus.IN_PROGRESS,
)
artifacts: Dict[Bucket, List[ArtifactMeta]] = {
Bucket.INITIATE_INITIAL_PRODUCT_DOCS: [artifact1],
Bucket.GENERATE_SUPPORTING_ELEMENTS: [artifact2],
}
memory = ProjectMemory(
project_info=ProjectInfo(
name=project_name,
language="python",
purpose="testing adapter v2",
target_audience="devs",
objectives=["test save artifacts", "test load artifacts"],
),
artifacts=artifacts,
taxonomy_version="0.5",
)
return memory
# --- Test Cases (using pytest and pytest-asyncio) --- #
@pytest.mark.asyncio
async def test_project_exists_on_empty_db(memory_adapter: SQLiteMemoryAdapter):
"""Test project_exists returns False when the DB is empty/project not saved."""
print("Running: test_project_exists_on_empty_db")
exists = await memory_adapter.project_exists("nonexistent-project")
assert not exists
@pytest.mark.asyncio
async def test_load_memory_on_empty_db(memory_adapter: SQLiteMemoryAdapter):
"""Test load_memory returns None when the DB is empty/project not saved."""
print("Running: test_load_memory_on_empty_db")
loaded_memory = await memory_adapter.load_memory("nonexistent-project")
assert loaded_memory is None
@pytest.mark.asyncio
async def test_save_and_load_new_project(memory_adapter: SQLiteMemoryAdapter):
"""Test saving a new project with artifacts and loading it back."""
print("Running: test_save_and_load_new_project")
original_memory = _create_sample_memory("save-load-artifacts")
project_name = original_memory.project_info.name
original_artifacts = original_memory.artifacts
artifact1_id = original_artifacts[Bucket.INITIATE_INITIAL_PRODUCT_DOCS][0].id
artifact2_id = original_artifacts[Bucket.GENERATE_SUPPORTING_ELEMENTS][0].id
# Save
await memory_adapter.save_memory(original_memory)
print(f"Saved project: {project_name}")
# Load
loaded_memory = await memory_adapter.load_memory(project_name)
print(f"Loaded project: {project_name}")
# Assertions
assert loaded_memory is not None
assert loaded_memory.project_info.name == original_memory.project_info.name
assert loaded_memory.project_info.language == original_memory.project_info.language
assert (
loaded_memory.project_info.objectives == original_memory.project_info.objectives
)
assert loaded_memory.taxonomy_version == original_memory.taxonomy_version
# Check artifacts dictionary structure
# Note: If the adapter pads with empty buckets, adjust this check
# For now, assume only buckets with artifacts are loaded
assert Bucket.INITIATE_INITIAL_PRODUCT_DOCS in loaded_memory.artifacts
assert Bucket.GENERATE_SUPPORTING_ELEMENTS in loaded_memory.artifacts
assert len(loaded_memory.artifacts[Bucket.INITIATE_INITIAL_PRODUCT_DOCS]) == 1
assert len(loaded_memory.artifacts[Bucket.GENERATE_SUPPORTING_ELEMENTS]) == 1
# assert len(loaded_memory.artifacts[Bucket.DEPLOY_SECURITY]) == 0 # Check depends on adapter behavior
# Check artifact details
loaded_artifact1 = loaded_memory.get_artifact_by_path(Path("README.md"))
assert loaded_artifact1 is not None
assert loaded_artifact1.id == artifact1_id
assert loaded_artifact1.name == "README"
assert loaded_artifact1.bucket == Bucket.INITIATE_INITIAL_PRODUCT_DOCS
assert loaded_artifact1.status == DocumentStatus.PENDING
loaded_artifact2 = loaded_memory.get_artifact_by_path(
Path("scripts/generate_main.py")
)
assert loaded_artifact2 is not None
assert loaded_artifact2.id == artifact2_id
assert loaded_artifact2.name == "main.py generation script"
assert loaded_artifact2.bucket == Bucket.GENERATE_SUPPORTING_ELEMENTS
assert loaded_artifact2.status == DocumentStatus.IN_PROGRESS
# Check timestamps - don't compare exact values since they'll be different due to persistence/mocking
# Just verify that created_at is a valid UTC timestamp
assert loaded_memory.created_at.tzinfo == datetime.timezone.utc
assert isinstance(loaded_memory.created_at, datetime.datetime)
assert isinstance(loaded_memory.last_updated_at, datetime.datetime)
# Verify the loaded timestamps are in a reasonable range
# Current time should be >= last_updated_at
assert datetime.datetime.now(datetime.timezone.utc) >= loaded_memory.last_updated_at
@pytest.mark.asyncio
async def test_project_exists_after_save(memory_adapter: SQLiteMemoryAdapter):
"""Test project_exists returns True after a project is saved."""
print("Running: test_project_exists_after_save")
memory_to_save = _create_sample_memory("exists-artifacts")
project_name = memory_to_save.project_info.name
await memory_adapter.save_memory(memory_to_save)
print(f"Saved project: {project_name}")
exists = await memory_adapter.project_exists(project_name)
assert exists
@pytest.mark.asyncio
async def test_save_updates_project(memory_adapter: SQLiteMemoryAdapter):
"""Test saving updates: changing artifact status, adding, removing."""
print("Running: test_save_updates_project")
# 1. Create and save initial state
memory = _create_sample_memory("update-artifacts")
project_name = memory.project_info.name
artifact1 = memory.artifacts[Bucket.INITIATE_INITIAL_PRODUCT_DOCS][0]
# artifact2 = memory.artifacts[Bucket.GENERATE_SUPPORTING_ELEMENTS][0] # No need to store if removing
await memory_adapter.save_memory(memory)
print(f"Initial save for {project_name}")
# 2. Modify the domain object
artifact1.update_status(DocumentStatus.COMPLETED)
artifact3 = ArtifactMeta(
name="Deployment Script",
bucket=Bucket.DEPLOY_PIPELINES_AND_AUTOMATION,
path=Path("deploy.sh"),
)
# Add artifact3 - ensure bucket exists in dict first
if artifact3.bucket not in memory.artifacts:
memory.artifacts[artifact3.bucket] = []
memory.artifacts[artifact3.bucket].append(artifact3)
# Remove artifact2 - remove the list if it becomes empty
del memory.artifacts[Bucket.GENERATE_SUPPORTING_ELEMENTS][0]
if not memory.artifacts[Bucket.GENERATE_SUPPORTING_ELEMENTS]:
del memory.artifacts[Bucket.GENERATE_SUPPORTING_ELEMENTS]
# 3. Save the updated memory
await memory_adapter.save_memory(memory)
print(f"Saved updates for {project_name}")
# 4. Load and verify
loaded_memory = await memory_adapter.load_memory(project_name)
assert loaded_memory is not None
# Verify artifact1 status updated
loaded_artifact1 = loaded_memory.get_artifact_by_path(Path("README.md"))
assert loaded_artifact1 is not None
assert loaded_artifact1.status == DocumentStatus.COMPLETED
assert loaded_artifact1.id == artifact1.id
# Verify artifact2 removed
loaded_artifact2 = loaded_memory.get_artifact_by_path(
Path("scripts/generate_main.py")
)
assert loaded_artifact2 is None
assert not loaded_memory.artifacts.get(Bucket.GENERATE_SUPPORTING_ELEMENTS)
# Verify artifact3 added
loaded_artifact3 = loaded_memory.get_artifact_by_path(Path("deploy.sh"))
assert loaded_artifact3 is not None
assert loaded_artifact3.name == "Deployment Script"
assert loaded_artifact3.bucket == Bucket.DEPLOY_PIPELINES_AND_AUTOMATION
assert loaded_artifact3.status == DocumentStatus.PENDING
assert loaded_artifact3.id == artifact3.id
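    # Contract inferred from the assertions above: save_memory reconciles
    # artifacts on update (upserting new/changed rows and deleting removed
    # ones) rather than blindly appending.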
# Run tests if executed directly (optional, better via test runner)
# if __name__ == "__main__":
# # Consider using asyncio.run() if needed for top-level execution
# unittest.main()
================
File: tests/integration/adapters/output/test_chroma_vector_store_adapter.py
================
"""
Integration tests for the ChromaVectorStoreAdapter.
"""
import unittest
import asyncio
import sys
from pathlib import Path
import uuid
# Ensure we can import Paelladoc modules
project_root = Path(__file__).parent.parent.parent.parent.parent.absolute()
sys.path.insert(0, str(project_root))
# Module to test
from paelladoc.adapters.output.chroma.chroma_vector_store_adapter import (
ChromaVectorStoreAdapter,
NotFoundError,
)
from paelladoc.ports.output.vector_store_port import SearchResult # Import base class
# Import Chroma specific types for assertions if needed
from chromadb.api.models.Collection import Collection
class TestChromaVectorStoreAdapterIntegration(unittest.IsolatedAsyncioTestCase):
"""Integration tests using an in-memory ChromaDB client."""
def setUp(self):
"""Set up an in-memory Chroma client and a unique collection name."""
print("\nSetting up test...")
self.adapter = ChromaVectorStoreAdapter(in_memory=True)
# Generate a unique collection name for each test to ensure isolation
self.collection_name = f"test_collection_{uuid.uuid4()}"
print(f"Using collection name: {self.collection_name}")
async def asyncTearDown(self):
"""Attempt to clean up the test collection."""
print(
f"Tearing down test, attempting to delete collection: {self.collection_name}"
)
try:
# Use the adapter's method to delete
await self.adapter.delete_collection(self.collection_name)
print(f"Deleted collection: {self.collection_name}")
except Exception as e:
# Log error if deletion fails, but don't fail the test run
print(
f"Error during teardown deleting collection {self.collection_name}: {e}"
)
# We can also try listing collections to see if it exists
try:
collections = self.adapter.client.list_collections()
collection_names = [col.name for col in collections]
if self.collection_name in collection_names:
print(
f"Collection {self.collection_name} still exists after teardown attempt."
)
else:
print(
f"Collection {self.collection_name} confirmed deleted or never existed."
)
except Exception as list_e:
print(f"Error listing collections during teardown check: {list_e}")
# --- Test Cases --- #
async def test_get_or_create_collection_creates_new(self):
"""Test that a new collection is created if it doesn't exist."""
print(f"Running: {self._testMethodName}")
collection = await self.adapter.get_or_create_collection(self.collection_name)
self.assertIsInstance(collection, Collection)
self.assertEqual(collection.name, self.collection_name)
# Verify it exists in the client
collections = self.adapter.client.list_collections()
collection_names = [col.name for col in collections]
self.assertIn(self.collection_name, collection_names)
async def test_get_or_create_collection_retrieves_existing(self):
"""Test that an existing collection is retrieved."""
print(f"Running: {self._testMethodName}")
# Create it first
collection1 = await self.adapter.get_or_create_collection(self.collection_name)
self.assertIsNotNone(collection1)
# Get it again
collection2 = await self.adapter.get_or_create_collection(self.collection_name)
self.assertIsInstance(collection2, Collection)
self.assertEqual(collection2.name, self.collection_name)
# Check they are likely the same underlying collection (same ID)
self.assertEqual(collection1.id, collection2.id)
async def test_add_documents(self):
"""Test adding documents to a collection."""
print(f"Running: {self._testMethodName}")
docs_to_add = ["doc one text", "doc two text"]
metadatas = [{"source": "test1"}, {"source": "test2"}]
ids = ["id1", "id2"]
returned_ids = await self.adapter.add_documents(
self.collection_name, docs_to_add, metadatas, ids
)
self.assertEqual(returned_ids, ids)
# Verify documents were added using the underlying client API
collection = await self.adapter.get_or_create_collection(self.collection_name)
results = collection.get(ids=ids, include=["metadatas", "documents"])
self.assertIsNotNone(results)
self.assertListEqual(results["ids"], ids)
self.assertListEqual(results["documents"], docs_to_add)
self.assertListEqual(results["metadatas"], metadatas)
self.assertEqual(collection.count(), 2)
async def test_add_documents_without_ids(self):
"""Test adding documents letting Chroma generate IDs."""
print(f"Running: {self._testMethodName}")
docs_to_add = ["auto id doc 1", "auto id doc 2"]
metadatas = [{"type": "auto"}, {"type": "auto"}]
returned_ids = await self.adapter.add_documents(
self.collection_name, docs_to_add, metadatas
)
self.assertEqual(len(returned_ids), 2)
self.assertIsInstance(returned_ids[0], str)
self.assertIsInstance(returned_ids[1], str)
# Verify using the returned IDs
collection = await self.adapter.get_or_create_collection(self.collection_name)
results = collection.get(ids=returned_ids, include=["metadatas", "documents"])
self.assertIsNotNone(results)
self.assertCountEqual(
results["ids"], returned_ids
        )  # get() result order is not guaranteed, so compare as unordered collections
self.assertCountEqual(results["documents"], docs_to_add)
self.assertCountEqual(results["metadatas"], metadatas)
self.assertEqual(collection.count(), 2)
async def test_delete_collection(self):
"""Test deleting a collection."""
print(f"Running: {self._testMethodName}")
# Create it first
await self.adapter.get_or_create_collection(self.collection_name)
# Verify it exists
collections_before = self.adapter.client.list_collections()
self.assertIn(self.collection_name, [c.name for c in collections_before])
# Delete it using the adapter
await self.adapter.delete_collection(self.collection_name)
# Verify it's gone
collections_after = self.adapter.client.list_collections()
self.assertNotIn(self.collection_name, [c.name for c in collections_after])
# Attempting to get it should now raise NotFoundError or ValueError (depending on Chroma version)
with self.assertRaises((NotFoundError, ValueError)):
self.adapter.client.get_collection(name=self.collection_name)
async def _add_sample_search_data(self):
"""Helper to add some consistent data for search tests."""
docs = [
"This is the first document about apples.",
"This document discusses oranges and citrus.",
"A third document, focusing on bananas.",
"Another apple document for testing similarity.",
]
metadatas = [
{"source": "doc1", "type": "fruit", "year": 2023},
{"source": "doc2", "type": "fruit", "year": 2024},
{"source": "doc3", "type": "fruit", "year": 2023},
{"source": "doc4", "type": "fruit", "year": 2024},
]
ids = ["s_id1", "s_id2", "s_id3", "s_id4"]
await self.adapter.add_documents(self.collection_name, docs, metadatas, ids)
print(f"Added sample search data to collection: {self.collection_name}")
# Short delay to allow potential indexing if needed (though likely not for in-memory)
await asyncio.sleep(0.1)
async def test_search_simple(self):
"""Test basic similarity search."""
print(f"Running: {self._testMethodName}")
await self._add_sample_search_data()
query = "Tell me about apples"
results = await self.adapter.search_similar(
self.collection_name, [query], n_results=2
)
self.assertEqual(len(results), 1) # One list for the single query
self.assertEqual(len(results[0]), 2) # Two results requested
# Check the content of the results (order might vary based on embedding similarity)
result_docs = [r.document for r in results[0]]
self.assertIn("This is the first document about apples.", result_docs)
self.assertIn("Another apple document for testing similarity.", result_docs)
# Check metadata and ID are included
first_result = results[0][0]
self.assertIsInstance(first_result, SearchResult)
self.assertIsNotNone(first_result.id)
self.assertIsNotNone(first_result.metadata)
self.assertIsNotNone(first_result.distance)
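        # Caveat: these similarity assertions depend on Chroma's default
        # embedding function (historically all-MiniLM-L6-v2 via ONNX, fetched
        # on first use), so exact rankings may vary across Chroma versions.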
async def test_search_with_metadata_filter(self):
"""Test search with a 'where' clause for metadata filtering."""
print(f"Running: {self._testMethodName}")
await self._add_sample_search_data()
query = "Tell me about fruit"
# Filter for documents from year 2023
where_filter = {"year": 2023}
results = await self.adapter.search_similar(
self.collection_name, [query], n_results=3, where=where_filter
)
self.assertEqual(len(results), 1)
# Should only find doc1 and doc3 from year 2023
self.assertLessEqual(
len(results[0]), 2
) # Might return fewer than n_results if filter is strict
        # Access each result's metadata via r.metadata
returned_sources = [r.metadata.get("source") for r in results[0] if r.metadata]
# We expect only doc1 and doc3 from year 2023
expected_sources = ["doc1", "doc3"]
self.assertCountEqual(returned_sources, expected_sources)
async def test_search_no_results(self):
"""Test search for text unrelated to the documents."""
print(f"Running: {self._testMethodName}")
await self._add_sample_search_data()
query = "Information about programming languages"
results = await self.adapter.search_similar(
self.collection_name, [query], n_results=1
)
self.assertEqual(len(results), 1)
# Depending on the embedding model, might still return *something* even if very dissimilar.
# A more robust test might check the distance if available.
# For now, let's assume it might return the closest, even if irrelevant, or empty.
# If it returns results, ensure they are SearchResult instances
if results[0]:
self.assertIsInstance(results[0][0], SearchResult)
else:
self.assertEqual(len(results[0]), 0) # Or assert empty list
async def test_search_in_nonexistent_collection(self):
"""Test search returns empty list if collection doesn't exist."""
print(f"Running: {self._testMethodName}")
query = "anything"
results = await self.adapter.search_similar(
"nonexistent_collection_for_search", [query], n_results=1
)
self.assertEqual(len(results), 1) # Still returns a list for the query
self.assertEqual(len(results[0]), 0) # But the inner list is empty
# if __name__ == "__main__":
# unittest.main()
================
File: tests/e2e/test_cursor_simulation.py
================
"""
End-to-End tests for Paelladoc MCP Server.
This simulates how Cursor would interact with the server.
"""
import unittest
import sys
from pathlib import Path
# Ensure we can import Paelladoc modules
project_root = Path(__file__).parent.parent.parent.absolute()
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
# Import directly from the domain layer
from paelladoc.domain.core_logic import mcp, ping
class TestCursorE2E(unittest.TestCase):
"""End-to-End tests simulating Cursor interacting with Paelladoc."""
def test_direct_ping_call(self):
"""Test direct call to the ping function."""
# Call the ping function directly
result = ping()
# Verify the result
self.assertIsInstance(result, dict, "Ping should return a dict")
self.assertEqual(result["status"], "ok", "Status should be 'ok'")
self.assertEqual(result["message"], "pong", "Message should be 'pong'")
def test_ping_with_parameter(self):
"""Test ping function with a parameter."""
# Call ping with a test parameter
result = ping(random_string="test-parameter")
# Verify the result
self.assertIsInstance(result, dict, "Ping should return a dict")
self.assertEqual(result["status"], "ok", "Status should be 'ok'")
self.assertEqual(result["message"], "pong", "Message should be 'pong'")
def test_mcp_tool_registration(self):
"""Verify that the ping tool is registered with MCP."""
# Get tools registered with MCP
tool_manager = getattr(mcp, "_tool_manager", None)
self.assertIsNotNone(tool_manager, "MCP should have a tool manager")
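        # Caveat: _tool_manager is a private FastMCP attribute; this lookup may
        # break if the library's internals change between versions.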
tools = tool_manager.list_tools()
# Check if the ping tool is registered
tool_names = [tool.name for tool in tools]
self.assertIn("ping", tool_names, "Ping tool should be registered with MCP")
if __name__ == "__main__":
unittest.main()