跳转至

模块:多进程同步与数据共享


类定义:DataBuffer

用于在多进程环境中共享不同组件采集的数据。

class DataBuffer:
    '''
    一个用于共享存储不同组件采集的数据的信息的类
    输入:
    manager: 创建的一个独立的控制器, multiprocessing::Manager
    '''
    def __init__(self, manager):
        self.manager = manager
        self.buffer = manager.dict()

    def collect(self, name, data):
        if name not in self.buffer:
            self.buffer[name] = self.manager.list()
        self.buffer[name].append(data)

    def get(self):
        return dict(self.buffer)

成员属性

属性名 类型 说明
manager multiprocessing.Manager 控制共享数据结构的创建与同步
buffer Dict[str, List[Any]] 存储按组件名分类的数据列表

函数一览

__init__(self, manager)

  • 功能:初始化共享数据容器。
  • 参数
  • manager (multiprocessing.Manager): 多进程共享管理器。

collect(self, name, data)

  • 功能:收集一条数据并按组件名分类存入缓冲区。
  • 参数
  • name (str): 组件名称。
  • data (Any): 要存储的数据。

get(self)

  • 功能:获取所有已存储的数据副本(转为常规字典)。
  • 返回值Dict[str, List[Any]] 数据缓冲字典。

函数定义:ComponentWorker

组件级别的多进程同步器,用于执行独立子进程的组件采集逻辑。

def ComponentWorker(component_class, component_name, component_setup_input,
                    component_collect_info, data_buffer: DataBuffer,
                    time_lock: Semaphore, start_event: Event,
                    finish_event: Event, process_name: str):
    '''
    组件级别的多进程同步器, 用于多进程数据采集, 如果希望是多进程的同步控制也可以稍微改下代码添加一个共享的信号输入
    输入:
    component_class: 你的组件类, 可以自定义的是controller / sensor, class
    component_name: 你希望组件的名称, 用于对应组件info的输出, str
    component_setup_input: 组件初始化需要设置的信息, List[Any]
    component_collect_info: 组件采集的数据种类, List[str]
    data_buffer: 初始化一个同步所有组件的内存空间, DataBuffer
    time_lock: 初始化对于当前组件的时间同步锁, 该锁需要分配给time_scheduler用于控制时间, multiprocessing::Semaphore
    start_event: 同步开始事件, 所有的组件共用一个, multiprocessing::Event
    finish_event: 同步结束事件, 所有的组件共用一个, multiprocessing::Event
    process_name:你希望当前进程叫什么, 用于对应子进程info的输出, str
    '''

参数说明

参数名 类型 说明
component_class Type 自定义组件类(如 TestArmController, TestVisionSensor
component_name str 组件名称(用于调试和分类数据)
component_setup_input Optional[List[Any]] 组件初始化参数,如为 None 则使用默认构造函数
component_collect_info List[str] 该组件要采集的数据类型(如 ["color"], ["joint", "qpos"]
data_buffer DataBuffer 数据共享对象,跨进程存储采集结果
time_lock Semaphore 控制时间步的锁,与时间调度器配合控制采集频率
start_event Event 由主线程触发的全局“开始”事件
finish_event Event 全局“结束”事件,触发后各子进程结束任务
process_name str 当前进程的标识名(用于 debug 日志输出)

示例:主函数使用方式(简略)

manager = Manager()
data_buffer = DataBuffer(manager)
time_lock = Semaphore(0)
start_event = Event()
finish_event = Event()

vision_proc = Process(target=ComponentWorker, args=(
    TestVisionSensor, "vision", None, ["color"],
    data_buffer, time_lock, start_event, finish_event, "vision_proc"
))
vision_proc.start()

实现自己的组件类(Controller 或 Sensor)

组件类需要实现以下接口:

必要函数

set_up(*args)

  • 初始化组件,接收任意参数。
  • 无返回值。

set_collect_info(info: List[str])

  • 告知组件采集哪些类型的数据。

get() -> Dict[str, Any]

  • 获取当前采集结果,作为一个字典返回。

如需整理更多类或模块,随时贴上来我继续按此格式编写。