Files
smart-inspection/agv_app/utils/arm_client.py
T
2026-06-13 14:07:19 +08:00

170 lines
5.9 KiB
Python

"""
机械臂通信客户端 - 通过 TCP 连接机械臂端 TCP 服务器
服务器再转发给 RoboFlow (630 Socket API)
"""
import socket
import time
import logging
from typing import List, Optional, Tuple
logger = logging.getLogger(__name__)
class ArmClient:
"""TCP 客户端,连接机械臂端的 arm_server"""
def __init__(self, host: str, port: int, timeout: float = 10):
self.host = host
self.port = port
self.timeout = timeout
self._sock: Optional[socket.socket] = None
def connect(self) -> bool:
"""建立 TCP 连接"""
try:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.settimeout(self.timeout)
self._sock.connect((self.host, self.port))
logger.info(f"已连接到机械臂 {self.host}:{self.port}")
return True
except Exception as e:
logger.error(f"连接机械臂失败: {e}")
return False
def send_command(self, cmd: str) -> Tuple[bool, str]:
"""发送命令并接收响应"""
if not self._sock:
return False, "未连接"
try:
# 发送命令(自动加换行)
self._sock.sendall((cmd + "\n").encode("utf-8"))
# 接收响应
resp = self._sock.recv(1024).decode("utf-8").strip()
return True, resp
except socket.timeout:
return False, "命令超时"
except Exception as e:
return False, str(e)
def close(self):
if self._sock:
self._sock.close()
self._sock = None
def reconnect(self) -> bool:
self.close()
time.sleep(1)
return self.connect()
# ========== 封装机械臂命令 ==========
def get_angles(self) -> Tuple[bool, List[float]]:
"""获取所有关节角度"""
ok, resp = self.send_command("get_angles()")
if ok:
try:
# 兼容 "get_angles:[-260.2,...]" 和 "[-260.2,...]" 两种格式
text = resp.split(":", 1)[-1] if ":" in resp else resp
text = text.strip()
if text.startswith("[") and text.endswith("]"):
nums = text[1:-1].split(",")
angles = [float(x) for x in nums]
if len(angles) == 6:
return True, angles
except:
pass
return False, []
def set_angles(self, angles: List[float], speed: int = 500) -> bool:
"""设置所有关节角度"""
if len(angles) != 6:
return False
cmd = f"set_angles({angles[0]:.2f},{angles[1]:.2f},{angles[2]:.2f},{angles[3]:.2f},{angles[4]:.2f},{angles[5]:.2f},{speed})"
ok, resp = self.send_command(cmd)
return ok and "ok" in resp
def set_angle(self, joint: str, angle: float, speed: int = 500) -> bool:
"""设置单个关节角度"""
cmd = f"set_angle({joint},{angle:.2f},{speed})"
ok, resp = self.send_command(cmd)
return ok and "ok" in resp
def jog_angle(self, joint: str, direction: int, speed: int = 500) -> bool:
"""连续调节关节角度(direction: -1负方向/0停止/1正方向)"""
cmd = f"jog_angle({joint},{direction},{speed})"
ok, resp = self.send_command(cmd)
return ok
def get_coords(self) -> Tuple[bool, List[float]]:
"""获取当前坐标和姿态 [x, y, z, rx, ry, rz]"""
ok, resp = self.send_command("get_coords()")
if ok:
try:
text = resp.split(":", 1)[-1] if ":" in resp else resp
text = text.strip()
if text.startswith("[") and text.endswith("]"):
nums = text[1:-1].split(",")
coords = [float(x) for x in nums]
if len(coords) == 6:
return True, coords
except:
pass
return False, []
def set_coords(self, coords: List[float], speed: int = 500) -> bool:
"""设置坐标和姿态"""
if len(coords) != 6:
return False
cmd = f"set_coords({coords[0]:.2f},{coords[1]:.2f},{coords[2]:.2f},{coords[3]:.2f},{coords[4]:.2f},{coords[5]:.2f},{speed})"
ok, resp = self.send_command(cmd)
return ok and "ok" in resp
def jog_coord(self, axis: str, direction: int, speed: int = 500) -> bool:
"""连续调节坐标轴"""
cmd = f"jog_coord({axis},{direction},{speed})"
ok, resp = self.send_command(cmd)
return ok
def power_on(self) -> bool:
ok, _ = self.send_command("power_on()")
return ok
def state_on(self) -> bool:
ok, _ = self.send_command("state_on()")
return ok
def state_off(self) -> bool:
ok, _ = self.send_command("state_off()")
return ok
def state_check(self) -> bool:
"""检查机械臂状态是否正常"""
ok, resp = self.send_command("state_check()")
# 兼容 "state_check:1" 和 "1" 两种格式
return ok and resp.strip().lstrip("state_check:") == "1"
def check_running(self) -> bool:
"""检查机械臂是否在运行"""
ok, resp = self.send_command("check_running()")
return ok and resp.strip().lstrip("check_running:") == "1"
def wait_done(self, timeout: float = 30) -> bool:
"""等待上一条命令执行完成"""
start = time.time()
while time.time() - start < timeout:
ok, resp = self.send_command("check_running()")
if ok and resp.strip().lstrip("check_running:") == "0":
return True
time.sleep(0.5)
return False
def task_stop(self) -> bool:
ok, _ = self.send_command("task_stop()")
return ok
def __enter__(self):
self.connect()
return self
def __exit__(self, *args):
self.close()