CVB Python Multi-Camera Acquisition - Software Trigger

Hi Alex,

I have just prepared an updated version of the test script:

import argparse
import datetime
from enum import IntEnum
import logging
import os
import threading
import time
from typing import List, Dict, Any

import cvb
import cvb.foundation


class Result:
    class MeasurementItem(IntEnum):
        Setup = 0
        Triggering = 1
        Acquisition = 2

    def __init__(self):
        self.images = []
        self.measurement_dict = dict()
        for i in [item.name for item in self.MeasurementItem]:
            self.measurement_dict[i] = []

    def append(self, name: str, entry: Any):
        assert name in [item.name for item in self.MeasurementItem]
        self.measurement_dict[name].append(entry)


class TriggeringThread(threading.Thread):
    def __init__(self, tag: str, device: cvb.Device, node_map,
                 nr_images: int, result: Result,
                 is_trigger_available: bool = True, time_span: float = 0.005,
                 exposure_time = None):
        super().__init__()
        self._tag = tag
        self._time_span = exposure_time * 10e-6 if exposure_time else time_span
        self._device = device
        self._node_map = node_map
        self._is_trigger_available = is_trigger_available
        self._result = result
        self._nr_images = nr_images

    def run(self) -> None:
        logging.info("triggering started: device #{}".format(self._tag))
        count = 0
        base = datetime.datetime.now()
        self._device.stream().start()
        if not self._is_trigger_available:
            pass
        else:

            trigger_software = self._node_map["TriggerSoftware"]  # type: cvb.CommandNode
            logging.info("triggering started: device #{}".format(self._tag))
            while count < nr_images:
                while not trigger_software.is_done:
                    time.sleep(self._time_span)
                trigger_software.execute()
                count += 1
        elapsed_time = (datetime.datetime.now() - base).total_seconds()
        self._result.append(Result.MeasurementItem.Triggering.name, elapsed_time)
        logging.info("triggering completed; dev. #{}: {}".format(self._tag, elapsed_time))
        logging.info("nr buffers pending; dev. #{}: {}".format(
            self._tag,
            int(self._device.stream().statistics[cvb.StreamInfo.NumBuffersPending])))


class AcquisitionThread(threading.Thread):
    def __init__(self, tag: str, device: cvb.Device, nr_images: int,
                            result: Result, time_span_1: int = 5,
                            time_span_2: float = 0.001):
        super().__init__()
        self._tag = tag
        self.time_span_1 = time_span_1
        self._time_span_2 = time_span_2
        self._device = device
        self._result = result
        self._nr_images = nr_images

    def run(self) -> None:
        logging.info("acquisition started: dev. #{}".format(self._tag))
        count = 0
        base = datetime.datetime.now()
        while count < nr_images:
            image, status = self._device.stream().wait_for(self.time_span_1)
            logging.info(
                "wait completed; dev. #{}: status: {}".format(self._tag, status))
            if status == cvb.WaitStatus.Ok:
                logging.info(
                    "acquired an image; dev. #{}: image #{}".format(self._tag, count))
                count += 1
                self._result.images.append(image)
            time.sleep(self._time_span_2)
        elapsed_time = get_time_diff(base)
        self._result.append(Result.MeasurementItem.Acquisition.name, elapsed_time)
        logging.info("acquisition completed; dev. #{}: {} sec for {} images".format(
            self._tag, elapsed_time, nr_images))

        self._device.stream().abort()
        logging.info("streaming stopped: dev. #{}".format(self._tag))


def store_features(node_map_, feature_names_: List[str]) -> Dict[str, str]:
    feature_value_dict = dict()
    for name in feature_names_:
        try:
            logging.info("preserving; node: {}, value: {}".format(
                name, node_map_[name].value))
            feature_value_dict[name] = node_map_[name].value
        except IndexError:
            logging.info("{}: not supported".format(name))
    return feature_value_dict


def restore_features(node_map_, feature_value_dict: Dict[str, str]):
    for name in feature_value_dict.keys():
        try:
            logging.info("restoring; node: {}, value: {}".format(
                name, feature_value_dict[name]))
            node_map_[name].value = feature_value_dict[name]
        except IndexError:
            logging.info("{}: not supported".format(name))


def setup_features(node_map_, feature_value_dict: Dict[str, str]):
    for name in feature_value_dict.keys():
        try:
            logging.info("setting; node: {}, value: {}".format(
                name, feature_value_dict[name]))
            node_map_[name].from_string(feature_value_dict[name])
        except IndexError:
            logging.info("{}: not supported".format(name))


def get_time_diff(base):
    return (datetime.datetime.now() - base).total_seconds()


