Docs-MCP

by herring101
url_import.py (14.6 kB)
#!/usr/bin/env python3
"""Fetch documentation from a URL quickly and convert it to Markdown."""

import argparse
import asyncio
import os
import re
import unicodedata
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from urllib.parse import unquote, urljoin, urlparse, urlunparse

import aiohttp
from bs4 import BeautifulSoup
from markdownify import markdownify as md
from tqdm import tqdm


class URLImporter:
    def __init__(
        self,
        output_dir: str | None = None,
        max_depth: int = 2,
        include_patterns: list[str] | None = None,
        exclude_patterns: list[str] | None = None,
        concurrent_downloads: int = 10,
        timeout: int = 30,
        rate_limit: float = 0.1,
    ):
        self.output_dir = output_dir
        self.max_depth = max_depth
        self.include_patterns = [re.compile(p) for p in (include_patterns or [])]
        self.exclude_patterns = [re.compile(p) for p in (exclude_patterns or [])]
        self.concurrent_downloads = concurrent_downloads
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.rate_limit = rate_limit
        self.visited_urls: set[str] = set()
        self.session: aiohttp.ClientSession | None = None
        self.semaphore: asyncio.Semaphore | None = None
        self.progress_bar: tqdm | None = None

    async def __aenter__(self):
        """Enter the async context manager."""
        connector = aiohttp.TCPConnector(
            limit=self.concurrent_downloads * 2,
            limit_per_host=self.concurrent_downloads,
        )
        self.session = aiohttp.ClientSession(connector=connector, timeout=self.timeout)
        self.semaphore = asyncio.Semaphore(self.concurrent_downloads)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit the async context manager."""
        if self.session:
            await self.session.close()

    async def fetch_page(self, url: str) -> tuple[str | None, list[str]]:
        """Fetch a page asynchronously and convert it to Markdown."""
        if self.semaphore is None:
            raise ValueError("Semaphore not initialized")
        async with self.semaphore:
            try:
                if self.session is None:
                    raise ValueError("Session not initialized")
                async with self.session.get(url) as response:
                    response.raise_for_status()
                    html_content = await response.text()

                # Extract links with BeautifulSoup (CPU-bound work)
                soup = BeautifulSoup(html_content, "html.parser")
                links = []
                for link in soup.find_all("a", href=True):
                    # Only handle BeautifulSoup Tag objects
                    href = link.get("href", "")  # type: ignore
                    if href:
                        absolute_url = urljoin(url, str(href))
                        links.append(absolute_url)

                # Convert the HTML to Markdown
                markdown = self.html_to_markdown(html_content)

                if self.progress_bar is not None:
                    self.progress_bar.update(1)

                return markdown, links
            except Exception as e:
                print(f"\nError fetching {url}: {e}")
                if self.progress_bar is not None:
                    self.progress_bar.update(1)
                return None, []

    def html_to_markdown(self, html: str) -> str:
        """Convert HTML to Markdown."""
        return md(
            html,
            heading_style="ATX",
            bullets="*",
            code_language="",
            strip=["script", "style", "meta", "link"],
        ).strip()

    def sanitize_filename(self, filename: str) -> str:
        """Convert a filename into a filesystem-safe form."""
        # URL-decode
        filename = unquote(filename)

        # Replace characters that are invalid on Windows
        invalid_chars = '<>:"|?*'
        for char in invalid_chars:
            filename = filename.replace(char, "_")

        # Remove control characters
        filename = "".join(
            char for char in filename if not unicodedata.category(char).startswith("C")
        )

        # Strip leading/trailing whitespace and periods
        filename = filename.strip(" .")

        # Fall back to a default name if nothing is left
        if not filename:
            filename = "untitled"

        return filename

    def url_to_filepath(self, url: str) -> str:
        """Convert a URL into a file path."""
        parsed = urlparse(url)
        path = parsed.path.strip("/")

        # Empty path or a directory (trailing slash)
        if not path:
            path = "index.md"
        elif parsed.path.endswith("/"):
            # Sanitize each part of the path
            parts = [self.sanitize_filename(part) for part in path.split("/") if part]
            parts.append("index.md")
            path = os.path.join(*parts)
        else:
            # Sanitize each part of the path
            parts = [self.sanitize_filename(part) for part in path.split("/") if part]
            if parts:
                # Append .md if the last part does not already end with it
                if not parts[-1].endswith(".md"):
                    parts[-1] += ".md"
                path = os.path.join(*parts)
            else:
                path = "index.md"

        # Save inside the docs/ directory
        # Use DOCS_BASE_DIR if set, otherwise the current working directory
        docs_base_dir = os.getenv("DOCS_BASE_DIR", os.getcwd())
        base_dir = Path(docs_base_dir)
        docs_dir = base_dir / "docs"

        if self.output_dir is None:
            return str(docs_dir / path)
        return str(docs_dir / self.output_dir / path)

    def filter_links(self, links: list[str], base_url: str) -> list[str]:
        """Filter the extracted links."""
        base_domain = urlparse(base_url).netloc
        filtered = []

        for link in links:
            parsed = urlparse(link)

            # Same domain only
            if parsed.netloc != base_domain:
                continue

            # Pattern matching against the URL path
            path = parsed.path

            # Skip links that match any exclude pattern
            if any(p.search(path) for p in self.exclude_patterns):
                continue

            # If include patterns are given, the path must match at least one
            if self.include_patterns and not any(
                p.search(path) for p in self.include_patterns
            ):
                continue

            filtered.append(link)

        return filtered

    def normalize_url(self, url: str) -> str:
        """Normalize a URL (drop trailing slashes and fragments)."""
        parsed = urlparse(url)
        # Drop the fragment and unify trailing slashes
        normalized = urlunparse(
            (
                parsed.scheme,
                parsed.netloc,
                parsed.path.rstrip("/") or "/",
                parsed.params,
                parsed.query,
                "",
            )
        )
        return normalized

    async def crawl_level(
        self, urls: list[tuple[str, int]], start_url: str
    ) -> tuple[dict[str, tuple[str, int]], list[tuple[str, int]]]:
        """Crawl URLs at the same depth in parallel."""
        tasks = []
        task_urls = []
        results = {}

        for url, depth in urls:
            if url in self.visited_urls or depth > self.max_depth:
                continue
            self.visited_urls.add(url)
            task = self.fetch_page(url)
            tasks.append(task)
            task_urls.append((url, depth))

        if not tasks:
            return {}, []

        # Fetch the pages in parallel
        fetched_data = await asyncio.gather(*tasks)

        # Process the results
        new_urls = []
        for i, (url, depth) in enumerate(task_urls):
            content, links = fetched_data[i]
            if content:
                results[url] = (content, depth)

                # Collect links for the next level
                if depth < self.max_depth:
                    filtered_links = self.filter_links(links, start_url)
                    for link in filtered_links:
                        normalized_link = self.normalize_url(link)
                        if normalized_link not in self.visited_urls:
                            new_urls.append((normalized_link, depth + 1))

        # Rate limiting
        if self.rate_limit > 0:
            await asyncio.sleep(self.rate_limit)

        return results, new_urls

    async def crawl(self, start_url: str) -> dict[str, str]:
        """Crawl asynchronously up to the configured depth."""
        pages = {}
        queue = [(self.normalize_url(start_url), 0)]

        # Default the output directory to the domain name
        if self.output_dir is None:
            parsed_url = urlparse(start_url)
            self.output_dir = parsed_url.netloc.replace(
                ":", "_"
            )  # replace the port separator ":" with "_"

        # Report crawl settings before starting
        print(f"Starting import from: {start_url}")

        # Show the actual output directory
        # Use DOCS_BASE_DIR if set, otherwise the current working directory
        docs_base_dir = os.getenv("DOCS_BASE_DIR", os.getcwd())
        base_dir = Path(docs_base_dir)
        docs_dir = base_dir / "docs"
        if self.output_dir is None:
            actual_output_dir = docs_dir
        else:
            actual_output_dir = docs_dir / self.output_dir
        print(f"Output directory: {actual_output_dir}")
        print(f"Max depth: {self.max_depth}")
        print(f"Concurrent downloads: {self.concurrent_downloads}")

        with tqdm(desc="Downloading pages", unit="pages", leave=True) as pbar:
            self.progress_bar = pbar

            while queue:
                # Group URLs that share the current depth
                current_level_urls = []
                next_level_urls = []

                for url, depth in queue:
                    if depth == queue[0][1]:
                        current_level_urls.append((url, depth))
                    else:
                        next_level_urls.append((url, depth))

                # Process the current level in parallel
                results, new_urls = await self.crawl_level(
                    current_level_urls, start_url
                )

                # Store the results
                for url, (content, _) in results.items():
                    if content:
                        pages[url] = content

                # Add the next level's URLs to the queue
                queue = next_level_urls + new_urls

        self.progress_bar = None
        return pages

    def save_page(self, url: str, content: str):
        """Save a page to disk."""
        filepath = self.url_to_filepath(url)
        os.makedirs(os.path.dirname(filepath), exist_ok=True)

        with open(filepath, "w", encoding="utf-8") as f:
            f.write(content)

    async def import_from_url(self, url: str):
        """Import documentation from a URL."""
        pages = await self.crawl(url)

        print(f"\nFound {len(pages)} pages")

        # Save the pages in parallel
        with (
            ThreadPoolExecutor(max_workers=10) as executor,
            tqdm(total=len(pages), desc="Saving files", unit="files") as pbar,
        ):
            futures = []
            for page_url, content in pages.items():
                future = executor.submit(self.save_page, page_url, content)
                futures.append(future)

            for future in futures:
                future.result()
                pbar.update(1)

        # Show the actual output directory
        # Use DOCS_BASE_DIR if set, otherwise the current working directory
        docs_base_dir = os.getenv("DOCS_BASE_DIR", os.getcwd())
        base_dir = Path(docs_base_dir)
        docs_dir = base_dir / "docs"
        if self.output_dir is None:
            actual_output_dir = docs_dir
        else:
            actual_output_dir = docs_dir / self.output_dir
        print(f"\nImport completed! {len(pages)} pages saved to {actual_output_dir}")


async def main():
    parser = argparse.ArgumentParser(
        description="Fetch documentation from a URL and convert it to Markdown"
    )
    parser.add_argument("url", help="URL to import from")
    parser.add_argument(
        "--output-dir",
        "-o",
        default=None,
        help="Output directory (default: domain name)",
    )
    parser.add_argument(
        "--depth", "-d", type=int, default=2, help="Crawl depth (default: 2)"
    )
    parser.add_argument(
        "--include-pattern",
        "-i",
        action="append",
        dest="include_patterns",
        help="URL pattern to include (regular expression)",
    )
    parser.add_argument(
        "--exclude-pattern",
        "-e",
        action="append",
        dest="exclude_patterns",
        help="URL pattern to exclude (regular expression)",
    )
    parser.add_argument(
        "--concurrent",
        "-c",
        type=int,
        default=10,
        help="Number of concurrent downloads (default: 10)",
    )
    parser.add_argument(
        "--timeout", type=int, default=30, help="Timeout in seconds (default: 30)"
    )
    parser.add_argument(
        "--rate-limit", type=float, default=0.1, help="Rate limit in seconds (default: 0.1)"
    )

    args = parser.parse_args()

    async with URLImporter(
        output_dir=args.output_dir,
        max_depth=args.depth,
        include_patterns=args.include_patterns,
        exclude_patterns=args.exclude_patterns,
        concurrent_downloads=args.concurrent,
        timeout=args.timeout,
        rate_limit=args.rate_limit,
    ) as importer:
        await importer.import_from_url(args.url)


def cli():
    """CLI entry point for PyPI installation."""
    asyncio.run(main())


if __name__ == "__main__":
    cli()
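
Besides the cli() entry point above, the URLImporter class can be driven directly. Below is a minimal sketch of that; the module name, URL, output directory, and include pattern are placeholders, not values from the project.

# Minimal usage sketch. Assumptions: the file above is importable as
# url_import; all option values and the URL are placeholders.
import asyncio

from url_import import URLImporter


async def example():
    async with URLImporter(
        output_dir="example-docs",       # pages land under $DOCS_BASE_DIR/docs/example-docs
        max_depth=1,
        include_patterns=[r"^/docs/"],   # only follow links whose path starts with /docs/
        concurrent_downloads=5,
        rate_limit=0.2,
    ) as importer:
        await importer.import_from_url("https://example.com/docs/")


asyncio.run(example())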

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/herring101/docs-mcp'
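
The same endpoint can be queried from Python as well; here is a small sketch using only the standard library, assuming the API returns a JSON body.

# Sketch: fetch this server's directory entry (assumes a JSON response).
import json
from urllib.request import urlopen

with urlopen("https://glama.ai/api/mcp/v1/servers/herring101/docs-mcp") as resp:
    data = json.load(resp)

print(json.dumps(data, indent=2))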

If you have feedback or need assistance with the MCP directory API, please join our Discord server