Hi Alex,
I have just prepared an updated version of the test script:
import argparse
import datetime
from enum import IntEnum
import logging
import os
import threading
import time
from typing import List, Dict, Any
import cvb
import cvb.foundation
class Result:
class MeasurementItem(IntEnum):
Setup = 0
Triggering = 1
Acquisition = 2
def __init__(self):
self.images = []
self.measurement_dict = dict()
for i in [item.name for item in self.MeasurementItem]:
self.measurement_dict[i] = []
def append(self, name: str, entry: Any):
assert name in [item.name for item in self.MeasurementItem]
self.measurement_dict[name].append(entry)
class TriggeringThread(threading.Thread):
def __init__(self, tag: str, device: cvb.Device, node_map,
nr_images: int, result: Result,
is_trigger_available: bool = True, time_span: float = 0.005,
exposure_time = None):
super().__init__()
self._tag = tag
self._time_span = exposure_time * 10e-6 if exposure_time else time_span
self._device = device
self._node_map = node_map
self._is_trigger_available = is_trigger_available
self._result = result
self._nr_images = nr_images
def run(self) -> None:
logging.info("triggering started: device #{}".format(self._tag))
count = 0
base = datetime.datetime.now()
self._device.stream().start()
if not self._is_trigger_available:
pass
else:
trigger_software = self._node_map["TriggerSoftware"] # type: cvb.CommandNode
logging.info("triggering started: device #{}".format(self._tag))
while count < nr_images:
while not trigger_software.is_done:
time.sleep(self._time_span)
trigger_software.execute()
count += 1
elapsed_time = (datetime.datetime.now() - base).total_seconds()
self._result.append(Result.MeasurementItem.Triggering.name, elapsed_time)
logging.info("triggering completed; dev. #{}: {}".format(self._tag, elapsed_time))
logging.info("nr buffers pending; dev. #{}: {}".format(
self._tag,
int(self._device.stream().statistics[cvb.StreamInfo.NumBuffersPending])))
class AcquisitionThread(threading.Thread):
def __init__(self, tag: str, device: cvb.Device, nr_images: int,
result: Result, time_span_1: int = 5,
time_span_2: float = 0.001):
super().__init__()
self._tag = tag
self.time_span_1 = time_span_1
self._time_span_2 = time_span_2
self._device = device
self._result = result
self._nr_images = nr_images
def run(self) -> None:
logging.info("acquisition started: dev. #{}".format(self._tag))
count = 0
base = datetime.datetime.now()
while count < nr_images:
image, status = self._device.stream().wait_for(self.time_span_1)
logging.info(
"wait completed; dev. #{}: status: {}".format(self._tag, status))
if status == cvb.WaitStatus.Ok:
logging.info(
"acquired an image; dev. #{}: image #{}".format(self._tag, count))
count += 1
self._result.images.append(image)
time.sleep(self._time_span_2)
elapsed_time = get_time_diff(base)
self._result.append(Result.MeasurementItem.Acquisition.name, elapsed_time)
logging.info("acquisition completed; dev. #{}: {} sec for {} images".format(
self._tag, elapsed_time, nr_images))
self._device.stream().abort()
logging.info("streaming stopped: dev. #{}".format(self._tag))
def store_features(node_map_, feature_names_: List[str]) -> Dict[str, str]:
feature_value_dict = dict()
for name in feature_names_:
try:
logging.info("preserving; node: {}, value: {}".format(
name, node_map_[name].value))
feature_value_dict[name] = node_map_[name].value
except IndexError:
logging.info("{}: not supported".format(name))
return feature_value_dict
def restore_features(node_map_, feature_value_dict: Dict[str, str]):
for name in feature_value_dict.keys():
try:
logging.info("restoring; node: {}, value: {}".format(
name, feature_value_dict[name]))
node_map_[name].value = feature_value_dict[name]
except IndexError:
logging.info("{}: not supported".format(name))
def setup_features(node_map_, feature_value_dict: Dict[str, str]):
for name in feature_value_dict.keys():
try:
logging.info("setting; node: {}, value: {}".format(
name, feature_value_dict[name]))
node_map_[name].from_string(feature_value_dict[name])
except IndexError:
logging.info("{}: not supported".format(name))
def get_time_diff(base):
return (datetime.datetime.now() - base).total_seconds()
def create_parser():
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--nr-devices", type=int, default=2)
parser.add_argument("-i", "--nr-images", type=int, default=200)
parser.add_argument("-m", "--is-mock", action='store_true')
parser.add_argument("-t", "--time-span", type=int, default=5)
return parser
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO,
format='%(relativeCreated)d: %(module)s: %(levelname)s: %(message)s')
parser = create_parser()
args = parser.parse_args()
nr_devices = args.nr_devices
logging.info("nr. devices: {}".format(nr_devices))
is_ext_trigger = False if args.is_mock else True
nr_images = args.nr_images
logging.info("nr. images: {}".format(nr_images))
feature_snapshot_list = []
devices = []
node_maps = []
streams = []
results = []
exposure_time = 3343
for n in range(nr_devices):
results.append(Result())
logging.info("setting up device: dev. #{}".format(n))
base = datetime.datetime.now()
devices.append(cvb.DeviceFactory.open(
os.path.join(cvb.install_path(), "drivers", "GenICam.vin"), port=n))
node_maps.append(devices[n].node_maps["Device"])
streams.append(devices[n].stream())
streams[n].ring_buffer.change_count(nr_images, 0)
logging.info("nr buffers; dev. #{}: {}".format(n, streams[n].ring_buffer.count))
feature_value_pairs = {
"TriggerMode": "On", "TriggerSource": "Software"} if \
is_ext_trigger else {"GevInterfaceSelector": "0"}
feature_value_pairs["ExposureTime"] = str(exposure_time)
feature_snapshot_list.append(store_features(node_maps[n], feature_value_pairs.keys()))
setup_features(node_maps[n], feature_value_pairs)
elapsed_time = get_time_diff(base)
logging.info("setup completed; dev. #{}: {} sec".format(
n, elapsed_time))
results[n].append(Result.MeasurementItem.Setup.name, elapsed_time)
threads_list = [
[TriggeringThread(tag=str(n), device=devices[n], node_map=node_maps[n],
result=results[n], nr_images=nr_images,
is_trigger_available=is_ext_trigger,
exposure_time=exposure_time) for n in
range(nr_devices)],
[AcquisitionThread(tag=str(n), device=devices[n],
result=results[n], nr_images=nr_images,
time_span_1=args.time_span) for n in
range(nr_devices)]
]
for threads in threads_list:
for n, t in enumerate(threads):
logging.info("going to start threading; dev. #{}, t: {}".format(n, t))
t.start()
logging.info("started threading; dev. #{}".format(n))
for n, t in enumerate(threads):
logging.info("before finishing threading; dev. #{}, t: {}".format(n, t))
t.join()
logging.info("threading done; dev. #{}, t: {}".format(n, t))
if is_ext_trigger:
for n in range(nr_devices):
restore_features(node_maps[n], feature_snapshot_list[n])
"""
for n in range(nr_devices):
logging.info(
"images: dev. #{}; {}".format(
n, [cvb.as_array(image) for image in results[n].images]))
"""
for d in devices:
d.close()
logging.info("{}".format("#### Summary ####"))
for n in range(nr_devices):
logging.info("dev. #{}".format(n))
for item in [i.name for i in Result.MeasurementItem]:
logging.info(" {}: {}".format(
item, results[n].measurement_dict[item]))
So far, we are not yet certain if the device is triggered as expected even though we’ve been checking the completion by calling the is_done
method.
A possible case where the wait
call or the wait_for
call slip into the infinite loop: Even though we execute triggering the device certain times but the device drops some of them; it eventually leads to the situation where the images are less than our expectation.
To check the reality, I have added a line that printouts the number of pending buffers that are ready to be retrieved right after the triggering process.
In addition, for a case where the device’s IsDone
method is not synced with the real behavior, I have added an intentional sleep that is equal to the exposure time so that we can be sure the device is ready to accept the next trigger. This sleep can be redundant from the performance point of view but the performance should be another subject to be discussed later.
Thank you again for your patience and cooperation.