Skip to main content
Glama
find_in_file.py6.38 kB
import base64 import os import re from typing import Annotated, Any from urllib.parse import unquote, urlparse from fastmcp import Context from services.registry import mcp_for_unity_tool from services.tools import get_unity_instance_from_context from transport.unity_transport import send_with_unity_instance from transport.legacy.unity_connection import async_send_command_with_retry def _split_uri(uri: str) -> tuple[str, str]: """Split an incoming URI or path into (name, directory) suitable for Unity. Rules: - unity://path/Assets/... → keep as Assets-relative (after decode/normalize) - file://... → percent-decode, normalize, strip host and leading slashes, then, if any 'Assets' segment exists, return path relative to that 'Assets' root. Otherwise, fall back to original name/dir behavior. - plain paths → decode/normalize separators; if they contain an 'Assets' segment, return relative to 'Assets'. """ raw_path: str if uri.startswith("unity://path/"): raw_path = uri[len("unity://path/"):] elif uri.startswith("file://"): parsed = urlparse(uri) host = (parsed.netloc or "").strip() p = parsed.path or "" # UNC: file://server/share/... -> //server/share/... if host and host.lower() != "localhost": p = f"//{host}{p}" # Use percent-decoded path, preserving leading slashes raw_path = unquote(p) else: raw_path = uri # Percent-decode any residual encodings and normalize separators raw_path = unquote(raw_path).replace("\\", "/") # Strip leading slash only for Windows drive-letter forms like "/C:/..." if os.name == "nt" and len(raw_path) >= 3 and raw_path[0] == "/" and raw_path[2] == ":": raw_path = raw_path[1:] # Normalize path (collapse ../, ./) norm = os.path.normpath(raw_path).replace("\\", "/") # If an 'Assets' segment exists, compute path relative to it (case-insensitive) parts = [p for p in norm.split("/") if p not in ("", ".")] idx = next((i for i, seg in enumerate(parts) if seg.lower() == "assets"), None) assets_rel = "/".join(parts[idx:]) if idx is not None else None effective_path = assets_rel if assets_rel else norm # For POSIX absolute paths outside Assets, drop the leading '/' # to return a clean relative-like directory (e.g., '/tmp' -> 'tmp'). if effective_path.startswith("/"): effective_path = effective_path[1:] name = os.path.splitext(os.path.basename(effective_path))[0] directory = os.path.dirname(effective_path) return name, directory @mcp_for_unity_tool(description="Searches a file with a regex pattern and returns line numbers and excerpts.") async def find_in_file( ctx: Context, uri: Annotated[str, "The resource URI to search under Assets/ or file path form supported by read_resource"], pattern: Annotated[str, "The regex pattern to search for"], project_root: Annotated[str | None, "Optional project root path"] = None, max_results: Annotated[int, "Cap results to avoid huge payloads"] = 200, ignore_case: Annotated[bool | str | None, "Case insensitive search"] = True, ) -> dict[str, Any]: # project_root is currently unused but kept for interface consistency unity_instance = get_unity_instance_from_context(ctx) await ctx.info( f"Processing find_in_file: {uri} (unity_instance={unity_instance or 'default'})") name, directory = _split_uri(uri) # 1. Read file content via Unity read_resp = await send_with_unity_instance( async_send_command_with_retry, unity_instance, "manage_script", { "action": "read", "name": name, "path": directory, }, ) if not isinstance(read_resp, dict) or not read_resp.get("success"): return read_resp if isinstance(read_resp, dict) else {"success": False, "message": str(read_resp)} data = read_resp.get("data", {}) contents = data.get("contents") if not contents and data.get("contentsEncoded") and data.get("encodedContents"): try: contents = base64.b64decode(data.get("encodedContents", "").encode( "utf-8")).decode("utf-8", "replace") except (ValueError, TypeError, base64.binascii.Error): contents = contents or "" if contents is None: return {"success": False, "message": "Could not read file content."} # 2. Perform regex search flags = re.MULTILINE # Handle ignore_case which can be boolean or string from some clients ic = ignore_case if isinstance(ic, str): ic = ic.lower() in ("true", "1", "yes") if ic: flags |= re.IGNORECASE try: regex = re.compile(pattern, flags) except re.error as e: return {"success": False, "message": f"Invalid regex pattern: {e}"} # If the regex is not multiline specific (doesn't contain \n literal match logic), # we could iterate lines. But users might use multiline regexes. # Let's search the whole content and map back to lines. found = list(regex.finditer(contents)) results = [] count = 0 for m in found: if count >= max_results: break start_idx = m.start() end_idx = m.end() # Calculate line number # Count newlines up to start_idx line_num = contents.count('\n', 0, start_idx) + 1 # Get line content for excerpt # Find start of line line_start = contents.rfind('\n', 0, start_idx) + 1 # Find end of line line_end = contents.find('\n', start_idx) if line_end == -1: line_end = len(contents) line_content = contents[line_start:line_end] # Create excerpt # We can just return the line content as excerpt results.append({ "line": line_num, "content": line_content.strip(), # detailed match info? "match": m.group(0), "start": start_idx, "end": end_idx }) count += 1 return { "success": True, "data": { "matches": results, "count": len(results), "total_matches": len(found) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/CoplayDev/unity-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server