Skip to main content
Glama
read.py10.8 kB
"""mcp_read tool implementation.""" from ..copyq_client import client from ..errors import ( MCPCopyQError, ErrorCode, InvalidModeError, TabNotFoundError, IndexOutOfBoundsError, MissingParamError, ) from ..models import ( ReadMode, ReadRequest, Scope, MCP_ROOT, ALLOWED_ROOT_TABS, PREVIEW_LENGTH, ) async def mcp_read( mode: str, tab: str = "", index: int | None = None, query: str | None = None, search_in: str = "all", scope: str = "mcp", max_depth: int = 2, max_items: int = 20, skip: int = 0, include_text: bool = True, include_tags: bool = True, include_note: bool = False, ) -> str: """ Universal read operation for CopyQ MCP. Modes: - tree: Get tab structure with metadata - list: Get items from a specific tab - item: Get single item fully - search: Search across tabs Scope: - mcp: Only mcp/* tabs (default, full access) - all: All CopyQ tabs (external tabs are read-only) - external: Only non-mcp tabs (read-only) Returns compact pipe-separated format. """ try: # Validate mode valid_modes = [m.value for m in ReadMode] if mode not in valid_modes: raise InvalidModeError(mode, valid_modes) # Validate scope valid_scopes = [s.value for s in Scope] if scope not in valid_scopes: raise MCPCopyQError(ErrorCode.INVALID_PARAM, f"Invalid scope '{scope}'. Valid: {', '.join(valid_scopes)}") if mode == "tree": return await _read_tree(max_depth, max_items, include_text, include_tags, scope) elif mode == "list": if not tab: raise MissingParamError("tab", mode) return await _read_list( tab, max_items, skip, include_text, include_tags, include_note, scope ) elif mode == "item": if not tab: raise MissingParamError("tab", mode) if index is None: raise MissingParamError("index", mode) return await _read_item(tab, index, include_text, include_tags, include_note, scope) elif mode == "search": if not query: raise MissingParamError("query", mode) return await _read_search( query, search_in, max_items, skip, include_text, include_tags, scope ) return "error|INVALID_MODE|Unknown mode" except MCPCopyQError as e: return e.to_response() except Exception as e: return f"error|INTERNAL|{str(e)}" async def _read_tree( max_depth: int, max_items: int, include_text: bool, include_tags: bool, scope: str = "mcp", ) -> str: """Build tree structure of tabs.""" # Get tabs based on scope if scope == "mcp": tabs = await client.list_tabs() elif scope == "external": tabs = await client.list_external_tabs() else: # "all" tabs = await client.list_all_tabs() lines = [] async def format_tab( path: str, depth: int, indent: str = "", is_external: bool = False ) -> None: if depth > max_depth: return count = await client.get_count(path, external=is_external) # Clean path for display if path.startswith(f"{MCP_ROOT}/"): display_path = path[len(MCP_ROOT) + 1:] else: display_path = path # Mark external tabs external_marker = " [external]" if is_external else "" lines.append(f"{indent}{display_path} [{count}]{external_marker}") # Add preview items preview_count = min(count, max_items) for i in range(preview_count): preview = await client.read_item_preview(path, i, external=is_external) parts = [f"{indent} {i}:"] if include_text: parts.append(f'"{preview["text_preview"]}"') if include_tags and preview["tags"]: parts.append(f"[{','.join(preview['tags'])}]") if preview["has_note"]: parts.append("+note") lines.append("|".join(parts) if len(parts) > 1 else parts[0]) # Find child tabs child_tabs = [t for t in tabs if t.startswith(path + "/") and t.count("/") == path.count("/") + 1] for child in child_tabs: child_is_external = not child.startswith(f"{MCP_ROOT}/") await format_tab(child, depth + 1, indent + " ", child_is_external) if scope == "mcp": # Start from mcp root tabs for root in ALLOWED_ROOT_TABS: full_path = f"{MCP_ROOT}/{root}" if full_path in tabs or any(t.startswith(full_path + "/") for t in tabs): await format_tab(full_path, 1, "", False) else: # For all/external scope, show root-level tabs root_tabs = [t for t in tabs if "/" not in t or (t.count("/") == 1 and t.startswith(f"{MCP_ROOT}/"))] # Also get direct children of mcp/ for "all" scope if scope == "all": # Get unique root tabs (either no "/" or first level of mcp/) seen_roots = set() for t in tabs: if t.startswith(f"{MCP_ROOT}/"): # For mcp tabs, get first level after mcp/ parts = t.split("/") if len(parts) >= 2: seen_roots.add(f"{MCP_ROOT}/{parts[1]}") else: # For external tabs, get root root = t.split("/")[0] seen_roots.add(root) root_tabs = sorted(seen_roots) for tab in root_tabs: is_external = not tab.startswith(f"{MCP_ROOT}/") await format_tab(tab, 1, "", is_external) return "\n".join(lines) if lines else "empty|no tabs found" async def _read_list( tab: str, max_items: int, skip: int, include_text: bool, include_tags: bool, include_note: bool, scope: str = "mcp", ) -> str: """List items in a tab.""" # Determine if external tab is_external = scope in ("external", "all") and not tab.startswith(f"{MCP_ROOT}/") and "/" in tab or scope == "external" # For scope="all" or "external", check if tab looks like external path if scope in ("all", "external"): # If tab doesn't match mcp structure, treat as external all_tabs = await client.list_all_tabs() if tab in all_tabs: is_external = True elif f"{MCP_ROOT}/{tab}" in all_tabs: is_external = False else: # Tab not found, try as external is_external = scope == "external" # Check tab exists if not await client.tab_exists(tab, external=is_external): raise TabNotFoundError(tab) count = await client.get_count(tab, external=is_external) end = min(skip + max_items, count) # Build header if is_external: full_path = tab header = f"mode:list|tab:{tab}|scope:external|total:{count}|showing:{skip}-{end - 1}" else: full_path = f"{MCP_ROOT}/{tab}" header = f"mode:list|tab:{tab}|full_path:{full_path}|total:{count}|showing:{skip}-{end - 1}" lines = [header] for i in range(skip, end): if include_note: item = await client.read_item_full(tab, i, external=is_external) parts = [str(i)] if include_text: text_preview = item["text"][:PREVIEW_LENGTH] if len(item["text"]) > PREVIEW_LENGTH: text_preview += "..." parts.append(f'"{text_preview.replace(chr(10), " ").strip()}"') if include_tags: parts.append(f"[{','.join(item['tags'])}]" if item["tags"] else "[]") if item["note"]: note_preview = item["note"][:50] if len(item["note"]) > 50: note_preview += "..." parts.append(f"note:{note_preview.replace(chr(10), ' ')}") lines.append("|".join(parts)) else: preview = await client.read_item_preview(tab, i, external=is_external) parts = [str(i)] if include_text: parts.append(f'"{preview["text_preview"]}"') if include_tags: parts.append(f"[{','.join(preview['tags'])}]" if preview["tags"] else "[]") if preview["has_note"]: parts.append("+note") lines.append("|".join(parts)) return "\n".join(lines) async def _read_item( tab: str, index: int, include_text: bool, include_tags: bool, include_note: bool, scope: str = "mcp", ) -> str: """Read single item fully.""" # Determine if external tab (same logic as _read_list) is_external = False if scope in ("all", "external"): all_tabs = await client.list_all_tabs() if tab in all_tabs: is_external = True elif f"{MCP_ROOT}/{tab}" in all_tabs: is_external = False else: is_external = scope == "external" # Check tab exists if not await client.tab_exists(tab, external=is_external): raise TabNotFoundError(tab) item = await client.read_item_full(tab, index, external=is_external) # Build header if is_external: header = f"mode:item|tab:{tab}|scope:external|index:{index}" else: header = f"mode:item|tab:{tab}|full_path:{MCP_ROOT}/{tab}|index:{index}" lines = [header] if include_text: lines.append(f"text:{item['text']}") if include_tags: lines.append(f"tags:[{','.join(item['tags'])}]") if include_note: lines.append(f"note:{item['note']}") return "\n".join(lines) async def _read_search( query: str, search_in: str, max_items: int, skip: int, include_text: bool, include_tags: bool, scope: str = "mcp", ) -> str: """Search across tabs.""" results = await client.search( query=query, search_in=search_in, max_items=max_items, skip=skip, scope=scope, ) if not results: return f"mode:search|query:{query}|scope:{scope}|results:0" lines = [f"mode:search|query:{query}|scope:{scope}|results:{len(results)}"] for r in results: parts = [f"{r['tab']}[{r['index']}]"] if include_text: parts.append(f'"{r["text_preview"]}"') if include_tags and r["tags"]: parts.append(f"[{','.join(r['tags'])}]") parts.append(f"match:{r['match_in']}") if r.get("external"): parts.append("external") lines.append("|".join(parts)) return "\n".join(lines)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/list91/mcp-copyq'

If you have feedback or need assistance with the MCP directory API, please join our Discord server