Implementation Guide#
This page describes the recommended implementation order, how to divide work between two team members, and key patterns to follow. Read the full Requirements first.
Recommended Implementation Order#
The project has dependencies between components. Follow this order to avoid blocking your partner and to enable testing at each step.
Phase 1: Foundation (Days 1-3)
Step |
Task |
Details |
|---|---|---|
1 |
Create package skeletons |
Create |
2 |
Define service interfaces |
Create |
3 |
Implement service servers |
Write |
4 |
Implement |
Plain Python class. Test it with a simple script that calls its methods. No ROS 2 needed. |
5 |
Create |
Define zones, base station, and the BT tick rate. Reference:
|
6 |
Build the map |
Run |
Phase 2: BT Leaf Nodes (Days 4-7)
Build and test nodes incrementally. Each node can be tested in isolation before assembling the full tree.
Step |
Task |
Details |
|---|---|---|
7 |
Implement |
The hardest node. Uses |
8 |
Implement |
Nearly identical to |
9 |
Implement simple action nodes |
|
10 |
Implement condition nodes |
|
11 |
Implement |
Calls the |
12 |
Implement |
Creates a |
13 |
Implement |
Calls the |
Phase 3: Integration (Days 8-11)
Step |
Task |
Details |
|---|---|---|
14 |
Assemble the full BT |
Write |
15 |
Write the launch file |
Bring up Nav2 (via |
16 |
End-to-end testing |
Launch Gazebo, then your mission launch file. Verify: all
zones visited, survivors detected, TF frames broadcast,
report service called, robot returns to base. Use
|
Phase 4: Polish (Days 12-14)
Step |
Task |
Details |
|---|---|---|
17 |
Test edge cases |
Survivor present at multiple zones, no survivor at others
(verify |
18 |
Code quality |
Add docstrings, type hints, inline comments. Run Ruff. |
19 |
Write README.md |
Contributions section + BT design explanation ( |
20 |
Package and submit |
Follow the Submission checklist. |
Suggested Work Division#
For a team of two, the work divides naturally into infrastructure and behavior tree roles. Both team members should understand the full system, but each focuses on their area.
Role |
Student A: Infrastructure |
Student B: Behavior Tree |
|---|---|---|
Phase 1 |
Create both package skeletons and the CMake interfaces
package. Write |
Write |
Phase 2 |
Write |
Write |
Phase 3 |
Write the launch file. Help with integration testing. |
Assemble the full BT in the entry point script. Integrate with Nav2. |
Phase 4 |
Docstrings and type hints on infrastructure code. Write README contributions section. |
Docstrings and type hints on BT code. Write README BT design section. Test edge cases. |
Key Patterns and Pitfalls#
Service Calls in BT Nodes#
For DetectSurvivor and NotifyBase, you need to call a
service from within update(). Do not use
rclpy.spin_until_future_complete() here – it raises
RuntimeError: Executor is already spinning.
Important
py_trees_ros.trees.BehaviourTree.tick_tock() schedules ticks on
a timer that runs inside the executor that is already spinning
tree.node (via rclpy.spin(tree.node) in your entry point).
rclpy.spin_until_future_complete() tries to enter that same
executor a second time, which is illegal. The classic
“block-until-done” service idiom from a standalone client therefore
does not work inside a BT leaf.
The fix is the canonical asynchronous-BT pattern: submit
call_async once, yield RUNNING until the future is done, then
process the response on the tick when it resolves. The future is
resolved by the same executor that drives ticks, so the BT just has
to hand control back until future.done() is True.
def initialise(self):
# Reset cached state for a fresh active period.
self._future = None
self._response = None
def update(self):
# First tick: submit the request, then yield.
if self._future is None:
if not self._client.service_is_ready():
# Don't block; retry on the next tick.
return py_trees.common.Status.RUNNING
request = MyService.Request()
# ... populate request ...
self._future = self._client.call_async(request)
return py_trees.common.Status.RUNNING
# Subsequent ticks: poll the future.
if not self._future.done():
return py_trees.common.Status.RUNNING
response = self._future.result()
self._future = None # ready for the next active period
if response is None:
return py_trees.common.Status.FAILURE
# ... process response, store any results on self ...
return py_trees.common.Status.SUCCESS
Why this works:
service_is_ready()is a non-blocking check (unlikewait_for_servicewith a timeout, which can stall the tick).call_asyncreturns immediately; the executor that is already spinningtree.nodewill deliver the response to the future whenever it arrives.Returning
RUNNINGhands control back to py_trees, which resumes ticking on the next timer fire. The future is checked again then.Resetting
self._future = Noneafter consuming the response ensures a follow-up active period (e.g., re-entering the same zone) submits a fresh request.
TF Broadcasting#
BroadcastSurvivorTF uses tf2_ros.StaticTransformBroadcaster
to publish a frame. The key fields in TransformStamped:
from tf2_ros import StaticTransformBroadcaster
from geometry_msgs.msg import TransformStamped
# In setup():
self._broadcaster = StaticTransformBroadcaster(kwargs['node'])
# In update():
t = TransformStamped()
t.header.stamp = self._node.get_clock().now().to_msg()
t.header.frame_id = "map" # parent frame
t.child_frame_id = "survivor_1" # new frame name
t.transform.translation.x = sx # survivor x
t.transform.translation.y = sy # survivor y
t.transform.rotation.w = 1.0 # identity rotation
self._broadcaster.sendTransform(t)
Verify with: ros2 run tf2_ros tf2_echo map survivor_1
Memory Flag Choices#
Root Selector (
memory=False): Reactive. The Selector re-evaluates the Patrol Sequence on every tick. Once Patrol fails (no zones remaining), it falls through toNavigateToBase.Patrol Sequence (
memory=True): Resuming. Once the robot reaches a zone and starts detection, the tree does not restart navigation on the next tick.HandleDetection Selector (
memory=False): The detection result is checked fresh each time.SurvivorFound Sequence (
memory=True): Resuming.NotifyBasecallsreport_survivorvia the async-poll pattern (call_async+future.done()) and returnsRUNNINGfor one or more ticks before succeeding. Withmemory=False, everyRUNNINGtick would re-tick the earlier siblings – and re-tickingBroadcastSurvivorTFcallszone_manager.next_survivor_id()again, allocating a freshsurvivor_NID and broadcasting a duplicate static TF frame.memory=Truemakes the Sequence resume from the running child without re-evaluating earlier siblings.
Reading Auto-Declared YAML Parameters#
The Parameter File uses a nested layout
(zones.<id>.{x, y, yaw}) that you cannot enumerate up front
because the per-zone keys are data, not code. The clean approach is
to have ROS auto-declare every key in the YAML by passing two flags
when constructing the parameter-reading node:
from rclpy.node import Node
param_node = Node(
"search_and_rescue_params",
automatically_declare_parameters_from_overrides=True,
allow_undeclared_parameters=True,
)
The first flag declares every parameter loaded from
mission_params.yaml (so zones.zone_a.x,
base_station.yaw, tick_rate_hz all become readable
without explicit declare_parameter calls). The second tolerates
get_parameter calls for keys that might not be present in
the YAML.
Warning
With automatically_declare_parameters_from_overrides=True,
calling declare_parameter("foo", default) for a name
already declared by the YAML raises:
rclpy.exceptions.ParameterAlreadyDeclaredException:
Parameter(s) already declared: ['foo']
This is the single most common cause of “my BT script crashes
silently before printing anything” in this assignment. Guard
every explicit ``declare_parameter`` call with
has_parameter so the explicit declaration acts as a fallback
default rather than a re-declaration:
if not param_node.has_parameter("tick_rate_hz"):
param_node.declare_parameter("tick_rate_hz", 2.0)
tick_rate_hz = param_node.get_parameter("tick_rate_hz").value
Apply this idiom anywhere you need to provide a default for a
parameter that the YAML might also supply (zone_order,
base_station.x/y/yaw, tick_rate_hz).
Reading the zone list back into the natural list[dict] shape
ZoneManager expects:
if not param_node.has_parameter("zone_order"):
param_node.declare_parameter("zone_order", [""])
zone_order = (
param_node.get_parameter("zone_order")
.get_parameter_value()
.string_array_value
)
zones = []
for zone_id in zone_order:
zones.append({
"id": zone_id,
"x": param_node.get_parameter(f"zones.{zone_id}.x").value,
"y": param_node.get_parameter(f"zones.{zone_id}.y").value,
"yaw": param_node.get_parameter(f"zones.{zone_id}.yaw").value,
})
Testing Tips#
Test incrementally. Do not try to assemble the full tree before testing individual nodes.
# Test service servers independently
ros2 run group<N>_final detect_survivor_server_exe &
ros2 service call /detect_survivor \
group<N>_final_interfaces/srv/DetectSurvivor \
"{zone_id: 'zone_a'}"
# Test a minimal BT with just NavigateToZone
# (requires Gazebo + Nav2 running)
# Verify TF frames after a full run
ros2 run tf2_ros tf2_echo map survivor_1
ros2 run tf2_ros tf2_echo map survivor_2
Launch order for full testing:
# Terminal 1: Gazebo
ros2 launch rosbot_gazebo final_project_world.launch.py
# Terminal 2: Mission (this single launch file brings up Nav2
# with your saved map under
# group<N>_final/maps/final_project_map.yaml, the two service
# servers, and the BT node)
ros2 launch group<N>_final search_and_rescue.launch.py
Tip
Killing hanging processes between iterations. Ctrl-C in
the launch terminal sometimes leaves Nav2 lifecycle nodes,
rviz2, or the BT process running in the background. They
then re-bind to action servers / TF / topics on the next
ros2 launch and you get bizarre symptoms (duplicate logs,
“node already exists”, AMCL refusing the new
/initialpose). One-liner to clean up:
pkill -9 -f "nav2|amcl|map_server|lifecycle|controller_server|planner_server|bt_navigator|behavior_server|smoother_server|route_server|opennav|collision_monitor|velocity_smoother|waypoint_follower|rviz2"
Verify the cleanup with ros2 node list – it should print
nothing (or only what Gazebo brings up). Then re-launch.