""" Mock 硬件实现 - 用于本地开发环境(无真实硬件) 通过环境变量 MOCK_HARDWARE=1 启用,模拟所有硬件组件的行为。 """ import time import logging import numpy as np from typing import Tuple, List, Optional from enum import Enum logger = logging.getLogger(__name__) logger.info("[Mock] 使用 Mock 硬件实现 - 本地开发模式") class MockArmClient: """Mock 机械臂客户端""" def __init__(self, host: str = "127.0.0.1", port: int = 5002, timeout: float = 10): self.host = host self.port = port self.timeout = timeout self._connected = False # 默认关节角度 [J1, J2, J3, J4, J5, J6] self._angles = [0.0, -90.0, 90.0, 0.0, 90.0, 0.0] # 默认坐标 [x, y, z, rx, ry, rz] self._coords = [200.0, 0.0, 300.0, 0.0, 180.0, 0.0] self._power_on = False self._state_on = False # Mock socket for compatibility with real ArmClient interface self._sock = None class _MockSocket: """Mock socket for API compatibility""" def settimeout(self, timeout): pass def connect(self) -> bool: """建立连接(Mock)""" logger.info(f"[Mock] 连接机械臂 {self.host}:{self.port}") self._connected = True self._sock = self._MockSocket() # 创建 mock socket return True def close(self): """关闭连接""" self._connected = False self._sock = None logger.info("[Mock] 关闭机械臂连接") def send_command(self, cmd: str) -> Tuple[bool, str]: """发送命令(Mock)""" if not self._connected: return False, "未连接" logger.debug(f"[Mock] 发送命令: {cmd}") return True, "ok" def reconnect(self) -> bool: """重新连接""" self.close() time.sleep(0.5) return self.connect() # ========== 机械臂命令 ========== def get_angles(self) -> Tuple[bool, List[float]]: """获取所有关节角度""" logger.debug(f"[Mock] 获取关节角度: {self._angles}") return True, self._angles def set_angles(self, angles: List[float], speed: int = 500) -> bool: """设置所有关节角度""" if len(angles) != 6: return False logger.info(f"[Mock] 设置关节角度: {angles}, 速度: {speed}") self._angles = list(angles) return True def set_angle(self, joint: str, angle: float, speed: int = 500) -> bool: """设置单个关节角度""" logger.info(f"[Mock] 设置关节 {joint} 角度: {angle}, 速度: {speed}") joint_map = {"J1": 0, "J2": 1, "J3": 2, "J4": 3, "J5": 4, "J6": 5} idx = joint_map.get(joint) if idx is not None: self._angles[idx] = angle return True def jog_angle(self, joint: str, direction: int, speed: int = 500) -> bool: """连续调节关节角度""" logger.debug(f"[Mock] jog_angle({joint}, {direction}, {speed})") return True def get_coords(self) -> Tuple[bool, List[float]]: """获取当前坐标和姿态""" logger.debug(f"[Mock] 获取坐标: {self._coords}") return True, self._coords def set_coords(self, coords: List[float], speed: int = 500) -> bool: """设置坐标和姿态""" if len(coords) != 6: return False logger.info(f"[Mock] 设置坐标: {coords}, 速度: {speed}") self._coords = list(coords) return True def jog_coord(self, axis: str, direction: int, speed: int = 500) -> bool: """连续调节坐标轴""" logger.debug(f"[Mock] jog_coord({axis}, {direction}, {speed})") return True def power_on(self) -> bool: """上电""" logger.info("[Mock] 机械臂上电") self._power_on = True return True def state_on(self) -> bool: """启用状态""" logger.info("[Mock] 机械臂状态启用") self._state_on = True return True def state_off(self) -> bool: """禁用状态""" logger.info("[Mock] 机械臂状态禁用") self._state_on = False return True def state_check(self) -> bool: """检查机械臂状态""" return self._state_on def check_running(self) -> bool: """检查机械臂是否在运行""" return False # Mock模式下始终不在运行 def wait_done(self, timeout: float = 30) -> bool: """等待上一条命令执行完成""" logger.debug(f"[Mock] wait_done({timeout}s) - 立即返回") return True def task_stop(self) -> bool: """停止任务""" logger.info("[Mock] 停止机械臂任务") return True def __enter__(self): self.connect() return self def __exit__(self, *args): self.close() class MockAGVController: """Mock AGV 控制器""" def __init__(self, device: str = "/dev/agvpro_controller", baudrate: int = 1000000): self.device = device self.baudrate = baudrate self._connected = False self._position = [0.0, 0.0, 0.0] # [x, y, yaw] self._voltage = 48.0 # 模拟电压 self._moving = False def _run_ros2_cmd(self, cmd: str, timeout: float = 5.0) -> tuple: """执行 ros2 命令(Mock)""" logger.debug(f"[Mock] ros2 命令: {cmd}") return 0, "", "" def connect(self) -> bool: """连接 AGV(Mock)""" logger.info(f"[Mock] 连接 AGV 控制器 {self.device}") self._connected = True return True def is_connected(self) -> bool: """检查是否已连接""" return self._connected def move_forward(self, speed: float = 1.0, duration: float = None): """前进""" logger.info(f"[Mock] AGV 前进,速度: {speed}, 时长: {duration}") self._moving = True if duration: time.sleep(min(duration, 0.1)) # Mock 模式下只短暂等待 self.stop() def move_backward(self, speed: float = 1.0, duration: float = None): """后退""" logger.info(f"[Mock] AGV 后退,速度: {speed}, 时长: {duration}") if duration: time.sleep(min(duration, 0.1)) self.stop() def turn_left(self, speed: float = 1.0, duration: float = None): """左转""" logger.info(f"[Mock] AGV 左转,速度: {speed}, 时长: {duration}") if duration: time.sleep(min(duration, 0.1)) self.stop() def turn_right(self, speed: float = 1.0, duration: float = None): """右转""" logger.info(f"[Mock] AGV 右转,速度: {speed}, 时长: {duration}") if duration: time.sleep(min(duration, 0.1)) self.stop() def move_left_lateral(self, speed: float = 1.0, duration: float = None): """向左横向移动""" logger.info(f"[Mock] AGV 向左横向移动,速度: {speed}, 时长: {duration}") if duration: time.sleep(min(duration, 0.1)) self.stop() def move_right_lateral(self, speed: float = 1.0, duration: float = None): """向右横向移动""" logger.info(f"[Mock] AGV 向右横向移动,速度: {speed}, 时长: {duration}") if duration: time.sleep(min(duration, 0.1)) self.stop() def stop(self): """停止""" logger.debug("[Mock] AGV 停止") self._moving = False def get_position(self) -> Optional[List[float]]: """获取 AGV 当前位置""" logger.debug(f"[Mock] 获取 AGV 位置: {self._position}") return self._position def go_to_point(self, x: float, y: float, rz: float = None, speed: float = 1.0) -> bool: """移动到目标点""" logger.info(f"[Mock] AGV 移动到目标点: ({x}, {y}), yaw: {rz}") # 模拟移动完成 self._position = [x, y, rz if rz is not None else self._position[2]] return True def get_battery(self) -> Optional[float]: """获取电池电压""" logger.debug(f"[Mock] 获取电池电压: {self._voltage}V") return self._voltage def disconnect(self): """断开连接""" self.stop() self._connected = False def __enter__(self): self.connect() return self def __exit__(self, *args): self.disconnect() class MockQRScanner: """Mock 二维码扫描器""" def __init__(self, device_index: int = 0, width: int = 640, height: int = 400, prefer_v4l2_ctl: bool = True): self.device_index = device_index self.width = width self.height = height self._opened = False # Mock 二维码内容 self._mock_qr_code = "MOCK_QR_SN_12345" def open(self) -> bool: """打开摄像头(Mock)""" logger.info(f"[Mock] 打开摄像头 /dev/video{self.device_index}") self._opened = True return True def close(self): """关闭摄像头""" self._opened = False logger.info("[Mock] 关闭摄像头") def read_frame(self, timeout: float = 2.0) -> Optional[np.ndarray]: """读取一帧(Mock)""" if not self._opened: return None # 返回空白图像 return np.zeros((self.height, self.width, 3), dtype=np.uint8) def detect_qr(self, frame: np.ndarray) -> Optional[str]: """从图像帧中检测二维码(Mock)""" # 始终返回模拟二维码 logger.debug(f"[Mock] 检测到二维码: {self._mock_qr_code}") return self._mock_qr_code def scan_once(self) -> Optional[str]: """扫描一次(Mock)""" logger.debug("[Mock] 扫描二维码...") return self._mock_qr_code def scan_with_retry(self, max_attempts: int = 5, interval: float = 0.5) -> Optional[str]: """多次扫描(Mock)""" logger.info(f"[Mock] scan_with_retry - 直接返回模拟二维码") return self._mock_qr_code def get_preview_frame(self) -> Optional[np.ndarray]: """获取预览帧(Mock)""" return self.read_frame() def set_mock_qr_code(self, code: str): """设置模拟二维码内容(仅 Mock 模式)""" self._mock_qr_code = code logger.info(f"[Mock] 设置模拟二维码: {code}") def __enter__(self): self.open() return self def __exit__(self, *args): self.close() class MockNav2Status(Enum): """Mock Nav2 状态""" IDLE = "idle" NAVIGATING = "navigating" SUCCEEDED = "succeeded" FAILED = "failed" CANCELLED = "cancelled" class MockNav2Navigator: """Mock Nav2 导航器""" def __init__(self): self.status = MockNav2Status.IDLE self._current_pose = [0.0, 0.0, 0.0] # [x, y, yaw] def _get_current_pose(self) -> List[float]: """获取当前位置(Mock)""" return self._current_pose def navigate_to_pose(self, x: float, y: float, yaw: float = None, timeout_sec: float = 120.0, blocking: bool = True) -> bool: """导航到目标坐标(Mock)""" logger.info(f"[Mock] 导航到: ({x:.3f}, {y:.3f}), yaw={yaw:.1f}°") self.status = MockNav2Status.NAVIGATING # 模拟导航成功 self._current_pose = [x, y, yaw if yaw is not None else 0.0] self.status = MockNav2Status.SUCCEEDED return True def navigate_through_poses(self, poses: List[Tuple[float, float, float]], timeout_per_pose: float = 120.0, blocking: bool = True) -> bool: """通过多个点位导航(Mock)""" logger.info(f"[Mock] 通过 {len(poses)} 个点位导航") for i, (x, y, yaw) in enumerate(poses): logger.debug(f"[Mock] 点位 {i+1}: ({x}, {y}), yaw={yaw}") if poses: self._current_pose = list(poses[-1]) self.status = MockNav2Status.SUCCEEDED return True def stop(self): """停止导航(Mock)""" logger.info("[Mock] 停止导航") self.status = MockNav2Status.IDLE def get_status(self) -> dict: """获取导航状态(Mock)""" return { "status": self.status.value, "position": self._current_pose } def get_current_position(self) -> List[float]: """获取当前位置(Mock)""" return self._current_pose # 导出兼容别名 Nav2Status = MockNav2Status