ROS2 第三讲 基本操作

ROS2 第三讲 基本操作

创建工作区

source /opt/ros/foxy/setup.bash

这句命令的目的是让ros2开头的命令可以在终端使用。

创建目录, 此目录作为我们的工作区

mkdir -p ~/dev_ws/src
cd ~/dev_ws/src

在工作区中创建工作包(package), package的创建可以使用CMake或者是Python, 后面我们主要使用python.

# ros2 pkg create --build-type ament_python <package_name>
ros2 pkg create --build-type ament_python --node-name my_node my_package

上面的命令在工作区中创建了名为my_pacakge的package, 后面的参数--node-name 为我们创建了一个名为my_nodeHello World的测试文件。执行上面的命令时,终端会显示:

going to create a new package
package name: my_package
destination directory: /home/user/dev_ws/src
package format: 3
version: 0.0.0
description: TODO: Package description
maintainer: ['<name> <email>']
licenses: ['TODO: License declaration']
build type: ament_python
dependencies: []
node_name: my_node
creating folder ./my_package
creating ./my_package/package.xml
creating source folder
creating folder ./my_package/my_package
creating ./my_package/setup.py
creating ./my_package/setup.cfg
creating folder ./my_package/resource
creating ./my_package/resource/my_package
creating ./my_package/my_package/__init__.py
creating folder ./my_package/test
creating ./my_package/test/test_copyright.py
creating ./my_package/test/test_flake8.py
creating ./my_package/test/test_pep257.py
creating ./my_package/my_package/my_node.py

创建完成后,还需要使用colcon来构建package:

colcon build --package-select my_package

参数--package-select 告诉系统只需要构建my_package即可, 不加此参数,则工作区内的所有package一起构建。

要使用你的package及其中的可执行文件,需要在新的终端上面,在工作区目录下(本例中,即为~/dev_ws),执行:

. install/setup.bash

然后,我们就可以在此终端下执行类似如下命令:

# ros2 run <package_name> <executable_file>
ros2 run my_package my_node

终端会显示这个测试文件的输出结果

Hi from my_package.

订阅与发布topic

创建一个package:

cd ~/dev_ws/src
ros2 pkg create --build-type ament_python py_pubsub
cd py_pubsub/py_pubsub

编写发布者节点

在其中创建名为publisher_member_function.py的文件:

import rclpy
from rclpy.node import Node

from std_msgs.msg import String


class MinimalPublisher(Node):

    def __init__(self):
        super().__init__('minimal_publisher')
        self.publisher_ = self.create_publisher(String, 'hello_world', 10)
        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0

    def timer_callback(self):
        msg = String()
        msg.data = 'Hello World: %d' % self.i
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%s"' % msg.data)
        self.i += 1


def main(args=None):
    rclpy.init(args=args)

    minimal_publisher = MinimalPublisher()

    rclpy.spin(minimal_publisher)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    minimal_publisher.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

代码解释

import rclpy
from rclpy.node import Node

from std_msgs.msg import String

引入python中操作ROS2的功能包rclpy, 后面的一句是导入内置的字符串消息类型,节点使用此数据类型在topic上面进行数据传递。

 super().__init__('minimal_publisher')

调用父类Node的构造函数来为节点命名,本例中即为:minimal_publisher.

self.publisher_ = self.create_publisher(String, 'hello_world', 10)

声明此节点将在名为hello_world的topic上面发布消息,消息类型是String,而且设定消息队列大小为10。

self.timer = self.create_timer(timer_period, self.timer_callback)

上句是调用父类Node的函数创建计时器,此计时器会每隔timer_period时间(本例为0.5秒)调用回调函数(本例为self.timer_callback).

...
msg = String()
msg.data = 'Hello World: %d' % self.i
self.publisher_.publish(msg)
...

上面第一句是创建String消息类型, 为其data赋值,最后将其发布出去。

