231 lines
7.8 KiB
Python
231 lines
7.8 KiB
Python
"""
|
||
任务调度器 - 管理拍摄任务的执行
|
||
"""
|
||
import os
|
||
import json
|
||
import time
|
||
import logging
|
||
from typing import List, Dict, Optional
|
||
from enum import Enum
|
||
|
||
from .arm_client import ArmClient
|
||
from .agv_controller import AGVController
|
||
from .qr_scanner import QRScanner
|
||
from .image_uploader import ImageUploader
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class TaskStatus(Enum):
|
||
PENDING = "pending"
|
||
RUNNING = "running"
|
||
COMPLETED = "completed"
|
||
FAILED = "failed"
|
||
PAUSED = "paused"
|
||
|
||
|
||
class MissionExecutor:
|
||
"""任务执行器 - 负责按顺序执行点位拍摄任务"""
|
||
|
||
def __init__(self, config: dict):
|
||
self.config = config
|
||
self.status = TaskStatus.PENDING
|
||
self.current_point_index = 0
|
||
self.current_pose_index = 0
|
||
self.snapshot_serial_map = {} # {point_id: serial_number} 缓存已扫描的 serialNumber
|
||
|
||
# 初始化各模块
|
||
self.agv = AGVController(
|
||
device=config.get("device", "/dev/agvpro_controller"),
|
||
baudrate=config.get("baudrate", 1000000)
|
||
)
|
||
self.arm_client: Optional[ArmClient] = None
|
||
self.uploader = ImageUploader(
|
||
upload_url=config["upload_url"],
|
||
timeout=config.get("upload_timeout", 30),
|
||
max_retries=config.get("upload_retries", 3)
|
||
)
|
||
self.qr_scanner = QRScanner(device_index=config.get("camera_index", 0))
|
||
|
||
# ========== 连接管理 ==========
|
||
|
||
def connect_all(self) -> Dict[str, bool]:
|
||
"""连接 AGV、机械臂、摄像头"""
|
||
results = {}
|
||
|
||
# 连接 AGV
|
||
results["agv"] = self.agv.connect()
|
||
|
||
# 连接机械臂(通过 TCP)
|
||
arm_cfg = self.config["arm"]
|
||
self.arm_client = ArmClient(arm_cfg["host"], arm_cfg["port"])
|
||
results["arm"] = self.arm_client.connect()
|
||
|
||
# 打开摄像头
|
||
results["camera"] = self.qr_scanner.open()
|
||
|
||
return results
|
||
|
||
def disconnect_all(self):
|
||
"""断开所有连接"""
|
||
if self.arm_client:
|
||
self.arm_client.close()
|
||
self.agv.disconnect()
|
||
self.qr_scanner.close()
|
||
|
||
# ========== 任务执行 ==========
|
||
|
||
def execute_mission(self, mission_data: dict) -> dict:
|
||
"""
|
||
执行一个完整任务(一个地图的所有点位)
|
||
mission_data: 包含点位列表的完整任务配置
|
||
返回执行报告
|
||
"""
|
||
self.status = TaskStatus.RUNNING
|
||
report = {
|
||
"total_points": len(mission_data.get("points", [])),
|
||
"completed": 0,
|
||
"failed": 0,
|
||
"details": []
|
||
}
|
||
|
||
points = mission_data.get("points", [])
|
||
for i, point in enumerate(points):
|
||
self.current_point_index = i
|
||
try:
|
||
result = self._execute_point(point)
|
||
report["details"].append(result)
|
||
if result["status"] == "completed":
|
||
report["completed"] += 1
|
||
else:
|
||
report["failed"] += 1
|
||
except Exception as e:
|
||
logger.error(f"点位 {i} 执行异常: {e}")
|
||
report["failed"] += 1
|
||
report["details"].append({
|
||
"point_index": i,
|
||
"point_name": point.get("name", f"point_{i}"),
|
||
"status": "failed",
|
||
"error": str(e)
|
||
})
|
||
|
||
self.status = TaskStatus.COMPLETED if report["failed"] == 0 else TaskStatus.PAUSED
|
||
return report
|
||
|
||
def _execute_point(self, point: dict) -> dict:
|
||
"""执行单个点位的拍摄"""
|
||
point_name = point.get("name", "unknown")
|
||
logger.info(f"开始执行点位: {point_name}")
|
||
|
||
result = {
|
||
"point_name": point_name,
|
||
"poses": []
|
||
}
|
||
|
||
# 1. AGV 移动到点位
|
||
coords = point.get("coords", {})
|
||
x, y = coords.get("x", 0), coords.get("y", 0)
|
||
logger.info(f"AGV 移动到 ({x}, {y})")
|
||
# TODO: 调用导航移动到目标点
|
||
time.sleep(1) # 模拟移动
|
||
|
||
# 2. 执行该点位的所有姿态
|
||
poses = point.get("poses", [])
|
||
for j, pose in enumerate(poses):
|
||
self.current_pose_index = j
|
||
pose_result = self._execute_pose(point, pose, j)
|
||
result["poses"].append(pose_result)
|
||
|
||
# 如果是"两者都要"类型,需要按顺序执行两台机器
|
||
if pose.get("type") == "both":
|
||
# 执行顺序由 pose.sequence 配置
|
||
sequence = pose.get("sequence", ["front_first"])
|
||
for step in sequence:
|
||
if step == "front":
|
||
self._capture_and_upload(point, pose, "front", j)
|
||
elif step == "back":
|
||
self._capture_and_upload(point, pose, "back", j)
|
||
else:
|
||
photo_type = pose.get("photo_type", "front")
|
||
self._capture_and_upload(point, pose, photo_type, j)
|
||
|
||
result["status"] = "completed"
|
||
return result
|
||
|
||
def _execute_pose(self, point: dict, pose: dict, pose_idx: int) -> dict:
|
||
"""执行单个姿态的拍摄"""
|
||
photo_type = pose.get("photo_type", "front")
|
||
camera_source = pose.get("camera", "agv") # agv 或 arm
|
||
|
||
# 如果需要机械臂运动
|
||
arm_angles = pose.get("arm_angles", None)
|
||
if arm_angles and self.arm_client:
|
||
self.arm_client.set_angles(arm_angles, speed=pose.get("speed", 500))
|
||
time.sleep(1) # 等待运动到位
|
||
|
||
return {
|
||
"pose_index": pose_idx,
|
||
"photo_type": photo_type,
|
||
"arm_angles": arm_angles,
|
||
"status": "ready"
|
||
}
|
||
|
||
def _capture_and_upload(self, point: dict, pose: dict, photo_type: str, pose_idx: int):
|
||
"""拍摄并上传"""
|
||
point_id = point.get("id", str(point))
|
||
|
||
# 确定 serialNumber
|
||
if photo_type == "front":
|
||
# 正面:从二维码获取 serialNumber
|
||
serial = self.qr_scanner.scan_with_retry(max_attempts=5, interval=0.5)
|
||
if not serial:
|
||
logger.warning(f"点位 {point.get('name')} 正面拍摄未扫描到二维码,跳过")
|
||
return
|
||
self.snapshot_serial_map[point_id] = serial
|
||
else:
|
||
# 背面:使用缓存的 serialNumber
|
||
serial = self.snapshot_serial_map.get(point_id)
|
||
if not serial:
|
||
logger.warning(f"点位 {point.get('name')} 背面拍摄但无缓存 serialNumber")
|
||
return
|
||
|
||
# 拍摄图片(AGV 端摄像头)
|
||
frame = self.qr_scanner.read_frame()
|
||
if frame is None:
|
||
logger.error("摄像头读取失败")
|
||
return
|
||
|
||
# 保存图片
|
||
photo_dir = os.path.join(os.path.dirname(__file__), "..", "photos")
|
||
os.makedirs(photo_dir, exist_ok=True)
|
||
photo_path = os.path.join(photo_dir, f"{serial}_{photo_type}_{int(time.time())}.jpg")
|
||
import cv2
|
||
cv2.imwrite(photo_path, frame)
|
||
|
||
# 上传
|
||
self.uploader.upload(photo_path, serial, pose_idx, photo_type)
|
||
logger.info(f"上传完成: {serial} {photo_type}")
|
||
|
||
# ========== 状态查询 ==========
|
||
|
||
def get_status(self) -> dict:
|
||
return {
|
||
"task_status": self.status.value,
|
||
"current_point": self.current_point_index,
|
||
"current_pose": self.current_pose_index,
|
||
"agv_connected": self.agv.is_connected(),
|
||
"arm_connected": self.arm_client is not None,
|
||
"camera_opened": self.qr_scanner._cap is not None and self.qr_scanner._cap.isOpened()
|
||
}
|
||
|
||
def pause(self):
|
||
self.status = TaskStatus.PAUSED
|
||
|
||
def resume(self):
|
||
self.status = TaskStatus.RUNNING
|
||
|
||
def stop(self):
|
||
if self.arm_client:
|
||
self.arm_client.task_stop()
|
||
self.agv.stop()
|
||
self.status = TaskStatus.PENDING |