1479 lines
56 KiB
Python
1479 lines
56 KiB
Python
"""
|
||
AGV 拍摄系统 - Flask 主程序
|
||
运行在 AGV 上,端口 5000
|
||
"""
|
||
import os
|
||
import json
|
||
import time
|
||
import logging
|
||
import threading
|
||
import subprocess
|
||
from flask import Flask, render_template, jsonify, request, Response, send_from_directory
|
||
from flask_cors import CORS
|
||
|
||
from config import SERVER_CONFIG, ARM_CONFIG, AGV_CONFIG, UPLOAD_CONFIG, MAP_CONFIG, ARM_CAMERA_CONFIG, CAMERA_CONFIG, DATA_DIR, State
|
||
from utils.arm_client import ArmClient
|
||
from utils.agv_controller_ros2 import AGVController
|
||
from utils.qr_scanner import QRScanner
|
||
from utils.image_uploader import ImageUploader
|
||
from utils.mission_executor import MissionExecutor, TaskStatus
|
||
from utils.nav2_navigator import Nav2Navigator, Nav2Status
|
||
|
||
# 配置日志
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
|
||
)
|
||
logger = logging.getLogger("agv_app")
|
||
|
||
app = Flask(__name__, template_folder="templates", static_folder="static", static_url_path="/static")
|
||
app.config["SECRET_KEY"] = SERVER_CONFIG["secret_key"]
|
||
CORS(app)
|
||
|
||
# ========== 全局状态 ==========
|
||
class GlobalState:
    """Process-wide mutable state shared by all request handlers.

    Groups the device handles, the "opened" flags, the in-memory copies of
    the persisted configuration and a lock used for state transitions.
    """

    def __init__(self):
        # Workflow state: setting / running / paused / idle.
        self.state = State.IDLE
        self.lock = threading.Lock()

        # Active mission.
        self.mission_data = None        # configuration of the current mission
        self.mission_report = None      # execution report of the mission

        # Device handles, filled in by the connect endpoints.
        self.arm_client = None          # ArmClient instance
        self.agv_controller = None      # AGVController instance
        self.qr_scanner = None          # QRScanner instance
        self.navigator = None           # Nav2Navigator instance

        # Camera flags.
        self.camera_opened = False
        self.arm_camera_opened = False

        # In-memory configuration.
        self.map_config = {}            # map configuration
        self.points_config = []         # point configuration
        self.models_config = []         # machine models (poses)
        self.machines_config = []       # per machine: front/back points + poses
        self.qr_config = []             # QR configuration (independent points)
        self.mission_config = dict(     # mission configuration (M x N grid)
            rows=2,
            cols=3,
            grid=[],                    # M x N boolean matrix
            positions=[],               # [{row, col, side, coords, poses}]
        )

    def reset(self):
        """Return to the idle state and forget the current mission."""
        with self.lock:
            self.mission_report = None
            self.mission_data = None
            self.state = State.IDLE


gs = GlobalState()
|
||
|
||
|
||
# ========== 辅助函数 ==========
|
||
def get_data_path(name):
    """Return the path of ``name`` inside ``DATA_DIR``."""
    target = os.path.join(DATA_DIR, name)
    return target
|
||
|
||
def save_json(name, data):
    """Write ``data`` as indented UTF-8 JSON to ``DATA_DIR/name``.

    The parent directory is created when it does not exist yet, so the first
    save on a fresh installation does not fail with ``FileNotFoundError``.
    """
    path = get_data_path(name)
    parent = os.path.dirname(path)
    if parent:  # os.makedirs("") would raise
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
def load_json(name, default=None):
    """Load ``DATA_DIR/name`` as JSON.

    Returns ``default`` (or ``{}`` when ``default`` is None) if the file is
    missing, unreadable or does not contain valid JSON. Broken files are
    logged instead of raising, so one corrupt config file cannot abort
    start-up.
    """
    fallback = default if default is not None else {}
    path = get_data_path(name)
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return fallback
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("无法读取配置文件 %s: %s", path, exc)
        return fallback
|
||
|
||
|
||
# ========== 启动时加载持久化配置 ==========
|
||
def load_persisted_config():
    """Copy previously saved JSON configuration into ``gs``.

    Every file is optional: when a file yields an empty value the matching
    attribute on ``gs`` keeps its default.
    """
    # Map configuration is accepted only when it names a map YAML file.
    saved_map = load_json("map_config.json")
    if saved_map and "map_yaml" in saved_map:
        gs.map_config = saved_map
        print(f"[启动] 加载地图配置: {saved_map['map_yaml']}")

    # Points.
    saved_points = load_json("points_config.json", [])
    if saved_points:
        gs.points_config = saved_points
        print(f"[启动] 加载点位配置: {len(saved_points)} 个点位")

    # Machine models (poses).
    saved_models = load_json("models_config.json", [])
    if saved_models:
        gs.models_config = saved_models
        print(f"[启动] 加载机型配置: {len(saved_models)} 个机型")

    # Mission configuration (grid size).
    saved_mission = load_json("mission_config.json", {})
    if saved_mission:
        gs.mission_config = saved_mission
        print(f"[启动] 加载任务配置: {saved_mission.get('rows', 0)}行×{saved_mission.get('cols', 0)}列")

    # Machines (front/back points + poses).
    saved_machines = load_json("machines_config.json", [])
    if saved_machines:
        gs.machines_config = saved_machines
        print(f"[启动] 加载机器配置: {len(saved_machines)} 台机器")

    # QR code configuration.
    saved_qr = load_json("qr_config.json", [])
    if saved_qr:
        gs.qr_config = saved_qr
        print(f"[启动] 加载二维码配置: {len(saved_qr)} 个点位")
|
||
|
||
# Start-up: load the persisted configuration once at import time, then try to
# reconnect the AGV in a background thread so Flask start-up is not blocked.
def _auto_reconnect():
    """Attempt one AGV connection shortly after start-up."""
    time.sleep(2)  # give Flask time to finish starting
    try:
        gs.agv_controller = AGVController()
        if gs.agv_controller.connect():
            print("[启动] AGV 自动连接成功")
        else:
            print("[启动] AGV 自动连接失败,请手动连接")
    except Exception as e:
        print(f"[启动] AGV 自动连接异常: {e}")


try:
    with app.app_context():
        load_persisted_config()
    threading.Thread(target=_auto_reconnect, daemon=True).start()
except Exception:
    # Log instead of swallowing silently (the original used a bare except).
    logger.exception("[启动] 加载持久化配置失败")
    # before_first_request was removed in Flask 2.3; only register the
    # deferred retry on versions that still provide it.
    if hasattr(app, "before_first_request"):
        @app.before_first_request
        def startup_load():
            load_persisted_config()
|
||
|
||
# ========== 页面路由 ==========
|
||
@app.route("/")
def index():
    """Serve the landing page."""
    page = "index.html"
    return render_template(page)
|
||
|
||
@app.route("/setting")
def setting_page():
    """Serve the settings page."""
    page = "setting.html"
    return render_template(page)
|
||
|
||
@app.route("/running")
def running_page():
    """Serve the mission-running page."""
    page = "running.html"
    return render_template(page)
|
||
|
||
|
||
# ========== 系统状态 API ==========
|
||
@app.route("/api/status")
def api_status():
    """Return the overall system status as JSON.

    The arm connection is verified actively by requesting the joint angles;
    the AGV connection is taken from ``AGVController.is_connected()``. The
    remaining fields mirror values held in ``gs``.
    """
    with gs.lock:
        # NOTE(review): the arm probe below does network I/O while holding
        # gs.lock, and the 2 s timeout stays set on the socket afterwards —
        # confirm both are intended.
        arm_connected = False
        if gs.arm_client and gs.arm_client._sock:
            try:
                # Short timeout so a dead link does not stall the status poll.
                gs.arm_client._sock.settimeout(2)
                ok, _ = gs.arm_client.get_angles()
                arm_connected = ok
            except Exception as exc:
                logger.warning("机械臂连接检测失败: %s", exc)
                arm_connected = False
                # The link is considered broken: drop the socket.
                if gs.arm_client:
                    gs.arm_client._sock = None

        agv_connected = False
        if gs.agv_controller:
            agv_connected = gs.agv_controller.is_connected()

        return jsonify({
            "state": gs.state,
            "agv_connected": agv_connected,
            "arm_connected": arm_connected,
            "camera_opened": gs.camera_opened,
            "arm_camera_opened": gs.arm_camera_opened,
            "map_loaded": bool(gs.map_config),
            "points_count": len(gs.points_config),
            "models_count": len(gs.models_config),
            "mission_rows": gs.mission_config.get("rows", 0),
            "mission_cols": gs.mission_config.get("cols", 0),
            "machines_count": len(gs.machines_config)
        })
