Skip to content

Commit

Permalink
feat(hardware_control): Handle Q moves in the hardware controller mov…
Browse files Browse the repository at this point in the history
…e function (#16730)
  • Loading branch information
Laura-Danielle authored Nov 8, 2024
1 parent 53bcf40 commit 159ea83
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ async def get_jaw_state(self) -> GripperJawState:
...

async def tip_action(
self, origin: Dict[Axis, float], targets: List[Tuple[Dict[Axis, float], float]]
self, origin: float, targets: List[Tuple[float, float]]
) -> None:
...

Expand Down
202 changes: 148 additions & 54 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,30 +597,104 @@ async def update_encoder_position(self) -> OT3AxisMap[float]:
return axis_convert(self._encoder_position, 0.0)

def _handle_motor_status_response(
self,
response: NodeMap[MotorPositionStatus],
self, response: NodeMap[MotorPositionStatus], handle_gear_move: bool = False
) -> None:
for axis, pos in response.items():
self._position.update({axis: pos.motor_position})
self._encoder_position.update({axis: pos.encoder_position})
# TODO (FPS 6-01-2023): Remove this once the Feature Flag to ignore stall detection is removed.
# This check will latch the motor status for an axis at "true" if it was ever set to true.
# To account for the case where a motor axis has its power reset, we also depend on the
# "encoder_ok" flag staying set (it will only be False if the motor axis has not been
# homed since a power cycle)
motor_ok_latch = (
(not self._feature_flags.stall_detection_enabled)
and ((axis in self._motor_status) and self._motor_status[axis].motor_ok)
and self._motor_status[axis].encoder_ok
)
self._motor_status.update(
{
axis: MotorStatus(
motor_ok=(pos.motor_ok or motor_ok_latch),
encoder_ok=pos.encoder_ok,
if handle_gear_move and axis == NodeId.pipette_left:
self._gear_motor_position = {axis: pos.motor_position}
else:
self._position.update({axis: pos.motor_position})
self._encoder_position.update({axis: pos.encoder_position})
# TODO (FPS 6-01-2023): Remove this once the Feature Flag to ignore stall detection is removed.
# This check will latch the motor status for an axis at "true" if it was ever set to true.
# To account for the case where a motor axis has its power reset, we also depend on the
# "encoder_ok" flag staying set (it will only be False if the motor axis has not been
# homed since a power cycle)
motor_ok_latch = (
(not self._feature_flags.stall_detection_enabled)
and (
(axis in self._motor_status)
and self._motor_status[axis].motor_ok
)
}
and self._motor_status[axis].encoder_ok
)
self._motor_status.update(
{
axis: MotorStatus(
motor_ok=(pos.motor_ok or motor_ok_latch),
encoder_ok=pos.encoder_ok,
)
}
)

def _build_move_node_axis_runner(
self,
origin: Dict[Axis, float],
target: Dict[Axis, float],
speed: float,
stop_condition: HWStopCondition,
nodes_in_moves_only: bool,
) -> Tuple[Optional[MoveGroupRunner], bool]:
if not target:
return None, False
move_target = MoveTarget.build(position=target, max_speed=speed)
try:
_, movelist = self._move_manager.plan_motion(
origin=origin, target_list=[move_target]
)
except ZeroLengthMoveError as zme:
log.debug(f"Not moving because move was zero length {str(zme)}")
return None, False
moves = movelist[0]
log.debug(
f"move: machine coordinates {target} from origin: machine coordinates {origin} at speed: {speed} requires {moves}"
)

ordered_nodes = self._motor_nodes()
if nodes_in_moves_only:
moving_axes = {
axis_to_node(ax) for move in moves for ax in move.unit_vector.keys()
}
ordered_nodes = ordered_nodes.intersection(moving_axes)

move_group, _ = create_move_group(
origin, moves, ordered_nodes, MoveStopCondition[stop_condition.name]
)
return (
MoveGroupRunner(
move_groups=[move_group],
ignore_stalls=True
if not self._feature_flags.stall_detection_enabled
else False,
),
False,
)

def _build_move_gear_axis_runner(
self,
possible_q_axis_origin: Optional[float],
possible_q_axis_target: Optional[float],
speed: float,
nodes_in_moves_only: bool,
) -> Tuple[Optional[MoveGroupRunner], bool]:
if possible_q_axis_origin is None or possible_q_axis_target is None:
return None, True
tip_motor_move_group = self._build_tip_action_group(
possible_q_axis_origin, [(possible_q_axis_target, speed)]
)
if nodes_in_moves_only:
ordered_nodes = self._motor_nodes()

ordered_nodes.intersection({axis_to_node(Axis.Q)})
return (
MoveGroupRunner(
move_groups=[tip_motor_move_group],
ignore_stalls=True
if not self._feature_flags.stall_detection_enabled
else False,
),
True,
)

@requires_update
@requires_estop
Expand Down Expand Up @@ -648,40 +722,51 @@ async def move(
Returns:
None
"""
move_target = MoveTarget.build(position=target, max_speed=speed)
try:
_, movelist = self._move_manager.plan_motion(
origin=origin, target_list=[move_target]
)
except ZeroLengthMoveError as zme:
log.debug(f"Not moving because move was zero length {str(zme)}")
return
moves = movelist[0]
log.info(f"move: machine {target} from {origin} requires {moves}")
possible_q_axis_origin = origin.pop(Axis.Q, None)
possible_q_axis_target = target.pop(Axis.Q, None)

ordered_nodes = self._motor_nodes()
if nodes_in_moves_only:
moving_axes = {
axis_to_node(ax) for move in moves for ax in move.unit_vector.keys()
}
ordered_nodes = ordered_nodes.intersection(moving_axes)

group = create_move_group(
origin, moves, ordered_nodes, MoveStopCondition[stop_condition.name]
maybe_runners = (
self._build_move_node_axis_runner(
origin, target, speed, stop_condition, nodes_in_moves_only
),
self._build_move_gear_axis_runner(
possible_q_axis_origin,
possible_q_axis_target,
speed,
nodes_in_moves_only,
),
)
move_group, _ = group
runner = MoveGroupRunner(
move_groups=[move_group],
ignore_stalls=True
if not self._feature_flags.stall_detection_enabled
else False,
log.debug(f"The move groups are {maybe_runners}.")

gather_moving_nodes = set()
all_moving_nodes = set()
for runner, _ in maybe_runners:
if runner:
for n in runner.all_nodes():
gather_moving_nodes.add(n)
for n in runner.all_moving_nodes():
all_moving_nodes.add(n)

pipettes_moving = moving_pipettes_in_move_group(
gather_moving_nodes, all_moving_nodes
)

pipettes_moving = moving_pipettes_in_move_group(move_group)
async def _runner_coroutine(
runner: MoveGroupRunner, is_gear_move: bool
) -> Tuple[Dict[NodeId, MotorPositionStatus], bool]:
positions = await runner.run(can_messenger=self._messenger)
return positions, is_gear_move

coros = [
_runner_coroutine(runner, is_gear_move)
for runner, is_gear_move in maybe_runners
if runner
]
async with self._monitor_overpressure(pipettes_moving):
positions = await runner.run(can_messenger=self._messenger)
self._handle_motor_status_response(positions)
all_positions = await asyncio.gather(*coros)

for positions, handle_gear_move in all_positions:
self._handle_motor_status_response(positions, handle_gear_move)

def _get_axis_home_distance(self, axis: Axis) -> float:
if self.check_motor_status([axis]):
Expand Down Expand Up @@ -796,6 +881,7 @@ async def home(
Axis.to_kind(Axis.Q)
],
)

for position in positions:
self._handle_motor_status_response(position)
return axis_convert(self._position, 0.0)
Expand Down Expand Up @@ -840,17 +926,25 @@ async def home_tip_motors(
self._gear_motor_position = {}
raise e

async def tip_action(
self, origin: Dict[Axis, float], targets: List[Tuple[Dict[Axis, float], float]]
) -> None:
def _build_tip_action_group(
self, origin: float, targets: List[Tuple[float, float]]
) -> MoveGroup:

move_targets = [
MoveTarget.build(target_pos, speed) for target_pos, speed in targets
MoveTarget.build({Axis.Q: target_pos}, speed)
for target_pos, speed in targets
]
_, moves = self._move_manager.plan_motion(
origin=origin, target_list=move_targets
origin={Axis.Q: origin}, target_list=move_targets
)
move_group = create_tip_action_group(moves[0], [NodeId.pipette_left], "clamp")

return create_tip_action_group(moves[0], [NodeId.pipette_left], "clamp")

async def tip_action(
self, origin: float, targets: List[Tuple[float, float]]
) -> None:

move_group = self._build_tip_action_group(origin, targets)
runner = MoveGroupRunner(
move_groups=[move_group],
ignore_stalls=True
Expand Down
6 changes: 4 additions & 2 deletions api/src/opentrons/hardware_control/backends/ot3simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,10 +439,12 @@ async def get_jaw_state(self) -> GripperJawState:
return self._sim_jaw_state

async def tip_action(
self, origin: Dict[Axis, float], targets: List[Tuple[Dict[Axis, float], float]]
self, origin: float, targets: List[Tuple[float, float]]
) -> None:
self._gear_motor_position.update(
coalesce_move_segments(origin, [target[0] for target in targets])
coalesce_move_segments(
{Axis.Q: origin}, [{Axis.Q: target[0]} for target in targets]
)
)
await asyncio.sleep(0)

Expand Down
16 changes: 3 additions & 13 deletions api/src/opentrons/hardware_control/backends/ot3utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,10 +498,10 @@ def create_gripper_jaw_hold_group(encoder_position_um: int) -> MoveGroup:
return move_group


def moving_pipettes_in_move_group(group: MoveGroup) -> List[NodeId]:
def moving_pipettes_in_move_group(
all_nodes: Set[NodeId], moving_nodes: Set[NodeId]
) -> List[NodeId]:
"""Utility function to get which pipette nodes are moving either in z or their plunger."""
all_nodes = [node for step in group for node, _ in step.items()]
moving_nodes = moving_axes_in_move_group(group)
pipettes_moving: List[NodeId] = [
k for k in moving_nodes if k in [NodeId.pipette_left, NodeId.pipette_right]
]
Expand All @@ -512,16 +512,6 @@ def moving_pipettes_in_move_group(group: MoveGroup) -> List[NodeId]:
return pipettes_moving


def moving_axes_in_move_group(group: MoveGroup) -> Set[NodeId]:
"""Utility function to get only the moving nodes in a move group."""
ret: Set[NodeId] = set()
for step in group:
for node, node_step in step.items():
if node_step.is_moving_step():
ret.add(node)
return ret


AxisMapPayload = TypeVar("AxisMapPayload")


Expand Down
16 changes: 10 additions & 6 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,8 +942,8 @@ async def home_gear_motors(self) -> None:
):
# move toward home until a safe distance
await self._backend.tip_action(
origin={Axis.Q: current_pos_float},
targets=[({Axis.Q: self._config.safe_home_distance}, 400)],
origin=current_pos_float,
targets=[(self._config.safe_home_distance, 400)],
)

# update current position
Expand Down Expand Up @@ -1438,6 +1438,10 @@ async def _move(
check_motion_bounds(to_check, target_position, bounds, check_bounds)
self._log.info(f"Move: deck {target_position} becomes machine {machine_pos}")
origin = await self._backend.update_position()

if self._gantry_load == GantryLoad.HIGH_THROUGHPUT:
origin[Axis.Q] = self._backend.gear_motor_position or 0.0

async with contextlib.AsyncExitStack() as stack:
if acquire_lock:
await stack.enter_async_context(self._motion_lock)
Expand Down Expand Up @@ -2155,8 +2159,8 @@ async def _high_throughput_check_tip(self) -> AsyncIterator[None]:
# only move tip motors if they are not already below the sensor
if tip_motor_pos_float < tip_presence_check_target:
await self._backend.tip_action(
origin={Axis.Q: tip_motor_pos_float},
targets=[({Axis.Q: tip_presence_check_target}, 400)],
origin=tip_motor_pos_float,
targets=[(tip_presence_check_target, 400)],
)
try:
yield
Expand Down Expand Up @@ -2227,11 +2231,11 @@ async def _tip_motor_action(
gear_origin_float = self._backend.gear_motor_position or 0.0

move_targets = [
({Axis.Q: move_segment.distance}, move_segment.speed or 400)
(move_segment.distance, move_segment.speed or 400)
for move_segment in pipette_spec
]
await self._backend.tip_action(
origin={Axis.Q: gear_origin_float}, targets=move_targets
origin=gear_origin_float, targets=move_targets
)
await self.home_gear_motors()

Expand Down
Loading

0 comments on commit 159ea83

Please sign in to comment.