在main函数中,首先初始化库rclpy,然后,构造发布者。rclpy.spin(minimal_publisher) 这句开使执行发布者,而且程序一直会"阻塞"在这一句, 后面语句不会执行,直到关闭。 后面的语句是显式地毁掉节节点, 并关闭rclpy

发布者的程序就写完了,但让它真正可又跑起来,还需要对几个文件进行处理。首先来到dev_ws/src/py_pubsub/目录下,打开package.xml文件, 并将下面几句加进去:

<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>

这是在为程序执行增加依赖。 再打开setup.py文件,在entry_pointsconsole_scripts中加入:

'talker = py_pubsub.publisher_member_function:main',

编写订阅者节点

也是在dev_ws/src/py_pubsub/py_pubsub目录下创建subscriber_member_function.py

import rclpy
from rclpy.node import Node

from std_msgs.msg import String


class MinimalSubscriber(Node):

    def __init__(self):
        super().__init__('minimal_subscriber')
        self.subscription = self.create_subscription(
            String,
            'hello_world',
            self.listener_callback,
            10)
        self.subscription  # prevent unused variable warning

    def listener_callback(self, msg):
        self.get_logger().info('I heard: "%s"' % msg.data)


def main(args=None):
    rclpy.init(args=args)

    minimal_subscriber = MinimalSubscriber()

    rclpy.spin(minimal_subscriber)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    minimal_subscriber.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

只是要注意topic的名称(本例中为hello_world)一定要与发布者发布消息使用的topic名称相一致。

再打开setup.py文件,在entry_pointsconsole_scripts中加入:

'listener = py_pubsub.subscriber_member_function:main',

在构建之前,需要检查是否缺少依赖项:

rosdep install -i --from-path src --rosdistro foxy -y

开始构建:

colcon build --package-select py_pubsub

然后, 新打开终端,到dev_ws的目录下:

. install/setup.bash
ros2 run py_pubsub talker

会显示类似如下信息:

[INFO] [1630456461.097123166] [minimal_publisher]: Publishing: "Hello World: 0"
[INFO] [1630456461.587039320] [minimal_publisher]: Publishing: "Hello World: 1"
[INFO] [1630456462.086940655] [minimal_publisher]: Publishing: "Hello World: 2"
[INFO] [1630456462.586954812] [minimal_publisher]: Publishing: "Hello World: 3"
[INFO] [1630456463.087072928] [minimal_publisher]: Publishing: "Hello World: 4"
[INFO] [1630456463.587015133] [minimal_publisher]: Publishing: "Hello World: 5"

再新打开一个终端,到dev_ws目录下:

. install/setup.bash
ros2 run py_pubsub listener

会显示类似如下信息:

[INFO] [1630456623.240335705] [minimal_subscriber]: I heard: "Hello World: 30"
[INFO] [1630456623.740374180] [minimal_subscriber]: I heard: "Hello World: 31"
[INFO] [1630456624.240430432] [minimal_subscriber]: I heard: "Hello World: 32"
[INFO] [1630456624.740421047] [minimal_subscriber]: I heard: "Hello World: 33"
[INFO] [1630456625.240359291] [minimal_subscriber]: I heard: "Hello World: 34"
[INFO] [1630456625.740324980] [minimal_subscriber]: I heard: "Hello World: 35"

响应与请求服务

进入到dev_ws/src目录中,创建新的package:

ros2 pkg create --build-type ament_python py_srvcli --dependencies rclpy example_interfaces

参数--dependencies自动将发要的依赖添加到package.xmlexample_interfaces是包含AddTwoInts.srv的package。 其数据结构:

int64 a
int64 b
---
int64 sum

前两行是请求参数,虚线下面的是响应参数。因为使用了--dependencies参数,也不就用手动地将依赖添加到package.xml.

进入到dev_ws/src/py_srvcli/py_srvcli目录,在其中创建名为service_member_function.py:

from example_interfaces.srv import AddTwoInts

