速度调节
This commit is contained in:
@@ -107,6 +107,10 @@ class MissionExecutorV3:
|
||||
# Nav2 导航器(直接使用 rclpy BasicNavigator API,比 subprocess 更可靠)
|
||||
self._nav = Nav2Navigator()
|
||||
|
||||
# 速度控制(默认值,可在 execute_mission 时覆写)
|
||||
self.arm_speed = 500
|
||||
self.agv_speed = 0.5
|
||||
|
||||
# ==================== 连接 ====================
|
||||
|
||||
def connect_all(self) -> Dict[str, bool]:
|
||||
@@ -234,6 +238,15 @@ class MissionExecutorV3:
|
||||
arm_initial_pose = mission_config.get("arm_initial_pose", [0.0] * 6)
|
||||
has_arm_pose = self.arm_client and any(abs(a) > 0.01 for a in arm_initial_pose)
|
||||
|
||||
# 速度控制(从前端传入)
|
||||
self.arm_speed = int(options.get("arm_speed", 500))
|
||||
self.agv_speed = float(options.get("agv_speed", 0.5))
|
||||
self._log(f"🚀 AGV速度={self.agv_speed:.1f}m/s, 机械臂速度={self.arm_speed}")
|
||||
|
||||
# 设置 Nav2 导航速度(仅在任务开始时设一次)
|
||||
if opt_agv_move:
|
||||
self._set_nav_speed()
|
||||
|
||||
# 进度统计
|
||||
max_actions = total_machines * 2 # 每台机器正面+背面
|
||||
completed_actions = 0
|
||||
@@ -282,7 +295,7 @@ class MissionExecutorV3:
|
||||
if opt_arm_init and has_arm_pose and opt_agv_move:
|
||||
self._log(" 🦾 恢复机械臂初始姿态")
|
||||
try:
|
||||
self.arm_client.set_angles(arm_initial_pose, speed=500)
|
||||
self.arm_client.set_angles(arm_initial_pose, speed=self.arm_speed)
|
||||
self._wait_arm_ready(arm_initial_pose)
|
||||
except Exception as e:
|
||||
self._log(f" ⚠️ 机械臂初始化失败: {e}")
|
||||
@@ -513,6 +526,19 @@ class MissionExecutorV3:
|
||||
|
||||
# ==================== 导航 ====================
|
||||
|
||||
def _set_nav_speed(self) -> bool:
|
||||
"""动态设置 Nav2 控制器最大速度参数"""
|
||||
try:
|
||||
# 尝试设置 controller_server 的线速度参数
|
||||
vel = self.agv_speed
|
||||
cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 param set /controller_server FollowPath.desired_linear_vel {vel:.2f} 2>/dev/null || true'"
|
||||
subprocess.run(cmd, shell=True, timeout=5, capture_output=True)
|
||||
self._log(f" 🚀 AGV 速度设为 {vel:.1f} m/s")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning(f"设置 AGV 速度失败: {e}")
|
||||
return False
|
||||
|
||||
def _navigate(self, point: dict, label: str) -> bool:
|
||||
coords = point["coords"]
|
||||
x, y = float(coords[0]), float(coords[1])
|
||||
@@ -557,7 +583,7 @@ class MissionExecutorV3:
|
||||
self._log(f" [{i+1}/{len(qr_configs)}] {name}")
|
||||
# 调整机械臂姿态
|
||||
if self.arm_client:
|
||||
self.arm_client.set_angles(angles, speed=500)
|
||||
self.arm_client.set_angles(angles, speed=self.arm_speed)
|
||||
self._wait_arm_ready(angles)
|
||||
# 读取摄像头并扫码
|
||||
qr = self._decode_qr_from_arm()
|
||||
@@ -656,7 +682,7 @@ class MissionExecutorV3:
|
||||
|
||||
# 调整机械臂
|
||||
if self.arm_client:
|
||||
self.arm_client.set_angles(angles, speed=500)
|
||||
self.arm_client.set_angles(angles, speed=self.arm_speed)
|
||||
self._wait_arm_ready(angles)
|
||||
|
||||
# 拍照
|
||||
|
||||
Reference in New Issue
Block a user