def create_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--nr-devices", type=int, default=2)
    parser.add_argument("-i", "--nr-images", type=int, default=200)
    parser.add_argument("-m", "--is-mock", action='store_true')
    parser.add_argument("-t", "--time-span", type=int, default=5)
    return parser


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO,
                        format='%(relativeCreated)d: %(module)s: %(levelname)s: %(message)s')

    parser = create_parser()
    args = parser.parse_args()

    nr_devices = args.nr_devices
    logging.info("nr. devices: {}".format(nr_devices))
    is_ext_trigger = False if args.is_mock else True
    nr_images = args.nr_images
    logging.info("nr. images: {}".format(nr_images))
    feature_snapshot_list = []
    devices = []
    node_maps = []
    streams = []
    results = []

    exposure_time = 3343

    for n in range(nr_devices):
        results.append(Result())
        logging.info("setting up device: dev. #{}".format(n))

        base = datetime.datetime.now()
        devices.append(cvb.DeviceFactory.open(
            os.path.join(cvb.install_path(), "drivers", "GenICam.vin"), port=n))
        node_maps.append(devices[n].node_maps["Device"])
        streams.append(devices[n].stream())

        streams[n].ring_buffer.change_count(nr_images, 0)
        logging.info("nr buffers; dev. #{}: {}".format(n, streams[n].ring_buffer.count))

        feature_value_pairs = {
            "TriggerMode": "On", "TriggerSource": "Software"} if \
            is_ext_trigger else {"GevInterfaceSelector": "0"}
        feature_value_pairs["ExposureTime"] = str(exposure_time)

        feature_snapshot_list.append(store_features(node_maps[n], feature_value_pairs.keys()))
        setup_features(node_maps[n], feature_value_pairs)

        elapsed_time = get_time_diff(base)
        logging.info("setup completed; dev. #{}: {} sec".format(
            n, elapsed_time))
        results[n].append(Result.MeasurementItem.Setup.name, elapsed_time)

    threads_list = [
        [TriggeringThread(tag=str(n), device=devices[n], node_map=node_maps[n],
                          result=results[n], nr_images=nr_images,
                          is_trigger_available=is_ext_trigger,
                          exposure_time=exposure_time) for n in
         range(nr_devices)],
        [AcquisitionThread(tag=str(n), device=devices[n],
                           result=results[n], nr_images=nr_images,
                           time_span_1=args.time_span) for n in
         range(nr_devices)]
    ]

    for threads in threads_list:
        for n, t in enumerate(threads):
            logging.info("going to start threading; dev. #{}, t: {}".format(n, t))
            t.start()
            logging.info("started threading; dev. #{}".format(n))

        for n, t in enumerate(threads):
            logging.info("before finishing threading; dev. #{}, t: {}".format(n, t))
            t.join()
            logging.info("threading done; dev. #{}, t: {}".format(n, t))

    if is_ext_trigger:
        for n in range(nr_devices):
            restore_features(node_maps[n], feature_snapshot_list[n])

    """
    for n in range(nr_devices):
        logging.info(
            "images: dev. #{}; {}".format(
                n, [cvb.as_array(image) for image in results[n].images]))
    """

    for d in devices:
        d.close()

    logging.info("{}".format("#### Summary ####"))
    for n in range(nr_devices):
        logging.info("dev. #{}".format(n))
        for item in [i.name for i in Result.MeasurementItem]:
            logging.info("    {}: {}".format(
                item, results[n].measurement_dict[item]))

So far, we are not yet certain if the device is triggered as expected even though we’ve been checking the completion by calling the is_done method.

A possible case where the wait call or the wait_for call slip into the infinite loop: Even though we execute triggering the device certain times but the device drops some of them; it eventually leads to the situation where the images are less than our expectation.

To check the reality, I have added a line that printouts the number of pending buffers that are ready to be retrieved right after the triggering process.

In addition, for a case where the device’s IsDone method is not synced with the real behavior, I have added an intentional sleep that is equal to the exposure time so that we can be sure the device is ready to accept the next trigger. This sleep can be redundant from the performance point of view but the performance should be another subject to be discussed later.

Thank you again for your patience and cooperation.

Hi,

Please see attached logs, I had to add the same fails check so that it didn’t get stuck in the loop.



Thanks,

Alex

Hi Alex,

Thank you for the update. I appreciate that.

nr buffers pending; dev. #0: 10

The line tells us that the number of the pending buffer is fewer than the expectation; we were trying to trigger the device 200 times but the acquired images are just 10.

And then, even though the acquired images are fewer, the program goes out of the triggering block; it means the is_done from the device had kept returning True at least 200 times.

