from mcp.server.fastmcp import FastMCP, Image
import os
import sys
import yaml
import requests
import json
import base64
from io import BytesIO
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import time
import functools
import logging
from datetime import datetime
CONFIG_FILE = "config.yaml"
CONFIG_FILE_EXAMPLE = "config.yaml.example"

# Abort early, with a hint on stderr, when the configuration file is missing.
if not os.path.exists(CONFIG_FILE):
    print(f"Config file {CONFIG_FILE} not found. Please create it from {CONFIG_FILE_EXAMPLE}.", file=sys.stderr)
    sys.exit(1)

# Load the configuration. The encoding is pinned to UTF-8 so the file is read
# the same way regardless of the platform's default locale encoding.
with open(CONFIG_FILE, encoding="utf-8") as f:
    config = yaml.safe_load(f)

# Address of the device-side HTTP service, taken from the "device" section.
ip = config["device"]["ip"]
port = config["device"]["port"]

# Base URL for API requests
BASE_URL = f"http://{ip}:{port}/api"
# ==================== Logging configuration ====================
# Create the log directory. exist_ok=True replaces the separate existence
# check, so a directory created in between cannot make startup fail.
LOG_DIR = "logs"
os.makedirs(LOG_DIR, exist_ok=True)

# Log to a file only, never to stdout, to avoid interfering with the MCP protocol.
log_filename = os.path.join(LOG_DIR, f"autobot_mcp_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_filename, encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)

# Startup banner goes to stderr, again to keep stdout clean for MCP.
print("=" * 80, file=sys.stderr)
print(f"AutoBot MCP Server 启动 - 日志文件: {log_filename}", file=sys.stderr)
print(f"设备地址: {ip}:{port}", file=sys.stderr)
print("=" * 80, file=sys.stderr)

# Mirror the same banner into the log file.
logger.info("=" * 80)
logger.info(f"AutoBot MCP Server 启动 - 日志文件: {log_filename}")
logger.info(f"设备地址: {ip}:{port}")
logger.info("=" * 80)
# Retry policy for requests sent through `session`: up to 3 retries with a
# 0.5 backoff factor, triggered by the listed 5xx status codes.
retry_strategy = Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])

# One adapter carrying that policy serves both URL schemes.
session = requests.Session()
http_adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", http_adapter)
session.mount("http://", http_adapter)
# Create the MCP server object; if that fails, report on stderr and exit
# with status 1.
try:
    mcp = FastMCP("autobot")
except Exception as init_error:
    sys.stderr.write(f"Error initializing MCP: {str(init_error)}\n")
    sys.exit(1)
# ==================== Screen state management ====================
# Result of the most recent screen-readiness check and when it was taken.
_screen_state_cache = dict(last_check=0, is_ready=False, is_locked=False)
# Whether the keep-awake heartbeat is currently enabled.
_keep_alive_active = False
# Screen timeout (seconds) saved before keep-alive changed it; None = nothing saved.
_original_screen_timeout = None
def _is_screen_locked() -> bool:
    """Report whether the device currently shows the lock screen.

    Fetches the UI layout from the ``/screenJson`` endpoint and looks for the
    substring ``keyguard`` (case-insensitive) in it.

    Returns:
        bool: True when a keyguard marker is found. False when none is found,
        when the endpoint answers with a non-200 status, or when the request
        or JSON parsing fails (failures are treated as "not locked" so that
        callers do not get stuck).
    """
    def contains_keyguard(data) -> bool:
        # str() of a dict already renders every nested key and value, so a
        # single substring test covers the whole subtree; descending into the
        # values again could never find anything new.
        if isinstance(data, dict):
            return "keyguard" in str(data).lower()
        if isinstance(data, list):
            return any(contains_keyguard(item) for item in data)
        return False

    try:
        logger.debug("检查屏幕锁屏状态...")
        response = requests.get(f"{BASE_URL}/screenJson", timeout=5)
        if response.status_code != 200:
            # Layout unavailable: assume unlocked rather than block callers.
            logger.debug(f"无法获取屏幕信息 (状态码: {response.status_code}),假设未锁定")
            return False
        is_locked = contains_keyguard(response.json())
        logger.debug(f"屏幕锁屏状态检查结果: {'已锁定' if is_locked else '未锁定'}")
        return is_locked
    except Exception as e:
        logger.warning(f"检查屏幕锁屏状态时出错: {str(e)},假设未锁定")
        return False
def _get_screen_timeout() -> int:
    """Read the device's current screen-off timeout.

    Runs ``settings get system screen_off_timeout`` through the ``/execCmd``
    endpoint and converts the reported milliseconds to whole seconds.

    Returns:
        int: Timeout in seconds, or -1 when the request fails, the HTTP status
        is not 200, the response ``code`` is not 1, or the value cannot be
        parsed.
    """
    command = {"timeout": 5, "value": "settings get system screen_off_timeout"}
    try:
        reply = requests.post(f"{BASE_URL}/execCmd", json=command, timeout=10)
        if reply.status_code != 200:
            return -1
        body = reply.json()
        if body.get("code") != 1:
            return -1
        # The setting is stored in milliseconds.
        return int(body.get("data", "0")) // 1000
    except Exception as e:
        logger.warning(f"获取屏幕超时设置失败: {str(e)}")
    return -1
def _set_screen_timeout(seconds: int) -> bool:
    """Set the device's screen-off timeout.

    Args:
        seconds: Desired timeout in seconds. Values are clamped to the range
            0..1800 (30 minutes) before being sent.

    Returns:
        bool: True when the ``/execCmd`` endpoint answers with HTTP 200,
        False on any other status or on an exception.
    """
    try:
        # Clamp to [0, 1800] seconds, then convert to milliseconds.
        non_negative = max(seconds, 0)
        timeout_ms = min(non_negative, 1800) * 1000
        command = {"timeout": 5, "value": f"settings put system screen_off_timeout {timeout_ms}"}
        reply = requests.post(f"{BASE_URL}/execCmd", json=command, timeout=10)
        if reply.status_code == 200:
            logger.info(f"屏幕超时已设置为 {timeout_ms // 1000} 秒")
            return True
    except Exception as e:
        logger.error(f"设置屏幕超时失败: {str(e)}")
    return False
