Update project structure
This commit is contained in:
@@ -1,161 +0,0 @@
|
||||
"""
|
||||
AGV 导航控制模块 - 通过 pymycobot 控制 AGV 运动
|
||||
"""
|
||||
import time
|
||||
import logging
|
||||
from typing import Tuple, Optional, List
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# 尝试导入 pymycobot
|
||||
try:
|
||||
from pymycobot import MyAGVPro
|
||||
MYCOBOT_AVAILABLE = True
|
||||
except ImportError:
|
||||
MYCOBOT_AVAILABLE = False
|
||||
logger.warning("pymycobot 未安装,AGV 控制功能不可用")
|
||||
|
||||
|
||||
class AGVController:
|
||||
"""AGV 运动控制"""
|
||||
|
||||
def __init__(self, device: str = "/dev/agvpro_controller", baudrate: int = 1000000):
|
||||
self.device = device
|
||||
self.baudrate = baudrate
|
||||
self._agv: Optional[MyAGVPro] = None
|
||||
self._connected = False
|
||||
|
||||
def connect(self) -> bool:
|
||||
"""连接 AGV"""
|
||||
if not MYCOBOT_AVAILABLE:
|
||||
logger.error("pymycobot 不可用")
|
||||
return False
|
||||
try:
|
||||
self._agv = MyAGVPro(self.device, self.baudrate, debug=False)
|
||||
# 检查是否上电
|
||||
if self._agv.is_power_on():
|
||||
self._connected = True
|
||||
logger.info("AGV 连接成功")
|
||||
return True
|
||||
else:
|
||||
logger.warning("AGV 未上电,尝试上电...")
|
||||
self._agv.power_on()
|
||||
time.sleep(2)
|
||||
if self._agv.is_power_on():
|
||||
self._connected = True
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"AGV 连接失败: {e}")
|
||||
return False
|
||||
|
||||
def is_connected(self) -> bool:
|
||||
return self._connected and self._agv is not None
|
||||
|
||||
def move_forward(self, speed: float = 0.5, duration: float = None):
|
||||
"""前进"""
|
||||
if not self.is_connected():
|
||||
return
|
||||
self._agv.move_forward(speed)
|
||||
if duration:
|
||||
time.sleep(duration)
|
||||
self.stop()
|
||||
|
||||
def move_backward(self, speed: float = 0.5, duration: float = None):
|
||||
"""后退"""
|
||||
if not self.is_connected():
|
||||
return
|
||||
self._agv.move_backward(speed)
|
||||
if duration:
|
||||
time.sleep(duration)
|
||||
self.stop()
|
||||
|
||||
def turn_left(self, speed: float = 0.5, duration: float = None):
|
||||
"""左转"""
|
||||
if not self.is_connected():
|
||||
return
|
||||
self._agv.turn_left(speed)
|
||||
if duration:
|
||||
time.sleep(duration)
|
||||
self.stop()
|
||||
|
||||
def turn_right(self, speed: float = 0.5, duration: float = None):
|
||||
"""右转"""
|
||||
if not self.is_connected():
|
||||
return
|
||||
self._agv.turn_right(speed)
|
||||
if duration:
|
||||
time.sleep(duration)
|
||||
self.stop()
|
||||
|
||||
def move_left_lateral(self, speed: float = 0.5, duration: float = None):
|
||||
"""向左横向移动"""
|
||||
if not self.is_connected():
|
||||
return
|
||||
self._agv.move_left_lateral(speed)
|
||||
if duration:
|
||||
time.sleep(duration)
|
||||
self.stop()
|
||||
|
||||
def move_right_lateral(self, speed: float = 0.5, duration: float = None):
|
||||
"""向右横向移动"""
|
||||
if not self.is_connected():
|
||||
return
|
||||
self._agv.move_right_lateral(speed)
|
||||
if duration:
|
||||
time.sleep(duration)
|
||||
self.stop()
|
||||
|
||||
def stop(self):
|
||||
"""停止"""
|
||||
if self.is_connected():
|
||||
self._agv.stop()
|
||||
|
||||
def get_position(self) -> Optional[List[float]]:
|
||||
"""获取 AGV 当前位置 [x, y, rz]"""
|
||||
if not self.is_connected():
|
||||
return None
|
||||
try:
|
||||
# 启用自动报告以获取位置
|
||||
self._agv.set_auto_report_state(1)
|
||||
time.sleep(0.5)
|
||||
msg = self._agv.get_auto_report_message()
|
||||
if msg and len(msg) >= 3:
|
||||
return [msg[0], msg[1], msg[2]]
|
||||
except Exception as e:
|
||||
logger.error(f"获取 AGV 位置失败: {e}")
|
||||
return None
|
||||
|
||||
def go_to_point(self, x: float, y: float, rz: float = None, speed: float = 0.5) -> bool:
|
||||
"""移动到目标点(简单的方向控制实现)"""
|
||||
# 注意:AGV Pro 的 pymycobot 没有直接 goto API
|
||||
# 需要 ROS2 SLAM 导航支持,此处提供基础运动接口
|
||||
# 实际导航需要结合地图和路径规划
|
||||
logger.warning("go_to_point 需要 ROS2 导航支持,当前仅记录目标")
|
||||
return True
|
||||
|
||||
def get_battery(self) -> Optional[float]:
|
||||
"""获取电池电压"""
|
||||
if not self.is_connected():
|
||||
return None
|
||||
try:
|
||||
self._agv.set_auto_report_state(1)
|
||||
msg = self._agv.get_auto_report_message()
|
||||
if msg and len(msg) > 5:
|
||||
return msg[5] # 电池电压
|
||||
except:
|
||||
pass
|
||||
return None
|
||||
|
||||
def disconnect(self):
|
||||
if self._agv:
|
||||
self.stop()
|
||||
self._agv = None
|
||||
self._connected = False
|
||||
|
||||
def __enter__(self):
|
||||
self.connect()
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
self.disconnect()
|
||||
@@ -1,92 +0,0 @@
|
||||
"""
|
||||
配置文件 - 所有可配置参数集中管理
|
||||
"""
|
||||
import os
|
||||
|
||||
# 基础路径(部署后对应 ~/work/agv_app)
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
# ========== 网络配置(集中管理所有 IP 地址 — 修改此处即可全局生效)==========
|
||||
AGV_HOST = "192.168.60.80"
|
||||
ARM_HOST = "192.168.60.120"
|
||||
|
||||
# ========== AGV 参数 ==========
|
||||
AGV_CONFIG = {
|
||||
"device": "/dev/agvpro_controller",
|
||||
"baudrate": 10000000,
|
||||
"move_speed": 1.0,
|
||||
"turn_speed": 1.0,
|
||||
}
|
||||
|
||||
# ========== 机械臂 TCP 客户端 ==========
|
||||
ARM_CONFIG = {
|
||||
"host": ARM_HOST,
|
||||
"port": 5002,
|
||||
"timeout": 8,
|
||||
"retry_times": 3,
|
||||
"retry_interval": 1,
|
||||
}
|
||||
|
||||
# ========== 地图 ==========
|
||||
MAP_CONFIG = {
|
||||
"map_dir": "/home/elephant/agv_pro_ros2/src/agv_pro_navigation2/map/",
|
||||
"map_file": "map.yaml",
|
||||
}
|
||||
|
||||
# ========== 摄像头 ==========
|
||||
CAMERA_CONFIG = {
|
||||
"device_index": 4, # AGV 摄像头 video4(标准彩色摄像头,V4L2后端)
|
||||
"backend": "v4l2", # 使用 V4L2 后端获取标准彩色格式(640x480)
|
||||
"qr_detect_interval": 0.5,
|
||||
"capture_delay": 0.5,
|
||||
}
|
||||
|
||||
# ========== 机械臂摄像头流 ==========
|
||||
ARM_CAMERA_CONFIG = {
|
||||
"url": f"http://{ARM_HOST}:5003/api/camera/preview",
|
||||
"snapshot_url": f"http://{ARM_HOST}:5003/api/camera/snapshot",
|
||||
}
|
||||
|
||||
# ========== HTTP 上传 ==========
|
||||
UPLOAD_CONFIG = {
|
||||
"url": "https://ts.zhijian168.com/prod-api/file/uploadImage",
|
||||
"timeout": 30,
|
||||
"max_retries": 3,
|
||||
}
|
||||
|
||||
# ========== Flask 服务器 ==========
|
||||
SERVER_CONFIG = {
|
||||
"host": "0.0.0.0",
|
||||
"port": 5000,
|
||||
"secret_key": "agv630_secret_key_2024",
|
||||
"debug": False,
|
||||
}
|
||||
|
||||
# ========== 任务配置存储路径 ==========
|
||||
DATA_DIR = os.path.join(BASE_DIR, "data")
|
||||
os.makedirs(DATA_DIR, exist_ok=True)
|
||||
|
||||
# ========== 关节角度范围限制 ==========
|
||||
JOINT_LIMITS = {
|
||||
"J1": (-180.0, 180.0),
|
||||
"J2": (-270.0, 90.0),
|
||||
"J3": (-150.0, 150.0),
|
||||
"J4": (-260.0, 80.0),
|
||||
"J5": (-168.0, 168.0),
|
||||
"J6": (-174.0, 174.0),
|
||||
}
|
||||
|
||||
# ========== 机械臂默认速度 ==========
|
||||
DEFAULT_ARM_SPEED = 1000
|
||||
|
||||
# ========== 状态定义 ==========
|
||||
class State:
|
||||
SETTING = "setting"
|
||||
RUNNING = "running"
|
||||
PAUSED = "paused"
|
||||
IDLE = "idle"
|
||||
|
||||
class PhotoType:
|
||||
FRONT = "front"
|
||||
BACK = "back"
|
||||
NAMEPLATE = "nameplate"
|
||||
@@ -1,663 +0,0 @@
|
||||
"""
|
||||
地图导航模块 - A* 路径规划 + Pure Pursuit 路径跟踪
|
||||
在已知地图上规划路径,控制 AGV 自动导航到目标坐标
|
||||
|
||||
依赖:numpy, cv2, Pillow(均已安装在 AGV 上)
|
||||
不依赖:激光雷达、SLAM、Nav2
|
||||
"""
|
||||
|
||||
import os
|
||||
import math
|
||||
import heapq
|
||||
import time
|
||||
import logging
|
||||
import threading
|
||||
import subprocess
|
||||
import numpy as np
|
||||
import cv2
|
||||
import yaml
|
||||
from typing import List, Tuple, Optional, Dict
|
||||
from enum import Enum
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ROS2 环境设置(与 agv_controller_ros2.py 保持一致)
|
||||
ROS2_SETUP_CMD = "export ROS_DOMAIN_ID=1 && source /opt/ros/humble/setup.bash && source ~/agv_pro_ros2/install/setup.bash"
|
||||
|
||||
|
||||
# ========== 坐标转换 ==========
|
||||
|
||||
class CoordTransformer:
|
||||
"""地图世界坐标 ↔ 栅格坐标 双向转换"""
|
||||
|
||||
def __init__(self, resolution: float, origin: List[float], width: int, height: int):
|
||||
"""
|
||||
Args:
|
||||
resolution: 地图分辨率(米/像素)
|
||||
origin: [x, y, yaw] 地图原点在世界坐标系中的位置
|
||||
width: 地图宽度(像素)
|
||||
height: 地图高度(像素)
|
||||
"""
|
||||
self.resolution = resolution
|
||||
self.origin = origin # [ox, oy, oyaw]
|
||||
self.width = width
|
||||
self.height = height
|
||||
|
||||
def world_to_grid(self, wx: float, wy: float) -> Tuple[int, int]:
|
||||
"""世界坐标 → 栅格坐标 [col, row]"""
|
||||
col = int((wx - self.origin[0]) / self.resolution)
|
||||
row = int((wy - self.origin[1]) / self.resolution)
|
||||
# ROS 地图 row=0 对应图像最上方(y 最大值),需要翻转
|
||||
row = self.height - 1 - row
|
||||
return (col, row)
|
||||
|
||||
def grid_to_world(self, col: int, row: int) -> Tuple[float, float]:
|
||||
"""栅格坐标 [col, row] → 世界坐标 [x, y]"""
|
||||
# 翻转 row
|
||||
actual_row = self.height - 1 - row
|
||||
wx = col * self.resolution + self.origin[0]
|
||||
wy = actual_row * self.resolution + self.origin[1]
|
||||
return (wx, wy)
|
||||
|
||||
def world_to_grid_center(self, wx: float, wy: float) -> Tuple[float, float]:
|
||||
"""世界坐标 → 栅格中心的世界坐标(对齐到栅格)"""
|
||||
col, row = self.world_to_grid(wx, wy)
|
||||
return self.grid_to_world(col, row)
|
||||
|
||||
|
||||
# ========== A* 路径规划 ==========
|
||||
|
||||
class AStarPlanner:
|
||||
"""A* 路径规划器,在栅格地图上规划最短路径"""
|
||||
|
||||
# 8方向移动:右、左、下、上、右下、右上、左下、左上
|
||||
DIRECTIONS = [
|
||||
(1, 0), (-1, 0), (0, 1), (0, -1),
|
||||
(1, 1), (1, -1), (-1, 1), (-1, -1)
|
||||
]
|
||||
# 对角线移动的代价乘数(sqrt(2))
|
||||
DIR_COSTS = [1.0, 1.0, 1.0, 1.0, 1.414, 1.414, 1.414, 1.414]
|
||||
|
||||
def __init__(self, occupancy_grid: np.ndarray, inflation_radius: int = 3):
|
||||
"""
|
||||
Args:
|
||||
occupancy_grid: 栅格地图,0=空闲,255=障碍物
|
||||
inflation_radius: 障碍物膨胀半径(像素),AGV 有一定体积不能贴墙走
|
||||
"""
|
||||
self.grid = occupancy_grid
|
||||
self.height, self.width = occupancy_grid.shape
|
||||
self.inflated = self._inflate(inflation_radius)
|
||||
|
||||
def _inflate(self, radius: int) -> np.ndarray:
|
||||
"""膨胀障碍物区域"""
|
||||
if radius <= 0:
|
||||
return self.grid.copy()
|
||||
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2 * radius + 1, 2 * radius + 1))
|
||||
inflated = cv2.dilate(self.grid, kernel, iterations=1)
|
||||
# 确保二值化
|
||||
inflated = np.where(inflated > 50, 255, 0).astype(np.uint8)
|
||||
return inflated
|
||||
|
||||
def plan(self, start: Tuple[int, int], goal: Tuple[int, int]) -> Optional[List[Tuple[int, int]]]:
|
||||
"""
|
||||
A* 路径规划
|
||||
|
||||
Args:
|
||||
start: 起点栅格坐标 (col, row)
|
||||
goal: 终点栅格坐标 (col, row)
|
||||
|
||||
Returns:
|
||||
路径点列表 [(col, row), ...],包含起点和终点;无法规划时返回 None
|
||||
"""
|
||||
# 边界检查
|
||||
if not self._is_valid(start) or not self._is_valid(goal):
|
||||
logger.warning(f"起点或终点无效: start={start}, goal={goal}")
|
||||
# 尝试找最近的可行点
|
||||
start = self._find_nearest_free(start)
|
||||
goal = self._find_nearest_free(goal)
|
||||
if start is None or goal is None:
|
||||
logger.error("无法找到有效的起点或终点")
|
||||
return None
|
||||
|
||||
# 检查终点是否被障碍物包围
|
||||
if self.inflated[goal[1], goal[0]] > 50:
|
||||
goal = self._find_nearest_free(goal)
|
||||
|
||||
if goal is None:
|
||||
logger.error("终点周围无可行区域")
|
||||
return None
|
||||
|
||||
# A* 算法
|
||||
open_set = []
|
||||
heapq.heappush(open_set, (0.0, start))
|
||||
came_from = {}
|
||||
g_score = {start: 0.0}
|
||||
closed_set = set()
|
||||
|
||||
while open_set:
|
||||
_, current = heapq.heappop(open_set)
|
||||
|
||||
if current in closed_set:
|
||||
continue
|
||||
closed_set.add(current)
|
||||
|
||||
if current == goal:
|
||||
# 回溯路径
|
||||
path = []
|
||||
while current in came_from:
|
||||
path.append(current)
|
||||
current = came_from[current]
|
||||
path.append(start)
|
||||
path.reverse()
|
||||
return path
|
||||
|
||||
for i, (dx, dy) in enumerate(self.DIRECTIONS):
|
||||
neighbor = (current[0] + dx, current[1] + dy)
|
||||
|
||||
if neighbor in closed_set:
|
||||
continue
|
||||
|
||||
if not self._is_valid(neighbor):
|
||||
continue
|
||||
|
||||
if self.inflated[neighbor[1], neighbor[0]] > 50:
|
||||
continue
|
||||
|
||||
move_cost = self.DIR_COSTS[i]
|
||||
tentative_g = g_score[current] + move_cost
|
||||
|
||||
if tentative_g < g_score.get(neighbor, float('inf')):
|
||||
came_from[neighbor] = current
|
||||
g_score[neighbor] = tentative_g
|
||||
f_score = tentative_g + self._heuristic(neighbor, goal)
|
||||
heapq.heappush(open_set, (f_score, neighbor))
|
||||
|
||||
logger.warning("A* 无法找到路径")
|
||||
return None
|
||||
|
||||
def _heuristic(self, a: Tuple[int, int], b: Tuple[int, int]) -> float:
|
||||
"""对角线距离启发式"""
|
||||
dx = abs(a[0] - b[0])
|
||||
dy = abs(a[1] - b[1])
|
||||
return max(dx, dy) + (1.414 - 1) * min(dx, dy)
|
||||
|
||||
def _is_valid(self, pos: Tuple[int, int]) -> bool:
|
||||
return 0 <= pos[0] < self.width and 0 <= pos[1] < self.height
|
||||
|
||||
def _find_nearest_free(self, pos: Tuple[int, int], max_dist: int = 10) -> Optional[Tuple[int, int]]:
|
||||
"""在 pos 附近找最近的可行点"""
|
||||
for r in range(1, max_dist + 1):
|
||||
for dx in range(-r, r + 1):
|
||||
for dy in range(-r, r + 1):
|
||||
n = (pos[0] + dx, pos[1] + dy)
|
||||
if self._is_valid(n) and self.inflated[n[1], n[0]] == 0:
|
||||
return n
|
||||
return None
|
||||
|
||||
|
||||
# ========== 路径平滑 ==========
|
||||
|
||||
def smooth_path(grid: np.ndarray, path: List[Tuple[int, int]],
|
||||
weight_data: float = 0.3, weight_smooth: float = 0.5,
|
||||
tolerance: float = 1e-5, max_iter: int = 500) -> List[Tuple[int, int]]:
|
||||
"""
|
||||
路径平滑(梯度下降法)
|
||||
在障碍物约束下让路径更平滑,减少不必要的转向
|
||||
"""
|
||||
if len(path) <= 2:
|
||||
return path
|
||||
|
||||
height, width = grid.shape
|
||||
new_path = [list(p) for p in path]
|
||||
|
||||
for iteration in range(max_iter):
|
||||
change = 0.0
|
||||
for i in range(1, len(new_path) - 1):
|
||||
for j in range(2):
|
||||
old_val = new_path[i][j]
|
||||
# 数据项:趋向原始路径点
|
||||
data_gradient = weight_data * (path[i][j] - new_path[i][j])
|
||||
# 平滑项:趋向邻居中点
|
||||
smooth_gradient = weight_smooth * (
|
||||
new_path[i - 1][j] + new_path[i + 1][j] - 2 * new_path[i][j]
|
||||
)
|
||||
new_path[i][j] += data_gradient + smooth_gradient
|
||||
|
||||
# 边界约束
|
||||
new_path[i][0] = max(0, min(width - 1, new_path[i][0]))
|
||||
new_path[i][1] = max(0, min(height - 1, new_path[i][1]))
|
||||
|
||||
# 障碍物约束
|
||||
col, row = int(round(new_path[i][0])), int(round(new_path[i][1]))
|
||||
if 0 <= col < width and 0 <= row < height:
|
||||
if grid[row, col] > 50:
|
||||
new_path[i][j] = old_val # 回退
|
||||
|
||||
change += abs(new_path[i][j] - old_val)
|
||||
|
||||
if change < tolerance:
|
||||
break
|
||||
|
||||
return [(int(round(p[0])), int(round(p[1]))) for p in new_path]
|
||||
|
||||
|
||||
# ========== 路径降采样 ==========
|
||||
|
||||
def downsample_path(path: List[Tuple[int, int]], min_dist: int = 3) -> List[Tuple[int, int]]:
|
||||
"""降采样路径,移除过近的点,减少 cmd_vel 发布频率"""
|
||||
if len(path) <= 2:
|
||||
return path
|
||||
|
||||
result = [path[0]]
|
||||
for p in path[1:]:
|
||||
last = result[-1]
|
||||
dist = math.hypot(p[0] - last[0], p[1] - last[1])
|
||||
if dist >= min_dist:
|
||||
result.append(p)
|
||||
# 确保终点包含在内
|
||||
if result[-1] != path[-1]:
|
||||
result.append(path[-1])
|
||||
return result
|
||||
|
||||
|
||||
# ========== Pure Pursuit 控制器 ==========
|
||||
|
||||
class PurePursuitController:
|
||||
"""Pure Pursuit 路径跟踪控制器"""
|
||||
|
||||
def __init__(self, lookahead_distance: float = 0.3,
|
||||
max_linear_speed: float = 0.4,
|
||||
max_angular_speed: float = 0.8,
|
||||
goal_tolerance: float = 0.15,
|
||||
slow_down_distance: float = 0.5):
|
||||
"""
|
||||
Args:
|
||||
lookahead_distance: 前视距离(米),越大转弯越平缓
|
||||
max_linear_speed: 最大线速度 (m/s)
|
||||
max_angular_speed: 最大角速度 (rad/s)
|
||||
goal_tolerance: 到达目标容差(米)
|
||||
slow_down_distance: 开始减速的距离(米)
|
||||
"""
|
||||
self.lookahead_distance = lookahead_distance
|
||||
self.max_linear_speed = max_linear_speed
|
||||
self.max_angular_speed = max_angular_speed
|
||||
self.goal_tolerance = goal_tolerance
|
||||
self.slow_down_distance = slow_down_distance
|
||||
self.transformer: Optional[CoordTransformer] = None
|
||||
|
||||
def set_transformer(self, transformer: CoordTransformer):
|
||||
self.transformer = transformer
|
||||
|
||||
def compute(self, current_pos: Tuple[float, float, float],
|
||||
path_world: List[Tuple[float, float]]) -> Tuple[float, float, bool]:
|
||||
"""
|
||||
计算控制量
|
||||
|
||||
Args:
|
||||
current_pos: (x, y, yaw) 当前世界坐标
|
||||
path_world: 路径点列表 [(x, y), ...] 世界坐标
|
||||
|
||||
Returns:
|
||||
(linear_x, angular_z, reached) 线速度、角速度、是否到达
|
||||
"""
|
||||
if not path_world:
|
||||
return (0.0, 0.0, True)
|
||||
|
||||
x, y, yaw = current_pos
|
||||
|
||||
# 检查是否到达终点
|
||||
goal = path_world[-1]
|
||||
dist_to_goal = math.hypot(goal[0] - x, goal[1] - y)
|
||||
if dist_to_goal < self.goal_tolerance:
|
||||
return (0.0, 0.0, True)
|
||||
|
||||
# 找前视点(lookahead point)
|
||||
lookahead_point = self._find_lookahead_point(x, y, path_world)
|
||||
|
||||
if lookahead_point is None:
|
||||
# 已经越过最后一个点
|
||||
return (0.0, 0.0, True)
|
||||
|
||||
lx, ly = lookahead_point
|
||||
|
||||
# 转换到机器人坐标系
|
||||
dx = lx - x
|
||||
dy = ly - y
|
||||
|
||||
# 旋转到机器人坐标系(x 轴朝前)
|
||||
local_x = dx * math.cos(yaw) + dy * math.sin(yaw)
|
||||
local_y = -dx * math.sin(yaw) + dy * math.cos(yaw)
|
||||
|
||||
# 弧长 = 角度 * 半径 → curvature = 2 * ly / L^2
|
||||
L = math.hypot(local_x, local_y)
|
||||
if L < 1e-6:
|
||||
return (0.0, 0.0, True)
|
||||
|
||||
curvature = 2.0 * local_y / (L * L)
|
||||
angular_z = curvature * self.max_linear_speed
|
||||
|
||||
# 根据距离调整速度
|
||||
linear_x = self.max_linear_speed
|
||||
if dist_to_goal < self.slow_down_distance:
|
||||
ratio = max(0.15, dist_to_goal / self.slow_down_distance)
|
||||
linear_x *= ratio
|
||||
|
||||
# 限制角速度
|
||||
angular_z = max(-self.max_angular_speed, min(self.max_angular_speed, angular_z))
|
||||
|
||||
# 如果角度偏差太大,先原位转弯
|
||||
angle_to_goal = math.atan2(ly - y, lx - x) - yaw
|
||||
angle_to_goal = math.atan2(math.sin(angle_to_goal), math.cos(angle_to_goal))
|
||||
|
||||
if abs(angle_to_goal) > math.pi / 3:
|
||||
# 角度偏差 > 60°,先原位转弯
|
||||
linear_x = 0.0
|
||||
angular_z = max(-self.max_angular_speed, min(self.max_angular_speed, angle_to_goal * 1.5))
|
||||
|
||||
return (linear_x, angular_z, False)
|
||||
|
||||
def _find_lookahead_point(self, x: float, y: float,
|
||||
path: List[Tuple[float, float]]) -> Optional[Tuple[float, float]]:
|
||||
"""沿路径找到前视距离处的点"""
|
||||
for i in range(len(path) - 1, -1, -1):
|
||||
dist = math.hypot(path[i][0] - x, path[i][1] - y)
|
||||
if dist >= self.lookahead_distance:
|
||||
return path[i]
|
||||
# 如果所有点都在前视距离内,返回终点
|
||||
return path[-1] if path else None
|
||||
|
||||
|
||||
# ========== 导航器(核心模块) ==========
|
||||
|
||||
class NavStatus(Enum):
|
||||
IDLE = "idle"
|
||||
PLANNING = "planning"
|
||||
NAVIGATING = "navigating"
|
||||
REACHED = "reached"
|
||||
FAILED = "failed"
|
||||
CANCELLED = "cancelled"
|
||||
|
||||
|
||||
class MapNavigator:
|
||||
"""地图导航器 — 整合路径规划与路径跟踪"""
|
||||
|
||||
def __init__(self, map_yaml_path: str):
|
||||
"""
|
||||
Args:
|
||||
map_yaml_path: map.yaml 文件的绝对路径
|
||||
"""
|
||||
self.map_yaml_path = map_yaml_path
|
||||
self.transformer: Optional[CoordTransformer] = None
|
||||
self.planner: Optional[AStarPlanner] = None
|
||||
self.controller = PurePursuitController()
|
||||
self.controller.set_transformer(self.transformer)
|
||||
|
||||
# 导航状态
|
||||
self.status = NavStatus.IDLE
|
||||
self._nav_thread: Optional[threading.Thread] = None
|
||||
self._cancel_event = threading.Event()
|
||||
|
||||
# 当前路径(世界坐标)
|
||||
self.path_world: List[Tuple[float, float]] = []
|
||||
self.current_position = [0.0, 0.0, 0.0] # [x, y, yaw]
|
||||
|
||||
# 加载地图
|
||||
self._load_map()
|
||||
|
||||
def _load_map(self):
|
||||
"""加载地图 PGM + YAML"""
|
||||
with open(self.map_yaml_path, 'r') as f:
|
||||
meta = yaml.safe_load(f)
|
||||
|
||||
map_dir = os.path.dirname(self.map_yaml_path)
|
||||
pgm_path = os.path.join(map_dir, meta['image'])
|
||||
|
||||
# 读取 PGM 灰度图
|
||||
img = cv2.imread(pgm_path, cv2.IMREAD_GRAYSCALE)
|
||||
if img is None:
|
||||
raise FileNotFoundError(f"无法读取地图文件: {pgm_path}")
|
||||
|
||||
# ROS 地图:0=占用(障碍物),254=空闲,205=未知
|
||||
# 转为二值:空闲=0,障碍物=255
|
||||
self.occupancy = np.where(img <= 50, 255, 0).astype(np.uint8)
|
||||
# 未知区域(205 附近)也视为障碍物
|
||||
self.occupancy = np.where((img > 50) & (img < 250), 255, self.occupancy)
|
||||
|
||||
resolution = meta['resolution']
|
||||
origin = meta.get('origin', [0, 0, 0])
|
||||
height, width = img.shape
|
||||
|
||||
self.transformer = CoordTransformer(resolution, origin, width, height)
|
||||
self.planner = AStarPlanner(self.occupancy, inflation_radius=3)
|
||||
self.controller.set_transformer(self.transformer)
|
||||
|
||||
self._map_meta = meta
|
||||
logger.info(f"地图加载完成: {width}x{height}, 分辨率 {resolution}m, 原点 {origin}")
|
||||
|
||||
def get_odom(self) -> List[float]:
|
||||
"""从 /odom 话题获取当前位置 [x, y, yaw]"""
|
||||
try:
|
||||
cmd = f"timeout 5 ros2 topic echo /odom --once 2>/dev/null"
|
||||
full_cmd = f"bash -c '{ROS2_SETUP_CMD} && {cmd}'"
|
||||
result = subprocess.run(
|
||||
full_cmd, shell=True, capture_output=True, text=True, timeout=6
|
||||
)
|
||||
if result.returncode == 0 and result.stdout:
|
||||
yaml_str = result.stdout.split('---')[0]
|
||||
data = yaml.safe_load(yaml_str)
|
||||
if data:
|
||||
pos = data.get("pose", {}).get("pose", {}).get("position", {})
|
||||
x, y = pos.get("x", 0.0), pos.get("y", 0.0)
|
||||
orient = data.get("pose", {}).get("pose", {}).get("orientation", {})
|
||||
qz, qw = orient.get("z", 0.0), orient.get("w", 1.0)
|
||||
yaw = math.atan2(2.0 * qw * qz, 1.0 - 2.0 * qz * qz)
|
||||
self.current_position = [x, y, yaw]
|
||||
return self.current_position
|
||||
except Exception as e:
|
||||
logger.debug(f"获取 odom 失败: {e}")
|
||||
return self.current_position
|
||||
|
||||
def _publish_cmd_vel(self, linear_x: float, angular_z: float):
|
||||
"""发布速度命令到 /cmd_vel"""
|
||||
msg = (
|
||||
f'{{"linear": {{"x": {linear_x:.4f}, "y": 0.0, "z": 0.0}}, '
|
||||
f'"angular": {{"x": 0.0, "y": 0.0, "z": {angular_z:.4f}}}}}'
|
||||
)
|
||||
full_cmd = f"bash -c '{ROS2_SETUP_CMD} && ros2 topic pub /cmd_vel geometry_msgs/msg/Twist \"{msg}\" --once'"
|
||||
try:
|
||||
subprocess.run(full_cmd, shell=True, capture_output=True, text=True, timeout=3)
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.warning("发布 cmd_vel 超时")
|
||||
|
||||
def _stop_cmd_vel(self):
|
||||
"""发布停止命令"""
|
||||
self._publish_cmd_vel(0.0, 0.0)
|
||||
|
||||
def plan_path(self, goal_x: float, goal_y: float,
|
||||
start_x: float = None, start_y: float = None) -> bool:
|
||||
"""
|
||||
规划路径(不执行导航)
|
||||
|
||||
Args:
|
||||
goal_x, goal_y: 目标世界坐标(米)
|
||||
start_x, start_y: 起点世界坐标(米),默认使用当前 odom
|
||||
|
||||
Returns:
|
||||
是否规划成功
|
||||
"""
|
||||
if self.transformer is None:
|
||||
logger.error("地图未加载")
|
||||
return False
|
||||
|
||||
# 获取起点
|
||||
if start_x is None or start_y is None:
|
||||
pos = self.get_odom()
|
||||
start_x, start_y = pos[0], pos[1]
|
||||
|
||||
# 坐标转换
|
||||
start_grid = self.transformer.world_to_grid(start_x, start_y)
|
||||
goal_grid = self.transformer.world_to_grid(goal_x, goal_y)
|
||||
|
||||
logger.info(f"规划路径: 起点(世界){start_x:.2f},{start_y:.2f} → (栅格){start_grid}")
|
||||
logger.info(f" 终点(世界){goal_x:.2f},{goal_y:.2f} → (栅格){goal_grid}")
|
||||
|
||||
# A* 规划
|
||||
path_grid = self.planner.plan(start_grid, goal_grid)
|
||||
if path_grid is None:
|
||||
logger.warning("路径规划失败")
|
||||
return False
|
||||
|
||||
# 路径平滑
|
||||
path_grid = smooth_path(self.planner.inflated, path_grid)
|
||||
|
||||
# 降采样
|
||||
path_grid = downsample_path(path_grid, min_dist=2)
|
||||
|
||||
# 转换为世界坐标
|
||||
self.path_world = [self.transformer.grid_to_world(c, r) for c, r in path_grid]
|
||||
|
||||
logger.info(f"路径规划成功: {len(self.path_world)} 个路径点")
|
||||
return True
|
||||
|
||||
def navigate_to(self, goal_x: float, goal_y, blocking: bool = False) -> bool:
|
||||
"""
|
||||
导航到目标点
|
||||
|
||||
Args:
|
||||
goal_x, goal_y: 目标世界坐标(米)
|
||||
blocking: 是否阻塞等待导航完成
|
||||
|
||||
Returns:
|
||||
非阻塞模式下返回 True(表示已启动),阻塞模式下返回是否到达
|
||||
"""
|
||||
if self.status == NavStatus.NAVIGATING:
|
||||
logger.warning("导航正在进行中,请先停止当前导航")
|
||||
return False
|
||||
|
||||
# 规划路径
|
||||
if not self.plan_path(goal_x, goal_y):
|
||||
self.status = NavStatus.FAILED
|
||||
return False
|
||||
|
||||
# 启动导航线程
|
||||
self._cancel_event.clear()
|
||||
self.status = NavStatus.NAVIGATING
|
||||
self._nav_thread = threading.Thread(
|
||||
target=self._navigate_thread,
|
||||
args=(goal_x, goal_y),
|
||||
daemon=True
|
||||
)
|
||||
self._nav_thread.start()
|
||||
|
||||
if blocking:
|
||||
self._nav_thread.join()
|
||||
return self.status == NavStatus.REACHED
|
||||
|
||||
return True
|
||||
|
||||
def _navigate_thread(self, goal_x: float, goal_y: float):
|
||||
"""导航线程"""
|
||||
logger.info(f"开始导航 → 目标 ({goal_x:.2f}, {goal_y:.2f})")
|
||||
|
||||
try:
|
||||
# 转弯朝向第一个路径点
|
||||
self._initial_turn()
|
||||
|
||||
# 跟踪路径
|
||||
last_cmd_time = time.time()
|
||||
cmd_interval = 0.2 # cmd_vel 发布间隔(秒)
|
||||
|
||||
while not self._cancel_event.is_set():
|
||||
pos = self.get_odom()
|
||||
x, y, yaw = pos
|
||||
|
||||
linear_x, angular_z, reached = self.controller.compute(
|
||||
(x, y, yaw), self.path_world
|
||||
)
|
||||
|
||||
if reached:
|
||||
self._stop_cmd_vel()
|
||||
self.status = NavStatus.REACHED
|
||||
logger.info("✅ 已到达目标点")
|
||||
return
|
||||
|
||||
# 控制发布频率
|
||||
now = time.time()
|
||||
if now - last_cmd_time >= cmd_interval:
|
||||
self._publish_cmd_vel(linear_x, angular_z)
|
||||
last_cmd_time = now
|
||||
|
||||
time.sleep(0.05) # 50ms 控制循环
|
||||
|
||||
# 被取消
|
||||
self._stop_cmd_vel()
|
||||
self.status = NavStatus.CANCELLED
|
||||
logger.info("导航已取消")
|
||||
|
||||
except Exception as e:
|
||||
self._stop_cmd_vel()
|
||||
self.status = NavStatus.FAILED
|
||||
logger.error(f"导航异常: {e}")
|
||||
|
||||
def _initial_turn(self):
|
||||
"""导航开始前,先原地转向朝向第一个路径点"""
|
||||
if len(self.path_world) < 2:
|
||||
return
|
||||
|
||||
pos = self.get_odom()
|
||||
x, y, yaw = pos
|
||||
target = self.path_world[1] # 第一个路径点是当前位置,取第二个
|
||||
|
||||
angle_to_target = math.atan2(target[1] - y, target[0] - x) - yaw
|
||||
angle_to_target = math.atan2(math.sin(angle_to_target), math.cos(angle_to_target))
|
||||
|
||||
if abs(angle_to_target) < 0.1: # < 6°,不需要转弯
|
||||
return
|
||||
|
||||
logger.info(f"初始转向: {math.degrees(angle_to_target):.1f}°")
|
||||
|
||||
# 分段旋转(避免一步到位导致超调)
|
||||
steps = max(3, int(abs(angle_to_target) / 0.2))
|
||||
step_angle = angle_to_target / steps
|
||||
step_time = abs(step_angle) / self.controller.max_angular_speed + 0.1
|
||||
|
||||
for _ in range(steps):
|
||||
if self._cancel_event.is_set():
|
||||
return
|
||||
angular = max(-self.controller.max_angular_speed,
|
||||
min(self.controller.max_angular_speed, step_angle * 2))
|
||||
self._publish_cmd_vel(0.0, angular)
|
||||
time.sleep(step_time)
|
||||
|
||||
self._stop_cmd_vel()
|
||||
time.sleep(0.2) # 稳定后继续
|
||||
|
||||
def stop(self):
|
||||
"""停止当前导航"""
|
||||
if self.status == NavStatus.NAVIGATING:
|
||||
self._cancel_event.set()
|
||||
self._stop_cmd_vel()
|
||||
if self._nav_thread and self._nav_thread.is_alive():
|
||||
self._nav_thread.join(timeout=3)
|
||||
self.status = NavStatus.CANCELLED
|
||||
|
||||
def get_status(self) -> dict:
|
||||
"""获取导航状态"""
|
||||
pos = self.get_odom()
|
||||
return {
|
||||
"status": self.status.value,
|
||||
"current_position": pos,
|
||||
"path_length": len(self.path_world),
|
||||
"path": self.path_world if self.status in (NavStatus.NAVIGATING, NavStatus.REACHED) else []
|
||||
}
|
||||
|
||||
def get_path_preview(self, goal_x: float, goal_y: float) -> Optional[List[Tuple[float, float]]]:
|
||||
"""
|
||||
预览路径(仅规划不执行),用于前端可视化
|
||||
|
||||
Returns:
|
||||
世界坐标路径列表,或 None(规划失败)
|
||||
"""
|
||||
if self.plan_path(goal_x, goal_y):
|
||||
return self.path_world
|
||||
return None
|
||||
Reference in New Issue
Block a user