Files
smart-inspection/agv_app/app.py
T
2026-05-16 23:47:02 +08:00

1259 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AGV 拍摄系统 - Flask 主程序
运行在 AGV 上,端口 5000
"""
import os
import json
import time
import logging
import threading
import subprocess
from flask import Flask, render_template, jsonify, request, Response, send_from_directory
from flask_cors import CORS
from config import SERVER_CONFIG, ARM_CONFIG, AGV_CONFIG, UPLOAD_CONFIG, MAP_CONFIG, ARM_CAMERA_CONFIG, CAMERA_CONFIG, DATA_DIR, State
from utils.arm_client import ArmClient
from utils.agv_controller_ros2 import AGVController
from utils.qr_scanner import QRScanner
from utils.image_uploader import ImageUploader
from utils.mission_executor import MissionExecutor, TaskStatus
from utils.nav2_navigator import Nav2Navigator, Nav2Status
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger("agv_app")
app = Flask(__name__, template_folder="templates", static_folder="static", static_url_path="/static")
app.config["SECRET_KEY"] = SERVER_CONFIG["secret_key"]
CORS(app)
# ========== 全局状态 ==========
class GlobalState:
def __init__(self):
self.state = State.IDLE # setting / running / paused / idle
self.mission_data = None # 当前任务配置
self.mission_report = None # 任务执行报告
self.arm_client = None # ArmClient 实例
self.agv_controller = None # AGVController 实例
self.qr_scanner = None # QRScanner 实例
self.camera_opened = False
self.arm_camera_opened = False
self.map_config = {} # 地图配置
self.points_config = [] # 点位配置
self.models_config = [] # 机型配置(姿态)
self.mission_config = { # 任务配置(M×N网格)
"rows": 2,
"cols": 3,
"grid": [], # M×N 布尔矩阵
"positions": [] # 独立点位配置 [{row, col, side, coords, poses}]
}
self.machines_config = [] # 机器配置(每台机器的正面/背面点位+姿态)
self.navigator = None # Nav2Navigator 实例
self.lock = threading.Lock()
def reset(self):
with self.lock:
self.state = State.IDLE
self.mission_data = None
self.mission_report = None
gs = GlobalState()
# ========== 辅助函数 ==========
def get_data_path(name):
return os.path.join(DATA_DIR, name)
def save_json(name, data):
with open(get_data_path(name), "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def load_json(name, default=None):
path = get_data_path(name)
if os.path.exists(path):
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
return default if default is not None else {}
# ========== 启动时加载持久化配置 ==========
def load_persisted_config():
"""Flask 启动时加载已保存的配置"""
# 加载地图配置
map_cfg = load_json("map_config.json")
if map_cfg and "map_yaml" in map_cfg:
gs.map_config = map_cfg
print(f"[启动] 加载地图配置: {map_cfg['map_yaml']}")
# 加载点位配置
points_cfg = load_json("points_config.json", [])
if points_cfg:
gs.points_config = points_cfg
print(f"[启动] 加载点位配置: {len(points_cfg)} 个点位")
# 加载机型配置(姿态)
models_cfg = load_json("models_config.json", [])
if models_cfg:
gs.models_config = models_cfg
print(f"[启动] 加载机型配置: {len(models_cfg)} 个机型")
# 加载任务配置(网格尺寸)
mission_cfg = load_json("mission_config.json", {})
if mission_cfg:
gs.mission_config = mission_cfg
print(f"[启动] 加载任务配置: {mission_cfg.get('rows', 0)}行×{mission_cfg.get('cols', 0)}")
# 加载机器配置(正面/背面点位+姿态)
machines_cfg = load_json("machines_config.json", [])
if machines_cfg:
gs.machines_config = machines_cfg
print(f"[启动] 加载机器配置: {len(machines_cfg)} 台机器")
# 在 Flask 2.3+ 使用 @app.before_serving,兼容旧版用 before_first_request
try:
from flask import has_app_context
# Flask 2.3+ 方式
with app.app_context():
load_persisted_config()
except:
# 兼容旧版 Flask
@app.before_first_request
def startup_load():
load_persisted_config()
# ========== 页面路由 ==========
@app.route("/")
def index():
return render_template("index.html")
@app.route("/setting")
def setting_page():
return render_template("setting.html")
@app.route("/running")
def running_page():
return render_template("running.html")
# ========== 系统状态 API ==========
@app.route("/api/status")
def api_status():
"""获取系统整体状态"""
with gs.lock:
# 实际验证机械臂连接(尝试发送一个简单命令)
arm_connected = False
if gs.arm_client and gs.arm_client._sock:
try:
# 设置短超时尝试获取角度,验证连接是否有效
gs.arm_client._sock.settimeout(2)
ok, _ = gs.arm_client.get_angles()
arm_connected = ok
except:
arm_connected = False
# 连接已断开,清理 socket
if gs.arm_client:
gs.arm_client._sock = None
# 实际验证 AGV 连接
agv_connected = False
if gs.agv_controller:
agv_connected = gs.agv_controller.is_connected()
return jsonify({
"state": gs.state,
"agv_connected": agv_connected,
"arm_connected": arm_connected,
"camera_opened": gs.camera_opened,
"arm_camera_opened": gs.arm_camera_opened,
"map_loaded": bool(gs.map_config),
"points_count": len(gs.points_config),
"models_count": len(gs.models_config),
"mission_rows": gs.mission_config.get("rows", 0),
"mission_cols": gs.mission_config.get("cols", 0),
"machines_count": len(gs.machines_config)
})
@app.route("/api/system/connect", methods=["POST"])
def api_connect():
"""连接 AGV 和机械臂"""
results = {"agv": False, "arm": False, "camera": False, "errors": []}
# 连接 AGV
try:
gs.agv_controller = AGVController()
results["agv"] = gs.agv_controller.connect()
except Exception as e:
results["errors"].append(f"AGV: {e}")
# 连接机械臂
try:
gs.arm_client = ArmClient(ARM_CONFIG["host"], ARM_CONFIG["port"])
results["arm"] = gs.arm_client.connect()
if results["arm"]:
# 尝试上电并激活
gs.arm_client.power_on()
time.sleep(0.5)
gs.arm_client.state_on()
except Exception as e:
results["errors"].append(f"机械臂: {e}")
# 打开摄像头
try:
gs.qr_scanner = QRScanner(CAMERA_CONFIG["device_index"])
results["camera"] = gs.qr_scanner.open()
gs.camera_opened = results["camera"]
except Exception as e:
results["errors"].append(f"摄像头: {e}")
# 检查机械臂摄像头
try:
import requests as _req
r2 = _req.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=3)
gs.arm_camera_opened = (r2.status_code == 200)
r2.close()
except:
gs.arm_camera_opened = False
all_ok = results["agv"] and results["arm"] and results["camera"]
with gs.lock:
if all_ok:
gs.state = State.SETTING
else:
gs.state = State.IDLE
return jsonify(results)
@app.route("/api/system/disconnect", methods=["POST"])
def api_disconnect():
"""断开所有连接"""
if gs.arm_client:
gs.arm_client.close()
gs.arm_client = None
if gs.agv_controller:
gs.agv_controller.disconnect()
gs.agv_controller = None
if gs.qr_scanner:
gs.qr_scanner.close()
gs.qr_scanner = None
gs.camera_opened = False
gs.state = State.IDLE
return jsonify({"ok": True})
@app.route("/api/device/connect", methods=["POST"])
def api_device_connect():
"""连接单个设备,device: agv | arm | camera | arm_camera"""
data = request.json or {}
device = data.get("device", "")
result = {"device": device, "ok": False, "error": ""}
done_event = threading.Event()
res_holder = {}
def _do_connect():
try:
if device == "agv":
gs.agv_controller = AGVController()
res_holder["ok"] = gs.agv_controller.connect()
if not res_holder["ok"]:
res_holder["error"] = "AGV 连接失败,请检查网络或 ROS2"
elif device == "arm":
gs.arm_client = ArmClient(ARM_CONFIG["host"], ARM_CONFIG["port"])
res_holder["ok"] = gs.arm_client.connect()
if res_holder["ok"]:
gs.arm_client.power_on()
time.sleep(0.3)
gs.arm_client.state_on()
else:
res_holder["error"] = "机械臂连接失败"
elif device == "camera":
gs.qr_scanner = QRScanner(CAMERA_CONFIG["device_index"])
res_holder["ok"] = gs.qr_scanner.open()
gs.camera_opened = res_holder["ok"]
if not res_holder["ok"]:
res_holder["error"] = "AGV 摄像头打开失败"
elif device == "arm_camera":
import requests as _req
r = _req.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=5)
res_holder["ok"] = (r.status_code == 200)
r.close()
gs.arm_camera_opened = res_holder["ok"]
if not res_holder["ok"]:
res_holder["error"] = "机械臂摄像头无响应"
else:
res_holder["error"] = f"未知设备: {device}"
except Exception as e:
res_holder["error"] = str(e)
finally:
done_event.set()
t = threading.Thread(target=_do_connect)
t.daemon = True
t.start()
# 最多等 10 秒,超时则返回失败
if not done_event.wait(10):
result["ok"] = False
result["error"] = "连接超时(10秒),设备无响应"
else:
result["ok"] = res_holder.get("ok", False)
result["error"] = res_holder.get("error", "")
return jsonify(result)
# ========== 地图配置 API ==========
@app.route("/api/map/load", methods=["POST"])
def api_map_load():
"""加载地图文件"""
data = request.json
map_dir = data.get("map_dir", MAP_CONFIG["map_dir"])
map_file = data.get("map_file", MAP_CONFIG["map_file"])
map_yaml = os.path.join(map_dir, map_file)
if not os.path.exists(map_yaml):
return jsonify({"ok": False, "error": f"地图文件不存在: {map_yaml}"}), 400
gs.map_config = {"map_dir": map_dir, "map_file": map_file, "map_yaml": map_yaml}
save_json("map_config.json", gs.map_config)
return jsonify({"ok": True, "map": gs.map_config})
@app.route("/api/map/save", methods=["POST"])
def api_map_save():
"""保存地图配置"""
data = request.json
gs.map_config = data
save_json("map_config.json", data)
return jsonify({"ok": True})
@app.route("/api/map/image")
def api_map_image():
"""返回地图图像(PNG)"""
if not gs.map_config or "map_yaml" not in gs.map_config:
return jsonify({"error": "地图未加载"}), 404
map_yaml = gs.map_config["map_yaml"]
map_dir = os.path.dirname(map_yaml)
# 解析 YAML 获取 PGM 文件名
try:
import yaml
with open(map_yaml, 'r') as f:
meta = yaml.safe_load(f)
pgm_file = meta.get('image', 'map.pgm')
pgm_path = os.path.join(map_dir, pgm_file)
if not os.path.exists(pgm_path):
return jsonify({"error": f"PGM 文件不存在: {pgm_path}"}), 404
# 读取 PGM 并转换为 PNG
import cv2
img = cv2.imread(pgm_path, cv2.IMREAD_GRAYSCALE)
if img is None:
return jsonify({"error": "无法读取 PGM 文件"}), 500
# 反转颜色(PGM 中 0=占用,255=空闲,ROS 地图显示习惯)
img = 255 - img
# 编码为 PNG
_, buf = cv2.imencode('.png', img)
return Response(buf.tobytes(), mimetype='image/png')
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/map/meta")
def api_map_meta():
"""返回地图元数据(分辨率、原点、尺寸)"""
if not gs.map_config or "map_yaml" not in gs.map_config:
return jsonify({"ok": False, "error": "地图未加载"}), 404
map_yaml = gs.map_config["map_yaml"]
map_dir = os.path.dirname(map_yaml)
try:
import yaml
with open(map_yaml, 'r') as f:
meta = yaml.safe_load(f)
pgm_file = meta.get('image', 'map.pgm')
pgm_path = os.path.join(map_dir, pgm_file)
import cv2
img = cv2.imread(pgm_path, cv2.IMREAD_GRAYSCALE)
height, width = img.shape if img is not None else (0, 0)
return jsonify({
"ok": True,
"resolution": meta.get('resolution', 0.05),
"origin": meta.get('origin', [0, 0, 0]),
"width": width,
"height": height
})
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 500
# ========== 地图导航 API ==========
@app.route("/api/navigate/to", methods=["POST"])
def api_navigate_to():
"""导航到目标坐标"""
if not gs.map_config or "map_yaml" not in gs.map_config:
return jsonify({"ok": False, "error": "地图未加载,请先在设置中加载地图"}), 400
data = request.json
goal_x = data.get("x")
goal_y = data.get("y")
if goal_x is None or goal_y is None:
return jsonify({"ok": False, "error": "缺少目标坐标 x, y"}), 400
if not gs.agv_controller or not gs.agv_controller.is_connected():
return jsonify({"ok": False, "error": "AGV 未连接,请先连接 AGV"}), 400
try:
if gs.navigator is None:
gs.navigator = Nav2Navigator()
# navigate_to_pose(x, y, yaw=None, timeout_sec=120, blocking=False)
ok = gs.navigator.navigate_to_pose(float(goal_x), float(goal_y), blocking=False)
if ok:
return jsonify({"ok": True, "message": "导航已启动"})
else:
return jsonify({"ok": False, "error": "导航启动失败,可能是Nav2未运行或AGV未连接"}), 400
except Exception as e:
logger.error(f"导航失败: {e}")
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/navigate/stop", methods=["POST"])
def api_navigate_stop():
"""停止导航"""
if gs.navigator:
gs.navigator.stop()
return jsonify({"ok": True, "message": "导航已停止"})
return jsonify({"ok": False, "error": "导航器未初始化"}), 400
@app.route("/api/navigate/cancel", methods=["POST"])
def api_navigate_cancel():
"""取消当前导航(别名)"""
if gs.navigator:
gs.navigator.stop()
return jsonify({"ok": True, "message": "导航已取消"})
return jsonify({"ok": True, "message": "无活动导航"})
@app.route("/api/navigate/status", methods=["GET"])
def api_navigate_status():
"""获取导航状态"""
# 懒初始化 navigator
if gs.navigator is None:
try:
gs.navigator = Nav2Navigator()
except Exception as e:
logger.warning(f"Nav2Navigator 初始化失败: {e}")
if gs.navigator:
return jsonify(gs.navigator.get_status())
# navigator 仍为 None,说明 Nav2 不可用
return jsonify({"status": "idle", "current_position": [0, 0, 0], "nav2_available": False})
@app.route("/api/navigate/path", methods=["POST"])
def api_navigate_path():
"""预览路径(仅规划不执行)- Nav2版本不支持预计算路径,返回当前导航状态"""
if not gs.map_config or "map_yaml" not in gs.map_config:
return jsonify({"ok": False, "error": "地图未加载"}), 400
data = request.json
goal_x = data.get("x")
goal_y = data.get("y")
if goal_x is None or goal_y is None:
return jsonify({"ok": False, "error": "缺少目标坐标 x, y"}), 400
try:
if gs.navigator is None:
gs.navigator = Nav2Navigator()
# Nav2 不提供路径预览,直接返回可用状态
current = gs.navigator.get_current_position()
status = gs.navigator.get_status()
return jsonify({
"ok": True,
"message": "Nav2 路径预览不可用,请在 RViz 中查看规划路径",
"current_position": current,
"nav2_available": status.get("nav2_available", False)
})
except Exception as e:
logger.error(f"路径预览失败: {e}")
return jsonify({"ok": False, "error": str(e)}), 500
# ========== 点位配置 API ==========
@app.route("/api/points/list", methods=["GET"])
def api_points_list():
"""获取所有点位"""
return jsonify({"points": gs.points_config})
@app.route("/api/points/add", methods=["POST"])
def api_points_add():
"""添加点位(从当前位置)"""
data = request.json
point_name = data.get("name", f"point_{len(gs.points_config) + 1}")
# 获取 AGV 当前位置
position = None
if gs.agv_controller and gs.agv_controller.is_connected():
position = gs.agv_controller.get_position()
point = {
"id": f"p_{int(time.time())}",
"name": point_name,
"coords": position or [0.0, 0.0, 0.0], # [x, y, yaw]
"photo_mode": data.get("photo_mode", "front"), # front / back / both
"sequence": data.get("sequence", ["front", "back"]), # both 时的执行顺序
}
gs.points_config.append(point)
save_json("points_config.json", gs.points_config)
return jsonify({"ok": True, "point": point})
@app.route("/api/points/update/<point_id>", methods=["POST"])
def api_points_update(point_id):
"""更新点位"""
data = request.json
for i, p in enumerate(gs.points_config):
if p["id"] == point_id:
gs.points_config[i].update(data)
save_json("points_config.json", gs.points_config)
return jsonify({"ok": True})
return jsonify({"ok": False, "error": "点位不存在"}), 404
@app.route("/api/points/delete/<point_id>", methods=["DELETE"])
def api_points_delete(point_id):
"""删除点位"""
gs.points_config = [p for p in gs.points_config if p["id"] != point_id]
save_json("points_config.json", gs.points_config)
return jsonify({"ok": True})
@app.route("/api/points/save", methods=["POST"])
def api_points_save():
"""保存点位配置"""
data = request.json
gs.points_config = data.get("points", [])
save_json("points_config.json", gs.points_config)
return jsonify({"ok": True, "count": len(gs.points_config)})
@app.route("/api/points/load", methods=["GET"])
def api_points_load():
"""加载已保存的点位配置"""
loaded = load_json("points_config.json", [])
if loaded:
gs.points_config = loaded
return jsonify({"ok": True, "points": loaded or []})
# ========== 机型(姿态组)API ==========
@app.route("/api/models/list", methods=["GET"])
def api_models_list():
"""获取所有机型"""
return jsonify({"models": gs.models_config})
@app.route("/api/models/add", methods=["POST"])
def api_models_add():
"""添加机型"""
data = request.json
model_name = data.get("name", f"model_{len(gs.models_config) + 1}")
model = {
"id": f"m_{int(time.time())}",
"name": model_name,
"serial_prefix": data.get("serial_prefix", ""), # 二维码型号前缀
"poses": [], # 姿态列表
"description": data.get("description", ""),
"notes": data.get("notes", ""), # 备注
}
gs.models_config.append(model)
save_json("models_config.json", gs.models_config)
return jsonify({"ok": True, "model": model})
@app.route("/api/models/update/<model_id>", methods=["POST"])
def api_models_update(model_id):
"""更新机型"""
data = request.json
for i, m in enumerate(gs.models_config):
if m["id"] == model_id:
gs.models_config[i].update(data)
save_json("models_config.json", gs.models_config)
return jsonify({"ok": True})
return jsonify({"ok": False, "error": "机型不存在"}), 404
@app.route("/api/models/delete/<model_id>", methods=["DELETE"])
def api_models_delete(model_id):
"""删除机型"""
gs.models_config = [m for m in gs.models_config if m["id"] != model_id]
save_json("models_config.json", gs.models_config)
return jsonify({"ok": True})
@app.route("/api/models/poses/add", methods=["POST"])
def api_poses_add():
"""添加姿态到机型(需指定 model_id)"""
data = request.json
model_id = data.get("model_id")
if not model_id:
return jsonify({"ok": False, "error": "缺少 model_id"}), 400
for m in gs.models_config:
if m["id"] == model_id:
pose = {
"id": f"pose_{int(time.time())}",
"name": data.get("name", f"姿态{len(m['poses']) + 1}"),
"photo_type": data.get("photo_type", "front"), # front / back / nameplate
"arm_angles": data.get("arm_angles", [0.0]*6),
"speed": data.get("speed", 500),
"description": data.get("description", ""),
}
m["poses"].append(pose)
save_json("models_config.json", gs.models_config)
return jsonify({"ok": True, "pose": pose})
return jsonify({"ok": False, "error": "机型不存在"}), 404
@app.route("/api/models/<model_id>/poses", methods=["GET"])
def api_model_poses_get(model_id):
"""获取机型的姿态列表"""
for m in gs.models_config:
if m["id"] == model_id:
return jsonify({"poses": m.get("poses", [])})
return jsonify({"poses": []})
@app.route("/api/models/<model_id>/poses/<pose_id>", methods=["PUT"])
def api_model_poses_update(model_id, pose_id):
"""更新姿态"""
data = request.json
for m in gs.models_config:
if m["id"] == model_id:
for i, pose in enumerate(m.get("poses", [])):
if pose["id"] == pose_id:
m["poses"][i].update(data)
save_json("models_config.json", gs.models_config)
return jsonify({"ok": True})
return jsonify({"ok": False}), 404
@app.route("/api/models/<model_id>/poses/<pose_id>", methods=["DELETE"])
def api_model_poses_delete(model_id, pose_id):
"""删除姿态"""
for m in gs.models_config:
if m["id"] == model_id:
m["poses"] = [p for p in m.get("poses", []) if p["id"] != pose_id]
save_json("models_config.json", gs.models_config)
return jsonify({"ok": True})
return jsonify({"ok": False}), 404
# ========== 任务配置 APIM×N 网格)==========
@app.route("/api/mission/position", methods=["GET"])
def api_mission_position():
"""读取 AGV 当前坐标(供设置点位时使用)"""
if not gs.agv_controller or not gs.agv_controller.is_connected():
return jsonify({"ok": False, "error": "AGV 未连接"}), 400
pos = gs.agv_controller.get_position()
if not pos:
return jsonify({"ok": False, "error": "AGV 未发布位置数据(/odom 无数据),请检查 AGV 传感器是否正常"}), 400
battery = gs.agv_controller.get_battery()
return jsonify({"ok": True, "position": pos, "battery": battery})
@app.route("/api/mission/init_pose", methods=["POST"])
def api_mission_init_pose():
"""将 AMCL 初始位置设为 (0,0,0),无需 RViz"""
try:
script = "/tmp/ros2_init_pose.sh"
lines = [
"#!/bin/bash",
"export ROS_DOMAIN_ID=0",
"source /opt/ros/humble/setup.bash",
"source ~/agv_pro_ros2/install/setup.bash",
"python3 /tmp/publish_init_pose.py",
]
with open(script, "w") as f:
f.write("\n".join(lines) + "\n")
os.chmod(script, 0o755)
result = subprocess.run(
[script],
capture_output=True, text=True, timeout=12,
env={**os.environ, "ROS_DOMAIN_ID": "0"}
)
logger.info(f"init_pose: rc={result.returncode}")
return jsonify({"ok": True, "message": "初始位置已设为 (0,0,0)"})
except Exception as e:
logger.error(f"初始化位置失败: {e}")
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/mission/config", methods=["GET"])
def api_mission_config_get():
"""获取任务配置(网格尺寸和空位矩阵)"""
return jsonify({"ok": True, "config": gs.mission_config, "machines": gs.machines_config})
@app.route("/api/mission/config", methods=["POST"])
def api_mission_config_set():
""""设置任务配置(网格尺寸+空位矩阵)"""
data = request.json
rows = data.get("rows", 2)
cols = data.get("cols", 3)
grid = data.get("grid", [])
gs.mission_config["rows"] = rows
gs.mission_config["cols"] = cols
gs.mission_config["grid"] = grid
# 清除超出网格边界的 positions(只保留 front/back 且 row<rows, col<cols
gs.mission_config["positions"] = [
p for p in gs.mission_config.get("positions", [])
if p.get("row", 0) < rows and p.get("col", 0) < cols and p.get("side") in ("front", "back")
]
save_json("mission_config.json", gs.mission_config)
return jsonify({"ok": True, "config": gs.mission_config})
@app.route("/api/mission/machines", methods=["GET"])
def api_mission_machines_list():
"""获取所有机器配置"""
return jsonify({"ok": True, "machines": gs.machines_config})
@app.route("/api/mission/machines/<machine_id>", methods=["GET"])
def api_mission_machine_get(machine_id):
"""获取单台机器配置"""
for m in gs.machines_config:
if m["id"] == machine_id:
return jsonify({"ok": True, "machine": m})
return jsonify({"ok": False, "error": "机器不存在"}), 404
@app.route("/api/mission/machines", methods=["POST"])
def api_mission_machines_save():
""""批量保存/更新机器配置"""
data = request.json
machines = data.get("machines", [])
gs.machines_config = machines
save_json("machines_config.json", gs.machines_config)
return jsonify({"ok": True, "count": len(gs.machines_config)})
@app.route("/api/mission/machines/add", methods=["POST"])
def api_mission_machine_add():
"""添加单台机器配置"""
data = request.json
machine_id = data.get("id", f"m_{data.get('row', 0)}_{data.get('col', 0)}")
# 检查是否已存在
for m in gs.machines_config:
if m["id"] == machine_id or (m.get("row") == data.get("row") and m.get("col") == data.get("col")):
return jsonify({"ok": False, "error": "该位置已有机器"}), 400
machine = {
"id": machine_id,
"row": data.get("row", 0),
"col": data.get("col", 0),
"front": data.get("front", {"coords": [0, 0, 0], "poses": []}),
"back": data.get("back", {"coords": [0, 0, 0], "poses": []}),
}
gs.machines_config.append(machine)
save_json("machines_config.json", gs.machines_config)
return jsonify({"ok": True, "machine": machine})
@app.route("/api/mission/machines/<machine_id>", methods=["PUT"])
def api_mission_machine_update(machine_id):
"""更新单台机器配置(正面/背面点位+姿态)"""
data = request.json
for i, m in enumerate(gs.machines_config):
if m["id"] == machine_id:
gs.machines_config[i].update(data)
save_json("machines_config.json", gs.machines_config)
return jsonify({"ok": True})
return jsonify({"ok": False, "error": "机器不存在"}), 404
@app.route("/api/mission/machines/<machine_id>", methods=["DELETE"])
def api_mission_machine_delete(machine_id):
"""删除单台机器配置"""
gs.machines_config = [m for m in gs.machines_config if m["id"] != machine_id]
save_json("machines_config.json", gs.machines_config)
return jsonify({"ok": True})
@app.route("/api/mission/poses/<machine_id>/<side>", methods=["GET"])
def api_mission_poses_get(machine_id, side):
"""获取机器指定侧的姿态列表(side: front | back"""
for m in gs.machines_config:
if m["id"] == machine_id:
return jsonify({"ok": True, "poses": m.get(side, {}).get("poses", [])})
return jsonify({"ok": False, "poses": []}), 404
@app.route("/api/mission/poses/<machine_id>/<side>", methods=["POST"])
def api_mission_poses_add(machine_id, side):
"""添加姿态到机器指定侧(side: front | back"""
data = request.json
for m in gs.machines_config:
if m["id"] == machine_id:
if side not in m:
return jsonify({"ok": False, "error": f"机器无此侧: {side}"}), 400
pose = {
"id": f"pose_{int(time.time())}",
"name": data.get("name", f"姿态"),
"arm_angles": data.get("arm_angles", [0.0]*6),
"speed": data.get("speed", 500),
"description": data.get("description", ""),
}
m[side]["poses"].append(pose)
save_json("machines_config.json", gs.machines_config)
return jsonify({"ok": True, "pose": pose})
return jsonify({"ok": False, "error": "机器不存在"}), 404
@app.route("/api/mission/poses/<machine_id>/<side>/<pose_id>", methods=["DELETE"])
def api_mission_poses_delete(machine_id, side, pose_id):
"""删除机器指定侧的姿态"""
for m in gs.machines_config:
if m["id"] == machine_id:
if side not in m:
return jsonify({"ok": False}), 404
m[side]["poses"] = [p for p in m[side]["poses"] if p["id"] != pose_id]
save_json("machines_config.json", gs.machines_config)
return jsonify({"ok": True})
return jsonify({"ok": False}), 404
@app.route("/api/mission/generate_sequence", methods=["GET"])
def api_mission_generate_sequence():
"""根据网格配置和机器配置生成拍摄序列(蛇形)"""
rows = gs.mission_config.get("rows", 2)
cols = gs.mission_config.get("cols", 3)
grid = gs.mission_config.get("grid", [])
machines = gs.machines_config
def get_machine(row, col):
for m in machines:
if m.get("row") == row and m.get("col") == col:
return m
return None
# 蛇形序列:行0从左到右正面→右到左背面,行1从右到左背面→左到右正面...交替
sequence = []
for r in range(rows):
# 检查该行是否有机器
has_any = any(grid[r][c] for c in range(cols)) if r < len(grid) else False
if not has_any:
continue
if r % 2 == 0: # 偶数行:正面从左到右,背面从右到左
# 正面:从左到右
for c in range(cols):
if r < len(grid) and c < len(grid[r]) and grid[r][c]:
m = get_machine(r, c)
if m and m.get("front"):
sequence.append({
"machine_id": m["id"],
"row": r, "col": c,
"side": "front",
"row_dir": "lr", # 正面时该行的方向
"row_dir_back": "rl" # 背面时该行的方向
})
# 背面:从右到左
for c in range(cols - 1, -1, -1):
if r < len(grid) and c < len(grid[r]) and grid[r][c]:
m = get_machine(r, c)
if m and m.get("back"):
sequence.append({
"machine_id": m["id"],
"row": r, "col": c,
"side": "back",
"row_dir": "lr",
"row_dir_back": "rl"
})
else: # 奇数行:正面从右到左,背面从左到右(方向反转)
# 背面:从左到右(此行的背面在下一行的前面位置,但这里按用户描述:背面先行)
for c in range(cols):
if r < len(grid) and c < len(grid[r]) and grid[r][c]:
m = get_machine(r, c)
if m and m.get("back"):
sequence.append({
"machine_id": m["id"],
"row": r, "col": c,
"side": "back",
"row_dir": "rl",
"row_dir_back": "lr"
})
# 正面:从右到左
for c in range(cols - 1, -1, -1):
if r < len(grid) and c < len(grid[r]) and grid[r][c]:
m = get_machine(r, c)
if m and m.get("front"):
sequence.append({
"machine_id": m["id"],
"row": r, "col": c,
"side": "front",
"row_dir": "rl",
"row_dir_back": "lr"
})
return json
# ========== 点位配置 API(独立于机器)==========
@app.route("/api/mission/positions", methods=["GET"])
def api_mission_positions_list():
"""获取所有点位配置"""
positions = gs.mission_config.get("positions", [])
return jsonify({"ok": True, "positions": positions})
@app.route("/api/mission/positions", methods=["POST"])
def api_mission_positions_save():
"""保存点位配置(单个点位:创建或更新)"""
data = request.json
row = int(data.get("row", 0))
col = int(data.get("col", 0))
side = data.get("side", "front") # "front" | "back"
coords = data.get("coords", [0, 0, 0])
poses = data.get("poses", [])
positions = gs.mission_config.setdefault("positions", [])
# 查找是否已有该点位
key = (row, col, side)
for i, p in enumerate(positions):
if int(p.get("row", 0)) == row and int(p.get("col", 0)) == col and p.get("side") == side:
positions[i] = {"row": row, "col": col, "side": side, "coords": coords, "poses": poses}
save_json("mission_config.json", gs.mission_config)
return jsonify({"ok": True, "position": positions[i]})
positions.append({"row": row, "col": col, "side": side, "coords": coords, "poses": poses})
save_json("mission_config.json", gs.mission_config)
return jsonify({"ok": True, "position": positions[-1]})
# ========== 机械臂控制 API ==========
@app.route("/api/arm/get_angles", methods=["GET"])
def api_arm_get_angles():
"""获取当前关节角度"""
if not gs.arm_client:
return jsonify({"ok": False, "error": "未连接机械臂"}), 400
ok, angles = gs.arm_client.get_angles()
return jsonify({"ok": ok, "angles": angles})
@app.route("/api/arm/set_angles", methods=["POST"])
def api_arm_set_angles():
"""设置关节角度"""
data = request.json
angles = data.get("angles", [])
speed = data.get("speed", 500)
if not gs.arm_client:
return jsonify({"ok": False, "error": "未连接机械臂"}), 400
ok = gs.arm_client.set_angles(angles, speed)
return jsonify({"ok": ok})
@app.route("/api/arm/set_angle", methods=["POST"])
def api_arm_set_angle():
"""设置单个关节角度"""
data = request.json
joint = data.get("joint", "J1")
angle = data.get("angle", 0.0)
speed = data.get("speed", 500)
if not gs.arm_client:
return jsonify({"ok": False}), 400
ok = gs.arm_client.set_angle(joint, angle, speed)
return jsonify({"ok": ok})
@app.route("/api/arm/jog", methods=["POST"])
def api_arm_jog():
"""连续调节关节(用于方向键)"""
data = request.json
joint = data.get("joint", "J1")
direction = data.get("direction", 0) # -1 / 0 / 1
speed = data.get("speed", 500)
if not gs.arm_client:
return jsonify({"ok": False}), 400
ok = gs.arm_client.jog_angle(joint, direction, speed)
return jsonify({"ok": ok})
@app.route("/api/arm/get_coords", methods=["GET"])
def api_arm_get_coords():
"""获取当前坐标"""
if not gs.arm_client:
return jsonify({"ok": False}), 400
ok, coords = gs.arm_client.get_coords()
return jsonify({"ok": ok, "coords": coords})
@app.route("/api/arm/state_check", methods=["GET"])
def api_arm_state_check():
"""检查机械臂状态"""
if not gs.arm_client:
return jsonify({"ok": False}), 400
ok = gs.arm_client.state_check()
return jsonify({"ok": ok, "running": not ok})
@app.route("/api/arm/power_on", methods=["POST"])
def api_arm_power_on():
if not gs.arm_client:
return jsonify({"ok": False}), 400
ok = gs.arm_client.power_on()
return jsonify({"ok": ok})
@app.route("/api/arm/state_on", methods=["POST"])
def api_arm_state_on():
if not gs.arm_client:
return jsonify({"ok": False}), 400
ok = gs.arm_client.state_on()
return jsonify({"ok": ok})
@app.route("/api/arm/state_off", methods=["POST"])
def api_arm_state_off():
if not gs.arm_client:
return jsonify({"ok": False}), 400
ok = gs.arm_client.state_off()
return jsonify({"ok": ok})
# ========== 摄像头预览 API ==========
@app.route("/api/camera/preview")
def api_camera_preview():
"""MJPEG 视频流"""
if not gs.qr_scanner or not gs.qr_scanner._cap:
return "camera not opened", 400
def gen():
while True:
frame = gs.qr_scanner.read_frame()
if frame is None:
break
# 编码为 JPEG
import cv2
ret, buf = cv2.imencode(".jpg", frame)
if ret:
yield (b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" +
buf.tobytes() + b"\r\n")
return Response(gen(), mimetype="multipart/x-mixed-replace; boundary=frame")
@app.route("/api/camera/refresh")
def api_camera_refresh():
"""AGV 摄像头单帧 JPEGpolling 模式)"""
if not gs.qr_scanner or not gs.qr_scanner._cap:
return "camera not opened", 400
import cv2
frame = gs.qr_scanner.read_frame()
if frame is None:
return "", 400
ret, buf = cv2.imencode(".jpg", frame)
if ret:
return Response(buf.tobytes(), mimetype="image/jpeg")
return "encode failed", 500
@app.route("/api/camera/capture")
def api_camera_capture():
"""拍摄一张照片"""
if not gs.qr_scanner or not gs.qr_scanner._cap:
return jsonify({"ok": False, "error": "摄像头未打开"}), 400
import cv2
frame = gs.qr_scanner.read_frame()
if frame is None:
return jsonify({"ok": False, "error": "读取帧失败"}), 400
photo_dir = os.path.join(DATA_DIR, "photos")
os.makedirs(photo_dir, exist_ok=True)
photo_path = os.path.join(photo_dir, f"capture_{int(time.time())}.jpg")
cv2.imwrite(photo_path, frame)
return jsonify({"ok": True, "path": photo_path})
@app.route("/api/camera/arm_refresh")
def api_arm_camera_refresh():
"""从机械臂拉一张 JPEG(流式读第一个完整帧,超时则降级)"""
import requests
try:
r = requests.get(ARM_CAMERA_CONFIG["url"], stream=True, timeout=5)
if r.status_code != 200:
return "", 404
data = b""
for chunk in r.iter_content(chunk_size=4096):
data += chunk
# 在累积数据中找 JPEG 完整帧
s = data.find(b"\xff\xd8")
e = data.find(b"\xff\xd9", s + 2) if s >= 0 else -1
if s >= 0 and e > s:
r.close()
return Response(data[s:e+2], mimetype="image/jpeg")
# 数据太长还没找到 JPEG 也直接返回(可能是空流)
if len(data) > 1024 * 1024:
r.close()
return "", 404
r.close()
return "", 404
except Exception as ex:
logger.error(f"arm_refresh 失败: {ex}")
return "", 404
@app.route("/api/camera/qr_scan", methods=["GET"])
def api_qr_scan():
"""扫描一次二维码"""
if not gs.qr_scanner:
return jsonify({"ok": False}), 400
result = gs.qr_scanner.scan_once()
return jsonify({"ok": bool(result), "data": result})
# ========== AGV 移动控制 API ==========
@app.route("/api/agv/move", methods=["POST"])
def api_agv_move():
"""控制 AGV 移动(前进/后退/左转/右转/停止)"""
data = request.json
direction = data.get("direction", "stop") # forward / backward / left / right / stop
speed = data.get("speed", AGV_CONFIG.get("move_speed", 0.5))
if not gs.agv_controller or not gs.agv_controller.is_connected():
return jsonify({"ok": False, "error": "AGV 未连接"}), 400
try:
if direction == "forward":
gs.agv_controller.move_forward(speed)
elif direction == "backward":
gs.agv_controller.move_backward(speed)
elif direction == "left":
gs.agv_controller.turn_left(speed)
elif direction == "right":
gs.agv_controller.turn_right(speed)
elif direction == "left_lateral":
gs.agv_controller.move_left_lateral(speed)
elif direction == "right_lateral":
gs.agv_controller.move_right_lateral(speed)
elif direction == "stop":
gs.agv_controller.stop()
else:
return jsonify({"ok": False, "error": "未知方向"}), 400
return jsonify({"ok": True})
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/agv/position", methods=["GET"])
def api_agv_position():
"""获取 AGV 当前位置"""
if not gs.agv_controller or not gs.agv_controller.is_connected():
return jsonify({"ok": False, "error": "AGV 未连接"}), 400
pos = gs.agv_controller.get_position()
if not pos:
return jsonify({"ok": False, "error": "AGV 未发布位置数据(/odom 无数据),请检查 AGV 传感器是否正常"}), 400
battery = gs.agv_controller.get_battery()
return jsonify({"ok": True, "position": pos, "battery": battery})
@app.route("/api/agv/stop", methods=["POST"])
def api_agv_stop():
"""立即停止 AGV"""
if gs.agv_controller:
gs.agv_controller.stop()
return jsonify({"ok": True})
@app.route("/api/agv/reset", methods=["POST"])
def api_agv_reset():
"""撞物体后复位 - 停止运动并尝试重新上电"""
import time
if not gs.agv_controller:
return jsonify({"ok": False, "error": "AGV 控制器未初始化"}), 400
try:
# 1. 先停止运动
gs.agv_controller.stop()
time.sleep(0.5)
# 2. 检查 AGV 对象是否存在
agv = gs.agv_controller._agv
if not agv:
return jsonify({"ok": False, "error": "AGV 未连接,请重新连接"}), 400
# 3. 检查电源状态
power_on = agv.is_power_on()
if not power_on:
# 撞物体后可能自动断电保护,尝试重新上电
agv.power_on()
time.sleep(2)
power_on = agv.is_power_on()
if power_on:
gs.agv_controller._connected = True
return jsonify({"ok": True, "message": "复位成功,已重新上电"})
else:
return jsonify({"ok": False, "error": "上电失败,请检查 AGV 状态"}), 500
else:
# 电源正常,只需要停止
return jsonify({"ok": True, "message": "复位成功,AGV 已停止"})
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 500
# ========== 任务执行 API ==========
@app.route("/api/mission/start", methods=["POST"])
def api_mission_start():
"""开始执行任务"""
if gs.state == State.RUNNING:
return jsonify({"ok": False, "error": "任务已在运行中"}), 400
data = request.json or {}
mission_data = {
"map": gs.map_config,
"points": gs.points_config,
}
def run():
from config import AGV_CONFIG, UPLOAD_CONFIG
executor_config = {
"device": AGV_CONFIG.get("device", "/dev/agvpro_controller"),
"baudrate": AGV_CONFIG.get("baudrate", 1000000),
"arm": ARM_CONFIG,
"upload_url": UPLOAD_CONFIG["url"],
"upload_timeout": UPLOAD_CONFIG["timeout"],
"upload_retries": UPLOAD_CONFIG["max_retries"],
"camera_index": 0,
}
executor = MissionExecutor(executor_config)
# 连接
conn_results = executor.connect_all()
if not conn_results.get("arm") or not conn_results.get("camera"):
gs.mission_report = {"error": "连接失败", "details": conn_results}
gs.state = State.IDLE
return
gs.state = State.RUNNING
report = executor.execute_mission(mission_data)
gs.mission_report = report
executor.disconnect_all()
gs.state = State.IDLE if report["failed"] == 0 else State.PAUSED
thread = threading.Thread(target=run, daemon=True)
thread.start()
return jsonify({"ok": True, "message": "任务已启动"})
@app.route("/api/mission/stop", methods=["POST"])
def api_mission_stop():
"""停止任务"""
if hasattr(MissionExecutor, "_instance"):
MissionExecutor._instance.stop()
gs.state = State.IDLE
return jsonify({"ok": True})
@app.route("/api/mission/pause", methods=["POST"])
def api_mission_pause():
gs.state = State.PAUSED
return jsonify({"ok": True})
@app.route("/api/mission/report", methods=["GET"])
def api_mission_report():
return jsonify({"report": gs.mission_report})
@app.route("/api/mission/state", methods=["GET"])
def api_mission_state():
return jsonify({"state": gs.state})
# ========== 静态资源 ==========
@app.route("/photos/<name>")
def photos(name):
return send_from_directory(os.path.join(DATA_DIR, "photos"), name)
# ========== 启动 ==========
if __name__ == "__main__":
logger.info("=" * 50)
logger.info("AGV 拍摄系统启动")
logger.info(f"监听 {SERVER_CONFIG['host']}:{SERVER_CONFIG['port']}")
logger.info("=" * 50)
# 启动时自动初始化 AGV 摄像头
gs.qr_scanner = QRScanner(CAMERA_CONFIG["device_index"])
gs.camera_opened = gs.qr_scanner.open()
logger.info(f"AGV 摄像头初始化: {'成功' if gs.camera_opened else '失败'}")
app.run(
host=SERVER_CONFIG["host"],
port=SERVER_CONFIG["port"],
debug=SERVER_CONFIG["debug"],
threaded=True
)