""" 二维码识别模块 - 使用 OpenCV 识别二维码获取 serialNumber """ import cv2 import time import logging import numpy as np from typing import Optional, Tuple logger = logging.getLogger(__name__) # 尝试导入二维码识别库 try: from pyzbar.pyzbar import decode as qr_decode PYZBAR_AVAILABLE = True except ImportError: PYZBAR_AVAILABLE = False logger.warning("pyzbar 未安装,尝试用 OpenCV 内置 QRCodeDetector") class QRScanner: """二维码扫描器""" def __init__(self, device_index: int = 0): self.device_index = device_index self._cap: Optional[cv2.VideoCapture] = None self._qr_detector = cv2.QRCodeDetector() # OpenCV 内置二维码检测器 def open(self) -> bool: """打开摄像头""" try: # 强制 V4L2 后端 self._cap = cv2.VideoCapture(self.device_index, cv2.CAP_V4L2) if not self._cap.isOpened(): self._cap = cv2.VideoCapture(self.device_index) if not self._cap.isOpened(): logger.error(f"无法打开摄像头 {self.device_index}") return False # 确保 OpenCV 做 BGR 转换(部分 V4L2 后端默认不做 YUYV→BGR 转换) self._cap.set(cv2.CAP_PROP_CONVERT_RGB, 1) # 设置分辨率(使用默认分辨率,不强制 MJPG) w = int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH)) h = int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) logger.info(f"摄像头 {self.device_index} 已打开,分辨率 {w}x{h}") return True except Exception as e: logger.error(f"摄像头打开失败: {e}") return False def close(self): if self._cap: self._cap.release() self._cap = None def _fix_frame(self, frame: np.ndarray) -> Optional[np.ndarray]: """修复绿屏/格式错误帧,返回修复后的 BGR 帧或 None""" if frame is None: return None h, w = frame.shape[:2] if h < 10 or w < 10: return None ndim = len(frame.shape) # 情况 1: 2 通道原始 YUYV → 手动转换 BGR if ndim == 2: frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_YUYV) logger.debug("YUYV 2ch → BGR 转换") return frame # 情况 2: 3 通道但实际帧数据显示为 YUYV(绿屏特征:G 通道全满,B/R 近空) if ndim == 3: g_mean = frame[:, :, 1].mean() if g_mean > 80 and frame[:, :, 0].mean() < 30 and frame[:, :, 2].mean() < 30: # 典型的"Lime"绿屏 — 当做 YUYV 原始数据解码 logger.debug(f"检测到绿屏 (G={g_mean:.0f}, B={frame[:,:,0].mean():.0f}, R={frame[:,:,2].mean():.0f}),尝试修复") try: # 把内存当做 YUYV 数据重新解析 raw_bytes = frame.tobytes() # 3ch w*h 的数据量 = w*h*3 字节 # YUYV 每像素 2 字节,所以一幅 YUYV 图像的总字节 = w*h*2 # 我们只需要取前 w*h*2 字节作为 YUYV 数据 yuyv_len = w * h * 2 if len(raw_bytes) >= yuyv_len: yuyv_img = np.frombuffer(raw_bytes[:yuyv_len], dtype=np.uint8).reshape(h, w, 2) frame = cv2.cvtColor(yuyv_img, cv2.COLOR_YUV2BGR_YUYV) logger.debug("绿屏修复完成") return frame except Exception as e: logger.warning(f"绿屏修复失败: {e}") return None # 情况 3: 全黑帧 if frame.mean() < 5: logger.warning("全黑帧,丢弃") return None # 正常 BGR 帧 return frame def read_frame(self, timeout: float = 2.0) -> Optional[np.ndarray]: """读取一帧(带超时保护,避免 V4L2 select() 永久阻塞)""" if not self._cap or not self._cap.isOpened(): return None from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout pool = ThreadPoolExecutor(max_workers=1) try: fut = pool.submit(self._cap.read) ret, frame = fut.result(timeout=timeout) if not ret or frame is None: return None return self._fix_frame(frame) except FuturesTimeout: logger.warning(f"摄像头 read_frame 超时 ({timeout}s),尝试重建 _cap") self.close() self.open() # 重建后重试一次 if self._cap and self._cap.isOpened(): ret, frame = self._cap.read() if ret and frame is not None: return self._fix_frame(frame) return None except Exception as e: logger.error(f"read_frame 异常: {e}") return None finally: pool.shutdown(wait=False) def detect_qr(self, frame: np.ndarray) -> Optional[str]: """从图像帧中检测二维码""" if frame is None: return None try: # OpenCV 内置二维码检测 data, vertices, _ = self._qr_detector.detectAndDecode(frame) if data and len(data) > 0: return data.strip() except Exception as e: logger.debug(f"二维码检测失败: {e}") return None def scan_once(self) -> Optional[str]: """扫描一次(读取一帧并检测)""" frame = self.read_frame() return self.detect_qr(frame) def scan_with_retry(self, max_attempts: int = 5, interval: float = 0.5) -> Optional[str]: """多次扫描直到成功或达到最大次数""" for i in range(max_attempts): result = self.scan_once() if result: return result time.sleep(interval) return None def get_preview_frame(self) -> Optional[np.ndarray]: """获取预览帧(用于界面显示)""" return self.read_frame() def __enter__(self): self.open() return self def __exit__(self, *args): self.close()