目次
パブリッシャー(Publisher)とは
ROS 2の通信モデルの中で、特定の「トピック」に対してメッセージを送信する役割を持つノードのこと。
パブリッシャーの役割
ROS 2のノード同士の通信は、トピック(Topic)という名前付きのデータストリームを通じて行われる。
パブリッシャーは、そのトピックにメッセージを送信し、他のノード(サブスクライバーと呼ばれる)が
そのメッセージを受信します。
例えば、センサーからのデータ(温度やカメラ画像など)を送るノードはパブリッシャーとして機能します。
一方、そのデータを処理するノードはサブスクライバーとしてデータを受信します。
パブリッシャーの仕組み
- パブリッシャーを作成:ノード内で
create_publisher()
関数を使って、メッセージの型とトピック名を
指定してパブリッシャーを作成する。 - メッセージを送信:パブリッシャーが生成された後、
publish()
関数を使って定期的にメッセージを
トピックに送信する。 - サブスクライバーが受け取る: 他のノードがそのトピックにサブスクライブしている場合、
そのメッセージを受信して処理を行う。
Publisherのコード
TrialPublisherというクラスがパブリッシャーの役割を持ち、trial_topicというトピックにStringメッセージを
毎秒送信するコード。
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class TrialPublisher(Node):
def __init__(self):
super().__init__('trial_publisher')
self.publisher_ = self.create_publisher(String, 'trial_topic', 10)
timer_period = 1
self.timere = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = String()
msg.data = 'Hello! %d' % self.i
self.publisher_.publish(msg)
self.get_logger().info('Publishing, "%s"' % msg.data)
self.i += 1
def main(args=None):
try:
rclpy.init(args=args)
trial_publisher = TrialPublisher()
rclpy.spin(trial_publisher)
except KeyboardInterrupt:
pass
finally:
trial_publisher.destroy_node()
rclpy.shutdown()
if __name__== '__main__':
main()
必要なモジュールのインポート
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
- rclpy:ROS 2のPythonクライアントライブラリ。
これを使ってROS 2ノードや通信機能を扱う。 - Node:rclpyのNodeクラスを使うことで、ROS 2でのノード(通信の単位)を作成する。
ROS 2のノードの基本クラス。すべてのROS 2ノードはこのクラスを継承します。 - String:std_msgsパッケージからのメッセージ型で、文字列を扱うメッセージ。
標準メッセージの一つ。文字列データ型のメッセージを送受信する際に使用する。
ノードの定義
class TrialPublisher(Node):
def __init__(self):
super().__init__('trial_publisher')
self.publisher_ = self.create_publisher(String, 'trial_topic', 10)
timer_period = 1
self.timere = self.create_timer(timer_period, self.timer_callback)
self.i = 0
- TrialPublisherクラス:このクラスはNodeクラスを継承し、パブリッシャー機能を持つノードとして定義。
- super().__init__(‘trial_publisher’):親クラスNodeのコンストラクタを呼び出し、
このノードの名前をtrial_publisher
に設定。 - self.publisher_ = self.create_publisher(String, ‘trial_topic’, 10):文字列型のメッセージをtrial_topicという
トピックにパブリッシュするためのパブリッシャーを作成する。
10はキューのサイズを意味する。- self.publisher:selfはクラスのインスタンスを指す。self.pubはこのインスタンス変数に
パブリッシャーオブジェクトを格納している。これにより、クラスの他のメソッドからもアクセスできるようになる。 - self.create_publisher():Nodeクラス(今回のクラスTrialPublisherはこれを継承)のメソッドで、
このメソッドを使って、特定のメッセージ型とトピック名を持つパブリッシャーを作成している。
- self.publisher:selfはクラスのインスタンスを指す。self.pubはこのインスタンス変数に
- self.timere =self.create_timer(1, self.timer_callback):
タイマーを1秒ごとに実行し、そのたびにself.timer_callbackを呼び出す。- self.create_timer():Nodeクラスのメソッド。
指定した周期で特定のコールバック関数を呼び出すタイマーを作成する。
- self.create_timer():Nodeクラスのメソッド。
- self.i = 0: 変数iはメッセージ内のカウント。
タイマーコールバック
このメソッドは、1秒ごとにタイマーがトリガーされるたびに呼ばれる。
def timer_callback(self):
msg = String()
msg.data = 'Hello! %d' % self.i
self.publisher_.publish(msg)
self.get_logger().info('Publishing, "%s"' % msg.data)
self.i += 1
- msg = String():メッセージオブジェクトを作成。
- msg.data = ‘Hello! %d’ % self.i:msg.dataにHello! %dという形式で文字列を格納します。
この%dはself.iの値(メッセージごとにカウントアップ)に置き換えられる。 - self.publisher_.publish(msg):でメッセージを
trial_topic
にパブリッシュする。 - self.get_logger().info(‘Publishing, “%s”‘ % msg.data):ROS 2のログ機能を使って、
コンソールにパブリッシュしたメッセージ内容を出力する。 - self.i += 1:メッセージのカウントが増えるようにしている。
メイン関数
def main(args=None):
try:
rclpy.init(args=args)
trial_publisher = TrialPublisher()
rclpy.spin(trial_publisher)
except KeyboardInterrupt:
pass
finally:
trial_publisher.destroy_node()
rclpy.shotdown()
- rclpy.init(args=args):ROS 2の初期化処理を行う。※ここで、ROS 2ノードが起動できる準備が整う。
- trial_publisher = TrialPublisher():TrialPublisherノードをインスタンス化。
- rclpy.spin(trial_publisher):この関数は、ノードが終了するまでノードを実行し続ける。
(たとえば、Ctrl+C
が押されるまで) - except KeyboardInterrupt:
Ctrl+C
でプログラムを中断した際に発生する例外をキャッチし、
プログラムが安全に終了できるようにする。 - trial_publisher.destroy_node():ノードのクリーンアップを行う。
- rclpy.shutdown():ROS 2を安全にシャットダウンする。
エントリーポイント
if __name__== '__main__':
main()
- スクリプトが直接実行された場合にmain()関数を呼び出すためのエントリーポイント.
ChatGPTによるパブリッシャーの最小コード
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class TrialPublisher(Node):
def __init__(self):
super().__init__('trial_publisher')
self.pub = self.create_publisher(String, 'trial_topic', 10)
self.create_timer(1, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = String(data=f'Hello! {self.i}')
self.pub.publish(msg)
self.get_logger().info(f'Publishing: "{msg.data}"')
self.i += 1
def main():
rclpy.init()
rclpy.spin(TrialPublisher())
rclpy.shutdown()
if __name__ == '__main__':
main()
簡潔なポイント
- タイマー、パブリッシャー、カウンターを直接初期化して処理をまとめました。
try-except-finally
ブロックは不要なので省略。- メッセージの生成と
data
の設定を簡潔に表現しました(f-string
を使用)。 - ノードの生成とスピン処理を1行に統合しました。
このコードは、元のコードと同じ機能を持ちながら、シンプルに記述されています。
コメント