import rclpy
from rclpy.node import Node


class MinimalService(Node):

    def __init__(self):
        super().__init__('minimal_service')
        self.srv = self.create_service(AddTwoInts, 'add_two_ints',                                                   self.add_two_ints_callback)

    def add_two_ints_callback(self, request, response):
        response.sum = request.a + request.b
        self.get_logger().info('Incoming request
a: %d b: %d' % (request.a, request.b))

        return response


def main(args=None):
    rclpy.init(args=args)

    minimal_service = MinimalService()

    rclpy.spin(minimal_service)

    rclpy.shutdown()


if __name__ == '__main__':
    main()

经过学习上面topic的发布与订阅,现在对这个服务的代码的也较容易上手, 此文件创建了名为add_two_ints的服务,此服务接收两个整数,然后返回这两个整数的和。

为使用ros2 run ... 命令支行上面的代码,需要将下面的内容添加到dev_ws/src/py_srvcli/setup.pyconsole_scripts中:

'service = py_srvcli.service_member_function:main',

然后再进入到dev_ws/src/py_srvcli/py_srvcli目录,编写client_member_function.py:

import sys

from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node


class MinimalClientAsync(Node):

    def __init__(self):
        super().__init__('minimal_client_async')
        self.cli = self.create_client(AddTwoInts, 'add_two_ints')
        while not self.cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')
        self.req = AddTwoInts.Request()

    def send_request(self):
        self.req.a = int(sys.argv[1])
        self.req.b = int(sys.argv[2])
        self.future = self.cli.call_async(self.req)


def main(args=None):
    rclpy.init(args=args)

    minimal_client = MinimalClientAsync()
    minimal_client.send_request()

    while rclpy.ok():
        rclpy.spin_once(minimal_client)
        if minimal_client.future.done():
            try:
                response = minimal_client.future.result()
            except Exception as e:
                minimal_client.get_logger().info(
                    'Service call failed %r' % (e,))
            else:
                minimal_client.get_logger().info(
                    'Result of add_two_ints: for %d + %d = %d' %
                    (minimal_client.req.a, minimal_client.req.b, response.sum))
            break

    minimal_client.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

代码解释:

while not self.cli.wait_for_service(timeout_sec=1.0):
    self.get_logger().info('service not available, waiting again...')

这两句是客户端每秒都会检查服务是否可用。

    while rclpy.ok():
    	...

这是在检查future是否有结果返回。

然后,同样地,将下面的内容添加到dev_ws/src/py_srvcli/setup.pyconsole_scripts中:

'client = py_srvcli.client_member_function:main',

服务与客户端代码编写完成,后我们开始构建与运行,首先检查一下依赖:

'client = py_srvcli.client_member_function:main',

dev_ws目录下,构建package:

colcon build --packages-select py_srvcli

打开新的终端,并进入到dev_ws目录下:

. install/setup.bash
ros2 run py_srvcli service

再打开个终端:

. install/setup.bash
ros2 run py_srvcli client 2 3

在此终端会显示类似如下信息:

[INFO] [1630533285.392494354] [minimal_client_async]: Result of add_two_ints: for 2 + 3 = 5

同时,在上一个终端会显示类似:

[INFO] [1630533285.392494354] [minimal_client_async]: Result of add_two_ints: for 2 + 3 = 5

自定义ROS2 msg和srv文件

进入到目录dev_ws/src中,创建新的package:

ros2 pkg create --build-type ament_cmake tutorial_interfaces

这里没有使用ament_python来创建package, 是因为目前版本ament_python无法生成.msg.srv文件。不过即使在ament_python生成的package仍可使用python来编写代码。

进入到dev_ws/src/tutorial_interfaces来创建目录:

mkdir msg 
mkdir srv

进入到msg目录,创建一个新文件Num.msg:

int64 num

这个即为创建的自定义消息类型,其数据结构是一个64位的整数,名称为num.

类似地,进入到srv目录下,创建自定义服务文件AddThreeInts.srv:

int64 a
int64 b
int64 c
---
int64 sum

此服务需要在请求时提供三个整数,然后返回一个整数sum.

CMakeLists.txt文件中加入:

find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/Num.msg"
  "srv/AddThreeInts.srv"
 )

