"""Skill lifecycle management - discovery, loading, execution."""
import logging
from datetime import UTC, datetime
from pathlib import Path
import yaml
from pydantic import ValidationError
from mcp_skills.models.skill import (
Skill,
SkillMetadata,
SkillMetadataModel,
SkillModel,
)
from mcp_skills.services.validators import (
SkillSecurityValidator,
SkillValidator,
ThreatLevel,
TrustLevel,
)
logger = logging.getLogger(__name__)
class SkillManager:
"""Orchestrate skill lifecycle - discovery, loading, execution.
Manages the complete lifecycle of skills from discovery through
loading and caching. Integrates with indexing for search.
Design Decision: In-Memory Caching Strategy
    Rationale: Use a simple dict cache for loaded skills to avoid repeated
    file I/O and YAML parsing. Skills are immutable once loaded, making
    caching safe and effective.
    Trade-offs:
    - Performance: O(1) cache lookups vs. file I/O plus YAML parsing on every access
    - Memory: ~10 KB per skill, so 100 skills is roughly 1 MB (negligible)
    - Freshness: cache must be cleared when repos are updated
Future Optimization: When skill count exceeds 1000, consider:
- LRU cache with size limit (functools.lru_cache)
- Disk-based cache with mtime checking
- SQLite integration (Phase 1 Task 7) for indexed access
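    Example (illustrative; skill IDs and result counts depend on the
    repositories cloned under repos_dir):
        >>> manager = SkillManager()
        >>> skills = manager.discover_skills()
        >>> results = manager.search_skills("testing")
        >>> skill = manager.load_skill(results[0].id)
        >>> manager.clear_cache()  # after updating repositories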
"""
# Predefined skill categories (maintained for backward compatibility)
VALID_CATEGORIES = SkillValidator.VALID_CATEGORIES
def __init__(
self, repos_dir: Path | None = None, enable_security: bool = True
) -> None:
"""Initialize skill manager.
Args:
repos_dir: Directory containing skill repositories.
Defaults to ~/.mcp-skillset/repos/
enable_security: Enable security validation for skills (default: True)
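        Example (illustrative; the custom path shown is hypothetical):
            >>> manager = SkillManager()  # defaults, security enabled
            >>> custom = SkillManager(
            ...     repos_dir=Path("/opt/skills/repos"), enable_security=False
            ... )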
"""
self.repos_dir = repos_dir or Path.home() / ".mcp-skillset" / "repos"
self._skill_cache: dict[str, Skill] = {}
self._skill_paths: dict[str, Path] = {} # Map skill_id -> file_path
self.validator = SkillValidator()
self.enable_security = enable_security
# Trusted repositories (minimal security filtering)
self._trusted_repos = {
"anthropics", # anthropics/skills
"bobmatnyc", # bobmatnyc/claude-mpm-skills
"anthropics-skills", # Legacy support
"claude-mpm-skills", # Legacy support
}
# Verified community repositories (moderate filtering)
self._verified_repos: set[str] = set() # Can be extended via configuration
def discover_skills(self, repos_dir: Path | None = None) -> list[Skill]:
"""Scan repositories for skills.
Searches for SKILL.md files in repository directories and
parses their metadata.
Args:
repos_dir: Directory to scan (defaults to self.repos_dir)
Returns:
List of discovered Skill objects
Performance:
- Time Complexity: O(n) where n = total files in all repos
- Space Complexity: O(m) where m = number of skills found
Error Handling:
- Invalid YAML: Log error and skip skill
- Missing required fields: Log error and skip skill
- File read errors: Log error and skip skill
Example:
>>> manager = SkillManager()
>>> skills = manager.discover_skills()
>>> len(skills)
42
>>> skills[0].name
'pytest-testing'
"""
search_dir = repos_dir or self.repos_dir
if not search_dir.exists():
logger.warning(f"Repository directory does not exist: {search_dir}")
return []
discovered_skills: list[Skill] = []
# Walk directory tree and find all SKILL.md files (case-insensitive)
for skill_file in search_dir.rglob("*"):
if skill_file.name.upper() == "SKILL.MD" and skill_file.is_file():
try:
# Extract repo_id from path structure
# Path structure: {repos_dir}/{repo_id}/{skill_path}/SKILL.md
relative_path = skill_file.relative_to(search_dir)
repo_id = (
relative_path.parts[0] if relative_path.parts else "unknown"
)
# Parse skill file
skill = self._parse_skill_file(skill_file, repo_id)
if skill:
discovered_skills.append(skill)
# Cache the skill path for later lookups
self._skill_paths[skill.id] = skill_file
logger.debug(f"Discovered skill: {skill.id}")
except Exception as e:
logger.error(f"Failed to parse skill file {skill_file}: {e}")
continue
logger.info(f"Discovered {len(discovered_skills)} skills in {search_dir}")
return discovered_skills
def load_skill(self, skill_id: str) -> Skill | None:
"""Load skill from disk with caching and security validation.
Security Features:
1. Trust level detection based on repository
2. Prompt injection pattern detection
3. Suspicious content scanning
4. Size limit enforcement
5. Content sanitization with boundaries
Args:
skill_id: Unique skill identifier
Returns:
Skill object or None if not found or blocked by security
Performance:
- Cache hit: O(1) dict lookup
- Cache miss: O(n) file search + O(m) parsing where m = file size
Example:
>>> manager = SkillManager()
>>> skill = manager.load_skill("anthropics/pytest")
>>> skill.name
'pytest-testing'
>>> # Second call uses cache
>>> skill2 = manager.load_skill("anthropics/pytest")
>>> skill is skill2
True
"""
# Check cache first
if skill_id in self._skill_cache:
logger.debug(f"Cache hit for skill: {skill_id}")
return self._skill_cache[skill_id]
# Check if we have the path cached from discovery
if skill_id in self._skill_paths:
skill_file = self._skill_paths[skill_id]
relative_path = skill_file.relative_to(self.repos_dir)
repo_id = relative_path.parts[0] if relative_path.parts else "unknown"
skill = self._parse_skill_file(skill_file, repo_id)
if skill:
# Security validation before caching
if self.enable_security:
skill = self._apply_security_validation(skill)
if not skill:
return None # Blocked by security
self._skill_cache[skill_id] = skill
return skill
# Fall back to searching for the skill
# This handles the case where load_skill is called before discover_skills
logger.debug(f"Searching for skill: {skill_id}")
# Try to find skill file by ID
# Skill ID format: {repo_id}/{skill_path}
parts = skill_id.split("/", 1)
if len(parts) == 2:
repo_id, skill_path = parts
skill_file = self.repos_dir / repo_id / skill_path / "SKILL.md"
if skill_file.exists():
skill = self._parse_skill_file(skill_file, repo_id)
if skill:
# Security validation before caching
if self.enable_security:
skill = self._apply_security_validation(skill)
if not skill:
return None # Blocked by security
self._skill_cache[skill_id] = skill
self._skill_paths[skill_id] = skill_file
return skill
logger.warning(f"Skill not found: {skill_id}")
return None
def get_skill_metadata(self, skill_id: str) -> SkillMetadata | None:
"""Extract metadata from SKILL.md.
Parses YAML frontmatter without loading full instructions.
This is faster than load_skill() when only metadata is needed.
Args:
skill_id: Unique skill identifier
Returns:
SkillMetadata object or None if not found
Performance:
- O(1) for cached skills (checks cache first)
- O(m) where m = frontmatter size (skips instructions)
- ~10x faster than full skill loading for large instruction sets
Example:
>>> manager = SkillManager()
>>> metadata = manager.get_skill_metadata("anthropics/pytest")
>>> metadata.name
'pytest-testing'
>>> metadata.category
'testing'
"""
# Check if skill is already cached
if skill_id in self._skill_cache:
skill = self._skill_cache[skill_id]
return SkillMetadata(
name=skill.name,
description=skill.description,
category=skill.category,
tags=skill.tags,
dependencies=skill.dependencies,
version=skill.version,
author=skill.author,
)
# Find skill file
skill_file: Path | None = None
if skill_id in self._skill_paths:
skill_file = self._skill_paths[skill_id]
else:
# Try to construct path from skill_id
parts = skill_id.split("/", 1)
if len(parts) == 2:
repo_id, skill_path = parts
potential_file = self.repos_dir / repo_id / skill_path / "SKILL.md"
if potential_file.exists():
skill_file = potential_file
if not skill_file or not skill_file.exists():
logger.warning(f"Skill file not found for: {skill_id}")
return None
# Parse frontmatter only (skip instructions for performance)
try:
frontmatter = self._parse_frontmatter(skill_file)
if not frontmatter:
return None
# Validate with Pydantic
metadata_model = SkillMetadataModel(**frontmatter)
return SkillMetadata(
name=metadata_model.name,
description=metadata_model.description,
category=metadata_model.category,
tags=metadata_model.tags,
dependencies=metadata_model.dependencies,
version=metadata_model.version,
author=metadata_model.author,
)
except (ValidationError, KeyError, yaml.YAMLError) as e:
logger.error(f"Failed to parse metadata for {skill_id}: {e}")
return None
def validate_skill(self, skill: Skill) -> dict[str, list[str]]:
"""Check skill structure and dependencies.
Validates skill against structure requirements and business rules.
Returns both critical errors (MUST fix) and warnings (SHOULD fix).
Args:
skill: Skill object to validate
Returns:
Dictionary with validation results:
{
"errors": ["Critical errors that prevent skill usage"],
"warnings": ["Non-critical warnings for improvement"]
}
Validation Rules:
- ERRORS (critical):
- Missing required fields: name, description, instructions
- Invalid YAML frontmatter
- Description too short (<10 chars)
- Instructions too short (<50 chars)
- WARNINGS (non-critical):
- Unknown category (not in VALID_CATEGORIES)
- Missing tags
- Missing examples in instructions
- Unresolved dependencies
Example:
>>> manager = SkillManager()
>>> skill = manager.load_skill("test/broken-skill")
>>> result = manager.validate_skill(skill)
>>> result["errors"]
['Description too short (5 chars, minimum 10)']
>>> result["warnings"]
['Unknown category: invalid-cat']
"""
# Delegate validation to SkillValidator with dependency resolution
return self.validator.validate_skill_with_dependencies(
skill, dependency_resolver=self.load_skill
)
def search_skills(
self, query: str, category: str | None = None, limit: int = 10
) -> list[Skill]:
"""Search skills using basic text matching.
This is a simple keyword-based search. For advanced semantic search,
use IndexingEngine with ChromaDB vector search (Task 5).
Args:
query: Search query (case-insensitive)
category: Optional category filter (exact match)
limit: Maximum results to return
Returns:
List of matching Skill objects sorted by relevance
Relevance Scoring:
- Name match: 10 points
- Tag match: 5 points per tag
- Description match: 3 points
- Higher scores = more relevant
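            For example, with the query "testing", a skill whose name, one tag,
            and description all contain "testing" scores 10 + 5 + 3 = 18 and
            ranks above a skill matching only in its description (score 3).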
Performance:
- Time Complexity: O(n * m) where n = skills, m = avg text length
- For >1000 skills, consider vector search (ChromaDB integration)
Example:
>>> manager = SkillManager()
>>> manager.discover_skills()
>>> results = manager.search_skills("testing python", category="testing")
>>> results[0].name
'pytest-testing'
"""
# Discover skills if not already cached
# This ensures we search across all available skills
if not self._skill_paths:
self.discover_skills()
query_lower = query.lower()
results: list[tuple[Skill, int]] = [] # (skill, relevance_score)
# Search through all discovered skills
for skill_id in self._skill_paths:
skill = self.load_skill(skill_id)
if not skill:
continue
# Apply category filter if specified
if category and skill.category != category:
continue
score = 0
# Check name match (highest weight)
if query_lower in skill.name.lower():
score += 10
# Check tag matches
for tag in skill.tags:
if query_lower in tag.lower():
score += 5
# Check description match
if query_lower in skill.description.lower():
score += 3
# Only include results with non-zero score
if score > 0:
results.append((skill, score))
# Sort by relevance score (descending) and limit results
results.sort(key=lambda x: x[1], reverse=True)
return [skill for skill, _ in results[:limit]]
def clear_cache(self) -> None:
"""Clear in-memory skill cache.
Clears both the skill object cache and skill path cache.
Call this after repository updates to ensure fresh data.
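        Example (illustrative):
            >>> manager.clear_cache()
            >>> skills = manager.discover_skills()  # re-scan for fresh data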
"""
self._skill_cache.clear()
self._skill_paths.clear()
# Private helper methods
def _parse_skill_file(self, file_path: Path, repo_id: str) -> Skill | None:
"""Parse SKILL.md file and create Skill object.
Args:
file_path: Path to SKILL.md file
repo_id: Repository identifier
Returns:
Skill object or None if parsing fails
Error Handling:
- Invalid YAML: Log error and return None
- Missing required fields: Log error and return None
- File read errors: Log error and return None
Design Decision: Pydantic Validation
Rationale: Use Pydantic for validation to enforce data quality at
parse time. This catches issues early and provides clear error messages.
Trade-offs:
- Safety: Invalid skills are rejected, preventing bad data
- Performance: ~5% overhead for validation vs. raw parsing
- Developer Experience: Clear validation errors vs. runtime failures
"""
try:
# Get file modification time before reading content
file_stat = file_path.stat()
updated_at = datetime.fromtimestamp(file_stat.st_mtime, tz=UTC)
# Read file content
content = file_path.read_text(encoding="utf-8")
# Parse frontmatter and instructions
frontmatter, instructions = self._split_frontmatter(content)
if not frontmatter:
logger.error(f"No frontmatter found in {file_path}")
return None
# Parse YAML frontmatter
metadata = yaml.safe_load(frontmatter)
if not isinstance(metadata, dict):
logger.error(f"Invalid frontmatter format in {file_path}")
return None
# Generate skill ID from file path
# Format: {repo_id}/{skill_path}
# Example: anthropics/testing/pytest/SKILL.md -> anthropics/testing/pytest
relative_path = file_path.relative_to(self.repos_dir)
path_parts = list(relative_path.parts[:-1]) # Remove SKILL.md
skill_id = "/".join(path_parts)
# Normalize skill ID (lowercase, replace spaces/special chars)
skill_id = self._normalize_skill_id(skill_id)
# Extract examples from instructions (look for ## Examples section)
examples = self._extract_examples(instructions)
# Build skill data for validation
skill_data = {
"id": skill_id,
"name": metadata.get("name", ""),
"description": metadata.get("description", ""),
"instructions": instructions.strip(),
"category": metadata.get("category", ""),
"tags": metadata.get("tags", []),
"dependencies": metadata.get("dependencies", []),
"examples": examples,
"file_path": str(file_path),
"repo_id": repo_id,
"version": metadata.get("version"),
"author": metadata.get("author"),
}
# Validate with Pydantic
skill_model = SkillModel(**skill_data)
# Create Skill dataclass instance
return Skill(
id=skill_model.id,
name=skill_model.name,
description=skill_model.description,
instructions=skill_model.instructions,
category=skill_model.category,
tags=skill_model.tags,
dependencies=skill_model.dependencies,
examples=skill_model.examples,
file_path=file_path,
repo_id=skill_model.repo_id,
version=skill_model.version,
author=skill_model.author,
updated_at=updated_at,
)
except (ValidationError, yaml.YAMLError, OSError) as e:
logger.error(f"Failed to parse skill file {file_path}: {e}")
return None
def _parse_frontmatter(self, file_path: Path) -> dict | None:
"""Parse YAML frontmatter from SKILL.md file.
This is a faster alternative to full file parsing when only
metadata is needed.
Args:
file_path: Path to SKILL.md file
Returns:
Dictionary with frontmatter data or None if parsing fails
"""
# Delegate to validator
return self.validator.parse_frontmatter(file_path)
def _split_frontmatter(self, content: str) -> tuple[str, str]:
"""Split SKILL.md content into frontmatter and instructions.
Expected format:
---
name: skill-name
description: Brief description
---
# Instructions
Markdown content...
Args:
content: Full file content
Returns:
Tuple of (frontmatter_yaml, instructions_markdown)
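        Example (illustrative, assuming `content` holds text in the format
        shown above; exact splitting rules are defined by
        SkillValidator.split_frontmatter, to which this method delegates):
            >>> fm, body = manager._split_frontmatter(content)
            >>> yaml.safe_load(fm)["name"]  # frontmatter parses as YAML
            'skill-name'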
"""
# Delegate to validator
return self.validator.split_frontmatter(content)
def _normalize_skill_id(self, skill_id: str) -> str:
"""Normalize skill ID to lowercase with hyphens.
Args:
skill_id: Raw skill ID from file path
Returns:
Normalized skill ID (lowercase, special chars replaced)
Examples:
"Anthropics/Testing/PyTest" -> "anthropics/testing/pytest"
"My Skill!" -> "my-skill"
"""
# Delegate to validator
return self.validator.normalize_skill_id(skill_id)
def _extract_examples(self, instructions: str) -> list[str]:
"""Extract examples from skill instructions.
Looks for sections like:
- ## Examples
- ## Example Usage
- Code blocks (```)
Args:
instructions: Skill instructions markdown
Returns:
List of example strings (empty if none found)
Performance Note:
- This is a basic heuristic extraction
- For more sophisticated parsing, consider using markdown AST
- Current implementation is fast enough for <100KB files
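        Example (illustrative; which snippets are returned depends on the
        validator's heuristics):
            >>> examples = manager._extract_examples(skill.instructions)
            >>> all(isinstance(e, str) for e in examples)
            True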
"""
# Delegate to validator
return self.validator.extract_examples(instructions)
# Security-related private methods
def _get_trust_level(self, repo_id: str) -> TrustLevel:
"""Determine trust level for a repository.
Trust Level Assignment:
- TRUSTED: Official Anthropic repositories
- VERIFIED: Known community repositories (configurable)
- UNTRUSTED: All other repositories (default)
Args:
repo_id: Repository identifier
Returns:
Trust level for security filtering
Example:
>>> manager._get_trust_level("anthropics-skills")
TrustLevel.TRUSTED
>>> manager._get_trust_level("unknown-repo")
TrustLevel.UNTRUSTED
"""
if repo_id in self._trusted_repos:
return TrustLevel.TRUSTED
elif repo_id in self._verified_repos:
return TrustLevel.VERIFIED
else:
return TrustLevel.UNTRUSTED
def _apply_security_validation(self, skill: Skill) -> Skill | None:
"""Apply security validation and sanitization to skill.
Security Workflow:
1. Determine repository trust level
2. Create security validator with trust level
3. Validate skill content for threats
4. Log violations and block if necessary
5. Sanitize content with boundaries
6. Return sanitized skill or None if blocked
Args:
skill: Skill object to validate
Returns:
Sanitized skill or None if blocked by security
Security Decision Logic:
- TRUSTED repos: Only block BLOCKED-level threats
- VERIFIED repos: Block BLOCKED and DANGEROUS threats
- UNTRUSTED repos: Block BLOCKED, DANGEROUS, and SUSPICIOUS threats
Example:
>>> skill = Skill(...) # Skill with suspicious content
>>> validated = manager._apply_security_validation(skill)
>>> validated is None # True if blocked
"""
# Determine trust level
trust_level = self._get_trust_level(skill.repo_id)
# Create security validator
security_validator = SkillSecurityValidator(trust_level=trust_level)
# Validate skill content
is_safe, violations = security_validator.validate_skill(
instructions=skill.instructions,
description=skill.description,
skill_id=skill.id,
)
# Log violations
if violations:
# Group violations by threat level for logging
blocked = [v for v in violations if v.threat_level == ThreatLevel.BLOCKED]
dangerous = [
v for v in violations if v.threat_level == ThreatLevel.DANGEROUS
]
suspicious = [
v for v in violations if v.threat_level == ThreatLevel.SUSPICIOUS
]
if blocked:
logger.error(
f"Skill {skill.id} BLOCKED - Critical security threats detected:\n"
+ "\n".join([f" - {v.description}" for v in blocked])
)
if dangerous:
logger.warning(
f"Skill {skill.id} contains DANGEROUS patterns:\n"
+ "\n".join([f" - {v.description}" for v in dangerous])
)
if suspicious:
logger.info(
f"Skill {skill.id} contains SUSPICIOUS content:\n"
+ "\n".join([f" - {v.description}" for v in suspicious])
)
# Block if not safe
if not is_safe:
logger.error(
f"Skill {skill.id} blocked by security validation. "
f"Trust level: {trust_level.value}, "
f"Violations: {len(violations)}"
)
return None
# Sanitize content (wrap in boundaries)
skill.instructions = security_validator.sanitize_skill(
skill.instructions, skill.id
)
logger.debug(
f"Skill {skill.id} passed security validation ({trust_level.value})"
)
return skill
def add_verified_repo(self, repo_id: str) -> None:
"""Add a repository to the verified trust list.
Verified repositories receive moderate security filtering
(block BLOCKED and DANGEROUS threats, allow SUSPICIOUS).
Args:
repo_id: Repository identifier to add
Example:
>>> manager.add_verified_repo("community/awesome-skills")
"""
self._verified_repos.add(repo_id)
logger.info(f"Added {repo_id} to verified repositories")
def remove_verified_repo(self, repo_id: str) -> None:
"""Remove a repository from the verified trust list.
Repository will revert to UNTRUSTED status.
Args:
repo_id: Repository identifier to remove
Example:
>>> manager.remove_verified_repo("community/awesome-skills")
"""
self._verified_repos.discard(repo_id)
logger.info(f"Removed {repo_id} from verified repositories")