CVB Python Multi-Camera Acquisition - Software Trigger

Hi Alex,

Thank you for your patience. The issue was from the wrong value that I typed; the correct value must be Software.

By the way, I had been blindly following the proposed script and had extended it but I think the design is not the one you really want to achieve.

I have prepared a revised version for you (including the fix for the trigger source). The main differences are listed as follows:

  • It introduces the concurrency that your application really needs.
  • It separates the proposed thread worker into the triggering section and the acquiring section.
  • It waits for an image by calling wait_for instead of wait.

I hope you get a chance to try it running.

import argparse
import datetime
from enum import IntEnum
import logging
import os
import threading
import time
from typing import List, Dict, Any

import cvb
import cvb.foundation


class Result:
    class MeasurementItem(IntEnum):
        Setup = 0
        Triggering = 1
        Acquisition = 2

    def __init__(self):
        self.images = []
        self.measurement_dict = dict()
        for i in [item.name for item in self.MeasurementItem]:
            self.measurement_dict[i] = []

    def append(self, name: str, entry: Any):
        assert name in [item.name for item in self.MeasurementItem]
        self.measurement_dict[name].append(entry)


class TriggeringThread(threading.Thread):
    def __init__(self, tag: str, device: cvb.Device, node_map,
                                  nr_images: int, result: Result,
                                  is_trigger_available: bool = True,
                                  time_span: float = 0.005):
        super().__init__()
        self._tag = tag
        self._time_span = time_span
        self._device = device
        self._node_map = node_map
        self._is_trigger_available = is_trigger_available
        self._result = result
        self._nr_images = nr_images

    def run(self) -> None:
        logging.info("triggering started: device #{}".format(self._tag))
        count = 0
        base = datetime.datetime.now()
        self._device.stream().start()
        if not self._is_trigger_available:
            pass
        else:
            trigger_software = self._node_map["TriggerSoftware"]  # type: cvb.CommandNode
            logging.info("triggering started: device #{}".format(self._tag))
            while count < nr_images:
                while not trigger_software.is_done:
                    time.sleep(self._time_span)
                trigger_software.execute()
                count += 1
        elapsed_time = (datetime.datetime.now() - base).total_seconds()
        self._result.append(Result.MeasurementItem.Triggering.name, elapsed_time)
        logging.info("triggering completed: dev. #{}; {}".format(self._tag, elapsed_time))


class AcquisitionThread(threading.Thread):
    def __init__(self, tag: str, device: cvb.Device, nr_images: int,
                            result: Result, time_span_1: int = 5,
                            time_span_2: float = 0.001):
        super().__init__()
        self._tag = tag
        self.time_span_1 = time_span_1
        self._time_span_2 = time_span_2
        self._device = device
        self._result = result
        self._nr_images = nr_images

    def run(self) -> None:
        logging.info("acquisition started: dev. #{}".format(self._tag))
        count = 0
        base = datetime.datetime.now()
        while count < nr_images:
            logging.info("waiting: dev. #{}".format(self._tag))
            image, status = self._device.stream().wait_for(self.time_span_1)
            logging.info(
                "wait completed; device: #{}, status: {}".format(self._tag, status))
            if status == cvb.WaitStatus.Ok:
                count += 1
                self._result.images.append(image)
            time.sleep(self._time_span_2)
        elapsed_time = get_time_diff(base)
        self._result.append(Result.MeasurementItem.Acquisition.name, elapsed_time)
        logging.info("acquisition completed: {}; {} sec for {} images".format(
            self._tag, elapsed_time, nr_images))

        self._device.stream().stop()
        logging.info("streaming stopped: dev. #{}".format(self._tag))


def store_features(node_map_, feature_names_: List[str]) -> Dict[str, str]:
    feature_value_dict = dict()
    for name in feature_names_:
        logging.info("preserving; node: {}, value: {}".format(
            name, node_map_[name].value))
        feature_value_dict[name] = node_map_[name].value
    return feature_value_dict


