Skip to main content
Glama

doc-lib-mcp

by shifusen329
chunkers.py (17.2 kB)
"""Chunkers that split Markdown, Python, OpenAPI and HTML documents.

Every chunker returns a list of dicts with the keys ``content``, ``type``,
``source``, ``location`` and ``metadata``.
"""
import re
import ast
from typing import List, Dict, Any
import json
import subprocess  # Needed if pypandoc fails or for direct calls

try:
    import pypandoc  # Use pypandoc
except ImportError:
    # No function in this module calls pypandoc, so a missing install should
    # not make the regex/ast/bs4 based chunkers unimportable.
    pypandoc = None


# Pandoc inline types that wrap nested inlines in a Markdown delimiter.
_INLINE_WRAPPERS = {'Emph': '*', 'Strong': '**', 'Strikeout': '~~'}

# Pandoc inline types that render as a fixed string.  A Markdown hard line
# break is two trailing spaces followed by a newline.
_INLINE_LITERALS = {'Space': ' ', 'SoftBreak': '\n', 'LineBreak': '  \n'}

# Headings of any depth; chunk_markdown only splits on levels 1-3.
_HEADING_RE = re.compile(r"^(#+)\s+(.*)$")
# A run of three or more backticks followed by an optional info string.
_FENCE_OPEN_RE = re.compile(r"^`{3,}(.*)$")
# A closing fence: backticks followed only by optional whitespace.
_FENCE_CLOSE_RE = re.compile(r"^```+\s*$")

# Line terminators recognised by the Python tokenizer.  str.splitlines() also
# splits on form feed, \x1c-\x1e, \x85, \u2028 and \u2029, which would make the
# list indices disagree with ast line numbers.
_PY_NEWLINE_RE = re.compile(r"\r\n|\r|\n")

# Keys of an OpenAPI Path Item Object that hold operations.
_HTTP_METHODS = frozenset(
    {"get", "put", "post", "delete", "options", "head", "patch", "trace"}
)


# Helper function to extract text from Pandoc AST inline elements
def stringify_inline(inlines):
    """Render a list of Pandoc AST inline nodes back to Markdown text.

    Args:
        inlines: Iterable of dicts with a ``'t'`` type tag and, for most
            types, a ``'c'`` payload.

    Returns:
        The concatenated Markdown string.  Inline types that are not handled
        here (RawInline, Math, etc.) contribute nothing.
    """
    parts = []
    for inline in inlines:
        kind = inline['t']
        if kind == 'Str':
            parts.append(inline['c'])
        elif kind == 'Code':
            # Payload is [attr, text].
            parts.append(f"`{inline['c'][1]}`")
        elif kind in _INLINE_WRAPPERS:
            mark = _INLINE_WRAPPERS[kind]
            parts.append(f"{mark}{stringify_inline(inline['c'])}{mark}")
        elif kind in ('Link', 'Image'):
            # Payload is [attr, inlines, [url, title]].
            payload = inline['c']
            label = stringify_inline(payload[1])
            url = payload[2][0]
            title = payload[2][1]
            title_str = f' "{title}"' if title else ""
            bang = "!" if kind == 'Image' else ""
            parts.append(f"{bang}[{label}]({url}{title_str})")
        elif kind in _INLINE_LITERALS:
            parts.append(_INLINE_LITERALS[kind])
        # Add other inline types as needed (RawInline, Math, etc.)
    return "".join(parts)


def _opening_fence_language(line):
    """Return the info-string language of an opening fence, else ``None``.

    Returns ``""`` for a fence without an info string.  Only the first word of
    the info string is used, so a trailing space or extra attributes after the
    language no longer stop the line from being recognised as a fence.
    """
    match = _FENCE_OPEN_RE.match(line)
    if match is None:
        return None
    info = match.group(1)
    if "`" in info:
        # e.g. "```inline``` text" is inline code, not a fence.
        return None
    words = info.split()
    return words[0] if words else ""


