IDS: Advanced BabyROS Usage
SUMMARY
Combine single-image capture and live video streaming in one script using a thin client-side IDS helper class that communicates with the camera server entirely over BabyROS — no direct hardware access required.
The camera server (camera_loop.py) must be running first — see Publish Live Video with BabyROS for setup instructions.
Example
Code
"""
Capture a single image and stream live video over BabyROS.
"""
from typing import Any
import time
from loguru import logger
import rerun as rr
import numpy as np
from babyros import node
class IDS:
"""Helper class for communicating with the IDS camera over BabyROS."""
def __init__(self, name: str, serial_number: str) -> None:
self._name = name
self._serial_number = serial_number
self._base_topic = "medulla/v1/camera/ids/IDS/" + self._name
self._connection_client = node.Client(topic=self._base_topic + "/connect")
self._capture_client = node.Client(topic=f"{self._base_topic}/capture")
self._parameter_client = node.Client(topic=f"{self._base_topic}/parameter")
self._start_video_stream_publisher = node.Publisher(topic=f"{self._base_topic}/start_video_stream")
self._stop_video_stream_publisher = node.Publisher(topic=f"{self._base_topic}/stop_video_stream")
def connect(self) -> dict:
return self._connection_client.request(data={"connect": True})
def disconnect(self) -> dict:
return self._connection_client.request(data={"connect": False})
def capture(self) -> np.ndarray:
image = self._capture_client.request()
if isinstance(image, list) and len(image) > 0:
image = image[0]
if image is None:
raise RuntimeError("No image received from camera server.")
return np.array(image)
def start_video_stream(self) -> None:
self._start_video_stream_publisher.publish(data={})
def stop_video_stream(self) -> None:
self._stop_video_stream_publisher.publish(data={})
def get_parameter(self, name: str) -> Any:
response = self._parameter_client.request(data={"mode": "get", "name": name})
return response[0]["value"]
def set_parameter(self, name: str, value: Any) -> Any:
response = self._parameter_client.request(data={"mode": "set", "name": name, "value": value})
return response[0]["value"]
def delete(self) -> None:
self._connection_client.delete()
self._capture_client.delete()
self._parameter_client.delete()
self._start_video_stream_publisher.delete()
self._stop_video_stream_publisher.delete()
def log_image(data: np.ndarray) -> None:
rr.log("Single_Image_Capture", rr.Image(data))
def log_video(data: np.ndarray) -> None:
rr.log("Continuous_Image_Capture", rr.EncodedImage(contents=data, media_type="image/jpeg"))
def main():
rr.init("IDS_Example", spawn=True)
camera = None
video_subscriber = None
try:
name = "my_ids_camera"
camera = IDS(name=name, serial_number="4108909352")
base_topic = f"medulla/v1/camera/ids/IDS/{name}"
video_subscriber = node.Subscriber(
topic=f"{base_topic}/video",
callback=log_video,
)
logger.debug(f"AcquisitionFrameRate: {camera.get_parameter('AcquisitionFrameRate')}")
logger.debug(f"ExposureTime: {camera.get_parameter('ExposureTime')}")
logger.debug(f"DeviceLinkThroughputLimit: {camera.get_parameter('DeviceLinkThroughputLimit')}")
camera.set_parameter('ExposureTime', 35000.0)
image = camera.capture()
log_image(image)
camera.set_parameter('AcquisitionFrameRate', 12.0)
camera.set_parameter('ExposureTime', 60000.0)
camera.start_video_stream()
time.sleep(5)
camera.stop_video_stream()
except KeyboardInterrupt:
logger.info("Shutting down.")
finally:
if camera is not None:
camera.delete()
if video_subscriber is not None:
video_subscriber.delete()
node.SessionManager.delete()
logger.info("Cleanup complete.")
if __name__ == "__main__":
main()Explanation
Now, let’s break down the code piece by piece.
The client-side IDS helper class wraps every BabyROS node needed to communicate with the camera server. Its constructor builds the base topic path from name — this must match exactly the name given to the server-side IDS instance in camera_loop.py. It then creates a node.Client for each request-response interaction and a node.Publisher for fire-and-forget triggers.
self._base_topic = "medulla/v1/camera/ids/IDS/" + self._name
self._connection_client = node.Client(topic=self._base_topic + "/connect")
self._capture_client = node.Client(topic=f"{self._base_topic}/capture")
self._parameter_client = node.Client(topic=f"{self._base_topic}/parameter")
self._start_video_stream_publisher = node.Publisher(topic=f"{self._base_topic}/start_video_stream")
self._stop_video_stream_publisher = node.Publisher(topic=f"{self._base_topic}/stop_video_stream")get_parameter and set_parameter send a structured request to the /parameter topic, carrying a mode field ("get" or "set") along with the parameter name and, for writes, the value. The response carries the confirmed value back.
def get_parameter(self, name: str) -> Any:
response = self._parameter_client.request(data={"mode": "get", "name": name})
return response[0]["value"]
def set_parameter(self, name: str, value: Any) -> Any:
response = self._parameter_client.request(data={"mode": "set", "name": name, "value": value})
return response[0]["value"]capture calls the /capture server, unwraps the first element of the response list (the server always returns a list), and converts it to a NumPy array. If the server returns nothing, a RuntimeError is raised immediately rather than silently returning an invalid frame.
def capture(self) -> np.ndarray:
image = self._capture_client.request()
if isinstance(image, list) and len(image) > 0:
image = image[0]
if image is None:
raise RuntimeError("No image received from camera server.")
return np.array(image)In main, all three camera parameters are read upfront to confirm the server's current state before any changes are made. Exposure is then set for the single-image capture, the frame is taken and logged. This demonstrates that get_parameter, set_parameter, and capture can be freely interleaved — all mediated by the server over BabyROS.
camera.set_parameter('ExposureTime', 35000.0)
image = camera.capture()
log_image(image)The parameters are updated to different values before starting the video stream for 5 seconds. The video_subscriber is created before acquisition starts to ensure no frames are missed in the window between the start_video_stream call and the first frame arriving.
camera.set_parameter('AcquisitionFrameRate', 12.0)
camera.set_parameter('ExposureTime', 60000.0)
camera.start_video_stream()
time.sleep(5)
camera.stop_video_stream()The finally block deletes the client-side IDS helper, the subscriber, and closes the BabyROS session regardless of how the script exits.
finally:
if camera is not None:
camera.delete()
if video_subscriber is not None:
video_subscriber.delete()
node.SessionManager.delete()Run
Start camera_loop.py in Terminal 1 as described in Publish Live Video with BabyROS, then:
python advanced_example.pyA Rerun viewer opens showing the captured frame under Single_Image_Capture, followed by 5 seconds of video under Continuous_Image_Capture.

