跳到主要内容

编写一个 tf2 监听器 (Python)

编写坐标监听器

首先,创建一个新的 ROS2 功能包,在终端中运行以下命令:

ros2 pkg create --build-type ament_python tf2_listener_py

该命令将使用 ament_python 构建系统创建一个名为 tf2_listener_py 的功能包。

进入到功能包下的 turtle_tf2_listener_py,并创建一个名为 tf2_listener.py 的 Python 文件。在该文件中,编写一个 tf2 监听器节点,代码如下:

import math
from geometry_msgs.msg import Twist # 导入Twist消息类型,用于发布速度命令


import rclpy
from rclpy.node import Node
from tf2_ros import TransformException # 导入变换异常类
from tf2_ros.buffer import Buffer #导入ros2 的Buffer 类
from tf2_ros.transform_listener import TransformListener
#从tf2_ros功能包导入TransformListener类
from turtlesim.srv import Spawn #导入turtlesim的Spawn服务


class FrameListener(Node):#定义继承自Node的FrameListener类

def __init__(self):
super().__init__('turtle_tf2_frame_listener')#初始化节点

self.target_frame = self.declare_parameter(
'target_frame', 'turtle1').get_parameter_value().string_value
# 声明并获取'target_frame'的参数
self.tf_buffer = Buffer() #创建tf2的Buffer实例
self.tf_listener = TransformListener(self.tf_buffer, self)
#创建tf2的transformlistener实例

self.spawner = self.create_client(Spawn, 'spawn')
#创建Spawn服务的客户端
self.turtle_spawning_service_ready = False
self.turtle_spawned = False #创建海龟是否生成的标志


self.publisher = self.create_publisher(Twist, 'turtle2/cmd_vel', 1)
# 创建Twist发布者

self.timer = self.create_timer(1.0, self.on_timer)#创建定时器

def on_timer(self):
#存储变换坐标名称
from_frame_rel = self.target_frame
to_frame_rel = 'turtle2'

if self.turtle_spawning_service_ready:
if self.turtle_spawned:#判断服务就绪
# 查找target_frame和turtle2框架之间的变换,并发送速度命令让turtle2到达target_frame
try:
t = self.tf_buffer.lookup_transform(
to_frame_rel,
from_frame_rel,
rclpy.time.Time())
except TransformException as ex:#捕获变化异常
self.get_logger().info(
f'Could not transform {to_frame_rel} to {from_frame_rel}: {ex}')
return

msg = Twist()
scale_rotation_rate = 1.0
msg.angular.z = scale_rotation_rate * math.atan2(
t.transform.translation.y,
t.transform.translation.x)

scale_forward_speed = 0.5
msg.linear.x = scale_forward_speed * math.sqrt(
t.transform.translation.x ** 2 +
t.transform.translation.y ** 2)

self.publisher.publish(msg)
else:
if self.result.done():
self.get_logger().info(
f'Successfully spawned {self.result.result().name}')
self.turtle_spawned = True
else:
self.get_logger().info('Spawn is not finished')
else:#如果没有海龟
if self.spawner.service_is_ready():#如果服务端准备完成
# 初始化请求,设置海龟的名称和坐标
request = Spawn.Request()
request.name = 'turtle2'
request.x = float(4)
request.y = float(2)
request.theta = float(0)
# 调用请求
self.result = self.spawner.call_async(request)#异步调用服务
self.turtle_spawning_service_ready = True#服务就绪
else:
# 输出服务未就绪
self.get_logger().info('Service is not ready')


def main():
rclpy.init()
node = FrameListener()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass

rclpy.shutdown()

代码解释

现在,让我们来看一下与监听坐标变换相关的代码。tf2_ros 功能包提供了一个 TransformListener方法来管理已经发布到 Buffer 中坐标系的关系。

from tf2_ros.transform_listener import TransformListener

在这里,我们创建了一个 TransformListener 对象。一旦监听器被创建,它就开始接收通过网络传输的 tf2 坐标变换,并将其缓存最长达 10 秒钟。

self.tf_listener = TransformListener(self.tf_buffer, self)

最后,我们可以使用lookup_transform 方法查询监听器以获取特定的坐标变换之间的关系。lookup_transform 使用方法如下:

  • 目标坐标系(Target frame)
  • 源坐标系(Source frame)
  • 我们希望进行变换的时间(The time at which we want to transform)

提供 rclpy.time.Time() 将使我们获取最新的可用变换。所有这些操作都被包装在一个 try-except 块中,以处理可能发生的异常。

