Topic Communication and Data Streaming
Dr. Abdulkarim Albanna
Artificial Intelligence
University of Petra
3 hours (Week 3 of 16)
By the end of this week, students will be able to:
A node in ROS2 is a process representing the smallest program unit within the framework. Each node runs in its own independent memory space and is designed for a specific purpose, such as controlling a motor, capturing images from a camera, or performing path planning. Nodes communicate with one another by exchanging messages, facilitating collaborative functionality within the robotic system.
The publish-subscribe (pub-sub) pattern is a messaging paradigm where:
ROS2 supports different communication patterns between nodes. Understanding these patterns is crucial for designing efficient robotic systems.
One publisher sends data to exactly one subscriber.
Use Case: Direct sensor-to-controller communication, dedicated data streams.
One publisher broadcasts data to multiple subscribers.
Use Case: Broadcasting sensor data, system status updates, camera feeds.
Multiple publishers send data to one subscriber (data aggregation).
Use Case: Sensor fusion, multi-robot coordination, distributed data collection.
Multiple publishers and multiple subscribers share the same topic.
Use Case: Multi-robot systems, distributed logging, collaborative tasks.
| Pattern | Publishers | Subscribers | Best Use Case | 
|---|---|---|---|
| 1:1 | 1 | 1 | Direct dedicated communication | 
| 1:M | 1 | Many | Broadcasting data to multiple nodes | 
| M:1 | Many | 1 | Data aggregation and fusion | 
| M:M | Many | Many | Complex distributed systems | 
| Concept | Description | 
|---|---|
| Topic | Named channel for messages | 
| Message | Data structure sent over topic | 
| Publisher | Node that sends messages | 
| Subscriber | Node that receives messages | 
| Callback | Function called when message arrives | 
| Queue Size | Number of messages buffered | 
ROS2 provides several standard message packages:
std_msgs: Basic data types (Int, Float, String, Bool)geometry_msgs: Geometric data (Pose, Twist, Transform)sensor_msgs: Sensor data (Image, LaserScan, Imu, Temperature)nav_msgs: Navigation data (Odometry, Path, OccupancyGrid)| Package | Message | Purpose | 
|---|---|---|
| std_msgs | String | Text messages | 
| std_msgs | Int32 | Integer values | 
| std_msgs | Float32 | Floating point values | 
| std_msgs | Bool | Boolean values | 
| geometry_msgs | Twist | Velocity commands (linear & angular) | 
| geometry_msgs | Pose | Position and orientation | 
| sensor_msgs | Temperature | Temperature readings | 
| sensor_msgs | Imu | IMU sensor data | 
| sensor_msgs | Image | Camera images | 
# Example: geometry_msgs/Twist
# Used for velocity commands
geometry_msgs/Vector3 linear
  float64 x  # Forward/backward velocity
  float64 y  # Left/right velocity
  float64 z  # Up/down velocity
geometry_msgs/Vector3 angular
  float64 x  # Roll rate
  float64 y  # Pitch rate
  float64 z  # Yaw rate (turning)
                    A publisher node sends messages to a topic at regular or event-driven intervals.
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Temperature
import random
class TemperaturePublisher(Node):
    def __init__(self):
        super().__init__('temperature_publisher')
        # Create publisher
        self.publisher_ = self.create_publisher(
            Temperature,        # Message type
            'temperature_data', # Topic name
            10                  # Queue size
        )
        # Create timer (publish every 1 second)
        self.timer_ = self.create_timer(1.0, self.publish_temperature)
        self.get_logger().info('Temperature Publisher started')
    def publish_temperature(self):
        # Create message
        msg = Temperature()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.header.frame_id = 'temperature_sensor'
        msg.temperature = 20.0 + random.uniform(-5.0, 5.0)  # Simulate sensor
        msg.variance = 0.5
        # Publish message
        self.publisher_.publish(msg)
        self.get_logger().info(f'Published temperature: {msg.temperature:.2f} C')
def main(args=None):
    rclpy.init(args=args)
    node = TemperaturePublisher()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()
if __name__ == '__main__':
    main()
                    self.publisher_ = self.create_publisher(
    Temperature,        # Message type
    'temperature_data', # Topic name
    10                  # Queue size (QoS)
)
                    Parameters:
msg_type: The message class to publishtopic: Name of the topic (string)qos_profile: Queue size or QoS settingsself.timer_ = self.create_timer(
    1.0,                        # Period in seconds
    self.publish_temperature    # Callback function
)
                    # Create message instance
