Magento 2 GraphQL Documentation MCP Server

parser.py (13.4 kB)
```python
import hashlib
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional, Tuple

import frontmatter
from pydantic import BaseModel, Field

from .config import MAX_FIELDS_PER_ELEMENT

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class Document(BaseModel):
    """Represents a documentation page"""
    id: str  # SHA256 hash of file_path
    file_path: str  # Relative to docs root
    title: str
    description: Optional[str] = None
    keywords: List[str] = Field(default_factory=list)
    category: str  # e.g., "schema", "develop", "usage"
    subcategory: Optional[str] = None  # e.g., "products", "cart"
    content_type: str  # "guide", "reference", "tutorial", "schema"
    searchable_text: str  # Combined: title + description + content
    headers: List[str] = Field(default_factory=list)  # All markdown headers
    last_modified: datetime
    content_md: str  # Full markdown content


class CodeBlock(BaseModel):
    """Represents a code example"""
    document_id: str
    language: str  # graphql, json, javascript, bash
    code: str
    context: Optional[str] = None  # Surrounding text for context
    line_number: int


class GraphQLElement(BaseModel):
    """Represents a GraphQL schema element"""
    document_id: str
    element_type: str  # query, mutation, type, interface, union
    name: str
    fields: List[str] = Field(default_factory=list)
    parameters: List[str] = Field(default_factory=list)
    return_type: Optional[str] = None
    description: Optional[str] = None
    searchable_text: str


class MarkdownDocParser:
    """Parse markdown documentation files"""

    def __init__(self, docs_root: Path):
        self.docs_root = Path(docs_root)
        if not self.docs_root.exists():
            raise FileNotFoundError(f"Documentation directory not found: {docs_root}")

    def walk_directory(self) -> List[Path]:
        """Find all .md files recursively"""
        # rglob already recurses, so a plain "*.md" pattern suffices
        md_files = list(self.docs_root.rglob("*.md"))
        logger.info(f"Found {len(md_files)} markdown files")
        return md_files

    def parse_file(self, file_path: Path) -> Document:
        """Parse a single markdown file"""
        # Read file content
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Parse frontmatter
        post = frontmatter.loads(content)
        metadata = post.metadata
        markdown_content = post.content

        # Get file stats
        stat = file_path.stat()
        last_modified = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)

        # Extract relative path
        rel_path = str(file_path.relative_to(self.docs_root))

        # Generate document ID
        doc_id = hashlib.sha256(rel_path.encode('utf-8')).hexdigest()

        # Extract category and subcategory from path
        category, subcategory = self.extract_category_from_path(file_path)

        # Determine content type
        content_type = self.determine_content_type(category, rel_path)

        # Extract title (from frontmatter, or first header, or path as fallback)
        title = metadata.get('title', self.extract_first_header(markdown_content) or rel_path)

        # Extract description and keywords
        description = metadata.get('description')
        keywords = metadata.get('keywords', [])
        if isinstance(keywords, str):
            keywords = [keywords]

        # Extract headers
        headers = self.extract_headers(markdown_content)

        # Build searchable text
        searchable_text = self.build_searchable_text(
            title, description, markdown_content, keywords, headers
        )

        return Document(
            id=doc_id,
            file_path=rel_path,
            title=title,
            description=description,
            keywords=keywords,
            category=category,
            subcategory=subcategory,
            content_type=content_type,
            searchable_text=searchable_text,
            headers=headers,
            last_modified=last_modified,
            content_md=markdown_content,
        )

    def extract_category_from_path(self, file_path: Path) -> Tuple[str, Optional[str]]:
        """Extract category and subcategory from file path"""
        rel_path = file_path.relative_to(self.docs_root)
        parts = rel_path.parts

        if len(parts) == 1:
            # Root-level file (e.g., index.md, release-notes.md)
            return "root", None

        category = parts[0]  # First directory: schema, develop, usage, tutorials, etc.

        # Subcategory is the second-level directory, if one exists
        subcategory = None
        if len(parts) > 2:
            subcategory = parts[1]

        return category, subcategory

    def determine_content_type(self, category: str, rel_path: str) -> str:
        """Determine content type based on category and path"""
        if category == "schema":
            return "schema"
        elif category == "tutorials":
            return "tutorial"
        elif category in ["develop", "usage"]:
            return "guide"
        elif "reference" in rel_path.lower():
            return "reference"
        else:
            return "guide"

    def extract_first_header(self, markdown: str) -> Optional[str]:
        """Extract the first markdown header as a fallback title"""
        for line in markdown.split('\n'):
            line = line.strip()
            if line.startswith('#'):
                # Remove # symbols and strip whitespace
                return line.lstrip('#').strip()
        return None

    def extract_headers(self, markdown: str) -> List[str]:
        """Extract all markdown headers"""
        headers = []
        for line in markdown.split('\n'):
            line = line.strip()
            if line.startswith('#'):
                # Remove # symbols and keep the text
                header_text = line.lstrip('#').strip()
                if header_text:
                    headers.append(header_text)
        return headers

    def build_searchable_text(
        self,
        title: str,
        description: Optional[str],
        content: str,
        keywords: List[str],
        headers: List[str],
    ) -> str:
        """Build searchable text from all available text"""
        parts = [title]
        if description:
            parts.append(description)
        if keywords:
            parts.extend(keywords)

        # Add headers
        parts.extend(headers)

        # Add content (with code blocks and excess whitespace removed)
        content_text = self.clean_content(content)
        parts.append(content_text)

        return " ".join(filter(None, parts))

    def clean_content(self, markdown: str) -> str:
        """Clean markdown content for search indexing"""
        # Remove code blocks
        content = re.sub(r'```[\s\S]*?```', '', markdown)
        # Remove inline code
        content = re.sub(r'`[^`]+`', '', content)
        # Remove markdown links but keep text: [text](url) -> text
        content = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', content)
        # Collapse excessive whitespace
        content = ' '.join(content.split())
        return content

    def extract_code_blocks(self, markdown: str, doc_id: str) -> List[CodeBlock]:
        """Extract code blocks with language tags"""
        code_blocks = []
        lines = markdown.split('\n')
        i = 0

        while i < len(lines):
            line = lines[i]

            # Check for code block start
            if line.strip().startswith('```'):
                # Extract language
                language = line.strip()[3:].strip().lower()
                if not language:
                    language = "text"

                # Collect lines until the closing fence
                code_lines = []
                i += 1
                start_line = i + 1  # 1-based line number of the first code line
                while i < len(lines) and not lines[i].strip().startswith('```'):
                    code_lines.append(lines[i])
                    i += 1

                code = '\n'.join(code_lines)

                # Extract context (the paragraph just before the opening fence;
                # start_line - 3 is the 0-based index of that preceding line)
                context = self.extract_context(lines, start_line - 3)

                code_blocks.append(CodeBlock(
                    document_id=doc_id,
                    language=language,
                    code=code,
                    context=context,
                    line_number=start_line,
                ))

            i += 1

        return code_blocks

    def extract_context(self, lines: List[str], start_idx: int) -> Optional[str]:
        """Extract context before a code block (the preceding paragraph)"""
        if start_idx < 0:
            return None

        # Look backwards for up to 5 non-empty lines
        context_lines = []
        for i in range(start_idx, max(-1, start_idx - 5), -1):
            line = lines[i].strip()
            if line and not line.startswith('#'):
                context_lines.insert(0, line)
            elif context_lines:
                # Stop at an empty line or header once context has been collected
                break

        return ' '.join(context_lines) if context_lines else None

    def extract_graphql_elements(self, code: str, doc_id: str) -> List[GraphQLElement]:
        """Extract GraphQL queries/mutations/types from code blocks"""
        elements = []

        # Detect query
        query_match = re.search(r'query\s+(\w+)', code)
        if query_match:
            name = query_match.group(1)
            fields = self.extract_fields(code)
            parameters = self.extract_parameters(code)
            elements.append(GraphQLElement(
                document_id=doc_id,
                element_type="query",
                name=name,
                fields=fields,
                parameters=parameters,
                searchable_text=f"query {name} {' '.join(fields)} {' '.join(parameters)}",
            ))

        # Detect mutation
        mutation_match = re.search(r'mutation\s+(\w+)', code)
        if mutation_match:
            name = mutation_match.group(1)
            fields = self.extract_fields(code)
            parameters = self.extract_parameters(code)
            elements.append(GraphQLElement(
                document_id=doc_id,
                element_type="mutation",
                name=name,
                fields=fields,
                parameters=parameters,
                searchable_text=f"mutation {name} {' '.join(fields)} {' '.join(parameters)}",
            ))

        # Detect type
        type_match = re.search(r'type\s+(\w+)', code)
        if type_match:
            name = type_match.group(1)
            fields = self.extract_fields(code)
            elements.append(GraphQLElement(
                document_id=doc_id,
                element_type="type",
                name=name,
                fields=fields,
                searchable_text=f"type {name} {' '.join(fields)}",
            ))

        # Detect interface
        interface_match = re.search(r'interface\s+(\w+)', code)
        if interface_match:
            name = interface_match.group(1)
            fields = self.extract_fields(code)
            elements.append(GraphQLElement(
                document_id=doc_id,
                element_type="interface",
                name=name,
                fields=fields,
                searchable_text=f"interface {name} {' '.join(fields)}",
            ))

        return elements

    def extract_fields(self, code: str) -> List[str]:
        """Extract field names from GraphQL code"""
        # Simple field extraction: an identifier followed by (, :, or {
        fields = re.findall(r'\b([a-z_]\w*)\s*(?:\(|:|\{)', code, re.IGNORECASE)
        # Limit to MAX_FIELDS_PER_ELEMENT unique fields to avoid bloat
        return list(set(fields))[:MAX_FIELDS_PER_ELEMENT]

    def extract_parameters(self, code: str) -> List[str]:
        """Extract parameter names from GraphQL code"""
        # Parameters appear as ($paramName: Type)
        params = re.findall(r'\$(\w+)\s*:', code)
        return list(set(params))

    def parse_all(self) -> Tuple[List[Document], List[CodeBlock], List[GraphQLElement]]:
        """Parse all markdown files and extract all data"""
        documents = []
        all_code_blocks = []
        all_graphql_elements = []

        files = self.walk_directory()

        for file_path in files:
            try:
                # Parse document
                doc = self.parse_file(file_path)
                documents.append(doc)

                # Extract code blocks
                code_blocks = self.extract_code_blocks(doc.content_md, doc.id)
                all_code_blocks.extend(code_blocks)

                # Extract GraphQL elements from GraphQL code blocks
                for block in code_blocks:
                    if block.language == 'graphql':
                        elements = self.extract_graphql_elements(block.code, doc.id)
                        all_graphql_elements.extend(elements)
            except Exception as e:
                logger.error(f"Error parsing {file_path}: {e}")
                continue

        logger.info(
            f"Parsed {len(documents)} documents, {len(all_code_blocks)} code blocks, "
            f"{len(all_graphql_elements)} GraphQL elements"
        )
        return documents, all_code_blocks, all_graphql_elements
```
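
For orientation, here is a minimal sketch of driving the parser end to end. The docs path and the import path are assumptions: `parser.py` uses a relative import of `.config`, so it must be imported through its package, whose actual name may differ.

```python
from pathlib import Path

# Hypothetical import path; adjust to the repo's actual package layout.
from magento_graphql_docs_mcp.parser import MarkdownDocParser

# Hypothetical local checkout of the Magento 2 GraphQL docs.
parser = MarkdownDocParser(Path("./magento-graphql-docs"))

documents, code_blocks, graphql_elements = parser.parse_all()

# Each Document carries the metadata used for search indexing.
for doc in documents[:3]:
    print(doc.category, doc.content_type, doc.title)

# GraphQL elements are extracted only from graphql-tagged code blocks.
for el in graphql_elements[:3]:
    print(el.element_type, el.name, el.fields)
```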
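Continuing the sketch, `extract_code_blocks` can also be exercised directly on a small markdown string (the sample below is invented). The paragraph preceding the fence is captured as the block's `context`:

```python
fence = "`" * 3  # build fences programmatically to keep this example readable

sample_md = "\n".join([
    "Some context paragraph before the example.",
    "",
    fence + "graphql",
    "query GetCart { cart { id } }",
    fence,
])

blocks = parser.extract_code_blocks(sample_md, doc_id="example")
print(blocks[0].language)     # graphql
print(blocks[0].context)      # Some context paragraph before the example.
print(blocks[0].line_number)  # 4 (1-based line of the first code line)
```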
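Finally, `extract_graphql_elements` applies its regex heuristics to a GraphQL snippet. The query below is an invented sample, not one from the Magento docs:

```python
sample_query = """
query GetProduct($sku: String!) {
  products(filter: { sku: { eq: $sku } }) {
    items {
      name
      sku
    }
  }
}
"""

elements = parser.extract_graphql_elements(sample_query, doc_id="example")
el = elements[0]
print(el.element_type)  # query
print(el.name)          # GetProduct
print(el.parameters)    # ['sku']
# el.fields holds identifiers matched by the field regex (e.g. products,
# filter, items), deduplicated and capped at MAX_FIELDS_PER_ELEMENT; set
# ordering makes the exact order nondeterministic.
```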
