Files
smart-inspection/agv_app/app.py
T
2026-06-08 11:42:41 +08:00

1709 lines
66 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AGV 拍摄系统 - Flask 主程序
运行在 AGV 上,端口 5000
"""
import os
import json
import time
import logging
import threading
import subprocess
from flask import Flask, render_template, jsonify, request, Response, send_from_directory
from flask_cors import CORS
from config import SERVER_CONFIG, ARM_CONFIG, AGV_CONFIG, UPLOAD_CONFIG, MAP_CONFIG, ARM_CAMERA_CONFIG, CAMERA_CONFIG, DATA_DIR, State
from utils.arm_client import ArmClient
from utils.agv_controller_ros2 import AGVController
from utils.qr_scanner import QRScanner
from utils.image_uploader import ImageUploader
from utils.mission_executor import MissionExecutorV3
from utils.nav2_navigator import Nav2Navigator, Nav2Status
# Configure root logging: INFO level, timestamped "[LEVEL] logger: message" lines.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
# Module logger used by the handlers in this file.
logger = logging.getLogger("agv_app")
# Flask application; templates are read from ./templates and static files are served under /static.
app = Flask(__name__, template_folder="templates", static_folder="static", static_url_path="/static")
# Secret key comes from the project configuration (SERVER_CONFIG).
app.config["SECRET_KEY"] = SERVER_CONFIG["secret_key"]
# Wrap the app with flask_cors using its default settings.
CORS(app)
# ========== 全局状态 ==========
class GlobalState:
    """Mutable, process-wide state shared by all request handlers."""

    def __init__(self):
        # Lock taken by reset() and by handlers that change `state`.
        self.lock = threading.Lock()

        # --- lifecycle -------------------------------------------------
        self.state = State.IDLE        # setting / running / paused / idle
        self.error_msg = ""            # error popup message (waiting_error state)
        self.mission_data = None       # configuration of the current mission
        self.mission_report = None     # mission execution report

        # --- device handles --------------------------------------------
        self.arm_client = None         # ArmClient instance
        self.agv_controller = None     # AGVController instance
        self.qr_scanner = None         # QRScanner instance
        self.navigator = None          # Nav2Navigator instance
        self.camera_opened = False
        self.arm_camera_opened = False

        # --- persisted configuration -----------------------------------
        self.map_config = {}           # map configuration
        self.points_config = []        # point configuration
        self.models_config = []        # machine-model configuration (poses)
        self.machines_config = []      # per-machine front/back points + poses
        self.qr_config = []            # QR-code configuration (standalone point list)
        self.mission_config = {        # mission configuration (M x N grid)
            "rows": 2,
            "cols": 3,
            "grid": [],                # M x N boolean matrix
            "positions": [],           # standalone points [{row, col, side, coords, poses}]
        }

    def reset(self):
        """Return to IDLE and drop the current mission data and report."""
        with self.lock:
            self.mission_report = None
            self.mission_data = None
            self.state = State.IDLE


gs = GlobalState()
# ========== 辅助函数 ==========
def get_data_path(name):
    """Return the path of data file *name* inside DATA_DIR."""
    full_path = os.path.join(DATA_DIR, name)
    return full_path
def save_json(name, data):
    """Write *data* as pretty-printed UTF-8 JSON to data file *name*.

    The payload is serialized to a string before the target file is opened.
    Opening with "w" truncates the file, so serializing first means that a
    payload json cannot encode raises TypeError/ValueError while the
    previously saved configuration is still intact.
    """
    text = json.dumps(data, ensure_ascii=False, indent=2)
    with open(get_data_path(name), "w", encoding="utf-8") as f:
        f.write(text)
def load_json(name, default=None):
    """Load data file *name* as JSON.

    When the file does not exist, return *default*, or an empty dict if
    *default* is None.
    """
    target = get_data_path(name)
    if not os.path.exists(target):
        return {} if default is None else default
    with open(target, "r", encoding="utf-8") as handle:
        return json.load(handle)
# ========== 启动时加载持久化配置 ==========
def load_persisted_config():
    """Populate the global state from the JSON files saved in the data directory.

    A file that is missing or holds an empty value leaves the corresponding
    in-memory default untouched.
    """
    # The map configuration is only accepted when it names a YAML file.
    saved_map = load_json("map_config.json")
    if saved_map and "map_yaml" in saved_map:
        gs.map_config = saved_map
        print(f"[启动] 加载地图配置: {saved_map['map_yaml']}")

    # (file name, default, GlobalState attribute, startup message builder),
    # processed in this order: points, models, mission, machines, QR codes.
    sources = (
        ("points_config.json", [], "points_config",
         lambda cfg: f"[启动] 加载点位配置: {len(cfg)} 个点位"),
        ("models_config.json", [], "models_config",
         lambda cfg: f"[启动] 加载机型配置: {len(cfg)} 个机型"),
        ("mission_config.json", {}, "mission_config",
         lambda cfg: f"[启动] 加载任务配置: {cfg.get('rows', 0)}行×{cfg.get('cols', 0)}"),
        ("machines_config.json", [], "machines_config",
         lambda cfg: f"[启动] 加载机器配置: {len(cfg)} 台机器"),
        ("qr_config.json", [], "qr_config",
         lambda cfg: f"[启动] 加载二维码配置: {len(cfg)} 个点位"),
    )
    for filename, fallback, attribute, describe in sources:
        loaded = load_json(filename, fallback)
        if loaded:
            setattr(gs, attribute, loaded)
            print(describe(loaded))
# Startup: load the persisted configuration at import time, then connect the
# devices on a background thread so Flask start-up is not blocked.
def _auto_connect_all():
    """Try to connect the AGV and the arm once, shortly after start-up.

    Failures are only printed; the devices can still be connected manually.
    """
    time.sleep(2)  # give Flask time to finish starting
    # Connect the AGV.
    try:
        gs.agv_controller = AGVController()
        if gs.agv_controller.connect():
            print("[启动] AGV 自动连接成功")
        else:
            print("[启动] AGV 自动连接失败,请手动连接")
    except Exception as e:
        print(f"[启动] AGV 自动连接异常: {e}")
    # Connect the arm, then power it on and enable it.
    try:
        gs.arm_client = ArmClient(ARM_CONFIG["host"], ARM_CONFIG["port"])
        if gs.arm_client.connect():
            gs.arm_client.power_on()
            gs.arm_client.state_on()
            print("[启动] 机械臂自动连接成功")
        else:
            print("[启动] 机械臂自动连接失败,请手动连接")
    except Exception as e:
        print(f"[启动] 机械臂自动连接异常: {e}")


try:
    with app.app_context():
        load_persisted_config()
    threading.Thread(target=_auto_connect_all, daemon=True).start()
except Exception:
    # Catch Exception rather than everything, so KeyboardInterrupt and
    # SystemExit are not swallowed, and record what went wrong.
    logger.exception("startup configuration load failed")
    # Legacy fallback: before_first_request only exists on older Flask
    # releases, so register the hook only when the attribute is present
    # instead of crashing the import.
    if hasattr(app, "before_first_request"):
        @app.before_first_request
        def startup_load():
            load_persisted_config()
# ========== 页面路由 ==========
@app.route("/")
def index():
    """Serve the landing page."""
    page = render_template("index.html")
    return page
@app.route("/setting")
def setting_page():
    """Serve the settings page."""
    page = render_template("setting.html")
    return page
@app.route("/running")
def running_page():
    """Serve the mission-running page."""
    page = render_template("running.html")
    return page
# ========== 系统状态 API ==========
@app.route("/api/status")
def api_status():
    """Report overall system status.

    Each call actively probes the arm (a get_angles round trip) and the arm
    camera URL, reads the AGV connection flag, and returns them together with
    configuration counters. Everything runs while holding gs.lock.
    """
    with gs.lock:
        # Verify the arm link by sending a simple command.
        arm_connected = False
        if gs.arm_client and gs.arm_client._sock:
            try:
                # Short timeout so a dead link is detected quickly.
                gs.arm_client._sock.settimeout(2)
                ok, _ = gs.arm_client.get_angles()
                arm_connected = ok
            except Exception as exc:
                logger.debug("arm probe failed: %s", exc)
                arm_connected = False
                # The link is gone: drop the socket reference.
                if gs.arm_client:
                    gs.arm_client._sock = None
        # Verify the AGV connection.
        agv_connected = False
        if gs.agv_controller:
            agv_connected = gs.agv_controller.is_connected()
        # Check whether the arm camera answers right now.
        try:
            import requests as _armcam_req
            _r = _armcam_req.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=3)
            try:
                gs.arm_camera_opened = (_r.status_code == 200)
            finally:
                _r.close()
        except Exception as exc:
            logger.debug("arm camera probe failed: %s", exc)
            gs.arm_camera_opened = False
        return jsonify({
            "state": gs.state,
            "agv_connected": agv_connected,
            "arm_connected": arm_connected,
            "camera_opened": gs.camera_opened,
            "arm_camera_opened": gs.arm_camera_opened,
            "map_loaded": bool(gs.map_config),
            "points_count": len(gs.points_config),
            "models_count": len(gs.models_config),
            "mission_rows": gs.mission_config.get("rows", 0),
            "mission_cols": gs.mission_config.get("cols", 0),
            "machines_count": len(gs.machines_config)
        })
@app.route("/api/system/connect", methods=["POST"])
def api_connect():
    """Connect the AGV, the arm and the AGV camera, and probe the arm camera.

    Returns a JSON object with one boolean per device (agv / arm / camera)
    and a list of error strings. The system moves to SETTING only when the
    AGV, the arm and the camera all connected; otherwise it goes to IDLE.
    """
    results = {"agv": False, "arm": False, "camera": False, "errors": []}
    # Connect the AGV.
    try:
        gs.agv_controller = AGVController()
        results["agv"] = gs.agv_controller.connect()
    except Exception as e:
        results["errors"].append(f"AGV: {e}")
    # Connect the arm.
    try:
        gs.arm_client = ArmClient(ARM_CONFIG["host"], ARM_CONFIG["port"])
        results["arm"] = gs.arm_client.connect()
        if results["arm"]:
            # Try to power on and enable the arm.
            gs.arm_client.power_on()
            time.sleep(0.5)
            gs.arm_client.state_on()
    except Exception as e:
        results["errors"].append(f"机械臂: {e}")
    # Open the AGV camera.
    try:
        gs.qr_scanner = QRScanner(CAMERA_CONFIG["device_index"])
        results["camera"] = gs.qr_scanner.open()
        gs.camera_opened = results["camera"]
    except Exception as e:
        results["errors"].append(f"摄像头: {e}")
    # Probe the arm camera; it does not take part in the all_ok decision.
    try:
        import requests as _req
        r2 = _req.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=3)
        try:
            gs.arm_camera_opened = (r2.status_code == 200)
        finally:
            r2.close()
    except Exception as exc:
        logger.debug("arm camera probe failed: %s", exc)
        gs.arm_camera_opened = False
    all_ok = results["agv"] and results["arm"] and results["camera"]
    with gs.lock:
        gs.state = State.SETTING if all_ok else State.IDLE
    return jsonify(results)
@app.route("/api/system/disconnect", methods=["POST"])
def api_disconnect():
    """Disconnect the arm, the AGV and the AGV camera, then go back to IDLE."""
    # (GlobalState attribute, name of the method that releases the device),
    # handled in this order.
    teardown_plan = (
        ("arm_client", "close"),
        ("agv_controller", "disconnect"),
        ("qr_scanner", "close"),
    )
    for attribute, release_method in teardown_plan:
        device = getattr(gs, attribute)
        if device:
            getattr(device, release_method)()
            setattr(gs, attribute, None)
    gs.camera_opened = False
    gs.state = State.IDLE
    return jsonify({"ok": True})
@app.route("/api/device/connect", methods=["POST"])
def api_device_connect():
    """Connect one device; the JSON field `device` is agv | arm | camera | arm_camera.

    The connection attempt runs on a daemon thread. The handler waits at most
    10 seconds for it and reports a timeout failure if it has not finished.
    """
    payload = request.json or {}
    device = payload.get("device", "")
    finished = threading.Event()
    outcome = {}

    def worker():
        try:
            if device == "agv":
                gs.agv_controller = AGVController()
                connected = gs.agv_controller.connect()
                outcome["ok"] = connected
                if not connected:
                    outcome["error"] = "AGV 连接失败,请检查网络或 ROS2"
            elif device == "arm":
                gs.arm_client = ArmClient(ARM_CONFIG["host"], ARM_CONFIG["port"])
                connected = gs.arm_client.connect()
                outcome["ok"] = connected
                if connected:
                    gs.arm_client.power_on()
                    time.sleep(0.3)
                    gs.arm_client.state_on()
                else:
                    outcome["error"] = "机械臂连接失败"
            elif device == "camera":
                gs.qr_scanner = QRScanner(CAMERA_CONFIG["device_index"])
                opened = gs.qr_scanner.open()
                outcome["ok"] = opened
                gs.camera_opened = opened
                if not opened:
                    outcome["error"] = "AGV 摄像头打开失败"
            elif device == "arm_camera":
                import requests as _req
                response = _req.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=5)
                outcome["ok"] = (response.status_code == 200)
                response.close()
                gs.arm_camera_opened = outcome["ok"]
                if not outcome["ok"]:
                    outcome["error"] = "机械臂摄像头无响应"
            else:
                outcome["error"] = f"未知设备: {device}"
        except Exception as exc:
            outcome["error"] = str(exc)
        finally:
            finished.set()

    threading.Thread(target=worker, daemon=True).start()

    # Wait up to 10 seconds; past that the attempt is reported as failed.
    if finished.wait(10):
        ok = outcome.get("ok", False)
        error = outcome.get("error", "")
    else:
        ok = False
        error = "连接超时(10秒),设备无响应"
    return jsonify({"device": device, "ok": ok, "error": error})
# ========== 地图配置 API ==========
@app.route("/api/map/load", methods=["POST"])
def api_map_load():
    """Select a map YAML file and persist the choice.

    `map_dir` and `map_file` are optional and default to MAP_CONFIG. Returns
    400 when the resulting path does not exist.
    """
    # A JSON null body yields None; fall back to an empty dict so the
    # defaults apply, as the other handlers in this file do.
    data = request.json or {}
    map_dir = data.get("map_dir", MAP_CONFIG["map_dir"])
    map_file = data.get("map_file", MAP_CONFIG["map_file"])
    map_yaml = os.path.join(map_dir, map_file)
    if not os.path.exists(map_yaml):
        return jsonify({"ok": False, "error": f"地图文件不存在: {map_yaml}"}), 400
    gs.map_config = {"map_dir": map_dir, "map_file": map_file, "map_yaml": map_yaml}
    save_json("map_config.json", gs.map_config)
    return jsonify({"ok": True, "map": gs.map_config})
@app.route("/api/map/save", methods=["POST"])
def api_map_save():
    """Replace the map configuration with the posted JSON object and persist it.

    Returns 400 when the body is not a JSON object; the handlers that read
    the map configuration use it as a dict.
    """
    data = request.json
    if not isinstance(data, dict):
        return jsonify({"ok": False, "error": "请求体必须是 JSON 对象"}), 400
    gs.map_config = data
    save_json("map_config.json", data)
    return jsonify({"ok": True})
@app.route("/api/map/image")
def api_map_image():
    """Return the loaded map as a PNG image.

    The image file name is read from the map YAML (`image`, default
    `map.pgm`) and resolved relative to the YAML's directory. Returns 404
    when no map is loaded or the image file is missing, 500 on read or
    encode failure.
    """
    if not gs.map_config or "map_yaml" not in gs.map_config:
        return jsonify({"error": "地图未加载"}), 404
    map_yaml = gs.map_config["map_yaml"]
    map_dir = os.path.dirname(map_yaml)
    # Parse the YAML to find the PGM file name.
    try:
        import yaml
        with open(map_yaml, 'r') as f:
            # An empty YAML document parses to None; treat it as no metadata.
            meta = yaml.safe_load(f) or {}
        pgm_file = meta.get('image', 'map.pgm')
        pgm_path = os.path.join(map_dir, pgm_file)
        if not os.path.exists(pgm_path):
            return jsonify({"error": f"PGM 文件不存在: {pgm_path}"}), 404
        # Read the PGM and convert it to PNG.
        import cv2
        img = cv2.imread(pgm_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            return jsonify({"error": "无法读取 PGM 文件"}), 500
        # Invert the grayscale values (original note: in the PGM 0 = occupied,
        # 255 = free, following the ROS map display convention).
        img = 255 - img
        # Encode as PNG; imencode reports failure through its first return value.
        encoded, buf = cv2.imencode('.png', img)
        if not encoded:
            return jsonify({"error": "PNG 编码失败"}), 500
        return Response(buf.tobytes(), mimetype='image/png')
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/api/map/meta")
def api_map_meta():
    """Return map metadata: resolution, origin, and image width/height in pixels."""
    config = gs.map_config
    if not config or "map_yaml" not in config:
        return jsonify({"ok": False, "error": "地图未加载"}), 404
    yaml_path = config["map_yaml"]
    base_dir = os.path.dirname(yaml_path)
    try:
        import yaml
        with open(yaml_path, 'r') as handle:
            meta = yaml.safe_load(handle)
        image_path = os.path.join(base_dir, meta.get('image', 'map.pgm'))
        import cv2
        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        # An unreadable image is reported as a 0 x 0 map.
        if image is None:
            height, width = 0, 0
        else:
            height, width = image.shape
        payload = {
            "ok": True,
            "resolution": meta.get('resolution', 0.05),
            "origin": meta.get('origin', [0, 0, 0]),
            "width": width,
            "height": height
        }
        return jsonify(payload)
    except Exception as exc:
        return jsonify({"ok": False, "error": str(exc)}), 500
# ========== 地图导航 API ==========
@app.route("/api/navigate/to", methods=["POST"])
def api_navigate_to():
    """Start a non-blocking navigation to the posted x / y (optional yaw).

    Returns 400 when no map is loaded, a coordinate is missing or not
    numeric, the AGV is not connected, or the navigator refuses the goal;
    500 on unexpected errors.
    """
    if not gs.map_config or "map_yaml" not in gs.map_config:
        return jsonify({"ok": False, "error": "地图未加载,请先在设置中加载地图"}), 400
    # A JSON null body yields None; treat it like an empty request.
    data = request.json or {}
    goal_x = data.get("x")
    goal_y = data.get("y")
    goal_yaw = data.get("yaw")  # heading, optional
    if goal_x is None or goal_y is None:
        return jsonify({"ok": False, "error": "缺少目标坐标 x, y"}), 400
    if not gs.agv_controller or not gs.agv_controller.is_connected():
        return jsonify({"ok": False, "error": "AGV 未连接,请先连接 AGV"}), 400
    # Malformed numbers are a client error, not a server failure.
    try:
        x_arg = float(goal_x)
        y_arg = float(goal_y)
        yaw_arg = float(goal_yaw) if goal_yaw is not None else None
    except (TypeError, ValueError):
        return jsonify({"ok": False, "error": "目标坐标格式错误"}), 400
    try:
        if gs.navigator is None:
            gs.navigator = Nav2Navigator()
        # navigate_to_pose(x, y, yaw=None, timeout_sec=120, blocking=False)
        ok = gs.navigator.navigate_to_pose(x_arg, y_arg, yaw_arg, blocking=False)
        if ok:
            return jsonify({"ok": True, "message": "导航已启动"})
        return jsonify({"ok": False, "error": "导航启动失败,可能是Nav2未运行或AGV未连接"}), 400
    except Exception as e:
        logger.error(f"导航失败: {e}")
        return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/navigate/stop", methods=["POST"])
def api_navigate_stop():
    """Stop the navigator; returns 400 when no navigator has been created."""
    navigator = gs.navigator
    if not navigator:
        return jsonify({"ok": False, "error": "导航器未初始化"}), 400
    navigator.stop()
    return jsonify({"ok": True, "message": "导航已停止"})
@app.route("/api/navigate/cancel", methods=["POST"])
def api_navigate_cancel():
    """Cancel the current navigation; succeeds even when no navigator exists."""
    navigator = gs.navigator
    if not navigator:
        return jsonify({"ok": True, "message": "无活动导航"})
    navigator.stop()
    return jsonify({"ok": True, "message": "导航已取消"})
@app.route("/api/navigate/status", methods=["GET"])
def api_navigate_status():
    """Return the navigator status, creating the navigator on first use.

    When the navigator cannot be created, a fixed idle status with
    `nav2_available` set to False is returned instead.
    """
    navigator = gs.navigator
    if navigator is None:
        # Lazy initialisation; a failure is logged and leaves it unset.
        try:
            navigator = gs.navigator = Nav2Navigator()
        except Exception as exc:
            logger.warning(f"Nav2Navigator 初始化失败: {exc}")
    if not navigator:
        unavailable = {"status": "idle", "current_position": [0, 0, 0], "nav2_available": False}
        return jsonify(unavailable)
    return jsonify(navigator.get_status())
@app.route("/api/navigate/path", methods=["POST"])
def api_navigate_path():
    """Path preview placeholder.

    The Nav2-based navigator does not precompute paths, so this validates the
    request and returns the current position and Nav2 availability instead.
    """
    if not gs.map_config or "map_yaml" not in gs.map_config:
        return jsonify({"ok": False, "error": "地图未加载"}), 400
    # A JSON null body yields None; treat it as an empty request so the
    # missing-coordinate check below answers with 400.
    data = request.json or {}
    goal_x = data.get("x")
    goal_y = data.get("y")
    if goal_x is None or goal_y is None:
        return jsonify({"ok": False, "error": "缺少目标坐标 x, y"}), 400
    try:
        if gs.navigator is None:
            gs.navigator = Nav2Navigator()
        # No path preview is available; report the current state only.
        current = gs.navigator.get_current_position()
        status = gs.navigator.get_status()
        return jsonify({
            "ok": True,
            "message": "Nav2 路径预览不可用,请在 RViz 中查看规划路径",
            "current_position": current,
            "nav2_available": status.get("nav2_available", False)
        })
    except Exception as e:
        logger.error(f"路径预览失败: {e}")
        return jsonify({"ok": False, "error": str(e)}), 500
# ========== 点位配置 API ==========
@app.route("/api/points/list", methods=["GET"])
def api_points_list():
    """Return every configured point."""
    payload = {"points": gs.points_config}
    return jsonify(payload)
@app.route("/api/points/add", methods=["POST"])
def api_points_add():
    """Add a point at the AGV's current position and persist the list.

    When the AGV is not connected, or reports no position, the coordinates
    default to [0.0, 0.0, 0.0].
    """
    # A JSON null body yields None; every field has a default.
    data = request.json or {}
    point_name = data.get("name", f"point_{len(gs.points_config) + 1}")
    # Read the AGV's current position.
    position = None
    if gs.agv_controller and gs.agv_controller.is_connected():
        position = gs.agv_controller.get_position()
    # Ids are derived from the current second, so two points added within
    # the same second would share an id and update/delete could no longer
    # tell them apart. Add a numeric suffix until the id is unused.
    taken = {p.get("id") for p in gs.points_config}
    base_id = f"p_{int(time.time())}"
    point_id = base_id
    suffix = 1
    while point_id in taken:
        point_id = f"{base_id}_{suffix}"
        suffix += 1
    point = {
        "id": point_id,
        "name": point_name,
        "coords": position or [0.0, 0.0, 0.0],  # [x, y, yaw]
        "photo_mode": data.get("photo_mode", "front"),  # front / back / both
        "sequence": data.get("sequence", ["front", "back"]),  # order used in "both" mode
    }
    gs.points_config.append(point)
    save_json("points_config.json", gs.points_config)
    return jsonify({"ok": True, "point": point})
@app.route("/api/points/update/<point_id>", methods=["POST"])
def api_points_update(point_id):
    """Merge the posted fields into the point with id *point_id* and persist.

    Returns 400 when the body is not a JSON object, 404 when the point does
    not exist.
    """
    data = request.json
    # dict.update() needs a mapping; reject anything else as a client error.
    if not isinstance(data, dict):
        return jsonify({"ok": False, "error": "请求体必须是 JSON 对象"}), 400
    for point in gs.points_config:
        if point["id"] == point_id:
            point.update(data)
            save_json("points_config.json", gs.points_config)
            return jsonify({"ok": True})
    return jsonify({"ok": False, "error": "点位不存在"}), 404
@app.route("/api/points/delete/<point_id>", methods=["DELETE"])
def api_points_delete(point_id):
    """Remove every point whose id equals *point_id* and persist the list."""
    remaining = []
    for entry in gs.points_config:
        if entry["id"] != point_id:
            remaining.append(entry)
    gs.points_config = remaining
    save_json("points_config.json", gs.points_config)
    return jsonify({"ok": True})
@app.route("/api/points/save", methods=["POST"])
def api_points_save():
    """Replace the whole point list with the posted `points` array and persist it."""
    body = request.json
    points = body.get("points", [])
    gs.points_config = points
    save_json("points_config.json", points)
    return jsonify({"ok": True, "count": len(points)})
@app.route("/api/points/load", methods=["GET"])
def api_points_load():
    """Reload the saved point list; an empty or missing file leaves memory unchanged."""
    stored = load_json("points_config.json", [])
    if not stored:
        return jsonify({"ok": True, "points": []})
    gs.points_config = stored
    return jsonify({"ok": True, "points": stored})
# ========== 机型(姿态组)API ==========
@app.route("/api/models/list", methods=["GET"])
def api_models_list():
    """Return every configured machine model."""
    payload = {"models": gs.models_config}
    return jsonify(payload)
@app.route("/api/models/add", methods=["POST"])
def api_models_add():
    """Create a machine model with an empty pose list and persist the models."""
    # A JSON null body yields None; every field has a default.
    data = request.json or {}
    model_name = data.get("name", f"model_{len(gs.models_config) + 1}")
    # Ids are derived from the current second; add a suffix when that id is
    # already taken so two models created in the same second stay distinct.
    taken = {m.get("id") for m in gs.models_config}
    base_id = f"m_{int(time.time())}"
    model_id = base_id
    suffix = 1
    while model_id in taken:
        model_id = f"{base_id}_{suffix}"
        suffix += 1
    model = {
        "id": model_id,
        "name": model_name,
        "serial_prefix": data.get("serial_prefix", ""),  # QR-code model prefix
        "poses": [],  # pose list
        "description": data.get("description", ""),
        "notes": data.get("notes", ""),  # free-form notes
    }
    gs.models_config.append(model)
    save_json("models_config.json", gs.models_config)
    return jsonify({"ok": True, "model": model})
@app.route("/api/models/update/<model_id>", methods=["POST"])
def api_models_update(model_id):
    """Merge the posted fields into the model with id *model_id* and persist.

    Returns 400 when the body is not a JSON object, 404 when the model does
    not exist.
    """
    data = request.json
    # dict.update() needs a mapping; reject anything else as a client error.
    if not isinstance(data, dict):
        return jsonify({"ok": False, "error": "请求体必须是 JSON 对象"}), 400
    for model in gs.models_config:
        if model["id"] == model_id:
            model.update(data)
            save_json("models_config.json", gs.models_config)
            return jsonify({"ok": True})
    return jsonify({"ok": False, "error": "机型不存在"}), 404
@app.route("/api/models/delete/<model_id>", methods=["DELETE"])
def api_models_delete(model_id):
    """Remove every model whose id equals *model_id* and persist the list."""
    remaining = []
    for entry in gs.models_config:
        if entry["id"] != model_id:
            remaining.append(entry)
    gs.models_config = remaining
    save_json("models_config.json", gs.models_config)
    return jsonify({"ok": True})
@app.route("/api/models/poses/add", methods=["POST"])
def api_poses_add():
    """Append a pose to the model named by `model_id` and persist the models.

    Returns 400 when `model_id` is missing, 404 when the model does not exist.
    """
    # A JSON null body yields None; treat it as an empty request so the
    # missing-model_id check below answers with 400.
    data = request.json or {}
    model_id = data.get("model_id")
    if not model_id:
        return jsonify({"ok": False, "error": "缺少 model_id"}), 400
    for m in gs.models_config:
        if m["id"] == model_id:
            poses = m.setdefault("poses", [])
            # Pose ids are derived from the current second. Poses added in
            # the same second would share an id, and the update/delete
            # handlers would then act on the wrong or on several poses, so
            # add a suffix until the id is unused within this model.
            taken = {p.get("id") for p in poses}
            base_id = f"pose_{int(time.time())}"
            pose_id = base_id
            suffix = 1
            while pose_id in taken:
                pose_id = f"{base_id}_{suffix}"
                suffix += 1
            pose = {
                "id": pose_id,
                "name": data.get("name", f"姿态{len(poses) + 1}"),
                "photo_type": data.get("photo_type", "front"),  # front / back / nameplate
                "arm_angles": data.get("arm_angles", [0.0]*6),
                "speed": data.get("speed", 500),
                "description": data.get("description", ""),
            }
            poses.append(pose)
            save_json("models_config.json", gs.models_config)
            return jsonify({"ok": True, "pose": pose})
    return jsonify({"ok": False, "error": "机型不存在"}), 404
@app.route("/api/models/<model_id>/poses", methods=["GET"])
def api_model_poses_get(model_id):
    """Return the pose list of model *model_id*, or an empty list if it is unknown."""
    poses = []
    for model in gs.models_config:
        if model["id"] == model_id:
            poses = model.get("poses", [])
            break
    return jsonify({"poses": poses})
@app.route("/api/models/<model_id>/poses/<pose_id>", methods=["PUT"])
def api_model_poses_update(model_id, pose_id):
    """Merge the posted fields into pose *pose_id* of model *model_id* and persist.

    Returns 400 when the body is not a JSON object, 404 when the model or
    the pose does not exist.
    """
    data = request.json
    # dict.update() needs a mapping; reject anything else as a client error.
    if not isinstance(data, dict):
        return jsonify({"ok": False, "error": "请求体必须是 JSON 对象"}), 400
    for m in gs.models_config:
        if m["id"] == model_id:
            for pose in m.get("poses", []):
                if pose["id"] == pose_id:
                    pose.update(data)
                    save_json("models_config.json", gs.models_config)
                    return jsonify({"ok": True})
    return jsonify({"ok": False}), 404
@app.route("/api/models/<model_id>/poses/<pose_id>", methods=["DELETE"])
def api_model_poses_delete(model_id, pose_id):
    """Remove pose *pose_id* from model *model_id*; 404 when the model is unknown."""
    for model in gs.models_config:
        if model["id"] != model_id:
            continue
        kept = []
        for pose in model.get("poses", []):
            if pose["id"] != pose_id:
                kept.append(pose)
        model["poses"] = kept
        save_json("models_config.json", gs.models_config)
        return jsonify({"ok": True})
    return jsonify({"ok": False}), 404
# ========== Mission configuration API (M×N grid) ==========
@app.route("/api/mission/position", methods=["GET"])
def api_mission_position():
    """Return the AGV's current position and battery reading (used when setting points)."""
    agv = gs.agv_controller
    if not agv or not agv.is_connected():
        return jsonify({"ok": False, "error": "AGV 未连接"}), 400
    position = agv.get_position()
    if not position:
        return jsonify({"ok": False, "error": "AGV 未发布位置数据(/odom 无数据),请检查 AGV 传感器是否正常"}), 400
    battery_level = agv.get_battery()
    return jsonify({"ok": True, "position": position, "battery": battery_level})
