copyq_client.py•14.2 kB
"""CopyQ CLI client wrapper."""
import asyncio
import os
import re
from pathlib import Path
from .errors import CopyQError, TabNotFoundError, IndexOutOfBoundsError
from .models import MCP_ROOT, PREVIEW_LENGTH
# CopyQ executable path
COPYQ_PATH = os.environ.get(
"COPYQ_PATH",
r"C:\Program Files\CopyQ\copyq.exe"
)
class CopyQClient:
"""Async wrapper for CopyQ CLI commands."""
def __init__(self, copyq_path: str = COPYQ_PATH):
self.copyq_path = copyq_path
async def _run(self, *args: str) -> str:
"""Execute CopyQ command and return output."""
cmd = [self.copyq_path, *args]
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
error_msg = stderr.decode("utf-8", errors="replace").strip()
raise CopyQError(f"CopyQ error: {error_msg}")
return stdout.decode("utf-8", errors="replace")
except FileNotFoundError:
raise CopyQError(f"CopyQ not found at: {self.copyq_path}")
def _full_tab(self, tab: str) -> str:
"""Convert relative tab path to full path with mcp/ prefix."""
if not tab:
return MCP_ROOT
if tab.startswith(f"{MCP_ROOT}/"):
return tab
return f"{MCP_ROOT}/{tab}"
def _resolve_tab(self, tab: str, external: bool = False) -> str:
"""Resolve tab path. If external=True, use tab as-is without mcp/ prefix."""
if external:
return tab # Use exactly as provided
return self._full_tab(tab)
# === Tab Operations ===
async def list_all_tabs(self) -> list[str]:
"""Get list of ALL tabs in CopyQ."""
output = await self._run("tab")
return output.strip().split("\n") if output.strip() else []
async def list_tabs(self) -> list[str]:
"""Get list of tabs under mcp/ only."""
all_tabs = await self.list_all_tabs()
return [t for t in all_tabs if t.startswith(f"{MCP_ROOT}/")]
async def list_external_tabs(self) -> list[str]:
"""Get list of tabs NOT under mcp/."""
all_tabs = await self.list_all_tabs()
return [t for t in all_tabs if not t.startswith(f"{MCP_ROOT}/")]
def is_external_tab(self, tab: str) -> bool:
"""Check if tab is external (not under mcp/)."""
full_tab = self._full_tab(tab)
return not full_tab.startswith(f"{MCP_ROOT}/")
async def tab_exists(self, tab: str, external: bool = False) -> bool:
"""Check if tab exists."""
resolved_tab = self._resolve_tab(tab, external)
all_tabs = await self.list_all_tabs()
return resolved_tab in all_tabs
async def get_count(self, tab: str, external: bool = False) -> int:
"""Get number of items in tab."""
resolved_tab = self._resolve_tab(tab, external)
output = await self._run("tab", resolved_tab, "count")
try:
return int(output.strip())
except ValueError:
return 0
async def create_tab(self, tab: str) -> None:
"""Create a new tab (happens automatically when accessing)."""
full_tab = self._full_tab(tab)
# CopyQ creates tab on first access
await self._run("tab", full_tab, "count")
async def remove_tab(self, tab: str) -> None:
"""Remove a tab."""
full_tab = self._full_tab(tab)
await self._run("removeTab", full_tab)
# === Item Read Operations ===
async def read_text(self, tab: str, index: int, external: bool = False) -> str:
"""Read item text."""
resolved_tab = self._resolve_tab(tab, external)
return await self._run("tab", resolved_tab, "read", str(index))
async def read_tags(self, tab: str, index: int, external: bool = False) -> list[str]:
"""Read item tags."""
resolved_tab = self._resolve_tab(tab, external)
output = await self._run(
"tab", resolved_tab, "read",
"application/x-copyq-tags", str(index)
)
if not output.strip():
return []
return output.strip().split("\n")
async def read_note(self, tab: str, index: int, external: bool = False) -> str:
"""Read item note."""
resolved_tab = self._resolve_tab(tab, external)
return await self._run(
"tab", resolved_tab, "read",
"application/x-copyq-item-notes", str(index)
)
async def has_note(self, tab: str, index: int, external: bool = False) -> bool:
"""Check if item has a note."""
resolved_tab = self._resolve_tab(tab, external)
output = await self._run("tab", resolved_tab, "read", "?", str(index))
return "application/x-copyq-item-notes" in output
async def read_item_full(self, tab: str, index: int, external: bool = False) -> dict:
"""Read all item data."""
# Validate index
count = await self.get_count(tab, external)
if index < 0 or index >= count:
raise IndexOutOfBoundsError(index, count)
text = await self.read_text(tab, index, external)
tags = await self.read_tags(tab, index, external)
note = await self.read_note(tab, index, external)
return {
"index": index,
"text": text,
"tags": tags,
"note": note.strip(),
}
async def read_item_preview(self, tab: str, index: int, external: bool = False) -> dict:
"""Read item preview (truncated text, tags, has_note)."""
text = await self.read_text(tab, index, external)
tags = await self.read_tags(tab, index, external)
has_note = await self.has_note(tab, index, external)
# Truncate text for preview
text_preview = text[:PREVIEW_LENGTH]
if len(text) > PREVIEW_LENGTH:
text_preview += "..."
return {
"index": index,
"text_preview": text_preview.replace("\n", " ").strip(),
"tags": tags,
"has_note": has_note,
}
# === Item Write Operations ===
async def write_item(
self,
tab: str,
index: int,
text: str,
tags: list[str] | None = None,
note: str | None = None,
) -> None:
"""Write/update item at index with all fields using 'change' command."""
full_tab = self._full_tab(tab)
# Use 'change' to update existing item (not 'write' which inserts!)
args = ["tab", full_tab, "change", str(index), "text/plain", text]
if tags is not None:
args.extend(["application/x-copyq-tags", "\n".join(tags)])
if note is not None:
args.extend(["application/x-copyq-item-notes", note])
await self._run(*args)
async def add_item(
self,
tab: str,
text: str,
tags: list[str] | None = None,
note: str | None = None,
) -> int:
"""Add new item at index 0 (beginning)."""
full_tab = self._full_tab(tab)
# Use insert to add at position 0
await self._run("tab", full_tab, "insert", "0", text)
# If we have tags or note, update the item
if tags or note:
await self.write_item(tab, 0, text, tags, note)
return 0
async def remove_item(self, tab: str, index: int) -> None:
"""Remove item at index."""
full_tab = self._full_tab(tab)
# Validate index
count = await self.get_count(tab)
if index < 0 or index >= count:
raise IndexOutOfBoundsError(index, count)
await self._run("tab", full_tab, "remove", str(index))
# === Update Helpers ===
async def update_text(
self,
tab: str,
index: int,
new_text: str,
edit_mode: str = "replace",
match: str | None = None,
) -> None:
"""Update item text with various modes."""
# Get current data
current = await self.read_item_full(tab, index)
if edit_mode == "replace":
final_text = new_text
elif edit_mode == "append":
final_text = current["text"] + new_text
elif edit_mode == "prepend":
final_text = new_text + current["text"]
elif edit_mode == "substitute":
if match is None or match not in current["text"]:
from .errors import MatchNotFoundError
raise MatchNotFoundError(match or "")
final_text = current["text"].replace(match, new_text)
else:
final_text = new_text
await self.write_item(
tab, index, final_text,
current["tags"], current["note"]
)
async def update_tags(
self,
tab: str,
index: int,
new_tags: list[str],
edit_mode: str = "replace",
) -> None:
"""Update item tags with various modes."""
current = await self.read_item_full(tab, index)
if edit_mode == "replace":
final_tags = new_tags
elif edit_mode == "append":
final_tags = list(set(current["tags"] + new_tags))
elif edit_mode == "remove":
final_tags = [t for t in current["tags"] if t not in new_tags]
else:
final_tags = new_tags
await self.write_item(
tab, index, current["text"],
final_tags, current["note"]
)
async def update_note(
self,
tab: str,
index: int,
new_note: str,
edit_mode: str = "replace",
match: str | None = None,
) -> None:
"""Update item note with various modes."""
current = await self.read_item_full(tab, index)
if edit_mode == "replace":
final_note = new_note
elif edit_mode == "append":
final_note = current["note"] + new_note
elif edit_mode == "prepend":
final_note = new_note + current["note"]
elif edit_mode == "substitute":
if match is None or match not in current["note"]:
from .errors import MatchNotFoundError
raise MatchNotFoundError(match or "")
final_note = current["note"].replace(match, new_note)
else:
final_note = new_note
await self.write_item(
tab, index, current["text"],
current["tags"], final_note
)
# === Move Operation ===
async def move_item(self, from_tab: str, index: int, to_tab: str) -> int:
"""Move item from one tab to another. Returns new index."""
# Read full item
item = await self.read_item_full(from_tab, index)
# Add to destination (at index 0)
new_index = await self.add_item(
to_tab,
item["text"],
item["tags"],
item["note"]
)
# Remove from source
await self.remove_item(from_tab, index)
return new_index
# === Search ===
async def search(
self,
query: str,
tabs: list[str] | None = None,
search_in: str = "all",
max_items: int = 20,
skip: int = 0,
scope: str = "mcp", # "mcp", "all", "external"
) -> list[dict]:
"""Search for items matching query.
scope: "mcp" = only mcp/* tabs, "all" = all tabs, "external" = non-mcp tabs
"""
if tabs is None:
if scope == "mcp":
tabs = await self.list_tabs()
elif scope == "external":
tabs = await self.list_external_tabs()
else: # "all"
tabs = await self.list_all_tabs()
else:
# If specific tabs provided, determine if external
resolved_tabs = []
for t in tabs:
if t.startswith(f"{MCP_ROOT}/") or scope == "mcp":
resolved_tabs.append(self._full_tab(t))
else:
resolved_tabs.append(t) # External tab, use as-is
tabs = resolved_tabs
results = []
pattern = re.compile(query, re.IGNORECASE)
for tab in tabs:
# Determine if this is an external tab
is_external = not tab.startswith(f"{MCP_ROOT}/")
count = await self.get_count(tab, external=is_external)
for i in range(count):
match_in = []
if search_in in ("all", "text"):
text = await self.read_text(tab, i, external=is_external)
if pattern.search(text):
match_in.append("text")
if search_in in ("all", "tags"):
item_tags = await self.read_tags(tab, i, external=is_external)
if any(pattern.search(t) for t in item_tags):
match_in.append("tags")
if search_in in ("all", "note"):
note = await self.read_note(tab, i, external=is_external)
if pattern.search(note):
match_in.append("note")
if match_in:
preview = await self.read_item_preview(tab, i, external=is_external)
# For mcp tabs, remove prefix; for external, keep as-is
if tab.startswith(f"{MCP_ROOT}/"):
clean_tab = tab[len(MCP_ROOT) + 1:]
else:
clean_tab = tab
results.append({
"tab": clean_tab,
"index": i,
"text_preview": preview["text_preview"],
"tags": preview["tags"],
"match_in": ",".join(match_in),
"external": is_external,
})
if len(results) >= skip + max_items:
break
if len(results) >= skip + max_items:
break
return results[skip:skip + max_items]
# Global client instance
client = CopyQClient()