""" Nav2 导航模块 - 通过 ROS2 Action Server 控制 AGV 导航 使用 Nav2 的 navigate_to_pose 和 navigate_through_poses action 通过写入临时 shell 脚本再执行,避免 bash -c 单引号转义问题 """ import time import math import logging import threading import subprocess import os from typing import List, Tuple, Optional, Dict from enum import Enum logger = logging.getLogger(__name__) def _run_ros2_bash(cmd: str, timeout: float = 5.0) -> Tuple[int, str, str]: """执行 ros2 命令(直接运行,不二次包装)""" setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash && export ROS_DOMAIN_ID=1" full_cmd = f"bash -l -c '{setup} && {cmd}'" try: result = subprocess.run(full_cmd, shell=True, capture_output=True, text=True, timeout=timeout) return result.returncode, result.stdout.strip(), result.stderr.strip() except subprocess.TimeoutExpired: return -1, "", "Timeout" class Nav2Status(Enum): IDLE = "idle" NAVIGATING = "navigating" SUCCEEDED = "succeeded" FAILED = "failed" CANCELLED = "cancelled" UNKNOWN = "unknown" class Nav2Navigator: """Nav2 导航器 — 通过 ros2 action /navigate_to_pose 与 Nav2 通信""" NAVIGATE_SCRIPT = "/tmp/nav2_send_goal.sh" def __init__(self): self.status = Nav2Status.IDLE self._nav_thread: Optional[threading.Thread] = None self._cancel_event = threading.Event() self._result_status = None self._current_pose = [0.0, 0.0, 0.0] self._build_script() def _build_script(self): """生成 Nav2 action 发送脚本(通过 heredoc 避免所有 bash 转义问题)""" script = self.NAVIGATE_SCRIPT content = [ "#!/bin/bash", "source /opt/ros/humble/setup.bash", "source /home/elephant/agv_pro_ros2/install/setup.bash", "", "TARGET_X=${1:-0}", "TARGET_Y=${2:-0}", "TARGET_YAW=${3:-0}", "", "# 计算四元数", "YAW_QZ=$(python3 -c \"import math; print(math.sin($TARGET_YAW / 2.0))\")", "YAW_QW=$(python3 -c \"import math; print(math.cos($TARGET_YAW / 2.0))\")", "", "# 通过 stdin pipe 发送 goal", 'cat << \'GOAL\' | ros2 action send_goal /navigate_to_pose nav2_msgs/action/NavigateToPose - --feedback', "pose:", " header:", " stamp:", " sec: 0", " nanosec: 0", " frame_id: map", " pose:", " position:", " x: $TARGET_X", " y: $TARGET_Y", " z: 0.0", " orientation:", " x: 0.0", " y: 0.0", " z: $YAW_QZ", " w: $YAW_QW", "GOAL", ] with open(script, "w") as f: f.write("\n".join(content) + "\n") os.chmod(script, 0o755) def _check_nav2_available(self) -> bool: """检查 Nav2 action server 是否可用(带缓存 10 秒)""" import time as time_mod if hasattr(self, '_nav2_check_time') and hasattr(self, '_nav2_check_result'): if time_mod.time() - self._nav2_check_time < 10: return self._nav2_check_result # 写入脚本文件执行,兼容 Flask subprocess 环境 script = ( "#!/bin/bash\n" "source /opt/ros/humble/setup.bash 2>/dev/null\n" "source /home/elephant/agv_pro_ros2/install/setup.bash 2>/dev/null\n" "export ROS_DOMAIN_ID=1\n" "ros2 action list 2>&1\n" ) script_file = "/tmp/nav2_check.sh" with open(script_file, "w") as f: f.write(script) import os os.chmod(script_file, 0o755) r = subprocess.run([script_file], capture_output=True, text=True, timeout=8) rc = r.returncode out = r.stdout err = r.stderr result = rc == 0 and ("/navigate_to_pose" in out or "navigate_to_pose" in out) logger.debug(f"_check_nav2_available: rc={rc}, out={out[:100]}, result={result}") self._nav2_check_result = result self._nav2_check_time = time_mod.time() return result def _get_current_pose(self) -> List[float]: """从 /odom 获取当前位置 [x, y, yaw]""" try: rc, out, err = _run_ros2_bash("timeout 3 ros2 topic echo /amcl_pose --once 2>/dev/null", timeout=4) if rc == 0 and out: import re, yaml clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out) yaml_str = re.sub(r'^[\d\-:. ]+\[RTPS[^\]]*\].*\n?', '', clean, flags=re.MULTILINE).strip() yaml_str = yaml_str.split('---')[0] data = yaml.safe_load(yaml_str) if data: pos = data.get("pose", {}).get("pose", {}).get("position", {}) orient = data.get("pose", {}).get("pose", {}).get("orientation", {}) x = pos.get("x", 0.0) y = pos.get("y", 0.0) qz = orient.get("z", 0.0) qw = orient.get("w", 1.0) yaw = math.atan2(2.0 * qw * qz, 1.0 - 2.0 * qz * qz) self._current_pose = [x, y, yaw] return self._current_pose except Exception as e: logger.debug(f"获取 odom 失败: {e}") return self._current_pose def navigate_to_pose(self, x: float, y: float, yaw: float = None, timeout_sec: float = 120.0, blocking: bool = True) -> bool: """ 导航到目标坐标(Nav2 navigate_to_pose action) Args: x, y: 目标位置(世界坐标,米) yaw: 目标朝向(弧度),None 则保持当前朝向 timeout_sec: 超时时间(秒) blocking: 是否阻塞等待导航完成 Returns: blocking=True: 是否成功到达 blocking=False: 是否成功启动导航 """ if self.status == Nav2Status.NAVIGATING: logger.warning("导航正在进行中,请先停止当前导航") return False if not self._check_nav2_available(): logger.error("Nav2 action server 不可用,请确保 navigation2_active.launch.py 已启动") return False if yaw is None: current = self._get_current_pose() yaw = current[2] logger.info(f"发送导航目标: ({x:.3f}, {y:.3f}), yaw={math.degrees(yaw):.1f}°") # 启动导航线程 self._cancel_event.clear() self._result_status = None self.status = Nav2Status.NAVIGATING self._nav_thread = threading.Thread( target=self._nav_thread_func, args=(x, y, yaw, timeout_sec), daemon=True ) self._nav_thread.start() if blocking: self._nav_thread.join(timeout=timeout_sec + 10) return self.status == Nav2Status.SUCCEEDED return True def _nav_thread_func(self, x: float, y: float, yaw: float, timeout_sec: float): """导航执行线程 — 使用 heredoc 脚本避免 bash 转义问题""" try: logger.info(f"Nav2 线程启动: x={x}, y={y}, yaw={yaw}, timeout={timeout_sec}s") start_time = time.time() elapsed = 0.0 check_interval = 1.0 result_received = False # 使用 pipe 方式运行导航 # 通过 subprocess.Popen + stdin heredoc 避免所有引号转义问题 setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash && export ROS_DOMAIN_ID=1" qz = math.sin(yaw / 2.0) qw = math.cos(yaw / 2.0) # 构建 goal YAML(用 bash heredoc 方式避免转义问题) # 写入临时文件 import tempfile goal_yaml = ( f"pose:\n" f" header:\n" f" stamp:\n" f" sec: 0\n" f" nanosec: 0\n" f" frame_id: map\n" f" pose:\n" f" position:\n" f" x: {x}\n" f" y: {y}\n" f" z: 0.0\n" f" orientation:\n" f" x: 0.0\n" f" y: 0.0\n" f" z: {qz}\n" f" w: {qw}\n" f"behavior_tree: ''\n" ) goal_file = "/tmp/nav2_goal_{}.yaml".format(os.getpid()) with open(goal_file, "w") as f: f.write(goal_yaml) cmd = ( f'bash -l -c \'' f'source /opt/ros/humble/setup.bash && ' f'source /home/elephant/agv_pro_ros2/install/setup.bash && ' f'ros2 action send_goal /navigate_to_pose nav2_msgs/action/NavigateToPose ' f'"$(cat {goal_file})" --feedback\'' ) process = subprocess.Popen( cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) stdout_lines = [] stderr_lines = [] stop_reading = threading.Event() def reader(pipe, lines): for line in iter(pipe.readline, ''): if stop_reading.is_set(): break lines.append(line) # 写入 goal(通过 bash heredoc) # 写入 goal(通过 bash 脚本方式) script_content = ( f'#!/bin/bash\n' f'source /opt/ros/humble/setup.bash\n' f'source /home/elephant/agv_pro_ros2/install/setup.bash\n' f'GOAL=$(cat {goal_file})\n' f'ros2 action send_goal /navigate_to_pose nav2_msgs/action/NavigateToPose "$GOAL" --feedback\n' ) script_file = "/tmp/nav2_action.sh" with open(script_file, "w") as sf: sf.write(script_content) os.chmod(script_file, 0o755) out_thread = threading.Thread(target=reader, args=(process.stdout, stdout_lines)) err_thread = threading.Thread(target=reader, args=(process.stderr, stderr_lines)) out_thread.start() err_thread.start() # 用 subprocess.Popen([script_path]) 替代 stdin stop_reading.set() if process.poll() is None: process.terminate() process.wait(timeout=3) act_proc = subprocess.Popen( [script_file], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) stdout_lines2 = [] stderr_lines2 = [] stop_reading2 = threading.Event() def reader2(pipe, lines): for line in iter(pipe.readline, ''): if stop_reading2.is_set(): break lines.append(line) t_out2 = threading.Thread(target=reader2, args=(act_proc.stdout, stdout_lines2)) t_err2 = threading.Thread(target=reader2, args=(act_proc.stderr, stderr_lines2)) t_out2.start() t_err2.start() # 轮询等待 action 完成(每 2s 检查一次结果) result_received = False for _ in range(60): # 最多等 120 秒 time.sleep(2) elapsed = time.time() - start_time # 检查是否有结果 full_out = "".join(stdout_lines2) full_err = "".join(stderr_lines2) combined = full_out + full_err result_idx = combined.find("Result:") if result_idx >= 0: import re status_m = re.search(r'status[\s:]+(\w+)', combined[result_idx:]) if status_m: st = status_m.group(1).lower() if st in ('succeeded', 'SUCCEEDED'): logger.info("✅ Nav2 导航成功到达目标") self._result_status = "succeeded" result_received = True break elif st in ('failed', 'FAILED', 'aborted'): logger.warning(f"⚠️ Nav2 导航失败: status={st}") self._result_status = "failed" result_received = True break elif st in ('canceled', 'cancelled'): logger.info("Nav2 导航被取消") self._result_status = "cancelled" result_received = True break # 检查进程是否已结束但无结果 if act_proc.poll() is not None and not result_received: self._result_status = "failed" break if self._cancel_event.is_set(): break stop_reading2.set() if act_proc.poll() is None: act_proc.terminate() try: act_proc.wait(timeout=3) except subprocess.TimeoutExpired: act_proc.kill() t_out2.join(timeout=2) t_err2.join(timeout=2) stdout_lines = stdout_lines2 stderr_lines = stderr_lines2 # 轮询检查结果 while not self._cancel_event.is_set() and elapsed < timeout_sec: time.sleep(check_interval) elapsed = time.time() - start_time full_out = "".join(stdout_lines) full_err = "".join(stderr_lines) combined = full_out + full_err # 查找 Result 节中的 status 字段 import re result_idx = combined.find("Result:") if result_idx >= 0: status_m = re.search(r'status[\s:]+(\w+)', combined[result_idx:]) if status_m: st = status_m.group(1).lower() if st in ('succeeded', 'SUCCEEDED'): logger.info("✅ Nav2 导航成功到达目标") self._result_status = "succeeded" result_received = True break elif st in ('failed', 'FAILED', 'aborted'): logger.warning(f"⚠️ Nav2 导航失败: status={st}") self._result_status = "failed" result_received = True break elif st in ('canceled', 'cancelled'): logger.info("Nav2 导航被取消") self._result_status = "cancelled" result_received = True break if not result_received: self._result_status = "failed" break # 善后 stop_reading.set() if act_proc.poll() is None: act_proc.terminate() try: act_proc.wait(timeout=3) except subprocess.TimeoutExpired: act_proc.kill() out_thread.join(timeout=2) err_thread.join(timeout=2) if self._cancel_event.is_set() and not result_received: logger.info("取消 Nav2 导航...") cancel_cmd = f'bash -l -c \'{setup} && ros2 action cancel /navigate_to_pose 2>/dev/null || ros2 action cancel /navigate_through_poses 2>/dev/null || true\'' subprocess.run(cancel_cmd, shell=True, timeout=3) self._result_status = "cancelled" if self._result_status == "succeeded": self.status = Nav2Status.SUCCEEDED elif self._result_status == "cancelled": self.status = Nav2Status.CANCELLED else: self.status = Nav2Status.FAILED except Exception as e: logger.error(f"Nav2 导航异常: {e}") self.status = Nav2Status.FAILED def navigate_through_poses(self, poses: List[Tuple[float, float, float]], timeout_per_pose: float = 120.0, blocking: bool = True) -> bool: """导航通过多个路径点(Nav2 navigate_through_poses action)""" if self.status == Nav2Status.NAVIGATING: logger.warning("导航正在进行中,请先停止") return False if not self._check_nav2_available(): logger.error("Nav2 action server 不可用") return False if not poses: return True logger.info(f"发送 {len(poses)} 个路径点的导航任务") self._cancel_event.clear() self.status = Nav2Status.NAVIGATING self._nav_thread = threading.Thread( target=self._nav_thread_func_multi, args=(poses, timeout_per_pose), daemon=True ) self._nav_thread.start() if blocking: total_timeout = len(poses) * timeout_per_pose + 30 self._nav_thread.join(timeout=total_timeout) return self.status == Nav2Status.SUCCEEDED return True def _nav_thread_func_multi(self, poses: List[Tuple[float, float, float]], timeout_per_pose: float): """多路径点导航线程""" try: setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash && export ROS_DOMAIN_ID=1" # 构建 poses heredoc poses_lines = [] for i, (x, y, yaw) in enumerate(poses): qz = math.sin(yaw / 2.0) qw = math.cos(yaw / 2.0) poses_lines.extend([ f" - pose:", f" header:", f" stamp:", f" sec: 0", f" nanosec: 0", f" frame_id: map", f" pose:", f" position:", f" x: {x}", f" y: {y}", f" z: 0.0", f" orientation:", f" x: 0.0", f" y: 0.0", f" z: {qz}", f" w: {qw}", ]) poses_yaml = "poses:\n" + "\n".join(poses_lines) cmd = f'bash -l -c \'{setup} && ros2 action send_goal /navigate_through_poses nav2_msgs/action/NavigateThroughPoses - --feedback\'' process = subprocess.Popen( cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) stdout_lines = [] stop_reading = threading.Event() def reader(pipe, lines): for line in iter(pipe.readline, ''): if stop_reading.is_set(): break lines.append(line) out_thread = threading.Thread(target=reader, args=(process.stdout, stdout_lines)) out_thread.start() try: process.stdin.write(poses_yaml + "\n") process.stdin.flush() process.stdin.close() except Exception as e: logger.error(f"写入 poses heredoc 失败: {e}") self.status = Nav2Status.FAILED return result_received = False while not self._cancel_event.is_set(): time.sleep(1.0) full_out = "".join(stdout_lines) full_err = "".join(stderr_lines) combined = full_out + full_err result_idx = combined.find("Result:") if result_idx >= 0: import re status_m = re.search(r'status[\s:]+(\w+)', combined[result_idx:]) if status_m: st = status_m.group(1).lower() if st in ('succeeded', 'SUCCEEDED'): logger.info(f"✅ Nav2 路径点导航成功完成 {len(poses)} 个点") self._result_status = "succeeded" result_received = True break elif st in ('failed', 'FAILED', 'aborted'): logger.warning(f"⚠️ Nav2 路径点导航失败") self._result_status = "failed" result_received = True break elif st in ('canceled', 'cancelled'): self._result_status = "cancelled" result_received = True break if process.poll() is not None and not result_received: self._result_status = "failed" break stop_reading.set() if process.poll() is None: process.terminate() process.wait(timeout=3) out_thread.join(timeout=2) if self._result_status == "succeeded": self.status = Nav2Status.SUCCEEDED elif self._result_status == "cancelled": self.status = Nav2Status.CANCELLED else: self.status = Nav2Status.FAILED except Exception as e: logger.error(f"Nav2 路径点导航异常: {e}") self.status = Nav2Status.FAILED def stop(self): """取消当前导航""" if self.status == Nav2Status.NAVIGATING: self._cancel_event.set() setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash && export ROS_DOMAIN_ID=1" cancel_cmd = f'bash -l -c \'{setup} && ros2 action cancel /navigate_to_pose 2>/dev/null || ros2 action cancel /navigate_through_poses 2>/dev/null || true\'' subprocess.run(cancel_cmd, shell=True, timeout=3) self.status = Nav2Status.CANCELLED logger.info("导航已停止") def get_status(self) -> Dict: """获取导航状态""" current = self._get_current_pose() return { "status": self.status.value, "current_position": current, "nav2_available": self._check_nav2_available() } def get_current_position(self) -> List[float]: """获取当前位置 [x, y, yaw]""" return self._get_current_pose()