def restore_features(node_map_, feature_value_dict: Dict[str, str]):
    for name in feature_value_dict.keys():
        logging.info("restoring; node: {}, value: {}".format(
            name, feature_value_dict[name]))
        node_map_[name].value = feature_value_dict[name]


def setup_features(node_map_, feature_value_dict: Dict[str, str]):
    for name in feature_value_dict.keys():
        logging.info("setting; node: {}, value: {}".format(
            name, feature_value_dict[name]))
        node_map_[name].from_string(feature_value_dict[name])


def get_time_diff(base):
    return (datetime.datetime.now() - base).total_seconds()


def create_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--nr-devices", type=int, default=2)
    parser.add_argument("-i", "--nr-images", type=int, default=200)
    parser.add_argument("-m", "--is-mock", action='store_true')
    parser.add_argument("-t", "--time-span", type=int, default=5)
    return parser


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    parser = create_parser()
    args = parser.parse_args()

    nr_devices = args.nr_devices
    logging.info("nr. devices: {}".format(nr_devices))
    is_ext_trigger = False if args.is_mock else True
    nr_images = args.nr_images
    logging.info("nr. images: {}".format(nr_images))
    feature_snapshot_list = []
    devices = []
    node_maps = []
    streams = []
    results = []

    for n in range(nr_devices):
        results.append(Result())
        logging.info("setting up device: dev. #{}".format(n))

        base = datetime.datetime.now()
        devices.append(cvb.DeviceFactory.open(
            os.path.join(cvb.install_path(), "drivers", "GenICam.vin"), port=n))
        node_maps.append(devices[n].node_maps["Device"])
        streams.append(devices[n].stream())

        streams[n].ring_buffer.change_count(nr_images, 0)
        logging.info("nr buffers: dev. #{}; {}".format(n, streams[n].ring_buffer.count))

        feature_value_pairs = {
            "TriggerMode": "On", "TriggerSource": "Software"} if \
            is_ext_trigger else {"GevInterfaceSelector": "0"}

        feature_snapshot_list.append(store_features(node_maps[n], feature_value_pairs.keys()))
        setup_features(node_maps[n], feature_value_pairs)

        elapsed_time = get_time_diff(base)
        logging.info("setup completed: dev. #{}; {} sec".format(
            n, elapsed_time))
        results[n].append(Result.MeasurementItem.Setup.name, elapsed_time)

    triggering_threads = []
    for n in range(nr_devices):
        triggering_threads.append(TriggeringThread(tag=str(n), device=devices[n], node_map=node_maps[n], result=results[n], nr_images=nr_images, is_trigger_available=is_ext_trigger))

    threads_list = [
        [TriggeringThread(tag=str(n), device=devices[n], node_map=node_maps[n],
                          result=results[n], nr_images=nr_images,
                          is_trigger_available=is_ext_trigger) for n in
         range(nr_devices)],
        [AcquisitionThread(tag=str(n), device=devices[n],
                           result=results[n], nr_images=nr_images,
                           time_span_1=args.time_span) for n in
         range(nr_devices)]
    ]

    for threads in threads_list:
        for n, t in enumerate(threads):
            logging.info("going to start threading; dev. #{}, t: {}".format(n, t))
            t.start()
            logging.info("started threading; dev. #{}".format(n))

        for n, t in enumerate(threads):
            logging.info("before finishing threading; dev. #{}, t: {}".format(n, t))
            t.join()
            logging.info("threading done; dev. #{}, t: {}".format(n, t))

    if is_ext_trigger:
        for n in range(nr_devices):
            restore_features(node_maps[n], feature_snapshot_list[n])

    """
    for n in range(nr_devices):
        logging.info(
            "images: dev. #{}; {}".format(
                n, [cvb.as_array(image) for image in results[n].images]))
    """

    logging.info("{}".format("#### Summary ####"))
    for n in range(nr_devices):
        logging.info("dev. #{}".format(n))
        for item in [i.name for i in Result.MeasurementItem]:
            logging.info("    {}: {}".format(
                item, results[n].measurement_dict[item]))