|
||
|
||
@app.route("/api/system/connect", methods=["POST"])
def api_connect():
    """Connect the AGV, the arm and the AGV camera, and probe the arm camera.

    Returns a JSON object with one boolean per device (``agv``, ``arm``,
    ``camera``) and an ``errors`` list. ``gs.state`` becomes SETTING only
    when all three devices connected, otherwise IDLE.
    """
    results = {"agv": False, "arm": False, "camera": False, "errors": []}

    # AGV
    try:
        gs.agv_controller = AGVController()
        results["agv"] = gs.agv_controller.connect()
    except Exception as e:
        results["errors"].append(f"AGV: {e}")

    # Arm
    try:
        gs.arm_client = ArmClient(ARM_CONFIG["host"], ARM_CONFIG["port"])
        results["arm"] = gs.arm_client.connect()
        if results["arm"]:
            # Power on, then activate.
            gs.arm_client.power_on()
            time.sleep(0.5)
            gs.arm_client.state_on()
    except Exception as e:
        results["errors"].append(f"机械臂: {e}")

    # AGV camera
    try:
        gs.qr_scanner = QRScanner(CAMERA_CONFIG["device_index"])
        results["camera"] = gs.qr_scanner.open()
        gs.camera_opened = results["camera"]
    except Exception as e:
        results["errors"].append(f"摄像头: {e}")

    # Arm camera: only checked for reachability; it does not affect all_ok.
    try:
        import requests as _req
        r2 = _req.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=3)
        gs.arm_camera_opened = (r2.status_code == 200)
        r2.close()
    except Exception as exc:
        # Was a bare except: keep the best-effort behaviour but log the cause.
        logger.warning("机械臂摄像头检测失败: %s", exc)
        gs.arm_camera_opened = False

    all_ok = results["agv"] and results["arm"] and results["camera"]
    with gs.lock:
        if all_ok:
            gs.state = State.SETTING
        else:
            gs.state = State.IDLE

    return jsonify(results)
|
||
|
||
@app.route("/api/system/disconnect", methods=["POST"])
def api_disconnect():
    """Close the arm, AGV and QR scanner handles and go back to IDLE."""
    arm = gs.arm_client
    if arm:
        arm.close()
        gs.arm_client = None

    agv = gs.agv_controller
    if agv:
        agv.disconnect()
        gs.agv_controller = None

    scanner = gs.qr_scanner
    if scanner:
        scanner.close()
        gs.qr_scanner = None
        gs.camera_opened = False

    gs.state = State.IDLE
    return jsonify({"ok": True})
|
||
|
||
@app.route("/api/device/connect", methods=["POST"])
def api_device_connect():
    """Connect a single device named by ``device`` in the JSON body.

    Accepted values: ``agv``, ``arm``, ``camera``, ``arm_camera``. The
    connection runs in a daemon thread; the handler waits at most 10 seconds
    for it and reports a timeout otherwise.
    """
    payload = request.json or {}
    device = payload.get("device", "")

    finished = threading.Event()
    outcome = {}

    def _worker():
        try:
            if device == "agv":
                gs.agv_controller = AGVController()
                outcome["ok"] = gs.agv_controller.connect()
                if not outcome["ok"]:
                    outcome["error"] = "AGV 连接失败,请检查网络或 ROS2"
            elif device == "arm":
                gs.arm_client = ArmClient(ARM_CONFIG["host"], ARM_CONFIG["port"])
                outcome["ok"] = gs.arm_client.connect()
                if not outcome["ok"]:
                    outcome["error"] = "机械臂连接失败"
                else:
                    gs.arm_client.power_on()
                    time.sleep(0.3)
                    gs.arm_client.state_on()
            elif device == "camera":
                gs.qr_scanner = QRScanner(CAMERA_CONFIG["device_index"])
                outcome["ok"] = gs.qr_scanner.open()
                gs.camera_opened = outcome["ok"]
                if not outcome["ok"]:
                    outcome["error"] = "AGV 摄像头打开失败"
            elif device == "arm_camera":
                import requests as _req
                resp = _req.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=5)
                outcome["ok"] = (resp.status_code == 200)
                resp.close()
                gs.arm_camera_opened = outcome["ok"]
                if not outcome["ok"]:
                    outcome["error"] = "机械臂摄像头无响应"
            else:
                outcome["error"] = f"未知设备: {device}"
        except Exception as exc:
            outcome["error"] = str(exc)
        finally:
            finished.set()

    threading.Thread(target=_worker, daemon=True).start()

    result = {"device": device, "ok": False, "error": ""}
    # Wait up to 10 seconds; report failure on timeout.
    if finished.wait(10):
        result["ok"] = outcome.get("ok", False)
        result["error"] = outcome.get("error", "")
    else:
        result["ok"] = False
        result["error"] = "连接超时(10秒),设备无响应"

    return jsonify(result)
|
||
|
||
|
||
# ========== 地图配置 API ==========
|
||
@app.route("/api/map/load", methods=["POST"])
def api_map_load():
    """Select a map YAML file and persist the choice.

    Body (all optional): ``map_dir`` and ``map_file``; both default to the
    values in ``MAP_CONFIG``. Returns 400 when the resulting path does not
    exist.
    """
    # Every field has a default, so an empty or null body is valid.
    data = request.json or {}
    map_dir = data.get("map_dir", MAP_CONFIG["map_dir"])
    map_file = data.get("map_file", MAP_CONFIG["map_file"])
    map_yaml = os.path.join(map_dir, map_file)

    if not os.path.exists(map_yaml):
        return jsonify({"ok": False, "error": f"地图文件不存在: {map_yaml}"}), 400

    gs.map_config = {"map_dir": map_dir, "map_file": map_file, "map_yaml": map_yaml}
    save_json("map_config.json", gs.map_config)
    return jsonify({"ok": True, "map": gs.map_config})
|
||
|
||
@app.route("/api/map/save", methods=["POST"])
def api_map_save():
    """Replace the map configuration with the JSON object in the body.

    Returns 400 when the body is not a JSON object, so ``gs.map_config``
    always stays a dict (other handlers test ``"map_yaml" in gs.map_config``).
    """
    data = request.json
    if not isinstance(data, dict):
        return jsonify({"ok": False, "error": "请求体必须是 JSON 对象"}), 400
    gs.map_config = data
    save_json("map_config.json", data)
    return jsonify({"ok": True})
