Skip to content

Detect Cellphone from Webcam Video

This example demonstrates how to use Tzara, the Telekinesis Physical AI Agent, to build a computer vision pipeline that detects a cellphone from a webcam video feed. Tzara generates code that captures video from the webcam, runs a pre-trained YOLOX model for object detection, and outputs bounding boxes.

The Natural Language Instruction

The natural language instruction for this task is:

Capture a video stream with the webcam and detect the cellphone in every frame and visualize it

The Generated Code

The generated code captures video from the webcam, detects cellphones using YOLOX, and draws the bounding boxes and confidence scores for each detected cellphone on every frame, displaying the annotated video.

python
import cv2
import numpy as np
import threading
import queue
from loguru import logger

# Import the Webcam class from medulla
from telekinesis.medulla.cameras.webcam import Webcam

# Import Telekinesis detection and visualization modules
from telekinesis import retina
from datatypes import datatypes

# ─────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────
# Logical camera name and device index passed to the Webcam wrapper.
CAMERA_NAME = "webcam_0"
CAMERA_ID = 0
# YOLOX post-processing thresholds: minimum detection confidence and the
# non-maximum-suppression threshold (both forwarded to the detector).
SCORE_THRESHOLD = 0.35
NMS_THRESHOLD = 0.45
# Lower-case category-name keywords that count as a "cellphone" detection.
CELLPHONE_CLASS_NAMES = {"cell phone", "cellphone", "mobile phone", "phone"}
FRAME_QUEUE_MAX = 2  # keep latency low

# ─────────────────────────────────────────────
# Shared state
# ─────────────────────────────────────────────
# Hand-off buffer between the capture thread (producer) and the main
# detection loop (consumer); bounded so stale frames get dropped.
frame_queue: queue.Queue = queue.Queue(maxsize=FRAME_QUEUE_MAX)
# Signals shutdown to all threads (set on capture error or 'q' keypress).
stop_event = threading.Event()


def capture_thread_fn(webcam: Webcam) -> None:
    """
    Continuously capture frames from the webcam and push them into the queue.

    Drops the oldest queued frame when the queue is full so the consumer
    always sees the freshest data. Sets the module-level ``stop_event`` and
    exits on a capture error.

    Args:
        webcam: A connected Webcam instance to read frames from.
    """
    logger.info("Capture thread started.")
    while not stop_event.is_set():
        try:
            frame = webcam.capture_video_color_frame()  # returns RGB ndarray
            if frame is None:
                continue
            # Drop the oldest frame if the queue is full to keep latency low.
            if frame_queue.full():
                try:
                    frame_queue.get_nowait()
                except queue.Empty:
                    pass
            # put_nowait instead of a blocking put(): put() could hang this
            # thread forever if the consumer stops draining the queue. If the
            # queue is somehow full again, drop this frame instead.
            try:
                frame_queue.put_nowait(frame)
            except queue.Full:
                pass
        except RuntimeError as exc:
            logger.error(f"Capture error: {exc}")
            stop_event.set()
            break
    logger.info("Capture thread stopped.")


def draw_detections(
    frame_bgr: np.ndarray,
    annotations: datatypes.ObjectDetectionAnnotations,
    categories: datatypes.Categories,
) -> np.ndarray:
    """
    Draw bounding boxes and labels for detected cellphones on the frame.

    Args:
        frame_bgr: BGR image as a numpy array (annotated in place).
        annotations: COCO-style object detection annotations.
        categories: COCO category list returned by the detector.

    Returns:
        Annotated BGR image (the same array object as ``frame_bgr``).
    """
    # Build a quick id→name lookup from categories; names are stored
    # lowercased once here so no per-annotation lowering is needed.
    cat_id_to_name: dict[int, str] = {}
    if categories is not None and hasattr(categories, "categories"):
        for cat in categories.categories:
            cat_id_to_name[cat["id"]] = cat["name"].lower()

    if annotations is None or not hasattr(annotations, "annotations"):
        return frame_bgr

    for ann in annotations.annotations:
        cat_name = cat_id_to_name.get(ann["category_id"], "")
        # Filter to cellphone detections only.
        if not any(kw in cat_name for kw in CELLPHONE_CLASS_NAMES):
            continue

        # bbox is [x, y, w, h] in COCO XYWH format.
        x, y, w, h = ann["bbox"]
        x1, y1, x2, y2 = int(x), int(y), int(x + w), int(y + h)
        score = ann.get("score", None)

        # Draw bounding box.
        cv2.rectangle(frame_bgr, (x1, y1), (x2, y2), (0, 255, 0), 2)

        # Use the detector's actual class name (e.g. "cell phone",
        # "mobile phone") rather than a hard-coded label; fall back to
        # a generic label if the name is somehow empty.
        label = cat_name or "cell phone"
        if score is not None:
            label += f" {score:.2f}"

        # Draw label background (filled box) above the bounding box.
        (tw, th), baseline = cv2.getTextSize(
            label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2
        )
        cv2.rectangle(
            frame_bgr,
            (x1, y1 - th - baseline - 4),
            (x1 + tw, y1),
            (0, 255, 0),
            -1,
        )
        cv2.putText(
            frame_bgr,
            label,
            (x1, y1 - baseline - 2),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.6,
            (0, 0, 0),
            2,
            cv2.LINE_AA,
        )

    return frame_bgr


def main() -> None:
    """Run the webcam → YOLOX cellphone-detection → display pipeline."""
    # 1. Initialise and connect the webcam.
    webcam = Webcam(name=CAMERA_NAME, camera_id=CAMERA_ID)
    webcam.connect()
    logger.info("Webcam connected.")

    # 2. Start the background capture thread (daemon so it never blocks exit).
    grabber = threading.Thread(target=capture_thread_fn, args=(webcam,), daemon=True)
    grabber.start()

    logger.info("Starting detection loop. Press 'q' to quit.")

    try:
        while not stop_event.is_set():
            # 3. Wait briefly for the freshest frame; the timeout keeps the
            #    loop responsive to stop_event when no frames arrive.
            try:
                rgb = frame_queue.get(timeout=1.0)
            except queue.Empty:
                continue

            # 4. Run cellphone detection via YOLOX.
            #    retina.detect_objects_using_yolox expects an RGB image;
            #    on failure fall back to drawing nothing on this frame.
            dets, cats = None, None
            try:
                dets, cats = retina.detect_objects_using_yolox(
                    image=rgb,
                    score_threshold=SCORE_THRESHOLD,
                    nms_threshold=NMS_THRESHOLD,
                )
            except Exception as exc:
                logger.warning(f"Detection failed for frame: {exc}")

            # 5-6. Convert RGB → BGR for OpenCV and overlay the detections.
            annotated = draw_detections(
                cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR), dets, cats
            )

            # 7. Show the annotated frame.
            cv2.imshow("Cellphone Detection", annotated)

            # 8. Stop when the user presses 'q'.
            if cv2.waitKey(1) & 0xFF == ord("q"):
                logger.info("Quit key pressed.")
                stop_event.set()
                break

    finally:
        # 9. Always clean up, even if the loop raised.
        stop_event.set()
        grabber.join(timeout=3.0)
        webcam.disconnect()
        cv2.destroyAllWindows()
        logger.info("Pipeline shut down cleanly.")