Skip to content

Candy Bar Sorting

This example demonstrates how to use Tzara, the Telekinesis Physical AI Agent, to build a candy bar sorting pipeline. Tzara generates code that scans a bin of mixed chocolate bars, classifies each one by color using a vision-language model, computes the grasping orientation from a PCA of the segmentation mask, and places each bar into the correct bin. Full Rerun visualization and detailed logging are included at every stage for debugging and traceability.

Demo

UR10e sorting chocolate bars by color into three separate bins

The Natural Language Instruction

In the bin there are 3 different types of chocolate, yellow, orange and brown. Can you sort them into 3 different bins. Do PCA on the mask for grasping orientation. Add detailed logging and Rerun visualization for every stage and step so we can see what is going on. The goal is robust debugging and traceability so failures can be diagnosed easily.

The Generated Code

python
# Candy bar sorting pipeline: UR10e + OnRobot RG6
# Scans a bin of mixed chocolate bars, classifies each by color (yellow / orange / brown)
# using QWEN VLM, computes grasping orientation via PCA on the SAM segmentation mask,
# and places each bar into the correct bin.
# Full Rerun visualization and detailed loguru logging at every stage.

import time
import numpy as np
import cv2
from loguru import logger
import rerun as rr

from telekinesis.synapse.robots.manipulators.universal_robots import UniversalRobotsUR10E
from telekinesis.synapse.tools.parallel_grippers.onrobot import OnRobotRG6
from telekinesis.medulla.cameras.realsense import RealSense
from telekinesis.retina import qwen
from telekinesis.retina import segment_anything

# ============================================================
# Constants
# ============================================================

ROBOT_IP    = "192.168.1.2"
GRIPPER_IP  = "192.168.1.1"
GRIPPER_PROTOCOL = "MODBUS_TCP"

# Pose from which the camera scans the source bin [x, y, z, rx, ry, rz (deg)]
SCAN_POSE = [0.050, 0.620, 0.450, 180.0, 0.0, 90.0]

# Home joint pose (deg)
HOME_JOINT_POSE_DEG = [90.0, -90.0, -90.0, -90.0, 90.0, 0.0]

# Destination bin poses [x, y, z, rx, ry, rz (deg)]
YELLOW_BIN_POSE = [0.450, 0.300, 0.086, 180.0, 0.0, 90.0]
ORANGE_BIN_POSE = [0.450, 0.150, 0.086, 180.0, 0.0, 90.0]
BROWN_BIN_POSE  = [0.450, 0.000, 0.086, 180.0, 0.0, 90.0]

BIN_POSE_BY_COLOR = {
    "yellow": YELLOW_BIN_POSE,
    "orange": ORANGE_BIN_POSE,
    "brown":  BROWN_BIN_POSE,
}

# Grasp / place parameters
GRASP_Z           = 0.086   # m — height at which the gripper closes on the bar
APPROACH_OFFSET_Z = 0.120   # m — clearance above grasp / place pose
PLACE_Z_OFFSET    = 0.030   # m — small drop height above bin rim

# Motion parameters
CART_SPEED      = 0.50   # m/s
CART_ACCEL      = 0.60   # m/s^2
JOINT_SPEED_DEG = 45.0   # deg/s
JOINT_ACCEL_DEG = 90.0   # deg/s^2

# Gripper parameters
GRIPPER_OPEN_MM  = 70.0
GRIPPER_CLOSE_MM = 20.0
GRIPPER_FORCE_N  = 40.0

# QWEN prompt
QWEN_PROMPT = (
    "Look at the object inside the bounding box. "
    "Classify its color as exactly one of: yellow, orange, brown. "
    "Reply with only the color word."
)

# ============================================================
# Geometry helpers
# ============================================================

def _approach(pose: list, dz: float) -> list:
    """Return a copy of pose with z raised by dz."""
    p = list(pose)
    p[2] += dz
    return p


def pca_angle_from_mask(mask: np.ndarray) -> float:
    """
    Compute the major-axis angle (radians) of the binary mask using PCA.

    Returns the angle of the first principal component in image space,
    which is used to rotate the gripper around Z for aligned grasping.
    """
    ys, xs = np.nonzero(mask)
    if len(xs) < 5:
        logger.warning("PCA: too few mask pixels, defaulting to 0 rad")
        return 0.0

    pts = np.column_stack([xs, ys]).astype(np.float64)
    mean = pts.mean(axis=0)
    centered = pts - mean
    cov = (centered.T @ centered) / len(pts)
    eigenvalues, eigenvectors = np.linalg.eigh(cov)
    major_axis = eigenvectors[:, np.argmax(eigenvalues)]   # (dx, dy) in image
    angle_rad = np.arctan2(major_axis[1], major_axis[0])
    logger.debug(f"PCA: major_axis={major_axis}, angle={np.degrees(angle_rad):.2f} deg")
    return float(angle_rad)


