Nonead Universal-Robots MCP Server

by nonead

start_data_recording

Start recording robot state, joint, TCP, and error data from a Universal Robot. Specify robot ID and data types; set duration or record continuously.

Instructions

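Start recording robot data.

Parameters:
- robot_id: Robot ID
- record_types: List of record types; valid values are "robot_state", "joint_data", "tcp_data", and "error_data"
- duration: Recording duration in seconds; 0 records continuously until stopped

Returns:
- Recording session ID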
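
For illustration, a client could invoke this tool with an MCP `tools/call` request. The sketch below only builds the JSON-RPC payload; the robot ID `ur5e_01`, the request `id`, and the helper name `build_start_recording_request` are hypothetical, and the transport is left out.

```python
import json


def build_start_recording_request(robot_id, record_types, duration=0.0, request_id=1):
    """Build a JSON-RPC 2.0 `tools/call` payload for start_data_recording."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "start_data_recording",
            "arguments": {
                "robot_id": robot_id,
                "record_types": list(record_types),
                "duration": duration,
            },
        },
    }


# Record joint and TCP data for 30 seconds from a hypothetical robot.
request = build_start_recording_request("ur5e_01", ["joint_data", "tcp_data"], duration=30)
print(json.dumps(request, indent=2))
```

Omitting `duration` (or passing 0) asks the server to record until the session is stopped.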

Input Schema

Name          Required  Description  Default
robot_id      Yes
record_types  Yes
duration      No
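
The table lists no descriptions or defaults, but the handler signature quoted in the Implementation Reference (`robot_id: str`, `record_types: list`, `duration: float = 0`) suggests roughly the constraints below. This is a sketch of a client-side check; `check_arguments` is a hypothetical helper, not part of the server, and rejecting negative durations is an assumption the handler itself does not enforce.

```python
VALID_RECORD_TYPES = {"robot_state", "joint_data", "tcp_data", "error_data"}


def check_arguments(args):
    """Return a list of problems found in a start_data_recording argument dict."""
    problems = []
    if not isinstance(args.get("robot_id"), str):
        problems.append("robot_id is required and must be a string")

    record_types = args.get("record_types")
    if not isinstance(record_types, list):
        problems.append("record_types is required and must be a list")
    else:
        for name in record_types:
            if name not in VALID_RECORD_TYPES:
                problems.append(f"unsupported record type: {name}")

    duration = args.get("duration", 0)  # optional; the handler defaults to 0
    is_number = isinstance(duration, (int, float)) and not isinstance(duration, bool)
    if not is_number or duration < 0:  # assumption: negative durations are invalid
        problems.append("duration must be a non-negative number of seconds")
    return problems


print(check_arguments({"robot_id": "ur5e_01", "record_types": ["tcp_data"]}))
print(check_arguments({"record_types": ["video"], "duration": -1}))
```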

Implementation Reference

  • MCP tool handler function 'start_data_recording'. Decorated with @mcp.tool(), it accepts robot_id (str), record_types (list of strings), and duration (float, default 0). Maps string record types to DataRecordType enums, then delegates to advanced_data_recorder.start_recording(). Returns the session ID in JSON format.
    @mcp.tool()
    def start_data_recording(robot_id: str, record_types: list, duration: float = 0):
        """
        Start recording robot data.

        Parameters:
        - robot_id: Robot ID
        - record_types: List of record types; valid values are "robot_state", "joint_data", "tcp_data", and "error_data"
        - duration: Recording duration in seconds; 0 records continuously until stopped

        Returns:
        - Recording session ID
        """
        try:
            if advanced_data_recorder is None:
                return return_msg("Advanced data recorder is not initialized")

            # Map record-type strings to enum members
            type_map = {
                "robot_state": DataRecordType.ROBOT_STATE,
                "joint_data": DataRecordType.JOINT_DATA,
                "tcp_data": DataRecordType.TCP_DATA,
                "error_data": DataRecordType.ERROR
            }

            record_enum_types = []
            for record_type in record_types:
                if record_type in type_map:
                    record_enum_types.append(type_map[record_type])
                else:
                    # Reject the whole request on the first unknown type
                    return return_msg(f"Unsupported record type: {record_type}")

            # Start recording
            session_id = advanced_data_recorder.start_recording(
                robot_id=robot_id,
                record_types=record_enum_types,
                duration=duration
            )

            return return_msg({"session_id": session_id, "message": "Data recording started"})
        except Exception as e:
            logger.error(f"Failed to start data recording: {str(e)}")
            return return_msg(f"Failed to start data recording: {str(e)}")
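
The mapping step can be tried in isolation. The sketch below uses a trimmed stand-in for `DataRecordType` and a hypothetical helper `to_enum_types`; unlike the handler, which returns on the first unknown name, it collects every unknown name. Note that the tool-level name `error_data` maps to the enum member `ERROR`, whose value is `"error"`.

```python
from enum import Enum


class DataRecordType(Enum):
    # Stand-in with only the four members the tool exposes
    ROBOT_STATE = "robot_state"
    JOINT_DATA = "joint_data"
    TCP_DATA = "tcp_data"
    ERROR = "error"


TYPE_MAP = {
    "robot_state": DataRecordType.ROBOT_STATE,
    "joint_data": DataRecordType.JOINT_DATA,
    "tcp_data": DataRecordType.TCP_DATA,
    "error_data": DataRecordType.ERROR,
}


def to_enum_types(record_types):
    """Split requested names into resolved enum members and unknown names."""
    resolved, unknown = [], []
    for name in record_types:
        if name in TYPE_MAP:
            resolved.append(TYPE_MAP[name])
        else:
            unknown.append(name)
    return resolved, unknown


resolved, unknown = to_enum_types(["joint_data", "error_data", "video"])
print([t.value for t in resolved])  # ['joint_data', 'error']
print(unknown)                      # ['video']
```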
  • The tool is registered as an MCP tool via the @mcp.tool() decorator on line 1019. The FastMCP instance 'mcp' is created on line 31, and this decorator registers 'start_data_recording' as a callable tool.
    @mcp.tool()
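
Decorator-based registration can be pictured with a toy registry. This is not FastMCP's implementation; `ToolRegistry` is a hypothetical class that only illustrates how calling `tool()` returns a decorator that records the function under its name.

```python
class ToolRegistry:
    """Toy registry: maps function names to callables."""

    def __init__(self):
        self.tools = {}

    def tool(self):
        # Calling tool() returns the actual decorator, which is why the
        # usage site is written with parentheses: @registry.tool()
        def decorator(func):
            self.tools[func.__name__] = func
            return func  # the function itself is left unchanged
        return decorator


registry = ToolRegistry()


@registry.tool()
def start_data_recording(robot_id, record_types, duration=0):
    # Placeholder body for the sketch; the real handler starts a recording
    return {"robot_id": robot_id, "record_types": record_types, "duration": duration}


handler = registry.tools["start_data_recording"]
print(handler("ur5e_01", ["tcp_data"]))
```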
  • The DataRecordType enum defines the valid record type values used by the tool. The relevant values are: ROBOT_STATE, JOINT_DATA, TCP_DATA, and ERROR (mapped from string inputs 'robot_state', 'joint_data', 'tcp_data', 'error_data').
    class DataRecordType(Enum):
        """Data record types."""
        SYSTEM_INFO = "system_info"                  # System information
        ROBOT_STATE = "robot_state"                  # Robot state
        JOINT_DATA = "joint_data"                    # Joint data
        TCP_DATA = "tcp_data"                        # TCP (tool center point) data
        TRAJECTORY = "trajectory"                    # Trajectory data
        OPERATION = "operation"                      # Operation log
        ERROR = "error"                              # Error log
        SENSOR = "sensor"                            # Sensor data
        PERFORMANCE = "performance"                  # Performance data
        ENERGY = "energy"                            # Energy consumption data
        COLLISION = "collision"                      # Collision detection
        CUSTOM = "custom"                            # Custom data
    
    
    class StorageFormat(Enum):
        """Storage formats."""
        JSON = "json"                                # JSON files
        CSV = "csv"                                  # CSV files
        SQLITE = "sqlite"                            # SQLite database
        PICKLE = "pickle"                            # Python pickle
        COMPRESSED = "compressed"                    # Compressed format


    class RotationStrategy(Enum):
        """Log rotation strategies."""
        SIZE_BASED = "size_based"                    # Rotate by file size
        TIME_BASED = "time_based"                    # Rotate by elapsed time
        COUNT_BASED = "count_based"                  # Rotate by record count
        HYBRID = "hybrid"                            # Hybrid strategy


    class RecordPriority(Enum):
        """Record priorities."""
        LOW = "low"                                  # Low priority
        MEDIUM = "medium"                            # Medium priority
        HIGH = "high"                                # High priority
        CRITICAL = "critical"                        # Critical priority
    
    
    @dataclass
    class DataRecord:
        """Base class for data records."""
        timestamp: float = field(default_factory=time.time)  # Timestamp (seconds since the epoch)
        # Record ID; millisecond resolution, so records created in the same millisecond share an ID
        record_id: str = field(default_factory=lambda: f"rec_{int(time.time() * 1000)}")
        robot_id: Optional[str] = None  # Robot ID
        record_type: Optional[DataRecordType] = None  # Record type
        priority: RecordPriority = RecordPriority.MEDIUM  # Priority
        metadata: Dict[str, Any] = field(default_factory=dict)  # Metadata
        data: Dict[str, Any] = field(default_factory=dict)  # Payload

        def to_dict(self) -> Dict[str, Any]:
            """Convert to a dictionary."""
            result = asdict(self)
            # Replace enum members with their string values
            if result['record_type'] is not None:
                result['record_type'] = result['record_type'].value
            result['priority'] = result['priority'].value
            return result

        @classmethod
        def from_dict(cls, data: Dict[str, Any]) -> 'DataRecord':
            """Create an instance from a dictionary."""
            data = dict(data)  # Shallow copy so the caller's dict is not mutated
            # Convert string values back to enum members
            if 'record_type' in data and data['record_type'] is not None:
                data['record_type'] = DataRecordType(data['record_type'])
            if 'priority' in data:
                data['priority'] = RecordPriority(data['priority'])
            return cls(**data)
    
    
    @dataclass
    class RobotStateRecord(DataRecord):
        """Robot state record."""
        def __post_init__(self):
            self.record_type = DataRecordType.ROBOT_STATE


    @dataclass
    class JointDataRecord(DataRecord):
        """Joint data record."""
        def __post_init__(self):
            self.record_type = DataRecordType.JOINT_DATA


    @dataclass
    class TCPDataRecord(DataRecord):
        """TCP data record."""
        def __post_init__(self):
            self.record_type = DataRecordType.TCP_DATA


    @dataclass
    class TrajectoryRecord(DataRecord):
        """Trajectory data record."""
        def __post_init__(self):
            self.record_type = DataRecordType.TRAJECTORY


    @dataclass
    class OperationRecord(DataRecord):
        """Operation record."""
        def __post_init__(self):
            self.record_type = DataRecordType.OPERATION


    @dataclass
    class ErrorRecord(DataRecord):
        """Error record; always stored with HIGH priority."""
        def __post_init__(self):
            self.record_type = DataRecordType.ERROR
            self.priority = RecordPriority.HIGH


    @dataclass
    class SensorRecord(DataRecord):
        """Sensor data record."""
        def __post_init__(self):
            self.record_type = DataRecordType.SENSOR


    @dataclass
    class PerformanceRecord(DataRecord):
        """Performance data record."""
        def __post_init__(self):
            self.record_type = DataRecordType.PERFORMANCE
    
    
    @dataclass
    class StorageConfig:
        """Storage configuration."""
        storage_format: StorageFormat = StorageFormat.JSON  # Storage format
        base_directory: str = "./robot_data"  # Base storage directory
        file_prefix: str = "robot_log"  # File name prefix
        rotation_strategy: RotationStrategy = RotationStrategy.SIZE_BASED  # Rotation strategy
        max_file_size_mb: int = 100  # Maximum file size (MB)
        rotation_interval_seconds: int = 3600  # Rotation interval (seconds)
        max_records_per_file: int = 10000  # Maximum records per file
        compression: bool = False  # Whether to compress
        enable_backup: bool = True  # Whether to enable backups
        backup_interval_days: int = 7  # Backup interval (days)
    
    
    @dataclass
    class RecordingConfig:
        """Recording configuration."""
        enabled: bool = True  # Whether recording is enabled
        buffer_size: int = 10000  # Buffer size (records)
        flush_interval_seconds: int = 5  # Flush interval (seconds)
        min_priority: RecordPriority = RecordPriority.LOW  # Minimum priority to record
        enabled_record_types: List[DataRecordType] = field(
            default_factory=lambda: list(DataRecordType))  # Enabled record types
        sampling_rate_hz: Dict[DataRecordType, float] = field(
            default_factory=lambda: {
                DataRecordType.ROBOT_STATE: 10.0,
                DataRecordType.JOINT_DATA: 100.0,
                DataRecordType.TCP_DATA: 50.0,
                DataRecordType.TRAJECTORY: 20.0,
                DataRecordType.OPERATION: 0.0,  # Event-triggered
                DataRecordType.ERROR: 0.0,  # Event-triggered
                DataRecordType.SENSOR: 50.0,
                DataRecordType.PERFORMANCE: 1.0,
                DataRecordType.ENERGY: 5.0,
                DataRecordType.COLLISION: 0.0,  # Event-triggered
                DataRecordType.CUSTOM: 10.0
            })  # Sampling rates (Hz)
        custom_filters: Dict[str, Callable[[DataRecord], bool]] = field(
            default_factory=dict)  # Custom filters
    
    
    class SQLiteStorage:
        """SQLite storage manager."""

        def __init__(self, db_path: str):
            """
            Initialize SQLite storage.

            Args:
                db_path: Path to the database file
            """
            self.db_path = db_path
            self._init_database()

        def _init_database(self):
            """Create the database table and indexes if they do not exist."""
            # os.makedirs('') raises, so only create a directory when the path has one
            db_dir = os.path.dirname(self.db_path)
            if db_dir:
                os.makedirs(db_dir, exist_ok=True)

            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()

                # Create the main records table
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS records (
                        id TEXT PRIMARY KEY,
                        timestamp REAL,
                        record_type TEXT,
                        robot_id TEXT,
                        priority TEXT,
                        metadata TEXT,
                        data TEXT
                    )
                ''')

                # Create indexes to speed up queries
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON records (timestamp)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_record_type ON records (record_type)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_robot_id ON records (robot_id)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_priority ON records (priority)')

                conn.commit()
        
        def store_records(self, records: List[DataRecord]) -> bool:
            """
            Store records in the SQLite database.

            Args:
                records: List of records

            Returns:
                bool: True on success
            """
            try:
                with sqlite3.connect(self.db_path) as conn:
                    cursor = conn.cursor()
                    
                    for record in records:
                        record_dict = record.to_dict()
                        cursor.execute('''
                            INSERT OR REPLACE INTO records 
                            (id, timestamp, record_type, robot_id, priority, metadata, data) 
                            VALUES (?, ?, ?, ?, ?, ?, ?)
                        ''', (
                            record_dict['record_id'],
                            record_dict['timestamp'],
                            record_dict['record_type'],
                            record_dict['robot_id'],
                            record_dict['priority'],
                            json.dumps(record_dict['metadata']),
                            json.dumps(record_dict['data'])
                        ))
                    
                    conn.commit()
                
                return True
            except Exception as e:
                logger.error(f"SQLite store failed: {str(e)}")
                return False
        
        def query_records(self, 
                         start_time: Optional[float] = None,
                         end_time: Optional[float] = None,
                         record_type: Optional[DataRecordType] = None,
                         robot_id: Optional[str] = None,
                         priority: Optional[RecordPriority] = None,
                         limit: int = 1000,
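                         offset: int = 0) -> List[DataRecord]:
            """
            Query records.

            Args:
                start_time: Start time
                end_time: End time
                record_type: Record type
                robot_id: Robot ID
                priority: Priority
                limit: Maximum number of records to return
                offset: Number of records to skip

            Returns:
                List[DataRecord]: Matching records, newest first
            """
            records = []

            try:
                with sqlite3.connect(self.db_path) as conn:
                    conn.row_factory = sqlite3.Row
                    cursor = conn.cursor()

                    # Build the query; values are bound as parameters, never interpolated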
                    query = "SELECT * FROM records WHERE 1=1"
                    params = []
                    
                    if start_time is not None:
                        query += " AND timestamp >= ?"
                        params.append(start_time)
                    
                    if end_time is not None:
                        query += " AND timestamp <= ?"
                        params.append(end_time)
                    
                    if record_type is not None:
                        query += " AND record_type = ?"
                        params.append(record_type.value)
                    
                    if robot_id is not None:
                        query += " AND robot_id = ?"
                        params.append(robot_id)
                    
                    if priority is not None:
                        query += " AND priority = ?"
                        params.append(priority.value)
                    
                    query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?"
                    params.extend([limit, offset])
                    
                    # Execute the query
                    cursor.execute(query, params)
                    rows = cursor.fetchall()
                    
                    for row in rows:
                        record_data = {
                            'record_id': row['id'],
                            'timestamp': row['timestamp'],
                            'record_type': row['record_type'],
                            'robot_id': row['robot_id'],
                            'priority': row['priority'],
                            'metadata': json.loads(row['metadata']),
                            'data': json.loads(row['data'])
                        }
                        records.append(DataRecord.from_dict(record_data))
                    
            except Exception as e:
                logger.error(f"SQLite query failed: {str(e)}")
            
            return records
        
        def delete_records(self, 
                         start_time: Optional[float] = None,
                         end_time: Optional[float] = None,
                         record_type: Optional[DataRecordType] = None,
                         robot_id: Optional[str] = None) -> bool:
            """
            Delete records.

            Args:
                start_time: Start time
                end_time: End time
                record_type: Record type
                robot_id: Robot ID

            Returns:
                bool: True on success
            """
            try:
                with sqlite3.connect(self.db_path) as conn:
                    cursor = conn.cursor()

                    # Build the DELETE statement; with no filters it deletes every record
                    query = "DELETE FROM records WHERE 1=1"
                    params = []
                    
                    if start_time is not None:
                        query += " AND timestamp >= ?"
                        params.append(start_time)
                    
                    if end_time is not None:
                        query += " AND timestamp <= ?"
                        params.append(end_time)
                    
                    if record_type is not None:
                        query += " AND record_type = ?"
                        params.append(record_type.value)
                    
                    if robot_id is not None:
                        query += " AND robot_id = ?"
                        params.append(robot_id)
                    
                    # Execute the delete
                    cursor.execute(query, params)
                    conn.commit()

                return True
            except Exception as e:
                logger.error(f"SQLite delete failed: {str(e)}")
                return False
        
        def get_record_count(self) -> int:
            """
            Return the total number of records.

            Returns:
                int: Number of records
            """
            try:
                with sqlite3.connect(self.db_path) as conn:
                    cursor = conn.cursor()
                    cursor.execute("SELECT COUNT(*) FROM records")
                    return cursor.fetchone()[0]
            except Exception as e:
                logger.error(f"Failed to get record count: {str(e)}")
                return 0
    
    
    class JSONStorage:
        """JSON file storage manager."""

        def __init__(self, base_path: str, file_prefix: str):
            """
            Initialize JSON storage.

            Args:
                base_path: Base directory
                file_prefix: File name prefix
            """
            self.base_path = base_path
            self.file_prefix = file_prefix
            os.makedirs(base_path, exist_ok=True)

        def _get_current_filename(self) -> str:
            """Return the current file name (one file per second of wall-clock time)."""
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            return os.path.join(self.base_path, f"{self.file_prefix}_{timestamp}.json")
        
        def store_records(self, records: List[DataRecord]) -> bool:
            """
            Store records in a JSON file.

            Args:
                records: List of records

            Returns:
                bool: True on success
            """
            try:
                filename = self._get_current_filename()

                # Load existing data if the file already exists
                existing_records = []
                if os.path.exists(filename):
                    with open(filename, 'r', encoding='utf-8') as f:
                        existing_records = json.load(f)

                # Append the new records
                for record in records:
                    existing_records.append(record.to_dict())

                # Write the file
                with open(filename, 'w', encoding='utf-8') as f:
                    json.dump(existing_records, f, ensure_ascii=False, indent=2)

                return True
            except Exception as e:
                logger.error(f"JSON store failed: {str(e)}")
                return False
        
        def query_records(self, 
                         start_time: Optional[float] = None,
                         end_time: Optional[float] = None,
                         record_type: Optional[DataRecordType] = None,
                         robot_id: Optional[str] = None,
                         priority: Optional[RecordPriority] = None,
                         limit: int = 1000,
                         offset: int = 0) -> List[DataRecord]:
            """
            Query records.

            Args:
                start_time: Start time
                end_time: End time
                record_type: Record type
                robot_id: Robot ID
                priority: Priority
                limit: Maximum number of records to return
                offset: Number of records to skip

            Returns:
                List[DataRecord]: Matching records, newest first
            """
            records = []

            try:
                # Find all matching files
                pattern = f"{self.file_prefix}_*.json"
                import glob
                json_files = glob.glob(os.path.join(self.base_path, pattern))
                json_files.sort(reverse=True)  # Newest files first

                for file_path in json_files:
                    # Stop once enough records are collected to cover offset + limit
                    if len(records) >= offset + limit:
                        break

                    with open(file_path, 'r', encoding='utf-8') as f:
                        file_records = json.load(f)

                    # Filter records
                    for record_dict in file_records:
                        # Filter by time
                        if start_time is not None and record_dict['timestamp'] < start_time:
                            continue
                        if end_time is not None and record_dict['timestamp'] > end_time:
                            continue

                        # Filter by record type
                        if record_type is not None and record_dict['record_type'] != record_type.value:
                            continue

                        # Filter by robot ID
                        if robot_id is not None and record_dict['robot_id'] != robot_id:
                            continue

                        # Filter by priority
                        if priority is not None and record_dict['priority'] != priority.value:
                            continue

                        records.append(DataRecord.from_dict(record_dict))

                # Sort by timestamp, newest first
                records.sort(key=lambda x: x.timestamp, reverse=True)

                # Apply offset and limit
                records = records[offset:offset + limit]

            except Exception as e:
                logger.error(f"JSON query failed: {str(e)}")
            
            return records
    
    
    class CSVStorage:
        """CSV file storage manager."""

        def __init__(self, base_path: str, file_prefix: str):
            """
            Initialize CSV storage.

            Args:
                base_path: Base directory
                file_prefix: File name prefix
            """
            self.base_path = base_path
            self.file_prefix = file_prefix
            os.makedirs(base_path, exist_ok=True)
            self._file_headers = {}

        def _get_filename_by_type(self, record_type: DataRecordType) -> str:
            """Return the file name for a record type (one file per type per day)."""
            date_str = datetime.datetime.now().strftime("%Y%m%d")
            return os.path.join(self.base_path, 
                              f"{self.file_prefix}_{record_type.value}_{date_str}.csv")
        
        def store_records(self, records: List[DataRecord]) -> bool:
            """
            Store records in CSV files.

            Args:
                records: List of records

            Returns:
                bool: True on success
            """
            try:
                # Group records by record type
                records_by_type = {}
                for record in records:
                    if record.record_type not in records_by_type:
                        records_by_type[record.record_type] = []
                    records_by_type[record.record_type].append(record)

                # Store each group
                for record_type, type_records in records_by_type.items():
                    filename = self._get_filename_by_type(record_type)
                    file_exists = os.path.exists(filename)

                    with open(filename, 'a', newline='', encoding='utf-8') as f:
                        # Collect every field that appears in this batch
                        all_fields = ['record_id', 'timestamp', 'robot_id', 'priority']
                        data_fields = set()

                        for record in type_records:
                            data_fields.update(record.data.keys())

                        fieldnames = all_fields + sorted(list(data_fields))

                        writer = csv.DictWriter(f, fieldnames=fieldnames)

                        # Write the header only for a new file; later batches with
                        # different data fields will not match this header
                        if not file_exists:
                            writer.writeheader()
                            self._file_headers[filename] = fieldnames

                        # Write the records
                        for record in type_records:
                            row = {
                                'record_id': record.record_id,
                                'timestamp': record.timestamp,
                                'robot_id': record.robot_id or '',
                                'priority': record.priority.value
                            }
                            row.update(record.data)
                            writer.writerow(row)

                return True
            except Exception as e:
                logger.error(f"CSV store failed: {str(e)}")
                return False
    
    
    class PickleStorage:
        """Pickle storage manager."""

        def __init__(self, base_path: str, file_prefix: str, compression: bool = False):
            """
            Initialize pickle storage.

            Args:
                base_path: Base directory
                file_prefix: File name prefix
                compression: Whether to compress files with zlib
            """
            self.base_path = base_path
            self.file_prefix = file_prefix
            self.compression = compression
            os.makedirs(base_path, exist_ok=True)

        def _get_current_filename(self) -> str:
            """Return the current file name."""
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            ext = "pklz" if self.compression else "pkl"
            return os.path.join(self.base_path, f"{self.file_prefix}_{timestamp}.{ext}")
        
        def store_records(self, records: List[DataRecord]) -> bool:
            """
            Store records in a pickle file.

            Args:
                records: List of records

            Returns:
                bool: True on success
            """
            try:
                filename = self._get_current_filename()

                # Serialize the records
                records_data = [record.to_dict() for record in records]

                # Write the file
                if self.compression:
                    with open(filename, 'wb') as f:
                        compressed_data = zlib.compress(pickle.dumps(records_data))
                        f.write(compressed_data)
                else:
                    with open(filename, 'wb') as f:
                        pickle.dump(records_data, f)

                return True
            except Exception as e:
                logger.error(f"Pickle store failed: {str(e)}")
                return False
        
        def query_records(self, 
                         start_time: Optional[float] = None,
                         end_time: Optional[float] = None,
                         record_type: Optional[DataRecordType] = None,
                         robot_id: Optional[str] = None,
                         priority: Optional[RecordPriority] = None,
                         limit: int = 1000,
                         offset: int = 0) -> List[DataRecord]:
            """
            Query records.

            Args:
                start_time: Start time
                end_time: End time
                record_type: Record type
                robot_id: Robot ID
                priority: Priority
                limit: Maximum number of records to return
                offset: Number of records to skip

            Returns:
                List[DataRecord]: Matching records, newest first
            """
            records = []

            try:
                # Find all matching files
                import glob
                if self.compression:
                    pattern = f"{self.file_prefix}_*.pklz"
                else:
                    pattern = f"{self.file_prefix}_*.pkl"

                pickle_files = glob.glob(os.path.join(self.base_path, pattern))
                pickle_files.sort(reverse=True)  # Newest files first

                for file_path in pickle_files:
                    # Stop once enough records are collected to cover offset + limit
                    if len(records) >= offset + limit:
                        break

                    # Read the file
                    if file_path.endswith('.pklz'):
                        with open(file_path, 'rb') as f:
                            compressed_data = f.read()
                            file_records = pickle.loads(zlib.decompress(compressed_data))
                    else:
                        with open(file_path, 'rb') as f:
                            file_records = pickle.load(f)

                    # Filter records
                    for record_dict in file_records:
                        # Filter by time
                        if start_time is not None and record_dict['timestamp'] < start_time:
                            continue
                        if end_time is not None and record_dict['timestamp'] > end_time:
                            continue

                        # Filter by record type
                        if record_type is not None and record_dict['record_type'] != record_type.value:
                            continue

                        # Filter by robot ID
                        if robot_id is not None and record_dict['robot_id'] != robot_id:
                            continue

                        # Filter by priority
                        if priority is not None and record_dict['priority'] != priority.value:
                            continue

                        records.append(DataRecord.from_dict(record_dict))

                # Sort by timestamp, newest first
                records.sort(key=lambda x: x.timestamp, reverse=True)

                # Apply offset and limit
                records = records[offset:offset + limit]

            except Exception as e:
                logger.error(f"Pickle query failed: {str(e)}")
            
            return records
    
    
    class DataStorageFactory:
        """Factory for data storage backends."""

        @staticmethod
        def create_storage(config: StorageConfig) -> Union[SQLiteStorage, JSONStorage, CSVStorage, PickleStorage]:
            """
            Create a storage instance.

            Args:
                config: Storage configuration

            Returns:
                Union[SQLiteStorage, JSONStorage, CSVStorage, PickleStorage]: Storage instance
            """
            os.makedirs(config.base_directory, exist_ok=True)

            if config.storage_format == StorageFormat.SQLITE:
                db_path = os.path.join(config.base_directory, f"{config.file_prefix}.db")
                return SQLiteStorage(db_path)
            elif config.storage_format == StorageFormat.JSON:
                return JSONStorage(config.base_directory, config.file_prefix)
            elif config.storage_format == StorageFormat.CSV:
                return CSVStorage(config.base_directory, config.file_prefix)
            elif config.storage_format == StorageFormat.PICKLE or config.storage_format == StorageFormat.COMPRESSED:
                # COMPRESSED is implemented as zlib-compressed pickle
                return PickleStorage(config.base_directory, config.file_prefix, 
                                   compression=config.storage_format == StorageFormat.COMPRESSED)
            else:
                raise ValueError(f"Unsupported storage format: {config.storage_format}")
    
    
    class AdvancedDataRecorder:
        """Advanced data recorder."""

        def __init__(self, 
                     storage_config: Optional[StorageConfig] = None,
                     recording_config: Optional[RecordingConfig] = None):
            """
            Initialize the advanced data recorder.

            Args:
                storage_config: Storage configuration
                recording_config: Recording configuration
            """
            self.storage_config = storage_config or StorageConfig()
            self.recording_config = recording_config or RecordingConfig()

            # Create the storage instance
            self.storage = DataStorageFactory.create_storage(self.storage_config)

            # Initialize the buffer; a full deque discards its oldest entry on append
            self.buffer = deque(maxlen=self.recording_config.buffer_size)
            self.buffer_lock = threading.RLock()

            # State flags
            self.running = False
            self.recording = self.recording_config.enabled

            # Recording statistics
            self.stats = {
                'total_records': 0,
                'records_stored': 0,
                'records_dropped': 0,
                'last_flush_time': time.time(),
                'last_rotation_time': time.time()
            }

            # Last sampling time per record type (used to enforce sampling rates)
            self.last_sampling_time = {}

            # Start the background thread
            self.flush_thread = None
            if self.recording_config.flush_interval_seconds > 0:
                self._start_flush_thread()
        
        def _start_flush_thread(self):
            """Start the flush thread."""
            self.running = True
            self.flush_thread = threading.Thread(target=self._flush_loop, daemon=True)
            self.flush_thread.start()
            logger.info("Data recorder flush thread started")
        
        def _flush_loop(self):
            """Flush loop run by the background thread."""
            while self.running:
                try:
                    # Check whether a flush is due
                    current_time = time.time()
                    if current_time - self.stats['last_flush_time'] >= self.recording_config.flush_interval_seconds:
                        self.flush()
                    
                    # Check whether the storage needs rotating
                    self._check_rotation()
                    
                    # Check whether a backup is due
                    self._check_backup()
                    
                except Exception as e:
                    logger.error(f"Flush thread error: {str(e)}")
                
                time.sleep(0.1)
        
        def start_recording(self):
            """
            Start recording.
            """
            self.recording = True
            logger.info("Data recording started")
        
        def stop_recording(self):
            """
            Stop recording.
            """
            self.recording = False
            logger.info("Data recording stopped")
        
        def toggle_recording(self) -> bool:
            """
            Toggle the recording state.
            
            Returns:
                bool: The state after toggling.
            """
            self.recording = not self.recording
            logger.info(f"Data recording {'started' if self.recording else 'stopped'}")
            return self.recording
        
        def record(self, 
                  record_type: DataRecordType,
                  data: Dict[str, Any],
                  robot_id: Optional[str] = None,
                  priority: Optional[RecordPriority] = None,
                  metadata: Optional[Dict[str, Any]] = None) -> bool:
            """
            Record a data entry.
            
            Args:
                record_type: Record type.
                data: Record payload.
                robot_id: Robot ID.
                priority: Priority.
                metadata: Metadata.
                
            Returns:
                bool: True if the record was buffered, False if it was skipped or dropped.
            """
            # Check whether recording is enabled
            if not self.recording:
                return False
            
            # Check whether this record type is enabled
            if record_type not in self.recording_config.enabled_record_types:
                return False
            
            # Check the priority (defaults to MEDIUM)
            if priority is None:
                priority = RecordPriority.MEDIUM
            
            if priority.value < self.recording_config.min_priority.value:
                return False
            
            # Enforce the per-type sampling rate (a rate of 0 disables throttling)
            current_time = time.time()
            sampling_rate = self.recording_config.sampling_rate_hz.get(record_type, 0.0)
            
            if sampling_rate > 0:
                if record_type not in self.last_sampling_time:
                    self.last_sampling_time[record_type] = current_time
                else:
                    elapsed = current_time - self.last_sampling_time[record_type]
                    if elapsed < 1.0 / sampling_rate:
                        return False
                    self.last_sampling_time[record_type] = current_time
            
            # Create the record
            record = DataRecord(
                record_id=f"{record_type.value}_{int(current_time * 1000)}",
                robot_id=robot_id,
                record_type=record_type,
                priority=priority,
                metadata=metadata or {},
                data=data
            )
            
            # Apply the custom filter registered for this record type, if any
            filter_name = f"{record_type.value}_filter"
            if filter_name in self.recording_config.custom_filters:
                if not self.recording_config.custom_filters[filter_name](record):
                    return False
            
            # Append to the buffer; when it is full the new record is dropped
            with self.buffer_lock:
                if len(self.buffer) >= self.buffer.maxlen:
                    self.stats['records_dropped'] += 1
                    return False
                
                self.buffer.append(record)
                self.stats['total_records'] += 1
            
            return True
        
        def record_robot_state(self, 
                              robot_id: str,
                              state_data: Dict[str, Any],
                              priority: RecordPriority = RecordPriority.MEDIUM) -> bool:
            """
            Record robot state.
            
            Args:
                robot_id: Robot ID.
                state_data: State data.
                priority: Priority.
                
            Returns:
                bool: True on success.
            """
            return self.record(
                record_type=DataRecordType.ROBOT_STATE,
                robot_id=robot_id,
                data=state_data,
                priority=priority
            )
        
        def record_joint_data(self, 
                             robot_id: str,
                             joint_data: Dict[str, Any],
                             priority: RecordPriority = RecordPriority.MEDIUM) -> bool:
            """
            Record joint data.
            
            Args:
                robot_id: Robot ID.
                joint_data: Joint data.
                priority: Priority.
                
            Returns:
                bool: True on success.
            """
            return self.record(
                record_type=DataRecordType.JOINT_DATA,
                robot_id=robot_id,
                data=joint_data,
                priority=priority
            )
        
        def record_tcp_data(self, 
                           robot_id: str,
                           tcp_data: Dict[str, Any],
                           priority: RecordPriority = RecordPriority.MEDIUM) -> bool:
            """
            Record TCP data.
            
            Args:
                robot_id: Robot ID.
                tcp_data: TCP data.
                priority: Priority.
                
            Returns:
                bool: True on success.
            """
            return self.record(
                record_type=DataRecordType.TCP_DATA,
                robot_id=robot_id,
                data=tcp_data,
                priority=priority
            )
        
        def record_trajectory(self, 
                             robot_id: str,
                             trajectory_data: Dict[str, Any],
                             priority: RecordPriority = RecordPriority.MEDIUM) -> bool:
            """
            Record trajectory data.
            
            Args:
  • The AdvancedDataRecorder.start_recording() method that the MCP handler delegates to. As excerpted here, it takes no arguments and only sets self.recording = True, which is the flag record() checks before accepting data.
    def start_recording(self):
        """
        Start recording.
        """
        self.recording = True
        logger.info("Data recording started")
  • Initialization of the advanced_data_recorder instance at server startup. It is called from initialize_extended_modules(), which creates an AdvancedDataRecorder() and assigns it to the global variable used by the tool handler.
    # Initialize the advanced data recorder
    advanced_data_recorder = AdvancedDataRecorder()
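
The recording flag only has an effect because record() checks it first and then applies a per-type sampling interval. A minimal standalone sketch of that gating follows. It assumes a simplified `MiniRecorder` class with plain string record types and an injectable clock, all hypothetical and introduced only to make the example deterministic; it is not the server's code.

```python
import time
from typing import Callable, Dict, List, Tuple


class MiniRecorder:
    """Hypothetical, simplified stand-in for AdvancedDataRecorder (illustration only)."""

    def __init__(self, sampling_rate_hz: Dict[str, float],
                 clock: Callable[[], float] = time.time):
        self.recording = False
        self.sampling_rate_hz = sampling_rate_hz
        self.clock = clock  # injectable so the example does not depend on wall time
        self.last_sampling_time: Dict[str, float] = {}
        self.accepted: List[Tuple[str, dict]] = []

    def start_recording(self) -> None:
        self.recording = True

    def record(self, record_type: str, data: dict) -> bool:
        # Gate 1: nothing is accepted until recording has been started
        if not self.recording:
            return False
        # Gate 2: enforce a minimum interval of 1/rate seconds per record type
        now = self.clock()
        rate = self.sampling_rate_hz.get(record_type, 0.0)
        if rate > 0:
            last = self.last_sampling_time.get(record_type)
            if last is not None and now - last < 1.0 / rate:
                return False
            self.last_sampling_time[record_type] = now
        self.accepted.append((record_type, data))
        return True


fake_now = [0.0]
recorder = MiniRecorder({"joint_data": 10.0}, clock=lambda: fake_now[0])

before_start = recorder.record("joint_data", {"q": [0.0] * 6})  # rejected: not started
recorder.start_recording()
first = recorder.record("joint_data", {"q": [0.0] * 6})         # accepted
fake_now[0] = 0.05
too_soon = recorder.record("joint_data", {"q": [0.1] * 6})      # rejected: under 0.1 s
fake_now[0] = 0.2
on_time = recorder.record("joint_data", {"q": [0.2] * 6})       # accepted
```

Note that in this sketch, as in the excerpt above, the throttle is keyed by record type alone, so two robots reporting the same type would share one sampling interval.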
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden. It lists the parameters and notes that a duration of 0 records continuously until stopped, but it does not disclose other behavioral traits, such as how the session ID is used or whether concurrent recordings are allowed. This is minimal transparency.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.
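
The duration parameter is one place where such consequences matter: the description says 0 means recording continues until stopped. The source excerpted on this page does not show how the server enforces a duration, so the following is only a sketch of one possible interpretation, using a hypothetical `TimedRecording` helper built on the standard library's `threading.Timer`.

```python
import threading
from typing import Optional


class TimedRecording:
    """Hypothetical helper; not part of the server's code."""

    def __init__(self) -> None:
        self.recording = False
        self._timer: Optional[threading.Timer] = None

    def start(self, duration: float = 0) -> None:
        # Assumption: negative durations are rejected; the description does not say
        if duration < 0:
            raise ValueError("duration must be >= 0")
        self.stop()  # cancel any timer left over from an earlier start
        self.recording = True
        if duration > 0:
            # Stop automatically once the requested number of seconds has elapsed
            self._timer = threading.Timer(duration, self._expire)
            self._timer.daemon = True
            self._timer.start()
        # duration == 0: no timer, recording continues until stop() is called

    def _expire(self) -> None:
        self.recording = False

    def stop(self) -> None:
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        self.recording = False
```

Under this reading, a caller that passes 0 must eventually call the stop counterpart, otherwise the recorder keeps buffering data indefinitely.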

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is extremely concise: a header line followed by a bulleted list of parameters and return. Every sentence is necessary, and the structure is clear and front-loaded.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 3/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's complexity (3 parameters, no output schema, no annotations), the description covers the basics but lacks important context such as how to use the returned session ID (e.g., to stop recording), prerequisites (e.g., robot must be connected), or potential side effects. It is adequate but not complete.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 4/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

With 0% schema description coverage, the description adds significant value by explaining each parameter: robot_id (robot ID), record_types (list of possible values like robot_state, joint_data, tcp_data, error_data), and duration (seconds, 0 for continuous). This goes beyond the schema's minimal information.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.
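
As an illustration of the constraints the description conveys, a caller could check its arguments before invoking the tool. The allowed strings below are taken from the description. The `validate_recording_args` helper, the example robot ID `"ur5_1"`, and the rules that reject an empty list or a negative duration are assumptions made for this sketch, not part of the server.

```python
from typing import Any, Dict, List

# Values listed in the tool description
ALLOWED_RECORD_TYPES = ("robot_state", "joint_data", "tcp_data", "error_data")


def validate_recording_args(robot_id: str, record_types: List[str],
                            duration: float = 0) -> Dict[str, Any]:
    """Hypothetical client-side check; returns the arguments ready to send."""
    if not robot_id:
        raise ValueError("robot_id must be a non-empty string")
    # Assumption: an empty list is treated as a caller error
    if not record_types:
        raise ValueError("record_types must contain at least one entry")
    unknown = [t for t in record_types if t not in ALLOWED_RECORD_TYPES]
    if unknown:
        raise ValueError(f"unsupported record types: {unknown}")
    # Assumption: negative durations are invalid; 0 means record until stopped
    if duration < 0:
        raise ValueError("duration must be >= 0")
    return {
        "robot_id": robot_id,
        "record_types": list(record_types),
        "duration": duration,
    }


# "ur5_1" is a made-up robot ID used only for this example
args = validate_recording_args("ur5_1", ["joint_data", "tcp_data"], duration=30)
```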

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states 'Start recording robot data' with a specific verb ('start') and resource ('data recording'). The presence of a sibling tool 'stop_data_recording' further clarifies its purpose as the start counterpart.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 3/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description implies usage in conjunction with 'stop_data_recording' but does not explicitly state when to use this tool versus alternatives. No guidance on prerequisites or context is provided.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nonead/Nonead-Universal-Robots-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.