@app.route("/api/mission/init_pose", methods=["POST"])
def api_mission_init_pose():
    """Set the AMCL initial pose to (0,0,0) without needing RViz.

    Writes a YAML message and a small bash script under /tmp, then runs the
    script, which sources the ROS 2 environment and publishes the message
    once on /initialpose. Returns 500 only when an exception is raised.
    """
    try:
        script = "/tmp/ros2_init_pose.sh"
        # NOTE(review): the leading whitespace inside these literals looks
        # collapsed to a single space in the reviewed copy - confirm that the
        # nesting matches the geometry_msgs/PoseWithCovarianceStamped layout.
        yaml_content = (
            "pose:\n"
            " header:\n"
            " stamp:\n"
            " sec: 0\n"
            " nanosec: 0\n"
            " frame_id: map\n"
            " pose:\n"
            " position:\n"
            " x: 0.0\n"
            " y: 0.0\n"
            " z: 0.0\n"
            " covariance: [0.25, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.25, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]\n"
            " orientation:\n"
            " x: 0.0\n"
            " y: 0.0\n"
            " z: 0.0\n"
            " w: 1.0\n"
        )
        # The message file name carries the server's PID.
        goal_file = "/tmp/nav2_goal_{}.yaml".format(os.getpid())
        with open(goal_file, "w") as f:
            f.write(yaml_content)
        # Script: select ROS domain 1, source both ROS setups, publish once.
        lines = [
            "#!/bin/bash",
            "export ROS_DOMAIN_ID=1",
            "source /opt/ros/humble/setup.bash",
            "source /home/elephant/agv_pro_ros2/install/setup.bash",
            f'ros2 topic pub --once /initialpose geometry_msgs/PoseWithCovarianceStamped "$(cat {goal_file})"',
        ]
        with open(script, "w") as f:
            f.write("\n".join(lines) + "\n")
        os.chmod(script, 0o755)
        # Run with a 12-second timeout; a timeout raises and is reported as 500 below.
        result = subprocess.run(
            [script],
            capture_output=True, text=True, timeout=12,
            env={**os.environ, "ROS_DOMAIN_ID": "1"}
        )
        # NOTE(review): the return code is only logged; a non-zero exit still
        # produces the success response below - confirm this is intended.
        logger.info(f"init_pose: rc={result.returncode}, stdout={result.stdout[:200]}, stderr={result.stderr[:200]}")
        return jsonify({"ok": True, "message": "初始位置已设为 (0,0,0)"})
    except Exception as e:
        logger.error(f"初始化位置失败: {e}")
        return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/mission/config", methods=["GET"])
