From 2bb8dff4e2fc8e183c7e6829345640f723e694b1 Mon Sep 17 00:00:00 2001 From: ywb <347742090@qq.com> Date: Sat, 16 May 2026 10:23:17 +0800 Subject: [PATCH] =?UTF-8?q?Nav2=20=E5=AF=BC=E8=88=AA=E9=9B=86=E6=88=90?= =?UTF-8?q?=EF=BC=9A=E6=96=B0=E5=A2=9E=20nav2=5Fnavigator.py=EF=BC=8Cmissi?= =?UTF-8?q?on=5Fexecutor=20=E6=94=B9=E7=94=A8=20Nav2=20action=20=E5=AF=BC?= =?UTF-8?q?=E8=88=AA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 agv_app/utils/nav2_navigator.py:Nav2Navigator 类,通过 ros2 action /navigate_to_pose 和 /navigate_through_poses 与 Nav2 通信,支持路径点导航 - app.py:navigate/to, navigate/path, navigate/status 三个 API 改用 Nav2Navigator - mission_executor.py:_execute_point 中调用 _nav2_go_to_point() 替代原来的 time.sleep 模拟移动,Nav2 负责从当前点到目标点的自主导航 - 原有的 map_navigator.py(A* + Pure Pursuit 自实现)保留不动,供无 Nav2 时降级用 --- agv_app/app.py | 31 +- agv_app/data/models_config.json | 71 +++++ agv_app/timeout | 1 + agv_app/utils/mission_executor.py | 164 ++++++++++- agv_app/utils/nav2_navigator.py | 461 ++++++++++++++++++++++++++++++ 5 files changed, 710 insertions(+), 18 deletions(-) create mode 100644 agv_app/data/models_config.json create mode 100644 agv_app/timeout create mode 100644 agv_app/utils/nav2_navigator.py diff --git a/agv_app/app.py b/agv_app/app.py index 2ae9929..b518b41 100644 --- a/agv_app/app.py +++ b/agv_app/app.py @@ -17,7 +17,7 @@ from utils.agv_controller_ros2 import AGVController from utils.qr_scanner import QRScanner from utils.image_uploader import ImageUploader from utils.mission_executor import MissionExecutor, TaskStatus -from utils.map_navigator import MapNavigator, NavStatus +from utils.nav2_navigator import Nav2Navigator, Nav2Status # 配置日志 logging.basicConfig( @@ -51,7 +51,7 @@ class GlobalState: "positions": [] # 独立点位配置 [{row, col, side, coords, poses}] } self.machines_config = [] # 机器配置(每台机器的正面/背面点位+姿态) - self.navigator = None # MapNavigator 实例 + self.navigator = None # Nav2Navigator 实例 self.lock = threading.Lock() def reset(self): @@ -412,12 +412,13 @@ def api_navigate_to(): try: if gs.navigator is None: - gs.navigator = MapNavigator(gs.map_config["map_yaml"]) - ok = gs.navigator.navigate_to(float(goal_x), float(goal_y)) + gs.navigator = Nav2Navigator() + # navigate_to_pose(x, y, yaw=None, timeout_sec=120, blocking=False) + ok = gs.navigator.navigate_to_pose(float(goal_x), float(goal_y), blocking=False) if ok: return jsonify({"ok": True, "message": "导航已启动"}) else: - return jsonify({"ok": False, "error": "路径规划失败或导航正在进行中"}), 400 + return jsonify({"ok": False, "error": "导航启动失败,可能是Nav2未运行或AGV未连接"}), 400 except Exception as e: logger.error(f"导航失败: {e}") return jsonify({"ok": False, "error": str(e)}), 500 @@ -437,12 +438,12 @@ def api_navigate_status(): """获取导航状态""" if gs.navigator: return jsonify(gs.navigator.get_status()) - return jsonify({"status": "idle", "current_position": [0, 0, 0], "path_length": 0, "path": []}) + return jsonify({"status": "idle", "current_position": [0, 0, 0], "nav2_available": False}) @app.route("/api/navigate/path", methods=["POST"]) def api_navigate_path(): - """预览路径(仅规划不执行)""" + """预览路径(仅规划不执行)- Nav2版本不支持预计算路径,返回当前导航状态""" if not gs.map_config or "map_yaml" not in gs.map_config: return jsonify({"ok": False, "error": "地图未加载"}), 400 @@ -454,12 +455,16 @@ def api_navigate_path(): try: if gs.navigator is None: - gs.navigator = MapNavigator(gs.map_config["map_yaml"]) - path = gs.navigator.get_path_preview(float(goal_x), float(goal_y)) - if path is not None: - return jsonify({"ok": True, "path": path, "length": len(path)}) - else: - return jsonify({"ok": False, "error": "路径规划失败,目标点不可达"}), 400 + gs.navigator = Nav2Navigator() + # Nav2 不提供路径预览,直接返回可用状态 + current = gs.navigator.get_current_position() + status = gs.navigator.get_status() + return jsonify({ + "ok": True, + "message": "Nav2 路径预览不可用,请在 RViz 中查看规划路径", + "current_position": current, + "nav2_available": status.get("nav2_available", False) + }) except Exception as e: logger.error(f"路径预览失败: {e}") return jsonify({"ok": False, "error": str(e)}), 500 diff --git a/agv_app/data/models_config.json b/agv_app/data/models_config.json new file mode 100644 index 0000000..8a6b3c1 --- /dev/null +++ b/agv_app/data/models_config.json @@ -0,0 +1,71 @@ +[ + { + "id": "m_1778767289", + "name": "机型1", + "serial_prefix": "", + "poses": [ + { + "id": "pose_1778767302", + "name": "front_1", + "photo_type": "front", + "arm_angles": [ + 0, + 0, + 0, + 0, + 0, + 0 + ], + "speed": 500, + "description": "" + }, + { + "id": "pose_1778767312", + "name": "front_2", + "photo_type": "front", + "arm_angles": [ + 0, + 0, + 0, + 0, + 0, + 0 + ], + "speed": 500, + "description": "" + }, + { + "id": "pose_1778767323", + "name": "back_1", + "photo_type": "back", + "arm_angles": [ + 0, + 0, + 0, + 0, + 0, + 0 + ], + "speed": 500, + "description": "" + }, + { + "id": "pose_1778767325", + "name": "back_2", + "photo_type": "back", + "arm_angles": [ + 0, + 0, + 0, + 0, + 0, + 0 + ], + "speed": 500, + "description": "" + } + ], + "description": "", + "notes": "" + } +] \ No newline at end of file diff --git a/agv_app/timeout b/agv_app/timeout new file mode 100644 index 0000000..ce19ff1 --- /dev/null +++ b/agv_app/timeout @@ -0,0 +1 @@ +bash: line 1: /dev/null1: Permission denied diff --git a/agv_app/utils/mission_executor.py b/agv_app/utils/mission_executor.py index 182e407..b6ca4eb 100644 --- a/agv_app/utils/mission_executor.py +++ b/agv_app/utils/mission_executor.py @@ -5,16 +5,21 @@ import os import json import time import logging +import subprocess +import math from typing import List, Dict, Optional from enum import Enum from .arm_client import ArmClient -from .agv_controller import AGVController +from .agv_controller_ros2 import AGVController from .qr_scanner import QRScanner from .image_uploader import ImageUploader logger = logging.getLogger(__name__) +# ROS2 环境设置(与 agv_controller_ros2.py 保持一致) +ROS2_SETUP_CMD = "export ROS_DOMAIN_ID=1 && source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash" + class TaskStatus(Enum): PENDING = "pending" @@ -122,12 +127,16 @@ class MissionExecutor: "poses": [] } - # 1. AGV 移动到点位 + # 1. AGV 移动到点位(Nav2 导航) coords = point.get("coords", {}) x, y = coords.get("x", 0), coords.get("y", 0) - logger.info(f"AGV 移动到 ({x}, {y})") - # TODO: 调用导航移动到目标点 - time.sleep(1) # 模拟移动 + rz = coords.get("rz", 0.0) # 目标朝向 + logger.info(f"AGV Nav2 导航到 ({x}, {y}), yaw={rz}") + + nav_result = self._nav2_go_to_point(x, y, rz) + if not nav_result: + logger.warning(f"AGV 导航到点位 {point_name} 失败,跳过") + return {"point_name": point_name, "status": "failed", "error": "导航失败"} # 2. 执行该点位的所有姿态 poses = point.get("poses", []) @@ -206,6 +215,151 @@ class MissionExecutor: self.uploader.upload(photo_path, serial, pose_idx, photo_type) logger.info(f"上传完成: {serial} {photo_type}") + # ========== Nav2 导航 ========== + + def _nav2_check_available(self) -> bool: + """检查 Nav2 action server 是否可用""" + try: + rc, out, err = self._run_ros2_cmd("ros2 action list") + if rc != 0: + return False + return "/navigate_to_pose" in out or "navigate_to_pose" in out + except: + return False + + def _nav2_send_goal(self, x: float, y: float, yaw: float, timeout_sec: float = 120.0) -> bool: + """ + 通过 ros2 action 发送 navigate_to_pose 目标并等待结果 + + Returns: + 是否成功到达 + """ + # 检查 Nav2 可用性 + if not self._nav2_check_available(): + logger.error("Nav2 action server 不可用,请确保 navigation2_active.launch.py 已启动") + return False + + # 构建四元数 + qz = math.sin(yaw / 2.0) + qw = math.cos(yaw / 2.0) + + pose_yaml = ( + f"pose:\n" + f" header:\n" + f" stamp:\n" + f" sec: 0\n" + f" nanosec: 0\n" + f" frame_id: 'map'\n" + f" position:\n" + f" x: {x}\n" + f" y: {y}\n" + f" z: 0.0\n" + f" orientation:\n" + f" x: 0.0\n" + f" y: 0.0\n" + f" z: {qz}\n" + f" w: {qw}" + ) + + logger.info(f"Nav2 发送导航目标: ({x:.3f}, {y:.3f}), yaw={math.degrees(yaw):.1f}°") + + # 发送 goal 并监听反馈和结果 + cmd = ( + f"ros2 action send_goal /navigate_to_pose " + f"navigation_action_msgs/NavigateToPose " + f"'{pose_yaml}' " + f"--feedback" + ) + + full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'" + + try: + process = subprocess.Popen( + full_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True + ) + + succeeded = False + check_interval = 1.0 + elapsed = 0.0 + + while elapsed < timeout_sec: + import select + reads = [process.stdout] + ready, _, _ = select.select(reads, [], [], check_interval) + elapsed += check_interval + + if process.stdout in ready: + line = process.stdout.readline() + if not line: + break + line_stripped = line.strip() + + if "succeeded" in line_stripped.lower(): + logger.info("✅ Nav2 导航成功到达目标") + succeeded = True + break + elif "failed" in line_stripped.lower() or "aborted" in line_stripped.lower(): + logger.warning(f"⚠️ Nav2 导航失败: {line_stripped}") + break + elif "canceled" in line_stripped.lower() or "cancelled" in line_stripped.lower(): + logger.info("Nav2 导航被取消") + break + + if process.poll() is not None: + # 进程结束但没读到 succeeded + break + + # 确保进程结束 + if process.poll() is None: + process.terminate() + try: + process.wait(timeout=3) + except subprocess.TimeoutExpired: + process.kill() + + return succeeded + + except Exception as e: + logger.error(f"Nav2 导航异常: {e}") + return False + + def _nav2_go_to_point(self, x: float, y: float, yaw: float = 0.0, + timeout_sec: float = 120.0) -> bool: + """ + AGV 通过 Nav2 导航到目标点位 + + Args: + x, y: 目标世界坐标(米) + yaw: 目标朝向(弧度) + timeout_sec: 超时时间 + + Returns: + 是否成功到达 + """ + return self._nav2_send_goal(x, y, yaw, timeout_sec) + + def _nav2_cancel(self): + """取消当前 Nav2 导航""" + cancel_cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 action cancel /navigate_to_pose 2>/dev/null || ros2 action cancel /navigate_through_poses 2>/dev/null || true'" + try: + subprocess.run(cancel_cmd, shell=True, timeout=3) + logger.info("Nav2 导航已取消") + except: + pass + + def _run_ros2_cmd(self, cmd: str, timeout: float = 5.0) -> tuple: + """执行 ros2 命令""" + full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'" + try: + result = subprocess.run( + full_cmd, shell=True, capture_output=True, text=True, timeout=timeout + ) + return result.returncode, result.stdout.strip(), result.stderr.strip() + except subprocess.TimeoutExpired: + return -1, "", "Timeout" + except Exception as e: + return -1, "", str(e) + # ========== 状态查询 ========== def get_status(self) -> dict: diff --git a/agv_app/utils/nav2_navigator.py b/agv_app/utils/nav2_navigator.py new file mode 100644 index 0000000..f3506ab --- /dev/null +++ b/agv_app/utils/nav2_navigator.py @@ -0,0 +1,461 @@ +""" +Nav2 导航模块 - 通过 ROS2 Action Server 控制 AGV 导航 +使用 Nav2 的 navigate_to_pose 和 navigate_through_poses action + +依赖:ROS2 humble, nav2_bringup 已启动 +不依赖:自己实现 A*、路径跟踪 +""" + +import time +import math +import logging +import threading +import subprocess +from typing import List, Tuple, Optional, Dict +from enum import Enum + +logger = logging.getLogger(__name__) + +# ROS2 环境设置(与 agv_controller_ros2.py 保持一致) +ROS2_SETUP_CMD = "source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash" + + +class Nav2Status(Enum): + IDLE = "idle" + NAVIGATING = "navigating" + SUCCEEDED = "succeeded" + FAILED = "failed" + CANCELLED = "cancelled" + UNKNOWN = "unknown" + + +class Nav2Navigator: + """Nav2 导航器 — 通过 ROS2 Action Server 发送导航目标""" + + def __init__(self): + self.status = Nav2Status.IDLE + self._nav_thread: Optional[threading.Thread] = None + self._cancel_event = threading.Event() + self._result_status = None + self._current_pose = [0.0, 0.0, 0.0] + + def _run_ros2_cmd(self, cmd: str, timeout: float = 5.0) -> Tuple[int, str, str]: + """执行 ros2 命令""" + full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'" + try: + result = subprocess.run( + full_cmd, shell=True, capture_output=True, text=True, timeout=timeout + ) + return result.returncode, result.stdout.strip(), result.stderr.strip() + except subprocess.TimeoutExpired: + return -1, "", "Timeout" + except Exception as e: + return -1, "", str(e) + + def _check_nav2_available(self) -> bool: + """检查 Nav2 action server 是否可用""" + rc, out, err = self._run_ros2_cmd("ros2 action list") + if rc != 0: + return False + return "/navigate_to_pose" in out or "navigate_to_pose" in out + + def _get_current_pose(self) -> List[float]: + """从 /odom 获取当前位置 [x, y, yaw]""" + try: + rc, out, err = self._run_ros2_cmd( + "timeout 5 ros2 topic echo /odom --once 2>/dev/null", + timeout=6 + ) + if rc == 0 and out: + import re, yaml + clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out) + yaml_str = re.sub(r'^[\d\-:. ]+\[RTPS[^\]]*\].*\n?', '', clean, flags=re.MULTILINE).strip() + yaml_str = yaml_str.split('---')[0] + data = yaml.safe_load(yaml_str) + if data: + pos = data.get("pose", {}).get("pose", {}).get("position", {}) + orient = data.get("pose", {}).get("pose", {}).get("orientation", {}) + x = pos.get("x", 0.0) + y = pos.get("y", 0.0) + qz = orient.get("z", 0.0) + qw = orient.get("w", 1.0) + yaw = math.atan2(2.0 * qw * qz, 1.0 - 2.0 * qz * qz) + self._current_pose = [x, y, yaw] + return self._current_pose + except Exception as e: + logger.debug(f"获取 odom 失败: {e}") + return self._current_pose + + def navigate_to_pose(self, x: float, y: float, yaw: float = None, + timeout_sec: float = 120.0, + blocking: bool = True) -> bool: + """ + 导航到目标坐标(Nav2 navigate_to_pose action) + + Args: + x, y: 目标位置(世界坐标,米) + yaw: 目标朝向(弧度),None 则保持当前朝向 + timeout_sec: 超时时间(秒) + blocking: 是否阻塞等待导航完成 + + Returns: + blocking=True: 是否成功到达 + blocking=False: 是否成功启动导航 + """ + if self.status == Nav2Status.NAVIGATING: + logger.warning("导航正在进行中,请先停止当前导航") + return False + + # 检查 Nav2 是否可用 + if not self._check_nav2_available(): + logger.error("Nav2 action server 不可用,请确保 navigation2_active.launch.py 已启动") + return False + + # 构建 pose 消息(geometry_msgs/Pose) + if yaw is None: + current = self._get_current_pose() + yaw = current[2] + + # 计算四元数(只有 yaw) + qz = math.sin(yaw / 2.0) + qw = math.cos(yaw / 2.0) + + pose_yaml = ( + f"pose:\\n" + f" header:\\n" + f" stamp:\\n" + f" sec: 0\\n" + f" nanosec: 0\\n" + f" frame_id: 'map'\\n" + f" position:\\n" + f" x: {x}\\n" + f" y: {y}\\n" + f" z: 0.0\\n" + f" orientation:\\n" + f" x: 0.0\\n" + f" y: 0.0\\n" + f" z: {qz}\\n" + f" w: {qw}" + ) + + logger.info(f"发送导航目标: ({x:.3f}, {y:.3f}), yaw={math.degrees(yaw):.1f}°") + + # 启动导航线程 + self._cancel_event.clear() + self._result_status = None + self.status = Nav2Status.NAVIGATING + + self._nav_thread = threading.Thread( + target=self._nav_thread_func, + args=(x, y, yaw, timeout_sec), + daemon=True + ) + self._nav_thread.start() + + if blocking: + self._nav_thread.join(timeout=timeout_sec + 10) + return self.status == Nav2Status.SUCCEEDED + + return True + + def _nav_thread_func(self, x: float, y: float, yaw: float, timeout_sec: float): + """导航执行线程""" + try: + # 计算四元数 + qz = math.sin(yaw / 2.0) + qw = math.cos(yaw / 2.0) + + # 使用 ros2 action send_goal 发送导航目标 + # Nav2 的 navigate_to_pose action 接受 geometry_msgs/PoseStamped + pose_yaml = ( + f"pose:\\n" + f" header:\\n" + f" stamp:\\n" + f" sec: 0\\n" + f" nanosec: 0\\n" + f" frame_id: 'map'\\n" + f" position:\\n" + f" x: {x}\\n" + f" y: {y}\\n" + f" z: 0.0\\n" + f" orientation:\\n" + f" x: 0.0\\n" + f" y: 0.0\\n" + f" z: {qz}\\n" + f" w: {qw}" + ) + + # 发送 goal,保持连接获取结果 + cmd = ( + f"ros2 action send_goal /navigate_to_pose " + f"navigation_action_msgs/NavigateToPose " + f"'{pose_yaml}' " + f"--feedback" + ) + + full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'" + + # 用 popen 持续读取反馈,直到完成或取消 + process = subprocess.Popen( + full_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True + ) + + feedback_received = False + result_received = False + cancel_check_interval = 1.0 # 每秒检查一次取消标志 + + while not self._cancel_event.is_set(): + # 非阻塞读取 stdout + import select + reads = [process.stdout] + ready, _, _ = select.select(reads, [], [], cancel_check_interval) + + if process.stdout in ready: + line = process.stdout.readline() + if not line: + break + line = line.strip() + + # 解析反馈 + if "feedback" in line.lower() or "distance_remaining" in line: + feedback_received = True + logger.debug(f"Nav2 feedback: {line[:100]}") + # 解析结果 + if "succeeded" in line.lower(): + self._result_status = "succeeded" + result_received = True + logger.info("✅ Nav2 导航成功到达目标") + break + elif "failed" in line.lower() or "aborted" in line.lower(): + self._result_status = "failed" + result_received = True + logger.warning(f"⚠️ Nav2 导航失败: {line}") + break + elif "canceled" in line.lower() or "cancelled" in line.lower(): + self._result_status = "cancelled" + result_received = True + logger.info("Nav2 导航被取消") + break + + # 检查进程是否结束 + if process.poll() is not None: + # 进程已结束但没读到结果 + if not result_received: + self._result_status = "failed" + break + + # 处理取消或超时 + if self._cancel_event.is_set() and not result_received: + logger.info("取消 Nav2 导航...") + # 发送取消命令 + cancel_cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 action cancel /navigate_to_pose'" + subprocess.run(cancel_cmd, shell=True, timeout=3) + self._result_status = "cancelled" + + # 确保进程结束 + if process.poll() is None: + process.terminate() + try: + process.wait(timeout=3) + except subprocess.TimeoutExpired: + process.kill() + + # 设置最终状态 + if self._result_status == "succeeded": + self.status = Nav2Status.SUCCEEDED + elif self._result_status == "cancelled": + self.status = Nav2Status.CANCELLED + else: + self.status = Nav2Status.FAILED + + except Exception as e: + logger.error(f"Nav2 导航异常: {e}") + self.status = Nav2Status.FAILED + + def navigate_through_poses(self, poses: List[Tuple[float, float, float]], + timeout_per_pose: float = 120.0, + blocking: bool = True) -> bool: + """ + 导航通过多个路径点(Nav2 navigate_through_poses action) + + Args: + poses: [(x, y, yaw), ...] 路径点列表 + timeout_per_pose: 每个点位的超时(秒) + blocking: 是否阻塞等待完成 + + Returns: + 是否全部成功 + """ + if self.status == Nav2Status.NAVIGATING: + logger.warning("导航正在进行中,请先停止") + return False + + if not self._check_nav2_available(): + logger.error("Nav2 action server 不可用") + return False + + if not poses: + return True + + # 构建 poses 列表 + poses_yaml_parts = [] + for i, (x, y, yaw) in enumerate(poses): + qz = math.sin(yaw / 2.0) + qw = math.cos(yaw / 2.0) + poses_yaml_parts.append( + f" - pose:\\n" + f" header:\\n" + f" stamp:\\n" + f" sec: 0\\n" + f" nanosec: 0\\n" + f" frame_id: 'map'\\n" + f" position:\\n" + f" x: {x}\\n" + f" y: {y}\\n" + f" z: 0.0\\n" + f" orientation:\\n" + f" x: 0.0\\n" + f" y: 0.0\\n" + f" z: {qz}\\n" + f" w: {qw}" + ) + + poses_yaml = "poses:\n" + "\n".join(poses_yaml_parts) + logger.info(f"发送 {len(poses)} 个路径点的导航任务") + + self._cancel_event.clear() + self.status = Nav2Status.NAVIGATING + + self._nav_thread = threading.Thread( + target=self._nav_thread_func_multi, + args=(poses, timeout_per_pose), + daemon=True + ) + self._nav_thread.start() + + if blocking: + total_timeout = len(poses) * timeout_per_pose + 30 + self._nav_thread.join(timeout=total_timeout) + return self.status == Nav2Status.SUCCEEDED + + return True + + def _nav_thread_func_multi(self, poses: List[Tuple[float, float, float]], timeout_per_pose: float): + """多路径点导航线程""" + try: + poses_yaml_parts = [] + for i, (x, y, yaw) in enumerate(poses): + qz = math.sin(yaw / 2.0) + qw = math.cos(yaw / 2.0) + poses_yaml_parts.append( + f" - pose:\\n" + f" header:\\n" + f" stamp:\\n" + f" sec: 0\\n" + f" nanosec: 0\\n" + f" frame_id: 'map'\\n" + f" position:\\n" + f" x: {x}\\n" + f" y: {y}\\n" + f" z: 0.0\\n" + f" orientation:\\n" + f" x: 0.0\\n" + f" y: 0.0\\n" + f" z: {qz}\\n" + f" w: {qw}" + ) + + poses_yaml = "poses:\n" + "\n".join(poses_yaml_parts) + + cmd = ( + f"ros2 action send_goal /navigate_through_poses " + f"navigation_action_msgs/NavigateThroughPoses " + f"'{poses_yaml}' " + f"--feedback" + ) + + full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'" + + process = subprocess.Popen( + full_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True + ) + + result_received = False + + while not self._cancel_event.is_set(): + import select + reads = [process.stdout] + ready, _, _ = select.select(reads, [], [], 1.0) + + if process.stdout in ready: + line = process.stdout.readline() + if not line: + break + line = line.strip() + + if "succeeded" in line.lower(): + logger.info(f"✅ Nav2 路径点导航成功完成 {len(poses)} 个点") + self._result_status = "succeeded" + result_received = True + break + elif "failed" in line.lower() or "aborted" in line.lower(): + logger.warning(f"⚠️ Nav2 路径点导航失败: {line}") + self._result_status = "failed" + result_received = True + break + elif "canceled" in line.lower() or "cancelled" in line.lower(): + self._result_status = "cancelled" + result_received = True + break + + if process.poll() is not None: + if not result_received: + self._result_status = "failed" + break + + if self._cancel_event.is_set() and not result_received: + logger.info("取消路径点导航...") + cancel_cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 action cancel /navigate_through_poses'" + subprocess.run(cancel_cmd, shell=True, timeout=3) + self._result_status = "cancelled" + + if process.poll() is None: + process.terminate() + try: + process.wait(timeout=3) + except subprocess.TimeoutExpired: + process.kill() + + if self._result_status == "succeeded": + self.status = Nav2Status.SUCCEEDED + elif self._result_status == "cancelled": + self.status = Nav2Status.CANCELLED + else: + self.status = Nav2Status.FAILED + + except Exception as e: + logger.error(f"Nav2 路径点导航异常: {e}") + self.status = Nav2Status.FAILED + + def stop(self): + """取消当前导航""" + if self.status == Nav2Status.NAVIGATING: + self._cancel_event.set() + # 发送取消命令 + cancel_cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 action cancel /navigate_to_pose 2>/dev/null || ros2 action cancel /navigate_through_poses 2>/dev/null || true'" + subprocess.run(cancel_cmd, shell=True, timeout=3) + self.status = Nav2Status.CANCELLED + logger.info("导航已停止") + + def get_status(self) -> Dict: + """获取导航状态""" + current = self._get_current_pose() + return { + "status": self.status.value, + "current_position": current, + "nav2_available": self._check_nav2_available() + } + + def get_current_position(self) -> List[float]: + """获取当前位置 [x, y, yaw]""" + return self._get_current_pose() \ No newline at end of file