跳转至

类定义文档:ROS2Publisher


类介绍

class ROS2Publisher(Node):

一个基于 ROS 2 的通用发布器类,支持一次性发布和周期性持续发布。适用于实时控制命令发送,如控制机器人运动等。


类继承

类名 继承自
ROS2Publisher rclpy.node.Node

类定义代码块

class ROS2Publisher(Node):
    def __init__(self, topic_name, msg_type, continuous=True):
        ...

成员属性说明

属性名 类型 说明
topic_name str 发布的 ROS2 topic 名称
msg_type Type ROS2 消息类型,如 TwistPoseStamped
continuous bool 是否持续发布消息(默认为 True)
publisher Publisher ROS2 发布器对象
pub_msg Any 当前待发布的消息(会被周期性或一次性发布)
shutdown_event threading.Event 控制持续发布线程是否终止

函数接口

__init__

def __init__(self, topic_name, msg_type, continuous=True):

功能描述:初始化发布器节点,创建发布器对象。

参数

参数名 类型 说明
topic_name str 发布的 ROS2 话题名
msg_type Type 发布的消息类型(如 TwistPoseStamped
continuous bool 是否持续发布消息,默认为 True(每 10ms 发布一次)

返回值:无


publish_once

def publish_once(self):

功能描述:立即将当前 pub_msg 发布一次。

参数:无
返回值:无


continuous_publish

def continuous_publish(self, interval_sec=0.01):

功能描述:以固定时间间隔持续发布 pub_msg

参数

参数名 类型 说明
interval_sec float 发布时间间隔,单位为秒,默认 0.01 秒

返回值:无


update_msg

def update_msg(self, msg):

功能描述:更新待发布的消息内容。

参数

参数名 类型 说明
msg Any ROS2 消息对象

返回值:无


stop

def stop(self):

功能描述:发送停止信号终止持续发布线程,并输出日志。

参数:无
返回值:无


示例主程序

def main():
    rclpy.init()
    pub_node = ROS2Publisher('/cmd_vel', Twist, continuous=True)

    msg = Twist()
    msg.linear.x = 0.02
    pub_node.update_msg(msg)

    pub_thread = Thread(target=pub_node.continuous_publish)
    pub_thread.start()

    try:
        rclpy.spin_once(pub_node, timeout_sec=0.1)
        time.sleep(1)

        msg.linear.x = 0.0
        pub_node.update_msg(msg)
        time.sleep(1)

        msg.linear.x = -0.02
        pub_node.update_msg(msg)
        time.sleep(1)

        msg.linear.x = 0.0
        pub_node.update_msg(msg)
        time.sleep(1)

    except KeyboardInterrupt:
        pass
    finally:
        pub_node.stop()
        pub_thread.join()
        pub_node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

实现自己的子类(可选)

若需要在发布前添加日志、消息校验等,可继承本类并重写 update_msg()publish_once() 函数:

class MyPublisher(ROS2Publisher):
    def update_msg(self, msg):
        print("准备发布:", msg)
        super().update_msg(msg)

多线程注意事项

  • 发布器默认启动线程持续发布,使用 threading.Event 控制退出。
  • 在主线程中仍可使用 rclpy.spin_once() 来处理 ROS2 回调(如日志或参数服务)。