目次
Subscriverのコード
ROS 2で、パブリッシャーからのメッセージを購読するサブスクライバーを作成するPythonプログラムの覚え書き。
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class TrialSubscriber(Node):
def __init__(self):
super().__init__('trial_subscriber')
self.subscription = self.create_subscription(
String,
'trial_topic',
self.listener_callback,
10)
self.subscription #不要かも
def listener_callback(self, msg):
self.get_logger().info('Subscribed, "%s"' % msg.data)
def main(args=None):
try:
rclpy.init(args=args)
trial_subscriber = TrialSubscriber()
rclpy.spin(trial_subscriber)
except KeyboardInterrupt:
pass
finally:
trial_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
必要なモジュールのインポート
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
- rclpy:ROS 2のPythonクライアントライブラリ。
これを使ってROS 2ノードや通信機能を扱う。 - Node:rclpyのNodeクラスを使うことで、ROS 2でのノード(通信の単位)を作成する。
- String:
std_msgs
パッケージからのメッセージ型で、文字列を扱うメッセージ。
TrialSubscriberクラスの定義
トピックのメッセージを購読できるようにする。
class TrialSubscriber(Node):
def __init__(self):
super().__init__('trial_subscriber')
self.subscription = self.create_subscription(
String,
'trial_topic',
self.listener_callback,
10)
self.subscription #不要かも
- TrialSubscriber クラス:
Node
クラスを継承して作成されている。
このクラスは、サブスクライバー(購読者)ノードを表す。 - super().__init__(‘trial_subscriber’):親クラスである
Node
のコンストラクタを呼び出し、
ノード名を'trial_subscriber'
として初期化する。 - self.create_subscription(…):サブスクリプション(購読)の設定を行う。
- String:購読するメッセージの型。
- ‘trial_topic’:購読するトピックの名前。
- self.listener_callback:メッセージが受信されたときに呼び出されるコールバック関数。
メッセージが受信されるたびにこのメソッドが呼び出される。 - 10:キューのサイズ(バッファサイズ)を指定する。
コールバック関数の定義
メッセージを受信時、listener_callback
関数が呼ばれ、メッセージの内容がログに出力される。
def listener_callback(self, msg):
self.get_logger().info('Subscribed, "%s"' % msg.data)
- listener_callback 関数:メッセージを受信したときに実行される関数。
- msg:ROS2のサブスクライバーがトピックからメッセージを受信したときに、そのメッセージデータが格納されている。(自動的に
listener_callback
メソッドに渡される。)
具体的には、次の流れでmsg
が渡される。
self.subscription = self.create_subscription(
String,
'trial_topic',
self.listener_callback,
10)
- サブスクライバーの作成:
- ここで、
self.create_subscription
メソッドが呼び出され、サブスクライバーが作成される。
このメソッドには、購読するトピック('trial_topic'
)、メッセージの型(String
)、
およびメッセージを受信したときに呼び出すコールバック関数(self.listener_callback
)が指定されている。
- ここで、
- メッセージの受信:
'trial_topic'
というトピックに新しいメッセージがパブリッシュされると、
サブスクライバーはそのメッセージを受信する。
- コールバック関数の呼び出し:
- メッセージが受信されると、
listener_callback
が呼び出される。
この際、受信したメッセージが引数msg
としてlistener_callback
に渡される。
- メッセージが受信されると、
msg
の内容:msg
は、受信したメッセージのデータを持つオブジェクト。
メッセージの型がString
であるため、msg.data
には受信した文字列データが格納されている。
- self.get_logger().info(…):ログにメッセージを出力する。
msg.data
には受信したメッセージのデータが入っている。
メイン関数の定義
ROS 2のノードを初期化して、サブスクライバーを実行する。
def main(args=None):
try:
rclpy.init(args=args)
trial_subscriber = TrialSubscriber()
rclpy.spin(trial_subscriber)
except KeyboardInterrupt:
pass
finally:
trial_subscriber.destroy_node()
rclpy.shutdown()
- main 関数:プログラムのエントリーポイント。
- rclpy.init(args=args):ROS 2の通信を初期化する。
- ROS 2のPythonプログラムを開始する際に必ず最初に呼び出すべき関数。
- ROS 2ネットワーク内での通信やサービスのためのリソースがセットアップされる。
- この初期化処理を行わずにノードを作成したり、通信を行おうとするとエラーが発生する。
- 多くのROS 2アプリケーションでは、特定の引数が指定されない限り、
args=None
としてrclpy.init(args=None)
が呼び出される。これは、特に引数が必要ない場合のデフォルトの使い方。
- TrialSubscriber():先ほど定義した
TrialSubscriber
クラスのインスタンスを作成する。 - clpy.spin(trial_subscriber):ノードをスピン(実行)し、メッセージを受信するまで待機する。
- KeyboardInterrupt:キーボードの割り込み(通常はCtrl+C)をキャッチして、ノードを安全に終了する。
- trial_subscriber.destroy_node():ノードを破棄する。
- rclpy.shutdown():ROS 2をシャットダウンする。
スクリプトのエントリーポイント
if __name__ == '__main__':
main()
- if name == ‘main‘: :このスクリプトが直接実行された場合にのみ
main()
関数を実行するという
おまじない。
Nodeクラスとは
rclpy
の中核的なクラスの一つ。※rclpyはROS 2のPythonクライアントライブラリ
ROS 2のシステムでは、ノードと呼ばれる単位で各プログラムが動作し、相互に通信をする。
このNode
クラスを利用することで、ROS 2ノードとしての基本的な機能を備えたプログラムを作成することができる。
Nodeクラスの役割
- ノードの初期化:
Node
クラスは、ROS 2システムにおける各ノードを表現する。
これにより、ノードは名前を持ち、ROS 2ネットワーク上で他のノードと通信するための
エントリーポイントとなる。
- 通信インターフェースの提供:
Node
クラスを使って、他のノードと通信するためのパブリッシャーやサブスクライバー、
サービス、アクションなどを簡単に作成できます。
たとえば、サブスクライバーを作る場合、create_subscription()
メソッドを使用する。
- ロギング機能:
- ノード内でメッセージをログとして記録するためのロギング機能が提供されている。
self.get_logger()
メソッドを使うことで、簡単にログを出力することができる。
- ノード内でメッセージをログとして記録するためのロギング機能が提供されている。
- ライフサイクル管理:
- ノードの初期化やシャットダウン、リソースのクリーンアップなど、
ノードのライフサイクルを管理する機能がある。
これにより、ノードが予期せず停止しても、システム全体が安定して動作し続けるように設計できる。
- ノードの初期化やシャットダウン、リソースのクリーンアップなど、
Nodeクラスの使い方
Node
クラスを使用するためには、通常、このクラスを継承したサブクラスを作成します。
そのサブクラスでノードの名前を指定し、必要なパブリッシャーやサブスクライバーを定義します。
import rclpy
from rclpy.node import Node
class MyNode(Node):
def __init__(self):
super().__init__('my_node')
self.get_logger().info('Node has been started')
def main():
rclpy.init()
node = MyNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
この例では、MyNode
クラスがNode
クラスを継承しており、ノードが開始されると「Node has been started」というメッセージがログに出力されます。
ChatGPTによるサブスクライバーの最小コード
rclpy
の基本機能を使ってシンプルなサブスクライバーを作成。
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class SimpleSubscriber(Node):
def __init__(self):
super().__init__('simple_subscriber')
self.create_subscription(String, 'trial_topic', self.listener_callback, 10)
def listener_callback(self, msg):
self.get_logger().info('Received message: "%s"' % msg.data)
def main():
rclpy.init()
node = SimpleSubscriber()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
簡略化のポイント
self.subscription
の削除:self.subscription
の代入を省略。create_subscription
メソッドはオブジェクトを返すが、それを保存する必要がない場合、
そのまま無視できる。
- エラーハンドリングの省略:
try
ブロックによる例外処理を省略。
シンプルなサンプルコードとして、エラーハンドリングが不要な場合はできる。
将来的にサブスクライバーオブジェクトを操作する必要がある場合は、元のコードのような構造が有用。
コメント