Skip to content

Publish Video with BabyROS

SUMMARY

Publish Video with BabyROS streams live video using BabyROS as the communication layer. Two scripts share responsibility: a camera server that owns the hardware connection, and a client that controls streaming remotely via BabyROS topics. Optionally, an advanced client wraps capture, streaming, and parameter access behind a small helper class so the rest of your application talks to the camera entirely over BabyROS.

Available on: IDS, Webcam.

The Skill

The camera class registers BabyROS topics under a base path derived from its name:

medulla/v1/camera/<vendor>/<ClassName>/<name>

Clients control streaming by publishing to the start_video_stream and stop_video_stream topics, and receive frames by subscribing to video. Advanced clients can also call connect, capture, and parameter topics for full remote control.

Example

IDS

Webcam

The camera server (camera_loop.py) runs in Terminal 1 — it connects to the hardware and keeps BabyROS nodes alive. The client (publish_video_example.py) runs in Terminal 2 — it publishes a start signal, subscribes to the video topic to receive and log frames to Rerun, then publishes a stop signal after 10 seconds.

The Code

Camera Server (Terminal 1)

python
"""
Camera loop for the IDS camera.
"""
import time

from loguru import logger

from babyros import node
from telekinesis.medulla.cameras import ids


def main():
    camera = None
    try:
        camera = ids.IDS(name="my_ids_camera", serial_number="4108909352")
        camera.connect()
        logger.info("Camera Server is running... Press Ctrl+C to stop.")
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Shutting down server...")
    finally:
        if camera is not None:
            camera.disconnect()
            node.SessionManager.delete()
        logger.info("Completed cleanup.")


if __name__ == "__main__":
    main()
python
"""
Camera loop for the Webcam.
"""
import time

from loguru import logger

from babyros import node
from telekinesis.medulla.cameras import webcam


def main():
    camera = None
    try:
        camera = webcam.Webcam(name="my_webcam", camera_id=0)
        camera.connect()
        logger.info("Camera Server is running... Press Ctrl+C to stop.")
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Shutting down server...")
    finally:
        if camera is not None:
            camera.disconnect()
            node.SessionManager.delete()
        logger.info("Completed cleanup.")


if __name__ == "__main__":
    main()

BabyROS Client (Terminal 2)

python
"""
Stream live video from an IDS camera over BabyROS.
"""
import time

from loguru import logger
import rerun as rr
import numpy as np

from babyros import node


def log_video(data: np.ndarray) -> None:
    rr.log(
        "Continuous_Image_Capture",
        rr.EncodedImage(contents=data, media_type="image/jpeg"),
    )


def main():
    rr.init("IDS_Example", spawn=True)

    name = "my_ids_camera"
    base_topic = f"medulla/v1/camera/ids/IDS/{name}"

    start_video_stream_publisher = node.Publisher(topic=f"{base_topic}/start_video_stream")
    stop_video_stream_publisher = node.Publisher(topic=f"{base_topic}/stop_video_stream")
    video_subscriber = node.Subscriber(topic=f"{base_topic}/video", callback=log_video)

    try:
        start_video_stream_publisher.publish(data={})
        time.sleep(10)
        stop_video_stream_publisher.publish(data={})
    except KeyboardInterrupt:
        logger.info("Shutting down.")
    finally:
        start_video_stream_publisher.delete()
        stop_video_stream_publisher.delete()
        video_subscriber.delete()
        node.SessionManager.delete()
        logger.info("Completed cleanup.")


if __name__ == "__main__":
    main()
python
"""
Stream live video from a webcam over BabyROS.
"""
import time

from loguru import logger
import rerun as rr
import numpy as np

from babyros import node


def log_video(image: np.ndarray) -> None:
    rr.log("Continuous_Image_Capture", rr.Image(image))


