Skip to content

Vision-Based Pick and Place

This example demonstrates how to use Tzara, the Telekinesis Physical AI Agent, to build a full vision-based pick and place pipeline. Given a natural language instruction and a hardware context, Tzara generates code that captures RGB-D data from an eye-in-hand RealSense D435if, detects the target object and drop location using Grounding DINO, back-projects detections to 3D using the calibrated camera-to-flange transform, and executes a pick-and-place sequence on a UR10e with an OnRobot RG2 gripper.

Demo

UR10e with RealSense eye-in-hand picking and placing a yellow bottle

The Natural Language Instruction

Pick up the yellow bottle and put it in the blue bin

The Generated Code

python
# Telekinesis pipeline: Pick-and-place a yellow bottle into a blue bin
# Hardware: UR10e + OnRobot RG2 + RealSense D435if (eye-in-hand)
# Detection: Grounding DINO via Retina skill group

import numpy as np
import time
import types
import cv2
from loguru import logger

from telekinesis.synapse.robots.manipulators.universal_robots import UniversalRobotsUR10E
from telekinesis.synapse.tools.parallel_grippers.onrobot import OnRobotRG2
from telekinesis.medulla.cameras.realsense import RealSense
from telekinesis import retina, pupil
from datatypes import datatypes

# ----------------------------------------------------------------------------
# Configuration
# ----------------------------------------------------------------------------
ROBOT_IP = "192.168.1.2"
GRIPPER_IP = "192.168.1.1"

flange_T_cam = np.array([
    [ 3.25458660e-02, -9.99468683e-01, -1.76577372e-03,  7.58630134e-02],
    [ 9.98910351e-01,  3.25867105e-02, -3.34098251e-02, 0.7464808e-02],
    [ 3.34496146e-02, -6.76497960e-04,  9.99440176e-01, -3.37992198e-02],
    [ 0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  1.00000000e+00],
])

# tcp_T_flange transforms points from the gripper (TCP/fingertips) frame
# into the flange frame. Its translation [0, 0, -0.16] means the gripper TCP
# is offset +0.16 m along the flange's +Z axis. We use this matrix to
# compensate the commanded flange pose so that the gripper fingertips reach
# the desired target point in the base frame.
tcp_T_flange = np.array([
    [-1.0, 0.0, 0.0, 0.0],
    [0.0, -1.0, 0.0, 0.0],
    [0.0, 0.0, 1.0, -0.16],
    [0.0, 0.0, 0.0, 1.0],
])

# Inverse: flange_T_tcp (used if needed elsewhere)
flange_T_tcp = np.linalg.inv(tcp_T_flange)

# OBSERVATION_POSE is a flange pose (no compensation needed; used only for moving
# the robot to an observation viewpoint).
OBSERVATION_POSE = [0., 0.65, 0.85, 180.0, 0.0, 90.0]

# Hardcoded RealSense color intrinsics (no distortion)
intrinsics = types.SimpleNamespace(
    fx=1365.9793701171875,
    fy=1364.888671875,
    ppx=979.55517578125,
    ppy=568.107421875,
)

# Task parameters
APPROACH_HEIGHT = 0.25      # m above target before descent
PICK_DESCEND_DEPTH = 0.02   # m below detected pick point for grasp
PLACE_HEIGHT_ABOVE_BIN = 0.22  # m above bin top to release
SPEED = 0.4
ACCEL = 0.6


