Reference Implementation: group0_final#

This page is an API reference for the reference implementation in ~/enpm605_ws/src/final_project/group0_final/. It is provided as an example of:

  • The level of docstring detail expected for the final project (Google-style, parsed by napoleon).

  • How to organize a ROS 2 Python package that combines a behavior tree (py_trees / py_trees_ros), Nav2 action clients, custom services, TF broadcasting, and YAML-driven parameters.

  • The recommended module layout that separates pure mission state (ZoneManager), behavior-tree leaves (bt_nodes/), service servers (service_servers/), and the entry-point script (scripts/).

Use this as a style and structure reference, not a copy-paste template. Your own package will share the same shape but should be written in your own words.

Note

The behavior tree assembled by the entry point is:

Selector "Mission" (memory=False)              <- root
|-- Sequence "Patrol" (memory=True)
|   |-- ZonesRemaining?                  (condition)
|   |-- NavigateToZone                   (action)
|   |-- DetectSurvivor                   (action)
|   |-- Selector "HandleDetection" (memory=False)
|   |   |-- Sequence "SurvivorFound" (memory=False)
|   |   |   |-- IsSurvivorDetected?      (condition)
|   |   |   |-- BroadcastSurvivorTF      (action)
|   |   |   |-- NotifyBase               (action)
|   |   |-- LogNoDetection               (action)
|   |-- AdvanceZone                      (action)
|-- NavigateToBase                       (action)

Mission State#

Mission-state container shared across behavior-tree nodes.

The ZoneManager is a plain Python class – not a Node and not a Behaviour – that owns the ordered list of search zones and the base-station pose. The single instance is injected into every BT node that needs zone information, which lets the tree share state without the py_trees blackboard.

class group0_final.zone_manager.ZoneManager(zones, base_station)#

Bases: object

Track the active search zone, base-station pose, and survivor IDs.

_zones#

Ordered list of zone dictionaries. Each dictionary must contain the keys id (str), x (float), y (float), and yaw (float, radians).

_base_station#

Dictionary with keys x, y, yaw for the return-to-base pose.

_index#

Index of the current zone within _zones. When _index >= len(_zones), no zones remain.

_survivor_count#

Number of survivors discovered so far. Used to generate unique TF child-frame and report IDs (survivor_1, survivor_2, …).

advance()#

Move the cursor to the next zone in the patrol list.

Calling advance when no zones remain is a no-op.

Return type:

None

base_pose()#

Return the base-station pose dict.

Return type:

Dict[str, float]

Returns:

A copy of the {x, y, yaw} base pose. Copying prevents BT nodes from accidentally mutating the shared dictionary.

current_index()#

Return the 0-based index of the current zone.

Return type:

int

Returns:

The index of the zone returned by current_zone. When has_remaining is False this equals total_zones.

current_zone()#

Return the dict of the zone currently being patrolled.

Return type:

Dict[str, float]

Returns:

The current zone’s {id, x, y, yaw} dictionary.

Raises:

IndexError – If the mission has already advanced past the last zone (callers should gate this with has_remaining first).

has_remaining()#

Report whether at least one unvisited zone is still queued.

Return type:

bool

Returns:

True while current_zone is safe to call; False once every zone has been advanced past.

next_survivor_id()#

Allocate and return a unique survivor identifier.

Increments an internal counter so successive calls return "survivor_1", "survivor_2", … – one ID per discovery.

Return type:

str

Returns:

The newly allocated survivor ID string.

total_zones()#

Return the total number of zones in the patrol list.

Return type:

int

Returns:

Length of the underlying zone list.

Behavior Tree Leaves#

Actions#

Action behaviours for the search-and-rescue tree.

This module collects every action leaf node used by the mission tree:

  • NavigateToZone / NavigateToBase – async NavigateToPose action clients that are non-blocking inside update().

  • DetectSurvivor – synchronous detect_survivor service call that caches the result for downstream nodes.

  • BroadcastSurvivorTF – publishes a static map -> survivor_N TF frame using tf2_ros.StaticTransformBroadcaster.

  • NotifyBase – synchronous report_survivor service call.

  • AdvanceZone / LogNoDetection – bookkeeping helpers.

class group0_final.bt_nodes.actions.AdvanceZone(name, zone_manager)#

Bases: Behaviour

Move the ZoneManager cursor to the next zone.

_zone_manager#

Shared ZoneManager instance.

update()#

Advance the zone cursor unconditionally.

