📚 Introduction: Service-Based Architecture

Welcome! This tutorial demonstrates ROS 2 Task Management using Services instead of Topics. Services provide synchronous request-response communication, making them ideal for task management.

🎯 What is ROS 2 Service?

A service is described by a pair of messages: one for the request and one for the response. A client sends a request and waits for the server to process it and send back a response.

Client
Sends Request
Server
Processes Request
Client
Receives Response

📊 Topics vs Services Comparison

Feature Topics (Pub/Sub) Services (Request/Response)
Communication Asynchronous, continuous Synchronous, one-time
Response No direct response Guaranteed response
Use Case Sensor data, status updates Commands, requests, transactions
Connection Many-to-many One-to-one

🚀 System Architecture

Task Clients
Call add_task service → Get immediate task_id
↓ Service Call ↓
Task Server
Manages queue & executes tasks
↓ Controls ↓
Robot Hardware
Executes movement commands

✨ Key Features

  • Synchronous Communication: Client waits for immediate confirmation
  • Priority Queue: Higher priority tasks execute first
  • Task ID Assignment: Each task gets unique identifier
  • Status Checking: Query task status via service

📋 Prerequisites

  • ROS 2 Humble, Iron, or Jazzy installed
  • Python 3.8+
  • Understanding of ROS 2 nodes and services

🔧 Step 1: Environment Setup

Create Workspace & Package

# Create workspace
mkdir -p ~/task_service_ws/src
cd ~/task_service_ws/src

# Create package with service dependencies
ros2 pkg create --build-type ament_python task_service_management \
    --dependencies rclpy std_msgs geometry_msgs

Understanding Service Structure

ROS 2 services consist of:

  • Service Definition (.srv file): Defines request and response structure
  • Service Server: Handles incoming requests
  • Service Client: Sends requests and receives responses

Service Definition Format

# Service file structure (.srv)
# --- Request ---
field_type field_name
field_type field_name
---
# --- Response ---
field_type field_name
field_type field_name
💡 Example: AddTwoInts.srv
int64 a
int64 b
---
int64 sum

📨 Step 2: Create Custom Services

We'll create services for adding tasks and checking status.

Create Service Directory

cd ~/task_service_ws/src/task_service_management
mkdir srv

AddTask Service (srv/AddTask.srv)

Service to add a new task to the queue

# Request
string task_type
uint8 priority
string[] parameters
---
# Response
uint32 task_id
bool success
string message
Request Fields:
  • task_type: Type of task (move_forward, rotate, etc.)
  • priority: Task priority (0-10, higher = more urgent)
  • parameters: Task-specific parameters
Response Fields:
  • task_id: Unique identifier assigned to task
  • success: Whether task was queued successfully
  • message: Status message

GetTaskStatus Service (srv/GetTaskStatus.srv)

Service to query the status of a task

# Request
uint32 task_id
---
# Response
bool found
string status
string task_type
float64 execution_time

Update package.xml

Add service generation dependencies:

<buildtool_depend>rosidl_default_generators</buildtool_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>

📊 Step 3: Priority Queue Implementation

Same as before - manages task execution order by priority.

Create priority_queue.py

import heapq
from typing import List, Optional
from dataclasses import dataclass, field

@dataclass(order=True)
class PriorityTask:
    priority: int
    task_id: int = field(compare=False)
    task_type: str = field(compare=False)
    parameters: List[str] = field(compare=False, default_factory=list)
    status: str = field(default='queued', compare=False)
    execution_time: float = field(default=0.0, compare=False)

