Files
smart-inspection/scan_fixer/fix_scan_timestamp_v6.py
T
FaulknerWu 1429442dbd Rename customs-tablet-frontend to public-frontend and add new features
- Rename customs-tablet-frontend/ to public-frontend/ for broader scope
- Add new pages: customs, inspection with camera integration
- Add new services: apiClient.ts, backendApi.ts, normalizers.ts
- Add CameraFrame component for real-time video streaming
- Add scan_fixer module with clock_publisher and timestamp fix utilities
- Update startup scripts to support new frontend structure
- Update arm_server configuration and service files

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-22 10:18:20 +08:00

86 lines
2.8 KiB
Python

#!/usr/bin/env python3
"""激光雷达时间戳动态修正器。
订阅 /scan,将 LaserScan 的 header.stamp 替换为当前 ROS 时间后发布到
/scan_corrected,保证 AMCL/Costmap 收到的雷达时间戳和 TF 时间同步。
"""
import os
import sys
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
LOCKFILE = "/tmp/scan_fixer.lock"
LOG_INTERVAL_COUNT = 200
def ensure_single_instance(lockfile: str) -> None:
if os.path.exists(lockfile):
with open(lockfile) as f:
old_pid = int(f.read().strip())
try:
os.kill(old_pid, 0)
print(f"Another fixer running PID {old_pid}, exit.", file=sys.stderr)
sys.exit(1)
except (OSError, ProcessLookupError):
print(f"Stale lock removed (PID {old_pid} dead)", file=sys.stderr)
with open(lockfile, "w") as f:
f.write(str(os.getpid()))
def copy_scan_with_current_time(source_scan: LaserScan, node: Node) -> LaserScan:
corrected_scan = LaserScan()
corrected_scan.header.frame_id = source_scan.header.frame_id
corrected_scan.header.stamp = node.get_clock().now().to_msg()
corrected_scan.angle_min = source_scan.angle_min
corrected_scan.angle_max = source_scan.angle_max
corrected_scan.angle_increment = source_scan.angle_increment
corrected_scan.time_increment = source_scan.time_increment
corrected_scan.scan_time = source_scan.scan_time
corrected_scan.range_min = source_scan.range_min
corrected_scan.range_max = source_scan.range_max
corrected_scan.ranges = source_scan.ranges
corrected_scan.intensities = source_scan.intensities
return corrected_scan
def main():
ensure_single_instance(LOCKFILE)
rclpy.init(args=sys.argv[1:])
node = Node("scan_timestamp_fixer")
publisher = node.create_publisher(LaserScan, "/scan_corrected", 10)
count = 0
first_timestamp = None
def on_scan(scan: LaserScan):
nonlocal count, first_timestamp
count += 1
if first_timestamp is None:
first_timestamp = scan.header.stamp.sec
node.get_logger().info(f"First /scan timestamp: {first_timestamp}")
publisher.publish(copy_scan_with_current_time(scan, node))
if count % LOG_INTERVAL_COUNT == 0:
node.get_logger().info(f"#{count} republished with current time")
node.create_subscription(LaserScan, "/scan", on_scan, 10)
node.get_logger().info(f"Fixer v6 PID={os.getpid()}, using current system time")
try:
while rclpy.ok():
rclpy.spin_once(node, timeout_sec=0.1)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if os.path.exists(LOCKFILE):
os.remove(LOCKFILE)
if __name__ == "__main__":
main()