msg = Temperature()
# Fill message fields
msg.temperature = 25.5
msg.variance = 0.5
# Publish
self.publisher_.publish(msg)
                    A subscriber node receives messages from a topic and processes them in a callback function.
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Temperature
class TemperatureSubscriber(Node):
    def __init__(self):
        super().__init__('temperature_subscriber')
        # Create subscriber
        self.subscription_ = self.create_subscription(
            Temperature,              # Message type
            'temperature_data',       # Topic name
            self.temperature_callback,  # Callback function
            10                        # Queue size
        )
        self.get_logger().info('Temperature Subscriber started')
    def temperature_callback(self, msg):
        """Called when a message is received"""
        temp = msg.temperature
        self.get_logger().info(f'Received temperature: {temp:.2f} C')
        # Process data
        if temp > 30.0:
            self.get_logger().warn('High temperature detected!')
        elif temp < 10.0:
            self.get_logger().warn('Low temperature detected!')
def main(args=None):
    rclpy.init(args=args)
    node = TemperatureSubscriber()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()
if __name__ == '__main__':
    main()
                    self.subscription_ = self.create_subscription(
    Temperature,              # Message type
    'temperature_data',       # Topic name
    self.temperature_callback,  # Callback function
    10                        # Queue size
)
                    def temperature_callback(self, msg):
    """Automatically called when message arrives"""
    # Access message data
    temperature = msg.temperature
    variance = msg.variance
    # Process the data
    self.get_logger().info(f'Temperature: {temperature}')
                    time.sleep()Quality of Service (QoS) policies control how messages are delivered between publishers and subscribers.
| Policy | Description | 
|---|---|
| Reliability | Reliable (guaranteed) or Best Effort | 
| Durability | Transient Local (keep last) or Volatile | 
| History | Keep Last N or Keep All | 
| Depth | Queue size (number of messages) | 
| Lifespan | How long messages are valid | 
| Deadline | Maximum time between messages | 
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
# Custom QoS
qos_profile = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    history=HistoryPolicy.KEEP_LAST,
    depth=10
)
# Use with publisher
self.publisher_ = self.create_publisher(
    Temperature,
    'temperature',
    qos_profile
)
# Predefined profiles
from rclpy.qos import qos_profile_sensor_data
from rclpy.qos import qos_profile_system_default
self.sensor_pub_ = self.create_publisher(
    Temperature,
    'temperature',
    qos_profile_sensor_data
)
                    # List all topics
ros2 topic list
# Show topic information
ros2 topic info /temperature
# Echo topic messages
ros2 topic echo /temperature
# Show message type
ros2 topic type /temperature
# Publish from command line
ros2 topic pub /cmd_vel geometry_msgs/msg/Twist \
  "{linear: {x: 1.0}, angular: {z: 0.5}}"
# Check publishing rate
ros2 topic hz /temperature
# Bandwidth usage
ros2 topic bw /camera/image_raw
# Find topics by type
ros2 topic find sensor_msgs/msg/Temperature
                    Problem: Subscriber doesn't receive messages
Solutions:
ros2 node listProblem: Messages not received despite matching topics
Solution:
ros2 topic info /topic_name to check QoS/robot/sensors/temperaturetime.sleep() in callbacksclass RobotNode(Node):
    def __init__(self):
        super().__init__('robot_node')
        # Publishers
        self._setup_publishers()
        # Subscribers
        self._setup_subscribers()
        # Timers
        self._setup_timers()
        # State variables
        self._initialize_state()
        self.get_logger().info('Node initialized')
    def _setup_publishers(self):
        """Initialize all publishers"""
        self.cmd_pub_ = self.create_publisher(...)
    def _setup_subscribers(self):
        """Initialize all subscribers"""
        self.sensor_sub_ = self.create_subscription(...)
    def _setup_timers(self):
        """Initialize all timers"""
        self.timer_ = self.create_timer(...)
    def _initialize_state(self):
        """Initialize state variables"""
        self.robot_state_ = "idle"
                    Create a complete sensor monitoring and fusion system for a simulated robot.
Package Name: sensor_fusion_system
sensor_publisher.py)
                            sensor_publisher.py)
                                data_logger.py)
                                alert_system.py)
                                sensor_fusion.py)
                                | Component | Points | 
|---|---|
| Sensor Publisher Node | 20 | 
| Data Logger Node | 20 | 
| Alert System Node | 20 | 
| Fusion Node | 20 | 
| Code Quality & Documentation | 10 | 
| Testing & Demonstration | 10 | 
| Bonus Challenges | +10 | 
| Total | 100 (+10) | 
Complete Week 3 project and review callback functions
This week covered topic-based communication in ROS2, including publishers, subscribers, message types, and data fusion concepts. We implemented practical examples based on the Computer Vision Pipeline and Navigation System architectures.