""" AGV 导航控制模块 - 通过 ROS2 控制 AGV 运动 使用 ros2 CLI 命令进行通信,避免 rclpy 导入问题 """ import time import subprocess import json import logging import math from typing import Tuple, Optional, List logger = logging.getLogger(__name__) # ROS2 环境设置 ROS2_SETUP_CMD = "source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash" class AGVController: """AGV 运动控制 - ROS2 版本""" def __init__(self, device: str = "/dev/agvpro_controller", baudrate: int = 1000000): self.device = device self.baudrate = baudrate self._connected = False self._position = [0.0, 0.0, 0.0] # [x, y, yaw] self._voltage = 0.0 self._ros2_available = False def _run_ros2_cmd(self, cmd: str, timeout: float = 5.0) -> tuple: """执行 ros2 命令""" full_cmd = f"bash -c 'source /opt/ros/humble/setup.bash && source /home/elephant/agv_pro_ros2/install/setup.bash && export ROS_DOMAIN_ID=0 && {cmd}'" try: result = subprocess.run( full_cmd, shell=True, capture_output=True, text=True, timeout=timeout ) return result.returncode, result.stdout.strip(), result.stderr.strip() except subprocess.TimeoutExpired: return -1, "", "Timeout" except Exception as e: return -1, "", str(e) def connect(self) -> bool: """连接 AGV - 检查 ROS2 节点和 topic""" try: # 检查 agv_pro_node 是否运行 rc, out, err = self._run_ros2_cmd("ros2 node list") if rc != 0: logger.error(f"ROS2 节点列表获取失败: {err}") return False if "/agv_pro_node" not in out: logger.error("agv_pro_node 未运行") return False # 检查 /odom topic rc, out, err = self._run_ros2_cmd("ros2 topic list") if "/odom" not in out: logger.error("/odom topic 不存在") return False # 尝试获取一次位置数据 rc, out, err = self._run_ros2_cmd( "timeout 5 ros2 topic echo /odom --once 2>/dev/null", timeout=6 ) if rc == 0 and out: self._connected = True self._ros2_available = True logger.info("AGV ROS2 连接成功") return True else: # /odom 可能暂时没数据,但节点存在也算连接成功 self._connected = True self._ros2_available = True logger.info("AGV ROS2 连接成功 (节点存在,等待 odom 数据)") return True except Exception as e: logger.error(f"AGV 连接失败: {e}") return False def is_connected(self) -> bool: return self._connected def _publish_cmd_vel(self, linear_x: float = 0.0, linear_y: float = 0.0, angular_z: float = 0.0): """发布速度命令到 /cmd_vel""" msg = f'{{"linear": {{"x": {linear_x}, "y": {linear_y}, "z": 0.0}}, "angular": {{"x": 0.0, "y": 0.0, "z": {angular_z}}}}}' rc, out, err = self._run_ros2_cmd(f'ros2 topic pub /cmd_vel geometry_msgs/msg/Twist "{msg}" --once') if rc != 0: logger.warning(f"发布 cmd_vel 失败: {err}") def move_forward(self, speed: float = 0.5, duration: float = None): """前进""" if not self.is_connected(): return self._publish_cmd_vel(linear_x=speed) if duration: time.sleep(duration) self.stop() def move_backward(self, speed: float = 0.5, duration: float = None): """后退""" if not self.is_connected(): return self._publish_cmd_vel(linear_x=-speed) if duration: time.sleep(duration) self.stop() def turn_left(self, speed: float = 0.5, duration: float = None): """左转""" if not self.is_connected(): return self._publish_cmd_vel(angular_z=speed) if duration: time.sleep(duration) self.stop() def turn_right(self, speed: float = 0.5, duration: float = None): """右转""" if not self.is_connected(): return self._publish_cmd_vel(angular_z=-speed) if duration: time.sleep(duration) self.stop() def move_left_lateral(self, speed: float = 0.5, duration: float = None): """向左横向移动""" if not self.is_connected(): return self._publish_cmd_vel(linear_y=speed) if duration: time.sleep(duration) self.stop() def move_right_lateral(self, speed: float = 0.5, duration: float = None): """向右横向移动""" if not self.is_connected(): return self._publish_cmd_vel(linear_y=-speed) if duration: time.sleep(duration) self.stop() def stop(self): """停止""" if self.is_connected(): self._publish_cmd_vel(0, 0, 0) def get_position(self) -> Optional[List[float]]: """获取 AGV 当前位置 [x, y, yaw]""" if not self.is_connected(): return None try: # 从 /odom topic 获取位置 rc, out, err = self._run_ros2_cmd( "timeout 5 ros2 topic echo /odom --once 2>/dev/null", timeout=6 ) if rc == 0 and out: # 解析 odom 消息 (YAML 格式) # ros2 topic echo 输出可能含多个 --- 分隔的文档,只取第一个 # 注意:RTPS 错误信息混在 stderr 通过管道进入 stdout,需要去掉 import re, yaml # 去掉 ANSI 转义序列 clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out) # 去掉时间戳前缀行(形如 "2026-05-15 23:39:38.824 [RTPS_TRANSPORT_SHM Error] ...") yaml_str = re.sub(r'^[\d\-:. ]+\[RTPS[^\]]*\].*\n?', '', clean, flags=re.MULTILINE).strip() yaml_str = yaml_str.split('---')[0] data = yaml.safe_load(yaml_str) if data: pos = data.get("pose", {}).get("pose", {}).get("position", {}) x = pos.get("x", 0.0) y = pos.get("y", 0.0) # 从四元数计算 yaw orient = data.get("pose", {}).get("pose", {}).get("orientation", {}) qz = orient.get("z", 0.0) qw = orient.get("w", 1.0) yaw = math.atan2(2.0 * qw * qz, 1.0 - 2.0 * qz * qz) self._position = [x, y, yaw] return self._position elif self._connected: # 连接正常但 odom 暂时无数据,返回缓存位置 return self._position except Exception as e: logger.debug(f"获取位置失败: {e}") if self._connected: return self._position return None def go_to_point(self, x: float, y: float, rz: float = None, speed: float = 0.5) -> bool: """移动到目标点(需要 ROS2 导航栈)""" logger.warning("go_to_point 需要 ROS2 Nav2 支持,当前仅记录目标") return True def get_battery(self) -> Optional[float]: """获取电池电压""" if not self.is_connected(): return None try: # 从 /voltage topic 获取电压 rc, out, err = self._run_ros2_cmd( "timeout 5 ros2 topic echo /voltage --once 2>/dev/null", timeout=6 ) if rc == 0 and out: # 解析电压消息(ros2 topic echo 可能输出多文档 YAML) import re, yaml clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out) yaml_str = clean.split('---')[0].strip() data = yaml.safe_load(yaml_str) if data: self._voltage = data.get("data", 0.0) return self._voltage except Exception as e: logger.debug(f"获取电压失败: {e}") return None def disconnect(self): self.stop() self._connected = False def __enter__(self): self.connect() return self def __exit__(self, *args): self.disconnect()