Files
smart-inspection/agv_app/utils/agv_controller_ros2.py
T

235 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AGV 导航控制模块 - 通过 ROS2 控制 AGV 运动
使用 ros2 CLI 命令进行通信,避免 rclpy 导入问题
"""
import time
import subprocess
import json
import logging
import math
from typing import Tuple, Optional, List
logger = logging.getLogger(__name__)
# ROS2 环境设置
ROS2_SETUP_CMD = "export ROS_DOMAIN_ID=1 && source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash"
class AGVController:
"""AGV 运动控制 - ROS2 版本"""
def __init__(self, device: str = "/dev/agvpro_controller", baudrate: int = 1000000):
self.device = device
self.baudrate = baudrate
self._connected = False
self._position = [0.0, 0.0, 0.0] # [x, y, yaw]
self._voltage = 0.0
self._ros2_available = False
def _run_ros2_cmd(self, cmd: str, timeout: float = 5.0) -> tuple:
"""执行 ros2 命令"""
full_cmd = f"bash -c 'export ROS_DOMAIN_ID=1 && source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash && {cmd}'"
try:
result = subprocess.run(
full_cmd,
shell=True,
capture_output=True,
text=True,
timeout=timeout
)
return result.returncode, result.stdout.strip(), result.stderr.strip()
except subprocess.TimeoutExpired:
return -1, "", "Timeout"
except Exception as e:
return -1, "", str(e)
def connect(self) -> bool:
"""连接 AGV - 检查 ROS2 节点和 topic"""
try:
# 检查 agv_pro_node 是否运行
rc, out, err = self._run_ros2_cmd("ros2 node list")
if rc != 0:
logger.error(f"ROS2 节点列表获取失败: {err}")
return False
if "/agv_pro_node" not in out:
logger.error("agv_pro_node 未运行")
return False
# 检查 /odom topic
rc, out, err = self._run_ros2_cmd("ros2 topic list")
if "/odom" not in out:
logger.error("/odom topic 不存在")
return False
# 尝试获取一次位置数据
rc, out, err = self._run_ros2_cmd(
"timeout 5 ros2 topic echo /odom --once 2>/dev/null",
timeout=6
)
if rc == 0 and out:
self._connected = True
self._ros2_available = True
logger.info("AGV ROS2 连接成功")
return True
else:
# /odom 可能暂时没数据,但节点存在也算连接成功
self._connected = True
self._ros2_available = True
logger.info("AGV ROS2 连接成功 (节点存在,等待 odom 数据)")
return True
except Exception as e:
logger.error(f"AGV 连接失败: {e}")
return False
def is_connected(self) -> bool:
return self._connected
def _publish_cmd_vel(self, linear_x: float = 0.0, linear_y: float = 0.0, angular_z: float = 0.0):
"""发布速度命令到 /cmd_vel"""
msg = f'{{"linear": {{"x": {linear_x}, "y": {linear_y}, "z": 0.0}}, "angular": {{"x": 0.0, "y": 0.0, "z": {angular_z}}}}}'
rc, out, err = self._run_ros2_cmd(f'ros2 topic pub /cmd_vel geometry_msgs/msg/Twist "{msg}" --once')
if rc != 0:
logger.warning(f"发布 cmd_vel 失败: {err}")
def move_forward(self, speed: float = 0.5, duration: float = None):
"""前进"""
if not self.is_connected():
return
self._publish_cmd_vel(linear_x=speed)
if duration:
time.sleep(duration)
self.stop()
def move_backward(self, speed: float = 0.5, duration: float = None):
"""后退"""
if not self.is_connected():
return
self._publish_cmd_vel(linear_x=-speed)
if duration:
time.sleep(duration)
self.stop()
def turn_left(self, speed: float = 0.5, duration: float = None):
"""左转"""
if not self.is_connected():
return
self._publish_cmd_vel(angular_z=speed)
if duration:
time.sleep(duration)
self.stop()
def turn_right(self, speed: float = 0.5, duration: float = None):
"""右转"""
if not self.is_connected():
return
self._publish_cmd_vel(angular_z=-speed)
if duration:
time.sleep(duration)
self.stop()
def move_left_lateral(self, speed: float = 0.5, duration: float = None):
"""向左横向移动"""
if not self.is_connected():
return
self._publish_cmd_vel(linear_y=speed)
if duration:
time.sleep(duration)
self.stop()
def move_right_lateral(self, speed: float = 0.5, duration: float = None):
"""向右横向移动"""
if not self.is_connected():
return
self._publish_cmd_vel(linear_y=-speed)
if duration:
time.sleep(duration)
self.stop()
def stop(self):
"""停止"""
if self.is_connected():
self._publish_cmd_vel(0, 0, 0)
def get_position(self) -> Optional[List[float]]:
"""获取 AGV 当前位置 [x, y, yaw]"""
if not self.is_connected():
return None
try:
# 从 /odom topic 获取位置
rc, out, err = self._run_ros2_cmd(
"timeout 5 ros2 topic echo /odom --once 2>/dev/null",
timeout=6
)
if rc == 0 and out:
# 解析 odom 消息 (YAML 格式)
# ros2 topic echo 输出可能含多个 --- 分隔的文档,只取第一个
# 注意:RTPS 错误信息混在 stderr 通过管道进入 stdout,需要去掉
import re, yaml
# 去掉 ANSI 转义序列
clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out)
# 去掉时间戳前缀行(形如 "2026-05-15 23:39:38.824 [RTPS_TRANSPORT_SHM Error] ..."
yaml_str = re.sub(r'^[\d\-:. ]+\[RTPS[^\]]*\].*\n?', '', clean, flags=re.MULTILINE).strip()
yaml_str = yaml_str.split('---')[0]
data = yaml.safe_load(yaml_str)
if data:
pos = data.get("pose", {}).get("pose", {}).get("position", {})
x = pos.get("x", 0.0)
y = pos.get("y", 0.0)
# 从四元数计算 yaw
orient = data.get("pose", {}).get("pose", {}).get("orientation", {})
qz = orient.get("z", 0.0)
qw = orient.get("w", 1.0)
yaw = math.atan2(2.0 * qw * qz, 1.0 - 2.0 * qz * qz)
self._position = [x, y, yaw]
return self._position
elif self._connected:
# 连接正常但 odom 暂时无数据,返回缓存位置
return self._position
except Exception as e:
logger.debug(f"获取位置失败: {e}")
if self._connected:
return self._position
return None
def go_to_point(self, x: float, y: float, rz: float = None, speed: float = 0.5) -> bool:
"""移动到目标点(需要 ROS2 导航栈)"""
logger.warning("go_to_point 需要 ROS2 Nav2 支持,当前仅记录目标")
return True
def get_battery(self) -> Optional[float]:
"""获取电池电压"""
if not self.is_connected():
return None
try:
# 从 /voltage topic 获取电压
rc, out, err = self._run_ros2_cmd(
"timeout 5 ros2 topic echo /voltage --once 2>/dev/null",
timeout=6
)
if rc == 0 and out:
# 解析电压消息(ros2 topic echo 可能输出多文档 YAML
import re, yaml
clean = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', out)
yaml_str = clean.split('---')[0].strip()
data = yaml.safe_load(yaml_str)
if data:
self._voltage = data.get("data", 0.0)
return self._voltage
except Exception as e:
logger.debug(f"获取电压失败: {e}")
return None
def disconnect(self):
self.stop()
self._connected = False
def __enter__(self):
self.connect()
return self
def __exit__(self, *args):
self.disconnect()