161 lines
4.9 KiB
Python
161 lines
4.9 KiB
Python
"""
|
|
AGV 导航控制模块 - 通过 pymycobot 控制 AGV 运动
|
|
"""
|
|
import time
|
|
import logging
|
|
from typing import Tuple, Optional, List
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# 尝试导入 pymycobot
|
|
try:
|
|
from pymycobot import MyAGVPro
|
|
MYCOBOT_AVAILABLE = True
|
|
except ImportError:
|
|
MYCOBOT_AVAILABLE = False
|
|
logger.warning("pymycobot 未安装,AGV 控制功能不可用")
|
|
|
|
|
|
class AGVController:
|
|
"""AGV 运动控制"""
|
|
|
|
def __init__(self, device: str = "/dev/agvpro_controller", baudrate: int = 1000000):
|
|
self.device = device
|
|
self.baudrate = baudrate
|
|
self._agv: Optional[MyAGVPro] = None
|
|
self._connected = False
|
|
|
|
def connect(self) -> bool:
|
|
"""连接 AGV"""
|
|
if not MYCOBOT_AVAILABLE:
|
|
logger.error("pymycobot 不可用")
|
|
return False
|
|
try:
|
|
self._agv = MyAGVPro(self.device, self.baudrate, debug=False)
|
|
# 检查是否上电
|
|
if self._agv.is_power_on():
|
|
self._connected = True
|
|
logger.info("AGV 连接成功")
|
|
return True
|
|
else:
|
|
logger.warning("AGV 未上电,尝试上电...")
|
|
self._agv.power_on()
|
|
time.sleep(2)
|
|
if self._agv.is_power_on():
|
|
self._connected = True
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"AGV 连接失败: {e}")
|
|
return False
|
|
|
|
def is_connected(self) -> bool:
|
|
return self._connected and self._agv is not None
|
|
|
|
def move_forward(self, speed: float = 0.5, duration: float = None):
|
|
"""前进"""
|
|
if not self.is_connected():
|
|
return
|
|
self._agv.move_forward(speed)
|
|
if duration:
|
|
time.sleep(duration)
|
|
self.stop()
|
|
|
|
def move_backward(self, speed: float = 0.5, duration: float = None):
|
|
"""后退"""
|
|
if not self.is_connected():
|
|
return
|
|
self._agv.move_backward(speed)
|
|
if duration:
|
|
time.sleep(duration)
|
|
self.stop()
|
|
|
|
def turn_left(self, speed: float = 0.5, duration: float = None):
|
|
"""左转"""
|
|
if not self.is_connected():
|
|
return
|
|
self._agv.turn_left(speed)
|
|
if duration:
|
|
time.sleep(duration)
|
|
self.stop()
|
|
|
|
def turn_right(self, speed: float = 0.5, duration: float = None):
|
|
"""右转"""
|
|
if not self.is_connected():
|
|
return
|
|
self._agv.turn_right(speed)
|
|
if duration:
|
|
time.sleep(duration)
|
|
self.stop()
|
|
|
|
def move_left_lateral(self, speed: float = 0.5, duration: float = None):
|
|
"""向左横向移动"""
|
|
if not self.is_connected():
|
|
return
|
|
self._agv.move_left_lateral(speed)
|
|
if duration:
|
|
time.sleep(duration)
|
|
self.stop()
|
|
|
|
def move_right_lateral(self, speed: float = 0.5, duration: float = None):
|
|
"""向右横向移动"""
|
|
if not self.is_connected():
|
|
return
|
|
self._agv.move_right_lateral(speed)
|
|
if duration:
|
|
time.sleep(duration)
|
|
self.stop()
|
|
|
|
def stop(self):
|
|
"""停止"""
|
|
if self.is_connected():
|
|
self._agv.stop()
|
|
|
|
def get_position(self) -> Optional[List[float]]:
|
|
"""获取 AGV 当前位置 [x, y, rz]"""
|
|
if not self.is_connected():
|
|
return None
|
|
try:
|
|
# 启用自动报告以获取位置
|
|
self._agv.set_auto_report_state(1)
|
|
time.sleep(0.5)
|
|
msg = self._agv.get_auto_report_message()
|
|
if msg and len(msg) >= 3:
|
|
return [msg[0], msg[1], msg[2]]
|
|
except Exception as e:
|
|
logger.error(f"获取 AGV 位置失败: {e}")
|
|
return None
|
|
|
|
def go_to_point(self, x: float, y: float, rz: float = None, speed: float = 0.5) -> bool:
|
|
"""移动到目标点(简单的方向控制实现)"""
|
|
# 注意:AGV Pro 的 pymycobot 没有直接 goto API
|
|
# 需要 ROS2 SLAM 导航支持,此处提供基础运动接口
|
|
# 实际导航需要结合地图和路径规划
|
|
logger.warning("go_to_point 需要 ROS2 导航支持,当前仅记录目标")
|
|
return True
|
|
|
|
def get_battery(self) -> Optional[float]:
|
|
"""获取电池电压"""
|
|
if not self.is_connected():
|
|
return None
|
|
try:
|
|
self._agv.set_auto_report_state(1)
|
|
msg = self._agv.get_auto_report_message()
|
|
if msg and len(msg) > 5:
|
|
return msg[5] # 电池电压
|
|
except:
|
|
pass
|
|
return None
|
|
|
|
def disconnect(self):
|
|
if self._agv:
|
|
self.stop()
|
|
self._agv = None
|
|
self._connected = False
|
|
|
|
def __enter__(self):
|
|
self.connect()
|
|
return self
|
|
|
|
def __exit__(self, *args):
|
|
self.disconnect() |