# -*- coding: utf-8 -*- """ 任务执行器 V3 — M×N 网格蛇形路径拍摄 工作流: 1. 根据 grid 生成蛇形路径(奇数行左→右,偶数行右→左) 2. 逐台机器: - 正面:导航 → 扫码(多姿态重试) → 查机型 → 按姿态拍照 - 背面:导航 → 按姿态拍照 3. 回到 (0,0) """ import os import time import json import logging import threading import subprocess import math from typing import Optional, Dict, List from enum import Enum import requests import cv2 import numpy as np logger = logging.getLogger(__name__) ROS2_SETUP_CMD = "source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash" ARM_CAMERA_SNAPSHOT = "http://192.168.110.164:5003/api/camera/snapshot" PHOTOS_DIR = "/home/elephant/photos" # 二维码扫描重试参数 QR_SCAN_TIMEOUT = 5 # 单次扫描超时 QR_POSE_WAIT = 1.5 # 调整姿态后等待时间 MANUAL_QR_TIMEOUT = 300 # 5分钟超时 class MissionStatus(str, Enum): IDLE = "idle" RUNNING = "running" PAUSED = "paused" COMPLETED = "completed" WAITING_QR = "waiting_qr" class MissionExecutorV3: """任务执行器 V3 — M×N 网格蛇形路径""" _instance = None # 单例,供外部停止用 def __init__(self, config: dict): self.config = config self.status = MissionStatus.IDLE MissionExecutorV3._instance = self # 实时状态报告 self.report = { "status": "idle", "step": "", "progress": 0, "total": 0, "log": [], "error": None, } # 线程同步 self._stop = threading.Event() self._pause = threading.Event() self._pause.set() # 初始不暂停 self._qr_event = threading.Event() self._qr_value: Optional[str] = None # 设备 from .arm_client import ArmClient self.arm_client = ArmClient( config["arm"]["host"], config["arm"]["port"] ) from .agv_controller_ros2 import AGVController self.agv = AGVController( device=config.get("device", "/dev/agvpro_controller"), baudrate=config.get("baudrate", 1000000) ) # ==================== 连接 ==================== def connect_all(self) -> Dict[str, bool]: results = {} results["agv"] = self.agv.connect() results["arm"] = self.arm_client.connect() return results def disconnect_all(self): if self.arm_client: self.arm_client.close() self.agv.disconnect() # ==================== 主任务流程 ==================== def execute_mission( self, mission_config: dict, machines: list, qr_configs: list, models: list, ) -> dict: """ 执行完整拍摄任务。 Args: mission_config: {rows, cols, grid, positions} machines: [{id, row, col, front: {coords}, back: {coords}}] qr_configs: [{id, name, joint_angles: [a1..a6]}] models: [{id, name, poses: [{name, arm_angles}], poses_back: [...]}] Returns: 执行报告 dict """ self.status = MissionStatus.RUNNING self.report = {"status": "running", "step": "初始化", "progress": 0, "total": 0, "log": [], "error": None} self._stop.clear() self._pause.set() start_time = time.time() try: rows = int(mission_config.get("rows", 1)) cols = int(mission_config.get("cols", 1)) grid = mission_config.get("grid", []) positions = mission_config.get("positions", []) # 如果 grid 为空,从 machines 重建 if not grid or all(not any(rw) if isinstance(rw, list) else True for rw in grid): try: cfg_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "data") with open(os.path.join(cfg_dir, "machines_config.json")) as jf: machines = json.load(jf) grid = [[False] * cols for _ in range(rows)] for m in machines: r = int(m.get("row", 0)) c = int(m.get("col", 0)) if 0 <= r < rows and 0 <= c < cols: grid[r][c] = True except Exception: grid = [[False] * cols for _ in range(rows)] # 1. 生成蛇形路径 path = MissionExecutorV3._build_snake_path(rows, cols, grid) if not path: self._log("❌ 网格中没有机器,任务终止") self.report["error"] = "No machines in grid" return self._finish(0) self.report["total"] = len(path) self._log(f"📍 蛇形路径生成: {len(path)} 台机器") # 初始化任务列表 self.report["tasks"] = [{ "row": r, "col": c, "machine_id": f"m_{r}_{c}", "label": f"{r+1}-{c+1}", "status": "pending", "step": "等待", "qr_value": None, "photos_front": 0, "photos_back": 0, } for (r, c) in path] # 初始化任务列表 self.report["tasks"] = [{ "row": r, "col": c, "machine_id": f"m_{r}_{c}", "label": f"{r+1}-{c+1}", "status": "pending", "step": "等待", "qr_value": None, "photos_front": 0, "photos_back": 0, } for (r, c) in path] # 2. 逐台执行 for idx, (r, c) in enumerate(path): if self._stop.is_set(): self._log("⏹️ 任务已停止") break self._wait_pause() # 更新任务状态 → 正面开始 task = self._get_task(r, c) if task: task["status"] = "active" task["step"] = "正面扫码" machine_id = f"m_{r}_{c}" machine = next((m for m in machines if m.get("id") == machine_id), None) if not machine: self._log(f"⚠️ 机器 {r+1}-{c+1} 不存在,跳过") continue rl, cl = r + 1, c + 1 # 显示用的 1-based # --- 正面 --- self._log(f"📍 机器 {rl}-{cl} 进入正面点位") self._step(f"机器 {rl}-{cl} 正面") # 导航到正面点位 front_pt = self._find_point(positions, r, c, "front") if front_pt and self._has_coords(front_pt): if not self._navigate(front_pt, "正面"): self._log(f"⚠️ 导航失败,尝试继续") else: self._log(f"⚠️ 无正面点位坐标") # 扫描二维码 qr_value = self._scan_qr_with_poses(qr_configs) if self._stop.is_set(): break # 查机型 + 更新任务步骤 model_name = self._lookup_model(qr_value) self._log(f" 🏷️ 机型: {model_name}") if task and qr_value: task["qr_value"] = qr_value if task: task["step"] = "正面拍照" # 正面拍照 model = self._find_model(models, model_name) if model: self._shoot(model, "front", rl, cl, qr_value or "unknown") else: self._log(f" ⚠️ 未找到机型 {model_name}") self._progress(idx, 1) # --- 背面 --- if task: task["step"] = "背面拍照" self._log(f"📍 机器 {rl}-{cl} 进入背面点位") self._step(f"机器 {rl}-{cl} 背面") # 导航到背面点位 back_pt = self._find_point(positions, r + 1, c, "back") if back_pt and self._has_coords(back_pt): if not self._navigate(back_pt, "背面"): self._log(f"⚠️ 导航失败,尝试继续") else: self._log(f"⚠️ 无背面点位坐标") # 背面拍照 if model: self._shoot(model, "back", rl, cl, qr_value or "unknown") # 标记任务完成 if task: task["status"] = "completed" task["step"] = "完成" self._progress(idx, 2) # 3. 回到出发点 if not self._stop.is_set(): self._step("返回出发点") self._log("→ 返回 (0, 0)") self._nav2_go_to_point(0, 0, 0, timeout_sec=60) elapsed = time.time() - start_time return self._finish(elapsed) except Exception as e: self._log(f"❌ 任务异常: {e}") logger.exception("execute_mission 崩溃") self.report["error"] = str(e) self.status = MissionStatus.IDLE self.report["status"] = "idle" return self.report def _finish(self, elapsed: float) -> dict: if self._stop.is_set(): self._step("已停止") else: self._step("完成") self._log(f"✅ 任务完成 ({elapsed:.0f}s)") self.report["progress"] = 100 self.status = MissionStatus.IDLE self.report["status"] = "idle" return self.report # ==================== 蛇形路径 ==================== @staticmethod def _build_snake_path(rows: int, cols: int, grid: list) -> list: """奇数行(0,2,4...)左→右,偶数行(1,3,5...)右→左""" path = [] for r in range(rows): if r % 2 == 0: for c in range(cols): if MissionExecutorV3._has_machine(grid, r, c): path.append((r, c)) else: for c in range(cols - 1, -1, -1): if MissionExecutorV3._has_machine(grid, r, c): path.append((r, c)) return path @staticmethod def _has_machine(grid: list, r: int, c: int) -> bool: if not grid or r >= len(grid): return False row = grid[r] if isinstance(row, list): return c < len(row) and bool(row[c]) return False @staticmethod def _build_grid_from_machines(rows: int, cols: int, machines: list) -> list: """从机器列表重建 grid 矩阵""" if not machines: return [[False] * cols for _ in range(rows)] max_r = max(int(m.get("row", 0)) for m in machines) + 1 max_c = max(int(m.get("col", 0)) for m in machines) + 1 gr = max(rows, max_r) gc = max(cols, max_c) grid = [[False] * gc for _ in range(gr)] for m in machines: r = int(m.get("row", 0)) c = int(m.get("col", 0)) if 0 <= r < gr and 0 <= c < gc: grid[r][c] = True return grid @staticmethod def pre_generate_tasks(mission_config: dict) -> list: """从网格配置预生成任务列表(用于 UI 展示,无需启动执行器)""" rows = int(mission_config.get("rows", 1)) cols = int(mission_config.get("cols", 1)) grid = mission_config.get("grid", []) # 如果 grid 为空但从 machines 重建 if not grid and machines: grid = MissionExecutorV3._build_grid_from_machines(rows, cols, machines) if grid: rows = len(grid) cols = len(grid[0]) if grid else cols path = MissionExecutorV3._build_snake_path(rows, cols, grid) tasks = [] for (r, c) in path: tasks.append({ "row": r, "col": c, "machine_id": f"m_{r}_{c}", "label": f"{r+1}-{c+1}", "status": "pending", "step": "等待", "qr_value": None, "photos_front": 0, "photos_back": 0, }) return tasks # ==================== 点位查找 ==================== @staticmethod def _find_point(positions: list, row: int, col: int, side: str) -> Optional[dict]: for p in positions: if p.get("row") == row and p.get("col") == col and p.get("side") == side: return p return None @staticmethod def _has_coords(point: dict) -> bool: coords = point.get("coords", []) return len(coords) >= 2 and (coords[0] != 0 or coords[1] != 0) # ==================== 导航 ==================== def _navigate(self, point: dict, label: str) -> bool: coords = point["coords"] x, y = float(coords[0]), float(coords[1]) yaw = float(coords[2]) if len(coords) >= 3 else 0.0 self._log(f" 🧭 导航到{label}点位 ({x:.2f}, {y:.2f}, yaw={math.degrees(yaw):.0f}°)") return self._nav2_go_to_point(x, y, yaw) # ==================== 二维码扫描 ==================== def _scan_qr_with_poses(self, qr_configs: list) -> Optional[str]: """用二维码配置中的姿态依次尝试""" if not qr_configs: self._log(f" ⚠️ 无二维码配置") return self._request_manual_qr() self._log(f" 🔍 尝试 {len(qr_configs)} 个二维码姿态...") for i, qc in enumerate(qr_configs): if self._stop.is_set(): return None self._wait_pause() angles = qc.get("joint_angles", []) if not angles or len(angles) < 6: continue name = qc.get("name", f"姿态{i+1}") self._log(f" [{i+1}/{len(qr_configs)}] {name}") # 调整机械臂 if self.arm_client: self.arm_client.set_angles(angles, speed=500) time.sleep(QR_POSE_WAIT) # 扫码 qr = self._decode_qr_from_arm() if qr: self._log(f" ✅ 识别成功: {qr}") return qr time.sleep(0.3) self._log(f" ⚠️ 全部 {len(qr_configs)} 个姿态均未识别到二维码") return self._request_manual_qr() def _decode_qr_from_arm(self) -> Optional[str]: """从机械臂摄像头取一帧,识别二维码""" for attempt in range(3): try: resp = requests.get(ARM_CAMERA_SNAPSHOT, timeout=QR_SCAN_TIMEOUT) if resp.status_code != 200 or not resp.content: continue arr = np.frombuffer(resp.content, dtype=np.uint8) frame = cv2.imdecode(arr, cv2.IMREAD_COLOR) if frame is None: continue detector = cv2.QRCodeDetector() data, bbox, _ = detector.detectAndDecode(frame) if data and len(data) > 0: return data except Exception: pass time.sleep(0.5) return None def _request_manual_qr(self) -> Optional[str]: """暂停任务,等待手动输入""" self.status = MissionStatus.WAITING_QR self.report["status"] = "waiting_qr" self.report["step"] = "等待手动输入二维码" self._log(" ⌨️ 弹窗等待手动输入二维码...") self._qr_event.clear() if self._qr_event.wait(timeout=MANUAL_QR_TIMEOUT): self.status = MissionStatus.RUNNING self.report["status"] = "running" self._log(f" ✏️ 手动输入: {self._qr_value}") return self._qr_value else: self.status = MissionStatus.RUNNING self.report["status"] = "running" self._log(f" ⚠️ 等待超时({MANUAL_QR_TIMEOUT}s),跳过") return None def set_manual_qr(self, value: str): self._qr_value = value.strip() self._qr_event.set() # ==================== 机型查询 ==================== def _lookup_model(self, qr_value: Optional[str]) -> str: """TODO: 后续通过 HTTP 接口查询机型""" return "机器1" @staticmethod def _find_model(models: list, name: str) -> Optional[dict]: """在机型列表中找到匹配的机型""" for m in models: if m.get("name") == name or m.get("id") == name: return m # fallback: 第一个机型 return models[0] if models else None # ==================== 姿态拍照 ==================== def _shoot(self, model: dict, side: str, row: int, col: int, qr_value: str): """按机型配置的所有姿态依次拍照""" # 更新任务照片计数 task = self._get_task(row - 1, col - 1) side_label = "正面" if side == "front" else "背面" poses = model.get("poses", []) if side == "front" else model.get("poses_back", []) if not poses: self._log(f" ⚠️ 机型无{side_label}姿态配置") return self._log(f" 📷 {side_label}拍照 ({len(poses)} 个姿态)") for pi, pose in enumerate(poses): if self._stop.is_set(): break self._wait_pause() angles = pose.get("arm_angles", []) if not angles or len(angles) < 6: self._log(f" 跳过 {pose.get('name', f'姿态{pi+1}')}: 无效角度") continue name = pose.get("name", f"{side_label}-{pi+1}") self._log(f" 🎯 {name}") # 调整机械臂 if self.arm_client: self.arm_client.set_angles(angles, speed=500) time.sleep(QR_POSE_WAIT) # 拍照 path = self._capture_arm_photo(row, col, side, pi + 1, qr_value) if path: self._log(f" 💾 {os.path.basename(path)}") def _capture_arm_photo(self, row: int, col: int, side: str, pose_idx: int, qr_value: str) -> Optional[str]: """从机械臂摄像头拍照存本地""" try: resp = requests.get(ARM_CAMERA_SNAPSHOT, timeout=10) if resp.status_code != 200 or not resp.content: logger.error("arm snapshot 请求失败") return None os.makedirs(PHOTOS_DIR, exist_ok=True) ts = time.strftime("%Y%m%d_%H%M%S") fname = f"{ts}_r{row}c{col}_{side}_p{pose_idx}_{qr_value[:20]}.jpg" fpath = os.path.join(PHOTOS_DIR, fname) with open(fpath, "wb") as f: f.write(resp.content) return fpath except Exception as e: logger.error(f"拍照异常: {e}") return None # ==================== 控制 ==================== def _wait_pause(self): """等待暂停状态解除""" self._pause.wait() def pause(self): self._pause.clear() self.status = MissionStatus.PAUSED self.report["status"] = "paused" self.report["step"] = "已暂停" self._log("⏸️ 任务已暂停") def resume(self): self._pause.set() self.status = MissionStatus.RUNNING self.report["status"] = "running" self._log("▶️ 任务已恢复") def stop(self): self._stop.set() self._pause.set() # 解除暂停 self._qr_event.set() # 解除 QR 等待 if self.arm_client: try: self.arm_client.task_stop() except Exception: pass self.agv.stop() self.status = MissionStatus.IDLE self.report["status"] = "idle" def get_status(self) -> dict: return { "status": self.report["status"], "step": self.report["step"], "progress": self.report["progress"], "total": self.report["total"], "tasks": self.report.get("tasks", []), } def get_logs(self) -> dict: """返回实时日志和完整状态""" return self.report # ==================== 状态报告 ==================== def _log(self, msg: str): self.report["log"].append(msg) # Keep last 500 entries if len(self.report["log"]) > 500: self.report["log"] = self.report["log"][-500:] logger.info(msg) def _step(self, text: str): self.report["step"] = text def _get_task(self, row: int, col: int) -> Optional[dict]: """获取指定行列的任务记录""" for t in self.report.get("tasks", []): if t["row"] == row and t["col"] == col: return t return None def _progress(self, machine_idx: int, side_code: int): """side_code: 1=正面完成, 2=背面完成""" if self.report["total"]: self.report["progress"] = min( int((machine_idx * 2 + side_code) / (self.report["total"] * 2) * 100), 99 ) # ==================== Nav2 导航 ==================== # (保留原实现) def _nav2_check_available(self) -> bool: try: rc, out, err = self._run_ros2_cmd("ros2 action list") if rc != 0: return False return "/navigate_to_pose" in out except: return False def _nav2_go_to_point(self, x: float, y: float, yaw: float = 0.0, timeout_sec: float = 120.0) -> bool: if not self._nav2_check_available(): logger.error("Nav2 action server 不可用") return False qz = math.sin(yaw / 2.0) qw = math.cos(yaw / 2.0) pose_yaml = ( f"pose:\n" f" header:\n" f" stamp:\n" f" sec: 0\n" f" nanosec: 0\n" f" frame_id: 'map'\n" f" pose:\n" f" position:\n" f" x: {x}\n" f" y: {y}\n" f" z: 0.0\n" f" orientation:\n" f" x: 0.0\n" f" y: 0.0\n" f" z: {qz}\n" f" w: {qw}" ) cmd = ( f"ros2 action send_goal /navigate_to_pose " f"nav2_msgs/action/NavigateToPose " f"'{pose_yaml}' --feedback" ) full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'" try: process = subprocess.Popen( full_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) succeeded = False elapsed = 0.0 while elapsed < timeout_sec: import select reads = [process.stdout] ready, _, _ = select.select(reads, [], [], 1.0) elapsed += 1.0 if process.stdout in ready: line = process.stdout.readline() if not line: break ls = line.strip() if "succeeded" in ls.lower(): succeeded = True break elif "failed" in ls.lower() or "aborted" in ls.lower(): break elif "canceled" in ls.lower() or "cancelled" in ls.lower(): break if process.poll() is not None: break if process.poll() is None: process.terminate() try: process.wait(timeout=3) except subprocess.TimeoutExpired: process.kill() return succeeded except Exception as e: logger.error(f"Nav2 异常: {e}") return False def _nav2_cancel(self): cancel_cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 action cancel /navigate_to_pose 2>/dev/null || true'" try: subprocess.run(cancel_cmd, shell=True, timeout=3) except: pass def _run_ros2_cmd(self, cmd: str, timeout: float = 5.0) -> tuple: full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'" try: result = subprocess.run( full_cmd, shell=True, capture_output=True, text=True, timeout=timeout ) return result.returncode, result.stdout.strip(), result.stderr.strip() except subprocess.TimeoutExpired: return -1, "", "Timeout" except Exception as e: return -1, "", str(e)