package.xml中加入:

<build_depend>rosidl_default_generators</build_depend>

<exec_depend>rosidl_default_runtime</exec_depend>

<member_of_group>rosidl_interface_packages</member_of_group>

然后进入到dev_ws目录下:

colcon build --packages-select tutorial_interfaces

构建package后,就可又查看刚刚自定义的消息与服务了:

. install/setup.bash
ros2 interface show tutorial_interfaces/msg/Num
ros2 interface show tutorial_interfaces/srv/AddThreeInts

为方便我们直接在上面编写的程序上面修改

import rclpy
from rclpy.node import Node

from tutorial_interfaces.msg import Num    # CHANGE


class MinimalPublisher(Node):

    def __init__(self):
        super().__init__('minimal_publisher')
        self.publisher_ = self.create_publisher(Num, 'helloworld', 10)     # CHANGE
        timer_period = 0.5
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0

    def timer_callback(self):
        msg = Num()                                           # CHANGE
        msg.num = self.i                                      # CHANGE
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%d"' % msg.num)  # CHANGE
        self.i += 1


def main(args=None):
    rclpy.init(args=args)

    minimal_publisher = MinimalPublisher()

    rclpy.spin(minimal_publisher)

    minimal_publisher.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
import rclpy
from rclpy.node import Node

from tutorial_interfaces.msg import Num        # CHANGE


class MinimalSubscriber(Node):

    def __init__(self):
        super().__init__('minimal_subscriber')
        self.subscription = self.create_subscription(
            Num,                                              # CHANGE
            'helloworld',
            self.listener_callback,
            10)
        self.subscription

    def listener_callback(self, msg):
            self.get_logger().info('I heard: "%d"' % msg.num) # CHANGE


def main(args=None):
    rclpy.init(args=args)

    minimal_subscriber = MinimalSubscriber()

    rclpy.spin(minimal_subscriber)

    minimal_subscriber.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

接着在package.xml中加入:

<exec_depend>tutorial_interfaces</exec_depend>

然后编译:

colcon build --packages-select py_pubsub

我们就可在新窗口执行:

cd ~/dev_ws/
 . install/setup.bash
 ros2 run py_pubsub talker
cd ~/dev_ws/
 . install/setup.bash
 ros2 run py_pubsub listener

会打印类似如下信息:

[INFO] [1630716380.031157485] [minimal_publisher]: Publishing: "0"
[INFO] [1630716380.521359183] [minimal_publisher]: Publishing: "1"
[INFO] [1630716381.021380733] [minimal_publisher]: Publishing: "2"
[INFO] [1630716381.521212491] [minimal_publisher]: Publishing: "3"
[INFO] [1630716382.021220153] [minimal_publisher]: Publishing: "4"
[INFO] [1630716382.519948860] [minimal_publisher]: Publishing: "5"

同样地, 我们修改一下服务与客户端请求脚本:

from tutorial_interfaces.srv import AddThreeInts     # CHANGE

import rclpy
from rclpy.node import Node


