Files
smart-inspection/agv_app/utils/qr_scanner.py
T
FaulknerWu 1429442dbd Rename customs-tablet-frontend to public-frontend and add new features
- Rename customs-tablet-frontend/ to public-frontend/ for broader scope
- Add new pages: customs, inspection with camera integration
- Add new services: apiClient.ts, backendApi.ts, normalizers.ts
- Add CameraFrame component for real-time video streaming
- Add scan_fixer module with clock_publisher and timestamp fix utilities
- Update startup scripts to support new frontend structure
- Update arm_server configuration and service files

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-22 10:18:20 +08:00

298 lines
10 KiB
Python

"""
二维码识别模块 - 使用 AGV 摄像头识别二维码获取 serialNumber
优先通过 v4l2-ctl 读取 MJPG 帧,避开部分 Jetson/arm64 环境中 OpenCV
直接读取 MJPG 花屏或解码失败的问题;v4l2-ctl 不可用时回退到 OpenCV。
"""
import cv2
import time
import logging
import os
import subprocess
import numpy as np
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout
from typing import Optional
logger = logging.getLogger(__name__)

# pyzbar is an optional dependency: when it (or the zbar shared library it
# wraps) cannot be imported, QR decoding relies on OpenCV's QRCodeDetector only.
try:
    from pyzbar.pyzbar import decode as qr_decode
    PYZBAR_AVAILABLE = True
except ImportError:
    # Keep the name bound so references to it never raise NameError;
    # callers must still check PYZBAR_AVAILABLE before calling it.
    qr_decode = None
    PYZBAR_AVAILABLE = False
    logger.warning("pyzbar 未安装,尝试用 OpenCV 内置 QRCodeDetector")
