Skip to content

Classify Labels on Conveyor

SUMMARY

Classify package labels as they move down a conveyor. Best for shipping, sorting, and fulfillment QA — replace manual scan stations and keep throughput steady.

Code

INFO

The following code was generated by the Tzara agent.

python
# Pipeline for package label classification
# 3-stage: BiRefNet -> Contour detection -> Grounding DINO
# Visualizes 4 rerun windows and saves annotated video
 
import cv2
import numpy as np
import rerun as rr
from loguru import logger
 
from datatypes import datatypes
from telekinesis import cornea, retina, pupil
 
# ============================================================
# Tunable constants
# ============================================================

# Video file read by the pipeline and the annotated video it writes.
INPUT_VIDEO_PATH = "output/package_label.mp4"
OUTPUT_VIDEO_PATH = "output.mp4"

# Region of interest inside each frame, in pixels: top-left corner and size.
ROI_X = 455
ROI_Y = 207
ROI_W = 511
ROI_H = 356

# Only frames whose index is a multiple of this value are processed.
FRAME_STRIDE = 20

# Passed as `mask_threshold` to the BiRefNet segmentation call.
BIREFNET_MASK_THRESHOLD = 0

# Settings forwarded to the contour detector (area limits and mode names).
CONTOUR_MIN_AREA = 10_000
CONTOUR_MAX_AREA = 10_000_000
CONTOUR_RETRIEVAL_MODE = "external"
CONTOUR_APPROX_METHOD = "simple"

# Grounding DINO text prompt and the two thresholds forwarded with it.
GDINO_PROMPT = "shipping label . barcode . address label . sticker ."
GDINO_BOX_THRESHOLD = 0.25
GDINO_TEXT_THRESHOLD = 0.25

# Classification outcomes: numeric class id plus the RGB color used to draw it.
CLASS_NO_LABEL = 0
CLASS_LABEL_PRESENT = 1
COLOR_NO_LABEL = (255, 0, 0)  # red
COLOR_LABEL_PRESENT = (0, 255, 0)  # green

# Rerun application id and the entity paths each stage logs to.
RR_APP_ID = "package_label_classification"
RR_PATH_INPUT = "input/frame"
RR_PATH_MASK = "stage1_birefnet/mask"
RR_PATH_CONTOUR = "stage2_contour/image"
RR_PATH_CONTOUR_BOX = "stage2_contour/image/bbox"
RR_PATH_FINAL = "stage3_final/image"
RR_PATH_FINAL_BOX = "stage3_final/image/bbox"
 
 
def get_largest_valid_contour_bbox(mask_np: np.ndarray):
    """Return the biggest bounding box among the contours found in a mask.

    Contours come from ``retina.detect_contours`` configured with the
    module-level ``CONTOUR_*`` constants. Candidates are ranked by the area
    of their bounding box (width * height); the first one with the largest
    area wins.

    Args:
        mask_np: Mask image handed to the contour detector.

    Returns:
        ``(x, y, w, h)`` as floats, taken from the winning annotation's
        ``"bbox"`` entry, or ``None`` when detection fails, produces no
        annotations, or no annotation carries a 4-element ``"bbox"``.
    """
    try:
        detections = retina.detect_contours(
            image=mask_np,
            retrieval_mode=CONTOUR_RETRIEVAL_MODE,
            approx_method=CONTOUR_APPROX_METHOD,
            min_area=CONTOUR_MIN_AREA,
            max_area=CONTOUR_MAX_AREA,
        )
    except Exception as e:
        # Best effort: a detector failure is reported and treated as "no box".
        logger.warning(f"Contour detection failed: {e}")
        return None

    if detections is None:
        return None

    try:
        entries = detections.to_list()
    except Exception as e:
        logger.warning(f"Failed to read contour annotations: {e}")
        return None

    if not entries:
        return None

    # Linear scan keeping the candidate with the strictly largest box area.
    winner = None
    winner_area = -1.0
    for entry in entries:
        box = entry.get("bbox", None)
        if box is not None and len(box) == 4:
            left, top, width, height = box
            box_area = float(width) * float(height)
            if box_area > winner_area:
                winner_area = box_area
                winner = (float(left), float(top), float(width), float(height))
    return winner
 
 
