6 분 소요

Hits

Ar Track

패키지 생성

ar_track 이라는 패키지명

pub_tb3_pose2d.py

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from turtlesim.msg import Pose
from nav_msgs.msg import Odometry
from math import radians, degrees, pi
from tf_transformations import euler_from_quaternion#, quaternion_from_euler

class TB3Pose2D(Node):

    def __init__(self):    
        super().__init__('pub_tb3_pose2d')
        qos_profile = QoSProfile(depth=10)
        
        self.sub_ar_pose = self.create_subscription(
            Odometry,       # topic type
            'odom',         # topic name
            self.get_odom_, # callback function
            qos_profile)
        self.pub_pose2d = self.create_publisher(Pose, 'tb3pose2d', qos_profile)
                
        self.prv_theta = 0.0
        self.theta_sum = 0.0        
        
    def get_odom_(self, msg):
        
        pos_x, pos_y, theta = self.get_pose(msg)
        
        pose2d       = Pose()   # turtlesim.msg.Pose()
        pose2d.x     = pos_x
        pose2d.y     = pos_y
        # pose2d.theta
        pose2d.linear_velocity  = msg.twist.twist.linear.x
        pose2d.angular_velocity = msg.twist.twist.linear.x
        
        if   (theta - self.prv_theta) >  radians(270): # 5.0: #  5.0(rad) =  286.479(deg)
            d_theta = (theta - self.prv_theta) - 2 * pi            
        elif (theta - self.prv_theta) < -radians(270): #-5.0: # -5.0(rad) = -286.479(deg)
            d_theta = (theta - self.prv_theta) + 2 * pi
        else:
            d_theta = (theta - self.prv_theta)

        self.theta_sum = self.theta_sum + d_theta
        self.prv_theta = theta
               
        pose2d.theta = self.theta_sum
        
        self.pub_pose2d.publish(pose2d)
        self.print_pose(pose2d)
        
        
    def get_pose(self, msg):
        
        q = ( msg.pose.pose.orientation.x,
              msg.pose.pose.orientation.y,
              msg.pose.pose.orientation.z, 
              msg.pose.pose.orientation.w )
                                            # quart[0] = roll
        euler = euler_from_quaternion(q)    # quart[1] = pitch
        theta = euler[2]                    # quart[2] = yaw <----
        
    	# make theta within from 0 to 360 degree
        if theta < 0:
            theta = theta + pi * 2
        if theta > pi * 2:
            theta = theta - pi * 2

        pos_x = msg.pose.pose.position.x
        pos_y = msg.pose.pose.position.y

        return pos_x, pos_y, theta
        
        
    def print_pose(self, msg):
        print("x = %s, y = %s, th = %s = %s" %(round(msg.x,2),round(msg.y,2),round(msg.theta,3),round(degrees(msg.theta),2)))


def main(args=None):
    rclpy.init(args=args)
    node = TB3Pose2D()
    
    try:
        rclpy.spin(node)
                
    except KeyboardInterrupt:
        node.get_logger().info('Keyboard Interrupt(SIGINT)')
        
    finally:
        node.destroy_node()
        rclpy.shutdown()
    
            
if __name__ == '__main__':
    main()

move_tb3.py

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from nav_msgs.msg import Odometry
from turtlesim.msg import Pose
from geometry_msgs.msg import Twist
from math import degrees, sqrt, pi

# Turtlebot3 Specification
MAX_LIN_SPEED =  0.22
MAX_ANG_SPEED =  2.84

# make default speed of linear & angular
LIN_SPD = MAX_LIN_SPEED * 0.125
ANG_SPD = MAX_ANG_SPEED * 0.125


