Skip to main content
Glama
be3600_crypto.py18.5 kB
"""BE3600 Router Encryption Module. Implements the sign+data encryption scheme used by TP-Link BE3600 routers with firmware version 1.1.0+ (2025). The encryption flow: 1. Get RSA public key and sequence from /login?form=auth 2. Hash password: SHA256("admin" + password) 3. Generate random AES-256 key and IV 4. Encrypt password with AES-CBC 5. Create signature with AES key info and hash 6. Encrypt signature chunks with RSA 7. Send { sign: <rsa_encrypted_sig>, data: <aes_encrypted_password> } """ import base64 import hashlib import os import secrets from typing import Optional, Tuple from Crypto.Cipher import AES, PKCS1_v1_5, PKCS1_OAEP from Crypto.PublicKey import RSA from Crypto.Util.Padding import pad, unpad class BE3600Crypto: """Encryption handler for BE3600 routers.""" SIGNATURE_CHUNK_SIZE = 53 def __init__(self): # RSA key for signature (from /login?form=auth) self.rsa_n: Optional[str] = None self.rsa_e: Optional[str] = None # RSA key for password (from /login?form=keys) self.pwd_rsa_n: Optional[str] = None self.pwd_rsa_e: Optional[str] = None self.sequence: Optional[int] = None self.aes_key: Optional[bytes] = None self.aes_iv: Optional[bytes] = None self.hash: Optional[str] = None def set_rsa_key(self, n: str, e: str): """Set RSA public key for signature (hex strings).""" self.rsa_n = n self.rsa_e = e def set_password_rsa_key(self, n: str, e: str): """Set RSA public key for password encryption (hex strings).""" self.pwd_rsa_n = n self.pwd_rsa_e = e def set_sequence(self, seq: int): """Set the sequence number from the router.""" self.sequence = seq def generate_aes_key(self, use_hex: bool = True): """Generate random AES key and IV. Args: use_hex: If True, use 16-char hex strings (library style). If False, use 16-digit numeric strings (original JS style). """ if use_hex: # Library style: 8 random bytes → 16 hex chars from binascii import b2a_hex from Crypto import Random self.aes_key = b2a_hex(Random.get_random_bytes(8)).decode() self.aes_iv = b2a_hex(Random.get_random_bytes(8)).decode() else: # Original JS style: 16 random digits import random self.aes_key = ''.join([str(random.randint(0, 9)) for _ in range(16)]) self.aes_iv = ''.join([str(random.randint(0, 9)) for _ in range(16)]) def get_aes_formatted_key(self) -> str: """Get AES key in formatted string: k=<digits>&i=<digits>""" if not self.aes_key or not self.aes_iv: return "" # Router uses k= and i= not key= and iv= return f"k={self.aes_key}&i={self.aes_iv}" def hash_password(self, password: str, username: str = "admin", use_md5: bool = False) -> str: """Hash password with username prefix. Args: password: The password to hash username: The username (default: admin) use_md5: Use MD5 instead of SHA256 (for older firmware) """ combined = f"{username}{password}" if use_md5: self.hash = hashlib.md5(combined.encode()).hexdigest() else: self.hash = hashlib.sha256(combined.encode()).hexdigest() return self.hash def aes_encrypt(self, data: str) -> str: """Encrypt data with AES-CBC and return base64.""" if not self.aes_key or not self.aes_iv: raise ValueError("AES key not generated") # Key and IV are UTF-8 strings (16-digit numeric strings) key_bytes = self.aes_key.encode('utf-8') iv_bytes = self.aes_iv.encode('utf-8') cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes) padded = pad(data.encode(), AES.block_size) encrypted = cipher.encrypt(padded) return base64.b64encode(encrypted).decode() def aes_decrypt(self, data: str) -> str: """Decrypt base64 AES-CBC data.""" if not self.aes_key or not self.aes_iv: raise ValueError("AES key not generated") # Key and IV are UTF-8 strings key_bytes = self.aes_key.encode('utf-8') iv_bytes = self.aes_iv.encode('utf-8') cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes) encrypted = base64.b64decode(data) decrypted = unpad(cipher.decrypt(encrypted), AES.block_size) return decrypted.decode() def rsa_encrypt_chunk(self, data: str) -> str: """Encrypt a chunk of data with RSA using PKCS1v1.5 and return hex.""" if not self.rsa_n or not self.rsa_e: raise ValueError("RSA key not set") # Convert hex strings to integers n = int(self.rsa_n, 16) e = int(self.rsa_e, 16) # Create RSA key rsa_key = RSA.construct((n, e)) cipher = PKCS1_v1_5.new(rsa_key) # Encrypt with PKCS1v1.5 encrypted = cipher.encrypt(data.encode()) # Pad to key length key_len = len(self.rsa_n) result = encrypted.hex() if len(result) < key_len: result = result.zfill(key_len) return result def generate_signature(self, data_length: int, include_aes_key: bool = True) -> str: """Generate the encrypted signature. Args: data_length: Length of the encrypted data include_aes_key: Whether to include AES key in signature (first login) Returns: RSA-encrypted signature string """ if not self.hash or self.sequence is None: raise ValueError("Hash or sequence not set") # Build signature payload sig_parts = [] if include_aes_key: sig_parts.append(self.get_aes_formatted_key()) sig_parts.append(f"h={self.hash}") sig_parts.append(f"s={self.sequence + data_length}") signature = "&".join(sig_parts) print(f"DEBUG: Signature before encrypt ({len(signature)} chars): {signature[:100]}...") # Encrypt in chunks encrypted_sig = "" for i in range(0, len(signature), self.SIGNATURE_CHUNK_SIZE): chunk = signature[i:i + self.SIGNATURE_CHUNK_SIZE] encrypted_sig += self.rsa_encrypt_chunk(chunk) return encrypted_sig def rsa_encrypt_password(self, password: str, use_oaep: bool = False) -> str: """RSA encrypt the plain password for login. Uses the password-specific RSA key from /login?form=keys if available, otherwise falls back to the auth RSA key. Args: password: Plain text password use_oaep: Use OAEP padding (True) or PKCS1v1.5 (False) Returns: Hex string of encrypted password (padded to key length) """ # Use password-specific key if available rsa_n = self.pwd_rsa_n or self.rsa_n rsa_e = self.pwd_rsa_e or self.rsa_e if not rsa_n or not rsa_e: raise ValueError("RSA key not set") # Convert hex strings to integers n = int(rsa_n, 16) e = int(rsa_e, 16) # Create RSA key rsa_key = RSA.construct((n, e)) if use_oaep: cipher = PKCS1_OAEP.new(rsa_key) else: cipher = PKCS1_v1_5.new(rsa_key) # Encrypt encrypted = cipher.encrypt(password.encode()) # Pad to key length (modulus length in hex) key_len = len(rsa_n) result = encrypted.hex() if len(result) < key_len: result = result.zfill(key_len) return result def serialize_payload(self, payload: dict) -> str: """Serialize payload as URL-encoded key-value pairs. This matches the router's serialize() JavaScript function. """ from urllib.parse import quote parts = [] for key, value in payload.items(): if value is None: continue if isinstance(value, bool): value = "true" if value else "false" elif isinstance(value, (dict, list)): import json value = json.dumps(value) parts.append(f"{quote(str(key))}={quote(str(value))}") return "&".join(parts) def encrypt_for_login(self, password: str) -> Tuple[str, str]: """Encrypt password for login request. The login flow: 1. RSA encrypt the plain password 2. Create URL-encoded payload: password=<rsa_encrypted>&operation=login&confirm=true 3. AES encrypt the serialized payload 4. Generate signature with AES key info Args: password: Plain text password Returns: Tuple of (sign, data) for the login request """ # Generate fresh AES key self.generate_aes_key() print(f"DEBUG: AES key={self.aes_key}, IV={self.aes_iv}") print(f"DEBUG: AES formatted: {self.get_aes_formatted_key()}") # Hash the password (used in signature) self.hash_password(password) print(f"DEBUG: Password hash: {self.hash}") # RSA encrypt the password rsa_encrypted_password = self.rsa_encrypt_password(password) print(f"DEBUG: RSA encrypted password ({len(rsa_encrypted_password)} chars): {rsa_encrypted_password[:50]}...") # Create payload - serialized as URL-encoded format payload = { "password": rsa_encrypted_password, "operation": "login", "confirm": True } serialized = self.serialize_payload(payload) print(f"DEBUG: Serialized payload ({len(serialized)} chars): {serialized[:100]}...") # AES encrypt the serialized payload encrypted_data = self.aes_encrypt(serialized) # Generate signature (include AES key for initial login) sign = self.generate_signature(len(encrypted_data), include_aes_key=True) return sign, encrypted_data class BE3600Client: """HTTP client for BE3600 router with encryption support.""" def __init__(self, host: str, password: str, username: str = "admin"): self.host = host self.password = password self.username = username self.base_url = f"http://{host}" self.crypto = BE3600Crypto() self.stok: Optional[str] = None self.sysauth: Optional[str] = None def _get_auth_info(self) -> dict: """Get RSA key and sequence from router.""" import httpx url = f"{self.base_url}/cgi-bin/luci/;stok=/login?form=auth&operation=read" with httpx.Client(timeout=15.0) as client: resp = client.get(url) if resp.status_code == 200: data = resp.json() if data.get("success") and "data" in data: return data["data"] raise Exception("Failed to get auth info from router") def _get_keys_info(self) -> dict: """Get password RSA key from router.""" import httpx url = f"{self.base_url}/cgi-bin/luci/;stok=/login?form=keys&operation=read" with httpx.Client(timeout=15.0) as client: resp = client.get(url) if resp.status_code == 200: data = resp.json() if data.get("success") and "data" in data: return data["data"] raise Exception("Failed to get keys info from router") def login(self) -> bool: """Authenticate with the router. Returns: True if login successful, False otherwise """ import httpx import json import traceback try: # Get RSA key and sequence for signature auth_info = self._get_auth_info() key = auth_info.get("key", []) seq = auth_info.get("seq") if len(key) != 2 or seq is None: raise Exception("Invalid auth info format") print(f"DEBUG: Got auth RSA key (n={key[0][:20]}..., e={key[1]})") print(f"DEBUG: Got sequence: {seq}") # Get password RSA key keys_info = self._get_keys_info() pwd_key = keys_info.get("password", []) if len(pwd_key) == 2: print(f"DEBUG: Got password RSA key (n={pwd_key[0][:20]}..., e={pwd_key[1]})") self.crypto.set_password_rsa_key(pwd_key[0], pwd_key[1]) # Set up crypto self.crypto.set_rsa_key(key[0], key[1]) self.crypto.set_sequence(seq) # Encrypt password sign, data = self.crypto.encrypt_for_login(self.password) print(f"DEBUG: Generated sign ({len(sign)} chars): {sign[:50]}...") print(f"DEBUG: Generated data ({len(data)} chars): {data[:50]}...") # Send login request as form-urlencoded (not JSON!) url = f"{self.base_url}/cgi-bin/luci/;stok=/login?form=login" # Use httpx's built-in form encoding form_payload = { "sign": sign, "data": data } print(f"DEBUG: Sending to URL: {url}") print(f"DEBUG: Form data keys: sign={len(sign)} chars, data={len(data)} chars") with httpx.Client(timeout=15.0) as client: resp = client.post( url, data=form_payload, # httpx encodes as form-urlencoded headers={ "Accept": "application/json, text/javascript, */*; q=0.01", "Accept-Language": "en-US,en;q=0.9", "Cache-Control": "no-cache", "Content-Type": "application/x-www-form-urlencoded", "Origin": f"http://{self.host}", "Referer": f"http://{self.host}/", "X-Requested-With": "XMLHttpRequest" } ) print(f"DEBUG: Response status: {resp.status_code}") print(f"DEBUG: Response body: {resp.text[:500]}") print(f"DEBUG: Response cookies: {dict(resp.cookies)}") print(f"DEBUG: Response headers set-cookie: {resp.headers.get('set-cookie', 'none')}") if resp.status_code == 200: result = resp.json() print(f"DEBUG: Response JSON: {json.dumps(result)[:200]}...") if result.get("success"): # Response data is encrypted - decrypt it resp_data = result.get("data") if isinstance(resp_data, str): try: decrypted = self.crypto.aes_decrypt(resp_data) print(f"DEBUG: Decrypted response: {decrypted}") dec_data = json.loads(decrypted) self.stok = dec_data.get("stok") except Exception as e: print(f"DEBUG: Decrypt error: {e}") elif isinstance(resp_data, dict): self.stok = resp_data.get("stok") # Extract sysauth from cookies cookies = resp.headers.get("set-cookie", "") if "sysauth=" in cookies: import re match = re.search(r'sysauth=([^;]+)', cookies) if match: self.sysauth = match.group(1) print(f"DEBUG: STOK: {self.stok}") print(f"DEBUG: Sysauth: {self.sysauth}") return self.stok is not None or self.sysauth is not None else: print(f"DEBUG: Login failed - success=false") print(f"DEBUG: Error: {result.get('errorcode', 'unknown')}") return False except Exception as e: print(f"Login error: {e}") traceback.print_exc() return False def _make_request(self, endpoint: str, payload: dict = None, method: str = "read") -> dict: """Make an authenticated request to the router.""" import httpx if not self.stok: raise Exception("Not authenticated") url = f"{self.base_url}/cgi-bin/luci/;stok={self.stok}/{endpoint}" # Encrypt payload if needed if payload: # Add operation payload["operation"] = method # Encrypt for non-login requests if self.crypto.aes_key: import json encrypted = self.crypto.aes_encrypt(json.dumps(payload)) sign = self.crypto.generate_signature(len(encrypted), include_aes_key=False) request_data = {"sign": sign, "data": encrypted} else: request_data = payload else: request_data = {"operation": method} cookies = {} if self.sysauth: cookies["sysauth"] = self.sysauth with httpx.Client(timeout=15.0, cookies=cookies) as client: resp = client.post( url, json=request_data, headers={"Content-Type": "application/json"} ) if resp.status_code == 200: result = resp.json() if result.get("success") and "data" in result: data = result["data"] # Try to decrypt if it's a string (encrypted response) if isinstance(data, str) and self.crypto.aes_key: try: import json decrypted = self.crypto.aes_decrypt(data) return json.loads(decrypted) except Exception: pass return data if isinstance(data, dict) else {"data": data} return result raise Exception(f"Request failed: {endpoint}") def get_status(self) -> dict: """Get router status.""" return self._make_request("admin/status?form=all") def get_devices(self) -> dict: """Get connected devices.""" return self._make_request("admin/status?form=client_list")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/consigcody94/mcp-tplink-router'

If you have feedback or need assistance with the MCP directory API, please join our Discord server