def api_mission_config_get():
    """Return the mission configuration (grid size, occupancy matrix) and the machine list."""
    payload = {
        "ok": True,
        "config": gs.mission_config,
        "machines": gs.machines_config,
    }
    return jsonify(payload)
@app.route("/api/mission/config", methods=["POST"])
def api_mission_config_set():
    """Store grid size, occupancy matrix and the arm's initial pose, then persist.

    Saved positions that fall outside the new grid, or whose side is neither
    front nor back, are dropped.
    """
    payload = request.json
    rows = payload.get("rows", 2)
    cols = payload.get("cols", 3)
    grid = payload.get("grid", [])
    initial_pose = payload.get("arm_initial_pose", [0.0] * 6)

    config = gs.mission_config
    config["rows"] = rows
    config["cols"] = cols
    config["grid"] = grid
    config["arm_initial_pose"] = initial_pose

    # Keep only front/back positions with row <= rows and col < cols.
    surviving = []
    for position in config.get("positions", []):
        inside_rows = position.get("row", 0) <= rows
        if inside_rows and position.get("col", 0) < cols and position.get("side") in ("front", "back"):
            surviving.append(position)
    config["positions"] = surviving

    save_json("mission_config.json", gs.mission_config)
    return jsonify({"ok": True, "config": gs.mission_config})
