Skip to main content
Glama
server.py17.2 kB
from typing import Optional import httpx import subprocess import json from pathlib import Path from mcp.server.fastmcp import FastMCP from mcp.server.fastmcp import Context from pydantic import BaseModel, Field mcp = FastMCP("Dash Documentation API") async def check_api_health(ctx: Context, port: int) -> bool: """Check if the Dash API server is responding at the given port.""" base_url = f"http://127.0.0.1:{port}" try: with httpx.Client(timeout=5.0) as client: response = client.get(f"{base_url}/health") response.raise_for_status() await ctx.debug(f"Successfully connected to Dash API at {base_url}") return True except Exception as e: await ctx.debug(f"Health check failed for {base_url}: {e}") return False async def working_api_base_url(ctx: Context) -> Optional[str]: dash_running = await ensure_dash_running(ctx) if not dash_running: return None port = await get_dash_api_port(ctx) if port is None: # Try to automatically enable the Dash API Server await ctx.info("The Dash API Server is not enabled. Attempting to enable it automatically...") try: subprocess.run( ["defaults", "write", "com.kapeli.dashdoc", "DHAPIServerEnabled", "YES"], check=True, timeout=10 ) subprocess.run( ["defaults", "write", "com.kapeli.dash-setapp", "DHAPIServerEnabled", "YES"], check=True, timeout=10 ) # Wait a moment for Dash to pick up the change import time time.sleep(2) # Try to get the port again port = await get_dash_api_port(ctx) if port is None: await ctx.error("Failed to enable Dash API Server automatically. Please enable it manually in Dash Settings > Integration") return None else: await ctx.info("Successfully enabled Dash API Server") except Exception as e: await ctx.error("Failed to enable Dash API Server automatically. Please enable it manually in Dash Settings > Integration") return None return f"http://127.0.0.1:{port}" async def get_dash_api_port(ctx: Context) -> Optional[int]: """Get the Dash API port from the status.json file and verify the API server is responding.""" status_file = Path.home() / "Library" / "Application Support" / "Dash" / ".dash_api_server" / "status.json" try: with open(status_file, 'r') as f: status_data = json.load(f) port = status_data.get('port') if port is None: return None # Check if the API server is actually responding if await check_api_health(ctx, port): return port else: return None except (FileNotFoundError, json.JSONDecodeError, KeyError): return None def check_dash_running() -> bool: """Check if Dash app is running by looking for the process.""" try: # Use pgrep to check for Dash process result = subprocess.run( ["pgrep", "-f", "Dash"], capture_output=True, timeout=5 ) return result.returncode == 0 except Exception: return False async def ensure_dash_running(ctx: Context) -> bool: """Ensure Dash is running, launching it if necessary.""" if not check_dash_running(): await ctx.info("Dash is not running. Launching Dash...") try: # Launch Dash using the bundle identifier result = subprocess.run( ["open", "-g", "-j", "-b", "com.kapeli.dashdoc"], timeout=10 ) if result.returncode != 0: # Try Setapp bundle identifier subprocess.run( ["open", "-g", "-j", "-b", "com.kapeli.dash-setapp"], check=True, timeout=10 ) # Wait a moment for Dash to start import time time.sleep(4) # Check again if Dash is now running if not check_dash_running(): await ctx.error("Failed to launch Dash application") return False else: await ctx.info("Dash launched successfully") return True except subprocess.CalledProcessError: await ctx.error("Failed to launch Dash application") return False except Exception as e: await ctx.error(f"Error launching Dash: {e}") return False else: return True class DocsetResult(BaseModel): """Information about a docset.""" name: str = Field(description="Display name of the docset") identifier: str = Field(description="Unique identifier") platform: str = Field(description="Platform/type of the docset") full_text_search: str = Field(description="Full-text search status: 'not supported', 'disabled', 'indexing', or 'enabled'") notice: Optional[str] = Field(description="Optional notice about the docset status", default=None) class DocsetResults(BaseModel): """Result from listing docsets.""" docsets: list[DocsetResult] = Field(description="List of installed docsets", default_factory=list) error: Optional[str] = Field(description="Error message if there was an issue", default=None) class SearchResult(BaseModel): """A search result from documentation.""" name: str = Field(description="Name of the documentation entry") type: str = Field(description="Type of result (Function, Class, etc.)") platform: Optional[str] = Field(description="Platform of the result", default=None) load_url: str = Field(description="URL to load the documentation") docset: Optional[str] = Field(description="Name of the docset", default=None) description: Optional[str] = Field(description="Additional description", default=None) language: Optional[str] = Field(description="Programming language (snippet results only)", default=None) tags: Optional[str] = Field(description="Tags (snippet results only)", default=None) class SearchResults(BaseModel): """Result from searching documentation.""" results: list[SearchResult] = Field(description="List of search results", default_factory=list) error: Optional[str] = Field(description="Error message if there was an issue", default=None) def estimate_tokens(obj) -> int: """Estimate token count for a serialized object. Rough approximation: 1 token ≈ 4 characters.""" if isinstance(obj, str): return max(1, len(obj) // 4) elif isinstance(obj, (list, tuple)): return sum(estimate_tokens(item) for item in obj) elif isinstance(obj, dict): return sum(estimate_tokens(k) + estimate_tokens(v) for k, v in obj.items()) elif hasattr(obj, 'model_dump'): # Pydantic model return estimate_tokens(obj.model_dump()) else: return max(1, len(str(obj)) // 4) @mcp.tool() async def list_installed_docsets(ctx: Context) -> DocsetResults: """List all installed documentation sets in Dash. An empty list is returned if the user has no docsets installed. Results are automatically truncated if they would exceed 25,000 tokens.""" try: base_url = await working_api_base_url(ctx) if base_url is None: return DocsetResults(error="Failed to connect to Dash API Server. Please ensure Dash is running and the API server is enabled (in Dash Settings > Integration).") await ctx.debug("Fetching installed docsets from Dash API") with httpx.Client(timeout=30.0) as client: response = client.get(f"{base_url}/docsets/list") response.raise_for_status() result = response.json() docsets = result.get("docsets", []) await ctx.info(f"Found {len(docsets)} installed docsets") # Build result list with token limit checking token_limit = 25000 current_tokens = 100 # Base overhead for response structure limited_docsets = [] for docset in docsets: docset_info = DocsetResult( name=docset["name"], identifier=docset["identifier"], platform=docset["platform"], full_text_search=docset["full_text_search"], notice=docset.get("notice") ) # Estimate tokens for this docset docset_tokens = estimate_tokens(docset_info) if current_tokens + docset_tokens > token_limit: await ctx.warning(f"Token limit reached. Returning {len(limited_docsets)} of {len(docsets)} docsets to stay under 25k token limit.") break limited_docsets.append(docset_info) current_tokens += docset_tokens if len(limited_docsets) < len(docsets): await ctx.info(f"Returned {len(limited_docsets)} docsets (truncated from {len(docsets)} due to token limit)") return DocsetResults(docsets=limited_docsets) except httpx.HTTPStatusError as e: if e.response.status_code == 404: await ctx.warning("No docsets found. Install some in Settings > Downloads.") return DocsetResults(error="No docsets found. Instruct the user to install some docsets in Settings > Downloads.") return DocsetResults(error=f"HTTP error: {e}") except Exception as e: await ctx.error(f"Failed to get installed docsets: {e}") return DocsetResults(error=f"Failed to get installed docsets: {e}") @mcp.tool() async def search_documentation( ctx: Context, query: str, docset_identifiers: str, search_snippets: bool = True, max_results: int = 100, ) -> SearchResults: """ Search for documentation across docset identifiers and snippets. Args: query: The search query string docset_identifiers: Comma-separated list of docset identifiers to search in (from list_installed_docsets) search_snippets: Whether to include snippets in search results max_results: Maximum number of results to return (1-1000) Results are automatically truncated if they would exceed 25,000 tokens. """ if not query.strip(): await ctx.error("Query cannot be empty") return SearchResults(error="Query cannot be empty") if not docset_identifiers.strip(): await ctx.error("docset_identifiers cannot be empty. Get the docset identifiers using list_installed_docsets") return SearchResults(error="docset_identifiers cannot be empty. Get the docset identifiers using list_installed_docsets") if max_results < 1 or max_results > 1000: await ctx.error("max_results must be between 1 and 1000") return SearchResults(error="max_results must be between 1 and 1000") try: base_url = await working_api_base_url(ctx) if base_url is None: return SearchResults(error="Failed to connect to Dash API Server. Please ensure Dash is running and the API server is enabled (in Dash Settings > Integration).") params = { "query": query, "docset_identifiers": docset_identifiers, "search_snippets": search_snippets, "max_results": max_results, } await ctx.debug(f"Searching Dash API with query: '{query}'") with httpx.Client(timeout=30.0) as client: response = client.get(f"{base_url}/search", params=params) response.raise_for_status() result = response.json() # Check for warning message in response warning_message = None if "message" in result: warning_message = result["message"] await ctx.warning(warning_message) results = result.get("results", []) await ctx.info(f"Found {len(results)} results") # Build result list with token limit checking token_limit = 25000 current_tokens = 100 # Base overhead for response structure limited_results = [] for item in results: search_result = SearchResult( name=item["name"], type=item["type"], platform=item.get("platform"), load_url=item["load_url"], docset=item.get("docset"), description=item.get("description"), language=item.get("language"), tags=item.get("tags") ) # Estimate tokens for this result result_tokens = estimate_tokens(search_result) if current_tokens + result_tokens > token_limit: await ctx.warning(f"Token limit reached. Returning {len(limited_results)} of {len(results)} results to stay under 25k token limit.") break limited_results.append(search_result) current_tokens += result_tokens if len(limited_results) < len(results): await ctx.info(f"Returned {len(limited_results)} results (truncated from {len(results)} due to token limit)") return SearchResults(results=limited_results, error=warning_message) except httpx.HTTPStatusError as e: if e.response.status_code == 400: error_text = e.response.text if "Docset with identifier" in error_text and "not found" in error_text: await ctx.error("Invalid docset identifier. Run list_installed_docsets to see available docsets.") return SearchResults(error="Invalid docset identifier. Run list_installed_docsets to see available docsets, then use the exact identifier from that list.") elif "No docsets found" in error_text: await ctx.error("No valid docsets found for search.") return SearchResults(error="No valid docsets found for search. Either provide valid docset identifiers from list_installed_docsets, or set search_snippets=true to search snippets only.") else: await ctx.error(f"Bad request: {error_text}") return SearchResults(error=f"Bad request: {error_text}. Please ensure Dash is running and the API server is enabled (in Dash Settings > Integration).") elif e.response.status_code == 403: error_text = e.response.text if "API access blocked due to Dash trial expiration" in error_text: await ctx.error("Dash trial expired. Purchase Dash to continue using the API.") return SearchResults(error="Your Dash trial has expired. Purchase Dash at https://kapeli.com/dash to continue using the API. During trial expiration, API access is blocked.") else: await ctx.error(f"Forbidden: {error_text}") return SearchResults(error=f"Forbidden: {error_text}. Please ensure Dash is running and the API server is enabled (in Dash Settings > Integration).") await ctx.error(f"HTTP error: {e}") return SearchResults(error=f"HTTP error: {e}. Please ensure Dash is running and the API server is enabled (in Dash Settings > Integration).") except Exception as e: await ctx.error(f"Search failed: {e}") return SearchResults(error=f"Search failed: {e}. Please ensure Dash is running and the API server is enabled (in Dash Settings > Integration).") @mcp.tool() async def enable_docset_fts(ctx: Context, identifier: str) -> bool: """ Enable full-text search for a specific docset. Args: identifier: The docset identifier (from list_installed_docsets) Returns: True if FTS was successfully enabled, False otherwise """ if not identifier.strip(): await ctx.error("Docset identifier cannot be empty") return False try: base_url = await working_api_base_url(ctx) if base_url is None: return False await ctx.debug(f"Enabling FTS for docset: {identifier}") with httpx.Client(timeout=30.0) as client: response = client.get(f"{base_url}/docsets/enable_fts", params={"identifier": identifier}) response.raise_for_status() result = response.json() except httpx.HTTPStatusError as e: if e.response.status_code == 400: await ctx.error(f"Bad request: {e.response.text}") return False elif e.response.status_code == 404: await ctx.error(f"Docset not found: {identifier}") return False await ctx.error(f"HTTP error: {e}") return False except Exception as e: await ctx.error(f"Failed to enable FTS: {e}") return False return True def main(): mcp.run() if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kapeli/dash-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server