def grasp_pose_from_detection(cx_m: float, cy_m: float, angle_rad: float) -> list:
    """
    Build a 6-DOF grasp pose [x, y, z, rx, ry, rz_deg] from centroid and PCA angle.
    rx, ry are fixed (gripper pointing down). rz is set from PCA angle.
    """
    rz_deg = float(np.degrees(angle_rad))
    return [cx_m, cy_m, GRASP_Z, 180.0, 0.0, rz_deg]


# ============================================================
# Vision helpers
# ============================================================

def pixel_to_robot_xy(px: int, py: int, camera: RealSense) -> tuple[float, float]:
    """
    Back-project a pixel to robot-frame XY using the RealSense depth frame.
    Returns (x_m, y_m) in robot base coordinates.
    """
    depth_m = camera.get_depth_at_pixel(px, py)
    x_m, y_m, _ = camera.deproject_pixel_to_point(px, py, depth_m)
    return float(x_m), float(y_m)


def classify_color(image: np.ndarray, bbox: tuple, mask: np.ndarray) -> str:
    """
    Use QWEN VLM to classify the chocolate bar color within bbox as
    'yellow', 'orange', or 'brown'. Falls back to 'brown' on failure.
    """
    x1, y1, x2, y2 = bbox
    crop = image[y1:y2, x1:x2]
    response = qwen.classify(image=crop, prompt=QWEN_PROMPT)
    color = response.strip().lower()
    if color not in ("yellow", "orange", "brown"):
        logger.warning(f"QWEN returned unexpected color '{color}', defaulting to 'brown'")
        color = "brown"
    return color


# ============================================================
# Pick / place motions
# ============================================================

def pick(robot: UniversalRobotsUR10E, gripper: OnRobotRG6, grasp_pose: list, idx: int):
    approach = _approach(grasp_pose, APPROACH_OFFSET_Z)

    logger.info(f"[Bar {idx}] Open gripper")
    gripper.move(position=GRIPPER_OPEN_MM, force=GRIPPER_FORCE_N)

    logger.info(f"[Bar {idx}] Move to approach: {approach}")
    robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)

    logger.info(f"[Bar {idx}] Descend to grasp: {grasp_pose}")
    robot.set_cartesian_pose(grasp_pose, speed=CART_SPEED * 0.5, acceleration=CART_ACCEL * 0.5)

    logger.info(f"[Bar {idx}] Close gripper")
    gripper.move(position=GRIPPER_CLOSE_MM, force=GRIPPER_FORCE_N)
    logger.success(f"[Bar {idx}] Grasped")

    logger.info(f"[Bar {idx}] Retreat to approach")
    robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)

    rr.log(f"bar_{idx}/pick", rr.TextLog(f"Picked bar {idx} at grasp pose {grasp_pose}"))


def place(robot: UniversalRobotsUR10E, gripper: OnRobotRG6,
          bin_pose: list, color: str, idx: int):
    place_pose = list(bin_pose)
    place_pose[2] += PLACE_Z_OFFSET
    approach = _approach(place_pose, APPROACH_OFFSET_Z)

    logger.info(f"[Bar {idx}] Move to {color} bin approach: {approach}")
    robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)

    logger.info(f"[Bar {idx}] Descend to place: {place_pose}")
    robot.set_cartesian_pose(place_pose, speed=CART_SPEED * 0.5, acceleration=CART_ACCEL * 0.5)

    logger.info(f"[Bar {idx}] Open gripper — release")
    gripper.move(position=GRIPPER_OPEN_MM, force=GRIPPER_FORCE_N)
    logger.success(f"[Bar {idx}] Released into {color} bin")

    logger.info(f"[Bar {idx}] Retreat from bin")
    robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)

    rr.log(f"bar_{idx}/place", rr.TextLog(f"Placed bar {idx} ({color}) at {place_pose}"))


# ============================================================
# Main pipeline
# ============================================================

