Files
smart-inspection/agv_app/utils/agv_controller.py
T
2026-05-14 21:43:35 +08:00

161 lines
4.9 KiB
Python

"""
AGV 导航控制模块 - 通过 pymycobot 控制 AGV 运动
"""
import time
import logging
from typing import Tuple, Optional, List
logger = logging.getLogger(__name__)
# 尝试导入 pymycobot
try:
from pymycobot import MyAGVPro
MYCOBOT_AVAILABLE = True
except ImportError:
MYCOBOT_AVAILABLE = False
logger.warning("pymycobot 未安装,AGV 控制功能不可用")
class AGVController:
"""AGV 运动控制"""
def __init__(self, device: str = "/dev/agvpro_controller", baudrate: int = 1000000):
self.device = device
self.baudrate = baudrate
self._agv: Optional[MyAGVPro] = None
self._connected = False
def connect(self) -> bool:
"""连接 AGV"""
if not MYCOBOT_AVAILABLE:
logger.error("pymycobot 不可用")
return False
try:
self._agv = MyAGVPro(self.device, self.baudrate, debug=False)
# 检查是否上电
if self._agv.is_power_on():
self._connected = True
logger.info("AGV 连接成功")
return True
else:
logger.warning("AGV 未上电,尝试上电...")
self._agv.power_on()
time.sleep(2)
if self._agv.is_power_on():
self._connected = True
return True
return False
except Exception as e:
logger.error(f"AGV 连接失败: {e}")
return False
def is_connected(self) -> bool:
return self._connected and self._agv is not None
def move_forward(self, speed: float = 0.5, duration: float = None):
"""前进"""
if not self.is_connected():
return
self._agv.move_forward(speed)
if duration:
time.sleep(duration)
self.stop()
def move_backward(self, speed: float = 0.5, duration: float = None):
"""后退"""
if not self.is_connected():
return
self._agv.move_backward(speed)
if duration:
time.sleep(duration)
self.stop()
def turn_left(self, speed: float = 0.5, duration: float = None):
"""左转"""
if not self.is_connected():
return
self._agv.turn_left(speed)
if duration:
time.sleep(duration)
self.stop()
def turn_right(self, speed: float = 0.5, duration: float = None):
"""右转"""
if not self.is_connected():
return
self._agv.turn_right(speed)
if duration:
time.sleep(duration)
self.stop()
def move_left_lateral(self, speed: float = 0.5, duration: float = None):
"""向左横向移动"""
if not self.is_connected():
return
self._agv.move_left_lateral(speed)
if duration:
time.sleep(duration)
self.stop()
def move_right_lateral(self, speed: float = 0.5, duration: float = None):
"""向右横向移动"""
if not self.is_connected():
return
self._agv.move_right_lateral(speed)
if duration:
time.sleep(duration)
self.stop()
def stop(self):
"""停止"""
if self.is_connected():
self._agv.stop()
def get_position(self) -> Optional[List[float]]:
"""获取 AGV 当前位置 [x, y, rz]"""
if not self.is_connected():
return None
try:
# 启用自动报告以获取位置
self._agv.set_auto_report_state(1)
time.sleep(0.5)
msg = self._agv.get_auto_report_message()
if msg and len(msg) >= 3:
return [msg[0], msg[1], msg[2]]
except Exception as e:
logger.error(f"获取 AGV 位置失败: {e}")
return None
def go_to_point(self, x: float, y: float, rz: float = None, speed: float = 0.5) -> bool:
"""移动到目标点(简单的方向控制实现)"""
# 注意:AGV Pro 的 pymycobot 没有直接 goto API
# 需要 ROS2 SLAM 导航支持,此处提供基础运动接口
# 实际导航需要结合地图和路径规划
logger.warning("go_to_point 需要 ROS2 导航支持,当前仅记录目标")
return True
def get_battery(self) -> Optional[float]:
"""获取电池电压"""
if not self.is_connected():
return None
try:
self._agv.set_auto_report_state(1)
msg = self._agv.get_auto_report_message()
if msg and len(msg) > 5:
return msg[5] # 电池电压
except:
pass
return None
def disconnect(self):
if self._agv:
self.stop()
self._agv = None
self._connected = False
def __enter__(self):
self.connect()
return self
def __exit__(self, *args):
self.disconnect()