diff --git a/agv_app/utils/agv_controller_ros2.py b/agv_app/utils/agv_controller_ros2.py index e2ea2c4..82e2899 100644 --- a/agv_app/utils/agv_controller_ros2.py +++ b/agv_app/utils/agv_controller_ros2.py @@ -12,7 +12,7 @@ from typing import Tuple, Optional, List logger = logging.getLogger(__name__) # ROS2 环境设置 -ROS2_SETUP_CMD = "export ROS_DOMAIN_ID=1 && source ~/agv_pro_ros2/install/setup.bash" +ROS2_SETUP_CMD = "export ROS_DOMAIN_ID=1 && source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash" class AGVController: @@ -28,7 +28,7 @@ class AGVController: def _run_ros2_cmd(self, cmd: str, timeout: float = 5.0) -> tuple: """执行 ros2 命令""" - full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'" + full_cmd = f"bash -c 'export ROS_DOMAIN_ID=1 && source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash && {cmd}'" try: result = subprocess.run( full_cmd, @@ -64,7 +64,7 @@ class AGVController: # 尝试获取一次位置数据 rc, out, err = self._run_ros2_cmd( - "timeout 5 ros2 topic echo /odom 2>timeout 10 ros2 topic echo /odom --once 2>/dev/null1 | head -1", + "timeout 5 ros2 topic echo /odom --once 2>/dev/null", timeout=6 ) @@ -89,23 +89,10 @@ class AGVController: def _publish_cmd_vel(self, linear_x: float = 0.0, linear_y: float = 0.0, angular_z: float = 0.0): """发布速度命令到 /cmd_vel""" - # 直接执行,避免引号嵌套问题 msg = f'{{"linear": {{"x": {linear_x}, "y": {linear_y}, "z": 0.0}}, "angular": {{"x": 0.0, "y": 0.0, "z": {angular_z}}}}}' - full_cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 topic pub /cmd_vel geometry_msgs/msg/Twist \"{msg}\" --once'" - try: - result = subprocess.run( - full_cmd, - shell=True, - capture_output=True, - text=True, - timeout=5 - ) - if result.returncode != 0: - logger.warning(f"发布 cmd_vel 失败: {result.stderr.strip()}") - except subprocess.TimeoutExpired: - logger.warning("发布 cmd_vel 超时") - except Exception as e: - logger.warning(f"发布 cmd_vel 失败: {e}") + rc, out, err = self._run_ros2_cmd(f'ros2 topic pub /cmd_vel geometry_msgs/msg/Twist "{msg}" --once') + if rc != 0: + logger.warning(f"发布 cmd_vel 失败: {err}") def move_forward(self, speed: float = 0.5, duration: float = None): """前进""" @@ -173,14 +160,19 @@ class AGVController: try: # 从 /odom topic 获取位置 rc, out, err = self._run_ros2_cmd( - "timeout 5 ros2 topic echo /odom 2>timeout 10 ros2 topic echo /odom --once 2>/dev/null1 | head -1", + "timeout 5 ros2 topic echo /odom --once 2>/dev/null", timeout=6 ) if rc == 0 and out: # 解析 odom 消息 (YAML 格式) # ros2 topic echo 输出可能含多个 --- 分隔的文档,只取第一个 - import yaml - yaml_str = out.split('---')[0] + # 注意:RTPS 错误信息混在 stderr 通过管道进入 stdout,需要去掉 + import re, yaml + # 去掉 ANSI 转义序列 + clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out) + # 去掉时间戳前缀行(形如 "2026-05-15 23:39:38.824 [RTPS_TRANSPORT_SHM Error] ...") + yaml_str = re.sub(r'^[\d\-:. ]+\[RTPS[^\]]*\].*\n?', '', clean, flags=re.MULTILINE).strip() + yaml_str = yaml_str.split('---')[0] data = yaml.safe_load(yaml_str) if data: pos = data.get("pose", {}).get("pose", {}).get("position", {}) @@ -209,13 +201,14 @@ class AGVController: try: # 从 /voltage topic 获取电压 rc, out, err = self._run_ros2_cmd( - "timeout 5 ros2 topic echo /voltage 2>timeout 10 ros2 topic echo /voltage --once 2>/dev/null1 | head -1", + "timeout 5 ros2 topic echo /voltage --once 2>/dev/null", timeout=6 ) if rc == 0 and out: # 解析电压消息(ros2 topic echo 可能输出多文档 YAML) - import yaml - yaml_str = out.split('---')[0] + import re, yaml + clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out) + yaml_str = clean.split('---')[0].strip() data = yaml.safe_load(yaml_str) if data: self._voltage = data.get("data", 0.0)