Detect Cellphone from Webcam Video
This example demonstrates how to use Tzara, the Telekinesis Physical AI Agent, to build a computer vision pipeline that detects a cellphone from a webcam video feed. Tzara generates code that captures video from the webcam, runs a pre-trained YOLOX model for object detection, and outputs bounding boxes.
The Natural Language Instruction
The natural language instruction for this task is:
Capture a video stream with the webcam, detect the cellphone in every frame, and visualize it.

The Generated Code
The generated code captures video from the webcam, detects cellphones using YOLOX, and draws the bounding boxes and confidence scores for each detected cellphone on every frame, displaying the annotated video in a window.
python
import cv2
import numpy as np
import threading
import queue
from loguru import logger
# Import the Webcam class from medulla
from telekinesis.medulla.cameras.webcam import Webcam
# Import Telekinesis detection and visualization modules
from telekinesis import retina
from datatypes import datatypes
# ─────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────
CAMERA_NAME = "webcam_0"  # logical name passed to the medulla Webcam wrapper
CAMERA_ID = 0  # OpenCV/system device index of the webcam
SCORE_THRESHOLD = 0.35  # minimum detection confidence kept by YOLOX
NMS_THRESHOLD = 0.45  # non-max-suppression IoU threshold for YOLOX
# Lower-cased keywords used to match detector category names to "cellphone".
CELLPHONE_CLASS_NAMES = {"cell phone", "cellphone", "mobile phone", "phone"}
FRAME_QUEUE_MAX = 2 # keep latency low
# ─────────────────────────────────────────────
# Shared state
# ─────────────────────────────────────────────
# Hand-off buffer between the capture thread (producer) and main loop (consumer).
frame_queue: queue.Queue = queue.Queue(maxsize=FRAME_QUEUE_MAX)
# Set by either thread to request a coordinated shutdown.
stop_event = threading.Event()
def capture_thread_fn(webcam: Webcam) -> None:
    """Pump frames from *webcam* into the shared frame queue.

    Runs until ``stop_event`` is set. When the queue is already full, the
    oldest queued frame is discarded first, so the consumer always sees
    near-live data instead of a growing backlog.
    """
    logger.info("Capture thread started.")
    while not stop_event.is_set():
        try:
            frame = webcam.capture_video_color_frame()  # returns RGB ndarray
        except RuntimeError as exc:
            # A capture failure is unrecoverable here: signal shutdown and exit.
            logger.error(f"Capture error: {exc}")
            stop_event.set()
            break
        if frame is None:
            continue
        # Make room by dropping the stalest frame when the queue is full.
        if frame_queue.full():
            try:
                frame_queue.get_nowait()
            except queue.Empty:
                pass  # consumer beat us to it — fine
        frame_queue.put(frame)
    logger.info("Capture thread stopped.")
def draw_detections(
    frame_bgr: np.ndarray,
    annotations: datatypes.ObjectDetectionAnnotations,
    categories: datatypes.Categories,
) -> np.ndarray:
    """
    Draw bounding boxes and labels for detected cellphones on the frame.

    Args:
        frame_bgr: BGR image as numpy array (annotated in place).
        annotations: COCO-style object detection annotations; may be None
            when detection failed for this frame.
        categories: COCO category list returned by the detector; may be None.

    Returns:
        Annotated BGR image (the same array object as ``frame_bgr``).
    """
    # Nothing to draw without annotations — skip the map build entirely.
    if annotations is None or not hasattr(annotations, "annotations"):
        return frame_bgr
    # Build a quick id→name lookup from categories (names lower-cased once,
    # so no further .lower() is needed at lookup time).
    cat_id_to_name: dict[int, str] = {}
    if categories is not None and hasattr(categories, "categories"):
        for cat in categories.categories:
            cat_id_to_name[cat["id"]] = cat["name"].lower()
    for ann in annotations.annotations:
        cat_name = cat_id_to_name.get(ann["category_id"], "")
        # Filter to cellphone detections only.
        if not any(kw in cat_name for kw in CELLPHONE_CLASS_NAMES):
            continue
        # bbox is [x, y, w, h] in COCO XYWH format.
        x, y, w, h = ann["bbox"]
        x1, y1, x2, y2 = int(x), int(y), int(x + w), int(y + h)
        score = ann.get("score", None)
        # Draw bounding box.
        cv2.rectangle(frame_bgr, (x1, y1), (x2, y2), (0, 255, 0), 2)
        # Build label string (plain literal — no interpolation needed).
        label = "cell phone"
        if score is not None:
            label += f" {score:.2f}"
        # Draw label background, clamped so it stays inside the image when
        # the box touches the top edge (previously could use a negative y).
        (tw, th), baseline = cv2.getTextSize(
            label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2
        )
        bg_top = max(0, y1 - th - baseline - 4)
        cv2.rectangle(
            frame_bgr,
            (x1, bg_top),
            (x1 + tw, y1),
            (0, 255, 0),
            -1,
        )
        cv2.putText(
            frame_bgr,
            label,
            (x1, max(th + 2, y1 - baseline - 2)),  # keep baseline on-screen
            cv2.FONT_HERSHEY_SIMPLEX,
            0.6,
            (0, 0, 0),
            2,
            cv2.LINE_AA,
        )
    return frame_bgr
def main() -> None:
    """Run the webcam → YOLOX detection → display pipeline until 'q' is pressed."""
    # ── 1. Initialise and connect the webcam ──────────────────────────────────
    webcam = Webcam(name=CAMERA_NAME, camera_id=CAMERA_ID)
    webcam.connect()
    logger.info("Webcam connected.")

    # ── 2. Start the background capture thread ────────────────────────────────
    cap_thread = threading.Thread(
        target=capture_thread_fn, args=(webcam,), daemon=True
    )
    cap_thread.start()
    logger.info("Starting detection loop. Press 'q' to quit.")

    try:
        while not stop_event.is_set():
            # ── 3. Grab the latest frame; retry if none arrives in time ──────
            try:
                frame_rgb = frame_queue.get(timeout=1.0)
            except queue.Empty:
                continue

            # ── 4. Run cellphone detection via YOLOX (expects RGB input) ─────
            annotations, categories = None, None
            try:
                annotations, categories = retina.detect_objects_using_yolox(
                    image=frame_rgb,
                    score_threshold=SCORE_THRESHOLD,
                    nms_threshold=NMS_THRESHOLD,
                )
            except Exception as exc:
                # Best-effort: show the raw frame when detection fails.
                logger.warning(f"Detection failed for frame: {exc}")

            # ── 5–7. Convert to BGR for OpenCV, annotate, and display ────────
            frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
            frame_bgr = draw_detections(frame_bgr, annotations, categories)
            cv2.imshow("Cellphone Detection", frame_bgr)

            # ── 8. 'q' quits the loop ────────────────────────────────────────
            if cv2.waitKey(1) & 0xFF == ord("q"):
                logger.info("Quit key pressed.")
                stop_event.set()
                break
    finally:
        # ── 9. Clean up regardless of how the loop ended ──────────────────────
        stop_event.set()
        cap_thread.join(timeout=3.0)
        webcam.disconnect()
        cv2.destroyAllWindows()
        logger.info("Pipeline shut down cleanly.")