def detect_labels_in_crop(crop_bgr: np.ndarray) -> bool:
    """Tell whether Grounding DINO reports any detection inside a crop.

    The crop is converted from BGR to RGB before being sent to the detector
    together with ``GDINO_PROMPT`` and the two ``GDINO_*_THRESHOLD`` values.

    Args:
        crop_bgr: Image crop in BGR channel order.

    Returns:
        ``True`` when the detector returns at least one annotation; ``False``
        when it returns none, returns ``None``, or any step raises.
    """
    try:
        rgb_crop = cv2.cvtColor(crop_bgr, cv2.COLOR_BGR2RGB)
        detections, _categories = retina.detect_objects_using_grounding_dino(
            image=rgb_crop,
            prompt=GDINO_PROMPT,
            box_threshold=GDINO_BOX_THRESHOLD,
            text_threshold=GDINO_TEXT_THRESHOLD,
        )
    except Exception as e:
        # Best effort: a failed detection counts as "no label found".
        logger.warning(f"Grounding DINO failed: {e}")
        return False

    if detections is None:
        return False

    try:
        hits = detections.to_list()
    except Exception as e:
        logger.warning(f"Failed to read GDINO annotations: {e}")
        return False
    else:
        return len(hits) > 0
 
 
def _binarize_mask(mask_np: np.ndarray) -> np.ndarray:
    """Collapse a segmentation mask into a 2-D uint8 image of 0/255 values.

    A pixel becomes 255 when its value is greater than zero; for a 3-D mask,
    when any of its first three channels is greater than zero.

    Args:
        mask_np: 2-D mask, or 3-D mask with channels on the last axis.

    Returns:
        ``uint8`` array with the mask's height and width, holding 0 or 255.
    """
    mask = np.asarray(mask_np)
    if mask.ndim == 3:
        # Channels are collapsed with NumPy rather than
        # cv2.cvtColor(..., COLOR_RGB2GRAY): that conversion raises on
        # single-channel (H, W, 1) input and on dtypes such as int32 or bool,
        # which would abort the whole run. Only the first three channels are
        # inspected so a fourth (alpha) channel is ignored, as it was before.
        foreground = (mask[..., :3] > 0).any(axis=2)
    else:
        foreground = mask > 0
    return foreground.astype(np.uint8) * 255


def _segment_foreground(frame_idx: int, roi_rgb: np.ndarray):
    """Stage 1: run BiRefNet on the ROI and return its binary mask.

    Args:
        frame_idx: Index of the current frame, used only in log messages.
        roi_rgb: ROI crop in RGB channel order.

    Returns:
        2-D ``uint8`` mask (0 or 255), or ``None`` when segmentation fails,
        returns nothing, or the mask cannot be extracted. Each failure is
        logged as a warning.
    """
    logger.info(f"Frame {frame_idx}: Stage 1 - BiRefNet foreground segmentation")
    try:
        pano_ann = cornea.segment_image_using_foreground_birefnet(
            image=roi_rgb,
            mask_threshold=BIREFNET_MASK_THRESHOLD,
        )
    except Exception as e:
        logger.warning(f"Frame {frame_idx}: BiRefNet failed: {e}")
        return None

    if pano_ann is None:
        logger.warning(f"Frame {frame_idx}: BiRefNet returned no annotation, skipping")
        return None

    try:
        labeled_mask = pano_ann.labeled_mask
        if isinstance(labeled_mask, datatypes.Image):
            mask_np = labeled_mask.to_numpy()
        else:
            mask_np = np.asarray(labeled_mask)
        # Binarization stays inside the try so a malformed mask skips this
        # frame instead of terminating the pipeline.
        return _binarize_mask(mask_np)
    except Exception as e:
        logger.warning(f"Frame {frame_idx}: failed to extract BiRefNet mask: {e}")
        return None