From these observations, we would deduce the is_done from the device is not linked to the imaging sensor controller block; if the device is kind enough, these entities should be linked. In such a case, a possible solution for you would be to blindly wait for a certain amount of time, which is based on the defined timing chart that the device manufacturer provides, until triggering the device.

As a quick experiment, I have prepared an updated version of the script; in the script, we will wait for a duration that is double the exposure time:

import argparse
import datetime
from enum import IntEnum
import logging
import os
import threading
import time
from typing import List, Dict, Any

import cvb
import cvb.foundation


class Result:
    class MeasurementItem(IntEnum):
        Setup = 0
        Triggering = 1
        Acquisition = 2

    def __init__(self):
        self.images = []
        self.measurement_dict = dict()
        for i in [item.name for item in self.MeasurementItem]:
            self.measurement_dict[i] = []

    def append(self, name: str, entry: Any):
        assert name in [item.name for item in self.MeasurementItem]
        self.measurement_dict[name].append(entry)


class TriggeringThread(threading.Thread):
    def __init__(self, tag: str, device: cvb.Device, node_map,
                 nr_images: int, result: Result,
                 is_trigger_available: bool = True, time_span: float = 0.005,
                 exposure_time = None):
        super().__init__()
        self._tag = tag
        self._ratio = 2.0
        self._time_span = exposure_time * 10e-6 * self._ratio if exposure_time else time_span
        self._device = device
        self._node_map = node_map
        self._is_trigger_available = is_trigger_available
        self._result = result
        self._nr_images = nr_images

    def run(self) -> None:
        logging.info("triggering started; dev. #{}".format(self._tag))
        count = 0
        base = datetime.datetime.now()
        self._device.stream().start()
        if not self._is_trigger_available:
            pass
        else:
            trigger_software = self._node_map["TriggerSoftware"]  # type: cvb.CommandNode
            while count < nr_images:
                time.sleep(self._time_span)
                logging.info("TriggerSoftware.is_done; dev. #{}: {}".format(self._tag, trigger_software.is_done))
                trigger_software.execute()
                logging.info("triggered dev. #{}; counter: {}".format(self._tag, count))
                count += 1
        elapsed_time = (datetime.datetime.now() - base).total_seconds()
        self._result.append(Result.MeasurementItem.Triggering.name, elapsed_time)
        logging.info("triggering completed; dev. #{}: {}".format(self._tag, elapsed_time))
        logging.info("nr buffers pending; dev. #{}: {}".format(
            self._tag,
            int(self._device.stream().statistics[cvb.StreamInfo.NumBuffersPending])))


class AcquisitionThread(threading.Thread):
    def __init__(self, tag: str, device: cvb.Device, nr_images: int,
                            result: Result, time_span_1: int = 5,
                            time_span_2: float = 0.001):
        super().__init__()
        self._tag = tag
        self.time_span_1 = time_span_1
        self._time_span_2 = time_span_2
        self._device = device
        self._result = result
        self._nr_images = nr_images

    def run(self) -> None:
        logging.info("acquisition started: dev. #{}".format(self._tag))
        count = 0
        base = datetime.datetime.now()
        while count < nr_images:
            image, status = self._device.stream().wait_for(self.time_span_1)
            logging.info(
                "wait completed; dev. #{}: status: {}".format(self._tag, status))
            if status == cvb.WaitStatus.Ok:
                logging.info(
                    "acquired an image; dev. #{}: image #{}".format(self._tag, count))
                count += 1
                self._result.images.append(image)
            time.sleep(self._time_span_2)
        elapsed_time = get_time_diff(base)
        self._result.append(Result.MeasurementItem.Acquisition.name, elapsed_time)
        logging.info("acquisition completed; dev. #{}: {} sec for {} images".format(
            self._tag, elapsed_time, nr_images))

        self._device.stream().abort()
        logging.info("streaming stopped: dev. #{}".format(self._tag))


def store_features(node_map_, feature_names_: List[str]) -> Dict[str, str]:
    feature_value_dict = dict()
    for name in feature_names_:
        try:
            logging.info("preserving; node: {}, value: {}".format(
                name, node_map_[name].value))
            feature_value_dict[name] = node_map_[name].value
        except IndexError:
            logging.info("{}: not supported".format(name))
    return feature_value_dict


def restore_features(node_map_, feature_value_dict: Dict[str, str]):
    for name in feature_value_dict.keys():
        try:
            logging.info("restoring; node: {}, value: {}".format(
                name, feature_value_dict[name]))
            node_map_[name].value = feature_value_dict[name]
        except IndexError:
            logging.info("{}: not supported".format(name))


