【ROS2】Publisher

目次

パブリッシャー(Publisher)とは

ROS 2の通信モデルの中で、特定の「トピック」に対してメッセージを送信する役割を持つノードのこと。

パブリッシャーの役割

ROS 2のノード同士の通信は、トピック(Topic)という名前付きのデータストリームを通じて行われる。
パブリッシャーは、そのトピックにメッセージを送信し、他のノード(サブスクライバーと呼ばれる)が
そのメッセージを受信します。
例えば、センサーからのデータ(温度やカメラ画像など)を送るノードはパブリッシャーとして機能します。
一方、そのデータを処理するノードはサブスクライバーとしてデータを受信します。

パブリッシャーの仕組み

  1. パブリッシャーを作成:ノード内でcreate_publisher()関数を使って、メッセージの型とトピック名を
    指定してパブリッシャーを作成する。
  2. メッセージを送信:パブリッシャーが生成された後、publish()関数を使って定期的にメッセージを
    トピックに送信する。
  3. サブスクライバーが受け取る: 他のノードがそのトピックにサブスクライブしている場合、
    そのメッセージを受信して処理を行う。

Publisherのコード

TrialPublisherというクラスがパブリッシャーの役割を持ち、trial_topicというトピックにStringメッセージを
毎秒送信するコード。

import rclpy

from rclpy.node import Node
from std_msgs.msg import String

class TrialPublisher(Node):
    def __init__(self):
        super().__init__('trial_publisher')
        self.publisher_ = self.create_publisher(String, 'trial_topic', 10)
        timer_period = 1
        self.timere = self.create_timer(timer_period, self.timer_callback)
        self.i = 0

    def timer_callback(self):
        msg = String()
        msg.data = 'Hello! %d' % self.i
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing, "%s"' % msg.data)
        self.i += 1

def main(args=None):
    try:
        rclpy.init(args=args)
        trial_publisher = TrialPublisher()
        rclpy.spin(trial_publisher)

    except KeyboardInterrupt:
        pass

    finally:
        trial_publisher.destroy_node()
        rclpy.shutdown()

if __name__== '__main__':
    main()

必要なモジュールのインポート

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
  • rclpy:ROS 2のPythonクライアントライブラリ。
    これを使ってROS 2ノードや通信機能を扱う。
  • Node:rclpyのNodeクラスを使うことで、ROS 2でのノード(通信の単位)を作成する。
    ROS 2のノードの基本クラス。すべてのROS 2ノードはこのクラスを継承します。
  • String:std_msgsパッケージからのメッセージ型で、文字列を扱うメッセージ。
    標準メッセージの一つ。文字列データ型のメッセージを送受信する際に使用する。

ノードの定義

class TrialPublisher(Node):
    def __init__(self):
        super().__init__('trial_publisher')
        self.publisher_ = self.create_publisher(String, 'trial_topic', 10)
        timer_period = 1
        self.timere = self.create_timer(timer_period, self.timer_callback)
        self.i = 0
  • TrialPublisherクラス:このクラスはNodeクラスを継承し、パブリッシャー機能を持つノードとして定義。
  • super().__init__(‘trial_publisher’):親クラスNodeのコンストラクタを呼び出し、
    このノードの名前をtrial_publisherに設定。
  • self.publisher_ = self.create_publisher(String, ‘trial_topic’, 10):文字列型のメッセージをtrial_topicという
    トピックにパブリッシュするためのパブリッシャーを作成する。
    10はキューのサイズを意味する。
    • self.publisherselfはクラスのインスタンスを指す。self.pubはこのインスタンス変数に
      パブリッシャーオブジェクトを格納している。これにより、クラスの他のメソッドからもアクセスできるようになる。
    • self.create_publisher():Nodeクラス(今回のクラスTrialPublisherはこれを継承)のメソッドで、
      このメソッドを使って、特定のメッセージ型とトピック名を持つパブリッシャーを作成している。
  • self.timere =self.create_timer(1, self.timer_callback)
    タイマーを1秒ごとに実行し、そのたびにself.timer_callbackを呼び出す。
    • self.create_timer():Nodeクラスのメソッド。
      指定した周期で特定のコールバック関数を呼び出すタイマーを作成する。
  • self.i = 0: 変数iはメッセージ内のカウント。

タイマーコールバック

このメソッドは、1秒ごとにタイマーがトリガーされるたびに呼ばれる。

def timer_callback(self):
    msg = String()
    msg.data = 'Hello! %d' % self.i
    self.publisher_.publish(msg)
    self.get_logger().info('Publishing, "%s"' % msg.data)
    self.i += 1
  • msg = String():メッセージオブジェクトを作成。
  • msg.data = ‘Hello! %d’ % self.i:msg.dataにHello! %dという形式で文字列を格納します。
    この%dはself.iの値(メッセージごとにカウントアップ)に置き換えられる。
  • self.publisher_.publish(msg):でメッセージをtrial_topicにパブリッシュする。
  • self.get_logger().info(‘Publishing, “%s”‘ % msg.data):ROS 2のログ機能を使って、
    コンソールにパブリッシュしたメッセージ内容を出力する。
  • self.i += 1:メッセージのカウントが増えるようにしている。

メイン関数

def main(args=None):
    try:
        rclpy.init(args=args)
        trial_publisher = TrialPublisher()
        rclpy.spin(trial_publisher)
    except KeyboardInterrupt:
        pass
    finally:
        trial_publisher.destroy_node()
        rclpy.shotdown()
  • rclpy.init(args=args):ROS 2の初期化処理を行う。※ここで、ROS 2ノードが起動できる準備が整う。
  • trial_publisher = TrialPublisher():TrialPublisherノードをインスタンス化。
  • rclpy.spin(trial_publisher):この関数は、ノードが終了するまでノードを実行し続ける。
    (たとえば、Ctrl+Cが押されるまで)
  • except KeyboardInterruptCtrl+Cでプログラムを中断した際に発生する例外をキャッチし、
    プログラムが安全に終了できるようにする。
  • trial_publisher.destroy_node():ノードのクリーンアップを行う。
  • rclpy.shutdown():ROS 2を安全にシャットダウンする。

エントリーポイント

if __name__== '__main__':
    main()
  • スクリプトが直接実行された場合にmain()関数を呼び出すためのエントリーポイント.

ChatGPTによるパブリッシャーの最小コード

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class TrialPublisher(Node):
    def __init__(self):
        super().__init__('trial_publisher')
        self.pub = self.create_publisher(String, 'trial_topic', 10)
        self.create_timer(1, self.timer_callback)
        self.i = 0

    def timer_callback(self):
        msg = String(data=f'Hello! {self.i}')
        self.pub.publish(msg)
        self.get_logger().info(f'Publishing: "{msg.data}"')
        self.i += 1

def main():
    rclpy.init()
    rclpy.spin(TrialPublisher())
    rclpy.shutdown()

if __name__ == '__main__':
    main()

簡潔なポイント

  • タイマー、パブリッシャー、カウンターを直接初期化して処理をまとめました。
  • try-except-finallyブロックは不要なので省略。
  • メッセージの生成とdataの設定を簡潔に表現しました(f-stringを使用)。
  • ノードの生成とスピン処理を1行に統合しました。

このコードは、元のコードと同じ機能を持ちながら、シンプルに記述されています。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

コメント

コメントする

目次