def _enable_screen_keep_alive(enable: bool = True) -> bool:
    """Enable or disable the "keep the screen on" mode.

    Enabling saves the current screen timeout (once) and raises the timeout to
    30 minutes. Disabling restores the saved timeout.

    Args:
        enable: True to enable keep-alive, False to restore the saved setting.

    Returns:
        bool: Whether the timeout change succeeded. Disabling returns False
        when no original timeout was saved.
    """
    global _original_screen_timeout, _keep_alive_active

    if not enable:
        _keep_alive_active = False
        if _original_screen_timeout is None:
            logger.warning("没有保存的原始屏幕超时设置")
            return False
        success = _set_screen_timeout(_original_screen_timeout)
        if success:
            logger.info(f"✓ 屏幕超时已恢复为原始设置: {_original_screen_timeout} 秒")
        return success

    # Save the original setting only once, and only if it was actually read.
    # _get_screen_timeout() signals failure with -1; storing that sentinel
    # would later "restore" a timeout of 0 on the device.
    if _original_screen_timeout is None:
        current_timeout = _get_screen_timeout()
        if current_timeout >= 0:
            _original_screen_timeout = current_timeout
            logger.info(f"已保存原始屏幕超时设置: {_original_screen_timeout} 秒")
        else:
            logger.warning("无法读取当前屏幕超时设置,稍后将无法恢复")

    # Raise the timeout to 30 minutes (the maximum _set_screen_timeout allows).
    success = _set_screen_timeout(1800)
    if success:
        _keep_alive_active = True
        logger.info("✓ 屏幕保持常亮已启用(30分钟)")

    # Best-effort query of the wake-lock state. The response is not used; a
    # failure here is only logged and never affects the return value.
    try:
        requests.post(
            f"{BASE_URL}/execCmd",
            json={"timeout": 5, "value": "dumpsys power | grep 'Wake Locks'"},
            timeout=10
        )
        logger.debug("已检查系统 Wake Lock 状态")
    except Exception as e:
        logger.debug(f"检查 Wake Lock 失败: {str(e)}")
    return success
def _keep_screen_awake():
    """Send a lightweight heartbeat tap while keep-alive mode is active.

    Does nothing unless ``_keep_alive_active`` is set. Otherwise posts a click
    at relative position (0.01, 0.99), i.e. near the bottom-left corner.
    Failures are logged at debug level and otherwise ignored.
    """
    if not _keep_alive_active:
        return
    # Bottom-left corner, chosen so the tap rarely hits anything.
    edge_tap = {"x": 0.01, "y": 0.99}
    try:
        requests.post(f"{BASE_URL}/click", json=edge_tap, timeout=5)
        logger.debug("执行屏幕保持活跃操作(边缘轻触)")
    except Exception as e:
        logger.debug(f"屏幕保持活跃操作失败: {str(e)}")
def _is_notification_panel_open(ui_data: dict) -> bool:
"""
检查通知面板是否打开
Args:
ui_data: UI 布局数据
Returns:
bool: True if notification panel is open
"""
try:
# 检查是否有通知面板的特征元素
# 通知面板通常包含 "notification"、"状态栏" 等关键词
def check_node(node):
if isinstance(node, dict):
text = node.get("text", "").lower()
content_desc = node.get("contentDescription", "").lower()
# 检查常见的通知面板标识
if any(keyword in text or keyword in content_desc
for keyword in ["clear", "通知", "notification", "settings"]):
return True
# 检查是否是系统UI且占据整个屏幕
if node.get("class", "").startswith("android.widget"):
bounds = node.get("bounds", {})
if bounds:
h = bounds.get("bottom", 0) - bounds.get("top", 0)
# 如果高度超过屏幕的70%,可能是通知面板
# 需要配合其他判断条件
pass
# 递归检查子节点
children = node.get("children", [])
if isinstance(children, list):
return any(check_node(child) for child in children)
return False
return check_node(ui_data)
except Exception as e:
logger.debug(f"检查通知面板状态时出错: {str(e)}")
return False
def _ensure_screen_ready(max_retries: int = 2, allow_back_key: bool = False) -> bool:
    """Bring the screen into an operable state.

    Each attempt runs these steps:
      1. Wake the screen (``/turnScreenOn``, falling back to the power key).
      2. If the lock screen is showing, swipe up to unlock.
      3. Optionally press BACK twice to dismiss overlays such as the
         notification panel.
    A successful result is cached for 10 seconds.

    Args:
        max_retries: Maximum number of attempts.
        allow_back_key: Whether BACK may be pressed even when the screen was
            not locked (default False, to avoid closing a running app).

    Returns:
        bool: True if the screen is ready, False otherwise.
    """
    # Trust a successful check made less than 10 seconds ago.
    if time.time() - _screen_state_cache["last_check"] < 10.0 and _screen_state_cache["is_ready"]:
        logger.debug("屏幕状态缓存:屏幕已就绪,跳过检查")
        return True

    logger.info("开始确保屏幕就绪...")
    for attempt in range(max_retries):
        try:
            logger.debug(f"屏幕准备尝试 {attempt + 1}/{max_retries}")
            # Heartbeat first (no-op unless keep-alive mode is active).
            _keep_screen_awake()
            time.sleep(0.1)  # give the heartbeat tap time to complete

            # Step 1: wake the screen. The dedicated endpoint is preferred
            # because a power-key press on an already lit screen may lock it.
            logger.debug(" 步骤1: 唤醒屏幕")
            try:
                requests.get(f"{BASE_URL}/turnScreenOn", timeout=5)
                logger.debug(" 使用 turnScreenOn API")
            except Exception as e:
                logger.debug(f" turnScreenOn 失败 ({str(e)}),尝试按电源键")
                requests.post(f"{BASE_URL}/pressKeyCode", json={"value": 26}, timeout=5)
            time.sleep(0.8)

            # Step 2: unlock with a bottom-to-top swipe (relative coordinates).
            was_locked = _is_screen_locked()
            if was_locked:
                logger.debug(" 步骤2: 屏幕已锁定,执行滑动解锁")
                requests.post(
                    f"{BASE_URL}/swipe",
                    json={"x1": 0.5, "y1": 0.85, "x2": 0.5, "y2": 0.3, "duration": 300},
                    timeout=5
                )
                time.sleep(0.8)
            else:
                logger.debug(" 步骤2: 屏幕未锁定")

            # Step 3: press BACK only when explicitly allowed, or when the
            # screen was just locked (so BACK cannot close a foreground app).
            if allow_back_key or was_locked:
                logger.debug(" 步骤3: 尝试关闭通知面板")
                try:
                    requests.post(f"{BASE_URL}/pressKeyCode", json={"value": 4}, timeout=5)  # BACK
                    time.sleep(0.3)
                    # Press once more to make sure the overlay is closed.
                    requests.post(f"{BASE_URL}/pressKeyCode", json={"value": 4}, timeout=5)
                    time.sleep(0.3)
                    logger.debug(" 已按两次返回键")
                except Exception as e:
                    logger.debug(f" 按返回键时出错: {str(e)}")
            else:
                logger.debug(" 步骤3: 跳过返回键操作(避免关闭运行中的应用)")

            # Final verification that the lock screen is gone.
            if not _is_screen_locked():
                logger.info("✓ 屏幕准备成功:屏幕已就绪")
                # Stamp the cache with the completion time so the 10-second
                # window is not shortened by the time this routine took.
                _screen_state_cache["last_check"] = time.time()
                _screen_state_cache["is_ready"] = True
                _screen_state_cache["is_locked"] = False
                return True
            logger.warning("✗ 屏幕准备失败:屏幕仍然锁定")
        except Exception as e:
            logger.error(f"屏幕准备出错 (尝试 {attempt + 1}/{max_retries}): {str(e)}")

        # Pause only between attempts; sleeping after the last one just
        # delays the failure report.
        if attempt < max_retries - 1:
            time.sleep(0.5)

    # Every attempt failed (by exception or by staying locked): record it.
    _screen_state_cache["is_ready"] = False
    _screen_state_cache["is_locked"] = True
    logger.error("✗ 屏幕准备最终失败")
    return False
