read.py•10.8 kB
"""mcp_read tool implementation."""
from ..copyq_client import client
from ..errors import (
MCPCopyQError,
ErrorCode,
InvalidModeError,
TabNotFoundError,
IndexOutOfBoundsError,
MissingParamError,
)
from ..models import (
ReadMode,
ReadRequest,
Scope,
MCP_ROOT,
ALLOWED_ROOT_TABS,
PREVIEW_LENGTH,
)
async def mcp_read(
mode: str,
tab: str = "",
index: int | None = None,
query: str | None = None,
search_in: str = "all",
scope: str = "mcp",
max_depth: int = 2,
max_items: int = 20,
skip: int = 0,
include_text: bool = True,
include_tags: bool = True,
include_note: bool = False,
) -> str:
"""
Universal read operation for CopyQ MCP.
Modes:
- tree: Get tab structure with metadata
- list: Get items from a specific tab
- item: Get single item fully
- search: Search across tabs
Scope:
- mcp: Only mcp/* tabs (default, full access)
- all: All CopyQ tabs (external tabs are read-only)
- external: Only non-mcp tabs (read-only)
Returns compact pipe-separated format.
"""
try:
# Validate mode
valid_modes = [m.value for m in ReadMode]
if mode not in valid_modes:
raise InvalidModeError(mode, valid_modes)
# Validate scope
valid_scopes = [s.value for s in Scope]
if scope not in valid_scopes:
raise MCPCopyQError(ErrorCode.INVALID_PARAM, f"Invalid scope '{scope}'. Valid: {', '.join(valid_scopes)}")
if mode == "tree":
return await _read_tree(max_depth, max_items, include_text, include_tags, scope)
elif mode == "list":
if not tab:
raise MissingParamError("tab", mode)
return await _read_list(
tab, max_items, skip, include_text, include_tags, include_note, scope
)
elif mode == "item":
if not tab:
raise MissingParamError("tab", mode)
if index is None:
raise MissingParamError("index", mode)
return await _read_item(tab, index, include_text, include_tags, include_note, scope)
elif mode == "search":
if not query:
raise MissingParamError("query", mode)
return await _read_search(
query, search_in, max_items, skip, include_text, include_tags, scope
)
return "error|INVALID_MODE|Unknown mode"
except MCPCopyQError as e:
return e.to_response()
except Exception as e:
return f"error|INTERNAL|{str(e)}"
async def _read_tree(
max_depth: int,
max_items: int,
include_text: bool,
include_tags: bool,
scope: str = "mcp",
) -> str:
"""Build tree structure of tabs."""
# Get tabs based on scope
if scope == "mcp":
tabs = await client.list_tabs()
elif scope == "external":
tabs = await client.list_external_tabs()
else: # "all"
tabs = await client.list_all_tabs()
lines = []
async def format_tab(
path: str,
depth: int,
indent: str = "",
is_external: bool = False
) -> None:
if depth > max_depth:
return
count = await client.get_count(path, external=is_external)
# Clean path for display
if path.startswith(f"{MCP_ROOT}/"):
display_path = path[len(MCP_ROOT) + 1:]
else:
display_path = path
# Mark external tabs
external_marker = " [external]" if is_external else ""
lines.append(f"{indent}{display_path} [{count}]{external_marker}")
# Add preview items
preview_count = min(count, max_items)
for i in range(preview_count):
preview = await client.read_item_preview(path, i, external=is_external)
parts = [f"{indent} {i}:"]
if include_text:
parts.append(f'"{preview["text_preview"]}"')
if include_tags and preview["tags"]:
parts.append(f"[{','.join(preview['tags'])}]")
if preview["has_note"]:
parts.append("+note")
lines.append("|".join(parts) if len(parts) > 1 else parts[0])
# Find child tabs
child_tabs = [t for t in tabs if t.startswith(path + "/") and t.count("/") == path.count("/") + 1]
for child in child_tabs:
child_is_external = not child.startswith(f"{MCP_ROOT}/")
await format_tab(child, depth + 1, indent + " ", child_is_external)
if scope == "mcp":
# Start from mcp root tabs
for root in ALLOWED_ROOT_TABS:
full_path = f"{MCP_ROOT}/{root}"
if full_path in tabs or any(t.startswith(full_path + "/") for t in tabs):
await format_tab(full_path, 1, "", False)
else:
# For all/external scope, show root-level tabs
root_tabs = [t for t in tabs if "/" not in t or (t.count("/") == 1 and t.startswith(f"{MCP_ROOT}/"))]
# Also get direct children of mcp/ for "all" scope
if scope == "all":
# Get unique root tabs (either no "/" or first level of mcp/)
seen_roots = set()
for t in tabs:
if t.startswith(f"{MCP_ROOT}/"):
# For mcp tabs, get first level after mcp/
parts = t.split("/")
if len(parts) >= 2:
seen_roots.add(f"{MCP_ROOT}/{parts[1]}")
else:
# For external tabs, get root
root = t.split("/")[0]
seen_roots.add(root)
root_tabs = sorted(seen_roots)
for tab in root_tabs:
is_external = not tab.startswith(f"{MCP_ROOT}/")
await format_tab(tab, 1, "", is_external)
return "\n".join(lines) if lines else "empty|no tabs found"
async def _read_list(
tab: str,
max_items: int,
skip: int,
include_text: bool,
include_tags: bool,
include_note: bool,
scope: str = "mcp",
) -> str:
"""List items in a tab."""
# Determine if external tab
is_external = scope in ("external", "all") and not tab.startswith(f"{MCP_ROOT}/") and "/" in tab or scope == "external"
# For scope="all" or "external", check if tab looks like external path
if scope in ("all", "external"):
# If tab doesn't match mcp structure, treat as external
all_tabs = await client.list_all_tabs()
if tab in all_tabs:
is_external = True
elif f"{MCP_ROOT}/{tab}" in all_tabs:
is_external = False
else:
# Tab not found, try as external
is_external = scope == "external"
# Check tab exists
if not await client.tab_exists(tab, external=is_external):
raise TabNotFoundError(tab)
count = await client.get_count(tab, external=is_external)
end = min(skip + max_items, count)
# Build header
if is_external:
full_path = tab
header = f"mode:list|tab:{tab}|scope:external|total:{count}|showing:{skip}-{end - 1}"
else:
full_path = f"{MCP_ROOT}/{tab}"
header = f"mode:list|tab:{tab}|full_path:{full_path}|total:{count}|showing:{skip}-{end - 1}"
lines = [header]
for i in range(skip, end):
if include_note:
item = await client.read_item_full(tab, i, external=is_external)
parts = [str(i)]
if include_text:
text_preview = item["text"][:PREVIEW_LENGTH]
if len(item["text"]) > PREVIEW_LENGTH:
text_preview += "..."
parts.append(f'"{text_preview.replace(chr(10), " ").strip()}"')
if include_tags:
parts.append(f"[{','.join(item['tags'])}]" if item["tags"] else "[]")
if item["note"]:
note_preview = item["note"][:50]
if len(item["note"]) > 50:
note_preview += "..."
parts.append(f"note:{note_preview.replace(chr(10), ' ')}")
lines.append("|".join(parts))
else:
preview = await client.read_item_preview(tab, i, external=is_external)
parts = [str(i)]
if include_text:
parts.append(f'"{preview["text_preview"]}"')
if include_tags:
parts.append(f"[{','.join(preview['tags'])}]" if preview["tags"] else "[]")
if preview["has_note"]:
parts.append("+note")
lines.append("|".join(parts))
return "\n".join(lines)
async def _read_item(
tab: str,
index: int,
include_text: bool,
include_tags: bool,
include_note: bool,
scope: str = "mcp",
) -> str:
"""Read single item fully."""
# Determine if external tab (same logic as _read_list)
is_external = False
if scope in ("all", "external"):
all_tabs = await client.list_all_tabs()
if tab in all_tabs:
is_external = True
elif f"{MCP_ROOT}/{tab}" in all_tabs:
is_external = False
else:
is_external = scope == "external"
# Check tab exists
if not await client.tab_exists(tab, external=is_external):
raise TabNotFoundError(tab)
item = await client.read_item_full(tab, index, external=is_external)
# Build header
if is_external:
header = f"mode:item|tab:{tab}|scope:external|index:{index}"
else:
header = f"mode:item|tab:{tab}|full_path:{MCP_ROOT}/{tab}|index:{index}"
lines = [header]
if include_text:
lines.append(f"text:{item['text']}")
if include_tags:
lines.append(f"tags:[{','.join(item['tags'])}]")
if include_note:
lines.append(f"note:{item['note']}")
return "\n".join(lines)
async def _read_search(
query: str,
search_in: str,
max_items: int,
skip: int,
include_text: bool,
include_tags: bool,
scope: str = "mcp",
) -> str:
"""Search across tabs."""
results = await client.search(
query=query,
search_in=search_in,
max_items=max_items,
skip=skip,
scope=scope,
)
if not results:
return f"mode:search|query:{query}|scope:{scope}|results:0"
lines = [f"mode:search|query:{query}|scope:{scope}|results:{len(results)}"]
for r in results:
parts = [f"{r['tab']}[{r['index']}]"]
if include_text:
parts.append(f'"{r["text_preview"]}"')
if include_tags and r["tags"]:
parts.append(f"[{','.join(r['tags'])}]")
parts.append(f"match:{r['match_in']}")
if r.get("external"):
parts.append("external")
lines.append("|".join(parts))
return "\n".join(lines)