Files
smart-inspection/agv_app/utils/nav2_navigator.py
T
2026-05-16 23:47:02 +08:00

576 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Nav2 导航模块 - 通过 ROS2 Action Server 控制 AGV 导航
使用 Nav2 的 navigate_to_pose 和 navigate_through_poses action
通过写入临时 shell 脚本再执行,避免 bash -c 单引号转义问题
"""
import time
import math
import logging
import threading
import subprocess
import os
from typing import List, Tuple, Optional, Dict
from enum import Enum
logger = logging.getLogger(__name__)
def _run_ros2_bash(cmd: str, timeout: float = 5.0) -> Tuple[int, str, str]:
"""执行 ros2 命令(直接运行,不二次包装)"""
setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash"
full_cmd = f"bash -l -c '{setup} && {cmd}'"
try:
result = subprocess.run(full_cmd, shell=True, capture_output=True, text=True, timeout=timeout)
return result.returncode, result.stdout.strip(), result.stderr.strip()
except subprocess.TimeoutExpired:
return -1, "", "Timeout"
class Nav2Status(Enum):
IDLE = "idle"
NAVIGATING = "navigating"
SUCCEEDED = "succeeded"
FAILED = "failed"
CANCELLED = "cancelled"
UNKNOWN = "unknown"
class Nav2Navigator:
"""Nav2 导航器 — 通过 ros2 action /navigate_to_pose 与 Nav2 通信"""
NAVIGATE_SCRIPT = "/tmp/nav2_send_goal.sh"
def __init__(self):
self.status = Nav2Status.IDLE
self._nav_thread: Optional[threading.Thread] = None
self._cancel_event = threading.Event()
self._result_status = None
self._current_pose = [0.0, 0.0, 0.0]
self._build_script()
def _build_script(self):
"""生成 Nav2 action 发送脚本(通过 heredoc 避免所有 bash 转义问题)"""
script = self.NAVIGATE_SCRIPT
content = [
"#!/bin/bash",
"source /opt/ros/humble/setup.bash",
"source /home/elephant/agv_pro_ros2/install/setup.bash",
"",
"TARGET_X=${1:-0}",
"TARGET_Y=${2:-0}",
"TARGET_YAW=${3:-0}",
"",
"# 计算四元数",
"YAW_QZ=$(python3 -c \"import math; print(math.sin($TARGET_YAW / 2.0))\")",
"YAW_QW=$(python3 -c \"import math; print(math.cos($TARGET_YAW / 2.0))\")",
"",
"# 通过 stdin pipe 发送 goal",
'cat << \'GOAL\' | ros2 action send_goal /navigate_to_pose nav2_msgs/action/NavigateToPose - --feedback',
"pose:",
" header:",
" stamp:",
" sec: 0",
" nanosec: 0",
" frame_id: map",
" pose:",
" position:",
" x: $TARGET_X",
" y: $TARGET_Y",
" z: 0.0",
" orientation:",
" x: 0.0",
" y: 0.0",
" z: $YAW_QZ",
" w: $YAW_QW",
"GOAL",
]
with open(script, "w") as f:
f.write("\n".join(content) + "\n")
os.chmod(script, 0o755)
def _check_nav2_available(self) -> bool:
"""检查 Nav2 action server 是否可用(带缓存 10 秒)"""
import time as time_mod
if hasattr(self, '_nav2_check_time') and hasattr(self, '_nav2_check_result'):
if time_mod.time() - self._nav2_check_time < 10:
return self._nav2_check_result
# 写入脚本文件执行,兼容 Flask subprocess 环境
script = (
"#!/bin/bash\n"
"source /opt/ros/humble/setup.bash 2>/dev/null\n"
"source /home/elephant/agv_pro_ros2/install/setup.bash 2>/dev/null\n"
"ros2 action list 2>&1\n"
)
script_file = "/tmp/nav2_check.sh"
with open(script_file, "w") as f:
f.write(script)
import os
os.chmod(script_file, 0o755)
r = subprocess.run([script_file], capture_output=True, text=True, timeout=8)
rc = r.returncode
out = r.stdout
err = r.stderr
result = rc == 0 and ("/navigate_to_pose" in out or "navigate_to_pose" in out)
logger.debug(f"_check_nav2_available: rc={rc}, out={out[:100]}, result={result}")
self._nav2_check_result = result
self._nav2_check_time = time_mod.time()
return result
def _get_current_pose(self) -> List[float]:
"""从 /odom 获取当前位置 [x, y, yaw]"""
try:
rc, out, err = _run_ros2_bash("timeout 3 ros2 topic echo /odom --once 2>/dev/null", timeout=4)
if rc == 0 and out:
import re, yaml
clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out)
yaml_str = re.sub(r'^[\d\-:. ]+\[RTPS[^\]]*\].*\n?', '', clean, flags=re.MULTILINE).strip()
yaml_str = yaml_str.split('---')[0]
data = yaml.safe_load(yaml_str)
if data:
pos = data.get("pose", {}).get("pose", {}).get("position", {})
orient = data.get("pose", {}).get("pose", {}).get("orientation", {})
x = pos.get("x", 0.0)
y = pos.get("y", 0.0)
qz = orient.get("z", 0.0)
qw = orient.get("w", 1.0)
yaw = math.atan2(2.0 * qw * qz, 1.0 - 2.0 * qz * qz)
self._current_pose = [x, y, yaw]
return self._current_pose
except Exception as e:
logger.debug(f"获取 odom 失败: {e}")
return self._current_pose
def navigate_to_pose(self, x: float, y: float, yaw: float = None,
timeout_sec: float = 120.0,
blocking: bool = True) -> bool:
"""
导航到目标坐标(Nav2 navigate_to_pose action
Args:
x, y: 目标位置(世界坐标,米)
yaw: 目标朝向(弧度),None 则保持当前朝向
timeout_sec: 超时时间(秒)
blocking: 是否阻塞等待导航完成
Returns:
blocking=True: 是否成功到达
blocking=False: 是否成功启动导航
"""
if self.status == Nav2Status.NAVIGATING:
logger.warning("导航正在进行中,请先停止当前导航")
return False
if not self._check_nav2_available():
logger.error("Nav2 action server 不可用,请确保 navigation2_active.launch.py 已启动")
return False
if yaw is None:
current = self._get_current_pose()
yaw = current[2]
logger.info(f"发送导航目标: ({x:.3f}, {y:.3f}), yaw={math.degrees(yaw):.1f}°")
# 启动导航线程
self._cancel_event.clear()
self._result_status = None
self.status = Nav2Status.NAVIGATING
self._nav_thread = threading.Thread(
target=self._nav_thread_func,
args=(x, y, yaw, timeout_sec),
daemon=True
)
self._nav_thread.start()
if blocking:
self._nav_thread.join(timeout=timeout_sec + 10)
return self.status == Nav2Status.SUCCEEDED
return True
def _nav_thread_func(self, x: float, y: float, yaw: float, timeout_sec: float):
"""导航执行线程 — 使用 heredoc 脚本避免 bash 转义问题"""
try:
logger.info(f"Nav2 线程启动: x={x}, y={y}, yaw={yaw}, timeout={timeout_sec}s")
start_time = time.time()
elapsed = 0.0
check_interval = 1.0
result_received = False
# 使用 pipe 方式运行导航
# 通过 subprocess.Popen + stdin heredoc 避免所有引号转义问题
setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash"
qz = math.sin(yaw / 2.0)
qw = math.cos(yaw / 2.0)
# 构建 goal YAML(用 bash heredoc 方式避免转义问题)
# 写入临时文件
import tempfile
goal_yaml = (
f"pose:\n"
f" header:\n"
f" stamp:\n"
f" sec: 0\n"
f" nanosec: 0\n"
f" frame_id: map\n"
f" pose:\n"
f" position:\n"
f" x: {x}\n"
f" y: {y}\n"
f" z: 0.0\n"
f" orientation:\n"
f" x: 0.0\n"
f" y: 0.0\n"
f" z: {qz}\n"
f" w: {qw}\n"
f"behavior_tree: ''\n"
)
goal_file = "/tmp/nav2_goal_{}.yaml".format(os.getpid())
with open(goal_file, "w") as f:
f.write(goal_yaml)
cmd = (
f'bash -l -c \''
f'source /opt/ros/humble/setup.bash && '
f'source /home/elephant/agv_pro_ros2/install/setup.bash && '
f'ros2 action send_goal /navigate_to_pose nav2_msgs/action/NavigateToPose '
f'"$(cat {goal_file})" --feedback\''
)
process = subprocess.Popen(
cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
stdout_lines = []
stderr_lines = []
stop_reading = threading.Event()
def reader(pipe, lines):
for line in iter(pipe.readline, ''):
if stop_reading.is_set():
break
lines.append(line)
# 写入 goal(通过 bash heredoc
# 写入 goal(通过 bash 脚本方式)
script_content = (
f'#!/bin/bash\n'
f'source /opt/ros/humble/setup.bash\n'
f'source /home/elephant/agv_pro_ros2/install/setup.bash\n'
f'GOAL=$(cat {goal_file})\n'
f'ros2 action send_goal /navigate_to_pose nav2_msgs/action/NavigateToPose "$GOAL" --feedback\n'
)
script_file = "/tmp/nav2_action.sh"
with open(script_file, "w") as sf:
sf.write(script_content)
os.chmod(script_file, 0o755)
out_thread = threading.Thread(target=reader, args=(process.stdout, stdout_lines))
err_thread = threading.Thread(target=reader, args=(process.stderr, stderr_lines))
out_thread.start()
err_thread.start()
# 用 subprocess.Popen([script_path]) 替代 stdin
stop_reading.set()
if process.poll() is None:
process.terminate()
process.wait(timeout=3)
act_proc = subprocess.Popen(
[script_file],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
stdout_lines2 = []
stderr_lines2 = []
stop_reading2 = threading.Event()
def reader2(pipe, lines):
for line in iter(pipe.readline, ''):
if stop_reading2.is_set():
break
lines.append(line)
t_out2 = threading.Thread(target=reader2, args=(act_proc.stdout, stdout_lines2))
t_err2 = threading.Thread(target=reader2, args=(act_proc.stderr, stderr_lines2))
t_out2.start()
t_err2.start()
# 轮询等待 action 完成(每 2s 检查一次结果)
result_received = False
for _ in range(60): # 最多等 120 秒
time.sleep(2)
elapsed = time.time() - start_time
# 检查是否有结果
full_out = "".join(stdout_lines2)
full_err = "".join(stderr_lines2)
combined = full_out + full_err
result_idx = combined.find("Result:")
if result_idx >= 0:
import re
status_m = re.search(r'status[\s:]+(\w+)', combined[result_idx:])
if status_m:
st = status_m.group(1).lower()
if st in ('succeeded', 'SUCCEEDED'):
logger.info("✅ Nav2 导航成功到达目标")
self._result_status = "succeeded"
result_received = True
break
elif st in ('failed', 'FAILED', 'aborted'):
logger.warning(f"⚠️ Nav2 导航失败: status={st}")
self._result_status = "failed"
result_received = True
break
elif st in ('canceled', 'cancelled'):
logger.info("Nav2 导航被取消")
self._result_status = "cancelled"
result_received = True
break
# 检查进程是否已结束但无结果
if act_proc.poll() is not None and not result_received:
self._result_status = "failed"
break
if self._cancel_event.is_set():
break
stop_reading2.set()
if act_proc.poll() is None:
act_proc.terminate()
try:
act_proc.wait(timeout=3)
except subprocess.TimeoutExpired:
act_proc.kill()
t_out2.join(timeout=2)
t_err2.join(timeout=2)
stdout_lines = stdout_lines2
stderr_lines = stderr_lines2
# 轮询检查结果
while not self._cancel_event.is_set() and elapsed < timeout_sec:
time.sleep(check_interval)
elapsed = time.time() - start_time
full_out = "".join(stdout_lines)
full_err = "".join(stderr_lines)
combined = full_out + full_err
# 查找 Result 节中的 status 字段
import re
result_idx = combined.find("Result:")
if result_idx >= 0:
status_m = re.search(r'status[\s:]+(\w+)', combined[result_idx:])
if status_m:
st = status_m.group(1).lower()
if st in ('succeeded', 'SUCCEEDED'):
logger.info("✅ Nav2 导航成功到达目标")
self._result_status = "succeeded"
result_received = True
break
elif st in ('failed', 'FAILED', 'aborted'):
logger.warning(f"⚠️ Nav2 导航失败: status={st}")
self._result_status = "failed"
result_received = True
break
elif st in ('canceled', 'cancelled'):
logger.info("Nav2 导航被取消")
self._result_status = "cancelled"
result_received = True
break
if not result_received:
self._result_status = "failed"
break
# 善后
stop_reading.set()
if act_proc.poll() is None:
act_proc.terminate()
try:
act_proc.wait(timeout=3)
except subprocess.TimeoutExpired:
act_proc.kill()
out_thread.join(timeout=2)
err_thread.join(timeout=2)
if self._cancel_event.is_set() and not result_received:
logger.info("取消 Nav2 导航...")
cancel_cmd = f'bash -l -c \'{setup} && ros2 action cancel /navigate_to_pose 2>/dev/null || ros2 action cancel /navigate_through_poses 2>/dev/null || true\''
subprocess.run(cancel_cmd, shell=True, timeout=3)
self._result_status = "cancelled"
if self._result_status == "succeeded":
self.status = Nav2Status.SUCCEEDED
elif self._result_status == "cancelled":
self.status = Nav2Status.CANCELLED
else:
self.status = Nav2Status.FAILED
except Exception as e:
logger.error(f"Nav2 导航异常: {e}")
self.status = Nav2Status.FAILED
def navigate_through_poses(self, poses: List[Tuple[float, float, float]],
timeout_per_pose: float = 120.0,
blocking: bool = True) -> bool:
"""导航通过多个路径点(Nav2 navigate_through_poses action"""
if self.status == Nav2Status.NAVIGATING:
logger.warning("导航正在进行中,请先停止")
return False
if not self._check_nav2_available():
logger.error("Nav2 action server 不可用")
return False
if not poses:
return True
logger.info(f"发送 {len(poses)} 个路径点的导航任务")
self._cancel_event.clear()
self.status = Nav2Status.NAVIGATING
self._nav_thread = threading.Thread(
target=self._nav_thread_func_multi,
args=(poses, timeout_per_pose),
daemon=True
)
self._nav_thread.start()
if blocking:
total_timeout = len(poses) * timeout_per_pose + 30
self._nav_thread.join(timeout=total_timeout)
return self.status == Nav2Status.SUCCEEDED
return True
def _nav_thread_func_multi(self, poses: List[Tuple[float, float, float]], timeout_per_pose: float):
"""多路径点导航线程"""
try:
setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash"
# 构建 poses heredoc
poses_lines = []
for i, (x, y, yaw) in enumerate(poses):
qz = math.sin(yaw / 2.0)
qw = math.cos(yaw / 2.0)
poses_lines.extend([
f" - pose:",
f" header:",
f" stamp:",
f" sec: 0",
f" nanosec: 0",
f" frame_id: map",
f" pose:",
f" position:",
f" x: {x}",
f" y: {y}",
f" z: 0.0",
f" orientation:",
f" x: 0.0",
f" y: 0.0",
f" z: {qz}",
f" w: {qw}",
])
poses_yaml = "poses:\n" + "\n".join(poses_lines)
cmd = f'bash -l -c \'{setup} && ros2 action send_goal /navigate_through_poses nav2_msgs/action/NavigateThroughPoses - --feedback\''
process = subprocess.Popen(
cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
stdout_lines = []
stop_reading = threading.Event()
def reader(pipe, lines):
for line in iter(pipe.readline, ''):
if stop_reading.is_set():
break
lines.append(line)
out_thread = threading.Thread(target=reader, args=(process.stdout, stdout_lines))
out_thread.start()
try:
process.stdin.write(poses_yaml + "\n")
process.stdin.flush()
process.stdin.close()
except Exception as e:
logger.error(f"写入 poses heredoc 失败: {e}")
self.status = Nav2Status.FAILED
return
result_received = False
while not self._cancel_event.is_set():
time.sleep(1.0)
full_out = "".join(stdout_lines)
full_err = "".join(stderr_lines)
combined = full_out + full_err
result_idx = combined.find("Result:")
if result_idx >= 0:
import re
status_m = re.search(r'status[\s:]+(\w+)', combined[result_idx:])
if status_m:
st = status_m.group(1).lower()
if st in ('succeeded', 'SUCCEEDED'):
logger.info(f"✅ Nav2 路径点导航成功完成 {len(poses)} 个点")
self._result_status = "succeeded"
result_received = True
break
elif st in ('failed', 'FAILED', 'aborted'):
logger.warning(f"⚠️ Nav2 路径点导航失败")
self._result_status = "failed"
result_received = True
break
elif st in ('canceled', 'cancelled'):
self._result_status = "cancelled"
result_received = True
break
if process.poll() is not None and not result_received:
self._result_status = "failed"
break
stop_reading.set()
if process.poll() is None:
process.terminate()
process.wait(timeout=3)
out_thread.join(timeout=2)
if self._result_status == "succeeded":
self.status = Nav2Status.SUCCEEDED
elif self._result_status == "cancelled":
self.status = Nav2Status.CANCELLED
else:
self.status = Nav2Status.FAILED
except Exception as e:
logger.error(f"Nav2 路径点导航异常: {e}")
self.status = Nav2Status.FAILED
def stop(self):
"""取消当前导航"""
if self.status == Nav2Status.NAVIGATING:
self._cancel_event.set()
setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash"
cancel_cmd = f'bash -l -c \'{setup} && ros2 action cancel /navigate_to_pose 2>/dev/null || ros2 action cancel /navigate_through_poses 2>/dev/null || true\''
subprocess.run(cancel_cmd, shell=True, timeout=3)
self.status = Nav2Status.CANCELLED
logger.info("导航已停止")
def get_status(self) -> Dict:
"""获取导航状态"""
current = self._get_current_pose()
return {
"status": self.status.value,
"current_position": current,
"nav2_available": self._check_nav2_available()
}
def get_current_position(self) -> List[float]:
"""获取当前位置 [x, y, yaw]"""
return self._get_current_pose()