""" Nav2 导航模块 - 使用官方 nav2_simple_commander.BasicNavigator API 参考: https://github.com/elephantrobotics/agv_pro_ros2/blob/humble/agv_pro_navigation2/scripts/example_nav_to_pose.py """ import time import math import logging import threading import subprocess import os import re from typing import List, Tuple, Optional, Dict from enum import Enum logger = logging.getLogger(__name__) # 延迟导入 rclpy 和 nav2_simple_commander(避免在模块加载时初始化 ROS) _rclpy = None _BasicNavigator = None _TaskResult = None _PoseStamped = None _Duration = None def _init_ros(): """延迟初始化 ROS2 依赖""" global _rclpy, _BasicNavigator, _TaskResult, _PoseStamped, _Duration if _rclpy is None: import rclpy _rclpy = rclpy from geometry_msgs.msg import PoseStamped from nav2_simple_commander.robot_navigator import BasicNavigator, TaskResult from rclpy.duration import Duration _PoseStamped = PoseStamped _BasicNavigator = BasicNavigator _TaskResult = TaskResult _Duration = Duration def _run_ros2_bash(cmd: str, timeout: float = 5.0) -> Tuple[int, str, str]: """执行 ros2 命令(直接运行,不二次包装)""" setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash && export ROS_DOMAIN_ID=1" full_cmd = f"bash -l -c '{setup} && {cmd}'" try: result = subprocess.run(full_cmd, shell=True, capture_output=True, text=True, timeout=timeout) return result.returncode, result.stdout.strip(), result.stderr.strip() except subprocess.TimeoutExpired: return -1, "", "Timeout" class Nav2Status(Enum): IDLE = "idle" NAVIGATING = "navigating" SUCCEEDED = "succeeded" FAILED = "failed" CANCELLED = "cancelled" UNKNOWN = "unknown" class Nav2Navigator: """Nav2 导航器 — 使用官方 BasicNavigator API""" def __init__(self): self.status = Nav2Status.IDLE self._nav_thread: Optional[threading.Thread] = None self._cancel_event = threading.Event() self._result_status = None self._current_pose = [0.0, 0.0, 0.0] self._navigator = None # BasicNavigator 实例 self._rclpy_initialized = False def _ensure_rclpy(self): """确保 rclpy 已初始化(每个线程只能调用一次)""" _init_ros() if not self._rclpy_initialized: if not _rclpy.ok(): _rclpy.init() self._rclpy_initialized = True def _get_navigator(self): """获取或创建 BasicNavigator 实例""" self._ensure_rclpy() if self._navigator is None: self._navigator = _BasicNavigator() return self._navigator def _check_nav2_available(self) -> bool: """检查 Nav2 action server 是否可用""" try: nav = self._get_navigator() # BasicNavigator 内部会检查 Nav2 是否活跃 return True except Exception as e: logger.debug(f"Nav2 不可用: {e}") return False def _get_current_pose(self) -> List[float]: """从 /amcl_pose 获取当前位置 [x, y, yaw]""" try: rc, out, err = _run_ros2_bash("timeout 3 ros2 topic echo /amcl_pose --once 2>/dev/null", timeout=4) if rc == 0 and out: import yaml clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out) yaml_str = re.sub(r'^[\d\-:. ]+\[RTPS[^\]]*\].*\n?', '', clean, flags=re.MULTILINE).strip() yaml_str = yaml_str.split('---')[0] data = yaml.safe_load(yaml_str) if data: pos = data.get("pose", {}).get("pose", {}).get("position", {}) orient = data.get("pose", {}).get("pose", {}).get("orientation", {}) x = pos.get("x", 0.0) y = pos.get("y", 0.0) qz = orient.get("z", 0.0) qw = orient.get("w", 1.0) yaw = math.atan2(2.0 * qw * qz, 1.0 - 2.0 * qz * qz) self._current_pose = [x, y, yaw] return self._current_pose except Exception as e: logger.debug(f"获取 amcl_pose 失败: {e}") return self._current_pose def navigate_to_pose(self, x: float, y: float, yaw: float = None, timeout_sec: float = 120.0, blocking: bool = True) -> bool: """导航到目标坐标(使用官方 BasicNavigator API)""" if self.status == Nav2Status.NAVIGATING: logger.warning("导航正在进行中,请先停止当前导航") return False self.status = Nav2Status.IDLE self._result_status = None if yaw is None: current = self._get_current_pose() yaw = current[2] logger.info(f"发送导航目标: ({x:.3f}, {y:.3f}), yaw={math.degrees(yaw):.1f}°") self._cancel_event.clear() self._result_status = None self.status = Nav2Status.NAVIGATING if self._nav_thread and self._nav_thread.is_alive(): self.stop() self._nav_thread.join(timeout=3) self._nav_thread = threading.Thread( target=self._nav_thread_func, args=(x, y, yaw, timeout_sec), daemon=True ) self._nav_thread.start() if blocking: self._nav_thread.join(timeout=timeout_sec + 10) return self.status == Nav2Status.SUCCEEDED return True def _nav_thread_func(self, x: float, y: float, yaw: float, timeout_sec: float): """导航执行线程 — 使用官方 BasicNavigator API""" try: logger.info(f"Nav2 线程启动: x={x}, y={y}, yaw={yaw}, timeout={timeout_sec}s") # 在子线程中初始化 rclpy _init_ros() if not _rclpy.ok(): _rclpy.init() navigator = _BasicNavigator() # 等待 Nav2 激活(可选,如果 Nav2 已启动可跳过) # navigator.waitUntilNav2Active() # 构造目标位姿 goal_pose = _PoseStamped() goal_pose.header.frame_id = 'map' goal_pose.header.stamp = navigator.get_clock().now().to_msg() goal_pose.pose.position.x = x goal_pose.pose.position.y = y goal_pose.pose.position.z = 0.0 # 四元数(仅 z, w) qz = math.sin(yaw / 2.0) qw = math.cos(yaw / 2.0) goal_pose.pose.orientation.x = 0.0 goal_pose.pose.orientation.y = 0.0 goal_pose.pose.orientation.z = qz goal_pose.pose.orientation.w = qw # 发送导航目标 navigator.goToPose(goal_pose) logger.info("导航目标已发送,等待完成...") # 等待导航完成 start_time = time.time() while not navigator.isTaskComplete(): if self._cancel_event.is_set(): navigator.cancelTask() logger.info("导航被取消") self._result_status = "cancelled" break elapsed = time.time() - start_time if elapsed > timeout_sec: logger.warning(f"导航超时 ({timeout_sec}s),取消任务") navigator.cancelTask() self._result_status = "failed" break # 获取反馈(可选) feedback = navigator.getFeedback() if feedback: remaining = _Duration.from_msg(feedback.estimated_time_remaining).nanoseconds / 1e9 logger.debug(f"预计剩余时间: {remaining:.1f}s") time.sleep(0.5) # 获取结果 result = navigator.getResult() if result == _TaskResult.SUCCEEDED: logger.info(f"✅ Nav2 导航成功到达目标 ({x:.3f}, {y:.3f})") self._result_status = "succeeded" self.status = Nav2Status.SUCCEEDED elif result == _TaskResult.CANCELED: logger.info("导航被取消") self._result_status = "cancelled" self.status = Nav2Status.CANCELLED elif result == _TaskResult.FAILED: logger.warning("⚠️ Nav2 导航失败") self._result_status = "failed" self.status = Nav2Status.FAILED else: logger.warning(f"导航返回未知状态: {result}") self._result_status = "failed" self.status = Nav2Status.FAILED except Exception as e: logger.error(f"Nav2 导航异常: {e}", exc_info=True) self.status = Nav2Status.FAILED self._result_status = "failed" def navigate_through_poses(self, poses: List[Tuple[float, float, float]], timeout_per_pose: float = 120.0, blocking: bool = True) -> bool: """导航通过多个路径点""" if self.status == Nav2Status.NAVIGATING: logger.warning("导航正在进行中,请先停止") return False if not poses: return True logger.info(f"发送 {len(poses)} 个路径点的导航任务") self._cancel_event.clear() self.status = Nav2Status.NAVIGATING self._nav_thread = threading.Thread( target=self._nav_thread_func_multi, args=(poses, timeout_per_pose), daemon=True ) self._nav_thread.start() if blocking: total_timeout = len(poses) * timeout_per_pose + 30 self._nav_thread.join(timeout=total_timeout) return self.status == Nav2Status.SUCCEEDED return True def _nav_thread_func_multi(self, poses: List[Tuple[float, float, float]], timeout_per_pose: float): """多路径点导航线程""" try: _init_ros() if not _rclpy.ok(): _rclpy.init() navigator = _BasicNavigator() # 构造路径点列表 goal_poses = [] for x, y, yaw in poses: pose = _PoseStamped() pose.header.frame_id = 'map' pose.header.stamp = navigator.get_clock().now().to_msg() pose.pose.position.x = x pose.pose.position.y = y pose.pose.position.z = 0.0 qz = math.sin(yaw / 2.0) qw = math.cos(yaw / 2.0) pose.pose.orientation.x = 0.0 pose.pose.orientation.y = 0.0 pose.pose.orientation.z = qz pose.pose.orientation.w = qw goal_poses.append(pose) navigator.goThroughPoses(goal_poses) logger.info(f"已发送 {len(poses)} 个路径点,等待完成...") start_time = time.time() total_timeout = len(poses) * timeout_per_pose while not navigator.isTaskComplete(): if self._cancel_event.is_set(): navigator.cancelTask() self._result_status = "cancelled" break elapsed = time.time() - start_time if elapsed > total_timeout: navigator.cancelTask() self._result_status = "failed" break time.sleep(0.5) result = navigator.getResult() if result == _TaskResult.SUCCEEDED: logger.info(f"✅ Nav2 路径点导航成功完成 {len(poses)} 个点") self._result_status = "succeeded" self.status = Nav2Status.SUCCEEDED elif result == _TaskResult.CANCELED: self._result_status = "cancelled" self.status = Nav2Status.CANCELLED else: self._result_status = "failed" self.status = Nav2Status.FAILED except Exception as e: logger.error(f"Nav2 路径点导航异常: {e}", exc_info=True) self.status = Nav2Status.FAILED def stop(self): """取消当前导航""" if self.status == Nav2Status.NAVIGATING: self._cancel_event.set() self.status = Nav2Status.CANCELLED logger.info("导航已停止") def get_status(self) -> Dict: """获取导航状态""" current = self._get_current_pose() nav2_available = self._check_nav2_available() return { "status": self.status.value, "current_position": current, "nav2_available": nav2_available } def get_current_position(self) -> List[float]: """获取当前位置 [x, y, yaw]""" return self._get_current_pose()