650 lines
22 KiB
Python
650 lines
22 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
任务执行器 V3 — M×N 网格蛇形路径拍摄
|
|
|
|
工作流:
|
|
1. 根据 grid 生成蛇形路径(奇数行左→右,偶数行右→左)
|
|
2. 逐台机器:
|
|
- 正面:导航 → 扫码(多姿态重试) → 查机型 → 按姿态拍照
|
|
- 背面:导航 → 按姿态拍照
|
|
3. 回到 (0,0)
|
|
"""
|
|
import os
|
|
import time
|
|
import json
|
|
import logging
|
|
import threading
|
|
import subprocess
|
|
import math
|
|
from typing import Optional, Dict, List
|
|
from enum import Enum
|
|
|
|
import requests
|
|
import cv2
|
|
import numpy as np
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
ROS2_SETUP_CMD = "source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash"
|
|
from config import ARM_CAMERA_CONFIG
|
|
ARM_CAMERA_SNAPSHOT = ARM_CAMERA_CONFIG["snapshot_url"]
|
|
PHOTOS_DIR = "/home/elephant/photos"
|
|
|
|
# 二维码扫描重试参数
|
|
QR_SCAN_TIMEOUT = 5 # 单次扫描超时
|
|
QR_POSE_WAIT = 1.5 # 调整姿态后等待时间
|
|
MANUAL_QR_TIMEOUT = 300 # 5分钟超时
|
|
|
|
|
|
class MissionStatus(str, Enum):
|
|
IDLE = "idle"
|
|
RUNNING = "running"
|
|
PAUSED = "paused"
|
|
COMPLETED = "completed"
|
|
WAITING_QR = "waiting_qr"
|
|
|
|
|
|
class MissionExecutorV3:
|
|
"""任务执行器 V3 — M×N 网格蛇形路径"""
|
|
|
|
_instance = None # 单例,供外部停止用
|
|
|
|
def __init__(self, config: dict):
|
|
self.config = config
|
|
self.status = MissionStatus.IDLE
|
|
MissionExecutorV3._instance = self
|
|
|
|
# 实时状态报告
|
|
self.report = {
|
|
"status": "idle",
|
|
"step": "",
|
|
"progress": 0,
|
|
"total": 0,
|
|
"log": [],
|
|
"error": None,
|
|
}
|
|
|
|
# 线程同步
|
|
self._stop = threading.Event()
|
|
self._pause = threading.Event()
|
|
self._pause.set() # 初始不暂停
|
|
self._qr_event = threading.Event()
|
|
self._qr_value: Optional[str] = None
|
|
|
|
# 设备
|
|
from .arm_client import ArmClient
|
|
self.arm_client = ArmClient(
|
|
config["arm"]["host"],
|
|
config["arm"]["port"]
|
|
)
|
|
|
|
from .agv_controller_ros2 import AGVController
|
|
self.agv = AGVController(
|
|
device=config.get("device", "/dev/agvpro_controller"),
|
|
baudrate=config.get("baudrate", 1000000)
|
|
)
|
|
|
|
# ==================== 连接 ====================
|
|
|
|
def connect_all(self) -> Dict[str, bool]:
|
|
results = {}
|
|
results["agv"] = self.agv.connect()
|
|
results["arm"] = self.arm_client.connect()
|
|
return results
|
|
|
|
def disconnect_all(self):
|
|
if self.arm_client:
|
|
self.arm_client.close()
|
|
self.agv.disconnect()
|
|
|
|
# ==================== 主任务流程 ====================
|
|
|
|
def execute_mission(
|
|
self,
|
|
mission_config: dict,
|
|
machines: list,
|
|
qr_configs: list,
|
|
models: list,
|
|
) -> dict:
|
|
"""
|
|
执行完整拍摄任务。
|
|
|
|
Args:
|
|
mission_config: {rows, cols, grid, positions}
|
|
machines: [{id, row, col, front: {coords}, back: {coords}}]
|
|
qr_configs: [{id, name, joint_angles: [a1..a6]}]
|
|
models: [{id, name, poses: [{name, arm_angles}], poses_back: [...]}]
|
|
|
|
Returns:
|
|
执行报告 dict
|
|
"""
|
|
self.status = MissionStatus.RUNNING
|
|
self.report = {"status": "running", "step": "初始化", "progress": 0, "total": 0, "log": [], "error": None}
|
|
self._stop.clear()
|
|
self._pause.set()
|
|
|
|
start_time = time.time()
|
|
|
|
try:
|
|
rows = int(mission_config.get("rows", 1))
|
|
cols = int(mission_config.get("cols", 1))
|
|
grid = mission_config.get("grid", [])
|
|
positions = mission_config.get("positions", [])
|
|
|
|
# 1. 生成蛇形路径
|
|
path = self._build_snake_path(rows, cols, grid)
|
|
if not path:
|
|
self._log("❌ 网格中没有机器,任务终止")
|
|
self.report["error"] = "No machines in grid"
|
|
return self._finish(0)
|
|
|
|
self.report["total"] = len(path)
|
|
self._log(f"📍 蛇形路径生成: {len(path)} 台机器")
|
|
|
|
# 初始化任务列表
|
|
self.report["tasks"] = [{
|
|
"row": r, "col": c,
|
|
"machine_id": f"m_{r}_{c}",
|
|
"label": f"{r+1}-{c+1}",
|
|
"status": "pending",
|
|
"step": "等待",
|
|
"qr_value": None,
|
|
"photos_front": 0,
|
|
"photos_back": 0,
|
|
} for (r, c) in path]
|
|
|
|
# 初始化任务列表
|
|
self.report["tasks"] = [{
|
|
"row": r, "col": c,
|
|
"machine_id": f"m_{r}_{c}",
|
|
"label": f"{r+1}-{c+1}",
|
|
"status": "pending",
|
|
"step": "等待",
|
|
"qr_value": None,
|
|
"photos_front": 0,
|
|
"photos_back": 0,
|
|
} for (r, c) in path]
|
|
|
|
# 2. 逐台执行
|
|
for idx, (r, c) in enumerate(path):
|
|
if self._stop.is_set():
|
|
self._log("⏹️ 任务已停止")
|
|
break
|
|
|
|
self._wait_pause()
|
|
|
|
# 更新任务状态 → 正面开始
|
|
task = self._get_task(r, c)
|
|
if task:
|
|
task["status"] = "active"
|
|
task["step"] = "正面扫码"
|
|
|
|
machine_id = f"m_{r}_{c}"
|
|
machine = next((m for m in machines if m.get("id") == machine_id), None)
|
|
if not machine:
|
|
self._log(f"⚠️ 机器 {r+1}-{c+1} 不存在,跳过")
|
|
continue
|
|
|
|
rl, cl = r + 1, c + 1 # 显示用的 1-based
|
|
|
|
# --- 正面 ---
|
|
self._log(f"📍 机器 {rl}-{cl} 进入正面点位")
|
|
self._step(f"机器 {rl}-{cl} 正面")
|
|
|
|
# 导航到正面点位
|
|
front_pt = self._find_point(positions, r, c, "front")
|
|
if front_pt and self._has_coords(front_pt):
|
|
if not self._navigate(front_pt, "正面"):
|
|
self._log(f"⚠️ 导航失败,尝试继续")
|
|
else:
|
|
self._log(f"⚠️ 无正面点位坐标")
|
|
|
|
# 扫描二维码
|
|
qr_value = self._scan_qr_with_poses(qr_configs)
|
|
if self._stop.is_set():
|
|
break
|
|
|
|
# 查机型 + 更新任务步骤
|
|
model_name = self._lookup_model(qr_value)
|
|
self._log(f" 🏷️ 机型: {model_name}")
|
|
if task and qr_value:
|
|
task["qr_value"] = qr_value
|
|
if task:
|
|
task["step"] = "正面拍照"
|
|
|
|
# 正面拍照
|
|
model = self._find_model(models, model_name)
|
|
if model:
|
|
self._shoot(model, "front", rl, cl, qr_value or "unknown")
|
|
else:
|
|
self._log(f" ⚠️ 未找到机型 {model_name}")
|
|
|
|
self._progress(idx, 1)
|
|
|
|
# --- 背面 ---
|
|
if task:
|
|
task["step"] = "背面拍照"
|
|
self._log(f"📍 机器 {rl}-{cl} 进入背面点位")
|
|
self._step(f"机器 {rl}-{cl} 背面")
|
|
|
|
# 导航到背面点位
|
|
back_pt = self._find_point(positions, r + 1, c, "back")
|
|
if back_pt and self._has_coords(back_pt):
|
|
if not self._navigate(back_pt, "背面"):
|
|
self._log(f"⚠️ 导航失败,尝试继续")
|
|
else:
|
|
self._log(f"⚠️ 无背面点位坐标")
|
|
|
|
# 背面拍照
|
|
if model:
|
|
self._shoot(model, "back", rl, cl, qr_value or "unknown")
|
|
|
|
# 标记任务完成
|
|
if task:
|
|
task["status"] = "completed"
|
|
task["step"] = "完成"
|
|
self._progress(idx, 2)
|
|
|
|
# 3. 回到出发点
|
|
if not self._stop.is_set():
|
|
self._step("返回出发点")
|
|
self._log("→ 返回 (0, 0)")
|
|
self._nav2_go_to_point(0, 0, 0, timeout_sec=60)
|
|
|
|
elapsed = time.time() - start_time
|
|
return self._finish(elapsed)
|
|
|
|
except Exception as e:
|
|
self._log(f"❌ 任务异常: {e}")
|
|
logger.exception("execute_mission 崩溃")
|
|
self.report["error"] = str(e)
|
|
self.status = MissionStatus.IDLE
|
|
self.report["status"] = "idle"
|
|
return self.report
|
|
|
|
def _finish(self, elapsed: float) -> dict:
|
|
if self._stop.is_set():
|
|
self._step("已停止")
|
|
else:
|
|
self._step("完成")
|
|
self._log(f"✅ 任务完成 ({elapsed:.0f}s)")
|
|
self.report["progress"] = 100
|
|
self.status = MissionStatus.IDLE
|
|
self.report["status"] = "idle"
|
|
return self.report
|
|
|
|
# ==================== 蛇形路径 ====================
|
|
|
|
def _build_snake_path(self, rows: int, cols: int, grid: list) -> list:
|
|
"""奇数行(0,2,4...)左→右,偶数行(1,3,5...)右→左"""
|
|
path = []
|
|
for r in range(rows):
|
|
if r % 2 == 0:
|
|
for c in range(cols):
|
|
if self._has_machine(grid, r, c):
|
|
path.append((r, c))
|
|
else:
|
|
for c in range(cols - 1, -1, -1):
|
|
if self._has_machine(grid, r, c):
|
|
path.append((r, c))
|
|
return path
|
|
|
|
@staticmethod
|
|
def _has_machine(grid: list, r: int, c: int) -> bool:
|
|
if not grid or r >= len(grid):
|
|
return False
|
|
row = grid[r]
|
|
if isinstance(row, list):
|
|
return c < len(row) and bool(row[c])
|
|
return False
|
|
|
|
# ==================== 点位查找 ====================
|
|
|
|
@staticmethod
|
|
def _find_point(positions: list, row: int, col: int, side: str) -> Optional[dict]:
|
|
for p in positions:
|
|
if p.get("row") == row and p.get("col") == col and p.get("side") == side:
|
|
return p
|
|
return None
|
|
|
|
@staticmethod
|
|
def _has_coords(point: dict) -> bool:
|
|
coords = point.get("coords", [])
|
|
return len(coords) >= 2 and (coords[0] != 0 or coords[1] != 0)
|
|
|
|
# ==================== 导航 ====================
|
|
|
|
def _navigate(self, point: dict, label: str) -> bool:
|
|
coords = point["coords"]
|
|
x, y = float(coords[0]), float(coords[1])
|
|
yaw = float(coords[2]) if len(coords) >= 3 else 0.0
|
|
self._log(f" 🧭 导航到{label}点位 ({x:.2f}, {y:.2f}, yaw={math.degrees(yaw):.0f}°)")
|
|
return self._nav2_go_to_point(x, y, yaw)
|
|
|
|
# ==================== 二维码扫描 ====================
|
|
|
|
def _scan_qr_with_poses(self, qr_configs: list) -> Optional[str]:
|
|
"""用二维码配置中的姿态依次尝试"""
|
|
if not qr_configs:
|
|
self._log(f" ⚠️ 无二维码配置")
|
|
return self._request_manual_qr()
|
|
|
|
self._log(f" 🔍 尝试 {len(qr_configs)} 个二维码姿态...")
|
|
for i, qc in enumerate(qr_configs):
|
|
if self._stop.is_set():
|
|
return None
|
|
self._wait_pause()
|
|
|
|
angles = qc.get("joint_angles", [])
|
|
if not angles or len(angles) < 6:
|
|
continue
|
|
|
|
name = qc.get("name", f"姿态{i+1}")
|
|
self._log(f" [{i+1}/{len(qr_configs)}] {name}")
|
|
|
|
# 调整机械臂
|
|
if self.arm_client:
|
|
self.arm_client.set_angles(angles, speed=500)
|
|
time.sleep(QR_POSE_WAIT)
|
|
|
|
# 扫码
|
|
qr = self._decode_qr_from_arm()
|
|
if qr:
|
|
self._log(f" ✅ 识别成功: {qr}")
|
|
return qr
|
|
|
|
time.sleep(0.3)
|
|
|
|
self._log(f" ⚠️ 全部 {len(qr_configs)} 个姿态均未识别到二维码")
|
|
return self._request_manual_qr()
|
|
|
|
def _decode_qr_from_arm(self) -> Optional[str]:
|
|
"""从机械臂摄像头取一帧,识别二维码"""
|
|
for attempt in range(3):
|
|
try:
|
|
resp = requests.get(ARM_CAMERA_SNAPSHOT, timeout=QR_SCAN_TIMEOUT)
|
|
if resp.status_code != 200 or not resp.content:
|
|
continue
|
|
|
|
arr = np.frombuffer(resp.content, dtype=np.uint8)
|
|
frame = cv2.imdecode(arr, cv2.IMREAD_COLOR)
|
|
if frame is None:
|
|
continue
|
|
|
|
detector = cv2.QRCodeDetector()
|
|
data, bbox, _ = detector.detectAndDecode(frame)
|
|
if data and len(data) > 0:
|
|
return data
|
|
except Exception:
|
|
pass
|
|
time.sleep(0.5)
|
|
return None
|
|
|
|
def _request_manual_qr(self) -> Optional[str]:
|
|
"""暂停任务,等待手动输入"""
|
|
self.status = MissionStatus.WAITING_QR
|
|
self.report["status"] = "waiting_qr"
|
|
self.report["step"] = "等待手动输入二维码"
|
|
self._log(" ⌨️ 弹窗等待手动输入二维码...")
|
|
|
|
self._qr_event.clear()
|
|
if self._qr_event.wait(timeout=MANUAL_QR_TIMEOUT):
|
|
self.status = MissionStatus.RUNNING
|
|
self.report["status"] = "running"
|
|
self._log(f" ✏️ 手动输入: {self._qr_value}")
|
|
return self._qr_value
|
|
else:
|
|
self.status = MissionStatus.RUNNING
|
|
self.report["status"] = "running"
|
|
self._log(f" ⚠️ 等待超时({MANUAL_QR_TIMEOUT}s),跳过")
|
|
return None
|
|
|
|
def set_manual_qr(self, value: str):
|
|
self._qr_value = value.strip()
|
|
self._qr_event.set()
|
|
|
|
# ==================== 机型查询 ====================
|
|
|
|
def _lookup_model(self, qr_value: Optional[str]) -> str:
|
|
"""TODO: 后续通过 HTTP 接口查询机型"""
|
|
return "机器1"
|
|
|
|
@staticmethod
|
|
def _find_model(models: list, name: str) -> Optional[dict]:
|
|
"""在机型列表中找到匹配的机型"""
|
|
for m in models:
|
|
if m.get("name") == name or m.get("id") == name:
|
|
return m
|
|
# fallback: 第一个机型
|
|
return models[0] if models else None
|
|
|
|
# ==================== 姿态拍照 ====================
|
|
|
|
def _shoot(self, model: dict, side: str, row: int, col: int, qr_value: str):
|
|
"""按机型配置的所有姿态依次拍照"""
|
|
# 更新任务照片计数
|
|
task = self._get_task(row - 1, col - 1)
|
|
side_label = "正面" if side == "front" else "背面"
|
|
poses = model.get("poses", []) if side == "front" else model.get("poses_back", [])
|
|
if not poses:
|
|
self._log(f" ⚠️ 机型无{side_label}姿态配置")
|
|
return
|
|
|
|
self._log(f" 📷 {side_label}拍照 ({len(poses)} 个姿态)")
|
|
for pi, pose in enumerate(poses):
|
|
if self._stop.is_set():
|
|
break
|
|
self._wait_pause()
|
|
|
|
angles = pose.get("arm_angles", [])
|
|
if not angles or len(angles) < 6:
|
|
self._log(f" 跳过 {pose.get('name', f'姿态{pi+1}')}: 无效角度")
|
|
continue
|
|
|
|
name = pose.get("name", f"{side_label}-{pi+1}")
|
|
self._log(f" 🎯 {name}")
|
|
|
|
# 调整机械臂
|
|
if self.arm_client:
|
|
self.arm_client.set_angles(angles, speed=500)
|
|
time.sleep(QR_POSE_WAIT)
|
|
|
|
# 拍照
|
|
path = self._capture_arm_photo(row, col, side, pi + 1, qr_value)
|
|
if path:
|
|
self._log(f" 💾 {os.path.basename(path)}")
|
|
|
|
def _capture_arm_photo(self, row: int, col: int, side: str,
|
|
pose_idx: int, qr_value: str) -> Optional[str]:
|
|
"""从机械臂摄像头拍照存本地"""
|
|
try:
|
|
resp = requests.get(ARM_CAMERA_SNAPSHOT, timeout=10)
|
|
if resp.status_code != 200 or not resp.content:
|
|
logger.error("arm snapshot 请求失败")
|
|
return None
|
|
|
|
os.makedirs(PHOTOS_DIR, exist_ok=True)
|
|
ts = time.strftime("%Y%m%d_%H%M%S")
|
|
fname = f"{ts}_r{row}c{col}_{side}_p{pose_idx}_{qr_value[:20]}.jpg"
|
|
fpath = os.path.join(PHOTOS_DIR, fname)
|
|
with open(fpath, "wb") as f:
|
|
f.write(resp.content)
|
|
return fpath
|
|
except Exception as e:
|
|
logger.error(f"拍照异常: {e}")
|
|
return None
|
|
|
|
# ==================== 控制 ====================
|
|
|
|
def _wait_pause(self):
|
|
"""等待暂停状态解除"""
|
|
self._pause.wait()
|
|
|
|
def pause(self):
|
|
self._pause.clear()
|
|
self.status = MissionStatus.PAUSED
|
|
self.report["status"] = "paused"
|
|
self.report["step"] = "已暂停"
|
|
self._log("⏸️ 任务已暂停")
|
|
|
|
def resume(self):
|
|
self._pause.set()
|
|
self.status = MissionStatus.RUNNING
|
|
self.report["status"] = "running"
|
|
self._log("▶️ 任务已恢复")
|
|
|
|
def stop(self):
|
|
self._stop.set()
|
|
self._pause.set() # 解除暂停
|
|
self._qr_event.set() # 解除 QR 等待
|
|
if self.arm_client:
|
|
try:
|
|
self.arm_client.task_stop()
|
|
except Exception:
|
|
pass
|
|
self.agv.stop()
|
|
self.status = MissionStatus.IDLE
|
|
self.report["status"] = "idle"
|
|
|
|
def get_status(self) -> dict:
|
|
return {
|
|
"status": self.report["status"],
|
|
"step": self.report["step"],
|
|
"progress": self.report["progress"],
|
|
"total": self.report["total"],
|
|
}
|
|
|
|
def get_logs(self) -> dict:
|
|
"""返回实时日志和完整状态"""
|
|
return self.report
|
|
|
|
# ==================== 状态报告 ====================
|
|
|
|
def _log(self, msg: str):
|
|
self.report["log"].append(msg)
|
|
# Keep last 500 entries
|
|
if len(self.report["log"]) > 500:
|
|
self.report["log"] = self.report["log"][-500:]
|
|
logger.info(msg)
|
|
|
|
def _step(self, text: str):
|
|
self.report["step"] = text
|
|
|
|
def _get_task(self, row: int, col: int) -> Optional[dict]:
|
|
"""获取指定行列的任务记录"""
|
|
for t in self.report.get("tasks", []):
|
|
if t["row"] == row and t["col"] == col:
|
|
return t
|
|
return None
|
|
|
|
def _progress(self, machine_idx: int, side_code: int):
|
|
"""side_code: 1=正面完成, 2=背面完成"""
|
|
if self.report["total"]:
|
|
self.report["progress"] = min(
|
|
int((machine_idx * 2 + side_code) / (self.report["total"] * 2) * 100),
|
|
99
|
|
)
|
|
|
|
# ==================== Nav2 导航 ====================
|
|
# (保留原实现)
|
|
|
|
def _nav2_check_available(self) -> bool:
|
|
try:
|
|
rc, out, err = self._run_ros2_cmd("ros2 action list")
|
|
if rc != 0:
|
|
return False
|
|
return "/navigate_to_pose" in out
|
|
except:
|
|
return False
|
|
|
|
def _nav2_go_to_point(self, x: float, y: float, yaw: float = 0.0,
|
|
timeout_sec: float = 120.0) -> bool:
|
|
if not self._nav2_check_available():
|
|
logger.error("Nav2 action server 不可用")
|
|
return False
|
|
|
|
qz = math.sin(yaw / 2.0)
|
|
qw = math.cos(yaw / 2.0)
|
|
|
|
pose_yaml = (
|
|
f"pose:\n"
|
|
f" header:\n"
|
|
f" stamp:\n"
|
|
f" sec: 0\n"
|
|
f" nanosec: 0\n"
|
|
f" frame_id: 'map'\n"
|
|
f" pose:\n"
|
|
f" position:\n"
|
|
f" x: {x}\n"
|
|
f" y: {y}\n"
|
|
f" z: 0.0\n"
|
|
f" orientation:\n"
|
|
f" x: 0.0\n"
|
|
f" y: 0.0\n"
|
|
f" z: {qz}\n"
|
|
f" w: {qw}"
|
|
)
|
|
|
|
cmd = (
|
|
f"ros2 action send_goal /navigate_to_pose "
|
|
f"nav2_msgs/action/NavigateToPose "
|
|
f"'{pose_yaml}' --feedback"
|
|
)
|
|
full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'"
|
|
|
|
try:
|
|
process = subprocess.Popen(
|
|
full_cmd, shell=True,
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
|
|
)
|
|
succeeded = False
|
|
elapsed = 0.0
|
|
while elapsed < timeout_sec:
|
|
import select
|
|
reads = [process.stdout]
|
|
ready, _, _ = select.select(reads, [], [], 1.0)
|
|
elapsed += 1.0
|
|
if process.stdout in ready:
|
|
line = process.stdout.readline()
|
|
if not line:
|
|
break
|
|
ls = line.strip()
|
|
if "succeeded" in ls.lower():
|
|
succeeded = True
|
|
break
|
|
elif "failed" in ls.lower() or "aborted" in ls.lower():
|
|
break
|
|
elif "canceled" in ls.lower() or "cancelled" in ls.lower():
|
|
break
|
|
if process.poll() is not None:
|
|
break
|
|
if process.poll() is None:
|
|
process.terminate()
|
|
try:
|
|
process.wait(timeout=3)
|
|
except subprocess.TimeoutExpired:
|
|
process.kill()
|
|
return succeeded
|
|
except Exception as e:
|
|
logger.error(f"Nav2 异常: {e}")
|
|
return False
|
|
|
|
def _nav2_cancel(self):
|
|
cancel_cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 action cancel /navigate_to_pose 2>/dev/null || true'"
|
|
try:
|
|
subprocess.run(cancel_cmd, shell=True, timeout=3)
|
|
except:
|
|
pass
|
|
|
|
def _run_ros2_cmd(self, cmd: str, timeout: float = 5.0) -> tuple:
|
|
full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'"
|
|
try:
|
|
result = subprocess.run(
|
|
full_cmd, shell=True,
|
|
capture_output=True, text=True, timeout=timeout
|
|
)
|
|
return result.returncode, result.stdout.strip(), result.stderr.strip()
|
|
except subprocess.TimeoutExpired:
|
|
return -1, "", "Timeout"
|
|
except Exception as e:
|
|
return -1, "", str(e) |