Skip to content

PVC Pipe Depalletizing

This example demonstrates how to use Tzara, the Telekinesis Physical AI Agent, to build a depalletizing pipeline. Tzara generates code that picks PVC pipes arranged in a grid on a table and drops them into a paper packaging box. Placement inside the box is unstructured — the robot simply drops each pipe rather than placing it precisely.

Demo

UR10e picking PVC pipes from a grid and dropping into a packaging box

The Natural Language Instruction

On my table i have multiple pvc pipes sorted in grid structure. I need to pick them and place into paper packaging box. Placing does not have to be structured, you can just drop them. Add logging at each step.

The Generated Code

python
# Depalletizing pipeline: UR10e + OnRobot RG6
# Picks PVC pipes from a flat grid on a table and drops them into a packaging box.
# Drop placement is unstructured — no precise in-box positioning required.

import math
from loguru import logger

from telekinesis.synapse.robots.manipulators.universal_robots import UniversalRobotsUR10E
from telekinesis.synapse.tools.parallel_grippers.onrobot import OnRobotRG6

# ============================================================
# Constants
# ============================================================

ROBOT_IP   = "192.168.1.2"
GRIPPER_IP = "192.168.1.1"
GRIPPER_PROTOCOL = "MODBUS_TCP"

# Pick grid layout
PICK_GRID_ROWS = 4
PICK_GRID_COLS = 5
PICK_ORIGIN_XYZ = [0.050, 0.820, 0.195]   # base frame position of grid slot (0, 0) in meters
PICK_ROW_PITCH  = -0.070                   # row spacing along Y (m)
PICK_COL_PITCH  =  0.070                   # column spacing along X (m)

# Pick orientation: TCP pointing straight down
PICK_RPY_DEG = [180.0, 0.0, 90.0]

# Z levels (meters)
Z_APPROACH = 0.320   # safe clearance height above the table
Z_PICK     = PICK_ORIGIN_XYZ[2]  # contact height (same for all grid slots)

# Drop pose: a single fixed pose above the packaging box
DROP_POSE = [-0.350, 0.750, 0.320, 180.0, 0.0, 90.0]

# Gripper settings (mm / N)
GRIPPER_OPEN_MM  = 70.0
GRIPPER_CLOSE_MM = 18.0
GRIPPER_FORCE_N  = 50.0

# Motion parameters
CART_SPEED = 0.50    # m/s
CART_ACCEL = 0.70    # m/s^2

# Home joint pose (degrees)
HOME_JOINT_DEG = [90.0, -90.0, -90.0, -90.0, 90.0, 0.0]
JOINT_SPEED_DEG = 45.0
JOINT_ACCEL_DEG = 90.0


# ============================================================
# Helper functions
# ============================================================

def generate_pick_poses():
    """Return a list of pick poses [x, y, z, rx, ry, rz] for every grid slot."""
    poses = []
    for r in range(PICK_GRID_ROWS):
        for c in range(PICK_GRID_COLS):
            x = PICK_ORIGIN_XYZ[0] + c * PICK_COL_PITCH
            y = PICK_ORIGIN_XYZ[1] + r * PICK_ROW_PITCH
            z = Z_PICK
            poses.append([x, y, z] + list(PICK_RPY_DEG))
    logger.debug(f"Generated {len(poses)} pick poses ({PICK_GRID_ROWS} rows x {PICK_GRID_COLS} cols)")
    return poses


def _approach(pose, z_safe):
    """Return a copy of pose with z replaced by z_safe."""
    p = list(pose)
    p[2] = z_safe
    return p