|
||
|
||
@app.route("/api/map/image")
def api_map_image():
    """Return the loaded map as a PNG image.

    Reads the image file named by the ``image`` key of the map YAML (default
    ``map.pgm``), inverts its grey levels and encodes it as PNG. Returns 404
    when no map is loaded or the image file is missing, 500 on read or
    encode failures.
    """
    if not gs.map_config or "map_yaml" not in gs.map_config:
        return jsonify({"error": "地图未加载"}), 404

    map_yaml = gs.map_config["map_yaml"]
    map_dir = os.path.dirname(map_yaml)

    try:
        import yaml
        with open(map_yaml, 'r') as f:
            # safe_load returns None for an empty file.
            meta = yaml.safe_load(f) or {}
        pgm_file = meta.get('image', 'map.pgm')
        pgm_path = os.path.join(map_dir, pgm_file)

        if not os.path.exists(pgm_path):
            return jsonify({"error": f"PGM 文件不存在: {pgm_path}"}), 404

        import cv2
        img = cv2.imread(pgm_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            return jsonify({"error": "无法读取 PGM 文件"}), 500

        # Invert grey levels (original note: in the PGM 0 = occupied,
        # 255 = free).
        img = 255 - img

        encoded, buf = cv2.imencode('.png', img)
        if not encoded:
            return jsonify({"error": "PNG 编码失败"}), 500
        return Response(buf.tobytes(), mimetype='image/png')
    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
||
|
||
@app.route("/api/map/meta")
def api_map_meta():
    """Return map metadata: resolution, origin and image size in pixels."""
    if not gs.map_config or "map_yaml" not in gs.map_config:
        return jsonify({"ok": False, "error": "地图未加载"}), 404

    yaml_path = gs.map_config["map_yaml"]
    base_dir = os.path.dirname(yaml_path)

    try:
        import yaml
        with open(yaml_path, 'r') as fh:
            meta = yaml.safe_load(fh)
        image_path = os.path.join(base_dir, meta.get('image', 'map.pgm'))

        import cv2
        img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        # An unreadable image is reported as size 0 x 0.
        if img is not None:
            height, width = img.shape
        else:
            height, width = 0, 0

        payload = {
            "ok": True,
            "resolution": meta.get('resolution', 0.05),
            "origin": meta.get('origin', [0, 0, 0]),
            "width": width,
            "height": height
        }
        return jsonify(payload)
    except Exception as exc:
        return jsonify({"ok": False, "error": str(exc)}), 500
|
||
|
||
|
||
# ========== 地图导航 API ==========
|
||
|
||
@app.route("/api/navigate/to", methods=["POST"])
def api_navigate_to():
    """Start a non-blocking Nav2 navigation to the requested pose.

    Body: ``x`` and ``y`` (required, numeric) and ``yaw`` (optional,
    numeric). Returns 400 when no map is loaded, the coordinates are missing
    or not numeric, the AGV is not connected, or navigation could not be
    started; 500 on unexpected errors.
    """
    if not gs.map_config or "map_yaml" not in gs.map_config:
        return jsonify({"ok": False, "error": "地图未加载,请先在设置中加载地图"}), 400

    # A null or missing body should yield the "missing coordinates" error.
    data = request.json or {}
    goal_x = data.get("x")
    goal_y = data.get("y")
    goal_yaw = data.get("yaw")  # optional heading
    if goal_x is None or goal_y is None:
        return jsonify({"ok": False, "error": "缺少目标坐标 x, y"}), 400

    # Bad input is a client error (400), not a server error (500).
    try:
        x_arg = float(goal_x)
        y_arg = float(goal_y)
        yaw_arg = float(goal_yaw) if goal_yaw is not None else None
    except (TypeError, ValueError):
        return jsonify({"ok": False, "error": "目标坐标 x, y, yaw 必须是数字"}), 400

    if not gs.agv_controller or not gs.agv_controller.is_connected():
        return jsonify({"ok": False, "error": "AGV 未连接,请先连接 AGV"}), 400

    try:
        if gs.navigator is None:
            gs.navigator = Nav2Navigator()
        # navigate_to_pose(x, y, yaw=None, timeout_sec=120, blocking=False)
        ok = gs.navigator.navigate_to_pose(x_arg, y_arg, yaw_arg, blocking=False)
        if ok:
            return jsonify({"ok": True, "message": "导航已启动"})
        else:
            return jsonify({"ok": False, "error": "导航启动失败,可能是Nav2未运行或AGV未连接"}), 400
    except Exception as e:
        logger.error(f"导航失败: {e}")
        return jsonify({"ok": False, "error": str(e)}), 500
|
||
|
||
|
||
@app.route("/api/navigate/stop", methods=["POST"])
def api_navigate_stop():
    """Stop navigation; 400 when no navigator has been created yet."""
    navigator = gs.navigator
    if not navigator:
        return jsonify({"ok": False, "error": "导航器未初始化"}), 400
    navigator.stop()
    return jsonify({"ok": True, "message": "导航已停止"})
|
||
|
||
|
||
@app.route("/api/navigate/cancel", methods=["POST"])
def api_navigate_cancel():
    """Cancel the current navigation; succeeds even when none is active."""
    navigator = gs.navigator
    if not navigator:
        return jsonify({"ok": True, "message": "无活动导航"})
    navigator.stop()
    return jsonify({"ok": True, "message": "导航已取消"})
|
||
|
||
|
||
@app.route("/api/navigate/status", methods=["GET"])
def api_navigate_status():
    """Return the navigator status, creating the navigator on first use."""
    if gs.navigator is None:
        try:
            gs.navigator = Nav2Navigator()
        except Exception as exc:
            logger.warning(f"Nav2Navigator 初始化失败: {exc}")

    if not gs.navigator:
        # Still no navigator: report Nav2 as unavailable.
        return jsonify({"status": "idle", "current_position": [0, 0, 0], "nav2_available": False})

    return jsonify(gs.navigator.get_status())
|
||
|
||
|
||
@app.route("/api/navigate/path", methods=["POST"])
def api_navigate_path():
    """Path preview placeholder.

    No path is computed here: after validating that ``x`` and ``y`` are
    present, the handler returns the current position and Nav2 availability.
    """
    if not gs.map_config or "map_yaml" not in gs.map_config:
        return jsonify({"ok": False, "error": "地图未加载"}), 400

    # A null or missing body should yield the "missing coordinates" error.
    data = request.json or {}
    goal_x = data.get("x")
    goal_y = data.get("y")
    if goal_x is None or goal_y is None:
        return jsonify({"ok": False, "error": "缺少目标坐标 x, y"}), 400

    try:
        if gs.navigator is None:
            gs.navigator = Nav2Navigator()
        current = gs.navigator.get_current_position()
        status = gs.navigator.get_status()
        return jsonify({
            "ok": True,
            "message": "Nav2 路径预览不可用,请在 RViz 中查看规划路径",
            "current_position": current,
            "nav2_available": status.get("nav2_available", False)
        })
    except Exception as e:
        logger.error(f"路径预览失败: {e}")
        return jsonify({"ok": False, "error": str(e)}), 500
|
||
|
||
|
||
# ========== 点位配置 API ==========
|
||
@app.route("/api/points/list", methods=["GET"])
def api_points_list():
    """Return every configured point."""
    payload = {"points": gs.points_config}
    return jsonify(payload)
|
||
|
||
@app.route("/api/points/add", methods=["POST"])
def api_points_add():
    """Add a point at the AGV's current position and persist the list.

    Body (all optional): ``name``, ``photo_mode`` (front / back / both) and
    ``sequence`` (execution order used for "both"). When the AGV is not
    connected or reports no position, ``[0.0, 0.0, 0.0]`` is stored.
    """
    # Every field has a default, so an empty or null body is valid.
    data = request.json or {}
    point_name = data.get("name", f"point_{len(gs.points_config) + 1}")

    position = None
    if gs.agv_controller and gs.agv_controller.is_connected():
        position = gs.agv_controller.get_position()

    point = {
        # Nanosecond resolution: two points created within the same second
        # no longer share an id.
        "id": f"p_{time.time_ns()}",
        "name": point_name,
        "coords": position or [0.0, 0.0, 0.0],  # [x, y, yaw]
        "photo_mode": data.get("photo_mode", "front"),
        "sequence": data.get("sequence", ["front", "back"]),
    }
    gs.points_config.append(point)
    save_json("points_config.json", gs.points_config)
    return jsonify({"ok": True, "point": point})
|
||
|
||
@app.route("/api/points/update/<point_id>", methods=["POST"])
def api_points_update(point_id):
    """Merge the JSON object in the body into the point ``point_id``.

    Returns 400 when the body is not a JSON object and 404 when no point has
    that id.
    """
    data = request.json
    if not isinstance(data, dict):
        # dict.update() would raise on None, numbers or plain lists.
        return jsonify({"ok": False, "error": "请求体必须是 JSON 对象"}), 400
    for p in gs.points_config:
        if p["id"] == point_id:
            p.update(data)
            save_json("points_config.json", gs.points_config)
            return jsonify({"ok": True})
    return jsonify({"ok": False, "error": "点位不存在"}), 404
|
||
|
||
@app.route("/api/points/delete/<point_id>", methods=["DELETE"])
def api_points_delete(point_id):
    """Remove the point ``point_id`` (no error when it does not exist)."""
    remaining = [pt for pt in gs.points_config if pt["id"] != point_id]
    gs.points_config = remaining
    save_json("points_config.json", gs.points_config)
    return jsonify({"ok": True})
|
||
|
||
@app.route("/api/points/save", methods=["POST"])
def api_points_save():
    """Replace the whole point list with ``points`` from the body and persist it.

    A missing ``points`` key (or an empty/null body) stores an empty list.
    """
    data = request.json or {}
    gs.points_config = data.get("points", [])
    save_json("points_config.json", gs.points_config)
    return jsonify({"ok": True, "count": len(gs.points_config)})
|
||
|
||
@app.route("/api/points/load", methods=["GET"])
def api_points_load():
    """Reload the saved point list from disk and return it.

    An empty stored list leaves ``gs.points_config`` untouched.
    """
    stored = load_json("points_config.json", [])
    if not stored:
        return jsonify({"ok": True, "points": []})
    gs.points_config = stored
    return jsonify({"ok": True, "points": stored})
|
||
|
||
|
||
# ========== 机型(姿态组)API ==========
|
||
@app.route("/api/models/list", methods=["GET"])
def api_models_list():
    """Return every configured machine model."""
    payload = {"models": gs.models_config}
    return jsonify(payload)
|
||
|
||
@app.route("/api/models/add", methods=["POST"])
def api_models_add():
    """Create a machine model with an empty pose list and persist it.

    Body (all optional): ``name``, ``serial_prefix`` (QR code model prefix),
    ``description`` and ``notes``.
    """
    # Every field has a default, so an empty or null body is valid.
    data = request.json or {}
    model_name = data.get("name", f"model_{len(gs.models_config) + 1}")
    model = {
        # Nanosecond resolution avoids duplicate ids for models created
        # within the same second.
        "id": f"m_{time.time_ns()}",
        "name": model_name,
        "serial_prefix": data.get("serial_prefix", ""),
        "poses": [],
        "description": data.get("description", ""),
        "notes": data.get("notes", ""),
    }
    gs.models_config.append(model)
    save_json("models_config.json", gs.models_config)
    return jsonify({"ok": True, "model": model})
|
||
|
||
@app.route("/api/models/update/<model_id>", methods=["POST"])
def api_models_update(model_id):
    """Merge the JSON object in the body into the model ``model_id``.

    Returns 400 when the body is not a JSON object and 404 when no model has
    that id.
    """
    data = request.json
    if not isinstance(data, dict):
        # dict.update() would raise on None, numbers or plain lists.
        return jsonify({"ok": False, "error": "请求体必须是 JSON 对象"}), 400
    for m in gs.models_config:
        if m["id"] == model_id:
            m.update(data)
            save_json("models_config.json", gs.models_config)
            return jsonify({"ok": True})
    return jsonify({"ok": False, "error": "机型不存在"}), 404
|
||
|
||
@app.route("/api/models/delete/<model_id>", methods=["DELETE"])
def api_models_delete(model_id):
    """Remove the model ``model_id`` (no error when it does not exist)."""
    remaining = [mdl for mdl in gs.models_config if mdl["id"] != model_id]
    gs.models_config = remaining
    save_json("models_config.json", gs.models_config)
    return jsonify({"ok": True})
|
||
|
||
@app.route("/api/models/poses/add", methods=["POST"])
def api_poses_add():
    """Append a pose to the model named by ``model_id`` in the body.

    Body: ``model_id`` (required); optional ``name``, ``photo_type``
    (front / back / nameplate), ``arm_angles`` (defaults to six zeros),
    ``speed`` and ``description``. Returns 400 without ``model_id`` and 404
    when the model does not exist.
    """
    data = request.json or {}
    model_id = data.get("model_id")
    if not model_id:
        return jsonify({"ok": False, "error": "缺少 model_id"}), 400
    for m in gs.models_config:
        if m["id"] == model_id:
            # Models without a "poses" key are tolerated, as in the getter.
            poses = m.setdefault("poses", [])
            pose = {
                # Nanosecond resolution: poses recorded in quick succession
                # must not share an id (delete/update address poses by id).
                "id": f"pose_{time.time_ns()}",
                "name": data.get("name", f"姿态{len(poses) + 1}"),
                "photo_type": data.get("photo_type", "front"),
                "arm_angles": data.get("arm_angles", [0.0]*6),
                "speed": data.get("speed", 500),
                "description": data.get("description", ""),
            }
            poses.append(pose)
            save_json("models_config.json", gs.models_config)
            return jsonify({"ok": True, "pose": pose})
    return jsonify({"ok": False, "error": "机型不存在"}), 404
|
||
|
||
@app.route("/api/models/<model_id>/poses", methods=["GET"])
def api_model_poses_get(model_id):
    """Return the pose list of a model; an unknown model yields an empty list."""
    model = next((mdl for mdl in gs.models_config if mdl["id"] == model_id), None)
    poses = model.get("poses", []) if model is not None else []
    return jsonify({"poses": poses})
|
||
|
||
@app.route("/api/models/<model_id>/poses/<pose_id>", methods=["PUT"])
def api_model_poses_update(model_id, pose_id):
    """Merge the JSON object in the body into one pose of a model.

    Returns 400 when the body is not a JSON object and 404 when the model or
    the pose does not exist.
    """
    data = request.json
    if not isinstance(data, dict):
        # dict.update() would raise on None, numbers or plain lists.
        return jsonify({"ok": False, "error": "请求体必须是 JSON 对象"}), 400
    for m in gs.models_config:
        if m["id"] == model_id:
            for pose in m.get("poses", []):
                if pose["id"] == pose_id:
                    pose.update(data)
                    save_json("models_config.json", gs.models_config)
                    return jsonify({"ok": True})
    return jsonify({"ok": False}), 404
|
||
|
||
@app.route("/api/models/<model_id>/poses/<pose_id>", methods=["DELETE"])
def api_model_poses_delete(model_id, pose_id):
    """Remove one pose from a model; 404 when the model does not exist."""
    for model in gs.models_config:
        if model["id"] != model_id:
            continue
        kept = [ps for ps in model.get("poses", []) if ps["id"] != pose_id]
        model["poses"] = kept
        save_json("models_config.json", gs.models_config)
        return jsonify({"ok": True})
    return jsonify({"ok": False}), 404
|
||
|
||
|
||
# ========== 任务配置 API(M×N 网格)==========
|
||
@app.route("/api/mission/position", methods=["GET"])
def api_mission_position():
    """Return the AGV's current position and battery reading."""
    agv = gs.agv_controller
    if not agv or not agv.is_connected():
        return jsonify({"ok": False, "error": "AGV 未连接"}), 400

    pos = gs.agv_controller.get_position()
    if not pos:
        return jsonify({"ok": False, "error": "AGV 未发布位置数据(/odom 无数据),请检查 AGV 传感器是否正常"}), 400

    payload = {"ok": True, "position": pos, "battery": gs.agv_controller.get_battery()}
    return jsonify(payload)
|
||
|
||
|
||
@app.route("/api/mission/init_pose", methods=["POST"])
def api_mission_init_pose():
    """Publish an initial pose of (0, 0, 0) on ``/initialpose`` (no RViz needed).

    Writes the message to a YAML file, generates a small bash script that
    sources the ROS 2 environment and runs ``ros2 topic pub --once``, then
    executes it. Returns 500 when the command exits with a non-zero status
    or any step raises.
    """
    try:
        script = "/tmp/ros2_init_pose.sh"
        goal_file = "/tmp/nav2_goal_{}.yaml".format(os.getpid())

        # geometry_msgs/PoseWithCovarianceStamped is
        #   header, pose{pose{position, orientation}, covariance[36]}.
        # The previous text listed "pose" before "header" and put
        # "covariance" between position and orientation, which cannot map
        # onto that layout under any indentation.
        # NOTE(review): verify on the robot that AMCL accepts this message.
        covariance = [0.0] * 36
        covariance[0] = covariance[7] = 0.25  # x and y variance
        yaml_content = (
            "header:\n"
            "  stamp:\n"
            "    sec: 0\n"
            "    nanosec: 0\n"
            "  frame_id: map\n"
            "pose:\n"
            "  pose:\n"
            "    position:\n"
            "      x: 0.0\n"
            "      y: 0.0\n"
            "      z: 0.0\n"
            "    orientation:\n"
            "      x: 0.0\n"
            "      y: 0.0\n"
            "      z: 0.0\n"
            "      w: 1.0\n"
            f"  covariance: {covariance}\n"
        )
        with open(goal_file, "w") as f:
            f.write(yaml_content)

        lines = [
            "#!/bin/bash",
            "export ROS_DOMAIN_ID=1",
            "source /opt/ros/humble/setup.bash",
            "source /home/elephant/agv_pro_ros2/install/setup.bash",
            f'ros2 topic pub --once /initialpose geometry_msgs/PoseWithCovarianceStamped "$(cat {goal_file})"',
        ]
        with open(script, "w") as f:
            f.write("\n".join(lines) + "\n")
        os.chmod(script, 0o755)

        result = subprocess.run(
            [script],
            capture_output=True, text=True, timeout=12,
            env={**os.environ, "ROS_DOMAIN_ID": "1"}
        )
        logger.info(f"init_pose: rc={result.returncode}, stdout={result.stdout[:200]}, stderr={result.stderr[:200]}")

        # Success used to be reported even when the publish command failed.
        if result.returncode != 0:
            detail = result.stderr.strip()[:200] or f"ros2 topic pub 退出码 {result.returncode}"
            return jsonify({"ok": False, "error": detail}), 500
        return jsonify({"ok": True, "message": "初始位置已设为 (0,0,0)"})
    except Exception as e:
        logger.error(f"初始化位置失败: {e}")
        return jsonify({"ok": False, "error": str(e)}), 500
|
||
|
||
|
||
@app.route("/api/mission/config", methods=["GET"])
def api_mission_config_get():
    """Return the mission configuration together with the machine list."""
    payload = {"ok": True, "config": gs.mission_config, "machines": gs.machines_config}
    return jsonify(payload)
|
||
|
||
@app.route("/api/mission/config", methods=["POST"])
def api_mission_config_set():
    """Set the grid size and occupancy matrix of the mission and persist them.

    Body (all optional): ``rows`` (default 2), ``cols`` (default 3) and
    ``grid`` (default ``[]``). Stored positions that fall outside the new
    grid, or whose side is not front/back, are dropped.
    """
    # Every field has a default, so an empty or null body is valid.
    data = request.json or {}
    rows = data.get("rows", 2)
    cols = data.get("cols", 3)
    grid = data.get("grid", [])
    gs.mission_config["rows"] = rows
    gs.mission_config["cols"] = cols
    gs.mission_config["grid"] = grid
    # Keep only front/back positions with row < rows and col < cols.
    gs.mission_config["positions"] = [
        p for p in gs.mission_config.get("positions", [])
        if p.get("row", 0) < rows and p.get("col", 0) < cols and p.get("side") in ("front", "back")
    ]
    save_json("mission_config.json", gs.mission_config)
    return jsonify({"ok": True, "config": gs.mission_config})
|
||
|
||
|
||
@app.route("/api/mission/machines", methods=["GET"])
def api_mission_machines_list():
    """Return every machine configuration."""
    payload = {"ok": True, "machines": gs.machines_config}
    return jsonify(payload)
|
||
|
||
@app.route("/api/mission/machines/<machine_id>", methods=["GET"])
def api_mission_machine_get(machine_id):
    """Return the configuration of one machine; 404 when it does not exist."""
    for machine in gs.machines_config:
        if machine["id"] != machine_id:
            continue
        return jsonify({"ok": True, "machine": machine})
    return jsonify({"ok": False, "error": "机器不存在"}), 404
|
||
|
||
@app.route("/api/mission/machines", methods=["POST"])
def api_mission_machines_save():
    """Replace the whole machine list with ``machines`` from the body and persist it.

    A missing ``machines`` key (or an empty/null body) stores an empty list.
    """
    data = request.json or {}
    machines = data.get("machines", [])
    gs.machines_config = machines
    save_json("machines_config.json", gs.machines_config)
    return jsonify({"ok": True, "count": len(gs.machines_config)})
|
||
|
||
@app.route("/api/mission/machines/add", methods=["POST"])
def api_mission_machine_add():
    """Add one machine at a grid cell and persist the list.

    Body (all optional): ``id`` (defaults to ``m_<row>_<col>``), ``row``,
    ``col``, ``front``, ``back`` and ``qr``. Returns 400 when a machine with
    the same id or the same (row, col) already exists.
    """
    # Every field has a default, so an empty or null body is valid.
    data = request.json or {}
    machine_id = data.get("id", f"m_{data.get('row', 0)}_{data.get('col', 0)}")
    # Reject duplicates by id or by grid cell.
    for m in gs.machines_config:
        if m["id"] == machine_id or (m.get("row") == data.get("row") and m.get("col") == data.get("col")):
            return jsonify({"ok": False, "error": "该位置已有机器"}), 400
    machine = {
        "id": machine_id,
        "row": data.get("row", 0),
        "col": data.get("col", 0),
        "front": data.get("front", {"coords": [0, 0, 0], "poses": []}),
        "back": data.get("back", {"coords": [0, 0, 0], "poses": []}),
        "qr": data.get("qr", {"coords": [0, 0, 0], "qr_value": "", "model_id": ""}),
    }
    gs.machines_config.append(machine)
    save_json("machines_config.json", gs.machines_config)
    return jsonify({"ok": True, "machine": machine})
|
||
|
||
@app.route("/api/mission/machines/<machine_id>", methods=["PUT"])
def api_mission_machine_update(machine_id):
    """Merge the JSON object in the body into the machine ``machine_id``.

    Returns 400 when the body is not a JSON object and 404 when no machine
    has that id.
    """
    data = request.json
    if not isinstance(data, dict):
        # dict.update() would raise on None, numbers or plain lists.
        return jsonify({"ok": False, "error": "请求体必须是 JSON 对象"}), 400
    for m in gs.machines_config:
        if m["id"] == machine_id:
            m.update(data)
            save_json("machines_config.json", gs.machines_config)
            return jsonify({"ok": True})
    return jsonify({"ok": False, "error": "机器不存在"}), 404
|
||
|
||
|
||
@app.route("/api/mission/machines/<machine_id>", methods=["DELETE"])
def api_mission_machine_delete(machine_id):
    """Remove the machine ``machine_id`` (no error when it does not exist)."""
    remaining = [mc for mc in gs.machines_config if mc["id"] != machine_id]
    gs.machines_config = remaining
    save_json("machines_config.json", gs.machines_config)
    return jsonify({"ok": True})
|
||
|
||
@app.route("/api/mission/qr_scan/<machine_id>", methods=["POST"])
def api_mission_qr_scan(machine_id):
    """Scan one QR code and store it on the machine ``machine_id``.

    The scanned value is matched against each model's ``serial_prefix``; the
    first model whose prefix starts the value is linked to the machine.
    """
    if not gs.qr_scanner or not gs.qr_scanner._cap:
        return jsonify({"ok": False, "error": "AGV 摄像头未打开"}), 400

    result = gs.qr_scanner.scan_once()
    if not result:
        return jsonify({"ok": False, "error": "未检测到二维码"})

    machine = next((mc for mc in gs.machines_config if mc["id"] == machine_id), None)
    if machine is None:
        return jsonify({"ok": False, "error": f"机器 {machine_id} 不存在"}), 404

    if "qr" not in machine:
        machine["qr"] = {"coords": [0, 0, 0], "qr_value": "", "model_id": ""}
    machine["qr"]["qr_value"] = result

    # Find the first model whose non-empty serial prefix starts the value.
    matched_model = None
    for candidate in gs.models_config:
        prefix = candidate.get("serial_prefix", "")
        if prefix and result.startswith(prefix):
            matched_model = candidate
            break
    if matched_model:
        machine["qr"]["model_id"] = matched_model["id"]

    save_json("machines_config.json", gs.machines_config)
    return jsonify({
        "ok": True,
        "qr_value": result,
        "model_id": machine["qr"].get("model_id", ""),
        "model_name": matched_model["name"] if matched_model else ""
    })
|
||
|
||
|
||
@app.route("/api/mission/poses/<machine_id>/<side>", methods=["GET"])
def api_mission_poses_get(machine_id, side):
    """Return the pose list of one side (``front`` or ``back``) of a machine.

    Returns 400 for any other ``side`` and 404 when the machine does not
    exist.
    """
    # Without this check a side such as "id" or "row" selects a non-dict
    # field and crashes on .get().
    if side not in ("front", "back"):
        return jsonify({"ok": False, "poses": [], "error": f"机器无此侧: {side}"}), 400
    for m in gs.machines_config:
        if m["id"] == machine_id:
            side_cfg = m.get(side) or {}
            return jsonify({"ok": True, "poses": side_cfg.get("poses", [])})
    return jsonify({"ok": False, "poses": []}), 404
|
||
|
||
@app.route("/api/mission/poses/<machine_id>/<side>", methods=["POST"])
def api_mission_poses_add(machine_id, side):
    """Append a pose to one side (``front`` or ``back``) of a machine.

    Body (all optional): ``name``, ``arm_angles`` (defaults to six zeros),
    ``speed`` and ``description``. Returns 400 when the side is invalid or
    not configured on the machine, 404 when the machine does not exist.
    """
    data = request.json or {}
    for m in gs.machines_config:
        if m["id"] == machine_id:
            side_cfg = m.get(side) if side in ("front", "back") else None
            # "side in m" alone let keys such as "qr" or "id" through, which
            # then crashed on ["poses"].
            if not isinstance(side_cfg, dict):
                return jsonify({"ok": False, "error": f"机器无此侧: {side}"}), 400
            pose = {
                # Nanosecond resolution: poses recorded in quick succession
                # must not share an id (delete addresses poses by id).
                "id": f"pose_{time.time_ns()}",
                "name": data.get("name", "姿态"),
                "arm_angles": data.get("arm_angles", [0.0]*6),
                "speed": data.get("speed", 500),
                "description": data.get("description", ""),
            }
            side_cfg.setdefault("poses", []).append(pose)
            save_json("machines_config.json", gs.machines_config)
            return jsonify({"ok": True, "pose": pose})
    return jsonify({"ok": False, "error": "机器不存在"}), 404
|
||
|
||
@app.route("/api/mission/poses/<machine_id>/<side>/<pose_id>", methods=["DELETE"])
def api_mission_poses_delete(machine_id, side, pose_id):
    """Remove one pose from a side (``front`` or ``back``) of a machine.

    Returns 404 when the machine does not exist or the side is invalid or
    not configured on it.
    """
    for m in gs.machines_config:
        if m["id"] == machine_id:
            side_cfg = m.get(side) if side in ("front", "back") else None
            # "side in m" alone let keys such as "qr" or "id" through, which
            # then crashed on ["poses"].
            if not isinstance(side_cfg, dict):
                return jsonify({"ok": False}), 404
            side_cfg["poses"] = [p for p in side_cfg.get("poses", []) if p["id"] != pose_id]
            save_json("machines_config.json", gs.machines_config)
            return jsonify({"ok": True})
    return jsonify({"ok": False}), 404
|
||
|
||
def _snake_row_steps(r, cols, grid_row, machine_at):
    """Return the shooting steps for grid row ``r``.

    Even rows shoot the front sides left-to-right, then the back sides
    right-to-left. Odd rows shoot the back sides left-to-right, then the
    front sides right-to-left. Cells that are empty in ``grid_row``, have no
    machine in ``machine_at`` or lack the requested side are skipped.
    """
    # Clamp to the row length so a short row cannot raise IndexError.
    occupied = [c for c in range(min(cols, len(grid_row))) if grid_row[c]]
    if not occupied:
        return []

    even = r % 2 == 0
    row_dir, row_dir_back = ("lr", "rl") if even else ("rl", "lr")
    first_side, second_side = ("front", "back") if even else ("back", "front")

    steps = []
    for side, columns in ((first_side, occupied), (second_side, reversed(occupied))):
        for c in columns:
            m = machine_at.get((r, c))
            if m and m.get(side):
                steps.append({
                    "machine_id": m["id"],
                    "row": r, "col": c,
                    "side": side,
                    "row_dir": row_dir,
                    "row_dir_back": row_dir_back,
                })
    return steps


@app.route("/api/mission/generate_sequence", methods=["GET"])
def api_mission_generate_sequence():
    """Build the snake-shaped shooting sequence from the grid and machines."""
    rows = gs.mission_config.get("rows", 2)
    cols = gs.mission_config.get("cols", 3)
    grid = gs.mission_config.get("grid", [])

    # Index machines by cell; the first machine registered for a cell wins,
    # matching the previous linear search.
    machine_at = {}
    for m in gs.machines_config:
        machine_at.setdefault((m.get("row"), m.get("col")), m)

    sequence = []
    for r in range(rows):
        grid_row = grid[r] if r < len(grid) else []
        sequence.extend(_snake_row_steps(r, cols, grid_row, machine_at))

    # NOTE(review): the visible source ended with a bare `return json`, which
    # returns the json module. The response shape below follows the other
    # endpoints — confirm the key names against the front end.
    return jsonify({"ok": True, "sequence": sequence})
|
||
|
||
# ========== 点位配置 API(独立于机器)==========
|
||
@app.route("/api/mission/positions", methods=["GET"])
def api_mission_positions_list():
    """Return every stored position entry of the mission config."""
    payload = {
        "ok": True,
        "positions": gs.mission_config.get("positions", []),
    }
    return jsonify(payload)
|
||
|
||
@app.route("/api/mission/positions", methods=["POST"])
def api_mission_positions_save():
    """Create or update one position entry, keyed by (row, col, side).

    The JSON body may carry ``row``, ``col``, ``side`` (``front`` or
    ``back``), ``coords`` and ``poses``. An existing entry with the same key
    is replaced; otherwise a new one is appended. The mission config is
    persisted to ``mission_config.json``.

    Returns:
        ``{"ok": True, "position": {...}}`` on success, or HTTP 400 when
        ``row``/``col`` cannot be converted to integers.
    """
    # A missing or null JSON body is treated as "use all defaults".
    data = request.json or {}
    try:
        row = int(data.get("row", 0))
        col = int(data.get("col", 0))
    except (TypeError, ValueError):
        # Bad client input should be a 400, not an unhandled 500.
        return jsonify({"ok": False, "error": "row/col 必须为整数"}), 400

    record = {
        "row": row,
        "col": col,
        "side": data.get("side", "front"),
        "coords": data.get("coords", [0, 0, 0]),
        "poses": data.get("poses", []),
    }

    positions = gs.mission_config.setdefault("positions", [])
    for index, existing in enumerate(positions):
        if (
            int(existing.get("row", 0)) == row
            and int(existing.get("col", 0)) == col
            and existing.get("side") == record["side"]
        ):
            positions[index] = record
            break
    else:
        # No entry with this key yet.
        positions.append(record)

    save_json("mission_config.json", gs.mission_config)
    return jsonify({"ok": True, "position": record})
|
||
|
||
|
||
|
||
|
||
# ========== 机械臂控制 API ==========
|
||
@app.route("/api/arm/get_angles", methods=["GET"])
def api_arm_get_angles():
    """Report the joint angles returned by the arm client."""
    arm = gs.arm_client
    if arm:
        success, joint_angles = arm.get_angles()
        return jsonify({"ok": success, "angles": joint_angles})
    return jsonify({"ok": False, "error": "未连接机械臂"}), 400
|
||
|
||
@app.route("/api/arm/set_angles", methods=["POST"])
def api_arm_set_angles():
    """Send a full set of joint angles to the arm.

    JSON body: ``angles`` (default ``[]``) and ``speed`` (default 500).

    Returns:
        ``{"ok": <result of arm_client.set_angles>}``, or HTTP 400 when no
        arm client is available.
    """
    # Tolerate a missing or null JSON body, as the other POST handlers do.
    data = request.json or {}
    angles = data.get("angles", [])
    speed = data.get("speed", 500)
    if not gs.arm_client:
        return jsonify({"ok": False, "error": "未连接机械臂"}), 400
    ok = gs.arm_client.set_angles(angles, speed)
    return jsonify({"ok": ok})
|
||
|
||
@app.route("/api/arm/set_angle", methods=["POST"])
def api_arm_set_angle():
    """Send a single joint angle to the arm.

    JSON body: ``joint`` (default ``"J1"``), ``angle`` (default 0.0) and
    ``speed`` (default 500).

    Returns:
        ``{"ok": <result of arm_client.set_angle>}``, or ``{"ok": False}``
        with HTTP 400 when no arm client is available.
    """
    # Tolerate a missing or null JSON body, as the other POST handlers do.
    data = request.json or {}
    joint = data.get("joint", "J1")
    angle = data.get("angle", 0.0)
    speed = data.get("speed", 500)
    if not gs.arm_client:
        return jsonify({"ok": False}), 400
    ok = gs.arm_client.set_angle(joint, angle, speed)
    return jsonify({"ok": ok})
|
||
|
||
@app.route("/api/arm/jog", methods=["POST"])
def api_arm_jog():
    """Jog one joint continuously (used by the arrow-key controls).

    JSON body: ``joint`` (default ``"J1"``), ``direction`` (-1 / 0 / 1,
    default 0) and ``speed`` (default 500).

    Returns:
        ``{"ok": <result of arm_client.jog_angle>}``, or ``{"ok": False}``
        with HTTP 400 when no arm client is available.
    """
    # Tolerate a missing or null JSON body, as the other POST handlers do.
    data = request.json or {}
    joint = data.get("joint", "J1")
    direction = data.get("direction", 0)
    speed = data.get("speed", 500)
    if not gs.arm_client:
        return jsonify({"ok": False}), 400
    ok = gs.arm_client.jog_angle(joint, direction, speed)
    return jsonify({"ok": ok})
|
||
|
||
@app.route("/api/arm/get_coords", methods=["GET"])
def api_arm_get_coords():
    """Report the coordinates returned by the arm client."""
    arm = gs.arm_client
    if arm:
        success, position = arm.get_coords()
        return jsonify({"ok": success, "coords": position})
    return jsonify({"ok": False}), 400
|
||
|
||
@app.route("/api/arm/state_check", methods=["GET"])
def api_arm_state_check():
    """Relay the arm client's state check; ``running`` is its negation."""
    arm = gs.arm_client
    if arm:
        check_passed = arm.state_check()
        return jsonify({"ok": check_passed, "running": not check_passed})
    return jsonify({"ok": False}), 400
|
||
|
||
@app.route("/api/arm/power_on", methods=["POST"])
def api_arm_power_on():
    """Call ``power_on`` on the arm client and report its result."""
    arm = gs.arm_client
    if arm:
        return jsonify({"ok": arm.power_on()})
    return jsonify({"ok": False}), 400
|
||
|
||
@app.route("/api/arm/state_on", methods=["POST"])
def api_arm_state_on():
    """Call ``state_on`` on the arm client and report its result."""
    arm = gs.arm_client
    if arm:
        return jsonify({"ok": arm.state_on()})
    return jsonify({"ok": False}), 400
|
||
|
||
@app.route("/api/arm/state_off", methods=["POST"])
def api_arm_state_off():
    """Call ``state_off`` on the arm client and report its result."""
    arm = gs.arm_client
    if arm:
        return jsonify({"ok": arm.state_off()})
    return jsonify({"ok": False}), 400
|
||
|
||
|
||
# ========== 摄像头预览 API ==========
|
||
@app.route("/api/camera/preview")
def api_camera_preview():
    """Stream the AGV camera as MJPEG (``multipart/x-mixed-replace``).

    Returns HTTP 400 with a plain-text message when the scanner or its
    capture handle is not available. The stream ends as soon as
    ``read_frame()`` yields ``None``; frames that fail JPEG encoding are
    skipped.
    """
    if not gs.qr_scanner or not gs.qr_scanner._cap:
        return "camera not opened", 400

    def gen():
        # Imported once per stream rather than once per frame.
        import cv2

        while True:
            frame = gs.qr_scanner.read_frame()
            if frame is None:
                break
            # Encode the frame as JPEG and wrap it as one multipart part.
            ret, buf = cv2.imencode(".jpg", frame)
            if ret:
                yield (b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" +
                       buf.tobytes() + b"\r\n")

    return Response(gen(), mimetype="multipart/x-mixed-replace; boundary=frame")
|
||
|
||
@app.route("/api/camera/refresh")
def api_camera_refresh():
    """Serve a single JPEG frame from the AGV camera (polling mode)."""
    scanner = gs.qr_scanner
    if not (scanner and scanner._cap):
        return "camera not opened", 400

    import cv2

    image = gs.qr_scanner.read_frame()
    if image is None:
        return "", 400

    encoded_ok, jpeg = cv2.imencode(".jpg", image)
    if not encoded_ok:
        return "encode failed", 500
    return Response(jpeg.tobytes(), mimetype="image/jpeg")
|
||
|
||
@app.route("/api/camera/capture")
def api_camera_capture():
    """Grab one frame from the AGV camera and save it under ``DATA_DIR/photos``.

    Returns:
        ``{"ok": True, "path": <file path>}`` on success; HTTP 400 when the
        camera is not open or no frame could be read; HTTP 500 when the
        image could not be written to disk.
    """
    if not gs.qr_scanner or not gs.qr_scanner._cap:
        return jsonify({"ok": False, "error": "摄像头未打开"}), 400
    import cv2

    frame = gs.qr_scanner.read_frame()
    if frame is None:
        return jsonify({"ok": False, "error": "读取帧失败"}), 400

    photo_dir = os.path.join(DATA_DIR, "photos")
    os.makedirs(photo_dir, exist_ok=True)
    photo_path = os.path.join(photo_dir, f"capture_{int(time.time())}.jpg")
    # cv2.imwrite signals failure through its return value instead of raising,
    # so check it before reporting success.
    if not cv2.imwrite(photo_path, frame):
        return jsonify({"ok": False, "error": "保存照片失败"}), 500
    return jsonify({"ok": True, "path": photo_path})
|
||
|
||
@app.route("/api/camera/arm_refresh")
def api_arm_camera_refresh():
    """Fetch one JPEG from the arm camera stream and return it.

    Reads the HTTP stream at ``ARM_CAMERA_CONFIG["url"]`` until the first
    complete JPEG (SOI ``FF D8`` through EOI ``FF D9``) has been received.

    Returns:
        The JPEG bytes as ``image/jpeg``, or an empty body with HTTP 404 when
        the camera is unreachable, answers with a non-200 status, or sends
        more than 5 MB without a complete frame.
    """
    import requests

    max_buffer = 5 * 1024 * 1024  # give up if no full frame shows up by then
    try:
        r = requests.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=8)
        try:
            if r.status_code != 200:
                return "", 404
            # bytearray grows in place; bytes += would copy the whole buffer
            # on every chunk.
            data = bytearray()
            for chunk in r.iter_content(chunk_size=8192):
                data += chunk
                # Look for one complete JPEG in what has arrived so far.
                s = data.find(b"\xff\xd8")
                e = data.find(b"\xff\xd9", s + 2) if s >= 0 else -1
                if s >= 0 and e > s:
                    return Response(bytes(data[s:e + 2]), mimetype="image/jpeg")
                if len(data) > max_buffer:
                    return "", 404
            return "", 404
        finally:
            # Always release the streamed connection, including on the
            # non-200 and exception paths.
            r.close()
    except Exception as ex:
        logger.error(f"arm_refresh 失败: {ex}")
        return "", 404