Return type:

Status

Returns:

Always SUCCESS.

class group0_final.bt_nodes.actions.BroadcastSurvivorTF(name, detect_node, zone_manager)#

Bases: Behaviour

Publish a static map -> survivor_N TF frame.

The static broadcaster latches the transform on /tf_static so the frame stays valid for the rest of the mission even after the BT moves on.

_detect_node#

Reference to DetectSurvivor (source of the survivor coordinates).

_zone_manager#

Shared ZoneManager (allocates survivor IDs).

_node#

Owning ROS 2 node.

_broadcaster#

StaticTransformBroadcaster instance.

last_survivor_id()#

Return the most recently allocated survivor frame ID.

Return type:

str

Returns:

The string used as child_frame_id on the last broadcast (e.g. "survivor_1"). Empty string if the node has not yet published any frame.

setup(**kwargs)#

Create the static TF broadcaster.

Return type:

None

Parameters:

**kwargs – Must contain 'node'.

update()#

Build and publish the map -> survivor_N static transform.

Return type:

Status

Returns:

SUCCESS after sendTransform is called; FAILURE if no survivor pose is available (defensive – the IsSurvivorDetected condition should make this unreachable in practice).

class group0_final.bt_nodes.actions.DetectSurvivor(name, zone_manager)#

Bases: Behaviour

Call detect_survivor for the current zone and cache the result.

Returns SUCCESS once the service call completes – whether or not a survivor was found. The cached result is exposed via was_found and survivor_pose so sibling nodes can branch on it without re-issuing the service call.

_zone_manager#

Shared ZoneManager.

_node#

Owning ROS 2 node.

_client#

Service client for detect_survivor.

_was_found#

Cached found flag from the last response.

_survivor_xy#

Cached survivor coordinates, or None when no survivor has been observed yet.

SERVICE_NAME = 'detect_survivor'#
SERVICE_TIMEOUT_SEC = 5.0#
initialise()#

Reset the cached detection result before each new call.

Return type:

None

set_survivor_id(survivor_id)#

Record the survivor ID assigned to the current detection.

Called by BroadcastSurvivorTF after it allocates a fresh survivor_N frame name. Storing the ID on the detection node lets NotifyBase reuse the same string without holding a direct reference to the broadcaster.

Return type:

None

Parameters:

survivor_id – Newly allocated survivor identifier (e.g. "survivor_1").

setup(**kwargs)#

Create the detect_survivor service client.

Return type:

None

Parameters:

**kwargs – Must contain 'node'.

survivor_id()#

Return the survivor ID assigned to the latest detection.

Return type:

str

Returns:

The string set by BroadcastSurvivorTF, or an empty string when no broadcast has happened for the current zone.

survivor_pose()#

Return the cached survivor coordinates, if any.

Return type:

Optional[Tuple[float, float]]

Returns:

The (x, y) survivor pose in metres in the map frame, or None if no survivor was found by the last call.

update()#

Submit the detection request once and poll the future.

py_trees_ros drives tick_tock from a timer that runs inside the executor that is already spinning tree.node. Calling rclpy.spin_until_future_complete from here would re-enter that same executor and raise Executor is already spinning. Instead, we use the canonical async-BT pattern: submit call_async once, return RUNNING until future.done(), then process the response in a single tick.

Return type:

Status

Returns:

RUNNING while the service is unavailable or the response has not yet arrived; SUCCESS once the call completes (regardless of whether a survivor was found); FAILURE if the service responds with None.

was_found()#

Return the cached result of the last detection call.

Return type:

bool

Returns:

True if the most recent call returned found=True.

class group0_final.bt_nodes.actions.LogNoDetection(name, zone_manager)#

Bases: Behaviour

Log that no survivor was found at the current zone.

_zone_manager#

Shared ZoneManager (used for the log message).

_node#

Owning ROS 2 node.

setup(**kwargs)#

Cache the owning ROS 2 node.

Return type:

None

Parameters:

**kwargs – Must contain 'node'.

update()#

Log a single info-level message and return SUCCESS.

Return type:

Status

Returns:

Always SUCCESS.

class group0_final.bt_nodes.actions.NavigateToBase(name, zone_manager)#

Bases: _NavigateToPoseAction

Send the robot to the base-station pose via Nav2.

_zone_manager#

Shared ZoneManager (for base_pose).

update()#

Tick the navigation, then announce completion exactly once.

