ROS2 for Physical AI: Building Real-Time Robot Pipelines
ROS2 is infrastructure. It doesn't make your robot smart — it handles the plumbing so you can focus on what does. After building the Unitree G1 teleoperation pipeline at the Intelligence at the Frontier hackathon and working on multi-sensor fusion for edge AI systems, here's what I wish I'd known going in.
Why ROS2 Over ROS1
The architectural differences matter for production:
| | ROS1 | ROS2 |
|---|---|---|
| Middleware | roscore (single point of failure) | DDS (distributed, no master) |
| Real-time | No RT guarantees | Can use RT executor + RT OS |
| Security | None | DDS Security (SROS2) |
| Languages | C++, Python | C++, Python, Rust, more |
| QoS | None | Configurable per topic |
For physical AI specifically, the DDS middleware change is critical. In ROS1, if roscore crashes, your entire robot loses communication. In ROS2, nodes discover each other peer-to-peer — a crashed node doesn't take down the system.
Core Concepts
Nodes — single-purpose processes. One node per sensor, one per controller, one per inference pipeline. Small + focused = debuggable.
Topics — typed pub/sub channels. Nodes publish messages; other nodes subscribe. Decoupled.
Services — synchronous request/response. Use for configuration, not control loops.
Actions — long-running tasks with feedback and cancellation. Use for "move arm to position" type operations.
Parameters — runtime-configurable node settings. Load from YAML, change without recompile.
QoS Profiles
Quality of Service settings are what separate toy ROS2 from production ROS2. Every publisher and subscriber has a QoS profile — mismatched profiles mean silent dropped connections.
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy, HistoryPolicy
# For sensor data — fast, lossy OK, don't buffer
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
durability=DurabilityPolicy.VOLATILE,
history=HistoryPolicy.KEEP_LAST,
depth=1,
)
# For state/config — reliable delivery required
reliable_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
For the G1 teleoperation pipeline, camera streams used BEST_EFFORT (drop frames rather than queue and lag) while control commands used RELIABLE (never drop a command silently).
A Real Sensor Fusion Node
Here's a stripped-down IMU + joint state fusion node:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Imu, JointState
from geometry_msgs.msg import TwistStamped
from message_filters import ApproximateTimeSynchronizer, Subscriber
class FusionNode(Node):
def __init__(self):
super().__init__('fusion_node')
# Declare parameters
self.declare_parameter('sync_slop', 0.02) # 20ms time sync tolerance
slop = self.get_parameter('sync_slop').value
qos = rclpy.qos.qos_profile_sensor_data
# Time-synchronized subscribers
imu_sub = Subscriber(self, Imu, '/imu/data', qos_profile=qos)
joint_sub = Subscriber(self, JointState, '/joint_states', qos_profile=qos)
self.sync = ApproximateTimeSynchronizer(
[imu_sub, joint_sub], queue_size=10, slop=slop
)
self.sync.registerCallback(self.fused_callback)
self.pub = self.create_publisher(TwistStamped, '/robot/fused_state', qos)
self.get_logger().info('Fusion node ready')
def fused_callback(self, imu_msg: Imu, joint_msg: JointState):
out = TwistStamped()
out.header.stamp = self.get_clock().now().to_msg()
# Combine IMU angular velocity with joint velocity estimates
out.twist.angular.x = imu_msg.angular_velocity.x
out.twist.angular.y = imu_msg.angular_velocity.y
out.twist.angular.z = imu_msg.angular_velocity.z
self.pub.publish(out)
def main():
rclpy.init()
node = FusionNode()
rclpy.spin(node)
rclpy.shutdown()
ApproximateTimeSynchronizer is essential when sensors publish at different rates or with jitter — it matches messages within the slop time window rather than requiring exact timestamp matches.
Lifecycle Nodes for Production
Standard nodes have two states: running or crashed. Lifecycle nodes have managed transitions:
Unconfigured → Inactive → Active → Finalized
This matters for hardware: you can configure a node (open device handles, load calibration) before activating it (start streaming data). You can deactivate without destroying (pause streaming, keep device open). Clean shutdown sequences become possible.
from rclpy.lifecycle import LifecycleNode, State, TransitionCallbackReturn
class CameraNode(LifecycleNode):
def on_configure(self, state: State) -> TransitionCallbackReturn:
self.camera = open_camera_device()
return TransitionCallbackReturn.SUCCESS
def on_activate(self, state: State) -> TransitionCallbackReturn:
self.timer = self.create_timer(0.033, self.capture_frame) # 30Hz
return TransitionCallbackReturn.SUCCESS
def on_deactivate(self, state: State) -> TransitionCallbackReturn:
self.timer.cancel()
return TransitionCallbackReturn.SUCCESS
def on_cleanup(self, state: State) -> TransitionCallbackReturn:
self.camera.close()
return TransitionCallbackReturn.SUCCESS
Real-Time Execution
ROS2's default executor is not real-time. For control loops with hard timing requirements, use the StaticSingleThreadedExecutor and pin it to an isolated CPU core:
import rclpy
from rclpy.executors import StaticSingleThreadedExecutor
rclpy.init()
executor = StaticSingleThreadedExecutor()
executor.add_node(control_node)
# In production: set thread affinity + SCHED_FIFO before this
executor.spin()
On Linux with PREEMPT_RT kernel:
# Set real-time priority for the executor thread
chrt -f 80 ./control_node_executable
# Isolate CPU core 3 from OS scheduler
# Add to kernel cmdline: isolcpus=3 rcu_nocbs=3
taskset -c 3 ./control_node_executable
For our G1 control pipeline, the low-latency NVIDIA Sonic integration required sub-10ms control loop timing — this required RT kernel + core isolation.
Launch Files
Don't start nodes manually. Use launch files:
# launch/robot.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
def generate_launch_description():
return LaunchDescription([
DeclareLaunchArgument('use_sim', default_value='false'),
Node(
package='my_robot',
executable='fusion_node',
name='fusion',
parameters=[{
'sync_slop': 0.02,
'use_sim_time': LaunchConfiguration('use_sim'),
}],
remappings=[('/imu/data', '/hardware/imu/raw')],
),
Node(
package='my_robot',
executable='control_node',
name='controller',
prefix='chrt -f 80', # RT priority
),
])
remappings let you wire topic names at launch without changing code — essential when integrating multiple hardware vendors with different topic naming conventions.
Integration with AI Inference
For physical AI, the inference pipeline is just another node:
class InferenceNode(Node):
def __init__(self):
super().__init__('inference')
# Load TensorRT engine once at startup
self.engine = load_trt_engine('policy.trt')
qos = rclpy.qos.qos_profile_sensor_data
self.sub = self.create_subscription(
Image, '/camera/rgb', self.infer_callback, qos
)
self.pub = self.create_publisher(
JointTrajectory, '/robot/target_joints', 10
)
def infer_callback(self, img_msg):
# Convert ROS image to tensor
frame = ros_image_to_numpy(img_msg)
# Run TensorRT inference
action = self.engine.infer(frame)
# Publish joint trajectory
traj = numpy_to_joint_trajectory(action)
self.pub.publish(traj)
The DDS transport handles the latency between camera capture and joint command publication. With BEST_EFFORT QoS on the camera topic and a fast TensorRT policy, end-to-end latency from pixel to joint command can stay under 20ms on Jetson Orin.
Bag Files for Debugging
ROS2 bag files record all topic data for replay:
# Record everything
ros2 bag record -a -o experiment_01
# Replay at original rate
ros2 bag play experiment_01
# Replay at 0.5x speed for debugging
ros2 bag play --rate 0.5 experiment_01
At the hackathon, bag files were invaluable for diagnosing failure modes in the G1 walking policy — we could replay the exact sensor data that caused a fall and trace through the inference outputs frame by frame.
What ROS2 Doesn't Solve
ROS2 handles communication. It doesn't handle:
- Timing guarantees without RT kernel + proper thread setup
- Safety — no built-in watchdogs or fault containment
- Sensor calibration — still manual, use robot_calibration or your own
- State estimation — use robot_localization or nav2 EKF plugin
Physical AI at the frontier is about closing the loop between perception and action. ROS2 gives you the plumbing to build that loop reliably. What runs inside the loop — the policies, the perception models, the planning — that's where the interesting engineering lives.