Files
2026-06-19 18:10:43 +08:00

407 lines
14 KiB
Python

"""
机械臂服务端 - 机械臂端主程序
运行在 10.247.46.165 上,端口 5002 (TCP) + 5003 (视频流)
通过 TCP Socket 接收 AGV 发来的指令,转发给 RoboFlow (ElephantRobot)
同时通过 ffmpeg 提供 HTTP 视频流
"""
import socket
import threading
import time
import logging
import os
import sys
import subprocess
import io
from PIL import Image
from flask import Flask, Response, jsonify
from werkzeug.serving import make_server
# Put this script's directory first on sys.path so sibling modules can be
# imported regardless of the current working directory.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# Logging configuration: every record goes to the console and to a log file.
_LOG_PATH = os.path.expanduser("~/work/arm_server/server.log")
# logging.FileHandler opens the file immediately and raises if the parent
# directory does not exist, which would abort the server at import time.
os.makedirs(os.path.dirname(_LOG_PATH), exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(),
        # Log messages in this file contain non-ASCII text, so the file
        # encoding is pinned instead of depending on the locale default.
        logging.FileHandler(_LOG_PATH, encoding="utf-8"),
    ],
)
logger = logging.getLogger("arm_server")
# ========== Flask HTTP server - video stream (ffmpeg) ==========
arm_video_app = Flask(__name__)
ARM_CAMERA_INDEX = 0  # device number of the arm-side camera, used as /dev/video<N>
_ffmpeg_proc = None  # Popen handle of the ffmpeg capture process, or None when stopped
_ffmpeg_thread = None  # thread object running _frame_reader
_ffmpeg_lock = threading.Lock()  # held by _ensure_ffmpeg while it (re)starts ffmpeg
_frame_cond = threading.Condition()  # guards _latest_frame/_latest_frame_ts and signals new frames
_latest_frame = None  # most recent validated JPEG frame (bytes), or None if there is none
_latest_frame_ts = 0.0  # time.time() value stored together with _latest_frame; 0.0 = no frame
_stop_ffmpeg_reader = threading.Event()  # set to ask the frame-reader loop to exit
_invalid_count = 0  # number of consecutive invalid frames
_MAX_INVALID = 30  # this many consecutive invalid frames triggers an ffmpeg restart
_MAX_BUF_SIZE = 2 * 1024 * 1024  # upper bound (2 MiB) on the frame-parsing buffer
def _validate_jpeg(data):
    """Report whether *data* passes Pillow's image verification.

    Any exception raised while opening or verifying the data is treated as
    "invalid"; the function never raises.

    Returns:
        True if ``Image.open(...).verify()`` succeeds, otherwise False.
    """
    is_valid = False
    try:
        image = Image.open(io.BytesIO(data))
        image.verify()
        is_valid = True
    except Exception:
        # Deliberately broad: corrupt data can fail in many different ways.
        is_valid = False
    return is_valid
def _stop_ffmpeg():
    """Stop the ffmpeg capture process and ask the frame reader to exit.

    Sets the reader stop event, terminates the process if it is still
    running (escalating to kill after 2 seconds) and clears the global
    handle so the next ``_ensure_ffmpeg`` call starts a fresh process.
    """
    global _ffmpeg_proc
    _stop_ffmpeg_reader.set()

    # This function is called from several threads without a lock. Work on a
    # local snapshot so a concurrent caller resetting the global to None
    # cannot turn the calls below into attribute access on None.
    proc = _ffmpeg_proc
    if proc is not None and proc.poll() is None:
        proc.terminate()
        try:
            proc.wait(timeout=2)
        except subprocess.TimeoutExpired:
            proc.kill()
            # Reap the killed process so it does not linger as a zombie, but
            # keep the wait bounded in case the process cannot exit yet.
            try:
                proc.wait(timeout=2)
            except subprocess.TimeoutExpired:
                logger.warning("ffmpeg did not exit within 2s after kill()")

    # Only clear the handle we actually stopped; a newer process installed
    # in the meantime must stay referenced.
    if _ffmpeg_proc is proc:
        _ffmpeg_proc = None
