Skip to content

Screw Sorting

This example demonstrates how to use Tzara, the Telekinesis Physical AI Agent, to build a screw sorting pipeline. Tzara generates code that detects screws in a blue bin using Grounding DINO, rejects oversized bounding boxes, segments each screw with SAM, and classifies it as black, large gray, or small gray based on mask color and area. The gripper picks each screw at a hardcoded height with a PCA-derived orientation and places it in one of three boxes. Full Rerun visualization and detailed logging are included at every stage.

Demo

UR10e sorting three types of screws by color and size into separate boxes

The Natural Language Instruction

In the blue bin there are 3 types of screws, one is black two are gray. One gray type is bigger than the other gray type. Can you sort them in 3 boxes. Differentiate the two gray ones by mask size. Reject big bboxes returned by dino or qwen. Hardcode grasp height to 0.01. Do PCA on the mask for grasping orientation. 3 boxes poses [0.3041, 0.85220, 0.086, 180.0, 0.0, 180.0], [0.3041, 0.73786, 0.086, 180.0, 0.0, 180.0], [0.3041, 0.62573, 0.086, 180.0, 0.0, 180.0]. Add detailed logging and Rerun visualization for every stage and step so we can see what is going on. The goal is robust debugging and traceability so failures can be diagnosed easily.

The Generated Code

python
# Screw sorting pipeline: UR10e + OnRobot RG6
# Detects screws in a blue bin using Grounding DINO, rejects oversized bounding boxes,
# segments each screw with SAM, classifies as black / large-gray / small-gray by
# mask color and area, computes grasping orientation via PCA, and places into one
# of three boxes. Full Rerun visualization and detailed loguru logging throughout.

import time
import numpy as np
import cv2
from loguru import logger
import rerun as rr

from telekinesis.synapse.robots.manipulators.universal_robots import UniversalRobotsUR10E
from telekinesis.synapse.tools.parallel_grippers.onrobot import OnRobotRG6
from telekinesis.medulla.cameras.realsense import RealSense
from telekinesis.retina import grounding_dino
from telekinesis.retina import segment_anything

# ============================================================
# Constants
# ============================================================

ROBOT_IP         = "192.168.1.2"
GRIPPER_IP       = "192.168.1.1"
GRIPPER_PROTOCOL = "MODBUS_TCP"

# Pose from which the camera scans the source bin [x, y, z, rx, ry, rz (deg)]
SCAN_POSE = [0.050, 0.700, 0.450, 180.0, 0.0, 90.0]

# Home joint pose (deg)
HOME_JOINT_POSE_DEG = [90.0, -90.0, -90.0, -90.0, 90.0, 0.0]

# Destination box poses [x, y, z, rx, ry, rz (deg)]
BOX_POSES = [
    [0.3041, 0.85220, 0.086, 180.0, 0.0, 180.0],   # BOX_0: black screws
    [0.3041, 0.73786, 0.086, 180.0, 0.0, 180.0],   # BOX_1: large gray screws
    [0.3041, 0.62573, 0.086, 180.0, 0.0, 180.0],   # BOX_2: small gray screws
]

# Detection / classification thresholds
DINO_PROMPT      = "screw ."
MAX_BBOX_AREA    = 40000        # px² — reject detections larger than this
DARK_THRESHOLD   = 80          # mean masked pixel intensity below this → black screw
GRAY_SIZE_THRESHOLD = 1200     # mask area in px: above → large gray, below → small gray

# Grasp parameters
GRASP_Z           = 0.01       # m — hardcoded grasp height
APPROACH_OFFSET_Z = 0.120      # m — clearance above grasp / place pose
PLACE_Z_OFFSET    = 0.030      # m — small drop height above box rim

# Motion parameters
CART_SPEED      = 0.50   # m/s
CART_ACCEL      = 0.60   # m/s^2
JOINT_SPEED_DEG = 45.0   # deg/s
JOINT_ACCEL_DEG = 90.0   # deg/s^2

# Gripper parameters
GRIPPER_OPEN_MM  = 50.0
GRIPPER_CLOSE_MM = 8.0
GRIPPER_FORCE_N  = 30.0

# Class labels for logging / Rerun
CLASS_LABELS = {0: "black", 1: "large_gray", 2: "small_gray"}

