Cup Detection — Compare All Models
This example demonstrates how to use Tzara to compare four detection models side by side on a live webcam stream. YOLOX, RF-DETR, Grounding DINO, and Qwen run in parallel on each frame, with results displayed in a 2×2 grid and saved as PNG frames.
Four detection models compared side by side: Grounding DINO, YOLOX, RF-DETR, and Qwen
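At its core, the pipeline is simply: capture a frame, wrap it as an image, and hand it to a detector. A minimal single-model sketch of that flow, reusing only the Webcam and retina calls that appear in the generated code below (the signatures are taken from that output rather than verified against the library):
python
from datatypes import datatypes
from telekinesis.medulla.cameras.webcam import Webcam
from telekinesis import retina

# Minimal flow: grab one frame, wrap it, run a single detector, print the raw detections.
cam = Webcam(name="webcam_0", camera_id=0)
if not cam.connect():
    raise RuntimeError("Failed to connect to webcam.")
frame_bgr = cam.capture_single_color_frame()
image = datatypes.Image(image=frame_bgr, color_model="BGR")
annotations, categories = retina.detect_objects_using_grounding_dino(
    image=image, prompt="cup .", box_threshold=0.25, text_threshold=0.25,
)
print(annotations.to_list())  # list of dicts with "bbox", "score", "category_id"
cam.disconnect()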
The Natural Language Instructions
Prompt 1: Capture an image with a webcam and detect the cup in the image
Prompt 2: Add visualization using OpenCV
Prompt 3: Also add YOLOX, RF-DETR, and Qwen so that I can compare the results — run on a video stream
Prompt 4: Save the video output
Prompt 5: Save images instead of the video
The Generated Code
python
import datetime
import os
import cv2
import numpy as np
import concurrent.futures
from datatypes import datatypes
from telekinesis.medulla.cameras.webcam import Webcam
from telekinesis import retina
def draw_detections(frame_bgr, detections, categories, color=(0, 255, 0)):
    """Draw labeled boxes (bbox given as [x, y, width, height]) onto a copy of the frame."""
    annotated = frame_bgr.copy()
    cat_id_to_name = {cat["id"]: cat["name"] for cat in categories}
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.55
    thickness = 2
    for det in detections:
        bbox = det.get("bbox", [0, 0, 0, 0])
        x, y, w, h = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])
        score = det.get("score", None)
        cat_id = det.get("category_id", None)
        label = cat_id_to_name.get(cat_id, "cup")
        label_str = f"{label}: {score:.2f}" if score is not None else label
        cv2.rectangle(annotated, (x, y), (x + w, y + h), color, 2)
        # Draw a filled background bar behind the label text for readability.
        (tw, th), baseline = cv2.getTextSize(label_str, font, font_scale, thickness)
        label_y = max(y - 5, th + 5)
        cv2.rectangle(annotated, (x, label_y - th - baseline), (x + tw, label_y + baseline), color, cv2.FILLED)
        cv2.putText(annotated, label_str, (x, label_y), font, font_scale, (0, 0, 0), thickness)
    return annotated
def add_title_bar(panel, title, bar_height=28):
    """Prepend a black title bar with the detector name above a panel."""
    bar = np.zeros((bar_height, panel.shape[1], 3), dtype=np.uint8)
    cv2.putText(bar, title, (6, bar_height - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.65, (255, 255, 255), 2)
    return np.vstack([bar, panel])
def run_grounding_dino(image):
    annotations, categories = retina.detect_objects_using_grounding_dino(
        image=image, prompt="cup .", box_threshold=0.25, text_threshold=0.25,
    )
    return annotations.to_list(), categories.to_list()
def run_yolox(image):
    annotations, categories = retina.detect_objects_using_yolox(
        image=image, score_threshold=0.25, nms_threshold=0.45,
    )
    all_dets = annotations.to_list()
    all_cats = categories.to_list()
    # Keep only detections whose category name contains "cup".
    cup_cat_ids = {cat["id"] for cat in all_cats if "cup" in cat["name"].lower()}
    filtered = [d for d in all_dets if d.get("category_id") in cup_cat_ids]
    return filtered, all_cats

def run_rfdetr(image):
    annotations, categories = retina.detect_objects_using_rfdetr(
        image=image, score_threshold=0.5,
    )
    all_dets = annotations.to_list()
    all_cats = categories.to_list()
    # Same cup-only filtering as for YOLOX.
    cup_cat_ids = {cat["id"] for cat in all_cats if "cup" in cat["name"].lower()}
    filtered = [d for d in all_dets if d.get("category_id") in cup_cat_ids]
    return filtered, all_cats
def run_qwen(image):
    annotations = retina.detect_objects_using_qwen(
        image=image, prompt="cup",
    )
    dets = annotations.to_list()
    # The Qwen call returns only annotations, so supply a single "cup" category for labeling.
    cats = [{"id": 0, "name": "cup"}]
    return dets, cats
# (display name, detector function, BGR box color) for each model being compared.
DETECTORS = [
    ("Grounding DINO", run_grounding_dino, (0, 255, 0)),
    ("YOLOX", run_yolox, (255, 128, 0)),
    ("RF-DETR", run_rfdetr, (0, 128, 255)),
    ("Qwen", run_qwen, (128, 0, 255)),
]
cam = Webcam(name="webcam_0", camera_id=0)
connected = cam.connect()
if not connected:
    raise RuntimeError("Failed to connect to webcam.")
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = f"cup_detections_{timestamp}"
os.makedirs(output_dir, exist_ok=True)
print(f"Saving frames to directory: {output_dir}")
frame_index = 0
print("Starting detection loop — press 'q' in the display window to quit.")
try:
    # One worker per detector so all four models run concurrently on each frame.
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        while True:
            frame_bgr = cam.capture_single_color_frame()
            if frame_bgr is None:
                print("Warning: failed to capture frame, retrying...")
                continue
            image = datatypes.Image(image=frame_bgr, color_model="BGR")
            # Submit the same frame to every detector in parallel.
            futures = {
                executor.submit(fn, image): (name, color)
                for name, fn, color in DETECTORS
            }
            results = {}
            for future in concurrent.futures.as_completed(futures):
                name, color = futures[future]
                try:
                    dets, cats = future.result()
                except Exception as exc:
                    print(f"[{name}] detection error: {exc}")
                    dets, cats = [], []
                results[name] = (dets, cats, color)
            # Build one annotated panel per detector, in DETECTORS order.
            panels = []
            for det_name, _, color in DETECTORS:
                dets, cats, col = results[det_name]
                panel = draw_detections(frame_bgr, dets, cats, color=col)
                panel = add_title_bar(panel, det_name)
                panels.append(panel)
            # Resize panels to a common size, then tile them into a 2x2 grid.
            ref_h, ref_w = panels[0].shape[:2]
            resized = [
                cv2.resize(p, (ref_w, ref_h))
                if p.shape[0] != ref_h or p.shape[1] != ref_w
                else p
                for p in panels
            ]
            top_row = np.hstack([resized[0], resized[1]])
            bottom_row = np.hstack([resized[2], resized[3]])
            grid = np.vstack([top_row, bottom_row])
            frame_index += 1
            save_path = os.path.join(output_dir, f"frame_{frame_index:05d}.png")
            cv2.imwrite(save_path, grid)
            print(f"Saved: {save_path}")
            cv2.imshow("Cup Detections — 2x2 Grid (press q to quit)", grid)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                print("Quit requested.")
                break
finally:
    cam.disconnect()
    cv2.destroyAllWindows()
    print("Disconnected webcam and closed windows.")
print(f"All frames saved to directory: {output_dir}")

