"""
Tool handler functions for MCP server (QUA-002).
Extracted from server.py to improve maintainability and testability.
Each handler is a standalone async function that can be tested independently.
"""
from pathlib import Path
from mcp.types import TextContent
import json
import jsonschema
import time
from datetime import datetime
# Injected from server.py via set_templates_dir() (defined at the bottom of this module)
TEMPLATES_DIR = None
# Import dependencies
from typing import Any
from generators import FoundationGenerator, BaseGenerator, ChangelogGenerator, StandardsGenerator, AuditGenerator
from generators.planning_analyzer import PlanningAnalyzer
from generators.plan_validator import PlanValidator
from generators.review_formatter import ReviewFormatter
from generators.planning_generator import PlanningGenerator
from constants import Paths, Files, ScanDepth, FocusArea, AuditSeverity, AuditScope, PlanningPaths
from validation import (
validate_project_path_input,
validate_version_format,
validate_template_name_input,
validate_changelog_inputs,
validate_scan_depth,
validate_focus_areas,
validate_severity_filter,
validate_audit_scope,
validate_section_name,
validate_plan_file_path,
validate_feature_name_input,
VALID_TEMPLATE_SECTIONS
)
from type_defs import PlanningTemplateDict, PreparationSummaryDict, PlanResultDict
from error_responses import ErrorResponse
# Import logging (ARCH-003)
from logger_config import logger
# Import decorators and helpers (ARCH-004, ARCH-005, QUA-004)
from handler_decorators import mcp_error_handler, log_invocation
from handler_helpers import format_success_response
@log_invocation
@mcp_error_handler
async def handle_list_templates(arguments: dict) -> list[TextContent]:
"""
Handle list_templates tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
logger.debug(f"Listing templates from {TEMPLATES_DIR}")
templates = []
if TEMPLATES_DIR.exists():
for file in TEMPLATES_DIR.glob("*.txt"):
templates.append(file.stem)
logger.info(f"Found {len(templates)} templates")
if templates:
result = "Available POWER Framework Templates:\n\n"
for i, template in enumerate(sorted(templates), 1):
result += f"{i}. {template}\n"
result += f"\nTotal: {len(templates)} templates"
else:
logger.warning("No templates found in templates directory")
result = "No templates found in templates/power/"
return [TextContent(type="text", text=result)]
@log_invocation
@mcp_error_handler
async def handle_get_template(arguments: dict) -> list[TextContent]:
"""
Handle get_template tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
# Validate input at boundary (REF-003)
template_name = validate_template_name_input(arguments.get("template_name", ""))
logger.debug(f"Reading template: {template_name}")
template_file = TEMPLATES_DIR / f"{template_name}.txt"
if not template_file.exists():
logger.warning(f"Template not found: {template_name}")
raise FileNotFoundError(f"Template '{template_name}'")
with open(template_file, 'r', encoding='utf-8') as f:
content = f.read()
logger.info(f"Successfully read template: {template_name}")
result = f"=== {template_name.upper()} Template ===\n\n{content}"
return [TextContent(type="text", text=result)]
@log_invocation
@mcp_error_handler
async def handle_generate_foundation_docs(arguments: dict) -> list[TextContent]:
"""
Handle generate_foundation_docs tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
# Validate input at boundary (REF-003)
project_path = validate_project_path_input(arguments.get("project_path", ""))
logger.info(f"Generating foundation docs for project: {project_path}")
# Initialize foundation generator
generator = FoundationGenerator(TEMPLATES_DIR)
# Prepare paths for generation
paths = generator.prepare_generation(project_path)
# Get generation plan
plan = generator.get_generation_plan(project_path)
# Get all templates
logger.debug("Loading all foundation templates")
templates = generator.get_templates_for_generation()
logger.info(f"Loaded {len(templates)} foundation templates")
# Build response
result = plan + "\n\n" + "=" * 50 + "\n\n"
result += "TEMPLATES FOR GENERATION:\n\n"
for template in templates:
if 'error' in template:
result += f"ERROR - {template['template_name']}: {template['error']}\n\n"
else:
result += f"=== {template['template_name'].upper()} ===\n\n"
result += f"{template['template_content']}\n\n"
result += "-" * 50 + "\n\n"
result += "\nINSTRUCTIONS:\n"
result += "Generate each document in order using the templates above.\n\n"
result += "SAVE LOCATIONS (SEC-003):\n"
result += f"- README.md ā {paths['project_path']}/README.md\n"
result += f"- All other docs ā {paths['output_dir']}/\n\n"
result += "Each document should reference previous documents as indicated in the templates.\n"
logger.info(f"Successfully generated foundation docs plan for: {project_path}")
return [TextContent(type="text", text=result)]
@log_invocation
@mcp_error_handler
async def handle_generate_individual_doc(arguments: dict) -> list[TextContent]:
"""
Handle generate_individual_doc tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
# Validate inputs at boundary (REF-003)
project_path = validate_project_path_input(arguments.get("project_path", ""))
template_name = validate_template_name_input(arguments.get("template_name", ""))
logger.info(f"Generating individual doc: {template_name} for project: {project_path}")
generator = BaseGenerator(TEMPLATES_DIR)
paths = generator.prepare_generation(project_path)
template_content = generator.read_template(template_name)
template_info = generator.get_template_info(template_name)
# Get correct output path (SEC-003: README goes to root)
output_path = generator.get_doc_output_path(paths['project_path'], template_name)
logger.debug(f"Output path determined: {output_path}")
result = f"=== Generating {template_name.upper()} ===\n\n"
result += f"Project: {paths['project_path']}\n"
result += f"Output: {output_path}\n\n"
result += "=" * 50 + "\n\n"
result += f"TEMPLATE:\n\n{template_content}\n\n"
result += "=" * 50 + "\n\n"
result += "INSTRUCTIONS:\n"
result += f"Generate {template_info.get('save_as', f'{template_name.upper()}.md')} using the template above.\n"
result += f"Save the document to: {output_path}\n"
logger.info(f"Successfully generated plan for {template_name}")
return [TextContent(type="text", text=result)]
@log_invocation
@mcp_error_handler
async def handle_get_changelog(arguments: dict) -> list[TextContent]:
"""
Handle get_changelog tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
# Validate inputs at boundary (REF-003)
project_path = validate_project_path_input(arguments.get("project_path"))
version = arguments.get("version")
if version:
version = validate_version_format(version)
change_type = arguments.get("change_type")
breaking_only = arguments.get("breaking_only", False)
logger.info(f"Reading changelog from: {project_path}", extra={'version': version, 'change_type': change_type, 'breaking_only': breaking_only})
# Determine changelog location
changelog_path = Path(project_path) / Paths.CHANGELOG_DIR / Files.CHANGELOG
generator = ChangelogGenerator(changelog_path)
if breaking_only:
# Get only breaking changes
logger.debug("Filtering for breaking changes only")
changes = generator.get_breaking_changes()
result = f"Breaking Changes:\n\n"
result += json.dumps(changes, indent=2)
elif version:
# Get specific version
logger.debug(f"Fetching changes for version: {version}")
version_data = generator.get_version_changes(version)
if version_data:
result = f"Changes for version {version}:\n\n"
result += json.dumps(version_data, indent=2)
else:
logger.warning(f"Version not found in changelog: {version}")
result = f"Version {version} not found in changelog"
elif change_type:
# Filter by type
logger.debug(f"Filtering by change type: {change_type}")
changes = generator.get_changes_by_type(change_type)
result = f"Changes of type '{change_type}':\n\n"
result += json.dumps(changes, indent=2)
else:
# Get all changelog
logger.debug("Fetching full changelog")
data = generator.read_changelog()
result = "Full Changelog:\n\n"
result += json.dumps(data, indent=2)
logger.info("Successfully retrieved changelog")
return [TextContent(type="text", text=result)]
@log_invocation
@mcp_error_handler
async def handle_add_changelog_entry(arguments: dict) -> list[TextContent]:
"""
Handle add_changelog_entry tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
# Validate inputs at boundary (REF-003)
project_path = validate_project_path_input(arguments.get("project_path"))
validated = validate_changelog_inputs(
version=arguments.get("version"),
change_type=arguments.get("change_type"),
severity=arguments.get("severity"),
title=arguments.get("title"),
description=arguments.get("description"),
files=arguments.get("files"),
reason=arguments.get("reason"),
impact=arguments.get("impact")
)
logger.info(f"Adding changelog entry", extra={
'project_path': project_path,
'version': validated['version'],
'change_type': validated['change_type'],
'severity': validated['severity'],
'title': validated['title']
})
# Determine changelog location
changelog_dir = Path(project_path) / Paths.CHANGELOG_DIR
changelog_path = changelog_dir / Files.CHANGELOG
# Create directory structure if it doesn't exist
changelog_dir.mkdir(parents=True, exist_ok=True)
# Create initial CHANGELOG.json if it doesn't exist
if not changelog_path.exists():
initial_data = {
"$schema": "./schema.json",
"project": Path(project_path).name,
"changelog_version": "1.0",
"current_version": "0.0.0",
"entries": []
}
with open(changelog_path, 'w', encoding='utf-8') as f:
json.dump(initial_data, f, indent=2)
f.write('\n')
generator = ChangelogGenerator(changelog_path)
# Get optional arguments (validated inputs are in 'validated' dict)
breaking = arguments.get("breaking", False)
migration = arguments.get("migration")
summary = arguments.get("summary")
contributors = arguments.get("contributors")
# Add the change (using validated inputs)
change_id = generator.add_change(
version=validated['version'],
change_type=validated['change_type'],
severity=validated['severity'],
title=validated['title'],
description=validated['description'],
files=validated['files'],
reason=validated['reason'],
impact=validated['impact'],
breaking=breaking,
migration=migration,
summary=summary,
contributors=contributors
)
result = f"ā
Changelog entry added successfully!\n\n"
result += f"Change ID: {change_id}\n"
result += f"Version: {validated['version']}\n"
result += f"Type: {validated['change_type']}\n"
result += f"Title: {validated['title']}\n\n"
result += f"The changelog has been updated. Use get_changelog to view changes."
logger.info(f"Successfully added changelog entry: {change_id}", extra={'version': validated['version'], 'change_id': change_id})
return [TextContent(type="text", text=result)]
@log_invocation
@mcp_error_handler
async def handle_update_changelog(arguments: dict) -> list[TextContent]:
"""
Handle update_changelog tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
# Validate inputs at boundary (REF-003)
project_path = validate_project_path_input(arguments.get("project_path"))
version = validate_version_format(arguments.get("version"))
logger.info(f"Preparing agentic changelog update workflow", extra={'project_path': project_path, 'version': version})
project_name = Path(project_path).name
result = f"š Agentic Changelog Update Workflow\n"
result += f"=" * 60 + "\n\n"
result += f"Project: {project_name}\n"
result += f"Version: {version}\n"
result += f"Location: {project_path}\n\n"
result += f"=" * 60 + "\n\n"
result += f"INSTRUCTIONS FOR AGENT:\n\n"
result += f"You have the context of recent changes you made to this project.\n"
result += f"Use that context to document your work in the changelog.\n\n"
result += f"STEP 1: Analyze Your Changes\n"
result += f"-" * 60 + "\n"
result += f"Review the changes you just made:\n"
result += f"⢠What files did you modify?\n"
result += f"⢠What functionality did you add/fix/change?\n"
result += f"⢠Why did you make these changes?\n"
result += f"⢠What impact does this have on users/system?\n\n"
result += f"STEP 2: Determine Change Details\n"
result += f"-" * 60 + "\n"
result += f"Based on your analysis, determine:\n\n"
result += f"change_type (pick one):\n"
result += f" ⢠bugfix - Fixed a bug or error\n"
result += f" ⢠enhancement - Improved existing functionality\n"
result += f" ⢠feature - Added new functionality\n"
result += f" ⢠breaking_change - Incompatible API changes\n"
result += f" ⢠deprecation - Marked features for removal\n"
result += f" ⢠security - Security patches\n\n"
result += f"severity (pick one):\n"
result += f" ⢠critical - System broken, data loss risk\n"
result += f" ⢠major - Significant feature impact\n"
result += f" ⢠minor - Small improvements\n"
result += f" ⢠patch - Cosmetic, docs-only\n\n"
result += f"STEP 3: Call add_changelog_entry\n"
result += f"-" * 60 + "\n"
result += f"Use the add_changelog_entry tool with:\n\n"
result += f"add_changelog_entry(\n"
result += f" project_path=\"{project_path}\",\n"
result += f" version=\"{version}\",\n"
result += f" change_type=\"...\", # from step 2\n"
result += f" severity=\"...\", # from step 2\n"
result += f" title=\"...\", # short, clear title\n"
result += f" description=\"...\", # what changed\n"
result += f" files=[...], # list of modified files\n"
result += f" reason=\"...\", # why you made this change\n"
result += f" impact=\"...\", # effect on users/system\n"
result += f" breaking=false, # or true if breaking change\n"
result += f" contributors=[\"your_name\"] # optional\n"
result += f")\n\n"
result += f"=" * 60 + "\n\n"
result += f"Execute the above steps using your context and call add_changelog_entry.\n"
logger.info(f"Changelog update workflow instructions generated for version {version}")
return [TextContent(type="text", text=result)]
@log_invocation
@mcp_error_handler
async def handle_generate_quickref_interactive(arguments: dict) -> list[TextContent]:
"""
Handle generate_quickref_interactive tool call (meta-tool).
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
# Validate inputs at boundary (REF-003)
project_path = validate_project_path_input(arguments.get("project_path"))
app_type = arguments.get("app_type")
logger.info(f"Initiating quickref generation workflow", extra={'project_path': project_path, 'app_type': app_type})
# Import QuickrefGenerator
from generators.quickref_generator import QuickrefGenerator
# Initialize generator
generator = QuickrefGenerator()
# Get interview questions and workflow
interview = generator.get_interview_questions(app_type)
# Build response with interview script
result = f"š Universal Quickref Generator - Interactive Workflow\n"
result += f"=" * 60 + "\n\n"
result += f"Project: {Path(project_path).name}\n"
result += f"Output: coderef/quickref.md\n"
if app_type:
result += f"App Type: {app_type.upper()}\n"
result += f"\n" + "=" * 60 + "\n\n"
result += f"INSTRUCTIONS FOR AI:\n\n"
result += f"{interview['instructions_for_ai']}\n\n"
result += f"=" * 60 + "\n\n"
result += f"š INTERVIEW WORKFLOW ({interview['total_steps']} steps):\n\n"
for step in interview['steps']:
step_num = step.get('step')
step_name = step.get('name')
result += f"STEP {step_num}: {step_name}\n"
result += f"-" * 60 + "\n"
if 'questions' in step:
result += f"Ask the user:\n"
for q in step['questions']:
result += f" ⢠{q}\n"
if 'format' in step:
result += f"\nExpected format: {step['format']}\n"
elif 'action' in step:
result += f"{step['action']}\n"
if 'output_location' in step:
result += f"Output: {step['output_location']}\n"
result += f"\n"
result += f"=" * 60 + "\n\n"
result += f"Begin the interview by asking the Step 1 questions.\n"
result += f"After gathering all information, generate quickref.md using the universal pattern.\n"
result += f"Save to: {project_path}/coderef/quickref.md\n"
logger.info(f"Quickref generation workflow initiated")
return [TextContent(type="text", text=result)]
@log_invocation
@mcp_error_handler
async def handle_establish_standards(arguments: dict) -> list[TextContent]:
"""
Handle establish_standards tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
# Validate and extract inputs (REF-003)
project_path = validate_project_path_input(arguments.get("project_path", ""))
scan_depth = validate_scan_depth(
arguments.get("scan_depth", ScanDepth.STANDARD.value)
)
focus_areas = validate_focus_areas(
arguments.get("focus_areas", [FocusArea.ALL.value])
)
logger.info(
"Starting standards establishment",
extra={
'project_path': str(project_path),
'scan_depth': scan_depth,
'focus_areas': focus_areas
}
)
# Create standards directory if needed
project_path_obj = Path(project_path).resolve()
standards_dir = project_path_obj / Paths.STANDARDS_DIR
standards_dir.mkdir(parents=True, exist_ok=True)
logger.debug(f"Standards directory ready: {standards_dir}")
# Initialize StandardsGenerator
generator = StandardsGenerator(project_path_obj, scan_depth)
# Generate and save standards
result_dict = generator.save_standards(standards_dir)
# Format success response
result = f"ā
Standards establishment completed successfully!\n\n"
result += f"Project: {project_path_obj.name}\n"
result += f"Scan Depth: {scan_depth}\n"
result += f"Focus Areas: {', '.join(focus_areas)}\n\n"
result += f"=" * 60 + "\n\n"
result += f"š RESULTS:\n\n"
result += f"Files Created: {len(result_dict['files'])}\n"
result += f"Total Patterns Discovered: {result_dict['patterns_count']}\n"
result += f" ⢠UI Patterns: {result_dict['ui_patterns_count']}\n"
result += f" ⢠Behavior Patterns: {result_dict['behavior_patterns_count']}\n"
result += f" ⢠UX Patterns: {result_dict['ux_patterns_count']}\n"
result += f"Components Indexed: {result_dict['components_count']}\n\n"
result += f"=" * 60 + "\n\n"
result += f"š STANDARDS DOCUMENTS:\n\n"
for file_path in result_dict['files']:
file_name = Path(file_path).name
result += f" ⢠{file_name}\n"
result += f"\nš Location: {standards_dir}\n\n"
result += f"These standards documents can now be used with:\n"
result += f" ⢠Tool #9: audit_codebase - Find violations of standards\n"
result += f" ⢠Tool #10: check_consistency - Quality gate for new code\n"
logger.info(
"Standards establishment completed successfully",
extra={
'files_created': len(result_dict['files']),
'patterns_discovered': result_dict['patterns_count'],
'components': result_dict['components_count']
}
)
return [TextContent(type="text", text=result)]
@log_invocation
@mcp_error_handler
async def handle_audit_codebase(arguments: dict) -> list[TextContent]:
"""
Handle audit_codebase tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
# Validate and extract inputs (REF-003)
project_path = validate_project_path_input(arguments.get("project_path", ""))
standards_dir_arg = arguments.get("standards_dir", Paths.STANDARDS_DIR)
severity_filter = validate_severity_filter(
arguments.get("severity_filter", AuditSeverity.ALL.value)
)
scope = validate_audit_scope(
arguments.get("scope", [AuditScope.ALL.value])
)
generate_fixes = arguments.get("generate_fixes", True)
logger.info(
"Starting codebase audit",
extra={
'project_path': str(project_path),
'standards_dir': standards_dir_arg,
'severity_filter': severity_filter,
'scope': scope,
'generate_fixes': generate_fixes
}
)
# Resolve project path and standards directory
project_path_obj = Path(project_path).resolve()
standards_dir = project_path_obj / standards_dir_arg
# Check if standards documents exist (SEC-001) - raise exception instead of early return
if not standards_dir.exists():
logger.warning(f"Standards directory not found: {standards_dir}")
raise FileNotFoundError(f"Standards directory: {standards_dir}")
# Check if required standards files exist - raise exception instead of early return
required_files = [Files.UI_STANDARDS, Files.BEHAVIOR_STANDARDS, Files.UX_PATTERNS]
missing_files = []
for file_name in required_files:
if not (standards_dir / file_name).exists():
missing_files.append(file_name)
if missing_files:
logger.warning(f"Missing standards files: {missing_files}")
raise FileNotFoundError(f"Standards files: {', '.join(missing_files)}")
logger.debug(f"Standards directory verified: {standards_dir}")
# Initialize AuditGenerator
generator = AuditGenerator(project_path_obj, standards_dir)
logger.debug("AuditGenerator initialized")
# Start timing
start_time = time.time()
# Parse standards documents
logger.info("Parsing standards documents")
standards = generator.parse_standards_documents(standards_dir)
if standards.get('parse_errors'):
logger.warning(f"Standards parsing had errors: {standards['parse_errors']}")
# Scan for violations
logger.info("Scanning codebase for violations")
violations = generator.scan_for_violations(standards)
# Extract files_scanned metadata
files_scanned = 0
if violations:
# Check if first violation has metadata
if '_files_scanned' in violations[0]:
files_scanned = violations[0].pop('_files_scanned')
# Remove metadata-only entries
violations = [v for v in violations if not v.get('_is_metadata', False)]
# Filter violations by severity if not 'all'
if severity_filter != AuditSeverity.ALL.value:
violations = [v for v in violations if v.get('severity') == severity_filter]
logger.debug(f"Filtered to {len(violations)} violations with severity={severity_filter}")
# Calculate compliance score
logger.info("Calculating compliance score")
total_patterns = (
len(standards.get('ui_patterns', {}).get('buttons', {}).get('allowed_sizes', [])) +
len(standards.get('ui_patterns', {}).get('buttons', {}).get('allowed_variants', [])) +
len(standards.get('ui_patterns', {}).get('colors', {}).get('allowed_hex_codes', [])) +
len(standards.get('behavior_patterns', {}).get('error_handling', {}).get('expected_patterns', []))
)
compliance = generator.calculate_compliance_score(violations, total_patterns)
# End timing
end_time = time.time()
duration = end_time - start_time
# Build scan metadata
scan_metadata = {
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
'duration': duration,
'files_scanned': files_scanned,
'standards_files': standards.get('source_files', []),
'parse_errors': standards.get('parse_errors', [])
}
# Generate audit report
logger.info("Generating audit report")
report_content = generator.generate_audit_report(violations, compliance, scan_metadata)
# Save audit report
audits_dir = project_path_obj / Paths.AUDITS_DIR
result_dict = generator.save_audit_report(
report_content, audits_dir, violations, compliance, scan_metadata
)
# Format success response
result = f"ā
Codebase audit completed successfully!\n\n"
result += f"Project: {project_path_obj.name}\n"
result += f"Standards: {standards_dir_arg}\n"
result += f"Severity Filter: {severity_filter}\n"
result += f"Scope: {', '.join(scope)}\n\n"
result += f"=" * 60 + "\n\n"
result += f"š AUDIT RESULTS:\n\n"
result += f"Compliance Score: {compliance['overall_score']}/100 ({compliance['grade']})\n"
result += f"Status: {'ā
PASSING' if compliance['passing'] else 'ā FAILING'}\n\n"
result += f"Violations Found: {result_dict['violation_stats']['total_violations']}\n"
result += f" ⢠Critical: {result_dict['violation_stats']['critical_count']}\n"
result += f" ⢠Major: {result_dict['violation_stats']['major_count']}\n"
result += f" ⢠Minor: {result_dict['violation_stats']['minor_count']}\n\n"
result += f"Scan Duration: {duration:.2f} seconds\n"
result += f"Files Scanned: {scan_metadata['files_scanned']}\n\n"
result += f"=" * 60 + "\n\n"
result += f"š AUDIT REPORT:\n\n"
result += f" ⢠{result_dict['report_path']}\n\n"
if standards.get('parse_errors'):
result += f"ā ļø WARNINGS:\n"
for error in standards['parse_errors']:
result += f" ⢠{error}\n"
result += "\n"
result += f"Next steps:\n"
result += f" 1. Review the audit report at {result_dict['report_path']}\n"
result += f" 2. Address critical and major violations first\n"
if generate_fixes:
result += f" 3. Use the fix suggestions in the report\n"
result += f" 4. Re-run audit_codebase to verify fixes\n"
logger.info(
"Codebase audit completed successfully",
extra={
'compliance_score': compliance['overall_score'],
'violations_found': len(violations),
'report_path': result_dict['report_path']
}
)
return [TextContent(type="text", text=result)]
@log_invocation
@mcp_error_handler
async def handle_check_consistency(arguments: dict) -> list[TextContent]:
"""
Handle check_consistency tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
# Validate and extract inputs (REF-003)
project_path = validate_project_path_input(arguments.get("project_path", ""))
files = arguments.get("files") # Optional - will auto-detect if not provided
standards_dir_arg = arguments.get("standards_dir", Paths.STANDARDS_DIR)
severity_threshold = arguments.get("severity_threshold", "major") # critical, major, or minor
scope = validate_audit_scope(arguments.get("scope", [AuditScope.ALL.value]))
fail_on_violations = arguments.get("fail_on_violations", True)
# Validate optional parameters
from validation import validate_severity_threshold, validate_file_list
severity_threshold = validate_severity_threshold(severity_threshold)
if files is not None:
files = validate_file_list(files)
logger.info(
"Starting consistency check",
extra={
'project_path': str(project_path),
'files_provided': files is not None,
'severity_threshold': severity_threshold,
'scope': scope
}
)
# Resolve paths
project_path_obj = Path(project_path).resolve()
standards_dir = project_path_obj / standards_dir_arg
# Check if standards exist - raise exception instead of early return
if not standards_dir.exists():
logger.warning(f"Standards directory not found: {standards_dir}")
raise FileNotFoundError(f"Standards directory: {standards_dir}")
# Initialize ConsistencyChecker
from generators.consistency_checker import ConsistencyChecker
checker = ConsistencyChecker(project_path_obj, standards_dir)
logger.debug("ConsistencyChecker initialized")
# Detect or use provided files
if files is None:
# Auto-detect from git (confirm this is a git repository before reading staged changes)
if not checker.is_git_repository():
logger.error("Not a git repository and no files specified")
raise ValueError("Not a git repository and no files specified")
logger.info("Auto-detecting changed files from git")
files_to_check = checker.detect_changed_files(mode='staged')
# Check if no files to check (this is NOT an error, just "no work to do")
if not files_to_check:
logger.info("No files to check (no staged changes)")
result = "[PASS] Consistency check PASSED\n\n"
result += "0 violations found\n"
result += "Files checked: 0\n"
result += "Duration: 0.00s\n\n"
result += "No files to check (no staged changes)."
return [TextContent(type="text", text=result)]
else:
# Use provided files (convert to Path objects)
files_to_check = [Path(f) for f in files]
logger.info(f"Checking {len(files_to_check)} files", extra={'files_count': len(files_to_check)})
# Start timing
start_time = time.time()
# Parse standards
logger.info("Parsing standards documents")
standards = checker.audit_generator.parse_standards_documents(standards_dir)
# Check files for violations
logger.info("Checking files for violations")
violations = checker.check_files(files_to_check, standards, scope)
# Filter by severity threshold
logger.info(f"Filtering violations by severity threshold: {severity_threshold}")
violations = checker.filter_by_severity_threshold(violations, severity_threshold)
# Calculate duration
duration = time.time() - start_time
# Determine status and exit code
status = 'pass' if len(violations) == 0 else 'fail'
exit_code = 1 if (fail_on_violations and status == 'fail') else 0
# Generate terminal-friendly summary
summary = checker.generate_check_summary(violations, len(files_to_check), duration)
# Log completion
logger.info(
"Consistency check completed",
extra={
'status': status,
'violations_found': len(violations),
'files_checked': len(files_to_check),
'duration': duration,
'exit_code': exit_code
}
)
return [TextContent(type="text", text=summary)]
@log_invocation
@mcp_error_handler
async def handle_get_planning_template(arguments: dict[str, Any]) -> list[TextContent]:
"""
Handle get_planning_template tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
# Extract and validate section parameter
section = arguments.get('section', 'all')
section = validate_section_name(section)
logger.info(f"Retrieving template section: {section}")
# Read template file
template_path = Path(__file__).parent / PlanningPaths.TEMPLATE_PATH
# Check if template file exists - raise exception instead of early return
if not template_path.exists():
logger.warning(f"Template file not found: {template_path}")
raise FileNotFoundError(f"Template file: {PlanningPaths.TEMPLATE_PATH}")
# Parse template JSON (let decorator catch JSONDecodeError)
with open(template_path, 'r', encoding='utf-8') as f:
template_data = json.load(f)
# Extract requested section
if section == 'all':
content = template_data
elif section in template_data:
# Top-level key (META_DOCUMENTATION, QUALITY_CHECKLIST_FOR_PLANS, etc.)
content = template_data[section]
elif 'UNIVERSAL_PLANNING_STRUCTURE' in template_data and section in template_data['UNIVERSAL_PLANNING_STRUCTURE']:
# Section within UNIVERSAL_PLANNING_STRUCTURE (0_preparation, 1_executive_summary, etc.)
content = template_data['UNIVERSAL_PLANNING_STRUCTURE'][section]
else:
# Section not found - raise exception instead of early return
logger.warning(f"Section '{section}' not found in template")
raise FileNotFoundError(f"Section '{section}' in template")
# Format result
result: PlanningTemplateDict = {
'section': section,
'content': content
}
logger.info(
f"Template section '{section}' retrieved successfully",
extra={'section': section, 'content_size': len(json.dumps(content))}
)
return [TextContent(type='text', text=json.dumps(result, indent=2))]
@log_invocation
@mcp_error_handler
async def handle_analyze_project_for_planning(arguments: dict) -> list[TextContent]:
"""
Handle analyze_project_for_planning tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
# Validate project_path
project_path = validate_project_path_input(arguments.get('project_path', ''))
project_path_obj = Path(project_path).resolve()
# Get optional feature_name for feature-specific saving
feature_name = arguments.get('feature_name')
if feature_name:
feature_name = validate_feature_name_input(feature_name)
logger.info(f"Analyzing project for planning: {project_path_obj}", extra={'feature_name': feature_name})
# Initialize PlanningAnalyzer
analyzer = PlanningAnalyzer(project_path_obj)
# Run analysis
result = analyzer.analyze()
# Feature-specific file persistence (optional)
if feature_name:
try:
# Create feature working directory
feature_dir = project_path_obj / 'coderef' / 'working' / feature_name
feature_dir.mkdir(parents=True, exist_ok=True)
# Save to analysis.json (no timestamp)
analysis_file = feature_dir / 'analysis.json'
with open(analysis_file, 'w', encoding='utf-8') as f:
json.dump(result, f, indent=2)
# Add metadata to response
result['_metadata'] = {
'saved_to': str(analysis_file.relative_to(project_path_obj)),
'feature_name': feature_name,
'generated_at': datetime.now().isoformat()
}
logger.info(
"Project analysis completed and saved successfully",
extra={
'project_path': str(project_path_obj),
'feature_name': feature_name,
'saved_to': str(analysis_file),
'foundation_docs_available': len(result['foundation_docs'].get('available', [])),
'standards_available': len(result['coding_standards'].get('available', [])),
'patterns_identified': len(result['key_patterns_identified']),
'gaps_found': len(result['gaps_and_risks'])
}
)
except (PermissionError, OSError) as e:
# Graceful degradation - log warning but still return data
logger.warning(
f"Analysis completed but failed to save to file: {str(e)}",
extra={
'project_path': str(project_path_obj),
'feature_name': feature_name,
'error_type': type(e).__name__,
'error_message': str(e)
}
)
# Add metadata indicating save failure
result['_metadata'] = {
'saved_to': None,
'feature_name': feature_name,
'save_error': str(e),
'generated_at': datetime.now().isoformat()
}
logger.info(
"Project analysis completed successfully (file save failed)",
extra={
'project_path': str(project_path_obj),
'feature_name': feature_name,
'foundation_docs_available': len(result['foundation_docs'].get('available', [])),
'standards_available': len(result['coding_standards'].get('available', [])),
'patterns_identified': len(result['key_patterns_identified']),
'gaps_found': len(result['gaps_and_risks'])
}
)
else:
# No feature_name provided - return analysis without saving
logger.info(
"Project analysis completed (not saved - no feature_name provided)",
extra={
'project_path': str(project_path_obj),
'foundation_docs_available': len(result['foundation_docs'].get('available', [])),
'standards_available': len(result['coding_standards'].get('available', [])),
'patterns_identified': len(result['key_patterns_identified']),
'gaps_found': len(result['gaps_and_risks'])
}
)
# Return JSON-formatted result
return [TextContent(type='text', text=json.dumps(result, indent=2))]
@log_invocation
@mcp_error_handler
async def handle_validate_implementation_plan(arguments: dict) -> list[TextContent]:
"""
Handle validate_implementation_plan tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
# Validate inputs
project_path_str = arguments.get('project_path', '')
plan_file_str = arguments.get('plan_file_path', '')
project_path = Path(validate_project_path_input(project_path_str)).resolve()
plan_path = validate_plan_file_path(project_path, plan_file_str)
logger.info(f'Validating plan: {plan_path}')
# Check plan file exists - raise exception instead of early return
if not plan_path.exists():
logger.warning(f'Plan file not found: {plan_file_str}')
raise FileNotFoundError(f'Plan file not found: {plan_file_str}')
# Create validator and validate (let decorator catch JSONDecodeError)
validator = PlanValidator(plan_path)
result = validator.validate()
logger.info(
f'Validation complete: score={result["score"]}, result={result["validation_result"]}, issues={len(result["issues"])}',
extra={'score': result['score'], 'result': result['validation_result']}
)
# Return result as JSON
return [TextContent(type='text', text=json.dumps(result, indent=2))]
@log_invocation
@mcp_error_handler
async def handle_generate_plan_review_report(arguments: dict) -> list[TextContent]:
"""
Handle generate_plan_review_report tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
# Validate inputs
project_path_str = arguments.get('project_path', '')
plan_file_str = arguments.get('plan_file_path', '')
output_path_arg = arguments.get('output_path')
project_path = Path(validate_project_path_input(project_path_str)).resolve()
plan_path = validate_plan_file_path(project_path, plan_file_str)
logger.info(f'Generating review report for plan: {plan_path}')
# Check plan file exists - raise exception instead of early return
if not plan_path.exists():
logger.warning(f'Plan file not found: {plan_file_str}')
raise FileNotFoundError(f'Plan file not found: {plan_file_str}')
# Run validation first to get validation results (let decorator catch JSONDecodeError)
validator = PlanValidator(plan_path)
validation_result = validator.validate()
logger.debug(
f'Validation completed: score={validation_result["score"]}, issues={len(validation_result["issues"])}'
)
# Extract plan name from file path
plan_name = plan_path.stem # e.g., "feature-auth-plan" from "feature-auth-plan.json"
# Create ReviewFormatter with validation results
formatter = ReviewFormatter(validation_result, plan_name)
# Generate markdown report
report_content = formatter.format_report()
# Determine output path
if output_path_arg:
output_path = project_path / output_path_arg
else:
# Default: coderef/reviews/review-{planname}-{timestamp}.md
timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
reviews_dir = project_path / 'coderef' / 'reviews'
reviews_dir.mkdir(parents=True, exist_ok=True)
output_path = reviews_dir / f'review-{plan_name}-{timestamp}.md'
# Save report to file (let decorator catch PermissionError and OSError)
logger.debug(f'Saving report to: {output_path}')
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(report_content)
logger.info(
f'Review report generated successfully: {output_path}',
extra={
'plan_name': plan_name,
'score': validation_result['score'],
'approved': validation_result['approved'],
'output_path': str(output_path)
}
)
# Format success response
result = f"ā
Review report generated successfully!\n\n"
result += f"Plan: {plan_name}\n"
result += f"Score: {validation_result['score']}/100\n"
result += f"Result: {validation_result['validation_result']}\n"
result += f"Approved: {'Yes ā
' if validation_result['approved'] else 'No ā'}\n\n"
result += f"=" * 60 + "\n\n"
result += f"š REVIEW REPORT:\n\n"
result += f" ⢠{output_path.relative_to(project_path)}\n\n"
result += f"The review report has been saved to:\n"
result += f" {output_path}\n"
return [TextContent(type='text', text=result)]
@log_invocation
@mcp_error_handler
async def handle_create_plan(arguments: dict) -> list[TextContent]:
"""
Handle create_plan tool call (meta-tool).
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
# Validate inputs
project_path_str = arguments.get('project_path', '')
feature_name = arguments.get('feature_name', '')
project_path = Path(validate_project_path_input(project_path_str)).resolve()
feature_name = validate_feature_name_input(feature_name)
logger.info(f'Preparing plan synthesis for feature: {feature_name}')
# Initialize PlanningGenerator (let decorator catch FileNotFoundError)
generator = PlanningGenerator(project_path)
# Load inputs
context = generator.load_context(feature_name)
analysis = generator.load_analysis() # Will be None for now
template = generator.load_template()
# Build meta-tool response with synthesis instructions
result = f"š Implementation Plan Synthesis Workflow\n"
result += f"=" * 60 + "\n\n"
result += f"Feature: {feature_name}\n"
result += f"Project: {project_path.name}\n"
result += f"Output: coderef/working/{feature_name}/plan.json\n\n"
result += f"=" * 60 + "\n\n"
result += f"INSTRUCTIONS FOR AI:\n\n"
result += f"You have context, analysis, and template data below.\n"
result += f"Synthesize these into a complete 10-section implementation plan.\n\n"
# Context section
if context:
result += f"š¦ CONTEXT (from /gather-context):\n"
result += f"-" * 60 + "\n"
result += json.dumps(context, indent=2) + "\n\n"
else:
result += f"ā ļø NO CONTEXT - Generate plan without requirements\n\n"
# Analysis section (if available in future)
if analysis:
result += f"š ANALYSIS (from /analyze-for-planning):\n"
result += f"-" * 60 + "\n"
result += json.dumps(analysis, indent=2) + "\n\n"
else:
result += f"ā ļø NO ANALYSIS - Use generic preparation section\n\n"
# Template structure
result += f"š TEMPLATE STRUCTURE:\n"
result += f"-" * 60 + "\n"
result += f"Generate these 10 sections:\n"
result += f"0. Preparation - Use analysis data or generic structure\n"
result += f"1. Executive Summary - Use context description and goal\n"
result += f"2. Risk Assessment - Estimate complexity, identify risks\n"
result += f"3. Current State Analysis - List files to create/modify\n"
result += f"4. Key Features - Extract from context requirements\n"
result += f"5. Task ID System - Break features into specific tasks\n"
result += f"6. Implementation Phases - Group tasks into 3-4 phases\n"
result += f"7. Testing Strategy - Define unit/integration tests\n"
result += f"8. Success Criteria - Measurable acceptance criteria\n"
result += f"9. Implementation Checklist - Phase checklists\n\n"
result += f"=" * 60 + "\n\n"
result += f"ACTION REQUIRED:\n\n"
result += f"Generate the complete plan JSON following this structure:\n"
result += f"{{\n"
result += f' "META_DOCUMENTATION": {{\n'
result += f' "feature_name": "{feature_name}",\n'
result += f' "version": "1.0.0",\n'
result += f' "status": "complete",\n'
result += f' "generated_by": "AI Assistant",\n'
result += f' "has_context": {str(context is not None).lower()},\n'
result += f' "has_analysis": {str(analysis is not None).lower()}\n'
result += f' }},\n'
result += f' "UNIVERSAL_PLANNING_STRUCTURE": {{\n'
result += f' "0_preparation": {{ ... }},\n'
result += f' "1_executive_summary": {{ ... }},\n'
result += f' ... (all 10 sections)\n'
result += f' }}\n'
result += f'}}\n\n'
result += f"Save the plan to:\n"
result += f" {project_path}/coderef/working/{feature_name}/plan.json\n"
logger.info(f'Plan synthesis instructions prepared for: {feature_name}')
return [TextContent(type='text', text=result)]
@log_invocation
@mcp_error_handler
async def handle_inventory_manifest(arguments: dict) -> list[TextContent]:
"""
Handle inventory_manifest tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
from generators.inventory_generator import InventoryGenerator
from constants import AnalysisDepth
# Validate inputs
project_path = validate_project_path_input(arguments.get("project_path", ""))
# Get optional parameters
analysis_depth = arguments.get("analysis_depth", "standard")
exclude_dirs = arguments.get("exclude_dirs")
max_file_size = arguments.get("max_file_size")
# Validate analysis_depth (raises ValueError if invalid - caught by decorator)
valid_depths = [d.value for d in AnalysisDepth]
if analysis_depth not in valid_depths:
raise ValueError(f"Invalid analysis_depth. Must be one of: {valid_depths}")
logger.info(f"Generating inventory manifest for: {project_path} (depth={analysis_depth})")
# Initialize inventory generator
generator = InventoryGenerator(Path(project_path))
# Generate manifest
manifest = generator.generate_manifest(
analysis_depth=analysis_depth,
exclude_dirs=exclude_dirs,
max_file_size=max_file_size
)
# Save manifest
manifest_path = generator.save_manifest(manifest)
# Build summary response
metrics = manifest['metrics']
result = {
"manifest_path": str(manifest_path.relative_to(Path(project_path))),
"files_analyzed": metrics['total_files'],
"project_name": manifest['project_name'],
"analysis_depth": analysis_depth,
"metrics": {
"total_files": metrics['total_files'],
"total_size": metrics['total_size'],
"total_lines": metrics['total_lines'],
"file_categories": metrics['file_categories'],
"risk_distribution": metrics['risk_distribution'],
"language_breakdown": metrics.get('language_breakdown', {})
},
"success": True
}
logger.info(f"Inventory manifest generated successfully: {result['files_analyzed']} files")
return [TextContent(type="text", text=json.dumps(result, indent=2))]
@log_invocation
@mcp_error_handler
async def handle_dependency_inventory(arguments: dict) -> list[TextContent]:
"""
Handle dependency_inventory tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
from generators.dependency_generator import DependencyGenerator
# Validate inputs
project_path = validate_project_path_input(arguments.get("project_path", ""))
# Get optional parameters
scan_security = arguments.get("scan_security", True)
ecosystems = arguments.get("ecosystems", ['all'])
include_transitive = arguments.get("include_transitive", False)
logger.info(
f"Generating dependency inventory for: {project_path}",
extra={
'scan_security': scan_security,
'ecosystems': ecosystems,
'include_transitive': include_transitive
}
)
# Initialize dependency generator
generator = DependencyGenerator(Path(project_path))
# Generate manifest
manifest = generator.generate_manifest(
scan_security=scan_security,
ecosystems=ecosystems,
include_transitive=include_transitive
)
# Save manifest
manifest_path = generator.save_manifest(manifest)
# Build summary response
metrics = manifest['metrics']
result = {
"manifest_path": str(manifest_path.relative_to(Path(project_path))),
"package_managers": manifest['package_managers'],
"total_dependencies": metrics['total_dependencies'],
"vulnerable_count": metrics['vulnerable_count'],
"outdated_count": metrics['outdated_count'],
"metrics": {
"total_dependencies": metrics['total_dependencies'],
"direct_count": metrics['direct_count'],
"dev_count": metrics['dev_count'],
"peer_count": metrics.get('peer_count', 0),
"transitive_count": metrics.get('transitive_count', 0),
"outdated_count": metrics['outdated_count'],
"vulnerable_count": metrics['vulnerable_count'],
"critical_vulnerabilities": metrics['critical_vulnerabilities'],
"high_vulnerabilities": metrics['high_vulnerabilities'],
"medium_vulnerabilities": metrics['medium_vulnerabilities'],
"low_vulnerabilities": metrics['low_vulnerabilities'],
"license_breakdown": metrics.get('license_breakdown', {}),
"ecosystem_breakdown": metrics['ecosystem_breakdown']
},
"success": True
}
logger.info(
f"Dependency inventory generated successfully: {result['total_dependencies']} dependencies, {result['vulnerable_count']} vulnerable"
)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
@log_invocation
@mcp_error_handler
async def handle_api_inventory(arguments: dict) -> list[TextContent]:
"""
Handle api_inventory tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
from generators.api_generator import ApiGenerator
# Validate inputs
project_path = validate_project_path_input(arguments.get("project_path", ""))
# Get optional parameters
frameworks = arguments.get("frameworks", ['all'])
include_graphql = arguments.get("include_graphql", False)
scan_documentation = arguments.get("scan_documentation", True)
logger.info(
f"Generating API inventory for: {project_path}",
extra={
'frameworks': frameworks,
'include_graphql': include_graphql,
'scan_documentation': scan_documentation
}
)
# Initialize API generator
generator = ApiGenerator(Path(project_path))
# Generate manifest
manifest = generator.generate_manifest(
frameworks=frameworks,
include_graphql=include_graphql,
scan_documentation=scan_documentation
)
# Save manifest
manifest_path = generator.save(manifest)
# Build summary response
metrics = manifest['metrics']
result = {
"manifest_path": str(manifest_path.relative_to(Path(project_path))),
"frameworks": manifest['frameworks'],
"total_endpoints": metrics['total_endpoints'],
"documented_endpoints": metrics['documented_endpoints'],
"documentation_coverage": metrics['documentation_coverage'],
"metrics": {
"total_endpoints": metrics['total_endpoints'],
"documented_endpoints": metrics['documented_endpoints'],
"documentation_coverage": metrics['documentation_coverage'],
"frameworks_detected": metrics['frameworks_detected'],
"framework_breakdown": metrics.get('framework_breakdown', {}),
"method_breakdown": metrics.get('method_breakdown', {}),
"rest_endpoints": metrics['rest_endpoints'],
"graphql_endpoints": metrics['graphql_endpoints']
},
"success": True
}
logger.info(
f"API inventory generated successfully: {result['total_endpoints']} endpoints, {result['documentation_coverage']}% documented"
)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
@log_invocation
@mcp_error_handler
async def handle_test_inventory(arguments: dict) -> list[TextContent]:
"""
Handle test_inventory tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
from generators.test_generator import TestGenerator
# Validate inputs
project_path = validate_project_path_input(arguments.get("project_path", ""))
# Get optional parameters
frameworks = arguments.get("frameworks", ['all'])
include_coverage = arguments.get("include_coverage", True)
logger.info(
f"Generating test inventory for: {project_path}",
extra={
'frameworks': frameworks,
'include_coverage': include_coverage
}
)
# Initialize test generator
generator = TestGenerator(Path(project_path))
# Generate manifest
manifest = generator.generate_manifest()
# Save manifest
manifest_path = generator.save_manifest(manifest)
# Build summary response
metrics = manifest['metrics']
result = {
"manifest_path": str(manifest_path.relative_to(Path(project_path))),
"frameworks_detected": manifest['frameworks'],
"total_test_files": metrics['total_test_files'],
"untested_files_count": metrics['untested_files_count'],
"coverage_available": metrics['coverage_available'],
"metrics": {
"total_test_files": metrics['total_test_files'],
"frameworks_detected": metrics['frameworks_detected'],
"untested_files_count": metrics['untested_files_count'],
"coverage_available": metrics['coverage_available'],
"overall_coverage": metrics.get('overall_coverage')
},
"success": True
}
logger.info(
f"Test inventory generated successfully: {result['total_test_files']} test files, {result['untested_files_count']} untested files"
)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
@log_invocation
@mcp_error_handler
async def handle_database_inventory(arguments: dict) -> list[TextContent]:
"""
Handle database_inventory tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
from generators.database_generator import DatabaseGenerator
# Validate inputs
project_path = validate_project_path_input(arguments.get("project_path", ""))
# Get optional parameters
database_systems = arguments.get("database_systems", ['all'])
include_migrations = arguments.get("include_migrations", True)
logger.info(
f"Generating database inventory for: {project_path}",
extra={
'database_systems': database_systems,
'include_migrations': include_migrations
}
)
# Initialize database generator
generator = DatabaseGenerator(Path(project_path))
# Generate manifest
manifest = generator.generate_manifest(
database_systems=database_systems,
include_migrations=include_migrations
)
# Save manifest
manifest_path = generator.save(manifest)
# Build summary response
metrics = manifest['metrics']
result = {
"manifest_path": str(manifest_path.relative_to(Path(project_path))),
"database_systems": manifest['database_systems'],
"total_schemas": metrics['total_schemas'],
"sql_tables": metrics['sql_tables'],
"nosql_collections": metrics['nosql_collections'],
"metrics": {
"total_schemas": metrics['total_schemas'],
"sql_tables": metrics['sql_tables'],
"nosql_collections": metrics['nosql_collections'],
"database_systems_detected": metrics['database_systems_detected'],
"system_breakdown": metrics.get('system_breakdown', {}),
"orm_breakdown": metrics.get('orm_breakdown', {}),
"total_columns": metrics['total_columns'],
"total_relationships": metrics['total_relationships']
},
"success": True
}
logger.info(
f"Database inventory generated successfully: {result['total_schemas']} schemas ({result['sql_tables']} SQL tables, {result['nosql_collections']} NoSQL collections)"
)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
@log_invocation
@mcp_error_handler
async def handle_config_inventory(arguments: dict) -> list[TextContent]:
"""
Handle config_inventory tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
from generators.config_generator import ConfigGenerator
# Validate inputs
project_path = validate_project_path_input(arguments.get("project_path", ""))
# Get optional parameters
formats = arguments.get("formats", ['all'])
mask_sensitive = arguments.get("mask_sensitive", True)
logger.info(
f"Generating configuration inventory for: {project_path}",
extra={
'formats': formats,
'mask_sensitive': mask_sensitive
}
)
# Initialize config generator
generator = ConfigGenerator(Path(project_path))
# Generate manifest
manifest = generator.generate_manifest(
formats=formats,
mask_sensitive=mask_sensitive
)
# Save manifest
manifest_path = generator.save_manifest(manifest)
# Build summary response
metrics = manifest['metrics']
result = {
"manifest_path": str(manifest_path.relative_to(Path(project_path))),
"formats_detected": manifest['formats'],
"total_files": metrics['total_files'],
"sensitive_files": metrics['sensitive_files'],
"success": True
}
logger.info(
f"Configuration inventory generated successfully: {result['total_files']} files, {result['sensitive_files']} with sensitive data"
)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
@log_invocation
@mcp_error_handler
async def handle_documentation_inventory(arguments: dict) -> list[TextContent]:
"""
Handle documentation_inventory tool call.
Uses @log_invocation and @mcp_error_handler decorators for automatic
logging and error handling (ARCH-004, ARCH-005).
"""
from generators.documentation_generator import DocumentationGenerator
# Validate inputs
project_path = validate_project_path_input(arguments.get("project_path", ""))
logger.info(f"Generating documentation inventory for: {project_path}")
# Initialize documentation generator
generator = DocumentationGenerator(Path(project_path))
# Generate manifest
manifest = generator.generate_manifest()
# Save manifest
manifest_path = generator.save_manifest(manifest)
# Build summary response
metrics = manifest['metrics']
result = {
"manifest_path": str(manifest_path.relative_to(Path(project_path))),
"formats_detected": manifest['formats'],
"total_files": metrics['total_files'],
"markdown_files": metrics['markdown_files'],
"rst_files": metrics['rst_files'],
"asciidoc_files": metrics['asciidoc_files'],
"html_files": metrics['html_files'],
"orgmode_files": metrics['orgmode_files'],
"quality_score": metrics['quality_score'],
"freshness_days": metrics['freshness_days'],
"coverage_percentage": metrics['coverage_percentage'],
"success": True
}
logger.info(
f"Documentation inventory generated successfully: {result['total_files']} files, quality score {result['quality_score']}/100"
)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# Tool handlers registry (QUA-002)
TOOL_HANDLERS = {
'list_templates': handle_list_templates,
'get_template': handle_get_template,
'generate_foundation_docs': handle_generate_foundation_docs,
'generate_individual_doc': handle_generate_individual_doc,
'get_changelog': handle_get_changelog,
'add_changelog_entry': handle_add_changelog_entry,
'update_changelog': handle_update_changelog,
'generate_quickref_interactive': handle_generate_quickref_interactive,
'establish_standards': handle_establish_standards,
'audit_codebase': handle_audit_codebase,
'check_consistency': handle_check_consistency,
'get_planning_template': handle_get_planning_template,
'analyze_project_for_planning': handle_analyze_project_for_planning,
'validate_implementation_plan': handle_validate_implementation_plan,
'generate_plan_review_report': handle_generate_plan_review_report,
'create_plan': handle_create_plan,
'inventory_manifest': handle_inventory_manifest,
'dependency_inventory': handle_dependency_inventory,
'api_inventory': handle_api_inventory,
'database_inventory': handle_database_inventory,
'config_inventory': handle_config_inventory,
'test_inventory': handle_test_inventory,
'documentation_inventory': handle_documentation_inventory,
}
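# Dispatch sketch (illustrative; assumes server.py routes MCP call_tool requests by
# tool name -- the surrounding endpoint code is hypothetical, only TOOL_HANDLERS and
# the handler signatures come from this module):
#
#   handler = TOOL_HANDLERS.get(tool_name)
#   if handler is None:
#       raise ValueError(f"Unknown tool: {tool_name}")
#   return await handler(arguments)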
def set_templates_dir(templates_dir: Path) -> None:
"""Set the TEMPLATES_DIR global for handlers to use."""
global TEMPLATES_DIR
TEMPLATES_DIR = templates_dir
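# Test sketch (illustrative, assumes pytest with pytest-asyncio is installed; not
# executed as part of this module): because each handler is a standalone async
# function, it can be exercised without a running MCP server:
#
#   @pytest.mark.asyncio
#   async def test_list_templates(tmp_path):
#       set_templates_dir(tmp_path)                 # inject a temporary templates dir
#       (tmp_path / "power.txt").write_text("...")  # minimal fake template file
#       [reply] = await handle_list_templates({})
#       assert "power" in reply.text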