""" 机械臂通信客户端 - 通过 TCP 连接机械臂端 TCP 服务器 服务器再转发给 RoboFlow (630 Socket API) """ import socket import time import logging from typing import List, Optional, Tuple logger = logging.getLogger(__name__) class ArmClient: """TCP 客户端,连接机械臂端的 arm_server""" def __init__(self, host: str, port: int, timeout: float = 10): self.host = host self.port = port self.timeout = timeout self._sock: Optional[socket.socket] = None def connect(self) -> bool: """建立 TCP 连接""" try: self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.settimeout(self.timeout) self._sock.connect((self.host, self.port)) logger.info(f"已连接到机械臂 {self.host}:{self.port}") return True except Exception as e: logger.error(f"连接机械臂失败: {e}") return False def send_command(self, cmd: str) -> Tuple[bool, str]: """发送命令并接收响应""" if not self._sock: return False, "未连接" try: # 发送命令(自动加换行) self._sock.sendall((cmd + "\n").encode("utf-8")) # 接收响应 resp = self._sock.recv(1024).decode("utf-8").strip() return True, resp except socket.timeout: return False, "命令超时" except Exception as e: return False, str(e) def close(self): if self._sock: self._sock.close() self._sock = None def reconnect(self) -> bool: self.close() time.sleep(1) return self.connect() # ========== 封装机械臂命令 ========== def get_angles(self) -> Tuple[bool, List[float]]: """获取所有关节角度""" ok, resp = self.send_command("get_angles()") if ok and resp.startswith("get_angles:["): try: # get_angles:[0.174, 0.520, ...] → list nums = resp.split("[")[1].split("]")[0] angles = [float(x) for x in nums.split(",")] return True, angles except: return False, [] return False, [] def set_angles(self, angles: List[float], speed: int = 500) -> bool: """设置所有关节角度""" if len(angles) != 6: return False cmd = f"set_angles({angles[0]:.2f},{angles[1]:.2f},{angles[2]:.2f},{angles[3]:.2f},{angles[4]:.2f},{angles[5]:.2f},{speed})" ok, resp = self.send_command(cmd) return ok and "ok" in resp def set_angle(self, joint: str, angle: float, speed: int = 500) -> bool: """设置单个关节角度""" cmd = f"set_angle({joint},{angle:.2f},{speed})" ok, resp = self.send_command(cmd) return ok and "ok" in resp def jog_angle(self, joint: str, direction: int, speed: int = 500) -> bool: """连续调节关节角度(direction: -1负方向/0停止/1正方向)""" cmd = f"jog_angle({joint},{direction},{speed})" ok, resp = self.send_command(cmd) return ok def get_coords(self) -> Tuple[bool, List[float]]: """获取当前坐标和姿态 [x, y, z, rx, ry, rz]""" ok, resp = self.send_command("get_coords()") if ok and "get_coords:" in resp: try: nums = resp.split("[")[1].split("]")[0] coords = [float(x) for x in nums.split(",")] return True, coords except: return False, [] return False, [] def set_coords(self, coords: List[float], speed: int = 500) -> bool: """设置坐标和姿态""" if len(coords) != 6: return False cmd = f"set_coords({coords[0]:.2f},{coords[1]:.2f},{coords[2]:.2f},{coords[3]:.2f},{coords[4]:.2f},{coords[5]:.2f},{speed})" ok, resp = self.send_command(cmd) return ok and "ok" in resp def jog_coord(self, axis: str, direction: int, speed: int = 500) -> bool: """连续调节坐标轴""" cmd = f"jog_coord({axis},{direction},{speed})" ok, resp = self.send_command(cmd) return ok def power_on(self) -> bool: ok, _ = self.send_command("power_on()") return ok def state_on(self) -> bool: ok, _ = self.send_command("state_on()") return ok def state_off(self) -> bool: ok, _ = self.send_command("state_off()") return ok def state_check(self) -> bool: """检查机械臂状态是否正常""" ok, resp = self.send_command("state_check()") return ok and resp == "state_check:1" def check_running(self) -> bool: """检查机械臂是否在运行""" ok, resp = self.send_command("check_running()") return ok and resp == "check_running:1" def wait_done(self, timeout: float = 30) -> bool: """等待上一条命令执行完成""" start = time.time() while time.time() - start < timeout: ok, resp = self.send_command("check_running()") if ok and resp == "check_running:0": return True time.sleep(0.5) return False def task_stop(self) -> bool: ok, _ = self.send_command("task_stop()") return ok def __enter__(self): self.connect() return self def __exit__(self, *args): self.close()