# validate.py (8.24 kB)
"""mcp_validate tool implementation."""
from ..copyq_client import client
from ..models import (
ReadMode,
WriteMode,
EditMode,
FieldType,
SearchIn,
Intent,
ALLOWED_ROOT_TABS,
TABS_WITH_SUBTABS,
)
async def mcp_validate(
    tool: str,
    params: dict,
) -> str:
    """
    Validate parameters for mcp_read or mcp_write without executing.

    Args:
        tool: Which tool the parameters are meant for: "read" or "write".
        params: The keyword arguments that would be passed to that tool.

    Returns:
        A pipe-delimited result string:
        - "valid:false|errors:[e1; e2]" when at least one error was found.
        - "valid:true[|warnings:[w1; w2]]|estimated_tokens:N" otherwise.
    """
    errors: list[str] = []
    warnings: list[str] = []

    if tool not in ("read", "write"):
        errors.append(f"INVALID_TOOL: Unknown tool '{tool}'. Valid: read, write")
    elif not isinstance(params, dict):
        # The sub-validators call params.get(...); report a malformed payload
        # as a validation error instead of crashing with AttributeError.
        errors.append("INVALID_PARAM: params must be a dict")
    elif tool == "read":
        errors, warnings = await _validate_read(params)
    else:
        errors, warnings = await _validate_write(params)

    if errors:
        return f"valid:false|errors:[{'; '.join(errors)}]"

    warning_str = f"|warnings:[{'; '.join(warnings)}]" if warnings else ""
    estimated_tokens = _estimate_tokens(tool, params)
    return f"valid:true{warning_str}|estimated_tokens:{estimated_tokens}"
def _is_int_param(value: object) -> bool:
    """Return True for a real integer; bool is rejected even though it subclasses int."""
    return isinstance(value, int) and not isinstance(value, bool)


async def _validate_read(params: dict) -> tuple[list[str], list[str]]:
    """Validate mcp_read parameters.

    Args:
        params: Keyword arguments intended for mcp_read. Keys inspected:
            mode, tab, index, query, search_in, max_depth, max_items, skip.

    Returns:
        A (errors, warnings) pair of message lists. Errors mean the call is
        invalid; warnings report out-of-range limits as clamped.
    """
    errors: list[str] = []
    warnings: list[str] = []

    mode = params.get("mode")
    tab = params.get("tab", "")
    index = params.get("index")
    query = params.get("query")
    search_in = params.get("search_in", "all")

    # Validate mode
    valid_modes = [m.value for m in ReadMode]
    if not mode:
        errors.append("MISSING_PARAM: mode is required")
    elif mode not in valid_modes:
        errors.append(f"INVALID_MODE: '{mode}' not in {valid_modes}")

    # Mode-specific validation
    if mode == "list" or mode == "item":
        # Remember the lookup result so mode=item does not query the client
        # a second time for the same tab.
        tab_found = False
        if not tab:
            errors.append(f"MISSING_PARAM: tab is required for mode={mode}")
        else:
            tab_found = await client.tab_exists(tab)
            if not tab_found:
                errors.append(f"TAB_NOT_FOUND: {tab}")

        if mode == "item":
            if index is None:
                errors.append("MISSING_PARAM: index is required for mode=item")
            elif not _is_int_param(index):
                # Comparing a non-integer against the count below would raise.
                errors.append(f"INVALID_PARAM: index must be an integer, got {index!r}")
            elif tab_found:
                count = await client.get_count(tab)
                if index < 0 or index >= count:
                    errors.append(f"INDEX_OUT_OF_BOUNDS: {index} (count: {count})")

    if mode == "search":
        if not query:
            errors.append("MISSING_PARAM: query is required for mode=search")
        if search_in not in [s.value for s in SearchIn]:
            errors.append(f"INVALID_PARAM: search_in '{search_in}' invalid")

    # Validate numeric params: (name, default, lowest allowed, highest allowed)
    for name, default, low, high in (
        ("max_depth", 2, 1, 10),
        ("max_items", 20, 1, 100),
    ):
        value = params.get(name, default)
        if not _is_int_param(value):
            errors.append(f"INVALID_PARAM: {name} must be an integer")
        elif value < low or value > high:
            warnings.append(f"{name}={value} clamped to {low}-{high}")

    skip = params.get("skip", 0)
    if not _is_int_param(skip):
        errors.append("INVALID_PARAM: skip must be an integer")
    elif skip < 0:
        errors.append("INVALID_PARAM: skip must be >= 0")

    return errors, warnings
def _is_workspace_path(path: str) -> bool:
    """Return True when the first '/'-separated segment of path is exactly 'workspace'."""
    return path.split("/", 1)[0] == "workspace"


async def _check_tab_and_index(tab: str, index: object, errors: list[str]) -> None:
    """Append an error to errors if tab is missing or index is not a valid position in it."""
    if not await client.tab_exists(tab):
        errors.append(f"TAB_NOT_FOUND: {tab}")
        return
    if isinstance(index, bool) or not isinstance(index, int):
        # A non-integer would raise TypeError in the range comparison below.
        errors.append(f"INVALID_PARAM: index must be an integer, got {index!r}")
        return
    count = await client.get_count(tab)
    if index < 0 or index >= count:
        errors.append(f"INDEX_OUT_OF_BOUNDS: {index} (count: {count})")


