Skip to content

Repackage Pipes

SUMMARY

Pick loose pipes from a pile and repackage them into bundles or containers

Code

INFO

The following code was generated by the Tzara agent.

python
# Pipeline: Pick aluminum cylindrical parts from an angled tray and drop them into a destination box.
# Hardware: UR10E robot at 192.168.2.2 + OnRobot RG2 gripper at 192.168.1.1.
# Strategy:
#   - Define all tunable constants at the top.
#   - Tray is tilted ~23 deg around base X. Compute pick poses by stepping in the tray-local frame
#     (tray-local X and Y), then transforming into base frame using the tray rotation.
#   - For each pick: pre-pick approach along tray normal, move_until_contact to find part,
#     close gripper, retreat, move to drop cell in destination box, open gripper, retreat.
#   - Log every step.

import numpy as np
from loguru import logger

from telekinesis.synapse import utils
from telekinesis.synapse.robots.manipulators import universal_robots
from telekinesis.synapse.tools.parallel_grippers import onrobot

# =====================================================================================
# TUNABLE CONSTANTS
# =====================================================================================

# --- Robot / gripper connection ---
ROBOT_IP = "192.168.2.2"
GRIPPER_IP = "192.168.1.1"
GRIPPER_PROTOCOL = "MODBUS_TCP"

# --- TCP offset (23 cm along tool flange Z) ---
TCP_OFFSET = [0.0, 0.0, 0.23, 0.0, 0.0, 0.0]  # [x, y, z, rx, ry, rz] (m, deg)

# --- Tray geometry ---
# Starting pick pose (top-left of the tray grid) in base frame, Euler degrees.
# The -23 deg Y rotation in this pose encodes the tray tilt orientation that
# the TCP must keep aligned with the tray surface for every pick.
TRAY_START_POSE = [-0.3869, 1.3208, 0.17429, -180.0, -23.0, 90.0]

# Tray is tilted around base X-axis by this angle (deg).
TRAY_TILT_DEG_X = 23.0

# Grid layout on the tray (rows step along tray-local Y, cols along tray-local X).
TRAY_ROWS = 2
TRAY_COLS = 3
TRAY_PITCH_X = 0.16  # spacing along tray-local X [m]
TRAY_PITCH_Y = -0.16  # spacing along tray-local Y [m]

# Approach / retreat along tray-local +Z (i.e. tray normal, away from surface).
TRAY_APPROACH_DIST = 0.08   # pre-pick standoff above the part [m]
TRAY_CONTACT_TRAVEL = 0.05  # max travel of move_until_contact past pre-pick [m]
TRAY_RETREAT_DIST = 0.03   # retreat distance after grasp [m]

# move_until_contact parameters: speed in TCP frame (linear m/s, angular deg/s).
# Move down along tray-local -Z. We push 'cartesian_speed' as a base-frame TCP velocity
# computed from the tray normal at runtime.
CONTACT_SPEED_LINEAR = 0.02  # m/s
CONTACT_ACC = 0.5            # m/s^2

# --- Motion speeds ---
MOVE_SPEED = 0.5            # TCP linear speed [m/s] for moveL
MOVE_ACC = 0.5              # TCP linear acc [m/s^2]

# --- Destination box (flat, not tilted) ---
DROP_START_POSE = [0.01, 0.74661, 0.07552, 180, 0, 90]  # top-left of drop grid (base frame, deg)
DROP_ROWS = 1
DROP_COLS = 6
DROP_PITCH_X = 0.062         # spacing along base X [m]
DROP_PITCH_Y = 0.062         # spacing along base Y [m]
DROP_APPROACH_DIST = 0.20   # pre-drop standoff above box cell [m]
DROP_RETREAT_DIST = 0.20    # retreat distance after release [m]

# --- Gripper ---
GRIPPER_GRASP_FORCE = 40.0  # N (RG2 max ~40 N)

# --- Home pose to return to at the end ---
HOME_JOINTS_DEG = [0.0, -90.0, -90.0, -90.0, 90.0, 0.0]
HOME_JOINT_SPEED = 30.0     # deg/s
HOME_JOINT_ACC = 60.0       # deg/s^2


# =====================================================================================
# HELPERS
# =====================================================================================

def tray_local_to_base(local_xyz: np.ndarray) -> np.ndarray:
    """
    Transform a 3D offset expressed in tray-local frame into a 3D offset in the base frame.
    The tray is rotated about the base X axis by TRAY_TILT_DEG_X.
    """
    a = np.deg2rad(TRAY_TILT_DEG_X)
    Rx = np.array([
        [1.0, 0.0, 0.0],
        [0.0, np.cos(a), -np.sin(a)],
        [0.0, np.sin(a), np.cos(a)],
    ])
    return Rx @ np.asarray(local_xyz, dtype=float)