class MoveTB3(Node):

    def __init__(self):
        super().__init__('move_tb3')
        qos_profile = QoSProfile(depth=10)
        # define subscriber
        self.sub_ar_pose = self.create_subscription(
            Pose,               # topic type
            'tb3pose2d',        # topic name
            self.get_tb3_pose_, # callback function
            qos_profile)
        #self.timer  = self.create_timer(0.1, self.get_tb3_pose_)
        self.pub_tw = self.create_publisher(Twist, 'cmd_vel', qos_profile)       
        
        self.tb3pose  = self.org = Pose()
        #self.node = Node()
        
         
    def get_tb3_pose_(self, msg):
        self.tb3pose = msg
        
    
    def update_org(self):
        self.org = self.tb3pose
        #print(self.org.theta)
        
    def elapsed_dist(self):
        return sqrt(pow((self.tb3pose.x - self.org.x), 2) + pow((self.tb3pose.y - self.org.y), 2))
        
    
    def straight(self, distance):
        rclpy.spin_once(self)
        tw = Twist()
        self.update_org()
        print("straight start from (%s, %s)" %(round(self.org.x, 2), round(self.org.y, 2)))
        
        if distance >= 0:   # +distance
            tw.linear.x =  LIN_SPD
        else:               # -distance
            tw.linear.x = -LIN_SPD
                        
        #self.pub_tw.publish(tw)        
        while rclpy.ok():
            rclpy.spin_once(self)
            self.pub_tw.publish(tw)
            if self.elapsed_dist() < abs(distance):  pass
            else:   break
            #print("%s(m) of %s(m)" %(round(self.elapsed_dist(),2), round(abs(distance),2)))
        
        tw.linear.x = 0.0;    self.pub_tw.publish(tw)
        print("straight stop to    (%s, %s)" %(round(self.tb3pose.x, 2), round(self.tb3pose.y, 2)))
    
        
    def elapsed_angle(self):
        return abs(self.tb3pose.theta - self.org.theta)
        
    def rotate(self, angle):
        rclpy.spin_once(self)
        tw = Twist()
        self.update_org()
        print("rotate start from: %s" %(round(degrees(self.org.theta), 2)))
        
        if angle >= 0.0:	# angle(+): rotate left(ccw)
            tw.angular.z =  ANG_SPD
        else:			# angle(-): rotate right(cw)
            tw.angular.z = -ANG_SPD
            
        #self.pub_tw.publish(tw)
        while rclpy.ok():
            rclpy.spin_once(self)
            self.pub_tw.publish(tw)
            if self.elapsed_angle() < abs(angle):    pass
            else:   break
            #print("%s of %s" %(round(degrees(self.elapsed_angle()),2) ,round(degrees(abs(angle)),2)))
            
        tw.angular.z =  0.0;  self.pub_tw.publish(tw)
        print("rotate stop to   : %s" %(round(degrees(self.tb3pose.theta), 2)))

test_move_tb3.py

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from ar_track.move_tb3 import MoveTB3
from math import radians, degrees, sqrt, atan2


class TestMoveTB3(Node):

 def __init__(self):
        
        super().__init__('test_move_tb3')
        qos_profile = QoSProfile(depth=10)
        self.tb3 = MoveTB3()

def main(args=None):
    rclpy.init(args=args)
    node = TestMoveTB3()
    
    try:
        while rclpy.ok():
            rclpy.spin_once(node, timeout_sec = 0.1)
            angle = radians(float(input("input rotation (deg): ")))
            dist = float(input("input distance (m)  : "))
        
            node.tb3.rotate(angle)
            node.tb3.straight(dist)
                
    except KeyboardInterrupt:
        node.get_logger().info('Keyboard Interrupt(SIGINT)')
        
    finally:
        node.destroy_node()
        rclpy.shutdown()
    
            
if __name__ == '__main__':
    main()

setup.py랑 cd robot_ws 빌드 소스 당연히 하고

entry_points={
        'console_scripts': [
            'pub_tb3_pose2d  = ar_track.pub_tb3_pose2d:main',
            'test_move_tb3   = ar_track.test_move_tb3:main',
        ],
    },

실행

1번 터미널

ros2 launch turtlebot3_gazebo turtlebot3_world.launch.py 

2번 터미널

ros2 run ar_track pub_tb3_pose2d

라이브러리가 없다고 나오면

sudo apt install ros-foxy-tf-transformations
sudo pip3 install transforms3d

3번 터미널

ros2 run ar_track test_move_tb3

터틀봇3 실물로 구동

노트북 핫스팟 연결하고, 터틀봇3 전원 켜고, 도메인아이디 똑같이 하고

1번 터틀봇3 원격 접속 터미널

ros2 launch turtlebot3_bringup robot.launch.py

2번 터미널

ros2 run ar_track pub_tb3_pose2d

3번 터미널

ros2 run ar_track test_move_tb3




ROS2 ARUCO

ROS2 aruco

ros2_aruco/aruco_node.py

