Files
smart-inspection/agv_app/utils/nav2_navigator.py
T
2026-05-19 21:25:41 +08:00

351 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Nav2 导航模块 - 使用官方 nav2_simple_commander.BasicNavigator API
参考: https://github.com/elephantrobotics/agv_pro_ros2/blob/humble/agv_pro_navigation2/scripts/example_nav_to_pose.py
"""
import time
import math
import logging
import threading
import subprocess
import os
import re
from typing import List, Tuple, Optional, Dict
from enum import Enum
logger = logging.getLogger(__name__)
# 延迟导入 rclpy 和 nav2_simple_commander(避免在模块加载时初始化 ROS)
_rclpy = None
_BasicNavigator = None
_TaskResult = None
_PoseStamped = None
_Duration = None
def _init_ros():
"""延迟初始化 ROS2 依赖"""
global _rclpy, _BasicNavigator, _TaskResult, _PoseStamped, _Duration
if _rclpy is None:
import rclpy
_rclpy = rclpy
from geometry_msgs.msg import PoseStamped
from nav2_simple_commander.robot_navigator import BasicNavigator, TaskResult
from rclpy.duration import Duration
_PoseStamped = PoseStamped
_BasicNavigator = BasicNavigator
_TaskResult = TaskResult
_Duration = Duration
def _run_ros2_bash(cmd: str, timeout: float = 5.0) -> Tuple[int, str, str]:
"""执行 ros2 命令(直接运行,不二次包装)"""
setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash && export ROS_DOMAIN_ID=1"
full_cmd = f"bash -l -c '{setup} && {cmd}'"
try:
result = subprocess.run(full_cmd, shell=True, capture_output=True, text=True, timeout=timeout)
return result.returncode, result.stdout.strip(), result.stderr.strip()
except subprocess.TimeoutExpired:
return -1, "", "Timeout"
class Nav2Status(Enum):
IDLE = "idle"
NAVIGATING = "navigating"
SUCCEEDED = "succeeded"
FAILED = "failed"
CANCELLED = "cancelled"
UNKNOWN = "unknown"
class Nav2Navigator:
"""Nav2 导航器 — 使用官方 BasicNavigator API"""
def __init__(self):
self.status = Nav2Status.IDLE
self._nav_thread: Optional[threading.Thread] = None
self._cancel_event = threading.Event()
self._result_status = None
self._current_pose = [0.0, 0.0, 0.0]
self._navigator = None # BasicNavigator 实例
self._rclpy_initialized = False
def _ensure_rclpy(self):
"""确保 rclpy 已初始化(每个线程只能调用一次)"""
_init_ros()
if not self._rclpy_initialized:
if not _rclpy.ok():
_rclpy.init()
self._rclpy_initialized = True
def _get_navigator(self):
"""获取或创建 BasicNavigator 实例"""
self._ensure_rclpy()
if self._navigator is None:
self._navigator = _BasicNavigator()
return self._navigator
def _check_nav2_available(self) -> bool:
"""检查 Nav2 action server 是否可用"""
try:
nav = self._get_navigator()
# BasicNavigator 内部会检查 Nav2 是否活跃
return True
except Exception as e:
logger.debug(f"Nav2 不可用: {e}")
return False
def _get_current_pose(self) -> List[float]:
"""从 /amcl_pose 获取当前位置 [x, y, yaw]"""
try:
rc, out, err = _run_ros2_bash("timeout 3 ros2 topic echo /amcl_pose --once 2>/dev/null", timeout=4)
if rc == 0 and out:
import yaml
clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out)
yaml_str = re.sub(r'^[\d\-:. ]+\[RTPS[^\]]*\].*\n?', '', clean, flags=re.MULTILINE).strip()
yaml_str = yaml_str.split('---')[0]
data = yaml.safe_load(yaml_str)
if data:
pos = data.get("pose", {}).get("pose", {}).get("position", {})
orient = data.get("pose", {}).get("pose", {}).get("orientation", {})
x = pos.get("x", 0.0)
y = pos.get("y", 0.0)
qz = orient.get("z", 0.0)
qw = orient.get("w", 1.0)
yaw = math.atan2(2.0 * qw * qz, 1.0 - 2.0 * qz * qz)
self._current_pose = [x, y, yaw]
return self._current_pose
except Exception as e:
logger.debug(f"获取 amcl_pose 失败: {e}")
return self._current_pose
def navigate_to_pose(self, x: float, y: float, yaw: float = None,
timeout_sec: float = 120.0,
blocking: bool = True) -> bool:
"""导航到目标坐标(使用官方 BasicNavigator API"""
if self.status == Nav2Status.NAVIGATING:
logger.warning("导航正在进行中,请先停止当前导航")
return False
self.status = Nav2Status.IDLE
self._result_status = None
if yaw is None:
current = self._get_current_pose()
yaw = current[2]
logger.info(f"发送导航目标: ({x:.3f}, {y:.3f}), yaw={math.degrees(yaw):.1f}°")
self._cancel_event.clear()
self._result_status = None
self.status = Nav2Status.NAVIGATING
if self._nav_thread and self._nav_thread.is_alive():
self.stop()
self._nav_thread.join(timeout=3)
self._nav_thread = threading.Thread(
target=self._nav_thread_func,
args=(x, y, yaw, timeout_sec),
daemon=True
)
self._nav_thread.start()
if blocking:
self._nav_thread.join(timeout=timeout_sec + 10)
return self.status == Nav2Status.SUCCEEDED
return True
def _nav_thread_func(self, x: float, y: float, yaw: float, timeout_sec: float):
"""导航执行线程 — 使用官方 BasicNavigator API"""
try:
logger.info(f"Nav2 线程启动: x={x}, y={y}, yaw={yaw}, timeout={timeout_sec}s")
# 在子线程中初始化 rclpy
_init_ros()
if not _rclpy.ok():
_rclpy.init()
navigator = _BasicNavigator()
# 等待 Nav2 激活(可选,如果 Nav2 已启动可跳过)
# navigator.waitUntilNav2Active()
# 构造目标位姿
goal_pose = _PoseStamped()
goal_pose.header.frame_id = 'map'
goal_pose.header.stamp = navigator.get_clock().now().to_msg()
goal_pose.pose.position.x = x
goal_pose.pose.position.y = y
goal_pose.pose.position.z = 0.0
# 四元数(仅 z, w
qz = math.sin(yaw / 2.0)
qw = math.cos(yaw / 2.0)
goal_pose.pose.orientation.x = 0.0
goal_pose.pose.orientation.y = 0.0
goal_pose.pose.orientation.z = qz
goal_pose.pose.orientation.w = qw
# 发送导航目标
navigator.goToPose(goal_pose)
logger.info("导航目标已发送,等待完成...")
# 等待导航完成
start_time = time.time()
while not navigator.isTaskComplete():
if self._cancel_event.is_set():
navigator.cancelTask()
logger.info("导航被取消")
self._result_status = "cancelled"
break
elapsed = time.time() - start_time
if elapsed > timeout_sec:
logger.warning(f"导航超时 ({timeout_sec}s),取消任务")
navigator.cancelTask()
self._result_status = "failed"
break
# 获取反馈(可选)
feedback = navigator.getFeedback()
if feedback:
remaining = _Duration.from_msg(feedback.estimated_time_remaining).nanoseconds / 1e9
logger.debug(f"预计剩余时间: {remaining:.1f}s")
time.sleep(0.5)
# 获取结果
result = navigator.getResult()
if result == _TaskResult.SUCCEEDED:
logger.info(f"✅ Nav2 导航成功到达目标 ({x:.3f}, {y:.3f})")
self._result_status = "succeeded"
self.status = Nav2Status.SUCCEEDED
elif result == _TaskResult.CANCELED:
logger.info("导航被取消")
self._result_status = "cancelled"
self.status = Nav2Status.CANCELLED
elif result == _TaskResult.FAILED:
logger.warning("⚠️ Nav2 导航失败")
self._result_status = "failed"
self.status = Nav2Status.FAILED
else:
logger.warning(f"导航返回未知状态: {result}")
self._result_status = "failed"
self.status = Nav2Status.FAILED
except Exception as e:
logger.error(f"Nav2 导航异常: {e}", exc_info=True)
self.status = Nav2Status.FAILED
self._result_status = "failed"
def navigate_through_poses(self, poses: List[Tuple[float, float, float]],
timeout_per_pose: float = 120.0,
blocking: bool = True) -> bool:
"""导航通过多个路径点"""
if self.status == Nav2Status.NAVIGATING:
logger.warning("导航正在进行中,请先停止")
return False
if not poses:
return True
logger.info(f"发送 {len(poses)} 个路径点的导航任务")
self._cancel_event.clear()
self.status = Nav2Status.NAVIGATING
self._nav_thread = threading.Thread(
target=self._nav_thread_func_multi,
args=(poses, timeout_per_pose),
daemon=True
)
self._nav_thread.start()
if blocking:
total_timeout = len(poses) * timeout_per_pose + 30
self._nav_thread.join(timeout=total_timeout)
return self.status == Nav2Status.SUCCEEDED
return True
def _nav_thread_func_multi(self, poses: List[Tuple[float, float, float]], timeout_per_pose: float):
"""多路径点导航线程"""
try:
_init_ros()
if not _rclpy.ok():
_rclpy.init()
navigator = _BasicNavigator()
# 构造路径点列表
goal_poses = []
for x, y, yaw in poses:
pose = _PoseStamped()
pose.header.frame_id = 'map'
pose.header.stamp = navigator.get_clock().now().to_msg()
pose.pose.position.x = x
pose.pose.position.y = y
pose.pose.position.z = 0.0
qz = math.sin(yaw / 2.0)
qw = math.cos(yaw / 2.0)
pose.pose.orientation.x = 0.0
pose.pose.orientation.y = 0.0
pose.pose.orientation.z = qz
pose.pose.orientation.w = qw
goal_poses.append(pose)
navigator.goThroughPoses(goal_poses)
logger.info(f"已发送 {len(poses)} 个路径点,等待完成...")
start_time = time.time()
total_timeout = len(poses) * timeout_per_pose
while not navigator.isTaskComplete():
if self._cancel_event.is_set():
navigator.cancelTask()
self._result_status = "cancelled"
break
elapsed = time.time() - start_time
if elapsed > total_timeout:
navigator.cancelTask()
self._result_status = "failed"
break
time.sleep(0.5)
result = navigator.getResult()
if result == _TaskResult.SUCCEEDED:
logger.info(f"✅ Nav2 路径点导航成功完成 {len(poses)} 个点")
self._result_status = "succeeded"
self.status = Nav2Status.SUCCEEDED
elif result == _TaskResult.CANCELED:
self._result_status = "cancelled"
self.status = Nav2Status.CANCELLED
else:
self._result_status = "failed"
self.status = Nav2Status.FAILED
except Exception as e:
logger.error(f"Nav2 路径点导航异常: {e}", exc_info=True)
self.status = Nav2Status.FAILED
def stop(self):
"""取消当前导航"""
if self.status == Nav2Status.NAVIGATING:
self._cancel_event.set()
self.status = Nav2Status.CANCELLED
logger.info("导航已停止")
def get_status(self) -> Dict:
"""获取导航状态"""
current = self._get_current_pose()
nav2_available = self._check_nav2_available()
return {
"status": self.status.value,
"current_position": current,
"nav2_available": nav2_available
}
def get_current_position(self) -> List[float]:
"""获取当前位置 [x, y, yaw]"""
return self._get_current_pose()