# Heading and Code Block based chunk_markdown
def chunk_markdown(text: str, source: str) -> List[Dict[str, Any]]:
    """Split Markdown into chunks on H1-H3 headings and fenced code blocks.

    Text before the first heading or code block becomes its own chunk.
    Headings deeper than H3 stay inside the surrounding chunk.

    Args:
        text: The Markdown document.
        source: Identifier copied into every chunk's ``source`` field.

    Returns:
        A list of chunk dicts.  Prose chunks have type ``"markdown"``; closed
        fenced blocks have type ``"code"`` and keep their fence lines.  A fence
        still open at end of input is emitted as a ``"markdown"`` chunk whose
        metadata has ``unclosed`` set.
    """
    chunks = []
    pending_lines = []
    current_heading = None
    current_heading_level = None
    chunk_idx = 0

    in_fence = False
    fence_lines = []
    fence_lang = None

    def flush_markdown():
        """Emit the pending prose lines as one chunk and reset the state."""
        nonlocal pending_lines, current_heading, current_heading_level, chunk_idx
        if not pending_lines:
            return
        content = "\n".join(pending_lines).strip()
        if content:
            metadata = {}
            location = f"chunk:{chunk_idx}"
            if current_heading:
                metadata['heading'] = current_heading
                metadata['level'] = current_heading_level
                location = f"heading:{current_heading_level}:{current_heading}"
            chunks.append({
                "content": content,
                "type": "markdown",
                "source": source,
                "location": location,
                "metadata": metadata
            })
            chunk_idx += 1
        pending_lines = []
        # The heading only labels the chunk it opens.
        current_heading = None
        current_heading_level = None

    for line in text.splitlines():
        if in_fence:
            fence_lines.append(line)
            if _FENCE_CLOSE_RE.match(line):
                metadata = {"code_tag": True}
                if fence_lang:
                    metadata["language"] = fence_lang
                chunks.append({
                    # Fences stay in the content for clarity.
                    "content": "\n".join(fence_lines).strip(),
                    "type": "code",
                    "source": source,
                    "location": f"code:{chunk_idx}",
                    "metadata": metadata
                })
                chunk_idx += 1
                in_fence = False
                fence_lines = []
                fence_lang = None
            continue

        language = _opening_fence_language(line)
        if language is not None:
            flush_markdown()  # Preceding text becomes its own chunk
            in_fence = True
            fence_lines.append(line)
            fence_lang = language or None
            continue

        heading_match = _HEADING_RE.match(line)
        if heading_match:
            level = len(heading_match.group(1))
            if level <= 3:  # Only chunk by H1, H2, H3
                flush_markdown()
                current_heading = heading_match.group(2).strip()
                current_heading_level = level
        # The heading line itself is part of the chunk it opens.
        pending_lines.append(line)

    if not in_fence:
        flush_markdown()
    elif fence_lines:
        # Input ended inside a fence: keep the text but flag it as unclosed.
        chunks.append({
            "content": "\n".join(fence_lines).strip(),
            "type": "markdown",  # Treat as markdown if block is unclosed
            "source": source,
            "location": f"unclosed_code_block:{chunk_idx}",
            "metadata": {"code_tag": True, "unclosed": True}
        })
        chunk_idx += 1

    # Fallback if the heading/code logic produced nothing for non-blank input.
    if not chunks and text.strip():
        chunks.append({
            "content": text,
            "type": "markdown",
            "source": source,
            "location": "full_fallback",
            "metadata": {"reason": "No chunks generated by heading/code logic"}
        })

    return chunks


def chunk_python(code: str, source: str) -> List[Dict[str, Any]]:
    """Chunk Python source into functions, classes and their docstrings.

    Nodes are visited with ``ast.walk``, so nested functions and methods get
    their own chunks in addition to the enclosing class or function.

    Args:
        code: Python source text.
        source: Identifier copied into every chunk's ``source`` field.

    Returns:
        A list of chunk dicts.  If the code cannot be parsed, or contains no
        function or class, a single ``"python"`` chunk holding the whole input
        is returned.
    """
    whole_file = {
        "content": code,
        "type": "python",
        "source": source,
        "location": "full",
        "metadata": {}
    }
    try:
        tree = ast.parse(code)
    except Exception:
        # Deliberate best effort: unparsable input is indexed as one chunk.
        return [whole_file]

    # Split once, on the same terminators the parser counts, so that
    # node.lineno indexes the right line even when the source contains form
    # feeds or Unicode line separators.
    lines = _PY_NEWLINE_RE.split(code)

    chunks = []
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            kind = "function"
        elif isinstance(node, ast.ClassDef):
            kind = "class"
        else:
            continue

        name = node.name
        start_line = node.lineno
        end_line = getattr(node, 'end_lineno', start_line)
        chunks.append({
            "content": "\n".join(lines[start_line - 1:end_line]),
            "type": f"python_{kind}",
            "source": source,
            "location": f"{kind}:{name}",
            "metadata": {"name": name, "start_line": start_line, "end_line": end_line}
        })

        docstring = ast.get_docstring(node)
        if docstring:
            chunks.append({
                "content": docstring,
                "type": "python_docstring",
                "source": source,
                "location": f"{kind}_docstring:{name}",
                "metadata": {"name": name, "start_line": start_line, "end_line": end_line}
            })

    if not chunks:
        chunks.append(whole_file)
    return chunks


