Skip to main content
Glama

DevDocs MCP Server

by cyberagiinc
server.py38.7 kB
#!/usr/bin/env python3 import sys import logging import signal import json import re import asyncio import math from pathlib import Path from typing import Optional, Dict, List, Tuple, Any import mcp.types as types from mcp.server import Server from mcp.server.stdio import stdio_server from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from difflib import SequenceMatcher logger = logging.getLogger(__name__) from .document_structure import DocumentStructure, Section class MarkdownStore: """Manages markdown content and metadata.""" def __init__(self, storage_path: str): self.base_path = Path(storage_path) self.content_cache = {} self.metadata_cache = {} self.structure_cache = {} # Cache for parsed document structures def _calculate_similarity(self, text1: str, text2: str) -> float: """Calculate text similarity using SequenceMatcher.""" return SequenceMatcher(None, text1.lower(), text2.lower()).ratio() def _calculate_confidence(self, similarity: float, match_type: str) -> float: """Calculate confidence score based on similarity and match type.""" # Base confidence from similarity score confidence = similarity # Adjust based on match type if match_type == "exact": confidence = min(1.0, confidence * 1.2) # Boost exact matches elif match_type == "fuzzy": confidence = confidence * 0.9 # Slightly reduce fuzzy matches elif match_type == "regex": confidence = confidence * 0.95 # Slightly reduce regex matches return round(confidence, 2) async def sync_all_files(self): """Initial sync of all files in the storage directory.""" logger.info("Starting initial sync of all files...") try: for md_file in self.base_path.glob("*.md"): file_id = md_file.stem await self.sync_file(file_id) logger.info("Initial sync completed successfully") except Exception as e: logger.error(f"Error during initial sync: {e}") raise async def get_content(self, file_id: str) -> str: """Get markdown content.""" file_path = self.base_path / f"{file_id}.md" try: content = file_path.read_text(encoding='utf-8') # Parse and cache document structure if file_id not in self.structure_cache: structure = DocumentStructure() structure.parse_document(content) self.structure_cache[file_id] = structure return content except Exception as e: logger.error(f"Error reading content for {file_id}: {e}") return f"Error reading content: {str(e)}" async def get_section(self, file_id: str, section_id: str) -> str: """Get a specific section from a markdown file.""" try: if file_id not in self.structure_cache: await self.get_content(file_id) # This will parse and cache the structure structure = self.structure_cache[file_id] section = structure.get_section_by_id(section_id) if not section: return f"Section '{section_id}' not found in {file_id}" return f"Section: {section.title}\n\n{section.content}" except Exception as e: logger.error(f"Error getting section {section_id} from {file_id}: {e}") return f"Error getting section: {str(e)}" async def get_table_of_contents(self, file_id: str) -> str: """Get table of contents for a markdown file.""" try: if file_id not in self.structure_cache: await self.get_content(file_id) # This will parse and cache the structure structure = self.structure_cache[file_id] toc = structure.get_table_of_contents() result = [f"Table of Contents for {file_id}:"] for level, title, section_id in toc: indent = " " * level result.append(f"{indent}- {title} [{section_id}]") return "\n".join(result) except Exception as e: logger.error(f"Error getting table of contents for {file_id}: {e}") return f"Error getting table of contents: {str(e)}" async def get_metadata(self, file_id: str) -> dict: """Get metadata as a dictionary.""" file_path = self.base_path / f"{file_id}.json" try: with open(file_path, 'r', encoding='utf-8') as f: return json.load(f) except FileNotFoundError: # Create a default metadata file with a pages array if the MD file exists md_file_path = self.base_path / f"{file_id}.md" if md_file_path.exists(): try: content = md_file_path.read_text(encoding='utf-8') # Create a basic metadata structure with pages array to mark as consolidated from datetime import datetime default_metadata = { "title": f"Documentation for {file_id}", "timestamp": datetime.now().isoformat(), "pages": [ { "title": "Main Content", "url": f"file://{file_id}", "timestamp": datetime.now().isoformat(), "internal_links": 0, "external_links": 0 } ], "is_consolidated": True, "last_updated": datetime.now().isoformat() } # Write the default metadata to file with open(file_path, 'w', encoding='utf-8') as f: json.dump(default_metadata, f, indent=2) logger.info(f"Created default metadata for {file_id}") return default_metadata except Exception as write_error: logger.error(f"Error creating default metadata for {file_id}: {write_error}") logger.error(f"Error reading metadata for {file_id}: File not found") return {} except Exception as e: logger.error(f"Error reading metadata for {file_id}: {e}") return {} async def get_index(self) -> str: """Get list of available files.""" try: files = [] for md_file in self.base_path.glob("*.md"): files.append(md_file.stem) return "\n".join(files) except Exception as e: logger.error(f"Error getting index: {e}") return f"Error getting index: {str(e)}" async def sync_file(self, file_id: str) -> str: """Force sync a file.""" try: # Clear caches for this file self.content_cache.pop(file_id, None) self.metadata_cache.pop(file_id, None) self.structure_cache.pop(file_id, None) # Reload content and metadata content = await self.get_content(file_id) metadata = await self.get_metadata(file_id) self.content_cache[file_id] = content self.metadata_cache[file_id] = metadata logger.info(f"Successfully synced {file_id}") return f"Successfully synced {file_id}" except Exception as e: logger.error(f"Error syncing {file_id}: {e}") return f"Error syncing file: {str(e)}" async def list_files(self) -> str: """List all available markdown files.""" try: files = [] for md_file in self.base_path.glob("*.md"): files.append(f"- {md_file.stem}") if not files: return "No markdown files found" return "Available markdown files:\n" + "\n".join(files) except Exception as e: logger.error(f"Error listing files: {e}") return f"Error listing files: {str(e)}" async def read_file(self, file_id: str) -> str: """Read and return the content of a markdown file.""" try: content = await self.get_content(file_id) metadata = await self.get_metadata(file_id) return f"""Content of {file_id}.md: {content} Metadata: - Last modified: {metadata.get('timestamp', 'Unknown')} - Word count: {metadata.get('stats', {}).get('wordCount', 'Unknown')} - Character count: {metadata.get('stats', {}).get('charCount', 'Unknown')} """ except Exception as e: logger.error(f"Error reading file {file_id}: {e}") return f"Error reading file: {str(e)}" async def search_files(self, query: str) -> str: """Search content across all markdown files.""" try: results = [] for md_file in self.base_path.glob("*.md"): file_id = md_file.stem content = await self.get_content(file_id) metadata = await self.get_metadata(file_id) if query.lower() in content.lower(): # Find the context around the match lines = content.split('\n') for i, line in enumerate(lines): if query.lower() in line.lower(): context_start = max(0, i - 2) context_end = min(len(lines), i + 3) context = '\n'.join(lines[context_start:context_end]) results.append(f"""Match in {file_id}.md: Context: {context} ---""") if not results: return f"No matches found for query: {query}" return "\n\n".join(results) except Exception as e: logger.error(f"Error searching files: {e}") return f"Error searching files: {str(e)}" async def smart_section_search(self, query: str, max_results: int = 10, use_fuzzy: bool = True, use_regex: bool = True) -> str: """ Advanced search for sections across all markdown files with ranking and confidence scores. Args: query: The search query max_results: Maximum number of results to return use_fuzzy: Whether to use fuzzy matching use_regex: Whether to use regex matching Returns: Formatted search results with rankings, confidence scores, and citations """ try: all_matches = [] # Process all markdown files for md_file in self.base_path.glob("*.md"): file_id = md_file.stem # Ensure the document structure is loaded if file_id not in self.structure_cache: await self.get_content(file_id) structure = self.structure_cache[file_id] # Get all sections from the document sections = [] def collect_sections(section_list): for section in section_list: sections.append(section) collect_sections(section.subsections) collect_sections(structure.sections) # Search through each section for section in sections: matches = [] # Exact match in title if query.lower() in section.title.lower(): similarity = self._calculate_similarity(query, section.title) confidence = self._calculate_confidence(similarity, "exact") matches.append({ "match_type": "exact_title", "similarity": similarity, "confidence": confidence, "score": confidence * 1.2 # Boost title matches }) # Exact match in content if query.lower() in section.content.lower(): similarity = self._calculate_similarity(query, section.content[:min(len(section.content), len(query) * 10)]) confidence = self._calculate_confidence(similarity, "exact") matches.append({ "match_type": "exact_content", "similarity": similarity, "confidence": confidence, "score": confidence }) # Fuzzy match if use_fuzzy: # Fuzzy match in title title_similarity = self._calculate_similarity(query, section.title) if title_similarity > 0.6: # Threshold for fuzzy matches confidence = self._calculate_confidence(title_similarity, "fuzzy") matches.append({ "match_type": "fuzzy_title", "similarity": title_similarity, "confidence": confidence, "score": confidence * 1.1 # Slightly boost title matches }) # Fuzzy match in content (first 1000 chars to avoid performance issues) content_sample = section.content[:min(len(section.content), 1000)] # Split content into chunks to find best matching segment chunks = [content_sample[i:i+len(query)*3] for i in range(0, len(content_sample), len(query)*2) if i+len(query)*3 <= len(content_sample)] if chunks: best_chunk_similarity = max(self._calculate_similarity(query, chunk) for chunk in chunks) if best_chunk_similarity > 0.6: confidence = self._calculate_confidence(best_chunk_similarity, "fuzzy") matches.append({ "match_type": "fuzzy_content", "similarity": best_chunk_similarity, "confidence": confidence, "score": confidence }) # Regex match if use_regex: try: pattern = re.compile(query, re.IGNORECASE) # Regex match in title if pattern.search(section.title): # Calculate how much of the title matches the pattern match_length = sum(len(m.group(0)) for m in pattern.finditer(section.title)) similarity = match_length / len(section.title) confidence = self._calculate_confidence(similarity, "regex") matches.append({ "match_type": "regex_title", "similarity": similarity, "confidence": confidence, "score": confidence * 1.1 # Slightly boost title matches }) # Regex match in content if pattern.search(section.content): # Calculate how much of the content matches the pattern match_length = sum(len(m.group(0)) for m in pattern.finditer(section.content[:1000])) similarity = match_length / min(len(section.content), 1000) confidence = self._calculate_confidence(similarity, "regex") matches.append({ "match_type": "regex_content", "similarity": similarity, "confidence": confidence, "score": confidence }) except re.error: # If the query isn't a valid regex, just skip regex matching pass # If we have any matches, add this section to the results if matches: # Get the best match best_match = max(matches, key=lambda x: x["score"]) # Extract a relevant snippet from the content snippet = section.content if len(snippet) > 300: # Try to find the query in the content query_pos = section.content.lower().find(query.lower()) if query_pos >= 0: # Extract content around the query start = max(0, query_pos - 100) end = min(len(section.content), query_pos + len(query) + 100) snippet = "..." + section.content[start:end] + "..." else: # Just take the first part of the content snippet = section.content[:300] + "..." # Create a section ID if needed section_id = structure._make_section_id(section.title) all_matches.append({ "file_id": file_id, "section_title": section.title, "section_id": section_id, "snippet": snippet, "match_type": best_match["match_type"], "confidence": best_match["confidence"], "score": best_match["score"] }) # Sort results by score (descending) all_matches.sort(key=lambda x: x["score"], reverse=True) # Limit to max_results all_matches = all_matches[:max_results] # Format results if not all_matches: return f"No matches found for query: {query}" results = [f"# Smart Section Search Results for: '{query}'"] for i, match in enumerate(all_matches, 1): confidence_percent = int(match["confidence"] * 100) match_type_display = match["match_type"].replace("_", " ").title() results.append(f""" ## Result {i} - {match["section_title"]} (Confidence: {confidence_percent}%) **File:** {match["file_id"]}.md **Section:** {match["section_title"]} **Match Type:** {match_type_display} **Confidence:** {confidence_percent}% **Snippet:** {match["snippet"]} **Citation:** `{match["file_id"]}#{match["section_id"]}` ---""") return "\n".join(results) except Exception as e: logger.error(f"Error in smart section search: {e}") return f"Error performing smart section search: {str(e)}" async def search_by_tag(self, tag: str) -> str: """Search files by metadata tags.""" try: results = [] for json_file in self.base_path.glob("*.json"): file_id = json_file.stem metadata = await self.get_metadata(file_id) # Check both metadata.tags and top-level tags tags = metadata.get('metadata', {}).get('tags', []) + metadata.get('tags', []) if tag.lower() in [t.lower() for t in tags]: results.append(f"""File: {file_id}.md Tags: {', '.join(tags)} Last modified: {metadata.get('timestamp', 'Unknown')} ---""") if not results: return f"No files found with tag: {tag}" return "\n\n".join(results) except Exception as e: logger.error(f"Error searching by tag: {e}") return f"Error searching by tag: {str(e)}" async def get_stats(self) -> str: """Get statistics about all markdown files.""" try: total_files = 0 total_words = 0 total_chars = 0 files_by_month = {} all_tags = set() for json_file in self.base_path.glob("*.json"): file_id = json_file.stem metadata = await self.get_metadata(file_id) total_files += 1 total_words += metadata.get('stats', {}).get('wordCount', 0) total_chars += metadata.get('stats', {}).get('charCount', 0) # Extract month from timestamp timestamp = metadata.get('timestamp', '') if timestamp: month = timestamp[:7] # YYYY-MM files_by_month[month] = files_by_month.get(month, 0) + 1 # Collect all tags tags = metadata.get('metadata', {}).get('tags', []) + metadata.get('tags', []) all_tags.update(tags) stats = f"""Markdown Files Statistics: Total Files: {total_files} Total Words: {total_words} Total Characters: {total_chars} Files by Month: {chr(10).join(f'- {month}: {count} files' for month, count in sorted(files_by_month.items()))} All Tags: {chr(10).join(f'- {tag}' for tag in sorted(all_tags))} """ return stats except Exception as e: logger.error(f"Error getting stats: {e}") return f"Error getting statistics: {str(e)}" class MarkdownEventHandler(FileSystemEventHandler): """Handles file system events for markdown files.""" def __init__(self, store: MarkdownStore, loop: asyncio.AbstractEventLoop): self.store = store self.loop = loop def sync_file(self, path: str): """Sync a file when it's created or modified.""" if path.endswith(('.md', '.json')): file_id = Path(path).stem asyncio.run_coroutine_threadsafe( self.store.sync_file(file_id), self.loop ) def on_created(self, event): """Handle file creation.""" if not event.is_directory: self.sync_file(event.src_path) def on_modified(self, event): """Handle file modification.""" if not event.is_directory: self.sync_file(event.src_path) class FastMarkdownServer: """MCP server for markdown content management.""" def __init__(self, storage_path: str): self.server = Server("fast-markdown", version="1.0.0") # Set default version self.store = MarkdownStore(storage_path) self.loop = asyncio.get_event_loop() self.event_handler = MarkdownEventHandler(self.store, self.loop) self.observer = Observer() self.observer.schedule(self.event_handler, storage_path, recursive=False) self.setup_handlers() def setup_handlers(self): """Set up request handlers.""" @self.server.list_resources() async def list_resources() -> list[types.Resource]: """List available resources.""" files = [] for md_file in self.store.base_path.glob("*.md"): file_id = md_file.stem files.append( types.Resource( uri=f"markdown://{file_id}/content", name=f"Markdown content for {file_id}", mimeType="text/markdown" ) ) return files @self.server.read_resource() async def read_resource(uri: str) -> str: """Read resource content.""" if not uri.startswith("markdown://"): raise ValueError(f"Invalid resource URI: {uri}") parts = uri.split("/") if len(parts) != 4 or parts[3] not in ["content", "metadata"]: raise ValueError(f"Invalid resource URI format: {uri}") file_id = parts[2] resource_type = parts[3] if resource_type == "content": return await self.store.get_content(file_id) else: return json.dumps(await self.store.get_metadata(file_id), indent=2) @self.server.list_tools() async def list_tools() -> list[types.Tool]: """List available tools.""" return [ types.Tool( name="sync_file", description="Force sync a specific file", inputSchema={ "type": "object", "properties": { "file_id": { "type": "string", "description": "ID of the file to sync (without .md extension)" } }, "required": ["file_id"] } ), types.Tool( name="get_status", description="Get server status", inputSchema={ "type": "object", "properties": {} } ), types.Tool( name="list_files", description="List all available markdown files", inputSchema={ "type": "object", "properties": {} } ), types.Tool( name="read_file", description="Read the content of a markdown file", inputSchema={ "type": "object", "properties": { "file_id": { "type": "string", "description": "ID of the file to read (without .md extension)" } }, "required": ["file_id"] } ), types.Tool( name="search_files", description="Search content across all markdown files", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Search query to find in markdown content" } }, "required": ["query"] } ), types.Tool( name="search_by_tag", description="Search files by metadata tags", inputSchema={ "type": "object", "properties": { "tag": { "type": "string", "description": "Tag to search for in file metadata" } }, "required": ["tag"] } ), types.Tool( name="get_stats", description="Get statistics about all markdown files", inputSchema={ "type": "object", "properties": {} } ), types.Tool( name="get_section", description="Get a specific section from a markdown file", inputSchema={ "type": "object", "properties": { "file_id": { "type": "string", "description": "ID of the file (without .md extension)" }, "section_id": { "type": "string", "description": "ID of the section to retrieve" } }, "required": ["file_id", "section_id"] } ), types.Tool( name="get_table_of_contents", description="Get table of contents for a markdown file", inputSchema={ "type": "object", "properties": { "file_id": { "type": "string", "description": "ID of the file (without .md extension)" } }, "required": ["file_id"] } ), types.Tool( name="smart_section_search", description="Advanced search for sections with ranking and confidence scores", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Search query to find in sections" }, "max_results": { "type": "number", "description": "Maximum number of results to return (default: 10)" }, "use_fuzzy": { "type": "boolean", "description": "Whether to use fuzzy matching (default: true)" }, "use_regex": { "type": "boolean", "description": "Whether to use regex matching (default: true)" } }, "required": ["query"] } ) ] @self.server.call_tool() async def call_tool(name: str, arguments: dict) -> list[types.TextContent]: """Execute tool.""" if name == "sync_file": file_id = arguments.get("file_id") if not file_id: raise ValueError("file_id is required") result = await self.store.sync_file(file_id) return [types.TextContent(type="text", text=result)] elif name == "get_status": return [types.TextContent(type="text", text="Server is running and monitoring markdown files")] elif name == "list_files": result = await self.store.list_files() return [types.TextContent(type="text", text=result)] elif name == "read_file": file_id = arguments.get("file_id") if not file_id: raise ValueError("file_id is required") result = await self.store.read_file(file_id) return [types.TextContent(type="text", text=result)] elif name == "search_files": query = arguments.get("query") if not query: raise ValueError("query is required") result = await self.store.search_files(query) return [types.TextContent(type="text", text=result)] elif name == "search_by_tag": tag = arguments.get("tag") if not tag: raise ValueError("tag is required") result = await self.store.search_by_tag(tag) return [types.TextContent(type="text", text=result)] elif name == "get_stats": result = await self.store.get_stats() return [types.TextContent(type="text", text=result)] elif name == "get_section": file_id = arguments.get("file_id") section_id = arguments.get("section_id") if not file_id or not section_id: raise ValueError("file_id and section_id are required") result = await self.store.get_section(file_id, section_id) return [types.TextContent(type="text", text=result)] elif name == "get_table_of_contents": file_id = arguments.get("file_id") if not file_id: raise ValueError("file_id is required") result = await self.store.get_table_of_contents(file_id) return [types.TextContent(type="text", text=result)] elif name == "smart_section_search": query = arguments.get("query") if not query: raise ValueError("query is required") # Get optional parameters with defaults max_results = arguments.get("max_results", 10) use_fuzzy = arguments.get("use_fuzzy", True) use_regex = arguments.get("use_regex", True) # Ensure max_results is an integer try: max_results = int(max_results) except (ValueError, TypeError): max_results = 10 # Ensure boolean parameters are actually booleans use_fuzzy = bool(use_fuzzy) use_regex = bool(use_regex) result = await self.store.smart_section_search( query, max_results=max_results, use_fuzzy=use_fuzzy, use_regex=use_regex ) return [types.TextContent(type="text", text=result)] else: raise ValueError(f"Unknown tool: {name}") async def run(self): """Run the server.""" logger.info("Starting server...") # Start the file observer self.observer.start() # Initial sync of all files await self.store.sync_all_files() try: # Keep the server running while True: try: async with stdio_server() as streams: await self.server.run( streams[0], streams[1], self.server.create_initialization_options() ) except Exception as e: logger.error(f"Server error: {e}") # Wait before retrying await asyncio.sleep(1) finally: self.observer.stop() self.observer.join() logger.info("Server shutdown complete") def setup_logging(): """Configure logging.""" # Get the project root directory root_dir = Path(__file__).parents[3].resolve() log_dir = root_dir / "logs" log_dir.mkdir(exist_ok=True) # Use absolute path for log file log_path = log_dir / "mcp.log" # Configure file handler for all logs file_handler = logging.FileHandler(str(log_path)) file_handler.setLevel(logging.INFO) file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) # Configure console handler with higher log level to reduce noise console_handler = logging.StreamHandler() console_handler.setLevel(logging.WARNING) # Only show WARNING and above in console console_handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s')) # Configure root logger root_logger = logging.getLogger() root_logger.setLevel(logging.INFO) root_logger.addHandler(file_handler) root_logger.addHandler(console_handler) # Specifically set higher log level for MCP SDK's internal logging mcp_logger = logging.getLogger('mcp.server.lowlevel') mcp_logger.setLevel(logging.WARNING) def handle_sigterm(signum, frame): """Handle SIGTERM signal.""" logger.info("Received shutdown signal") sys.exit(0) async def main() -> None: """Main entry point.""" if len(sys.argv) != 2: print("Usage: fast-markdown-mcp <storage_path>") sys.exit(1) setup_logging() signal.signal(signal.SIGTERM, handle_sigterm) storage_path = sys.argv[1] try: server = FastMarkdownServer(storage_path) logger.info(f"Starting server with storage path: {storage_path}") await server.run() except KeyboardInterrupt: logger.info("Server shutdown requested") except Exception as e: logger.error(f"Server error: {e}") raise finally: logger.info("Server shutdown complete") if __name__ == "__main__": import asyncio asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cyberagiinc/DevDocs'

If you have feedback or need assistance with the MCP directory API, please join our Discord server