The base class handles the Nav2 NavigateToPose lifecycle. We add a single one-shot Mission complete. log when the goal succeeds so the operator can tell the mission is done. OneShot in the tree assembly prevents this branch from being re-ticked, but the flag is kept as a defensive guard in case the decorator is removed later.

Return type:

Status

Returns:

The status returned by the base NavigateToPose poll.

class group0_final.bt_nodes.actions.NavigateToZone(name, zone_manager)#

Bases: _NavigateToPoseAction

Send the robot to zone_manager.current_zone via Nav2.

_zone_manager#

Shared ZoneManager; queried each initialise.

class group0_final.bt_nodes.actions.NotifyBase(name, detect_node)#

Bases: Behaviour

Call report_survivor for the survivor that was just found.

Sends the survivor location as a geometry_msgs/PointStamped in the map frame so the receiving server (and any downstream consumer) can interpret the coordinates without a TF lookup.

_detect_node#

DetectSurvivor action node (source of both the survivor coordinates and the survivor ID, which the broadcaster stashed back onto it).

_node#

Owning ROS 2 node.

_client#

Service client for report_survivor.

SERVICE_NAME = 'report_survivor'#
SERVICE_TIMEOUT_SEC = 5.0#
initialise()#

Reset the future so the next active period submits afresh.

Return type:

None

setup(**kwargs)#

Create the report_survivor service client.

Return type:

None

Parameters:

**kwargs – Must contain 'node'.

update()#

Submit the report once and poll the future across ticks.

Same async pattern as DetectSurvivor.update: blocking on rclpy.spin_until_future_complete would re-enter the executor that is already spinning tree.node, so we submit the request once and yield RUNNING until future.done().

Return type:

Status

Returns:

RUNNING while the service is unavailable or the response has not yet arrived; SUCCESS once the server sets acknowledged=True; FAILURE if the response is None or acknowledged=False, or if the survivor pose / ID has not been set by the upstream BT nodes.

class group0_final.bt_nodes.actions._NavigateToPoseAction(name)#

Bases: Behaviour

Common base class wrapping the asynchronous NavigateToPose client.

Subclasses provide the target pose for each tick by implementing _target_pose. The base class handles goal submission, status polling, and cancellation in a way that keeps update() non-blocking (a hard requirement for a BT leaf).

Lifecycle:
  • setup: create the ActionClient.

  • initialise: send a goal asynchronously, reset state flags.

  • update: poll for goal acceptance and result; return RUNNING while in flight, SUCCESS on Nav2 success, FAILURE on rejection / failure / cancellation.

  • terminate: cancel any in-flight goal.

_target_label#

Short label used in log messages (e.g. zone id).

_node#

The owning rclpy node.

_client#

ActionClient for navigate_to_pose.

_goal_handle#

Handle returned by Nav2 once a goal is accepted.

_send_future#

Future for send_goal_async.

_result_future#

Future for get_result_async.

_done#

True once a terminal status has been observed.

_success#

Final outcome (only valid when _done is True).

ACTION_NAME = 'navigate_to_pose'#
initialise()#

Submit a fresh navigation goal at the start of an active period.

Resets the per-goal state and calls send_goal_async. The future stored in _send_future is polled by update.

Return type:

None

setup(**kwargs)#

Acquire the ROS 2 node and create the NavigateToPose client.

Return type:

None

Parameters:

**kwargs – Must contain 'node' (the owning rclpy.node.Node).

Raises:

KeyError – If 'node' is missing.

terminate(new_status)#

Cancel an in-flight Nav2 goal when the BT preempts the action.

Return type:

None

Parameters:

new_status – The status the BT is transitioning into. Used to skip cancellation when the goal already terminated successfully.

update()#

Poll the asynchronous goal/result futures.

Return type:

Status

Returns:

RUNNING while the goal is in flight, SUCCESS once Nav2 reports success, FAILURE on rejection or any non-success terminal status.

group0_final.bt_nodes.actions._make_pose_stamped(node, x, y, yaw, frame_id='map')#

Build a PoseStamped in the requested frame.

Return type:

PoseStamped

Parameters:
  • node – ROS 2 node used to read the current clock.

  • x – Target x in metres.

  • y – Target y in metres.

  • yaw – Target heading in radians.

  • frame_id – Frame the pose is expressed in (default "map").

Returns:

A populated geometry_msgs/PoseStamped.

group0_final.bt_nodes.actions._yaw_to_quaternion(yaw)#