class TaskPriorityQueue:
    def __init__(self):
        self._queue = []
        self._tasks_dict = {}  # Store all tasks by ID

    def push(self, task_id: int, task_type: str, priority: int, parameters: List[str]):
        task = PriorityTask(-priority, task_id, task_type, parameters)
        heapq.heappush(self._queue, task)
        self._tasks_dict[task_id] = task

    def pop(self) -> Optional[PriorityTask]:
        if self._queue:
            task = heapq.heappop(self._queue)
            task.priority = -task.priority
            task.status = 'executing'
            return task
        return None

    def get_task_status(self, task_id: int) -> Optional[PriorityTask]:
        return self._tasks_dict.get(task_id)

    def update_task_status(self, task_id: int, status: str, execution_time: float = 0.0):
        if task_id in self._tasks_dict:
            self._tasks_dict[task_id].status = status
            self._tasks_dict[task_id].execution_time = execution_time

    def size(self) -> int:
        return len(self._queue)

    def is_empty(self) -> bool:
        return len(self._queue) == 0

⚙️ Step 4: Task Executor

Same executor as before - converts tasks to robot commands.

Create task_executor.py

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
import time
import math

class TaskExecutor(Node):
    def __init__(self):
        super().__init__('task_executor')
        self.cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
        self.get_logger().info('Task Executor initialized')

    def execute_task(self, task_type: str, parameters: list) -> tuple:
        start_time = time.time()
        success = False
        message = ""

        try:
            if task_type == "move_forward":
                success = self._move_forward(parameters)
                message = f"Moved forward {parameters[0]}m"

            elif task_type == "rotate":
                success = self._rotate(parameters)
                message = f"Rotated {parameters[0]} degrees"

            elif task_type == "stop":
                success = self._stop()
                message = "Robot stopped"

            else:
                message = f"Unknown task: {task_type}"

        except Exception as e:
            message = f"Error: {str(e)}"

        execution_time = time.time() - start_time
        return success, message, execution_time

    def _move_forward(self, parameters: list) -> bool:
        try:
            distance = float(parameters[0])
            speed = 0.2
            duration = distance / speed

            twist = Twist()
            twist.linear.x = speed

            start = time.time()
            while (time.time() - start) < duration:
                self.cmd_vel_pub.publish(twist)
                time.sleep(0.1)

            twist.linear.x = 0.0
            self.cmd_vel_pub.publish(twist)
            return True
        except:
            return False

    def _rotate(self, parameters: list) -> bool:
        try:
            angle = float(parameters[0])
            speed = 0.5
            duration = abs(math.radians(angle)) / speed

            twist = Twist()
            twist.angular.z = speed if angle > 0 else -speed

            start = time.time()
            while (time.time() - start) < duration:
                self.cmd_vel_pub.publish(twist)
                time.sleep(0.1)

            twist.angular.z = 0.0
            self.cmd_vel_pub.publish(twist)
            return True
        except:
            return False

    def _stop(self) -> bool:
        twist = Twist()
        for _ in range(5):
            self.cmd_vel_pub.publish(twist)
            time.sleep(0.1)
        return True

🖥️ Step 5: Task Server with Services

The server now uses services instead of topics for task management.

Create task_server.py

import rclpy
from rclpy.node import Node
import threading
from task_service_management.srv import AddTask, GetTaskStatus
from .priority_queue import TaskPriorityQueue
from .task_executor import TaskExecutor

