Files
smart-inspection/agv_app/utils/nav2_navigator.py
T
ywb 4e3cb4d3ef Nav2导航 heredoc pipe方式修复 + 重启脚本更新
nav2_navigator.py: 改用 subprocess.Popen + stdin heredoc 避免bash单引号转义
restart_agv.sh: 清理所有旧进程再重启(bringup → Nav2 → Flask)
app.py: init_pose改用 heredoc 子进程脚本方式
2026-05-16 19:18:18 +08:00

442 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Nav2 导航模块 - 通过 ROS2 Action Server 控制 AGV 导航
使用 Nav2 的 navigate_to_pose 和 navigate_through_poses action
通过写入临时 shell 脚本再执行,避免 bash -c 单引号转义问题
"""
import time
import math
import logging
import threading
import subprocess
import os
from typing import List, Tuple, Optional, Dict
from enum import Enum
logger = logging.getLogger(__name__)
def _run_ros2_bash(cmd: str, timeout: float = 5.0) -> Tuple[int, str, str]:
"""执行 ros2 命令(通过 bash -l 避免环境问题)"""
setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash"
full_cmd = f"bash -l -c '{setup} && {cmd}'"
try:
result = subprocess.run(full_cmd, shell=True, capture_output=True, text=True, timeout=timeout)
return result.returncode, result.stdout.strip(), result.stderr.strip()
except subprocess.TimeoutExpired:
return -1, "", "Timeout"
class Nav2Status(Enum):
IDLE = "idle"
NAVIGATING = "navigating"
SUCCEEDED = "succeeded"
FAILED = "failed"
CANCELLED = "cancelled"
UNKNOWN = "unknown"
class Nav2Navigator:
"""Nav2 导航器 — 通过 ros2 action /navigate_to_pose 与 Nav2 通信"""
NAVIGATE_SCRIPT = "/tmp/nav2_send_goal.sh"
def __init__(self):
self.status = Nav2Status.IDLE
self._nav_thread: Optional[threading.Thread] = None
self._cancel_event = threading.Event()
self._result_status = None
self._current_pose = [0.0, 0.0, 0.0]
self._build_script()
def _build_script(self):
"""生成 Nav2 action 发送脚本(通过 heredoc 避免所有 bash 转义问题)"""
script = self.NAVIGATE_SCRIPT
content = [
"#!/bin/bash",
"source /opt/ros/humble/setup.bash",
"source /home/elephant/agv_pro_ros2/install/setup.bash",
"",
"TARGET_X=${1:-0}",
"TARGET_Y=${2:-0}",
"TARGET_YAW=${3:-0}",
"",
"# 计算四元数",
"YAW_QZ=$(python3 -c \"import math; print(math.sin($TARGET_YAW / 2.0))\")",
"YAW_QW=$(python3 -c \"import math; print(math.cos($TARGET_YAW / 2.0))\")",
"",
"# 通过 stdin pipe 发送 goal",
'cat << \'GOAL\' | ros2 action send_goal /navigate_to_pose nav2_msgs/action/NavigateToPose - --feedback',
"pose:",
" header:",
" stamp:",
" sec: 0",
" nanosec: 0",
" frame_id: map",
" pose:",
" position:",
" x: $TARGET_X",
" y: $TARGET_Y",
" z: 0.0",
" orientation:",
" x: 0.0",
" y: 0.0",
" z: $YAW_QZ",
" w: $YAW_QW",
"GOAL",
]
with open(script, "w") as f:
f.write("\n".join(content) + "\n")
os.chmod(script, 0o755)
def _check_nav2_available(self) -> bool:
"""检查 Nav2 action server 是否可用"""
rc, out, err = _run_ros2_bash("ros2 action list")
if rc != 0:
return False
return "/navigate_to_pose" in out or "navigate_to_pose" in out
def _get_current_pose(self) -> List[float]:
"""从 /odom 获取当前位置 [x, y, yaw]"""
try:
rc, out, err = _run_ros2_bash("timeout 3 ros2 topic echo /odom --once 2>/dev/null", timeout=4)
if rc == 0 and out:
import re, yaml
clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out)
yaml_str = re.sub(r'^[\d\-:. ]+\[RTPS[^\]]*\].*\n?', '', clean, flags=re.MULTILINE).strip()
yaml_str = yaml_str.split('---')[0]
data = yaml.safe_load(yaml_str)
if data:
pos = data.get("pose", {}).get("pose", {}).get("position", {})
orient = data.get("pose", {}).get("pose", {}).get("orientation", {})
x = pos.get("x", 0.0)
y = pos.get("y", 0.0)
qz = orient.get("z", 0.0)
qw = orient.get("w", 1.0)
yaw = math.atan2(2.0 * qw * qz, 1.0 - 2.0 * qz * qz)
self._current_pose = [x, y, yaw]
return self._current_pose
except Exception as e:
logger.debug(f"获取 odom 失败: {e}")
return self._current_pose
def navigate_to_pose(self, x: float, y: float, yaw: float = None,
timeout_sec: float = 120.0,
blocking: bool = True) -> bool:
"""
导航到目标坐标(Nav2 navigate_to_pose action
Args:
x, y: 目标位置(世界坐标,米)
yaw: 目标朝向(弧度),None 则保持当前朝向
timeout_sec: 超时时间(秒)
blocking: 是否阻塞等待导航完成
Returns:
blocking=True: 是否成功到达
blocking=False: 是否成功启动导航
"""
if self.status == Nav2Status.NAVIGATING:
logger.warning("导航正在进行中,请先停止当前导航")
return False
if not self._check_nav2_available():
logger.error("Nav2 action server 不可用,请确保 navigation2_active.launch.py 已启动")
return False
if yaw is None:
current = self._get_current_pose()
yaw = current[2]
logger.info(f"发送导航目标: ({x:.3f}, {y:.3f}), yaw={math.degrees(yaw):.1f}°")
# 启动导航线程
self._cancel_event.clear()
self._result_status = None
self.status = Nav2Status.NAVIGATING
self._nav_thread = threading.Thread(
target=self._nav_thread_func,
args=(x, y, yaw, timeout_sec),
daemon=True
)
self._nav_thread.start()
if blocking:
self._nav_thread.join(timeout=timeout_sec + 10)
return self.status == Nav2Status.SUCCEEDED
return True
def _nav_thread_func(self, x: float, y: float, yaw: float, timeout_sec: float):
"""导航执行线程 — 使用 heredoc 脚本避免 bash 转义问题"""
try:
logger.info(f"Nav2 线程启动: x={x}, y={y}, yaw={yaw}, timeout={timeout_sec}s")
start_time = time.time()
elapsed = 0.0
check_interval = 1.0
result_received = False
# 使用 pipe 方式运行导航
# 通过 subprocess.Popen + stdin heredoc 避免所有引号转义问题
setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash"
qz = math.sin(yaw / 2.0)
qw = math.cos(yaw / 2.0)
# 构建 heredoc 内容
heredoc = (
f"pose:\n"
f" header:\n"
f" stamp:\n"
f" sec: 0\n"
f" nanosec: 0\n"
f" frame_id: map\n"
f" pose:\n"
f" position:\n"
f" x: {x}\n"
f" y: {y}\n"
f" z: 0.0\n"
f" orientation:\n"
f" x: 0.0\n"
f" y: 0.0\n"
f" z: {qz}\n"
f" w: {qw}\n"
)
cmd = f'{setup} && ros2 action send_goal /navigate_to_pose nav2_msgs/action/NavigateToPose - --feedback'
process = subprocess.Popen(
cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
stdout_lines = []
stderr_lines = []
stop_reading = threading.Event()
def reader(pipe, lines):
for line in iter(pipe.readline, ''):
if stop_reading.is_set():
break
lines.append(line)
out_thread = threading.Thread(target=reader, args=(process.stdout, stdout_lines))
err_thread = threading.Thread(target=reader, args=(process.stderr, stderr_lines))
out_thread.start()
err_thread.start()
# 写入 heredoc 数据
try:
process.stdin.write(heredoc + "\n")
process.stdin.flush()
process.stdin.close()
except Exception as e:
logger.error(f"写入 heredoc 失败: {e}")
# 轮询检查结果
while not self._cancel_event.is_set() and elapsed < timeout_sec:
time.sleep(check_interval)
elapsed = time.time() - start_time
full_out = "".join(stdout_lines)
if "succeeded" in full_out.lower():
logger.info("✅ Nav2 导航成功到达目标")
self._result_status = "succeeded"
result_received = True
break
if any(k in full_out.lower() for k in ["failed", "aborted"]):
logger.warning(f"⚠️ Nav2 导航失败: {full_out[-200:]}")
self._result_status = "failed"
result_received = True
break
if any(k in full_out.lower() for k in ["canceled", "cancelled"]):
logger.info("Nav2 导航被取消")
self._result_status = "cancelled"
result_received = True
break
# 检查进程是否结束但没读到结果
if process.poll() is not None and not result_received:
if full_out:
logger.warning(f"Nav2 进程意外结束,输出: {full_out[:200]}")
self._result_status = "failed"
break
# 善后
stop_reading.set()
if process.poll() is None:
process.terminate()
try:
process.wait(timeout=3)
except subprocess.TimeoutExpired:
process.kill()
out_thread.join(timeout=2)
err_thread.join(timeout=2)
if self._cancel_event.is_set() and not result_received:
logger.info("取消 Nav2 导航...")
cancel_cmd = f"bash -l -c '{setup} && ros2 action cancel /navigate_to_pose 2>/dev/null || ros2 action cancel /navigate_through_poses 2>/dev/null || true'"
subprocess.run(cancel_cmd, shell=True, timeout=3)
self._result_status = "cancelled"
if self._result_status == "succeeded":
self.status = Nav2Status.SUCCEEDED
elif self._result_status == "cancelled":
self.status = Nav2Status.CANCELLED
else:
self.status = Nav2Status.FAILED
except Exception as e:
logger.error(f"Nav2 导航异常: {e}")
self.status = Nav2Status.FAILED
def navigate_through_poses(self, poses: List[Tuple[float, float, float]],
timeout_per_pose: float = 120.0,
blocking: bool = True) -> bool:
"""导航通过多个路径点(Nav2 navigate_through_poses action"""
if self.status == Nav2Status.NAVIGATING:
logger.warning("导航正在进行中,请先停止")
return False
if not self._check_nav2_available():
logger.error("Nav2 action server 不可用")
return False
if not poses:
return True
logger.info(f"发送 {len(poses)} 个路径点的导航任务")
self._cancel_event.clear()
self.status = Nav2Status.NAVIGATING
self._nav_thread = threading.Thread(
target=self._nav_thread_func_multi,
args=(poses, timeout_per_pose),
daemon=True
)
self._nav_thread.start()
if blocking:
total_timeout = len(poses) * timeout_per_pose + 30
self._nav_thread.join(timeout=total_timeout)
return self.status == Nav2Status.SUCCEEDED
return True
def _nav_thread_func_multi(self, poses: List[Tuple[float, float, float]], timeout_per_pose: float):
"""多路径点导航线程"""
try:
setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash"
# 构建 poses heredoc
poses_lines = []
for i, (x, y, yaw) in enumerate(poses):
qz = math.sin(yaw / 2.0)
qw = math.cos(yaw / 2.0)
poses_lines.extend([
f" - pose:",
f" header:",
f" stamp:",
f" sec: 0",
f" nanosec: 0",
f" frame_id: map",
f" pose:",
f" position:",
f" x: {x}",
f" y: {y}",
f" z: 0.0",
f" orientation:",
f" x: 0.0",
f" y: 0.0",
f" z: {qz}",
f" w: {qw}",
])
poses_yaml = "poses:\n" + "\n".join(poses_lines)
cmd = f'{setup} && ros2 action send_goal /navigate_through_poses nav2_msgs/action/NavigateThroughPoses - --feedback'
process = subprocess.Popen(
cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
stdout_lines = []
stop_reading = threading.Event()
def reader(pipe, lines):
for line in iter(pipe.readline, ''):
if stop_reading.is_set():
break
lines.append(line)
out_thread = threading.Thread(target=reader, args=(process.stdout, stdout_lines))
out_thread.start()
try:
process.stdin.write(poses_yaml + "\n")
process.stdin.flush()
process.stdin.close()
except Exception as e:
logger.error(f"写入 poses heredoc 失败: {e}")
self.status = Nav2Status.FAILED
return
result_received = False
while not self._cancel_event.is_set():
time.sleep(1.0)
full_out = "".join(stdout_lines)
if "succeeded" in full_out.lower():
logger.info(f"✅ Nav2 路径点导航成功完成 {len(poses)} 个点")
self._result_status = "succeeded"
result_received = True
break
if any(k in full_out.lower() for k in ["failed", "aborted"]):
logger.warning(f"⚠️ Nav2 路径点导航失败")
self._result_status = "failed"
result_received = True
break
if any(k in full_out.lower() for k in ["canceled", "cancelled"]):
self._result_status = "cancelled"
result_received = True
break
if process.poll() is not None and not result_received:
self._result_status = "failed"
break
stop_reading.set()
if process.poll() is None:
process.terminate()
process.wait(timeout=3)
out_thread.join(timeout=2)
if self._result_status == "succeeded":
self.status = Nav2Status.SUCCEEDED
elif self._result_status == "cancelled":
self.status = Nav2Status.CANCELLED
else:
self.status = Nav2Status.FAILED
except Exception as e:
logger.error(f"Nav2 路径点导航异常: {e}")
self.status = Nav2Status.FAILED
def stop(self):
"""取消当前导航"""
if self.status == Nav2Status.NAVIGATING:
self._cancel_event.set()
setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash"
cancel_cmd = f"bash -l -c '{setup} && ros2 action cancel /navigate_to_pose 2>/dev/null || ros2 action cancel /navigate_through_poses 2>/dev/null || true'"
subprocess.run(cancel_cmd, shell=True, timeout=3)
self.status = Nav2Status.CANCELLED
logger.info("导航已停止")
def get_status(self) -> Dict:
"""获取导航状态"""
current = self._get_current_pose()
return {
"status": self.status.value,
"current_position": current,
"nav2_available": self._check_nav2_available()
}
def get_current_position(self) -> List[float]:
"""获取当前位置 [x, y, yaw]"""
return self._get_current_pose()