Skip to main content
Glama
validate.py8.24 kB
"""mcp_validate tool implementation.""" from ..copyq_client import client from ..models import ( ReadMode, WriteMode, EditMode, FieldType, SearchIn, Intent, ALLOWED_ROOT_TABS, TABS_WITH_SUBTABS, ) async def mcp_validate( tool: str, params: dict, ) -> str: """ Validate parameters for mcp_read or mcp_write without executing. Returns validation result with errors and warnings. """ errors: list[str] = [] warnings: list[str] = [] if tool == "read": errors, warnings = await _validate_read(params) elif tool == "write": errors, warnings = await _validate_write(params) else: errors.append(f"INVALID_TOOL: Unknown tool '{tool}'. Valid: read, write") if errors: return f"valid:false|errors:[{'; '.join(errors)}]" warning_str = f"|warnings:[{'; '.join(warnings)}]" if warnings else "" estimated_tokens = _estimate_tokens(tool, params) return f"valid:true{warning_str}|estimated_tokens:{estimated_tokens}" async def _validate_read(params: dict) -> tuple[list[str], list[str]]: """Validate mcp_read parameters.""" errors: list[str] = [] warnings: list[str] = [] mode = params.get("mode") tab = params.get("tab", "") index = params.get("index") query = params.get("query") search_in = params.get("search_in", "all") max_depth = params.get("max_depth", 2) max_items = params.get("max_items", 20) skip = params.get("skip", 0) # Validate mode valid_modes = [m.value for m in ReadMode] if not mode: errors.append("MISSING_PARAM: mode is required") elif mode not in valid_modes: errors.append(f"INVALID_MODE: '{mode}' not in {valid_modes}") # Mode-specific validation if mode == "list" or mode == "item": if not tab: errors.append(f"MISSING_PARAM: tab is required for mode={mode}") elif not await client.tab_exists(tab): errors.append(f"TAB_NOT_FOUND: {tab}") if mode == "item": if index is None: errors.append("MISSING_PARAM: index is required for mode=item") elif tab and await client.tab_exists(tab): count = await client.get_count(tab) if index < 0 or index >= count: errors.append(f"INDEX_OUT_OF_BOUNDS: {index} (count: {count})") if mode == "search": if not query: errors.append("MISSING_PARAM: query is required for mode=search") if search_in not in [s.value for s in SearchIn]: errors.append(f"INVALID_PARAM: search_in '{search_in}' invalid") # Validate numeric params if max_depth < 1 or max_depth > 10: warnings.append(f"max_depth={max_depth} clamped to 1-10") if max_items < 1 or max_items > 100: warnings.append(f"max_items={max_items} clamped to 1-100") if skip < 0: errors.append("INVALID_PARAM: skip must be >= 0") return errors, warnings async def _validate_write(params: dict) -> tuple[list[str], list[str]]: """Validate mcp_write parameters.""" errors: list[str] = [] warnings: list[str] = [] mode = params.get("mode") tab = params.get("tab") index = params.get("index") text = params.get("text") tags = params.get("tags") note = params.get("note") field = params.get("field") edit_mode = params.get("edit_mode", "replace") match = params.get("match") to_tab = params.get("to_tab") path = params.get("path") intent = params.get("intent", "execute") # Validate mode valid_modes = [m.value for m in WriteMode] if not mode: errors.append("MISSING_PARAM: mode is required") elif mode not in valid_modes: errors.append(f"INVALID_MODE: '{mode}' not in {valid_modes}") return errors, warnings # Validate intent if intent not in [i.value for i in Intent]: errors.append(f"INVALID_PARAM: intent '{intent}' invalid") # Mode-specific validation if mode == "add": if not tab: errors.append("MISSING_PARAM: tab required for mode=add") if not text: errors.append("MISSING_PARAM: text required for mode=add") if tab: root = tab.split("/")[0] if root not in ALLOWED_ROOT_TABS: errors.append(f"TAB_NOT_FOUND: root '{root}' not allowed") elif mode == "update": if not tab: errors.append("MISSING_PARAM: tab required for mode=update") if index is None: errors.append("MISSING_PARAM: index required for mode=update") if not field: errors.append("MISSING_PARAM: field required for mode=update") elif field not in [f.value for f in FieldType]: errors.append(f"INVALID_PARAM: field '{field}' invalid") if edit_mode not in [e.value for e in EditMode]: errors.append(f"INVALID_PARAM: edit_mode '{edit_mode}' invalid") if edit_mode == "substitute" and not match: errors.append("MISSING_PARAM: match required for edit_mode=substitute") # Check field/value match if field == "text" and text is None: errors.append("MISSING_PARAM: text required for field=text") if field == "tags" and tags is None: errors.append("MISSING_PARAM: tags required for field=tags") if field == "note" and note is None: errors.append("MISSING_PARAM: note required for field=note") # Check tab and index exist if tab and index is not None: if not await client.tab_exists(tab): errors.append(f"TAB_NOT_FOUND: {tab}") else: count = await client.get_count(tab) if index < 0 or index >= count: errors.append(f"INDEX_OUT_OF_BOUNDS: {index} (count: {count})") elif mode == "delete": if not tab: errors.append("MISSING_PARAM: tab required for mode=delete") if index is None: errors.append("MISSING_PARAM: index required for mode=delete") if tab and index is not None: if not await client.tab_exists(tab): errors.append(f"TAB_NOT_FOUND: {tab}") else: count = await client.get_count(tab) if index < 0 or index >= count: errors.append(f"INDEX_OUT_OF_BOUNDS: {index} (count: {count})") elif mode == "move": if not tab: errors.append("MISSING_PARAM: tab required for mode=move") if index is None: errors.append("MISSING_PARAM: index required for mode=move") if not to_tab: errors.append("MISSING_PARAM: to_tab required for mode=move") if tab and not await client.tab_exists(tab): errors.append(f"TAB_NOT_FOUND: {tab}") elif mode == "tab_create": if not path: errors.append("MISSING_PARAM: path required for mode=tab_create") elif not path.startswith("workspace"): errors.append(f"PERMISSION_DENIED: tab_create only allowed in workspace/") if path and await client.tab_exists(path): warnings.append(f"Tab already exists: {path}") elif mode == "tab_delete": if not path: errors.append("MISSING_PARAM: path required for mode=tab_delete") elif not path.startswith("workspace"): errors.append(f"PERMISSION_DENIED: tab_delete only allowed in workspace/") if path and not await client.tab_exists(path): errors.append(f"TAB_NOT_FOUND: {path}") return errors, warnings def _estimate_tokens(tool: str, params: dict) -> int: """Estimate token count for response.""" base = 50 # Base overhead if tool == "read": mode = params.get("mode", "") max_items = params.get("max_items", 20) if mode == "tree": return base + max_items * 30 elif mode == "list": return base + max_items * 25 elif mode == "item": return base + 200 # Full item elif mode == "search": return base + max_items * 30 elif tool == "write": return base + 20 # Write responses are compact return base

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/list91/mcp-copyq'

If you have feedback or need assistance with the MCP directory API, please join our Discord server