class MinimalService(Node):

    def __init__(self):
        super().__init__('minimal_service')
        self.srv = self.create_service(AddThreeInts, 'add_three_ints', self.add_three_ints_callback)        # CHANGE

    def add_three_ints_callback(self, request, response):
        response.sum = request.a + request.b + request.c                                                  # CHANGE
        self.get_logger().info('Incoming request
a: %d b: %d c: %d' % (request.a, request.b, request.c)) # CHANGE

        return response

def main(args=None):
    rclpy.init(args=args)

    minimal_service = MinimalService()

    rclpy.spin(minimal_service)

    rclpy.shutdown()

if __name__ == '__main__':
    main()
from tutorial_interfaces.srv import AddThreeInts       # CHANGE
import sys
import rclpy
from rclpy.node import Node


class MinimalClientAsync(Node):

    def __init__(self):
        super().__init__('minimal_client_async')
        self.cli = self.create_client(AddThreeInts, 'add_three_ints')       # CHANGE
        while not self.cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')
        self.req = AddThreeInts.Request()                                   # CHANGE

    def send_request(self):
        self.req.a = int(sys.argv[1])
        self.req.b = int(sys.argv[2])
        self.req.c = int(sys.argv[3])                  # CHANGE
        self.future = self.cli.call_async(self.req)


def main(args=None):
    rclpy.init(args=args)

    minimal_client = MinimalClientAsync()
    minimal_client.send_request()

    while rclpy.ok():
        rclpy.spin_once(minimal_client)
        if minimal_client.future.done():
            try:
                response = minimal_client.future.result()
            except Exception as e:
                minimal_client.get_logger().info(
                    'Service call failed %r' % (e,))
            else:
                minimal_client.get_logger().info(
                    'Result of add_three_ints: for %d + %d + %d = %d' %                               # CHANGE
                    (minimal_client.req.a, minimal_client.req.b, minimal_client.req.c, response.sum)) # CHANGE
            break

    minimal_client.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

package.xml中加入:

<exec_depend>tutorial_interfaces</exec_depend>

编译:

colcon build --packages-select py_srvcli
cd ~/dev_ws/
 . install/setup.bash
 ros2 run py_srvcli service
cd ~/dev_ws/
 . install/setup.bash
 ros2 run py_srvcli client 2 3 4

会分类显示类似:

# service terminal
[INFO] [1630717079.441505424] [minimal_service]: Incoming request
a: 2 b: 3 c: 4
# client terminal
[INFO] [1630717079.448639918] [minimal_client_async]: Result of add_three_ints: for 2 + 3 + 4 = 9

节点参数

进入到dev_ws/src目录下,创建工作包:

ros2 pkg create --build-type ament_python python_parameters --dependencies rclpy

进入到dev_ws/src/python_parameters/python_parameters中,编写python_parameters_node.py:

import rclpy
import rclpy.node
from rcl_interfaces.msg import ParameterDescriptor
from rclpy.exceptions import ParameterNotDeclaredException
from rcl_interfaces.msg import ParameterType

class MinimalParam(rclpy.node.Node):
    def __init__(self):
        super().__init__('minimal_param_node')
        timer_period = 2  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)
		my_parameter_descriptor = ParameterDescriptor(description='This parameter is mine!')
        self.declare_parameter('my_parameter', 'world',my_parameter_descriptor)

    def timer_callback(self):
        my_param = self.get_parameter('my_parameter').get_parameter_value().string_value

        self.get_logger().info('Hello %s!' % my_param)

        my_new_param = rclpy.parameter.Parameter(
            'my_parameter',
            rclpy.Parameter.Type.STRING,
            'world'
        )
        all_new_parameters = [my_new_param]
        self.set_parameters(all_new_parameters)

def main():
    rclpy.init()
    node = MinimalParam()
    rclpy.spin(node)

if __name__ == '__main__':
    main()

代码解释:

timer_period = 2  # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)

为观察参数变化,我们创建了一个简单地定时函数来打印参数的值。

 my_parameter_descriptor = ParameterDescriptor(description='This parameter is mine!')
 self.declare_parameter('my_parameter', 'world',my_parameter_descriptor)

声明了一个名为my_parameter的参数,默认值为world.

def timer_callback(self):
    my_param = self.get_parameter('my_parameter').get_parameter_value().string_value

    self.get_logger().info('Hello %s!' % my_param)

    my_new_param = rclpy.parameter.Parameter(
        'my_parameter',
        rclpy.Parameter.Type.STRING,
        'world'
    )
    all_new_parameters = [my_new_param]
    self.set_parameters(all_new_parameters)