class TaskServer(Node):
    def __init__(self):
        super().__init__('task_server')

        # Components
        self.task_queue = TaskPriorityQueue()
        self.executor = TaskExecutor()

        # Service Servers
        self.add_task_service = self.create_service(
            AddTask,
            'add_task',
            self.add_task_callback
        )

        self.get_status_service = self.create_service(
            GetTaskStatus,
            'get_task_status',
            self.get_status_callback
        )

        # State
        self.task_counter = 0
        self.is_executing = False

        # Processing timer
        self.timer = self.create_timer(0.5, self.process_tasks)

        self.get_logger().info('Task Server ready with services')

    def add_task_callback(self, request, response):
        """Handle add_task service request"""
        task_id = self.task_counter
        self.task_counter += 1

        try:
            # Add task to priority queue
            self.task_queue.push(
                task_id,
                request.task_type,
                request.priority,
                list(request.parameters)
            )

            response.task_id = task_id
            response.success = True
            response.message = f"Task queued with ID {task_id}"

            self.get_logger().info(
                f"Task {task_id} added: {request.task_type} (priority {request.priority})"
            )

        except Exception as e:
            response.task_id = 0
            response.success = False
            response.message = f"Error: {str(e)}"

        return response

    def get_status_callback(self, request, response):
        """Handle get_task_status service request"""
        task = self.task_queue.get_task_status(request.task_id)

        if task:
            response.found = True
            response.status = task.status
            response.task_type = task.task_type
            response.execution_time = task.execution_time
        else:
            response.found = False
            response.status = "not_found"
            response.task_type = ""
            response.execution_time = 0.0

        return response

    def process_tasks(self):
        """Process tasks from queue"""
        if self.is_executing or self.task_queue.is_empty():
            return

        task = self.task_queue.pop()
        if not task:
            return

        self.is_executing = True

        self.get_logger().info(f"Executing task {task.task_id}: {task.task_type}")

        # Execute in thread
        thread = threading.Thread(
            target=self._execute_task,
            args=(task,)
        )
        thread.start()

    def _execute_task(self, task):
        """Execute task and update status"""
        success, message, exec_time = self.executor.execute_task(
            task.task_type,
            task.parameters
        )

        status = 'completed' if success else 'failed'
        self.task_queue.update_task_status(task.task_id, status, exec_time)

        self.get_logger().info(
            f"Task {task.task_id} {status}: {message} ({exec_time:.2f}s)"
        )

        self.is_executing = False

def main(args=None):
    rclpy.init(args=args)
    server = TaskServer()
    rclpy.spin(server)
    server.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

🔑 Key Differences from Topic-Based

  • create_service(): Instead of create_subscription()
  • Callback returns response: Must return response object
  • Synchronous: Client waits for server response
  • Immediate feedback: Client gets task_id instantly

💻 Step 6: Task Client with Service Calls

Create task_client.py

import rclpy
from rclpy.node import Node
from task_service_management.srv import AddTask, GetTaskStatus
import time

class TaskClient(Node):
    def __init__(self):
        super().__init__('task_client')

        # Service clients
        self.add_task_client = self.create_client(AddTask, 'add_task')
        self.get_status_client = self.create_client(GetTaskStatus, 'get_task_status')

        # Wait for services
        while not self.add_task_client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Waiting for add_task service...')

        self.get_logger().info('Task Client ready')

    def send_task(self, task_type: str, priority: int, parameters: list):
        """Send task and wait for response"""
        request = AddTask.Request()
        request.task_type = task_type
        request.priority = priority
        request.parameters = parameters

        self.get_logger().info(f'Sending task: {task_type} (priority {priority})')

        # Call service synchronously
        future = self.add_task_client.call_async(request)
        rclpy.spin_until_future_complete(self, future)

        if future.result() is not None:
            response = future.result()
            if response.success:
                self.get_logger().info(
                    f'Task accepted! ID: {response.task_id} - {response.message}'
                )
                return response.task_id
            else:
                self.get_logger().error(f'Task rejected: {response.message}')
                return None
        else:
            self.get_logger().error('Service call failed')
            return None

    def check_task_status(self, task_id: int):
        """Check status of a task"""
        request = GetTaskStatus.Request()
        request.task_id = task_id

        future = self.get_status_client.call_async(request)
        rclpy.spin_until_future_complete(self, future)

        if future.result() is not None:
            response = future.result()
            if response.found:
                self.get_logger().info(
                    f'Task {task_id}: {response.status} - '
                    f'{response.task_type} ({response.execution_time:.2f}s)'
                )
            else:
                self.get_logger().warn(f'Task {task_id} not found')
        else:
            self.get_logger().error('Status check failed')