|
||
|
||
|
||
@app.route("/api/camera/qr_scan", methods=["GET"])
def api_qr_scan():
    """Run one QR scan on the AGV camera and return what was decoded."""
    scanner = gs.qr_scanner
    if scanner:
        decoded = scanner.scan_once()
        return jsonify({"ok": bool(decoded), "data": decoded})
    return jsonify({"ok": False}), 400
|
||
|
||
|
||
# ========== AGV 移动控制 API ==========
|
||
@app.route("/api/agv/move", methods=["POST"])
def api_agv_move():
    """Drive the AGV in the requested direction.

    JSON body: ``direction`` (``forward`` / ``backward`` / ``left`` /
    ``right`` / ``left_lateral`` / ``right_lateral`` / ``stop``, default
    ``stop``) and ``speed`` (default ``AGV_CONFIG["move_speed"]``, falling
    back to 0.5).

    Returns:
        ``{"ok": True}`` on success; HTTP 400 when the AGV is not connected
        or the direction is unknown; HTTP 500 with the exception text when
        the controller call raises.
    """
    # Tolerate a missing or null JSON body, as the other POST handlers do.
    data = request.json or {}
    direction = data.get("direction", "stop")
    speed = data.get("speed", AGV_CONFIG.get("move_speed", 0.5))
    if not gs.agv_controller or not gs.agv_controller.is_connected():
        return jsonify({"ok": False, "error": "AGV 未连接"}), 400

    # Direction -> name of the controller method that takes a speed argument.
    # Names are resolved lazily so only the requested method has to exist.
    speed_moves = {
        "forward": "move_forward",
        "backward": "move_backward",
        "left": "turn_left",
        "right": "turn_right",
        "left_lateral": "move_left_lateral",
        "right_lateral": "move_right_lateral",
    }
    try:
        if direction == "stop":
            gs.agv_controller.stop()
        elif isinstance(direction, str) and direction in speed_moves:
            getattr(gs.agv_controller, speed_moves[direction])(speed)
        else:
            return jsonify({"ok": False, "error": "未知方向"}), 400
        return jsonify({"ok": True})
    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500