Convert a yaw angle (radians) into a unit quaternion (z-axis rotation).

Return type:

Quaternion

Parameters:

yaw – Heading angle in radians.

Returns:

geometry_msgs/Quaternion representing the yaw rotation about the z-axis. Roll and pitch are zero.

Conditions#

Condition behaviours for the search-and-rescue tree.

Condition nodes return only SUCCESS or FAILURE (never RUNNING). They give the parent composite an instantaneous yes/no answer used to gate downstream actions.

class group0_final.bt_nodes.conditions.IsSurvivorDetected(name, detect_node)#

Bases: Behaviour

Reports the most recent DetectSurvivor result.

Reads the result through a direct reference to the DetectSurvivor action node (constructor injection), avoiding the py_trees blackboard.

_detect_node#

Reference to the DetectSurvivor BT action node whose was_found method is queried each tick.

update()#

Return the cached result from the last detection call.

Return type:

Status

Returns:

SUCCESS if the previous call reported a survivor was found at the current zone; FAILURE otherwise.

class group0_final.bt_nodes.conditions.ZonesRemaining(name, zone_manager)#

Bases: Behaviour

Succeeds while the patrol still has unvisited zones.

Wraps ZoneManager.has_remaining: returns SUCCESS while the patrol list still has work, FAILURE once every zone has been visited. The parent Patrol Sequence uses this answer to decide whether to keep patrolling or fall through to the return-to-base branch.

_zone_manager#

Shared ZoneManager instance whose state is queried on every tick.

update()#

Check whether the patrol list still has zones to visit.

Return type:

Status

Returns:

SUCCESS if at least one unvisited zone remains; FAILURE once the patrol has been exhausted.

Service Servers#

Simulated survivor-detection service server.

This module implements a stand-in for a real perception system. Instead of running computer vision, it answers DetectSurvivor requests with a hardcoded lookup that maps zone IDs to known survivor coordinates in the map frame.

The server does not check whether the robot is physically near the zone – the behavior tree is responsible for sequencing detection only after navigation succeeds.

class group0_final.service_servers.detect_survivor_server.DetectSurvivorServer#

Bases: Node

ROS 2 node that advertises the simulated detect_survivor service.

_service#

The created ROS 2 service (detect_survivor).

_survivors#

The hardcoded zone -> (x, y) lookup table.

group0_final.service_servers.detect_survivor_server.main(args=None)#

ROS 2 entry point for the detection service server.

Return type:

None

Parameters:

args – Forwarded to rclpy.init (defaults to sys.argv).

Simulated command-center service server.

Receives ReportSurvivor requests from the behavior tree and pretends to forward them to a remote command center. In reality the report is just logged to the ROS 2 console. The server warns (but still acknowledges) if a request arrives in a frame other than "map", since the BT contract sends survivor positions in the global fixed frame.

class group0_final.service_servers.report_survivor_server.ReportSurvivorServer#

Bases: Node

ROS 2 node that advertises the simulated report_survivor service.

_service#

The created ROS 2 service (report_survivor).

group0_final.service_servers.report_survivor_server.main(args=None)#

ROS 2 entry point for the report service server.

Return type:

None

Parameters:

args – Forwarded to rclpy.init (defaults to sys.argv).

Entry Point#

Entry point: assemble and run the search-and-rescue behaviour tree.

The BT structure (composites + leaves):

Selector "Mission" (memory=False)              <- root
|-- Sequence "Patrol" (memory=True)
|   |-- ZonesRemaining?                  (condition)
|   |-- NavigateToZone                   (action)
|   |-- DetectSurvivor                   (action)
|   |-- Selector "HandleDetection" (memory=False)
|   |   |-- Sequence "SurvivorFound" (memory=False)
|   |   |   |-- IsSurvivorDetected?      (condition)
|   |   |   |-- BroadcastSurvivorTF      (action)
|   |   |   |-- NotifyBase               (action)
|   |   |-- LogNoDetection               (action)
|   |-- AdvanceZone                      (action)
|-- NavigateToBase                       (action)

The Patrol Sequence uses memory=True so once NavigateToZone returns RUNNING the tree does not restart navigation on the next tick. The other composites are reactive (memory=False).

group0_final.scripts.main_search_and_rescue.main(args=None)#

ROS 2 entry point: load parameters, build the tree, spin.

Return type:

None

Parameters:

args – Forwarded to rclpy.init (defaults to sys.argv).