rlm_exec
Run Python code against a loaded context in a sandboxed subprocess for secure data analysis. Set the result variable for output. Optional timeout and security checks.
Instructions
Execute Python code against a loaded context in a sandboxed subprocess.
Set result variable for output.
Args: code: Python code to execute. User sets result variable for output. context_name: Name of previously loaded context timeout: Max execution time in seconds (default 30)
Security: When RLM_FIREWALL_ENABLED=1, code is checked against known dangerous patterns before execution. Blocked code returns an error instead of executing.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| code | Yes | ||
| context_name | Yes | ||
| timeout | No |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- src/rlm_mcp_server.py:1903-1908 (registration)This is the @mcp.tool() decorator registration for the 'rlm_exec' tool, making it available as an MCP tool.
@mcp.tool() async def rlm_exec( code: str, context_name: str, timeout: int = 30, ) -> dict: - src/rlm_mcp_server.py:1904-2036 (handler)The rlm_exec handler function that executes Python code against a loaded context in a sandboxed subprocess. It accepts 'code', 'context_name', and 'timeout' parameters, checks the code against the firewall if enabled, injects the context as stdin, and captures stdout/stderr and a 'result' variable.
async def rlm_exec( code: str, context_name: str, timeout: int = 30, ) -> dict: """Execute Python code against a loaded context in a sandboxed subprocess. Set result variable for output. Args: code: Python code to execute. User sets result variable for output. context_name: Name of previously loaded context timeout: Max execution time in seconds (default 30) Security: When RLM_FIREWALL_ENABLED=1, code is checked against known dangerous patterns before execution. Blocked code returns an error instead of executing. """ # Check code against firewall if enabled firewall_result = await _check_code_firewall(code) if firewall_result.get("blocked"): return { "error": "blocked_by_firewall", "reason": firewall_result.get("reason", "Dangerous pattern detected"), "pattern": firewall_result.get("pattern"), "similarity": firewall_result.get("similarity", 0.0), "message": "Code execution blocked for security. Set RLM_FIREWALL_ENABLED=0 to disable.", } # Include firewall warning in response if there was an issue firewall_warning = firewall_result.get("warning") # Ensure context is loaded error = _ensure_context_loaded(context_name) if error: return {"error": "context_not_found", "message": error} content = contexts[context_name]["content"] # Create a temporary Python file with the execution environment with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: temp_file = f.name # Write the execution wrapper f.write(""" import sys import json import re import collections # Inject context as read-only variable context = sys.stdin.read() # User code execution result = None try: """) # Indent user code for line in code.split("\n"): f.write(f" {line}\n") # Capture result f.write(""" # Output result if result is not None: print("__RESULT_START__") print(json.dumps(result, indent=2) if isinstance(result, (dict, list)) else str(result)) print("__RESULT_END__") except Exception as e: print(f"__ERROR__: {type(e).__name__}: {e}", file=sys.stderr) sys.exit(1) """) try: # Run the subprocess with minimal environment (no shell=True for security) env = { "PATH": os.environ.get("PATH", "/usr/bin:/bin"), } process = subprocess.run( [sys.executable, temp_file], input=content, capture_output=True, text=True, timeout=timeout, env=env, ) # Parse output stdout = process.stdout stderr = process.stderr return_code = process.returncode # Extract result result = None if "__RESULT_START__" in stdout and "__RESULT_END__" in stdout: result_start = stdout.index("__RESULT_START__") + len("__RESULT_START__\n") result_end = stdout.index("__RESULT_END__") result_str = stdout[result_start:result_end].strip() try: result = json.loads(result_str) except json.JSONDecodeError: result = result_str # Clean stdout stdout = stdout[: stdout.index("__RESULT_START__")].strip() response = { "result": result, "stdout": stdout, "stderr": stderr, "return_code": return_code, "timed_out": False, } if firewall_warning: response["firewall_warning"] = firewall_warning return response except subprocess.TimeoutExpired: return { "result": None, "stdout": "", "stderr": f"Execution timed out after {timeout} seconds", "return_code": -1, "timed_out": True, } except Exception as e: return {"error": "execution_error", "message": str(e)} finally: # Clean up temp file try: os.unlink(temp_file) except Exception: pass - src/rlm_mcp_server.py:110-327 (helper)The _check_code_firewall helper function used by rlm_exec to check code against dangerous patterns before execution when the firewall is enabled.
async def _check_code_firewall(code: str) -> dict: """ Check code against the firewall before execution. Uses the code-firewall-mcp structural similarity approach: 1. Normalize code (strip identifiers/literals, preserve security-sensitive ones) 2. Embed via Ollama 3. Check against blacklist Returns: { "allowed": bool, "blocked": bool, "reason": str | None, "similarity": float, "error": str | None, } """ if not FIREWALL_ENABLED: return {"allowed": True, "blocked": False, "reason": None, "similarity": 0.0} if not HAS_HTTPX: return {"allowed": True, "blocked": False, "error": "httpx not available for firewall"} # Security-sensitive identifiers to preserve (same as code-firewall-mcp) SECURITY_SENSITIVE = { "eval", "exec", "compile", "__import__", "system", "popen", "spawn", "fork", "execl", "execle", "execlp", "execv", "execve", "execvp", "spawnl", "spawnle", "spawnlp", "spawnv", "spawnve", "spawnvp", "subprocess", "Popen", "call", "check_call", "check_output", "run", "shell", "os", "remove", "unlink", "rmdir", "removedirs", "rename", "chmod", "chown", "link", "symlink", "mkdir", "makedirs", "open", "read", "write", "truncate", "socket", "connect", "bind", "listen", "accept", "send", "recv", "urlopen", "urlretrieve", "Request", "load", "loads", "pickle", "unpickle", "marshal", "__class__", "__bases__", "__subclasses__", "__mro__", "__globals__", "__code__", "__builtins__", "ctypes", "cffi", "CDLL", "windll", "oledll", "getattr", "setattr", "delattr", "hasattr", } # Python keywords to preserve keywords = { "import", "from", "def", "class", "return", "if", "else", "elif", "for", "while", "try", "except", "finally", "with", "as", "async", "await", "yield", "raise", "pass", "break", "continue", "and", "or", "not", "in", "is", "lambda", "global", "nonlocal", "True", "False", "None", } preserve = keywords | SECURITY_SENSITIVE # Normalize code normalized = code # Strip comments normalized = re.sub(r"#.*$", "", normalized, flags=re.MULTILINE) # Replace identifiers (except preserved ones) def replace_id(m): return m.group(0) if m.group(0) in preserve else "_" normalized = re.sub(r"\b[a-zA-Z_][a-zA-Z0-9_]*\b", replace_id, normalized) # Replace strings and numbers normalized = re.sub(r'"[^"]*"', '"S"', normalized) normalized = re.sub(r"'[^']*'", '"S"', normalized) normalized = re.sub(r"\b\d+\.?\d*\b", "N", normalized) normalized = re.sub(r"\s+", " ", normalized).strip() # Quick check: if no security-sensitive identifiers, likely safe has_sensitive = any(s in normalized for s in SECURITY_SENSITIVE) if not has_sensitive: return {"allowed": True, "blocked": False, "reason": None, "similarity": 0.0} # Get embedding from Ollama try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( f"{FIREWALL_URL}/api/embed", json={"model": FIREWALL_EMBEDDING_MODEL, "input": normalized}, ) if response.status_code != 200: # Firewall unavailable, allow but warn return { "allowed": True, "blocked": False, "error": f"Firewall embedding failed: {response.status_code}", "warning": "Firewall check skipped - Ollama unavailable", } data = response.json() embedding = data.get("embeddings", [[]])[0] or data.get("embedding", []) if not embedding: return {"allowed": True, "blocked": False, "error": "No embedding returned"} # For now, just check for obvious dangerous patterns via keywords # Full ChromaDB similarity check would require the firewall server # This is a lightweight inline check dangerous_patterns = [ ("os.system", "Direct shell command execution"), ("subprocess", "Subprocess execution"), ("eval(", "Dynamic code evaluation"), ("exec(", "Dynamic code execution"), ("__import__", "Dynamic module import"), ("Popen", "Process spawning"), ("shell=True", "Shell execution enabled"), ] for pattern, reason in dangerous_patterns: if pattern in code: return { "allowed": False, "blocked": True, "reason": reason, "pattern": pattern, "similarity": 1.0, } return {"allowed": True, "blocked": False, "reason": None, "similarity": 0.0} except httpx.ConnectError: # Firewall service not running, allow but warn return { "allowed": True, "blocked": False, "error": "Firewall service not reachable", "warning": "Firewall check skipped - connection refused", } except Exception as e: return {"allowed": True, "blocked": False, "error": str(e)} - src/rlm_mcp_server.py:1909-1920 (schema)The docstring and type annotations for rlm_exec defining the input schema (code: str, context_name: str, timeout: int = 30) and security notes. The output is a dict with result, stdout, stderr, return_code, and timed_out fields.
"""Execute Python code against a loaded context in a sandboxed subprocess. Set result variable for output. Args: code: Python code to execute. User sets result variable for output. context_name: Name of previously loaded context timeout: Max execution time in seconds (default 30) Security: When RLM_FIREWALL_ENABLED=1, code is checked against known dangerous patterns before execution. Blocked code returns an error instead of executing.