机械臂识别二维码
This commit is contained in:
+37
-49
@@ -1379,56 +1379,44 @@ def api_qr_config_scan(qr_id):
|
||||
try:
|
||||
import cv2
|
||||
import numpy as np
|
||||
# 从机械臂摄像头拉取一帧 JPEG
|
||||
r = requests.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=5)
|
||||
if r.status_code != 200:
|
||||
# 从机械臂摄像头 snapshot 端点拉取一帧 JPEG
|
||||
r = requests.get(ARM_CAMERA_CONFIG.get("snapshot_url", ARM_CAMERA_CONFIG["url"]), timeout=8)
|
||||
if r.status_code != 200 or not r.content:
|
||||
return jsonify({"ok": False, "error": "无法连接机械臂摄像头"}), 400
|
||||
data = b""
|
||||
for chunk in r.iter_content(chunk_size=4096):
|
||||
data += chunk
|
||||
s = data.find(b"\xff\xd8")
|
||||
e = data.find(b"\xff\xd9", s + 2) if s >= 0 else -1
|
||||
if s >= 0 and e > s:
|
||||
r.close()
|
||||
jpg_bytes = data[s:e+2]
|
||||
# 解码为 numpy 数组并检测二维码
|
||||
nparr = np.frombuffer(jpg_bytes, np.uint8)
|
||||
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
||||
if frame is None:
|
||||
return jsonify({"ok": False, "error": "图像解码失败"}), 400
|
||||
# 使用 OpenCV QRCodeDetector 检测
|
||||
detector = cv2.QRCodeDetector()
|
||||
result, _, _ = detector.detectAndDecode(frame)
|
||||
if result and len(result.strip()) > 0:
|
||||
result = result.strip()
|
||||
# 保存到配置项
|
||||
for entry in gs.qr_config:
|
||||
if entry["id"] == qr_id:
|
||||
entry["qr_value"] = result
|
||||
# 尝试匹配机型
|
||||
matched_model = None
|
||||
for model in gs.models_config:
|
||||
prefix = model.get("serial_prefix", "")
|
||||
if prefix and result.startswith(prefix):
|
||||
matched_model = model
|
||||
break
|
||||
if matched_model:
|
||||
entry["model_id"] = matched_model["id"]
|
||||
save_json("qr_config.json", gs.qr_config)
|
||||
return jsonify({
|
||||
"ok": True,
|
||||
"qr_value": result,
|
||||
"model_id": entry.get("model_id", ""),
|
||||
"model_name": matched_model["name"] if matched_model else ""
|
||||
})
|
||||
return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404
|
||||
else:
|
||||
return jsonify({"ok": False, "error": "未检测到二维码"})
|
||||
if len(data) > 1024 * 1024:
|
||||
r.close()
|
||||
return jsonify({"ok": False, "error": "摄像头数据流异常"}), 400
|
||||
r.close()
|
||||
return jsonify({"ok": False, "error": "未收到完整图像帧"}), 400
|
||||
jpg_bytes = r.content
|
||||
# 解码为 numpy 数组并检测二维码
|
||||
nparr = np.frombuffer(jpg_bytes, np.uint8)
|
||||
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
||||
if frame is None:
|
||||
return jsonify({"ok": False, "error": "图像解码失败"}), 400
|
||||
# 使用 OpenCV QRCodeDetector 检测
|
||||
detector = cv2.QRCodeDetector()
|
||||
result, _, _ = detector.detectAndDecode(frame)
|
||||
if result and len(result.strip()) > 0:
|
||||
result = result.strip()
|
||||
# 保存到配置项
|
||||
for entry in gs.qr_config:
|
||||
if entry["id"] == qr_id:
|
||||
entry["qr_value"] = result
|
||||
# 尝试匹配机型
|
||||
matched_model = None
|
||||
for model in gs.models_config:
|
||||
prefix = model.get("serial_prefix", "")
|
||||
if prefix and result.startswith(prefix):
|
||||
matched_model = model
|
||||
break
|
||||
if matched_model:
|
||||
entry["model_id"] = matched_model["id"]
|
||||
save_json("qr_config.json", gs.qr_config)
|
||||
return jsonify({
|
||||
"ok": True,
|
||||
"qr_value": result,
|
||||
"model_id": entry.get("model_id", ""),
|
||||
"model_name": matched_model["name"] if matched_model else ""
|
||||
})
|
||||
return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404
|
||||
else:
|
||||
return jsonify({"ok": False, "error": "未检测到二维码"})
|
||||
except Exception as ex:
|
||||
logger.error(f"QR 扫描机械臂摄像头失败: {ex}")
|
||||
return jsonify({"ok": False, "error": f"扫描失败: {str(ex)}"}), 400
|
||||
|
||||
Reference in New Issue
Block a user