def _process_frame(frame_idx: int, frame_bgr: np.ndarray, roi_bounds):
    """Run the three stages on one frame and log every view to rerun.

    Args:
        frame_idx: Index of the frame in the input video.
        frame_bgr: Full frame in BGR channel order, as read by OpenCV.
        roi_bounds: ``(x0, y0, x1, y1)`` ROI corners already clamped to the
            frame size.

    Returns:
        The BGR frame to append to the output video: the annotated frame, or
        the untouched input frame when no contour box is found. ``None``
        when the frame is skipped (empty ROI or a stage-1 failure).
    """
    x0, y0, x1, y1 = roi_bounds
    logger.info(f"Processing frame {frame_idx}")

    # rerun is given RGB images; OpenCV delivers BGR.
    frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)

    # ---- Window 1: input frame ----
    rr.log(RR_PATH_INPUT, rr.Image(frame_rgb))

    # ---- Crop ROI ----
    roi_rgb = frame_rgb[y0:y1, x0:x1].copy()
    if roi_rgb.size == 0:
        logger.warning(f"Frame {frame_idx}: empty ROI, skipping")
        return None

    # ---- Stage 1: BiRefNet ----
    mask_bin = _segment_foreground(frame_idx, roi_rgb)
    if mask_bin is None:
        return None

    # ---- Window 2: BiRefNet mask ----
    rr.log(RR_PATH_MASK, rr.Image(mask_bin))

    # ---- Stage 2: contour detection ----
    logger.info(f"Frame {frame_idx}: Stage 2 - Contour detection")
    bbox_local = get_largest_valid_contour_bbox(mask_bin)

    # ---- Window 3: ROI image (+ package bbox if any) ----
    rr.log(RR_PATH_CONTOUR, rr.Image(roi_rgb))

    if bbox_local is None:
        logger.warning(f"Frame {frame_idx}: no valid contour bbox, skipping to next frame")
        # Clear boxes left over from a previous frame on both box entities.
        rr.log(RR_PATH_CONTOUR_BOX, rr.Clear(recursive=False))
        rr.log(RR_PATH_FINAL, rr.Image(frame_rgb))
        rr.log(RR_PATH_FINAL_BOX, rr.Clear(recursive=False))
        # The unannotated frame is still written to the output video.
        return frame_bgr

    bx, by, bw, bh = bbox_local
    # Clip bbox to the ROI and force integer pixel coordinates (min size 1).
    bx_i = int(max(0, bx))
    by_i = int(max(0, by))
    bw_i = int(max(1, min(roi_rgb.shape[1] - bx_i, bw)))
    bh_i = int(max(1, min(roi_rgb.shape[0] - by_i, bh)))

    # Log stage 2 bbox (centers + half_sizes, in ROI image coords)
    rr.log(
        RR_PATH_CONTOUR_BOX,
        rr.Boxes2D(
            centers=[[bx_i + bw_i / 2.0, by_i + bh_i / 2.0]],
            half_sizes=[[bw_i / 2.0, bh_i / 2.0]],
            colors=[[0, 255, 255]],
            labels=["package"],
        ),
    )

    # ---- Stage 3: Grounding DINO on bbox crop ----
    logger.info(f"Frame {frame_idx}: Stage 3 - Grounding DINO label detection")
    crop_rgb = roi_rgb[by_i:by_i + bh_i, bx_i:bx_i + bw_i].copy()
    if crop_rgb.size == 0:
        logger.warning(f"Frame {frame_idx}: empty bbox crop, treating as no_label")
        has_label = False
    else:
        # detect_labels_in_crop takes a BGR image.
        crop_bgr = cv2.cvtColor(crop_rgb, cv2.COLOR_RGB2BGR)
        has_label = detect_labels_in_crop(crop_bgr)

    if has_label:
        class_id = CLASS_LABEL_PRESENT
        color = COLOR_LABEL_PRESENT
        label_text = "label_present"
    else:
        class_id = CLASS_NO_LABEL
        color = COLOR_NO_LABEL
        label_text = "no_label"

    logger.info(f"Frame {frame_idx}: classification = {label_text} (class {class_id})")

    # Convert bbox to full-frame coords
    fx = x0 + bx_i
    fy = y0 + by_i
    fw = bw_i
    fh = bh_i

    # ---- Window 4: final annotated frame ----
    color_bgr = (color[2], color[1], color[0])  # class colors are stored as RGB
    annotated_bgr = frame_bgr.copy()
    cv2.rectangle(annotated_bgr, (fx, fy), (fx + fw, fy + fh), color_bgr, 3)
    cv2.putText(
        annotated_bgr,
        label_text,
        (fx, max(0, fy - 8)),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.7,
        color_bgr,
        2,
        cv2.LINE_AA,
    )

    annotated_rgb = cv2.cvtColor(annotated_bgr, cv2.COLOR_BGR2RGB)
    rr.log(RR_PATH_FINAL, rr.Image(annotated_rgb))
    rr.log(
        RR_PATH_FINAL_BOX,
        rr.Boxes2D(
            centers=[[fx + fw / 2.0, fy + fh / 2.0]],
            half_sizes=[[fw / 2.0, fh / 2.0]],
            colors=[[color[0], color[1], color[2]]],
            labels=[label_text],
            class_ids=[class_id],
        ),
    )

    return annotated_bgr