def pick_part(robot, gripper, pick_pose, index):
    """Execute pick sequence for a single PVC pipe."""
    approach = _approach(pick_pose, Z_APPROACH)

    logger.info(f"[Part {index}] Moving to pick approach")
    logger.debug(f"    approach = {approach}")
    robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)
    logger.success(f"[Part {index}] At pick approach")

    logger.info(f"[Part {index}] Opening gripper")
    gripper.move(position=GRIPPER_OPEN_MM, force=GRIPPER_FORCE_N)
    logger.success(f"[Part {index}] Gripper open")

    logger.info(f"[Part {index}] Descending to pick pose")
    logger.debug(f"    pick pose = {pick_pose}")
    robot.set_cartesian_pose(pick_pose, speed=CART_SPEED, acceleration=CART_ACCEL)
    logger.success(f"[Part {index}] At pick pose")

    logger.info(f"[Part {index}] Closing gripper to grasp pipe")
    gripper.move(position=GRIPPER_CLOSE_MM, force=GRIPPER_FORCE_N)
    logger.success(f"[Part {index}] Pipe grasped")

    logger.info(f"[Part {index}] Retreating from pick")
    robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)
    logger.success(f"[Part {index}] Retreated from pick")


def drop_part(robot, gripper, index):
    """Move to the fixed drop pose above the box and release the pipe."""
    logger.info(f"[Part {index}] Moving to drop pose above packaging box")
    logger.debug(f"    drop pose = {DROP_POSE}")
    robot.set_cartesian_pose(DROP_POSE, speed=CART_SPEED, acceleration=CART_ACCEL)
    logger.success(f"[Part {index}] At drop pose")

    logger.info(f"[Part {index}] Opening gripper to drop pipe")
    gripper.move(position=GRIPPER_OPEN_MM, force=GRIPPER_FORCE_N)
    logger.success(f"[Part {index}] Pipe dropped into box")


def run_pipeline(robot, gripper):
    """Iterate the pick grid and drop every pipe into the packaging box."""
    pick_poses = generate_pick_poses()
    n_parts = len(pick_poses)

    logger.info(f"Starting depalletizing pipeline: {n_parts} pipes to process")

    logger.info("Moving to home pose before starting")
    robot.set_joint_positions(HOME_JOINT_DEG, speed=JOINT_SPEED_DEG, acceleration=JOINT_ACCEL_DEG)
    logger.success("At home pose")

    for i, pick_pose in enumerate(pick_poses):
        logger.info(f"===== Pipe {i + 1} / {n_parts} =====")
        pick_part(robot, gripper, pick_pose, i + 1)
        drop_part(robot, gripper, i + 1)
        logger.success(f"===== Pipe {i + 1} complete =====")

    logger.info("All pipes processed — returning to home pose")
    robot.set_joint_positions(HOME_JOINT_DEG, speed=JOINT_SPEED_DEG, acceleration=JOINT_ACCEL_DEG)
    logger.success("Pipeline complete")


# ============================================================
# Main
# ============================================================

def main():
    robot   = UniversalRobotsUR10E()
    gripper = OnRobotRG6()

    robot_connected   = False
    gripper_connected = False

    try:
        logger.info(f"Connecting to UR10e at {ROBOT_IP}")
        robot.connect(ROBOT_IP)
        robot_connected = True
        logger.success("Robot connected")

        logger.info(f"Connecting to OnRobot RG6 at {GRIPPER_IP} via {GRIPPER_PROTOCOL}")
        gripper.connect(ip=GRIPPER_IP, protocol=GRIPPER_PROTOCOL)
        gripper_connected = True
        logger.success("Gripper connected")

        run_pipeline(robot, gripper)

    except Exception as e:
        logger.exception(f"Pipeline failed: {type(e).__name__}: {e}")
    finally:
        if gripper_connected:
            try:
                logger.info("Disconnecting gripper")
                gripper.disconnect()
            except Exception as e:
                logger.warning(f"Gripper disconnect failed: {e}")
        if robot_connected:
            try:
                logger.info("Disconnecting robot")
                robot.disconnect()
            except Exception as e:
                logger.warning(f"Robot disconnect failed: {e}")
        logger.info("Cleanup complete")


if __name__ == "__main__":
    main()