def _frame_reader():
    """Parse JPEG frames from ffmpeg's MJPEG stdout and cache the newest valid one.

    Intended to run in its own thread. Frames are located by scanning for
    the JPEG start (FF D8) and end (FF D9) markers, checked with
    ``_validate_jpeg`` and, when valid, published through ``_frame_cond``.

    When the camera drops off USB and reconnects, ffmpeg can emit garbage
    that looks like JPEG frames (corrupted picture). After ``_MAX_INVALID``
    consecutive invalid frames, or when the parse buffer exceeds
    ``_MAX_BUF_SIZE``, ffmpeg is stopped so that the next ``_ensure_ffmpeg``
    call recreates it.

    The reader is bound to the process that is current when it starts and
    exits as soon as that process is replaced or cleared. Previously the
    global handle was re-read on every iteration, so a reader that missed
    the stop event could keep running and read the new process's pipe in
    parallel with the new reader, splitting the byte stream between them.
    """
    global _latest_frame, _latest_frame_ts, _invalid_count

    proc = _ffmpeg_proc
    if proc is None or proc.stdout is None:
        return

    buf = b""
    while not _stop_ffmpeg_reader.is_set() and _ffmpeg_proc is proc:
        chunk = proc.stdout.read(8192)
        if not chunk:
            if proc.poll() is not None:
                break  # process exited and its pipe is drained
            time.sleep(0.02)
            continue
        buf += chunk

        # Keep garbage data from growing the buffer without bound.
        if len(buf) > _MAX_BUF_SIZE:
            logger.warning(f"frame buffer 超过 {_MAX_BUF_SIZE} 字节,丢弃并重启 ffmpeg")
            _stop_ffmpeg()
            break

        # Extract every complete frame currently held in the buffer.
        while True:
            start = buf.find(b"\xff\xd8")
            if start < 0:
                # No start marker: keep only the tail, which may hold the
                # first byte of a marker split across two reads.
                buf = buf[-2:]
                break
            end = buf.find(b"\xff\xd9", start + 2)
            if end < 0:
                # Incomplete frame: drop leading junk and wait for more data.
                buf = buf[start:]
                break
            frame = buf[start:end + 2]
            buf = buf[end + 2:]

            # JPEG validation: frame data is corrupted when the camera drops.
            if _validate_jpeg(frame):
                with _frame_cond:
                    _latest_frame = frame
                    _latest_frame_ts = time.time()
                    _frame_cond.notify_all()
                _invalid_count = 0
            else:
                _invalid_count += 1
                if _invalid_count >= _MAX_INVALID:
                    logger.error(f"连续 {_MAX_INVALID} 帧无效,摄像头可能掉线,重启 ffmpeg")
                    _stop_ffmpeg()
                    _invalid_count = 0
                    # Leave the parse loop; the stop event set by
                    # _stop_ffmpeg then ends the outer loop as well.
                    break