def main():
    rr.init("Webcam_Example", spawn=True)

    name = "my_webcam"
    base_topic = f"medulla/v1/camera/webcam/{name}"

    start_video_stream_publisher = node.Publisher(topic=f"{base_topic}/start_video_stream")
    stop_video_stream_publisher = node.Publisher(topic=f"{base_topic}/stop_video_stream")
    video_subscriber = node.Subscriber(topic=f"{base_topic}/video", callback=log_video)

    try:
        start_video_stream_publisher.publish(data={})
        time.sleep(10)
        stop_video_stream_publisher.publish(data={})
    except KeyboardInterrupt:
        logger.info("Shutting down.")
    finally:
        start_video_stream_publisher.delete()
        stop_video_stream_publisher.delete()
        video_subscriber.delete()
        node.SessionManager.delete()
        logger.info("Completed cleanup.")


if __name__ == "__main__":
    main()

The Explanation of the Code

Camera Server (camera_loop.py)

The server creates a camera instance and connects to the hardware. After connecting, it enters an infinite sleep loop — this does nothing except keep the process, and the BabyROS nodes registered inside the camera instance, alive and reachable by remote clients.

python
camera.connect()
while True:
    time.sleep(1)

On Ctrl+C, the finally block disconnects the camera and calls node.SessionManager.delete() to cleanly tear down the BabyROS session.

python
finally:
    if camera is not None:
        camera.disconnect()
        node.SessionManager.delete()

BabyROS Client (publish_video_example.py)

The client never accesses the camera hardware directly. Instead it constructs three BabyROS nodes whose topic paths mirror those registered by the camera instance in the server process. The name variable must exactly match the name passed to the camera in the server, because both sides derive their topic paths from it.

python
name = "my_ids_camera"
base_topic = f"medulla/v1/camera/ids/IDS/{name}"

start_video_stream_publisher = node.Publisher(topic=f"{base_topic}/start_video_stream")
stop_video_stream_publisher = node.Publisher(topic=f"{base_topic}/stop_video_stream")
video_subscriber = node.Subscriber(topic=f"{base_topic}/video", callback=log_video)

Publishing an empty payload to start_video_stream signals the server to begin continuous acquisition. The server pushes each incoming frame onto the video topic — JPEG-encoded for IDS, raw NumPy array for Webcam. Each frame arrives in the log_video callback and is forwarded to Rerun (rr.EncodedImage for the JPEG bytes, rr.Image for the raw array). After 10 seconds, publishing to stop_video_stream halts acquisition on the server side.

python
start_video_stream_publisher.publish(data={})
time.sleep(10)
stop_video_stream_publisher.publish(data={})

The finally block deletes all three nodes and closes the BabyROS session regardless of how the script exits.

python
finally:
    start_video_stream_publisher.delete()
    stop_video_stream_publisher.delete()
    video_subscriber.delete()
    node.SessionManager.delete()

Running the Example

Start the server first, then the client. Both must run from a terminal.

Terminal 1:

bash
python camera_loop.py

Wait for Camera Server is running... Press Ctrl+C to stop., then:

Terminal 2:

bash
python publish_video_example.py

Frames stream under Continuous_Image_Capture for 10 seconds. Press Ctrl+C in Terminal 1 to shut down the server.


Advanced: Remote Camera Control with a Client Helper

The plain client only starts and stops streaming. To call connect, capture, get_parameter, and set_parameter remotely as well, wrap the BabyROS nodes in a small client-side helper class. The example below targets the IDS server but the same pattern applies to any camera that registers connect, capture, and parameter topics.

The camera server (camera_loop.py) must be running first — see the Camera Server section above.

python
"""
Capture a single image and stream live video over BabyROS via a client helper.
"""
from typing import Any
import time

from loguru import logger
import rerun as rr
import numpy as np

from babyros import node


