Files
smart-inspection/agv_app/utils/nav2_navigator.py
T
2026-05-19 15:59:23 +08:00

445 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Nav2 导航模块 - 通过 ROS2 Action Server 控制 AGV 导航
使用 Nav2 的 navigate_to_pose 和 navigate_through_poses action
通过 stdin pipe 发送 goal YAML,避免 bash $() 替换破坏 YAML 格式
"""
import time
import math
import logging
import threading
import subprocess
import os
import re
from typing import List, Tuple, Optional, Dict
from enum import Enum
logger = logging.getLogger(__name__)
def _run_ros2_bash(cmd: str, timeout: float = 5.0) -> Tuple[int, str, str]:
"""执行 ros2 命令(直接运行,不二次包装)"""
setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash && export ROS_DOMAIN_ID=1"
full_cmd = f"bash -l -c '{setup} && {cmd}'"
try:
result = subprocess.run(full_cmd, shell=True, capture_output=True, text=True, timeout=timeout)
return result.returncode, result.stdout.strip(), result.stderr.strip()
except subprocess.TimeoutExpired:
return -1, "", "Timeout"
class Nav2Status(Enum):
IDLE = "idle"
NAVIGATING = "navigating"
SUCCEEDED = "succeeded"
FAILED = "failed"
CANCELLED = "cancelled"
UNKNOWN = "unknown"
class Nav2Navigator:
"""Nav2 导航器 — 通过 ros2 action /navigate_to_pose 与 Nav2 通信"""
def __init__(self):
self.status = Nav2Status.IDLE
self._nav_thread: Optional[threading.Thread] = None
self._cancel_event = threading.Event()
self._result_status = None
self._current_pose = [0.0, 0.0, 0.0]
def _check_nav2_available(self) -> bool:
"""检查 Nav2 action server 是否可用(带缓存 10 秒)"""
import time as time_mod
if hasattr(self, '_nav2_check_time') and hasattr(self, '_nav2_check_result'):
if time_mod.time() - self._nav2_check_time < 10:
return self._nav2_check_result
script = (
"#!/bin/bash\n"
"source /opt/ros/humble/setup.bash 2>/dev/null\n"
"source /home/elephant/agv_pro_ros2/install/setup.bash 2>/dev/null\n"
"export ROS_DOMAIN_ID=1\n"
"ros2 action list 2>&1\n"
)
script_file = "/tmp/nav2_check.sh"
with open(script_file, "w") as f:
f.write(script)
os.chmod(script_file, 0o755)
r = subprocess.run([script_file], capture_output=True, text=True, timeout=8)
result = r.returncode == 0 and ("/navigate_to_pose" in r.stdout or "navigate_to_pose" in r.stdout)
logger.debug(f"_check_nav2_available: rc={r.returncode}, result={result}")
self._nav2_check_result = result
self._nav2_check_time = time_mod.time()
return result
def _get_current_pose(self) -> List[float]:
"""从 /amcl_pose 获取当前位置 [x, y, yaw]"""
try:
rc, out, err = _run_ros2_bash("timeout 3 ros2 topic echo /amcl_pose --once 2>/dev/null", timeout=4)
if rc == 0 and out:
import yaml
clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out)
yaml_str = re.sub(r'^[\d\-:. ]+\[RTPS[^\]]*\].*\n?', '', clean, flags=re.MULTILINE).strip()
yaml_str = yaml_str.split('---')[0]
data = yaml.safe_load(yaml_str)
if data:
pos = data.get("pose", {}).get("pose", {}).get("position", {})
orient = data.get("pose", {}).get("pose", {}).get("orientation", {})
x = pos.get("x", 0.0)
y = pos.get("y", 0.0)
qz = orient.get("z", 0.0)
qw = orient.get("w", 1.0)
yaw = math.atan2(2.0 * qw * qz, 1.0 - 2.0 * qz * qz)
self._current_pose = [x, y, yaw]
return self._current_pose
except Exception as e:
logger.debug(f"获取 amcl_pose 失败: {e}")
return self._current_pose
def navigate_to_pose(self, x: float, y: float, yaw: float = None,
timeout_sec: float = 120.0,
blocking: bool = True) -> bool:
"""导航到目标坐标"""
if self.status == Nav2Status.NAVIGATING:
logger.warning("导航正在进行中,请先停止当前导航")
return False
self.status = Nav2Status.IDLE
self._result_status = None
if not self._check_nav2_available():
logger.error("Nav2 action server 不可用")
return False
if yaw is None:
current = self._get_current_pose()
yaw = current[2]
logger.info(f"发送导航目标: ({x:.3f}, {y:.3f}), yaw={math.degrees(yaw):.1f}°")
self._cancel_event.clear()
self._result_status = None
self.status = Nav2Status.NAVIGATING
if self._nav_thread and self._nav_thread.is_alive():
self.stop()
self._nav_thread.join(timeout=3)
self._nav_thread = threading.Thread(
target=self._nav_thread_func,
args=(x, y, yaw, timeout_sec),
daemon=True
)
self._nav_thread.start()
if blocking:
self._nav_thread.join(timeout=timeout_sec + 10)
return self.status == Nav2Status.SUCCEEDED
return True
def _parse_result_status(self, combined: str) -> Optional[str]:
"""从 ros2 action send_goal 输出中解析导航结果状态"""
result_idx = combined.find("Result:")
if result_idx < 0:
return None
# 匹配 "status: STATUS_SUCCEEDED" 或 "status: SUCCEEDED" 等格式
status_m = re.search(r'status\s*:\s*(\w+)', combined[result_idx:])
if status_m:
st = status_m.group(1).upper()
if 'SUCCEEDED' in st:
return "succeeded"
elif 'FAILED' in st or 'ABORTED' in st:
return "failed"
elif 'CANCELED' in st or 'CANCELLED' in st:
return "cancelled"
logger.warning(f"未知的导航状态: {st}")
return None
def _nav_thread_func(self, x: float, y: float, yaw: float, timeout_sec: float):
"""导航执行线程 — 使用 stdin pipe 发送 goal YAML"""
try:
logger.info(f"Nav2 线程启动: x={x}, y={y}, yaw={yaw}, timeout={timeout_sec}s")
start_time = time.time()
qz = math.sin(yaw / 2.0)
qw = math.cos(yaw / 2.0)
goal_yaml = (
f"pose:\n"
f" header:\n"
f" stamp:\n"
f" sec: 0\n"
f" nanosec: 0\n"
f" frame_id: map\n"
f" pose:\n"
f" position:\n"
f" x: {x}\n"
f" y: {y}\n"
f" z: 0.0\n"
f" orientation:\n"
f" x: 0.0\n"
f" y: 0.0\n"
f" z: {qz}\n"
f" w: {qw}\n"
)
# 写入临时文件,用 $(cat file) 方式传递(stdin pipe "-" 在 Humble 有 bug
import tempfile
goal_file = "/tmp/nav2_goal_{}_{}.yaml".format(os.getpid(), int(time.time()))
with open(goal_file, "w") as f:
f.write(goal_yaml)
# 写一个临时脚本执行导航(避免引号嵌套地狱)
script_file = "/tmp/nav2_navigate_{}_{}.sh".format(os.getpid(), int(time.time()))
with open(script_file, "w") as f:
f.write('#!/bin/bash\n')
f.write('source /opt/ros/humble/setup.bash\n')
f.write('source /home/elephant/agv_pro_ros2/install/setup.bash\n')
f.write('export ROS_DOMAIN_ID=1\n')
f.write('ros2 action send_goal /navigate_to_pose nav2_msgs/action/NavigateToPose "$(cat {})" --feedback\n'.format(goal_file))
os.chmod(script_file, 0o755)
proc = subprocess.Popen(
['bash', '-l', script_file],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
stdout_lines = []
stderr_lines = []
stop_reading = threading.Event()
def reader(pipe, lines):
for line in iter(pipe.readline, ''):
if stop_reading.is_set():
break
lines.append(line)
t_out = threading.Thread(target=reader, args=(proc.stdout, stdout_lines))
t_err = threading.Thread(target=reader, args=(proc.stderr, stderr_lines))
t_out.start()
t_err.start()
# 轮询等待 action 完成
result_received = False
max_checks = int(timeout_sec / 2)
for _ in range(max_checks):
time.sleep(2)
elapsed = time.time() - start_time
full_out = "".join(stdout_lines)
full_err = "".join(stderr_lines)
combined = full_out + full_err
# 检查是否有 YAML 解析错误
if "Traceback" in combined and "Goal accepted" not in combined:
logger.error(f"ros2 action send_goal YAML 解析失败: {combined[:500]}")
self._result_status = "failed"
result_received = True
break
# 检查是否未被接受
if "Goal accepted" not in combined and elapsed > 15:
logger.warning(f"导航目标 15 秒内未被接受,输出: {combined[:300]}")
# 解析 Result
parsed = self._parse_result_status(combined)
if parsed:
self._result_status = parsed
result_received = True
if parsed == "succeeded":
logger.info(f"✅ Nav2 导航成功到达目标 ({x:.3f}, {y:.3f})")
elif parsed == "failed":
logger.warning(f"⚠️ Nav2 导航失败")
else:
logger.info("Nav2 导航被取消")
break
# 进程已结束但无 Result
if proc.poll() is not None and not result_received:
logger.error(f"导航进程提前退出,rc={proc.returncode},输出: {combined[:300]}")
self._result_status = "failed"
result_received = True
break
if self._cancel_event.is_set():
break
# 清理
stop_reading.set()
if proc.poll() is None:
proc.terminate()
try:
proc.wait(timeout=3)
except subprocess.TimeoutExpired:
proc.kill()
t_out.join(timeout=2)
t_err.join(timeout=2)
# 处理取消
if self._cancel_event.is_set() and not result_received:
logger.info("取消 Nav2 导航...")
setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash && export ROS_DOMAIN_ID=1"
cancel_cmd = f'bash -l -c \'{setup} && ros2 action cancel /navigate_to_pose 2>/dev/null || true\''
subprocess.run(cancel_cmd, shell=True, timeout=3)
self._result_status = "cancelled"
if self._result_status == "succeeded":
self.status = Nav2Status.SUCCEEDED
elif self._result_status == "cancelled":
self.status = Nav2Status.CANCELLED
else:
self.status = Nav2Status.FAILED
except Exception as e:
logger.error(f"Nav2 导航异常: {e}")
self.status = Nav2Status.FAILED
def navigate_through_poses(self, poses: List[Tuple[float, float, float]],
timeout_per_pose: float = 120.0,
blocking: bool = True) -> bool:
"""导航通过多个路径点"""
if self.status == Nav2Status.NAVIGATING:
logger.warning("导航正在进行中,请先停止")
return False
if not self._check_nav2_available():
logger.error("Nav2 action server 不可用")
return False
if not poses:
return True
logger.info(f"发送 {len(poses)} 个路径点的导航任务")
self._cancel_event.clear()
self.status = Nav2Status.NAVIGATING
self._nav_thread = threading.Thread(
target=self._nav_thread_func_multi,
args=(poses, timeout_per_pose),
daemon=True
)
self._nav_thread.start()
if blocking:
total_timeout = len(poses) * timeout_per_pose + 30
self._nav_thread.join(timeout=total_timeout)
return self.status == Nav2Status.SUCCEEDED
return True
def _nav_thread_func_multi(self, poses: List[Tuple[float, float, float]], timeout_per_pose: float):
"""多路径点导航线程"""
try:
setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash && export ROS_DOMAIN_ID=1"
poses_lines = []
for i, (x, y, yaw) in enumerate(poses):
qz = math.sin(yaw / 2.0)
qw = math.cos(yaw / 2.0)
poses_lines.extend([
f" - pose:",
f" header:",
f" stamp:",
f" sec: 0",
f" nanosec: 0",
f" frame_id: map",
f" pose:",
f" position:",
f" x: {x}",
f" y: {y}",
f" z: 0.0",
f" orientation:",
f" x: 0.0",
f" y: 0.0",
f" z: {qz}",
f" w: {qw}",
])
poses_yaml = "poses:\n" + "\n".join(poses_lines)
cmd = f'bash -l -c \'{setup} && ros2 action send_goal /navigate_through_poses nav2_msgs/action/NavigateThroughPoses - --feedback\''
process = subprocess.Popen(
cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
stdout_lines = []
stderr_lines = []
stop_reading = threading.Event()
def reader(pipe, lines):
for line in iter(pipe.readline, ''):
if stop_reading.is_set():
break
lines.append(line)
out_thread = threading.Thread(target=reader, args=(process.stdout, stdout_lines))
err_thread = threading.Thread(target=reader, args=(process.stderr, stderr_lines))
out_thread.start()
err_thread.start()
try:
process.stdin.write(poses_yaml + "\n")
process.stdin.flush()
process.stdin.close()
except Exception as e:
logger.error(f"写入 poses YAML 失败: {e}")
self.status = Nav2Status.FAILED
return
result_received = False
while not self._cancel_event.is_set():
time.sleep(1.0)
full_out = "".join(stdout_lines)
full_err = "".join(stderr_lines)
combined = full_out + full_err
parsed = self._parse_result_status(combined)
if parsed:
self._result_status = parsed
result_received = True
if parsed == "succeeded":
logger.info(f"✅ Nav2 路径点导航成功完成 {len(poses)} 个点")
break
if process.poll() is not None and not result_received:
self._result_status = "failed"
result_received = True
break
stop_reading.set()
if process.poll() is None:
process.terminate()
process.wait(timeout=3)
out_thread.join(timeout=2)
err_thread.join(timeout=2)
if self._result_status == "succeeded":
self.status = Nav2Status.SUCCEEDED
elif self._result_status == "cancelled":
self.status = Nav2Status.CANCELLED
else:
self.status = Nav2Status.FAILED
except Exception as e:
logger.error(f"Nav2 路径点导航异常: {e}")
self.status = Nav2Status.FAILED
def stop(self):
"""取消当前导航"""
if self.status == Nav2Status.NAVIGATING:
self._cancel_event.set()
setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash && export ROS_DOMAIN_ID=1"
cancel_cmd = f'bash -l -c \'{setup} && ros2 action cancel /navigate_to_pose 2>/dev/null || ros2 action cancel /navigate_through_poses 2>/dev/null || true\''
subprocess.run(cancel_cmd, shell=True, timeout=3)
self.status = Nav2Status.CANCELLED
logger.info("导航已停止")
def get_status(self) -> Dict:
"""获取导航状态"""
current = self._get_current_pose()
return {
"status": self.status.value,
"current_position": current,
"nav2_available": self._check_nav2_available()
}
def get_current_position(self) -> List[float]:
"""获取当前位置 [x, y, yaw]"""
return self._get_current_pose()