ROS2 Ar Track & Turtlebot3 실물 구동
Ar Track
패키지 생성
ar_track 이라는 패키지명
pub_tb3_pose2d.py
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from turtlesim.msg import Pose
from nav_msgs.msg import Odometry
from math import radians, degrees, pi
from tf_transformations import euler_from_quaternion#, quaternion_from_euler
class TB3Pose2D(Node):
def __init__(self):
super().__init__('pub_tb3_pose2d')
qos_profile = QoSProfile(depth=10)
self.sub_ar_pose = self.create_subscription(
Odometry, # topic type
'odom', # topic name
self.get_odom_, # callback function
qos_profile)
self.pub_pose2d = self.create_publisher(Pose, 'tb3pose2d', qos_profile)
self.prv_theta = 0.0
self.theta_sum = 0.0
def get_odom_(self, msg):
pos_x, pos_y, theta = self.get_pose(msg)
pose2d = Pose() # turtlesim.msg.Pose()
pose2d.x = pos_x
pose2d.y = pos_y
# pose2d.theta
pose2d.linear_velocity = msg.twist.twist.linear.x
pose2d.angular_velocity = msg.twist.twist.linear.x
if (theta - self.prv_theta) > radians(270): # 5.0: # 5.0(rad) = 286.479(deg)
d_theta = (theta - self.prv_theta) - 2 * pi
elif (theta - self.prv_theta) < -radians(270): #-5.0: # -5.0(rad) = -286.479(deg)
d_theta = (theta - self.prv_theta) + 2 * pi
else:
d_theta = (theta - self.prv_theta)
self.theta_sum = self.theta_sum + d_theta
self.prv_theta = theta
pose2d.theta = self.theta_sum
self.pub_pose2d.publish(pose2d)
self.print_pose(pose2d)
def get_pose(self, msg):
q = ( msg.pose.pose.orientation.x,
msg.pose.pose.orientation.y,
msg.pose.pose.orientation.z,
msg.pose.pose.orientation.w )
# quart[0] = roll
euler = euler_from_quaternion(q) # quart[1] = pitch
theta = euler[2] # quart[2] = yaw <----
# make theta within from 0 to 360 degree
if theta < 0:
theta = theta + pi * 2
if theta > pi * 2:
theta = theta - pi * 2
pos_x = msg.pose.pose.position.x
pos_y = msg.pose.pose.position.y
return pos_x, pos_y, theta
def print_pose(self, msg):
print("x = %s, y = %s, th = %s = %s" %(round(msg.x,2),round(msg.y,2),round(msg.theta,3),round(degrees(msg.theta),2)))
def main(args=None):
rclpy.init(args=args)
node = TB3Pose2D()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Keyboard Interrupt(SIGINT)')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
move_tb3.py
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from nav_msgs.msg import Odometry
from turtlesim.msg import Pose
from geometry_msgs.msg import Twist
from math import degrees, sqrt, pi
# Turtlebot3 Specification
MAX_LIN_SPEED = 0.22
MAX_ANG_SPEED = 2.84
# make default speed of linear & angular
LIN_SPD = MAX_LIN_SPEED * 0.125
ANG_SPD = MAX_ANG_SPEED * 0.125
class MoveTB3(Node):
def __init__(self):
super().__init__('move_tb3')
qos_profile = QoSProfile(depth=10)
# define subscriber
self.sub_ar_pose = self.create_subscription(
Pose, # topic type
'tb3pose2d', # topic name
self.get_tb3_pose_, # callback function
qos_profile)
#self.timer = self.create_timer(0.1, self.get_tb3_pose_)
self.pub_tw = self.create_publisher(Twist, 'cmd_vel', qos_profile)
self.tb3pose = self.org = Pose()
#self.node = Node()
def get_tb3_pose_(self, msg):
self.tb3pose = msg
def update_org(self):
self.org = self.tb3pose
#print(self.org.theta)
def elapsed_dist(self):
return sqrt(pow((self.tb3pose.x - self.org.x), 2) + pow((self.tb3pose.y - self.org.y), 2))
def straight(self, distance):
rclpy.spin_once(self)
tw = Twist()
self.update_org()
print("straight start from (%s, %s)" %(round(self.org.x, 2), round(self.org.y, 2)))
if distance >= 0: # +distance
tw.linear.x = LIN_SPD
else: # -distance
tw.linear.x = -LIN_SPD
#self.pub_tw.publish(tw)
while rclpy.ok():
rclpy.spin_once(self)
self.pub_tw.publish(tw)
if self.elapsed_dist() < abs(distance): pass
else: break
#print("%s(m) of %s(m)" %(round(self.elapsed_dist(),2), round(abs(distance),2)))
tw.linear.x = 0.0; self.pub_tw.publish(tw)
print("straight stop to (%s, %s)" %(round(self.tb3pose.x, 2), round(self.tb3pose.y, 2)))
def elapsed_angle(self):
return abs(self.tb3pose.theta - self.org.theta)
def rotate(self, angle):
rclpy.spin_once(self)
tw = Twist()
self.update_org()
print("rotate start from: %s" %(round(degrees(self.org.theta), 2)))
if angle >= 0.0: # angle(+): rotate left(ccw)
tw.angular.z = ANG_SPD
else: # angle(-): rotate right(cw)
tw.angular.z = -ANG_SPD
#self.pub_tw.publish(tw)
while rclpy.ok():
rclpy.spin_once(self)
self.pub_tw.publish(tw)
if self.elapsed_angle() < abs(angle): pass
else: break
#print("%s of %s" %(round(degrees(self.elapsed_angle()),2) ,round(degrees(abs(angle)),2)))
tw.angular.z = 0.0; self.pub_tw.publish(tw)
print("rotate stop to : %s" %(round(degrees(self.tb3pose.theta), 2)))
test_move_tb3.py
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from ar_track.move_tb3 import MoveTB3
from math import radians, degrees, sqrt, atan2
class TestMoveTB3(Node):
def __init__(self):
super().__init__('test_move_tb3')
qos_profile = QoSProfile(depth=10)
self.tb3 = MoveTB3()
def main(args=None):
rclpy.init(args=args)
node = TestMoveTB3()
try:
while rclpy.ok():
rclpy.spin_once(node, timeout_sec = 0.1)
angle = radians(float(input("input rotation (deg): ")))
dist = float(input("input distance (m) : "))
node.tb3.rotate(angle)
node.tb3.straight(dist)
except KeyboardInterrupt:
node.get_logger().info('Keyboard Interrupt(SIGINT)')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
setup.py랑 cd robot_ws 빌드 소스 당연히 하고
entry_points={
'console_scripts': [
'pub_tb3_pose2d = ar_track.pub_tb3_pose2d:main',
'test_move_tb3 = ar_track.test_move_tb3:main',
],
},
실행
1번 터미널
ros2 launch turtlebot3_gazebo turtlebot3_world.launch.py
2번 터미널
ros2 run ar_track pub_tb3_pose2d
라이브러리가 없다고 나오면
sudo apt install ros-foxy-tf-transformations
sudo pip3 install transforms3d
3번 터미널
ros2 run ar_track test_move_tb3
터틀봇3 실물로 구동
노트북 핫스팟 연결하고, 터틀봇3 전원 켜고, 도메인아이디 똑같이 하고
1번 터틀봇3 원격 접속 터미널
ros2 launch turtlebot3_bringup robot.launch.py
2번 터미널
ros2 run ar_track pub_tb3_pose2d
3번 터미널
ros2 run ar_track test_move_tb3
ROS2 ARUCO
ros2_aruco/aruco_node.py
"""
This node locates Aruco AR markers in images and publishes their ids and poses.
Subscriptions:
/camera/image_raw (sensor_msgs.msg.Image)
/camera/camera_info (sensor_msgs.msg.CameraInfo)
/camera/camera_info (sensor_msgs.msg.CameraInfo)
Published Topics:
/aruco_poses (geometry_msgs.msg.PoseArray)
Pose of all detected markers (suitable for rviz visualization)
/aruco_markers (ros2_aruco_interfaces.msg.ArucoMarkers)
Provides an array of all poses along with the corresponding
marker ids.
Parameters:
marker_size - size of the markers in meters (default .0625)
aruco_dictionary_id - dictionary that was used to generate markers
(default DICT_5X5_250)
image_topic - image topic to subscribe to (default /camera/image_raw)
camera_info_topic - camera info topic to subscribe to
(default /camera/camera_info)
Author: Nathan Sprague
Version: 10/26/2020
"""
import rclpy
import rclpy.node
from rclpy.qos import qos_profile_sensor_data
from cv_bridge import CvBridge
import numpy as np
import cv2
from ros2_aruco import transformations
from sensor_msgs.msg import CameraInfo
from sensor_msgs.msg import Image
from geometry_msgs.msg import PoseArray, Pose
from ros2_aruco_interfaces.msg import ArucoMarkers
class ArucoNode(rclpy.node.Node):
def __init__(self):
super().__init__('aruco_node')
# Declare and read parameters
self.declare_parameter("marker_size", .042)
self.declare_parameter("aruco_dictionary_id", "DICT_5X5_250")
self.declare_parameter("image_topic", "/image_raw")
self.declare_parameter("camera_info_topic", "/camera/image/camera_info")
self.declare_parameter("camera_frame", "camera")
self.marker_size = self.get_parameter("marker_size").get_parameter_value().double_value
dictionary_id_name = self.get_parameter(
"aruco_dictionary_id").get_parameter_value().string_value
image_topic = self.get_parameter("image_topic").get_parameter_value().string_value
info_topic = self.get_parameter("camera_info_topic").get_parameter_value().string_value
self.camera_frame = self.get_parameter("camera_frame").get_parameter_value().string_value
# Make sure we have a valid dictionary id:
try:
dictionary_id = cv2.aruco.__getattribute__(dictionary_id_name)
if type(dictionary_id) != type(cv2.aruco.DICT_5X5_100):
raise AttributeError
except AttributeError:
self.get_logger().error("bad aruco_dictionary_id: {}".format(dictionary_id_name))
options = "\n".join([s for s in dir(cv2.aruco) if s.startswith("DICT")])
self.get_logger().error("valid options: {}".format(options))
# Set up subscriptions
self.info_sub = self.create_subscription(CameraInfo,
info_topic,
self.info_callback,
qos_profile_sensor_data)
self.create_subscription(Image, image_topic,
self.image_callback, qos_profile_sensor_data)
# Set up publishers
self.poses_pub = self.create_publisher(PoseArray, 'aruco_poses', 10)
self.markers_pub = self.create_publisher(ArucoMarkers, 'aruco_markers', 10)
# Set up fields for camera parameters
self.info_msg = None
self.intrinsic_mat = None
self.distortion = None
self.aruco_dictionary = cv2.aruco.Dictionary_get(dictionary_id)
self.aruco_parameters = cv2.aruco.DetectorParameters_create()
self.bridge = CvBridge()
def info_callback(self, info_msg):
self.info_msg = info_msg
self.intrinsic_mat = np.reshape(np.array(self.info_msg.k), (3, 3))
self.distortion = np.array(self.info_msg.d)
# Assume that camera parameters will remain the same...
self.destroy_subscription(self.info_sub)
def image_callback(self, img_msg):
if self.info_msg is None:
self.get_logger().warn("No camera info has been received!")
return
# cv_image = np.full((320,240), 255, dtype=np.uint8)
# try:
# cv_image = self.bridge.compressed_imgmsg_to_cv2(img_msg, desired_encoding='passthrough')
cv_image = self.bridge.imgmsg_to_cv2(img_msg, desired_encoding='8UC3')
# except:
# pass
print("---")
markers = ArucoMarkers()
pose_array = PoseArray()
if self.camera_frame is None:
markers.header.frame_id = self.info_msg.header.frame_id
pose_array.header.frame_id = self.info_msg.header.frame_id
else:
markers.header.frame_id = self.camera_frame
pose_array.header.frame_id = self.camera_frame
markers.header.stamp = img_msg.header.stamp
pose_array.header.stamp = img_msg.header.stamp
corners, marker_ids, rejected = cv2.aruco.detectMarkers(cv_image,
self.aruco_dictionary,
parameters=self.aruco_parameters)
if marker_ids is not None:
if cv2.__version__ > '4.0.0':
rvecs, tvecs, _ = cv2.aruco.estimatePoseSingleMarkers(corners,
self.marker_size, self.intrinsic_mat,
self.distortion)
else:
rvecs, tvecs = cv2.aruco.estimatePoseSingleMarkers(corners,
self.marker_size, self.intrinsic_mat,
self.distortion)
for i, marker_id in enumerate(marker_ids):
pose = Pose()
pose.position.x = tvecs[i][0][0]
pose.position.y = tvecs[i][0][1]
pose.position.z = tvecs[i][0][2]
rot_matrix = np.eye(4)
rot_matrix[0:3, 0:3] = cv2.Rodrigues(np.array(rvecs[i][0]))[0]
quat = transformations.quaternion_from_matrix(rot_matrix)
pose.orientation.x = quat[0]
pose.orientation.y = quat[1]
pose.orientation.z = quat[2]
pose.orientation.w = quat[3]
pose_array.poses.append(pose)
markers.poses.append(pose)
markers.marker_ids.append(marker_id[0])
self.poses_pub.publish(pose_array)
self.markers_pub.publish(markers)
def main():
rclpy.init()
node = ArucoNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
ar_track 패키지 안에 img_compressed2raw.py
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from sensor_msgs.msg import Image, CompressedImage
from cv_bridge import CvBridge
import cv2
import numpy as np
class ImageCovertor(Node):
def __init__(self):
super().__init__('img_convert')
qos_profile = QoSProfile(depth=10)
self.subscription = self.create_subscription(CompressedImage,
'camera/image/compressed',
self.get_compressed,
10)
self.pub_img = self.create_publisher(Image, 'image_raw', qos_profile)
self.bridge = CvBridge()
def get_compressed(self, msg):
self.cv_img = self.bridge.compressed_imgmsg_to_cv2(msg, "bgr8")
self.img_msg = self.bridge.cv2_to_imgmsg(self.cv_img)#, "bgr8")
self.pub_img.publish(self.img_msg)
def main(args=None):
rclpy.init(args=args)
node = ImageCovertor()
try:
print("start publish image_raw...")
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Keyboard Interrupt(SIGINT)')
print("finish publish image_raw...")
if __name__ == '__main__':
main()
마커 저장
# ros2 run ros2_aruco aruco_generate_marker --id 숫자
ros2 run ros2_aruco aruco_generate_marker --id 9
카메라
1번 터틀봇3 원격 접속 터미널
ros2 run raspicam2 raspicam2_node --ros-args --params-file `ros2 pkg prefix raspicam2`/share/raspicam2/cfg/params.yaml
2번 터미널
ros2 run rqt_image_view rqt_image_view
터틀봇3 원격 접속 터미널에서 확인했을 때
ros2 topic list
/*
/camera/image/camera_info
/camera/image/compressed
/parameter_events
/rosout
*/
3번 터미널
ros2 run ar_track img_compressed2raw
터틀봇3 원격 접속 터미널에서 확인했을 때
ros2 topic list
/*
/camera/image/camera_info
/camera/image/compressed
/image_raw
/parameter_events
/rosout
*/
4번 터미널
ros2 run ros2_aruco aruco_node
터틀봇3 원격 접속 터미널에서 확인했을 때
ros2 topic list
/*
/aruco_markers
/aruco_poses
/battery_state
/camera/image/camera_info
/camera/image/compressed
/cmd_vel
/image_raw
/imu
/joint_states
/magnetic_field
/odom
/parameter_events
/robot_description
/rosout
/scan
/sensor_state
/tf
/tf_static
*/
5번 터미널
ros2 topic echo /aruco_markers
Comments