📚 Introduction: Service-Based Architecture
Welcome! This tutorial demonstrates ROS 2 Task Management using Services instead of Topics. Services provide synchronous request-response communication, making them ideal for task management.
🎯 What is ROS 2 Service?
A service is described by a pair of messages: one for the request and one for the response. A client sends a request and waits for the server to process it and send back a response.
Sends Request
Processes Request
Receives Response
📊 Topics vs Services Comparison
| Feature | Topics (Pub/Sub) | Services (Request/Response) |
|---|---|---|
| Communication | Asynchronous, continuous | Synchronous, one-time |
| Response | No direct response | Guaranteed response |
| Use Case | Sensor data, status updates | Commands, requests, transactions |
| Connection | Many-to-many | One-to-one |
🚀 System Architecture
Call add_task service → Get immediate task_id
Manages queue & executes tasks
Executes movement commands
✨ Key Features
- Synchronous Communication: Client waits for immediate confirmation
- Priority Queue: Higher priority tasks execute first
- Task ID Assignment: Each task gets unique identifier
- Status Checking: Query task status via service
📋 Prerequisites
- ROS 2 Humble, Iron, or Jazzy installed
- Python 3.8+
- Understanding of ROS 2 nodes and services
🔧 Step 1: Environment Setup
Create Workspace & Package
# Create workspace
mkdir -p ~/task_service_ws/src
cd ~/task_service_ws/src
# Create package with service dependencies
ros2 pkg create --build-type ament_python task_service_management \
--dependencies rclpy std_msgs geometry_msgs
Understanding Service Structure
ROS 2 services consist of:
- Service Definition (.srv file): Defines request and response structure
- Service Server: Handles incoming requests
- Service Client: Sends requests and receives responses
Service Definition Format
# Service file structure (.srv) # --- Request --- field_type field_name field_type field_name --- # --- Response --- field_type field_name field_type field_name
int64 a int64 b --- int64 sum
📨 Step 2: Create Custom Services
We'll create services for adding tasks and checking status.
Create Service Directory
cd ~/task_service_ws/src/task_service_management mkdir srv
AddTask Service (srv/AddTask.srv)
Service to add a new task to the queue
# Request string task_type uint8 priority string[] parameters --- # Response uint32 task_id bool success string message
task_type: Type of task (move_forward, rotate, etc.)priority: Task priority (0-10, higher = more urgent)parameters: Task-specific parameters
task_id: Unique identifier assigned to tasksuccess: Whether task was queued successfullymessage: Status message
GetTaskStatus Service (srv/GetTaskStatus.srv)
Service to query the status of a task
# Request uint32 task_id --- # Response bool found string status string task_type float64 execution_time
Update package.xml
Add service generation dependencies:
<buildtool_depend>rosidl_default_generators</buildtool_depend> <exec_depend>rosidl_default_runtime</exec_depend> <member_of_group>rosidl_interface_packages</member_of_group>
📊 Step 3: Priority Queue Implementation
Same as before - manages task execution order by priority.
Create priority_queue.py
import heapq
from typing import List, Optional
from dataclasses import dataclass, field
@dataclass(order=True)
class PriorityTask:
priority: int
task_id: int = field(compare=False)
task_type: str = field(compare=False)
parameters: List[str] = field(compare=False, default_factory=list)
status: str = field(default='queued', compare=False)
execution_time: float = field(default=0.0, compare=False)
class TaskPriorityQueue:
def __init__(self):
self._queue = []
self._tasks_dict = {} # Store all tasks by ID
def push(self, task_id: int, task_type: str, priority: int, parameters: List[str]):
task = PriorityTask(-priority, task_id, task_type, parameters)
heapq.heappush(self._queue, task)
self._tasks_dict[task_id] = task
def pop(self) -> Optional[PriorityTask]:
if self._queue:
task = heapq.heappop(self._queue)
task.priority = -task.priority
task.status = 'executing'
return task
return None
def get_task_status(self, task_id: int) -> Optional[PriorityTask]:
return self._tasks_dict.get(task_id)
def update_task_status(self, task_id: int, status: str, execution_time: float = 0.0):
if task_id in self._tasks_dict:
self._tasks_dict[task_id].status = status
self._tasks_dict[task_id].execution_time = execution_time
def size(self) -> int:
return len(self._queue)
def is_empty(self) -> bool:
return len(self._queue) == 0
⚙️ Step 4: Task Executor
Same executor as before - converts tasks to robot commands.
Create task_executor.py
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
import time
import math
class TaskExecutor(Node):
def __init__(self):
super().__init__('task_executor')
self.cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
self.get_logger().info('Task Executor initialized')
def execute_task(self, task_type: str, parameters: list) -> tuple:
start_time = time.time()
success = False
message = ""
try:
if task_type == "move_forward":
success = self._move_forward(parameters)
message = f"Moved forward {parameters[0]}m"
elif task_type == "rotate":
success = self._rotate(parameters)
message = f"Rotated {parameters[0]} degrees"
elif task_type == "stop":
success = self._stop()
message = "Robot stopped"
else:
message = f"Unknown task: {task_type}"
except Exception as e:
message = f"Error: {str(e)}"
execution_time = time.time() - start_time
return success, message, execution_time
def _move_forward(self, parameters: list) -> bool:
try:
distance = float(parameters[0])
speed = 0.2
duration = distance / speed
twist = Twist()
twist.linear.x = speed
start = time.time()
while (time.time() - start) < duration:
self.cmd_vel_pub.publish(twist)
time.sleep(0.1)
twist.linear.x = 0.0
self.cmd_vel_pub.publish(twist)
return True
except:
return False
def _rotate(self, parameters: list) -> bool:
try:
angle = float(parameters[0])
speed = 0.5
duration = abs(math.radians(angle)) / speed
twist = Twist()
twist.angular.z = speed if angle > 0 else -speed
start = time.time()
while (time.time() - start) < duration:
self.cmd_vel_pub.publish(twist)
time.sleep(0.1)
twist.angular.z = 0.0
self.cmd_vel_pub.publish(twist)
return True
except:
return False
def _stop(self) -> bool:
twist = Twist()
for _ in range(5):
self.cmd_vel_pub.publish(twist)
time.sleep(0.1)
return True
🖥️ Step 5: Task Server with Services
The server now uses services instead of topics for task management.
Create task_server.py
import rclpy
from rclpy.node import Node
import threading
from task_service_management.srv import AddTask, GetTaskStatus
from .priority_queue import TaskPriorityQueue
from .task_executor import TaskExecutor
class TaskServer(Node):
def __init__(self):
super().__init__('task_server')
# Components
self.task_queue = TaskPriorityQueue()
self.executor = TaskExecutor()
# Service Servers
self.add_task_service = self.create_service(
AddTask,
'add_task',
self.add_task_callback
)
self.get_status_service = self.create_service(
GetTaskStatus,
'get_task_status',
self.get_status_callback
)
# State
self.task_counter = 0
self.is_executing = False
# Processing timer
self.timer = self.create_timer(0.5, self.process_tasks)
self.get_logger().info('Task Server ready with services')
def add_task_callback(self, request, response):
"""Handle add_task service request"""
task_id = self.task_counter
self.task_counter += 1
try:
# Add task to priority queue
self.task_queue.push(
task_id,
request.task_type,
request.priority,
list(request.parameters)
)
response.task_id = task_id
response.success = True
response.message = f"Task queued with ID {task_id}"
self.get_logger().info(
f"Task {task_id} added: {request.task_type} (priority {request.priority})"
)
except Exception as e:
response.task_id = 0
response.success = False
response.message = f"Error: {str(e)}"
return response
def get_status_callback(self, request, response):
"""Handle get_task_status service request"""
task = self.task_queue.get_task_status(request.task_id)
if task:
response.found = True
response.status = task.status
response.task_type = task.task_type
response.execution_time = task.execution_time
else:
response.found = False
response.status = "not_found"
response.task_type = ""
response.execution_time = 0.0
return response
def process_tasks(self):
"""Process tasks from queue"""
if self.is_executing or self.task_queue.is_empty():
return
task = self.task_queue.pop()
if not task:
return
self.is_executing = True
self.get_logger().info(f"Executing task {task.task_id}: {task.task_type}")
# Execute in thread
thread = threading.Thread(
target=self._execute_task,
args=(task,)
)
thread.start()
def _execute_task(self, task):
"""Execute task and update status"""
success, message, exec_time = self.executor.execute_task(
task.task_type,
task.parameters
)
status = 'completed' if success else 'failed'
self.task_queue.update_task_status(task.task_id, status, exec_time)
self.get_logger().info(
f"Task {task.task_id} {status}: {message} ({exec_time:.2f}s)"
)
self.is_executing = False
def main(args=None):
rclpy.init(args=args)
server = TaskServer()
rclpy.spin(server)
server.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
🔑 Key Differences from Topic-Based
- create_service(): Instead of create_subscription()
- Callback returns response: Must return response object
- Synchronous: Client waits for server response
- Immediate feedback: Client gets task_id instantly
💻 Step 6: Task Client with Service Calls
Create task_client.py
import rclpy
from rclpy.node import Node
from task_service_management.srv import AddTask, GetTaskStatus
import time
class TaskClient(Node):
def __init__(self):
super().__init__('task_client')
# Service clients
self.add_task_client = self.create_client(AddTask, 'add_task')
self.get_status_client = self.create_client(GetTaskStatus, 'get_task_status')
# Wait for services
while not self.add_task_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Waiting for add_task service...')
self.get_logger().info('Task Client ready')
def send_task(self, task_type: str, priority: int, parameters: list):
"""Send task and wait for response"""
request = AddTask.Request()
request.task_type = task_type
request.priority = priority
request.parameters = parameters
self.get_logger().info(f'Sending task: {task_type} (priority {priority})')
# Call service synchronously
future = self.add_task_client.call_async(request)
rclpy.spin_until_future_complete(self, future)
if future.result() is not None:
response = future.result()
if response.success:
self.get_logger().info(
f'Task accepted! ID: {response.task_id} - {response.message}'
)
return response.task_id
else:
self.get_logger().error(f'Task rejected: {response.message}')
return None
else:
self.get_logger().error('Service call failed')
return None
def check_task_status(self, task_id: int):
"""Check status of a task"""
request = GetTaskStatus.Request()
request.task_id = task_id
future = self.get_status_client.call_async(request)
rclpy.spin_until_future_complete(self, future)
if future.result() is not None:
response = future.result()
if response.found:
self.get_logger().info(
f'Task {task_id}: {response.status} - '
f'{response.task_type} ({response.execution_time:.2f}s)'
)
else:
self.get_logger().warn(f'Task {task_id} not found')
else:
self.get_logger().error('Status check failed')
def main(args=None):
rclpy.init(args=args)
client = TaskClient()
try:
# Send tasks with different priorities
task1 = client.send_task('move_forward', priority=3, parameters=['2.0'])
task2 = client.send_task('rotate', priority=8, parameters=['90'])
task3 = client.send_task('move_forward', priority=5, parameters=['1.5'])
# Wait a bit
time.sleep(2)
# Check status
if task1:
client.check_task_status(task1)
if task2:
client.check_task_status(task2)
except KeyboardInterrupt:
pass
finally:
client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
🔄 Service Call Flow
Creates request
Calls service
Receives request
Processes
Returns response
Gets response
(task_id, status)
⚙️ Step 7: Package Configuration
Update setup.py
from setuptools import setup
import os
from glob import glob
package_name = 'task_service_management'
setup(
name=package_name,
version='0.0.1',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Your Name',
maintainer_email='your@email.com',
description='ROS2 Task Management with Services',
license='Apache-2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'task_server = task_service_management.task_server:main',
'task_client = task_service_management.task_client:main',
],
},
)
Update package.xml
<?xml version="1.0"?>
<package format="3">
<name>task_service_management</name>
<version>0.0.1</version>
<description>Task Management with ROS2 Services</description>
<maintainer email="your@email.com">Your Name</maintainer>
<license>Apache-2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_python</build_type>
</export>
</package>
Update CMakeLists.txt (if needed)
cmake_minimum_required(VERSION 3.5)
project(task_service_management)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
# Generate services
rosidl_generate_interfaces(${PROJECT_NAME}
"srv/AddTask.srv"
"srv/GetTaskStatus.srv"
)
ament_package()
🔨 Step 8: Build and Test
Build the Package
cd ~/task_service_ws colcon build --packages-select task_service_management source install/setup.bash
Terminal 1: Run Server
ros2 run task_service_management task_server
Terminal 2: Run Client
ros2 run task_service_management task_client
Test Services Manually
# List available services
ros2 service list
# Call add_task service
ros2 service call /add_task task_service_management/srv/AddTask \
"{task_type: 'move_forward', priority: 5, parameters: ['2.0']}"
# Check task status
ros2 service call /get_task_status task_service_management/srv/GetTaskStatus \
"{task_id: 0}"
# View service type
ros2 service type /add_task
task_id: 0 success: True message: "Task queued with ID 0"
🎉 Congratulations!
You've successfully built a ROS 2 Task Management System using Services!
✨ What You've Learned
- ✅ Created custom ROS 2 service definitions (.srv files)
- ✅ Implemented service server with request/response handling
- ✅ Built service client with synchronous calls
- ✅ Integrated priority queue for task management
- ✅ Connected services to robot control logic
🎯 Service vs Topic Summary
| Aspect | Our Service Implementation | Previous Topic Implementation |
|---|---|---|
| Task Submission | Call add_task service → Get task_id | Publish to topic → No response |
| Status Check | Call get_task_status service | Subscribe to status topic |
| Communication | Synchronous, blocking | Asynchronous, non-blocking |
| Feedback | Immediate confirmation | Eventually via separate topic |
🚀 Next Steps
- Add CancelTask service
- Implement GetQueueStatus service
- Create ClearQueue service
- Add task scheduling (execute at specific time)
- Implement task dependencies
- Add authentication/authorization