This commit is contained in:
ywb
2026-06-13 14:07:19 +08:00
parent 48121b2a05
commit cbc88def27
14 changed files with 626 additions and 80 deletions
+23 -14
View File
@@ -61,14 +61,18 @@ class ArmClient:
def get_angles(self) -> Tuple[bool, List[float]]:
"""获取所有关节角度"""
ok, resp = self.send_command("get_angles()")
if ok and resp.startswith("get_angles:["):
if ok:
try:
# get_angles:[0.174, 0.520, ...] → list
nums = resp.split("[")[1].split("]")[0]
angles = [float(x) for x in nums.split(",")]
return True, angles
# 兼容 "get_angles:[-260.2,...]" 和 "[-260.2,...]" 两种格式
text = resp.split(":", 1)[-1] if ":" in resp else resp
text = text.strip()
if text.startswith("[") and text.endswith("]"):
nums = text[1:-1].split(",")
angles = [float(x) for x in nums]
if len(angles) == 6:
return True, angles
except:
return False, []
pass
return False, []
def set_angles(self, angles: List[float], speed: int = 500) -> bool:
@@ -94,13 +98,17 @@ class ArmClient:
def get_coords(self) -> Tuple[bool, List[float]]:
"""获取当前坐标和姿态 [x, y, z, rx, ry, rz]"""
ok, resp = self.send_command("get_coords()")
if ok and "get_coords:" in resp:
if ok:
try:
nums = resp.split("[")[1].split("]")[0]
coords = [float(x) for x in nums.split(",")]
return True, coords
text = resp.split(":", 1)[-1] if ":" in resp else resp
text = text.strip()
if text.startswith("[") and text.endswith("]"):
nums = text[1:-1].split(",")
coords = [float(x) for x in nums]
if len(coords) == 6:
return True, coords
except:
return False, []
pass
return False, []
def set_coords(self, coords: List[float], speed: int = 500) -> bool:
@@ -132,19 +140,20 @@ class ArmClient:
def state_check(self) -> bool:
"""检查机械臂状态是否正常"""
ok, resp = self.send_command("state_check()")
return ok and resp == "state_check:1"
# 兼容 "state_check:1" 和 "1" 两种格式
return ok and resp.strip().lstrip("state_check:") == "1"
def check_running(self) -> bool:
"""检查机械臂是否在运行"""
ok, resp = self.send_command("check_running()")
return ok and resp == "check_running:1"
return ok and resp.strip().lstrip("check_running:") == "1"
def wait_done(self, timeout: float = 30) -> bool:
"""等待上一条命令执行完成"""
start = time.time()
while time.time() - start < timeout:
ok, resp = self.send_command("check_running()")
if ok and resp == "check_running:0":
if ok and resp.strip().lstrip("check_running:") == "0":
return True
time.sleep(0.5)
return False
+2 -2
View File
@@ -7,8 +7,8 @@ import os
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# ========== 网络配置(集中管理所有 IP 地址 — 修改此处即可全局生效)==========
AGV_HOST = "192.168.60.177"
ARM_HOST = "192.168.60.88"
AGV_HOST = "192.168.60.80"
ARM_HOST = "192.168.60.120"
# ========== AGV 参数 ==========
AGV_CONFIG = {
+62 -12
View File
@@ -29,19 +29,22 @@ class QRScanner:
def open(self) -> bool:
"""打开摄像头"""
try:
# 强制 V4L2 后端,获取标准彩色格式(与 test/server.py 一致)
# 强制 V4L2 后端
self._cap = cv2.VideoCapture(self.device_index, cv2.CAP_V4L2)
if self._cap.isOpened():
logger.info(f"摄像头 {self.device_index} 已打开 (V4L2)")
return True
else:
# fallback: 不指定后端
if not self._cap.isOpened():
self._cap = cv2.VideoCapture(self.device_index)
if self._cap.isOpened():
logger.info(f"摄像头 {self.device_index} 已打开 (默认后端)")
return True
if not self._cap.isOpened():
logger.error(f"无法打开摄像头 {self.device_index}")
return False
# 确保 OpenCV 做 BGR 转换(部分 V4L2 后端默认不做 YUYV→BGR 转换)
self._cap.set(cv2.CAP_PROP_CONVERT_RGB, 1)
# 设置分辨率(使用默认分辨率,不强制 MJPG)
w = int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
logger.info(f"摄像头 {self.device_index} 已打开,分辨率 {w}x{h}")
return True
except Exception as e:
logger.error(f"摄像头打开失败: {e}")
return False
@@ -51,14 +54,61 @@ class QRScanner:
self._cap.release()
self._cap = None
def _fix_frame(self, frame: np.ndarray) -> Optional[np.ndarray]:
"""修复绿屏/格式错误帧,返回修复后的 BGR 帧或 None"""
if frame is None:
return None
h, w = frame.shape[:2]
if h < 10 or w < 10:
return None
ndim = len(frame.shape)
# 情况 1: 2 通道原始 YUYV → 手动转换 BGR
if ndim == 2:
frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_YUYV)
logger.debug("YUYV 2ch → BGR 转换")
return frame
# 情况 2: 3 通道但实际帧数据显示为 YUYV(绿屏特征:G 通道全满,B/R 近空)
if ndim == 3:
g_mean = frame[:, :, 1].mean()
if g_mean > 220 and frame[:, :, 0].mean() < 30 and frame[:, :, 2].mean() < 30:
# 典型的"Lime"绿屏 — 当做 YUYV 原始数据解码
logger.debug(f"检测到绿屏 (G={g_mean:.0f}, B={frame[:,:,0].mean():.0f}, R={frame[:,:,2].mean():.0f}),尝试修复")
try:
# 把内存当做 YUYV 数据重新解析
raw_bytes = frame.tobytes()
# 3ch w*h 的数据量 = w*h*3 字节
# YUYV 每像素 2 字节,所以一幅 YUYV 图像的总字节 = w*h*2
# 我们只需要取前 w*h*2 字节作为 YUYV 数据
yuyv_len = w * h * 2
if len(raw_bytes) >= yuyv_len:
yuyv_img = np.frombuffer(raw_bytes[:yuyv_len], dtype=np.uint8).reshape(h, w * 2, 1)
frame = cv2.cvtColor(yuyv_img, cv2.COLOR_YUV2BGR_YUYV)
logger.debug("绿屏修复完成")
return frame
except Exception as e:
logger.warning(f"绿屏修复失败: {e}")
return None
# 情况 3: 全黑帧
if frame.mean() < 5:
logger.warning("全黑帧,丢弃")
return None
# 正常 BGR 帧
return frame
def read_frame(self) -> Optional[np.ndarray]:
"""读取一帧"""
if not self._cap or not self._cap.isOpened():
return None
ret, frame = self._cap.read()
if not ret:
if not ret or frame is None:
return None
return frame
return self._fix_frame(frame)
def detect_qr(self, frame: np.ndarray) -> Optional[str]:
"""从图像帧中检测二维码"""
@@ -96,4 +146,4 @@ class QRScanner:
return self
def __exit__(self, *args):
self.close()
self.close()