351 lines
13 KiB
Python
351 lines
13 KiB
Python
"""
|
||
Nav2 导航模块 - 使用官方 nav2_simple_commander.BasicNavigator API
|
||
参考: https://github.com/elephantrobotics/agv_pro_ros2/blob/humble/agv_pro_navigation2/scripts/example_nav_to_pose.py
|
||
"""
|
||
import time
|
||
import math
|
||
import logging
|
||
import threading
|
||
import subprocess
|
||
import os
|
||
import re
|
||
from typing import List, Tuple, Optional, Dict
|
||
from enum import Enum
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 延迟导入 rclpy 和 nav2_simple_commander(避免在模块加载时初始化 ROS)
|
||
_rclpy = None
|
||
_BasicNavigator = None
|
||
_TaskResult = None
|
||
_PoseStamped = None
|
||
_Duration = None
|
||
|
||
|
||
def _init_ros():
|
||
"""延迟初始化 ROS2 依赖"""
|
||
global _rclpy, _BasicNavigator, _TaskResult, _PoseStamped, _Duration
|
||
if _rclpy is None:
|
||
import rclpy
|
||
_rclpy = rclpy
|
||
from geometry_msgs.msg import PoseStamped
|
||
from nav2_simple_commander.robot_navigator import BasicNavigator, TaskResult
|
||
from rclpy.duration import Duration
|
||
_PoseStamped = PoseStamped
|
||
_BasicNavigator = BasicNavigator
|
||
_TaskResult = TaskResult
|
||
_Duration = Duration
|
||
|
||
|
||
def _run_ros2_bash(cmd: str, timeout: float = 5.0) -> Tuple[int, str, str]:
|
||
"""执行 ros2 命令(直接运行,不二次包装)"""
|
||
setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash && export ROS_DOMAIN_ID=1"
|
||
full_cmd = f"bash -l -c '{setup} && {cmd}'"
|
||
try:
|
||
result = subprocess.run(full_cmd, shell=True, capture_output=True, text=True, timeout=timeout)
|
||
return result.returncode, result.stdout.strip(), result.stderr.strip()
|
||
except subprocess.TimeoutExpired:
|
||
return -1, "", "Timeout"
|
||
|
||
|
||
class Nav2Status(Enum):
|
||
IDLE = "idle"
|
||
NAVIGATING = "navigating"
|
||
SUCCEEDED = "succeeded"
|
||
FAILED = "failed"
|
||
CANCELLED = "cancelled"
|
||
UNKNOWN = "unknown"
|
||
|
||
|
||
class Nav2Navigator:
|
||
"""Nav2 导航器 — 使用官方 BasicNavigator API"""
|
||
|
||
def __init__(self):
|
||
self.status = Nav2Status.IDLE
|
||
self._nav_thread: Optional[threading.Thread] = None
|
||
self._cancel_event = threading.Event()
|
||
self._result_status = None
|
||
self._current_pose = [0.0, 0.0, 0.0]
|
||
self._navigator = None # BasicNavigator 实例
|
||
self._rclpy_initialized = False
|
||
|
||
def _ensure_rclpy(self):
|
||
"""确保 rclpy 已初始化(每个线程只能调用一次)"""
|
||
_init_ros()
|
||
if not self._rclpy_initialized:
|
||
if not _rclpy.ok():
|
||
_rclpy.init()
|
||
self._rclpy_initialized = True
|
||
|
||
def _get_navigator(self):
|
||
"""获取或创建 BasicNavigator 实例"""
|
||
self._ensure_rclpy()
|
||
if self._navigator is None:
|
||
self._navigator = _BasicNavigator()
|
||
return self._navigator
|
||
|
||
def _check_nav2_available(self) -> bool:
|
||
"""检查 Nav2 action server 是否可用"""
|
||
try:
|
||
nav = self._get_navigator()
|
||
# BasicNavigator 内部会检查 Nav2 是否活跃
|
||
return True
|
||
except Exception as e:
|
||
logger.debug(f"Nav2 不可用: {e}")
|
||
return False
|
||
|
||
def _get_current_pose(self) -> List[float]:
|
||
"""从 /amcl_pose 获取当前位置 [x, y, yaw]"""
|
||
try:
|
||
rc, out, err = _run_ros2_bash("timeout 3 ros2 topic echo /amcl_pose --once 2>/dev/null", timeout=4)
|
||
if rc == 0 and out:
|
||
import yaml
|
||
clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out)
|
||
yaml_str = re.sub(r'^[\d\-:. ]+\[RTPS[^\]]*\].*\n?', '', clean, flags=re.MULTILINE).strip()
|
||
yaml_str = yaml_str.split('---')[0]
|
||
data = yaml.safe_load(yaml_str)
|
||
if data:
|
||
pos = data.get("pose", {}).get("pose", {}).get("position", {})
|
||
orient = data.get("pose", {}).get("pose", {}).get("orientation", {})
|
||
x = pos.get("x", 0.0)
|
||
y = pos.get("y", 0.0)
|
||
qz = orient.get("z", 0.0)
|
||
qw = orient.get("w", 1.0)
|
||
yaw = math.atan2(2.0 * qw * qz, 1.0 - 2.0 * qz * qz)
|
||
self._current_pose = [x, y, yaw]
|
||
return self._current_pose
|
||
except Exception as e:
|
||
logger.debug(f"获取 amcl_pose 失败: {e}")
|
||
return self._current_pose
|
||
|
||
def navigate_to_pose(self, x: float, y: float, yaw: float = None,
|
||
timeout_sec: float = 120.0,
|
||
blocking: bool = True) -> bool:
|
||
"""导航到目标坐标(使用官方 BasicNavigator API)"""
|
||
if self.status == Nav2Status.NAVIGATING:
|
||
logger.warning("导航正在进行中,请先停止当前导航")
|
||
return False
|
||
|
||
self.status = Nav2Status.IDLE
|
||
self._result_status = None
|
||
|
||
if yaw is None:
|
||
current = self._get_current_pose()
|
||
yaw = current[2]
|
||
|
||
logger.info(f"发送导航目标: ({x:.3f}, {y:.3f}), yaw={math.degrees(yaw):.1f}°")
|
||
|
||
self._cancel_event.clear()
|
||
self._result_status = None
|
||
self.status = Nav2Status.NAVIGATING
|
||
|
||
if self._nav_thread and self._nav_thread.is_alive():
|
||
self.stop()
|
||
self._nav_thread.join(timeout=3)
|
||
|
||
self._nav_thread = threading.Thread(
|
||
target=self._nav_thread_func,
|
||
args=(x, y, yaw, timeout_sec),
|
||
daemon=True
|
||
)
|
||
self._nav_thread.start()
|
||
|
||
if blocking:
|
||
self._nav_thread.join(timeout=timeout_sec + 10)
|
||
return self.status == Nav2Status.SUCCEEDED
|
||
|
||
return True
|
||
|
||
def _nav_thread_func(self, x: float, y: float, yaw: float, timeout_sec: float):
|
||
"""导航执行线程 — 使用官方 BasicNavigator API"""
|
||
try:
|
||
logger.info(f"Nav2 线程启动: x={x}, y={y}, yaw={yaw}, timeout={timeout_sec}s")
|
||
|
||
# 在子线程中初始化 rclpy
|
||
_init_ros()
|
||
if not _rclpy.ok():
|
||
_rclpy.init()
|
||
|
||
navigator = _BasicNavigator()
|
||
|
||
# 等待 Nav2 激活(可选,如果 Nav2 已启动可跳过)
|
||
# navigator.waitUntilNav2Active()
|
||
|
||
# 构造目标位姿
|
||
goal_pose = _PoseStamped()
|
||
goal_pose.header.frame_id = 'map'
|
||
goal_pose.header.stamp = navigator.get_clock().now().to_msg()
|
||
goal_pose.pose.position.x = x
|
||
goal_pose.pose.position.y = y
|
||
goal_pose.pose.position.z = 0.0
|
||
|
||
# 四元数(仅 z, w)
|
||
qz = math.sin(yaw / 2.0)
|
||
qw = math.cos(yaw / 2.0)
|
||
goal_pose.pose.orientation.x = 0.0
|
||
goal_pose.pose.orientation.y = 0.0
|
||
goal_pose.pose.orientation.z = qz
|
||
goal_pose.pose.orientation.w = qw
|
||
|
||
# 发送导航目标
|
||
navigator.goToPose(goal_pose)
|
||
logger.info("导航目标已发送,等待完成...")
|
||
|
||
# 等待导航完成
|
||
start_time = time.time()
|
||
while not navigator.isTaskComplete():
|
||
if self._cancel_event.is_set():
|
||
navigator.cancelTask()
|
||
logger.info("导航被取消")
|
||
self._result_status = "cancelled"
|
||
break
|
||
|
||
elapsed = time.time() - start_time
|
||
if elapsed > timeout_sec:
|
||
logger.warning(f"导航超时 ({timeout_sec}s),取消任务")
|
||
navigator.cancelTask()
|
||
self._result_status = "failed"
|
||
break
|
||
|
||
# 获取反馈(可选)
|
||
feedback = navigator.getFeedback()
|
||
if feedback:
|
||
remaining = _Duration.from_msg(feedback.estimated_time_remaining).nanoseconds / 1e9
|
||
logger.debug(f"预计剩余时间: {remaining:.1f}s")
|
||
|
||
time.sleep(0.5)
|
||
|
||
# 获取结果
|
||
result = navigator.getResult()
|
||
if result == _TaskResult.SUCCEEDED:
|
||
logger.info(f"✅ Nav2 导航成功到达目标 ({x:.3f}, {y:.3f})")
|
||
self._result_status = "succeeded"
|
||
self.status = Nav2Status.SUCCEEDED
|
||
elif result == _TaskResult.CANCELED:
|
||
logger.info("导航被取消")
|
||
self._result_status = "cancelled"
|
||
self.status = Nav2Status.CANCELLED
|
||
elif result == _TaskResult.FAILED:
|
||
logger.warning("⚠️ Nav2 导航失败")
|
||
self._result_status = "failed"
|
||
self.status = Nav2Status.FAILED
|
||
else:
|
||
logger.warning(f"导航返回未知状态: {result}")
|
||
self._result_status = "failed"
|
||
self.status = Nav2Status.FAILED
|
||
|
||
except Exception as e:
|
||
logger.error(f"Nav2 导航异常: {e}", exc_info=True)
|
||
self.status = Nav2Status.FAILED
|
||
self._result_status = "failed"
|
||
|
||
def navigate_through_poses(self, poses: List[Tuple[float, float, float]],
|
||
timeout_per_pose: float = 120.0,
|
||
blocking: bool = True) -> bool:
|
||
"""导航通过多个路径点"""
|
||
if self.status == Nav2Status.NAVIGATING:
|
||
logger.warning("导航正在进行中,请先停止")
|
||
return False
|
||
if not poses:
|
||
return True
|
||
|
||
logger.info(f"发送 {len(poses)} 个路径点的导航任务")
|
||
self._cancel_event.clear()
|
||
self.status = Nav2Status.NAVIGATING
|
||
|
||
self._nav_thread = threading.Thread(
|
||
target=self._nav_thread_func_multi,
|
||
args=(poses, timeout_per_pose),
|
||
daemon=True
|
||
)
|
||
self._nav_thread.start()
|
||
|
||
if blocking:
|
||
total_timeout = len(poses) * timeout_per_pose + 30
|
||
self._nav_thread.join(timeout=total_timeout)
|
||
return self.status == Nav2Status.SUCCEEDED
|
||
return True
|
||
|
||
def _nav_thread_func_multi(self, poses: List[Tuple[float, float, float]], timeout_per_pose: float):
|
||
"""多路径点导航线程"""
|
||
try:
|
||
_init_ros()
|
||
if not _rclpy.ok():
|
||
_rclpy.init()
|
||
|
||
navigator = _BasicNavigator()
|
||
|
||
# 构造路径点列表
|
||
goal_poses = []
|
||
for x, y, yaw in poses:
|
||
pose = _PoseStamped()
|
||
pose.header.frame_id = 'map'
|
||
pose.header.stamp = navigator.get_clock().now().to_msg()
|
||
pose.pose.position.x = x
|
||
pose.pose.position.y = y
|
||
pose.pose.position.z = 0.0
|
||
qz = math.sin(yaw / 2.0)
|
||
qw = math.cos(yaw / 2.0)
|
||
pose.pose.orientation.x = 0.0
|
||
pose.pose.orientation.y = 0.0
|
||
pose.pose.orientation.z = qz
|
||
pose.pose.orientation.w = qw
|
||
goal_poses.append(pose)
|
||
|
||
navigator.goThroughPoses(goal_poses)
|
||
logger.info(f"已发送 {len(poses)} 个路径点,等待完成...")
|
||
|
||
start_time = time.time()
|
||
total_timeout = len(poses) * timeout_per_pose
|
||
|
||
while not navigator.isTaskComplete():
|
||
if self._cancel_event.is_set():
|
||
navigator.cancelTask()
|
||
self._result_status = "cancelled"
|
||
break
|
||
|
||
elapsed = time.time() - start_time
|
||
if elapsed > total_timeout:
|
||
navigator.cancelTask()
|
||
self._result_status = "failed"
|
||
break
|
||
|
||
time.sleep(0.5)
|
||
|
||
result = navigator.getResult()
|
||
if result == _TaskResult.SUCCEEDED:
|
||
logger.info(f"✅ Nav2 路径点导航成功完成 {len(poses)} 个点")
|
||
self._result_status = "succeeded"
|
||
self.status = Nav2Status.SUCCEEDED
|
||
elif result == _TaskResult.CANCELED:
|
||
self._result_status = "cancelled"
|
||
self.status = Nav2Status.CANCELLED
|
||
else:
|
||
self._result_status = "failed"
|
||
self.status = Nav2Status.FAILED
|
||
|
||
except Exception as e:
|
||
logger.error(f"Nav2 路径点导航异常: {e}", exc_info=True)
|
||
self.status = Nav2Status.FAILED
|
||
|
||
def stop(self):
|
||
"""取消当前导航"""
|
||
if self.status == Nav2Status.NAVIGATING:
|
||
self._cancel_event.set()
|
||
self.status = Nav2Status.CANCELLED
|
||
logger.info("导航已停止")
|
||
|
||
def get_status(self) -> Dict:
|
||
"""获取导航状态"""
|
||
current = self._get_current_pose()
|
||
nav2_available = self._check_nav2_available()
|
||
return {
|
||
"status": self.status.value,
|
||
"current_position": current,
|
||
"nav2_available": nav2_available
|
||
}
|
||
|
||
def get_current_position(self) -> List[float]:
|
||
"""获取当前位置 [x, y, yaw]"""
|
||
return self._get_current_pose()
|