# ============================================================
# Geometry helpers
# ============================================================

def _approach(pose: list, dz: float) -> list:
    """Return a copy of pose with z raised by dz."""
    p = list(pose)
    p[2] += dz
    return p


def pca_angle_from_mask(mask: np.ndarray) -> float:
    """
    Compute the major-axis angle (radians) of the binary mask using PCA.
    Returns the angle of the first principal component in image space,
    used to rotate the gripper around Z for aligned grasping.
    """
    ys, xs = np.nonzero(mask)
    if len(xs) < 5:
        logger.warning("PCA: too few mask pixels, defaulting to 0 rad")
        return 0.0

    pts = np.column_stack([xs, ys]).astype(np.float64)
    mean = pts.mean(axis=0)
    centered = pts - mean
    cov = (centered.T @ centered) / len(pts)
    eigenvalues, eigenvectors = np.linalg.eigh(cov)
    major_axis = eigenvectors[:, np.argmax(eigenvalues)]
    angle_rad = np.arctan2(major_axis[1], major_axis[0])
    logger.debug(f"PCA: major_axis={major_axis}, angle={np.degrees(angle_rad):.2f} deg")
    return float(angle_rad)


def grasp_pose_from_centroid(cx_m: float, cy_m: float, angle_rad: float) -> list:
    """Build a 6-DOF grasp pose using hardcoded GRASP_Z and PCA angle as rz."""
    rz_deg = float(np.degrees(angle_rad))
    return [cx_m, cy_m, GRASP_Z, 180.0, 0.0, rz_deg]


def pixel_to_robot_xy(px: int, py: int, camera: RealSense) -> tuple[float, float]:
    """Back-project a pixel to robot-frame XY via RealSense depth."""
    depth_m = camera.get_depth_at_pixel(px, py)
    x_m, y_m, _ = camera.deproject_pixel_to_point(px, py, depth_m)
    return float(x_m), float(y_m)


# ============================================================
# Classification
# ============================================================

def classify_screw(image: np.ndarray, mask: np.ndarray, idx: int) -> int:
    """
    Classify screw type from mask color and area.

    Returns:
        0 — black screw  (dark mean intensity)
        1 — large gray   (gray, mask_area > GRAY_SIZE_THRESHOLD)
        2 — small gray   (gray, mask_area <= GRAY_SIZE_THRESHOLD)
    """
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    masked_pixels = gray[mask]
    mean_intensity = float(masked_pixels.mean()) if len(masked_pixels) > 0 else 0.0
    mask_area = int(mask.sum())

    logger.debug(f"[Screw {idx}] mean_intensity={mean_intensity:.1f}, mask_area={mask_area}")

    if mean_intensity < DARK_THRESHOLD:
        label = 0
    elif mask_area > GRAY_SIZE_THRESHOLD:
        label = 1
    else:
        label = 2

    logger.info(f"[Screw {idx}] Classified as '{CLASS_LABELS[label]}' "
                f"(mean_intensity={mean_intensity:.1f}, mask_area={mask_area})")
    return label


# ============================================================
# Pick / place motions
# ============================================================

def pick(robot: UniversalRobotsUR10E, gripper: OnRobotRG6, grasp_pose: list, idx: int):
    approach = _approach(grasp_pose, APPROACH_OFFSET_Z)

    logger.info(f"[Screw {idx}] Open gripper")
    gripper.move(position=GRIPPER_OPEN_MM, force=GRIPPER_FORCE_N)

    logger.info(f"[Screw {idx}] Move to approach: {approach}")
    robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)

    logger.info(f"[Screw {idx}] Descend to grasp: {grasp_pose}")
    robot.set_cartesian_pose(grasp_pose, speed=CART_SPEED * 0.4, acceleration=CART_ACCEL * 0.4)

    logger.info(f"[Screw {idx}] Close gripper")
    gripper.move(position=GRIPPER_CLOSE_MM, force=GRIPPER_FORCE_N)
    logger.success(f"[Screw {idx}] Grasped")

    logger.info(f"[Screw {idx}] Retreat to approach")
    robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)

    rr.log(f"screw_{idx}/pick",
           rr.TextLog(f"Picked screw {idx} at grasp_pose={grasp_pose}"))


