From 2eb39fdbd56960360a3220241f7f6e963d637f68 Mon Sep 17 00:00:00 2001 From: ywb <347742090@qq.com> Date: Tue, 19 May 2026 21:25:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8=E4=BB=96=E4=BB=AC=E7=9A=84?= =?UTF-8?q?=E5=AF=BC=E8=88=AA=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agv_app/utils/nav2_navigator.py | 404 ++++++++++++-------------------- 1 file changed, 155 insertions(+), 249 deletions(-) diff --git a/agv_app/utils/nav2_navigator.py b/agv_app/utils/nav2_navigator.py index 0eb4c5a..d270219 100644 --- a/agv_app/utils/nav2_navigator.py +++ b/agv_app/utils/nav2_navigator.py @@ -1,8 +1,6 @@ """ -Nav2 导航模块 - 通过 ROS2 Action Server 控制 AGV 导航 -使用 Nav2 的 navigate_to_pose 和 navigate_through_poses action - -通过 stdin pipe 发送 goal YAML,避免 bash $() 替换破坏 YAML 格式 +Nav2 导航模块 - 使用官方 nav2_simple_commander.BasicNavigator API +参考: https://github.com/elephantrobotics/agv_pro_ros2/blob/humble/agv_pro_navigation2/scripts/example_nav_to_pose.py """ import time import math @@ -16,6 +14,28 @@ from enum import Enum logger = logging.getLogger(__name__) +# 延迟导入 rclpy 和 nav2_simple_commander(避免在模块加载时初始化 ROS) +_rclpy = None +_BasicNavigator = None +_TaskResult = None +_PoseStamped = None +_Duration = None + + +def _init_ros(): + """延迟初始化 ROS2 依赖""" + global _rclpy, _BasicNavigator, _TaskResult, _PoseStamped, _Duration + if _rclpy is None: + import rclpy + _rclpy = rclpy + from geometry_msgs.msg import PoseStamped + from nav2_simple_commander.robot_navigator import BasicNavigator, TaskResult + from rclpy.duration import Duration + _PoseStamped = PoseStamped + _BasicNavigator = BasicNavigator + _TaskResult = TaskResult + _Duration = Duration + def _run_ros2_bash(cmd: str, timeout: float = 5.0) -> Tuple[int, str, str]: """执行 ros2 命令(直接运行,不二次包装)""" @@ -38,7 +58,7 @@ class Nav2Status(Enum): class Nav2Navigator: - """Nav2 导航器 — 通过 ros2 action /navigate_to_pose 与 Nav2 通信""" + """Nav2 导航器 — 使用官方 BasicNavigator API""" def __init__(self): self.status = Nav2Status.IDLE @@ -46,30 +66,33 @@ class Nav2Navigator: self._cancel_event = threading.Event() self._result_status = None self._current_pose = [0.0, 0.0, 0.0] + self._navigator = None # BasicNavigator 实例 + self._rclpy_initialized = False + + def _ensure_rclpy(self): + """确保 rclpy 已初始化(每个线程只能调用一次)""" + _init_ros() + if not self._rclpy_initialized: + if not _rclpy.ok(): + _rclpy.init() + self._rclpy_initialized = True + + def _get_navigator(self): + """获取或创建 BasicNavigator 实例""" + self._ensure_rclpy() + if self._navigator is None: + self._navigator = _BasicNavigator() + return self._navigator def _check_nav2_available(self) -> bool: - """检查 Nav2 action server 是否可用(带缓存 10 秒)""" - import time as time_mod - if hasattr(self, '_nav2_check_time') and hasattr(self, '_nav2_check_result'): - if time_mod.time() - self._nav2_check_time < 10: - return self._nav2_check_result - script = ( - "#!/bin/bash\n" - "source /opt/ros/humble/setup.bash 2>/dev/null\n" - "source /home/elephant/agv_pro_ros2/install/setup.bash 2>/dev/null\n" - "export ROS_DOMAIN_ID=1\n" - "ros2 action list 2>&1\n" - ) - script_file = "/tmp/nav2_check.sh" - with open(script_file, "w") as f: - f.write(script) - os.chmod(script_file, 0o755) - r = subprocess.run([script_file], capture_output=True, text=True, timeout=8) - result = r.returncode == 0 and ("/navigate_to_pose" in r.stdout or "navigate_to_pose" in r.stdout) - logger.debug(f"_check_nav2_available: rc={r.returncode}, result={result}") - self._nav2_check_result = result - self._nav2_check_time = time_mod.time() - return result + """检查 Nav2 action server 是否可用""" + try: + nav = self._get_navigator() + # BasicNavigator 内部会检查 Nav2 是否活跃 + return True + except Exception as e: + logger.debug(f"Nav2 不可用: {e}") + return False def _get_current_pose(self) -> List[float]: """从 /amcl_pose 获取当前位置 [x, y, yaw]""" @@ -98,7 +121,7 @@ class Nav2Navigator: def navigate_to_pose(self, x: float, y: float, yaw: float = None, timeout_sec: float = 120.0, blocking: bool = True) -> bool: - """导航到目标坐标""" + """导航到目标坐标(使用官方 BasicNavigator API)""" if self.status == Nav2Status.NAVIGATING: logger.warning("导航正在进行中,请先停止当前导航") return False @@ -106,10 +129,6 @@ class Nav2Navigator: self.status = Nav2Status.IDLE self._result_status = None - if not self._check_nav2_available(): - logger.error("Nav2 action server 不可用") - return False - if yaw is None: current = self._get_current_pose() yaw = current[2] @@ -137,161 +156,88 @@ class Nav2Navigator: return True - def _parse_result_status(self, combined: str) -> Optional[str]: - """从 ros2 action send_goal 输出中解析导航结果状态""" - result_idx = combined.find("Result:") - if result_idx < 0: - return None - # 匹配 "status: STATUS_SUCCEEDED" 或 "status: SUCCEEDED" 等格式 - status_m = re.search(r'status\s*:\s*(\w+)', combined[result_idx:]) - if status_m: - st = status_m.group(1).upper() - if 'SUCCEEDED' in st: - return "succeeded" - elif 'FAILED' in st or 'ABORTED' in st: - return "failed" - elif 'CANCELED' in st or 'CANCELLED' in st: - return "cancelled" - logger.warning(f"未知的导航状态: {st}") - return None - def _nav_thread_func(self, x: float, y: float, yaw: float, timeout_sec: float): - """导航执行线程 — 使用 stdin pipe 发送 goal YAML""" + """导航执行线程 — 使用官方 BasicNavigator API""" try: logger.info(f"Nav2 线程启动: x={x}, y={y}, yaw={yaw}, timeout={timeout_sec}s") - start_time = time.time() + + # 在子线程中初始化 rclpy + _init_ros() + if not _rclpy.ok(): + _rclpy.init() + + navigator = _BasicNavigator() + + # 等待 Nav2 激活(可选,如果 Nav2 已启动可跳过) + # navigator.waitUntilNav2Active() + + # 构造目标位姿 + goal_pose = _PoseStamped() + goal_pose.header.frame_id = 'map' + goal_pose.header.stamp = navigator.get_clock().now().to_msg() + goal_pose.pose.position.x = x + goal_pose.pose.position.y = y + goal_pose.pose.position.z = 0.0 + + # 四元数(仅 z, w) qz = math.sin(yaw / 2.0) qw = math.cos(yaw / 2.0) + goal_pose.pose.orientation.x = 0.0 + goal_pose.pose.orientation.y = 0.0 + goal_pose.pose.orientation.z = qz + goal_pose.pose.orientation.w = qw - goal_yaml = ( - f"pose:\n" - f" header:\n" - f" stamp:\n" - f" sec: 0\n" - f" nanosec: 0\n" - f" frame_id: map\n" - f" pose:\n" - f" position:\n" - f" x: {x}\n" - f" y: {y}\n" - f" z: 0.0\n" - f" orientation:\n" - f" x: 0.0\n" - f" y: 0.0\n" - f" z: {qz}\n" - f" w: {qw}\n" - ) - - # 写入临时文件,用 $(cat file) 方式传递(stdin pipe "-" 在 Humble 有 bug) - import tempfile - goal_file = "/tmp/nav2_goal_{}_{}.yaml".format(os.getpid(), int(time.time())) - with open(goal_file, "w") as f: - f.write(goal_yaml) - - # 写一个临时脚本执行导航(避免引号嵌套地狱) - script_file = "/tmp/nav2_navigate_{}_{}.sh".format(os.getpid(), int(time.time())) - with open(script_file, "w") as f: - f.write('#!/bin/bash\n') - f.write('source /opt/ros/humble/setup.bash\n') - f.write('source /home/elephant/agv_pro_ros2/install/setup.bash\n') - f.write('export ROS_DOMAIN_ID=1\n') - f.write('ros2 action send_goal /navigate_to_pose nav2_msgs/action/NavigateToPose "$(cat {})" --feedback\n'.format(goal_file)) - os.chmod(script_file, 0o755) - - proc = subprocess.Popen( - ['bash', '-l', script_file], - stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True - ) - - stdout_lines = [] - stderr_lines = [] - stop_reading = threading.Event() - - def reader(pipe, lines): - for line in iter(pipe.readline, ''): - if stop_reading.is_set(): - break - lines.append(line) - - t_out = threading.Thread(target=reader, args=(proc.stdout, stdout_lines)) - t_err = threading.Thread(target=reader, args=(proc.stderr, stderr_lines)) - t_out.start() - t_err.start() - - # 轮询等待 action 完成 - result_received = False - max_checks = int(timeout_sec / 2) - for _ in range(max_checks): - time.sleep(2) - elapsed = time.time() - start_time - - full_out = "".join(stdout_lines) - full_err = "".join(stderr_lines) - combined = full_out + full_err - - # 检查是否有 YAML 解析错误 - if "Traceback" in combined and "Goal accepted" not in combined: - logger.error(f"ros2 action send_goal YAML 解析失败: {combined[:500]}") - self._result_status = "failed" - result_received = True - break - - # 检查是否未被接受 - if "Goal accepted" not in combined and elapsed > 15: - logger.warning(f"导航目标 15 秒内未被接受,输出: {combined[:300]}") - - # 解析 Result - parsed = self._parse_result_status(combined) - if parsed: - self._result_status = parsed - result_received = True - if parsed == "succeeded": - logger.info(f"✅ Nav2 导航成功到达目标 ({x:.3f}, {y:.3f})") - elif parsed == "failed": - logger.warning(f"⚠️ Nav2 导航失败") - else: - logger.info("Nav2 导航被取消") - break - - # 进程已结束但无 Result - if proc.poll() is not None and not result_received: - logger.error(f"导航进程提前退出,rc={proc.returncode},输出: {combined[:300]}") - self._result_status = "failed" - result_received = True - break + # 发送导航目标 + navigator.goToPose(goal_pose) + logger.info("导航目标已发送,等待完成...") + # 等待导航完成 + start_time = time.time() + while not navigator.isTaskComplete(): if self._cancel_event.is_set(): + navigator.cancelTask() + logger.info("导航被取消") + self._result_status = "cancelled" break - # 清理 - stop_reading.set() - if proc.poll() is None: - proc.terminate() - try: - proc.wait(timeout=3) - except subprocess.TimeoutExpired: - proc.kill() - t_out.join(timeout=2) - t_err.join(timeout=2) + elapsed = time.time() - start_time + if elapsed > timeout_sec: + logger.warning(f"导航超时 ({timeout_sec}s),取消任务") + navigator.cancelTask() + self._result_status = "failed" + break - # 处理取消 - if self._cancel_event.is_set() and not result_received: - logger.info("取消 Nav2 导航...") - setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash && export ROS_DOMAIN_ID=1" - cancel_cmd = f'bash -l -c \'{setup} && ros2 action cancel /navigate_to_pose 2>/dev/null || true\'' - subprocess.run(cancel_cmd, shell=True, timeout=3) - self._result_status = "cancelled" + # 获取反馈(可选) + feedback = navigator.getFeedback() + if feedback: + remaining = _Duration.from_msg(feedback.estimated_time_remaining).nanoseconds / 1e9 + logger.debug(f"预计剩余时间: {remaining:.1f}s") - if self._result_status == "succeeded": + time.sleep(0.5) + + # 获取结果 + result = navigator.getResult() + if result == _TaskResult.SUCCEEDED: + logger.info(f"✅ Nav2 导航成功到达目标 ({x:.3f}, {y:.3f})") + self._result_status = "succeeded" self.status = Nav2Status.SUCCEEDED - elif self._result_status == "cancelled": + elif result == _TaskResult.CANCELED: + logger.info("导航被取消") + self._result_status = "cancelled" self.status = Nav2Status.CANCELLED + elif result == _TaskResult.FAILED: + logger.warning("⚠️ Nav2 导航失败") + self._result_status = "failed" + self.status = Nav2Status.FAILED else: + logger.warning(f"导航返回未知状态: {result}") + self._result_status = "failed" self.status = Nav2Status.FAILED except Exception as e: - logger.error(f"Nav2 导航异常: {e}") + logger.error(f"Nav2 导航异常: {e}", exc_info=True) self.status = Nav2Status.FAILED + self._result_status = "failed" def navigate_through_poses(self, poses: List[Tuple[float, float, float]], timeout_per_pose: float = 120.0, @@ -300,9 +246,6 @@ class Nav2Navigator: if self.status == Nav2Status.NAVIGATING: logger.warning("导航正在进行中,请先停止") return False - if not self._check_nav2_available(): - logger.error("Nav2 action server 不可用") - return False if not poses: return True @@ -326,117 +269,80 @@ class Nav2Navigator: def _nav_thread_func_multi(self, poses: List[Tuple[float, float, float]], timeout_per_pose: float): """多路径点导航线程""" try: - setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash && export ROS_DOMAIN_ID=1" + _init_ros() + if not _rclpy.ok(): + _rclpy.init() - poses_lines = [] - for i, (x, y, yaw) in enumerate(poses): + navigator = _BasicNavigator() + + # 构造路径点列表 + goal_poses = [] + for x, y, yaw in poses: + pose = _PoseStamped() + pose.header.frame_id = 'map' + pose.header.stamp = navigator.get_clock().now().to_msg() + pose.pose.position.x = x + pose.pose.position.y = y + pose.pose.position.z = 0.0 qz = math.sin(yaw / 2.0) qw = math.cos(yaw / 2.0) - poses_lines.extend([ - f" - pose:", - f" header:", - f" stamp:", - f" sec: 0", - f" nanosec: 0", - f" frame_id: map", - f" pose:", - f" position:", - f" x: {x}", - f" y: {y}", - f" z: 0.0", - f" orientation:", - f" x: 0.0", - f" y: 0.0", - f" z: {qz}", - f" w: {qw}", - ]) - poses_yaml = "poses:\n" + "\n".join(poses_lines) + pose.pose.orientation.x = 0.0 + pose.pose.orientation.y = 0.0 + pose.pose.orientation.z = qz + pose.pose.orientation.w = qw + goal_poses.append(pose) - cmd = f'bash -l -c \'{setup} && ros2 action send_goal /navigate_through_poses nav2_msgs/action/NavigateThroughPoses - --feedback\'' + navigator.goThroughPoses(goal_poses) + logger.info(f"已发送 {len(poses)} 个路径点,等待完成...") - process = subprocess.Popen( - cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True - ) + start_time = time.time() + total_timeout = len(poses) * timeout_per_pose - stdout_lines = [] - stderr_lines = [] - stop_reading = threading.Event() - - def reader(pipe, lines): - for line in iter(pipe.readline, ''): - if stop_reading.is_set(): - break - lines.append(line) - - out_thread = threading.Thread(target=reader, args=(process.stdout, stdout_lines)) - err_thread = threading.Thread(target=reader, args=(process.stderr, stderr_lines)) - out_thread.start() - err_thread.start() - - try: - process.stdin.write(poses_yaml + "\n") - process.stdin.flush() - process.stdin.close() - except Exception as e: - logger.error(f"写入 poses YAML 失败: {e}") - self.status = Nav2Status.FAILED - return - - result_received = False - while not self._cancel_event.is_set(): - time.sleep(1.0) - full_out = "".join(stdout_lines) - full_err = "".join(stderr_lines) - combined = full_out + full_err - - parsed = self._parse_result_status(combined) - if parsed: - self._result_status = parsed - result_received = True - if parsed == "succeeded": - logger.info(f"✅ Nav2 路径点导航成功完成 {len(poses)} 个点") + while not navigator.isTaskComplete(): + if self._cancel_event.is_set(): + navigator.cancelTask() + self._result_status = "cancelled" break - if process.poll() is not None and not result_received: + elapsed = time.time() - start_time + if elapsed > total_timeout: + navigator.cancelTask() self._result_status = "failed" - result_received = True break - stop_reading.set() - if process.poll() is None: - process.terminate() - process.wait(timeout=3) - out_thread.join(timeout=2) - err_thread.join(timeout=2) + time.sleep(0.5) - if self._result_status == "succeeded": + result = navigator.getResult() + if result == _TaskResult.SUCCEEDED: + logger.info(f"✅ Nav2 路径点导航成功完成 {len(poses)} 个点") + self._result_status = "succeeded" self.status = Nav2Status.SUCCEEDED - elif self._result_status == "cancelled": + elif result == _TaskResult.CANCELED: + self._result_status = "cancelled" self.status = Nav2Status.CANCELLED else: + self._result_status = "failed" self.status = Nav2Status.FAILED except Exception as e: - logger.error(f"Nav2 路径点导航异常: {e}") + logger.error(f"Nav2 路径点导航异常: {e}", exc_info=True) self.status = Nav2Status.FAILED def stop(self): """取消当前导航""" if self.status == Nav2Status.NAVIGATING: self._cancel_event.set() - setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash && export ROS_DOMAIN_ID=1" - cancel_cmd = f'bash -l -c \'{setup} && ros2 action cancel /navigate_to_pose 2>/dev/null || ros2 action cancel /navigate_through_poses 2>/dev/null || true\'' - subprocess.run(cancel_cmd, shell=True, timeout=3) self.status = Nav2Status.CANCELLED logger.info("导航已停止") def get_status(self) -> Dict: """获取导航状态""" current = self._get_current_pose() + nav2_available = self._check_nav2_available() return { "status": self.status.value, "current_position": current, - "nav2_available": self._check_nav2_available() + "nav2_available": nav2_available } def get_current_position(self) -> List[float]: