This commit is contained in:
ywb
2026-06-16 14:17:05 +08:00
parent 62292edc70
commit 916b44bc3c
10 changed files with 725 additions and 133 deletions
+263 -24
View File
@@ -8,10 +8,11 @@ import time
import logging
import threading
import subprocess
import requests
from flask import Flask, render_template, jsonify, request, Response, send_from_directory
from flask_cors import CORS
from config import SERVER_CONFIG, ARM_CONFIG, AGV_CONFIG, UPLOAD_CONFIG, MAP_CONFIG, ARM_CAMERA_CONFIG, CAMERA_CONFIG, DATA_DIR, State
from config import SERVER_CONFIG, ARM_CONFIG, AGV_CONFIG, UPLOAD_CONFIG, MAP_CONFIG, ARM_CAMERA_CONFIG, CAMERA_CONFIG, DATA_DIR, State, ZHIJIAN_BASE_URL, ZHIJIAN_AUTH_TOKEN, set_api_mode
from utils.arm_client import ArmClient
from utils.agv_controller_ros2 import AGVController
from utils.qr_scanner import QRScanner
@@ -54,6 +55,7 @@ class GlobalState:
self.qr_config = [] # 二维码配置(独立点位列表)
self.navigator = None # Nav2Navigator 实例
self.current_customs = None # 当前设定的报关单信息
self.inspection = None # 查验状态 {customs_id, customs_name, items: [{inventoryCode, inventoryName, spec, quantify, inspected}]}
self.error_msg = "" # 错误弹窗消息(waiting_error 状态时)
self.lock = threading.Lock()
@@ -1213,10 +1215,8 @@ def api_arm_camera_refresh():
@app.route("/api/camera/arm_preview")
def api_arm_camera_preview():
"""代理机械臂 MJPEG 视频流,逐帧上下翻转后返回"""
"""代理机械臂 MJPEG 视频流,直透返回(不翻转)"""
import requests
import cv2
import numpy as np
try:
upstream = requests.get(
ARM_CAMERA_CONFIG["url"],
@@ -1241,16 +1241,8 @@ def api_arm_camera_preview():
if start != -1 and end != -1 and end > start:
jpg_data = buf[start:end+2]
buf = buf[end+2:]
# 解码 → 上下翻转 → 编码
img = cv2.imdecode(np.frombuffer(jpg_data, dtype=np.uint8), cv2.IMREAD_COLOR)
if img is not None:
img = cv2.flip(img, 0)
ret, flipped_jpg = cv2.imencode(".jpg", img, [cv2.IMWRITE_JPEG_QUALITY, 85])
if ret:
yield b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" + flipped_jpg.tobytes() + b"\r\n"
else:
# 解码失败,直透原始帧(不应发生)
yield b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" + jpg_data + b"\r\n"
# 直透原始帧(不翻转)
yield b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" + jpg_data + b"\r\n"
else:
break
finally:
@@ -1344,6 +1336,11 @@ def api_agv_reset():
def api_mission_start():
"""开始执行任务(V3: M×N Grid 蛇形路径)"""
data = request.json or {}
# 必须先设置报关单(开始查验)
if not gs.inspection:
return jsonify({"ok": False, "error": "请先在「设置→报关单」中选择报关单并点击「开始查验」"}), 400
single_step = bool(data.get("single_step", False))
# 任务步骤控制开关
options = {
@@ -1538,6 +1535,18 @@ def api_mission_state():
result["waiting_step"] = False
result["waiting_error"] = False
# 查验状态
result["inspection"] = gs.inspection
# QR 输入弹窗消息
if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance:
rpt = MissionExecutorV3._instance.report
result["qr_message"] = rpt.get("qr_message", "")
result["step_label"] = rpt.get("step_label", "")
else:
result["qr_message"] = ""
result["step_label"] = ""
return jsonify(result)
@app.route("/api/mission/log", methods=["GET"])
@@ -1755,10 +1764,45 @@ def api_qr_config_scan(qr_id):
# ========== 环境切换 API ==========
@app.route("/api/config/mode", methods=["GET"])
def api_config_mode_get():
"""获取当前 API 环境模式"""
import config
return jsonify({
"ok": True,
"test_mode": config.TEST_MODE,
"base_url": config.ZHIJIAN_BASE_URL,
"label": "测试环境" if config.TEST_MODE else "正式环境"
})
@app.route("/api/config/mode", methods=["POST"])
def api_config_mode_set():
"""切换 API 环境"""
body = request.get_json(silent=True) or {}
test_mode = body.get("test_mode", True)
set_api_mode(test_mode)
import config
logger.info(f"API 环境已切换为: {'测试' if test_mode else '正式'}{config.ZHIJIAN_BASE_URL}")
return jsonify({
"ok": True,
"test_mode": config.TEST_MODE,
"base_url": config.ZHIJIAN_BASE_URL,
"label": "测试环境" if config.TEST_MODE else "正式环境"
})
# ========== 报关单接口(代理外部 API ==========
_ZHIJIAN_BASE = "https://ts.zhijian168.com/prod-api/zhijian/integration"
_ZHIJIAN_AUTH = "Bearer eyJhbGciOiJIUzUxMiJ9.eyJ1c2VyX2lkIjoxLCJ1c2VyX2tleSI6ImZhNTNkZTZiLWE3NjYtNDZmNC05MDUyLTQ2MjUzZTAyNjdmNSIsInVzZXJuYW1lIjoiYWRtaW4ifQ.lC4vKThZo4aAOLsekm2kPgaEJRqRx-YDQWKfHFqxdPNESCKy57l3eIqaKTj2ZjAMaoYAwYlMrv5M1zAOJsO_PA"
def _get_zhijian_base():
"""动态获取报关单 API base,跟随环境切换"""
import config
base = f"{config.ZHIJIAN_BASE_URL}{config.API_PREFIX}/zhijian/integration"
return base
_ZHIJIAN_AUTH = ZHIJIAN_AUTH_TOKEN
@app.route("/api/customs/list")
@@ -1767,36 +1811,49 @@ def api_customs_list():
import requests
page = request.args.get("pageNum", 1)
size = request.args.get("pageSize", 50)
url = f"{_ZHIJIAN_BASE}/customsListPage?pageNum={page}&pageSize={size}"
url = f"{_get_zhijian_base()}/customsListPage?pageNum={page}&pageSize={size}"
logger.info(f"[customs/list] 🔍 请求 → {url}")
try:
r = requests.get(url, headers={"Authorization": _ZHIJIAN_AUTH}, timeout=15)
logger.info(f"[customs/list] 📡 响应 HTTP {r.status_code}, body长度={len(r.text)}")
if r.status_code != 200:
logger.warning(f"[customs/list] ⚠️ 返回非200: {r.status_code}")
return jsonify({"ok": False, "error": f"报关单API返回 {r.status_code}"}), 502
data = r.json()
rows = data.get("rows", [])
logger.info(f"[customs/list] ✅ 获取到 {len(rows)} 条报关单")
return jsonify({"ok": True, "data": data})
except Exception as e:
logger.error(f"获取报关单列表失败: {e}")
logger.error(f"[customs/list] ❌ 失败: {e}")
return jsonify({"ok": False, "error": str(e)}), 502
@app.route("/api/customs/machines")
def api_customs_machines():
"""根据报关单 ID 获取机器列表(代理外部 API)
查询参数: customsId=xxx
数据源:cjt_customs_item 表 → Java customsMachines 接口
"""
import requests
customs_id = request.args.get("customsId", "")
logger.info(f"[customs/machines] 📥 customsId={customs_id}")
if not customs_id:
logger.warning("[customs/machines] ⚠️ 缺少 customsId")
return jsonify({"ok": False, "error": "缺少 customsId 参数"}), 400
url = f"{_ZHIJIAN_BASE}/customsMachines?customsId={customs_id}"
try:
url = f"{_get_zhijian_base()}/customsMachines?customsId={customs_id}"
logger.info(f"[customs/machines] 🔍 请求 → {url}")
r = requests.get(url, headers={"Authorization": _ZHIJIAN_AUTH}, timeout=15)
logger.info(f"[customs/machines] 📡 响应 HTTP {r.status_code}, body长度={len(r.text)}")
if r.status_code != 200:
return jsonify({"ok": False, "error": f"机器列表API返回 {r.status_code}"}), 502
data = r.json()
return jsonify({"ok": True, "data": data})
logger.warning(f"[customs/machines] ⚠️ 返回非200: {r.status_code}, body={r.text[:300]}")
return jsonify({"ok": False, "error": "机器列表API返回非200"}), 502
result = r.json()
machines = result.get("data") or []
logger.info(f"[customs/machines] ✅ 获取到 {len(machines)} 条机器记录")
return jsonify({"ok": True, "data": result})
except Exception as e:
logger.error(f"获取报关单机器列表失败: {e}")
logger.error(f"[customs/machines] ❌ 失败: {e}")
return jsonify({"ok": False, "error": str(e)}), 502
@@ -1820,6 +1877,188 @@ def api_customs_selected_get():
return jsonify({"ok": True, "customs": c})
# ========== 查验 API ==========
@app.route("/api/customs/inspection/start", methods=["POST"])
def api_customs_inspection_start():
"""开始查验:加载报关单机器列表,初始化查验计数"""
data = request.json or {}
customs_id = data.get("customsId", "")
if not customs_id:
return jsonify({"ok": False, "error": "缺少 customsId"}), 400
# 获取报关单机器列表
try:
machines_url = f"{_get_zhijian_base()}/customsMachines?customsId={customs_id}"
logger.info(f"[inspection/start] 🔍 获取机器列表 → {machines_url}")
r = requests.get(
machines_url,
headers={"Authorization": _ZHIJIAN_AUTH},
timeout=15
)
logger.info(f"[inspection/start] 📡 机器列表响应 HTTP {r.status_code}, body长度={len(r.text)}")
if r.status_code != 200:
logger.warning(f"[inspection/start] ⚠️ 返回非200: {r.status_code}, body={r.text[:300]}")
return jsonify({"ok": False, "error": f"接口返回 {r.status_code}"}), 502
j = r.json()
machines = j.get("data") or []
logger.info(f"[inspection/start] ✅ 获取到 {len(machines)} 条机器记录")
except Exception as e:
logger.error(f"[inspection/start] ❌ 获取机器列表失败: {e}")
return jsonify({"ok": False, "error": str(e)}), 502
# 按 inventoryCode 聚合(同一物料可能有多条序列号记录)
items_dict = {}
for m in machines:
code = m.get("inventoryCode") or m.get("machineCode") or "unknown"
if code not in items_dict:
items_dict[code] = {
"inventoryCode": code,
"inventoryName": m.get("inventoryName") or m.get("machineName") or "-",
"spec": m.get("inventorySpecification") or m.get("spec") or "-",
"quantify": 0,
"inspected": 0,
}
# 累计数量(quantify 字段可能来自 customsMachines 的返回值)
q = m.get("quantify", 0)
if q:
items_dict[code]["quantify"] += int(float(q))
# 如果 quantify 全部为 0,用机器条目数作为数量
for item in items_dict.values():
if item["quantify"] == 0:
item["quantify"] = sum(1 for m in machines if (m.get("inventoryCode") or "") == item["inventoryCode"])
logger.info(f"[inspection/start] 📊 聚合结果: {len(items_dict)} 种机型, total {sum(i['quantify'] for i in items_dict.values())}")
# 获取报关单名称
customs_name = data.get("customsName") or customs_id
try:
list_url = f"{_get_zhijian_base()}/customsListPage?pageNum=1&pageSize=100"
logger.info(f"[inspection/start] 🔍 获取报关单名称 → {list_url}")
r2 = requests.get(
list_url,
headers={"Authorization": _ZHIJIAN_AUTH},
timeout=15
)
if r2.status_code == 200:
j2 = r2.json()
for row in j2.get("rows", []):
c = row.get("customs", {})
if str(c.get("id", "")) == str(customs_id):
customs_name = c.get("customsCode") or row.get("orderCode") or customs_name
break
except:
pass
gs.inspection = {
"customsId": customs_id,
"customsName": customs_name,
"items": list(items_dict.values()),
"startedAt": time.time(),
}
logger.info(f"开始查验: {customs_name} ({len(gs.inspection['items'])} 种机型,共 {sum(i['quantify'] for i in gs.inspection['items'])} 台)")
return jsonify({"ok": True, "inspection": gs.inspection})
@app.route("/api/customs/inspection", methods=["GET"])
def api_customs_inspection():
"""获取当前查验状态"""
return jsonify({"ok": True, "inspection": gs.inspection})
@app.route("/api/customs/inspection/end", methods=["POST"])
def api_customs_inspection_end():
"""结束查验"""
gs.inspection = None
return jsonify({"ok": True})
@app.route("/api/customs/printer", methods=["GET"])
def api_customs_printer():
"""代理 /zhijian/profile/printer 查询,同时更新查验计数
GET ?serialNumber=xxx
"""
sn = request.args.get("serialNumber", "").strip()
logger.info(f"[printer] 📥 收到查询请求 serialNumber={sn}")
if not sn:
logger.warning("[printer] ⚠️ 缺少 serialNumber 参数")
return jsonify({"ok": False, "error": "缺少 serialNumber"}), 400
# 调用 Java profile/printer 接口
api_base = _get_zhijian_base().rstrip("/")
profile_url = f"{api_base[:api_base.rfind('/')]}/profile/printer?serialNumber={sn}"
logger.info(f"[printer] 🔍 请求 Java → {profile_url}")
try:
r = requests.get(profile_url, headers={"Authorization": _ZHIJIAN_AUTH}, timeout=15)
logger.info(f"[printer] 📡 Java响应 HTTP {r.status_code}: {r.text[:500]}")
if r.status_code != 200:
logger.warning(f"[printer] ⚠️ Java返回非200: {r.status_code}")
return jsonify({"ok": False, "error": f"profile/printer 返回 {r.status_code}"}), 502
j = r.json()
except Exception as e:
logger.error(f"[printer] ❌ 查询 Java printer 失败: {e}")
return jsonify({"ok": False, "error": str(e)}), 502
data = j.get("data", j)
printer = data.get("printer")
order_item = data.get("orderItem")
logger.info(f"[printer] 📊 Java返回解析: printer={'yes' if printer else 'no'}, orderItem={'yes' if order_item else 'no'}")
result = {
"ok": True,
"printer": printer,
"orderItem": order_item,
"modelName": "机器1", # 默认
"inventoryCode": None,
"matchedItem": None,
"hasInspection": gs.inspection is not None,
}
# 提取 inventory 信息(优先级: orderItem.inventory > printer.inventory > printer.model/machineModel
if order_item and order_item.get("inventory"):
inv = order_item["inventory"]
result["modelName"] = inv.get("inventoryName") or inv.get("name") or "机器1"
result["inventoryCode"] = inv.get("inventoryCode") or inv.get("code")
logger.info(f"[printer] 🏷️ 从 orderItem.inventory 提取: modelName={result['modelName']}, inventoryCode={result['inventoryCode']}")
elif printer and printer.get("inventory"):
inv = printer["inventory"]
result["modelName"] = inv.get("inventoryName") or inv.get("name") or "机器1"
result["inventoryCode"] = inv.get("inventoryCode") or inv.get("code")
logger.info(f"[printer] 🏷️ 从 printer.inventory 提取: modelName={result['modelName']}, inventoryCode={result['inventoryCode']}")
elif printer:
result["modelName"] = printer.get("model") or printer.get("machineModel") or "机器1"
logger.info(f"[printer] 🏷️ 从 printer.model/machineModel 提取: modelName={result['modelName']}")
else:
logger.warning(f"[printer] ⚠️ printer 和 orderItem 均为空,回退 modelName=机器1")
# 更新查验计数
if gs.inspection and result["inventoryCode"]:
for item in gs.inspection["items"]:
if item["inventoryCode"] == result["inventoryCode"]:
item["inspected"] += 1
result["matchedItem"] = item
logger.info(f"[printer] ✅ 查验计数更新: {item['inventoryName']}{item['inspected']}/{item['quantify']}")
break
else:
logger.warning(f"[printer] ⚠️ inventoryCode={result['inventoryCode']} 不在查验清单中")
logger.info(f"[printer] 📤 返回结果: modelName={result['modelName']}, inventoryCode={result['inventoryCode']}, hasInspection={result['hasInspection']}, matched={'yes' if result['matchedItem'] else 'no'}")
return jsonify(result)
@app.route("/api/customs/inspection/update", methods=["POST"])
def api_customs_inspection_update():
"""直接更新查验计数(由执行器调用)"""
data = request.json or {}
code = data.get("inventoryCode", "")
if not gs.inspection or not code:
return jsonify({"ok": False})
for item in gs.inspection["items"]:
if item["inventoryCode"] == code:
item["inspected"] += 1
return jsonify({"ok": True, "item": item})
return jsonify({"ok": False, "error": "未匹配"})
# ========== 静态资源 ==========
@app.route("/photos/<name>")
def photos(name):