Files
smart-inspection/arm_server/arm_server.py
T
2026-05-21 19:44:25 +08:00

308 lines
9.9 KiB
Python

"""
机械臂服务端 - 机械臂端主程序
运行在 10.247.46.165 上,端口 5002 (TCP) + 5003 (视频流)
通过 TCP Socket 接收 AGV 发来的指令,转发给 RoboFlow (630 Socket API)
同时通过 ffmpeg 提供 HTTP 视频流
"""
import socket
import threading
import time
import logging
import os
import sys
import subprocess
from flask import Flask, Response, jsonify
from werkzeug.serving import make_server
# 添加当前目录到路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler(os.path.expanduser("~/work/arm_server/server.log"))
]
)
logger = logging.getLogger("arm_server")
# ========== Flask HTTP 服务器 - 视频流 (ffmpeg) ==========
arm_video_app = Flask(__name__)
ARM_CAMERA_INDEX = 0 # 机械臂端摄像头设备号
_ffmpeg_proc = None
def _ensure_ffmpeg():
"""确保 ffmpeg 进程在运行,自动重启崩溃的进程"""
global _ffmpeg_proc
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
logger.info(f"启动 ffmpeg 视频流 (Video{ARM_CAMERA_INDEX})")
_ffmpeg_proc = subprocess.Popen(
[
"ffmpeg",
"-f", "v4l2",
"-input_format", "mjpeg",
"-re",
"-i", f"/dev/video{ARM_CAMERA_INDEX}",
"-vf", "rotate=PI",
"-q:v", "8",
"-f", "mjpeg",
"-"
],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
)
time.sleep(2.0) # 等待 ffmpeg 初始化完成(设备冷启动可能需要更久)
@arm_video_app.route("/api/camera/preview")
def arm_camera_preview():
"""机械臂摄像头 MJPEG 流 (ffmpeg)"""
_ensure_ffmpeg()
def generate():
global _ffmpeg_proc
try:
while True:
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
_ensure_ffmpeg()
jpeg = _ffmpeg_proc.stdout.read(65536)
if not jpeg:
# ffmpeg 无数据输出,重启它
logger.warning("ffmpeg stdout 空,重启")
_ffmpeg_proc.terminate()
_ffmpeg_proc = None
_ensure_ffmpeg()
continue
yield (b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" + jpeg + b"\r\n")
except Exception as e:
logger.error(f"视频流异常: {e}")
finally:
logger.info("视频流连接关闭")
return Response(generate(), mimetype="multipart/x-mixed-replace; boundary=frame")
@arm_video_app.route("/api/camera/status")
def arm_camera_status():
"""摄像头状态"""
global _ffmpeg_proc
running = _ffmpeg_proc is not None and _ffmpeg_proc.poll() is None
return jsonify({"opened": running})
@arm_video_app.route("/api/camera/restart", methods=["POST"])
def arm_camera_restart():
"""重启视频流"""
global _ffmpeg_proc
if _ffmpeg_proc:
_ffmpeg_proc.terminate()
_ffmpeg_proc = None
_ensure_ffmpeg()
return jsonify({"ok": True})
@arm_video_app.route("/api/camera/snapshot")
def arm_camera_snapshot():
"""机械臂摄像头单帧 JPEG — pkill -9 强杀旧 ffmpeg,再临时抓一帧"""
import subprocess
global _ffmpeg_proc
# 用 pkill -9 强杀所有 ffmpeg 进程,释放 /dev/video0
subprocess.run(["pkill", "-9", "-f", "ffmpeg"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=3)
time.sleep(0.3)
_ffmpeg_proc = None
proc = subprocess.run(
[
"ffmpeg",
"-f", "v4l2",
"-input_format", "mjpeg",
"-i", f"/dev/video{ARM_CAMERA_INDEX}",
"-vf", "rotate=PI",
"-vframes", "1",
"-q:v", "8",
"-f", "mjpeg",
"pipe:1"
],
stdout=subprocess.PIPE,
timeout=5,
stderr=subprocess.DEVNULL
)
if proc.returncode == 0 and proc.stdout:
r = Response(proc.stdout, mimetype="image/jpeg")
r.headers["Cache-Control"] = "no-cache, no-store, must-revalidate, max-age=0"
r.headers["Pragma"] = "no-cache"
r.headers["Expires"] = "0"
return r
logger.warning(f"ffmpeg snapshot failed: rc={proc.returncode}")
return "", 500
# ========== RoboFlow 630 Socket API 客户端 ==========
class RoboFlowClient:
"""通过 Socket 连接 RoboFlow 630 机械臂控制盒"""
def __init__(self, host: str = "127.0.0.1", port: int = 5001, timeout: float = 10):
self.host = host
self.port = port
self.timeout = timeout
self._sock: socket.socket = None
def connect(self) -> bool:
try:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.settimeout(self.timeout)
self._sock.connect((self.host, self.port))
logger.info(f"已连接到 RoboFlow {self.host}:{self.port}")
return True
except Exception as e:
logger.error(f"连接 RoboFlow 失败: {e}")
return False
def send_recv(self, cmd: str) -> str:
"""发送命令并等待响应"""
if not self._sock:
raise ConnectionError("未连接到 RoboFlow")
try:
self._sock.sendall((cmd + "\n").encode("utf-8"))
resp = self._sock.recv(4096).decode("utf-8").strip()
return resp
except socket.timeout:
return "ERROR: timeout"
except Exception as e:
return f"ERROR: {e}"
def close(self):
if self._sock:
self._sock.close()
self._sock = None
def __enter__(self):
self.connect()
return self
def __exit__(self, *args):
self.close()
# ========== TCP 服务器 - 接收 AGV 指令 ==========
class AGVCommandServer:
"""TCP 服务器,接收 AGV 发来的指令"""
def __init__(self, host: str = "0.0.0.0", port: int = 5002):
self.host = host
self.port = port
self._sock: socket.socket = None
self._running = False
self.roboflow: RoboFlowClient = None
self._connect_roboflow()
def _connect_roboflow(self):
self.roboflow = RoboFlowClient()
if self.roboflow.connect():
logger.info("RoboFlow 连接成功")
else:
logger.warning("RoboFlow 连接失败,服务将以 limited 模式运行")
def start(self):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sock.bind((self.host, self.port))
self._sock.listen(5)
self._running = True
logger.info(f"=" * 50)
logger.info(f"机械臂服务端已启动,监听 {self.host}:{self.port}")
logger.info(f"等待 AGV 连接...")
logger.info(f"=" * 50)
while self._running:
try:
self._sock.settimeout(1.0)
try:
client_sock, addr = self._sock.accept()
logger.info(f"AGV 已连接: {addr}")
threading.Thread(target=self._handle_client, args=(client_sock,), daemon=True).start()
except socket.timeout:
continue
except Exception as e:
if self._running:
logger.error(f"服务器异常: {e}")
break
def _handle_client(self, client_sock: socket.socket):
try:
client_sock.settimeout(30)
buffer = ""
while self._running:
try:
data = client_sock.recv(4096)
if not data:
break
buffer += data.decode("utf-8")
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.strip()
if not line:
continue
response = self._execute_command(line)
client_sock.sendall((response + "\n").encode("utf-8"))
logger.info(f"CMD: {line}{response}")
except socket.timeout:
continue
except Exception as e:
logger.error(f"客户端处理异常: {e}")
finally:
client_sock.close()
logger.info("AGV 客户端已断开")
def _execute_command(self, cmd: str) -> str:
if not self.roboflow or not self.roboflow._sock:
return f"ERROR: RoboFlow not connected"
try:
return self.roboflow.send_recv(cmd)
except Exception as e:
return f"ERROR: {e}"
def stop(self):
self._running = False
if self._sock:
try:
self._sock.close()
except:
pass
if self.roboflow:
self.roboflow.close()
logger.info("机械臂服务端已停止")
# ========== 入口 ==========
def main():
import signal
server = AGVCommandServer(port=5002)
# 启动 Flask 视频流服务(端口 5003)
arm_server_http = make_server("0.0.0.0", 5003, arm_video_app, threaded=True)
http_thread = threading.Thread(target=arm_server_http.serve_forever, daemon=True)
http_thread.start()
logger.info("机械臂视频流服务已启动: http://0.0.0.0:5003")
def signal_handler(sig, frame):
logger.info("收到停止信号...")
global _ffmpeg_proc
if _ffmpeg_proc:
_ffmpeg_proc.terminate()
server.stop()
arm_server_http.shutdown()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
server.start()
if __name__ == "__main__":
main()