Skip to main content
Glama
copyq_client.py14.2 kB
"""CopyQ CLI client wrapper.""" import asyncio import os import re from pathlib import Path from .errors import CopyQError, TabNotFoundError, IndexOutOfBoundsError from .models import MCP_ROOT, PREVIEW_LENGTH # CopyQ executable path COPYQ_PATH = os.environ.get( "COPYQ_PATH", r"C:\Program Files\CopyQ\copyq.exe" ) class CopyQClient: """Async wrapper for CopyQ CLI commands.""" def __init__(self, copyq_path: str = COPYQ_PATH): self.copyq_path = copyq_path async def _run(self, *args: str) -> str: """Execute CopyQ command and return output.""" cmd = [self.copyq_path, *args] try: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await proc.communicate() if proc.returncode != 0: error_msg = stderr.decode("utf-8", errors="replace").strip() raise CopyQError(f"CopyQ error: {error_msg}") return stdout.decode("utf-8", errors="replace") except FileNotFoundError: raise CopyQError(f"CopyQ not found at: {self.copyq_path}") def _full_tab(self, tab: str) -> str: """Convert relative tab path to full path with mcp/ prefix.""" if not tab: return MCP_ROOT if tab.startswith(f"{MCP_ROOT}/"): return tab return f"{MCP_ROOT}/{tab}" def _resolve_tab(self, tab: str, external: bool = False) -> str: """Resolve tab path. If external=True, use tab as-is without mcp/ prefix.""" if external: return tab # Use exactly as provided return self._full_tab(tab) # === Tab Operations === async def list_all_tabs(self) -> list[str]: """Get list of ALL tabs in CopyQ.""" output = await self._run("tab") return output.strip().split("\n") if output.strip() else [] async def list_tabs(self) -> list[str]: """Get list of tabs under mcp/ only.""" all_tabs = await self.list_all_tabs() return [t for t in all_tabs if t.startswith(f"{MCP_ROOT}/")] async def list_external_tabs(self) -> list[str]: """Get list of tabs NOT under mcp/.""" all_tabs = await self.list_all_tabs() return [t for t in all_tabs if not t.startswith(f"{MCP_ROOT}/")] def is_external_tab(self, tab: str) -> bool: """Check if tab is external (not under mcp/).""" full_tab = self._full_tab(tab) return not full_tab.startswith(f"{MCP_ROOT}/") async def tab_exists(self, tab: str, external: bool = False) -> bool: """Check if tab exists.""" resolved_tab = self._resolve_tab(tab, external) all_tabs = await self.list_all_tabs() return resolved_tab in all_tabs async def get_count(self, tab: str, external: bool = False) -> int: """Get number of items in tab.""" resolved_tab = self._resolve_tab(tab, external) output = await self._run("tab", resolved_tab, "count") try: return int(output.strip()) except ValueError: return 0 async def create_tab(self, tab: str) -> None: """Create a new tab (happens automatically when accessing).""" full_tab = self._full_tab(tab) # CopyQ creates tab on first access await self._run("tab", full_tab, "count") async def remove_tab(self, tab: str) -> None: """Remove a tab.""" full_tab = self._full_tab(tab) await self._run("removeTab", full_tab) # === Item Read Operations === async def read_text(self, tab: str, index: int, external: bool = False) -> str: """Read item text.""" resolved_tab = self._resolve_tab(tab, external) return await self._run("tab", resolved_tab, "read", str(index)) async def read_tags(self, tab: str, index: int, external: bool = False) -> list[str]: """Read item tags.""" resolved_tab = self._resolve_tab(tab, external) output = await self._run( "tab", resolved_tab, "read", "application/x-copyq-tags", str(index) ) if not output.strip(): return [] return output.strip().split("\n") async def read_note(self, tab: str, index: int, external: bool = False) -> str: """Read item note.""" resolved_tab = self._resolve_tab(tab, external) return await self._run( "tab", resolved_tab, "read", "application/x-copyq-item-notes", str(index) ) async def has_note(self, tab: str, index: int, external: bool = False) -> bool: """Check if item has a note.""" resolved_tab = self._resolve_tab(tab, external) output = await self._run("tab", resolved_tab, "read", "?", str(index)) return "application/x-copyq-item-notes" in output async def read_item_full(self, tab: str, index: int, external: bool = False) -> dict: """Read all item data.""" # Validate index count = await self.get_count(tab, external) if index < 0 or index >= count: raise IndexOutOfBoundsError(index, count) text = await self.read_text(tab, index, external) tags = await self.read_tags(tab, index, external) note = await self.read_note(tab, index, external) return { "index": index, "text": text, "tags": tags, "note": note.strip(), } async def read_item_preview(self, tab: str, index: int, external: bool = False) -> dict: """Read item preview (truncated text, tags, has_note).""" text = await self.read_text(tab, index, external) tags = await self.read_tags(tab, index, external) has_note = await self.has_note(tab, index, external) # Truncate text for preview text_preview = text[:PREVIEW_LENGTH] if len(text) > PREVIEW_LENGTH: text_preview += "..." return { "index": index, "text_preview": text_preview.replace("\n", " ").strip(), "tags": tags, "has_note": has_note, } # === Item Write Operations === async def write_item( self, tab: str, index: int, text: str, tags: list[str] | None = None, note: str | None = None, ) -> None: """Write/update item at index with all fields using 'change' command.""" full_tab = self._full_tab(tab) # Use 'change' to update existing item (not 'write' which inserts!) args = ["tab", full_tab, "change", str(index), "text/plain", text] if tags is not None: args.extend(["application/x-copyq-tags", "\n".join(tags)]) if note is not None: args.extend(["application/x-copyq-item-notes", note]) await self._run(*args) async def add_item( self, tab: str, text: str, tags: list[str] | None = None, note: str | None = None, ) -> int: """Add new item at index 0 (beginning).""" full_tab = self._full_tab(tab) # Use insert to add at position 0 await self._run("tab", full_tab, "insert", "0", text) # If we have tags or note, update the item if tags or note: await self.write_item(tab, 0, text, tags, note) return 0 async def remove_item(self, tab: str, index: int) -> None: """Remove item at index.""" full_tab = self._full_tab(tab) # Validate index count = await self.get_count(tab) if index < 0 or index >= count: raise IndexOutOfBoundsError(index, count) await self._run("tab", full_tab, "remove", str(index)) # === Update Helpers === async def update_text( self, tab: str, index: int, new_text: str, edit_mode: str = "replace", match: str | None = None, ) -> None: """Update item text with various modes.""" # Get current data current = await self.read_item_full(tab, index) if edit_mode == "replace": final_text = new_text elif edit_mode == "append": final_text = current["text"] + new_text elif edit_mode == "prepend": final_text = new_text + current["text"] elif edit_mode == "substitute": if match is None or match not in current["text"]: from .errors import MatchNotFoundError raise MatchNotFoundError(match or "") final_text = current["text"].replace(match, new_text) else: final_text = new_text await self.write_item( tab, index, final_text, current["tags"], current["note"] ) async def update_tags( self, tab: str, index: int, new_tags: list[str], edit_mode: str = "replace", ) -> None: """Update item tags with various modes.""" current = await self.read_item_full(tab, index) if edit_mode == "replace": final_tags = new_tags elif edit_mode == "append": final_tags = list(set(current["tags"] + new_tags)) elif edit_mode == "remove": final_tags = [t for t in current["tags"] if t not in new_tags] else: final_tags = new_tags await self.write_item( tab, index, current["text"], final_tags, current["note"] ) async def update_note( self, tab: str, index: int, new_note: str, edit_mode: str = "replace", match: str | None = None, ) -> None: """Update item note with various modes.""" current = await self.read_item_full(tab, index) if edit_mode == "replace": final_note = new_note elif edit_mode == "append": final_note = current["note"] + new_note elif edit_mode == "prepend": final_note = new_note + current["note"] elif edit_mode == "substitute": if match is None or match not in current["note"]: from .errors import MatchNotFoundError raise MatchNotFoundError(match or "") final_note = current["note"].replace(match, new_note) else: final_note = new_note await self.write_item( tab, index, current["text"], current["tags"], final_note ) # === Move Operation === async def move_item(self, from_tab: str, index: int, to_tab: str) -> int: """Move item from one tab to another. Returns new index.""" # Read full item item = await self.read_item_full(from_tab, index) # Add to destination (at index 0) new_index = await self.add_item( to_tab, item["text"], item["tags"], item["note"] ) # Remove from source await self.remove_item(from_tab, index) return new_index # === Search === async def search( self, query: str, tabs: list[str] | None = None, search_in: str = "all", max_items: int = 20, skip: int = 0, scope: str = "mcp", # "mcp", "all", "external" ) -> list[dict]: """Search for items matching query. scope: "mcp" = only mcp/* tabs, "all" = all tabs, "external" = non-mcp tabs """ if tabs is None: if scope == "mcp": tabs = await self.list_tabs() elif scope == "external": tabs = await self.list_external_tabs() else: # "all" tabs = await self.list_all_tabs() else: # If specific tabs provided, determine if external resolved_tabs = [] for t in tabs: if t.startswith(f"{MCP_ROOT}/") or scope == "mcp": resolved_tabs.append(self._full_tab(t)) else: resolved_tabs.append(t) # External tab, use as-is tabs = resolved_tabs results = [] pattern = re.compile(query, re.IGNORECASE) for tab in tabs: # Determine if this is an external tab is_external = not tab.startswith(f"{MCP_ROOT}/") count = await self.get_count(tab, external=is_external) for i in range(count): match_in = [] if search_in in ("all", "text"): text = await self.read_text(tab, i, external=is_external) if pattern.search(text): match_in.append("text") if search_in in ("all", "tags"): item_tags = await self.read_tags(tab, i, external=is_external) if any(pattern.search(t) for t in item_tags): match_in.append("tags") if search_in in ("all", "note"): note = await self.read_note(tab, i, external=is_external) if pattern.search(note): match_in.append("note") if match_in: preview = await self.read_item_preview(tab, i, external=is_external) # For mcp tabs, remove prefix; for external, keep as-is if tab.startswith(f"{MCP_ROOT}/"): clean_tab = tab[len(MCP_ROOT) + 1:] else: clean_tab = tab results.append({ "tab": clean_tab, "index": i, "text_preview": preview["text_preview"], "tags": preview["tags"], "match_in": ",".join(match_in), "external": is_external, }) if len(results) >= skip + max_items: break if len(results) >= skip + max_items: break return results[skip:skip + max_items] # Global client instance client = CopyQClient()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/list91/mcp-copyq'

If you have feedback or need assistance with the MCP directory API, please join our Discord server