Candy Bar Sorting
This example demonstrates how to use Tzara, the Telekinesis Physical AI Agent, to build a candy bar sorting pipeline. Tzara generates code that scans a bin of mixed chocolate bars, classifies each one by color using a vision-language model, computes the grasping orientation from a PCA of the segmentation mask, and places each bar into the correct bin. Full Rerun visualization and detailed logging are included at every stage for debugging and traceability.
Demo
UR10e sorting chocolate bars by color into three separate bins
The Natural Language Instruction
In the bin there are 3 different types of chocolate, yellow, orange and brown. Can you sort them into 3 different bins. Do PCA on the mask for grasping orientation. Add detailed logging and Rerun visualization for every stage and step so we can see what is going on. The goal is robust debugging and traceability so failures can be diagnosed easily.The Generated Code
python
# Candy bar sorting pipeline: UR10e + OnRobot RG6
# Scans a bin of mixed chocolate bars, classifies each by color (yellow / orange / brown)
# using QWEN VLM, computes grasping orientation via PCA on the SAM segmentation mask,
# and places each bar into the correct bin.
# Full Rerun visualization and detailed loguru logging at every stage.
import time
import numpy as np
import cv2
from loguru import logger
import rerun as rr
from telekinesis.synapse.robots.manipulators.universal_robots import UniversalRobotsUR10E
from telekinesis.synapse.tools.parallel_grippers.onrobot import OnRobotRG6
from telekinesis.medulla.cameras.realsense import RealSense
from telekinesis.retina import qwen
from telekinesis.retina import segment_anything
# ============================================================
# Constants
# ============================================================
ROBOT_IP = "192.168.1.2"
GRIPPER_IP = "192.168.1.1"
GRIPPER_PROTOCOL = "MODBUS_TCP"
# Pose from which the camera scans the source bin [x, y, z, rx, ry, rz (deg)]
SCAN_POSE = [0.050, 0.620, 0.450, 180.0, 0.0, 90.0]
# Home joint pose (deg)
HOME_JOINT_POSE_DEG = [90.0, -90.0, -90.0, -90.0, 90.0, 0.0]
# Destination bin poses [x, y, z, rx, ry, rz (deg)]
YELLOW_BIN_POSE = [0.450, 0.300, 0.086, 180.0, 0.0, 90.0]
ORANGE_BIN_POSE = [0.450, 0.150, 0.086, 180.0, 0.0, 90.0]
BROWN_BIN_POSE = [0.450, 0.000, 0.086, 180.0, 0.0, 90.0]
BIN_POSE_BY_COLOR = {
"yellow": YELLOW_BIN_POSE,
"orange": ORANGE_BIN_POSE,
"brown": BROWN_BIN_POSE,
}
# Grasp / place parameters
GRASP_Z = 0.086 # m — height at which the gripper closes on the bar
APPROACH_OFFSET_Z = 0.120 # m — clearance above grasp / place pose
PLACE_Z_OFFSET = 0.030 # m — small drop height above bin rim
# Motion parameters
CART_SPEED = 0.50 # m/s
CART_ACCEL = 0.60 # m/s^2
JOINT_SPEED_DEG = 45.0 # deg/s
JOINT_ACCEL_DEG = 90.0 # deg/s^2
# Gripper parameters
GRIPPER_OPEN_MM = 70.0
GRIPPER_CLOSE_MM = 20.0
GRIPPER_FORCE_N = 40.0
# QWEN prompt
QWEN_PROMPT = (
"Look at the object inside the bounding box. "
"Classify its color as exactly one of: yellow, orange, brown. "
"Reply with only the color word."
)
# ============================================================
# Geometry helpers
# ============================================================
def _approach(pose: list, dz: float) -> list:
"""Return a copy of pose with z raised by dz."""
p = list(pose)
p[2] += dz
return p
def pca_angle_from_mask(mask: np.ndarray) -> float:
"""
Compute the major-axis angle (radians) of the binary mask using PCA.
Returns the angle of the first principal component in image space,
which is used to rotate the gripper around Z for aligned grasping.
"""
ys, xs = np.nonzero(mask)
if len(xs) < 5:
logger.warning("PCA: too few mask pixels, defaulting to 0 rad")
return 0.0
pts = np.column_stack([xs, ys]).astype(np.float64)
mean = pts.mean(axis=0)
centered = pts - mean
cov = (centered.T @ centered) / len(pts)
eigenvalues, eigenvectors = np.linalg.eigh(cov)
major_axis = eigenvectors[:, np.argmax(eigenvalues)] # (dx, dy) in image
angle_rad = np.arctan2(major_axis[1], major_axis[0])
logger.debug(f"PCA: major_axis={major_axis}, angle={np.degrees(angle_rad):.2f} deg")
return float(angle_rad)
def grasp_pose_from_detection(cx_m: float, cy_m: float, angle_rad: float) -> list:
"""
Build a 6-DOF grasp pose [x, y, z, rx, ry, rz_deg] from centroid and PCA angle.
rx, ry are fixed (gripper pointing down). rz is set from PCA angle.
"""
rz_deg = float(np.degrees(angle_rad))
return [cx_m, cy_m, GRASP_Z, 180.0, 0.0, rz_deg]
# ============================================================
# Vision helpers
# ============================================================
def pixel_to_robot_xy(px: int, py: int, camera: RealSense) -> tuple[float, float]:
"""
Back-project a pixel to robot-frame XY using the RealSense depth frame.
Returns (x_m, y_m) in robot base coordinates.
"""
depth_m = camera.get_depth_at_pixel(px, py)
x_m, y_m, _ = camera.deproject_pixel_to_point(px, py, depth_m)
return float(x_m), float(y_m)
def classify_color(image: np.ndarray, bbox: tuple, mask: np.ndarray) -> str:
"""
Use QWEN VLM to classify the chocolate bar color within bbox as
'yellow', 'orange', or 'brown'. Falls back to 'brown' on failure.
"""
x1, y1, x2, y2 = bbox
crop = image[y1:y2, x1:x2]
response = qwen.classify(image=crop, prompt=QWEN_PROMPT)
color = response.strip().lower()
if color not in ("yellow", "orange", "brown"):
logger.warning(f"QWEN returned unexpected color '{color}', defaulting to 'brown'")
color = "brown"
return color
# ============================================================
# Pick / place motions
# ============================================================
def pick(robot: UniversalRobotsUR10E, gripper: OnRobotRG6, grasp_pose: list, idx: int):
approach = _approach(grasp_pose, APPROACH_OFFSET_Z)
logger.info(f"[Bar {idx}] Open gripper")
gripper.move(position=GRIPPER_OPEN_MM, force=GRIPPER_FORCE_N)
logger.info(f"[Bar {idx}] Move to approach: {approach}")
robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)
logger.info(f"[Bar {idx}] Descend to grasp: {grasp_pose}")
robot.set_cartesian_pose(grasp_pose, speed=CART_SPEED * 0.5, acceleration=CART_ACCEL * 0.5)
logger.info(f"[Bar {idx}] Close gripper")
gripper.move(position=GRIPPER_CLOSE_MM, force=GRIPPER_FORCE_N)
logger.success(f"[Bar {idx}] Grasped")
logger.info(f"[Bar {idx}] Retreat to approach")
robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)
rr.log(f"bar_{idx}/pick", rr.TextLog(f"Picked bar {idx} at grasp pose {grasp_pose}"))
def place(robot: UniversalRobotsUR10E, gripper: OnRobotRG6,
bin_pose: list, color: str, idx: int):
place_pose = list(bin_pose)
place_pose[2] += PLACE_Z_OFFSET
approach = _approach(place_pose, APPROACH_OFFSET_Z)
logger.info(f"[Bar {idx}] Move to {color} bin approach: {approach}")
robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)
logger.info(f"[Bar {idx}] Descend to place: {place_pose}")
robot.set_cartesian_pose(place_pose, speed=CART_SPEED * 0.5, acceleration=CART_ACCEL * 0.5)
logger.info(f"[Bar {idx}] Open gripper — release")
gripper.move(position=GRIPPER_OPEN_MM, force=GRIPPER_FORCE_N)
logger.success(f"[Bar {idx}] Released into {color} bin")
logger.info(f"[Bar {idx}] Retreat from bin")
robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)
rr.log(f"bar_{idx}/place", rr.TextLog(f"Placed bar {idx} ({color}) at {place_pose}"))
# ============================================================
# Main pipeline
# ============================================================
def run_pipeline(robot: UniversalRobotsUR10E, gripper: OnRobotRG6, camera: RealSense):
logger.info("=== Candy Bar Sorting Pipeline Start ===")
# Move to scan pose
logger.info(f"Moving to scan pose: {SCAN_POSE}")
robot.set_cartesian_pose(SCAN_POSE, speed=CART_SPEED, acceleration=CART_ACCEL)
logger.success("At scan pose")
# Capture RGB frame
logger.info("Capturing RGB frame from RealSense")
rgb = camera.get_color_frame() # np.ndarray HxWx3 uint8 BGR
logger.success(f"Captured frame shape={rgb.shape}")
rr.log("camera/rgb", rr.Image(rgb[:, :, ::-1])) # log as RGB
# Detect all chocolates with QWEN
logger.info("Running QWEN detection for all chocolates in the bin")
detections = qwen.detect(
image=rgb,
prompt="Detect all chocolate bars in the image. Return bounding boxes.",
)
logger.info(f"QWEN returned {len(detections)} detections")
if not detections:
logger.warning("No chocolates detected — exiting pipeline")
return
# Log all bounding boxes
boxes_xyxy = np.array([[d["bbox"][0], d["bbox"][1], d["bbox"][2], d["bbox"][3]]
for d in detections], dtype=np.float32)
rr.log("camera/detections/bboxes",
rr.Boxes2D(array=boxes_xyxy, array_format=rr.Box2DFormat.XYXY))
logger.debug(f"Logged {len(detections)} bounding boxes to Rerun")
for idx, det in enumerate(detections):
bbox = det["bbox"] # (x1, y1, x2, y2) in pixels
x1, y1, x2, y2 = bbox
cx_px = int((x1 + x2) / 2)
cy_px = int((y1 + y2) / 2)
logger.info(f"[Bar {idx}] BBox={bbox}, centroid_px=({cx_px}, {cy_px})")
# Segment with SAM
logger.info(f"[Bar {idx}] Running SAM segmentation")
mask = segment_anything.segment(image=rgb, bbox=bbox) # HxW bool
logger.success(f"[Bar {idx}] Mask computed, nonzero px={mask.sum()}")
# Log mask overlay
mask_vis = (mask.astype(np.uint8) * 255)
rr.log(f"bar_{idx}/mask", rr.Image(mask_vis))
# PCA for grasping orientation
logger.info(f"[Bar {idx}] Computing PCA orientation from mask")
angle_rad = pca_angle_from_mask(mask)
logger.info(f"[Bar {idx}] PCA angle = {np.degrees(angle_rad):.2f} deg")
# Back-project centroid to robot frame
cx_m, cy_m = pixel_to_robot_xy(cx_px, cy_px, camera)
logger.debug(f"[Bar {idx}] Robot-frame centroid: x={cx_m:.4f} m, y={cy_m:.4f} m")
# Build grasp pose
grasp_pose = grasp_pose_from_detection(cx_m, cy_m, angle_rad)
logger.info(f"[Bar {idx}] Grasp pose: {grasp_pose}")
rr.log(f"bar_{idx}/grasp_pose",
rr.TextLog(f"Grasp pose: {grasp_pose}"))
# Classify color
logger.info(f"[Bar {idx}] Classifying color with QWEN VLM")
color = classify_color(rgb, bbox, mask)
logger.success(f"[Bar {idx}] Color classified as '{color}'")
rr.log(f"bar_{idx}/color", rr.TextLog(f"Color: {color}"))
# Pick
pick(robot, gripper, grasp_pose, idx)
# Return to scan pose before placing
logger.info(f"[Bar {idx}] Return to scan pose for clearance")
robot.set_cartesian_pose(SCAN_POSE, speed=CART_SPEED, acceleration=CART_ACCEL)
# Place in correct bin
bin_pose = BIN_POSE_BY_COLOR[color]
logger.info(f"[Bar {idx}] Placing into {color} bin at {bin_pose}")
place(robot, gripper, bin_pose, color, idx)
# Return to scan pose for next pick
logger.info(f"[Bar {idx}] Return to scan pose")
robot.set_cartesian_pose(SCAN_POSE, speed=CART_SPEED, acceleration=CART_ACCEL)
logger.success(f"[Bar {idx}] Complete")
# Home
logger.info("All bars sorted — returning to home pose")
robot.set_joint_positions(HOME_JOINT_POSE_DEG,
speed=JOINT_SPEED_DEG,
acceleration=JOINT_ACCEL_DEG)
logger.success("=== Candy Bar Sorting Pipeline Complete ===")
# ============================================================
# Entry point
# ============================================================
def main():
rr.init("candy_sorting")
logger.info("Rerun session 'candy_sorting' initialized")
robot = UniversalRobotsUR10E()
gripper = OnRobotRG6()
camera = RealSense()
robot_connected = False
gripper_connected = False
camera_started = False
try:
logger.info(f"Connecting to UR10e at {ROBOT_IP}")
robot.connect(ROBOT_IP)
robot_connected = True
logger.success("Robot connected")
logger.info(f"Connecting to OnRobot RG6 at {GRIPPER_IP} via {GRIPPER_PROTOCOL}")
gripper.connect(ip=GRIPPER_IP, protocol=GRIPPER_PROTOCOL)
gripper_connected = True
logger.success("Gripper connected")
logger.info("Starting RealSense camera")
camera.start()
camera_started = True
logger.success("Camera started")
run_pipeline(robot, gripper, camera)
except Exception as e:
logger.exception(f"Pipeline failed: {type(e).__name__}: {e}")
finally:
if camera_started:
try:
logger.info("Stopping camera")
camera.stop()
except Exception as e:
logger.warning(f"Camera stop failed: {e}")
if gripper_connected:
try:
logger.info("Disconnecting gripper")
gripper.disconnect()
except Exception as e:
logger.warning(f"Gripper disconnect failed: {e}")
if robot_connected:
try:
logger.info("Disconnecting robot")
robot.disconnect()
except Exception as e:
logger.warning(f"Robot disconnect failed: {e}")
logger.info("Cleanup complete")
if __name__ == "__main__":
main()
