Skip to content

Cup Detection — Compare All Models

This example demonstrates how to use Tzara to compare four detection models side by side on a live webcam stream. YOLOX, RF-DETR, Grounding DINO, and Qwen run in parallel on each frame, with results displayed in a 2×2 grid and saved as PNG frames.

Cup detection compared across four models in a 2×2 grid

Four detection models compared side by side: Grounding DINO, YOLOX, RF-DETR, and Qwen

The Natural Language Instructions

Prompt 1: Capture an image with a webcam and detect the cup in the image
Prompt 2: Add visualization using OpenCV
Prompt 3: Also add YOLOX, RF-DETR, and Qwen so that I can compare the results — run on a video stream
Prompt 4: Save the video output
Prompt 5: Save images instead of the video

The Generated Code

python
import datetime
import os
import cv2
import numpy as np
import concurrent.futures
from datatypes import datatypes
from telekinesis.medulla.cameras.webcam import Webcam
from telekinesis import retina

def draw_detections(frame_bgr, detections, categories, color=(0, 255, 0)):
    """Return a copy of *frame_bgr* with boxes and score labels drawn on it.

    detections: dicts with a COCO-style "bbox" [x, y, w, h] plus optional
    "score" and "category_id". categories: dicts with "id" and "name",
    used to resolve the label text (falls back to "cup" for unknown ids).
    """
    out = frame_bgr.copy()
    names_by_id = {cat["id"]: cat["name"] for cat in categories}
    font, scale, thick = cv2.FONT_HERSHEY_SIMPLEX, 0.55, 2
    for detection in detections:
        bx, by, bw, bh = (int(v) for v in detection.get("bbox", [0, 0, 0, 0]))
        conf = detection.get("score", None)
        name = names_by_id.get(detection.get("category_id", None), "cup")
        caption = name if conf is None else f"{name}: {conf:.2f}"
        cv2.rectangle(out, (bx, by), (bx + bw, by + bh), color, 2)
        (text_w, text_h), base = cv2.getTextSize(caption, font, scale, thick)
        # Put the label just above the box, clamped so it stays inside the image.
        text_y = max(by - 5, text_h + 5)
        cv2.rectangle(out, (bx, text_y - text_h - base), (bx + text_w, text_y + base), color, cv2.FILLED)
        cv2.putText(out, caption, (bx, text_y), font, scale, (0, 0, 0), thick)
    return out

