""" 二维码识别模块 - 使用 AGV 摄像头识别二维码获取 serialNumber 优先通过 v4l2-ctl 读取 MJPG 帧,避开部分 Jetson/arm64 环境中 OpenCV 直接读取 MJPG 花屏或解码失败的问题;v4l2-ctl 不可用时回退到 OpenCV。 """ import cv2 import time import logging import os import subprocess import numpy as np from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout from typing import Optional logger = logging.getLogger(__name__) try: from pyzbar.pyzbar import decode as qr_decode PYZBAR_AVAILABLE = True except ImportError: PYZBAR_AVAILABLE = False logger.warning("pyzbar 未安装,尝试用 OpenCV 内置 QRCodeDetector") class QRScanner: """二维码扫描器。""" V4L2_CTL = "/usr/bin/v4l2-ctl" DEFAULT_WIDTH = 640 DEFAULT_HEIGHT = 400 V4L2_STREAM_COUNT = 3 V4L2_READ_TIMEOUT = 5.0 OPENCV_READ_TIMEOUT = 2.0 def __init__( self, device_index: int = 0, width: int = DEFAULT_WIDTH, height: int = DEFAULT_HEIGHT, prefer_v4l2_ctl: bool = True, ): self.device_index = device_index self.width = width self.height = height self.prefer_v4l2_ctl = prefer_v4l2_ctl self._cap: Optional[cv2.VideoCapture] = None self._v4l2_ctl_ready = False self._qr_detector = cv2.QRCodeDetector() def _device_path(self) -> str: return f"/dev/video{self.device_index}" def _build_v4l2_cmd(self, stream_count: int = V4L2_STREAM_COUNT) -> list: return [ self.V4L2_CTL, "-d", self._device_path(), "--set-fmt-video", f"width={self.width},height={self.height},pixelformat=MJPG", "--stream-mmap", "--stream-to=-", f"--stream-count={stream_count}", ] def _check_v4l2_ctl(self) -> bool: if not self.prefer_v4l2_ctl: return False if not os.path.exists(self._device_path()): logger.warning(f"摄像头设备 {self._device_path()} 不存在,回退到 OpenCV") return False try: subprocess.run( [self.V4L2_CTL, "--version"], capture_output=True, timeout=2, check=False, ) return True except (FileNotFoundError, subprocess.TimeoutExpired): logger.warning(f"v4l2-ctl 不可用: {self.V4L2_CTL},回退到 OpenCV") return False def open(self) -> bool: """打开摄像头""" self.close() self._v4l2_ctl_ready = self._check_v4l2_ctl() if self._v4l2_ctl_ready: logger.info( f"摄像头 {self._device_path()} 使用 v4l2-ctl MJPG 读取," f"分辨率 {self.width}x{self.height}" ) return True return self._open_opencv_capture() def _open_opencv_capture(self) -> bool: try: self._cap = cv2.VideoCapture(self.device_index, cv2.CAP_V4L2) if not self._cap.isOpened(): self._cap = cv2.VideoCapture(self.device_index) if not self._cap.isOpened(): logger.error(f"无法打开摄像头 {self.device_index}") return False self._cap.set(cv2.CAP_PROP_CONVERT_RGB, 1) w = int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH)) h = int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) logger.info(f"摄像头 {self.device_index} 使用 OpenCV 读取,分辨率 {w}x{h}") return True except Exception as e: logger.error(f"摄像头打开失败: {e}") return False def close(self): if self._cap: self._cap.release() self._cap = None self._v4l2_ctl_ready = False @staticmethod def _extract_first_jpeg(data: bytes) -> Optional[bytes]: """从 v4l2-ctl 输出流中提取第一帧完整 JPEG。""" soi = data.find(b"\xff\xd8") if soi == -1: return None eoi = data.find(b"\xff\xd9", soi + 2) if eoi == -1 or eoi <= soi: return None return data[soi:eoi + 2] def _read_frame_with_v4l2_ctl(self) -> Optional[np.ndarray]: try: proc = subprocess.run( self._build_v4l2_cmd(), capture_output=True, timeout=self.V4L2_READ_TIMEOUT, check=False, ) jpeg_data = self._extract_first_jpeg(proc.stdout) if jpeg_data is None or len(jpeg_data) < 100: return None frame = cv2.imdecode( np.frombuffer(jpeg_data, dtype=np.uint8), cv2.IMREAD_COLOR, ) if frame is None: return None if frame.mean() < 3 or frame.mean() > 250: return None return frame except subprocess.TimeoutExpired: logger.warning("v4l2-ctl 读取超时") return None except Exception as e: logger.warning(f"v4l2-ctl 读取异常: {e}") return None def _fix_frame(self, frame: np.ndarray) -> Optional[np.ndarray]: """修复绿屏/格式错误帧,返回修复后的 BGR 帧或 None""" if frame is None: return None h, w = frame.shape[:2] if h < 10 or w < 10: return None ndim = len(frame.shape) # 情况 1: 2 通道原始 YUYV → 手动转换 BGR if ndim == 2: frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_YUYV) logger.debug("YUYV 2ch → BGR 转换") return frame # 情况 2: 3 通道但实际帧数据显示为 YUYV(绿屏特征:G 通道全满,B/R 近空) if ndim == 3: g_mean = frame[:, :, 1].mean() if g_mean > 80 and frame[:, :, 0].mean() < 30 and frame[:, :, 2].mean() < 30: # 典型的"Lime"绿屏 — 当做 YUYV 原始数据解码 logger.debug(f"检测到绿屏 (G={g_mean:.0f}, B={frame[:,:,0].mean():.0f}, R={frame[:,:,2].mean():.0f}),尝试修复") try: # 把内存当做 YUYV 数据重新解析 raw_bytes = frame.tobytes() # 3ch w*h 的数据量 = w*h*3 字节 # YUYV 每像素 2 字节,所以一幅 YUYV 图像的总字节 = w*h*2 # 我们只需要取前 w*h*2 字节作为 YUYV 数据 yuyv_len = w * h * 2 if len(raw_bytes) >= yuyv_len: yuyv_img = np.frombuffer(raw_bytes[:yuyv_len], dtype=np.uint8).reshape(h, w, 2) frame = cv2.cvtColor(yuyv_img, cv2.COLOR_YUV2BGR_YUYV) logger.debug("绿屏修复完成") return frame except Exception as e: logger.warning(f"绿屏修复失败: {e}") return None # 情况 3: 全黑帧 if frame.mean() < 5: logger.warning("全黑帧,丢弃") return None # 正常 BGR 帧 return frame def _read_frame_with_opencv(self, timeout: float) -> Optional[np.ndarray]: if not self._cap or not self._cap.isOpened(): return None pool = ThreadPoolExecutor(max_workers=1) try: fut = pool.submit(self._cap.read) ret, frame = fut.result(timeout=timeout) if not ret or frame is None: return None return self._fix_frame(frame) except FuturesTimeout: logger.warning(f"摄像头 read_frame 超时 ({timeout}s),尝试重建 _cap") self.close() self.open() # 重建后重试一次 if self._cap and self._cap.isOpened(): ret, frame = self._cap.read() if ret and frame is not None: return self._fix_frame(frame) return None except Exception as e: logger.error(f"read_frame 异常: {e}") return None finally: pool.shutdown(wait=False) def read_frame(self, timeout: float = OPENCV_READ_TIMEOUT) -> Optional[np.ndarray]: """读取一帧。""" if self._v4l2_ctl_ready: frame = self._read_frame_with_v4l2_ctl() if frame is not None: return frame logger.debug("v4l2-ctl 未读到有效帧,尝试 OpenCV 兜底") if not self._cap or not self._cap.isOpened(): self._open_opencv_capture() return self._read_frame_with_opencv(timeout) def detect_qr(self, frame: np.ndarray) -> Optional[str]: """从图像帧中检测二维码""" if frame is None: return None if len(frame.shape) == 2: frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR) try: data, vertices, _ = self._qr_detector.detectAndDecode(frame) if data and len(data) > 0: return data.strip() except Exception as e: logger.debug(f"OpenCV QR 检测失败: {e}") if PYZBAR_AVAILABLE: try: results = qr_decode(frame) for res in results: data = res.data.decode("utf-8").strip() if data: return data except Exception as e: logger.debug(f"pyzbar 检测失败: {e}") return None def scan_once(self) -> Optional[str]: """扫描一次(读取一帧并检测)""" frame = self.read_frame() return self.detect_qr(frame) def scan_with_retry(self, max_attempts: int = 5, interval: float = 0.5) -> Optional[str]: """多次扫描直到成功或达到最大次数""" for i in range(max_attempts): logger.debug(f"QR 扫描尝试 {i + 1}/{max_attempts}") result = self.scan_once() if result: return result time.sleep(interval) return None def get_preview_frame(self) -> Optional[np.ndarray]: """获取预览帧(用于界面显示)""" return self.read_frame() def __enter__(self): self.open() return self def __exit__(self, *args): self.close()