此回调函数先是获取参数my_parameter当前值进行消费,此处只是简单地将此参数值在log中打印出来。然后,又将此参数的值设置回原默认参数,此处为world.

然后,同样地,在setup.py文件中的console_scripts中加入:

 'param_talker = python_parameters.python_parameters_node:main',

然后检查依赖、构建、进行:

cd ~/dev_ws
rosdep install -i --from-path src --rosdistro foxy -y
colcon build --packages-select python_parameters
. install/setup.bash
ros2 run python_parameters param_talker

然后会看到终端会显示类似如下信息:

[INFO] [1630969878.922440651] [minimal_param_node]: Hello world!
[INFO] [1630969880.924136402] [minimal_param_node]: Hello world!
[INFO] [1630969882.923843790] [minimal_param_node]: Hello world!
...

在新的终端输入:

ros2 param list

会显示:

/minimal_param_node:
  my_parameter
  use_sim_time

也可以在终端重设参数:

ros2 param set /minimal_param_node my_parameter earth

然后会显示:

Set parameter successful

同时在前一个终端中会显示类似:

[INFO] [1630970016.917325178] [minimal_param_node]: Hello earth!

我们也可以在launch文件中设置参数。 先在dev_ws/src/python_parameters/中创建目录launch并在其中编写python_parameters_launch.py

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='python_parameters',
            executable='param_talker',
            name='custom_parameter_node',
            output='screen',
            emulate_tty=True,
            parameters=[
                {'my_parameter': 'earth'}
            ]
        )
    ])

setup.py中加入:

import os
from glob import glob
# ...

setup(
  # ...
  data_files=[
      # ...
      (os.path.join('share', package_name), glob('launch/*_launch.py')),
    ]
  )

然后:

colcon build --packages-select python_parameters
. install/setup.bash
ros2 launch python_parameters python_parameters_launch.py

会显示类似如何信息:

[INFO] [launch]: All log files can be found below /home/[username]/.ros/log/2021-09-07-07-46-33-861137-[user-pc-name]-14461
[INFO] [launch]: Default logging verbosity is set to INFO
[INFO] [param_talker-1]: process started with pid [14463]
[param_talker-1] [INFO] [1630971996.143561574] [custom_parameter_node]: Hello earth!
[param_talker-1] [INFO] [1630971998.134791036] [custom_parameter_node]: Hello world!
...

编写action server 与action client

action是ROS2中异步通信形式。客户端发送一个目标(goal)请求后,服务端则会在实现目标之前不断的发送反馈直到目标实现,发送最终结果. 下面自定义动作并模拟客户端与服务端的交互。

首先创建新的package及action文件:

cd dev_ws/src
ros2 pkg create action_tutorials_interfaces
cd  action_tutorials_interfaces
mkdir action
cd action
vim Fibonacci.action

Fibonacci.action中加入:

int32 order
---
int32[] sequence
---
int32[] partial_sequence

.action的格式遵从:

# Request
---
# Result
---
# Feedback

构建action:

  1. action_tutorials_interfaces/CMakeLists.txt中加入(在ament_package()这句之前即可):
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "action/Fibonacci.action"
)
  1. action_tutorials_interfaces/package.xml中加入:
<buildtool_depend>rosidl_default_generators</buildtool_depend>

<depend>action_msgs</depend>

<member_of_group>rosidl_interface_packages</member_of_group>
  1. colcon 构建:
# Change to the root of the workspace
cd ~/dev_ws
# Build
colcon build --packages-select action_tutorials_interfaces

此时可以查看此action类型:

cd ~/dev_ws
. install/setup.bash
ros2 interface show action_tutorials_interfaces/action/Fibonacci

编写服务端

进入到dev_ws/src中:

ros2 pkg create --build-type ament_python action_tutorials_py --dependencies action_tutorials_interfaces
cd action_tutorials_py/action_tutorials_py
vim fibonacci_action_server.py

在文件中写入:

import time

from action_tutorials_interfaces.action import Fibonacci

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node


class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        feedback_msg = Fibonacci.Feedback()
        feedback_msg.partial_sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            feedback_msg.partial_sequence.append(
                feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])
            self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(1)

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = feedback_msg.partial_sequence
        return result


def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    try:
        rclpy.spin(fibonacci_action_server)
    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    main()

代码解释

self._action_server = ActionServer(self, Fibonacci, 'fibonacci', self.execute_callback)

创建了名为fibonacci的action,

 goal_handle.publish_feedback(feedback_msg)

定时发送反馈到客户端,

 goal_handle.succeed()

执行此句时,说明目标成功实现.

编写客户端:


from action_tutorials_interfaces.action import Fibonacci

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        self._send_goal_future = self._action_client.send_goal_async(
            goal_msg,
            feedback_callback=self.feedback_callback)

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()

        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()

        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    action_client.send_goal(10)

    rclpy.spin(action_client)


if __name__ == '__main__':
    main()

代码解释:

self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

客户端请求服务时,需要与服务端的action 名称及类型相一致。

 def send_goal(self, order):
 	 ...
 	 self._action_client.wait_for_server()
 	  self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

当服务端可提供服务时,此方法向其发送一个目标(goal),然后返回一个Future,Future代表目标未来的结果,以便追踪结果, 显然目标结果的获取是一个异步过程,因为要到后面我们才知道结果。

self._send_goal_future.add_done_callback(self.goal_response_callback)

Future结束后,执行回调函数。 此处的回调函数用来处理发送目标(goal))请求后,服务端的反馈,判断其是否被服务端接受。

    def goal_response_callback(self, future):
        ...

        self._get_result_future.add_done_callback(self.get_result_callback)

如果goal请求被服务端接受,那么当结果取得后,使用回调函数get_result_callback处理结果,本例中只是简单地将结果记录在日志中。

然后在setup.py中的console_scripts中加入:

'service = action_tutorials_py.fibonacci_action_server:main',
'client = action_tutorials_py.fibonacci_action_client:main',

然后构建:

colcon build --packages-select action_tutorials_py

启动服务端:

cd dev_ws
. install/setup.bash
 ros2 run action_tutorials_py service

再启动客户端:

cd dev_ws
. install/setup.bash
 ros2 run action_tutorials_py client

然后,会在服务端与客户端的终端分别看到类似如下信息:

[INFO] [1631233573.009128276] [fibonacci_action_server]: Executing goal...
[INFO] [1631233573.009567222] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1])
[INFO] [1631233574.012417797] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2])
[INFO] [1631233575.015357396] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3])
[INFO] [1631233576.018333121] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5])
[INFO] [1631233577.021284528] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5, 8])
[INFO] [1631233578.024294773] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13])
[INFO] [1631233579.027237307] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21])
[INFO] [1631233580.030227920] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21, 34])
[INFO] [1631233581.033309123] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55])

以及:

[INFO] [1631233573.009172627] [fibonacci_action_client]: Goal accepted :)
[INFO] [1631233573.010058780] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1])
[INFO] [1631233574.014925803] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1, 2])
[INFO] [1631233575.017879889] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1, 2, 3])
[INFO] [1631233576.020788404] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1, 2, 3, 5])
[INFO] [1631233577.023794055] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1, 2, 3, 5, 8])
[INFO] [1631233578.026808039] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13])
[INFO] [1631233579.029738364] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21])
[INFO] [1631233580.032715415] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21, 34])
[INFO] [1631233581.035831373] [fibonacci_action_client]: Received feedback: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55])
[INFO] [1631233582.039328117] [fibonacci_action_client]: Result: array('i', [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55])
原文地址:https://www.cnblogs.com/vpegasus/p/ros2_basic_op.html