|
||
|
||
@app.route("/api/agv/position", methods=["GET"])
def api_agv_position():
    """Report the AGV's current position together with its battery reading."""
    controller = gs.agv_controller
    if not (controller and controller.is_connected()):
        return jsonify({"ok": False, "error": "AGV 未连接"}), 400

    position = gs.agv_controller.get_position()
    if position:
        return jsonify({
            "ok": True,
            "position": position,
            "battery": gs.agv_controller.get_battery(),
        })
    return jsonify({"ok": False, "error": "AGV 未发布位置数据(/odom 无数据),请检查 AGV 传感器是否正常"}), 400
|
||
|
||
@app.route("/api/agv/stop", methods=["POST"])
def api_agv_stop():
    """Stop the AGV at once; always answers ``{"ok": True}``."""
    controller = gs.agv_controller
    if controller:
        controller.stop()
    response = {"ok": True}
    return jsonify(response)
|
||
|
||
@app.route("/api/agv/reset", methods=["POST"])
def api_agv_reset():
    """Recover after a collision: stop motion, then power back on if needed."""
    controller = gs.agv_controller
    if not controller:
        return jsonify({"ok": False, "error": "AGV 控制器未初始化"}), 400

    try:
        # Step 1: halt any motion and give the controller a moment.
        controller.stop()
        time.sleep(0.5)

        # Step 2: the low-level AGV handle must exist.
        handle = controller._agv
        if not handle:
            return jsonify({"ok": False, "error": "AGV 未连接,请重新连接"}), 400

        # Step 3: with power still on, stopping was all that was needed.
        if handle.is_power_on():
            return jsonify({"ok": True, "message": "复位成功,AGV 已停止"})

        # Power is off (a collision may trip a protective power-off): try to
        # power up again and re-check after a short wait.
        handle.power_on()
        time.sleep(2)
        if not handle.is_power_on():
            return jsonify({"ok": False, "error": "上电失败,请检查 AGV 状态"}), 500

        controller._connected = True
        return jsonify({"ok": True, "message": "复位成功,已重新上电"})
    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500