"""
This node locates Aruco AR markers in images and publishes their ids and poses.

Subscriptions:
   /camera/image_raw (sensor_msgs.msg.Image)
   /camera/camera_info (sensor_msgs.msg.CameraInfo)
   /camera/camera_info (sensor_msgs.msg.CameraInfo)

Published Topics:
    /aruco_poses (geometry_msgs.msg.PoseArray)
       Pose of all detected markers (suitable for rviz visualization)

    /aruco_markers (ros2_aruco_interfaces.msg.ArucoMarkers)
       Provides an array of all poses along with the corresponding
       marker ids.

Parameters:
    marker_size - size of the markers in meters (default .0625)
    aruco_dictionary_id - dictionary that was used to generate markers
                          (default DICT_5X5_250)
    image_topic - image topic to subscribe to (default /camera/image_raw)
    camera_info_topic - camera info topic to subscribe to
                         (default /camera/camera_info)

Author: Nathan Sprague
Version: 10/26/2020

"""

import rclpy
import rclpy.node
from rclpy.qos import qos_profile_sensor_data
from cv_bridge import CvBridge
import numpy as np
import cv2
from ros2_aruco import transformations

from sensor_msgs.msg import CameraInfo
from sensor_msgs.msg import Image
from geometry_msgs.msg import PoseArray, Pose
from ros2_aruco_interfaces.msg import ArucoMarkers


class ArucoNode(rclpy.node.Node):

    def __init__(self):
        super().__init__('aruco_node')

        # Declare and read parameters
        self.declare_parameter("marker_size", .042)
        self.declare_parameter("aruco_dictionary_id", "DICT_5X5_250")
        self.declare_parameter("image_topic", "/image_raw")
        self.declare_parameter("camera_info_topic", "/camera/image/camera_info")
        self.declare_parameter("camera_frame", "camera")

        self.marker_size = self.get_parameter("marker_size").get_parameter_value().double_value
        dictionary_id_name = self.get_parameter(
            "aruco_dictionary_id").get_parameter_value().string_value
        image_topic = self.get_parameter("image_topic").get_parameter_value().string_value
        info_topic = self.get_parameter("camera_info_topic").get_parameter_value().string_value
        self.camera_frame = self.get_parameter("camera_frame").get_parameter_value().string_value

        # Make sure we have a valid dictionary id:
        try:
            dictionary_id = cv2.aruco.__getattribute__(dictionary_id_name)
            if type(dictionary_id) != type(cv2.aruco.DICT_5X5_100):
                raise AttributeError
        except AttributeError:
            self.get_logger().error("bad aruco_dictionary_id: {}".format(dictionary_id_name))
            options = "\n".join([s for s in dir(cv2.aruco) if s.startswith("DICT")])
            self.get_logger().error("valid options: {}".format(options))

        # Set up subscriptions
        self.info_sub = self.create_subscription(CameraInfo,
                                                 info_topic,
                                                 self.info_callback,
                                                 qos_profile_sensor_data)

        self.create_subscription(Image, image_topic,
                                 self.image_callback, qos_profile_sensor_data)

        # Set up publishers
        self.poses_pub = self.create_publisher(PoseArray, 'aruco_poses', 10)
        self.markers_pub = self.create_publisher(ArucoMarkers, 'aruco_markers', 10)

        # Set up fields for camera parameters
        self.info_msg = None
        self.intrinsic_mat = None
        self.distortion = None

        self.aruco_dictionary = cv2.aruco.Dictionary_get(dictionary_id)
        self.aruco_parameters = cv2.aruco.DetectorParameters_create()
        self.bridge = CvBridge()

    def info_callback(self, info_msg):
        self.info_msg = info_msg
        self.intrinsic_mat = np.reshape(np.array(self.info_msg.k), (3, 3))
        self.distortion = np.array(self.info_msg.d)
        # Assume that camera parameters will remain the same...
        self.destroy_subscription(self.info_sub)

    def image_callback(self, img_msg):

        if self.info_msg is None:
            self.get_logger().warn("No camera info has been received!")
            return
        # cv_image = np.full((320,240), 255, dtype=np.uint8)
        # try:
        # cv_image = self.bridge.compressed_imgmsg_to_cv2(img_msg, desired_encoding='passthrough')
        cv_image = self.bridge.imgmsg_to_cv2(img_msg, desired_encoding='8UC3')
        # except: 
        #     pass
        print("---")
        markers = ArucoMarkers()
        pose_array = PoseArray()
        if self.camera_frame is None:
            markers.header.frame_id = self.info_msg.header.frame_id
            pose_array.header.frame_id = self.info_msg.header.frame_id
        else:
            markers.header.frame_id = self.camera_frame
            pose_array.header.frame_id = self.camera_frame
            
            
        markers.header.stamp = img_msg.header.stamp
        pose_array.header.stamp = img_msg.header.stamp

        corners, marker_ids, rejected = cv2.aruco.detectMarkers(cv_image,
                                                                self.aruco_dictionary,
                                                                parameters=self.aruco_parameters)
        if marker_ids is not None:

            if cv2.__version__ > '4.0.0':
                rvecs, tvecs, _ = cv2.aruco.estimatePoseSingleMarkers(corners,
                                                                      self.marker_size, self.intrinsic_mat,
                                                                      self.distortion)
            else:
                rvecs, tvecs = cv2.aruco.estimatePoseSingleMarkers(corners,
                                                                   self.marker_size, self.intrinsic_mat,
                                                                   self.distortion)
            for i, marker_id in enumerate(marker_ids):
                pose = Pose()
                pose.position.x = tvecs[i][0][0]
                pose.position.y = tvecs[i][0][1]
                pose.position.z = tvecs[i][0][2]

                rot_matrix = np.eye(4)
                rot_matrix[0:3, 0:3] = cv2.Rodrigues(np.array(rvecs[i][0]))[0]
                quat = transformations.quaternion_from_matrix(rot_matrix)

                pose.orientation.x = quat[0]
                pose.orientation.y = quat[1]
                pose.orientation.z = quat[2]
                pose.orientation.w = quat[3]

                pose_array.poses.append(pose)
                markers.poses.append(pose)
                markers.marker_ids.append(marker_id[0])

            self.poses_pub.publish(pose_array)
            self.markers_pub.publish(markers)