@app.route("/api/mission/machines", methods=["GET"])
def api_mission_machines_list():
    """Return every machine configuration."""
    payload = {"ok": True, "machines": gs.machines_config}
    return jsonify(payload)
@app.route("/api/mission/machines/<machine_id>", methods=["GET"])
def api_mission_machine_get(machine_id):
    """Return the configuration of machine *machine_id*; 404 when it is unknown."""
    found = next((m for m in gs.machines_config if m["id"] == machine_id), None)
    if found is None:
        return jsonify({"ok": False, "error": "机器不存在"}), 404
    return jsonify({"ok": True, "machine": found})
@app.route("/api/mission/machines", methods=["POST"])
def api_mission_machines_save():
    """Replace the whole machine list with the posted `machines` array and persist it."""
    body = request.json
    machines = body.get("machines", [])
    gs.machines_config = machines
    save_json("machines_config.json", machines)
    return jsonify({"ok": True, "count": len(machines)})
@app.route("/api/mission/machines/add", methods=["POST"])
def api_mission_machine_add():
    """Add one machine configuration and persist the list.

    Returns 400 when a machine with the same id, or at the same row/col,
    already exists.
    """
    # A JSON null body yields None; every field has a default.
    data = request.json or {}
    # Resolve row/col once, with the same defaults that get stored, so the
    # duplicate check compares like with like. Checking the raw request
    # values would compare None against a stored 0 whenever row/col are
    # omitted and let a second machine in at (0, 0).
    row = data.get("row", 0)
    col = data.get("col", 0)
    machine_id = data.get("id", f"m_{row}_{col}")
    # Reject duplicates by id or by grid cell.
    for m in gs.machines_config:
        if m["id"] == machine_id or (m.get("row") == row and m.get("col") == col):
            return jsonify({"ok": False, "error": "该位置已有机器"}), 400
    machine = {
        "id": machine_id,
        "row": row,
        "col": col,
        "front": data.get("front", {"coords": [0, 0, 0], "poses": []}),
        "back": data.get("back", {"coords": [0, 0, 0], "poses": []}),
        "qr": data.get("qr", {"coords": [0, 0, 0], "qr_value": "", "model_id": ""}),
    }
    gs.machines_config.append(machine)
    save_json("machines_config.json", gs.machines_config)
    return jsonify({"ok": True, "machine": machine})
@app.route("/api/mission/machines/<machine_id>", methods=["PUT"])
def api_mission_machine_update(machine_id):
    """Merge the posted fields (front/back points and poses) into one machine and persist.

    Returns 400 when the body is not a JSON object, 404 when the machine
    does not exist.
    """
    data = request.json
    # dict.update() needs a mapping; reject anything else as a client error.
    if not isinstance(data, dict):
        return jsonify({"ok": False, "error": "请求体必须是 JSON 对象"}), 400
    for machine in gs.machines_config:
        if machine["id"] == machine_id:
            machine.update(data)
            save_json("machines_config.json", gs.machines_config)
            return jsonify({"ok": True})
    return jsonify({"ok": False, "error": "机器不存在"}), 404
@app.route("/api/mission/machines/<machine_id>", methods=["DELETE"])
def api_mission_machine_delete(machine_id):
    """Remove every machine whose id equals *machine_id* and persist the list."""
    remaining = []
    for entry in gs.machines_config:
        if entry["id"] != machine_id:
            remaining.append(entry)
    gs.machines_config = remaining
    save_json("machines_config.json", gs.machines_config)
    return jsonify({"ok": True})
@app.route("/api/mission/qr_scan/<machine_id>", methods=["POST"])
def api_mission_qr_scan(machine_id):
    """Scan one QR code and attach its value to machine *machine_id*.

    The scanned value is also matched against each model's `serial_prefix`;
    the first model whose non-empty prefix starts the value is linked.
    """
    scanner = gs.qr_scanner
    if not scanner or not scanner._cap:
        return jsonify({"ok": False, "error": "AGV 摄像头未打开"}), 400

    scanned = scanner.scan_once()
    if not scanned:
        return jsonify({"ok": False, "error": "未检测到二维码"})

    machine = next((m for m in gs.machines_config if m["id"] == machine_id), None)
    if machine is None:
        return jsonify({"ok": False, "error": f"机器 {machine_id} 不存在"}), 404

    if "qr" not in machine:
        machine["qr"] = {"coords": [0, 0, 0], "qr_value": "", "model_id": ""}
    machine["qr"]["qr_value"] = scanned

    # Try to match a model through its serial_prefix.
    linked_model = None
    for candidate in gs.models_config:
        prefix = candidate.get("serial_prefix", "")
        if not prefix:
            continue
        if scanned.startswith(prefix):
            linked_model = candidate
            break
    if linked_model:
        machine["qr"]["model_id"] = linked_model["id"]

    save_json("machines_config.json", gs.machines_config)
    if linked_model:
        model_name = linked_model["name"]
    else:
        model_name = ""
    return jsonify({
        "ok": True,
        "qr_value": scanned,
        "model_id": machine["qr"].get("model_id", ""),
        "model_name": model_name
    })
@app.route("/api/mission/poses/<machine_id>/<side>", methods=["GET"])
def api_mission_poses_get(machine_id, side):
    """Return the pose list of one side (front | back) of machine *machine_id*.

    A side the machine does not have yields an empty list; an unknown
    machine yields 404.
    """
    for m in gs.machines_config:
        if m["id"] == machine_id:
            # `side` comes from the URL and may name a non-dict field such
            # as "id" or "row"; only a dict section can hold poses.
            section = m.get(side)
            poses = section.get("poses", []) if isinstance(section, dict) else []
            return jsonify({"ok": True, "poses": poses})
    return jsonify({"ok": False, "poses": []}), 404
