start_data_recording
Start recording robot state, joint, TCP, and error data from a Universal Robot. Specify robot ID and data types; set duration or record continuously.
Instructions
开始记录机器人数据
参数:
- robot_id: 机器人ID
- record_types: 记录类型列表,可选值包括"robot_state", "joint_data", "tcp_data", "error_data"
- duration: 记录持续时间(秒),0表示持续记录直到停止
返回:
- 记录会话ID

Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
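| robot_id | Yes | Robot ID | |
| record_types | Yes | List of record types; allowed values are "robot_state", "joint_data", "tcp_data", "error_data" | |
| duration | No | Recording duration in seconds; 0 records continuously until stopped | 0 |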
Implementation Reference
- MCP tool handler function 'start_data_recording'. Decorated with @mcp.tool(), it accepts robot_id (str), record_types (list of strings), and duration (float, default 0). Maps string record types to DataRecordType enums, then delegates to advanced_data_recorder.start_recording(). Returns the session ID in JSON format.
@mcp.tool()
def start_data_recording(robot_id: str, record_types: list, duration: float = 0):
    """
    Start recording robot data.

    Args:
        robot_id: ID of the robot the recording session is started for.
        record_types: List of record type names. Supported values are
            "robot_state", "joint_data", "tcp_data" and "error_data".
        duration: Recording duration in seconds; 0 records continuously
            until recording is stopped.

    Returns:
        The value produced by return_msg(): on success a payload holding
        "session_id" and "message", otherwise an error message string.
    """
    # Function-scope imports so this handler does not depend on the
    # module-level import block.
    import threading
    import uuid

    try:
        if advanced_data_recorder is None:
            return return_msg("高级数据记录器未初始化")

        if not record_types:
            return return_msg("记录类型列表不能为空")

        if duration < 0:
            return return_msg("记录持续时间不能为负数")

        # Map record type names to enum members.
        type_map = {
            "robot_state": DataRecordType.ROBOT_STATE,
            "joint_data": DataRecordType.JOINT_DATA,
            "tcp_data": DataRecordType.TCP_DATA,
            "error_data": DataRecordType.ERROR,
        }

        record_enum_types = []
        for record_type in record_types:
            if record_type not in type_map:
                return return_msg(f"不支持的记录类型: {record_type}")
            record_enum_types.append(type_map[record_type])

        # AdvancedDataRecorder.start_recording() takes no arguments and
        # returns nothing, so passing robot_id/record_types/duration to it
        # raised TypeError on every call. Instead, make sure the requested
        # types are enabled (without disabling any other type), switch the
        # recorder on, and create the session id here.
        enabled_types = advanced_data_recorder.recording_config.enabled_record_types
        for enum_type in record_enum_types:
            if enum_type not in enabled_types:
                enabled_types.append(enum_type)

        advanced_data_recorder.start_recording()
        session_id = f"{robot_id}_{uuid.uuid4().hex}"

        if duration > 0:
            # NOTE(review): the recorder exposes a single global on/off flag,
            # so this timer also stops any recording started later by another
            # call; confirm whether per-session tracking is needed.
            stop_timer = threading.Timer(duration, advanced_data_recorder.stop_recording)
            stop_timer.daemon = True
            stop_timer.start()

        return return_msg({"session_id": session_id, "message": "数据记录已启动"})
    except Exception as e:
        logger.error(f"开始数据记录失败: {str(e)}")
        return return_msg(f"开始数据记录失败: {str(e)}")


# Registration reference: src/nonead_universal_robots_mcp/server.py:1019-1019.
# The tool is registered as an MCP tool via the @mcp.tool() decorator on line
# 1019. The FastMCP instance 'mcp' is created on line 31, and this decorator
# registers 'start_data_recording' as a callable tool.
@mcp.tool()
- The DataRecordType enum defines the valid record type values used by the tool. The relevant values are: ROBOT_STATE, JOINT_DATA, TCP_DATA, and ERROR (mapped from string inputs 'robot_state', 'joint_data', 'tcp_data', 'error_data').
class DataRecordType(Enum): """数据记录类型枚举""" SYSTEM_INFO = "system_info" # 系统信息 ROBOT_STATE = "robot_state" # 机器人状态 JOINT_DATA = "joint_data" # 关节数据 TCP_DATA = "tcp_data" # TCP数据 TRAJECTORY = "trajectory" # 轨迹数据 OPERATION = "operation" # 操作记录 ERROR = "error" # 错误记录 SENSOR = "sensor" # 传感器数据 PERFORMANCE = "performance" # 性能数据 ENERGY = "energy" # 能耗数据 COLLISION = "collision" # 碰撞检测 CUSTOM = "custom" # 自定义数据 class StorageFormat(Enum): """存储格式枚举""" JSON = "json" # JSON格式 CSV = "csv" # CSV格式 SQLITE = "sqlite" # SQLite数据库 PICKLE = "pickle" # Python Pickle COMPRESSED = "compressed" # 压缩格式 class RotationStrategy(Enum): """日志轮转策略枚举""" SIZE_BASED = "size_based" # 基于大小 TIME_BASED = "time_based" # 基于时间 COUNT_BASED = "count_based" # 基于记录数 HYBRID = "hybrid" # 混合策略 class RecordPriority(Enum): """记录优先级枚举""" LOW = "low" # 低优先级 MEDIUM = "medium" # 中优先级 HIGH = "high" # 高优先级 CRITICAL = "critical" # 关键优先级 @dataclass class DataRecord: """数据记录基类""" timestamp: float = field(default_factory=time.time) # 时间戳 record_id: str = field(default_factory=lambda: f"rec_{int(time.time() * 1000)}") # 记录ID robot_id: Optional[str] = None # 机器人ID record_type: Optional[DataRecordType] = None # 记录类型 priority: RecordPriority = RecordPriority.MEDIUM # 优先级 metadata: Dict[str, Any] = field(default_factory=dict) # 元数据 data: Dict[str, Any] = field(default_factory=dict) # 实际数据 def to_dict(self) -> Dict[str, Any]: """转换为字典""" result = asdict(self) # 处理枚举类型 if result['record_type'] is not None: result['record_type'] = result['record_type'].value result['priority'] = result['priority'].value return result @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'DataRecord': """从字典创建实例""" # 转换回枚举类型 if 'record_type' in data and data['record_type'] is not None: data['record_type'] = DataRecordType(data['record_type']) if 'priority' in data: data['priority'] = RecordPriority(data['priority']) return cls(**data) @dataclass class RobotStateRecord(DataRecord): """机器人状态记录""" def __post_init__(self): self.record_type = 
DataRecordType.ROBOT_STATE @dataclass class JointDataRecord(DataRecord): """关节数据记录""" def __post_init__(self): self.record_type = DataRecordType.JOINT_DATA @dataclass class TCPDataRecord(DataRecord): """TCP数据记录""" def __post_init__(self): self.record_type = DataRecordType.TCP_DATA @dataclass class TrajectoryRecord(DataRecord): """轨迹数据记录""" def __post_init__(self): self.record_type = DataRecordType.TRAJECTORY @dataclass class OperationRecord(DataRecord): """操作记录""" def __post_init__(self): self.record_type = DataRecordType.OPERATION @dataclass class ErrorRecord(DataRecord): """错误记录""" def __post_init__(self): self.record_type = DataRecordType.ERROR self.priority = RecordPriority.HIGH @dataclass class SensorRecord(DataRecord): """传感器数据记录""" def __post_init__(self): self.record_type = DataRecordType.SENSOR @dataclass class PerformanceRecord(DataRecord): """性能数据记录""" def __post_init__(self): self.record_type = DataRecordType.PERFORMANCE @dataclass class StorageConfig: """存储配置""" storage_format: StorageFormat = StorageFormat.JSON # 存储格式 base_directory: str = "./robot_data" # 基础存储目录 file_prefix: str = "robot_log" # 文件前缀 rotation_strategy: RotationStrategy = RotationStrategy.SIZE_BASED # 轮转策略 max_file_size_mb: int = 100 # 最大文件大小(MB) rotation_interval_seconds: int = 3600 # 轮转间隔(秒) max_records_per_file: int = 10000 # 每个文件最大记录数 compression: bool = False # 是否压缩 enable_backup: bool = True # 是否启用备份 backup_interval_days: int = 7 # 备份间隔(天) @dataclass class RecordingConfig: """记录配置""" enabled: bool = True # 是否启用记录 buffer_size: int = 10000 # 缓冲区大小 flush_interval_seconds: int = 5 # 刷新间隔(秒) min_priority: RecordPriority = RecordPriority.LOW # 最小记录优先级 enabled_record_types: List[DataRecordType] = field( default_factory=lambda: list(DataRecordType)) # 启用的记录类型 sampling_rate_hz: Dict[DataRecordType, float] = field( default_factory=lambda: { DataRecordType.ROBOT_STATE: 10.0, DataRecordType.JOINT_DATA: 100.0, DataRecordType.TCP_DATA: 50.0, DataRecordType.TRAJECTORY: 20.0, 
DataRecordType.OPERATION: 0.0, # 事件触发 DataRecordType.ERROR: 0.0, # 事件触发 DataRecordType.SENSOR: 50.0, DataRecordType.PERFORMANCE: 1.0, DataRecordType.ENERGY: 5.0, DataRecordType.COLLISION: 0.0, # 事件触发 DataRecordType.CUSTOM: 10.0 }) # 采样率(Hz) custom_filters: Dict[str, Callable[[DataRecord], bool]] = field( default_factory=dict) # 自定义过滤器 class SQLiteStorage: """SQLite存储管理器""" def __init__(self, db_path: str): """ 初始化SQLite存储 Args: db_path: 数据库文件路径 """ self.db_path = db_path self._init_database() def _init_database(self): """初始化数据库表结构""" os.makedirs(os.path.dirname(self.db_path), exist_ok=True) with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # 创建主记录表 cursor.execute(''' CREATE TABLE IF NOT EXISTS records ( id TEXT PRIMARY KEY, timestamp REAL, record_type TEXT, robot_id TEXT, priority TEXT, metadata TEXT, data TEXT ) ''') # 创建索引以提高查询性能 cursor.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON records (timestamp)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_record_type ON records (record_type)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_robot_id ON records (robot_id)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_priority ON records (priority)') conn.commit() def store_records(self, records: List[DataRecord]) -> bool: """ 存储记录到SQLite数据库 Args: records: 记录列表 Returns: bool: 是否成功 """ try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() for record in records: record_dict = record.to_dict() cursor.execute(''' INSERT OR REPLACE INTO records (id, timestamp, record_type, robot_id, priority, metadata, data) VALUES (?, ?, ?, ?, ?, ?, ?) 
''', ( record_dict['record_id'], record_dict['timestamp'], record_dict['record_type'], record_dict['robot_id'], record_dict['priority'], json.dumps(record_dict['metadata']), json.dumps(record_dict['data']) )) conn.commit() return True except Exception as e: logger.error(f"SQLite存储失败: {str(e)}") return False def query_records(self, start_time: Optional[float] = None, end_time: Optional[float] = None, record_type: Optional[DataRecordType] = None, robot_id: Optional[str] = None, priority: Optional[RecordPriority] = None, limit: int = 1000, offset: int = 0) -> List[DataRecord]: """ 查询记录 Args: start_time: 开始时间 end_time: 结束时间 record_type: 记录类型 robot_id: 机器人ID priority: 优先级 limit: 限制数量 offset: 偏移量 Returns: List[DataRecord]: 记录列表 """ records = [] try: with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() # 构建查询 query = "SELECT * FROM records WHERE 1=1" params = [] if start_time is not None: query += " AND timestamp >= ?" params.append(start_time) if end_time is not None: query += " AND timestamp <= ?" params.append(end_time) if record_type is not None: query += " AND record_type = ?" params.append(record_type.value) if robot_id is not None: query += " AND robot_id = ?" params.append(robot_id) if priority is not None: query += " AND priority = ?" params.append(priority.value) query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?" 
params.extend([limit, offset]) # 执行查询 cursor.execute(query, params) rows = cursor.fetchall() for row in rows: record_data = { 'record_id': row['id'], 'timestamp': row['timestamp'], 'record_type': row['record_type'], 'robot_id': row['robot_id'], 'priority': row['priority'], 'metadata': json.loads(row['metadata']), 'data': json.loads(row['data']) } records.append(DataRecord.from_dict(record_data)) except Exception as e: logger.error(f"SQLite查询失败: {str(e)}") return records def delete_records(self, start_time: Optional[float] = None, end_time: Optional[float] = None, record_type: Optional[DataRecordType] = None, robot_id: Optional[str] = None) -> bool: """ 删除记录 Args: start_time: 开始时间 end_time: 结束时间 record_type: 记录类型 robot_id: 机器人ID Returns: bool: 是否成功 """ try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # 构建删除语句 query = "DELETE FROM records WHERE 1=1" params = [] if start_time is not None: query += " AND timestamp >= ?" params.append(start_time) if end_time is not None: query += " AND timestamp <= ?" params.append(end_time) if record_type is not None: query += " AND record_type = ?" params.append(record_type.value) if robot_id is not None: query += " AND robot_id = ?" 
params.append(robot_id) # 执行删除 cursor.execute(query, params) conn.commit() return True except Exception as e: logger.error(f"SQLite删除失败: {str(e)}") return False def get_record_count(self) -> int: """ 获取记录总数 Returns: int: 记录数量 """ try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM records") return cursor.fetchone()[0] except Exception as e: logger.error(f"获取记录数失败: {str(e)}") return 0 class JSONStorage: """JSON文件存储管理器""" def __init__(self, base_path: str, file_prefix: str): """ 初始化JSON存储 Args: base_path: 基础路径 file_prefix: 文件前缀 """ self.base_path = base_path self.file_prefix = file_prefix os.makedirs(base_path, exist_ok=True) def _get_current_filename(self) -> str: """获取当前文件名""" timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") return os.path.join(self.base_path, f"{self.file_prefix}_{timestamp}.json") def store_records(self, records: List[DataRecord]) -> bool: """ 存储记录到JSON文件 Args: records: 记录列表 Returns: bool: 是否成功 """ try: filename = self._get_current_filename() # 读取现有数据(如果文件存在) existing_records = [] if os.path.exists(filename): with open(filename, 'r', encoding='utf-8') as f: existing_records = json.load(f) # 添加新记录 for record in records: existing_records.append(record.to_dict()) # 写入文件 with open(filename, 'w', encoding='utf-8') as f: json.dump(existing_records, f, ensure_ascii=False, indent=2) return True except Exception as e: logger.error(f"JSON存储失败: {str(e)}") return False def query_records(self, start_time: Optional[float] = None, end_time: Optional[float] = None, record_type: Optional[DataRecordType] = None, robot_id: Optional[str] = None, priority: Optional[RecordPriority] = None, limit: int = 1000, offset: int = 0) -> List[DataRecord]: """ 查询记录 Args: start_time: 开始时间 end_time: 结束时间 record_type: 记录类型 robot_id: 机器人ID priority: 优先级 limit: 限制数量 offset: 偏移量 Returns: List[DataRecord]: 记录列表 """ records = [] try: # 查找所有匹配的文件 pattern = f"{self.file_prefix}_*.json" import glob json_files = 
glob.glob(os.path.join(self.base_path, pattern)) json_files.sort(reverse=True) # 最新的文件优先 for file_path in json_files: if len(records) >= limit: break with open(file_path, 'r', encoding='utf-8') as f: file_records = json.load(f) # 过滤记录 for record_dict in file_records: # 时间过滤 if start_time is not None and record_dict['timestamp'] < start_time: continue if end_time is not None and record_dict['timestamp'] > end_time: continue # 类型过滤 if record_type is not None and record_dict['record_type'] != record_type.value: continue # 机器人ID过滤 if robot_id is not None and record_dict['robot_id'] != robot_id: continue # 优先级过滤 if priority is not None and record_dict['priority'] != priority.value: continue records.append(DataRecord.from_dict(record_dict)) # 按时间戳排序 records.sort(key=lambda x: x.timestamp, reverse=True) # 应用偏移和限制 records = records[offset:offset + limit] except Exception as e: logger.error(f"JSON查询失败: {str(e)}") return records class CSVStorage: """CSV文件存储管理器""" def __init__(self, base_path: str, file_prefix: str): """ 初始化CSV存储 Args: base_path: 基础路径 file_prefix: 文件前缀 """ self.base_path = base_path self.file_prefix = file_prefix os.makedirs(base_path, exist_ok=True) self._file_headers = {} def _get_filename_by_type(self, record_type: DataRecordType) -> str: """根据记录类型获取文件名""" date_str = datetime.datetime.now().strftime("%Y%m%d") return os.path.join(self.base_path, f"{self.file_prefix}_{record_type.value}_{date_str}.csv") def store_records(self, records: List[DataRecord]) -> bool: """ 存储记录到CSV文件 Args: records: 记录列表 Returns: bool: 是否成功 """ try: # 按记录类型分组 records_by_type = {} for record in records: if record.record_type not in records_by_type: records_by_type[record.record_type] = [] records_by_type[record.record_type].append(record) # 存储每组记录 for record_type, type_records in records_by_type.items(): filename = self._get_filename_by_type(record_type) file_exists = os.path.exists(filename) with open(filename, 'a', newline='', encoding='utf-8') as f: # 构建所有可能的字段 all_fields = 
['record_id', 'timestamp', 'robot_id', 'priority'] data_fields = set() for record in type_records: data_fields.update(record.data.keys()) fieldnames = all_fields + sorted(list(data_fields)) writer = csv.DictWriter(f, fieldnames=fieldnames) # 写入头部(如果是新文件) if not file_exists: writer.writeheader() self._file_headers[filename] = fieldnames # 写入记录 for record in type_records: row = { 'record_id': record.record_id, 'timestamp': record.timestamp, 'robot_id': record.robot_id or '', 'priority': record.priority.value } row.update(record.data) writer.writerow(row) return True except Exception as e: logger.error(f"CSV存储失败: {str(e)}") return False class PickleStorage: """Pickle存储管理器""" def __init__(self, base_path: str, file_prefix: str, compression: bool = False): """ 初始化Pickle存储 Args: base_path: 基础路径 file_prefix: 文件前缀 compression: 是否使用压缩 """ self.base_path = base_path self.file_prefix = file_prefix self.compression = compression os.makedirs(base_path, exist_ok=True) def _get_current_filename(self) -> str: """获取当前文件名""" timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") ext = "pklz" if self.compression else "pkl" return os.path.join(self.base_path, f"{self.file_prefix}_{timestamp}.{ext}") def store_records(self, records: List[DataRecord]) -> bool: """ 存储记录到Pickle文件 Args: records: 记录列表 Returns: bool: 是否成功 """ try: filename = self._get_current_filename() # 序列化记录 records_data = [record.to_dict() for record in records] # 写入文件 if self.compression: with open(filename, 'wb') as f: compressed_data = zlib.compress(pickle.dumps(records_data)) f.write(compressed_data) else: with open(filename, 'wb') as f: pickle.dump(records_data, f) return True except Exception as e: logger.error(f"Pickle存储失败: {str(e)}") return False def query_records(self, start_time: Optional[float] = None, end_time: Optional[float] = None, record_type: Optional[DataRecordType] = None, robot_id: Optional[str] = None, priority: Optional[RecordPriority] = None, limit: int = 1000, offset: int = 0) -> 
List[DataRecord]: """ 查询记录 Args: start_time: 开始时间 end_time: 结束时间 record_type: 记录类型 robot_id: 机器人ID priority: 优先级 limit: 限制数量 offset: 偏移量 Returns: List[DataRecord]: 记录列表 """ records = [] try: # 查找所有匹配的文件 import glob if self.compression: pattern = f"{self.file_prefix}_*.pklz" else: pattern = f"{self.file_prefix}_*.pkl" pickle_files = glob.glob(os.path.join(self.base_path, pattern)) pickle_files.sort(reverse=True) # 最新的文件优先 for file_path in pickle_files: if len(records) >= limit: break # 读取文件 if file_path.endswith('.pklz'): with open(file_path, 'rb') as f: compressed_data = f.read() file_records = pickle.loads(zlib.decompress(compressed_data)) else: with open(file_path, 'rb') as f: file_records = pickle.load(f) # 过滤记录 for record_dict in file_records: # 时间过滤 if start_time is not None and record_dict['timestamp'] < start_time: continue if end_time is not None and record_dict['timestamp'] > end_time: continue # 类型过滤 if record_type is not None and record_dict['record_type'] != record_type.value: continue # 机器人ID过滤 if robot_id is not None and record_dict['robot_id'] != robot_id: continue # 优先级过滤 if priority is not None and record_dict['priority'] != priority.value: continue records.append(DataRecord.from_dict(record_dict)) # 按时间戳排序 records.sort(key=lambda x: x.timestamp, reverse=True) # 应用偏移和限制 records = records[offset:offset + limit] except Exception as e: logger.error(f"Pickle查询失败: {str(e)}") return records class DataStorageFactory: """数据存储工厂类""" @staticmethod def create_storage(config: StorageConfig) -> Union[SQLiteStorage, JSONStorage, CSVStorage, PickleStorage]: """ 创建存储实例 Args: config: 存储配置 Returns: Union[SQLiteStorage, JSONStorage, CSVStorage, PickleStorage]: 存储实例 """ os.makedirs(config.base_directory, exist_ok=True) if config.storage_format == StorageFormat.SQLITE: db_path = os.path.join(config.base_directory, f"{config.file_prefix}.db") return SQLiteStorage(db_path) elif config.storage_format == StorageFormat.JSON: return JSONStorage(config.base_directory, 
config.file_prefix) elif config.storage_format == StorageFormat.CSV: return CSVStorage(config.base_directory, config.file_prefix) elif config.storage_format == StorageFormat.PICKLE or config.storage_format == StorageFormat.COMPRESSED: return PickleStorage(config.base_directory, config.file_prefix, compression=config.storage_format == StorageFormat.COMPRESSED) else: raise ValueError(f"不支持的存储格式: {config.storage_format}") class AdvancedDataRecorder: """高级数据记录器类""" def __init__(self, storage_config: Optional[StorageConfig] = None, recording_config: Optional[RecordingConfig] = None): """ 初始化高级数据记录器 Args: storage_config: 存储配置 recording_config: 记录配置 """ self.storage_config = storage_config or StorageConfig() self.recording_config = recording_config or RecordingConfig() # 创建存储实例 self.storage = DataStorageFactory.create_storage(self.storage_config) # 初始化缓冲区 self.buffer = deque(maxlen=self.recording_config.buffer_size) self.buffer_lock = threading.RLock() # 状态标志 self.running = False self.recording = self.recording_config.enabled # 记录统计信息 self.stats = { 'total_records': 0, 'records_stored': 0, 'records_dropped': 0, 'last_flush_time': time.time(), 'last_rotation_time': time.time() } # 记录最后采样时间(用于控制采样率) self.last_sampling_time = {} # 启动后台线程 self.flush_thread = None if self.recording_config.flush_interval_seconds > 0: self._start_flush_thread() def _start_flush_thread(self): """启动刷新线程""" self.running = True self.flush_thread = threading.Thread(target=self._flush_loop, daemon=True) self.flush_thread.start() logger.info("数据记录器刷新线程已启动") def _flush_loop(self): """刷新循环""" while self.running: try: # 检查是否需要刷新 current_time = time.time() if current_time - self.stats['last_flush_time'] >= self.recording_config.flush_interval_seconds: self.flush() # 检查是否需要轮转 self._check_rotation() # 检查是否需要备份 self._check_backup() except Exception as e: logger.error(f"刷新线程错误: {str(e)}") time.sleep(0.1) def start_recording(self): """ 开始记录 """ self.recording = True logger.info("数据记录已开始") def 
stop_recording(self): """ 停止记录 """ self.recording = False logger.info("数据记录已停止") def toggle_recording(self) -> bool: """ 切换记录状态 Returns: bool: 切换后的状态 """ self.recording = not self.recording logger.info(f"数据记录已{'开始' if self.recording else '停止'}") return self.recording def record(self, record_type: DataRecordType, data: Dict[str, Any], robot_id: Optional[str] = None, priority: Optional[RecordPriority] = None, metadata: Optional[Dict[str, Any]] = None) -> bool: """ 记录数据 Args: record_type: 记录类型 data: 记录数据 robot_id: 机器人ID priority: 优先级 metadata: 元数据 Returns: bool: 是否成功 """ # 检查是否启用记录 if not self.recording: return False # 检查记录类型是否启用 if record_type not in self.recording_config.enabled_record_types: return False # 检查优先级 if priority is None: priority = RecordPriority.MEDIUM if priority.value < self.recording_config.min_priority.value: return False # 检查采样率控制 current_time = time.time() sampling_rate = self.recording_config.sampling_rate_hz.get(record_type, 0.0) if sampling_rate > 0: if record_type not in self.last_sampling_time: self.last_sampling_time[record_type] = current_time else: elapsed = current_time - self.last_sampling_time[record_type] if elapsed < 1.0 / sampling_rate: return False self.last_sampling_time[record_type] = current_time # 创建记录 record = DataRecord( record_id=f"{record_type.value}_{int(current_time * 1000)}", robot_id=robot_id, record_type=record_type, priority=priority, metadata=metadata or {}, data=data ) # 应用自定义过滤器 filter_name = f"{record_type.value}_filter" if filter_name in self.recording_config.custom_filters: if not self.recording_config.custom_filters[filter_name](record): return False # 添加到缓冲区 with self.buffer_lock: if len(self.buffer) >= self.buffer.maxlen: self.stats['records_dropped'] += 1 return False self.buffer.append(record) self.stats['total_records'] += 1 return True def record_robot_state(self, robot_id: str, state_data: Dict[str, Any], priority: RecordPriority = RecordPriority.MEDIUM) -> bool: """ 记录机器人状态 Args: robot_id: 机器人ID 
state_data: 状态数据 priority: 优先级 Returns: bool: 是否成功 """ return self.record( record_type=DataRecordType.ROBOT_STATE, robot_id=robot_id, data=state_data, priority=priority ) def record_joint_data(self, robot_id: str, joint_data: Dict[str, Any], priority: RecordPriority = RecordPriority.MEDIUM) -> bool: """ 记录关节数据 Args: robot_id: 机器人ID joint_data: 关节数据 priority: 优先级 Returns: bool: 是否成功 """ return self.record( record_type=DataRecordType.JOINT_DATA, robot_id=robot_id, data=joint_data, priority=priority ) def record_tcp_data(self, robot_id: str, tcp_data: Dict[str, Any], priority: RecordPriority = RecordPriority.MEDIUM) -> bool: """ 记录TCP数据 Args: robot_id: 机器人ID tcp_data: TCP数据 priority: 优先级 Returns: bool: 是否成功 """ return self.record( record_type=DataRecordType.TCP_DATA, robot_id=robot_id, data=tcp_data, priority=priority ) def record_trajectory(self, robot_id: str, trajectory_data: Dict[str, Any], priority: RecordPriority = RecordPriority.MEDIUM) -> bool: """ 记录轨迹数据 Args: - The AdvancedDataRecorder.start_recording() method is the underlying implementation that gets called by the MCP handler. It sets self.recording = True to enable data recording.
def start_recording(self):
    """
    Start recording.

    Sets ``self.recording`` to True and logs an info message.
    """
    self.recording = True
    logger.info("数据记录已开始")


# Reference note: initialization of the advanced_data_recorder instance at
# server startup. Called from initialize_extended_modules(), which creates an
# AdvancedDataRecorder() and assigns it to the global variable used by the
# tool handler.
# Initialize the advanced data recorder
advanced_data_recorder = AdvancedDataRecorder()