|
||
|
||
|
||
# ========== 任务执行 API ==========
|
||
@app.route("/api/mission/start", methods=["POST"])
def api_mission_start():
    """Start a mission in a background thread.

    The mission is built from the current map and point configs. The worker
    creates a ``MissionExecutor``, connects it, runs the mission, stores the
    report in ``gs.mission_report`` and finally sets ``gs.state`` to IDLE
    (no failures, connection failure, or crash) or PAUSED (the report counts
    failed tasks).

    Returns:
        ``{"ok": True, "message": ...}`` once the worker has been started, or
        HTTP 400 when a mission is already running.
    """
    if gs.state == State.RUNNING:
        return jsonify({"ok": False, "error": "任务已在运行中"}), 400

    mission_data = {
        "map": gs.map_config,
        "points": gs.points_config,
    }

    def run():
        final_state = State.IDLE
        try:
            executor_config = {
                "device": AGV_CONFIG.get("device", "/dev/agvpro_controller"),
                "baudrate": AGV_CONFIG.get("baudrate", 1000000),
                "arm": ARM_CONFIG,
                "upload_url": UPLOAD_CONFIG["url"],
                "upload_timeout": UPLOAD_CONFIG["timeout"],
                "upload_retries": UPLOAD_CONFIG["max_retries"],
                "camera_index": 0,
            }
            executor = MissionExecutor(executor_config)

            # The arm and the camera are both required to run a mission.
            conn_results = executor.connect_all()
            if not conn_results.get("arm") or not conn_results.get("camera"):
                gs.mission_report = {"error": "连接失败", "details": conn_results}
                return

            try:
                report = executor.execute_mission(mission_data)
                gs.mission_report = report
                if report["failed"] != 0:
                    final_state = State.PAUSED
            finally:
                # Release the devices even when the mission raised.
                executor.disconnect_all()
        except Exception as exc:
            # Without this, a crash would leave the state stuck at RUNNING
            # and block every later start request.
            logger.exception("任务执行异常")
            gs.mission_report = {"error": "任务执行异常", "details": str(exc)}
        finally:
            gs.state = final_state

    # Mark the mission as running before the worker starts so a second start
    # request arriving during the connection phase is rejected.
    gs.state = State.RUNNING
    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return jsonify({"ok": True, "message": "任务已启动"})
