Skip to main content
Glama

Library MCP

by lethain
main.py20.8 kB
import os import re import sys import glob import yaml from typing import Any, Dict, List, Set, Optional, Union, Tuple from dataclasses import dataclass from datetime import datetime import httpx from mcp.server.fastmcp import FastMCP # Redirect all debug prints to stderr def debug_print(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) @dataclass class ContentFile: path: str meta: Dict[str, Any] data: str @property def date(self) -> Optional[datetime]: """Extract date from metadata or fallback to file modification time""" if 'date' in self.meta: try: date_str = str(self.meta['date']) if 'T' in date_str and not date_str.endswith('Z') and '+' not in date_str: date_str += 'Z' # Add UTC indicator if missing return datetime.fromisoformat(date_str.replace('Z', '+00:00')) except (ValueError, TypeError) as e: debug_print(f"Error parsing date: {e} for {self.path}") pass try: # Get modification time as naive datetime (assumed UTC) mod_time = datetime.fromtimestamp(os.path.getmtime(self.path)) # Return as naive datetime for consistent comparison return mod_time except OSError: return None @property def slug(self) -> str: """Extract slug from metadata or from filename""" if 'slug' in self.meta: return str(self.meta['slug']) # Extract from filename (basename without extension) filename = os.path.basename(self.path) return os.path.splitext(filename)[0] @property def url(self) -> Optional[str]: """Extract URL from metadata if available""" if 'url' in self.meta: return str(self.meta['url']) return None class HugoContentManager: def __init__(self, content_dirs: List[str]): self.content_dirs = content_dirs self.dir_to_files: Dict[str, List[str]] = {} self.path_to_content: Dict[str, ContentFile] = {} self.load_content() def load_content(self) -> None: """Load all content from the specified directories""" self.dir_to_files = {} self.path_to_content = {} for content_dir in self.content_dirs: if not os.path.isdir(content_dir): debug_print(f"Warning: {content_dir} is not a valid directory, skipping") continue md_files = [] for root, _, files in os.walk(content_dir): for file in files: if file.endswith('.md'): full_path = os.path.join(root, file) md_files.append(full_path) self.dir_to_files[content_dir] = md_files debug_print(f"Found {len(md_files)} markdown files in {content_dir}") for file_path in md_files: try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() meta, data = self._parse_markdown(content) self.path_to_content[file_path] = ContentFile( path=file_path, meta=meta, data=data ) except Exception as e: debug_print(f"Error processing {file_path}: {e}") debug_print(f"Total files processed: {len(self.path_to_content)}") def _normalize_tags(self, tags: Union[str, List, None]) -> List[str]: """Normalize tags to a list format regardless of input type""" if tags is None: return [] if isinstance(tags, list): return [str(tag).strip() for tag in tags] if isinstance(tags, str): # If it looks like a YAML list if tags.startswith('[') and tags.endswith(']'): inner = tags[1:-1].strip() if not inner: return [] return [tag.strip().strip('\'"') for tag in inner.split(',')] # If it's a comma-separated string if ',' in tags: return [tag.strip() for tag in tags.split(',')] # Single tag return [tags.strip()] # Any other type, convert to string and return as single item return [str(tags)] def _parse_markdown(self, content: str) -> Tuple[Dict[str, Any], str]: """Parse markdown content, separating front matter from content""" front_matter_pattern = r"^---\s*\n(.*?)\n---\s*\n" match = re.search(front_matter_pattern, content, re.DOTALL) if not match: return {}, content front_matter_text = match.group(1) # Use PyYAML to properly parse the front matter try: meta = yaml.safe_load(front_matter_text) or {} except Exception as e: debug_print(f"YAML parsing error: {e}") meta = {} # Ensure meta is a dictionary if not isinstance(meta, dict): debug_print(f"Front matter did not parse as a dictionary: {type(meta)}") meta = {} # Ensure tags are always in list format if 'tags' in meta and meta['tags'] is not None: if not isinstance(meta['tags'], list): meta['tags'] = [meta['tags']] # Extract the actual content (everything after front matter) data = content[match.end():] return meta, data def get_by_tag(self, tag: str, limit: int = 50) -> List[ContentFile]: """Find all files with a given tag""" matches = [] tag_lower = tag.lower() debug_print(f"Searching for tag: '{tag_lower}'") for file_path, content_file in self.path_to_content.items(): raw_tags = content_file.meta.get('tags', []) tags = self._normalize_tags(raw_tags) # Debug if tags: debug_print(f"File: {os.path.basename(file_path)} - Tags: {tags}") # Check for exact tag match (case insensitive) if any(tag_lower == t.lower() for t in tags): debug_print(f"Found exact tag match in {os.path.basename(file_path)}") matches.append(content_file) continue # Check if the tag is part of a tag for t in tags: if tag_lower in t.lower(): debug_print(f"Found partial tag match in {os.path.basename(file_path)}: '{t}'") matches.append(content_file) break debug_print(f"Found {len(matches)} files with tag '{tag}'") # Sort by date (most recent first) def get_sort_key(content_file): date = content_file.date if date is None: return datetime.min # Make date naive if it has timezone info if hasattr(date, 'tzinfo') and date.tzinfo is not None: date = date.replace(tzinfo=None) return date matches.sort(key=get_sort_key, reverse=True) return matches[:limit] def get_by_text(self, query: str, limit: int = 50) -> List[ContentFile]: """Find all files containing the specified text""" matches = [] query_lower = query.lower() debug_print(f"Searching for text: '{query}'") for file_path, content_file in self.path_to_content.items(): if query_lower in content_file.data.lower(): matches.append(content_file) debug_print(f"Found {len(matches)} files containing '{query}'") # Sort by date (most recent first) def get_sort_key(content_file): date = content_file.date if date is None: return datetime.min # Make date naive if it has timezone info if hasattr(date, 'tzinfo') and date.tzinfo is not None: date = date.replace(tzinfo=None) return date matches.sort(key=get_sort_key, reverse=True) return matches[:limit] def search_tags(self, tag_query: str, limit: int = 20) -> List[str]: """Search for tags matching the provided query""" all_tags = set() tag_query_lower = tag_query.lower() debug_print(f"Searching for tags containing: '{tag_query_lower}'") for _, content_file in self.path_to_content.items(): raw_tags = content_file.meta.get('tags', []) tags = self._normalize_tags(raw_tags) # Add tags that match the query for tag in tags: if tag_query_lower in tag.lower(): all_tags.add(tag) # Convert to list and sort alphabetically tag_list = sorted(list(all_tags)) debug_print(f"Found {len(tag_list)} tags matching '{tag_query_lower}'") return tag_list[:limit] def list_all_tags(self) -> List[Tuple[str, int, Optional[datetime]]]: """List all tags with their post count and most recent post date""" tag_info: Dict[str, Tuple[int, Optional[datetime]]] = {} debug_print("Collecting tag statistics...") for _, content_file in self.path_to_content.items(): raw_tags = content_file.meta.get('tags', []) tags = self._normalize_tags(raw_tags) post_date = content_file.date for tag in tags: if tag in tag_info: count, latest_date = tag_info[tag] # Handle the case where either date might be None if latest_date is None: new_latest = post_date elif post_date is None: new_latest = latest_date else: # Make both dates naive if they're not already if hasattr(latest_date, 'tzinfo') and latest_date.tzinfo is not None: latest_date = latest_date.replace(tzinfo=None) if hasattr(post_date, 'tzinfo') and post_date.tzinfo is not None: post_date = post_date.replace(tzinfo=None) new_latest = max(latest_date, post_date) tag_info[tag] = (count + 1, new_latest) else: tag_info[tag] = (1, post_date) # Convert to list of tuples (tag, count, latest_date) result = [(tag, count, date) for tag, (count, date) in tag_info.items()] # Sort by count (descending) and then by date (most recent first) # Make all dates naive for comparison def get_sort_key(item): count = item[1] date = item[2] if date is None: return (-count, datetime.min) # Make date naive if it has timezone info if hasattr(date, 'tzinfo') and date.tzinfo is not None: date = date.replace(tzinfo=None) return (-count, date) result.sort(key=get_sort_key, reverse=True) debug_print(f"Collected statistics for {len(result)} tags") return result def get_by_slug_or_url(self, identifier: str) -> Optional[ContentFile]: """Find a post by its slug or URL""" identifier_lower = identifier.lower() debug_print(f"Searching for post with slug or URL: '{identifier}'") # First check for exact URL match (case insensitive) for _, content_file in self.path_to_content.items(): url = content_file.url if url and url.lower() == identifier_lower: debug_print(f"Found exact URL match: {url}") return content_file # Then check for exact slug match (case insensitive) for _, content_file in self.path_to_content.items(): slug = content_file.slug if slug.lower() == identifier_lower: debug_print(f"Found exact slug match: {slug}") return content_file # Try partial path match if no exact matches found for path, content_file in self.path_to_content.items(): if identifier_lower in path.lower(): debug_print(f"Found partial path match: {path}") return content_file debug_print(f"No post found for '{identifier}'") return None def get_by_date_range(self, start_date: datetime, end_date: datetime, limit: int = 50) -> List[ContentFile]: """Find all posts within a date range""" matches = [] debug_print(f"Searching for posts between {start_date} and {end_date}") for _, content_file in self.path_to_content.items(): post_date = content_file.date if post_date: # Make date naive for comparison if it has timezone info if hasattr(post_date, 'tzinfo') and post_date.tzinfo is not None: post_date = post_date.replace(tzinfo=None) # Make start and end dates naive for comparison start_naive = start_date if hasattr(start_naive, 'tzinfo') and start_naive.tzinfo is not None: start_naive = start_naive.replace(tzinfo=None) end_naive = end_date if hasattr(end_naive, 'tzinfo') and end_naive.tzinfo is not None: end_naive = end_naive.replace(tzinfo=None) if start_naive <= post_date <= end_naive: matches.append(content_file) debug_print(f"Found {len(matches)} posts within date range") # Sort by date (most recent first) def get_sort_key(content_file): date = content_file.date if date is None: return datetime.min # Make date naive if it has timezone info if hasattr(date, 'tzinfo') and date.tzinfo is not None: date = date.replace(tzinfo=None) return date matches.sort(key=get_sort_key, reverse=True) return matches[:limit] def format_content_for_output(content_files: List[ContentFile]) -> str: """Format the content files for output""" if not content_files: return "No matching content found." result = [] for i, file in enumerate(content_files): result.append(f"File: {file.path}") result.append("Metadata:") for key, value in file.meta.items(): result.append(f" {key}: {value}") # Include the full content result.append("Content:") result.append(file.data.strip()) # Add separator between entries, but not after the last one if i < len(content_files) - 1: result.append("-" * 50) return "\n".join(result) def format_tags_for_output(tags: List[Tuple[str, int, Optional[datetime]]]) -> str: """Format tag information for output""" if not tags: return "No tags found." result = [] result.append("Tags (by post count and most recent post):") for tag, count, date in tags: if date is None: date_str = "Unknown" else: # Strip timezone info for display if present if hasattr(date, 'tzinfo') and date.tzinfo is not None: date = date.replace(tzinfo=None) # Only use date part for display if date != datetime.min: date_str = date.strftime("%Y-%m-%d") else: date_str = "Unknown" result.append(f"- {tag}: {count} posts, most recent: {date_str}") return "\n".join(result) # Create MCP server mcp = FastMCP("hugo_content") content_manager = None @mcp.tool() async def get_by_tag(tag: str, limit: int = 50) -> str: """Get blog content by its tag. Args: tag: the tag associated with content limit: the number of results to include """ if content_manager is None: return "Content has not been loaded. Please ensure the server is properly initialized." matching_content = content_manager.get_by_tag(tag, limit) return format_content_for_output(matching_content) @mcp.tool() async def get_by_text(query: str, limit: int = 50) -> str: """Get blog content by text in content. Args: query: text for an exact match limit: the number of results to include """ if content_manager is None: return "Content has not been loaded. Please ensure the server is properly initialized." matching_content = content_manager.get_by_text(query, limit) return format_content_for_output(matching_content) @mcp.tool() async def rebuild() -> bool: """Rebuild text index. Useful for when contents have changed on disk""" if content_manager is None: return False debug_print("Rebuilding content index...") content_manager.load_content() debug_print("Content index rebuilt successfully") return True @mcp.tool() async def search_tags(tag_query: str, limit: int = 20) -> str: """Search for tags matching the provided query. Args: tag_query: partial or full tag name to search for limit: the maximum number of tags to return """ if content_manager is None: return "Content has not been loaded. Please ensure the server is properly initialized." matching_tags = content_manager.search_tags(tag_query, limit) if not matching_tags: return f"No tags found matching '{tag_query}'." result = [f"Tags matching '{tag_query}':"] for tag in matching_tags: result.append(f"- {tag}") return "\n".join(result) @mcp.tool() async def list_all_tags() -> str: """List all tags sorted by number of posts and most recent post.""" if content_manager is None: return "Content has not been loaded. Please ensure the server is properly initialized." tag_info = content_manager.list_all_tags() return format_tags_for_output(tag_info) @mcp.tool() async def get_by_slug_or_url(identifier: str) -> str: """Get a post by its slug or URL. Args: identifier: the slug, URL, or path fragment to search for """ if content_manager is None: return "Content has not been loaded. Please ensure the server is properly initialized." post = content_manager.get_by_slug_or_url(identifier) if post is None: return f"No post found with slug or URL matching '{identifier}'." # Format as a list to reuse format_content_for_output return format_content_for_output([post]) @mcp.tool() async def get_by_date_range(start_date: str, end_date: str, limit: int = 50) -> str: """Get posts published within a date range. Args: start_date: the start date in ISO format (YYYY-MM-DD) end_date: the end date in ISO format (YYYY-MM-DD) limit: the maximum number of posts to return """ if content_manager is None: return "Content has not been loaded. Please ensure the server is properly initialized." try: # Parse dates with time set to beginning/end of day # Always create naive datetimes for consistent comparison start = datetime.fromisoformat(f"{start_date}T00:00:00") end = datetime.fromisoformat(f"{end_date}T23:59:59") except ValueError as e: return f"Error parsing dates: {e}. Please use ISO format (YYYY-MM-DD)." posts = content_manager.get_by_date_range(start, end, limit) return format_content_for_output(posts) if __name__ == "__main__": if len(sys.argv) < 2: debug_print("Usage: python script.py <content_dir1> [<content_dir2> ...]") sys.exit(1) content_dirs = sys.argv[1:] debug_print(f"Loading content from directories: {', '.join(content_dirs)}") content_manager = HugoContentManager(content_dirs) debug_print(f"Loaded {len(content_manager.path_to_content)} markdown files") mcp.run(transport='stdio')

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/lethain/library-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server