def add_title_bar(panel, title, bar_height=28):
    """Stack a black strip with white *title* text on top of *panel* and return it."""
    strip = np.zeros((bar_height, panel.shape[1], 3), dtype=np.uint8)
    cv2.putText(strip, title, (6, bar_height - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.65, (255, 255, 255), 2)
    return np.vstack([strip, panel])

def run_grounding_dino(image):
    """Detect cups with Grounding DINO; return (detections, categories) as plain lists."""
    anns, cats = retina.detect_objects_using_grounding_dino(
        image=image,
        prompt="cup .",
        box_threshold=0.25,
        text_threshold=0.25,
    )
    return anns.to_list(), cats.to_list()

def run_yolox(image):
    """Run YOLOX over its full vocabulary, then keep only cup detections.

    Returns (cup_detections, all_categories) as plain lists.
    """
    anns, cats = retina.detect_objects_using_yolox(
        image=image, score_threshold=0.25, nms_threshold=0.45,
    )
    detections = anns.to_list()
    categories = cats.to_list()
    # Ids of every category whose name contains "cup".
    keep_ids = {cat["id"] for cat in categories if "cup" in cat["name"].lower()}
    cup_only = [det for det in detections if det.get("category_id") in keep_ids]
    return cup_only, categories

def run_rfdetr(image):
    """Run RF-DETR over its full vocabulary, then keep only cup detections.

    Returns (cup_detections, all_categories) as plain lists.
    """
    anns, cats = retina.detect_objects_using_rfdetr(
        image=image, score_threshold=0.5,
    )
    detections = anns.to_list()
    categories = cats.to_list()
    # Ids of every category whose name contains "cup".
    keep_ids = {cat["id"] for cat in categories if "cup" in cat["name"].lower()}
    cup_only = [det for det in detections if det.get("category_id") in keep_ids]
    return cup_only, categories

def run_qwen(image):
    """Detect cups with Qwen.

    Qwen is prompted directly for "cup" and returns no category table,
    so a single synthetic {"id": 0, "name": "cup"} category is supplied.
    """
    anns = retina.detect_objects_using_qwen(image=image, prompt="cup")
    return anns.to_list(), [{"id": 0, "name": "cup"}]

# (display name, detector callable, BGR box colour) — one entry per panel
# in the 2x2 comparison grid, drawn in this order.
DETECTORS = [
    ("Grounding DINO", run_grounding_dino, (0, 255,   0)),
    ("YOLOX",          run_yolox,          (255, 128,  0)),
    ("RF-DETR",        run_rfdetr,         (0, 128, 255)),
    ("Qwen",           run_qwen,           (128,   0, 255)),
]

# --- Main capture/detect/display loop -------------------------------------
# Connects the webcam, then for each frame runs all four detectors in
# parallel threads, tiles the annotated results into a 2x2 grid, saves the
# grid as a PNG, and shows it until 'q' is pressed.
cam = Webcam(name="webcam_0", camera_id=0)
connected = cam.connect()
if not connected:
    raise RuntimeError("Failed to connect to webcam.")

# Timestamped output directory so repeated runs never overwrite each other.
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = f"cup_detections_{timestamp}"
os.makedirs(output_dir, exist_ok=True)
print(f"Saving frames to directory: {output_dir}")

frame_index = 0

print("Starting detection loop — press 'q' in the display window to quit.")

try:
    # One worker per detector so all four models run concurrently on a frame.
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        while True:
            frame_bgr = cam.capture_single_color_frame()
            if frame_bgr is None:
                print("Warning: failed to capture frame, retrying...")
                continue

            # Wrap the raw BGR array once; the same image is shared by all detectors.
            image = datatypes.Image(image=frame_bgr, color_model="BGR")

            # Fan out: submit every detector and remember its name/colour.
            futures = {
                executor.submit(fn, image): (name, color)
                for name, fn, color in DETECTORS
            }

            # Fan in: collect results as they finish; a failing detector
            # contributes an empty panel instead of crashing the loop.
            results = {}
            for future in concurrent.futures.as_completed(futures):
                name, color = futures[future]
                try:
                    dets, cats = future.result()
                except Exception as exc:
                    print(f"[{name}] detection error: {exc}")
                    dets, cats = [], []
                results[name] = (dets, cats, color)

            # Build the four annotated panels in DETECTORS order (stable layout).
            panels = []
            for det_name, _, color in DETECTORS:
                dets, cats, col = results[det_name]
                panel = draw_detections(frame_bgr, dets, cats, color=col)
                panel = add_title_bar(panel, det_name)
                panels.append(panel)

            # Normalize panel sizes (defensive — all panels come from the same
            # frame, so this is usually a no-op) before tiling into the grid.
            ref_h, ref_w = panels[0].shape[:2]
            resized = [cv2.resize(p, (ref_w, ref_h)) if p.shape[0] != ref_h or p.shape[1] != ref_w else p for p in panels]

            top_row    = np.hstack([resized[0], resized[1]])
            bottom_row = np.hstack([resized[2], resized[3]])
            grid = np.vstack([top_row, bottom_row])

            # Save every grid frame as a numbered PNG (frame_00001.png, ...).
            frame_index += 1
            save_path = os.path.join(output_dir, f"frame_{frame_index:05d}.png")
            cv2.imwrite(save_path, grid)
            print(f"Saved: {save_path}")

            cv2.imshow("Cup Detections — 2x2 Grid (press q to quit)", grid)

            # waitKey also services the GUI event loop; 'q' exits.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                print("Quit requested.")
                break

finally:
    # Always release the camera and GUI resources, even on error or Ctrl-C.
    cam.disconnect()
    cv2.destroyAllWindows()
    print("Disconnected webcam and closed windows.")
    print(f"All frames saved to directory: {output_dir}")