diff --git a/agv_app/app.py b/agv_app/app.py index b215f0d..bd9fb82 100644 --- a/agv_app/app.py +++ b/agv_app/app.py @@ -445,8 +445,15 @@ def api_navigate_cancel(): @app.route("/api/navigate/status", methods=["GET"]) def api_navigate_status(): """获取导航状态""" + # 懒初始化 navigator + if gs.navigator is None: + try: + gs.navigator = Nav2Navigator() + except Exception as e: + logger.warning(f"Nav2Navigator 初始化失败: {e}") if gs.navigator: return jsonify(gs.navigator.get_status()) + # navigator 仍为 None,说明 Nav2 不可用 return jsonify({"status": "idle", "current_position": [0, 0, 0], "nav2_available": False}) diff --git a/agv_app/utils/nav2_navigator.py b/agv_app/utils/nav2_navigator.py index 2fdc0af..91470aa 100644 --- a/agv_app/utils/nav2_navigator.py +++ b/agv_app/utils/nav2_navigator.py @@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) def _run_ros2_bash(cmd: str, timeout: float = 5.0) -> Tuple[int, str, str]: - """执行 ros2 命令(通过 bash -l 避免环境问题)""" + """执行 ros2 命令(直接运行,不二次包装)""" setup = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash" full_cmd = f"bash -l -c '{setup} && {cmd}'" try: @@ -90,11 +90,32 @@ class Nav2Navigator: os.chmod(script, 0o755) def _check_nav2_available(self) -> bool: - """检查 Nav2 action server 是否可用""" - rc, out, err = _run_ros2_bash("ros2 action list") - if rc != 0: - return False - return "/navigate_to_pose" in out or "navigate_to_pose" in out + """检查 Nav2 action server 是否可用(带缓存 10 秒)""" + import time as time_mod + if hasattr(self, '_nav2_check_time') and hasattr(self, '_nav2_check_result'): + if time_mod.time() - self._nav2_check_time < 10: + return self._nav2_check_result + # 写入脚本文件执行,兼容 Flask subprocess 环境 + script = ( + "#!/bin/bash\n" + "source /opt/ros/humble/setup.bash 2>/dev/null\n" + "source /home/elephant/agv_pro_ros2/install/setup.bash 2>/dev/null\n" + "ros2 action list 2>&1\n" + ) + script_file = "/tmp/nav2_check.sh" + with open(script_file, "w") as f: + f.write(script) + import os + os.chmod(script_file, 0o755) + r = subprocess.run([script_file], capture_output=True, text=True, timeout=8) + rc = r.returncode + out = r.stdout + err = r.stderr + result = rc == 0 and ("/navigate_to_pose" in out or "navigate_to_pose" in out) + logger.debug(f"_check_nav2_available: rc={rc}, out={out[:100]}, result={result}") + self._nav2_check_result = result + self._nav2_check_time = time_mod.time() + return result def _get_current_pose(self) -> List[float]: """从 /odom 获取当前位置 [x, y, yaw]"""