From 1dc439dc8ec4f68f8bd8608b8b4c17b085f3b1bb Mon Sep 17 00:00:00 2001 From: ywb <347742090@qq.com> Date: Fri, 15 May 2026 23:44:21 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20AGV=E4=BD=8D=E7=BD=AE=E8=AF=BB=E5=8F=96?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=20-=20ROS=5FDOMAIN=5FID=E7=8E=AF=E5=A2=83?= =?UTF-8?q?=E5=8F=98=E9=87=8F=E7=BC=BA=E5=A4=B1=E3=80=81odom=E8=BE=93?= =?UTF-8?q?=E5=87=BA=E7=AE=A1=E9=81=93=E9=94=99=E8=AF=AF=E3=80=81ANSI?= =?UTF-8?q?=E8=BD=AC=E4=B9=89=E5=BA=8F=E5=88=97=E6=9C=AA=E6=B8=85=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题根因: 1. ros2 daemon默认运行在domain 0,AGV节点在domain 1,导致无法通信 2. _run_ros2_cmd通过daemon通信,daemon忽略了shell的ROS_DOMAIN_ID环境变量 3. 修复: 在bash -c内联设置export ROS_DOMAIN_ID=1 4. 修复: 所有ros2 topic echo命令添加--once参数确保只输出一次后退出 5. 修复: 清理ANSI转义序列(\x1b)和RTPS错误信息前缀,避免YAML解析失败 6. 修复: 添加source /opt/ros/humble/setup.bash确保ros2命令可用 7. 修复: get_position和get_battery统一使用_run_ros2_cmd而非独立subprocess调用 --- agv_app/utils/agv_controller_ros2.py | 43 ++++++++++++---------------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/agv_app/utils/agv_controller_ros2.py b/agv_app/utils/agv_controller_ros2.py index e2ea2c4..82e2899 100644 --- a/agv_app/utils/agv_controller_ros2.py +++ b/agv_app/utils/agv_controller_ros2.py @@ -12,7 +12,7 @@ from typing import Tuple, Optional, List logger = logging.getLogger(__name__) # ROS2 环境设置 -ROS2_SETUP_CMD = "export ROS_DOMAIN_ID=1 && source ~/agv_pro_ros2/install/setup.bash" +ROS2_SETUP_CMD = "export ROS_DOMAIN_ID=1 && source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash" class AGVController: @@ -28,7 +28,7 @@ class AGVController: def _run_ros2_cmd(self, cmd: str, timeout: float = 5.0) -> tuple: """执行 ros2 命令""" - full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'" + full_cmd = f"bash -c 'export ROS_DOMAIN_ID=1 && source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash && {cmd}'" try: result = subprocess.run( full_cmd, @@ -64,7 +64,7 @@ class AGVController: # 尝试获取一次位置数据 rc, out, err = self._run_ros2_cmd( - "timeout 5 ros2 topic echo /odom 2>timeout 10 ros2 topic echo /odom --once 2>/dev/null1 | head -1", + "timeout 5 ros2 topic echo /odom --once 2>/dev/null", timeout=6 ) @@ -89,23 +89,10 @@ class AGVController: def _publish_cmd_vel(self, linear_x: float = 0.0, linear_y: float = 0.0, angular_z: float = 0.0): """发布速度命令到 /cmd_vel""" - # 直接执行,避免引号嵌套问题 msg = f'{{"linear": {{"x": {linear_x}, "y": {linear_y}, "z": 0.0}}, "angular": {{"x": 0.0, "y": 0.0, "z": {angular_z}}}}}' - full_cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 topic pub /cmd_vel geometry_msgs/msg/Twist \"{msg}\" --once'" - try: - result = subprocess.run( - full_cmd, - shell=True, - capture_output=True, - text=True, - timeout=5 - ) - if result.returncode != 0: - logger.warning(f"发布 cmd_vel 失败: {result.stderr.strip()}") - except subprocess.TimeoutExpired: - logger.warning("发布 cmd_vel 超时") - except Exception as e: - logger.warning(f"发布 cmd_vel 失败: {e}") + rc, out, err = self._run_ros2_cmd(f'ros2 topic pub /cmd_vel geometry_msgs/msg/Twist "{msg}" --once') + if rc != 0: + logger.warning(f"发布 cmd_vel 失败: {err}") def move_forward(self, speed: float = 0.5, duration: float = None): """前进""" @@ -173,14 +160,19 @@ class AGVController: try: # 从 /odom topic 获取位置 rc, out, err = self._run_ros2_cmd( - "timeout 5 ros2 topic echo /odom 2>timeout 10 ros2 topic echo /odom --once 2>/dev/null1 | head -1", + "timeout 5 ros2 topic echo /odom --once 2>/dev/null", timeout=6 ) if rc == 0 and out: # 解析 odom 消息 (YAML 格式) # ros2 topic echo 输出可能含多个 --- 分隔的文档,只取第一个 - import yaml - yaml_str = out.split('---')[0] + # 注意:RTPS 错误信息混在 stderr 通过管道进入 stdout,需要去掉 + import re, yaml + # 去掉 ANSI 转义序列 + clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out) + # 去掉时间戳前缀行(形如 "2026-05-15 23:39:38.824 [RTPS_TRANSPORT_SHM Error] ...") + yaml_str = re.sub(r'^[\d\-:. ]+\[RTPS[^\]]*\].*\n?', '', clean, flags=re.MULTILINE).strip() + yaml_str = yaml_str.split('---')[0] data = yaml.safe_load(yaml_str) if data: pos = data.get("pose", {}).get("pose", {}).get("position", {}) @@ -209,13 +201,14 @@ class AGVController: try: # 从 /voltage topic 获取电压 rc, out, err = self._run_ros2_cmd( - "timeout 5 ros2 topic echo /voltage 2>timeout 10 ros2 topic echo /voltage --once 2>/dev/null1 | head -1", + "timeout 5 ros2 topic echo /voltage --once 2>/dev/null", timeout=6 ) if rc == 0 and out: # 解析电压消息(ros2 topic echo 可能输出多文档 YAML) - import yaml - yaml_str = out.split('---')[0] + import re, yaml + clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out) + yaml_str = clean.split('---')[0].strip() data = yaml.safe_load(yaml_str) if data: self._voltage = data.get("data", 0.0)