|
||
|
||
@app.route("/api/mission/stop", methods=["POST"])
def api_mission_stop():
    """Ask the active executor (if any) to stop and mark the system idle.

    Always answers ``{"ok": True}``.
    """
    # ``_instance`` may be absent or may exist but hold None; either way
    # there is nothing to stop.
    executor = getattr(MissionExecutor, "_instance", None)
    if executor is not None:
        executor.stop()
    gs.state = State.IDLE
    return jsonify({"ok": True})
|
||
|
||
@app.route("/api/mission/pause", methods=["POST"])
def api_mission_pause():
    """Switch the global state to PAUSED and acknowledge."""
    gs.state = State.PAUSED
    acknowledgement = {"ok": True}
    return jsonify(acknowledgement)
|
||
|
||
@app.route("/api/mission/report", methods=["GET"])
def api_mission_report():
    """Return the stored mission report under the ``report`` key."""
    latest_report = gs.mission_report
    return jsonify({"report": latest_report})
|
||
|
||
@app.route("/api/mission/state", methods=["GET"])
def api_mission_state():
    """Return the current global state under the ``state`` key."""
    current_state = gs.state
    return jsonify({"state": current_state})
|
||
|
||
|
||
# ========== 二维码配置 API ==========
|
||
@app.route("/api/qr/configs", methods=["GET"])
def api_qr_configs_get():
    """Return the full list of QR code config entries."""
    payload = {"ok": True, "configs": gs.qr_config}
    return jsonify(payload)
