Files
smart-inspection/agv_app/utils/mock_hardware.py
T
FaulknerWu cb6498cd2b Refactor infrastructure scripts and add mock hardware support
Changes:
- Refactor project scripts for better dev/prod workflow separation
- Add mock_hardware.py for local development without real hardware
- Add Makefile for common commands
- Add .env.example for environment variable reference
- Split scripts into dev-backend.sh, dev-frontend.sh, prod-backend.sh
- Add stop.sh for clean shutdown

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-22 12:31:32 +08:00

383 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Mock 硬件实现 - 用于本地开发环境(无真实硬件)
通过环境变量 MOCK_HARDWARE=1 启用,模拟所有硬件组件的行为。
"""
import time
import logging
import numpy as np
from typing import Tuple, List, Optional
from enum import Enum
logger = logging.getLogger(__name__)
logger.info("[Mock] 使用 Mock 硬件实现 - 本地开发模式")
class MockArmClient:
"""Mock 机械臂客户端"""
def __init__(self, host: str = "127.0.0.1", port: int = 5002, timeout: float = 10):
self.host = host
self.port = port
self.timeout = timeout
self._connected = False
# 默认关节角度 [J1, J2, J3, J4, J5, J6]
self._angles = [0.0, -90.0, 90.0, 0.0, 90.0, 0.0]
# 默认坐标 [x, y, z, rx, ry, rz]
self._coords = [200.0, 0.0, 300.0, 0.0, 180.0, 0.0]
self._power_on = False
self._state_on = False
# Mock socket for compatibility with real ArmClient interface
self._sock = None
class _MockSocket:
"""Mock socket for API compatibility"""
def settimeout(self, timeout):
pass
def connect(self) -> bool:
"""建立连接(Mock"""
logger.info(f"[Mock] 连接机械臂 {self.host}:{self.port}")
self._connected = True
self._sock = self._MockSocket() # 创建 mock socket
return True
def close(self):
"""关闭连接"""
self._connected = False
self._sock = None
logger.info("[Mock] 关闭机械臂连接")
def send_command(self, cmd: str) -> Tuple[bool, str]:
"""发送命令(Mock"""
if not self._connected:
return False, "未连接"
logger.debug(f"[Mock] 发送命令: {cmd}")
return True, "ok"
def reconnect(self) -> bool:
"""重新连接"""
self.close()
time.sleep(0.5)
return self.connect()
# ========== 机械臂命令 ==========
def get_angles(self) -> Tuple[bool, List[float]]:
"""获取所有关节角度"""
logger.debug(f"[Mock] 获取关节角度: {self._angles}")
return True, self._angles
def set_angles(self, angles: List[float], speed: int = 500) -> bool:
"""设置所有关节角度"""
if len(angles) != 6:
return False
logger.info(f"[Mock] 设置关节角度: {angles}, 速度: {speed}")
self._angles = list(angles)
return True
def set_angle(self, joint: str, angle: float, speed: int = 500) -> bool:
"""设置单个关节角度"""
logger.info(f"[Mock] 设置关节 {joint} 角度: {angle}, 速度: {speed}")
joint_map = {"J1": 0, "J2": 1, "J3": 2, "J4": 3, "J5": 4, "J6": 5}
idx = joint_map.get(joint)
if idx is not None:
self._angles[idx] = angle
return True
def jog_angle(self, joint: str, direction: int, speed: int = 500) -> bool:
"""连续调节关节角度"""
logger.debug(f"[Mock] jog_angle({joint}, {direction}, {speed})")
return True
def get_coords(self) -> Tuple[bool, List[float]]:
"""获取当前坐标和姿态"""
logger.debug(f"[Mock] 获取坐标: {self._coords}")
return True, self._coords
def set_coords(self, coords: List[float], speed: int = 500) -> bool:
"""设置坐标和姿态"""
if len(coords) != 6:
return False
logger.info(f"[Mock] 设置坐标: {coords}, 速度: {speed}")
self._coords = list(coords)
return True
def jog_coord(self, axis: str, direction: int, speed: int = 500) -> bool:
"""连续调节坐标轴"""
logger.debug(f"[Mock] jog_coord({axis}, {direction}, {speed})")
return True
def power_on(self) -> bool:
"""上电"""
logger.info("[Mock] 机械臂上电")
self._power_on = True
return True
def state_on(self) -> bool:
"""启用状态"""
logger.info("[Mock] 机械臂状态启用")
self._state_on = True
return True
def state_off(self) -> bool:
"""禁用状态"""
logger.info("[Mock] 机械臂状态禁用")
self._state_on = False
return True
def state_check(self) -> bool:
"""检查机械臂状态"""
return self._state_on
def check_running(self) -> bool:
"""检查机械臂是否在运行"""
return False # Mock模式下始终不在运行
def wait_done(self, timeout: float = 30) -> bool:
"""等待上一条命令执行完成"""
logger.debug(f"[Mock] wait_done({timeout}s) - 立即返回")
return True
def task_stop(self) -> bool:
"""停止任务"""
logger.info("[Mock] 停止机械臂任务")
return True
def __enter__(self):
self.connect()
return self
def __exit__(self, *args):
self.close()
class MockAGVController:
"""Mock AGV 控制器"""
def __init__(self, device: str = "/dev/agvpro_controller", baudrate: int = 1000000):
self.device = device
self.baudrate = baudrate
self._connected = False
self._position = [0.0, 0.0, 0.0] # [x, y, yaw]
self._voltage = 48.0 # 模拟电压
self._moving = False
def _run_ros2_cmd(self, cmd: str, timeout: float = 5.0) -> tuple:
"""执行 ros2 命令(Mock"""
logger.debug(f"[Mock] ros2 命令: {cmd}")
return 0, "", ""
def connect(self) -> bool:
"""连接 AGVMock"""
logger.info(f"[Mock] 连接 AGV 控制器 {self.device}")
self._connected = True
return True
def is_connected(self) -> bool:
"""检查是否已连接"""
return self._connected
def move_forward(self, speed: float = 1.0, duration: float = None):
"""前进"""
logger.info(f"[Mock] AGV 前进,速度: {speed}, 时长: {duration}")
self._moving = True
if duration:
time.sleep(min(duration, 0.1)) # Mock 模式下只短暂等待
self.stop()
def move_backward(self, speed: float = 1.0, duration: float = None):
"""后退"""
logger.info(f"[Mock] AGV 后退,速度: {speed}, 时长: {duration}")
if duration:
time.sleep(min(duration, 0.1))
self.stop()
def turn_left(self, speed: float = 1.0, duration: float = None):
"""左转"""
logger.info(f"[Mock] AGV 左转,速度: {speed}, 时长: {duration}")
if duration:
time.sleep(min(duration, 0.1))
self.stop()
def turn_right(self, speed: float = 1.0, duration: float = None):
"""右转"""
logger.info(f"[Mock] AGV 右转,速度: {speed}, 时长: {duration}")
if duration:
time.sleep(min(duration, 0.1))
self.stop()
def move_left_lateral(self, speed: float = 1.0, duration: float = None):
"""向左横向移动"""
logger.info(f"[Mock] AGV 向左横向移动,速度: {speed}, 时长: {duration}")
if duration:
time.sleep(min(duration, 0.1))
self.stop()
def move_right_lateral(self, speed: float = 1.0, duration: float = None):
"""向右横向移动"""
logger.info(f"[Mock] AGV 向右横向移动,速度: {speed}, 时长: {duration}")
if duration:
time.sleep(min(duration, 0.1))
self.stop()
def stop(self):
"""停止"""
logger.debug("[Mock] AGV 停止")
self._moving = False
def get_position(self) -> Optional[List[float]]:
"""获取 AGV 当前位置"""
logger.debug(f"[Mock] 获取 AGV 位置: {self._position}")
return self._position
def go_to_point(self, x: float, y: float, rz: float = None, speed: float = 1.0) -> bool:
"""移动到目标点"""
logger.info(f"[Mock] AGV 移动到目标点: ({x}, {y}), yaw: {rz}")
# 模拟移动完成
self._position = [x, y, rz if rz is not None else self._position[2]]
return True
def get_battery(self) -> Optional[float]:
"""获取电池电压"""
logger.debug(f"[Mock] 获取电池电压: {self._voltage}V")
return self._voltage
def disconnect(self):
"""断开连接"""
self.stop()
self._connected = False
def __enter__(self):
self.connect()
return self
def __exit__(self, *args):
self.disconnect()
class MockQRScanner:
"""Mock 二维码扫描器"""
def __init__(self, device_index: int = 0, width: int = 640, height: int = 400, prefer_v4l2_ctl: bool = True):
self.device_index = device_index
self.width = width
self.height = height
self._opened = False
# Mock 二维码内容
self._mock_qr_code = "MOCK_QR_SN_12345"
def open(self) -> bool:
"""打开摄像头(Mock"""
logger.info(f"[Mock] 打开摄像头 /dev/video{self.device_index}")
self._opened = True
return True
def close(self):
"""关闭摄像头"""
self._opened = False
logger.info("[Mock] 关闭摄像头")
def read_frame(self, timeout: float = 2.0) -> Optional[np.ndarray]:
"""读取一帧(Mock"""
if not self._opened:
return None
# 返回空白图像
return np.zeros((self.height, self.width, 3), dtype=np.uint8)
def detect_qr(self, frame: np.ndarray) -> Optional[str]:
"""从图像帧中检测二维码(Mock"""
# 始终返回模拟二维码
logger.debug(f"[Mock] 检测到二维码: {self._mock_qr_code}")
return self._mock_qr_code
def scan_once(self) -> Optional[str]:
"""扫描一次(Mock"""
logger.debug("[Mock] 扫描二维码...")
return self._mock_qr_code
def scan_with_retry(self, max_attempts: int = 5, interval: float = 0.5) -> Optional[str]:
"""多次扫描(Mock"""
logger.info(f"[Mock] scan_with_retry - 直接返回模拟二维码")
return self._mock_qr_code
def get_preview_frame(self) -> Optional[np.ndarray]:
"""获取预览帧(Mock"""
return self.read_frame()
def set_mock_qr_code(self, code: str):
"""设置模拟二维码内容(仅 Mock 模式)"""
self._mock_qr_code = code
logger.info(f"[Mock] 设置模拟二维码: {code}")
def __enter__(self):
self.open()
return self
def __exit__(self, *args):
self.close()
class MockNav2Status(Enum):
"""Mock Nav2 状态"""
IDLE = "idle"
NAVIGATING = "navigating"
SUCCEEDED = "succeeded"
FAILED = "failed"
CANCELLED = "cancelled"
class MockNav2Navigator:
"""Mock Nav2 导航器"""
def __init__(self):
self.status = MockNav2Status.IDLE
self._current_pose = [0.0, 0.0, 0.0] # [x, y, yaw]
def _get_current_pose(self) -> List[float]:
"""获取当前位置(Mock"""
return self._current_pose
def navigate_to_pose(self, x: float, y: float, yaw: float = None,
timeout_sec: float = 120.0,
blocking: bool = True) -> bool:
"""导航到目标坐标(Mock"""
logger.info(f"[Mock] 导航到: ({x:.3f}, {y:.3f}), yaw={yaw:.1f}°")
self.status = MockNav2Status.NAVIGATING
# 模拟导航成功
self._current_pose = [x, y, yaw if yaw is not None else 0.0]
self.status = MockNav2Status.SUCCEEDED
return True
def navigate_through_poses(self, poses: List[Tuple[float, float, float]],
timeout_per_pose: float = 120.0,
blocking: bool = True) -> bool:
"""通过多个点位导航(Mock"""
logger.info(f"[Mock] 通过 {len(poses)} 个点位导航")
for i, (x, y, yaw) in enumerate(poses):
logger.debug(f"[Mock] 点位 {i+1}: ({x}, {y}), yaw={yaw}")
if poses:
self._current_pose = list(poses[-1])
self.status = MockNav2Status.SUCCEEDED
return True
def stop(self):
"""停止导航(Mock"""
logger.info("[Mock] 停止导航")
self.status = MockNav2Status.IDLE
def get_status(self) -> dict:
"""获取导航状态(Mock"""
return {
"status": self.status.value,
"position": self._current_pose
}
def get_current_position(self) -> List[float]:
"""获取当前位置(Mock"""
return self._current_pose
# 导出兼容别名
Nav2Status = MockNav2Status