def compute_tray_pick_pose(row: int, col: int) -> list[float]:
    """
    Compute the base-frame pick pose for tray cell (row, col).
    Orientation is kept identical to TRAY_START_POSE to remain aligned with tray surface.
    Translation is start_xyz + R_x(tray_tilt) * [col*pitch_x, row*pitch_y, 0].
    """
    start = np.array(TRAY_START_POSE, dtype=float)
    local_offset = np.array([col * TRAY_PITCH_X, row * TRAY_PITCH_Y, 0.0])
    base_offset = tray_local_to_base(local_offset)
    pose = start.copy()
    pose[0:3] = start[0:3] + base_offset
    return pose.tolist()


def offset_along_tray_normal(pose: list[float], distance: float) -> list[float]:
    """
    Offset a pose along the tray-local +Z axis (tray normal) by `distance` meters,
    expressed in the base frame.
    """
    base_offset = tray_local_to_base(np.array([0.0, 0.0, distance]))
    out = list(pose)
    out[0] += float(base_offset[0])
    out[1] += float(base_offset[1])
    out[2] += float(base_offset[2])
    return out


def compute_drop_pose(row: int, col: int) -> list[float]:
    """
    Compute base-frame drop pose for destination cell (row, col). Box is flat.
    """
    pose = list(DROP_START_POSE)
    pose[0] = DROP_START_POSE[0] + col * DROP_PITCH_X
    pose[1] = DROP_START_POSE[1] + row * DROP_PITCH_Y
    return pose


def offset_along_base_z(pose: list[float], distance: float) -> list[float]:
    """Offset a flat-box pose straight up along base +Z by `distance` meters."""
    out = list(pose)
    out[2] += float(distance)
    return out


def contact_speed_along_tray_normal_down() -> list[float]:
    """
    TCP velocity vector for move_until_contact, expressed in base frame.
    Pointing along tray-local -Z (into the tray surface). Angular components zero.
    """
    base_vel = tray_local_to_base(np.array([0.0, 0.0, -CONTACT_SPEED_LINEAR]))
    return [float(base_vel[0]), float(base_vel[1]), float(base_vel[2]), 0.0, 0.0, 0.0]


def contact_direction_along_tray_normal_down() -> list[float]:
    """
    Direction vector (unit, in base frame) used by move_until_contact to detect contacts.
    Same direction as the contact velocity but unit length; angular part zeroed.
    """
    base_dir = tray_local_to_base(np.array([0.0, 0.0, -1.0]))
    return [float(base_dir[0]), float(base_dir[1]), float(base_dir[2]), 0.0, 0.0, 0.0]


# =====================================================================================
# MAIN PIPELINE
# =====================================================================================