def require_screen_ready(func):
    """Decorator that prepares the screen before running ``func``.

    The wrapper calls ``_ensure_screen_ready()`` (ignoring its result), then
    invokes ``func`` with the original arguments and returns its result.
    Intended for tool functions that interact with the screen.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        name = func.__name__
        logger.debug(f"[@require_screen_ready] 拦截函数调用: {name}")
        # Make sure the screen is usable before touching it.
        _ensure_screen_ready()
        logger.debug(f"[@require_screen_ready] 执行函数: {name}")
        outcome = func(*args, **kwargs)
        logger.debug(f"[@require_screen_ready] 函数执行完成: {name}")
        return outcome

    return wrapper
def make_request(method: str, url: str, **kwargs) -> requests.Response:
    """Send a request through the shared session with retries and logging.

    Args:
        method: HTTP method name, e.g. "GET" or "POST".
        url: Full request URL.
        **kwargs: Extra arguments forwarded to ``session.request``. A
            ``timeout`` given here is honoured; otherwise 10 seconds is used.

    Returns:
        requests.Response: The response, once its status passes
        ``raise_for_status()``.

    Raises:
        requests.exceptions.RequestException: When the third attempt fails.
    """
    max_retries = 3
    retry_delay = 1  # seconds
    # Default the timeout instead of hard-coding it in the call below, so a
    # caller passing timeout=... no longer triggers a duplicate-keyword error.
    kwargs.setdefault("timeout", 10)

    logger.debug(f"API 请求开始: {method} {url}")
    if kwargs.get('json'):
        logger.debug(f" 请求体 (JSON): {kwargs['json']}")
    if kwargs.get('params'):
        logger.debug(f" 查询参数: {kwargs['params']}")

    for attempt in range(max_retries):
        try:
            response = session.request(method, url, **kwargs)
            response.raise_for_status()
            logger.debug(f"API 请求成功: {method} {url} - 状态码: {response.status_code}")
            return response
        except requests.exceptions.RequestException as e:
            logger.warning(f"API 请求失败 (尝试 {attempt + 1}/{max_retries}): {method} {url} - 错误: {str(e)}")
            if attempt == max_retries - 1:
                logger.error(f"API 请求最终失败: {method} {url} - 错误: {str(e)}")
                # Bare raise keeps the original traceback intact.
                raise
            time.sleep(retry_delay)
            retry_delay *= 2  # exponential backoff
    # Not reachable with max_retries >= 1; kept as a safety net.
    raise Exception("Max retries exceeded")
@mcp.tool()
def get_packages() -> str:
    """
    Get all installed packages on the device

    Returns:
        str: A list of all installed packages on the device as a string
    """
    logger.info("[TOOL] get_packages 被调用")
    try:
        packages = make_request("GET", f"{BASE_URL}/getAllPackage").json()

        # The payload is either {"data": [...]} or a bare list of packages.
        if isinstance(packages, dict) and "data" in packages:
            entries = packages["data"]
        elif isinstance(packages, list):
            entries = packages
        else:
            entries = []

        # Drop the "icon" field from every package record.
        for entry in entries:
            if isinstance(entry, dict):
                entry.pop("icon", None)

        if isinstance(packages, dict):
            package_count = len(packages.get("data", packages))
        else:
            package_count = len(packages)
        logger.info(f"[TOOL] get_packages 成功:获取到 {package_count} 个应用包")
        return json.dumps(packages, ensure_ascii=False, indent=2)
    except Exception as e:
        logger.error(f"[TOOL] get_packages 失败:{str(e)}")
        return f"Error: {str(e)}"
@mcp.tool()
def execute_adb_shell_command(command: str) -> str:
    """Executes an ADB command and returns the output or an error.

    Args:
        command (str): The ADB shell command to execute

    Returns:
        str: The output of the ADB command
    """
    logger.info(f"[TOOL] execute_adb_shell_command 被调用 - 命令: {command}")
    try:
        response = requests.post(
            f"{BASE_URL}/execCmd",
            json={"timeout": 1, "value": command},
            timeout=10,
        )
        # Anything but HTTP 200 is reported back as an error string.
        if response.status_code != 200:
            logger.error(f"[TOOL] execute_adb_shell_command 失败 - 命令: {command}, 状态码: {response.status_code}")
            return f"Error: {response.status_code} - {response.text}"
        result = response.text
        logger.debug(f"[TOOL] execute_adb_shell_command 成功 - 命令: {command}, 结果长度: {len(result)}")
        return result
    except Exception as e:
        logger.error(f"[TOOL] execute_adb_shell_command 异常 - 命令: {command}, 错误: {str(e)}")
        return f"Error: {str(e)}"
def _get_uilayout_internal() -> str:
    """Fetch the UI layout without running the screen-readiness check.

    Returns:
        str: The layout from ``/screenJson`` as pretty-printed JSON, or an
        ``"Error: ..."`` message when the request fails.
    """
    def count_clickable(node) -> int:
        # Count dicts with a truthy "clickable" entry anywhere in the tree.
        if isinstance(node, dict):
            own = 1 if node.get("clickable") else 0
            return own + sum(count_clickable(value) for value in node.values())
        if isinstance(node, list):
            return sum(count_clickable(item) for item in node)
        return 0

    try:
        response = requests.get(f"{BASE_URL}/screenJson", timeout=10)
        if response.status_code != 200:
            logger.error(f"[内部] get_uilayout 失败 - 状态码: {response.status_code}")
            return f"Error: {response.status_code} - {response.text}"
        ui_data = response.json()
        # The count is only used for the debug log line.
        clickable_count = count_clickable(ui_data)
        logger.debug(f"[内部] get_uilayout 成功 - 找到 {clickable_count} 个可点击元素")
        return json.dumps(ui_data, ensure_ascii=False, indent=2)
    except Exception as e:
        logger.error(f"[内部] get_uilayout 异常 - 错误: {str(e)}")
        return f"Error: {str(e)}"
@mcp.tool()
def get_uilayout() -> str:
    """
    Retrieves information about clickable elements in the current UI.
    Returns a formatted string containing details about each clickable elements,
    including its text, content description, bounds, and center coordinates.

    Returns:
        str: A formatted list of clickable elements with their properties
    """
    logger.debug("[TOOL] get_uilayout 被调用")
    # Prepare the screen explicitly (result ignored), then read the layout via
    # the internal helper, which does not trigger another readiness check.
    _ensure_screen_ready()
    layout = _get_uilayout_internal()
    return layout
@mcp.tool()
@require_screen_ready
def get_screenshot() -> str:
    """Takes a screenshot of the device and returns it as base64 encoded string.

    Returns:
        str: Base64 encoded PNG image data
    """
    try:
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.get(f"{BASE_URL}/screenShotBase64", timeout=10)
        if response.status_code != 200:
            return f"Error: Failed to capture screenshot (HTTP {response.status_code})"
        # The response body is returned unchanged.
        return response.text
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
def get_package_action_intents(package_name: str) -> list[str]:
    """
    Get all non-data actions from Activity Resolver Table for a package

    Args:
        package_name (str): The name of the package to get actions for

    Returns:
        list[str]: A list of all non-data actions from the Activity Resolver Table for the package
    """
    query = {"packageName": package_name}
    try:
        # First request: package info. Both calls are bounded by a timeout so
        # an unreachable device cannot hang the tool.
        response = requests.get(f"{BASE_URL}/getPackageInfo", params=query, timeout=10)
        if response.status_code != 200:
            return [f"Error getting package info: {response.status_code} - {response.text}"]
        package_info = response.json()

        # Second request: the package's start activity.
        start_response = requests.get(f"{BASE_URL}/getStartActivity", params=query, timeout=10)
        if start_response.status_code != 200:
            return [f"Error getting start activity: {start_response.status_code} - {start_response.text}"]

        # Result is [package info as JSON text, start activity text].
        return [json.dumps(package_info, ensure_ascii=False), start_response.text]
    except Exception as e:
        return [f"Error: {str(e)}"]
@mcp.tool()
@require_screen_ready
def click(x: float, y: float) -> str:
    """
    Click at the specified coordinates on the screen

    Args:
        x (float): X coordinate (can be absolute or relative 0-1)
        y (float): Y coordinate (can be absolute or relative 0-1)

    Returns:
        str: Success or error message
    """
    logger.info(f"[TOOL] click 被调用 - 坐标: ({x}, {y})")
    try:
        response = requests.post(f"{BASE_URL}/click", json={"x": x, "y": y}, timeout=10)
        # Anything but HTTP 200 is logged and reported as an error string.
        if response.status_code != 200:
            logger.error(f"[TOOL] click 失败 - 状态码: {response.status_code}, 响应: {response.text}")
            return f"Error: {response.status_code} - {response.text}"
        logger.info(f"[TOOL] click 成功 - 坐标: ({x}, {y})")
        return "Click successful"
    except Exception as e:
        logger.error(f"[TOOL] click 异常 - 坐标: ({x}, {y}), 错误: {str(e)}")
        return f"Error: {str(e)}"
@mcp.tool()
@require_screen_ready
def long_click(x: float, y: float) -> str:
    """
    Long click at the specified coordinates on the screen

    Args:
        x (float): X coordinate (can be absolute or relative 0-1)
        y (float): Y coordinate (can be absolute or relative 0-1)

    Returns:
        str: Success or error message
    """
    try:
        payload = {"x": x, "y": y}
        # Same 10-second bound as click(), so an unreachable device cannot
        # hang the tool.
        response = requests.post(f"{BASE_URL}/longClick", json=payload, timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        return "Long click successful"
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
@require_screen_ready
def swipe(x1: float, y1: float, x2: float, y2: float, duration: int = 500) -> str:
    """
    Swipe from one point to another on the screen

    Args:
        x1 (float): Start X coordinate (can be absolute or relative 0-1)
        y1 (float): Start Y coordinate (can be absolute or relative 0-1)
        x2 (float): End X coordinate (can be absolute or relative 0-1)
        y2 (float): End Y coordinate (can be absolute or relative 0-1)
        duration (int): Duration of the swipe in milliseconds (default: 500)

    Returns:
        str: Success or error message
    """
    try:
        payload = {"x1": x1, "y1": y1, "x2": x2, "y2": y2, "duration": duration}
        # Bound the wait so an unreachable device cannot hang the tool.
        # NOTE(review): the bound grows with the swipe duration on the
        # assumption that the endpoint may answer only once the swipe has
        # finished - confirm against the device API.
        wait_seconds = 10 + max(duration, 0) / 1000
        response = requests.post(f"{BASE_URL}/swipe", json=payload, timeout=wait_seconds)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        return "Swipe successful"
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
@require_screen_ready
def input_text(text: str) -> str:
    """
    Input text on the device

    Args:
        text (str): Text to input

    Returns:
        str: Success or error message
    """
    try:
        payload = {"value": text}
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.post(f"{BASE_URL}/inputText", json=payload, timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        return "Text input successful"
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
@require_screen_ready
def press_key(key_code: int) -> str:
    """
    Press a key on the device

    Args:
        key_code (int): Key code to press (e.g., 3 for HOME, 4 for BACK)

    Returns:
        str: Success or error message
    """
    try:
        payload = {"value": key_code}
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.post(f"{BASE_URL}/pressKeyCode", json=payload, timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        return "Key press successful"
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
@require_screen_ready
def start_app(package_name: str) -> str:
    """
    Start an application by package name

    Args:
        package_name (str): Package name of the application to start

    Returns:
        str: Success or error message
    """
    logger.info(f"[TOOL] start_app 被调用 - 应用包名: {package_name}")
    try:
        query = {"packageName": package_name}
        response = requests.get(f"{BASE_URL}/startPackage", params=query, timeout=10)
        # Anything but HTTP 200 is logged and reported as an error string.
        if response.status_code != 200:
            logger.error(f"[TOOL] start_app 失败 - 应用: {package_name}, 状态码: {response.status_code}")
            return f"Error: {response.status_code} - {response.text}"
        logger.info(f"[TOOL] start_app 成功 - 应用: {package_name}")
        return f"Application {package_name} started successfully"
    except Exception as e:
        logger.error(f"[TOOL] start_app 异常 - 应用: {package_name}, 错误: {str(e)}")
        return f"Error: {str(e)}"
@mcp.tool()
def stop_app(package_name: str) -> str:
    """
    Stop an application by package name

    Args:
        package_name (str): Package name of the application to stop

    Returns:
        str: Success or error message
    """
    try:
        # Same 10-second bound as start_app(), so an unreachable device
        # cannot hang the tool.
        response = requests.get(
            f"{BASE_URL}/stopPackage", params={"packageName": package_name}, timeout=10
        )
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        return f"Application {package_name} stopped successfully"
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
def get_device_info() -> str:
    """
    Get detailed information about the device

    Returns:
        str: Device information as a formatted string
    """
    try:
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.get(f"{BASE_URL}/getSystemInfo", timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        # Re-serialize the JSON reply pretty-printed, keeping non-ASCII text.
        return json.dumps(response.json(), ensure_ascii=False, indent=2)
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
def get_screen_info() -> str:
    """
    Get information about the screen (dimensions, rotation)

    Returns:
        str: Screen information as a formatted string
    """
    try:
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.get(f"{BASE_URL}/screenInfo", timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        # Re-serialize the JSON reply pretty-printed, keeping non-ASCII text.
        return json.dumps(response.json(), ensure_ascii=False, indent=2)
    except Exception as e:
        return f"Error: {str(e)}"
# ==================== 联系人管理 ====================
@mcp.tool()
def get_contacts(number: str = "*") -> str:
    """
    Get all contacts or a specific contact by phone number

    Args:
        number (str): Phone number to search for, or "*" to get all contacts (default)

    Returns:
        str: Contact information as JSON
    """
    try:
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.get(
            f"{BASE_URL}/getAllContact", params={"number": number}, timeout=10
        )
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        return json.dumps(response.json(), ensure_ascii=False, indent=2)
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
def delete_contact(number: str = "*") -> str:
    """
    Delete a contact or all contacts
    Warning: Use with caution, especially when deleting all contacts

    Args:
        number (str): Phone number of contact to delete, or "*" to delete all contacts (default)

    Returns:
        str: Success or error message
    """
    # NOTE(review): the default "*" means a call without arguments deletes
    # every contact. The default is kept for interface compatibility.
    try:
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.get(
            f"{BASE_URL}/deleteContact", params={"number": number}, timeout=10
        )
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        result = response.json()
        # "data" carries the number of deleted contacts ('0' if absent).
        return f"Deleted {result.get('data', '0')} contact(s)"
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
def insert_contact(name: str, number: str) -> str:
    """
    Insert a new contact

    Args:
        name (str): Contact name
        number (str): Contact phone number

    Returns:
        str: Success or error message
    """
    try:
        payload = {"name": name, "number": number}
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.post(f"{BASE_URL}/insertContact", json=payload, timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        # The call counts as successful only when "data" equals the string "1".
        if response.json().get("data") == "1":
            return f"Contact '{name}' inserted successfully"
        return "Failed to insert contact"
    except Exception as e:
        return f"Error: {str(e)}"
# ==================== 短信管理 ====================
@mcp.tool()
def get_sms(number: str = "*") -> str:
    """
    Get all SMS messages or messages from a specific number

    Args:
        number (str): Phone number to get SMS from, or "*" to get all SMS (default)

    Returns:
        str: SMS messages as JSON
    """
    try:
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.get(
            f"{BASE_URL}/getAllSms", params={"number": number}, timeout=10
        )
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        return json.dumps(response.json(), ensure_ascii=False, indent=2)
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
def send_sms(phone_number: str, message: str) -> str:
    """
    Send an SMS message

    Args:
        phone_number (str): Recipient phone number
        message (str): Message content

    Returns:
        str: Success or error message
    """
    try:
        payload = {"phoneNumber": phone_number, "value": message}
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.post(f"{BASE_URL}/sendSms", json=payload, timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        return f"SMS sent to {phone_number}"
    except Exception as e:
        return f"Error: {str(e)}"
# ==================== 文件操作 ====================
@mcp.tool()
def list_files(path: str) -> str:
    """
    List files in a directory

    Args:
        path (str): Directory path to list (e.g., "/sdcard")

    Returns:
        str: File list as JSON
    """
    try:
        payload = {"value": path}
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.post(f"{BASE_URL}/listFile", json=payload, timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        return json.dumps(response.json(), ensure_ascii=False, indent=2)
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
def delete_file(path: str) -> str:
    """
    Delete a file or directory
    Warning: This will delete the file/directory and all its contents

    Args:
        path (str): File or directory path to delete

    Returns:
        str: Success or error message
    """
    try:
        payload = {"value": path}
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.post(f"{BASE_URL}/delFile", json=payload, timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        # The call counts as successful only when "data" equals the string "1".
        if response.json().get("data") == "1":
            return f"File/directory '{path}' deleted successfully"
        return "Failed to delete file/directory"
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
def get_file_url(path: str) -> str:
    """
    Get the download URL for a file

    Args:
        path (str): File path to download

    Returns:
        str: Download URL or error message
    """
    try:
        from urllib.parse import quote

        # Percent-encode the path before placing it in the query string.
        # No request is sent here; only the URL text is built.
        return f"Download URL: {BASE_URL}/download?path={quote(path)}"
    except Exception as e:
        return f"Error: {str(e)}"
# ==================== 剪切板操作 ====================
@mcp.tool()
def set_clipboard_text(text: str) -> str:
    """
    Set clipboard text

    Args:
        text (str): Text to set in clipboard

    Returns:
        str: Success or error message
    """
    try:
        payload = {"value": text}
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.post(f"{BASE_URL}/setClipText", json=payload, timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        # The call counts as successful only when "data" equals the string "1".
        if response.json().get("data") == "1":
            return "Clipboard text set successfully"
        return "Failed to set clipboard text"
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
def get_clipboard_text() -> str:
    """
    Get clipboard text

    Returns:
        str: Clipboard content or error message
    """
    try:
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.get(f"{BASE_URL}/getClipText", timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        content = response.json().get("data", "")
        # A null "data" field is reported as an empty string, so the declared
        # str return type always holds.
        return "" if content is None else content
    except Exception as e:
        return f"Error: {str(e)}"
# ==================== 输入操作 ====================
@mcp.tool()
@require_screen_ready
def input_char(text: str) -> str:
    """
    Input ASCII characters using key simulation
    Use this when input_text doesn't work in certain situations

    Args:
        text (str): ASCII text to input

    Returns:
        str: Success or error message
    """
    try:
        payload = {"value": text}
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.post(f"{BASE_URL}/inputChar", json=payload, timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        # The call counts as successful only when "data" equals the string "1".
        if response.json().get("data") == "1":
            return "Character input successful"
        return "Failed to input characters"
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
@require_screen_ready
def clear_text() -> str:
    """
    Clear text in the current input field

    Returns:
        str: Success or error message
    """
    try:
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.get(f"{BASE_URL}/clearText", timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        # The call counts as successful only when "data" equals the string "1".
        if response.json().get("data") == "1":
            return "Text cleared successfully"
        return "Failed to clear text"
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
@require_screen_ready
def press(x: float, y: float, duration: int = 1000) -> str:
    """
    Long press at specified coordinates for a specific duration

    Args:
        x (float): X coordinate (can be absolute or relative 0-1)
        y (float): Y coordinate (can be absolute or relative 0-1)
        duration (int): Duration of press in milliseconds (default: 1000)

    Returns:
        str: Success or error message
    """
    try:
        payload = {"x": x, "y": y, "duration": duration}
        # Bound the wait so an unreachable device cannot hang the tool.
        # NOTE(review): the bound grows with the press duration on the
        # assumption that the endpoint may answer only once the press has
        # finished - confirm against the device API.
        wait_seconds = 10 + max(duration, 0) / 1000
        response = requests.post(f"{BASE_URL}/press", json=payload, timeout=wait_seconds)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        # The call counts as successful only when "data" equals the string "1".
        if response.json().get("data") == "1":
            return f"Press successful at ({x}, {y}) for {duration}ms"
        return "Failed to press"
    except Exception as e:
        return f"Error: {str(e)}"
# ==================== 手势操作 ====================
@mcp.tool()
@require_screen_ready
def gesture(duration: int, points: list) -> str:
    """
    Execute a single-finger gesture

    Args:
        duration (int): Duration of gesture in milliseconds
        points (list): List of points [{"x": 100, "y": 200}, {"x": 150, "y": 250}, ...]

    Returns:
        str: Success or error message
    """
    try:
        payload = {"duration": duration, "points": points}
        # Bound the wait so an unreachable device cannot hang the tool.
        # NOTE(review): the bound grows with the gesture duration on the
        # assumption that the endpoint may answer only once the gesture has
        # finished - confirm against the device API.
        wait_seconds = 10 + max(duration, 0) / 1000
        response = requests.post(f"{BASE_URL}/gesture", json=payload, timeout=wait_seconds)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        # The call counts as successful only when "data" equals the string "1".
        if response.json().get("data") == "1":
            return "Gesture executed successfully"
        return "Failed to execute gesture"
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
@require_screen_ready
def gestures(gestures_data: list) -> str:
    """
    Execute multi-finger gestures

    Args:
        gestures_data (list): List of gesture objects, each with:
            - delay (int): Delay before starting this gesture
            - duration (int): Duration of gesture in milliseconds
            - points (list): List of points for the gesture

    Returns:
        str: Success or error message
    """
    try:
        # Longest delay + duration among well-formed entries, in milliseconds.
        # Malformed entries are skipped here and still sent to the device
        # unchanged.
        longest_ms = 0
        if isinstance(gestures_data, list):
            for stroke in gestures_data:
                if not isinstance(stroke, dict):
                    continue
                delay = stroke.get("delay", 0)
                duration = stroke.get("duration", 0)
                if isinstance(delay, (int, float)) and isinstance(duration, (int, float)):
                    longest_ms = max(longest_ms, delay + duration)

        # Bound the wait so an unreachable device cannot hang the tool.
        # NOTE(review): the bound grows with the gesture length on the
        # assumption that the endpoint may answer only once the gestures have
        # finished - confirm against the device API.
        response = requests.post(
            f"{BASE_URL}/gestures", json=gestures_data, timeout=10 + longest_ms / 1000
        )
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        # The call counts as successful only when "data" equals the string "1".
        if response.json().get("data") == "1":
            return "Multi-finger gestures executed successfully"
        return "Failed to execute multi-finger gestures"
    except Exception as e:
        return f"Error: {str(e)}"
# ==================== 通话功能 ====================
@mcp.tool()
def call_phone(number: str) -> str:
    """
    Make a phone call (SIM card 1 only)

    Args:
        number (str): Phone number to call

    Returns:
        str: Success or error message
    """
    try:
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.get(
            f"{BASE_URL}/callPhone", params={"number": number}, timeout=10
        )
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        return f"Calling {number}"
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
def end_call() -> str:
    """
    End the current phone call

    Returns:
        str: Success or error message
    """
    try:
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.get(f"{BASE_URL}/endCall", timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        # The call counts as successful only when "data" equals the string "1".
        if response.json().get("data") == "1":
            return "Call ended successfully"
        return "Failed to end call"
    except Exception as e:
        return f"Error: {str(e)}"
# ==================== 屏幕控制 ====================
@mcp.tool()
def turn_screen_off() -> str:
    """
    Turn off the screen (device remains active for control/casting)

    Returns:
        str: Success or error message
    """
    try:
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.get(f"{BASE_URL}/turnScreenOff", timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        # The call counts as successful only when "data" equals the string "1".
        if response.json().get("data") == "1":
            return "Screen turned off successfully"
        return "Failed to turn off screen"
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
def turn_screen_on() -> str:
    """
    Turn on the screen

    Returns:
        str: Success or error message
    """
    try:
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.get(f"{BASE_URL}/turnScreenOn", timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        # The call counts as successful only when "data" equals the string "1".
        if response.json().get("data") == "1":
            return "Screen turned on successfully"
        return "Failed to turn on screen"
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
def start_screen_recording(limit: int = 180) -> str:
    """
    Start screen recording

    Args:
        limit (int): Maximum recording duration in seconds (default: 180)

    Returns:
        str: Success or error message
    """
    try:
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.get(
            f"{BASE_URL}/startRecoreScreen", params={"limit": limit}, timeout=10
        )
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        # The call counts as successful only when "data" equals the string "1".
        if response.json().get("data") == "1":
            return f"Screen recording started (max {limit}s), will save to /sdcard/screen.mp4"
        return "Failed to start screen recording"
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
def stop_screen_recording() -> str:
    """
    Stop screen recording

    Returns:
        str: Success or error message
    """
    try:
        # Bound the wait so an unreachable device cannot hang the tool.
        response = requests.get(f"{BASE_URL}/stopRecoreScreen", timeout=10)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        # The call counts as successful only when "data" equals the string "1".
        if response.json().get("data") == "1":
            return "Screen recording stopped, saved to /sdcard/screen.mp4"
        return "Failed to stop screen recording"
    except Exception as e:
        return f"Error: {str(e)}"
# ==================== 音乐播放 ====================
@mcp.tool()
def play_music(url: str) -> str:
    """
    Start audio playback on the device from a URL.

    Warning: This may fail on Android 10 and below.

    Args:
        url (str): URL of the audio file to play; sent as the ``value`` field
            of the JSON request body.

    Returns:
        str: "Playing music from <url>" when the reply's ``data`` field is "1",
            "Failed to play music" for any other 200 reply, or an "Error: ..."
            message for a non-200 status or a raised exception.
    """
    try:
        reply = requests.post(f"{BASE_URL}/playMusic", json={"value": url})
        if reply.status_code != 200:
            return f"Error: {reply.status_code} - {reply.text}"
        if reply.json().get("data") != "1":
            return "Failed to play music"
        return f"Playing music from {url}"
    except Exception as exc:
        return f"Error: {exc!s}"
@mcp.tool()
def stop_music() -> str:
    """
    Stop audio playback on the device.

    Returns:
        str: "Music stopped successfully" when the reply's ``data`` field is
            "1", "Failed to stop music" for any other 200 reply, or an
            "Error: ..." message for a non-200 status or a raised exception.
    """
    try:
        reply = requests.get(f"{BASE_URL}/stopMusic")
        if reply.status_code != 200:
            return f"Error: {reply.status_code} - {reply.text}"
        stopped = reply.json().get("data") == "1"
        return "Music stopped successfully" if stopped else "Failed to stop music"
    except Exception as exc:
        return f"Error: {exc!s}"
# ==================== 脚本执行 ====================
@mcp.tool()
def execute_script(script: str | None = None, path: str | None = None, delay: int = 0,
                   interval: int = 0, loop_times: int = 1) -> str:
    """
    Execute an AutoX.js script on the device.

    Exactly one script source is sent: ``path`` when it is non-empty,
    otherwise ``script``. When both are empty or omitted, no request is made.

    Args:
        script (str | None): Script content to execute (optional).
        path (str | None): Local script path on the device (optional, takes
            priority over ``script``).
        delay (int): Delay before execution in milliseconds (default: 0).
        interval (int): Interval between loop iterations in milliseconds
            (default: 0).
        loop_times (int): Number of times to loop (default: 1, 0 for infinite).

    Returns:
        str: "Script executed successfully" when the reply's ``data`` field is
            "1", "Failed to execute script" for any other 200 reply,
            "Error: Either script or path must be provided" when no source was
            given, or an "Error: ..." message for a non-200 status or a raised
            exception.
    """
    # Both sources default to None, so they are annotated as optional; a bare
    # ``str`` annotation would describe them as required-to-be-string.
    # Pick the script source up front so an unusable call never builds a payload.
    if path:
        source = {"path": path}
    elif script:
        source = {"script": script}
    else:
        return "Error: Either script or path must be provided"

    try:
        payload = {
            "action": "exec",
            "delay": delay,
            "interval": interval,
            "loopTimes": loop_times,
            **source,
        }
        response = requests.post(f"{BASE_URL}/execScript", json=payload)
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        if response.json().get("data") == "1":
            return "Script executed successfully"
        return "Failed to execute script"
    except Exception as e:
        return f"Error: {str(e)}"
@mcp.tool()
def stop_all_scripts() -> str:
    """
    Stop every running AutoX.js script on the device.

    Returns:
        str: "All scripts stopped successfully" when the reply's ``data`` field
            is "1", "Failed to stop scripts" for any other 200 reply, or an
            "Error: ..." message for a non-200 status or a raised exception.
    """
    try:
        reply = requests.get(f"{BASE_URL}/stopAllScript")
        if reply.status_code != 200:
            return f"Error: {reply.status_code} - {reply.text}"
        stopped = reply.json().get("data") == "1"
        return "All scripts stopped successfully" if stopped else "Failed to stop scripts"
    except Exception as exc:
        return f"Error: {exc!s}"
# ==================== 设备名称管理 ====================
@mcp.tool()
def set_device_name(name: str) -> str:
    """
    Set the AutoBot display name of the device.

    Note: This doesn't change the system device name, only the AutoBot
    display name.

    Args:
        name (str): Display name for the device; sent as the ``value`` field
            of the JSON request body.

    Returns:
        str: "Device name set to '<name>'" when the reply's ``data`` field is
            "1", "Failed to set device name" for any other 200 reply, or an
            "Error: ..." message for a non-200 status or a raised exception.
    """
    try:
        reply = requests.post(f"{BASE_URL}/setDisplayName", json={"value": name})
        if reply.status_code != 200:
            return f"Error: {reply.status_code} - {reply.text}"
        if reply.json().get("data") != "1":
            return "Failed to set device name"
        return f"Device name set to '{name}'"
    except Exception as exc:
        return f"Error: {exc!s}"
@mcp.tool()
def get_device_name() -> str:
    """
    Fetch the AutoBot display name of the device.

    Returns:
        str: The ``data`` field of the JSON reply (empty string when the field
            is absent), or an "Error: ..." message for a non-200 status or a
            raised exception.
    """
    try:
        reply = requests.get(f"{BASE_URL}/getDisplayName")
        if reply.status_code != 200:
            return f"Error: {reply.status_code} - {reply.text}"
        return reply.json().get("data", "")
    except Exception as exc:
        return f"Error: {exc!s}"
# ==================== 应用管理扩展 ====================
@mcp.tool()
def get_top_activity() -> str:
    """
    Fetch information about the current top activity.

    Returns:
        str: The full JSON reply re-serialised with two-space indentation and
            non-ASCII characters preserved, or an "Error: ..." message for a
            non-200 status or a raised exception.
    """
    try:
        reply = requests.get(f"{BASE_URL}/getTopActivity")
        if reply.status_code != 200:
            return f"Error: {reply.status_code} - {reply.text}"
        return json.dumps(reply.json(), ensure_ascii=False, indent=2)
    except Exception as exc:
        return f"Error: {exc!s}"
@mcp.tool()
def clear_app_data(package_name: str) -> str:
    """
    Clear the application data of a package.

    Warning: This will delete all app data and cannot be undone.

    Args:
        package_name (str): Package name of the app; forwarded as the
            ``packageName`` query parameter.

    Returns:
        str: "Data cleared for package '<package_name>'" when the reply's
            ``data`` field is "1", "Failed to clear app data" for any other
            200 reply, or an "Error: ..." message for a non-200 status or a
            raised exception.
    """
    try:
        query = {"packageName": package_name}
        reply = requests.get(f"{BASE_URL}/clearPackage", params=query)
        if reply.status_code != 200:
            return f"Error: {reply.status_code} - {reply.text}"
        if reply.json().get("data") != "1":
            return "Failed to clear app data"
        return f"Data cleared for package '{package_name}'"
    except Exception as exc:
        return f"Error: {exc!s}"
# ==================== 屏幕信息扩展 ====================
@mcp.tool()
def get_screen_rotation() -> str:
    """
    Fetch the current screen rotation.

    Returns:
        str: The full JSON reply, pretty-printed. Rotation values are
            documented as 0=portrait, 1=landscape-90, 2=inverted,
            3=landscape-270. An "Error: ..." message is returned for a
            non-200 status or a raised exception.
    """
    try:
        reply = requests.get(f"{BASE_URL}/screenRotation")
        if reply.status_code != 200:
            return f"Error: {reply.status_code} - {reply.text}"
        return json.dumps(reply.json(), ensure_ascii=False, indent=2)
    except Exception as exc:
        return f"Error: {exc!s}"
@mcp.tool()
@require_screen_ready
def get_screen_xml(is_wait: bool = True) -> str:
    """
    Fetch the current screen layout in XML format.

    Args:
        is_wait (bool): Whether to wait for the layout (default: true); sent
            as the ``isWait`` query parameter, 1 when truthy and 0 otherwise.

    Returns:
        str: The ``data`` field of the JSON reply (empty string when the field
            is absent), or an "Error: ..." message for a non-200 status or a
            raised exception.
    """
    try:
        query = {"isWait": 1 if is_wait else 0}
        reply = requests.get(f"{BASE_URL}/screenXml", params=query)
        if reply.status_code != 200:
            return f"Error: {reply.status_code} - {reply.text}"
        return reply.json().get("data", "")
    except Exception as exc:
        return f"Error: {exc!s}"
# ==================== 设备信息扩展 ====================
@mcp.tool()
def get_device_id() -> str:
    """
    Fetch the device ID.

    Returns:
        str: The full JSON reply, pretty-printed with non-ASCII characters
            preserved, or an "Error: ..." message for a non-200 status or a
            raised exception.
    """
    try:
        reply = requests.get(f"{BASE_URL}/getDeviceId")
        if reply.status_code != 200:
            return f"Error: {reply.status_code} - {reply.text}"
        return json.dumps(reply.json(), ensure_ascii=False, indent=2)
    except Exception as exc:
        return f"Error: {exc!s}"
@mcp.tool()
def get_device_ip() -> str:
    """
    Fetch all IP addresses of the device.

    Returns:
        str: The full JSON reply, pretty-printed with non-ASCII characters
            preserved, or an "Error: ..." message for a non-200 status or a
            raised exception.
    """
    try:
        reply = requests.get(f"{BASE_URL}/getIp")
        if reply.status_code != 200:
            return f"Error: {reply.status_code} - {reply.text}"
        return json.dumps(reply.json(), ensure_ascii=False, indent=2)
    except Exception as exc:
        return f"Error: {exc!s}"
@mcp.tool()
def get_version() -> str:
    """
    Fetch the AutoBot version number.

    Returns:
        str: The raw response body on HTTP 200, or an "Error: ..." message for
            a non-200 status or a raised exception.
    """
    try:
        reply = requests.get(f"{BASE_URL}/version")
        if reply.status_code != 200:
            return f"Error: {reply.status_code} - {reply.text}"
        return reply.text
    except Exception as exc:
        return f"Error: {exc!s}"
@mcp.tool()
def hello() -> str:
    """
    Test the connection to the AutoBot server.

    The request is bounded by a timeout so that an unreachable or unresponsive
    device produces an error message instead of blocking the tool call.

    Returns:
        str: The raw response body on HTTP 200, or an "Error: ..." message for
            a non-200 status, a timeout, or any other raised exception.
    """
    try:
        # (connect, read) timeouts in seconds. Without them requests waits
        # indefinitely, which defeats the purpose of a connectivity probe.
        response = requests.get(f"{BASE_URL}/hello", timeout=(5, 10))
        if response.status_code != 200:
            return f"Error: {response.status_code} - {response.text}"
        return response.text
    except Exception as e:
        # requests' timeout exceptions land here and are reported like any
        # other failure.
        return f"Error: {str(e)}"
# ==================== 安全模式 ====================
@mcp.tool()
def turn_safe_mode_on() -> str:
    """
    Turn on safe mode.

    In safe mode, AutoBot cannot access screen layout (JSON/XML) or receive
    notifications.

    Returns:
        str: "Safe mode turned on" when the reply's ``data`` field is "1",
            "Failed to turn on safe mode" for any other 200 reply, or an
            "Error: ..." message for a non-200 status or a raised exception.
    """
    try:
        reply = requests.get(f"{BASE_URL}/turnSafeModeOn")
        if reply.status_code != 200:
            return f"Error: {reply.status_code} - {reply.text}"
        switched = reply.json().get("data") == "1"
        return "Safe mode turned on" if switched else "Failed to turn on safe mode"
    except Exception as exc:
        return f"Error: {exc!s}"
@mcp.tool()
def turn_safe_mode_off() -> str:
    """
    Turn off safe mode.

    Returns:
        str: "Safe mode turned off" when the reply's ``data`` field is "1",
            "Failed to turn off safe mode" for any other 200 reply, or an
            "Error: ..." message for a non-200 status or a raised exception.
    """
    try:
        reply = requests.get(f"{BASE_URL}/turnSafeModeOff")
        if reply.status_code != 200:
            return f"Error: {reply.status_code} - {reply.text}"
        switched = reply.json().get("data") == "1"
        return "Safe mode turned off" if switched else "Failed to turn off safe mode"
    except Exception as exc:
        return f"Error: {exc!s}"
@mcp.tool()
def is_safe_mode() -> str:
    """
    Check whether safe mode is enabled.

    Returns:
        str: "Safe mode: enabled" when the reply's ``data`` field is "1",
            "Safe mode: disabled" for any other value (including a missing
            field), or an "Error: ..." message for a non-200 status or a
            raised exception.
    """
    try:
        reply = requests.get(f"{BASE_URL}/isSafeMode")
        if reply.status_code != 200:
            return f"Error: {reply.status_code} - {reply.text}"
        flag = reply.json().get("data", "0")
        state = "enabled" if flag == "1" else "disabled"
        return f"Safe mode: {state}"
    except Exception as exc:
        return f"Error: {exc!s}"
@mcp.tool()
def enable_screen_keep_alive() -> str:
    """
    Enable the screen keep-alive feature.

    Delegates to ``_enable_screen_keep_alive(True)``, which is described as
    setting the screen timeout to 30 minutes and saving the original setting.

    Returns:
        str: A success message when the helper returns a truthy value, a
            failure message when it returns a falsy one, or an "Error: ..."
            message when it raises.
    """
    try:
        enabled = _enable_screen_keep_alive(True)
    except Exception as exc:
        return f"Error: {exc!s}"
    return "屏幕保持常亮已启用(30分钟)" if enabled else "启用屏幕常亮失败"
@mcp.tool()
def disable_screen_keep_alive() -> str:
    """
    Disable the screen keep-alive feature.

    Delegates to ``_enable_screen_keep_alive(False)``, which is described as
    restoring the original screen timeout setting.

    Returns:
        str: A success message when the helper returns a truthy value, a
            failure message when it returns a falsy one, or an "Error: ..."
            message when it raises.
    """
    try:
        restored = _enable_screen_keep_alive(False)
    except Exception as exc:
        return f"Error: {exc!s}"
    return "屏幕超时已恢复为原始设置" if restored else "恢复屏幕超时失败"
@mcp.tool()
def get_screen_timeout() -> str:
    """
    Report the current screen timeout setting.

    Returns:
        str: A message containing the value returned by
            ``_get_screen_timeout()`` (described as seconds) when it is
            non-negative, a "cannot read the setting" message when it is
            negative, or an "Error: ..." message when an exception is raised.
    """
    try:
        current = _get_screen_timeout()
        # The comparison stays inside the try so a non-numeric helper result
        # is reported as an error string rather than raised.
        return f"当前屏幕超时: {current} 秒" if current >= 0 else "无法获取屏幕超时设置"
    except Exception as exc:
        return f"Error: {exc!s}"
@mcp.tool()
def set_screen_timeout(seconds: int) -> str:
    """
    Set the screen timeout.

    Args:
        seconds (int): Timeout in seconds.
            - 0: never turn the screen off (some devices may not support this)
            - 15-1800: 15 seconds to 30 minutes
            Note: the effective maximum is capped at 1800 seconds (30 minutes).

    Returns:
        str: A success message reporting ``seconds`` clamped to the 0-1800
            range when ``_set_screen_timeout`` returns a truthy value, a
            failure message when it returns a falsy one, or an "Error: ..."
            message when an exception is raised.
    """
    try:
        if not _set_screen_timeout(seconds):
            return "设置屏幕超时失败"
        # Report the clamped value rather than the raw request.
        actual_timeout = min(max(seconds, 0), 1800)
        return f"屏幕超时已设置为 {actual_timeout} 秒"
    except Exception as exc:
        return f"Error: {exc!s}"
@mcp.tool()
def exit_service() -> str:
    """
    Exit the AutoBot service.

    Warning: This will terminate the AutoBot server.

    Returns:
        str: "AutoBot service exited" on HTTP 200, or an "Error: ..." message
            for a non-200 status or a raised exception.
    """
    try:
        reply = requests.get(f"{BASE_URL}/exit")
        if reply.status_code != 200:
            return f"Error: {reply.status_code} - {reply.text}"
        return "AutoBot service exited"
    except Exception as exc:
        return f"Error: {exc!s}"
def main():
    """
    Main entry point for the autobot-mcp command.

    Enables the screen keep-alive, serves MCP over stdio until the server
    stops, and restores the original screen timeout exactly once on the way
    out.

    Exits the process with status 0 on KeyboardInterrupt and with status 1
    when the server raises any other exception; returns normally when
    ``mcp.run`` returns.
    """
    # Enable the screen keep-alive automatically at startup.
    logger.info("正在启用屏幕常亮功能...")
    _enable_screen_keep_alive(True)
    try:
        mcp.run(transport="stdio")
    except KeyboardInterrupt:
        logger.info("正在关闭...")
        print("\nShutting down gracefully...", file=sys.stderr)
        sys.exit(0)
    except Exception as e:
        print(f"Error running MCP: {str(e)}", file=sys.stderr)
        sys.exit(1)
    finally:
        # Single restore point. ``finally`` also runs while the SystemExit
        # raised by the handlers above propagates, so restoring inside the
        # handlers as well would issue the restore twice on every error path.
        _enable_screen_keep_alive(False)
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()