
Claude Skills MCP Server

by K-Dense-AI
skill_loader.py (32.9 kB)
"""Skill loading and parsing functionality.""" import base64 import hashlib import json import logging import re import tempfile from collections.abc import Callable from datetime import datetime, timedelta from pathlib import Path from typing import Any from urllib.parse import urlparse import httpx logger = logging.getLogger(__name__) class Skill: """Represents a Claude Agent Skill. Attributes ---------- name : str Skill name. description : str Short description of the skill. content : str Full content of the SKILL.md file. source : str Origin of the skill (GitHub URL or local path). documents : dict[str, dict[str, Any]] Additional documents from the skill directory. Keys are relative paths, values contain metadata and content. _document_fetcher : Callable | None Function to fetch document content on-demand. _document_cache : dict[str, dict[str, Any]] In-memory cache for fetched documents. """ def __init__( self, name: str, description: str, content: str, source: str, documents: dict[str, dict[str, Any]] | None = None, document_fetcher: Callable | None = None, ): self.name = name self.description = description self.content = content self.source = source self.documents = documents or {} self._document_fetcher = document_fetcher self._document_cache = {} def get_document(self, doc_path: str) -> dict[str, Any] | None: """Fetch document content on-demand with caching. Parameters ---------- doc_path : str Relative path to the document. Returns ------- dict[str, Any] | None Document content with metadata, or None if not found. """ # Check memory cache first if doc_path in self._document_cache: return self._document_cache[doc_path] # Check if document exists in metadata if doc_path not in self.documents: return None # If already fetched (eager loaded), return from documents doc_info = self.documents[doc_path] if doc_info.get("fetched") or "content" in doc_info: return doc_info # Fetch using the document_fetcher (lazy loading) if self._document_fetcher: content = self._document_fetcher(doc_path) if content: # Cache it in memory self._document_cache[doc_path] = content return content return None def to_dict(self) -> dict[str, Any]: """Convert skill to dictionary representation. Returns ------- dict[str, Any] Dictionary with skill information. """ return { "name": self.name, "description": self.description, "content": self.content, "source": self.source, "documents": self.documents, } def parse_skill_md(content: str, source: str) -> Skill | None: """Parse a SKILL.md file and extract skill information. Parameters ---------- content : str Content of the SKILL.md file. source : str Origin of the skill (for tracking). Returns ------- Skill | None Parsed skill or None if parsing failed. 
""" try: # Parse YAML frontmatter (between --- markers) frontmatter_match = re.match( r"^---\s*\n(.*?)\n---\s*\n(.*)$", content, re.DOTALL ) if not frontmatter_match: logger.warning(f"No YAML frontmatter found in skill from {source}") return None frontmatter_text = frontmatter_match.group(1) markdown_body = frontmatter_match.group(2) # Extract name and description from YAML frontmatter name_match = re.search(r"^name:\s*(.+)$", frontmatter_text, re.MULTILINE) desc_match = re.search(r"^description:\s*(.+)$", frontmatter_text, re.MULTILINE) if not name_match or not desc_match: logger.warning(f"Missing name or description in skill from {source}") return None name = name_match.group(1).strip() description = desc_match.group(1).strip() # Remove quotes if present name = name.strip("\"'") description = description.strip("\"'") return Skill( name=name, description=description, content=markdown_body.strip(), # Store only the markdown body, not the frontmatter source=source, ) except Exception as e: logger.error(f"Error parsing SKILL.md from {source}: {e}") return None def _is_text_file(file_path: Path, text_extensions: list[str]) -> bool: """Check if a file is a text file based on extension. Parameters ---------- file_path : Path Path to the file. text_extensions : list[str] List of allowed text file extensions. Returns ------- bool True if file is a text file. """ return file_path.suffix.lower() in text_extensions def _is_image_file(file_path: Path, image_extensions: list[str]) -> bool: """Check if a file is an image based on extension. Parameters ---------- file_path : Path Path to the file. image_extensions : list[str] List of allowed image file extensions. Returns ------- bool True if file is an image. """ return file_path.suffix.lower() in image_extensions def _load_text_file(file_path: Path) -> dict[str, Any] | None: """Load a text file and return its metadata. Parameters ---------- file_path : Path Path to the text file. Returns ------- dict[str, Any] | None Document metadata with content, or None on error. """ try: content = file_path.read_text(encoding="utf-8") return { "type": "text", "content": content, "size": len(content), } except Exception as e: logger.error(f"Error reading text file {file_path}: {e}") return None def _load_image_file( file_path: Path, max_size: int, url: str | None = None ) -> dict[str, Any] | None: """Load an image file and return its metadata with base64 encoding. Parameters ---------- file_path : Path Path to the image file. max_size : int Maximum file size in bytes. url : str | None Optional URL to the image (for GitHub sources). Returns ------- dict[str, Any] | None Document metadata with base64 content and/or URL, or None on error. 
""" try: file_size = file_path.stat().st_size if file_size > max_size: logger.warning( f"Image {file_path} exceeds size limit ({file_size} > {max_size}), " "storing metadata only" ) result = { "type": "image", "size": file_size, "size_exceeded": True, } if url: result["url"] = url return result # Read and base64 encode the image image_data = file_path.read_bytes() base64_content = base64.b64encode(image_data).decode("utf-8") result = { "type": "image", "content": base64_content, "size": file_size, } if url: result["url"] = url return result except Exception as e: logger.error(f"Error reading image file {file_path}: {e}") return None def _load_documents_from_directory( skill_dir: Path, text_extensions: list[str], image_extensions: list[str], max_image_size: int, ) -> dict[str, dict[str, Any]]: """Load all documents from a skill directory. Parameters ---------- skill_dir : Path Path to the skill directory. text_extensions : list[str] List of allowed text file extensions. image_extensions : list[str] List of allowed image file extensions. max_image_size : int Maximum image file size in bytes. Returns ------- dict[str, dict[str, Any]] Dictionary mapping relative paths to document metadata. """ documents = {} for file_path in skill_dir.rglob("*"): # Skip SKILL.md itself and directories if file_path.name == "SKILL.md" or file_path.is_dir(): continue # Calculate relative path from skill directory try: rel_path = str(file_path.relative_to(skill_dir)) except ValueError: continue # Process text files if _is_text_file(file_path, text_extensions): doc_data = _load_text_file(file_path) if doc_data: documents[rel_path] = doc_data # Process image files elif _is_image_file(file_path, image_extensions): doc_data = _load_image_file(file_path, max_image_size) if doc_data: documents[rel_path] = doc_data return documents def load_from_local(path: str, config: dict[str, Any] | None = None) -> list[Skill]: """Load skills from a local directory. Parameters ---------- path : str Path to local directory containing skills. config : dict[str, Any] | None Configuration dictionary with document loading settings. Returns ------- list[Skill] List of loaded skills. 
""" skills: list[Skill] = [] # Get configuration settings if config is None: config = {} load_documents = config.get("load_skill_documents", True) text_extensions = config.get( "text_file_extensions", [".md", ".py", ".txt", ".json", ".yaml", ".yml", ".sh", ".r", ".ipynb"], ) image_extensions = config.get( "allowed_image_extensions", [".png", ".jpg", ".jpeg", ".gif", ".svg", ".webp"] ) max_image_size = config.get("max_image_size_bytes", 5242880) try: local_path = Path(path).expanduser().resolve() if not local_path.exists(): logger.warning(f"Local path {path} does not exist, skipping") return skills if not local_path.is_dir(): logger.warning(f"Local path {path} is not a directory, skipping") return skills # Find all SKILL.md files recursively skill_files = list(local_path.rglob("SKILL.md")) for skill_file in skill_files: try: content = skill_file.read_text(encoding="utf-8") skill = parse_skill_md(content, str(skill_file)) if skill: # Load additional documents from the skill directory if load_documents: skill_dir = skill_file.parent documents = _load_documents_from_directory( skill_dir, text_extensions, image_extensions, max_image_size ) skill.documents = documents if documents: logger.info( f"Loaded {len(documents)} additional documents for skill: {skill.name}" ) skills.append(skill) logger.info(f"Loaded skill: {skill.name} from {skill_file}") except Exception as e: logger.error(f"Error reading {skill_file}: {e}") continue logger.info(f"Loaded {len(skills)} skills from local path {path}") except Exception as e: logger.error(f"Error accessing local path {path}: {e}") return skills def _get_document_cache_dir() -> Path: """Get document cache directory. Returns ------- Path Path to document cache directory. """ cache_dir = Path(tempfile.gettempdir()) / "claude_skills_mcp_cache" / "documents" cache_dir.mkdir(parents=True, exist_ok=True) return cache_dir def _get_cache_path(url: str, branch: str) -> Path: """Get cache file path for a GitHub repository. Parameters ---------- url : str GitHub repository URL. branch : str Branch name. Returns ------- Path Path to cache file. """ cache_dir = Path(tempfile.gettempdir()) / "claude_skills_mcp_cache" cache_dir.mkdir(exist_ok=True) # Create hash-based filename cache_key = f"{url}_{branch}" hash_key = hashlib.md5(cache_key.encode()).hexdigest() return cache_dir / f"{hash_key}.json" def _load_from_cache( cache_path: Path, max_age_hours: int = 24 ) -> dict[str, Any] | None: """Load cached GitHub API response if available and not expired. Parameters ---------- cache_path : Path Path to cache file. max_age_hours : int, optional Maximum cache age in hours, by default 24. Returns ------- dict[str, Any] | None Cached tree data or None if cache is invalid/expired. """ if not cache_path.exists(): return None try: with open(cache_path, "r") as f: cache_data = json.load(f) # Check if cache is expired cached_time = datetime.fromisoformat(cache_data["timestamp"]) if datetime.now() - cached_time > timedelta(hours=max_age_hours): logger.info(f"Cache expired for {cache_path}") return None logger.info(f"Using cached GitHub API response from {cache_path}") return cache_data["tree_data"] except Exception as e: logger.warning(f"Failed to load cache from {cache_path}: {e}") return None def _save_to_cache(cache_path: Path, tree_data: dict[str, Any]) -> None: """Save GitHub API response to cache. Parameters ---------- cache_path : Path Path to cache file. tree_data : dict[str, Any] GitHub tree data to cache. 
""" try: cache_data = { "timestamp": datetime.now().isoformat(), "tree_data": tree_data, } with open(cache_path, "w") as f: json.dump(cache_data, f) logger.info(f"Saved GitHub API response to cache: {cache_path}") except Exception as e: logger.warning(f"Failed to save cache to {cache_path}: {e}") def _get_document_metadata_from_github( owner: str, repo: str, branch: str, skill_dir_path: str, tree_data: dict[str, Any], text_extensions: list[str], image_extensions: list[str], ) -> dict[str, dict[str, Any]]: """Get document metadata from GitHub without fetching content. Parameters ---------- owner : str GitHub repository owner. repo : str GitHub repository name. branch : str Branch name. skill_dir_path : str Path to the skill directory within the repo. tree_data : dict[str, Any] GitHub API tree data for the repository. text_extensions : list[str] List of allowed text file extensions. image_extensions : list[str] List of allowed image file extensions. Returns ------- dict[str, dict[str, Any]] Dictionary mapping relative paths to document metadata (no content). """ documents = {} # Find all files in the skill directory (but not SKILL.md itself) for item in tree_data.get("tree", []): if item["type"] != "blob": continue item_path = item["path"] # Skip if not in the skill directory if not item_path.startswith(skill_dir_path): continue # Skip SKILL.md itself if item_path.endswith("/SKILL.md") or item_path == f"{skill_dir_path}/SKILL.md": continue # Calculate relative path from skill directory if skill_dir_path: rel_path = item_path[len(skill_dir_path) :].lstrip("/") else: rel_path = item_path if not rel_path: continue # Check file extension file_ext = Path(item_path).suffix.lower() # Store metadata for text and image files if file_ext in text_extensions: documents[rel_path] = { "type": "text", "size": item.get("size", 0), "url": f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{item_path}", "fetched": False, } elif file_ext in image_extensions: documents[rel_path] = { "type": "image", "size": item.get("size", 0), "url": f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{item_path}", "fetched": False, } return documents def _create_document_fetcher( owner: str, repo: str, branch: str, skill_dir_path: str, text_extensions: list[str], image_extensions: list[str], max_image_size: int, ) -> Callable: """Create a closure that fetches documents on-demand with disk caching. Parameters ---------- owner : str GitHub repository owner. repo : str GitHub repository name. branch : str Branch name. skill_dir_path : str Path to the skill directory within the repo. text_extensions : list[str] List of allowed text file extensions. image_extensions : list[str] List of allowed image file extensions. max_image_size : int Maximum image file size in bytes. Returns ------- callable Function that fetches a document by path. """ cache_dir = _get_document_cache_dir() def fetch_document(doc_path: str) -> dict[str, Any] | None: """Fetch a single document with local caching. Parameters ---------- doc_path : str Relative path to the document. Returns ------- dict[str, Any] | None Document content with metadata, or None if fetch failed. 
""" # Build full GitHub path if skill_dir_path: full_path = f"{skill_dir_path}/{doc_path}" else: full_path = doc_path url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{full_path}" # Check disk cache first cache_key = hashlib.md5(url.encode()).hexdigest() cache_file = cache_dir / f"{cache_key}.cache" if cache_file.exists(): try: with open(cache_file, "r", encoding="utf-8") as f: cached_data = json.load(f) logger.debug(f"Using cached document: {doc_path}") return cached_data except Exception as e: logger.warning(f"Failed to load cache for {doc_path}: {e}") # Fetch from GitHub try: file_ext = Path(doc_path).suffix.lower() with httpx.Client(timeout=30.0) as client: response = client.get(url) response.raise_for_status() # Process based on file type if file_ext in image_extensions: # Image file image_data = response.content file_size = len(image_data) if file_size > max_image_size: content = { "type": "image", "size": file_size, "size_exceeded": True, "url": url, "fetched": True, } else: base64_content = base64.b64encode(image_data).decode("utf-8") content = { "type": "image", "content": base64_content, "size": file_size, "url": url, "fetched": True, } elif file_ext in text_extensions: # Text file text_content = response.text content = { "type": "text", "content": text_content, "size": len(text_content), "fetched": True, } else: return None # Save to disk cache try: with open(cache_file, "w", encoding="utf-8") as f: json.dump(content, f) logger.debug(f"Cached document: {doc_path}") except Exception as e: logger.warning(f"Failed to cache document {doc_path}: {e}") return content except Exception as e: logger.error(f"Failed to fetch document {doc_path} from {url}: {e}") return None return fetch_document def load_from_github( url: str, subpath: str = "", config: dict[str, Any] | None = None ) -> list[Skill]: """Load skills from a GitHub repository. Parameters ---------- url : str GitHub repository URL. Can be: - Base repo URL: https://github.com/owner/repo - URL with branch and subpath: https://github.com/owner/repo/tree/branch/subpath subpath : str, optional Subdirectory within the repo to search, by default "". If the URL already contains a subpath, this parameter is ignored. config : dict[str, Any] | None Configuration dictionary with document loading settings. Returns ------- list[Skill] List of loaded skills. 
""" skills: list[Skill] = [] # Get configuration settings if config is None: config = {} load_documents = config.get("load_skill_documents", True) text_extensions = config.get( "text_file_extensions", [".md", ".py", ".txt", ".json", ".yaml", ".yml", ".sh", ".r", ".ipynb"], ) image_extensions = config.get( "allowed_image_extensions", [".png", ".jpg", ".jpeg", ".gif", ".svg", ".webp"] ) max_image_size = config.get("max_image_size_bytes", 5242880) try: # Parse GitHub URL to extract owner, repo, branch, and subpath parsed = urlparse(url) path_parts = parsed.path.strip("/").split("/") if len(path_parts) < 2: logger.error(f"Invalid GitHub URL: {url}") return skills owner = path_parts[0] repo = path_parts[1] branch = "main" # Default branch # Check if URL contains /tree/{branch}/{subpath} format # e.g., https://github.com/owner/repo/tree/main/subdirectory if len(path_parts) > 3 and path_parts[2] == "tree": branch = path_parts[3] # Extract subpath from URL if provided (overrides subpath parameter) if len(path_parts) > 4: url_subpath = "/".join(path_parts[4:]) if not subpath: # Only use URL subpath if not explicitly provided subpath = url_subpath logger.info(f"Extracted subpath from URL: {subpath}") if subpath: logger.info( f"Loading skills from GitHub: {owner}/{repo} (branch: {branch}, subpath: {subpath})" ) else: logger.info( f"Loading skills from GitHub: {owner}/{repo} (branch: {branch})" ) # Get repository tree (with caching to avoid API limits) cache_path = _get_cache_path(url, branch) tree_data = _load_from_cache(cache_path) if tree_data is None: api_url = f"https://api.github.com/repos/{owner}/{repo}/git/trees/{branch}?recursive=1" with httpx.Client(timeout=30.0) as client: response = client.get(api_url) response.raise_for_status() tree_data = response.json() # Save to cache _save_to_cache(cache_path, tree_data) # Find all SKILL.md files skill_paths = [] for item in tree_data.get("tree", []): if item["type"] == "blob" and item["path"].endswith("SKILL.md"): # Apply subpath filter if provided if subpath: if item["path"].startswith(subpath): skill_paths.append(item["path"]) else: skill_paths.append(item["path"]) # Load each SKILL.md file for skill_path in skill_paths: try: raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{skill_path}" with httpx.Client(timeout=30.0) as client: response = client.get(raw_url) response.raise_for_status() content = response.text source = f"{url}/tree/{branch}/{skill_path}" skill = parse_skill_md(content, source) if skill: # Load additional documents from the skill directory if load_documents: # Get the skill directory path (parent of SKILL.md) skill_dir_path = str(Path(skill_path).parent) if skill_dir_path == ".": skill_dir_path = "" # Get metadata only (lazy loading) documents = _get_document_metadata_from_github( owner, repo, branch, skill_dir_path, tree_data, text_extensions, image_extensions, ) # Create document fetcher for lazy loading fetcher = _create_document_fetcher( owner, repo, branch, skill_dir_path, text_extensions, image_extensions, max_image_size, ) skill.documents = documents skill._document_fetcher = fetcher if documents: logger.info( f"Found {len(documents)} additional documents for skill: {skill.name}" ) skills.append(skill) logger.info(f"Loaded skill: {skill.name} from {source}") except Exception as e: logger.error(f"Error loading {skill_path} from GitHub: {e}") continue logger.info(f"Loaded {len(skills)} skills from GitHub repo {url}") except httpx.HTTPStatusError as e: if e.response.status_code == 404: # Try 'master' 
branch instead try: logger.info( f"Branch 'main' not found, trying 'master' for {owner}/{repo}" ) branch = "master" # Try cache for master branch cache_path = _get_cache_path(url, branch) tree_data = _load_from_cache(cache_path) if tree_data is None: api_url = f"https://api.github.com/repos/{owner}/{repo}/git/trees/{branch}?recursive=1" with httpx.Client(timeout=30.0) as client: response = client.get(api_url) response.raise_for_status() tree_data = response.json() # Save to cache _save_to_cache(cache_path, tree_data) # Repeat the loading process with master branch skill_paths = [] for item in tree_data.get("tree", []): if item["type"] == "blob" and item["path"].endswith("SKILL.md"): if subpath: if item["path"].startswith(subpath): skill_paths.append(item["path"]) else: skill_paths.append(item["path"]) for skill_path in skill_paths: try: raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{skill_path}" with httpx.Client(timeout=30.0) as client: response = client.get(raw_url) response.raise_for_status() content = response.text source = f"{url}/tree/{branch}/{skill_path}" skill = parse_skill_md(content, source) if skill: # Load additional documents from the skill directory if load_documents: # Get the skill directory path (parent of SKILL.md) skill_dir_path = str(Path(skill_path).parent) if skill_dir_path == ".": skill_dir_path = "" # Get metadata only (lazy loading) documents = _get_document_metadata_from_github( owner, repo, branch, skill_dir_path, tree_data, text_extensions, image_extensions, ) # Create document fetcher for lazy loading fetcher = _create_document_fetcher( owner, repo, branch, skill_dir_path, text_extensions, image_extensions, max_image_size, ) skill.documents = documents skill._document_fetcher = fetcher if documents: logger.info( f"Found {len(documents)} additional documents for skill: {skill.name}" ) skills.append(skill) logger.info(f"Loaded skill: {skill.name} from {source}") except Exception as e: logger.error(f"Error loading {skill_path} from GitHub: {e}") continue logger.info(f"Loaded {len(skills)} skills from GitHub repo {url}") except Exception as e2: logger.error( f"Error loading from GitHub repo {url} (tried both main and master): {e2}" ) else: logger.error(f"HTTP error loading from GitHub {url}: {e}") except Exception as e: logger.error(f"Error loading from GitHub {url}: {e}") return skills def load_all_skills( skill_sources: list[dict[str, Any]], config: dict[str, Any] | None = None ) -> list[Skill]: """Load skills from all configured sources. Parameters ---------- skill_sources : list[dict[str, Any]] List of skill source configurations. config : dict[str, Any] | None Configuration dictionary with document loading settings. Returns ------- list[Skill] All loaded skills from all sources. """ all_skills: list[Skill] = [] for source_config in skill_sources: source_type = source_config.get("type") if source_type == "github": url = source_config.get("url") subpath = source_config.get("subpath", "") if url: skills = load_from_github(url, subpath, config) all_skills.extend(skills) elif source_type == "local": path = source_config.get("path") if path: skills = load_from_local(path, config) all_skills.extend(skills) else: logger.warning(f"Unknown source type: {source_type}") logger.info(f"Total skills loaded: {len(all_skills)}") return all_skills
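As a usage illustration (not part of the module above), the following minimal sketch shows how the public entry points might be wired together. It assumes the file is importable as skill_loader; the repository URL, local path, and document name are placeholders, while the frontmatter format and the source/config keys mirror what parse_skill_md, load_all_skills, and the loaders above actually read.

# Minimal usage sketch under the assumptions stated above.
from skill_loader import load_all_skills, parse_skill_md

# A SKILL.md needs YAML frontmatter with `name` and `description` between --- markers.
example_md = """---
name: example-skill
description: Demonstrates the expected SKILL.md frontmatter.
---
Body of the skill in Markdown.
"""
skill = parse_skill_md(example_md, source="inline-example")
if skill:
    print(skill.name, "->", skill.description)

# Load skills from configured sources (placeholder URL and path).
skill_sources = [
    {"type": "github", "url": "https://github.com/owner/repo", "subpath": "skills"},
    {"type": "local", "path": "~/my-skills"},
]
config = {
    "load_skill_documents": True,
    "max_image_size_bytes": 5 * 1024 * 1024,  # same default as the loaders above
}
skills = load_all_skills(skill_sources, config)
for s in skills:
    print(s.name, "-", s.description)
    # Documents from GitHub sources are fetched lazily on first access.
    doc = s.get_document("reference.md")  # hypothetical document path
    if doc and doc.get("type") == "text":
        print(doc["content"][:200])

Note the design choice this exercises: GitHub-backed skills store only document metadata at load time, and get_document pulls content on demand, caching it in memory and on disk, so the initial load stays cheap even for large repositories.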

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/K-Dense-AI/claude-skills-mcp'
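For reference, an equivalent request in Python, using httpx (which the server code above already depends on), might look like the following sketch; the endpoint URL is the one shown in the curl command.

import httpx

# Same endpoint as the curl example above.
response = httpx.get("https://glama.ai/api/mcp/v1/servers/K-Dense-AI/claude-skills-mcp")
response.raise_for_status()
print(response.json())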

If you have feedback or need assistance with the MCP directory API, please join our Discord server.