get_ble_readings
Retrieve historical temperature and humidity data from BLE sensors. Analyze trends by specifying device ID, time range, and result limit.
Instructions
查询蓝牙传感器的历史温湿度读数,用于趋势分析。
Args:
did: BLE 设备ID
hours: 查询最近多少小时的数据,默认 24
limit: 最大返回条数,默认 100Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| did | Yes | ||
| hours | No | ||
| limit | No |
Implementation Reference
- app/services/ble_service.py:65-84 (handler)The core business logic for get_ble_readings: queries BLESensorReading records for a given device DID within a time window, ordered by recorded_at descending, limited by count. Returns list of dicts with 'values' and 'recorded_at'.
@staticmethod def get_readings(did: str, hours: int = 24, limit: int = 200) -> list[dict]: device = BLEDevice.query.filter_by(did=did).first() if not device: raise ValueError(f"BLE 设备 {did} 未找到") since = datetime.now(timezone.utc) - timedelta(hours=hours) query = BLESensorReading.query.filter( BLESensorReading.ble_device_id == device.id, BLESensorReading.recorded_at >= since, ).order_by(BLESensorReading.recorded_at.desc()) readings = query.limit(limit).all() return [ { "values": r.values, "recorded_at": r.recorded_at.isoformat() if r.recorded_at else None, } for r in readings ] - mcp_server/server.py:174-183 (registration)MCP tool registration: defines the 'get_ble_readings' tool with parameters 'did', 'hours' (default 24), and 'limit' (default 100). Calls the backend API endpoint /ble/devices/{did}/readings with query params.
@mcp.tool() async def get_ble_readings(did: str, hours: int = 24, limit: int = 100) -> dict: """查询蓝牙传感器的历史温湿度读数,用于趋势分析。 Args: did: BLE 设备ID hours: 查询最近多少小时的数据,默认 24 limit: 最大返回条数,默认 100 """ return await _request("GET", f"/ble/devices/{quote(did)}/readings", params={"hours": hours, "limit": limit}) - app/api/ble.py:75-86 (handler)Flask route handler for GET /ble/devices/<did>/readings. Extracts 'hours' and 'limit' query params, calls BLEService.get_readings, returns success/error JSON response.
@ble_bp.route("/ble/devices/<path:did>/readings", methods=["GET"]) @auth_required def get_ble_readings(did): hours = request.args.get("hours", 24, type=int) limit = request.args.get("limit", 200, type=int) try: readings = BLEService.get_readings(did, hours=hours, limit=limit) return success(data=readings) except ValueError as e: return error(str(e), 404) - app/services/ble_service.py:78-182 (schema)Return type schema: each reading contains 'values' (dict of sensor values like temperature/humidity) and 'recorded_at' (ISO format timestamp).
return [ { "values": r.values, "recorded_at": r.recorded_at.isoformat() if r.recorded_at else None, } for r in readings ] @staticmethod def ingest_reading(did: str, values: dict) -> dict: device = BLEDevice.query.filter_by(did=did, is_enabled=True).first() if not device: raise ValueError(f"BLE 设备 {did} 未找到或已禁用") reading = BLESensorReading( ble_device_id=device.id, values=values, ) db.session.add(reading) device.last_seen_at = datetime.now(timezone.utc) db.session.commit() BLEService._check_automation_rules(device.user_id, device.did, values) return {"device_id": device.id, "values": values} @staticmethod def refresh_bindkey(user_id: int, did: str, bindkey: str | None = None) -> dict: device = BLEDevice.query.filter_by(user_id=user_id, did=did).first() if not device: raise ValueError(f"BLE 设备 {did} 未找到") if not bindkey: bindkey = BLEService._fetch_bindkey(user_id, did) device.bindkey = bindkey db.session.commit() return {"did": did, "bindkey": bindkey} @staticmethod def get_all_enabled_devices() -> list[dict]: devices = BLEDevice.query.filter_by(is_enabled=True).all() return [ { "did": d.did, "mac_address": d.mac_address, "bindkey": d.bindkey, "model": d.model, "capabilities": d.capabilities, } for d in devices ] @staticmethod def _fetch_bindkey(user_id: int, did: str) -> str | None: try: api = api_pool.get_api(user_id) result = api._request("/v2/device/bltconfig", {"did": did}) bindkey = result.get("result", {}).get("bindkey") or result.get("result", {}).get("ble_key") if bindkey: logger.info("成功从云端获取 bindkey: did=%s", did) return bindkey logger.warning("云端返回的 bindkey 为空: did=%s, response=%s", did, result) except Exception as e: logger.warning("从云端获取 bindkey 失败: did=%s, error=%s", did, e) return None @staticmethod def _get_latest_reading(ble_device_id: int) -> dict | None: reading = ( BLESensorReading.query.filter_by(ble_device_id=ble_device_id) .order_by(BLESensorReading.recorded_at.desc()) .first() ) if not reading: return None return { "values": reading.values, "recorded_at": reading.recorded_at.isoformat() if reading.recorded_at else None, } @staticmethod def _check_automation_rules(user_id: int, did: str, values: dict): from app.models.automation_rule import AutomationRule rules = AutomationRule.query.filter_by( user_id=user_id, trigger_type="ble_sensor", is_enabled=True ).all() for rule in rules: cfg = rule.trigger_config if cfg.get("did") != did: continue metric = cfg.get("metric") operator = cfg.get("operator") threshold = cfg.get("threshold") cooldown = cfg.get("cooldown_seconds", 300) if metric not in values: continue actual = values[metric] if not BLEService._evaluate_condition(actual, operator, threshold): continue - mcp_server/server.py:23-27 (helper)Helper _request function that makes authenticated HTTP requests to the backend API, used by the MCP tool to delegate to the Flask endpoint.
async def _request(method: str, path: str, *, json_data: dict | None = None, params: dict | None = None): async with httpx.AsyncClient(timeout=30) as client: resp = await client.request(method, f"{_BASE_URL}{path}", json=json_data, params=params, headers=_headers()) return resp.json()