Introduction
Having recently uploaded your code to the Teensy/Arduino, Part 2 will show users how to run ROS nodes on the Raspberry Pi. This part will focus more on running ROS on a single Pheeno. If you want to see how to get ROS running with multiple Pheenos and a single computer acting as ROS master, please have a look here in Part 3.
Python Code
Lets take a look at a code example for a Pheeno to undergo a random walk with obstacle avoidance:
#!/usr/bin/env python
import rospy
import random
import argparse
from geometry_msgs.msg import Twist
from std_msgs.msg import Float32
def get_args():
""" Get arguments from rosrun for individual deployment. """
parser = argparse.ArgumentParser(
description="Obstacle avoidance python script."
)
# Required arguments
parser.add_argument("-n", "--number",
action="namespace",
required=False,
help="Add a pheeno number namespace.",
default="")
return parser.parse_args(rospy.myargv()[1:])
def random_turn():
""" Random turn direction.
Returns a random turn direction for the pheeno based on a uniform
distribution.
"""
if random.random() < 0.5:
turn_direction = -0.05 # Turn right
else:
turn_direction = 0.05 # Turn left
return turn_direction
def obstacle_check(msg, ir_location):
global is_obstacle_in_way
global sensor_triggered
global sensor_limits
if msg.data < sensor_limits[ir_location]:
sensor_triggered = ir_location
is_obstacle_in_way[ir_location] = True
else:
is_obstacle_in_way[ir_location] = False
if __name__ == "__main__":
# Get arguments from argument parser.
input_args = get_args()
if input_args.name is "":
pheeno_number = ""
else:
pheeno_number = "/pheeno_" + str(input_args.number)
# Initialize Node
rospy.init_node("pheeno_obstacle_avoidance")
# Create Publishers and Subscribers
pub = rospy.Publisher(
pheeno_number + "/cmd_vel", Twist, queue_size=100)
sub_ir_center = rospy.Subscriber(
pheeno_number + "/scan_center", Float32, obstacle_check,
callback_args="center")
sub_ir_back = rospy.Subscriber(
pheeno_number + "/scan_back", Float32, obstacle_check,
callback_args="back")
sub_ir_right = rospy.Subscriber(
pheeno_number + "/scan_right", Float32, obstacle_check,
callback_args="right")
sub_ir_left = rospy.Subscriber(
pheeno_number + "/scan_left", Float32, obstacle_check,
callback_args="left")
sub_ir_cr = rospy.Subscriber(
pheeno_number + "/scan_cr", Float32, obstacle_check,
callback_args="cr")
sub_ir_cl = rospy.Subscriber(
pheeno_number + "/scan_cl", Float32, obstacle_check,
callback_args="cl")
# Global Variables
global is_obstacle_in_way
global sensor_triggered
global sensor_limits
# Other important variables
is_obstacle_in_way = {"center": False, "cr": False, "right": False,
"back": False, "left": False, "cl": False}
sensor_triggered = 0
sensors = {"center": 0.05, "cr": -0.05, "right": -0.05,
"back": -0.05, "left": 0.05, "cl": 0.05}
sensor_limits = {"center": 25.0, "cr": 10.0, "right": 10.0,
"back": 15.0, "left": 10.0, "cl": 10.0}
cmd_vel_msg = Twist()
obs_cmd_vel_msg = Twist()
rate = rospy.Rate(2)
count = 0
while not rospy.is_shutdown():
if True in is_obstacle_in_way.values():
obs_cmd_vel_msg.linear.x = 0
obs_cmd_vel_msg.angular.z = sensors[sensor_triggered]
pub.publish(obs_cmd_vel_msg)
# Prevent a random turn into an object after trying to avoid.
count = 3
else:
if count is 0:
cmd_vel_msg.linear.x = 0
cmd_vel_msg.angular.z = random_turn()
pub.publish(cmd_vel_msg)
count += 1
elif count is 3:
cmd_vel_msg.linear.x = 0.05
cmd_vel_msg.angular.z = 0
pub.publish(cmd_vel_msg)
count += 1
elif count is 15:
count = 0
else:
pub.publish(cmd_vel_msg)
count += 1
rate.sleep()
Import Statements
import rospy
import random
import argparse
from geometry_msgs.msg import Twist
from std_msgs.msg import Float32
For any Python script to run with ROS, the first library you to import should be rospy
. It contains all the necessary methods, classes, and variables we will need to write our code. The next two are for this specific program. The last two libraries are specific to the messages we will use in our Publishers and Subscribers. For this example, we will be subscribing to sensor value topics that use a Float32
message type. Our cmd_vel
topic sends Twist
messages.
If you recall from Part 1, the message types we are importing are the same as those in the Arduino code example!
The Main Script
You may have noticed that we passed by the function definitions, but we promise we will get back to them! Here is where the meat of our python ROS script resides, and it will make understanding the function definitions easier when know when and where they are called!
Namespaces and Pheeno Numbering
if __name__ == "__main__":
# Get arguments from argument parser.
input_args = get_args()
if input_args is "":
pheeno_number = ""
else:
pheeno_number = "/pheeno_" + str(input_args.number)
The if
statement at the beginning is something that programmers should use often when writing Python code. For those who do not know why we use this statement, this link gives a good explanation for its use.
To begin, we start by taking in any arguments provided to use when the rosrun
command attempts to create a node from this script. The only argument this script allows is to number your Pheeno.
Why do we need to add a number? Well, it helps to differentiate the ROS topics for one Pheeno and another one. Say for example, we have two Pheenos. If we sent a Twist
message using the /cmd_vel
topic to move only Pheeno #1, both would actually move! This is because we never set a namespace for our device. There are more advanced ways to set up namespaces, but, for the remainder of this tutorial, we will just add a specific name when we want to publish or subscribe to topics related to a specific Pheeno. Like this!
pub = rospy.Publisher("/pheeno_55/cmd_vel", Twist, queue_size=100)
If you followed the guide here setting up your Pheeno, you will know that the Pheeno is given a network number, such as 192.168.119.55
by you. The first 3 sets of numbers are what a router uses to identify local connections, but the last number is used to identify unique devices. For example, if my phone is connected to the same router as the Pheeno, it may have the address 192.168.119.34
that is automatically (and randomly) assigned to it by the router. In our case, each Pheeno was given a static IP. This means that it won’t be given a randomly assigned number when it connects to the router, but it will continue to use the one we gave it!
When using multiple Pheenos, it is good practice to number the robot based on the IP number you gave it. For this code and the code available online, we use 55
as our numbered Pheeno. You can even do this with a single Pheeno too since you gave it a static IP upon setting the Raspberry Pi as well. The if
statement following the code, however, gives us an option! If you have a single Pheeno and have written all your Publishers and Subscribers in the Arduino code without a Pheeno namespace, then just leave the space after -n
blank.
Nodes, Publishers, and Subscribers
# Initialize Node
rospy.init_node("pheeno_obstacle_avoidance")
Here we see how to initialize the node. All it requires is a simple call to the init_node()
function within the rospy
library. All it requires is a String
name argument that will be our nodes name. If you have multiple Pheenos you do not need to add a namespace to these node names.
# Create Publishers and Subscribers
pub = rospy.Publisher(
pheeno_number + "/cmd_vel", Twist, queue_size=100)
sub_ir_center = rospy.Subscriber(
pheeno_number + "/scan_center", Float32, obstacle_check,
callback_args="center")
sub_ir_back = rospy.Subscriber(
pheeno_number + "/scan_back", Float32, obstacle_check,
callback_args="back")
sub_ir_right = rospy.Subscriber(
pheeno_number + "/scan_right", Float32, obstacle_check,
callback_args="right")
sub_ir_left = rospy.Subscriber(
pheeno_number + "/scan_left", Float32, obstacle_check,
callback_args="left")
sub_ir_cr = rospy.Subscriber(
pheeno_number + "/scan_cr", Float32, obstacle_check,
callback_args="cr")
sub_ir_cl = rospy.Subscriber(
pheeno_number + "/scan_cl", Float32, obstacle_check,
callback_args="cl")
Creating a Publisher and Subscriber is very similar to the way it is written in our Arduino code section. For the Publisher, we see that two arguments are required and an optional one is added for this program. The first is the name of the ROS topic, the second is the ROS message type, and the optional third is for the queue size. The queue_size
option establishes the amount of messages built up for asynchronous publishing.
The Subscribers are only a little different. They have three arguments and an optional one that we will use for just this code. The first is the ROS topic name, the second the the ROS message type, and the third is the callback function we wish to our program to use when a message arrives to the Subscriber. The optional argument is something I find very useful. It allows users to add extra arguments to the callback function. In our case, we are going to be having an extra argument that passes the location of the IR sensor. We will talk more about why we send back the IR scanner locations in the Function Definitions section later.
Global and Other Variables
# Global Variables
global is_obstacle_in_way
global sensor_triggered
global sensor_limits
# Other important variables
is_obstacle_in_way = {"center": False, "cr": False, "right": False,
"back": False, "left": False, "cl": False}
sensor_triggered = 0
sensors = {"center": 0.05, "cr": -0.05, "right": -0.05,
"back": -0.05, "left": 0.05, "cl": 0.05}
sensor_limits = {"center": 25.0, "cr": 10.0, "right": 10.0,
"back": 15.0, "left": 10.0, "cl": 10.0}
cmd_vel_msg = Twist()
obs_cmd_vel_msg = Twist()
rate = rospy.Rate(2)
count = 0
The following variables are created to help make the program run quicker and allows for easy changing. As you can see, I use four dictionary variables. Each key variable is a string containing the position of the sensor. In the Function Definitions and Loop sections, we will see how we will use these variables.
There is also one more variable I would like to address and it is the rate
variable that is given the rospy.Rate(2)
value. This corresponds to the publish rate for our node in Hz. At the end of the while
loop, which I will discuss later, a rate.sleep()
statement is used. This will put the node to sleep for the remainder of the time left for $2$ Hz. $2$ Hz is $0.5$ seconds, so if the while
loop finishes a single loop in $0.25$ seconds, the node will pause for the remainder of the time until it finishes reaches $0.5$ seconds.
Loop
while not rospy.is_shutdown():
if True in is_obstacle_in_way.values():
obs_cmd_vel_msg.linear.x = 0
obs_cmd_vel_msg.angular.z = sensors[sensor_triggered]
pub.publish(obs_cmd_vel_msg)
# Prevent a random turn into an object after trying to avoid.
count = 3
else:
if count is 0:
cmd_vel_msg.linear.x = 0
cmd_vel_msg.angular.z = random_turn()
pub.publish(cmd_vel_msg)
count += 1
elif count is 3:
cmd_vel_msg.linear.x = 0.05
cmd_vel_msg.angular.z = 0
pub.publish(cmd_vel_msg)
count += 1
elif count is 15:
count = 0
else:
pub.publish(cmd_vel_msg)
count += 1
rate.sleep()
This while
loop represents the repeated actions that our Pheeno will take. The condition for this loop is a default and should be used anytime you create a node using Python. All it does is stop the loop (and node) if ROS shuts down or the node is closed.
The loop starts by evaluating if an object is in the way. The variable it checks in the if
statement is a dictionary that stores if an object is detected at a certain distance from the Pheeno. Writing
if True in is_obstacle_in_way.values():
checks if any of the values in the dictionary are True
. If any of the values within the dictionary are True
, then the Pheeno must attempt to avoid the object. It does this by publishing to the /cmd_vel
topic where it wants to movie. Another dictionary (sensors
) is used to determine which direction and speed the Pheeno should turn. Notice how we make the linear.x
equal to 0. This just makes sure that the Pheeno will stop moving and just turn away from the object.
If there are no objects in the way, the if
statement will continue to the other portion. This portion controls the random walk using a count
variable. Since the publish rate ($2$ Hz), we know that every $0.5$ seconds a command is being published to the Pheeno. So every increase in the count
variable corresponds to about $0.5$ seconds occurring in real-time (this isn’t completely accurate, but it is a good guess).
When the count
variable is $0$, it sets a random direction for the Pheeno to turn based on a returned value from the random_turn()
function. As stated previously, the linear.x
is kept at $0$ to ensure the Pheeno will only turn in place. The Twist
message is then published to the /cmd_vel
topic. At $3$, we switch to linear motion. The angular.z
is set to $0$ and the linear.x
is then given a static velocity value. Finally, the count
variable is reset at $15$. If the count
lands on a number that wasn’t stated, it will continue to publish the current speeds on the /cmd_vel
topic as shown in the final else
statement.
Function Definitions
Three functions were made for this example. The first is for an argument parser, so we can skip that because python scripts do not always require arguments.
def random_turn():
""" Random turn direction.
Returns a random turn direction for the pheeno based on a uniform
distribution.
"""
if random.random() < 0.5:
turn_direction = -0.05 # Turn right
else:
turn_direction = 0.05 # Turn left
return turn_direction
This function will be used by our Pheeno to undergo a random walk. After being called, it will return an angular speed of either $0.05$ m/s or $-0.5$ m/s that is determined by a random number. This is where importing the random
library comes into play!
def obstacle_check(msg, ir_location):
global is_obstacle_in_way
global sensor_triggered
global sensor_limits
if msg.data < sensor_limits[ir_location]:
sensor_triggered = ir_location
is_obstacle_in_way[ir_location] = True
else:
is_obstacle_in_way[ir_location] = False
Each received message by our IR sensor subscribers will result in calling this callback function. msg
is the argument containing the ROS message received by the Subscriber while the ir_location
argument is what callback_args
option assigns. As stated in Part 1 of this guide, ROS messages wrap the actual message. To access the data in the message requires a dot operator to get to it as can be seen by my use of msg.data
.
A quick comparison is done to sensor limits that I have set within the sensor_limits
dictionary variable. If the IR sensor at that location gives a reading lower than our limit, it will log the IR sensor that was triggered and change its boolean value within the is_obstacle_in_way
dictionary variable. As you can see, the ir_location
argument provided to us by the callback_args
option is very useful, especially in our case with multiple Subscribers containing similar information.