def place(robot: UniversalRobotsUR10E, gripper: OnRobotRG6,
          box_idx: int, idx: int):
    box_pose = list(BOX_POSES[box_idx])
    place_pose = list(box_pose)
    place_pose[2] += PLACE_Z_OFFSET
    approach = _approach(place_pose, APPROACH_OFFSET_Z)

    logger.info(f"[Screw {idx}] Move to BOX_{box_idx} approach: {approach}")
    robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)

    logger.info(f"[Screw {idx}] Descend to place: {place_pose}")
    robot.set_cartesian_pose(place_pose, speed=CART_SPEED * 0.4, acceleration=CART_ACCEL * 0.4)

    logger.info(f"[Screw {idx}] Open gripper — release")
    gripper.move(position=GRIPPER_OPEN_MM, force=GRIPPER_FORCE_N)
    logger.success(f"[Screw {idx}] Released into BOX_{box_idx} ({CLASS_LABELS[box_idx]})")

    logger.info(f"[Screw {idx}] Retreat from box")
    robot.set_cartesian_pose(approach, speed=CART_SPEED, acceleration=CART_ACCEL)

    rr.log(f"screw_{idx}/place",
           rr.TextLog(f"Placed screw {idx} ({CLASS_LABELS[box_idx]}) at {place_pose}"))


# ============================================================
# Main pipeline
# ============================================================

def run_pipeline(robot: UniversalRobotsUR10E, gripper: OnRobotRG6, camera: RealSense):
    logger.info("=== Screw Sorting Pipeline Start ===")

    # Move to scan pose
    logger.info(f"Moving to scan pose: {SCAN_POSE}")
    robot.set_cartesian_pose(SCAN_POSE, speed=CART_SPEED, acceleration=CART_ACCEL)
    logger.success("At scan pose")

    # Capture RGB frame
    logger.info("Capturing RGB frame from RealSense")
    rgb = camera.get_color_frame()   # np.ndarray HxWx3 uint8 BGR
    logger.success(f"Captured frame shape={rgb.shape}")
    rr.log("camera/rgb", rr.Image(rgb[:, :, ::-1]))

    # Detect screws with Grounding DINO
    logger.info(f"Running Grounding DINO with prompt: '{DINO_PROMPT}'")
    raw_detections = grounding_dino.detect(image=rgb, prompt=DINO_PROMPT)
    logger.info(f"Grounding DINO returned {len(raw_detections)} raw detections")

    # Filter oversized bounding boxes
    detections = []
    for d in raw_detections:
        x1, y1, x2, y2 = d["bbox"]
        area = (x2 - x1) * (y2 - y1)
        if area > MAX_BBOX_AREA:
            logger.warning(f"Rejecting detection bbox={d['bbox']} area={area} > MAX_BBOX_AREA={MAX_BBOX_AREA}")
            rr.log("camera/rejected_bboxes",
                   rr.TextLog(f"Rejected bbox={d['bbox']} area={area}"))
        else:
            detections.append(d)

    logger.info(f"{len(detections)} detections remain after bbox-area filter")

    if not detections:
        logger.warning("No valid screw detections — exiting pipeline")
        return

    # Log accepted bounding boxes
    boxes_xyxy = np.array([[d["bbox"][0], d["bbox"][1], d["bbox"][2], d["bbox"][3]]
                            for d in detections], dtype=np.float32)
    rr.log("camera/detections/bboxes",
           rr.Boxes2D(array=boxes_xyxy, array_format=rr.Box2DFormat.XYXY))
    logger.debug(f"Logged {len(detections)} accepted bounding boxes to Rerun")

    for idx, det in enumerate(detections):
        bbox = det["bbox"]   # (x1, y1, x2, y2) pixels
        x1, y1, x2, y2 = bbox
        cx_px = int((x1 + x2) / 2)
        cy_px = int((y1 + y2) / 2)
        logger.info(f"[Screw {idx}] BBox={bbox}, centroid_px=({cx_px}, {cy_px})")

        # Segment with SAM
        logger.info(f"[Screw {idx}] Running SAM segmentation")
        mask = segment_anything.segment(image=rgb, bbox=bbox)   # HxW bool
        mask_area = int(mask.sum())
        logger.success(f"[Screw {idx}] Mask computed, nonzero px={mask_area}")

        mask_vis = (mask.astype(np.uint8) * 255)
        rr.log(f"screw_{idx}/mask", rr.Image(mask_vis))

        # Classify
        logger.info(f"[Screw {idx}] Classifying screw type")
        box_idx = classify_screw(rgb, mask, idx)
        rr.log(f"screw_{idx}/classification",
               rr.TextLog(f"Type: {CLASS_LABELS[box_idx]} (box_idx={box_idx})"))

        # PCA orientation
        logger.info(f"[Screw {idx}] Computing PCA orientation")
        angle_rad = pca_angle_from_mask(mask)
        logger.info(f"[Screw {idx}] PCA angle = {np.degrees(angle_rad):.2f} deg")

        # Back-project centroid to robot frame
        cx_m, cy_m = pixel_to_robot_xy(cx_px, cy_px, camera)
        logger.debug(f"[Screw {idx}] Robot-frame centroid: x={cx_m:.4f} m, y={cy_m:.4f} m")

        # Build grasp pose (GRASP_Z hardcoded)
        grasp_pose = grasp_pose_from_centroid(cx_m, cy_m, angle_rad)
        logger.info(f"[Screw {idx}] Grasp pose (GRASP_Z hardcoded to {GRASP_Z}): {grasp_pose}")
        rr.log(f"screw_{idx}/grasp_pose",
               rr.TextLog(f"Grasp pose: {grasp_pose}"))

        # Pick
        pick(robot, gripper, grasp_pose, idx)

        # Return to scan pose for clearance before placing
        logger.info(f"[Screw {idx}] Return to scan pose for clearance")
        robot.set_cartesian_pose(SCAN_POSE, speed=CART_SPEED, acceleration=CART_ACCEL)

        # Place
        logger.info(f"[Screw {idx}] Placing into BOX_{box_idx} ({CLASS_LABELS[box_idx]})")
        place(robot, gripper, box_idx, idx)

        # Return to scan pose for next pick
        logger.info(f"[Screw {idx}] Return to scan pose")
        robot.set_cartesian_pose(SCAN_POSE, speed=CART_SPEED, acceleration=CART_ACCEL)
        logger.success(f"[Screw {idx}] Complete")

    # Home
    logger.info("All screws sorted — returning to home pose")
    robot.set_joint_positions(HOME_JOINT_POSE_DEG,
                              speed=JOINT_SPEED_DEG,
                              acceleration=JOINT_ACCEL_DEG)
    logger.success("=== Screw Sorting Pipeline Complete ===")