t = self.tf_buffer.lookup_transform(to_frame_rel, from_frame_rel,rclpy.time.Time())
  • self.tf_buffer:这是一个 Buffer 对象的实例,它保存了 ROS 中的所有已知的坐标变换信息。Buffer 类是 tf2_ros 功能包中的核心类之一,用于存储和管理坐标变换数据。

  • lookup_transform:这是 Buffer 类的一个方法,用于查找从一个坐标系(from_frame)到另一个坐标系(to_frame)在 rclpy.time.Time() 的变换。

self.tf_buffer.lookup_transform(to_frame_rel, from_frame_rel, rclpy.time.Time()) 的参数解释如下:

  1. to_frame_rel:目标坐标系的名称(父坐标系),即你想要转换到的坐标系。在这个例子中,它是'turtle2'。

  2. from_frame_rel:源坐标系的名称(子坐标系),即你想要从哪里转换的坐标。在这个例子中,它是self.target_frame,其值在初始化时被设置为 'turtle1'。

  3. rclpy.time.Time():查询变换的时间点。这里使用 rclpy.time.Time() 表示当前时间,即你想要获取当前时刻两个坐标系之间的变换。

功能包配置和运行

**1. 设置入口点

为了能够通过命令行运行这个节点,需要在功能包的 setup.py 文件中配置入口点。打开 setup.py 文件并确保在 entry_points 部分添加以下内容:

entry_points={ 'console_scripts': 
[
'tf2_listener = tf2_listener_py.tf2_listener:main',
],
},

2. 更新launch 文件

在功能包 tf2_listener_py 下创建名为 launch 的文件夹并在该文件夹下创建名为turtle_tf2_demo_launch.py 的文件,编辑文件的内容如下。

from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node

def generate_launch_description():
return LaunchDescription([
# 声明目标坐标系参数
DeclareLaunchArgument(
'target_frame',
default_value='turtle1',
description='Target frame name.'
),

# 启动 turtlesim_node
Node(
package='turtlesim',
executable='turtlesim_node',
name='sim'
),

# 启动 turtle1 的 tf2 广播器
Node(
package='tf2_broadcaster_py',
executable='turtle_tf2_broadcaster',
name='broadcaster1',
parameters=[{'turtlename': 'turtle1'}]
),

# 启动 turtle2 的 tf2 广播器
Node(
package='tf2_broadcaster_py',
executable='turtle_tf2_broadcaster',
name='broadcaster2',
parameters=[{'turtlename': 'turtle2'}]
),

# 启动 tf2 listener 节点
Node(
package='tf2_listener_py',
executable='tf2_listener',
name='listener',
parameters=[{'target_frame': LaunchConfiguration('target_frame')}]
)
])

这段代码将声明一个名为 target_frame 的launch参数,启动一个广播器用于我们即将生成的第二个海龟,以及一个监听器,该监听器将订阅这些变换。

跟新 setup.py 文件,在 data_files 字段添加如下代码:

data_files=[
...
(os.path.join('share', package_name, 'launch'), glob(os.path.join('launch', '*launch.[pxy][yma]*'))),
],

还需要在文件顶部添加适当的模块:

import os
from glob import glob

3. 编译并运行功能包

在终端中,切换到功能包所在的工作空间根目录并编译功能包:

colcon build --packages-select tf2_listener_py

编译完成后,加载工作空间:

source install/setup.bash

现在,可以运行 tf2 监听器节点了。使用以下命令启动节点:

ros2 launch tf2_listener_py turtle_tf2_demo_launch.py

打开第二个终端,并在终端中打开键盘功能

ros2 run turtlesim turtle_teleop_key

按动键盘控制海龟1运行,查看海龟2的运行,并注意终端输出的海龟的坐标变换,此时会将海龟2的坐标转换到海龟1的坐标系中。

![[tf2_listener.png]]

tf2监听器

图 1: tf2监听器运行结果

本章小结

本章介绍了 ROS2 中坐标变换(tf2)机制的基本原理和常用方法,包括静态变换的发布、动态变换的广播器实现,以及 REP 105 与 REP 103 所规范的坐标系命名约定。这些内容为后续构建多传感器融合、导航与控制系统打下基础。值得一提的是,tf2 还支持诸如时间同步插值、坐标系缓存管理、与消息同步机制(如 message_filters)的深度集成等高级功能,读者在实际项目中可根据需求进一步查阅官方文档或相关资料,灵活应用这些能力,以实现更高效、更健壮的坐标管理系统。