@app.route("/api/mission/poses/<machine_id>/<side>", methods=["POST"])
def api_mission_poses_add(machine_id, side):
    """Append a pose to one side (front | back) of machine *machine_id* and persist.

    Returns 400 when the side is not front/back or the machine has no such
    section, 404 when the machine does not exist.
    """
    # A JSON null body yields None; every pose field has a default.
    data = request.json or {}
    for m in gs.machines_config:
        if m["id"] == machine_id:
            # `side` comes from the URL. Testing only `side in m` would also
            # accept keys such as "id", "row" or "qr", which hold no pose
            # list, so require front/back and a dict section.
            section = m.get(side)
            if side not in ("front", "back") or not isinstance(section, dict):
                return jsonify({"ok": False, "error": f"机器无此侧: {side}"}), 400
            poses = section.setdefault("poses", [])
            # Pose ids are derived from the current second; add a suffix
            # when that id is already used on this side so delete-by-id
            # stays unambiguous.
            taken = {p.get("id") for p in poses}
            base_id = f"pose_{int(time.time())}"
            pose_id = base_id
            suffix = 1
            while pose_id in taken:
                pose_id = f"{base_id}_{suffix}"
                suffix += 1
            pose = {
                "id": pose_id,
                "name": data.get("name", "姿态"),
                "arm_angles": data.get("arm_angles", [0.0]*6),
                "speed": data.get("speed", 500),
                "description": data.get("description", ""),
            }
            poses.append(pose)
            save_json("machines_config.json", gs.machines_config)
            return jsonify({"ok": True, "pose": pose})
    return jsonify({"ok": False, "error": "机器不存在"}), 404
@app.route("/api/mission/poses/<machine_id>/<side>/<pose_id>", methods=["DELETE"])
def api_mission_poses_delete(machine_id, side, pose_id):
    """Remove pose *pose_id* from one side (front | back) of machine *machine_id*.

    Returns 404 when the machine is unknown, the side is not front/back, or
    the machine has no such section.
    """
    for m in gs.machines_config:
        if m["id"] == machine_id:
            # `side` comes from the URL; keys such as "id" or "row" exist on
            # the machine but hold no pose list.
            section = m.get(side)
            if side not in ("front", "back") or not isinstance(section, dict):
                return jsonify({"ok": False}), 404
            section["poses"] = [p for p in section.get("poses", []) if p["id"] != pose_id]
            save_json("machines_config.json", gs.machines_config)
            return jsonify({"ok": True})
    return jsonify({"ok": False}), 404
@app.route("/api/mission/generate_sequence", methods=["GET"])
def api_mission_generate_sequence():
    """Build the serpentine shooting sequence from the grid and machine configuration.

    Point rows run from 0 to `rows` inclusive. Even point rows are walked
    left-to-right ("lr"), odd ones right-to-left ("rl"). At each point the
    back of the machine in the previous row comes before the front of the
    machine in the current row.
    """
    config = gs.mission_config
    rows = int(config.get("rows", 2))
    cols = int(config.get("cols", 3))
    grid = config.get("grid", [])
    machines = gs.machines_config

    def row_is_blank(row):
        # A row that is not a list counts as blank.
        return not any(row) if isinstance(row, list) else True

    # With no usable grid, derive the occupancy from the machines' row/col.
    grid_is_blank = not grid or all(row_is_blank(row) for row in grid)
    if grid_is_blank and machines:
        grid = [[False] * cols for _ in range(rows)]
        for machine in machines:
            r = int(machine.get("row", 0))
            c = int(machine.get("col", 0))
            if 0 <= r < rows and 0 <= c < cols:
                grid[r][c] = True

    def occupied(r, c):
        return r < len(grid) and c < len(grid[r]) and grid[r][c]

    def find_machine(r, c):
        return next(
            (m for m in machines if m.get("row") == r and m.get("col") == c),
            None,
        )

    sequence = []
    for point_row in range(rows + 1):
        right_to_left = point_row % 2 != 0
        direction = "rl" if right_to_left else "lr"
        columns = reversed(range(cols)) if right_to_left else range(cols)
        # (machine row, side, whether this point row can serve that side);
        # the back of the previous row is listed first.
        candidates = (
            (point_row - 1, "back", point_row > 0),
            (point_row, "front", point_row < rows),
        )
        for c in columns:
            for machine_row, side, applicable in candidates:
                if not applicable or not occupied(machine_row, c):
                    continue
                machine = find_machine(machine_row, c)
                if machine and machine.get(side):
                    sequence.append({
                        "machine_id": machine["id"],
                        "row": machine_row, "col": c,
                        "point_row": point_row,
                        "side": side,
                        "row_dir": direction
                    })
    return jsonify({"ok": True, "sequence": sequence})
# ========== 点位配置 API(独立于机器)==========
@app.route("/api/mission/positions", methods=["GET"])
def api_mission_positions_list():
    """Return every standalone position stored in the mission configuration."""
    payload = {"ok": True, "positions": gs.mission_config.get("positions", [])}
    return jsonify(payload)
@app.route("/api/mission/positions", methods=["POST"])
def api_mission_positions_save():
    """Create or update one position, identified by (row, col, side), and persist.

    Returns 400 when the body is not a JSON object or row/col are not
    integers.
    """
    data = request.json
    if not isinstance(data, dict):
        return jsonify({"ok": False, "error": "请求体必须是 JSON 对象"}), 400
    # Malformed row/col is a client error, not a server failure.
    try:
        row = int(data.get("row", 0))
        col = int(data.get("col", 0))
    except (TypeError, ValueError):
        return jsonify({"ok": False, "error": "row/col 必须为整数"}), 400
    side = data.get("side", "front")  # "front" | "back"
    entry = {
        "row": row,
        "col": col,
        "side": side,
        "coords": data.get("coords", [0, 0, 0]),
        "poses": data.get("poses", []),
    }
    positions = gs.mission_config.setdefault("positions", [])
    # Replace the existing entry for this position, or append a new one.
    for i, p in enumerate(positions):
        if int(p.get("row", 0)) == row and int(p.get("col", 0)) == col and p.get("side") == side:
            positions[i] = entry
            break
    else:
        positions.append(entry)
    save_json("mission_config.json", gs.mission_config)
    return jsonify({"ok": True, "position": entry})
# ========== 机械臂控制 API ==========
@app.route("/api/arm/get_angles", methods=["GET"])
def api_arm_get_angles():
    """Return the arm's current joint angles; 400 when the arm is not connected."""
    arm = gs.arm_client
    if not arm:
        return jsonify({"ok": False, "error": "未连接机械臂"}), 400
    succeeded, joint_angles = arm.get_angles()
    return jsonify({"ok": succeeded, "angles": joint_angles})
@app.route("/api/arm/set_angles", methods=["POST"])
def api_arm_set_angles():
    """Move all six joints to the posted angles at the posted speed (default 500).

    Returns 400 when `angles` is not a list of six values, when an angle or
    the speed cannot be converted to a number, or when the arm is not
    connected.
    """
    payload = request.json or {}
    raw_angles = payload.get("angles", [])
    raw_speed = payload.get("speed", 500)

    has_six_joints = isinstance(raw_angles, list) and len(raw_angles) == 6
    if not has_six_joints:
        return jsonify({"ok": False, "error": "角度数据必须包含 6 个关节值"}), 400

    try:
        joint_values = list(map(float, raw_angles))
        speed_value = int(raw_speed)
    except (TypeError, ValueError):
        return jsonify({"ok": False, "error": "角度或速度格式错误"}), 400

    arm = gs.arm_client
    if not arm:
        return jsonify({"ok": False, "error": "未连接机械臂"}), 400

    succeeded = arm.set_angles(joint_values, speed_value)
    if succeeded:
        failure_reason = None
    else:
        failure_reason = "机械臂执行失败"
    return jsonify({"ok": succeeded, "error": failure_reason})
@app.route("/api/arm/set_angle", methods=["POST"])
def api_arm_set_angle():
    """Move one joint (default "J1") to the posted angle (default 0.0) at the posted speed."""
    payload = request.json
    joint_name = payload.get("joint", "J1")
    target_angle = payload.get("angle", 0.0)
    move_speed = payload.get("speed", 500)
    arm = gs.arm_client
    if not arm:
        return jsonify({"ok": False}), 400
    succeeded = arm.set_angle(joint_name, target_angle, move_speed)
    return jsonify({"ok": succeeded})
@app.route("/api/arm/jog", methods=["POST"])
def api_arm_jog():
    """Jog one joint continuously (used by the arrow keys)."""
    payload = request.json
    joint_name = payload.get("joint", "J1")
    jog_direction = payload.get("direction", 0)  # -1 / 0 / 1
    move_speed = payload.get("speed", 500)
    arm = gs.arm_client
    if not arm:
        return jsonify({"ok": False}), 400
    succeeded = arm.jog_angle(joint_name, jog_direction, move_speed)
    return jsonify({"ok": succeeded})
@app.route("/api/arm/get_coords", methods=["GET"])
def api_arm_get_coords():
    """Return the arm's current coordinates; 400 when the arm is not connected."""
    arm = gs.arm_client
    if not arm:
        return jsonify({"ok": False}), 400
    succeeded, current_coords = arm.get_coords()
    return jsonify({"ok": succeeded, "coords": current_coords})
@app.route("/api/arm/state_check", methods=["GET"])
def api_arm_state_check():
    """Report the arm's state check; `running` is the negation of `ok`."""
    arm = gs.arm_client
    if not arm:
        return jsonify({"ok": False}), 400
    check_passed = arm.state_check()
    return jsonify({"ok": check_passed, "running": not check_passed})
@app.route("/api/arm/power_on", methods=["POST"])
def api_arm_power_on():
    """Send power_on to the arm; 400 when the arm is not connected."""
    arm = gs.arm_client
    if not arm:
        return jsonify({"ok": False}), 400
    succeeded = arm.power_on()
    return jsonify({"ok": succeeded})
@app.route("/api/arm/state_on", methods=["POST"])
def api_arm_state_on():
    """Call ``state_on()`` on the arm client and report its result.

    Responds with HTTP 400 when no arm client is set.
    """
    arm = gs.arm_client
    if arm:
        outcome = arm.state_on()
        return jsonify({"ok": outcome})
    return jsonify({"ok": False}), 400
@app.route("/api/arm/state_off", methods=["POST"])
def api_arm_state_off():
    """Call ``state_off()`` on the arm client and report its result.

    Responds with HTTP 400 when no arm client is set.
    """
    arm = gs.arm_client
    if arm:
        outcome = arm.state_off()
        return jsonify({"ok": outcome})
    return jsonify({"ok": False}), 400