def run_pipeline(robot: UniversalRobotsUR10E, gripper: OnRobotRG6, camera: RealSense):
    logger.info("=== Candy Bar Sorting Pipeline Start ===")

    # Move to scan pose
    logger.info(f"Moving to scan pose: {SCAN_POSE}")
    robot.set_cartesian_pose(SCAN_POSE, speed=CART_SPEED, acceleration=CART_ACCEL)
    logger.success("At scan pose")

    # Capture RGB frame
    logger.info("Capturing RGB frame from RealSense")
    rgb = camera.get_color_frame()          # np.ndarray HxWx3 uint8 BGR
    logger.success(f"Captured frame shape={rgb.shape}")

    rr.log("camera/rgb", rr.Image(rgb[:, :, ::-1]))   # log as RGB

    # Detect all chocolates with QWEN
    logger.info("Running QWEN detection for all chocolates in the bin")
    detections = qwen.detect(
        image=rgb,
        prompt="Detect all chocolate bars in the image. Return bounding boxes.",
    )
    logger.info(f"QWEN returned {len(detections)} detections")

    if not detections:
        logger.warning("No chocolates detected — exiting pipeline")
        return

    # Log all bounding boxes
    boxes_xyxy = np.array([[d["bbox"][0], d["bbox"][1], d["bbox"][2], d["bbox"][3]]
                            for d in detections], dtype=np.float32)
    rr.log("camera/detections/bboxes",
           rr.Boxes2D(array=boxes_xyxy, array_format=rr.Box2DFormat.XYXY))
    logger.debug(f"Logged {len(detections)} bounding boxes to Rerun")

    for idx, det in enumerate(detections):
        bbox = det["bbox"]   # (x1, y1, x2, y2) in pixels
        x1, y1, x2, y2 = bbox
        cx_px = int((x1 + x2) / 2)
        cy_px = int((y1 + y2) / 2)
        logger.info(f"[Bar {idx}] BBox={bbox}, centroid_px=({cx_px}, {cy_px})")

        # Segment with SAM
        logger.info(f"[Bar {idx}] Running SAM segmentation")
        mask = segment_anything.segment(image=rgb, bbox=bbox)   # HxW bool
        logger.success(f"[Bar {idx}] Mask computed, nonzero px={mask.sum()}")

        # Log mask overlay
        mask_vis = (mask.astype(np.uint8) * 255)
        rr.log(f"bar_{idx}/mask", rr.Image(mask_vis))

        # PCA for grasping orientation
        logger.info(f"[Bar {idx}] Computing PCA orientation from mask")
        angle_rad = pca_angle_from_mask(mask)
        logger.info(f"[Bar {idx}] PCA angle = {np.degrees(angle_rad):.2f} deg")

        # Back-project centroid to robot frame
        cx_m, cy_m = pixel_to_robot_xy(cx_px, cy_px, camera)
        logger.debug(f"[Bar {idx}] Robot-frame centroid: x={cx_m:.4f} m, y={cy_m:.4f} m")

        # Build grasp pose
        grasp_pose = grasp_pose_from_detection(cx_m, cy_m, angle_rad)
        logger.info(f"[Bar {idx}] Grasp pose: {grasp_pose}")
        rr.log(f"bar_{idx}/grasp_pose",
               rr.TextLog(f"Grasp pose: {grasp_pose}"))

        # Classify color
        logger.info(f"[Bar {idx}] Classifying color with QWEN VLM")
        color = classify_color(rgb, bbox, mask)
        logger.success(f"[Bar {idx}] Color classified as '{color}'")
        rr.log(f"bar_{idx}/color", rr.TextLog(f"Color: {color}"))

        # Pick
        pick(robot, gripper, grasp_pose, idx)

        # Return to scan pose before placing
        logger.info(f"[Bar {idx}] Return to scan pose for clearance")
        robot.set_cartesian_pose(SCAN_POSE, speed=CART_SPEED, acceleration=CART_ACCEL)

        # Place in correct bin
        bin_pose = BIN_POSE_BY_COLOR[color]
        logger.info(f"[Bar {idx}] Placing into {color} bin at {bin_pose}")
        place(robot, gripper, bin_pose, color, idx)

        # Return to scan pose for next pick
        logger.info(f"[Bar {idx}] Return to scan pose")
        robot.set_cartesian_pose(SCAN_POSE, speed=CART_SPEED, acceleration=CART_ACCEL)
        logger.success(f"[Bar {idx}] Complete")

    # Home
    logger.info("All bars sorted — returning to home pose")
    robot.set_joint_positions(HOME_JOINT_POSE_DEG,
                              speed=JOINT_SPEED_DEG,
                              acceleration=JOINT_ACCEL_DEG)
    logger.success("=== Candy Bar Sorting Pipeline Complete ===")


# ============================================================
# Entry point
# ============================================================

def main():
    rr.init("candy_sorting")
    logger.info("Rerun session 'candy_sorting' initialized")

    robot   = UniversalRobotsUR10E()
    gripper = OnRobotRG6()
    camera  = RealSense()

    robot_connected   = False
    gripper_connected = False
    camera_started    = False

    try:
        logger.info(f"Connecting to UR10e at {ROBOT_IP}")
        robot.connect(ROBOT_IP)
        robot_connected = True
        logger.success("Robot connected")

        logger.info(f"Connecting to OnRobot RG6 at {GRIPPER_IP} via {GRIPPER_PROTOCOL}")
        gripper.connect(ip=GRIPPER_IP, protocol=GRIPPER_PROTOCOL)
        gripper_connected = True
        logger.success("Gripper connected")

        logger.info("Starting RealSense camera")
        camera.start()
        camera_started = True
        logger.success("Camera started")

        run_pipeline(robot, gripper, camera)

    except Exception as e:
        logger.exception(f"Pipeline failed: {type(e).__name__}: {e}")
    finally:
        if camera_started:
            try:
                logger.info("Stopping camera")
                camera.stop()
            except Exception as e:
                logger.warning(f"Camera stop failed: {e}")
        if gripper_connected:
            try:
                logger.info("Disconnecting gripper")
                gripper.disconnect()
            except Exception as e:
                logger.warning(f"Gripper disconnect failed: {e}")
        if robot_connected:
            try:
                logger.info("Disconnecting robot")
                robot.disconnect()
            except Exception as e:
                logger.warning(f"Robot disconnect failed: {e}")
        logger.info("Cleanup complete")


if __name__ == "__main__":
    main()