class IDS:
    """Helper class for communicating with the IDS camera over BabyROS."""

    def __init__(self, name: str, serial_number: str) -> None:
        self._name = name
        self._serial_number = serial_number
        self._base_topic = f"medulla/v1/camera/ids/IDS/{self._name}"

        self._connection_client = node.Client(topic=f"{self._base_topic}/connect")
        self._capture_client = node.Client(topic=f"{self._base_topic}/capture")
        self._parameter_client = node.Client(topic=f"{self._base_topic}/parameter")
        self._start_video_stream_publisher = node.Publisher(topic=f"{self._base_topic}/start_video_stream")
        self._stop_video_stream_publisher = node.Publisher(topic=f"{self._base_topic}/stop_video_stream")

    def connect(self) -> dict:
        return self._connection_client.request(data={"connect": True})

    def disconnect(self) -> dict:
        return self._connection_client.request(data={"connect": False})

    def capture(self) -> np.ndarray:
        image = self._capture_client.request()
        if isinstance(image, list) and len(image) > 0:
            image = image[0]
        if image is None:
            raise RuntimeError("No image received from camera server.")
        return np.array(image)

    def start_video_stream(self) -> None:
        self._start_video_stream_publisher.publish(data={})

    def stop_video_stream(self) -> None:
        self._stop_video_stream_publisher.publish(data={})

    def get_parameter(self, name: str) -> Any:
        response = self._parameter_client.request(data={"mode": "get", "name": name})
        return response[0]["value"]

    def set_parameter(self, name: str, value: Any) -> Any:
        response = self._parameter_client.request(data={"mode": "set", "name": name, "value": value})
        return response[0]["value"]

    def delete(self) -> None:
        self._connection_client.delete()
        self._capture_client.delete()
        self._parameter_client.delete()
        self._start_video_stream_publisher.delete()
        self._stop_video_stream_publisher.delete()


def log_image(data: np.ndarray) -> None:
    rr.log("Single_Image_Capture", rr.Image(data))


def log_video(data: np.ndarray) -> None:
    rr.log("Continuous_Image_Capture", rr.EncodedImage(contents=data, media_type="image/jpeg"))


def main():
    rr.init("IDS_Example", spawn=True)

    camera = None
    video_subscriber = None
    try:
        name = "my_ids_camera"
        camera = IDS(name=name, serial_number="4108909352")
        base_topic = f"medulla/v1/camera/ids/IDS/{name}"

        video_subscriber = node.Subscriber(topic=f"{base_topic}/video", callback=log_video)

        logger.debug(f"AcquisitionFrameRate: {camera.get_parameter('AcquisitionFrameRate')}")
        logger.debug(f"ExposureTime: {camera.get_parameter('ExposureTime')}")
        logger.debug(f"DeviceLinkThroughputLimit: {camera.get_parameter('DeviceLinkThroughputLimit')}")

        camera.set_parameter('ExposureTime', 35000.0)
        image = camera.capture()
        log_image(image)

        camera.set_parameter('AcquisitionFrameRate', 12.0)
        camera.set_parameter('ExposureTime', 60000.0)
        camera.start_video_stream()
        time.sleep(5)
        camera.stop_video_stream()
    except KeyboardInterrupt:
        logger.info("Shutting down.")
    finally:
        if camera is not None:
            camera.delete()
        if video_subscriber is not None:
            video_subscriber.delete()
        node.SessionManager.delete()
        logger.info("Cleanup complete.")


if __name__ == "__main__":
    main()

The constructor builds the base topic path from name — this must match exactly the name given to the server-side camera instance in camera_loop.py. It then creates a node.Client for each request-response interaction (/connect, /capture, /parameter) and a node.Publisher for the fire-and-forget start_video_stream / stop_video_stream triggers.

get_parameter and set_parameter send a structured request to the /parameter topic, carrying a mode field ("get" or "set") along with the parameter name and, for writes, the value. The response carries the confirmed value back. capture calls the /capture server, unwraps the first element of the response list (the server always returns a list), and converts it to a NumPy array. If the server returns nothing, a RuntimeError is raised immediately rather than silently returning an invalid frame.

get_parameter, set_parameter, and capture can be freely interleaved with streaming — all mediated by the server over BabyROS. The video_subscriber is created before acquisition starts to ensure no frames are missed in the window between start_video_stream and the first frame arriving.

Run

Start camera_loop.py in Terminal 1 first, then:

bash
python advanced_example.py

A Rerun viewer opens showing the captured frame under Single_Image_Capture, followed by 5 seconds of video under Continuous_Image_Capture.

Where to Use the Skill

  • Distributed pipelines — keep the camera process pinned to the machine with USB access while running vision workloads on a different machine.
  • Multi-consumer streaming — multiple clients can subscribe to the same video topic without duplicating hardware connections.
  • Remote tuning — adjust exposure or frame rate from a control script without restarting the camera server.