加入二维码设置

This commit is contained in:
ywb
2026-05-20 20:02:31 +08:00
parent f05d6ea059
commit 084faad1b2
5 changed files with 738 additions and 325 deletions
+172
View File
@@ -51,6 +51,7 @@ class GlobalState:
"positions": [] # 独立点位配置 [{row, col, side, coords, poses}]
}
self.machines_config = [] # 机器配置(每台机器的正面/背面点位+姿态)
self.qr_config = [] # 二维码配置(独立点位列表)
self.navigator = None # Nav2Navigator 实例
self.lock = threading.Lock()
@@ -112,6 +113,12 @@ def load_persisted_config():
gs.machines_config = machines_cfg
print(f"[启动] 加载机器配置: {len(machines_cfg)} 台机器")
# 加载二维码配置
qr_cfg = load_json("qr_config.json", [])
if qr_cfg:
gs.qr_config = qr_cfg
print(f"[启动] 加载二维码配置: {len(qr_cfg)} 个点位")
# 在 Flask 2.3+ 使用 @app.before_serving,兼容旧版用 before_first_request
try:
from flask import has_app_context
@@ -783,6 +790,7 @@ def api_mission_machine_add():
"col": data.get("col", 0),
"front": data.get("front", {"coords": [0, 0, 0], "poses": []}),
"back": data.get("back", {"coords": [0, 0, 0], "poses": []}),
"qr": data.get("qr", {"coords": [0, 0, 0], "qr_value": "", "model_id": ""}),
}
gs.machines_config.append(machine)
save_json("machines_config.json", gs.machines_config)
@@ -807,6 +815,39 @@ def api_mission_machine_delete(machine_id):
save_json("machines_config.json", gs.machines_config)
return jsonify({"ok": True})
@app.route("/api/mission/qr_scan/<machine_id>", methods=["POST"])
def api_mission_qr_scan(machine_id):
"""扫描二维码并关联到机器"""
if not gs.qr_scanner or not gs.qr_scanner._cap:
return jsonify({"ok": False, "error": "AGV 摄像头未打开"}), 400
result = gs.qr_scanner.scan_once()
if result:
# 在 machines_config 和 mission_config 中查找机器
for i, m in enumerate(gs.machines_config):
if m["id"] == machine_id:
if "qr" not in m:
m["qr"] = {"coords": [0, 0, 0], "qr_value": "", "model_id": ""}
m["qr"]["qr_value"] = result
# 尝试匹配机型(通过 serial_prefix
matched_model = None
for model in gs.models_config:
prefix = model.get("serial_prefix", "")
if prefix and result.startswith(prefix):
matched_model = model
break
if matched_model:
m["qr"]["model_id"] = matched_model["id"]
save_json("machines_config.json", gs.machines_config)
return jsonify({
"ok": True,
"qr_value": result,
"model_id": m["qr"].get("model_id", ""),
"model_name": matched_model["name"] if matched_model else ""
})
return jsonify({"ok": False, "error": f"机器 {machine_id} 不存在"}), 404
return jsonify({"ok": False, "error": "未检测到二维码"})
@app.route("/api/mission/poses/<machine_id>/<side>", methods=["GET"])
def api_mission_poses_get(machine_id, side):
"""获取机器指定侧的姿态列表(side: front | back"""
@@ -1272,6 +1313,137 @@ def api_mission_state():
return jsonify({"state": gs.state})
# ========== 二维码配置 API ==========
@app.route("/api/qr/configs", methods=["GET"])
def api_qr_configs_get():
"""获取所有二维码配置"""
return jsonify({"ok": True, "configs": gs.qr_config})
@app.route("/api/qr/configs", methods=["POST"])
def api_qr_configs_add():
"""添加二维码配置"""
data = request.json or {}
new_entry = {
"id": "qr_" + str(int(time.time() * 1000)),
"name": data.get("name", f"二维码{len(gs.qr_config) + 1}"),
"joint_angles": data.get("joint_angles", [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
"qr_value": "",
"model_id": "",
}
gs.qr_config.append(new_entry)
save_json("qr_config.json", gs.qr_config)
return jsonify({"ok": True, "entry": new_entry})
@app.route("/api/qr/configs/<qr_id>", methods=["PUT"])
def api_qr_configs_update(qr_id):
"""更新二维码配置"""
data = request.json or {}
for entry in gs.qr_config:
if entry["id"] == qr_id:
if "name" in data:
entry["name"] = data["name"]
if "joint_angles" in data:
entry["joint_angles"] = data["joint_angles"]
if "qr_value" in data:
entry["qr_value"] = data["qr_value"]
if "model_id" in data:
entry["model_id"] = data["model_id"]
save_json("qr_config.json", gs.qr_config)
return jsonify({"ok": True, "entry": entry})
return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404
@app.route("/api/qr/configs/<qr_id>", methods=["DELETE"])
def api_qr_configs_delete(qr_id):
"""删除二维码配置"""
for i, entry in enumerate(gs.qr_config):
if entry["id"] == qr_id:
gs.qr_config.pop(i)
save_json("qr_config.json", gs.qr_config)
return jsonify({"ok": True})
return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404
@app.route("/api/qr/configs/<qr_id>/read-angles", methods=["POST"])
def api_qr_read_angles(qr_id):
"""读取机械臂当前关节角度并保存到指定二维码配置"""
if not gs.arm_client:
return jsonify({"ok": False, "error": "机械臂未连接"}), 400
ok, angles = gs.arm_client.get_angles()
if not ok or not angles:
return jsonify({"ok": False, "error": "无法读取机械臂角度"}), 400
for entry in gs.qr_config:
if entry["id"] == qr_id:
entry["joint_angles"] = list(angles)
save_json("qr_config.json", gs.qr_config)
return jsonify({"ok": True, "joint_angles": entry["joint_angles"]})
return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404
@app.route("/api/qr/scan/<qr_id>", methods=["POST"])
def api_qr_config_scan(qr_id):
"""获取机械臂摄像头图像,识别二维码并保存到指定配置项"""
import requests
try:
import cv2
import numpy as np
# 从机械臂摄像头拉取一帧 JPEG
r = requests.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=5)
if r.status_code != 200:
return jsonify({"ok": False, "error": "无法连接机械臂摄像头"}), 400
data = b""
for chunk in r.iter_content(chunk_size=4096):
data += chunk
s = data.find(b"\xff\xd8")
e = data.find(b"\xff\xd9", s + 2) if s >= 0 else -1
if s >= 0 and e > s:
r.close()
jpg_bytes = data[s:e+2]
# 解码为 numpy 数组并检测二维码
nparr = np.frombuffer(jpg_bytes, np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if frame is None:
return jsonify({"ok": False, "error": "图像解码失败"}), 400
# 使用 OpenCV QRCodeDetector 检测
detector = cv2.QRCodeDetector()
result, _, _ = detector.detectAndDecode(frame)
if result and len(result.strip()) > 0:
result = result.strip()
# 保存到配置项
for entry in gs.qr_config:
if entry["id"] == qr_id:
entry["qr_value"] = result
# 尝试匹配机型
matched_model = None
for model in gs.models_config:
prefix = model.get("serial_prefix", "")
if prefix and result.startswith(prefix):
matched_model = model
break
if matched_model:
entry["model_id"] = matched_model["id"]
save_json("qr_config.json", gs.qr_config)
return jsonify({
"ok": True,
"qr_value": result,
"model_id": entry.get("model_id", ""),
"model_name": matched_model["name"] if matched_model else ""
})
return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404
else:
return jsonify({"ok": False, "error": "未检测到二维码"})
if len(data) > 1024 * 1024:
r.close()
return jsonify({"ok": False, "error": "摄像头数据流异常"}), 400
r.close()
return jsonify({"ok": False, "error": "未收到完整图像帧"}), 400
except Exception as ex:
logger.error(f"QR 扫描机械臂摄像头失败: {ex}")
return jsonify({"ok": False, "error": f"扫描失败: {str(ex)}"}), 400
# ========== 静态资源 ==========
@app.route("/photos/<name>")
def photos(name):