def main(args=None):
    rclpy.init(args=args)
    client = TaskClient()

    try:
        # Send tasks with different priorities
        task1 = client.send_task('move_forward', priority=3, parameters=['2.0'])
        task2 = client.send_task('rotate', priority=8, parameters=['90'])
        task3 = client.send_task('move_forward', priority=5, parameters=['1.5'])

        # Wait a bit
        time.sleep(2)

        # Check status
        if task1:
            client.check_task_status(task1)
        if task2:
            client.check_task_status(task2)

    except KeyboardInterrupt:
        pass
    finally:
        client.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

🔄 Service Call Flow

1. Client
Creates request
Calls service
2. Server
Receives request
Processes
Returns response
3. Client
Gets response
(task_id, status)

⚙️ Step 7: Package Configuration

Update setup.py

from setuptools import setup
import os
from glob import glob

package_name = 'task_service_management'

setup(
    name=package_name,
    version='0.0.1',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='Your Name',
    maintainer_email='your@email.com',
    description='ROS2 Task Management with Services',
    license='Apache-2.0',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'task_server = task_service_management.task_server:main',
            'task_client = task_service_management.task_client:main',
        ],
    },
)

Update package.xml

<?xml version="1.0"?>
<package format="3">
  <name>task_service_management</name>
  <version>0.0.1</version>
  <description>Task Management with ROS2 Services</description>
  <maintainer email="your@email.com">Your Name</maintainer>
  <license>Apache-2.0</license>

  <buildtool_depend>ament_cmake</buildtool_depend>
  <buildtool_depend>rosidl_default_generators</buildtool_depend>

  <depend>rclpy</depend>
  <depend>std_msgs</depend>
  <depend>geometry_msgs</depend>

  <exec_depend>rosidl_default_runtime</exec_depend>

  <member_of_group>rosidl_interface_packages</member_of_group>

  <export>
    <build_type>ament_python</build_type>
  </export>
</package>

Update CMakeLists.txt (if needed)

cmake_minimum_required(VERSION 3.5)
project(task_service_management)

find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)

# Generate services
rosidl_generate_interfaces(${PROJECT_NAME}
  "srv/AddTask.srv"
  "srv/GetTaskStatus.srv"
)

ament_package()

🔨 Step 8: Build and Test

Build the Package

cd ~/task_service_ws
colcon build --packages-select task_service_management
source install/setup.bash

Terminal 1: Run Server

ros2 run task_service_management task_server

Terminal 2: Run Client

ros2 run task_service_management task_client

Test Services Manually

# List available services
ros2 service list

# Call add_task service
ros2 service call /add_task task_service_management/srv/AddTask \
"{task_type: 'move_forward', priority: 5, parameters: ['2.0']}"

# Check task status
ros2 service call /get_task_status task_service_management/srv/GetTaskStatus \
"{task_id: 0}"

# View service type
ros2 service type /add_task
✅ Expected Output:
task_id: 0
success: True
message: "Task queued with ID 0"

🎉 Congratulations!

You've successfully built a ROS 2 Task Management System using Services!

✨ What You've Learned

  • ✅ Created custom ROS 2 service definitions (.srv files)
  • ✅ Implemented service server with request/response handling
  • ✅ Built service client with synchronous calls
  • ✅ Integrated priority queue for task management
  • ✅ Connected services to robot control logic

🎯 Service vs Topic Summary

Aspect Our Service Implementation Previous Topic Implementation
Task Submission Call add_task service → Get task_id Publish to topic → No response
Status Check Call get_task_status service Subscribe to status topic
Communication Synchronous, blocking Asynchronous, non-blocking
Feedback Immediate confirmation Eventually via separate topic

🚀 Next Steps

Enhance Your System:
  • Add CancelTask service
  • Implement GetQueueStatus service
  • Create ClearQueue service
  • Add task scheduling (execute at specific time)
  • Implement task dependencies
  • Add authentication/authorization

📚 Additional Resources