diff --git a/agv_app/app.py b/agv_app/app.py index d567c60..c834760 100644 --- a/agv_app/app.py +++ b/agv_app/app.py @@ -1379,56 +1379,44 @@ def api_qr_config_scan(qr_id): try: import cv2 import numpy as np - # 从机械臂摄像头拉取一帧 JPEG - r = requests.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=5) - if r.status_code != 200: + # 从机械臂摄像头 snapshot 端点拉取一帧 JPEG + r = requests.get(ARM_CAMERA_CONFIG.get("snapshot_url", ARM_CAMERA_CONFIG["url"]), timeout=8) + if r.status_code != 200 or not r.content: return jsonify({"ok": False, "error": "无法连接机械臂摄像头"}), 400 - data = b"" - for chunk in r.iter_content(chunk_size=4096): - data += chunk - s = data.find(b"\xff\xd8") - e = data.find(b"\xff\xd9", s + 2) if s >= 0 else -1 - if s >= 0 and e > s: - r.close() - jpg_bytes = data[s:e+2] - # 解码为 numpy 数组并检测二维码 - nparr = np.frombuffer(jpg_bytes, np.uint8) - frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) - if frame is None: - return jsonify({"ok": False, "error": "图像解码失败"}), 400 - # 使用 OpenCV QRCodeDetector 检测 - detector = cv2.QRCodeDetector() - result, _, _ = detector.detectAndDecode(frame) - if result and len(result.strip()) > 0: - result = result.strip() - # 保存到配置项 - for entry in gs.qr_config: - if entry["id"] == qr_id: - entry["qr_value"] = result - # 尝试匹配机型 - matched_model = None - for model in gs.models_config: - prefix = model.get("serial_prefix", "") - if prefix and result.startswith(prefix): - matched_model = model - break - if matched_model: - entry["model_id"] = matched_model["id"] - save_json("qr_config.json", gs.qr_config) - return jsonify({ - "ok": True, - "qr_value": result, - "model_id": entry.get("model_id", ""), - "model_name": matched_model["name"] if matched_model else "" - }) - return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404 - else: - return jsonify({"ok": False, "error": "未检测到二维码"}) - if len(data) > 1024 * 1024: - r.close() - return jsonify({"ok": False, "error": "摄像头数据流异常"}), 400 - r.close() - return jsonify({"ok": False, "error": "未收到完整图像帧"}), 400 + jpg_bytes = r.content + # 解码为 numpy 数组并检测二维码 + nparr = np.frombuffer(jpg_bytes, np.uint8) + frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) + if frame is None: + return jsonify({"ok": False, "error": "图像解码失败"}), 400 + # 使用 OpenCV QRCodeDetector 检测 + detector = cv2.QRCodeDetector() + result, _, _ = detector.detectAndDecode(frame) + if result and len(result.strip()) > 0: + result = result.strip() + # 保存到配置项 + for entry in gs.qr_config: + if entry["id"] == qr_id: + entry["qr_value"] = result + # 尝试匹配机型 + matched_model = None + for model in gs.models_config: + prefix = model.get("serial_prefix", "") + if prefix and result.startswith(prefix): + matched_model = model + break + if matched_model: + entry["model_id"] = matched_model["id"] + save_json("qr_config.json", gs.qr_config) + return jsonify({ + "ok": True, + "qr_value": result, + "model_id": entry.get("model_id", ""), + "model_name": matched_model["name"] if matched_model else "" + }) + return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404 + else: + return jsonify({"ok": False, "error": "未检测到二维码"}) except Exception as ex: logger.error(f"QR 扫描机械臂摄像头失败: {ex}") return jsonify({"ok": False, "error": f"扫描失败: {str(ex)}"}), 400