任务执行

This commit is contained in:
ywb
2026-05-23 10:02:19 +08:00
parent 35d5c3d70e
commit 6920403035
6 changed files with 883 additions and 420 deletions
+61 -24
View File
@@ -16,7 +16,7 @@ from utils.arm_client import ArmClient
from utils.agv_controller_ros2 import AGVController
from utils.qr_scanner import QRScanner
from utils.image_uploader import ImageUploader
from utils.mission_executor import MissionExecutor, TaskStatus
from utils.mission_executor import MissionExecutorV3
from utils.nav2_navigator import Nav2Navigator, Nav2Status
# 配置日志
@@ -1241,41 +1241,39 @@ def api_agv_reset():
# ========== 任务执行 API ==========
@app.route("/api/mission/start", methods=["POST"])
def api_mission_start():
"""开始执行任务"""
"""开始执行任务V3: M×N Grid 蛇形路径)"""
if gs.state == State.RUNNING:
return jsonify({"ok": False, "error": "任务已在运行中"}), 400
data = request.json or {}
mission_data = {
"map": gs.map_config,
"points": gs.points_config,
}
def run():
from config import AGV_CONFIG, UPLOAD_CONFIG
executor_config = {
from config import AGV_CONFIG
config = {
"device": AGV_CONFIG.get("device", "/dev/agvpro_controller"),
"baudrate": AGV_CONFIG.get("baudrate", 1000000),
"arm": ARM_CONFIG,
"upload_url": UPLOAD_CONFIG["url"],
"upload_timeout": UPLOAD_CONFIG["timeout"],
"upload_retries": UPLOAD_CONFIG["max_retries"],
"camera_index": 0,
}
executor = MissionExecutor(executor_config)
executor = MissionExecutorV3(config)
# 连接
conn_results = executor.connect_all()
if not conn_results.get("arm") or not conn_results.get("camera"):
gs.mission_report = {"error": "连接失败", "details": conn_results}
conn = executor.connect_all()
if not conn.get("agv") or not conn.get("arm"):
gs.mission_report = {"error": "连接失败", "details": conn}
gs.state = State.IDLE
return
gs.state = State.RUNNING
report = executor.execute_mission(mission_data)
machines_list = gs.machines_config if isinstance(gs.machines_config, list) else gs.machines_config.get("machines", [])
models_list = gs.models_config if isinstance(gs.models_config, list) else gs.models_config.get("models", [])
report = executor.execute_mission(
mission_config=gs.mission_config,
machines=machines_list,
qr_configs=gs.qr_config,
models=models_list,
)
gs.mission_report = report
executor.disconnect_all()
gs.state = State.IDLE if report["failed"] == 0 else State.PAUSED
gs.state = State.IDLE if report.get("error") is None else State.PAUSED
thread = threading.Thread(target=run, daemon=True)
thread.start()
@@ -1284,23 +1282,62 @@ def api_mission_start():
@app.route("/api/mission/stop", methods=["POST"])
def api_mission_stop():
"""停止任务"""
if hasattr(MissionExecutor, "_instance"):
MissionExecutor._instance.stop()
if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance:
MissionExecutorV3._instance.stop()
gs.state = State.IDLE
return jsonify({"ok": True})
@app.route("/api/mission/pause", methods=["POST"])
def api_mission_pause():
if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance:
MissionExecutorV3._instance.pause()
gs.state = State.PAUSED
return jsonify({"ok": True})
@app.route("/api/mission/resume", methods=["POST"])
def api_mission_resume():
if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance:
MissionExecutorV3._instance.resume()
gs.state = State.RUNNING
return jsonify({"ok": True})
@app.route("/api/mission/report", methods=["GET"])
def api_mission_report():
return jsonify({"report": gs.mission_report})
@app.route("/api/mission/state", methods=["GET"])
def api_mission_state():
return jsonify({"state": gs.state})
"""返回任务状态 + 执行器详情"""
result = {"state": gs.state}
if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance:
ex = MissionExecutorV3._instance
result.update(ex.get_status())
else:
# 空闲时预生成任务列表(基于网格和机器配置)
mc = gs.mission_config
if mc:
result["tasks"] = MissionExecutorV3.pre_generate_tasks(mc)
return jsonify(result)
@app.route("/api/mission/log", methods=["GET"])
def api_mission_log():
"""返回实时日志"""
if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance:
ex = MissionExecutorV3._instance
return jsonify(ex.get_logs())
return jsonify(gs.mission_report or {"log": []})
@app.route("/api/mission/manual-qr", methods=["POST"])
def api_mission_manual_qr():
"""手动输入二维码值"""
data = request.json or {}
qr = data.get("qr", "").strip()
if not qr:
return jsonify({"ok": False, "error": "二维码不能为空"}), 400
if hasattr(MissionExecutorV3, "_instance") and MissionExecutorV3._instance:
MissionExecutorV3._instance.set_manual_qr(qr)
return jsonify({"ok": True})
return jsonify({"ok": False, "error": "没有运行中的任务"}), 400
# ========== 二维码配置 API ==========
+138
View File
@@ -774,3 +774,141 @@ a:hover { text-decoration: underline; }
color: #666;
font-size: 12px;
}
/* ========== 实时日志 ========== */
.log-box {
background: #0a0a0a;
color: #00ff88;
font-family: 'Courier New', 'Menlo', monospace;
font-size: 13px;
line-height: 1.6;
max-height: 320px;
overflow-y: auto;
padding: 12px 16px;
border-radius: 6px;
margin-top: 8px;
border: 1px solid #1a1a1a;
}
.log-line {
padding: 2px 0;
border-bottom: 1px solid #111;
word-break: break-all;
}
.log-empty {
color: #555;
font-style: italic;
padding: 12px 0;
}
/* ========== 弹窗 ========== */
.modal-overlay {
position: fixed;
top: 0;
left: 0;
width: 100%;
height: 100%;
background: rgba(0, 0, 0, 0.75);
display: flex;
align-items: center;
justify-content: center;
z-index: 9999;
}
.modal {
background: #1a1a2e;
padding: 28px 32px;
border-radius: 12px;
min-width: 400px;
max-width: 90%;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.5);
}
.modal h3 {
margin: 0 0 12px 0;
color: #e0e0e0;
}
.modal p {
color: #aaa;
margin: 0 0 16px 0;
}
.modal input[type="text"] {
width: 100%;
padding: 10px 12px;
background: #0a0a0a;
border: 1px solid #333;
border-radius: 6px;
color: #e0e0e0;
font-size: 15px;
outline: none;
box-sizing: border-box;
}
.modal input[type="text"]:focus {
border-color: #409eff;
}
.modal-actions {
display: flex;
gap: 12px;
margin-top: 20px;
}
.modal-actions .btn {
flex: 1;
}
/* ========== 任务清单 ========== */
.task-grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(130px, 1fr));
gap: 10px;
margin-top: 10px;
}
.task-cell {
background: #0a0a0a;
border: 1px solid #1a1a1a;
border-radius: 8px;
padding: 12px;
text-align: center;
transition: all 0.3s;
}
.task-cell.task-active {
border-color: #409eff;
background: #0d1b2a;
}
.task-cell.task-completed {
border-color: #4caf50;
opacity: 0.7;
}
.task-cell.task-active .task-step-text {
color: #409eff;
font-weight: bold;
}
.task-pos {
font-size: 16px;
font-weight: bold;
color: #e0e0e0;
margin-bottom: 6px;
}
.task-status-icon {
font-size: 20px;
margin-bottom: 4px;
}
.task-step-text {
font-size: 12px;
color: #888;
margin-bottom: 4px;
}
.task-info {
font-size: 11px;
color: #666;
}
.task-qr {
font-family: monospace;
color: #aaa;
}
.task-photos {
color: #888;
}
.pulse-icon {
animation: taskPulse 1s infinite;
}
@keyframes taskPulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.3; }
}
+73 -23
View File
@@ -7,22 +7,27 @@ createApp({
data() {
return {
missionState: 'idle',
currentPoint: 0,
totalPoints: 0,
progress: 0,
tasks: [],
report: null,
previewUrl: API + '/api/camera/preview',
polling: null
polling: null,
logs: [],
showQrModal: false,
qrValue: '',
}
},
computed: {
missionStateText() {
const map = { idle: '空闲', running: '任务运行中', paused: '已暂停', completed: '已完成' }
const map = {
idle: '空闲',
running: '任务运行中',
paused: '已暂停',
completed: '已完成',
waiting_qr: '等待输入二维码'
}
return map[this.missionState] || '未知'
},
progressPercent() {
if (!this.totalPoints) return 0
return Math.round((this.currentPoint / this.totalPoints) * 100)
}
},
mounted() {
this.poll()
@@ -33,35 +38,55 @@ createApp({
methods: {
poll() {
this.refresh()
this.polling = setInterval(this.refresh, 2000)
this.pollLogs()
this.polling = setInterval(() => {
this.refresh()
this.pollLogs()
}, 2000)
},
async refresh() {
try {
const res = await fetch(API + '/api/mission/state')
const data = await res.json()
this.missionState = data.state || 'idle'
this.missionState = data.status || 'idle'
this.progress = data.progress || 0
if (data.tasks) this.tasks = data.tasks
if (this.missionState === 'running') {
// QR 弹窗
if (this.missionState === 'waiting_qr' && !this.showQrModal) {
this.showQrModal = true
this.qrValue = ''
}
// 完成后获取报告
if (this.missionState === 'idle' && this.tasks.length > 0) {
const reportRes = await fetch(API + '/api/mission/report')
const reportData = await reportRes.json()
if (reportData.report) {
this.totalPoints = reportData.report.total_points || 0
this.currentPoint = reportData.report.details?.length || 0
this.report = reportData.report
}
} else if (this.missionState === 'idle') {
const reportRes = await fetch(API + '/api/mission/report')
const reportData = await reportRes.json()
if (reportData.report) {
this.report = reportData.report
this.totalPoints = reportData.report.total_points || 0
this.currentPoint = reportData.report.details?.length || 0
}
}
} catch (e) {}
},
async pollLogs() {
if (this.missionState !== 'running' && this.missionState !== 'waiting_qr') return
try {
const res = await fetch(API + '/api/mission/log')
const data = await res.json()
if (data.log) this.logs = data.log
if (data.progress != null) this.progress = data.progress
if (data.tasks) this.tasks = data.tasks
// 自动滚到底
this.$nextTick(() => {
const box = this.$refs.logBox
if (box) box.scrollTop = box.scrollHeight
})
} catch (e) {}
},
async startMission() {
if (this.missionState !== 'idle') return
this.logs = []
this.progress = 0
this.report = null
this.showQrModal = false
await fetch(API + '/api/mission/start', { method: 'POST' })
this.missionState = 'running'
},
@@ -69,9 +94,34 @@ createApp({
await fetch(API + '/api/mission/pause', { method: 'POST' })
this.missionState = 'paused'
},
async resumeMission() {
await fetch(API + '/api/mission/resume', { method: 'POST' })
this.missionState = 'running'
this.showQrModal = false
},
async stopMission() {
await fetch(API + '/api/mission/stop', { method: 'POST' })
this.missionState = 'idle'
this.showQrModal = false
},
async submitQr() {
const val = this.qrValue.trim()
await fetch(API + '/api/mission/manual-qr', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ qr: val || ' ' })
})
this.showQrModal = false
this.qrValue = ''
},
cancelQr() {
this.showQrModal = false
this.qrValue = ''
fetch(API + '/api/mission/manual-qr', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ qr: 'SKIP' })
})
},
onPreviewError(e) {
e.target.style.display = 'none'
+55 -15
View File
@@ -25,10 +25,10 @@
<span class="pulse"></span>
[[ missionStateText ]]
</div>
<div class="running-progress" v-if="missionState === 'running'">
<span>点位 [[ currentPoint + 1 ]] / [[ totalPoints ]]</span>
<div class="running-progress" v-if="missionState === 'running' || missionState === 'waiting_qr'">
<span>进度 [[ Math.round(progress) ]]%</span>
<div class="progress-bar">
<div class="progress-fill" :style="{width: progressPercent + '%'}"></div>
<div class="progress-fill" :style="{width: progress + '%'}"></div>
</div>
</div>
</div>
@@ -39,12 +39,49 @@
<button class="btn btn-warning btn-large" @click="pauseMission" :disabled="missionState !== 'running'">
⏸️ 暂停
</button>
<button class="btn btn-primary btn-large" @click="resumeMission" :disabled="missionState !== 'paused'">
▶️ 继续
</button>
<button class="btn btn-error btn-large" @click="stopMission" :disabled="missionState === 'idle'">
⏹️ 停止
</button>
</div>
</section>
<!-- 任务清单 -->
<section class="card" v-if="tasks.length > 0">
<h2>📋 任务清单 ([[ tasks.length ]] 台机器)</h2>
<div class="task-grid">
<div v-for="task in tasks" :key="task.machine_id"
class="task-cell" :class="'task-' + task.status"
:title="task.step">
<div class="task-pos">[[ task.label ]]</div>
<div class="task-status-icon">
<span v-if="task.status === 'pending'"></span>
<span v-else-if="task.status === 'active'" class="pulse-icon">🔄</span>
<span v-else-if="task.status === 'completed'"></span>
<span v-else></span>
</div>
<div class="task-step-text">[[ task.step ]]</div>
<div class="task-info">
<div v-if="task.qr_value" class="task-qr">🏷 [[ task.qr_value.substring(0,8) ]]</div>
<div class="task-photos" v-if="task.photos_front || task.photos_back">
📷 [[ task.photos_front ]]正 [[ task.photos_back ]]背
</div>
</div>
</div>
</div>
</section>
<!-- 实时日志 -->
<section class="card" v-if="missionState === 'running' || missionState === 'waiting_qr'">
<h2>📜 实时日志</h2>
<div class="log-box" ref="logBox">
<div v-for="(log, i) in logs" :key="i" class="log-line">[[ log ]]</div>
<div v-if="logs.length === 0" class="log-empty">等待任务开始...</div>
</div>
</section>
<!-- 实时预览 -->
<section class="card">
<h2>📷 摄像头预览</h2>
@@ -55,24 +92,27 @@
<!-- 任务报告 -->
<section class="card" v-if="report">
<h2>📋 任务报告</h2>
<h2>📊 任务报告</h2>
<div class="report-summary">
<div class="stat ok">✅ 完成: [[ report.completed ]]</div>
<div class="stat error">❌ 失败: [[ report.failed ]]</div>
<div class="stat">总计: [[ report.total_points ]]</div>
</div>
<div class="report-details">
<div v-for="(detail, i) in report.details" :key="i" class="report-item">
<div class="report-point">
<span class="report-status" :class="detail.status">[[ detail.status === 'completed' ? '✅' : '❌' ]]</span>
[[ detail.point_name ]]
</div>
<div v-for="(pose, pi) in detail.poses" :key="pi" class="report-pose">
[[ pose.photo_type ]] - [[ pose.status ]]
</div>
</div>
</div>
</section>
<!-- 手动输入二维码弹窗 -->
<div class="modal-overlay" v-if="showQrModal">
<div class="modal">
<h3>⌨️ 手动输入二维码</h3>
<p>所有姿态均未识别到二维码,请手动输入:</p>
<input type="text" v-model="qrValue" placeholder="输入二维码内容" autofocus @keyup.enter="submitQr">
<div class="modal-actions">
<button class="btn btn-primary" @click="submitQr">确认</button>
<button class="btn" @click="cancelQr">跳过</button>
</div>
</div>
</div>
</main>
</div>
-113
View File
@@ -348,119 +348,6 @@
</div>
</section>
<!-- 中:选中机器的配置 -->
<section class="card" v-if="selectedMachine && selectedMachine.front && selectedMachine.back" style="margin-top:16px">
<h2>② 点位配置 — 第{% raw %}{{ selectedMachine.row+1 }}{% endraw %}行 第{% raw %}{{ selectedMachine.col+1 }}{% endraw %}列 <button class="btn btn-small" @click="clearSelection()">← 返回</button></h2>
<!-- 正面点位 -->
<div class="machine-form">
<h3>📷 正面点位</h3>
<div class="form-row">
<div class="form-group">
<label>X 坐标</label>
<input type="number" step="0.01" v-model.number="selectedMachine.front.coords[0]" placeholder="0.00">
</div>
<div class="form-group">
<label>Y 坐标</label>
<input type="number" step="0.01" v-model.number="selectedMachine.front.coords[1]" placeholder="0.00">
</div>
<div class="form-group">
<label>Yaw (弧度)</label>
<input type="number" step="0.01" v-model.number="selectedMachine.front.coords[2]" placeholder="0.00">
</div>
<div class="form-group" style="align-self:end">
<button class="btn btn-small btn-primary" @click="readPosition('front')" :disabled="!agvConnected">📍 读取当前位置</button>
</div>
</div>
<!-- 正面姿态列表 -->
<div v-if="selectedMachine.front.poses && selectedMachine.front.poses.length > 0" class="pose-list">
<h4>正面姿态 ({% raw %}{{ selectedMachine.front.poses.length }}{% endraw %} 个)</h4>
<div v-for="pose in selectedMachine.front.poses" :key="pose.id" class="pose-item">
<span class="pose-name">{% raw %}{{ pose.name }}{% endraw %}</span>
<span class="pose-angles" v-if="pose.arm_angles">角度: {% raw %}{{ formatAngles(pose.arm_angles) }}{% endraw %}</span>
<button class="btn-icon" @click="deletePose(selectedMachine.id, 'front', pose.id)">🗑️</button>
</div>
</div>
<div class="pose-add">
<input type="text" v-model="poseForm.name" placeholder="姿态名称(如:正面全景)">
<button class="btn btn-small btn-success" @click="addPoseToMachine(selectedMachine.id, 'front')"> 添加姿态</button>
</div>
</div>
<!-- 背面点位 -->
<div class="machine-form" style="margin-top:16px">
<h3>📷 背面点位</h3>
<div class="form-row">
<div class="form-group">
<label>X 坐标</label>
<input type="number" step="0.01" v-model.number="selectedMachine.back.coords[0]" placeholder="0.00">
</div>
<div class="form-group">
<label>Y 坐标</label>
<input type="number" step="0.01" v-model.number="selectedMachine.back.coords[1]" placeholder="0.00">
</div>
<div class="form-group">
<label>Yaw (弧度)</label>
<input type="number" step="0.01" v-model.number="selectedMachine.back.coords[2]" placeholder="0.00">
</div>
<div class="form-group" style="align-self:end">
<button class="btn btn-small btn-primary" @click="readPosition('back')" :disabled="!agvConnected">📍 读取当前位置</button>
</div>
</div>
<!-- 背面姿态列表 -->
<div v-if="selectedMachine.back.poses && selectedMachine.back.poses.length > 0" class="pose-list">
<h4>背面姿态 ({% raw %}{{ selectedMachine.back.poses.length }}{% endraw %} 个)</h4>
<div v-for="pose in selectedMachine.back.poses" :key="pose.id" class="pose-item">
<span class="pose-name">{% raw %}{{ pose.name }}{% endraw %}</span>
<span class="pose-angles" v-if="pose.arm_angles">角度: {% raw %}{{ formatAngles(pose.arm_angles) }}{% endraw %}</span>
<button class="btn-icon" @click="deletePose(selectedMachine.id, 'back', pose.id)">🗑️</button>
</div>
</div>
<div class="pose-add">
<input type="text" v-model="poseForm.name" placeholder="姿态名称(如:背面细节)">
<button class="btn btn-small btn-success" @click="addPoseToMachine(selectedMachine.id, 'back')"> 添加姿态</button>
</div>
</div>
<!-- 二维码位置 -->
<div class="machine-form" style="margin-top:16px" v-if="hasQr">
<h3>🔍 二维码位置</h3>
<div class="form-row">
<div class="form-group">
<label>X 坐标</label>
<input type="number" step="0.01" :value="safeQrCoord(0)" @input="setQrCoord(0, Number($event.target.value))" placeholder="0.00">
</div>
<div class="form-group">
<label>Y 坐标</label>
<input type="number" step="0.01" :value="safeQrCoord(1)" @input="setQrCoord(1, Number($event.target.value))" placeholder="0.00">
</div>
<div class="form-group">
<label>Yaw (弧度)</label>
<input type="number" step="0.01" :value="safeQrCoord(2)" @input="setQrCoord(2, Number($event.target.value))" placeholder="0.00">
</div>
<div class="form-group" style="align-self:end">
<button class="btn btn-small btn-primary" @click="readQRPosition" :disabled="!agvConnected">📍 读取当前位置</button>
</div>
</div>
<!-- QR 扫描结果 -->
<div v-if="hasQrValue" style="margin-top:8px;padding:8px;background:#0f1923;border-radius:6px">
<span style="font-size:13px">📱 二维码值: <strong>{% raw %}{{ safeQr('qr_value') }}{% endraw %}</strong></span>
<span v-if="hasQrModelId" style="margin-left:12px;font-size:13px">🏷️ 匹配机型: <strong>{% raw %}{{ safeQrModelName() }}{% endraw %}</strong></span>
<span v-else style="margin-left:12px;font-size:13px;color:#ff9800">⚠️ 未匹配到机型</span>
</div>
<div class="btn-row" style="margin-top:8px">
<button class="btn btn-small btn-success" @click="scanQRCode(selectedMachine.id)" :disabled="!cameraOpened || qrScanning">
{% raw %}{{ qrScanning ? '扫描中...' : '📷 扫描二维码' }}{% endraw %}
</button>
</div>
</div>
<div class="btn-row" style="margin-top:16px">
<button class="btn btn-danger" @click="deleteMachine(selectedMachine.id)">🗑️ 删除此机器</button>
<button class="btn btn-secondary" @click="saveMachineCoords">💾 保存此机器配置</button>
</div>
</section>
<!-- 下:序列预览 -->
<section class="card" v-if="sequence && sequence.length > 0" style="margin-top:16px">
<h2>③ 🐍 蛇形拍摄序列预览</h2>
+547 -236
View File
@@ -1,245 +1,615 @@
# -*- coding: utf-8 -*-
"""
任务调度器 - 管理拍摄任务的执行
任务执行器 V3 — M×N 网格蛇形路径拍摄
工作流:
1. 根据 grid 生成蛇形路径(奇数行左→右,偶数行右→左)
2. 逐台机器:
- 正面:导航 → 扫码(多姿态重试) → 查机型 → 按姿态拍照
- 背面:导航 → 按姿态拍照
3. 回到 (0,0)
"""
import os
import json
import time
import json
import logging
import threading
import subprocess
import math
from typing import List, Dict, Optional
from typing import Optional, Dict, List
from enum import Enum
from .arm_client import ArmClient
from .agv_controller_ros2 import AGVController
from .qr_scanner import QRScanner
from .image_uploader import ImageUploader
import requests
import cv2
import numpy as np
logger = logging.getLogger(__name__)
# ROS2 环境设置(与 agv_controller_ros2.py 保持一致)
ROS2_SETUP_CMD = "source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash"
ARM_CAMERA_SNAPSHOT = "http://192.168.110.164:5003/api/camera/snapshot"
PHOTOS_DIR = "/home/elephant/photos"
# 二维码扫描重试参数
QR_SCAN_TIMEOUT = 5 # 单次扫描超时
QR_POSE_WAIT = 1.5 # 调整姿态后等待时间
MANUAL_QR_TIMEOUT = 300 # 5分钟超时
class TaskStatus(Enum):
PENDING = "pending"
class MissionStatus(str, Enum):
IDLE = "idle"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
PAUSED = "paused"
COMPLETED = "completed"
WAITING_QR = "waiting_qr"
class MissionExecutor:
"""任务执行器 - 负责按顺序执行点位拍摄任务"""
class MissionExecutorV3:
"""任务执行器 V3 — M×N 网格蛇形路径"""
_instance = None # 单例,供外部停止用
def __init__(self, config: dict):
self.config = config
self.status = TaskStatus.PENDING
self.current_point_index = 0
self.current_pose_index = 0
self.snapshot_serial_map = {} # {point_id: serial_number} 缓存已扫描的 serialNumber
self.status = MissionStatus.IDLE
MissionExecutorV3._instance = self
# 初始化各模块
# 实时状态报告
self.report = {
"status": "idle",
"step": "",
"progress": 0,
"total": 0,
"log": [],
"error": None,
}
# 线程同步
self._stop = threading.Event()
self._pause = threading.Event()
self._pause.set() # 初始不暂停
self._qr_event = threading.Event()
self._qr_value: Optional[str] = None
# 设备
from .arm_client import ArmClient
self.arm_client = ArmClient(
config["arm"]["host"],
config["arm"]["port"]
)
from .agv_controller_ros2 import AGVController
self.agv = AGVController(
device=config.get("device", "/dev/agvpro_controller"),
baudrate=config.get("baudrate", 1000000)
)
self.arm_client: Optional[ArmClient] = None
self.uploader = ImageUploader(
upload_url=config["upload_url"],
timeout=config.get("upload_timeout", 30),
max_retries=config.get("upload_retries", 3)
)
self.qr_scanner = QRScanner(device_index=config.get("camera_index", 0))
# ========== 连接管理 ==========
# ==================== 连接 ====================
def connect_all(self) -> Dict[str, bool]:
"""连接 AGV、机械臂、摄像头"""
results = {}
# 连接 AGV
results["agv"] = self.agv.connect()
# 连接机械臂(通过 TCP
arm_cfg = self.config["arm"]
self.arm_client = ArmClient(arm_cfg["host"], arm_cfg["port"])
results["arm"] = self.arm_client.connect()
# 打开摄像头
results["camera"] = self.qr_scanner.open()
return results
def disconnect_all(self):
"""断开所有连接"""
if self.arm_client:
self.arm_client.close()
self.agv.disconnect()
self.qr_scanner.close()
# ========== 任务执行 ==========
# ==================== 主任务流程 ====================
def execute_mission(self, mission_data: dict) -> dict:
def execute_mission(
self,
mission_config: dict,
machines: list,
qr_configs: list,
models: list,
) -> dict:
"""
执行一个完整任务(一个地图的所有点位)
mission_data: 包含点位列表的完整任务配置
返回执行报告
"""
self.status = TaskStatus.RUNNING
report = {
"total_points": len(mission_data.get("points", [])),
"completed": 0,
"failed": 0,
"details": []
}
执行完整拍摄任务。
Args:
mission_config: {rows, cols, grid, positions}
machines: [{id, row, col, front: {coords}, back: {coords}}]
qr_configs: [{id, name, joint_angles: [a1..a6]}]
models: [{id, name, poses: [{name, arm_angles}], poses_back: [...]}]
Returns:
执行报告 dict
"""
self.status = MissionStatus.RUNNING
self.report = {"status": "running", "step": "初始化", "progress": 0, "total": 0, "log": [], "error": None}
self._stop.clear()
self._pause.set()
start_time = time.time()
points = mission_data.get("points", [])
for i, point in enumerate(points):
self.current_point_index = i
try:
result = self._execute_point(point)
report["details"].append(result)
if result["status"] == "completed":
report["completed"] += 1
rows = int(mission_config.get("rows", 1))
cols = int(mission_config.get("cols", 1))
grid = mission_config.get("grid", [])
positions = mission_config.get("positions", [])
# 1. 生成蛇形路径
path = MissionExecutorV3._build_snake_path(rows, cols, grid)
if not path:
self._log("❌ 网格中没有机器,任务终止")
self.report["error"] = "No machines in grid"
return self._finish(0)
self.report["total"] = len(path)
self._log(f"📍 蛇形路径生成: {len(path)} 台机器")
# 初始化任务列表
self.report["tasks"] = [{
"row": r, "col": c,
"machine_id": f"m_{r}_{c}",
"label": f"{r+1}-{c+1}",
"status": "pending",
"step": "等待",
"qr_value": None,
"photos_front": 0,
"photos_back": 0,
} for (r, c) in path]
# 初始化任务列表
self.report["tasks"] = [{
"row": r, "col": c,
"machine_id": f"m_{r}_{c}",
"label": f"{r+1}-{c+1}",
"status": "pending",
"step": "等待",
"qr_value": None,
"photos_front": 0,
"photos_back": 0,
} for (r, c) in path]
# 2. 逐台执行
for idx, (r, c) in enumerate(path):
if self._stop.is_set():
self._log("⏹️ 任务已停止")
break
self._wait_pause()
# 更新任务状态 → 正面开始
task = self._get_task(r, c)
if task:
task["status"] = "active"
task["step"] = "正面扫码"
machine_id = f"m_{r}_{c}"
machine = next((m for m in machines if m.get("id") == machine_id), None)
if not machine:
self._log(f"⚠️ 机器 {r+1}-{c+1} 不存在,跳过")
continue
rl, cl = r + 1, c + 1 # 显示用的 1-based
# --- 正面 ---
self._log(f"📍 机器 {rl}-{cl} 进入正面点位")
self._step(f"机器 {rl}-{cl} 正面")
# 导航到正面点位
front_pt = self._find_point(positions, r, c, "front")
if front_pt and self._has_coords(front_pt):
if not self._navigate(front_pt, "正面"):
self._log(f"⚠️ 导航失败,尝试继续")
else:
report["failed"] += 1
self._log(f"⚠️ 无正面点位坐标")
# 扫描二维码
qr_value = self._scan_qr_with_poses(qr_configs)
if self._stop.is_set():
break
# 查机型 + 更新任务步骤
model_name = self._lookup_model(qr_value)
self._log(f" 🏷️ 机型: {model_name}")
if task and qr_value:
task["qr_value"] = qr_value
if task:
task["step"] = "正面拍照"
# 正面拍照
model = self._find_model(models, model_name)
if model:
self._shoot(model, "front", rl, cl, qr_value or "unknown")
else:
self._log(f" ⚠️ 未找到机型 {model_name}")
self._progress(idx, 1)
# --- 背面 ---
if task:
task["step"] = "背面拍照"
self._log(f"📍 机器 {rl}-{cl} 进入背面点位")
self._step(f"机器 {rl}-{cl} 背面")
# 导航到背面点位
back_pt = self._find_point(positions, r + 1, c, "back")
if back_pt and self._has_coords(back_pt):
if not self._navigate(back_pt, "背面"):
self._log(f"⚠️ 导航失败,尝试继续")
else:
self._log(f"⚠️ 无背面点位坐标")
# 背面拍照
if model:
self._shoot(model, "back", rl, cl, qr_value or "unknown")
# 标记任务完成
if task:
task["status"] = "completed"
task["step"] = "完成"
self._progress(idx, 2)
# 3. 回到出发点
if not self._stop.is_set():
self._step("返回出发点")
self._log("→ 返回 (0, 0)")
self._nav2_go_to_point(0, 0, 0, timeout_sec=60)
elapsed = time.time() - start_time
return self._finish(elapsed)
except Exception as e:
logger.error(f"点位 {i} 执行异常: {e}")
report["failed"] += 1
report["details"].append({
"point_index": i,
"point_name": point.get("name", f"point_{i}"),
"status": "failed",
"error": str(e)
self._log(f"❌ 任务异常: {e}")
logger.exception("execute_mission 崩溃")
self.report["error"] = str(e)
self.status = MissionStatus.IDLE
self.report["status"] = "idle"
return self.report
def _finish(self, elapsed: float) -> dict:
if self._stop.is_set():
self._step("已停止")
else:
self._step("完成")
self._log(f"✅ 任务完成 ({elapsed:.0f}s)")
self.report["progress"] = 100
self.status = MissionStatus.IDLE
self.report["status"] = "idle"
return self.report
# ==================== 蛇形路径 ====================
@staticmethod
def _build_snake_path(rows: int, cols: int, grid: list) -> list:
"""奇数行(0,2,4...)左→右,偶数行(1,3,5...)右→左"""
path = []
for r in range(rows):
if r % 2 == 0:
for c in range(cols):
if MissionExecutorV3._has_machine(grid, r, c):
path.append((r, c))
else:
for c in range(cols - 1, -1, -1):
if MissionExecutorV3._has_machine(grid, r, c):
path.append((r, c))
return path
@staticmethod
def _has_machine(grid: list, r: int, c: int) -> bool:
if not grid or r >= len(grid):
return False
row = grid[r]
if isinstance(row, list):
return c < len(row) and bool(row[c])
return False
@staticmethod
def _build_grid_from_machines(rows: int, cols: int, machines: list) -> list:
"""从机器列表重建 grid 矩阵"""
if not machines:
return [[False] * cols for _ in range(rows)]
max_r = max(int(m.get("row", 0)) for m in machines) + 1
max_c = max(int(m.get("col", 0)) for m in machines) + 1
gr = max(rows, max_r)
gc = max(cols, max_c)
grid = [[False] * gc for _ in range(gr)]
for m in machines:
r = int(m.get("row", 0))
c = int(m.get("col", 0))
if 0 <= r < gr and 0 <= c < gc:
grid[r][c] = True
return grid
@staticmethod
def pre_generate_tasks(mission_config: dict) -> list:
"""从网格配置预生成任务列表(用于 UI 展示,无需启动执行器)"""
rows = int(mission_config.get("rows", 1))
cols = int(mission_config.get("cols", 1))
grid = mission_config.get("grid", [])
# 如果 grid 为空但从 machines 重建
if not grid and machines:
grid = MissionExecutorV3._build_grid_from_machines(rows, cols, machines)
if grid:
rows = len(grid)
cols = len(grid[0]) if grid else cols
path = MissionExecutorV3._build_snake_path(rows, cols, grid)
tasks = []
for (r, c) in path:
tasks.append({
"row": r, "col": c,
"machine_id": f"m_{r}_{c}",
"label": f"{r+1}-{c+1}",
"status": "pending",
"step": "等待",
"qr_value": None,
"photos_front": 0,
"photos_back": 0,
})
return tasks
self.status = TaskStatus.COMPLETED if report["failed"] == 0 else TaskStatus.PAUSED
return report
# ==================== 点位查找 ====================
def _execute_point(self, point: dict) -> dict:
"""执行单个点位的拍摄"""
point_name = point.get("name", "unknown")
logger.info(f"开始执行点位: {point_name}")
@staticmethod
def _find_point(positions: list, row: int, col: int, side: str) -> Optional[dict]:
for p in positions:
if p.get("row") == row and p.get("col") == col and p.get("side") == side:
return p
return None
result = {
"point_name": point_name,
"poses": []
}
@staticmethod
def _has_coords(point: dict) -> bool:
coords = point.get("coords", [])
return len(coords) >= 2 and (coords[0] != 0 or coords[1] != 0)
# 1. AGV 移动到点位(Nav2 导航)
coords = point.get("coords", {})
x, y = coords.get("x", 0), coords.get("y", 0)
rz = coords.get("rz", 0.0) # 目标朝向
logger.info(f"AGV Nav2 导航到 ({x}, {y}), yaw={rz}")
# ==================== 导航 ====================
nav_result = self._nav2_go_to_point(x, y, rz)
if not nav_result:
logger.warning(f"AGV 导航到点位 {point_name} 失败,跳过")
return {"point_name": point_name, "status": "failed", "error": "导航失败"}
def _navigate(self, point: dict, label: str) -> bool:
coords = point["coords"]
x, y = float(coords[0]), float(coords[1])
yaw = float(coords[2]) if len(coords) >= 3 else 0.0
self._log(f" 🧭 导航到{label}点位 ({x:.2f}, {y:.2f}, yaw={math.degrees(yaw):.0f}°)")
return self._nav2_go_to_point(x, y, yaw)
# 2. 执行该点位的所有姿态
poses = point.get("poses", [])
for j, pose in enumerate(poses):
self.current_pose_index = j
pose_result = self._execute_pose(point, pose, j)
result["poses"].append(pose_result)
# ==================== 二维码扫描 ====================
# 如果是"两者都要"类型,需要按顺序执行两台机器
if pose.get("type") == "both":
# 执行顺序由 pose.sequence 配置
sequence = pose.get("sequence", ["front_first"])
for step in sequence:
if step == "front":
self._capture_and_upload(point, pose, "front", j)
elif step == "back":
self._capture_and_upload(point, pose, "back", j)
else:
photo_type = pose.get("photo_type", "front")
self._capture_and_upload(point, pose, photo_type, j)
def _scan_qr_with_poses(self, qr_configs: list) -> Optional[str]:
"""用二维码配置中的姿态依次尝试"""
if not qr_configs:
self._log(f" ⚠️ 无二维码配置")
return self._request_manual_qr()
result["status"] = "completed"
return result
self._log(f" 🔍 尝试 {len(qr_configs)} 个二维码姿态...")
for i, qc in enumerate(qr_configs):
if self._stop.is_set():
return None
self._wait_pause()
def _execute_pose(self, point: dict, pose: dict, pose_idx: int) -> dict:
"""执行单个姿态的拍摄"""
photo_type = pose.get("photo_type", "front")
camera_source = pose.get("camera", "agv") # agv 或 arm
angles = qc.get("joint_angles", [])
if not angles or len(angles) < 6:
continue
# 如果需要机械臂运动
arm_angles = pose.get("arm_angles", None)
if arm_angles and self.arm_client:
self.arm_client.set_angles(arm_angles, speed=pose.get("speed", 500))
time.sleep(1) # 等待运动到位
name = qc.get("name", f"姿态{i+1}")
self._log(f" [{i+1}/{len(qr_configs)}] {name}")
return {
"pose_index": pose_idx,
"photo_type": photo_type,
"arm_angles": arm_angles,
"status": "ready"
}
# 调整机械臂
if self.arm_client:
self.arm_client.set_angles(angles, speed=500)
time.sleep(QR_POSE_WAIT)
def _capture_and_upload(self, point: dict, pose: dict, photo_type: str, pose_idx: int):
"""拍摄并上传"""
point_id = point.get("id", str(point))
# 扫码
qr = self._decode_qr_from_arm()
if qr:
self._log(f" ✅ 识别成功: {qr}")
return qr
# 确定 serialNumber
if photo_type == "front":
# 正面:从二维码获取 serialNumber
serial = self.qr_scanner.scan_with_retry(max_attempts=5, interval=0.5)
if not serial:
logger.warning(f"点位 {point.get('name')} 正面拍摄未扫描到二维码,跳过")
return
self.snapshot_serial_map[point_id] = serial
else:
# 背面:使用缓存的 serialNumber
serial = self.snapshot_serial_map.get(point_id)
if not serial:
logger.warning(f"点位 {point.get('name')} 背面拍摄但无缓存 serialNumber")
return
time.sleep(0.3)
# 拍摄图片(AGV 端摄像头)
frame = self.qr_scanner.read_frame()
self._log(f" ⚠️ 全部 {len(qr_configs)} 个姿态均未识别到二维码")
return self._request_manual_qr()
def _decode_qr_from_arm(self) -> Optional[str]:
"""从机械臂摄像头取一帧,识别二维码"""
for attempt in range(3):
try:
resp = requests.get(ARM_CAMERA_SNAPSHOT, timeout=QR_SCAN_TIMEOUT)
if resp.status_code != 200 or not resp.content:
continue
arr = np.frombuffer(resp.content, dtype=np.uint8)
frame = cv2.imdecode(arr, cv2.IMREAD_COLOR)
if frame is None:
logger.error("摄像头读取失败")
continue
detector = cv2.QRCodeDetector()
data, bbox, _ = detector.detectAndDecode(frame)
if data and len(data) > 0:
return data
except Exception:
pass
time.sleep(0.5)
return None
def _request_manual_qr(self) -> Optional[str]:
"""暂停任务,等待手动输入"""
self.status = MissionStatus.WAITING_QR
self.report["status"] = "waiting_qr"
self.report["step"] = "等待手动输入二维码"
self._log(" ⌨️ 弹窗等待手动输入二维码...")
self._qr_event.clear()
if self._qr_event.wait(timeout=MANUAL_QR_TIMEOUT):
self.status = MissionStatus.RUNNING
self.report["status"] = "running"
self._log(f" ✏️ 手动输入: {self._qr_value}")
return self._qr_value
else:
self.status = MissionStatus.RUNNING
self.report["status"] = "running"
self._log(f" ⚠️ 等待超时({MANUAL_QR_TIMEOUT}s),跳过")
return None
def set_manual_qr(self, value: str):
self._qr_value = value.strip()
self._qr_event.set()
# ==================== 机型查询 ====================
def _lookup_model(self, qr_value: Optional[str]) -> str:
"""TODO: 后续通过 HTTP 接口查询机型"""
return "机器1"
@staticmethod
def _find_model(models: list, name: str) -> Optional[dict]:
"""在机型列表中找到匹配的机型"""
for m in models:
if m.get("name") == name or m.get("id") == name:
return m
# fallback: 第一个机型
return models[0] if models else None
# ==================== 姿态拍照 ====================
def _shoot(self, model: dict, side: str, row: int, col: int, qr_value: str):
"""按机型配置的所有姿态依次拍照"""
# 更新任务照片计数
task = self._get_task(row - 1, col - 1)
side_label = "正面" if side == "front" else "背面"
poses = model.get("poses", []) if side == "front" else model.get("poses_back", [])
if not poses:
self._log(f" ⚠️ 机型无{side_label}姿态配置")
return
# 保存图片
photo_dir = os.path.join(os.path.dirname(__file__), "..", "photos")
os.makedirs(photo_dir, exist_ok=True)
photo_path = os.path.join(photo_dir, f"{serial}_{photo_type}_{int(time.time())}.jpg")
import cv2
cv2.imwrite(photo_path, frame)
self._log(f" 📷 {side_label}拍照 ({len(poses)} 个姿态)")
for pi, pose in enumerate(poses):
if self._stop.is_set():
break
self._wait_pause()
# 上传
self.uploader.upload(photo_path, serial, pose_idx, photo_type)
logger.info(f"上传完成: {serial} {photo_type}")
angles = pose.get("arm_angles", [])
if not angles or len(angles) < 6:
self._log(f" 跳过 {pose.get('name', f'姿态{pi+1}')}: 无效角度")
continue
# ========== Nav2 导航 ==========
name = pose.get("name", f"{side_label}-{pi+1}")
self._log(f" 🎯 {name}")
# 调整机械臂
if self.arm_client:
self.arm_client.set_angles(angles, speed=500)
time.sleep(QR_POSE_WAIT)
# 拍照
path = self._capture_arm_photo(row, col, side, pi + 1, qr_value)
if path:
self._log(f" 💾 {os.path.basename(path)}")
def _capture_arm_photo(self, row: int, col: int, side: str,
pose_idx: int, qr_value: str) -> Optional[str]:
"""从机械臂摄像头拍照存本地"""
try:
resp = requests.get(ARM_CAMERA_SNAPSHOT, timeout=10)
if resp.status_code != 200 or not resp.content:
logger.error("arm snapshot 请求失败")
return None
os.makedirs(PHOTOS_DIR, exist_ok=True)
ts = time.strftime("%Y%m%d_%H%M%S")
fname = f"{ts}_r{row}c{col}_{side}_p{pose_idx}_{qr_value[:20]}.jpg"
fpath = os.path.join(PHOTOS_DIR, fname)
with open(fpath, "wb") as f:
f.write(resp.content)
return fpath
except Exception as e:
logger.error(f"拍照异常: {e}")
return None
# ==================== 控制 ====================
def _wait_pause(self):
"""等待暂停状态解除"""
self._pause.wait()
def pause(self):
self._pause.clear()
self.status = MissionStatus.PAUSED
self.report["status"] = "paused"
self.report["step"] = "已暂停"
self._log("⏸️ 任务已暂停")
def resume(self):
self._pause.set()
self.status = MissionStatus.RUNNING
self.report["status"] = "running"
self._log("▶️ 任务已恢复")
def stop(self):
self._stop.set()
self._pause.set() # 解除暂停
self._qr_event.set() # 解除 QR 等待
if self.arm_client:
try:
self.arm_client.task_stop()
except Exception:
pass
self.agv.stop()
self.status = MissionStatus.IDLE
self.report["status"] = "idle"
def get_status(self) -> dict:
return {
"status": self.report["status"],
"step": self.report["step"],
"progress": self.report["progress"],
"total": self.report["total"],
"tasks": self.report.get("tasks", []),
}
def get_logs(self) -> dict:
"""返回实时日志和完整状态"""
return self.report
# ==================== 状态报告 ====================
def _log(self, msg: str):
self.report["log"].append(msg)
# Keep last 500 entries
if len(self.report["log"]) > 500:
self.report["log"] = self.report["log"][-500:]
logger.info(msg)
def _step(self, text: str):
self.report["step"] = text
def _get_task(self, row: int, col: int) -> Optional[dict]:
"""获取指定行列的任务记录"""
for t in self.report.get("tasks", []):
if t["row"] == row and t["col"] == col:
return t
return None
def _progress(self, machine_idx: int, side_code: int):
"""side_code: 1=正面完成, 2=背面完成"""
if self.report["total"]:
self.report["progress"] = min(
int((machine_idx * 2 + side_code) / (self.report["total"] * 2) * 100),
99
)
# ==================== Nav2 导航 ====================
# (保留原实现)
def _nav2_check_available(self) -> bool:
"""检查 Nav2 action server 是否可用"""
try:
rc, out, err = self._run_ros2_cmd("ros2 action list")
if rc != 0:
return False
return "/navigate_to_pose" in out or "navigate_to_pose" in out
return "/navigate_to_pose" in out
except:
return False
def _nav2_send_goal(self, x: float, y: float, yaw: float, timeout_sec: float = 120.0) -> bool:
"""
通过 ros2 action 发送 navigate_to_pose 目标并等待结果
Returns:
是否成功到达
"""
# 检查 Nav2 可用性
def _nav2_go_to_point(self, x: float, y: float, yaw: float = 0.0,
timeout_sec: float = 120.0) -> bool:
if not self._nav2_check_available():
logger.error("Nav2 action server 不可用,请确保 navigation2_active.launch.py 已启动")
logger.error("Nav2 action server 不可用")
return False
# 构建四元数
qz = math.sin(yaw / 2.0)
qw = math.cos(yaw / 2.0)
@@ -262,125 +632,66 @@ class MissionExecutor:
f" w: {qw}"
)
logger.info(f"Nav2 发送导航目标: ({x:.3f}, {y:.3f}), yaw={math.degrees(yaw):.1f}°")
# 发送 goal 并监听反馈和结果
cmd = (
f"ros2 action send_goal /navigate_to_pose "
f"nav2_msgs/action/NavigateToPose "
f"'{pose_yaml}' "
f"--feedback"
f"'{pose_yaml}' --feedback"
)
full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'"
try:
process = subprocess.Popen(
full_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
full_cmd, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
succeeded = False
check_interval = 1.0
elapsed = 0.0
while elapsed < timeout_sec:
import select
reads = [process.stdout]
ready, _, _ = select.select(reads, [], [], check_interval)
elapsed += check_interval
ready, _, _ = select.select(reads, [], [], 1.0)
elapsed += 1.0
if process.stdout in ready:
line = process.stdout.readline()
if not line:
break
line_stripped = line.strip()
if "succeeded" in line_stripped.lower():
logger.info("✅ Nav2 导航成功到达目标")
ls = line.strip()
if "succeeded" in ls.lower():
succeeded = True
break
elif "failed" in line_stripped.lower() or "aborted" in line_stripped.lower():
logger.warning(f"⚠️ Nav2 导航失败: {line_stripped}")
elif "failed" in ls.lower() or "aborted" in ls.lower():
break
elif "canceled" in line_stripped.lower() or "cancelled" in line_stripped.lower():
logger.info("Nav2 导航被取消")
elif "canceled" in ls.lower() or "cancelled" in ls.lower():
break
if process.poll() is not None:
# 进程结束但没读到 succeeded
break
# 确保进程结束
if process.poll() is None:
process.terminate()
try:
process.wait(timeout=3)
except subprocess.TimeoutExpired:
process.kill()
return succeeded
except Exception as e:
logger.error(f"Nav2 导航异常: {e}")
logger.error(f"Nav2 异常: {e}")
return False
def _nav2_go_to_point(self, x: float, y: float, yaw: float = 0.0,
timeout_sec: float = 120.0) -> bool:
"""
AGV 通过 Nav2 导航到目标点位
Args:
x, y: 目标世界坐标(米)
yaw: 目标朝向(弧度)
timeout_sec: 超时时间
Returns:
是否成功到达
"""
return self._nav2_send_goal(x, y, yaw, timeout_sec)
def _nav2_cancel(self):
"""取消当前 Nav2 导航"""
cancel_cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 action cancel /navigate_to_pose 2>/dev/null || ros2 action cancel /navigate_through_poses 2>/dev/null || true'"
cancel_cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 action cancel /navigate_to_pose 2>/dev/null || true'"
try:
subprocess.run(cancel_cmd, shell=True, timeout=3)
logger.info("Nav2 导航已取消")
except:
pass
def _run_ros2_cmd(self, cmd: str, timeout: float = 5.0) -> tuple:
"""执行 ros2 命令"""
full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'"
try:
result = subprocess.run(
full_cmd, shell=True, capture_output=True, text=True, timeout=timeout
full_cmd, shell=True,
capture_output=True, text=True, timeout=timeout
)
return result.returncode, result.stdout.strip(), result.stderr.strip()
except subprocess.TimeoutExpired:
return -1, "", "Timeout"
except Exception as e:
return -1, "", str(e)
# ========== 状态查询 ==========
def get_status(self) -> dict:
return {
"task_status": self.status.value,
"current_point": self.current_point_index,
"current_pose": self.current_pose_index,
"agv_connected": self.agv.is_connected(),
"arm_connected": self.arm_client is not None,
"camera_opened": self.qr_scanner._cap is not None and self.qr_scanner._cap.isOpened()
}
def pause(self):
self.status = TaskStatus.PAUSED
def resume(self):
self.status = TaskStatus.RUNNING
def stop(self):
if self.arm_client:
self.arm_client.task_stop()
self.agv.stop()
self.status = TaskStatus.PENDING