Hi Alex,
Thank you for your patience. The issue was from the wrong value that I typed; the correct value must be Software
.
By the way, I had been blindly following the proposed script and had extended it but I think the design is not the one you really want to achieve.
I have prepared a revised version for you (including the fix for the trigger source). The main differences are listed as follows:
- It introduces the concurrency that your application really needs.
- It separates the proposed thread worker into the triggering section and the acquiring section.
- It waits for an image by calling
wait_for
instead ofwait
.
I hope you get a chance to try it running.
import argparse
import datetime
from enum import IntEnum
import logging
import os
import threading
import time
from typing import List, Dict, Any
import cvb
import cvb.foundation
class Result:
class MeasurementItem(IntEnum):
Setup = 0
Triggering = 1
Acquisition = 2
def __init__(self):
self.images = []
self.measurement_dict = dict()
for i in [item.name for item in self.MeasurementItem]:
self.measurement_dict[i] = []
def append(self, name: str, entry: Any):
assert name in [item.name for item in self.MeasurementItem]
self.measurement_dict[name].append(entry)
class TriggeringThread(threading.Thread):
def __init__(self, tag: str, device: cvb.Device, node_map,
nr_images: int, result: Result,
is_trigger_available: bool = True,
time_span: float = 0.005):
super().__init__()
self._tag = tag
self._time_span = time_span
self._device = device
self._node_map = node_map
self._is_trigger_available = is_trigger_available
self._result = result
self._nr_images = nr_images
def run(self) -> None:
logging.info("triggering started: device #{}".format(self._tag))
count = 0
base = datetime.datetime.now()
self._device.stream().start()
if not self._is_trigger_available:
pass
else:
trigger_software = self._node_map["TriggerSoftware"] # type: cvb.CommandNode
logging.info("triggering started: device #{}".format(self._tag))
while count < nr_images:
while not trigger_software.is_done:
time.sleep(self._time_span)
trigger_software.execute()
count += 1
elapsed_time = (datetime.datetime.now() - base).total_seconds()
self._result.append(Result.MeasurementItem.Triggering.name, elapsed_time)
logging.info("triggering completed: dev. #{}; {}".format(self._tag, elapsed_time))
class AcquisitionThread(threading.Thread):
def __init__(self, tag: str, device: cvb.Device, nr_images: int,
result: Result, time_span_1: int = 5,
time_span_2: float = 0.001):
super().__init__()
self._tag = tag
self.time_span_1 = time_span_1
self._time_span_2 = time_span_2
self._device = device
self._result = result
self._nr_images = nr_images
def run(self) -> None:
logging.info("acquisition started: dev. #{}".format(self._tag))
count = 0
base = datetime.datetime.now()
while count < nr_images:
logging.info("waiting: dev. #{}".format(self._tag))
image, status = self._device.stream().wait_for(self.time_span_1)
logging.info(
"wait completed; device: #{}, status: {}".format(self._tag, status))
if status == cvb.WaitStatus.Ok:
count += 1
self._result.images.append(image)
time.sleep(self._time_span_2)
elapsed_time = get_time_diff(base)
self._result.append(Result.MeasurementItem.Acquisition.name, elapsed_time)
logging.info("acquisition completed: {}; {} sec for {} images".format(
self._tag, elapsed_time, nr_images))
self._device.stream().stop()
logging.info("streaming stopped: dev. #{}".format(self._tag))
def store_features(node_map_, feature_names_: List[str]) -> Dict[str, str]:
feature_value_dict = dict()
for name in feature_names_:
logging.info("preserving; node: {}, value: {}".format(
name, node_map_[name].value))
feature_value_dict[name] = node_map_[name].value
return feature_value_dict
def restore_features(node_map_, feature_value_dict: Dict[str, str]):
for name in feature_value_dict.keys():
logging.info("restoring; node: {}, value: {}".format(
name, feature_value_dict[name]))
node_map_[name].value = feature_value_dict[name]
def setup_features(node_map_, feature_value_dict: Dict[str, str]):
for name in feature_value_dict.keys():
logging.info("setting; node: {}, value: {}".format(
name, feature_value_dict[name]))
node_map_[name].from_string(feature_value_dict[name])
def get_time_diff(base):
return (datetime.datetime.now() - base).total_seconds()
def create_parser():
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--nr-devices", type=int, default=2)
parser.add_argument("-i", "--nr-images", type=int, default=200)
parser.add_argument("-m", "--is-mock", action='store_true')
parser.add_argument("-t", "--time-span", type=int, default=5)
return parser
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
parser = create_parser()
args = parser.parse_args()
nr_devices = args.nr_devices
logging.info("nr. devices: {}".format(nr_devices))
is_ext_trigger = False if args.is_mock else True
nr_images = args.nr_images
logging.info("nr. images: {}".format(nr_images))
feature_snapshot_list = []
devices = []
node_maps = []
streams = []
results = []
for n in range(nr_devices):
results.append(Result())
logging.info("setting up device: dev. #{}".format(n))
base = datetime.datetime.now()
devices.append(cvb.DeviceFactory.open(
os.path.join(cvb.install_path(), "drivers", "GenICam.vin"), port=n))
node_maps.append(devices[n].node_maps["Device"])
streams.append(devices[n].stream())
streams[n].ring_buffer.change_count(nr_images, 0)
logging.info("nr buffers: dev. #{}; {}".format(n, streams[n].ring_buffer.count))
feature_value_pairs = {
"TriggerMode": "On", "TriggerSource": "Software"} if \
is_ext_trigger else {"GevInterfaceSelector": "0"}
feature_snapshot_list.append(store_features(node_maps[n], feature_value_pairs.keys()))
setup_features(node_maps[n], feature_value_pairs)
elapsed_time = get_time_diff(base)
logging.info("setup completed: dev. #{}; {} sec".format(
n, elapsed_time))
results[n].append(Result.MeasurementItem.Setup.name, elapsed_time)
triggering_threads = []
for n in range(nr_devices):
triggering_threads.append(TriggeringThread(tag=str(n), device=devices[n], node_map=node_maps[n], result=results[n], nr_images=nr_images, is_trigger_available=is_ext_trigger))
threads_list = [
[TriggeringThread(tag=str(n), device=devices[n], node_map=node_maps[n],
result=results[n], nr_images=nr_images,
is_trigger_available=is_ext_trigger) for n in
range(nr_devices)],
[AcquisitionThread(tag=str(n), device=devices[n],
result=results[n], nr_images=nr_images,
time_span_1=args.time_span) for n in
range(nr_devices)]
]
for threads in threads_list:
for n, t in enumerate(threads):
logging.info("going to start threading; dev. #{}, t: {}".format(n, t))
t.start()
logging.info("started threading; dev. #{}".format(n))
for n, t in enumerate(threads):
logging.info("before finishing threading; dev. #{}, t: {}".format(n, t))
t.join()
logging.info("threading done; dev. #{}, t: {}".format(n, t))
if is_ext_trigger:
for n in range(nr_devices):
restore_features(node_maps[n], feature_snapshot_list[n])
"""
for n in range(nr_devices):
logging.info(
"images: dev. #{}; {}".format(
n, [cvb.as_array(image) for image in results[n].images]))
"""
logging.info("{}".format("#### Summary ####"))
for n in range(nr_devices):
logging.info("dev. #{}".format(n))
for item in [i.name for i in Result.MeasurementItem]:
logging.info(" {}: {}".format(
item, results[n].measurement_dict[item]))