def _ensure_ffmpeg():
    """Make sure an ffmpeg capture process is running, restarting it if needed.

    Returns immediately when the current process is alive. Otherwise, under
    ``_ffmpeg_lock``, retires the previous reader thread, resets the
    invalid-frame counter, starts a new ffmpeg process that writes MJPEG to
    its stdout, and starts a new ``_frame_reader`` thread for it.

    Raises:
        OSError: if the ffmpeg executable cannot be started (from Popen).
    """
    global _ffmpeg_proc, _ffmpeg_thread, _invalid_count
    with _ffmpeg_lock:
        if _ffmpeg_proc is not None and _ffmpeg_proc.poll() is None:
            return

        # The process is gone here, so there is nothing left to terminate.
        # Ask the previous reader to stop and wait for it before the flag is
        # cleared again; clearing it right away could let the old reader
        # miss the request and keep running next to the new one.
        _stop_ffmpeg_reader.set()
        old_reader = _ffmpeg_thread
        if (
            old_reader is not None
            and old_reader is not threading.current_thread()
            and old_reader.is_alive()
        ):
            old_reader.join(timeout=1.0)  # bounded so callers never hang here
        _stop_ffmpeg_reader.clear()
        _invalid_count = 0  # reset the error counter

        logger.info(f"启动 ffmpeg 视频流 (Video{ARM_CAMERA_INDEX})")
        # NOTE(review): "-fflags nobuffer" and "-analyzeduration 0" come after
        # "-i", so ffmpeg treats them as output options. Confirm whether they
        # were meant to be input options placed before "-i".
        _ffmpeg_proc = subprocess.Popen(
            [
                "ffmpeg",
                "-f", "v4l2",
                "-input_format", "mjpeg",
                "-framerate", "8",
                "-video_size", "640x480",
                "-i", f"/dev/video{ARM_CAMERA_INDEX}",
                "-fflags", "nobuffer",
                "-analyzeduration", "0",
                "-f", "mjpeg",
                "-"
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        )
        _ffmpeg_thread = threading.Thread(target=_frame_reader, daemon=True)
        _ffmpeg_thread.start()
def _get_latest_frame(timeout: float = 3.0):
    """Return the cached latest JPEG frame, waiting for the first one if needed.

    Args:
        timeout: maximum number of seconds to wait while no frame is cached.

    Returns:
        The latest frame as bytes, or None if no frame became available
        within *timeout*.
    """
    _ensure_ffmpeg()
    with _frame_cond:
        # wait_for tracks its deadline on a monotonic clock, so wall-clock
        # adjustments (e.g. an NTP step) cannot shorten or extend the wait,
        # and it wakes on notify instead of polling.
        _frame_cond.wait_for(lambda: _latest_frame is not None, timeout=timeout)
        return _latest_frame
@arm_video_app.route("/api/camera/preview")
def arm_camera_preview():
    """Serve the arm camera as an MJPEG stream from the shared ffmpeg process.

    Each response part is one JPEG frame. While no new frame arrives the
    last frame is re-sent about once per second, as before.

    Once a frame had been cached, ``_get_latest_frame`` never returned None
    again, so the timeout-restart branch could not fire and a hung ffmpeg
    was replayed as a frozen picture forever. The generator now tracks when
    it last saw a new frame and restarts ffmpeg after ``stall_limit``
    seconds without one.
    """
    _ensure_ffmpeg()
    stall_limit = 3.0  # seconds without a new frame before ffmpeg is restarted

    def generate():
        last_ts = 0.0
        last_progress = time.monotonic()
        try:
            while True:
                frame = _get_latest_frame(timeout=3.0)
                if frame is None:
                    logger.warning("等待摄像头帧超时,重启 ffmpeg")
                    _stop_ffmpeg()
                    last_progress = time.monotonic()
                    continue
                with _frame_cond:
                    if _latest_frame_ts <= last_ts:
                        _frame_cond.wait(timeout=1.0)
                    frame = _latest_frame
                    frame_ts = _latest_frame_ts
                now = time.monotonic()
                if frame_ts != last_ts:
                    # Any timestamp change counts as progress; using "!="
                    # keeps this correct if the wall clock steps backwards.
                    last_ts = frame_ts
                    last_progress = now
                elif now - last_progress > stall_limit:
                    logger.warning("等待摄像头帧超时,重启 ffmpeg")
                    # The next _get_latest_frame call starts a new process.
                    _stop_ffmpeg()
                    # Give the new process a full stall window to deliver.
                    last_progress = now
                if frame:
                    yield (b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" + frame + b"\r\n")
        except Exception as e:
            logger.error(f"视频流异常: {e}")
        finally:
            logger.info("视频流连接关闭")

    return Response(generate(), mimetype="multipart/x-mixed-replace; boundary=frame")
@arm_video_app.route("/api/camera/status")
def arm_camera_status():
    """Report the camera status as JSON.

    Response fields:
        opened: True while the ffmpeg process exists and has not exited.
        frame_age: seconds since the cached frame was stored, or None when
            no frame timestamp is recorded.
        invalid_count: current count of consecutive invalid frames.
    """
    # Read each shared global once: other threads may reset them between a
    # check and the following use (e.g. _ffmpeg_proc becoming None before
    # .poll() is called, or the timestamp being reset to 0.0).
    proc = _ffmpeg_proc
    frame_ts = _latest_frame_ts
    running = proc is not None and proc.poll() is None
    return jsonify({
        "opened": running,
        "frame_age": time.time() - frame_ts if frame_ts else None,
        "invalid_count": _invalid_count
    })
@arm_video_app.route("/api/camera/restart", methods=["POST"])
def arm_camera_restart():
    """Restart the video stream: stop ffmpeg, drop the cached frame, start again.

    Returns:
        JSON body ``{"ok": true}``.
    """
    global _latest_frame, _latest_frame_ts, _invalid_count

    _stop_ffmpeg()

    # Forget the cached frame so readers wait for one from the new process.
    with _frame_cond:
        _latest_frame, _latest_frame_ts = None, 0.0
    _invalid_count = 0

    _ensure_ffmpeg()

    payload = {"ok": True}
    return jsonify(payload)
@arm_video_app.route("/api/camera/snapshot")
def arm_camera_snapshot():
    """Return one JPEG frame taken from the cache of the running video stream.

    Responds with the frame and cache-disabling headers, or with an empty
    body and status 500 when no frame is available within 3 seconds.
    """
    jpeg = _get_latest_frame(timeout=3.0)
    if not jpeg:
        logger.warning("snapshot failed: no cached frame")
        return "", 500

    reply = Response(jpeg, mimetype="image/jpeg")
    # Tell clients and proxies never to reuse an old snapshot.
    no_cache_headers = (
        ("Cache-Control", "no-cache, no-store, must-revalidate, max-age=0"),
        ("Pragma", "no-cache"),
        ("Expires", "0"),
    )
    for header_name, header_value in no_cache_headers:
        reply.headers[header_name] = header_value
    return reply
# ========== TCP 服务器 - 接收 AGV 指令 ==========
class AGVCommandServer:
    """TCP server that receives AGV commands and relays them through ElephantRobot.

    Protocol as implemented here: the client sends UTF-8 text lines
    terminated by "\\n"; every non-empty line is passed to
    ``ElephantRobot.send_command`` and the result is sent back followed by
    "\\n". Each client connection is served in its own daemon thread.
    """

    def __init__(self, elephant, host: str = "0.0.0.0", port: int = 5002):
        """Store the listen address and the robot client.

        Args:
            elephant: an already activated ElephantRobot instance injected by
                the caller, or None (every command then returns an error).
            host: interface address to bind.
            port: TCP port to listen on.
        """
        self.host = host
        self.port = port
        self._sock = None  # listening socket, created by start()
        self._running = False
        if elephant is None:
            logger.warning("ElephantRobot 实例为空,命令将返回错误")
        self._el = elephant

    def start(self):
        """Bind, listen and accept clients until stop() is called.

        Blocks the calling thread. Raises OSError if the socket cannot be
        set up (for example when the port is already in use).
        """
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self._sock.bind((self.host, self.port))
            self._sock.listen(5)
            # A short accept timeout lets the loop notice stop() promptly.
            self._sock.settimeout(1.0)
        except OSError:
            # Do not leak the socket when setup fails.
            self._sock.close()
            self._sock = None
            raise
        self._running = True
        logger.info("=" * 50)
        logger.info(f"机械臂服务端已启动,监听 {self.host}:{self.port}")
        logger.info("等待 AGV 连接...")
        logger.info("=" * 50)
        while self._running:
            try:
                try:
                    client_sock, addr = self._sock.accept()
                    logger.info(f"AGV 已连接: {addr}")
                    threading.Thread(
                        target=self._handle_client, args=(client_sock,), daemon=True
                    ).start()
                except socket.timeout:
                    continue
            except Exception as e:
                # After stop() the closed socket raises here; that is the
                # normal way out and is not logged as an error.
                if self._running:
                    logger.error(f"服务器异常: {e}")
                break

    def _handle_client(self, client_sock: socket.socket):
        """Serve one client: read lines, execute each, send the reply.

        Received data is buffered as bytes and decoded one complete line at
        a time. Decoding every recv() chunk separately raised
        UnicodeDecodeError whenever a multi-byte UTF-8 character was split
        across two chunks, which dropped the connection.
        """
        try:
            client_sock.settimeout(30)
            pending = b""
            while self._running:
                try:
                    data = client_sock.recv(4096)
                    if not data:
                        break  # peer closed the connection
                    pending += data
                    while b"\n" in pending:
                        raw_line, pending = pending.split(b"\n", 1)
                        # Strict decoding: a line that is not valid UTF-8
                        # ends the connection instead of being forwarded to
                        # the robot in altered form.
                        line = raw_line.decode("utf-8").strip()
                        if not line:
                            continue
                        response = self._execute_command(line)
                        client_sock.sendall((response + "\n").encode("utf-8"))
                        # Separator added: command and response used to be
                        # logged fused together.
                        logger.info(f"CMD: {line} -> {response}")
                except socket.timeout:
                    # Idle connection: keep waiting while the server runs.
                    continue
        except Exception as e:
            logger.error(f"客户端处理异常: {e}")
        finally:
            client_sock.close()
            logger.info("AGV 客户端已断开")

    def _execute_command(self, cmd: str) -> str:
        """Forward *cmd* via ElephantRobot.send_command and return its result.

        Never raises: a missing robot instance or any exception from
        send_command is reported as a string starting with "ERROR: ".
        """
        if self._el is None:
            return "ERROR: Robot not initialized"
        try:
            return self._el.send_command(cmd)
        except Exception as e:
            return f"ERROR: {e}"

    def stop(self):
        """Stop accepting clients and close the listening socket."""
        self._running = False
        if self._sock:
            try:
                self._sock.close()
            except OSError:
                pass  # best effort: the socket is being discarded anyway
        logger.info("机械臂服务端已停止")
# ========== Entry point ==========
_elephant = None  # module-wide ElephantRobot instance, set by power_on_arm()


def power_on_arm(max_retries: int = 5) -> bool:
    """Connect to RoboFlow through ElephantRobot, power the arm on and start it.

    Each attempt creates a client for 127.0.0.1:5001, then calls
    ``start_client``, ``_power_on`` and ``start_robot`` with fixed pauses in
    between. On success the client is stored in the module-level
    ``_elephant`` for later reuse.

    Args:
        max_retries: maximum number of attempts; 3 seconds pass between
            failed attempts.

    Returns:
        True once an attempt completes without raising, False if every
        attempt failed (the ElephantRobot instance itself is not returned;
        read ``_elephant``).
    """
    global _elephant
    from pymycobot import ElephantRobot

    for attempt in range(1, max_retries + 1):
        el = None
        try:
            logger.info(f"正在通过 ElephantRobot 连接 RoboFlow (尝试 {attempt}/{max_retries})...")
            el = ElephantRobot("127.0.0.1", 5001)
            # NOTE(review): the return value of start_client() is ignored;
            # confirm whether it reports connection failure by return value
            # rather than by raising.
            el.start_client()
            logger.info("ElephantRobot start_client 成功,等待2秒...")
            time.sleep(2)
            el._power_on()
            logger.info("power_on 指令已发送,等待2秒...")
            time.sleep(2)
            el.start_robot()
            logger.info("start_robot 指令已发送,等待5秒...")
            time.sleep(5)
            logger.info("✅ 机械臂上电+激活 全部完成")
            # Keep it in the global so later code reuses the same client.
            _elephant = el
            return True
        except Exception as e:
            logger.warning(f"⚠️ 第 {attempt} 次尝试失败: {e}")
            # Release the client of the failed attempt; otherwise every
            # retry would leave another open connection behind.
            if el is not None:
                try:
                    el.stop_client()
                except Exception:
                    logger.debug("stop_client failed after unsuccessful attempt", exc_info=True)
            if attempt < max_retries:
                logger.info(f"等待 3 秒后重试...")
                time.sleep(3)
            else:
                logger.error(f"❌ 所有 {max_retries} 次尝试均失败,将以 limited 模式运行")
                return False
    return False
def main():
    """Start the arm server: power on the arm, then run the HTTP and TCP servers.

    Steps:
      1. Power on and activate the arm through ElephantRobot.
      2. Start the Flask video app on port 5003 in a daemon thread,
         retrying up to 5 times while the port is in use.
      3. Install SIGINT/SIGTERM handlers that shut everything down.
      4. Run the AGV command server on port 5002 in the calling thread
         (blocks until the server stops).
    """
    import errno
    import signal

    # Power on and activate the arm first.
    power_on_arm()
    # Hand the global ElephantRobot instance (possibly None) to the server.
    server = AGVCommandServer(_elephant, port=5002)

    # Start the Flask video stream service (port 5003).
    arm_server_http = None
    for attempt in range(5):
        try:
            arm_server_http = make_server("0.0.0.0", 5003, arm_video_app, threaded=True)
            break
        except OSError as e:
            # Check errno first: the message text depends on platform and
            # locale. The text match is kept as a fallback.
            port_busy = (
                e.errno == errno.EADDRINUSE
                or "Address already in use" in str(e)
            )
            if attempt < 4 and port_busy:
                logger.warning(f"端口 5003 被占用(第{attempt+1}次),等待...")
                time.sleep(3)
            else:
                raise
    http_thread = threading.Thread(target=arm_server_http.serve_forever, daemon=True)
    http_thread.start()
    logger.info("机械臂视频流服务已启动: http://0.0.0.0:5003")

    def signal_handler(sig, frame):
        """Stop ffmpeg, both servers and the robot client, then exit."""
        logger.info("收到停止信号...")
        # Use the shared stop routine so the reader thread is signalled and
        # the process is waited for, instead of a bare terminate().
        _stop_ffmpeg()
        server.stop()
        arm_server_http.shutdown()
        if _elephant:
            try:
                _elephant.stop_client()
            except Exception:
                # Best effort during shutdown; the process exits right after.
                logger.debug("stop_client failed during shutdown", exc_info=True)
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    server.start()
if __name__ == "__main__":
main()