What is the intended way of sending an object of type bytes
in a Python ROS2 node?
The best solution I have right now uses UInt8MultiArray
and a combination of list
to convert bytes
into a list of integers in the sender, and bytes
to convert the list of integers back to bytes
in the receiver.
Here is an example of code that uses UInt8MultiArray with list/bytes conversion:
<code>import rclpy
from rclpy.node import Node
from std_msgs.msg import UInt8MultiArray
class Test(Node):
def __init__(self):
super().__init__('minimal_pubsub')
self.publisher_ = self.create_publisher(UInt8MultiArray, 'topic', 10)
self.create_subscription(UInt8MultiArray, 'topic',self.listener_callback, 10)
self.timer = self.create_timer(1, self.timer_callback)
def timer_callback(self):
input_bytes = b'Bytes'
msg = UInt8MultiArray(data = list(input_bytes))
self.publisher_.publish(msg)
def listener_callback(self, msg):
output_bytes = bytes(msg.data)
print(output_bytes)
def main(args=None):
rclpy.init(args=args)
test= Test()
rclpy.spin(test)
test.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
</code>
<code>import rclpy
from rclpy.node import Node
from std_msgs.msg import UInt8MultiArray
class Test(Node):
def __init__(self):
super().__init__('minimal_pubsub')
self.publisher_ = self.create_publisher(UInt8MultiArray, 'topic', 10)
self.create_subscription(UInt8MultiArray, 'topic',self.listener_callback, 10)
self.timer = self.create_timer(1, self.timer_callback)
def timer_callback(self):
input_bytes = b'Bytes'
msg = UInt8MultiArray(data = list(input_bytes))
self.publisher_.publish(msg)
def listener_callback(self, msg):
output_bytes = bytes(msg.data)
print(output_bytes)
def main(args=None):
rclpy.init(args=args)
test= Test()
rclpy.spin(test)
test.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
</code>
import rclpy
from rclpy.node import Node
from std_msgs.msg import UInt8MultiArray
class Test(Node):
def __init__(self):
super().__init__('minimal_pubsub')
self.publisher_ = self.create_publisher(UInt8MultiArray, 'topic', 10)
self.create_subscription(UInt8MultiArray, 'topic',self.listener_callback, 10)
self.timer = self.create_timer(1, self.timer_callback)
def timer_callback(self):
input_bytes = b'Bytes'
msg = UInt8MultiArray(data = list(input_bytes))
self.publisher_.publish(msg)
def listener_callback(self, msg):
output_bytes = bytes(msg.data)
print(output_bytes)
def main(args=None):
rclpy.init(args=args)
test= Test()
rclpy.spin(test)
test.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
This works, but it does not seem very elegant.
I have not succeeded in creating a working and elegant solution with ByteMultiArray
. For example, the following code:
<code>...
input_bytes = b'Bytes'
msg = ByteMultiArray(data = input_bytes)
...
</code>
<code>...
input_bytes = b'Bytes'
msg = ByteMultiArray(data = input_bytes)
...
</code>
...
input_bytes = b'Bytes'
msg = ByteMultiArray(data = input_bytes)
...
gives an error.
<code>AssertionError: The 'data' field must be a set or sequence and each value of type 'bytes'
</code>
<code>AssertionError: The 'data' field must be a set or sequence and each value of type 'bytes'
</code>
AssertionError: The 'data' field must be a set or sequence and each value of type 'bytes'