def chunk_openapi(openapi_json: Dict[str, Any], source: str) -> List[Dict[str, Any]]:
    """Chunk an OpenAPI document into operation and schema chunks.

    Args:
        openapi_json: The parsed OpenAPI document.
        source: Identifier copied into every chunk's ``source`` field.

    Returns:
        One ``"openapi_operation"`` chunk per HTTP operation and one
        ``"openapi_schema"`` chunk per entry of ``components.schemas``.  If
        neither exists, a single ``"openapi"`` chunk with ``str(openapi_json)``.
    """
    chunks = []

    paths = openapi_json.get("paths") or {}
    for path, path_item in paths.items():
        if not isinstance(path_item, dict):
            continue
        for method, op in path_item.items():
            # A path item also carries non-operation keys such as
            # "parameters", "summary", "servers" and "$ref"; calling .get()
            # on those values used to raise AttributeError.
            if not isinstance(op, dict) or str(method).lower() not in _HTTP_METHODS:
                continue
            summary = op.get("summary") or ""
            description = op.get("description") or ""
            content = (
                f"Path: {path}\nMethod: {method.upper()}\n"
                f"Summary: {summary}\nDescription: {description}"
            )
            chunks.append({
                "content": content.strip(),
                "type": "openapi_operation",
                "source": source,
                "location": f"path:{path}:{method}",
                "metadata": {
                    "path": path,
                    "method": method,
                    "summary": summary,
                    "operationId": op.get("operationId", "")
                }
            })

    components = openapi_json.get("components") or {}
    schemas = components.get("schemas") or {}
    for name, schema in schemas.items():
        # A schema is not guaranteed to be a mapping (it may be a boolean).
        description = (schema.get("description") or "") if isinstance(schema, dict) else ""
        content = f"Schema: {name}\nDescription: {description}"
        chunks.append({
            "content": content.strip(),
            "type": "openapi_schema",
            "source": source,
            "location": f"schema:{name}",
            "metadata": {
                "schema": name,
                "description": description
            }
        })

    if not chunks:
        chunks.append({
            "content": str(openapi_json),
            "type": "openapi",
            "source": source,
            "location": "full",
            "metadata": {}
        })
    return chunks


def _language_from_classes(tag):
    """Return the suffix of the last ``language-*`` class on *tag*, or None."""
    lang = None
    if tag is not None and tag.has_attr("class"):
        for cls in tag["class"]:
            if cls.startswith("language-"):
                lang = cls.replace("language-", "")
    return lang


def _code_language(pre):
    """Detect the language of a ``<pre>`` from itself or a nested ``<code>``."""
    return _language_from_classes(pre) or _language_from_classes(pre.find("code"))


