Files
smart-inspection/agv_app/utils/mission_executor.py
T
2026-05-28 13:55:55 +08:00

880 lines
33 KiB
Python

# -*- coding: utf-8 -*-
"""
任务执行器 V3 — M×N 网格蛇形路径拍摄
工作流:
1. 根据 grid 生成蛇形路径(奇数行左→右,偶数行右→左)
2. 逐台机器:
- 正面:导航 → 扫码(多姿态重试) → 查机型 → 按姿态拍照
- 背面:导航 → 按姿态拍照
3. 回到 (0,0)
"""
import os
import time
import json
import logging
import threading
import subprocess
import math
from typing import Optional, Dict, List
from enum import Enum
import requests
import cv2
import numpy as np
from utils.nav2_navigator import Nav2Navigator, Nav2Status
logger = logging.getLogger(__name__)
ROS2_SETUP_CMD = "source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash"
from config import ARM_CAMERA_CONFIG
ARM_CAMERA_SNAPSHOT = ARM_CAMERA_CONFIG["snapshot_url"]
PHOTOS_DIR = "/home/elephant/photos"
# 二维码扫描重试参数
QR_SCAN_TIMEOUT = 5 # 单次扫描超时
QR_POSE_WAIT = 1.5 # 调整姿态后等待时间
MANUAL_QR_TIMEOUT = 300 # 5分钟超时
class MissionStatus(str, Enum):
IDLE = "idle"
RUNNING = "running"
PAUSED = "paused"
COMPLETED = "completed"
WAITING_QR = "waiting_qr"
WAITING_ERROR = "waiting_error"
WAITING_STEP = "waiting_step"
class MissionExecutorV3:
"""任务执行器 V3 — M×N 网格蛇形路径"""
_instance = None # 单例,供外部停止用
def __init__(self, config: dict):
self.config = config
self.status = MissionStatus.IDLE
MissionExecutorV3._instance = self
# 实时状态报告
self.report = {
"status": "idle",
"step": "",
"progress": 0,
"total": 0,
"log": [],
"error": None,
}
# 线程同步
self._stop = threading.Event()
self._pause = threading.Event()
self._pause.set() # 初始不暂停
self._qr_event = threading.Event()
self._qr_value: Optional[str] = None
# 错误弹窗
self._error_choice = None # "skip" or "abort"
# 单步执行
self._single_step_mode = False
self._step_choice = None # "confirm", "retry", "abort"
self._error_mode = False # True when waiting for error resolution
# 错误弹窗
self._error_choice = None # "skip" or "abort"
# 单步执行
self._single_step_mode = False
self._step_choice = None # "confirm", "retry", "abort"
self._error_mode = False # True when waiting for error resolution
# 设备
from .arm_client import ArmClient
self.arm_client = ArmClient(
config["arm"]["host"],
config["arm"]["port"]
)
from .agv_controller_ros2 import AGVController
self.agv = AGVController(
device=config.get("device", "/dev/agvpro_controller"),
baudrate=config.get("baudrate", 1000000)
)
# Nav2 导航器(直接使用 rclpy BasicNavigator API,比 subprocess 更可靠)
self._nav = Nav2Navigator()
# ==================== 连接 ====================
def connect_all(self) -> Dict[str, bool]:
results = {}
results["agv"] = self.agv.connect()
results["arm"] = self.arm_client.connect()
return results
def disconnect_all(self):
if self.arm_client:
self.arm_client.close()
self.agv.disconnect()
# ==================== 主任务流程 ====================
def execute_mission(
self,
mission_config: dict,
machines: list,
qr_configs: list,
models: list,
single_step: bool = False,
options: dict = None,
) -> dict:
"""
执行完整拍摄任务。
Args:
options: 任务步骤控制开关
- arm_init: 是否执行机械臂位置初始化
- agv_move: 是否执行AGV移动
- qr_scan: 是否执行二维码识别
- front_photo: 是否执行正面拍照
- back_photo: 是否执行背面拍照
"""
"""
执行完整拍摄任务。
Args:
mission_config: {rows, cols, grid, positions}
machines: [{id, row, col, front: {coords}, back: {coords}}]
qr_configs: [{id, name, joint_angles: [a1..a6]}]
models: [{id, name, poses: [{name, arm_angles}], poses_back: [...]}]
Returns:
执行报告 dict
"""
self.status = MissionStatus.RUNNING
self.report = {"status": "running", "step": "初始化", "progress": 0, "total": 0, "log": [], "error": None}
self._stop.clear()
self._pause.set()
start_time = time.time()
try:
rows = int(mission_config.get("rows", 1))
cols = int(mission_config.get("cols", 1))
grid = mission_config.get("grid", [])
positions = mission_config.get("positions", [])
# 如果 grid 为空,从 machines 重建
if not grid or all(not any(rw) if isinstance(rw, list) else True for rw in grid):
try:
cfg_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "data")
with open(os.path.join(cfg_dir, "machines_config.json")) as jf:
machines = json.load(jf)
grid = [[False] * cols for _ in range(rows)]
for m in machines:
r = int(m.get("row", 0))
c = int(m.get("col", 0))
if 0 <= r < rows and 0 <= c < cols:
grid[r][c] = True
except Exception:
grid = [[False] * cols for _ in range(rows)]
# 1. 生成蛇形路径
path = MissionExecutorV3._build_snake_path(rows, cols, grid)
if not path:
self._log("❌ 网格中没有点位,任务终止")
self.report["error"] = "No points in grid"
return self._finish(0)
# 统计总机器数(用于进度计算)
total_machines = 0
for rw in grid:
for v in rw:
if v:
total_machines += 1
self.report["total"] = total_machines
# 扫码结果缓存:正面扫到的 QR 传给背面
qr_cache = {}
# 初始化任务列表(机器级别)
self.report["tasks"] = []
for r in range(rows):
for c in range(cols):
if MissionExecutorV3._has_machine(grid, r, c):
self.report["tasks"].append({
"row": r, "col": c,
"machine_id": f"m_{r}_{c}",
"label": f"{r+1}-{c+1}",
"status": "pending",
"step": "等待",
"qr_value": None,
"photos_front": 0,
"photos_back": 0,
})
self._log(f"📍 点位蛇形路径: {len(path)} 个点位, {total_machines} 台机器")
# 任务步骤控制开关
if options is None:
options = {}
opt_arm_init = options.get("arm_init", True)
opt_agv_move = options.get("agv_move", True)
if opt_agv_move:
opt_arm_init = True
opt_qr_scan = options.get("qr_scan", True)
opt_front_photo = options.get("front_photo", True)
opt_back_photo = options.get("back_photo", True)
self._log(f"⚙️ 任务步骤: agv_move={opt_agv_move}, "
f"qr_scan={opt_qr_scan}, front={opt_front_photo}, back={opt_back_photo}")
# 机械臂初始姿态(AGV 移动前恢复)
arm_initial_pose = mission_config.get("arm_initial_pose", [0.0] * 6)
has_arm_pose = self.arm_client and any(abs(a) > 0.01 for a in arm_initial_pose)
# 进度统计
max_actions = total_machines * 2 # 每台机器正面+背面
completed_actions = 0
# 2. 逐点位蛇形执行
pi = 0
while pi < len(path):
if self._stop.is_set():
self._log("⏹️ 任务已停止")
break
self._wait_pause()
pr, c = path[pi] # pr = 点位行, c = 列
# 判断该点位需要做什么
has_front = pr < rows and MissionExecutorV3._has_machine(grid, pr, c)
has_back = pr > 0 and MissionExecutorV3._has_machine(grid, pr - 1, c)
if not has_front and not has_back:
pi += 1
continue
# 日志 & 步骤更新
rl_front = pr + 1 if has_front else 0
cl_front = c + 1 if has_front else 0
rl_back = pr if has_back else 0
cl_back = c + 1 if has_back else 0
log_parts = []
if has_front:
log_parts.append(f"正面:机器{rl_front}-{cl_front}")
task = self._get_task(pr, c)
if task:
task["status"] = "active"
task["step"] = "正面扫码"
if has_back:
log_parts.append(f"背面:机器{rl_back}-{cl_back}")
task = self._get_task(pr - 1, c)
if task:
task["step"] = "背面拍照"
self._log(f"📍 点位 ({pr},{c}) → {' & '.join(log_parts)}")
self._step(f"点位({pr},{c})")
# 恢复机械臂初始姿态(AGV 移动前)
if opt_arm_init and has_arm_pose and opt_agv_move:
self._log(" 🦾 恢复机械臂初始姿态")
try:
self.arm_client.set_angles(arm_initial_pose, speed=500)
self._wait_arm_ready(arm_initial_pose)
except Exception as e:
self._log(f" ⚠️ 机械臂初始化失败: {e}")
# 导航到该点位的坐标
if opt_agv_move:
# 找该点位的任意有效坐标(正面/背面坐标相同)
pos = MissionExecutorV3._find_any_position(positions, pr, c)
if pos and MissionExecutorV3._has_coords(pos):
if not self._navigate(pos, f"点位({pr},{c})"):
self._log(f"⚠️ 导航失败,尝试继续")
choice = self._wait_error(f"点位({pr},{c})导航失败")
if choice == "abort":
break
else:
self._log(f"⚠️ 点位({pr},{c})无有效坐标")
else:
self._log(" ⏭️ 跳过AGV移动")
# --- 正面操作(机器 pr,c 的正面) ---
qr_value = None
if has_front and not self._stop.is_set():
self._wait_pause()
if opt_qr_scan:
qr_value = self._scan_qr_with_poses(qr_configs)
if self._stop.is_set():
break
else:
self._log(" ⏭️ 跳过二维码识别(正面)")
qr_cache[(pr, c)] = qr_value
task = self._get_task(pr, c)
if task and qr_value:
task["qr_value"] = qr_value
if task:
task["step"] = "正面拍照"
model_name = self._lookup_model(qr_value)
self._log(f" 🏷️ 机型: {model_name}")
if opt_front_photo and not self._stop.is_set():
model = self._find_model(models, model_name)
if model:
self._shoot(model, "front", rl_front, cl_front, qr_value or "unknown")
else:
self._log(f" ⚠️ 未找到机型 {model_name}")
else:
self._log(" ⏭️ 跳过正面拍照")
completed_actions += 1
# --- 背面操作(机器 pr-1,c 的背面) ---
if has_back and not self._stop.is_set():
self._wait_pause()
back_qr = qr_cache.get((pr - 1, c), "unknown")
task = self._get_task(pr - 1, c)
if task:
task["step"] = "背面拍照"
model_name = self._lookup_model(back_qr)
self._log(f" 🏷️ 机型(背面): {model_name}")
if opt_back_photo and not self._stop.is_set():
model = self._find_model(models, model_name)
if model:
self._shoot(model, "back", rl_back, cl_back, back_qr)
else:
self._log(f" ⚠️ 未找到机型 {model_name}")
else:
self._log(" ⏭️ 跳过背面拍照")
completed_actions += 1
if task:
task["status"] = "completed"
task["step"] = "完成"
# 更新进度
if max_actions:
self.report["progress"] = min(int(completed_actions / max_actions * 100), 99)
# 单步执行
if single_step and not self._stop.is_set():
choice = self._wait_step_confirm(
rl_front if has_front else rl_back,
cl_front if has_front else cl_back
)
if choice == "abort":
break
elif choice == "retry":
if has_front:
task = self._get_task(pr, c)
if task:
task["status"] = "pending"
task["step"] = "重试开始"
completed_actions = max(0, completed_actions - 2)
continue
pi += 1
# 3. 回到出发点
if not self._stop.is_set() and opt_agv_move:
self._step("返回出发点")
self._log("→ 返回 (0, 0)")
self._nav2_go_to_point(0, 0, 0, timeout_sec=60)
elif not self._stop.is_set():
self._log("⏭️ 跳过返回出发点")
elapsed = time.time() - start_time
return self._finish(elapsed)
except Exception as e:
self._log(f"❌ 任务异常: {e}")
logger.exception("execute_mission 崩溃")
self.report["error"] = str(e)
self.status = MissionStatus.IDLE
self.report["status"] = "idle"
return self.report
def _finish(self, elapsed: float) -> dict:
if self._stop.is_set():
self._step("已停止")
else:
self._step("完成")
self._log(f"✅ 任务完成 ({elapsed:.0f}s)")
self.report["progress"] = 100
self.status = MissionStatus.IDLE
self.report["status"] = "idle"
return self.report
# ==================== 蛇形路径 ====================
@staticmethod
def _build_snake_path(rows: int, cols: int, grid: list) -> list:
"""生成点位级蛇形路径:遍历点位行 0→rows,奇数行左→右,偶数行右→左
每个点位 (pr, c) 同时服务:
- 正面:机器 (pr, c)(如果 pr < rows 且 grid[pr][c] 为真)
- 背面:机器 (pr-1, c)(如果 pr > 0 且 grid[pr-1][c] 为真)
按此路径走完所有点位,是最短的蛇形走位,不再反复横跳。
"""
path = []
for pr in range(rows + 1): # 点位行 = 机器行 + 1
if pr % 2 == 0:
for c in range(cols):
path.append((pr, c))
else:
for c in range(cols - 1, -1, -1):
path.append((pr, c))
return path
@staticmethod
def _has_machine(grid: list, r: int, c: int) -> bool:
if not grid or r >= len(grid):
return False
row = grid[r]
if isinstance(row, list):
return c < len(row) and bool(row[c])
return False
@staticmethod
def _build_grid_from_machines(rows: int, cols: int, machines: list) -> list:
"""从机器列表重建 grid 矩阵"""
if not machines:
return [[False] * cols for _ in range(rows)]
max_r = max(int(m.get("row", 0)) for m in machines) + 1
max_c = max(int(m.get("col", 0)) for m in machines) + 1
gr = max(rows, max_r)
gc = max(cols, max_c)
grid = [[False] * gc for _ in range(gr)]
for m in machines:
r = int(m.get("row", 0))
c = int(m.get("col", 0))
if 0 <= r < gr and 0 <= c < gc:
grid[r][c] = True
return grid
@staticmethod
def pre_generate_tasks(mission_config: dict) -> list:
"""从网格配置预生成任务列表(用于 UI 展示,无需启动执行器)"""
rows = int(mission_config.get("rows", 1))
cols = int(mission_config.get("cols", 1))
grid = mission_config.get("grid", [])
# 如果 grid 为空但从 machines 重建
if not grid and machines:
grid = MissionExecutorV3._build_grid_from_machines(rows, cols, machines)
if grid:
rows = len(grid)
cols = len(grid[0]) if grid else cols
path = MissionExecutorV3._build_snake_path(rows, cols, grid)
tasks = []
for (r, c) in path:
tasks.append({
"row": r, "col": c,
"machine_id": f"m_{r}_{c}",
"label": f"{r+1}-{c+1}",
"status": "pending",
"step": "等待",
"qr_value": None,
"photos_front": 0,
"photos_back": 0,
})
return tasks
# ==================== 点位查找 ====================
@staticmethod
def _find_any_position(positions: list, row: int, col: int) -> Optional[dict]:
"""查找点位的任意有效坐标(正面/背面坐标相同,取第一个有坐标的)"""
for side in ("front", "back"):
p = MissionExecutorV3._find_point(positions, row, col, side)
if p and MissionExecutorV3._has_coords(p):
return p
return None
@staticmethod
def _find_point(positions: list, row: int, col: int, side: str) -> Optional[dict]:
for p in positions:
if p.get("row") == row and p.get("col") == col and p.get("side") == side:
return p
return None
@staticmethod
def _has_coords(point: dict) -> bool:
coords = point.get("coords", [])
return len(coords) >= 2 and (coords[0] != 0 or coords[1] != 0)
# ==================== 导航 ====================
def _navigate(self, point: dict, label: str) -> bool:
coords = point["coords"]
x, y = float(coords[0]), float(coords[1])
yaw = float(coords[2]) if len(coords) >= 3 else 0.0
self._log(f" 🧭 导航到{label}点位 ({x:.2f}, {y:.2f}, yaw={math.degrees(yaw):.0f}°)")
return self._nav2_go_to_point(x, y, yaw)
# ==================== 二维码扫描 ====================
def _wait_arm_ready(self, target_angles: list, timeout: float = 10.0, tolerance: float = 3.0) -> bool:
"""等待机械臂稳定到目标角度(±tolerance 度),超时返回 False"""
if not self.arm_client:
return True
deadline = time.time() + timeout
while time.time() < deadline:
try:
ok, current_rad = self.arm_client.get_angles()
if ok and current_rad and len(current_rad) >= 6:
# get_angles() 返回弧度,target_angles 是角度,需转换后比较
current_deg = [math.degrees(a) for a in current_rad]
if all(abs(current_deg[i] - target_angles[i]) < tolerance for i in range(6)):
return True
except Exception:
pass
time.sleep(0.5)
self._log(f" ⚠️ 机械臂稳定等待超时 (target={target_angles})")
return False
def _scan_qr_with_poses(self, qr_configs: list) -> Optional[str]:
"""用二维码配置中的姿态依次尝试,逐一调整姿态+等2秒+扫码,全部失败才弹框"""
if not qr_configs:
self._log(f" ⚠️ 无二维码配置")
return self._request_manual_qr()
self._log(f" 🔍 尝试 {len(qr_configs)} 个二维码姿态...")
for i, qc in enumerate(qr_configs):
if self._stop.is_set():
return None
self._wait_pause()
angles = qc.get("joint_angles", [])
if not angles or len(angles) < 6:
continue
name = qc.get("name", f"姿态{i+1}")
self._log(f" [{i+1}/{len(qr_configs)}] {name}")
# 调整机械臂姿态
if self.arm_client:
self.arm_client.set_angles(angles, speed=500)
self._wait_arm_ready(angles)
# 读取摄像头并扫码
qr = self._decode_qr_from_arm()
if qr:
self._log(f" ✅ 识别成功: {qr}")
return qr
self._log(f"{name} 未识别到二维码")
self._log(f" ⚠️ 全部 {len(qr_configs)} 个姿态均未识别到二维码")
return self._request_manual_qr()
def _decode_qr_from_arm(self) -> Optional[str]:
"""从机械臂摄像头取一帧,识别二维码"""
for attempt in range(3):
try:
resp = requests.get(ARM_CAMERA_SNAPSHOT, timeout=QR_SCAN_TIMEOUT)
if resp.status_code != 200 or not resp.content:
continue
arr = np.frombuffer(resp.content, dtype=np.uint8)
frame = cv2.imdecode(arr, cv2.IMREAD_COLOR)
if frame is None:
continue
detector = cv2.QRCodeDetector()
data, bbox, _ = detector.detectAndDecode(frame)
if data and len(data) > 0:
return data
except Exception:
pass
time.sleep(0.5)
return None
def _request_manual_qr(self) -> Optional[str]:
"""暂停任务,等待手动输入"""
self.status = MissionStatus.WAITING_QR
self.report["status"] = "waiting_qr"
self.report["step"] = "等待手动输入二维码"
self._log(" ⌨️ 弹窗等待手动输入二维码...")
self._qr_event.clear()
if self._qr_event.wait(timeout=MANUAL_QR_TIMEOUT):
self.status = MissionStatus.RUNNING
self.report["status"] = "running"
self._log(f" ✏️ 手动输入: {self._qr_value}")
return self._qr_value
else:
self.status = MissionStatus.RUNNING
self.report["status"] = "running"
self._log(f" ⚠️ 等待超时({MANUAL_QR_TIMEOUT}s),跳过")
return None
def set_manual_qr(self, value: str):
self._qr_value = value.strip()
self._qr_event.set()
# ==================== 机型查询 ====================
def _lookup_model(self, qr_value: Optional[str]) -> str:
"""TODO: 后续通过 HTTP 接口查询机型"""
return "机器1"
@staticmethod
def _find_model(models: list, name: str) -> Optional[dict]:
"""在机型列表中找到匹配的机型"""
for m in models:
if m.get("name") == name or m.get("id") == name:
return m
# fallback: 第一个机型
return models[0] if models else None
# ==================== 姿态拍照 ====================
def _shoot(self, model: dict, side: str, row: int, col: int, qr_value: str):
"""按机型配置的所有姿态依次拍照"""
# 更新任务照片计数
task = self._get_task(row - 1, col - 1)
side_label = "正面" if side == "front" else "背面"
poses = model.get("poses", []) if side == "front" else model.get("poses_back", [])
if not poses:
self._log(f" ⚠️ 机型无{side_label}姿态配置")
return
self._log(f" 📷 {side_label}拍照 ({len(poses)} 个姿态)")
for pi, pose in enumerate(poses):
if self._stop.is_set():
break
self._wait_pause()
angles = pose.get("arm_angles", [])
if not angles or len(angles) < 6:
self._log(f" 跳过 {pose.get('name', f'姿态{pi+1}')}: 无效角度")
continue
name = pose.get("name", f"{side_label}-{pi+1}")
self._log(f" 🎯 {name}")
# 调整机械臂
if self.arm_client:
self.arm_client.set_angles(angles, speed=500)
self._wait_arm_ready(angles)
# 拍照
path = self._capture_arm_photo(row, col, side, pi + 1, qr_value)
if path:
self._log(f" 💾 {os.path.basename(path)}")
def _capture_arm_photo(self, row: int, col: int, side: str,
pose_idx: int, qr_value: str) -> Optional[str]:
"""从机械臂摄像头拍照存本地"""
try:
resp = requests.get(ARM_CAMERA_SNAPSHOT, timeout=10)
if resp.status_code != 200 or not resp.content:
logger.error("arm snapshot 请求失败")
return None
os.makedirs(PHOTOS_DIR, exist_ok=True)
ts = time.strftime("%Y%m%d_%H%M%S")
fname = f"{ts}_r{row}c{col}_{side}_p{pose_idx}_{qr_value[:20]}.jpg"
fpath = os.path.join(PHOTOS_DIR, fname)
with open(fpath, "wb") as f:
f.write(resp.content)
return fpath
except Exception as e:
logger.error(f"拍照异常: {e}")
return None
# ==================== 控制 ====================
def _wait_pause(self):
"""等待暂停状态解除"""
self._pause.wait()
def pause(self):
self._pause.clear()
self.status = MissionStatus.PAUSED
self.report["status"] = "paused"
self.report["step"] = "已暂停"
self._log("⏸️ 任务已暂停")
def resume(self):
self._pause.set()
self.status = MissionStatus.RUNNING
self.report["status"] = "running"
self._log("▶️ 任务已恢复")
def stop(self):
self._stop.set()
self._pause.set() # 解除暂停
self._qr_event.set() # 解除 QR 等待
if self.arm_client:
try:
self.arm_client.task_stop()
except Exception:
pass
self.agv.stop()
self.status = MissionStatus.IDLE
self.report["status"] = "idle"
def get_status(self) -> dict:
return {
"status": self.report["status"],
"step": self.report["step"],
"progress": self.report["progress"],
"total": self.report["total"],
"tasks": self.report.get("tasks", []),
}
def get_logs(self) -> dict:
"""返回实时日志和完整状态"""
return self.report
# ==================== 状态报告 ====================
def _log(self, msg: str):
self.report["log"].append(msg)
# Keep last 500 entries
if len(self.report["log"]) > 500:
self.report["log"] = self.report["log"][-500:]
logger.info(msg)
def _step(self, text: str):
self.report["step"] = text
def _get_task(self, row: int, col: int) -> Optional[dict]:
"""获取指定行列的任务记录"""
for t in self.report.get("tasks", []):
if t["row"] == row and t["col"] == col:
return t
return None
def _progress(self, machine_idx: int, side_code: int):
"""side_code: 1=正面完成, 2=背面完成"""
if self.report["total"]:
self.report["progress"] = min(
int((machine_idx * 2 + side_code) / (self.report["total"] * 2) * 100),
99
)
# ==================== 错误弹窗 ====================
def _wait_error(self, msg: str) -> str:
"""阻塞等待用户选择:skip(跳过)或 abort(中断)"""
self.status = MissionStatus.WAITING_ERROR
self.report["status"] = "waiting_error"
self.report["step"] = msg
self.report["error"] = msg
self._log(f"⚠️ 错误处理: {msg}")
self._error_choice = None
self._error_mode = True
start = time.time()
while self._error_choice is None and not self._stop.is_set():
time.sleep(0.2)
if time.time() - start > 600: # 10分钟超时 → 跳过
self._error_choice = "skip"
choice = self._error_choice or "skip"
self._error_choice = None
self._error_mode = False
if choice == "abort":
self._stop.set()
self._log("⏹️ 用户选择中断")
else:
self._log("⏭️ 用户选择跳过")
# 恢复状态
self.status = MissionStatus.RUNNING if not self._single_step_mode else MissionStatus.WAITING_STEP
self.report["status"] = self.status.value
self.report["error"] = None
return choice
def set_error_choice(self, choice: str):
"""外部 API 设置错误处理选择"""
self._error_choice = choice
# ==================== 单步执行 ====================
def _wait_step_confirm(self, row_label: int, col_label: int) -> str:
"""单步执行:等待用户确认/重试/中断"""
self.status = MissionStatus.WAITING_STEP
self.report["status"] = "waiting_step"
self.report["step"] = f"机器 {row_label}-{col_label} 完成,等待确认"
self.report["current_step"] = {
"row": row_label - 1, "col": col_label - 1,
"label": f"{row_label}-{col_label}"
}
self._log(f"⏸️ 单步执行: 机器 {row_label}-{col_label} 完成,等待确认...")
self._step_choice = None
start = time.time()
while self._step_choice is None and not self._stop.is_set():
time.sleep(0.2)
if time.time() - start > 600: # 10分钟超时 → 确认
self._step_choice = "confirm"
choice = self._step_choice or "confirm"
self._step_choice = None
self.report.pop("current_step", None)
if choice == "abort":
self._stop.set()
self._log("⏹️ 用户选择中断")
elif choice == "retry":
self._log(f"🔄 用户选择重试机器 {row_label}-{col_label}")
else:
self._log(f"✅ 用户确认机器 {row_label}-{col_label}")
return choice
def set_step_choice(self, choice: str):
"""外部 API 设置单步执行选择"""
self._step_choice = choice
# ==================== Nav2 导航 ====================
# (保留原实现)
def _nav2_check_available(self) -> bool:
try:
rc, out, err = self._run_ros2_cmd("ros2 action list")
if rc != 0:
return False
return "/navigate_to_pose" in out
except:
return False
def _nav2_go_to_point(self, x: float, y: float, yaw: float = 0.0,
timeout_sec: float = 120.0) -> bool:
"""使用 Nav2Navigator 直接发送导航目标(blocking 模式,等待完成)"""
try:
logger.info(f"🧭 导航到目标: ({x:.3f}, {y:.3f}), yaw={math.degrees(yaw):.1f}°")
ok = self._nav.navigate_to_pose(x, y, yaw, timeout_sec=timeout_sec, blocking=True)
if ok:
logger.info(f"✅ 导航成功到达 ({x:.3f}, {y:.3f})")
else:
logger.warning(f"⚠️ 导航失败或超时 ({x:.3f}, {y:.3f})")
return ok
except Exception as e:
logger.error(f"Nav2 异常: {e}")
return False
def _nav2_cancel(self):
cancel_cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 action cancel /navigate_to_pose 2>/dev/null || true'"
try:
subprocess.run(cancel_cmd, shell=True, timeout=3)
except:
pass
def _run_ros2_cmd(self, cmd: str, timeout: float = 5.0) -> tuple:
full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'"
try:
result = subprocess.run(
full_cmd, shell=True,
capture_output=True, text=True, timeout=timeout
)
return result.returncode, result.stdout.strip(), result.stderr.strip()
except subprocess.TimeoutExpired:
return -1, "", "Timeout"
except Exception as e:
return -1, "", str(e)