# ============================================================
# Entry point
# ============================================================

def main():
    rr.init("screw_sorting")
    logger.info("Rerun session 'screw_sorting' initialized")

    robot   = UniversalRobotsUR10E()
    gripper = OnRobotRG6()
    camera  = RealSense()

    robot_connected   = False
    gripper_connected = False
    camera_started    = False

    try:
        logger.info(f"Connecting to UR10e at {ROBOT_IP}")
        robot.connect(ROBOT_IP)
        robot_connected = True
        logger.success("Robot connected")

        logger.info(f"Connecting to OnRobot RG6 at {GRIPPER_IP} via {GRIPPER_PROTOCOL}")
        gripper.connect(ip=GRIPPER_IP, protocol=GRIPPER_PROTOCOL)
        gripper_connected = True
        logger.success("Gripper connected")

        logger.info("Starting RealSense camera")
        camera.start()
        camera_started = True
        logger.success("Camera started")

        run_pipeline(robot, gripper, camera)

    except Exception as e:
        logger.exception(f"Pipeline failed: {type(e).__name__}: {e}")
    finally:
        if camera_started:
            try:
                logger.info("Stopping camera")
                camera.stop()
            except Exception as e:
                logger.warning(f"Camera stop failed: {e}")
        if gripper_connected:
            try:
                logger.info("Disconnecting gripper")
                gripper.disconnect()
            except Exception as e:
                logger.warning(f"Gripper disconnect failed: {e}")
        if robot_connected:
            try:
                logger.info("Disconnecting robot")
                robot.disconnect()
            except Exception as e:
                logger.warning(f"Robot disconnect failed: {e}")
        logger.info("Cleanup complete")


if __name__ == "__main__":
    main()