# ========== 摄像头预览 API ==========
@app.route("/api/camera/preview")
def api_camera_preview():
    """Stream the AGV camera as MJPEG (``multipart/x-mixed-replace``).

    Responds with HTTP 400 when the scanner or its capture handle is not
    available. The stream ends as soon as ``read_frame()`` returns ``None``;
    frames that fail JPEG encoding are skipped.
    """
    if not gs.qr_scanner or not gs.qr_scanner._cap:
        return "camera not opened", 400

    def gen():
        # Import once per stream instead of on every frame iteration.
        import cv2

        while True:
            frame = gs.qr_scanner.read_frame()
            if frame is None:
                break
            # Encode the frame as JPEG and emit it as one multipart part.
            ret, buf = cv2.imencode(".jpg", frame)
            if ret:
                yield (b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" +
                       buf.tobytes() + b"\r\n")

    return Response(gen(), mimetype="multipart/x-mixed-replace; boundary=frame")
@app.route("/api/camera/refresh")
def api_camera_refresh():
    """Return a single JPEG frame from the AGV camera (polling mode).

    Responds with HTTP 400 when the camera is not open or no frame could be
    read, and HTTP 500 when JPEG encoding fails.
    """
    scanner = gs.qr_scanner
    if not scanner or not scanner._cap:
        return "camera not opened", 400

    import cv2

    image = scanner.read_frame()
    if image is None:
        return "", 400

    encoded_ok, jpeg = cv2.imencode(".jpg", image)
    if not encoded_ok:
        return "encode failed", 500
    return Response(jpeg.tobytes(), mimetype="image/jpeg")
@app.route("/api/camera/capture")
def api_camera_capture():
    """Capture one frame from the AGV camera and save it as a JPEG file.

    The file is written to ``<DATA_DIR>/photos/capture_<unix seconds>.jpg``
    (the directory is created when missing). Because the name only has
    one-second resolution, two captures within the same second share a path.

    Returns ``{"ok": True, "path": ...}`` on success; HTTP 400 when the
    camera is not open or no frame could be read; HTTP 500 when the image
    could not be written.
    """
    if not gs.qr_scanner or not gs.qr_scanner._cap:
        return jsonify({"ok": False, "error": "摄像头未打开"}), 400
    import cv2
    frame = gs.qr_scanner.read_frame()
    if frame is None:
        return jsonify({"ok": False, "error": "读取帧失败"}), 400
    photo_dir = os.path.join(DATA_DIR, "photos")
    os.makedirs(photo_dir, exist_ok=True)
    photo_path = os.path.join(photo_dir, f"capture_{int(time.time())}.jpg")
    # cv2.imwrite reports failure through its boolean return value rather
    # than an exception, so check it before claiming success.
    if not cv2.imwrite(photo_path, frame):
        return jsonify({"ok": False, "error": "保存照片失败"}), 500
    return jsonify({"ok": True, "path": photo_path})
@app.route("/api/camera/arm_refresh")
def api_arm_camera_refresh():
    """Fetch one JPEG from the arm camera with a plain HTTP GET.

    The ``snapshot_url`` entry of ARM_CAMERA_CONFIG is used, falling back to
    ``url``. The image is returned with cache-disabling headers; any failure
    (non-200 status, empty body, or exception) yields an empty HTTP 404.
    """
    import requests

    no_cache = {
        "Cache-Control": "no-cache, no-store, must-revalidate, max-age=0",
        "Pragma": "no-cache",
        "Expires": "0",
    }
    try:
        snapshot_url = ARM_CAMERA_CONFIG.get("snapshot_url", ARM_CAMERA_CONFIG["url"])
        r = requests.get(snapshot_url, timeout=8)
        if r.status_code != 200 or not r.content:
            return "", 404
        resp = Response(r.content, mimetype="image/jpeg")
        for header_name, header_value in no_cache.items():
            resp.headers[header_name] = header_value
        return resp
    except Exception as ex:
        logger.info(f"arm_refresh 不可用: {ex}")
        return "", 404
@app.route("/api/camera/arm_preview")
def api_arm_camera_preview():
    """Proxy the arm camera's MJPEG stream so the page can preview it live.

    Opens a streaming GET to ``ARM_CAMERA_CONFIG["url"]`` and relays the body
    in 8 KiB chunks. The upstream connection is closed when the relay
    generator finishes or is closed. Any failure (non-200 status or
    exception while connecting) yields an empty HTTP 404.
    """
    import requests
    try:
        upstream = requests.get(
            ARM_CAMERA_CONFIG["url"],
            stream=True,
            timeout=(3, 10),
        )
        if upstream.status_code != 200:
            upstream.close()
            return "", 404

        # The bytes are relayed verbatim, so the boundary announced to the
        # browser must be the one the upstream actually uses. Forward the
        # upstream Content-Type when it declares a boundary; otherwise keep
        # the previous hard-coded value.
        upstream_type = upstream.headers.get("Content-Type") or ""
        if "boundary=" in upstream_type.lower():
            content_type = upstream_type
        else:
            content_type = "multipart/x-mixed-replace; boundary=frame"

        def generate():
            try:
                for chunk in upstream.iter_content(chunk_size=8192):
                    if chunk:
                        yield chunk
            finally:
                upstream.close()

        return Response(
            generate(),
            content_type=content_type,
            headers={"Cache-Control": "no-cache, no-store, must-revalidate, max-age=0"},
        )
    except Exception as ex:
        logger.info(f"arm_preview 不可用: {ex}")
        return "", 404
@app.route("/api/camera/qr_scan", methods=["GET"])
def api_qr_scan():
    """Run one QR scan and return its result.

    ``ok`` is the truthiness of the scan result, which is returned as
    ``data``. Responds with HTTP 400 when no scanner is set.
    """
    scanner = gs.qr_scanner
    if scanner:
        decoded = scanner.scan_once()
        return jsonify({"ok": True if decoded else False, "data": decoded})
    return jsonify({"ok": False}), 400
# ========== AGV 移动控制 API ==========
@app.route("/api/agv/move", methods=["POST"])
def api_agv_move():
    """Drive the AGV in the requested direction, or stop it.

    Body fields (all optional): ``direction`` (one of ``forward``,
    ``backward``, ``left``, ``right``, ``left_lateral``, ``right_lateral``,
    ``stop``; default ``stop``) and ``speed`` (default
    ``AGV_CONFIG["move_speed"]``, or 1.0 when that is missing).

    Returns ``{"ok": True}`` on success; HTTP 400 when the AGV is not
    connected or the direction is unknown; HTTP 500 with the exception text
    when the controller call raises.
    """
    # request.json can be None (e.g. a JSON ``null`` body); fall back to an
    # empty dict so the request is treated as "stop" instead of crashing.
    data = request.json or {}
    direction = data.get("direction", "stop")
    speed = data.get("speed", AGV_CONFIG.get("move_speed", 1.0))
    if not gs.agv_controller or not gs.agv_controller.is_connected():
        return jsonify({"ok": False, "error": "AGV 未连接"}), 400

    # Direction -> controller method name. Names are resolved lazily so that
    # only the method actually requested has to exist on the controller.
    motions = {
        "forward": "move_forward",
        "backward": "move_backward",
        "left": "turn_left",
        "right": "turn_right",
        "left_lateral": "move_left_lateral",
        "right_lateral": "move_right_lateral",
    }
    try:
        if direction == "stop":
            gs.agv_controller.stop()
        else:
            # Guard against unhashable JSON values (lists/dicts) as direction.
            method_name = motions.get(direction) if isinstance(direction, str) else None
            if method_name is None:
                return jsonify({"ok": False, "error": "未知方向"}), 400
            getattr(gs.agv_controller, method_name)(speed)
        return jsonify({"ok": True})
    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/agv/position", methods=["GET"])
def api_agv_position():
    """Return the AGV's current position together with its battery reading.

    Responds with HTTP 400 when the AGV is not connected or the controller
    has no position to report.
    """
    controller = gs.agv_controller
    if not controller or not controller.is_connected():
        return jsonify({"ok": False, "error": "AGV 未连接"}), 400

    current = controller.get_position()
    if current:
        level = controller.get_battery()
        return jsonify({"ok": True, "position": current, "battery": level})
    return jsonify({"ok": False, "error": "AGV 未发布位置数据(/odom 无数据),请检查 AGV 传感器是否正常"}), 400
@app.route("/api/agv/stop", methods=["POST"])
def api_agv_stop():
    """Stop the AGV immediately.

    Always answers ``{"ok": True}``, including when no controller is set.
    """
    controller = gs.agv_controller
    if controller:
        controller.stop()
    return jsonify({"ok": True})
@app.route("/api/agv/reset", methods=["POST"])
def api_agv_reset():
    """Reset after a collision: stop motion, then re-check the ROS2 link.

    Stops the controller, waits half a second and calls ``connect()``.
    Responds with HTTP 400 when no controller is set and HTTP 500 when the
    connection check fails or an exception is raised.
    """
    controller = gs.agv_controller
    if not controller:
        return jsonify({"ok": False, "error": "AGV 控制器未初始化"}), 400
    try:
        controller.stop()
        time.sleep(0.5)
        reconnected = controller.connect()
        if not reconnected:
            return jsonify({"ok": False, "error": "AGV 已停止,但 ROS2 连接检查失败"}), 500
        return jsonify({"ok": True, "message": "复位成功,AGV 已停止并重新连接"})
    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500