def chunk_html(text: str, source: str) -> List[Dict[str, Any]]:
    """Chunk HTML into code chunks and narrative chunks.

    ``script``, ``style``, ``nav``, ``footer``, ``header`` and ``aside``
    elements and HTML comments are removed first.  Code is taken from
    ``nextra-code`` wrapper divs and from remaining ``<pre>`` elements.  The
    rest is chunked per H1-H3 heading, or per paragraph if there is none.

    Args:
        text: The HTML document.
        source: Identifier copied into every chunk's ``source`` field.

    Returns:
        A list of chunk dicts: code chunks first, then narrative chunks.
    """
    from bs4 import BeautifulSoup, Comment

    soup = BeautifulSoup(text, "lxml")
    for tag in soup(["script", "style", "nav", "footer", "header", "aside"]):
        tag.decompose()
    # `soup.Comment` is a tag lookup (it returns None), so the previous
    # isinstance check never matched; test against bs4's Comment class.
    for comment in soup.find_all(string=lambda node: isinstance(node, Comment)):
        comment.extract()

    main = soup.find("main")
    content_root = main if main else soup.body if soup.body else soup

    chunks = []

    # --- Extract code blocks from <div class="nextra-code"> wrappers (Nextra/modern docs) ---
    # NOTE(review): get_text(separator="\n", strip=True) puts every text node
    # on its own line, so syntax-highlighted <span> tokens are split apart.
    # Left unchanged here; confirm whether plain get_text() is preferable.
    nextra_count = 0
    for code_div in content_root.find_all("div", class_=lambda c: c and "nextra-code" in c):
        pre = code_div.find("pre")
        code = pre.get_text(separator="\n", strip=True) if pre else None
        if not code:
            continue
        meta = {"code_tag": True, "framework": "nextra"}
        lang = _code_language(pre)
        if lang:
            meta["language"] = lang
        # Look for filename in a child div
        filename_div = code_div.find("div", class_=lambda c: c and ("filename" in c or "file" in c))
        filename = filename_div.get_text(strip=True) if filename_div else None
        if filename:
            meta["filename"] = filename
        # Remove code block from soup so it's not included in narrative chunks
        code_div.decompose()
        nextra_count += 1
        chunks.append({
            "content": code,
            "type": "code",
            "source": source,
            "location": f"nextra-code:{nextra_count}",
            "metadata": meta
        })

    # --- Extract block code snippets (<pre>) as dedicated chunks ---
    # Inline <code> tags remain part of the narrative chunks.
    pre_count = 0
    for pre in content_root.find_all("pre"):
        code = pre.get_text(separator="\n", strip=True)
        if not code:
            continue
        lang = _code_language(pre)

        # A filename label may sit on the parent div ...
        label = None
        parent = pre.parent
        if parent and parent.name == "div" and parent.has_attr("class"):
            for cls in parent["class"]:
                if "filename" in cls or "file" in cls:
                    label = parent.get_text(strip=True)
        # ... or in a <div> placed just before the <pre>.
        filename = None
        prev = pre.find_previous_sibling()
        if prev and prev.name == "div" and (
            "filename" in prev.get("class", []) or "file" in prev.get("class", [])
        ):
            filename = prev.get_text(strip=True)
        filename = filename or label

        # Remove code block from soup so it's not included in narrative chunks
        pre.decompose()

        meta = {"code_tag": True}
        if lang:
            meta["language"] = lang
        if filename:
            meta["filename"] = filename
        pre_count += 1
        chunks.append({
            "content": code,
            "type": "code",
            "source": source,
            "location": f"code:{pre_count}",
            "metadata": meta
        })

    # --- Chunk the rest of the content ---
    headings = content_root.find_all(re.compile("^h[1-3]$"))
    if headings:
        for i, heading in enumerate(headings):
            end = headings[i + 1] if i + 1 < len(headings) else None
            content = []
            for sib in heading.next_siblings:
                # Identity, not ==: bs4 equality compares markup, so a
                # look-alike sibling would end the section early.
                if sib is end:
                    break
                if getattr(sib, "get_text", None):
                    content.append(sib.get_text(separator=" ", strip=True))
            chunk_text = " ".join(content).strip()
            heading_text = heading.get_text(separator=" ", strip=True)
            if chunk_text:
                chunks.append({
                    "content": chunk_text,
                    "type": "html",
                    "source": source,
                    "location": f"heading:{heading_text}",
                    "metadata": {"heading": heading_text}
                })
    else:
        paragraphs = [
            p.get_text(separator=" ", strip=True)
            for p in content_root.find_all("p")
            if p.get_text(strip=True)
        ]
        for idx, para in enumerate(paragraphs):
            chunks.append({
                "content": para,
                "type": "html",
                "source": source,
                "location": f"paragraph:{idx+1}",
                "metadata": {}
            })
    return chunks

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shifusen329/doc-lib-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.