def main():
    """Classify every ``FRAME_STRIDE``-th frame of the input video.

    Opens ``INPUT_VIDEO_PATH``, processes the sampled frames through
    BiRefNet segmentation, contour detection and Grounding DINO, logs each
    stage to rerun, and writes the resulting frames to ``OUTPUT_VIDEO_PATH``
    at the input frame rate divided by ``FRAME_STRIDE`` (at least 1 FPS).
    Returns early, after logging an error, when either video cannot be
    opened.
    """
    logger.info("Initializing rerun")
    rr.init(RR_APP_ID, spawn=True)

    logger.info(f"Opening input video: {INPUT_VIDEO_PATH}")
    cap = cv2.VideoCapture(INPUT_VIDEO_PATH)
    if not cap.isOpened():
        logger.error(f"Could not open input video: {INPUT_VIDEO_PATH}")
        return

    # Fall back to 30 FPS when the container reports 0.
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    logger.info(f"Video: {frame_w}x{frame_h} @ {fps:.2f} FPS")

    # Output video writer (write at reduced FPS to match stride)
    out_fps = max(1.0, fps / FRAME_STRIDE)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(OUTPUT_VIDEO_PATH, fourcc, out_fps, (frame_w, frame_h))
    if not writer.isOpened():
        logger.error(f"Could not open output video for writing: {OUTPUT_VIDEO_PATH}")
        cap.release()
        return

    # The ROI clamped to the frame is the same for every frame: compute once.
    roi_bounds = (
        max(0, ROI_X),
        max(0, ROI_Y),
        min(frame_w, ROI_X + ROI_W),
        min(frame_h, ROI_Y + ROI_H),
    )

    # Starts at -1 so the single increment after each successful read yields
    # index 0 for the first frame.
    frame_idx = -1
    processed = 0

    try:
        while True:
            ret, frame_bgr = cap.read()
            if not ret:
                logger.info("End of video reached")
                break

            frame_idx += 1
            if frame_idx % FRAME_STRIDE != 0:
                continue

            output_bgr = _process_frame(frame_idx, frame_bgr, roi_bounds)
            if output_bgr is None:
                # Skipped frames are not written and not counted.
                continue

            writer.write(output_bgr)
            processed += 1

    finally:
        logger.info(f"Processed {processed} frames; releasing resources")
        cap.release()
        writer.release()
        logger.info(f"Output video saved to {OUTPUT_VIDEO_PATH}")
 
 
# Script entry point: run the pipeline only when this file is executed directly.
if __name__ == "__main__":
    main()