|
||
|
||
|
||
@app.route("/api/qr/configs", methods=["POST"])
def api_qr_configs_add():
    """Append a new QR code config entry and persist the list."""
    payload = request.json or {}
    entry_id = f"qr_{int(time.time() * 1000)}"
    fallback_name = f"二维码{len(gs.qr_config) + 1}"
    zero_pose = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]

    created = {
        "id": entry_id,
        "name": payload.get("name", fallback_name),
        "joint_angles": payload.get("joint_angles", zero_pose),
        # Both start empty and are filled in by later update/scan requests.
        "qr_value": "",
        "model_id": "",
    }
    gs.qr_config.append(created)
    save_json("qr_config.json", gs.qr_config)
    return jsonify({"ok": True, "entry": created})
|
||
|
||
|
||
@app.route("/api/qr/configs/<qr_id>", methods=["PUT"])
def api_qr_configs_update(qr_id):
    """Overwrite the supplied fields of one QR code config entry."""
    payload = request.json or {}
    editable_fields = ("name", "joint_angles", "qr_value", "model_id")

    for item in gs.qr_config:
        if item["id"] != qr_id:
            continue
        # Copy over only the editable fields the client actually sent.
        for field in editable_fields:
            if field in payload:
                item[field] = payload[field]
        save_json("qr_config.json", gs.qr_config)
        return jsonify({"ok": True, "entry": item})

    return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404
|
||
|
||
|
||
@app.route("/api/qr/configs/<qr_id>", methods=["DELETE"])
def api_qr_configs_delete(qr_id):
    """Remove the first QR code config entry whose id matches ``qr_id``."""
    for position, item in enumerate(gs.qr_config):
        if item["id"] != qr_id:
            continue
        del gs.qr_config[position]
        save_json("qr_config.json", gs.qr_config)
        return jsonify({"ok": True})

    return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404
|
||
|
||
|
||
@app.route("/api/qr/configs/<qr_id>/read-angles", methods=["POST"])
def api_qr_read_angles(qr_id):
    """Store the arm's current joint angles on the given QR config entry."""
    arm = gs.arm_client
    if not arm:
        return jsonify({"ok": False, "error": "机械臂未连接"}), 400

    success, current_angles = arm.get_angles()
    if not (success and current_angles):
        return jsonify({"ok": False, "error": "无法读取机械臂角度"}), 400

    target = next((item for item in gs.qr_config if item["id"] == qr_id), None)
    if target is None:
        return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404

    target["joint_angles"] = list(current_angles)
    save_json("qr_config.json", gs.qr_config)
    return jsonify({"ok": True, "joint_angles": target["joint_angles"]})
|
||
|
||
|
||
@app.route("/api/qr/scan/<qr_id>", methods=["POST"])
def api_qr_config_scan(qr_id):
    """Decode a QR code from the arm camera and store it on a config entry.

    Pulls the first complete JPEG from the stream at
    ``ARM_CAMERA_CONFIG["url"]``, decodes it with OpenCV's
    ``QRCodeDetector`` and writes the text into the entry's ``qr_value``.
    When a model's ``serial_prefix`` is a prefix of the decoded text, that
    model's id is stored as ``model_id``. The list is persisted to
    ``qr_config.json``.

    Returns:
        ``{"ok": True, "qr_value", "model_id", "model_name"}`` on success;
        ``{"ok": False, "error": ...}`` with HTTP 200 when no QR code is
        detected, HTTP 404 when ``qr_id`` is unknown, and HTTP 400 for
        camera, stream, decode or unexpected errors.
    """
    import requests

    max_buffer = 1024 * 1024  # give up if no full frame shows up by then
    try:
        import cv2
        import numpy as np

        # Pull one JPEG frame from the arm camera stream.
        jpg_bytes = None
        r = requests.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=5)
        try:
            if r.status_code != 200:
                return jsonify({"ok": False, "error": "无法连接机械臂摄像头"}), 400
            data = bytearray()
            for chunk in r.iter_content(chunk_size=4096):
                data += chunk
                s = data.find(b"\xff\xd8")
                e = data.find(b"\xff\xd9", s + 2) if s >= 0 else -1
                if s >= 0 and e > s:
                    jpg_bytes = bytes(data[s:e + 2])
                    break
                if len(data) > max_buffer:
                    return jsonify({"ok": False, "error": "摄像头数据流异常"}), 400
        finally:
            # Always release the streamed connection, including on the
            # non-200 and exception paths.
            r.close()

        if jpg_bytes is None:
            return jsonify({"ok": False, "error": "未收到完整图像帧"}), 400

        # Decode the JPEG into an image array and look for a QR code.
        frame = cv2.imdecode(np.frombuffer(jpg_bytes, np.uint8), cv2.IMREAD_COLOR)
        if frame is None:
            return jsonify({"ok": False, "error": "图像解码失败"}), 400

        result, _, _ = cv2.QRCodeDetector().detectAndDecode(frame)
        result = result.strip() if result else ""
        if not result:
            return jsonify({"ok": False, "error": "未检测到二维码"})

        entry = next((item for item in gs.qr_config if item["id"] == qr_id), None)
        if entry is None:
            return jsonify({"ok": False, "error": f"二维码 {qr_id} 不存在"}), 404

        entry["qr_value"] = result
        # Match a model by serial prefix; the first matching model wins.
        matched_model = None
        for model in gs.models_config:
            prefix = model.get("serial_prefix", "")
            if prefix and result.startswith(prefix):
                matched_model = model
                break
        if matched_model:
            entry["model_id"] = matched_model["id"]
        save_json("qr_config.json", gs.qr_config)
        return jsonify({
            "ok": True,
            "qr_value": result,
            "model_id": entry.get("model_id", ""),
            "model_name": matched_model["name"] if matched_model else "",
        })
    except Exception as ex:
        logger.error(f"QR 扫描机械臂摄像头失败: {ex}")
        return jsonify({"ok": False, "error": f"扫描失败: {str(ex)}"}), 400
|
||
|
||
|
||
# ========== 静态资源 ==========
|
||
@app.route("/photos/<name>")
def photos(name):
    """Serve a file by name from the ``photos`` folder under ``DATA_DIR``."""
    photo_root = os.path.join(DATA_DIR, "photos")
    return send_from_directory(photo_root, name)
|
||
|
||
|
||
# ========== 启动 ==========
|
||
if __name__ == "__main__":
    # Startup banner.
    banner = "=" * 50
    logger.info(banner)
    logger.info("AGV 拍摄系统启动")
    logger.info(f"监听 {SERVER_CONFIG['host']}:{SERVER_CONFIG['port']}")
    logger.info(banner)

    # Open the AGV camera right away so the preview endpoints have a scanner.
    gs.qr_scanner = QRScanner(CAMERA_CONFIG["device_index"])
    gs.camera_opened = gs.qr_scanner.open()
    logger.info(f"AGV 摄像头初始化: {'成功' if gs.camera_opened else '失败'}")

    # Probe the arm camera once; a failure is logged but does not abort startup.
    try:
        import requests as _startup_req

        r = _startup_req.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=5)
        gs.arm_camera_opened = (r.status_code == 200)
        r.close()
        logger.info(f"机械臂摄像头检测: {'成功' if gs.arm_camera_opened else '失败'}")
    except Exception as _e:
        gs.arm_camera_opened = False
        logger.warning(f"机械臂摄像头检测失败: {_e}")

    # Hand control to Flask's built-in server with threaded request handling.
    server_options = {
        "host": SERVER_CONFIG["host"],
        "port": SERVER_CONFIG["port"],
        "debug": SERVER_CONFIG["debug"],
        "threaded": True,
    }
    app.run(**server_options)
|