# ========== 任务执行 API ==========
@app.route("/api/mission/start", methods=["POST"])
def api_mission_start():
    """Start a mission (V3: M×N grid, serpentine path) in a background thread.

    Body fields (all optional): ``single_step`` (default False) plus the step
    switches ``arm_init``, ``agv_move``, ``qr_scan``, ``front_photo``,
    ``back_photo`` (default True) and the numeric ``agv_speed`` (default 1.0)
    and ``arm_speed`` (default 1000).

    Returns ``{"ok": True, "single_step": ...}`` once the worker thread has
    been started. Responds with HTTP 400 when a speed value is not numeric or
    when an existing executor reports a status other than ``idle`` or
    ``completed``. Connection and execution outcomes are reported
    asynchronously through ``gs.mission_report`` and ``gs.state``.
    """
    data = request.json or {}
    single_step = bool(data.get("single_step", False))

    # Reject malformed numbers with a 400 instead of an unhandled 500.
    try:
        agv_speed = float(data.get("agv_speed", 1.0))
        arm_speed = int(data.get("arm_speed", 1000))
    except (TypeError, ValueError):
        return jsonify({"ok": False, "error": "速度参数格式错误"}), 400

    # Switches controlling which mission steps are executed.
    options = {
        "arm_init": bool(data.get("arm_init", True)),
        "agv_move": bool(data.get("agv_move", True)),
        "qr_scan": bool(data.get("qr_scan", True)),
        "front_photo": bool(data.get("front_photo", True)),
        "back_photo": bool(data.get("back_photo", True)),
        "agv_speed": agv_speed,
        "arm_speed": arm_speed,
    }
    logger.info("[Mission] options: %s", options)

    existing = getattr(MissionExecutorV3, "_instance", None)
    if existing and existing.report.get("status") not in ("idle", "completed"):
        return jsonify({"ok": False, "error": "任务已在运行中"}), 400
    MissionExecutorV3._instance = None

    def run(single_step):
        """Worker: connect the devices, run the mission, always disconnect."""
        config = {
            "device": AGV_CONFIG.get("device", "/dev/agvpro_controller"),
            "baudrate": AGV_CONFIG.get("baudrate", 1000000),
            "arm": ARM_CONFIG,
        }
        executor = MissionExecutorV3(config)
        try:
            conn = executor.connect_all()
            # The arm connection is optional when QR scanning and both photo
            # steps are switched off.
            need_arm = options["qr_scan"] or options["front_photo"] or options["back_photo"]
            if not conn.get("agv"):
                gs.mission_report = {"error": "AGV 连接失败", "details": conn}
                gs.state = State.IDLE
                return
            if need_arm and not conn.get("arm"):
                gs.mission_report = {"error": "机械臂连接失败,请先连接机械臂", "details": conn}
                gs.state = State.IDLE
                return
            gs.state = State.RUNNING
            # Both configs may be a bare list or a dict wrapping the list.
            machines_list = gs.machines_config if isinstance(gs.machines_config, list) else gs.machines_config.get("machines", [])
            models_list = gs.models_config if isinstance(gs.models_config, list) else gs.models_config.get("models", [])
            report = executor.execute_mission(
                mission_config=gs.mission_config,
                machines=machines_list,
                qr_configs=gs.qr_config,
                models=models_list,
                single_step=single_step,
                options=options,
            )
            gs.mission_report = report
            gs.state = State.IDLE if report.get("error") is None else State.PAUSED
        except Exception as exc:
            # Without this handler the thread would die silently and leave
            # gs.state stuck at RUNNING with no report for the UI.
            logger.exception("任务执行异常")
            gs.mission_report = {"error": f"任务执行异常: {exc}"}
            gs.state = State.IDLE
        finally:
            executor.disconnect_all()

    thread = threading.Thread(target=run, args=(single_step,), daemon=True)
    thread.start()
    return jsonify({"ok": True, "single_step": single_step})
@app.route("/api/mission/stop", methods=["POST"])
def api_mission_stop():
    """Stop the mission.

    Calls ``stop()`` on the executor instance when one exists, then sets the
    global state to IDLE regardless.
    """
    executor = getattr(MissionExecutorV3, "_instance", None)
    if executor:
        executor.stop()
    gs.state = State.IDLE
    return jsonify({"ok": True})
@app.route("/api/mission/pause", methods=["POST"])
def api_mission_pause():
    """Pause the mission.

    Calls ``pause()`` on the executor instance when one exists, then sets the
    global state to PAUSED regardless.
    """
    executor = getattr(MissionExecutorV3, "_instance", None)
    if executor:
        executor.pause()
    gs.state = State.PAUSED
    return jsonify({"ok": True})
@app.route("/api/mission/resume", methods=["POST"])
def api_mission_resume():
    """Resume the mission.

    Calls ``resume()`` on the executor instance when one exists, then sets
    the global state to RUNNING regardless.
    """
    executor = getattr(MissionExecutorV3, "_instance", None)
    if executor:
        executor.resume()
    gs.state = State.RUNNING
    return jsonify({"ok": True})
@app.route("/api/mission/report", methods=["GET"])
def api_mission_report():
    """Return the stored mission report under the ``report`` key."""
    return jsonify(report=gs.mission_report)
@app.route("/api/mission/state", methods=["GET"])
def api_mission_state():
    """Return the mission state plus a task list pre-generated from config.

    The response starts from ``gs.state`` and, when an executor instance
    exists, is merged with ``executor.get_status()``. Grid data (``rows``,
    ``cols``, ``grid``, ``point_status``, ``machine_status``, ``tasks``) is
    rebuilt on every call from ``data/mission_config.json`` and
    ``data/machines_config.json`` next to this file; when that fails, empty
    defaults are returned instead. Non-empty ``point_status`` and
    ``machine_status`` from the executor report override the pre-generated
    ones.
    """
    result = {"state": gs.state}
    executor = getattr(MissionExecutorV3, "_instance", None)
    # Merge the executor's own status when an executor instance exists.
    if executor:
        result.update(executor.get_status())

    # Pre-generate the task list from the config files on disk.
    try:
        base = os.path.dirname(os.path.abspath(__file__))
        with open(os.path.join(base, "data", "mission_config.json")) as jf:
            mc = json.load(jf)
        machines = []
        try:
            with open(os.path.join(base, "data", "machines_config.json")) as jf:
                machines = json.load(jf)
        except (OSError, ValueError):
            # Missing or unparsable machines file: carry on without machines.
            pass
        # api_mission_start accepts the machines config either as a bare list
        # or as {"machines": [...]}; accept both shapes here as well.
        if isinstance(machines, dict):
            machines = machines.get("machines", [])

        rows = int(mc.get("rows", 1))
        cols = int(mc.get("cols", 1))
        grid = mc.get("grid", [])

        # When the grid is empty (or has no True cell), rebuild it from the
        # machines, keeping only those inside the rows x cols range.
        grid_is_blank = not grid or all(
            not any(rw) if isinstance(rw, list) else True for rw in grid
        )
        if grid_is_blank and machines:
            grid = [[False] * cols for _ in range(rows)]
            for m in machines:
                r = int(m.get("row", 0))
                c = int(m.get("col", 0))
                if 0 <= r < rows and 0 <= c < cols:
                    grid[r][c] = True

        def has_machine(r, c):
            """True when cell (r, c) lies inside the grid and is set."""
            return r < len(grid) and c < len(grid[r]) and grid[r][c]

        # Serpentine path: even rows left-to-right, odd rows right-to-left.
        path = []
        for r in range(rows):
            col_order = range(cols) if r % 2 == 0 else range(cols - 1, -1, -1)
            path.extend((r, c) for c in col_order if has_machine(r, c))

        # Grid-level task data.
        result["rows"] = rows
        result["cols"] = cols
        result["grid"] = grid
        # Note: point_status spans rows + 1 point rows.
        result["point_status"] = {
            f"{pr}_{c}": "pending" for pr in range(rows + 1) for c in range(cols)
        }
        result["machine_status"] = {}
        for r in range(rows):
            for c in range(cols):
                if has_machine(r, c):
                    result["machine_status"][f"{r}_{c}"] = {
                        "has_machine": True,
                        "qr": "pending", "qr_val": None,
                        "front": "pending", "front_cnt": 0,
                        "back": "pending", "back_cnt": 0,
                        "status": "pending", "step": "等待",
                    }

        # Legacy ``tasks`` list, kept for compatibility.
        result["tasks"] = [
            {
                "row": r, "col": c,
                "machine_id": "m_{}_{}".format(r, c),
                "label": "{}-{}".format(r + 1, c + 1),
                "status": "pending",
                "step": "等待",
                "qr_value": None,
                "photos_front": 0,
                "photos_back": 0,
            }
            for (r, c) in path
        ]
    except Exception:
        # This endpoint must still answer when the config is missing or
        # malformed; record the reason at debug level and use empty defaults.
        logger.debug("mission state: failed to pre-generate task list", exc_info=True)
        result["rows"] = 1
        result["cols"] = 1
        result["grid"] = []
        result["point_status"] = {}
        result["machine_status"] = {}
        result["tasks"] = []

    # Arm camera status.
    result["arm_camera_opened"] = gs.arm_camera_opened

    # Error-dialog state and live grid status.
    if executor:
        st = executor.get_status()
        result["error_msg"] = st.get("error", "")
        result["waiting_step"] = (st.get("status") == "waiting_step")
        result["waiting_error"] = (st.get("status") == "waiting_error")
        # Live point/machine status from the executor report.
        rpt = executor.report
        if rpt.get("point_status"):
            result["point_status"] = rpt["point_status"]
        if rpt.get("machine_status"):
            result["machine_status"] = rpt["machine_status"]
    else:
        result["error_msg"] = ""
        result["waiting_step"] = False
        result["waiting_error"] = False
    return jsonify(result)
@app.route("/api/mission/log", methods=["GET"])
def api_mission_log():
    """Return the live log.

    Uses the executor's ``get_logs()`` when an executor instance exists;
    otherwise falls back to the stored mission report, or an empty log.
    """
    executor = getattr(MissionExecutorV3, "_instance", None)
    if not executor:
        fallback = gs.mission_report or {"log": []}
        return jsonify(fallback)
    return jsonify(executor.get_logs())
@app.route("/api/mission/manual-qr", methods=["POST"])
def api_mission_manual_qr():
    """Accept a manually entered QR value and hand it to the executor.

    Body field: ``qr`` (string). Surrounding whitespace is stripped before
    the value is passed to ``set_manual_qr``.

    Responds with HTTP 400 when ``qr`` is missing, empty, not a string, or
    when there is no executor instance.
    """
    data = request.json or {}
    raw_qr = data.get("qr", "")
    # A JSON null or number has no .strip(); treat any non-string as invalid
    # input (400) rather than letting it raise AttributeError (500).
    qr = raw_qr.strip() if isinstance(raw_qr, str) else ""
    if not qr:
        return jsonify({"ok": False, "error": "二维码不能为空"}), 400
    if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance:
        MissionExecutorV3._instance.set_manual_qr(qr)
        return jsonify({"ok": True})
    return jsonify({"ok": False, "error": "没有运行中的任务"}), 400
# ========== 错误弹窗 API ==========
@app.route("/api/mission/error-skip", methods=["POST"])
def api_mission_error_skip():
    """User decision for the error dialog: skip the current error.

    Responds with HTTP 400 when there is no executor instance.
    """
    executor = getattr(MissionExecutorV3, "_instance", None)
    if not executor:
        return jsonify({"ok": False, "error": "没有运行中的任务"}), 400
    executor.set_error_choice("skip")
    return jsonify({"ok": True, "choice": "skip"})
