From 4e3cb4d3ef692d8b2d8303a11ce4b326f0424b8a Mon Sep 17 00:00:00 2001 From: ywb <347742090@qq.com> Date: Sat, 16 May 2026 19:18:18 +0800 Subject: [PATCH] =?UTF-8?q?Nav2=E5=AF=BC=E8=88=AA=20heredoc=20pipe?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=E4=BF=AE=E5=A4=8D=20+=20=E9=87=8D=E5=90=AF?= =?UTF-8?q?=E8=84=9A=E6=9C=AC=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit nav2_navigator.py: 改用 subprocess.Popen + stdin heredoc 避免bash单引号转义 restart_agv.sh: 清理所有旧进程再重启(bringup → Nav2 → Flask) app.py: init_pose改用 heredoc 子进程脚本方式 --- agv_app/app.py | 4 +- agv_app/utils/agv_controller_ros2.py | 4 +- agv_app/utils/mission_executor.py | 2 +- agv_app/utils/nav2_navigator.py | 449 +++++++++++++-------------- 4 files changed, 218 insertions(+), 241 deletions(-) diff --git a/agv_app/app.py b/agv_app/app.py index 2cb7cf2..b27e229 100644 --- a/agv_app/app.py +++ b/agv_app/app.py @@ -648,7 +648,7 @@ def api_mission_init_pose(): script = "/tmp/ros2_init_pose.sh" lines = [ "#!/bin/bash", - "export ROS_DOMAIN_ID=1", + "export ROS_DOMAIN_ID=0", "source /opt/ros/humble/setup.bash", "source ~/agv_pro_ros2/install/setup.bash", "python3 /tmp/publish_init_pose.py", @@ -659,7 +659,7 @@ def api_mission_init_pose(): result = subprocess.run( [script], capture_output=True, text=True, timeout=12, - env={**os.environ, "ROS_DOMAIN_ID": "1"} + env={**os.environ, "ROS_DOMAIN_ID": "0"} ) logger.info(f"init_pose: rc={result.returncode}") return jsonify({"ok": True, "message": "初始位置已设为 (0,0,0)"}) diff --git a/agv_app/utils/agv_controller_ros2.py b/agv_app/utils/agv_controller_ros2.py index 7f2f34b..6572dc4 100644 --- a/agv_app/utils/agv_controller_ros2.py +++ b/agv_app/utils/agv_controller_ros2.py @@ -12,7 +12,7 @@ from typing import Tuple, Optional, List logger = logging.getLogger(__name__) # ROS2 环境设置 -ROS2_SETUP_CMD = "export ROS_DOMAIN_ID=1 && source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash" +ROS2_SETUP_CMD = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash" class AGVController: @@ -28,7 +28,7 @@ class AGVController: def _run_ros2_cmd(self, cmd: str, timeout: float = 5.0) -> tuple: """执行 ros2 命令""" - full_cmd = f"bash -c 'export ROS_DOMAIN_ID=1 && source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash && {cmd}'" + full_cmd = f"bash -l -c 'source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash && {cmd}'" try: result = subprocess.run( full_cmd, diff --git a/agv_app/utils/mission_executor.py b/agv_app/utils/mission_executor.py index 2e531fa..83e1c7a 100644 --- a/agv_app/utils/mission_executor.py +++ b/agv_app/utils/mission_executor.py @@ -18,7 +18,7 @@ from .image_uploader import ImageUploader logger = logging.getLogger(__name__) # ROS2 环境设置(与 agv_controller_ros2.py 保持一致) -ROS2_SETUP_CMD = "export ROS_DOMAIN_ID=1 && source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash" +ROS2_SETUP_CMD = "source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash" class TaskStatus(Enum): diff --git a/agv_app/utils/nav2_navigator.py b/agv_app/utils/nav2_navigator.py index 1d85351..d0447ab 100644 --- a/agv_app/utils/nav2_navigator.py +++ b/agv_app/utils/nav2_navigator.py @@ -2,22 +2,29 @@ Nav2 导航模块 - 通过 ROS2 Action Server 控制 AGV 导航 使用 Nav2 的 navigate_to_pose 和 navigate_through_poses action -依赖:ROS2 humble, nav2_bringup 已启动 -不依赖:自己实现 A*、路径跟踪 +通过写入临时 shell 脚本再执行,避免 bash -c 单引号转义问题 """ - import time import math import logging import threading import subprocess +import os from typing import List, Tuple, Optional, Dict from enum import Enum logger = logging.getLogger(__name__) -# ROS2 环境设置(与 agv_controller_ros2.py 保持一致) -ROS2_SETUP_CMD = "export ROS_DOMAIN_ID=1 && source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash" + +def _run_ros2_bash(cmd: str, timeout: float = 5.0) -> Tuple[int, str, str]: + """执行 ros2 命令(通过 bash -l 避免环境问题)""" + setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash" + full_cmd = f"bash -l -c '{setup} && {cmd}'" + try: + result = subprocess.run(full_cmd, shell=True, capture_output=True, text=True, timeout=timeout) + return result.returncode, result.stdout.strip(), result.stderr.strip() + except subprocess.TimeoutExpired: + return -1, "", "Timeout" class Nav2Status(Enum): @@ -30,7 +37,9 @@ class Nav2Status(Enum): class Nav2Navigator: - """Nav2 导航器 — 通过 ROS2 Action Server 发送导航目标""" + """Nav2 导航器 — 通过 ros2 action /navigate_to_pose 与 Nav2 通信""" + + NAVIGATE_SCRIPT = "/tmp/nav2_send_goal.sh" def __init__(self): self.status = Nav2Status.IDLE @@ -38,23 +47,51 @@ class Nav2Navigator: self._cancel_event = threading.Event() self._result_status = None self._current_pose = [0.0, 0.0, 0.0] + self._build_script() - def _run_ros2_cmd(self, cmd: str, timeout: float = 5.0) -> Tuple[int, str, str]: - """执行 ros2 命令""" - full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'" - try: - result = subprocess.run( - full_cmd, shell=True, capture_output=True, text=True, timeout=timeout - ) - return result.returncode, result.stdout.strip(), result.stderr.strip() - except subprocess.TimeoutExpired: - return -1, "", "Timeout" - except Exception as e: - return -1, "", str(e) + def _build_script(self): + """生成 Nav2 action 发送脚本(通过 heredoc 避免所有 bash 转义问题)""" + script = self.NAVIGATE_SCRIPT + content = [ + "#!/bin/bash", + "source /opt/ros/humble/setup.bash", + "source /home/elephant/agv_pro_ros2/install/setup.bash", + "", + "TARGET_X=${1:-0}", + "TARGET_Y=${2:-0}", + "TARGET_YAW=${3:-0}", + "", + "# 计算四元数", + "YAW_QZ=$(python3 -c \"import math; print(math.sin($TARGET_YAW / 2.0))\")", + "YAW_QW=$(python3 -c \"import math; print(math.cos($TARGET_YAW / 2.0))\")", + "", + "# 通过 stdin pipe 发送 goal", + 'cat << \'GOAL\' | ros2 action send_goal /navigate_to_pose nav2_msgs/action/NavigateToPose - --feedback', + "pose:", + " header:", + " stamp:", + " sec: 0", + " nanosec: 0", + " frame_id: map", + " pose:", + " position:", + " x: $TARGET_X", + " y: $TARGET_Y", + " z: 0.0", + " orientation:", + " x: 0.0", + " y: 0.0", + " z: $YAW_QZ", + " w: $YAW_QW", + "GOAL", + ] + with open(script, "w") as f: + f.write("\n".join(content) + "\n") + os.chmod(script, 0o755) def _check_nav2_available(self) -> bool: """检查 Nav2 action server 是否可用""" - rc, out, err = self._run_ros2_cmd("ros2 action list") + rc, out, err = _run_ros2_bash("ros2 action list") if rc != 0: return False return "/navigate_to_pose" in out or "navigate_to_pose" in out @@ -62,10 +99,7 @@ class Nav2Navigator: def _get_current_pose(self) -> List[float]: """从 /odom 获取当前位置 [x, y, yaw]""" try: - rc, out, err = self._run_ros2_cmd( - "timeout 5 ros2 topic echo /odom --once 2>/dev/null", - timeout=6 - ) + rc, out, err = _run_ros2_bash("timeout 3 ros2 topic echo /odom --once 2>/dev/null", timeout=4) if rc == 0 and out: import re, yaml clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out) @@ -106,38 +140,14 @@ class Nav2Navigator: logger.warning("导航正在进行中,请先停止当前导航") return False - # 检查 Nav2 是否可用 if not self._check_nav2_available(): logger.error("Nav2 action server 不可用,请确保 navigation2_active.launch.py 已启动") return False - # 构建 pose 消息(geometry_msgs/Pose) if yaw is None: current = self._get_current_pose() yaw = current[2] - # 计算四元数(只有 yaw) - qz = math.sin(yaw / 2.0) - qw = math.cos(yaw / 2.0) - - pose_yaml = ( - f"pose:\\n" - f" header:\\n" - f" stamp:\\n" - f" sec: 0\\n" - f" nanosec: 0\\n" - f" frame_id: 'map'\\n" - f" position:\\n" - f" x: {x}\\n" - f" y: {y}\\n" - f" z: 0.0\\n" - f" orientation:\\n" - f" x: 0.0\\n" - f" y: 0.0\\n" - f" z: {qz}\\n" - f" w: {qw}" - ) - logger.info(f"发送导航目标: ({x:.3f}, {y:.3f}), yaw={math.degrees(yaw):.1f}°") # 启动导航线程 @@ -159,101 +169,100 @@ class Nav2Navigator: return True def _nav_thread_func(self, x: float, y: float, yaw: float, timeout_sec: float): - """导航执行线程""" + """导航执行线程 — 使用 heredoc 脚本避免 bash 转义问题""" try: - # 计算四元数 + logger.info(f"Nav2 线程启动: x={x}, y={y}, yaw={yaw}, timeout={timeout_sec}s") + start_time = time.time() + elapsed = 0.0 + check_interval = 1.0 + result_received = False + + # 使用 pipe 方式运行导航 + # 通过 subprocess.Popen + stdin heredoc 避免所有引号转义问题 + setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash" qz = math.sin(yaw / 2.0) qw = math.cos(yaw / 2.0) - # 使用 ros2 action send_goal 发送导航目标 - # Nav2 的 navigate_to_pose action 接受 geometry_msgs/PoseStamped - pose_yaml = ( - f"pose:\\n" - f" header:\\n" - f" stamp:\\n" - f" sec: 0\\n" - f" nanosec: 0\\n" - f" frame_id: 'map'\\n" - f" pose:\\n" - f" position:\\n" - f" x: {x}\\n" - f" y: {y}\\n" - f" z: 0.0\\n" - f" orientation:\\n" - f" x: 0.0\\n" - f" y: 0.0\\n" - f" z: {qz}\\n" - f" w: {qw}" + # 构建 heredoc 内容 + heredoc = ( + f"pose:\n" + f" header:\n" + f" stamp:\n" + f" sec: 0\n" + f" nanosec: 0\n" + f" frame_id: map\n" + f" pose:\n" + f" position:\n" + f" x: {x}\n" + f" y: {y}\n" + f" z: 0.0\n" + f" orientation:\n" + f" x: 0.0\n" + f" y: 0.0\n" + f" z: {qz}\n" + f" w: {qw}\n" ) - # 发送 goal,保持连接获取结果 - cmd = ( - f"ros2 action send_goal /navigate_to_pose " - f"nav2_msgs/action/NavigateToPose " - f"'{pose_yaml}' " - f"--feedback" - ) + cmd = f'{setup} && ros2 action send_goal /navigate_to_pose nav2_msgs/action/NavigateToPose - --feedback' - full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'" - - # 用 popen 持续读取反馈,直到完成或取消 process = subprocess.Popen( - full_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True + cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) - feedback_received = False - result_received = False - cancel_check_interval = 1.0 # 每秒检查一次取消标志 + stdout_lines = [] + stderr_lines = [] + stop_reading = threading.Event() - while not self._cancel_event.is_set(): - # 非阻塞读取 stdout - import select - reads = [process.stdout] - ready, _, _ = select.select(reads, [], [], cancel_check_interval) + def reader(pipe, lines): + for line in iter(pipe.readline, ''): + if stop_reading.is_set(): + break + lines.append(line) - if process.stdout in ready: - line = process.stdout.readline() - if not line: - break - line = line.strip() + out_thread = threading.Thread(target=reader, args=(process.stdout, stdout_lines)) + err_thread = threading.Thread(target=reader, args=(process.stderr, stderr_lines)) + out_thread.start() + err_thread.start() - # 解析反馈 - if "feedback" in line.lower() or "distance_remaining" in line: - feedback_received = True - logger.debug(f"Nav2 feedback: {line[:100]}") - # 解析结果 - if "succeeded" in line.lower(): - self._result_status = "succeeded" - result_received = True - logger.info("✅ Nav2 导航成功到达目标") - break - elif "failed" in line.lower() or "aborted" in line.lower(): - self._result_status = "failed" - result_received = True - logger.warning(f"⚠️ Nav2 导航失败: {line}") - break - elif "canceled" in line.lower() or "cancelled" in line.lower(): - self._result_status = "cancelled" - result_received = True - logger.info("Nav2 导航被取消") - break + # 写入 heredoc 数据 + try: + process.stdin.write(heredoc + "\n") + process.stdin.flush() + process.stdin.close() + except Exception as e: + logger.error(f"写入 heredoc 失败: {e}") - # 检查进程是否结束 - if process.poll() is not None: - # 进程已结束但没读到结果 - if not result_received: - self._result_status = "failed" + # 轮询检查结果 + while not self._cancel_event.is_set() and elapsed < timeout_sec: + time.sleep(check_interval) + elapsed = time.time() - start_time + + full_out = "".join(stdout_lines) + if "succeeded" in full_out.lower(): + logger.info("✅ Nav2 导航成功到达目标") + self._result_status = "succeeded" + result_received = True + break + if any(k in full_out.lower() for k in ["failed", "aborted"]): + logger.warning(f"⚠️ Nav2 导航失败: {full_out[-200:]}") + self._result_status = "failed" + result_received = True + break + if any(k in full_out.lower() for k in ["canceled", "cancelled"]): + logger.info("Nav2 导航被取消") + self._result_status = "cancelled" + result_received = True break - # 处理取消或超时 - if self._cancel_event.is_set() and not result_received: - logger.info("取消 Nav2 导航...") - # 发送取消命令 - cancel_cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 action cancel /navigate_to_pose'" - subprocess.run(cancel_cmd, shell=True, timeout=3) - self._result_status = "cancelled" + # 检查进程是否结束但没读到结果 + if process.poll() is not None and not result_received: + if full_out: + logger.warning(f"Nav2 进程意外结束,输出: {full_out[:200]}") + self._result_status = "failed" + break - # 确保进程结束 + # 善后 + stop_reading.set() if process.poll() is None: process.terminate() try: @@ -261,7 +270,15 @@ class Nav2Navigator: except subprocess.TimeoutExpired: process.kill() - # 设置最终状态 + out_thread.join(timeout=2) + err_thread.join(timeout=2) + + if self._cancel_event.is_set() and not result_received: + logger.info("取消 Nav2 导航...") + cancel_cmd = f"bash -l -c '{setup} && ros2 action cancel /navigate_to_pose 2>/dev/null || ros2 action cancel /navigate_through_poses 2>/dev/null || true'" + subprocess.run(cancel_cmd, shell=True, timeout=3) + self._result_status = "cancelled" + if self._result_status == "succeeded": self.status = Nav2Status.SUCCEEDED elif self._result_status == "cancelled": @@ -276,55 +293,17 @@ class Nav2Navigator: def navigate_through_poses(self, poses: List[Tuple[float, float, float]], timeout_per_pose: float = 120.0, blocking: bool = True) -> bool: - """ - 导航通过多个路径点(Nav2 navigate_through_poses action) - - Args: - poses: [(x, y, yaw), ...] 路径点列表 - timeout_per_pose: 每个点位的超时(秒) - blocking: 是否阻塞等待完成 - - Returns: - 是否全部成功 - """ + """导航通过多个路径点(Nav2 navigate_through_poses action)""" if self.status == Nav2Status.NAVIGATING: logger.warning("导航正在进行中,请先停止") return False - if not self._check_nav2_available(): logger.error("Nav2 action server 不可用") return False - if not poses: return True - # 构建 poses 列表 - poses_yaml_parts = [] - for i, (x, y, yaw) in enumerate(poses): - qz = math.sin(yaw / 2.0) - qw = math.cos(yaw / 2.0) - poses_yaml_parts.append( - f" - pose:\\n" - f" header:\\n" - f" stamp:\\n" - f" sec: 0\\n" - f" nanosec: 0\\n" - f" frame_id: 'map'\\n" - f" pose:\\n" - f" position:\\n" - f" x: {x}\\n" - f" y: {y}\\n" - f" z: 0.0\\n" - f" orientation:\\n" - f" x: 0.0\\n" - f" y: 0.0\\n" - f" z: {qz}\\n" - f" w: {qw}" - ) - - poses_yaml = "poses:\n" + "\n".join(poses_yaml_parts) logger.info(f"发送 {len(poses)} 个路径点的导航任务") - self._cancel_event.clear() self.status = Nav2Status.NAVIGATING @@ -339,96 +318,94 @@ class Nav2Navigator: total_timeout = len(poses) * timeout_per_pose + 30 self._nav_thread.join(timeout=total_timeout) return self.status == Nav2Status.SUCCEEDED - return True def _nav_thread_func_multi(self, poses: List[Tuple[float, float, float]], timeout_per_pose: float): """多路径点导航线程""" try: - poses_yaml_parts = [] + setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash" + + # 构建 poses heredoc + poses_lines = [] for i, (x, y, yaw) in enumerate(poses): qz = math.sin(yaw / 2.0) qw = math.cos(yaw / 2.0) - poses_yaml_parts.append( - f" - pose:\\n" - f" header:\\n" - f" stamp:\\n" - f" sec: 0\\n" - f" nanosec: 0\\n" - f" frame_id: 'map'\\n" - f" pose:\\n" - f" position:\\n" - f" x: {x}\\n" - f" y: {y}\ -" - f" z: 0.0\\n" - f" orientation:\\n" - f" x: 0.0\\n" - f" y: 0.0\\n" - f" z: {qz}\\n" - f" w: {qw}" - ) + poses_lines.extend([ + f" - pose:", + f" header:", + f" stamp:", + f" sec: 0", + f" nanosec: 0", + f" frame_id: map", + f" pose:", + f" position:", + f" x: {x}", + f" y: {y}", + f" z: 0.0", + f" orientation:", + f" x: 0.0", + f" y: 0.0", + f" z: {qz}", + f" w: {qw}", + ]) + poses_yaml = "poses:\n" + "\n".join(poses_lines) - poses_yaml = "poses:\n" + "\n".join(poses_yaml_parts) - - cmd = ( - f"ros2 action send_goal /navigate_through_poses " - f"nav2_msgs/action/NavigateThroughPoses " - f"'{poses_yaml}' " - f"--feedback" - ) - - full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'" + cmd = f'{setup} && ros2 action send_goal /navigate_through_poses nav2_msgs/action/NavigateThroughPoses - --feedback' process = subprocess.Popen( - full_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True + cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) + stdout_lines = [] + stop_reading = threading.Event() + + def reader(pipe, lines): + for line in iter(pipe.readline, ''): + if stop_reading.is_set(): + break + lines.append(line) + + out_thread = threading.Thread(target=reader, args=(process.stdout, stdout_lines)) + out_thread.start() + + try: + process.stdin.write(poses_yaml + "\n") + process.stdin.flush() + process.stdin.close() + except Exception as e: + logger.error(f"写入 poses heredoc 失败: {e}") + self.status = Nav2Status.FAILED + return + result_received = False - while not self._cancel_event.is_set(): - import select - reads = [process.stdout] - ready, _, _ = select.select(reads, [], [], 1.0) + time.sleep(1.0) + full_out = "".join(stdout_lines) - if process.stdout in ready: - line = process.stdout.readline() - if not line: - break - line = line.strip() - - if "succeeded" in line.lower(): - logger.info(f"✅ Nav2 路径点导航成功完成 {len(poses)} 个点") - self._result_status = "succeeded" - result_received = True - break - elif "failed" in line.lower() or "aborted" in line.lower(): - logger.warning(f"⚠️ Nav2 路径点导航失败: {line}") - self._result_status = "failed" - result_received = True - break - elif "canceled" in line.lower() or "cancelled" in line.lower(): - self._result_status = "cancelled" - result_received = True - break - - if process.poll() is not None: - if not result_received: - self._result_status = "failed" + if "succeeded" in full_out.lower(): + logger.info(f"✅ Nav2 路径点导航成功完成 {len(poses)} 个点") + self._result_status = "succeeded" + result_received = True + break + if any(k in full_out.lower() for k in ["failed", "aborted"]): + logger.warning(f"⚠️ Nav2 路径点导航失败") + self._result_status = "failed" + result_received = True + break + if any(k in full_out.lower() for k in ["canceled", "cancelled"]): + self._result_status = "cancelled" + result_received = True break - if self._cancel_event.is_set() and not result_received: - logger.info("取消路径点导航...") - cancel_cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 action cancel /navigate_through_poses'" - subprocess.run(cancel_cmd, shell=True, timeout=3) - self._result_status = "cancelled" + if process.poll() is not None and not result_received: + self._result_status = "failed" + break + stop_reading.set() if process.poll() is None: process.terminate() - try: - process.wait(timeout=3) - except subprocess.TimeoutExpired: - process.kill() + process.wait(timeout=3) + out_thread.join(timeout=2) if self._result_status == "succeeded": self.status = Nav2Status.SUCCEEDED @@ -445,8 +422,8 @@ class Nav2Navigator: """取消当前导航""" if self.status == Nav2Status.NAVIGATING: self._cancel_event.set() - # 发送取消命令 - cancel_cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 action cancel /navigate_to_pose 2>/dev/null || ros2 action cancel /navigate_through_poses 2>/dev/null || true'" + setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash" + cancel_cmd = f"bash -l -c '{setup} && ros2 action cancel /navigate_to_pose 2>/dev/null || ros2 action cancel /navigate_through_poses 2>/dev/null || true'" subprocess.run(cancel_cmd, shell=True, timeout=3) self.status = Nav2Status.CANCELLED logger.info("导航已停止")