Skip to main content
Glama
be3600_playwright.py20.6 kB
"""BE3600 Router Client using Playwright for all operations. This module uses browser automation to handle the complex encryption scheme of BE3600 routers. All operations are performed through the browser's UI automation. """ import json import re from typing import Optional, List, Dict from playwright.sync_api import sync_playwright, Browser, BrowserContext, Page class BE3600PlaywrightClient: """HTTP client for BE3600 router using Playwright for all operations. Since the router uses encrypted requests/responses, we let the browser handle all operations through its native JavaScript. """ def __init__(self, host: str, password: str, username: str = "admin"): self.host = host self.password = password self.username = username self.base_url = f"http://{host}" self.stok: Optional[str] = None self.sysauth: Optional[str] = None self._browser: Optional[Browser] = None self._context: Optional[BrowserContext] = None self._page: Optional[Page] = None self._playwright = None def login(self) -> bool: """Authenticate with the router using browser automation.""" try: self._playwright = sync_playwright().start() self._browser = self._playwright.chromium.launch(headless=True) self._context = self._browser.new_context( ignore_https_errors=True, user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" ) page = self._context.new_page() # Capture stok from network traffic def capture_stok(request): if "stok=" in request.url and "/stok=" not in request.url: match = re.search(r';stok=([^/]+)', request.url) if match and match.group(1): self.stok = match.group(1) page.on("request", capture_stok) # Navigate to router page.goto(f"{self.base_url}/", wait_until="domcontentloaded") page.wait_for_timeout(2000) page.wait_for_load_state("networkidle") # Find and fill password field password_field = page.query_selector('input[type="password"]') if not password_field: print("Could not find password field") return False password_field.fill(self.password) page.wait_for_timeout(500) # Find and click login button login_button = page.query_selector('button[type="submit"]') or \ page.query_selector('button:has-text("Login")') or \ page.query_selector('button:has-text("Log In")') or \ page.query_selector('.login-btn') or \ page.query_selector('button') if not login_button: print("Could not find login button") return False login_button.click() page.wait_for_timeout(3000) page.wait_for_load_state("networkidle") # Get sysauth cookie cookies = self._context.cookies() for cookie in cookies: if cookie['name'] == 'sysauth': self.sysauth = cookie['value'] break self._page = page return bool(self.stok and self.sysauth) except Exception as e: print(f"Login error: {e}") import traceback traceback.print_exc() return False def logout(self): """Close browser and clean up.""" try: if self._browser: self._browser.close() if self._playwright: self._playwright.stop() except: pass self.stok = None self.sysauth = None self._page = None def _click_element(self, text: str, timeout: int = 5000) -> bool: """Click an element by text using JavaScript for reliability.""" try: result = self._page.evaluate(f''' () => {{ const elements = Array.from(document.querySelectorAll('*')); const matches = elements.filter(el => el.textContent.trim() === '{text}' && el.tagName !== 'SCRIPT' && el.offsetParent !== null ); for (const el of matches) {{ el.click(); return true; }} return false; }} ''') return result except: return False def _navigate_to(self, menu_text: str, submenu_text: str = None, wait_time: int = 3000): """Navigate to a specific page in the router UI by clicking menu items.""" if not self._page: raise Exception("Not logged in") try: # Click main menu using JavaScript (more reliable for this UI) if self._click_element(menu_text): self._page.wait_for_timeout(1500) self._page.wait_for_load_state("networkidle") # Click submenu if specified if submenu_text: self._page.wait_for_timeout(500) if self._click_element(submenu_text): self._page.wait_for_timeout(1500) self._page.wait_for_load_state("networkidle") self._page.wait_for_timeout(wait_time) except Exception as e: print(f"Navigation error: {e}") def get_status(self) -> dict: """Get router status from the network map page.""" self._navigate_to("Network Map") # Extract data from the page data = self._page.evaluate(""" () => { const result = {}; // Try to get internet status const internetStatus = document.querySelector('.internet-status, .wan-status'); if (internetStatus) { result.internet_status = internetStatus.textContent.trim(); } // Try to get connected devices count const deviceCount = document.querySelector('.device-count, .client-count'); if (deviceCount) { result.device_count = deviceCount.textContent.trim(); } // Get router model from title or header const title = document.title || ''; result.title = title; // Get all text content for debugging const mainContent = document.querySelector('#app, .main-content, main'); if (mainContent) { result.page_text = mainContent.innerText.substring(0, 2000); } return result; } """) return data def get_devices(self) -> List[Dict]: """Get list of connected devices by clicking on Clients icon.""" # Click on the Clients icon in the network map try: clients_icon = self._page.query_selector('text="Clients"') or \ self._page.query_selector('.clients, [class*="client"]') if clients_icon: clients_icon.click() self._page.wait_for_timeout(2000) except: pass # Wait for client list to load self._page.wait_for_timeout(2000) # Extract device data devices = self._page.evaluate(""" () => { const devices = []; // Try different selectors for device list const rows = document.querySelectorAll( '.client-item, .device-row, tr[data-device], .device-card' ); rows.forEach(row => { const device = {}; // Try to find name const nameEl = row.querySelector('.device-name, .client-name, td:first-child'); if (nameEl) device.name = nameEl.textContent.trim(); // Try to find IP const ipEl = row.querySelector('.device-ip, .client-ip, [class*="ip"]'); if (ipEl) device.ip = ipEl.textContent.trim(); // Try to find MAC const macEl = row.querySelector('.device-mac, .client-mac, [class*="mac"]'); if (macEl) device.mac = macEl.textContent.trim(); if (Object.keys(device).length > 0) { devices.push(device); } }); // If no devices found, try to get raw text if (devices.length === 0) { const content = document.querySelector('#app, .main-content'); if (content) { return [{raw_text: content.innerText.substring(0, 3000)}]; } } return devices; } """) return devices def get_port_forwarding(self) -> List[Dict]: """Get port forwarding rules.""" # Navigate to Advanced -> NAT Forwarding -> Virtual Servers self._navigate_to("Advanced") self._page.wait_for_timeout(1500) # Click NAT Forwarding try: nat_menu = self._page.query_selector('text="NAT Forwarding"') or \ self._page.query_selector('text="NAT"') or \ self._page.query_selector('[class*="nat"]') if nat_menu: nat_menu.click() self._page.wait_for_timeout(1000) # Click Virtual Servers vs_menu = self._page.query_selector('text="Virtual Servers"') or \ self._page.query_selector('text="Port Forwarding"') if vs_menu: vs_menu.click() self._page.wait_for_timeout(2000) except Exception as e: print(f"Navigation to port forwarding failed: {e}") rules = self._page.evaluate(""" () => { const rules = []; // Try different selectors for port forwarding list const rows = document.querySelectorAll( '.rule-item, .forward-row, tr[data-rule], .virtual-server-item, table tbody tr' ); rows.forEach(row => { const rule = {}; // Table columns: Name, Device IP, External Port, Internal Port, Protocol, Status, Modify const cells = row.querySelectorAll('td, .cell'); if (cells.length >= 5) { rule.name = cells[0]?.textContent?.trim(); rule.internal_ip = cells[1]?.textContent?.trim(); // Device IP Address rule.external_port = cells[2]?.textContent?.trim(); rule.internal_port = cells[3]?.textContent?.trim(); rule.protocol = cells[4]?.textContent?.trim(); if (cells[5]) rule.status = cells[5]?.textContent?.trim(); } // Skip header rows or empty rows if (rule.name === 'Service Name' || !rule.name) { return; } if (Object.keys(rule).length > 0) { rules.push(rule); } }); // If no rules found, get page content if (rules.length === 0) { const content = document.querySelector('#app, .main-content'); if (content) { return [{raw_text: content.innerText.substring(0, 3000)}]; } } return rules; } """) return rules def add_port_forward(self, name: str, external_port: str, internal_ip: str, internal_port: str = "", protocol: str = "All", is_port_range: bool = False) -> bool: """Add a port forwarding rule through the UI. Args: name: Service name for the rule external_port: External port or port range (e.g., "5060" or "18000-18100") internal_ip: Internal device IP address (e.g., "10.13.37.169") internal_port: Internal port (optional, uses external port if empty) protocol: "All", "TCP", or "UDP" is_port_range: True if external_port is a range like "18000-18100" Returns True if successful. """ # First navigate to the port forwarding page self.get_port_forwarding() # This navigates to the right page try: # Click "Add" button self._click_element("Add") self._page.wait_for_timeout(2000) # Fill form using JavaScript for reliability success = self._page.evaluate(f''' () => {{ // Get all text inputs in the modal const inputs = Array.from(document.querySelectorAll('.su-dialog input.su-input__content')); if (inputs.length < 4) return {{ error: 'Not enough inputs found', count: inputs.length }}; // Fill Service Name (first input) inputs[0].value = '{name}'; inputs[0].dispatchEvent(new Event('input', {{ bubbles: true }})); // Fill Device IP Address (second input) inputs[1].value = '{internal_ip}'; inputs[1].dispatchEvent(new Event('input', {{ bubbles: true }})); // Select Port Range if needed if ({str(is_port_range).lower()}) {{ const portRangeRadio = document.querySelector('.su-dialog .su-radio:nth-child(2) input') || Array.from(document.querySelectorAll('.su-dialog .su-radio')).find(r => r.textContent.includes('Port Range'))?.querySelector('input'); if (portRangeRadio) {{ portRangeRadio.click(); }} }} // Wait a bit for UI to update after radio selection // Fill External Port (third input) setTimeout(() => {{ const updatedInputs = Array.from(document.querySelectorAll('.su-dialog input.su-input__content')); updatedInputs[2].value = '{external_port}'; updatedInputs[2].dispatchEvent(new Event('input', {{ bubbles: true }})); // Fill Internal Port if provided (fourth input) if ('{internal_port}') {{ updatedInputs[3].value = '{internal_port}'; updatedInputs[3].dispatchEvent(new Event('input', {{ bubbles: true }})); }} }}, 100); return {{ success: true }}; }} ''') self._page.wait_for_timeout(500) # Fill the port fields again to ensure they're set self._page.evaluate(f''' () => {{ const inputs = Array.from(document.querySelectorAll('.su-dialog input.su-input__content')); if (inputs.length >= 3) {{ inputs[2].value = '{external_port}'; inputs[2].dispatchEvent(new Event('input', {{ bubbles: true }})); }} if (inputs.length >= 4 && '{internal_port}') {{ inputs[3].value = '{internal_port}'; inputs[3].dispatchEvent(new Event('input', {{ bubbles: true }})); }} }} ''') # Select protocol from dropdown if protocol != "All": self._page.evaluate(f''' () => {{ const select = document.querySelector('.su-dialog .su-select'); if (select) {{ select.click(); setTimeout(() => {{ const options = document.querySelectorAll('.su-select-dropdown__item'); for (const opt of options) {{ if (opt.textContent.trim() === '{protocol}') {{ opt.click(); break; }} }} }}, 200); }} }} ''') self._page.wait_for_timeout(500) # Click Save button self._page.wait_for_timeout(500) save_clicked = self._click_element("SAVE") if not save_clicked: # Try alternative selector self._page.evaluate(''' () => { const saveBtn = document.querySelector('.su-dialog button:last-child') || Array.from(document.querySelectorAll('.su-dialog button')).find(b => b.textContent.includes('SAVE')); if (saveBtn) saveBtn.click(); } ''') self._page.wait_for_timeout(2000) return True except Exception as e: print(f"Error adding port forward: {e}") import traceback traceback.print_exc() return False def get_dhcp_settings(self) -> dict: """Get DHCP server settings.""" # Navigate to Advanced -> DHCP Server self._navigate_to("Advanced") self._page.wait_for_timeout(1500) try: dhcp_menu = self._page.query_selector('text="DHCP Server"') or \ self._page.query_selector('text="DHCP"') if dhcp_menu: dhcp_menu.click() self._page.wait_for_timeout(2000) except Exception as e: print(f"Navigation to DHCP failed: {e}") data = self._page.evaluate(""" () => { const result = {}; // Get all input values const inputs = document.querySelectorAll('input, select'); inputs.forEach(input => { const name = input.name || input.id || input.placeholder; if (name) { result[name] = input.value; } }); // Get page text as fallback const content = document.querySelector('#app, .main-content'); if (content) { result.page_text = content.innerText.substring(0, 2000); } return result; } """) return data def take_screenshot(self, path: str = "/tmp/router_screenshot.png"): """Take a screenshot of the current page.""" if self._page: self._page.screenshot(path=path) return path return None def test_client(): """Test the BE3600 client.""" import os from dotenv import load_dotenv load_dotenv() host = os.getenv("TPLINK_HOST", "10.13.37.1") password = os.getenv("TPLINK_PASSWORD", "") print(f"Testing BE3600 router at {host}") client = BE3600PlaywrightClient(host, password) print("\n=== Logging in ===") if client.login(): print(f"✅ Login successful!") print(f" STOK: {client.stok}") print(f" Sysauth: {client.sysauth[:30]}..." if client.sysauth else " No sysauth") print("\n=== Getting router status ===") try: status = client.get_status() print(f"Status: {json.dumps(status, indent=2)[:1000]}") except Exception as e: print(f"Status error: {e}") print("\n=== Getting connected devices ===") try: devices = client.get_devices() print(f"Devices: {json.dumps(devices, indent=2)[:1000]}") except Exception as e: print(f"Devices error: {e}") print("\n=== Getting port forwarding rules ===") try: rules = client.get_port_forwarding() print(f"Port forwarding: {json.dumps(rules, indent=2)[:1000]}") except Exception as e: print(f"Port forwarding error: {e}") print("\n=== Taking screenshot ===") try: path = client.take_screenshot() print(f"Screenshot saved to: {path}") except Exception as e: print(f"Screenshot error: {e}") client.logout() print("\n✅ Test complete") else: print("❌ Login failed") if __name__ == "__main__": test_client()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/consigcody94/mcp-tplink-router'

If you have feedback or need assistance with the MCP directory API, please join our Discord server