@app.route("/api/mission/error-abort", methods=["POST"])
def api_mission_error_abort():
    """User decision for the error dialog: abort the mission.

    Responds with HTTP 400 when there is no executor instance.
    """
    executor = getattr(MissionExecutorV3, "_instance", None)
    if not executor:
        return jsonify({"ok": False, "error": "没有运行中的任务"}), 400
    executor.set_error_choice("abort")
    return jsonify({"ok": True, "choice": "abort"})
# ========== 单步执行 API ==========
@app.route("/api/mission/singlestep/confirm", methods=["POST"])
def api_mission_singlestep_confirm():
    """Single-step mode: confirm the current step and continue.

    Responds with HTTP 400 when there is no executor instance.
    """
    executor = getattr(MissionExecutorV3, "_instance", None)
    if not executor:
        return jsonify({"ok": False, "error": "没有运行中的任务"}), 400
    executor.set_step_choice("confirm")
    return jsonify({"ok": True, "choice": "confirm"})
@app.route("/api/mission/singlestep/retry", methods=["POST"])
def api_mission_singlestep_retry():
    """Single-step mode: retry the current step.

    Responds with HTTP 400 when there is no executor instance.
    """
    executor = getattr(MissionExecutorV3, "_instance", None)
    if not executor:
        return jsonify({"ok": False, "error": "没有运行中的任务"}), 400
    executor.set_step_choice("retry")
    return jsonify({"ok": True, "choice": "retry"})
@app.route("/api/mission/singlestep/abort", methods=["POST"])
def api_mission_singlestep_abort():
    """Single-step mode: abort the mission.

    Responds with HTTP 400 when there is no executor instance.
    """
    executor = getattr(MissionExecutorV3, "_instance", None)
    if not executor:
        return jsonify({"ok": False, "error": "没有运行中的任务"}), 400
    executor.set_step_choice("abort")
    return jsonify({"ok": True, "choice": "abort"})
# ========== 二维码配置 API ==========
@app.route("/api/qr/configs", methods=["GET"])
def api_qr_configs_get():
    """Return every QR configuration entry held in ``gs.qr_config``."""
    return jsonify(ok=True, configs=gs.qr_config)
@app.route("/api/qr/configs", methods=["POST"])
def api_qr_configs_add():
    """Append a new QR configuration entry and persist the list.

    Optional body fields: ``name`` and ``joint_angles``. The entry id is
    derived from the current time in milliseconds; ``qr_value`` and
    ``model_id`` start out empty. The list is saved to ``qr_config.json``.
    """
    body = request.json or {}
    created = dict(
        id=f"qr_{int(time.time() * 1000)}",
        name=body.get("name", f"二维码{len(gs.qr_config) + 1}"),
        joint_angles=body.get("joint_angles", [0.0] * 6),
        qr_value="",
        model_id="",
    )
    gs.qr_config.append(created)
    save_json("qr_config.json", gs.qr_config)
    return jsonify({"ok": True, "entry": created})
@app.route("/api/qr/configs/<qr_id>", methods=["PUT"])
def api_qr_configs_update(qr_id):
    """Update the QR configuration entry whose id equals ``qr_id``.

    Only ``name``, ``joint_angles``, ``qr_value`` and ``model_id`` are
    copied from the body, and only when present. The list is then saved to
    ``qr_config.json``. Responds with HTTP 404 when no entry matches.
    """
    body = request.json or {}
    target = next((item for item in gs.qr_config if item["id"] == qr_id), None)
    if target is None:
        return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404

    for field in ("name", "joint_angles", "qr_value", "model_id"):
        if field in body:
            target[field] = body[field]
    save_json("qr_config.json", gs.qr_config)
    return jsonify({"ok": True, "entry": target})
@app.route("/api/qr/configs/<qr_id>", methods=["DELETE"])
def api_qr_configs_delete(qr_id):
    """Delete the first QR configuration entry whose id equals ``qr_id``.

    The list is saved to ``qr_config.json`` after removal. Responds with
    HTTP 404 when no entry matches.
    """
    for position, item in enumerate(gs.qr_config):
        if item["id"] != qr_id:
            continue
        del gs.qr_config[position]
        save_json("qr_config.json", gs.qr_config)
        return jsonify({"ok": True})
    return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404
@app.route("/api/qr/configs/<qr_id>/read-angles", methods=["POST"])
def api_qr_read_angles(qr_id):
    """Read the arm's current joint angles and store them on a QR entry.

    The angles are read before the entry is looked up. Responds with HTTP
    400 when no arm client is set or the angles cannot be read, and HTTP 404
    when no entry has the id ``qr_id``.
    """
    arm = gs.arm_client
    if not arm:
        return jsonify({"ok": False, "error": "机械臂未连接"}), 400

    success, current = arm.get_angles()
    if not (success and current):
        return jsonify({"ok": False, "error": "无法读取机械臂角度"}), 400

    target = next((item for item in gs.qr_config if item["id"] == qr_id), None)
    if target is None:
        return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404

    target["joint_angles"] = list(current)
    save_json("qr_config.json", gs.qr_config)
    return jsonify({"ok": True, "joint_angles": target["joint_angles"]})
@app.route("/api/qr/scan/<qr_id>", methods=["POST"])
def api_qr_config_scan(qr_id):
    """Grab an arm-camera image, decode its QR code and store it on an entry.

    Up to three snapshot requests are made (``snapshot_url`` of
    ARM_CAMERA_CONFIG, falling back to ``url``). Decoding tries pyzbar first
    and falls back to OpenCV's ``QRCodeDetector``. On success the value is
    written to the entry with id ``qr_id``; the first model whose
    ``serial_prefix`` is a prefix of the value sets ``model_id``; the list is
    then saved to ``qr_config.json``.

    Responds with HTTP 400 when no image could be fetched or decoded or when
    an unexpected exception occurs, HTTP 404 when no entry matches, and a
    200 response with ``ok: False`` when no QR code is detected.
    """
    import requests
    try:
        snapshot_url = ARM_CAMERA_CONFIG.get("snapshot_url", ARM_CAMERA_CONFIG["url"])
        jpg_bytes = None
        # Try several times to obtain a usable frame.
        attempts = 3
        for attempt in range(attempts):
            try:
                r = requests.get(snapshot_url, timeout=8)
                if r.status_code == 200 and r.content:
                    jpg_bytes = r.content
                    break
            except requests.RequestException as e:
                # Best effort: a failed attempt is retried, not fatal.
                logger.debug(f"机械臂摄像头抓图失败: {e}")
            # Pause between attempts only; no point sleeping after the last.
            if attempt < attempts - 1:
                time.sleep(0.3)
        if jpg_bytes is None:
            return jsonify({"ok": False, "error": "无法连接机械臂摄像头"}), 400

        import cv2
        import numpy as np
        nparr = np.frombuffer(jpg_bytes, np.uint8)
        frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if frame is None:
            return jsonify({"ok": False, "error": "图像解码失败"}), 400

        result = None
        # Method 1: pyzbar, tried first. Any failure, including a missing
        # package, falls through to the OpenCV detector.
        try:
            from PIL import Image
            from pyzbar.pyzbar import decode as pyzbar_decode
            pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            codes = pyzbar_decode(pil_img)
            if codes and codes[0].data:
                result = codes[0].data.decode("utf-8").strip()
                logger.info(f"pyzbar 扫码成功: {result}")
        except Exception as e:
            logger.debug(f"pyzbar 扫码失败: {e}")

        # Method 2: OpenCV QRCodeDetector as the fallback.
        if not result:
            try:
                detector = cv2.QRCodeDetector()
                val, _, _ = detector.detectAndDecode(frame)
                if val and len(val.strip()) > 0:
                    result = val.strip()
                    logger.info(f"OpenCV 扫码成功: {result}")
            except Exception as e:
                logger.debug(f"OpenCV 扫码失败: {e}")

        if not result:
            return jsonify({"ok": False, "error": "未检测到二维码,请调整机械臂姿态或手动输入"})

        # api_mission_start accepts the models config either as a bare list
        # or as {"models": [...]}; accept both shapes here as well.
        models = gs.models_config if isinstance(gs.models_config, list) else gs.models_config.get("models", [])

        # Store the value on the matching configuration entry.
        for entry in gs.qr_config:
            if entry["id"] != qr_id:
                continue
            entry["qr_value"] = result
            matched_model = None
            for model in models:
                prefix = model.get("serial_prefix", "")
                if prefix and result.startswith(prefix):
                    matched_model = model
                    break
            if matched_model:
                entry["model_id"] = matched_model["id"]
            save_json("qr_config.json", gs.qr_config)
            return jsonify({
                "ok": True,
                "qr_value": result,
                "model_id": entry.get("model_id", ""),
                "model_name": matched_model["name"] if matched_model else ""
            })
        return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404
    except Exception as ex:
        logger.error(f"QR 扫描失败: {ex}")
        return jsonify({"ok": False, "error": f"扫描失败: {str(ex)}"}), 400
# ========== 静态资源 ==========
@app.route("/photos/<name>")
def photos(name):
    """Serve the file ``name`` from the ``photos`` directory under DATA_DIR."""
    photo_root = os.path.join(DATA_DIR, "photos")
    return send_from_directory(photo_root, name)
# ========== 启动 ==========
if __name__ == "__main__":
    # Startup banner.
    banner = "=" * 50
    for banner_line in (
        banner,
        "AGV 拍摄系统启动",
        f"监听 {SERVER_CONFIG['host']}:{SERVER_CONFIG['port']}",
        banner,
    ):
        logger.info(banner_line)

    # Open the AGV camera automatically at startup.
    gs.qr_scanner = QRScanner(CAMERA_CONFIG["device_index"])
    gs.camera_opened = gs.qr_scanner.open()
    logger.info(f"AGV 摄像头初始化: {'成功' if gs.camera_opened else '失败'}")

    # Probe the arm camera automatically at startup: it counts as available
    # when the stream URL answers with HTTP 200.
    try:
        import requests as _startup_req

        r = _startup_req.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=5)
        gs.arm_camera_opened = (r.status_code == 200)
        r.close()
        logger.info(f"机械臂摄像头检测: {'成功' if gs.arm_camera_opened else '失败'}")
    except Exception as _e:
        gs.arm_camera_opened = False
        logger.warning(f"机械臂摄像头检测失败: {_e}")

    # Start the Flask server with the configured host, port and debug flag.
    run_kwargs = {
        "host": SERVER_CONFIG["host"],
        "port": SERVER_CONFIG["port"],
        "debug": SERVER_CONFIG["debug"],
        "threaded": True,
    }
    app.run(**run_kwargs)