# -*- coding: utf-8 -*- """ 任务执行器 V3 — M×N 网格蛇形路径拍摄 工作流: 1. 根据 grid 生成蛇形路径(奇数行左→右,偶数行右→左) 2. 逐台机器: - 正面:导航 → 扫码(多姿态重试) → 查机型 → 按姿态拍照 - 背面:导航 → 按姿态拍照 3. 回到 (0,0) """ import os import time import json import logging import threading import subprocess import math from typing import Optional, Dict, List from enum import Enum import requests import cv2 import numpy as np from utils.nav2_navigator import Nav2Navigator, Nav2Status logger = logging.getLogger(__name__) ROS2_SETUP_CMD = "source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash" from config import ARM_CAMERA_CONFIG ARM_CAMERA_SNAPSHOT = ARM_CAMERA_CONFIG["snapshot_url"] PHOTOS_DIR = "/home/elephant/photos" UPLOAD_URL = "https://ts.zhijian168.com/prod-api/file/uploadImage" # 二维码扫描重试参数 QR_SCAN_TIMEOUT = 5 # 单次扫描超时 QR_POSE_WAIT = 1.5 # 调整姿态后等待时间 MANUAL_QR_TIMEOUT = 300 # 5分钟超时(现已改为无限等待,stop 时解除) class MissionStatus(str, Enum): IDLE = "idle" RUNNING = "running" PAUSED = "paused" COMPLETED = "completed" WAITING_QR = "waiting_qr" WAITING_ERROR = "waiting_error" WAITING_STEP = "waiting_step" class MissionExecutorV3: """任务执行器 V3 — M×N 网格蛇形路径""" _instance = None # 单例,供外部停止用 def __init__(self, config: dict): self.config = config self.status = MissionStatus.IDLE MissionExecutorV3._instance = self # 实时状态报告 self.report = { "status": "idle", "step": "", "progress": 0, "total": 0, "log": [], "error": None, } # 线程同步 self._stop = threading.Event() self._pause = threading.Event() self._pause.set() # 初始不暂停 self._qr_event = threading.Event() self._qr_value: Optional[str] = None # 错误弹窗 self._error_choice = None # "skip" or "abort" # 单步执行 self._single_step_mode = False self._step_choice = None # "confirm", "retry", "abort" self._error_mode = False # True when waiting for error resolution # 错误弹窗 self._error_choice = None # "skip" or "abort" # 单步执行 self._single_step_mode = False self._step_choice = None # "confirm", "retry", "abort" self._error_mode = False # True when waiting for error resolution # 设备 from .arm_client import ArmClient self.arm_client = ArmClient( config["arm"]["host"], config["arm"]["port"] ) from .agv_controller_ros2 import AGVController self.agv = AGVController( device=config.get("device", "/dev/agvpro_controller"), baudrate=config.get("baudrate", 1000000) ) # Nav2 导航器(直接使用 rclpy BasicNavigator API,比 subprocess 更可靠) self._nav = Nav2Navigator() # 速度控制(默认值,可在 execute_mission 时覆写) self.arm_speed = 500 self.agv_speed = 0.5 # 照片上传序号计数器(连续递增,从1开始) self.next_upload_index = 1 # ==================== 连接 ==================== def connect_all(self) -> Dict[str, bool]: results = {} results["agv"] = self.agv.connect() results["arm"] = self.arm_client.connect() return results def disconnect_all(self): if self.arm_client: self.arm_client.close() self.agv.disconnect() # ==================== 主任务流程 ==================== def execute_mission( self, mission_config: dict, machines: list, qr_configs: list, models: list, single_step: bool = False, options: dict = None, ) -> dict: """ 执行完整拍摄任务。 Args: options: 任务步骤控制开关 - arm_init: 是否执行机械臂位置初始化 - agv_move: 是否执行AGV移动 - qr_scan: 是否执行二维码识别 - front_photo: 是否执行正面拍照 - back_photo: 是否执行背面拍照 """ """ 执行完整拍摄任务。 Args: mission_config: {rows, cols, grid, positions} machines: [{id, row, col, front: {coords}, back: {coords}}] qr_configs: [{id, name, joint_angles: [a1..a6]}] models: [{id, name, poses: [{name, arm_angles}], poses_back: [...]}] Returns: 执行报告 dict """ self.status = MissionStatus.RUNNING self.report = {"status": "running", "step": "初始化", "progress": 0, "total": 0, "log": [], "error": None} self._stop.clear() self._pause.set() start_time = time.time() try: rows = int(mission_config.get("rows", 1)) cols = int(mission_config.get("cols", 1)) grid = mission_config.get("grid", []) positions = mission_config.get("positions", []) # 如果 grid 为空,从 machines 重建 if not grid or all(not any(rw) if isinstance(rw, list) else True for rw in grid): try: cfg_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "data") with open(os.path.join(cfg_dir, "machines_config.json")) as jf: machines = json.load(jf) grid = [[False] * cols for _ in range(rows)] for m in machines: r = int(m.get("row", 0)) c = int(m.get("col", 0)) if 0 <= r < rows and 0 <= c < cols: grid[r][c] = True except Exception: grid = [[False] * cols for _ in range(rows)] # 1. 生成蛇形路径 path = MissionExecutorV3._build_snake_path(rows, cols, grid) if not path: self._log("❌ 网格中没有点位,任务终止") self.report["error"] = "No points in grid" return self._finish(0) # 统计总机器数(用于进度计算) total_machines = 0 for rw in grid: for v in rw: if v: total_machines += 1 self.report["total"] = total_machines # 扫码结果缓存:正面扫到的 QR 传给背面 qr_cache = {} # 初始化任务列表(机器级别) self.report["tasks"] = [] for r in range(rows): for c in range(cols): if MissionExecutorV3._has_machine(grid, r, c): self.report["tasks"].append({ "row": r, "col": c, "machine_id": f"m_{r}_{c}", "label": f"{r+1}-{c+1}", "status": "pending", "step": "等待", "qr_value": None, "photos_front": 0, "photos_back": 0, }) # 初始化点位状态和机器状态(实时推送给前端) self.report["point_status"] = {} for _pr in range(rows + 1): for _c in range(cols): pk = f"{_pr}_{_c}" has_f = _pr < rows and MissionExecutorV3._has_machine(grid, _pr, _c) has_b = _pr > 0 and MissionExecutorV3._has_machine(grid, _pr - 1, _c) self.report["point_status"][pk] = "pending" if (has_f or has_b) else "skipped" self.report["machine_status"] = {} for _r in range(rows): for _c in range(cols): if MissionExecutorV3._has_machine(grid, _r, _c): self.report["machine_status"][f"{_r}_{_c}"] = { "has_machine": True, "qr": "pending", "qr_val": None, "front": "pending", "front_cnt": 0, "back": "pending", "back_cnt": 0, "status": "pending", "step": "等待", } self._log(f"📍 点位蛇形路径: {len(path)} 个点位, {total_machines} 台机器") # 重置照片上传序号(每次任务开始时重置,从1开始) self.next_upload_index = 1 # 任务步骤控制开关 if options is None: options = {} opt_arm_init = options.get("arm_init", True) opt_agv_move = options.get("agv_move", True) if opt_agv_move: opt_arm_init = True opt_qr_scan = options.get("qr_scan", True) opt_front_photo = options.get("front_photo", True) opt_back_photo = options.get("back_photo", True) self._log(f"⚙️ 任务步骤: agv_move={opt_agv_move}, " f"qr_scan={opt_qr_scan}, front={opt_front_photo}, back={opt_back_photo}") # 机械臂初始姿态(AGV 移动前恢复) arm_initial_pose = mission_config.get("arm_initial_pose", [0.0] * 6) has_arm_pose = self.arm_client and any(abs(a) > 0.01 for a in arm_initial_pose) # 速度控制(从前端传入) self.arm_speed = int(options.get("arm_speed", 500)) self.agv_speed = float(options.get("agv_speed", 0.5)) self._log(f"🚀 AGV速度={self.agv_speed:.1f}m/s, 机械臂速度={self.arm_speed}") # 设置 Nav2 导航速度(仅在任务开始时设一次) if opt_agv_move: self._set_nav_speed() # 进度统计 max_actions = total_machines * 2 # 每台机器正面+背面 completed_actions = 0 # 2. 逐点位蛇形执行 pi = 0 while pi < len(path): if self._stop.is_set(): self._log("⏹️ 任务已停止") break self._wait_pause() pr, c = path[pi] # pr = 点位行, c = 列 # 判断该点位需要做什么 has_front = pr < rows and MissionExecutorV3._has_machine(grid, pr, c) has_back = pr > 0 and MissionExecutorV3._has_machine(grid, pr - 1, c) if not has_front and not has_back: self._log(f"📍 点位 ({pr},{c}) → 空位") pi += 1 continue # 日志 & 步骤更新 rl_front = pr + 1 if has_front else 0 cl_front = c + 1 if has_front else 0 rl_back = pr if has_back else 0 cl_back = c + 1 if has_back else 0 log_parts = [] if has_front: log_parts.append(f"正面:机器{rl_front}-{cl_front}") task = self._get_task(pr, c) if task: task["status"] = "active" task["step"] = "正面扫码" if has_back: log_parts.append(f"背面:机器{rl_back}-{cl_back}") task = self._get_task(pr - 1, c) if task: task["step"] = "背面拍照" self._log(f"📍 点位 ({pr},{c}) → {' & '.join(log_parts)}") self._step(f"点位({pr},{c})") # 恢复机械臂初始姿态(AGV 移动前) if opt_arm_init and has_arm_pose and opt_agv_move: self._log(" 🦾 恢复机械臂初始姿态") try: self.arm_client.set_angles(arm_initial_pose, speed=self.arm_speed) self._wait_arm_ready(arm_initial_pose) except Exception as e: self._log(f" ⚠️ 机械臂初始化失败: {e}") # 更新点位状态:开始导航 pk = f"{pr}_{c}" if pk in self.report.get("point_status", {}): self.report["point_status"][pk] = "active" # 导航到该点位的坐标 nav_ok = False if opt_agv_move: # 找该点位的任意有效坐标(正面/背面坐标相同) pos = MissionExecutorV3._find_any_position(positions, pr, c) if pos and MissionExecutorV3._has_coords(pos): nav_ok = self._navigate(pos, f"点位({pr},{c})") if not nav_ok: self._log(f"⚠️ 导航失败,尝试继续") choice = self._wait_error(f"点位({pr},{c})导航失败") if choice == "abort": break else: self._log(f"⚠️ 点位({pr},{c})无有效坐标") else: self._log(" ⏭️ 跳过AGV移动") nav_ok = True # 无移动视为到达 # 更新点位状态:到达 if pk in self.report.get("point_status", {}): self.report["point_status"][pk] = "done" # --- 正面操作(机器 pr,c 的正面) --- qr_value = None if has_front and not self._stop.is_set(): self._wait_pause() # 更新机器状态:正面开始 mk = f"{pr}_{c}" if mk in self.report.get("machine_status", {}): self.report["machine_status"][mk]["status"] = "active" self.report["machine_status"][mk]["step"] = "正面扫码" if opt_qr_scan: qr_value = self._scan_qr_with_poses(qr_configs, machine_row=pr) if self._stop.is_set(): break else: self._log(" ⏭️ 跳过二维码识别(正面)") qr_cache[(pr, c)] = qr_value # 更新机器状态:扫码完成 mk2 = f"{pr}_{c}" if mk2 in self.report.get("machine_status", {}): self.report["machine_status"][mk2]["qr"] = "done" if qr_value else "skipped" self.report["machine_status"][mk2]["qr_val"] = qr_value self.report["machine_status"][mk2]["step"] = "正面拍照" task = self._get_task(pr, c) if task and qr_value: task["qr_value"] = qr_value if task: task["step"] = "正面拍照" model_name = self._lookup_model(qr_value) self._log(f" 🏷️ 机型: {model_name}") if opt_front_photo and not self._stop.is_set(): model = self._find_model(models, model_name) if model: self._shoot(model, "front", rl_front, cl_front, qr_value or "unknown", pr) else: self._log(f" ⚠️ 未找到机型 {model_name}") else: self._log(" ⏭️ 跳过正面拍照") completed_actions += 1 # 更新机器状态:正面拍照完成 mk3 = f"{pr}_{c}" if mk3 in self.report.get("machine_status", {}): self.report["machine_status"][mk3]["front"] = "done" if opt_front_photo else "skipped" self.report["machine_status"][mk3]["front_cnt"] = self.report["machine_status"][mk3].get("front_cnt", 0) + 1 # --- 背面操作(机器 pr-1,c 的背面) --- if has_back and not self._stop.is_set(): self._wait_pause() back_qr = qr_cache.get((pr - 1, c), "unknown") # 更新机器状态:背面开始 mk_b = f"{pr-1}_{c}" if mk_b in self.report.get("machine_status", {}): self.report["machine_status"][mk_b]["step"] = "背面拍照" task = self._get_task(pr - 1, c) if task: task["step"] = "背面拍照" model_name = self._lookup_model(back_qr) self._log(f" 🏷️ 机型(背面): {model_name}") if opt_back_photo and not self._stop.is_set(): model = self._find_model(models, model_name) if model: self._shoot(model, "back", rl_back, cl_back, back_qr, pr - 1) else: self._log(f" ⚠️ 未找到机型 {model_name}") else: self._log(" ⏭️ 跳过背面拍照") completed_actions += 1 # 更新机器状态:背面完成 mk_b2 = f"{pr-1}_{c}" if mk_b2 in self.report.get("machine_status", {}): self.report["machine_status"][mk_b2]["back"] = "done" if opt_back_photo else "skipped" self.report["machine_status"][mk_b2]["back_cnt"] = self.report["machine_status"][mk_b2].get("back_cnt", 0) + 1 self.report["machine_status"][mk_b2]["status"] = "completed" self.report["machine_status"][mk_b2]["step"] = "完成" if task: task["status"] = "completed" task["step"] = "完成" # 更新进度 if max_actions: self.report["progress"] = min(int(completed_actions / max_actions * 100), 99) # 单步执行 if single_step and not self._stop.is_set(): choice = self._wait_step_confirm( rl_front if has_front else rl_back, cl_front if has_front else cl_back ) if choice == "abort": break elif choice == "retry": if has_front: task = self._get_task(pr, c) if task: task["status"] = "pending" task["step"] = "重试开始" completed_actions = max(0, completed_actions - 2) continue pi += 1 # 3. 回到出发点 if not self._stop.is_set() and opt_agv_move: self._step("返回出发点") self._log("→ 返回 (0, 0)") self._nav2_go_to_point(0, 0, 0, timeout_sec=60) elif not self._stop.is_set(): self._log("⏭️ 跳过返回出发点") elapsed = time.time() - start_time return self._finish(elapsed) except Exception as e: self._log(f"❌ 任务异常: {e}") logger.exception("execute_mission 崩溃") self.report["error"] = str(e) self.status = MissionStatus.IDLE self.report["status"] = "idle" return self.report def _finish(self, elapsed: float) -> dict: if self._stop.is_set(): self._step("已停止") else: self._step("完成") self._log(f"✅ 任务完成 ({elapsed:.0f}s)") self.report["progress"] = 100 self.status = MissionStatus.IDLE self.report["status"] = "idle" return self.report # ==================== 蛇形路径 ==================== @staticmethod def _build_snake_path(rows: int, cols: int, grid: list) -> list: """生成点位级蛇形路径:遍历点位行 0→rows,奇数行左→右,偶数行右→左 每个点位 (pr, c) 同时服务: - 正面:机器 (pr, c)(如果 pr < rows 且 grid[pr][c] 为真) - 背面:机器 (pr-1, c)(如果 pr > 0 且 grid[pr-1][c] 为真) 按此路径走完所有点位,是最短的蛇形走位,不再反复横跳。 """ path = [] for pr in range(rows + 1): # 点位行 = 机器行 + 1 if pr % 2 == 0: for c in range(cols): path.append((pr, c)) else: for c in range(cols - 1, -1, -1): path.append((pr, c)) return path @staticmethod def _has_machine(grid: list, r: int, c: int) -> bool: if not grid or r >= len(grid): return False row = grid[r] if isinstance(row, list): return c < len(row) and bool(row[c]) return False @staticmethod def _build_grid_from_machines(rows: int, cols: int, machines: list) -> list: """从机器列表重建 grid 矩阵""" if not machines: return [[False] * cols for _ in range(rows)] max_r = max(int(m.get("row", 0)) for m in machines) + 1 max_c = max(int(m.get("col", 0)) for m in machines) + 1 gr = max(rows, max_r) gc = max(cols, max_c) grid = [[False] * gc for _ in range(gr)] for m in machines: r = int(m.get("row", 0)) c = int(m.get("col", 0)) if 0 <= r < gr and 0 <= c < gc: grid[r][c] = True return grid @staticmethod def pre_generate_tasks(mission_config: dict) -> list: """从网格配置预生成任务列表(用于 UI 展示,无需启动执行器)""" rows = int(mission_config.get("rows", 1)) cols = int(mission_config.get("cols", 1)) grid = mission_config.get("grid", []) # 如果 grid 为空但从 machines 重建 if not grid and machines: grid = MissionExecutorV3._build_grid_from_machines(rows, cols, machines) if grid: rows = len(grid) cols = len(grid[0]) if grid else cols path = MissionExecutorV3._build_snake_path(rows, cols, grid) tasks = [] for (r, c) in path: tasks.append({ "row": r, "col": c, "machine_id": f"m_{r}_{c}", "label": f"{r+1}-{c+1}", "status": "pending", "step": "等待", "qr_value": None, "photos_front": 0, "photos_back": 0, }) return tasks # ==================== 点位查找 ==================== @staticmethod def _find_any_position(positions: list, row: int, col: int) -> Optional[dict]: """查找点位的任意有效坐标(正面/背面坐标相同,取第一个有坐标的)""" for side in ("front", "back"): p = MissionExecutorV3._find_point(positions, row, col, side) if p and MissionExecutorV3._has_coords(p): return p return None @staticmethod def _find_point(positions: list, row: int, col: int, side: str) -> Optional[dict]: for p in positions: if p.get("row") == row and p.get("col") == col and p.get("side") == side: return p return None @staticmethod def _has_coords(point: dict) -> bool: coords = point.get("coords", []) return len(coords) >= 2 and (coords[0] != 0 or coords[1] != 0) # ==================== 导航 ==================== def _set_nav_speed(self) -> bool: """动态设置 Nav2 控制器最大速度参数""" try: # 尝试设置 controller_server 的线速度参数 vel = self.agv_speed cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 param set /controller_server FollowPath.desired_linear_vel {vel:.2f} 2>/dev/null || true'" subprocess.run(cmd, shell=True, timeout=5, capture_output=True) self._log(f" 🚀 AGV 速度设为 {vel:.1f} m/s") return True except Exception as e: logger.warning(f"设置 AGV 速度失败: {e}") return False def _navigate(self, point: dict, label: str) -> bool: coords = point["coords"] x, y = float(coords[0]), float(coords[1]) yaw = float(coords[2]) if len(coords) >= 3 else 0.0 self._log(f" 🧭 导航到{label}点位 ({x:.2f}, {y:.2f}, yaw={math.degrees(yaw):.0f}°)") return self._nav2_go_to_point(x, y, yaw) # ==================== 二维码扫描 ==================== def _wait_arm_ready(self, target_angles: list, timeout: float = 15.0, tolerance: float = 2.0) -> bool: """等待机械臂稳定到目标角度(±tolerance 度),超时返回 False""" if not self.arm_client: return True deadline = time.time() + timeout while time.time() < deadline: try: ok, current = self.arm_client.get_angles() if ok and current and len(current) >= 6: # get_angles() 返回角度(度),与 target_angles 直接比较 if all(abs(current[i] - target_angles[i]) < tolerance for i in range(6)): return True except Exception: pass time.sleep(0.5) self._log(f" ⚠️ 机械臂稳定等待超时 (target={target_angles})") return False def _scan_qr_with_poses(self, qr_configs: list, machine_row: int = 0) -> Optional[str]: """用二维码配置中的姿态依次尝试,逐一调整姿态+等2秒+扫码,全部失败才弹框 machine_row: 机器所在行,奇数行(1,3,5)时关节角度需取反 """ if not qr_configs: self._log(f" ⚠️ 无二维码配置") return self._request_manual_qr() # 判断是否需要反转关节角度 invert = (machine_row % 2 == 1) if invert: self._log(f" 🔄 扫码:AGV朝向相反,关节角度取反") self._log(f" 🔍 尝试 {len(qr_configs)} 个二维码姿态...") for i, qc in enumerate(qr_configs): if self._stop.is_set(): return None self._wait_pause() angles = qc.get("joint_angles", []) if not angles or len(angles) < 6: continue # 需要时反转关节角度 if invert: angles = [-a for a in angles] name = qc.get("name", f"姿态{i+1}") self._log(f" [{i+1}/{len(qr_configs)}] {name}{' (反转)' if invert else ''}") # 调整机械臂姿态 if self.arm_client: self.arm_client.set_angles(angles, speed=self.arm_speed) self._wait_arm_ready(angles) # 读取摄像头并扫码 qr = self._decode_qr_from_arm() if qr: self._log(f" ✅ 识别成功: {qr}") return qr self._log(f" ❌ {name} 未识别到二维码") self._log(f" ⚠️ 全部 {len(qr_configs)} 个姿态均未识别到二维码") return self._request_manual_qr() def _decode_qr_from_arm(self) -> Optional[str]: """从机械臂摄像头取一帧,识别二维码""" for attempt in range(3): try: resp = requests.get(ARM_CAMERA_SNAPSHOT, timeout=QR_SCAN_TIMEOUT) if resp.status_code != 200 or not resp.content: continue arr = np.frombuffer(resp.content, dtype=np.uint8) frame = cv2.imdecode(arr, cv2.IMREAD_COLOR) if frame is None: continue detector = cv2.QRCodeDetector() data, bbox, _ = detector.detectAndDecode(frame) if data and len(data) > 0: return data except Exception: pass time.sleep(0.5) return None def _request_manual_qr(self) -> Optional[str]: """暂停任务,等待手动输入(不超时,必须输入才能继续;stop 时解除)""" self.status = MissionStatus.WAITING_QR self.report["status"] = "waiting_qr" self.report["step"] = "等待手动输入二维码" self._log(" ⌨️ 弹窗等待手动输入二维码(不可跳过)...") self._qr_event.clear() self._qr_event.wait() # 无限等待,直到 set_manual_qr 或 stop() 触发 self.status = MissionStatus.RUNNING self.report["status"] = "running" if self._qr_value: self._log(f" ✏️ 手动输入: {self._qr_value}") return self._qr_value else: self._log(f" ⚠️ 任务已停止") return None def set_manual_qr(self, value: str): self._qr_value = value.strip() self._qr_event.set() # ==================== 机型查询 ==================== def _lookup_model(self, qr_value: Optional[str]) -> str: """TODO: 后续通过 HTTP 接口查询机型""" return "机器1" @staticmethod def _find_model(models: list, name: str) -> Optional[dict]: """在机型列表中找到匹配的机型""" for m in models: if m.get("name") == name or m.get("id") == name: return m # fallback: 第一个机型 return models[0] if models else None # ==================== 姿态拍照 ==================== def _shoot(self, model: dict, side: str, row: int, col: int, qr_value: str, machine_row: int = 0): """按机型配置的所有姿态依次拍照 machine_row: 当前操作的机器所在行,用于判断是否需要关节反转 蛇形路径下,AGV 朝向在奇数行(1,3,5)时与出发时相反, 此时机器的正面/背面朝向与标定相反,需要把所有关节角度取反 """ # 判断是否需要反转关节角度(AGV 朝向与出发时相反) invert = (machine_row % 2 == 1) # 机器行是奇数(1,3,5...)时,AGV从对面来,需反转 side_label = "正面" if side == "front" else "背面" if invert: self._log(f" 🔄 {side_label}拍摄:AGV朝向相反,关节角度取反") poses = model.get("poses", []) if side == "front" else model.get("poses_back", []) if not poses: self._log(f" ⚠️ 机型无{side_label}姿态配置") return self._log(f" 📷 {side_label}拍照 ({len(poses)} 个姿态)") for pi, pose in enumerate(poses): if self._stop.is_set(): break self._wait_pause() angles = pose.get("arm_angles", []) if not angles or len(angles) < 6: self._log(f" 跳过 {pose.get('name', f'姿态{pi+1}')}: 无效角度") continue # 需要时对所有关节角度取反 if invert: angles = [-a for a in angles] name = pose.get("name", f"{side_label}-{pi+1}") self._log(f" 🎯 {name}{' (反转)' if invert else ''}") # 调整机械臂 if self.arm_client: self.arm_client.set_angles(angles, speed=self.arm_speed) self._wait_arm_ready(angles) # 拍照(upload_index 连续递增) path = self._capture_arm_photo(row, col, side, pi + 1, qr_value, upload_index=self.next_upload_index) self.next_upload_index += 1 if path: self._log(f" 💾 {os.path.basename(path)}") def _capture_arm_photo(self, row: int, col: int, side: str, pose_idx: int, qr_value: str, upload_index: int = 0) -> Optional[str]: """从机械臂摄像头拍照,直接上传到服务器(不保存本地) upload_index: 从1开始,先正面后背面,由调用方维护 """ try: resp = requests.get(ARM_CAMERA_SNAPSHOT, timeout=10) if resp.status_code != 200 or not resp.content: logger.error("arm snapshot 请求失败") return None # 生成文件名(用于上传) ts = time.strftime("%Y%m%d_%H%M%S") fname = f"{ts}_r{row}c{col}_{side}_p{pose_idx}_{qr_value[:20]}.jpg" # 直接上传到服务器(不保存本地) if qr_value: self._upload_photo_bytes(fname, resp.content, qr_value, upload_index) else: self._log(" ⚠️ 无二维码,跳过上传") return fname # 返回文件名(用于日志) except Exception as e: logger.error(f"拍照异常: {e}") return None def _upload_photo(self, filepath: str, serial_number: str, index: int) -> bool: """上传照片到远程服务器 Args: filepath: 本地文件路径 serial_number: 二维码/序列号 index: 1=正面, 2=背面 """ try: filename = os.path.basename(filepath) headers = { "Authorization": "Bearer eyJhbGciOiJIUzUxMiJ9.eyJ1c2VyX2lkIjoxLCJ1c2VyX2tleSI6ImZhNTNkZTZiLWE3NjYtNDZmNC05MDUyLTQ2MjUzZTAyNjdmNSIsInVzZXJuYW1lIjoiYWRtaW4ifQ.lC4vKThZo4aAOLsekm2kPgaEJRqRx-YDQWKfHFqxdPNESCKy57l3eIqaKTj2ZjAMaoYAwYlMrv5M1zAOJsO_PA" } with open(filepath, "rb") as f: files = {"file": (filename, f, "image/jpeg")} data = {"serialNumber": serial_number, "index": str(index)} resp = requests.post(UPLOAD_URL, files=files, data=data, headers=headers, timeout=30) if resp.status_code == 200: self._log(f" ☁️ 上传成功 [{index}]: {filename}") return True else: self._log(f" ⚠️ 上传失败 [{index}] HTTP {resp.status_code}: {resp.text[:200]}") return False except Exception as e: self._log(f" ❌ 上传异常 [{index}]: {e}") return False # ==================== 控制 ==================== def _wait_pause(self): """等待暂停状态解除""" self._pause.wait() def pause(self): self._pause.clear() self.status = MissionStatus.PAUSED self.report["status"] = "paused" self.report["step"] = "已暂停" self._log("⏸️ 任务已暂停") def resume(self): self._pause.set() self.status = MissionStatus.RUNNING self.report["status"] = "running" self._log("▶️ 任务已恢复") def stop(self): self._stop.set() self._pause.set() # 解除暂停 self._qr_event.set() # 解除 QR 等待 if self.arm_client: try: self.arm_client.task_stop() except Exception: pass self.agv.stop() self.status = MissionStatus.IDLE self.report["status"] = "idle" def get_status(self) -> dict: return { "status": self.report["status"], "step": self.report["step"], "progress": self.report["progress"], "total": self.report["total"], "tasks": self.report.get("tasks", []), } def get_logs(self) -> dict: """返回实时日志和完整状态""" return self.report # ==================== 状态报告 ==================== def _log(self, msg: str): self.report["log"].append(msg) # Keep last 500 entries if len(self.report["log"]) > 500: self.report["log"] = self.report["log"][-500:] logger.info(msg) def _step(self, text: str): self.report["step"] = text def _get_task(self, row: int, col: int) -> Optional[dict]: """获取指定行列的任务记录""" for t in self.report.get("tasks", []): if t["row"] == row and t["col"] == col: return t return None def _progress(self, machine_idx: int, side_code: int): """side_code: 1=正面完成, 2=背面完成""" if self.report["total"]: self.report["progress"] = min( int((machine_idx * 2 + side_code) / (self.report["total"] * 2) * 100), 99 ) # ==================== 错误弹窗 ==================== def _wait_error(self, msg: str) -> str: """阻塞等待用户选择:skip(跳过)或 abort(中断)""" self.status = MissionStatus.WAITING_ERROR self.report["status"] = "waiting_error" self.report["step"] = msg self.report["error"] = msg self._log(f"⚠️ 错误处理: {msg}") self._error_choice = None self._error_mode = True start = time.time() while self._error_choice is None and not self._stop.is_set(): time.sleep(0.2) if time.time() - start > 600: # 10分钟超时 → 跳过 self._error_choice = "skip" choice = self._error_choice or "skip" self._error_choice = None self._error_mode = False if choice == "abort": self._stop.set() self._log("⏹️ 用户选择中断") else: self._log("⏭️ 用户选择跳过") # 恢复状态 self.status = MissionStatus.RUNNING if not self._single_step_mode else MissionStatus.WAITING_STEP self.report["status"] = self.status.value self.report["error"] = None return choice def set_error_choice(self, choice: str): """外部 API 设置错误处理选择""" self._error_choice = choice # ==================== 单步执行 ==================== def _wait_step_confirm(self, row_label: int, col_label: int) -> str: """单步执行:等待用户确认/重试/中断""" self.status = MissionStatus.WAITING_STEP self.report["status"] = "waiting_step" self.report["step"] = f"机器 {row_label}-{col_label} 完成,等待确认" self.report["current_step"] = { "row": row_label - 1, "col": col_label - 1, "label": f"{row_label}-{col_label}" } self._log(f"⏸️ 单步执行: 机器 {row_label}-{col_label} 完成,等待确认...") self._step_choice = None start = time.time() while self._step_choice is None and not self._stop.is_set(): time.sleep(0.2) if time.time() - start > 600: # 10分钟超时 → 确认 self._step_choice = "confirm" choice = self._step_choice or "confirm" self._step_choice = None self.report.pop("current_step", None) if choice == "abort": self._stop.set() self._log("⏹️ 用户选择中断") elif choice == "retry": self._log(f"🔄 用户选择重试机器 {row_label}-{col_label}") else: self._log(f"✅ 用户确认机器 {row_label}-{col_label}") return choice def set_step_choice(self, choice: str): """外部 API 设置单步执行选择""" self._step_choice = choice # ==================== Nav2 导航 ==================== # (保留原实现) def _nav2_check_available(self) -> bool: try: rc, out, err = self._run_ros2_cmd("ros2 action list") if rc != 0: return False return "/navigate_to_pose" in out except: return False def _nav2_go_to_point(self, x: float, y: float, yaw: float = 0.0, timeout_sec: float = 120.0) -> bool: """使用 Nav2Navigator 直接发送导航目标(blocking 模式,等待完成)""" try: logger.info(f"🧭 导航到目标: ({x:.3f}, {y:.3f}), yaw={math.degrees(yaw):.1f}°") ok = self._nav.navigate_to_pose(x, y, yaw, timeout_sec=timeout_sec, blocking=True) if ok: logger.info(f"✅ 导航成功到达 ({x:.3f}, {y:.3f})") else: logger.warning(f"⚠️ 导航失败或超时 ({x:.3f}, {y:.3f})") return ok except Exception as e: logger.error(f"Nav2 异常: {e}") return False def _nav2_cancel(self): cancel_cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 action cancel /navigate_to_pose 2>/dev/null || true'" try: subprocess.run(cancel_cmd, shell=True, timeout=3) except: pass def _run_ros2_cmd(self, cmd: str, timeout: float = 5.0) -> tuple: full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'" try: result = subprocess.run( full_cmd, shell=True, capture_output=True, text=True, timeout=timeout ) return result.returncode, result.stdout.strip(), result.stderr.strip() except subprocess.TimeoutExpired: return -1, "", "Timeout" except Exception as e: return -1, "", str(e)