def main() -> None:
    robot = universal_robots.UniversalRobotsUR10E()
    gripper = onrobot.OnRobotRG2()

    robot_connected = False
    gripper_connected = False

    try:
        # ---- Step 1: Connect to robot ----
        logger.info(f"Connecting to UR10E at {ROBOT_IP} ...")
        robot.connect(ip=ROBOT_IP)
        robot_connected = True
        logger.info("UR10E connected.")

        # Apply TCP offset (23 cm along flange Z)
        logger.info(f"Setting TCP offset {TCP_OFFSET} on the controller.")
        robot.set_tcp(TCP_OFFSET)

        # ---- Step 2: Connect to gripper ----
        logger.info(f"Connecting to OnRobot RG2 at {GRIPPER_IP} (protocol={GRIPPER_PROTOCOL}) ...")
        gripper.connect(ip=GRIPPER_IP, protocol=GRIPPER_PROTOCOL)
        gripper_connected = True
        logger.info("OnRobot RG2 connected.")

        # ---- Step 3: Initialize gripper (open) ----
        logger.info("Opening gripper to initialize.")
        status = gripper.open(force=GRIPPER_GRASP_FORCE, asynchronous=False)
        logger.info(f"Gripper open status: {status}")

        # Pre-compute the contact velocity / direction used at every pick.
        contact_vel = contact_speed_along_tray_normal_down()
        contact_dir = contact_direction_along_tray_normal_down()
        logger.info(f"Tray-normal contact velocity (base frame): {contact_vel}")
        logger.info(f"Tray-normal contact direction (base frame): {contact_dir}")

        # ---- Step 4: Iterate over tray grid ----
        total_cells = TRAY_ROWS * TRAY_COLS
        max_drop_cells = DROP_ROWS * DROP_COLS
        n_to_pick = min(total_cells, max_drop_cells)
        logger.info(
            f"Tray grid: {TRAY_ROWS}x{TRAY_COLS} ({total_cells} cells). "
            f"Drop grid: {DROP_ROWS}x{DROP_COLS} ({max_drop_cells} cells). "
            f"Will process {n_to_pick} parts."
        )

        idx = 0
        for r in range(TRAY_ROWS):
            for c in range(TRAY_COLS):
                if idx >= n_to_pick:
                    break

                logger.info(f"--- Pick #{idx + 1}/{n_to_pick} :: tray cell (row={r}, col={c}) ---")

                # 4a: Pick pose at the tray surface (orientation aligned with tray).
                pick_pose = compute_tray_pick_pose(r, c)
                logger.info(f"Computed pick pose (base frame, deg): {pick_pose}")

                # 4b: Pre-pick approach above the tray surface along tray normal.
                pre_pick_pose = offset_along_tray_normal(pick_pose, TRAY_APPROACH_DIST)
                logger.info(f"Moving to pre-pick pose: {pre_pick_pose}")
                robot.set_cartesian_pose(
                    cartesian_pose=pre_pick_pose,
                    speed=MOVE_SPEED,
                    acceleration=MOVE_ACC,
                    asynchronous=False,
                )

                # 4c: move to drop pose
                robot.set_cartesian_pose(
                    cartesian_pose=pick_pose,
                    speed=MOVE_SPEED,
                    acceleration=MOVE_ACC,
                    asynchronous=False,
                )

                # 4d: Close gripper to grasp.
                logger.info("Closing gripper to grasp the part.")
                grasp_status = gripper.close(force=GRIPPER_GRASP_FORCE, asynchronous=False)
                logger.info(f"Gripper close status: {grasp_status}")

                # 4e: Retreat along tray normal back to (above) pre-pick.
                retreat_pose = offset_along_tray_normal(pick_pose, TRAY_RETREAT_DIST)
                logger.info(f"Retreating along tray normal to: {retreat_pose}")
                robot.set_cartesian_pose(
                    cartesian_pose=retreat_pose,
                    speed=MOVE_SPEED,
                    acceleration=MOVE_ACC,
                    asynchronous=False,
                )

                # 4f: Compute drop pose for the corresponding destination cell.
                drop_r = idx // DROP_COLS
                drop_c = idx % DROP_COLS
                drop_pose = compute_drop_pose(drop_r, drop_c)
                logger.info(
                    f"Computed drop pose for box cell (row={drop_r}, col={drop_c}): {drop_pose}"
                )

                # 4g: Pre-drop above the drop cell.
                pre_drop_pose = offset_along_base_z(drop_pose, DROP_APPROACH_DIST)
                logger.info(f"Moving to pre-drop pose: {pre_drop_pose}")
                robot.set_cartesian_pose(
                    cartesian_pose=pre_drop_pose,
                    speed=MOVE_SPEED,
                    acceleration=MOVE_ACC,
                    asynchronous=False,
                )

                # 4h: Move down to drop pose.
                logger.info(f"Moving to drop pose: {drop_pose}")
                robot.set_cartesian_pose(
                    cartesian_pose=drop_pose,
                    speed=MOVE_SPEED,
                    acceleration=MOVE_ACC,
                    asynchronous=False,
                )

                # 4i: Open gripper to release / drop the part.
                logger.info("Opening gripper to release the part.")
                release_status = gripper.open(force=GRIPPER_GRASP_FORCE, asynchronous=False)
                logger.info(f"Gripper open status: {release_status}")

                # 4j: Retreat upward in base frame.
                post_drop_pose = offset_along_base_z(drop_pose, DROP_RETREAT_DIST)
                logger.info(f"Retreating from drop pose to: {post_drop_pose}")
                robot.set_cartesian_pose(
                    cartesian_pose=post_drop_pose,
                    speed=MOVE_SPEED,
                    acceleration=MOVE_ACC,
                    asynchronous=False,
                )

                logger.info(f"Pick #{idx + 1} complete.")
                idx += 1

            if idx >= n_to_pick:
                break

        # ---- Step 4 done; return home ----
        logger.info(f"All {idx} parts processed. Returning home: {HOME_JOINTS_DEG}")
        robot.set_joint_positions(
            joint_positions=HOME_JOINTS_DEG,
            speed=HOME_JOINT_SPEED,
            acceleration=HOME_JOINT_ACC,
            asynchronous=False,
        )
        logger.info("Robot at home position. Pipeline complete.")

    except Exception as e:
        logger.exception(f"Pipeline aborted due to error: {e}")
        # Best-effort safety stop on the robot if it is still connected.
        try:
            if robot_connected:
                logger.warning("Attempting to stop any ongoing robot motion.")
                robot.stop_cartesian_motion(stopping_speed=0.5)
        except Exception as stop_err:
            logger.error(f"Failed to stop robot motion cleanly: {stop_err}")
        raise

    finally:
        # ---- Hardware cleanup (always run) ----
        # Open gripper before disconnect so a part isn't left clamped.
        if gripper_connected:
            try:
                logger.info("Opening gripper before disconnect (safety).")
                gripper.open(force=GRIPPER_GRASP_FORCE, asynchronous=False)
            except Exception as g_err:
                logger.error(f"Failed to open gripper during cleanup: {g_err}")
            try:
                logger.info("Disconnecting OnRobot RG2.")
                gripper.disconnect()
            except Exception as g_err:
                logger.error(f"Error disconnecting gripper: {g_err}")

        if robot_connected:
            try:
                logger.info("Disconnecting UR10E.")
                robot.disconnect()
            except Exception as r_err:
                logger.error(f"Error disconnecting robot: {r_err}")

        logger.info("Cleanup complete.")


if __name__ == "__main__":
    main()