def main():
    rclpy.init()
    node = ArucoNode()
    rclpy.spin(node)

    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

ar_track 패키지 안에 img_compressed2raw.py

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from sensor_msgs.msg import Image, CompressedImage
from cv_bridge import CvBridge
import cv2
import numpy as np

class ImageCovertor(Node):

    def __init__(self):
        super().__init__('img_convert')
        qos_profile = QoSProfile(depth=10)

        self.subscription = self.create_subscription(CompressedImage, 
                'camera/image/compressed', 
                self.get_compressed, 
                10)

        self.pub_img = self.create_publisher(Image, 'image_raw', qos_profile)
        self.bridge = CvBridge()

    def get_compressed(self, msg):
        self.cv_img = self.bridge.compressed_imgmsg_to_cv2(msg, "bgr8")
        self.img_msg = self.bridge.cv2_to_imgmsg(self.cv_img)#, "bgr8")
        self.pub_img.publish(self.img_msg)

def main(args=None):
    rclpy.init(args=args)
    node = ImageCovertor()
    try:
        print("start publish image_raw...") 
        rclpy.spin(node)     
    except KeyboardInterrupt:
        node.get_logger().info('Keyboard Interrupt(SIGINT)')
        print("finish publish image_raw...") 


if __name__ == '__main__':
    main()

마커 저장

# ros2 run ros2_aruco aruco_generate_marker --id 숫자
ros2 run ros2_aruco aruco_generate_marker --id 9

카메라

1번 터틀봇3 원격 접속 터미널

ros2 run raspicam2 raspicam2_node --ros-args --params-file `ros2 pkg prefix raspicam2`/share/raspicam2/cfg/params.yaml

2번 터미널

ros2 run rqt_image_view rqt_image_view

터틀봇3 원격 접속 터미널에서 확인했을 때

ros2 topic list

/*
/camera/image/camera_info
/camera/image/compressed
/parameter_events
/rosout
*/

3번 터미널

ros2 run ar_track img_compressed2raw

터틀봇3 원격 접속 터미널에서 확인했을 때

ros2 topic list

/*
/camera/image/camera_info
/camera/image/compressed
/image_raw
/parameter_events
/rosout
*/

4번 터미널

ros2 run ros2_aruco aruco_node

터틀봇3 원격 접속 터미널에서 확인했을 때

ros2 topic list

/*
/aruco_markers
/aruco_poses
/battery_state
/camera/image/camera_info
/camera/image/compressed
/cmd_vel
/image_raw
/imu
/joint_states
/magnetic_field
/odom
/parameter_events
/robot_description
/rosout
/scan
/sensor_state
/tf
/tf_static
*/

5번 터미널

ros2 topic echo /aruco_markers

태그:

카테고리:

업데이트:

Comments