""" AGV 拍摄系统 - Flask 主程序 运行在 AGV 上,端口 5000 """ import os import json import time import logging import threading import subprocess from flask import Flask, render_template, jsonify, request, Response, send_from_directory from flask_cors import CORS from config import SERVER_CONFIG, ARM_CONFIG, AGV_CONFIG, UPLOAD_CONFIG, MAP_CONFIG, ARM_CAMERA_CONFIG, CAMERA_CONFIG, DATA_DIR, State from utils.arm_client import ArmClient from utils.agv_controller_ros2 import AGVController from utils.qr_scanner import QRScanner from utils.image_uploader import ImageUploader from utils.mission_executor import MissionExecutorV3 from utils.nav2_navigator import Nav2Navigator, Nav2Status # 配置日志 logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" ) logger = logging.getLogger("agv_app") app = Flask(__name__, template_folder="templates", static_folder="static", static_url_path="/static") app.config["SECRET_KEY"] = SERVER_CONFIG["secret_key"] CORS(app) # ========== 全局状态 ========== class GlobalState: def __init__(self): self.state = State.IDLE # setting / running / paused / idle self.mission_data = None # 当前任务配置 self.mission_report = None # 任务执行报告 self.arm_client = None # ArmClient 实例 self.agv_controller = None # AGVController 实例 self.qr_scanner = None # QRScanner 实例 self.camera_opened = False self.arm_camera_opened = False self.map_config = {} # 地图配置 self.points_config = [] # 点位配置 self.models_config = [] # 机型配置(姿态) self.mission_config = { # 任务配置(M×N网格) "rows": 2, "cols": 3, "grid": [], # M×N 布尔矩阵 "positions": [] # 独立点位配置 [{row, col, side, coords, poses}] } self.machines_config = [] # 机器配置(每台机器的正面/背面点位+姿态) self.qr_config = [] # 二维码配置(独立点位列表) self.navigator = None # Nav2Navigator 实例 self.error_msg = "" # 错误弹窗消息(waiting_error 状态时) self.lock = threading.Lock() def reset(self): with self.lock: self.state = State.IDLE self.mission_data = None self.mission_report = None gs = GlobalState() # ========== 辅助函数 ========== def get_data_path(name): return os.path.join(DATA_DIR, name) def save_json(name, data): with open(get_data_path(name), "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) def load_json(name, default=None): path = get_data_path(name) if os.path.exists(path): with open(path, "r", encoding="utf-8") as f: return json.load(f) return default if default is not None else {} # ========== 启动时加载持久化配置 ========== def load_persisted_config(): """Flask 启动时加载已保存的配置""" # 加载地图配置 map_cfg = load_json("map_config.json") if map_cfg and "map_yaml" in map_cfg: gs.map_config = map_cfg print(f"[启动] 加载地图配置: {map_cfg['map_yaml']}") # 加载点位配置 points_cfg = load_json("points_config.json", []) if points_cfg: gs.points_config = points_cfg print(f"[启动] 加载点位配置: {len(points_cfg)} 个点位") # 加载机型配置(姿态) models_cfg = load_json("models_config.json", []) if models_cfg: gs.models_config = models_cfg print(f"[启动] 加载机型配置: {len(models_cfg)} 个机型") # 加载任务配置(网格尺寸) mission_cfg = load_json("mission_config.json", {}) if mission_cfg: gs.mission_config = mission_cfg print(f"[启动] 加载任务配置: {mission_cfg.get('rows', 0)}行×{mission_cfg.get('cols', 0)}列") # 加载机器配置(正面/背面点位+姿态) machines_cfg = load_json("machines_config.json", []) if machines_cfg: gs.machines_config = machines_cfg print(f"[启动] 加载机器配置: {len(machines_cfg)} 台机器") # 加载二维码配置 qr_cfg = load_json("qr_config.json", []) if qr_cfg: gs.qr_config = qr_cfg print(f"[启动] 加载二维码配置: {len(qr_cfg)} 个点位") # 在 Flask 2.3+ 使用 @app.before_serving,兼容旧版用 before_first_request try: from flask import has_app_context # Flask 2.3+ 方式 with app.app_context(): load_persisted_config() # 启动时自动重连 AGV(异步,不阻塞 Flask 启动) import threading def _auto_reconnect(): time.sleep(2) # 等待 Flask 完全就绪 try: from utils.agv_controller_ros2 import AGVController gs.agv_controller = AGVController() if gs.agv_controller.connect(): print("[启动] AGV 自动连接成功") else: print("[启动] AGV 自动连接失败,请手动连接") except Exception as e: print(f"[启动] AGV 自动连接异常: {e}") threading.Thread(target=_auto_reconnect, daemon=True).start() except: # 兼容旧版 Flask @app.before_first_request def startup_load(): load_persisted_config() # ========== 页面路由 ========== @app.route("/") def index(): return render_template("index.html") @app.route("/setting") def setting_page(): return render_template("setting.html") @app.route("/running") def running_page(): return render_template("running.html") # ========== 系统状态 API ========== @app.route("/api/status") def api_status(): """获取系统整体状态""" with gs.lock: # 实际验证机械臂连接(尝试发送一个简单命令) arm_connected = False if gs.arm_client and gs.arm_client._sock: try: # 设置短超时尝试获取角度,验证连接是否有效 gs.arm_client._sock.settimeout(2) ok, _ = gs.arm_client.get_angles() arm_connected = ok except: arm_connected = False # 连接已断开,清理 socket if gs.arm_client: gs.arm_client._sock = None # 实际验证 AGV 连接 agv_connected = False if gs.agv_controller: agv_connected = gs.agv_controller.is_connected() return jsonify({ "state": gs.state, "agv_connected": agv_connected, "arm_connected": arm_connected, "camera_opened": gs.camera_opened, "arm_camera_opened": gs.arm_camera_opened, "map_loaded": bool(gs.map_config), "points_count": len(gs.points_config), "models_count": len(gs.models_config), "mission_rows": gs.mission_config.get("rows", 0), "mission_cols": gs.mission_config.get("cols", 0), "machines_count": len(gs.machines_config) }) @app.route("/api/system/connect", methods=["POST"]) def api_connect(): """连接 AGV 和机械臂""" results = {"agv": False, "arm": False, "camera": False, "errors": []} # 连接 AGV try: gs.agv_controller = AGVController() results["agv"] = gs.agv_controller.connect() except Exception as e: results["errors"].append(f"AGV: {e}") # 连接机械臂 try: gs.arm_client = ArmClient(ARM_CONFIG["host"], ARM_CONFIG["port"]) results["arm"] = gs.arm_client.connect() if results["arm"]: # 尝试上电并激活 gs.arm_client.power_on() time.sleep(0.5) gs.arm_client.state_on() except Exception as e: results["errors"].append(f"机械臂: {e}") # 打开摄像头 try: gs.qr_scanner = QRScanner(CAMERA_CONFIG["device_index"]) results["camera"] = gs.qr_scanner.open() gs.camera_opened = results["camera"] except Exception as e: results["errors"].append(f"摄像头: {e}") # 检查机械臂摄像头 try: import requests as _req r2 = _req.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=3) gs.arm_camera_opened = (r2.status_code == 200) r2.close() except: gs.arm_camera_opened = False all_ok = results["agv"] and results["arm"] and results["camera"] with gs.lock: if all_ok: gs.state = State.SETTING else: gs.state = State.IDLE return jsonify(results) @app.route("/api/system/disconnect", methods=["POST"]) def api_disconnect(): """断开所有连接""" if gs.arm_client: gs.arm_client.close() gs.arm_client = None if gs.agv_controller: gs.agv_controller.disconnect() gs.agv_controller = None if gs.qr_scanner: gs.qr_scanner.close() gs.qr_scanner = None gs.camera_opened = False gs.state = State.IDLE return jsonify({"ok": True}) @app.route("/api/device/connect", methods=["POST"]) def api_device_connect(): """连接单个设备,device: agv | arm | camera | arm_camera""" data = request.json or {} device = data.get("device", "") result = {"device": device, "ok": False, "error": ""} done_event = threading.Event() res_holder = {} def _do_connect(): try: if device == "agv": gs.agv_controller = AGVController() res_holder["ok"] = gs.agv_controller.connect() if not res_holder["ok"]: res_holder["error"] = "AGV 连接失败,请检查网络或 ROS2" elif device == "arm": gs.arm_client = ArmClient(ARM_CONFIG["host"], ARM_CONFIG["port"]) res_holder["ok"] = gs.arm_client.connect() if res_holder["ok"]: gs.arm_client.power_on() time.sleep(0.3) gs.arm_client.state_on() else: res_holder["error"] = "机械臂连接失败" elif device == "camera": gs.qr_scanner = QRScanner(CAMERA_CONFIG["device_index"]) res_holder["ok"] = gs.qr_scanner.open() gs.camera_opened = res_holder["ok"] if not res_holder["ok"]: res_holder["error"] = "AGV 摄像头打开失败" elif device == "arm_camera": import requests as _req r = _req.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=5) res_holder["ok"] = (r.status_code == 200) r.close() gs.arm_camera_opened = res_holder["ok"] if not res_holder["ok"]: res_holder["error"] = "机械臂摄像头无响应" else: res_holder["error"] = f"未知设备: {device}" except Exception as e: res_holder["error"] = str(e) finally: done_event.set() t = threading.Thread(target=_do_connect) t.daemon = True t.start() # 最多等 10 秒,超时则返回失败 if not done_event.wait(10): result["ok"] = False result["error"] = "连接超时(10秒),设备无响应" else: result["ok"] = res_holder.get("ok", False) result["error"] = res_holder.get("error", "") return jsonify(result) # ========== 地图配置 API ========== @app.route("/api/map/load", methods=["POST"]) def api_map_load(): """加载地图文件""" data = request.json map_dir = data.get("map_dir", MAP_CONFIG["map_dir"]) map_file = data.get("map_file", MAP_CONFIG["map_file"]) map_yaml = os.path.join(map_dir, map_file) if not os.path.exists(map_yaml): return jsonify({"ok": False, "error": f"地图文件不存在: {map_yaml}"}), 400 gs.map_config = {"map_dir": map_dir, "map_file": map_file, "map_yaml": map_yaml} save_json("map_config.json", gs.map_config) return jsonify({"ok": True, "map": gs.map_config}) @app.route("/api/map/save", methods=["POST"]) def api_map_save(): """保存地图配置""" data = request.json gs.map_config = data save_json("map_config.json", data) return jsonify({"ok": True}) @app.route("/api/map/image") def api_map_image(): """返回地图图像(PNG)""" if not gs.map_config or "map_yaml" not in gs.map_config: return jsonify({"error": "地图未加载"}), 404 map_yaml = gs.map_config["map_yaml"] map_dir = os.path.dirname(map_yaml) # 解析 YAML 获取 PGM 文件名 try: import yaml with open(map_yaml, 'r') as f: meta = yaml.safe_load(f) pgm_file = meta.get('image', 'map.pgm') pgm_path = os.path.join(map_dir, pgm_file) if not os.path.exists(pgm_path): return jsonify({"error": f"PGM 文件不存在: {pgm_path}"}), 404 # 读取 PGM 并转换为 PNG import cv2 img = cv2.imread(pgm_path, cv2.IMREAD_GRAYSCALE) if img is None: return jsonify({"error": "无法读取 PGM 文件"}), 500 # 反转颜色(PGM 中 0=占用,255=空闲,ROS 地图显示习惯) img = 255 - img # 编码为 PNG _, buf = cv2.imencode('.png', img) return Response(buf.tobytes(), mimetype='image/png') except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/map/meta") def api_map_meta(): """返回地图元数据(分辨率、原点、尺寸)""" if not gs.map_config or "map_yaml" not in gs.map_config: return jsonify({"ok": False, "error": "地图未加载"}), 404 map_yaml = gs.map_config["map_yaml"] map_dir = os.path.dirname(map_yaml) try: import yaml with open(map_yaml, 'r') as f: meta = yaml.safe_load(f) pgm_file = meta.get('image', 'map.pgm') pgm_path = os.path.join(map_dir, pgm_file) import cv2 img = cv2.imread(pgm_path, cv2.IMREAD_GRAYSCALE) height, width = img.shape if img is not None else (0, 0) return jsonify({ "ok": True, "resolution": meta.get('resolution', 0.05), "origin": meta.get('origin', [0, 0, 0]), "width": width, "height": height }) except Exception as e: return jsonify({"ok": False, "error": str(e)}), 500 # ========== 地图导航 API ========== @app.route("/api/navigate/to", methods=["POST"]) def api_navigate_to(): """导航到目标坐标""" if not gs.map_config or "map_yaml" not in gs.map_config: return jsonify({"ok": False, "error": "地图未加载,请先在设置中加载地图"}), 400 data = request.json goal_x = data.get("x") goal_y = data.get("y") goal_yaw = data.get("yaw") # 姿态参数,可选 if goal_x is None or goal_y is None: return jsonify({"ok": False, "error": "缺少目标坐标 x, y"}), 400 if not gs.agv_controller or not gs.agv_controller.is_connected(): return jsonify({"ok": False, "error": "AGV 未连接,请先连接 AGV"}), 400 try: if gs.navigator is None: gs.navigator = Nav2Navigator() # navigate_to_pose(x, y, yaw=None, timeout_sec=120, blocking=False) yaw_arg = float(goal_yaw) if goal_yaw is not None else None ok = gs.navigator.navigate_to_pose(float(goal_x), float(goal_y), yaw_arg, blocking=False) if ok: return jsonify({"ok": True, "message": "导航已启动"}) else: return jsonify({"ok": False, "error": "导航启动失败,可能是Nav2未运行或AGV未连接"}), 400 except Exception as e: logger.error(f"导航失败: {e}") return jsonify({"ok": False, "error": str(e)}), 500 @app.route("/api/navigate/stop", methods=["POST"]) def api_navigate_stop(): """停止导航""" if gs.navigator: gs.navigator.stop() return jsonify({"ok": True, "message": "导航已停止"}) return jsonify({"ok": False, "error": "导航器未初始化"}), 400 @app.route("/api/navigate/cancel", methods=["POST"]) def api_navigate_cancel(): """取消当前导航(别名)""" if gs.navigator: gs.navigator.stop() return jsonify({"ok": True, "message": "导航已取消"}) return jsonify({"ok": True, "message": "无活动导航"}) @app.route("/api/navigate/status", methods=["GET"]) def api_navigate_status(): """获取导航状态""" # 懒初始化 navigator if gs.navigator is None: try: gs.navigator = Nav2Navigator() except Exception as e: logger.warning(f"Nav2Navigator 初始化失败: {e}") if gs.navigator: return jsonify(gs.navigator.get_status()) # navigator 仍为 None,说明 Nav2 不可用 return jsonify({"status": "idle", "current_position": [0, 0, 0], "nav2_available": False}) @app.route("/api/navigate/path", methods=["POST"]) def api_navigate_path(): """预览路径(仅规划不执行)- Nav2版本不支持预计算路径,返回当前导航状态""" if not gs.map_config or "map_yaml" not in gs.map_config: return jsonify({"ok": False, "error": "地图未加载"}), 400 data = request.json goal_x = data.get("x") goal_y = data.get("y") if goal_x is None or goal_y is None: return jsonify({"ok": False, "error": "缺少目标坐标 x, y"}), 400 try: if gs.navigator is None: gs.navigator = Nav2Navigator() # Nav2 不提供路径预览,直接返回可用状态 current = gs.navigator.get_current_position() status = gs.navigator.get_status() return jsonify({ "ok": True, "message": "Nav2 路径预览不可用,请在 RViz 中查看规划路径", "current_position": current, "nav2_available": status.get("nav2_available", False) }) except Exception as e: logger.error(f"路径预览失败: {e}") return jsonify({"ok": False, "error": str(e)}), 500 # ========== 点位配置 API ========== @app.route("/api/points/list", methods=["GET"]) def api_points_list(): """获取所有点位""" return jsonify({"points": gs.points_config}) @app.route("/api/points/add", methods=["POST"]) def api_points_add(): """添加点位(从当前位置)""" data = request.json point_name = data.get("name", f"point_{len(gs.points_config) + 1}") # 获取 AGV 当前位置 position = None if gs.agv_controller and gs.agv_controller.is_connected(): position = gs.agv_controller.get_position() point = { "id": f"p_{int(time.time())}", "name": point_name, "coords": position or [0.0, 0.0, 0.0], # [x, y, yaw] "photo_mode": data.get("photo_mode", "front"), # front / back / both "sequence": data.get("sequence", ["front", "back"]), # both 时的执行顺序 } gs.points_config.append(point) save_json("points_config.json", gs.points_config) return jsonify({"ok": True, "point": point}) @app.route("/api/points/update/", methods=["POST"]) def api_points_update(point_id): """更新点位""" data = request.json for i, p in enumerate(gs.points_config): if p["id"] == point_id: gs.points_config[i].update(data) save_json("points_config.json", gs.points_config) return jsonify({"ok": True}) return jsonify({"ok": False, "error": "点位不存在"}), 404 @app.route("/api/points/delete/", methods=["DELETE"]) def api_points_delete(point_id): """删除点位""" gs.points_config = [p for p in gs.points_config if p["id"] != point_id] save_json("points_config.json", gs.points_config) return jsonify({"ok": True}) @app.route("/api/points/save", methods=["POST"]) def api_points_save(): """保存点位配置""" data = request.json gs.points_config = data.get("points", []) save_json("points_config.json", gs.points_config) return jsonify({"ok": True, "count": len(gs.points_config)}) @app.route("/api/points/load", methods=["GET"]) def api_points_load(): """加载已保存的点位配置""" loaded = load_json("points_config.json", []) if loaded: gs.points_config = loaded return jsonify({"ok": True, "points": loaded or []}) # ========== 机型(姿态组)API ========== @app.route("/api/models/list", methods=["GET"]) def api_models_list(): """获取所有机型""" return jsonify({"models": gs.models_config}) @app.route("/api/models/add", methods=["POST"]) def api_models_add(): """添加机型""" data = request.json model_name = data.get("name", f"model_{len(gs.models_config) + 1}") model = { "id": f"m_{int(time.time())}", "name": model_name, "serial_prefix": data.get("serial_prefix", ""), # 二维码型号前缀 "poses": [], # 姿态列表 "description": data.get("description", ""), "notes": data.get("notes", ""), # 备注 } gs.models_config.append(model) save_json("models_config.json", gs.models_config) return jsonify({"ok": True, "model": model}) @app.route("/api/models/update/", methods=["POST"]) def api_models_update(model_id): """更新机型""" data = request.json for i, m in enumerate(gs.models_config): if m["id"] == model_id: gs.models_config[i].update(data) save_json("models_config.json", gs.models_config) return jsonify({"ok": True}) return jsonify({"ok": False, "error": "机型不存在"}), 404 @app.route("/api/models/delete/", methods=["DELETE"]) def api_models_delete(model_id): """删除机型""" gs.models_config = [m for m in gs.models_config if m["id"] != model_id] save_json("models_config.json", gs.models_config) return jsonify({"ok": True}) @app.route("/api/models/poses/add", methods=["POST"]) def api_poses_add(): """添加姿态到机型(需指定 model_id)""" data = request.json model_id = data.get("model_id") if not model_id: return jsonify({"ok": False, "error": "缺少 model_id"}), 400 for m in gs.models_config: if m["id"] == model_id: pose = { "id": f"pose_{int(time.time())}", "name": data.get("name", f"姿态{len(m['poses']) + 1}"), "photo_type": data.get("photo_type", "front"), # front / back / nameplate "arm_angles": data.get("arm_angles", [0.0]*6), "speed": data.get("speed", 500), "description": data.get("description", ""), } m["poses"].append(pose) save_json("models_config.json", gs.models_config) return jsonify({"ok": True, "pose": pose}) return jsonify({"ok": False, "error": "机型不存在"}), 404 @app.route("/api/models//poses", methods=["GET"]) def api_model_poses_get(model_id): """获取机型的姿态列表""" for m in gs.models_config: if m["id"] == model_id: return jsonify({"poses": m.get("poses", [])}) return jsonify({"poses": []}) @app.route("/api/models//poses/", methods=["PUT"]) def api_model_poses_update(model_id, pose_id): """更新姿态""" data = request.json for m in gs.models_config: if m["id"] == model_id: for i, pose in enumerate(m.get("poses", [])): if pose["id"] == pose_id: m["poses"][i].update(data) save_json("models_config.json", gs.models_config) return jsonify({"ok": True}) return jsonify({"ok": False}), 404 @app.route("/api/models//poses/", methods=["DELETE"]) def api_model_poses_delete(model_id, pose_id): """删除姿态""" for m in gs.models_config: if m["id"] == model_id: m["poses"] = [p for p in m.get("poses", []) if p["id"] != pose_id] save_json("models_config.json", gs.models_config) return jsonify({"ok": True}) return jsonify({"ok": False}), 404 # ========== 任务配置 API(M×N 网格)========== @app.route("/api/mission/position", methods=["GET"]) def api_mission_position(): """读取 AGV 当前坐标(供设置点位时使用)""" if not gs.agv_controller or not gs.agv_controller.is_connected(): return jsonify({"ok": False, "error": "AGV 未连接"}), 400 pos = gs.agv_controller.get_position() if not pos: return jsonify({"ok": False, "error": "AGV 未发布位置数据(/odom 无数据),请检查 AGV 传感器是否正常"}), 400 battery = gs.agv_controller.get_battery() return jsonify({"ok": True, "position": pos, "battery": battery}) @app.route("/api/mission/init_pose", methods=["POST"]) def api_mission_init_pose(): """将 AMCL 初始位置设为 (0,0,0),无需 RViz""" try: script = "/tmp/ros2_init_pose.sh" yaml_content = ( "pose:\n" " header:\n" " stamp:\n" " sec: 0\n" " nanosec: 0\n" " frame_id: map\n" " pose:\n" " position:\n" " x: 0.0\n" " y: 0.0\n" " z: 0.0\n" " covariance: [0.25, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.25, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]\n" " orientation:\n" " x: 0.0\n" " y: 0.0\n" " z: 0.0\n" " w: 1.0\n" ) goal_file = "/tmp/nav2_goal_{}.yaml".format(os.getpid()) with open(goal_file, "w") as f: f.write(yaml_content) lines = [ "#!/bin/bash", "export ROS_DOMAIN_ID=1", "source /opt/ros/humble/setup.bash", "source /home/elephant/agv_pro_ros2/install/setup.bash", f'ros2 topic pub --once /initialpose geometry_msgs/PoseWithCovarianceStamped "$(cat {goal_file})"', ] with open(script, "w") as f: f.write("\n".join(lines) + "\n") os.chmod(script, 0o755) result = subprocess.run( [script], capture_output=True, text=True, timeout=12, env={**os.environ, "ROS_DOMAIN_ID": "1"} ) logger.info(f"init_pose: rc={result.returncode}, stdout={result.stdout[:200]}, stderr={result.stderr[:200]}") return jsonify({"ok": True, "message": "初始位置已设为 (0,0,0)"}) except Exception as e: logger.error(f"初始化位置失败: {e}") return jsonify({"ok": False, "error": str(e)}), 500 @app.route("/api/mission/config", methods=["GET"]) def api_mission_config_get(): """获取任务配置(网格尺寸和空位矩阵)""" return jsonify({"ok": True, "config": gs.mission_config, "machines": gs.machines_config}) @app.route("/api/mission/config", methods=["POST"]) def api_mission_config_set(): """"设置任务配置(网格尺寸+空位矩阵+机械臂初始姿态)""" data = request.json rows = data.get("rows", 2) cols = data.get("cols", 3) grid = data.get("grid", []) arm_initial_pose = data.get("arm_initial_pose", [0.0] * 6) gs.mission_config["rows"] = rows gs.mission_config["cols"] = cols gs.mission_config["grid"] = grid gs.mission_config["arm_initial_pose"] = arm_initial_pose # 清除超出网格边界的 positions(只保留 front/back 且 row<=rows, col", methods=["GET"]) def api_mission_machine_get(machine_id): """获取单台机器配置""" for m in gs.machines_config: if m["id"] == machine_id: return jsonify({"ok": True, "machine": m}) return jsonify({"ok": False, "error": "机器不存在"}), 404 @app.route("/api/mission/machines", methods=["POST"]) def api_mission_machines_save(): """"批量保存/更新机器配置""" data = request.json machines = data.get("machines", []) gs.machines_config = machines save_json("machines_config.json", gs.machines_config) return jsonify({"ok": True, "count": len(gs.machines_config)}) @app.route("/api/mission/machines/add", methods=["POST"]) def api_mission_machine_add(): """添加单台机器配置""" data = request.json machine_id = data.get("id", f"m_{data.get('row', 0)}_{data.get('col', 0)}") # 检查是否已存在 for m in gs.machines_config: if m["id"] == machine_id or (m.get("row") == data.get("row") and m.get("col") == data.get("col")): return jsonify({"ok": False, "error": "该位置已有机器"}), 400 machine = { "id": machine_id, "row": data.get("row", 0), "col": data.get("col", 0), "front": data.get("front", {"coords": [0, 0, 0], "poses": []}), "back": data.get("back", {"coords": [0, 0, 0], "poses": []}), "qr": data.get("qr", {"coords": [0, 0, 0], "qr_value": "", "model_id": ""}), } gs.machines_config.append(machine) save_json("machines_config.json", gs.machines_config) return jsonify({"ok": True, "machine": machine}) @app.route("/api/mission/machines/", methods=["PUT"]) def api_mission_machine_update(machine_id): """更新单台机器配置(正面/背面点位+姿态)""" data = request.json for i, m in enumerate(gs.machines_config): if m["id"] == machine_id: gs.machines_config[i].update(data) save_json("machines_config.json", gs.machines_config) return jsonify({"ok": True}) return jsonify({"ok": False, "error": "机器不存在"}), 404 @app.route("/api/mission/machines/", methods=["DELETE"]) def api_mission_machine_delete(machine_id): """删除单台机器配置""" gs.machines_config = [m for m in gs.machines_config if m["id"] != machine_id] save_json("machines_config.json", gs.machines_config) return jsonify({"ok": True}) @app.route("/api/mission/qr_scan/", methods=["POST"]) def api_mission_qr_scan(machine_id): """扫描二维码并关联到机器""" if not gs.qr_scanner or not gs.qr_scanner._cap: return jsonify({"ok": False, "error": "AGV 摄像头未打开"}), 400 result = gs.qr_scanner.scan_once() if result: # 在 machines_config 和 mission_config 中查找机器 for i, m in enumerate(gs.machines_config): if m["id"] == machine_id: if "qr" not in m: m["qr"] = {"coords": [0, 0, 0], "qr_value": "", "model_id": ""} m["qr"]["qr_value"] = result # 尝试匹配机型(通过 serial_prefix) matched_model = None for model in gs.models_config: prefix = model.get("serial_prefix", "") if prefix and result.startswith(prefix): matched_model = model break if matched_model: m["qr"]["model_id"] = matched_model["id"] save_json("machines_config.json", gs.machines_config) return jsonify({ "ok": True, "qr_value": result, "model_id": m["qr"].get("model_id", ""), "model_name": matched_model["name"] if matched_model else "" }) return jsonify({"ok": False, "error": f"机器 {machine_id} 不存在"}), 404 return jsonify({"ok": False, "error": "未检测到二维码"}) @app.route("/api/mission/poses//", methods=["GET"]) def api_mission_poses_get(machine_id, side): """获取机器指定侧的姿态列表(side: front | back)""" for m in gs.machines_config: if m["id"] == machine_id: return jsonify({"ok": True, "poses": m.get(side, {}).get("poses", [])}) return jsonify({"ok": False, "poses": []}), 404 @app.route("/api/mission/poses//", methods=["POST"]) def api_mission_poses_add(machine_id, side): """添加姿态到机器指定侧(side: front | back)""" data = request.json for m in gs.machines_config: if m["id"] == machine_id: if side not in m: return jsonify({"ok": False, "error": f"机器无此侧: {side}"}), 400 pose = { "id": f"pose_{int(time.time())}", "name": data.get("name", f"姿态"), "arm_angles": data.get("arm_angles", [0.0]*6), "speed": data.get("speed", 500), "description": data.get("description", ""), } m[side]["poses"].append(pose) save_json("machines_config.json", gs.machines_config) return jsonify({"ok": True, "pose": pose}) return jsonify({"ok": False, "error": "机器不存在"}), 404 @app.route("/api/mission/poses///", methods=["DELETE"]) def api_mission_poses_delete(machine_id, side, pose_id): """删除机器指定侧的姿态""" for m in gs.machines_config: if m["id"] == machine_id: if side not in m: return jsonify({"ok": False}), 404 m[side]["poses"] = [p for p in m[side]["poses"] if p["id"] != pose_id] save_json("machines_config.json", gs.machines_config) return jsonify({"ok": True}) return jsonify({"ok": False}), 404 @app.route("/api/mission/generate_sequence", methods=["GET"]) def api_mission_generate_sequence(): """根据网格配置和机器配置生成拍摄序列(蛇形)""" rows = int(gs.mission_config.get("rows", 2)) cols = int(gs.mission_config.get("cols", 3)) grid = gs.mission_config.get("grid", []) machines = gs.machines_config if (not grid or all(not any(row) if isinstance(row, list) else True for row in grid)) and machines: grid = [[False] * cols for _ in range(rows)] for m in machines: r = int(m.get("row", 0)) c = int(m.get("col", 0)) if 0 <= r < rows and 0 <= c < cols: grid[r][c] = True def get_machine(row, col): for m in machines: if m.get("row") == row and m.get("col") == col: return m return None # 点位蛇形序列:同一点位同时有上一行背面和下一行正面时,先背面再正面。 sequence = [] for pr in range(rows + 1): cols_iter = range(cols) if pr % 2 == 0 else range(cols - 1, -1, -1) row_dir = "lr" if pr % 2 == 0 else "rl" for c in cols_iter: if pr > 0 and pr - 1 < len(grid) and c < len(grid[pr - 1]) and grid[pr - 1][c]: m = get_machine(pr - 1, c) if m and m.get("back"): sequence.append({ "machine_id": m["id"], "row": pr - 1, "col": c, "point_row": pr, "side": "back", "row_dir": row_dir }) if pr < rows and pr < len(grid) and c < len(grid[pr]) and grid[pr][c]: m = get_machine(pr, c) if m and m.get("front"): sequence.append({ "machine_id": m["id"], "row": pr, "col": c, "point_row": pr, "side": "front", "row_dir": row_dir }) return jsonify({"ok": True, "sequence": sequence}) # ========== 点位配置 API(独立于机器)========== @app.route("/api/mission/positions", methods=["GET"]) def api_mission_positions_list(): """获取所有点位配置""" positions = gs.mission_config.get("positions", []) return jsonify({"ok": True, "positions": positions}) @app.route("/api/mission/positions", methods=["POST"]) def api_mission_positions_save(): """保存点位配置(单个点位:创建或更新)""" data = request.json row = int(data.get("row", 0)) col = int(data.get("col", 0)) side = data.get("side", "front") # "front" | "back" coords = data.get("coords", [0, 0, 0]) poses = data.get("poses", []) positions = gs.mission_config.setdefault("positions", []) # 查找是否已有该点位 key = (row, col, side) for i, p in enumerate(positions): if int(p.get("row", 0)) == row and int(p.get("col", 0)) == col and p.get("side") == side: positions[i] = {"row": row, "col": col, "side": side, "coords": coords, "poses": poses} save_json("mission_config.json", gs.mission_config) return jsonify({"ok": True, "position": positions[i]}) positions.append({"row": row, "col": col, "side": side, "coords": coords, "poses": poses}) save_json("mission_config.json", gs.mission_config) return jsonify({"ok": True, "position": positions[-1]}) # ========== 机械臂控制 API ========== @app.route("/api/arm/get_angles", methods=["GET"]) def api_arm_get_angles(): """获取当前关节角度""" if not gs.arm_client: return jsonify({"ok": False, "error": "未连接机械臂"}), 400 ok, angles = gs.arm_client.get_angles() return jsonify({"ok": ok, "angles": angles}) @app.route("/api/arm/set_angles", methods=["POST"]) def api_arm_set_angles(): """设置关节角度""" data = request.json or {} angles = data.get("angles", []) speed = data.get("speed", 500) if not isinstance(angles, list) or len(angles) != 6: return jsonify({"ok": False, "error": "角度数据必须包含 6 个关节值"}), 400 try: angles = [float(a) for a in angles] speed = int(speed) except (TypeError, ValueError): return jsonify({"ok": False, "error": "角度或速度格式错误"}), 400 if not gs.arm_client: return jsonify({"ok": False, "error": "未连接机械臂"}), 400 ok = gs.arm_client.set_angles(angles, speed) return jsonify({"ok": ok, "error": None if ok else "机械臂执行失败"}) @app.route("/api/arm/set_angle", methods=["POST"]) def api_arm_set_angle(): """设置单个关节角度""" data = request.json joint = data.get("joint", "J1") angle = data.get("angle", 0.0) speed = data.get("speed", 500) if not gs.arm_client: return jsonify({"ok": False}), 400 ok = gs.arm_client.set_angle(joint, angle, speed) return jsonify({"ok": ok}) @app.route("/api/arm/jog", methods=["POST"]) def api_arm_jog(): """连续调节关节(用于方向键)""" data = request.json joint = data.get("joint", "J1") direction = data.get("direction", 0) # -1 / 0 / 1 speed = data.get("speed", 500) if not gs.arm_client: return jsonify({"ok": False}), 400 ok = gs.arm_client.jog_angle(joint, direction, speed) return jsonify({"ok": ok}) @app.route("/api/arm/get_coords", methods=["GET"]) def api_arm_get_coords(): """获取当前坐标""" if not gs.arm_client: return jsonify({"ok": False}), 400 ok, coords = gs.arm_client.get_coords() return jsonify({"ok": ok, "coords": coords}) @app.route("/api/arm/state_check", methods=["GET"]) def api_arm_state_check(): """检查机械臂状态""" if not gs.arm_client: return jsonify({"ok": False}), 400 ok = gs.arm_client.state_check() return jsonify({"ok": ok, "running": not ok}) @app.route("/api/arm/power_on", methods=["POST"]) def api_arm_power_on(): if not gs.arm_client: return jsonify({"ok": False}), 400 ok = gs.arm_client.power_on() return jsonify({"ok": ok}) @app.route("/api/arm/state_on", methods=["POST"]) def api_arm_state_on(): if not gs.arm_client: return jsonify({"ok": False}), 400 ok = gs.arm_client.state_on() return jsonify({"ok": ok}) @app.route("/api/arm/state_off", methods=["POST"]) def api_arm_state_off(): if not gs.arm_client: return jsonify({"ok": False}), 400 ok = gs.arm_client.state_off() return jsonify({"ok": ok}) # ========== 摄像头预览 API ========== @app.route("/api/camera/preview") def api_camera_preview(): """MJPEG 视频流""" if not gs.qr_scanner or not gs.qr_scanner._cap: return "camera not opened", 400 def gen(): while True: frame = gs.qr_scanner.read_frame() if frame is None: break # 编码为 JPEG import cv2 ret, buf = cv2.imencode(".jpg", frame) if ret: yield (b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" + buf.tobytes() + b"\r\n") return Response(gen(), mimetype="multipart/x-mixed-replace; boundary=frame") @app.route("/api/camera/refresh") def api_camera_refresh(): """AGV 摄像头单帧 JPEG(polling 模式)""" if not gs.qr_scanner or not gs.qr_scanner._cap: return "camera not opened", 400 import cv2 frame = gs.qr_scanner.read_frame() if frame is None: return "", 400 ret, buf = cv2.imencode(".jpg", frame) if ret: return Response(buf.tobytes(), mimetype="image/jpeg") return "encode failed", 500 @app.route("/api/camera/capture") def api_camera_capture(): """拍摄一张照片""" if not gs.qr_scanner or not gs.qr_scanner._cap: return jsonify({"ok": False, "error": "摄像头未打开"}), 400 import cv2 frame = gs.qr_scanner.read_frame() if frame is None: return jsonify({"ok": False, "error": "读取帧失败"}), 400 photo_dir = os.path.join(DATA_DIR, "photos") os.makedirs(photo_dir, exist_ok=True) photo_path = os.path.join(photo_dir, f"capture_{int(time.time())}.jpg") cv2.imwrite(photo_path, frame) return jsonify({"ok": True, "path": photo_path}) @app.route("/api/camera/arm_refresh") def api_arm_camera_refresh(): """从机械臂拉一张 JPEG(请求 snapshot 端点,简单 HTTP GET)""" import requests try: r = requests.get(ARM_CAMERA_CONFIG.get("snapshot_url", ARM_CAMERA_CONFIG["url"]), timeout=8) if r.status_code == 200 and r.content: resp = Response(r.content, mimetype="image/jpeg") resp.headers["Cache-Control"] = "no-cache, no-store, must-revalidate, max-age=0" resp.headers["Pragma"] = "no-cache" resp.headers["Expires"] = "0" return resp return "", 404 except Exception as ex: logger.info(f"arm_refresh 不可用: {ex}") return "", 404 @app.route("/api/camera/arm_preview") def api_arm_camera_preview(): """代理机械臂 MJPEG 视频流,供页面连续预览。""" import requests try: upstream = requests.get( ARM_CAMERA_CONFIG["url"], stream=True, timeout=(3, 10), ) if upstream.status_code != 200: upstream.close() return "", 404 def generate(): try: for chunk in upstream.iter_content(chunk_size=8192): if chunk: yield chunk finally: upstream.close() return Response( generate(), mimetype="multipart/x-mixed-replace; boundary=frame", headers={"Cache-Control": "no-cache, no-store, must-revalidate, max-age=0"}, ) except Exception as ex: logger.info(f"arm_preview 不可用: {ex}") return "", 404 @app.route("/api/camera/qr_scan", methods=["GET"]) def api_qr_scan(): """扫描一次二维码""" if not gs.qr_scanner: return jsonify({"ok": False}), 400 result = gs.qr_scanner.scan_once() return jsonify({"ok": bool(result), "data": result}) # ========== AGV 移动控制 API ========== @app.route("/api/agv/move", methods=["POST"]) def api_agv_move(): """控制 AGV 移动(前进/后退/左转/右转/停止)""" data = request.json direction = data.get("direction", "stop") # forward / backward / left / right / stop speed = data.get("speed", AGV_CONFIG.get("move_speed", 0.5)) if not gs.agv_controller or not gs.agv_controller.is_connected(): return jsonify({"ok": False, "error": "AGV 未连接"}), 400 try: if direction == "forward": gs.agv_controller.move_forward(speed) elif direction == "backward": gs.agv_controller.move_backward(speed) elif direction == "left": gs.agv_controller.turn_left(speed) elif direction == "right": gs.agv_controller.turn_right(speed) elif direction == "left_lateral": gs.agv_controller.move_left_lateral(speed) elif direction == "right_lateral": gs.agv_controller.move_right_lateral(speed) elif direction == "stop": gs.agv_controller.stop() else: return jsonify({"ok": False, "error": "未知方向"}), 400 return jsonify({"ok": True}) except Exception as e: return jsonify({"ok": False, "error": str(e)}), 500 @app.route("/api/agv/position", methods=["GET"]) def api_agv_position(): """获取 AGV 当前位置""" if not gs.agv_controller or not gs.agv_controller.is_connected(): return jsonify({"ok": False, "error": "AGV 未连接"}), 400 pos = gs.agv_controller.get_position() if not pos: return jsonify({"ok": False, "error": "AGV 未发布位置数据(/odom 无数据),请检查 AGV 传感器是否正常"}), 400 battery = gs.agv_controller.get_battery() return jsonify({"ok": True, "position": pos, "battery": battery}) @app.route("/api/agv/stop", methods=["POST"]) def api_agv_stop(): """立即停止 AGV""" if gs.agv_controller: gs.agv_controller.stop() return jsonify({"ok": True}) @app.route("/api/agv/reset", methods=["POST"]) def api_agv_reset(): """撞物体后复位 - 停止运动并重新检查 ROS2 连接""" import time if not gs.agv_controller: return jsonify({"ok": False, "error": "AGV 控制器未初始化"}), 400 try: gs.agv_controller.stop() time.sleep(0.5) if gs.agv_controller.connect(): return jsonify({"ok": True, "message": "复位成功,AGV 已停止并重新连接"}) return jsonify({"ok": False, "error": "AGV 已停止,但 ROS2 连接检查失败"}), 500 except Exception as e: return jsonify({"ok": False, "error": str(e)}), 500 # ========== 任务执行 API ========== @app.route("/api/mission/start", methods=["POST"]) def api_mission_start(): """开始执行任务(V3: M×N Grid 蛇形路径)""" data = request.json or {} single_step = bool(data.get("single_step", False)) # 任务步骤控制开关 options = { "arm_init": bool(data.get("arm_init", True)), "agv_move": bool(data.get("agv_move", True)), "qr_scan": bool(data.get("qr_scan", True)), "front_photo": bool(data.get("front_photo", True)), "back_photo": bool(data.get("back_photo", True)), "agv_speed": float(data.get("agv_speed", 0.5)), "arm_speed": int(data.get("arm_speed", 500)), } print(f"[Mission] options: {options}") existing = getattr(MissionExecutorV3, "_instance", None) if existing and existing.report.get("status") not in ("idle", "completed"): return jsonify({"ok": False, "error": "任务已在运行中"}), 400 MissionExecutorV3._instance = None def run(single_step): from config import AGV_CONFIG config = { "device": AGV_CONFIG.get("device", "/dev/agvpro_controller"), "baudrate": AGV_CONFIG.get("baudrate", 1000000), "arm": ARM_CONFIG, } executor = MissionExecutorV3(config) try: conn = executor.connect_all() # 当所有机械臂相关步骤都关闭时,机械臂连接为可选 need_arm = options.get("qr_scan", True) or options.get("front_photo", True) or options.get("back_photo", True) if not conn.get("agv"): gs.mission_report = {"error": "AGV 连接失败", "details": conn} gs.state = State.IDLE return if need_arm and not conn.get("arm"): gs.mission_report = {"error": "机械臂连接失败,请先连接机械臂", "details": conn} gs.state = State.IDLE return gs.state = State.RUNNING machines_list = gs.machines_config if isinstance(gs.machines_config, list) else gs.machines_config.get("machines", []) models_list = gs.models_config if isinstance(gs.models_config, list) else gs.models_config.get("models", []) report = executor.execute_mission( mission_config=gs.mission_config, machines=machines_list, qr_configs=gs.qr_config, models=models_list, single_step=single_step, options=options, ) gs.mission_report = report gs.state = State.IDLE if report.get("error") is None else State.PAUSED finally: executor.disconnect_all() thread = threading.Thread(target=run, args=(single_step,), daemon=True) thread.start() return jsonify({"ok": True, "single_step": single_step}) @app.route("/api/mission/stop", methods=["POST"]) def api_mission_stop(): """停止任务""" if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance: MissionExecutorV3._instance.stop() gs.state = State.IDLE return jsonify({"ok": True}) @app.route("/api/mission/pause", methods=["POST"]) def api_mission_pause(): if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance: MissionExecutorV3._instance.pause() gs.state = State.PAUSED return jsonify({"ok": True}) @app.route("/api/mission/resume", methods=["POST"]) def api_mission_resume(): if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance: MissionExecutorV3._instance.resume() gs.state = State.RUNNING return jsonify({"ok": True}) @app.route("/api/mission/report", methods=["GET"]) def api_mission_report(): return jsonify({"report": gs.mission_report}) @app.route("/api/mission/state", methods=["GET"]) def api_mission_state(): """返回任务状态 + 预生成任务列表""" result = {"state": gs.state} # 如果有执行器实例,合并执行器状态 if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance: ex = MissionExecutorV3._instance result.update(ex.get_status()) # 从配置文件预生成任务列表(不依赖 MissionExecutorV3 类) try: base = os.path.dirname(os.path.abspath(__file__)) with open(os.path.join(base, "data", "mission_config.json")) as jf: mc = json.load(jf) machines = [] try: with open(os.path.join(base, "data", "machines_config.json")) as jf: machines = json.load(jf) except: pass rows = int(mc.get("rows", 1)) cols = int(mc.get("cols", 1)) grid = mc.get("grid", []) # 如果 grid 为空,从 machines 重建(只取 rows×cols 范围内的机器) if (not grid or all(not any(rw) if isinstance(rw, list) else True for rw in grid)) and machines: grid = [[False] * cols for _ in range(rows)] for m in machines: r = int(m.get("row", 0)) c = int(m.get("col", 0)) if 0 <= r < rows and 0 <= c < cols: grid[r][c] = True # 蛇形路径 path = [] for r in range(rows): if r % 2 == 0: for c in range(cols): if r < len(grid) and c < len(grid[r]) and grid[r][c]: path.append((r, c)) else: for c in range(cols - 1, -1, -1): if r < len(grid) and c < len(grid[r]) and grid[r][c]: path.append((r, c)) # 网格级别任务数据 result["rows"] = rows result["cols"] = cols result["grid"] = grid result["point_status"] = {f"{pr}_{c}": "pending" for pr in range(rows+1) for c in range(cols)} result["machine_status"] = {} for r in range(rows): for c in range(cols): if r < len(grid) and c < len(grid[r]) and grid[r][c]: result["machine_status"][f"{r}_{c}"] = { "has_machine": True, "qr": "pending", "qr_val": None, "front": "pending", "front_cnt": 0, "back": "pending", "back_cnt": 0, "status": "pending", "step": "等待", } # 保留旧的 tasks 列表(兼容) tlist = [] for (r, c) in path: tlist.append({ "row": r, "col": c, "machine_id": "m_{}_{}".format(r, c), "label": "{}-{}".format(r+1, c+1), "status": "pending", "step": "等待", "qr_value": None, "photos_front": 0, "photos_back": 0, }) result["tasks"] = tlist except Exception: result["rows"] = 1 result["cols"] = 1 result["grid"] = [] result["point_status"] = {} result["machine_status"] = {} result["tasks"] = [] # 机械臂摄像头状态 result["arm_camera_opened"] = gs.arm_camera_opened # 错误弹窗状态和实时网格状态 if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance: ex = MissionExecutorV3._instance st = ex.get_status() result["error_msg"] = st.get("error", "") result["waiting_step"] = (st.get("status") == "waiting_step") result["waiting_error"] = (st.get("status") == "waiting_error") # 从 executor.report 读取实时点/机器状态 rpt = ex.report if rpt.get("point_status"): result["point_status"] = rpt["point_status"] if rpt.get("machine_status"): result["machine_status"] = rpt["machine_status"] else: result["error_msg"] = "" result["waiting_step"] = False result["waiting_error"] = False return jsonify(result) @app.route("/api/mission/log", methods=["GET"]) def api_mission_log(): """返回实时日志""" if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance: ex = MissionExecutorV3._instance return jsonify(ex.get_logs()) return jsonify(gs.mission_report or {"log": []}) @app.route("/api/mission/manual-qr", methods=["POST"]) def api_mission_manual_qr(): """手动输入二维码值""" data = request.json or {} qr = data.get("qr", "").strip() if not qr: return jsonify({"ok": False, "error": "二维码不能为空"}), 400 if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance: MissionExecutorV3._instance.set_manual_qr(qr) return jsonify({"ok": True}) return jsonify({"ok": False, "error": "没有运行中的任务"}), 400 # ========== 错误弹窗 API ========== @app.route("/api/mission/error-skip", methods=["POST"]) def api_mission_error_skip(): """用户选择:跳过当前错误""" if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance: MissionExecutorV3._instance.set_error_choice("skip") return jsonify({"ok": True, "choice": "skip"}) return jsonify({"ok": False, "error": "没有运行中的任务"}), 400 @app.route("/api/mission/error-abort", methods=["POST"]) def api_mission_error_abort(): """用户选择:中断任务""" if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance: MissionExecutorV3._instance.set_error_choice("abort") return jsonify({"ok": True, "choice": "abort"}) return jsonify({"ok": False, "error": "没有运行中的任务"}), 400 # ========== 单步执行 API ========== @app.route("/api/mission/singlestep/confirm", methods=["POST"]) def api_mission_singlestep_confirm(): """单步执行:确认当前步骤正确,继续""" if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance: MissionExecutorV3._instance.set_step_choice("confirm") return jsonify({"ok": True, "choice": "confirm"}) return jsonify({"ok": False, "error": "没有运行中的任务"}), 400 @app.route("/api/mission/singlestep/retry", methods=["POST"]) def api_mission_singlestep_retry(): """单步执行:重试当前步骤""" if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance: MissionExecutorV3._instance.set_step_choice("retry") return jsonify({"ok": True, "choice": "retry"}) return jsonify({"ok": False, "error": "没有运行中的任务"}), 400 @app.route("/api/mission/singlestep/abort", methods=["POST"]) def api_mission_singlestep_abort(): """单步执行:中断任务""" if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance: MissionExecutorV3._instance.set_step_choice("abort") return jsonify({"ok": True, "choice": "abort"}) return jsonify({"ok": False, "error": "没有运行中的任务"}), 400 # ========== 二维码配置 API ========== @app.route("/api/qr/configs", methods=["GET"]) def api_qr_configs_get(): """获取所有二维码配置""" return jsonify({"ok": True, "configs": gs.qr_config}) @app.route("/api/qr/configs", methods=["POST"]) def api_qr_configs_add(): """添加二维码配置""" data = request.json or {} new_entry = { "id": "qr_" + str(int(time.time() * 1000)), "name": data.get("name", f"二维码{len(gs.qr_config) + 1}"), "joint_angles": data.get("joint_angles", [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), "qr_value": "", "model_id": "", } gs.qr_config.append(new_entry) save_json("qr_config.json", gs.qr_config) return jsonify({"ok": True, "entry": new_entry}) @app.route("/api/qr/configs/", methods=["PUT"]) def api_qr_configs_update(qr_id): """更新二维码配置""" data = request.json or {} for entry in gs.qr_config: if entry["id"] == qr_id: if "name" in data: entry["name"] = data["name"] if "joint_angles" in data: entry["joint_angles"] = data["joint_angles"] if "qr_value" in data: entry["qr_value"] = data["qr_value"] if "model_id" in data: entry["model_id"] = data["model_id"] save_json("qr_config.json", gs.qr_config) return jsonify({"ok": True, "entry": entry}) return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404 @app.route("/api/qr/configs/", methods=["DELETE"]) def api_qr_configs_delete(qr_id): """删除二维码配置""" for i, entry in enumerate(gs.qr_config): if entry["id"] == qr_id: gs.qr_config.pop(i) save_json("qr_config.json", gs.qr_config) return jsonify({"ok": True}) return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404 @app.route("/api/qr/configs//read-angles", methods=["POST"]) def api_qr_read_angles(qr_id): """读取机械臂当前关节角度并保存到指定二维码配置""" if not gs.arm_client: return jsonify({"ok": False, "error": "机械臂未连接"}), 400 ok, angles = gs.arm_client.get_angles() if not ok or not angles: return jsonify({"ok": False, "error": "无法读取机械臂角度"}), 400 for entry in gs.qr_config: if entry["id"] == qr_id: entry["joint_angles"] = list(angles) save_json("qr_config.json", gs.qr_config) return jsonify({"ok": True, "joint_angles": entry["joint_angles"]}) return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404 @app.route("/api/qr/scan/", methods=["POST"]) def api_qr_config_scan(qr_id): """获取机械臂摄像头图像,识别二维码并保存到指定配置项""" import requests try: import cv2 import numpy as np # 从机械臂摄像头 snapshot 端点拉取一帧 JPEG r = requests.get(ARM_CAMERA_CONFIG.get("snapshot_url", ARM_CAMERA_CONFIG["url"]), timeout=8) if r.status_code != 200 or not r.content: return jsonify({"ok": False, "error": "无法连接机械臂摄像头"}), 400 jpg_bytes = r.content # 解码为 numpy 数组并检测二维码 nparr = np.frombuffer(jpg_bytes, np.uint8) frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if frame is None: return jsonify({"ok": False, "error": "图像解码失败"}), 400 # 使用 OpenCV QRCodeDetector 检测 detector = cv2.QRCodeDetector() result, _, _ = detector.detectAndDecode(frame) if result and len(result.strip()) > 0: result = result.strip() # 保存到配置项 for entry in gs.qr_config: if entry["id"] == qr_id: entry["qr_value"] = result # 尝试匹配机型 matched_model = None for model in gs.models_config: prefix = model.get("serial_prefix", "") if prefix and result.startswith(prefix): matched_model = model break if matched_model: entry["model_id"] = matched_model["id"] save_json("qr_config.json", gs.qr_config) return jsonify({ "ok": True, "qr_value": result, "model_id": entry.get("model_id", ""), "model_name": matched_model["name"] if matched_model else "" }) return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404 else: return jsonify({"ok": False, "error": "未检测到二维码"}) except Exception as ex: logger.error(f"QR 扫描机械臂摄像头失败: {ex}") return jsonify({"ok": False, "error": f"扫描失败: {str(ex)}"}), 400 # ========== 静态资源 ========== @app.route("/photos/") def photos(name): return send_from_directory(os.path.join(DATA_DIR, "photos"), name) # ========== 启动 ========== if __name__ == "__main__": logger.info("=" * 50) logger.info("AGV 拍摄系统启动") logger.info(f"监听 {SERVER_CONFIG['host']}:{SERVER_CONFIG['port']}") logger.info("=" * 50) # 启动时自动初始化 AGV 摄像头 gs.qr_scanner = QRScanner(CAMERA_CONFIG["device_index"]) gs.camera_opened = gs.qr_scanner.open() logger.info(f"AGV 摄像头初始化: {'成功' if gs.camera_opened else '失败'}") # 启动时自动检测机械臂摄像头 try: import requests as _startup_req r = _startup_req.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=5) gs.arm_camera_opened = (r.status_code == 200) r.close() logger.info(f"机械臂摄像头检测: {'成功' if gs.arm_camera_opened else '失败'}") except Exception as _e: gs.arm_camera_opened = False logger.warning(f"机械臂摄像头检测失败: {_e}") app.run( host=SERVER_CONFIG["host"], port=SERVER_CONFIG["port"], debug=SERVER_CONFIG["debug"], threaded=True )