class QRScanner:
    """QR-code scanner that reads frames from a V4L2 camera.

    Frames are fetched through the ``v4l2-ctl`` command-line tool (MJPG) when
    it is usable, and through ``cv2.VideoCapture`` otherwise. Decoding tries
    OpenCV's ``QRCodeDetector`` first and pyzbar second (when installed).

    The class is a context manager: ``with QRScanner() as s: s.scan_once()``.
    """

    V4L2_CTL = "/usr/bin/v4l2-ctl"
    DEFAULT_WIDTH = 640
    DEFAULT_HEIGHT = 400
    V4L2_STREAM_COUNT = 3       # frames requested from v4l2-ctl per read
    V4L2_READ_TIMEOUT = 5.0     # seconds allowed for one v4l2-ctl invocation
    OPENCV_READ_TIMEOUT = 2.0   # seconds allowed for one VideoCapture.read()

    def __init__(
        self,
        device_index: int = 0,
        width: int = DEFAULT_WIDTH,
        height: int = DEFAULT_HEIGHT,
        prefer_v4l2_ctl: bool = True,
    ):
        """Store the camera settings; no device is opened until ``open()``.

        Args:
            device_index: N in ``/dev/videoN`` (also the OpenCV camera index).
            width: Frame width requested from v4l2-ctl.
            height: Frame height requested from v4l2-ctl.
            prefer_v4l2_ctl: Try v4l2-ctl before falling back to OpenCV.
        """
        self.device_index = device_index
        self.width = width
        self.height = height
        self.prefer_v4l2_ctl = prefer_v4l2_ctl
        self._cap: Optional[cv2.VideoCapture] = None
        self._v4l2_ctl_ready = False
        self._qr_detector = cv2.QRCodeDetector()

    def _device_path(self) -> str:
        """Return the device node path for ``device_index``."""
        return f"/dev/video{self.device_index}"

    def _build_v4l2_cmd(self, stream_count: int = V4L2_STREAM_COUNT) -> list:
        """Build the v4l2-ctl argv that streams MJPG frames to stdout."""
        return [
            self.V4L2_CTL,
            "-d", self._device_path(),
            "--set-fmt-video",
            f"width={self.width},height={self.height},pixelformat=MJPG",
            "--stream-mmap",
            "--stream-to=-",
            f"--stream-count={stream_count}",
        ]

    def _check_v4l2_ctl(self) -> bool:
        """Return True when the v4l2-ctl path should be used.

        Requires ``prefer_v4l2_ctl``, an existing device node, and a
        v4l2-ctl binary that can be launched. The exit status of
        ``--version`` is deliberately not inspected.
        """
        if not self.prefer_v4l2_ctl:
            return False
        if not os.path.exists(self._device_path()):
            logger.warning(f"摄像头设备 {self._device_path()} 不存在,回退到 OpenCV")
            return False
        try:
            subprocess.run(
                [self.V4L2_CTL, "--version"],
                capture_output=True,
                timeout=2,
                check=False,
            )
        except (OSError, subprocess.TimeoutExpired):
            # OSError covers FileNotFoundError as well as PermissionError and
            # other launch failures, so every "cannot run the tool" case
            # falls back to OpenCV instead of propagating out of open().
            logger.warning(f"v4l2-ctl 不可用: {self.V4L2_CTL},回退到 OpenCV")
            return False
        return True

    def open(self) -> bool:
        """Open the camera; return True when a read path is available."""
        self.close()
        self._v4l2_ctl_ready = self._check_v4l2_ctl()
        if self._v4l2_ctl_ready:
            # v4l2-ctl is spawned per read, so nothing is held open here.
            logger.info(
                f"摄像头 {self._device_path()} 使用 v4l2-ctl MJPG 读取,"
                f"分辨率 {self.width}x{self.height}"
            )
            return True
        return self._open_opencv_capture()

    def _open_opencv_capture(self) -> bool:
        """Create ``self._cap`` (V4L2 backend first, then the default one)."""
        try:
            self._cap = cv2.VideoCapture(self.device_index, cv2.CAP_V4L2)
            if not self._cap.isOpened():
                self._cap = cv2.VideoCapture(self.device_index)
            if not self._cap.isOpened():
                logger.error(f"无法打开摄像头 {self.device_index}")
                return False
            self._cap.set(cv2.CAP_PROP_CONVERT_RGB, 1)
            w = int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            logger.info(f"摄像头 {self.device_index} 使用 OpenCV 读取,分辨率 {w}x{h}")
            return True
        except Exception as e:
            logger.error(f"摄像头打开失败: {e}")
            return False

    def _release_capture(self) -> None:
        """Release and forget the OpenCV capture, if there is one."""
        if self._cap is not None:
            self._cap.release()
            self._cap = None

    def close(self):
        """Release the OpenCV capture and clear the v4l2-ctl readiness flag."""
        self._release_capture()
        self._v4l2_ctl_ready = False

    @staticmethod
    def _extract_first_jpeg(data: bytes) -> Optional[bytes]:
        """Return the first complete JPEG found in a v4l2-ctl output stream.

        The frame is the span from the first SOI marker (FF D8) up to and
        including the next EOI marker (FF D9); None when either is missing.
        """
        soi = data.find(b"\xff\xd8")
        if soi == -1:
            return None
        eoi = data.find(b"\xff\xd9", soi + 2)
        if eoi == -1:
            return None
        return data[soi:eoi + 2]

    def _read_frame_with_v4l2_ctl(self) -> Optional[np.ndarray]:
        """Grab one frame by running v4l2-ctl and decoding its first JPEG.

        Returns None on timeout, on any error, when no plausible JPEG is
        found, or when the decoded image is almost entirely dark or bright.
        """
        try:
            proc = subprocess.run(
                self._build_v4l2_cmd(),
                capture_output=True,
                timeout=self.V4L2_READ_TIMEOUT,
                check=False,
            )
            jpeg_data = self._extract_first_jpeg(proc.stdout)
            if jpeg_data is None or len(jpeg_data) < 100:
                return None
            frame = cv2.imdecode(
                np.frombuffer(jpeg_data, dtype=np.uint8),
                cv2.IMREAD_COLOR,
            )
            if frame is None:
                return None
            brightness = frame.mean()
            if brightness < 3 or brightness > 250:
                return None
            return frame
        except subprocess.TimeoutExpired:
            logger.warning("v4l2-ctl 读取超时")
            return None
        except Exception as e:
            logger.warning(f"v4l2-ctl 读取异常: {e}")
            return None

    def _fix_frame(self, frame: np.ndarray) -> Optional[np.ndarray]:
        """Repair green-screen / wrong-format frames.

        Returns a BGR frame, or None when the frame is unusable (missing,
        smaller than 10x10, failed repair, or all black).
        """
        if frame is None:
            return None
        h, w = frame.shape[:2]
        if h < 10 or w < 10:
            return None

        if frame.ndim == 2:
            # A 2-D array is a single-channel image, not YUYV; the YUYV
            # conversion rejects it, so promote it to BGR instead.
            frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
        elif frame.shape[2] == 2:
            # Case 1: raw 2-channel YUYV -> convert to BGR manually.
            frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_YUYV)
            logger.debug("YUYV 2ch → BGR 转换")
            return frame

        # Case 2: 3+ channels whose content is really YUYV. The signature is
        # a "lime" green screen: strong G channel, nearly empty B and R.
        if frame.ndim == 3 and frame.shape[2] >= 3:
            blue_mean = frame[:, :, 0].mean()
            green_mean = frame[:, :, 1].mean()
            red_mean = frame[:, :, 2].mean()
            if green_mean > 80 and blue_mean < 30 and red_mean < 30:
                logger.debug(f"检测到绿屏 (G={green_mean:.0f}, B={blue_mean:.0f}, R={red_mean:.0f}),尝试修复")
                try:
                    # Reinterpret the buffer as YUYV: 2 bytes per pixel, so
                    # only the first w*h*2 bytes of the buffer are used.
                    yuyv_len = w * h * 2
                    yuyv_img = np.frombuffer(
                        frame.tobytes()[:yuyv_len], dtype=np.uint8
                    ).reshape(h, w, 2)
                    repaired = cv2.cvtColor(yuyv_img, cv2.COLOR_YUV2BGR_YUYV)
                    logger.debug("绿屏修复完成")
                    return repaired
                except Exception as e:
                    logger.warning(f"绿屏修复失败: {e}")
                    return None

        # Case 3: all-black frame.
        if frame.mean() < 5:
            logger.warning("全黑帧,丢弃")
            return None

        # Ordinary BGR frame.
        return frame

    def _timed_capture_read(self, timeout: float):
        """Run ``self._cap.read()`` in a worker thread, bounded by ``timeout``.

        Returns the ``(ret, frame)`` tuple from ``read()``. Raises the
        concurrent.futures timeout error when the read does not finish in
        time; the worker is not waited for, so this call itself never blocks
        longer than ``timeout``.
        """
        pool = ThreadPoolExecutor(max_workers=1)
        try:
            return pool.submit(self._cap.read).result(timeout=timeout)
        finally:
            pool.shutdown(wait=False)

    def _read_frame_with_opencv(self, timeout: float) -> Optional[np.ndarray]:
        """Read one frame through OpenCV, rebuilding the capture once on timeout.

        Both the first read and the retry are bounded by ``timeout``.
        Returns the repaired frame, or None on any failure.
        """
        if not self._cap or not self._cap.isOpened():
            return None
        try:
            try:
                ret, frame = self._timed_capture_read(timeout)
            except FuturesTimeout:
                logger.warning(f"摄像头 read_frame 超时 ({timeout}s),尝试重建 _cap")
                # Rebuild only the OpenCV capture: going through open() would
                # skip creating _cap whenever v4l2-ctl is available.
                self._release_capture()
                self._open_opencv_capture()
                if not self._cap or not self._cap.isOpened():
                    return None
                # Retry once, again with the timeout so a hung device cannot
                # block the caller indefinitely.
                ret, frame = self._timed_capture_read(timeout)
            if not ret or frame is None:
                return None
            return self._fix_frame(frame)
        except FuturesTimeout:
            logger.warning(f"摄像头重建后 read_frame 仍超时 ({timeout}s)")
            return None
        except Exception as e:
            logger.error(f"read_frame 异常: {e}")
            return None

    def read_frame(self, timeout: float = OPENCV_READ_TIMEOUT) -> Optional[np.ndarray]:
        """Read one frame, preferring v4l2-ctl and falling back to OpenCV.

        Args:
            timeout: Seconds allowed for each OpenCV read attempt.

        Returns:
            A BGR frame, or None when no valid frame could be read.
        """
        if self._v4l2_ctl_ready:
            frame = self._read_frame_with_v4l2_ctl()
            if frame is not None:
                return frame
            logger.debug("v4l2-ctl 未读到有效帧,尝试 OpenCV 兜底")
        # NOTE(review): once the OpenCV capture is open it keeps the device
        # open; later v4l2-ctl reads may then fail as "device busy" - confirm
        # on the target hardware.
        if not self._cap or not self._cap.isOpened():
            self._open_opencv_capture()
        return self._read_frame_with_opencv(timeout)

    def detect_qr(self, frame: np.ndarray) -> Optional[str]:
        """Decode a QR code from ``frame``.

        Returns the stripped payload, or None when nothing non-empty was
        decoded by either OpenCV or pyzbar.
        """
        if frame is None:
            return None
        if frame.ndim == 2:
            frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)

        try:
            data, _points, _straight = self._qr_detector.detectAndDecode(frame)
            # Strip before testing so a whitespace-only payload counts as
            # "nothing found" and the pyzbar fallback still runs.
            text = data.strip() if data else ""
            if text:
                return text
        except Exception as e:
            logger.debug(f"OpenCV QR 检测失败: {e}")

        if PYZBAR_AVAILABLE:
            try:
                results = qr_decode(frame)
            except Exception as e:
                logger.debug(f"pyzbar 检测失败: {e}")
                results = []
            for res in results:
                try:
                    text = res.data.decode("utf-8").strip()
                except UnicodeDecodeError as e:
                    # One undecodable symbol must not hide the others.
                    logger.debug(f"pyzbar 检测失败: {e}")
                    continue
                if text:
                    return text
        return None

    def scan_once(self) -> Optional[str]:
        """Read a single frame and try to decode a QR code from it."""
        frame = self.read_frame()
        return self.detect_qr(frame)

    def scan_with_retry(self, max_attempts: int = 5, interval: float = 0.5) -> Optional[str]:
        """Scan repeatedly until a code is found or attempts run out.

        Args:
            max_attempts: Maximum number of ``scan_once`` calls.
            interval: Seconds to wait between two consecutive attempts.

        Returns:
            The first non-empty decoded payload, or None.
        """
        for attempt in range(1, max_attempts + 1):
            logger.debug(f"QR 扫描尝试 {attempt}/{max_attempts}")
            result = self.scan_once()
            if result:
                return result
            # Wait only between attempts, not after the final one.
            if attempt < max_attempts:
                time.sleep(interval)
        return None

    def get_preview_frame(self) -> Optional[np.ndarray]:
        """Return a frame for on-screen preview (same as ``read_frame()``)."""
        return self.read_frame()

    def __enter__(self):
        """Open the camera and return the scanner itself."""
        self.open()
        return self

    def __exit__(self, *args):
        """Close the camera when the ``with`` block ends."""
        self.close()