# ----------------------------------------------------------------------------
# Helpers
# ----------------------------------------------------------------------------
def pose_xyzrpy_to_matrix(pose):
    """Convert [x,y,z,rx,ry,rz] (degrees) to a 4x4 homogeneous matrix.
    UR uses rotation vector (axis-angle) in radians for poses, but the Telekinesis
    UR API uses Euler angles in degrees. We follow the UR API convention here:
    rx,ry,rz are interpreted as XYZ Euler angles in degrees.
    """
    x, y, z, rx, ry, rz = pose
    rx_r, ry_r, rz_r = np.deg2rad([rx, ry, rz])
    Rx = np.array([[1, 0, 0],
                   [0, np.cos(rx_r), -np.sin(rx_r)],
                   [0, np.sin(rx_r),  np.cos(rx_r)]])
    Ry = np.array([[ np.cos(ry_r), 0, np.sin(ry_r)],
                   [0, 1, 0],
                   [-np.sin(ry_r), 0, np.cos(ry_r)]])
    Rz = np.array([[np.cos(rz_r), -np.sin(rz_r), 0],
                   [np.sin(rz_r),  np.cos(rz_r), 0],
                   [0, 0, 1]])
    R = Rz @ Ry @ Rx
    T = np.eye(4)
    T[:3, :3] = R
    T[:3, 3] = [x, y, z]
    return T


def matrix_to_xyzrpy_deg(T):
    """Recover [x,y,z,rx,ry,rz] (degrees) from a 4x4 homogeneous matrix,
    inverting the convention R = Rz @ Ry @ Rx used in pose_xyzrpy_to_matrix.
    """
    R = T[:3, :3]
    x, y, z = T[:3, 3]
    sy = R[0, 2]
    ry = np.arcsin(np.clip(sy, -1.0, 1.0))
    if abs(sy) < 0.99999:
        rx = np.arctan2(-R[1, 2], R[2, 2])
        rz = np.arctan2(-R[0, 1], R[0, 0])
    else:
        # gimbal lock
        rx = np.arctan2(R[2, 1], R[1, 1])
        rz = 0.0
    return [float(x), float(y), float(z),
            float(np.rad2deg(rx)), float(np.rad2deg(ry)), float(np.rad2deg(rz))]


def gripper_target_to_flange_pose(target_xyz, orient_deg):
    """Given a desired gripper-TCP position (in base frame) and orientation
    (XYZ Euler degrees), compute the flange pose [x,y,z,rx,ry,rz] (degrees)
    that places the gripper at the target.
    """
    base_T_gripper = pose_xyzrpy_to_matrix(
        [target_xyz[0], target_xyz[1], target_xyz[2],
         orient_deg[0], orient_deg[1], orient_deg[2]]
    )
    base_T_flange_cmd = base_T_gripper @ tcp_T_flange
    return matrix_to_xyzrpy_deg(base_T_flange_cmd)


def pixel_to_camera_point(intrinsics, u, v, depth_m):
    """Back-project pixel (u,v) with depth (meters) to 3D point in camera frame."""
    fx, fy = intrinsics.fx, intrinsics.fy
    cx, cy = intrinsics.ppx, intrinsics.ppy
    x = (u - cx) * depth_m / fx
    y = (v - cy) * depth_m / fy
    z = depth_m
    return np.array([x, y, z, 1.0])


def get_median_depth(depth_image, u, v, half_window=3):
    """Sample a small patch around (u,v) and return the median non-zero depth."""
    h, w = depth_image.shape[:2]
    u0 = max(0, u - half_window)
    u1 = min(w, u + half_window + 1)
    v0 = max(0, v - half_window)
    v1 = min(h, v + half_window + 1)
    patch = depth_image[v0:v1, u0:u1].astype(np.float32)
    valid = patch[patch > 0]
    if valid.size == 0:
        return None
    return float(np.median(valid))


def find_detection_for_label(annotations, categories, target_label):
    """Find the highest-scoring detection whose category name matches target_label."""
    ann_list = annotations.to_list()
    cat_list = categories.to_list()
    cat_id_to_name = {c["id"]: c["name"].lower() for c in cat_list}
    target_lc = target_label.lower()

    best = None
    best_score = -1.0
    for ann in ann_list:
        cat_name = cat_id_to_name.get(ann.get("category_id"), "")
        if target_lc in cat_name or cat_name in target_lc:
            score = ann.get("score", 0.0) or 0.0
            if score > best_score:
                best_score = score
                best = ann
    return best


