import asyncio
import json
import re
import subprocess
from html.parser import HTMLParser
from pathlib import Path
from typing import Optional

import httpx
from mcp.server.fastmcp import Context, FastMCP
from pydantic import BaseModel, Field
mcp = FastMCP("Dash Documentation API")
async def check_api_health(ctx: Context, port: int) -> bool:
"""Check if the Dash API server is responding at the given port."""
base_url = f"http://127.0.0.1:{port}"
try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(f"{base_url}/health")
            response.raise_for_status()
await ctx.debug(f"Successfully connected to Dash API at {base_url}")
return True
except Exception as e:
await ctx.debug(f"Health check failed for {base_url}: {e}")
return False
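# Equivalent manual check from a shell (the port is whatever status.json
# reports; see get_dash_api_port below):
#
#   curl http://127.0.0.1:<port>/health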
async def working_api_base_url(ctx: Context) -> Optional[str]:
    """Return the base URL of a responding Dash API server, enabling it if needed.

    Launches Dash if necessary and, if the API server is disabled, attempts to
    enable it via `defaults write`. Returns None if no working server is found.
    """
dash_running = await ensure_dash_running(ctx)
if not dash_running:
return None
port = await get_dash_api_port(ctx)
if port is None:
# Try to automatically enable the Dash API Server
await ctx.info(
"The Dash API Server is not enabled. Attempting to enable it automatically..."
)
try:
subprocess.run(
[
"defaults",
"write",
"com.kapeli.dashdoc",
"DHAPIServerEnabled",
"YES",
],
check=True,
timeout=10,
)
subprocess.run(
[
"defaults",
"write",
"com.kapeli.dash-setapp",
"DHAPIServerEnabled",
"YES",
],
check=True,
timeout=10,
)
            # Give Dash a moment to pick up the settings change
            await asyncio.sleep(2)
# Try to get the port again
port = await get_dash_api_port(ctx)
if port is None:
await ctx.error(
"Failed to enable Dash API Server automatically. Please enable it manually in Dash Settings > Integration"
)
return None
else:
await ctx.info("Successfully enabled Dash API Server")
        except Exception as e:
            await ctx.error(
                f"Failed to enable Dash API Server automatically ({e}). Please enable it manually in Dash Settings > Integration"
            )
return None
return f"http://127.0.0.1:{port}"
async def get_dash_api_port(ctx: Context) -> Optional[int]:
"""Get the Dash API port from the status.json file and verify the API server is responding."""
status_file = (
Path.home()
/ "Library"
/ "Application Support"
/ "Dash"
/ ".dash_api_server"
/ "status.json"
)
try:
with open(status_file, "r") as f:
status_data = json.load(f)
port = status_data.get("port")
if port is None:
return None
# Check if the API server is actually responding
if await check_api_health(ctx, port):
return port
else:
return None
    except (OSError, json.JSONDecodeError):
return None
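# Minimal illustrative status.json; only the "port" key is read above:
#
#   {"port": 51234}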
def check_dash_running() -> bool:
"""Check if Dash app is running by looking for the process."""
try:
        # Use pgrep to check for a Dash process. Note that `pgrep -f` matches
        # against the full command line, so other processes whose arguments
        # contain "Dash" can match too.
        result = subprocess.run(["pgrep", "-f", "Dash"], capture_output=True, timeout=5)
return result.returncode == 0
except Exception:
return False
async def ensure_dash_running(ctx: Context) -> bool:
"""Ensure Dash is running, launching it if necessary."""
if not check_dash_running():
await ctx.info("Dash is not running. Launching Dash...")
try:
# Launch Dash using the bundle identifier
result = subprocess.run(
["open", "-g", "-j", "-b", "com.kapeli.dashdoc"], timeout=10
)
if result.returncode != 0:
# Try Setapp bundle identifier
subprocess.run(
["open", "-g", "-j", "-b", "com.kapeli.dash-setapp"],
check=True,
timeout=10,
)
            # Give Dash a moment to start
            await asyncio.sleep(4)
# Check again if Dash is now running
if not check_dash_running():
await ctx.error("Failed to launch Dash application")
return False
else:
await ctx.info("Dash launched successfully")
return True
except subprocess.CalledProcessError:
await ctx.error("Failed to launch Dash application")
return False
except Exception as e:
await ctx.error(f"Error launching Dash: {e}")
return False
else:
return True
class DocsetResult(BaseModel):
"""Information about a docset."""
name: str = Field(description="Display name of the docset")
identifier: str = Field(description="Unique identifier")
platform: str = Field(description="Platform/type of the docset")
full_text_search: str = Field(
description="Full-text search status: 'not supported', 'disabled', 'indexing', or 'enabled'"
)
notice: Optional[str] = Field(
description="Optional notice about the docset status", default=None
)
class DocsetResults(BaseModel):
"""Result from listing docsets."""
docsets: list[DocsetResult] = Field(
description="List of installed docsets", default_factory=list
)
error: Optional[str] = Field(
description="Error message if there was an issue", default=None
)
class SearchResult(BaseModel):
"""A search result from documentation."""
name: str = Field(description="Name of the documentation entry")
type: str = Field(description="Type of result (Function, Class, etc.)")
platform: Optional[str] = Field(description="Platform of the result", default=None)
load_url: str = Field(description="URL to load the documentation")
docset: Optional[str] = Field(description="Name of the docset", default=None)
description: Optional[str] = Field(
description="Additional description", default=None
)
language: Optional[str] = Field(
description="Programming language (snippet results only)", default=None
)
tags: Optional[str] = Field(description="Tags (snippet results only)", default=None)
class SearchResults(BaseModel):
"""Result from searching documentation."""
results: list[SearchResult] = Field(
description="List of search results", default_factory=list
)
error: Optional[str] = Field(
description="Error message if there was an issue", default=None
)
class DocumentationPage(BaseModel):
"""Documentation page content."""
content: str = Field(description="The documentation page content")
load_url: str = Field(description="The URL that was loaded")
error: Optional[str] = Field(
description="Error message if there was an issue", default=None
)
class _HTMLToTextParser(HTMLParser):
"""Converts HTML to plain text with markdown-style links."""
_block_tags = {
"p",
"br",
"div",
"h1",
"h2",
"h3",
"h4",
"h5",
"h6",
"li",
"tr",
"blockquote",
"pre",
"hr",
"section",
"article",
"header",
"footer",
"nav",
"dt",
"dd",
}
_skip_tags = {"script", "style"}
def __init__(self):
super().__init__()
self._parts: list[str] = []
self._skip_depth = 0
self._link_href: Optional[str] = None
self._link_text_parts: list[str] = []
def handle_starttag(self, tag: str, attrs: list[tuple[str, Optional[str]]]):
tag = tag.lower()
if tag in self._skip_tags:
self._skip_depth += 1
return
if self._skip_depth > 0:
return
if tag in self._block_tags:
self._parts.append("\n")
if tag == "a":
attr_dict = dict(attrs)
self._link_href = attr_dict.get("href")
self._link_text_parts = []
def handle_endtag(self, tag: str):
tag = tag.lower()
if tag in self._skip_tags:
self._skip_depth = max(0, self._skip_depth - 1)
return
if self._skip_depth > 0:
return
if tag == "a" and self._link_href is not None:
link_text = "".join(self._link_text_parts).strip()
if link_text:
self._parts.append(f"[{link_text}]({self._link_href})")
else:
self._parts.append(self._link_href)
self._link_href = None
self._link_text_parts = []
def handle_data(self, data: str):
if self._skip_depth > 0:
return
if self._link_href is not None:
self._link_text_parts.append(data)
else:
self._parts.append(data)
    def get_text(self) -> str:
text = "".join(self._parts)
text = re.sub(r"[ \t]+", " ", text)
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()
def html_to_text(html: str) -> str:
"""Convert HTML to plain text with markdown-style links."""
parser = _HTMLToTextParser()
parser.feed(html)
return parser.get_text()
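# A quick illustration of the conversion (anchors become markdown-style
# links, block tags become newlines):
#
#   html_to_text('<p>See the <a href="cls.html">Class</a> docs</p>')
#   -> 'See the [Class](cls.html) docs'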
def estimate_tokens(obj) -> int:
"""Estimate token count for a serialized object. Rough approximation: 1 token ≈ 4 characters."""
if isinstance(obj, str):
return max(1, len(obj) // 4)
elif isinstance(obj, (list, tuple)):
return sum(estimate_tokens(item) for item in obj)
elif isinstance(obj, dict):
return sum(estimate_tokens(k) + estimate_tokens(v) for k, v in obj.items())
elif hasattr(obj, "model_dump"): # Pydantic model
return estimate_tokens(obj.model_dump())
else:
return max(1, len(str(obj)) // 4)
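# Examples of the heuristic (each string contributes len(s) // 4 tokens,
# never less than 1; containers sum their elements):
#
#   estimate_tokens("abcdefgh")           -> 2
#   estimate_tokens({"name": "sqlite3"})  -> 2  (1 for the key, 1 for the value)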
@mcp.tool()
async def list_installed_docsets(ctx: Context) -> DocsetResults:
"""List all installed documentation sets in Dash. An empty list is returned if the user has no docsets installed.
Results are automatically truncated if they would exceed 25,000 tokens."""
try:
base_url = await working_api_base_url(ctx)
if base_url is None:
return DocsetResults(
error="Failed to connect to Dash API Server. Please ensure Dash is running and the API server is enabled (in Dash Settings > Integration)."
)
await ctx.debug("Fetching installed docsets from Dash API")
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(f"{base_url}/docsets/list")
            response.raise_for_status()
            result = response.json()
docsets = result.get("docsets", [])
await ctx.info(f"Found {len(docsets)} installed docsets")
# Build result list with token limit checking
token_limit = 25000
current_tokens = 100 # Base overhead for response structure
limited_docsets = []
for docset in docsets:
docset_info = DocsetResult(
name=docset["name"],
identifier=docset["identifier"],
platform=docset["platform"],
full_text_search=docset["full_text_search"],
notice=docset.get("notice"),
)
# Estimate tokens for this docset
docset_tokens = estimate_tokens(docset_info)
if current_tokens + docset_tokens > token_limit:
await ctx.warning(
f"Token limit reached. Returning {len(limited_docsets)} of {len(docsets)} docsets to stay under 25k token limit."
)
break
limited_docsets.append(docset_info)
current_tokens += docset_tokens
if len(limited_docsets) < len(docsets):
await ctx.info(
f"Returned {len(limited_docsets)} docsets (truncated from {len(docsets)} due to token limit)"
)
return DocsetResults(docsets=limited_docsets)
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
await ctx.warning("No docsets found. Install some in Settings > Downloads.")
return DocsetResults(
error="No docsets found. Instruct the user to install some docsets in Settings > Downloads."
)
return DocsetResults(error=f"HTTP error: {e}")
except Exception as e:
await ctx.error(f"Failed to get installed docsets: {e}")
return DocsetResults(error=f"Failed to get installed docsets: {e}")
@mcp.tool()
async def search_documentation(
ctx: Context,
query: str,
docset_identifiers: str,
search_snippets: bool = True,
max_results: int = 100,
) -> SearchResults:
"""
    Search for documentation across the given docsets and, optionally, snippets.
Args:
query: The search query string
docset_identifiers: Comma-separated list of docset identifiers to search in (from list_installed_docsets)
search_snippets: Whether to include snippets in search results
max_results: Maximum number of results to return (1-1000)
Results are automatically truncated if they would exceed 25,000 tokens.
"""
if not query.strip():
await ctx.error("Query cannot be empty")
return SearchResults(error="Query cannot be empty")
if not docset_identifiers.strip():
await ctx.error(
"docset_identifiers cannot be empty. Get the docset identifiers using list_installed_docsets"
)
return SearchResults(
error="docset_identifiers cannot be empty. Get the docset identifiers using list_installed_docsets"
)
if max_results < 1 or max_results > 1000:
await ctx.error("max_results must be between 1 and 1000")
return SearchResults(error="max_results must be between 1 and 1000")
try:
base_url = await working_api_base_url(ctx)
if base_url is None:
return SearchResults(
error="Failed to connect to Dash API Server. Please ensure Dash is running and the API server is enabled (in Dash Settings > Integration)."
)
params = {
"query": query,
"docset_identifiers": docset_identifiers,
"search_snippets": search_snippets,
"max_results": max_results,
}
await ctx.debug(f"Searching Dash API with query: '{query}'")
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(f"{base_url}/search", params=params)
            response.raise_for_status()
            result = response.json()
# Check for warning message in response
warning_message = None
if "message" in result:
warning_message = result["message"]
await ctx.warning(warning_message)
results = result.get("results", [])
# Filter out empty dict entries (Dash API returns [{}] for no results)
results = [r for r in results if r]
if not results and " " in query:
return SearchResults(
results=[], error="Nothing found. Try to search for fewer terms."
)
await ctx.info(f"Found {len(results)} results")
# Build result list with token limit checking
token_limit = 25000
current_tokens = 100 # Base overhead for response structure
limited_results = []
for item in results:
search_result = SearchResult(
name=item["name"],
type=item["type"],
platform=item.get("platform"),
load_url=item["load_url"],
docset=item.get("docset"),
description=item.get("description"),
language=item.get("language"),
tags=item.get("tags"),
)
# Estimate tokens for this result
result_tokens = estimate_tokens(search_result)
if current_tokens + result_tokens > token_limit:
await ctx.warning(
f"Token limit reached. Returning {len(limited_results)} of {len(results)} results to stay under 25k token limit."
)
break
limited_results.append(search_result)
current_tokens += result_tokens
if len(limited_results) < len(results):
await ctx.info(
f"Returned {len(limited_results)} results (truncated from {len(results)} due to token limit)"
)
return SearchResults(results=limited_results, error=warning_message)
except httpx.HTTPStatusError as e:
if e.response.status_code == 400:
error_text = e.response.text
if "Docset with identifier" in error_text and "not found" in error_text:
await ctx.error(
"Invalid docset identifier. Run list_installed_docsets to see available docsets."
)
return SearchResults(
error="Invalid docset identifier. Run list_installed_docsets to see available docsets, then use the exact identifier from that list."
)
elif "No docsets found" in error_text:
await ctx.error("No valid docsets found for search.")
return SearchResults(
error="No valid docsets found for search. Either provide valid docset identifiers from list_installed_docsets, or set search_snippets=true to search snippets only."
)
else:
await ctx.error(f"Bad request: {error_text}")
return SearchResults(
error=f"Bad request: {error_text}. Please ensure Dash is running and the API server is enabled (in Dash Settings > Integration)."
)
elif e.response.status_code == 403:
error_text = e.response.text
if "API access blocked due to Dash trial expiration" in error_text:
await ctx.error(
"Dash trial expired. Purchase Dash to continue using the API."
)
return SearchResults(
error="Your Dash trial has expired. Purchase Dash at https://kapeli.com/dash to continue using the API. During trial expiration, API access is blocked."
)
else:
await ctx.error(f"Forbidden: {error_text}")
return SearchResults(
error=f"Forbidden: {error_text}. Please ensure Dash is running and the API server is enabled (in Dash Settings > Integration)."
)
await ctx.error(f"HTTP error: {e}")
return SearchResults(
error=f"HTTP error: {e}. Please ensure Dash is running and the API server is enabled (in Dash Settings > Integration)."
)
except Exception as e:
await ctx.error(f"Search failed: {e}")
return SearchResults(
error=f"Search failed: {e}. Please ensure Dash is running and the API server is enabled (in Dash Settings > Integration)."
)
@mcp.tool()
async def enable_docset_fts(ctx: Context, identifier: str) -> bool:
"""
Enable full-text search for a specific docset.
Args:
identifier: The docset identifier (from list_installed_docsets)
Returns:
True if FTS was successfully enabled, False otherwise
"""
if not identifier.strip():
await ctx.error("Docset identifier cannot be empty")
return False
try:
base_url = await working_api_base_url(ctx)
if base_url is None:
return False
await ctx.debug(f"Enabling FTS for docset: {identifier}")
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(
                f"{base_url}/docsets/enable_fts", params={"identifier": identifier}
            )
            response.raise_for_status()
except httpx.HTTPStatusError as e:
if e.response.status_code == 400:
await ctx.error(f"Bad request: {e.response.text}")
return False
elif e.response.status_code == 404:
await ctx.error(f"Docset not found: {identifier}")
return False
await ctx.error(f"HTTP error: {e}")
return False
except Exception as e:
await ctx.error(f"Failed to enable FTS: {e}")
return False
return True
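# Illustrative request (identifier is hypothetical); success is reported
# purely via the HTTP status code:
#
#   GET http://127.0.0.1:<port>/docsets/enable_fts?identifier=python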
@mcp.tool()
async def load_documentation_page(ctx: Context, load_url: str) -> DocumentationPage:
"""
Load a documentation page from a load_url returned by search_documentation.
Args:
load_url: The load_url value from a search result (must point to the local Dash API at 127.0.0.1)
Returns:
The documentation page content as plain text with markdown-style links
"""
if not load_url.startswith("http://127.0.0.1"):
await ctx.error(
"Invalid URL: load_url must point to the local Dash API (http://127.0.0.1)"
)
return DocumentationPage(
content="",
load_url=load_url,
error="Invalid URL: load_url must point to the local Dash API (http://127.0.0.1). Only URLs returned by search_documentation are supported.",
)
try:
await ctx.debug(f"Loading documentation page: {load_url}")
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(load_url)
            response.raise_for_status()
            content = html_to_text(response.text)
await ctx.info(
f"Successfully loaded documentation page ({len(content)} characters)"
)
return DocumentationPage(content=content, load_url=load_url)
except httpx.HTTPStatusError as e:
if e.response.status_code == 403:
error_text = e.response.text
if "API access blocked due to Dash trial expiration" in error_text:
await ctx.error(
"Dash trial expired. Purchase Dash to continue using the API."
)
return DocumentationPage(
content="",
load_url=load_url,
error="Your Dash trial has expired. Purchase Dash at https://kapeli.com/dash to continue using the API.",
)
await ctx.error(f"Forbidden: {error_text}")
return DocumentationPage(
content="", load_url=load_url, error=f"Forbidden: {error_text}"
)
elif e.response.status_code == 404:
await ctx.error("Documentation page not found.")
return DocumentationPage(
content="", load_url=load_url, error="Documentation page not found."
)
await ctx.error(f"HTTP error: {e}")
return DocumentationPage(
content="", load_url=load_url, error=f"HTTP error: {e}"
)
except Exception as e:
await ctx.error(f"Failed to load documentation page: {e}")
return DocumentationPage(
content="",
load_url=load_url,
error=f"Failed to load documentation page: {e}",
)
def main():
    """Entry point: run the MCP server."""
    mcp.run()
if __name__ == "__main__":
main()
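# A hypothetical client registration for this server, e.g. in an MCP
# client's JSON config (command and path are placeholders):
#
#   {
#     "mcpServers": {
#       "dash": {"command": "python", "args": ["/path/to/dash_mcp_server.py"]}
#     }
#   }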