Screw Sorting
This example demonstrates how to use Tzara, the Telekinesis Physical AI Agent, to build a screw sorting pipeline. Tzara generates code that detects screws in a blue bin using Grounding DINO, rejects oversized bounding boxes, segments each screw with SAM, and classifies it as black, large gray, or small gray based on mask color and area. The gripper picks each screw at a hardcoded height with a PCA-derived orientation and places it in one of three boxes. Full Rerun visualization and detailed logging are included at every stage.
Demo
UR10e sorting three types of screws by color and size into separate boxes
The Natural Language Instruction
In the blue bin there are 3 types of screws, one is black two are gray. One gray type is bigger than the other gray type. Can you sort them in 3 boxes. Differentiate the two gray ones by mask size. Reject big bboxes returned by dino or qwen. Hardcode grasp height to 0.01. Do PCA on the mask for grasping orientation. 3 boxes poses [0.3041, 0.85220, 0.086, 180.0, 0.0, 180.0], [0.3041, 0.73786, 0.086, 180.0, 0.0, 180.0], [0.3041, 0.62573, 0.086, 180.0, 0.0, 180.0]. Add detailed logging and Rerun visualization for every stage and step so we can see what is going on. The goal is robust debugging and traceability so failures can be diagnosed easily.The Generated Code
python
# Screw sorting pipeline: UR10e + OnRobot RG6
# Detects screws in a blue bin using Grounding DINO, rejects oversized bounding boxes,
# segments each screw with SAM, classifies as black / large-gray / small-gray by
# mask color and area, computes grasping orientation via PCA, and places into one
# of three boxes. Full Rerun visualization and detailed loguru logging throughout.
import time
import numpy as np
import cv2
from loguru import logger
import rerun as rr
from telekinesis.synapse.robots.manipulators.universal_robots import UniversalRobotsUR10E
from telekinesis.synapse.tools.parallel_grippers.onrobot import OnRobotRG6
from telekinesis.medulla.cameras.realsense import RealSense
from telekinesis.retina import grounding_dino
from telekinesis.retina import segment_anything
# ============================================================
# Constants
# ============================================================
ROBOT_IP = "192.168.1.2"
GRIPPER_IP = "192.168.1.1"
GRIPPER_PROTOCOL = "MODBUS_TCP"
# Pose from which the camera scans the source bin [x, y, z, rx, ry, rz (deg)]
SCAN_POSE = [0.050, 0.700, 0.450, 180.0, 0.0, 90.0]
# Home joint pose (deg)
HOME_JOINT_POSE_DEG = [90.0, -90.0, -90.0, -90.0, 90.0, 0.0]
# Destination box poses [x, y, z, rx, ry, rz (deg)]
BOX_POSES = [
[0.3041, 0.85220, 0.086, 180.0, 0.0, 180.0], # BOX_0: black screws
[0.3041, 0.73786, 0.086, 180.0, 0.0, 180.0], # BOX_1: large gray screws
[0.3041, 0.62573, 0.086, 180.0, 0.0, 180.0], # BOX_2: small gray screws
]
# Detection / classification thresholds
DINO_PROMPT = "screw ."
MAX_BBOX_AREA = 40000 # px² — reject detections larger than this
DARK_THRESHOLD = 80 # mean masked pixel intensity below this → black screw
GRAY_SIZE_THRESHOLD = 1200 # mask area in px: above → large gray, below → small gray
# Grasp parameters
GRASP_Z = 0.01 # m — hardcoded grasp height
APPROACH_OFFSET_Z = 0.120 # m — clearance above grasp / place pose
PLACE_Z_OFFSET = 0.030 # m — small drop height above box rim
# Motion parameters
CART_SPEED = 0.50 # m/s
CART_ACCEL = 0.60 # m/s^2
JOINT_SPEED_DEG = 45.0 # deg/s
JOINT_ACCEL_DEG = 90.0 # deg/s^2
# Gripper parameters
GRIPPER_OPEN_MM = 50.0
GRIPPER_CLOSE_MM = 8.0
GRIPPER_FORCE_N = 30.0
# Class labels for logging / Rerun
CLASS_LABELS = {0: "black", 1: "large_gray", 2: "small_gray"}
# ============================================================
# Geometry helpers
# ============================================================
def _approach(pose: list, dz: float) -> list:
"""Return a copy of pose with z raised by dz."""
p = list(pose)
p[2] += dz
return p
def pca_angle_from_mask(mask: np.ndarray) -> float:
"""
Compute the major-axis angle (radians) of the binary mask using PCA.
Returns the angle of the first principal component in image space,
used to rotate the gripper around Z for aligned grasping.
"""
ys, xs = np.nonzero(mask)
if len(xs) < 5:
logger.warning("PCA: too few mask pixels, defaulting to 0 rad")
return 0.0
pts = np.column_stack([xs, ys]).astype(np.float64)
mean = pts.mean(axis=0)
centered = pts - mean
cov = (centered.T @ centered) / len(pts)
eigenvalues, eigenvectors = np.linalg.eigh(cov)
major_axis = eigenvectors[:, np.argmax(eigenvalues)]
angle_rad = np.arctan2(major_axis[1], major_axis[0])
logger.debug(f"PCA: major_axis={major_axis}, angle={np.degrees(angle_rad):.2f} deg")
return float(angle_rad)
def grasp_pose_from_centroid(cx_m: float, cy_m: float, angle_rad: float) -> list:
"""Build a 6-DOF grasp pose using hardcoded GRASP_Z and PCA angle as rz."""
rz_deg = float(np.degrees(angle_rad))
return [cx_m, cy_m, GRASP_Z, 180.0, 0.0, rz_deg]
def pixel_to_robot_xy(px: int, py: int, camera: RealSense) -> tuple[float, float]:
"""Back-project a pixel to robot-frame XY via RealSense depth."""
depth_m = camera.get_depth_at_pixel(px, py)
x_m, y_m, _ = camera.deproject_pixel_to_point(px, py, depth_m)
return float(x_m), float(y_m)
# ============================================================
# Classification
# ============================================================
def classify_screw(image: np.ndarray, mask: np.ndarray, idx: int) -> int:
"""
Classify screw type from mask color and area.
Returns:
0 — black screw (dark mean intensity)
1 — large gray (gray, mask_area > GRAY_SIZE_THRESHOLD)
2 — small gray (gray, mask_area <= GRAY_SIZE_THRESHOLD)
"""
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
masked_pixels = gray[mask]
mean_intensity = float(masked_pixels.mean()) if len(masked_pixels) > 0 else 0.0
mask_area = int(mask.sum())
logger.debug(f"[Screw {idx}] mean_intensity={mean_intensity:.1f}, mask_area={mask_area}")
if mean_intensity < DARK_THRESHOLD:
label = 0
elif mask_area > GRAY_SIZE_THRESHOLD:
label = 1
else:
label = 2
logger.info(f"[Screw {idx}] Classified as '{CLASS_LABELS[label]}' "
f"(mean_intensity={mean_intensity:.1f}, mask_area={mask_area})")
return label
# ============================================================
# Pick / place motions
# ============================================================
def pick(robot: UniversalRobotsUR10E, gripper: OnRobotRG6, grasp_pose: list, idx: int):
approach = _approach(grasp_pose, APPROACH_OFFSET_Z)
logger.info(f"[Screw {idx}] Open gripper")
gripper.move(position=GRIPPER_OPEN_MM, force=GRIPPER_FORCE_N)
logger.info(f"[Screw {idx}] Move to approach: {approach}")
robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)
logger.info(f"[Screw {idx}] Descend to grasp: {grasp_pose}")
robot.set_cartesian_pose(grasp_pose, speed=CART_SPEED * 0.4, acceleration=CART_ACCEL * 0.4)
logger.info(f"[Screw {idx}] Close gripper")
gripper.move(position=GRIPPER_CLOSE_MM, force=GRIPPER_FORCE_N)
logger.success(f"[Screw {idx}] Grasped")
logger.info(f"[Screw {idx}] Retreat to approach")
robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)
rr.log(f"screw_{idx}/pick",
rr.TextLog(f"Picked screw {idx} at grasp_pose={grasp_pose}"))
def place(robot: UniversalRobotsUR10E, gripper: OnRobotRG6,
box_idx: int, idx: int):
box_pose = list(BOX_POSES[box_idx])
place_pose = list(box_pose)
place_pose[2] += PLACE_Z_OFFSET
approach = _approach(place_pose, APPROACH_OFFSET_Z)
logger.info(f"[Screw {idx}] Move to BOX_{box_idx} approach: {approach}")
robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)
logger.info(f"[Screw {idx}] Descend to place: {place_pose}")
robot.set_cartesian_pose(place_pose, speed=CART_SPEED * 0.4, acceleration=CART_ACCEL * 0.4)
logger.info(f"[Screw {idx}] Open gripper — release")
gripper.move(position=GRIPPER_OPEN_MM, force=GRIPPER_FORCE_N)
logger.success(f"[Screw {idx}] Released into BOX_{box_idx} ({CLASS_LABELS[box_idx]})")
logger.info(f"[Screw {idx}] Retreat from box")
robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)
rr.log(f"screw_{idx}/place",
rr.TextLog(f"Placed screw {idx} ({CLASS_LABELS[box_idx]}) at {place_pose}"))
# ============================================================
# Main pipeline
# ============================================================
def run_pipeline(robot: UniversalRobotsUR10E, gripper: OnRobotRG6, camera: RealSense):
logger.info("=== Screw Sorting Pipeline Start ===")
# Move to scan pose
logger.info(f"Moving to scan pose: {SCAN_POSE}")
robot.set_cartesian_pose(SCAN_POSE, speed=CART_SPEED, acceleration=CART_ACCEL)
logger.success("At scan pose")
# Capture RGB frame
logger.info("Capturing RGB frame from RealSense")
rgb = camera.get_color_frame() # np.ndarray HxWx3 uint8 BGR
logger.success(f"Captured frame shape={rgb.shape}")
rr.log("camera/rgb", rr.Image(rgb[:, :, ::-1]))
# Detect screws with Grounding DINO
logger.info(f"Running Grounding DINO with prompt: '{DINO_PROMPT}'")
raw_detections = grounding_dino.detect(image=rgb, prompt=DINO_PROMPT)
logger.info(f"Grounding DINO returned {len(raw_detections)} raw detections")
# Filter oversized bounding boxes
detections = []
for d in raw_detections:
x1, y1, x2, y2 = d["bbox"]
area = (x2 - x1) * (y2 - y1)
if area > MAX_BBOX_AREA:
logger.warning(f"Rejecting detection bbox={d['bbox']} area={area} > MAX_BBOX_AREA={MAX_BBOX_AREA}")
rr.log("camera/rejected_bboxes",
rr.TextLog(f"Rejected bbox={d['bbox']} area={area}"))
else:
detections.append(d)
logger.info(f"{len(detections)} detections remain after bbox-area filter")
if not detections:
logger.warning("No valid screw detections — exiting pipeline")
return
# Log accepted bounding boxes
boxes_xyxy = np.array([[d["bbox"][0], d["bbox"][1], d["bbox"][2], d["bbox"][3]]
for d in detections], dtype=np.float32)
rr.log("camera/detections/bboxes",
rr.Boxes2D(array=boxes_xyxy, array_format=rr.Box2DFormat.XYXY))
logger.debug(f"Logged {len(detections)} accepted bounding boxes to Rerun")
for idx, det in enumerate(detections):
bbox = det["bbox"] # (x1, y1, x2, y2) pixels
x1, y1, x2, y2 = bbox
cx_px = int((x1 + x2) / 2)
cy_px = int((y1 + y2) / 2)
logger.info(f"[Screw {idx}] BBox={bbox}, centroid_px=({cx_px}, {cy_px})")
# Segment with SAM
logger.info(f"[Screw {idx}] Running SAM segmentation")
mask = segment_anything.segment(image=rgb, bbox=bbox) # HxW bool
mask_area = int(mask.sum())
logger.success(f"[Screw {idx}] Mask computed, nonzero px={mask_area}")
mask_vis = (mask.astype(np.uint8) * 255)
rr.log(f"screw_{idx}/mask", rr.Image(mask_vis))
# Classify
logger.info(f"[Screw {idx}] Classifying screw type")
box_idx = classify_screw(rgb, mask, idx)
rr.log(f"screw_{idx}/classification",
rr.TextLog(f"Type: {CLASS_LABELS[box_idx]} (box_idx={box_idx})"))
# PCA orientation
logger.info(f"[Screw {idx}] Computing PCA orientation")
angle_rad = pca_angle_from_mask(mask)
logger.info(f"[Screw {idx}] PCA angle = {np.degrees(angle_rad):.2f} deg")
# Back-project centroid to robot frame
cx_m, cy_m = pixel_to_robot_xy(cx_px, cy_px, camera)
logger.debug(f"[Screw {idx}] Robot-frame centroid: x={cx_m:.4f} m, y={cy_m:.4f} m")
# Build grasp pose (GRASP_Z hardcoded)
grasp_pose = grasp_pose_from_centroid(cx_m, cy_m, angle_rad)
logger.info(f"[Screw {idx}] Grasp pose (GRASP_Z hardcoded to {GRASP_Z}): {grasp_pose}")
rr.log(f"screw_{idx}/grasp_pose",
rr.TextLog(f"Grasp pose: {grasp_pose}"))
# Pick
pick(robot, gripper, grasp_pose, idx)
# Return to scan pose for clearance before placing
logger.info(f"[Screw {idx}] Return to scan pose for clearance")
robot.set_cartesian_pose(SCAN_POSE, speed=CART_SPEED, acceleration=CART_ACCEL)
# Place
logger.info(f"[Screw {idx}] Placing into BOX_{box_idx} ({CLASS_LABELS[box_idx]})")
place(robot, gripper, box_idx, idx)
# Return to scan pose for next pick
logger.info(f"[Screw {idx}] Return to scan pose")
robot.set_cartesian_pose(SCAN_POSE, speed=CART_SPEED, acceleration=CART_ACCEL)
logger.success(f"[Screw {idx}] Complete")
# Home
logger.info("All screws sorted — returning to home pose")
robot.set_joint_positions(HOME_JOINT_POSE_DEG,
speed=JOINT_SPEED_DEG,
acceleration=JOINT_ACCEL_DEG)
logger.success("=== Screw Sorting Pipeline Complete ===")
# ============================================================
# Entry point
# ============================================================
def main():
rr.init("screw_sorting")
logger.info("Rerun session 'screw_sorting' initialized")
robot = UniversalRobotsUR10E()
gripper = OnRobotRG6()
camera = RealSense()
robot_connected = False
gripper_connected = False
camera_started = False
try:
logger.info(f"Connecting to UR10e at {ROBOT_IP}")
robot.connect(ROBOT_IP)
robot_connected = True
logger.success("Robot connected")
logger.info(f"Connecting to OnRobot RG6 at {GRIPPER_IP} via {GRIPPER_PROTOCOL}")
gripper.connect(ip=GRIPPER_IP, protocol=GRIPPER_PROTOCOL)
gripper_connected = True
logger.success("Gripper connected")
logger.info("Starting RealSense camera")
camera.start()
camera_started = True
logger.success("Camera started")
run_pipeline(robot, gripper, camera)
except Exception as e:
logger.exception(f"Pipeline failed: {type(e).__name__}: {e}")
finally:
if camera_started:
try:
logger.info("Stopping camera")
camera.stop()
except Exception as e:
logger.warning(f"Camera stop failed: {e}")
if gripper_connected:
try:
logger.info("Disconnecting gripper")
gripper.disconnect()
except Exception as e:
logger.warning(f"Gripper disconnect failed: {e}")
if robot_connected:
try:
logger.info("Disconnecting robot")
robot.disconnect()
except Exception as e:
logger.warning(f"Robot disconnect failed: {e}")
logger.info("Cleanup complete")
if __name__ == "__main__":
main()