def compute_base_point_from_pixel(u, v, depth_image, intrinsics, base_T_flange):
    """Project pixel to 3D in camera frame, then transform to robot base frame."""
    z = get_median_depth(depth_image, int(u), int(v), half_window=3)
    if z is None or z <= 0.0:
        raise RuntimeError(f"Invalid depth at pixel ({u},{v})")
    p_cam = pixel_to_camera_point(intrinsics, int(u), int(v), z)
    base_T_cam = base_T_flange @ flange_T_cam
    p_base = base_T_cam @ p_cam
    return p_base[:3]


def bbox_center(ann):
    """Return (cx, cy) center of a COCO XYWH bbox dict."""
    bbox = ann["bbox"]
    x, y, w, h = bbox
    return (x + w / 2.0, y + h / 2.0)


# ----------------------------------------------------------------------------
# Main pipeline
# ----------------------------------------------------------------------------
robot = None
gripper = None
camera = None

try:
    # ---- 1. Connect hardware ----
    logger.info("Connecting to UR10e robot...")
    robot = UniversalRobotsUR10E()
    robot.connect(ROBOT_IP)

    logger.info("Connecting to OnRobot RG2 gripper...")
    gripper = OnRobotRG2()
    gripper.connect(GRIPPER_IP, protocol="MODBUS_TCP")

    logger.info("Connecting to RealSense D435if...")
    camera = RealSense(name="d435if")
    camera.connect(warmup_frames=30)

    logger.info(f"Color intrinsics: fx={intrinsics.fx:.2f} fy={intrinsics.fy:.2f} "
                f"cx={intrinsics.ppx:.2f} cy={intrinsics.ppy:.2f}")

    # ---- 2. Move to observation pose, open gripper ----
    logger.info("Moving to observation pose...")
    robot.set_cartesian_pose(OBSERVATION_POSE, speed=SPEED, acceleration=ACCEL)
    logger.info("Opening gripper...")
    gripper.open(force=40.0)
    time.sleep(0.5)

    # ---- 3. Capture RGB + depth ----
    logger.info("Capturing RGB + depth frames...")
    rgb = camera.capture_single_color_frame()
    depth = camera.capture_single_depth_frame()
    if rgb is None or depth is None:
        raise RuntimeError("Failed to capture RGB or depth frame from RealSense.")
    logger.info(f"RGB shape: {rgb.shape}, depth shape: {depth.shape}")

    # ---- 4. Detect with Grounding DINO ----
    # Grounding DINO requires dot-separated lowercase prompts ending with a dot.
    prompt = "a yellow bottle . a blue bin ."
    logger.info(f"Running Grounding DINO with prompt: {prompt!r}")
    annotations, categories = retina.detect_objects_using_grounding_dino(
        image=rgb,
        prompt=prompt,
        box_threshold=0.30,
        text_threshold=0.25,
    )

    bottle_ann = find_detection_for_label(annotations, categories, "yellow bottle")
    if bottle_ann is None:
        bottle_ann = find_detection_for_label(annotations, categories, "bottle")
    bin_ann = find_detection_for_label(annotations, categories, "blue bin")
    if bin_ann is None:
        bin_ann = find_detection_for_label(annotations, categories, "bin")

    if bottle_ann is None:
        raise RuntimeError("Yellow bottle not detected by Grounding DINO.")
    if bin_ann is None:
        raise RuntimeError("Blue bin not detected by Grounding DINO.")
    logger.info(f"Bottle bbox: {bottle_ann['bbox']}, score={bottle_ann.get('score')}")
    logger.info(f"Bin bbox:    {bin_ann['bbox']}, score={bin_ann.get('score')}")

    # ---- 5/6/7. Compute 3D points in base frame ----
    # Current robot pose (flange in base frame) at observation pose
    current_pose = robot.get_cartesian_pose()
    base_T_flange = pose_xyzrpy_to_matrix(current_pose)
    logger.info(f"Current robot pose: {current_pose}")

    bottle_u, bottle_v = bbox_center(bottle_ann)
    bin_u, bin_v = bbox_center(bin_ann)

    pick_point_base = compute_base_point_from_pixel(
        bottle_u, bottle_v, depth, intrinsics, base_T_flange
    )
    place_point_base = compute_base_point_from_pixel(
        bin_u, bin_v, depth, intrinsics, base_T_flange
    )
    logger.info(f"Pick point (base):  {pick_point_base}")
    logger.info(f"Place point (base): {place_point_base}")

    # Use the orientation of the observation pose for top-down approach (already pointing down)
    obs_orient = OBSERVATION_POSE[3:]  # [rx, ry, rz] degrees

    # ---- 8. Pick sequence ----
    pick_above = gripper_target_to_flange_pose(
        [pick_point_base[0], pick_point_base[1], pick_point_base[2] + APPROACH_HEIGHT],
        obs_orient,
    )
    pick_grasp = gripper_target_to_flange_pose(
        [pick_point_base[0], pick_point_base[1], pick_point_base[2] - PICK_DESCEND_DEPTH],
        obs_orient,
    )

    logger.info(f"Moving above bottle: {pick_above}")
    robot.set_cartesian_pose(pick_above, speed=SPEED, acceleration=ACCEL)

    logger.info(f"Descending to grasp: {pick_grasp}")
    robot.set_cartesian_pose(pick_grasp, speed=0.15, acceleration=0.3)

    logger.info("Closing gripper to grasp bottle...")
    gripper.close(force=40.0)
    time.sleep(0.8)

    logger.info("Lifting bottle...")
    robot.set_cartesian_pose(pick_above, speed=0.2, acceleration=0.4)

    # ---- 9. Place sequence ----
    place_above = gripper_target_to_flange_pose(
        [place_point_base[0], place_point_base[1], place_point_base[2] + APPROACH_HEIGHT],
        obs_orient,
    )
    place_release = gripper_target_to_flange_pose(
        [place_point_base[0], place_point_base[1], place_point_base[2] + PLACE_HEIGHT_ABOVE_BIN],
        obs_orient,
    )

    logger.info(f"Moving above bin: {place_above}")
    robot.set_cartesian_pose(place_above, speed=SPEED, acceleration=ACCEL)

    logger.info(f"Descending to release height: {place_release}")
    robot.set_cartesian_pose(place_release, speed=0.2, acceleration=0.4)

    logger.info("Opening gripper to release bottle...")
    gripper.open(force=40.0)
    time.sleep(0.8)

    logger.info("Retreating above bin...")
    robot.set_cartesian_pose(place_above, speed=0.2, acceleration=0.4)

    # ---- 10. Return to observation pose ----
    logger.info("Returning to observation pose...")
    robot.set_cartesian_pose(OBSERVATION_POSE, speed=SPEED, acceleration=ACCEL)

    logger.info("Pick-and-place pipeline completed successfully.")

except Exception as e:
    logger.exception(f"Pipeline failed: {e}")
    # Attempt to halt robot safely on failure
    try:
        if robot is not None and robot.is_connected():
            robot.stop_cartesian_motion(stopping_speed=0.5)
    except Exception as stop_err:
        logger.error(f"Failed to stop robot motion: {stop_err}")
    raise

finally:
    # ---- Hardware cleanup (always runs) ----
    logger.info("Cleaning up hardware...")
    if gripper is not None:
        try:
            gripper.disconnect()
            logger.info("Gripper disconnected.")
        except Exception as e:
            logger.error(f"Error disconnecting gripper: {e}")

    if camera is not None:
        try:
            camera.disconnect()
            camera.delete()
            logger.info("Camera disconnected and deleted.")
        except Exception as e:
            logger.error(f"Error disconnecting camera: {e}")

    if robot is not None:
        try:
            robot.disconnect()
            logger.info("Robot disconnected.")
        except Exception as e:
            logger.error(f"Error disconnecting robot: {e}")