def setup_features(node_map_, feature_value_dict: Dict[str, str]):
    for name in feature_value_dict.keys():
        try:
            logging.info("setting; node: {}, value: {}".format(
                name, feature_value_dict[name]))
            node_map_[name].from_string(feature_value_dict[name])
        except IndexError:
            logging.info("{}: not supported".format(name))


def get_time_diff(base):
    return (datetime.datetime.now() - base).total_seconds()


def create_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--nr-devices", type=int, default=2)
    parser.add_argument("-i", "--nr-images", type=int, default=200)
    parser.add_argument("-m", "--is-mock", action='store_true')
    parser.add_argument("-t", "--time-span", type=int, default=5)
    return parser


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO,
                        format='%(relativeCreated)d: %(module)s: %(levelname)s: %(message)s')

    parser = create_parser()
    args = parser.parse_args()

    nr_devices = args.nr_devices
    logging.info("nr. devices: {}".format(nr_devices))
    is_ext_trigger = False if args.is_mock else True
    nr_images = args.nr_images
    logging.info("nr. images: {}".format(nr_images))
    feature_snapshot_list = []
    devices = []
    node_maps = []
    streams = []
    results = []

    exposure_time = 3343

    for n in range(nr_devices):
        results.append(Result())
        logging.info("setting up device: dev. #{}".format(n))

        base = datetime.datetime.now()
        devices.append(cvb.DeviceFactory.open(
            os.path.join(cvb.install_path(), "drivers", "GenICam.vin"), port=n))
        node_maps.append(devices[n].node_maps["Device"])
        streams.append(devices[n].stream())

        streams[n].ring_buffer.change_count(nr_images, 0)
        logging.info("nr buffers; dev. #{}: {}".format(n, streams[n].ring_buffer.count))

        feature_value_pairs = {
            "TriggerMode": "On", "TriggerSource": "Software"} if \
            is_ext_trigger else {"GevInterfaceSelector": "0"}
        feature_value_pairs["ExposureTime"] = str(exposure_time)

        feature_snapshot_list.append(store_features(node_maps[n], feature_value_pairs.keys()))
        setup_features(node_maps[n], feature_value_pairs)

        elapsed_time = get_time_diff(base)
        logging.info("setup completed; dev. #{}: {} sec".format(
            n, elapsed_time))
        results[n].append(Result.MeasurementItem.Setup.name, elapsed_time)

    threads_list = [
        [TriggeringThread(tag=str(n), device=devices[n], node_map=node_maps[n],
                          result=results[n], nr_images=nr_images,
                          is_trigger_available=is_ext_trigger,
                          time_span=exposure_time * 2) for n in
         range(nr_devices)],
        [AcquisitionThread(tag=str(n), device=devices[n],
                           result=results[n], nr_images=nr_images,
                           time_span_1=args.time_span) for n in
         range(nr_devices)]
    ]

    for threads in threads_list:
        for n, t in enumerate(threads):
            logging.info("going to start threading; dev. #{}, t: {}".format(n, t))
            t.start()
            logging.info("started threading; dev. #{}".format(n))

        for n, t in enumerate(threads):
            logging.info("before finishing threading; dev. #{}, t: {}".format(n, t))
            t.join()
            logging.info("threading done; dev. #{}, t: {}".format(n, t))

    if is_ext_trigger:
        for n in range(nr_devices):
            restore_features(node_maps[n], feature_snapshot_list[n])

    """
    for n in range(nr_devices):
        logging.info(
            "images: dev. #{}; {}".format(
                n, [cvb.as_array(image) for image in results[n].images]))
    """

    for d in devices:
        d.close()

    logging.info("{}".format("#### Summary ####"))
    for n in range(nr_devices):
        logging.info("dev. #{}".format(n))
        for item in [i.name for i in Result.MeasurementItem]:
            logging.info("    {}: {}".format(
                item, results[n].measurement_dict[item]))

There could still need to accumulate some more trials but we will need to have a program that gives you 200 as StreamInfo.NumBuffersPending anyway; otherwise, you will get stuck at the image acquisition thread because it expects to get a certain amount of images.

I hope you get a chance to try the proposed script and share the outcome with us to get the right approach for the next step.

Hi,

Logs below. It hangs on this for a while.

Whilst I appreciate the effort here, is it possible to get someone in the Stemmer team to test the example with 2 USB cameras?

Thanks,

Alex

For all reading this and having the same problem, @alexm found a working solution here:

https://forum.commonvisionblox.com/t/cvb-python-multi-camera-acquisition/1781/16

Cheers
Chris

1 Like