async def _validate_write(params: dict) -> tuple[list[str], list[str]]:
    """Validate mcp_write parameters.

    Args:
        params: Keyword arguments intended for mcp_write. Keys inspected:
            mode, tab, index, text, tags, note, field, edit_mode, match,
            to_tab, path, intent.

    Returns:
        A (errors, warnings) pair of message lists. An unrecognised mode
        returns immediately with only the INVALID_MODE error.
    """
    errors: list[str] = []
    warnings: list[str] = []

    mode = params.get("mode")
    tab = params.get("tab")
    index = params.get("index")
    text = params.get("text")
    tags = params.get("tags")
    note = params.get("note")
    field = params.get("field")
    edit_mode = params.get("edit_mode", "replace")
    match = params.get("match")
    to_tab = params.get("to_tab")
    path = params.get("path")
    intent = params.get("intent", "execute")

    # Validate mode
    valid_modes = [m.value for m in WriteMode]
    if not mode:
        errors.append("MISSING_PARAM: mode is required")
    elif mode not in valid_modes:
        errors.append(f"INVALID_MODE: '{mode}' not in {valid_modes}")
        return errors, warnings

    # Validate intent
    if intent not in [i.value for i in Intent]:
        errors.append(f"INVALID_PARAM: intent '{intent}' invalid")

    # Mode-specific validation
    if mode == "add":
        if not tab:
            errors.append("MISSING_PARAM: tab required for mode=add")
        if not text:
            errors.append("MISSING_PARAM: text required for mode=add")
        if tab:
            # Only the segment before the first '/' is checked against the allow-list.
            root = tab.split("/")[0]
            if root not in ALLOWED_ROOT_TABS:
                errors.append(f"TAB_NOT_FOUND: root '{root}' not allowed")

    elif mode == "update":
        if not tab:
            errors.append("MISSING_PARAM: tab required for mode=update")
        if index is None:
            errors.append("MISSING_PARAM: index required for mode=update")
        if not field:
            errors.append("MISSING_PARAM: field required for mode=update")
        elif field not in [f.value for f in FieldType]:
            errors.append(f"INVALID_PARAM: field '{field}' invalid")
        if edit_mode not in [e.value for e in EditMode]:
            errors.append(f"INVALID_PARAM: edit_mode '{edit_mode}' invalid")
        if edit_mode == "substitute" and not match:
            errors.append("MISSING_PARAM: match required for edit_mode=substitute")

        # The value for the selected field must be supplied under the same name.
        for name, value in (("text", text), ("tags", tags), ("note", note)):
            if field == name and value is None:
                errors.append(f"MISSING_PARAM: {name} required for field={name}")

        # Check tab and index exist
        if tab and index is not None:
            await _check_tab_and_index(tab, index, errors)

    elif mode == "delete":
        if not tab:
            errors.append("MISSING_PARAM: tab required for mode=delete")
        if index is None:
            errors.append("MISSING_PARAM: index required for mode=delete")
        if tab and index is not None:
            await _check_tab_and_index(tab, index, errors)

    elif mode == "move":
        if not tab:
            errors.append("MISSING_PARAM: tab required for mode=move")
        if index is None:
            errors.append("MISSING_PARAM: index required for mode=move")
        if not to_tab:
            errors.append("MISSING_PARAM: to_tab required for mode=move")
        if tab:
            if index is None:
                if not await client.tab_exists(tab):
                    errors.append(f"TAB_NOT_FOUND: {tab}")
            else:
                # Same source-item check as update/delete, so an out-of-range
                # index is caught here as well.
                await _check_tab_and_index(tab, index, errors)
        # NOTE(review): to_tab is not checked against ALLOWED_ROOT_TABS or for
        # existence -- confirm what mcp_write enforces for the destination.

    elif mode == "tab_create":
        if not path:
            errors.append("MISSING_PARAM: path required for mode=tab_create")
        elif not _is_workspace_path(path):
            # Compare the whole root segment: a bare prefix test would also
            # accept siblings such as "workspace_old" or "workspaces/x".
            errors.append("PERMISSION_DENIED: tab_create only allowed in workspace/")
        if path and await client.tab_exists(path):
            warnings.append(f"Tab already exists: {path}")

    elif mode == "tab_delete":
        if not path:
            errors.append("MISSING_PARAM: path required for mode=tab_delete")
        elif not _is_workspace_path(path):
            errors.append("PERMISSION_DENIED: tab_delete only allowed in workspace/")
        if path and not await client.tab_exists(path):
            errors.append(f"TAB_NOT_FOUND: {path}")

    return errors, warnings
def _estimate_tokens(tool: str, params: dict) -> int:
    """Estimate token count for response.

    Args:
        tool: "read" or "write"; any other value yields only the base overhead.
        params: The tool parameters; only "mode" and "max_items" are read.

    Returns:
        Estimated number of tokens as an int.
    """
    base = 50  # Base overhead

    if tool == "write":
        return base + 20  # Write responses are compact
    if tool != "read":
        return base

    mode = params.get("mode", "")
    if mode == "item":
        return base + 200  # Full item

    if mode == "tree" or mode == "search":
        per_item = 30
    elif mode == "list":
        per_item = 25
    else:
        return base

    max_items = params.get("max_items", 20)
    if isinstance(max_items, bool) or not isinstance(max_items, int):
        max_items = 20  # Fall back to the default rather than fail on bad input
    # _validate_read reports max_items as clamped to 1-100, so estimate on the
    # clamped value rather than the raw request.
    max_items = max(1, min(max_items, 100))
    return base + max_items * per_item