CVB Python Multi-Camera Acquisition - Software Trigger

Hi,

I’m looking for some help with acquiring images using CVB Python from multiple cameras when using a software trigger.

Hardware, 2 USB cameras:
Alvium 1800U-158C-CH-C

There is no hardware trigger, and I’d like to keep it this way. I accept that this will lead to mismatches in acquisition timing between the two cameras, but I would still like to get some initial results to assess this.

System software versions:
Windows 11
Common Vision Blox Image Manager v13.04.005
Python 3.9
CVB python 1.4

When set up in GenICam Browser, the cameras achieve 250 FPS. I have saved those settings into user sets for each camera, and saved the cameras as configured devices.

I was sent an example that uses the software trigger and Python threads to acquire from both cameras. The problem is that the triggering phase takes between 1 and 2 seconds, so by the time the images start being acquired I am out of sync with the moment I wanted to capture.

import datetime
import time
import cvb
import os
import cvb.foundation
import sys
import threading
import numpy

x=200
print("set up start")
start_time = datetime.datetime.now()
device_A = cvb.DeviceFactory.open(os.path.join(cvb.install_path(), "drivers", "GenICam.vin"), port=0)
deviceNodeMap_A = device_A.node_maps["Device"]
deviceStream_A = device_A.stream()
ringBufferCount_A = deviceStream_A.ring_buffer.count
deviceStream_A.ring_buffer.change_count(x, 0)
print("Current RingBuffer amount: " + str(ringBufferCount_A))

device_B = cvb.DeviceFactory.open(os.path.join(cvb.install_path(), "drivers", "GenICam.vin"), port=1)
deviceNodeMap_B = device_B.node_maps["Device"]
deviceStream_B = device_B.stream()
ringBufferCount_B = deviceStream_B.ring_buffer.count
deviceStream_B.ring_buffer.change_count(x, 0)
print("Current RingBuffer amount: " + str(ringBufferCount_B))


ringBufferCount_A = deviceStream_A.ring_buffer.count
print("Changed RingBuffer amount to: " + str(ringBufferCount_A))

ringBufferCount_B = deviceStream_B.ring_buffer.count
print("Changed RingBuffer amount to: " + str(ringBufferCount_B))


triggerMode_A = deviceNodeMap_A["TriggerMode"]
oldTriggerMode_A = deviceNodeMap_A["TriggerMode"]
triggerSource_A = deviceNodeMap_A["TriggerSource"]
oldTriggerSource_A = deviceNodeMap_A["TriggerSource"]
triggerSoftware_A = deviceNodeMap_A["TriggerSoftware"]
triggerMode_A.value = "On"
triggerSource_A.value = "Software"

triggerMode_B = deviceNodeMap_B["TriggerMode"]
oldTriggerMode_B = deviceNodeMap_B["TriggerMode"]
triggerSource_B = deviceNodeMap_B["TriggerSource"]
oldTriggerSource_B = deviceNodeMap_B["TriggerSource"]
triggerSoftware_B = deviceNodeMap_B["TriggerSoftware"]
triggerMode_B.value = "On"
triggerSource_B.value = "Software"

print("Starting stream and trigger camera given amount of times")
print("Set up done: {}".format(datetime.datetime.now() - start_time))

a_images = []
b_images = []

a_times = []
b_times = []


def Stream_A():

    global a_times
    global a_images

    ctr=0
    #Start Stream
    device_A.stream().start()
    print("Triggering A")
    start_time = datetime.datetime.now()
    # Collect images to fill buffer
    while (deviceStream_A.statistics[cvb.StreamInfo.NumBuffersPending] < x):
        triggerSoftware_A.execute()
        #print("Currently locked buffers: " + str(deviceStream_A.statistics[cvb.StreamInfo.NumBuffersLocked]))
    a_count = 0
    print("Triggering A done: {}".format(datetime.datetime.now() - start_time))
    start_time = datetime.datetime.now()
    while (deviceStream_A.statistics[cvb.StreamInfo.NumBuffersPending] > 0):
        image_A = device_A.stream().wait()
        np_array = cvb.as_array(image_A[0], copy=True)
        a_image_time = datetime.datetime.now()
        a_images.append(np_array)
        a_times.append(a_image_time)
        # print("A", np_array[100, 100])
        a_count += 1
    a_time = (datetime.datetime.now() - start_time).total_seconds()

    # Stop Stream
    device_A.stream().stop()

    print("A: ", a_count, "images", a_time, "seconds")
    print("A: last image time: ", a_image_time)

    # Reset changed settings of camera
    deviceStream_A.ring_buffer.change_count(3, 0)
    triggerMode_A = oldTriggerMode_A
    triggerSource_A = oldTriggerSource_A


def Stream_B():

    global b_times
    global b_images

    ctr=0
    device_B.stream().start()
    print("Triggering B")
    start_time = datetime.datetime.now()
    while (deviceStream_B.statistics[cvb.StreamInfo.NumBuffersPending] < x):
        triggerSoftware_B.execute()
    # Process through images in buffer
    b_count = 0
    print("Triggering B done: {}".format(datetime.datetime.now() - start_time))
    start_time = datetime.datetime.now()
    while (deviceStream_B.statistics[cvb.StreamInfo.NumBuffersPending] > 0):
        image_B = device_B.stream().wait()
        np_array = cvb.as_array(image_B[0], copy=True)
        # print("B", np_array[100, 100])
        b_image_time = datetime.datetime.now()
        b_images.append(np_array)
        b_times.append(b_image_time)

        b_count += 1
    b_time = (datetime.datetime.now() - start_time).total_seconds()

    # Stop Stream
    device_B.stream().stop()

    print("B:", b_count, "images", b_time, "seconds")
    print("B: last image time: ", b_image_time)

    # Reset changed settings of camera
    deviceStream_B.ring_buffer.change_count(3, 0)
    triggerMode_B = oldTriggerMode_B
    triggerSource_B = oldTriggerSource_B


thread1 = threading.Thread(target=Stream_A)
thread1.start()

thread2 = threading.Thread(target=Stream_B)
thread2.start()


thread1.join()
thread2.join()

print(len(a_images), len(b_images))

In response to this, I tried moving the trigger into the acquisition loop (as below), but then the achieved frame rate is only around 90 FPS.

import datetime
import time
import cvb
import os
import cvb.foundation
import sys
import threading
import numpy

x=200
print("set up start")
start_time = datetime.datetime.now()
device_A = cvb.DeviceFactory.open(os.path.join(cvb.install_path(), "drivers", "GenICam.vin"), port=0)
deviceNodeMap_A = device_A.node_maps["Device"]
deviceStream_A = device_A.stream()
ringBufferCount_A = deviceStream_A.ring_buffer.count
deviceStream_A.ring_buffer.change_count(x, 0)
print("Current RingBuffer amount: " + str(ringBufferCount_A))

device_B = cvb.DeviceFactory.open(os.path.join(cvb.install_path(), "drivers", "GenICam.vin"), port=1)
deviceNodeMap_B = device_B.node_maps["Device"]
deviceStream_B = device_B.stream()
ringBufferCount_B = deviceStream_B.ring_buffer.count
deviceStream_B.ring_buffer.change_count(x, 0)
print("Current RingBuffer amount: " + str(ringBufferCount_B))


ringBufferCount_A = deviceStream_A.ring_buffer.count
print("Changed RingBuffer amount to: " + str(ringBufferCount_A))

ringBufferCount_B = deviceStream_B.ring_buffer.count
print("Changed RingBuffer amount to: " + str(ringBufferCount_B))


triggerMode_A = deviceNodeMap_A["TriggerMode"]
oldTriggerMode_A = deviceNodeMap_A["TriggerMode"]
triggerSource_A = deviceNodeMap_A["TriggerSource"]
oldTriggerSource_A = deviceNodeMap_A["TriggerSource"]
triggerSoftware_A = deviceNodeMap_A["TriggerSoftware"]
triggerMode_A.value = "On"
triggerSource_A.value = "Software"

triggerMode_B = deviceNodeMap_B["TriggerMode"]
oldTriggerMode_B = deviceNodeMap_B["TriggerMode"]
triggerSource_B = deviceNodeMap_B["TriggerSource"]
oldTriggerSource_B = deviceNodeMap_B["TriggerSource"]
triggerSoftware_B = deviceNodeMap_B["TriggerSoftware"]
triggerMode_B.value = "On"
triggerSource_B.value = "Software"

print("Starting stream and trigger camera given amount of times")
print("Set up done: {}".format(datetime.datetime.now() - start_time))

a_images = []
b_images = []

a_times = []
b_times = []


def Stream_A():

    global a_times
    global a_images

    ctr=0
    #Start Stream
    device_A.stream().start()
    print("Triggering A")
    a_count = 0
    start_time = datetime.datetime.now()
    while a_count < x:
        triggerSoftware_A.execute()
        image_A = device_A.stream().wait()
        np_array = cvb.as_array(image_A[0], copy=True)
        a_image_time = datetime.datetime.now()
        a_images.append(np_array)
        a_times.append(a_image_time)
        # print("A", np_array[100, 100])
        a_count += 1
    a_time = (datetime.datetime.now() - start_time).total_seconds()

    # Stop Stream
    device_A.stream().stop()

    print("A: ", a_count, "images", a_time, "seconds")
    print("A: last image time: ", a_image_time)

    # Reset changed settings of camera
    deviceStream_A.ring_buffer.change_count(3, 0)
    triggerMode_A = oldTriggerMode_A
    triggerSource_A = oldTriggerSource_A


def Stream_B():

    global b_times
    global b_images

    ctr=0
    device_B.stream().start()
    print("Triggering B")
    b_count = 0
    start_time = datetime.datetime.now()
    while b_count < x:
        triggerSoftware_B.execute()
        image_B = device_B.stream().wait()
        np_array = cvb.as_array(image_B[0], copy=True)
        # print("B", np_array[100, 100])
        b_image_time = datetime.datetime.now()
        b_images.append(np_array)
        b_times.append(b_image_time)

        b_count += 1
    b_time = (datetime.datetime.now() - start_time).total_seconds()

    # Stop Stream
    device_B.stream().stop()

    print("B:", b_count, "images", b_time, "seconds")
    print("B: last image time: ", b_image_time)

    # Reset changed settings of camera
    deviceStream_B.ring_buffer.change_count(3, 0)
    triggerMode_B = oldTriggerMode_B
    triggerSource_B = oldTriggerSource_B


thread1 = threading.Thread(target=Stream_A)
thread1.start()

thread2 = threading.Thread(target=Stream_B)
thread2.start()


thread1.join()
thread2.join()

print(len(a_images), len(b_images))

I’d like to increase the achieved FPS as close to 250 as possible. Is there something I’m missing with the software trigger? Or can what I want be achieved another way?

Thanks,

Alex

Hi Alex,

Thank you for giving it a try with CVB. May I ask you some questions?

> When setup in GenICam Browser, the cameras are achieving 250 FPS.

Q1. Was the device driven by a software trigger when you observed that frame rate? I guess the answer is no, but please let me know.

> The problem with this is that the triggering takes between 1 to 2 seconds

Q2. Could you tell me how you measured the delay, please?

Q3. I see that you prepared 200 buffers for the measurement. Is that required for your real application?

I’ll be looking forward to hearing from you.

Hi Kazunari,

Thanks for getting back to me.

Q1: Below are the acquisition settings in GenICam Browser, so continuous mode from what I can see:

[Screenshot 2022-08-31 165528: acquisition settings in GenICam Browser]

Q2: The delay was measured as in the code examples, using Python timestamps to determine how long the software-trigger phase took. Forgive my ignorance, but I am a bit confused about why I would send 200 triggers and only then start acquiring. In other camera software I’ve used, I send one trigger and then acquire multiple images after that.

Q3: My real application consists of a PC with two cameras directly connected to it. The PC will receive a command to start image acquisition, which I then want to start through CVB Python within the application I am building. I want to record images at as high a frame rate as possible for 1 to 2 seconds, so will need to acquire around 200 - 400 images from each camera.

If there is another way, then I am open to it. The software trigger example above came from Stemmer support.

Thanks,

Alex

Hi Alex,

Thank you for the reply and you are very welcome.

> Below are the acquisition settings in GenICam Browser, so continuous mode from what I can see

In general, a camera reaches its maximum frame rate when it runs on its own internal trigger. If the trigger comes from outside, the sender needs to know when the camera can accept the trigger for the next sensor exposure, which means the host where the program runs has to manage the timing. We can discuss that further later; it really is a performance-tuning topic.
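
To illustrate what "the host manages the timing" could look like, here is a minimal sketch that issues a trigger at a fixed interval using a monotonic clock. The `fire` callable, the function name `fire_paced`, and the 4 ms period (250 FPS) are assumptions for illustration only; with CVB the callable would presumably be the `execute` method of the `TriggerSoftware` node, and the period the camera can actually accept depends on its exposure and readout time.

```python
import time
from typing import Callable, List


def fire_paced(fire: Callable[[], None], count: int, period_s: float) -> List[float]:
    """Call `fire` `count` times, aiming for one call every `period_s` seconds.

    Returns, for each call, the time in seconds since the start at which it was issued.
    """
    issued = []
    start = time.perf_counter()
    for i in range(count):
        deadline = start + i * period_s
        # Sleep through most of the wait, then busy-wait the last 2 ms,
        # because time.sleep() alone is too coarse for millisecond pacing.
        while True:
            remaining = deadline - time.perf_counter()
            if remaining <= 0:
                break
            if remaining > 0.002:
                time.sleep(remaining - 0.002)
        issued.append(time.perf_counter() - start)
        fire()  # hypothetical stand-in for trigger_software.execute()
    return issued


if __name__ == "__main__":
    fired = []
    stamps = fire_paced(lambda: fired.append(1), count=50, period_s=0.004)
    print(len(fired), "triggers issued over", round(stamps[-1], 3), "s")
```

Scheduling each trigger against `start + i * period_s` rather than sleeping a fixed amount after each call keeps timing errors from accumulating over the burst. On Windows the sleep granularity may be coarser than assumed here, so the achieved spacing should be measured rather than taken for granted.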

> I want to record images at as high a frame rate as possible for 1 to 2 seconds, so will need to acquire around 200 - 400 images from each camera.

Okay, understood. So it is reasonable to have 200 buffers; I’m fine with that. By the way, please note that the copy=True option may be unnecessary depending on the application, and the extra copy costs performance; the cost becomes more noticeable as the amount of image data grows.

np_array = cvb.as_array(image_B[0], copy=True)

The copy parameter is False by default. Please consider leaving it at the default as long as that works for your application.
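
The trade-off between a copy and a non-copying view can be illustrated without a camera. The sketch below uses a plain `bytearray` as a stand-in for a driver-owned buffer; this is an assumption for illustration and not CVB code. A view is free but changes if the underlying buffer is reused, while a copy costs time and memory but stays stable.

```python
# Stand-in for one driver-owned ring buffer slot (hypothetical; not a CVB object).
ring_slot = bytearray([10, 20, 30, 40])

view = memoryview(ring_slot)   # comparable to copy=False: shares memory with the slot
snapshot = bytes(ring_slot)    # comparable to copy=True: independent copy of the data

# Simulate the slot being overwritten by the next frame.
ring_slot[0] = 99

print(view[0])      # 99: the view follows the buffer
print(snapshot[0])  # 10: the copy keeps the old frame
```

Whether a non-copied array stays valid in a real application depends on how long the underlying image buffer is held, so it is worth checking that against the CVB documentation before dropping the copy.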

> The software trigger example above came from Stemmer support.

Please excuse me! :slight_smile: To be honest, I am not always closely in sync with my colleagues. In fact, I just found your post and decided to reply. Anyway, I hope you understand that we try to coordinate among ourselves so that we do not confuse you.

Hi again,

I have just prepared a simplified version of your benchmarking script:

import datetime
import cvb
import os
import cvb.foundation
import threading
import logging
from typing import List, Dict, Any
from enum import IntEnum


class Result:
    class MeasurementItem(IntEnum):
        Setup = 0
        Triggering = 1
        Acquisition = 2

    def __init__(self):
        self.images = []
        self.measurement_dict = dict()
        for i in [item.name for item in self.MeasurementItem]:
            self.measurement_dict[i] = []

    def append(self, name: str, entry: Any):
        assert name in [item.name for item in self.MeasurementItem]
        self.measurement_dict[name].append(entry)


def Stream_(tag: str, device: cvb.Device, node_map, nr_images: int,
            result: Result, is_trigger_available: bool = True) -> None:
    logging.info("streaming started: device #{}".format(tag))
    device.stream().start()

    if not is_trigger_available:
        logging.info("triggering is omitted; device #{}".format(tag))
    else:
        trigger_software = node_map["TriggerSoftware"]
        count = 0
        logging.info("triggering started: device #{}".format(tag))
        base = datetime.datetime.now()
        while count < nr_images:
            trigger_software.execute()
            count += 1
        elapsed_time = (datetime.datetime.now() - base).total_seconds()
        result.append(Result.MeasurementItem.Triggering.name, elapsed_time)
        logging.info("triggering completed: device #{}; {}".format(tag, elapsed_time))

    logging.info("acquisition started: device #{}".format(tag))
    count = 0
    base = datetime.datetime.now()
    while count < nr_images:
        image, _ = device.stream().wait()
        count += 1
        result.images.append(image)
    elapsed_time = get_time_diff(base)
    result.append(Result.MeasurementItem.Acquisition.name, elapsed_time)
    logging.info("acquisition completed: {}; {} sec for {} images".format(
        tag, elapsed_time, nr_images))

    device.stream().stop()
    logging.info("streaming stopped: device #{}".format(tag))


def store_features(node_map_, feature_names_: List[str]) -> Dict[str, str]:
    feature_value_dict = dict()
    for name in feature_names_:
        feature_value_dict[name] = node_map_[name].value
    return feature_value_dict


def restore_features(node_map_, feature_value_dict: Dict[str, str]):
    for name in feature_value_dict.keys():
        node_map_[name].value = feature_value_dict[name]


def setup_features(node_map_, feature_value_dict: Dict[str, str]):
    for name in feature_value_dict.keys():
        node_map_[name] = feature_value_dict[name]


def get_time_diff(base):
    return (datetime.datetime.now() - base).total_seconds()


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    nr_devices = 2
    #is_trigger_available = False
    is_trigger_available = True
    nr_images_ = 200
    feature_snapshot_list = []
    devices = []
    node_maps = []
    streams = []
    results = []

    for n in range(nr_devices):
        results.append(Result())
        logging.info("setting up device: device #{}".format(n))

        base = datetime.datetime.now()
        devices.append(cvb.DeviceFactory.open(
            os.path.join(cvb.install_path(), "drivers", "GenICam.vin"), port=n))
        node_maps.append(devices[n].node_maps["Device"])
        streams.append(devices[n].stream())

        streams[n].ring_buffer.change_count(nr_images_, 0)
        logging.info("nr buffers: device #{}; {}".format(n, streams[n].ring_buffer.count))

        if is_trigger_available:
            feature_names = ["TriggerMode", "TriggerSource"]
            feature_snapshot_list.append(
                store_features(node_maps[n], feature_names))
            feature_value_pairs = {
                "TriggerMode": "On", "TriggerSource": "TriggerSoftware"}
            setup_features(node_maps[n], feature_value_pairs)

        elapsed_time = get_time_diff(base)
        logging.info("set up completed: device #{}; {} sec".format(
            n, elapsed_time))
        results[n].append(Result.MeasurementItem.Setup.name, elapsed_time)

    threads = []
    for n in range(nr_devices):
        threads.append(threading.Thread(
            target=Stream_(tag=str(n), device=devices[n], node_map=node_maps[n],
                           result=results[n], nr_images=nr_images_,
                           is_trigger_available=is_trigger_available)))

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    if is_trigger_available:
        for n in range(nr_devices):
            restore_features(node_maps[n], feature_snapshot_list[n])

    for n in range(nr_devices):
        logging.info(
            "images: device #{}; {}".format(
                n, [cvb.as_array(image) for image in results[n].images]))

    logging.info("{}".format("#### Summary ####"))
    for n in range(nr_devices):
        logging.info("device #{}".format(n))
        for item in [i.name for i in Result.MeasurementItem]:
            logging.info("    {}: {}".format(
                item, results[n].measurement_dict[item]))

Please prepare two devices and run the script. You will see the summary at the bottom of the console output; it shows the time taken to set up each camera and the time taken to acquire the requested number of images.

INFO:root:#### Summary ####
INFO:root:device #0
INFO:root:    Setup: [0.900323]
INFO:root:    Triggering: []
INFO:root:    Acquisition: [12.465828]
INFO:root:device #1
INFO:root:    Setup: [0.881077]
INFO:root:    Triggering: []
INFO:root:    Acquisition: [12.928772]

In the example above, please note that the camera I used does not support external triggers. Hence I just let the camera run on its internal trigger.

When we look at the result, the acquisition takes about 12 sec: roughly 60 ms per image × 200 images. And, as I mentioned, the camera does not support external triggers, so there is no result in the “Triggering” entry.
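
As a quick check of that arithmetic, the per-image cost and the effective frame rate can be derived from the summary values. The helper name `per_image_stats` is made up for this sketch; the numbers are the ones from the summary output.

```python
from typing import Tuple


def per_image_stats(total_seconds: float, nr_images: int) -> Tuple[float, float]:
    """Return (milliseconds per image, frames per second) for one acquisition run."""
    return total_seconds / nr_images * 1000.0, nr_images / total_seconds


if __name__ == "__main__":
    # Acquisition times taken from the summary output.
    acquisition_s = {"device #0": 12.465828, "device #1": 12.928772}
    for name, seconds in acquisition_s.items():
        ms, fps = per_image_stats(seconds, 200)
        print(f"{name}: {ms:.1f} ms per image, {fps:.1f} FPS")
```

That works out to roughly 62 to 65 ms per image, or about 16 FPS, for the mock device used in this run.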

By the way, I should clearly admit that I have not tested with a camera that supports external triggers (including software triggers), because I only have a mock/simulator. However, from our conversation so far, I believe you will be able to fix any issues yourself if needed.

Again, you are always welcome; please feel free to come back anytime.

Hi Kazunari,

Thanks for sharing the example, definitely along the right lines. Unfortunately I am running into issues.

With is_trigger_available = True:

[screenshot not preserved]

And with is_trigger_available = False:

[screenshot “trigger false” not preserved]

Any further thoughts?

Thanks,

Alex

Hi Alex,

here’s an updated version of the script:

import argparse
import datetime
from enum import IntEnum
import logging
import os
import threading
import time
from typing import List, Dict, Any

import cvb
import cvb.foundation


class Result:
    class MeasurementItem(IntEnum):
        Setup = 0
        Triggering = 1
        Acquisition = 2

    def __init__(self):
        self.images = []
        self.measurement_dict = dict()
        for i in [item.name for item in self.MeasurementItem]:
            self.measurement_dict[i] = []

    def append(self, name: str, entry: Any):
        assert name in [item.name for item in self.MeasurementItem]
        self.measurement_dict[name].append(entry)


def worker(tag: str, device: cvb.Device, node_map, nr_images: int,
           result: Result, is_trigger_available: bool = True) -> None:
    logging.info("streaming started: device #{}".format(tag))
    device.stream().start()

    if not is_trigger_available:
        logging.info("triggering is omitted; device #{}".format(tag))
    else:
        trigger_software = node_map["TriggerSoftware"]  # type: cvb.CommandNode
        count = 0
        logging.info("triggering started: device #{}".format(tag))
        base = datetime.datetime.now()
        while count < nr_images:
            trigger_software.execute()
            while not trigger_software.is_done:
                time.sleep(.001)
            count += 1
        elapsed_time = (datetime.datetime.now() - base).total_seconds()
        result.append(Result.MeasurementItem.Triggering.name, elapsed_time)
        logging.info("triggering completed: device #{}; {}".format(tag, elapsed_time))

    logging.info("acquisition started: device #{}".format(tag))
    count = 0
    base = datetime.datetime.now()
    while count < nr_images:
        image, _ = device.stream().wait()
        count += 1
        result.images.append(image)
    elapsed_time = get_time_diff(base)
    result.append(Result.MeasurementItem.Acquisition.name, elapsed_time)
    logging.info("acquisition completed: {}; {} sec for {} images".format(
        tag, elapsed_time, nr_images))

    device.stream().stop()
    logging.info("streaming stopped: device #{}".format(tag))


def store_features(node_map_, feature_names_: List[str]) -> Dict[str, str]:
    feature_value_dict = dict()
    for name in feature_names_:
        feature_value_dict[name] = node_map_[name].value
    return feature_value_dict


def restore_features(node_map_, feature_value_dict: Dict[str, str]):
    for name in feature_value_dict.keys():
        node_map_[name].value = feature_value_dict[name]


def setup_features(node_map_, feature_value_dict: Dict[str, str]):
    for name in feature_value_dict.keys():
        node_map_[name].from_string(feature_value_dict[name])


def get_time_diff(base):
    return (datetime.datetime.now() - base).total_seconds()


def create_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--nr-devices", type=int, default=2)
    parser.add_argument("-i", "--nr-images", type=int, default=200)
    parser.add_argument("-m", "--is-mock", action='store_true')
    return parser


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    parser = create_parser()
    args = parser.parse_args()

    nr_devices = args.nr_devices
    logging.info("nr. devices: {}".format(nr_devices))
    is_ext_trigger = False if args.is_mock else True
    nr_images = args.nr_images
    logging.info("nr. images: {}".format(nr_images))
    feature_snapshot_list = []
    devices = []
    node_maps = []
    streams = []
    results = []

    for n in range(nr_devices):
        results.append(Result())
        logging.info("setting up device: device #{}".format(n))

        base = datetime.datetime.now()
        devices.append(cvb.DeviceFactory.open(
            os.path.join(cvb.install_path(), "drivers", "GenICam.vin"), port=n))
        node_maps.append(devices[n].node_maps["Device"])
        streams.append(devices[n].stream())

        streams[n].ring_buffer.change_count(nr_images, 0)
        logging.info("nr buffers: device #{}; {}".format(n, streams[n].ring_buffer.count))

        feature_value_pairs = {
            "TriggerMode": "On", "TriggerSource": "TriggerSoftware"} if \
            is_ext_trigger else {"GevInterfaceSelector": "0"}

        feature_snapshot_list.append(store_features(node_maps[n], feature_value_pairs.keys()))
        setup_features(node_maps[n], feature_value_pairs)

        elapsed_time = get_time_diff(base)
        logging.info("set up completed: device #{}; {} sec".format(
            n, elapsed_time))
        results[n].append(Result.MeasurementItem.Setup.name, elapsed_time)

    threads = []
    for n in range(nr_devices):
        threads.append(threading.Thread(
            target=worker(tag=str(n), device=devices[n], node_map=node_maps[n],
                          result=results[n], nr_images=nr_images,
                          is_trigger_available=is_ext_trigger)))

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    if is_ext_trigger:
        for n in range(nr_devices):
            restore_features(node_maps[n], feature_snapshot_list[n])

    for n in range(nr_devices):
        logging.info(
            "images: device #{}; {}".format(
                n, [cvb.as_array(image) for image in results[n].images]))

    logging.info("{}".format("#### Summary ####"))
    for n in range(nr_devices):
        logging.info("device #{}".format(n))
        for item in [i.name for i in Result.MeasurementItem]:
            logging.info("    {}: {}".format(
                item, results[n].measurement_dict[item]))

Again, I have not had a chance to debug the code with any device that supports external triggers. I know that’s awkward, but for this reason the script may not run as is. Please excuse me!

Hi,

Thanks again for coming back with a revised script.

I made the following minor change, as this was the line causing the issue. Checking the logging information, I can confirm that the trigger mode is on, and the source is “Software”.

def setup_features(node_map_, feature_value_dict: Dict[str, str]):
    for name in feature_value_dict.keys():
        # node_map_[name].from_string(feature_value_dict[name])
        logging.info(node_map_[name])

This enabled the script to proceed, but then it got stuck and wouldn’t continue. Output below:

[Screenshot 2022-09-05 161743: console output]

Looks similar to the other examples, where the cameras seem to freeze after setup and then not acquire.

Thanks,

Alex

Hi Alex,

Thank you for the feedback. I appreciate that.

> I made the following minor change, as this was the line causing issue. Checking the logging information I can confirm that the trigger mode is on, and the source is “Software”.

Could you show me the traceback you get while the line is enabled, please?

Hi,

Thanks,

Alex

Hi Alex,

Thank you for your patience. The issue came from a wrong value that I typed for TriggerSource: the correct value is Software, not TriggerSoftware.

By the way, I had been blindly following the proposed script and extending it, but I think its design is not what you really want to achieve.

I have prepared a revised version for you (including the fix for the trigger source). The main differences are listed as follows:

  • It introduces the concurrency that your application really needs.
  • It separates the proposed thread worker into the triggering section and the acquiring section.
  • It waits for an image by calling wait_for instead of wait.
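
The split into a triggering thread and an acquiring thread, together with a timed wait, can be sketched without a camera. In this sketch a `queue.Queue` stands in for the stream, `put` stands in for executing the software trigger, and `get(timeout=...)` stands in for `wait_for`. All names and timing values are hypothetical, and the real stream behaves differently in detail.

```python
import queue
import threading
import time
from typing import List


def trigger_loop(frames: "queue.Queue[int]", nr_images: int, period_s: float) -> None:
    """Stand-in for the triggering thread: each 'trigger' yields one frame."""
    for i in range(nr_images):
        frames.put(i)  # hypothetical stand-in for trigger_software.execute()
        time.sleep(period_s)


def acquire_loop(frames: "queue.Queue[int]", nr_images: int, timeout_s: float,
                 received: List[int], timeouts: List[int]) -> None:
    """Stand-in for the acquiring thread: wait with a timeout and retry."""
    while len(received) < nr_images:
        try:
            # Comparable to a timed wait: returns a frame or gives up after timeout_s.
            received.append(frames.get(timeout=timeout_s))
        except queue.Empty:
            timeouts.append(1)  # no frame yet; the loop stays responsive


def run(nr_images: int = 20, period_s: float = 0.002, timeout_s: float = 0.05):
    frames: "queue.Queue[int]" = queue.Queue()
    received: List[int] = []
    timeouts: List[int] = []
    acquirer = threading.Thread(
        target=acquire_loop, args=(frames, nr_images, timeout_s, received, timeouts))
    trigger = threading.Thread(
        target=trigger_loop, args=(frames, nr_images, period_s))
    acquirer.start()  # start waiting before the first trigger is sent
    trigger.start()
    trigger.join()
    acquirer.join()
    return received, len(timeouts)


if __name__ == "__main__":
    frames_received, nr_timeouts = run()
    print(len(frames_received), "frames received,", nr_timeouts, "timeouts")
```

Note that the thread targets are passed as callables with `args=`. Writing `threading.Thread(target=fn(...))` instead would run `fn` in the calling thread before the `Thread` object is even created, so nothing would execute concurrently.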

I hope you get a chance to try running it.

import argparse
import datetime
from enum import IntEnum
import logging
import os
import threading
import time
from typing import List, Dict, Any

import cvb
import cvb.foundation


class Result:
    class MeasurementItem(IntEnum):
        Setup = 0
        Triggering = 1
        Acquisition = 2

    def __init__(self):
        self.images = []
        self.measurement_dict = dict()
        for i in [item.name for item in self.MeasurementItem]:
            self.measurement_dict[i] = []

    def append(self, name: str, entry: Any):
        assert name in [item.name for item in self.MeasurementItem]
        self.measurement_dict[name].append(entry)


class TriggeringThread(threading.Thread):
    def __init__(self, tag: str, device: cvb.Device, node_map,
                                  nr_images: int, result: Result,
                                  is_trigger_available: bool = True,
                                  time_span: float = 0.005):
        super().__init__()
        self._tag = tag
        self._time_span = time_span
        self._device = device
        self._node_map = node_map
        self._is_trigger_available = is_trigger_available
        self._result = result
        self._nr_images = nr_images

    def run(self) -> None:
        logging.info("triggering started: device #{}".format(self._tag))
        count = 0
        base = datetime.datetime.now()
        self._device.stream().start()
        if not self._is_trigger_available:
            pass
        else:
            trigger_software = self._node_map["TriggerSoftware"]  # type: cvb.CommandNode
            logging.info("triggering started: device #{}".format(self._tag))
            while count < self._nr_images:
                while not trigger_software.is_done:
                    time.sleep(self._time_span)
                trigger_software.execute()
                count += 1
        elapsed_time = (datetime.datetime.now() - base).total_seconds()
        self._result.append(Result.MeasurementItem.Triggering.name, elapsed_time)
        logging.info("triggering completed: dev. #{}; {}".format(self._tag, elapsed_time))


class AcquisitionThread(threading.Thread):
    def __init__(self, tag: str, device: cvb.Device, nr_images: int,
                            result: Result, time_span_1: int = 5,
                            time_span_2: float = 0.001):
        super().__init__()
        self._tag = tag
        self.time_span_1 = time_span_1
        self._time_span_2 = time_span_2
        self._device = device
        self._result = result
        self._nr_images = nr_images

    def run(self) -> None:
        logging.info("acquisition started: dev. #{}".format(self._tag))
        count = 0
        base = datetime.datetime.now()
        while count < self._nr_images:
            logging.info("waiting: dev. #{}".format(self._tag))
            image, status = self._device.stream().wait_for(self.time_span_1)
            logging.info(
                "wait completed; device: #{}, status: {}".format(self._tag, status))
            if status == cvb.WaitStatus.Ok:
                count += 1
                self._result.images.append(image)
            time.sleep(self._time_span_2)
        elapsed_time = get_time_diff(base)
        self._result.append(Result.MeasurementItem.Acquisition.name, elapsed_time)
        logging.info("acquisition completed: {}; {} sec for {} images".format(
            self._tag, elapsed_time, self._nr_images))

        self._device.stream().stop()
        logging.info("streaming stopped: dev. #{}".format(self._tag))


def store_features(node_map_, feature_names_: List[str]) -> Dict[str, str]:
    feature_value_dict = dict()
    for name in feature_names_:
        logging.info("preserving; node: {}, value: {}".format(
            name, node_map_[name].value))
        feature_value_dict[name] = node_map_[name].value
    return feature_value_dict


def restore_features(node_map_, feature_value_dict: Dict[str, str]):
    for name in feature_value_dict.keys():
        logging.info("restoring; node: {}, value: {}".format(
            name, feature_value_dict[name]))
        node_map_[name].value = feature_value_dict[name]


def setup_features(node_map_, feature_value_dict: Dict[str, str]):
    for name in feature_value_dict.keys():
        logging.info("setting; node: {}, value: {}".format(
            name, feature_value_dict[name]))
        node_map_[name].from_string(feature_value_dict[name])


def get_time_diff(base):
    return (datetime.datetime.now() - base).total_seconds()


def create_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--nr-devices", type=int, default=2)
    parser.add_argument("-i", "--nr-images", type=int, default=200)
    parser.add_argument("-m", "--is-mock", action='store_true')
    parser.add_argument("-t", "--time-span", type=int, default=5)
    return parser


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    parser = create_parser()
    args = parser.parse_args()

    nr_devices = args.nr_devices
    logging.info("nr. devices: {}".format(nr_devices))
    is_ext_trigger = not args.is_mock
    nr_images = args.nr_images
    logging.info("nr. images: {}".format(nr_images))
    feature_snapshot_list = []
    devices = []
    node_maps = []
    streams = []
    results = []

    for n in range(nr_devices):
        results.append(Result())
        logging.info("setting up device: dev. #{}".format(n))

        base = datetime.datetime.now()
        devices.append(cvb.DeviceFactory.open(
            os.path.join(cvb.install_path(), "drivers", "GenICam.vin"), port=n))
        node_maps.append(devices[n].node_maps["Device"])
        streams.append(devices[n].stream())

        streams[n].ring_buffer.change_count(nr_images, 0)
        logging.info("nr buffers: dev. #{}; {}".format(n, streams[n].ring_buffer.count))

        feature_value_pairs = {
            "TriggerMode": "On", "TriggerSource": "Software"} if \
            is_ext_trigger else {"GevInterfaceSelector": "0"}

        feature_snapshot_list.append(store_features(node_maps[n], feature_value_pairs.keys()))
        setup_features(node_maps[n], feature_value_pairs)

        elapsed_time = get_time_diff(base)
        logging.info("setup completed: dev. #{}; {} sec".format(
            n, elapsed_time))
        results[n].append(Result.MeasurementItem.Setup.name, elapsed_time)

    threads_list = [
        [TriggeringThread(tag=str(n), device=devices[n], node_map=node_maps[n],
                          result=results[n], nr_images=nr_images,
                          is_trigger_available=is_ext_trigger) for n in
         range(nr_devices)],
        [AcquisitionThread(tag=str(n), device=devices[n],
                           result=results[n], nr_images=nr_images,
                           time_span_1=args.time_span) for n in
         range(nr_devices)]
    ]

    for threads in threads_list:
        for n, t in enumerate(threads):
            logging.info("going to start threading; dev. #{}, t: {}".format(n, t))
            t.start()
            logging.info("started threading; dev. #{}".format(n))

        for n, t in enumerate(threads):
            logging.info("before finishing threading; dev. #{}, t: {}".format(n, t))
            t.join()
            logging.info("threading done; dev. #{}, t: {}".format(n, t))

    if is_ext_trigger:
        for n in range(nr_devices):
            restore_features(node_maps[n], feature_snapshot_list[n])

    """
    for n in range(nr_devices):
        logging.info(
            "images: dev. #{}; {}".format(
                n, [cvb.as_array(image) for image in results[n].images]))
    """

    logging.info("#### Summary ####")
    for n in range(nr_devices):
        logging.info("dev. #{}".format(n))
        for item in [i.name for i in Result.MeasurementItem]:
            logging.info("    {}: {}".format(
                item, results[n].measurement_dict[item]))

Hi,

Thanks for this - I am very appreciative of your efforts and trying different approaches to help solve this issue with multi USB camera acquisition without a hardware trigger.

I had to make a couple of small changes. I added a check for failed images to make sure that the loop didn’t acquire indefinitely, and also commented out the logging in the loop to make sure that it wasn’t affecting timings. Code below:

import argparse
import datetime
from enum import IntEnum
import logging
import os
import threading
import time
from typing import List, Dict, Any

import cvb
import cvb.foundation


class Result:
    class MeasurementItem(IntEnum):
        Setup = 0
        Triggering = 1
        Acquisition = 2

    def __init__(self):
        self.images = []
        self.measurement_dict = dict()
        for i in [item.name for item in self.MeasurementItem]:
            self.measurement_dict[i] = []

    def append(self, name: str, entry: Any):
        assert name in [item.name for item in self.MeasurementItem]
        self.measurement_dict[name].append(entry)


class TriggeringThread(threading.Thread):
    def __init__(self, tag: str, device: cvb.Device, node_map,
                                  nr_images: int, result: Result,
                                  is_trigger_available: bool = True,
                                  time_span: float = 0.005):
        super().__init__()
        self._tag = tag
        self._time_span = time_span
        self._device = device
        self._node_map = node_map
        self._is_trigger_available = is_trigger_available
        self._result = result
        self._nr_images = nr_images

    def run(self) -> None:
        logging.info("triggering started: device #{}".format(self._tag))
        count = 0
        base = datetime.datetime.now()
        self._device.stream().start()
        if not self._is_trigger_available:
            pass
        else:
            trigger_software = self._node_map["TriggerSoftware"]  # type: cvb.CommandNode
            logging.info("triggering started: device #{}".format(self._tag))
            while count < self._nr_images:
                while not trigger_software.is_done:
                    time.sleep(self._time_span)
                trigger_software.execute()
                count += 1
        elapsed_time = (datetime.datetime.now() - base).total_seconds()
        self._result.append(Result.MeasurementItem.Triggering.name, elapsed_time)
        logging.info("triggering completed: dev. #{}; {}".format(self._tag, elapsed_time))


class AcquisitionThread(threading.Thread):
    def __init__(self, tag: str, device: cvb.Device, nr_images: int,
                            result: Result, time_span_1: int = 5,
                            time_span_2: float = 0.001):
        super().__init__()
        self._tag = tag
        self.time_span_1 = time_span_1
        self._time_span_2 = time_span_2
        self._device = device
        self._result = result
        self._nr_images = nr_images

    def run(self) -> None:
        logging.info("acquisition started: dev. #{}".format(self._tag))
        count = 0
        fails = 0
        base = datetime.datetime.now()
        while count < nr_images and fails < nr_images:
            # logging.info("waiting: dev. #{}".format(self._tag))
            image, status = self._device.stream().wait_for(self.time_span_1)
            # logging.info(
            #     "wait completed; device: #{}, status: {}".format(self._tag, status))
            if status == cvb.WaitStatus.Ok:
                count += 1
                self._result.images.append(image)
            else:
                fails += 1
            time.sleep(self._time_span_2)
        elapsed_time = get_time_diff(base)
        self._result.append(Result.MeasurementItem.Acquisition.name, elapsed_time)
        logging.info("acquisition completed: {}; {} sec for {} images, {} failures".format(
            self._tag, elapsed_time, count, fails))

        self._device.stream().stop()
        logging.info("streaming stopped: dev. #{}".format(self._tag))


def store_features(node_map_, feature_names_: List[str]) -> Dict[str, str]:
    feature_value_dict = dict()
    for name in feature_names_:
        logging.info("preserving; node: {}, value: {}".format(
            name, node_map_[name].value))
        feature_value_dict[name] = node_map_[name].value
    return feature_value_dict


def restore_features(node_map_, feature_value_dict: Dict[str, str]):
    for name in feature_value_dict.keys():
        logging.info("restoring; node: {}, value: {}".format(
            name, feature_value_dict[name]))
        node_map_[name].value = feature_value_dict[name]


def setup_features(node_map_, feature_value_dict: Dict[str, str]):
    for name in feature_value_dict.keys():
        logging.info("setting; node: {}, value: {}".format(
            name, feature_value_dict[name]))
        node_map_[name].from_string(feature_value_dict[name])


def get_time_diff(base):
    return (datetime.datetime.now() - base).total_seconds()


def create_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--nr-devices", type=int, default=2)
    parser.add_argument("-i", "--nr-images", type=int, default=200)
    parser.add_argument("-m", "--is-mock", action='store_true')
    parser.add_argument("-t", "--time-span", type=int, default=5)
    return parser


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    parser = create_parser()
    args = parser.parse_args()

    nr_devices = args.nr_devices
    logging.info("nr. devices: {}".format(nr_devices))
    is_ext_trigger = not args.is_mock
    nr_images = args.nr_images
    logging.info("nr. images: {}".format(nr_images))
    logging.info("is mock: {}".format(args.is_mock))
    logging.info("time span: {}".format(args.time_span))
    feature_snapshot_list = []
    devices = []
    node_maps = []
    streams = []
    results = []

    for n in range(nr_devices):
        results.append(Result())
        logging.info("setting up device: dev. #{}".format(n))

        base = datetime.datetime.now()
        devices.append(cvb.DeviceFactory.open(
            os.path.join(cvb.install_path(), "drivers", "GenICam.vin"), port=n))
        node_maps.append(devices[n].node_maps["Device"])
        streams.append(devices[n].stream())

        streams[n].ring_buffer.change_count(nr_images, 0)
        logging.info("nr buffers: dev. #{}; {}".format(n, streams[n].ring_buffer.count))

        feature_value_pairs = {
            "TriggerMode": "On", "TriggerSource": "Software"} if \
            is_ext_trigger else {"GevInterfaceSelector": "0"}

        feature_snapshot_list.append(store_features(node_maps[n], feature_value_pairs.keys()))
        setup_features(node_maps[n], feature_value_pairs)

        elapsed_time = get_time_diff(base)
        logging.info("setup completed: dev. #{}; {} sec".format(
            n, elapsed_time))
        results[n].append(Result.MeasurementItem.Setup.name, elapsed_time)

    threads_list = [
        [TriggeringThread(tag=str(n), device=devices[n], node_map=node_maps[n],
                          result=results[n], nr_images=nr_images,
                          is_trigger_available=is_ext_trigger) for n in
         range(nr_devices)],
        [AcquisitionThread(tag=str(n), device=devices[n],
                           result=results[n], nr_images=nr_images,
                           time_span_1=args.time_span) for n in
         range(nr_devices)]
    ]

    for threads in threads_list:
        for n, t in enumerate(threads):
            logging.info("going to start threading; dev. #{}, t: {}".format(n, t))
            t.start()
            logging.info("started threading; dev. #{}".format(n))

        for n, t in enumerate(threads):
            logging.info("before finishing threading; dev. #{}, t: {}".format(n, t))
            t.join()
            logging.info("threading done; dev. #{}, t: {}".format(n, t))

    if is_ext_trigger:
        for n in range(nr_devices):
            restore_features(node_maps[n], feature_snapshot_list[n])

    """
    for n in range(nr_devices):
        logging.info(
            "images: dev. #{}; {}".format(
                n, [cvb.as_array(image) for image in results[n].images]))
    """

    logging.info("#### Summary ####")
    for n in range(nr_devices):
        logging.info("dev. #{}".format(n))
        for item in [i.name for i in Result.MeasurementItem]:
            logging.info("    {}: {}".format(
                item, results[n].measurement_dict[item]))

This gave the results below:

So it seems that the acquisition is around 30 images a second (counting the high number of unsuccessful image acquisitions).

Do you think there is any scope for this to improve? Or are we getting towards the limits of what is achievable with USB cameras connected straight into a PC?

For reference, here is some pseudo-code that I was able to use in another camera software a few years ago (this is what influenced my assumption that the same would be achievable with CVB Python). It looks similar to the MultiStreamHandler, but we’ve previously seen that that seems not to work with the USB camera setup.

cameras = CameraArray(2)
cameras.StartGrabbing()
while cameras.IsGrabbing():
    grab_result_1 = cameras[0].RetrieveResult(timeout_ms)
    grab_result_2 = cameras[1].RetrieveResult(timeout_ms)
    if grab_result_1.GrabSucceeded() and grab_result_2.GrabSucceeded():
        logging.debug("Both grabs succeeded")
        im1 = grab_result_1.GetArray()
        im2 = grab_result_2.GetArray()
        if time_elapsed > 2:
            break
cameras.Close()

Thanks,

Alex

Hi Alex,

Thank you for the update, and thank YOU; I know this kind of session requires your patience. I hope we can support you until your application satisfies the requirements.

Do you think there is any scope for this to improve?

I need to point out that the failures you counted are not real failures. In the originally proposed script, the wait_for method is called periodically with a certain amount of sleep time in between, which means a call may time out if the buffer is not yet ready to be delivered. If you increase the sleep time, every single call will return an image with a success status. Of course, you can call the wait method instead, but it will wait forever, so you need to be sure that an image will actually be delivered.
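To make the distinction concrete, here is a minimal sketch of a polling loop that counts timeouts separately from real failures and gives up on an overall deadline instead of on a failure count. `FakeStream`, the `acquire` helper and the `"ok"`/`"timeout"` strings are hypothetical stand-ins so that the snippet runs without a camera. With CVB you would pass `device.stream()` and compare against `cvb.WaitStatus.Ok` and `cvb.WaitStatus.Timeout`. The timeout is assumed to be in milliseconds.

```python
import time


class FakeStream:
    """Hypothetical stand-in for a CVB stream: delivers an image on every third call."""

    def __init__(self):
        self._calls = 0

    def wait_for(self, timeout_ms):
        self._calls += 1
        if self._calls % 3 == 0:
            return "image-{}".format(self._calls // 3), "ok"
        return None, "timeout"


def acquire(stream, nr_images, timeout_ms=5, deadline_s=10.0,
            ok="ok", timeout="timeout"):
    """Collect nr_images; timeouts are retried, any other status stops the loop."""
    images, timeouts = [], 0
    give_up_at = time.monotonic() + deadline_s
    while len(images) < nr_images and time.monotonic() < give_up_at:
        image, status = stream.wait_for(timeout_ms)
        if status == ok:
            images.append(image)
        elif status == timeout:
            timeouts += 1  # buffer not ready yet; not an acquisition error
        else:
            break  # e.g. an abort; treat this as a real failure
    return images, timeouts


images, timeouts = acquire(FakeStream(), nr_images=4)
print(len(images), "images,", timeouts, "timeouts")
```

The deadline keeps the loop from running indefinitely even when no image ever arrives, which is what the failure counter was added for.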

However, I don’t understand why it takes 7 seconds to acquire 15 images that are ready to be delivered; the images must already be in the allocated buffers by the time we come back from the triggering thread, so I expected the acquisition thread to complete quite quickly.

Could you tell me the camera settings, please? I would like to know the following parameters:

  • TriggerMode; I believe it should be “On” but let’s make sure.
  • ExposureTime; it should give us a chance to evaluate the elapsed time on the triggering thread.

Hi,

TriggerMode is “On”.

Exposure Time is 3343.178 ms.

Thanks,

Alex

Hi Alex,

Thank you for the reply.

Exposure Time is 3343.178 ms.

Do you mean it’s about 3 seconds? No typo?

Sorry, amateur hour typo! I meant to type microseconds, so it is actually about 3 ms.
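As a quick sanity check on what that exposure time implies, here is a small calculation. It looks only at exposure and ignores sensor readout and USB transfer time, so the numbers are a lower bound on duration, not a prediction. The 15 images are the count mentioned earlier in this thread.

```python
exposure_us = 3343.178          # value reported above, in microseconds
exposure_s = exposure_us / 1e6  # roughly 0.0033 s
max_fps = 1.0 / exposure_s      # upper bound set by exposure alone

nr_images = 15                  # image count discussed above
min_duration_s = nr_images * exposure_s

print("exposure-bound frame rate: {:.0f} fps".format(max_fps))
print("exposure time for {} images: {:.3f} s".format(nr_images, min_duration_s))
```

So exposure alone accounts for only a small fraction of a second, which suggests the remaining time is spent elsewhere.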

No problem. Could you replace the following line at line 82:

while count < nr_images and fails < nr_images:

with:

while count < nr_images:

and show me the output, please? To be honest, I do not yet understand why the acquisition thread is taking 7 sec for 15 images. The images should be present in the prepared buffers when we call the wait_for method and the images should be immediately delivered by the wait_for call.

Let’s check that out.
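One thing that may be worth ruling out (an assumption on my side, not something confirmed in this thread): on Windows, time.sleep with a very small argument can take noticeably longer than requested because of the system timer resolution. Both loops in the script sleep between iterations, so this would add up. The sketch below only measures the real duration of a short sleep on your machine; `measure_sleep` is a hypothetical helper name.

```python
import time


def measure_sleep(requested_s, repeats=20):
    """Return the average real duration of time.sleep(requested_s) in seconds."""
    start = time.perf_counter()
    for _ in range(repeats):
        time.sleep(requested_s)
    return (time.perf_counter() - start) / repeats


requested = 0.001  # same value as time_span_2 in AcquisitionThread
actual = measure_sleep(requested)
print("requested {:.1f} ms, measured {:.1f} ms per sleep".format(
    requested * 1e3, actual * 1e3))
```

If the measured value is much larger than 1 ms, the sleep between wait_for calls could be a significant part of the per-image time.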

Hi Kazunari,

When I tried that originally, the thread was within that loop for a long time (I gave up after around 60 seconds), as there weren’t enough successful images being acquired for it to get out of the while statement.

Thanks,

Alex

Hi again,

In the meantime I’ve been persevering with the threaded example in the original post. I have integrated this into my main Python application to capture data.

I’ve noticed that quite often (usually after around 3 or 4 successful recordings), one of the devices will freeze/lock up on the call to stream.wait().

Is there a way of calling this function so that it raises an error after a certain time? At the moment, when this happens I cannot close the stream or the device, and the only solution is to restart my entire application.

Thanks,

Alex

Just in case anyone else bumps into this, it was actually an issue with “device_A.stream().stop()”. This was sometimes freezing. Removing that line fixed the issue. “try_abort” also has a similar problem. In the end I just call “device.close()”.

The other issue regarding the low frame rate still persists.
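For anyone who wants to keep a call such as stop() but not let it block the whole application, a generic Python approach is to run it in a daemon thread and stop waiting after a timeout. This is only a sketch: `call_with_timeout`, `quick_stop` and `frozen_stop` are hypothetical names, and the two stand-in functions just sleep instead of touching a camera. Note that the stuck call is not cancelled, it is merely abandoned, and I have not verified whether closing the device while stop() is still hanging is safe in CVB.

```python
import threading
import time


def call_with_timeout(func, timeout_s):
    """Run func() in a daemon thread; return True if it finished within timeout_s."""
    worker = threading.Thread(target=func, daemon=True)
    worker.start()
    worker.join(timeout_s)
    return not worker.is_alive()


def quick_stop():
    time.sleep(0.01)  # hypothetical stand-in for a stop() that returns normally


def frozen_stop():
    threading.Event().wait(5.0)  # hypothetical stand-in for a stop() that hangs


print(call_with_timeout(quick_stop, timeout_s=1.0))
print(call_with_timeout(frozen_stop, timeout_s=0.1))
```

With a real device, `func` could be something like `lambda: device.stream().stop()`, and a False result would be the cue to fall back to `device.close()`.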